ML Documentation

GRU-Dの暗号通貨データ(Ticks/Orderbook)への適用ガイド

1. 暗号通貨データの特性とGRU-Dの適合性

1.1 Ticksデータの特性

Ticksデータ(約定データ)は、約定が発生した時点でのみ記録される典型的な不規則時系列データです。

1.2 Orderbookデータの特性

Orderbookデータも、板に変化があった時点でのみ不規則に更新されます。

2. Ticksデータ用GRU-D実装

2.1 データ構造と前処理

import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from collections import deque
from datetime import datetime, timedelta

class TicksDataProcessor:
    """Turn irregular tick (trade) data into regularly sampled GRU-D inputs.

    Ticks are binned onto a fixed time grid. For every grid step the
    processor produces the feature values, an observation mask, the time
    elapsed since the last step that contained a tick, and the last observed
    value of every feature.

    ``ticks_df`` arguments are expected to be indexed by timestamp and to
    carry ``price``, ``size`` and ``side`` columns (``side == 'buy'`` marks
    buyer-initiated trades).
    """

    # Number of per-step statistics returned by ``_aggregate_ticks``.
    N_TICK_FEATURES = 8

    def __init__(self,
                 base_interval='1s',
                 max_gap_minutes=5,
                 feature_windows=None):
        """Store the grid configuration.

        Args:
            base_interval: Grid step as a pandas frequency/timedelta string.
            max_gap_minutes: Maximum tolerated gap in minutes (stored only;
                not used by the methods of this class).
            feature_windows: Look-back windows in seconds for the rolling
                features. Defaults to ``[60, 300, 600]``.
        """
        self.base_interval = base_interval
        self.max_gap_minutes = max_gap_minutes
        # A fresh list per instance: a list literal as the default value
        # would be shared (and mutable) across every instance.
        self.feature_windows = (
            [60, 300, 600] if feature_windows is None else list(feature_windows)
        )

    def extract_tick_features(self, ticks_df):
        """Build a per-tick feature matrix (one row per tick, 8 columns).

        Columns, in order: price, size, buy flag, cumulative size,
        cumulative size within the tick's side, price difference,
        log return, and notional (size * price).
        """
        features = []

        # Raw values
        features.append(ticks_df['price'].values)
        features.append(ticks_df['size'].values)
        features.append((ticks_df['side'] == 'buy').astype(float).values)

        # Cumulative volumes
        features.append(ticks_df['size'].cumsum().values)
        features.append(ticks_df.groupby('side')['size'].cumsum().values)

        # Price changes (the first tick has no predecessor, hence fillna(0))
        features.append(ticks_df['price'].diff().fillna(0).values)
        features.append(np.log(ticks_df['price'] / ticks_df['price'].shift(1)).fillna(0).values)

        # Traded notional
        features.append((ticks_df['size'] * ticks_df['price']).values)

        return np.column_stack(features)

    def create_regular_grid(self, ticks_df, start_time, end_time):
        """Resample irregular ticks onto a regular grid.

        Args:
            ticks_df: Tick DataFrame indexed by timestamp.
            start_time: First grid timestamp.
            end_time: Last grid timestamp (inclusive when it falls on the grid).

        Returns:
            dict with
            ``values`` (n_steps, F): step aggregates followed by the rolling
                features of every window in ``feature_windows``;
            ``mask`` (n_steps, F): 1 where the feature was observed;
            ``delta_t`` (n_steps, 1): minutes since the last step containing
                a tick (0 before the first such step);
            ``last_observed`` (n_steps, F): per-feature last observed value;
            ``timestamps``: the grid itself.
            ``F`` is ``N_TICK_FEATURES + 5 * len(feature_windows)``.
        """
        time_grid = pd.date_range(start=start_time, end=end_time, freq=self.base_interval)
        n_steps = len(time_grid)
        n_tick = self.N_TICK_FEATURES

        # The aggregation window equals one grid step so that every tick is
        # counted exactly once whatever ``base_interval`` is.
        step = pd.Timedelta(self.base_interval)
        step_seconds = step.total_seconds()

        # Step aggregates come first, then one block per rolling window.
        value_parts = [np.zeros((n_steps, n_tick))]
        mask_parts = [np.zeros((n_steps, n_tick))]
        for window in self.feature_windows:
            window_features = self._compute_window_features(ticks_df, time_grid, window)
            value_parts.append(window_features['values'])
            mask_parts.append(window_features['mask'])
        values = np.concatenate(value_parts, axis=1)
        mask = np.concatenate(mask_parts, axis=1)

        # These buffers must be sized AFTER the window features are appended;
        # sizing last_observed with only the tick features made the per-row
        # assignment below fail with a shape mismatch.
        n_features = values.shape[1]
        delta_t = np.zeros((n_steps, 1))
        last_observed = np.zeros((n_steps, n_features))
        last_values = np.zeros(n_features)
        last_tick_idx = -1

        for i, t in enumerate(time_grid):
            # Ticks that fall in the half-open step [t - step, t).
            ticks_in_step = ticks_df[
                (ticks_df.index >= t - step) &
                (ticks_df.index < t)
            ]

            if len(ticks_in_step) > 0:
                values[i, :n_tick] = self._aggregate_ticks(ticks_in_step)
                mask[i, :n_tick] = 1
                last_tick_idx = i

            # Elapsed time since the last step that contained a tick.
            if last_tick_idx >= 0:
                delta_t[i] = (i - last_tick_idx) * step_seconds

            # Carry every feature forward from its own last observation.
            observed = mask[i] > 0
            last_values[observed] = values[i, observed]
            last_observed[i] = last_values

        return {
            'values': values,
            'mask': mask,
            'delta_t': delta_t / 60.0,  # seconds -> minutes
            'last_observed': last_observed,
            'timestamps': time_grid
        }

    def _aggregate_ticks(self, ticks):
        """Summarise the ticks of one grid step into 8 statistics.

        Order: mean price, total size, buy ratio, trade count, price std,
        mean size, price range, VWAP. Returns zeros for an empty frame.
        """
        if len(ticks) == 0:
            return np.zeros(self.N_TICK_FEATURES)

        mean_price = ticks['price'].mean()
        total_size = ticks['size'].sum()
        # Fall back to the mean price when no volume traded, instead of
        # dividing by zero.
        if total_size > 0:
            vwap = (ticks['price'] * ticks['size']).sum() / total_size
        else:
            vwap = mean_price

        features = [
            mean_price,                                      # mean price
            total_size,                                      # total volume
            (ticks['side'] == 'buy').mean(),                 # buy ratio
            len(ticks),                                      # trade count
            ticks['price'].std() if len(ticks) > 1 else 0,   # price std
            ticks['size'].mean(),                            # mean trade size
            ticks['price'].max() - ticks['price'].min(),     # price range
            vwap                                             # VWAP
        ]

        return np.array(features)

    def _compute_window_features(self, ticks_df, time_grid, window_seconds):
        """Compute rolling features over ``[t - window_seconds, t]`` per grid step.

        Returns a dict with ``values`` and ``mask``, both (n_steps, 5), with
        columns momentum, volatility, volume_rate, trade_intensity and
        buy_pressure. A step is marked observed only when its window holds
        at least two ticks.
        """
        n_steps = len(time_grid)
        window_features = {
            'momentum': np.zeros(n_steps),
            'volatility': np.zeros(n_steps),
            'volume_rate': np.zeros(n_steps),
            'trade_intensity': np.zeros(n_steps),
            'buy_pressure': np.zeros(n_steps)
        }

        mask = np.zeros((n_steps, len(window_features)))

        for i, t in enumerate(time_grid):
            window_start = t - pd.Timedelta(seconds=window_seconds)
            window_ticks = ticks_df[(ticks_df.index >= window_start) & (ticks_df.index <= t)]

            if len(window_ticks) >= 2:
                first_price = window_ticks['price'].iloc[0]

                # Relative price change over the window
                window_features['momentum'][i] = (
                    window_ticks['price'].iloc[-1] - first_price
                ) / first_price

                # Std of tick-to-tick returns. With exactly two ticks there
                # is a single return, whose sample std is NaN; report 0.
                volatility = window_ticks['price'].pct_change().std()
                window_features['volatility'][i] = 0.0 if np.isnan(volatility) else volatility

                # Volume per second
                total_volume = window_ticks['size'].sum()
                window_features['volume_rate'][i] = total_volume / window_seconds

                # Trades per second
                window_features['trade_intensity'][i] = len(window_ticks) / window_seconds

                # Share of volume that was buyer-initiated (0.5 if no volume)
                buy_volume = window_ticks[window_ticks['side'] == 'buy']['size'].sum()
                window_features['buy_pressure'][i] = buy_volume / total_volume if total_volume > 0 else 0.5

                mask[i] = 1

        # Column order follows the dict insertion order above.
        values = np.column_stack([window_features[k] for k in window_features])

        return {'values': values, 'mask': mask}

