ML Documentation

tickデータから機械学習を行う際のベストプラクティス

目次

  1. tickデータの特性と課題
  2. 時間サンプリング手法
  3. 特徴量エンジニアリング
  4. 機械学習における考慮事項
  5. 実装上のテクニック
  6. 具体的な実装例と改善提案

1. tickデータの特性と課題

1.1 不規則な時間間隔

1.2 データの偏り(活発な時間帯と閑散期)

1.3 ノイズとマイクロストラクチャー

2. 時間サンプリング手法

2.1 Time Bars(時間バー)

# 固定時間間隔でサンプリング
# 欠点: 市場活動を無視し、統計的性質が劣る
def create_time_bars(ticks, interval='1min'):
    """Aggregate ticks into fixed-interval time bars.

    Args:
        ticks: Object supporting ``resample`` (e.g. a pandas DataFrame with
            a datetime index) that has ``price`` and ``volume`` columns.
        interval: Resampling frequency passed straight to ``resample``.

    Returns:
        The resampled frame with OHLC values for ``price`` and the summed
        ``volume`` per interval.
    """
    aggregations = {'price': 'ohlc', 'volume': 'sum'}
    resampled = ticks.resample(interval)
    return resampled.agg(aggregations)

2.2 Tick Bars(ティックバー)

# 固定数の取引後にバーを作成
def create_tick_bars(ticks, threshold=1000):
    """Group ticks into bars of ``threshold`` ticks each.

    Args:
        ticks: Iterable of tick records.
        threshold: Number of ticks that completes one bar.

    Returns:
        List of bars, each produced by ``aggregate_bar`` from the ticks
        collected for it. Ticks left over after the last full bar are not
        emitted.
    """
    completed = []
    pending = []
    for tick in ticks:
        pending.append(tick)
        if len(pending) >= threshold:
            completed.append(aggregate_bar(pending))
            # Rebind rather than clear: the list handed to aggregate_bar
            # must not be mutated afterwards.
            pending = []
    return completed

2.3 Volume Bars(出来高バー)

# 固定出来高に達したらバーを作成
def create_volume_bars(ticks, volume_threshold=100):
    """Group ticks into bars that each hold at least ``volume_threshold`` volume.

    Args:
        ticks: Iterable of mappings with a ``'volume'`` entry.
        volume_threshold: Accumulated volume that completes one bar.

    Returns:
        List of bars built by ``aggregate_bar``. A trailing group that never
        reaches the threshold is not emitted.
    """
    completed = []
    pending, accumulated = [], 0

    for tick in ticks:
        pending.append(tick)
        accumulated += tick['volume']

        if accumulated >= volume_threshold:
            completed.append(aggregate_bar(pending))
            # Start a fresh bar; any volume above the threshold is discarded.
            pending, accumulated = [], 0
    return completed

2.4 Dollar Bars(ドルバー)

# 固定金額分の取引後にバーを作成
# 最も安定した統計的性質を持つ
def create_dollar_bars(ticks, dollar_threshold=10000):
    """Group ticks into bars that each hold at least ``dollar_threshold`` traded value.

    Traded value of a tick is ``price * volume``.

    Args:
        ticks: Iterable of mappings with ``'price'`` and ``'volume'`` entries.
        dollar_threshold: Accumulated traded value that completes one bar.

    Returns:
        List of bars built by ``aggregate_bar``. A trailing group that never
        reaches the threshold is not emitted.
    """
    completed = []
    pending, traded_value = [], 0

    for tick in ticks:
        pending.append(tick)
        traded_value += tick['price'] * tick['volume']

        if traded_value >= dollar_threshold:
            completed.append(aggregate_bar(pending))
            # Start a fresh bar; any value above the threshold is discarded.
            pending, traded_value = [], 0
    return completed

2.5 Information-Driven Bars(情報駆動バー)

