目次
マルチモーダル学習とオンチェーンデータ統合による暗号資産分析
1. はじめに
1.1 マルチモーダル学習の必要性
暗号資産市場は複数の異なるデータソースから影響を受けます:
- 価格データ: OHLCV、ティックデータ
- オンチェーンデータ: トランザクション、ウォレット活動、スマートコントラクト
- ソーシャルデータ: Twitter、Reddit、Discord
- ニュースデータ: 記事、規制発表
- 画像データ: チャート、ミーム
これらの異種データを統合的に分析することで、より包括的な市場理解が可能になります。
1.2 オンチェーンデータの価値
オンチェーンデータは改ざん不可能で透明性が高く、以下の洞察を提供します:
- 実際の利用状況: アクティブアドレス数、トランザクション量
- 保有分布: 大口保有者の動向、取引所への資金流入出
- ネットワーク健全性: ハッシュレート、ステーキング率
- DeFi活動: TVL、流動性、清算
2. オンチェーンデータの収集と処理
2.1 データ収集システム
import asyncio
import aiohttp
from web3 import Web3
import pandas as pd
import numpy as np
from typing import Dict, List, Any
from dataclasses import dataclass
from datetime import datetime, timedelta
@dataclass
class OnChainMetrics:
"""オンチェーンメトリクスのデータクラス"""
timestamp: datetime
block_number: int
active_addresses: int
transaction_count: int
transaction_volume: float
average_gas_price: float
hash_rate: float
difficulty: float
total_supply: float
circulating_supply: float
class OnChainDataCollector:
"""オンチェーンデータ収集クラス"""
def __init__(self, rpc_endpoints: Dict[str, str], api_keys: Dict[str, str]):
self.rpc_endpoints = rpc_endpoints
self.api_keys = api_keys
self.web3_instances = {}
# Web3インスタンスの初期化
for chain, endpoint in rpc_endpoints.items():
self.web3_instances[chain] = Web3(Web3.HTTPProvider(endpoint))
async def collect_block_data(self, chain: str, block_number: int = None):
"""ブロックデータの収集"""
w3 = self.web3_instances[chain]
if block_number is None:
block_number = w3.eth.block_number
block = w3.eth.get_block(block_number, full_transactions=True)
# ブロック統計の計算
tx_count = len(block.transactions)
total_gas_used = block.gasUsed
avg_gas_price = np.mean([tx.gasPrice for tx in block.transactions]) if tx_count > 0 else 0
# トランザクション分析
tx_volume = 0
unique_addresses = set()
for tx in block.transactions:
tx_volume += tx.value / 1e18 # Wei to ETH
unique_addresses.add(tx['from'])
unique_addresses.add(tx['to'])
return {
'block_number': block_number,
'timestamp': datetime.fromtimestamp(block.timestamp),
'transaction_count': tx_count,
'transaction_volume': tx_volume,
'average_gas_price': avg_gas_price / 1e9, # Wei to Gwei
'active_addresses': len(unique_addresses),
'gas_used': total_gas_used,
'difficulty': block.difficulty,
'size': block.size
}
async def collect_defi_metrics(self, protocol: str, chain: str):
"""DeFiプロトコルのメトリクス収集"""
metrics = {}
if protocol == 'uniswap':
metrics = await self._collect_uniswap_metrics(chain)
elif protocol == 'aave':
metrics = await self._collect_aave_metrics(chain)
elif protocol == 'compound':
metrics = await self._collect_compound_metrics(chain)
return metrics
async def _collect_uniswap_metrics(self, chain: str):
"""Uniswapメトリクスの収集"""
# GraphQLクエリ
query = """
{
uniswapDayDatas(first: 1, orderBy: date, orderDirection: desc) {
date
volumeUSD
txCount
totalLiquidityUSD
}
pools(first: 10, orderBy: totalValueLockedUSD, orderDirection: desc) {
id
token0 { symbol }
token1 { symbol }
totalValueLockedUSD
volumeUSD
}
}
"""
# The Graphへのクエリ
async with aiohttp.ClientSession() as session:
async with session.post(
f"https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3",
json={'query': query}
) as response:
data = await response.json()
return self._parse_uniswap_data(data)
async def collect_wallet_metrics(self, addresses: List[str], chain: str):
"""ウォレットメトリクスの収集"""
w3 = self.web3_instances[chain]
wallet_data = []
for address in addresses:
# 残高取得
balance = w3.eth.get_balance(address) / 1e18
# トランザクション履歴(簡略化)
tx_count = w3.eth.get_transaction_count(address)
# トークン保有(ERC20)
token_balances = await self._get_token_balances(address, chain)
wallet_data.append({
'address': address,
'balance': balance,
'transaction_count': tx_count,
'token_holdings': token_balances,
'last_active': await self._get_last_activity(address, chain)
})
return wallet_data
async def collect_smart_contract_metrics(self, contract_address: str, chain: str):
"""スマートコントラクトメトリクスの収集"""
w3 = self.web3_instances[chain]
# コントラクトコードの取得
code = w3.eth.get_code(contract_address)
# イベントログの取得
events = await self._get_contract_events(contract_address, chain)
# ガス使用量の統計
gas_stats = await self._analyze_gas_usage(contract_address, chain)
return {
'address': contract_address,
'code_size': len(code),
'event_count': len(events),
'unique_callers': len(set(e['from'] for e in events)),
'gas_stats': gas_stats,
'is_verified': await self._check_contract_verification(contract_address, chain)
}
2.2 オンチェーンデータの特徴量エンジニアリング
class OnChainFeatureEngineering:
"""オンチェーンデータの特徴量エンジニアリング"""
def __init__(self):
self.feature_cache = {}
def extract_network_features(self, block_data: List[Dict]) -> pd.DataFrame:
"""ネットワークレベルの特徴量抽出"""
df = pd.DataFrame(block_data)
# 基本統計量
df['tx_count_ma7'] = df['transaction_count'].rolling(7).mean()
df['tx_volume_ma7'] = df['transaction_volume'].rolling(7).mean()
df['active_addresses_ma7'] = df['active_addresses'].rolling(7).mean()
# 成長率
df['tx_count_growth'] = df['transaction_count'].pct_change(7)
df['volume_growth'] = df['transaction_volume'].pct_change(7)
# ネットワーク使用率
df['network_utilization'] = df['gas_used'] / df['gas_limit']
# ボラティリティ指標
df['gas_price_volatility'] = df['average_gas_price'].rolling(24).std()
# NVT比率(Network Value to Transactions)
df['nvt_ratio'] = df['market_cap'] / df['transaction_volume'].rolling(30).mean()
# MVRV比率(Market Value to Realized Value)
df['mvrv_ratio'] = self._calculate_mvrv(df)
return df
def extract_wallet_features(self, wallet_data: List[Dict]) -> pd.DataFrame:
"""ウォレットレベルの特徴量抽出"""
features = []
for wallet in wallet_data:
wallet_features = {
'address': wallet['address'],
'balance_log': np.log1p(wallet['balance']),
'tx_frequency': wallet['transaction_count'] / wallet['age_days'],
'is_whale': wallet['balance'] > 1000, # 例:1000 ETH以上
'token_diversity': len(wallet['token_holdings']),
'dormancy': (datetime.now() - wallet['last_active']).days,
'balance_percentile': self._calculate_balance_percentile(wallet['balance'])
}
# 行動パターン分類
wallet_features['wallet_type'] = self._classify_wallet_behavior(wallet)
features.append(wallet_features)
return pd.DataFrame(features)
def extract_defi_features(self, defi_data: Dict) -> Dict:
"""DeFi関連の特徴量抽出"""
features = {}
# TVL(Total Value Locked)関連
features['tvl_growth_24h'] = (defi_data['tvl'] - defi_data['tvl_24h_ago']) / defi_data['tvl_24h_ago']
features['tvl_concentration'] = self._calculate_tvl_concentration(defi_data['pools'])
# 流動性指標
features['liquidity_depth'] = defi_data['total_liquidity_usd']
features['liquidity_utilization'] = defi_data['volume_24h'] / defi_data['total_liquidity_usd']
# 利回り指標
features['avg_apy'] = np.mean([pool['apy'] for pool in defi_data['pools']])
features['apy_volatility'] = np.std([pool['apy'] for pool in defi_data['pools']])
# リスク指標
features['impermanent_loss_risk'] = self._estimate_impermanent_loss(defi_data['pools'])
features['liquidation_risk'] = defi_data.get('liquidations_24h', 0) / defi_data['tvl']
return features
def extract_cross_chain_features(self, chain_data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
"""クロスチェーン特徴量の抽出"""
features = pd.DataFrame()
# チェーン間の相関
for chain1 in chain_data:
for chain2 in chain_data:
if chain1 < chain2:
corr = chain_data[chain1]['transaction_count'].corr(
chain_data[chain2]['transaction_count']
)
features[f'corr_{chain1}_{chain2}'] = [corr]
# ブリッジ活動
features['bridge_volume'] = self._get_bridge_volume(chain_data)
features['cross_chain_ratio'] = features['bridge_volume'] / sum(
df['transaction_volume'].sum() for df in chain_data.values()
)
return features
def _calculate_mvrv(self, df: pd.DataFrame) -> pd.Series:
"""MVRV比率の計算"""
# 実現価値の計算(簡略化)
realized_value = df['transaction_volume'].rolling(365).mean() * df['price']
market_value = df['circulating_supply'] * df['price']
return market_value / realized_value
def _classify_wallet_behavior(self, wallet: Dict) -> str:
"""ウォレットの行動パターン分類"""
if wallet['dormancy'] > 365:
return 'hodler'
elif wallet['tx_frequency'] > 10:
return 'active_trader'
elif wallet['balance'] > 1000:
return 'whale'
elif len(wallet['defi_interactions']) > 5:
return 'defi_user'
else:
return 'regular'
3. マルチモーダルモデルアーキテクチャ
3.1 基本的なマルチモーダル融合
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModel, AutoTokenizer
class MultiModalCryptoModel(nn.Module):
"""マルチモーダル暗号資産予測モデル"""
def __init__(self, config):
super(MultiModalCryptoModel, self).__init__()
# 価格データエンコーダー(LSTM)
self.price_encoder = nn.LSTM(
input_size=config['price_features'],
hidden_size=config['lstm_hidden'],
num_layers=2,
batch_first=True,
dropout=0.2
)
# オンチェーンデータエンコーダー(Transformer)
self.onchain_encoder = nn.TransformerEncoder(
nn.TransformerEncoderLayer(
d_model=config['onchain_features'],
nhead=8,
dim_feedforward=256,
dropout=0.1
),
num_layers=3
)
# テキストエンコーダー(事前学習済みBERT)
self.text_encoder = AutoModel.from_pretrained('bert-base-uncased')
self.text_projection = nn.Linear(768, config['text_hidden'])
# 画像エンコーダー(CNN)
self.image_encoder = nn.Sequential(
nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
nn.MaxPool2d(2),
nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
nn.MaxPool2d(2),
nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
nn.AdaptiveAvgPool2d((1, 1)),
nn.Flatten(),
nn.Linear(256, config['image_hidden'])
)
# クロスモーダルアテンション
self.cross_attention = CrossModalAttention(
price_dim=config['lstm_hidden'] * 2,
onchain_dim=config['onchain_features'],
text_dim=config['text_hidden'],
image_dim=config['image_hidden']
)
# 融合層
total_dim = (config['lstm_hidden'] * 2 + config['onchain_features'] +
config['text_hidden'] + config['image_hidden'])
self.fusion_network = nn.Sequential(
nn.Linear(total_dim, 512),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(512, 256),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(256, config['output_size'])
)
def forward(self, price_data, onchain_data, text_data, image_data):
# 価格データのエンコード
price_out, (h_n, _) = self.price_encoder(price_data)
price_features = torch.cat([h_n[-2], h_n[-1]], dim=1)
# オンチェーンデータのエンコード
onchain_features = self.onchain_encoder(onchain_data)
onchain_features = onchain_features.mean(dim=1) # 時系列の平均
# テキストデータのエンコード
text_outputs = self.text_encoder(**text_data)
text_features = self.text_projection(text_outputs.pooler_output)
# 画像データのエンコード
image_features = self.image_encoder(image_data)
# クロスモーダルアテンション
attended_features = self.cross_attention(
price_features, onchain_features, text_features, image_features
)
# 特徴量の結合
combined_features = torch.cat(attended_features, dim=1)
# 最終予測
output = self.fusion_network(combined_features)
return output
class CrossModalAttention(nn.Module):
"""クロスモーダルアテンション機構"""
def __init__(self, price_dim, onchain_dim, text_dim, image_dim):
super(CrossModalAttention, self).__init__()
# 各モダリティの射影
self.price_proj = nn.Linear(price_dim, 256)
self.onchain_proj = nn.Linear(onchain_dim, 256)
self.text_proj = nn.Linear(text_dim, 256)
self.image_proj = nn.Linear(image_dim, 256)
# マルチヘッドアテンション
self.multihead_attn = nn.MultiheadAttention(256, num_heads=8)
# ゲーティング機構
self.gates = nn.ModuleDict({
'price': nn.Sequential(nn.Linear(256, 1), nn.Sigmoid()),
'onchain': nn.Sequential(nn.Linear(256, 1), nn.Sigmoid()),
'text': nn.Sequential(nn.Linear(256, 1), nn.Sigmoid()),
'image': nn.Sequential(nn.Linear(256, 1), nn.Sigmoid())
})
def forward(self, price_feat, onchain_feat, text_feat, image_feat):
# 射影
price_proj = self.price_proj(price_feat).unsqueeze(0)
onchain_proj = self.onchain_proj(onchain_feat).unsqueeze(0)
text_proj = self.text_proj(text_feat).unsqueeze(0)
image_proj = self.image_proj(image_feat).unsqueeze(0)
# スタック
all_features = torch.cat([price_proj, onchain_proj, text_proj, image_proj], dim=0)
# セルフアテンション
attended, _ = self.multihead_attn(all_features, all_features, all_features)
# ゲーティング
price_gate = self.gates['price'](attended[0])
onchain_gate = self.gates['onchain'](attended[1])
text_gate = self.gates['text'](attended[2])
image_gate = self.gates['image'](attended[3])
# ゲート適用
gated_features = [
price_feat * price_gate,
onchain_feat * onchain_gate,
text_feat * text_gate,
image_feat * image_gate
]
return gated_features
3.2 階層的マルチモーダル融合
class HierarchicalMultiModalFusion(nn.Module):
"""階層的マルチモーダル融合モデル"""
def __init__(self, config):
super(HierarchicalMultiModalFusion, self).__init__()
# レベル1: 同種データの融合
self.numeric_fusion = NumericDataFusion(
price_dim=config['price_dim'],
onchain_dim=config['onchain_dim']
)
self.text_fusion = TextDataFusion(
news_dim=config['news_dim'],
social_dim=config['social_dim']
)
# レベル2: 異種データの融合
self.cross_modal_fusion = CrossModalFusion(
numeric_dim=config['fused_numeric_dim'],
text_dim=config['fused_text_dim'],
image_dim=config['image_dim']
)
# レベル3: 時間的統合
self.temporal_fusion = TemporalFusion(
input_dim=config['cross_modal_dim'],
hidden_dim=256
)
# 予測ヘッド
self.prediction_head = nn.Sequential(
nn.Linear(256, 128),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(128, config['num_classes'])
)
def forward(self, price_data, onchain_data, news_data, social_data, image_data):
# レベル1: 同種データ融合
numeric_features = self.numeric_fusion(price_data, onchain_data)
text_features = self.text_fusion(news_data, social_data)
# レベル2: 異種データ融合
cross_modal_features = self.cross_modal_fusion(
numeric_features, text_features, image_data
)
# レベル3: 時間的統合
temporal_features = self.temporal_fusion(cross_modal_features)
# 予測
output = self.prediction_head(temporal_features)
return output
class NumericDataFusion(nn.Module):
"""数値データの融合"""
def __init__(self, price_dim, onchain_dim):
super(NumericDataFusion, self).__init__()
# 特徴抽出
self.price_extractor = nn.Sequential(
nn.Linear(price_dim, 128),
nn.ReLU(),
nn.BatchNorm1d(128),
nn.Linear(128, 64)
)
self.onchain_extractor = nn.Sequential(
nn.Linear(onchain_dim, 128),
nn.ReLU(),
nn.BatchNorm1d(128),
nn.Linear(128, 64)
)
# 相互作用モデリング
self.interaction = nn.Sequential(
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, 32)
)
# 出力
self.output = nn.Linear(160, 128) # 64 + 64 + 32
def forward(self, price_data, onchain_data):
price_feat = self.price_extractor(price_data)
onchain_feat = self.onchain_extractor(onchain_data)
# 相互作用
interaction_feat = self.interaction(
torch.cat([price_feat, onchain_feat], dim=1)
)
# 結合
combined = torch.cat([price_feat, onchain_feat, interaction_feat], dim=1)
return self.output(combined)
3.3 グラフベースマルチモーダル統合
from torch_geometric.nn import MessagePassing, global_mean_pool
class GraphMultiModalNetwork(nn.Module):
"""グラフ構造を利用したマルチモーダルネットワーク"""
def __init__(self, config):
super(GraphMultiModalNetwork, self).__init__()
# モダリティごとのノード作成
self.modality_embeddings = nn.ModuleDict({
'price': nn.Linear(config['price_dim'], config['node_dim']),
'onchain': nn.Linear(config['onchain_dim'], config['node_dim']),
'text': nn.Linear(config['text_dim'], config['node_dim']),
'social': nn.Linear(config['social_dim'], config['node_dim'])
})
# グラフニューラルネットワーク層
self.gnn_layers = nn.ModuleList([
MultiModalGraphLayer(config['node_dim'], config['node_dim'])
for _ in range(config['num_gnn_layers'])
])
# 読み出し層
self.readout = GlobalAttentionPooling(config['node_dim'])
# 予測層
self.predictor = nn.Sequential(
nn.Linear(config['node_dim'], 256),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(256, config['output_dim'])
)
def forward(self, multimodal_data, edge_index):
# 各モダリティをノードとして埋め込み
node_features = []
for modality, data in multimodal_data.items():
if modality in self.modality_embeddings:
node_feat = self.modality_embeddings[modality](data)
node_features.append(node_feat)
# ノード特徴量の結合
x = torch.stack(node_features, dim=0)
# GNN層の適用
for gnn_layer in self.gnn_layers:
x = gnn_layer(x, edge_index)
x = F.relu(x)
x = F.dropout(x, p=0.2, training=self.training)
# グラフレベルの表現を取得
graph_representation = self.readout(x)
# 予測
output = self.predictor(graph_representation)
return output
class MultiModalGraphLayer(MessagePassing):
"""マルチモーダルグラフ層"""
def __init__(self, in_channels, out_channels):
super(MultiModalGraphLayer, self).__init__(aggr='mean')
self.lin = nn.Linear(in_channels, out_channels)
self.att = nn.Sequential(
nn.Linear(2 * in_channels, 1),
nn.LeakyReLU(0.2)
)
def forward(self, x, edge_index):
return self.propagate(edge_index, x=x)
def message(self, x_i, x_j):
# アテンション重みの計算
alpha = self.att(torch.cat([x_i, x_j], dim=-1))
alpha = F.softmax(alpha, dim=0)
return alpha * self.lin(x_j)
4. ソーシャルデータとセンチメント分析
4.1 ソーシャルデータ収集
import tweepy
import praw
from textblob import TextBlob
import re
from collections import defaultdict
class SocialDataCollector:
"""ソーシャルメディアデータ収集"""
def __init__(self, api_keys):
# Twitter API
self.twitter_api = tweepy.Client(
bearer_token=api_keys['twitter_bearer_token']
)
# Reddit API
self.reddit = praw.Reddit(
client_id=api_keys['reddit_client_id'],
client_secret=api_keys['reddit_client_secret'],
user_agent='crypto_analyzer'
)
self.sentiment_analyzer = CryptoSentimentAnalyzer()
async def collect_twitter_data(self, query: str, max_results: int = 100):
"""Twitter データ収集"""
tweets = self.twitter_api.search_recent_tweets(
query=query,
max_results=max_results,
tweet_fields=['created_at', 'author_id', 'public_metrics']
)
processed_tweets = []
for tweet in tweets.data:
processed = {
'id': tweet.id,
'text': tweet.text,
'created_at': tweet.created_at,
'metrics': tweet.public_metrics,
'sentiment': self.sentiment_analyzer.analyze(tweet.text),
'entities': self._extract_entities(tweet.text)
}
processed_tweets.append(processed)
return processed_tweets
async def collect_reddit_data(self, subreddit_name: str, limit: int = 100):
"""Reddit データ収集"""
subreddit = self.reddit.subreddit(subreddit_name)
posts = []
for submission in subreddit.hot(limit=limit):
post_data = {
'id': submission.id,
'title': submission.title,
'text': submission.selftext,
'score': submission.score,
'num_comments': submission.num_comments,
'created_utc': submission.created_utc,
'sentiment': self.sentiment_analyzer.analyze(
submission.title + ' ' + submission.selftext
),
'comments': self._get_top_comments(submission)
}
posts.append(post_data)
return posts
def _extract_entities(self, text: str):
"""エンティティ抽出"""
entities = {
'tickers': re.findall(r'\$[A-Z]{2,5}', text),
'urls': re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+])+', text),
'mentions': re.findall(r'@\w+', text),
'hashtags': re.findall(r'#\w+', text)
}
return entities
def _get_top_comments(self, submission, limit=10):
"""トップコメント取得"""
submission.comments.replace_more(limit=0)
comments = []
for comment in submission.comments[:limit]:
comments.append({
'text': comment.body,
'score': comment.score,
'sentiment': self.sentiment_analyzer.analyze(comment.body)
})
return comments
class CryptoSentimentAnalyzer:
"""暗号資産特化のセンチメント分析"""
def __init__(self):
# カスタム辞書
self.crypto_lexicon = {
'moon': 3, 'lambo': 3, 'hodl': 2, 'bullish': 2,
'dump': -3, 'scam': -3, 'rug': -3, 'bearish': -2,
'fud': -2, 'rekt': -3, 'pump': 1, 'dyor': 0
}
# 絵文字スコア
self.emoji_scores = {
'🚀': 3, '📈': 2, '💎': 2, '🙌': 1,
'📉': -2, '🔻': -2, '😱': -1, '💩': -3
}
def analyze(self, text: str):
"""センチメント分析"""
# 基本的なセンチメント
blob = TextBlob(text.lower())
base_sentiment = blob.sentiment.polarity
# カスタム辞書によるスコア
custom_score = 0
word_count = 0
for word, score in self.crypto_lexicon.items():
if word in text.lower():
custom_score += score
word_count += 1
# 絵文字スコア
emoji_score = 0
emoji_count = 0
for emoji, score in self.emoji_scores.items():
if emoji in text:
emoji_score += score * text.count(emoji)
emoji_count += text.count(emoji)
# 統合スコア
if word_count > 0:
custom_score /= word_count
if emoji_count > 0:
emoji_score /= emoji_count
final_score = 0.5 * base_sentiment + 0.3 * custom_score + 0.2 * emoji_score
return {
'score': final_score,
'base_sentiment': base_sentiment,
'custom_score': custom_score,
'emoji_score': emoji_score,
'label': self._score_to_label(final_score)
}
def _score_to_label(self, score):
"""スコアをラベルに変換"""
if score > 0.5:
return 'very_bullish'
elif score > 0.1:
return 'bullish'
elif score > -0.1:
return 'neutral'
elif score > -0.5:
return 'bearish'
else:
return 'very_bearish'
4.2 ソーシャルシグナルの特徴量化
class SocialFeatureExtractor:
"""ソーシャルデータの特徴量抽出"""
def __init__(self):
self.feature_cache = {}
def extract_aggregate_features(self, social_data: List[Dict],
time_window: int = 3600) -> Dict:
"""集約特徴量の抽出"""
features = {}
# センチメントの統計
sentiments = [item['sentiment']['score'] for item in social_data]
features['avg_sentiment'] = np.mean(sentiments)
features['sentiment_std'] = np.std(sentiments)
features['sentiment_skew'] = self._calculate_skew(sentiments)
# ボリューム指標
features['post_count'] = len(social_data)
features['unique_authors'] = len(set(item.get('author_id') for item in social_data))
# エンゲージメント指標
if 'metrics' in social_data[0]:
total_engagement = sum(
item['metrics'].get('like_count', 0) +
item['metrics'].get('retweet_count', 0) +
item['metrics'].get('reply_count', 0)
for item in social_data
)
features['total_engagement'] = total_engagement
features['avg_engagement'] = total_engagement / len(social_data)
# 時系列特徴
features['sentiment_momentum'] = self._calculate_sentiment_momentum(social_data)
features['volume_acceleration'] = self._calculate_volume_acceleration(social_data)
# トピック分布
features['topic_distribution'] = self._extract_topic_distribution(social_data)
return features
def extract_influence_features(self, social_data: List[Dict]) -> Dict:
"""影響力関連の特徴量"""
influence_scores = []
for item in social_data:
# フォロワー数や評価に基づく影響力スコア
if 'author_followers' in item:
influence = np.log1p(item['author_followers'])
elif 'score' in item: # Reddit
influence = np.log1p(item['score'])
else:
influence = 1.0
influence_scores.append(influence)
return {
'avg_influence': np.mean(influence_scores),
'max_influence': np.max(influence_scores),
'influence_concentration': np.std(influence_scores) / np.mean(influence_scores)
}
def extract_network_features(self, social_data: List[Dict]) -> Dict:
"""ネットワーク分析特徴量"""
# メンショングラフの構築
mention_graph = defaultdict(list)
for item in social_data:
author = item.get('author_id', 'unknown')
mentions = item.get('entities', {}).get('mentions', [])
for mention in mentions:
mention_graph[author].append(mention)
# ネットワーク指標
features = {
'network_density': len(mention_graph) / len(social_data),
'avg_mentions': np.mean([len(v) for v in mention_graph.values()]),
'max_degree': max(len(v) for v in mention_graph.values()) if mention_graph else 0
}
return features
def _calculate_sentiment_momentum(self, social_data: List[Dict]) -> float:
"""センチメントモメンタムの計算"""
# 時系列でソート
sorted_data = sorted(social_data, key=lambda x: x.get('created_at', 0))
if len(sorted_data) < 2:
return 0
# 前半と後半のセンチメント比較
mid_point = len(sorted_data) // 2
first_half_sentiment = np.mean([
item['sentiment']['score'] for item in sorted_data[:mid_point]
])
second_half_sentiment = np.mean([
item['sentiment']['score'] for item in sorted_data[mid_point:]
])
return second_half_sentiment - first_half_sentiment
def _extract_topic_distribution(self, social_data: List[Dict]) -> Dict:
"""トピック分布の抽出"""
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation
# テキストの収集
texts = [item.get('text', '') for item in social_data]
if not texts:
return {}
# TF-IDF変換
vectorizer = TfidfVectorizer(max_features=50, stop_words='english')
tfidf_matrix = vectorizer.fit_transform(texts)
# LDAによるトピック抽出
lda = LatentDirichletAllocation(n_components=5, random_state=42)
topic_distribution = lda.fit_transform(tfidf_matrix)
# 平均トピック分布
avg_distribution = topic_distribution.mean(axis=0)
return {
f'topic_{i}': prob for i, prob in enumerate(avg_distribution)
}
5. 統合システムの実装
5.1 データパイプライン
class MultiModalDataPipeline:
"""マルチモーダルデータパイプライン"""
def __init__(self, config):
self.config = config
# データコレクター
self.onchain_collector = OnChainDataCollector(
config['rpc_endpoints'],
config['api_keys']
)
self.social_collector = SocialDataCollector(config['api_keys'])
# 特徴抽出器
self.onchain_features = OnChainFeatureEngineering()
self.social_features = SocialFeatureExtractor()
# データストア
self.data_store = MultiModalDataStore()
async def collect_realtime_data(self, symbols: List[str]):
"""リアルタイムデータ収集"""
tasks = []
# 並行データ収集
for symbol in symbols:
tasks.extend([
self._collect_onchain_data(symbol),
self._collect_social_data(symbol),
self._collect_price_data(symbol),
self._collect_news_data(symbol)
])
results = await asyncio.gather(*tasks)
# データの統合と保存
integrated_data = self._integrate_data(results, symbols)
self.data_store.save(integrated_data)
return integrated_data
async def _collect_onchain_data(self, symbol: str):
"""オンチェーンデータ収集"""
chain = self._symbol_to_chain(symbol)
# ブロックデータ
block_data = await self.onchain_collector.collect_block_data(chain)
# DeFiメトリクス
defi_data = await self.onchain_collector.collect_defi_metrics(
self._get_defi_protocol(symbol), chain
)
# 特徴量抽出
features = self.onchain_features.extract_network_features([block_data])
features.update(self.onchain_features.extract_defi_features(defi_data))
return {
'type': 'onchain',
'symbol': symbol,
'timestamp': datetime.now(),
'raw_data': {'block': block_data, 'defi': defi_data},
'features': features
}
def prepare_training_data(self, start_date: datetime, end_date: datetime):
"""訓練データの準備"""
# データ読み込み
raw_data = self.data_store.load_range(start_date, end_date)
# モダリティごとに整理
modality_data = self._organize_by_modality(raw_data)
# 時系列アライメント
aligned_data = self._align_temporal_data(modality_data)
# 特徴量の正規化
normalized_data = self._normalize_features(aligned_data)
# ラベルの生成
labels = self._generate_labels(aligned_data)
return normalized_data, labels
def _align_temporal_data(self, modality_data: Dict) -> Dict:
"""時系列データのアライメント"""
# 共通のタイムスタンプグリッドを作成
all_timestamps = set()
for modality in modality_data.values():
all_timestamps.update(modality['timestamps'])
common_timestamps = sorted(all_timestamps)
# 各モダリティを共通グリッドに補間
aligned = {}
for modality_name, data in modality_data.items():
aligned[modality_name] = self._interpolate_to_grid(
data, common_timestamps
)
return aligned
5.2 訓練と評価
class MultiModalTrainer:
"""マルチモーダルモデルの訓練"""
def __init__(self, model, config):
self.model = model
self.config = config
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 最適化
self.optimizer = torch.optim.AdamW(
model.parameters(),
lr=config['learning_rate'],
weight_decay=config['weight_decay']
)
# スケジューラー
self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
self.optimizer,
T_max=config['epochs']
)
# 損失関数
self.criterion = self._build_loss_function()
def train(self, train_loader, val_loader, epochs):
"""モデルの訓練"""
best_val_loss = float('inf')
for epoch in range(epochs):
# 訓練
train_loss = self._train_epoch(train_loader)
# 検証
val_loss, val_metrics = self._validate(val_loader)
# スケジューラー更新
self.scheduler.step()
# ログ
print(f"Epoch {epoch}: Train Loss = {train_loss:.4f}, "
f"Val Loss = {val_loss:.4f}")
# ベストモデル保存
if val_loss < best_val_loss:
best_val_loss = val_loss
self._save_checkpoint(epoch, val_loss, val_metrics)
# Early stopping
if self._should_stop_early(val_loss):
print("Early stopping triggered")
break
def _train_epoch(self, train_loader):
"""1エポックの訓練"""
self.model.train()
total_loss = 0
for batch in train_loader:
# データの準備
price_data = batch['price'].to(self.device)
onchain_data = batch['onchain'].to(self.device)
text_data = batch['text']
image_data = batch['image'].to(self.device)
labels = batch['label'].to(self.device)
# 勾配リセット
self.optimizer.zero_grad()
# 予測
outputs = self.model(price_data, onchain_data, text_data, image_data)
# 損失計算
loss = self.criterion(outputs, labels)
# バックプロパゲーション
loss.backward()
# 勾配クリッピング
torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
# パラメータ更新
self.optimizer.step()
total_loss += loss.item()
return total_loss / len(train_loader)
def _build_loss_function(self):
"""カスタム損失関数の構築"""
if self.config['task'] == 'regression':
return nn.MSELoss()
elif self.config['task'] == 'classification':
return nn.CrossEntropyLoss()
elif self.config['task'] == 'multi_task':
return MultiTaskLoss(
regression_weight=self.config['regression_weight'],
classification_weight=self.config['classification_weight']
)
class MultiTaskLoss(nn.Module):
"""マルチタスク学習用の損失関数"""
def __init__(self, regression_weight=0.5, classification_weight=0.5):
super(MultiTaskLoss, self).__init__()
self.regression_weight = regression_weight
self.classification_weight = classification_weight
self.mse = nn.MSELoss()
self.ce = nn.CrossEntropyLoss()
def forward(self, outputs, targets):
# outputs: {'regression': tensor, 'classification': tensor}
# targets: {'regression': tensor, 'classification': tensor}
reg_loss = self.mse(outputs['regression'], targets['regression'])
cls_loss = self.ce(outputs['classification'], targets['classification'])
total_loss = (self.regression_weight * reg_loss +
self.classification_weight * cls_loss)
return total_loss
5.3 推論とリアルタイム予測
class MultiModalPredictor:
"""マルチモーダルリアルタイム予測"""
def __init__(self, model_path: str, config: dict):
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# モデルのロード
self.model = self._load_model(model_path, config)
self.model.eval()
# データパイプライン
self.pipeline = MultiModalDataPipeline(config)
# バッファ
self.data_buffer = deque(maxlen=config['buffer_size'])
async def predict_realtime(self, symbol: str):
"""リアルタイム予測"""
# 最新データの収集
latest_data = await self.pipeline.collect_realtime_data([symbol])
# バッファに追加
self.data_buffer.append(latest_data)
# 予測用データの準備
input_data = self._prepare_prediction_input()
# 予測実行
with torch.no_grad():
prediction = self.model(
input_data['price'],
input_data['onchain'],
input_data['text'],
input_data['image']
)
# 後処理
result = self._postprocess_prediction(prediction, symbol)
# 信頼度の計算
confidence = self._calculate_confidence(prediction, input_data)
result['confidence'] = confidence
return result
def _calculate_confidence(self, prediction, input_data):
"""予測の信頼度計算"""
# アンサンブル不確実性
if hasattr(self.model, 'dropout'):
# MCドロップアウト
predictions = []
self.model.train() # ドロップアウトを有効化
for _ in range(10):
with torch.no_grad():
pred = self.model(
input_data['price'],
input_data['onchain'],
input_data['text'],
input_data['image']
)
predictions.append(pred)
self.model.eval()
# 予測の分散から不確実性を計算
predictions = torch.stack(predictions)
uncertainty = torch.var(predictions, dim=0)
confidence = 1 / (1 + uncertainty.mean().item())
else:
# デフォルトの信頼度
confidence = 0.8
return confidence
async def batch_predict(self, symbols: List[str]):
"""バッチ予測"""
# 並行データ収集
all_data = await self.pipeline.collect_realtime_data(symbols)
# バッチ処理用にデータを整理
batch_data = self._organize_batch_data(all_data)
# バッチ予測
with torch.no_grad():
predictions = self.model(**batch_data)
# 結果の整理
results = {}
for i, symbol in enumerate(symbols):
results[symbol] = {
'prediction': predictions[i],
'confidence': self._calculate_confidence(
predictions[i],
{k: v[i] for k, v in batch_data.items()}
)
}
return results
6. 実装例とベストプラクティス
6.1 完全な実装例
class CryptoMultiModalSystem:
"""暗号資産マルチモーダル分析システム"""
def __init__(self, config_path: str):
self.config = self._load_config(config_path)
# コンポーネントの初期化
self.data_pipeline = MultiModalDataPipeline(self.config)
self.model = self._build_model()
self.trainer = MultiModalTrainer(self.model, self.config)
self.predictor = None
# モニタリング
self.monitor = SystemMonitor()
def train_model(self, start_date: str, end_date: str):
"""モデルの訓練"""
# データ準備
print("Preparing training data...")
train_data, val_data = self.data_pipeline.prepare_training_data(
datetime.strptime(start_date, '%Y-%m-%d'),
datetime.strptime(end_date, '%Y-%m-%d')
)
# データローダー作成
train_loader = self._create_dataloader(train_data, shuffle=True)
val_loader = self._create_dataloader(val_data, shuffle=False)
# 訓練
print("Starting training...")
self.trainer.train(
train_loader,
val_loader,
epochs=self.config['epochs']
)
print("Training completed!")
async def run_live_prediction(self, symbols: List[str]):
"""ライブ予測の実行"""
if self.predictor is None:
self.predictor = MultiModalPredictor(
self.config['model_path'],
self.config
)
print(f"Starting live prediction for {symbols}")
while True:
try:
# 予測実行
predictions = await self.predictor.batch_predict(symbols)
# 結果の処理
self._process_predictions(predictions)
# モニタリング
self.monitor.log_prediction(predictions)
# 待機
await asyncio.sleep(self.config['prediction_interval'])
except Exception as e:
print(f"Error in live prediction: {e}")
self.monitor.log_error(e)
await asyncio.sleep(60) # エラー時は1分待機
def _process_predictions(self, predictions: Dict):
"""予測結果の処理"""
for symbol, result in predictions.items():
prediction = result['prediction']
confidence = result['confidence']
# 取引シグナルの生成
if confidence > self.config['confidence_threshold']:
signal = self._generate_trading_signal(prediction, confidence)
if signal:
print(f"Signal for {symbol}: {signal}")
# 取引システムへの送信
self._send_to_trading_system(symbol, signal)
# アラート
if self._should_alert(prediction, confidence):
self._send_alert(symbol, prediction, confidence)
def backtest(self, start_date: str, end_date: str):
"""バックテスト"""
# 履歴データの準備
historical_data = self.data_pipeline.load_historical_data(
start_date, end_date
)
# バックテストエンジン
backtester = MultiModalBacktester(
self.model,
self.config['backtest_config']
)
# 実行
results = backtester.run(historical_data)
# レポート生成
self._generate_backtest_report(results)
return results
class SystemMonitor:
"""システムモニタリング"""
def __init__(self):
self.metrics = defaultdict(list)
self.alerts = []
def log_prediction(self, predictions):
"""予測のログ"""
timestamp = datetime.now()
for symbol, result in predictions.items():
self.metrics[f'{symbol}_prediction'].append({
'timestamp': timestamp,
'value': result['prediction'],
'confidence': result['confidence']
})
# 異常検知
self._check_anomalies(predictions)
def _check_anomalies(self, predictions):
"""異常検知"""
for symbol, result in predictions.items():
# 予測値の急激な変化
if len(self.metrics[f'{symbol}_prediction']) > 10:
recent_predictions = [
m['value'] for m in self.metrics[f'{symbol}_prediction'][-10:]
]
if np.std(recent_predictions) > self.config['anomaly_threshold']:
self.alerts.append({
'type': 'prediction_anomaly',
'symbol': symbol,
'timestamp': datetime.now(),
'details': 'High prediction variance detected'
})
6.2 最適化とパフォーマンス
class PerformanceOptimizer:
"""パフォーマンス最適化"""
def __init__(self):
self.profiler = torch.profiler.profile(
activities=[
torch.profiler.ProfilerActivity.CPU,
torch.profiler.ProfilerActivity.CUDA,
]
)
def optimize_data_pipeline(self, pipeline):
"""データパイプラインの最適化"""
# 並列処理の最適化
pipeline.num_workers = multiprocessing.cpu_count()
# キャッシング
pipeline.enable_caching = True
pipeline.cache_size = 10000
# バッチサイズの動的調整
pipeline.dynamic_batch_size = True
def optimize_model(self, model):
"""モデルの最適化"""
# 量子化
quantized_model = torch.quantization.quantize_dynamic(
model, {nn.Linear}, dtype=torch.qint8
)
# プルーニング
pruned_model = self._prune_model(model, amount=0.3)
# JITコンパイル
scripted_model = torch.jit.script(pruned_model)
return scripted_model
def profile_inference(self, model, sample_input):
"""推論のプロファイリング"""
with self.profiler:
for _ in range(100):
_ = model(**sample_input)
# 結果の分析
print(self.profiler.key_averages().table(
sort_by="cuda_time_total", row_limit=10
))
7. まとめとベストプラクティス
7.1 実装ガイドライン
-
データ収集
- オンチェーンデータは15分ごとに更新
- ソーシャルデータは5分ごとに収集
- 価格データはリアルタイムストリーミング -
特徴量エンジニアリング
- モダリティごとに特化した特徴量を設計
- 時系列アライメントは必須
- 正規化とスケーリングに注意 -
モデル設計
- 階層的融合が効果的
- アテンション機構で重要な情報を選択
- ドロップアウトで過学習を防ぐ -
運用
- 継続的な再学習が必要
- A/Bテストで改善を検証
- モニタリングとアラートの設定
7.2 注意点
# 実装チェックリスト
MULTIMODAL_CHECKLIST = {
"データ品質": [
"オンチェーンデータの検証",
"ソーシャルデータのノイズ除去",
"時系列の同期確認"
],
"モデル設計": [
"モダリティ間のバランス",
"過学習の防止",
"計算効率の最適化"
],
"運用": [
"レイテンシの管理",
"スケーラビリティの確保",
"障害時のフォールバック"
]
}
マルチモーダル学習とオンチェーンデータの統合により、暗号資産市場のより深い理解と精度の高い予測が可能になります。各データソースの特性を理解し、適切に統合することが成功の鍵となります。