ML Documentation

Advanced Cryptocurrency Forecasting with Bayesian Optimization and Model Uncertainty Quantification

1. Introduction

1.1 Why a Bayesian Approach Matters

Forecasting cryptocurrency markets runs into the following challenges:

  1. High, irreducible uncertainty: prices are volatile and heavy-tailed, so point forecasts are easily misleading
  2. Expensive model tuning: every hyperparameter evaluation requires a full train/validate cycle
  3. Limited reliable history: many assets have short or regime-dependent track records
  4. Non-stationarity: market behavior shifts, so yesterday's model can be miscalibrated today

The Bayesian approach offers the following advantages against these challenges:

  1. Uncertainty quantification: predictions are distributions rather than point estimates (a minimal sketch follows this list)
  2. Efficient optimization: Bayesian optimization makes hyperparameter search sample-efficient
  3. Use of prior knowledge: domain knowledge can be encoded as prior distributions
  4. Adaptive learning: beliefs are updated as new data arrives
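
As a minimal illustration of points 1 and 4 (predicting a distribution and updating it as data arrives), the sketch below applies a conjugate normal-normal update to a belief about the mean daily return. The prior, the observed returns, and the noise level are illustrative values only.

import numpy as np

# Prior belief about the mean daily return: N(mu0, tau0^2)  (illustrative values)
mu0, tau0 = 0.0, 0.02
# Hypothetical observed daily returns and a known observation noise sigma
returns = np.array([0.013, -0.004, 0.021, 0.008])
sigma = 0.03

# Conjugate normal-normal update of the posterior over the mean return
n = len(returns)
post_var = 1.0 / (1.0 / tau0**2 + n / sigma**2)
post_mu = post_var * (mu0 / tau0**2 + returns.sum() / sigma**2)

# The "prediction" is a distribution, not a single number
print(f"posterior mean = {post_mu:.4f}, posterior std = {np.sqrt(post_var):.4f}")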

1.2 Scope of This Document

This document covers Bayesian optimization of model hyperparameters (Section 2), Bayesian neural networks and related uncertainty-quantification techniques such as MC dropout and Gaussian processes (Section 3), variational inference and MCMC (Section 4), ensemble-based uncertainty estimation (Section 5), an integrated trading system with uncertainty-aware backtesting (Section 6), and implementation guidelines (Section 7).

2. Bayesian Optimization

2.1 Basic Theory and Implementation

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from scipy.stats import norm
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern, RBF, WhiteKernel
from sklearn.preprocessing import StandardScaler
from typing import Any, Dict, List, Tuple, Callable
import optuna
from dataclasses import dataclass

@dataclass
class OptimizationResult:
    best_params: Dict
    best_value: float
    optimization_history: List[Dict]
    surrogate_model: Any
    uncertainty_estimates: List[float]

class BayesianOptimizer:
    """Bayesian optimization for cryptocurrency models"""

    def __init__(self, 
                 objective_function: Callable,
                 search_space: Dict,
                 n_initial_points: int = 10,
                 acquisition_function: str = 'ei'):
        self.objective_function = objective_function
        self.search_space = search_space
        self.n_initial_points = n_initial_points
        self.acquisition_function = acquisition_function

        # Initialize the Gaussian process surrogate
        kernel = Matern(length_scale=1.0, nu=2.5) + WhiteKernel(noise_level=1e-5)
        self.gp = GaussianProcessRegressor(
            kernel=kernel,
            alpha=1e-6,
            normalize_y=True,
            n_restarts_optimizer=10
        )

        # Observed data
        self.X_observed = []
        self.y_observed = []

    def optimize(self, n_iterations: int = 100) -> OptimizationResult:
        """Run the optimization"""
        # Initial sampling
        self._initial_sampling()

        # Bayesian optimization loop
        for iteration in range(n_iterations - self.n_initial_points):
            # Select the next evaluation point
            next_point = self._select_next_point()

            # Evaluate the objective function
            result = self.objective_function(next_point)

            # Add to the observed data
            self.X_observed.append(self._params_to_vector(next_point))
            self.y_observed.append(result)

            # Update the surrogate model
            self._update_surrogate()

            # Log progress
            print(f"Iteration {iteration + self.n_initial_points}: "
                  f"Value = {result:.4f}, Best = {min(self.y_observed):.4f}")

        # Collect the results
        best_idx = np.argmin(self.y_observed)
        best_params = self._vector_to_params(self.X_observed[best_idx])

        return OptimizationResult(
            best_params=best_params,
            best_value=self.y_observed[best_idx],
            optimization_history=self._create_history(),
            surrogate_model=self.gp,
            uncertainty_estimates=self._estimate_uncertainties()
        )

    def _initial_sampling(self):
        """Random sampling of the initial points"""
        for _ in range(self.n_initial_points):
            # Draw random parameter values
            params = {}
            for param_name, param_config in self.search_space.items():
                if param_config['type'] == 'float':
                    value = np.random.uniform(
                        param_config['low'], 
                        param_config['high']
                    )
                elif param_config['type'] == 'int':
                    value = np.random.randint(
                        param_config['low'], 
                        param_config['high'] + 1
                    )
                elif param_config['type'] == 'categorical':
                    value = np.random.choice(param_config['choices'])

                params[param_name] = value

            # Evaluate
            result = self.objective_function(params)
            self.X_observed.append(self._params_to_vector(params))
            self.y_observed.append(result)

    def _select_next_point(self):
        """Select the next evaluation point via the acquisition function"""
        # Generate candidate points
        n_candidates = 10000
        candidates = self._generate_candidates(n_candidates)

        # Compute acquisition values
        acquisition_values = []
        for candidate in candidates:
            if self.acquisition_function == 'ei':
                acq_value = self._expected_improvement(candidate)
            elif self.acquisition_function == 'ucb':
                acq_value = self._upper_confidence_bound(candidate)
            elif self.acquisition_function == 'pi':
                acq_value = self._probability_of_improvement(candidate)

            acquisition_values.append(acq_value)

        # Pick the candidate with the largest acquisition value
        best_idx = np.argmax(acquisition_values)
        return self._vector_to_params(candidates[best_idx])

    def _expected_improvement(self, x):
        """Expected Improvement (EI)"""
        x = np.array(x).reshape(1, -1)

        # Current best observed value
        y_best = np.min(self.y_observed)

        # Surrogate prediction
        mu, sigma = self.gp.predict(x, return_std=True)
        mu = mu[0]
        sigma = sigma[0]

        # Compute EI
        if sigma > 0:
            z = (y_best - mu) / sigma
            ei = sigma * (z * norm.cdf(z) + norm.pdf(z))
        else:
            ei = 0.0

        return ei

    def _upper_confidence_bound(self, x, kappa=2.0):
        """Confidence-bound acquisition (lower bound, since we minimize)"""
        x = np.array(x).reshape(1, -1)
        mu, sigma = self.gp.predict(x, return_std=True)
        return -(mu[0] - kappa * sigma[0])  # negated so the selector can maximize it

    def _update_surrogate(self):
        """Refit the surrogate model on all observations"""
        X = np.array(self.X_observed)
        y = np.array(self.y_observed)
        self.gp.fit(X, y)

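A minimal usage sketch of BayesianOptimizer follows. The quadratic toy objective and the two-parameter search space are assumptions for illustration; in practice the objective would train and validate a model. The sketch also assumes the helper methods referenced in the class (_params_to_vector, _vector_to_params, _generate_candidates, _create_history, _estimate_uncertainties) are implemented for this search space.

# Toy objective: minimize a quadratic in two hyperparameters (illustration only)
def toy_objective(params):
    return (params['lr'] - 0.01) ** 2 + (params['dropout'] - 0.2) ** 2

search_space = {
    'lr':      {'type': 'float', 'low': 1e-4, 'high': 1e-1},
    'dropout': {'type': 'float', 'low': 0.0,  'high': 0.5},
}

optimizer = BayesianOptimizer(
    objective_function=toy_objective,
    search_space=search_space,
    n_initial_points=5,
    acquisition_function='ei'
)

result = optimizer.optimize(n_iterations=30)
print(result.best_params, result.best_value)
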
2.2 Bayesian Optimization Tailored to Cryptocurrency Models

class CryptoModelBayesianOptimizer:
    """Bayesian optimization specialized for cryptocurrency prediction models"""

    def __init__(self, model_class, train_data, val_data):
        self.model_class = model_class
        self.train_data = train_data
        self.val_data = val_data

        # Define the search space
        self.search_space = {
            'learning_rate': {'type': 'float', 'low': 1e-5, 'high': 1e-2, 'log': True},
            'batch_size': {'type': 'int', 'low': 16, 'high': 256},
            'hidden_size': {'type': 'int', 'low': 32, 'high': 512},
            'num_layers': {'type': 'int', 'low': 1, 'high': 5},
            'dropout': {'type': 'float', 'low': 0.0, 'high': 0.5},
            'weight_decay': {'type': 'float', 'low': 1e-6, 'high': 1e-2, 'log': True},
            'optimizer': {'type': 'categorical', 'choices': ['adam', 'sgd', 'rmsprop']},
            'activation': {'type': 'categorical', 'choices': ['relu', 'gelu', 'tanh']}
        }

    def objective_function(self, params):
        """Objective function: minimize the validation loss"""
        # Build the model
        model = self.model_class(
            hidden_size=params['hidden_size'],
            num_layers=params['num_layers'],
            dropout=params['dropout'],
            activation=params['activation']
        )

        # Configure the optimizer
        if params['optimizer'] == 'adam':
            optimizer = torch.optim.Adam(
                model.parameters(),
                lr=params['learning_rate'],
                weight_decay=params['weight_decay']
            )
        elif params['optimizer'] == 'sgd':
            optimizer = torch.optim.SGD(
                model.parameters(),
                lr=params['learning_rate'],
                weight_decay=params['weight_decay'],
                momentum=0.9
            )
        else:
            optimizer = torch.optim.RMSprop(
                model.parameters(),
                lr=params['learning_rate'],
                weight_decay=params['weight_decay']
            )

        # Training data loader
        train_loader = DataLoader(
            self.train_data, 
            batch_size=params['batch_size'],
            shuffle=True
        )

        # Short training run with early stopping
        best_val_loss = float('inf')
        patience = 10
        patience_counter = 0

        for epoch in range(50):  # 最大50エポック
            # Train
            model.train()
            for batch in train_loader:
                optimizer.zero_grad()
                loss = model.compute_loss(batch)
                loss.backward()
                optimizer.step()

            # Validate
            val_loss = self._evaluate(model, self.val_data)

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    break

        return best_val_loss

    def optimize_with_constraints(self, n_trials=100, constraints=None):
        """Constrained optimization"""
        def objective_with_constraints(trial):
            # Sample the hyperparameters
            params = {
                'learning_rate': trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True),
                'batch_size': trial.suggest_int('batch_size', 16, 256, step=16),
                'hidden_size': trial.suggest_int('hidden_size', 32, 512, step=32),
                'num_layers': trial.suggest_int('num_layers', 1, 5),
                'dropout': trial.suggest_float('dropout', 0.0, 0.5),
                'weight_decay': trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True),
                'optimizer': trial.suggest_categorical('optimizer', ['adam', 'sgd', 'rmsprop']),
                'activation': trial.suggest_categorical('activation', ['relu', 'gelu', 'tanh'])
            }

            # Check the constraints
            if constraints:
                # Memory constraint
                if 'max_memory_mb' in constraints:
                    estimated_memory = self._estimate_memory_usage(params)
                    if estimated_memory > constraints['max_memory_mb']:
                        return float('inf')

                # Inference-time constraint
                if 'max_inference_ms' in constraints:
                    inference_time = self._estimate_inference_time(params)
                    if inference_time > constraints['max_inference_ms']:
                        return float('inf')

            return self.objective_function(params)

        # Optimize with Optuna
        study = optuna.create_study(
            direction='minimize',
            sampler=optuna.samplers.TPESampler(seed=42)
        )

        study.optimize(objective_with_constraints, n_trials=n_trials)

        return study.best_params, study.best_value

2.3 Multi-Objective Bayesian Optimization

class MultiObjectiveBayesianOptimizer:
    """Multi-objective Bayesian optimization (accuracy vs. efficiency trade-off)"""

    def __init__(self, objectives: List[Callable], weights: List[float] = None):
        self.objectives = objectives
        self.weights = weights or [1.0] * len(objectives)
        self.pareto_front = []

    def optimize(self, search_space, n_iterations=100):
        """Search for Pareto-optimal solutions"""

        def multi_objective(trial):
            # Sample the parameters
            params = self._sample_params(trial, search_space)

            # Evaluate each objective
            objective_values = []
            for obj_func in self.objectives:
                value = obj_func(params)
                objective_values.append(value)

            # Update the Pareto front
            self._update_pareto_front(params, objective_values)

            # Weighted sum (scalarization)
            weighted_sum = sum(w * v for w, v in zip(self.weights, objective_values))

            return weighted_sum

        # Run the optimization (the study minimizes the scalarized sum;
        # the Pareto front itself is tracked in _update_pareto_front)
        study = optuna.create_study(
            direction='minimize',
            sampler=optuna.samplers.NSGAIISampler()  # genetic sampler designed for multi-objective search
        )

        study.optimize(multi_objective, n_trials=n_iterations)

        return self.pareto_front

    def _update_pareto_front(self, params, objectives):
        """Update the Pareto front"""
        # Check whether the new solution is dominated
        dominated = False
        for solution in self.pareto_front:
            if all(solution['objectives'][i] <= objectives[i] 
                   for i in range(len(objectives))):
                dominated = True
                break

        if not dominated:
            # Remove solutions dominated by the new one
            self.pareto_front = [
                solution for solution in self.pareto_front
                if not all(objectives[i] <= solution['objectives'][i] 
                          for i in range(len(objectives)))
            ]

            # Add the new solution
            self.pareto_front.append({
                'params': params,
                'objectives': objectives
            })

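Below is a short sketch of driving MultiObjectiveBayesianOptimizer with two competing objectives, validation loss and inference latency. Both objective functions are hypothetical stand-ins, and the sketch assumes the _sample_params helper referenced above maps this search space onto Optuna's suggest_* calls.

# Two competing objectives (hypothetical stand-ins for real measurements)
def validation_loss(params):
    # placeholder for training/validating a model with these hyperparameters
    return 1.0 / params['hidden_size'] + 0.1 * params['dropout']

def inference_latency_ms(params):
    # placeholder for measuring latency; grows with model size
    return 0.01 * params['hidden_size'] * params['num_layers']

mo_optimizer = MultiObjectiveBayesianOptimizer(
    objectives=[validation_loss, inference_latency_ms],
    weights=[0.7, 0.3]
)

search_space = {
    'hidden_size': {'type': 'int', 'low': 32, 'high': 512},
    'num_layers':  {'type': 'int', 'low': 1, 'high': 5},
    'dropout':     {'type': 'float', 'low': 0.0, 'high': 0.5},
}

pareto_front = mo_optimizer.optimize(search_space, n_iterations=50)
for solution in pareto_front:
    print(solution['objectives'], solution['params'])
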
3. Bayesian Neural Networks

3.1 BNNs via Variational Inference

class BayesianLinear(nn.Module):
    """Bayesian linear layer"""

    def __init__(self, in_features, out_features, prior_var=1.0):
        super(BayesianLinear, self).__init__()
        self.in_features = in_features
        self.out_features = out_features

        # Prior variance of the weights
        self.prior_var = prior_var

        # Variational parameters
        self.weight_mu = nn.Parameter(torch.Tensor(out_features, in_features))
        self.weight_rho = nn.Parameter(torch.Tensor(out_features, in_features))
        self.bias_mu = nn.Parameter(torch.Tensor(out_features))
        self.bias_rho = nn.Parameter(torch.Tensor(out_features))

        self.reset_parameters()

    def reset_parameters(self):
        """Initialize the variational parameters"""
        nn.init.kaiming_uniform_(self.weight_mu, a=np.sqrt(5))
        nn.init.constant_(self.weight_rho, -3)
        nn.init.constant_(self.bias_mu, 0)
        nn.init.constant_(self.bias_rho, -3)

    def forward(self, x):
        """Forward pass (reparameterization trick)"""
        # Standard deviations via softplus of rho
        weight_sigma = torch.log1p(torch.exp(self.weight_rho))
        bias_sigma = torch.log1p(torch.exp(self.bias_rho))

        # Sample weights and biases
        weight_epsilon = torch.randn_like(self.weight_mu)
        bias_epsilon = torch.randn_like(self.bias_mu)

        weight = self.weight_mu + weight_sigma * weight_epsilon
        bias = self.bias_mu + bias_sigma * bias_epsilon

        return F.linear(x, weight, bias)

    def kl_divergence(self):
        """Compute the KL divergence between the posterior and the prior"""
        weight_sigma = torch.log1p(torch.exp(self.weight_rho))
        bias_sigma = torch.log1p(torch.exp(self.bias_rho))

        # KL(q||p) for weights
        kl_weight = 0.5 * torch.sum(
            weight_sigma**2 / self.prior_var + 
            self.weight_mu**2 / self.prior_var - 
            1 - 
            2 * torch.log(weight_sigma) + 
            np.log(self.prior_var)
        )

        # KL(q||p) for bias
        kl_bias = 0.5 * torch.sum(
            bias_sigma**2 / self.prior_var + 
            self.bias_mu**2 / self.prior_var - 
            1 - 
            2 * torch.log(bias_sigma) + 
            np.log(self.prior_var)
        )

        return kl_weight + kl_bias

class BayesianCryptoPredictor(nn.Module):
    """Cryptocurrency prediction with a Bayesian neural network"""

    def __init__(self, input_size, hidden_sizes=[128, 64], output_size=1):
        super(BayesianCryptoPredictor, self).__init__()

        layers = []
        prev_size = input_size

        for hidden_size in hidden_sizes:
            layers.append(BayesianLinear(prev_size, hidden_size))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(0.2))
            prev_size = hidden_size

        layers.append(BayesianLinear(prev_size, output_size))

        self.network = nn.Sequential(*layers)

    def forward(self, x, n_samples=1):
        """Prediction (optionally with multiple weight samples)"""
        if n_samples == 1:
            return self.network(x)
        else:
            # Sample several times to estimate uncertainty
            outputs = []
            for _ in range(n_samples):
                outputs.append(self.network(x))

            outputs = torch.stack(outputs)
            mean = outputs.mean(dim=0)
            std = outputs.std(dim=0)

            return mean, std

    def kl_divergence(self):
        """KL divergence summed over the whole model"""
        kl = 0
        for module in self.modules():
            if isinstance(module, BayesianLinear):
                kl += module.kl_divergence()
        return kl

    def elbo_loss(self, predictions, targets, kl_weight=1.0):
        """Negative Evidence Lower Bound (ELBO) loss"""
        # Likelihood term (negative log-likelihood, Gaussian up to a constant)
        nll = F.mse_loss(predictions, targets, reduction='sum')

        # KL term
        kl = self.kl_divergence()

        # Minimizing NLL + weighted KL is equivalent to maximizing the ELBO
        return nll + kl_weight * kl

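The sketch below shows one way to train BayesianCryptoPredictor with the ELBO loss. The synthetic regression data, the batch size, and the common 1/num_batches weighting of the KL term are assumptions for illustration.

import torch
from torch.utils.data import DataLoader, TensorDataset

# Synthetic regression data (illustration only)
X = torch.randn(512, 20)
y = X[:, :3].sum(dim=1, keepdim=True) + 0.1 * torch.randn(512, 1)
loader = DataLoader(TensorDataset(X, y), batch_size=64, shuffle=True)

model = BayesianCryptoPredictor(input_size=20, hidden_sizes=[128, 64])
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

n_batches = len(loader)
for epoch in range(20):
    for xb, yb in loader:
        optimizer.zero_grad()
        preds = model(xb)  # single stochastic forward pass
        # Weight the KL term by 1/number of batches (a common heuristic)
        loss = model.elbo_loss(preds, yb, kl_weight=1.0 / n_batches)
        loss.backward()
        optimizer.step()

# Predict with uncertainty by drawing several weight samples
mean, std = model(torch.randn(5, 20), n_samples=50)
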
3.2 Uncertainty Estimation with MC Dropout

class MCDropoutPredictor(nn.Module):
    """Uncertainty estimation via MC dropout"""

    def __init__(self, input_size, hidden_sizes=[256, 128], dropout_rate=0.2):
        super(MCDropoutPredictor, self).__init__()

        layers = []
        prev_size = input_size

        for hidden_size in hidden_sizes:
            layers.extend([
                nn.Linear(prev_size, hidden_size),
                nn.ReLU(),
                nn.Dropout(dropout_rate)  # dropout stays active at inference time
            ])
            prev_size = hidden_size

        layers.append(nn.Linear(prev_size, 1))

        self.network = nn.Sequential(*layers)
        self.dropout_rate = dropout_rate

    def forward(self, x):
        return self.network(x)

    def predict_with_uncertainty(self, x, n_forward_passes=100):
        """Prediction with uncertainty"""
        # Switch to train mode so that dropout stays active
        self.train()

        predictions = []
        with torch.no_grad():
            for _ in range(n_forward_passes):
                pred = self.forward(x)
                predictions.append(pred)

        predictions = torch.stack(predictions)

        # Mean and standard deviation
        mean = predictions.mean(dim=0)
        std = predictions.std(dim=0)

        # Prediction interval
        lower = torch.quantile(predictions, 0.025, dim=0)
        upper = torch.quantile(predictions, 0.975, dim=0)

        return {
            'mean': mean,
            'std': std,
            'lower_95': lower,
            'upper_95': upper,
            'epistemic_uncertainty': std.mean().item()
        }

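A short usage sketch of MCDropoutPredictor; the feature dimension, the synthetic batch, and the number of forward passes are assumptions, and a trained model is presumed.

import torch

model = MCDropoutPredictor(input_size=16, hidden_sizes=[256, 128], dropout_rate=0.2)
# ... train with a standard regression loop (omitted) ...

x_new = torch.randn(8, 16)  # hypothetical feature batch
result = model.predict_with_uncertainty(x_new, n_forward_passes=200)

print(result['mean'].shape)                           # point predictions, shape (8, 1)
print(result['std'].mean().item())                    # average predictive spread
print(result['lower_95'][0], result['upper_95'][0])   # 95% prediction interval
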
3.3 Time-Series Forecasting with Gaussian Processes

class GaussianProcessCrypto:
    """Cryptocurrency price forecasting with a Gaussian process"""

    def __init__(self, kernel_type='matern'):
        # Choose the kernel
        if kernel_type == 'matern':
            self.kernel = Matern(length_scale=1.0, nu=2.5)
        elif kernel_type == 'rbf':
            self.kernel = RBF(length_scale=1.0)
        elif kernel_type == 'periodic':
            from sklearn.gaussian_process.kernels import ExpSineSquared
            self.kernel = ExpSineSquared(length_scale=1.0, periodicity=1.0)

        # Add a noise term
        self.kernel += WhiteKernel(noise_level=1e-5)

        self.gp = GaussianProcessRegressor(
            kernel=self.kernel,
            alpha=1e-6,
            normalize_y=True,
            n_restarts_optimizer=10
        )

        self.scaler_X = StandardScaler()
        self.scaler_y = StandardScaler()

    def fit(self, X, y):
        """Fit the model"""
        # Normalize the data
        X_scaled = self.scaler_X.fit_transform(X)
        y_scaled = self.scaler_y.fit_transform(y.reshape(-1, 1)).ravel()

        # Fit the Gaussian process
        self.gp.fit(X_scaled, y_scaled)

        # Report the optimized kernel parameters
        print(f"Optimized kernel: {self.gp.kernel_}")

    def predict(self, X, return_std=True):
        """Prediction with uncertainty estimates"""
        X_scaled = self.scaler_X.transform(X)

        if return_std:
            y_mean_scaled, y_std_scaled = self.gp.predict(X_scaled, return_std=True)

            # Transform back to the original scale
            y_mean = self.scaler_y.inverse_transform(
                y_mean_scaled.reshape(-1, 1)
            ).ravel()

            # Rescale the standard deviation
            y_std = y_std_scaled * self.scaler_y.scale_[0]

            return y_mean, y_std
        else:
            y_mean_scaled = self.gp.predict(X_scaled, return_std=False)
            y_mean = self.scaler_y.inverse_transform(
                y_mean_scaled.reshape(-1, 1)
            ).ravel()

            return y_mean

    def plot_predictions(self, X_train, y_train, X_test, y_test):
        """Visualize the predictions"""
        import matplotlib.pyplot as plt

        # Predict
        y_pred, y_std = self.predict(X_test, return_std=True)

        # Confidence interval
        confidence_interval = 1.96  # 95% interval (normal approximation)

        plt.figure(figsize=(12, 6))

        # Training data
        plt.scatter(X_train[:, 0], y_train, c='black', s=10, label='Training data')

        # Predictions
        plt.plot(X_test[:, 0], y_pred, 'b-', label='GP mean')
        plt.fill_between(
            X_test[:, 0],
            y_pred - confidence_interval * y_std,
            y_pred + confidence_interval * y_std,
            alpha=0.2,
            color='blue',
            label='95% confidence interval'
        )

        # Actual values
        plt.scatter(X_test[:, 0], y_test, c='red', s=10, label='Actual')

        plt.xlabel('Time')
        plt.ylabel('Price')
        plt.title('Gaussian Process Prediction with Uncertainty')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()

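The following sketch fits GaussianProcessCrypto on lagged price features. The synthetic price series, the lag construction, and the train/test split are assumptions for illustration.

import numpy as np

# Synthetic price series (illustration only)
prices = 100 + np.cumsum(np.random.randn(300))

# Lagged features: predict the next price from the previous n_lags prices
n_lags = 5
X = np.array([prices[i - n_lags:i] for i in range(n_lags, len(prices))])
y = prices[n_lags:]

split = 250
gp_model = GaussianProcessCrypto(kernel_type='matern')
gp_model.fit(X[:split], y[:split])

y_mean, y_std = gp_model.predict(X[split:], return_std=True)
print(y_mean[:3], y_std[:3])
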
4. Variational Inference and MCMC

4.1 Feature Learning with a Variational Autoencoder

class VariationalEncoder(nn.Module):
    """Variational encoder"""

    def __init__(self, input_dim, hidden_dim, latent_dim):
        super(VariationalEncoder, self).__init__()

        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2_mu = nn.Linear(hidden_dim, latent_dim)
        self.fc2_logvar = nn.Linear(hidden_dim, latent_dim)

    def forward(self, x):
        h = F.relu(self.fc1(x))
        mu = self.fc2_mu(h)
        logvar = self.fc2_logvar(h)
        return mu, logvar

class VariationalDecoder(nn.Module):
    """Variational decoder"""

    def __init__(self, latent_dim, hidden_dim, output_dim):
        super(VariationalDecoder, self).__init__()

        self.fc1 = nn.Linear(latent_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, z):
        h = F.relu(self.fc1(z))
        return self.fc2(h)

class CryptoVAE(nn.Module):
    """Variational autoencoder for cryptocurrency data"""

    def __init__(self, input_dim, hidden_dim=128, latent_dim=32):
        super(CryptoVAE, self).__init__()

        self.encoder = VariationalEncoder(input_dim, hidden_dim, latent_dim)
        self.decoder = VariationalDecoder(latent_dim, hidden_dim, input_dim)

    def reparameterize(self, mu, logvar):
        """Reparameterization trick"""
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        mu, logvar = self.encoder(x)
        z = self.reparameterize(mu, logvar)
        reconstruction = self.decoder(z)
        return reconstruction, mu, logvar

    def loss_function(self, reconstruction, x, mu, logvar, beta=1.0):
        """VAE loss function"""
        # Reconstruction error
        reconstruction_loss = F.mse_loss(reconstruction, x, reduction='sum')

        # KL divergence
        kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

        # β-VAE loss
        return reconstruction_loss + beta * kl_divergence

    def generate_features(self, x, n_samples=100):
        """Generate latent features with uncertainty estimates"""
        self.eval()

        with torch.no_grad():
            # Sample multiple times
            latent_samples = []
            for _ in range(n_samples):
                mu, logvar = self.encoder(x)
                z = self.reparameterize(mu, logvar)
                latent_samples.append(z)

            latent_samples = torch.stack(latent_samples)

            # Summary statistics
            mean_latent = latent_samples.mean(dim=0)
            std_latent = latent_samples.std(dim=0)

        return mean_latent, std_latent

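A compact training sketch for CryptoVAE; the synthetic feature matrix, the batch size, and the β value are assumptions for illustration.

import torch
from torch.utils.data import DataLoader, TensorDataset

# Synthetic feature matrix (e.g. standardized market indicators), illustration only
features = torch.randn(1024, 40)
loader = DataLoader(TensorDataset(features), batch_size=128, shuffle=True)

vae = CryptoVAE(input_dim=40, hidden_dim=128, latent_dim=32)
optimizer = torch.optim.Adam(vae.parameters(), lr=1e-3)

for epoch in range(30):
    for (xb,) in loader:
        optimizer.zero_grad()
        reconstruction, mu, logvar = vae(xb)
        loss = vae.loss_function(reconstruction, xb, mu, logvar, beta=0.5)
        loss.backward()
        optimizer.step()

# Latent features with uncertainty, usable as inputs to a downstream predictor
mean_latent, std_latent = vae.generate_features(features[:10], n_samples=100)
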
4.2 Parameter Estimation with MCMC

import pymc3 as pm
import theano.tensor as tt

class BayesianTimeSeriesModel:
    """Bayesian time-series model estimated with MCMC"""

    def __init__(self):
        self.model = None
        self.trace = None

    def build_model(self, data):
        """Build the hierarchical Bayesian model"""
        with pm.Model() as model:
            # Hyperpriors
            mu_alpha = pm.Normal('mu_alpha', mu=0, sigma=1)
            sigma_alpha = pm.HalfNormal('sigma_alpha', sigma=1)

            # Time-varying level
            alpha = pm.Normal('alpha', mu=mu_alpha, sigma=sigma_alpha, 
                            shape=len(data))

            # Trend component
            trend = pm.GaussianRandomWalk('trend', sigma=0.1, 
                                         shape=len(data))

            # Seasonal component (weekly)
            seasonal_period = 7
            seasonal = pm.Normal('seasonal', mu=0, sigma=1, 
                                shape=seasonal_period)

            # Tile the seasonal pattern over the full series
            seasonal_full = tt.tile(seasonal, len(data) // seasonal_period + 1)[:len(data)]

            # Observation volatility (constant here; a GARCH-style extension is possible)
            sigma = pm.Exponential('sigma', lam=1)

            # Observation model
            mu = alpha + trend + seasonal_full

            # Likelihood
            observations = pm.Normal('obs', mu=mu, sigma=sigma, 
                                   observed=data)

        self.model = model
        return model

    def fit(self, data, n_samples=2000, n_chains=2):
        """Estimate the parameters with MCMC"""
        if self.model is None:
            self.build_model(data)

        with self.model:
            # NUTS (No-U-Turn Sampler)
            self.trace = pm.sample(
                n_samples,
                chains=n_chains,
                tune=1000,
                target_accept=0.9,
                return_inferencedata=True
            )

        return self.trace

    def predict(self, n_periods=30):
        """Posterior-predictive summary.

        Note: this samples the posterior predictive over the observed period;
        true out-of-sample forecasting for n_periods future steps would require
        extending the model (e.g. projecting the random walk forward).
        """
        with self.model:
            # Posterior predictive sampling
            posterior_predictive = pm.sample_posterior_predictive(
                self.trace,
                samples=1000,
                var_names=['obs']
            )

        # Prediction statistics
        predictions = posterior_predictive['obs']
        mean_pred = predictions.mean(axis=0)
        std_pred = predictions.std(axis=0)

        # Prediction intervals
        lower_95 = np.percentile(predictions, 2.5, axis=0)
        upper_95 = np.percentile(predictions, 97.5, axis=0)

        return {
            'mean': mean_pred,
            'std': std_pred,
            'lower_95': lower_95,
            'upper_95': upper_95
        }

    def plot_components(self):
        """Plot the decomposed components"""
        import matplotlib.pyplot as plt

        fig, axes = plt.subplots(3, 1, figsize=(12, 10))

        # Trend
        trend_samples = self.trace.posterior['trend'].values
        trend_mean = trend_samples.mean(axis=(0, 1))
        trend_std = trend_samples.std(axis=(0, 1))

        axes[0].plot(trend_mean, label='Trend')
        axes[0].fill_between(
            range(len(trend_mean)),
            trend_mean - 2*trend_std,
            trend_mean + 2*trend_std,
            alpha=0.3
        )
        axes[0].set_title('Trend Component')
        axes[0].legend()

        # Seasonality
        seasonal_samples = self.trace.posterior['seasonal'].values
        seasonal_mean = seasonal_samples.mean(axis=(0, 1))

        axes[1].plot(seasonal_mean, 'o-')
        axes[1].set_title('Seasonal Component (Weekly)')
        axes[1].set_xlabel('Day of Week')

        # Volatility
        sigma_samples = self.trace.posterior['sigma'].values.flatten()
        axes[2].hist(sigma_samples, bins=50, alpha=0.7)
        axes[2].axvline(sigma_samples.mean(), color='red', 
                       linestyle='--', label=f'Mean: {sigma_samples.mean():.3f}')
        axes[2].set_title('Volatility Distribution')
        axes[2].legend()

        plt.tight_layout()
        plt.show()

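A brief sketch of driving BayesianTimeSeriesModel on a return series; the synthetic data and sampler settings are illustrative, and a working PyMC3/Theano environment is assumed.

import numpy as np

# Synthetic daily log returns as a stand-in for real price history
np.random.seed(0)
log_returns = 0.001 + 0.02 * np.random.randn(140)

ts_model = BayesianTimeSeriesModel()
ts_model.build_model(log_returns)
trace = ts_model.fit(log_returns, n_samples=1000, n_chains=2)

# Posterior-predictive summary (in-sample; see the note in predict())
pred = ts_model.predict()
print(pred['mean'][:5])
print(pred['lower_95'][:5], pred['upper_95'][:5])

ts_model.plot_components()
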
5. Ensemble Uncertainty Estimation

5.1 Deep Ensembles

class DeepEnsemble:
    """Uncertainty estimation with deep ensembles"""

    def __init__(self, model_class, n_models=5):
        self.model_class = model_class
        self.n_models = n_models
        self.models = []

    def train(self, train_data, val_data, **kwargs):
        """Train the ensemble members"""
        for i in range(self.n_models):
            print(f"Training model {i+1}/{self.n_models}")

            # Different initialization and data shuffling per member
            model = self.model_class(**kwargs)

            # Shuffle the data with a member-specific random seed
            train_loader = DataLoader(
                train_data,
                batch_size=32,
                shuffle=True,
                generator=torch.Generator().manual_seed(i)
            )

            # Train
            self._train_single_model(model, train_loader, val_data)
            self.models.append(model)

    def predict_with_uncertainty(self, x):
        """Ensemble prediction with uncertainty"""
        predictions = []

        for model in self.models:
            model.eval()
            with torch.no_grad():
                pred = model(x)
                predictions.append(pred)

        predictions = torch.stack(predictions)

        # Prediction statistics
        mean = predictions.mean(dim=0)

        # Epistemic uncertainty (variance across ensemble members)
        epistemic_uncertainty = predictions.var(dim=0)

        # Aleatoric uncertainty (mean of each model's predictive variance)
        if hasattr(self.models[0], 'predict_with_variance'):
            aleatoric_uncertainties = []
            for model in self.models:
                _, var = model.predict_with_variance(x)
                aleatoric_uncertainties.append(var)

            aleatoric_uncertainty = torch.stack(aleatoric_uncertainties).mean(dim=0)

            # Total uncertainty
            total_uncertainty = epistemic_uncertainty + aleatoric_uncertainty
        else:
            total_uncertainty = epistemic_uncertainty
            aleatoric_uncertainty = None

        return {
            'mean': mean,
            'epistemic_uncertainty': epistemic_uncertainty,
            'aleatoric_uncertainty': aleatoric_uncertainty,
            'total_uncertainty': total_uncertainty,
            'individual_predictions': predictions
        }

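A usage sketch of DeepEnsemble; the synthetic dataset and the choice of MCDropoutPredictor as the member model are assumptions, and _train_single_model (referenced above but not shown) is assumed to be implemented.

import torch
from torch.utils.data import TensorDataset

# Synthetic data (illustration only)
X = torch.randn(400, 16)
y = X[:, :2].sum(dim=1, keepdim=True) + 0.05 * torch.randn(400, 1)
train_dataset = TensorDataset(X[:320], y[:320])
val_dataset = TensorDataset(X[320:], y[320:])

ensemble = DeepEnsemble(model_class=MCDropoutPredictor, n_models=3)
# Assumes _train_single_model is implemented (e.g. a standard MSE training loop)
ensemble.train(train_dataset, val_dataset, input_size=16, hidden_sizes=[64, 32])

result = ensemble.predict_with_uncertainty(torch.randn(8, 16))
print(result['mean'].shape, result['epistemic_uncertainty'].mean().item())
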
5.2 Bayesian Model Averaging

class BayesianModelAveraging:
    """Combining predictions via Bayesian model averaging"""

    def __init__(self, models, prior_weights=None):
        self.models = models
        self.prior_weights = prior_weights or np.ones(len(models)) / len(models)
        self.posterior_weights = None

    def update_weights(self, validation_data):
        """Update the posterior weights from validation data"""
        log_likelihoods = []

        for model in self.models:
            # Compute each model's log-likelihood
            ll = self._compute_log_likelihood(model, validation_data)
            log_likelihoods.append(ll)

        log_likelihoods = np.array(log_likelihoods)

        # Update the weights via Bayes' theorem
        log_posterior = np.log(self.prior_weights) + log_likelihoods
        log_posterior -= np.max(log_posterior)  # numerical stability

        posterior_weights = np.exp(log_posterior)
        self.posterior_weights = posterior_weights / posterior_weights.sum()

        print("Model weights:", self.posterior_weights)

    def predict(self, x):
        """Weighted prediction"""
        predictions = []
        uncertainties = []

        for model in self.models:
            if hasattr(model, 'predict_with_uncertainty'):
                pred_dict = model.predict_with_uncertainty(x)
                pred = pred_dict['mean']
                unc = pred_dict.get('total_uncertainty', 
                                   pred_dict.get('std', torch.zeros_like(pred)))
            else:
                pred = model(x)
                unc = torch.zeros_like(pred)

            predictions.append(pred)
            uncertainties.append(unc)

        # Weighted average of the individual (unweighted) predictions
        mean_prediction = sum(
            w * pred for w, pred in zip(self.posterior_weights, predictions)
        )

        # Combine uncertainties: within-model uncertainty plus
        # the between-model variance of the predictions
        model_variance = sum(
            w * (pred - mean_prediction)**2
            for w, pred in zip(self.posterior_weights, predictions)
        )

        total_uncertainty = sum(
            w * unc for w, unc in zip(self.posterior_weights, uncertainties)
        ) + model_variance

        return {
            'prediction': mean_prediction,
            'uncertainty': total_uncertainty,
            'model_weights': self.posterior_weights
        }

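The sketch below combines two already-trained models with BayesianModelAveraging. The models and validation set are hypothetical, and _compute_log_likelihood (referenced above but not shown) is assumed to return, for example, a Gaussian log-likelihood of the validation residuals.

import torch
from torch.utils.data import TensorDataset

# Two already-trained models of different capacities (illustration only)
model_a = MCDropoutPredictor(input_size=16, hidden_sizes=[64, 32])
model_b = MCDropoutPredictor(input_size=16, hidden_sizes=[256, 128])

bma = BayesianModelAveraging(models=[model_a, model_b])

# Hypothetical validation set used to update the posterior weights
X_val = torch.randn(80, 16)
y_val = X_val[:, :2].sum(dim=1, keepdim=True)
bma.update_weights(validation_data=TensorDataset(X_val, y_val))

combined = bma.predict(torch.randn(8, 16))
print(combined['prediction'].shape, combined['model_weights'])
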
6. A Practical Integrated System

6.1 Integrating Bayesian Optimization and Uncertainty Quantification

class BayesianCryptoTradingSystem:
    """Cryptocurrency trading system that integrates Bayesian methods"""

    def __init__(self, config):
        self.config = config

        # Bayesian optimizer
        self.optimizer = CryptoModelBayesianOptimizer(
            model_class=BayesianCryptoPredictor,
            train_data=config['train_data'],
            val_data=config['val_data']
        )

        # Ensemble model
        self.ensemble = None

        # Gaussian process (short-horizon forecasts)
        self.gp_short_term = GaussianProcessCrypto(kernel_type='matern')

        # Bayesian time-series model (long-term trend)
        self.bayesian_ts = BayesianTimeSeriesModel()

    def optimize_and_train(self):
        """Optimize and train the models"""
        # 1. Search the hyperparameters with Bayesian optimization
        print("Optimizing hyperparameters...")
        best_params, best_value = self.optimizer.optimize_with_constraints(
            n_trials=50,
            constraints={
                'max_memory_mb': 1024,
                'max_inference_ms': 10
            }
        )

        print(f"Best parameters: {best_params}")
        print(f"Best validation loss: {best_value}")

        # 2. Train the ensemble with the best parameters
        print("Training ensemble...")
        self.ensemble = DeepEnsemble(
            model_class=BayesianCryptoPredictor,
            n_models=5
        )

        self.ensemble.train(
            self.config['train_data'],
            self.config['val_data'],
            **best_params
        )

        # 3. Train the Gaussian process (short-horizon forecasts)
        print("Training Gaussian Process...")
        X_recent = self.config['recent_features']
        y_recent = self.config['recent_prices']
        self.gp_short_term.fit(X_recent, y_recent)

        # 4. Train the Bayesian time-series model (long-term trend)
        print("Training Bayesian time series model...")
        self.bayesian_ts.fit(self.config['price_history'])

    def predict_with_full_uncertainty(self, features, horizon='short'):
        """Prediction that accounts for all sources of uncertainty"""
        results = {}

        if horizon == 'short':
            # Short horizon: ensemble + Gaussian process

            # Ensemble prediction
            ensemble_pred = self.ensemble.predict_with_uncertainty(features)

            # Gaussian process prediction
            gp_mean, gp_std = self.gp_short_term.predict(
                features.numpy(), 
                return_std=True
            )

            # Combine the predictions
            combined_mean = 0.7 * ensemble_pred['mean'] + 0.3 * torch.tensor(gp_mean)

            # Combine the uncertainties
            ensemble_unc = ensemble_pred['total_uncertainty']
            gp_unc = torch.tensor(gp_std**2)

            combined_uncertainty = torch.sqrt(
                0.7**2 * ensemble_unc + 0.3**2 * gp_unc
            )

            results['prediction'] = combined_mean
            results['uncertainty'] = combined_uncertainty
            results['confidence_interval'] = self._compute_confidence_interval(
                combined_mean, combined_uncertainty
            )

        elif horizon == 'long':
            # Long horizon: Bayesian time-series model
            ts_pred = self.bayesian_ts.predict(n_periods=30)

            results['prediction'] = ts_pred['mean']
            results['uncertainty'] = ts_pred['std']
            results['confidence_interval'] = (ts_pred['lower_95'], ts_pred['upper_95'])

        # Compute risk metrics
        results['risk_metrics'] = self._compute_risk_metrics(results)

        return results

    def _compute_risk_metrics(self, predictions):
        """Risk metrics derived from the predictions"""
        uncertainty = predictions['uncertainty']

        # Coefficient of variation
        cv = uncertainty / torch.abs(predictions['prediction'])

        # Prediction confidence score
        confidence_score = 1 / (1 + cv)

        # VaR (Value at Risk) at the 95% level
        # one-tailed normal quantile (≈ 1.645); norm is imported at module level
        var_95 = predictions['prediction'] - norm.ppf(0.95) * uncertainty

        # CVaR (Conditional Value at Risk), assuming a normal distribution
        alpha = 0.05
        phi = norm.pdf(norm.ppf(alpha))
        cvar = predictions['prediction'] - uncertainty * phi / alpha

        return {
            'coefficient_of_variation': cv,
            'confidence_score': confidence_score,
            'var_95': var_95,
            'cvar_95': cvar,
            'uncertainty_percentile': self._uncertainty_to_percentile(uncertainty)
        }

    def _uncertainty_to_percentile(self, uncertainty):
        """Map uncertainty to a percentile of its historical distribution"""
        # Compare against the historical distribution of uncertainties
        historical_uncertainties = self.config.get('historical_uncertainties', [])

        if historical_uncertainties:
            percentile = np.searchsorted(
                np.sort(historical_uncertainties), 
                uncertainty.numpy()
            ) / len(historical_uncertainties)
        else:
            percentile = 0.5  # default

        return percentile

    def make_trading_decision(self, current_price, features):
        """Trading decision that accounts for uncertainty"""
        # Predict
        predictions = self.predict_with_full_uncertainty(features, horizon='short')

        pred_price = predictions['prediction'].item()
        uncertainty = predictions['uncertainty'].item()
        confidence = predictions['risk_metrics']['confidence_score'].item()

        # Expected return
        expected_return = (pred_price - current_price) / current_price

        # Risk-adjusted return
        risk_adjusted_return = expected_return / (1 + uncertainty / current_price)

        # Trading signal
        signal = {
            'action': 'hold',
            'confidence': confidence,
            'size': 0,
            'reason': ''
        }

        # Threshold-based decision
        if confidence > self.config['min_confidence']:
            if risk_adjusted_return > self.config['buy_threshold']:
                signal['action'] = 'buy'
                signal['size'] = self._calculate_position_size(confidence, uncertainty)
                signal['reason'] = f'Expected return: {expected_return:.2%}, Risk-adjusted: {risk_adjusted_return:.2%}'

            elif risk_adjusted_return < -self.config['sell_threshold']:
                signal['action'] = 'sell'
                signal['size'] = self._calculate_position_size(confidence, uncertainty)
                signal['reason'] = f'Expected return: {expected_return:.2%}, Risk-adjusted: {risk_adjusted_return:.2%}'
        else:
            signal['reason'] = f'Low confidence: {confidence:.2f}'

        return signal

    def _calculate_position_size(self, confidence, uncertainty):
        """Position size based on confidence and uncertainty"""
        # Modified Kelly-style sizing
        base_size = self.config['base_position_size']

        # Adjust by confidence
        confidence_factor = confidence ** 2  # nonlinear scaling

        # Adjust by uncertainty
        uncertainty_factor = 1 / (1 + uncertainty)

        # Final position size
        position_size = base_size * confidence_factor * uncertainty_factor

        # Cap the size
        max_size = self.config['max_position_size']
        position_size = min(position_size, max_size)

        return position_size

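For reference, a sketch of the configuration dictionary that BayesianCryptoTradingSystem reads. The key names follow the accesses in the class above; the values shown are placeholders and illustrative thresholds, not recommendations.

# Keys read by BayesianCryptoTradingSystem; values are placeholders only
config = {
    'train_data': None,              # torch Dataset of training features/targets
    'val_data': None,                # validation Dataset
    'recent_features': None,         # array (n_samples, n_features) for the short-term GP
    'recent_prices': None,           # array of recent prices
    'price_history': None,           # 1-D series for the Bayesian time-series model
    'historical_uncertainties': [],  # optional, for the percentile mapping
    'min_confidence': 0.6,           # minimum confidence score required to act
    'buy_threshold': 0.01,           # risk-adjusted return needed to buy
    'sell_threshold': 0.01,          # risk-adjusted loss needed to sell
    'base_position_size': 0.1,       # fraction of capital per trade
    'max_position_size': 0.25,       # hard cap on the position size
}
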
6.2 Backtesting and Evaluation

class BayesianBacktester:
    """Backtesting for Bayesian models"""

    def __init__(self, model_system, initial_capital=10000):
        self.model_system = model_system
        self.initial_capital = initial_capital

    def backtest(self, test_data, start_date, end_date):
        """Backtest that accounts for uncertainty"""
        results = {
            'dates': [],
            'prices': [],
            'predictions': [],
            'uncertainties': [],
            'positions': [],
            'portfolio_values': [],
            'trades': []
        }

        capital = self.initial_capital
        position = 0

        for date, data in test_data.iterate_by_date(start_date, end_date):
            current_price = data['price']
            features = data['features']

            # Prediction and trading decision
            pred_result = self.model_system.predict_with_full_uncertainty(
                features, horizon='short'
            )

            signal = self.model_system.make_trading_decision(
                current_price, features
            )

            # Execute trades
            if signal['action'] == 'buy' and position <= 0:
                # Buy
                shares = (capital * signal['size']) / current_price
                position = shares
                capital -= shares * current_price * (1 + 0.001)  # fee

                results['trades'].append({
                    'date': date,
                    'action': 'buy',
                    'price': current_price,
                    'shares': shares,
                    'confidence': signal['confidence'],
                    'uncertainty': pred_result['uncertainty'].item()
                })

            elif signal['action'] == 'sell' and position > 0:
                # Sell
                capital += position * current_price * (1 - 0.001)  # fee

                results['trades'].append({
                    'date': date,
                    'action': 'sell',
                    'price': current_price,
                    'shares': position,
                    'confidence': signal['confidence'],
                    'uncertainty': pred_result['uncertainty'].item()
                })

                position = 0

            # Portfolio value
            portfolio_value = capital + position * current_price

            # Record the results
            results['dates'].append(date)
            results['prices'].append(current_price)
            results['predictions'].append(pred_result['prediction'].item())
            results['uncertainties'].append(pred_result['uncertainty'].item())
            results['positions'].append(position)
            results['portfolio_values'].append(portfolio_value)

        # Performance analysis
        performance = self._analyze_performance(results)

        return results, performance

    def _analyze_performance(self, results):
        """Performance analysis"""
        portfolio_values = np.array(results['portfolio_values'])

        # Basic metrics
        total_return = (portfolio_values[-1] - portfolio_values[0]) / portfolio_values[0]

        # Return series
        returns = np.diff(portfolio_values) / portfolio_values[:-1]

        # Sharpe ratio (annualized assuming daily steps)
        sharpe_ratio = np.mean(returns) / np.std(returns) * np.sqrt(252)

        # Maximum drawdown
        cumulative = np.cumprod(1 + returns)
        running_max = np.maximum.accumulate(cumulative)
        drawdown = (cumulative - running_max) / running_max
        max_drawdown = np.min(drawdown)

        # Uncertainty-adjusted metrics
        uncertainties = np.array(results['uncertainties'])
        # Weight each return by the uncertainty at the start of its period
        # (the return series has one fewer element than the uncertainty series)
        uncertainty_weighted_returns = returns / (1 + uncertainties[:-1])

        uncertainty_adjusted_sharpe = (
            np.mean(uncertainty_weighted_returns) / 
            np.std(uncertainty_weighted_returns) * np.sqrt(252)
        )

        # Trade statistics
        trades = results['trades']
        n_trades = len(trades)

        # Pair each sell with the most recent buy to flag winning round trips
        # (simplified: the win rate is measured against all recorded trades)
        win_trades = []
        last_buy_price = None
        for t in trades:
            if t['action'] == 'buy':
                last_buy_price = t['price']
            elif t['action'] == 'sell' and last_buy_price is not None:
                if t['price'] > last_buy_price:
                    win_trades.append(t)
                last_buy_price = None
        win_rate = len(win_trades) / n_trades if n_trades > 0 else 0

        # Correlation between confidence and trade outcome (simplified)
        if trades:
            confidences = [t['confidence'] for t in trades]
            outcomes = [1 if t in win_trades else 0 for t in trades]
            confidence_accuracy_corr = np.corrcoef(confidences, outcomes)[0, 1]
        else:
            confidence_accuracy_corr = 0

        return {
            'total_return': total_return,
            'sharpe_ratio': sharpe_ratio,
            'max_drawdown': max_drawdown,
            'uncertainty_adjusted_sharpe': uncertainty_adjusted_sharpe,
            'n_trades': n_trades,
            'win_rate': win_rate,
            'confidence_accuracy_correlation': confidence_accuracy_corr,
            'avg_uncertainty': np.mean(uncertainties),
            'uncertainty_trend': np.polyfit(range(len(uncertainties)), 
                                          uncertainties, 1)[0]
        }

7. Summary and Best Practices

7.1 Implementation Guidelines

# Checklist for implementing Bayesian methods
BAYESIAN_IMPLEMENTATION_CHECKLIST = {
    "Bayesian optimization": [
        "Choose an appropriate acquisition function",
        "Set the number of initial samples (10-20 points)",
        "Balance exploration and exploitation",
        "Define constraints explicitly"
    ],
    "Uncertainty quantification": [
        "Distinguish epistemic from aleatoric uncertainty",
        "Choose appropriate priors",
        "Weigh quality against computational cost",
        "Check calibration"
    ],
    "Model selection": [
        "Pick the method to match the data volume",
        "MC dropout (lightweight)",
        "Variational inference (medium scale)",
        "MCMC (small scale, high fidelity)"
    ],
    "Operations": [
        "Set alerts based on uncertainty",
        "Adjust confidence thresholds dynamically",
        "Recalibrate regularly",
        "Detect abnormal levels of uncertainty"
    ]
}

7.2 Caveats and Recommendations

  1. Computational cost
    - Bayesian methods are computationally expensive
    - Consider approximate methods for real-time use
    - GPU acceleration matters

  2. Avoiding overconfidence
    - Uncertainty estimates are themselves imperfect
    - Combine several methods
    - Validate regularly

  3. Using domain knowledge
    - Encode domain knowledge in the priors
    - Set constraints appropriately
    - Handle outliers carefully

  4. Continuous improvement
    - Update with new data
    - Re-optimize hyperparameters
    - Refresh the model ensemble
The Bayesian approach is a powerful tool for coping with the high uncertainty of crypto assets. Implemented carefully, it enables more reliable predictions and more robust trading strategies.