# 市場の不均衡を検出してサンプリング頻度を調整
def create_imbalance_bars(ticks, initial_threshold=1000):
    """Emit a bar whenever buy/sell volume imbalance reaches a moving threshold.

    Args:
        ticks: Iterable of mappings with ``'side'`` and ``'volume'`` entries.
            Any side other than ``'buy'`` is counted as sell volume.
        initial_threshold: Imbalance needed to emit the first bar.

    Returns:
        List of bars built by ``create_bar_with_imbalance`` from the tick that
        crossed the threshold and the imbalance at that moment. After each
        bar the threshold is replaced by ``adjust_threshold(imbalance,
        threshold)`` and both volume counters restart from zero.
    """
    bars = []
    bought = 0
    sold = 0
    limit = initial_threshold

    for tick in ticks:
        is_buy = tick['side'] == 'buy'
        size = tick['volume']
        if is_buy:
            bought += size
        else:
            sold += size

        gap = abs(bought - sold)
        if gap >= limit:
            bars.append(create_bar_with_imbalance(tick, gap))
            # Let the helper adapt the threshold before restarting the counters.
            limit = adjust_threshold(gap, limit)
            bought = sold = 0

    return bars

3. 特徴量エンジニアリング

3.1 マイクロストラクチャー特徴量

// 現在のプロジェクトの実装例
/// Summary metrics for one order book.
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names and types only; confirm against the code that fills this struct.
pub struct OrderbookMetrics {
    /// Spread in basis points, going by the name; optional.
    pub spread_bps: Option<f64>,
    /// Depth imbalance ratio between the two sides, going by the name; optional.
    pub depth_imbalance_ratio: Option<f64>,
    /// Number of bid levels (presumably price levels — TODO confirm).
    pub bid_levels: usize,
    /// Number of ask levels (presumably price levels — TODO confirm).
    pub ask_levels: usize,
    /// Total volume on the bid side.
    pub total_bid_volume: Decimal,
    /// Total volume on the ask side.
    pub total_ask_volume: Decimal,
    /// Weighted mid price; optional. The weighting scheme is not shown here.
    pub weighted_mid_price: Option<Decimal>,
}

3.2 流動性指標

def calculate_liquidity_metrics(orderbook):
    """Compute a set of liquidity metrics and return them as a dict.

    NOTE(review): besides ``orderbook``, the body reads ``returns``,
    ``dollar_volume``, ``trade_price``, ``mid_price``, ``total_liquidity``
    and ``spread``. None of them is a parameter or assigned in this function,
    so they must be provided by an enclosing/global scope or the call raises
    NameError. Treat this as an outline until those inputs are wired in.

    Args:
        orderbook: Order book object forwarded to ``calculate_price_impact``.

    Returns:
        Dict with keys ``price_impact``, ``amihud_illiquidity``,
        ``effective_spread`` and ``depth_score``.
    """
    # Kyle's Lambda (price impact), delegated to a helper for a 1000-unit trade
    price_impact = calculate_price_impact(orderbook, trade_size=1000)

    # Amihud illiquidity measure: absolute return per unit of dollar volume
    amihud_illiquidity = abs(returns) / dollar_volume

    # Effective spread: twice the relative distance of the trade price from mid
    effective_spread = 2 * abs(trade_price - mid_price) / mid_price

    # Market depth score
    depth_score = total_liquidity / (spread * 1000)

    return {
        'price_impact': price_impact,
        'amihud_illiquidity': amihud_illiquidity,
        'effective_spread': effective_spread,
        'depth_score': min(depth_score, 100)  # capped at 100; no lower bound is applied here
    }

3.3 オーダーフロー不均衡

