Ensemble Learning and Meta-Learning Strategies
1. Introduction
In cryptocurrency trading, relying on a single model is risky and makes it hard to capture the full complexity of the market. Ensemble learning and meta-learning combine multiple models to build trading systems that are more robust and more adaptive.
2. Fundamentals of Ensemble Learning
2.1 Bagging (Bootstrap Aggregating)
Bagging trains the same algorithm on different subsets of the data and aggregates the resulting predictions.
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.tree import DecisionTreeRegressor
from typing import List, Dict, Tuple
import os
import joblib
from concurrent.futures import ProcessPoolExecutor
class CryptoBaggingEnsemble:
def __init__(self,
n_estimators: int = 100,
max_samples: float = 0.8,
n_jobs: int = -1):
self.n_estimators = n_estimators
self.max_samples = max_samples
self.n_jobs = n_jobs
self.estimators = []
self.feature_importances = None
    def fit(self, X: np.ndarray, y: np.ndarray):
        """Fit the bagging ensemble."""
        n_samples = X.shape[0]
        sample_size = int(n_samples * self.max_samples)
        # ProcessPoolExecutor rejects -1, so resolve it to the available CPU count
        max_workers = os.cpu_count() if self.n_jobs == -1 else self.n_jobs
        # Train the base models in parallel
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for i in range(self.n_estimators):
                # Bootstrap sampling with replacement
                indices = np.random.choice(n_samples, sample_size, replace=True)
                X_boot = X[indices]
                y_boot = y[indices]
                # Submit a single-model training job
                future = executor.submit(self._train_single_model, X_boot, y_boot, i)
                futures.append(future)
            # Collect the trained models
            self.estimators = [future.result() for future in futures]
        # Aggregate feature importances
        self._calculate_feature_importances(X, y)
        return self
def _train_single_model(self, X: np.ndarray, y: np.ndarray, seed: int):
"""単一モデルを学習"""
model = DecisionTreeRegressor(
max_depth=10,
min_samples_split=20,
random_state=seed
)
model.fit(X, y)
return model
    def predict(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Ensemble prediction: returns the mean prediction and its dispersion."""
        predictions = np.array([model.predict(X) for model in self.estimators])
        # Aggregate the predictions by averaging
        mean_prediction = np.mean(predictions, axis=0)
        # Cross-model standard deviation as an uncertainty estimate
        prediction_std = np.std(predictions, axis=0)
        return mean_prediction, prediction_std
def predict_proba(self, X: np.ndarray) -> Dict[str, np.ndarray]:
"""確率的予測(分類タスク用)"""
predictions = []
for model in self.estimators:
# 各モデルの予測を取得
pred = model.predict(X)
# 予測を確率に変換(シグモイド関数を使用)
prob = 1 / (1 + np.exp(-pred))
predictions.append(prob)
predictions = np.array(predictions)
return {
'mean_prob': np.mean(predictions, axis=0),
'std_prob': np.std(predictions, axis=0),
'confidence': 1 - np.std(predictions, axis=0) # 信頼度
}
def _calculate_feature_importances(self, X: np.ndarray, y: np.ndarray):
"""特徴量の重要度を計算"""
importances = []
for model in self.estimators:
if hasattr(model, 'feature_importances_'):
importances.append(model.feature_importances_)
if importances:
self.feature_importances = np.mean(importances, axis=0)
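The sketch below shows one way to exercise the class above on synthetic data (random features and a noisy linear target, purely illustrative). The `__main__` guard matters because ProcessPoolExecutor may spawn worker processes that re-import the module.
# Illustrative usage of CryptoBaggingEnsemble on synthetic data
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    X_demo = rng.normal(size=(500, 8))                      # 500 samples, 8 features
    y_demo = 0.5 * X_demo[:, 0] + rng.normal(scale=0.1, size=500)
    ensemble = CryptoBaggingEnsemble(n_estimators=20, max_samples=0.8, n_jobs=2)
    ensemble.fit(X_demo, y_demo)
    mean_pred, pred_std = ensemble.predict(X_demo[:5])
    print("mean prediction:", mean_pred)
    print("uncertainty (std):", pred_std)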
2.2 Boosting (Gradient Boosting)
Boosting trains weak learners sequentially, with each new model correcting the errors of the previous ones.
import lightgbm as lgb
from sklearn.model_selection import TimeSeriesSplit
class CryptoGradientBoosting:
def __init__(self,
n_estimators: int = 1000,
learning_rate: float = 0.01,
max_depth: int = 6):
self.n_estimators = n_estimators
self.learning_rate = learning_rate
self.max_depth = max_depth
self.models = []
self.feature_names = None
def fit(self, X: pd.DataFrame, y: np.ndarray,
validation_split: float = 0.2):
"""時系列を考慮したGradient Boostingモデルを学習"""
self.feature_names = X.columns.tolist()
# 時系列分割
n_samples = len(X)
train_size = int(n_samples * (1 - validation_split))
X_train = X.iloc[:train_size]
y_train = y[:train_size]
X_val = X.iloc[train_size:]
y_val = y[train_size:]
        # LightGBM parameters ('verbosity' replaces the removed 'silent' flag)
        params = {
            'objective': 'regression',
            'metric': 'rmse',
            'num_leaves': 2 ** self.max_depth - 1,
            'learning_rate': self.learning_rate,
            'min_child_samples': 20,
            'subsample': 0.8,
            'colsample_bytree': 0.8,
            'reg_alpha': 0.1,
            'reg_lambda': 0.1,
            'random_state': 42,
            'n_jobs': -1,
            'verbosity': -1
        }
        # Train the model; recent LightGBM versions configure early stopping
        # and logging through callbacks rather than keyword arguments
        train_data = lgb.Dataset(X_train, label=y_train)
        val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
        self.model = lgb.train(
            params,
            train_data,
            num_boost_round=self.n_estimators,
            valid_sets=[val_data],
            callbacks=[
                lgb.early_stopping(stopping_rounds=50),
                lgb.log_evaluation(period=0)
            ]
        )
# 特徴量の重要度を取得
self.feature_importance = pd.DataFrame({
'feature': self.feature_names,
'importance': self.model.feature_importance(importance_type='gain')
}).sort_values('importance', ascending=False)
return self
def predict(self, X: pd.DataFrame) -> np.ndarray:
"""予測を実行"""
return self.model.predict(X, num_iteration=self.model.best_iteration)
    def predict_with_uncertainty(self, X: pd.DataFrame, n_iterations: int = 100) -> Dict:
        """Prediction with an uncertainty estimate via random feature masking."""
        # Generate several perturbed predictions by masking features (set to NaN,
        # which LightGBM handles natively) rather than dropping columns, so the
        # feature count stays consistent with training
        predictions = []
        for i in range(n_iterations):
            # Randomly mask roughly 20% of the features
            mask = np.random.binomial(1, 0.8, size=X.shape[1]).astype(bool)
            X_masked = X.copy()
            X_masked.loc[:, ~mask] = np.nan
            # Predict on the masked copy
            pred = self.model.predict(
                X_masked,
                num_iteration=self.model.best_iteration
            )
            predictions.append(pred)
predictions = np.array(predictions)
return {
'mean': np.mean(predictions, axis=0),
'std': np.std(predictions, axis=0),
'lower_bound': np.percentile(predictions, 5, axis=0),
'upper_bound': np.percentile(predictions, 95, axis=0)
}
class AdaptiveBoostingTrader:
def __init__(self):
self.weak_learners = []
self.learner_weights = []
self.performance_history = []
def add_weak_learner(self, learner, initial_weight: float = 1.0):
"""弱学習器を追加"""
self.weak_learners.append(learner)
self.learner_weights.append(initial_weight)
def update_weights(self, predictions: List[np.ndarray],
actual: np.ndarray) -> None:
"""学習器の重みを更新"""
errors = []
for i, pred in enumerate(predictions):
# 各学習器の誤差を計算
error = np.mean(np.abs(pred - actual))
errors.append(error)
# 誤差に基づいて重みを更新
errors = np.array(errors)
# 誤差の逆数を重みとする
new_weights = 1 / (errors + 1e-6)
# 正規化
self.learner_weights = new_weights / np.sum(new_weights)
# パフォーマンス履歴を保存
self.performance_history.append({
'timestamp': pd.Timestamp.now(),
'errors': errors,
'weights': self.learner_weights.copy()
})
def predict(self, X: pd.DataFrame) -> np.ndarray:
"""加重アンサンブル予測"""
predictions = []
for learner in self.weak_learners:
pred = learner.predict(X)
predictions.append(pred)
# 加重平均
weighted_pred = np.average(
predictions,
axis=0,
weights=self.learner_weights
)
return weighted_pred
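As a rough usage sketch (synthetic data, hypothetical feature names f0..f4), the adaptive booster can be seeded with a few decision trees of different depths and then reweighted from realized errors:
# Illustrative sketch: reweighting AdaptiveBoostingTrader from one batch of outcomes
rng = np.random.default_rng(1)
X_demo = pd.DataFrame(rng.normal(size=(300, 5)), columns=[f"f{i}" for i in range(5)])
y_demo = 0.3 * X_demo["f0"].values + rng.normal(scale=0.05, size=300)
trader = AdaptiveBoostingTrader()
for depth in (2, 4, 8):
    learner = DecisionTreeRegressor(max_depth=depth, random_state=depth)
    learner.fit(X_demo, y_demo)
    trader.add_weak_learner(learner)
# Collect each learner's predictions, then shift weight toward the more accurate ones
preds = [learner.predict(X_demo) for learner in trader.weak_learners]
trader.update_weights(preds, y_demo)
print("updated weights:", trader.learner_weights)
print("weighted forecast (first 3):", trader.predict(X_demo)[:3])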
2.3 Stacking (Stacked Generalization)
Stacking uses the predictions of several base models as new features and trains a meta-model on them to produce the final prediction.
from sklearn.model_selection import KFold
from sklearn.linear_model import Ridge
from sklearn.neural_network import MLPRegressor
from xgboost import XGBRegressor
class CryptoStackingEnsemble:
def __init__(self,
base_models: List = None,
meta_model = None,
n_folds: int = 5):
self.base_models = base_models or self._get_default_base_models()
self.meta_model = meta_model or Ridge(alpha=1.0)
self.n_folds = n_folds
self.trained_base_models = []
def _get_default_base_models(self):
"""デフォルトのベースモデル"""
return [
('rf', RandomForestRegressor(n_estimators=100, random_state=42)),
('xgb', XGBRegressor(n_estimators=100, random_state=42)),
('mlp', MLPRegressor(hidden_layer_sizes=(100, 50), random_state=42)),
('lgb', lgb.LGBMRegressor(n_estimators=100, random_state=42))
]
def fit(self, X: np.ndarray, y: np.ndarray):
"""スタッキングモデルを学習"""
n_samples = X.shape[0]
n_models = len(self.base_models)
# ベースモデルの予測を格納する配列
base_predictions = np.zeros((n_samples, n_models))
# K-fold cross-validation
kf = KFold(n_splits=self.n_folds, shuffle=False) # 時系列なのでshuffleしない
for fold_idx, (train_idx, val_idx) in enumerate(kf.split(X)):
X_train_fold = X[train_idx]
y_train_fold = y[train_idx]
X_val_fold = X[val_idx]
# 各ベースモデルを学習
fold_models = []
for model_idx, (name, model) in enumerate(self.base_models):
# モデルのクローンを作成
model_clone = self._clone_model(model)
# 学習
model_clone.fit(X_train_fold, y_train_fold)
# 検証データで予測
val_pred = model_clone.predict(X_val_fold)
base_predictions[val_idx, model_idx] = val_pred
fold_models.append((name, model_clone))
self.trained_base_models.append(fold_models)
# メタモデルを学習
self.meta_model.fit(base_predictions, y)
# 全データで最終的なベースモデルを学習
self.final_base_models = []
for name, model in self.base_models:
model_clone = self._clone_model(model)
model_clone.fit(X, y)
self.final_base_models.append((name, model_clone))
def predict(self, X: np.ndarray) -> np.ndarray:
"""スタッキング予測"""
n_samples = X.shape[0]
n_models = len(self.base_models)
# ベースモデルの予測を取得
base_predictions = np.zeros((n_samples, n_models))
for model_idx, (name, model) in enumerate(self.final_base_models):
base_predictions[:, model_idx] = model.predict(X)
# メタモデルで最終予測
final_prediction = self.meta_model.predict(base_predictions)
return final_prediction
def predict_with_confidence(self, X: np.ndarray) -> Dict:
"""信頼区間付き予測"""
# 各フォールドのモデルで予測
all_predictions = []
for fold_models in self.trained_base_models:
fold_base_predictions = []
for model_idx, (name, model) in enumerate(fold_models):
pred = model.predict(X)
fold_base_predictions.append(pred)
# このフォールドのメタ予測
fold_base_predictions = np.column_stack(fold_base_predictions)
fold_final_pred = self.meta_model.predict(fold_base_predictions)
all_predictions.append(fold_final_pred)
all_predictions = np.array(all_predictions)
return {
'mean': np.mean(all_predictions, axis=0),
'std': np.std(all_predictions, axis=0),
'lower_95': np.percentile(all_predictions, 2.5, axis=0),
'upper_95': np.percentile(all_predictions, 97.5, axis=0)
}
def _clone_model(self, model):
"""モデルをクローン"""
from sklearn.base import clone
return clone(model)
def get_model_contributions(self, X: np.ndarray) -> pd.DataFrame:
"""各モデルの貢献度を分析"""
contributions = {}
# ベースモデルの予測
for name, model in self.final_base_models:
contributions[name] = model.predict(X)
# メタモデルの係数から重要度を取得
if hasattr(self.meta_model, 'coef_'):
importances = np.abs(self.meta_model.coef_)
for i, (name, _) in enumerate(self.base_models):
contributions[f'{name}_importance'] = importances[i]
return pd.DataFrame(contributions)
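A minimal sketch of fitting the stacking ensemble, assuming xgboost and lightgbm are installed (they back the default base models) and using synthetic data:
# Illustrative usage of CryptoStackingEnsemble on synthetic data
rng = np.random.default_rng(2)
X_demo = rng.normal(size=(400, 6))
y_demo = X_demo[:, 0] - 0.5 * X_demo[:, 1] + rng.normal(scale=0.1, size=400)
stack = CryptoStackingEnsemble(n_folds=3)
stack.fit(X_demo, y_demo)
interval = stack.predict_with_confidence(X_demo[-10:])
print("point prediction:", stack.predict(X_demo[-10:])[:3])
print("95% interval width:", (interval['upper_95'] - interval['lower_95'])[:3])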
3. Meta-Learning Strategies
3.1 Market-Regime-Adaptive Meta-Learning
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
class MarketRegimeMetaLearner:
def __init__(self, n_regimes: int = 3):
self.n_regimes = n_regimes
self.regime_detector = GaussianMixture(
n_components=n_regimes,
covariance_type='full',
random_state=42
)
self.regime_models = {}
self.scaler = StandardScaler()
self.current_regime = None
def identify_market_features(self, market_data: pd.DataFrame) -> np.ndarray:
"""市場の特徴量を抽出"""
features = []
# ボラティリティ
returns = market_data['close'].pct_change()
features.append(returns.rolling(20).std())
# トレンド強度
sma_20 = market_data['close'].rolling(20).mean()
sma_50 = market_data['close'].rolling(50).mean()
trend_strength = (sma_20 - sma_50) / sma_50
features.append(trend_strength)
# ボリューム変化率
volume_ratio = market_data['volume'] / market_data['volume'].rolling(20).mean()
features.append(volume_ratio)
# RSI
rsi = self._calculate_rsi(market_data['close'])
features.append(rsi)
# 市場の効率性(Hurst指数)
hurst = self._calculate_hurst_exponent(returns.dropna())
features.append(pd.Series(hurst, index=market_data.index))
# 特徴量を結合
feature_matrix = pd.concat(features, axis=1).dropna()
return feature_matrix.values
def fit_regime_detector(self, market_data: pd.DataFrame):
"""市場レジームを学習"""
# 市場特徴量を抽出
features = self.identify_market_features(market_data)
# 正規化
features_scaled = self.scaler.fit_transform(features)
# レジームを学習
self.regime_detector.fit(features_scaled)
# 各データポイントのレジームを予測
regimes = self.regime_detector.predict(features_scaled)
return regimes
    def train_regime_specific_models(self,
                                     X: pd.DataFrame,
                                     y: np.ndarray,
                                     market_data: pd.DataFrame):
        """Train one model per market regime."""
        # Identify the regime of each observation
        regimes = self.fit_regime_detector(market_data)
        # identify_market_features drops the leading NaN rows created by its rolling
        # windows, so trim X and y to the same tail (this assumes X is row-aligned
        # with market_data) before masking by regime
        X = X.iloc[-len(regimes):]
        y = y[-len(regimes):]
        # Train a dedicated model for each regime
        for regime in range(self.n_regimes):
            # Select the observations assigned to this regime
            regime_mask = regimes == regime
            X_regime = X[regime_mask]
            y_regime = y[regime_mask]
            if len(X_regime) < 100:  # skip regimes without enough data
                continue
# レジーム専用モデルを作成
if regime == 0: # 低ボラティリティレジーム
model = self._create_low_volatility_model()
elif regime == 1: # トレンドレジーム
model = self._create_trend_model()
else: # 高ボラティリティレジーム
model = self._create_high_volatility_model()
# モデルを学習
model.fit(X_regime, y_regime)
self.regime_models[regime] = model
print(f"Regime {regime}: {len(X_regime)} samples")
def predict(self, X: pd.DataFrame, market_data: pd.DataFrame) -> np.ndarray:
"""現在のレジームに応じた予測"""
# 現在の市場レジームを識別
current_features = self.identify_market_features(market_data.tail(100))
current_features_scaled = self.scaler.transform(current_features[-1:])
current_regime = self.regime_detector.predict(current_features_scaled)[0]
# レジーム確率を取得
regime_probs = self.regime_detector.predict_proba(current_features_scaled)[0]
# 確率加重予測
predictions = np.zeros(len(X))
for regime, prob in enumerate(regime_probs):
if regime in self.regime_models and prob > 0.1:
regime_pred = self.regime_models[regime].predict(X)
predictions += prob * regime_pred
self.current_regime = current_regime
return predictions
def _create_low_volatility_model(self):
"""低ボラティリティ用モデル"""
return Ridge(alpha=0.1)
def _create_trend_model(self):
"""トレンド追従用モデル"""
return XGBRegressor(
n_estimators=200,
max_depth=5,
learning_rate=0.01,
subsample=0.8
)
def _create_high_volatility_model(self):
"""高ボラティリティ用モデル"""
return RandomForestRegressor(
n_estimators=300,
max_depth=10,
min_samples_split=20,
bootstrap=True
)
def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
"""RSIを計算"""
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
    def _calculate_hurst_exponent(self, returns: np.ndarray) -> float:
        """Estimate the Hurst exponent via rescaled-range (R/S) analysis."""
        if len(returns) < 100:
            return 0.5
        returns = np.asarray(returns)
        lags = range(2, min(100, len(returns) // 2))
        tau = []
        valid_lags = []
        for lag in lags:
            window = returns[:lag]
            # Cumulative deviation from the window mean
            cumdev = np.cumsum(window - np.mean(window))
            # Range and standard deviation of the window
            R = np.max(cumdev) - np.min(cumdev)
            S = np.std(window)
            # Keep only lags with a well-defined, positive R/S statistic
            if S > 0 and R > 0:
                tau.append(R / S)
                valid_lags.append(lag)
        # Estimate the Hurst exponent as the slope of the log-log regression
        if len(tau) > 1:
            hurst = np.polyfit(np.log(valid_lags), np.log(tau), 1)[0]
            return hurst
        return 0.5
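The sketch below runs the regime learner end to end on synthetic hourly OHLCV-style data (the `close`/`volume` column names are what the class expects; the lagged-return features are placeholders):
# Illustrative usage of MarketRegimeMetaLearner on synthetic data
rng = np.random.default_rng(3)
n = 2000
idx = pd.date_range("2024-01-01", periods=n, freq="1H")
market_demo = pd.DataFrame({
    "close": 30000 * np.exp(np.cumsum(rng.normal(0, 0.005, n))),
    "volume": rng.lognormal(mean=10, sigma=0.3, size=n),
}, index=idx)
# Simple lagged-return features and a one-step-ahead return target
X_demo = pd.DataFrame(
    {f"ret_lag_{k}": market_demo["close"].pct_change().shift(k) for k in range(1, 6)},
    index=idx
).fillna(0.0)
y_demo = market_demo["close"].pct_change().shift(-1).fillna(0.0).values
learner = MarketRegimeMetaLearner(n_regimes=3)
learner.train_regime_specific_models(X_demo, y_demo, market_demo)
print("regime-weighted prediction (first 3):", learner.predict(X_demo.tail(50), market_demo)[:3])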
3.2 Multi-Timeframe Ensemble
class MultiTimeframeEnsemble:
def __init__(self,
timeframes: List[str] = ['5m', '15m', '1h', '4h', '1d']):
self.timeframes = timeframes
self.models = {}
self.weights = {}
self.performance_tracker = {}
def prepare_multiframe_features(self,
base_data: pd.DataFrame,
timeframe: str) -> pd.DataFrame:
"""各タイムフレーム用の特徴量を準備"""
# タイムフレームに応じたリサンプリング
resampled = self._resample_data(base_data, timeframe)
features = pd.DataFrame(index=resampled.index)
# 価格関連特徴量
features[f'{timeframe}_returns'] = resampled['close'].pct_change()
features[f'{timeframe}_log_returns'] = np.log(resampled['close'] / resampled['close'].shift(1))
# 移動平均
for period in [5, 10, 20]:
features[f'{timeframe}_sma_{period}'] = resampled['close'].rolling(period).mean()
features[f'{timeframe}_sma_{period}_slope'] = features[f'{timeframe}_sma_{period}'].diff()
# ボラティリティ
features[f'{timeframe}_volatility'] = features[f'{timeframe}_returns'].rolling(20).std()
# ボリューム分析
features[f'{timeframe}_volume_sma'] = resampled['volume'].rolling(20).mean()
features[f'{timeframe}_volume_ratio'] = resampled['volume'] / features[f'{timeframe}_volume_sma']
# テクニカル指標
features[f'{timeframe}_rsi'] = self._calculate_rsi(resampled['close'])
features[f'{timeframe}_macd'], features[f'{timeframe}_macd_signal'] = self._calculate_macd(resampled['close'])
# ボリンジャーバンド
bb_period = 20
bb_std = 2
sma = resampled['close'].rolling(bb_period).mean()
std = resampled['close'].rolling(bb_period).std()
features[f'{timeframe}_bb_upper'] = sma + (bb_std * std)
features[f'{timeframe}_bb_lower'] = sma - (bb_std * std)
features[f'{timeframe}_bb_width'] = features[f'{timeframe}_bb_upper'] - features[f'{timeframe}_bb_lower']
features[f'{timeframe}_bb_position'] = (resampled['close'] - features[f'{timeframe}_bb_lower']) / features[f'{timeframe}_bb_width']
return features.dropna()
def train_timeframe_models(self,
base_data: pd.DataFrame,
target: pd.Series):
"""各タイムフレームのモデルを学習"""
for timeframe in self.timeframes:
print(f"Training model for {timeframe} timeframe...")
# 特徴量を準備
features = self.prepare_multiframe_features(base_data, timeframe)
# ターゲットをリサンプリング
resampled_target = self._resample_target(target, timeframe)
# インデックスを揃える
common_index = features.index.intersection(resampled_target.index)
X = features.loc[common_index]
y = resampled_target.loc[common_index]
# モデルを選択(タイムフレームに応じて)
if timeframe in ['5m', '15m']:
# 短期:高速モデル
model = lgb.LGBMRegressor(
n_estimators=100,
max_depth=5,
learning_rate=0.1
)
elif timeframe in ['1h', '4h']:
# 中期:バランス型
model = XGBRegressor(
n_estimators=200,
max_depth=7,
learning_rate=0.05
)
else:
# 長期:複雑なモデル
model = RandomForestRegressor(
n_estimators=300,
max_depth=10,
min_samples_split=10
)
# 学習
train_size = int(len(X) * 0.8)
X_train = X[:train_size]
y_train = y[:train_size]
X_test = X[train_size:]
y_test = y[train_size:]
model.fit(X_train, y_train)
# パフォーマンスを評価
train_score = model.score(X_train, y_train)
test_score = model.score(X_test, y_test)
self.models[timeframe] = model
self.performance_tracker[timeframe] = {
'train_score': train_score,
'test_score': test_score
}
# 初期重みを設定(パフォーマンスに基づく)
self.weights[timeframe] = max(0.1, test_score)
# 重みを正規化
total_weight = sum(self.weights.values())
self.weights = {k: v/total_weight for k, v in self.weights.items()}
def predict_ensemble(self, base_data: pd.DataFrame) -> Dict[str, np.ndarray]:
"""アンサンブル予測"""
predictions = {}
weighted_prediction = None
for timeframe in self.timeframes:
if timeframe not in self.models:
continue
# 特徴量を準備
features = self.prepare_multiframe_features(base_data, timeframe)
# 予測
pred = self.models[timeframe].predict(features)
predictions[timeframe] = pred
# 加重予測に追加
if weighted_prediction is None:
weighted_prediction = self.weights[timeframe] * pred
else:
# インデックスを揃えて加算
min_len = min(len(weighted_prediction), len(pred))
weighted_prediction[:min_len] += self.weights[timeframe] * pred[:min_len]
predictions['ensemble'] = weighted_prediction
return predictions
def adaptive_weight_update(self,
predictions: Dict[str, np.ndarray],
actual: np.ndarray):
"""適応的に重みを更新"""
errors = {}
for timeframe in self.timeframes:
if timeframe in predictions:
# 予測誤差を計算
pred = predictions[timeframe]
min_len = min(len(pred), len(actual))
error = np.mean(np.abs(pred[:min_len] - actual[:min_len]))
errors[timeframe] = error
# 誤差に基づいて重みを更新(誤差が小さいほど高い重み)
if errors:
min_error = min(errors.values())
for timeframe in errors:
# 相対的なパフォーマンスに基づく重み
relative_performance = min_error / (errors[timeframe] + 1e-6)
# 既存の重みとのブレンド(モメンタム)
self.weights[timeframe] = 0.7 * self.weights[timeframe] + \
0.3 * relative_performance
# 正規化
total_weight = sum(self.weights.values())
self.weights = {k: v/total_weight for k, v in self.weights.items()}
def _resample_data(self, data: pd.DataFrame, timeframe: str) -> pd.DataFrame:
"""データをリサンプリング"""
# タイムフレームをpandasの頻度文字列に変換
freq_map = {
'5m': '5T',
'15m': '15T',
'1h': '1H',
'4h': '4H',
'1d': '1D'
}
freq = freq_map.get(timeframe, '1H')
# OHLCVデータをリサンプリング
resampled = data.resample(freq).agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
})
return resampled.dropna()
def _resample_target(self, target: pd.Series, timeframe: str) -> pd.Series:
"""ターゲットをリサンプリング"""
freq_map = {
'5m': '5T',
'15m': '15T',
'1h': '1H',
'4h': '4H',
'1d': '1D'
}
freq = freq_map.get(timeframe, '1H')
# 最後の値を使用
return target.resample(freq).last()
    def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
        """Compute the RSI (referenced in prepare_multiframe_features)."""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))
    def _calculate_macd(self, prices: pd.Series,
                        fast: int = 12,
                        slow: int = 26,
                        signal: int = 9) -> Tuple[pd.Series, pd.Series]:
        """Compute the MACD line and its signal line."""
        ema_fast = prices.ewm(span=fast).mean()
        ema_slow = prices.ewm(span=slow).mean()
        macd = ema_fast - ema_slow
        macd_signal = macd.ewm(span=signal).mean()
        return macd, macd_signal
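A usage sketch on synthetic 5-minute bars (shorter timeframes than the default list so that every resampled frame has enough rows; all values are simulated):
# Illustrative usage of MultiTimeframeEnsemble on synthetic 5-minute OHLCV bars
rng = np.random.default_rng(4)
n = 20000
idx = pd.date_range("2024-01-01", periods=n, freq="5min")
close = 30000 * np.exp(np.cumsum(rng.normal(0, 0.001, n)))
base_demo = pd.DataFrame({
    "open": close * (1 + rng.normal(0, 0.0005, n)),
    "high": close * (1 + np.abs(rng.normal(0, 0.001, n))),
    "low": close * (1 - np.abs(rng.normal(0, 0.001, n))),
    "close": close,
    "volume": rng.lognormal(9, 0.4, n),
}, index=idx)
target_demo = base_demo["close"].pct_change().shift(-1).fillna(0.0)
mtf = MultiTimeframeEnsemble(timeframes=["15m", "1h", "4h"])
mtf.train_timeframe_models(base_demo, target_demo)
preds = mtf.predict_ensemble(base_demo.tail(3000))
print("timeframe weights:", mtf.weights)
print("ensemble prediction (last 3):", preds["ensemble"][-3:])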
4. Model Blending Techniques
4.1 Dynamic Weighted Blending
class DynamicModelBlender:
def __init__(self,
models: Dict[str, any],
lookback_window: int = 100):
self.models = models
self.lookback_window = lookback_window
self.performance_history = {name: [] for name in models.keys()}
self.current_weights = {name: 1/len(models) for name in models.keys()}
    def blend_predictions(self,
                          X: pd.DataFrame,
                          market_conditions: Dict) -> np.ndarray:
        """Blend model predictions with weights adapted to market conditions."""
        predictions = {}
        # Collect each model's prediction
        for name, model in self.models.items():
            pred = model.predict(X)
            # Some ensembles (e.g. CryptoBaggingEnsemble) return (mean, std); keep the mean
            if isinstance(pred, tuple):
                pred = pred[0]
            predictions[name] = np.asarray(pred)
        # Adjust the weights for the current market conditions
        adjusted_weights = self._adjust_weights_for_market(market_conditions)
        # Weighted blend of the predictions
        blended = np.zeros_like(next(iter(predictions.values())))
        for name, pred in predictions.items():
            blended += adjusted_weights[name] * pred
        return blended
def _adjust_weights_for_market(self, market_conditions: Dict) -> Dict:
"""市場状況に応じて重みを調整"""
adjusted_weights = self.current_weights.copy()
# ボラティリティが高い場合
if market_conditions.get('volatility', 0) > 0.02:
# 保守的なモデルの重みを増やす
if 'ridge' in adjusted_weights:
adjusted_weights['ridge'] *= 1.5
if 'random_forest' in adjusted_weights:
adjusted_weights['random_forest'] *= 0.8
# トレンドが強い場合
if abs(market_conditions.get('trend_strength', 0)) > 0.5:
# トレンドフォロー型モデルの重みを増やす
if 'xgboost' in adjusted_weights:
adjusted_weights['xgboost'] *= 1.3
if 'lstm' in adjusted_weights:
adjusted_weights['lstm'] *= 1.2
# 正規化
total = sum(adjusted_weights.values())
adjusted_weights = {k: v/total for k, v in adjusted_weights.items()}
return adjusted_weights
def update_weights_online(self,
predictions: Dict[str, np.ndarray],
actual: np.ndarray):
"""オンラインで重みを更新"""
# 各モデルのパフォーマンスを計算
for name, pred in predictions.items():
error = np.mean(np.abs(pred - actual))
self.performance_history[name].append(error)
# 移動窓でのパフォーマンスを計算
if len(self.performance_history[name]) > self.lookback_window:
self.performance_history[name].pop(0)
# 重みを更新
if all(len(hist) >= 10 for hist in self.performance_history.values()):
# 各モデルの平均誤差
avg_errors = {
name: np.mean(hist)
for name, hist in self.performance_history.items()
}
# 誤差の逆数を重みとする
min_error = min(avg_errors.values())
for name in self.models.keys():
self.current_weights[name] = min_error / (avg_errors[name] + 1e-6)
# 正規化
total = sum(self.current_weights.values())
self.current_weights = {k: v/total for k, v in self.current_weights.items()}
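A small sketch of the blender in action; the model names ('ridge', 'random_forest', 'xgboost') are chosen to match the keys that `_adjust_weights_for_market` looks for, and the data is synthetic:
# Illustrative usage of DynamicModelBlender with three pre-fitted models
rng = np.random.default_rng(5)
X_demo = pd.DataFrame(rng.normal(size=(600, 6)), columns=[f"f{i}" for i in range(6)])
y_demo = 0.4 * X_demo["f0"].values + rng.normal(scale=0.05, size=600)
models = {
    "ridge": Ridge(alpha=1.0).fit(X_demo, y_demo),
    "random_forest": RandomForestRegressor(n_estimators=50, random_state=0).fit(X_demo, y_demo),
    "xgboost": XGBRegressor(n_estimators=50, random_state=0).fit(X_demo, y_demo),
}
blender = DynamicModelBlender(models, lookback_window=50)
# A high-volatility, strong-trend snapshot tilts weight toward the conservative ridge model
conditions = {"volatility": 0.03, "trend_strength": 0.7}
print("blended (first 3):", blender.blend_predictions(X_demo.tail(20), conditions)[:3])
# Online update: the stored weights only shift once at least 10 batches have accumulated
batch_preds = {name: m.predict(X_demo.tail(20)) for name, m in models.items()}
blender.update_weights_online(batch_preds, y_demo[-20:])
print("current weights:", blender.current_weights)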
4.2 Bayesian Model Averaging
class BayesianModelAveraging:
def __init__(self, models: List[Tuple[str, any]], prior_weights: np.ndarray = None):
self.models = models
self.n_models = len(models)
# 事前分布
if prior_weights is None:
self.prior_weights = np.ones(self.n_models) / self.n_models
else:
self.prior_weights = prior_weights
# 事後分布
self.posterior_weights = self.prior_weights.copy()
    def update_posterior(self, X: np.ndarray, y: np.ndarray):
        """Update the posterior model weights via Bayes' rule."""
        log_likelihoods = np.zeros(self.n_models)
        for i, (name, model) in enumerate(self.models):
            # Prediction and residuals (keep the mean if the model returns (mean, std))
            pred = model.predict(X)
            if isinstance(pred, tuple):
                pred = pred[0]
            residuals = y - pred
            # Gaussian likelihood with the residual standard deviation as sigma
            sigma = max(np.std(residuals), 1e-8)
            log_likelihoods[i] = -0.5 * np.sum((residuals / sigma) ** 2) - \
                                 len(residuals) * np.log(sigma * np.sqrt(2 * np.pi))
        # Subtract the maximum log-likelihood across models for numerical stability
        likelihoods = np.exp(log_likelihoods - np.max(log_likelihoods))
        # Bayes update: posterior is proportional to prior times likelihood
        self.posterior_weights = self.prior_weights * likelihoods
        self.posterior_weights /= np.sum(self.posterior_weights)
    def predict(self, X: np.ndarray) -> Dict[str, np.ndarray]:
        """Prediction by Bayesian model averaging."""
        predictions = []
        for i, (name, model) in enumerate(self.models):
            pred = model.predict(X)
            # Ensembles that return (mean, std) contribute their mean prediction
            if isinstance(pred, tuple):
                pred = pred[0]
            predictions.append(np.asarray(pred))
        predictions = np.array(predictions)
# 加重平均
mean_prediction = np.average(predictions, axis=0, weights=self.posterior_weights)
# 予測の不確実性
variance = np.average(
(predictions - mean_prediction) ** 2,
axis=0,
weights=self.posterior_weights
)
return {
'mean': mean_prediction,
'std': np.sqrt(variance),
'model_weights': self.posterior_weights,
'individual_predictions': {
name: predictions[i]
for i, (name, _) in enumerate(self.models)
}
}
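The sketch below updates the posterior weights out of sample for two simple models on synthetic data; the weights then concentrate on whichever model explains the recent batch better:
# Illustrative usage of BayesianModelAveraging on synthetic data
rng = np.random.default_rng(6)
X_demo = rng.normal(size=(300, 4))
y_demo = 0.6 * X_demo[:, 0] + rng.normal(scale=0.05, size=300)
model_list = [
    ("ridge", Ridge(alpha=1.0).fit(X_demo[:200], y_demo[:200])),
    ("forest", RandomForestRegressor(n_estimators=50, random_state=0).fit(X_demo[:200], y_demo[:200])),
]
bma = BayesianModelAveraging(model_list)
bma.update_posterior(X_demo[200:], y_demo[200:])   # out-of-sample batch
result = bma.predict(X_demo[-5:])
print("posterior weights:", result["model_weights"])
print("mean prediction:", result["mean"])
print("predictive std:", result["std"])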
5. Implementation Example: An Integrated Ensemble System
class IntegratedEnsembleTradingSystem:
def __init__(self, config: Dict):
self.config = config
# コンポーネントの初期化
self.bagging_ensemble = CryptoBaggingEnsemble()
self.boosting_model = CryptoGradientBoosting()
self.stacking_ensemble = CryptoStackingEnsemble()
self.regime_learner = MarketRegimeMetaLearner()
self.timeframe_ensemble = MultiTimeframeEnsemble()
self.model_blender = None
self.bayesian_averager = None
# データストア
self.feature_store = {}
self.prediction_history = []
def prepare_features(self, market_data: pd.DataFrame) -> pd.DataFrame:
"""包括的な特徴量エンジニアリング"""
features = pd.DataFrame(index=market_data.index)
# 価格関連特徴量
features['returns'] = market_data['close'].pct_change()
features['log_returns'] = np.log(market_data['close'] / market_data['close'].shift(1))
features['high_low_ratio'] = market_data['high'] / market_data['low']
features['close_open_ratio'] = market_data['close'] / market_data['open']
# ボリューム関連特徴量
features['volume_sma_ratio'] = market_data['volume'] / market_data['volume'].rolling(20).mean()
features['volume_std'] = market_data['volume'].rolling(20).std()
# テクニカル指標
# RSI
features['rsi'] = self._calculate_rsi(market_data['close'])
# MACD
features['macd'], features['macd_signal'] = self._calculate_macd(market_data['close'])
features['macd_diff'] = features['macd'] - features['macd_signal']
# ボリンジャーバンド
sma_20 = market_data['close'].rolling(20).mean()
std_20 = market_data['close'].rolling(20).std()
features['bb_upper'] = sma_20 + 2 * std_20
features['bb_lower'] = sma_20 - 2 * std_20
features['bb_position'] = (market_data['close'] - features['bb_lower']) / \
(features['bb_upper'] - features['bb_lower'])
# 市場構造特徴量
features['volatility'] = features['returns'].rolling(20).std()
features['skewness'] = features['returns'].rolling(50).skew()
features['kurtosis'] = features['returns'].rolling(50).kurt()
        # Microstructure features (only if bid/ask quotes are available in the data)
        if {'ask', 'bid'}.issubset(market_data.columns):
            features['spread'] = market_data['ask'] - market_data['bid']
            features['mid_price'] = (market_data['ask'] + market_data['bid']) / 2
            features['spread_ratio'] = features['spread'] / features['mid_price']
return features.dropna()
    def train_all_models(self,
                         market_data: pd.DataFrame,
                         target: np.ndarray):
        """Train every model in the system."""
        print("Preparing features...")
        X = self.prepare_features(market_data)
        # prepare_features drops rows with NaN values, so align the target
        # to the surviving rows (assumes target is indexed like market_data)
        target = pd.Series(target, index=market_data.index).loc[X.index].values
        # Chronological train/validation split
        train_size = int(len(X) * 0.8)
        X_train = X[:train_size]
        y_train = target[:train_size]
        X_val = X[train_size:]
        y_val = target[train_size:]
print("Training bagging ensemble...")
self.bagging_ensemble.fit(X_train.values, y_train)
print("Training gradient boosting...")
self.boosting_model.fit(X_train, y_train)
print("Training stacking ensemble...")
self.stacking_ensemble.fit(X_train.values, y_train)
print("Training regime-specific models...")
self.regime_learner.train_regime_specific_models(
X_train, y_train, market_data[:train_size]
)
print("Training multi-timeframe models...")
self.timeframe_ensemble.train_timeframe_models(
market_data[:train_size],
pd.Series(y_train, index=X_train.index)
)
# モデルブレンダーを初期化
all_models = {
'bagging': self.bagging_ensemble,
'boosting': self.boosting_model,
'stacking': self.stacking_ensemble
}
self.model_blender = DynamicModelBlender(all_models)
# ベイジアン平均化を初期化
model_list = [
('bagging', self.bagging_ensemble),
('boosting', self.boosting_model),
('stacking', self.stacking_ensemble)
]
self.bayesian_averager = BayesianModelAveraging(model_list)
# 検証データで評価
self._evaluate_models(X_val, y_val)
def predict(self, market_data: pd.DataFrame) -> Dict[str, any]:
"""統合予測"""
# 特徴量を準備
X = self.prepare_features(market_data)
predictions = {}
# 個別モデルの予測
predictions['bagging'], predictions['bagging_std'] = \
self.bagging_ensemble.predict(X.values)
predictions['boosting'] = self.boosting_model.predict(X)
predictions['stacking'] = self.stacking_ensemble.predict(X.values)
# レジーム適応予測
predictions['regime'] = self.regime_learner.predict(X, market_data)
# マルチタイムフレーム予測
tf_predictions = self.timeframe_ensemble.predict_ensemble(market_data)
predictions.update({f'tf_{k}': v for k, v in tf_predictions.items()})
# 動的ブレンディング
market_conditions = self._analyze_market_conditions(market_data)
predictions['blended'] = self.model_blender.blend_predictions(
X, market_conditions
)
# ベイジアン平均
bayesian_result = self.bayesian_averager.predict(X.values)
predictions['bayesian_mean'] = bayesian_result['mean']
predictions['bayesian_std'] = bayesian_result['std']
# 最終的なアンサンブル予測
ensemble_components = [
predictions['bagging'],
predictions['boosting'],
predictions['stacking'],
predictions['regime'],
predictions['blended'],
predictions['bayesian_mean']
]
# 重み付き平均(各モデルの信頼度に基づく)
weights = self._calculate_confidence_weights(predictions)
predictions['final'] = np.average(
ensemble_components,
axis=0,
weights=weights
)
# メタデータを追加
predictions['metadata'] = {
'timestamp': pd.Timestamp.now(),
'model_weights': weights,
'market_regime': self.regime_learner.current_regime,
'confidence': self._calculate_prediction_confidence(predictions)
}
return predictions
    def _analyze_market_conditions(self, market_data: pd.DataFrame) -> Dict:
        """Summarize recent market conditions."""
        recent_data = market_data.tail(100)
        returns = recent_data['close'].pct_change()
        conditions = {
            'volatility': returns.std(),
            'trend_strength': (recent_data['close'].iloc[-1] - recent_data['close'].iloc[0]) / recent_data['close'].iloc[0],
            'volume_trend': recent_data['volume'].mean() / market_data['volume'].mean()
        }
        # Spread is only meaningful when bid/ask quotes are present
        if {'ask', 'bid'}.issubset(recent_data.columns):
            conditions['spread'] = (recent_data['ask'] - recent_data['bid']).mean()
        return conditions
def _calculate_confidence_weights(self, predictions: Dict) -> np.ndarray:
"""予測の信頼度に基づく重みを計算"""
# 各モデルの予測のばらつきから信頼度を推定
pred_values = [
predictions['bagging'],
predictions['boosting'],
predictions['stacking'],
predictions['regime'],
predictions['blended'],
predictions['bayesian_mean']
]
# 予測の標準偏差(ばらつきが小さいほど信頼度が高い)
std_dev = np.std(pred_values, axis=0)
confidence = 1 / (1 + std_dev)
# 各モデルの過去のパフォーマンスも考慮
performance_weights = np.array([0.9, 0.85, 0.88, 0.82, 0.9, 0.92])
# 最終的な重み
weights = confidence.mean() * performance_weights
weights = weights / weights.sum()
return weights
def _calculate_prediction_confidence(self, predictions: Dict) -> float:
"""予測の全体的な信頼度を計算"""
# 各モデルの予測の一致度
pred_values = [
predictions['bagging'],
predictions['boosting'],
predictions['stacking'],
predictions['regime']
]
# 相関係数の平均
correlations = []
for i in range(len(pred_values)):
for j in range(i+1, len(pred_values)):
corr = np.corrcoef(pred_values[i], pred_values[j])[0, 1]
correlations.append(corr)
avg_correlation = np.mean(correlations)
# 予測の分散
prediction_variance = np.var(pred_values, axis=0).mean()
# 信頼度スコア(0-1)
confidence = avg_correlation * (1 / (1 + prediction_variance))
return np.clip(confidence, 0, 1)
def _evaluate_models(self, X_val: pd.DataFrame, y_val: np.ndarray):
"""モデルを評価"""
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
results = {}
# 各モデルの評価
models = {
'bagging': lambda x: self.bagging_ensemble.predict(x.values)[0],
'boosting': lambda x: self.boosting_model.predict(x),
'stacking': lambda x: self.stacking_ensemble.predict(x.values)
}
for name, predict_func in models.items():
pred = predict_func(X_val)
results[name] = {
'mse': mean_squared_error(y_val, pred),
'mae': mean_absolute_error(y_val, pred),
'r2': r2_score(y_val, pred)
}
print("\nModel Evaluation Results:")
for name, metrics in results.items():
print(f"\n{name}:")
for metric, value in metrics.items():
print(f" {metric}: {value:.6f}")
return results
def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
"""RSIを計算"""
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
def _calculate_macd(self, prices: pd.Series) -> Tuple[pd.Series, pd.Series]:
"""MACDを計算"""
ema_12 = prices.ewm(span=12).mean()
ema_26 = prices.ewm(span=26).mean()
macd = ema_12 - ema_26
signal = macd.ewm(span=9).mean()
return macd, signal
# Usage example
def main():
    # Load the market data
    market_data = pd.read_csv('crypto_market_data.csv', parse_dates=['timestamp'])
    market_data.set_index('timestamp', inplace=True)
    # Build the target (e.g. the forward 12-bar price change)
    target = market_data['close'].shift(-12).pct_change(12).dropna()
    # Keep only the rows where the target is defined so data and target stay aligned
    market_data = market_data.loc[target.index]
    # Initialize the system
    config = {
        'symbol': 'BTC/USDT',
        'target_horizon': '1h'
    }
    system = IntegratedEnsembleTradingSystem(config)
    # Train the models
    system.train_all_models(market_data, target.values)
    # Run a prediction on the most recent window
    latest_data = market_data.tail(1000)
    predictions = system.predict(latest_data)
    print(f"\nFinal prediction: {predictions['final'][-1]:.6f}")
    print(f"Confidence: {predictions['metadata']['confidence']:.2%}")
    print(f"Current regime: {predictions['metadata']['market_regime']}")
if __name__ == "__main__":
    main()
6. Performance Optimization and Backtesting
class EnsembleBacktester:
def __init__(self, ensemble_system: IntegratedEnsembleTradingSystem):
self.ensemble_system = ensemble_system
self.results = {}
def backtest(self,
market_data: pd.DataFrame,
initial_capital: float = 10000,
position_size: float = 0.1,
transaction_cost: float = 0.001):
"""アンサンブルシステムのバックテスト"""
capital = initial_capital
position = 0
trades = []
equity_curve = []
# ローリングウィンドウでバックテスト
window_size = 1000
step_size = 100
for i in range(window_size, len(market_data), step_size):
# 訓練データ
train_data = market_data[i-window_size:i]
# テストデータ
test_data = market_data[i:i+step_size]
# 予測
predictions = self.ensemble_system.predict(test_data)
# 取引シグナルの生成
signals = self._generate_signals(predictions['final'])
# 取引の実行
for j, signal in enumerate(signals):
if j >= len(test_data):
break
current_price = test_data['close'].iloc[j]
if signal > 0.5 and position <= 0: # 買いシグナル
# ポジションサイズの計算
trade_size = capital * position_size / current_price
cost = trade_size * current_price * transaction_cost
position = trade_size
capital -= trade_size * current_price + cost
trades.append({
'timestamp': test_data.index[j],
'type': 'buy',
'price': current_price,
'size': trade_size,
'cost': cost
})
                elif signal < -0.5 and position >= 0:  # sell signal
                    if position > 0:
                        # Close the existing long position
                        capital += position * current_price
                        cost = position * current_price * transaction_cost
                        capital -= cost
                        trades.append({
                            'timestamp': test_data.index[j],
                            'type': 'sell',
                            'price': current_price,
                            'size': position,
                            'cost': cost
                        })
                    # Open a short position (compute its own transaction cost,
                    # rather than reusing the cost of the long close)
                    trade_size = capital * position_size / current_price
                    cost = trade_size * current_price * transaction_cost
                    position = -trade_size
                    capital += trade_size * current_price - cost
# エクイティカーブを記録
mark_to_market = position * current_price
total_equity = capital + mark_to_market
equity_curve.append({
'timestamp': test_data.index[j],
'equity': total_equity,
'capital': capital,
'position': position,
'position_value': mark_to_market
})
# 結果の分析
self.results = self._analyze_results(
equity_curve,
trades,
initial_capital
)
return self.results
def _generate_signals(self, predictions: np.ndarray) -> np.ndarray:
"""予測から取引シグナルを生成"""
# 閾値ベースのシグナル生成
signals = np.zeros_like(predictions)
# 強い上昇予測
signals[predictions > 0.002] = 1
# 強い下落予測
signals[predictions < -0.002] = -1
return signals
def _analyze_results(self,
equity_curve: List[Dict],
trades: List[Dict],
initial_capital: float) -> Dict:
"""バックテスト結果を分析"""
equity_df = pd.DataFrame(equity_curve)
trades_df = pd.DataFrame(trades)
# リターンの計算
equity_df['returns'] = equity_df['equity'].pct_change()
total_return = (equity_df['equity'].iloc[-1] / initial_capital - 1) * 100
# シャープレシオ
sharpe_ratio = np.sqrt(252) * equity_df['returns'].mean() / equity_df['returns'].std()
# 最大ドローダウン
rolling_max = equity_df['equity'].expanding().max()
drawdown = (equity_df['equity'] - rolling_max) / rolling_max
max_drawdown = drawdown.min() * 100
# 勝率
if len(trades_df) > 1:
winning_trades = 0
for i in range(0, len(trades_df)-1, 2):
if i+1 < len(trades_df):
entry_price = trades_df.iloc[i]['price']
exit_price = trades_df.iloc[i+1]['price']
if trades_df.iloc[i]['type'] == 'buy':
if exit_price > entry_price:
winning_trades += 1
else:
if exit_price < entry_price:
winning_trades += 1
win_rate = winning_trades / (len(trades_df) // 2) * 100
else:
win_rate = 0
return {
'total_return': total_return,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown,
'win_rate': win_rate,
'num_trades': len(trades_df),
'final_equity': equity_df['equity'].iloc[-1],
'equity_curve': equity_df
}
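A usage sketch, assuming `system` and `market_data` come from the section 5 example (and that the data carries the 'close', 'volume', 'bid' and 'ask' columns the system expects):
# Illustrative backtest run on a trained IntegratedEnsembleTradingSystem
backtester = EnsembleBacktester(system)
results = backtester.backtest(
    market_data,
    initial_capital=10000,
    position_size=0.1,
    transaction_cost=0.001
)
print(f"Total return: {results['total_return']:.2f}%")
print(f"Sharpe ratio: {results['sharpe_ratio']:.2f}")
print(f"Max drawdown: {results['max_drawdown']:.2f}%")
print(f"Win rate: {results['win_rate']:.1f}% over {results['num_trades']} trades")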
Summary
This document has covered advanced ensemble learning and meta-learning strategies for cryptocurrency trading.
Key points
- Ensemble learning
  - Bagging: improves prediction stability
  - Boosting: sequential error correction
  - Stacking: combines the strengths of multiple models
- Meta-learning strategies
  - Market regime adaptation
  - Multi-timeframe integration
  - Dynamic model selection
- Model blending
  - Dynamic weighting
  - Bayesian model averaging
  - Confidence-based aggregation
- Implementation best practices
  - Comprehensive feature engineering
  - Real-time adaptation mechanisms
  - A robust backtesting framework
Combining these techniques yields predictions that adapt to changing market conditions and are more stable than any single model.