目次
LSTMによる取引方向予測と最新価格予測手法
1. 概要
本ドキュメントでは、暗号通貨の取引データ(OHLCV、Ticks、Orderbook)を用いたLSTMベースの予測システムの実装方法を詳細に解説します。特に、取引方向(上昇/下降)の分類と具体的な価格予測の両方に対応した実践的な手法を提供します。
2. LSTMアーキテクチャの設計
2.1 基本的なLSTMモデル
from collections import deque

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
class CryptoLSTM(nn.Module):
    """LSTM + self-attention model with a regression and a classification head.

    The network runs a stacked LSTM over the input sequence, applies
    multi-head self-attention over the LSTM outputs, and feeds the attended
    representation of the last valid time step into two heads:

    * ``regression_head``      -> one value per sample (price)
    * ``classification_head``  -> three logits per sample (up / flat / down)

    Args:
        input_dim: Number of features per time step.
        hidden_dim: LSTM hidden size; must be divisible by ``num_heads``.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout used between LSTM layers, inside the attention
            module and inside both heads.
        num_heads: Number of attention heads.
    """

    def __init__(self, input_dim, hidden_dim=128, num_layers=3, dropout=0.2,
                 num_heads=8):
        super().__init__()
        # Stored so a checkpoint can record how to rebuild the model.
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        # Stacked LSTM; nn.LSTM only applies dropout between layers, so it is
        # disabled for a single-layer network.
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=False,
        )

        # Self-attention over the LSTM outputs. batch_first=True is required
        # because the LSTM outputs are (batch, seq, hidden); without it the
        # module would treat the batch axis as the sequence axis and mix
        # information between unrelated samples.
        self.attention = nn.MultiheadAttention(
            embed_dim=hidden_dim,
            num_heads=num_heads,
            dropout=dropout,
            batch_first=True,
        )

        # Output heads (regression and classification share the same input).
        self.regression_head = nn.Sequential(
            nn.Linear(hidden_dim, 64),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(64, 1),
        )
        self.classification_head = nn.Sequential(
            nn.Linear(hidden_dim, 64),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(64, 3),  # up / flat / down
        )

    def forward(self, x, lengths=None):
        """Run the model.

        Args:
            x: Batch-first input tensor (batch, seq, input_dim).
            lengths: Optional per-sample sequence lengths (tensor or list)
                for right-padded batches. When given, padded steps are
                skipped by the LSTM, masked out of the attention keys, and
                the output is read at each sample's last valid step.

        Returns:
            Tuple ``(price_prediction, direction_prediction)`` with shapes
            (batch, 1) and (batch, 3); the second element holds raw logits.
        """
        padding_mask = None
        last_step = None

        if lengths is not None:
            lengths = torch.as_tensor(lengths, dtype=torch.long)
            # pack_padded_sequence requires the lengths on the CPU.
            packed = pack_padded_sequence(
                x, lengths.cpu(), batch_first=True, enforce_sorted=False
            )
            packed_out, _ = self.lstm(packed)
            lstm_out, _ = pad_packed_sequence(packed_out, batch_first=True)

            lengths = lengths.to(lstm_out.device)
            steps = torch.arange(lstm_out.size(1), device=lstm_out.device)
            # True marks padded positions that attention must ignore.
            padding_mask = steps.unsqueeze(0) >= lengths.unsqueeze(1)
            last_step = lengths - 1
        else:
            lstm_out, _ = self.lstm(x)

        attended, _ = self.attention(
            lstm_out, lstm_out, lstm_out, key_padding_mask=padding_mask
        )

        if last_step is None:
            final_hidden = attended[:, -1, :]
        else:
            # Pick the last *valid* step per sample instead of a padded one.
            rows = torch.arange(attended.size(0), device=attended.device)
            final_hidden = attended[rows, last_step]

        price_prediction = self.regression_head(final_hidden)
        direction_prediction = self.classification_head(final_hidden)
        return price_prediction, direction_prediction
