ML Documentation

ARIMAとSARIMAによる暗号通貨時系列分析ガイド

1. はじめに

このドキュメントは、GitHubリポジトリ「Univariate-time-series-analysis-of-cryptocurrency-data-with-ARIMA-and-SARIMA-and-hypergrid-search」の分析結果をまとめたものです。ARIMA(自己回帰和分移動平均)モデルとSARIMA(季節性ARIMA)モデルを使用した暗号通貨価格の時系列予測について詳しく解説します。

2. ARIMAモデルの基礎

2.1 ARIMAとは

ARIMA(p,d,q)モデルは、時系列データの予測に広く使用される統計モデルです:

数式表現:

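$$(1 - \phi_1 L - \phi_2 L^2 - \cdots - \phi_p L^p)(1 - L)^d y_t = (1 + \theta_1 L + \theta_2 L^2 + \cdots + \theta_q L^q)\,\varepsilon_t$$

ここで、$L$ はラグ演算子($L y_t = y_{t-1}$)、$p$ は自己回帰(AR)次数、$d$ は差分次数、$q$ は移動平均(MA)次数、$\varepsilon_t$ は誤差項です。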

2.2 ARIMAモデルの前提条件

  1. 定常性: 時系列の統計的性質が時間によって変化しない
  2. 線形性: 将来の値が過去の値の線形結合で表現できる
  3. 誤差項の独立性: 誤差項が互いに独立で同一分布に従う

3. SARIMAモデル

3.1 季節性の考慮

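SARIMA(p,d,q)(P,D,Q)sモデルは、ARIMAに季節性成分(季節自己回帰次数 P、季節差分次数 D、季節移動平均次数 Q、季節周期 s)を追加したモデルです:

$$\Phi_P(L^s)\,\phi_p(L)\,(1 - L)^d (1 - L^s)^D y_t = \Theta_Q(L^s)\,\theta_q(L)\,\varepsilon_t$$

ここで、$\phi_p(L)$ と $\theta_q(L)$ は非季節のAR・MA多項式、$\Phi_P(L^s)$ と $\Theta_Q(L^s)$ は周期 $s$ の季節AR・MA多項式です。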

3.2 暗号通貨における季節性

暗号通貨市場では周期性が観察される場合があります。本ドキュメントの実装(4.3節の find_seasonal_period)では、週次(7日)、2週(14日)、月次(30日)、四半期(90日)、年次(365日)を候補周期として検証します。

4. 実装コード

4.1 データの準備と前処理

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
import warnings
warnings.filterwarnings('ignore')

class CryptoTimeSeriesAnalyzer:
    """Stationarity tests and ACF/PACF diagnostics for a single time series.

    Attributes:
        data: The series handed to the constructor (kept by reference).
        original_data: A copy of ``data`` taken at construction time.
    """

    def __init__(self, data):
        self.data = data
        self.original_data = data.copy()

    def check_stationarity(self, series, name='Series'):
        """Run the ADF and KPSS tests on ``series`` and print a report.

        Args:
            series: Series to test; NaN values are dropped before testing.
            name: Label used in the printed report header.

        Returns:
            True when the ADF p-value is below 0.05 *and* the KPSS p-value
            is above 0.05, i.e. both tests point to stationarity.
        """
        clean = series.dropna()

        # Augmented Dickey-Fuller test
        adf_result = adfuller(clean)
        print(f"\n=== {name} の定常性検定 ===")
        print(f"ADF統計量: {adf_result[0]:.4f}")
        print(f"p値: {adf_result[1]:.4f}")
        print("臨界値:")
        for key, value in adf_result[4].items():
            print(f"  {key}: {value:.4f}")

        # KPSS test (constant-only regression)
        kpss_result = kpss(clean, regression='c')
        print(f"\nKPSS統計量: {kpss_result[0]:.4f}")
        print(f"p値: {kpss_result[1]:.4f}")

        # Verdict: the two tests must agree
        is_stationary = adf_result[1] < 0.05 and kpss_result[1] > 0.05
        print(f"\n定常性: {'あり' if is_stationary else 'なし'}")

        return is_stationary

    def make_stationary(self, series, max_diff=2):
        """Difference ``series`` until it tests stationary or ``max_diff`` is hit.

        Args:
            series: Input series (left unmodified; a copy is differenced).
            max_diff: Maximum number of first differences to apply.

        Returns:
            Tuple ``(series, d)`` with the (possibly differenced) series and
            the number of differences applied. When ``d == max_diff`` the
            returned series has been differenced ``max_diff`` times but has
            not itself been tested.
        """
        current_series = series.copy()

        for diff_count in range(max_diff):
            if self.check_stationarity(current_series, f"{diff_count}次差分"):
                return current_series, diff_count
            current_series = current_series.diff().dropna()

        return current_series, max(max_diff, 0)

    def plot_diagnostics(self, series, lags=40):
        """Plot the series together with its ACF and PACF.

        Args:
            series: Series to plot; NaN values are dropped for ACF/PACF.
            lags: Requested number of lags. It is capped below half the
                number of non-NaN observations, because ``plot_pacf``
                rejects larger values.
        """
        clean = series.dropna()
        # Same cap for both plots so the two panels stay comparable.
        max_lags = max(1, min(lags, len(clean) // 2 - 1))

        _, axes = plt.subplots(3, 1, figsize=(12, 10))

        # Raw series
        series.plot(ax=axes[0])
        axes[0].set_title('時系列データ')

        # Autocorrelation
        plot_acf(clean, lags=max_lags, ax=axes[1])
        axes[1].set_title('自己相関関数 (ACF)')

        # Partial autocorrelation
        plot_pacf(clean, lags=max_lags, ax=axes[2])
        axes[2].set_title('偏自己相関関数 (PACF)')

        plt.tight_layout()
        plt.show()

4.2 ARIMAモデルの実装

class ARIMAModel:
    """Convenience wrapper around statsmodels ``ARIMA``.

    Attributes:
        data: The object passed to the constructor (stored, not used by the
            methods below, which take the series explicitly).
        model: The underlying ``ARIMA`` model once fitted, else None.
        results: The fitted results object once fitted, else None.
        params: Best ``(p, d, q)`` found by :meth:`auto_arima`, else None.
    """

    def __init__(self, data):
        self.data = data
        self.model = None
        self.results = None
        self.params = None

    def auto_arima(self, series, max_p=5, max_d=2, max_q=5):
        """Grid-search ``(p, d, q)`` and keep the fit with the lowest AIC.

        Args:
            series: Series to fit.
            max_p: Largest AR order to try (inclusive).
            max_d: Largest differencing order to try (inclusive).
            max_q: Largest MA order to try (inclusive).

        Returns:
            The fitted results with the lowest AIC, or None if no candidate
            could be estimated. The winner is also stored on ``results`` so
            that :meth:`forecast` works straight after the search.
        """
        from itertools import product

        best_aic = np.inf
        best_params = None
        best_model = None

        # NOTE(review): AIC is compared across different ``d`` here, as in
        # the original search; confirm this is the intended selection rule.
        orders = product(range(max_p + 1), range(max_d + 1), range(max_q + 1))
        for order in orders:
            try:
                results = ARIMA(series, order=order).fit()
            except Exception:
                # Some orders cannot be estimated; skip them (best effort).
                continue

            if results.aic < best_aic:
                best_aic = results.aic
                best_params = order
                best_model = results

        print(f"最適パラメータ: ARIMA{best_params}")
        print(f"AIC: {best_aic:.2f}")

        # Mirror fit(): ``model`` is the model, ``results`` the fitted results.
        self.results = best_model
        self.model = best_model.model if best_model is not None else None
        self.params = best_params

        return best_model

    def fit(self, series, order):
        """Fit an ARIMA model of the given order and print its summary.

        Args:
            series: Series to fit.
            order: ``(p, d, q)`` tuple.

        Returns:
            The fitted results object (also stored on ``results``).
        """
        self.model = ARIMA(series, order=order)
        self.results = self.model.fit()

        # Model diagnostics
        print(self.results.summary())

        return self.results

    def forecast(self, steps, alpha=0.05):
        """Forecast ``steps`` periods ahead with a prediction interval.

        Args:
            steps: Number of periods to forecast.
            alpha: Significance level of the interval (0.05 -> 95%).

        Returns:
            DataFrame with columns ``forecast``, ``lower`` and ``upper``,
            indexed like the model's predicted mean when that has an index.
        """
        prediction = self.results.get_forecast(steps=steps)
        mean = prediction.predicted_mean
        # conf_int may be a DataFrame or an ndarray depending on the input
        # type, so read the bounds by position.
        bounds = np.asarray(prediction.conf_int(alpha=alpha))

        return pd.DataFrame(
            {
                'forecast': np.asarray(mean),
                'lower': bounds[:, 0],
                'upper': bounds[:, 1],
            },
            index=getattr(mean, 'index', None),
        )

    def plot_results(self, train, test, forecast_df):
        """Plot training data, test data and the forecast with its band.

        Args:
            train: Training series.
            test: Test series.
            forecast_df: DataFrame as returned by :meth:`forecast`.
        """
        plt.figure(figsize=(12, 6))

        # Observed data
        plt.plot(train.index, train.values, label='訓練データ', color='blue')
        plt.plot(test.index, test.values, label='テストデータ', color='green')

        # Point forecast
        plt.plot(forecast_df.index, forecast_df['forecast'],
                 label='予測', color='red', linestyle='--')

        # Prediction interval
        plt.fill_between(forecast_df.index,
                         forecast_df['lower'],
                         forecast_df['upper'],
                         alpha=0.3, color='red')

        plt.xlabel('日付')
        plt.ylabel('価格')
        plt.title('ARIMA予測結果')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()

    def residual_diagnostics(self):
        """Plot residual diagnostics and print the Ljung-Box test table."""
        from scipy import stats
        from statsmodels.stats.diagnostic import acorr_ljungbox

        residuals = self.results.resid

        _, axes = plt.subplots(2, 2, figsize=(12, 8))

        # Residuals over time
        residuals.plot(ax=axes[0, 0])
        axes[0, 0].set_title('残差')

        # Residual histogram
        residuals.hist(bins=30, ax=axes[0, 1])
        axes[0, 1].set_title('残差の分布')

        # Q-Q plot against the normal distribution
        stats.probplot(residuals, dist="norm", plot=axes[1, 0])
        axes[1, 0].set_title('Q-Qプロット')

        # Residual autocorrelation
        plot_acf(residuals, lags=20, ax=axes[1, 1])
        axes[1, 1].set_title('残差のACF')

        plt.tight_layout()
        plt.show()

        # Ljung-Box test
        lb_result = acorr_ljungbox(residuals, lags=10, return_df=True)
        print("\nLjung-Box検定:")
        print(lb_result)

4.3 SARIMAモデルの実装

class SARIMAModel:
    """Convenience wrapper around statsmodels ``SARIMAX`` for seasonal ARIMA.

    Attributes:
        data: The object passed to the constructor (stored, not used by the
            methods below, which take the series explicitly).
        model: The ``SARIMAX`` model created by :meth:`fit`, else None.
        results: The fitted results created by :meth:`fit`, else None.
    """

    def __init__(self, data):
        self.data = data
        self.model = None
        self.results = None

    def find_seasonal_period(self, series, max_lag=365):
        """Pick the candidate period whose decomposition residual varies least.

        Candidates are 7, 14, 30, 90 and 365 observations. A candidate is
        skipped when it exceeds ``max_lag``, when the series holds fewer
        than two full cycles, or when the decomposition fails.

        Args:
            series: Series to decompose.
            max_lag: Largest period to consider (the default keeps all
                candidates).

        Returns:
            The selected period, or None when no candidate was usable.
        """
        from statsmodels.tsa.seasonal import seasonal_decompose

        # week, two weeks, month, quarter, year
        candidate_periods = [p for p in (7, 14, 30, 90, 365) if p <= max_lag]

        best_period = None
        min_residual_var = np.inf

        for period in candidate_periods:
            if len(series) < 2 * period:
                continue

            try:
                decomposition = seasonal_decompose(
                    series, model='additive', period=period
                )
                residual_var = decomposition.resid.dropna().var()
            except Exception:
                # Best effort: an unusable candidate is simply skipped.
                continue

            if residual_var < min_residual_var:
                min_residual_var = residual_var
                best_period = period

        print(f"検出された季節周期: {best_period}")
        return best_period

    def grid_search_sarima(self, series, seasonal_period,
                          p_range=(0, 3), d_range=(0, 2), q_range=(0, 3),
                          P_range=(0, 2), D_range=(0, 1), Q_range=(0, 2)):
        """Grid-search SARIMA orders and return the lowest-AIC combination.

        Every ``*_range`` is an inclusive ``(low, high)`` pair.

        Args:
            series: Series to fit.
            seasonal_period: Seasonal cycle length ``s``.
            p_range, d_range, q_range: Bounds for the non-seasonal orders.
            P_range, D_range, Q_range: Bounds for the seasonal orders.

        Returns:
            Tuple ``(order, seasonal_order)``; both are None when no
            candidate could be estimated.
        """
        from itertools import product

        best_aic = np.inf
        best_params = None
        best_seasonal_params = None

        bounds = (p_range, d_range, q_range, P_range, D_range, Q_range)
        axes = [range(low, high + 1) for low, high in bounds]

        total_combinations = 1
        for axis in axes:
            total_combinations *= len(axis)

        print(f"総組み合わせ数: {total_combinations}")

        for p, d, q, P, D, Q in product(*axes):
            try:
                model = SARIMAX(
                    series,
                    order=(p, d, q),
                    seasonal_order=(P, D, Q, seasonal_period),
                    enforce_stationarity=False,
                    enforce_invertibility=False
                )
                results = model.fit(disp=False)
            except Exception:
                # Best effort: combinations that fail to estimate are skipped.
                continue

            if results.aic < best_aic:
                best_aic = results.aic
                best_params = (p, d, q)
                best_seasonal_params = (P, D, Q, seasonal_period)

        print(f"\n最適パラメータ:")
        print(f"ARIMA{best_params} x SARIMA{best_seasonal_params}")
        print(f"AIC: {best_aic:.2f}")

        return best_params, best_seasonal_params

    def fit(self, series, order, seasonal_order):
        """Fit a SARIMA model and print its summary.

        Args:
            series: Series to fit.
            order: ``(p, d, q)`` tuple.
            seasonal_order: ``(P, D, Q, s)`` tuple.

        Returns:
            The fitted results object (also stored on ``results``).
        """
        self.model = SARIMAX(
            series,
            order=order,
            seasonal_order=seasonal_order,
            enforce_stationarity=False,
            enforce_invertibility=False
        )
        self.results = self.model.fit(disp=False)

        print(self.results.summary())

        return self.results

    def forecast(self, steps):
        """Forecast ``steps`` periods ahead with the default interval.

        Args:
            steps: Number of periods to forecast.

        Returns:
            DataFrame with columns ``forecast``, ``lower`` and ``upper``.
        """
        # One get_forecast call supplies both the mean and the interval.
        prediction = self.results.get_forecast(steps=steps)
        mean = prediction.predicted_mean
        bounds = np.asarray(prediction.conf_int())

        return pd.DataFrame(
            {
                'forecast': np.asarray(mean),
                'lower': bounds[:, 0],
                'upper': bounds[:, 1],
            },
            index=getattr(mean, 'index', None),
        )

4.4 ハイパーグリッドサーチの実装

class HyperGridSearch:
    """Out-of-sample grid search over ARIMA / SARIMA orders.

    Attributes:
        series: The series passed to the constructor.
        results: Empty list created at construction; the methods visible
            here never write to it.
    """

    # Column order of the result table (also used for the empty case).
    _RESULT_COLUMNS = ('order', 'seasonal_order', 'aic', 'bic',
                       'mse', 'mae', 'mape')

    def __init__(self, series):
        self.series = series
        self.results = []

    @staticmethod
    def _forecast_metrics(forecast, test):
        """Return MSE, MAE and MAPE (%) of ``forecast`` against ``test``.

        Values are compared by position, not by index label, so a forecast
        carrying a different index than ``test`` is still scored correctly.
        MAPE skips observations equal to zero and is NaN if none remain.
        """
        predicted = np.asarray(forecast, dtype=float)
        observed = np.asarray(test, dtype=float)
        errors = predicted - observed

        nonzero = observed != 0
        if nonzero.any():
            mape = float(np.mean(np.abs(errors[nonzero] / observed[nonzero])) * 100)
        else:
            mape = float('nan')

        return {
            'mse': float(np.mean(errors ** 2)),
            'mae': float(np.mean(np.abs(errors))),
            'mape': mape,
        }

    @classmethod
    def _rank_results(cls, results):
        """Drop failed (None) evaluations and sort the rest by AIC."""
        valid_results = [r for r in results if r is not None]
        if not valid_results:
            # Sorting an empty frame by 'aic' would raise; return the schema.
            return pd.DataFrame(columns=list(cls._RESULT_COLUMNS))
        return pd.DataFrame(valid_results).sort_values('aic')

    def evaluate_model(self, train, test, order, seasonal_order=None):
        """Fit one candidate on ``train`` and score it on ``test``.

        Args:
            train: Training series.
            test: Hold-out series; its length sets the forecast horizon.
            order: ``(p, d, q)`` tuple.
            seasonal_order: ``(P, D, Q, s)`` tuple; a falsy value selects a
                plain ARIMA model.

        Returns:
            Dict with ``order``, ``seasonal_order``, ``aic``, ``bic``,
            ``mse``, ``mae`` and ``mape``, or None when the candidate could
            not be fitted or forecast.
        """
        try:
            if seasonal_order:
                fitted = SARIMAX(
                    train, order=order, seasonal_order=seasonal_order
                ).fit(disp=False)
            else:
                # ARIMA.fit() takes no ``disp`` argument; passing it raised a
                # TypeError that the old bare except hid, so every ARIMA
                # candidate used to be discarded.
                fitted = ARIMA(train, order=order).fit()

            forecast = fitted.forecast(steps=len(test))
        except Exception:
            # Best effort: a candidate that fails is reported as None.
            return None

        result = {
            'order': order,
            'seasonal_order': seasonal_order,
            'aic': fitted.aic,
            'bic': fitted.bic,
        }
        result.update(self._forecast_metrics(forecast, test))
        return result

    def parallel_grid_search(self, train, test, param_grid, n_jobs=-1):
        """Evaluate every entry of ``param_grid`` in parallel.

        Args:
            train: Training series.
            test: Hold-out series.
            param_grid: Iterable of argument tuples that are unpacked after
                ``train, test`` in :meth:`evaluate_model`, i.e.
                ``(order,)`` or ``(order, seasonal_order)``.
            n_jobs: Passed to joblib's ``Parallel``.

        Returns:
            DataFrame of the successful evaluations sorted by ascending AIC;
            an empty frame with the usual columns when none succeeded.
        """
        from joblib import Parallel, delayed

        param_combinations = list(param_grid)

        results = Parallel(n_jobs=n_jobs)(
            delayed(self.evaluate_model)(train, test, *params)
            for params in param_combinations
        )

        return self._rank_results(results)

4.5 暗号通貨データへの適用

class CryptoARIMAForecaster:
    """Download, prepare, forecast and backtest one cryptocurrency symbol.

    Attributes:
        symbol: Ticker symbol handed to yfinance.
        data: DataFrame set by :meth:`fetch_data`, else None.
        model: Initialised to None; not written by the methods shown here.
    """

    def __init__(self, symbol='BTC-USD'):
        self.symbol = symbol
        self.data = None
        self.model = None

    def fetch_data(self, start_date, end_date):
        """Download the price history and add a ``log_close`` column.

        Args:
            start_date: Start of the period, passed to ``Ticker.history``.
            end_date: End of the period, passed to ``Ticker.history``.

        Returns:
            The downloaded DataFrame (also stored on ``data``).
        """
        import yfinance as yf

        ticker = yf.Ticker(self.symbol)
        self.data = ticker.history(start=start_date, end=end_date)

        # Log-transform the closing price (stabilises the price scale)
        self.data['log_close'] = np.log(self.data['Close'])

        return self.data

    def prepare_features(self):
        """Return a copy of ``data`` with return, volatility and volume features.

        Added columns: ``returns``, ``log_returns``, ``volatility`` (20-row
        rolling std of returns), ``volume_ma`` (20-row rolling mean of
        volume) and ``volume_ratio``. ``self.data`` itself is not modified.
        """
        df = self.data.copy()

        # Returns
        df['returns'] = df['Close'].pct_change()
        df['log_returns'] = df['log_close'].diff()

        # Volatility
        df['volatility'] = df['returns'].rolling(window=20).std()

        # Volume features
        df['volume_ma'] = df['Volume'].rolling(window=20).mean()
        df['volume_ratio'] = df['Volume'] / df['volume_ma']

        return df

    def train_test_split(self, test_size=0.2):
        """Split ``log_close`` chronologically into train and test parts.

        Args:
            test_size: Fraction of rows placed in the test part.

        Returns:
            Tuple ``(train, test)`` of Series; the first
            ``int(n * (1 - test_size))`` rows form the training part.
        """
        log_close = self.data['log_close']
        split_idx = int(len(self.data) * (1 - test_size))

        # .iloc makes the positional intent explicit for any index type
        return log_close.iloc[:split_idx], log_close.iloc[split_idx:]

    def build_ensemble_forecast(self, train, test, horizon):
        """Average an ARIMA and a weekly-seasonal SARIMA forecast.

        Args:
            train: Training series of log prices.
            test: Unused; kept for interface compatibility.
            horizon: Number of periods to forecast.

        Returns:
            Tuple ``(ensemble_forecast, forecasts)``. ``ensemble_forecast``
            has the columns ``forecast`` (mean of both log forecasts) and
            ``price_forecast`` (its exponential); ``forecasts`` maps
            ``'ARIMA'`` / ``'SARIMA'`` to the individual forecast frames.

        Raises:
            RuntimeError: If either parameter search finds no usable model.
        """
        forecasts = {}

        # 1. Plain ARIMA
        arima = ARIMAModel(train)
        arima_best = arima.auto_arima(train)
        if arima_best is None:
            raise RuntimeError("ARIMA grid search found no estimable model")
        # forecast() reads the fitted results from ``results``; make sure the
        # search winner is there before forecasting.
        arima.results = arima_best
        forecasts['ARIMA'] = arima.forecast(horizon)

        # 2. SARIMA with weekly seasonality
        sarima = SARIMAModel(train)
        sarima_params, sarima_seasonal = sarima.grid_search_sarima(
            train, seasonal_period=7
        )
        if sarima_params is None:
            raise RuntimeError("SARIMA grid search found no estimable model")
        sarima.fit(train, sarima_params, sarima_seasonal)
        forecasts['SARIMA'] = sarima.forecast(horizon)

        # 3. Ensemble: element-wise mean of the two point forecasts
        ensemble_forecast = pd.DataFrame()
        ensemble_forecast['forecast'] = np.mean([
            forecasts['ARIMA']['forecast'],
            forecasts['SARIMA']['forecast']
        ], axis=0)

        # Undo the log transform
        ensemble_forecast['price_forecast'] = np.exp(ensemble_forecast['forecast'])

        return ensemble_forecast, forecasts

    def backtest_strategy(self, predictions, actual, initial_capital=10000):
        """Backtest a long/flat strategy driven by predicted returns.

        At each step the whole capital is invested when the predicted
        return is positive and no position is held; the position is closed
        when the predicted return is negative. An open position at the end
        is valued at the last price in ``actual``.

        Args:
            predictions: Series of predicted returns (only the sign is used).
            actual: Series of log prices, read by position.
            initial_capital: Starting cash.

        Returns:
            Dict with ``final_capital``, ``total_return`` and ``trades``
            (a list of ``(side, price, amount)`` tuples).
        """
        capital = initial_capital
        position = 0
        trades = []

        for i, pred_return in enumerate(predictions):
            actual_price = np.exp(actual.iloc[i])

            if pred_return > 0 and position == 0:
                # Buy with all available capital
                position = capital / actual_price
                capital = 0
                trades.append(('buy', actual_price, position))

            elif pred_return < 0 and position > 0:
                # Sell the whole position
                capital = position * actual_price
                position = 0
                trades.append(('sell', actual_price, capital))

        # Mark an open position to market at the final price
        if position > 0:
            capital = position * np.exp(actual.iloc[-1])

        total_return = (capital - initial_capital) / initial_capital

        return {
            'final_capital': capital,
            'total_return': total_return,
            'trades': trades
        }

4.6 モデル診断と検証

class ModelDiagnostics:
    """Residual tests and forecast-accuracy reporting for a fitted model."""

    def __init__(self, model_results):
        # Fitted results object; only its ``resid`` attribute is read here.
        self.results = model_results

    def check_residuals(self):
        """Print normality, autocorrelation and ARCH tests of the residuals."""
        resid = self.results.resid

        # Normality: Jarque-Bera and Shapiro-Wilk
        from scipy.stats import jarque_bera, shapiro

        jb_value, jb_p = jarque_bera(resid)
        sw_value, sw_p = shapiro(resid)

        print("=== 残差の正規性検定 ===")
        print(f"Jarque-Bera検定: 統計量={jb_value:.4f}, p値={jb_p:.4f}")
        print(f"Shapiro-Wilk検定: 統計量={sw_value:.4f}, p値={sw_p:.4f}")

        # Autocorrelation: Ljung-Box over 20 lags, first 10 rows printed
        from statsmodels.stats.diagnostic import acorr_ljungbox

        ljung_box = acorr_ljungbox(resid, lags=20, return_df=True)
        print("\n=== Ljung-Box検定(残差の自己相関)===")
        print(ljung_box[['lb_stat', 'lb_pvalue']].head(10))

        # Conditional heteroskedasticity: ARCH test
        from statsmodels.stats.diagnostic import het_arch

        arch_outcome = het_arch(resid)
        arch_value = arch_outcome[0]
        arch_p = arch_outcome[1]
        print("\n=== ARCH検定 ===")
        print(f"統計量: {arch_value:.4f}")
        print(f"p値: {arch_p:.4f}")

    def plot_forecast_vs_actual(self, forecast, actual):
        """Plot forecast against actuals, plot the errors, print accuracy.

        ``forecast`` needs the columns ``forecast``, ``lower`` and ``upper``;
        ``actual`` is a Series. Errors are ``actual - forecast`` by position.
        """
        _, (top, bottom) = plt.subplots(2, 1, figsize=(12, 8))
        horizon_index = forecast.index

        # Upper panel: actuals, point forecast and interval band
        top.plot(actual.index, actual.values, label='実績', color='blue')
        top.plot(horizon_index, forecast['forecast'],
                 label='予測', color='red', linestyle='--')
        top.fill_between(horizon_index,
                         forecast['lower'],
                         forecast['upper'],
                         alpha=0.3, color='red')
        top.set_title('予測 vs 実績')
        top.legend()
        top.grid(True, alpha=0.3)

        # Lower panel: forecast errors around a zero reference line
        errors = actual.values - forecast['forecast'].values
        bottom.plot(horizon_index, errors, color='green')
        bottom.axhline(y=0, color='black', linestyle='-', alpha=0.5)
        bottom.set_title('予測誤差')
        bottom.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

        # Accuracy summary
        mae = np.mean(np.abs(errors))
        rmse = np.sqrt(np.mean(errors**2))
        mape = np.mean(np.abs(errors / actual.values)) * 100
        print("=== 予測精度 ===")
        print(f"MAE: {mae:.4f}")
        print(f"RMSE: {rmse:.4f}")
        print(f"MAPE: {mape:.2f}%")

5. 実践例:Bitcoin価格予測

# Complete end-to-end example
def run_bitcoin_analysis():
    """Run the full Bitcoin workflow: data, grid search, forecast, backtest.

    Returns:
        Tuple ``(final_results, forecast, backtest_results)``: the fitted
        ARIMA results, the log-price forecast over the test period, and the
        dict returned by ``backtest_strategy``.

    Raises:
        RuntimeError: If the grid search yields no usable model.
    """
    # 1. Data preparation
    forecaster = CryptoARIMAForecaster('BTC-USD')
    data = forecaster.fetch_data('2020-01-01', '2023-12-31')

    # 2./3. Stationarity report (printed for information only; the grid
    # below still scans d = 0..2 on its own)
    analyzer = CryptoTimeSeriesAnalyzer(data['log_close'])
    analyzer.make_stationary(data['log_close'])

    # 4. Train/test split
    train, test = forecaster.train_test_split(test_size=0.2)

    # 5. Grid search over plain ARIMA orders (no seasonal component)
    param_grid = [
        ((p, d, q), None)
        for p in range(5)
        for d in range(3)
        for q in range(5)
    ]

    grid_search = HyperGridSearch(train)
    results_df = grid_search.parallel_grid_search(
        train, test, param_grid, n_jobs=-1
    )

    print("\n=== トップ5モデル ===")
    print(results_df.head())

    if results_df.empty:
        raise RuntimeError("Grid search produced no usable ARIMA model")

    # 6. Refit the lowest-AIC order
    best_order = results_df.iloc[0]['order']
    final_model = ARIMA(train, order=best_order)
    final_results = final_model.fit()

    # 7. Forecast over the test period
    forecast_horizon = len(test)
    forecast = final_results.forecast(steps=forecast_horizon)

    # 8. Diagnostics
    diagnostics = ModelDiagnostics(final_results)
    diagnostics.check_residuals()

    # 9. Backtest. The strategy acts on the sign of a predicted return, but
    # ``forecast`` holds log prices (always positive), which would trigger a
    # single buy and nothing else. Feed it the predicted step-over-step log
    # returns instead, anchored on the last training observation.
    log_path = np.concatenate(([train.iloc[-1]], np.asarray(forecast)))
    predicted_returns = pd.Series(np.diff(log_path), index=test.index)

    backtest_results = forecaster.backtest_strategy(
        predicted_returns, test, initial_capital=10000
    )

    print("\n=== バックテスト結果 ===")
    print("初期資本: $10,000")
    print(f"最終資本: ${backtest_results['final_capital']:,.2f}")
    print(f"総リターン: {backtest_results['total_return']:.2%}")

    return final_results, forecast, backtest_results

# Run the end-to-end example only when this file is executed as a script.
if __name__ == "__main__":
    results, forecast, backtest = run_bitcoin_analysis()

6. 高度な手法

6.1 外生変数を含むARIMAX

class ARIMAXModel:
    """ARIMA with exogenous regressors, fitted through ``SARIMAX``.

    Attributes:
        model: The ``SARIMAX`` model created by :meth:`fit`, else None.
        results: The fitted results created by :meth:`fit`, else None.
    """

    def __init__(self):
        self.model = None
        self.results = None

    def calculate_rsi(self, prices, period=14):
        """Return the Relative Strength Index of ``prices``.

        Average gains and losses use Wilder's smoothing (exponential
        weighting with ``alpha = 1 / period``). Rows with fewer than
        ``period`` price changes available are NaN.

        Args:
            prices: Price Series.
            period: Smoothing length.

        Returns:
            Series on the same index, in [0, 100] where defined; 100 when
            there were no losses, NaN when there were neither gains nor
            losses.
        """
        delta = prices.diff()
        gains = delta.clip(lower=0)
        losses = (-delta).clip(lower=0)

        avg_gain = gains.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
        avg_loss = losses.ewm(alpha=1 / period, min_periods=period, adjust=False).mean()

        relative_strength = avg_gain / avg_loss
        return 100 - 100 / (1 + relative_strength)

    def calculate_macd(self, prices, fast=12, slow=26, signal=9):
        """Return the MACD line, its signal line and the histogram.

        Args:
            prices: Price Series.
            fast: Span of the fast exponential moving average.
            slow: Span of the slow exponential moving average.
            signal: Span of the signal-line moving average.

        Returns:
            Tuple ``(macd, signal_line, histogram)`` of Series, where
            ``macd = EMA(fast) - EMA(slow)`` and
            ``histogram = macd - signal_line``.
        """
        ema_fast = prices.ewm(span=fast, adjust=False).mean()
        ema_slow = prices.ewm(span=slow, adjust=False).mean()
        macd = ema_fast - ema_slow
        signal_line = macd.ewm(span=signal, adjust=False).mean()
        return macd, signal_line, macd - signal_line

    def prepare_exogenous_variables(self, data):
        """Build the exogenous regressor table from price/volume data.

        Args:
            data: DataFrame with ``Close`` and ``Volume`` columns and an
                index exposing ``dayofweek`` (e.g. a DatetimeIndex).

        Returns:
            DataFrame on ``data``'s index with the columns ``volume``,
            ``rsi``, ``macd``, ``volatility`` and ``weekday_0`` ..
            ``weekday_6``. The leading rows of ``rsi`` and ``volatility``
            are NaN (warm-up windows).
            NOTE(review): drop or fill those rows, together with the
            matching ``endog`` rows, before calling :meth:`fit` - confirm
            how the caller handles this.
        """
        exog = pd.DataFrame(index=data.index)

        # Trading volume (log scale; +1 keeps zero volume finite)
        exog['volume'] = np.log(data['Volume'] + 1)

        # Technical indicators
        exog['rsi'] = self.calculate_rsi(data['Close'])
        exog['macd'] = self.calculate_macd(data['Close'])[0]

        # Market indicator: 20-row rolling std of simple returns
        exog['volatility'] = data['Close'].pct_change().rolling(20).std()

        # Day-of-week dummy variables
        for i in range(7):
            exog[f'weekday_{i}'] = (data.index.dayofweek == i).astype(int)

        return exog

    def fit(self, endog, exog, order):
        """Fit the ARIMAX model.

        Args:
            endog: Target series.
            exog: Exogenous regressors aligned with ``endog``.
            order: ``(p, d, q)`` tuple.

        Returns:
            The fitted results object (also stored on ``results``).
        """
        self.model = SARIMAX(
            endog=endog,
            exog=exog,
            order=order,
            enforce_stationarity=False,
            enforce_invertibility=False
        )
        self.results = self.model.fit(disp=False)

        return self.results

6.2 動的ファクターモデル

class DynamicFactorARIMA:
    """ARMA(1, 1) model of a return series with lagged PCA factors as regressors.

    Attributes:
        factor_model: Initialised to None; not written by the methods here.
        arima_model: Initialised to None; not written by the methods here.
    """

    def __init__(self):
        self.factor_model = None
        self.arima_model = None

    def extract_factors(self, returns_matrix, n_factors=3):
        """Extract principal-component factors from a returns matrix.

        Args:
            returns_matrix: 2-D data passed to ``PCA.fit_transform``.
            n_factors: Number of components to keep.

        Returns:
            Tuple ``(factors, pca)`` with the transformed data and the
            fitted ``PCA`` object.
        """
        from sklearn.decomposition import PCA

        pca = PCA(n_components=n_factors)
        factors = pca.fit_transform(returns_matrix)

        # Share of variance explained by each factor
        print(f"説明分散比: {pca.explained_variance_ratio_}")

        return factors, pca

    @staticmethod
    def _lagged_design(target_returns, factors, max_lag):
        """Build the lagged-factor design matrix and the aligned target.

        Row ``k`` corresponds to time ``t = max_lag + k`` and contains
        ``factors[t - 1], factors[t - 2], ..., factors[t - max_lag]``
        concatenated in that order.
        """
        y_all = np.asarray(target_returns, dtype=float)
        f_all = np.asarray(factors, dtype=float)
        if f_all.ndim == 1:
            # A single factor given as a flat vector
            f_all = f_all.reshape(-1, 1)

        n_obs = len(y_all)
        if max_lag < 1:
            raise ValueError("max_lag must be at least 1")
        if n_obs <= max_lag:
            raise ValueError("need more observations than max_lag")
        if f_all.shape[0] < n_obs - 1:
            raise ValueError("factors has too few rows for target_returns")

        X = np.hstack([
            f_all[max_lag - lag:n_obs - lag]
            for lag in range(1, max_lag + 1)
        ])
        return X, y_all[max_lag:]

    def build_factor_arima(self, target_returns, factors, max_lag=5):
        """Fit ``SARIMAX`` with order (1, 0, 1) on lagged factors.

        Inputs are read by position, so a date-indexed Series works the
        same as a plain array.

        Args:
            target_returns: 1-D target series.
            factors: Factor matrix with one row per observation.
            max_lag: Number of factor lags used as regressors.

        Returns:
            The fitted results object.

        Raises:
            ValueError: If ``max_lag < 1``, if there are not more
                observations than ``max_lag``, or if ``factors`` has too
                few rows.
        """
        X, y = self._lagged_design(target_returns, factors, max_lag)

        # Factors enter the ARMA model as exogenous regressors
        model = SARIMAX(
            endog=y,
            exog=X,
            order=(1, 0, 1)
        )

        results = model.fit()

        return results

6.3 ベイズ構造時系列モデル

class BayesianStructuralTimeSeries:
    """Additive trend + seasonal decomposition with naive extrapolation.

    Attributes:
        components: Dict filled by the ``add_*`` methods with the keys
            ``trend``, ``cycle``, ``seasonal`` and ``residual``.
        seasonal_period: Period given to :meth:`add_seasonal_component`,
            else None.
    """

    def __init__(self):
        self.components = {}
        self.seasonal_period = None

    def add_trend_component(self, series, lamb=1600):
        """Extract trend and cycle with the Hodrick-Prescott filter.

        Args:
            series: Input series.
            lamb: HP smoothing parameter passed to ``hpfilter``.
        """
        from statsmodels.tsa.filters.hp_filter import hpfilter

        # hpfilter returns the cyclical part first, then the trend; the
        # original code bound them as (trend, cycle), which swapped them.
        cycle, trend = hpfilter(series, lamb=lamb)

        self.components['trend'] = trend
        self.components['cycle'] = cycle

    def add_seasonal_component(self, series, period):
        """Extract the additive seasonal component.

        Args:
            series: Input series.
            period: Seasonal cycle length; remembered for forecasting.
        """
        from statsmodels.tsa.seasonal import seasonal_decompose

        decomposition = seasonal_decompose(
            series,
            model='additive',
            period=period
        )

        self.components['seasonal'] = decomposition.seasonal
        self.components['residual'] = decomposition.resid
        self.seasonal_period = period

    def forecast_components(self, horizon):
        """Extrapolate trend and seasonality ``horizon`` steps ahead.

        The trend is extended with a straight line fitted to the stored
        trend; the seasonal part repeats its last full cycle.

        Args:
            horizon: Number of steps to forecast.

        Returns:
            Tuple ``(total_forecast, forecasts)`` where ``forecasts`` maps
            ``'trend'`` and ``'seasonal'`` to arrays of length ``horizon``
            and ``total_forecast`` is their sum.

        Raises:
            ValueError: If no positive seasonal period can be determined.
        """
        forecasts = {}

        # Trend: linear extrapolation
        trend = np.asarray(self.components['trend'], dtype=float)
        n_obs = len(trend)
        trend_model = np.polyfit(np.arange(n_obs), trend, 1)
        forecasts['trend'] = np.polyval(
            trend_model,
            np.arange(n_obs, n_obs + horizon)
        )

        # Seasonality: repeat the last cycle
        seasonal = self.components['seasonal']
        period = getattr(self, 'seasonal_period', None)
        if period is None:
            # Fallback when the component was set by hand: count distinct
            # seasonal values (under-counts if a value repeats in a cycle).
            period = int(pd.Series(seasonal).dropna().nunique())
        if period < 1:
            raise ValueError("seasonal period must be at least 1")

        last_cycle = np.asarray(seasonal, dtype=float)[-period:]
        forecasts['seasonal'] = np.tile(
            last_cycle,
            (horizon // period) + 1
        )[:horizon]

        # Combined forecast
        total_forecast = forecasts['trend'] + forecasts['seasonal']

        return total_forecast, forecasts

7. モデルの比較と選択

7.1 モデル比較フレームワーク

class ModelComparison:
    """Fit several registered models on the same data and compare them.

    A registered model class must be constructible as
    ``model_class(**params)``, expose ``fit(train)`` returning a fitted
    object, and that object must expose ``forecast(steps=...)``.

    Attributes:
        models: Maps model name to ``{'class': ..., 'params': ...}``.
        results: Empty dict created at construction; not written by the
            methods shown here.
    """

    # Column order of the comparison table (also used for the empty case).
    _COLUMNS = ('model', 'mae', 'rmse', 'mape', 'aic', 'bic')

    def __init__(self):
        self.models = {}
        self.results = {}

    def add_model(self, name, model_class, params):
        """Register a model under ``name``.

        Args:
            name: Label used in tables and plot legends.
            model_class: Class (or factory) called as ``model_class(**params)``.
            params: Keyword arguments for ``model_class``.
        """
        self.models[name] = {
            'class': model_class,
            'params': params
        }

    def compare_models(self, train, test):
        """Fit every registered model and score it on ``test``.

        Forecast and test values are compared by position, not by index
        label. A model that raises is reported on stdout and left out.

        Args:
            train: Training series.
            test: Hold-out series; its length sets the forecast horizon.

        Returns:
            DataFrame with the columns ``model``, ``mae``, ``rmse``,
            ``mape``, ``aic`` and ``bic`` sorted by ascending RMSE; empty
            (with those columns) when no model succeeded.
        """
        comparison_results = []
        observed = np.asarray(test, dtype=float)

        for name, model_info in self.models.items():
            try:
                # Fit
                model = model_info['class'](**model_info['params'])
                fitted = model.fit(train)

                # Forecast
                forecast = fitted.forecast(steps=len(test))
                errors = np.asarray(forecast, dtype=float) - observed

                comparison_results.append({
                    'model': name,
                    'mae': np.mean(np.abs(errors)),
                    'rmse': np.sqrt(np.mean(errors ** 2)),
                    'mape': np.mean(np.abs(errors / observed)) * 100,
                    # Information criteria, when the fitted object has them
                    'aic': getattr(fitted, 'aic', np.nan),
                    'bic': getattr(fitted, 'bic', np.nan),
                })

            except Exception as e:
                print(f"モデル {name} でエラー: {e}")

        if not comparison_results:
            # Sorting an empty frame by 'rmse' would raise; return the schema.
            return pd.DataFrame(columns=list(self._COLUMNS))

        return pd.DataFrame(comparison_results).sort_values('rmse')

    def plot_forecast_comparison(self, train, test, horizon):
        """Plot each model's forecast against the recent history and test data.

        Args:
            train: Training series (its last 100 points are drawn).
            test: Hold-out series.
            horizon: Number of steps to forecast; capped at ``len(test)``
                because forecasts are drawn on the test index.
        """
        plt.figure(figsize=(14, 8))

        # Observed data
        plt.plot(train.index[-100:], train.values[-100:],
                 label='訓練データ', color='black', alpha=0.7)
        plt.plot(test.index, test.values,
                 label='実データ', color='black', linewidth=2)

        # One dashed line per model
        colors = ['blue', 'red', 'green', 'orange', 'purple']
        steps = min(horizon, len(test))

        for i, (name, model_info) in enumerate(self.models.items()):
            try:
                model = model_info['class'](**model_info['params'])
                fitted = model.fit(train)
                forecast = fitted.forecast(steps=steps)

                plt.plot(test.index[:steps], np.asarray(forecast),
                         label=f'{name}予測',
                         color=colors[i % len(colors)],
                         linestyle='--')

            except Exception as e:
                # Keep plotting the other models, but say which one failed.
                print(f"モデル {name} でエラー: {e}")

        plt.xlabel('日付')
        plt.ylabel('価格(対数)')
        plt.title('モデル予測の比較')
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()

8. 実装上の注意点とベストプラクティス

8.1 データの前処理

  1. 対数変換: 価格データは対数変換で安定化
  2. 差分処理: トレンドの除去
  3. 外れ値処理: 異常な価格スパイクの処理
  4. 欠損値: 取引所メンテナンス時のデータ補完

8.2 モデル選択の指針

8.3 評価と検証

def rolling_forecast_evaluation(model_class, data, window_size=100, step_size=1):
    """Evaluate one-step forecasts over a rolling training window.

    For each evaluated position ``i`` the model is built as
    ``model_class(train)`` on the ``window_size`` observations before ``i``,
    ``model.forecast(step_size)`` is called, and its first value is
    compared with ``data[i]``. Data is read by position, so pandas objects
    with any index are supported.

    Args:
        model_class: Callable taking the training window and returning an
            object with a ``forecast(steps)`` method.
        data: Sequence or pandas Series of observations.
        window_size: Number of observations in each training window.
        step_size: Distance between evaluated positions; also the number of
            steps requested from ``forecast``.

    Returns:
        Dict with ``mae``, ``rmse`` and ``directional_accuracy`` (share of
        consecutive evaluations where forecast and actual moved in the same
        direction). Metrics that cannot be computed (no evaluations, or a
        single one for the directional accuracy) are NaN.
    """
    positional = data.iloc if hasattr(data, 'iloc') else data
    n_obs = len(data)

    forecasts = []
    actuals = []

    # The stop bound is inclusive of ``n_obs - step_size`` so the last
    # position with a full forecast horizon is evaluated too.
    for i in range(window_size, n_obs - step_size + 1, step_size):
        # Training window
        train = positional[i - window_size:i]

        # Fit and forecast
        model = model_class(train)
        forecast = model.forecast(step_size)

        # First forecast value by position (works for lists, arrays, Series)
        forecasts.append(float(np.asarray(forecast).ravel()[0]))
        actuals.append(float(positional[i]))

    errors = np.asarray(actuals) - np.asarray(forecasts)

    if errors.size == 0:
        return {
            'mae': np.nan,
            'rmse': np.nan,
            'directional_accuracy': np.nan
        }

    if errors.size > 1:
        directional_accuracy = float(np.mean(
            np.sign(np.diff(forecasts)) == np.sign(np.diff(actuals))
        ))
    else:
        directional_accuracy = np.nan

    return {
        'mae': float(np.mean(np.abs(errors))),
        'rmse': float(np.sqrt(np.mean(errors ** 2))),
        'directional_accuracy': directional_accuracy
    }

9. まとめ

ARIMAとSARIMAモデルは暗号通貨価格予測において以下の特徴があります:

利点
- 統計的に確立された手法
- 解釈可能性が高い
- 計算効率が良い
- 短期予測に有効

制限
- 線形性の仮定
- 定常性が必要
- 複雑なパターンの捕捉が困難
- 外部ショックへの対応が弱い

推奨される使用場面
1. 短期的な価格トレンド予測
2. ボラティリティが安定している期間
3. 他の機械学習モデルのベースライン
4. リスク管理のための信頼区間推定

実際の取引システムでは、ARIMAモデルを他の手法(機械学習、テクニカル分析)と組み合わせて使用することが推奨されます。