def calculate_order_flow_imbalance(trades, window=100):
    """Compute volume- and value-weighted buy/sell imbalance over ``trades``.

    Args:
        trades: Iterable of mappings with ``'side'``, ``'volume'`` and
            ``'price'`` entries. Only sides ``'buy'`` and ``'sell'`` are
            counted; other sides are ignored. A one-shot iterator is fine
            because the input is traversed once.
        window: Accepted for interface compatibility. NOTE(review): it is not
            applied; callers are expected to pass the trades of the window
            they want measured.

    Returns:
        Dict with ``'volume_order_imbalance'`` (VOI) and
        ``'trade_flow_imbalance'`` (TFI, weighted by ``volume * price``).
        Each is ``(buy - sell) / (buy + sell)``, or ``0.0`` when the
        denominator is zero (no trades, or only zero-volume trades).
    """
    buy_volume = sell_volume = 0
    weighted_buy = weighted_sell = 0

    # Single pass so that iterator inputs are not exhausted half way through.
    for trade in trades:
        side = trade['side']
        if side == 'buy':
            buy_volume += trade['volume']
            weighted_buy += trade['volume'] * trade['price']
        elif side == 'sell':
            sell_volume += trade['volume']
            weighted_sell += trade['volume'] * trade['price']

    # Volume Order Imbalance (VOI)
    total_volume = buy_volume + sell_volume
    voi = (buy_volume - sell_volume) / total_volume if total_volume else 0.0

    # Trade Flow Imbalance (TFI)
    total_value = weighted_buy + weighted_sell
    tfi = (weighted_buy - weighted_sell) / total_value if total_value else 0.0

    return {
        'volume_order_imbalance': voi,
        'trade_flow_imbalance': tfi
    }

3.4 価格インパクト指標

def calculate_vwap_metrics(orderbook, levels=5):
    """Compute per-side VWAP over the top levels and its distance from mid.

    Args:
        orderbook: Mapping with ``'bids'``, ``'asks'``, ``'best_bid'`` and
            ``'best_ask'`` entries.
        levels: Number of leading levels of each side passed to
            ``calculate_vwap``. The result keys keep the ``_5`` suffix
            regardless of this value.

    Returns:
        Dict with ``bid_vwap_5``, ``ask_vwap_5`` and the impact of each side
        relative to the mid price, expressed in basis points.
    """
    bid_vwap = calculate_vwap(orderbook['bids'][:levels])
    ask_vwap = calculate_vwap(orderbook['asks'][:levels])

    # Mid price of the best quotes is the reference for impact cost.
    mid = (orderbook['best_bid'] + orderbook['best_ask']) / 2

    metrics = {'bid_vwap_5': bid_vwap, 'ask_vwap_5': ask_vwap}
    metrics['bid_impact_bps'] = (mid - bid_vwap) / mid * 10000
    metrics['ask_impact_bps'] = (ask_vwap - mid) / mid * 10000
    return metrics

4. 機械学習における考慮事項

4.1 データの定常性

# ADF検定による定常性チェック
from statsmodels.tsa.stattools import adfuller

def check_stationarity(series):
    """Run an Augmented Dickey-Fuller test on ``series``.

    Args:
        series: Data passed unchanged to ``adfuller``.

    Returns:
        Dict with ``is_stationary`` (True when the p-value is below 0.05),
        ``p_value`` (element 1 of the ``adfuller`` result) and
        ``critical_values`` (element 4 of the ``adfuller`` result).
    """
    adf_output = adfuller(series)
    p_value = adf_output[1]
    critical_values = adf_output[4]
    return dict(
        is_stationary=p_value < 0.05,
        p_value=p_value,
        critical_values=critical_values,
    )

# 非定常データの処理
def make_stationary(series):
    """Return two transformed versions of ``series``.

    Args:
        series: Object accepted by ``np.log`` whose result supports
            ``diff().dropna()`` (e.g. a pandas Series).

    Returns:
        Tuple ``(log_returns, fractional_diff)``: the first difference of the
        log series with missing values dropped, and the output of
        ``fractional_difference(series, d=0.3)``.
    """
    # Log-difference transform.
    logged = np.log(series)
    log_returns = logged.diff().dropna()

    # Fractional differencing with d=0.3, delegated to a helper.
    fractional_diff = fractional_difference(series, d=0.3)

    return (log_returns, fractional_diff)

4.2 過学習対策

