異常検知とフラッシュクラッシュ予測
概要
暗号通貨市場における異常検知は、急激な価格変動、不正取引、システム障害などを早期に発見し、損失を防ぐために重要です。本ドキュメントでは、機械学習を用いた異常検知手法とフラッシュクラッシュの予測について解説します。
1. 異常検知の種類と特徴
1.1 検知対象となる異常パターン
価格異常
- フラッシュクラッシュ: 数秒〜数分での急激な価格下落
- ポンプ&ダンプ: 人為的な価格操作
- 価格乖離: 取引所間の異常な価格差
- ファットフィンガー: 誤発注による異常取引
取引量異常
- ウォッシュトレーディング: 仮装売買による出来高操作
- 異常な大口取引: クジラによる市場操作
- 流動性の急激な変化: 取引量の異常な増減
ネットワーク異常
- メンプール詰まり: トランザクション遅延
- ガス代急騰: ネットワーク手数料の異常上昇
- 51%攻撃の兆候: ハッシュレート集中
1.2 異常検知手法の分類
# Characteristics of the three anomaly-detection method families used
# throughout this document. The pros/cons descriptions are deliberately
# kept in Japanese to match the surrounding prose (they are runtime data,
# not comments).
ANOMALY_DETECTION_METHODS = {
    'statistical': {
        'methods': ['Z-score', 'IQR', 'EWMA'],
        'pros': '計算が高速、解釈が容易',
        'cons': '複雑なパターンを見逃す可能性'
    },
    'machine_learning': {
        'methods': ['Isolation Forest', 'One-Class SVM', 'LOF'],
        'pros': '非線形パターンの検出が可能',
        'cons': 'パラメータ調整が必要'
    },
    'deep_learning': {
        'methods': ['Autoencoder', 'LSTM-AD', 'VAE'],
        'pros': '複雑な時系列パターンを学習',
        'cons': '計算コストが高い、説明性が低い'
    }
}
2. 統計的異常検知
2.1 リアルタイムZ-score異常検知
import numpy as np
from collections import deque
from scipy import stats
class RealTimeZScoreDetector:
    """Streaming Z-score anomaly detector.

    Keeps sliding windows of recent prices and volumes and flags any
    observation whose Z-score magnitude exceeds the threshold.
    """

    def __init__(self, window_size=100, threshold=3.0):
        self.window_size = window_size
        self.threshold = threshold
        self.price_window = deque(maxlen=window_size)
        self.volume_window = deque(maxlen=window_size)

    def update(self, price, volume):
        """Ingest one (price, volume) observation.

        Returns a list of anomaly dicts, or None while the window is
        still warming up or when nothing is anomalous.
        """
        self.price_window.append(price)
        self.volume_window.append(volume)
        if len(self.price_window) < self.window_size:
            return None  # not enough history yet
        flagged = []
        checks = (
            ('price_anomaly', price, self.price_window),
            ('volume_anomaly', volume, self.volume_window),
        )
        for label, observed, window in checks:
            z = self.calculate_zscore(observed, window)
            if abs(z) > self.threshold:
                flagged.append({
                    'type': label,
                    'zscore': z,
                    'value': observed,
                    'severity': self.get_severity(z),
                })
        return flagged or None

    def calculate_zscore(self, value, window):
        """Z-score of *value* against *window* (0 when the window is flat)."""
        spread = np.std(window)
        if spread == 0:
            return 0
        return (value - np.mean(window)) / spread

    def get_severity(self, zscore):
        """Map |z| onto a coarse severity bucket."""
        magnitude = abs(zscore)
        for cutoff, label in ((5, 'critical'), (4, 'high'), (3, 'medium')):
            if magnitude > cutoff:
                return label
        return 'low'
