ML Documentation

Hyperliquid Orderbookシステムのパフォーマンス最適化ガイド

概要

本ドキュメントでは、Hyperliquid orderbook-crawlerとorderbook-consumerのパフォーマンスを向上させるための具体的な施策について解説します。現在の実装の分析結果に基づき、システム全体のスループット、レイテンシー、リソース効率を改善する方法を提案します。

1. 現在のアーキテクチャと性能ボトルネック

1.1 システム構成

[Hyperliquid WebSocket]  [Orderbook Crawler]  [Kafka]  [Orderbook Consumer]  [QuestDB]

1.2 現在の性能ボトルネック

  1. 同期的なKafka送信: クローラーでメッセージを1つずつ送信
  2. 非効率なメモリ使用: 頻繁なアロケーションとコピー
  3. シングルスレッド処理: 並列化の不足
  4. 文字列フォーマットのオーバーヘッド: QuestDB書き込み時
  5. コネクションプーリングの欠如: HTTP接続の再利用なし

2. パフォーマンス最適化戦略

2.1 並行処理の最適化

2.1.1 非同期Kafka送信の実装

// 現在の実装(非効率)
for msg in pending_messages.drain(..) {
    self.kafka_producer.send_orderbook(&msg).await?;
}

// 最適化された実装
use futures::future::join_all;

// バッチ内のメッセージを並行送信
let send_futures: Vec<_> = pending_messages
    .drain(..)
    .map(|msg| self.kafka_producer.send_orderbook(msg))
    .collect();

let results = join_all(send_futures).await;

// エラーハンドリング
for result in results {
    if let Err(e) = result {
        error!("Failed to send message: {}", e);
        // リトライロジックまたはDLQへの送信
    }
}

2.1.2 マルチスレッド処理の導入

use tokio::sync::mpsc;
use std::sync::Arc;

pub struct ParallelOrderbookProcessor {
    symbol_channels: HashMap<String, mpsc::Sender<OrderbookUpdate>>,
    worker_handles: Vec<tokio::task::JoinHandle<()>>,
}

impl ParallelOrderbookProcessor {
    pub fn new(symbols: Vec<String>, kafka_producer: Arc<KafkaProducer>) -> Self {
        let mut symbol_channels = HashMap::new();
        let mut worker_handles = Vec::new();

        for symbol in symbols {
            let (tx, rx) = mpsc::channel(1000);
            symbol_channels.insert(symbol.clone(), tx);

            // 各シンボル用の専用ワーカー
            let producer = kafka_producer.clone();
            let handle = tokio::spawn(async move {
                process_symbol_orderbook(symbol, rx, producer).await;
            });

            worker_handles.push(handle);
        }

        Self { symbol_channels, worker_handles }
    }

    pub async fn process_update(&self, symbol: &str, update: OrderbookUpdate) {
        if let Some(tx) = self.symbol_channels.get(symbol) {
            if let Err(e) = tx.send(update).await {
                error!("Failed to send update to worker: {}", e);
            }
        }
    }
}

async fn process_symbol_orderbook(
    symbol: String,
    mut rx: mpsc::Receiver<OrderbookUpdate>,
    producer: Arc<KafkaProducer>,
) {
    let mut batch = Vec::with_capacity(100);
    let mut interval = tokio::time::interval(Duration::from_millis(100));

    loop {
        tokio::select! {
            Some(update) = rx.recv() => {
                batch.push(update);
                if batch.len() >= 100 {
                    send_batch(&producer, &symbol, &mut batch).await;
                }
            }
            _ = interval.tick() => {
                if !batch.is_empty() {
                    send_batch(&producer, &symbol, &mut batch).await;
                }
            }
        }
    }
}

2.2 メモリ効率の改善

2.2.1 ゼロコピー最適化

use bytes::Bytes;
use serde::Deserialize;

// ゼロコピーデシリアライゼーション
#[derive(Deserialize)]
pub struct OrderbookUpdateBorrowed<'a> {
    #[serde(borrow)]
    pub symbol: &'a str,
    pub bids: Vec<(Decimal, Decimal)>,
    pub asks: Vec<(Decimal, Decimal)>,
    pub timestamp: i64,
}

// メモリプールの実装
pub struct MessagePool {
    pool: Vec<Vec<u8>>,
    max_size: usize,
}

impl MessagePool {
    pub fn new(capacity: usize) -> Self {
        Self {
            pool: Vec::with_capacity(capacity),
            max_size: capacity,
        }
    }

