ML Documentation

Phase 2 Metrics Implementation Plan

Priority 1: Consumer Metrics (ready to implement now)

1. Kafka Consumption Performance

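// Add to consumer/metrics.rs
use lazy_static::lazy_static;
use prometheus::{
    register_gauge_vec, register_histogram_vec, GaugeVec, HistogramOpts, HistogramVec,
};

lazy_static! {
    // How far the consumer is behind the latest offset, per topic/partition
    pub static ref KAFKA_CONSUMER_LAG: GaugeVec = register_gauge_vec!(
        "kafka_consumer_lag_messages",
        "Number of messages behind the latest offset",
        &["topic", "partition"]
    ).unwrap();

    // Distribution of batch sizes written to QuestDB, labeled by data type
    pub static ref BATCH_SIZE_HISTOGRAM: HistogramVec = register_histogram_vec!(
        HistogramOpts::new(
            "questdb_batch_size",
            "Actual batch sizes sent to QuestDB"
        )
        .buckets(vec![1.0, 10.0, 50.0, 100.0, 500.0, 1000.0]),
        &["data_type"]
    ).unwrap();
}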
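The gauge and histogram above only hold values; the consumer still has to compute what to record. A minimal Python sketch of both calculations, under stated assumptions: lag is taken as each partition's high watermark minus the consumer group's committed offset, and batch sizes are bucketed with the same bounds as `BATCH_SIZE_HISTOGRAM`. The names `partition_lag` and `cumulative_bucket_counts` are hypothetical; in the Rust consumer the results would be passed to `KAFKA_CONSUMER_LAG.with_label_values(...).set(...)` and `BATCH_SIZE_HISTOGRAM.with_label_values(...).observe(...)`.

```python
from typing import Dict, List, Sequence, Tuple

# Upper bounds copied from BATCH_SIZE_HISTOGRAM; Prometheus adds a final +Inf bucket.
BATCH_SIZE_BUCKETS: Tuple[float, ...] = (1.0, 10.0, 50.0, 100.0, 500.0, 1000.0)

TopicPartition = Tuple[str, int]


def partition_lag(high_watermarks: Dict[TopicPartition, int],
                  committed: Dict[TopicPartition, int]) -> Dict[TopicPartition, int]:
    """Messages behind the latest offset for each (topic, partition).

    Assumption: a partition with no committed offset is counted from offset 0.
    Lag is clamped at 0 because the two offsets are read at slightly different times.
    """
    return {
        tp: max(hw - committed.get(tp, 0), 0)
        for tp, hw in high_watermarks.items()
    }


def cumulative_bucket_counts(batch_sizes: Sequence[float],
                             bounds: Sequence[float] = BATCH_SIZE_BUCKETS
                             ) -> List[Tuple[float, int]]:
    """Prometheus histogram semantics: bucket `le=b` counts every observation <= b,
    and the implicit +Inf bucket counts all observations."""
    return [
        (upper, sum(1 for size in batch_sizes if size <= upper))
        for upper in list(bounds) + [float("inf")]
    ]


print(partition_lag({("trades", 0): 1500, ("trades", 1): 900},
                    {("trades", 0): 1200, ("trades", 1): 900}))
# {('trades', 0): 300, ('trades', 1): 0}
print(cumulative_bucket_counts([1, 5, 120, 1000, 2500]))
# [(1.0, 1), (10.0, 2), (50.0, 2), (100.0, 2), (500.0, 3), (1000.0, 4), (inf, 5)]
```

A 2,500-row batch only reaches the `+Inf` bucket, so if batches above 1,000 rows are common, the top bucket bound may need raising to keep the distribution visible.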

2. Data Quality Checks

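// Fully qualified paths, so no extra `use` lines are required
lazy_static::lazy_static! {
    // Duplicate messages detected, per symbol and message type
    pub static ref DUPLICATE_MESSAGES: prometheus::CounterVec = prometheus::register_counter_vec!(
        "data_duplicate_messages_total",
        "Count of duplicate messages detected",
        &["symbol", "message_type"]
    ).unwrap();

    // Messages that failed validation, per symbol and failure type
    pub static ref VALIDATION_FAILURES: prometheus::CounterVec = prometheus::register_counter_vec!(
        "data_validation_failures_total",
        "Count of data validation failures",
        &["symbol", "failure_type"]
    ).unwrap();
}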
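The two counters need something to decide when to increment and with which label. A minimal Python sketch, assuming each message carries a `symbol`, a `message_type`, and a per-stream sequence number `seq` (field names hypothetical): duplicates are detected against a bounded window of recently seen keys, and validation failures are classified into `failure_type` strings that are illustrative choices, not names defined in this plan.

```python
from collections import OrderedDict
from typing import List


class DuplicateDetector:
    """Flags a (symbol, message_type, seq) key seen within the last `capacity` keys.

    Keys are evicted oldest-first, so memory stays bounded; a duplicate that
    arrives after its original was evicted will not be detected.
    """

    def __init__(self, capacity: int = 100_000) -> None:
        self.capacity = capacity
        self._seen: OrderedDict = OrderedDict()

    def is_duplicate(self, symbol: str, message_type: str, seq: int) -> bool:
        key = (symbol, message_type, seq)
        if key in self._seen:
            return True
        self._seen[key] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # drop the oldest key
        return False


def validation_failures(msg: dict) -> List[str]:
    """Return failure_type labels for a trade message; an empty list means it passed.

    The rules and label names here are illustrative assumptions.
    """
    failures = []
    price = msg.get("price")
    size = msg.get("size")
    if price is None or price <= 0:
        failures.append("invalid_price")
    if size is None or size <= 0:
        failures.append("invalid_size")
    if msg.get("side") not in ("buy", "sell"):
        failures.append("invalid_side")
    return failures
```

Each `True` from `is_duplicate` would map to one `DUPLICATE_MESSAGES` increment with matching `symbol` and `message_type` labels, and each returned string to one `VALIDATION_FAILURES` increment with that `failure_type`.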

Priority 2: Trade Analytics (SQL Views)

Creating QuestDB Views

-- Trade statistics for the last 5 minutes, grouped into 5-minute buckets
CREATE VIEW trade_stats_5m AS
SELECT 
    timestamp_floor('5m', timestamp) as time_bucket,
    symbol,
    count(*) as trade_count,
    sum(size) as total_volume,
    sum(price * size) / sum(size) as vwap,
    max(price) - min(price) as price_range,
    count(CASE WHEN side = 'buy' THEN 1 END) as buy_count,
    count(CASE WHEN side = 'sell' THEN 1 END) as sell_count,
    avg(CASE WHEN size > 10000 THEN 1 ELSE 0 END) as large_trade_ratio
FROM trades
WHERE timestamp >= dateadd('m', -5, now())
GROUP BY time_bucket, symbol;
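To sanity-check the view's output, the same per-bucket aggregates can be recomputed from raw rows. A minimal Python sketch, assuming trades arrive as `(timestamp_seconds, symbol, price, size, side)` tuples and reusing the view's 10,000 large-trade threshold; `trade_stats` is a hypothetical helper, and the epoch-aligned 5-minute floor is an assumption about how `timestamp_floor('5m', ...)` aligns buckets.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

BUCKET_SECONDS = 300       # 5 minutes
LARGE_TRADE_SIZE = 10_000  # same threshold as large_trade_ratio in the view

Trade = Tuple[float, str, float, float, str]  # (timestamp_s, symbol, price, size, side)


def trade_stats(trades: Iterable[Trade]) -> Dict[Tuple[int, str], dict]:
    """Recompute the trade_stats_5m columns per (bucket_start, symbol)."""
    groups: Dict[Tuple[int, str], list] = defaultdict(list)
    for ts, symbol, price, size, side in trades:
        bucket = int(ts // BUCKET_SECONDS) * BUCKET_SECONDS  # floor to a 5-minute boundary
        groups[(bucket, symbol)].append((price, size, side))

    stats = {}
    for key, rows in groups.items():
        prices = [p for p, _, _ in rows]
        volume = sum(s for _, s, _ in rows)
        stats[key] = {
            "trade_count": len(rows),
            "total_volume": volume,
            # VWAP = sum(price * size) / sum(size); undefined when volume is 0
            "vwap": sum(p * s for p, s, _ in rows) / volume if volume else None,
            "price_range": max(prices) - min(prices),
            "buy_count": sum(1 for _, _, side in rows if side == "buy"),
            "sell_count": sum(1 for _, _, side in rows if side == "sell"),
            "large_trade_ratio": sum(1 for _, s, _ in rows if s > LARGE_TRADE_SIZE) / len(rows),
        }
    return stats
```

The view's filter `timestamp >= dateadd('m', -5, now())` is a rolling five-minute window, which usually straddles two 5-minute buckets, so the older bucket in its result is typically partial.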

-- Anomaly detection scores (z-scores against each symbol's 24-hour baseline)
CREATE VIEW anomaly_scores AS
WITH stats AS (
    SELECT 
        symbol,
        avg(price) as avg_price,
        stddev(price) as std_price,
        avg(size) as avg_size,
        stddev(size) as std_size
    FROM trades
    WHERE timestamp >= dateadd('h', -24, now())
    GROUP BY symbol
)
SELECT 
    t.timestamp,
    t.symbol,
    t.price,
    t.size,
    abs(t.price - s.avg_price) / s.std_price as price_z_score,
    abs(t.size - s.avg_size) / s.std_size as size_z_score,
    CASE 
        WHEN abs(t.price - s.avg_price) / s.std_price > 3 THEN 'price_spike'
        WHEN abs(t.size - s.avg_size) / s.std_size > 3 THEN 'volume_spike'
        ELSE 'normal'
    END as anomaly_type
FROM trades t
JOIN stats s ON t.symbol = s.symbol
WHERE t.timestamp >= dateadd('m', -5, now());
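The view flags a trade when its price or size lies more than 3 standard deviations from the symbol's 24-hour mean, checking price first. A minimal Python sketch of the same rule for one trade, assuming the baseline values are available as lists; `statistics.stdev` (the sample standard deviation) stands in for QuestDB's `stddev`, assumed here to be the sample version. The zero-deviation guard is an addition, since the SQL divides by `std_price` and `std_size` unguarded. Function names are hypothetical.

```python
import statistics
from typing import Optional, Sequence

Z_THRESHOLD = 3.0  # same cut-off as the CASE expression in anomaly_scores


def z_score(value: float, mean: float, std: float) -> Optional[float]:
    """Absolute z-score |value - mean| / std; None when std is 0 (undefined)."""
    if std == 0:
        return None
    return abs(value - mean) / std


def classify_trade(price: float, size: float,
                   baseline_prices: Sequence[float],
                   baseline_sizes: Sequence[float]) -> str:
    """Mirror the view's CASE order: a price spike takes precedence over a volume spike."""
    # statistics.stdev needs at least two data points
    price_z = z_score(price, statistics.mean(baseline_prices), statistics.stdev(baseline_prices))
    size_z = z_score(size, statistics.mean(baseline_sizes), statistics.stdev(baseline_sizes))
    if price_z is not None and price_z > Z_THRESHOLD:
        return "price_spike"
    if size_z is not None and size_z > Z_THRESHOLD:
        return "volume_spike"
    return "normal"
```

In the view, the 24-hour baseline also contains the last five minutes of trades being scored, so a large spike slightly inflates the standard deviation it is measured against.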

Priority 3: Advanced Analytics Metrics

Market Quality Score Calculation

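# Add to the web analytics system
# (the calculate_* helpers below are assumed to exist elsewhere in that system)
def calculate_market_quality_metrics():
    """Compute an overall market quality score."""

    # Effective spread (based on actual execution prices)
    effective_spread = calculate_effective_spread()

    # Price improvement rate (share of trades executed at a better price than the mid-price)
    price_improvement = calculate_price_improvement_rate()

    # Stability of market depth
    depth_stability = calculate_depth_stability()

    # Resilience score (how quickly the order book recovers after large trades)
    resilience = calculate_order_book_resilience()

    # The spread term below is 1 / effective_spread: a zero spread would raise
    # ZeroDivisionError and a negative one would flip the sign of the term.
    if effective_spread <= 0:
        raise ValueError("effective_spread must be positive")

    return {
        'effective_spread': effective_spread,
        'price_improvement_rate': price_improvement,
        'depth_stability': depth_stability,
        'resilience_score': resilience,
        # Weighted sum; 1 / effective_spread grows without bound as the spread
        # narrows, so that term can outweigh the others unless it is rescaled.
        'overall_quality': (
            0.3 * (1 / effective_spread) +
            0.3 * price_improvement +
            0.2 * depth_stability +
            0.2 * resilience
        )
    }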
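The helper functions called above are not defined in this plan. One way to compute the first two, sketched in Python under stated assumptions: each trade is joined with the best bid and ask at execution time (the `Execution` record is hypothetical); effective spread uses the common definition of twice the distance between execution price and mid-price, divided by the mid to make it relative and size-weighted across trades; and, following the comment above, a trade counts as price-improved when a buy fills below the mid or a sell above it. The argument-less `calculate_effective_spread()` and `calculate_price_improvement_rate()` could wrap functions like these once they are handed trade data.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass
class Execution:
    """Hypothetical trade record joined with the best bid/ask at execution time."""
    price: float
    size: float
    side: str  # aggressor side: "buy" or "sell"
    bid: float
    ask: float

    @property
    def mid(self) -> float:
        return (self.bid + self.ask) / 2


def effective_spread(executions: Sequence[Execution]) -> float:
    """Size-weighted mean of 2 * |price - mid| / mid, i.e. a relative effective spread."""
    total_size = sum(e.size for e in executions)
    if total_size <= 0:
        raise ValueError("no executed volume")
    weighted = sum(2 * abs(e.price - e.mid) / e.mid * e.size for e in executions)
    return weighted / total_size


def price_improvement_rate(executions: Sequence[Execution]) -> float:
    """Share of trades filled better than the mid: buys below it, sells above it."""
    if not executions:
        raise ValueError("no executions")
    improved = sum(
        1 for e in executions
        if (e.side == "buy" and e.price < e.mid) or (e.side == "sell" and e.price > e.mid)
    )
    return improved / len(executions)


trades = [
    Execution(price=100.5, size=2, side="buy", bid=99.0, ask=101.0),
    Execution(price=99.5, size=2, side="sell", bid=99.0, ask=101.0),
    Execution(price=99.75, size=4, side="buy", bid=99.0, ask=101.0),
]
print(round(effective_spread(trades), 6))  # 0.0075
print(price_improvement_rate(trades))      # 0.3333333333333333
```

Size-weighting lets one large fill count more than many small ones; a plain per-trade mean is the other common choice and is easy to swap in.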

Benefits of Implementation

1. Operational improvements

2. Trading strategy

3. System optimization

Performance Considerations

Implementation Order

  1. Immediately: Consumer metrics (Rust)
  2. This week: SQL views and cron jobs
  3. Next week: Extend the web analytics system