ML Documentation

Ensemble Learning and Meta-Learning Strategies

1. Introduction

Relying on a single model in cryptocurrency trading is risky: no single model fully captures the market's complexity. Ensemble learning and meta-learning combine multiple models to build a more robust, adaptive trading system.

2. Ensemble Learning Fundamentals

2.1 Bagging (Bootstrap Aggregating)

Bagging trains the same algorithm on different bootstrap samples of the data and aggregates the resulting predictions, which reduces variance and stabilizes the forecast.

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.tree import DecisionTreeRegressor
from typing import Any, List, Dict, Tuple
import joblib
from concurrent.futures import ProcessPoolExecutor

class CryptoBaggingEnsemble:
    def __init__(self, 
                 n_estimators: int = 100,
                 max_samples: float = 0.8,
                 n_jobs: int = -1):
        self.n_estimators = n_estimators
        self.max_samples = max_samples
        self.n_jobs = n_jobs
        self.estimators = []
        self.feature_importances = None

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the bagging ensemble."""
        n_samples = X.shape[0]
        sample_size = int(n_samples * self.max_samples)

        # Train the models in parallel. ProcessPoolExecutor rejects -1,
        # so map the sklearn-style n_jobs=-1 to "all cores" via max_workers=None.
        max_workers = None if self.n_jobs == -1 else self.n_jobs
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            futures = []

            for i in range(self.n_estimators):
                # Bootstrap sampling (with replacement)
                indices = np.random.choice(n_samples, sample_size, replace=True)
                X_boot = X[indices]
                y_boot = y[indices]

                # Submit each model fit to the pool
                future = executor.submit(self._train_single_model, X_boot, y_boot, i)
                futures.append(future)

            # Collect the fitted models
            self.estimators = [future.result() for future in futures]

        # Aggregate feature importances across the fitted trees
        self._calculate_feature_importances()

    def _train_single_model(self, X: np.ndarray, y: np.ndarray, seed: int):
        """Fit a single base model."""
        model = DecisionTreeRegressor(
            max_depth=10,
            min_samples_split=20,
            random_state=seed
        )
        model.fit(X, y)
        return model

    def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Ensemble prediction: returns (mean, std) across the base models."""
        predictions = np.array([model.predict(X) for model in self.estimators])

        # Aggregate the predictions (mean)
        mean_prediction = np.mean(predictions, axis=0)

        # Disagreement across models serves as an uncertainty estimate
        prediction_std = np.std(predictions, axis=0)

        return mean_prediction, prediction_std

    def predict_proba(self, X: np.ndarray) -> Dict[str, np.ndarray]:
        """Probabilistic prediction (for classification-style use)."""
        predictions = []

        for model in self.estimators:
            # Raw prediction from each model
            pred = model.predict(X)
            # Squash to (0, 1) with a sigmoid; this is a heuristic mapping,
            # since the base learners are regressors, not calibrated classifiers
            prob = 1 / (1 + np.exp(-pred))
            predictions.append(prob)

        predictions = np.array(predictions)

        return {
            'mean_prob': np.mean(predictions, axis=0),
            'std_prob': np.std(predictions, axis=0),
            'confidence': 1 - np.std(predictions, axis=0)  # agreement-based confidence
        }

    def _calculate_feature_importances(self):
        """Average feature importances over the fitted base models."""
        importances = []

        for model in self.estimators:
            if hasattr(model, 'feature_importances_'):
                importances.append(model.feature_importances_)

        if importances:
            self.feature_importances = np.mean(importances, axis=0)
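
A minimal usage sketch on synthetic data (the shapes and feature count below are illustrative, not from the original system). Because fit spawns worker processes, keep it under an `if __name__ == "__main__":` guard on platforms that use the spawn start method:

if __name__ == "__main__":
    rng = np.random.default_rng(42)
    X_demo = rng.normal(size=(500, 8))  # synthetic stand-in features
    y_demo = X_demo[:, 0] * 0.5 + rng.normal(scale=0.1, size=500)

    ensemble = CryptoBaggingEnsemble(n_estimators=20, max_samples=0.8, n_jobs=2)
    ensemble.fit(X_demo, y_demo)

    # A wider std means less agreement among the bootstrap trees
    mean_pred, pred_std = ensemble.predict(X_demo[-10:])
    print(mean_pred.round(3), f"{pred_std.mean():.3f}")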

2.2 Boosting (Gradient Boosting)

Boosting trains weak learners sequentially, with each new learner fitted to correct the errors of the ensemble so far.

import lightgbm as lgb
from sklearn.model_selection import TimeSeriesSplit

class CryptoGradientBoosting:
    def __init__(self, 
                 n_estimators: int = 1000,
                 learning_rate: float = 0.01,
                 max_depth: int = 6):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.model = None
        self.feature_names = None

    def fit(self, X: pd.DataFrame, y: np.ndarray, 
            validation_split: float = 0.2):
        """Fit a gradient-boosting model with a time-aware train/validation split."""
        self.feature_names = X.columns.tolist()

        # Chronological split (no shuffling: later rows must stay in validation)
        n_samples = len(X)
        train_size = int(n_samples * (1 - validation_split))

        X_train = X.iloc[:train_size]
        y_train = y[:train_size]
        X_val = X.iloc[train_size:]
        y_val = y[train_size:]

        # LightGBM parameters (the number of rounds is passed to lgb.train
        # below as num_boost_round, so it does not belong in this dict)
        params = {
            'objective': 'regression',
            'metric': 'rmse',
            'num_leaves': 2 ** self.max_depth - 1,
            'learning_rate': self.learning_rate,
            'min_child_samples': 20,
            'subsample': 0.8,
            'colsample_bytree': 0.8,
            'reg_alpha': 0.1,
            'reg_lambda': 0.1,
            'random_state': 42,
            'n_jobs': -1,
            'verbose': -1
        }

        # Train with early stopping; recent LightGBM versions take callbacks
        # instead of the removed early_stopping_rounds/verbose_eval arguments
        train_data = lgb.Dataset(X_train, label=y_train)
        val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)

        self.model = lgb.train(
            params,
            train_data,
            num_boost_round=self.n_estimators,
            valid_sets=[val_data],
            callbacks=[lgb.early_stopping(50), lgb.log_evaluation(0)]
        )

        # Gain-based feature importances
        self.feature_importance = pd.DataFrame({
            'feature': self.feature_names,
            'importance': self.model.feature_importance(importance_type='gain')
        }).sort_values('importance', ascending=False)

        return self

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """Predict with the best iteration found by early stopping."""
        return self.model.predict(X, num_iteration=self.model.best_iteration)

    def predict_with_uncertainty(self, X: pd.DataFrame, n_iterations: int = 100) -> Dict:
        """Prediction with an uncertainty band via dropout-style feature masking."""
        predictions = []

        for i in range(n_iterations):
            # Neutralize a random ~20% of the features by replacing them with
            # their column means. The column count must stay intact, since
            # LightGBM expects the same feature layout it was trained on.
            keep = np.random.binomial(1, 0.8, size=X.shape[1]).astype(bool)
            masked_cols = X.columns[~keep]
            X_masked = X.copy()
            X_masked.loc[:, masked_cols] = X[masked_cols].mean().values

            pred = self.model.predict(
                X_masked, 
                num_iteration=self.model.best_iteration
            )
            predictions.append(pred)

        predictions = np.array(predictions)

        return {
            'mean': np.mean(predictions, axis=0),
            'std': np.std(predictions, axis=0),
            'lower_bound': np.percentile(predictions, 5, axis=0),
            'upper_bound': np.percentile(predictions, 95, axis=0)
        }
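
A short usage sketch, again on synthetic data (the frame below stands in for real engineered features). The uncertainty method returns a 90% band from the feature-masking passes:

rng = np.random.default_rng(0)
X_demo = pd.DataFrame(rng.normal(size=(1000, 6)),
                      columns=[f'feat_{i}' for i in range(6)])
y_demo = X_demo['feat_0'].values * 0.3 + rng.normal(scale=0.05, size=1000)

gbm = CryptoGradientBoosting(n_estimators=200, learning_rate=0.05)
gbm.fit(X_demo, y_demo, validation_split=0.2)

result = gbm.predict_with_uncertainty(X_demo.tail(50), n_iterations=25)
band = result['upper_bound'] - result['lower_bound']
print(f"mean width of the 90% band: {band.mean():.4f}")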

class AdaptiveBoostingTrader:
    def __init__(self):
        self.weak_learners = []
        self.learner_weights = []
        self.performance_history = []

    def add_weak_learner(self, learner, initial_weight: float = 1.0):
        """Register a weak learner."""
        self.weak_learners.append(learner)
        self.learner_weights.append(initial_weight)

    def update_weights(self, predictions: List[np.ndarray], 
                      actual: np.ndarray) -> None:
        """Re-weight the learners from their realized errors."""
        errors = []

        for i, pred in enumerate(predictions):
            # Mean absolute error of each learner
            error = np.mean(np.abs(pred - actual))
            errors.append(error)

        # Update the weights from the errors
        errors = np.array(errors)
        # Inverse error as the raw weight (the epsilon avoids division by zero)
        new_weights = 1 / (errors + 1e-6)
        # Normalize to sum to one
        self.learner_weights = new_weights / np.sum(new_weights)

        # Keep a performance history
        self.performance_history.append({
            'timestamp': pd.Timestamp.now(),
            'errors': errors,
            'weights': self.learner_weights.copy()
        })

    def predict(self, X: pd.DataFrame) -> np.ndarray:
        """Weighted ensemble prediction."""
        predictions = []

        for learner in self.weak_learners:
            pred = learner.predict(X)
            predictions.append(pred)

        # Weighted average across learners
        weighted_pred = np.average(
            predictions, 
            axis=0, 
            weights=self.learner_weights
        )

        return weighted_pred
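
A sketch of the online re-weighting loop, with two plain sklearn regressors standing in for real fitted strategies (the data and model choices are illustrative):

from sklearn.linear_model import LinearRegression

rng = np.random.default_rng(1)
X_hist = pd.DataFrame(rng.normal(size=(300, 4)))
y_hist = X_hist[0].values + rng.normal(scale=0.1, size=300)

trader = AdaptiveBoostingTrader()
trader.add_weak_learner(LinearRegression().fit(X_hist, y_hist))
trader.add_weak_learner(DecisionTreeRegressor(max_depth=4).fit(X_hist, y_hist))

# One update step: compare each learner's forecast with realized values
X_new = pd.DataFrame(rng.normal(size=(50, 4)))
actual = X_new[0].values + rng.normal(scale=0.1, size=50)
preds = [m.predict(X_new) for m in trader.weak_learners]
trader.update_weights(preds, actual)

print(np.round(trader.learner_weights, 3))   # error-based weights
print(trader.predict(X_new)[:3])             # weighted ensemble forecast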

2.3 Stacking (Stacked Generalization)

Stacking uses the out-of-fold predictions of several base models as new features and trains a meta-model on them to produce the final prediction.

from sklearn.model_selection import KFold
from sklearn.linear_model import Ridge
from sklearn.neural_network import MLPRegressor
from xgboost import XGBRegressor

class CryptoStackingEnsemble:
    def __init__(self, 
                 base_models: List = None,
                 meta_model = None,
                 n_folds: int = 5):
        self.base_models = base_models or self._get_default_base_models()
        self.meta_model = meta_model or Ridge(alpha=1.0)
        self.n_folds = n_folds
        self.trained_base_models = []

    def _get_default_base_models(self):
        """Default set of base models."""
        return [
            ('rf', RandomForestRegressor(n_estimators=100, random_state=42)),
            ('xgb', XGBRegressor(n_estimators=100, random_state=42)),
            ('mlp', MLPRegressor(hidden_layer_sizes=(100, 50), random_state=42)),
            ('lgb', lgb.LGBMRegressor(n_estimators=100, random_state=42))
        ]

    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the stacking ensemble."""
        n_samples = X.shape[0]
        n_models = len(self.base_models)
        self.trained_base_models = []  # reset in case fit is called again

        # Out-of-fold predictions from the base models
        base_predictions = np.zeros((n_samples, n_models))

        # K-fold without shuffling, to respect the time ordering. Note that
        # plain K-fold still trains early folds on later data; a stricter
        # setup would use TimeSeriesSplit at the cost of fewer usable rows.
        kf = KFold(n_splits=self.n_folds, shuffle=False)

        for fold_idx, (train_idx, val_idx) in enumerate(kf.split(X)):
            X_train_fold = X[train_idx]
            y_train_fold = y[train_idx]
            X_val_fold = X[val_idx]

            # Fit each base model on this fold
            fold_models = []
            for model_idx, (name, model) in enumerate(self.base_models):
                # Work on a clone so the template models stay unfitted
                model_clone = self._clone_model(model)

                model_clone.fit(X_train_fold, y_train_fold)

                # Predict on the held-out fold
                val_pred = model_clone.predict(X_val_fold)
                base_predictions[val_idx, model_idx] = val_pred

                fold_models.append((name, model_clone))

            self.trained_base_models.append(fold_models)

        # Fit the meta-model on the out-of-fold predictions
        self.meta_model.fit(base_predictions, y)

        # Refit the final base models on the full data for inference
        self.final_base_models = []
        for name, model in self.base_models:
            model_clone = self._clone_model(model)
            model_clone.fit(X, y)
            self.final_base_models.append((name, model_clone))

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Stacked prediction."""
        n_samples = X.shape[0]
        n_models = len(self.base_models)

        # Base-model predictions become the meta-model's features
        base_predictions = np.zeros((n_samples, n_models))

        for model_idx, (name, model) in enumerate(self.final_base_models):
            base_predictions[:, model_idx] = model.predict(X)

        # Final prediction from the meta-model
        final_prediction = self.meta_model.predict(base_predictions)

        return final_prediction

    def predict_with_confidence(self, X: np.ndarray) -> Dict:
        """Prediction with an empirical confidence interval."""
        # Predict with each fold's base models
        all_predictions = []

        for fold_models in self.trained_base_models:
            fold_base_predictions = []

            for model_idx, (name, model) in enumerate(fold_models):
                pred = model.predict(X)
                fold_base_predictions.append(pred)

            # Meta-prediction for this fold
            fold_base_predictions = np.column_stack(fold_base_predictions)
            fold_final_pred = self.meta_model.predict(fold_base_predictions)
            all_predictions.append(fold_final_pred)

        all_predictions = np.array(all_predictions)

        return {
            'mean': np.mean(all_predictions, axis=0),
            'std': np.std(all_predictions, axis=0),
            'lower_95': np.percentile(all_predictions, 2.5, axis=0),
            'upper_95': np.percentile(all_predictions, 97.5, axis=0)
        }

    def _clone_model(self, model):
        """Clone a model (an unfitted copy with the same hyperparameters)."""
        from sklearn.base import clone
        return clone(model)

    def get_model_contributions(self, X: np.ndarray) -> pd.DataFrame:
        """Analyze each base model's contribution."""
        contributions = {}

        # Base-model predictions
        for name, model in self.final_base_models:
            contributions[name] = model.predict(X)

        # Importances from the meta-model's coefficients, if available
        if hasattr(self.meta_model, 'coef_'):
            importances = np.abs(self.meta_model.coef_)
            for i, (name, _) in enumerate(self.base_models):
                contributions[f'{name}_importance'] = importances[i]

        return pd.DataFrame(contributions)
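
A compact fit/predict sketch on synthetic data. Note that the default configuration trains four base models across every fold plus a final refit, so even this toy run takes a little time:

rng = np.random.default_rng(7)
X_demo = rng.normal(size=(600, 10))
y_demo = X_demo[:, :3].sum(axis=1) + rng.normal(scale=0.2, size=600)

stack = CryptoStackingEnsemble(n_folds=3)
stack.fit(X_demo[:500], y_demo[:500])

point = stack.predict(X_demo[500:])
interval = stack.predict_with_confidence(X_demo[500:])
print(point[:3])
print(interval['lower_95'][:3], interval['upper_95'][:3])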

3. Meta-Learning Strategies

3.1 Market-Regime-Adaptive Meta-Learning

The idea: detect which regime the market is currently in (for example low-volatility, trending, or high-volatility) and route predictions to models specialized for that regime.

from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler

class MarketRegimeMetaLearner:
    def __init__(self, n_regimes: int = 3):
        self.n_regimes = n_regimes
        self.regime_detector = GaussianMixture(
            n_components=n_regimes,
            covariance_type='full',
            random_state=42
        )
        self.regime_models = {}
        self.scaler = StandardScaler()
        self.current_regime = None

    def identify_market_features(self, market_data: pd.DataFrame) -> pd.DataFrame:
        """Extract regime features from market data (keeps the time index)."""
        features = []

        # Volatility
        returns = market_data['close'].pct_change()
        features.append(returns.rolling(20).std())

        # Trend strength
        sma_20 = market_data['close'].rolling(20).mean()
        sma_50 = market_data['close'].rolling(50).mean()
        trend_strength = (sma_20 - sma_50) / sma_50
        features.append(trend_strength)

        # Volume ratio
        volume_ratio = market_data['volume'] / market_data['volume'].rolling(20).mean()
        features.append(volume_ratio)

        # RSI
        rsi = self._calculate_rsi(market_data['close'])
        features.append(rsi)

        # Market efficiency (Hurst exponent, a single value broadcast over the index)
        hurst = self._calculate_hurst_exponent(returns.dropna())
        features.append(pd.Series(hurst, index=market_data.index))

        # Combine the features; keep the DataFrame so callers can align by index
        feature_matrix = pd.concat(features, axis=1).dropna()

        return feature_matrix

    def fit_regime_detector(self, market_data: pd.DataFrame) -> pd.Series:
        """Fit the regime detector and label each point (indexed by timestamp)."""
        # Extract the market features
        features = self.identify_market_features(market_data)

        # Standardize
        features_scaled = self.scaler.fit_transform(features.values)

        # Fit the mixture model
        self.regime_detector.fit(features_scaled)

        # Predict a regime for each data point, keeping the feature index
        # so callers can align the regimes with their own feature rows
        regimes = pd.Series(
            self.regime_detector.predict(features_scaled),
            index=features.index
        )

        return regimes

    def train_regime_specific_models(self, 
                                   X: pd.DataFrame,
                                   y: np.ndarray,
                                   market_data: pd.DataFrame):
        """Fit one model per regime."""
        # Identify the regimes
        regimes = self.fit_regime_detector(market_data)

        # The regime features drop warm-up rows, so align X and y with the
        # regime labels by index before masking
        common_index = X.index.intersection(regimes.index)
        positions = X.index.get_indexer(common_index)
        X_aligned = X.loc[common_index]
        y_aligned = np.asarray(y)[positions]
        regime_labels = regimes.loc[common_index].values

        # Fit a model for each regime
        for regime in range(self.n_regimes):
            # Select this regime's data
            regime_mask = regime_labels == regime
            X_regime = X_aligned[regime_mask]
            y_regime = y_aligned[regime_mask]

            if len(X_regime) < 100:  # skip regimes without enough data
                continue

            # Pick a model suited to the regime. GMM component indices are
            # arbitrary; in practice, inspect each component's statistics
            # before mapping it to a model type.
            if regime == 0:  # low-volatility regime
                model = self._create_low_volatility_model()
            elif regime == 1:  # trending regime
                model = self._create_trend_model()
            else:  # high-volatility regime
                model = self._create_high_volatility_model()

            # Fit the regime model
            model.fit(X_regime, y_regime)
            self.regime_models[regime] = model

            print(f"Regime {regime}: {len(X_regime)} samples")

    def predict(self, X: pd.DataFrame, market_data: pd.DataFrame) -> np.ndarray:
        """Predict with the models weighted by current regime probabilities."""
        # Identify the current market regime from recent data
        current_features = self.identify_market_features(market_data.tail(100))
        current_features_scaled = self.scaler.transform(current_features.iloc[-1:].values)
        current_regime = self.regime_detector.predict(current_features_scaled)[0]

        # Regime probabilities
        regime_probs = self.regime_detector.predict_proba(current_features_scaled)[0]

        # Probability-weighted prediction
        predictions = np.zeros(len(X))

        for regime, prob in enumerate(regime_probs):
            if regime in self.regime_models and prob > 0.1:
                regime_pred = self.regime_models[regime].predict(X)
                predictions += prob * regime_pred

        self.current_regime = current_regime

        return predictions

    def _create_low_volatility_model(self):
        """Model for low-volatility regimes."""
        return Ridge(alpha=0.1)

    def _create_trend_model(self):
        """Model for trend-following regimes."""
        return XGBRegressor(
            n_estimators=200,
            max_depth=5,
            learning_rate=0.01,
            subsample=0.8
        )

    def _create_high_volatility_model(self):
        """Model for high-volatility regimes."""
        return RandomForestRegressor(
            n_estimators=300,
            max_depth=10,
            min_samples_split=20,
            bootstrap=True
        )

    def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
        """Compute the RSI."""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    def _calculate_hurst_exponent(self, returns: np.ndarray) -> float:
        """Estimate the Hurst exponent with a simplified prefix-based R/S analysis."""
        returns = np.asarray(returns)
        if len(returns) < 100:
            return 0.5

        # R/S analysis over growing prefixes (a crude but fast approximation)
        lags = range(2, min(100, len(returns) // 2))
        tau = []
        valid_lags = []

        for lag in lags:
            # Mean of the prefix
            mean = np.mean(returns[:lag])
            # Cumulative deviations from the mean
            cumdev = np.cumsum(returns[:lag] - mean)
            # Range
            R = np.max(cumdev) - np.min(cumdev)
            # Standard deviation
            S = np.std(returns[:lag])

            if S != 0:
                tau.append(R / S)
                valid_lags.append(lag)  # keep the lags in sync with tau

        # Estimate the Hurst exponent as the slope of the log-log regression
        if len(tau) > 1:
            log_lags = np.log(valid_lags)
            log_tau = np.log(tau)
            hurst = np.polyfit(log_lags, log_tau, 1)[0]
            return hurst

        return 0.5
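
A sketch of the regime workflow end to end, using a synthetic OHLCV frame as a stand-in for real exchange data (the column names and feature choices below are illustrative):

idx = pd.date_range('2024-01-01', periods=2000, freq='1h')
rng = np.random.default_rng(3)
close = 30000 * np.exp(np.cumsum(rng.normal(0, 0.01, len(idx))))
market_data = pd.DataFrame({
    'open': close, 'high': close * 1.005, 'low': close * 0.995,
    'close': close, 'volume': rng.gamma(2.0, 100.0, len(idx)),
}, index=idx)

X = pd.DataFrame({
    'ret_1': market_data['close'].pct_change(),
    'ret_12': market_data['close'].pct_change(12),
}).dropna()
y = market_data['close'].pct_change().shift(-1).reindex(X.index).fillna(0).values

learner = MarketRegimeMetaLearner(n_regimes=3)
learner.train_regime_specific_models(X, y, market_data)
preds = learner.predict(X.tail(50), market_data)
print(f"current regime: {learner.current_regime}")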

3.2 Multi-Timeframe Ensemble

class MultiTimeframeEnsemble:
    def __init__(self, timeframes: List[str] = None):
        # Avoid a mutable default argument; fall back to a standard set
        self.timeframes = timeframes or ['5m', '15m', '1h', '4h', '1d']
        self.models = {}
        self.weights = {}
        self.performance_tracker = {}

    def prepare_multiframe_features(self, 
                                  base_data: pd.DataFrame,
                                  timeframe: str) -> pd.DataFrame:
        """Build the feature set for one timeframe."""
        # Resample to the requested timeframe
        resampled = self._resample_data(base_data, timeframe)

        features = pd.DataFrame(index=resampled.index)

        # Price-based features
        features[f'{timeframe}_returns'] = resampled['close'].pct_change()
        features[f'{timeframe}_log_returns'] = np.log(resampled['close'] / resampled['close'].shift(1))

        # Moving averages and their slopes
        for period in [5, 10, 20]:
            features[f'{timeframe}_sma_{period}'] = resampled['close'].rolling(period).mean()
            features[f'{timeframe}_sma_{period}_slope'] = features[f'{timeframe}_sma_{period}'].diff()

        # Volatility
        features[f'{timeframe}_volatility'] = features[f'{timeframe}_returns'].rolling(20).std()

        # Volume analysis
        features[f'{timeframe}_volume_sma'] = resampled['volume'].rolling(20).mean()
        features[f'{timeframe}_volume_ratio'] = resampled['volume'] / features[f'{timeframe}_volume_sma']

        # Technical indicators
        features[f'{timeframe}_rsi'] = self._calculate_rsi(resampled['close'])
        features[f'{timeframe}_macd'], features[f'{timeframe}_macd_signal'] = self._calculate_macd(resampled['close'])

        # Bollinger bands
        bb_period = 20
        bb_std = 2
        sma = resampled['close'].rolling(bb_period).mean()
        std = resampled['close'].rolling(bb_period).std()
        features[f'{timeframe}_bb_upper'] = sma + (bb_std * std)
        features[f'{timeframe}_bb_lower'] = sma - (bb_std * std)
        features[f'{timeframe}_bb_width'] = features[f'{timeframe}_bb_upper'] - features[f'{timeframe}_bb_lower']
        features[f'{timeframe}_bb_position'] = (resampled['close'] - features[f'{timeframe}_bb_lower']) / features[f'{timeframe}_bb_width']

        return features.dropna()

    def train_timeframe_models(self, 
                             base_data: pd.DataFrame,
                             target: pd.Series):
        """Fit one model per timeframe."""
        for timeframe in self.timeframes:
            print(f"Training model for {timeframe} timeframe...")

            # Build the features
            features = self.prepare_multiframe_features(base_data, timeframe)

            # Resample the target to this timeframe
            resampled_target = self._resample_target(target, timeframe)

            # Align on the common index
            common_index = features.index.intersection(resampled_target.index)
            X = features.loc[common_index]
            y = resampled_target.loc[common_index]

            # Pick a model suited to the timeframe
            if timeframe in ['5m', '15m']:
                # Short-term: fast model
                model = lgb.LGBMRegressor(
                    n_estimators=100,
                    max_depth=5,
                    learning_rate=0.1
                )
            elif timeframe in ['1h', '4h']:
                # Mid-term: balanced model
                model = XGBRegressor(
                    n_estimators=200,
                    max_depth=7,
                    learning_rate=0.05
                )
            else:
                # Long-term: richer model
                model = RandomForestRegressor(
                    n_estimators=300,
                    max_depth=10,
                    min_samples_split=10
                )

            # Chronological 80/20 split
            train_size = int(len(X) * 0.8)
            X_train = X[:train_size]
            y_train = y[:train_size]
            X_test = X[train_size:]
            y_test = y[train_size:]

            model.fit(X_train, y_train)

            # Evaluate (R^2 on train and test)
            train_score = model.score(X_train, y_train)
            test_score = model.score(X_test, y_test)

            self.models[timeframe] = model
            self.performance_tracker[timeframe] = {
                'train_score': train_score,
                'test_score': test_score
            }

            # Initial weight from out-of-sample performance (floored at 0.1)
            self.weights[timeframe] = max(0.1, test_score)

        # Normalize the weights
        total_weight = sum(self.weights.values())
        self.weights = {k: v/total_weight for k, v in self.weights.items()}

    def predict_ensemble(self, base_data: pd.DataFrame) -> Dict[str, np.ndarray]:
        """Ensemble prediction across timeframes."""
        predictions = {}
        weighted_prediction = None

        for timeframe in self.timeframes:
            if timeframe not in self.models:
                continue

            # Build the features
            features = self.prepare_multiframe_features(base_data, timeframe)

            # Predict
            pred = self.models[timeframe].predict(features)
            predictions[timeframe] = pred

            # Accumulate into the weighted prediction
            if weighted_prediction is None:
                weighted_prediction = self.weights[timeframe] * pred
            else:
                # Different timeframes yield different lengths; add over the overlap
                min_len = min(len(weighted_prediction), len(pred))
                weighted_prediction[:min_len] += self.weights[timeframe] * pred[:min_len]

        predictions['ensemble'] = weighted_prediction

        return predictions

    def adaptive_weight_update(self, 
                             predictions: Dict[str, np.ndarray],
                             actual: np.ndarray):
        """Update the weights adaptively from realized errors."""
        errors = {}

        for timeframe in self.timeframes:
            if timeframe in predictions:
                # Prediction error over the overlapping horizon
                pred = predictions[timeframe]
                min_len = min(len(pred), len(actual))
                error = np.mean(np.abs(pred[:min_len] - actual[:min_len]))
                errors[timeframe] = error

        # Smaller error earns a larger weight
        if errors:
            min_error = min(errors.values())
            for timeframe in errors:
                # Relative performance against the best timeframe
                relative_performance = min_error / (errors[timeframe] + 1e-6)
                # Blend with the existing weight (momentum term)
                self.weights[timeframe] = 0.7 * self.weights[timeframe] + \
                                        0.3 * relative_performance

            # Normalize
            total_weight = sum(self.weights.values())
            self.weights = {k: v/total_weight for k, v in self.weights.items()}

    def _resample_data(self, data: pd.DataFrame, timeframe: str) -> pd.DataFrame:
        """Resample OHLCV data to the requested timeframe."""
        # Map the timeframe to a pandas frequency string
        # ('min'/'h' instead of the deprecated 'T'/'H' aliases)
        freq_map = {
            '5m': '5min',
            '15m': '15min',
            '1h': '1h',
            '4h': '4h',
            '1d': '1D'
        }

        freq = freq_map.get(timeframe, '1h')

        # Resample OHLCV with the usual aggregations
        resampled = data.resample(freq).agg({
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum'
        })

        return resampled.dropna()

    def _resample_target(self, target: pd.Series, timeframe: str) -> pd.Series:
        """Resample the target to the requested timeframe."""
        freq_map = {
            '5m': '5min',
            '15m': '15min',
            '1h': '1h',
            '4h': '4h',
            '1d': '1D'
        }

        freq = freq_map.get(timeframe, '1h')

        # Use the last value of each bar
        return target.resample(freq).last()

    def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
        """Compute the RSI (used by prepare_multiframe_features)."""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))

    def _calculate_macd(self, prices: pd.Series, 
                       fast: int = 12, 
                       slow: int = 26, 
                       signal: int = 9) -> Tuple[pd.Series, pd.Series]:
        """Compute the MACD line and its signal line."""
        ema_fast = prices.ewm(span=fast).mean()
        ema_slow = prices.ewm(span=slow).mean()
        macd = ema_fast - ema_slow
        macd_signal = macd.ewm(span=signal).mean()

        return macd, macd_signal
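
A usage sketch with synthetic 5-minute bars (two timeframes only, to keep the illustration's runtime down):

idx = pd.date_range('2024-01-01', periods=5000, freq='5min')
rng = np.random.default_rng(5)
close = 30000 * np.exp(np.cumsum(rng.normal(0, 0.002, len(idx))))
data = pd.DataFrame({'open': close, 'high': close * 1.001,
                     'low': close * 0.999, 'close': close,
                     'volume': rng.gamma(2.0, 50.0, len(idx))}, index=idx)
target = data['close'].pct_change().shift(-1).fillna(0)

mtf = MultiTimeframeEnsemble(timeframes=['15m', '1h'])
mtf.train_timeframe_models(data, target)
preds = mtf.predict_ensemble(data)
print({k: round(float(v[-1]), 6) for k, v in preds.items()})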

4. Model Blending Techniques

4.1 Dynamic Weighted Blending

class DynamicModelBlender:
    def __init__(self, 
                 models: Dict[str, Any],
                 lookback_window: int = 100):
        self.models = models
        self.lookback_window = lookback_window
        self.performance_history = {name: [] for name in models.keys()}
        self.current_weights = {name: 1/len(models) for name in models.keys()}

    def blend_predictions(self, 
                         X: pd.DataFrame,
                         market_conditions: Dict) -> np.ndarray:
        """Dynamic blending driven by market conditions."""
        predictions = {}

        # Collect each model's prediction
        for name, model in self.models.items():
            pred = model.predict(X)
            if isinstance(pred, tuple):  # some ensembles return (mean, std)
                pred = pred[0]
            predictions[name] = pred

        # Adjust the weights for the current market conditions
        adjusted_weights = self._adjust_weights_for_market(market_conditions)

        # Blend
        blended = np.zeros_like(list(predictions.values())[0])
        for name, pred in predictions.items():
            blended += adjusted_weights[name] * pred

        return blended

    def _adjust_weights_for_market(self, market_conditions: Dict) -> Dict:
        """Adjust the weights for the current market conditions."""
        adjusted_weights = self.current_weights.copy()

        # High volatility: favor conservative models
        if market_conditions.get('volatility', 0) > 0.02:
            if 'ridge' in adjusted_weights:
                adjusted_weights['ridge'] *= 1.5
            if 'random_forest' in adjusted_weights:
                adjusted_weights['random_forest'] *= 0.8

        # Strong trend: favor trend-following models
        if abs(market_conditions.get('trend_strength', 0)) > 0.5:
            if 'xgboost' in adjusted_weights:
                adjusted_weights['xgboost'] *= 1.3
            if 'lstm' in adjusted_weights:
                adjusted_weights['lstm'] *= 1.2

        # Normalize
        total = sum(adjusted_weights.values())
        adjusted_weights = {k: v/total for k, v in adjusted_weights.items()}

        return adjusted_weights

    def update_weights_online(self, 
                            predictions: Dict[str, np.ndarray],
                            actual: np.ndarray):
        """Update the weights online from realized errors."""
        # Track each model's error
        for name, pred in predictions.items():
            error = np.mean(np.abs(pred - actual))
            self.performance_history[name].append(error)

            # Keep only the lookback window
            if len(self.performance_history[name]) > self.lookback_window:
                self.performance_history[name].pop(0)

        # Re-weight once there is enough history
        if all(len(hist) >= 10 for hist in self.performance_history.values()):
            # Average error per model
            avg_errors = {
                name: np.mean(hist) 
                for name, hist in self.performance_history.items()
            }

            # Inverse error as the raw weight
            min_error = min(avg_errors.values())
            for name in self.models.keys():
                self.current_weights[name] = min_error / (avg_errors[name] + 1e-6)

            # Normalize
            total = sum(self.current_weights.values())
            self.current_weights = {k: v/total for k, v in self.current_weights.items()}
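
A blending sketch with two toy models whose dictionary keys match the names _adjust_weights_for_market looks for ('ridge', 'random_forest'); the volatility figure passed in is illustrative:

rng = np.random.default_rng(11)
X_tr = pd.DataFrame(rng.normal(size=(400, 5)))
y_tr = X_tr[0].values + rng.normal(scale=0.1, size=400)

models = {
    'ridge': Ridge(alpha=1.0).fit(X_tr, y_tr),
    'random_forest': RandomForestRegressor(n_estimators=50).fit(X_tr, y_tr),
}
blender = DynamicModelBlender(models, lookback_window=50)

X_new = pd.DataFrame(rng.normal(size=(20, 5)))
# A high-volatility reading tilts the blend toward the conservative ridge model
blended = blender.blend_predictions(X_new, {'volatility': 0.03})
print(blended[:3], blender.current_weights)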

4.2 Bayesian Model Averaging

Bayesian model averaging weights each model by its posterior probability given the observed data: posterior_i ∝ prior_i × likelihood_i, where the likelihood measures how well model i explains the realized residuals.

class BayesianModelAveraging:
    def __init__(self, models: List[Tuple[str, Any]], prior_weights: np.ndarray = None):
        self.models = models
        self.n_models = len(models)

        # Prior: uniform unless specified
        if prior_weights is None:
            self.prior_weights = np.ones(self.n_models) / self.n_models
        else:
            self.prior_weights = prior_weights

        # The posterior starts at the prior
        self.posterior_weights = self.prior_weights.copy()

    def update_posterior(self, X: np.ndarray, y: np.ndarray):
        """Bayes update of the posterior weights."""
        log_likelihoods = np.zeros(self.n_models)

        for i, (name, model) in enumerate(self.models):
            # Predict
            pred = model.predict(X)
            if isinstance(pred, tuple):  # some ensembles return (mean, std)
                pred = pred[0]

            # Gaussian likelihood of the residuals
            residuals = y - pred
            sigma = max(np.std(residuals), 1e-8)

            # Log-likelihood
            log_likelihoods[i] = -0.5 * np.sum((residuals / sigma) ** 2) - \
                                 len(residuals) * np.log(sigma * np.sqrt(2 * np.pi))

        # Exponentiate after subtracting the max log-likelihood across models,
        # for numerical stability
        likelihoods = np.exp(log_likelihoods - np.max(log_likelihoods))

        # Bayes update: posterior ∝ prior × likelihood
        self.posterior_weights = self.prior_weights * likelihoods
        self.posterior_weights /= np.sum(self.posterior_weights)

    def predict(self, X: np.ndarray) -> Dict[str, np.ndarray]:
        """Prediction by Bayesian model averaging."""
        predictions = []

        for i, (name, model) in enumerate(self.models):
            pred = model.predict(X)
            if isinstance(pred, tuple):
                pred = pred[0]
            predictions.append(pred)

        predictions = np.array(predictions)

        # Posterior-weighted mean
        mean_prediction = np.average(predictions, axis=0, weights=self.posterior_weights)

        # Predictive uncertainty (weighted variance across models)
        variance = np.average(
            (predictions - mean_prediction) ** 2, 
            axis=0, 
            weights=self.posterior_weights
        )

        return {
            'mean': mean_prediction,
            'std': np.sqrt(variance),
            'model_weights': self.posterior_weights,
            'individual_predictions': {
                name: predictions[i] 
                for i, (name, _) in enumerate(self.models)
            }
        }
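
A walk-forward sketch: two stand-in models are fitted once, then update_posterior re-weights them on each new batch of realized outcomes before the next prediction (the data and batch size are illustrative):

from sklearn.linear_model import LinearRegression

rng = np.random.default_rng(13)
X_all = rng.normal(size=(600, 4))
y_all = X_all[:, 0] * 0.8 + rng.normal(scale=0.1, size=600)

bma = BayesianModelAveraging([
    ('ols', LinearRegression().fit(X_all[:400], y_all[:400])),
    ('ridge', Ridge(alpha=10.0).fit(X_all[:400], y_all[:400])),
])

# Observe outcomes batch by batch, update the posterior, then predict
for start in range(400, 600, 50):
    X_batch, y_batch = X_all[start:start + 50], y_all[start:start + 50]
    bma.update_posterior(X_batch, y_batch)
    result = bma.predict(X_batch)
    print(start, np.round(result['model_weights'], 3))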

5. Worked Example: An Integrated Ensemble System

class IntegratedEnsembleTradingSystem:
    def __init__(self, config: Dict):
        self.config = config

        # Initialize the components
        self.bagging_ensemble = CryptoBaggingEnsemble()
        self.boosting_model = CryptoGradientBoosting()
        self.stacking_ensemble = CryptoStackingEnsemble()
        self.regime_learner = MarketRegimeMetaLearner()
        self.timeframe_ensemble = MultiTimeframeEnsemble()
        self.model_blender = None
        self.bayesian_averager = None

        # Data stores
        self.feature_store = {}
        self.prediction_history = []

    def prepare_features(self, market_data: pd.DataFrame) -> pd.DataFrame:
        """Comprehensive feature engineering."""
        features = pd.DataFrame(index=market_data.index)

        # Price-based features
        features['returns'] = market_data['close'].pct_change()
        features['log_returns'] = np.log(market_data['close'] / market_data['close'].shift(1))
        features['high_low_ratio'] = market_data['high'] / market_data['low']
        features['close_open_ratio'] = market_data['close'] / market_data['open']

        # Volume-based features
        features['volume_sma_ratio'] = market_data['volume'] / market_data['volume'].rolling(20).mean()
        features['volume_std'] = market_data['volume'].rolling(20).std()

        # Technical indicators
        # RSI
        features['rsi'] = self._calculate_rsi(market_data['close'])

        # MACD
        features['macd'], features['macd_signal'] = self._calculate_macd(market_data['close'])
        features['macd_diff'] = features['macd'] - features['macd_signal']

        # Bollinger bands
        sma_20 = market_data['close'].rolling(20).mean()
        std_20 = market_data['close'].rolling(20).std()
        features['bb_upper'] = sma_20 + 2 * std_20
        features['bb_lower'] = sma_20 - 2 * std_20
        features['bb_position'] = (market_data['close'] - features['bb_lower']) / \
                                 (features['bb_upper'] - features['bb_lower'])

        # Market-structure features
        features['volatility'] = features['returns'].rolling(20).std()
        features['skewness'] = features['returns'].rolling(50).skew()
        features['kurtosis'] = features['returns'].rolling(50).kurt()

        # Microstructure features (assumes the data carries 'bid' and 'ask' columns)
        features['spread'] = market_data['ask'] - market_data['bid']
        features['mid_price'] = (market_data['ask'] + market_data['bid']) / 2
        features['spread_ratio'] = features['spread'] / features['mid_price']

        return features.dropna()

    def train_all_models(self, 
                        market_data: pd.DataFrame,
                        target: pd.Series):
        """Train every component model."""
        print("Preparing features...")
        X = self.prepare_features(market_data)

        # prepare_features drops warm-up rows and a forward-looking target is
        # NaN at the tail, so align the target with X by index before splitting
        if isinstance(target, pd.Series):
            y_series = target.reindex(X.index)
        else:
            # Assume an array aligned with the leading market_data rows
            y_series = pd.Series(np.asarray(target),
                                 index=market_data.index[:len(target)]).reindex(X.index)
        valid = y_series.notna()
        X = X[valid]
        y = y_series[valid].values

        # Chronological split
        train_size = int(len(X) * 0.8)
        X_train = X[:train_size]
        y_train = y[:train_size]
        X_val = X[train_size:]
        y_val = y[train_size:]
        market_train = market_data.loc[:X_train.index[-1]]

        print("Training bagging ensemble...")
        self.bagging_ensemble.fit(X_train.values, y_train)

        print("Training gradient boosting...")
        self.boosting_model.fit(X_train, y_train)

        print("Training stacking ensemble...")
        self.stacking_ensemble.fit(X_train.values, y_train)

        print("Training regime-specific models...")
        self.regime_learner.train_regime_specific_models(
            X_train, y_train, market_train
        )

        print("Training multi-timeframe models...")
        self.timeframe_ensemble.train_timeframe_models(
            market_train, 
            pd.Series(y_train, index=X_train.index)
        )

        # Initialize the dynamic blender
        all_models = {
            'bagging': self.bagging_ensemble,
            'boosting': self.boosting_model,
            'stacking': self.stacking_ensemble
        }
        self.model_blender = DynamicModelBlender(all_models)

        # Initialize the Bayesian averager
        model_list = [
            ('bagging', self.bagging_ensemble),
            ('boosting', self.boosting_model),
            ('stacking', self.stacking_ensemble)
        ]
        self.bayesian_averager = BayesianModelAveraging(model_list)

        # Evaluate on the validation split
        self._evaluate_models(X_val, y_val)

    def predict(self, market_data: pd.DataFrame) -> Dict[str, Any]:
        """Integrated prediction."""
        # Build the features
        X = self.prepare_features(market_data)

        predictions = {}

        # Individual model predictions
        predictions['bagging'], predictions['bagging_std'] = \
            self.bagging_ensemble.predict(X.values)

        predictions['boosting'] = self.boosting_model.predict(X)

        predictions['stacking'] = self.stacking_ensemble.predict(X.values)

        # Regime-adaptive prediction
        predictions['regime'] = self.regime_learner.predict(X, market_data)

        # Multi-timeframe predictions
        tf_predictions = self.timeframe_ensemble.predict_ensemble(market_data)
        predictions.update({f'tf_{k}': v for k, v in tf_predictions.items()})

        # Dynamic blending
        market_conditions = self._analyze_market_conditions(market_data)
        predictions['blended'] = self.model_blender.blend_predictions(
            X, market_conditions
        )

        # Bayesian averaging
        bayesian_result = self.bayesian_averager.predict(X.values)
        predictions['bayesian_mean'] = bayesian_result['mean']
        predictions['bayesian_std'] = bayesian_result['std']

        # Final ensemble prediction
        ensemble_components = [
            predictions['bagging'],
            predictions['boosting'],
            predictions['stacking'],
            predictions['regime'],
            predictions['blended'],
            predictions['bayesian_mean']
        ]

        # Confidence-based weighted average
        weights = self._calculate_confidence_weights(predictions)
        predictions['final'] = np.average(
            ensemble_components, 
            axis=0, 
            weights=weights
        )

        # Attach metadata
        predictions['metadata'] = {
            'timestamp': pd.Timestamp.now(),
            'model_weights': weights,
            'market_regime': self.regime_learner.current_regime,
            'confidence': self._calculate_prediction_confidence(predictions)
        }

        return predictions

    def _analyze_market_conditions(self, market_data: pd.DataFrame) -> Dict:
        """Summarize the current market conditions."""
        recent_data = market_data.tail(100)

        returns = recent_data['close'].pct_change()

        conditions = {
            'volatility': returns.std(),
            'trend_strength': (recent_data['close'].iloc[-1] - recent_data['close'].iloc[0]) / recent_data['close'].iloc[0],
            'volume_trend': recent_data['volume'].mean() / market_data['volume'].mean(),
            'spread': (recent_data['ask'] - recent_data['bid']).mean()
        }

        return conditions

    def _calculate_confidence_weights(self, predictions: Dict) -> np.ndarray:
        """Weight the components by prediction confidence."""
        # Estimate confidence from the dispersion across the component predictions
        pred_values = [
            predictions['bagging'],
            predictions['boosting'],
            predictions['stacking'],
            predictions['regime'],
            predictions['blended'],
            predictions['bayesian_mean']
        ]

        # Smaller cross-model dispersion means higher confidence
        std_dev = np.std(pred_values, axis=0)
        confidence = 1 / (1 + std_dev)

        # Historical performance priors (placeholder values; in production these
        # would be estimated from tracked out-of-sample performance)
        performance_weights = np.array([0.9, 0.85, 0.88, 0.82, 0.9, 0.92])

        # Final weights
        weights = confidence.mean() * performance_weights
        weights = weights / weights.sum()

        return weights

    def _calculate_prediction_confidence(self, predictions: Dict) -> float:
        """Overall confidence in the prediction."""
        # Agreement between the component predictions
        pred_values = [
            predictions['bagging'],
            predictions['boosting'],
            predictions['stacking'],
            predictions['regime']
        ]

        # Mean pairwise correlation
        correlations = []
        for i in range(len(pred_values)):
            for j in range(i+1, len(pred_values)):
                corr = np.corrcoef(pred_values[i], pred_values[j])[0, 1]
                correlations.append(corr)

        avg_correlation = np.mean(correlations)

        # Cross-model variance
        prediction_variance = np.var(pred_values, axis=0).mean()

        # Confidence score in [0, 1]
        confidence = avg_correlation * (1 / (1 + prediction_variance))

        return np.clip(confidence, 0, 1)

    def _evaluate_models(self, X_val: pd.DataFrame, y_val: np.ndarray):
        """Evaluate the component models."""
        from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

        results = {}

        # Wrap each model's prediction call (bagging returns a (mean, std) tuple)
        models = {
            'bagging': lambda x: self.bagging_ensemble.predict(x.values)[0],
            'boosting': lambda x: self.boosting_model.predict(x),
            'stacking': lambda x: self.stacking_ensemble.predict(x.values)
        }

        for name, predict_func in models.items():
            pred = predict_func(X_val)

            results[name] = {
                'mse': mean_squared_error(y_val, pred),
                'mae': mean_absolute_error(y_val, pred),
                'r2': r2_score(y_val, pred)
            }

        print("\nModel Evaluation Results:")
        for name, metrics in results.items():
            print(f"\n{name}:")
            for metric, value in metrics.items():
                print(f"  {metric}: {value:.6f}")

        return results

    def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
        """Compute the RSI."""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    def _calculate_macd(self, prices: pd.Series) -> Tuple[pd.Series, pd.Series]:
        """Compute the MACD line and its signal line."""
        ema_12 = prices.ewm(span=12).mean()
        ema_26 = prices.ewm(span=26).mean()
        macd = ema_12 - ema_26
        signal = macd.ewm(span=9).mean()
        return macd, signal

# Usage example
def main():
    # Load the data
    market_data = pd.read_csv('crypto_market_data.csv', parse_dates=['timestamp'])
    market_data.set_index('timestamp', inplace=True)

    # Target: forward 12-bar return (e.g. 1 hour ahead on 5-minute bars).
    # Keep it as an index-aligned Series; NaNs are dropped during training.
    target = market_data['close'].pct_change(12).shift(-12)

    # Initialize the system
    config = {
        'symbol': 'BTC/USDT',
        'target_horizon': '1h'
    }

    system = IntegratedEnsembleTradingSystem(config)

    # Train the models
    system.train_all_models(market_data, target)

    # Run a prediction
    latest_data = market_data.tail(1000)
    predictions = system.predict(latest_data)

    print(f"\nFinal prediction: {predictions['final'][-1]:.6f}")
    print(f"Confidence: {predictions['metadata']['confidence']:.2%}")
    print(f"Current regime: {predictions['metadata']['market_regime']}")

if __name__ == "__main__":
    main()

6. Performance Optimization and Backtesting

class EnsembleBacktester:
    def __init__(self, ensemble_system: IntegratedEnsembleTradingSystem):
        self.ensemble_system = ensemble_system
        self.results = {}

    def backtest(self, 
                market_data: pd.DataFrame,
                initial_capital: float = 10000,
                position_size: float = 0.1,
                transaction_cost: float = 0.001):
        """Backtest the ensemble system."""
        capital = initial_capital
        position = 0
        trades = []
        equity_curve = []

        # Rolling-window backtest
        window_size = 1000
        step_size = 100

        for i in range(window_size, len(market_data), step_size):
            # Training window (a production backtest would retrain the system
            # on this slice at every step; it is kept here to mark the split)
            train_data = market_data[i-window_size:i]

            # Test window
            test_data = market_data[i:i+step_size]

            # Predict
            predictions = self.ensemble_system.predict(test_data)

            # Generate trading signals
            signals = self._generate_signals(predictions['final'])

            # Execute the trades
            for j, signal in enumerate(signals):
                if j >= len(test_data):
                    break

                current_price = test_data['close'].iloc[j]

                if signal > 0.5 and position <= 0:  # buy signal
                    # Position sizing
                    trade_size = capital * position_size / current_price
                    cost = trade_size * current_price * transaction_cost

                    position = trade_size
                    capital -= trade_size * current_price + cost

                    trades.append({
                        'timestamp': test_data.index[j],
                        'type': 'buy',
                        'price': current_price,
                        'size': trade_size,
                        'cost': cost
                    })

                elif signal < -0.5 and position >= 0:  # sell signal
                    if position > 0:
                        # Close the existing long position
                        proceeds = position * current_price
                        cost = proceeds * transaction_cost
                        capital += proceeds - cost

                        trades.append({
                            'timestamp': test_data.index[j],
                            'type': 'sell',
                            'price': current_price,
                            'size': position,
                            'cost': cost
                        })

                    # Open a short position (with its own transaction cost)
                    trade_size = capital * position_size / current_price
                    cost = trade_size * current_price * transaction_cost
                    position = -trade_size
                    capital += trade_size * current_price - cost

                # Record the equity curve
                mark_to_market = position * current_price
                total_equity = capital + mark_to_market

                equity_curve.append({
                    'timestamp': test_data.index[j],
                    'equity': total_equity,
                    'capital': capital,
                    'position': position,
                    'position_value': mark_to_market
                })

        # Analyze the results
        self.results = self._analyze_results(
            equity_curve, 
            trades, 
            initial_capital
        )

        return self.results

    def _generate_signals(self, predictions: np.ndarray) -> np.ndarray:
        """Turn predictions into trading signals."""
        # Threshold-based signal generation
        signals = np.zeros_like(predictions)

        # Strong upward forecast
        signals[predictions > 0.002] = 1

        # Strong downward forecast
        signals[predictions < -0.002] = -1

        return signals

    def _analyze_results(self, 
                        equity_curve: List[Dict],
                        trades: List[Dict],
                        initial_capital: float) -> Dict:
        """Analyze the backtest results."""
        equity_df = pd.DataFrame(equity_curve)
        trades_df = pd.DataFrame(trades)

        # Returns
        equity_df['returns'] = equity_df['equity'].pct_change()
        total_return = (equity_df['equity'].iloc[-1] / initial_capital - 1) * 100

        # Sharpe ratio (sqrt(252) assumes daily observations; adjust the
        # annualization factor for intraday data)
        sharpe_ratio = np.sqrt(252) * equity_df['returns'].mean() / equity_df['returns'].std()

        # Maximum drawdown
        rolling_max = equity_df['equity'].expanding().max()
        drawdown = (equity_df['equity'] - rolling_max) / rolling_max
        max_drawdown = drawdown.min() * 100

        # Win rate (a crude pairing of consecutive entry/exit trades)
        if len(trades_df) > 1:
            winning_trades = 0
            for i in range(0, len(trades_df)-1, 2):
                if i+1 < len(trades_df):
                    entry_price = trades_df.iloc[i]['price']
                    exit_price = trades_df.iloc[i+1]['price']
                    if trades_df.iloc[i]['type'] == 'buy':
                        if exit_price > entry_price:
                            winning_trades += 1
                    else:
                        if exit_price < entry_price:
                            winning_trades += 1

            win_rate = winning_trades / (len(trades_df) // 2) * 100
        else:
            win_rate = 0

        return {
            'total_return': total_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'win_rate': win_rate,
            'num_trades': len(trades_df),
            'final_equity': equity_df['equity'].iloc[-1],
            'equity_curve': equity_df
        }
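
A sketch of wiring the backtester to a trained system; `system` and `market_data` are assumed to come from the main() example in section 5 (OHLCV plus bid/ask columns):

backtester = EnsembleBacktester(system)
results = backtester.backtest(
    market_data,
    initial_capital=10000,
    position_size=0.1,      # 10% of capital per trade
    transaction_cost=0.001  # 10 bps per side
)

print(f"Total return: {results['total_return']:.2f}%")
print(f"Sharpe ratio: {results['sharpe_ratio']:.2f}")
print(f"Max drawdown: {results['max_drawdown']:.2f}%")
print(f"Win rate:     {results['win_rate']:.1f}% over {results['num_trades']} trades")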

Summary

This document covered advanced ensemble learning and meta-learning strategies for cryptocurrency trading.

Key points

  1. Ensemble learning
    - Bagging: stabilizes predictions by averaging over bootstrap samples
    - Boosting: corrects errors sequentially
    - Stacking: combines complementary models through a meta-model

  2. Meta-learning strategies
    - Market-regime adaptation
    - Multi-timeframe integration
    - Dynamic model selection

  3. Model blending
    - Dynamic weighting
    - Bayesian model averaging
    - Confidence-based integration

  4. Implementation best practices
    - Comprehensive feature engineering
    - Real-time adaptation mechanisms
    - A robust backtesting framework

Combined, these techniques adapt to changing market conditions and deliver more stable predictive performance than any single model.