ML Documentation

不規則時系列データ(Tickデータ)の処理手法

概要

金融市場のtickデータは、OHLCVデータと異なり時間間隔が不規則です。取引が発生したタイミングでのみデータが記録されるため、活発な時間帯では秒間数百件、閑散時では数分間データがないこともあります。本ドキュメントでは、この不規則性に対処する様々な手法を解説します。

1. tickデータとOHLCVの違い

1.1 データ特性の比較

特性 Tickデータ OHLCV
時間間隔 不規則(取引発生時) 規則的(1分、5分等)
データ量 非常に大(日中数十万件) 比較的小
情報の粒度 個別取引レベル 集約済み
ノイズ 高い 低い
マーケット・マイクロストラクチャ 完全に保持 一部喪失

1.2 不規則性がもたらす課題

import pandas as pd
import numpy as np

# tickデータの例
tick_data = pd.DataFrame({
    'timestamp': ['2024-01-01 09:00:00.123', 
                  '2024-01-01 09:00:00.456',
                  '2024-01-01 09:00:02.789',  # 2秒以上の間隔
                  '2024-01-01 09:00:02.890'],
    'price': [50000, 50001, 50003, 50002],
    'volume': [0.1, 0.2, 0.5, 0.3]
})

# 時間間隔の計算
tick_data['timestamp'] = pd.to_datetime(tick_data['timestamp'])
tick_data['time_diff'] = tick_data['timestamp'].diff().dt.total_seconds()
print(f"時間間隔の統計: 平均{tick_data['time_diff'].mean():.2f}秒, 
      最大{tick_data['time_diff'].max():.2f}")

2. 時間ベースサンプリング手法

2.1 固定時間間隔へのリサンプリング

def time_based_sampling(tick_df, freq='5S'):
    """
    固定時間間隔でのリサンプリング

    Parameters:
    -----------
    tick_df : DataFrame
        tickデータ(timestamp, price, volume列を含む)
    freq : str
        リサンプリング頻度('5S'=5秒, '1T'=1分等)
    """
    # timestampをインデックスに設定
    tick_df.set_index('timestamp', inplace=True)

    # OHLCVへの変換
    ohlc = tick_df['price'].resample(freq).ohlc()
    volume = tick_df['volume'].resample(freq).sum()

    # 結果の結合
    result = pd.concat([ohlc, volume], axis=1)
    result.columns = ['open', 'high', 'low', 'close', 'volume']

    # 欠損値の処理(前方補間)
    result.fillna(method='ffill', inplace=True)

    return result

2.2 補間手法の比較

from scipy import interpolate

def interpolation_methods(tick_df, target_times):
    """
    各種補間手法の実装
    """
    # 時刻を数値に変換(Unix timestamp)
    x = tick_df['timestamp'].astype(np.int64) / 1e9
    y = tick_df['price'].values

    # 1. 線形補間
    linear_interp = np.interp(target_times, x, y)

    # 2. スプライン補間(3次)
    spline = interpolate.CubicSpline(x, y)
    spline_interp = spline(target_times)

    # 3. 前方補間(最後の既知の値を使用)
    forward_fill = pd.Series(y, index=x).reindex(target_times, method='ffill')

    # 4. 階段補間(最近傍)
    nearest = interpolate.interp1d(x, y, kind='nearest', 
                                  fill_value='extrapolate')
    nearest_interp = nearest(target_times)

    return {
        'linear': linear_interp,
        'spline': spline_interp,
        'forward_fill': forward_fill,
        'nearest': nearest_interp
    }

3. ボリュームベースサンプリング

3.1 ボリュームバーの作成

def create_volume_bars(tick_df, volume_threshold=100):
    """
    一定のボリュームごとにバーを作成

    メリット:
    - 市場活動に比例したサンプリング
    - 情報の均一性が高い
    """
    bars = []
    current_bar = {'open': None, 'high': -np.inf, 
                   'low': np.inf, 'close': None,
                   'volume': 0, 'timestamp': None}

    for _, row in tick_df.iterrows():
        if current_bar['open'] is None:
            current_bar['open'] = row['price']
            current_bar['timestamp'] = row['timestamp']

        current_bar['high'] = max(current_bar['high'], row['price'])
        current_bar['low'] = min(current_bar['low'], row['price'])
        current_bar['close'] = row['price']
        current_bar['volume'] += row['volume']

        if current_bar['volume'] >= volume_threshold:
            bars.append(current_bar.copy())
            current_bar = {'open': None, 'high': -np.inf, 
                          'low': np.inf, 'close': None,
                          'volume': 0, 'timestamp': None}

    return pd.DataFrame(bars)

3.2 ドルバー(Dollar Bars)

def create_dollar_bars(tick_df, dollar_threshold=1000000):
    """
    一定の取引金額ごとにバーを作成

    メリット:
    - 価格変動を考慮
    - より安定した統計的性質
    """
    bars = []
    current_bar = {'open': None, 'high': -np.inf, 
                   'low': np.inf, 'close': None,
                   'volume': 0, 'dollar_volume': 0,
                   'timestamp': None, 'tick_count': 0}

    for _, row in tick_df.iterrows():
        dollar_volume = row['price'] * row['volume']

        if current_bar['open'] is None:
            current_bar['open'] = row['price']
            current_bar['timestamp'] = row['timestamp']

        current_bar['high'] = max(current_bar['high'], row['price'])
        current_bar['low'] = min(current_bar['low'], row['price'])
        current_bar['close'] = row['price']
        current_bar['volume'] += row['volume']
        current_bar['dollar_volume'] += dollar_volume
        current_bar['tick_count'] += 1

        if current_bar['dollar_volume'] >= dollar_threshold:
            bars.append(current_bar.copy())
            current_bar = {'open': None, 'high': -np.inf, 
                          'low': np.inf, 'close': None,
                          'volume': 0, 'dollar_volume': 0,
                          'timestamp': None, 'tick_count': 0}

    return pd.DataFrame(bars)

4. 情報駆動バー(Information-Driven Bars)

4.1 インバランスバー

def calculate_tick_imbalance(tick_df):
    """
    買い圧力と売り圧力の不均衡を検出
    """
    # 価格変化の方向を計算
    tick_df['price_change'] = tick_df['price'].diff()
    tick_df['direction'] = np.sign(tick_df['price_change'])
    tick_df['direction'].fillna(0, inplace=True)

    # 累積インバランスを計算
    tick_df['cumulative_imbalance'] = tick_df['direction'].cumsum()

    return tick_df

def create_imbalance_bars(tick_df, expected_imbalance, initial_T=100):
    """
    累積インバランスが閾値に達したらバーを作成
    """
    tick_df = calculate_tick_imbalance(tick_df)
    bars = []
    current_bar = None
    T = initial_T  # 動的に調整される期待tick数

    for i, row in tick_df.iterrows():
        if current_bar is None:
            current_bar = {
                'start_idx': i, 'timestamp': row['timestamp'],
                'open': row['price'], 'high': row['price'],
                'low': row['price'], 'volume': 0,
                'tick_count': 0, 'imbalance': 0
            }

        current_bar['high'] = max(current_bar['high'], row['price'])
        current_bar['low'] = min(current_bar['low'], row['price'])
        current_bar['close'] = row['price']
        current_bar['volume'] += row['volume']
        current_bar['tick_count'] += 1
        current_bar['imbalance'] += row['direction']

        # インバランス閾値チェック
        if abs(current_bar['imbalance']) >= expected_imbalance * T:
            bars.append(current_bar)
            # 指数加重移動平均でTを更新
            T = 0.95 * T + 0.05 * current_bar['tick_count']
            current_bar = None

    return pd.DataFrame(bars)

5. 時間重み付けサンプリング

5.1 指数重み付け

def exponential_weighted_sampling(tick_df, half_life='30S'):
    """
    最近のデータにより高い重みを付けてサンプリング
    """
    # 時間重みの計算
    latest_time = tick_df['timestamp'].max()
    time_diff = (latest_time - tick_df['timestamp']).dt.total_seconds()

    # 指数重み(半減期で調整)
    half_life_seconds = pd.Timedelta(half_life).total_seconds()
    weights = np.exp(-np.log(2) * time_diff / half_life_seconds)

    # 重み付き統計量
    weighted_price = np.average(tick_df['price'], weights=weights)
    weighted_volume = np.sum(tick_df['volume'] * weights)

    return {
        'weighted_price': weighted_price,
        'weighted_volume': weighted_volume,
        'effective_samples': np.sum(weights)  # 実効サンプル数
    }

6. 機械学習向けの特徴量エンジニアリング

6.1 マイクロストラクチャ特徴量

def calculate_microstructure_features(tick_df, window='60S'):
    """
    tickデータから市場マイクロストラクチャ特徴量を抽出
    """
    features = pd.DataFrame(index=tick_df.index)

    # 1. 取引強度(単位時間あたりの取引数)
    features['trade_intensity'] = tick_df.groupby(
        pd.Grouper(key='timestamp', freq=window)
    ).size()

    # 2. 実効スプレッド
    tick_df['mid_price'] = tick_df['price'].rolling(2).mean()
    features['effective_spread'] = 2 * abs(tick_df['price'] - tick_df['mid_price'])

    # 3. 価格変化の自己相関
    price_changes = tick_df['price'].diff()
    features['price_autocorr'] = price_changes.rolling(20).apply(
        lambda x: x.autocorr(lag=1) if len(x) > 1 else 0
    )

    # 4. ボリューム加重平均価格(VWAP)からの乖離
    tick_df['dollar_volume'] = tick_df['price'] * tick_df['volume']
    vwap = tick_df['dollar_volume'].rolling(window).sum() / \
           tick_df['volume'].rolling(window).sum()
    features['vwap_deviation'] = (tick_df['price'] - vwap) / vwap

    # 5. 注文フローのインバランス
    tick_df['signed_volume'] = tick_df['volume'] * np.sign(tick_df['price'].diff())
    features['order_flow_imbalance'] = tick_df['signed_volume'].rolling(window).sum()

    # 6. 実現ボラティリティ
    features['realized_volatility'] = np.sqrt(
        (price_changes ** 2).rolling(window).sum()
    )

    return features

6.2 適応的サンプリング

class AdaptiveSampler:
    """
    市場状況に応じて動的にサンプリング方法を変更
    """
    def __init__(self, base_interval='5S'):
        self.base_interval = pd.Timedelta(base_interval)
        self.volatility_threshold = 0.001
        self.volume_threshold = 100

    def sample(self, tick_df, lookback='5T'):
        """
        ボラティリティと取引量に基づいて適応的にサンプリング
        """
        # 直近のボラティリティを計算
        recent_data = tick_df[
            tick_df['timestamp'] > tick_df['timestamp'].max() - pd.Timedelta(lookback)
        ]

        returns = recent_data['price'].pct_change()
        current_volatility = returns.std()
        current_volume_rate = len(recent_data) / pd.Timedelta(lookback).total_seconds()

        # サンプリング間隔の調整
        if current_volatility > self.volatility_threshold:
            # 高ボラティリティ時は細かくサンプリング
            interval = self.base_interval / 2
        elif current_volume_rate > self.volume_threshold:
            # 高取引量時も細かくサンプリング
            interval = self.base_interval / 1.5
        else:
            # 通常時
            interval = self.base_interval

        # リサンプリング実行
        resampled = tick_df.set_index('timestamp').resample(interval).agg({
            'price': ['first', 'max', 'min', 'last'],
            'volume': 'sum'
        })

        return resampled, {
            'current_volatility': current_volatility,
            'current_volume_rate': current_volume_rate,
            'used_interval': interval
        }

7. 実装上の注意点

7.1 メモリ管理

def process_large_tick_data(file_path, chunk_size=100000):
    """
    大規模tickデータの効率的な処理
    """
    processed_data = []

    # チャンク単位で読み込み
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        # 各チャンクを処理
        chunk['timestamp'] = pd.to_datetime(chunk['timestamp'])

        # ボリュームバーに変換(メモリ効率的)
        volume_bars = create_volume_bars(chunk, volume_threshold=50)
        processed_data.append(volume_bars)

    # 結果を結合
    return pd.concat(processed_data, ignore_index=True)

7.2 リアルタイム処理

class RealTimeTickProcessor:
    """
    リアルタイムtickデータ処理
    """
    def __init__(self, window_size=1000):
        self.buffer = deque(maxlen=window_size)
        self.current_bar = None
        self.bars = []

    def process_tick(self, tick):
        """
        新しいtickを処理
        """
        self.buffer.append(tick)

        # バーの更新判定
        if self._should_create_new_bar(tick):
            if self.current_bar:
                self.bars.append(self.current_bar)
            self.current_bar = self._initialize_bar(tick)
        else:
            self._update_current_bar(tick)

        # 特徴量の計算
        features = self._calculate_real_time_features()

        return self.current_bar, features

    def _calculate_real_time_features(self):
        """
        リアルタイム特徴量計算
        """
        if len(self.buffer) < 2:
            return {}

        prices = [t['price'] for t in self.buffer]
        volumes = [t['volume'] for t in self.buffer]

        return {
            'ma_5': np.mean(prices[-5:]) if len(prices) >= 5 else prices[-1],
            'volatility': np.std(prices[-20:]) if len(prices) >= 20 else 0,
            'volume_rate': sum(volumes[-10:]) if len(volumes) >= 10 else 0,
            'price_momentum': (prices[-1] - prices[-10]) / prices[-10] 
                             if len(prices) >= 10 else 0
        }

まとめ

tickデータの不規則性に対処する方法は、用途に応じて選択する必要があります:

  1. 高頻度取引(HFT): マイクロストラクチャ特徴量を保持するため、最小限の集約
  2. 中長期予測: ボリュームバーやドルバーで情報の均一性を確保
  3. リアルタイム分析: 適応的サンプリングで計算効率と精度のバランス
  4. 機械学習: 情報駆動バーで統計的性質の安定化

各手法にはトレードオフがあり、実際の取引戦略やモデルの要求に応じて最適な方法を選択することが重要です。