2.2 Ticks用GRU-Dモデル

class TicksGRUD(nn.Module):
    """GRU-D wrapper with price, volume and direction heads for tick data."""

    def __init__(self,
                 tick_feature_size=8,
                 window_feature_size=15,  # 3 windows x 5 features
                 hidden_size=128,
                 num_layers=2,
                 dropout=0.2):
        """Build the recurrent core and the three prediction heads.

        Args:
            tick_feature_size: Number of per-step tick features.
            window_feature_size: Number of rolling-window features.
            hidden_size: Width of the GRU-D output and of the head inputs.
            num_layers: Forwarded to ``GRUD``.
            dropout: Dropout probability used in ``GRUD`` and in the heads.
        """
        super().__init__()

        half_hidden = hidden_size // 2

        def build_head(n_outputs):
            # Linear -> ReLU -> Dropout -> Linear, shared by all heads.
            return nn.Sequential(
                nn.Linear(hidden_size, half_hidden),
                nn.ReLU(),
                nn.Dropout(dropout),
                nn.Linear(half_hidden, n_outputs)
            )

        # Recurrent core over the concatenated tick + window features.
        self.grud = GRUD(
            input_size=tick_feature_size + window_feature_size,
            hidden_size=hidden_size,
            output_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout
        )

        # Tick-specific embedding; registered here but not called in forward().
        self.tick_embedding = nn.Sequential(
            nn.Linear(tick_feature_size, half_hidden),
            nn.ReLU(),
            nn.Dropout(dropout)
        )

        # Prediction heads
        self.price_predictor = build_head(1)
        self.volume_predictor = build_head(1)
        self.direction_classifier = build_head(3)  # up/neutral/down

    def forward(self, x, mask, delta_t, x_last_observed):
        """Run GRU-D and apply every head to the final time step.

        Returns:
            dict with ``price``, ``volume``, ``direction`` (softmax over the
            three classes) and the ``hidden`` state returned by ``GRUD``.
        """
        sequence_out, hidden = self.grud(x, mask, delta_t, x_last_observed)

        # Only the last step of the sequence feeds the heads.
        final_step = sequence_out[:, -1, :]

        price = self.price_predictor(final_step)
        volume = self.volume_predictor(final_step)
        direction = torch.softmax(self.direction_classifier(final_step), dim=-1)

        return {
            'price': price,
            'volume': volume,
            'direction': direction,
            'hidden': hidden
        }

