ML Documentation

イベント駆動型バックテスターの詳細解説

1. はじめに

このドキュメントは、GitHubリポジトリ「Enhanced-Event-Driven-Backtester」の分析結果をまとめたものです。イベント駆動型バックテスターは、実際の取引システムに近い形で戦略をテストできる高度なフレームワークです。

2. イベント駆動型アーキテクチャの概要

2.1 基本概念

イベント駆動型バックテスターは、以下の特徴を持ちます:

2.2 イベントフロー

graph TD
    A[データ更新] -->|MarketEvent| B[戦略]
    B -->|SignalEvent| C[ポートフォリオ]
    C -->|OrderEvent| D[実行ハンドラー]
    D -->|FillEvent| E[ポートフォリオ更新]
    E --> F[パフォーマンス計算]

3. コアコンポーネントの実装

3.1 イベントクラス

from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional, Union

class Event(ABC):
    """すべてのイベントの基底クラス"""

    @property
    @abstractmethod
    def type(self) -> str:
        """イベントタイプを返す"""
        raise NotImplementedError("Should implement type()")

class MarketEvent(Event):
    """
    市場データの更新を表すイベント

    すべての内部データが最新の市場データに更新されたことを示す
    """

    def __init__(self):
        self.type = 'MARKET'

class SignalEvent(Event):
    """
    売買シグナルを表すイベント

    Attributes:
        strategy_id: シグナルを生成した戦略のID
        symbol: ティッカーシンボル
        datetime: シグナル生成時刻
        signal_type: 'LONG' または 'SHORT'
        strength: シグナルの強さ(通常は確率や信頼度)
    """

    def __init__(self, strategy_id: str, symbol: str, datetime: datetime,
                 signal_type: str, strength: float):
        self.type = 'SIGNAL'
        self.strategy_id = strategy_id
        self.symbol = symbol
        self.datetime = datetime
        self.signal_type = signal_type
        self.strength = strength

class OrderEvent(Event):
    """
    注文を表すイベント

    Attributes:
        symbol: ティッカーシンボル
        order_type: 'MKT' (成行) or 'LMT' (指値)
        quantity: 注文数量
        direction: 'BUY' or 'SELL'
        limit_price: 指値価格(指値注文の場合)
    """

    def __init__(self, symbol: str, order_type: str, quantity: int,
                 direction: str, limit_price: Optional[float] = None):
        self.type = 'ORDER'
        self.symbol = symbol
        self.order_type = order_type
        self.quantity = quantity
        self.direction = direction
        self.limit_price = limit_price

class FillEvent(Event):
    """
    約定を表すイベント

    Attributes:
        datetime: 約定時刻
        symbol: ティッカーシンボル
        exchange: 取引所
        quantity: 約定数量
        direction: 'BUY' or 'SELL'
        fill_cost: 約定価格
        commission: 手数料
    """

    def __init__(self, datetime: datetime, symbol: str, exchange: str,
                 quantity: int, direction: str, fill_cost: float,
                 commission: Optional[float] = None):
        self.type = 'FILL'
        self.datetime = datetime
        self.symbol = symbol
        self.exchange = exchange
        self.quantity = quantity
        self.direction = direction
        self.fill_cost = fill_cost

        # 手数料の計算
        if commission is None:
            self.commission = self.calculate_commission()
        else:
            self.commission = commission

    def calculate_commission(self):
        """手数料を計算(デフォルトはIB手数料モデル)"""
        if self.quantity <= 500:
            commission = max(1.0, 0.005 * self.quantity)
        else:
            commission = max(1.0, 0.005 * 500) + 0.005 * (self.quantity - 500)
        return commission

3.2 データハンドラー

from abc import ABC, abstractmethod
import pandas as pd
import yfinance as yf
from typing import List, Dict

class DataHandler(ABC):
    """データハンドラーの抽象基底クラス"""

    @abstractmethod
    def get_latest_bar(self, symbol: str) -> pd.Series:
        """指定シンボルの最新バーを返す"""
        raise NotImplementedError("Should implement get_latest_bar()")

    @abstractmethod
    def get_latest_bars(self, symbol: str, N: int = 1) -> pd.DataFrame:
        """指定シンボルの最新N本のバーを返す"""
        raise NotImplementedError("Should implement get_latest_bars()")

    @abstractmethod
    def update_bars(self) -> bool:
        """すべてのシンボルを次のバーに更新"""
        raise NotImplementedError("Should implement update_bars()")