2.2 マルチモーダルLSTM(複数データソース統合)
class MultiModalCryptoLSTM(nn.Module):
    """Multi-modal LSTM that fuses OHLCV, tick and order-book sequences.

    Each modality is encoded by its own two-layer LSTM. The last output of
    every encoder is concatenated, passed through a fusion layer, fed to an
    integrating LSTM as a length-one sequence, and finally mapped to four
    values: one price and three direction scores.

    Args:
        ohlcv_dim: Feature count per step of the OHLCV sequence.
        tick_dim: Feature count per step of the tick sequence.
        orderbook_dim: Feature count per step of the order-book sequence.
        hidden_dim: Width of the fused representation. Each encoder gets
            ``hidden_dim // 3`` units.

    Raises:
        ValueError: If ``hidden_dim`` is smaller than 3.
    """

    def __init__(self, ohlcv_dim, tick_dim, orderbook_dim, hidden_dim=256):
        super().__init__()
        encoder_dim = hidden_dim // 3
        if encoder_dim < 1:
            raise ValueError("hidden_dim must be at least 3")
        # The concatenated encoder width is 3 * (hidden_dim // 3), which is
        # NOT hidden_dim unless hidden_dim is a multiple of 3 (e.g. 255 for
        # the default 256). The fusion layer must take that real width.
        fused_dim = encoder_dim * 3

        # One encoder per data source.
        self.ohlcv_encoder = nn.LSTM(ohlcv_dim, encoder_dim, 2, batch_first=True)
        self.tick_encoder = nn.LSTM(tick_dim, encoder_dim, 2, batch_first=True)
        self.orderbook_encoder = nn.LSTM(orderbook_dim, encoder_dim, 2, batch_first=True)

        # Feature fusion: projects the concatenation up to hidden_dim.
        self.fusion_layer = nn.Sequential(
            nn.Linear(fused_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
        )

        # Integrating LSTM over the fused representation.
        self.integrated_lstm = nn.LSTM(
            hidden_dim,
            hidden_dim,
            3,
            batch_first=True,
            dropout=0.2,
        )

        # Prediction head.
        self.predictor = nn.Sequential(
            nn.Linear(hidden_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 4),  # [price, up score, flat score, down score]
        )

    def forward(self, ohlcv_data, tick_data, orderbook_data):
        """Encode the three batch-first sequences and predict.

        The three inputs may have different sequence lengths; only the last
        output step of each encoder is used.

        Returns:
            Tuple ``(price, direction_probs)`` with shapes (batch, 1) and
            (batch, 3). ``direction_probs`` is already softmax-normalised.
        """
        ohlcv_out, _ = self.ohlcv_encoder(ohlcv_data)
        tick_out, _ = self.tick_encoder(tick_data)
        orderbook_out, _ = self.orderbook_encoder(orderbook_data)

        # Concatenate the final step of every modality.
        combined = torch.cat(
            [ohlcv_out[:, -1, :], tick_out[:, -1, :], orderbook_out[:, -1, :]],
            dim=-1,
        )

        # The fused vector becomes a sequence of length one.
        fused = self.fusion_layer(combined).unsqueeze(1)
        integrated_out, _ = self.integrated_lstm(fused)

        predictions = self.predictor(integrated_out[:, -1, :])

        # Split into the price column and the direction probabilities.
        price = predictions[:, 0:1]
        direction_probs = torch.softmax(predictions[:, 1:], dim=-1)
        return price, direction_probs
3. 特徴量エンジニアリング
3.1 テクニカル指標の計算
class TechnicalIndicators:
    """Stateless helpers that compute common technical indicators."""

    @staticmethod
    def calculate_rsi(prices, period=14):
        """Compute the Relative Strength Index with Wilder-style smoothing.

        Args:
            prices: One-dimensional sequence of prices.
            period: Smoothing period; must be >= 1.

        Returns:
            Float array with the same length as ``prices``. The first
            ``period`` entries all hold the seed value. When the smoothed
            loss is zero the RSI is 100; when both smoothed gain and loss are
            zero (flat prices) the neutral value 50 is used instead of NaN.

        Raises:
            ValueError: If ``period`` is smaller than 1.
        """
        if period < 1:
            raise ValueError("period must be a positive integer")

        # Force a float array: with integer input np.zeros_like would create
        # an integer result and silently truncate every RSI value.
        prices = np.asarray(prices, dtype=float)
        rsi = np.zeros_like(prices)
        if prices.size == 0:
            return rsi

        def rsi_value(avg_gain, avg_loss):
            # 100 * g / (g + l) equals 100 - 100 / (1 + g / l) but stays
            # finite when the average loss is zero.
            total = avg_gain + avg_loss
            if total <= 0:
                return 50.0
            return 100.0 * avg_gain / total

        deltas = np.diff(prices)
        # NOTE(review): the seed window takes period + 1 deltas but divides
        # by period, and the loop below re-uses deltas already in the seed.
        # Kept as-is to preserve the numeric output; confirm if intended.
        seed = deltas[:period + 1]
        up = seed[seed >= 0].sum() / period
        down = -seed[seed < 0].sum() / period
        rsi[:period] = rsi_value(up, down)

        for i in range(period, len(prices)):
            delta = deltas[i - 1]
            if delta > 0:
                upval = delta
                downval = 0.
            else:
                upval = 0.
                downval = -delta
            up = (up * (period - 1) + upval) / period
            down = (down * (period - 1) + downval) / period
            rsi[i] = rsi_value(up, down)
        return rsi

    @staticmethod
    def calculate_macd(prices, fast=12, slow=26, signal=9):
        """Compute MACD from two exponential moving averages.

        Returns:
            Tuple ``(macd, signal_line, histogram)`` of arrays, where
            ``macd`` is the fast EMA minus the slow EMA, ``signal_line`` is
            the EMA of ``macd`` and ``histogram`` is their difference.
        """
        series = pd.Series(prices)
        fast_ema = series.ewm(span=fast).mean()
        slow_ema = series.ewm(span=slow).mean()
        macd = fast_ema - slow_ema
        signal_line = macd.ewm(span=signal).mean()
        histogram = macd - signal_line
        return macd.values, signal_line.values, histogram.values

    @staticmethod
    def calculate_bollinger_bands(prices, period=20, std_dev=2):
        """Compute Bollinger Bands around a simple moving average.

        Returns:
            Tuple ``(upper_band, sma, lower_band)`` of arrays. The first
            ``period - 1`` entries are NaN because the rolling window is not
            yet full.
        """
        rolling = pd.Series(prices).rolling(window=period)
        sma = rolling.mean()
        std = rolling.std()
        upper_band = sma + (std * std_dev)
        lower_band = sma - (std * std_dev)
        return upper_band.values, sma.values, lower_band.values
3.2 特徴量生成パイプライン
class FeatureEngineering:
    """Builds numeric feature matrices from OHLCV, tick and order-book data."""

    def __init__(self):
        self.tech_indicators = TechnicalIndicators()
        # NOTE(review): StandardScaler is not imported in the visible file;
        # presumably sklearn.preprocessing.StandardScaler - confirm.
        self.scaler = StandardScaler()

    def create_ohlcv_features(self, ohlcv_data):
        """Create per-bar features from an OHLCV DataFrame.

        Args:
            ohlcv_data: DataFrame with at least the columns ``close``,
                ``volume``, ``high`` and ``low``.

        Returns:
            Array of shape (n_bars, 14). Column 0 is the close price. NaN and
            infinite values (indicator warm-up, division by a zero volume)
            are replaced by 0 so the matrix can be fed to a model directly.
        """
        close = ohlcv_data['close']
        close_values = close.values
        volume = ohlcv_data['volume']

        # Raw price and volume.
        features = [close_values, volume.values]

        # Simple and logarithmic returns.
        features.append(close.pct_change().fillna(0).values)
        features.append(np.log(close / close.shift(1)).fillna(0).values)

        # Volatility as a rolling standard deviation.
        features.append(close.rolling(window=20).std().fillna(0).values)

        # Technical indicators.
        features.append(self.tech_indicators.calculate_rsi(close_values))
        macd, signal, histogram = self.tech_indicators.calculate_macd(close_values)
        features.extend([macd, signal, histogram])

        # Bollinger band width and the position of the close inside the band.
        upper, _, lower = self.tech_indicators.calculate_bollinger_bands(close_values)
        features.append(upper - lower)
        features.append((close_values - lower) / (upper - lower + 1e-8))

        # High-low range relative to the close.
        hl_ratio = (ohlcv_data['high'] - ohlcv_data['low']) / (close + 1e-8)
        features.append(hl_ratio.values)

        # Volume change rate.
        features.append(volume.pct_change().fillna(0).values)

        # Deviation of the close from its 20-bar moving average.
        ma_20 = close.rolling(window=20).mean()
        features.append(((close - ma_20) / ma_20).fillna(0).values)

        feature_matrix = np.column_stack(features)
        # The Bollinger columns are NaN during warm-up and pct_change yields
        # inf after a zero; neither must reach the model.
        return np.nan_to_num(feature_matrix, nan=0.0, posinf=0.0, neginf=0.0)

    def create_tick_features(self, tick_data, window_size=100):
        """Create per-tick features from trade ticks.

        Args:
            tick_data: DataFrame indexed by a DatetimeIndex with the columns
                ``size``, ``side`` and ``price``.
            window_size: Number of ticks in each rolling window.

        Returns:
            Array of shape (n_ticks, 7) with the columns: rolling mean, std
            and skew of trade size, buy ratio of the tick's minute, trade
            count of the tick's minute, rolling count of large trades, and
            rolling VWAP. The per-minute statistics are broadcast back to
            every tick of that minute so all columns have the same length.
        """
        sizes = tick_data['size']
        rolling_sizes = sizes.rolling(window=window_size)

        # Trade-size statistics.
        features = [
            rolling_sizes.mean().fillna(0).values,
            rolling_sizes.std().fillna(0).values,
            rolling_sizes.skew().fillna(0).values,
        ]

        # Per-minute buy ratio and trade count, aligned to the tick index.
        is_buy = (tick_data['side'] == 'buy').astype(float)
        per_minute = is_buy.groupby(tick_data.index.floor('1min'))
        features.append(per_minute.transform('mean').fillna(0.5).values)
        features.append(per_minute.transform('count').values)

        # Large-trade activity: trades above the 95th size percentile.
        large_trades = (sizes > sizes.quantile(0.95)).astype(float)
        features.append(
            large_trades.rolling(window=window_size).sum().fillna(0).values
        )

        # VWAP. min_periods=1 gives a partial-window value during warm-up
        # instead of leading NaNs that a forward fill cannot remove.
        notional = (tick_data['price'] * sizes).rolling(
            window=window_size, min_periods=1
        ).sum()
        traded = sizes.rolling(window=window_size, min_periods=1).sum()
        vwap = (notional / traded).ffill().fillna(0)
        features.append(vwap.values)

        return np.column_stack(features)

    def create_orderbook_features(self, orderbook_snapshots):
        """Create one feature row per order-book snapshot.

        Args:
            orderbook_snapshots: Iterable of mappings with the keys
                ``best_ask``, ``best_bid``, ``mid_price``, ``bids`` and
                ``asks``; ``bids``/``asks`` are indexed as 2-D arrays whose
                column 0 is read as price and column 1 as volume.

        Returns:
            Array of shape (n_snapshots, 7): spread, relative spread, 5-level
            depth imbalance, depth-weighted mid price, bid gradient, ask
            gradient and 5-level liquidity.
        """
        features = []
        for snapshot in orderbook_snapshots:
            bids = snapshot['bids']
            asks = snapshot['asks']

            # Spread, absolute and relative to the mid price.
            spread = snapshot['best_ask'] - snapshot['best_bid']
            spread_pct = spread / snapshot['mid_price']

            # Depth imbalance over the top five levels.
            bid_volume_5 = sum(bids[:5, 1])
            ask_volume_5 = sum(asks[:5, 1])
            imbalance_5 = (bid_volume_5 - ask_volume_5) / (
                bid_volume_5 + ask_volume_5 + 1e-8
            )

            features.append([
                spread,
                spread_pct,
                imbalance_5,
                self.calculate_weighted_mid_price(snapshot),
                self.calculate_price_gradient(bids[:10]),
                self.calculate_price_gradient(asks[:10]),
                bid_volume_5 + ask_volume_5,  # liquidity
            ])
        # reshape keeps a (0, 7) result for an empty input.
        return np.array(features, dtype=float).reshape(-1, 7)

    def calculate_weighted_mid_price(self, snapshot, levels=10):
        """Return the mean of the volume-weighted bid and ask prices.

        If one side has zero total volume its plain mean price is used; if a
        side has no levels at all the result is NaN.
        """
        def side_price(book):
            prices = book[:levels, 0]
            volumes = book[:levels, 1]
            total_volume = volumes.sum()
            if total_volume > 0:
                return (prices * volumes).sum() / total_volume
            if len(prices):
                return prices.mean()
            return np.nan

        weighted_bid = side_price(snapshot['bids'])
        weighted_ask = side_price(snapshot['asks'])
        return (weighted_bid + weighted_ask) / 2

    def calculate_price_gradient(self, levels):
        """Return the slope of price versus level index (linear fit).

        Returns 0 when fewer than two levels are available.
        """
        if len(levels) < 2:
            return 0
        prices = levels[:, 0]
        level_index = np.arange(len(prices))
        return np.polyfit(level_index, prices, 1)[0]
4. 学習と予測の実装
4.1 データローダーの作成
class CryptoDataset(torch.utils.data.Dataset):
    """Sliding-window dataset yielding (sequence, price, direction) samples.

    Args:
        features: 2-D array indexed as ``features[row, column]``.
        targets: 1-D sequence of target prices aligned with ``features``.
        sequence_length: Number of rows in each input window.
        prediction_horizon: How many rows after the window the target lies.
        price_column: Column of ``features`` holding the current price used
            to derive the direction label.
        direction_threshold: Relative price change above which a move counts
            as up (or, negated, as down).
    """

    def __init__(self, features, targets, sequence_length=60, prediction_horizon=5,
                 price_column=0, direction_threshold=0.001):
        self.features = features
        self.targets = targets
        self.sequence_length = sequence_length
        self.prediction_horizon = prediction_horizon
        self.price_column = price_column
        self.direction_threshold = direction_threshold

    def __len__(self):
        usable = len(self.features) - self.sequence_length - self.prediction_horizon + 1
        # len() must never be negative; too-short data simply has no samples.
        return max(0, usable)

    def __getitem__(self, idx):
        """Return ``(x, y_price, y_direction)`` for window ``idx``.

        ``y_direction`` is 0 for up, 1 for flat and 2 for down.

        Raises:
            IndexError: If ``idx`` is outside the dataset.
        """
        size = len(self)
        if idx < 0:
            idx += size
        if not 0 <= idx < size:
            raise IndexError("CryptoDataset index out of range")

        # Input window.
        window_end = idx + self.sequence_length
        x = self.features[idx:window_end]

        # Target price, prediction_horizon rows after the window.
        y_price = self.targets[window_end + self.prediction_horizon - 1]

        # Direction label derived from the relative change against the last
        # price inside the window.
        current_price = self.features[window_end - 1, self.price_column]
        if current_price == 0:
            # A relative change is undefined here; label the sample as flat.
            price_change = 0.0
        else:
            price_change = (y_price - current_price) / current_price

        if price_change > self.direction_threshold:
            y_direction = 0  # up
        elif price_change < -self.direction_threshold:
            y_direction = 2  # down
        else:
            y_direction = 1  # flat

        return torch.FloatTensor(x), torch.FloatTensor([y_price]), torch.LongTensor([y_direction])
4.2 学習ループ
class LSTMTrainer:
    """Manages training, evaluation and early stopping of a two-head model.

    The model is expected to return ``(price_pred, direction_pred)`` from its
    forward pass. The total loss is ``MSE(price) + 0.5 * CE(direction)``.

    Args:
        model: Module to train; it is moved to ``device``.
        device: Device passed to ``Tensor.to`` / ``Module.to``.
    """

    def __init__(self, model, device='cuda'):
        self.model = model.to(device)
        self.device = device

        # Loss functions.
        self.regression_criterion = nn.MSELoss()
        self.classification_criterion = nn.CrossEntropyLoss()

        # Optimizer and learning-rate schedule.
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)
        self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            self.optimizer, mode='min', patience=5, factor=0.5
        )

    def _forward_losses(self, features, price_targets, direction_targets):
        """Move one batch to the device and return predictions and losses."""
        features = features.to(self.device)
        price_targets = price_targets.to(self.device)
        direction_targets = direction_targets.to(self.device)

        price_pred, direction_pred = self.model(features)
        reg_loss = self.regression_criterion(price_pred, price_targets)
        # reshape(-1) rather than squeeze(): squeeze() turns a batch of one
        # into a 0-d tensor, which CrossEntropyLoss rejects.
        class_loss = self.classification_criterion(
            direction_pred, direction_targets.reshape(-1)
        )
        return price_pred, direction_pred, reg_loss, class_loss

    def train_epoch(self, train_loader):
        """Train for one epoch.

        Returns:
            Tuple ``(avg_total_loss, avg_reg_loss, avg_class_loss)`` averaged
            over the batches.

        Raises:
            ValueError: If ``train_loader`` yields no batches.
        """
        self.model.train()
        total_loss = 0
        regression_loss_sum = 0
        classification_loss_sum = 0
        num_batches = 0

        for features, price_targets, direction_targets in train_loader:
            self.optimizer.zero_grad()

            _, _, reg_loss, class_loss = self._forward_losses(
                features, price_targets, direction_targets
            )

            # Weighted total loss.
            total_batch_loss = reg_loss + 0.5 * class_loss
            total_batch_loss.backward()

            # Gradient clipping keeps recurrent gradients from exploding.
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()

            total_loss += total_batch_loss.item()
            regression_loss_sum += reg_loss.item()
            classification_loss_sum += class_loss.item()
            num_batches += 1

        if num_batches == 0:
            raise ValueError("train_loader produced no batches")

        return (
            total_loss / num_batches,
            regression_loss_sum / num_batches,
            classification_loss_sum / num_batches,
        )

    def evaluate(self, val_loader):
        """Evaluate on validation data.

        Returns:
            Tuple ``(avg_loss, mae, direction_accuracy)``.

        Raises:
            ValueError: If ``val_loader`` yields no batches.
        """
        self.model.eval()
        total_loss = 0
        num_batches = 0
        price_preds = []
        price_truth = []
        direction_preds = []
        direction_truth = []

        with torch.no_grad():
            for features, price_targets, direction_targets in val_loader:
                price_pred, direction_pred, reg_loss, class_loss = self._forward_losses(
                    features, price_targets, direction_targets
                )
                total_loss += reg_loss.item() + 0.5 * class_loss.item()
                num_batches += 1

                # Flatten everything to 1-D so the metrics below work for any
                # batch size, including one.
                price_preds.append(price_pred.cpu().numpy().reshape(-1))
                price_truth.append(price_targets.cpu().numpy().reshape(-1))
                direction_preds.append(
                    torch.argmax(direction_pred, dim=1).cpu().numpy().reshape(-1)
                )
                direction_truth.append(direction_targets.cpu().numpy().reshape(-1))

        if num_batches == 0:
            raise ValueError("val_loader produced no batches")

        avg_loss = total_loss / num_batches
        mae = np.mean(np.abs(np.concatenate(price_preds) - np.concatenate(price_truth)))
        direction_accuracy = np.mean(
            np.concatenate(direction_preds) == np.concatenate(direction_truth)
        )
        return avg_loss, mae, direction_accuracy

    def train(self, train_loader, val_loader, epochs=100, early_stopping_patience=10,
              checkpoint_path='best_model.pth'):
        """Run the full training loop with early stopping.

        Args:
            train_loader: Iterable of training batches.
            val_loader: Iterable of validation batches.
            epochs: Maximum number of epochs.
            early_stopping_patience: Stop after this many epochs without a
                new best validation loss.
            checkpoint_path: Where the best model's ``state_dict`` is saved.

        Returns:
            List with one metrics dict per completed epoch.
        """
        best_val_loss = float('inf')
        patience_counter = 0
        train_history = []

        for epoch in range(epochs):
            train_loss, reg_loss, class_loss = self.train_epoch(train_loader)
            val_loss, val_mae, val_accuracy = self.evaluate(val_loader)

            # Learning-rate adjustment driven by the validation loss.
            self.scheduler.step(val_loss)

            train_history.append({
                'epoch': epoch,
                'train_loss': train_loss,
                'val_loss': val_loss,
                'val_mae': val_mae,
                'val_accuracy': val_accuracy
            })

            print(f"Epoch {epoch}: Train Loss: {train_loss:.4f}, "
                  f"Val Loss: {val_loss:.4f}, MAE: {val_mae:.4f}, "
                  f"Direction Acc: {val_accuracy:.4f}")

            # Early stopping bookkeeping.
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # Persist the best model seen so far.
                torch.save(self.model.state_dict(), checkpoint_path)
            else:
                patience_counter += 1
                if patience_counter >= early_stopping_patience:
                    print(f"Early stopping at epoch {epoch}")
                    break

        return train_history
