Machine Learning Cross-Validation and Overfitting Countermeasures

Overview

In machine learning for cryptocurrency trading, conventional cross-validation techniques fall short because of the particular properties of time-series data. This guide walks through cross-validation methods tailored to financial time series, together with countermeasures against overfitting.
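
To see the core problem concretely, the short sketch below (a minimal illustration, assuming scikit-learn is available) contrasts a shuffled KFold with TimeSeriesSplit: the shuffled variant routinely places observations from after the test window into the training set, which for financial data amounts to training on the future.

import numpy as np
from sklearn.model_selection import KFold, TimeSeriesSplit

X = np.arange(12).reshape(-1, 1)

for name, cv in [('KFold (shuffled)', KFold(n_splits=3, shuffle=True, random_state=0)),
                 ('TimeSeriesSplit', TimeSeriesSplit(n_splits=3))]:
    print(name)
    for train_idx, test_idx in cv.split(X):
        # Future leakage occurs when any training index lies beyond the start of the test window
        print(f'  train max={train_idx.max()}, test min={test_idx.min()}, '
              f'leaks future data: {train_idx.max() > test_idx.min()}')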

Special Characteristics of Time-Series Data

1. Characteristics of Financial Time Series

import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error

class FinancialTimeSeriesCharacteristics:
    """Analyze the characteristics of financial time-series data."""

    def __init__(self, data):
        self.data = data

    def analyze_stationarity(self, column='price'):
        """Test the series for stationarity."""
        from statsmodels.tsa.stattools import adfuller

        # Augmented Dickey-Fuller test
        adf_result = adfuller(self.data[column].dropna())

        analysis = {
            'adf_statistic': adf_result[0],
            'p_value': adf_result[1],
            'critical_values': adf_result[4],
            'is_stationary': adf_result[1] < 0.05
        }

        # Re-test on the first-differenced series
        diff_series = self.data[column].diff().dropna()
        adf_diff = adfuller(diff_series)

        analysis['diff_stationary'] = {
            'adf_statistic': adf_diff[0],
            'p_value': adf_diff[1],
            'is_stationary': adf_diff[1] < 0.05
        }

        return analysis

    def detect_regime_changes(self, column='price', window=100):
        """Detect market regime changes."""

        # Rolling statistics
        rolling_mean = self.data[column].rolling(window).mean()
        rolling_std = self.data[column].rolling(window).std()

        # Flag points where the mean or the volatility shifts abruptly
        mean_changes = np.abs(rolling_mean.diff()) > rolling_std * 2
        std_changes = rolling_std.diff() > rolling_std.shift(1) * 0.5

        regime_changes = mean_changes | std_changes

        return {
            'change_points': self.data.index[regime_changes].tolist(),
            'rolling_mean': rolling_mean,
            'rolling_std': rolling_std
        }

    def calculate_autocorrelation(self, column='returns', max_lags=50):
        """Compute autocorrelations."""
        from statsmodels.tsa.stattools import acf, pacf

        clean_data = self.data[column].dropna()

        # Autocorrelation function
        autocorr = acf(clean_data, nlags=max_lags, fft=True)

        # Partial autocorrelation function
        partial_autocorr = pacf(clean_data, nlags=max_lags)

        # Lags whose correlation exceeds the approximate 95% confidence band
        significant_lags = np.where(np.abs(autocorr[1:]) > 2/np.sqrt(len(clean_data)))[0] + 1

        return {
            'autocorrelation': autocorr,
            'partial_autocorrelation': partial_autocorr,
            'significant_lags': significant_lags.tolist()
        }
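
A hypothetical usage sketch on a synthetic random-walk price series follows. The column names 'price' and 'returns' match the class defaults, and statsmodels must be installed for the ADF and ACF calls.

rng = np.random.default_rng(42)
prices = 100 * np.exp(np.cumsum(rng.normal(0, 0.01, 2000)))
df = pd.DataFrame({'price': prices})
df['returns'] = df['price'].pct_change()

characteristics = FinancialTimeSeriesCharacteristics(df)
stationarity = characteristics.analyze_stationarity('price')
print(stationarity['is_stationary'])                     # a random walk is typically non-stationary
print(stationarity['diff_stationary']['is_stationary'])  # its first difference typically is
print(characteristics.calculate_autocorrelation('returns')['significant_lags'])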

Time-Series-Specific Cross-Validation

1. Forward Chaining (Walk-Forward Validation)

class TimeSeriesForwardChaining:
    """Walk-forward (forward chaining) validation for time series."""

    def __init__(self, initial_train_size=1000, step_size=100, max_train_size=5000):
        self.initial_train_size = initial_train_size
        self.step_size = step_size
        self.max_train_size = max_train_size

    def split(self, X, y=None):
        """Generate walk-forward splits (a trailing partial window is dropped)."""

        n_samples = len(X)
        splits = []

        current_train_end = self.initial_train_size

        while current_train_end + self.step_size < n_samples:
            # Training indices: a trailing window capped at max_train_size
            train_start = max(0, current_train_end - self.max_train_size)
            train_indices = np.arange(train_start, current_train_end)

            # Test indices: the next step_size observations
            test_start = current_train_end
            test_end = min(current_train_end + self.step_size, n_samples)
            test_indices = np.arange(test_start, test_end)

            splits.append((train_indices, test_indices))

            current_train_end += self.step_size

        return splits

    def validate_model(self, model, X, y, scoring_funcs=None):
        """Validate a model by walk-forward analysis."""

        if scoring_funcs is None:
            scoring_funcs = {
                'mse': mean_squared_error,
                'mae': mean_absolute_error
            }

        splits = self.split(X, y)
        results = {metric: [] for metric in scoring_funcs.keys()}
        results['train_sizes'] = []
        results['test_sizes'] = []

        for train_idx, test_idx in splits:
            # Split the data
            X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
            y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

            # Fit the model
            model.fit(X_train, y_train)

            # Predict
            y_pred = model.predict(X_test)

            # Compute scores
            for metric, func in scoring_funcs.items():
                score = func(y_test, y_pred)
                results[metric].append(score)

            results['train_sizes'].append(len(train_idx))
            results['test_sizes'].append(len(test_idx))

        return results
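
A small walk-forward usage sketch on synthetic data; the Ridge model, the features, and the target below are placeholders rather than a recommendation.

from sklearn.linear_model import Ridge

rng = np.random.default_rng(0)
X_demo = pd.DataFrame(rng.normal(size=(3000, 5)), columns=[f'f{i}' for i in range(5)])
y_demo = X_demo['f0'].shift(1).fillna(0.0) + rng.normal(0, 0.1, 3000)

fc = TimeSeriesForwardChaining(initial_train_size=1000, step_size=250)
res = fc.validate_model(Ridge(), X_demo, y_demo)
print(f"walk-forward MSE: {np.mean(res['mse']):.4f} over {len(res['mse'])} folds")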

2. Time-Series Block Cross-Validation

class TimeSeriesBlockCV:
    """Blocked cross-validation for time series."""

    def __init__(self, n_blocks=5, test_block_ratio=0.2, gap_size=0):
        self.n_blocks = n_blocks
        self.test_block_ratio = test_block_ratio
        self.gap_size = gap_size

    def split(self, X, y=None):
        """Generate block splits."""

        n_samples = len(X)
        block_size = n_samples // self.n_blocks
        test_size = int(block_size * self.test_block_ratio)

        splits = []

        for i in range(self.n_blocks):
            # Center the test window inside block i
            test_start = i * block_size + (block_size - test_size) // 2
            test_end = test_start + test_size

            # Training data, keeping a gap around the test window
            train_indices = []

            for j in range(self.n_blocks):
                if j == i:
                    continue

                block_start = j * block_size
                block_end = min((j + 1) * block_size, n_samples)

                # Enforce the gap between training data and the test window
                if j < i:
                    # Blocks before the test window
                    gap_adjusted_end = min(block_end, test_start - self.gap_size)
                    if gap_adjusted_end > block_start:
                        train_indices.extend(range(block_start, gap_adjusted_end))
                else:
                    # Blocks after the test window
                    gap_adjusted_start = max(block_start, test_end + self.gap_size)
                    if gap_adjusted_start < block_end:
                        train_indices.extend(range(gap_adjusted_start, block_end))

            if train_indices and test_start < n_samples:
                test_indices = list(range(test_start, min(test_end, n_samples)))
                splits.append((np.array(train_indices), np.array(test_indices)))

        return splits
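
For intuition, this sketch prints the resulting block layout, including the effect of gap_size, on a 1,000-sample dummy series:

block_cv = TimeSeriesBlockCV(n_blocks=5, test_block_ratio=0.2, gap_size=10)
for train_idx, test_idx in block_cv.split(np.zeros(1000)):
    print(f'test block [{test_idx[0]}, {test_idx[-1]}], training samples: {len(train_idx)}')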

3. Predictive Validation

class PredictiveValidation:
    """Predictive validation (dedicated to time-series forecasting)."""

    def __init__(self, forecast_horizon=1, min_train_size=500):
        self.forecast_horizon = forecast_horizon
        self.min_train_size = min_train_size

    def split(self, X, y=None):
        """Generate predictive splits."""

        n_samples = len(X)
        splits = []

        # Each iteration places a forecast window of length forecast_horizon
        # ending at test_start, with all earlier data available for training
        for test_start in range(
            self.min_train_size + self.forecast_horizon, 
            n_samples, 
            self.forecast_horizon
        ):
            # Training data: everything before the forecast window
            train_indices = np.arange(0, test_start - self.forecast_horizon)

            # Test data: the forecast window itself
            test_end = min(test_start, n_samples)
            test_indices = np.arange(test_start - self.forecast_horizon, test_end)

            if len(train_indices) >= self.min_train_size and len(test_indices) > 0:
                splits.append((train_indices, test_indices))

        return splits

    def validate_with_forecasting(self, model, X, y, feature_lag=1):
        """Validate a forecasting model."""

        splits = self.split(X, y)
        results = {
            'predictions': [],
            'actuals': [],
            'errors': [],
            'timestamps': []
        }

        for train_idx, test_idx in splits:
            # Align features at t with targets at t + feature_lag
            X_train = X.iloc[train_idx[:-feature_lag]]
            y_train = y.iloc[train_idx[feature_lag:]]

            # Apply the same alignment at test time: predict the target at each
            # test index from features observed feature_lag steps earlier
            X_test = X.iloc[test_idx - feature_lag]
            y_test = y.iloc[test_idx]

            if len(X_train) == 0 or len(X_test) == 0:
                continue

            # Fit the model
            model.fit(X_train, y_train)

            # Forecast
            y_pred = model.predict(X_test)

            # Store results
            results['predictions'].extend(y_pred.tolist())
            results['actuals'].extend(y_test.tolist())
            results['errors'].extend((y_pred - y_test).tolist())
            results['timestamps'].extend(y_test.index.tolist())

        return results
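
A usage sketch, reusing X_demo and y_demo from the forward-chaining example above; LinearRegression is an arbitrary placeholder model.

from sklearn.linear_model import LinearRegression

pv = PredictiveValidation(forecast_horizon=5, min_train_size=500)
forecast_results = pv.validate_with_forecasting(LinearRegression(), X_demo, y_demo, feature_lag=1)
print(f"mean absolute forecast error: {np.mean(np.abs(forecast_results['errors'])):.4f}")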

Overfitting Detection

1. Learning Curve Analysis

class LearningCurveAnalyzer:
    """Detect overfitting from learning curves."""

    def __init__(self):
        self.results = {}

    def analyze_learning_curve(self, model, X, y, train_sizes=None, cv=None):
        """Analyze the learning curve."""

        if train_sizes is None:
            train_sizes = np.linspace(0.1, 1.0, 10)

        if cv is None:
            cv = TimeSeriesForwardChaining()

        train_scores = []
        val_scores = []
        train_sizes_abs = []

        for size in train_sizes:
            size_scores_train = []
            size_scores_val = []
            size_train_counts = []

            # Evaluate on each split
            splits = cv.split(X, y)

            for train_idx, val_idx in splits:
                # Truncate the training window to the requested fraction
                n_train_samples = int(len(train_idx) * size)
                if n_train_samples < 10:
                    continue

                train_subset = train_idx[:n_train_samples]

                # Split the data
                X_train = X.iloc[train_subset]
                y_train = y.iloc[train_subset]
                X_val = X.iloc[val_idx]
                y_val = y.iloc[val_idx]

                # Fit the model
                model.fit(X_train, y_train)

                # Score on both sets
                train_pred = model.predict(X_train)
                val_pred = model.predict(X_val)

                train_score = mean_squared_error(y_train, train_pred)
                val_score = mean_squared_error(y_val, val_pred)

                size_scores_train.append(train_score)
                size_scores_val.append(val_score)
                size_train_counts.append(n_train_samples)

            if size_scores_train:
                train_scores.append(np.mean(size_scores_train))
                val_scores.append(np.mean(size_scores_val))
                # Record the mean actual training-subset size for this fraction
                train_sizes_abs.append(int(np.mean(size_train_counts)))

        return {
            'train_sizes': train_sizes_abs,
            'train_scores': train_scores,
            'validation_scores': val_scores,
            'overfitting_detected': self._detect_overfitting(train_scores, val_scores)
        }

    def _detect_overfitting(self, train_scores, val_scores, threshold=0.1):
        """Detect overfitting from the two score curves."""

        if len(train_scores) < 3 or len(val_scores) < 3:
            return False

        # Evaluate over the last few points
        recent_train = np.mean(train_scores[-3:])
        recent_val = np.mean(val_scores[-3:])

        # Validation error substantially worse than training error
        score_gap = (recent_val - recent_train) / recent_train

        # Validation error still trending upward (scores are MSE, so lower is better)
        val_trend = np.polyfit(range(len(val_scores)), val_scores, 1)[0]

        return score_gap > threshold or val_trend > 0
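
A usage sketch, again reusing X_demo, y_demo, and Ridge from earlier; a wider step_size keeps the number of model fits manageable.

analyzer = LearningCurveAnalyzer()
curve = analyzer.analyze_learning_curve(
    Ridge(), X_demo, y_demo,
    cv=TimeSeriesForwardChaining(initial_train_size=1000, step_size=500)
)
print('training-set sizes:', curve['train_sizes'])
print('overfitting detected:', curve['overfitting_detected'])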

2. Early Stopping

class TimeSeriesEarlyStopping:
    """Early stopping for time-series models."""

    def __init__(self, patience=10, min_delta=0.001, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights
        self.best_score = float('inf')
        self.wait = 0
        self.best_weights = None

    def __call__(self, model, current_score, epoch):
        """Decide whether to stop (current_score is a loss; lower is better)."""

        if current_score < self.best_score - self.min_delta:
            self.best_score = current_score
            self.wait = 0

            if self.restore_best_weights and hasattr(model, 'get_weights'):
                self.best_weights = model.get_weights()

        else:
            self.wait += 1

        should_stop = self.wait >= self.patience

        if should_stop and self.restore_best_weights and self.best_weights is not None:
            if hasattr(model, 'set_weights'):
                model.set_weights(self.best_weights)

        return should_stop
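
A minimal sketch of the stopping logic with a dummy model and a synthetic validation-loss sequence; in practice current_score would come from a held-out, strictly later validation window.

class _DummyModel:
    """Hypothetical stand-in exposing the optional weight interface."""
    def get_weights(self):
        return [np.zeros(3)]

    def set_weights(self, weights):
        pass

stopper = TimeSeriesEarlyStopping(patience=3, min_delta=0.001)
model = _DummyModel()
for epoch, val_loss in enumerate([1.0, 0.8, 0.7, 0.71, 0.72, 0.73]):
    if stopper(model, val_loss, epoch):
        print(f'stopped at epoch {epoch}, best score {stopper.best_score}')
        break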

class AdaptiveEarlyStopping:
    """Adaptive early stopping."""

    def __init__(self, base_patience=10, volatility_adjustment=True):
        self.base_patience = base_patience
        self.volatility_adjustment = volatility_adjustment
        self.score_history = []

    def calculate_dynamic_patience(self, validation_scores):
        """Compute a dynamic patience value."""

        if len(validation_scores) < 5:
            return self.base_patience

        # Volatility of the recent validation scores
        recent_scores = validation_scores[-10:]
        volatility = np.std(recent_scores) / np.mean(recent_scores)

        # Raise the patience when scores are volatile, capped at twice the base value
        if self.volatility_adjustment:
            adjusted_patience = self.base_patience * (1 + volatility)
            return int(min(adjusted_patience, self.base_patience * 2))
        else:
            return self.base_patience
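
For example, a volatile validation history raises the patience, up to the 2x cap:

adaptive = AdaptiveEarlyStopping(base_patience=10)
noisy_history = [0.50, 0.48, 0.55, 0.43, 0.58, 0.46, 0.52, 0.44, 0.57, 0.45]
print(adaptive.calculate_dynamic_patience(noisy_history))  # 11 here: patience grows with volatility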

Regularization Techniques

1. Time-Series-Specific Regularization

class TimeSeriesRegularization:
    """Regularization techniques for time-series models."""

    def temporal_l2_regularization(self, model_weights, temporal_decay=0.9):
        """Temporally decayed L2 regularization."""

        # Weight tensors later in the list receive an exponentially smaller
        # penalty (temporal_decay ** i shrinks as i grows)
        regularization_term = 0

        for i, weight in enumerate(model_weights):
            temporal_weight = temporal_decay ** i
            regularization_term += temporal_weight * np.sum(np.asarray(weight) ** 2)

        return regularization_term

    def stability_regularization(self, predictions_t, predictions_t_minus_1, lambda_stability=0.1):
        """Prediction-stability regularization."""

        # Encourage consecutive predictions to stay close to each other
        stability_loss = lambda_stability * np.mean((predictions_t - predictions_t_minus_1) ** 2)

        return stability_loss

    def trend_consistency_regularization(self, predictions, actual_trend, lambda_trend=0.05):
        """Trend-consistency regularization."""

        pred_trend = np.diff(predictions)
        actual_trend_diff = np.diff(actual_trend)

        # Encourage the predicted trend to match the actual trend direction
        trend_consistency = lambda_trend * np.mean((pred_trend - actual_trend_diff) ** 2)

        return trend_consistency
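
A hypothetical composite loss combining MSE with the stability penalty above; the arrays are toy values.

reg = TimeSeriesRegularization()
actual = np.array([1.05, 1.15, 1.12])
preds_now = np.array([1.00, 1.20, 1.10])
preds_prev = np.array([0.90, 1.30, 1.00])

base_loss = mean_squared_error(actual, preds_now)
total_loss = base_loss + reg.stability_regularization(preds_now, preds_prev, lambda_stability=0.1)
print(f'mse={base_loss:.4f}, with stability penalty={total_loss:.4f}')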

2. Dropout and Noise Injection

class TimeSeriesDataAugmentation:
    """Time-series data augmentation and noise injection."""

    def __init__(self):
        self.noise_strategies = [
            'gaussian_noise',
            'temporal_masking',
            'feature_shuffling',
            'trend_reversal'
        ]

    def add_gaussian_noise(self, X, noise_factor=0.01):
        """Add Gaussian noise (X is a NumPy array)."""

        noise = np.random.normal(0, noise_factor, X.shape)
        return X + noise

    def temporal_masking(self, X, mask_ratio=0.1):
        """Mask a random time window (X is a NumPy array)."""

        X_masked = X.copy()
        n_samples, n_features = X.shape

        # Zero out a randomly placed window of rows
        mask_length = int(n_samples * mask_ratio)
        mask_start = np.random.randint(0, n_samples - mask_length)

        X_masked[mask_start:mask_start + mask_length] = 0

        return X_masked

    def feature_shuffling(self, X, shuffle_ratio=0.1):
        """Shuffle a subset of feature columns (X is a NumPy array)."""

        X_shuffled = X.copy()
        n_samples, n_features = X.shape

        # Permute the rows of randomly chosen feature columns
        n_shuffle_features = int(n_features * shuffle_ratio)
        shuffle_indices = np.random.choice(n_features, n_shuffle_features, replace=False)

        for idx in shuffle_indices:
            permutation = np.random.permutation(n_samples)
            X_shuffled[:, idx] = X_shuffled[permutation, idx]

        return X_shuffled

    def trend_reversal_augmentation(self, X, reversal_ratio=0.05):
        """Trend-reversal augmentation (X is a DataFrame)."""

        X_augmented = []
        n_samples = len(X)

        # Keep the original data
        X_augmented.append(X)

        # Reverse the trend over short, randomly placed windows
        reversal_length = int(n_samples * reversal_ratio)

        for _ in range(3):  # generate three reversal variants
            X_reversed = X.copy()
            start_idx = np.random.randint(0, n_samples - reversal_length)
            end_idx = start_idx + reversal_length

            # Flip the selected window in time
            segment = X_reversed[start_idx:end_idx]
            reversed_segment = segment.iloc[::-1]
            X_reversed[start_idx:end_idx] = reversed_segment.values

            X_augmented.append(X_reversed)

        return X_augmented
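
A usage sketch; note the input-type assumptions flagged in the docstrings above (NumPy arrays for noise, masking, and shuffling; a DataFrame for trend reversal).

aug = TimeSeriesDataAugmentation()
rng = np.random.default_rng(7)
X_arr = rng.normal(size=(500, 4))

X_noisy = aug.add_gaussian_noise(X_arr, noise_factor=0.02)
X_masked = aug.temporal_masking(X_arr, mask_ratio=0.1)
X_shuffled = aug.feature_shuffling(X_arr, shuffle_ratio=0.25)

X_df = pd.DataFrame(X_arr)
variants = aug.trend_reversal_augmentation(X_df, reversal_ratio=0.05)
print(len(variants))  # the original plus three reversed variants -> 4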

Model Selection and Hyperparameter Optimization

1. Time-Series-Specific Hyperparameter Optimization

import optuna

class TimeSeriesHyperparameterOptimization:
    """Hyperparameter optimization for time-series models."""

    def __init__(self, cv_strategy=None):
        self.cv_strategy = cv_strategy or TimeSeriesForwardChaining()

    def optimize_with_optuna(self, model_factory, X, y, n_trials=100):
        """Optimize with Optuna."""

        def objective(trial):
            # Propose hyperparameters
            params = self._suggest_hyperparameters(trial, model_factory)

            # Build the model
            model = model_factory(**params)

            # Cross-validate
            cv_results = self.cv_strategy.validate_model(model, X, y)

            # Return the mean validation score
            return np.mean(cv_results['mse'])

        study = optuna.create_study(direction='minimize')
        study.optimize(objective, n_trials=n_trials)

        return {
            'best_params': study.best_params,
            'best_score': study.best_value,
            'study': study
        }

    def _suggest_hyperparameters(self, trial, model_factory):
        """Propose model-specific hyperparameters."""

        # Generic tree-ensemble parameters, shown as an example
        params = {
            'n_estimators': trial.suggest_int('n_estimators', 50, 300),
            'max_depth': trial.suggest_int('max_depth', 3, 15),
            'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
        }

        return params

    def robust_model_selection(self, models_and_params, X, y, stability_weight=0.3):
        """Model selection that also rewards stability."""

        results = {}

        # The supplied parameter grid is unused here; Optuna proposes its own ranges
        for model_name, (model_factory, _param_grid) in models_and_params.items():
            model_results = []

            # Repeat the optimization several times to measure stability
            for seed in range(5):
                np.random.seed(seed)

                # Hyperparameter optimization
                opt_result = self.optimize_with_optuna(model_factory, X, y, n_trials=50)

                model_results.append({
                    'score': opt_result['best_score'],
                    'params': opt_result['best_params']
                })

            # Mean performance and its dispersion
            scores = [r['score'] for r in model_results]
            mean_score = np.mean(scores)
            score_std = np.std(scores)

            # Combined score that penalizes instability
            stability_penalty = score_std * stability_weight
            final_score = mean_score + stability_penalty

            results[model_name] = {
                'mean_score': mean_score,
                'score_std': score_std,
                'final_score': final_score,
                'best_params': model_results[np.argmin(scores)]['params']
            }

        return results
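
A usage sketch with a hypothetical model factory. GradientBoostingRegressor happens to accept the four parameter names proposed in _suggest_hyperparameters, and a coarser CV plus a small n_trials keeps the run time reasonable.

from sklearn.ensemble import GradientBoostingRegressor

def gbr_factory(**params):
    return GradientBoostingRegressor(random_state=0, **params)

optimizer = TimeSeriesHyperparameterOptimization(
    cv_strategy=TimeSeriesForwardChaining(initial_train_size=1000, step_size=500)
)
opt_result = optimizer.optimize_with_optuna(gbr_factory, X_demo, y_demo, n_trials=20)
print(opt_result['best_params'], opt_result['best_score'])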

Practical Integrated System

1. Comprehensive Validation Framework

class ComprehensiveValidationFramework:
    """Comprehensive validation framework."""

    def __init__(self):
        self.validators = {
            'forward_chaining': TimeSeriesForwardChaining(),
            'block_cv': TimeSeriesBlockCV(),
            'predictive': PredictiveValidation()
        }
        self.early_stopping = TimeSeriesEarlyStopping()
        self.regularization = TimeSeriesRegularization()

    def comprehensive_validation(self, model, X, y, validation_strategy='all'):
        """Run the full validation suite."""

        results = {}

        if validation_strategy == 'all':
            strategies = self.validators.keys()
        else:
            strategies = [validation_strategy]

        for strategy_name in strategies:
            validator = self.validators[strategy_name]

            # Run the validation
            if hasattr(validator, 'validate_model'):
                strategy_results = validator.validate_model(model, X, y)
            else:
                strategy_results = self._manual_validation(validator, model, X, y)

            results[strategy_name] = strategy_results

        # Integrated assessment
        final_assessment = self._integrate_results(results)

        return {
            'individual_results': results,
            'integrated_assessment': final_assessment
        }

    def _manual_validation(self, validator, model, X, y):
        """Fallback for validators that only expose split(): fit and score per fold."""

        results = {'mse': []}

        for train_idx, test_idx in validator.split(X, y):
            model.fit(X.iloc[train_idx], y.iloc[train_idx])
            y_pred = model.predict(X.iloc[test_idx])
            results['mse'].append(mean_squared_error(y.iloc[test_idx], y_pred))

        return results

    def _integrate_results(self, results):
        """Integrate results across strategies."""

        all_scores = []

        for strategy, result in results.items():
            if 'mse' in result:
                all_scores.extend(result['mse'])

        if not all_scores:
            return {'overall_score': float('inf'), 'confidence': 0}

        overall_score = np.mean(all_scores)
        score_std = np.std(all_scores)

        # Confidence score (lower dispersion means higher confidence)
        confidence = 1.0 / (1.0 + score_std / overall_score)

        return {
            'overall_score': overall_score,
            'score_std': score_std,
            'confidence': confidence,
            'overfitting_risk': self._assess_overfitting_risk(results)
        }

    def _assess_overfitting_risk(self, results):
        """Assess the risk of overfitting."""

        # Check whether the strategies agree on the model's score
        strategy_scores = []

        for strategy, result in results.items():
            if 'mse' in result:
                strategy_scores.append(np.mean(result['mse']))

        if len(strategy_scores) < 2:
            return 'unknown'

        score_variance = np.var(strategy_scores)
        mean_score = np.mean(strategy_scores)

        # Judge risk by the relative variance across strategies
        relative_variance = score_variance / (mean_score ** 2)

        if relative_variance > 0.1:
            return 'high'
        elif relative_variance > 0.05:
            return 'medium'
        else:
            return 'low'
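
Finally, a usage sketch of the full framework, reusing X_demo, y_demo, and Ridge from the earlier sketches:

framework = ComprehensiveValidationFramework()
report = framework.comprehensive_validation(Ridge(), X_demo, y_demo, validation_strategy='all')
print(report['integrated_assessment'])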

Summary

Sound cross-validation and overfitting countermeasures for time-series machine learning rest on the following points:

  1. Understand the time-series characteristics - use splits that avoid data leakage
  2. Combine multiple validation methods - offset the limitations of any single approach
  3. Use dynamic early stopping - control training with market volatility in mind
  4. Apply time-series-specific regularization - temporal stability and trend consistency
  5. Select models for stability - evaluate robustness, not just raw performance

Combined appropriately, these techniques produce machine learning models that perform reliably in live trading environments.