class YahooDataHandler(DataHandler):
    """Yahoo Financeからデータを取得するハンドラー"""

    def __init__(self, events, symbol_list: List[str], start_date: datetime,
                 end_date: datetime, interval: str = '1d'):
        self.events = events
        self.symbol_list = symbol_list
        self.start_date = start_date
        self.end_date = end_date
        self.interval = interval

        self.symbol_data = {}
        self.latest_symbol_data = {}
        self.bar_index = 0

        self._download_data()

    def _download_data(self):
        """Yahoo Financeからデータをダウンロード"""
        combined_index = None

        for symbol in self.symbol_list:
            try:
                data = yf.download(
                    symbol,
                    start=self.start_date,
                    end=self.end_date,
                    interval=self.interval
                )

                # インデックスの統合
                if combined_index is None:
                    combined_index = data.index
                else:
                    combined_index = combined_index.union(data.index)

                self.symbol_data[symbol] = data

            except Exception as e:
                print(f"Could not download data for {symbol}: {e}")

        # すべてのデータを同じインデックスに揃える
        for symbol in self.symbol_list:
            if symbol in self.symbol_data:
                self.symbol_data[symbol] = self.symbol_data[symbol].reindex(
                    index=combined_index, method='ffill'
                )
                self.latest_symbol_data[symbol] = []

    def get_latest_bar(self, symbol: str) -> pd.Series:
        """最新のバーを返す"""
        try:
            return self.latest_symbol_data[symbol][-1]
        except (KeyError, IndexError):
            return None

    def get_latest_bars(self, symbol: str, N: int = 1) -> pd.DataFrame:
        """最新のN本のバーを返す"""
        try:
            return self.latest_symbol_data[symbol][-N:]
        except KeyError:
            return None

    def update_bars(self) -> bool:
        """すべてのシンボルのバーを更新"""
        try:
            for symbol in self.symbol_list:
                if symbol in self.symbol_data:
                    bar = self.symbol_data[symbol].iloc[self.bar_index]
                    self.latest_symbol_data[symbol].append(bar)

            self.bar_index += 1
            self.events.put(MarketEvent())
            return True

        except IndexError:
            # データの終端に到達
            return False

3.3 戦略の実装

class Strategy(ABC):
    """戦略の抽象基底クラス"""

    @abstractmethod
    def calculate_signals(self, event: Event):
        """シグナルを計算する"""
        raise NotImplementedError("Should implement calculate_signals()")

class MovingAverageCrossStrategy(Strategy):
    """移動平均クロスオーバー戦略"""

    def __init__(self, bars: DataHandler, events, short_window: int = 100,
                 long_window: int = 400):
        self.bars = bars
        self.symbol_list = self.bars.symbol_list
        self.events = events

        self.short_window = short_window
        self.long_window = long_window

        # 各シンボルの状態を追跡
        self.bought = self._calculate_initial_bought()

    def _calculate_initial_bought(self) -> Dict[str, bool]:
        """初期ポジション状態を設定"""
        bought = {}
        for symbol in self.symbol_list:
            bought[symbol] = False
        return bought

    def calculate_signals(self, event: Event):
        """移動平均クロスオーバーに基づくシグナルを生成"""
        if event.type == 'MARKET':
            for symbol in self.symbol_list:
                bars = self.bars.get_latest_bars(symbol, self.long_window)

                if bars is not None and len(bars) >= self.long_window:
                    # 短期・長期移動平均を計算
                    short_ma = bars['Close'].iloc[-self.short_window:].mean()
                    long_ma = bars['Close'].mean()

                    current_price = bars['Close'].iloc[-1]
                    dt = bars.index[-1]

                    # ゴールデンクロス(買いシグナル)
                    if short_ma > long_ma and not self.bought[symbol]:
                        signal = SignalEvent(
                            strategy_id='MAC',
                            symbol=symbol,
                            datetime=dt,
                            signal_type='LONG',
                            strength=1.0
                        )
                        self.events.put(signal)
                        self.bought[symbol] = True

                    # デッドクロス(売りシグナル)
                    elif short_ma < long_ma and self.bought[symbol]:
                        signal = SignalEvent(
                            strategy_id='MAC',
                            symbol=symbol,
                            datetime=dt,
                            signal_type='EXIT',
                            strength=1.0
                        )
                        self.events.put(signal)
                        self.bought[symbol] = False

