ML Documentation

Rust実装ガイド - Coinbase WebSocketクローラー

概要

本ドキュメントは、Coinbase Pro WebSocket APIからリアルタイム取引データを収集し、Kafka経由でQuestDBに保存するRust実装の技術ガイドです。


1. システムアーキテクチャ

1.1 データフロー

┌─────────────────┐    WebSocket    ┌──────────────────┐    Kafka    ┌──────────────────┐    HTTP/ILP    ┌──────────┐
│ Coinbase Pro    │ ──────────────▶ │ coinbase-crawler │ ──────────▶ │ coinbase-consumer│ ─────────────▶ │ QuestDB  │
│ WebSocket API   │     (TLS)       │ (Rust)           │   crypto-   │ (Rust)           │                │          │
│                 │                 │                  │   trades    │                  │                │          │
└─────────────────┘                 └──────────────────┘             └──────────────────┘                └──────────┘
                                            ↓                                ↓
                                    ┌──────────────────┐            ┌──────────────────┐
                                    │ 品質スコア算出    │            │ バッチ処理       │
                                    │ エラーハンドリング │            │ データ検証       │
                                    └──────────────────┘            └──────────────────┘

1.2 コンポーネント説明

コンポーネント 役割 使用技術
coinbase-crawler WebSocketデータ収集 tokio, tokio-tungstenite, rdkafka
coinbase-consumer データ処理・保存 tokio, rdkafka, reqwest
Kafka (RedPanda) メッセージブローカー Kafka互換API
QuestDB 時系列データベース HTTP API / ILP

2. QuestDB接続方式の詳細

2.1 現在の実装(HTTP API)

// HTTP POST経由でSQLクエリを実行
impl QuestDBClient {
    async fn flush_batch(&mut self) -> Result<()> {
        let query = format!(
            "INSERT INTO trades (timestamp, exchange, symbol, trade_id, price, size, side, sequence, quality_score) VALUES {}",
            values.join(", ")
        );

        let response = self.client
            .post(&format!("{}/exec", self.base_url))
            .form(&[("query", query.as_str())])
            .send()
            .await?;

        // ...
    }
}

特徴:
- ✅ 実装が簡単
- ✅ デバッグしやすい
- ❌ HTTPオーバーヘッド
- ❌ テキスト形式のSQL生成コスト

2.2 推奨実装(ILP - InfluxDB Line Protocol)

// ILP実装例(questdb-rsクレート使用)
use questdb::{Client, ClientBuilder, Row, Timestamp};

impl QuestDBClient {
    fn new(host: &str, port: u16) -> Result<Self> {
        let client = ClientBuilder::new()
            .host(host)
            .port(port)
            .build()?;

        Ok(Self { client })
    }

    async fn insert_trade(&mut self, trade: &NormalizedTrade) -> Result<()> {
        let mut row = Row::new("trades");

        // シンボルカラム(インデックス対象)
        row.symbol("exchange", &trade.exchange)
           .symbol("symbol", &trade.symbol)
           .symbol("side", &trade.side)
           .symbol("trade_id", &trade.trade_id);

        // 数値カラム
        row.float("price", trade.price)
           .float("size", trade.size)
           .int("sequence", trade.sequence.unwrap_or(0) as i64)
           .int("quality_score", trade.quality_score as i64);

        // タイムスタンプ
        row.timestamp(Timestamp::Nanos(
            trade.timestamp.timestamp_nanos()
        ));

        self.client.send_row(row).await?;
        Ok(())
    }
}

特徴:
- ✅ 高速バイナリプロトコル
- ✅ 自動バッファリング
- ✅ 低レイテンシ(1-2ms)
- ✅ 型安全

2.3 パフォーマンス比較

接続方式 スループット レイテンシ CPU使用率
HTTP API 60-100 trades/秒 20-30ms 3-5%
ILP (TCP) 200-500 trades/秒 1-2ms 1-2%
ILP (UDP) 500+ trades/秒 <1ms 1%

3. 品質管理機能

3.1 品質スコア算出ロジック

fn calculate_quality_score(&self, symbol: &str, price: f64, size: f64) -> u8 {
    let mut score = 100u8;

    // 価格検証
    if price <= 0.0 || price.is_infinite() || price.is_nan() {
        score = score.saturating_sub(30);
    }

    // 取引量検証
    if size <= 0.0 || size.is_infinite() || size.is_nan() {
        score = score.saturating_sub(30);
    }

    // シンボル検証
    if symbol.is_empty() || !symbol.contains('-') {
        score = score.saturating_sub(20);
    }

    // Dust取引検出
    if size < 0.0001 {
        score = score.saturating_sub(10);
    }

    // 異常価格検出
    if price > 1_000_000.0 {
        score = score.saturating_sub(20);
    }

    score
}

3.2 リアルタイム品質チェック

fn validate_trade(&self, trade: &NormalizedTrade) -> bool {
    let mut quality_issues = Vec::new();
    let mut warnings = Vec::new();

    // 基本検証
    if trade.price <= 0.0 || trade.size <= 0.0 {
        quality_issues.push("Invalid price or size");
        return false;
    }

    // 警告レベルのチェック
    if trade.size < 0.0001 {
        warnings.push("Very small trade size detected");
    }

    if trade.price > 1_000_000.0 {
        warnings.push("Extremely high price detected");
    }

    // ログ出力
    if !warnings.is_empty() {
        debug!("Quality warnings for {}: {}: {:?}", 
               trade.exchange, trade.symbol, warnings);
    }

    quality_issues.is_empty()
}

