ML Documentation

取引所APIとレート制限管理の実践ガイド

概要

暗号資産取引における機械学習システムでは、複数の取引所APIを効率的に管理し、レート制限を遵守しながら高頻度でのデータ取得・取引実行が必要です。本ガイドでは、実践的なAPI管理手法とレート制限対策を詳しく解説します。

主要取引所のAPI仕様

1. 取引所別レート制限まとめ

class ExchangeRateLimits:
    """主要取引所のレート制限設定"""

    BINANCE = {
        'rest_api': {
            'weight_limit': 1200,      # 1分間あたり
            'order_limit': 100,        # 10秒あたり
            'raw_requests': 6000       # 5分あたり
        },
        'websocket': {
            'connections': 5,          # 最大同時接続数
            'subscriptions': 1024      # 1接続あたり
        }
    }

    COINBASE = {
        'rest_api': {
            'public': 10,              # 1秒あたり
            'private': 5,              # 1秒あたり
            'advanced': 100            # 1秒あたり (Pro)
        },
        'websocket': {
            'connections': 'unlimited',
            'subscriptions': 'unlimited'
        }
    }

    HYPERLIQUID = {
        'rest_api': {
            'public': 1200,            # 1分あたり
            'private': 600             # 1分あたり
        },
        'websocket': {
            'connections': 'unlimited',
            'subscriptions': 'unlimited'
        }
    }

    BYBIT = {
        'rest_api': {
            'public': 120,             # 1分あたり
            'private': 600             # 1分あたり
        },
        'websocket': {
            'connections': 20,
            'subscriptions': 500
        }
    }

高度なレート制限管理システム

1. 動的レート制限管理

import asyncio
import time
from collections import deque, defaultdict
import aiohttp
from dataclasses import dataclass
from typing import Dict, List, Optional

@dataclass
class RateLimit:
    limit: int
    window: int  # seconds
    current_count: int = 0
    reset_time: float = 0

class AdaptiveRateLimiter:
    """適応的レート制限管理"""

    def __init__(self):
        self.limits = {}
        self.request_history = defaultdict(deque)
        self.retry_counts = defaultdict(int)
        self.error_rates = defaultdict(list)

    def add_limit(self, key: str, limit: int, window: int):
        """レート制限の追加"""
        self.limits[key] = RateLimit(limit, window)

    async def acquire(self, key: str, weight: int = 1) -> bool:
        """リクエスト許可の取得"""
        if key not in self.limits:
            return True

        limit = self.limits[key]
        now = time.time()

        # 古いリクエストを削除
        while (self.request_history[key] and 
               self.request_history[key][0] <= now - limit.window):
            self.request_history[key].popleft()

        # 現在のリクエスト数をチェック
        current_requests = sum(req[1] for req in self.request_history[key])

        if current_requests + weight <= limit.limit:
            self.request_history[key].append((now, weight))
            return True
        else:
            # 動的調整の実行
            await self._adaptive_wait(key, weight)
            return await self.acquire(key, weight)

    async def _adaptive_wait(self, key: str, weight: int):
        """適応的待機時間の計算"""
        limit = self.limits[key]
        error_rate = self._calculate_error_rate(key)
        retry_count = self.retry_counts[key]

        # 基本待機時間
        base_wait = limit.window / limit.limit

        # エラー率に基づく調整
        error_multiplier = 1 + (error_rate * 2)

        # リトライ回数に基づく調整
        retry_multiplier = 1.5 ** min(retry_count, 5)

        # 最終待機時間
        wait_time = base_wait * error_multiplier * retry_multiplier

        await asyncio.sleep(min(wait_time, 60))  # 最大60秒

    def record_error(self, key: str):
        """エラーの記録"""
        self.error_rates[key].append(time.time())
        self.retry_counts[key] += 1

    def record_success(self, key: str):
        """成功の記録"""
        self.retry_counts[key] = max(0, self.retry_counts[key] - 1)

    def _calculate_error_rate(self, key: str) -> float:
        """直近のエラー率計算"""
        now = time.time()
        recent_errors = [
            t for t in self.error_rates[key] 
            if t > now - 300  # 過去5分
        ]
        return len(recent_errors) / 300  # エラー/秒

2. マルチ取引所API管理

