ML Documentation

不規則時系列予測手法の総合ガイド

📊 概要

不規則時系列データ(Irregular Time Series)は、観測間隔が一定でないデータであり、金融取引(tick data)、医療記録、センサーデータなど多くの実世界アプリケーションで見られます。本ドキュメントでは、Point Process Neural Networks以外の効果的な予測手法を包括的にまとめます。

🔍 不規則時系列の課題

従来手法の限界

特有の問題

  1. 時間情報の損失: リサンプリングによる情報劣化
  2. 不均一な情報密度: 活発な時間帯と閑散期の差
  3. イベント駆動的性質: 時間より出来事が重要

🚀 効果的な予測手法

1. Neural ODE(Neural Ordinary Differential Equations)

連続時間ダイナミクスをニューラルネットワークで学習する手法。

import torch
import torch.nn as nn
from torchdiffeq import odeint

class NeuralODE(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.func = nn.Sequential(
            nn.Linear(hidden_dim, 64),
            nn.Tanh(),
            nn.Linear(64, hidden_dim)
        )

    def forward(self, t, y):
        """ODEの右辺を定義"""
        return self.func(y)

class IrregularTimeSeriesODE(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.encoder = nn.Linear(input_dim, hidden_dim)
        self.ode_func = NeuralODE(hidden_dim)
        self.decoder = nn.Linear(hidden_dim, output_dim)

    def forward(self, observations, observation_times, prediction_times):
        # 初期状態をエンコード
        h0 = self.encoder(observations[0])

        # ODEを解いて任意の時点での状態を取得
        all_times = torch.cat([observation_times, prediction_times])
        trajectories = odeint(self.ode_func, h0, all_times)

        # 予測時点での出力
        predictions = self.decoder(trajectories[len(observation_times):])
        return predictions

利点:
- 任意の時点での予測が可能
- 連続的な潜在ダイナミクスを学習
- メモリ効率的(Adjoint法)

欠点:
- 計算コストが高い
- 学習が不安定になることがある

2. GRU-D(GRU with Decay)

時間減衰を組み込んだGated Recurrent Unit。

class GRUD(nn.Module):
    def __init__(self, input_dim, hidden_dim):
        super().__init__()
        self.hidden_dim = hidden_dim

        # 標準的なGRUゲート
        self.W_r = nn.Linear(input_dim + hidden_dim, hidden_dim)
        self.W_z = nn.Linear(input_dim + hidden_dim, hidden_dim)
        self.W_h = nn.Linear(input_dim + hidden_dim, hidden_dim)

        # 時間減衰パラメータ
        self.W_gamma = nn.Linear(1, hidden_dim)

    def forward(self, x, h_prev, delta_t):
        """
        x: 現在の入力
        h_prev: 前の隠れ状態
        delta_t: 前回からの経過時間
        """
        # 時間減衰の計算
        gamma = torch.exp(-torch.relu(self.W_gamma(delta_t.unsqueeze(-1))))

        # 減衰を適用した隠れ状態
        h_decay = gamma * h_prev

        # GRUの計算
        combined = torch.cat([x, h_decay], dim=-1)
        r = torch.sigmoid(self.W_r(combined))
        z = torch.sigmoid(self.W_z(combined))

        combined_r = torch.cat([x, r * h_decay], dim=-1)
        h_tilde = torch.tanh(self.W_h(combined_r))

        h = (1 - z) * h_decay + z * h_tilde
        return h

class IrregularGRUD(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.grud = GRUD(input_dim, hidden_dim)
        self.output_layer = nn.Linear(hidden_dim, output_dim)

    def forward(self, observations, times):
        batch_size = observations.size(0)
        h = torch.zeros(batch_size, self.grud.hidden_dim)

        outputs = []
        for i in range(len(times)):
            if i == 0:
                delta_t = torch.zeros(batch_size)
            else:
                delta_t = times[i] - times[i-1]

            h = self.grud(observations[i], h, delta_t)
            output = self.output_layer(h)
            outputs.append(output)

        return torch.stack(outputs)

利点:
- 時間間隔を明示的にモデル化
- GRUの利点を保持
- 実装が比較的簡単

欠点:
- 離散的な時点でのみ予測
- 長期依存関係の学習が困難

3. Temporal Point Process(時間点過程)

イベントの発生時刻と種類を同時にモデル化。

class HawkesProcess(nn.Module):
    """Neural Hawkes Process"""
    def __init__(self, hidden_dim):
        super().__init__()
        # 履歴エンコーダ
        self.history_encoder = nn.LSTM(
            input_size=1, 
            hidden_size=hidden_dim,
            batch_first=True
        )

        # 強度関数
        self.intensity_net = nn.Sequential(
            nn.Linear(hidden_dim + 1, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Softplus()  # 正の値を保証
        )

    def compute_intensity(self, history, query_times):
        """任意の時点での強度を計算"""
        # 履歴をエンコード
        _, (h_n, _) = self.history_encoder(history.unsqueeze(-1))
        h_n = h_n.squeeze(0)

        # 各クエリ時点での強度
        intensities = []
        for t in query_times:
            # 時間特徴を追加
            time_feature = (t - history[-1]).unsqueeze(-1)
            features = torch.cat([h_n, time_feature], dim=-1)
            intensity = self.intensity_net(features)
            intensities.append(intensity)

        return torch.stack(intensities)

    def log_likelihood(self, event_times):
        """対数尤度の計算(学習用)"""
        ll = 0
        history = []

        for i, t in enumerate(event_times):
            if i > 0:
                # 強度の積分(数値積分)
                intensity_integral = self._integrate_intensity(
                    history, event_times[i-1], t
                )
                ll -= intensity_integral

            # イベント時点での強度
            if len(history) > 0:
                intensity = self.compute_intensity(
                    torch.tensor(history), 
                    torch.tensor([t])
                )
                ll += torch.log(intensity + 1e-8)

            history.append(t)

        return ll

利点:
- イベントの時間的クラスタリングを捉える
- 自己励起的な性質をモデル化
- 確率的解釈が可能

欠点:
- 強度関数の積分が計算困難
- マーク(イベントの属性)の扱いが複雑

4. Transformer with Time Encoding

時間情報を位置エンコーディングとして扱うTransformer。

class TimeEncodingTransformer(nn.Module):
    def __init__(self, d_model, n_heads, n_layers):
        super().__init__()

        # 時間エンコーディング
        self.time_encoding = TimeEncoding(d_model)

        # Transformer層
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=4*d_model,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(
            encoder_layer, 
            num_layers=n_layers
        )

    def forward(self, values, times):
        # 時間エンコーディングを追加
        time_embeddings = self.time_encoding(times)

        # 値と時間情報を結合
        inputs = values + time_embeddings

        # Self-attention with time-aware positions
        output = self.transformer(inputs)
        return output

class TimeEncoding(nn.Module):
    def __init__(self, d_model):
        super().__init__()
        self.d_model = d_model
        self.w = nn.Linear(1, d_model // 2)

    def forward(self, times):
        """連続的な時間エンコーディング"""
        times = times.unsqueeze(-1)

        # 学習可能な周波数での正弦波エンコーディング
        freq = self.w(torch.zeros_like(times))
        args = times * freq

        embeddings = torch.cat([
            torch.sin(args),
            torch.cos(args)
        ], dim=-1)

        return embeddings

利点:
- 長距離依存関係を効率的に学習
- 並列計算が可能
- 任意の時間関係をモデル化

欠点:
- 大量のデータが必要
- 計算コストが高い

5. Set Function Networks

順序不変な集合関数として時系列を扱う。

class DeepSets(nn.Module):
    """Deep Sets for Irregular Time Series"""
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()

        # 各要素の変換
        self.phi = nn.Sequential(
            nn.Linear(input_dim + 1, hidden_dim),  # +1 for time
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim)
        )

        # 集約後の変換
        self.rho = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, observations, times, mask=None):
        # 各観測に時間情報を付加
        time_features = times.unsqueeze(-1)
        inputs = torch.cat([observations, time_features], dim=-1)

        # 要素ごとの変換
        embeddings = self.phi(inputs)

        # マスクを適用(欠損値の処理)
        if mask is not None:
            embeddings = embeddings * mask.unsqueeze(-1)

        # 順序不変な集約(平均)
        aggregated = embeddings.mean(dim=1)

        # 最終出力
        output = self.rho(aggregated)
        return output

利点:
- 順序不変性により柔軟
- 可変長入力に対応
- 実装がシンプル

欠点:
- 時間的順序情報が失われやすい
- 複雑な時間依存関係の学習が困難

6. Continuous-time Autoregressive Models

連続時間での自己回帰モデル。

class ContinuousTimeAR(nn.Module):
    def __init__(self, hidden_dim, num_basis=20):
        super().__init__()

        # 基底関数の係数
        self.basis_weights = nn.Parameter(torch.randn(num_basis, hidden_dim))
        self.basis_freq = nn.Parameter(torch.randn(num_basis))

        # 出力層
        self.output_layer = nn.Linear(hidden_dim, 1)

    def kernel(self, delta_t):
        """時間差に基づくカーネル関数"""
        # 指数減衰する正弦波の組み合わせ
        basis = torch.exp(-delta_t.unsqueeze(-1) * torch.relu(self.basis_freq))
        basis = basis * torch.sin(delta_t.unsqueeze(-1) * self.basis_freq)
        return basis @ self.basis_weights

    def forward(self, history_values, history_times, query_times):
        predictions = []

        for query_t in query_times:
            # 過去の各点からの寄与を計算
            contributions = []
            for i, (val, t) in enumerate(zip(history_values, history_times)):
                if t < query_t:
                    delta_t = query_t - t
                    weight = self.kernel(delta_t)
                    contribution = weight * val
                    contributions.append(contribution)

            if contributions:
                # 寄与を集約
                aggregated = torch.stack(contributions).sum(dim=0)
                prediction = self.output_layer(aggregated)
                predictions.append(prediction)
            else:
                predictions.append(torch.zeros(1))

        return torch.stack(predictions)

利点:
- 解釈可能なカーネル関数
- 連続時間での予測
- 理論的基盤が確立

欠点:
- カーネル設計が重要
- 非線形性の表現が限定的

🔧 実装のベストプラクティス

データ前処理

class IrregularTimeSeriesPreprocessor:
    def __init__(self):
        self.scaler = StandardScaler()

    def preprocess(self, times, values):
        # 1. 時間の正規化
        time_diffs = np.diff(times, prepend=times[0])
        normalized_times = (times - times[0]) / (times[-1] - times[0])

        # 2. 値の標準化
        scaled_values = self.scaler.fit_transform(values.reshape(-1, 1))

        # 3. 時間特徴の追加
        time_features = {
            'time_since_start': normalized_times,
            'time_since_last': time_diffs / np.median(time_diffs),
            'log_time_since_last': np.log1p(time_diffs)
        }

        return scaled_values, time_features

モデル選択ガイドライン

手法 適用場面 データ量 計算コスト
Neural ODE 連続的予測が必要 中〜大
GRU-D 欠損値が多い 小〜中
Hawkes Process イベント予測
Time Transformer 長期依存関係
Deep Sets 順序の重要性が低い 小〜中
Continuous AR 解釈性重視 小〜中

評価指標

def evaluate_irregular_predictions(predictions, targets, times):
    """不規則時系列の予測評価"""
    metrics = {}

    # 1. 時間重み付きMAE
    time_weights = 1.0 / (1.0 + times)  # 近い予測を重視
    weighted_mae = np.average(
        np.abs(predictions - targets), 
        weights=time_weights
    )
    metrics['weighted_mae'] = weighted_mae

    # 2. イベント予測精度(閾値超えのタイミング)
    threshold = np.percentile(targets, 90)
    pred_events = predictions > threshold
    true_events = targets > threshold
    event_f1 = f1_score(true_events, pred_events)
    metrics['event_f1'] = event_f1

    # 3. 時間的一貫性
    pred_diff = np.diff(predictions)
    true_diff = np.diff(targets)
    consistency = 1 - np.mean(np.sign(pred_diff) != np.sign(true_diff))
    metrics['temporal_consistency'] = consistency

    return metrics

🚀 プロジェクトへの実装提案

1. ハイブリッドアプローチ

class HybridIrregularPredictor:
    """複数の手法を組み合わせたアンサンブル"""

    def __init__(self):
        self.models = {
            'neural_ode': NeuralODEModel(),
            'gru_d': GRUDModel(),
            'transformer': TimeTransformer()
        }
        self.weights = nn.Parameter(torch.ones(3) / 3)

    def forward(self, observations, obs_times, pred_times):
        predictions = []

        for name, model in self.models.items():
            pred = model(observations, obs_times, pred_times)
            predictions.append(pred)

        # 重み付き平均
        weights = torch.softmax(self.weights, dim=0)
        ensemble_pred = sum(w * p for w, p in zip(weights, predictions))

        return ensemble_pred

2. リアルタイム予測パイプライン

class RealTimeIrregularPredictor:
    def __init__(self, model, buffer_size=1000):
        self.model = model
        self.buffer = deque(maxlen=buffer_size)
        self.last_update = None

    async def update(self, value, timestamp):
        """新しい観測値でバッファを更新"""
        self.buffer.append((timestamp, value))
        self.last_update = timestamp

        # 適応的な予測更新
        if self._should_update_prediction():
            await self._update_predictions()

    def _should_update_prediction(self):
        """予測更新の必要性を判断"""
        if self.last_update is None:
            return False

        # 最後の更新からの経過時間
        time_since_update = time.time() - self.last_update

        # データの到着頻度に基づく適応的更新
        avg_interval = self._estimate_avg_interval()
        return time_since_update > 2 * avg_interval

まとめ

不規則時系列の予測には様々な手法が存在し、それぞれに長所と短所があります。実践的には:

  1. データの性質を理解:イベント駆動か、連続的変化か
  2. 計算リソースを考慮:リアルタイム要件とのバランス
  3. 複数手法の組み合わせ:アンサンブルによる頑健性向上

現在のtickデータプロジェクトでは、GRU-DとTime Transformerの組み合わせが有効と考えられます。