ML Documentation

Rust実装理解のための手がかりドキュメント

このドキュメントは、本プロジェクトのRust実装(hyperliquid-crawler/consumer)を理解するための重要な手がかりをまとめています。

📋 目次

  1. アーキテクチャ概要
  2. プロジェクト構造
  3. データフロー
  4. 主要コンポーネント
  5. 技術的な特徴
  6. パフォーマンス最適化
  7. 開発・デバッグのヒント

アーキテクチャ概要

システムの目的

高頻度取引データのリアルタイム収集・処理システム。WebSocketからの生データを効率的に処理し、時系列データベースに保存。

パフォーマンス特性

プロジェクト構造

src/rust/
├── Cargo.toml                 # ワークスペース定義
├── hyperliquid-crawler/       # データ収集サービス
   ├── src/
      ├── main.rs           # エントリーポイント、初期化
      ├── config.rs         # 環境変数からの設定読み込み
      ├── crawler.rs        # WebSocket接続管理、メインループ
      ├── orderbook.rs      # L2オーダーブック処理
      ├── kafka_producer.rs # Kafkaへの高速送信
      ├── models.rs         # データ構造定義
      └── metrics.rs        # Prometheusメトリクス
   └── Cargo.toml
└── hyperliquid-consumer/      # データ処理・保存サービス
    ├── src/
       ├── main.rs           # エントリーポイント
       ├── consumer.rs       # Kafka消費、バッチ処理
       ├── ilp_client.rs     # QuestDB ILPプロトコル実装
       ├── questdb_writer.rs # HTTP経由のQuestDB書き込み(バックアップ)
       ├── metrics_calculator.rs    # マーケットメトリクス計算
       ├── microstructure_features.rs # 高度な市場分析
       ├── ohlcv_aggregator.rs      # OHLCV集計1分~日足)
       ├── dollar_bars.rs            # ドルバー集計
       └── walk_forward_validator.rs # バックテスト検証
    └── Cargo.toml

データフロー

1. データ収集(Crawler)

Hyperliquid WebSocket API
    ↓ (TLS接続)
WebSocketメッセージ受信
    ↓ (JSONパース)
データ型判定(L2Book / Trades)
    ↓
バッチング(メモリ内蓄積)
    ↓ (batch_size到達 or flush_interval経過)
Kafka/RedPandaへ送信

2. データ処理(Consumer)

Kafka/RedPandaから消費
    ↓ (並行処理)
┌─────────────┬───────────────┬──────────────┐
│ メトリクス計算 │ OHLCV集計     │ ドルバー集計  │
└─────────────┴───────────────┴──────────────┘
    ↓
ILPプロトコルでバッチ送信
    ↓
QuestDB(時系列保存)

主要コンポーネント

1. WebSocket接続管理(crawler.rs)

// 重要なパターン:tokio::select!による非同期処理
tokio::select! {
    Some(msg) = read.next() => { /* メッセージ処理 */ }
    _ = flush_interval.tick() => { /* 定期フラッシュ */ }
}

2. オーダーブック処理(orderbook.rs)

3. Kafka統合

4. ILPクライアント(ilp_client.rs)

技術的な特徴

非同期プログラミング

エラーハンドリング

並行性・共有状態

設定管理

パフォーマンス最適化

1. バッチ処理

// 動的バッチサイズ管理
if pending_messages.len() >= batch_size {
    flush_messages(&mut pending_messages).await?;
}

2. ゼロコピー最適化

3. リリースビルド最適化

[profile.release]
opt-level = 3        # 最大最適化
lto = true          # Link Time Optimization
codegen-units = 1   # 単一コード生成ユニット
panic = "abort"     # 軽量パニック処理

4. メモリ効率

開発・デバッグのヒント

1. ログレベル設定

# 詳細なデバッグログ
RUST_LOG=debug cargo run

# 特定モジュールのみ
RUST_LOG=hyperliquid_crawler::crawler=debug cargo run

2. 開発時の推奨設定

# Crawlerの開発
BATCH_SIZE=10 \
FLUSH_INTERVAL_MS=1000 \
RUST_LOG=debug \
cargo run

# Consumerの開発
ENABLE_METRICS_CALCULATION=true \
ENABLE_OHLCV_AGGREGATION=true \
RUST_LOG=hyperliquid_consumer=debug \
cargo run

3. パフォーマンス計測

# CPU/メモリプロファイリング
cargo build --release
perf record --call-graph=dwarf target/release/hyperliquid-crawler
perf report

# ベンチマーク実行
cargo bench

4. よくあるエラーと対処法

WebSocket接続エラー

Kafka接続エラー

QuestDB書き込みエラー

5. コード理解のポイント

  1. エントリーポイントから追う: main.rs → 各モジュールの初期化
  2. データフローを意識: WebSocket → 処理 → Kafka → QuestDB
  3. エラー処理パターン: ?演算子とResult型の使用
  4. 非同期パターン: async/await、tokio::select!
  5. 共有状態の管理: Arc>パターン

6. 拡張・カスタマイズ

新しい取引所の追加

  1. models.rsに新しいメッセージ型を定義
  2. crawler.rsでWebSocket接続とメッセージ解析を実装
  3. consumer.rsで処理ロジックを追加

新しいメトリクスの追加

  1. metrics_calculator.rsに計算ロジックを実装
  2. models.rsにメトリクス構造体を定義
  3. ilp_client.rsでQuestDBへの書き込みを実装

まとめ

このRust実装は、高頻度取引データの処理に必要な以下の要素を実現しています:

Rustの所有権システムとゼロコスト抽象化により、メモリ安全性とパフォーマンスを両立させています。