2.2 EWMA(指数加重移動平均)ベースの検知
class EWMADetector:
    """EWMA (exponentially weighted moving average) anomaly detector.

    Tracks an EWMA of the observed value together with an EWMA of the
    absolute deviation, and flags observations falling outside
    ewma +/- std_multiplier * ewm-deviation.

    Fix vs. the previous revision: update() used to return the bare
    boolean False on the very first observation and a dict on every
    later call; it now always returns the same dict shape.
    """

    def __init__(self, alpha=0.1, std_multiplier=3):
        self.alpha = alpha  # smoothing factor in (0, 1]
        self.std_multiplier = std_multiplier  # band width in deviation units
        self.ewma = None  # running mean; None until the first sample
        self.ewmstd = None  # running mean absolute deviation

    def update(self, value):
        """Fold one observation into the EWMA state and classify it.

        Returns a dict with is_anomaly, the current EWMA, the control
        bounds and the deviation ratio. The first observation seeds the
        state and is never anomalous.
        """
        if self.ewma is None:
            # Seed: the first sample defines the initial level.
            self.ewma = value
            self.ewmstd = 0
            return {
                'is_anomaly': False,
                'value': value,
                'ewma': self.ewma,
                'upper_bound': value,
                'lower_bound': value,
                'deviation_ratio': 0,
            }
        # Update the level first, then measure deviation against it.
        self.ewma = self.alpha * value + (1 - self.alpha) * self.ewma
        deviation = abs(value - self.ewma)
        self.ewmstd = self.alpha * deviation + (1 - self.alpha) * self.ewmstd
        upper_bound = self.ewma + self.std_multiplier * self.ewmstd
        lower_bound = self.ewma - self.std_multiplier * self.ewmstd
        is_anomaly = value > upper_bound or value < lower_bound
        return {
            'is_anomaly': is_anomaly,
            'value': value,
            'ewma': self.ewma,
            'upper_bound': upper_bound,
            'lower_bound': lower_bound,
            'deviation_ratio': deviation / self.ewmstd if self.ewmstd > 0 else 0,
        }
3. 機械学習による異常検知
3.1 Isolation Forestによる多次元異常検知
from sklearn.ensemble import IsolationForest
import pandas as pd
class MultiDimensionalAnomalyDetector:
    """Isolation-Forest detector over a hand-crafted market feature vector.

    Fix vs. the previous revision: detect() called
    ``self.extract_features(...).keys()`` although extract_features
    returns a plain list, which raised AttributeError on every detected
    anomaly. Feature names now come from _feature_dict().
    """

    def __init__(self, contamination=0.01, n_estimators=100):
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=n_estimators,
            random_state=42
        )
        self.feature_buffer = []  # raw vectors collected before the first fit
        self.min_samples = 1000   # fit once this many samples are buffered
        self.is_fitted = False

    def _feature_dict(self, market_data):
        """Ordered name -> value feature mapping for one tick.

        Missing optional fields fall back to neutral defaults
        (e.g. volume_ratio=1, buy_sell_ratio=0.5).
        """
        return {
            # price-related
            'price': market_data['price'],
            'price_change_1m': market_data.get('price_change_1m', 0),
            'price_change_5m': market_data.get('price_change_5m', 0),
            'volatility': market_data.get('volatility', 0),
            # volume-related
            'volume': market_data['volume'],
            'volume_ratio': market_data.get('volume_ratio', 1),
            'buy_sell_ratio': market_data.get('buy_sell_ratio', 0.5),
            # trade-related
            'trade_count': market_data.get('trade_count', 0),
            'avg_trade_size': market_data.get('avg_trade_size', 0),
            'large_trade_ratio': market_data.get('large_trade_ratio', 0),
            # spread
            'bid_ask_spread': market_data.get('spread', 0),
            'spread_ratio': market_data.get('spread_ratio', 0),
            # cross-exchange differences
            'price_deviation': market_data.get('price_deviation', 0),
            'volume_concentration': market_data.get('volume_concentration', 0)
        }

    def extract_features(self, market_data):
        """Feature vector (list) in the fixed _feature_dict order."""
        return list(self._feature_dict(market_data).values())

    def detect(self, market_data):
        """Run anomaly detection for one tick.

        Buffers samples and fits the model on the first min_samples
        ticks; afterwards returns an anomaly report dict, or None when
        the tick looks normal (or while still warming up).
        """
        features = self.extract_features(market_data)
        if not self.is_fitted:
            self.feature_buffer.append(features)
            if len(self.feature_buffer) >= self.min_samples:
                self.model.fit(self.feature_buffer)
                self.is_fitted = True
            return None
        anomaly_score = self.model.score_samples([features])[0]
        is_anomaly = self.model.predict([features])[0] == -1
        if is_anomaly:
            # Names in the same order as the feature vector.
            feature_names = list(self._feature_dict(market_data).keys())
            feature_importance = self.analyze_anomaly_features(features, feature_names)
            return {
                'is_anomaly': True,
                'anomaly_score': anomaly_score,
                'timestamp': market_data.get('timestamp'),
                'features': dict(zip(feature_names, features)),
                'important_features': feature_importance
            }
        return None

    def analyze_anomaly_features(self, features, feature_names):
        """Estimate which features drove an anomaly.

        Replaces each feature in turn with its recent buffer mean and
        measures how far the isolation score moves; returns the top 5.
        """
        base_score = self.model.score_samples([features])[0]
        feature_importance = {}
        for i, name in enumerate(feature_names):
            modified_features = features.copy()
            modified_features[i] = np.mean([f[i] for f in self.feature_buffer[-100:]])
            modified_score = self.model.score_samples([modified_features])[0]
            feature_importance[name] = abs(modified_score - base_score)
        # Keep only the five most influential features.
        return dict(sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)[:5])