4.3 予測とリアルタイム推論
class RealTimePredictor:
    """Keeps a rolling feature window and produces predictions from it.

    Args:
        model: Module returning ``(price_pred, direction_logits)``.
        feature_engineering: Object providing ``create_ohlcv_features``.
        sequence_length: Number of feature rows the model consumes.
        horizon_models: Optional mapping ``horizon -> model`` used by
            ``predict_with_horizon`` / ``predict_multiple_horizons``. A
            model can only predict the horizon it was trained for, so each
            horizon needs its own entry.
    """

    def __init__(self, model, feature_engineering, sequence_length=60,
                 horizon_models=None):
        self.model = model
        self.feature_engineering = feature_engineering
        self.sequence_length = sequence_length
        self.horizon_models = dict(horizon_models) if horizon_models else {}
        # Bounded deque: appending beyond maxlen drops the oldest row.
        self.feature_buffer = deque(maxlen=sequence_length)

    def update_buffer(self, new_data):
        """Recompute features for ``new_data`` and append the latest row."""
        features = self.feature_engineering.create_ohlcv_features(new_data)
        self.feature_buffer.append(features[-1])

    def _predict_with_model(self, model):
        """Run ``model`` on the current buffer and format the result."""
        if len(self.feature_buffer) < self.sequence_length:
            # Kept as a pair for backward compatibility with existing callers.
            return None, None

        # Stack into one array first; building a tensor from a list of
        # arrays element by element is slow.
        window = np.stack(
            [np.asarray(row, dtype=np.float32) for row in self.feature_buffer]
        )
        features = torch.from_numpy(window).unsqueeze(0)

        # Run on whatever device the model lives on.
        first_param = next(iter(model.parameters()), None)
        if first_param is not None:
            features = features.to(first_param.device)

        model.eval()
        with torch.no_grad():
            price_pred, direction_pred = model(features)

        predicted_price = price_pred.item()
        direction_probs = torch.softmax(direction_pred, dim=1).squeeze().cpu().numpy()
        predicted_direction = int(np.argmax(direction_probs))

        return {
            'price': predicted_price,
            'direction': ['UP', 'NEUTRAL', 'DOWN'][predicted_direction],
            # Confidence is the probability of the predicted class.
            'confidence': float(direction_probs[predicted_direction]),
            'probabilities': {
                'up': float(direction_probs[0]),
                'neutral': float(direction_probs[1]),
                'down': float(direction_probs[2])
            }
        }

    def predict(self):
        """Predict from the current buffer with the default model.

        Returns:
            ``(None, None)`` while the buffer holds fewer than
            ``sequence_length`` rows; otherwise a dict with the keys
            ``price``, ``direction``, ``confidence`` and ``probabilities``.
        """
        return self._predict_with_model(self.model)

    def predict_with_horizon(self, horizon):
        """Predict with the model registered for ``horizon``.

        Raises:
            ValueError: If no model was registered for ``horizon``.
        """
        if horizon not in self.horizon_models:
            raise ValueError(
                f"No model registered for horizon {horizon!r}; "
                "pass horizon_models={horizon: model} to RealTimePredictor"
            )
        return self._predict_with_model(self.horizon_models[horizon])

    def predict_multiple_horizons(self, horizons=(1, 5, 15, 30)):
        """Predict for several horizons.

        Returns:
            Dict mapping ``'<horizon>min'`` to that horizon's prediction.
        """
        return {
            f'{horizon}min': self.predict_with_horizon(horizon)
            for horizon in horizons
        }