class MultiExchangeAPIManager:
    """複数取引所のAPI統合管理"""

    def __init__(self):
        self.exchanges = {}
        self.rate_limiters = {}
        self.circuit_breakers = {}
        self.request_queues = {}

    def add_exchange(self, name: str, api_config: dict):
        """取引所の追加"""
        self.exchanges[name] = api_config
        self.rate_limiters[name] = AdaptiveRateLimiter()
        self.circuit_breakers[name] = CircuitBreaker(
            failure_threshold=5,
            recovery_timeout=30
        )
        self.request_queues[name] = asyncio.Queue()

        # レート制限の設定
        for endpoint, limits in api_config['rate_limits'].items():
            self.rate_limiters[name].add_limit(
                endpoint, limits['limit'], limits['window']
            )

    async def request(self, exchange: str, endpoint: str, **kwargs):
        """統一APIリクエスト"""

        # サーキットブレーカーチェック
        if not self.circuit_breakers[exchange].can_execute():
            raise Exception(f"Circuit breaker open for {exchange}")

        # レート制限チェック
        await self.rate_limiters[exchange].acquire(endpoint)

        try:
            # リクエスト実行
            response = await self._execute_request(exchange, endpoint, **kwargs)

            # 成功記録
            self.rate_limiters[exchange].record_success(endpoint)
            self.circuit_breakers[exchange].record_success()

            return response

        except Exception as e:
            # エラー記録
            self.rate_limiters[exchange].record_error(endpoint)
            self.circuit_breakers[exchange].record_failure()

            # エラーハンドリング
            return await self._handle_api_error(exchange, endpoint, e, **kwargs)

    async def _handle_api_error(self, exchange: str, endpoint: str, error: Exception, **kwargs):
        """APIエラーのハンドリング"""

        if "rate limit" in str(error).lower():
            # レート制限エラー
            await self._handle_rate_limit_error(exchange, endpoint)
            return await self.request(exchange, endpoint, **kwargs)

        elif "503" in str(error) or "502" in str(error):
            # サーバーエラー
            await asyncio.sleep(5)
            return await self.request(exchange, endpoint, **kwargs)

        else:
            # その他のエラー
            raise error

3. インテリジェントリクエスト分散

class IntelligentRequestDistributor:
    """インテリジェントなリクエスト分散"""

    def __init__(self):
        self.exchange_priorities = {}
        self.response_times = defaultdict(list)
        self.success_rates = defaultdict(list)
        self.cost_factors = {}

    def calculate_exchange_score(self, exchange: str) -> float:
        """取引所のスコア計算"""

        # レスポンス時間スコア (低いほど良い)
        avg_response_time = np.mean(self.response_times[exchange][-100:]) if self.response_times[exchange] else 1.0
        response_score = 1.0 / (1.0 + avg_response_time)

        # 成功率スコア
        success_rate = np.mean(self.success_rates[exchange][-100:]) if self.success_rates[exchange] else 0.5

        # コストスコア
        cost_score = 1.0 / (1.0 + self.cost_factors.get(exchange, 1.0))

        # 現在の負荷スコア
        current_load = self.get_current_load(exchange)
        load_score = 1.0 / (1.0 + current_load)

        # 総合スコア
        total_score = (
            response_score * 0.3 +
            success_rate * 0.4 +
            cost_score * 0.2 +
            load_score * 0.1
        )

        return total_score

    async def distribute_requests(self, requests: List[dict]) -> Dict[str, List[dict]]:
        """リクエストの最適分散"""

        # 各取引所のスコア計算
        exchange_scores = {
            exchange: self.calculate_exchange_score(exchange)
            for exchange in self.exchanges
        }

        # リクエストの分散
        distribution = defaultdict(list)

        for request in requests:
            # リクエストタイプに基づく重み付け
            weights = self._calculate_request_weights(request, exchange_scores)

            # 最適な取引所を選択
            best_exchange = max(weights.keys(), key=weights.get)
            distribution[best_exchange].append(request)

        return distribution

    def _calculate_request_weights(self, request: dict, base_scores: dict) -> dict:
        """リクエスト固有の重み計算"""

        weights = base_scores.copy()

        # データの重要度
        if request.get('priority') == 'high':
            # 高優先度は信頼性を重視
            for exchange in weights:
                success_rate = np.mean(self.success_rates[exchange][-20:]) if self.success_rates[exchange] else 0.5
                weights[exchange] *= (1 + success_rate)

        # レイテンシ要件
        if request.get('latency_sensitive'):
            # レイテンシ敏感は応答時間を重視
            for exchange in weights:
                avg_response = np.mean(self.response_times[exchange][-20:]) if self.response_times[exchange] else 1.0
                weights[exchange] *= (2.0 / (1.0 + avg_response))

        return weights