2.3 リアルタイムTicks処理

class RealtimeTicksProcessor:
    """Maintain rolling feature buffers from a live tick stream and predict.

    Ticks are dicts with ``timestamp``, ``price``, ``size`` and ``side``
    keys. One feature step is emitted per ``update_interval`` seconds;
    intervals without any tick are emitted as missing steps (mask = 0) so
    that the downstream GRU-D sees the gaps. The model passed in must accept
    ``N_FEATURES`` input features.
    """

    # Look-back windows (minutes) for the rolling features.
    WINDOW_MINUTES = (1, 5, 10)
    # 8 interval statistics + 3 statistics per window
    # + 2 microstructure features + 2 large-trade features.
    N_FEATURES = 8 + 3 * len(WINDOW_MINUTES) + 2 + 2

    def __init__(self, model, buffer_minutes=10, update_interval_seconds=1):
        """Create empty buffers.

        Args:
            model: Callable model taking (features, mask, delta_t, last_observed).
            buffer_minutes: Amount of history kept, in minutes.
            update_interval_seconds: Length of one feature step, in seconds.
        """
        self.model = model
        self.buffer_minutes = buffer_minutes
        self.update_interval = update_interval_seconds

        # Buffers: raw ticks are pruned by time, feature steps by count.
        buffer_len = buffer_minutes * 60 // update_interval_seconds
        self.ticks_buffer = deque()
        self.features_buffer = deque(maxlen=buffer_len)
        self.mask_buffer = deque(maxlen=buffer_len)
        self.time_buffer = deque(maxlen=buffer_len)

        # State
        self.last_update_time = None
        self.last_features = None
        self.hidden_state = None

        # Offline preprocessor (kept for parity with the batch pipeline)
        self.processor = TicksDataProcessor()

    def add_tick(self, tick):
        """Register a tick and emit every feature step that has completed.

        The first tick anchors the step grid. Afterwards one step is emitted
        for each full ``update_interval`` elapsed since the last emitted
        step, so quiet periods show up as missing steps rather than being
        skipped.
        """
        self.ticks_buffer.append(tick)
        current_time = tick['timestamp']

        if self.last_update_time is None:
            self.last_update_time = current_time
            return

        step = timedelta(seconds=self.update_interval)
        pending = (current_time - self.last_update_time) // step
        if pending <= 0:
            return

        # After a very long gap only the newest steps can survive in the
        # bounded buffers, so skip the ones that would be evicted anyway.
        capacity = self.features_buffer.maxlen
        if capacity is not None and pending > capacity:
            self.last_update_time += (pending - capacity) * step
            pending = capacity

        for _ in range(pending):
            self.last_update_time += step
            self._update_features(self.last_update_time)

    def _update_features(self, current_time):
        """Append the feature step covering ``[current_time - interval, current_time)``."""
        window_start = current_time - timedelta(seconds=self.update_interval)
        # History up to the end of this step feeds the multi-minute windows;
        # the step statistics only use the ticks inside the step itself.
        history = [t for t in self.ticks_buffer if t['timestamp'] < current_time]
        window_ticks = [t for t in history if t['timestamp'] >= window_start]

        if window_ticks:
            features = self._extract_realtime_features(window_ticks, history=history)
            mask = np.ones_like(features)
            self.last_features = features
        else:
            # Missing step: same width as an observed one.
            features = np.zeros(self.N_FEATURES)
            mask = np.zeros_like(features)

        self.features_buffer.append(features)
        self.mask_buffer.append(mask)
        self.time_buffer.append(current_time)

        # Drop ticks older than the buffer horizon.
        cutoff_time = current_time - timedelta(minutes=self.buffer_minutes)
        self.ticks_buffer = deque(t for t in self.ticks_buffer if t['timestamp'] > cutoff_time)

    def predict_next(self, horizon_seconds=60):
        """Predict price, volume and direction ``horizon_seconds`` ahead.

        Returns ``None`` until at least 10 feature steps are buffered.
        """
        if len(self.features_buffer) < 10:
            return None

        features = torch.from_numpy(
            np.array(list(self.features_buffer), dtype=np.float32)
        ).unsqueeze(0)
        mask = torch.from_numpy(
            np.array(list(self.mask_buffer), dtype=np.float32)
        ).unsqueeze(0)

        # Minutes since the most recent observed step (0 before the first).
        delta_t = []
        last_observed_time = None
        for step_time, step_mask in zip(self.time_buffer, self.mask_buffer):
            if step_mask.sum() > 0:
                last_observed_time = step_time
            if last_observed_time is None:
                delta_t.append(0)
            else:
                delta_t.append((step_time - last_observed_time).total_seconds() / 60)

        delta_t = torch.FloatTensor(delta_t).unsqueeze(0).unsqueeze(-1)

        last_observed = self._compute_last_observed()

        self.model.eval()
        with torch.no_grad():
            predictions = self.model(features, mask, delta_t, last_observed)

        # Extrapolate the model output to the requested horizon.
        future_price = self._extrapolate_price(
            predictions['price'].item(),
            predictions['direction'].cpu().numpy()[0],
            horizon_seconds
        )

        return {
            'current_price': self._get_last_price(),
            'predicted_price': future_price,
            'predicted_volume': predictions['volume'].item(),
            'direction_probs': {
                'up': predictions['direction'][0, 0].item(),
                'neutral': predictions['direction'][0, 1].item(),
                'down': predictions['direction'][0, 2].item()
            },
            'horizon': horizon_seconds
        }

    def _extract_realtime_features(self, ticks, history=None):
        """Build the ``N_FEATURES``-long feature vector for one step.

        Args:
            ticks: Ticks inside the current step.
            history: Optional longer tick history used for the 1/5/10-minute
                window features. When omitted, ``ticks`` is used.

        Returns:
            1-D array: 8 step statistics, then (momentum, volatility,
            volume rate) per window, then Kyle's lambda, order-flow
            imbalance, large-trade count ratio and large-trade volume ratio.
            All zeros when ``ticks`` is empty.
        """
        if not ticks:
            return np.zeros(self.N_FEATURES)

        prices = [t['price'] for t in ticks]
        sizes = [t['size'] for t in ticks]
        sides = [1 if t['side'] == 'buy' else 0 for t in ticks]
        total_size = sum(sizes)

        basic_features = [
            np.mean(prices),
            np.sum(sizes),
            np.mean(sides),
            len(ticks),
            np.std(prices) if len(prices) > 1 else 0,
            np.mean(sizes),
            max(prices) - min(prices),
            sum(p * s for p, s in zip(prices, sizes)) / total_size if total_size > 0 else np.mean(prices)
        ]

        # Rolling-window features, anchored at the last tick of the step.
        source = ticks if history is None else history
        anchor = ticks[-1]['timestamp']
        window_features = []
        for window_minutes in self.WINDOW_MINUTES:
            window_start = anchor - timedelta(minutes=window_minutes)
            window_ticks = [t for t in source if t['timestamp'] >= window_start]

            if len(window_ticks) >= 2:
                window_prices = [t['price'] for t in window_ticks]
                momentum = (window_prices[-1] - window_prices[0]) / window_prices[0]
                volatility = np.std(np.diff(window_prices)) / np.mean(window_prices)
                volume_rate = sum(t['size'] for t in window_ticks) / (window_minutes * 60)
            else:
                momentum = 0
                volatility = 0
                volume_rate = 0

            window_features.extend([momentum, volatility, volume_rate])

        # Market microstructure features
        if len(ticks) > 1:
            # Simplified Kyle's lambda: slope of price change on signed volume.
            price_changes = np.diff(prices)
            signed_volumes = [s * (2 * side - 1) for s, side in zip(sizes[1:], sides[1:])]
            std_price = np.std(price_changes)
            std_volume = np.std(signed_volumes)
            # Both series must vary; otherwise the correlation is undefined
            # (NaN) and would poison the whole feature vector.
            if std_price > 0 and std_volume > 0:
                kyle_lambda = np.corrcoef(price_changes, signed_volumes)[0, 1] * \
                             (std_price / std_volume)
            else:
                kyle_lambda = 0

            # Order flow imbalance
            buy_volume = sum(s for s, side in zip(sizes, sides) if side == 1)
            sell_volume = sum(s for s, side in zip(sizes, sides) if side == 0)
            ofi = (buy_volume - sell_volume) / (buy_volume + sell_volume) if (buy_volume + sell_volume) > 0 else 0
        else:
            kyle_lambda = 0
            ofi = 0

        market_features = [kyle_lambda, ofi]

        # Large-trade detection: 90th percentile with enough samples,
        # otherwise twice the mean size.
        large_trade_threshold = np.percentile(sizes, 90) if len(sizes) > 10 else np.mean(sizes) * 2
        large_trades = sum(1 for s in sizes if s > large_trade_threshold)
        large_volume = sum(s for s in sizes if s > large_trade_threshold)

        large_trade_features = [
            large_trades / len(ticks),
            large_volume / total_size if total_size > 0 else 0
        ]

        return np.array(basic_features + window_features + market_features + large_trade_features)

    def _get_last_price(self):
        """Return the price of the newest buffered tick, or ``None``."""
        if self.ticks_buffer:
            return self.ticks_buffer[-1]['price']
        return None

    def _extrapolate_price(self, base_prediction, direction_probs, horizon_seconds):
        """Nudge the predicted price by the expected direction and horizon.

        ``direction_probs`` is ordered (up, neutral, down). The adjustment is
        0.1% per unit of net direction, scaled by sqrt(horizon in minutes).
        """
        direction_factor = (
            direction_probs[0] * 1.0 +    # up
            direction_probs[1] * 0.0 +    # neutral
            direction_probs[2] * (-1.0)   # down
        )

        # Square-root-of-time scaling (simplified)
        time_factor = np.sqrt(horizon_seconds / 60.0)

        return base_prediction * (1 + direction_factor * 0.001 * time_factor)

    def _compute_last_observed(self):
        """Forward-fill the feature buffer over missing steps.

        Returns a float tensor of shape (1, n_steps, n_features); steps
        before the first observation are zeros.
        """
        last_observed = []

        for step_features, step_mask in zip(self.features_buffer, self.mask_buffer):
            if step_mask.sum() > 0:
                last_observed.append(step_features)
            elif last_observed:
                last_observed.append(last_observed[-1])
            else:
                last_observed.append(np.zeros_like(step_features))

        return torch.from_numpy(np.array(last_observed, dtype=np.float32)).unsqueeze(0)

