目次
tickデータから機械学習を行う際のベストプラクティス
目次
1. tickデータの特性と課題
1.1 不規則な時間間隔
- 課題: tickデータは取引が発生したタイミングでのみ記録されるため、時間間隔が不規則
- 影響: 従来の時系列分析手法が直接適用できない
- 対策: イベントドリブンなサンプリング手法の採用
1.2 データの偏り(活発な時間帯と閑散期)
- 特徴: 仮想通貨市場は24/7で稼働し、時間帯による活動量の差が大きい
- 問題点:
- 活発な時間帯では大量のノイズが含まれる
- 閑散期では情報が少なくシグナルが弱い
- 解決法: 市場活動に応じた動的サンプリング
1.3 ノイズとマイクロストラクチャー
- Bad ticks: ゼロまたは負の価格・数量は即座に除外
- Bid/Ask bounce: スプレッド以上の価格変動のみを有効とする
- ステルストレーディング: 小〜中規模の取引に情報が含まれることが多い
2. 時間サンプリング手法
2.1 Time Bars(時間バー)
# 固定時間間隔でサンプリング
# 欠点: 市場活動を無視し、統計的性質が劣る
def create_time_bars(ticks, interval='1min'):
return ticks.resample(interval).agg({
'price': 'ohlc',
'volume': 'sum'
})
2.2 Tick Bars(ティックバー)
# 固定数の取引後にバーを作成
def create_tick_bars(ticks, threshold=1000):
bars = []
current_bar = []
for tick in ticks:
current_bar.append(tick)
if len(current_bar) >= threshold:
bars.append(aggregate_bar(current_bar))
current_bar = []
return bars
2.3 Volume Bars(出来高バー)
# 固定出来高に達したらバーを作成
def create_volume_bars(ticks, volume_threshold=100):
bars = []
current_volume = 0
current_bar = []
for tick in ticks:
current_bar.append(tick)
current_volume += tick['volume']
if current_volume >= volume_threshold:
bars.append(aggregate_bar(current_bar))
current_volume = 0
current_bar = []
return bars
2.4 Dollar Bars(ドルバー)
# 固定金額分の取引後にバーを作成
# 最も安定した統計的性質を持つ
def create_dollar_bars(ticks, dollar_threshold=10000):
bars = []
current_value = 0
current_bar = []
for tick in ticks:
current_bar.append(tick)
current_value += tick['price'] * tick['volume']
if current_value >= dollar_threshold:
bars.append(aggregate_bar(current_bar))
current_value = 0
current_bar = []
return bars
2.5 Information-Driven Bars(情報駆動バー)
# 市場の不均衡を検出してサンプリング頻度を調整
def create_imbalance_bars(ticks, initial_threshold=1000):
bars = []
buy_volume = 0
sell_volume = 0
threshold = initial_threshold
for tick in ticks:
if tick['side'] == 'buy':
buy_volume += tick['volume']
else:
sell_volume += tick['volume']
imbalance = abs(buy_volume - sell_volume)
if imbalance >= threshold:
bars.append(create_bar_with_imbalance(tick, imbalance))
# 動的に閾値を調整
threshold = adjust_threshold(imbalance, threshold)
buy_volume = 0
sell_volume = 0
return bars
3. 特徴量エンジニアリング
3.1 マイクロストラクチャー特徴量
// 現在のプロジェクトの実装例
pub struct OrderbookMetrics {
pub spread_bps: Option<f64>,
pub depth_imbalance_ratio: Option<f64>,
pub bid_levels: usize,
pub ask_levels: usize,
pub total_bid_volume: Decimal,
pub total_ask_volume: Decimal,
pub weighted_mid_price: Option<Decimal>,
}
3.2 流動性指標
def calculate_liquidity_metrics(orderbook):
# Kyle's Lambda (価格インパクト)
price_impact = calculate_price_impact(orderbook, trade_size=1000)
# Amihudの非流動性指標
amihud_illiquidity = abs(returns) / dollar_volume
# 実効スプレッド
effective_spread = 2 * abs(trade_price - mid_price) / mid_price
# Market Depth Score
depth_score = total_liquidity / (spread * 1000)
return {
'price_impact': price_impact,
'amihud_illiquidity': amihud_illiquidity,
'effective_spread': effective_spread,
'depth_score': min(depth_score, 100) # 0-100に正規化
}
3.3 オーダーフロー不均衡
def calculate_order_flow_imbalance(trades, window=100):
# Volume Order Imbalance (VOI)
buy_volume = sum(t['volume'] for t in trades if t['side'] == 'buy')
sell_volume = sum(t['volume'] for t in trades if t['side'] == 'sell')
voi = (buy_volume - sell_volume) / (buy_volume + sell_volume)
# Trade Flow Imbalance (TFI)
# より価格変動の説明力が高い
weighted_buy = sum(t['volume'] * t['price'] for t in trades if t['side'] == 'buy')
weighted_sell = sum(t['volume'] * t['price'] for t in trades if t['side'] == 'sell')
tfi = (weighted_buy - weighted_sell) / (weighted_buy + weighted_sell)
return {
'volume_order_imbalance': voi,
'trade_flow_imbalance': tfi
}
3.4 価格インパクト指標
def calculate_vwap_metrics(orderbook, levels=5):
# 上位5レベルのVWAP計算
bid_vwap = calculate_vwap(orderbook['bids'][:levels])
ask_vwap = calculate_vwap(orderbook['asks'][:levels])
# インパクトコスト
mid_price = (orderbook['best_bid'] + orderbook['best_ask']) / 2
bid_impact = (mid_price - bid_vwap) / mid_price
ask_impact = (ask_vwap - mid_price) / mid_price
return {
'bid_vwap_5': bid_vwap,
'ask_vwap_5': ask_vwap,
'bid_impact_bps': bid_impact * 10000,
'ask_impact_bps': ask_impact * 10000
}
4. 機械学習における考慮事項
4.1 データの定常性
# ADF検定による定常性チェック
from statsmodels.tsa.stattools import adfuller
def check_stationarity(series):
result = adfuller(series)
return {
'is_stationary': result[1] < 0.05,
'p_value': result[1],
'critical_values': result[4]
}
# 非定常データの処理
def make_stationary(series):
# 対数差分変換
log_returns = np.log(series).diff().dropna()
# フラクショナル差分(メモリ効果を保持)
fractional_diff = fractional_difference(series, d=0.3)
return log_returns, fractional_diff
4.2 過学習対策
# Walk-Forward Analysis(推奨)
def walk_forward_validation(data, model, train_size=1000, test_size=100):
results = []
for i in range(train_size, len(data) - test_size, test_size):
# 訓練データ
train = data[i-train_size:i]
# テストデータ(未来のデータ)
test = data[i:i+test_size]
# モデル訓練
model.fit(train)
# 予測と評価
predictions = model.predict(test)
score = evaluate(test, predictions)
results.append(score)
return results
# Combinatorial Purged Cross-Validation (CPCV)
# 時系列の依存関係を考慮した高度な手法
def combinatorial_purged_cv(data, n_splits=5, purge_gap=10):
# 実装は複雑だが、従来のCV手法より優れた性能
pass
4.3 特徴量の正規化
# ローリングウィンドウによる正規化(lookback biasを防ぐ)
def rolling_normalization(features, window=1000):
normalized = pd.DataFrame(index=features.index)
for col in features.columns:
# 過去のデータのみを使用
rolling_mean = features[col].rolling(window, min_periods=100).mean()
rolling_std = features[col].rolling(window, min_periods=100).std()
# Z-score正規化
normalized[col] = (features[col] - rolling_mean) / rolling_std
return normalized
4.4 時系列交差検証
# Blocked Time Series Split
def blocked_time_series_split(data, n_splits=5, test_size=0.2, gap=0.05):
"""
ギャップを設けて情報漏洩を防ぐ
"""
splits = []
total_size = len(data)
test_length = int(total_size * test_size)
gap_length = int(total_size * gap)
for i in range(n_splits):
test_end = total_size - i * test_length
test_start = test_end - test_length
train_end = test_start - gap_length
if train_end <= test_length:
break
train_idx = range(0, train_end)
test_idx = range(test_start, test_end)
splits.append((train_idx, test_idx))
return splits
5. 実装上のテクニック
5.1 メモリ効率的な処理
// 現在のプロジェクトの実装例(Rust)
impl OrderbookHistory {
fn add_snapshot(&mut self, msg: &OrderbookKafkaMessage) {
// データを追加
self.spreads.push_back(spread_f64);
self.depths.push_back((bid_depth_5, ask_depth_5));
self.timestamps.push_back(msg.timestamp);
// 古いデータを削除(メモリ効率)
let cutoff = msg.timestamp - Duration::seconds(60);
while let Some(&front_time) = self.timestamps.front() {
if front_time < cutoff {
self.timestamps.pop_front();
self.spreads.pop_front();
self.depths.pop_front();
} else {
break;
}
}
}
}
5.2 リアルタイム特徴量計算
class RealTimeFeatureCalculator:
def __init__(self, window_sizes=[10, 30, 60, 300]):
self.window_sizes = window_sizes
self.buffers = {w: deque(maxlen=w) for w in window_sizes}
def update(self, tick):
# 各ウィンドウサイズでバッファを更新
for window, buffer in self.buffers.items():
buffer.append(tick)
# 特徴量を計算
features = {}
for window, buffer in self.buffers.items():
if len(buffer) >= window // 2: # 最小データ数
features[f'mean_{window}'] = np.mean([t['price'] for t in buffer])
features[f'std_{window}'] = np.std([t['price'] for t in buffer])
features[f'volume_{window}'] = sum(t['volume'] for t in buffer)
return features
5.3 バックテストにおける注意点
class RealisticBacktester:
def __init__(self, latency_ms=10, slippage_bps=5):
self.latency_ms = latency_ms
self.slippage_bps = slippage_bps
def execute_trade(self, signal_time, orderbook_history):
# レイテンシを考慮した実行時刻
execution_time = signal_time + pd.Timedelta(milliseconds=self.latency_ms)
# 実行時点のオーダーブック
execution_orderbook = orderbook_history.at[execution_time]
# スリッページを適用
if signal == 'buy':
execution_price = execution_orderbook['ask'] * (1 + self.slippage_bps / 10000)
else:
execution_price = execution_orderbook['bid'] * (1 - self.slippage_bps / 10000)
return execution_price
6. 具体的な実装例と改善提案
6.1 現在のプロジェクトへの改善提案
1. Dollar Barsの実装
// src/rust/hyperliquid-orderbook-consumer/src/dollar_bars.rs
use rust_decimal::Decimal;
use chrono::{DateTime, Utc};
pub struct DollarBarBuilder {
threshold: Decimal,
current_value: Decimal,
current_trades: Vec<Trade>,
}
impl DollarBarBuilder {
pub fn new(threshold: Decimal) -> Self {
Self {
threshold,
current_value: Decimal::ZERO,
current_trades: Vec::new(),
}
}
pub fn add_trade(&mut self, trade: Trade) -> Option<DollarBar> {
let trade_value = trade.price * trade.size;
self.current_value += trade_value;
self.current_trades.push(trade);
if self.current_value >= self.threshold {
let bar = self.create_bar();
self.reset();
Some(bar)
} else {
None
}
}
fn create_bar(&self) -> DollarBar {
// OHLCV計算
let open = self.current_trades.first().unwrap().price;
let close = self.current_trades.last().unwrap().price;
let high = self.current_trades.iter().map(|t| t.price).max().unwrap();
let low = self.current_trades.iter().map(|t| t.price).min().unwrap();
let volume: Decimal = self.current_trades.iter().map(|t| t.size).sum();
DollarBar {
timestamp: Utc::now(),
open,
high,
low,
close,
volume,
dollar_volume: self.current_value,
trade_count: self.current_trades.len(),
}
}
}
2. マイクロストラクチャー特徴量の拡張
// 現在のmetrics_calculator.rsに追加
impl MetricsCalculator {
pub fn calculate_microstructure_features(&self, orderbook: &OrderbookSnapshot) -> MicrostructureFeatures {
// Kyle's Lambda (価格インパクト)
let kyle_lambda = self.estimate_kyle_lambda(orderbook);
// Amihudの非流動性指標
let amihud_illiquidity = self.calculate_amihud(orderbook);
// 実効スプレッド
let effective_spread = self.calculate_effective_spread(orderbook);
// PIN (Probability of Informed Trading)
let pin = self.estimate_pin(orderbook);
MicrostructureFeatures {
kyle_lambda,
amihud_illiquidity,
effective_spread,
pin,
// 既存の特徴量も含める
spread_bps: orderbook.spread.map(|s| s.to_f64().unwrap() * 10000.0),
depth_imbalance: self.calculate_depth_imbalance(orderbook),
}
}
}
3. Walk-Forward検証システム
// src/rust/hyperliquid-orderbook-consumer/src/ml_validator.rs
pub struct WalkForwardValidator {
train_window: Duration,
test_window: Duration,
gap: Duration,
}
impl WalkForwardValidator {
pub fn validate<M: Model>(&self, data: &[OrderbookSnapshot], model: &M) -> ValidationResults {
let mut results = Vec::new();
let mut current_time = data[0].timestamp + self.train_window;
while current_time < data.last().unwrap().timestamp - self.test_window {
// 訓練データ
let train_start = current_time - self.train_window;
let train_end = current_time;
// ギャップ
let test_start = train_end + self.gap;
let test_end = test_start + self.test_window;
// データ分割
let train_data = self.filter_by_time(data, train_start, train_end);
let test_data = self.filter_by_time(data, test_start, test_end);
// モデル訓練と評価
let trained_model = model.train(&train_data);
let score = trained_model.evaluate(&test_data);
results.push(score);
current_time += self.test_window;
}
ValidationResults::from(results)
}
}
4. リアルタイム異常検知
// 現在のconsumer.rsに追加
impl OrderbookConsumer {
async fn detect_anomalies(&self, orderbook: &OrderbookSnapshot) -> Vec<Anomaly> {
let mut anomalies = Vec::new();
// スプレッドの異常
if let Some(spread_z_score) = self.calculate_spread_zscore(orderbook) {
if spread_z_score.abs() > 3.0 {
anomalies.push(Anomaly::SpreadAnomaly {
z_score: spread_z_score,
spread: orderbook.spread,
});
}
}
// 流動性の急激な変化
if let Some(liquidity_change) = self.detect_liquidity_shock(orderbook) {
if liquidity_change > 0.5 { // 50%以上の変化
anomalies.push(Anomaly::LiquidityShock {
change_ratio: liquidity_change,
side: if orderbook.bid_liquidity < orderbook.ask_liquidity { "bid" } else { "ask" },
});
}
}
// アラート送信
if !anomalies.is_empty() {
self.send_alert(anomalies).await;
}
anomalies
}
}
6.2 パフォーマンス最適化の提案
1. バッチ処理の最適化
// 動的バッチサイズ調整
impl DynamicBatcher {
pub fn calculate_optimal_batch_size(&self, throughput: f64, latency: Duration) -> usize {
// スループットとレイテンシのバランスを取る
let base_size = 1000;
let throughput_factor = (throughput / 10000.0).min(2.0);
let latency_factor = (50.0 / latency.as_millis() as f64).max(0.5);
(base_size as f64 * throughput_factor * latency_factor) as usize
}
}
2. 特徴量計算の並列化
use rayon::prelude::*;
impl FeatureCalculator {
pub fn calculate_features_parallel(&self, orderbooks: &[OrderbookSnapshot]) -> Vec<Features> {
orderbooks.par_iter()
.map(|orderbook| {
let microstructure = self.calculate_microstructure_features(orderbook);
let technical = self.calculate_technical_features(orderbook);
let flow = self.calculate_flow_features(orderbook);
Features {
microstructure,
technical,
flow,
timestamp: orderbook.timestamp,
}
})
.collect()
}
}
6.3 モニタリングダッシュボードの拡張
# src/web/ml_monitoring.py
class MLMonitoringDashboard:
def __init__(self):
self.feature_stats = {}
self.model_performance = {}
self.anomaly_history = deque(maxlen=1000)
def update_feature_distribution(self, features):
"""特徴量の分布をリアルタイムで追跡"""
for name, value in features.items():
if name not in self.feature_stats:
self.feature_stats[name] = {
'mean': 0,
'std': 0,
'min': float('inf'),
'max': float('-inf'),
'count': 0
}
stats = self.feature_stats[name]
# オンライン統計更新
stats['count'] += 1
delta = value - stats['mean']
stats['mean'] += delta / stats['count']
stats['std'] = np.sqrt((stats['std']**2 * (stats['count']-1) + delta**2) / stats['count'])
stats['min'] = min(stats['min'], value)
stats['max'] = max(stats['max'], value)
def detect_feature_drift(self, threshold=3.0):
"""特徴量のドリフトを検出"""
drifts = []
for name, stats in self.feature_stats.items():
if stats['count'] > 100: # 十分なサンプル数
# 最新の値と履歴統計を比較
z_score = abs((stats['last_value'] - stats['mean']) / stats['std'])
if z_score > threshold:
drifts.append({
'feature': name,
'z_score': z_score,
'direction': 'up' if stats['last_value'] > stats['mean'] else 'down'
})
return drifts
まとめ
tickデータから機械学習を行う際の重要なポイント:
- データサンプリング: Dollar Barsが最も安定した統計的性質を持つ
- 特徴量エンジニアリング: マイクロストラクチャー特徴量とオーダーフロー分析が重要
- 検証手法: Walk-Forward AnalysisまたはCPCVを使用
- リアルタイム処理: メモリ効率と計算効率を考慮した実装
- 異常検知: 市場の異常を早期に検出するシステムの構築
現在のプロジェクトは基本的な機能は実装されていますが、上記の改善提案を実装することで、より堅牢で高性能な機械学習システムを構築できます。