WebSocket管理とフォールバック

1. 堅牢なWebSocket管理

class RobustWebSocketManager:
    """堅牢なWebSocket接続管理"""

    def __init__(self):
        self.connections = {}
        self.subscription_managers = {}
        self.heartbeat_tasks = {}
        self.reconnect_strategies = {}

    async def create_connection(self, exchange: str, url: str, 
                              subscriptions: List[dict]):
        """WebSocket接続の作成"""

        strategy = ExponentialBackoffReconnectStrategy(
            initial_delay=1.0,
            max_delay=60.0,
            max_attempts=10
        )

        self.reconnect_strategies[exchange] = strategy

        connection = await self._establish_connection(exchange, url)
        self.connections[exchange] = connection

        # サブスクリプション管理
        subscription_manager = SubscriptionManager(connection)
        self.subscription_managers[exchange] = subscription_manager

        # サブスクリプションの実行
        for sub in subscriptions:
            await subscription_manager.subscribe(sub)

        # ハートビートの開始
        self.heartbeat_tasks[exchange] = asyncio.create_task(
            self._heartbeat_loop(exchange)
        )

    async def _establish_connection(self, exchange: str, url: str):
        """WebSocket接続の確立"""

        session = aiohttp.ClientSession()

        try:
            ws = await session.ws_connect(url)

            # 接続成功のログ
            self.log_connection_event(exchange, "connected")

            return WebSocketConnection(ws, session, exchange)

        except Exception as e:
            await session.close()
            raise ConnectionError(f"Failed to connect to {exchange}: {e}")

    async def _heartbeat_loop(self, exchange: str):
        """ハートビートループ"""

        while exchange in self.connections:
            try:
                connection = self.connections[exchange]

                # Ping送信
                await connection.ping()

                # レスポンス待機(タイムアウト付き)
                pong_received = await asyncio.wait_for(
                    connection.wait_pong(), timeout=10
                )

                if not pong_received:
                    raise TimeoutError("Pong timeout")

                await asyncio.sleep(30)  # 30秒間隔

            except Exception as e:
                # 接続エラー、再接続を試行
                await self._handle_connection_error(exchange, e)
                break

    async def _handle_connection_error(self, exchange: str, error: Exception):
        """接続エラーの処理"""

        self.log_connection_event(exchange, f"error: {error}")

        # 現在の接続をクリーンアップ
        if exchange in self.connections:
            await self.connections[exchange].close()
            del self.connections[exchange]

        # 再接続の試行
        strategy = self.reconnect_strategies[exchange]

        for attempt in range(strategy.max_attempts):
            try:
                delay = strategy.calculate_delay(attempt)
                await asyncio.sleep(delay)

                # 再接続
                await self._reconnect(exchange)
                break

            except Exception as reconnect_error:
                if attempt == strategy.max_attempts - 1:
                    # 最終的な失敗
                    self.log_connection_event(
                        exchange, 
                        f"reconnection failed: {reconnect_error}"
                    )
                    raise

2. データ品質チェックとフィルタリング

