目次
推奨追加メトリクス実装ガイド
優先度1: Crawlerで実装(即座に実装可能)
1. オーダーブック更新頻度
// metrics.rs に追加
pub static ref ORDERBOOK_UPDATE_RATE: GaugeVec = register_gauge_vec!(
"crypto_orderbook_updates_per_second",
"Orderbook update rate per symbol",
&["symbol"]
).unwrap();
効果: WebSocket接続の健全性とデータ品質を監視
2. 最良価格変化率
pub static ref BEST_PRICE_CHANGE_RATE: GaugeVec = register_gauge_vec!(
"crypto_best_price_change_percent",
"Best bid/ask price change rate",
&["symbol", "side"]
).unwrap();
効果: 市場のボラティリティをリアルタイム監視
3. メッセージサイズ統計
pub static ref MESSAGE_SIZE_BYTES: HistogramVec = register_histogram_vec!(
HistogramOpts::new(
"crypto_message_size_bytes",
"WebSocket message size distribution"
)
.buckets(vec![100.0, 500.0, 1000.0, 5000.0, 10000.0]),
&["message_type"]
).unwrap();
効果: ネットワーク帯域使用量の最適化
優先度2: Consumerで実装(バッチ処理で効率的)
4. 取引量集計(時間窓)
pub static ref TRADE_VOLUME_USD: GaugeVec = register_gauge_vec!(
"crypto_trade_volume_usd_total",
"Total trade volume in USD",
&["symbol", "window"]
).unwrap();
// 1分、5分、15分窓で集計
効果: 流動性とマーケット活動の把握
5. 大口取引検出
pub static ref LARGE_TRADE_COUNT: CounterVec = register_counter_vec!(
"crypto_large_trades_total",
"Count of large trades above threshold",
&["symbol", "size_category"] // small, medium, large, whale
).unwrap();
効果: マーケットインパクトの予測
6. 価格急変検出
pub static ref PRICE_SPIKE_EVENTS: CounterVec = register_counter_vec!(
"crypto_price_spike_events_total",
"Price movements exceeding threshold",
&["symbol", "direction", "severity"] // up/down, minor/major/extreme
).unwrap();
効果: アラート発火とリスク管理
優先度3: 後処理システムで実装
7. VWAP計算(5分ごと)
-- QuestDBで定期実行
INSERT INTO vwap_5m
SELECT
timestamp_floor('5m', timestamp) as time,
symbol,
sum(price * size) / sum(size) as vwap,
sum(size) as total_volume
FROM trades
WHERE timestamp >= dateadd('m', -5, now())
GROUP BY time, symbol;
8. 市場品質スコア
# Web分析システムで計算
def calculate_market_quality_score(symbol):
# スプレッド、深度、更新頻度を総合評価
spread_score = 1 / (1 + spread_percentage)
depth_score = log(market_depth_usd)
update_score = min(update_rate / 10, 1)
return (spread_score + depth_score + update_score) / 3
実装の指針
- Crawlerメトリクス: メッセージ受信時に即座に計算
- Consumerメトリクス: バッファリングして定期的に集計
- 後処理メトリクス: cronジョブやストリーミングSQLで計算
パフォーマンス影響
- Crawler追加負荷: < 1% CPU増加
- Consumer追加負荷: < 3% CPU増加(バッチ処理のため)
- メモリ影響: 最小限(ゲージとカウンターのみ)
実装例
Crawlerでの実装パターン
// crawler.rs の calculate_orderbook_metrics に追加
fn calculate_orderbook_metrics(&self, orderbook: &OrderbookSnapshot) {
// 既存のメトリクス...
// 更新頻度の追跡
let now = Instant::now();
if let Some(last_update) = self.last_update_time.get(&orderbook.symbol) {
let rate = 1.0 / now.duration_since(*last_update).as_secs_f64();
metrics::ORDERBOOK_UPDATE_RATE
.with_label_values(&[&orderbook.symbol])
.set(rate);
}
self.last_update_time.insert(orderbook.symbol.clone(), now);
}
Consumerでの実装パターン
// consumer.rs にウィンドウ集計を追加
struct VolumeAggregator {
windows: HashMap<String, HashMap<String, f64>>, // symbol -> window -> volume
last_flush: Instant,
}
impl VolumeAggregator {
fn add_trade(&mut self, symbol: &str, volume_usd: f64) {
for window in &["1m", "5m", "15m"] {
self.windows
.entry(symbol.to_string())
.or_insert_with(HashMap::new)
.entry(window.to_string())
.and_modify(|v| *v += volume_usd)
.or_insert(volume_usd);
}
}
fn flush_metrics(&mut self) {
if self.last_flush.elapsed() > Duration::from_secs(60) {
for (symbol, windows) in &self.windows {
for (window, volume) in windows {
metrics::TRADE_VOLUME_USD
.with_label_values(&[symbol, window])
.set(*volume);
}
}
self.windows.clear();
self.last_flush = Instant::now();
}
}
}
モニタリング効果
これらのメトリクスにより:
1. 市場の健全性: 更新頻度とメッセージサイズで接続品質を監視
2. 取引機会: 価格変動と大口取引でエントリーポイントを検出
3. リスク管理: 急激な価格変動とボラティリティを追跡
4. システム最適化: メッセージサイズとバッチ効率を改善