ML Documentation

マーケットマイクロストラクチャと高頻度取引

1. はじめに

マーケットマイクロストラクチャは、金融市場における価格形成プロセスと取引メカニズムを研究する分野です。暗号通貨市場では、24時間365日の取引、分散型取引所の存在、高いボラティリティなど、独特の特徴があります。

2. オーダーフロー分析

2.1 オーダーフローの基本概念

オーダーフローは、市場参加者が出す売買注文の流れを表します。これを分析することで、市場の需給バランスを把握し、短期的な価格動向を予測するための手がかりを得られます。

import pandas as pd
import numpy as np
from typing import Dict, List, Tuple
import ccxt

class OrderFlowAnalyzer:
    """Order-book imbalance and VPIN (order-flow toxicity) analytics."""

    def __init__(self, exchange: str = 'binance'):
        """Create the analyzer.

        Args:
            exchange: Name of a ccxt exchange class (e.g. ``'binance'``); it is
                instantiated with default arguments.
        """
        self.exchange = getattr(ccxt, exchange)()
        # Not read by any method of this class; kept for external callers.
        self.order_imbalance_threshold = 0.6

    def calculate_order_imbalance(self, orderbook: Dict, n_levels: int = 20) -> float:
        """Return the depth imbalance over the top levels of the book.

        The value is ``(bid_volume - ask_volume) / (bid_volume + ask_volume)``
        and lies in ``[-1, 1]``. Positive values mean more resting bid volume.

        Args:
            orderbook: Mapping with ``'bids'`` and ``'asks'`` sequences whose
                entries are ``[price, amount, ...]``.
            n_levels: Maximum number of levels per side to include.

        Returns:
            The imbalance, or ``0.0`` when there is no volume.
        """
        bids = orderbook['bids']
        asks = orderbook['asks']

        # Use the same depth on both sides so a thin side does not bias the ratio.
        depth = min(n_levels, len(bids), len(asks))
        bid_volume = float(sum(level[1] for level in bids[:depth]))
        ask_volume = float(sum(level[1] for level in asks[:depth]))

        total_volume = bid_volume + ask_volume
        if total_volume == 0:
            return 0.0
        return (bid_volume - ask_volume) / total_volume

    def analyze_order_flow_toxicity(self, trades: pd.DataFrame,
                                    n_buckets: int = 50,
                                    window: int = 50) -> pd.DataFrame:
        """Compute VPIN (Volume-synchronized Probability of Informed Trading).

        Steps:

        - Each trade is signed with the quote rule: above the bid/ask midpoint
          is a buy (+1), below is a sell (-1), at the midpoint is unsigned (0).
        - Trades are grouped into ``n_buckets`` buckets of roughly equal
          cumulative volume.
        - VPIN is the rolling mean of ``|buy - sell|`` divided by the rolling
          mean of total bucket volume.

        Args:
            trades: Chronological DataFrame with ``price``, ``bid``, ``ask``
                and ``volume`` columns. It is not modified.
            n_buckets: Number of equal-volume buckets.
            window: Rolling window, in buckets, for the VPIN averages.

        Returns:
            DataFrame indexed by ``volume_bucket`` with ``buy_volume``,
            ``sell_volume``, ``total_volume``, ``order_imbalance`` and
            ``vpin`` columns. The frame is empty when total volume is not
            positive.
        """
        trades = trades.copy()

        # Quote rule only (the first step of Lee-Ready). Midpoint trades are
        # left unsigned rather than resolved with a tick test.
        trades['mid_price'] = (trades['bid'] + trades['ask']) / 2
        trades['trade_sign'] = np.where(
            trades['price'] > trades['mid_price'], 1,
            np.where(trades['price'] < trades['mid_price'], -1, 0)
        )

        columns = ['buy_volume', 'sell_volume', 'total_volume',
                   'order_imbalance', 'vpin']
        total = trades['volume'].sum()
        if not total > 0:
            empty = pd.DataFrame(columns=columns, dtype=float)
            empty.index.name = 'volume_bucket'
            return empty

        bucket_size = total / n_buckets
        # Bucket by the cumulative volume *before* each trade, so buckets fill
        # evenly. Clip guards the edge case where a trailing zero-volume trade
        # would otherwise get index n_buckets.
        volume_before = trades['volume'].cumsum() - trades['volume']
        trades['volume_bucket'] = (
            (volume_before // bucket_size).clip(upper=n_buckets - 1).astype(int)
        )

        # Vectorised per-bucket buy / sell / total volumes (no per-group lambda).
        sign = trades['trade_sign']
        volume = trades['volume']
        bucket_stats = pd.DataFrame({
            'buy_volume': volume.where(sign == 1, 0.0),
            'sell_volume': volume.where(sign == -1, 0.0),
            'total_volume': volume,
        }).groupby(trades['volume_bucket']).sum().astype(float)

        bucket_stats['order_imbalance'] = (
            bucket_stats['buy_volume'] - bucket_stats['sell_volume']
        ).abs()
        rolling_imbalance = bucket_stats['order_imbalance'].rolling(
            window=window, min_periods=1
        ).mean()
        rolling_volume = bucket_stats['total_volume'].rolling(
            window=window, min_periods=1
        ).mean()
        bucket_stats['vpin'] = rolling_imbalance / rolling_volume

        return bucket_stats

2.2 アグレッシブ注文とパッシブ注文の分類

class OrderClassifier:
    """Label orders as aggressive (liquidity-taking) or passive and summarise them."""

    def __init__(self):
        # Not read by any method of this class; kept for external callers.
        self.aggressive_ratio_threshold = 0.7

    def classify_orders(self, order_data: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``order_data`` with a boolean ``is_aggressive`` column.

        Rules:

        - Market orders are always aggressive.
        - A limit buy is aggressive when its price is at least 99.99% of
          ``best_ask``.
        - A limit sell is aggressive when its price is at most 100.01% of
          ``best_bid``.
        - Any other order type is passive.

        Args:
            order_data: DataFrame with ``order_type``, ``side``, ``price``,
                ``best_bid`` and ``best_ask`` columns. It is not modified.
        """
        order_data = order_data.copy()

        # Market orders always take liquidity.
        order_data['is_aggressive'] = order_data['order_type'] == 'market'

        # Limit orders are judged by their distance to the opposite best quote.
        limit_orders = order_data['order_type'] == 'limit'

        # Buys: priced at or within 1 bp below the best ask.
        buy_orders = limit_orders & (order_data['side'] == 'buy')
        order_data.loc[buy_orders, 'is_aggressive'] = (
            order_data.loc[buy_orders, 'price']
            >= order_data.loc[buy_orders, 'best_ask'] * 0.9999
        )

        # Sells: priced at or within 1 bp above the best bid.
        sell_orders = limit_orders & (order_data['side'] == 'sell')
        order_data.loc[sell_orders, 'is_aggressive'] = (
            order_data.loc[sell_orders, 'price']
            <= order_data.loc[sell_orders, 'best_bid'] * 1.0001
        )

        return order_data

    def calculate_aggression_metrics(self,
                                     classified_orders: pd.DataFrame) -> Dict:
        """Summarise aggression by hour of day and by side.

        Args:
            classified_orders: Output of :meth:`classify_orders`; must also
                contain a ``timestamp`` column parseable by ``pd.to_datetime``.
                It is not modified.

        Returns:
            Dict with three entries:

            - ``hourly_aggression``: DataFrame indexed by ``hour`` with
              ``aggressive_count``, ``total_count`` and ``aggression_ratio``.
            - ``buy_aggression`` / ``sell_aggression``: mean of
              ``is_aggressive`` per side, or 0 if the side is absent.
        """
        metrics = {}

        # Derive the hour as a standalone Series instead of writing a new
        # column into the caller's DataFrame.
        hours = pd.to_datetime(classified_orders['timestamp']).dt.hour.rename('hour')

        hourly_aggression = (
            classified_orders['is_aggressive']
            .groupby(hours)
            .agg(['sum', 'count'])
        )
        hourly_aggression.columns = ['aggressive_count', 'total_count']
        hourly_aggression['aggression_ratio'] = (
            hourly_aggression['aggressive_count']
            / hourly_aggression['total_count']
        )
        metrics['hourly_aggression'] = hourly_aggression

        side_aggression = classified_orders.groupby('side')['is_aggressive'].mean()
        metrics['buy_aggression'] = side_aggression.get('buy', 0)
        metrics['sell_aggression'] = side_aggression.get('sell', 0)

        return metrics

3. ティックデータ処理

3.1 高速ティックデータ処理システム

import asyncio
from collections import deque
from datetime import datetime, timedelta
import numpy as np

class TickDataProcessor:
    """Buffer, validate and featurize a stream of ticks for one symbol.

    Ticks are dicts with at least ``price``, ``volume`` and ``timestamp``
    (nanoseconds since the epoch). The ``bid``, ``ask``, ``bid_size`` and
    ``ask_size`` keys are optional and only used for the microprice.
    """

    def __init__(self, symbol: str, buffer_size: int = 10000,
                 max_price: float = 1e6, max_volume: float = 1e6):
        """Create a processor.

        Args:
            symbol: Instrument identifier (stored, not interpreted).
            buffer_size: Maximum ticks kept; the oldest are dropped first.
            max_price: Ticks priced above this are rejected as bad data.
            max_volume: Ticks with volume above this are rejected as bad data.
        """
        self.symbol = symbol
        self.tick_buffer = deque(maxlen=buffer_size)
        self.processed_ticks = 0
        self.start_time = datetime.now()
        self.max_price = max_price
        self.max_volume = max_volume

    async def process_tick(self, tick: Dict) -> None:
        """Validate one tick, buffer it, recompute features and handle events.

        Adds a ``normalized_time`` key to ``tick`` in place. Invalid ticks are
        dropped silently.
        """
        tick['normalized_time'] = self._normalize_timestamp(tick['timestamp'])

        if not self._validate_tick(tick):
            return

        self.tick_buffer.append(tick)
        self.processed_ticks += 1

        # filtfilt needs a series longer than its padding, so wait for 100
        # ticks. NOTE: this re-filters the whole buffer on every tick, which
        # costs O(buffer_size) per tick.
        if len(self.tick_buffer) >= 100:
            await self._remove_microstructure_noise()

        features = self._calculate_tick_features()
        events = await self._detect_market_events(features)
        if events:
            await self._handle_events(events)

    def _normalize_timestamp(self, timestamp: int) -> datetime:
        """Convert a nanosecond epoch timestamp to a naive local ``datetime``."""
        return datetime.fromtimestamp(timestamp / 1e9)

    def _validate_tick(self, tick: Dict) -> bool:
        """Return True if price, volume and timestamp are within sane bounds."""
        if tick['price'] <= 0 or tick['price'] > self.max_price:
            return False

        if tick['volume'] < 0 or tick['volume'] > self.max_volume:
            return False

        # Reject ticks stamped more than one minute in the future.
        tick_time = self._normalize_timestamp(tick['timestamp'])
        return tick_time <= datetime.now() + timedelta(seconds=60)

    async def _remove_microstructure_noise(self) -> None:
        """Low-pass filter buffered prices and store them as ``filtered_price``."""
        from scipy.signal import butter, filtfilt

        prices = [tick['price'] for tick in self.tick_buffer]

        # 3rd-order Butterworth. With fs = 1 the ticks are treated as
        # unit-spaced samples, so the cutoff is in cycles per tick.
        fs = 1.0
        cutoff = 0.1
        order = 3

        b, a = butter(order, cutoff / (fs / 2), btype='low')
        # filtfilt runs forward and backward, giving zero phase lag.
        filtered_prices = filtfilt(b, a, prices)

        for tick, filtered in zip(self.tick_buffer, filtered_prices):
            tick['filtered_price'] = filtered

    def _calculate_tick_features(self) -> Dict:
        """Compute statistics over the last 100 buffered ticks.

        Returns an empty dict while fewer than 10 ticks are buffered.
        """
        if len(self.tick_buffer) < 10:
            return {}

        recent_ticks = list(self.tick_buffer)[-100:]
        prices = [t['price'] for t in recent_ticks]
        volumes = [t['volume'] for t in recent_ticks]

        return {
            # Price features
            'price_mean': np.mean(prices),
            'price_std': np.std(prices),
            'price_skew': self._calculate_skew(prices),
            'price_kurtosis': self._calculate_kurtosis(prices),

            # Volume features
            'volume_mean': np.mean(volumes),
            'volume_std': np.std(volumes),
            'volume_price_corr': self._safe_corr(prices, volumes),

            # Tick-arrival features
            'tick_rate': self._calculate_tick_rate(recent_ticks),
            'tick_clustering': self._calculate_tick_clustering(recent_ticks),

            # Size-weighted fair price
            'microprice': self._calculate_microprice()
        }

    @staticmethod
    def _safe_corr(x: List[float], y: List[float]) -> float:
        """Pearson correlation of x and y, or 0.0 when either series is constant.

        Returning 0.0 avoids the NaN and RuntimeWarning that ``np.corrcoef``
        produces for a zero-variance input.
        """
        if np.std(x) == 0 or np.std(y) == 0:
            return 0.0
        return float(np.corrcoef(x, y)[0, 1])

    def _calculate_skew(self, data: List[float]) -> float:
        """Sample skewness (scipy.stats.skew)."""
        from scipy.stats import skew
        return skew(data)

    def _calculate_kurtosis(self, data: List[float]) -> float:
        """Excess kurtosis (scipy.stats.kurtosis, Fisher definition)."""
        from scipy.stats import kurtosis
        return kurtosis(data)

    @staticmethod
    def _tick_intervals(ticks: List[Dict]) -> np.ndarray:
        """Seconds between consecutive ticks, from raw nanosecond timestamps.

        The raw epoch values are differenced directly, not local naive
        datetimes, so a DST transition cannot distort an interval.
        """
        stamps = np.asarray([t['timestamp'] for t in ticks])
        return np.diff(stamps) / 1e9

    def _calculate_tick_rate(self, ticks: List[Dict]) -> float:
        """Ticks per second: the reciprocal of the mean inter-tick interval."""
        if len(ticks) < 2:
            return 0

        mean_interval = self._tick_intervals(ticks).mean()
        return 1 / mean_interval if mean_interval > 0 else 0

    def _calculate_tick_clustering(self, ticks: List[Dict]) -> float:
        """Burstiness heuristic: share of gaps shorter than half the mean gap.

        This is a simple arrival-clustering score, not a fitted Hawkes-process
        intensity. Returns 0 for fewer than 10 ticks.
        """
        if len(ticks) < 10:
            return 0

        intervals = self._tick_intervals(ticks)
        # The reference is the plain arithmetic mean of the intervals.
        return np.sum(intervals < intervals.mean() * 0.5) / len(intervals)

    def _calculate_microprice(self) -> float:
        """Size-weighted microprice of the latest tick.

        Falls back to the last trade price when the tick has no bid/ask, to
        the midpoint when both sizes are zero, and to 0 when the buffer is
        empty.
        """
        if not self.tick_buffer:
            return 0

        last_tick = self.tick_buffer[-1]

        if 'bid' in last_tick and 'ask' in last_tick:
            bid = last_tick['bid']
            ask = last_tick['ask']
            bid_size = last_tick.get('bid_size', 1)
            ask_size = last_tick.get('ask_size', 1)

            total_size = bid_size + ask_size
            if total_size <= 0:
                # No displayed size to weight by: use the plain midpoint.
                return (bid + ask) / 2

            # Each price is weighted by the opposite side's size, so a large
            # bid size pulls the estimate toward the ask, and vice versa.
            return (bid * ask_size + ask * bid_size) / total_size

        return last_tick['price']

    async def _detect_market_events(self, features: Dict) -> List[Dict]:
        """Flag high volatility, bursty tick arrival and volume spikes."""
        events = []

        if not features:
            return events

        # Relative price dispersion above 0.1% of the mean price.
        if features['price_std'] > features['price_mean'] * 0.001:
            events.append({
                'type': 'high_volatility',
                'severity': features['price_std'] / features['price_mean'],
                'timestamp': datetime.now()
            })

        # More than 70% of gaps are short relative to the mean gap.
        if features['tick_clustering'] > 0.7:
            events.append({
                'type': 'tick_clustering',
                'intensity': features['tick_clustering'],
                'timestamp': datetime.now()
            })

        # Volume dispersion above twice the mean volume.
        if features['volume_std'] > features['volume_mean'] * 2:
            events.append({
                'type': 'volume_spike',
                'magnitude': features['volume_std'] / features['volume_mean'],
                'timestamp': datetime.now()
            })

        return events

    async def _handle_events(self, events: List[Dict]) -> None:
        """Print each detected event (placeholder for real handling)."""
        for event in events:
            print(f"Event detected: {event['type']} at {event['timestamp']}")
            # Event-specific handling goes here.

4. 流動性提供戦略

4.1 動的スプレッド調整

class LiquidityProvider:
    """Inventory-aware quoting with volatility- and flow-adjusted spreads.

    Spreads are fractions of the mid price (0.001 = 10 bp).
    """

    def __init__(self, symbol: str, base_spread: float = 0.001):
        self.symbol = symbol
        self.base_spread = base_spread
        self.inventory = 0
        self.inventory_limit = 1000
        self.risk_aversion = 0.1

    def calculate_optimal_spread(self,
                                 market_data: Dict,
                                 volatility: float,
                                 order_flow_imbalance: float) -> Tuple[float, float]:
        """Return ``(bid_spread, ask_spread)`` as fractions of the mid price.

        The symmetric spread is ``base + 2 * volatility + |imbalance| * base``.
        It is then skewed by inventory: when long, the bid widens and the ask
        tightens; when flat or short, the reverse applies.

        Args:
            market_data: Unused here; accepted for interface compatibility.
            volatility: Return volatility in the same fractional units as the
                spread (i.e. per-sample, not annualized).
            order_flow_imbalance: Book imbalance in ``[-1, 1]``.

        Returns:
            Both spreads, each clamped to be non-negative.
        """
        # Inventory risk grows linearly with |inventory| / limit.
        inventory_risk = self.risk_aversion * abs(self.inventory) / self.inventory_limit

        volatility_adjustment = volatility * 2
        flow_adjustment = abs(order_flow_imbalance) * self.base_spread
        adjusted_spread = self.base_spread + volatility_adjustment + flow_adjustment

        if self.inventory > 0:  # long: encourage selling
            bid_spread = adjusted_spread * (1 + inventory_risk)
            ask_spread = adjusted_spread * (1 - inventory_risk * 0.5)
        else:  # flat or short: encourage buying
            bid_spread = adjusted_spread * (1 - inventory_risk * 0.5)
            ask_spread = adjusted_spread * (1 + inventory_risk)

        # An extreme inventory must never push a quote through the mid.
        return max(bid_spread, 0.0), max(ask_spread, 0.0)

    def update_quotes(self,
                      mid_price: float,
                      market_data: Dict) -> Tuple[float, float, float, float]:
        """Return ``(bid_price, bid_size, ask_price, ask_size)`` around ``mid_price``.

        Args:
            mid_price: Current mid price.
            market_data: May contain ``recent_prices`` (for volatility) and
                ``orderbook`` with ``bids``/``asks`` as ``[price, amount]``.
        """
        # The spread formula adds volatility to a fractional spread, so it
        # needs per-sample volatility. An annualized figure (often >0.5)
        # would produce spreads above 100% and negative bids.
        volatility = self._calculate_realized_volatility(market_data, annualize=False)
        imbalance = self._calculate_order_imbalance(market_data)

        bid_spread, ask_spread = self.calculate_optimal_spread(
            market_data, volatility, imbalance
        )

        bid_price = mid_price * (1 - bid_spread)
        ask_price = mid_price * (1 + ask_spread)

        bid_size = self._calculate_order_size('buy', market_data)
        ask_size = self._calculate_order_size('sell', market_data)

        return bid_price, bid_size, ask_price, ask_size

    def _calculate_realized_volatility(self, market_data: Dict,
                                       annualize: bool = True,
                                       periods_per_year: float = 252 * 24 * 60) -> float:
        """Standard deviation of log returns of ``market_data['recent_prices']``.

        Args:
            market_data: Mapping that may contain ``recent_prices``.
            annualize: If True, scale by ``sqrt(periods_per_year)``.
            periods_per_year: Samples per year used for annualization. The
                default assumes 1-minute samples over 252 24-hour days; a
                24/7 crypto market with 1-minute samples has 365 * 24 * 60.

        Returns:
            The volatility, or 0.001 when fewer than two prices are available.
        """
        prices = market_data.get('recent_prices', [])
        if len(prices) < 2:
            return 0.001

        returns = np.diff(np.log(prices))
        volatility = float(np.std(returns))
        if annualize:
            volatility *= np.sqrt(periods_per_year)

        return volatility

    def _calculate_order_imbalance(self, market_data: Dict) -> float:
        """Top-10-level book imbalance in ``[-1, 1]``, or 0 when the book is empty."""
        orderbook = market_data.get('orderbook', {})

        bid_volume = sum(level[1] for level in orderbook.get('bids', [])[:10])
        ask_volume = sum(level[1] for level in orderbook.get('asks', [])[:10])

        total_volume = bid_volume + ask_volume
        if total_volume == 0:
            return 0

        return (bid_volume - ask_volume) / total_volume

    def _calculate_order_size(self, side: str, market_data: Dict) -> float:
        """Quote size for ``side`` ('buy' or 'sell').

        Returns half the base size when inventory is beyond 80% of the limit
        on that side. Otherwise returns ~20% of the top-3 same-side depth,
        capped at the base size of 100 and floored at 10.
        """
        base_size = 100

        if side == 'buy' and self.inventory > self.inventory_limit * 0.8:
            return base_size * 0.5
        elif side == 'sell' and self.inventory < -self.inventory_limit * 0.8:
            return base_size * 0.5

        orderbook = market_data.get('orderbook', {})
        book_side = 'bids' if side == 'buy' else 'asks'
        competitor_size = sum(level[1] for level in orderbook.get(book_side, [])[:3])

        adjusted_size = min(base_size, competitor_size * 0.2)

        return max(adjusted_size, 10)

5. マーケットメイキングアルゴリズム

5.1 Avellaneda-Stoikovモデルの実装

class AvellanedaStoikovMM:
    """Avellaneda-Stoikov inventory-aware market maker.

    The reservation price and spread formulas add ``sigma``-based terms
    directly to prices. ``sigma`` is therefore presumably an absolute price
    volatility per unit of ``T``; confirm against the caller's units.
    """

    def __init__(self,
                 symbol: str,
                 gamma: float = 0.1,   # risk aversion
                 k: float = 1.5,       # order-arrival (liquidity) parameter
                 sigma: float = 0.01,  # volatility
                 T: float = 1.0,       # time horizon
                 initial_cash: float = 10000):
        self.symbol = symbol
        self.gamma = gamma
        self.k = k
        self.sigma = sigma
        self.T = T
        self.inventory = 0
        # Kept so calculate_pnl measures PnL against the actual starting cash.
        self.initial_cash = initial_cash
        self.cash = initial_cash

    def calculate_reservation_price(self,
                                    mid_price: float,
                                    time_remaining: float) -> float:
        """Return ``r = mid - q * gamma * sigma^2 * (T - t)``.

        A long inventory (q > 0) shifts the price down, a short one up.
        """
        inventory_penalty = self.inventory * self.gamma * self.sigma**2 * time_remaining
        return mid_price - inventory_penalty

    def calculate_optimal_spread(self, time_remaining: float) -> float:
        """Return the full spread ``gamma*sigma^2*(T-t) + (2/gamma)*ln(1 + gamma/k)``."""
        return self.gamma * self.sigma**2 * time_remaining + \
            (2 / self.gamma) * np.log(1 + self.gamma / self.k)

    def get_quotes(self,
                   mid_price: float,
                   current_time: float) -> Tuple[float, float]:
        """Return ``(bid, ask)`` placed symmetrically around the reservation price.

        ``current_time`` is measured on the same clock as ``T``. Past ``T``,
        the remaining time is clamped to zero.
        """
        time_remaining = max(0, self.T - current_time)

        reservation_price = self.calculate_reservation_price(
            mid_price, time_remaining
        )
        half_spread = self.calculate_optimal_spread(time_remaining) / 2

        return reservation_price - half_spread, reservation_price + half_spread

    def update_state(self,
                     trade_side: str,
                     trade_price: float,
                     trade_size: float) -> None:
        """Apply a fill.

        ``'buy'`` adds inventory and spends cash; any other side is treated
        as a sell.
        """
        if trade_side == 'buy':
            self.inventory += trade_size
            self.cash -= trade_price * trade_size
        else:
            self.inventory -= trade_size
            self.cash += trade_price * trade_size

    def calculate_pnl(self, current_mid_price: float) -> float:
        """Return mark-to-market PnL versus the starting cash."""
        mark_to_market = self.inventory * current_mid_price
        return self.cash + mark_to_market - self.initial_cash

5.2 強化学習ベースのマーケットメイキング

import torch
import torch.nn as nn
import torch.optim as optim
from collections import namedtuple

# A single replay-buffer transition: (state, action, reward, next_state, done).
Experience = namedtuple(
    'Experience',
    ('state', 'action', 'reward', 'next_state', 'done'),
)

class DeepMarketMaker(nn.Module):
    def __init__(self, state_dim: int, action_dim: int):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 64)

        # デュエリングネットワーク
        self.value_head = nn.Linear(64, 1)
        self.advantage_head = nn.Linear(64, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))

        value = self.value_head(x)
        advantage = self.advantage_head(x)

        # Q値の計算
        q_values = value + advantage - advantage.mean(dim=1, keepdim=True)

        return q_values

class RLMarketMaker:
    def __init__(self, 
                 state_dim: int = 20,
                 learning_rate: float = 0.001,
                 gamma: float = 0.99):
        self.state_dim = state_dim
        self.action_space = self._define_action_space()
        self.action_dim = len(self.action_space)

        # ニューラルネットワーク
        self.q_network = DeepMarketMaker(state_dim, self.action_dim)
        self.target_network = DeepMarketMaker(state_dim, self.action_dim)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)

        self.gamma = gamma
        self.epsilon = 1.0
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01

        # リプレイバッファ
        self.memory = deque(maxlen=10000)

    def _define_action_space(self) -> List[Tuple[float, float]]:
        """離散的なアクション空間を定義"""
        # (ビッドスプレッド, アスクスプレッド)の組み合わせ
        spreads = [0.0001, 0.0002, 0.0005, 0.001, 0.002]
        actions = []

        for bid_spread in spreads:
            for ask_spread in spreads:
                actions.append((bid_spread, ask_spread))

        return actions

    def get_state(self, market_data: Dict) -> np.ndarray:
        """市場データから状態ベクトルを構築"""
        state_features = []

        # 価格関連の特徴量
        prices = market_data['recent_prices'][-100:]
        state_features.extend([
            np.mean(prices),
            np.std(prices),
            (prices[-1] - np.mean(prices)) / np.std(prices),  # Zスコア
        ])

        # ボリューム関連の特徴量
        volumes = market_data['recent_volumes'][-100:]
        state_features.extend([
            np.mean(volumes),
            np.std(volumes),
            volumes[-1] / np.mean(volumes),
        ])

        # オーダーブック関連の特徴量
        orderbook = market_data['orderbook']
        bid_depths = [sum([b[1] for b in orderbook['bids'][:i+1]]) 
                     for i in range(5)]
        ask_depths = [sum([a[1] for a in orderbook['asks'][:i+1]]) 
                     for i in range(5)]

        state_features.extend(bid_depths + ask_depths)

        # 在庫情報
        state_features.extend([
            market_data['inventory'],
            market_data['inventory'] / market_data['inventory_limit'],
        ])

        # 時間情報
        state_features.append(market_data['time_remaining'])

        return np.array(state_features)

    def choose_action(self, state: np.ndarray) -> int:
        """ε-greedy方策でアクションを選択"""
        if np.random.random() < self.epsilon:
            return np.random.randint(self.action_dim)

        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0)
            q_values = self.q_network(state_tensor)
            return q_values.argmax().item()

    def remember(self, experience: Experience):
        """経験をリプレイバッファに保存"""
        self.memory.append(experience)

    def replay(self, batch_size: int = 32):
        """リプレイバッファから学習"""
        if len(self.memory) < batch_size:
            return

        batch = random.sample(self.memory, batch_size)
        states = torch.FloatTensor([e.state for e in batch])
        actions = torch.LongTensor([e.action for e in batch])
        rewards = torch.FloatTensor([e.reward for e in batch])
        next_states = torch.FloatTensor([e.next_state for e in batch])
        dones = torch.FloatTensor([e.done for e in batch])

        current_q_values = self.q_network(states).gather(1, actions.unsqueeze(1))

        with torch.no_grad():
            next_q_values = self.target_network(next_states).max(1)[0]
            target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

        loss = nn.MSELoss()(current_q_values.squeeze(), target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # εの減衰
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def update_target_network(self):
        """ターゲットネットワークを更新"""
        self.target_network.load_state_dict(self.q_network.state_dict())

    def get_spread_from_action(self, action_idx: int) -> Tuple[float, float]:
        """アクションインデックスからスプレッドを取得"""
        return self.action_space[action_idx]

6. 逆選択と在庫リスク

6.1 逆選択の検出と対策

class AdverseSelectionDetector:
    """Detect adverse selection on our own fills and score order-flow toxicity."""

    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        # Bounded history buffer; not read or written by the methods below.
        self.trade_history = deque(maxlen=window_size)

    def detect_adverse_selection(self,
                                 trades: pd.DataFrame,
                                 time_horizon: int = 60) -> pd.DataFrame:
        """Flag our fills that the market subsequently moved against.

        A buy is adverse if the price ``time_horizon`` rows later is lower; a
        sell is adverse if it is higher. Fills without a future price yet
        (the last ``time_horizon`` rows) and fills on any other side count as
        not adverse.

        Args:
            trades: Chronological DataFrame with ``price``, ``is_mine``,
                ``side`` and ``timestamp`` columns. It is not modified.
            time_horizon: Look-ahead measured in rows, not seconds.

        Returns:
            DataFrame indexed like our own fills with ``timestamp``, ``side``,
            ``is_adverse``, ``adverse_magnitude`` (absolute relative price
            change) and ``cumulative_adverse_rate`` (expanding mean of
            ``is_adverse`` in chronological order).
        """
        trades = trades.copy()

        trades['future_price'] = trades['price'].shift(-time_horizon)
        trades['price_change'] = (
            trades['future_price'] - trades['price']
        ) / trades['price']

        my_trades = trades[trades['is_mine'].eq(True)].copy()
        side = my_trades['side']
        change = my_trades['price_change']

        # Computed in the fills' original order, so the expanding rate below
        # is chronological. NaN changes compare False, i.e. not adverse.
        is_adverse = ((side == 'buy') & (change < 0)) | ((side == 'sell') & (change > 0))

        return pd.DataFrame({
            'timestamp': my_trades['timestamp'],
            'side': side,
            'is_adverse': is_adverse,
            'adverse_magnitude': change.abs(),
            'cumulative_adverse_rate': is_adverse.astype(float).expanding().mean(),
        })

    def calculate_toxic_flow_probability(self,
                                         order_features: Dict) -> float:
        """Heuristic toxicity score in ``[0, 1]`` from four weighted features.

        The score combines:

        - size z-score: ``|size - avg_size| / std_size``, capped at 3 and
          scaled to ``[0, 1]``; weight 0.3
        - ``aggressiveness``; weight 0.3
        - ``volatility / 0.01``, capped at 1; weight 0.2
        - ``|order_imbalance|``; weight 0.2
        """
        features = []

        # With zero size dispersion, no size can be called anomalous.
        std_size = order_features['std_size']
        if std_size:
            size_zscore = (order_features['size'] - order_features['avg_size']) / std_size
        else:
            size_zscore = 0.0
        features.append(min(abs(size_zscore), 3) / 3)

        features.append(order_features['aggressiveness'])
        features.append(min(order_features['volatility'] / 0.01, 1))
        features.append(abs(order_features['order_imbalance']))

        weights = [0.3, 0.3, 0.2, 0.2]
        toxic_probability = sum(f * w for f, w in zip(features, weights))

        return min(toxic_probability, 1.0)

6.2 在庫リスク管理

class InventoryRiskManager:
    """Inventory risk metrics, dynamic limits and Almgren-Chriss unwind schedules."""

    def __init__(self,
                 max_inventory: float = 1000,
                 risk_limit: float = 10000):
        self.max_inventory = max_inventory
        self.risk_limit = risk_limit
        # Not read or written by the methods below.
        self.inventory_history = []

    def calculate_inventory_risk(self,
                                 current_inventory: float,
                                 current_price: float,
                                 volatility: float) -> Dict:
        """Return VaR-style and concentration risk measures for the position.

        Args:
            current_inventory: Signed position size.
            current_price: Price used to value the position.
            volatility: Return volatility. It is scaled by ``sqrt(1/24)`` to
                a 1-hour horizon, which presumes a per-day volatility
                (confirm with callers).

        Returns:
            Dict with the following keys:

            - ``var``: Value at Risk of the position.
            - ``max_drawdown_risk``: loss under a 10% price move.
            - ``inventory_skew``: inventory / max_inventory.
            - ``total_risk``: ``var + max_drawdown_risk * skew**2``.
            - ``risk_utilization``: ``total_risk / risk_limit``.
        """
        time_horizon = 1 / 24  # 1 hour
        # NOTE(review): 1.96 is the two-sided 95% normal quantile (one-sided
        # 97.5%); a one-sided 95% VaR would use 1.645.
        z_score = 1.96

        notional = abs(current_inventory) * current_price
        var = notional * volatility * np.sqrt(time_horizon) * z_score

        # Loss if the price moved 10% against the position.
        max_drawdown_risk = notional * 0.1

        # Concentration penalty grows quadratically as inventory nears the limit.
        inventory_skew = current_inventory / self.max_inventory
        skew_penalty = abs(inventory_skew) ** 2

        total_risk = var + max_drawdown_risk * skew_penalty

        return {
            'var': var,
            'max_drawdown_risk': max_drawdown_risk,
            'inventory_skew': inventory_skew,
            'total_risk': total_risk,
            'risk_utilization': total_risk / self.risk_limit
        }

    def get_inventory_limits(self,
                             market_conditions: Dict) -> Tuple[float, float]:
        """Return symmetric ``(lower, upper)`` inventory limits.

        The base limit is scaled down by three factors, each capped at 1:
        volatility above 0.01, average volume below 1000, and average spread
        above 0.001. Zero or negative volatility or spread readings leave the
        limit untightened instead of dividing by zero.
        """
        def capped_inverse(reference: float, value: float) -> float:
            # min(reference / value, 1) tends to 1 as value -> 0+, so use 1.
            return min(reference / value, 1.0) if value > 0 else 1.0

        base_limit = self.max_inventory

        volatility_factor = capped_inverse(0.01, market_conditions['volatility'])
        liquidity_factor = min(market_conditions['avg_volume'] / 1000, 1.0)
        spread_factor = capped_inverse(0.001, market_conditions['avg_spread'])

        adjusted_limit = base_limit * volatility_factor * liquidity_factor * spread_factor

        return -adjusted_limit, adjusted_limit

    def calculate_optimal_unwind_schedule(self,
                                          current_inventory: float,
                                          market_data: Dict) -> List[Dict]:
        """Split liquidation of the inventory into 20 Almgren-Chriss slices over 1 hour.

        Each entry contains:

        - ``time``: slice start, in seconds.
        - ``trade_size``: amount to trade in the slice; positive for a long
          inventory, negative for a short one.
        - ``remaining``: holding after the slice.
        - ``expected_cost``: cost estimate from :meth:`_calculate_trade_cost`.

        The slice sizes sum to the full inventory, and the last slice leaves
        0. Returns ``[]`` when ``|inventory| < 10``.

        Args:
            current_inventory: Signed position to unwind.
            market_data: Must contain ``mid_price``. May contain
                ``temporary_impact``, ``permanent_impact`` and ``volatility``.
        """
        if abs(current_inventory) < 10:
            return []

        total_shares = abs(current_inventory)
        time_horizon = 3600  # seconds
        n_slices = 20

        # Market impact parameters
        eta = market_data.get('temporary_impact', 0.0001)
        gamma = market_data.get('permanent_impact', 0.00005)
        sigma = market_data.get('volatility', 0.01)

        lambda_risk = 0.0001  # risk aversion

        # Urgency of the optimal trajectory x(t) = X sinh(k(T-t)) / sinh(kT).
        kappa = np.sqrt(lambda_risk * sigma**2 / eta)

        schedule = []
        holding = total_shares
        for i in range(n_slices):
            t = i * time_horizon / n_slices
            t_end = (i + 1) * time_horizon / n_slices

            # Target holding at the END of this slice, so the first slice
            # trades and the final slice brings the holding to exactly 0.
            target = self._almgren_chriss_holding(
                total_shares, kappa, time_horizon - t_end, time_horizon
            )
            trade_size = holding - target
            holding = target

            schedule.append({
                'time': t,
                'trade_size': trade_size if current_inventory > 0 else -trade_size,
                'remaining': target,
                'expected_cost': self._calculate_trade_cost(
                    trade_size, eta, gamma, market_data['mid_price']
                )
            })

        return schedule

    @staticmethod
    def _almgren_chriss_holding(total: float, kappa: float,
                                remaining_time: float, horizon: float) -> float:
        """Return ``total * sinh(kappa * remaining_time) / sinh(kappa * horizon)``.

        The ratio is evaluated in a form that cannot overflow for large
        ``kappa * horizon``. It falls back to linear (TWAP) decay as
        ``kappa -> 0``.
        """
        k_horizon = kappa * horizon
        if k_horizon < 1e-8:
            return total * remaining_time / horizon
        k_remaining = kappa * remaining_time
        # sinh(a)/sinh(b) = e^(a-b) * (1 - e^(-2a)) / (1 - e^(-2b))
        return total * np.exp(k_remaining - k_horizon) * \
            (-np.expm1(-2 * k_remaining)) / (-np.expm1(-2 * k_horizon))

    def _calculate_trade_cost(self,
                              size: float,
                              temp_impact: float,
                              perm_impact: float,
                              price: float) -> float:
        """Estimated cost of trading ``size``.

        The temporary component is linear in ``|size|`` and the permanent
        component is quadratic in ``size``; both are scaled by ``price``.
        """
        temporary_cost = temp_impact * abs(size) * price
        permanent_cost = perm_impact * size**2 * price

        return temporary_cost + permanent_cost

7. 実装例

7.1 統合マーケットメイキングシステム

import asyncio
from typing import Optional

class IntegratedMarketMakingSystem:
    """Wire analytics, risk and quoting components into concurrent async loops.

    NOTE(review): the loops call ``_update_market_state``,
    ``_get_market_state``, ``_widen_quotes``, ``_defensive_quoting``,
    ``_submit_quotes``, ``_execute_unwind_schedule``, ``_emergency_stop`` and
    ``_close_all_positions``. None of these are defined in this class, so
    they must be supplied (e.g. by a subclass) before the system can trade.
    Until then, each loop iteration prints the resulting error and retries.
    """

    def __init__(self, config: Dict):
        """Build all components from ``config``.

        Keys read: ``symbol`` (required), ``use_rl``, ``max_loss`` and
        ``base_spread`` (optional, default 0.001).
        """
        self.config = config
        self.symbol = config['symbol']

        # Components
        self.tick_processor = TickDataProcessor(self.symbol)
        self.order_flow_analyzer = OrderFlowAnalyzer()
        # Honour the configured base spread instead of silently using the default.
        self.liquidity_provider = LiquidityProvider(
            self.symbol, base_spread=config.get('base_spread', 0.001)
        )
        self.risk_manager = InventoryRiskManager()
        self.adverse_selection_detector = AdverseSelectionDetector()

        # Quoting models
        self.mm_model = AvellanedaStoikovMM(self.symbol)
        self.rl_model = RLMarketMaker()

        # Runtime state
        self.is_running = False
        self.current_position = 0
        self.total_pnl = 0
        # Fill counter; nothing in this class increments it yet.
        self.total_trades = 0
        # Number of replay() steps, used to schedule target-network syncs.
        self.train_steps = 0

    async def start(self):
        """Run the data, quoting, risk and training loops concurrently."""
        self.is_running = True

        tasks = [
            self._process_market_data(),
            self._update_quotes(),
            self._monitor_risk(),
            self._train_models()
        ]

        await asyncio.gather(*tasks)

    async def _process_market_data(self):
        """Every 0.1 s, ingest a tick and the order book and update market state."""
        while self.is_running:
            try:
                tick = await self._fetch_tick_data()
                await self.tick_processor.process_tick(tick)

                orderbook = await self._fetch_orderbook()
                imbalance = self.order_flow_analyzer.calculate_order_imbalance(
                    orderbook
                )

                await self._update_market_state({
                    'tick': tick,
                    'orderbook': orderbook,
                    'imbalance': imbalance
                })

            except Exception as e:
                print(f"Error processing market data: {e}")

            await asyncio.sleep(0.1)

    async def _update_quotes(self):
        """Once per second, run one quoting cycle. Errors are printed and retried."""
        while self.is_running:
            try:
                await self._quote_once()
            except Exception as e:
                print(f"Error updating quotes: {e}")

            # Sleep on every path, including the early exits in _quote_once,
            # so a persistent risk condition cannot busy-loop.
            await asyncio.sleep(1)

    async def _quote_once(self):
        """Make one quoting decision: risk check, then toxicity check, then quote."""
        market_state = await self._get_market_state()

        risk_metrics = self.risk_manager.calculate_inventory_risk(
            self.current_position,
            market_state['mid_price'],
            market_state['volatility']
        )
        if risk_metrics['risk_utilization'] > 0.8:
            # High inventory risk: widen quotes instead of quoting normally.
            await self._widen_quotes()
            return

        toxic_prob = self.adverse_selection_detector.calculate_toxic_flow_probability(
            market_state['order_features']
        )
        if toxic_prob > 0.7:
            # Likely toxic flow: switch to defensive quoting.
            await self._defensive_quoting()
            return

        mid_price = market_state['mid_price']
        if self.config['use_rl']:
            state = self.rl_model.get_state(market_state)
            action = self.rl_model.choose_action(state)
            bid_spread, ask_spread = self.rl_model.get_spread_from_action(action)
        else:
            bid_price, ask_price = self.mm_model.get_quotes(
                mid_price,
                market_state['current_time']
            )
            # Convert absolute A-S quotes into fractional spreads around the mid.
            bid_spread = (mid_price - bid_price) / mid_price
            ask_spread = (ask_price - mid_price) / mid_price

        await self._submit_quotes(mid_price, bid_spread, ask_spread)

    async def _monitor_risk(self):
        """Every 5 s, start an unwind near the inventory cap and stop on max loss."""
        while self.is_running:
            try:
                if abs(self.current_position) > self.risk_manager.max_inventory * 0.9:
                    unwind_schedule = self.risk_manager.calculate_optimal_unwind_schedule(
                        self.current_position,
                        await self._get_market_state()
                    )
                    await self._execute_unwind_schedule(unwind_schedule)

                if self.total_pnl < -self.config['max_loss']:
                    await self._emergency_stop()

            except Exception as e:
                print(f"Error monitoring risk: {e}")

            await asyncio.sleep(5)

    async def _train_models(self):
        """Every 10 s, run one replay step once more than 1000 experiences exist."""
        while self.is_running:
            try:
                if len(self.rl_model.memory) > 1000:
                    self.rl_model.replay(batch_size=64)
                    self.train_steps += 1

                    # Sync the target network every 100 training steps.
                    if self.train_steps % 100 == 0:
                        self.rl_model.update_target_network()

            except Exception as e:
                print(f"Error training models: {e}")

            await asyncio.sleep(10)

    async def _fetch_tick_data(self) -> Dict:
        """Fetch the latest tick (exchange-specific; returns None until implemented)."""
        # A real implementation would use the exchange's WebSocket API.
        pass

    async def _fetch_orderbook(self) -> Dict:
        """Fetch the order book (exchange-specific; returns None until implemented)."""
        # A real implementation would use the exchange's REST/WebSocket API.
        pass

    async def stop(self):
        """Stop all loops and close every open position."""
        self.is_running = False
        await self._close_all_positions()

# Usage example
async def main():
    """Run the integrated system with a sample BTC/USDT config until interrupted."""
    config = {
        'symbol': 'BTC/USDT',
        'use_rl': True,
        'max_loss': 1000,
        'base_spread': 0.001
    }

    system = IntegratedMarketMakingSystem(config)

    try:
        await system.start()
    except asyncio.CancelledError:
        # Since Python 3.11, asyncio.run() handles Ctrl+C by cancelling this
        # task rather than raising KeyboardInterrupt here. Shut down cleanly,
        # then let the cancellation propagate.
        await system.stop()
        raise
    except KeyboardInterrupt:
        await system.stop()


if __name__ == "__main__":
    asyncio.run(main())

8. パフォーマンス評価とバックテスト

class MarketMakingBacktester:
    """Row-by-row backtester for market-making strategies.

    For each row of ``data`` the strategy is asked for two-sided quotes,
    fills are simulated, and cash, inventory and PnL are recorded.

    ``_prepare_market_data`` and ``_simulate_execution`` depend on the data
    schema and on the fill model. They are hooks to be overridden in a
    subclass; the base versions raise NotImplementedError.
    """

    def __init__(self, data: pd.DataFrame):
        # backtest_strategy reads the 'mid_price' and 'timestamp' columns of
        # each row; the hooks may need more columns.
        self.data = data
        self.trades = []
        self.pnl_history = []
        # Capital of the most recent run; the base for all relative metrics.
        self.initial_capital = 10000

    def _prepare_market_data(self, idx):
        """Hook: build the ``market_data`` passed to ``strategy.update_quotes`` for row ``idx``."""
        raise NotImplementedError(
            "Override _prepare_market_data to supply market data to the strategy"
        )

    def _simulate_execution(self, row, bid_price, bid_size, ask_price, ask_size,
                            position) -> List[Dict]:
        """Hook: return this row's fills as dicts with 'side', 'price' and 'size' keys."""
        raise NotImplementedError(
            "Override _simulate_execution to define the fill model"
        )

    @staticmethod
    def _apply_fill(position, avg_cost, signed_size, price):
        """Apply one fill with average-cost accounting.

        Args:
            position: inventory before the fill (positive = long).
            avg_cost: average entry price of that inventory (0.0 when flat).
            signed_size: fill quantity, positive for buys, negative for sells.
            price: fill price.

        Returns:
            ``(realized_pnl, new_position, new_avg_cost)``.
        """
        if signed_size == 0:
            return 0.0, position, avg_cost

        new_position = position + signed_size
        if position == 0 or (position > 0) == (signed_size > 0):
            # Opening or adding to a position: blend the fill into the cost basis.
            new_avg = (position * avg_cost + signed_size * price) / new_position
            return 0.0, new_position, new_avg

        # Reducing, closing or flipping: realize PnL on the closed quantity.
        closed = min(abs(signed_size), abs(position))
        direction = 1 if position > 0 else -1
        realized = closed * (price - avg_cost) * direction
        if new_position == 0:
            return realized, new_position, 0.0
        if (new_position > 0) == (position > 0):
            return realized, new_position, avg_cost  # partial reduction
        return realized, new_position, float(price)  # flipped through zero

    def backtest_strategy(self,
                          strategy,
                          initial_capital: float = 10000,
                          periods_per_year: float = 252) -> Dict:
        """Run ``strategy`` over ``self.data`` and return performance metrics.

        Args:
            strategy: object whose ``update_quotes(mid_price, market_data)``
                returns ``(bid_price, bid_size, ask_price, ask_size)``.
            initial_capital: starting cash; also the base for returns.
            periods_per_year: number of rows per year, used to annualize the
                Sharpe ratio (252 for daily bars; larger for intraday data).

        Returns:
            Dict of metrics from ``_calculate_performance_metrics``.

        ``self.trades`` and ``self.pnl_history`` are reset at the start, so
        repeated runs do not mix their results.
        """
        self.trades = []
        self.pnl_history = []
        self.initial_capital = initial_capital

        capital = initial_capital
        position = 0
        avg_cost = 0.0       # average entry price of the open position
        realized_pnl = 0.0

        for idx, row in self.data.iterrows():
            # Prepare market data for this row.
            market_data = self._prepare_market_data(idx)

            # Ask the strategy for quotes.
            bid_price, bid_size, ask_price, ask_size = strategy.update_quotes(
                row['mid_price'], market_data
            )

            # Simulate fills against the quotes.
            fills = self._simulate_execution(
                row, bid_price, bid_size, ask_price, ask_size, position
            )

            for trade in fills:
                # Any side other than 'buy' is treated as a sell.
                signed_size = trade['size'] if trade['side'] == 'buy' else -trade['size']
                capital -= trade['price'] * signed_size
                realized, position, avg_cost = self._apply_fill(
                    position, avg_cost, signed_size, trade['price']
                )
                realized_pnl += realized
                self.trades.append(trade)

            mid_price = row['mid_price']
            # Mark-to-market equity change (equals realized + unrealized).
            total_pnl = capital + position * mid_price - initial_capital
            self.pnl_history.append({
                'timestamp': row['timestamp'],
                'realized_pnl': realized_pnl,
                'unrealized_pnl': position * (mid_price - avg_cost),
                'total_pnl': total_pnl,
                'position': position
            })

        return self._calculate_performance_metrics(periods_per_year=periods_per_year)

    def _calculate_performance_metrics(self, periods_per_year: float = 252) -> Dict:
        """Summarize ``self.pnl_history`` and ``self.trades`` into metrics.

        Returns and drawdowns are relative to ``self.initial_capital``, the
        capital of the most recent ``backtest_strategy`` run.

        Args:
            periods_per_year: annualization factor for the Sharpe ratio.

        Returns:
            Dict with total_return, sharpe_ratio (0.0 when returns have no
            variance), max_drawdown (relative to the running equity peak),
            n_trades, win_rate (0 when trades carry no 'pnl'), spread_capture,
            avg_position and max_position.
        """
        initial_capital = getattr(self, 'initial_capital', 10000)
        pnl_df = pd.DataFrame(self.pnl_history)
        trade_df = pd.DataFrame(self.trades)
        n_trades = len(trade_df)

        if pnl_df.empty:
            # No rows were processed: there is nothing to measure.
            return {
                'total_return': 0.0,
                'sharpe_ratio': 0.0,
                'max_drawdown': 0.0,
                'n_trades': n_trades,
                'win_rate': 0,
                'spread_capture': 0,
                'avg_position': 0.0,
                'max_position': 0.0
            }

        total_pnl = pnl_df['total_pnl']

        # Basic statistics
        total_return = total_pnl.iloc[-1] / initial_capital

        # Sharpe ratio of per-row PnL changes, scaled by initial capital.
        returns = total_pnl.diff() / initial_capital
        returns_std = returns.std()
        if pd.isna(returns_std) or returns_std == 0:
            sharpe_ratio = 0.0
        else:
            sharpe_ratio = np.sqrt(periods_per_year) * returns.mean() / returns_std

        # Max drawdown from the equity curve; the starting capital counts as
        # the first peak.
        equity = initial_capital + total_pnl
        peak = np.maximum(equity.cummax(), initial_capital)
        max_drawdown = ((equity - peak) / peak).min()

        # Trade statistics (only if the fill model records per-trade 'pnl').
        has_trade_pnl = n_trades > 0 and 'pnl' in trade_df
        win_rate = (trade_df['pnl'] > 0).mean() if has_trade_pnl else 0

        # Spread revenue
        spread_capture = trade_df['spread_capture'].sum() if 'spread_capture' in trade_df else 0

        return {
            'total_return': total_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'n_trades': n_trades,
            'win_rate': win_rate,
            'spread_capture': spread_capture,
            'avg_position': pnl_df['position'].mean(),
            'max_position': pnl_df['position'].abs().max()
        }

まとめ

本ドキュメントでは、暗号通貨市場における高頻度取引とマーケットマイクロストラクチャの実装について詳しく解説しました。主要なポイント:

  1. オーダーフロー分析:市場の需給を理解し、価格動向を予測
  2. ティックデータ処理:高速かつ正確なデータ処理システムの構築
  3. 流動性提供戦略:動的なスプレッド調整と在庫管理
  4. マーケットメイキング:理論的モデルと機械学習の組み合わせ
  5. リスク管理:逆選択の検出と在庫リスクの制御
  6. パフォーマンス評価:バックテストによる収益・リスク指標(シャープレシオ、最大ドローダウンなど)の検証

これらの技術を組み合わせることで、高頻度取引システムを構築するための基盤が得られます。ただし、収益性が保証されるわけではありません。実際の運用では、取引手数料、取引所のAPI制限、レイテンシー、規制要件なども考慮し、バックテスト結果と実運用成績の乖離にも注意する必要があります。