# Walk-Forward Analysis(推奨)
def walk_forward_validation(data, model, train_size=1000, test_size=100):
    """Score ``model`` on consecutive train/test windows sliding forward in time.

    For each step, the model is fitted on the ``train_size`` rows that end
    right before the test window, then scored on the next ``test_size`` rows.
    The window moves forward by ``test_size`` per step.

    Args:
        data: Sliceable, sized sequence ordered by time.
        model: Object with ``fit(train)`` and ``predict(test)``.
        train_size: Number of rows in each training window.
        test_size: Number of rows in each test window and the step size.

    Returns:
        List of scores from ``evaluate(test, predictions)``, one per window.
        Only complete test windows are used, including one that ends exactly
        at the last row of ``data``.
    """
    results = []

    # ``+ 1`` keeps the final window whose test slice ends exactly at
    # len(data); without it that complete window would be skipped.
    last_start = len(data) - test_size
    for i in range(train_size, last_start + 1, test_size):
        # Training rows: strictly before the test window.
        train = data[i-train_size:i]

        # Test rows: the data that follows the training window.
        test = data[i:i+test_size]

        model.fit(train)

        predictions = model.predict(test)
        score = evaluate(test, predictions)
        results.append(score)

    return results

# Combinatorial Purged Cross-Validation (CPCV)
# 時系列の依存関係を考慮した高度な手法
def combinatorial_purged_cv(data, n_splits=5, purge_gap=10):
    """Placeholder for Combinatorial Purged Cross-Validation (CPCV).

    Not implemented: the arguments are ignored and ``None`` is returned.
    The original note describes the method as complex to implement but
    better performing than conventional cross-validation.
    """
    return None

4.3 特徴量の正規化

# ローリングウィンドウによる正規化(look-ahead bias=先読みバイアスを防ぐ)
def rolling_normalization(features, window=1000):
    """Z-score every column against its own trailing rolling window.

    The window uses pandas' default alignment, so it ends at (and includes)
    the current row and never reads later rows.

    Args:
        features: DataFrame of numeric feature columns.
        window: Rolling window length in rows. At least 100 observations are
            required before a value is produced; earlier rows are NaN.

    Returns:
        DataFrame with the same index and columns holding
        ``(value - rolling mean) / rolling std``.
    """
    scaled = pd.DataFrame(index=features.index)

    for name in features.columns:
        column = features[name]
        trailing = column.rolling(window, min_periods=100)
        center = trailing.mean()
        spread = trailing.std()

        scaled[name] = (column - center) / spread

    return scaled

4.4 時系列交差検証

# Blocked Time Series Split
def blocked_time_series_split(data, n_splits=5, test_size=0.2, gap=0.05):
    """Build train/test index ranges with a gap between them.

    Test blocks are laid out back to front: the first split tests on the last
    ``test_size`` fraction of the data, the next on the block before it, and
    so on. Each training range starts at 0 and stops ``gap`` (as a fraction
    of the data) before its test block, which keeps adjacent rows from
    leaking into the test set.

    Args:
        data: Sized collection; only ``len(data)`` is used.
        n_splits: Maximum number of splits to produce.
        test_size: Test block length as a fraction of ``len(data)``.
        gap: Gap length as a fraction of ``len(data)``.

    Returns:
        List of ``(train_idx, test_idx)`` tuples of ``range`` objects. Fewer
        than ``n_splits`` are returned once the training range would be no
        longer than one test block.
    """
    n_rows = len(data)
    test_len = int(n_rows * test_size)
    gap_len = int(n_rows * gap)

    folds = []
    upper = n_rows
    for _ in range(n_splits):
        lower = upper - test_len
        cutoff = lower - gap_len

        # Stop as soon as the remaining history is too short to train on.
        if cutoff <= test_len:
            break

        folds.append((range(0, cutoff), range(lower, upper)))
        upper = lower

    return folds

5. 実装上のテクニック

5.1 メモリ効率的な処理

// 現在のプロジェクトの実装例(Rust)
impl OrderbookHistory {
    /// Appends one snapshot to the history and drops entries older than 60 seconds.
    ///
    /// The three queues (`spreads`, `depths`, `timestamps`) are pushed and
    /// popped together so they stay the same length.
    fn add_snapshot(&mut self, msg: &OrderbookKafkaMessage) {
        // Record the new observation.
        self.spreads.push_back(spread_f64);
        self.depths.push_back((bid_depth_5, ask_depth_5));
        self.timestamps.push_back(msg.timestamp);

        // Evict from the front while the oldest entry is outside the
        // 60-second window that ends at this message's timestamp.
        let cutoff = msg.timestamp - Duration::seconds(60);
        while matches!(self.timestamps.front(), Some(&oldest) if oldest < cutoff) {
            self.timestamps.pop_front();
            self.spreads.pop_front();
            self.depths.pop_front();
        }
    }
}