3.2 LSTM Autoencoderによる時系列異常検知
import tensorflow as tf
from tensorflow.keras import layers, models
class LSTMAutoencoder:
    """LSTM autoencoder for time-series anomaly detection.

    Trained on normal sequences only; anomalies are sequences whose
    reconstruction error exceeds a percentile-based threshold.

    Fix vs. the previous revision: detect_anomalies() now raises a
    clear RuntimeError when called before train(), instead of an
    opaque TypeError from comparing a float with None.
    """

    def __init__(self, sequence_length=60, n_features=10, encoding_dim=32):
        self.sequence_length = sequence_length  # timesteps per input window
        self.n_features = n_features            # features per timestep
        self.encoding_dim = encoding_dim        # bottleneck width
        self.model = self.build_model()
        self.threshold = None                   # calibrated by train()

    def build_model(self):
        """Build and compile the seq2seq LSTM autoencoder."""
        inputs = layers.Input(shape=(self.sequence_length, self.n_features))
        # Encoder: 64 units down to the encoding_dim bottleneck.
        encoded = layers.LSTM(64, activation='relu', return_sequences=True)(inputs)
        encoded = layers.LSTM(self.encoding_dim, activation='relu', return_sequences=False)(encoded)
        # Decoder: replay the bottleneck per timestep, then mirror the encoder.
        decoded = layers.RepeatVector(self.sequence_length)(encoded)
        decoded = layers.LSTM(self.encoding_dim, activation='relu', return_sequences=True)(decoded)
        decoded = layers.LSTM(64, activation='relu', return_sequences=True)(decoded)
        decoded = layers.TimeDistributed(layers.Dense(self.n_features))(decoded)
        autoencoder = models.Model(inputs, decoded)
        autoencoder.compile(optimizer='adam', loss='mse')
        return autoencoder

    def train(self, normal_data, validation_split=0.1, epochs=50):
        """Fit on normal sequences and calibrate the anomaly threshold.

        The threshold is the 95th percentile of the per-sequence MSE on
        the training data.
        """
        history = self.model.fit(
            normal_data, normal_data,
            epochs=epochs,
            batch_size=32,
            validation_split=validation_split,
            shuffle=True,
            callbacks=[
                tf.keras.callbacks.EarlyStopping(patience=5),
                tf.keras.callbacks.ReduceLROnPlateau(patience=3)
            ]
        )
        train_predictions = self.model.predict(normal_data)
        mse = np.mean(np.square(normal_data - train_predictions), axis=(1, 2))
        self.threshold = np.percentile(mse, 95)
        return history

    def detect_anomalies(self, data):
        """Return anomaly records for sequences whose MSE exceeds the threshold.

        Raises:
            RuntimeError: if called before train() has set the threshold.
        """
        if self.threshold is None:
            raise RuntimeError("LSTMAutoencoder.detect_anomalies called before train()")
        predictions = self.model.predict(data)
        mse = np.mean(np.square(data - predictions), axis=(1, 2))
        anomalies = []
        for i, error in enumerate(mse):
            if error > self.threshold:
                # Per-timestep error profile identifies where the sequence broke.
                sequence_errors = np.mean(np.square(data[i] - predictions[i]), axis=1)
                max_error_idx = np.argmax(sequence_errors)
                anomalies.append({
                    'index': i,
                    'reconstruction_error': error,
                    'threshold': self.threshold,
                    'anomaly_ratio': error / self.threshold,
                    'max_error_timestep': max_error_idx,
                    'sequence_errors': sequence_errors.tolist()
                })
        return anomalies