5. アンサンブル手法
5.1 複数モデルのアンサンブル
class EnsemblePredictor:
    """Weighted-average ensemble over several two-head models.

    Args:
        models: Sequence of modules returning ``(price, direction_logits)``.
        weights: Optional sequence with one weight per model (list, tuple or
            array). Defaults to equal weights. Weights are used as given and
            are not normalised.

    Raises:
        ValueError: If ``models`` is empty or the number of weights does not
            match the number of models.
    """

    def __init__(self, models, weights=None):
        if len(models) == 0:
            raise ValueError("EnsemblePredictor needs at least one model")
        self.models = models

        # `weights or ...` would raise on array inputs (ambiguous truth
        # value), so test for None / emptiness explicitly.
        if weights is None or len(weights) == 0:
            self.weights = [1.0 / len(models)] * len(models)
        else:
            if len(weights) != len(models):
                # zip() in predict() would otherwise silently drop models.
                raise ValueError(
                    f"expected {len(models)} weights, got {len(weights)}"
                )
            # Plain floats keep the tensor arithmetic in predict() in torch.
            self.weights = [float(w) for w in weights]

    def predict(self, features):
        """Return the weighted average of all model predictions.

        Returns:
            Tuple ``(ensemble_price, ensemble_probs)``; the second element is
            the weighted average of each model's softmax probabilities.
        """
        price_preds = []
        direction_probs = []

        for model in self.models:
            model.eval()
            with torch.no_grad():
                price_pred, direction_pred = model(features)
            price_preds.append(price_pred)
            direction_probs.append(torch.softmax(direction_pred, dim=1))

        # Weighted averages.
        ensemble_price = sum(w * p for w, p in zip(self.weights, price_preds))
        ensemble_probs = sum(w * p for w, p in zip(self.weights, direction_probs))
        return ensemble_price, ensemble_probs