5.2 リアルタイム特徴量計算

class RealTimeFeatureCalculator:
    """Maintain fixed-length tick buffers and compute rolling features from them.

    One ``deque`` is kept per window size; each holds at most that many of
    the most recent ticks.
    """

    def __init__(self, window_sizes=None):
        """Create one buffer per window size.

        Args:
            window_sizes: Iterable of window lengths in ticks. Defaults to
                ``[10, 30, 60, 300]``. A fresh list is created for every
                instance so instances never share the default.
        """
        if window_sizes is None:
            window_sizes = [10, 30, 60, 300]
        self.window_sizes = window_sizes
        self.buffers = {w: deque(maxlen=w) for w in window_sizes}

    def update(self, tick):
        """Add ``tick`` to every buffer and return the current features.

        Args:
            tick: Mapping with ``'price'`` and ``'volume'`` entries.

        Returns:
            Dict with ``mean_<w>``, ``std_<w>`` and ``volume_<w>`` for every
            window ``w`` whose buffer holds at least ``w // 2`` ticks.
            Windows with fewer ticks are omitted.
        """
        for buffer in self.buffers.values():
            buffer.append(tick)

        features = {}
        for window, buffer in self.buffers.items():
            if len(buffer) >= window // 2:  # minimum number of observations
                # Extract prices once and reuse them for both statistics.
                prices = [t['price'] for t in buffer]
                features[f'mean_{window}'] = np.mean(prices)
                features[f'std_{window}'] = np.std(prices)
                features[f'volume_{window}'] = sum(t['volume'] for t in buffer)

        return features

5.3 バックテストにおける注意点

class RealisticBacktester:
    """Price simulated trades with order latency and slippage applied."""

    def __init__(self, latency_ms=10, slippage_bps=5):
        """Store the execution assumptions.

        Args:
            latency_ms: Delay between signal and execution, in milliseconds.
            slippage_bps: Adverse price adjustment in basis points.
        """
        self.latency_ms = latency_ms
        self.slippage_bps = slippage_bps

    def execute_trade(self, signal_time, orderbook_history, signal='buy'):
        """Return the simulated execution price for a signal.

        Args:
            signal_time: Timestamp at which the signal was generated.
            orderbook_history: Time-indexed, time-sorted pandas DataFrame
                with ``'ask'`` and ``'bid'`` columns.
                NOTE(review): the DataFrame type is assumed from the pandas
                calls used here; confirm against the caller.
            signal: ``'buy'`` executes against the ask; any other value is
                treated as a sell and executes against the bid. The default
                of ``'buy'`` exists only to keep the two-argument call form
                working.

        Returns:
            The quote in force at ``signal_time + latency``, moved against
            the trader by ``slippage_bps``. The result is NaN when no
            snapshot exists at or before the execution time.
        """
        # Execution happens after the configured latency has elapsed.
        execution_time = signal_time + pd.Timedelta(milliseconds=self.latency_ms)

        # Use the latest snapshot at or before the execution time; an exact
        # timestamp match between signals and snapshots cannot be assumed.
        execution_orderbook = orderbook_history.asof(execution_time)

        # Slippage always moves the price against the trader.
        slippage = self.slippage_bps / 10000
        if signal == 'buy':
            execution_price = execution_orderbook['ask'] * (1 + slippage)
        else:
            execution_price = execution_orderbook['bid'] * (1 - slippage)

        return execution_price

6. 具体的な実装例と改善提案

6.1 現在のプロジェクトへの改善提案

1. Dollar Barsの実装

// src/rust/hyperliquid-orderbook-consumer/src/dollar_bars.rs
use rust_decimal::Decimal;
use chrono::{DateTime, Utc};

/// Accumulates trades until their combined traded value reaches a threshold,
/// at which point a dollar bar is produced.
pub struct DollarBarBuilder {
    /// Traded value (`price * size`, summed) at which a bar is emitted.
    threshold: Decimal,
    /// Traded value accumulated for the bar currently being built.
    current_value: Decimal,
    /// Trades collected for the bar currently being built.
    current_trades: Vec<Trade>,
}