4. フラッシュクラッシュ予測
4.1 早期警告システム
class FlashCrashEarlyWarning:
    """Aggregates several specialised detectors into one flash-crash
    risk score with a crash probability and a recommended action."""

    def __init__(self):
        # Sub-detectors, each exposing analyze(market_data) -> dict.
        self.indicators = {
            'volatility_spike': VolatilitySpikeDetector(),
            'liquidity_drain': LiquidityDrainDetector(),
            'cascade_selling': CascadeSellingDetector(),
            'order_book_imbalance': OrderBookImbalanceDetector()
        }
        self.warning_history = deque(maxlen=100)

    def analyze_market_conditions(self, market_data):
        """Run every indicator and combine them into a single risk report."""
        active_warnings = {}
        weighted_sum = 0
        for indicator_name, indicator in self.indicators.items():
            report = indicator.analyze(market_data)
            if report['warning_level'] > 0:
                active_warnings[indicator_name] = report
                weighted_sum += report['warning_level'] * report.get('weight', 1)
        # Average over all indicators, quiet ones included.
        overall_risk = weighted_sum / len(self.indicators)
        self.warning_history.append({
            'timestamp': market_data['timestamp'],
            'risk_score': overall_risk,
            'warnings': active_warnings
        })
        return {
            'risk_score': overall_risk,
            'crash_probability': self.estimate_crash_probability(overall_risk),
            'warnings': active_warnings,
            'recommended_action': self.get_recommended_action(overall_risk)
        }

    def estimate_crash_probability(self, risk_score):
        """Squash the risk score into a pseudo-probability via a sigmoid
        centred at 0.5."""
        return 1 / (1 + np.exp(-2 * (risk_score - 0.5)))

    def get_recommended_action(self, risk_score):
        """Translate the risk score into an operational recommendation."""
        bands = (
            (0.8, 'IMMEDIATE_EXIT'),
            (0.6, 'REDUCE_POSITION'),
            (0.4, 'MONITOR_CLOSELY'),
        )
        for cutoff, action in bands:
            if risk_score > cutoff:
                return action
        return 'NORMAL_OPERATION'
4.2 ボラティリティスパイク検出
class VolatilitySpikeDetector:
    """Flags sudden jumps of short-term volatility above its longer-term level."""

    def __init__(self, lookback_period=20, spike_threshold=2.5):
        self.lookback_period = lookback_period
        self.spike_threshold = spike_threshold
        # Keep twice the lookback so the long window stays populated.
        self.price_history = deque(maxlen=lookback_period * 2)

    def analyze(self, market_data):
        """Compare recent vs. lookback volatility and report a warning level."""
        self.price_history.append(market_data['price'])
        if len(self.price_history) < self.lookback_period:
            # Still warming up.
            return {'warning_level': 0, 'weight': 1.5}
        prices = list(self.price_history)
        returns = np.diff(prices) / prices[:-1]
        recent_vol = np.std(returns[-5:]) if len(returns) >= 5 else 0
        baseline_vol = np.std(returns[-self.lookback_period:])
        ratio = recent_vol / baseline_vol if baseline_vol > 0 else 1
        garch_score = self.detect_garch_effect(returns)
        level = 0
        if ratio > self.spike_threshold:
            # Scale the excess over the threshold, capped at 1.
            level = min((ratio - self.spike_threshold) / self.spike_threshold, 1)
        return {
            'warning_level': level,
            'weight': 1.5,
            'volatility_ratio': ratio,
            'short_term_vol': recent_vol,
            'long_term_vol': baseline_vol,
            'garch_score': garch_score
        }

    def detect_garch_effect(self, returns):
        """Volatility-clustering proxy: lag-1 autocorrelation of squared returns."""
        if len(returns) < 20:
            return 0
        squared = np.square(returns)
        lag1_corr = np.corrcoef(squared[:-1], squared[1:])[0, 1]
        return max(0, lag1_corr)