4. エラーハンドリングとリトライ

4.1 WebSocket再接続

impl CoinbaseCrawler {
    async fn start(&mut self) -> Result<()> {
        loop {
            if let Err(e) = self.connect_and_stream().await {
                error!("Connection error: {}", e);

                self.reconnect_attempts += 1;
                if self.reconnect_attempts > self.max_reconnect_attempts {
                    return Err(e);
                }

                // 指数バックオフ
                let delay = Duration::from_secs(
                    2_u64.pow(self.reconnect_attempts.min(5))
                );
                warn!("Reconnecting in {:?} (attempt {})", 
                      delay, self.reconnect_attempts);
                sleep(delay).await;
            }
        }
    }
}

4.2 Kafkaエラー処理

match self.producer.send(record, Duration::from_secs(5)).await {
    Ok((partition, offset)) => {
        debug!("Trade sent to Kafka: partition={}, offset={}", 
               partition, offset);
    }
    Err((kafka_err, _)) => {
        error!("Failed to send trade to Kafka: {}", kafka_err);
        // メトリクス更新
        self.metrics.kafka_errors.inc();
        return Err(kafka_err.into());
    }
}

5. 最適化テクニック

5.1 メモリ最適化

// 固定サイズバッファの使用
use std::collections::VecDeque;

struct RealtimeDataStore {
    // 最大300ポイント(5分間のデータ)
    data: VecDeque<Trade>,
    max_points: usize,
}

impl RealtimeDataStore {
    fn add_trade(&mut self, trade: Trade) {
        if self.data.len() >= self.max_points {
            self.data.pop_front();
        }
        self.data.push_back(trade);
    }
}

5.2 バッチ処理最適化

// 動的バッチサイズ
struct AdaptiveBatcher {
    min_batch_size: usize,
    max_batch_size: usize,
    current_throughput: f64,
}

impl AdaptiveBatcher {
    fn optimal_batch_size(&self) -> usize {
        match self.current_throughput as u32 {
            0..=50 => self.min_batch_size,
            51..=200 => 100,
            201..=500 => 200,
            _ => self.max_batch_size,
        }
    }
}

5.3 並行処理

// マルチスレッド処理
use tokio::sync::mpsc;

async fn start_parallel_processing(num_workers: usize) {
    let (tx, rx) = mpsc::channel(1000);

    // ワーカースレッド起動
    for id in 0..num_workers {
        let rx = rx.clone();
        tokio::spawn(async move {
            process_worker(id, rx).await;
        });
    }

    // データ配信
    distribute_work(tx).await;
}

6. デプロイメントと運用

6.1 Docker設定

# マルチステージビルド
FROM rust:1.81-slim as builder
WORKDIR /app

# 依存関係のキャッシュ
COPY Cargo.toml Cargo.lock ./
RUN mkdir src && echo "fn main() {}" > src/main.rs
RUN cargo build --release && rm -rf src

# 実際のコードをコピー
COPY . .
RUN touch src/main.rs && cargo build --release

# 実行イメージ
FROM debian:bookworm-slim
COPY --from=builder /app/target/release/coinbase-crawler /usr/local/bin/
CMD ["coinbase-crawler"]

6.2 環境変数設定

# Crawler設定
RUST_LOG=coinbase_crawler=info
KAFKA_BROKERS=redpanda:9092
COINBASE_SYMBOLS=BTC-USD,ETH-USD,SOL-USD

# Consumer設定
QUESTDB_HOST=questdb
QUESTDB_ILP_PORT=9000
BATCH_SIZE=100
KAFKA_GROUP_ID=coinbase-consumer-group

6.3 監視設定

// Prometheusメトリクス(将来実装)
use prometheus::{Counter, Gauge, Registry};

struct Metrics {
    trades_processed: Counter,
    current_lag: Gauge,
    errors_total: Counter,
}

7. トラブルシューティング

7.1 よくある問題と解決方法

問題 原因 解決方法
TLS接続エラー TLS機能が無効 tokio-tungstenitenative-tlsフィーチャーを追加
QuestDB挿入失敗 HTTPエンコーディング form()メソッドを使用、またはILPに移行
高メモリ使用 バッファサイズ過大 固定サイズバッファ、定期的なフラッシュ
Kafka接続エラー ブローカー設定 KAFKA_BROKERS環境変数を確認

7.2 デバッグ方法

# 詳細ログの有効化
RUST_LOG=coinbase_crawler=debug,coinbase_consumer=debug cargo run

# 特定モジュールのみ
RUST_LOG=coinbase_crawler::websocket=trace cargo run

# Kafkaメッセージの確認
docker exec redpanda rpk topic consume crypto-trades --num 10

8. 今後の開発計画

Phase 1 - 基本機能強化(完了)

Phase 2 - パフォーマンス最適化(進行中)

Phase 3 - 運用機能(計画)


最終更新: 2025年6月12日