    pub fn acquire(&mut self) -> Vec<u8> {
        self.pool.pop().unwrap_or_else(|| Vec::with_capacity(4096))
    }

    pub fn release(&mut self, mut buffer: Vec<u8>) {
        if self.pool.len() < self.max_size {
            buffer.clear();
            self.pool.push(buffer);
        }
    }
}

2.2.2 効率的なデータ構造

use parking_lot::RwLock;
use dashmap::DashMap;

// ロックフリーなオーダーブック管理
pub struct OptimizedOrderbookManager {
    // DashMapによる並行アクセス最適化
    orderbooks: DashMap<String, Arc<RwLock<Orderbook>>>,
    // 事前割り当てされたバッファプール
    buffer_pool: Arc<Mutex<MessagePool>>,
}

impl OptimizedOrderbookManager {
    pub async fn update_orderbook(&self, symbol: &str, update: OrderbookUpdate) {
        let orderbook = self.orderbooks
            .entry(symbol.to_string())
            .or_insert_with(|| Arc::new(RwLock::new(Orderbook::new())));

        // 読み取りロックで現在の状態を確認
        let needs_update = {
            let book = orderbook.read();
            book.sequence < update.sequence
        };

        // 必要な場合のみ書き込みロック
        if needs_update {
            let mut book = orderbook.write();
            book.apply_update(update);
        }
    }
}

2.3 I/O最適化

2.3.1 バッチ処理の改善

use std::time::Duration;

pub struct OptimizedBatchProcessor {
    batch_size: usize,
    max_latency: Duration,
    compression_threshold: usize,
}

impl OptimizedBatchProcessor {
    pub async fn process_batch(&mut self, messages: Vec<OrderbookMessage>) -> Result<()> {
        // サイズに基づいて圧縮を判断
        let should_compress = messages.len() > self.compression_threshold;

        if should_compress {
            let compressed = self.compress_batch(&messages)?;
            self.send_compressed_batch(compressed).await?;
        } else {
            // 小さいバッチは非圧縮で送信
            self.send_raw_batch(messages).await?;
        }

        Ok(())
    }

    fn compress_batch(&self, messages: &[OrderbookMessage]) -> Result<Vec<u8>> {
        use flate2::Compression;
        use flate2::write::GzEncoder;

        let serialized = bincode::serialize(messages)?;
        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(&serialized)?;
        Ok(encoder.finish()?)
    }
}

2.3.2 QuestDBへの効率的な書き込み

use hyper::{Body, Client, Request};
use hyper::client::HttpConnector;
use bb8::{Pool, PooledConnection};

// コネクションプーリングの実装
pub struct QuestDBConnectionPool {
    pool: Pool<QuestDBConnectionManager>,
}

impl QuestDBConnectionPool {
    pub async fn new(config: QuestDBConfig) -> Result<Self> {
        let manager = QuestDBConnectionManager::new(config);
        let pool = Pool::builder()
            .max_size(10)
            .min_idle(Some(2))
            .build(manager)
            .await?;

        Ok(Self { pool })
    }

    pub async fn write_batch(&self, data: Vec<String>) -> Result<()> {
        let mut conn = self.pool.get().await?;

        // バッファリングされた書き込み
        let mut buffer = String::with_capacity(data.iter().map(|s| s.len()).sum());
        for line in data {
            buffer.push_str(&line);
            buffer.push('\n');
        }

        conn.write_ilp(buffer).await
    }
}

// 効率的なILPフォーマッティング
pub struct ILPFormatter {
    buffer: String,
    timestamp_cache: HashMap<i64, String>,
}

impl ILPFormatter {
    pub fn format_orderbook(&mut self, msg: &OrderbookKafkaMessage) -> &str {
        self.buffer.clear();

        // 測定名とタグ
        write!(&mut self.buffer, "orderbook,exchange={},symbol={}", 
            msg.exchange, msg.symbol).unwrap();

        // フィールド値(事前計算された値を使用)
        write!(&mut self.buffer, " bid={},ask={},spread={},mid_price={}",
            msg.best_bid_price, msg.best_ask_price, 
            msg.spread, msg.mid_price).unwrap();

        // タイムスタンプ(キャッシュを活用)
        let ts_str = self.timestamp_cache
            .entry(msg.timestamp)
            .or_insert_with(|| format!("{}", msg.timestamp));
        write!(&mut self.buffer, " {}", ts_str).unwrap();

        &self.buffer
    }
}