4.3 流動性枯渇検出
class LiquidityDrainDetector:
    """Watches volume, spread and order-book depth trends for drying-up liquidity.

    Fix vs. the previous revision: the liquidity score used weights
    that already sum to 1 *and* divided the result by 3, so a perfectly
    neutral market scored 1/3 and produced a constant warning level of
    ~0.67. The spurious division is removed; neutral trends now score
    1.0 (warning level 0).
    """

    def __init__(self):
        self.volume_history = deque(maxlen=100)
        self.spread_history = deque(maxlen=100)
        self.depth_history = deque(maxlen=100)

    def analyze(self, market_data):
        """Update the histories and report a liquidity warning level in [0, 1]."""
        self.volume_history.append(market_data.get('volume', 0))
        self.spread_history.append(market_data.get('spread', 0))
        self.depth_history.append(market_data.get('order_book_depth', 0))
        if len(self.volume_history) < 20:
            return {'warning_level': 0, 'weight': 2.0}  # warming up
        volume_trend = self.calculate_trend(list(self.volume_history))
        spread_trend = self.calculate_trend(list(self.spread_history))
        depth_trend = self.calculate_trend(list(self.depth_history))
        # Each term equals its weight in a neutral market and shrinks to 0
        # when the trend points to draining liquidity (falling volume/depth,
        # widening spread); lower scores mean more danger.
        liquidity_score = (
            (1 + volume_trend) * 0.3 +   # falling volume
            (1 - spread_trend) * 0.4 +   # widening spread
            (1 + depth_trend) * 0.3      # thinning book
        )
        warning_level = max(0, 1 - liquidity_score)
        return {
            'warning_level': warning_level,
            'weight': 2.0,
            'liquidity_score': liquidity_score,
            'volume_trend': volume_trend,
            'spread_trend': spread_trend,
            'depth_trend': depth_trend,
            'current_volume': market_data.get('volume', 0),
            'current_spread': market_data.get('spread', 0)
        }

    def calculate_trend(self, data):
        """Normalized linear-regression slope clipped to [-1, 1] (0 = flat)."""
        if len(data) < 2:
            return 0
        x = np.arange(len(data))
        slope, _ = np.polyfit(x, data, 1)
        data_range = np.max(data) - np.min(data)
        if data_range > 0:
            # Scale so a slope traversing the full range in ~10 steps saturates.
            return np.clip(slope / data_range * 10, -1, 1)
        return 0
