目次
非連続tickデータの機械学習ベストプラクティス
📊 概要
tickデータ(約定データ)は、従来のOHLCVデータと異なり、取引が発生した瞬間のみ記録される非連続・不規則な時系列データです。このドキュメントでは、このような特殊なデータから効果的に特徴量を計算し、機械学習を行うためのベストプラクティスをまとめます。
🔍 tickデータの特性と課題
OHLCVとの違い
| 特性 | OHLCV | Tick Data |
|---|---|---|
| 時間間隔 | 固定(1分、5分、1時間など) | 不規則(取引発生時のみ) |
| データ密度 | 一定 | 変動(活発な時間帯は密、閑散期は疎) |
| 情報量 | 集約済み | 生の取引情報 |
| ノイズ | 平滑化されている | マイクロストラクチャーノイズが存在 |
主な課題
-
不規則なタイムスタンプ
- 機械学習モデルが固定長の入力を期待する場合の問題
- 時間的パターンの抽出が困難 -
データの偏り
東京時間: 100 ticks/分 ロンドン時間: 500 ticks/分 NY時間: 1000 ticks/分 週末: 50 ticks/分 -
マイクロストラクチャーノイズ
- Bid-Askバウンス
- 誤った約定価格
- HFTによる極小取引
🎯 サンプリング手法の比較
1. Time Bars(時間ベース)
# 問題:閑散時は情報が少なく、活発時は情報を失う
def create_time_bars(ticks, interval='1min'):
return ticks.resample(interval).agg({
'price': ['first', 'high', 'low', 'last'],
'volume': 'sum'
})
2. Tick Bars(取引回数ベース)
# 改善:一定の取引回数でサンプリング
def create_tick_bars(ticks, tick_count=1000):
bars = []
for i in range(0, len(ticks), tick_count):
batch = ticks[i:i+tick_count]
bar = {
'timestamp': batch.iloc[-1]['timestamp'],
'open': batch.iloc[0]['price'],
'high': batch['price'].max(),
'low': batch['price'].min(),
'close': batch.iloc[-1]['price'],
'volume': batch['volume'].sum()
}
bars.append(bar)
return pd.DataFrame(bars)
3. Volume Bars(出来高ベース)
# より良い:市場活動に基づくサンプリング
def create_volume_bars(ticks, volume_threshold=100):
bars = []
current_volume = 0
batch = []
for tick in ticks.itertuples():
batch.append(tick)
current_volume += tick.volume
if current_volume >= volume_threshold:
bar = create_bar_from_batch(batch)
bars.append(bar)
batch = []
current_volume = 0
return pd.DataFrame(bars)
4. Dollar Bars(金額ベース)- 推奨
# 最良:価格変動を考慮した最も安定したサンプリング
def create_dollar_bars(ticks, dollar_threshold=1000000):
bars = []
current_dollars = 0
batch = []
for tick in ticks.itertuples():
batch.append(tick)
current_dollars += tick.price * tick.volume
if current_dollars >= dollar_threshold:
bar = create_bar_from_batch(batch)
bars.append(bar)
batch = []
current_dollars = 0
return pd.DataFrame(bars)
サンプリング手法の統計的性質
| 手法 | 正規性 | 定常性 | 自己相関 | 推奨度 |
|---|---|---|---|---|
| Time Bars | ❌ | ❌ | 高い | ⭐ |
| Tick Bars | ✅ | ✅ | 中程度 | ⭐⭐⭐ |
| Volume Bars | ✅ | ✅✅ | 低い | ⭐⭐⭐⭐ |
| Dollar Bars | ✅✅ | ✅✅✅ | 最低 | ⭐⭐⭐⭐⭐ |
🔧 特徴量エンジニアリング
1. マイクロストラクチャー特徴量
class MicrostructureFeatures:
@staticmethod
def kyle_lambda(ticks, window=100):
"""Kyle's Lambda: 価格インパクトの測定"""
price_changes = ticks['price'].diff()
signed_volume = ticks['volume'] * np.sign(ticks['side'])
# OLS回帰: Δp = λ * signed_volume + ε
lambda_coef = np.cov(price_changes[1:], signed_volume[1:])[0,1] / np.var(signed_volume[1:])
return lambda_coef
@staticmethod
def amihud_illiquidity(ticks, window=100):
"""Amihudの非流動性指標"""
returns = ticks['price'].pct_change()
dollar_volume = ticks['price'] * ticks['volume']
illiquidity = np.abs(returns) / dollar_volume
return illiquidity.rolling(window).mean()
@staticmethod
def roll_spread(ticks, window=100):
"""Roll実効スプレッド推定"""
price_changes = ticks['price'].diff()
cov_consecutive = price_changes.rolling(window).apply(
lambda x: np.cov(x[:-1], x[1:])[0,1]
)
roll_spread = 2 * np.sqrt(-cov_consecutive)
return roll_spread
2. Information-Driven特徴量
class InformationFeatures:
@staticmethod
def vpin(ticks, volume_bucket_size=50):
"""Volume-Synchronized PIN (情報トレーダーの確率)"""
# Volume bucketsの作成
buckets = create_volume_buckets(ticks, volume_bucket_size)
# Buy/Sell volumeの分類
buy_volume = []
sell_volume = []
for bucket in buckets:
# Lee-Ready アルゴリズムで分類
mid_price = (bucket['bid'] + bucket['ask']) / 2
buy_vol = bucket[bucket['price'] > mid_price]['volume'].sum()
sell_vol = bucket[bucket['price'] <= mid_price]['volume'].sum()
buy_volume.append(buy_vol)
sell_volume.append(sell_vol)
# VPINの計算
order_imbalance = np.abs(np.array(buy_volume) - np.array(sell_volume))
total_volume = np.array(buy_volume) + np.array(sell_volume)
vpin = order_imbalance / total_volume
return pd.Series(vpin).rolling(50).mean()
3. 時間認識特徴量
class TimeAwareFeatures:
@staticmethod
def time_since_last_tick(ticks):
"""前回のtickからの経過時間(秒)"""
timestamps = pd.to_datetime(ticks['timestamp'])
time_diff = timestamps.diff().dt.total_seconds()
return time_diff.fillna(0)
@staticmethod
def tick_intensity(ticks, window='1min'):
"""時間あたりのtick数(情報到着率)"""
tick_counts = ticks.set_index('timestamp').resample(window).size()
return tick_counts.reindex(ticks['timestamp'], method='ffill')
@staticmethod
def weighted_price_by_time(ticks, decay_factor=0.99):
"""時間減衰を考慮した加重価格"""
time_diff = TimeAwareFeatures.time_since_last_tick(ticks)
weights = decay_factor ** time_diff
weighted_price = (ticks['price'] * weights).rolling(100).sum() / weights.rolling(100).sum()
return weighted_price
🤖 機械学習での考慮事項
1. データの前処理
def preprocess_tick_data(ticks):
# 1. 異常値の除去
price_zscore = np.abs(stats.zscore(ticks['price']))
ticks = ticks[price_zscore < 5]
# 2. フラクショナル差分(定常性の確保)
from fracdiff import fdiff
ticks['price_stationary'] = fdiff(ticks['price'], d=0.3)
# 3. 対数変換(分散安定化)
ticks['log_price'] = np.log(ticks['price'])
ticks['log_volume'] = np.log(ticks['volume'] + 1)
return ticks
2. 特徴量の正規化
class TickDataScaler:
def __init__(self):
self.price_scaler = StandardScaler()
self.volume_scaler = RobustScaler() # 外れ値に強い
self.time_scaler = MinMaxScaler()
def fit_transform(self, ticks):
# 価格関連:標準化
price_features = ['price', 'kyle_lambda', 'roll_spread']
ticks[price_features] = self.price_scaler.fit_transform(ticks[price_features])
# ボリューム関連:ロバストスケーリング
volume_features = ['volume', 'dollar_volume', 'vpin']
ticks[volume_features] = self.volume_scaler.fit_transform(ticks[volume_features])
# 時間関連:0-1正規化
time_features = ['time_since_last_tick', 'tick_intensity']
ticks[time_features] = self.time_scaler.fit_transform(ticks[time_features])
return ticks
3. 時系列交差検証
class WalkForwardValidator:
def __init__(self, train_window=10000, test_window=1000, step=1000):
self.train_window = train_window
self.test_window = test_window
self.step = step
def split(self, ticks):
n_ticks = len(ticks)
for start in range(0, n_ticks - self.train_window - self.test_window, self.step):
train_end = start + self.train_window
test_end = train_end + self.test_window
train_idx = range(start, train_end)
test_idx = range(train_end, test_end)
yield train_idx, test_idx
💡 実装のベストプラクティス
1. メモリ効率的な処理
class TickDataProcessor:
def __init__(self, buffer_size=10000):
self.buffer = deque(maxlen=buffer_size)
self.features = {}
def process_tick(self, tick):
self.buffer.append(tick)
# インクリメンタルな特徴量計算
if len(self.buffer) >= 100:
self.update_features()
def update_features(self):
# 最新のデータのみで計算
recent_ticks = list(self.buffer)[-1000:]
self.features['kyle_lambda'] = calculate_kyle_lambda(recent_ticks)
self.features['vpin'] = calculate_vpin(recent_ticks)
self.features['tick_intensity'] = len(recent_ticks) / time_span(recent_ticks)
2. リアルタイム処理
async def process_tick_stream(websocket):
processor = TickDataProcessor()
dollar_bar_creator = DollarBarCreator(threshold=1000000)
async for tick in websocket:
# tickの処理
processor.process_tick(tick)
# Dollar barの作成
if dollar_bar := dollar_bar_creator.add_tick(tick):
# 特徴量の計算
features = processor.get_current_features()
# 予測
prediction = model.predict(features)
# 取引シグナルの生成
if prediction > threshold:
await execute_trade(tick.symbol, 'buy')
3. バックテストの注意点
class RealisticBacktester:
def __init__(self, latency_ms=10, slippage_bps=2):
self.latency = latency_ms
self.slippage = slippage_bps / 10000
def simulate_execution(self, signal_time, ticks):
# レイテンシーを考慮
execution_time = signal_time + pd.Timedelta(milliseconds=self.latency)
# 実行時点の価格を取得
future_ticks = ticks[ticks['timestamp'] >= execution_time]
if future_ticks.empty:
return None
execution_price = future_ticks.iloc[0]['price']
# スリッページを適用
if signal_type == 'buy':
execution_price *= (1 + self.slippage)
else:
execution_price *= (1 - self.slippage)
return execution_price
📈 推奨アーキテクチャ
Tick Data Stream
↓
Dollar Bar Creator (推奨サンプリング)
↓
Feature Engineering Pipeline
├─ Microstructure Features
├─ Information Features
└─ Time-Aware Features
↓
Data Preprocessing
├─ Outlier Removal
├─ Fractional Differencing
└─ Feature Scaling
↓
ML Model (LSTM/Transformer/XGBoost)
↓
Walk-Forward Validation
↓
Production Deployment
🚀 実装例
Rustでの高速Dollar Bar実装
pub struct DollarBarCreator {
threshold: f64,
current_batch: Vec<Tick>,
current_dollars: f64,
}
impl DollarBarCreator {
pub fn add_tick(&mut self, tick: Tick) -> Option<DollarBar> {
self.current_batch.push(tick.clone());
self.current_dollars += tick.price * tick.volume;
if self.current_dollars >= self.threshold {
let bar = self.create_bar();
self.reset();
Some(bar)
} else {
None
}
}
fn create_bar(&self) -> DollarBar {
DollarBar {
timestamp: self.current_batch.last().unwrap().timestamp,
open: self.current_batch.first().unwrap().price,
high: self.current_batch.iter().map(|t| t.price).max_by(|a, b| a.partial_cmp(b).unwrap()).unwrap(),
low: self.current_batch.iter().map(|t| t.price).min_by(|a, b| a.partial_cmp(b).unwrap()).unwrap(),
close: self.current_batch.last().unwrap().price,
volume: self.current_batch.iter().map(|t| t.volume).sum(),
dollar_volume: self.current_dollars,
tick_count: self.current_batch.len(),
}
}
}
まとめ
非連続なtickデータから機械学習を行う際の重要なポイント:
- Dollar Barsを使用する - 最も安定した統計的性質
- マイクロストラクチャー特徴量を活用 - Kyle's Lambda、VPIN等
- 時間情報を特徴量に含める - tick間隔、到着率
- Walk-Forward検証を使用 - 時系列データに適切
- リアルタイム処理を考慮 - レイテンシー、スリッページ
これらのベストプラクティスに従うことで、tickデータから高品質な予測モデルを構築できます。