ML Documentation

非連続tickデータの機械学習ベストプラクティス

📊 概要

tickデータ(約定データ)は、従来のOHLCVデータと異なり、取引が発生した瞬間のみ記録される非連続・不規則な時系列データです。このドキュメントでは、このような特殊なデータから効果的に特徴量を計算し、機械学習を行うためのベストプラクティスをまとめます。

🔍 tickデータの特性と課題

OHLCVとの違い

特性 OHLCV Tick Data
時間間隔 固定(1分、5分、1時間など) 不規則(取引発生時のみ)
データ密度 一定 変動(活発な時間帯は密、閑散期は疎)
情報量 集約済み 生の取引情報
ノイズ 平滑化されている マイクロストラクチャーノイズが存在

主な課題

  1. 不規則なタイムスタンプ
    - 機械学習モデルが固定長の入力を期待する場合の問題
    - 時間的パターンの抽出が困難

  2. データの偏り
    東京時間: 100 ticks/分 ロンドン時間: 500 ticks/分 NY時間: 1000 ticks/分 週末: 50 ticks/分

  3. マイクロストラクチャーノイズ
    - Bid-Askバウンス
    - 誤った約定価格
    - HFTによる極小取引

🎯 サンプリング手法の比較

1. Time Bars(時間ベース)

# 問題:閑散時は情報が少なく、活発時は情報を失う
def create_time_bars(ticks, interval='1min'):
    return ticks.resample(interval).agg({
        'price': ['first', 'high', 'low', 'last'],
        'volume': 'sum'
    })

2. Tick Bars(取引回数ベース)

# 改善:一定の取引回数でサンプリング
def create_tick_bars(ticks, tick_count=1000):
    bars = []
    for i in range(0, len(ticks), tick_count):
        batch = ticks[i:i+tick_count]
        bar = {
            'timestamp': batch.iloc[-1]['timestamp'],
            'open': batch.iloc[0]['price'],
            'high': batch['price'].max(),
            'low': batch['price'].min(),
            'close': batch.iloc[-1]['price'],
            'volume': batch['volume'].sum()
        }
        bars.append(bar)
    return pd.DataFrame(bars)

3. Volume Bars(出来高ベース)

# より良い:市場活動に基づくサンプリング
def create_volume_bars(ticks, volume_threshold=100):
    bars = []
    current_volume = 0
    batch = []

    for tick in ticks.itertuples():
        batch.append(tick)
        current_volume += tick.volume

        if current_volume >= volume_threshold:
            bar = create_bar_from_batch(batch)
            bars.append(bar)
            batch = []
            current_volume = 0

    return pd.DataFrame(bars)

4. Dollar Bars(金額ベース)- 推奨

# 最良:価格変動を考慮した最も安定したサンプリング
def create_dollar_bars(ticks, dollar_threshold=1000000):
    bars = []
    current_dollars = 0
    batch = []

    for tick in ticks.itertuples():
        batch.append(tick)
        current_dollars += tick.price * tick.volume

        if current_dollars >= dollar_threshold:
            bar = create_bar_from_batch(batch)
            bars.append(bar)
            batch = []
            current_dollars = 0

    return pd.DataFrame(bars)

サンプリング手法の統計的性質

手法 正規性 定常性 自己相関 推奨度
Time Bars 高い
Tick Bars 中程度 ⭐⭐⭐
Volume Bars ✅✅ 低い ⭐⭐⭐⭐
Dollar Bars ✅✅ ✅✅✅ 最低 ⭐⭐⭐⭐⭐

🔧 特徴量エンジニアリング

1. マイクロストラクチャー特徴量

class MicrostructureFeatures:
    @staticmethod
    def kyle_lambda(ticks, window=100):
        """Kyle's Lambda: 価格インパクトの測定"""
        price_changes = ticks['price'].diff()
        signed_volume = ticks['volume'] * np.sign(ticks['side'])

        # OLS回帰: Δp = λ * signed_volume + ε
        lambda_coef = np.cov(price_changes[1:], signed_volume[1:])[0,1] / np.var(signed_volume[1:])
        return lambda_coef

    @staticmethod
    def amihud_illiquidity(ticks, window=100):
        """Amihudの非流動性指標"""
        returns = ticks['price'].pct_change()
        dollar_volume = ticks['price'] * ticks['volume']

        illiquidity = np.abs(returns) / dollar_volume
        return illiquidity.rolling(window).mean()

    @staticmethod
    def roll_spread(ticks, window=100):
        """Roll実効スプレッド推定"""
        price_changes = ticks['price'].diff()
        cov_consecutive = price_changes.rolling(window).apply(
            lambda x: np.cov(x[:-1], x[1:])[0,1]
        )
        roll_spread = 2 * np.sqrt(-cov_consecutive)
        return roll_spread

