A Guide to Applying GRU-D to Cryptocurrency Data (Ticks / Orderbook)
1. Characteristics of Cryptocurrency Data and the Suitability of GRU-D
1.1 Characteristics of Tick Data
Tick data (trade executions) is a classic example of an irregular time series:
- Irregular arrival: trades occur irregularly, depending on how active the market is
- Clustering: trades concentrate in high-volatility periods
- Information asymmetry: large and small trades move the market differently
- Missingness is informative: periods with no trades also carry information (falling liquidity); a sketch of the assumed raw data format follows this list
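To make this concrete, here is the kind of tick record the preprocessing code in Section 2 works with. The column names (price, size, side) and the irregular DatetimeIndex are assumptions read off that code rather than an exchange-specific schema, and the numbers are purely illustrative:

import pandas as pd

# Irregularly spaced trades: note the 0.4 s and 3.1 s gaps between executions.
ticks_df = pd.DataFrame(
    {
        'price': [65000.0, 65001.5, 64999.0],
        'size': [0.02, 0.50, 0.01],
        'side': ['buy', 'buy', 'sell'],
    },
    index=pd.to_datetime([
        '2024-01-01 00:00:00.000',
        '2024-01-01 00:00:00.400',
        '2024-01-01 00:00:03.500',
    ]),
)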
1.2 Characteristics of Orderbook Data
Orderbook data is also updated irregularly:
- High-frequency updates: orders are added and cancelled on a millisecond scale
- Multi-dimensionality: several price levels change at the same time
- State dependence: the current state of the book influences the next change
- Sparsity: data is thin at deep price levels; the snapshot format assumed by the code in Section 3 is sketched after this list
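Likewise, the orderbook code in Section 3 assumes snapshots shaped like the dictionary below, where 'bids' and 'asks' are lists of [price, size] pairs ordered from best to worse price; the values are illustrative only:

orderbook = {
    'bids': [[64999.5, 0.8], [64999.0, 1.2], [64998.5, 2.0]],   # best bid first
    'asks': [[65000.5, 0.6], [65001.0, 1.5], [65001.5, 2.4]],   # best ask first
}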
2. GRU-D Implementation for Tick Data
2.1 Data Structures and Preprocessing
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from collections import deque
from datetime import datetime, timedelta
class TicksDataProcessor:
"""Ticksデータ用の前処理クラス"""
def __init__(self,
base_interval='1s', # 基本時間間隔(1秒)
max_gap_minutes=5, # 最大欠損許容時間
feature_windows=[60, 300, 600]): # 特徴量計算窓(秒)
self.base_interval = base_interval
self.max_gap_minutes = max_gap_minutes
self.feature_windows = feature_windows
def extract_tick_features(self, ticks_df):
"""Tickデータから特徴量を抽出"""
features = []
# 基本特徴
features.append(ticks_df['price'].values)
features.append(ticks_df['size'].values)
features.append((ticks_df['side'] == 'buy').astype(float).values)
# 累積特徴
features.append(ticks_df['size'].cumsum().values)
features.append(ticks_df.groupby('side')['size'].cumsum().values)
# 価格変動特徴
features.append(ticks_df['price'].diff().fillna(0).values)
features.append(np.log(ticks_df['price'] / ticks_df['price'].shift(1)).fillna(0).values)
# 取引インパクト
features.append((ticks_df['size'] * ticks_df['price']).values)
return np.column_stack(features)
def create_regular_grid(self, ticks_df, start_time, end_time):
"""不規則なTicksデータを規則的グリッドに変換"""
# 規則的な時間グリッド作成
time_grid = pd.date_range(start=start_time, end=end_time, freq=self.base_interval)
n_steps = len(time_grid)
n_features = 8 # 上記で定義した特徴量の数
        # 初期化(ベース特徴量の8次元分)
        values = np.zeros((n_steps, n_features))
        mask = np.zeros((n_steps, n_features))
        delta_t = np.zeros((n_steps, 1))
        # 各時間窓での集計特徴量を列方向に連結
        for window in self.feature_windows:
            window_features = self._compute_window_features(ticks_df, time_grid, window)
            values = np.concatenate([values, window_features['values']], axis=1)
            mask = np.concatenate([mask, window_features['mask']], axis=1)
        # last observedは連結後の全特徴量幅で初期化する
        last_observed = np.zeros((n_steps, values.shape[1]))
        # 時間間隔とlast observed値の計算
        last_tick_idx = -1
        last_tick_features = np.zeros(values.shape[1])
for i, t in enumerate(time_grid):
# 時間窓内のtickを検索
            window_start = t - pd.Timedelta(self.base_interval)
            window_end = t
ticks_in_window = ticks_df[
(ticks_df.index >= window_start) &
(ticks_df.index < window_end)
]
if len(ticks_in_window) > 0:
# 実測値あり
tick_features = self._aggregate_ticks(ticks_in_window)
values[i, :len(tick_features)] = tick_features
mask[i, :len(tick_features)] = 1
last_tick_idx = i
last_tick_features[:len(tick_features)] = tick_features
# 時間間隔の計算
if last_tick_idx >= 0:
delta_t[i] = (i - last_tick_idx) * pd.Timedelta(self.base_interval).total_seconds()
# 最後の観測値
last_observed[i] = last_tick_features
return {
'values': values,
'mask': mask,
'delta_t': delta_t / 60.0, # 分単位に変換
'last_observed': last_observed,
'timestamps': time_grid
}
def _aggregate_ticks(self, ticks):
"""時間窓内のticksを集計"""
if len(ticks) == 0:
return np.zeros(8)
features = [
ticks['price'].mean(), # 平均価格
ticks['size'].sum(), # 総取引量
(ticks['side'] == 'buy').mean(), # 買い比率
len(ticks), # 取引回数
ticks['price'].std() if len(ticks) > 1 else 0, # 価格標準偏差
ticks['size'].mean(), # 平均取引サイズ
ticks['price'].max() - ticks['price'].min(), # 価格レンジ
(ticks['price'] * ticks['size']).sum() / ticks['size'].sum() # VWAP
]
return np.array(features)
def _compute_window_features(self, ticks_df, time_grid, window_seconds):
"""指定された時間窓での特徴量計算"""
n_steps = len(time_grid)
window_features = {
'momentum': np.zeros(n_steps),
'volatility': np.zeros(n_steps),
'volume_rate': np.zeros(n_steps),
'trade_intensity': np.zeros(n_steps),
'buy_pressure': np.zeros(n_steps)
}
mask = np.zeros((n_steps, len(window_features)))
for i, t in enumerate(time_grid):
window_start = t - pd.Timedelta(seconds=window_seconds)
window_ticks = ticks_df[(ticks_df.index >= window_start) & (ticks_df.index <= t)]
if len(window_ticks) >= 2:
# モメンタム
window_features['momentum'][i] = (
window_ticks['price'].iloc[-1] - window_ticks['price'].iloc[0]
) / window_ticks['price'].iloc[0]
                # ボラティリティ(tickが2件しかない場合はstdがNaNになるため0にフォールバック)
                vol = window_ticks['price'].pct_change().std()
                window_features['volatility'][i] = 0.0 if np.isnan(vol) else float(vol)
# 取引頻度
window_features['volume_rate'][i] = window_ticks['size'].sum() / window_seconds
# 取引強度
window_features['trade_intensity'][i] = len(window_ticks) / window_seconds
# 買い圧力
buy_volume = window_ticks[window_ticks['side'] == 'buy']['size'].sum()
total_volume = window_ticks['size'].sum()
window_features['buy_pressure'][i] = buy_volume / total_volume if total_volume > 0 else 0.5
mask[i] = 1
# 特徴量を結合
values = np.column_stack([window_features[k] for k in window_features])
return {'values': values, 'mask': mask}
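A minimal offline usage sketch of TicksDataProcessor follows. The ticks_df variable (a DataFrame indexed by timestamp with price, size and side columns, as illustrated in Section 1.1) is an assumption, and the (batch, steps, features) tensor layout matches what the models below expect:

# Build a regular 1-second grid with mask / delta_t / last-observed arrays from raw ticks.
processor = TicksDataProcessor()
grid = processor.create_regular_grid(
    ticks_df,
    start_time=ticks_df.index[0],
    end_time=ticks_df.index[-1],
)

# Shape everything as (batch, steps, features) for the GRU-D models below.
x = torch.FloatTensor(grid['values']).unsqueeze(0)
mask = torch.FloatTensor(grid['mask']).unsqueeze(0)
delta_t = torch.FloatTensor(grid['delta_t']).unsqueeze(0)
last_observed = torch.FloatTensor(grid['last_observed']).unsqueeze(0)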
2.2 GRU-D Model for Tick Data
class TicksGRUD(nn.Module):
"""Ticksデータ専用のGRU-Dモデル"""
def __init__(self,
tick_feature_size=8,
window_feature_size=15, # 3 windows × 5 features
hidden_size=128,
num_layers=2,
dropout=0.2):
super(TicksGRUD, self).__init__()
total_feature_size = tick_feature_size + window_feature_size
# 基本GRU-D
self.grud = GRUD(
input_size=total_feature_size,
hidden_size=hidden_size,
output_size=hidden_size,
num_layers=num_layers,
dropout=dropout
)
# Tick特有の処理層
self.tick_embedding = nn.Sequential(
nn.Linear(tick_feature_size, hidden_size // 2),
nn.ReLU(),
nn.Dropout(dropout)
)
# 予測ヘッド
self.price_predictor = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size // 2, 1)
)
self.volume_predictor = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size // 2, 1)
)
self.direction_classifier = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size // 2, 3) # up/neutral/down
)
def forward(self, x, mask, delta_t, x_last_observed):
# GRU-D処理
grud_output, hidden = self.grud(x, mask, delta_t, x_last_observed)
# 最後の時点の出力を使用
last_output = grud_output[:, -1, :]
# 予測
price_pred = self.price_predictor(last_output)
volume_pred = self.volume_predictor(last_output)
direction_logits = self.direction_classifier(last_output)
return {
'price': price_pred,
'volume': volume_pred,
'direction': torch.softmax(direction_logits, dim=-1),
'hidden': hidden
}
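TicksGRUD above (and OrderbookGRUD in Section 3) delegate the recurrence to a GRUD module that this guide does not define. As a reference point, here is a minimal single-layer sketch of the GRU-D cell with trainable decay on the last observed input and on the hidden state, matching the (x, mask, delta_t, x_last_observed) call signature used in this guide. The output_size and num_layers arguments are accepted only for interface compatibility, and missing inputs are decayed toward zero, which implicitly assumes standardized features (the original GRU-D paper decays toward the empirical mean):

class GRUD(nn.Module):
    """Minimal single-layer GRU-D sketch: decay on the last observed input and on the hidden state."""
    def __init__(self, input_size, hidden_size, output_size=None, num_layers=1, dropout=0.0):
        super().__init__()
        self.hidden_size = hidden_size
        # trainable decay rates, one set per input feature and one per hidden unit
        self.gamma_x = nn.Linear(1, input_size)
        self.gamma_h = nn.Linear(1, hidden_size)
        # the cell sees the (possibly imputed) input concatenated with its mask
        self.cell = nn.GRUCell(input_size * 2, hidden_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask, delta_t, x_last_observed, h=None):
        batch, steps, _ = x.shape
        if h is None:
            h = x.new_zeros(batch, self.hidden_size)
        outputs = []
        for t in range(steps):
            d = delta_t[:, t, :]                                  # (batch, 1) time since last observation
            decay_x = torch.exp(-torch.relu(self.gamma_x(d)))     # (batch, input_size)
            decay_h = torch.exp(-torch.relu(self.gamma_h(d)))     # (batch, hidden_size)
            # decay the previous hidden state toward zero as the gap grows
            h = decay_h * h
            # impute missing inputs by decaying the last observed value toward zero (standardized features assumed)
            m = mask[:, t, :]
            x_t = m * x[:, t, :] + (1 - m) * decay_x * x_last_observed[:, t, :]
            h = self.cell(torch.cat([x_t, m], dim=-1), h)
            outputs.append(self.dropout(h))
        out = torch.stack(outputs, dim=1)                         # (batch, steps, hidden_size)
        return out, h.unsqueeze(0)                                # hidden shaped (1, batch, hidden) like nn.GRU

This sketch only pins down the tensor interface so that the classes in this guide compose; a production implementation would stack layers and use the paper's mean-based imputation.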
2.3 Realtime Tick Processing
class RealtimeTicksProcessor:
"""リアルタイムTicksデータ処理"""
def __init__(self, model, buffer_minutes=10, update_interval_seconds=1):
self.model = model
self.buffer_minutes = buffer_minutes
self.update_interval = update_interval_seconds
# バッファ
self.ticks_buffer = deque()
self.features_buffer = deque(maxlen=buffer_minutes * 60 // update_interval_seconds)
self.mask_buffer = deque(maxlen=buffer_minutes * 60 // update_interval_seconds)
self.time_buffer = deque(maxlen=buffer_minutes * 60 // update_interval_seconds)
# 状態
self.last_update_time = None
self.last_features = None
self.hidden_state = None
# 前処理
self.processor = TicksDataProcessor()
def add_tick(self, tick):
"""新しいTickを追加"""
self.ticks_buffer.append(tick)
current_time = tick['timestamp']
# 更新間隔チェック
if self.last_update_time is None:
self.last_update_time = current_time
if (current_time - self.last_update_time).total_seconds() >= self.update_interval:
self._update_features(current_time)
self.last_update_time = current_time
def _update_features(self, current_time):
"""特徴量バッファを更新"""
# 時間窓内のticksを取得
window_start = current_time - timedelta(seconds=self.update_interval)
window_ticks = [t for t in self.ticks_buffer
if t['timestamp'] >= window_start and t['timestamp'] <= current_time]
if window_ticks:
# 特徴量抽出
features = self._extract_realtime_features(window_ticks)
mask = np.ones_like(features)
self.last_features = features
else:
            # 欠損(フォールバックの次元は_extract_realtime_featuresの出力21次元に合わせる)
            features = np.zeros_like(self.last_features) if self.last_features is not None else np.zeros(21)
mask = np.zeros_like(features)
# バッファに追加
self.features_buffer.append(features)
self.mask_buffer.append(mask)
self.time_buffer.append(current_time)
# 古いticksを削除
cutoff_time = current_time - timedelta(minutes=self.buffer_minutes)
self.ticks_buffer = deque(t for t in self.ticks_buffer if t['timestamp'] > cutoff_time)
def predict_next(self, horizon_seconds=60):
"""次の価格を予測"""
if len(self.features_buffer) < 10:
return None
# 現在の状態を準備
features = torch.FloatTensor(list(self.features_buffer)).unsqueeze(0)
mask = torch.FloatTensor(list(self.mask_buffer)).unsqueeze(0)
# 時間間隔を計算
delta_t = []
last_observed_idx = -1
for i in range(len(self.time_buffer)):
if self.mask_buffer[i].sum() > 0:
last_observed_idx = i
if last_observed_idx >= 0:
time_diff = (self.time_buffer[i] - self.time_buffer[last_observed_idx]).total_seconds() / 60
else:
time_diff = 0
delta_t.append(time_diff)
delta_t = torch.FloatTensor(delta_t).unsqueeze(0).unsqueeze(-1)
# 最後の観測値
last_observed = self._compute_last_observed()
# 予測
self.model.eval()
with torch.no_grad():
predictions = self.model(features, mask, delta_t, last_observed)
# 将来の予測を外挿
future_price = self._extrapolate_price(
predictions['price'].item(),
predictions['direction'].numpy()[0],
horizon_seconds
)
return {
'current_price': self._get_last_price(),
'predicted_price': future_price,
'predicted_volume': predictions['volume'].item(),
'direction_probs': {
'up': predictions['direction'][0, 0].item(),
'neutral': predictions['direction'][0, 1].item(),
'down': predictions['direction'][0, 2].item()
},
'horizon': horizon_seconds
}
def _extract_realtime_features(self, ticks):
"""リアルタイムの特徴量抽出"""
if not ticks:
            return np.zeros(21)  # 基本8 + 窓3x3 + マイクロストラクチャ2 + 大口2 = 21次元
# 基本統計
prices = [t['price'] for t in ticks]
sizes = [t['size'] for t in ticks]
sides = [1 if t['side'] == 'buy' else 0 for t in ticks]
basic_features = [
np.mean(prices),
np.sum(sizes),
np.mean(sides),
len(ticks),
np.std(prices) if len(prices) > 1 else 0,
np.mean(sizes),
max(prices) - min(prices) if prices else 0,
sum(p * s for p, s in zip(prices, sizes)) / sum(sizes) if sum(sizes) > 0 else np.mean(prices)
]
# 時間窓特徴量(簡略化)
window_features = []
for window_minutes in [1, 5, 10]:
window_start = ticks[-1]['timestamp'] - timedelta(minutes=window_minutes)
window_ticks = [t for t in ticks if t['timestamp'] >= window_start]
if len(window_ticks) >= 2:
window_prices = [t['price'] for t in window_ticks]
momentum = (window_prices[-1] - window_prices[0]) / window_prices[0]
volatility = np.std(np.diff(window_prices)) / np.mean(window_prices)
volume_rate = sum(t['size'] for t in window_ticks) / (window_minutes * 60)
else:
momentum = 0
volatility = 0
volume_rate = 0
window_features.extend([momentum, volatility, volume_rate])
# マーケットマイクロストラクチャ特徴
if len(ticks) > 1:
# Kyle's lambda (簡略版)
price_changes = np.diff(prices)
signed_volumes = [s * (2 * side - 1) for s, side in zip(sizes[1:], sides[1:])]
            if len(price_changes) > 0 and np.std(signed_volumes) > 0 and np.std(price_changes) > 0:
kyle_lambda = np.corrcoef(price_changes, signed_volumes)[0, 1] * \
(np.std(price_changes) / np.std(signed_volumes))
else:
kyle_lambda = 0
# Order flow imbalance
buy_volume = sum(s for s, side in zip(sizes, sides) if side == 1)
sell_volume = sum(s for s, side in zip(sizes, sides) if side == 0)
ofi = (buy_volume - sell_volume) / (buy_volume + sell_volume) if (buy_volume + sell_volume) > 0 else 0
else:
kyle_lambda = 0
ofi = 0
market_features = [kyle_lambda, ofi]
# 大口取引検出
large_trade_threshold = np.percentile(sizes, 90) if len(sizes) > 10 else np.mean(sizes) * 2
large_trades = sum(1 for s in sizes if s > large_trade_threshold)
large_volume = sum(s for s in sizes if s > large_trade_threshold)
large_trade_features = [
large_trades / len(ticks) if ticks else 0,
large_volume / sum(sizes) if sum(sizes) > 0 else 0
]
return np.array(basic_features + window_features + market_features + large_trade_features)
def _get_last_price(self):
"""最後の価格を取得"""
if self.ticks_buffer:
return self.ticks_buffer[-1]['price']
return None
def _extrapolate_price(self, base_prediction, direction_probs, horizon_seconds):
"""価格を外挿"""
# 方向性に基づく調整
direction_factor = (
direction_probs[0] * 1.0 + # up
direction_probs[1] * 0.0 + # neutral
direction_probs[2] * (-1.0) # down
)
# 時間に応じた調整(簡略化)
time_factor = np.sqrt(horizon_seconds / 60.0) # ルート時間則
return base_prediction * (1 + direction_factor * 0.001 * time_factor)
def _compute_last_observed(self):
"""最後の観測値を計算"""
last_observed = []
for i in range(len(self.features_buffer)):
if self.mask_buffer[i].sum() > 0:
last_observed.append(self.features_buffer[i])
elif last_observed:
last_observed.append(last_observed[-1])
else:
last_observed.append(np.zeros_like(self.features_buffer[0]))
return torch.FloatTensor(last_observed).unsqueeze(0)
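A sketch of how the realtime pipeline might be wired up. exchange_tick_stream is a hypothetical feed, the model is untrained, and the 8 + 13 = 21 feature split mirrors the 21 dimensions produced by _extract_realtime_features above:

# Hypothetical realtime wiring: the exchange feed and the trained weights are placeholders.
model = TicksGRUD(tick_feature_size=8, window_feature_size=13)  # 8 + 13 = 21-dim realtime features
processor = RealtimeTicksProcessor(model, buffer_minutes=10, update_interval_seconds=1)

for raw in exchange_tick_stream():          # assumed generator yielding one trade dict at a time
    processor.add_tick({
        'timestamp': raw['timestamp'],      # datetime of the execution
        'price': float(raw['price']),
        'size': float(raw['size']),
        'side': raw['side'],                # 'buy' or 'sell'
    })
    prediction = processor.predict_next(horizon_seconds=60)
    if prediction is not None:
        print(prediction['predicted_price'], prediction['direction_probs'])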
3. GRU-D Implementation for Orderbook Data
3.1 Preprocessing Orderbook Data
class OrderbookDataProcessor:
"""Orderbookデータ用の前処理クラス"""
def __init__(self,
levels=20, # 使用する価格レベル数
base_interval='100ms', # 基本時間間隔(100ミリ秒)
feature_type='raw'): # 'raw', 'normalized', 'log'
self.levels = levels
self.base_interval = base_interval
self.feature_type = feature_type
def extract_orderbook_features(self, orderbook):
"""Orderbookから特徴量を抽出"""
features = []
# 基本特徴
mid_price = (orderbook['bids'][0][0] + orderbook['asks'][0][0]) / 2
spread = orderbook['asks'][0][0] - orderbook['bids'][0][0]
spread_pct = spread / mid_price
features.extend([mid_price, spread, spread_pct])
# 各レベルの価格と数量
for i in range(self.levels):
if i < len(orderbook['bids']):
bid_price, bid_size = orderbook['bids'][i]
# 価格を中間価格からの相対値に変換
features.extend([
(bid_price - mid_price) / mid_price,
bid_size
])
else:
features.extend([0, 0])
if i < len(orderbook['asks']):
ask_price, ask_size = orderbook['asks'][i]
features.extend([
(ask_price - mid_price) / mid_price,
ask_size
])
else:
features.extend([0, 0])
# 集計特徴
features.extend(self._compute_aggregate_features(orderbook, mid_price))
return np.array(features)
def _compute_aggregate_features(self, orderbook, mid_price):
"""集計特徴量の計算"""
features = []
# 深度別の不均衡
for depth in [5, 10, 20]:
bid_volume = sum(level[1] for level in orderbook['bids'][:depth])
ask_volume = sum(level[1] for level in orderbook['asks'][:depth])
imbalance = (bid_volume - ask_volume) / (bid_volume + ask_volume) if (bid_volume + ask_volume) > 0 else 0
features.append(imbalance)
# 加重平均価格
if bid_volume > 0:
weighted_bid = sum(level[0] * level[1] for level in orderbook['bids'][:depth]) / bid_volume
else:
weighted_bid = orderbook['bids'][0][0] if orderbook['bids'] else mid_price
if ask_volume > 0:
weighted_ask = sum(level[0] * level[1] for level in orderbook['asks'][:depth]) / ask_volume
else:
weighted_ask = orderbook['asks'][0][0] if orderbook['asks'] else mid_price
weighted_mid = (weighted_bid + weighted_ask) / 2
features.append((weighted_mid - mid_price) / mid_price)
# 板の形状特徴
features.extend(self._compute_shape_features(orderbook))
return features
def _compute_shape_features(self, orderbook):
"""板の形状に関する特徴量"""
features = []
# 価格の勾配
if len(orderbook['bids']) > 1:
bid_prices = [level[0] for level in orderbook['bids'][:10]]
bid_gradient = np.polyfit(range(len(bid_prices)), bid_prices, 1)[0]
else:
bid_gradient = 0
if len(orderbook['asks']) > 1:
ask_prices = [level[0] for level in orderbook['asks'][:10]]
ask_gradient = np.polyfit(range(len(ask_prices)), ask_prices, 1)[0]
else:
ask_gradient = 0
features.extend([bid_gradient, ask_gradient])
# 流動性の集中度
total_bid_volume = sum(level[1] for level in orderbook['bids'])
total_ask_volume = sum(level[1] for level in orderbook['asks'])
if total_bid_volume > 0:
bid_concentration = orderbook['bids'][0][1] / total_bid_volume
else:
bid_concentration = 0
if total_ask_volume > 0:
ask_concentration = orderbook['asks'][0][1] / total_ask_volume
else:
ask_concentration = 0
features.extend([bid_concentration, ask_concentration])
return features
3.2 GRU-D Model for Orderbook Data
class OrderbookGRUD(nn.Module):
"""Orderbook専用のGRU-Dモデル"""
def __init__(self,
levels=20,
feature_size=None,
hidden_size=256,
num_layers=3,
dropout=0.2):
super(OrderbookGRUD, self).__init__()
# 特徴量サイズの計算
if feature_size is None:
            # 3 (基本) + levels*4 (価格と数量) + 10 (集計特徴: 不均衡3 + 加重ミッド乖離3 + 形状4)
            feature_size = 3 + levels * 4 + 10
self.feature_size = feature_size
self.hidden_size = hidden_size
# 特徴量の前処理
self.feature_projection = nn.Sequential(
nn.Linear(feature_size, hidden_size),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size, hidden_size)
)
# GRU-D層
self.grud = GRUD(
input_size=hidden_size,
hidden_size=hidden_size,
output_size=hidden_size,
num_layers=num_layers,
dropout=dropout
)
# Attention機構
self.attention = nn.MultiheadAttention(
embed_dim=hidden_size,
num_heads=8,
dropout=dropout
)
# 予測ヘッド
self.price_movement_head = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 2),
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size // 2, 5) # 大幅下落/下落/横ばい/上昇/大幅上昇
)
self.spread_predictor = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 4),
nn.ReLU(),
nn.Linear(hidden_size // 4, 1)
)
self.liquidity_predictor = nn.Sequential(
nn.Linear(hidden_size, hidden_size // 4),
nn.ReLU(),
nn.Linear(hidden_size // 4, 2) # bid/ask流動性
)
self.execution_price_predictor = nn.Sequential(
nn.Linear(hidden_size + 1, hidden_size // 2), # +1 for order size
nn.ReLU(),
nn.Dropout(dropout),
nn.Linear(hidden_size // 2, 2) # bid/ask実行価格
)
    def forward(self, x, mask, delta_t, x_last_observed, order_sizes=None):
        # 特徴量の射影(GRU-Dには射影後の次元を渡すため、last observedにも同じ射影を適用し、
        # マスクは「その時点で観測があったか」を射影後の次元へブロードキャストする)
        x_projected = self.feature_projection(x)
        last_observed_projected = self.feature_projection(x_last_observed)
        mask_projected = (mask.sum(dim=-1, keepdim=True) > 0).float().expand_as(x_projected)
        # GRU-D処理
        grud_output, hidden = self.grud(x_projected, mask_projected, delta_t, last_observed_projected)
# Self-attention
attended, _ = self.attention(
grud_output.transpose(0, 1),
grud_output.transpose(0, 1),
grud_output.transpose(0, 1)
)
attended = attended.transpose(0, 1)
# 最後の出力を使用
last_output = attended[:, -1, :]
# 各種予測
price_movement = self.price_movement_head(last_output)
spread = self.spread_predictor(last_output)
liquidity = self.liquidity_predictor(last_output)
outputs = {
'price_movement': torch.softmax(price_movement, dim=-1),
'spread': spread,
'liquidity': liquidity,
'hidden': hidden
}
# 執行価格予測(オプション)
if order_sizes is not None:
# 注文サイズを考慮した執行価格
exec_input = torch.cat([last_output, order_sizes], dim=-1)
execution_prices = self.execution_price_predictor(exec_input)
outputs['execution_prices'] = execution_prices
return outputs
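The guide does not show how the heads above are trained. One possible multi-task loss, sketched under assumed target definitions (movement_class as an integer label 0 to 4, spread and bid/ask liquidity as regression targets) and with untuned weights; since the model already applies softmax, the classification term uses NLL on log-probabilities rather than cross-entropy on logits:

def orderbook_loss(outputs, targets, w_move=1.0, w_spread=1.0, w_liq=1.0):
    """Hypothetical multi-task loss for OrderbookGRUD outputs; weights and targets are assumptions."""
    # classification over the 5 movement buckets (model outputs probabilities, hence log + NLL)
    movement_loss = nn.functional.nll_loss(
        torch.log(outputs['price_movement'] + 1e-8),
        targets['movement_class'],
    )
    # regression terms for the predicted spread and bid/ask liquidity
    spread_loss = nn.functional.mse_loss(outputs['spread'].squeeze(-1), targets['spread'])
    liquidity_loss = nn.functional.mse_loss(outputs['liquidity'], targets['liquidity'])
    return w_move * movement_loss + w_spread * spread_loss + w_liq * liquidity_loss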
3.3 Realtime Orderbook Processing
class RealtimeOrderbookProcessor:
"""リアルタイムOrderbookデータ処理"""
def __init__(self, model, buffer_seconds=30, update_interval_ms=100):
self.model = model
self.buffer_seconds = buffer_seconds
self.update_interval_ms = update_interval_ms
# バッファ
self.buffer_size = buffer_seconds * 1000 // update_interval_ms
self.orderbook_buffer = deque(maxlen=self.buffer_size)
self.features_buffer = deque(maxlen=self.buffer_size)
self.mask_buffer = deque(maxlen=self.buffer_size)
self.time_buffer = deque(maxlen=self.buffer_size)
# 状態
self.last_update_time = None
self.last_orderbook = None
self.last_features = None
self.hidden_state = None
# 前処理
self.processor = OrderbookDataProcessor()
# 市場インパクトモデル
self.impact_estimator = MarketImpactEstimator()
def update_orderbook(self, orderbook, timestamp):
"""Orderbookの更新"""
self.orderbook_buffer.append({
'orderbook': orderbook,
'timestamp': timestamp
})
# 更新間隔チェック
if self.last_update_time is None:
self.last_update_time = timestamp
time_diff_ms = (timestamp - self.last_update_time).total_seconds() * 1000
if time_diff_ms >= self.update_interval_ms:
self._update_features(orderbook, timestamp)
self.last_update_time = timestamp
self.last_orderbook = orderbook
def _update_features(self, orderbook, timestamp):
"""特徴量の更新"""
# 特徴量抽出
features = self.processor.extract_orderbook_features(orderbook)
# 変化の検出
if self.last_features is not None:
# 重要な変化があったかチェック
feature_change = np.abs(features - self.last_features).mean()
if feature_change < 1e-6:
# 変化なし = 欠損として扱う
mask = np.zeros_like(features)
else:
mask = np.ones_like(features)
else:
mask = np.ones_like(features)
# バッファに追加
self.features_buffer.append(features)
self.mask_buffer.append(mask)
self.time_buffer.append(timestamp)
self.last_features = features
def predict_market_state(self):
"""市場状態の予測"""
if len(self.features_buffer) < 10:
return None
# データ準備
features = torch.FloatTensor(list(self.features_buffer)).unsqueeze(0)
mask = torch.FloatTensor(list(self.mask_buffer)).unsqueeze(0)
# 時間間隔計算
delta_t = self._compute_time_intervals()
last_observed = self._compute_last_observed()
# 予測
self.model.eval()
with torch.no_grad():
predictions = self.model(features, mask, delta_t, last_observed)
# 結果の解釈
price_movement_probs = predictions['price_movement'][0].numpy()
movement_labels = ['大幅下落', '下落', '横ばい', '上昇', '大幅上昇']
predicted_movement = movement_labels[np.argmax(price_movement_probs)]
return {
'predicted_movement': predicted_movement,
'movement_probabilities': dict(zip(movement_labels, price_movement_probs)),
'predicted_spread': predictions['spread'].item(),
'predicted_liquidity': {
'bid': predictions['liquidity'][0, 0].item(),
'ask': predictions['liquidity'][0, 1].item()
},
'market_conditions': self._assess_market_conditions()
}
def predict_execution_price(self, side, size):
"""執行価格の予測"""
if len(self.features_buffer) < 10:
return None
# 現在の板情報
current_orderbook = self.orderbook_buffer[-1]['orderbook']
# 市場インパクトの推定
immediate_impact = self.impact_estimator.estimate_immediate_impact(
current_orderbook, side, size
)
# モデルによる予測
features = torch.FloatTensor(list(self.features_buffer)).unsqueeze(0)
mask = torch.FloatTensor(list(self.mask_buffer)).unsqueeze(0)
delta_t = self._compute_time_intervals()
last_observed = self._compute_last_observed()
order_size_normalized = torch.FloatTensor([[size / 1000.0]]) # 正規化
self.model.eval()
with torch.no_grad():
predictions = self.model(
features, mask, delta_t, last_observed,
order_sizes=order_size_normalized
)
# 予測執行価格
if side == 'buy':
model_adjustment = predictions['execution_prices'][0, 1].item()
else:
model_adjustment = predictions['execution_prices'][0, 0].item()
# 最終的な執行価格
mid_price = (current_orderbook['bids'][0][0] + current_orderbook['asks'][0][0]) / 2
execution_price = mid_price * (1 + immediate_impact + model_adjustment)
return {
'execution_price': execution_price,
'immediate_impact': immediate_impact,
'model_adjustment': model_adjustment,
'total_cost': execution_price * size,
'slippage': abs(execution_price - mid_price) / mid_price
}
def _compute_time_intervals(self):
"""時間間隔の計算"""
delta_t = []
last_update_idx = -1
for i in range(len(self.time_buffer)):
if self.mask_buffer[i].sum() > 0:
last_update_idx = i
if last_update_idx >= 0:
time_diff = (self.time_buffer[i] - self.time_buffer[last_update_idx]).total_seconds()
else:
time_diff = 0
delta_t.append(time_diff)
return torch.FloatTensor(delta_t).unsqueeze(0).unsqueeze(-1)
def _compute_last_observed(self):
"""最後の観測値"""
last_observed = []
last_valid = None
for i in range(len(self.features_buffer)):
if self.mask_buffer[i].sum() > 0:
last_valid = self.features_buffer[i]
if last_valid is not None:
last_observed.append(last_valid)
else:
last_observed.append(np.zeros_like(self.features_buffer[0]))
return torch.FloatTensor(last_observed).unsqueeze(0)
def _assess_market_conditions(self):
"""市場状況の評価"""
if not self.orderbook_buffer:
return "unknown"
recent_orderbooks = list(self.orderbook_buffer)[-10:]
# スプレッドの変動
spreads = []
for ob in recent_orderbooks:
spread = ob['orderbook']['asks'][0][0] - ob['orderbook']['bids'][0][0]
mid = (ob['orderbook']['asks'][0][0] + ob['orderbook']['bids'][0][0]) / 2
spreads.append(spread / mid)
avg_spread = np.mean(spreads)
spread_volatility = np.std(spreads)
# 深度の変化
depths = []
for ob in recent_orderbooks:
total_bid = sum(level[1] for level in ob['orderbook']['bids'][:10])
total_ask = sum(level[1] for level in ob['orderbook']['asks'][:10])
depths.append(total_bid + total_ask)
avg_depth = np.mean(depths)
depth_trend = np.polyfit(range(len(depths)), depths, 1)[0]
# 市場状況の判定
if avg_spread > 0.002 and spread_volatility > 0.0005:
return "volatile"
elif depth_trend < -avg_depth * 0.1:
return "thinning"
elif avg_spread < 0.0005 and spread_volatility < 0.0001:
return "stable"
else:
return "normal"
3.4 Market Impact Model
class MarketImpactEstimator:
"""市場インパクトの推定"""
def estimate_immediate_impact(self, orderbook, side, size):
"""即時的な市場インパクトを推定"""
if side == 'buy':
levels = orderbook['asks']
else:
levels = orderbook['bids']
remaining_size = size
total_cost = 0
for price, available_size in levels:
if remaining_size <= 0:
break
filled_size = min(remaining_size, available_size)
total_cost += filled_size * price
remaining_size -= filled_size
if remaining_size > 0:
# 板を食い尽くした場合
last_price = levels[-1][0] if levels else 0
total_cost += remaining_size * last_price * 1.01 # ペナルティ
# 平均執行価格
avg_execution_price = total_cost / size
# 最良価格からの乖離
best_price = levels[0][0] if levels else 0
impact = (avg_execution_price - best_price) / best_price if best_price > 0 else 0
        # 買いは正(不利に上振れ)、売りは負(不利に下振れ)の符号付きインパクトを返す
        # (predict_execution_priceではmid_price * (1 + impact)として両サイド共通に使われる)
        return impact
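A toy worked example of the estimator (prices and sizes are illustrative): buying 3 units against the book below fills 1 unit at 100.0 and 2 units at 100.5, an average of roughly 100.33, i.e. about +0.33% relative to the best ask:

estimator = MarketImpactEstimator()
book = {
    'bids': [[99.0, 5.0], [98.5, 10.0]],
    'asks': [[100.0, 1.0], [100.5, 2.0], [101.0, 5.0]],
}
impact = estimator.estimate_immediate_impact(book, side='buy', size=3.0)
print(f"immediate impact: {impact:.4%}")   # ≈ 0.3333%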
4. Integrated System and Optimization
4.1 Integrated Ticks/Orderbook Model
class IntegratedGRUD(nn.Module):
"""TicksとOrderbookを統合したGRU-Dモデル"""
def __init__(self, config):
super(IntegratedGRUD, self).__init__()
# 個別のGRU-D
self.ticks_grud = TicksGRUD(
hidden_size=config['ticks_hidden_size'],
num_layers=config['ticks_layers']
)
self.orderbook_grud = OrderbookGRUD(
hidden_size=config['orderbook_hidden_size'],
num_layers=config['orderbook_layers']
)
# 統合層
combined_size = config['ticks_hidden_size'] + config['orderbook_hidden_size']
self.fusion_layer = nn.Sequential(
nn.Linear(combined_size, config['fusion_hidden_size']),
nn.ReLU(),
nn.Dropout(config['dropout']),
nn.Linear(config['fusion_hidden_size'], config['fusion_hidden_size'])
)
# 最終予測層
self.final_predictor = nn.Sequential(
nn.Linear(config['fusion_hidden_size'], config['fusion_hidden_size'] // 2),
nn.ReLU(),
nn.Dropout(config['dropout']),
nn.Linear(config['fusion_hidden_size'] // 2, config['output_size'])
)
def forward(self, ticks_data, orderbook_data):
# Ticks処理
ticks_output = self.ticks_grud(
ticks_data['features'],
ticks_data['mask'],
ticks_data['delta_t'],
ticks_data['last_observed']
)
# Orderbook処理
orderbook_output = self.orderbook_grud(
orderbook_data['features'],
orderbook_data['mask'],
orderbook_data['delta_t'],
orderbook_data['last_observed']
)
# 統合
combined = torch.cat([
ticks_output['hidden'][-1],
orderbook_output['hidden'][-1]
], dim=-1)
fused = self.fusion_layer(combined)
prediction = self.final_predictor(fused)
return {
'prediction': prediction,
'ticks_features': ticks_output,
'orderbook_features': orderbook_output
}
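IntegratedGRUD expects a plain config dict; the keys below are exactly those read in __init__, while the values are illustrative defaults rather than tuned settings:

config = {
    'ticks_hidden_size': 128,
    'ticks_layers': 2,
    'orderbook_hidden_size': 256,
    'orderbook_layers': 3,
    'fusion_hidden_size': 256,
    'dropout': 0.2,
    'output_size': 1,   # e.g. a single predicted mid-price return
}
model = IntegratedGRUD(config)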
4.2 Optimization and Performance
class OptimizedGRUDProcessor:
"""最適化されたGRU-D処理"""
def __init__(self, model, device='cuda'):
self.model = model.to(device)
self.device = device
# JITコンパイル
self.model = torch.jit.script(self.model)
# バッチ処理の設定
self.batch_queue = []
self.batch_size = 32
self.max_latency_ms = 10
async def process_stream(self, data_stream):
"""ストリーミングデータの非同期処理"""
async for data in data_stream:
self.batch_queue.append(data)
if len(self.batch_queue) >= self.batch_size:
await self._process_batch()
async def _process_batch(self):
"""バッチ処理"""
if not self.batch_queue:
return
# バッチデータの準備
batch_data = self._prepare_batch(self.batch_queue)
# GPU処理
with torch.cuda.amp.autocast(): # 混合精度
with torch.no_grad():
predictions = self.model(batch_data)
# 結果の配信
for i, pred in enumerate(predictions):
await self._send_prediction(self.batch_queue[i]['id'], pred)
self.batch_queue.clear()
5. Summary
GRU-D is well suited to cryptocurrency tick and orderbook data:
Applying it to tick data:
- Handles irregular trade arrival naturally
- Distinguishes quiet periods from active ones
- Combined with microstructure features, supports high-accuracy prediction
Applying it to orderbook data:
- Processes high-frequency book updates efficiently
- Captures only the meaningful changes
- Useful for execution-price prediction
Implementation points:
1. Choose an appropriate base interval (Ticks: seconds, Orderbook: milliseconds)
2. Feature engineering matters
3. Optimize for realtime processing
4. Use the integrated model to exploit the complementary information in both sources