6. バックテストと評価
6.1 バックテストフレームワーク
class BacktestEngine:
    """Simple long-only backtester driven by direction predictions.

    Args:
        initial_capital: Starting cash.
        trading_fee: Proportional fee charged on every buy and sell.
    """

    def __init__(self, initial_capital=10000, trading_fee=0.001):
        self.initial_capital = initial_capital
        self.trading_fee = trading_fee

    def backtest(self, predictions, actual_prices, threshold=0.6, periods_per_year=252):
        """Run the backtest.

        Args:
            predictions: Sequence of prediction dicts with the keys
                ``confidence`` and ``direction`` (``'UP'`` / ``'DOWN'`` /
                other). Entries that are ``None`` or ``(None, None)`` are
                treated as "no signal".
            actual_prices: Prices indexed in step with ``predictions``.
            threshold: Minimum confidence required to act on a signal.
            periods_per_year: Number of bars per year used to annualise the
                Sharpe ratio (252 matches daily bars).

        Returns:
            Dict with ``final_value``, ``total_return``, ``sharpe_ratio``,
            ``max_drawdown``, ``trades`` and ``portfolio_values``. With no
            predictions the portfolio stays at ``initial_capital``.
        """
        capital = self.initial_capital
        position = 0  # units currently held
        trades = []
        portfolio_values = []
        net_of_fee = 1 - self.trading_fee

        for i, pred in enumerate(predictions):
            actual_price = actual_prices[i]

            has_prediction = pred is not None and pred != (None, None)
            if has_prediction and pred['confidence'] > threshold:
                direction = pred['direction']
                if direction == 'UP' and position <= 0:
                    # Buy signal.
                    if position < 0:  # close a short position first
                        capital += position * actual_price * net_of_fee
                        position = 0
                    # Open a long position with all available cash.
                    position = capital / actual_price * net_of_fee
                    capital = 0
                    trades.append({
                        'type': 'BUY',
                        'price': actual_price,
                        'time': i,
                        'position': position
                    })
                elif direction == 'DOWN' and position > 0:
                    # Sell signal: close the long position. Shorting is not
                    # modelled, so a DOWN signal while flat is a no-op and
                    # must not be recorded as a trade.
                    capital = position * actual_price * net_of_fee
                    position = 0
                    trades.append({
                        'type': 'SELL',
                        'price': actual_price,
                        'time': i
                    })

            # Mark the portfolio to market.
            portfolio_values.append(capital + position * actual_price)

        final_value = portfolio_values[-1] if portfolio_values else self.initial_capital

        return {
            'final_value': final_value,
            'total_return': (final_value - self.initial_capital) / self.initial_capital,
            'sharpe_ratio': self._sharpe_ratio(portfolio_values, periods_per_year),
            'max_drawdown': self.calculate_max_drawdown(portfolio_values),
            'trades': trades,
            'portfolio_values': portfolio_values
        }

    @staticmethod
    def _sharpe_ratio(portfolio_values, periods_per_year):
        """Annualised Sharpe ratio of bar-to-bar returns (0.0 if undefined)."""
        values = np.asarray(portfolio_values, dtype=float)
        if values.size < 2:
            return 0.0
        previous = values[:-1]
        # A return from a zero portfolio value is undefined; count it as 0.
        returns = np.divide(
            np.diff(values), previous,
            out=np.zeros(previous.shape), where=previous != 0,
        )
        volatility = np.std(returns)
        if volatility == 0:
            return 0.0
        return float(np.sqrt(periods_per_year) * np.mean(returns) / volatility)

    def calculate_max_drawdown(self, values):
        """Return the largest peak-to-trough decline as a fraction of the peak.

        Returns 0 for an empty sequence.
        """
        if len(values) == 0:
            return 0
        peak = values[0]
        max_dd = 0
        for value in values:
            if value > peak:
                peak = value
            if peak > 0:
                max_dd = max(max_dd, (peak - value) / peak)
        return max_dd
