ML Documentation

市場レジーム検出と適応的戦略

1. はじめに

暗号通貨市場は、伝統的な金融市場と比較して、より頻繁かつ劇的なレジーム変化を経験します。ブル市場、ベア市場、高ボラティリティ期間、レンジ相場など、異なる市場状態に応じて最適な取引戦略は大きく異なります。本ドキュメントでは、機械学習を活用した市場レジーム検出と、それに基づく適応的戦略について詳しく解説します。

2. Hidden Markov Models (HMM)によるレジーム検出

2.1 HMMの基本実装

import numpy as np
import pandas as pd
from hmmlearn import hmm
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from typing import Dict, List, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')

class MarketRegimeHMM:
    def __init__(self, 
                 n_states: int = 4,
                 covariance_type: str = 'full',
                 n_iter: int = 100):
        """
        Hidden Markov Modelによる市場レジーム検出

        Parameters:
        -----------
        n_states: レジーム数(デフォルト4: Bull, Bear, High Vol, Range)
        covariance_type: 共分散行列のタイプ
        n_iter: 学習イテレーション数
        """
        self.n_states = n_states
        self.model = hmm.GaussianHMM(
            n_components=n_states,
            covariance_type=covariance_type,
            n_iter=n_iter,
            random_state=42
        )
        self.scaler = StandardScaler()
        self.regime_labels = {
            0: 'Bull Market',
            1: 'Bear Market', 
            2: 'High Volatility',
            3: 'Range Bound'
        }

    def prepare_features(self, market_data: pd.DataFrame) -> np.ndarray:
        """市場データから特徴量を抽出"""
        features = pd.DataFrame(index=market_data.index)

        # リターン
        features['returns'] = market_data['close'].pct_change()

        # ボラティリティ(実現ボラティリティ)
        features['volatility'] = features['returns'].rolling(20).std()

        # ボリューム変化
        features['volume_change'] = market_data['volume'].pct_change()

        # 価格モメンタム
        features['momentum_5'] = market_data['close'].pct_change(5)
        features['momentum_20'] = market_data['close'].pct_change(20)

        # RSI
        features['rsi'] = self._calculate_rsi(market_data['close'])

        # ボリュームプロファイル
        features['volume_ma_ratio'] = (
            market_data['volume'] / 
            market_data['volume'].rolling(20).mean()
        )

        # 高値安値スプレッド(ボラティリティの代理指標)
        features['high_low_spread'] = (
            (market_data['high'] - market_data['low']) / market_data['close']
        )

        # トレンド強度
        sma_20 = market_data['close'].rolling(20).mean()
        sma_50 = market_data['close'].rolling(50).mean()
        features['trend_strength'] = (sma_20 - sma_50) / sma_50

        # 市場の歪度
        features['return_skew'] = features['returns'].rolling(60).skew()

        # 欠損値を除去
        features = features.dropna()

        return features

    def fit(self, market_data: pd.DataFrame):
        """HMMモデルを学習"""
        # 特徴量を準備
        features = self.prepare_features(market_data)

        # スケーリング
        features_scaled = self.scaler.fit_transform(features)

        # モデルを学習
        self.model.fit(features_scaled)

        # レジームを予測
        regimes = self.model.predict(features_scaled)

        # レジームの特性を分析
        self._analyze_regime_characteristics(features, regimes)

        return self

    def predict_regime(self, market_data: pd.DataFrame) -> np.ndarray:
        """現在の市場レジームを予測"""
        features = self.prepare_features(market_data)
        features_scaled = self.scaler.transform(features)

        # レジームを予測
        regimes = self.model.predict(features_scaled)

        return regimes

    def predict_regime_probabilities(self, market_data: pd.DataFrame) -> np.ndarray:
        """各レジームの確率を予測"""
        features = self.prepare_features(market_data)
        features_scaled = self.scaler.transform(features)

        # 各レジームの確率を計算
        _, posteriors = self.model.score_samples(features_scaled)

        return posteriors

    def get_transition_matrix(self) -> np.ndarray:
        """レジーム間の遷移確率行列を取得"""
        return self.model.transmat_

    def _analyze_regime_characteristics(self, features: pd.DataFrame, regimes: np.ndarray):
        """各レジームの特性を分析"""
        self.regime_stats = {}

        for regime in range(self.n_states):
            regime_mask = regimes == regime
            regime_features = features[regime_mask]

            stats = {
                'mean_return': regime_features['returns'].mean(),
                'volatility': regime_features['volatility'].mean(),
                'volume_profile': regime_features['volume_ma_ratio'].mean(),
                'trend_strength': regime_features['trend_strength'].mean(),
                'duration': np.mean(self._get_regime_durations(regimes, regime))
            }

            # レジームラベルを自動割り当て
            if regime not in self.regime_labels:
                self.regime_labels[regime] = self._assign_regime_label(stats)

            self.regime_stats[regime] = stats

    def _assign_regime_label(self, stats: Dict) -> str:
        """統計情報に基づいてレジームラベルを割り当て"""
        if stats['mean_return'] > 0.001 and stats['trend_strength'] > 0.01:
            return 'Bull Market'
        elif stats['mean_return'] < -0.001 and stats['trend_strength'] < -0.01:
            return 'Bear Market'
        elif stats['volatility'] > 0.03:
            return 'High Volatility'
        else:
            return 'Range Bound'

    def _get_regime_durations(self, regimes: np.ndarray, target_regime: int) -> List[int]:
        """特定レジームの継続期間を計算"""
        durations = []
        current_duration = 0

        for regime in regimes:
            if regime == target_regime:
                current_duration += 1
            else:
                if current_duration > 0:
                    durations.append(current_duration)
                current_duration = 0

        if current_duration > 0:
            durations.append(current_duration)

        return durations if durations else [0]

    def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
        """RSIを計算"""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi

    def plot_regimes(self, market_data: pd.DataFrame, regimes: np.ndarray):
        """レジームの可視化"""
        fig, axes = plt.subplots(3, 1, figsize=(15, 10), sharex=True)

        # 価格チャート with レジーム
        ax1 = axes[0]
        for regime in range(self.n_states):
            mask = regimes == regime
            ax1.scatter(market_data.index[mask], 
                       market_data['close'][mask],
                       label=self.regime_labels[regime],
                       alpha=0.5, s=10)
        ax1.plot(market_data['close'], 'k-', alpha=0.3, linewidth=0.5)
        ax1.set_ylabel('Price')
        ax1.legend()
        ax1.set_title('Market Regimes')

        # ボラティリティ
        ax2 = axes[1]
        returns = market_data['close'].pct_change()
        volatility = returns.rolling(20).std()
        ax2.plot(volatility, label='Realized Volatility')
        ax2.set_ylabel('Volatility')
        ax2.legend()

        # レジーム確率
        ax3 = axes[2]
        probabilities = self.predict_regime_probabilities(market_data)
        for i in range(self.n_states):
            ax3.plot(market_data.index[len(market_data)-len(probabilities):],
                    probabilities[:, i], 
                    label=self.regime_labels[i])
        ax3.set_ylabel('Regime Probability')
        ax3.set_xlabel('Date')
        ax3.legend()

        plt.tight_layout()
        plt.show()