3.4 ポートフォリオ管理

class Portfolio:
    """ポートフォリオ管理クラス"""

    def __init__(self, bars: DataHandler, events, start_date: datetime,
                 initial_capital: float = 100000.0):
        self.bars = bars
        self.events = events
        self.symbol_list = self.bars.symbol_list
        self.start_date = start_date
        self.initial_capital = initial_capital

        self.current_positions = self._construct_current_positions()
        self.current_holdings = self._construct_current_holdings()

        self.all_positions = []
        self.all_holdings = []

    def _construct_current_positions(self) -> Dict:
        """現在のポジションを初期化"""
        positions = {symbol: 0 for symbol in self.symbol_list}
        positions['datetime'] = self.start_date
        return positions

    def _construct_current_holdings(self) -> Dict:
        """現在の保有資産を初期化"""
        holdings = {symbol: 0.0 for symbol in self.symbol_list}
        holdings['datetime'] = self.start_date
        holdings['cash'] = self.initial_capital
        holdings['commission'] = 0.0
        holdings['total'] = self.initial_capital
        return holdings

    def update_from_market(self, event: Event):
        """市場データ更新時にポートフォリオを更新"""
        if event.type == 'MARKET':
            # 現在の市場価値を計算
            holdings = {symbol: 0.0 for symbol in self.symbol_list}
            holdings['datetime'] = self.bars.get_latest_bar(
                self.symbol_list[0]
            ).name
            holdings['cash'] = self.current_holdings['cash']
            holdings['commission'] = self.current_holdings['commission']
            holdings['total'] = self.current_holdings['cash']

            for symbol in self.symbol_list:
                market_value = self.current_positions[symbol] * \
                    self.bars.get_latest_bar(symbol)['Close']
                holdings[symbol] = market_value
                holdings['total'] += market_value

            self.all_holdings.append(holdings)
            self.current_holdings = holdings

    def update_from_fill(self, event: Event):
        """約定イベントでポートフォリオを更新"""
        if event.type == 'FILL':
            # ポジションを更新
            fill_dir = 1 if event.direction == 'BUY' else -1
            self.current_positions[event.symbol] += fill_dir * event.quantity

            # 保有資産を更新
            fill_cost = event.fill_cost * event.quantity
            self.current_holdings[event.symbol] += fill_dir * fill_cost
            self.current_holdings['commission'] += event.commission
            self.current_holdings['cash'] -= (fill_dir * fill_cost + 
                                              event.commission)
            self.current_holdings['total'] = self.current_holdings['cash']

            for symbol in self.symbol_list:
                market_value = self.current_positions[symbol] * \
                    self.bars.get_latest_bar(symbol)['Close']
                self.current_holdings['total'] += market_value

    def generate_order(self, signal: Event):
        """シグナルから注文を生成"""
        if signal.type == 'SIGNAL':
            order = None

            symbol = signal.symbol
            direction = signal.signal_type
            strength = signal.strength

            # 注文サイズの計算(簡略化)
            current_price = self.bars.get_latest_bar(symbol)['Close']
            current_value = self.current_holdings['total']

            # Kelly基準やリスクパリティなどの高度な手法も実装可能
            # ここでは固定比率(ポートフォリオの10%)を使用
            position_size = int((current_value * 0.1) / current_price)

            if direction == 'LONG' and self.current_positions[symbol] == 0:
                order = OrderEvent(symbol, 'MKT', position_size, 'BUY')

            elif direction == 'EXIT' and self.current_positions[symbol] > 0:
                order = OrderEvent(symbol, 'MKT', 
                                  abs(self.current_positions[symbol]), 'SELL')

            if order:
                self.events.put(order)

3.5 実行ハンドラー

class ExecutionHandler(ABC):
    """実行ハンドラーの抽象基底クラス"""

    @abstractmethod
    def execute_order(self, event: Event):
        """注文を実行する"""
        raise NotImplementedError("Should implement execute_order()")