2. Information-Driven特徴量

class InformationFeatures:
    @staticmethod
    def vpin(ticks, volume_bucket_size=50):
        """Volume-Synchronized PIN (情報トレーダーの確率)"""
        # Volume bucketsの作成
        buckets = create_volume_buckets(ticks, volume_bucket_size)

        # Buy/Sell volumeの分類
        buy_volume = []
        sell_volume = []

        for bucket in buckets:
            # Lee-Ready アルゴリズムで分類
            mid_price = (bucket['bid'] + bucket['ask']) / 2
            buy_vol = bucket[bucket['price'] > mid_price]['volume'].sum()
            sell_vol = bucket[bucket['price'] <= mid_price]['volume'].sum()

            buy_volume.append(buy_vol)
            sell_volume.append(sell_vol)

        # VPINの計算
        order_imbalance = np.abs(np.array(buy_volume) - np.array(sell_volume))
        total_volume = np.array(buy_volume) + np.array(sell_volume)
        vpin = order_imbalance / total_volume

        return pd.Series(vpin).rolling(50).mean()

3. 時間認識特徴量

class TimeAwareFeatures:
    @staticmethod
    def time_since_last_tick(ticks):
        """前回のtickからの経過時間(秒)"""
        timestamps = pd.to_datetime(ticks['timestamp'])
        time_diff = timestamps.diff().dt.total_seconds()
        return time_diff.fillna(0)

    @staticmethod
    def tick_intensity(ticks, window='1min'):
        """時間あたりのtick数(情報到着率)"""
        tick_counts = ticks.set_index('timestamp').resample(window).size()
        return tick_counts.reindex(ticks['timestamp'], method='ffill')

    @staticmethod
    def weighted_price_by_time(ticks, decay_factor=0.99):
        """時間減衰を考慮した加重価格"""
        time_diff = TimeAwareFeatures.time_since_last_tick(ticks)
        weights = decay_factor ** time_diff
        weighted_price = (ticks['price'] * weights).rolling(100).sum() / weights.rolling(100).sum()
        return weighted_price

🤖 機械学習での考慮事項

1. データの前処理

def preprocess_tick_data(ticks):
    # 1. 異常値の除去
    price_zscore = np.abs(stats.zscore(ticks['price']))
    ticks = ticks[price_zscore < 5]

    # 2. フラクショナル差分(定常性の確保)
    from fracdiff import fdiff
    ticks['price_stationary'] = fdiff(ticks['price'], d=0.3)

    # 3. 対数変換(分散安定化)
    ticks['log_price'] = np.log(ticks['price'])
    ticks['log_volume'] = np.log(ticks['volume'] + 1)

    return ticks

2. 特徴量の正規化

class TickDataScaler:
    def __init__(self):
        self.price_scaler = StandardScaler()
        self.volume_scaler = RobustScaler()  # 外れ値に強い
        self.time_scaler = MinMaxScaler()

    def fit_transform(self, ticks):
        # 価格関連:標準化
        price_features = ['price', 'kyle_lambda', 'roll_spread']
        ticks[price_features] = self.price_scaler.fit_transform(ticks[price_features])

        # ボリューム関連:ロバストスケーリング
        volume_features = ['volume', 'dollar_volume', 'vpin']
        ticks[volume_features] = self.volume_scaler.fit_transform(ticks[volume_features])

        # 時間関連:0-1正規化
        time_features = ['time_since_last_tick', 'tick_intensity']
        ticks[time_features] = self.time_scaler.fit_transform(ticks[time_features])

        return ticks

3. 時系列交差検証

class WalkForwardValidator:
    def __init__(self, train_window=10000, test_window=1000, step=1000):
        self.train_window = train_window
        self.test_window = test_window
        self.step = step

    def split(self, ticks):
        n_ticks = len(ticks)

        for start in range(0, n_ticks - self.train_window - self.test_window, self.step):
            train_end = start + self.train_window
            test_end = train_end + self.test_window

            train_idx = range(start, train_end)
            test_idx = range(train_end, test_end)

            yield train_idx, test_idx