3. Orderbookデータ用GRU-D実装

3.1 Orderbookデータの前処理

class OrderbookDataProcessor:
    """Convert an order book snapshot into a flat feature vector.

    A snapshot is a mapping with ``bids`` and ``asks`` sequences of
    ``(price, size)`` levels, best level first.
    """

    def __init__(self,
                 levels=20,              # number of price levels to encode
                 base_interval='100ms',  # base sampling interval
                 feature_type='raw'):    # 'raw', 'normalized', 'log'
        self.levels = levels
        self.base_interval = base_interval
        self.feature_type = feature_type

    def extract_orderbook_features(self, orderbook):
        """Return the feature vector for one snapshot.

        Layout: mid price, spread, relative spread; then for every level the
        bid (relative price, size) followed by the ask (relative price,
        size), zero-padded when the book is shallower than ``levels``; then
        the aggregate features.
        """
        bids = orderbook['bids']
        best_bid = bids[0][0]
        asks = orderbook['asks']
        best_ask = asks[0][0]

        mid_price = (best_bid + best_ask) / 2
        spread = best_ask - best_bid

        row = [mid_price, spread, spread / mid_price]

        # Per-level prices are expressed relative to the mid price.
        for depth_idx in range(self.levels):
            for book_side in (bids, asks):
                if depth_idx < len(book_side):
                    level_price, level_size = book_side[depth_idx]
                    row.extend([
                        (level_price - mid_price) / mid_price,
                        level_size
                    ])
                else:
                    row.extend([0, 0])

        row.extend(self._compute_aggregate_features(orderbook, mid_price))

        return np.array(row)

    def _compute_aggregate_features(self, orderbook, mid_price):
        """Return imbalance and weighted-mid offset per depth, plus shape features."""
        bids = orderbook['bids']
        asks = orderbook['asks']
        aggregates = []

        def volume_weighted_price(top_levels, volume, whole_side):
            # Volume-weighted price of the top levels; without volume fall
            # back to the best price, or to the mid price for an empty side.
            if volume > 0:
                return sum(lvl[0] * lvl[1] for lvl in top_levels) / volume
            return whole_side[0][0] if whole_side else mid_price

        for depth in (5, 10, 20):
            top_bids = bids[:depth]
            top_asks = asks[:depth]
            bid_volume = sum(lvl[1] for lvl in top_bids)
            ask_volume = sum(lvl[1] for lvl in top_asks)

            # Volume imbalance in [-1, 1]; 0 when both sides are empty.
            combined = bid_volume + ask_volume
            if combined > 0:
                aggregates.append((bid_volume - ask_volume) / combined)
            else:
                aggregates.append(0)

            # Offset of the volume-weighted mid from the plain mid.
            weighted_bid = volume_weighted_price(top_bids, bid_volume, bids)
            weighted_ask = volume_weighted_price(top_asks, ask_volume, asks)
            weighted_mid = (weighted_bid + weighted_ask) / 2
            aggregates.append((weighted_mid - mid_price) / mid_price)

        aggregates.extend(self._compute_shape_features(orderbook))

        return aggregates

    def _compute_shape_features(self, orderbook):
        """Return [bid slope, ask slope, bid concentration, ask concentration]."""

        def price_slope(side_levels):
            # Linear-fit slope of price against level index over the top 10.
            if len(side_levels) <= 1:
                return 0
            level_prices = [lvl[0] for lvl in side_levels[:10]]
            return np.polyfit(range(len(level_prices)), level_prices, 1)[0]

        def top_level_share(side_levels):
            # Fraction of the side's total size resting at the best level.
            side_total = sum(lvl[1] for lvl in side_levels)
            if side_total > 0:
                return side_levels[0][1] / side_total
            return 0

        bids = orderbook['bids']
        asks = orderbook['asks']

        shape = [price_slope(bids), price_slope(asks)]
        shape.extend([top_level_share(bids), top_level_share(asks)])

        return shape

