ML Documentation

推奨追加メトリクス実装ガイド

優先度1: Crawlerで実装(即座に実装可能)

1. オーダーブック更新頻度

// metrics.rs に追加
pub static ref ORDERBOOK_UPDATE_RATE: GaugeVec = register_gauge_vec!(
    "crypto_orderbook_updates_per_second",
    "Orderbook update rate per symbol",
    &["symbol"]
).unwrap();

効果: WebSocket接続の健全性とデータ品質を監視

2. 最良価格変化率

pub static ref BEST_PRICE_CHANGE_RATE: GaugeVec = register_gauge_vec!(
    "crypto_best_price_change_percent",
    "Best bid/ask price change rate",
    &["symbol", "side"]
).unwrap();

効果: 市場のボラティリティをリアルタイム監視

3. メッセージサイズ統計

pub static ref MESSAGE_SIZE_BYTES: HistogramVec = register_histogram_vec!(
    HistogramOpts::new(
        "crypto_message_size_bytes",
        "WebSocket message size distribution"
    )
    .buckets(vec![100.0, 500.0, 1000.0, 5000.0, 10000.0]),
    &["message_type"]
).unwrap();

効果: ネットワーク帯域使用量の最適化

優先度2: Consumerで実装(バッチ処理で効率的)

4. 取引量集計(時間窓)

pub static ref TRADE_VOLUME_USD: GaugeVec = register_gauge_vec!(
    "crypto_trade_volume_usd_total",
    "Total trade volume in USD",
    &["symbol", "window"]
).unwrap();

// 1分、5分、15分窓で集計

効果: 流動性とマーケット活動の把握

5. 大口取引検出

pub static ref LARGE_TRADE_COUNT: CounterVec = register_counter_vec!(
    "crypto_large_trades_total",
    "Count of large trades above threshold",
    &["symbol", "size_category"] // small, medium, large, whale
).unwrap();

効果: マーケットインパクトの予測

6. 価格急変検出

pub static ref PRICE_SPIKE_EVENTS: CounterVec = register_counter_vec!(
    "crypto_price_spike_events_total",
    "Price movements exceeding threshold",
    &["symbol", "direction", "severity"] // up/down, minor/major/extreme
).unwrap();

効果: アラート発火とリスク管理

優先度3: 後処理システムで実装

7. VWAP計算(5分ごと)

-- QuestDBで定期実行
INSERT INTO vwap_5m
SELECT 
    timestamp_floor('5m', timestamp) as time,
    symbol,
    sum(price * size) / sum(size) as vwap,
    sum(size) as total_volume
FROM trades
WHERE timestamp >= dateadd('m', -5, now())
GROUP BY time, symbol;

8. 市場品質スコア

# Web分析システムで計算
def calculate_market_quality_score(symbol):
    # スプレッド、深度、更新頻度を総合評価
    spread_score = 1 / (1 + spread_percentage)
    depth_score = log(market_depth_usd)
    update_score = min(update_rate / 10, 1)
    return (spread_score + depth_score + update_score) / 3

実装の指針

  1. Crawlerメトリクス: メッセージ受信時に即座に計算
  2. Consumerメトリクス: バッファリングして定期的に集計
  3. 後処理メトリクス: cronジョブやストリーミングSQLで計算

パフォーマンス影響

実装例

Crawlerでの実装パターン

// crawler.rs の calculate_orderbook_metrics に追加
fn calculate_orderbook_metrics(&self, orderbook: &OrderbookSnapshot) {
    // 既存のメトリクス...

    // 更新頻度の追跡
    let now = Instant::now();
    if let Some(last_update) = self.last_update_time.get(&orderbook.symbol) {
        let rate = 1.0 / now.duration_since(*last_update).as_secs_f64();
        metrics::ORDERBOOK_UPDATE_RATE
            .with_label_values(&[&orderbook.symbol])
            .set(rate);
    }
    self.last_update_time.insert(orderbook.symbol.clone(), now);
}

Consumerでの実装パターン

// consumer.rs にウィンドウ集計を追加
struct VolumeAggregator {
    windows: HashMap<String, HashMap<String, f64>>, // symbol -> window -> volume
    last_flush: Instant,
}

impl VolumeAggregator {
    fn add_trade(&mut self, symbol: &str, volume_usd: f64) {
        for window in &["1m", "5m", "15m"] {
            self.windows
                .entry(symbol.to_string())
                .or_insert_with(HashMap::new)
                .entry(window.to_string())
                .and_modify(|v| *v += volume_usd)
                .or_insert(volume_usd);
        }
    }

    fn flush_metrics(&mut self) {
        if self.last_flush.elapsed() > Duration::from_secs(60) {
            for (symbol, windows) in &self.windows {
                for (window, volume) in windows {
                    metrics::TRADE_VOLUME_USD
                        .with_label_values(&[symbol, window])
                        .set(*volume);
                }
            }
            self.windows.clear();
            self.last_flush = Instant::now();
        }
    }
}

モニタリング効果

これらのメトリクスにより:
1. 市場の健全性: 更新頻度とメッセージサイズで接続品質を監視
2. 取引機会: 価格変動と大口取引でエントリーポイントを検出
3. リスク管理: 急激な価格変動とボラティリティを追跡
4. システム最適化: メッセージサイズとバッチ効率を改善