目次
Hyperliquid Orderbookシステムのパフォーマンス最適化ガイド
概要
本ドキュメントでは、Hyperliquid orderbook-crawlerとorderbook-consumerのパフォーマンスを向上させるための具体的な施策について解説します。現在の実装の分析結果に基づき、システム全体のスループット、レイテンシー、リソース効率を改善する方法を提案します。
1. 現在のアーキテクチャと性能ボトルネック
1.1 システム構成
[Hyperliquid WebSocket] → [Orderbook Crawler] → [Kafka] → [Orderbook Consumer] → [QuestDB]
- Orderbook Crawler: WebSocketからリアルタイムでオーダーブックデータを取得
- Kafka: メッセージングレイヤーとしてデータを中継
- Orderbook Consumer: Kafkaからデータを消費し、QuestDBに保存
- QuestDB: 時系列データベースとして最終的なデータストア
1.2 現在の性能ボトルネック
- 同期的なKafka送信: クローラーでメッセージを1つずつ送信
- 非効率なメモリ使用: 頻繁なアロケーションとコピー
- シングルスレッド処理: 並列化の不足
- 文字列フォーマットのオーバーヘッド: QuestDB書き込み時
- コネクションプーリングの欠如: HTTP接続の再利用なし
2. パフォーマンス最適化戦略
2.1 並行処理の最適化
2.1.1 非同期Kafka送信の実装
// 現在の実装(非効率)
for msg in pending_messages.drain(..) {
self.kafka_producer.send_orderbook(&msg).await?;
}
// 最適化された実装
use futures::future::join_all;
// バッチ内のメッセージを並行送信
let send_futures: Vec<_> = pending_messages
.drain(..)
.map(|msg| self.kafka_producer.send_orderbook(msg))
.collect();
let results = join_all(send_futures).await;
// エラーハンドリング
for result in results {
if let Err(e) = result {
error!("Failed to send message: {}", e);
// リトライロジックまたはDLQへの送信
}
}
2.1.2 マルチスレッド処理の導入
use tokio::sync::mpsc;
use std::sync::Arc;
pub struct ParallelOrderbookProcessor {
symbol_channels: HashMap<String, mpsc::Sender<OrderbookUpdate>>,
worker_handles: Vec<tokio::task::JoinHandle<()>>,
}
impl ParallelOrderbookProcessor {
pub fn new(symbols: Vec<String>, kafka_producer: Arc<KafkaProducer>) -> Self {
let mut symbol_channels = HashMap::new();
let mut worker_handles = Vec::new();
for symbol in symbols {
let (tx, rx) = mpsc::channel(1000);
symbol_channels.insert(symbol.clone(), tx);
// 各シンボル用の専用ワーカー
let producer = kafka_producer.clone();
let handle = tokio::spawn(async move {
process_symbol_orderbook(symbol, rx, producer).await;
});
worker_handles.push(handle);
}
Self { symbol_channels, worker_handles }
}
pub async fn process_update(&self, symbol: &str, update: OrderbookUpdate) {
if let Some(tx) = self.symbol_channels.get(symbol) {
if let Err(e) = tx.send(update).await {
error!("Failed to send update to worker: {}", e);
}
}
}
}
async fn process_symbol_orderbook(
symbol: String,
mut rx: mpsc::Receiver<OrderbookUpdate>,
producer: Arc<KafkaProducer>,
) {
let mut batch = Vec::with_capacity(100);
let mut interval = tokio::time::interval(Duration::from_millis(100));
loop {
tokio::select! {
Some(update) = rx.recv() => {
batch.push(update);
if batch.len() >= 100 {
send_batch(&producer, &symbol, &mut batch).await;
}
}
_ = interval.tick() => {
if !batch.is_empty() {
send_batch(&producer, &symbol, &mut batch).await;
}
}
}
}
}
2.2 メモリ効率の改善
2.2.1 ゼロコピー最適化
use bytes::Bytes;
use serde::Deserialize;
// ゼロコピーデシリアライゼーション
#[derive(Deserialize)]
pub struct OrderbookUpdateBorrowed<'a> {
#[serde(borrow)]
pub symbol: &'a str,
pub bids: Vec<(Decimal, Decimal)>,
pub asks: Vec<(Decimal, Decimal)>,
pub timestamp: i64,
}
// メモリプールの実装
pub struct MessagePool {
pool: Vec<Vec<u8>>,
max_size: usize,
}
impl MessagePool {
pub fn new(capacity: usize) -> Self {
Self {
pool: Vec::with_capacity(capacity),
max_size: capacity,
}
}
pub fn acquire(&mut self) -> Vec<u8> {
self.pool.pop().unwrap_or_else(|| Vec::with_capacity(4096))
}
pub fn release(&mut self, mut buffer: Vec<u8>) {
if self.pool.len() < self.max_size {
buffer.clear();
self.pool.push(buffer);
}
}
}
2.2.2 効率的なデータ構造
use parking_lot::RwLock;
use dashmap::DashMap;
// ロックフリーなオーダーブック管理
pub struct OptimizedOrderbookManager {
// DashMapによる並行アクセス最適化
orderbooks: DashMap<String, Arc<RwLock<Orderbook>>>,
// 事前割り当てされたバッファプール
buffer_pool: Arc<Mutex<MessagePool>>,
}
impl OptimizedOrderbookManager {
pub async fn update_orderbook(&self, symbol: &str, update: OrderbookUpdate) {
let orderbook = self.orderbooks
.entry(symbol.to_string())
.or_insert_with(|| Arc::new(RwLock::new(Orderbook::new())));
// 読み取りロックで現在の状態を確認
let needs_update = {
let book = orderbook.read();
book.sequence < update.sequence
};
// 必要な場合のみ書き込みロック
if needs_update {
let mut book = orderbook.write();
book.apply_update(update);
}
}
}
2.3 I/O最適化
2.3.1 バッチ処理の改善
use std::time::Duration;
pub struct OptimizedBatchProcessor {
batch_size: usize,
max_latency: Duration,
compression_threshold: usize,
}
impl OptimizedBatchProcessor {
pub async fn process_batch(&mut self, messages: Vec<OrderbookMessage>) -> Result<()> {
// サイズに基づいて圧縮を判断
let should_compress = messages.len() > self.compression_threshold;
if should_compress {
let compressed = self.compress_batch(&messages)?;
self.send_compressed_batch(compressed).await?;
} else {
// 小さいバッチは非圧縮で送信
self.send_raw_batch(messages).await?;
}
Ok(())
}
fn compress_batch(&self, messages: &[OrderbookMessage]) -> Result<Vec<u8>> {
use flate2::Compression;
use flate2::write::GzEncoder;
let serialized = bincode::serialize(messages)?;
let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
encoder.write_all(&serialized)?;
Ok(encoder.finish()?)
}
}
2.3.2 QuestDBへの効率的な書き込み
use hyper::{Body, Client, Request};
use hyper::client::HttpConnector;
use bb8::{Pool, PooledConnection};
// コネクションプーリングの実装
pub struct QuestDBConnectionPool {
pool: Pool<QuestDBConnectionManager>,
}
impl QuestDBConnectionPool {
pub async fn new(config: QuestDBConfig) -> Result<Self> {
let manager = QuestDBConnectionManager::new(config);
let pool = Pool::builder()
.max_size(10)
.min_idle(Some(2))
.build(manager)
.await?;
Ok(Self { pool })
}
pub async fn write_batch(&self, data: Vec<String>) -> Result<()> {
let mut conn = self.pool.get().await?;
// バッファリングされた書き込み
let mut buffer = String::with_capacity(data.iter().map(|s| s.len()).sum());
for line in data {
buffer.push_str(&line);
buffer.push('\n');
}
conn.write_ilp(buffer).await
}
}
// 効率的なILPフォーマッティング
pub struct ILPFormatter {
buffer: String,
timestamp_cache: HashMap<i64, String>,
}
impl ILPFormatter {
pub fn format_orderbook(&mut self, msg: &OrderbookKafkaMessage) -> &str {
self.buffer.clear();
// 測定名とタグ
write!(&mut self.buffer, "orderbook,exchange={},symbol={}",
msg.exchange, msg.symbol).unwrap();
// フィールド値(事前計算された値を使用)
write!(&mut self.buffer, " bid={},ask={},spread={},mid_price={}",
msg.best_bid_price, msg.best_ask_price,
msg.spread, msg.mid_price).unwrap();
// タイムスタンプ(キャッシュを活用)
let ts_str = self.timestamp_cache
.entry(msg.timestamp)
.or_insert_with(|| format!("{}", msg.timestamp));
write!(&mut self.buffer, " {}", ts_str).unwrap();
&self.buffer
}
}
2.4 CPU最適化
2.4.1 SIMD命令を使用した計算高速化
use packed_simd::f64x4;
pub struct SimdOrderbookCalculator {
pub fn calculate_vwap_simd(&self, levels: &[(f64, f64)]) -> f64 {
let mut price_volume_sum = 0.0;
let mut volume_sum = 0.0;
// 4要素ずつSIMDで処理
let chunks = levels.chunks_exact(4);
let remainder = chunks.remainder();
for chunk in chunks {
let prices = f64x4::new(
chunk[0].0, chunk[1].0, chunk[2].0, chunk[3].0
);
let volumes = f64x4::new(
chunk[0].1, chunk[1].1, chunk[2].1, chunk[3].1
);
let pv = prices * volumes;
price_volume_sum += pv.sum();
volume_sum += volumes.sum();
}
// 残りの要素を処理
for &(price, volume) in remainder {
price_volume_sum += price * volume;
volume_sum += volume;
}
price_volume_sum / volume_sum
}
}
2.4.2 キャッシュ効率の改善
#[repr(align(64))] // キャッシュライン境界に配置
pub struct CacheAlignedOrderbook {
// ホットデータをまとめて配置
best_bid: Decimal,
best_ask: Decimal,
timestamp: i64,
_padding1: [u8; 16], // パディングでfalse sharingを防ぐ
// コールドデータ
bids: BTreeMap<Decimal, Decimal>,
asks: BTreeMap<Decimal, Decimal>,
sequence: u64,
}
// CPUアフィニティの設定
pub fn set_cpu_affinity(cpu_id: usize) -> Result<()> {
use core_affinity::CoreId;
let core_ids = core_affinity::get_core_ids()
.ok_or_else(|| anyhow!("Failed to get core IDs"))?;
if let Some(core_id) = core_ids.get(cpu_id) {
core_affinity::set_for_current(*core_id);
info!("Set CPU affinity to core {}", cpu_id);
}
Ok(())
}
2.5 設定の最適化
2.5.1 Kafkaパフォーマンスチューニング
# kafka_config.toml
[producer]
# バッチング設定
batch_size = 65536 # 64KB(デフォルト16KBから増加)
linger_ms = 5 # 5ms(デフォルト10msから削減)
compression_type = "lz4" # より高速な圧縮アルゴリズム
# バッファ設定
buffer_memory = 67108864 # 64MB
max_in_flight_requests_per_connection = 5
# パフォーマンス設定
acks = 1 # リーダーのみの確認で高速化
enable_idempotence = true
[consumer]
# フェッチ設定
fetch_min_bytes = 50000 # 50KB
fetch_max_wait_ms = 100 # 100ms
max_poll_records = 1000 # バッチサイズ増加
# パフォーマンス設定
enable_auto_commit = false # 手動コミットで制御
session_timeout_ms = 30000
2.5.2 システムレベルの最適化
// システム設定の最適化
pub fn optimize_system_settings() -> Result<()> {
// TCPバッファサイズの増加
std::process::Command::new("sysctl")
.args(&["-w", "net.core.rmem_max=134217728"])
.output()?;
std::process::Command::new("sysctl")
.args(&["-w", "net.core.wmem_max=134217728"])
.output()?;
// ファイルディスクリプタ制限の増加
let rlimit = libc::rlimit {
rlim_cur: 65536,
rlim_max: 65536,
};
unsafe {
libc::setrlimit(libc::RLIMIT_NOFILE, &rlimit);
}
Ok(())
}
3. パフォーマンスモニタリング
3.1 メトリクス収集の改善
use prometheus::{Counter, Histogram, HistogramOpts, Registry};
use once_cell::sync::Lazy;
pub struct PerformanceMetrics {
pub orderbook_updates: Counter,
pub processing_latency: Histogram,
pub batch_size: Histogram,
pub kafka_send_latency: Histogram,
pub questdb_write_latency: Histogram,
}
static METRICS: Lazy<PerformanceMetrics> = Lazy::new(|| {
let registry = Registry::new();
PerformanceMetrics {
orderbook_updates: Counter::new("orderbook_updates_total", "Total orderbook updates")
.unwrap(),
processing_latency: Histogram::with_opts(
HistogramOpts::new("processing_latency_seconds", "Processing latency")
.buckets(vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0])
).unwrap(),
batch_size: Histogram::with_opts(
HistogramOpts::new("batch_size", "Batch size distribution")
.buckets(vec![10.0, 50.0, 100.0, 500.0, 1000.0])
).unwrap(),
kafka_send_latency: Histogram::with_opts(
HistogramOpts::new("kafka_send_latency_seconds", "Kafka send latency")
.buckets(vec![0.001, 0.005, 0.01, 0.05, 0.1])
).unwrap(),
questdb_write_latency: Histogram::with_opts(
HistogramOpts::new("questdb_write_latency_seconds", "QuestDB write latency")
.buckets(vec![0.01, 0.05, 0.1, 0.5, 1.0])
).unwrap(),
}
});
// 使用例
pub async fn process_with_metrics(update: OrderbookUpdate) {
let start = Instant::now();
// 処理実行
process_update(update).await;
// メトリクス記録
METRICS.orderbook_updates.inc();
METRICS.processing_latency.observe(start.elapsed().as_secs_f64());
}
3.2 プロファイリングとベンチマーク
#[cfg(test)]
mod benchmarks {
use super::*;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
fn bench_orderbook_update(c: &mut Criterion) {
let mut orderbook = Orderbook::new();
let update = create_sample_update();
c.bench_function("orderbook_update", |b| {
b.iter(|| {
orderbook.apply_update(black_box(&update));
});
});
}
fn bench_metrics_calculation(c: &mut Criterion) {
let orderbook = create_sample_orderbook();
c.bench_function("calculate_metrics", |b| {
b.iter(|| {
black_box(calculate_orderbook_metrics(&orderbook));
});
});
}
criterion_group!(benches, bench_orderbook_update, bench_metrics_calculation);
criterion_main!(benches);
}
4. 実装優先順位とロードマップ
Phase 1: 即効性の高い最適化(1-2週間)
- 非同期Kafka送信の実装 - 最大50%のスループット向上
- バッチサイズの調整 - 設定変更のみで10-20%改善
- コネクションプーリング - QuestDB書き込みの20-30%高速化
Phase 2: 中期的な改善(2-4週間)
- 並列処理の導入 - マルチスレッド化で2-3倍のスループット
- メモリプールの実装 - GC圧力の削減
- 効率的なシリアライゼーション - MessagePackやbincodeの導入
Phase 3: 長期的な最適化(1-2ヶ月)
- SIMD最適化 - 計算処理の2-4倍高速化
- カスタムメモリアロケータ - jemallocの導入
- ゼロコピー実装 - 完全なゼロコピーパイプライン
5. パフォーマンステスト結果の期待値
現在の実装と最適化後の期待されるパフォーマンス比較:
| メトリクス | 現在 | 最適化後 | 改善率 |
|---|---|---|---|
| スループット(msg/sec) | 10,000 | 50,000+ | 5倍 |
| 平均レイテンシー | 10ms | 2ms | 80%削減 |
| CPU使用率 | 60% | 30% | 50%削減 |
| メモリ使用量 | 2GB | 1GB | 50%削減 |
| GCポーズ時間 | 50ms | 5ms | 90%削減 |
6. トラブルシューティング
6.1 一般的な問題と解決策
-
高レイテンシー
- Kafkaのバッチ設定を確認
- ネットワーク遅延を測定
- GCログを分析 -
メモリリーク
-valgrindやheaptrackでプロファイリング
- 循環参照のチェック
- バッファプールのサイズ確認 -
CPU飽和
-perfでホットスポット特定
- 並列度の調整
- SIMD最適化の適用
6.2 モニタリングダッシュボード
Grafanaダッシュボードで以下のメトリクスを監視:
- スループット(messages/sec)
- レイテンシー分布(p50, p95, p99)
- エラー率
- リソース使用率(CPU、メモリ、ネットワーク)
- Kafkaラグ
- QuestDB書き込み成功率
まとめ
本ドキュメントで提案した最適化を段階的に実装することで、Hyperliquid Orderbookシステムのパフォーマンスを大幅に向上させることができます。特に、並行処理の改善とI/O最適化は即効性が高く、実装も比較的容易です。継続的なモニタリングとプロファイリングを行いながら、システムの要求に応じて最適化を進めることが重要です。