ベイズ最適化とモデル不確実性定量化による暗号資産予測の高度化
1. はじめに
1.1 なぜベイズアプローチが重要か
暗号資産市場の予測において、以下の課題に直面します:
- 限られたデータ: 新しい資産やプロトコルではデータが少ない
- 高い不確実性: 市場の変動性が極めて高い
- モデルの信頼性: 予測の確信度を定量化する必要
- 計算コスト: ハイパーパラメータ探索が高コスト
ベイズアプローチはこれらの課題に対して以下の利点を提供します:
- 不確実性の定量化: 点推定ではなく分布として予測
- 効率的な最適化: ベイズ最適化による効率的なパラメータ探索
- 事前知識の活用: ドメイン知識を事前分布として組み込み
- 適応的学習: 新しいデータで信念を更新
1.2 本ドキュメントの範囲
- ベイズ最適化によるハイパーパラメータチューニング
- ベイズニューラルネットワークによる不確実性定量化
- ガウス過程による価格予測
- 変分推論とMCMCの実装
- アンサンブル手法との統合
2. ベイズ最適化
2.1 基本理論と実装
import numpy as np
import torch
import torch.nn as nn
from scipy.stats import norm
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern, RBF, WhiteKernel
from typing import Dict, List, Tuple, Callable
import optuna
from dataclasses import dataclass
@dataclass
class OptimizationResult:
best_params: Dict
best_value: float
optimization_history: List[Dict]
surrogate_model: Any
uncertainty_estimates: List[float]
class BayesianOptimizer:
"""暗号資産モデルのベイズ最適化"""
def __init__(self,
objective_function: Callable,
search_space: Dict,
n_initial_points: int = 10,
acquisition_function: str = 'ei'):
self.objective_function = objective_function
self.search_space = search_space
self.n_initial_points = n_initial_points
self.acquisition_function = acquisition_function
# ガウス過程の初期化
kernel = Matern(length_scale=1.0, nu=2.5) + WhiteKernel(noise_level=1e-5)
self.gp = GaussianProcessRegressor(
kernel=kernel,
alpha=1e-6,
normalize_y=True,
n_restarts_optimizer=10
)
# 観測データ
self.X_observed = []
self.y_observed = []
def optimize(self, n_iterations: int = 100) -> OptimizationResult:
"""最適化の実行"""
# 初期サンプリング
self._initial_sampling()
# ベイズ最適化ループ
for iteration in range(n_iterations - self.n_initial_points):
# 次の評価点を選択
next_point = self._select_next_point()
# 目的関数を評価
result = self.objective_function(next_point)
# 観測データに追加
self.X_observed.append(self._params_to_vector(next_point))
self.y_observed.append(result)
# サロゲートモデルを更新
self._update_surrogate()
# ログ
print(f"Iteration {iteration + self.n_initial_points}: "
f"Value = {result:.4f}, Best = {min(self.y_observed):.4f}")
# 結果をまとめる
best_idx = np.argmin(self.y_observed)
best_params = self._vector_to_params(self.X_observed[best_idx])
return OptimizationResult(
best_params=best_params,
best_value=self.y_observed[best_idx],
optimization_history=self._create_history(),
surrogate_model=self.gp,
uncertainty_estimates=self._estimate_uncertainties()
)
def _initial_sampling(self):
"""初期点のランダムサンプリング"""
for _ in range(self.n_initial_points):
# ランダムにパラメータを生成
params = {}
for param_name, param_config in self.search_space.items():
if param_config['type'] == 'float':
value = np.random.uniform(
param_config['low'],
param_config['high']
)
elif param_config['type'] == 'int':
value = np.random.randint(
param_config['low'],
param_config['high'] + 1
)
elif param_config['type'] == 'categorical':
value = np.random.choice(param_config['choices'])
params[param_name] = value
# 評価
result = self.objective_function(params)
self.X_observed.append(self._params_to_vector(params))
self.y_observed.append(result)
def _select_next_point(self):
"""獲得関数に基づく次の評価点の選択"""
# 候補点の生成
n_candidates = 10000
candidates = self._generate_candidates(n_candidates)
# 獲得関数の値を計算
acquisition_values = []
for candidate in candidates:
if self.acquisition_function == 'ei':
acq_value = self._expected_improvement(candidate)
elif self.acquisition_function == 'ucb':
acq_value = self._upper_confidence_bound(candidate)
elif self.acquisition_function == 'pi':
acq_value = self._probability_of_improvement(candidate)
acquisition_values.append(acq_value)
# 最大の獲得関数値を持つ点を選択
best_idx = np.argmax(acquisition_values)
return self._vector_to_params(candidates[best_idx])
def _expected_improvement(self, x):
"""期待改善(Expected Improvement)"""
x = np.array(x).reshape(1, -1)
# 現在の最良値
y_best = np.min(self.y_observed)
# 予測
mu, sigma = self.gp.predict(x, return_std=True)
mu = mu[0]
sigma = sigma[0]
# EIの計算
if sigma > 0:
z = (y_best - mu) / sigma
ei = sigma * (z * norm.cdf(z) + norm.pdf(z))
else:
ei = 0.0
return ei
def _upper_confidence_bound(self, x, kappa=2.0):
"""上側信頼限界(Upper Confidence Bound)"""
x = np.array(x).reshape(1, -1)
mu, sigma = self.gp.predict(x, return_std=True)
return -(mu[0] - kappa * sigma[0]) # 最小化問題なので符号反転
def _update_surrogate(self):
"""サロゲートモデルの更新"""
X = np.array(self.X_observed)
y = np.array(self.y_observed)
self.gp.fit(X, y)
2.2 暗号資産モデル専用のベイズ最適化
class CryptoModelBayesianOptimizer:
"""暗号資産予測モデル専用のベイズ最適化"""
def __init__(self, model_class, train_data, val_data):
self.model_class = model_class
self.train_data = train_data
self.val_data = val_data
# 探索空間の定義
self.search_space = {
'learning_rate': {'type': 'float', 'low': 1e-5, 'high': 1e-2, 'log': True},
'batch_size': {'type': 'int', 'low': 16, 'high': 256},
'hidden_size': {'type': 'int', 'low': 32, 'high': 512},
'num_layers': {'type': 'int', 'low': 1, 'high': 5},
'dropout': {'type': 'float', 'low': 0.0, 'high': 0.5},
'weight_decay': {'type': 'float', 'low': 1e-6, 'high': 1e-2, 'log': True},
'optimizer': {'type': 'categorical', 'choices': ['adam', 'sgd', 'rmsprop']},
'activation': {'type': 'categorical', 'choices': ['relu', 'gelu', 'tanh']}
}
def objective_function(self, params):
"""目的関数:検証損失を最小化"""
# モデルの作成
model = self.model_class(
hidden_size=params['hidden_size'],
num_layers=params['num_layers'],
dropout=params['dropout'],
activation=params['activation']
)
# 最適化器の設定
if params['optimizer'] == 'adam':
optimizer = torch.optim.Adam(
model.parameters(),
lr=params['learning_rate'],
weight_decay=params['weight_decay']
)
elif params['optimizer'] == 'sgd':
optimizer = torch.optim.SGD(
model.parameters(),
lr=params['learning_rate'],
weight_decay=params['weight_decay'],
momentum=0.9
)
else:
optimizer = torch.optim.RMSprop(
model.parameters(),
lr=params['learning_rate'],
weight_decay=params['weight_decay']
)
# 訓練
train_loader = DataLoader(
self.train_data,
batch_size=params['batch_size'],
shuffle=True
)
# 簡易訓練(早期終了あり)
best_val_loss = float('inf')
patience = 10
patience_counter = 0
for epoch in range(50): # 最大50エポック
# 訓練
model.train()
for batch in train_loader:
optimizer.zero_grad()
loss = model.compute_loss(batch)
loss.backward()
optimizer.step()
# 検証
val_loss = self._evaluate(model, self.val_data)
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
else:
patience_counter += 1
if patience_counter >= patience:
break
return best_val_loss
def optimize_with_constraints(self, n_trials=100, constraints=None):
"""制約付き最適化"""
def objective_with_constraints(trial):
# パラメータのサンプリング
params = {
'learning_rate': trial.suggest_loguniform('learning_rate', 1e-5, 1e-2),
'batch_size': trial.suggest_int('batch_size', 16, 256, step=16),
'hidden_size': trial.suggest_int('hidden_size', 32, 512, step=32),
'num_layers': trial.suggest_int('num_layers', 1, 5),
'dropout': trial.suggest_float('dropout', 0.0, 0.5),
'weight_decay': trial.suggest_loguniform('weight_decay', 1e-6, 1e-2),
'optimizer': trial.suggest_categorical('optimizer', ['adam', 'sgd', 'rmsprop']),
'activation': trial.suggest_categorical('activation', ['relu', 'gelu', 'tanh'])
}
# 制約のチェック
if constraints:
# メモリ制約
if 'max_memory_mb' in constraints:
estimated_memory = self._estimate_memory_usage(params)
if estimated_memory > constraints['max_memory_mb']:
return float('inf')
# 推論時間制約
if 'max_inference_ms' in constraints:
inference_time = self._estimate_inference_time(params)
if inference_time > constraints['max_inference_ms']:
return float('inf')
return self.objective_function(params)
# Optunaによる最適化
study = optuna.create_study(
direction='minimize',
sampler=optuna.samplers.TPESampler(seed=42)
)
study.optimize(objective_with_constraints, n_trials=n_trials)
return study.best_params, study.best_value
2.3 多目的ベイズ最適化
class MultiObjectiveBayesianOptimizer:
"""多目的ベイズ最適化(精度と効率のトレードオフ)"""
def __init__(self, objectives: List[Callable], weights: List[float] = None):
self.objectives = objectives
self.weights = weights or [1.0] * len(objectives)
self.pareto_front = []
def optimize(self, search_space, n_iterations=100):
"""パレート最適解の探索"""
def multi_objective(trial):
# パラメータのサンプリング
params = self._sample_params(trial, search_space)
# 各目的関数の評価
objective_values = []
for obj_func in self.objectives:
value = obj_func(params)
objective_values.append(value)
# パレートフロントの更新
self._update_pareto_front(params, objective_values)
# 重み付き和(スカラー化)
weighted_sum = sum(w * v for w, v in zip(self.weights, objective_values))
return weighted_sum
# 最適化の実行
study = optuna.create_study(
direction='minimize',
sampler=optuna.samplers.NSGAIISampler() # 多目的最適化用サンプラー
)
study.optimize(multi_objective, n_trials=n_iterations)
return self.pareto_front
def _update_pareto_front(self, params, objectives):
"""パレートフロントの更新"""
# 新しい解が支配されているかチェック
dominated = False
for solution in self.pareto_front:
if all(solution['objectives'][i] <= objectives[i]
for i in range(len(objectives))):
dominated = True
break
if not dominated:
# 支配される解を削除
self.pareto_front = [
solution for solution in self.pareto_front
if not all(objectives[i] <= solution['objectives'][i]
for i in range(len(objectives)))
]
# 新しい解を追加
self.pareto_front.append({
'params': params,
'objectives': objectives
})
3. ベイズニューラルネットワーク
3.1 変分推論によるBNN
class BayesianLinear(nn.Module):
"""ベイズ線形層"""
def __init__(self, in_features, out_features, prior_var=1.0):
super(BayesianLinear, self).__init__()
self.in_features = in_features
self.out_features = out_features
# 重みの事前分布
self.prior_var = prior_var
# 変分パラメータ
self.weight_mu = nn.Parameter(torch.Tensor(out_features, in_features))
self.weight_rho = nn.Parameter(torch.Tensor(out_features, in_features))
self.bias_mu = nn.Parameter(torch.Tensor(out_features))
self.bias_rho = nn.Parameter(torch.Tensor(out_features))
self.reset_parameters()
def reset_parameters(self):
"""パラメータの初期化"""
nn.init.kaiming_uniform_(self.weight_mu, a=np.sqrt(5))
nn.init.constant_(self.weight_rho, -3)
nn.init.constant_(self.bias_mu, 0)
nn.init.constant_(self.bias_rho, -3)
def forward(self, x):
"""順伝播(再パラメータ化トリック)"""
# 標準偏差の計算
weight_sigma = torch.log1p(torch.exp(self.weight_rho))
bias_sigma = torch.log1p(torch.exp(self.bias_rho))
# サンプリング
weight_epsilon = torch.randn_like(self.weight_mu)
bias_epsilon = torch.randn_like(self.bias_mu)
weight = self.weight_mu + weight_sigma * weight_epsilon
bias = self.bias_mu + bias_sigma * bias_epsilon
return F.linear(x, weight, bias)
def kl_divergence(self):
"""KLダイバージェンスの計算"""
weight_sigma = torch.log1p(torch.exp(self.weight_rho))
bias_sigma = torch.log1p(torch.exp(self.bias_rho))
# KL(q||p) for weights
kl_weight = 0.5 * torch.sum(
weight_sigma**2 / self.prior_var +
self.weight_mu**2 / self.prior_var -
1 -
2 * torch.log(weight_sigma) +
np.log(self.prior_var)
)
# KL(q||p) for bias
kl_bias = 0.5 * torch.sum(
bias_sigma**2 / self.prior_var +
self.bias_mu**2 / self.prior_var -
1 -
2 * torch.log(bias_sigma) +
np.log(self.prior_var)
)
return kl_weight + kl_bias
class BayesianCryptoPredictor(nn.Module):
"""ベイズニューラルネットワークによる暗号資産予測"""
def __init__(self, input_size, hidden_sizes=[128, 64], output_size=1):
super(BayesianCryptoPredictor, self).__init__()
layers = []
prev_size = input_size
for hidden_size in hidden_sizes:
layers.append(BayesianLinear(prev_size, hidden_size))
layers.append(nn.ReLU())
layers.append(nn.Dropout(0.2))
prev_size = hidden_size
layers.append(BayesianLinear(prev_size, output_size))
self.network = nn.Sequential(*layers)
def forward(self, x, n_samples=1):
"""予測(複数サンプル)"""
if n_samples == 1:
return self.network(x)
else:
# 複数回サンプリングして不確実性を推定
outputs = []
for _ in range(n_samples):
outputs.append(self.network(x))
outputs = torch.stack(outputs)
mean = outputs.mean(dim=0)
std = outputs.std(dim=0)
return mean, std
def kl_divergence(self):
"""モデル全体のKLダイバージェンス"""
kl = 0
for module in self.modules():
if isinstance(module, BayesianLinear):
kl += module.kl_divergence()
return kl
def elbo_loss(self, predictions, targets, kl_weight=1.0):
"""Evidence Lower Bound (ELBO) 損失"""
# 尤度項(負の対数尤度)
nll = F.mse_loss(predictions, targets, reduction='sum')
# KL項
kl = self.kl_divergence()
# ELBO = -NLL - KL
return nll + kl_weight * kl
3.2 MCドロップアウトによる不確実性推定
class MCDropoutPredictor(nn.Module):
"""MCドロップアウトによる不確実性推定"""
def __init__(self, input_size, hidden_sizes=[256, 128], dropout_rate=0.2):
super(MCDropoutPredictor, self).__init__()
layers = []
prev_size = input_size
for hidden_size in hidden_sizes:
layers.extend([
nn.Linear(prev_size, hidden_size),
nn.ReLU(),
nn.Dropout(dropout_rate) # 推論時もドロップアウトを有効に
])
prev_size = hidden_size
layers.append(nn.Linear(prev_size, 1))
self.network = nn.Sequential(*layers)
self.dropout_rate = dropout_rate
def forward(self, x):
return self.network(x)
def predict_with_uncertainty(self, x, n_forward_passes=100):
"""不確実性付き予測"""
# 訓練モードにしてドロップアウトを有効化
self.train()
predictions = []
with torch.no_grad():
for _ in range(n_forward_passes):
pred = self.forward(x)
predictions.append(pred)
predictions = torch.stack(predictions)
# 平均と標準偏差
mean = predictions.mean(dim=0)
std = predictions.std(dim=0)
# 予測区間
lower = torch.quantile(predictions, 0.025, dim=0)
upper = torch.quantile(predictions, 0.975, dim=0)
return {
'mean': mean,
'std': std,
'lower_95': lower,
'upper_95': upper,
'epistemic_uncertainty': std.mean().item()
}
3.3 ガウス過程による時系列予測
class GaussianProcessCrypto:
"""ガウス過程による暗号資産価格予測"""
def __init__(self, kernel_type='matern'):
# カーネルの選択
if kernel_type == 'matern':
self.kernel = Matern(length_scale=1.0, nu=2.5)
elif kernel_type == 'rbf':
self.kernel = RBF(length_scale=1.0)
elif kernel_type == 'periodic':
from sklearn.gaussian_process.kernels import ExpSineSquared
self.kernel = ExpSineSquared(length_scale=1.0, periodicity=1.0)
# ノイズ項を追加
self.kernel += WhiteKernel(noise_level=1e-5)
self.gp = GaussianProcessRegressor(
kernel=self.kernel,
alpha=1e-6,
normalize_y=True,
n_restarts_optimizer=10
)
self.scaler_X = StandardScaler()
self.scaler_y = StandardScaler()
def fit(self, X, y):
"""モデルの学習"""
# データの正規化
X_scaled = self.scaler_X.fit_transform(X)
y_scaled = self.scaler_y.fit_transform(y.reshape(-1, 1)).ravel()
# ガウス過程の学習
self.gp.fit(X_scaled, y_scaled)
# 最適化されたカーネルパラメータ
print(f"Optimized kernel: {self.gp.kernel_}")
def predict(self, X, return_std=True):
"""予測と不確実性の推定"""
X_scaled = self.scaler_X.transform(X)
if return_std:
y_mean_scaled, y_std_scaled = self.gp.predict(X_scaled, return_std=True)
# 元のスケールに戻す
y_mean = self.scaler_y.inverse_transform(
y_mean_scaled.reshape(-1, 1)
).ravel()
# 標準偏差のスケール調整
y_std = y_std_scaled * self.scaler_y.scale_[0]
return y_mean, y_std
else:
y_mean_scaled = self.gp.predict(X_scaled, return_std=False)
y_mean = self.scaler_y.inverse_transform(
y_mean_scaled.reshape(-1, 1)
).ravel()
return y_mean
def plot_predictions(self, X_train, y_train, X_test, y_test):
"""予測結果の可視化"""
import matplotlib.pyplot as plt
# 予測
y_pred, y_std = self.predict(X_test, return_std=True)
# 信頼区間
confidence_interval = 1.96 # 95%信頼区間
plt.figure(figsize=(12, 6))
# 訓練データ
plt.scatter(X_train[:, 0], y_train, c='black', s=10, label='Training data')
# 予測
plt.plot(X_test[:, 0], y_pred, 'b-', label='GP mean')
plt.fill_between(
X_test[:, 0],
y_pred - confidence_interval * y_std,
y_pred + confidence_interval * y_std,
alpha=0.2,
color='blue',
label='95% confidence interval'
)
# 実際の値
plt.scatter(X_test[:, 0], y_test, c='red', s=10, label='Actual')
plt.xlabel('Time')
plt.ylabel('Price')
plt.title('Gaussian Process Prediction with Uncertainty')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
4. 変分推論とMCMC
4.1 変分オートエンコーダーによる特徴学習
class VariationalEncoder(nn.Module):
"""変分エンコーダー"""
def __init__(self, input_dim, hidden_dim, latent_dim):
super(VariationalEncoder, self).__init__()
self.fc1 = nn.Linear(input_dim, hidden_dim)
self.fc2_mu = nn.Linear(hidden_dim, latent_dim)
self.fc2_logvar = nn.Linear(hidden_dim, latent_dim)
def forward(self, x):
h = F.relu(self.fc1(x))
mu = self.fc2_mu(h)
logvar = self.fc2_logvar(h)
return mu, logvar
class VariationalDecoder(nn.Module):
"""変分デコーダー"""
def __init__(self, latent_dim, hidden_dim, output_dim):
super(VariationalDecoder, self).__init__()
self.fc1 = nn.Linear(latent_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, output_dim)
def forward(self, z):
h = F.relu(self.fc1(z))
return self.fc2(h)
class CryptoVAE(nn.Module):
"""暗号資産データ用の変分オートエンコーダー"""
def __init__(self, input_dim, hidden_dim=128, latent_dim=32):
super(CryptoVAE, self).__init__()
self.encoder = VariationalEncoder(input_dim, hidden_dim, latent_dim)
self.decoder = VariationalDecoder(latent_dim, hidden_dim, input_dim)
def reparameterize(self, mu, logvar):
"""再パラメータ化トリック"""
std = torch.exp(0.5 * logvar)
eps = torch.randn_like(std)
return mu + eps * std
def forward(self, x):
mu, logvar = self.encoder(x)
z = self.reparameterize(mu, logvar)
reconstruction = self.decoder(z)
return reconstruction, mu, logvar
def loss_function(self, reconstruction, x, mu, logvar, beta=1.0):
"""VAE損失関数"""
# 再構成誤差
reconstruction_loss = F.mse_loss(reconstruction, x, reduction='sum')
# KLダイバージェンス
kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
# β-VAE損失
return reconstruction_loss + beta * kl_divergence
def generate_features(self, x, n_samples=100):
"""潜在特徴の生成と不確実性推定"""
self.eval()
with torch.no_grad():
# 複数回サンプリング
latent_samples = []
for _ in range(n_samples):
mu, logvar = self.encoder(x)
z = self.reparameterize(mu, logvar)
latent_samples.append(z)
latent_samples = torch.stack(latent_samples)
# 統計量
mean_latent = latent_samples.mean(dim=0)
std_latent = latent_samples.std(dim=0)
return mean_latent, std_latent
4.2 MCMCによるパラメータ推定
import pymc3 as pm
import theano.tensor as tt
class BayesianTimeSeriesModel:
"""MCMCによるベイズ時系列モデル"""
def __init__(self):
self.model = None
self.trace = None
def build_model(self, data):
"""階層ベイズモデルの構築"""
with pm.Model() as model:
# ハイパープライア
mu_alpha = pm.Normal('mu_alpha', mu=0, sigma=1)
sigma_alpha = pm.HalfNormal('sigma_alpha', sigma=1)
# 時変パラメータ
alpha = pm.Normal('alpha', mu=mu_alpha, sigma=sigma_alpha,
shape=len(data))
# トレンド成分
trend = pm.GaussianRandomWalk('trend', sigma=0.1,
shape=len(data))
# 季節性成分(週次)
seasonal_period = 7
seasonal = pm.Normal('seasonal', mu=0, sigma=1,
shape=seasonal_period)
# 季節性の繰り返し
seasonal_full = tt.tile(seasonal, len(data) // seasonal_period + 1)[:len(data)]
# ボラティリティ(GARCH風)
sigma = pm.Exponential('sigma', lam=1)
# 観測モデル
mu = alpha + trend + seasonal_full
# 尤度
observations = pm.Normal('obs', mu=mu, sigma=sigma,
observed=data)
self.model = model
return model
def fit(self, data, n_samples=2000, n_chains=2):
"""MCMCによるパラメータ推定"""
if self.model is None:
self.build_model(data)
with self.model:
# NUTS(No-U-Turn Sampler)
self.trace = pm.sample(
n_samples,
chains=n_chains,
tune=1000,
target_accept=0.9,
return_inferencedata=True
)
return self.trace
def predict(self, n_periods=30):
"""将来予測"""
with self.model:
# 事後予測サンプリング
posterior_predictive = pm.sample_posterior_predictive(
self.trace,
samples=1000,
var_names=['obs']
)
# 予測の統計量
predictions = posterior_predictive['obs']
mean_pred = predictions.mean(axis=0)
std_pred = predictions.std(axis=0)
# 予測区間
lower_95 = np.percentile(predictions, 2.5, axis=0)
upper_95 = np.percentile(predictions, 97.5, axis=0)
return {
'mean': mean_pred,
'std': std_pred,
'lower_95': lower_95,
'upper_95': upper_95
}
def plot_components(self):
"""成分分解のプロット"""
import matplotlib.pyplot as plt
fig, axes = plt.subplots(3, 1, figsize=(12, 10))
# トレンド
trend_samples = self.trace.posterior['trend'].values
trend_mean = trend_samples.mean(axis=(0, 1))
trend_std = trend_samples.std(axis=(0, 1))
axes[0].plot(trend_mean, label='Trend')
axes[0].fill_between(
range(len(trend_mean)),
trend_mean - 2*trend_std,
trend_mean + 2*trend_std,
alpha=0.3
)
axes[0].set_title('Trend Component')
axes[0].legend()
# 季節性
seasonal_samples = self.trace.posterior['seasonal'].values
seasonal_mean = seasonal_samples.mean(axis=(0, 1))
axes[1].plot(seasonal_mean, 'o-')
axes[1].set_title('Seasonal Component (Weekly)')
axes[1].set_xlabel('Day of Week')
# ボラティリティ
sigma_samples = self.trace.posterior['sigma'].values.flatten()
axes[2].hist(sigma_samples, bins=50, alpha=0.7)
axes[2].axvline(sigma_samples.mean(), color='red',
linestyle='--', label=f'Mean: {sigma_samples.mean():.3f}')
axes[2].set_title('Volatility Distribution')
axes[2].legend()
plt.tight_layout()
plt.show()
5. アンサンブル不確実性推定
5.1 ディープアンサンブル
class DeepEnsemble:
"""ディープアンサンブルによる不確実性推定"""
def __init__(self, model_class, n_models=5):
self.model_class = model_class
self.n_models = n_models
self.models = []
def train(self, train_data, val_data, **kwargs):
"""アンサンブルメンバーの訓練"""
for i in range(self.n_models):
print(f"Training model {i+1}/{self.n_models}")
# 異なる初期化とデータシャッフル
model = self.model_class(**kwargs)
# 異なる乱数シードでデータをシャッフル
train_loader = DataLoader(
train_data,
batch_size=32,
shuffle=True,
generator=torch.Generator().manual_seed(i)
)
# 訓練
self._train_single_model(model, train_loader, val_data)
self.models.append(model)
def predict_with_uncertainty(self, x):
"""アンサンブル予測と不確実性"""
predictions = []
for model in self.models:
model.eval()
with torch.no_grad():
pred = model(x)
predictions.append(pred)
predictions = torch.stack(predictions)
# 予測統計
mean = predictions.mean(dim=0)
# 認識的不確実性(モデル間の分散)
epistemic_uncertainty = predictions.var(dim=0)
# 偶然的不確実性(各モデルの予測分散の平均)
if hasattr(self.models[0], 'predict_with_variance'):
aleatoric_uncertainties = []
for model in self.models:
_, var = model.predict_with_variance(x)
aleatoric_uncertainties.append(var)
aleatoric_uncertainty = torch.stack(aleatoric_uncertainties).mean(dim=0)
# 全体の不確実性
total_uncertainty = epistemic_uncertainty + aleatoric_uncertainty
else:
total_uncertainty = epistemic_uncertainty
aleatoric_uncertainty = None
return {
'mean': mean,
'epistemic_uncertainty': epistemic_uncertainty,
'aleatoric_uncertainty': aleatoric_uncertainty,
'total_uncertainty': total_uncertainty,
'individual_predictions': predictions
}
5.2 ベイズモデル平均
class BayesianModelAveraging:
"""ベイズモデル平均による予測統合"""
def __init__(self, models, prior_weights=None):
self.models = models
self.prior_weights = prior_weights or np.ones(len(models)) / len(models)
self.posterior_weights = None
def update_weights(self, validation_data):
"""検証データに基づく事後重みの更新"""
log_likelihoods = []
for model in self.models:
# 各モデルの対数尤度を計算
ll = self._compute_log_likelihood(model, validation_data)
log_likelihoods.append(ll)
log_likelihoods = np.array(log_likelihoods)
# ベイズの定理による重みの更新
log_posterior = np.log(self.prior_weights) + log_likelihoods
log_posterior -= np.max(log_posterior) # 数値安定性
posterior_weights = np.exp(log_posterior)
self.posterior_weights = posterior_weights / posterior_weights.sum()
print("Model weights:", self.posterior_weights)
def predict(self, x):
"""重み付き予測"""
predictions = []
uncertainties = []
for model, weight in zip(self.models, self.posterior_weights):
if hasattr(model, 'predict_with_uncertainty'):
pred_dict = model.predict_with_uncertainty(x)
pred = pred_dict['mean']
unc = pred_dict.get('total_uncertainty',
pred_dict.get('std', torch.zeros_like(pred)))
else:
pred = model(x)
unc = torch.zeros_like(pred)
predictions.append(weight * pred)
uncertainties.append(weight * unc)
# 加重平均
mean_prediction = sum(predictions)
# 不確実性の統合
# モデル間の分散も考慮
model_variance = sum(
weight * (pred - mean_prediction)**2
for pred, weight in zip(predictions, self.posterior_weights)
)
total_uncertainty = sum(uncertainties) + model_variance
return {
'prediction': mean_prediction,
'uncertainty': total_uncertainty,
'model_weights': self.posterior_weights
}
6. 実践的な統合システム
6.1 ベイズ最適化と不確実性定量化の統合
class BayesianCryptoTradingSystem:
"""ベイズ手法を統合した暗号資産取引システム"""
def __init__(self, config):
self.config = config
# ベイズ最適化器
self.optimizer = CryptoModelBayesianOptimizer(
model_class=BayesianCryptoPredictor,
train_data=config['train_data'],
val_data=config['val_data']
)
# アンサンブルモデル
self.ensemble = None
# ガウス過程(短期予測用)
self.gp_short_term = GaussianProcessCrypto(kernel_type='matern')
# ベイズ時系列モデル(長期トレンド)
self.bayesian_ts = BayesianTimeSeriesModel()
def optimize_and_train(self):
"""モデルの最適化と訓練"""
# 1. ベイズ最適化でハイパーパラメータ探索
print("Optimizing hyperparameters...")
best_params, best_value = self.optimizer.optimize_with_constraints(
n_trials=50,
constraints={
'max_memory_mb': 1024,
'max_inference_ms': 10
}
)
print(f"Best parameters: {best_params}")
print(f"Best validation loss: {best_value}")
# 2. 最適パラメータでアンサンブル訓練
print("Training ensemble...")
self.ensemble = DeepEnsemble(
model_class=BayesianCryptoPredictor,
n_models=5
)
self.ensemble.train(
self.config['train_data'],
self.config['val_data'],
**best_params
)
# 3. ガウス過程の訓練(短期予測)
print("Training Gaussian Process...")
X_recent = self.config['recent_features']
y_recent = self.config['recent_prices']
self.gp_short_term.fit(X_recent, y_recent)
# 4. ベイズ時系列モデルの訓練(長期トレンド)
print("Training Bayesian time series model...")
self.bayesian_ts.fit(self.config['price_history'])
def predict_with_full_uncertainty(self, features, horizon='short'):
"""完全な不確実性を考慮した予測"""
results = {}
if horizon == 'short':
# 短期予測:アンサンブル + GP
# アンサンブル予測
ensemble_pred = self.ensemble.predict_with_uncertainty(features)
# ガウス過程予測
gp_mean, gp_std = self.gp_short_term.predict(
features.numpy(),
return_std=True
)
# 予測の統合
combined_mean = 0.7 * ensemble_pred['mean'] + 0.3 * torch.tensor(gp_mean)
# 不確実性の統合
ensemble_unc = ensemble_pred['total_uncertainty']
gp_unc = torch.tensor(gp_std**2)
combined_uncertainty = torch.sqrt(
0.7**2 * ensemble_unc + 0.3**2 * gp_unc
)
results['prediction'] = combined_mean
results['uncertainty'] = combined_uncertainty
results['confidence_interval'] = self._compute_confidence_interval(
combined_mean, combined_uncertainty
)
elif horizon == 'long':
# 長期予測:ベイズ時系列モデル
ts_pred = self.bayesian_ts.predict(n_periods=30)
results['prediction'] = ts_pred['mean']
results['uncertainty'] = ts_pred['std']
results['confidence_interval'] = (ts_pred['lower_95'], ts_pred['upper_95'])
# リスク指標の計算
results['risk_metrics'] = self._compute_risk_metrics(results)
return results
def _compute_risk_metrics(self, predictions):
"""予測に基づくリスク指標"""
uncertainty = predictions['uncertainty']
# 変動係数(Coefficient of Variation)
cv = uncertainty / torch.abs(predictions['prediction'])
# 予測の信頼度スコア
confidence_score = 1 / (1 + cv)
# VaR(Value at Risk)
var_95 = predictions['prediction'] - 1.96 * uncertainty
# CVaR(Conditional Value at Risk)
# 正規分布を仮定
from scipy.stats import norm
alpha = 0.05
phi = norm.pdf(norm.ppf(alpha))
cvar = predictions['prediction'] - uncertainty * phi / alpha
return {
'coefficient_of_variation': cv,
'confidence_score': confidence_score,
'var_95': var_95,
'cvar_95': cvar,
'uncertainty_percentile': self._uncertainty_to_percentile(uncertainty)
}
def _uncertainty_to_percentile(self, uncertainty):
"""不確実性を市場のパーセンタイルに変換"""
# 過去の不確実性分布との比較
historical_uncertainties = self.config.get('historical_uncertainties', [])
if historical_uncertainties:
percentile = np.searchsorted(
np.sort(historical_uncertainties),
uncertainty.numpy()
) / len(historical_uncertainties)
else:
percentile = 0.5 # デフォルト
return percentile
def make_trading_decision(self, current_price, features):
"""不確実性を考慮した取引判断"""
# 予測
predictions = self.predict_with_full_uncertainty(features, horizon='short')
pred_price = predictions['prediction'].item()
uncertainty = predictions['uncertainty'].item()
confidence = predictions['risk_metrics']['confidence_score'].item()
# 期待リターン
expected_return = (pred_price - current_price) / current_price
# リスク調整済みリターン
risk_adjusted_return = expected_return / (1 + uncertainty / current_price)
# 取引シグナル
signal = {
'action': 'hold',
'confidence': confidence,
'size': 0,
'reason': ''
}
# 閾値ベースの判断
if confidence > self.config['min_confidence']:
if risk_adjusted_return > self.config['buy_threshold']:
signal['action'] = 'buy'
signal['size'] = self._calculate_position_size(confidence, uncertainty)
signal['reason'] = f'Expected return: {expected_return:.2%}, Risk-adjusted: {risk_adjusted_return:.2%}'
elif risk_adjusted_return < -self.config['sell_threshold']:
signal['action'] = 'sell'
signal['size'] = self._calculate_position_size(confidence, uncertainty)
signal['reason'] = f'Expected return: {expected_return:.2%}, Risk-adjusted: {risk_adjusted_return:.2%}'
else:
signal['reason'] = f'Low confidence: {confidence:.2f}'
return signal
def _calculate_position_size(self, confidence, uncertainty):
"""信頼度と不確実性に基づくポジションサイズ"""
# Kelly基準の修正版
base_size = self.config['base_position_size']
# 信頼度による調整
confidence_factor = confidence ** 2 # 非線形スケーリング
# 不確実性による調整
uncertainty_factor = 1 / (1 + uncertainty)
# 最終的なポジションサイズ
position_size = base_size * confidence_factor * uncertainty_factor
# 上限設定
max_size = self.config['max_position_size']
position_size = min(position_size, max_size)
return position_size
6.2 バックテストと評価
class BayesianBacktester:
"""ベイズモデルのバックテスト"""
def __init__(self, model_system, initial_capital=10000):
self.model_system = model_system
self.initial_capital = initial_capital
def backtest(self, test_data, start_date, end_date):
"""不確実性を考慮したバックテスト"""
results = {
'dates': [],
'prices': [],
'predictions': [],
'uncertainties': [],
'positions': [],
'portfolio_values': [],
'trades': []
}
capital = self.initial_capital
position = 0
for date, data in test_data.iterate_by_date(start_date, end_date):
current_price = data['price']
features = data['features']
# 予測と取引判断
pred_result = self.model_system.predict_with_full_uncertainty(
features, horizon='short'
)
signal = self.model_system.make_trading_decision(
current_price, features
)
# 取引実行
if signal['action'] == 'buy' and position <= 0:
# 買い
shares = (capital * signal['size']) / current_price
position = shares
capital -= shares * current_price * (1 + 0.001) # 手数料
results['trades'].append({
'date': date,
'action': 'buy',
'price': current_price,
'shares': shares,
'confidence': signal['confidence'],
'uncertainty': pred_result['uncertainty'].item()
})
elif signal['action'] == 'sell' and position > 0:
# 売り
capital += position * current_price * (1 - 0.001) # 手数料
results['trades'].append({
'date': date,
'action': 'sell',
'price': current_price,
'shares': position,
'confidence': signal['confidence'],
'uncertainty': pred_result['uncertainty'].item()
})
position = 0
# ポートフォリオ価値
portfolio_value = capital + position * current_price
# 結果の記録
results['dates'].append(date)
results['prices'].append(current_price)
results['predictions'].append(pred_result['prediction'].item())
results['uncertainties'].append(pred_result['uncertainty'].item())
results['positions'].append(position)
results['portfolio_values'].append(portfolio_value)
# パフォーマンス分析
performance = self._analyze_performance(results)
return results, performance
def _analyze_performance(self, results):
"""パフォーマンス分析"""
portfolio_values = np.array(results['portfolio_values'])
# 基本指標
total_return = (portfolio_values[-1] - portfolio_values[0]) / portfolio_values[0]
# リターン系列
returns = np.diff(portfolio_values) / portfolio_values[:-1]
# シャープレシオ
sharpe_ratio = np.mean(returns) / np.std(returns) * np.sqrt(252)
# 最大ドローダウン
cumulative = np.cumprod(1 + returns)
running_max = np.maximum.accumulate(cumulative)
drawdown = (cumulative - running_max) / running_max
max_drawdown = np.min(drawdown)
# 不確実性調整済み指標
uncertainties = np.array(results['uncertainties'])
uncertainty_weighted_returns = returns[1:] / (1 + uncertainties[:-1])
uncertainty_adjusted_sharpe = (
np.mean(uncertainty_weighted_returns) /
np.std(uncertainty_weighted_returns) * np.sqrt(252)
)
# 取引統計
trades = results['trades']
n_trades = len(trades)
win_trades = [t for t in trades if t['action'] == 'sell'
and t['price'] > prev_buy_price] # 簡略化
win_rate = len(win_trades) / n_trades if n_trades > 0 else 0
# 信頼度と精度の相関
if trades:
confidences = [t['confidence'] for t in trades]
# 実際の利益/損失を計算(簡略化)
confidence_accuracy_corr = np.corrcoef(confidences,
[1 if t in win_trades else 0
for t in trades])[0, 1]
else:
confidence_accuracy_corr = 0
return {
'total_return': total_return,
'sharpe_ratio': sharpe_ratio,
'max_drawdown': max_drawdown,
'uncertainty_adjusted_sharpe': uncertainty_adjusted_sharpe,
'n_trades': n_trades,
'win_rate': win_rate,
'confidence_accuracy_correlation': confidence_accuracy_corr,
'avg_uncertainty': np.mean(uncertainties),
'uncertainty_trend': np.polyfit(range(len(uncertainties)),
uncertainties, 1)[0]
}
7. まとめとベストプラクティス
7.1 実装ガイドライン
# ベイズ手法実装のチェックリスト
BAYESIAN_IMPLEMENTATION_CHECKLIST = {
"ベイズ最適化": [
"適切な獲得関数の選択",
"初期サンプル数の設定(10-20点)",
"探索と活用のバランス調整",
"制約条件の明確な定義"
],
"不確実性定量化": [
"認識的・偶然的不確実性の区別",
"適切な事前分布の選択",
"計算コストとのトレードオフ",
"キャリブレーションの確認"
],
"モデル選択": [
"データ量に応じた手法選択",
"MCドロップアウト(簡易)",
"変分推論(中規模)",
"MCMC(小規模・高精度)"
],
"運用": [
"不確実性に基づくアラート設定",
"信頼度閾値の動的調整",
"定期的な再キャリブレーション",
"異常な不確実性の検出"
]
}
7.2 注意点と推奨事項
-
計算コスト
- ベイズ手法は計算コストが高い
- リアルタイム用途では近似手法を検討
- GPUの活用が重要 -
過信の回避
- 不確実性推定も完璧ではない
- 複数の手法を組み合わせる
- 定期的な検証が必要 -
ドメイン知識の活用
- 事前分布にドメイン知識を反映
- 制約条件の適切な設定
- 異常値の適切な処理 -
継続的改善
- 新しいデータでの更新
- ハイパーパラメータの再最適化
- モデルアンサンブルの更新
ベイズアプローチは暗号資産の高い不確実性に対処する強力なツールです。適切に実装することで、より信頼性の高い予測と堅牢な取引戦略が実現できます。