impl DollarBarBuilder {
    /// Creates an empty builder that emits a bar every time the accumulated
    /// traded value reaches `threshold`.
    pub fn new(threshold: Decimal) -> Self {
        Self {
            threshold,
            current_value: Decimal::ZERO,
            current_trades: Vec::new(),
        }
    }

    /// Adds one trade and returns a finished bar if the threshold was reached.
    ///
    /// The trade's value (`price * size`) is added to the running total. When
    /// the total reaches the threshold, a bar is built from all collected
    /// trades and the builder starts over from an empty state; otherwise
    /// `None` is returned.
    pub fn add_trade(&mut self, trade: Trade) -> Option<DollarBar> {
        self.current_value += trade.price * trade.size;
        self.current_trades.push(trade);

        if self.current_value < self.threshold {
            return None;
        }

        let bar = self.create_bar();
        // Clear the state inline: no `reset` helper is defined in this impl.
        // Value above the threshold is not carried into the next bar.
        self.current_value = Decimal::ZERO;
        self.current_trades.clear();
        Some(bar)
    }

    /// Builds the OHLCV bar for the trades collected so far.
    ///
    /// Panics if no trade has been collected; `add_trade` only calls this
    /// right after pushing a trade.
    fn create_bar(&self) -> DollarBar {
        // OHLCV calculation
        let open = self.current_trades.first().unwrap().price;
        let close = self.current_trades.last().unwrap().price;
        let high = self.current_trades.iter().map(|t| t.price).max().unwrap();
        let low = self.current_trades.iter().map(|t| t.price).min().unwrap();
        let volume: Decimal = self.current_trades.iter().map(|t| t.size).sum();

        DollarBar {
            // NOTE(review): this is the wall-clock time at which the bar is
            // built, not a trade time; confirm that is intended for replayed
            // or historical data.
            timestamp: Utc::now(),
            open,
            high,
            low,
            close,
            volume,
            dollar_volume: self.current_value,
            trade_count: self.current_trades.len(),
        }
    }
}

2. マイクロストラクチャー特徴量の拡張

// 現在のmetrics_calculator.rsに追加
impl MetricsCalculator {
    /// Assembles the microstructure feature set for one order book snapshot.
    ///
    /// Kyle's lambda, Amihud illiquidity, effective spread and PIN are each
    /// delegated to a dedicated estimator on `self`. `spread_bps` is the
    /// snapshot's optional `spread` converted to `f64` and multiplied by
    /// 10000; the conversion panics if `to_f64` returns `None`.
    pub fn calculate_microstructure_features(&self, orderbook: &OrderbookSnapshot) -> MicrostructureFeatures {
        MicrostructureFeatures {
            // Kyle's Lambda (price impact)
            kyle_lambda: self.estimate_kyle_lambda(orderbook),
            // Amihud illiquidity measure
            amihud_illiquidity: self.calculate_amihud(orderbook),
            // Effective spread
            effective_spread: self.calculate_effective_spread(orderbook),
            // PIN (Probability of Informed Trading)
            pin: self.estimate_pin(orderbook),
            // Pre-existing features are carried along as well.
            spread_bps: orderbook.spread.map(|spread| spread.to_f64().unwrap() * 10000.0),
            depth_imbalance: self.calculate_depth_imbalance(orderbook),
        }
    }
}

3. Walk-Forward検証システム

// src/rust/hyperliquid-orderbook-consumer/src/ml_validator.rs
/// Configuration for time-based walk-forward validation.
pub struct WalkForwardValidator {
    /// Length of each training window.
    train_window: Duration,
    /// Length of each test window; also the step by which windows advance.
    test_window: Duration,
    /// Time left unused between the end of training and the start of testing.
    gap: Duration,
}

