// Add to consumer/metrics.rs
pub static ref KAFKA_CONSUMER_LAG: GaugeVec = register_gauge_vec!(
"kafka_consumer_lag_messages",
"Number of messages behind the latest offset",
&["topic", "partition"]
).unwrap();
pub static ref BATCH_SIZE_HISTOGRAM: HistogramVec = register_histogram_vec!(
HistogramOpts::new(
"questdb_batch_size",
"Actual batch sizes sent to QuestDB"
)
.buckets(vec![1.0, 10.0, 50.0, 100.0, 500.0, 1000.0]),
&["data_type"]
).unwrap();
pub static ref DUPLICATE_MESSAGES: CounterVec = register_counter_vec!(
"data_duplicate_messages_total",
"Count of duplicate messages detected",
&["symbol", "message_type"]
).unwrap();
pub static ref VALIDATION_FAILURES: CounterVec = register_counter_vec!(
"data_validation_failures_total",
"Count of data validation failures",
&["symbol", "failure_type"]
).unwrap();
-- Trade statistics over the last five minutes, per 5-minute bucket and symbol.
-- NOTE(review): the WHERE clause is a rolling window while the buckets come
-- from timestamp_floor, so the result may contain two partially filled
-- buckets per symbol -- confirm this is intended.
CREATE VIEW trade_stats_5m AS
SELECT
    timestamp_floor('5m', timestamp) AS time_bucket,
    symbol,
    count(*) AS trade_count,
    sum(size) AS total_volume,
    -- Volume-weighted average price.
    sum(price * size) / sum(size) AS vwap,
    max(price) - min(price) AS price_range,
    -- CASE without ELSE yields NULL, which count() skips.
    count(CASE WHEN side = 'buy' THEN 1 END) AS buy_count,
    count(CASE WHEN side = 'sell' THEN 1 END) AS sell_count,
    -- Fraction of trades in the bucket whose size exceeds 10000.
    avg(CASE WHEN size > 10000 THEN 1 ELSE 0 END) AS large_trade_ratio
FROM trades
WHERE timestamp >= dateadd('m', -5, now())
GROUP BY time_bucket, symbol;
-- Anomaly-detection scores: compares each trade from the last five minutes
-- with its symbol's 24-hour price and size statistics.
CREATE VIEW anomaly_scores AS
WITH stats AS (
    -- Per-symbol mean and standard deviation over the trailing 24 hours.
    SELECT
        symbol,
        avg(price) AS avg_price,
        stddev(price) AS std_price,
        avg(size) AS avg_size,
        stddev(size) AS std_size
    FROM trades
    WHERE timestamp >= dateadd('h', -24, now())
    GROUP BY symbol
)
SELECT
    t.timestamp,
    t.symbol,
    t.price,
    t.size,
    -- Absolute z-scores. NULLIF turns a zero standard deviation (constant
    -- values in the window) into NULL so the division cannot divide by zero;
    -- the score is then NULL and the trade is classified as 'normal' below.
    abs(t.price - s.avg_price) / NULLIF(s.std_price, 0) AS price_z_score,
    abs(t.size - s.avg_size) / NULLIF(s.std_size, 0) AS size_z_score,
    -- Price spikes take precedence over volume spikes; the threshold is
    -- three standard deviations.
    CASE
        WHEN abs(t.price - s.avg_price) / NULLIF(s.std_price, 0) > 3 THEN 'price_spike'
        WHEN abs(t.size - s.avg_size) / NULLIF(s.std_size, 0) > 3 THEN 'volume_spike'
        ELSE 'normal'
    END AS anomaly_type
FROM trades t
JOIN stats s ON t.symbol = s.symbol
WHERE t.timestamp >= dateadd('m', -5, now());
# Add to the web analytics system
def calculate_market_quality_metrics():
    """Compute an overall market-quality score from four component metrics.

    Returns:
        dict with the keys:
            'effective_spread': value of calculate_effective_spread()
            'price_improvement_rate': value of calculate_price_improvement_rate()
            'depth_stability': value of calculate_depth_stability()
            'resilience_score': value of calculate_order_book_resilience()
            'overall_quality': weighted sum
                0.3 * (1 / effective_spread) + 0.3 * price_improvement_rate
                + 0.2 * depth_stability + 0.2 * resilience_score,
                or None when effective_spread is not positive (the inverse
                spread term is undefined there).
    """
    # Effective spread (based on actual execution prices)
    effective_spread = calculate_effective_spread()
    # Price improvement rate (share of fills at a price better than the mid price)
    price_improvement = calculate_price_improvement_rate()
    # Stability of market depth
    depth_stability = calculate_depth_stability()
    # Resilience score (recovery speed after large trades)
    resilience = calculate_order_book_resilience()

    if effective_spread > 0:
        # A tighter spread is better, so the spread enters as its inverse.
        overall_quality = (
            0.3 * (1 / effective_spread) +
            0.3 * price_improvement +
            0.2 * depth_stability +
            0.2 * resilience
        )
    else:
        # A zero spread would raise ZeroDivisionError and a negative one
        # would produce a meaningless score; report the composite as
        # unavailable while still returning the individual components.
        overall_quality = None

    return {
        'effective_spread': effective_spread,
        'price_improvement_rate': price_improvement,
        'depth_stability': depth_stability,
        'resilience_score': resilience,
        'overall_quality': overall_quality,
    }