class SimpleSimulatedExecutionHandler(ExecutionHandler):
    """シンプルな約定シミュレーター"""

    def __init__(self, events):
        self.events = events

    def execute_order(self, event: Event):
        """成行注文を即座に約定させる"""
        if event.type == 'ORDER':
            # 実際の実装では、スリッページやマーケットインパクトを考慮
            # ここでは簡略化のため、現在価格で即座に約定

            fill_event = FillEvent(
                datetime=datetime.now(),
                symbol=event.symbol,
                exchange='SIMULATED',
                quantity=event.quantity,
                direction=event.direction,
                fill_cost=None  # 実際には市場価格を取得
            )

            self.events.put(fill_event)

class AdvancedExecutionHandler(ExecutionHandler):
    """高度な約定シミュレーター"""

    def __init__(self, events, bars: DataHandler, slippage_model=None):
        self.events = events
        self.bars = bars
        self.slippage_model = slippage_model or self._default_slippage_model

    def _default_slippage_model(self, order_size: int, 
                                current_volume: float) -> float:
        """デフォルトのスリッページモデル"""
        # 注文サイズが日次出来高の何%かに基づいてスリッページを計算
        participation_rate = order_size / current_volume

        # 線形インパクトモデル(簡略化)
        if participation_rate < 0.01:
            return 0.0001  # 0.01%
        elif participation_rate < 0.05:
            return 0.0005  # 0.05%
        else:
            return 0.001   # 0.1%

    def execute_order(self, event: Event):
        """スリッページを考慮した約定シミュレーション"""
        if event.type == 'ORDER':
            # 現在の市場データを取得
            latest_bar = self.bars.get_latest_bar(event.symbol)

            if latest_bar is not None:
                # 基準価格(成行注文の場合は現在価格)
                if event.order_type == 'MKT':
                    base_price = latest_bar['Close']

                    # スリッページの計算
                    slippage = self.slippage_model(
                        event.quantity, 
                        latest_bar['Volume']
                    )

                    # 買い注文の場合は価格が上昇、売り注文の場合は下降
                    if event.direction == 'BUY':
                        fill_price = base_price * (1 + slippage)
                    else:
                        fill_price = base_price * (1 - slippage)

                    # 約定イベントの生成
                    fill_event = FillEvent(
                        datetime=latest_bar.name,
                        symbol=event.symbol,
                        exchange='SIMULATED',
                        quantity=event.quantity,
                        direction=event.direction,
                        fill_cost=fill_price
                    )

                    self.events.put(fill_event)

4. バックテストエンジン

4.1 メインループの実装

import queue
from typing import Type

class Backtest:
    """バックテストエンジンのメインクラス"""

    def __init__(self, 
                 data_dir: str,
                 symbol_list: List[str],
                 initial_capital: float,
                 heartbeat: float,
                 start_date: datetime,
                 end_date: datetime,
                 interval: str,
                 data_handler: Type[DataHandler],
                 execution_handler: Type[ExecutionHandler],
                 portfolio: Type[Portfolio],
                 strategy: Type[Strategy]):

        self.data_dir = data_dir
        self.symbol_list = symbol_list
        self.initial_capital = initial_capital
        self.heartbeat = heartbeat
        self.start_date = start_date
        self.end_date = end_date
        self.interval = interval

        self.events = queue.Queue()

        self.data_handler = data_handler(
            self.events, self.symbol_list, 
            self.start_date, self.end_date, self.interval
        )

        self.strategy = strategy(self.data_handler, self.events)

        self.portfolio = portfolio(
            self.data_handler, self.events, 
            self.start_date, self.initial_capital
        )

        self.execution_handler = execution_handler(self.events)

    def _run_backtest(self):
        """バックテストのメインループ"""
        while True:
            # データハンドラーを更新
            if self.data_handler.update_bars():
                # イベントキューを処理
                while True:
                    try:
                        event = self.events.get(False)
                    except queue.Empty:
                        break
                    else:
                        if event is not None:
                            if event.type == 'MARKET':
                                self.strategy.calculate_signals(event)
                                self.portfolio.update_from_market(event)

                            elif event.type == 'SIGNAL':
                                self.portfolio.generate_order(event)

                            elif event.type == 'ORDER':
                                self.execution_handler.execute_order(event)

                            elif event.type == 'FILL':
                                self.portfolio.update_from_fill(event)
            else:
                # データの終端に到達
                break

    def simulate_trading(self):
        """取引シミュレーションを実行"""
        self._run_backtest()
        return self.portfolio.all_holdings