impl WalkForwardValidator {
    /// Runs walk-forward validation of `model` over `data`.
    ///
    /// Starting one `train_window` after the first snapshot, each step trains
    /// on the preceding `train_window`, skips `gap`, evaluates on the next
    /// `test_window`, and then advances by `test_window`. A step is executed
    /// only if its test window ends at or before the last snapshot's
    /// timestamp, so every test window lies inside the data.
    ///
    /// Returns the collected scores converted with `ValidationResults::from`.
    /// Empty `data` yields a result built from an empty score list.
    ///
    /// NOTE(review): `data` is assumed to be ordered by timestamp, and
    /// `test_window` must be positive or the loop never advances.
    pub fn validate<M: Model>(&self, data: &[OrderbookSnapshot], model: &M) -> ValidationResults {
        let mut results = Vec::new();

        // No snapshots means there is nothing to index or validate.
        if let (Some(first), Some(last)) = (data.first(), data.last()) {
            let last_timestamp = last.timestamp;
            let mut current_time = first.timestamp + self.train_window;

            loop {
                // Training window
                let train_start = current_time - self.train_window;
                let train_end = current_time;

                // Gap, then test window
                let test_start = train_end + self.gap;
                let test_end = test_start + self.test_window;

                // The gap is part of the bound: without it the final test
                // window could extend past the available data.
                if test_end > last_timestamp {
                    break;
                }

                // Split the data
                let train_data = self.filter_by_time(data, train_start, train_end);
                let test_data = self.filter_by_time(data, test_start, test_end);

                // Train and evaluate
                let trained_model = model.train(&train_data);
                let score = trained_model.evaluate(&test_data);

                results.push(score);
                current_time += self.test_window;
            }
        }

        ValidationResults::from(results)
    }
}

4. リアルタイム異常検知

// 現在のconsumer.rsに追加
impl OrderbookConsumer {
    /// Checks one snapshot for anomalies, sends an alert if any were found,
    /// and returns them.
    ///
    /// Two checks are made:
    /// - spread: flagged when the absolute z-score from
    ///   `calculate_spread_zscore` exceeds 3.0;
    /// - liquidity: flagged when the value from `detect_liquidity_shock`
    ///   exceeds 0.5, attributed to whichever side currently has less
    ///   liquidity ("ask" on a tie).
    async fn detect_anomalies(&self, orderbook: &OrderbookSnapshot) -> Vec<Anomaly> {
        let mut anomalies = Vec::new();

        // Abnormal spread
        if let Some(spread_z_score) = self.calculate_spread_zscore(orderbook) {
            if spread_z_score.abs() > 3.0 {
                anomalies.push(Anomaly::SpreadAnomaly {
                    z_score: spread_z_score,
                    spread: orderbook.spread,
                });
            }
        }

        // Sudden change in liquidity
        if let Some(liquidity_change) = self.detect_liquidity_shock(orderbook) {
            if liquidity_change > 0.5 {  // change of more than 50%
                anomalies.push(Anomaly::LiquidityShock {
                    change_ratio: liquidity_change,
                    side: if orderbook.bid_liquidity < orderbook.ask_liquidity { "bid" } else { "ask" },
                });
            }
        }

        // Send the alert with a copy: passing `anomalies` by value would move
        // it, and the vector is still needed as the return value.
        // NOTE(review): requires `Anomaly: Clone`; if `send_alert` can take a
        // slice instead, borrowing would avoid the copy.
        if !anomalies.is_empty() {
            self.send_alert(anomalies.clone()).await;
        }

        anomalies
    }
}

6.2 パフォーマンス最適化の提案

1. バッチ処理の最適化

// 動的バッチサイズ調整
impl DynamicBatcher {
    /// Derives a batch size from the observed throughput and latency.
    ///
    /// The base size of 1000 is scaled by two factors:
    /// - `throughput / 10000`, capped at 2.0;
    /// - `50 / latency_ms`, floored at 0.5.
    ///
    /// Latency is measured in whole milliseconds and treated as at least
    /// 1 ms, so the latency factor never exceeds 50.
    pub fn calculate_optimal_batch_size(&self, throughput: f64, latency: Duration) -> usize {
        // Balance throughput against latency.
        let base_size = 1000;
        let throughput_factor = (throughput / 10000.0).min(2.0);

        // Sub-millisecond latency truncates to 0 ms; without the floor the
        // division yields infinity and the cast saturates to usize::MAX.
        let latency_ms = latency.as_millis().max(1) as f64;
        let latency_factor = (50.0 / latency_ms).max(0.5);

        (base_size as f64 * throughput_factor * latency_factor) as usize
    }
}

