ML Documentation

A Crypto Asset Trading System Built on a Cloud-Native ML Pipeline

1. Introduction

1.1 Why a Cloud-Native ML Pipeline

Running machine learning models in production for crypto asset trading raises a number of challenges around availability, scalability, and maintainability. The cloud-native architecture described in this document is designed to address them.

1.2 Advantages of a Cloud-Native Architecture

  1. Microservices: scale and deploy each capability independently
  2. Containerization: consistent, portable environments
  3. Orchestration: automation with Kubernetes
  4. DevOps integration: CI/CD pipelines and Infrastructure as Code (IaC)
  5. Observability: metrics, logs, and tracing

2. Architecture Design

2.1 Overall Architecture

# architecture-overview.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: architecture-overview
data:
  components: |
    Cloud-native ML pipeline
    ├── Data collection layer
    │   ├── Price data collector
    │   ├── On-chain data collector
    │   └── Social data collector
    ├── Data processing layer
    │   ├── Stream processing (Apache Kafka + Spark)
    │   ├── Batch processing (Apache Airflow)
    │   └── Feature store (Feast)
    ├── ML layer
    │   ├── Model training (Kubeflow)
    │   ├── Model inference (KServe)
    │   └── Model management (MLflow)
    ├── Trade execution layer
    │   ├── Signal generation service
    │   ├── Risk management service
    │   └── Order execution service
    └── Monitoring & operations layer
        ├── Metrics collection (Prometheus)
        ├── Log management (ELK Stack)
        └── Visualization (Grafana)

2.2 Containerization Strategy

# Dockerfile for ML Training Service
FROM python:3.10-slim

# System dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    git \
    && rm -rf /var/lib/apt/lists/*

# Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Application code
COPY src/ /app/src/
COPY config/ /app/config/
WORKDIR /app

# Security: run as a non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD python -c "import requests; requests.get('http://localhost:8080/health')"

EXPOSE 8080
CMD ["python", "src/training_service.py"]

# docker-compose.yml for local development
version: '3.8'
services:
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  postgres:
    image: postgres:14
    environment:
      POSTGRES_DB: crypto_ml
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  kafka:
    image: confluentinc/cp-kafka:latest
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  mlflow:
    image: mlflow/mlflow:latest
    ports:
      - "5000:5000"
    environment:
      MLFLOW_BACKEND_STORE_URI: postgresql://user:password@postgres:5432/crypto_ml
    command: >
      mlflow server 
      --backend-store-uri postgresql://user:password@postgres:5432/crypto_ml
      --default-artifact-root s3://mlflow-artifacts
      --host 0.0.0.0
      --port 5000
    depends_on:
      - postgres

volumes:
  postgres_data:

3. Data Collection and Processing Pipeline

3.1 Real-Time Streaming with Kafka

# kafka_producer.py
import asyncio
import json
from kafka import KafkaProducer
from typing import Dict, Any
import websockets
import logging

class CryptoDataProducer:
    def __init__(self, kafka_config: Dict[str, Any]):
        self.producer = KafkaProducer(
            bootstrap_servers=kafka_config['servers'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            retries=5,
            acks='all',
            enable_idempotence=True,
            batch_size=16384,
            linger_ms=10,
            buffer_memory=33554432,
            compression_type='gzip'
        )

    async def collect_price_data(self):
        """Collect price data over WebSocket and publish it to Kafka."""
        uri = "wss://stream.binance.com:9443/ws/btcusdt@ticker"

        async with websockets.connect(uri) as websocket:
            async for message in websocket:
                data = json.loads(message)

                # Preprocess the raw ticker payload
                processed_data = {
                    'symbol': data['s'],
                    'price': float(data['c']),
                    'volume': float(data['v']),
                    'change_percent': float(data['P']),
                    'timestamp': int(data['E']),
                    'source': 'binance'
                }

                # Publish to Kafka
                self.producer.send(
                    'crypto-price-data',
                    key=data['s'],
                    value=processed_data
                )

                # Flush (in production, flush periodically rather than per
                # message so the batch_size/linger_ms settings stay effective)
                self.producer.flush()

    async def collect_onchain_data(self):
        """Collect on-chain data."""
        # Periodically pull data from an Ethereum node
        while True:
            try:
                block_data = await self._get_latest_block()

                self.producer.send(
                    'onchain-data',
                    key='ethereum',
                    value=block_data
                )

                await asyncio.sleep(15)  # every 15 seconds

            except Exception as e:
                logging.error(f"オンチェーンデータ収集エラー: {e}")
                await asyncio.sleep(60)

    async def _get_latest_block(self) -> Dict[str, Any]:
        """Fetch the latest block from the chain."""
        from web3 import Web3

        w3 = Web3(Web3.HTTPProvider('https://mainnet.infura.io/v3/YOUR_KEY'))
        latest_block = w3.eth.get_block('latest', full_transactions=True)

        return {
            'block_number': latest_block.number,
            'timestamp': latest_block.timestamp,
            'transaction_count': len(latest_block.transactions),
            'gas_used': latest_block.gasUsed,
            'gas_limit': latest_block.gasLimit,
            'difficulty': latest_block.difficulty,
            'total_difficulty': latest_block.totalDifficulty
        }
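
On the consuming side, the stream-processing layer in the architecture above reads these topics. As a minimal downstream sketch, assuming the same topic name and the kafka-python client used above (a production setup would more likely feed Spark Structured Streaming or a feature job), a consumer could look like this:

# kafka_consumer.py (illustrative sketch; topic and group names are assumptions)
import json
import logging
from kafka import KafkaConsumer

def consume_price_data(servers):
    """Consume ticker messages and hand them to downstream feature processing."""
    consumer = KafkaConsumer(
        'crypto-price-data',
        bootstrap_servers=servers,
        group_id='feature-engineering',
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
        key_deserializer=lambda k: k.decode('utf-8') if k else None,
        auto_offset_reset='latest',
        enable_auto_commit=True
    )

    for message in consumer:
        record = message.value
        # A real pipeline would forward these records to feature engineering;
        # here we only log them.
        logging.info("ticker %s price=%s", record['symbol'], record['price'])

if __name__ == '__main__':
    consume_price_data(['localhost:9092'])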

3.2 Batch Processing with Apache Airflow

# airflow_dags/crypto_ml_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
import pandas as pd

default_args = {
    'owner': 'crypto-ml-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'crypto_ml_pipeline',
    default_args=default_args,
    description='Crypto asset ML pipeline',
    schedule_interval='@hourly',
    max_active_runs=1,
    catchup=False,  # catchup is a DAG argument, not a default_args entry
    tags=['crypto', 'ml', 'training']
)

def extract_features(**context):
    """Feature extraction task."""
    from src.feature_engineering import FeatureExtractor

    extractor = FeatureExtractor()

    # Process the past 24 hours of data
    end_time = context['execution_date']
    start_time = end_time - timedelta(hours=24)

    features = extractor.extract_features(start_time, end_time)

    # Persist to the feature store
    extractor.save_to_feature_store(features)

    return f"Feature extraction complete: {len(features)} rows"

def train_model(**context):
    """Model training task."""
    from src.model_training import ModelTrainer

    trainer = ModelTrainer()

    # Load features
    features = trainer.load_features_from_store()

    # Train the model
    model, metrics = trainer.train(features)

    # Log to MLflow
    trainer.log_to_mlflow(model, metrics)

    return metrics

def validate_model(**context):
    """Model validation task."""
    from src.model_validation import ModelValidator

    validator = ModelValidator()

    # Validate the latest model
    validation_results = validator.validate_latest_model()

    # Threshold check
    if validation_results['accuracy'] < 0.6:
        raise ValueError(f"Model accuracy is too low: {validation_results['accuracy']}")

    return validation_results

def deploy_model(**context):
    """Model deployment task."""
    from src.model_deployment import ModelDeployer

    deployer = ModelDeployer()

    # Blue-green deployment
    deployment_result = deployer.blue_green_deploy()

    return deployment_result

# Task definitions
extract_task = PythonOperator(
    task_id='extract_features',
    python_callable=extract_features,
    dag=dag
)

train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag
)

validate_task = PythonOperator(
    task_id='validate_model',
    python_callable=validate_model,
    dag=dag
)

deploy_task = PythonOperator(
    task_id='deploy_model',
    python_callable=deploy_model,
    dag=dag
)

# Data quality check
data_quality_check = BashOperator(
    task_id='data_quality_check',
    bash_command='''
    python /app/src/data_quality_check.py \
        --date {{ ds }} \
        --threshold 0.95
    ''',
    dag=dag
)

# Task dependencies
data_quality_check >> extract_task >> train_task >> validate_task >> deploy_task
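
The data_quality_check.py script invoked by the BashOperator above is not shown elsewhere in this document. A minimal sketch of what such a script might do is below; the table and column names and the completeness-ratio check are assumptions for illustration. The script exits non-zero when quality is below the threshold, which fails the Airflow task.

# src/data_quality_check.py (illustrative sketch; table/column names are assumptions)
import argparse
import sys
import pandas as pd
from sqlalchemy import create_engine, text

def completeness_ratio(df: pd.DataFrame) -> float:
    """Fraction of non-null cells across the frame."""
    return float(df.notna().to_numpy().mean()) if len(df) else 0.0

def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument('--date', required=True)
    parser.add_argument('--threshold', type=float, default=0.95)
    args = parser.parse_args()

    engine = create_engine('postgresql://user:password@postgres:5432/crypto_ml')
    df = pd.read_sql(
        text('SELECT * FROM price_ticks WHERE DATE(event_time) = :d'),
        engine,
        params={'d': args.date}
    )

    ratio = completeness_ratio(df)
    print(f'completeness for {args.date}: {ratio:.3f}')
    return 0 if ratio >= args.threshold else 1

if __name__ == '__main__':
    sys.exit(main())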

3.3 Feature Store (Feast)

# feature_store/crypto_features.py
from feast import Entity, FeatureView, Field, ValueType
from feast.types import Float64, Int64, String
from datetime import timedelta
import pandas as pd

# Entity definition
crypto_symbol = Entity(
    name="crypto_symbol",
    description="Crypto asset symbol",
    value_type=ValueType.STRING
)

# Feature view definitions
price_features = FeatureView(
    name="price_features",
    entities=["crypto_symbol"],
    ttl=timedelta(days=7),
    schema=[
        Field(name="price", dtype=Float64),
        Field(name="volume", dtype=Float64),
        Field(name="sma_5", dtype=Float64),
        Field(name="sma_20", dtype=Float64),
        Field(name="rsi", dtype=Float64),
        Field(name="volatility", dtype=Float64),
        Field(name="momentum", dtype=Float64)
    ],
    source="crypto_price_source"
)

onchain_features = FeatureView(
    name="onchain_features",
    entities=["crypto_symbol"],
    ttl=timedelta(days=7),
    schema=[
        Field(name="active_addresses", dtype=Int64),
        Field(name="transaction_count", dtype=Int64),
        Field(name="network_hash_rate", dtype=Float64),
        Field(name="nvt_ratio", dtype=Float64),
        Field(name="mvrv_ratio", dtype=Float64)
    ],
    source="onchain_data_source"
)

# Feature retrieval client
class FeatureStoreClient:
    def __init__(self):
        from feast import FeatureStore
        self.store = FeatureStore(repo_path=".")

    def get_online_features(self, symbol: str) -> dict:
        """オンライン特徴量取得(リアルタイム推論用)"""
        feature_vector = self.store.get_online_features(
            features=[
                'price_features:price',
                'price_features:volume',
                'price_features:sma_5',
                'price_features:sma_20',
                'price_features:rsi',
                'price_features:volatility',
                'onchain_features:active_addresses',
                'onchain_features:nvt_ratio'
            ],
            entity_rows=[{"crypto_symbol": symbol}]
        ).to_dict()

        return feature_vector

    def get_historical_features(self, symbols: list, start_date: str, end_date: str):
        """履歴特徴量取得(学習用)"""
        entity_df = pd.DataFrame({
            "crypto_symbol": symbols,
            "event_timestamp": pd.to_datetime([end_date] * len(symbols))
        })

        training_df = self.store.get_historical_features(
            entity_df=entity_df,
            features=[
                'price_features:price',
                'price_features:volume',
                'price_features:sma_5',
                'price_features:sma_20',
                'price_features:rsi',
                'price_features:volatility',
                'onchain_features:active_addresses',
                'onchain_features:nvt_ratio'
            ]
        ).to_df()

        return training_df
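
Note that get_online_features only returns fresh values after features have been materialized from the offline store into the online store. A minimal materialization sketch, assuming the same repository path and typically run on a schedule (for example from the Airflow DAG above), is:

# feature_store/materialize.py (illustrative sketch)
from datetime import datetime, timedelta
from feast import FeatureStore

def materialize_recent(hours: int = 1) -> None:
    """Push the last `hours` of offline feature values into the online store."""
    store = FeatureStore(repo_path=".")
    end = datetime.utcnow()
    store.materialize(start_date=end - timedelta(hours=hours), end_date=end)

if __name__ == "__main__":
    materialize_recent()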

4. ML Model Management and Deployment

4.1 Training Pipeline with Kubeflow

# kubeflow_pipeline/training_pipeline.py
import kfp
from kfp import dsl
from kfp.dsl import InputPath, OutputPath

@dsl.component(
    base_image='python:3.10',
    packages_to_install=['pandas', 'scikit-learn', 'mlflow']
)
def data_preprocessing(
    input_data_path: InputPath(str),
    output_data_path: OutputPath(str),
    train_test_split_ratio: float = 0.8
):
    """データ前処理コンポーネント"""
    import pandas as pd
    from sklearn.model_selection import train_test_split
    import pickle

    # Load data
    data = pd.read_parquet(input_data_path)

    # Preprocessing
    # Handle missing values (forward fill)
    data = data.ffill()

    # Remove outliers with the IQR rule
    Q1 = data.quantile(0.25)
    Q3 = data.quantile(0.75)
    IQR = Q3 - Q1
    data = data[~((data < (Q1 - 1.5 * IQR)) | (data > (Q3 + 1.5 * IQR))).any(axis=1)]

    # Split features and target
    features = data.drop(['target'], axis=1)
    target = data['target']

    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        features, target, test_size=1-train_test_split_ratio, random_state=42
    )

    # Save the result
    processed_data = {
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test
    }

    with open(output_data_path, 'wb') as f:
        pickle.dump(processed_data, f)

@dsl.component(
    base_image='python:3.10',
    packages_to_install=['pandas', 'scikit-learn', 'mlflow', 'xgboost']
)
def model_training(
    processed_data_path: InputPath(str),
    model_output_path: OutputPath(str),
    learning_rate: float = 0.1,
    max_depth: int = 6,
    n_estimators: int = 100
):
    """モデル学習コンポーネント"""
    import pickle
    import mlflow
    import mlflow.xgboost
    from xgboost import XGBRegressor
    from sklearn.metrics import mean_squared_error, r2_score

    # Load data
    with open(processed_data_path, 'rb') as f:
        data = pickle.load(f)

    X_train, X_test = data['X_train'], data['X_test']
    y_train, y_test = data['y_train'], data['y_test']

    # Start an MLflow run
    mlflow.start_run()

    # Train the model
    model = XGBRegressor(
        learning_rate=learning_rate,
        max_depth=max_depth,
        n_estimators=n_estimators,
        random_state=42
    )

    model.fit(X_train, y_train)

    # Predict and evaluate
    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    # Log params and metrics to MLflow
    mlflow.log_params({
        'learning_rate': learning_rate,
        'max_depth': max_depth,
        'n_estimators': n_estimators
    })

    mlflow.log_metrics({
        'mse': mse,
        'r2': r2
    })

    mlflow.xgboost.log_model(model, "model")

    # Save the model
    with open(model_output_path, 'wb') as f:
        pickle.dump(model, f)

    mlflow.end_run()

@dsl.component(
    base_image='python:3.10',
    packages_to_install=['pandas', 'scikit-learn']
)
def model_validation(
    model_path: InputPath(str),
    processed_data_path: InputPath(str),
    validation_threshold: float = 0.6
) -> str:
    """モデル検証コンポーネント"""
    import pickle
    from sklearn.metrics import mean_squared_error, r2_score

    # Load the model and the data
    with open(model_path, 'rb') as f:
        model = pickle.load(f)

    with open(processed_data_path, 'rb') as f:
        data = pickle.load(f)

    X_test, y_test = data['X_test'], data['y_test']

    # Predict and evaluate
    y_pred = model.predict(X_test)
    r2 = r2_score(y_test, y_pred)

    # Validate against the threshold
    if r2 < validation_threshold:
        raise ValueError(f"Model performance is below the threshold: R² = {r2}")

    return f"Validation passed: R² = {r2}"

@dsl.pipeline(
    name='crypto-ml-training-pipeline',
    description='Crypto asset ML training pipeline'
)
def crypto_training_pipeline(
    input_data_path: str = "gs://crypto-ml-data/raw_data.parquet",
    learning_rate: float = 0.1,
    max_depth: int = 6,
    n_estimators: int = 100
):
    """学習パイプライン定義"""

    # データ前処理
    preprocessing_task = data_preprocessing(
        input_data_path=input_data_path
    )

    # モデル学習
    training_task = model_training(
        processed_data_path=preprocessing_task.outputs['output_data_path'],
        learning_rate=learning_rate,
        max_depth=max_depth,
        n_estimators=n_estimators
    )

    # Model validation
    validation_task = model_validation(
        model_path=training_task.outputs['model_output_path'],
        processed_data_path=preprocessing_task.outputs['output_data_path']
    )

# Compile the pipeline (a submission example follows below)
if __name__ == '__main__':
    kfp.compiler.Compiler().compile(
        pipeline_func=crypto_training_pipeline,
        package_path='crypto_training_pipeline.yaml'
    )
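
Once compiled, the package can be submitted to a Kubeflow Pipelines endpoint. A minimal submission sketch follows; the endpoint URL is an assumption and depends on how KFP is exposed in your cluster.

# submit_pipeline.py (illustrative sketch; the KFP endpoint is an assumption)
import kfp

client = kfp.Client(host='http://ml-pipeline.kubeflow:8888')
run = client.create_run_from_pipeline_package(
    'crypto_training_pipeline.yaml',
    arguments={
        'learning_rate': 0.05,
        'max_depth': 8,
        'n_estimators': 200
    }
)
print(f'Submitted run: {run.run_id}')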

4.2 Model Inference Service with KServe

# inference_service/model_server.py
import asyncio
import json
from typing import Dict, List
import torch
import mlflow.pytorch
from kserve import Model, ModelServer
import numpy as np
import pandas as pd

class CryptoMLModel(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.name = name
        self.model = None
        self.feature_store_client = None
        self.ready = False

    def load(self):
        """Load the model."""
        try:
            # Load the production model from the MLflow model registry
            model_uri = f"models:/{self.name}/Production"
            self.model = mlflow.pytorch.load_model(model_uri)

            # Initialize the feature store client
            from feature_store import FeatureStoreClient
            self.feature_store_client = FeatureStoreClient()

            self.ready = True
            print(f"Model {self.name} loaded successfully")

        except Exception as e:
            print(f"Model load error: {e}")
            raise

    async def predict(self, payload: Dict) -> Dict:
        """Run inference."""
        if not self.ready:
            raise RuntimeError("Model is not loaded")

        try:
            # Read the target symbol from the request payload
            symbol = payload.get('symbol', 'BTCUSDT')

            # Fetch the latest features from the feature store
            features = self.feature_store_client.get_online_features(symbol)

            # Convert features to a tensor
            feature_tensor = self._prepare_features(features)

            # Run inference
            with torch.no_grad():
                prediction = self.model(feature_tensor)

            # Post-process the result
            result = {
                'symbol': symbol,
                'prediction': float(prediction.item()),
                'timestamp': pd.Timestamp.now().isoformat(),
                'model_version': self._get_model_version(),
                'confidence': self._calculate_confidence(prediction, features)
            }

            return result

        except Exception as e:
            return {
                'error': str(e),
                'symbol': payload.get('symbol', 'unknown'),
                'timestamp': pd.Timestamp.now().isoformat()
            }

    def _prepare_features(self, features: Dict) -> torch.Tensor:
        """Convert features into model input."""
        feature_vector = []

        # Extract the required features in a fixed order
        required_features = [
            'price', 'volume', 'sma_5', 'sma_20', 'rsi', 
            'volatility', 'active_addresses', 'nvt_ratio'
        ]

        for feature in required_features:
            value = features.get(feature, 0.0)
            feature_vector.append(float(value))

        return torch.tensor(feature_vector, dtype=torch.float32).unsqueeze(0)

    def _get_model_version(self) -> str:
        """Return the model version."""
        return "v1.0.0"  # in practice, look this up in the MLflow registry

    def _calculate_confidence(self, prediction: torch.Tensor, features: Dict) -> float:
        """Compute a prediction confidence score."""
        # Simple heuristic (a real implementation would be more involved)
        volatility = features.get('volatility', 0.02)
        confidence = max(0.1, 1.0 - (volatility * 10))
        return min(1.0, confidence)

# Start the model server
if __name__ == "__main__":
    model = CryptoMLModel("crypto-price-predictor")
    ModelServer().start([model])

# kserve-deployment.yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: crypto-ml-model
  namespace: default
spec:
  predictor:
    serviceAccountName: kserve-sa
    model:
      modelFormat:
        name: pytorch
      storageUri: "s3://mlflow-artifacts/models/crypto-price-predictor"
      resources:
        requests:
          cpu: 500m
          memory: 1Gi
        limits:
          cpu: 2
          memory: 4Gi
      env:
        - name: MLFLOW_TRACKING_URI
          value: "http://mlflow:5000"
        - name: FEAST_FEATURE_STORE_URL
          value: "http://feast:6566"
    minReplicas: 2
    maxReplicas: 10
    scaleTarget: 80
    scaleMetric: concurrency
    canaryTrafficPercent: 10
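
Once the InferenceService is up, clients call the standard KServe v1 predict route for the model. A minimal request sketch is below; the external host name is an assumption and depends on your ingress or service-mesh setup.

# client_request.py (illustrative sketch; the host name is an assumption)
import requests

url = "http://crypto-ml-model.default.example.com/v1/models/crypto-price-predictor:predict"
response = requests.post(url, json={"symbol": "BTCUSDT"}, timeout=10)
response.raise_for_status()
print(response.json())  # expected keys: symbol, prediction, confidence, ...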

5. Monitoring and Operations

5.1 Metrics Collection with Prometheus

# monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
import functools

# Metric definitions
PREDICTION_REQUESTS = Counter(
    'prediction_requests_total',
    'Total prediction requests',
    ['model_name', 'symbol']
)

PREDICTION_LATENCY = Histogram(
    'prediction_latency_seconds',
    'Prediction latency in seconds',
    ['model_name']
)

MODEL_ACCURACY = Gauge(
    'model_accuracy',
    'Current model accuracy',
    ['model_name', 'time_window']
)

FEATURE_DRIFT = Gauge(
    'feature_drift_score',
    'Feature drift detection score',
    ['feature_name', 'symbol']
)

TRADING_PNL = Gauge(
    'trading_pnl_usd',
    'Trading P&L in USD',
    ['strategy', 'symbol']
)

ERROR_COUNTER = Counter(
    'prediction_errors_total',
    'Errors raised by monitored functions',
    ['function', 'error_type']
)

class MetricsCollector:
    def __init__(self):
        self.start_time = time.time()

    def record_prediction(self, model_name: str, symbol: str, latency: float):
        """予測リクエストを記録"""
        PREDICTION_REQUESTS.labels(model_name=model_name, symbol=symbol).inc()
        PREDICTION_LATENCY.labels(model_name=model_name).observe(latency)

    def update_model_accuracy(self, model_name: str, accuracy: float, time_window: str):
        """モデル精度を更新"""
        MODEL_ACCURACY.labels(model_name=model_name, time_window=time_window).set(accuracy)

    def record_feature_drift(self, feature_name: str, symbol: str, drift_score: float):
        """特徴量ドリフトを記録"""
        FEATURE_DRIFT.labels(feature_name=feature_name, symbol=symbol).set(drift_score)

    def update_trading_pnl(self, strategy: str, symbol: str, pnl: float):
        """取引損益を更新"""
        TRADING_PNL.labels(strategy=strategy, symbol=symbol).set(pnl)

def monitor_performance(func):
    """パフォーマンス監視デコレータ"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()

        try:
            result = func(*args, **kwargs)
            latency = time.time() - start_time

            # Record latency metrics
            PREDICTION_LATENCY.labels(model_name=func.__name__).observe(latency)

            return result

        except Exception as e:
            # Record error metrics
            ERROR_COUNTER.labels(
                function=func.__name__,
                error_type=type(e).__name__
            ).inc()
            raise

    return wrapper

# Start the metrics server
if __name__ == '__main__':
    start_http_server(8000)
    print("Prometheusメトリクスサーバーが起動しました(ポート8000)")

5.2 Log Management with the ELK Stack

# logging/structured_logger.py
import json
import logging
import sys
from datetime import datetime
from typing import Dict, Any

class StructuredLogger:
    def __init__(self, service_name: str, log_level: str = "INFO"):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(getattr(logging, log_level.upper()))

        # JSON formatter
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(JSONFormatter())
        self.logger.addHandler(handler)

    def log_prediction(self, symbol: str, prediction: float, confidence: float, 
                      model_version: str, latency_ms: float):
        """予測ログ"""
        self.logger.info("prediction", extra={
            'event_type': 'prediction',
            'symbol': symbol,
            'prediction': prediction,
            'confidence': confidence,
            'model_version': model_version,
            'latency_ms': latency_ms,
            'timestamp': datetime.utcnow().isoformat()
        })

    def log_trade(self, symbol: str, action: str, quantity: float, 
                  price: float, strategy: str):
        """取引ログ"""
        self.logger.info("trade_execution", extra={
            'event_type': 'trade',
            'symbol': symbol,
            'action': action,
            'quantity': quantity,
            'price': price,
            'strategy': strategy,
            'timestamp': datetime.utcnow().isoformat()
        })

    def log_model_drift(self, model_name: str, drift_metrics: Dict[str, float]):
        """モデルドリフトログ"""
        self.logger.warning("model_drift_detected", extra={
            'event_type': 'model_drift',
            'model_name': model_name,
            'drift_metrics': drift_metrics,
            'timestamp': datetime.utcnow().isoformat()
        })

    def log_error(self, error: Exception, context: Dict[str, Any]):
        """エラーログ"""
        self.logger.error("error_occurred", extra={
            'event_type': 'error',
            'error_type': type(error).__name__,
            'error_message': str(error),
            'context': context,
            'timestamp': datetime.utcnow().isoformat()
        })

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            'service': getattr(record, 'service_name', 'unknown'),
            'level': record.levelname,
            'message': record.getMessage(),
            'timestamp': datetime.utcnow().isoformat()
        }

        # Merge extra fields
        for key, value in record.__dict__.items():
            if key not in ['name', 'msg', 'args', 'levelname', 'levelno', 
                          'pathname', 'filename', 'module', 'lineno', 
                          'funcName', 'created', 'msecs', 'relativeCreated', 
                          'thread', 'threadName', 'processName', 'process']:
                log_entry[key] = value

        return json.dumps(log_entry)

# elk-stack.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-config
data:
  logstash.conf: |
    input {
      beats {
        port => 5044
      }
    }

    filter {
      if [fields][service] == "crypto-ml" {
        json {
          source => "message"
        }

        # Handle prediction logs
        if [event_type] == "prediction" {
          mutate {
            add_tag => ["prediction"]
          }
        }

        # Handle trade logs
        if [event_type] == "trade" {
          mutate {
            add_tag => ["trading"]
          }
        }

        # Handle error logs
        if [event_type] == "error" {
          mutate {
            add_tag => ["error"]
          }
        }
      }
    }

    output {
      elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "crypto-ml-%{+YYYY.MM.dd}"
      }
    }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: elasticsearch
spec:
  replicas: 1
  selector:
    matchLabels:
      app: elasticsearch
  template:
    metadata:
      labels:
        app: elasticsearch
    spec:
      containers:
      - name: elasticsearch
        image: elasticsearch:7.15.0
        ports:
        - containerPort: 9200
        env:
        - name: discovery.type
          value: single-node
        - name: ES_JAVA_OPTS
          value: "-Xms512m -Xmx512m"
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1"

5.3 Grafana Dashboards

{
  "dashboard": {
    "title": "Crypto ML Pipeline Dashboard",
    "panels": [
      {
        "title": "Model Prediction Accuracy",
        "type": "stat",
        "targets": [
          {
            "expr": "model_accuracy{time_window=\"1h\"}",
            "legendFormat": "{{model_name}}"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "min": 0,
            "max": 1,
            "unit": "percentunit"
          }
        }
      },
      {
        "title": "Prediction Latency",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, prediction_latency_seconds_bucket)",
            "legendFormat": "95th percentile"
          },
          {
            "expr": "histogram_quantile(0.50, prediction_latency_seconds_bucket)",
            "legendFormat": "50th percentile"
          }
        ]
      },
      {
        "title": "Trading P&L",
        "type": "graph",
        "targets": [
          {
            "expr": "trading_pnl_usd",
            "legendFormat": "{{strategy}} - {{symbol}}"
          }
        ]
      },
      {
        "title": "Feature Drift Scores",
        "type": "heatmap",
        "targets": [
          {
            "expr": "feature_drift_score",
            "format": "time_series"
          }
        ]
      },
      {
        "title": "System Resource Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(container_cpu_usage_seconds_total[5m])",
            "legendFormat": "CPU Usage - {{pod}}"
          },
          {
            "expr": "container_memory_usage_bytes / 1024 / 1024",
            "legendFormat": "Memory Usage (MB) - {{pod}}"
          }
        ]
      }
    ]
  }
}

6. CI/CD Pipeline

6.1 GitHub Actions

# .github/workflows/ml-pipeline.yml
name: ML Pipeline CI/CD

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  DOCKER_REGISTRY: your-registry.com
  KUBE_NAMESPACE: crypto-ml

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.10'

    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install -r requirements-test.txt

    - name: Run unit tests
      run: |
        pytest tests/unit/ --cov=src --cov-report=xml

    - name: Run integration tests
      run: |
        docker-compose -f docker-compose.test.yml up -d
        pytest tests/integration/
        docker-compose -f docker-compose.test.yml down

    - name: Upload coverage to Codecov
      uses: codecov/codecov-action@v3

  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'

    steps:
    - uses: actions/checkout@v3

    - name: Build Docker images
      run: |
        docker build -t $DOCKER_REGISTRY/crypto-ml-training:$GITHUB_SHA .
        docker build -t $DOCKER_REGISTRY/crypto-ml-inference:$GITHUB_SHA \
          -f Dockerfile.inference .

    - name: Push Docker images
      run: |
        echo ${{ secrets.DOCKER_PASSWORD }} | \
          docker login $DOCKER_REGISTRY -u ${{ secrets.DOCKER_USERNAME }} --password-stdin
        docker push $DOCKER_REGISTRY/crypto-ml-training:$GITHUB_SHA
        docker push $DOCKER_REGISTRY/crypto-ml-inference:$GITHUB_SHA

  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'

    steps:
    - uses: actions/checkout@v3

    - name: Configure kubectl
      run: |
        mkdir -p $HOME/.kube
        echo ${{ secrets.KUBE_CONFIG }} | base64 -d > $HOME/.kube/config

    - name: Deploy to staging
      run: |
        helm upgrade --install crypto-ml-staging ./helm-chart \
          --namespace $KUBE_NAMESPACE-staging \
          --set image.tag=$GITHUB_SHA \
          --set environment=staging

    - name: Run smoke tests
      run: |
        python tests/smoke_tests.py --environment=staging

    - name: Deploy to production
      if: success()
      run: |
        helm upgrade --install crypto-ml-prod ./helm-chart \
          --namespace $KUBE_NAMESPACE-prod \
          --set image.tag=$GITHUB_SHA \
          --set environment=production
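
The workflow above calls tests/smoke_tests.py, which is not shown elsewhere in this document. A minimal sketch of such a script is below; the per-environment base URLs are assumptions (the production one reuses the ingress host from the Helm values), and the script exits non-zero on failure so the job fails.

# tests/smoke_tests.py (illustrative sketch; base URLs are assumptions)
import argparse
import sys
import requests

ENDPOINTS = {
    'staging': 'https://crypto-ml-api-staging.yourdomain.com',
    'production': 'https://crypto-ml-api.yourdomain.com'
}

def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument('--environment', choices=list(ENDPOINTS), default='staging')
    args = parser.parse_args()
    base = ENDPOINTS[args.environment]

    # 1. The health endpoint must answer
    requests.get(f'{base}/health', timeout=10).raise_for_status()

    # 2. A prediction round-trip must return a body with a prediction field
    response = requests.post(f'{base}/predict', json={'symbol': 'BTCUSDT'}, timeout=30)
    response.raise_for_status()
    assert 'prediction' in response.json(), f'unexpected response: {response.text}'

    print(f'smoke tests passed against {base}')
    return 0

if __name__ == '__main__':
    sys.exit(main())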

6.2 Helm Chart

# helm-chart/values.yaml
replicaCount: 3

image:
  repository: your-registry.com/crypto-ml-inference
  tag: latest
  pullPolicy: IfNotPresent

service:
  type: ClusterIP
  port: 80
  targetPort: 8080

ingress:
  enabled: true
  className: nginx
  annotations:
    nginx.ingress.kubernetes.io/rate-limit: "100"
  hosts:
    - host: crypto-ml-api.yourdomain.com
      paths:
        - path: /
          pathType: Prefix

resources:
  limits:
    cpu: 2000m
    memory: 4Gi
  requests:
    cpu: 1000m
    memory: 2Gi

autoscaling:
  enabled: true
  minReplicas: 2
  maxReplicas: 10
  targetCPUUtilizationPercentage: 70
  targetMemoryUtilizationPercentage: 80

nodeSelector: {}

tolerations: []

affinity:
  podAntiAffinity:
    preferredDuringSchedulingIgnoredDuringExecution:
    - weight: 100
      podAffinityTerm:
        labelSelector:
          matchExpressions:
          - key: app.kubernetes.io/name
            operator: In
            values:
            - crypto-ml-inference
        topologyKey: kubernetes.io/hostname

configMap:
  MLFLOW_TRACKING_URI: "http://mlflow:5000"
  FEAST_FEATURE_STORE_URL: "http://feast:6566"
  LOG_LEVEL: "INFO"
  PROMETHEUS_PORT: "8000"

secret:
  DATABASE_URL: "postgresql://user:password@postgres:5432/crypto_ml"
  REDIS_URL: "redis://redis:6379/0"
  API_KEY: "your-secret-api-key"

# helm-chart/templates/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "crypto-ml.fullname" . }}
  labels:
    {{- include "crypto-ml.labels" . | nindent 4 }}
spec:
  {{- if not .Values.autoscaling.enabled }}
  replicas: {{ .Values.replicaCount }}
  {{- end }}
  selector:
    matchLabels:
      {{- include "crypto-ml.selectorLabels" . | nindent 6 }}
  template:
    metadata:
      annotations:
        checksum/config: {{ include (print $.Template.BasePath "/configmap.yaml") . | sha256sum }}
      labels:
        {{- include "crypto-ml.selectorLabels" . | nindent 8 }}
    spec:
      containers:
        - name: {{ .Chart.Name }}
          image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
          imagePullPolicy: {{ .Values.image.pullPolicy }}
          ports:
            - name: http
              containerPort: 8080
              protocol: TCP
            - name: metrics
              containerPort: 8000
              protocol: TCP
          env:
            {{- range $key, $value := .Values.configMap }}
            - name: {{ $key }}
              value: {{ $value | quote }}
            {{- end }}
            {{- range $key, $value := .Values.secret }}
            - name: {{ $key }}
              valueFrom:
                secretKeyRef:
                  name: {{ include "crypto-ml.fullname" $ }}
                  key: {{ $key }}
            {{- end }}
          livenessProbe:
            httpGet:
              path: /health
              port: http
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: http
            initialDelaySeconds: 5
            periodSeconds: 5
          resources:
            {{- toYaml .Values.resources | nindent 12 }}
      {{- with .Values.nodeSelector }}
      nodeSelector:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.affinity }}
      affinity:
        {{- toYaml . | nindent 8 }}
      {{- end }}
      {{- with .Values.tolerations }}
      tolerations:
        {{- toYaml . | nindent 8 }}
      {{- end }}

7. Security and Compliance

7.1 Security Configuration

# security/network-policy.yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: crypto-ml-network-policy
  namespace: crypto-ml
spec:
  podSelector:
    matchLabels:
      app: crypto-ml
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - namespaceSelector:
        matchLabels:
          name: ingress-nginx
    ports:
    - protocol: TCP
      port: 8080
  - from:
    - namespaceSelector:
        matchLabels:
          name: monitoring
    ports:
    - protocol: TCP
      port: 8000
  egress:
  - to:
    - namespaceSelector:
        matchLabels:
          name: database
    ports:
    - protocol: TCP
      port: 5432
  - to: []
    ports:
    - protocol: TCP
      port: 443
    - protocol: TCP
      port: 80
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: crypto-ml-sa
  namespace: crypto-ml
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: crypto-ml-role
  namespace: crypto-ml
rules:
- apiGroups: [""]
  resources: ["configmaps", "secrets"]
  verbs: ["get", "list"]
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: crypto-ml-rolebinding
  namespace: crypto-ml
roleRef:
  kind: Role
  name: crypto-ml-role
  apiGroup: rbac.authorization.k8s.io
subjects:
- kind: ServiceAccount
  name: crypto-ml-sa
  namespace: crypto-ml

7.2 Secrets Management

# security/secrets_manager.py
import os
import json
import base64
from typing import Dict, Any
from cryptography.fernet import Fernet
import boto3
from kubernetes import client, config

class SecretsManager:
    def __init__(self, provider: str = "kubernetes"):
        self.provider = provider

        if provider == "kubernetes":
            config.load_incluster_config()
            self.k8s_client = client.CoreV1Api()
        elif provider == "aws":
            self.secrets_client = boto3.client('secretsmanager')
        elif provider == "vault":
            # HashiCorp Vault integration (not implemented here)
            pass

    def get_secret(self, secret_name: str, namespace: str = "default") -> Dict[str, Any]:
        """シークレット取得"""
        if self.provider == "kubernetes":
            return self._get_k8s_secret(secret_name, namespace)
        elif self.provider == "aws":
            return self._get_aws_secret(secret_name)

    def _get_k8s_secret(self, secret_name: str, namespace: str) -> Dict[str, Any]:
        """Kubernetesシークレット取得"""
        try:
            secret = self.k8s_client.read_namespaced_secret(
                name=secret_name,
                namespace=namespace
            )

            # Base64-decode the secret values
            decoded_data = {}
            for key, value in secret.data.items():
                decoded_data[key] = base64.b64decode(value).decode('utf-8')

            return decoded_data

        except Exception as e:
            raise ValueError(f"シークレット取得エラー: {e}")

    def _get_aws_secret(self, secret_name: str) -> Dict[str, Any]:
        """AWS Secrets Managerからシークレット取得"""
        try:
            response = self.secrets_client.get_secret_value(SecretId=secret_name)
            return json.loads(response['SecretString'])

        except Exception as e:
            raise ValueError(f"AWS シークレット取得エラー: {e}")

class EncryptionManager:
    def __init__(self, key: bytes = None):
        if key is None:
            key = Fernet.generate_key()
        self.fernet = Fernet(key)

    def encrypt(self, data: str) -> bytes:
        """データ暗号化"""
        return self.fernet.encrypt(data.encode())

    def decrypt(self, encrypted_data: bytes) -> str:
        """データ復号化"""
        return self.fernet.decrypt(encrypted_data).decode()

# Usage example
secrets_manager = SecretsManager("kubernetes")
encryption_manager = EncryptionManager()

# Fetch API keys securely
api_keys = secrets_manager.get_secret("api-keys", "crypto-ml")
database_url = secrets_manager.get_secret("database-config", "crypto-ml")

8. Disaster Recovery and Backup

8.1 Backup Strategy

# backup/backup_manager.py
import asyncio
import boto3
import subprocess
from datetime import datetime, timedelta
from typing import Dict, List
import schedule
import time

class BackupManager:
    def __init__(self, config: Dict):
        self.config = config
        self.s3_client = boto3.client('s3')

    async def backup_database(self):
        """データベースバックアップ"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_file = f"crypto_ml_backup_{timestamp}.sql"

        # PostgreSQL dump
        cmd = [
            'pg_dump',
            '-h', self.config['db_host'],
            '-U', self.config['db_user'],
            '-d', self.config['db_name'],
            '-f', backup_file
        ]

        process = await asyncio.create_subprocess_exec(
            *cmd,
            env={'PGPASSWORD': self.config['db_password']},
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        stdout, stderr = await process.communicate()

        if process.returncode == 0:
            # Upload to S3
            await self._upload_to_s3(backup_file, f"database/{backup_file}")
            print(f"Database backup complete: {backup_file}")
        else:
            print(f"Backup error: {stderr.decode()}")

    async def backup_models(self):
        """MLモデルバックアップ"""
        import mlflow

        # Fetch registered models from MLflow
        client = mlflow.tracking.MlflowClient()
        models = client.list_registered_models()

        for model in models:
            latest_version = client.get_latest_versions(
                model.name, stages=["Production"]
            )[0]

            # Download the model artifacts
            model_path = mlflow.artifacts.download_artifacts(
                latest_version.source
            )

            # Back up to S3
            backup_key = f"models/{model.name}/v{latest_version.version}/"
            await self._upload_directory_to_s3(model_path, backup_key)

    async def backup_feature_store(self):
        """特徴量ストアバックアップ"""
        from feast import FeatureStore

        fs = FeatureStore(".")

        # Export feature view definitions
        feature_definitions = fs.list_feature_views()

        # Serialize the definitions (stored as JSON below)
        backup_data = {
            'timestamp': datetime.now().isoformat(),
            'feature_views': [fv.to_dict() for fv in feature_definitions]
        }

        # Save to S3
        backup_key = f"feature_store/backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        await self._upload_json_to_s3(backup_data, backup_key)

    async def _upload_to_s3(self, local_file: str, s3_key: str):
        """S3にファイルアップロード"""
        try:
            self.s3_client.upload_file(
                local_file,
                self.config['backup_bucket'],
                s3_key
            )
        except Exception as e:
            print(f"S3アップロードエラー: {e}")

    async def _upload_directory_to_s3(self, local_dir: str, s3_prefix: str):
        """ディレクトリをS3にアップロード"""
        import os

        for root, dirs, files in os.walk(local_dir):
            for file in files:
                local_path = os.path.join(root, file)
                relative_path = os.path.relpath(local_path, local_dir)
                s3_key = os.path.join(s3_prefix, relative_path)

                await self._upload_to_s3(local_path, s3_key)

    def schedule_backups(self):
        """バックアップスケジュール設定"""
        # 毎日深夜にデータベースバックアップ
        schedule.every().day.at("02:00").do(
            lambda: asyncio.run(self.backup_database())
        )

        # Weekly model backup
        schedule.every().sunday.at("03:00").do(
            lambda: asyncio.run(self.backup_models())
        )

        # Daily feature store backup
        schedule.every().day.at("04:00").do(
            lambda: asyncio.run(self.backup_feature_store())
        )

        # Run the scheduler loop
        while True:
            schedule.run_pending()
            time.sleep(60)

8.2 Disaster Recovery Plan

# disaster_recovery/recovery_manager.py
import asyncio
import boto3
from kubernetes import client, config as k8s_config  # alias avoids shadowing the `config` argument below
from typing import Dict, List

class DisasterRecoveryManager:
    def __init__(self, config: Dict):
        self.config = config
        self.s3_client = boto3.client('s3')
        k8s_config.load_incluster_config()
        self.k8s_apps = client.AppsV1Api()
        self.k8s_core = client.CoreV1Api()

    async def execute_recovery_plan(self, disaster_type: str):
        """災害復旧計画実行"""
        recovery_plans = {
            'database_failure': self._recover_database,
            'model_service_failure': self._recover_model_service,
            'complete_system_failure': self._recover_complete_system,
            'data_corruption': self._recover_from_corruption
        }

        if disaster_type in recovery_plans:
            await recovery_plans[disaster_type]()
        else:
            raise ValueError(f"未知の災害タイプ: {disaster_type}")

    async def _recover_database(self):
        """Recover the database."""
        print("Starting database recovery...")

        # 1. Locate the latest backup
        latest_backup = await self._find_latest_backup('database/')

        # 2. Download the backup
        local_backup = await self._download_from_s3(latest_backup)

        # 3. Restore the database
        await self._restore_database(local_backup)

        # 4. Health check
        if await self._check_database_health():
            print("Database recovery complete")
        else:
            raise Exception("Database recovery failed")

    async def _recover_model_service(self):
        """Recover the model serving service."""
        print("Starting model service recovery...")

        # 1. Delete the current deployment
        await self._delete_deployment('crypto-ml-inference')

        # 2. Redeploy the latest stable image
        await self._deploy_stable_version()

        # 3. Health check
        if await self._check_service_health('crypto-ml-inference'):
            print("Model service recovery complete")
        else:
            raise Exception("Model service recovery failed")

    async def _recover_complete_system(self):
        """Recover the complete system."""
        print("Starting full system recovery...")

        # 1. Restore infrastructure
        await self._restore_infrastructure()

        # 2. Recover the database
        await self._recover_database()

        # 3. Restore the feature store
        await self._restore_feature_store()

        # 4. Restore ML models
        await self._restore_models()

        # 5. Restore application services
        await self._restore_applications()

        # 6. Integrity check
        if await self._verify_system_integrity():
            print("Full system recovery complete")
        else:
            raise Exception("System recovery failed")

    async def _restore_infrastructure(self):
        """Restore infrastructure."""
        # Infrastructure recovery with Terraform
        import subprocess

        terraform_cmd = [
            'terraform', 'apply',
            '-auto-approve',
            '-var-file', 'disaster_recovery.tfvars'
        ]

        process = await asyncio.create_subprocess_exec(
            *terraform_cmd,
            cwd='/app/terraform',
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        stdout, stderr = await process.communicate()

        if process.returncode != 0:
            raise Exception(f"インフラ復旧エラー: {stderr.decode()}")

    async def _verify_system_integrity(self) -> bool:
        """システム完全性検証"""
        checks = [
            self._check_database_health(),
            self._check_service_health('crypto-ml-inference'),
            self._check_service_health('crypto-ml-training'),
            self._check_feature_store_health(),
            self._check_monitoring_health()
        ]

        results = await asyncio.gather(*checks, return_exceptions=True)

        # True only if every check succeeded
        return all(result is True for result in results)

    async def _check_database_health(self) -> bool:
        """データベースヘルスチェック"""
        import asyncpg

        try:
            conn = await asyncpg.connect(self.config['database_url'])
            result = await conn.fetchval('SELECT 1')
            await conn.close()
            return result == 1
        except Exception:
            return False

    async def _check_service_health(self, service_name: str) -> bool:
        """サービスヘルスチェック"""
        try:
            pods = self.k8s_core.list_namespaced_pod(
                namespace=self.config['namespace'],
                label_selector=f'app={service_name}'
            )

            # Check that every pod is in the Running phase
            return all(pod.status.phase == 'Running' for pod in pods.items)
        except Exception:
            return False
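
Several helpers referenced above (for example _find_latest_backup and _download_from_s3) are left unimplemented in this document. As one hedged example, the latest-backup lookup could be sketched as a method on the manager like this, assuming backups land under timestamped keys in the backup bucket:

    async def _find_latest_backup(self, prefix: str) -> str:
        """Return the S3 key of the most recently modified object under `prefix` (illustrative sketch)."""
        paginator = self.s3_client.get_paginator('list_objects_v2')
        latest_key, latest_time = None, None

        for page in paginator.paginate(Bucket=self.config['backup_bucket'], Prefix=prefix):
            for obj in page.get('Contents', []):
                if latest_time is None or obj['LastModified'] > latest_time:
                    latest_key, latest_time = obj['Key'], obj['LastModified']

        if latest_key is None:
            raise FileNotFoundError(f"No backups found under {prefix}")
        return latest_key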

9. End-to-End Integration Example

# main_application.py
import asyncio
import logging
from typing import Dict, Any
from dataclasses import dataclass
import pandas as pd

@dataclass
class MLPipelineConfig:
    """MLパイプライン設定"""
    namespace: str = "crypto-ml"
    redis_url: str = "redis://redis:6379/0"
    database_url: str = "postgresql://user:pass@postgres:5432/crypto_ml"
    mlflow_tracking_uri: str = "http://mlflow:5000"
    kafka_servers: list = None
    feature_store_url: str = "http://feast:6566"
    monitoring_enabled: bool = True

class CryptoMLPipelineOrchestrator:
    """暗号資産MLパイプラインオーケストレーター"""

    def __init__(self, config: MLPipelineConfig):
        self.config = config
        self.logger = self._setup_logging()

        # Initialize components
        self.data_collector = self._init_data_collector()
        self.feature_store = self._init_feature_store()
        self.model_manager = self._init_model_manager()
        self.inference_service = self._init_inference_service()
        self.monitoring = self._init_monitoring()

    def _setup_logging(self):
        """ログ設定"""
        from logging_config import StructuredLogger
        return StructuredLogger("crypto-ml-orchestrator")

    async def start_pipeline(self):
        """パイプライン開始"""
        self.logger.info("Crypto ML Pipeline starting...")

        # Start concurrent tasks
        tasks = [
            self._run_data_collection(),
            self._run_feature_processing(),
            self._run_model_training(),
            self._run_inference_service(),
            self._run_monitoring()
        ]

        try:
            await asyncio.gather(*tasks)
        except Exception as e:
            self.logger.log_error(e, {"component": "orchestrator"})
            await self._graceful_shutdown()

    async def _run_data_collection(self):
        """Run data collection."""
        while True:
            try:
                # Collect price data
                await self.data_collector.collect_price_data()

                # Collect on-chain data
                await self.data_collector.collect_onchain_data()

                await asyncio.sleep(60)  # run every minute

            except Exception as e:
                self.logger.log_error(e, {"component": "data_collection"})
                await asyncio.sleep(60)

    async def _run_inference_service(self):
        """推論サービス実行"""
        from inference_service import CryptoMLModel

        model = CryptoMLModel("crypto-price-predictor")
        model.load()

        # Start the web API server
        from fastapi import FastAPI
        import uvicorn

        app = FastAPI(title="Crypto ML API")

        @app.post("/predict")
        async def predict(payload: Dict[str, Any]):
            return await model.predict(payload)

        @app.get("/health")
        async def health():
            return {"status": "healthy", "timestamp": pd.Timestamp.now().isoformat()}

        config = uvicorn.Config(app, host="0.0.0.0", port=8080, log_level="info")
        server = uvicorn.Server(config)
        await server.serve()

    async def _graceful_shutdown(self):
        """グレースフルシャットダウン"""
        self.logger.info("Graceful shutdown initiated...")

        # Clean up resources
        await self.data_collector.close()
        await self.feature_store.close()
        await self.model_manager.close()

        self.logger.info("Shutdown complete")

# Entry point
async def main():
    config = MLPipelineConfig(
        kafka_servers=["kafka:9092"],
        monitoring_enabled=True
    )

    orchestrator = CryptoMLPipelineOrchestrator(config)
    await orchestrator.start_pipeline()

if __name__ == "__main__":
    asyncio.run(main())

10. Summary and Best Practices

10.1 Implementation Checklist

CLOUD_NATIVE_ML_CHECKLIST = {
    "Architecture": [
        "Appropriate microservice boundaries",
        "Scalability requirements defined",
        "Optimized data flows",
        "Dependency management"
    ],
    "Containerization": [
        "Multi-stage builds",
        "Security best practices",
        "Image size optimization",
        "Health checks implemented"
    ],
    "Orchestration": [
        "Resource requests and limits set",
        "Autoscaling configured",
        "Rolling update strategy",
        "Affinity rules configured"
    ],
    "Monitoring & operations": [
        "Metrics collection configured",
        "Log aggregation configured",
        "Alerting configured",
        "Dashboards built"
    ],
    "Security": [
        "Network policies configured",
        "RBAC configured",
        "Secrets management",
        "Image scanning"
    ],
    "CI/CD": [
        "Automated tests configured",
        "Automated deployment configured",
        "Rollback strategy",
        "Per-environment configuration management"
    ]
}

10.2 Recommendations

  1. Migrate incrementally
    - Move from a monolith to microservices step by step
    - Containerize one service at a time
    - Keep risk to a minimum

  2. Build in monitoring from day one
    - Treat observability as part of the initial design
    - Follow SRE principles
    - Keep the pillars of observability (metrics, logs, traces) in mind

  3. Security first
    - Consider security from the design phase
    - Apply the principle of least privilege
    - Run regular security audits

  4. Optimize cost
    - Right-size resources
    - Use spot instances where appropriate
    - Automatically remove unused resources

Building a cloud-native ML pipeline substantially improves the availability, scalability, and maintainability of a crypto asset trading system. With sound design and implementation, it enables a robust system that can run around the clock.