class DataQualityManager:
    """リアルタイムデータ品質管理"""

    def __init__(self):
        self.quality_thresholds = {
            'price_deviation': 0.1,      # 10%以上の急激な価格変動
            'volume_spike': 5.0,         # 5倍以上の出来高急増
            'timestamp_gap': 5.0,        # 5秒以上のタイムスタンプギャップ
            'duplicate_threshold': 0.001  # 重複データ検出閾値
        }
        self.data_history = defaultdict(deque)
        self.anomaly_counts = defaultdict(int)

    def validate_tick_data(self, tick: dict) -> bool:
        """ティックデータの検証"""

        symbol = tick['symbol']
        current_price = tick['price']
        current_time = tick['timestamp']
        current_volume = tick['volume']

        # 履歴データの取得
        history = self.data_history[symbol]

        if not history:
            # 初回データ
            history.append(tick)
            return True

        last_tick = history[-1]

        # 価格異常チェック
        if self._check_price_anomaly(current_price, last_tick['price']):
            self.anomaly_counts[f"{symbol}_price"] += 1
            return False

        # タイムスタンプチェック
        if self._check_timestamp_anomaly(current_time, last_tick['timestamp']):
            self.anomaly_counts[f"{symbol}_timestamp"] += 1
            return False

        # 出来高チェック
        if self._check_volume_anomaly(current_volume, history):
            self.anomaly_counts[f"{symbol}_volume"] += 1
            return False

        # 重複チェック
        if self._check_duplicate(tick, history):
            self.anomaly_counts[f"{symbol}_duplicate"] += 1
            return False

        # 検証通過、履歴に追加
        history.append(tick)
        if len(history) > 1000:  # 最新1000件のみ保持
            history.popleft()

        return True

    def _check_price_anomaly(self, current_price: float, last_price: float) -> bool:
        """価格異常の検出"""
        if last_price == 0:
            return False

        price_change = abs(current_price - last_price) / last_price
        return price_change > self.quality_thresholds['price_deviation']

    def _check_volume_anomaly(self, current_volume: float, history: deque) -> bool:
        """出来高異常の検出"""
        if len(history) < 10:
            return False

        recent_volumes = [tick['volume'] for tick in list(history)[-10:]]
        avg_volume = np.mean(recent_volumes)

        if avg_volume == 0:
            return False

        volume_ratio = current_volume / avg_volume
        return volume_ratio > self.quality_thresholds['volume_spike']

    def generate_quality_report(self) -> dict:
        """データ品質レポートの生成"""

        total_anomalies = sum(self.anomaly_counts.values())

        report = {
            'total_anomalies': total_anomalies,
            'anomaly_breakdown': dict(self.anomaly_counts),
            'quality_score': self._calculate_quality_score(),
            'recommendations': self._generate_recommendations()
        }

        return report

障害復旧とフェイルオーバー

1. 自動フェイルオーバーシステム

class AutoFailoverSystem:
    """自動フェイルオーバーシステム"""

    def __init__(self):
        self.primary_sources = {}
        self.backup_sources = {}
        self.health_monitors = {}
        self.failover_states = {}

    def configure_failover(self, data_type: str, primary: str, backups: List[str]):
        """フェイルオーバー設定"""

        self.primary_sources[data_type] = primary
        self.backup_sources[data_type] = backups
        self.failover_states[data_type] = {
            'current_source': primary,
            'failed_sources': set(),
            'last_failover': 0
        }

        # ヘルスモニターの開始
        self.health_monitors[data_type] = asyncio.create_task(
            self._monitor_health(data_type)
        )

    async def _monitor_health(self, data_type: str):
        """ヘルス監視"""

        while True:
            try:
                current_source = self.failover_states[data_type]['current_source']

                # ヘルスチェック実行
                is_healthy = await self._check_source_health(current_source, data_type)

                if not is_healthy:
                    # フェイルオーバーの実行
                    await self._execute_failover(data_type)

                await asyncio.sleep(10)  # 10秒間隔で監視

            except Exception as e:
                print(f"Health monitor error for {data_type}: {e}")
                await asyncio.sleep(30)

    async def _execute_failover(self, data_type: str):
        """フェイルオーバーの実行"""

        state = self.failover_states[data_type]
        current_source = state['current_source']

        # 現在のソースを失敗リストに追加
        state['failed_sources'].add(current_source)

        # 利用可能なバックアップソースを検索
        available_backups = [
            source for source in self.backup_sources[data_type]
            if source not in state['failed_sources']
        ]

        if not available_backups:
            # 全てのソースが失敗、プライマリをリセット
            state['failed_sources'].clear()
            available_backups = [self.primary_sources[data_type]]

        # 最適なバックアップソースを選択
        best_backup = await self._select_best_backup(available_backups, data_type)

        # フェイルオーバー実行
        old_source = state['current_source']
        state['current_source'] = best_backup
        state['last_failover'] = time.time()

        # イベント記録
        self._log_failover_event(data_type, old_source, best_backup)

        # データソースの切り替え
        await self._switch_data_source(data_type, old_source, best_backup)

    async def _select_best_backup(self, candidates: List[str], data_type: str) -> str:
        """最適なバックアップソースの選択"""

        scores = {}

        for candidate in candidates:
            # レイテンシテスト
            latency = await self._test_latency(candidate, data_type)

            # データ品質テスト
            quality = await self._test_data_quality(candidate, data_type)

            # 総合スコア
            scores[candidate] = quality * 0.7 + (1.0 / (1.0 + latency)) * 0.3

        return max(scores.keys(), key=scores.get)

パフォーマンス最適化

1. 接続プール管理

