ML Documentation

Point Process Neural Networks (PPNN) による不規則時系列予測

概要

Point Process Neural Networks(PPNN)は、不規則な時間間隔で発生するイベントを直接モデル化する手法です。金融市場のtickデータのように、取引が不規則なタイミングで発生するデータに特に適しています。従来の時系列モデルと異なり、PPNNは「いつ」イベントが発生するかと「何が」起こるかを同時にモデル化します。

1. Point Processの基礎

1.1 Point Processとは

Point Processは、時間軸上でランダムに発生するイベントの数学的モデルです:

1.2 金融市場での応用

import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class TradeEvent:
    """取引イベントの表現"""
    timestamp: float  # Unix timestamp
    price: float
    volume: float
    side: str  # 'buy' or 'sell'

class PointProcess:
    """基本的なPoint Processの実装"""
    def __init__(self, events: List[TradeEvent]):
        self.events = sorted(events, key=lambda x: x.timestamp)
        self.times = np.array([e.timestamp for e in events])

    def counting_process(self, t: float) -> int:
        """時刻tまでのイベント数"""
        return np.sum(self.times <= t)

    def intensity_estimate(self, t: float, bandwidth: float = 60.0) -> float:
        """カーネル密度推定による強度関数の推定"""
        # ガウシアンカーネル
        kernel_values = np.exp(-0.5 * ((self.times - t) / bandwidth) ** 2)
        return np.sum(kernel_values) / (bandwidth * np.sqrt(2 * np.pi))

2. Neural Hawkes Process

2.1 Hawkes Processの基礎

Hawkes Processは自己励起性を持つPoint Processで、過去のイベントが将来のイベント発生率を増加させます:

class HawkesProcess:
    """Neural Hawkes Processの実装"""
    def __init__(self, base_intensity: float = 0.1):
        self.base_intensity = base_intensity
        self.decay_rate = 1.0

    def intensity(self, t: float, history: List[float]) -> float:
        """
        条件付き強度関数
        λ*(t) = μ + Σ α * exp(-β(t - t_i))
        """
        intensity = self.base_intensity

        for t_i in history:
            if t_i < t:
                intensity += self.excitation_function(t - t_i)

        return intensity

    def excitation_function(self, delta_t: float) -> float:
        """励起関数(ニューラルネットで学習可能)"""
        return 0.8 * np.exp(-self.decay_rate * delta_t)

2.2 Neural Hawkes Processの実装

import torch
import torch.nn as nn

