目次
Rust実装ガイド - Coinbase WebSocketクローラー
概要
本ドキュメントは、Coinbase Pro WebSocket APIからリアルタイム取引データを収集し、Kafka経由でQuestDBに保存するRust実装の技術ガイドです。
1. システムアーキテクチャ
1.1 データフロー
┌─────────────────┐ WebSocket ┌──────────────────┐ Kafka ┌──────────────────┐ HTTP/ILP ┌──────────┐
│ Coinbase Pro │ ──────────────▶ │ coinbase-crawler │ ──────────▶ │ coinbase-consumer│ ─────────────▶ │ QuestDB │
│ WebSocket API │ (TLS) │ (Rust) │ crypto- │ (Rust) │ │ │
│ │ │ │ trades │ │ │ │
└─────────────────┘ └──────────────────┘ └──────────────────┘ └──────────┘
↓ ↓
┌──────────────────┐ ┌──────────────────┐
│ 品質スコア算出 │ │ バッチ処理 │
│ エラーハンドリング │ │ データ検証 │
└──────────────────┘ └──────────────────┘
1.2 コンポーネント説明
| コンポーネント | 役割 | 使用技術 |
|---|---|---|
| coinbase-crawler | WebSocketデータ収集 | tokio, tokio-tungstenite, rdkafka |
| coinbase-consumer | データ処理・保存 | tokio, rdkafka, reqwest |
| Kafka (RedPanda) | メッセージブローカー | Kafka互換API |
| QuestDB | 時系列データベース | HTTP API / ILP |
2. QuestDB接続方式の詳細
2.1 現在の実装(HTTP API)
// HTTP POST経由でSQLクエリを実行
impl QuestDBClient {
async fn flush_batch(&mut self) -> Result<()> {
let query = format!(
"INSERT INTO trades (timestamp, exchange, symbol, trade_id, price, size, side, sequence, quality_score) VALUES {}",
values.join(", ")
);
let response = self.client
.post(&format!("{}/exec", self.base_url))
.form(&[("query", query.as_str())])
.send()
.await?;
// ...
}
}
特徴:
- ✅ 実装が簡単
- ✅ デバッグしやすい
- ❌ HTTPオーバーヘッド
- ❌ テキスト形式のSQL生成コスト
2.2 推奨実装(ILP - InfluxDB Line Protocol)
// ILP実装例(questdb-rsクレート使用)
use questdb::{Client, ClientBuilder, Row, Timestamp};
impl QuestDBClient {
fn new(host: &str, port: u16) -> Result<Self> {
let client = ClientBuilder::new()
.host(host)
.port(port)
.build()?;
Ok(Self { client })
}
async fn insert_trade(&mut self, trade: &NormalizedTrade) -> Result<()> {
let mut row = Row::new("trades");
// シンボルカラム(インデックス対象)
row.symbol("exchange", &trade.exchange)
.symbol("symbol", &trade.symbol)
.symbol("side", &trade.side)
.symbol("trade_id", &trade.trade_id);
// 数値カラム
row.float("price", trade.price)
.float("size", trade.size)
.int("sequence", trade.sequence.unwrap_or(0) as i64)
.int("quality_score", trade.quality_score as i64);
// タイムスタンプ
row.timestamp(Timestamp::Nanos(
trade.timestamp.timestamp_nanos()
));
self.client.send_row(row).await?;
Ok(())
}
}
特徴:
- ✅ 高速バイナリプロトコル
- ✅ 自動バッファリング
- ✅ 低レイテンシ(1-2ms)
- ✅ 型安全
2.3 パフォーマンス比較
| 接続方式 | スループット | レイテンシ | CPU使用率 |
|---|---|---|---|
| HTTP API | 60-100 trades/秒 | 20-30ms | 3-5% |
| ILP (TCP) | 200-500 trades/秒 | 1-2ms | 1-2% |
| ILP (UDP) | 500+ trades/秒 | <1ms | 1% |
3. 品質管理機能
3.1 品質スコア算出ロジック
fn calculate_quality_score(&self, symbol: &str, price: f64, size: f64) -> u8 {
let mut score = 100u8;
// 価格検証
if price <= 0.0 || price.is_infinite() || price.is_nan() {
score = score.saturating_sub(30);
}
// 取引量検証
if size <= 0.0 || size.is_infinite() || size.is_nan() {
score = score.saturating_sub(30);
}
// シンボル検証
if symbol.is_empty() || !symbol.contains('-') {
score = score.saturating_sub(20);
}
// Dust取引検出
if size < 0.0001 {
score = score.saturating_sub(10);
}
// 異常価格検出
if price > 1_000_000.0 {
score = score.saturating_sub(20);
}
score
}
3.2 リアルタイム品質チェック
fn validate_trade(&self, trade: &NormalizedTrade) -> bool {
let mut quality_issues = Vec::new();
let mut warnings = Vec::new();
// 基本検証
if trade.price <= 0.0 || trade.size <= 0.0 {
quality_issues.push("Invalid price or size");
return false;
}
// 警告レベルのチェック
if trade.size < 0.0001 {
warnings.push("Very small trade size detected");
}
if trade.price > 1_000_000.0 {
warnings.push("Extremely high price detected");
}
// ログ出力
if !warnings.is_empty() {
debug!("Quality warnings for {}: {}: {:?}",
trade.exchange, trade.symbol, warnings);
}
quality_issues.is_empty()
}
4. エラーハンドリングとリトライ
4.1 WebSocket再接続
impl CoinbaseCrawler {
async fn start(&mut self) -> Result<()> {
loop {
if let Err(e) = self.connect_and_stream().await {
error!("Connection error: {}", e);
self.reconnect_attempts += 1;
if self.reconnect_attempts > self.max_reconnect_attempts {
return Err(e);
}
// 指数バックオフ
let delay = Duration::from_secs(
2_u64.pow(self.reconnect_attempts.min(5))
);
warn!("Reconnecting in {:?} (attempt {})",
delay, self.reconnect_attempts);
sleep(delay).await;
}
}
}
}
4.2 Kafkaエラー処理
match self.producer.send(record, Duration::from_secs(5)).await {
Ok((partition, offset)) => {
debug!("Trade sent to Kafka: partition={}, offset={}",
partition, offset);
}
Err((kafka_err, _)) => {
error!("Failed to send trade to Kafka: {}", kafka_err);
// メトリクス更新
self.metrics.kafka_errors.inc();
return Err(kafka_err.into());
}
}
5. 最適化テクニック
5.1 メモリ最適化
// 固定サイズバッファの使用
use std::collections::VecDeque;
struct RealtimeDataStore {
// 最大300ポイント(5分間のデータ)
data: VecDeque<Trade>,
max_points: usize,
}
impl RealtimeDataStore {
fn add_trade(&mut self, trade: Trade) {
if self.data.len() >= self.max_points {
self.data.pop_front();
}
self.data.push_back(trade);
}
}
5.2 バッチ処理最適化
// 動的バッチサイズ
struct AdaptiveBatcher {
min_batch_size: usize,
max_batch_size: usize,
current_throughput: f64,
}
impl AdaptiveBatcher {
fn optimal_batch_size(&self) -> usize {
match self.current_throughput as u32 {
0..=50 => self.min_batch_size,
51..=200 => 100,
201..=500 => 200,
_ => self.max_batch_size,
}
}
}
5.3 並行処理
// マルチスレッド処理
use tokio::sync::mpsc;
async fn start_parallel_processing(num_workers: usize) {
let (tx, rx) = mpsc::channel(1000);
// ワーカースレッド起動
for id in 0..num_workers {
let rx = rx.clone();
tokio::spawn(async move {
process_worker(id, rx).await;
});
}
// データ配信
distribute_work(tx).await;
}
6. デプロイメントと運用
6.1 Docker設定
# マルチステージビルド
FROM rust:1.81-slim as builder
WORKDIR /app
# 依存関係のキャッシュ
COPY Cargo.toml Cargo.lock ./
RUN mkdir src && echo "fn main() {}" > src/main.rs
RUN cargo build --release && rm -rf src
# 実際のコードをコピー
COPY . .
RUN touch src/main.rs && cargo build --release
# 実行イメージ
FROM debian:bookworm-slim
COPY --from=builder /app/target/release/coinbase-crawler /usr/local/bin/
CMD ["coinbase-crawler"]
6.2 環境変数設定
# Crawler設定
RUST_LOG=coinbase_crawler=info
KAFKA_BROKERS=redpanda:9092
COINBASE_SYMBOLS=BTC-USD,ETH-USD,SOL-USD
# Consumer設定
QUESTDB_HOST=questdb
QUESTDB_ILP_PORT=9000
BATCH_SIZE=100
KAFKA_GROUP_ID=coinbase-consumer-group
6.3 監視設定
// Prometheusメトリクス(将来実装)
use prometheus::{Counter, Gauge, Registry};
struct Metrics {
trades_processed: Counter,
current_lag: Gauge,
errors_total: Counter,
}
7. トラブルシューティング
7.1 よくある問題と解決方法
| 問題 | 原因 | 解決方法 |
|---|---|---|
| TLS接続エラー | TLS機能が無効 | tokio-tungsteniteにnative-tlsフィーチャーを追加 |
| QuestDB挿入失敗 | HTTPエンコーディング | form()メソッドを使用、またはILPに移行 |
| 高メモリ使用 | バッファサイズ過大 | 固定サイズバッファ、定期的なフラッシュ |
| Kafka接続エラー | ブローカー設定 | KAFKA_BROKERS環境変数を確認 |
7.2 デバッグ方法
# 詳細ログの有効化
RUST_LOG=coinbase_crawler=debug,coinbase_consumer=debug cargo run
# 特定モジュールのみ
RUST_LOG=coinbase_crawler::websocket=trace cargo run
# Kafkaメッセージの確認
docker exec redpanda rpk topic consume crypto-trades --num 10
8. 今後の開発計画
Phase 1 - 基本機能強化(完了)
- ✅ WebSocket接続・データ収集
- ✅ Kafka統合
- ✅ 品質スコア機能
- ✅ エラーハンドリング
Phase 2 - パフォーマンス最適化(進行中)
- 🚧 ILPプロトコル実装
- 🚧 バッチ処理最適化
- 🚧 コネクションプーリング
Phase 3 - 運用機能(計画)
- 📋 Prometheusメトリクス
- 📋 ヘルスチェックAPI
- 📋 設定ホットリロード
- 📋 gRPCインターフェース
最終更新: 2025年6月12日