5. パフォーマンス分析

5.1 評価指標の実装

import numpy as np
import pandas as pd
from typing import List, Dict, Tuple

class PerformanceAnalyzer:
    """パフォーマンス分析クラス"""

    def __init__(self, holdings: List[Dict]):
        self.holdings = pd.DataFrame(holdings)
        self.holdings.set_index('datetime', inplace=True)

    def calculate_total_return(self) -> float:
        """総リターンを計算"""
        initial_value = self.holdings['total'].iloc[0]
        final_value = self.holdings['total'].iloc[-1]
        return (final_value - initial_value) / initial_value

    def calculate_sharpe_ratio(self, periods: int = 252) -> float:
        """シャープレシオを計算"""
        returns = self.holdings['total'].pct_change()
        return np.sqrt(periods) * returns.mean() / returns.std()

    def calculate_max_drawdown(self) -> Tuple[float, pd.Timestamp, pd.Timestamp]:
        """最大ドローダウンを計算"""
        wealth_index = self.holdings['total']
        previous_peaks = wealth_index.cummax()
        drawdowns = (wealth_index - previous_peaks) / previous_peaks

        max_drawdown = drawdowns.min()
        max_drawdown_end = drawdowns.idxmin()

        # ドローダウン開始時点を見つける
        window = drawdowns[:max_drawdown_end]
        max_drawdown_start = wealth_index[:max_drawdown_end][
            window == 0
        ].index[-1]

        return max_drawdown, max_drawdown_start, max_drawdown_end

    def calculate_calmar_ratio(self, periods: int = 252) -> float:
        """カルマーレシオを計算"""
        annual_return = self.calculate_annual_return(periods)
        max_dd, _, _ = self.calculate_max_drawdown()
        return annual_return / abs(max_dd)

    def calculate_annual_return(self, periods: int = 252) -> float:
        """年率リターンを計算"""
        total_return = self.calculate_total_return()
        num_periods = len(self.holdings)
        return (1 + total_return) ** (periods / num_periods) - 1

    def calculate_win_rate(self, positions: pd.DataFrame) -> float:
        """勝率を計算"""
        # ポジションの損益を計算
        trades = self._extract_trades(positions)
        winning_trades = sum(1 for trade in trades if trade['pnl'] > 0)
        return winning_trades / len(trades) if trades else 0

    def _extract_trades(self, positions: pd.DataFrame) -> List[Dict]:
        """ポジション履歴から個別取引を抽出"""
        trades = []

        for symbol in positions.columns:
            if symbol == 'datetime':
                continue

            position_series = positions[symbol]
            entries = position_series[
                (position_series != 0) & 
                (position_series.shift(1) == 0)
            ]
            exits = position_series[
                (position_series == 0) & 
                (position_series.shift(1) != 0)
            ]

            # エントリーとエグジットをペアリング
            for entry_date in entries.index:
                exit_dates = exits[exits.index > entry_date]
                if not exit_dates.empty:
                    exit_date = exit_dates.index[0]

                    # 損益を計算(簡略化)
                    entry_price = self.holdings.loc[entry_date, symbol]
                    exit_price = self.holdings.loc[exit_date, symbol]
                    pnl = exit_price - entry_price

                    trades.append({
                        'symbol': symbol,
                        'entry_date': entry_date,
                        'exit_date': exit_date,
                        'pnl': pnl
                    })

        return trades

    def generate_performance_report(self) -> Dict:
        """総合的なパフォーマンスレポートを生成"""
        total_return = self.calculate_total_return()
        annual_return = self.calculate_annual_return()
        sharpe_ratio = self.calculate_sharpe_ratio()
        max_dd, dd_start, dd_end = self.calculate_max_drawdown()
        calmar_ratio = self.calculate_calmar_ratio()

        report = {
            'Total Return': f"{total_return:.2%}",
            'Annual Return': f"{annual_return:.2%}",
            'Sharpe Ratio': f"{sharpe_ratio:.2f}",
            'Max Drawdown': f"{max_dd:.2%}",
            'Drawdown Period': f"{dd_start} to {dd_end}",
            'Calmar Ratio': f"{calmar_ratio:.2f}",
            'Final Portfolio Value': f"${self.holdings['total'].iloc[-1]:,.2f}"
        }

        return report

5.2 可視化

import matplotlib.pyplot as plt
import seaborn as sns