2.4 CPU最適化

2.4.1 SIMD命令を使用した計算高速化

use packed_simd::f64x4;

pub struct SimdOrderbookCalculator {
    pub fn calculate_vwap_simd(&self, levels: &[(f64, f64)]) -> f64 {
        let mut price_volume_sum = 0.0;
        let mut volume_sum = 0.0;

        // 4要素ずつSIMDで処理
        let chunks = levels.chunks_exact(4);
        let remainder = chunks.remainder();

        for chunk in chunks {
            let prices = f64x4::new(
                chunk[0].0, chunk[1].0, chunk[2].0, chunk[3].0
            );
            let volumes = f64x4::new(
                chunk[0].1, chunk[1].1, chunk[2].1, chunk[3].1
            );

            let pv = prices * volumes;
            price_volume_sum += pv.sum();
            volume_sum += volumes.sum();
        }

        // 残りの要素を処理
        for &(price, volume) in remainder {
            price_volume_sum += price * volume;
            volume_sum += volume;
        }

        price_volume_sum / volume_sum
    }
}

2.4.2 キャッシュ効率の改善

#[repr(align(64))] // キャッシュライン境界に配置
pub struct CacheAlignedOrderbook {
    // ホットデータをまとめて配置
    best_bid: Decimal,
    best_ask: Decimal,
    timestamp: i64,
    _padding1: [u8; 16], // パディングでfalse sharingを防ぐ

    // コールドデータ
    bids: BTreeMap<Decimal, Decimal>,
    asks: BTreeMap<Decimal, Decimal>,
    sequence: u64,
}

// CPUアフィニティの設定
pub fn set_cpu_affinity(cpu_id: usize) -> Result<()> {
    use core_affinity::CoreId;

    let core_ids = core_affinity::get_core_ids()
        .ok_or_else(|| anyhow!("Failed to get core IDs"))?;

    if let Some(core_id) = core_ids.get(cpu_id) {
        core_affinity::set_for_current(*core_id);
        info!("Set CPU affinity to core {}", cpu_id);
    }

    Ok(())
}

2.5 設定の最適化

2.5.1 Kafkaパフォーマンスチューニング

# kafka_config.toml
[producer]
# バッチング設定
batch_size = 65536  # 64KB(デフォルト16KBから増加)
linger_ms = 5      # 5ms(デフォルト10msから削減)
compression_type = "lz4"  # より高速な圧縮アルゴリズム

# バッファ設定
buffer_memory = 67108864  # 64MB
max_in_flight_requests_per_connection = 5

# パフォーマンス設定
acks = 1  # リーダーのみの確認で高速化
enable_idempotence = true

[consumer]
# フェッチ設定
fetch_min_bytes = 50000  # 50KB
fetch_max_wait_ms = 100  # 100ms
max_poll_records = 1000  # バッチサイズ増加

# パフォーマンス設定
enable_auto_commit = false  # 手動コミットで制御
session_timeout_ms = 30000

2.5.2 システムレベルの最適化

// システム設定の最適化
pub fn optimize_system_settings() -> Result<()> {
    // TCPバッファサイズの増加
    std::process::Command::new("sysctl")
        .args(&["-w", "net.core.rmem_max=134217728"])
        .output()?;

    std::process::Command::new("sysctl")
        .args(&["-w", "net.core.wmem_max=134217728"])
        .output()?;

    // ファイルディスクリプタ制限の増加
    let rlimit = libc::rlimit {
        rlim_cur: 65536,
        rlim_max: 65536,
    };
    unsafe {
        libc::setrlimit(libc::RLIMIT_NOFILE, &rlimit);
    }

    Ok(())
}

3. パフォーマンスモニタリング

3.1 メトリクス収集の改善

use prometheus::{Counter, Histogram, HistogramOpts, Registry};
use once_cell::sync::Lazy;

pub struct PerformanceMetrics {
    pub orderbook_updates: Counter,
    pub processing_latency: Histogram,
    pub batch_size: Histogram,
    pub kafka_send_latency: Histogram,
    pub questdb_write_latency: Histogram,
}