7. 実装のベストプラクティス
7.1 データの前処理
def preprocess_data(ohlcv_data, tick_data, orderbook_data):
    """Clean OHLCV data and return robustly scaled OHLC prices.

    Args:
        ohlcv_data: DataFrame with the columns ``open``, ``high``, ``low``
            and ``close``.
        tick_data: Currently unused (see the note on time alignment below).
        orderbook_data: Currently unused.

    Returns:
        Tuple ``(normalized_prices, scaler)``: the scaled OHLC array and the
        fitted scaler, which is needed to invert the scaling later.
    """
    price_columns = ['open', 'high', 'low', 'close']

    # 1. Missing values: forward-fill, then drop leading rows that have no
    #    earlier value to fill from. (fillna(method='ffill') is deprecated.)
    ohlcv_data = ohlcv_data.ffill().dropna(subset=price_columns)

    # 2. Outlier removal: drop bars whose close moved 10% or more.
    #    The first bar has no predecessor, so its change is NaN; treat it as
    #    0 instead of letting the comparison silently drop the bar.
    price_changes = ohlcv_data['close'].pct_change().fillna(0.0)
    ohlcv_data = ohlcv_data[price_changes.abs() < 0.1]

    # 3. Normalisation.
    # NOTE(review): RobustScaler is not imported in the visible file;
    # presumably sklearn.preprocessing.RobustScaler - confirm.
    scaler = RobustScaler()
    normalized_prices = scaler.fit_transform(ohlcv_data[price_columns].values)

    # 4. Time alignment.
    # TODO(review): the original computed the intersection of the OHLCV and
    # tick indexes but never applied it; aligning the data sources is still
    # to be implemented.

    return normalized_prices, scaler