2.2 高度なHMM実装(GARCH効果を含む)

class GARCHRegimeHMM:
    def __init__(self, n_states: int = 4):
        self.n_states = n_states
        self.model = None
        self.garch_params = {}

    def fit(self, returns: pd.Series):
        """GARCH効果を含むHMMを学習"""
        from arch import arch_model

        # 各レジームのGARCHパラメータを推定
        # 初期クラスタリングでレジームを仮決定
        initial_regimes = self._initial_clustering(returns)

        # 各レジームでGARCHモデルを推定
        for regime in range(self.n_states):
            regime_returns = returns[initial_regimes == regime]

            if len(regime_returns) > 100:
                # GARCH(1,1)モデル
                garch = arch_model(regime_returns, vol='Garch', p=1, q=1)
                garch_fit = garch.fit(disp='off')

                self.garch_params[regime] = {
                    'omega': garch_fit.params['omega'],
                    'alpha': garch_fit.params['alpha[1]'],
                    'beta': garch_fit.params['beta[1]']
                }
            else:
                # デフォルトパラメータ
                self.garch_params[regime] = {
                    'omega': 0.00001,
                    'alpha': 0.05,
                    'beta': 0.94
                }

        # HMMパラメータを推定
        self._estimate_hmm_parameters(returns, initial_regimes)

        return self

    def _initial_clustering(self, returns: pd.Series) -> np.ndarray:
        """K-meansクラスタリングで初期レジームを決定"""
        from sklearn.cluster import KMeans

        # 特徴量を作成
        features = pd.DataFrame()
        features['returns'] = returns
        features['abs_returns'] = np.abs(returns)
        features['squared_returns'] = returns ** 2
        features['volatility'] = returns.rolling(20).std()

        features = features.dropna()

        # クラスタリング
        kmeans = KMeans(n_clusters=self.n_states, random_state=42)
        regimes = kmeans.fit_predict(features)

        # 元のインデックスに合わせる
        full_regimes = np.zeros(len(returns))
        full_regimes[features.index] = regimes

        return full_regimes.astype(int)

    def _estimate_hmm_parameters(self, returns: pd.Series, initial_regimes: np.ndarray):
        """HMMパラメータを推定"""
        # 遷移確率行列を推定
        transition_matrix = self._estimate_transition_matrix(initial_regimes)

        # 初期状態確率
        initial_probs = np.bincount(initial_regimes) / len(initial_regimes)

        self.model = {
            'transition_matrix': transition_matrix,
            'initial_probs': initial_probs,
            'garch_params': self.garch_params
        }

    def _estimate_transition_matrix(self, regimes: np.ndarray) -> np.ndarray:
        """遷移確率行列を推定"""
        transition_counts = np.zeros((self.n_states, self.n_states))

        for i in range(len(regimes) - 1):
            transition_counts[regimes[i], regimes[i+1]] += 1

        # 正規化
        transition_matrix = transition_counts / transition_counts.sum(axis=1, keepdims=True)

        # ゼロ確率を回避
        transition_matrix = np.where(transition_matrix == 0, 1e-6, transition_matrix)

        return transition_matrix

    def predict_volatility(self, returns: pd.Series, regime: int) -> pd.Series:
        """レジーム条件付きボラティリティを予測"""
        params = self.garch_params[regime]

        # GARCH(1,1)ボラティリティ
        volatility = pd.Series(index=returns.index, dtype=float)
        volatility.iloc[0] = returns.std()  # 初期値

        for t in range(1, len(returns)):
            volatility.iloc[t] = np.sqrt(
                params['omega'] + 
                params['alpha'] * returns.iloc[t-1]**2 + 
                params['beta'] * volatility.iloc[t-1]**2
            )

        return volatility

3. 変化点検出アルゴリズム

3.1 オンライン変化点検出

class OnlineChangePointDetector:
    def __init__(self, 
                 hazard_rate: float = 0.01,
                 detection_threshold: float = 0.5):
        """
        ベイジアンオンライン変化点検出

        Parameters:
        -----------
        hazard_rate: 変化が発生する事前確率
        detection_threshold: 検出閾値
        """
        self.hazard_rate = hazard_rate
        self.detection_threshold = detection_threshold
        self.run_length_probabilities = None

    def detect_change_points(self, data: pd.Series) -> List[int]:
        """オンラインで変化点を検出"""
        change_points = []

        # 初期化
        self.run_length_probabilities = np.array([1.0])

        for t in range(1, len(data)):
            # 予測分布を計算
            pred_prob = self._predictive_probability(data[:t+1])

            # 成長確率(変化なし)
            growth_probs = self.run_length_probabilities * pred_prob * (1 - self.hazard_rate)

            # 変化点確率
            cp_prob = np.sum(self.run_length_probabilities * pred_prob * self.hazard_rate)

            # 新しいラン長確率
            self.run_length_probabilities = np.append(cp_prob, growth_probs)

            # 正規化
            self.run_length_probabilities /= np.sum(self.run_length_probabilities)

            # 最大長制限(メモリ効率)
            if len(self.run_length_probabilities) > 100:
                self.run_length_probabilities = self.run_length_probabilities[-100:]

            # 変化点の検出
            if self.run_length_probabilities[0] > self.detection_threshold:
                change_points.append(t)

        return change_points

    def _predictive_probability(self, data: np.ndarray) -> float:
        """予測確率を計算(正規分布を仮定)"""
        if len(data) < 2:
            return 1.0

        # オンライン平均と分散
        mean = np.mean(data)
        var = np.var(data) + 1e-6  # 数値安定性

        # 最新データ点の尤度
        likelihood = np.exp(-0.5 * (data[-1] - mean)**2 / var) / np.sqrt(2 * np.pi * var)

        return likelihood

    def plot_change_points(self, data: pd.Series, change_points: List[int]):
        """変化点を可視化"""
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 8), sharex=True)

        # データプロット
        ax1.plot(data.index, data.values, 'b-', alpha=0.7)

        # 変化点をマーク
        for cp in change_points:
            ax1.axvline(x=data.index[cp], color='r', linestyle='--', alpha=0.5)

        ax1.set_ylabel('Value')
        ax1.set_title('Data with Change Points')

        # ラン長確率の推移
        if self.run_length_probabilities is not None:
            ax2.plot(self.run_length_probabilities)
            ax2.set_ylabel('Run Length Probability')
            ax2.set_xlabel('Time')

        plt.tight_layout()
        plt.show()

