Machine Learning Approaches to Risk Management and Money Management
1. Introduction
In cryptocurrency trading, even an excellent prediction model cannot deliver long-term success without proper risk management and money management. This document explains advanced risk management and money management techniques powered by machine learning.
2. Portfolio Optimization with ML
2.1 Dynamic Portfolio Optimization
import numpy as np
import pandas as pd
from scipy.optimize import minimize
import cvxpy as cp
from sklearn.covariance import LedoitWolf
import torch
import torch.nn as nn
from typing import Dict, List, Tuple, Optional
class MLPortfolioOptimizer:
def __init__(self,
assets: List[str],
lookback_period: int = 252,
rebalance_frequency: str = 'daily'):
self.assets = assets
self.lookback_period = lookback_period
self.rebalance_frequency = rebalance_frequency
self.covariance_estimator = LedoitWolf()
def calculate_expected_returns(self,
price_data: pd.DataFrame,
method: str = 'ml_enhanced') -> np.ndarray:
"""機械学習で強化された期待リターンを計算"""
returns = price_data.pct_change().dropna()
if method == 'historical':
# 単純な過去平均
expected_returns = returns.mean()
elif method == 'ml_enhanced':
# 機械学習による予測
features = self._extract_return_features(price_data)
expected_returns = self._predict_returns_ml(features, returns)
elif method == 'black_litterman':
# Black-Littermanモデル
market_weights = self._calculate_market_weights(price_data)
expected_returns = self._black_litterman_returns(
returns, market_weights
)
return expected_returns.values
def _extract_return_features(self, price_data: pd.DataFrame) -> pd.DataFrame:
"""リターン予測のための特徴量を抽出"""
features = pd.DataFrame(index=price_data.index)
returns = price_data.pct_change()
for asset in self.assets:
# 移動平均リターン
features[f'{asset}_ma_5'] = returns[asset].rolling(5).mean()
features[f'{asset}_ma_20'] = returns[asset].rolling(20).mean()
features[f'{asset}_ma_60'] = returns[asset].rolling(60).mean()
# ボラティリティ
features[f'{asset}_vol_20'] = returns[asset].rolling(20).std()
features[f'{asset}_vol_60'] = returns[asset].rolling(60).std()
# モメンタム
features[f'{asset}_momentum_20'] = price_data[asset] / price_data[asset].shift(20) - 1
# RSI
features[f'{asset}_rsi'] = self._calculate_rsi(price_data[asset])
# 相対強度
market_return = returns.mean(axis=1)
features[f'{asset}_relative_strength'] = returns[asset] - market_return
return features.dropna()
def _predict_returns_ml(self,
features: pd.DataFrame,
historical_returns: pd.DataFrame) -> pd.Series:
"""機械学習モデルでリターンを予測"""
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
predicted_returns = {}
for asset in self.assets:
# 特徴量の準備
asset_features = features[[col for col in features.columns if asset in col]]
X = asset_features[:-1].values # 最後のデータポイントを除く
y = historical_returns[asset].shift(-1)[:-1].values # 1期先のリターン
# 欠損値を除去
mask = ~np.isnan(y)
X = X[mask]
y = y[mask]
if len(X) < 100: # データが少ない場合は過去平均を使用
predicted_returns[asset] = historical_returns[asset].mean()
continue
# スケーリング
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# モデルの学習
model = RandomForestRegressor(
n_estimators=100,
max_depth=5,
random_state=42
)
# 時系列を考慮した学習
train_size = int(len(X) * 0.8)
model.fit(X_scaled[:train_size], y[:train_size])
# 最新データで予測
latest_features = asset_features.iloc[-1:].values
latest_scaled = scaler.transform(latest_features)
predicted_returns[asset] = model.predict(latest_scaled)[0]
return pd.Series(predicted_returns)
def estimate_risk_matrix(self,
returns: pd.DataFrame,
method: str = 'ml_shrinkage') -> np.ndarray:
"""リスク(共分散行列)を推定"""
if method == 'sample':
# サンプル共分散行列
cov_matrix = returns.cov()
elif method == 'ml_shrinkage':
# Ledoit-Wolf縮小推定
cov_matrix, _ = self.covariance_estimator.fit(returns).covariance_, None
elif method == 'dynamic':
# 動的条件付き相関(DCC)モデル
cov_matrix = self._estimate_dcc_covariance(returns)
return cov_matrix
def _estimate_dcc_covariance(self, returns: pd.DataFrame) -> np.ndarray:
"""DCCモデルで動的共分散を推定"""
# GARCH(1,1)で個別のボラティリティを推定
volatilities = {}
for asset in returns.columns:
vol = self._garch_volatility(returns[asset])
volatilities[asset] = vol
# 標準化リターンを計算
standardized_returns = pd.DataFrame()
for asset in returns.columns:
standardized_returns[asset] = returns[asset] / volatilities[asset]
# 動的相関の推定
# 簡略化のため、指数加重移動平均を使用
lambda_param = 0.94
correlation = standardized_returns.ewm(
com=(1-lambda_param)/lambda_param
).corr()
# 最新の相関行列を取得
latest_corr = correlation.iloc[-len(self.assets):].values
# 共分散行列を構築
latest_vols = np.array([volatilities[asset].iloc[-1] for asset in self.assets])
D = np.diag(latest_vols)
cov_matrix = D @ latest_corr @ D
return cov_matrix
def _garch_volatility(self, returns: pd.Series) -> pd.Series:
"""GARCH(1,1)モデルでボラティリティを推定"""
# 簡略化されたGARCH実装
omega = 0.00001
alpha = 0.1
beta = 0.85
volatility = [returns.std()] # 初期値
for i in range(1, len(returns)):
prev_vol = volatility[-1]
prev_return = returns.iloc[i-1]
new_vol = np.sqrt(
omega + alpha * prev_return**2 + beta * prev_vol**2
)
volatility.append(new_vol)
return pd.Series(volatility, index=returns.index)
def optimize_portfolio(self,
expected_returns: np.ndarray,
cov_matrix: np.ndarray,
constraints: Dict = None) -> Dict:
"""ポートフォリオを最適化"""
n_assets = len(self.assets)
# 制約条件のデフォルト値
if constraints is None:
constraints = {
'min_weight': 0.0,
'max_weight': 0.3,
'risk_target': None,
'return_target': None
}
# 最適化問題の設定
weights = cp.Variable(n_assets)
returns = expected_returns @ weights
risk = cp.quad_form(weights, cov_matrix)
# 目的関数(シャープレシオの最大化に相当)
objective = cp.Maximize(returns - 0.5 * risk)
# 制約条件
constraints_list = [
cp.sum(weights) == 1, # 重みの合計は1
weights >= constraints['min_weight'], # 最小重み
weights <= constraints['max_weight'] # 最大重み
]
# リスク目標がある場合
if constraints.get('risk_target'):
constraints_list.append(risk <= constraints['risk_target']**2)
# リターン目標がある場合
if constraints.get('return_target'):
constraints_list.append(returns >= constraints['return_target'])
# 問題を解く
problem = cp.Problem(objective, constraints_list)
problem.solve()
if problem.status != 'optimal':
print(f"Warning: Optimization status: {problem.status}")
# 結果を返す
optimal_weights = weights.value
portfolio_return = expected_returns @ optimal_weights
portfolio_risk = np.sqrt(optimal_weights @ cov_matrix @ optimal_weights)
return {
'weights': dict(zip(self.assets, optimal_weights)),
'expected_return': portfolio_return,
'expected_risk': portfolio_risk,
'sharpe_ratio': portfolio_return / portfolio_risk if portfolio_risk > 0 else 0
}
def _calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
"""RSIを計算"""
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi
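To see the optimizer end to end, the sketch below drives it with synthetic random-walk prices. This is a minimal example under stated assumptions: the five ticker symbols, the date range, and the choice of the simpler 'historical' return method are illustrative only, and the default max_weight of 0.3 requires at least four assets for the weights to sum to one.
import numpy as np
import pandas as pd

# Synthetic daily prices for five assets (geometric random walk; illustrative only)
rng = np.random.default_rng(0)
assets = ['BTC', 'ETH', 'BNB', 'SOL', 'ADA']
dates = pd.date_range('2023-01-01', periods=400, freq='D')
prices = pd.DataFrame(
    100 * np.exp(np.cumsum(rng.normal(0.0005, 0.02, size=(400, 5)), axis=0)),
    index=dates, columns=assets
)

optimizer = MLPortfolioOptimizer(assets)
# 'historical' keeps the example self-contained; 'ml_enhanced' would also work here
expected = optimizer.calculate_expected_returns(prices, method='historical')
cov = optimizer.estimate_risk_matrix(prices.pct_change().dropna(), method='ml_shrinkage')
result = optimizer.optimize_portfolio(expected, cov)
print(result['weights'])
print(f"Sharpe ratio: {result['sharpe_ratio']:.2f}")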
2.2 Risk Parity Portfolios
class RiskParityOptimizer:
def __init__(self, method: str = 'equal_risk_contribution'):
self.method = method
def optimize(self, cov_matrix: np.ndarray, assets: List[str]) -> Dict:
"""リスクパリティポートフォリオを構築"""
n_assets = len(assets)
if self.method == 'equal_risk_contribution':
# 各資産のリスク貢献度を均等化
weights = self._equal_risk_contribution(cov_matrix)
elif self.method == 'inverse_volatility':
# ボラティリティの逆数で重み付け
volatilities = np.sqrt(np.diag(cov_matrix))
inv_vols = 1 / volatilities
weights = inv_vols / np.sum(inv_vols)
elif self.method == 'hierarchical':
# 階層的リスクパリティ(HRP)
weights = self._hierarchical_risk_parity(cov_matrix, assets)
# ポートフォリオのリスク指標を計算
portfolio_risk = np.sqrt(weights @ cov_matrix @ weights)
risk_contributions = self._calculate_risk_contributions(weights, cov_matrix)
return {
'weights': dict(zip(assets, weights)),
'portfolio_risk': portfolio_risk,
'risk_contributions': dict(zip(assets, risk_contributions)),
'concentration_ratio': self._calculate_concentration_ratio(weights)
}
def _equal_risk_contribution(self, cov_matrix: np.ndarray) -> np.ndarray:
"""等リスク貢献ポートフォリオを計算"""
n_assets = cov_matrix.shape[0]
def risk_contribution_objective(weights):
# ポートフォリオリスク
portfolio_risk = np.sqrt(weights @ cov_matrix @ weights)
# 各資産のリスク貢献
marginal_contrib = cov_matrix @ weights
contrib = weights * marginal_contrib / portfolio_risk
# リスク貢献の分散(これを最小化)
target_contrib = portfolio_risk / n_assets
return np.sum((contrib - target_contrib) ** 2)
# 制約条件
constraints = [
{'type': 'eq', 'fun': lambda w: np.sum(w) - 1} # 重みの合計は1
]
# 境界条件(すべて正の重み)
bounds = [(0.001, 1.0) for _ in range(n_assets)]
# 初期値
x0 = np.ones(n_assets) / n_assets
# 最適化
result = minimize(
risk_contribution_objective,
x0,
method='SLSQP',
bounds=bounds,
constraints=constraints,
options={'ftol': 1e-9}
)
return result.x
def _hierarchical_risk_parity(self,
cov_matrix: np.ndarray,
assets: List[str]) -> np.ndarray:
"""階層的リスクパリティ(López de Prado)"""
from scipy.cluster.hierarchy import linkage, dendrogram, to_tree
from scipy.spatial.distance import squareform
# 相関行列を計算
corr_matrix = self._cov_to_corr(cov_matrix)
# 距離行列
dist_matrix = np.sqrt(0.5 * (1 - corr_matrix))
condensed_dist = squareform(dist_matrix)
# 階層的クラスタリング
link = linkage(condensed_dist, method='single')
# クラスタの順序を取得
root = to_tree(link)
ordered_indices = self._get_cluster_order(root, len(assets))
# 再帰的二分法で重みを計算
weights = self._recursive_bisection(
cov_matrix,
ordered_indices
)
return weights
def _get_cluster_order(self, node, n_assets):
"""クラスタツリーから資産の順序を取得"""
if node.is_leaf():
return [node.id]
left_order = self._get_cluster_order(node.left, n_assets)
right_order = self._get_cluster_order(node.right, n_assets)
return left_order + right_order
def _recursive_bisection(self,
cov_matrix: np.ndarray,
indices: List[int]) -> np.ndarray:
"""再帰的二分法で重みを計算"""
n_assets = cov_matrix.shape[0]
weights = np.zeros(n_assets)
def _bisect(sub_indices, alloc):
if len(sub_indices) == 1:
weights[sub_indices[0]] = alloc
return
# サブ行列を抽出
sub_cov = cov_matrix[np.ix_(sub_indices, sub_indices)]
# 二分割
split = len(sub_indices) // 2
left_indices = sub_indices[:split]
right_indices = sub_indices[split:]
# 各クラスタのリスクを計算
left_var = self._cluster_variance(cov_matrix, left_indices)
right_var = self._cluster_variance(cov_matrix, right_indices)
# 逆分散で配分
left_alloc = alloc * right_var / (left_var + right_var)
right_alloc = alloc * left_var / (left_var + right_var)
# 再帰的に配分
_bisect(left_indices, left_alloc)
_bisect(right_indices, right_alloc)
_bisect(indices, 1.0)
return weights
def _cluster_variance(self, cov_matrix: np.ndarray, indices: List[int]) -> float:
"""クラスタの分散を計算"""
sub_cov = cov_matrix[np.ix_(indices, indices)]
# 逆分散重み付けポートフォリオの分散
inv_vol = 1 / np.sqrt(np.diag(sub_cov))
weights = inv_vol / np.sum(inv_vol)
return weights @ sub_cov @ weights
def _cov_to_corr(self, cov_matrix: np.ndarray) -> np.ndarray:
"""共分散行列を相関行列に変換"""
std = np.sqrt(np.diag(cov_matrix))
corr_matrix = cov_matrix / np.outer(std, std)
return corr_matrix
def _calculate_risk_contributions(self,
weights: np.ndarray,
cov_matrix: np.ndarray) -> np.ndarray:
"""各資産のリスク貢献度を計算"""
portfolio_risk = np.sqrt(weights @ cov_matrix @ weights)
marginal_contrib = cov_matrix @ weights
risk_contrib = weights * marginal_contrib / portfolio_risk
return risk_contrib
def _calculate_concentration_ratio(self, weights: np.ndarray) -> float:
"""ポートフォリオの集中度を計算(Herfindahl指数)"""
return np.sum(weights ** 2)
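A quick way to sanity-check this optimizer is to compare methods on a hand-built covariance matrix. The sketch below, with made-up volatilities and correlations, shows how equal risk contribution and plain inverse-volatility weighting differ once correlations are taken into account.
import numpy as np

# Toy 3-asset covariance matrix: vols of 10%, 20%, 30% with moderate correlations
vols = np.array([0.10, 0.20, 0.30])
corr = np.array([[1.0, 0.3, 0.2],
                 [0.3, 1.0, 0.5],
                 [0.2, 0.5, 1.0]])
cov = np.outer(vols, vols) * corr
assets = ['A', 'B', 'C']

for method in ['equal_risk_contribution', 'inverse_volatility']:
    result = RiskParityOptimizer(method=method).optimize(cov, assets)
    print(method, {k: round(v, 3) for k, v in result['weights'].items()})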
3. Dynamic Position Sizing
3.1 ML-Based Position Sizing
class MLPositionSizer:
def __init__(self,
base_capital: float,
max_position_pct: float = 0.1,
min_position_pct: float = 0.01):
self.base_capital = base_capital
self.max_position_pct = max_position_pct
self.min_position_pct = min_position_pct
self.volatility_model = None
self.risk_model = None
def calculate_position_size(self,
signal_strength: float,
market_data: Dict,
current_portfolio: Dict) -> float:
"""シグナル強度と市場状況に基づいてポジションサイズを計算"""
# 基本サイズ(シグナル強度に比例)
base_size = self.base_capital * abs(signal_strength) * 0.1
# ボラティリティ調整
volatility_factor = self._volatility_adjustment(market_data)
# 相関調整
correlation_factor = self._correlation_adjustment(
market_data, current_portfolio
)
# 市場状況調整
market_factor = self._market_condition_adjustment(market_data)
# 最終的なポジションサイズ
position_size = base_size * volatility_factor * correlation_factor * market_factor
# 制限を適用
max_size = self.base_capital * self.max_position_pct
min_size = self.base_capital * self.min_position_pct
position_size = np.clip(position_size, min_size, max_size)
return position_size
def _volatility_adjustment(self, market_data: Dict) -> float:
"""ボラティリティに基づく調整"""
current_vol = market_data.get('volatility', 0.02)
target_vol = 0.02 # 目標ボラティリティ(2%)
# 逆ボラティリティスケーリング
vol_factor = min(target_vol / current_vol, 2.0)
# 極端なボラティリティの場合はさらに制限
if current_vol > 0.05: # 5%以上
vol_factor *= 0.5
return vol_factor
def _correlation_adjustment(self,
market_data: Dict,
current_portfolio: Dict) -> float:
"""ポートフォリオ相関に基づく調整"""
if not current_portfolio:
return 1.0
# 新しいポジションと既存ポートフォリオの相関を計算
symbol = market_data['symbol']
correlations = []
for asset, position in current_portfolio.items():
if asset != symbol:
corr = market_data.get('correlations', {}).get(asset, 0)
correlations.append(abs(corr) * abs(position))
# 平均相関が高い場合はポジションを減らす
if correlations:
avg_correlation = np.mean(correlations)
corr_factor = 1 - 0.5 * avg_correlation
else:
corr_factor = 1.0
return max(corr_factor, 0.3)
def _market_condition_adjustment(self, market_data: Dict) -> float:
"""市場全体の状況に基づく調整"""
factors = []
# 市場のトレンド
market_trend = market_data.get('market_trend', 0)
if abs(market_trend) > 0.5: # 強いトレンド
factors.append(1.2)
else:
factors.append(1.0)
# 市場の恐怖指数(暗号通貨版VIX)
fear_index = market_data.get('fear_greed_index', 50) / 100
if fear_index < 0.2: # 極度の恐怖
factors.append(0.5)
elif fear_index > 0.8: # 極度の貪欲
factors.append(0.7)
else:
factors.append(1.0)
# 流動性
liquidity_ratio = market_data.get('liquidity_ratio', 1.0)
if liquidity_ratio < 0.5:
factors.append(0.6)
else:
factors.append(1.0)
return np.prod(factors)
def train_volatility_model(self, historical_data: pd.DataFrame):
"""ボラティリティ予測モデルを学習"""
# GARCHモデルの代わりにニューラルネットワークを使用
class VolatilityNet(nn.Module):
def __init__(self, input_dim: int, hidden_dim: int = 64):
super().__init__()
self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
self.fc = nn.Linear(hidden_dim, 1)
self.relu = nn.ReLU()
def forward(self, x):
lstm_out, _ = self.lstm(x)
output = self.fc(self.relu(lstm_out[:, -1, :]))
return torch.exp(output) # 正の値を保証
# 特徴量を準備
returns = historical_data['close'].pct_change().dropna()
features = []
targets = []
window = 20
for i in range(window, len(returns) - 1):
# 過去のリターンを特徴量として使用
feature_window = returns[i-window:i].values
features.append(feature_window)
# 次期のボラティリティ(実現ボラティリティ)をターゲット
future_vol = returns[i:i+5].std() # 5期先までの標準偏差
targets.append(future_vol)
# テンソルに変換
X = torch.FloatTensor(features).unsqueeze(-1)
y = torch.FloatTensor(targets)
# モデルの学習
self.volatility_model = VolatilityNet(input_dim=1)
optimizer = torch.optim.Adam(self.volatility_model.parameters(), lr=0.001)
criterion = nn.MSELoss()
for epoch in range(100):
self.volatility_model.train()
optimizer.zero_grad()
outputs = self.volatility_model(X).squeeze()
loss = criterion(outputs, y)
loss.backward()
optimizer.step()
if epoch % 20 == 0:
print(f"Epoch {epoch}, Loss: {loss.item():.6f}")
3.2 A Machine Learning Extension of the Kelly Criterion
class MLKellyCriterion:
def __init__(self,
confidence_threshold: float = 0.6,
kelly_fraction: float = 0.25):
self.confidence_threshold = confidence_threshold
        self.kelly_fraction = kelly_fraction  # fractional Kelly
self.win_rate_model = None
self.win_loss_ratio_model = None
def calculate_kelly_fraction(self,
signal: Dict,
historical_performance: pd.DataFrame) -> float:
"""機械学習で強化されたKelly基準"""
# 勝率と損益比を予測
predicted_win_rate = self._predict_win_rate(signal)
predicted_win_loss_ratio = self._predict_win_loss_ratio(signal)
# 信頼度を計算
confidence = self._calculate_confidence(signal, historical_performance)
if confidence < self.confidence_threshold:
return 0.0
# Kelly公式: f = p - q/b
# p: 勝率, q: 敗率(1-p), b: 勝った時の利益/負けた時の損失
p = predicted_win_rate
q = 1 - p
b = predicted_win_loss_ratio
# 基本Kelly割合
kelly = p - q / b if b > 0 else 0
# 安全のため割合を減らす(フラクショナルKelly)
kelly = kelly * self.kelly_fraction * confidence
# 上限と下限を設定
kelly = np.clip(kelly, 0, 0.25) # 最大25%
return kelly
def _predict_win_rate(self, signal: Dict) -> float:
"""シグナルから勝率を予測"""
if self.win_rate_model is None:
# 簡単なルールベースの予測
signal_strength = abs(signal.get('strength', 0))
base_win_rate = 0.5
# シグナルが強いほど勝率が上がる
win_rate = base_win_rate + 0.3 * signal_strength
# 市場状況による調整
if signal.get('market_trend', 0) * signal.get('direction', 0) > 0:
win_rate += 0.1 # トレンドと同方向
return min(win_rate, 0.8)
else:
# 学習済みモデルを使用
features = self._extract_signal_features(signal)
return self.win_rate_model.predict(features)[0]
def _predict_win_loss_ratio(self, signal: Dict) -> float:
"""シグナルから損益比を予測"""
if self.win_loss_ratio_model is None:
# 簡単なルールベース
volatility = signal.get('volatility', 0.02)
# ボラティリティが高いほど潜在的な利益も大きい
base_ratio = 1.5
ratio = base_ratio * (1 + volatility * 10)
# リスクリワード比を考慮
if signal.get('risk_reward_ratio', 0) > 2:
ratio *= 1.2
return min(ratio, 3.0)
else:
# 学習済みモデルを使用
features = self._extract_signal_features(signal)
return self.win_loss_ratio_model.predict(features)[0]
def _calculate_confidence(self,
signal: Dict,
historical_performance: pd.DataFrame) -> float:
"""予測の信頼度を計算"""
confidence_factors = []
# シグナルの強度
signal_strength = abs(signal.get('strength', 0))
confidence_factors.append(signal_strength)
# 過去のパフォーマンス(類似シグナル)
similar_signals = self._find_similar_signals(signal, historical_performance)
if len(similar_signals) > 5:
success_rate = (similar_signals['profit'] > 0).mean()
confidence_factors.append(success_rate)
# 市場状況の明確さ
market_clarity = 1 - signal.get('market_uncertainty', 0.5)
confidence_factors.append(market_clarity)
# 総合的な信頼度
confidence = np.mean(confidence_factors)
return confidence
def _find_similar_signals(self,
current_signal: Dict,
historical_performance: pd.DataFrame) -> pd.DataFrame:
"""過去の類似シグナルを検索"""
if historical_performance.empty:
return pd.DataFrame()
# 類似度を計算
similarities = []
for idx, row in historical_performance.iterrows():
similarity = 0
# シグナル強度の類似性
strength_diff = abs(row['signal_strength'] - current_signal.get('strength', 0))
similarity += (1 - strength_diff) * 0.4
# 市場状況の類似性
if 'volatility' in row and 'volatility' in current_signal:
vol_diff = abs(row['volatility'] - current_signal['volatility'])
similarity += (1 - min(vol_diff / 0.05, 1)) * 0.3
# 方向の一致
if row['direction'] == current_signal.get('direction', 0):
similarity += 0.3
similarities.append(similarity)
historical_performance['similarity'] = similarities
# 類似度が高い上位のシグナルを返す
return historical_performance[
historical_performance['similarity'] > 0.7
].sort_values('similarity', ascending=False)
def train_models(self,
historical_signals: pd.DataFrame,
outcomes: pd.DataFrame):
"""勝率と損益比の予測モデルを学習"""
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import train_test_split
# 特徴量を準備
X = self._prepare_training_features(historical_signals)
# 勝率モデル
y_win = (outcomes['profit'] > 0).astype(float)
X_train, X_test, y_train, y_test = train_test_split(
X, y_win, test_size=0.2, random_state=42
)
self.win_rate_model = GradientBoostingRegressor(
n_estimators=100,
max_depth=5,
learning_rate=0.01
)
self.win_rate_model.fit(X_train, y_train)
# 損益比モデル
# 勝ったトレードと負けたトレードを分離
wins = outcomes[outcomes['profit'] > 0]['profit']
losses = outcomes[outcomes['profit'] < 0]['profit'].abs()
if len(wins) > 0 and len(losses) > 0:
# 各シグナルの期待損益比を計算
y_ratio = []
for idx in range(len(outcomes)):
if outcomes.iloc[idx]['profit'] > 0:
y_ratio.append(wins.mean() / losses.mean())
else:
y_ratio.append(1.0) # デフォルト値
self.win_loss_ratio_model = GradientBoostingRegressor(
n_estimators=100,
max_depth=5,
learning_rate=0.01
)
self.win_loss_ratio_model.fit(X_train, y_ratio[:len(X_train)])
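A worked example makes the formula concrete. With an assumed 55% win rate and wins averaging 1.5x losses, the full Kelly fraction is f = 0.55 - 0.45/1.5 = 0.25; the fractional-Kelly scaling and confidence weighting used by calculate_kelly_fraction then shrink this considerably. All numbers below are illustrative assumptions.
# Assumed inputs: 55% win rate, 1.5 win/loss ratio, 80% confidence,
# and the class default kelly_fraction of 0.25
p, b = 0.55, 1.5
confidence, kelly_fraction = 0.8, 0.25

full_kelly = p - (1 - p) / b                      # 0.55 - 0.45 / 1.5 = 0.25
sized = full_kelly * kelly_fraction * confidence  # 0.25 * 0.25 * 0.8 = 0.05
print(f"full Kelly: {full_kelly:.0%}, fraction actually risked: {sized:.0%}")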
4. Drawdown Prediction and Prevention
4.1 Predicting Drawdowns with Machine Learning
class DrawdownPredictor:
def __init__(self,
prediction_horizon: int = 20,
risk_threshold: float = 0.1):
self.prediction_horizon = prediction_horizon
self.risk_threshold = risk_threshold
self.drawdown_model = None
self.feature_importance = None
def predict_drawdown_risk(self,
market_data: pd.DataFrame,
portfolio_data: pd.DataFrame) -> Dict:
"""将来のドローダウンリスクを予測"""
# 特徴量を抽出
features = self._extract_drawdown_features(market_data, portfolio_data)
if self.drawdown_model is None:
# 簡単なルールベースの予測
risk_score = self._rule_based_prediction(features)
else:
# 機械学習モデルで予測
risk_score = self.drawdown_model.predict_proba(features)[:, 1][0]
# リスクレベルを分類
risk_level = self._classify_risk_level(risk_score)
# 推奨アクション
recommended_actions = self._recommend_actions(risk_score, features)
return {
'risk_score': risk_score,
'risk_level': risk_level,
'recommended_actions': recommended_actions,
'contributing_factors': self._identify_risk_factors(features)
}
def _extract_drawdown_features(self,
market_data: pd.DataFrame,
portfolio_data: pd.DataFrame) -> pd.DataFrame:
"""ドローダウン予測のための特徴量を抽出"""
features = {}
# ポートフォリオメトリクス
portfolio_returns = portfolio_data['value'].pct_change()
# 現在のドローダウン
running_max = portfolio_data['value'].expanding().max()
current_dd = (portfolio_data['value'] - running_max) / running_max
features['current_drawdown'] = current_dd.iloc[-1]
# ドローダウンの期間
dd_duration = (current_dd < 0).sum()
features['drawdown_duration'] = dd_duration
# ポートフォリオのボラティリティ
features['portfolio_volatility'] = portfolio_returns.rolling(20).std().iloc[-1]
# 最近のリターンのトレンド
features['return_trend'] = portfolio_returns.rolling(10).mean().iloc[-1]
# 市場指標
market_returns = market_data['close'].pct_change()
# 市場のボラティリティ
features['market_volatility'] = market_returns.rolling(20).std().iloc[-1]
# VIX相当指標
features['implied_volatility'] = self._calculate_implied_volatility(market_data)
# 相関の変化
rolling_corr = portfolio_returns.rolling(60).corr(market_returns)
features['correlation_change'] = rolling_corr.diff(20).iloc[-1]
# テクニカル指標
features['rsi'] = self._calculate_rsi(market_data['close']).iloc[-1]
features['market_momentum'] = (
market_data['close'].iloc[-1] / market_data['close'].iloc[-20] - 1
)
# 流動性指標
features['volume_ratio'] = (
market_data['volume'].rolling(5).mean() /
market_data['volume'].rolling(20).mean()
).iloc[-1]
# センチメント指標(仮想的)
features['fear_greed_index'] = self._calculate_sentiment_index(market_data)
return pd.DataFrame([features])
def _calculate_implied_volatility(self, market_data: pd.DataFrame) -> float:
"""インプライドボラティリティを推定"""
# 簡略化: 高値-安値の比率から推定
high_low_ratio = (market_data['high'] / market_data['low']).rolling(20).mean()
impl_vol = (high_low_ratio.iloc[-1] - 1) * np.sqrt(252)
return impl_vol
def _calculate_sentiment_index(self, market_data: pd.DataFrame) -> float:
"""市場センチメント指標を計算"""
# 価格の勢いから推定
momentum = market_data['close'].pct_change(20).iloc[-1]
volume_trend = (
market_data['volume'].rolling(5).mean() /
market_data['volume'].rolling(50).mean()
).iloc[-1]
# センチメントスコア(0-100)
sentiment = 50 + momentum * 100 + (volume_trend - 1) * 20
return np.clip(sentiment, 0, 100)
def _rule_based_prediction(self, features: pd.DataFrame) -> float:
"""ルールベースのドローダウンリスク予測"""
risk_score = 0.0
# 現在のドローダウンが深い
if features['current_drawdown'].iloc[0] < -0.05:
risk_score += 0.3
# ボラティリティが高い
if features['portfolio_volatility'].iloc[0] > 0.03:
risk_score += 0.2
# 負のモメンタム
if features['return_trend'].iloc[0] < -0.001:
risk_score += 0.2
# 市場との相関が急上昇
if features['correlation_change'].iloc[0] > 0.2:
risk_score += 0.15
# RSIが極端
rsi = features['rsi'].iloc[0]
if rsi < 30 or rsi > 70:
risk_score += 0.15
return min(risk_score, 1.0)
def train_model(self,
historical_features: pd.DataFrame,
historical_drawdowns: pd.Series):
"""ドローダウン予測モデルを学習"""
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
# ターゲット変数を作成(閾値以上のドローダウンが発生したか)
y = (historical_drawdowns < -self.risk_threshold).astype(int)
# 特徴量をスケーリング
scaler = StandardScaler()
X_scaled = scaler.fit_transform(historical_features)
# モデルを学習
self.drawdown_model = RandomForestClassifier(
n_estimators=200,
max_depth=10,
min_samples_split=20,
random_state=42
)
self.drawdown_model.fit(X_scaled, y)
# 特徴量の重要度を保存
self.feature_importance = pd.DataFrame({
'feature': historical_features.columns,
'importance': self.drawdown_model.feature_importances_
}).sort_values('importance', ascending=False)
def _classify_risk_level(self, risk_score: float) -> str:
"""リスクスコアからリスクレベルを分類"""
if risk_score < 0.3:
return 'LOW'
elif risk_score < 0.6:
return 'MEDIUM'
elif risk_score < 0.8:
return 'HIGH'
else:
return 'EXTREME'
def _recommend_actions(self, risk_score: float, features: pd.DataFrame) -> List[str]:
"""リスクスコアに基づいて推奨アクションを提示"""
actions = []
if risk_score > 0.7:
actions.append("ポジションサイズを50%削減")
actions.append("ストップロスを厳格化")
if risk_score > 0.5:
actions.append("新規ポジションの開設を制限")
actions.append("ヘッジポジションの検討")
if features['portfolio_volatility'].iloc[0] > 0.04:
actions.append("低ボラティリティ資産への配分を増加")
if features['current_drawdown'].iloc[0] < -0.08:
actions.append("損失確定の検討")
actions.append("リバランスの実施")
return actions
def _identify_risk_factors(self, features: pd.DataFrame) -> List[str]:
"""主要なリスク要因を特定"""
risk_factors = []
feature_dict = features.iloc[0].to_dict()
# 各特徴量の閾値チェック
if feature_dict['portfolio_volatility'] > 0.03:
risk_factors.append("高ポートフォリオボラティリティ")
if feature_dict['current_drawdown'] < -0.05:
risk_factors.append("既存のドローダウン")
if feature_dict['correlation_change'] > 0.2:
risk_factors.append("市場相関の急上昇")
if feature_dict['market_volatility'] > 0.04:
risk_factors.append("高市場ボラティリティ")
return risk_factors
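Before any model is trained, the predictor falls back to its rule-based path, which can be exercised with synthetic data. This is a minimal sketch: the random price series and portfolio values are fabricated, and it relies on the _calculate_rsi helper defined on the class above.
import numpy as np
import pandas as pd

rng = np.random.default_rng(1)
n = 250
idx = pd.date_range('2023-01-01', periods=n, freq='D')
close = pd.Series(100 * np.exp(np.cumsum(rng.normal(0, 0.02, n))), index=idx)
market_data = pd.DataFrame({
    'close': close,
    'high': close * 1.01,   # crude OHLC stand-ins for the example
    'low': close * 0.99,
    'volume': rng.uniform(1e6, 2e6, n),
})
# Pretend the portfolio simply tracked the market
portfolio_data = pd.DataFrame({'value': 1_000_000 * close / close.iloc[0]}, index=idx)

predictor = DrawdownPredictor()
report = predictor.predict_drawdown_risk(market_data, portfolio_data)
print(report['risk_level'], report['contributing_factors'])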
4.2 A Drawdown Prevention System
class DrawdownPreventionSystem:
def __init__(self,
max_drawdown: float = 0.2,
recovery_threshold: float = 0.1):
self.max_drawdown = max_drawdown
self.recovery_threshold = recovery_threshold
self.protection_active = False
self.protection_level = 0
def monitor_and_protect(self,
current_portfolio_value: float,
peak_value: float,
market_conditions: Dict) -> Dict:
"""ポートフォリオを監視し、保護措置を実行"""
# 現在のドローダウンを計算
current_drawdown = (current_portfolio_value - peak_value) / peak_value
# 保護レベルを決定
protection_actions = {}
if current_drawdown < -self.max_drawdown * 0.5:
# レベル1: 警告
self.protection_level = 1
protection_actions = self._level1_protection(market_conditions)
if current_drawdown < -self.max_drawdown * 0.7:
# レベル2: リスク削減
self.protection_level = 2
protection_actions = self._level2_protection(market_conditions)
if current_drawdown < -self.max_drawdown * 0.9:
# レベル3: 緊急措置
self.protection_level = 3
protection_actions = self._level3_protection(market_conditions)
# 回復モード
if self.protection_active and current_drawdown > -self.recovery_threshold:
self.protection_active = False
self.protection_level = 0
protection_actions['mode'] = 'recovery'
return {
'current_drawdown': current_drawdown,
'protection_level': self.protection_level,
'protection_active': self.protection_active,
'actions': protection_actions
}
def _level1_protection(self, market_conditions: Dict) -> Dict:
"""レベル1保護: 警告と軽微な調整"""
return {
'mode': 'warning',
'position_size_multiplier': 0.8,
'stop_loss_tightening': 0.02,
'new_positions_allowed': True,
'rebalance_frequency': 'daily'
}
def _level2_protection(self, market_conditions: Dict) -> Dict:
"""レベル2保護: 積極的なリスク削減"""
self.protection_active = True
return {
'mode': 'risk_reduction',
'position_size_multiplier': 0.5,
'stop_loss_tightening': 0.01,
'new_positions_allowed': False,
'close_losing_positions': True,
'hedge_ratio': 0.3,
'rebalance_frequency': 'hourly'
}
def _level3_protection(self, market_conditions: Dict) -> Dict:
"""レベル3保護: 緊急措置"""
self.protection_active = True
return {
'mode': 'emergency',
'position_size_multiplier': 0.1,
'stop_loss_tightening': 0.005,
'new_positions_allowed': False,
'close_all_positions': market_conditions.get('extreme_volatility', False),
'hedge_ratio': 0.5,
'cash_allocation': 0.7,
'rebalance_frequency': 'immediate'
}
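Feeding the monitor a sequence of falling portfolio values shows the three protection levels engaging one after another. A minimal sketch with a 20% maximum-drawdown budget and invented valuations:
system = DrawdownPreventionSystem(max_drawdown=0.2)
peak = 1_000_000

# Drawdowns of -5%, -12%, -16%, and -19% against the peak
for value in [950_000, 880_000, 840_000, 810_000]:
    status = system.monitor_and_protect(value, peak, {'extreme_volatility': False})
    print(f"{status['current_drawdown']:+.1%} -> level {status['protection_level']}, "
          f"mode: {status['actions'].get('mode', 'none')}")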
5. Correlation Analysis and Portfolio Diversification
5.1 Dynamic Correlation Analysis
class DynamicCorrelationAnalyzer:
def __init__(self, window_sizes: List[int] = [20, 60, 120]):
self.window_sizes = window_sizes
self.correlation_history = {}
def analyze_correlations(self,
returns_data: pd.DataFrame,
method: str = 'dynamic') -> Dict:
"""動的相関分析を実行"""
if method == 'static':
# 静的相関
correlation_matrix = returns_data.corr()
elif method == 'dynamic':
# DCC-GARCH型の動的相関
correlation_matrix = self._calculate_dcc_correlation(returns_data)
elif method == 'regime_switching':
# レジーム切り替え相関
correlation_matrix = self._regime_switching_correlation(returns_data)
# 相関構造の分析
analysis_results = {
'correlation_matrix': correlation_matrix,
'correlation_clusters': self._identify_correlation_clusters(correlation_matrix),
'correlation_stability': self._assess_correlation_stability(returns_data),
'diversification_ratio': self._calculate_diversification_ratio(correlation_matrix),
'effective_assets': self._calculate_effective_assets(correlation_matrix)
}
return analysis_results
def _calculate_dcc_correlation(self, returns: pd.DataFrame) -> pd.DataFrame:
"""DCC(Dynamic Conditional Correlation)を計算"""
n_assets = len(returns.columns)
T = len(returns)
# 標準化リターン
standardized_returns = pd.DataFrame()
for col in returns.columns:
vol = returns[col].rolling(20).std()
standardized_returns[col] = returns[col] / vol
# 無条件相関行列
Q_bar = standardized_returns.corr()
# DCC parameters (simplified)
a = 0.01
b = 0.95
# 初期化
Q_t = Q_bar.copy()
R_t = Q_t.copy()
# 最新の動的相関を計算
for t in range(1, min(100, T)): # 最新100期間
# 標準化リターンの外積
z_t = standardized_returns.iloc[t].values.reshape(-1, 1)
# Qの更新
Q_t = (1 - a - b) * Q_bar + a * (z_t @ z_t.T) + b * Q_t
# 相関行列への変換
D_t = np.diag(1 / np.sqrt(np.diag(Q_t)))
R_t = D_t @ Q_t @ D_t
return pd.DataFrame(R_t, index=returns.columns, columns=returns.columns)
def _regime_switching_correlation(self, returns: pd.DataFrame) -> pd.DataFrame:
"""レジーム切り替えモデルによる相関分析"""
from sklearn.mixture import GaussianMixture
# レジーム数
n_regimes = 2
# 特徴量: リターンとボラティリティ
features = pd.DataFrame()
features['mean_return'] = returns.mean(axis=1)
features['volatility'] = returns.std(axis=1)
# レジームを識別
gmm = GaussianMixture(n_components=n_regimes, random_state=42)
regimes = gmm.fit_predict(features.dropna())
# 現在のレジーム
current_regime = regimes[-1]
# 現在のレジームでの相関を計算
regime_returns = returns[regimes == current_regime]
return regime_returns.corr()
def _identify_correlation_clusters(self, corr_matrix: pd.DataFrame) -> Dict:
"""相関クラスターを識別"""
from scipy.cluster.hierarchy import linkage, fcluster
from scipy.spatial.distance import squareform
# 距離行列
distance_matrix = 1 - corr_matrix.abs()
condensed_dist = squareform(distance_matrix)
# 階層的クラスタリング
Z = linkage(condensed_dist, method='ward')
# クラスター数を決定(相関0.7を閾値)
clusters = fcluster(Z, 0.3, criterion='distance')
# クラスター情報をまとめる
cluster_info = {}
for i, cluster in enumerate(clusters):
asset = corr_matrix.index[i]
if cluster not in cluster_info:
cluster_info[cluster] = []
cluster_info[cluster].append(asset)
return cluster_info
def _assess_correlation_stability(self, returns: pd.DataFrame) -> float:
"""相関の安定性を評価"""
correlation_changes = []
for window in self.window_sizes:
# ローリング相関
rolling_corr = returns.rolling(window).corr()
# 相関の変化を計算
for i in range(window, len(returns), window//2):
corr1 = rolling_corr.iloc[i-window:i].mean()
corr2 = rolling_corr.iloc[i:i+window].mean()
# Frobenius normで差を測定
diff = np.linalg.norm(corr1 - corr2, 'fro')
correlation_changes.append(diff)
# 安定性スコア(0-1、1が最も安定)
stability = 1 / (1 + np.mean(correlation_changes))
return stability
def _calculate_diversification_ratio(self, corr_matrix: pd.DataFrame) -> float:
"""分散投資比率を計算"""
n_assets = len(corr_matrix)
# 等加重ポートフォリオを仮定
weights = np.ones(n_assets) / n_assets
# 個別資産のリスクの加重平均
avg_individual_risk = np.sqrt(np.diag(corr_matrix)) @ weights
# ポートフォリオリスク
portfolio_risk = np.sqrt(weights @ corr_matrix @ weights)
# 分散投資比率
diversification_ratio = avg_individual_risk / portfolio_risk
return diversification_ratio
def _calculate_effective_assets(self, corr_matrix: pd.DataFrame) -> float:
"""有効資産数を計算(Principal Component Analysis)"""
eigenvalues = np.linalg.eigvalsh(corr_matrix)
eigenvalues = eigenvalues[eigenvalues > 0] # 正の固有値のみ
# 有効資産数(エントロピーベース)
eigenvalues_normalized = eigenvalues / eigenvalues.sum()
entropy = -np.sum(eigenvalues_normalized * np.log(eigenvalues_normalized))
effective_assets = np.exp(entropy)
return effective_assets
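The analyzer can be demonstrated on synthetic returns where two assets share a common driver and a third is independent; clustering should then group the first two together. A minimal sketch with fabricated series:
import numpy as np
import pandas as pd

rng = np.random.default_rng(2)
n = 300
common = rng.normal(0, 0.01, n)  # shared factor for the first two assets
returns = pd.DataFrame({
    'BTC': common + rng.normal(0, 0.01, n),
    'ETH': common + rng.normal(0, 0.01, n),
    'GOLD': rng.normal(0, 0.005, n),
}, index=pd.date_range('2023-01-01', periods=n, freq='D'))

analyzer = DynamicCorrelationAnalyzer()
results = analyzer.analyze_correlations(returns, method='dynamic')
print(results['correlation_clusters'])
print(f"effective assets: {results['effective_assets']:.2f}")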
5.2 Optimal Diversification Strategies
class OptimalDiversificationStrategy:
def __init__(self):
self.correlation_analyzer = DynamicCorrelationAnalyzer()
self.min_correlation_threshold = 0.7
def build_diversified_portfolio(self,
assets_data: pd.DataFrame,
constraints: Dict = None) -> Dict:
"""最適に分散されたポートフォリオを構築"""
returns = assets_data.pct_change().dropna()
# 相関分析
correlation_results = self.correlation_analyzer.analyze_correlations(returns)
corr_matrix = correlation_results['correlation_matrix']
# 最大分散化ポートフォリオ
mdp_weights = self._maximum_diversification_portfolio(returns, corr_matrix)
# 最大デコレレーションポートフォリオ
decorr_weights = self._maximum_decorrelation_portfolio(corr_matrix)
# エントロピー最大化ポートフォリオ
entropy_weights = self._maximum_entropy_portfolio(returns)
# 統合ポートフォリオ(各戦略の加重平均)
combined_weights = self._combine_strategies({
'mdp': mdp_weights,
'decorrelation': decorr_weights,
'entropy': entropy_weights
}, correlation_results)
# パフォーマンス指標を計算
performance_metrics = self._calculate_performance_metrics(
combined_weights, returns, corr_matrix
)
return {
'weights': combined_weights,
'strategy_weights': {
'maximum_diversification': mdp_weights,
'maximum_decorrelation': decorr_weights,
'maximum_entropy': entropy_weights
},
'performance_metrics': performance_metrics,
'correlation_info': correlation_results
}
def _maximum_diversification_portfolio(self,
returns: pd.DataFrame,
corr_matrix: pd.DataFrame) -> Dict:
"""最大分散化ポートフォリオ(MDP)"""
n_assets = len(returns.columns)
# 個別資産のボラティリティ
volatilities = returns.std()
# 共分散行列
cov_matrix = returns.cov()
def diversification_ratio(weights):
# 負の値を返す(最大化のため)
weighted_avg_vol = np.sum(weights * volatilities)
portfolio_vol = np.sqrt(weights @ cov_matrix @ weights)
return -weighted_avg_vol / portfolio_vol
# 制約条件
constraints = [
{'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
]
# 境界条件
bounds = [(0, 0.3) for _ in range(n_assets)]
# 初期値
x0 = np.ones(n_assets) / n_assets
# 最適化
result = minimize(
diversification_ratio,
x0,
method='SLSQP',
bounds=bounds,
constraints=constraints
)
return dict(zip(returns.columns, result.x))
def _maximum_decorrelation_portfolio(self, corr_matrix: pd.DataFrame) -> Dict:
"""最大デコレレーションポートフォリオ"""
n_assets = len(corr_matrix)
def portfolio_correlation(weights):
# ポートフォリオの平均相関を最小化
weighted_corr = weights @ corr_matrix @ weights
# 対角要素を除く
avg_corr = (weighted_corr - np.sum(weights**2)) / (1 - np.sum(weights**2))
return avg_corr
# 制約条件
constraints = [
{'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
]
# 境界条件
bounds = [(0.05, 0.3) for _ in range(n_assets)]
# 初期値
x0 = np.ones(n_assets) / n_assets
# 最適化
result = minimize(
portfolio_correlation,
x0,
method='SLSQP',
bounds=bounds,
constraints=constraints
)
return dict(zip(corr_matrix.columns, result.x))
def _maximum_entropy_portfolio(self, returns: pd.DataFrame) -> Dict:
"""エントロピー最大化ポートフォリオ"""
n_assets = len(returns.columns)
def negative_entropy(weights):
# Shannon entropyの負値
weights_positive = np.maximum(weights, 1e-10)
entropy = -np.sum(weights_positive * np.log(weights_positive))
return -entropy
# 期待リターンの制約を追加
expected_returns = returns.mean()
target_return = expected_returns.mean() # 平均的なリターンを目標
# 制約条件
constraints = [
{'type': 'eq', 'fun': lambda w: np.sum(w) - 1},
{'type': 'ineq', 'fun': lambda w: w @ expected_returns - target_return}
]
# 境界条件
bounds = [(0.01, 0.5) for _ in range(n_assets)]
# 初期値
x0 = np.ones(n_assets) / n_assets
# 最適化
result = minimize(
negative_entropy,
x0,
method='SLSQP',
bounds=bounds,
constraints=constraints
)
return dict(zip(returns.columns, result.x))
def _combine_strategies(self,
strategy_weights: Dict[str, Dict],
correlation_results: Dict) -> Dict:
"""複数の戦略を組み合わせ"""
# 戦略の重み(市場状況に応じて調整)
strategy_importance = {
'mdp': 0.4,
'decorrelation': 0.3,
'entropy': 0.3
}
# 相関の安定性に応じて調整
stability = correlation_results['correlation_stability']
if stability < 0.5:
# 不安定な場合はエントロピー戦略を重視
strategy_importance['entropy'] = 0.5
strategy_importance['mdp'] = 0.3
strategy_importance['decorrelation'] = 0.2
# 加重平均
combined = {}
all_assets = set()
for strategy_weights_dict in strategy_weights.values():
all_assets.update(strategy_weights_dict.keys())
for asset in all_assets:
weight_sum = 0
for strategy, importance in strategy_importance.items():
if asset in strategy_weights[strategy]:
weight_sum += importance * strategy_weights[strategy][asset]
combined[asset] = weight_sum
        # Normalize
total_weight = sum(combined.values())
combined = {k: v/total_weight for k, v in combined.items()}
return combined
def _calculate_performance_metrics(self,
weights: Dict,
returns: pd.DataFrame,
corr_matrix: pd.DataFrame) -> Dict:
"""ポートフォリオのパフォーマンス指標を計算"""
# 重みベクトル
w = np.array([weights.get(asset, 0) for asset in returns.columns])
# 期待リターン
expected_return = w @ returns.mean()
# リスク(標準偏差)
cov_matrix = returns.cov()
portfolio_risk = np.sqrt(w @ cov_matrix @ w)
# シャープレシオ(リスクフリーレート0と仮定)
sharpe_ratio = expected_return / portfolio_risk if portfolio_risk > 0 else 0
# 分散投資比率
individual_risks = returns.std()
weighted_avg_risk = w @ individual_risks
diversification_ratio = weighted_avg_risk / portfolio_risk
# 最大ドローダウン(簡易計算)
portfolio_returns = returns @ w
cumulative_returns = (1 + portfolio_returns).cumprod()
running_max = cumulative_returns.expanding().max()
drawdown = (cumulative_returns - running_max) / running_max
max_drawdown = drawdown.min()
return {
'expected_return': expected_return,
'risk': portfolio_risk,
'sharpe_ratio': sharpe_ratio,
'diversification_ratio': diversification_ratio,
'max_drawdown': max_drawdown,
'effective_assets': self.correlation_analyzer._calculate_effective_assets(corr_matrix)
}
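Running the full strategy requires nothing more than a price DataFrame; everything else is derived internally. A minimal sketch on synthetic prices, using five assets so the per-asset weight bounds in the three sub-optimizers remain feasible:
import numpy as np
import pandas as pd

rng = np.random.default_rng(3)
dates = pd.date_range('2023-01-01', periods=400, freq='D')
assets = ['BTC', 'ETH', 'BNB', 'SOL', 'ADA']
prices = pd.DataFrame(
    100 * np.exp(np.cumsum(rng.normal(0.0005, 0.02, size=(400, 5)), axis=0)),
    index=dates, columns=assets
)

strategy = OptimalDiversificationStrategy()
result = strategy.build_diversified_portfolio(prices)
print({k: round(v, 3) for k, v in result['weights'].items()})
print(f"diversification ratio: {result['performance_metrics']['diversification_ratio']:.2f}")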
6. An Integrated Risk Management System
class IntegratedRiskManagementSystem:
def __init__(self, config: Dict):
self.config = config
        # Initialize the components
self.portfolio_optimizer = MLPortfolioOptimizer(config['assets'])
self.position_sizer = MLPositionSizer(config['capital'])
self.kelly_calculator = MLKellyCriterion()
self.drawdown_predictor = DrawdownPredictor()
self.drawdown_prevention = DrawdownPreventionSystem()
self.diversification_strategy = OptimalDiversificationStrategy()
        # State management
self.current_portfolio = {}
self.risk_metrics = {}
self.performance_history = []
def execute_risk_management_cycle(self,
market_data: pd.DataFrame,
signals: Dict) -> Dict:
"""完全なリスク管理サイクルを実行"""
# 1. ポートフォリオ最適化
optimization_results = self._optimize_portfolio(market_data)
# 2. ポジションサイジング
position_sizes = self._calculate_position_sizes(signals, market_data)
# 3. リスク評価
risk_assessment = self._assess_portfolio_risk(market_data)
# 4. ドローダウン管理
drawdown_management = self._manage_drawdown(market_data)
# 5. 実行可能な取引の生成
executable_trades = self._generate_executable_trades(
optimization_results,
position_sizes,
risk_assessment,
drawdown_management
)
# 6. パフォーマンストラッキング
self._track_performance(executable_trades)
return {
'optimization': optimization_results,
'position_sizes': position_sizes,
'risk_assessment': risk_assessment,
'drawdown_management': drawdown_management,
'trades': executable_trades,
'current_portfolio': self.current_portfolio,
'risk_metrics': self.risk_metrics
}
def _optimize_portfolio(self, market_data: pd.DataFrame) -> Dict:
"""ポートフォリオを最適化"""
# 期待リターンを計算
expected_returns = self.portfolio_optimizer.calculate_expected_returns(
market_data, method='ml_enhanced'
)
# リスク行列を推定
returns = market_data.pct_change().dropna()
risk_matrix = self.portfolio_optimizer.estimate_risk_matrix(
returns, method='ml_shrinkage'
)
# 制約条件を設定
constraints = {
'min_weight': 0.05,
'max_weight': 0.25,
'risk_target': self.config.get('risk_target', 0.15)
}
# 最適化を実行
optimization_result = self.portfolio_optimizer.optimize_portfolio(
expected_returns, risk_matrix, constraints
)
# 分散化戦略も適用
diversification_result = self.diversification_strategy.build_diversified_portfolio(
market_data, constraints
)
# 結果を統合
final_weights = self._blend_optimization_results(
optimization_result['weights'],
diversification_result['weights'],
market_conditions=self._analyze_market_conditions(market_data)
)
return {
'target_weights': final_weights,
'expected_return': optimization_result['expected_return'],
'expected_risk': optimization_result['expected_risk'],
'diversification_metrics': diversification_result['performance_metrics']
}
def _calculate_position_sizes(self,
signals: Dict,
market_data: pd.DataFrame) -> Dict:
"""各シグナルのポジションサイズを計算"""
position_sizes = {}
for symbol, signal in signals.items():
# 基本的なポジションサイズ
base_size = self.position_sizer.calculate_position_size(
signal['strength'],
{'symbol': symbol, 'volatility': signal.get('volatility', 0.02)},
self.current_portfolio
)
# Kelly基準による調整
kelly_fraction = self.kelly_calculator.calculate_kelly_fraction(
signal,
pd.DataFrame(self.performance_history)
)
# 最終的なサイズ
final_size = base_size * kelly_fraction
position_sizes[symbol] = {
'size': final_size,
'base_size': base_size,
'kelly_fraction': kelly_fraction,
'confidence': signal.get('confidence', 0.5)
}
return position_sizes
def _assess_portfolio_risk(self, market_data: pd.DataFrame) -> Dict:
"""ポートフォリオのリスクを評価"""
if not self.current_portfolio:
return {'risk_level': 'LOW', 'metrics': {}}
# VaRの計算
portfolio_returns = self._calculate_portfolio_returns(market_data)
var_95 = np.percentile(portfolio_returns, 5)
cvar_95 = portfolio_returns[portfolio_returns <= var_95].mean()
# ストレステスト
stress_scenarios = self._run_stress_tests(market_data)
# 相関リスク
correlation_risk = self._assess_correlation_risk(market_data)
# 総合リスクスコア
risk_score = self._calculate_composite_risk_score({
'var_95': var_95,
'cvar_95': cvar_95,
'stress_test_results': stress_scenarios,
'correlation_risk': correlation_risk
})
return {
'risk_level': self._classify_risk_level(risk_score),
'metrics': {
'var_95': var_95,
'cvar_95': cvar_95,
'risk_score': risk_score,
'stress_test_summary': stress_scenarios
}
}
def _manage_drawdown(self, market_data: pd.DataFrame) -> Dict:
"""ドローダウンを管理"""
# 現在のポートフォリオ価値
portfolio_value = self._calculate_portfolio_value(market_data)
# ピーク値(履歴から)
peak_value = max([h.get('portfolio_value', 0)
for h in self.performance_history] + [portfolio_value])
# ドローダウン予測
drawdown_prediction = self.drawdown_predictor.predict_drawdown_risk(
market_data,
pd.DataFrame(self.performance_history)
)
# 保護措置
protection_actions = self.drawdown_prevention.monitor_and_protect(
portfolio_value,
peak_value,
self._analyze_market_conditions(market_data)
)
return {
'current_drawdown': protection_actions['current_drawdown'],
'predicted_risk': drawdown_prediction['risk_level'],
'protection_level': protection_actions['protection_level'],
'recommended_actions': protection_actions['actions']
}
def _generate_executable_trades(self,
optimization: Dict,
position_sizes: Dict,
risk_assessment: Dict,
drawdown_management: Dict) -> List[Dict]:
"""実行可能な取引を生成"""
trades = []
# リスクレベルに応じた調整
risk_multiplier = self._get_risk_multiplier(
risk_assessment['risk_level'],
drawdown_management['protection_level']
)
# 目標ウェイトと現在のウェイトの差分
target_weights = optimization['target_weights']
current_weights = self._calculate_current_weights()
for asset, target_weight in target_weights.items():
current_weight = current_weights.get(asset, 0)
weight_diff = target_weight - current_weight
# 取引が必要な場合
if abs(weight_diff) > 0.01: # 1%以上の差
# ポジションサイズの計算
trade_size = abs(weight_diff) * self.config['capital'] * risk_multiplier
# シグナルがある場合は追加調整
if asset in position_sizes:
signal_size = position_sizes[asset]['size']
trade_size = (trade_size + signal_size) / 2 # 平均を取る
trades.append({
'asset': asset,
'action': 'buy' if weight_diff > 0 else 'sell',
'size': trade_size,
'target_weight': target_weight,
'current_weight': current_weight,
'risk_adjusted': True,
'timestamp': pd.Timestamp.now()
})
return trades
def _track_performance(self, trades: List[Dict]):
"""パフォーマンスを追跡"""
performance_entry = {
'timestamp': pd.Timestamp.now(),
'portfolio_value': sum(self.current_portfolio.values()),
'trades_executed': len(trades),
'risk_metrics': self.risk_metrics.copy()
}
self.performance_history.append(performance_entry)
        # Cap the history (memory management)
if len(self.performance_history) > 10000:
self.performance_history = self.performance_history[-5000:]
def _analyze_market_conditions(self, market_data: pd.DataFrame) -> Dict:
"""市場状況を分析"""
returns = market_data['close'].pct_change()
return {
'volatility': returns.rolling(20).std().iloc[-1],
'trend': (market_data['close'].iloc[-1] / market_data['close'].iloc[-20] - 1),
'volume_trend': (market_data['volume'].rolling(5).mean() /
market_data['volume'].rolling(20).mean()).iloc[-1],
'extreme_volatility': returns.rolling(20).std().iloc[-1] > 0.05
}
def _calculate_portfolio_returns(self, market_data: pd.DataFrame) -> np.ndarray:
"""ポートフォリオリターンを計算"""
returns = market_data.pct_change().dropna()
weights = np.array([self.current_portfolio.get(asset, 0)
for asset in returns.columns])
if weights.sum() > 0:
weights = weights / weights.sum()
portfolio_returns = returns @ weights
return portfolio_returns.values
def _run_stress_tests(self, market_data: pd.DataFrame) -> Dict:
"""ストレステストを実行"""
scenarios = {
'market_crash': -0.20,
'flash_crash': -0.10,
'volatility_spike': 3.0,
'correlation_breakdown': 0.9
}
results = {}
for scenario, shock in scenarios.items():
if 'crash' in scenario:
# 価格ショック
shocked_value = self._calculate_portfolio_value(
market_data * (1 + shock)
)
else:
# その他のシナリオ
shocked_value = self._calculate_portfolio_value(market_data) * 0.9
loss = (shocked_value - self._calculate_portfolio_value(market_data)) / \
self._calculate_portfolio_value(market_data)
results[scenario] = loss
return results
def _get_risk_multiplier(self, risk_level: str, protection_level: int) -> float:
"""リスクレベルに基づく乗数を取得"""
base_multipliers = {
'LOW': 1.0,
'MEDIUM': 0.7,
'HIGH': 0.4,
'EXTREME': 0.1
}
multiplier = base_multipliers.get(risk_level, 0.5)
        # Additional adjustment for the protection level
if protection_level > 0:
multiplier *= (1 - protection_level * 0.2)
return max(multiplier, 0.1)
def _calculate_portfolio_value(self, market_data: pd.DataFrame) -> float:
"""ポートフォリオの現在価値を計算"""
total_value = 0
latest_prices = market_data.iloc[-1]
for asset, position in self.current_portfolio.items():
if asset in latest_prices:
total_value += position * latest_prices[asset]
return total_value
def _calculate_current_weights(self) -> Dict:
"""現在のポートフォリオウェイトを計算"""
total_value = sum(self.current_portfolio.values())
if total_value == 0:
return {}
return {asset: value/total_value
for asset, value in self.current_portfolio.items()}
def _blend_optimization_results(self,
opt_weights: Dict,
div_weights: Dict,
market_conditions: Dict) -> Dict:
"""最適化結果をブレンド"""
# 市場状況に応じてブレンド比率を調整
if market_conditions['volatility'] > 0.03:
# 高ボラティリティ時は分散化を重視
blend_ratio = 0.7
else:
# 通常時は最適化を重視
blend_ratio = 0.3
blended_weights = {}
all_assets = set(opt_weights.keys()) | set(div_weights.keys())
for asset in all_assets:
opt_w = opt_weights.get(asset, 0)
div_w = div_weights.get(asset, 0)
blended_weights[asset] = (1-blend_ratio) * opt_w + blend_ratio * div_w
        # Normalize
total = sum(blended_weights.values())
if total > 0:
blended_weights = {k: v/total for k, v in blended_weights.items()}
return blended_weights
def _assess_correlation_risk(self, market_data: pd.DataFrame) -> float:
"""相関リスクを評価"""
returns = market_data.pct_change().dropna()
# ローリング相関の変化
rolling_corr = returns.rolling(60).corr()
recent_corr = rolling_corr.iloc[-20:].mean()
historical_corr = rolling_corr.iloc[-100:-20].mean()
# 相関の急激な変化を検出
corr_change = np.abs(recent_corr - historical_corr).mean()
# リスクスコア(0-1)
correlation_risk = min(corr_change * 10, 1.0)
return correlation_risk
def _calculate_composite_risk_score(self, risk_metrics: Dict) -> float:
"""複合リスクスコアを計算"""
# 各リスク指標の重み
weights = {
'var': 0.25,
'cvar': 0.25,
'stress': 0.30,
'correlation': 0.20
}
# スコアの計算
score = 0
# VaRとCVaR(負の値なので絶対値を取る)
score += weights['var'] * min(abs(risk_metrics['var_95']) * 10, 1)
score += weights['cvar'] * min(abs(risk_metrics['cvar_95']) * 10, 1)
# ストレステスト結果
max_stress_loss = max(abs(loss) for loss in
risk_metrics['stress_test_results'].values())
score += weights['stress'] * min(max_stress_loss * 5, 1)
# 相関リスク
score += weights['correlation'] * risk_metrics['correlation_risk']
return score
def _classify_risk_level(self, risk_score: float) -> str:
"""リスクスコアからレベルを分類"""
if risk_score < 0.25:
return 'LOW'
elif risk_score < 0.50:
return 'MEDIUM'
elif risk_score < 0.75:
return 'HIGH'
else:
return 'EXTREME'
# Usage example
def main():
    # Configuration
    config = {
        'assets': ['BTC', 'ETH', 'BNB', 'SOL', 'ADA'],
        'capital': 100000,
        'risk_target': 0.15,
        'max_drawdown': 0.20
    }
    # Initialize the system
    risk_system = IntegratedRiskManagementSystem(config)
    # Load market data (example)
    market_data = pd.read_csv('crypto_market_data.csv', parse_dates=['timestamp'])
    market_data.set_index('timestamp', inplace=True)
    # Example signals
    signals = {
        'BTC': {'strength': 0.7, 'volatility': 0.02, 'confidence': 0.8},
        'ETH': {'strength': 0.5, 'volatility': 0.03, 'confidence': 0.6}
    }
    # Run the risk management cycle
    results = risk_system.execute_risk_management_cycle(market_data, signals)
    # Display the results
print("Portfolio Optimization:")
print(f"Target weights: {results['optimization']['target_weights']}")
print(f"Expected return: {results['optimization']['expected_return']:.2%}")
print(f"Expected risk: {results['optimization']['expected_risk']:.2%}")
print("\nRisk Assessment:")
print(f"Risk level: {results['risk_assessment']['risk_level']}")
print(f"VaR (95%): {results['risk_assessment']['metrics']['var_95']:.2%}")
print("\nDrawdown Management:")
print(f"Current drawdown: {results['drawdown_management']['current_drawdown']:.2%}")
print(f"Protection level: {results['drawdown_management']['protection_level']}")
print("\nExecutable Trades:")
for trade in results['trades']:
print(f"{trade['action']} {trade['size']:.2f} units of {trade['asset']}")
if __name__ == "__main__":
main()
Summary
This document has given a comprehensive overview of machine-learning-driven risk management and money management for cryptocurrency trading.
Key Points
1. Portfolio optimization
   - Expected-return prediction with machine learning
   - Dynamic risk matrix estimation
   - Integration of multiple optimization methods
2. Dynamic position sizing
   - Sizing that adapts to market conditions
   - A machine learning extension of the Kelly criterion
   - Risk adjustment mechanisms
3. Drawdown management
   - Predictive drawdown detection
   - A multi-level protection system
   - Automatic risk reduction mechanisms
4. Correlation analysis and diversification
   - Dynamic correlation modeling
   - Optimal diversification strategies
   - Regime-adaptive portfolios
5. An integrated system
   - Comprehensive risk assessment
   - Real-time adjustment
   - Performance tracking
By implementing and operating these techniques properly, it becomes possible to keep losses in cryptocurrency trading to a minimum while achieving stable returns over the long run.