7.2 モデルの保存と読み込み
def save_model_checkpoint(model, optimizer, epoch, loss, path):
    """Save model and optimizer state plus the config needed to rebuild it.

    Args:
        model: Module exposing ``hidden_dim`` and ``num_layers``, and either
            ``input_dim`` or an ``lstm`` submodule with ``input_size``.
        optimizer: Optimizer whose state is stored alongside the model.
        epoch: Epoch number to record.
        loss: Loss value to record.
        path: File path or file-like object accepted by ``torch.save``.

    Raises:
        AttributeError: If the input dimension cannot be determined.
    """
    # Prefer an explicit attribute; fall back to the LSTM's own record of its
    # input size for models that never stored input_dim.
    input_dim = getattr(model, 'input_dim', None)
    if input_dim is None:
        input_dim = getattr(getattr(model, 'lstm', None), 'input_size', None)
    if input_dim is None:
        raise AttributeError(
            "cannot determine input_dim: model has neither 'input_dim' "
            "nor 'lstm.input_size'"
        )

    torch.save({
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'loss': loss,
        'model_config': {
            'input_dim': input_dim,
            'hidden_dim': model.hidden_dim,
            'num_layers': model.num_layers
        }
    }, path)
def load_model_checkpoint(path, model_class):
    """Rebuild a model from a checkpoint written by ``save_model_checkpoint``.

    Args:
        path: File path or file-like object accepted by ``torch.load``.
        model_class: Class called with the stored ``model_config`` as keyword
            arguments to construct the model.

    Returns:
        Tuple ``(model, epoch, loss)``.
    """
    # map_location='cpu' lets a checkpoint saved on a GPU be loaded on a
    # machine without one; the freshly built model lives on the CPU anyway.
    checkpoint = torch.load(path, map_location='cpu')

    # Rebuild the architecture, then restore the weights.
    model = model_class(**checkpoint['model_config'])
    model.load_state_dict(checkpoint['model_state_dict'])

    return model, checkpoint['epoch'], checkpoint['loss']
8. まとめ
本ドキュメントでは、LSTMを用いた暗号通貨の価格予測と取引方向予測の包括的な実装方法を提供しました。重要なポイント:
- マルチモーダル学習: OHLCV、Ticks、Orderbookの各データを効果的に統合
- ハイブリッド予測: 価格の回帰予測と方向の分類予測を同時に実行
- 実践的な特徴量: テクニカル指標と市場マイクロストラクチャの特徴を活用
- リアルタイム対応: ストリーミングデータに対応した予測システム
- 評価とバックテスト: 実際の取引を想定した評価フレームワーク
次のステップとして、以下の改善を検討することを推奨します:
- Attention機構の強化(Transformer型アーキテクチャへの移行)
- 強化学習を用いた取引戦略の最適化
- より高度なアンサンブル手法(スタッキング、ブレンディング)
- リアルタイム異常検知の統合