目次
Rust実装理解のための手がかりドキュメント
このドキュメントは、本プロジェクトのRust実装(hyperliquid-crawler/consumer)を理解するための重要な手がかりをまとめています。
📋 目次
アーキテクチャ概要
システムの目的
高頻度取引データのリアルタイム収集・処理システム。WebSocketからの生データを効率的に処理し、時系列データベースに保存。
パフォーマンス特性
- スループット: 61.7~500取引/秒(Python実装の19.5倍)
- レイテンシ: マイクロ秒オーダー
- 信頼性: 自動再接続、エラーリカバリ
プロジェクト構造
src/rust/
├── Cargo.toml # ワークスペース定義
├── hyperliquid-crawler/ # データ収集サービス
│ ├── src/
│ │ ├── main.rs # エントリーポイント、初期化
│ │ ├── config.rs # 環境変数からの設定読み込み
│ │ ├── crawler.rs # WebSocket接続管理、メインループ
│ │ ├── orderbook.rs # L2オーダーブック処理
│ │ ├── kafka_producer.rs # Kafkaへの高速送信
│ │ ├── models.rs # データ構造定義
│ │ └── metrics.rs # Prometheusメトリクス
│ └── Cargo.toml
└── hyperliquid-consumer/ # データ処理・保存サービス
├── src/
│ ├── main.rs # エントリーポイント
│ ├── consumer.rs # Kafka消費、バッチ処理
│ ├── ilp_client.rs # QuestDB ILPプロトコル実装
│ ├── questdb_writer.rs # HTTP経由のQuestDB書き込み(バックアップ)
│ ├── metrics_calculator.rs # マーケットメトリクス計算
│ ├── microstructure_features.rs # 高度な市場分析
│ ├── ohlcv_aggregator.rs # OHLCV集計(1分~日足)
│ ├── dollar_bars.rs # ドルバー集計
│ └── walk_forward_validator.rs # バックテスト検証
└── Cargo.toml
データフロー
1. データ収集(Crawler)
Hyperliquid WebSocket API
↓ (TLS接続)
WebSocketメッセージ受信
↓ (JSONパース)
データ型判定(L2Book / Trades)
↓
バッチング(メモリ内蓄積)
↓ (batch_size到達 or flush_interval経過)
Kafka/RedPandaへ送信
2. データ処理(Consumer)
Kafka/RedPandaから消費
↓ (並行処理)
┌─────────────┬───────────────┬──────────────┐
│ メトリクス計算 │ OHLCV集計 │ ドルバー集計 │
└─────────────┴───────────────┴──────────────┘
↓
ILPプロトコルでバッチ送信
↓
QuestDB(時系列保存)
主要コンポーネント
1. WebSocket接続管理(crawler.rs)
- 自動再接続: 指数バックオフ(最大60秒)
- ハートビート: 30秒間隔でPing送信
- メッセージ処理: 非同期ストリーム処理
// 重要なパターン:tokio::select!による非同期処理
tokio::select! {
Some(msg) = read.next() => { /* メッセージ処理 */ }
_ = flush_interval.tick() => { /* 定期フラッシュ */ }
}
2. オーダーブック処理(orderbook.rs)
- データ構造: BTreeMap(自動ソート)
- 精度: rust_decimal使用(金融計算用)
- メトリクス: スプレッド、深度インバランス、VWAP
3. Kafka統合
- プロデューサー: rdkafka(librdkafkaベース)
- 圧縮: Snappy圧縮オプション
- バッチング: 効率的なメッセージ送信
4. ILPクライアント(ilp_client.rs)
- プロトコル: InfluxDB Line Protocol
- 最適化: HTTPより3-5倍高速
- エラー処理: 数値の有限性チェック
技術的な特徴
非同期プログラミング
- tokio: フル機能の非同期ランタイム
- futures: ストリーム処理
- async-trait: 非同期トレイト
エラーハンドリング
- anyhow: エラーチェーンとコンテキスト
- thiserror: カスタムエラー型定義
- Result
: 全関数でエラー伝播
並行性・共有状態
- Arc: 参照カウントによる共有
- Mutex: 排他制御
- アトミック操作: ロックフリーカウンター
設定管理
- 環境変数: 12-factor app準拠
- デフォルト値: 合理的なデフォルト設定
- 検証: 起動時の設定チェック
パフォーマンス最適化
1. バッチ処理
// 動的バッチサイズ管理
if pending_messages.len() >= batch_size {
flush_messages(&mut pending_messages).await?;
}
2. ゼロコピー最適化
serde_json::from_slice: 直接バイト配列から変換- 文字列の不要なコピーを回避
3. リリースビルド最適化
[profile.release]
opt-level = 3 # 最大最適化
lto = true # Link Time Optimization
codegen-units = 1 # 単一コード生成ユニット
panic = "abort" # 軽量パニック処理
4. メモリ効率
- 事前割り当て: 予測可能なサイズでVec::with_capacity
- 制限付きコレクション: max_levelsでメモリ使用量制御
- 定期的クリーンアップ: 古いデータの削除
開発・デバッグのヒント
1. ログレベル設定
# 詳細なデバッグログ
RUST_LOG=debug cargo run
# 特定モジュールのみ
RUST_LOG=hyperliquid_crawler::crawler=debug cargo run
2. 開発時の推奨設定
# Crawlerの開発
BATCH_SIZE=10 \
FLUSH_INTERVAL_MS=1000 \
RUST_LOG=debug \
cargo run
# Consumerの開発
ENABLE_METRICS_CALCULATION=true \
ENABLE_OHLCV_AGGREGATION=true \
RUST_LOG=hyperliquid_consumer=debug \
cargo run
3. パフォーマンス計測
# CPU/メモリプロファイリング
cargo build --release
perf record --call-graph=dwarf target/release/hyperliquid-crawler
perf report
# ベンチマーク実行
cargo bench
4. よくあるエラーと対処法
WebSocket接続エラー
- 原因: ネットワーク問題、API制限
- 対処: バックオフ設定の調整、ログ確認
Kafka接続エラー
- 原因: ブローカー未起動、設定ミス
- 対処:
docker-compose psで確認、KAFKA_BROKERS環境変数チェック
QuestDB書き込みエラー
- 原因: スキーマ不一致、データ型エラー
- 対処: ILPフォーマット確認、数値の有限性チェック
5. コード理解のポイント
- エントリーポイントから追う: main.rs → 各モジュールの初期化
- データフローを意識: WebSocket → 処理 → Kafka → QuestDB
- エラー処理パターン:
?演算子とResult型の使用 - 非同期パターン: async/await、tokio::select!
- 共有状態の管理: Arc
>パターン
6. 拡張・カスタマイズ
新しい取引所の追加
models.rsに新しいメッセージ型を定義crawler.rsでWebSocket接続とメッセージ解析を実装consumer.rsで処理ロジックを追加
新しいメトリクスの追加
metrics_calculator.rsに計算ロジックを実装models.rsにメトリクス構造体を定義ilp_client.rsでQuestDBへの書き込みを実装
まとめ
このRust実装は、高頻度取引データの処理に必要な以下の要素を実現しています:
- 高性能: 最適化されたデータ構造とアルゴリズム
- 信頼性: 包括的なエラーハンドリングと自動リカバリ
- 拡張性: モジュラーな設計による容易な機能追加
- 観測性: 詳細なメトリクスとログ
Rustの所有権システムとゼロコスト抽象化により、メモリ安全性とパフォーマンスを両立させています。