class NeuralHawkesProcess(nn.Module):
    """
    ニューラルネットワークによるHawkes Process
    """
    def __init__(self, hidden_dim: int = 64, history_dim: int = 32):
        super().__init__()

        # 履歴エンコーダー(LSTM)
        self.history_encoder = nn.LSTM(
            input_size=4,  # time_diff, price, volume, side
            hidden_size=history_dim,
            num_layers=2,
            batch_first=True
        )

        # 強度関数ネットワーク
        self.intensity_net = nn.Sequential(
            nn.Linear(history_dim + 1, hidden_dim),  # +1 for current time
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
            nn.Softplus()  # 強度は常に正
        )

        # 価格予測ネットワーク
        self.price_net = nn.Sequential(
            nn.Linear(history_dim + 1, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )

    def forward(self, history_sequence, current_time):
        """
        Parameters:
        -----------
        history_sequence: (batch, seq_len, 4) - 過去の取引履歴
        current_time: (batch, 1) - 現在時刻

        Returns:
        --------
        intensity: (batch, 1) - 条件付き強度
        price_pred: (batch, 1) - 予測価格
        """
        # 履歴をエンコード
        _, (h_n, _) = self.history_encoder(history_sequence)
        history_encoding = h_n[-1]  # 最後の隠れ状態

        # 現在時刻と結合
        combined = torch.cat([history_encoding, current_time], dim=1)

        # 強度と価格を予測
        intensity = self.intensity_net(combined)
        price_pred = self.price_net(combined)

        return intensity, price_pred

    def log_likelihood(self, events, T):
        """
        最大尤度推定のための対数尤度
        """
        ll = 0.0

        # 各イベントでの対数強度
        for i, event in enumerate(events[1:], 1):
            history = events[:i]
            intensity, _ = self.forward(history, event.timestamp)
            ll += torch.log(intensity + 1e-8)

        # 積分項(数値積分)
        num_samples = 100
        sample_times = torch.linspace(0, T, num_samples)
        intensities = []

        for t in sample_times:
            history = [e for e in events if e.timestamp < t]
            if history:
                intensity, _ = self.forward(history, t)
                intensities.append(intensity)

        integral = torch.trapz(torch.stack(intensities), sample_times)
        ll -= integral

        return ll

3. Transformer Hawkes Process (THP)

3.1 アーキテクチャ

class TransformerHawkesProcess(nn.Module):
    """
    Transformerを使用したHawkes Process
    """
    def __init__(self, d_model: int = 256, n_heads: int = 8, 
                 n_layers: int = 4, max_seq_len: int = 1000):
        super().__init__()

        # 時間エンコーディング
        self.time_encoding = TimeEncoding(d_model)

        # イベントエンベディング
        self.event_embedding = nn.Linear(4, d_model)  # price, volume, side, time_diff

        # Transformer
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=d_model,
                nhead=n_heads,
                dim_feedforward=d_model * 4,
                dropout=0.1
            ),
            num_layers=n_layers
        )

        # 出力層
        self.intensity_head = nn.Sequential(
            nn.Linear(d_model, d_model // 2),
            nn.ReLU(),
            nn.Linear(d_model // 2, 1),
            nn.Softplus()
        )

        self.price_head = nn.Sequential(
            nn.Linear(d_model, d_model // 2),
            nn.ReLU(),
            nn.Linear(d_model // 2, 1)
        )

    def forward(self, event_sequence, query_times):
        """
        event_sequence: (batch, seq_len, features)
        query_times: (batch, n_queries)
        """
        # イベントをエンベディング
        event_embeddings = self.event_embedding(event_sequence)

        # 時間エンコーディングを追加
        times = event_sequence[:, :, -1]  # time_diff features
        time_embeddings = self.time_encoding(times)
        embeddings = event_embeddings + time_embeddings

        # Transformerでエンコード
        encoded = self.transformer(embeddings)

        # クエリ時刻での予測
        predictions = []
        for query_time in query_times:
            # 最も関連する履歴を注意機構で選択
            attn_weights = self.compute_temporal_attention(encoded, query_time)
            context = torch.sum(encoded * attn_weights.unsqueeze(-1), dim=1)

            # 強度と価格を予測
            intensity = self.intensity_head(context)
            price = self.price_head(context)
            predictions.append((intensity, price))

        return predictions

3.2 時間エンコーディング

class TimeEncoding(nn.Module):
    """
    連続時間のための学習可能な時間エンコーディング
    """
    def __init__(self, d_model: int):
        super().__init__()
        self.d_model = d_model
        self.w = nn.Parameter(torch.randn(1, d_model // 2))
        self.b = nn.Parameter(torch.randn(1, d_model // 2))

    def forward(self, times):
        """
        times: (batch, seq_len)
        """
        times = times.unsqueeze(-1)  # (batch, seq_len, 1)

        # 基底関数の線形結合
        angles = times * self.w + self.b  # (batch, seq_len, d_model // 2)

        # sin/cos変換
        encoding = torch.cat([
            torch.sin(angles),
            torch.cos(angles)
        ], dim=-1)

        return encoding

4. 実践的な実装

4.1 データ準備

class TickDataProcessor:
    """
    tickデータをPPNN用に前処理
    """
    def __init__(self, lookback_window: int = 100):
        self.lookback_window = lookback_window
        self.price_scaler = None
        self.volume_scaler = None

    def prepare_sequences(self, tick_df):
        """
        tickデータからシーケンスを作成
        """
        sequences = []

        # 正規化
        tick_df['price_normalized'] = (tick_df['price'] - tick_df['price'].mean()) / tick_df['price'].std()
        tick_df['volume_normalized'] = np.log1p(tick_df['volume'])
        tick_df['side_encoded'] = tick_df['side'].map({'buy': 1, 'sell': -1})

        # 時間差分
        tick_df['time_diff'] = tick_df['timestamp'].diff().fillna(0)

        # シーケンス作成
        for i in range(self.lookback_window, len(tick_df)):
            history = tick_df.iloc[i-self.lookback_window:i]
            current = tick_df.iloc[i]

            sequence = history[['time_diff', 'price_normalized', 
                              'volume_normalized', 'side_encoded']].values

            target_time = current['time_diff']
            target_price = current['price_normalized']

            sequences.append({
                'sequence': sequence,
                'target_time': target_time,
                'target_price': target_price
            })

        return sequences

4.2 訓練ループ

def train_ppnn(model, train_data, val_data, epochs=100, lr=1e-3):
    """
    PPNNモデルの訓練
    """
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, patience=10, factor=0.5
    )

    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for batch in train_data:
            optimizer.zero_grad()

            # 予測
            intensity, price_pred = model(
                batch['sequence'], 
                batch['current_time']
            )

            # 損失計算
            # 1. 負の対数尤度(時間予測)
            nll_loss = -model.log_likelihood(batch['events'], batch['T'])

            # 2. 価格予測のMSE
            price_loss = nn.MSELoss()(price_pred, batch['target_price'])

            # 総合損失
            loss = nll_loss + 0.1 * price_loss

            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

            total_loss += loss.item()

        # 検証
        val_loss = validate(model, val_data)
        scheduler.step(val_loss)

        print(f"Epoch {epoch}: Train Loss = {total_loss/len(train_data):.4f}, "
              f"Val Loss = {val_loss:.4f}")

5. 予測と取引戦略

5.1 次回取引の予測

class PPNNPredictor:
    """
    PPNNを使用した予測器
    """
    def __init__(self, model, horizon=300):  # 5分先まで
        self.model = model
        self.horizon = horizon

    def predict_next_trade(self, history, n_samples=100):
        """
        次回取引の時刻と価格を予測
        """
        self.model.eval()

        with torch.no_grad():
            # モンテカルロサンプリング
            time_samples = []
            price_samples = []

            for _ in range(n_samples):
                # 累積強度関数からサンプリング
                next_time = self.sample_next_event_time(history)

                # その時刻での価格を予測
                _, price = self.model(history, next_time)

                time_samples.append(next_time)
                price_samples.append(price)

            # 統計量
            mean_time = np.mean(time_samples)
            std_time = np.std(time_samples)
            mean_price = np.mean(price_samples)
            std_price = np.std(price_samples)

            return {
                'next_trade_time': mean_time,
                'time_uncertainty': std_time,
                'next_price': mean_price,
                'price_uncertainty': std_price
            }

    def sample_next_event_time(self, history, dt=0.1):
        """
        Thinning algorithmによる次回イベント時刻のサンプリング
        """
        current_time = history[-1].timestamp

        while True:
            # 指数分布から候補時刻をサンプル
            tau = np.random.exponential(1.0)
            candidate_time = current_time + tau

            # その時刻での強度を計算
            intensity, _ = self.model(history, candidate_time)

            # 受理/棄却
            if np.random.random() < intensity.item():
                return candidate_time

            current_time = candidate_time

5.2 取引戦略の実装

class PPNNTradingStrategy:
    """
    PPNN予測に基づく取引戦略
    """
    def __init__(self, predictor, risk_threshold=0.02):
        self.predictor = predictor
        self.risk_threshold = risk_threshold

    def generate_signal(self, history, current_price):
        """
        取引シグナルの生成
        """
        # 予測
        prediction = self.predictor.predict_next_trade(history)

        # 期待リターン
        expected_return = (prediction['next_price'] - current_price) / current_price

        # リスク調整後シグナル
        uncertainty_ratio = prediction['price_uncertainty'] / abs(expected_return)

        if uncertainty_ratio < self.risk_threshold:
            if expected_return > 0.001:  # 0.1%以上の上昇期待
                position_size = 1.0 / (1 + uncertainty_ratio)
                return {'action': 'BUY', 'size': position_size}
            elif expected_return < -0.001:  # 0.1%以上の下落期待
                position_size = 1.0 / (1 + uncertainty_ratio)
                return {'action': 'SELL', 'size': position_size}

        return {'action': 'HOLD', 'size': 0}

6. 評価とバックテスト

6.1 予測精度の評価

def evaluate_ppnn(model, test_data):
    """
    PPNNモデルの評価
    """
    metrics = {
        'time_mae': [],
        'price_mae': [],
        'intensity_calibration': [],
        'price_correlation': []
    }

    for sequence in test_data:
        # 予測
        pred_intensity, pred_price = model(
            sequence['history'], 
            sequence['actual_time']
        )

        # 時間予測の評価(次回取引までの時間)
        pred_next_time = 1.0 / pred_intensity.item()  # 指数分布の期待値
        actual_next_time = sequence['actual_next_time']
        metrics['time_mae'].append(abs(pred_next_time - actual_next_time))

        # 価格予測の評価
        metrics['price_mae'].append(
            abs(pred_price.item() - sequence['actual_price'])
        )

    # 強度関数のキャリブレーション(KSテスト)
    # 省略

    return {
        'time_mae': np.mean(metrics['time_mae']),
        'price_mae': np.mean(metrics['price_mae']),
        'time_rmse': np.sqrt(np.mean(np.array(metrics['time_mae'])**2))
    }

7. 実装のベストプラクティス

7.1 計算効率の最適化

class EfficientPPNN:
    """
    効率的なPPNN実装
    """
    def __init__(self):
        # 計算結果のキャッシュ
        self.intensity_cache = {}
        self.cache_size = 10000

    def compute_intensity_batch(self, histories, times):
        """
        バッチ処理による効率化
        """
        # GPUでのバッチ計算
        with torch.cuda.amp.autocast():  # 混合精度
            intensities = self.model(histories, times)

        return intensities

    def adaptive_sampling(self, history, n_samples):
        """
        適応的サンプリング(重要度サンプリング)
        """
        # 高強度領域により多くサンプル
        intensity_estimate = self.estimate_intensity_profile(history)
        sample_weights = intensity_estimate / intensity_estimate.sum()

        # 重み付きサンプリング
        samples = np.random.choice(
            len(intensity_estimate), 
            size=n_samples, 
            p=sample_weights
        )

        return samples

まとめ

Point Process Neural Networksは、不規則な時間間隔を持つ金融tickデータに対して以下の利点を提供します:

  1. 自然なモデル化: イベントの発生時刻と内容を同時にモデル化
  2. 不確実性定量化: 次回取引までの時間の不確実性を考慮
  3. マイクロストラクチャの保持: 取引パターンの時間的クラスタリングを捕捉
  4. 柔軟な予測: 任意の時点での条件付き予測が可能

特に高頻度取引やマーケットメイキングなど、取引のタイミングが重要な戦略において有効です。