3.2 Orderbook用GRU-Dモデル

class OrderbookGRUD(nn.Module):
    """GRU-D model with self-attention and several heads for order book data."""

    def __init__(self,
                 levels=20,
                 feature_size=None,
                 hidden_size=256,
                 num_layers=3,
                 dropout=0.2):
        """Build projection, recurrent core, attention and prediction heads.

        Args:
            levels: Number of book levels encoded in the input features.
            feature_size: Input width. When ``None`` it is derived from
                ``levels`` to match ``OrderbookDataProcessor``'s layout.
            hidden_size: Model width. ``nn.MultiheadAttention`` requires it
                to be divisible by the 8 heads used here.
            num_layers: Forwarded to ``GRUD``.
            dropout: Dropout probability.
        """
        super(OrderbookGRUD, self).__init__()

        if feature_size is None:
            # 3 basic + 4 per level (bid/ask relative price and size)
            # + 10 aggregate (3 depths x [imbalance, weighted-mid offset]
            # + 4 shape features). The previous "+ 12" did not match what
            # OrderbookDataProcessor actually emits.
            feature_size = 3 + levels * 4 + 10

        self.feature_size = feature_size
        self.hidden_size = hidden_size

        # Input projection
        self.feature_projection = nn.Sequential(
            nn.Linear(feature_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size, hidden_size)
        )

        # GRU-D core
        self.grud = GRUD(
            input_size=hidden_size,
            hidden_size=hidden_size,
            output_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout
        )

        # Self-attention over the time axis (sequence-first layout)
        self.attention = nn.MultiheadAttention(
            embed_dim=hidden_size,
            num_heads=8,
            dropout=dropout
        )

        # Prediction heads
        self.price_movement_head = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size // 2, 5)  # strong down / down / flat / up / strong up
        )

        self.spread_predictor = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 4),
            nn.ReLU(),
            nn.Linear(hidden_size // 4, 1)
        )

        self.liquidity_predictor = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 4),
            nn.ReLU(),
            nn.Linear(hidden_size // 4, 2)  # bid/ask liquidity
        )

        self.execution_price_predictor = nn.Sequential(
            nn.Linear(hidden_size + 1, hidden_size // 2),  # +1 for order size
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_size // 2, 2)  # bid/ask execution price
        )

    def forward(self, x, mask, delta_t, x_last_observed, order_sizes=None):
        """Run the model on a batch of sequences.

        Args:
            x, mask, delta_t, x_last_observed: GRU-D inputs (batch first).
            order_sizes: Optional (batch, 1) tensor; when given, execution
                prices are predicted as well.

        Returns:
            dict with ``price_movement`` (softmax over 5 classes), ``spread``,
            ``liquidity``, ``hidden`` and, optionally, ``execution_prices``.
        """
        x_projected = self.feature_projection(x)

        # NOTE(review): x_projected is hidden_size wide while mask and
        # x_last_observed keep the raw feature width - confirm that GRUD
        # accepts inputs of differing widths.
        grud_output, hidden = self.grud(x_projected, mask, delta_t, x_last_observed)

        # MultiheadAttention is sequence-first here, hence the transposes.
        seq_first = grud_output.transpose(0, 1)
        attended, _ = self.attention(seq_first, seq_first, seq_first)
        attended = attended.transpose(0, 1)

        # Heads read the last time step only.
        last_output = attended[:, -1, :]

        price_movement = self.price_movement_head(last_output)
        spread = self.spread_predictor(last_output)
        liquidity = self.liquidity_predictor(last_output)

        outputs = {
            'price_movement': torch.softmax(price_movement, dim=-1),
            'spread': spread,
            'liquidity': liquidity,
            'hidden': hidden
        }

        # Optional execution price prediction conditioned on order size
        if order_sizes is not None:
            exec_input = torch.cat([last_output, order_sizes], dim=-1)
            outputs['execution_prices'] = self.execution_price_predictor(exec_input)

        return outputs

3.3 リアルタイムOrderbook処理

class RealtimeOrderbookProcessor:
    """Maintain rolling order book feature buffers and run predictions.

    Snapshots are mappings with ``bids`` and ``asks`` sequences of
    ``(price, size)`` levels, best level first.
    """

    def __init__(self, model, buffer_seconds=30, update_interval_ms=100):
        """Create empty buffers.

        Args:
            model: Callable model taking (features, mask, delta_t,
                last_observed, order_sizes=None).
            buffer_seconds: Amount of history kept, in seconds.
            update_interval_ms: Minimum spacing between feature steps.
        """
        self.model = model
        self.buffer_seconds = buffer_seconds
        self.update_interval_ms = update_interval_ms

        # Buffers (all bounded to the same number of entries)
        self.buffer_size = buffer_seconds * 1000 // update_interval_ms
        self.orderbook_buffer = deque(maxlen=self.buffer_size)
        self.features_buffer = deque(maxlen=self.buffer_size)
        self.mask_buffer = deque(maxlen=self.buffer_size)
        self.time_buffer = deque(maxlen=self.buffer_size)

        # State
        self.last_update_time = None
        self.last_orderbook = None
        self.last_features = None
        self.hidden_state = None

        # Feature extraction
        self.processor = OrderbookDataProcessor()

        # Market impact model
        self.impact_estimator = MarketImpactEstimator()

    def update_orderbook(self, orderbook, timestamp):
        """Store a snapshot and emit a feature step once the interval elapsed."""
        self.orderbook_buffer.append({
            'orderbook': orderbook,
            'timestamp': timestamp
        })

        if self.last_update_time is None:
            self.last_update_time = timestamp

        time_diff_ms = (timestamp - self.last_update_time).total_seconds() * 1000

        if time_diff_ms >= self.update_interval_ms:
            self._update_features(orderbook, timestamp)
            self.last_update_time = timestamp
            self.last_orderbook = orderbook

    def _update_features(self, orderbook, timestamp):
        """Extract features and append them with their observation mask.

        A step whose features are (almost) identical to the previous step is
        recorded as missing (mask = 0).
        """
        features = self.processor.extract_orderbook_features(orderbook)

        if self.last_features is not None:
            feature_change = np.abs(features - self.last_features).mean()
            if feature_change < 1e-6:
                # No meaningful change: treat as missing
                mask = np.zeros_like(features)
            else:
                mask = np.ones_like(features)
        else:
            mask = np.ones_like(features)

        self.features_buffer.append(features)
        self.mask_buffer.append(mask)
        self.time_buffer.append(timestamp)
        self.last_features = features

    def _model_inputs(self):
        """Assemble the batched (features, mask, delta_t, last_observed) tensors."""
        features = torch.from_numpy(
            np.array(list(self.features_buffer), dtype=np.float32)
        ).unsqueeze(0)
        mask = torch.from_numpy(
            np.array(list(self.mask_buffer), dtype=np.float32)
        ).unsqueeze(0)
        return features, mask, self._compute_time_intervals(), self._compute_last_observed()

    def predict_market_state(self):
        """Predict price movement class, spread and liquidity.

        Returns ``None`` until at least 10 feature steps are buffered.
        """
        if len(self.features_buffer) < 10:
            return None

        features, mask, delta_t, last_observed = self._model_inputs()

        self.model.eval()
        with torch.no_grad():
            predictions = self.model(features, mask, delta_t, last_observed)

        price_movement_probs = predictions['price_movement'][0].cpu().numpy()
        movement_labels = ['大幅下落', '下落', '横ばい', '上昇', '大幅上昇']
        predicted_movement = movement_labels[np.argmax(price_movement_probs)]

        return {
            'predicted_movement': predicted_movement,
            'movement_probabilities': dict(zip(movement_labels, price_movement_probs)),
            'predicted_spread': predictions['spread'].item(),
            'predicted_liquidity': {
                'bid': predictions['liquidity'][0, 0].item(),
                'ask': predictions['liquidity'][0, 1].item()
            },
            'market_conditions': self._assess_market_conditions()
        }

    @staticmethod
    def _apply_price_adjustments(mid_price, side, immediate_impact, model_adjustment):
        """Combine mid price, book-walk cost and model adjustment.

        ``immediate_impact`` is a non-negative cost for both sides, so it
        must raise the price for buys and lower it for sells.
        """
        direction = 1.0 if side == 'buy' else -1.0
        return mid_price * (1 + direction * immediate_impact + model_adjustment)

    def predict_execution_price(self, side, size):
        """Estimate the execution price of a market order.

        Args:
            side: ``'buy'`` or anything else for a sell.
            size: Order size.

        Returns:
            dict with ``execution_price``, ``immediate_impact``,
            ``model_adjustment``, ``total_cost`` and ``slippage``; ``None``
            until at least 10 feature steps are buffered.
        """
        if len(self.features_buffer) < 10:
            return None

        current_orderbook = self.orderbook_buffer[-1]['orderbook']

        # Cost of walking the current book
        immediate_impact = self.impact_estimator.estimate_immediate_impact(
            current_orderbook, side, size
        )

        features, mask, delta_t, last_observed = self._model_inputs()
        order_size_normalized = torch.FloatTensor([[size / 1000.0]])  # normalisation

        self.model.eval()
        with torch.no_grad():
            predictions = self.model(
                features, mask, delta_t, last_observed,
                order_sizes=order_size_normalized
            )

        # Column 1 is the ask (buy) side, column 0 the bid (sell) side.
        if side == 'buy':
            model_adjustment = predictions['execution_prices'][0, 1].item()
        else:
            model_adjustment = predictions['execution_prices'][0, 0].item()

        mid_price = (current_orderbook['bids'][0][0] + current_orderbook['asks'][0][0]) / 2
        execution_price = self._apply_price_adjustments(
            mid_price, side, immediate_impact, model_adjustment
        )

        return {
            'execution_price': execution_price,
            'immediate_impact': immediate_impact,
            'model_adjustment': model_adjustment,
            'total_cost': execution_price * size,
            'slippage': abs(execution_price - mid_price) / mid_price
        }

    def _compute_time_intervals(self):
        """Seconds since the most recent observed step, shape (1, n_steps, 1)."""
        delta_t = []
        last_update_time = None

        for step_time, step_mask in zip(self.time_buffer, self.mask_buffer):
            if step_mask.sum() > 0:
                last_update_time = step_time

            if last_update_time is None:
                delta_t.append(0)
            else:
                delta_t.append((step_time - last_update_time).total_seconds())

        return torch.FloatTensor(delta_t).unsqueeze(0).unsqueeze(-1)

    def _compute_last_observed(self):
        """Forward-filled features, shape (1, n_steps, n_features).

        Steps before the first observation are zeros.
        """
        last_observed = []
        last_valid = None

        for step_features, step_mask in zip(self.features_buffer, self.mask_buffer):
            if step_mask.sum() > 0:
                last_valid = step_features

            if last_valid is not None:
                last_observed.append(last_valid)
            else:
                last_observed.append(np.zeros_like(step_features))

        return torch.from_numpy(np.array(last_observed, dtype=np.float32)).unsqueeze(0)

    def _assess_market_conditions(self):
        """Classify the recent book as volatile / thinning / stable / normal.

        Uses the last 10 buffered snapshots; returns ``"unknown"`` when none
        are buffered.
        """
        if not self.orderbook_buffer:
            return "unknown"

        recent_orderbooks = list(self.orderbook_buffer)[-10:]

        # Relative spread and top-10 depth per snapshot
        spreads = []
        depths = []
        for ob in recent_orderbooks:
            book = ob['orderbook']
            best_ask = book['asks'][0][0]
            best_bid = book['bids'][0][0]
            spreads.append((best_ask - best_bid) / ((best_ask + best_bid) / 2))

            total_bid = sum(level[1] for level in book['bids'][:10])
            total_ask = sum(level[1] for level in book['asks'][:10])
            depths.append(total_bid + total_ask)

        avg_spread = np.mean(spreads)
        spread_volatility = np.std(spreads)

        avg_depth = np.mean(depths)
        # A trend needs at least two points; a single-point linear fit is
        # degenerate.
        if len(depths) >= 2:
            depth_trend = np.polyfit(range(len(depths)), depths, 1)[0]
        else:
            depth_trend = 0.0

        if avg_spread > 0.002 and spread_volatility > 0.0005:
            return "volatile"
        elif depth_trend < -avg_depth * 0.1:
            return "thinning"
        elif avg_spread < 0.0005 and spread_volatility < 0.0001:
            return "stable"
        else:
            return "normal"

3.4 市場インパクトモデル

class MarketImpactEstimator:
    """Estimate the cost of executing a market order against a book snapshot."""

    def estimate_immediate_impact(self, orderbook, side, size):
        """Estimate the immediate impact of walking the book.

        Args:
            orderbook: Mapping with ``bids`` and ``asks`` sequences of
                ``(price, size)`` levels, best level first.
            side: ``'buy'`` consumes asks; anything else consumes bids.
            size: Order size.

        Returns:
            Relative distance between the average fill price and the best
            price, expressed as a cost (positive means a worse fill) for
            both sides. ``0.0`` for a non-positive size or an empty side.
        """
        # Nothing to execute: avoids dividing by zero below.
        if size <= 0:
            return 0.0

        is_buy = side == 'buy'
        levels = orderbook['asks'] if is_buy else orderbook['bids']
        if not levels:
            return 0.0

        remaining_size = size
        total_cost = 0

        for price, available_size in levels:
            if remaining_size <= 0:
                break

            filled_size = min(remaining_size, available_size)
            total_cost += filled_size * price
            remaining_size -= filled_size

        if remaining_size > 0:
            # The order exhausted the visible book: price the remainder 1%
            # worse than the last level. "Worse" is higher for a buy and
            # lower for a sell.
            penalty = 1.01 if is_buy else 0.99
            total_cost += remaining_size * levels[-1][0] * penalty

        avg_execution_price = total_cost / size

        best_price = levels[0][0]
        impact = (avg_execution_price - best_price) / best_price if best_price > 0 else 0

        # Sells fill below the best bid, so flip the sign to report a cost.
        return impact if is_buy else -impact

4. 統合システムと最適化

4.1 Ticks/Orderbook統合モデル

class IntegratedGRUD(nn.Module):
    """Fuse a tick GRU-D and an order book GRU-D into a single predictor."""

    def __init__(self, config):
        """Build both branches, the fusion layer and the output layer.

        ``config`` is read for ``ticks_hidden_size``, ``ticks_layers``,
        ``orderbook_hidden_size``, ``orderbook_layers``,
        ``fusion_hidden_size``, ``dropout`` and ``output_size``.
        """
        super().__init__()

        # One GRU-D branch per data source
        self.ticks_grud = TicksGRUD(
            hidden_size=config['ticks_hidden_size'],
            num_layers=config['ticks_layers']
        )
        self.orderbook_grud = OrderbookGRUD(
            hidden_size=config['orderbook_hidden_size'],
            num_layers=config['orderbook_layers']
        )

        # Fusion over the concatenated branch states
        fused_in = config['ticks_hidden_size'] + config['orderbook_hidden_size']
        self.fusion_layer = nn.Sequential(
            nn.Linear(fused_in, config['fusion_hidden_size']),
            nn.ReLU(),
            nn.Dropout(config['dropout']),
            nn.Linear(config['fusion_hidden_size'], config['fusion_hidden_size'])
        )

        # Output layer
        self.final_predictor = nn.Sequential(
            nn.Linear(config['fusion_hidden_size'], config['fusion_hidden_size'] // 2),
            nn.ReLU(),
            nn.Dropout(config['dropout']),
            nn.Linear(config['fusion_hidden_size'] // 2, config['output_size'])
        )

    def forward(self, ticks_data, orderbook_data):
        """Run both branches and predict from their fused hidden states.

        Both arguments are mappings with ``features``, ``mask``, ``delta_t``
        and ``last_observed`` entries.

        Returns:
            dict with ``prediction`` plus the raw outputs of each branch
            under ``ticks_features`` and ``orderbook_features``.
        """
        input_keys = ('features', 'mask', 'delta_t', 'last_observed')

        ticks_output = self.ticks_grud(*(ticks_data[key] for key in input_keys))
        orderbook_output = self.orderbook_grud(*(orderbook_data[key] for key in input_keys))

        # Last entry of each branch's hidden state, joined on the feature axis.
        branch_states = [
            ticks_output['hidden'][-1],
            orderbook_output['hidden'][-1]
        ]
        prediction = self.final_predictor(
            self.fusion_layer(torch.cat(branch_states, dim=-1))
        )

        return {
            'prediction': prediction,
            'ticks_features': ticks_output,
            'orderbook_features': orderbook_output
        }

4.2 最適化とパフォーマンス

class OptimizedGRUDProcessor:
    """Batch streaming inputs and run them through a scripted model.

    NOTE(review): ``_prepare_batch`` and ``_send_prediction`` are called
    below but are not defined in the visible source - they presumably have
    to be supplied by a subclass; confirm.
    """

    def __init__(self, model, device='cuda'):
        """Move the model to ``device`` and compile it with TorchScript."""
        self.model = model.to(device)
        self.device = device

        # JIT compilation
        self.model = torch.jit.script(self.model)

        # Batching configuration. ``max_latency_ms`` is stored only; no
        # method of this class reads it.
        self.batch_queue = []
        self.batch_size = 32
        self.max_latency_ms = 10

    async def process_stream(self, data_stream):
        """Consume an async stream, running the model on every full batch.

        When the stream ends, items still waiting in a partial batch are
        processed too instead of being silently dropped.
        """
        async for data in data_stream:
            self.batch_queue.append(data)

            if len(self.batch_queue) >= self.batch_size:
                await self._process_batch()

        # Flush the trailing partial batch.
        if self.batch_queue:
            await self._process_batch()

    async def _process_batch(self):
        """Run the model on the queued items and deliver each prediction."""
        if not self.batch_queue:
            return

        batch_data = self._prepare_batch(self.batch_queue)

        # Mixed precision only applies to CUDA devices; requesting it
        # elsewhere just triggers a warning before being disabled.
        use_amp = str(self.device).startswith('cuda')
        with torch.cuda.amp.autocast(enabled=use_amp):
            with torch.no_grad():
                predictions = self.model(batch_data)

        # Predictions are matched to queued items by position.
        for i, pred in enumerate(predictions):
            await self._send_prediction(self.batch_queue[i]['id'], pred)

        self.batch_queue.clear()

5. まとめ

GRU-Dは暗号通貨のTicksとOrderbookデータに非常に適したモデルです:

Ticksデータへの適用:
- 不規則な取引発生を自然に処理
- 市場の閑散期と活発期を区別
- マイクロストラクチャ特徴量と組み合わせて高精度予測

Orderbookデータへの適用:
- 高頻度の板更新を効率的に処理
- 重要な変化のみを捕捉
- 執行価格予測に有効

実装のポイント:
1. 適切な時間間隔の選択(Ticks: 秒単位、Orderbook: ミリ秒単位)
2. 特徴量エンジニアリングの重要性
3. リアルタイム処理のための最適化
4. 統合モデルによる相補的な情報活用