イベント駆動型バックテスターの詳細解説
1. はじめに
このドキュメントは、GitHubリポジトリ「Enhanced-Event-Driven-Backtester」の分析結果をまとめたものです。イベント駆動型バックテスターは、実際の取引システムに近い形で戦略をテストできる高度なフレームワークです。
2. イベント駆動型アーキテクチャの概要
2.1 基本概念
イベント駆動型バックテスターは、以下の特徴を持ちます:
- リアルタイム模擬: 実際の取引と同じ順序でイベントを処理
- 疎結合設計: 各コンポーネントが独立して動作
- 拡張性: 新しい戦略や実行ロジックを容易に追加可能
- 詳細なシミュレーション: スリッページ、手数料、部分約定を考慮
2.2 イベントフロー
graph TD
A[データ更新] -->|MarketEvent| B[戦略]
B -->|SignalEvent| C[ポートフォリオ]
C -->|OrderEvent| D[実行ハンドラー]
D -->|FillEvent| E[ポートフォリオ更新]
E --> F[パフォーマンス計算]
3. コアコンポーネントの実装
3.1 イベントクラス
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional, Union
class Event(ABC):
"""すべてのイベントの基底クラス"""
@property
@abstractmethod
def type(self) -> str:
"""イベントタイプを返す"""
raise NotImplementedError("Should implement type()")
class MarketEvent(Event):
"""
市場データの更新を表すイベント
すべての内部データが最新の市場データに更新されたことを示す
"""
def __init__(self):
self.type = 'MARKET'
class SignalEvent(Event):
"""
売買シグナルを表すイベント
Attributes:
strategy_id: シグナルを生成した戦略のID
symbol: ティッカーシンボル
datetime: シグナル生成時刻
signal_type: 'LONG' または 'SHORT'
strength: シグナルの強さ(通常は確率や信頼度)
"""
def __init__(self, strategy_id: str, symbol: str, datetime: datetime,
signal_type: str, strength: float):
self.type = 'SIGNAL'
self.strategy_id = strategy_id
self.symbol = symbol
self.datetime = datetime
self.signal_type = signal_type
self.strength = strength
class OrderEvent(Event):
"""
注文を表すイベント
Attributes:
symbol: ティッカーシンボル
order_type: 'MKT' (成行) or 'LMT' (指値)
quantity: 注文数量
direction: 'BUY' or 'SELL'
limit_price: 指値価格(指値注文の場合)
"""
def __init__(self, symbol: str, order_type: str, quantity: int,
direction: str, limit_price: Optional[float] = None):
self.type = 'ORDER'
self.symbol = symbol
self.order_type = order_type
self.quantity = quantity
self.direction = direction
self.limit_price = limit_price
class FillEvent(Event):
"""
約定を表すイベント
Attributes:
datetime: 約定時刻
symbol: ティッカーシンボル
exchange: 取引所
quantity: 約定数量
direction: 'BUY' or 'SELL'
fill_cost: 約定価格
commission: 手数料
"""
def __init__(self, datetime: datetime, symbol: str, exchange: str,
quantity: int, direction: str, fill_cost: float,
commission: Optional[float] = None):
self.type = 'FILL'
self.datetime = datetime
self.symbol = symbol
self.exchange = exchange
self.quantity = quantity
self.direction = direction
self.fill_cost = fill_cost
# 手数料の計算
if commission is None:
self.commission = self.calculate_commission()
else:
self.commission = commission
def calculate_commission(self):
"""手数料を計算(デフォルトはIB手数料モデル)"""
if self.quantity <= 500:
commission = max(1.0, 0.005 * self.quantity)
else:
commission = max(1.0, 0.005 * 500) + 0.005 * (self.quantity - 500)
return commission
3.2 データハンドラー
from abc import ABC, abstractmethod
import pandas as pd
import yfinance as yf
from typing import List, Dict
class DataHandler(ABC):
"""データハンドラーの抽象基底クラス"""
@abstractmethod
def get_latest_bar(self, symbol: str) -> pd.Series:
"""指定シンボルの最新バーを返す"""
raise NotImplementedError("Should implement get_latest_bar()")
@abstractmethod
def get_latest_bars(self, symbol: str, N: int = 1) -> pd.DataFrame:
"""指定シンボルの最新N本のバーを返す"""
raise NotImplementedError("Should implement get_latest_bars()")
@abstractmethod
def update_bars(self) -> bool:
"""すべてのシンボルを次のバーに更新"""
raise NotImplementedError("Should implement update_bars()")
class YahooDataHandler(DataHandler):
"""Yahoo Financeからデータを取得するハンドラー"""
def __init__(self, events, symbol_list: List[str], start_date: datetime,
end_date: datetime, interval: str = '1d'):
self.events = events
self.symbol_list = symbol_list
self.start_date = start_date
self.end_date = end_date
self.interval = interval
self.symbol_data = {}
self.latest_symbol_data = {}
self.bar_index = 0
self._download_data()
def _download_data(self):
"""Yahoo Financeからデータをダウンロード"""
combined_index = None
for symbol in self.symbol_list:
try:
data = yf.download(
symbol,
start=self.start_date,
end=self.end_date,
interval=self.interval
)
# インデックスの統合
if combined_index is None:
combined_index = data.index
else:
combined_index = combined_index.union(data.index)
self.symbol_data[symbol] = data
except Exception as e:
print(f"Could not download data for {symbol}: {e}")
# すべてのデータを同じインデックスに揃える
for symbol in self.symbol_list:
if symbol in self.symbol_data:
self.symbol_data[symbol] = self.symbol_data[symbol].reindex(
index=combined_index, method='ffill'
)
self.latest_symbol_data[symbol] = []
def get_latest_bar(self, symbol: str) -> pd.Series:
"""最新のバーを返す"""
try:
return self.latest_symbol_data[symbol][-1]
except (KeyError, IndexError):
return None
def get_latest_bars(self, symbol: str, N: int = 1) -> pd.DataFrame:
"""最新のN本のバーを返す"""
try:
return self.latest_symbol_data[symbol][-N:]
except KeyError:
return None
def update_bars(self) -> bool:
"""すべてのシンボルのバーを更新"""
try:
for symbol in self.symbol_list:
if symbol in self.symbol_data:
bar = self.symbol_data[symbol].iloc[self.bar_index]
self.latest_symbol_data[symbol].append(bar)
self.bar_index += 1
self.events.put(MarketEvent())
return True
except IndexError:
# データの終端に到達
return False
3.3 戦略の実装
class Strategy(ABC):
"""戦略の抽象基底クラス"""
@abstractmethod
def calculate_signals(self, event: Event):
"""シグナルを計算する"""
raise NotImplementedError("Should implement calculate_signals()")
class MovingAverageCrossStrategy(Strategy):
"""移動平均クロスオーバー戦略"""
def __init__(self, bars: DataHandler, events, short_window: int = 100,
long_window: int = 400):
self.bars = bars
self.symbol_list = self.bars.symbol_list
self.events = events
self.short_window = short_window
self.long_window = long_window
# 各シンボルの状態を追跡
self.bought = self._calculate_initial_bought()
def _calculate_initial_bought(self) -> Dict[str, bool]:
"""初期ポジション状態を設定"""
bought = {}
for symbol in self.symbol_list:
bought[symbol] = False
return bought
def calculate_signals(self, event: Event):
"""移動平均クロスオーバーに基づくシグナルを生成"""
if event.type == 'MARKET':
for symbol in self.symbol_list:
bars = self.bars.get_latest_bars(symbol, self.long_window)
if bars is not None and len(bars) >= self.long_window:
# 短期・長期移動平均を計算
short_ma = bars['Close'].iloc[-self.short_window:].mean()
long_ma = bars['Close'].mean()
current_price = bars['Close'].iloc[-1]
dt = bars.index[-1]
# ゴールデンクロス(買いシグナル)
if short_ma > long_ma and not self.bought[symbol]:
signal = SignalEvent(
strategy_id='MAC',
symbol=symbol,
datetime=dt,
signal_type='LONG',
strength=1.0
)
self.events.put(signal)
self.bought[symbol] = True
# デッドクロス(売りシグナル)
elif short_ma < long_ma and self.bought[symbol]:
signal = SignalEvent(
strategy_id='MAC',
symbol=symbol,
datetime=dt,
signal_type='EXIT',
strength=1.0
)
self.events.put(signal)
self.bought[symbol] = False
3.4 ポートフォリオ管理
class Portfolio:
"""ポートフォリオ管理クラス"""
def __init__(self, bars: DataHandler, events, start_date: datetime,
initial_capital: float = 100000.0):
self.bars = bars
self.events = events
self.symbol_list = self.bars.symbol_list
self.start_date = start_date
self.initial_capital = initial_capital
self.current_positions = self._construct_current_positions()
self.current_holdings = self._construct_current_holdings()
self.all_positions = []
self.all_holdings = []
def _construct_current_positions(self) -> Dict:
"""現在のポジションを初期化"""
positions = {symbol: 0 for symbol in self.symbol_list}
positions['datetime'] = self.start_date
return positions
def _construct_current_holdings(self) -> Dict:
"""現在の保有資産を初期化"""
holdings = {symbol: 0.0 for symbol in self.symbol_list}
holdings['datetime'] = self.start_date
holdings['cash'] = self.initial_capital
holdings['commission'] = 0.0
holdings['total'] = self.initial_capital
return holdings
def update_from_market(self, event: Event):
"""市場データ更新時にポートフォリオを更新"""
if event.type == 'MARKET':
# 現在の市場価値を計算
holdings = {symbol: 0.0 for symbol in self.symbol_list}
holdings['datetime'] = self.bars.get_latest_bar(
self.symbol_list[0]
).name
holdings['cash'] = self.current_holdings['cash']
holdings['commission'] = self.current_holdings['commission']
holdings['total'] = self.current_holdings['cash']
for symbol in self.symbol_list:
market_value = self.current_positions[symbol] * \
self.bars.get_latest_bar(symbol)['Close']
holdings[symbol] = market_value
holdings['total'] += market_value
self.all_holdings.append(holdings)
self.current_holdings = holdings
def update_from_fill(self, event: Event):
"""約定イベントでポートフォリオを更新"""
if event.type == 'FILL':
# ポジションを更新
fill_dir = 1 if event.direction == 'BUY' else -1
self.current_positions[event.symbol] += fill_dir * event.quantity
# 保有資産を更新
fill_cost = event.fill_cost * event.quantity
self.current_holdings[event.symbol] += fill_dir * fill_cost
self.current_holdings['commission'] += event.commission
self.current_holdings['cash'] -= (fill_dir * fill_cost +
event.commission)
self.current_holdings['total'] = self.current_holdings['cash']
for symbol in self.symbol_list:
market_value = self.current_positions[symbol] * \
self.bars.get_latest_bar(symbol)['Close']
self.current_holdings['total'] += market_value
def generate_order(self, signal: Event):
"""シグナルから注文を生成"""
if signal.type == 'SIGNAL':
order = None
symbol = signal.symbol
direction = signal.signal_type
strength = signal.strength
# 注文サイズの計算(簡略化)
current_price = self.bars.get_latest_bar(symbol)['Close']
current_value = self.current_holdings['total']
# Kelly基準やリスクパリティなどの高度な手法も実装可能
# ここでは固定比率(ポートフォリオの10%)を使用
position_size = int((current_value * 0.1) / current_price)
if direction == 'LONG' and self.current_positions[symbol] == 0:
order = OrderEvent(symbol, 'MKT', position_size, 'BUY')
elif direction == 'EXIT' and self.current_positions[symbol] > 0:
order = OrderEvent(symbol, 'MKT',
abs(self.current_positions[symbol]), 'SELL')
if order:
self.events.put(order)
3.5 実行ハンドラー
class ExecutionHandler(ABC):
"""実行ハンドラーの抽象基底クラス"""
@abstractmethod
def execute_order(self, event: Event):
"""注文を実行する"""
raise NotImplementedError("Should implement execute_order()")
class SimpleSimulatedExecutionHandler(ExecutionHandler):
"""シンプルな約定シミュレーター"""
def __init__(self, events):
self.events = events
def execute_order(self, event: Event):
"""成行注文を即座に約定させる"""
if event.type == 'ORDER':
# 実際の実装では、スリッページやマーケットインパクトを考慮
# ここでは簡略化のため、現在価格で即座に約定
fill_event = FillEvent(
datetime=datetime.now(),
symbol=event.symbol,
exchange='SIMULATED',
quantity=event.quantity,
direction=event.direction,
fill_cost=None # 実際には市場価格を取得
)
self.events.put(fill_event)
class AdvancedExecutionHandler(ExecutionHandler):
"""高度な約定シミュレーター"""
def __init__(self, events, bars: DataHandler, slippage_model=None):
self.events = events
self.bars = bars
self.slippage_model = slippage_model or self._default_slippage_model
def _default_slippage_model(self, order_size: int,
current_volume: float) -> float:
"""デフォルトのスリッページモデル"""
# 注文サイズが日次出来高の何%かに基づいてスリッページを計算
participation_rate = order_size / current_volume
# 線形インパクトモデル(簡略化)
if participation_rate < 0.01:
return 0.0001 # 0.01%
elif participation_rate < 0.05:
return 0.0005 # 0.05%
else:
return 0.001 # 0.1%
def execute_order(self, event: Event):
"""スリッページを考慮した約定シミュレーション"""
if event.type == 'ORDER':
# 現在の市場データを取得
latest_bar = self.bars.get_latest_bar(event.symbol)
if latest_bar is not None:
# 基準価格(成行注文の場合は現在価格)
if event.order_type == 'MKT':
base_price = latest_bar['Close']
# スリッページの計算
slippage = self.slippage_model(
event.quantity,
latest_bar['Volume']
)
# 買い注文の場合は価格が上昇、売り注文の場合は下降
if event.direction == 'BUY':
fill_price = base_price * (1 + slippage)
else:
fill_price = base_price * (1 - slippage)
# 約定イベントの生成
fill_event = FillEvent(
datetime=latest_bar.name,
symbol=event.symbol,
exchange='SIMULATED',
quantity=event.quantity,
direction=event.direction,
fill_cost=fill_price
)
self.events.put(fill_event)
4. バックテストエンジン
4.1 メインループの実装
import queue
from typing import Type
class Backtest:
"""バックテストエンジンのメインクラス"""
def __init__(self,
data_dir: str,
symbol_list: List[str],
initial_capital: float,
heartbeat: float,
start_date: datetime,
end_date: datetime,
interval: str,
data_handler: Type[DataHandler],
execution_handler: Type[ExecutionHandler],
portfolio: Type[Portfolio],
strategy: Type[Strategy]):
self.data_dir = data_dir
self.symbol_list = symbol_list
self.initial_capital = initial_capital
self.heartbeat = heartbeat
self.start_date = start_date
self.end_date = end_date
self.interval = interval
self.events = queue.Queue()
self.data_handler = data_handler(
self.events, self.symbol_list,
self.start_date, self.end_date, self.interval
)
self.strategy = strategy(self.data_handler, self.events)
self.portfolio = portfolio(
self.data_handler, self.events,
self.start_date, self.initial_capital
)
self.execution_handler = execution_handler(self.events)
def _run_backtest(self):
"""バックテストのメインループ"""
while True:
# データハンドラーを更新
if self.data_handler.update_bars():
# イベントキューを処理
while True:
try:
event = self.events.get(False)
except queue.Empty:
break
else:
if event is not None:
if event.type == 'MARKET':
self.strategy.calculate_signals(event)
self.portfolio.update_from_market(event)
elif event.type == 'SIGNAL':
self.portfolio.generate_order(event)
elif event.type == 'ORDER':
self.execution_handler.execute_order(event)
elif event.type == 'FILL':
self.portfolio.update_from_fill(event)
else:
# データの終端に到達
break
def simulate_trading(self):
"""取引シミュレーションを実行"""
self._run_backtest()
return self.portfolio.all_holdings
5. パフォーマンス分析
5.1 評価指標の実装
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple
class PerformanceAnalyzer:
"""パフォーマンス分析クラス"""
def __init__(self, holdings: List[Dict]):
self.holdings = pd.DataFrame(holdings)
self.holdings.set_index('datetime', inplace=True)
def calculate_total_return(self) -> float:
"""総リターンを計算"""
initial_value = self.holdings['total'].iloc[0]
final_value = self.holdings['total'].iloc[-1]
return (final_value - initial_value) / initial_value
def calculate_sharpe_ratio(self, periods: int = 252) -> float:
"""シャープレシオを計算"""
returns = self.holdings['total'].pct_change()
return np.sqrt(periods) * returns.mean() / returns.std()
def calculate_max_drawdown(self) -> Tuple[float, pd.Timestamp, pd.Timestamp]:
"""最大ドローダウンを計算"""
wealth_index = self.holdings['total']
previous_peaks = wealth_index.cummax()
drawdowns = (wealth_index - previous_peaks) / previous_peaks
max_drawdown = drawdowns.min()
max_drawdown_end = drawdowns.idxmin()
# ドローダウン開始時点を見つける
window = drawdowns[:max_drawdown_end]
max_drawdown_start = wealth_index[:max_drawdown_end][
window == 0
].index[-1]
return max_drawdown, max_drawdown_start, max_drawdown_end
def calculate_calmar_ratio(self, periods: int = 252) -> float:
"""カルマーレシオを計算"""
annual_return = self.calculate_annual_return(periods)
max_dd, _, _ = self.calculate_max_drawdown()
return annual_return / abs(max_dd)
def calculate_annual_return(self, periods: int = 252) -> float:
"""年率リターンを計算"""
total_return = self.calculate_total_return()
num_periods = len(self.holdings)
return (1 + total_return) ** (periods / num_periods) - 1
def calculate_win_rate(self, positions: pd.DataFrame) -> float:
"""勝率を計算"""
# ポジションの損益を計算
trades = self._extract_trades(positions)
winning_trades = sum(1 for trade in trades if trade['pnl'] > 0)
return winning_trades / len(trades) if trades else 0
def _extract_trades(self, positions: pd.DataFrame) -> List[Dict]:
"""ポジション履歴から個別取引を抽出"""
trades = []
for symbol in positions.columns:
if symbol == 'datetime':
continue
position_series = positions[symbol]
entries = position_series[
(position_series != 0) &
(position_series.shift(1) == 0)
]
exits = position_series[
(position_series == 0) &
(position_series.shift(1) != 0)
]
# エントリーとエグジットをペアリング
for entry_date in entries.index:
exit_dates = exits[exits.index > entry_date]
if not exit_dates.empty:
exit_date = exit_dates.index[0]
# 損益を計算(簡略化)
entry_price = self.holdings.loc[entry_date, symbol]
exit_price = self.holdings.loc[exit_date, symbol]
pnl = exit_price - entry_price
trades.append({
'symbol': symbol,
'entry_date': entry_date,
'exit_date': exit_date,
'pnl': pnl
})
return trades
def generate_performance_report(self) -> Dict:
"""総合的なパフォーマンスレポートを生成"""
total_return = self.calculate_total_return()
annual_return = self.calculate_annual_return()
sharpe_ratio = self.calculate_sharpe_ratio()
max_dd, dd_start, dd_end = self.calculate_max_drawdown()
calmar_ratio = self.calculate_calmar_ratio()
report = {
'Total Return': f"{total_return:.2%}",
'Annual Return': f"{annual_return:.2%}",
'Sharpe Ratio': f"{sharpe_ratio:.2f}",
'Max Drawdown': f"{max_dd:.2%}",
'Drawdown Period': f"{dd_start} to {dd_end}",
'Calmar Ratio': f"{calmar_ratio:.2f}",
'Final Portfolio Value': f"${self.holdings['total'].iloc[-1]:,.2f}"
}
return report
5.2 可視化
import matplotlib.pyplot as plt
import seaborn as sns
class BacktestVisualizer:
"""バックテスト結果の可視化クラス"""
def __init__(self, holdings: pd.DataFrame):
self.holdings = holdings
def plot_equity_curve(self, benchmark=None):
"""エクイティカーブをプロット"""
fig, ax = plt.subplots(figsize=(12, 6))
# ポートフォリオのエクイティカーブ
ax.plot(self.holdings.index, self.holdings['total'],
label='Portfolio', linewidth=2)
# ベンチマークがある場合
if benchmark is not None:
ax.plot(benchmark.index, benchmark.values,
label='Benchmark', linewidth=2, alpha=0.7)
ax.set_xlabel('Date')
ax.set_ylabel('Portfolio Value ($)')
ax.set_title('Equity Curve')
ax.legend()
ax.grid(True, alpha=0.3)
return fig
def plot_drawdown(self):
"""ドローダウンをプロット"""
wealth_index = self.holdings['total']
previous_peaks = wealth_index.cummax()
drawdown = (wealth_index - previous_peaks) / previous_peaks
fig, ax = plt.subplots(figsize=(12, 4))
drawdown.plot(ax=ax, color='red', alpha=0.7)
ax.fill_between(drawdown.index, 0, drawdown.values,
color='red', alpha=0.3)
ax.set_xlabel('Date')
ax.set_ylabel('Drawdown (%)')
ax.set_title('Drawdown Over Time')
ax.grid(True, alpha=0.3)
return fig
def plot_returns_distribution(self):
"""リターン分布をプロット"""
returns = self.holdings['total'].pct_change().dropna()
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
# ヒストグラム
ax1.hist(returns, bins=50, alpha=0.7, color='blue', edgecolor='black')
ax1.axvline(returns.mean(), color='red', linestyle='--',
label=f'Mean: {returns.mean():.4f}')
ax1.axvline(0, color='black', linestyle='-', alpha=0.3)
ax1.set_xlabel('Daily Returns')
ax1.set_ylabel('Frequency')
ax1.set_title('Returns Distribution')
ax1.legend()
# Q-Qプロット
from scipy import stats
stats.probplot(returns, dist="norm", plot=ax2)
ax2.set_title('Normal Q-Q Plot')
return fig
def plot_monthly_returns_heatmap(self):
"""月次リターンのヒートマップ"""
# 日次リターンから月次リターンを計算
daily_returns = self.holdings['total'].pct_change()
monthly_returns = daily_returns.resample('M').apply(
lambda x: (1 + x).prod() - 1
)
# 年月でピボット
monthly_returns_df = pd.DataFrame(monthly_returns)
monthly_returns_df['Year'] = monthly_returns_df.index.year
monthly_returns_df['Month'] = monthly_returns_df.index.month
pivot = monthly_returns_df.pivot(
index='Year',
columns='Month',
values='total'
)
# ヒートマップ
fig, ax = plt.subplots(figsize=(12, 8))
sns.heatmap(pivot, annot=True, fmt='.2%', cmap='RdYlGn',
center=0, ax=ax)
ax.set_title('Monthly Returns Heatmap')
ax.set_xlabel('Month')
ax.set_ylabel('Year')
return fig
6. 実装例
6.1 完全なバックテストの実行
def run_complete_backtest():
"""完全なバックテストを実行"""
# パラメータ設定
symbol_list = ['AAPL', 'MSFT', 'GOOGL', 'AMZN']
initial_capital = 100000.0
start_date = datetime(2020, 1, 1)
end_date = datetime(2023, 12, 31)
# バックテストの初期化
backtest = Backtest(
data_dir='./data',
symbol_list=symbol_list,
initial_capital=initial_capital,
heartbeat=0.0,
start_date=start_date,
end_date=end_date,
interval='1d',
data_handler=YahooDataHandler,
execution_handler=AdvancedExecutionHandler,
portfolio=Portfolio,
strategy=MovingAverageCrossStrategy
)
# シミュレーション実行
print("Running backtest...")
holdings = backtest.simulate_trading()
# パフォーマンス分析
print("\nAnalyzing performance...")
analyzer = PerformanceAnalyzer(holdings)
report = analyzer.generate_performance_report()
print("\n=== Performance Report ===")
for metric, value in report.items():
print(f"{metric}: {value}")
# 可視化
print("\nGenerating visualizations...")
visualizer = BacktestVisualizer(analyzer.holdings)
# エクイティカーブ
fig1 = visualizer.plot_equity_curve()
fig1.savefig('equity_curve.png')
# ドローダウン
fig2 = visualizer.plot_drawdown()
fig2.savefig('drawdown.png')
# リターン分布
fig3 = visualizer.plot_returns_distribution()
fig3.savefig('returns_distribution.png')
# 月次リターンヒートマップ
fig4 = visualizer.plot_monthly_returns_heatmap()
fig4.savefig('monthly_returns.png')
print("Backtest complete!")
return analyzer, visualizer
if __name__ == "__main__":
analyzer, visualizer = run_complete_backtest()
6.2 カスタム戦略の実装例
class MeanReversionStrategy(Strategy):
"""平均回帰戦略"""
def __init__(self, bars: DataHandler, events, lookback: int = 20,
z_score_threshold: float = 2.0):
self.bars = bars
self.symbol_list = self.bars.symbol_list
self.events = events
self.lookback = lookback
self.z_score_threshold = z_score_threshold
self.positions = {symbol: 0 for symbol in self.symbol_list}
def calculate_signals(self, event: Event):
"""Z-スコアに基づく平均回帰シグナルを生成"""
if event.type == 'MARKET':
for symbol in self.symbol_list:
bars = self.bars.get_latest_bars(symbol, self.lookback + 1)
if bars is not None and len(bars) > self.lookback:
# 移動平均と標準偏差を計算
prices = bars['Close'].values
mean = prices[:-1].mean()
std = prices[:-1].std()
if std > 0:
# Z-スコア
current_price = prices[-1]
z_score = (current_price - mean) / std
dt = bars.index[-1]
# オーバーソールド(買いシグナル)
if z_score < -self.z_score_threshold and \
self.positions[symbol] <= 0:
signal = SignalEvent(
strategy_id='MR',
symbol=symbol,
datetime=dt,
signal_type='LONG',
strength=abs(z_score) / self.z_score_threshold
)
self.events.put(signal)
self.positions[symbol] = 1
# オーバーボート(売りシグナル)
elif z_score > self.z_score_threshold and \
self.positions[symbol] >= 0:
signal = SignalEvent(
strategy_id='MR',
symbol=symbol,
datetime=dt,
signal_type='SHORT',
strength=abs(z_score) / self.z_score_threshold
)
self.events.put(signal)
self.positions[symbol] = -1
# ポジションクローズ
elif abs(z_score) < 0.5:
if self.positions[symbol] != 0:
signal = SignalEvent(
strategy_id='MR',
symbol=symbol,
datetime=dt,
signal_type='EXIT',
strength=1.0
)
self.events.put(signal)
self.positions[symbol] = 0
7. 高度な機能
7.1 リスク管理の実装
class RiskManager:
"""リスク管理クラス"""
def __init__(self, max_position_size: float = 0.1,
max_portfolio_risk: float = 0.02,
stop_loss: float = 0.02,
take_profit: float = 0.05):
self.max_position_size = max_position_size
self.max_portfolio_risk = max_portfolio_risk
self.stop_loss = stop_loss
self.take_profit = take_profit
self.entry_prices = {}
def check_position_size(self, symbol: str, order_value: float,
portfolio_value: float) -> bool:
"""ポジションサイズのチェック"""
position_ratio = order_value / portfolio_value
return position_ratio <= self.max_position_size
def check_stop_loss(self, symbol: str, current_price: float,
position_type: str) -> bool:
"""ストップロスのチェック"""
if symbol in self.entry_prices:
entry_price = self.entry_prices[symbol]
if position_type == 'LONG':
loss = (entry_price - current_price) / entry_price
return loss >= self.stop_loss
elif position_type == 'SHORT':
loss = (current_price - entry_price) / entry_price
return loss >= self.stop_loss
return False
def check_take_profit(self, symbol: str, current_price: float,
position_type: str) -> bool:
"""利益確定のチェック"""
if symbol in self.entry_prices:
entry_price = self.entry_prices[symbol]
if position_type == 'LONG':
profit = (current_price - entry_price) / entry_price
return profit >= self.take_profit
elif position_type == 'SHORT':
profit = (entry_price - current_price) / entry_price
return profit >= self.take_profit
return False
def calculate_position_size_kelly(self, win_rate: float,
avg_win: float, avg_loss: float,
portfolio_value: float) -> float:
"""Kelly基準によるポジションサイズ計算"""
if avg_loss == 0:
return 0
# Kelly %
b = avg_win / avg_loss
p = win_rate
q = 1 - p
kelly_percent = (b * p - q) / b
# 安全のため、Kelly%の25%を使用
safe_kelly = kelly_percent * 0.25
# 最大ポジションサイズでキャップ
position_size = min(safe_kelly, self.max_position_size)
return position_size * portfolio_value
7.2 複数時間軸の統合
class MultiTimeframeStrategy(Strategy):
"""複数時間軸戦略"""
def __init__(self, bars_dict: Dict[str, DataHandler], events):
self.bars_dict = bars_dict # {'1h': hourly_bars, '1d': daily_bars}
self.events = events
self.symbol_list = list(bars_dict.values())[0].symbol_list
def calculate_signals(self, event: Event):
"""複数時間軸からのシグナルを統合"""
if event.type == 'MARKET':
for symbol in self.symbol_list:
# 各時間軸でのトレンドを確認
hourly_trend = self._check_trend(
self.bars_dict['1h'], symbol, 20
)
daily_trend = self._check_trend(
self.bars_dict['1d'], symbol, 20
)
# トレンドが一致する場合のみシグナル生成
if hourly_trend == daily_trend and hourly_trend != 0:
signal_type = 'LONG' if hourly_trend > 0 else 'SHORT'
signal = SignalEvent(
strategy_id='MTF',
symbol=symbol,
datetime=datetime.now(),
signal_type=signal_type,
strength=1.0
)
self.events.put(signal)
def _check_trend(self, bars: DataHandler, symbol: str,
lookback: int) -> int:
"""トレンドをチェック(1: 上昇、-1: 下降、0: 横ばい)"""
data = bars.get_latest_bars(symbol, lookback)
if data is not None and len(data) >= lookback:
prices = data['Close'].values
# 単純な線形回帰でトレンドを判定
x = np.arange(len(prices))
slope = np.polyfit(x, prices, 1)[0]
# 傾きの標準化
normalized_slope = slope / prices.mean()
if normalized_slope > 0.001:
return 1
elif normalized_slope < -0.001:
return -1
else:
return 0
return 0
8. まとめ
8.1 イベント駆動型バックテスターの利点
- 現実的なシミュレーション: 実際の取引システムの動作を忠実に再現
- 柔軟性: 複雑な注文タイプや執行ロジックの実装が容易
- 拡張性: 新しいコンポーネントの追加が簡単
- デバッグ: 各イベントを追跡できるため、問題の特定が容易
8.2 ベクトル化バックテスターとの比較
| 特徴 | イベント駆動型 | ベクトル化型 |
|---|---|---|
| 実行速度 | 遅い | 速い |
| メモリ使用量 | 少ない | 多い |
| 現実性 | 高い | 低い |
| 実装の複雑さ | 高い | 低い |
| カスタマイズ性 | 高い | 限定的 |
8.3 実装のベストプラクティス
- モジュール設計: 各コンポーネントを独立して開発・テスト
- ログ記録: すべてのイベントと決定を記録
- エラーハンドリング: 例外処理を適切に実装
- パフォーマンス最適化: ボトルネックの特定と改善
- 検証: 結果の妥当性を常に確認
このフレームワークは、実践的なアルゴリズムトレーディングの開発において、戦略の検証と改善のための強力な基盤を提供します。