4.4 カスケード売却検出
class CascadeSellingDetector:
    """Detects conditions for cascading stop-loss selling.

    Fix vs. the previous revision: analyze() referenced
    get_critical_price_levels() and calculate_momentum(), neither of
    which was defined (AttributeError on every call); both are now
    implemented.
    """

    def __init__(self):
        self.sell_pressure_history = deque(maxlen=50)
        # price level -> how many times it was registered as a likely stop
        self.stop_loss_levels = {}

    def analyze(self, market_data):
        """Estimate cascade-selling risk for the current tick."""
        sell_pressure = self.calculate_sell_pressure(market_data)
        self.sell_pressure_history.append(sell_pressure)
        self.update_stop_loss_levels(market_data['price'])
        cascade_risk = self.evaluate_cascade_risk(
            market_data['price'],
            sell_pressure
        )
        return {
            'warning_level': cascade_risk,
            'weight': 1.8,
            'sell_pressure': sell_pressure,
            'critical_levels': self.get_critical_price_levels(market_data['price']),
            'momentum': self.calculate_momentum()
        }

    def calculate_sell_pressure(self, market_data):
        """Blend of sell-volume share (60%) and downward price velocity (40%)."""
        sell_ratio = market_data.get('sell_volume', 0) / max(market_data.get('total_volume', 1), 1)
        price_velocity = market_data.get('price_change_1m', 0) / market_data['price']
        # Only downward velocity contributes; the -10 factor rescales
        # per-minute relative moves to the same order as sell_ratio.
        return sell_ratio * 0.6 + min(0, price_velocity) * -10 * 0.4

    def update_stop_loss_levels(self, current_price):
        """Accumulate likely stop levels (5/10/15% below) and prune stale ones."""
        common_levels = [0.95, 0.90, 0.85]
        for level in common_levels:
            stop_price = current_price * level
            if stop_price not in self.stop_loss_levels:
                self.stop_loss_levels[stop_price] = 0
            self.stop_loss_levels[stop_price] += 1
        # Drop levels more than 20% away from the current price.
        current_range = (current_price * 0.8, current_price * 1.2)
        self.stop_loss_levels = {
            k: v for k, v in self.stop_loss_levels.items()
            if current_range[0] <= k <= current_range[1]
        }

    def evaluate_cascade_risk(self, current_price, sell_pressure):
        """Combine nearby stop density and sell pressure into a [0, 1] risk."""
        nearby_stops = sum(
            1 for price in self.stop_loss_levels
            if current_price * 0.95 <= price <= current_price
        )
        risk = (nearby_stops / 10) * 0.5 + sell_pressure * 0.5
        return min(risk, 1.0)

    def get_critical_price_levels(self, current_price):
        """Tracked stop levels within 10% below the current price, ascending."""
        return sorted(
            price for price in self.stop_loss_levels
            if current_price * 0.90 <= price <= current_price
        )

    def calculate_momentum(self):
        """Change in sell pressure across the recorded history (latest - oldest)."""
        if len(self.sell_pressure_history) < 2:
            return 0
        return self.sell_pressure_history[-1] - self.sell_pressure_history[0]