💡 実装のベストプラクティス

1. メモリ効率的な処理

class TickDataProcessor:
    def __init__(self, buffer_size=10000):
        self.buffer = deque(maxlen=buffer_size)
        self.features = {}

    def process_tick(self, tick):
        self.buffer.append(tick)

        # インクリメンタルな特徴量計算
        if len(self.buffer) >= 100:
            self.update_features()

    def update_features(self):
        # 最新のデータのみで計算
        recent_ticks = list(self.buffer)[-1000:]

        self.features['kyle_lambda'] = calculate_kyle_lambda(recent_ticks)
        self.features['vpin'] = calculate_vpin(recent_ticks)
        self.features['tick_intensity'] = len(recent_ticks) / time_span(recent_ticks)

2. リアルタイム処理

async def process_tick_stream(websocket):
    processor = TickDataProcessor()
    dollar_bar_creator = DollarBarCreator(threshold=1000000)

    async for tick in websocket:
        # tickの処理
        processor.process_tick(tick)

        # Dollar barの作成
        if dollar_bar := dollar_bar_creator.add_tick(tick):
            # 特徴量の計算
            features = processor.get_current_features()

            # 予測
            prediction = model.predict(features)

            # 取引シグナルの生成
            if prediction > threshold:
                await execute_trade(tick.symbol, 'buy')

3. バックテストの注意点

class RealisticBacktester:
    def __init__(self, latency_ms=10, slippage_bps=2):
        self.latency = latency_ms
        self.slippage = slippage_bps / 10000

    def simulate_execution(self, signal_time, ticks):
        # レイテンシーを考慮
        execution_time = signal_time + pd.Timedelta(milliseconds=self.latency)

        # 実行時点の価格を取得
        future_ticks = ticks[ticks['timestamp'] >= execution_time]
        if future_ticks.empty:
            return None

        execution_price = future_ticks.iloc[0]['price']

        # スリッページを適用
        if signal_type == 'buy':
            execution_price *= (1 + self.slippage)
        else:
            execution_price *= (1 - self.slippage)

        return execution_price

📈 推奨アーキテクチャ

Tick Data Stream
    ↓
Dollar Bar Creator (推奨サンプリング)
    ↓
Feature Engineering Pipeline
    ├─ Microstructure Features
    ├─ Information Features
    └─ Time-Aware Features
    ↓
Data Preprocessing
    ├─ Outlier Removal
    ├─ Fractional Differencing
    └─ Feature Scaling
    ↓
ML Model (LSTM/Transformer/XGBoost)
    ↓
Walk-Forward Validation
    ↓
Production Deployment

🚀 実装例

Rustでの高速Dollar Bar実装

pub struct DollarBarCreator {
    threshold: f64,
    current_batch: Vec<Tick>,
    current_dollars: f64,
}

impl DollarBarCreator {
    pub fn add_tick(&mut self, tick: Tick) -> Option<DollarBar> {
        self.current_batch.push(tick.clone());
        self.current_dollars += tick.price * tick.volume;

        if self.current_dollars >= self.threshold {
            let bar = self.create_bar();
            self.reset();
            Some(bar)
        } else {
            None
        }
    }

    fn create_bar(&self) -> DollarBar {
        DollarBar {
            timestamp: self.current_batch.last().unwrap().timestamp,
            open: self.current_batch.first().unwrap().price,
            high: self.current_batch.iter().map(|t| t.price).max_by(|a, b| a.partial_cmp(b).unwrap()).unwrap(),
            low: self.current_batch.iter().map(|t| t.price).min_by(|a, b| a.partial_cmp(b).unwrap()).unwrap(),
            close: self.current_batch.last().unwrap().price,
            volume: self.current_batch.iter().map(|t| t.volume).sum(),
            dollar_volume: self.current_dollars,
            tick_count: self.current_batch.len(),
        }
    }
}

まとめ

非連続なtickデータから機械学習を行う際の重要なポイント:

  1. Dollar Barsを使用する - 最も安定した統計的性質
  2. マイクロストラクチャー特徴量を活用 - Kyle's Lambda、VPIN等
  3. 時間情報を特徴量に含める - tick間隔、到着率
  4. Walk-Forward検証を使用 - 時系列データに適切
  5. リアルタイム処理を考慮 - レイテンシー、スリッページ

これらのベストプラクティスに従うことで、tickデータから高品質な予測モデルを構築できます。