static METRICS: Lazy<PerformanceMetrics> = Lazy::new(|| {
    let registry = Registry::new();

    PerformanceMetrics {
        orderbook_updates: Counter::new("orderbook_updates_total", "Total orderbook updates")
            .unwrap(),
        processing_latency: Histogram::with_opts(
            HistogramOpts::new("processing_latency_seconds", "Processing latency")
                .buckets(vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0])
        ).unwrap(),
        batch_size: Histogram::with_opts(
            HistogramOpts::new("batch_size", "Batch size distribution")
                .buckets(vec![10.0, 50.0, 100.0, 500.0, 1000.0])
        ).unwrap(),
        kafka_send_latency: Histogram::with_opts(
            HistogramOpts::new("kafka_send_latency_seconds", "Kafka send latency")
                .buckets(vec![0.001, 0.005, 0.01, 0.05, 0.1])
        ).unwrap(),
        questdb_write_latency: Histogram::with_opts(
            HistogramOpts::new("questdb_write_latency_seconds", "QuestDB write latency")
                .buckets(vec![0.01, 0.05, 0.1, 0.5, 1.0])
        ).unwrap(),
    }
});

// 使用例
pub async fn process_with_metrics(update: OrderbookUpdate) {
    let start = Instant::now();

    // 処理実行
    process_update(update).await;

    // メトリクス記録
    METRICS.orderbook_updates.inc();
    METRICS.processing_latency.observe(start.elapsed().as_secs_f64());
}

3.2 プロファイリングとベンチマーク

#[cfg(test)]
mod benchmarks {
    use super::*;
    use criterion::{black_box, criterion_group, criterion_main, Criterion};

    fn bench_orderbook_update(c: &mut Criterion) {
        let mut orderbook = Orderbook::new();
        let update = create_sample_update();

        c.bench_function("orderbook_update", |b| {
            b.iter(|| {
                orderbook.apply_update(black_box(&update));
            });
        });
    }

    fn bench_metrics_calculation(c: &mut Criterion) {
        let orderbook = create_sample_orderbook();

        c.bench_function("calculate_metrics", |b| {
            b.iter(|| {
                black_box(calculate_orderbook_metrics(&orderbook));
            });
        });
    }

    criterion_group!(benches, bench_orderbook_update, bench_metrics_calculation);
    criterion_main!(benches);
}

4. 実装優先順位とロードマップ

Phase 1: 即効性の高い最適化(1-2週間)

  1. 非同期Kafka送信の実装 - 最大50%のスループット向上
  2. バッチサイズの調整 - 設定変更のみで10-20%改善
  3. コネクションプーリング - QuestDB書き込みの20-30%高速化

Phase 2: 中期的な改善(2-4週間)

  1. 並列処理の導入 - マルチスレッド化で2-3倍のスループット
  2. メモリプールの実装 - GC圧力の削減
  3. 効率的なシリアライゼーション - MessagePackやbincodeの導入

Phase 3: 長期的な最適化(1-2ヶ月)

  1. SIMD最適化 - 計算処理の2-4倍高速化
  2. カスタムメモリアロケータ - jemallocの導入
  3. ゼロコピー実装 - 完全なゼロコピーパイプライン

5. パフォーマンステスト結果の期待値

現在の実装と最適化後の期待されるパフォーマンス比較:

メトリクス 現在 最適化後 改善率
スループット(msg/sec) 10,000 50,000+ 5倍
平均レイテンシー 10ms 2ms 80%削減
CPU使用率 60% 30% 50%削減
メモリ使用量 2GB 1GB 50%削減
GCポーズ時間 50ms 5ms 90%削減

6. トラブルシューティング

6.1 一般的な問題と解決策

  1. 高レイテンシー
    - Kafkaのバッチ設定を確認
    - ネットワーク遅延を測定
    - GCログを分析

  2. メモリリーク
    - valgrindheaptrackでプロファイリング
    - 循環参照のチェック
    - バッファプールのサイズ確認

  3. CPU飽和
    - perfでホットスポット特定
    - 並列度の調整
    - SIMD最適化の適用

6.2 モニタリングダッシュボード

Grafanaダッシュボードで以下のメトリクスを監視:
- スループット(messages/sec)
- レイテンシー分布(p50, p95, p99)
- エラー率
- リソース使用率(CPU、メモリ、ネットワーク)
- Kafkaラグ
- QuestDB書き込み成功率

まとめ

本ドキュメントで提案した最適化を段階的に実装することで、Hyperliquid Orderbookシステムのパフォーマンスを大幅に向上させることができます。特に、並行処理の改善とI/O最適化は即効性が高く、実装も比較的容易です。継続的なモニタリングとプロファイリングを行いながら、システムの要求に応じて最適化を進めることが重要です。