5. 統合異常検知システム
5.1 アンサンブル異常検知
class EnsembleAnomalyDetector:
    """Runs several detectors on each market tick and merges their verdicts.

    Fix vs. the previous revision: alerting compared severity labels as
    strings (``severity >= 'medium'``), which lexicographically excluded
    'critical' and 'high'; severities are now compared by rank.
    """

    # Ordered from least to most severe; index position = rank.
    SEVERITY_ORDER = ('low', 'medium', 'high', 'critical')

    def __init__(self):
        self.detectors = {
            'statistical': RealTimeZScoreDetector(),
            'isolation_forest': MultiDimensionalAnomalyDetector(),
            'lstm_autoencoder': LSTMAutoencoder(),
            'flash_crash': FlashCrashEarlyWarning()
        }
        self.alert_manager = AlertManager()
        self.performance_tracker = PerformanceTracker()

    async def process_market_data(self, market_data):
        """Feed one tick to every detector; alert when the merged severity warrants it.

        Returns the integrated result dict, or None when nothing fired.
        """
        all_anomalies = []
        for name, detector in self.detectors.items():
            try:
                if name == 'statistical':
                    anomalies = detector.update(
                        market_data['price'],
                        market_data['volume']
                    )
                elif name == 'flash_crash':
                    result = detector.analyze_market_conditions(market_data)
                    if result['risk_score'] > 0.5:
                        anomalies = [{
                            'type': 'flash_crash_warning',
                            'risk_score': result['risk_score'],
                            'details': result
                        }]
                    else:
                        anomalies = None
                else:
                    anomalies = detector.detect(market_data)
                if anomalies:
                    all_anomalies.append({
                        'detector': name,
                        'anomalies': anomalies,
                        'timestamp': market_data['timestamp']
                    })
            except Exception as e:
                # Best-effort: one failing detector must not stop the rest.
                print(f"Error in {name} detector: {e}")
        if all_anomalies:
            integrated_result = self.integrate_anomalies(all_anomalies)
            # Alert on medium or worse, compared by rank (not string order).
            if (self.SEVERITY_ORDER.index(integrated_result['severity'])
                    >= self.SEVERITY_ORDER.index('medium')):
                await self.alert_manager.send_alert(integrated_result)
            self.performance_tracker.record(integrated_result)
            return integrated_result
        return None

    def integrate_anomalies(self, all_anomalies):
        """Merge per-detector findings into one weighted score and severity label."""
        # Per-detector trust weights; unknown detectors default to 0.25.
        weights = {
            'statistical': 0.2,
            'isolation_forest': 0.3,
            'lstm_autoencoder': 0.3,
            'flash_crash': 0.2
        }
        total_score = 0
        detected_by = []
        for anomaly_group in all_anomalies:
            detector = anomaly_group['detector']
            weight = weights.get(detector, 0.25)
            if detector == 'flash_crash':
                score = anomaly_group['anomalies'][0]['risk_score']
            else:
                # Crude normalisation: 10 findings correspond to a score of 1.0.
                score = len(anomaly_group['anomalies']) / 10
            total_score += score * weight
            detected_by.append(detector)
        if total_score > 0.8:
            severity = 'critical'
        elif total_score > 0.6:
            severity = 'high'
        elif total_score > 0.4:
            severity = 'medium'
        else:
            severity = 'low'
        return {
            'integrated_score': total_score,
            'severity': severity,
            'detected_by': detected_by,
            'anomaly_details': all_anomalies,
            'timestamp': all_anomalies[0]['timestamp']
        }
5.2 アラート管理システム
class AlertManager:
    """Formats anomaly reports and fans them out to notification channels.

    Fix vs. the previous revision: send_alert() used asyncio.gather
    without asyncio being imported anywhere in this document; the
    import is now done locally inside send_alert().
    """

    def __init__(self):
        self.alert_channels = {
            'email': EmailNotifier(),
            'slack': SlackNotifier(),
            'telegram': TelegramNotifier(),
            'webhook': WebhookNotifier()
        }
        self.alert_history = deque(maxlen=1000)
        self.rate_limiter = RateLimiter()

    async def send_alert(self, anomaly_result):
        """Send an alert for one integrated anomaly result.

        Respects the rate limiter, sends to all severity-selected
        channels concurrently, then records the alert in the history.
        """
        import asyncio  # imported here so this snippet works without a module-level import

        if not self.rate_limiter.can_send(anomaly_result['severity']):
            return
        alert_message = self.format_alert_message(anomaly_result)
        channels = self.select_channels(anomaly_result['severity'])
        # Fire all channel sends concurrently.
        tasks = [
            self.alert_channels[channel].send(alert_message)
            for channel in channels
            if channel in self.alert_channels
        ]
        await asyncio.gather(*tasks)
        self.alert_history.append({
            'timestamp': anomaly_result['timestamp'],
            'severity': anomaly_result['severity'],
            'channels': channels,
            'message': alert_message
        })

    def format_alert_message(self, anomaly_result):
        """Build the human-readable alert text for one integrated result."""
        severity_emoji = {
            'critical': '🚨',
            'high': '⚠️',
            'medium': '📊',
            'low': 'ℹ️'
        }
        message = f"{severity_emoji[anomaly_result['severity']]} **{anomaly_result['severity'].upper()} ANOMALY DETECTED**\n\n"
        message += f"Score: {anomaly_result['integrated_score']:.2f}\n"
        message += f"Detected by: {', '.join(anomaly_result['detected_by'])}\n"
        message += f"Time: {anomaly_result['timestamp']}\n\n"
        # Append flash-crash specifics when that detector fired.
        for anomaly_group in anomaly_result['anomaly_details']:
            if anomaly_group['detector'] == 'flash_crash':
                details = anomaly_group['anomalies'][0]['details']
                message += f"**Flash Crash Risk**: {details['crash_probability']:.1%}\n"
                message += f"Recommended Action: {details['recommended_action']}\n"
        return message

    def select_channels(self, severity):
        """Escalate to more channels as severity rises."""
        if severity == 'critical':
            return ['email', 'slack', 'telegram', 'webhook']
        elif severity == 'high':
            return ['slack', 'telegram']
        elif severity == 'medium':
            return ['slack']
        else:
            return ['webhook']
