目次
バックテストとリスク管理の詳細ガイド
1. はじめに
1.1 バックテストの重要性
バックテストは暗号資産取引戦略の評価において最も重要なプロセスの一つです。適切に実施されたバックテストにより、以下が可能になります:
- 戦略の有効性検証: 過去のデータで戦略がどの程度機能したかを確認
- リスク・リターン特性の把握: 期待リターン、ボラティリティ、最大ドローダウンの測定
- パラメータ最適化: 戦略パラメータの調整とオーバーフィッティングの回避
- リスク管理ルールの検証: ストップロス、ポジションサイジングの効果測定
1.2 暗号資産特有の課題
暗号資産のバックテストには特有の課題があります:
- 24/7市場: 従来の株式市場と異なり、常時取引が行われる
- 高ボラティリティ: 急激な価格変動がリスク管理に与える影響
- 流動性の変動: 取引量や板の厚さの時間的変化
- 取引所の多様性: 複数の取引所間での価格差やスリッページ
- 技術的要因: ネットワーク遅延、APIの制限、取引所のメンテナンス
2. バックテストエンジンの設計
2.1 イベント駆動型バックテストエンジン
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from abc import ABC, abstractmethod
from enum import Enum
import logging
from datetime import datetime, timedelta
import warnings
class OrderType(Enum):
MARKET = "market"
LIMIT = "limit"
STOP = "stop"
STOP_LIMIT = "stop_limit"
class OrderSide(Enum):
BUY = "buy"
SELL = "sell"
@dataclass
class Order:
"""注文オブジェクト"""
id: str
symbol: str
side: OrderSide
order_type: OrderType
quantity: float
price: Optional[float] = None
stop_price: Optional[float] = None
timestamp: datetime = None
status: str = "pending"
@dataclass
class Trade:
"""約定オブジェクト"""
id: str
order_id: str
symbol: str
side: OrderSide
quantity: float
price: float
timestamp: datetime
commission: float = 0.0
@dataclass
class Position:
"""ポジションオブジェクト"""
symbol: str
quantity: float
avg_price: float
unrealized_pnl: float = 0.0
realized_pnl: float = 0.0
class Event(ABC):
"""イベントの基底クラス"""
def __init__(self, timestamp: datetime):
self.timestamp = timestamp
class MarketEvent(Event):
"""市場データイベント"""
def __init__(self, timestamp: datetime, data: Dict[str, Any]):
super().__init__(timestamp)
self.data = data
class OrderEvent(Event):
"""注文イベント"""
def __init__(self, timestamp: datetime, order: Order):
super().__init__(timestamp)
self.order = order
class TradeEvent(Event):
"""約定イベント"""
def __init__(self, timestamp: datetime, trade: Trade):
super().__init__(timestamp)
self.trade = trade
class BacktestEngine:
"""イベント駆動型バックテストエンジン"""
def __init__(self,
initial_capital: float = 100000,
commission_rate: float = 0.001,
slippage_model: str = 'linear'):
self.initial_capital = initial_capital
self.commission_rate = commission_rate
self.slippage_model = slippage_model
# 状態管理
self.current_time = None
self.cash = initial_capital
self.positions = {}
self.orders = {}
self.trades = []
self.portfolio_history = []
# イベントキュー
self.events = []
self.event_handlers = {
MarketEvent: self._handle_market_event,
OrderEvent: self._handle_order_event,
TradeEvent: self._handle_trade_event
}
# 統計情報
self.stats = BacktestStats()
def add_event(self, event: Event):
"""イベントをキューに追加"""
self.events.append(event)
self.events.sort(key=lambda x: x.timestamp)
def run_backtest(self, data: pd.DataFrame, strategy):
"""バックテストの実行"""
print("Starting backtest...")
# データの前処理
data = data.sort_index()
# 各時点でのイベント処理
for timestamp, row in data.iterrows():
self.current_time = timestamp
# 市場データイベントの生成
market_event = MarketEvent(timestamp, row.to_dict())
self.add_event(market_event)
# イベントの処理
self._process_events()
# 戦略の実行
signals = strategy.generate_signals(timestamp, row, self.positions)
for signal in signals:
order = self._create_order_from_signal(signal, timestamp)
if order:
self.add_event(OrderEvent(timestamp, order))
# ポートフォリオ価値の記録
self._update_portfolio_value(row)
# イベントの再処理(注文処理など)
self._process_events()
# 最終統計の計算
self._finalize_backtest()
print("Backtest completed!")
return self.get_results()
def _process_events(self):
"""イベントキューの処理"""
while self.events:
event = self.events.pop(0)
if event.timestamp <= self.current_time:
handler = self.event_handlers.get(type(event))
if handler:
handler(event)
else:
# 未来のイベントは再度キューに戻す
self.events.insert(0, event)
break
def _handle_market_event(self, event: MarketEvent):
"""市場データイベントの処理"""
# 未決済注文の確認と約定処理
self._check_pending_orders(event.data)
# ポジションの未実現損益更新
self._update_unrealized_pnl(event.data)
def _handle_order_event(self, event: OrderEvent):
"""注文イベントの処理"""
order = event.order
self.orders[order.id] = order
# 即座に約定する条件をチェック
if self._can_execute_immediately(order):
trade = self._execute_order(order)
if trade:
self.add_event(TradeEvent(event.timestamp, trade))
def _handle_trade_event(self, event: TradeEvent):
"""約定イベントの処理"""
trade = event.trade
self.trades.append(trade)
# ポジションの更新
self._update_position(trade)
# キャッシュの更新
if trade.side == OrderSide.BUY:
self.cash -= trade.quantity * trade.price + trade.commission
else:
self.cash += trade.quantity * trade.price - trade.commission
# 注文のステータス更新
if trade.order_id in self.orders:
self.orders[trade.order_id].status = "filled"
def _can_execute_immediately(self, order: Order) -> bool:
"""注文が即座に約定可能かチェック"""
if order.order_type == OrderType.MARKET:
return True
# その他の注文タイプの条件は省略
return False
def _execute_order(self, order: Order) -> Optional[Trade]:
"""注文の約定処理"""
# スリッページの計算
executed_price = self._calculate_slippage(order)
# 手数料の計算
commission = order.quantity * executed_price * self.commission_rate
# 十分な資金があるかチェック
if order.side == OrderSide.BUY:
required_cash = order.quantity * executed_price + commission
if self.cash < required_cash:
return None
# 約定の生成
trade = Trade(
id=f"trade_{len(self.trades)+1}",
order_id=order.id,
symbol=order.symbol,
side=order.side,
quantity=order.quantity,
price=executed_price,
timestamp=self.current_time,
commission=commission
)
return trade
def _calculate_slippage(self, order: Order) -> float:
"""スリッページの計算"""
# 基本価格(現在の市場価格と仮定)
base_price = order.price if order.price else 1000 # 実際の実装では市場データから取得
if self.slippage_model == 'linear':
# 線形スリッページモデル
slippage_rate = min(0.001 * order.quantity / 1000, 0.005) # 最大0.5%
if order.side == OrderSide.BUY:
return base_price * (1 + slippage_rate)
else:
return base_price * (1 - slippage_rate)
return base_price
def _update_position(self, trade: Trade):
"""ポジションの更新"""
symbol = trade.symbol
if symbol not in self.positions:
self.positions[symbol] = Position(symbol, 0, 0)
position = self.positions[symbol]
if trade.side == OrderSide.BUY:
# 平均価格の更新
total_quantity = position.quantity + trade.quantity
if total_quantity > 0:
position.avg_price = (
(position.quantity * position.avg_price +
trade.quantity * trade.price) / total_quantity
)
position.quantity = total_quantity
else:
# 売りの場合は実現損益を計算
if position.quantity > 0:
realized_pnl = trade.quantity * (trade.price - position.avg_price)
position.realized_pnl += realized_pnl
position.quantity -= trade.quantity
# ポジションがゼロになった場合は削除
if position.quantity == 0:
del self.positions[symbol]
def _update_portfolio_value(self, market_data: Dict):
"""ポートフォリオ価値の更新"""
total_value = self.cash
for position in self.positions.values():
# 現在価格でポジション価値を計算
current_price = market_data.get(f"{position.symbol}_close", 1000)
position_value = position.quantity * current_price
total_value += position_value
# 未実現損益の更新
position.unrealized_pnl = position.quantity * (current_price - position.avg_price)
# 履歴に記録
self.portfolio_history.append({
'timestamp': self.current_time,
'total_value': total_value,
'cash': self.cash,
'positions_value': total_value - self.cash
})
def get_results(self) -> Dict:
"""バックテスト結果の取得"""
# 統計計算
df = pd.DataFrame(self.portfolio_history)
if len(df) == 0:
return {'error': 'No portfolio history available'}
df.set_index('timestamp', inplace=True)
df['returns'] = df['total_value'].pct_change()
return {
'portfolio_history': df,
'trades': self.trades,
'final_portfolio_value': df['total_value'].iloc[-1],
'total_return': (df['total_value'].iloc[-1] - self.initial_capital) / self.initial_capital,
'statistics': self._calculate_statistics(df)
}
def _calculate_statistics(self, df: pd.DataFrame) -> Dict:
"""統計指標の計算"""
returns = df['returns'].dropna()
if len(returns) == 0:
return {}
# 基本統計
total_return = (df['total_value'].iloc[-1] - df['total_value'].iloc[0]) / df['total_value'].iloc[0]
annual_return = (1 + total_return) ** (252 / len(df)) - 1
# リスク指標
volatility = returns.std() * np.sqrt(252)
sharpe_ratio = annual_return / volatility if volatility > 0 else 0
# ドローダウン
cummax = df['total_value'].cummax()
drawdown = (df['total_value'] - cummax) / cummax
max_drawdown = drawdown.min()
# 勝率
win_rate = len([t for t in self.trades if self._calculate_trade_pnl(t) > 0]) / len(self.trades) if self.trades else 0
return {
'total_return': total_return,
'annual_return': annual_return,
'volatility': volatility,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown,
'win_rate': win_rate,
'total_trades': len(self.trades)
}
def _calculate_trade_pnl(self, trade: Trade) -> float:
"""個別取引の損益計算"""
# 簡略化された実装
return 0.0
2.2 スリッページと取引コストのモデリング
class AdvancedSlippageModel:
"""高度なスリッページモデル"""
def __init__(self, model_type: str = 'market_impact'):
self.model_type = model_type
self.calibration_data = {}
def calculate_slippage(self,
order: Order,
market_data: Dict,
orderbook_data: Optional[Dict] = None) -> float:
"""スリッページの計算"""
if self.model_type == 'linear':
return self._linear_slippage(order, market_data)
elif self.model_type == 'sqrt':
return self._sqrt_slippage(order, market_data)
elif self.model_type == 'market_impact':
return self._market_impact_slippage(order, market_data, orderbook_data)
elif self.model_type == 'orderbook':
return self._orderbook_slippage(order, orderbook_data)
return 0.0
def _linear_slippage(self, order: Order, market_data: Dict) -> float:
"""線形スリッページモデル"""
base_price = market_data.get('close', 100)
volume = market_data.get('volume', 1000000)
# 取引量に比例したスリッページ
impact = (order.quantity / volume) * 0.01 # 1%のマーケットインパクト
if order.side == OrderSide.BUY:
return base_price * (1 + impact)
else:
return base_price * (1 - impact)
def _sqrt_slippage(self, order: Order, market_data: Dict) -> float:
"""平方根スリッページモデル(Almgren-Chriss)"""
base_price = market_data.get('close', 100)
volume = market_data.get('volume', 1000000)
volatility = market_data.get('volatility', 0.02)
# 永続的インパクト
permanent_impact = 0.314 * volatility * np.sqrt(order.quantity / volume)
# 一時的インパクト
temporary_impact = 0.142 * volatility * np.sqrt(order.quantity / volume)
total_impact = permanent_impact + temporary_impact
if order.side == OrderSide.BUY:
return base_price * (1 + total_impact)
else:
return base_price * (1 - total_impact)
def _orderbook_slippage(self, order: Order, orderbook_data: Dict) -> float:
"""オーダーブックベースのスリッページ"""
if not orderbook_data:
return 0.0
if order.side == OrderSide.BUY:
asks = orderbook_data.get('asks', [])
return self._calculate_execution_price(order.quantity, asks, 'ask')
else:
bids = orderbook_data.get('bids', [])
return self._calculate_execution_price(order.quantity, bids, 'bid')
def _calculate_execution_price(self, quantity: float, levels: List, side: str) -> float:
"""板情報から実行価格を計算"""
remaining_quantity = quantity
total_cost = 0.0
for price, size in levels:
if remaining_quantity <= 0:
break
executable_quantity = min(remaining_quantity, size)
total_cost += executable_quantity * price
remaining_quantity -= executable_quantity
if remaining_quantity > 0:
# 板が薄い場合は最後の価格で残りを約定
last_price = levels[-1][0] if levels else 100
total_cost += remaining_quantity * last_price
return total_cost / quantity
class TransactionCostModel:
"""取引コストモデル"""
def __init__(self, config: Dict):
self.maker_fee = config.get('maker_fee', 0.001)
self.taker_fee = config.get('taker_fee', 0.001)
self.withdrawal_fee = config.get('withdrawal_fee', 0.0005)
self.funding_rate = config.get('funding_rate', 0.0001)
def calculate_trading_cost(self, trade: Trade, order_type: str = 'taker') -> float:
"""取引手数料の計算"""
if order_type == 'maker':
fee_rate = self.maker_fee
else:
fee_rate = self.taker_fee
return trade.quantity * trade.price * fee_rate
def calculate_funding_cost(self, position: Position, hours: int = 8) -> float:
"""資金調達コストの計算(先物取引)"""
position_value = abs(position.quantity) * position.avg_price
return position_value * self.funding_rate * (hours / 8)
def calculate_borrowing_cost(self, borrowed_amount: float, rate: float, days: int) -> float:
"""借用コストの計算(信用取引)"""
return borrowed_amount * rate * (days / 365)
3. リスク管理フレームワーク
3.1 ポジションサイジング戦略
class PositionSizingManager:
"""ポジションサイジング管理"""
def __init__(self, config: Dict):
self.max_position_size = config.get('max_position_size', 0.1) # ポートフォリオの10%
self.max_portfolio_risk = config.get('max_portfolio_risk', 0.02) # 2%のVaR
self.kelly_fraction = config.get('kelly_fraction', 0.25)
self.risk_free_rate = config.get('risk_free_rate', 0.02)
def calculate_position_size(self,
signal: Dict,
portfolio_value: float,
volatility: float,
win_rate: float = 0.5,
avg_win_loss_ratio: float = 1.0) -> float:
"""ポジションサイズの計算"""
# Kelly基準
kelly_size = self._kelly_criterion(win_rate, avg_win_loss_ratio, portfolio_value)
# ボラティリティベース
volatility_size = self._volatility_based_sizing(signal, portfolio_value, volatility)
# 固定パーセンテージ
fixed_size = portfolio_value * self.max_position_size
# 最小値を採用(最も保守的)
position_size = min(kelly_size, volatility_size, fixed_size)
return position_size
def _kelly_criterion(self, win_rate: float, win_loss_ratio: float, capital: float) -> float:
"""Kelly基準によるポジションサイジング"""
if win_rate <= 0 or win_loss_ratio <= 0:
return 0
# Kelly公式: f = p - q/b
# p: 勝率, q: 負率, b: 勝ち/負けの比率
p = win_rate
q = 1 - win_rate
b = win_loss_ratio
kelly_fraction = p - q / b
# 安全のため、フルKellyの25%を使用
safe_kelly = max(0, kelly_fraction) * self.kelly_fraction
return capital * safe_kelly
def _volatility_based_sizing(self, signal: Dict, capital: float, volatility: float) -> float:
"""ボラティリティベースのサイジング"""
target_volatility = 0.02 # 2%の目標ボラティリティ
# ポジションサイズ = 目標ボラティリティ / 予想ボラティリティ
if volatility > 0:
size_multiplier = target_volatility / volatility
position_size = capital * min(size_multiplier, self.max_position_size)
else:
position_size = capital * self.max_position_size
return position_size
class RiskManager:
"""リスク管理システム"""
def __init__(self, config: Dict):
self.max_drawdown = config.get('max_drawdown', 0.2)
self.max_var = config.get('max_var', 0.05)
self.stop_loss_pct = config.get('stop_loss_pct', 0.05)
self.position_sizing = PositionSizingManager(config)
# リスク状態
self.current_drawdown = 0.0
self.portfolio_var = 0.0
self.risk_level = 'normal'
def evaluate_portfolio_risk(self,
positions: Dict[str, Position],
market_data: Dict,
returns_history: pd.DataFrame) -> Dict:
"""ポートフォリオリスクの評価"""
# VaR計算
portfolio_var = self._calculate_var(positions, returns_history)
# ドローダウン計算
current_drawdown = self._calculate_current_drawdown(market_data)
# ポジション集中度
concentration_risk = self._calculate_concentration_risk(positions)
# 流動性リスク
liquidity_risk = self._calculate_liquidity_risk(positions, market_data)
risk_metrics = {
'var_95': portfolio_var,
'current_drawdown': current_drawdown,
'concentration_risk': concentration_risk,
'liquidity_risk': liquidity_risk,
'risk_level': self._determine_risk_level(portfolio_var, current_drawdown)
}
return risk_metrics
def _calculate_var(self, positions: Dict, returns_history: pd.DataFrame, confidence: float = 0.95) -> float:
"""Value at Risk (VaR)の計算"""
if returns_history.empty or not positions:
return 0.0
# ポートフォリオリターンの計算
portfolio_returns = self._calculate_portfolio_returns(positions, returns_history)
# VaR計算(パーセンタイル法)
var = np.percentile(portfolio_returns, (1 - confidence) * 100)
return abs(var)
def _calculate_portfolio_returns(self, positions: Dict, returns_history: pd.DataFrame) -> np.ndarray:
"""ポートフォリオリターンの計算"""
portfolio_returns = np.zeros(len(returns_history))
total_value = sum(abs(pos.quantity * pos.avg_price) for pos in positions.values())
if total_value == 0:
return portfolio_returns
for symbol, position in positions.items():
if symbol in returns_history.columns:
weight = abs(position.quantity * position.avg_price) / total_value
portfolio_returns += weight * returns_history[symbol].values
return portfolio_returns
def _calculate_concentration_risk(self, positions: Dict) -> float:
"""ポジション集中度リスクの計算"""
if not positions:
return 0.0
# Herfindahl-Hirschman Index (HHI)
total_value = sum(abs(pos.quantity * pos.avg_price) for pos in positions.values())
if total_value == 0:
return 0.0
hhi = sum((abs(pos.quantity * pos.avg_price) / total_value) ** 2
for pos in positions.values())
return hhi
def _calculate_liquidity_risk(self, positions: Dict, market_data: Dict) -> float:
"""流動性リスクの計算"""
total_liquidity_risk = 0.0
total_value = 0.0
for symbol, position in positions.items():
position_value = abs(position.quantity * position.avg_price)
volume = market_data.get(f"{symbol}_volume", 0)
# 流動性リスク = ポジションサイズ / 平均出来高
if volume > 0:
liquidity_risk = position_value / (volume * position.avg_price)
else:
liquidity_risk = 1.0 # 最大リスク
total_liquidity_risk += position_value * liquidity_risk
total_value += position_value
return total_liquidity_risk / total_value if total_value > 0 else 0.0
def _determine_risk_level(self, var: float, drawdown: float) -> str:
"""リスクレベルの判定"""
if var > self.max_var * 1.5 or abs(drawdown) > self.max_drawdown * 1.5:
return 'high'
elif var > self.max_var or abs(drawdown) > self.max_drawdown:
return 'medium'
else:
return 'normal'
def should_reduce_risk(self, risk_metrics: Dict) -> bool:
"""リスク削減が必要かの判定"""
return (risk_metrics['risk_level'] in ['high', 'medium'] or
risk_metrics['var_95'] > self.max_var or
abs(risk_metrics['current_drawdown']) > self.max_drawdown)
def get_risk_reduction_actions(self, risk_metrics: Dict) -> List[str]:
"""リスク削減アクションの提案"""
actions = []
if risk_metrics['var_95'] > self.max_var:
actions.append("ポジションサイズを50%削減")
if abs(risk_metrics['current_drawdown']) > self.max_drawdown * 0.8:
actions.append("損切りの実行")
if risk_metrics['concentration_risk'] > 0.5:
actions.append("ポートフォリオの分散化")
if risk_metrics['liquidity_risk'] > 0.1:
actions.append("流動性の低いポジションの削減")
return actions
3.2 ストップロスとリスク制御
class StopLossManager:
"""ストップロス管理"""
def __init__(self, config: Dict):
self.default_stop_loss_pct = config.get('stop_loss_pct', 0.05)
self.trailing_stop_pct = config.get('trailing_stop_pct', 0.03)
self.atr_multiplier = config.get('atr_multiplier', 2.0)
self.adaptive_stop = config.get('adaptive_stop', True)
# アクティブなストップロス
self.active_stops = {}
def create_stop_loss(self,
position: Position,
market_data: Dict,
stop_type: str = 'percentage') -> Dict:
"""ストップロスの作成"""
current_price = market_data.get(f"{position.symbol}_close", position.avg_price)
if stop_type == 'percentage':
stop_price = self._percentage_stop(position, current_price)
elif stop_type == 'atr':
stop_price = self._atr_stop(position, market_data)
elif stop_type == 'volatility':
stop_price = self._volatility_stop(position, market_data)
elif stop_type == 'adaptive':
stop_price = self._adaptive_stop(position, market_data)
else:
stop_price = self._percentage_stop(position, current_price)
stop_order = {
'symbol': position.symbol,
'stop_price': stop_price,
'quantity': abs(position.quantity),
'side': 'sell' if position.quantity > 0 else 'buy',
'type': stop_type,
'created_at': datetime.now()
}
self.active_stops[position.symbol] = stop_order
return stop_order
def _percentage_stop(self, position: Position, current_price: float) -> float:
"""パーセンテージベースのストップロス"""
if position.quantity > 0: # ロングポジション
return current_price * (1 - self.default_stop_loss_pct)
else: # ショートポジション
return current_price * (1 + self.default_stop_loss_pct)
def _atr_stop(self, position: Position, market_data: Dict) -> float:
"""ATRベースのストップロス"""
current_price = market_data.get(f"{position.symbol}_close", position.avg_price)
atr = market_data.get(f"{position.symbol}_atr", current_price * 0.02)
if position.quantity > 0:
return current_price - (atr * self.atr_multiplier)
else:
return current_price + (atr * self.atr_multiplier)
def _volatility_stop(self, position: Position, market_data: Dict) -> float:
"""ボラティリティベースのストップロス"""
current_price = market_data.get(f"{position.symbol}_close", position.avg_price)
volatility = market_data.get(f"{position.symbol}_volatility", 0.02)
# 2σのストップロス
stop_distance = current_price * volatility * 2
if position.quantity > 0:
return current_price - stop_distance
else:
return current_price + stop_distance
def _adaptive_stop(self, position: Position, market_data: Dict) -> float:
"""適応的ストップロス"""
# 複数の手法を組み合わせ
percentage_stop = self._percentage_stop(position, market_data.get(f"{position.symbol}_close"))
atr_stop = self._atr_stop(position, market_data)
volatility_stop = self._volatility_stop(position, market_data)
current_price = market_data.get(f"{position.symbol}_close", position.avg_price)
if position.quantity > 0:
# 最も近い(保守的な)ストップを選択
return max(percentage_stop, atr_stop, volatility_stop)
else:
# 最も近い(保守的な)ストップを選択
return min(percentage_stop, atr_stop, volatility_stop)
def update_trailing_stops(self, positions: Dict, market_data: Dict):
"""トレーリングストップの更新"""
for symbol, position in positions.items():
if symbol in self.active_stops:
current_price = market_data.get(f"{symbol}_close")
if current_price is None:
continue
stop_order = self.active_stops[symbol]
if position.quantity > 0: # ロング
# 価格が上昇した場合、ストップロスを引き上げ
new_stop = current_price * (1 - self.trailing_stop_pct)
if new_stop > stop_order['stop_price']:
stop_order['stop_price'] = new_stop
else: # ショート
# 価格が下落した場合、ストップロスを引き下げ
new_stop = current_price * (1 + self.trailing_stop_pct)
if new_stop < stop_order['stop_price']:
stop_order['stop_price'] = new_stop
def check_stop_triggers(self, market_data: Dict) -> List[Dict]:
"""ストップロストリガーのチェック"""
triggered_stops = []
for symbol, stop_order in self.active_stops.items():
current_price = market_data.get(f"{symbol}_close")
if current_price is None:
continue
# トリガー条件のチェック
if self._is_stop_triggered(stop_order, current_price):
triggered_stops.append(stop_order)
# トリガーされたストップを削除
for stop in triggered_stops:
if stop['symbol'] in self.active_stops:
del self.active_stops[stop['symbol']]
return triggered_stops
def _is_stop_triggered(self, stop_order: Dict, current_price: float) -> bool:
"""ストップロストリガーの判定"""
if stop_order['side'] == 'sell':
# ロングポジションのストップロス
return current_price <= stop_order['stop_price']
else:
# ショートポジションのストップロス
return current_price >= stop_order['stop_price']
class DynamicRiskControl:
"""動的リスク制御"""
def __init__(self, config: Dict):
self.volatility_threshold = config.get('volatility_threshold', 0.05)
self.correlation_threshold = config.get('correlation_threshold', 0.8)
self.market_stress_threshold = config.get('market_stress_threshold', 0.1)
def assess_market_conditions(self, market_data: Dict) -> Dict:
"""市場状況の評価"""
# ボラティリティの評価
volatility_score = self._assess_volatility(market_data)
# 相関の評価
correlation_score = self._assess_correlation(market_data)
# 流動性の評価
liquidity_score = self._assess_liquidity(market_data)
# 総合スコア
overall_score = np.mean([volatility_score, correlation_score, liquidity_score])
return {
'volatility_score': volatility_score,
'correlation_score': correlation_score,
'liquidity_score': liquidity_score,
'overall_stress_score': overall_score,
'market_regime': self._classify_market_regime(overall_score)
}
def _assess_volatility(self, market_data: Dict) -> float:
"""ボラティリティの評価"""
# 複数銘柄の平均ボラティリティ
volatilities = [v for k, v in market_data.items() if k.endswith('_volatility')]
if not volatilities:
return 0.5 # 中立
avg_volatility = np.mean(volatilities)
# 0-1のスコアに正規化
return min(avg_volatility / self.volatility_threshold, 1.0)
def _assess_correlation(self, market_data: Dict) -> float:
"""相関の評価"""
# 相関データが利用可能な場合の処理
correlations = [v for k, v in market_data.items() if k.endswith('_correlation')]
if not correlations:
return 0.5 # 中立
avg_correlation = np.mean(np.abs(correlations))
return avg_correlation / self.correlation_threshold
def _assess_liquidity(self, market_data: Dict) -> float:
"""流動性の評価"""
volumes = [v for k, v in market_data.items() if k.endswith('_volume')]
if not volumes:
return 0.5 # 中立
# 過去の平均との比較
current_volume = np.mean(volumes)
historical_avg = market_data.get('historical_avg_volume', current_volume)
liquidity_ratio = current_volume / historical_avg if historical_avg > 0 else 1.0
# 逆数(流動性が低いほど高スコア)
return max(0, 1 - liquidity_ratio)
def _classify_market_regime(self, stress_score: float) -> str:
"""市場レジームの分類"""
if stress_score > 0.8:
return 'high_stress'
elif stress_score > 0.6:
return 'medium_stress'
elif stress_score > 0.4:
return 'normal'
else:
return 'low_stress'
def get_risk_adjustments(self, market_conditions: Dict) -> Dict:
"""リスク調整の提案"""
regime = market_conditions['market_regime']
stress_score = market_conditions['overall_stress_score']
adjustments = {
'position_size_multiplier': 1.0,
'stop_loss_multiplier': 1.0,
'max_positions': 10,
'rebalance_frequency': 'daily'
}
if regime == 'high_stress':
adjustments.update({
'position_size_multiplier': 0.3,
'stop_loss_multiplier': 0.5,
'max_positions': 3,
'rebalance_frequency': 'hourly'
})
elif regime == 'medium_stress':
adjustments.update({
'position_size_multiplier': 0.6,
'stop_loss_multiplier': 0.7,
'max_positions': 5,
'rebalance_frequency': '4hourly'
})
elif regime == 'normal':
adjustments.update({
'position_size_multiplier': 1.0,
'stop_loss_multiplier': 1.0,
'max_positions': 10,
'rebalance_frequency': 'daily'
})
return adjustments
4. パフォーマンス分析とレポート
4.1 詳細なパフォーマンス分析
class PerformanceAnalyzer:
"""パフォーマンス分析器"""
def __init__(self):
self.benchmark_data = None
self.risk_free_rate = 0.02
def analyze_performance(self,
portfolio_history: pd.DataFrame,
trades: List[Trade],
benchmark_returns: pd.Series = None) -> Dict:
"""包括的なパフォーマンス分析"""
# 基本統計
basic_stats = self._calculate_basic_statistics(portfolio_history)
# リスク調整済みリターン
risk_adjusted = self._calculate_risk_adjusted_returns(portfolio_history, benchmark_returns)
# トレード分析
trade_analysis = self._analyze_trades(trades)
# ドローダウン分析
drawdown_analysis = self._analyze_drawdowns(portfolio_history)
# ベンチマーク比較
benchmark_comparison = self._compare_to_benchmark(portfolio_history, benchmark_returns)
# リスクメトリクス
risk_metrics = self._calculate_risk_metrics(portfolio_history)
return {
'basic_statistics': basic_stats,
'risk_adjusted_returns': risk_adjusted,
'trade_analysis': trade_analysis,
'drawdown_analysis': drawdown_analysis,
'benchmark_comparison': benchmark_comparison,
'risk_metrics': risk_metrics
}
def _calculate_basic_statistics(self, portfolio_history: pd.DataFrame) -> Dict:
"""基本統計の計算"""
returns = portfolio_history['total_value'].pct_change().dropna()
if len(returns) == 0:
return {}
total_return = (portfolio_history['total_value'].iloc[-1] /
portfolio_history['total_value'].iloc[0] - 1)
# 年率換算
periods_per_year = self._get_periods_per_year(portfolio_history.index)
annual_return = (1 + total_return) ** (periods_per_year / len(portfolio_history)) - 1
annual_volatility = returns.std() * np.sqrt(periods_per_year)
return {
'total_return': total_return,
'annual_return': annual_return,
'annual_volatility': annual_volatility,
'max_value': portfolio_history['total_value'].max(),
'min_value': portfolio_history['total_value'].min(),
'final_value': portfolio_history['total_value'].iloc[-1],
'total_periods': len(portfolio_history)
}
def _calculate_risk_adjusted_returns(self,
portfolio_history: pd.DataFrame,
benchmark_returns: pd.Series = None) -> Dict:
"""リスク調整済みリターンの計算"""
returns = portfolio_history['total_value'].pct_change().dropna()
if len(returns) == 0:
return {}
periods_per_year = self._get_periods_per_year(portfolio_history.index)
annual_return = returns.mean() * periods_per_year
annual_volatility = returns.std() * np.sqrt(periods_per_year)
# シャープレシオ
sharpe_ratio = (annual_return - self.risk_free_rate) / annual_volatility if annual_volatility > 0 else 0
# ソルティノレシオ
downside_returns = returns[returns < 0]
downside_volatility = downside_returns.std() * np.sqrt(periods_per_year)
sortino_ratio = (annual_return - self.risk_free_rate) / downside_volatility if downside_volatility > 0 else 0
# カルマーレシオ
max_drawdown = self._calculate_max_drawdown(portfolio_history)
calmar_ratio = annual_return / abs(max_drawdown) if max_drawdown != 0 else 0
risk_adjusted = {
'sharpe_ratio': sharpe_ratio,
'sortino_ratio': sortino_ratio,
'calmar_ratio': calmar_ratio,
'downside_volatility': downside_volatility
}
# ベンチマーク関連指標
if benchmark_returns is not None:
beta, alpha = self._calculate_beta_alpha(returns, benchmark_returns)
information_ratio = self._calculate_information_ratio(returns, benchmark_returns)
risk_adjusted.update({
'beta': beta,
'alpha': alpha,
'information_ratio': information_ratio
})
return risk_adjusted
def _analyze_trades(self, trades: List[Trade]) -> Dict:
"""トレード分析"""
if not trades:
return {}
# 損益の計算
profits = []
for i in range(0, len(trades), 2): # ペアトレードを想定
if i + 1 < len(trades):
buy_trade = trades[i] if trades[i].side == OrderSide.BUY else trades[i+1]
sell_trade = trades[i+1] if trades[i+1].side == OrderSide.SELL else trades[i]
if buy_trade.symbol == sell_trade.symbol:
profit = (sell_trade.price - buy_trade.price) * buy_trade.quantity
profits.append(profit)
if not profits:
return {}
winning_trades = [p for p in profits if p > 0]
losing_trades = [p for p in profits if p < 0]
return {
'total_trades': len(profits),
'winning_trades': len(winning_trades),
'losing_trades': len(losing_trades),
'win_rate': len(winning_trades) / len(profits),
'avg_win': np.mean(winning_trades) if winning_trades else 0,
'avg_loss': np.mean(losing_trades) if losing_trades else 0,
'avg_win_loss_ratio': (np.mean(winning_trades) / abs(np.mean(losing_trades))) if losing_trades else 0,
'profit_factor': sum(winning_trades) / abs(sum(losing_trades)) if losing_trades else float('inf'),
'largest_win': max(profits),
'largest_loss': min(profits),
'total_profit': sum(profits)
}
def _analyze_drawdowns(self, portfolio_history: pd.DataFrame) -> Dict:
"""ドローダウン分析"""
values = portfolio_history['total_value']
peak = values.expanding().max()
drawdown = (values - peak) / peak
# ドローダウン期間の計算
drawdown_periods = []
current_dd_start = None
for i, dd in enumerate(drawdown):
if dd < 0 and current_dd_start is None:
current_dd_start = i
elif dd >= 0 and current_dd_start is not None:
drawdown_periods.append(i - current_dd_start)
current_dd_start = None
# 最後のドローダウンが継続中の場合
if current_dd_start is not None:
drawdown_periods.append(len(drawdown) - current_dd_start)
return {
'max_drawdown': drawdown.min(),
'avg_drawdown': drawdown[drawdown < 0].mean() if any(drawdown < 0) else 0,
'max_drawdown_duration': max(drawdown_periods) if drawdown_periods else 0,
'avg_drawdown_duration': np.mean(drawdown_periods) if drawdown_periods else 0,
'num_drawdown_periods': len(drawdown_periods),
'current_drawdown': drawdown.iloc[-1],
'days_since_peak': len(drawdown) - drawdown.idxmax() - 1
}
def _compare_to_benchmark(self,
portfolio_history: pd.DataFrame,
benchmark_returns: pd.Series) -> Dict:
"""ベンチマーク比較"""
if benchmark_returns is None:
return {}
portfolio_returns = portfolio_history['total_value'].pct_change().dropna()
# 期間を合わせる
common_index = portfolio_returns.index.intersection(benchmark_returns.index)
if len(common_index) == 0:
return {}
portfolio_aligned = portfolio_returns.loc[common_index]
benchmark_aligned = benchmark_returns.loc[common_index]
# 超過リターン
excess_returns = portfolio_aligned - benchmark_aligned
# 累積リターン
portfolio_cumulative = (1 + portfolio_aligned).cumprod() - 1
benchmark_cumulative = (1 + benchmark_aligned).cumprod() - 1
return {
'portfolio_total_return': portfolio_cumulative.iloc[-1],
'benchmark_total_return': benchmark_cumulative.iloc[-1],
'excess_return': portfolio_cumulative.iloc[-1] - benchmark_cumulative.iloc[-1],
'tracking_error': excess_returns.std() * np.sqrt(252),
'correlation': portfolio_aligned.corr(benchmark_aligned),
'up_capture': portfolio_aligned[benchmark_aligned > 0].mean() / benchmark_aligned[benchmark_aligned > 0].mean(),
'down_capture': portfolio_aligned[benchmark_aligned < 0].mean() / benchmark_aligned[benchmark_aligned < 0].mean()
}
def _calculate_risk_metrics(self, portfolio_history: pd.DataFrame) -> Dict:
"""リスクメトリクスの計算"""
returns = portfolio_history['total_value'].pct_change().dropna()
if len(returns) == 0:
return {}
# VaR計算
var_95 = np.percentile(returns, 5)
var_99 = np.percentile(returns, 1)
# CVaR計算
cvar_95 = returns[returns <= var_95].mean()
cvar_99 = returns[returns <= var_99].mean()
# 歪度と尖度
skewness = returns.skew()
kurtosis = returns.kurtosis()
return {
'var_95': var_95,
'var_99': var_99,
'cvar_95': cvar_95,
'cvar_99': cvar_99,
'skewness': skewness,
'kurtosis': kurtosis,
'positive_periods': (returns > 0).sum(),
'negative_periods': (returns < 0).sum(),
'max_consecutive_wins': self._max_consecutive(returns > 0),
'max_consecutive_losses': self._max_consecutive(returns < 0)
}
def _max_consecutive(self, boolean_series: pd.Series) -> int:
"""最大連続回数の計算"""
groups = boolean_series.groupby((boolean_series != boolean_series.shift()).cumsum())
consecutive_true = groups.apply(lambda x: len(x) if x.iloc[0] else 0)
return consecutive_true.max()
def _get_periods_per_year(self, index: pd.DatetimeIndex) -> float:
"""年間期間数の計算"""
if len(index) < 2:
return 252 # デフォルト(日次)
avg_timedelta = (index[-1] - index[0]) / (len(index) - 1)
days_per_period = avg_timedelta.total_seconds() / (24 * 3600)
return 365.25 / days_per_period
4.2 レポート生成
class BacktestReport:
"""バックテストレポート生成"""
def __init__(self, analyzer: PerformanceAnalyzer):
self.analyzer = analyzer
def generate_html_report(self,
backtest_results: Dict,
output_file: str = "backtest_report.html") -> str:
"""HTMLレポートの生成"""
html_template = """
<!DOCTYPE html>
<html>
<head>
<title>Backtest Report</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.section { margin-bottom: 30px; }
.metric { display: inline-block; margin: 10px; padding: 10px; border: 1px solid #ddd; }
.positive { color: green; }
.negative { color: red; }
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
th { background-color: #f2f2f2; }
</style>
</head>
<body>
<h1>Cryptocurrency Trading Backtest Report</h1>
<div class="section">
<h2>Executive Summary</h2>
{executive_summary}
</div>
<div class="section">
<h2>Performance Metrics</h2>
{performance_metrics}
</div>
<div class="section">
<h2>Risk Analysis</h2>
{risk_analysis}
</div>
<div class="section">
<h2>Trade Analysis</h2>
{trade_analysis}
</div>
<div class="section">
<h2>Recommendations</h2>
{recommendations}
</div>
</body>
</html>
"""
# 各セクションの生成
executive_summary = self._generate_executive_summary(backtest_results)
performance_metrics = self._generate_performance_metrics_html(backtest_results)
risk_analysis = self._generate_risk_analysis_html(backtest_results)
trade_analysis = self._generate_trade_analysis_html(backtest_results)
recommendations = self._generate_recommendations_html(backtest_results)
# HTMLの組み立て
html_content = html_template.format(
executive_summary=executive_summary,
performance_metrics=performance_metrics,
risk_analysis=risk_analysis,
trade_analysis=trade_analysis,
recommendations=recommendations
)
# ファイルに保存
with open(output_file, 'w', encoding='utf-8') as f:
f.write(html_content)
return html_content
def _generate_executive_summary(self, results: Dict) -> str:
"""エグゼクティブサマリーの生成"""
analysis = results.get('analysis', {})
basic_stats = analysis.get('basic_statistics', {})
risk_adjusted = analysis.get('risk_adjusted_returns', {})
total_return = basic_stats.get('total_return', 0) * 100
annual_return = basic_stats.get('annual_return', 0) * 100
max_drawdown = analysis.get('drawdown_analysis', {}).get('max_drawdown', 0) * 100
sharpe_ratio = risk_adjusted.get('sharpe_ratio', 0)
summary = f"""
<div class="metric">
<strong>Total Return:</strong>
<span class="{'positive' if total_return > 0 else 'negative'}">{total_return:.2f}%</span>
</div>
<div class="metric">
<strong>Annual Return:</strong>
<span class="{'positive' if annual_return > 0 else 'negative'}">{annual_return:.2f}%</span>
</div>
<div class="metric">
<strong>Sharpe Ratio:</strong> {sharpe_ratio:.2f}
</div>
<div class="metric">
<strong>Max Drawdown:</strong>
<span class="negative">{max_drawdown:.2f}%</span>
</div>
"""
return summary
def _generate_performance_metrics_html(self, results: Dict) -> str:
"""パフォーマンスメトリクスのHTML生成"""
analysis = results.get('analysis', {})
table_data = []
# 基本統計
basic_stats = analysis.get('basic_statistics', {})
for key, value in basic_stats.items():
if isinstance(value, (int, float)):
if 'return' in key.lower():
formatted_value = f"{value*100:.2f}%"
else:
formatted_value = f"{value:,.2f}"
table_data.append([key.replace('_', ' ').title(), formatted_value])
# リスク調整済みリターン
risk_adjusted = analysis.get('risk_adjusted_returns', {})
for key, value in risk_adjusted.items():
if isinstance(value, (int, float)):
table_data.append([key.replace('_', ' ').title(), f"{value:.3f}"])
# HTMLテーブルの生成
html = "<table><tr><th>Metric</th><th>Value</th></tr>"
for row in table_data:
html += f"<tr><td>{row[0]}</td><td>{row[1]}</td></tr>"
html += "</table>"
return html
def _generate_risk_analysis_html(self, results: Dict) -> str:
"""リスク分析のHTML生成"""
analysis = results.get('analysis', {})
risk_metrics = analysis.get('risk_metrics', {})
drawdown_analysis = analysis.get('drawdown_analysis', {})
risk_table = "<table><tr><th>Risk Metric</th><th>Value</th></tr>"
# リスクメトリクス
for key, value in risk_metrics.items():
if isinstance(value, (int, float)):
if 'var' in key.lower() or 'cvar' in key.lower():
formatted_value = f"{value*100:.2f}%"
else:
formatted_value = f"{value:.3f}"
risk_table += f"<tr><td>{key.replace('_', ' ').title()}</td><td>{formatted_value}</td></tr>"
# ドローダウン分析
for key, value in drawdown_analysis.items():
if isinstance(value, (int, float)):
if 'drawdown' in key.lower():
formatted_value = f"{value*100:.2f}%"
else:
formatted_value = f"{value:.0f}"
risk_table += f"<tr><td>{key.replace('_', ' ').title()}</td><td>{formatted_value}</td></tr>"
risk_table += "</table>"
return risk_table
def _generate_trade_analysis_html(self, results: Dict) -> str:
"""トレード分析のHTML生成"""
analysis = results.get('analysis', {})
trade_analysis = analysis.get('trade_analysis', {})
if not trade_analysis:
return "<p>No trade data available.</p>"
trade_table = "<table><tr><th>Trade Metric</th><th>Value</th></tr>"
for key, value in trade_analysis.items():
if isinstance(value, (int, float)):
if 'rate' in key.lower():
formatted_value = f"{value*100:.1f}%"
elif 'ratio' in key.lower() or 'factor' in key.lower():
formatted_value = f"{value:.2f}"
else:
formatted_value = f"{value:,.0f}"
trade_table += f"<tr><td>{key.replace('_', ' ').title()}</td><td>{formatted_value}</td></tr>"
trade_table += "</table>"
return trade_table
def _generate_recommendations_html(self, results: Dict) -> str:
"""推奨事項のHTML生成"""
analysis = results.get('analysis', {})
recommendations = []
# パフォーマンスベースの推奨事項
basic_stats = analysis.get('basic_statistics', {})
risk_adjusted = analysis.get('risk_adjusted_returns', {})
trade_analysis = analysis.get('trade_analysis', {})
sharpe_ratio = risk_adjusted.get('sharpe_ratio', 0)
win_rate = trade_analysis.get('win_rate', 0)
max_drawdown = analysis.get('drawdown_analysis', {}).get('max_drawdown', 0)
if sharpe_ratio < 1.0:
recommendations.append("シャープレシオが低いため、リスク調整済みリターンの改善が必要です。")
if win_rate < 0.5:
recommendations.append("勝率が50%を下回っているため、エントリー条件の見直しを検討してください。")
if abs(max_drawdown) > 0.2:
recommendations.append("最大ドローダウンが20%を超えているため、リスク管理の強化が必要です。")
if not recommendations:
recommendations.append("総合的に良好なパフォーマンスです。現在の戦略を継続することを推奨します。")
# HTML生成
html = "<ul>"
for rec in recommendations:
html += f"<li>{rec}</li>"
html += "</ul>"
return html
def main():
"""使用例"""
# 設定
config = {
'initial_capital': 100000,
'commission_rate': 0.001,
'max_position_size': 0.1,
'stop_loss_pct': 0.05,
'max_drawdown': 0.2,
'max_var': 0.05
}
# バックテストエンジンの初期化
engine = BacktestEngine(
initial_capital=config['initial_capital'],
commission_rate=config['commission_rate']
)
# サンプルデータの生成(実際の実装では実データを使用)
dates = pd.date_range('2023-01-01', '2023-12-31', freq='D')
sample_data = pd.DataFrame({
'BTC_close': np.random.randn(len(dates)).cumsum() + 30000,
'ETH_close': np.random.randn(len(dates)).cumsum() + 2000,
'BTC_volume': np.random.randint(1000000, 5000000, len(dates)),
'ETH_volume': np.random.randint(500000, 2000000, len(dates))
}, index=dates)
# 簡単な戦略の例
class SimpleStrategy:
def generate_signals(self, timestamp, data, positions):
# 簡単な移動平均戦略
signals = []
# 実際の戦略ロジックをここに実装
return signals
strategy = SimpleStrategy()
# バックテストの実行
results = engine.run_backtest(sample_data, strategy)
# パフォーマンス分析
analyzer = PerformanceAnalyzer()
if 'portfolio_history' in results:
analysis = analyzer.analyze_performance(
results['portfolio_history'],
results.get('trades', [])
)
results['analysis'] = analysis
# レポート生成
report_generator = BacktestReport(analyzer)
html_report = report_generator.generate_html_report(results)
print("バックテストが完了しました。")
print(f"総リターン: {results.get('total_return', 0)*100:.2f}%")
print(f"取引回数: {len(results.get('trades', []))}")
if __name__ == "__main__":
main()
5. まとめとベストプラクティス
5.1 バックテストのベストプラクティス
-
データの品質確保
- 欠損値の適切な処理
- サバイバーシップバイアスの回避
- 取引時間外データの除外 -
現実的な仮定
- 適切なスリッページモデル
- 実際の取引コストの反映
- 流動性制約の考慮 -
オーバーフィッティングの回避
- アウトオブサンプルテスト
- ウォークフォワード分析
- パラメータ最適化の制限 -
リスク管理の重要性
- 動的ポジションサイジング
- 適応的ストップロス
- 市場状況に応じた調整
5.2 実装チェックリスト
BACKTEST_CHECKLIST = {
"データ準備": [
"価格データの品質確認",
"タイムゾーンの統一",
"企業行動の調整",
"サバイバーシップバイアス対策"
],
"取引コスト": [
"手数料の正確な反映",
"スリッページモデルの選択",
"ファンディングコストの考慮",
"税金の影響"
],
"リスク管理": [
"ポジションサイズ制限",
"ストップロス設定",
"ドローダウン制御",
"相関リスク管理"
],
"検証": [
"アウトオブサンプルテスト",
"ベンチマーク比較",
"感度分析",
"ストレステスト"
]
}
適切なバックテストとリスク管理により、暗号資産取引における成功確率を大幅に向上させることができます。継続的な監視と改善が重要です。