class BacktestVisualizer:
    """バックテスト結果の可視化クラス"""

    def __init__(self, holdings: pd.DataFrame):
        self.holdings = holdings

    def plot_equity_curve(self, benchmark=None):
        """エクイティカーブをプロット"""
        fig, ax = plt.subplots(figsize=(12, 6))

        # ポートフォリオのエクイティカーブ
        ax.plot(self.holdings.index, self.holdings['total'], 
                label='Portfolio', linewidth=2)

        # ベンチマークがある場合
        if benchmark is not None:
            ax.plot(benchmark.index, benchmark.values, 
                   label='Benchmark', linewidth=2, alpha=0.7)

        ax.set_xlabel('Date')
        ax.set_ylabel('Portfolio Value ($)')
        ax.set_title('Equity Curve')
        ax.legend()
        ax.grid(True, alpha=0.3)

        return fig

    def plot_drawdown(self):
        """ドローダウンをプロット"""
        wealth_index = self.holdings['total']
        previous_peaks = wealth_index.cummax()
        drawdown = (wealth_index - previous_peaks) / previous_peaks

        fig, ax = plt.subplots(figsize=(12, 4))
        drawdown.plot(ax=ax, color='red', alpha=0.7)
        ax.fill_between(drawdown.index, 0, drawdown.values, 
                       color='red', alpha=0.3)

        ax.set_xlabel('Date')
        ax.set_ylabel('Drawdown (%)')
        ax.set_title('Drawdown Over Time')
        ax.grid(True, alpha=0.3)

        return fig

    def plot_returns_distribution(self):
        """リターン分布をプロット"""
        returns = self.holdings['total'].pct_change().dropna()

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

        # ヒストグラム
        ax1.hist(returns, bins=50, alpha=0.7, color='blue', edgecolor='black')
        ax1.axvline(returns.mean(), color='red', linestyle='--', 
                   label=f'Mean: {returns.mean():.4f}')
        ax1.axvline(0, color='black', linestyle='-', alpha=0.3)
        ax1.set_xlabel('Daily Returns')
        ax1.set_ylabel('Frequency')
        ax1.set_title('Returns Distribution')
        ax1.legend()

        # Q-Qプロット
        from scipy import stats
        stats.probplot(returns, dist="norm", plot=ax2)
        ax2.set_title('Normal Q-Q Plot')

        return fig

    def plot_monthly_returns_heatmap(self):
        """月次リターンのヒートマップ"""
        # 日次リターンから月次リターンを計算
        daily_returns = self.holdings['total'].pct_change()
        monthly_returns = daily_returns.resample('M').apply(
            lambda x: (1 + x).prod() - 1
        )

        # 年月でピボット
        monthly_returns_df = pd.DataFrame(monthly_returns)
        monthly_returns_df['Year'] = monthly_returns_df.index.year
        monthly_returns_df['Month'] = monthly_returns_df.index.month

        pivot = monthly_returns_df.pivot(
            index='Year', 
            columns='Month', 
            values='total'
        )

        # ヒートマップ
        fig, ax = plt.subplots(figsize=(12, 8))
        sns.heatmap(pivot, annot=True, fmt='.2%', cmap='RdYlGn', 
                   center=0, ax=ax)
        ax.set_title('Monthly Returns Heatmap')
        ax.set_xlabel('Month')
        ax.set_ylabel('Year')

        return fig

6. 実装例

6.1 完全なバックテストの実行