6. パフォーマンス評価とバックテスト
6.1 異常検知パフォーマンス評価
class AnomalyDetectionEvaluator:
    """Precision/recall/latency evaluation of detected vs. labelled anomalies.

    Fix vs. the previous revision: evaluate_detection() relied on
    is_true_positive(), calculate_detection_latency() and
    was_detected(), none of which existed; they are implemented here.
    Anomalies are matched on their 'timestamp' field within
    match_tolerance time units. NOTE(review): timestamps are assumed to
    be numeric and in the same unit on both sides — confirm against the
    producer of these records.
    """

    def __init__(self, match_tolerance=60):
        # Confusion-matrix counters.
        self.true_positives = 0
        self.false_positives = 0
        self.false_negatives = 0
        self.true_negatives = 0
        self.detection_latency = []  # latencies of true-positive detections
        self.match_tolerance = match_tolerance  # max |t_detected - t_true| to match

    def evaluate_detection(self, detected_anomalies, true_anomalies):
        """Update the confusion counters for one batch of detections."""
        for detected in detected_anomalies:
            if self.is_true_positive(detected, true_anomalies):
                self.true_positives += 1
                self.detection_latency.append(
                    self.calculate_detection_latency(detected, true_anomalies)
                )
            else:
                self.false_positives += 1
        # Labelled anomalies with no nearby detection were missed.
        for true_anomaly in true_anomalies:
            if not self.was_detected(true_anomaly, detected_anomalies):
                self.false_negatives += 1

    def is_true_positive(self, detected, true_anomalies):
        """True when some labelled anomaly lies within match_tolerance of the detection."""
        t = detected['timestamp']
        return any(abs(t - true['timestamp']) <= self.match_tolerance
                   for true in true_anomalies)

    def calculate_detection_latency(self, detected, true_anomalies):
        """Signed delay (detection time - closest matching true time); 0 if no match."""
        t = detected['timestamp']
        deltas = [t - true['timestamp'] for true in true_anomalies
                  if abs(t - true['timestamp']) <= self.match_tolerance]
        return min(deltas, key=abs) if deltas else 0

    def was_detected(self, true_anomaly, detected_anomalies):
        """True when any detection lies within match_tolerance of the labelled anomaly."""
        t = true_anomaly['timestamp']
        return any(abs(d['timestamp'] - t) <= self.match_tolerance
                   for d in detected_anomalies)

    def calculate_metrics(self):
        """Precision, recall, F1 and mean detection latency (divide-by-zero safe)."""
        precision = self.true_positives / max(self.true_positives + self.false_positives, 1)
        recall = self.true_positives / max(self.true_positives + self.false_negatives, 1)
        f1_score = 2 * (precision * recall) / max(precision + recall, 1e-6)
        return {
            'precision': precision,
            'recall': recall,
            'f1_score': f1_score,
            'avg_detection_latency': np.mean(self.detection_latency) if self.detection_latency else 0,
            'total_detections': self.true_positives + self.false_positives,
            'missed_anomalies': self.false_negatives
        }
まとめ
異常検知とフラッシュクラッシュ予測システムの実装には、以下が重要です:
- 多層防御: 統計的手法と機械学習の組み合わせ
- リアルタイム性: 低レイテンシーでの検知と警告
- 誤検知の最小化: アンサンブル手法による精度向上
- 説明可能性: 異常の原因と影響の明確化
- 適応性: 市場環境の変化への自動適応
これらの要素を統合することで、暗号通貨市場の異常を効果的に検知し、投資家の資産を保護できます。