ML Documentation

暗号通貨WebSocketクローラー システムドキュメント

概要

このシステムは、複数の暗号通貨取引所からリアルタイムで取引データを収集し、データ品質管理を行いながらQuestDBに保存する分散型データ収集システムです。

システム構成

主要コンポーネント

  1. データ収集層(Crawlers)
    - Hyperliquid Crawler
    - Binance Crawler(フェイルオーバー機能付き)
    - ~~Bybit Crawler~~(ネットワーク制限により一時停止中)

  2. メッセージング層
    - RedPanda(Kafka互換):リアルタイムストリーミング

  3. データ保存層
    - QuestDB:時系列データベース

  4. データ品質管理
    - リアルタイム品質チェック
    - 品質スコアリング(0-100点)
    - 品質メトリクスの記録

  5. モニタリング
    - Webダッシュボード(http://localhost:8888)
    - Grafana(http://localhost:3000)
    - RedPanda Console(http://localhost:8080)

新機能詳細

1. Binanceフェイルオーバー機能

binance_crawler_failover.pyは、接続障害時に自動的に別のエンドポイントに切り替える機能を実装しています。

エンドポイント優先順位:
1. Primary: wss://stream.binance.com:9443/ws (全機能)
2. Secondary: wss://stream.binance.com:443/ws (代替ポート)
3. Data-only: wss://data-stream.binance.vision/ws (マーケットデータのみ)

動作仕様:
- 各エンドポイントで3回失敗すると次のエンドポイントへ切り替え
- 全エンドポイントを巡回した場合、30秒待機後に最初から再試行
- 接続統計をログに記録

2. データ品質管理システム

data_quality_manager.pyは、以下の品質チェックを実行します:

品質チェック項目

  1. 重複検出
    - 同一取引IDの検出
    - 最近10,000件をメモリキャッシュで管理

  2. タイムスタンプ検証
    - 未来のタイムスタンプ検出(5分以上)
    - 順序異常検出(3分以上の逆転)

  3. 価格異常検出
    - 最小/最大価格の範囲チェック
    - 移動平均からの乖離率チェック(20%以上で警告)

  4. データ完全性
    - 必須フィールドの存在確認
    - データ型の検証と自動変換

  5. 品質スコアリング
    - 各取引所・シンボルペアごとに0-100点で評価
    - エラー率に基づく自動アラート(5%以上)

データベーススキーマ

-- メイン取引テーブル
CREATE TABLE trades (
    timestamp TIMESTAMP,
    exchange SYMBOL CAPACITY 50 CACHE,
    symbol SYMBOL CAPACITY 500 CACHE,
    trade_id STRING,
    price DOUBLE,
    size DOUBLE,
    side SYMBOL CAPACITY 2 CACHE,
    is_liquidation BOOLEAN,
    quality_score INT DEFAULT 100  -- 品質スコア (0-100)
) timestamp(timestamp) PARTITION BY DAY;

-- 品質メトリクステーブル
CREATE TABLE trade_quality_metrics (
    timestamp TIMESTAMP,
    exchange SYMBOL CAPACITY 50 CACHE,
    symbol SYMBOL CAPACITY 500 CACHE,
    total_processed LONG,
    valid_count LONG,
    invalid_count LONG,
    duplicate_count LONG,
    price_anomaly_count LONG,
    quality_score DOUBLE
) timestamp(timestamp) PARTITION BY DAY;

使用方法

起動

# 全サービスの起動
make up

# ビルドして再起動
make restart

# ログの確認
make logs
make logs-consumer
make logs-binance
make logs-hyperliquid

データ品質の確認

-- 品質スコアの確認
SELECT exchange, symbol, 
       COUNT(*) as count, 
       AVG(quality_score) as avg_quality 
FROM trades 
GROUP BY exchange, symbol;

-- 低品質データの検出
SELECT * FROM trades 
WHERE quality_score < 80
ORDER BY timestamp DESC
LIMIT 100;

-- 品質メトリクスの確認
SELECT * FROM trade_quality_metrics
ORDER BY timestamp DESC
LIMIT 10;

監視対象シンボル

現在の設定:
- ETH, SOL, AVAX, HYPE
- ~~BTC, ARB~~(除外)

環境変数

Crawler共通

QuestDB Consumer

トラブルシューティング

品質スコアが低い場合

  1. 重複が多い場合
    - ネットワーク遅延により同じデータを複数回受信している可能性
    - WebSocket接続の安定性を確認

  2. タイムスタンプエラー
    - システム時刻の同期を確認
    - NTPサービスの状態を確認

  3. 価格異常
    - 急激な価格変動が実際に発生していないか確認
    - 異常検出の閾値調整を検討

データが保存されない場合

# QuestDBの状態確認
docker-compose ps questdb

# テーブルの存在確認
curl -G "http://localhost:9000/exec" --data-urlencode "query=SHOW TABLES"

# コンシューマーのログ確認
docker-compose logs --tail=100 questdb-consumer

アーキテクチャ図

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Hyperliquid │     │   Binance   │     │    Bybit    │
│  WebSocket  │     │  WebSocket  │     │  WebSocket  │
└──────┬──────┘     └──────┬──────┘     └──────┬──────┘
       │                   │                    │
       ▼                   ▼                    ▼
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Crawler    │     │  Crawler    │     │  Crawler    │
│             │     │(Failover対応)│     │ (一時停止)  │
└──────┬──────┘     └──────┬──────┘     └─────────────┘
       │                   │
       ▼                   ▼
    ┌──────────────────────────┐
    │       RedPanda           │
    │   (Kafka Compatible)     │
    └────────────┬─────────────┘
                 │
                 ▼
    ┌──────────────────────────┐
    │    QuestDB Consumer      │
    │  (品質管理機能付き)      │
    └────────────┬─────────────┘
                 │
                 ▼
    ┌──────────────────────────┐
    │        QuestDB           │
    │   (時系列データベース)   │
    └──────────────────────────┘

品質ダッシュボード

アクセス方法

http://localhost:8888/quality

機能一覧

  1. 品質概要カード
    - 全体品質スコア(0-100点)
    - データ完全性
    - タイムスタンプ精度
    - 重複率
    - 価格異常率

  2. 品質トレンドチャート
    - 取引所別品質スコア推移(1時間単位)
    - リアルタイム更新(30秒間隔)

  3. 取引所比較
    - Hyperliquid vs Binance 品質比較
    - 棒グラフで視覚化

  4. データ検証結果
    - 有効/無効/重複/価格異常の分布
    - ドーナツチャートで表示

  5. エラー発生頻度
    - 重複エラー、タイムスタンプエラー、価格異常の推移
    - 時系列グラフ

  6. 異常検出テーブル
    - 重要度別(高/中/低)の異常一覧
    - 詳細情報とタイムスタンプ

フィルター機能

今後の拡張計画

  1. アラート機能
    - Slack/Discord通知
    - 価格急変アラート
    - システム異常通知

  2. 追加取引所
    - OKX
    - Kraken
    - KuCoin

  3. 高度な分析
    - 裁定取引機会の検出
    - 異常取引パターンの検出
    - MLベースの価格予測

  4. 品質管理の強化
    - 自動異常検出の精度向上
    - カスタムアラート設定
    - 品質レポートの自動生成

ライセンス

[プロジェクトのライセンスを記載]