3.2 CUSUM(累積和)変化点検出

class CUSUMChangeDetector:
    def __init__(self, threshold: float = 5.0, drift: float = 0.5):
        self.threshold = threshold
        self.drift = drift

    def detect(self, data: pd.Series) -> Tuple[List[int], Dict]:
        """CUSUM統計量による変化点検出"""
        mean = data.mean()
        std = data.std()

        # 正規化
        normalized_data = (data - mean) / std

        # CUSUM統計量
        cusum_pos = np.zeros(len(data))
        cusum_neg = np.zeros(len(data))

        change_points = []

        for i in range(1, len(data)):
            # 正の変化
            cusum_pos[i] = max(0, cusum_pos[i-1] + normalized_data.iloc[i] - self.drift)

            # 負の変化
            cusum_neg[i] = max(0, cusum_neg[i-1] - normalized_data.iloc[i] - self.drift)

            # 閾値チェック
            if cusum_pos[i] > self.threshold or cusum_neg[i] > self.threshold:
                change_points.append(i)
                # リセット
                cusum_pos[i] = 0
                cusum_neg[i] = 0

        # 統計情報
        stats = {
            'cusum_pos': cusum_pos,
            'cusum_neg': cusum_neg,
            'num_change_points': len(change_points),
            'mean_segment_length': len(data) / (len(change_points) + 1) if change_points else len(data)
        }

        return change_points, stats

    def adaptive_threshold(self, data: pd.Series, window: int = 100) -> pd.Series:
        """適応的閾値を計算"""
        rolling_std = data.rolling(window).std()
        adaptive_threshold = self.threshold * rolling_std

        return adaptive_threshold

4. 適応的戦略切り替えシステム

4.1 戦略セレクター

class AdaptiveStrategySelector:
    def __init__(self):
        self.strategies = {}
        self.performance_history = {}
        self.current_regime = None
        self.regime_detector = MarketRegimeHMM()

    def register_strategy(self, regime: str, strategy):
        """レジーム別の戦略を登録"""
        self.strategies[regime] = strategy
        self.performance_history[regime] = []

    def select_strategy(self, market_data: pd.DataFrame) -> any:
        """現在の市場状況に最適な戦略を選択"""
        # レジームを検出
        regimes = self.regime_detector.predict_regime(market_data)
        current_regime_idx = regimes[-1]
        current_regime = self.regime_detector.regime_labels[current_regime_idx]

        # レジーム遷移確率を考慮
        regime_probs = self.regime_detector.predict_regime_probabilities(market_data)[-1]

        # 最も確率の高いレジームの戦略を選択
        if regime_probs[current_regime_idx] > 0.6:
            # 確信度が高い場合
            selected_strategy = self.strategies.get(current_regime)
        else:
            # 確信度が低い場合はブレンド戦略
            selected_strategy = self._create_blended_strategy(regime_probs)

        self.current_regime = current_regime

        return selected_strategy

    def _create_blended_strategy(self, regime_probs: np.ndarray):
        """複数戦略のブレンド"""
        class BlendedStrategy:
            def __init__(self, strategies, weights):
                self.strategies = strategies
                self.weights = weights

            def generate_signal(self, data):
                signals = []
                for regime_idx, weight in enumerate(self.weights):
                    if weight > 0.1:  # 10%以上の重みがある戦略のみ使用
                        regime_name = self.regime_detector.regime_labels[regime_idx]
                        if regime_name in self.strategies:
                            signal = self.strategies[regime_name].generate_signal(data)
                            signals.append(signal * weight)

                return sum(signals) if signals else 0

        return BlendedStrategy(self.strategies, regime_probs)

    def update_performance(self, regime: str, performance: float):
        """戦略のパフォーマンスを更新"""
        self.performance_history[regime].append({
            'timestamp': pd.Timestamp.now(),
            'performance': performance
        })

        # 古いデータを削除(メモリ管理)
        if len(self.performance_history[regime]) > 1000:
            self.performance_history[regime] = self.performance_history[regime][-500:]

4.2 レジーム別戦略の実装