2. 特徴量計算の並列化

use rayon::prelude::*;

impl FeatureCalculator {
    /// Computes the full feature set for every snapshot using a parallel iterator.
    ///
    /// Each snapshot is processed independently: microstructure, technical
    /// and flow features are calculated and bundled with the snapshot's
    /// timestamp. The output has one `Features` per input snapshot.
    pub fn calculate_features_parallel(&self, orderbooks: &[OrderbookSnapshot]) -> Vec<Features> {
        // Per-snapshot work, kept separate from the parallel plumbing below.
        let build_features = |snapshot: &OrderbookSnapshot| Features {
            microstructure: self.calculate_microstructure_features(snapshot),
            technical: self.calculate_technical_features(snapshot),
            flow: self.calculate_flow_features(snapshot),
            timestamp: snapshot.timestamp,
        };

        orderbooks.par_iter().map(build_features).collect()
    }
}

6.3 モニタリングダッシュボードの拡張

# src/web/ml_monitoring.py
class MLMonitoringDashboard:
    """Track running statistics of model features and flag drifting ones."""

    def __init__(self):
        """Create empty statistics, performance and anomaly stores."""
        self.feature_stats = {}
        self.model_performance = {}
        self.anomaly_history = deque(maxlen=1000)

    def update_feature_distribution(self, features):
        """Fold one observation per feature into its running statistics.

        Mean and standard deviation are updated online with Welford's
        algorithm; ``std`` is the population standard deviation of all
        values seen so far.

        Args:
            features: Mapping of feature name to numeric value.
        """
        for name, value in features.items():
            if name not in self.feature_stats:
                self.feature_stats[name] = {
                    'mean': 0,
                    'std': 0,
                    'min': float('inf'),
                    'max': float('-inf'),
                    'count': 0,
                    'm2': 0.0,  # running sum of squared deviations from the mean
                    'last_value': None,
                }

            stats = self.feature_stats[name]
            stats['count'] += 1
            delta = value - stats['mean']
            stats['mean'] += delta / stats['count']
            # Welford: multiply the deviation from the old mean by the
            # deviation from the new mean.
            stats['m2'] += delta * (value - stats['mean'])
            stats['std'] = np.sqrt(stats['m2'] / stats['count'])
            stats['min'] = min(stats['min'], value)
            stats['max'] = max(stats['max'], value)
            # Remembered so drift detection can compare it with the history.
            stats['last_value'] = value

    def detect_feature_drift(self, threshold=3.0):
        """Return features whose latest value is far from their running mean.

        Args:
            threshold: Minimum absolute z-score for a feature to be reported.

        Returns:
            List of dicts with ``feature``, ``z_score`` and ``direction``
            (``'up'`` or ``'down'``). Features with 100 or fewer observations
            are skipped, as are features with zero standard deviation, for
            which no z-score can be computed.
        """
        drifts = []
        for name, stats in self.feature_stats.items():
            if stats['count'] <= 100:  # not enough samples yet
                continue
            if not stats['std'] > 0:
                continue

            # Compare the most recent value with the historical statistics.
            z_score = abs((stats['last_value'] - stats['mean']) / stats['std'])
            if z_score > threshold:
                drifts.append({
                    'feature': name,
                    'z_score': z_score,
                    'direction': 'up' if stats['last_value'] > stats['mean'] else 'down'
                })
        return drifts

まとめ

tickデータから機械学習を行う際の重要なポイント:

  1. データサンプリング: Dollar Barsが最も安定した統計的性質を持つ
  2. 特徴量エンジニアリング: マイクロストラクチャー特徴量とオーダーフロー分析が重要
  3. 検証手法: Walk-Forward AnalysisまたはCPCVを使用
  4. リアルタイム処理: メモリ効率と計算効率を考慮した実装
  5. 異常検知: 市場の異常を早期に検出するシステムの構築

現在のプロジェクトでは基本的な機能はすでに実装されていますが、上記の改善提案を実装することで、より堅牢で高性能な機械学習システムを構築できます。