def run_complete_backtest():
    """完全なバックテストを実行"""

    # パラメータ設定
    symbol_list = ['AAPL', 'MSFT', 'GOOGL', 'AMZN']
    initial_capital = 100000.0
    start_date = datetime(2020, 1, 1)
    end_date = datetime(2023, 12, 31)

    # バックテストの初期化
    backtest = Backtest(
        data_dir='./data',
        symbol_list=symbol_list,
        initial_capital=initial_capital,
        heartbeat=0.0,
        start_date=start_date,
        end_date=end_date,
        interval='1d',
        data_handler=YahooDataHandler,
        execution_handler=AdvancedExecutionHandler,
        portfolio=Portfolio,
        strategy=MovingAverageCrossStrategy
    )

    # シミュレーション実行
    print("Running backtest...")
    holdings = backtest.simulate_trading()

    # パフォーマンス分析
    print("\nAnalyzing performance...")
    analyzer = PerformanceAnalyzer(holdings)
    report = analyzer.generate_performance_report()

    print("\n=== Performance Report ===")
    for metric, value in report.items():
        print(f"{metric}: {value}")

    # 可視化
    print("\nGenerating visualizations...")
    visualizer = BacktestVisualizer(analyzer.holdings)

    # エクイティカーブ
    fig1 = visualizer.plot_equity_curve()
    fig1.savefig('equity_curve.png')

    # ドローダウン
    fig2 = visualizer.plot_drawdown()
    fig2.savefig('drawdown.png')

    # リターン分布
    fig3 = visualizer.plot_returns_distribution()
    fig3.savefig('returns_distribution.png')

    # 月次リターンヒートマップ
    fig4 = visualizer.plot_monthly_returns_heatmap()
    fig4.savefig('monthly_returns.png')

    print("Backtest complete!")

    return analyzer, visualizer

if __name__ == "__main__":
    analyzer, visualizer = run_complete_backtest()

6.2 カスタム戦略の実装例

class MeanReversionStrategy(Strategy):
    """平均回帰戦略"""

    def __init__(self, bars: DataHandler, events, lookback: int = 20,
                 z_score_threshold: float = 2.0):
        self.bars = bars
        self.symbol_list = self.bars.symbol_list
        self.events = events

        self.lookback = lookback
        self.z_score_threshold = z_score_threshold

        self.positions = {symbol: 0 for symbol in self.symbol_list}

    def calculate_signals(self, event: Event):
        """Z-スコアに基づく平均回帰シグナルを生成"""
        if event.type == 'MARKET':
            for symbol in self.symbol_list:
                bars = self.bars.get_latest_bars(symbol, self.lookback + 1)

                if bars is not None and len(bars) > self.lookback:
                    # 移動平均と標準偏差を計算
                    prices = bars['Close'].values
                    mean = prices[:-1].mean()
                    std = prices[:-1].std()

                    if std > 0:
                        # Z-スコア
                        current_price = prices[-1]
                        z_score = (current_price - mean) / std

                        dt = bars.index[-1]

                        # オーバーソールド(買いシグナル)
                        if z_score < -self.z_score_threshold and \
                           self.positions[symbol] <= 0:
                            signal = SignalEvent(
                                strategy_id='MR',
                                symbol=symbol,
                                datetime=dt,
                                signal_type='LONG',
                                strength=abs(z_score) / self.z_score_threshold
                            )
                            self.events.put(signal)
                            self.positions[symbol] = 1

                        # オーバーボート(売りシグナル)
                        elif z_score > self.z_score_threshold and \
                             self.positions[symbol] >= 0:
                            signal = SignalEvent(
                                strategy_id='MR',
                                symbol=symbol,
                                datetime=dt,
                                signal_type='SHORT',
                                strength=abs(z_score) / self.z_score_threshold
                            )
                            self.events.put(signal)
                            self.positions[symbol] = -1

                        # ポジションクローズ
                        elif abs(z_score) < 0.5:
                            if self.positions[symbol] != 0:
                                signal = SignalEvent(
                                    strategy_id='MR',
                                    symbol=symbol,
                                    datetime=dt,
                                    signal_type='EXIT',
                                    strength=1.0
                                )
                                self.events.put(signal)
                                self.positions[symbol] = 0

7. 高度な機能

7.1 リスク管理の実装