class RegimeSpecificStrategies:
    def __init__(self):
        self.strategies = {
            'Bull Market': self.BullMarketStrategy(),
            'Bear Market': self.BearMarketStrategy(),
            'High Volatility': self.HighVolatilityStrategy(),
            'Range Bound': self.RangeBoundStrategy()
        }

    class BullMarketStrategy:
        """ブル市場戦略:トレンドフォロー重視"""
        def __init__(self):
            self.lookback_short = 10
            self.lookback_long = 30

        def generate_signal(self, data: pd.DataFrame) -> float:
            # 移動平均クロスオーバー
            sma_short = data['close'].rolling(self.lookback_short).mean()
            sma_long = data['close'].rolling(self.lookback_long).mean()

            # モメンタム指標
            momentum = data['close'].pct_change(20).iloc[-1]

            # RSI(買われ過ぎを避ける)
            rsi = self._calculate_rsi(data['close'])

            # シグナル生成
            signal = 0

            if sma_short.iloc[-1] > sma_long.iloc[-1]:
                signal += 0.5

            if momentum > 0.05:
                signal += 0.3

            if 30 < rsi.iloc[-1] < 70:
                signal += 0.2
            else:
                signal -= 0.3  # 極端なRSIは避ける

            # ボリューム確認
            volume_ratio = data['volume'].iloc[-1] / data['volume'].rolling(20).mean().iloc[-1]
            if volume_ratio > 1.2:
                signal *= 1.2  # ボリュームで確認

            return np.clip(signal, -1, 1)

        def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
            delta = prices.diff()
            gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
            rs = gain / loss
            rsi = 100 - (100 / (1 + rs))
            return rsi

    class BearMarketStrategy:
        """ベア市場戦略:ショート重視、リスク管理強化"""
        def __init__(self):
            self.resistance_window = 20
            self.support_window = 20

        def generate_signal(self, data: pd.DataFrame) -> float:
            # レジスタンスレベル
            resistance = data['high'].rolling(self.resistance_window).max()

            # 下降トレンドの確認
            sma_20 = data['close'].rolling(20).mean()
            sma_50 = data['close'].rolling(50).mean()

            # ボラティリティ
            volatility = data['close'].pct_change().rolling(20).std()

            signal = 0

            # 価格がレジスタンスに近い場合、ショート
            if data['close'].iloc[-1] > resistance.iloc[-1] * 0.98:
                signal -= 0.5

            # 下降トレンド確認
            if sma_20.iloc[-1] < sma_50.iloc[-1]:
                signal -= 0.3

            # 高ボラティリティ時はポジション削減
            if volatility.iloc[-1] > 0.03:
                signal *= 0.5

            # 反発の可能性を考慮
            oversold_bounce = self._detect_oversold_bounce(data)
            if oversold_bounce:
                signal += 0.4  # 一時的なロング

            return np.clip(signal, -1, 1)

        def _detect_oversold_bounce(self, data: pd.DataFrame) -> bool:
            """売られ過ぎからの反発を検出"""
            rsi = self._calculate_rsi(data['close'])

            # RSIが20以下かつ上昇転換
            if rsi.iloc[-1] < 30 and rsi.iloc[-1] > rsi.iloc[-2]:
                return True

            # ボリンジャーバンド下限タッチ
            bb_lower = self._bollinger_bands(data['close'])[1]
            if data['close'].iloc[-1] < bb_lower.iloc[-1]:
                return True

            return False

        def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
            delta = prices.diff()
            gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
            rs = gain / loss
            rsi = 100 - (100 / (1 + rs))
            return rsi

        def _bollinger_bands(self, prices: pd.Series, window: int = 20, num_std: float = 2):
            sma = prices.rolling(window).mean()
            std = prices.rolling(window).std()
            upper_band = sma + (std * num_std)
            lower_band = sma - (std * num_std)
            return upper_band, lower_band

    class HighVolatilityStrategy:
        """高ボラティリティ戦略:短期取引、厳格なリスク管理"""
        def __init__(self):
            self.atr_period = 14
            self.breakout_period = 20

        def generate_signal(self, data: pd.DataFrame) -> float:
            # ATR(Average True Range)
            atr = self._calculate_atr(data)

            # ブレイクアウト検出
            high_breakout = data['close'].iloc[-1] > data['high'].rolling(self.breakout_period).max().iloc[-2]
            low_breakout = data['close'].iloc[-1] < data['low'].rolling(self.breakout_period).min().iloc[-2]

            # ボラティリティスマイル
            iv_skew = self._estimate_iv_skew(data)

            signal = 0

            # ブレイクアウト戦略
            if high_breakout:
                signal += 0.6
            elif low_breakout:
                signal -= 0.6

            # ボラティリティが極端に高い場合は逆張り
            if atr.iloc[-1] > atr.rolling(100).mean().iloc[-1] * 2:
                signal *= -0.5  # 逆張り

            # IVスキューに基づく調整
            signal += iv_skew * 0.2

            # ポジションサイズは小さく
            signal *= 0.5

            return np.clip(signal, -1, 1)

        def _calculate_atr(self, data: pd.DataFrame, period: int = 14) -> pd.Series:
            """Average True Rangeを計算"""
            high_low = data['high'] - data['low']
            high_close = np.abs(data['high'] - data['close'].shift())
            low_close = np.abs(data['low'] - data['close'].shift())

            true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
            atr = true_range.rolling(period).mean()

            return atr

        def _estimate_iv_skew(self, data: pd.DataFrame) -> float:
            """インプライドボラティリティスキューを推定"""
            # 実際にはオプションデータが必要だが、ここでは価格の歪度で代用
            returns = data['close'].pct_change()
            skew = returns.rolling(60).skew().iloc[-1]

            # スキューが負の場合、プット需要が高い(下落懸念)
            return -skew * 0.1

    class RangeBoundStrategy:
        """レンジ相場戦略:平均回帰、サポート・レジスタンス取引"""
        def __init__(self):
            self.lookback = 50
            self.num_std = 2

        def generate_signal(self, data: pd.DataFrame) -> float:
            # ボリンジャーバンド
            sma = data['close'].rolling(20).mean()
            std = data['close'].rolling(20).std()
            upper_band = sma + std * self.num_std
            lower_band = sma - std * self.num_std

            # サポート・レジスタンスレベル
            support, resistance = self._identify_support_resistance(data)

            # 現在価格の位置
            current_price = data['close'].iloc[-1]

            signal = 0

            # ボリンジャーバンド戦略
            if current_price < lower_band.iloc[-1]:
                signal += 0.7  # 買い
            elif current_price > upper_band.iloc[-1]:
                signal -= 0.7  # 売り

            # サポート・レジスタンス戦略
            if support and current_price < support * 1.01:
                signal += 0.3
            elif resistance and current_price > resistance * 0.99:
                signal -= 0.3

            # レンジの強度を確認
            range_strength = self._calculate_range_strength(data)
            signal *= range_strength

            return np.clip(signal, -1, 1)

        def _identify_support_resistance(self, data: pd.DataFrame) -> Tuple[float, float]:
            """サポート・レジスタンスレベルを特定"""
            # 直近の高値・安値
            recent_data = data.tail(self.lookback)

            # ローカルな極値を検出
            highs = recent_data['high'].rolling(5).max() == recent_data['high']
            lows = recent_data['low'].rolling(5).min() == recent_data['low']

            resistance_levels = recent_data['high'][highs].values
            support_levels = recent_data['low'][lows].values

            # クラスタリングして主要レベルを特定
            if len(resistance_levels) > 0:
                resistance = np.percentile(resistance_levels, 75)
            else:
                resistance = recent_data['high'].max()

            if len(support_levels) > 0:
                support = np.percentile(support_levels, 25)
            else:
                support = recent_data['low'].min()

            return support, resistance

        def _calculate_range_strength(self, data: pd.DataFrame) -> float:
            """レンジ相場の強度を計算"""
            # ADX(Average Directional Index)の逆数を使用
            adx = self._calculate_adx(data)

            # ADXが低いほどレンジ相場
            if adx.iloc[-1] < 25:
                return 1.0
            elif adx.iloc[-1] < 40:
                return 0.5
            else:
                return 0.2

        def _calculate_adx(self, data: pd.DataFrame, period: int = 14) -> pd.Series:
            """ADXを計算"""
            # +DM, -DM
            high_diff = data['high'].diff()
            low_diff = -data['low'].diff()

            plus_dm = high_diff.where((high_diff > low_diff) & (high_diff > 0), 0)
            minus_dm = low_diff.where((low_diff > high_diff) & (low_diff > 0), 0)

            # TR
            tr = self._calculate_atr(data, 1)

            # +DI, -DI
            plus_di = 100 * (plus_dm.rolling(period).mean() / tr.rolling(period).mean())
            minus_di = 100 * (minus_dm.rolling(period).mean() / tr.rolling(period).mean())

            # DX
            dx = 100 * np.abs(plus_di - minus_di) / (plus_di + minus_di)

            # ADX
            adx = dx.rolling(period).mean()

            return adx

        def _calculate_atr(self, data: pd.DataFrame, period: int = 14) -> pd.Series:
            high_low = data['high'] - data['low']
            high_close = np.abs(data['high'] - data['close'].shift())
            low_close = np.abs(data['low'] - data['close'].shift())

            true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)

            if period == 1:
                return true_range
            else:
                return true_range.rolling(period).mean()

