ML Documentation

Data Quality Scoring and ML Model Evaluation

Overview

Assessing the quality of cryptocurrency market data and the performance of ML models is critical to the reliability of a prediction system. This document covers real-time data quality monitoring, how quality scores are calculated, ML model evaluation metrics, and data drift detection.

1. Data Quality Definitions and Evaluation Metrics

1.1 Dimensions of Data Quality

# The six dimensions of data quality
DATA_QUALITY_DIMENSIONS = {
    'completeness': 'Is all required data present?',
    'accuracy': 'Does the data accurately reflect the real values?',
    'consistency': 'Is the data free of internal contradictions?',
    'timeliness': 'Is the data available when it is needed?',
    'validity': 'Does the data conform to the defined rules?',
    'uniqueness': 'Is the data free of duplicates?'
}

1.2 Quality Metrics Specific to Cryptocurrency Data

class CryptoDataQualityMetrics:
    def __init__(self):
        self.metrics = {
            'price_integrity': {
                'description': 'Integrity of price data',
                'checks': [
                    'Validity of OHLC relationships',
                    'Price spike detection',
                    'Plausibility of cross-exchange price differences'
                ]
            },
            'volume_authenticity': {
                'description': 'Authenticity of traded volume',
                'checks': [
                    'Wash trading detection',
                    'Volume spike analysis',
                    'Correlation between price moves and volume'
                ]
            },
            'latency_metrics': {
                'description': 'Assessment of data latency',
                'checks': [
                    'Distribution of data arrival times',
                    'Latency at peak load',
                    'Exchange API response times'
                ]
            },
            'coverage_completeness': {
                'description': 'Data coverage',
                'checks': [
                    'Coverage of trading pairs',
                    'Temporal continuity',
                    'Exchange coverage'
                ]
            }
        }

2. Data Quality Score Calculation System

2.1 Computing the Overall Quality Score

import numpy as np
import pandas as pd
from collections import defaultdict, deque  # used by the monitoring and dashboard classes below
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import logging

class DataQualityScorer:
    def __init__(self, weights: Dict[str, float] = None):
        self.weights = weights or {
            'completeness': 0.25,
            'accuracy': 0.30,
            'consistency': 0.20,
            'timeliness': 0.15,
            'validity': 0.10
        }
        self.score_history = []
        self.logger = logging.getLogger(__name__)

    def calculate_quality_score(self, data: pd.DataFrame, 
                              metadata: Dict = None) -> Dict:
        """総合的な品質スコアの計算"""
        scores = {}

        # 完全性スコア
        scores['completeness'] = self.calculate_completeness_score(data)

        # 正確性スコア
        scores['accuracy'] = self.calculate_accuracy_score(data, metadata)

        # 一貫性スコア
        scores['consistency'] = self.calculate_consistency_score(data)

        # 適時性スコア
        scores['timeliness'] = self.calculate_timeliness_score(data, metadata)

        # 妥当性スコア
        scores['validity'] = self.calculate_validity_score(data)

        # 総合スコアの計算
        total_score = sum(
            scores[dim] * self.weights[dim] 
            for dim in self.weights
        )

        # 詳細な分析結果
        result = {
            'total_score': total_score,
            'dimension_scores': scores,
            'timestamp': datetime.now(),
            'data_points': len(data),
            'issues_found': self.identify_issues(scores),
            'recommendations': self.generate_recommendations(scores)
        }

        # 履歴に追加
        self.score_history.append(result)

        return result

    def calculate_completeness_score(self, data: pd.DataFrame) -> float:
        """完全性スコアの計算"""
        # 欠損値の割合
        missing_ratio = data.isnull().sum().sum() / (len(data) * len(data.columns))

        # 必須カラムの存在確認
        required_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
        column_coverage = sum(
            1 for col in required_columns if col in data.columns
        ) / len(required_columns)

        # 時系列の連続性
        if 'timestamp' in data.columns:
            time_gaps = self.detect_time_gaps(data['timestamp'])
            continuity_score = 1 - min(len(time_gaps) / len(data), 1)
        else:
            continuity_score = 0

        # スコアの統合
        completeness_score = (
            (1 - missing_ratio) * 0.4 +
            column_coverage * 0.3 +
            continuity_score * 0.3
        )

        return completeness_score

    def calculate_accuracy_score(self, data: pd.DataFrame, 
                               metadata: Dict = None) -> float:
        """正確性スコアの計算"""
        accuracy_checks = []

        # 価格データの妥当性チェック
        if all(col in data.columns for col in ['high', 'low', 'open', 'close']):
            # OHLC関係のチェック
            ohlc_valid = (
                (data['high'] >= data['low']).all() and
                (data['high'] >= data['open']).all() and
                (data['high'] >= data['close']).all() and
                (data['low'] <= data['open']).all() and
                (data['low'] <= data['close']).all()
            )
            accuracy_checks.append(1.0 if ohlc_valid else 0.0)

            # 価格スパイクの検出
            price_changes = data['close'].pct_change()
            extreme_changes = (price_changes.abs() > 0.5).sum()  # 50%以上の変動
            spike_score = 1 - min(extreme_changes / len(data), 1)
            accuracy_checks.append(spike_score)

        # ボリュームの妥当性
        if 'volume' in data.columns:
            negative_volumes = (data['volume'] < 0).sum()
            volume_score = 1 - (negative_volumes / len(data))
            accuracy_checks.append(volume_score)

        # 外れ値の検出
        outlier_score = self.calculate_outlier_score(data)
        accuracy_checks.append(outlier_score)

        return np.mean(accuracy_checks) if accuracy_checks else 0

    def calculate_consistency_score(self, data: pd.DataFrame) -> float:
        """一貫性スコアの計算"""
        consistency_checks = []

        # データ型の一貫性
        expected_types = {
            'open': 'float64',
            'high': 'float64',
            'low': 'float64',
            'close': 'float64',
            'volume': 'float64'
        }

        type_consistency = sum(
            1 for col, expected_type in expected_types.items()
            if col in data.columns and data[col].dtype == expected_type
        ) / len(expected_types)
        consistency_checks.append(type_consistency)

        # 時系列の順序
        if 'timestamp' in data.columns:
            is_sorted = data['timestamp'].is_monotonic_increasing
            consistency_checks.append(1.0 if is_sorted else 0.0)

        # 価格とボリュームの相関
        if 'close' in data.columns and 'volume' in data.columns:
            price_vol_corr = self.check_price_volume_correlation(data)
            consistency_checks.append(price_vol_corr)

        return np.mean(consistency_checks) if consistency_checks else 0

    def calculate_timeliness_score(self, data: pd.DataFrame, 
                                 metadata: Dict = None) -> float:
        """適時性スコアの計算"""
        if metadata is None:
            return 1.0  # メタデータがない場合はデフォルト値

        timeliness_factors = []

        # Data freshness: use total_seconds() so ages over one day are not wrapped
        if 'last_update' in metadata:
            age = datetime.now() - metadata['last_update']
            freshness_score = max(0, 1 - age.total_seconds() / 3600)  # reaches 0 after 1 hour
            timeliness_factors.append(freshness_score)

        # Latency
        if 'latency_ms' in metadata:
            latency_score = max(0, 1 - metadata['latency_ms'] / 1000)  # reaches 0 at 1 second
            timeliness_factors.append(latency_score)

        # Update frequency
        if 'update_frequency' in metadata:
            freq_score = min(metadata['update_frequency'] / 60, 1)  # full score at 60 updates/min
            timeliness_factors.append(freq_score)

        return np.mean(timeliness_factors) if timeliness_factors else 0.5

    def calculate_validity_score(self, data: pd.DataFrame) -> float:
        """妥当性スコアの計算"""
        validity_checks = []

        # 価格の範囲チェック
        if 'close' in data.columns:
            valid_prices = ((data['close'] > 0) & (data['close'] < 1e9)).sum()
            price_validity = valid_prices / len(data)
            validity_checks.append(price_validity)

        # ボリュームの範囲チェック
        if 'volume' in data.columns:
            valid_volumes = (data['volume'] >= 0).sum()
            volume_validity = valid_volumes / len(data)
            validity_checks.append(volume_validity)

        # タイムスタンプの妥当性
        if 'timestamp' in data.columns:
            try:
                # 未来の日付がないかチェック
                future_dates = (data['timestamp'] > datetime.now()).sum()
                # 極端に古い日付がないかチェック
                old_dates = (data['timestamp'] < datetime.now() - timedelta(days=365*5)).sum()
                timestamp_validity = 1 - (future_dates + old_dates) / len(data)
                validity_checks.append(timestamp_validity)
            except Exception:
                validity_checks.append(0)

        return np.mean(validity_checks) if validity_checks else 0

    def detect_time_gaps(self, timestamps: pd.Series) -> List[Tuple[datetime, datetime]]:
        """時系列データのギャップ検出"""
        gaps = []
        timestamps = pd.to_datetime(timestamps).sort_values()

        # 期待される間隔を推定
        intervals = timestamps.diff().dropna()
        expected_interval = intervals.mode()[0] if len(intervals) > 0 else pd.Timedelta(minutes=1)

        # ギャップの検出
        for i in range(1, len(timestamps)):
            actual_interval = timestamps.iloc[i] - timestamps.iloc[i-1]
            if actual_interval > expected_interval * 2:  # 期待値の2倍以上
                gaps.append((timestamps.iloc[i-1], timestamps.iloc[i]))

        return gaps

    def calculate_outlier_score(self, data: pd.DataFrame) -> float:
        """外れ値スコアの計算"""
        numeric_columns = data.select_dtypes(include=[np.number]).columns
        outlier_ratios = []

        for col in numeric_columns:
            if col in data.columns:
                # IQR法による外れ値検出
                Q1 = data[col].quantile(0.25)
                Q3 = data[col].quantile(0.75)
                IQR = Q3 - Q1

                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR

                outliers = ((data[col] < lower_bound) | (data[col] > upper_bound)).sum()
                outlier_ratio = outliers / len(data)
                outlier_ratios.append(outlier_ratio)

        avg_outlier_ratio = np.mean(outlier_ratios) if outlier_ratios else 0
        return 1 - avg_outlier_ratio

    def check_price_volume_correlation(self, data: pd.DataFrame) -> float:
        """価格とボリュームの相関チェック"""
        # 価格変動率とボリュームの相関
        price_changes = data['close'].pct_change().abs()
        volume_changes = data['volume'].pct_change().abs()

        # 相関係数の計算
        correlation = price_changes.corr(volume_changes)

        # 適度な正の相関が期待される(0.2-0.7)
        if 0.2 <= correlation <= 0.7:
            return 1.0
        elif correlation < 0:
            return 0.5  # 負の相関は異常
        else:
            return 0.7  # 相関が強すぎるか弱すぎる
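
calculate_quality_score above refers to identify_issues and generate_recommendations, which are not defined in the listing. The following is a minimal sketch of those helpers, assuming a fixed 0.7 threshold per dimension and generic, illustrative recommendation text.

    def identify_issues(self, scores: Dict[str, float]) -> List[str]:
        """List the dimensions whose score falls below an assumed 0.7 threshold."""
        return [dim for dim, score in scores.items() if score < 0.7]

    def generate_recommendations(self, scores: Dict[str, float]) -> List[str]:
        """Map each flagged dimension to a generic improvement hint (illustrative wording)."""
        hints = {
            'completeness': 'Review data collection for missing values and time gaps.',
            'accuracy': 'Investigate OHLC violations, price spikes and outliers.',
            'consistency': 'Check dtypes, timestamp ordering and price/volume relations.',
            'timeliness': 'Reduce ingestion latency or increase update frequency.',
            'validity': 'Tighten range and schema validation at ingestion.'
        }
        return [hints[dim] for dim in self.identify_issues(scores) if dim in hints]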

2.2 Real-Time Quality Monitoring

class RealTimeQualityMonitor:
    def __init__(self, alert_threshold=0.7):
        self.alert_threshold = alert_threshold
        self.quality_scorer = DataQualityScorer()
        self.monitoring_stats = {
            'total_checks': 0,
            'alerts_raised': 0,
            'quality_history': deque(maxlen=1000)
        }
        self.alert_callbacks = []
        self.logger = logging.getLogger(__name__)  # used by raise_quality_alert below

    async def monitor_data_stream(self, data_stream):
        """データストリームのリアルタイム品質監視"""
        async for data_batch in data_stream:
            # 品質スコアの計算
            quality_result = self.quality_scorer.calculate_quality_score(
                data_batch['data'],
                data_batch.get('metadata')
            )

            # 統計の更新
            self.monitoring_stats['total_checks'] += 1
            self.monitoring_stats['quality_history'].append({
                'timestamp': datetime.now(),
                'score': quality_result['total_score'],
                'data_source': data_batch.get('source', 'unknown')
            })

            # アラートチェック
            if quality_result['total_score'] < self.alert_threshold:
                await self.raise_quality_alert(quality_result, data_batch)

            # 定期的な統計レポート
            if self.monitoring_stats['total_checks'] % 100 == 0:
                await self.generate_quality_report()

    async def raise_quality_alert(self, quality_result, data_batch):
        """品質アラートの発生"""
        self.monitoring_stats['alerts_raised'] += 1

        alert = {
            'severity': self.determine_severity(quality_result['total_score']),
            'timestamp': datetime.now(),
            'quality_score': quality_result['total_score'],
            'issues': quality_result['issues_found'],
            'data_source': data_batch.get('source'),
            'recommendations': quality_result['recommendations']
        }

        # コールバック実行
        for callback in self.alert_callbacks:
            await callback(alert)

        # ログ記録
        self.logger.warning(f"Data quality alert: {alert}")

    def determine_severity(self, score):
        """アラートの深刻度判定"""
        if score < 0.3:
            return 'CRITICAL'
        elif score < 0.5:
            return 'HIGH'
        elif score < 0.7:
            return 'MEDIUM'
        else:
            return 'LOW'

    async def generate_quality_report(self):
        """品質レポートの生成"""
        recent_scores = [
            h['score'] for h in self.monitoring_stats['quality_history']
        ]

        if not recent_scores:
            return

        report = {
            'timestamp': datetime.now(),
            'period_stats': {
                'avg_quality_score': np.mean(recent_scores),
                'min_quality_score': np.min(recent_scores),
                'max_quality_score': np.max(recent_scores),
                'std_quality_score': np.std(recent_scores),
                'alert_rate': self.monitoring_stats['alerts_raised'] / 
                            self.monitoring_stats['total_checks']
            },
            'quality_trend': self.calculate_quality_trend(),
            'source_breakdown': self.analyze_by_source()
        }

        return report

    def calculate_quality_trend(self):
        """品質トレンドの計算"""
        history = list(self.monitoring_stats['quality_history'])
        if len(history) < 10:
            return 'insufficient_data'

        # 最近のスコアと過去のスコアの比較
        recent = [h['score'] for h in history[-50:]]
        past = [h['score'] for h in history[-100:-50]]

        if not past:
            return 'insufficient_data'  # not enough history to compare against

        recent_avg = np.mean(recent)
        past_avg = np.mean(past)

        if recent_avg > past_avg * 1.05:
            return 'improving'
        elif recent_avg < past_avg * 0.95:
            return 'degrading'
        else:
            return 'stable'
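
generate_quality_report also calls analyze_by_source, which does not appear in the listing. Below is a minimal sketch, assuming it simply averages recent quality scores per data_source; the grouping logic is an assumption, not part of the original design.

    def analyze_by_source(self):
        """Average recent quality scores per data source (illustrative sketch)."""
        per_source = defaultdict(list)
        for entry in self.monitoring_stats['quality_history']:
            per_source[entry['data_source']].append(entry['score'])
        return {source: float(np.mean(scores)) for source, scores in per_source.items()}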

3. ML Model Evaluation System

3.1 Model Performance Metrics

class MLModelEvaluator:
    def __init__(self):
        self.evaluation_metrics = {
            'regression': [
                'mse', 'rmse', 'mae', 'mape', 'r2',
                'directional_accuracy', 'profit_factor'
            ],
            'classification': [
                'accuracy', 'precision', 'recall', 'f1',
                'auc_roc', 'log_loss', 'confusion_matrix'
            ],
            'time_series': [
                'mase', 'smape', 'msis', 'coverage',
                'sharpness', 'calibration'
            ]
        }

    def evaluate_price_prediction_model(self, y_true, y_pred, prices):
        """価格予測モデルの総合評価"""
        evaluation_results = {}

        # 基本的な回帰指標
        evaluation_results['regression_metrics'] = self.calculate_regression_metrics(
            y_true, y_pred
        )

        # 方向性の精度
        evaluation_results['directional_metrics'] = self.calculate_directional_metrics(
            y_true, y_pred
        )

        # 取引シミュレーション
        evaluation_results['trading_metrics'] = self.calculate_trading_metrics(
            y_true, y_pred, prices
        )

        # 予測の信頼区間
        evaluation_results['prediction_intervals'] = self.calculate_prediction_intervals(
            y_true, y_pred
        )

        # リスク調整後指標
        evaluation_results['risk_adjusted_metrics'] = self.calculate_risk_adjusted_metrics(
            y_true, y_pred, prices
        )

        # 総合スコア
        evaluation_results['overall_score'] = self.calculate_overall_score(
            evaluation_results
        )

        return evaluation_results

    def calculate_regression_metrics(self, y_true, y_pred):
        """回帰指標の計算"""
        metrics = {}

        # MSE (Mean Squared Error)
        metrics['mse'] = np.mean((y_true - y_pred) ** 2)

        # RMSE (Root Mean Squared Error)
        metrics['rmse'] = np.sqrt(metrics['mse'])

        # MAE (Mean Absolute Error)
        metrics['mae'] = np.mean(np.abs(y_true - y_pred))

        # MAPE (Mean Absolute Percentage Error)
        # ゼロ除算を避ける
        mask = y_true != 0
        if mask.any():
            metrics['mape'] = np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100
        else:
            metrics['mape'] = np.inf

        # R² (決定係数)
        ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
        ss_res = np.sum((y_true - y_pred) ** 2)
        metrics['r2'] = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0

        # Symmetric MAPE
        denominator = (np.abs(y_true) + np.abs(y_pred)) / 2
        mask = denominator != 0
        if mask.any():
            metrics['smape'] = np.mean(
                np.abs(y_true[mask] - y_pred[mask]) / denominator[mask]
            ) * 100
        else:
            metrics['smape'] = 0

        return metrics

    def calculate_directional_metrics(self, y_true, y_pred):
        """方向性精度の計算"""
        metrics = {}

        # 実際の方向(上昇/下降)
        true_direction = np.sign(np.diff(y_true))
        pred_direction = np.sign(np.diff(y_pred))

        # 方向性精度
        correct_direction = (true_direction == pred_direction)
        metrics['directional_accuracy'] = np.mean(correct_direction) * 100

        # 方向別の精度
        up_mask = true_direction > 0
        down_mask = true_direction < 0

        if up_mask.any():
            metrics['upward_accuracy'] = np.mean(correct_direction[up_mask]) * 100
        else:
            metrics['upward_accuracy'] = None

        if down_mask.any():
            metrics['downward_accuracy'] = np.mean(correct_direction[down_mask]) * 100
        else:
            metrics['downward_accuracy'] = None

        # 方向の一致度(相関)
        metrics['direction_correlation'] = np.corrcoef(true_direction, pred_direction)[0, 1]

        return metrics

    def calculate_trading_metrics(self, y_true, y_pred, prices):
        """取引シミュレーションに基づく指標"""
        metrics = {}

        # 予測に基づく取引シグナル
        signals = np.where(y_pred > prices, 1, -1)  # 買い: 1, 売り: -1

        # リターンの計算
        actual_returns = np.diff(y_true) / y_true[:-1]
        strategy_returns = signals[:-1] * actual_returns

        # 累積リターン
        metrics['cumulative_return'] = (1 + strategy_returns).prod() - 1

        # シャープレシオ
        if strategy_returns.std() != 0:
            metrics['sharpe_ratio'] = np.sqrt(252) * strategy_returns.mean() / strategy_returns.std()
        else:
            metrics['sharpe_ratio'] = 0

        # 最大ドローダウン
        cumulative = (1 + strategy_returns).cumprod()
        running_max = np.maximum.accumulate(cumulative)
        drawdown = (cumulative - running_max) / running_max
        metrics['max_drawdown'] = drawdown.min() * 100

        # 勝率
        winning_trades = strategy_returns > 0
        metrics['win_rate'] = np.mean(winning_trades) * 100

        # プロフィットファクター
        gross_profit = strategy_returns[strategy_returns > 0].sum()
        gross_loss = abs(strategy_returns[strategy_returns < 0].sum())
        metrics['profit_factor'] = gross_profit / gross_loss if gross_loss != 0 else np.inf

        return metrics

    def calculate_prediction_intervals(self, y_true, y_pred, confidence=0.95):
        """予測区間の計算"""
        residuals = y_true - y_pred

        # 残差の標準偏差
        std_residuals = np.std(residuals)

        # Prediction interval assuming normally distributed residuals;
        # derive the z value from the requested confidence level instead of hardcoding 1.96
        from scipy.stats import norm
        z_score = norm.ppf(0.5 + confidence / 2)

        intervals = {
            'lower_bound': y_pred - z_score * std_residuals,
            'upper_bound': y_pred + z_score * std_residuals,
            'coverage': np.mean(
                (y_true >= y_pred - z_score * std_residuals) & 
                (y_true <= y_pred + z_score * std_residuals)
            ) * 100,
            'interval_width': 2 * z_score * std_residuals
        }

        return intervals

    def calculate_risk_adjusted_metrics(self, y_true, y_pred, prices):
        """リスク調整後の評価指標"""
        metrics = {}

        # 予測誤差
        errors = y_true - y_pred

        # 下方リスク(ダウンサイドリスク)
        downside_errors = errors[errors < 0]
        if len(downside_errors) > 0:
            metrics['downside_deviation'] = np.std(downside_errors)

            # ソルティノレシオ
            returns = np.diff(y_true) / y_true[:-1]
            mean_return = returns.mean()
            metrics['sortino_ratio'] = np.sqrt(252) * mean_return / metrics['downside_deviation']
        else:
            metrics['downside_deviation'] = 0
            metrics['sortino_ratio'] = np.inf

        # VaR (Value at Risk)
        metrics['var_95'] = np.percentile(errors, 5)

        # CVaR (Conditional Value at Risk)
        var_threshold = metrics['var_95']
        tail_errors = errors[errors <= var_threshold]
        metrics['cvar_95'] = tail_errors.mean() if len(tail_errors) > 0 else var_threshold

        # カルマーレシオ
        cumulative_return = ((1 + np.diff(y_true) / y_true[:-1]).prod() - 1)
        max_dd = self.calculate_max_drawdown(y_true)
        metrics['calmar_ratio'] = cumulative_return / abs(max_dd) if max_dd != 0 else 0

        return metrics
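
evaluate_price_prediction_model and calculate_risk_adjusted_metrics rely on calculate_overall_score and calculate_max_drawdown, which are not shown above. The sketches below are one possible implementation; the blend of directional accuracy and a clipped Sharpe ratio, and its weights, are assumptions.

    def calculate_max_drawdown(self, prices):
        """Maximum peak-to-trough drawdown of a price series, as a negative fraction."""
        prices = np.asarray(prices, dtype=float)
        running_max = np.maximum.accumulate(prices)
        drawdown = (prices - running_max) / running_max
        return drawdown.min()

    def calculate_overall_score(self, evaluation_results):
        """Blend directional accuracy and a clipped Sharpe ratio into a 0-1 score (illustrative weights)."""
        directional = evaluation_results['directional_metrics']['directional_accuracy'] / 100
        sharpe = evaluation_results['trading_metrics']['sharpe_ratio']
        sharpe_component = min(max(sharpe, 0), 3) / 3  # clip to [0, 3], then normalise
        return 0.6 * directional + 0.4 * sharpe_component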

3.2 Model Drift Detection

class ModelDriftDetector:
    def __init__(self, baseline_performance, sensitivity=0.1):
        self.baseline_performance = baseline_performance
        self.sensitivity = sensitivity
        self.performance_history = deque(maxlen=100)
        self.drift_alerts = []

    def detect_drift(self, current_performance, feature_distributions=None):
        """モデルドリフトの検出"""
        drift_indicators = {}

        # パフォーマンスドリフト
        perf_drift = self.detect_performance_drift(current_performance)
        drift_indicators['performance_drift'] = perf_drift

        # 予測分布のドリフト
        if 'prediction_distribution' in current_performance:
            pred_drift = self.detect_prediction_drift(
                current_performance['prediction_distribution']
            )
            drift_indicators['prediction_drift'] = pred_drift

        # 特徴量分布のドリフト
        if feature_distributions:
            feature_drift = self.detect_feature_drift(feature_distributions)
            drift_indicators['feature_drift'] = feature_drift

        # 総合的なドリフト判定
        overall_drift = self.assess_overall_drift(drift_indicators)

        # アラート生成
        if overall_drift['is_drifting']:
            self.generate_drift_alert(overall_drift)

        return overall_drift

    def detect_performance_drift(self, current_performance):
        """パフォーマンスメトリクスのドリフト検出"""
        drift_results = {}

        for metric, baseline_value in self.baseline_performance.items():
            if metric in current_performance:
                current_value = current_performance[metric]

                # 相対的な変化
                if baseline_value != 0:
                    relative_change = abs(current_value - baseline_value) / abs(baseline_value)
                else:
                    relative_change = abs(current_value - baseline_value)

                # ドリフト判定
                is_drifting = relative_change > self.sensitivity

                drift_results[metric] = {
                    'baseline': baseline_value,
                    'current': current_value,
                    'relative_change': relative_change,
                    'is_drifting': is_drifting
                }

        return drift_results

    def detect_prediction_drift(self, current_predictions):
        """予測分布のドリフト検出(KSテスト)"""
        from scipy import stats

        if not hasattr(self, 'baseline_predictions'):
            self.baseline_predictions = current_predictions
            return {'is_drifting': False, 'message': 'Baseline set'}

        # Kolmogorov-Smirnov検定
        ks_statistic, p_value = stats.ks_2samp(
            self.baseline_predictions,
            current_predictions
        )

        is_drifting = p_value < 0.05  # 5%有意水準

        return {
            'is_drifting': is_drifting,
            'ks_statistic': ks_statistic,
            'p_value': p_value,
            'interpretation': 'Significant drift detected' if is_drifting else 'No significant drift'
        }

    def detect_feature_drift(self, feature_distributions):
        """特徴量分布のドリフト検出"""
        drift_results = {}

        for feature_name, current_dist in feature_distributions.items():
            if hasattr(self, f'baseline_dist_{feature_name}'):
                baseline_dist = getattr(self, f'baseline_dist_{feature_name}')

                # PSI (Population Stability Index) の計算
                psi = self.calculate_psi(baseline_dist, current_dist)

                # ドリフト判定(PSI > 0.2 で大きなドリフト)
                drift_level = self.interpret_psi(psi)

                drift_results[feature_name] = {
                    'psi': psi,
                    'drift_level': drift_level,
                    'is_drifting': psi > 0.1
                }
            else:
                # ベースラインの設定
                setattr(self, f'baseline_dist_{feature_name}', current_dist)
                drift_results[feature_name] = {
                    'psi': 0,
                    'drift_level': 'baseline_set',
                    'is_drifting': False
                }

        return drift_results

    def calculate_psi(self, baseline_dist, current_dist, buckets=10):
        """Population Stability Index の計算"""
        # ヒストグラムの作成
        min_val = min(baseline_dist.min(), current_dist.min())
        max_val = max(baseline_dist.max(), current_dist.max())
        bins = np.linspace(min_val, max_val, buckets + 1)

        # 各ビンの割合を計算
        baseline_percents = np.histogram(baseline_dist, bins=bins)[0] / len(baseline_dist)
        current_percents = np.histogram(current_dist, bins=bins)[0] / len(current_dist)

        # 0を避けるための小さな値を追加
        baseline_percents = np.maximum(baseline_percents, 0.0001)
        current_percents = np.maximum(current_percents, 0.0001)

        # PSI計算
        psi = np.sum(
            (current_percents - baseline_percents) * 
            np.log(current_percents / baseline_percents)
        )

        return psi

    def interpret_psi(self, psi):
        """PSI値の解釈"""
        if psi < 0.1:
            return 'no_drift'
        elif psi < 0.2:
            return 'slight_drift'
        else:
            return 'significant_drift'
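
detect_drift calls assess_overall_drift and generate_drift_alert, which are not defined above. A minimal sketch follows; the rule that any single drifting indicator marks the model as drifting is an assumption.

    def assess_overall_drift(self, drift_indicators):
        """Treat the model as drifting if any indicator reports drift (illustrative rule)."""
        def any_drifting(result):
            if isinstance(result, dict):
                if result.get('is_drifting'):
                    return True
                return any(any_drifting(v) for v in result.values() if isinstance(v, dict))
            return False

        is_drifting = any(any_drifting(v) for v in drift_indicators.values())
        return {'is_drifting': is_drifting, 'indicators': drift_indicators}

    def generate_drift_alert(self, overall_drift):
        """Record a drift alert with a timestamp for later inspection."""
        self.drift_alerts.append({
            'timestamp': datetime.now(),
            'details': overall_drift
        })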

4. Data Quality Improvement System

4.1 Automated Data Cleaning

class AutoDataCleaner:
    def __init__(self):
        self.cleaning_strategies = {
            'missing_values': self.handle_missing_values,
            'outliers': self.handle_outliers,
            'duplicates': self.handle_duplicates,
            'inconsistencies': self.handle_inconsistencies
        }
        self.cleaning_log = []

    def clean_crypto_data(self, data: pd.DataFrame, config: Dict = None) -> pd.DataFrame:
        """暗号通貨データの自動クリーニング"""
        config = config or self.get_default_config()
        cleaned_data = data.copy()

        # クリーニング前の品質スコア
        quality_scorer = DataQualityScorer()
        before_score = quality_scorer.calculate_quality_score(cleaned_data)

        # 各クリーニング戦略の適用
        for issue_type, strategy in self.cleaning_strategies.items():
            if config.get(f'clean_{issue_type}', True):
                cleaned_data = strategy(cleaned_data, config)

        # クリーニング後の品質スコア
        after_score = quality_scorer.calculate_quality_score(cleaned_data)

        # クリーニング結果の記録
        self.log_cleaning_results(before_score, after_score, len(data), len(cleaned_data))

        return cleaned_data

    def handle_missing_values(self, data: pd.DataFrame, config: Dict) -> pd.DataFrame:
        """欠損値の処理"""
        strategy = config.get('missing_value_strategy', 'interpolate')

        if strategy == 'interpolate':
            # Time-based interpolation requires a DatetimeIndex; fall back to linear otherwise
            numeric_columns = data.select_dtypes(include=[np.number]).columns
            interp_method = 'time' if isinstance(data.index, pd.DatetimeIndex) else 'linear'
            data[numeric_columns] = data[numeric_columns].interpolate(method=interp_method)

        elif strategy == 'forward_fill':
            # Forward fill (fillna(method='ffill') is deprecated in recent pandas)
            data = data.ffill()

        elif strategy == 'rolling_mean':
            # 移動平均で補完
            window = config.get('rolling_window', 5)
            numeric_columns = data.select_dtypes(include=[np.number]).columns
            for col in numeric_columns:
                data[col] = data[col].fillna(data[col].rolling(window, min_periods=1).mean())

        return data

    def handle_outliers(self, data: pd.DataFrame, config: Dict) -> pd.DataFrame:
        """外れ値の処理"""
        method = config.get('outlier_method', 'iqr')
        threshold = config.get('outlier_threshold', 3)

        numeric_columns = ['open', 'high', 'low', 'close', 'volume']
        numeric_columns = [col for col in numeric_columns if col in data.columns]

        for col in numeric_columns:
            if method == 'iqr':
                # IQR法
                Q1 = data[col].quantile(0.25)
                Q3 = data[col].quantile(0.75)
                IQR = Q3 - Q1

                lower_bound = Q1 - threshold * IQR
                upper_bound = Q3 + threshold * IQR

                # 外れ値をクリップ
                data[col] = data[col].clip(lower_bound, upper_bound)

            elif method == 'zscore':
                # Zスコア法
                mean = data[col].mean()
                std = data[col].std()

                z_scores = np.abs((data[col] - mean) / std)
                data.loc[z_scores > threshold, col] = mean

        return data

    def handle_duplicates(self, data: pd.DataFrame, config: Dict) -> pd.DataFrame:
        """重複データの処理"""
        # タイムスタンプの重複を削除
        if 'timestamp' in data.columns:
            data = data.drop_duplicates(subset=['timestamp'], keep='last')

        # 完全に同一の行を削除
        data = data.drop_duplicates()

        return data

    def handle_inconsistencies(self, data: pd.DataFrame, config: Dict) -> pd.DataFrame:
        """データの不整合を修正"""
        # OHLC関係の修正
        if all(col in data.columns for col in ['open', 'high', 'low', 'close']):
            # highが最大値になるよう修正
            data['high'] = data[['open', 'high', 'low', 'close']].max(axis=1)

            # lowが最小値になるよう修正
            data['low'] = data[['open', 'high', 'low', 'close']].min(axis=1)

        # 負のボリュームを0に修正
        if 'volume' in data.columns:
            data.loc[data['volume'] < 0, 'volume'] = 0

        # タイムスタンプの順序を確保
        if 'timestamp' in data.columns:
            data = data.sort_values('timestamp')

        return data
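
clean_crypto_data refers to get_default_config and log_cleaning_results, which are omitted above. The sketches below mirror the option names used by the cleaning strategies; the exact default values are assumptions.

    def get_default_config(self) -> Dict:
        """Default cleaning options matching the strategy parameters above (assumed values)."""
        return {
            'clean_missing_values': True,
            'clean_outliers': True,
            'clean_duplicates': True,
            'clean_inconsistencies': True,
            'missing_value_strategy': 'interpolate',
            'outlier_method': 'iqr',
            'outlier_threshold': 3,
            'rolling_window': 5
        }

    def log_cleaning_results(self, before_score, after_score, rows_before, rows_after):
        """Append a summary of one cleaning pass to the in-memory log."""
        self.cleaning_log.append({
            'timestamp': datetime.now(),
            'score_before': before_score['total_score'],
            'score_after': after_score['total_score'],
            'rows_before': rows_before,
            'rows_after': rows_after
        })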

4.2 Quality Metrics Dashboard

class QualityMetricsDashboard:
    def __init__(self):
        self.metrics_history = defaultdict(list)
        self.alert_thresholds = {
            'total_score': 0.7,
            'completeness': 0.8,
            'accuracy': 0.75,
            'consistency': 0.8,
            'timeliness': 0.6
        }

    def update_metrics(self, source: str, quality_result: Dict):
        """メトリクスの更新"""
        timestamp = quality_result['timestamp']

        # 履歴に追加
        self.metrics_history[source].append({
            'timestamp': timestamp,
            'total_score': quality_result['total_score'],
            'dimensions': quality_result['dimension_scores']
        })

        # 閾値チェック
        alerts = self.check_thresholds(source, quality_result)

        return alerts

    def generate_summary_report(self, time_window: timedelta = timedelta(hours=24)):
        """サマリーレポートの生成"""
        cutoff_time = datetime.now() - time_window
        report = {}

        for source, history in self.metrics_history.items():
            # 指定期間内のデータをフィルタ
            recent_data = [
                h for h in history 
                if h['timestamp'] > cutoff_time
            ]

            if not recent_data:
                continue

            # 統計情報の計算
            scores = [h['total_score'] for h in recent_data]
            dimension_scores = defaultdict(list)

            for h in recent_data:
                for dim, score in h['dimensions'].items():
                    dimension_scores[dim].append(score)

            report[source] = {
                'avg_total_score': np.mean(scores),
                'min_total_score': np.min(scores),
                'max_total_score': np.max(scores),
                'score_volatility': np.std(scores),
                'sample_count': len(recent_data),
                'dimension_averages': {
                    dim: np.mean(scores) 
                    for dim, scores in dimension_scores.items()
                },
                'quality_trend': self.calculate_trend(scores)
            }

        return report

    def calculate_trend(self, scores: List[float]) -> str:
        """品質トレンドの計算"""
        if len(scores) < 10:
            return 'insufficient_data'

        # 線形回帰でトレンドを計算
        x = np.arange(len(scores))
        slope, _ = np.polyfit(x, scores, 1)

        if slope > 0.001:
            return 'improving'
        elif slope < -0.001:
            return 'degrading'
        else:
            return 'stable'

    def visualize_quality_metrics(self, source: str, save_path: str = None):
        """品質メトリクスの可視化"""
        import matplotlib.pyplot as plt
        import seaborn as sns

        if source not in self.metrics_history:
            return

        history = self.metrics_history[source]

        # データフレームに変換
        df_data = []
        for h in history:
            row = {'timestamp': h['timestamp'], 'total_score': h['total_score']}
            row.update(h['dimensions'])
            df_data.append(row)

        df = pd.DataFrame(df_data)

        # 図の作成
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # 1. 総合スコアの時系列
        ax = axes[0, 0]
        df.plot(x='timestamp', y='total_score', ax=ax, legend=False)
        ax.axhline(y=self.alert_thresholds['total_score'], color='r', linestyle='--', label='Alert Threshold')
        ax.set_title(f'Total Quality Score - {source}')
        ax.set_ylabel('Score')
        ax.legend()

        # 2. 各次元のスコア
        ax = axes[0, 1]
        dimensions = ['completeness', 'accuracy', 'consistency', 'timeliness', 'validity']
        dimension_cols = [col for col in dimensions if col in df.columns]

        if dimension_cols:
            df[dimension_cols].plot(ax=ax)
            ax.set_title('Quality Dimensions Over Time')
            ax.set_ylabel('Score')
            ax.legend()

        # 3. スコア分布
        ax = axes[1, 0]
        df['total_score'].hist(bins=30, ax=ax)
        ax.set_title('Quality Score Distribution')
        ax.set_xlabel('Total Score')
        ax.set_ylabel('Frequency')

        # 4. 品質ヒートマップ
        ax = axes[1, 1]
        if len(df) > 0 and dimension_cols:
            # 時間をビンに分割
            df['time_bin'] = pd.cut(df.index, bins=min(20, len(df)))
            heatmap_data = df.groupby('time_bin')[dimension_cols].mean()

            sns.heatmap(heatmap_data.T, cmap='RdYlGn', vmin=0, vmax=1, ax=ax)
            ax.set_title('Quality Dimensions Heatmap')
            ax.set_xlabel('Time Period')

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path)
        else:
            plt.show()

        plt.close()
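
update_metrics calls check_thresholds, which is not shown in the listing. A minimal sketch follows, assuming it compares the total score and each dimension against alert_thresholds and returns one alert dict per breach.

    def check_thresholds(self, source: str, quality_result: Dict) -> List[Dict]:
        """Return one alert per metric that falls below its configured threshold."""
        alerts = []
        values = {'total_score': quality_result['total_score']}
        values.update(quality_result['dimension_scores'])
        for metric, threshold in self.alert_thresholds.items():
            if metric in values and values[metric] < threshold:
                alerts.append({
                    'source': source,
                    'metric': metric,
                    'value': values[metric],
                    'threshold': threshold,
                    'timestamp': quality_result['timestamp']
                })
        return alerts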

5. Integrated Quality Management System

5.1 End-to-End Quality Management

class IntegratedQualityManagementSystem:
    def __init__(self):
        self.quality_scorer = DataQualityScorer()
        self.quality_monitor = RealTimeQualityMonitor()
        self.model_evaluator = MLModelEvaluator()
        self.drift_detector = ModelDriftDetector(baseline_performance={})
        self.data_cleaner = AutoDataCleaner()
        self.dashboard = QualityMetricsDashboard()

    async def process_data_pipeline(self, raw_data: pd.DataFrame, 
                                  source: str, 
                                  model_predictions: np.ndarray = None):
        """データパイプライン全体の品質管理"""
        pipeline_results = {
            'timestamp': datetime.now(),
            'source': source,
            'stages': {}
        }

        # 1. 入力データの品質評価
        input_quality = self.quality_scorer.calculate_quality_score(raw_data)
        pipeline_results['stages']['input_quality'] = input_quality

        # 2. データクリーニング
        if input_quality['total_score'] < 0.8:
            cleaned_data = self.data_cleaner.clean_crypto_data(raw_data)
            post_cleaning_quality = self.quality_scorer.calculate_quality_score(cleaned_data)
            pipeline_results['stages']['cleaning'] = {
                'applied': True,
                'quality_improvement': post_cleaning_quality['total_score'] - input_quality['total_score']
            }
        else:
            cleaned_data = raw_data
            pipeline_results['stages']['cleaning'] = {'applied': False}

        # 3. モデル予測の評価(該当する場合)
        if model_predictions is not None:
            model_performance = self.model_evaluator.evaluate_price_prediction_model(
                cleaned_data['close'].values[1:],  # 実際の値
                model_predictions,  # 予測値
                cleaned_data['close'].values[:-1]  # 価格
            )
            pipeline_results['stages']['model_evaluation'] = model_performance

            # 4. ドリフト検出
            drift_result = self.drift_detector.detect_drift(
                model_performance['regression_metrics']
            )
            pipeline_results['stages']['drift_detection'] = drift_result

        # 5. ダッシュボード更新
        alerts = self.dashboard.update_metrics(source, input_quality)
        pipeline_results['alerts'] = alerts

        # 6. 品質改善の推奨事項
        recommendations = self.generate_recommendations(pipeline_results)
        pipeline_results['recommendations'] = recommendations

        return pipeline_results

    def generate_recommendations(self, pipeline_results: Dict) -> List[str]:
        """品質改善の推奨事項生成"""
        recommendations = []

        # 入力品質に基づく推奨
        input_quality = pipeline_results['stages']['input_quality']

        if input_quality['total_score'] < 0.7:
            recommendations.append("Critical: データ品質が基準を下回っています。データソースの確認が必要です。")

        # 各次元の推奨
        for dimension, score in input_quality['dimension_scores'].items():
            if score < 0.7:
                if dimension == 'completeness':
                    recommendations.append("欠損データが多いです。データ収集プロセスを見直してください。")
                elif dimension == 'accuracy':
                    recommendations.append("データの正確性に問題があります。外れ値処理を強化してください。")
                elif dimension == 'consistency':
                    recommendations.append("データの一貫性が低いです。検証ルールを追加してください。")
                elif dimension == 'timeliness':
                    recommendations.append("データの遅延が発生しています。リアルタイム性を改善してください。")

        # モデル関連の推奨
        if 'model_evaluation' in pipeline_results['stages']:
            model_eval = pipeline_results['stages']['model_evaluation']
            if model_eval['overall_score'] < 0.6:
                recommendations.append("モデルの性能が低下しています。再訓練を検討してください。")

        # ドリフト検出に基づく推奨
        if 'drift_detection' in pipeline_results['stages']:
            drift = pipeline_results['stages']['drift_detection']
            if drift.get('is_drifting', False):
                recommendations.append("モデルドリフトが検出されました。モデルの更新が必要です。")

        return recommendations
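
The sketch below shows one way the integrated pipeline might be driven end to end. The synthetic OHLCV frame, the source name 'demo_exchange', and the function name run_quality_pipeline_demo are illustrative assumptions.

import asyncio

async def run_quality_pipeline_demo():
    # Build a small synthetic OHLCV frame purely for illustration
    index = pd.date_range('2024-01-01', periods=200, freq='1min')
    close = 40000 + np.cumsum(np.random.randn(200) * 10)
    demo_data = pd.DataFrame({
        'timestamp': index,
        'open': close + np.random.randn(200),
        'high': close + np.abs(np.random.randn(200)) * 5,
        'low': close - np.abs(np.random.randn(200)) * 5,
        'close': close,
        'volume': np.random.rand(200) * 100
    })

    system = IntegratedQualityManagementSystem()
    results = await system.process_data_pipeline(demo_data, source='demo_exchange')
    print(results['stages']['input_quality']['total_score'])
    print(results['recommendations'])

# asyncio.run(run_quality_pipeline_demo())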

5.2 Quality Assurance Best Practices

class QualityAssuranceBestPractices:
    """品質保証のベストプラクティス実装"""

    @staticmethod
    def implement_data_validation_rules():
        """データ検証ルールの実装"""
        validation_rules = {
            'price_rules': [
                lambda x: x > 0,  # 正の価格
                lambda x: x < 1e9,  # 現実的な上限
                lambda x: not np.isnan(x)  # NaNチェック
            ],
            'volume_rules': [
                lambda x: x >= 0,  # 非負のボリューム
                lambda x: not np.isinf(x)  # 無限大チェック
            ],
            'timestamp_rules': [
                lambda x: x <= datetime.now(),  # 未来の日付でない
                lambda x: x > datetime(2009, 1, 3)  # ビットコイン開始後
            ]
        }
        return validation_rules

    @staticmethod
    def setup_monitoring_alerts():
        """監視アラートの設定"""
        alert_config = {
            'quality_alerts': {
                'critical': 0.5,  # 品質スコア < 0.5
                'warning': 0.7,   # 品質スコア < 0.7
                'info': 0.85      # 品質スコア < 0.85
            },
            'performance_alerts': {
                'accuracy_drop': 0.1,  # 10%以上の精度低下
                'latency_spike': 1000,  # 1秒以上の遅延
                'error_rate': 0.05     # 5%以上のエラー率
            },
            'data_alerts': {
                'missing_data': 0.1,    # 10%以上の欠損
                'outlier_ratio': 0.05,  # 5%以上の外れ値
                'duplicate_ratio': 0.01 # 1%以上の重複
            }
        }
        return alert_config

    @staticmethod
    def implement_continuous_improvement():
        """継続的改善プロセス"""
        improvement_cycle = {
            'measure': 'データ品質とモデル性能の定期測定',
            'analyze': '問題の根本原因分析',
            'improve': '改善策の実装',
            'control': '改善効果のモニタリング',
            'document': 'ベストプラクティスの文書化'
        }
        return improvement_cycle
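
The validation rules above are plain predicates; the snippet below sketches one way to apply them column by column to a DataFrame. The mapping of rule groups to column names and the helper name apply_validation_rules are assumptions.

def apply_validation_rules(data: pd.DataFrame) -> Dict[str, float]:
    """Return, per rule group, the fraction of rows that pass every rule (illustrative)."""
    rules = QualityAssuranceBestPractices.implement_data_validation_rules()
    column_map = {'price_rules': 'close', 'volume_rules': 'volume', 'timestamp_rules': 'timestamp'}
    pass_rates = {}
    for group, column in column_map.items():
        if column not in data.columns:
            continue
        checks = rules[group]
        passed = data[column].apply(lambda value: all(rule(value) for rule in checks))
        pass_rates[group] = float(passed.mean())
    return pass_rates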

Summary

Building a data quality scoring and ML model evaluation system hinges on the following:

  1. Multi-dimensional quality assessment: combined evaluation of completeness, accuracy, consistency, timeliness, and validity
  2. Real-time monitoring: continuous quality checks and alerting
  3. Automated data cleaning: automatic detection and correction of quality issues
  4. Comprehensive model evaluation: assessing trading performance as well as prediction accuracy
  5. Drift detection: monitoring how data and models change over time
  6. An integrated approach: managing data quality and model quality together

Combining these elements makes it possible to build a reliable cryptocurrency prediction system.