class ConnectionPoolManager:
    """効率的な接続プール管理"""

    def __init__(self):
        self.pools = {}
        self.pool_configs = {}

    def create_pool(self, exchange: str, max_connections: int = 100):
        """接続プールの作成"""

        connector = aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=max_connections // 2,
            keepalive_timeout=300,
            enable_cleanup_closed=True
        )

        timeout = aiohttp.ClientTimeout(
            total=30,
            connect=10,
            sock_read=10
        )

        session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={'User-Agent': f'MLTradingSystem/{exchange}'}
        )

        self.pools[exchange] = session
        self.pool_configs[exchange] = {
            'max_connections': max_connections,
            'created_at': time.time(),
            'requests_count': 0
        }

    async def get_session(self, exchange: str) -> aiohttp.ClientSession:
        """セッションの取得"""

        if exchange not in self.pools:
            self.create_pool(exchange)

        # リクエスト数を記録
        self.pool_configs[exchange]['requests_count'] += 1

        return self.pools[exchange]

    async def cleanup_pools(self):
        """プールのクリーンアップ"""

        for exchange, session in self.pools.items():
            await session.close()

        self.pools.clear()
        self.pool_configs.clear()

監視とアラート

1. 包括的監視システム

class APIMonitoringSystem:
    """API監視システム"""

    def __init__(self):
        self.metrics_collector = MetricsCollector()
        self.alert_manager = AlertManager()
        self.dashboard_updater = DashboardUpdater()

    def collect_metrics(self, exchange: str, endpoint: str, 
                       response_time: float, status_code: int):
        """メトリクスの収集"""

        timestamp = time.time()

        metrics = {
            'exchange': exchange,
            'endpoint': endpoint,
            'response_time': response_time,
            'status_code': status_code,
            'timestamp': timestamp,
            'success': status_code < 400
        }

        self.metrics_collector.record(metrics)

        # アラートチェック
        self._check_alerts(metrics)

        # ダッシュボード更新
        self.dashboard_updater.update(metrics)

    def _check_alerts(self, metrics: dict):
        """アラートチェック"""

        # レスポンス時間アラート
        if metrics['response_time'] > 5.0:  # 5秒超過
            self.alert_manager.send_alert(
                level='warning',
                message=f"High response time: {metrics['response_time']:.2f}s for {metrics['exchange']}"
            )

        # エラー率アラート
        error_rate = self.metrics_collector.get_error_rate(
            metrics['exchange'], 
            window_minutes=5
        )

        if error_rate > 0.1:  # 10%超過
            self.alert_manager.send_alert(
                level='critical',
                message=f"High error rate: {error_rate:.1%} for {metrics['exchange']}"
            )

ベストプラクティス

1. 効率的なデータ取得戦略

class OptimalDataAcquisitionStrategy:
    """最適なデータ取得戦略"""

    def __init__(self):
        self.data_priorities = {
            'orderbook': {'priority': 1, 'frequency': 100},  # 100ms
            'trades': {'priority': 2, 'frequency': 50},      # 50ms
            'ohlcv': {'priority': 3, 'frequency': 60000},    # 1分
            'funding': {'priority': 4, 'frequency': 3600000} # 1時間
        }

    def optimize_subscription_strategy(self, available_bandwidth: int,
                                     required_data_types: List[str]) -> dict:
        """サブスクリプション戦略の最適化"""

        strategy = {}

        # 優先度順でソート
        sorted_types = sorted(
            required_data_types,
            key=lambda x: self.data_priorities[x]['priority']
        )

        allocated_bandwidth = 0

        for data_type in sorted_types:
            estimated_bandwidth = self._estimate_bandwidth_usage(data_type)

            if allocated_bandwidth + estimated_bandwidth <= available_bandwidth:
                strategy[data_type] = {
                    'enabled': True,
                    'frequency': self.data_priorities[data_type]['frequency'],
                    'bandwidth': estimated_bandwidth
                }
                allocated_bandwidth += estimated_bandwidth
            else:
                strategy[data_type] = {'enabled': False}

        return strategy

まとめ

効果的な取引所API管理には以下が重要です:

  1. 適応的レート制限管理 - 動的な制限調整とエラー回復
  2. 堅牢なWebSocket接続 - 自動再接続とフォールバック
  3. インテリジェントな負荷分散 - 複数取引所間での最適な分散
  4. 包括的な監視システム - リアルタイムメトリクスとアラート
  5. データ品質管理 - 異常データの検出とフィルタリング

これらの実装により、安定性と効率性を両立した高品質な取引システムが構築できます。