ML Documentation

Backtesting Methods for Cryptocurrency Price Prediction

Overview

Cryptocurrency markets trade 24 hours a day, 365 days a year, and prices move at high frequency. This document covers backtesting methods that account for the particular characteristics of cryptocurrency markets, along with techniques for handling irregular time series data such as tick data.

1. Cryptocurrency Market Characteristics and Backtesting Challenges

1.1 Market Characteristics

Cryptocurrency markets never close, exhibit high volatility, and are fragmented across many exchanges with differing fees, liquidity, and data quality. These properties shape the backtesting challenges listed below.

1.2 Key Backtesting Challenges

# Challenges that a backtest must account for
BACKTESTING_CHALLENGES = {
    'data_quality': {
        'issues': ['missing data', 'outliers', 'exchange downtime'],
        'impact': 'overestimated performance'
    },
    'market_microstructure': {
        'issues': ['spread', 'slippage', 'order book depth'],
        'impact': 'underestimated real trading costs'
    },
    'survivorship_bias': {
        'issues': ['defunct exchanges', 'delisted tokens'],
        'impact': 'overestimated returns'
    },
    'lookahead_bias': {
        'issues': ['use of future information', 'data latency'],
        'impact': 'unrealistic trading signals'
    }
}
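
Of these, lookahead bias is the easiest to introduce by accident. A minimal sketch (my own illustration, not part of the original code) of the usual guard: compute indicators only from data available at decision time and shift signals by one bar before applying them to returns.

import pandas as pd

def lagged_strategy_returns(close: pd.Series, fast: int = 12, slow: int = 26) -> pd.Series:
    """Toy moving-average crossover; the signal is shifted one bar to avoid lookahead bias."""
    fast_ma = close.rolling(fast).mean()
    slow_ma = close.rolling(slow).mean()
    signal = (fast_ma > slow_ma).astype(int)  # 1 = long, 0 = flat
    # A signal computed on bar t can only be traded on bar t+1.
    return signal.shift(1) * close.pct_change()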

2. Time Series Data Preprocessing

2.1 Processing Regular Time Series Data (OHLCV)

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

class OHLCVDataProcessor:
    def __init__(self, timeframe='1h'):
        self.timeframe = timeframe
        self.required_columns = ['open', 'high', 'low', 'close', 'volume']

    def validate_and_clean_data(self, df):
        """Validate and clean OHLCV data"""
        # Use the timestamp column as the index
        df = df.set_index('timestamp') if 'timestamp' in df.columns else df
        df.index = pd.to_datetime(df.index)

        # Drop duplicate timestamps
        df = df[~df.index.duplicated(keep='first')]

        # Sort by time
        df = df.sort_index()

        # Basic validation
        validation_results = {
            'missing_values': df.isnull().sum(),
            'negative_prices': (df[['open', 'high', 'low', 'close']] < 0).sum(),
            'volume_issues': (df['volume'] < 0).sum(),
            'ohlc_consistency': self.check_ohlc_consistency(df)
        }

        # Clean up the detected issues
        df = self.fix_data_issues(df, validation_results)

        return df, validation_results

    def check_ohlc_consistency(self, df):
        """OHLC関係の整合性チェック"""
        issues = {
            'high_low': (df['high'] < df['low']).sum(),
            'high_open': (df['high'] < df['open']).sum(),
            'high_close': (df['high'] < df['close']).sum(),
            'low_open': (df['low'] > df['open']).sum(),
            'low_close': (df['low'] > df['close']).sum()
        }
        return issues

    def fix_data_issues(self, df, validation_results):
        """Repair detected data issues"""
        # Enforce OHLC consistency
        df.loc[df['high'] < df['low'], 'high'] = df['low']
        df.loc[df['high'] < df['open'], 'high'] = df['open']
        df.loc[df['high'] < df['close'], 'high'] = df['close']
        df.loc[df['low'] > df['open'], 'low'] = df['open']
        df.loc[df['low'] > df['close'], 'low'] = df['close']

        # Handle missing values
        df = self.handle_missing_values(df)

        return df

    def handle_missing_values(self, df):
        """Handle missing values"""
        # Forward-fill by default; back-fill any leading gaps
        df = df.ffill().bfill()

        # Missing volume is treated as zero
        df['volume'] = df['volume'].fillna(0)

        return df

    def resample_to_timeframe(self, df, target_timeframe):
        """Resample to a different timeframe"""
        resampling_rules = {
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last',
            'volume': 'sum'
        }

        resampled = df.resample(target_timeframe).agg(resampling_rules)

        # Handle periods with no trades
        resampled = self.fill_no_trade_periods(resampled)

        return resampled

    def fill_no_trade_periods(self, df):
        """Handle periods with no trades"""
        # Carry the previous close forward first, then synthesize flat bars for the gaps
        df['close'] = df['close'].ffill()
        df['open'] = df['open'].fillna(df['close'].shift(1))
        df['high'] = df['high'].fillna(df['close'].shift(1))
        df['low'] = df['low'].fillna(df['close'].shift(1))
        df['volume'] = df['volume'].fillna(0)

        return df
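
A short usage sketch (the column names match the processor's expectations; the sample values, including the deliberately inconsistent bar, are illustrative):

raw = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=6, freq='1h'),
    'open': [100, 101, None, 103, 104, 105],
    'high': [101, 102, 103, 102, 105, 106],
    'low': [99, 100, 101, 104, 103, 104],
    'close': [101, 102, 102, 103, 105, 105],
    'volume': [10, 12, None, 9, 11, 8],
})

processor = OHLCVDataProcessor(timeframe='1h')
clean, report = processor.validate_and_clean_data(raw)
bars_4h = processor.resample_to_timeframe(clean, '4h')  # aggregate to 4-hour bars
print(report['ohlc_consistency'])                       # counts the high < low violation above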

2.2 Processing Irregular Time Series Data (Tick Data)

class TickDataProcessor:
    def __init__(self):
        self.tick_columns = ['timestamp', 'price', 'volume', 'side']

    def process_tick_data(self, ticks):
        """Process raw tick data"""
        df = pd.DataFrame(ticks)
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df = df.sort_values('timestamp')

        # Collapse duplicates at microsecond resolution
        df = self.handle_microsecond_duplicates(df)

        # Detect and handle outliers
        df = self.detect_and_handle_outliers(df)

        return df

    def handle_microsecond_duplicates(self, df):
        """Collapse trades that share the same timestamp"""
        # Aggregate trades with identical timestamps
        aggregation = {
            'price': 'mean',  # a VWAP-like approximation
            'volume': 'sum',
            'side': lambda x: x.mode().iloc[0] if len(x.mode()) > 0 else x.iloc[0]
        }

        df = df.groupby('timestamp').agg(aggregation).reset_index()

        return df

    def detect_and_handle_outliers(self, df):
        """Detect and handle outliers"""
        # Rolling mean and standard deviation
        window_size = 100
        df['price_mean'] = df['price'].rolling(window=window_size, center=True).mean()
        df['price_std'] = df['price'].rolling(window=window_size, center=True).std()

        # Flag prices more than three standard deviations from the rolling mean
        df['is_outlier'] = np.abs(df['price'] - df['price_mean']) > 3 * df['price_std']

        # Replace outliers with the average of the neighbouring ticks
        outlier_indices = df[df['is_outlier']].index
        for idx in outlier_indices:
            if idx > 0 and idx < len(df) - 1:
                df.loc[idx, 'price'] = (df.loc[idx - 1, 'price'] + df.loc[idx + 1, 'price']) / 2

        # Drop the temporary columns
        df = df.drop(columns=['price_mean', 'price_std', 'is_outlier'])

        return df

    def convert_to_bars(self, tick_df, bar_type='time', bar_size=60):
        """Convert tick data into bar data"""
        if bar_type == 'time':
            return self.create_time_bars(tick_df, bar_size)
        elif bar_type == 'tick':
            return self.create_tick_bars(tick_df, bar_size)
        elif bar_type == 'volume':
            return self.create_volume_bars(tick_df, bar_size)
        elif bar_type == 'dollar':
            return self.create_dollar_bars(tick_df, bar_size)
        else:
            raise ValueError(f"Unknown bar type: {bar_type}")

    def create_time_bars(self, df, seconds):
        """Create time bars"""
        df = df.set_index('timestamp')

        ohlc = df['price'].resample(f'{seconds}s').ohlc()
        volume = df['volume'].resample(f'{seconds}s').sum()

        bars = pd.concat([ohlc, volume], axis=1)
        bars = bars.dropna()

        return bars

    def create_tick_bars(self, df, tick_count):
        """ティックバーの作成(固定数のティックごと)"""
        bars = []

        for i in range(0, len(df), tick_count):
            subset = df.iloc[i:i+tick_count]
            if len(subset) > 0:
                bar = {
                    'timestamp': subset.iloc[-1]['timestamp'],
                    'open': subset.iloc[0]['price'],
                    'high': subset['price'].max(),
                    'low': subset['price'].min(),
                    'close': subset.iloc[-1]['price'],
                    'volume': subset['volume'].sum(),
                    'tick_count': len(subset)
                }
                bars.append(bar)

        return pd.DataFrame(bars).set_index('timestamp')

    def create_volume_bars(self, df, volume_threshold):
        """ボリュームバーの作成(固定ボリュームごと)"""
        bars = []
        current_volume = 0
        current_bar = []

        for _, row in df.iterrows():
            current_bar.append(row)
            current_volume += row['volume']

            if current_volume >= volume_threshold:
                bar_df = pd.DataFrame(current_bar)
                bar = {
                    'timestamp': bar_df.iloc[-1]['timestamp'],
                    'open': bar_df.iloc[0]['price'],
                    'high': bar_df['price'].max(),
                    'low': bar_df['price'].min(),
                    'close': bar_df.iloc[-1]['price'],
                    'volume': bar_df['volume'].sum(),
                    'tick_count': len(bar_df)
                }
                bars.append(bar)

                # Reset for the next bar
                current_volume = 0
                current_bar = []

        return pd.DataFrame(bars).set_index('timestamp')

    def create_dollar_bars(self, df, dollar_threshold):
        """ドルバーの作成(固定金額ごと)"""
        bars = []
        current_dollars = 0
        current_bar = []

        for _, row in df.iterrows():
            current_bar.append(row)
            current_dollars += row['price'] * row['volume']

            if current_dollars >= dollar_threshold:
                bar_df = pd.DataFrame(current_bar)
                bar = {
                    'timestamp': bar_df.iloc[-1]['timestamp'],
                    'open': bar_df.iloc[0]['price'],
                    'high': bar_df['price'].max(),
                    'low': bar_df['price'].min(),
                    'close': bar_df.iloc[-1]['price'],
                    'volume': bar_df['volume'].sum(),
                    'dollar_volume': current_dollars,
                    'tick_count': len(bar_df)
                }
                bars.append(bar)

                # Reset for the next bar
                current_dollars = 0
                current_bar = []

        return pd.DataFrame(bars).set_index('timestamp')
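
A short usage sketch (the synthetic ticks and the dollar threshold are illustrative):

rng = np.random.default_rng(0)
ticks = [
    {'timestamp': t, 'price': 100 + rng.normal(0, 0.1), 'volume': abs(rng.normal(1, 0.5)), 'side': 'buy'}
    for t in pd.date_range('2024-01-01', periods=2000, freq='1s')
]

tick_processor = TickDataProcessor()
tick_df = tick_processor.process_tick_data(ticks)
# roughly 200 ticks per bar at ~$100 notional per tick
dollar_bars = tick_processor.convert_to_bars(tick_df, bar_type='dollar', bar_size=20_000)
print(dollar_bars.head())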

3. Backtesting Framework

3.1 A Basic Backtest Engine

class BacktestEngine:
    def __init__(self, initial_capital=10000, commission=0.001, slippage=0.0005):
        self.initial_capital = initial_capital
        self.commission = commission  # 0.1%
        self.slippage = slippage      # 0.05%
        self.positions = {}
        self.trades = []
        self.equity_curve = []

    def reset(self):
        """Clear state before a new backtest run"""
        self.positions = {}
        self.trades = []
        self.equity_curve = []

    def run_backtest(self, data, strategy, start_date=None, end_date=None):
        """Run the backtest"""
        # Filter the data to the requested period
        if start_date:
            data = data[data.index >= start_date]
        if end_date:
            data = data[data.index <= end_date]

        # Initialise state
        self.reset()
        capital = self.initial_capital
        position = 0

        # Main loop
        for timestamp, row in data.iterrows():
            # Generate a signal from the data available up to this bar
            signal = strategy.generate_signal(data.loc[:timestamp])

            # Execute the trade
            if signal != 0 and signal != position:
                capital, position = self.execute_trade(
                    timestamp, row, signal, capital, position
                )

            # Record portfolio value
            equity = self.calculate_equity(capital, position, row['close'])
            self.equity_curve.append({
                'timestamp': timestamp,
                'equity': equity,
                'capital': capital,
                'position_value': position * row['close'] if position else 0
            })

        return self.generate_results()

    def execute_trade(self, timestamp, row, signal, capital, position):
        """Execute a trade"""
        price = row['close']

        # Apply slippage
        if signal > position:  # buying
            execution_price = price * (1 + self.slippage)
        else:  # selling
            execution_price = price * (1 - self.slippage)

        # Position sizing
        if signal == 1 and position == 0:  # open a new long
            size = (capital * 0.95) / execution_price  # deploy 95% of capital
            cost = size * execution_price * (1 + self.commission)

            if cost <= capital:
                capital -= cost
                position = size
                self.record_trade('BUY', timestamp, execution_price, size, cost)

        elif signal == -1 and position > 0:  # sell (close the position)
            proceeds = position * execution_price * (1 - self.commission)
            capital += proceeds
            self.record_trade('SELL', timestamp, execution_price, position, proceeds)
            position = 0

        return capital, position

    def calculate_equity(self, capital, position, current_price):
        """Total portfolio value"""
        return capital + (position * current_price if position else 0)

    def record_trade(self, side, timestamp, price, size, value):
        """Record a trade"""
        self.trades.append({
            'timestamp': timestamp,
            'side': side,
            'price': price,
            'size': size,
            'value': value,
            'commission': value * self.commission
        })

    def generate_results(self):
        """Assemble the backtest results"""
        equity_df = pd.DataFrame(self.equity_curve).set_index('timestamp')
        trades_df = pd.DataFrame(self.trades)

        # Compute performance metrics
        metrics = self.calculate_performance_metrics(equity_df, trades_df)

        return {
            'equity_curve': equity_df,
            'trades': trades_df,
            'metrics': metrics
        }

    def calculate_performance_metrics(self, equity_df, trades_df):
        """Compute performance metrics"""
        returns = equity_df['equity'].pct_change().dropna()

        metrics = {
            'total_return': (equity_df['equity'].iloc[-1] / self.initial_capital - 1) * 100,
            'annualized_return': self.calculate_annualized_return(returns),
            'sharpe_ratio': self.calculate_sharpe_ratio(returns),
            'max_drawdown': self.calculate_max_drawdown(equity_df['equity']),
            'win_rate': self.calculate_win_rate(trades_df),
            'profit_factor': self.calculate_profit_factor(trades_df),
            'total_trades': len(trades_df),
            'avg_trade_duration': self.calculate_avg_trade_duration(trades_df)
        }

        return metrics

    def calculate_annualized_return(self, returns):
        """Annualised return (assumes daily bars)"""
        days = len(returns)
        if days == 0:
            return 0
        total_return = (1 + returns).prod() - 1
        return (1 + total_return) ** (365 / days) - 1

    def calculate_sharpe_ratio(self, returns, risk_free_rate=0.02):
        """Annualised Sharpe ratio (assumes daily bars)"""
        if len(returns) == 0 or returns.std() == 0:
            return 0
        excess_returns = returns - risk_free_rate / 365
        return np.sqrt(365) * excess_returns.mean() / returns.std()

    def calculate_max_drawdown(self, equity_series):
        """Maximum drawdown"""
        cumulative = (1 + equity_series.pct_change().fillna(0)).cumprod()
        running_max = cumulative.cummax()
        drawdown = (cumulative - running_max) / running_max
        return drawdown.min() * 100

    def calculate_win_rate(self, trades_df):
        """Win rate"""
        if len(trades_df) < 2:
            return 0

        # Profit and loss per buy/sell pair
        profits = []
        for i in range(0, len(trades_df) - 1, 2):
            if i + 1 < len(trades_df):
                buy_trade = trades_df.iloc[i]
                sell_trade = trades_df.iloc[i + 1]
                profit = sell_trade['value'] - buy_trade['value']
                profits.append(profit)

        if not profits:
            return 0

        wins = sum(1 for p in profits if p > 0)
        return (wins / len(profits)) * 100
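
    # The two helpers below are referenced by calculate_performance_metrics but were not
    # defined in the original text; these are minimal sketches of one plausible reading,
    # pairing trades the same way calculate_win_rate does.
    def calculate_profit_factor(self, trades_df):
        """Gross profit divided by gross loss over buy/sell pairs"""
        if len(trades_df) < 2:
            return 0
        pnl = [trades_df.iloc[i + 1]['value'] - trades_df.iloc[i]['value']
               for i in range(0, len(trades_df) - 1, 2)]
        gross_profit = sum(p for p in pnl if p > 0)
        gross_loss = abs(sum(p for p in pnl if p < 0))
        return gross_profit / gross_loss if gross_loss > 0 else np.inf

    def calculate_avg_trade_duration(self, trades_df):
        """Average holding time per round trip, in hours"""
        if len(trades_df) < 2:
            return 0
        durations = [(trades_df.iloc[i + 1]['timestamp'] - trades_df.iloc[i]['timestamp']).total_seconds() / 3600
                     for i in range(0, len(trades_df) - 1, 2)]
        return float(np.mean(durations)) if durations else 0

The engine expects a strategy object exposing generate_signal(history) that returns 1 (go long), -1 (exit), or 0 (hold). The original text does not define such a strategy; the class below is a hypothetical moving-average crossover used only to show the interface:

class MovingAverageCrossStrategy:
    def __init__(self, fast=20, slow=50):
        self.fast = fast
        self.slow = slow

    def generate_signal(self, history):
        # Not enough history yet: stay flat
        if len(history) < self.slow:
            return 0
        fast_ma = history['close'].rolling(self.fast).mean().iloc[-1]
        slow_ma = history['close'].rolling(self.slow).mean().iloc[-1]
        return 1 if fast_ma > slow_ma else -1

engine = BacktestEngine(initial_capital=10000)
# results = engine.run_backtest(ohlcv_df, MovingAverageCrossStrategy())  # ohlcv_df: cleaned OHLCV data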

3.2 Advanced Backtesting Features

class AdvancedBacktestEngine(BacktestEngine):
    def __init__(self, initial_capital=10000, commission=0.001, slippage=0.0005):
        super().__init__(initial_capital, commission, slippage)
        self.order_book_impact = OrderBookImpact()
        self.market_impact_model = MarketImpactModel()

    def execute_trade_with_market_impact(self, timestamp, row, signal, capital, position, order_book):
        """Execute a trade while accounting for market impact"""
        price = row['close']

        # Determine the order size
        if signal == 1 and position == 0:
            target_size = (capital * 0.95) / price

            # Estimate the market impact
            impact = self.market_impact_model.estimate_impact(
                target_size,
                order_book,
                row['volume']
            )

            # Adjust the execution price
            execution_price = price * (1 + impact + self.slippage)

            # Size that can actually be filled
            executable_size = self.order_book_impact.calculate_executable_size(
                target_size,
                order_book,
                execution_price
            )

            cost = executable_size * execution_price * (1 + self.commission)

            if cost <= capital:
                capital -= cost
                position = executable_size
                self.record_trade_with_impact(
                    'BUY', timestamp, execution_price, executable_size, cost, impact
                )

        return capital, position

    def simulate_limit_order(self, order_price, order_size, market_data, order_book):
        """Simulate a limit order"""
        filled_size = 0
        filled_value = 0

        # Check whether the order price was touched during the bar
        if order_price >= market_data['low'] and order_price <= market_data['high']:
            # Allow for partial fills
            available_liquidity = self.estimate_liquidity_at_price(
                order_price,
                order_book,
                market_data['volume']
            )

            filled_size = min(order_size, available_liquidity)
            filled_value = filled_size * order_price

        return filled_size, filled_value
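
    # The two methods below are referenced above but were not defined in the original
    # text; they are minimal, assumption-laden sketches.
    def record_trade_with_impact(self, side, timestamp, price, size, value, impact):
        """Record a trade along with its estimated market impact"""
        self.record_trade(side, timestamp, price, size, value)
        self.trades[-1]['market_impact'] = impact

    def estimate_liquidity_at_price(self, order_price, order_book, bar_volume):
        """Rough liquidity estimate at a limit price: resting depth priced at or better
        than the limit when a snapshot is available, otherwise a fixed fraction of the
        bar's traded volume (both are assumptions, not from the original text)."""
        if order_book and 'asks' in order_book:
            return sum(size for price, size in order_book['asks'] if price <= order_price)
        return bar_volume * 0.1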

class OrderBookImpact:
    """Order book impact model"""
    def calculate_executable_size(self, target_size, order_book, max_price):
        """Size that can be executed without paying more than max_price"""
        if not order_book or 'asks' not in order_book:
            return target_size * 0.5  # default to 50%

        executable = 0
        total_cost = 0

        for ask in order_book['asks']:
            price, size = ask
            if price > max_price:
                break

            take_size = min(target_size - executable, size)
            executable += take_size
            total_cost += take_size * price

            if executable >= target_size:
                break

        return executable

class MarketImpactModel:
    """Market impact model"""
    def __init__(self, permanent_impact=0.1, temporary_impact=0.5):
        self.permanent_impact = permanent_impact
        self.temporary_impact = temporary_impact

    def estimate_impact(self, order_size, order_book, daily_volume):
        """Estimate the market impact of an order"""
        # Participation rate (order size / daily volume)
        participation_rate = order_size / max(daily_volume, 1)

        # Square-root model for the permanent component, linear for the temporary one
        permanent = self.permanent_impact * np.sqrt(participation_rate)
        temporary = self.temporary_impact * participation_rate

        return permanent + temporary
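
For intuition, a quick usage sketch (numbers are illustrative): an order equal to 1% of daily volume with the default coefficients gives roughly 0.1 * sqrt(0.01) + 0.5 * 0.01 = 0.015, i.e. about 1.5% of impact.

impact_model = MarketImpactModel()
impact = impact_model.estimate_impact(order_size=1_000, order_book=None, daily_volume=100_000)
print(f"estimated impact: {impact:.4f}")  # ~0.015 for a 1% participation rate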

3.3 Walk-Forward Analysis

class WalkForwardAnalysis:
    def __init__(self, backtest_engine, optimization_window=180, test_window=30):
        self.backtest_engine = backtest_engine
        self.optimization_window = optimization_window  # in days
        self.test_window = test_window
        self.results = []

    def run_walk_forward(self, data, strategy_class, param_grid):
        """Run the walk-forward analysis"""
        # Slide the optimisation/test windows across the data
        start_date = data.index[0]

        while start_date + timedelta(days=self.optimization_window + self.test_window) <= data.index[-1]:
            # Optimisation period
            opt_start = start_date
            opt_end = start_date + timedelta(days=self.optimization_window)

            # Test period
            test_start = opt_end
            test_end = test_start + timedelta(days=self.test_window)

            # Optimise parameters on the in-sample window
            best_params = self.optimize_parameters(
                data[opt_start:opt_end],
                strategy_class,
                param_grid
            )

            # Backtest on the out-of-sample window
            strategy = strategy_class(**best_params)
            test_results = self.backtest_engine.run_backtest(
                data[test_start:test_end],
                strategy
            )

            self.results.append({
                'optimization_period': (opt_start, opt_end),
                'test_period': (test_start, test_end),
                'best_params': best_params,
                'test_results': test_results
            })

            # Advance to the next window
            start_date += timedelta(days=self.test_window)

        return self.analyze_results()

    def optimize_parameters(self, data, strategy_class, param_grid):
        """Optimise strategy parameters"""
        best_score = -np.inf
        best_params = None

        # Grid search
        for params in self.generate_param_combinations(param_grid):
            strategy = strategy_class(**params)
            results = self.backtest_engine.run_backtest(data, strategy)

            # Objective: the Sharpe ratio
            score = results['metrics']['sharpe_ratio']

            if score > best_score:
                best_score = score
                best_params = params

        return best_params
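
    def generate_param_combinations(self, param_grid):
        """Expand a dict of parameter lists into individual parameter combinations.
        Not defined in the original text; a minimal sketch using itertools.product."""
        from itertools import product
        keys = list(param_grid.keys())
        for values in product(*(param_grid[k] for k in keys)):
            yield dict(zip(keys, values))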

    def analyze_results(self):
        """Aggregate the walk-forward results"""
        # Collect the metrics from each test period
        all_metrics = []
        for result in self.results:
            metrics = result['test_results']['metrics']
            metrics['period'] = result['test_period']
            all_metrics.append(metrics)

        metrics_df = pd.DataFrame(all_metrics)

        # Summary statistics
        summary = {
            'avg_return': metrics_df['total_return'].mean(),
            'std_return': metrics_df['total_return'].std(),
            'avg_sharpe': metrics_df['sharpe_ratio'].mean(),
            'consistency': (metrics_df['total_return'] > 0).mean() * 100,
            'worst_period': metrics_df['total_return'].min(),
            'best_period': metrics_df['total_return'].max()
        }

        return {
            'detailed_results': self.results,
            'metrics_summary': summary,
            'metrics_dataframe': metrics_df
        }
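
A usage sketch, reusing the hypothetical MovingAverageCrossStrategy from section 3.1 (the parameter grid and ohlcv_df are illustrative):

param_grid = {'fast': [10, 20, 30], 'slow': [50, 100]}
wfa = WalkForwardAnalysis(BacktestEngine(), optimization_window=180, test_window=30)
# wf_results = wfa.run_walk_forward(ohlcv_df, MovingAverageCrossStrategy, param_grid)
# print(wf_results['metrics_summary'])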

4. Cryptocurrency-Specific Considerations

4.1 Per-Exchange Backtesting

class MultiExchangeBacktest:
    def __init__(self, exchanges=['binance', 'coinbase', 'kraken']):
        self.exchanges = exchanges
        self.arbitrage_tracker = ArbitrageTracker()

    def run_multi_exchange_backtest(self, data_dict, strategy):
        """Run the backtest on multiple exchanges"""
        results = {}

        for exchange in self.exchanges:
            if exchange in data_dict:
                engine = BacktestEngine(
                    commission=self.get_exchange_fees(exchange)
                )
                results[exchange] = engine.run_backtest(
                    data_dict[exchange],
                    strategy
                )

        # Analyse arbitrage opportunities
        arbitrage_opps = self.arbitrage_tracker.find_opportunities(data_dict)

        return {
            'exchange_results': results,
            'arbitrage_opportunities': arbitrage_opps,
            'best_exchange': self.find_best_exchange(results)
        }

    def get_exchange_fees(self, exchange):
        """Per-exchange trading fees"""
        fees = {
            'binance': 0.001,
            'coinbase': 0.005,
            'kraken': 0.0026,
            'ftx': 0.0007  # kept for historical data only
        }
        return fees.get(exchange, 0.002)
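
    def find_best_exchange(self, results):
        """Pick the exchange with the highest Sharpe ratio.
        Not defined in the original text; a minimal sketch of one plausible choice."""
        if not results:
            return None
        return max(results, key=lambda ex: results[ex]['metrics']['sharpe_ratio'])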

class ArbitrageTracker:
    def find_opportunities(self, data_dict):
        """Detect arbitrage opportunities"""
        opportunities = []
        timestamps = self.get_common_timestamps(data_dict)

        for ts in timestamps:
            prices = {}
            for exchange, data in data_dict.items():
                if ts in data.index:
                    prices[exchange] = data.loc[ts, 'close']

            if len(prices) >= 2:
                max_exchange = max(prices, key=prices.get)
                min_exchange = min(prices, key=prices.get)
                spread = (prices[max_exchange] - prices[min_exchange]) / prices[min_exchange]

                if spread > 0.002:  # a spread of at least 0.2%
                    opportunities.append({
                        'timestamp': ts,
                        'buy_exchange': min_exchange,
                        'sell_exchange': max_exchange,
                        'spread_percentage': spread * 100,
                        'buy_price': prices[min_exchange],
                        'sell_price': prices[max_exchange]
                    })

        return opportunities
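
    def get_common_timestamps(self, data_dict):
        """Timestamps present on every exchange.
        Not defined in the original text; a minimal sketch using an index intersection."""
        common = None
        for data in data_dict.values():
            common = data.index if common is None else common.intersection(data.index)
        return common if common is not None else []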

4.2 Stablecoin Considerations

class StablecoinAwareBacktest:
    def __init__(self, stable_coins=['USDT', 'USDC', 'BUSD', 'DAI']):
        self.stable_coins = stable_coins
        self.depeg_threshold = 0.02  # 2%

    def check_stablecoin_depeg(self, stablecoin_data):
        """Detect stablecoin depeg events"""
        depeg_events = []

        for coin in self.stable_coins:
            if coin in stablecoin_data:
                data = stablecoin_data[coin]

                # Check the deviation from $1
                deviations = np.abs(data['close'] - 1.0)
                depeg_mask = deviations > self.depeg_threshold

                if depeg_mask.any():
                    depeg_periods = self.find_continuous_periods(depeg_mask)

                    for period in depeg_periods:
                        depeg_events.append({
                            'coin': coin,
                            'start': period['start'],
                            'end': period['end'],
                            'max_deviation': deviations[period['start']:period['end']].max(),
                            'duration_hours': (period['end'] - period['start']).total_seconds() / 3600
                        })

        return depeg_events
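
    def find_continuous_periods(self, mask):
        """Group a boolean Series with a DatetimeIndex into contiguous True runs.
        Referenced above but not defined in the original text; a minimal sketch."""
        periods = []
        start = None
        for ts, flag in mask.items():
            if flag and start is None:
                start = ts
            elif not flag and start is not None:
                periods.append({'start': start, 'end': ts})
                start = None
        if start is not None:
            periods.append({'start': start, 'end': mask.index[-1]})
        return periods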

    def adjust_for_stablecoin_risk(self, backtest_results, depeg_events):
        """Adjust results for stablecoin risk"""
        # Apply an extra cost to trades executed during depeg periods
        adjusted_trades = []

        for trade in backtest_results['trades'].to_dict('records'):
            trade_time = trade['timestamp']
            additional_cost = 0

            # Was the trade executed during a depeg period?
            for event in depeg_events:
                if event['start'] <= trade_time <= event['end']:
                    # Scale the extra cost by the severity of the depeg
                    additional_cost = trade['value'] * event['max_deviation'] * 0.5
                    break

            trade['adjusted_value'] = trade['value'] - additional_cost
            adjusted_trades.append(trade)

        return adjusted_trades

4.3 Accounting for Gas and Network Fees

class NetworkFeeBacktest:
    def __init__(self):
        self.gas_price_history = {}
        self.network_fees = {
            'ethereum': self.calculate_eth_gas_fee,
            'bsc': self.calculate_bsc_fee,
            'polygon': self.calculate_polygon_fee
        }

    def load_gas_price_history(self, network, data):
        """Load gas price history"""
        self.gas_price_history[network] = data

    def calculate_eth_gas_fee(self, timestamp, transaction_type='swap'):
        """Estimate the Ethereum gas fee in USD"""
        gas_units = {
            'transfer': 21000,
            'swap': 150000,
            'add_liquidity': 200000,
            'remove_liquidity': 150000
        }

        if timestamp in self.gas_price_history.get('ethereum', {}):
            gas_price_gwei = self.gas_price_history['ethereum'][timestamp]
            gas_cost_eth = (gas_units[transaction_type] * gas_price_gwei) / 1e9

            # Convert to USD using the ETH price
            eth_price = self.get_eth_price(timestamp)
            return gas_cost_eth * eth_price

        return 10  # default value of $10

    def apply_network_fees(self, trades, network='ethereum'):
        """Apply network fees to a list of trades"""
        adjusted_trades = []

        for trade in trades:
            if network in self.network_fees:
                network_fee = self.network_fees[network](
                    trade['timestamp'],
                    'swap'
                )
                trade['network_fee'] = network_fee
                trade['net_value'] = trade['value'] - trade['commission'] - network_fee
            else:
                trade['network_fee'] = 0
                trade['net_value'] = trade['value'] - trade['commission']

            adjusted_trades.append(trade)

        return adjusted_trades
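
    # The hooks below are referenced above but were not defined in the original text;
    # they are placeholder sketches and would need a real price/fee data source.
    def get_eth_price(self, timestamp):
        """Look up the ETH/USD price at a timestamp (placeholder)"""
        return self.gas_price_history.get('eth_usd', {}).get(timestamp, 2000)  # assumed fallback

    def calculate_bsc_fee(self, timestamp, transaction_type='swap'):
        """Flat fee estimate for BNB Smart Chain (placeholder, assumed value)"""
        return 0.3

    def calculate_polygon_fee(self, timestamp, transaction_type='swap'):
        """Flat fee estimate for Polygon (placeholder, assumed value)"""
        return 0.05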

5. Performance Evaluation and Optimization

5.1 Detailed Performance Analysis

class PerformanceAnalyzer:
    def __init__(self):
        self.risk_free_rate = 0.02  # 2% per year

    def comprehensive_analysis(self, equity_curve, trades, market_data):
        """Comprehensive performance analysis"""
        returns = equity_curve['equity'].pct_change().dropna()

        # Basic statistics
        basic_stats = {
            'total_return': (equity_curve['equity'].iloc[-1] / equity_curve['equity'].iloc[0] - 1) * 100,
            'cagr': self.calculate_cagr(equity_curve),
            'volatility': returns.std() * np.sqrt(365),
            'sharpe_ratio': self.calculate_sharpe_ratio(returns),
            'sortino_ratio': self.calculate_sortino_ratio(returns),
            'calmar_ratio': self.calculate_calmar_ratio(equity_curve)
        }

        # Drawdown analysis
        drawdown_stats = self.analyze_drawdowns(equity_curve)

        # Trade analysis
        trade_stats = self.analyze_trades(trades)

        # Correlation with the market
        market_correlation = self.calculate_market_correlation(returns, market_data)

        # Risk metrics
        risk_metrics = {
            'var_95': self.calculate_var(returns, 0.95),
            'cvar_95': self.calculate_cvar(returns, 0.95),
            'max_leverage': self.calculate_max_leverage(equity_curve),
            'kelly_criterion': self.calculate_kelly_criterion(trade_stats)
        }

        return {
            'basic_stats': basic_stats,
            'drawdown_stats': drawdown_stats,
            'trade_stats': trade_stats,
            'market_correlation': market_correlation,
            'risk_metrics': risk_metrics
        }

    def analyze_drawdowns(self, equity_curve):
        """Detailed drawdown analysis"""
        equity = equity_curve['equity']
        running_max = equity.cummax()
        drawdown = (equity - running_max) / running_max

        # Identify the drawdown periods
        drawdown_periods = []
        in_drawdown = False
        start_idx = None

        for i, dd in enumerate(drawdown):
            if dd < 0 and not in_drawdown:
                in_drawdown = True
                start_idx = i
            elif dd == 0 and in_drawdown:
                in_drawdown = False
                end_idx = i

                period_dd = drawdown.iloc[start_idx:end_idx]
                drawdown_periods.append({
                    'start': equity.index[start_idx],
                    'end': equity.index[end_idx],
                    'max_drawdown': period_dd.min() * 100,
                    'duration_days': (equity.index[end_idx] - equity.index[start_idx]).days,
                    'recovery_days': end_idx - start_idx - period_dd.argmin()
                })

        # Summary statistics
        if drawdown_periods:
            stats = {
                'max_drawdown': min(p['max_drawdown'] for p in drawdown_periods),
                'avg_drawdown': np.mean([p['max_drawdown'] for p in drawdown_periods]),
                'max_duration': max(p['duration_days'] for p in drawdown_periods),
                'avg_duration': np.mean([p['duration_days'] for p in drawdown_periods]),
                'total_underwater_time': sum(p['duration_days'] for p in drawdown_periods),
                'drawdown_periods': drawdown_periods
            }
        else:
            stats = {
                'max_drawdown': 0,
                'avg_drawdown': 0,
                'max_duration': 0,
                'avg_duration': 0,
                'total_underwater_time': 0,
                'drawdown_periods': []
            }

        return stats

    def calculate_sortino_ratio(self, returns, target_return=0):
        """ソルティノレシオの計算"""
        excess_returns = returns - target_return / 365
        downside_returns = excess_returns[excess_returns < 0]

        if len(downside_returns) == 0:
            return np.inf

        downside_std = np.sqrt(np.mean(downside_returns ** 2))

        if downside_std == 0:
            return np.inf

        return np.sqrt(365) * excess_returns.mean() / downside_std

    def calculate_kelly_criterion(self, trade_stats):
        """ケリー基準の計算"""
        if trade_stats['total_trades'] == 0:
            return 0

        win_rate = trade_stats['win_rate'] / 100
        avg_win = trade_stats.get('avg_win', 0)
        avg_loss = abs(trade_stats.get('avg_loss', 1))

        if avg_loss == 0:
            return 0

        odds = avg_win / avg_loss
        kelly = (win_rate * odds - (1 - win_rate)) / odds

        # Recommend a quarter-Kelly for safety
        return max(0, min(kelly / 4, 0.25))
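
    # The risk/return helpers below are referenced in comprehensive_analysis but were not
    # defined in the original text; these are standard textbook sketches (daily bars assumed).
    # analyze_trades, calculate_market_correlation and calculate_max_leverage are likewise
    # referenced above and still need to be supplied.
    def calculate_cagr(self, equity_curve):
        """Compound annual growth rate, in percent"""
        equity = equity_curve['equity']
        days = max((equity.index[-1] - equity.index[0]).days, 1)
        return ((equity.iloc[-1] / equity.iloc[0]) ** (365 / days) - 1) * 100

    def calculate_sharpe_ratio(self, returns):
        """Annualised Sharpe ratio"""
        if len(returns) == 0 or returns.std() == 0:
            return 0
        excess = returns - self.risk_free_rate / 365
        return np.sqrt(365) * excess.mean() / returns.std()

    def calculate_calmar_ratio(self, equity_curve):
        """CAGR divided by the absolute maximum drawdown"""
        max_dd = abs(self.analyze_drawdowns(equity_curve)['max_drawdown'])
        return self.calculate_cagr(equity_curve) / max_dd if max_dd > 0 else np.inf

    def calculate_var(self, returns, confidence=0.95):
        """Historical value at risk, in percent"""
        return np.percentile(returns, (1 - confidence) * 100) * 100

    def calculate_cvar(self, returns, confidence=0.95):
        """Conditional value at risk (expected shortfall), in percent"""
        var_threshold = np.percentile(returns, (1 - confidence) * 100)
        tail = returns[returns <= var_threshold]
        return tail.mean() * 100 if len(tail) > 0 else 0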

5.2 Monte Carlo Simulation

class MonteCarloSimulation:
    def __init__(self, n_simulations=1000):
        self.n_simulations = n_simulations

    def run_monte_carlo(self, historical_returns, n_days=252):
        """Run the Monte Carlo simulation"""
        results = []

        # Return statistics
        mean_return = historical_returns.mean()
        std_return = historical_returns.std()

        for _ in range(self.n_simulations):
            # Random walk
            simulated_returns = np.random.normal(
                mean_return,
                std_return,
                n_days
            )

            # Cumulative return
            cumulative_return = (1 + simulated_returns).cumprod()

            results.append({
                'final_return': cumulative_return[-1] - 1,
                'max_drawdown': self.calculate_max_dd(cumulative_return),
                'volatility': simulated_returns.std() * np.sqrt(252),
                'path': cumulative_return
            })

        return self.analyze_simulation_results(results)

    def analyze_simulation_results(self, results):
        """Analyse the simulation results"""
        final_returns = [r['final_return'] for r in results]
        max_drawdowns = [r['max_drawdown'] for r in results]

        analysis = {
            'return_percentiles': {
                '5%': np.percentile(final_returns, 5),
                '25%': np.percentile(final_returns, 25),
                '50%': np.percentile(final_returns, 50),
                '75%': np.percentile(final_returns, 75),
                '95%': np.percentile(final_returns, 95)
            },
            'drawdown_percentiles': {
                '5%': np.percentile(max_drawdowns, 5),
                '25%': np.percentile(max_drawdowns, 25),
                '50%': np.percentile(max_drawdowns, 50),
                '75%': np.percentile(max_drawdowns, 75),
                '95%': np.percentile(max_drawdowns, 95)
            },
            'probability_of_loss': sum(1 for r in final_returns if r < 0) / len(final_returns),
            'expected_return': np.mean(final_returns),
            'return_std': np.std(final_returns),
            'final_returns': final_returns  # kept so the report can plot the distribution
        }

        return analysis

    def bootstrap_confidence_intervals(self, returns, statistic_func, confidence=0.95):
        """Bootstrap confidence intervals"""
        bootstrap_stats = []

        for _ in range(self.n_simulations):
            # Resample with replacement
            sample = np.random.choice(returns, size=len(returns), replace=True)
            stat = statistic_func(sample)
            bootstrap_stats.append(stat)

        # Confidence interval
        alpha = 1 - confidence
        lower = np.percentile(bootstrap_stats, alpha/2 * 100)
        upper = np.percentile(bootstrap_stats, (1 - alpha/2) * 100)

        return {
            'mean': np.mean(bootstrap_stats),
            'std': np.std(bootstrap_stats),
            'confidence_interval': (lower, upper)
        }
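
    def calculate_max_dd(self, cumulative_return):
        """Maximum drawdown of a cumulative-return path, in percent.
        Referenced in run_monte_carlo but not defined in the original text; a minimal sketch."""
        running_max = np.maximum.accumulate(cumulative_return)
        drawdown = (cumulative_return - running_max) / running_max
        return drawdown.min() * 100

A short usage sketch for the bootstrap (the Sharpe statistic and the returns variable are illustrative choices):

mc = MonteCarloSimulation(n_simulations=1000)
sharpe_ci = mc.bootstrap_confidence_intervals(
    returns.values,                               # daily strategy returns as a numpy array (assumed)
    lambda x: np.sqrt(365) * x.mean() / x.std(),  # annualised Sharpe of each resample
    confidence=0.95
)
print(sharpe_ci['confidence_interval'])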

6. Implementation Example: A Complete Backtest System

class CryptoBacktestSystem:
    def __init__(self):
        self.data_processor = OHLCVDataProcessor()
        self.tick_processor = TickDataProcessor()
        self.backtest_engine = AdvancedBacktestEngine()
        self.performance_analyzer = PerformanceAnalyzer()
        self.monte_carlo = MonteCarloSimulation()

    def run_complete_backtest(self, data_source, strategy, config):
        """Run the full backtest pipeline"""
        # 1. Prepare the data
        if config['data_type'] == 'ohlcv':
            data, validation = self.data_processor.validate_and_clean_data(data_source)
        else:  # tick data
            tick_data = self.tick_processor.process_tick_data(data_source)
            data = self.tick_processor.convert_to_bars(
                tick_data,
                config['bar_type'],
                config['bar_size']
            )

        # 2. Split the data (train / validation / test)
        train_end = int(len(data) * 0.6)
        val_end = int(len(data) * 0.8)

        train_data = data.iloc[:train_end]
        val_data = data.iloc[train_end:val_end]
        test_data = data.iloc[val_end:]

        # 3. Optimise parameters on the training data
        if config.get('optimize_params', True):
            best_params = self.optimize_strategy_params(
                train_data,
                strategy.__class__,
                config['param_grid']
            )
            strategy.update_params(best_params)

        # 4. Evaluate on the validation data
        val_results = self.backtest_engine.run_backtest(val_data, strategy)

        # 5. Walk-forward analysis
        if config.get('walk_forward', True):
            wf_analysis = WalkForwardAnalysis(self.backtest_engine)
            wf_results = wf_analysis.run_walk_forward(
                data,
                strategy.__class__,
                config['param_grid']
            )

        # 6. Final backtest on the test data
        test_results = self.backtest_engine.run_backtest(test_data, strategy)

        # 7. Performance analysis
        performance = self.performance_analyzer.comprehensive_analysis(
            test_results['equity_curve'],
            test_results['trades'],
            test_data
        )

        # 8. Monte Carlo simulation
        returns = test_results['equity_curve']['equity'].pct_change().dropna()
        monte_carlo_results = self.monte_carlo.run_monte_carlo(returns)

        # 9. Combine the results
        final_results = {
            'validation_results': val_results,
            'test_results': test_results,
            'walk_forward_results': wf_results if config.get('walk_forward', True) else None,
            'performance_analysis': performance,
            'monte_carlo_simulation': monte_carlo_results,
            'data_validation': validation if config['data_type'] == 'ohlcv' else None
        }

        # 10. Generate the report
        self.generate_backtest_report(final_results, config)

        return final_results
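
    def optimize_strategy_params(self, data, strategy_class, param_grid):
        """Grid-search strategy parameters on the training data.
        Referenced above but not defined in the original text; this sketch simply delegates
        to the walk-forward optimiser. The strategy is also assumed to expose update_params()."""
        optimizer = WalkForwardAnalysis(self.backtest_engine)
        return optimizer.optimize_parameters(data, strategy_class, param_grid)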

    def generate_backtest_report(self, results, config):
        """Generate the backtest report"""
        import matplotlib.pyplot as plt

        fig, axes = plt.subplots(3, 2, figsize=(15, 12))

        # 1. Equity curve
        ax = axes[0, 0]
        results['test_results']['equity_curve']['equity'].plot(ax=ax)
        ax.set_title('Equity Curve')
        ax.set_ylabel('Portfolio Value')

        # 2. Drawdown
        ax = axes[0, 1]
        equity = results['test_results']['equity_curve']['equity']
        drawdown = (equity - equity.cummax()) / equity.cummax() * 100
        drawdown.plot(ax=ax, color='red')
        ax.set_title('Drawdown')
        ax.set_ylabel('Drawdown %')

        # 3. Monthly returns
        ax = axes[1, 0]
        monthly_returns = equity.resample('M').last().pct_change() * 100
        monthly_returns.plot(kind='bar', ax=ax)
        ax.set_title('Monthly Returns')
        ax.set_ylabel('Return %')

        # 4. Return distribution
        ax = axes[1, 1]
        returns = equity.pct_change().dropna()
        returns.hist(bins=50, ax=ax)
        ax.set_title('Return Distribution')
        ax.set_xlabel('Daily Return')

        # 5. Monte Carlo results
        ax = axes[2, 0]
        mc_returns = results['monte_carlo_simulation']['final_returns']
        ax.hist(mc_returns, bins=30)
        ax.set_title('Monte Carlo Final Returns')
        ax.set_xlabel('Final Return')

        # 6. Performance metrics
        ax = axes[2, 1]
        ax.axis('off')
        metrics_text = self.format_metrics_text(results['performance_analysis'])
        ax.text(0.1, 0.9, metrics_text, transform=ax.transAxes, verticalalignment='top', fontfamily='monospace')

        plt.tight_layout()
        plt.savefig(f'backtest_report_{config["strategy_name"]}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png')
        plt.close()

    def format_metrics_text(self, performance):
        """指標のテキストフォーマット"""
        basic = performance['basic_stats']
        risk = performance['risk_metrics']

        text = f"""Performance Metrics:
Total Return: {basic['total_return']:.2f}%
CAGR: {basic['cagr']:.2f}%
Sharpe Ratio: {basic['sharpe_ratio']:.2f}
Sortino Ratio: {basic['sortino_ratio']:.2f}
Max Drawdown: {performance['drawdown_stats']['max_drawdown']:.2f}%
VaR (95%): {risk['var_95']:.2f}%
Kelly Criterion: {risk['kelly_criterion']:.2%}
"""
        return text

Summary

The key ingredients of a cryptocurrency backtest are:

  1. Data quality: tick-data processing, outlier removal, missing-value imputation
  2. Market characteristics: 24-hour trading, high volatility, differences between exchanges
  3. Realistic execution: slippage, market impact, liquidity constraints
  4. Comprehensive evaluation: risk-adjusted returns, drawdown analysis, Monte Carlo simulation
  5. Robustness checks: walk-forward analysis, out-of-sample testing

Implementing these properly makes it possible to backtest under conditions that are much closer to live trading.