Backtesting Methods for Cryptocurrency Price Prediction
Overview
Cryptocurrency markets trade 24 hours a day, 365 days a year, and prices move at high frequency. This document covers backtesting techniques that account for the specific characteristics of cryptocurrency markets, and how to handle irregular time-series data such as tick data.
1. Cryptocurrency Market Characteristics and Backtesting Challenges
1.1 Market Characteristics
- 24/7 trading: unlike traditional financial markets, there are no market holidays
- High volatility: daily price moves of 10% or more are not unusual
- Uneven liquidity: liquidity varies widely across exchanges and times of day
- Market immaturity: regulation and market structure change rapidly
- Multiple exchanges: the same asset trades at different prices on different venues
1.2 Key Backtesting Challenges
# Challenges a backtest must account for
BACKTESTING_CHALLENGES = {
    'data_quality': {
        'issues': ['missing data', 'outliers', 'exchange downtime'],
        'impact': 'overstated performance'
    },
    'market_microstructure': {
        'issues': ['spread', 'slippage', 'order book depth'],
        'impact': 'understated real trading costs'
    },
    'survivorship_bias': {
        'issues': ['defunct exchanges', 'delisted tokens'],
        'impact': 'overstated returns'
    },
    'lookahead_bias': {
        'issues': ['use of future information', 'data latency'],
        'impact': 'unrealistic trading signals'
    }
}
2. Preprocessing Time-Series Data
2.1 Regular Time-Series Data (OHLCV)
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class OHLCVDataProcessor:
def __init__(self, timeframe='1h'):
self.timeframe = timeframe
self.required_columns = ['open', 'high', 'low', 'close', 'volume']
def validate_and_clean_data(self, df):
"""OHLCVデータの検証とクリーニング"""
# タイムスタンプのインデックス化
df = df.set_index('timestamp') if 'timestamp' in df.columns else df
df.index = pd.to_datetime(df.index)
# 重複の削除
df = df[~df.index.duplicated(keep='first')]
# ソート
df = df.sort_index()
# 基本的な検証
validation_results = {
'missing_values': df.isnull().sum(),
'negative_prices': (df[['open', 'high', 'low', 'close']] < 0).sum(),
'volume_issues': (df['volume'] < 0).sum(),
'ohlc_consistency': self.check_ohlc_consistency(df)
}
        # Clean the data
df = self.fix_data_issues(df, validation_results)
return df, validation_results
def check_ohlc_consistency(self, df):
"""OHLC関係の整合性チェック"""
issues = {
'high_low': (df['high'] < df['low']).sum(),
'high_open': (df['high'] < df['open']).sum(),
'high_close': (df['high'] < df['close']).sum(),
'low_open': (df['low'] > df['open']).sum(),
'low_close': (df['low'] > df['close']).sum()
}
return issues
def fix_data_issues(self, df, validation_results):
"""データの問題を修正"""
# OHLC整合性の修正
df.loc[df['high'] < df['low'], 'high'] = df['low']
df.loc[df['high'] < df['open'], 'high'] = df['open']
df.loc[df['high'] < df['close'], 'high'] = df['close']
df.loc[df['low'] > df['open'], 'low'] = df['open']
df.loc[df['low'] > df['close'], 'low'] = df['close']
        # Handle missing values
df = self.handle_missing_values(df)
return df
def handle_missing_values(self, df):
"""欠損値の処理"""
# 前方補完を基本とし、最初の値は後方補完
df = df.fillna(method='ffill').fillna(method='bfill')
# ボリュームの欠損は0で補完
df['volume'] = df['volume'].fillna(0)
return df
def resample_to_timeframe(self, df, target_timeframe):
"""異なる時間枠へのリサンプリング"""
resampling_rules = {
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
}
resampled = df.resample(target_timeframe).agg(resampling_rules)
        # Handle periods with no trades
resampled = self.fill_no_trade_periods(resampled)
return resampled
def fill_no_trade_periods(self, df):
"""取引がない期間の処理"""
# 前の終値で埋める
df['open'] = df['open'].fillna(df['close'].shift(1))
df['high'] = df['high'].fillna(df['close'].shift(1))
df['low'] = df['low'].fillna(df['close'].shift(1))
df['close'] = df['close'].fillna(method='ffill')
df['volume'] = df['volume'].fillna(0)
return df
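A brief usage sketch follows. The file name and column layout are assumptions for illustration, not part of the original pipeline.
# Minimal usage sketch (file name and columns are hypothetical)
raw = pd.read_csv('btc_usdt_1h.csv')   # expected columns: timestamp, open, high, low, close, volume
processor = OHLCVDataProcessor(timeframe='1h')
clean_df, report = processor.validate_and_clean_data(raw)
print(report['ohlc_consistency'])                         # counts of OHLC violations that were repaired
daily = processor.resample_to_timeframe(clean_df, '1D')   # aggregate hourly bars into daily bars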
2.2 Irregular Time-Series Data (Tick Data)
class TickDataProcessor:
def __init__(self):
self.tick_columns = ['timestamp', 'price', 'volume', 'side']
def process_tick_data(self, ticks):
"""ティックデータの処理"""
df = pd.DataFrame(ticks)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df = df.sort_values('timestamp')
# マイクロ秒単位の重複処理
df = self.handle_microsecond_duplicates(df)
# 異常値の検出と処理
df = self.detect_and_handle_outliers(df)
return df
def handle_microsecond_duplicates(self, df):
"""マイクロ秒単位の重複処理"""
# 同一タイムスタンプの取引を集約
aggregation = {
'price': 'mean', # VWAP的なアプローチ
'volume': 'sum',
'side': lambda x: x.mode()[0] if len(x.mode()) > 0 else x.iloc[0]
}
df = df.groupby('timestamp').agg(aggregation).reset_index()
return df
def detect_and_handle_outliers(self, df):
"""異常値の検出と処理"""
# ローリングウィンドウでの標準偏差計算
window_size = 100
df['price_mean'] = df['price'].rolling(window=window_size, center=True).mean()
df['price_std'] = df['price'].rolling(window=window_size, center=True).std()
        # Flag prices more than 3 sigma away from the local mean
        df['is_outlier'] = np.abs(df['price'] - df['price_mean']) > 3 * df['price_std']
        # Replace outliers with the average of the neighbouring ticks
outlier_indices = df[df['is_outlier']].index
for idx in outlier_indices:
if idx > 0 and idx < len(df) - 1:
df.loc[idx, 'price'] = (df.loc[idx-1, 'price'] + df.loc[idx+1, 'price']) / 2
        # Drop the temporary columns
df = df.drop(columns=['price_mean', 'price_std', 'is_outlier'])
return df
def convert_to_bars(self, tick_df, bar_type='time', bar_size=60):
"""ティックデータからバーデータへの変換"""
if bar_type == 'time':
return self.create_time_bars(tick_df, bar_size)
elif bar_type == 'tick':
return self.create_tick_bars(tick_df, bar_size)
elif bar_type == 'volume':
return self.create_volume_bars(tick_df, bar_size)
elif bar_type == 'dollar':
return self.create_dollar_bars(tick_df, bar_size)
else:
raise ValueError(f"Unknown bar type: {bar_type}")
def create_time_bars(self, df, seconds):
"""時間バーの作成"""
df.set_index('timestamp', inplace=True)
ohlc = df['price'].resample(f'{seconds}S').ohlc()
volume = df['volume'].resample(f'{seconds}S').sum()
bars = pd.concat([ohlc, volume], axis=1)
bars = bars.dropna()
return bars
def create_tick_bars(self, df, tick_count):
"""ティックバーの作成(固定数のティックごと)"""
bars = []
for i in range(0, len(df), tick_count):
subset = df.iloc[i:i+tick_count]
if len(subset) > 0:
bar = {
'timestamp': subset.iloc[-1]['timestamp'],
'open': subset.iloc[0]['price'],
'high': subset['price'].max(),
'low': subset['price'].min(),
'close': subset.iloc[-1]['price'],
'volume': subset['volume'].sum(),
'tick_count': len(subset)
}
bars.append(bar)
return pd.DataFrame(bars).set_index('timestamp')
def create_volume_bars(self, df, volume_threshold):
"""ボリュームバーの作成(固定ボリュームごと)"""
bars = []
current_volume = 0
current_bar = []
for _, row in df.iterrows():
current_bar.append(row)
current_volume += row['volume']
if current_volume >= volume_threshold:
bar_df = pd.DataFrame(current_bar)
bar = {
'timestamp': bar_df.iloc[-1]['timestamp'],
'open': bar_df.iloc[0]['price'],
'high': bar_df['price'].max(),
'low': bar_df['price'].min(),
'close': bar_df.iloc[-1]['price'],
'volume': bar_df['volume'].sum(),
'tick_count': len(bar_df)
}
bars.append(bar)
                # Reset for the next bar
current_volume = 0
current_bar = []
return pd.DataFrame(bars).set_index('timestamp')
def create_dollar_bars(self, df, dollar_threshold):
"""ドルバーの作成(固定金額ごと)"""
bars = []
current_dollars = 0
current_bar = []
for _, row in df.iterrows():
current_bar.append(row)
current_dollars += row['price'] * row['volume']
if current_dollars >= dollar_threshold:
bar_df = pd.DataFrame(current_bar)
bar = {
'timestamp': bar_df.iloc[-1]['timestamp'],
'open': bar_df.iloc[0]['price'],
'high': bar_df['price'].max(),
'low': bar_df['price'].min(),
'close': bar_df.iloc[-1]['price'],
'volume': bar_df['volume'].sum(),
'dollar_volume': current_dollars,
'tick_count': len(bar_df)
}
bars.append(bar)
                # Reset for the next bar
current_dollars = 0
current_bar = []
return pd.DataFrame(bars).set_index('timestamp')
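A short usage sketch follows. The sample ticks are made up and only illustrate the input format the processor expects (timestamp, price, volume, side).
# Minimal usage sketch (the sample ticks are hypothetical)
ticks = [
    {'timestamp': '2024-01-01 00:00:00.000100', 'price': 42000.5, 'volume': 0.02, 'side': 'buy'},
    {'timestamp': '2024-01-01 00:00:00.000100', 'price': 42000.7, 'volume': 0.01, 'side': 'buy'},
    {'timestamp': '2024-01-01 00:00:01.250000', 'price': 41999.9, 'volume': 0.05, 'side': 'sell'},
]
processor = TickDataProcessor()
clean_ticks = processor.process_tick_data(ticks)
time_bars = processor.convert_to_bars(clean_ticks, bar_type='time', bar_size=60)  # 60-second bars
Other bar types ('tick', 'volume', 'dollar') follow the same call signature; dollar bars in particular are often preferred for crypto because they adapt to changing price levels.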
3. Backtesting Framework
3.1 A Basic Backtest Engine
class BacktestEngine:
def __init__(self, initial_capital=10000, commission=0.001, slippage=0.0005):
self.initial_capital = initial_capital
self.commission = commission # 0.1%
self.slippage = slippage # 0.05%
self.positions = {}
self.trades = []
self.equity_curve = []
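    def reset(self):
        """Reset state between runs.
        Assumed helper: run_backtest calls reset() but it is not shown in the original listing."""
        self.positions = {}
        self.trades = []
        self.equity_curve = []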
def run_backtest(self, data, strategy, start_date=None, end_date=None):
"""バックテストの実行"""
# データの期間フィルタリング
if start_date:
data = data[data.index >= start_date]
if end_date:
data = data[data.index <= end_date]
        # Initialize state
self.reset()
capital = self.initial_capital
position = 0
        # Main loop
        for timestamp, row in data.iterrows():
            # Generate a signal using only data up to the current bar (avoids lookahead)
            signal = strategy.generate_signal(data.loc[:timestamp])
            # Execute a trade when the signal changes
            if signal != 0 and signal != position:
capital, position = self.execute_trade(
timestamp, row, signal, capital, position
)
            # Record the portfolio value
equity = self.calculate_equity(capital, position, row['close'])
self.equity_curve.append({
'timestamp': timestamp,
'equity': equity,
'capital': capital,
'position_value': position * row['close'] if position else 0
})
return self.generate_results()
def execute_trade(self, timestamp, row, signal, capital, position):
"""取引の実行"""
price = row['close']
# スリッページの適用
if signal > position: # 買い
execution_price = price * (1 + self.slippage)
else: # 売り
execution_price = price * (1 - self.slippage)
# ポジションサイズの計算
if signal == 1 and position == 0: # 新規買い
size = (capital * 0.95) / execution_price # 資金の95%を使用
cost = size * execution_price * (1 + self.commission)
if cost <= capital:
capital -= cost
position = size
self.record_trade('BUY', timestamp, execution_price, size, cost)
        elif signal == -1 and position > 0:  # sell (close the position)
proceeds = position * execution_price * (1 - self.commission)
capital += proceeds
self.record_trade('SELL', timestamp, execution_price, position, proceeds)
position = 0
return capital, position
def calculate_equity(self, capital, position, current_price):
"""総資産価値の計算"""
return capital + (position * current_price if position else 0)
def record_trade(self, side, timestamp, price, size, value):
"""取引の記録"""
self.trades.append({
'timestamp': timestamp,
'side': side,
'price': price,
'size': size,
'value': value,
'commission': value * self.commission
})
def generate_results(self):
"""バックテスト結果の生成"""
equity_df = pd.DataFrame(self.equity_curve).set_index('timestamp')
trades_df = pd.DataFrame(self.trades)
# パフォーマンス指標の計算
metrics = self.calculate_performance_metrics(equity_df, trades_df)
return {
'equity_curve': equity_df,
'trades': trades_df,
'metrics': metrics
}
def calculate_performance_metrics(self, equity_df, trades_df):
"""パフォーマンス指標の計算"""
returns = equity_df['equity'].pct_change().dropna()
metrics = {
'total_return': (equity_df['equity'].iloc[-1] / self.initial_capital - 1) * 100,
'annualized_return': self.calculate_annualized_return(returns),
'sharpe_ratio': self.calculate_sharpe_ratio(returns),
'max_drawdown': self.calculate_max_drawdown(equity_df['equity']),
'win_rate': self.calculate_win_rate(trades_df),
'profit_factor': self.calculate_profit_factor(trades_df),
'total_trades': len(trades_df),
'avg_trade_duration': self.calculate_avg_trade_duration(trades_df)
}
return metrics
def calculate_annualized_return(self, returns):
"""年率リターンの計算"""
days = len(returns)
if days == 0:
return 0
total_return = (1 + returns).prod() - 1
return (1 + total_return) ** (365 / days) - 1
def calculate_sharpe_ratio(self, returns, risk_free_rate=0.02):
"""シャープレシオの計算"""
if len(returns) == 0 or returns.std() == 0:
return 0
excess_returns = returns - risk_free_rate / 365
return np.sqrt(365) * excess_returns.mean() / returns.std()
def calculate_max_drawdown(self, equity_series):
"""最大ドローダウンの計算"""
cumulative = (1 + equity_series.pct_change()).cumprod()
running_max = cumulative.cummax()
drawdown = (cumulative - running_max) / running_max
return drawdown.min() * 100
def calculate_win_rate(self, trades_df):
"""勝率の計算"""
if len(trades_df) < 2:
return 0
# ペアごとの損益計算
profits = []
for i in range(0, len(trades_df) - 1, 2):
if i + 1 < len(trades_df):
buy_trade = trades_df.iloc[i]
sell_trade = trades_df.iloc[i + 1]
profit = sell_trade['value'] - buy_trade['value']
profits.append(profit)
if not profits:
return 0
wins = sum(1 for p in profits if p > 0)
return (wins / len(profits)) * 100
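    def calculate_profit_factor(self, trades_df):
        """Gross profit divided by gross loss over buy-sell pairs.
        Assumed helper (referenced in calculate_performance_metrics but not shown); a minimal sketch."""
        profits = []
        for i in range(0, len(trades_df) - 1, 2):
            if i + 1 < len(trades_df):
                profits.append(trades_df.iloc[i + 1]['value'] - trades_df.iloc[i]['value'])
        gross_profit = sum(p for p in profits if p > 0)
        gross_loss = abs(sum(p for p in profits if p < 0))
        return gross_profit / gross_loss if gross_loss > 0 else np.inf
    def calculate_avg_trade_duration(self, trades_df):
        """Average holding time per round trip, in hours.
        Assumed helper (referenced but not shown); a minimal sketch."""
        if len(trades_df) < 2:
            return 0
        durations = []
        for i in range(0, len(trades_df) - 1, 2):
            if i + 1 < len(trades_df):
                delta = trades_df.iloc[i + 1]['timestamp'] - trades_df.iloc[i]['timestamp']
                durations.append(delta.total_seconds() / 3600)
        return np.mean(durations) if durations else 0
The engine calls strategy.generate_signal(...) but no strategy class appears in the text. The sketch below only illustrates the interface the engine assumes: a method that receives the history up to the current bar and returns 1 (go long), -1 (go flat) or 0 (do nothing). The class name, window lengths and the ohlcv_df variable are illustrative assumptions.
class MovingAverageCrossStrategy:
    """Hypothetical example strategy: long when the fast MA is above the slow MA."""
    def __init__(self, fast=20, slow=50):
        self.fast = fast
        self.slow = slow
    def generate_signal(self, history):
        # Uses only past data handed in by the engine, so there is no lookahead
        if len(history) < self.slow:
            return 0
        fast_ma = history['close'].rolling(self.fast).mean().iloc[-1]
        slow_ma = history['close'].rolling(self.slow).mean().iloc[-1]
        return 1 if fast_ma > slow_ma else -1
engine = BacktestEngine(initial_capital=10000, commission=0.001, slippage=0.0005)
results = engine.run_backtest(ohlcv_df, MovingAverageCrossStrategy())  # ohlcv_df: a cleaned OHLCV DataFrame (section 2.1)
print(results['metrics']['sharpe_ratio'])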
3.2 Advanced Backtest Features
class AdvancedBacktestEngine(BacktestEngine):
def __init__(self, initial_capital=10000, commission=0.001, slippage=0.0005):
super().__init__(initial_capital, commission, slippage)
self.order_book_impact = OrderBookImpact()
self.market_impact_model = MarketImpactModel()
def execute_trade_with_market_impact(self, timestamp, row, signal, capital, position, order_book):
"""市場影響を考慮した取引実行"""
price = row['close']
# 注文サイズの計算
if signal == 1 and position == 0:
target_size = (capital * 0.95) / price
# 市場影響の推定
impact = self.market_impact_model.estimate_impact(
target_size,
order_book,
row['volume']
)
            # Adjust the execution price for impact and slippage
            execution_price = price * (1 + impact + self.slippage)
            # Size that can actually be filled against the order book
executable_size = self.order_book_impact.calculate_executable_size(
target_size,
order_book,
execution_price
)
cost = executable_size * execution_price * (1 + self.commission)
if cost <= capital:
capital -= cost
position = executable_size
self.record_trade_with_impact(
'BUY', timestamp, execution_price, executable_size, cost, impact
)
return capital, position
def simulate_limit_order(self, order_price, order_size, market_data, order_book):
"""指値注文のシミュレーション"""
filled_size = 0
filled_value = 0
# 注文が約定するかチェック
if order_price >= market_data['low'] and order_price <= market_data['high']:
# 部分約定の可能性を考慮
available_liquidity = self.estimate_liquidity_at_price(
order_price,
order_book,
market_data['volume']
)
filled_size = min(order_size, available_liquidity)
filled_value = filled_size * order_price
return filled_size, filled_value
class OrderBookImpact:
"""注文板への影響モデル"""
def calculate_executable_size(self, target_size, order_book, max_price):
"""実行可能なサイズの計算"""
if not order_book or 'asks' not in order_book:
return target_size * 0.5 # デフォルトで50%
executable = 0
total_cost = 0
for ask in order_book['asks']:
price, size = ask
if price > max_price:
break
take_size = min(target_size - executable, size)
executable += take_size
total_cost += take_size * price
if executable >= target_size:
break
return executable
class MarketImpactModel:
"""市場影響モデル"""
def __init__(self, permanent_impact=0.1, temporary_impact=0.5):
self.permanent_impact = permanent_impact
self.temporary_impact = temporary_impact
def estimate_impact(self, order_size, order_book, daily_volume):
"""市場影響の推定"""
# 参加率(order size / daily volume)
participation_rate = order_size / max(daily_volume, 1)
# 平方根モデル
permanent = self.permanent_impact * np.sqrt(participation_rate)
temporary = self.temporary_impact * participation_rate
return permanent + temporary
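For intuition, with the default coefficients an order that is 1% of daily volume costs roughly permanent = 0.1 * sqrt(0.01) = 1% plus temporary = 0.5 * 0.01 = 0.5%, i.e. about 1.5% of price. A quick check (the numbers are only an illustration of the formula above):
model = MarketImpactModel()                                                    # default coefficients 0.1 / 0.5
impact = model.estimate_impact(order_size=10, order_book=None, daily_volume=1000)
print(f"{impact:.4f}")                                                         # ~0.0150, i.e. about 1.5% impact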
3.3 Walk-Forward Analysis
class WalkForwardAnalysis:
def __init__(self, backtest_engine, optimization_window=180, test_window=30):
self.backtest_engine = backtest_engine
        self.optimization_window = optimization_window  # in days
self.test_window = test_window
self.results = []
def run_walk_forward(self, data, strategy_class, param_grid):
"""ウォークフォワード分析の実行"""
total_days = (data.index[-1] - data.index[0]).days
# ウィンドウをスライドさせながら分析
start_date = data.index[0]
while start_date + timedelta(days=self.optimization_window + self.test_window) <= data.index[-1]:
# 最適化期間
opt_start = start_date
opt_end = start_date + timedelta(days=self.optimization_window)
            # Out-of-sample (test) period
test_start = opt_end
test_end = test_start + timedelta(days=self.test_window)
            # Optimize parameters on the in-sample window
best_params = self.optimize_parameters(
data[opt_start:opt_end],
strategy_class,
param_grid
)
            # Backtest with the optimized parameters on the out-of-sample window
strategy = strategy_class(**best_params)
test_results = self.backtest_engine.run_backtest(
data[test_start:test_end],
strategy
)
self.results.append({
'optimization_period': (opt_start, opt_end),
'test_period': (test_start, test_end),
'best_params': best_params,
'test_results': test_results
})
            # Advance to the next window
start_date += timedelta(days=self.test_window)
return self.analyze_results()
def optimize_parameters(self, data, strategy_class, param_grid):
"""パラメータの最適化"""
best_score = -np.inf
best_params = None
# グリッドサーチ
for params in self.generate_param_combinations(param_grid):
strategy = strategy_class(**params)
results = self.backtest_engine.run_backtest(data, strategy)
            # Score each candidate (the Sharpe ratio is used here)
score = results['metrics']['sharpe_ratio']
if score > best_score:
best_score = score
best_params = params
return best_params
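    def generate_param_combinations(self, param_grid):
        """Expand a {name: [values]} grid into keyword-argument dicts.
        Assumed helper (called by optimize_parameters but not shown above); a minimal sketch."""
        import itertools
        keys = list(param_grid.keys())
        for values in itertools.product(*(param_grid[k] for k in keys)):
            yield dict(zip(keys, values))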
def analyze_results(self):
"""ウォークフォワード結果の分析"""
# 各期間の結果を集約
all_metrics = []
for result in self.results:
metrics = result['test_results']['metrics']
metrics['period'] = result['test_period']
all_metrics.append(metrics)
metrics_df = pd.DataFrame(all_metrics)
        # Summary statistics
summary = {
'avg_return': metrics_df['total_return'].mean(),
'std_return': metrics_df['total_return'].std(),
'avg_sharpe': metrics_df['sharpe_ratio'].mean(),
'consistency': (metrics_df['total_return'] > 0).mean() * 100,
'worst_period': metrics_df['total_return'].min(),
'best_period': metrics_df['total_return'].max()
}
return {
'detailed_results': self.results,
'metrics_summary': summary,
'metrics_dataframe': metrics_df
}
4. Cryptocurrency-Specific Considerations
4.1 Per-Exchange Backtesting
class MultiExchangeBacktest:
def __init__(self, exchanges=['binance', 'coinbase', 'kraken']):
self.exchanges = exchanges
self.arbitrage_tracker = ArbitrageTracker()
def run_multi_exchange_backtest(self, data_dict, strategy):
"""複数取引所でのバックテスト"""
results = {}
for exchange in self.exchanges:
if exchange in data_dict:
engine = BacktestEngine(
commission=self.get_exchange_fees(exchange)
)
results[exchange] = engine.run_backtest(
data_dict[exchange],
strategy
)
        # Analyze arbitrage opportunities across exchanges
arbitrage_opps = self.arbitrage_tracker.find_opportunities(data_dict)
return {
'exchange_results': results,
'arbitrage_opportunities': arbitrage_opps,
'best_exchange': self.find_best_exchange(results)
}
def get_exchange_fees(self, exchange):
"""取引所別手数料"""
fees = {
'binance': 0.001,
'coinbase': 0.005,
'kraken': 0.0026,
            'ftx': 0.0007  # kept only for historical data
}
return fees.get(exchange, 0.002)
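    def find_best_exchange(self, results):
        """Pick the exchange with the best risk-adjusted result.
        Assumed helper (referenced in run_multi_exchange_backtest but not shown); a minimal sketch."""
        if not results:
            return None
        return max(results, key=lambda ex: results[ex]['metrics']['sharpe_ratio'])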
class ArbitrageTracker:
def find_opportunities(self, data_dict):
"""アービトラージ機会の検出"""
opportunities = []
timestamps = self.get_common_timestamps(data_dict)
for ts in timestamps:
prices = {}
for exchange, data in data_dict.items():
if ts in data.index:
prices[exchange] = data.loc[ts, 'close']
if len(prices) >= 2:
max_exchange = max(prices, key=prices.get)
min_exchange = min(prices, key=prices.get)
spread = (prices[max_exchange] - prices[min_exchange]) / prices[min_exchange]
                if spread > 0.002:  # a spread of at least 0.2%
opportunities.append({
'timestamp': ts,
'buy_exchange': min_exchange,
'sell_exchange': max_exchange,
'spread_percentage': spread * 100,
'buy_price': prices[min_exchange],
'sell_price': prices[max_exchange]
})
return opportunities
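    def get_common_timestamps(self, data_dict):
        """Timestamps present on every exchange.
        Assumed helper (used by find_opportunities but not shown); a minimal sketch."""
        indices = [set(df.index) for df in data_dict.values()]
        if not indices:
            return []
        return sorted(set.intersection(*indices))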
4.2 Accounting for Stablecoins
class StablecoinAwareBacktest:
def __init__(self, stable_coins=['USDT', 'USDC', 'BUSD', 'DAI']):
self.stable_coins = stable_coins
self.depeg_threshold = 0.02 # 2%
def check_stablecoin_depeg(self, stablecoin_data):
"""ステーブルコインのデペッグ検出"""
depeg_events = []
for coin in self.stable_coins:
if coin in stablecoin_data:
data = stablecoin_data[coin]
                # Check deviation from the $1 peg
deviations = np.abs(data['close'] - 1.0)
depeg_mask = deviations > self.depeg_threshold
if depeg_mask.any():
depeg_periods = self.find_continuous_periods(depeg_mask)
for period in depeg_periods:
depeg_events.append({
'coin': coin,
'start': period['start'],
'end': period['end'],
'max_deviation': deviations[period['start']:period['end']].max(),
'duration_hours': (period['end'] - period['start']).total_seconds() / 3600
})
return depeg_events
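    def find_continuous_periods(self, mask):
        """Convert a boolean Series into a list of {'start', 'end'} timestamp pairs.
        Assumed helper (used by check_stablecoin_depeg but not shown); a minimal sketch."""
        periods = []
        start = None
        for ts, flagged in mask.items():
            if flagged and start is None:
                start = ts
            elif not flagged and start is not None:
                periods.append({'start': start, 'end': ts})
                start = None
        if start is not None:
            periods.append({'start': start, 'end': mask.index[-1]})
        return periods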
def adjust_for_stablecoin_risk(self, backtest_results, depeg_events):
"""ステーブルコインリスクの調整"""
# デペッグ期間中の取引に追加コストを適用
adjusted_trades = []
        for trade in backtest_results['trades'].to_dict('records'):
trade_time = trade['timestamp']
additional_cost = 0
            # Was the trade executed during a de-peg period?
for event in depeg_events:
if event['start'] <= trade_time <= event['end']:
                    # Add a cost proportional to the severity of the de-peg
additional_cost = trade['value'] * event['max_deviation'] * 0.5
break
trade['adjusted_value'] = trade['value'] - additional_cost
adjusted_trades.append(trade)
return adjusted_trades
4.3 Accounting for Gas and Network Fees
class NetworkFeeBacktest:
def __init__(self):
self.gas_price_history = {}
self.network_fees = {
'ethereum': self.calculate_eth_gas_fee,
'bsc': self.calculate_bsc_fee,
'polygon': self.calculate_polygon_fee
}
def load_gas_price_history(self, network, data):
"""ガス価格履歴の読み込み"""
self.gas_price_history[network] = data
def calculate_eth_gas_fee(self, timestamp, transaction_type='swap'):
"""イーサリアムのガス代計算"""
gas_units = {
'transfer': 21000,
'swap': 150000,
'add_liquidity': 200000,
'remove_liquidity': 150000
}
if timestamp in self.gas_price_history.get('ethereum', {}):
gas_price_gwei = self.gas_price_history['ethereum'][timestamp]
gas_cost_eth = (gas_units[transaction_type] * gas_price_gwei) / 1e9
            # Convert to USD using the ETH price at that time
            eth_price = self.get_eth_price(timestamp)
            return gas_cost_eth * eth_price
        return 10  # fallback: assume roughly $10 when no gas data is available
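    # The helpers below are referenced above but not shown in the original listing.
    # They are minimal placeholders (the constants are assumptions); real implementations
    # would look up historical fee and price data.
    def calculate_bsc_fee(self, timestamp, transaction_type='swap'):
        """Rough flat USD estimate for BNB Smart Chain (placeholder assumption)."""
        return 0.3
    def calculate_polygon_fee(self, timestamp, transaction_type='swap'):
        """Rough flat USD estimate for Polygon (placeholder assumption)."""
        return 0.05
    def get_eth_price(self, timestamp):
        """ETH/USD price at the given time; a placeholder constant here."""
        return 2000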
def apply_network_fees(self, trades, network='ethereum'):
"""ネットワーク手数料の適用"""
adjusted_trades = []
for trade in trades:
if network in self.network_fees:
network_fee = self.network_fees[network](
trade['timestamp'],
'swap'
)
trade['network_fee'] = network_fee
trade['net_value'] = trade['value'] - trade['commission'] - network_fee
else:
trade['network_fee'] = 0
trade['net_value'] = trade['value'] - trade['commission']
adjusted_trades.append(trade)
return adjusted_trades
5. Performance Evaluation and Optimization
5.1 Detailed Performance Analysis
class PerformanceAnalyzer:
def __init__(self):
        self.risk_free_rate = 0.02  # 2% annualized
def comprehensive_analysis(self, equity_curve, trades, market_data):
"""包括的なパフォーマンス分析"""
returns = equity_curve['equity'].pct_change().dropna()
# 基本統計
basic_stats = {
'total_return': (equity_curve['equity'].iloc[-1] / equity_curve['equity'].iloc[0] - 1) * 100,
'cagr': self.calculate_cagr(equity_curve),
'volatility': returns.std() * np.sqrt(365),
'sharpe_ratio': self.calculate_sharpe_ratio(returns),
'sortino_ratio': self.calculate_sortino_ratio(returns),
'calmar_ratio': self.calculate_calmar_ratio(equity_curve)
}
        # Drawdown analysis
        drawdown_stats = self.analyze_drawdowns(equity_curve)
        # Trade analysis
        trade_stats = self.analyze_trades(trades)
        # Correlation with the broader market
        market_correlation = self.calculate_market_correlation(returns, market_data)
        # Risk metrics
risk_metrics = {
'var_95': self.calculate_var(returns, 0.95),
'cvar_95': self.calculate_cvar(returns, 0.95),
'max_leverage': self.calculate_max_leverage(equity_curve),
'kelly_criterion': self.calculate_kelly_criterion(trade_stats)
}
return {
'basic_stats': basic_stats,
'drawdown_stats': drawdown_stats,
'trade_stats': trade_stats,
'market_correlation': market_correlation,
'risk_metrics': risk_metrics
}
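    # Several helpers used above (calculate_cagr, calculate_sharpe_ratio, calculate_calmar_ratio,
    # analyze_trades, calculate_market_correlation, calculate_var, calculate_cvar,
    # calculate_max_leverage) are not shown in the original listing. Minimal sketches of a few:
    def calculate_cagr(self, equity_curve):
        """Compound annual growth rate in percent (assumes a DatetimeIndex)."""
        equity = equity_curve['equity']
        years = max((equity.index[-1] - equity.index[0]).days / 365, 1e-9)
        return ((equity.iloc[-1] / equity.iloc[0]) ** (1 / years) - 1) * 100
    def calculate_var(self, returns, confidence=0.95):
        """Historical value-at-risk in percent: the return not exceeded with the given confidence."""
        return np.percentile(returns, (1 - confidence) * 100) * 100
    def calculate_cvar(self, returns, confidence=0.95):
        """Conditional VaR in percent: the average return in the worst (1 - confidence) tail."""
        var_threshold = np.percentile(returns, (1 - confidence) * 100)
        return returns[returns <= var_threshold].mean() * 100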
def analyze_drawdowns(self, equity_curve):
"""ドローダウンの詳細分析"""
equity = equity_curve['equity']
running_max = equity.cummax()
drawdown = (equity - running_max) / running_max
        # Identify drawdown periods
drawdown_periods = []
in_drawdown = False
start_idx = None
for i, dd in enumerate(drawdown):
if dd < 0 and not in_drawdown:
in_drawdown = True
start_idx = i
elif dd == 0 and in_drawdown:
in_drawdown = False
end_idx = i
period_dd = drawdown[start_idx:end_idx]
drawdown_periods.append({
'start': equity.index[start_idx],
'end': equity.index[end_idx],
'max_drawdown': period_dd.min() * 100,
'duration_days': (equity.index[end_idx] - equity.index[start_idx]).days,
'recovery_days': end_idx - start_idx - period_dd.argmin()
})
        # Summary statistics
if drawdown_periods:
stats = {
'max_drawdown': min(p['max_drawdown'] for p in drawdown_periods),
'avg_drawdown': np.mean([p['max_drawdown'] for p in drawdown_periods]),
'max_duration': max(p['duration_days'] for p in drawdown_periods),
'avg_duration': np.mean([p['duration_days'] for p in drawdown_periods]),
'total_underwater_time': sum(p['duration_days'] for p in drawdown_periods),
'drawdown_periods': drawdown_periods
}
else:
stats = {
'max_drawdown': 0,
'avg_drawdown': 0,
'max_duration': 0,
'avg_duration': 0,
'total_underwater_time': 0,
'drawdown_periods': []
}
return stats
def calculate_sortino_ratio(self, returns, target_return=0):
"""ソルティノレシオの計算"""
excess_returns = returns - target_return / 365
downside_returns = excess_returns[excess_returns < 0]
if len(downside_returns) == 0:
return np.inf
downside_std = np.sqrt(np.mean(downside_returns ** 2))
if downside_std == 0:
return np.inf
return np.sqrt(365) * excess_returns.mean() / downside_std
def calculate_kelly_criterion(self, trade_stats):
"""ケリー基準の計算"""
if trade_stats['total_trades'] == 0:
return 0
win_rate = trade_stats['win_rate'] / 100
avg_win = trade_stats.get('avg_win', 0)
avg_loss = abs(trade_stats.get('avg_loss', 1))
if avg_loss == 0:
return 0
odds = avg_win / avg_loss
kelly = (win_rate * odds - (1 - win_rate)) / odds
        # For safety, recommend quarter-Kelly
return max(0, min(kelly / 4, 0.25))
5.2 Monte Carlo Simulation
class MonteCarloSimulation:
def __init__(self, n_simulations=1000):
self.n_simulations = n_simulations
def run_monte_carlo(self, historical_returns, n_days=252):
"""モンテカルロシミュレーションの実行"""
results = []
# リターンの統計量
mean_return = historical_returns.mean()
std_return = historical_returns.std()
for _ in range(self.n_simulations):
# ランダムウォーク
simulated_returns = np.random.normal(
mean_return,
std_return,
n_days
)
            # Cumulative return path
cumulative_return = (1 + simulated_returns).cumprod()
results.append({
'final_return': cumulative_return[-1] - 1,
'max_drawdown': self.calculate_max_dd(cumulative_return),
'volatility': simulated_returns.std() * np.sqrt(252),
'path': cumulative_return
})
return self.analyze_simulation_results(results)
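    def calculate_max_dd(self, cumulative_returns):
        """Maximum drawdown of a cumulative-return path, in percent.
        Assumed helper (called in run_monte_carlo but not shown above); a minimal sketch."""
        running_max = np.maximum.accumulate(cumulative_returns)
        drawdown = (cumulative_returns - running_max) / running_max
        return drawdown.min() * 100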
def analyze_simulation_results(self, results):
"""シミュレーション結果の分析"""
final_returns = [r['final_return'] for r in results]
max_drawdowns = [r['max_drawdown'] for r in results]
analysis = {
'return_percentiles': {
'5%': np.percentile(final_returns, 5),
'25%': np.percentile(final_returns, 25),
'50%': np.percentile(final_returns, 50),
'75%': np.percentile(final_returns, 75),
'95%': np.percentile(final_returns, 95)
},
'drawdown_percentiles': {
'5%': np.percentile(max_drawdowns, 5),
'25%': np.percentile(max_drawdowns, 25),
'50%': np.percentile(max_drawdowns, 50),
'75%': np.percentile(max_drawdowns, 75),
'95%': np.percentile(max_drawdowns, 95)
},
            'probability_of_loss': sum(1 for r in final_returns if r < 0) / len(final_returns),
            'expected_return': np.mean(final_returns),
            'return_std': np.std(final_returns),
            # Keep the raw simulated final returns so the report can plot their distribution
            'final_returns': final_returns
}
return analysis
def bootstrap_confidence_intervals(self, returns, statistic_func, confidence=0.95):
"""ブートストラップ信頼区間"""
bootstrap_stats = []
for _ in range(self.n_simulations):
            # Resample the returns with replacement
sample = np.random.choice(returns, size=len(returns), replace=True)
stat = statistic_func(sample)
bootstrap_stats.append(stat)
        # Confidence interval bounds
alpha = 1 - confidence
lower = np.percentile(bootstrap_stats, alpha/2 * 100)
upper = np.percentile(bootstrap_stats, (1 - alpha/2) * 100)
return {
'mean': np.mean(bootstrap_stats),
'std': np.std(bootstrap_stats),
'confidence_interval': (lower, upper)
}
6. Implementation Example: A Complete Backtest System
class CryptoBacktestSystem:
def __init__(self):
self.data_processor = OHLCVDataProcessor()
self.tick_processor = TickDataProcessor()
self.backtest_engine = AdvancedBacktestEngine()
self.performance_analyzer = PerformanceAnalyzer()
self.monte_carlo = MonteCarloSimulation()
def run_complete_backtest(self, data_source, strategy, config):
"""完全なバックテストの実行"""
# 1. データの準備
if config['data_type'] == 'ohlcv':
data, validation = self.data_processor.validate_and_clean_data(data_source)
else: # tick data
tick_data = self.tick_processor.process_tick_data(data_source)
data = self.tick_processor.convert_to_bars(
tick_data,
config['bar_type'],
config['bar_size']
)
        # 2. Split the data (train / validation / test)
train_end = int(len(data) * 0.6)
val_end = int(len(data) * 0.8)
train_data = data.iloc[:train_end]
val_data = data.iloc[train_end:val_end]
test_data = data.iloc[val_end:]
        # 3. Optimize parameters on the training data
if config.get('optimize_params', True):
best_params = self.optimize_strategy_params(
train_data,
strategy.__class__,
config['param_grid']
)
strategy.update_params(best_params)
        # 4. Evaluate on the validation data
        val_results = self.backtest_engine.run_backtest(val_data, strategy)
        # 5. Walk-forward analysis
if config.get('walk_forward', True):
wf_analysis = WalkForwardAnalysis(self.backtest_engine)
wf_results = wf_analysis.run_walk_forward(
data,
strategy.__class__,
config['param_grid']
)
        # 6. Final backtest on the test data
        test_results = self.backtest_engine.run_backtest(test_data, strategy)
        # 7. Performance analysis
performance = self.performance_analyzer.comprehensive_analysis(
test_results['equity_curve'],
test_results['trades'],
test_data
)
        # 8. Monte Carlo simulation on the test-period returns
        returns = test_results['equity_curve']['equity'].pct_change().dropna()
        monte_carlo_results = self.monte_carlo.run_monte_carlo(returns)
        # 9. Combine the results
final_results = {
'validation_results': val_results,
'test_results': test_results,
'walk_forward_results': wf_results if config.get('walk_forward', True) else None,
'performance_analysis': performance,
'monte_carlo_simulation': monte_carlo_results,
'data_validation': validation if config['data_type'] == 'ohlcv' else None
}
        # 10. Generate the report
self.generate_backtest_report(final_results, config)
return final_results
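    def optimize_strategy_params(self, train_data, strategy_class, param_grid):
        """Grid-search parameters on the training data.
        Assumed helper (called in step 3 above but not shown); it simply reuses the
        walk-forward optimizer's grid search as a minimal sketch."""
        wf = WalkForwardAnalysis(self.backtest_engine)
        return wf.optimize_parameters(train_data, strategy_class, param_grid)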
def generate_backtest_report(self, results, config):
"""バックテストレポートの生成"""
import matplotlib.pyplot as plt
import seaborn as sns
fig, axes = plt.subplots(3, 2, figsize=(15, 12))
# 1. エクイティカーブ
ax = axes[0, 0]
results['test_results']['equity_curve']['equity'].plot(ax=ax)
ax.set_title('Equity Curve')
ax.set_ylabel('Portfolio Value')
        # 2. Drawdown
ax = axes[0, 1]
equity = results['test_results']['equity_curve']['equity']
drawdown = (equity - equity.cummax()) / equity.cummax() * 100
drawdown.plot(ax=ax, color='red')
ax.set_title('Drawdown')
ax.set_ylabel('Drawdown %')
        # 3. Monthly returns
ax = axes[1, 0]
monthly_returns = equity.resample('M').last().pct_change() * 100
monthly_returns.plot(kind='bar', ax=ax)
ax.set_title('Monthly Returns')
ax.set_ylabel('Return %')
        # 4. Return distribution
ax = axes[1, 1]
returns = equity.pct_change().dropna()
returns.hist(bins=50, ax=ax)
ax.set_title('Return Distribution')
ax.set_xlabel('Daily Return')
        # 5. Monte Carlo results (distribution of simulated final returns)
        ax = axes[2, 0]
        mc_returns = results['monte_carlo_simulation']['final_returns']
ax.hist(mc_returns, bins=30)
ax.set_title('Monte Carlo Final Returns')
ax.set_xlabel('Final Return')
        # 6. Performance metrics
ax = axes[2, 1]
ax.axis('off')
metrics_text = self.format_metrics_text(results['performance_analysis'])
ax.text(0.1, 0.9, metrics_text, transform=ax.transAxes, verticalalignment='top', fontfamily='monospace')
plt.tight_layout()
plt.savefig(f'backtest_report_{config["strategy_name"]}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.png')
plt.close()
def format_metrics_text(self, performance):
"""指標のテキストフォーマット"""
basic = performance['basic_stats']
risk = performance['risk_metrics']
text = f"""Performance Metrics:
Total Return: {basic['total_return']:.2f}%
CAGR: {basic['cagr']:.2f}%
Sharpe Ratio: {basic['sharpe_ratio']:.2f}
Sortino Ratio: {basic['sortino_ratio']:.2f}
Max Drawdown: {performance['drawdown_stats']['max_drawdown']:.2f}%
VaR (95%): {risk['var_95']:.2f}%
Kelly Criterion: {risk['kelly_criterion']:.2%}
"""
return text
Summary
Reliable cryptocurrency backtesting depends on the following elements:
- Data quality: tick-data processing, outlier removal, missing-value imputation
- Market characteristics: 24-hour trading, high volatility, differences across exchanges
- Realistic execution: slippage, market impact, liquidity constraints
- Comprehensive evaluation: risk-adjusted returns, drawdown analysis, Monte Carlo simulation
- Robustness checks: walk-forward analysis, out-of-sample testing
Implementing these properly makes it possible to run backtests under conditions much closer to live trading.