class RiskManager:
    """リスク管理クラス"""

    def __init__(self, max_position_size: float = 0.1,
                 max_portfolio_risk: float = 0.02,
                 stop_loss: float = 0.02,
                 take_profit: float = 0.05):
        self.max_position_size = max_position_size
        self.max_portfolio_risk = max_portfolio_risk
        self.stop_loss = stop_loss
        self.take_profit = take_profit

        self.entry_prices = {}

    def check_position_size(self, symbol: str, order_value: float,
                           portfolio_value: float) -> bool:
        """ポジションサイズのチェック"""
        position_ratio = order_value / portfolio_value
        return position_ratio <= self.max_position_size

    def check_stop_loss(self, symbol: str, current_price: float,
                       position_type: str) -> bool:
        """ストップロスのチェック"""
        if symbol in self.entry_prices:
            entry_price = self.entry_prices[symbol]

            if position_type == 'LONG':
                loss = (entry_price - current_price) / entry_price
                return loss >= self.stop_loss
            elif position_type == 'SHORT':
                loss = (current_price - entry_price) / entry_price
                return loss >= self.stop_loss

        return False

    def check_take_profit(self, symbol: str, current_price: float,
                         position_type: str) -> bool:
        """利益確定のチェック"""
        if symbol in self.entry_prices:
            entry_price = self.entry_prices[symbol]

            if position_type == 'LONG':
                profit = (current_price - entry_price) / entry_price
                return profit >= self.take_profit
            elif position_type == 'SHORT':
                profit = (entry_price - current_price) / entry_price
                return profit >= self.take_profit

        return False

    def calculate_position_size_kelly(self, win_rate: float,
                                     avg_win: float, avg_loss: float,
                                     portfolio_value: float) -> float:
        """Kelly基準によるポジションサイズ計算"""
        if avg_loss == 0:
            return 0

        # Kelly %
        b = avg_win / avg_loss
        p = win_rate
        q = 1 - p

        kelly_percent = (b * p - q) / b

        # 安全のため、Kelly%の25%を使用
        safe_kelly = kelly_percent * 0.25

        # 最大ポジションサイズでキャップ
        position_size = min(safe_kelly, self.max_position_size)

        return position_size * portfolio_value

7.2 複数時間軸の統合

class MultiTimeframeStrategy(Strategy):
    """複数時間軸戦略"""

    def __init__(self, bars_dict: Dict[str, DataHandler], events):
        self.bars_dict = bars_dict  # {'1h': hourly_bars, '1d': daily_bars}
        self.events = events
        self.symbol_list = list(bars_dict.values())[0].symbol_list

    def calculate_signals(self, event: Event):
        """複数時間軸からのシグナルを統合"""
        if event.type == 'MARKET':
            for symbol in self.symbol_list:
                # 各時間軸でのトレンドを確認
                hourly_trend = self._check_trend(
                    self.bars_dict['1h'], symbol, 20
                )
                daily_trend = self._check_trend(
                    self.bars_dict['1d'], symbol, 20
                )

                # トレンドが一致する場合のみシグナル生成
                if hourly_trend == daily_trend and hourly_trend != 0:
                    signal_type = 'LONG' if hourly_trend > 0 else 'SHORT'

                    signal = SignalEvent(
                        strategy_id='MTF',
                        symbol=symbol,
                        datetime=datetime.now(),
                        signal_type=signal_type,
                        strength=1.0
                    )
                    self.events.put(signal)

    def _check_trend(self, bars: DataHandler, symbol: str,
                    lookback: int) -> int:
        """トレンドをチェック(1: 上昇、-1: 下降、0: 横ばい)"""
        data = bars.get_latest_bars(symbol, lookback)

        if data is not None and len(data) >= lookback:
            prices = data['Close'].values

            # 単純な線形回帰でトレンドを判定
            x = np.arange(len(prices))
            slope = np.polyfit(x, prices, 1)[0]

            # 傾きの標準化
            normalized_slope = slope / prices.mean()

            if normalized_slope > 0.001:
                return 1
            elif normalized_slope < -0.001:
                return -1
            else:
                return 0

        return 0

8. まとめ

8.1 イベント駆動型バックテスターの利点

  1. 現実的なシミュレーション: 実際の取引システムの動作を忠実に再現
  2. 柔軟性: 複雑な注文タイプや執行ロジックの実装が容易
  3. 拡張性: 新しいコンポーネントの追加が簡単
  4. デバッグ: 各イベントを追跡できるため、問題の特定が容易

8.2 ベクトル化バックテスターとの比較

特徴 イベント駆動型 ベクトル化型
実行速度 遅い 速い
メモリ使用量 少ない 多い
現実性 高い 低い
実装の複雑さ 高い 低い
カスタマイズ性 高い 限定的

8.3 実装のベストプラクティス

  1. モジュール設計: 各コンポーネントを独立して開発・テスト
  2. ログ記録: すべてのイベントと決定を記録
  3. エラーハンドリング: 例外処理を適切に実装
  4. パフォーマンス最適化: ボトルネックの特定と改善
  5. 検証: 結果の妥当性を常に確認

このフレームワークは、実践的なアルゴリズムトレーディングの開発において、戦略の検証と改善のための強力な基盤を提供します。