5. 市場状況分類システム

5.1 マルチモーダル市場分類

class MarketConditionClassifier:
    def __init__(self):
        self.feature_extractors = {
            'price': self._extract_price_features,
            'volume': self._extract_volume_features,
            'volatility': self._extract_volatility_features,
            'microstructure': self._extract_microstructure_features,
            'sentiment': self._extract_sentiment_features
        }
        self.classifier = None

    def extract_all_features(self, market_data: pd.DataFrame) -> pd.DataFrame:
        """すべての特徴量を抽出"""
        all_features = pd.DataFrame(index=market_data.index)

        for feature_type, extractor in self.feature_extractors.items():
            features = extractor(market_data)
            all_features = pd.concat([all_features, features], axis=1)

        return all_features.dropna()

    def _extract_price_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """価格関連の特徴量"""
        features = pd.DataFrame(index=data.index)

        # リターン統計
        returns = data['close'].pct_change()
        features['returns'] = returns
        features['returns_squared'] = returns ** 2
        features['returns_abs'] = np.abs(returns)

        # 移動平均
        for period in [5, 10, 20, 50]:
            features[f'sma_{period}'] = data['close'] / data['close'].rolling(period).mean() - 1

        # 価格レンジ
        features['high_low_ratio'] = data['high'] / data['low'] - 1
        features['close_open_ratio'] = data['close'] / data['open'] - 1

        # トレンド強度
        features['trend_strength'] = self._calculate_trend_strength(data['close'])

        # 価格のフラクタル次元
        features['fractal_dimension'] = self._calculate_fractal_dimension(data['close'])

        return features

    def _extract_volume_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """ボリューム関連の特徴量"""
        features = pd.DataFrame(index=data.index)

        # ボリューム比率
        features['volume_ratio'] = data['volume'] / data['volume'].rolling(20).mean()

        # VWAP(Volume Weighted Average Price)
        vwap = (data['close'] * data['volume']).rolling(20).sum() / data['volume'].rolling(20).sum()
        features['vwap_ratio'] = data['close'] / vwap - 1

        # OBV(On Balance Volume)
        obv = self._calculate_obv(data)
        features['obv_trend'] = obv / obv.rolling(20).mean() - 1

        # ボリュームプロファイル
        features['volume_profile'] = self._calculate_volume_profile(data)

        return features

    def _extract_volatility_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """ボラティリティ関連の特徴量"""
        features = pd.DataFrame(index=data.index)

        returns = data['close'].pct_change()

        # 実現ボラティリティ
        for period in [5, 10, 20]:
            features[f'volatility_{period}'] = returns.rolling(period).std()

        # GARCH推定ボラティリティ
        features['garch_vol'] = self._estimate_garch_volatility(returns)

        # Parkinson推定量
        features['parkinson_vol'] = self._calculate_parkinson_volatility(data)

        # ボラティリティの変化率
        features['vol_change'] = features['volatility_20'].pct_change(5)

        return features

    def _extract_microstructure_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """市場微細構造の特徴量"""
        features = pd.DataFrame(index=data.index)

        # ビッド・アスクスプレッド(推定)
        features['spread_estimate'] = self._estimate_spread(data)

        # Amihudの非流動性指標
        features['illiquidity'] = self._calculate_amihud_illiquidity(data)

        # Kyle's Lambda
        features['kyle_lambda'] = self._estimate_kyle_lambda(data)

        # Roll's measure
        features['roll_measure'] = self._calculate_roll_measure(data)

        return features

    def _extract_sentiment_features(self, data: pd.DataFrame) -> pd.DataFrame:
        """センチメント関連の特徴量"""
        features = pd.DataFrame(index=data.index)

        # Put/Call ratio proxy (価格の歪度から推定)
        returns = data['close'].pct_change()
        features['return_skew'] = returns.rolling(60).skew()

        # Fear index proxy
        features['fear_index'] = self._calculate_fear_index(data)

        # Momentum
        features['momentum_10'] = data['close'].pct_change(10)
        features['momentum_30'] = data['close'].pct_change(30)

        return features

    def _calculate_trend_strength(self, prices: pd.Series) -> pd.Series:
        """トレンド強度を計算"""
        # 線形回帰の決定係数
        window = 20
        trend_strength = pd.Series(index=prices.index, dtype=float)

        for i in range(window, len(prices)):
            y = prices[i-window:i].values
            x = np.arange(window)

            # 線形回帰
            slope, intercept = np.polyfit(x, y, 1)
            y_pred = slope * x + intercept

            # R²
            ss_res = np.sum((y - y_pred) ** 2)
            ss_tot = np.sum((y - np.mean(y)) ** 2)
            r_squared = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0

            # 符号付きR²(トレンドの方向を含む)
            trend_strength.iloc[i] = r_squared * np.sign(slope)

        return trend_strength

    def _calculate_fractal_dimension(self, prices: pd.Series, window: int = 60) -> pd.Series:
        """フラクタル次元を計算(Hurst指数の逆)"""
        fractal_dim = pd.Series(index=prices.index, dtype=float)

        for i in range(window, len(prices)):
            data = prices[i-window:i].values

            # R/S分析
            mean_data = np.mean(data)
            cumdev = np.cumsum(data - mean_data)
            R = np.max(cumdev) - np.min(cumdev)
            S = np.std(data)

            if S != 0:
                RS = R / S
                # Hurst指数の簡易推定
                H = np.log(RS) / np.log(window)
                # フラクタル次元
                fractal_dim.iloc[i] = 2 - H
            else:
                fractal_dim.iloc[i] = 1.5  # デフォルト値

        return fractal_dim

    def _calculate_obv(self, data: pd.DataFrame) -> pd.Series:
        """On Balance Volumeを計算"""
        obv = pd.Series(index=data.index, dtype=float)
        obv.iloc[0] = 0

        for i in range(1, len(data)):
            if data['close'].iloc[i] > data['close'].iloc[i-1]:
                obv.iloc[i] = obv.iloc[i-1] + data['volume'].iloc[i]
            elif data['close'].iloc[i] < data['close'].iloc[i-1]:
                obv.iloc[i] = obv.iloc[i-1] - data['volume'].iloc[i]
            else:
                obv.iloc[i] = obv.iloc[i-1]

        return obv

    def _calculate_volume_profile(self, data: pd.DataFrame, bins: int = 20) -> pd.Series:
        """ボリュームプロファイルを計算"""
        volume_profile = pd.Series(index=data.index, dtype=float)
        window = 100

        for i in range(window, len(data)):
            window_data = data[i-window:i]

            # 価格帯別のボリューム分布
            price_bins = pd.cut(window_data['close'], bins=bins)
            volume_by_price = window_data.groupby(price_bins)['volume'].sum()

            # 現在価格の相対位置
            current_price = data['close'].iloc[i]
            price_range = window_data['close'].max() - window_data['close'].min()
            relative_position = (current_price - window_data['close'].min()) / price_range

            # ボリュームの偏り(上位価格帯vs下位価格帯)
            mid_point = len(volume_by_price) // 2
            upper_volume = volume_by_price.iloc[mid_point:].sum()
            lower_volume = volume_by_price.iloc[:mid_point].sum()

            volume_skew = (upper_volume - lower_volume) / (upper_volume + lower_volume)
            volume_profile.iloc[i] = volume_skew

        return volume_profile

    def _estimate_garch_volatility(self, returns: pd.Series) -> pd.Series:
        """GARCH(1,1)ボラティリティを推定"""
        # 簡略化されたGARCH
        omega = 0.00001
        alpha = 0.1
        beta = 0.85

        garch_vol = pd.Series(index=returns.index, dtype=float)
        garch_vol.iloc[0] = returns.std()

        for i in range(1, len(returns)):
            garch_vol.iloc[i] = np.sqrt(
                omega + 
                alpha * returns.iloc[i-1]**2 + 
                beta * garch_vol.iloc[i-1]**2
            )

        return garch_vol

    def _calculate_parkinson_volatility(self, data: pd.DataFrame, window: int = 20) -> pd.Series:
        """Parkinsonボラティリティ推定量"""
        factor = 1 / (4 * np.log(2))
        hl_ratio = np.log(data['high'] / data['low']) ** 2

        parkinson_vol = np.sqrt(factor * hl_ratio.rolling(window).mean()) * np.sqrt(252)

        return parkinson_vol

    def _estimate_spread(self, data: pd.DataFrame) -> pd.Series:
        """ビッド・アスクスプレッドを推定(Roll's measure)"""
        returns = data['close'].pct_change()

        # 連続するリターンの共分散
        cov = returns.rolling(20).apply(
            lambda x: np.cov(x[:-1], x[1:])[0, 1] if len(x) > 1 else 0
        )

        # Roll's measure
        spread = 2 * np.sqrt(-cov.clip(upper=0))

        return spread

    def _calculate_amihud_illiquidity(self, data: pd.DataFrame, window: int = 20) -> pd.Series:
        """Amihudの非流動性指標"""
        returns = data['close'].pct_change()

        # |return| / volume
        illiquidity = (np.abs(returns) / data['volume']).rolling(window).mean()

        return illiquidity * 1e6  # スケーリング

    def _estimate_kyle_lambda(self, data: pd.DataFrame, window: int = 60) -> pd.Series:
        """Kyle's Lambda(価格インパクト)を推定"""
        kyle_lambda = pd.Series(index=data.index, dtype=float)

        returns = data['close'].pct_change()
        signed_volume = data['volume'] * np.sign(returns)

        for i in range(window, len(data)):
            # 価格変化と符号付きボリュームの回帰
            y = returns[i-window:i].values
            x = signed_volume[i-window:i].values

            # 回帰係数がKyle's Lambda
            if np.std(x) > 0:
                lambda_est = np.cov(x, y)[0, 1] / np.var(x)
                kyle_lambda.iloc[i] = lambda_est
            else:
                kyle_lambda.iloc[i] = 0

        return kyle_lambda

    def _calculate_roll_measure(self, data: pd.DataFrame) -> pd.Series:
        """Roll's measureによる取引コスト推定"""
        returns = data['close'].pct_change()

        # 自己共分散
        autocov = returns.rolling(20).apply(
            lambda x: np.cov(x[:-1], x[1:])[0, 1] if len(x) > 1 else 0
        )

        # Roll's measure
        roll_measure = 2 * np.sqrt(-autocov.clip(upper=0))

        return roll_measure

    def _calculate_fear_index(self, data: pd.DataFrame) -> pd.Series:
        """恐怖指数を計算(VIXの代理)"""
        returns = data['close'].pct_change()

        # 下方ボラティリティ
        downside_returns = returns.where(returns < 0, 0)
        downside_vol = downside_returns.rolling(20).std()

        # 上方ボラティリティ
        upside_returns = returns.where(returns > 0, 0)
        upside_vol = upside_returns.rolling(20).std()

        # 非対称性(下方が大きいほど恐怖)
        fear_index = downside_vol / (upside_vol + 1e-6)

        return fear_index

    def train_classifier(self, features: pd.DataFrame, labels: pd.Series):
        """市場状況分類器を学習"""
        from sklearn.ensemble import RandomForestClassifier
        from sklearn.preprocessing import StandardScaler

        # 特徴量をスケーリング
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(features)

        # 分類器を学習
        self.classifier = RandomForestClassifier(
            n_estimators=200,
            max_depth=10,
            min_samples_split=20,
            random_state=42
        )

        self.classifier.fit(X_scaled, labels)

        # 特徴量の重要度
        feature_importance = pd.DataFrame({
            'feature': features.columns,
            'importance': self.classifier.feature_importances_
        }).sort_values('importance', ascending=False)

        return feature_importance

    def classify_market_condition(self, market_data: pd.DataFrame) -> Dict:
        """現在の市場状況を分類"""
        # 特徴量を抽出
        features = self.extract_all_features(market_data)

        if self.classifier is None:
            # 簡単なルールベース分類
            return self._rule_based_classification(features)

        # 機械学習による分類
        latest_features = features.iloc[-1:].values
        prediction = self.classifier.predict(latest_features)[0]
        probabilities = self.classifier.predict_proba(latest_features)[0]

        return {
            'condition': prediction,
            'probabilities': dict(zip(self.classifier.classes_, probabilities)),
            'features': features.iloc[-1].to_dict()
        }

    def _rule_based_classification(self, features: pd.DataFrame) -> Dict:
        """ルールベースの市場分類"""
        latest = features.iloc[-1]

        # 基本的な分類ルール
        if latest['volatility_20'] > 0.04:
            condition = 'High Volatility'
        elif latest['trend_strength'] > 0.7:
            condition = 'Strong Trend'
        elif latest['trend_strength'] < -0.7:
            condition = 'Strong Downtrend'
        elif abs(latest['trend_strength']) < 0.3:
            condition = 'Range Bound'
        else:
            condition = 'Normal'

        return {
            'condition': condition,
            'probabilities': {condition: 1.0},
            'features': latest.to_dict()
        }

