Data Quality Scoring and ML Model Evaluation

Overview

Assessing the quality of cryptocurrency market data and evaluating the performance of ML models are critical to ensuring the reliability of a prediction system. This document covers real-time data quality monitoring, how quality scores are computed, evaluation metrics for ML models, and data drift detection.
1. Defining Data Quality and Evaluation Metrics

1.1 Dimensions of Data Quality
# The six dimensions of data quality
DATA_QUALITY_DIMENSIONS = {
    'completeness': 'Is all required data present?',
    'accuracy': 'Does the data accurately reflect real-world values?',
    'consistency': 'Is the data internally consistent and free of contradictions?',
    'timeliness': 'Is the data available when it is needed?',
    'validity': 'Does the data conform to the defined rules?',
    'uniqueness': 'Is the data free of duplicates?'
}
1.2 Quality Metrics Specific to Cryptocurrency Data
class CryptoDataQualityMetrics:
    def __init__(self):
        self.metrics = {
            'price_integrity': {
                'description': 'Integrity of price data',
                'checks': [
                    'Validity of OHLC relationships',
                    'Price spike detection',
                    'Plausibility of cross-exchange price differences'
                ]
            },
            'volume_authenticity': {
                'description': 'Authenticity of volume',
                'checks': [
                    'Wash trading detection',
                    'Volume spike analysis',
                    'Correlation between price moves and volume'
                ]
            },
            'latency_metrics': {
                'description': 'Assessment of data latency',
                'checks': [
                    'Distribution of data arrival times',
                    'Latency at peak load',
                    'Exchange API response times'
                ]
            },
            'coverage_completeness': {
                'description': 'Data coverage',
                'checks': [
                    'Coverage of trading pairs',
                    'Temporal continuity',
                    'Exchange coverage'
                ]
            }
        }
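As a concrete instance of the 'price_integrity' checks listed above, the minimal sketch below validates OHLC relationships row by row; the function name and the sample values are illustrative assumptions, not part of the class above.

# Minimal sketch of one 'price_integrity' check: OHLC validity.
# The function name and the sample OHLC values are illustrative assumptions.
import pandas as pd

def ohlc_rows_valid(df: pd.DataFrame) -> pd.Series:
    """Return a boolean Series marking rows whose OHLC relationships hold."""
    return (
        (df['high'] >= df[['open', 'close', 'low']].max(axis=1)) &
        (df['low'] <= df[['open', 'close', 'high']].min(axis=1))
    )

sample = pd.DataFrame({
    'open': [100.0, 102.0], 'high': [103.0, 101.0],
    'low': [99.0, 100.5], 'close': [102.0, 100.8]
})
print(ohlc_rows_valid(sample))  # the second row violates high >= open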
2. Data Quality Score Calculation System

2.1 Computing the Overall Quality Score
import numpy as np
import pandas as pd
from collections import deque, defaultdict  # used by the monitoring classes below
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import logging


class DataQualityScorer:
    def __init__(self, weights: Dict[str, float] = None):
        self.weights = weights or {
            'completeness': 0.25,
            'accuracy': 0.30,
            'consistency': 0.20,
            'timeliness': 0.15,
            'validity': 0.10
        }
        self.score_history = []
        self.logger = logging.getLogger(__name__)

    def calculate_quality_score(self, data: pd.DataFrame,
                                metadata: Dict = None) -> Dict:
        """Calculate the overall quality score."""
        scores = {}

        # Completeness score
        scores['completeness'] = self.calculate_completeness_score(data)
        # Accuracy score
        scores['accuracy'] = self.calculate_accuracy_score(data, metadata)
        # Consistency score
        scores['consistency'] = self.calculate_consistency_score(data)
        # Timeliness score
        scores['timeliness'] = self.calculate_timeliness_score(data, metadata)
        # Validity score
        scores['validity'] = self.calculate_validity_score(data)

        # Weighted overall score
        total_score = sum(
            scores[dim] * self.weights[dim]
            for dim in self.weights
        )

        # Detailed analysis result
        result = {
            'total_score': total_score,
            'dimension_scores': scores,
            'timestamp': datetime.now(),
            'data_points': len(data),
            'issues_found': self.identify_issues(scores),
            'recommendations': self.generate_recommendations(scores)
        }

        # Append to the history
        self.score_history.append(result)
        return result

    def identify_issues(self, scores: Dict[str, float],
                        threshold: float = 0.7) -> List[str]:
        """List the dimensions whose score falls below the threshold
        (minimal implementation; the 0.7 threshold is an assumption)."""
        return [dim for dim, score in scores.items() if score < threshold]

    def generate_recommendations(self, scores: Dict[str, float]) -> List[str]:
        """One generic recommendation per low-scoring dimension
        (minimal implementation)."""
        return [f'Investigate low {dim} score ({score:.2f})'
                for dim, score in scores.items() if score < 0.7]

    def calculate_completeness_score(self, data: pd.DataFrame) -> float:
        """Calculate the completeness score."""
        # Ratio of missing values
        missing_ratio = data.isnull().sum().sum() / (len(data) * len(data.columns))

        # Presence of required columns
        required_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
        column_coverage = sum(
            1 for col in required_columns if col in data.columns
        ) / len(required_columns)

        # Temporal continuity
        if 'timestamp' in data.columns:
            time_gaps = self.detect_time_gaps(data['timestamp'])
            continuity_score = 1 - min(len(time_gaps) / len(data), 1)
        else:
            continuity_score = 0

        # Combine the sub-scores
        completeness_score = (
            (1 - missing_ratio) * 0.4 +
            column_coverage * 0.3 +
            continuity_score * 0.3
        )
        return completeness_score

    def calculate_accuracy_score(self, data: pd.DataFrame,
                                 metadata: Dict = None) -> float:
        """Calculate the accuracy score."""
        accuracy_checks = []

        # Sanity checks on price data
        if all(col in data.columns for col in ['high', 'low', 'open', 'close']):
            # OHLC relationship check
            ohlc_valid = (
                (data['high'] >= data['low']).all() and
                (data['high'] >= data['open']).all() and
                (data['high'] >= data['close']).all() and
                (data['low'] <= data['open']).all() and
                (data['low'] <= data['close']).all()
            )
            accuracy_checks.append(1.0 if ohlc_valid else 0.0)

            # Price spike detection
            price_changes = data['close'].pct_change()
            extreme_changes = (price_changes.abs() > 0.5).sum()  # moves larger than 50%
            spike_score = 1 - min(extreme_changes / len(data), 1)
            accuracy_checks.append(spike_score)

        # Volume sanity check
        if 'volume' in data.columns:
            negative_volumes = (data['volume'] < 0).sum()
            volume_score = 1 - (negative_volumes / len(data))
            accuracy_checks.append(volume_score)

        # Outlier detection
        outlier_score = self.calculate_outlier_score(data)
        accuracy_checks.append(outlier_score)

        return np.mean(accuracy_checks) if accuracy_checks else 0

    def calculate_consistency_score(self, data: pd.DataFrame) -> float:
        """Calculate the consistency score."""
        consistency_checks = []

        # Data type consistency
        expected_types = {
            'open': 'float64',
            'high': 'float64',
            'low': 'float64',
            'close': 'float64',
            'volume': 'float64'
        }
        type_consistency = sum(
            1 for col, expected_type in expected_types.items()
            if col in data.columns and data[col].dtype == expected_type
        ) / len(expected_types)
        consistency_checks.append(type_consistency)

        # Chronological ordering
        if 'timestamp' in data.columns:
            is_sorted = data['timestamp'].is_monotonic_increasing
            consistency_checks.append(1.0 if is_sorted else 0.0)

        # Price/volume correlation
        if 'close' in data.columns and 'volume' in data.columns:
            price_vol_corr = self.check_price_volume_correlation(data)
            consistency_checks.append(price_vol_corr)

        return np.mean(consistency_checks) if consistency_checks else 0

    def calculate_timeliness_score(self, data: pd.DataFrame,
                                   metadata: Dict = None) -> float:
        """Calculate the timeliness score."""
        if metadata is None:
            return 1.0  # default when no metadata is available

        timeliness_factors = []

        # Data freshness
        if 'last_update' in metadata:
            age = datetime.now() - metadata['last_update']
            # total_seconds() rather than .seconds, so ages beyond a day are not wrapped
            freshness_score = max(0, 1 - age.total_seconds() / 3600)  # reaches 0 after 1 hour
            timeliness_factors.append(freshness_score)

        # Latency
        if 'latency_ms' in metadata:
            latency_score = max(0, 1 - metadata['latency_ms'] / 1000)  # 0 at 1 second
            timeliness_factors.append(latency_score)

        # Update frequency
        if 'update_frequency' in metadata:
            freq_score = min(metadata['update_frequency'] / 60, 1)  # full score at 60 updates/min
            timeliness_factors.append(freq_score)

        return np.mean(timeliness_factors) if timeliness_factors else 0.5

    def calculate_validity_score(self, data: pd.DataFrame) -> float:
        """Calculate the validity score."""
        validity_checks = []

        # Price range check
        if 'close' in data.columns:
            valid_prices = ((data['close'] > 0) & (data['close'] < 1e9)).sum()
            price_validity = valid_prices / len(data)
            validity_checks.append(price_validity)

        # Volume range check
        if 'volume' in data.columns:
            valid_volumes = (data['volume'] >= 0).sum()
            volume_validity = valid_volumes / len(data)
            validity_checks.append(volume_validity)

        # Timestamp validity
        if 'timestamp' in data.columns:
            try:
                # No timestamps in the future
                future_dates = (data['timestamp'] > datetime.now()).sum()
                # No timestamps older than five years
                old_dates = (data['timestamp'] < datetime.now() - timedelta(days=365 * 5)).sum()
                timestamp_validity = 1 - (future_dates + old_dates) / len(data)
                validity_checks.append(timestamp_validity)
            except Exception:
                validity_checks.append(0)

        return np.mean(validity_checks) if validity_checks else 0

    def detect_time_gaps(self, timestamps: pd.Series) -> List[Tuple[datetime, datetime]]:
        """Detect gaps in a time series."""
        gaps = []
        timestamps = pd.to_datetime(timestamps).sort_values()

        # Estimate the expected sampling interval
        intervals = timestamps.diff().dropna()
        expected_interval = intervals.mode()[0] if len(intervals) > 0 else pd.Timedelta(minutes=1)

        # Detect the gaps
        for i in range(1, len(timestamps)):
            actual_interval = timestamps.iloc[i] - timestamps.iloc[i - 1]
            if actual_interval > expected_interval * 2:  # more than twice the expected interval
                gaps.append((timestamps.iloc[i - 1], timestamps.iloc[i]))
        return gaps

    def calculate_outlier_score(self, data: pd.DataFrame) -> float:
        """Calculate the outlier score."""
        numeric_columns = data.select_dtypes(include=[np.number]).columns
        outlier_ratios = []

        for col in numeric_columns:
            # Outlier detection with the IQR method
            Q1 = data[col].quantile(0.25)
            Q3 = data[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            outliers = ((data[col] < lower_bound) | (data[col] > upper_bound)).sum()
            outlier_ratio = outliers / len(data)
            outlier_ratios.append(outlier_ratio)

        avg_outlier_ratio = np.mean(outlier_ratios) if outlier_ratios else 0
        return 1 - avg_outlier_ratio

    def check_price_volume_correlation(self, data: pd.DataFrame) -> float:
        """Check the correlation between price moves and volume."""
        # Correlation between absolute price change and absolute volume change
        price_changes = data['close'].pct_change().abs()
        volume_changes = data['volume'].pct_change().abs()
        correlation = price_changes.corr(volume_changes)

        if np.isnan(correlation):
            return 0.5  # not enough data to judge (guard against NaN)
        # A moderate positive correlation (0.2-0.7) is expected
        if 0.2 <= correlation <= 0.7:
            return 1.0
        elif correlation < 0:
            return 0.5  # a negative correlation is anomalous
        else:
            return 0.7  # correlation too strong or too weak
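The following is a minimal usage sketch of DataQualityScorer on synthetic OHLCV data; the generated values and the metadata passed in are illustrative assumptions.

# Minimal usage sketch of DataQualityScorer on synthetic OHLCV data
# (the synthetic values and the metadata are illustrative assumptions).
import numpy as np
import pandas as pd
from datetime import datetime, timedelta

rng = np.random.default_rng(0)
n = 120
close = 100 + rng.normal(0, 1, n).cumsum()
df = pd.DataFrame({
    'timestamp': [datetime.now() - timedelta(minutes=n - i) for i in range(n)],
    'open': close + rng.normal(0, 0.1, n),
    'high': close + 1.0,
    'low': close - 1.0,
    'close': close,
    'volume': rng.uniform(10, 100, n),
})

scorer = DataQualityScorer()
result = scorer.calculate_quality_score(df, metadata={'latency_ms': 120})
print(f"total: {result['total_score']:.3f}")
print(result['dimension_scores'])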
2.2 Real-Time Quality Monitoring
class RealTimeQualityMonitor:
    def __init__(self, alert_threshold=0.7):
        self.alert_threshold = alert_threshold
        self.quality_scorer = DataQualityScorer()
        self.monitoring_stats = {
            'total_checks': 0,
            'alerts_raised': 0,
            'quality_history': deque(maxlen=1000)
        }
        self.alert_callbacks = []
        self.logger = logging.getLogger(__name__)

    async def monitor_data_stream(self, data_stream):
        """Monitor the quality of a data stream in real time."""
        async for data_batch in data_stream:
            # Compute the quality score
            quality_result = self.quality_scorer.calculate_quality_score(
                data_batch['data'],
                data_batch.get('metadata')
            )

            # Update the statistics
            self.monitoring_stats['total_checks'] += 1
            self.monitoring_stats['quality_history'].append({
                'timestamp': datetime.now(),
                'score': quality_result['total_score'],
                'data_source': data_batch.get('source', 'unknown')
            })

            # Alert check
            if quality_result['total_score'] < self.alert_threshold:
                await self.raise_quality_alert(quality_result, data_batch)

            # Periodic statistics report
            if self.monitoring_stats['total_checks'] % 100 == 0:
                await self.generate_quality_report()

    async def raise_quality_alert(self, quality_result, data_batch):
        """Raise a quality alert."""
        self.monitoring_stats['alerts_raised'] += 1

        alert = {
            'severity': self.determine_severity(quality_result['total_score']),
            'timestamp': datetime.now(),
            'quality_score': quality_result['total_score'],
            'issues': quality_result['issues_found'],
            'data_source': data_batch.get('source'),
            'recommendations': quality_result['recommendations']
        }

        # Run the registered callbacks
        for callback in self.alert_callbacks:
            await callback(alert)

        # Log the alert
        self.logger.warning(f"Data quality alert: {alert}")

    def determine_severity(self, score):
        """Determine the severity of an alert."""
        if score < 0.3:
            return 'CRITICAL'
        elif score < 0.5:
            return 'HIGH'
        elif score < 0.7:
            return 'MEDIUM'
        else:
            return 'LOW'

    async def generate_quality_report(self):
        """Generate a quality report."""
        recent_scores = [
            h['score'] for h in self.monitoring_stats['quality_history']
        ]
        if not recent_scores:
            return

        report = {
            'timestamp': datetime.now(),
            'period_stats': {
                'avg_quality_score': np.mean(recent_scores),
                'min_quality_score': np.min(recent_scores),
                'max_quality_score': np.max(recent_scores),
                'std_quality_score': np.std(recent_scores),
                'alert_rate': self.monitoring_stats['alerts_raised'] /
                              self.monitoring_stats['total_checks']
            },
            'quality_trend': self.calculate_quality_trend(),
            'source_breakdown': self.analyze_by_source()
        }
        return report

    def analyze_by_source(self):
        """Average score per data source (minimal implementation)."""
        by_source = defaultdict(list)
        for h in self.monitoring_stats['quality_history']:
            by_source[h['data_source']].append(h['score'])
        return {source: np.mean(scores) for source, scores in by_source.items()}

    def calculate_quality_trend(self):
        """Compute the quality trend."""
        history = list(self.monitoring_stats['quality_history'])
        if len(history) < 10:
            return 'insufficient_data'

        # Compare recent scores against earlier ones
        recent = [h['score'] for h in history[-50:]]
        past = [h['score'] for h in history[-100:-50]]
        if not past:
            return 'improving'  # too little history to compare

        recent_avg = np.mean(recent)
        past_avg = np.mean(past)
        if recent_avg > past_avg * 1.05:
            return 'improving'
        elif recent_avg < past_avg * 0.95:
            return 'degrading'
        else:
            return 'stable'
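A minimal sketch of driving RealTimeQualityMonitor from an async generator is shown below; fake_stream and its batch layout ({'data': ..., 'source': ...}) follow the structure the monitor expects, but the generator itself and the reuse of df from the previous example are illustrative assumptions.

# Minimal sketch of feeding RealTimeQualityMonitor from an async generator.
# The fake_stream generator and the batch layout are illustrative assumptions.
import asyncio

async def fake_stream(batches):
    for batch in batches:
        yield batch
        await asyncio.sleep(0)  # hand control back to the event loop

async def main():
    monitor = RealTimeQualityMonitor(alert_threshold=0.7)

    async def print_alert(alert):
        print('ALERT:', alert['severity'], round(alert['quality_score'], 3))

    monitor.alert_callbacks.append(print_alert)
    batches = [{'data': df, 'source': 'binance'}]  # df from the scoring example
    await monitor.monitor_data_stream(fake_stream(batches))

asyncio.run(main())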
3. ML Model Evaluation System

3.1 Model Performance Metrics
class MLModelEvaluator:
    def __init__(self):
        self.evaluation_metrics = {
            'regression': [
                'mse', 'rmse', 'mae', 'mape', 'r2',
                'directional_accuracy', 'profit_factor'
            ],
            'classification': [
                'accuracy', 'precision', 'recall', 'f1',
                'auc_roc', 'log_loss', 'confusion_matrix'
            ],
            'time_series': [
                'mase', 'smape', 'msis', 'coverage',
                'sharpness', 'calibration'
            ]
        }

    def evaluate_price_prediction_model(self, y_true, y_pred, prices):
        """Comprehensive evaluation of a price prediction model."""
        evaluation_results = {}

        # Basic regression metrics
        evaluation_results['regression_metrics'] = self.calculate_regression_metrics(
            y_true, y_pred
        )
        # Directional accuracy
        evaluation_results['directional_metrics'] = self.calculate_directional_metrics(
            y_true, y_pred
        )
        # Trading simulation
        evaluation_results['trading_metrics'] = self.calculate_trading_metrics(
            y_true, y_pred, prices
        )
        # Prediction intervals
        evaluation_results['prediction_intervals'] = self.calculate_prediction_intervals(
            y_true, y_pred
        )
        # Risk-adjusted metrics
        evaluation_results['risk_adjusted_metrics'] = self.calculate_risk_adjusted_metrics(
            y_true, y_pred, prices
        )
        # Overall score
        evaluation_results['overall_score'] = self.calculate_overall_score(
            evaluation_results
        )
        return evaluation_results

    def calculate_regression_metrics(self, y_true, y_pred):
        """Calculate regression metrics."""
        metrics = {}

        # MSE (Mean Squared Error)
        metrics['mse'] = np.mean((y_true - y_pred) ** 2)
        # RMSE (Root Mean Squared Error)
        metrics['rmse'] = np.sqrt(metrics['mse'])
        # MAE (Mean Absolute Error)
        metrics['mae'] = np.mean(np.abs(y_true - y_pred))

        # MAPE (Mean Absolute Percentage Error), avoiding division by zero
        mask = y_true != 0
        if mask.any():
            metrics['mape'] = np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100
        else:
            metrics['mape'] = np.inf

        # R² (coefficient of determination)
        ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
        ss_res = np.sum((y_true - y_pred) ** 2)
        metrics['r2'] = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0

        # Symmetric MAPE
        denominator = (np.abs(y_true) + np.abs(y_pred)) / 2
        mask = denominator != 0
        if mask.any():
            metrics['smape'] = np.mean(
                np.abs(y_true[mask] - y_pred[mask]) / denominator[mask]
            ) * 100
        else:
            metrics['smape'] = 0
        return metrics

    def calculate_directional_metrics(self, y_true, y_pred):
        """Calculate directional accuracy metrics."""
        metrics = {}

        # Actual and predicted direction (up/down)
        true_direction = np.sign(np.diff(y_true))
        pred_direction = np.sign(np.diff(y_pred))

        # Directional accuracy
        correct_direction = (true_direction == pred_direction)
        metrics['directional_accuracy'] = np.mean(correct_direction) * 100

        # Accuracy per direction
        up_mask = true_direction > 0
        down_mask = true_direction < 0
        if up_mask.any():
            metrics['upward_accuracy'] = np.mean(correct_direction[up_mask]) * 100
        else:
            metrics['upward_accuracy'] = None
        if down_mask.any():
            metrics['downward_accuracy'] = np.mean(correct_direction[down_mask]) * 100
        else:
            metrics['downward_accuracy'] = None

        # Agreement between directions (correlation)
        metrics['direction_correlation'] = np.corrcoef(true_direction, pred_direction)[0, 1]
        return metrics

    def calculate_trading_metrics(self, y_true, y_pred, prices):
        """Metrics based on a simple trading simulation."""
        metrics = {}

        # Trading signals derived from the predictions (buy: 1, sell: -1)
        signals = np.where(y_pred > prices, 1, -1)

        # Returns
        actual_returns = np.diff(y_true) / y_true[:-1]
        strategy_returns = signals[:-1] * actual_returns

        # Cumulative return
        metrics['cumulative_return'] = (1 + strategy_returns).prod() - 1

        # Sharpe ratio (annualized with 252 trading days)
        if strategy_returns.std() != 0:
            metrics['sharpe_ratio'] = np.sqrt(252) * strategy_returns.mean() / strategy_returns.std()
        else:
            metrics['sharpe_ratio'] = 0

        # Maximum drawdown
        cumulative = (1 + strategy_returns).cumprod()
        running_max = np.maximum.accumulate(cumulative)
        drawdown = (cumulative - running_max) / running_max
        metrics['max_drawdown'] = drawdown.min() * 100

        # Win rate
        winning_trades = strategy_returns > 0
        metrics['win_rate'] = np.mean(winning_trades) * 100

        # Profit factor
        gross_profit = strategy_returns[strategy_returns > 0].sum()
        gross_loss = abs(strategy_returns[strategy_returns < 0].sum())
        metrics['profit_factor'] = gross_profit / gross_loss if gross_loss != 0 else np.inf
        return metrics

    def calculate_prediction_intervals(self, y_true, y_pred, confidence=0.95):
        """Calculate prediction intervals."""
        from scipy import stats

        residuals = y_true - y_pred
        # Standard deviation of the residuals
        std_residuals = np.std(residuals)
        # Interval assuming normally distributed residuals (z = 1.96 at 95%)
        z_score = stats.norm.ppf(0.5 + confidence / 2)

        intervals = {
            'lower_bound': y_pred - z_score * std_residuals,
            'upper_bound': y_pred + z_score * std_residuals,
            'coverage': np.mean(
                (y_true >= y_pred - z_score * std_residuals) &
                (y_true <= y_pred + z_score * std_residuals)
            ) * 100,
            'interval_width': 2 * z_score * std_residuals
        }
        return intervals

    def calculate_risk_adjusted_metrics(self, y_true, y_pred, prices):
        """Risk-adjusted evaluation metrics."""
        metrics = {}

        # Prediction errors
        errors = y_true - y_pred

        # Downside risk
        downside_errors = errors[errors < 0]
        if len(downside_errors) > 0:
            metrics['downside_deviation'] = np.std(downside_errors)
            # Sortino ratio
            returns = np.diff(y_true) / y_true[:-1]
            mean_return = returns.mean()
            metrics['sortino_ratio'] = np.sqrt(252) * mean_return / metrics['downside_deviation']
        else:
            metrics['downside_deviation'] = 0
            metrics['sortino_ratio'] = np.inf

        # VaR (Value at Risk)
        metrics['var_95'] = np.percentile(errors, 5)

        # CVaR (Conditional Value at Risk)
        var_threshold = metrics['var_95']
        tail_errors = errors[errors <= var_threshold]
        metrics['cvar_95'] = tail_errors.mean() if len(tail_errors) > 0 else var_threshold

        # Calmar ratio
        cumulative_return = (1 + np.diff(y_true) / y_true[:-1]).prod() - 1
        max_dd = self.calculate_max_drawdown(y_true)
        metrics['calmar_ratio'] = cumulative_return / abs(max_dd) if max_dd != 0 else 0
        return metrics

    def calculate_max_drawdown(self, values):
        """Maximum drawdown of a value series (helper referenced above)."""
        values = np.asarray(values, dtype=float)
        cumulative = values / values[0]
        running_max = np.maximum.accumulate(cumulative)
        return ((cumulative - running_max) / running_max).min()

    def calculate_overall_score(self, evaluation_results):
        """Combine key metrics into a single 0-1 score
        (minimal implementation; the weights are an assumption)."""
        directional = evaluation_results['directional_metrics']['directional_accuracy'] / 100
        r2 = min(max(evaluation_results['regression_metrics']['r2'], 0.0), 1.0)
        win_rate = evaluation_results['trading_metrics']['win_rate'] / 100
        return 0.4 * directional + 0.3 * r2 + 0.3 * win_rate
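A minimal usage sketch of MLModelEvaluator follows; the naive "previous price +0.1%" predictor is an illustrative assumption, reusing the synthetic df from the scoring example.

# Minimal sketch of evaluating a one-step-ahead "predictor" with
# MLModelEvaluator; the naive lagged prediction is an illustrative assumption.
prices = df['close'].values            # full price series
y_true = prices[1:]                    # realized next-step prices
y_pred = prices[:-1] * 1.001           # naive prediction: previous price +0.1%

evaluator = MLModelEvaluator()
results = evaluator.evaluate_price_prediction_model(y_true, y_pred, prices[:-1])
print(results['regression_metrics']['rmse'])
print(results['directional_metrics']['directional_accuracy'])
print(results['trading_metrics']['sharpe_ratio'])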
3.2 Model Drift Detection
class ModelDriftDetector:
    def __init__(self, baseline_performance, sensitivity=0.1):
        self.baseline_performance = baseline_performance
        self.sensitivity = sensitivity
        self.performance_history = deque(maxlen=100)
        self.drift_alerts = []

    def detect_drift(self, current_performance, feature_distributions=None):
        """Detect model drift."""
        drift_indicators = {}

        # Performance drift
        perf_drift = self.detect_performance_drift(current_performance)
        drift_indicators['performance_drift'] = perf_drift

        # Drift in the prediction distribution
        if 'prediction_distribution' in current_performance:
            pred_drift = self.detect_prediction_drift(
                current_performance['prediction_distribution']
            )
            drift_indicators['prediction_drift'] = pred_drift

        # Drift in feature distributions
        if feature_distributions:
            feature_drift = self.detect_feature_drift(feature_distributions)
            drift_indicators['feature_drift'] = feature_drift

        # Overall drift assessment
        overall_drift = self.assess_overall_drift(drift_indicators)

        # Raise an alert if needed
        if overall_drift['is_drifting']:
            self.generate_drift_alert(overall_drift)
        return overall_drift

    def assess_overall_drift(self, drift_indicators):
        """Combine the individual indicators into one verdict (minimal
        implementation: any drifting indicator flags overall drift)."""
        def any_drifting(node):
            if isinstance(node, dict):
                if node.get('is_drifting', False):
                    return True
                return any(any_drifting(v) for v in node.values() if isinstance(v, dict))
            return False

        return {
            'is_drifting': any_drifting(drift_indicators),
            'indicators': drift_indicators
        }

    def generate_drift_alert(self, overall_drift):
        """Record a drift alert (minimal implementation)."""
        self.drift_alerts.append({
            'timestamp': datetime.now(),
            'details': overall_drift
        })

    def detect_performance_drift(self, current_performance):
        """Detect drift in performance metrics."""
        drift_results = {}
        for metric, baseline_value in self.baseline_performance.items():
            if metric in current_performance:
                current_value = current_performance[metric]

                # Relative change
                if baseline_value != 0:
                    relative_change = abs(current_value - baseline_value) / abs(baseline_value)
                else:
                    relative_change = abs(current_value - baseline_value)

                # Drift decision
                is_drifting = relative_change > self.sensitivity
                drift_results[metric] = {
                    'baseline': baseline_value,
                    'current': current_value,
                    'relative_change': relative_change,
                    'is_drifting': is_drifting
                }
        return drift_results

    def detect_prediction_drift(self, current_predictions):
        """Detect drift in the prediction distribution (KS test)."""
        from scipy import stats

        if not hasattr(self, 'baseline_predictions'):
            self.baseline_predictions = current_predictions
            return {'is_drifting': False, 'message': 'Baseline set'}

        # Two-sample Kolmogorov-Smirnov test
        ks_statistic, p_value = stats.ks_2samp(
            self.baseline_predictions,
            current_predictions
        )
        is_drifting = p_value < 0.05  # 5% significance level
        return {
            'is_drifting': is_drifting,
            'ks_statistic': ks_statistic,
            'p_value': p_value,
            'interpretation': 'Significant drift detected' if is_drifting else 'No significant drift'
        }

    def detect_feature_drift(self, feature_distributions):
        """Detect drift in feature distributions."""
        drift_results = {}
        for feature_name, current_dist in feature_distributions.items():
            if hasattr(self, f'baseline_dist_{feature_name}'):
                baseline_dist = getattr(self, f'baseline_dist_{feature_name}')

                # Compute the PSI (Population Stability Index)
                psi = self.calculate_psi(baseline_dist, current_dist)

                # Drift decision (PSI > 0.2 indicates significant drift)
                drift_level = self.interpret_psi(psi)
                drift_results[feature_name] = {
                    'psi': psi,
                    'drift_level': drift_level,
                    'is_drifting': psi > 0.1
                }
            else:
                # Set the baseline
                setattr(self, f'baseline_dist_{feature_name}', current_dist)
                drift_results[feature_name] = {
                    'psi': 0,
                    'drift_level': 'baseline_set',
                    'is_drifting': False
                }
        return drift_results

    def calculate_psi(self, baseline_dist, current_dist, buckets=10):
        """Compute the Population Stability Index."""
        # Build a shared histogram
        min_val = min(baseline_dist.min(), current_dist.min())
        max_val = max(baseline_dist.max(), current_dist.max())
        bins = np.linspace(min_val, max_val, buckets + 1)

        # Share of observations per bin
        baseline_percents = np.histogram(baseline_dist, bins=bins)[0] / len(baseline_dist)
        current_percents = np.histogram(current_dist, bins=bins)[0] / len(current_dist)

        # Floor the shares to avoid division by zero and log(0)
        baseline_percents = np.maximum(baseline_percents, 0.0001)
        current_percents = np.maximum(current_percents, 0.0001)

        # PSI
        psi = np.sum(
            (current_percents - baseline_percents) *
            np.log(current_percents / baseline_percents)
        )
        return psi

    def interpret_psi(self, psi):
        """Interpret a PSI value."""
        if psi < 0.1:
            return 'no_drift'
        elif psi < 0.2:
            return 'slight_drift'
        else:
            return 'significant_drift'
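The sketch below exercises ModelDriftDetector twice, once to set baselines and once with degraded metrics and a mean-shifted feature; all numbers are illustrative assumptions.

# Minimal sketch of ModelDriftDetector: performance drift plus a PSI check
# on one feature. The baseline numbers are illustrative assumptions.
import numpy as np

rng = np.random.default_rng(1)
detector = ModelDriftDetector(
    baseline_performance={'rmse': 1.0, 'mae': 0.8},
    sensitivity=0.1
)

baseline_feature = rng.normal(0, 1, 1000)
drifted_feature = rng.normal(0.5, 1, 1000)   # mean shifted by 0.5

detector.detect_drift({'rmse': 1.02, 'mae': 0.81},
                      feature_distributions={'returns': baseline_feature})  # sets baseline
result = detector.detect_drift({'rmse': 1.35, 'mae': 0.95},
                               feature_distributions={'returns': drifted_feature})
print(result['is_drifting'])
print(result['indicators']['feature_drift']['returns']['psi'])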
4. Data Quality Improvement System

4.1 Automated Data Cleaning
class AutoDataCleaner:
    def __init__(self):
        self.cleaning_strategies = {
            'missing_values': self.handle_missing_values,
            'outliers': self.handle_outliers,
            'duplicates': self.handle_duplicates,
            'inconsistencies': self.handle_inconsistencies
        }
        self.cleaning_log = []

    def get_default_config(self) -> Dict:
        """Default cleaning configuration (minimal implementation)."""
        return {
            'missing_value_strategy': 'interpolate',
            'outlier_method': 'iqr',
            'outlier_threshold': 3,
            'rolling_window': 5
        }

    def clean_crypto_data(self, data: pd.DataFrame, config: Dict = None) -> pd.DataFrame:
        """Automatically clean cryptocurrency data."""
        config = config or self.get_default_config()
        cleaned_data = data.copy()

        # Quality score before cleaning
        quality_scorer = DataQualityScorer()
        before_score = quality_scorer.calculate_quality_score(cleaned_data)

        # Apply each cleaning strategy
        for issue_type, strategy in self.cleaning_strategies.items():
            if config.get(f'clean_{issue_type}', True):
                cleaned_data = strategy(cleaned_data, config)

        # Quality score after cleaning
        after_score = quality_scorer.calculate_quality_score(cleaned_data)

        # Record the cleaning results
        self.log_cleaning_results(before_score, after_score, len(data), len(cleaned_data))
        return cleaned_data

    def log_cleaning_results(self, before_score, after_score,
                             rows_before, rows_after):
        """Append a cleaning record to the log (minimal implementation)."""
        self.cleaning_log.append({
            'timestamp': datetime.now(),
            'score_before': before_score['total_score'],
            'score_after': after_score['total_score'],
            'rows_before': rows_before,
            'rows_after': rows_after
        })

    def handle_missing_values(self, data: pd.DataFrame, config: Dict) -> pd.DataFrame:
        """Handle missing values."""
        strategy = config.get('missing_value_strategy', 'interpolate')
        numeric_columns = data.select_dtypes(include=[np.number]).columns

        if strategy == 'interpolate':
            # Time-based interpolation (requires a DatetimeIndex)
            data[numeric_columns] = data[numeric_columns].interpolate(method='time')
        elif strategy == 'forward_fill':
            # Forward fill
            data = data.ffill()
        elif strategy == 'rolling_mean':
            # Fill with a rolling mean
            window = config.get('rolling_window', 5)
            for col in numeric_columns:
                data[col] = data[col].fillna(data[col].rolling(window, min_periods=1).mean())
        return data

    def handle_outliers(self, data: pd.DataFrame, config: Dict) -> pd.DataFrame:
        """Handle outliers."""
        method = config.get('outlier_method', 'iqr')
        threshold = config.get('outlier_threshold', 3)

        numeric_columns = ['open', 'high', 'low', 'close', 'volume']
        numeric_columns = [col for col in numeric_columns if col in data.columns]

        for col in numeric_columns:
            if method == 'iqr':
                # IQR method
                Q1 = data[col].quantile(0.25)
                Q3 = data[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR
                # Clip outliers to the bounds
                data[col] = data[col].clip(lower_bound, upper_bound)
            elif method == 'zscore':
                # Z-score method (guard against a zero standard deviation)
                mean = data[col].mean()
                std = data[col].std()
                if std > 0:
                    z_scores = np.abs((data[col] - mean) / std)
                    data.loc[z_scores > threshold, col] = mean
        return data

    def handle_duplicates(self, data: pd.DataFrame, config: Dict) -> pd.DataFrame:
        """Handle duplicate rows."""
        # Drop duplicated timestamps, keeping the latest row
        if 'timestamp' in data.columns:
            data = data.drop_duplicates(subset=['timestamp'], keep='last')
        # Drop fully identical rows
        data = data.drop_duplicates()
        return data

    def handle_inconsistencies(self, data: pd.DataFrame, config: Dict) -> pd.DataFrame:
        """Repair inconsistent values."""
        # Repair OHLC relationships
        if all(col in data.columns for col in ['open', 'high', 'low', 'close']):
            # Force high to be the maximum of OHLC
            data['high'] = data[['open', 'high', 'low', 'close']].max(axis=1)
            # Force low to be the minimum of OHLC
            data['low'] = data[['open', 'high', 'low', 'close']].min(axis=1)

        # Clamp negative volume to 0
        if 'volume' in data.columns:
            data.loc[data['volume'] < 0, 'volume'] = 0

        # Ensure chronological order
        if 'timestamp' in data.columns:
            data = data.sort_values('timestamp')
        return data
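A minimal sketch of AutoDataCleaner on a deliberately corrupted copy of the synthetic df from the scoring example; the injected faults are illustrative.

# Minimal sketch of AutoDataCleaner on a deliberately dirty copy of df
# (the injected faults are illustrative assumptions).
dirty = df.copy()
dirty.loc[10, 'close'] = None                      # missing value
dirty.loc[20, 'volume'] = -5                       # negative volume
dirty.loc[30, 'high'] = dirty.loc[30, 'low'] - 1   # broken OHLC relationship

cleaner = AutoDataCleaner()
cleaned = cleaner.clean_crypto_data(
    dirty, config={'missing_value_strategy': 'rolling_mean',
                   'outlier_method': 'iqr', 'outlier_threshold': 3}
)
print(cleaner.cleaning_log[-1])  # before/after scores and row counts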
4.2 Quality Metrics Dashboard
class QualityMetricsDashboard:
    def __init__(self):
        self.metrics_history = defaultdict(list)
        self.alert_thresholds = {
            'total_score': 0.7,
            'completeness': 0.8,
            'accuracy': 0.75,
            'consistency': 0.8,
            'timeliness': 0.6
        }

    def update_metrics(self, source: str, quality_result: Dict):
        """Update the stored metrics."""
        timestamp = quality_result['timestamp']

        # Append to the history
        self.metrics_history[source].append({
            'timestamp': timestamp,
            'total_score': quality_result['total_score'],
            'dimensions': quality_result['dimension_scores']
        })

        # Threshold check
        alerts = self.check_thresholds(source, quality_result)
        return alerts

    def check_thresholds(self, source: str, quality_result: Dict) -> List[Dict]:
        """Compare scores against the alert thresholds (minimal implementation)."""
        alerts = []
        if quality_result['total_score'] < self.alert_thresholds['total_score']:
            alerts.append({'source': source, 'metric': 'total_score',
                           'value': quality_result['total_score']})
        for dim, score in quality_result['dimension_scores'].items():
            if dim in self.alert_thresholds and score < self.alert_thresholds[dim]:
                alerts.append({'source': source, 'metric': dim, 'value': score})
        return alerts

    def generate_summary_report(self, time_window: timedelta = timedelta(hours=24)):
        """Generate a summary report."""
        cutoff_time = datetime.now() - time_window
        report = {}

        for source, history in self.metrics_history.items():
            # Keep only the data inside the window
            recent_data = [
                h for h in history
                if h['timestamp'] > cutoff_time
            ]
            if not recent_data:
                continue

            # Aggregate statistics
            scores = [h['total_score'] for h in recent_data]
            dimension_scores = defaultdict(list)
            for h in recent_data:
                for dim, score in h['dimensions'].items():
                    dimension_scores[dim].append(score)

            report[source] = {
                'avg_total_score': np.mean(scores),
                'min_total_score': np.min(scores),
                'max_total_score': np.max(scores),
                'score_volatility': np.std(scores),
                'sample_count': len(recent_data),
                'dimension_averages': {
                    dim: np.mean(dim_scores)
                    for dim, dim_scores in dimension_scores.items()
                },
                'quality_trend': self.calculate_trend(scores)
            }
        return report

    def calculate_trend(self, scores: List[float]) -> str:
        """Compute the quality trend."""
        if len(scores) < 10:
            return 'insufficient_data'

        # Fit a line to estimate the trend
        x = np.arange(len(scores))
        slope, _ = np.polyfit(x, scores, 1)

        if slope > 0.001:
            return 'improving'
        elif slope < -0.001:
            return 'degrading'
        else:
            return 'stable'

    def visualize_quality_metrics(self, source: str, save_path: str = None):
        """Visualize the quality metrics."""
        import matplotlib.pyplot as plt
        import seaborn as sns

        if source not in self.metrics_history:
            return
        history = self.metrics_history[source]

        # Convert to a DataFrame
        df_data = []
        for h in history:
            row = {'timestamp': h['timestamp'], 'total_score': h['total_score']}
            row.update(h['dimensions'])
            df_data.append(row)
        df = pd.DataFrame(df_data)

        # Build the figure
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # 1. Total score over time
        ax = axes[0, 0]
        df.plot(x='timestamp', y='total_score', ax=ax, legend=False)
        ax.axhline(y=self.alert_thresholds['total_score'], color='r', linestyle='--', label='Alert Threshold')
        ax.set_title(f'Total Quality Score - {source}')
        ax.set_ylabel('Score')
        ax.legend()

        # 2. Scores per dimension
        ax = axes[0, 1]
        dimensions = ['completeness', 'accuracy', 'consistency', 'timeliness', 'validity']
        dimension_cols = [col for col in dimensions if col in df.columns]
        if dimension_cols:
            df[dimension_cols].plot(ax=ax)
            ax.set_title('Quality Dimensions Over Time')
            ax.set_ylabel('Score')
            ax.legend()

        # 3. Score distribution
        ax = axes[1, 0]
        df['total_score'].hist(bins=30, ax=ax)
        ax.set_title('Quality Score Distribution')
        ax.set_xlabel('Total Score')
        ax.set_ylabel('Frequency')

        # 4. Quality heatmap
        ax = axes[1, 1]
        if len(df) > 0 and dimension_cols:
            # Bucket the rows into time bins
            df['time_bin'] = pd.cut(df.index, bins=min(20, len(df)))
            heatmap_data = df.groupby('time_bin')[dimension_cols].mean()
            sns.heatmap(heatmap_data.T, cmap='RdYlGn', vmin=0, vmax=1, ax=ax)
            ax.set_title('Quality Dimensions Heatmap')
            ax.set_xlabel('Time Period')

        plt.tight_layout()
        if save_path:
            plt.savefig(save_path)
        else:
            plt.show()
        plt.close()
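A minimal usage sketch of the dashboard fed by repeated DataQualityScorer results, again reusing the synthetic df; scoring the same frame twelve times is purely illustrative.

# Minimal sketch of QualityMetricsDashboard fed by DataQualityScorer results
# (df is the synthetic frame from the earlier scoring example).
dashboard = QualityMetricsDashboard()
scorer = DataQualityScorer()

for _ in range(12):
    quality = scorer.calculate_quality_score(df)
    alerts = dashboard.update_metrics('binance', quality)

report = dashboard.generate_summary_report(time_window=timedelta(hours=1))
print(report['binance']['avg_total_score'], report['binance']['quality_trend'])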
5. Integrated Quality Management System

5.1 End-to-End Quality Management
class IntegratedQualityManagementSystem:
    def __init__(self):
        self.quality_scorer = DataQualityScorer()
        self.quality_monitor = RealTimeQualityMonitor()
        self.model_evaluator = MLModelEvaluator()
        self.drift_detector = ModelDriftDetector(baseline_performance={})
        self.data_cleaner = AutoDataCleaner()
        self.dashboard = QualityMetricsDashboard()

    async def process_data_pipeline(self, raw_data: pd.DataFrame,
                                    source: str,
                                    model_predictions: np.ndarray = None):
        """Quality management across the whole data pipeline."""
        pipeline_results = {
            'timestamp': datetime.now(),
            'source': source,
            'stages': {}
        }

        # 1. Quality assessment of the input data
        input_quality = self.quality_scorer.calculate_quality_score(raw_data)
        pipeline_results['stages']['input_quality'] = input_quality

        # 2. Data cleaning (only when the quality score is below 0.8)
        if input_quality['total_score'] < 0.8:
            cleaned_data = self.data_cleaner.clean_crypto_data(raw_data)
            post_cleaning_quality = self.quality_scorer.calculate_quality_score(cleaned_data)
            pipeline_results['stages']['cleaning'] = {
                'applied': True,
                'quality_improvement': post_cleaning_quality['total_score'] - input_quality['total_score']
            }
        else:
            cleaned_data = raw_data
            pipeline_results['stages']['cleaning'] = {'applied': False}

        # 3. Model prediction evaluation (when predictions are supplied;
        #    model_predictions must align with close[1:])
        if model_predictions is not None:
            model_performance = self.model_evaluator.evaluate_price_prediction_model(
                cleaned_data['close'].values[1:],   # actual values
                model_predictions,                  # predicted values
                cleaned_data['close'].values[:-1]   # prices at prediction time
            )
            pipeline_results['stages']['model_evaluation'] = model_performance

            # 4. Drift detection
            drift_result = self.drift_detector.detect_drift(
                model_performance['regression_metrics']
            )
            pipeline_results['stages']['drift_detection'] = drift_result

        # 5. Dashboard update
        alerts = self.dashboard.update_metrics(source, input_quality)
        pipeline_results['alerts'] = alerts

        # 6. Quality improvement recommendations
        recommendations = self.generate_recommendations(pipeline_results)
        pipeline_results['recommendations'] = recommendations
        return pipeline_results

    def generate_recommendations(self, pipeline_results: Dict) -> List[str]:
        """Generate quality improvement recommendations."""
        recommendations = []

        # Recommendations based on input quality
        input_quality = pipeline_results['stages']['input_quality']
        if input_quality['total_score'] < 0.7:
            recommendations.append(
                "Critical: data quality is below the acceptable threshold. Check the data source."
            )

        # Per-dimension recommendations
        for dimension, score in input_quality['dimension_scores'].items():
            if score < 0.7:
                if dimension == 'completeness':
                    recommendations.append("Too much data is missing. Review the data collection process.")
                elif dimension == 'accuracy':
                    recommendations.append("Data accuracy is poor. Strengthen outlier handling.")
                elif dimension == 'consistency':
                    recommendations.append("Data consistency is low. Add validation rules.")
                elif dimension == 'timeliness':
                    recommendations.append("Data is arriving late. Improve real-time delivery.")

        # Model-related recommendations
        if 'model_evaluation' in pipeline_results['stages']:
            model_eval = pipeline_results['stages']['model_evaluation']
            if model_eval['overall_score'] < 0.6:
                recommendations.append("Model performance has degraded. Consider retraining.")

        # Drift-related recommendations
        if 'drift_detection' in pipeline_results['stages']:
            drift = pipeline_results['stages']['drift_detection']
            if drift.get('is_drifting', False):
                recommendations.append("Model drift detected. The model needs to be updated.")
        return recommendations
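The sketch below runs the integrated pipeline end to end; the naive predictions are an illustrative assumption and are aligned with close[1:] as the pipeline expects.

# Minimal sketch of running the integrated pipeline end to end
# (df from the earlier example; the naive predictions are illustrative).
import asyncio

async def run_pipeline():
    system = IntegratedQualityManagementSystem()
    predictions = df['close'].values[:-1] * 1.001  # one prediction per close[1:]
    results = await system.process_data_pipeline(df, source='binance',
                                                 model_predictions=predictions)
    print(results['stages']['input_quality']['total_score'])
    print(results['recommendations'])

asyncio.run(run_pipeline())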
5.2 Quality Assurance Best Practices
class QualityAssuranceBestPractices:
    """Best-practice implementations for quality assurance."""

    @staticmethod
    def implement_data_validation_rules():
        """Define data validation rules."""
        validation_rules = {
            'price_rules': [
                lambda x: x > 0,                      # positive price
                lambda x: x < 1e9,                    # realistic upper bound
                lambda x: not np.isnan(x)             # NaN check
            ],
            'volume_rules': [
                lambda x: x >= 0,                     # non-negative volume
                lambda x: not np.isinf(x)             # infinity check
            ],
            'timestamp_rules': [
                lambda x: x <= datetime.now(),        # not in the future
                lambda x: x > datetime(2009, 1, 3)    # after Bitcoin's genesis block
            ]
        }
        return validation_rules

    @staticmethod
    def setup_monitoring_alerts():
        """Configure monitoring alerts."""
        alert_config = {
            'quality_alerts': {
                'critical': 0.5,   # quality score < 0.5
                'warning': 0.7,    # quality score < 0.7
                'info': 0.85       # quality score < 0.85
            },
            'performance_alerts': {
                'accuracy_drop': 0.1,   # accuracy drop of 10% or more
                'latency_spike': 1000,  # latency above 1 second (ms)
                'error_rate': 0.05      # error rate of 5% or more
            },
            'data_alerts': {
                'missing_data': 0.1,     # 10% or more missing values
                'outlier_ratio': 0.05,   # 5% or more outliers
                'duplicate_ratio': 0.01  # 1% or more duplicates
            }
        }
        return alert_config

    @staticmethod
    def implement_continuous_improvement():
        """Continuous improvement cycle."""
        improvement_cycle = {
            'measure': 'Regularly measure data quality and model performance',
            'analyze': 'Perform root-cause analysis on detected issues',
            'improve': 'Implement the improvements',
            'control': 'Monitor the effect of the improvements',
            'document': 'Document the resulting best practices'
        }
        return improvement_cycle
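As a usage sketch, the validation rules can be applied value by value; the validate_prices helper below is an illustrative assumption, not part of the class above.

# Minimal sketch of applying the validation rules value by value;
# the validate_prices helper is an illustrative assumption.
rules = QualityAssuranceBestPractices.implement_data_validation_rules()

def validate_prices(values) -> float:
    """Share of values that pass every price rule."""
    passed = sum(all(rule(v) for rule in rules['price_rules']) for v in values)
    return passed / len(values)

print(validate_prices([101.5, -3.0, float('nan'), 250.0]))  # 0.5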
Summary

When implementing a data quality scoring and ML model evaluation system, the following points matter most:

- Multi-dimensional quality assessment: a combined evaluation of completeness, accuracy, consistency, timeliness, and validity
- Real-time monitoring: continuous quality checks with alerting
- Automated data cleaning: automatic detection and repair of quality problems
- Comprehensive model evaluation: assessing trading performance, not just prediction accuracy
- Drift detection: monitoring how data and models change over time
- An integrated approach: managing data quality and model quality together

Combining these elements makes it possible to build a reliable cryptocurrency prediction system.