6. 統合実装例

class IntegratedAdaptiveSystem:
    def __init__(self, config: Dict):
        self.config = config

        # コンポーネントの初期化
        self.regime_detector = MarketRegimeHMM(n_states=4)
        self.change_detector = OnlineChangePointDetector()
        self.condition_classifier = MarketConditionClassifier()
        self.strategy_selector = AdaptiveStrategySelector()

        # レジーム別戦略を登録
        strategies = RegimeSpecificStrategies()
        for regime, strategy in strategies.strategies.items():
            self.strategy_selector.register_strategy(regime, strategy)

        # パフォーマンストラッキング
        self.performance_tracker = {
            'trades': [],
            'returns': [],
            'regimes': [],
            'signals': []
        }

    def train_models(self, historical_data: pd.DataFrame):
        """すべてのモデルを学習"""
        print("Training regime detection model...")
        self.regime_detector.fit(historical_data)

        print("Extracting market features...")
        features = self.condition_classifier.extract_all_features(historical_data)

        print("Models trained successfully!")

        return self

    def generate_trading_signal(self, current_data: pd.DataFrame) -> Dict:
        """統合的な取引シグナルを生成"""
        # 1. 現在のレジームを検出
        current_regime = self._detect_current_regime(current_data)

        # 2. 変化点を検出
        change_points = self._detect_change_points(current_data)

        # 3. 市場状況を分類
        market_condition = self._classify_market_condition(current_data)

        # 4. 適応的に戦略を選択
        selected_strategy = self.strategy_selector.select_strategy(current_data)

        # 5. シグナルを生成
        base_signal = selected_strategy.generate_signal(current_data)

        # 6. シグナルを調整
        adjusted_signal = self._adjust_signal(
            base_signal, 
            current_regime, 
            change_points, 
            market_condition
        )

        # 7. リスク管理
        final_signal = self._apply_risk_management(adjusted_signal, current_data)

        return {
            'signal': final_signal,
            'regime': current_regime,
            'market_condition': market_condition,
            'confidence': self._calculate_confidence(current_regime, market_condition),
            'metadata': {
                'change_points': change_points,
                'base_signal': base_signal,
                'adjusted_signal': adjusted_signal
            }
        }

    def _detect_current_regime(self, data: pd.DataFrame) -> Dict:
        """現在のレジームを検出"""
        regimes = self.regime_detector.predict_regime(data)
        probabilities = self.regime_detector.predict_regime_probabilities(data)

        current_regime_idx = regimes[-1]
        current_regime_name = self.regime_detector.regime_labels[current_regime_idx]

        return {
            'name': current_regime_name,
            'index': current_regime_idx,
            'probabilities': probabilities[-1],
            'confidence': np.max(probabilities[-1])
        }

    def _detect_change_points(self, data: pd.DataFrame) -> List[int]:
        """変化点を検出"""
        # 複数の系列で変化点を検出
        price_changes = self.change_detector.detect_change_points(data['close'])
        volume_changes = self.change_detector.detect_change_points(data['volume'])

        # 統合
        all_changes = sorted(set(price_changes + volume_changes))

        # 最近の変化点のみ
        recent_changes = [cp for cp in all_changes if cp > len(data) - 100]

        return recent_changes

    def _classify_market_condition(self, data: pd.DataFrame) -> Dict:
        """市場状況を分類"""
        return self.condition_classifier.classify_market_condition(data)

    def _adjust_signal(self, 
                      base_signal: float,
                      regime: Dict,
                      change_points: List[int],
                      market_condition: Dict) -> float:
        """シグナルを調整"""
        adjusted_signal = base_signal

        # レジーム確信度による調整
        if regime['confidence'] < 0.5:
            adjusted_signal *= 0.5  # 確信度が低い場合は控えめに

        # 直近の変化点による調整
        if change_points and len(change_points) > 0:
            latest_change = change_points[-1]
            periods_since_change = len(self.performance_tracker['trades']) - latest_change

            if periods_since_change < 10:
                # 変化点直後は慎重に
                adjusted_signal *= 0.7

        # 市場状況による調整
        if market_condition['condition'] == 'High Volatility':
            adjusted_signal *= 0.6  # ボラティリティが高い場合はポジション削減

        return adjusted_signal

    def _apply_risk_management(self, signal: float, data: pd.DataFrame) -> float:
        """リスク管理を適用"""
        # 最大ポジションサイズ
        max_position = self.config.get('max_position_size', 1.0)

        # ボラティリティベースの調整
        current_vol = data['close'].pct_change().rolling(20).std().iloc[-1]
        target_vol = self.config.get('target_volatility', 0.02)

        vol_adjusted_signal = signal * min(target_vol / current_vol, 1.5)

        # クリッピング
        final_signal = np.clip(vol_adjusted_signal, -max_position, max_position)

        return final_signal

    def _calculate_confidence(self, regime: Dict, market_condition: Dict) -> float:
        """総合的な信頼度を計算"""
        # レジーム確信度
        regime_confidence = regime['confidence']

        # 市場状況の明確さ
        condition_probs = market_condition.get('probabilities', {})
        if condition_probs:
            condition_confidence = max(condition_probs.values())
        else:
            condition_confidence = 0.5

        # 総合信頼度
        overall_confidence = (regime_confidence + condition_confidence) / 2

        return overall_confidence

    def backtest(self, data: pd.DataFrame, initial_capital: float = 10000) -> Dict:
        """バックテストを実行"""
        capital = initial_capital
        position = 0
        trades = []

        # スライディングウィンドウでバックテスト
        window_size = 500

        for i in range(window_size, len(data)):
            # 現在までのデータ
            current_data = data.iloc[:i+1]

            # シグナルを生成
            signal_info = self.generate_trading_signal(current_data)
            signal = signal_info['signal']

            # 取引実行
            current_price = data['close'].iloc[i]

            if signal != 0:
                # ポジション変更
                position_change = signal - position

                if position_change != 0:
                    trade = {
                        'timestamp': data.index[i],
                        'price': current_price,
                        'position_change': position_change,
                        'new_position': signal,
                        'regime': signal_info['regime']['name'],
                        'confidence': signal_info['confidence']
                    }
                    trades.append(trade)

                    # 資本の更新
                    capital -= position_change * current_price * 1.001  # 手数料
                    position = signal

            # パフォーマンス記録
            portfolio_value = capital + position * current_price
            returns = (portfolio_value / initial_capital - 1) * 100

            self.performance_tracker['returns'].append(returns)
            self.performance_tracker['regimes'].append(signal_info['regime']['name'])
            self.performance_tracker['signals'].append(signal)

        # 結果の分析
        return self._analyze_backtest_results(trades, data)

    def _analyze_backtest_results(self, trades: List[Dict], data: pd.DataFrame) -> Dict:
        """バックテスト結果を分析"""
        if not trades:
            return {'error': 'No trades executed'}

        # 基本統計
        returns = pd.Series(self.performance_tracker['returns'])

        # シャープレシオ
        sharpe = np.sqrt(252) * returns.pct_change().mean() / returns.pct_change().std()

        # 最大ドローダウン
        cummax = returns.cummax()
        drawdown = (returns - cummax) / cummax
        max_drawdown = drawdown.min()

        # レジーム別パフォーマンス
        regime_performance = self._analyze_regime_performance()

        return {
            'total_return': returns.iloc[-1],
            'sharpe_ratio': sharpe,
            'max_drawdown': max_drawdown,
            'num_trades': len(trades),
            'win_rate': self._calculate_win_rate(trades, data),
            'regime_performance': regime_performance,
            'trades': trades
        }

    def _calculate_win_rate(self, trades: List[Dict], data: pd.DataFrame) -> float:
        """勝率を計算"""
        if len(trades) < 2:
            return 0.0

        wins = 0
        for i in range(1, len(trades)):
            entry_price = trades[i-1]['price']
            exit_price = trades[i]['price']
            position = trades[i-1]['new_position']

            if position > 0 and exit_price > entry_price:
                wins += 1
            elif position < 0 and exit_price < entry_price:
                wins += 1

        return wins / (len(trades) - 1)

    def _analyze_regime_performance(self) -> Dict:
        """レジーム別のパフォーマンスを分析"""
        regime_returns = {}

        current_regime = None
        regime_start_idx = 0

        for i, regime in enumerate(self.performance_tracker['regimes']):
            if regime != current_regime:
                if current_regime is not None:
                    # 前のレジームのリターンを計算
                    regime_return = (
                        self.performance_tracker['returns'][i-1] - 
                        self.performance_tracker['returns'][regime_start_idx]
                    )

                    if current_regime not in regime_returns:
                        regime_returns[current_regime] = []
                    regime_returns[current_regime].append(regime_return)

                current_regime = regime
                regime_start_idx = i

        # 統計を計算
        regime_stats = {}
        for regime, returns in regime_returns.items():
            if returns:
                regime_stats[regime] = {
                    'mean_return': np.mean(returns),
                    'total_return': np.sum(returns),
                    'num_periods': len(returns)
                }

        return regime_stats

# 使用例
def main():
    # データの読み込み
    data = pd.read_csv('crypto_data.csv', parse_dates=['timestamp'])
    data.set_index('timestamp', inplace=True)

    # システムの設定
    config = {
        'max_position_size': 1.0,
        'target_volatility': 0.02,
        'transaction_cost': 0.001
    }

    # システムの初期化と学習
    system = IntegratedAdaptiveSystem(config)
    system.train_models(data[:1000])  # 最初の1000データポイントで学習

    # バックテスト
    results = system.backtest(data[1000:])

    # 結果の表示
    print(f"Total Return: {results['total_return']:.2f}%")
    print(f"Sharpe Ratio: {results['sharpe_ratio']:.2f}")
    print(f"Max Drawdown: {results['max_drawdown']:.2%}")
    print(f"Win Rate: {results['win_rate']:.2%}")
    print(f"Number of Trades: {results['num_trades']}")

    print("\nRegime Performance:")
    for regime, stats in results['regime_performance'].items():
        print(f"{regime}: {stats['mean_return']:.2f}% average return")

if __name__ == "__main__":
    main()

まとめ

本ドキュメントでは、暗号通貨取引における市場レジーム検出と適応的戦略について包括的に解説しました。

主要なポイント

  1. Hidden Markov Models(HMM)
    - 複数の市場状態を自動検出
    - レジーム遷移確率の推定
    - GARCH効果を含む高度なモデリング

  2. 変化点検出
    - オンラインベイジアン検出
    - CUSUM統計量
    - リアルタイム適応

  3. 適応的戦略切り替え
    - レジーム別最適戦略
    - 動的戦略ブレンディング
    - パフォーマンストラッキング

  4. 市場状況分類
    - マルチモーダル特徴抽出
    - 機械学習による分類
    - 包括的な市場分析

  5. 統合システム
    - すべての要素の統合
    - リスク管理の組み込み
    - バックテストフレームワーク

これらの技術を適切に実装することで、市場の変化に素早く適応し、各市場状況に最適な戦略を自動的に選択する高度な取引システムを構築できます。