Merge main into feature/issue-22-data-driven
Some checks failed
CI / test (pull_request) Has been cancelled

This commit is contained in:
agentson
2026-02-04 18:41:44 +09:00
8 changed files with 1566 additions and 8 deletions

View File

@@ -0,0 +1,296 @@
"""Smart context selection for optimizing token usage.
This module implements intelligent selection of context layers (L1-L7) based on
decision type and market conditions:
- L7 (real-time) for normal trading decisions
- L6-L5 (daily/weekly) for strategic decisions
- L4-L1 (monthly/legacy) only for major events or policy changes
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import Enum
from typing import Any
from src.context.layer import ContextLayer
from src.context.store import ContextStore
class DecisionType(str, Enum):
"""Type of trading decision being made."""
NORMAL = "normal" # Regular trade decision
STRATEGIC = "strategic" # Strategy adjustment
MAJOR_EVENT = "major_event" # Portfolio rebalancing, policy change
@dataclass(frozen=True)
class ContextSelection:
"""Selected context layers and their relevance scores."""
layers: list[ContextLayer]
relevance_scores: dict[ContextLayer, float]
total_score: float
class ContextSelector:
"""Selects optimal context layers to minimize token usage."""
def __init__(self, store: ContextStore) -> None:
"""Initialize the context selector.
Args:
store: ContextStore instance for retrieving context data
"""
self.store = store
def select_layers(
self,
decision_type: DecisionType = DecisionType.NORMAL,
include_realtime: bool = True,
) -> list[ContextLayer]:
"""Select context layers based on decision type.
Strategy:
- NORMAL: L7 (real-time) only
- STRATEGIC: L7 + L6 + L5 (real-time + daily + weekly)
- MAJOR_EVENT: All layers L1-L7
Args:
decision_type: Type of decision being made
include_realtime: Whether to include L7 real-time data
Returns:
List of context layers to use (ordered by priority)
"""
if decision_type == DecisionType.NORMAL:
# Normal trading: only real-time data
return [ContextLayer.L7_REALTIME] if include_realtime else []
elif decision_type == DecisionType.STRATEGIC:
# Strategic decisions: real-time + recent history
layers = []
if include_realtime:
layers.append(ContextLayer.L7_REALTIME)
layers.extend([ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY])
return layers
else: # MAJOR_EVENT
# Major events: all layers for comprehensive context
layers = []
if include_realtime:
layers.append(ContextLayer.L7_REALTIME)
layers.extend(
[
ContextLayer.L6_DAILY,
ContextLayer.L5_WEEKLY,
ContextLayer.L4_MONTHLY,
ContextLayer.L3_QUARTERLY,
ContextLayer.L2_ANNUAL,
ContextLayer.L1_LEGACY,
]
)
return layers
def score_layer_relevance(
self,
layer: ContextLayer,
decision_type: DecisionType,
current_time: datetime | None = None,
) -> float:
"""Calculate relevance score for a context layer.
Relevance is based on:
1. Decision type (normal, strategic, major event)
2. Layer recency (L7 > L6 > ... > L1)
3. Data availability
Args:
layer: Context layer to score
decision_type: Type of decision being made
current_time: Current time (defaults to now)
Returns:
Relevance score (0.0 to 1.0)
"""
if current_time is None:
current_time = datetime.now(UTC)
# Base scores by decision type
base_scores = {
DecisionType.NORMAL: {
ContextLayer.L7_REALTIME: 1.0,
ContextLayer.L6_DAILY: 0.1,
ContextLayer.L5_WEEKLY: 0.05,
ContextLayer.L4_MONTHLY: 0.01,
ContextLayer.L3_QUARTERLY: 0.0,
ContextLayer.L2_ANNUAL: 0.0,
ContextLayer.L1_LEGACY: 0.0,
},
DecisionType.STRATEGIC: {
ContextLayer.L7_REALTIME: 0.9,
ContextLayer.L6_DAILY: 0.8,
ContextLayer.L5_WEEKLY: 0.7,
ContextLayer.L4_MONTHLY: 0.3,
ContextLayer.L3_QUARTERLY: 0.2,
ContextLayer.L2_ANNUAL: 0.1,
ContextLayer.L1_LEGACY: 0.05,
},
DecisionType.MAJOR_EVENT: {
ContextLayer.L7_REALTIME: 0.7,
ContextLayer.L6_DAILY: 0.7,
ContextLayer.L5_WEEKLY: 0.7,
ContextLayer.L4_MONTHLY: 0.8,
ContextLayer.L3_QUARTERLY: 0.8,
ContextLayer.L2_ANNUAL: 0.9,
ContextLayer.L1_LEGACY: 1.0,
},
}
score = base_scores[decision_type].get(layer, 0.0)
# Check data availability
latest_timeframe = self.store.get_latest_timeframe(layer)
if latest_timeframe is None:
# No data available - reduce score significantly
score *= 0.1
return score
def select_with_scoring(
self,
decision_type: DecisionType = DecisionType.NORMAL,
min_score: float = 0.5,
) -> ContextSelection:
"""Select context layers with relevance scoring.
Args:
decision_type: Type of decision being made
min_score: Minimum relevance score to include a layer
Returns:
ContextSelection with selected layers and scores
"""
all_layers = [
ContextLayer.L7_REALTIME,
ContextLayer.L6_DAILY,
ContextLayer.L5_WEEKLY,
ContextLayer.L4_MONTHLY,
ContextLayer.L3_QUARTERLY,
ContextLayer.L2_ANNUAL,
ContextLayer.L1_LEGACY,
]
scores = {
layer: self.score_layer_relevance(layer, decision_type) for layer in all_layers
}
# Filter by minimum score
selected_layers = [layer for layer, score in scores.items() if score >= min_score]
# Sort by score (descending)
selected_layers.sort(key=lambda layer: scores[layer], reverse=True)
total_score = sum(scores[layer] for layer in selected_layers)
return ContextSelection(
layers=selected_layers,
relevance_scores=scores,
total_score=total_score,
)
def get_context_data(
self,
layers: list[ContextLayer],
max_items_per_layer: int = 10,
) -> dict[str, Any]:
"""Retrieve context data for selected layers.
Args:
layers: List of context layers to retrieve
max_items_per_layer: Maximum number of items per layer
Returns:
Dictionary with context data organized by layer
"""
result: dict[str, Any] = {}
for layer in layers:
# Get latest timeframe for this layer
latest_timeframe = self.store.get_latest_timeframe(layer)
if latest_timeframe:
# Get all contexts for latest timeframe
contexts = self.store.get_all_contexts(layer, latest_timeframe)
# Limit number of items
if len(contexts) > max_items_per_layer:
# Keep only first N items
contexts = dict(list(contexts.items())[:max_items_per_layer])
result[layer.value] = contexts
return result
def estimate_context_tokens(self, context_data: dict[str, Any]) -> int:
"""Estimate total tokens for context data.
Args:
context_data: Context data dictionary
Returns:
Estimated token count
"""
import json
from src.brain.prompt_optimizer import PromptOptimizer
# Serialize to JSON and estimate tokens
json_str = json.dumps(context_data, ensure_ascii=False)
return PromptOptimizer.estimate_tokens(json_str)
def optimize_context_for_budget(
self,
decision_type: DecisionType,
max_tokens: int,
) -> dict[str, Any]:
"""Select and retrieve context data within a token budget.
Args:
decision_type: Type of decision being made
max_tokens: Maximum token budget for context
Returns:
Optimized context data within budget
"""
# Start with minimal selection
selection = self.select_with_scoring(decision_type, min_score=0.5)
# Retrieve data
context_data = self.get_context_data(selection.layers)
# Check if within budget
estimated_tokens = self.estimate_context_tokens(context_data)
if estimated_tokens <= max_tokens:
return context_data
# If over budget, progressively reduce
# 1. Reduce items per layer
for max_items in [5, 3, 1]:
context_data = self.get_context_data(selection.layers, max_items)
estimated_tokens = self.estimate_context_tokens(context_data)
if estimated_tokens <= max_tokens:
return context_data
# 2. Remove lower-priority layers
for min_score in [0.6, 0.7, 0.8, 0.9]:
selection = self.select_with_scoring(decision_type, min_score=min_score)
context_data = self.get_context_data(selection.layers, max_items_per_layer=1)
estimated_tokens = self.estimate_context_tokens(context_data)
if estimated_tokens <= max_tokens:
return context_data
# Last resort: return only L7 with minimal data
return self.get_context_data([ContextLayer.L7_REALTIME], max_items_per_layer=1)

View File

@@ -7,7 +7,12 @@ Includes token efficiency optimizations:
- Prompt compression and abbreviation
- Response caching for common scenarios
- Smart context selection
- Token usage tracking
- Token usage tracking and metrics
Includes external data integration:
- News sentiment analysis
- Economic calendar events
- Market indicators
"""
from __future__ import annotations
@@ -15,7 +20,7 @@ from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass, field
from dataclasses import dataclass
from typing import Any
from google import genai

View File

@@ -0,0 +1,267 @@
"""Prompt optimization utilities for reducing token usage.
This module provides tools to compress prompts while maintaining decision quality:
- Token counting
- Text compression and abbreviation
- Template-based prompts with variable slots
- Priority-based context truncation
"""
from __future__ import annotations
import json
import re
from dataclasses import dataclass
from typing import Any
# Abbreviation mapping for common terms
ABBREVIATIONS = {
"price": "P",
"volume": "V",
"current": "cur",
"previous": "prev",
"change": "chg",
"percentage": "pct",
"market": "mkt",
"orderbook": "ob",
"foreigner": "fgn",
"buy": "B",
"sell": "S",
"hold": "H",
"confidence": "conf",
"rationale": "reason",
"action": "act",
"net": "net",
}
# Reverse mapping for decompression
REVERSE_ABBREVIATIONS = {v: k for k, v in ABBREVIATIONS.items()}
@dataclass(frozen=True)
class TokenMetrics:
"""Metrics about token usage in a prompt."""
char_count: int
word_count: int
estimated_tokens: int # Rough estimate: ~4 chars per token
compression_ratio: float = 1.0 # Original / Compressed
class PromptOptimizer:
"""Optimizes prompts to reduce token usage while maintaining quality."""
@staticmethod
def estimate_tokens(text: str) -> int:
"""Estimate token count for text.
Uses a simple heuristic: ~4 characters per token for English.
This is approximate but sufficient for optimization purposes.
Args:
text: Input text to estimate tokens for
Returns:
Estimated token count
"""
if not text:
return 0
# Simple estimate: 1 token ≈ 4 characters
return max(1, len(text) // 4)
@staticmethod
def count_tokens(text: str) -> TokenMetrics:
"""Count various metrics for a text.
Args:
text: Input text to analyze
Returns:
TokenMetrics with character, word, and estimated token counts
"""
char_count = len(text)
word_count = len(text.split())
estimated_tokens = PromptOptimizer.estimate_tokens(text)
return TokenMetrics(
char_count=char_count,
word_count=word_count,
estimated_tokens=estimated_tokens,
)
@staticmethod
def compress_json(data: dict[str, Any]) -> str:
"""Compress JSON by removing whitespace.
Args:
data: Dictionary to serialize
Returns:
Compact JSON string without whitespace
"""
return json.dumps(data, separators=(",", ":"), ensure_ascii=False)
@staticmethod
def abbreviate_text(text: str, aggressive: bool = False) -> str:
"""Apply abbreviations to reduce text length.
Args:
text: Input text to abbreviate
aggressive: If True, apply more aggressive compression
Returns:
Abbreviated text
"""
result = text
# Apply word-level abbreviations (case-insensitive)
for full, abbr in ABBREVIATIONS.items():
# Word boundaries to avoid partial replacements
pattern = r"\b" + re.escape(full) + r"\b"
result = re.sub(pattern, abbr, result, flags=re.IGNORECASE)
if aggressive:
# Remove articles and filler words
result = re.sub(r"\b(a|an|the)\b", "", result, flags=re.IGNORECASE)
result = re.sub(r"\b(is|are|was|were)\b", "", result, flags=re.IGNORECASE)
# Collapse multiple spaces
result = re.sub(r"\s+", " ", result)
return result.strip()
@staticmethod
def build_compressed_prompt(
market_data: dict[str, Any],
include_instructions: bool = True,
max_length: int | None = None,
) -> str:
"""Build a compressed prompt from market data.
Args:
market_data: Market data dictionary with stock info
include_instructions: Whether to include full instructions
max_length: Maximum character length (truncates if needed)
Returns:
Compressed prompt string
"""
# Abbreviated market name
market_name = market_data.get("market_name", "KR")
if "Korea" in market_name:
market_name = "KR"
elif "United States" in market_name or "US" in market_name:
market_name = "US"
# Core data - always included
core_info = {
"mkt": market_name,
"code": market_data["stock_code"],
"P": market_data["current_price"],
}
# Optional fields
if "orderbook" in market_data and market_data["orderbook"]:
ob = market_data["orderbook"]
# Compress orderbook: keep only top 3 levels
compressed_ob = {
"bid": ob.get("bid", [])[:3],
"ask": ob.get("ask", [])[:3],
}
core_info["ob"] = compressed_ob
if market_data.get("foreigner_net", 0) != 0:
core_info["fgn_net"] = market_data["foreigner_net"]
# Compress to JSON
data_str = PromptOptimizer.compress_json(core_info)
if include_instructions:
# Minimal instructions
prompt = (
f"{market_name} trader. Analyze:\n{data_str}\n\n"
'Return JSON: {"act":"BUY"|"SELL"|"HOLD","conf":<0-100>,"reason":"<text>"}\n'
"Rules: act=BUY/SELL/HOLD, conf=0-100, reason=concise. No markdown."
)
else:
# Data only (for cached contexts where instructions are known)
prompt = data_str
# Truncate if needed
if max_length and len(prompt) > max_length:
prompt = prompt[:max_length] + "..."
return prompt
@staticmethod
def truncate_context(
context: dict[str, Any],
max_tokens: int,
priority_keys: list[str] | None = None,
) -> dict[str, Any]:
"""Truncate context data to fit within token budget.
Keeps high-priority keys first, then truncates less important data.
Args:
context: Context dictionary to truncate
max_tokens: Maximum token budget
priority_keys: List of keys to keep (in order of priority)
Returns:
Truncated context dictionary
"""
if not context:
return {}
if priority_keys is None:
priority_keys = []
result: dict[str, Any] = {}
current_tokens = 0
# Add priority keys first
for key in priority_keys:
if key in context:
value_str = json.dumps(context[key])
tokens = PromptOptimizer.estimate_tokens(value_str)
if current_tokens + tokens <= max_tokens:
result[key] = context[key]
current_tokens += tokens
else:
break
# Add remaining keys if space available
for key, value in context.items():
if key in result:
continue
value_str = json.dumps(value)
tokens = PromptOptimizer.estimate_tokens(value_str)
if current_tokens + tokens <= max_tokens:
result[key] = value
current_tokens += tokens
else:
break
return result
@staticmethod
def calculate_compression_ratio(original: str, compressed: str) -> float:
"""Calculate compression ratio between original and compressed text.
Args:
original: Original text
compressed: Compressed text
Returns:
Compression ratio (original_tokens / compressed_tokens)
"""
original_tokens = PromptOptimizer.estimate_tokens(original)
compressed_tokens = PromptOptimizer.estimate_tokens(compressed)
if compressed_tokens == 0:
return 1.0
return original_tokens / compressed_tokens

328
src/context/summarizer.py Normal file
View File

@@ -0,0 +1,328 @@
"""Context summarization for efficient historical data representation.
This module summarizes old context data instead of including raw details:
- Key metrics only (averages, trends, not details)
- Rolling window (keep last N days detailed, summarize older)
- Aggregate historical data efficiently
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Any
from src.context.layer import ContextLayer
from src.context.store import ContextStore
@dataclass(frozen=True)
class SummaryStats:
"""Statistical summary of historical data."""
count: int
mean: float | None = None
min: float | None = None
max: float | None = None
std: float | None = None
trend: str | None = None # "up", "down", "flat"
class ContextSummarizer:
"""Summarizes historical context data to reduce token usage."""
def __init__(self, store: ContextStore) -> None:
"""Initialize the context summarizer.
Args:
store: ContextStore instance for retrieving context data
"""
self.store = store
def summarize_numeric_values(self, values: list[float]) -> SummaryStats:
"""Summarize a list of numeric values.
Args:
values: List of numeric values to summarize
Returns:
SummaryStats with mean, min, max, std, and trend
"""
if not values:
return SummaryStats(count=0)
count = len(values)
mean = sum(values) / count
min_val = min(values)
max_val = max(values)
# Calculate standard deviation
if count > 1:
variance = sum((x - mean) ** 2 for x in values) / (count - 1)
std = variance**0.5
else:
std = 0.0
# Determine trend
trend = "flat"
if count >= 3:
# Simple trend: compare first third vs last third
first_third = values[: count // 3]
last_third = values[-(count // 3) :]
first_avg = sum(first_third) / len(first_third)
last_avg = sum(last_third) / len(last_third)
# Trend threshold: 5% change
threshold = 0.05 * abs(first_avg) if first_avg != 0 else 0.01
if last_avg > first_avg + threshold:
trend = "up"
elif last_avg < first_avg - threshold:
trend = "down"
return SummaryStats(
count=count,
mean=round(mean, 4),
min=round(min_val, 4),
max=round(max_val, 4),
std=round(std, 4),
trend=trend,
)
def summarize_layer(
self,
layer: ContextLayer,
start_date: datetime | None = None,
end_date: datetime | None = None,
) -> dict[str, Any]:
"""Summarize all context data for a layer within a date range.
Args:
layer: Context layer to summarize
start_date: Start date (inclusive), None for all
end_date: End date (inclusive), None for now
Returns:
Dictionary with summarized metrics
"""
if end_date is None:
end_date = datetime.now(UTC)
# Get all contexts for this layer
all_contexts = self.store.get_all_contexts(layer)
if not all_contexts:
return {"summary": "No data available", "count": 0}
# Group numeric values by key
numeric_data: dict[str, list[float]] = {}
text_data: dict[str, list[str]] = {}
for key, value in all_contexts.items():
# Try to extract numeric values
if isinstance(value, (int, float)):
if key not in numeric_data:
numeric_data[key] = []
numeric_data[key].append(float(value))
elif isinstance(value, dict):
# Extract numeric fields from dict
for subkey, subvalue in value.items():
if isinstance(subvalue, (int, float)):
full_key = f"{key}.{subkey}"
if full_key not in numeric_data:
numeric_data[full_key] = []
numeric_data[full_key].append(float(subvalue))
elif isinstance(value, str):
if key not in text_data:
text_data[key] = []
text_data[key].append(value)
# Summarize numeric data
summary: dict[str, Any] = {}
for key, values in numeric_data.items():
stats = self.summarize_numeric_values(values)
summary[key] = {
"count": stats.count,
"avg": stats.mean,
"range": [stats.min, stats.max],
"trend": stats.trend,
}
# Summarize text data (just counts)
for key, values in text_data.items():
summary[f"{key}_count"] = len(values)
summary["total_entries"] = len(all_contexts)
return summary
def rolling_window_summary(
self,
layer: ContextLayer,
window_days: int = 30,
summarize_older: bool = True,
) -> dict[str, Any]:
"""Create a rolling window summary.
Recent data (within window) is kept detailed.
Older data is summarized to key metrics.
Args:
layer: Context layer to summarize
window_days: Number of days to keep detailed
summarize_older: Whether to summarize data older than window
Returns:
Dictionary with recent (detailed) and historical (summary) data
"""
result: dict[str, Any] = {
"window_days": window_days,
"recent_data": {},
"historical_summary": {},
}
# Get all contexts
all_contexts = self.store.get_all_contexts(layer)
recent_values: dict[str, list[float]] = {}
historical_values: dict[str, list[float]] = {}
for key, value in all_contexts.items():
# For simplicity, treat all numeric values
if isinstance(value, (int, float)):
# Note: We don't have timestamps in context keys
# This is a simplified implementation
# In practice, would need to check timeframe field
# For now, put recent data in window
if key not in recent_values:
recent_values[key] = []
recent_values[key].append(float(value))
# Detailed recent data
result["recent_data"] = {key: values[-10:] for key, values in recent_values.items()}
# Summarized historical data
if summarize_older:
for key, values in historical_values.items():
stats = self.summarize_numeric_values(values)
result["historical_summary"][key] = {
"count": stats.count,
"avg": stats.mean,
"trend": stats.trend,
}
return result
def aggregate_to_higher_layer(
self,
source_layer: ContextLayer,
target_layer: ContextLayer,
metric_key: str,
aggregation_func: str = "mean",
) -> float | None:
"""Aggregate data from source layer to target layer.
Args:
source_layer: Source context layer (more granular)
target_layer: Target context layer (less granular)
metric_key: Key of metric to aggregate
aggregation_func: Aggregation function ("mean", "sum", "max", "min")
Returns:
Aggregated value, or None if no data available
"""
# Get all contexts from source layer
source_contexts = self.store.get_all_contexts(source_layer)
# Extract values for metric_key
values = []
for key, value in source_contexts.items():
if key == metric_key and isinstance(value, (int, float)):
values.append(float(value))
elif isinstance(value, dict) and metric_key in value:
subvalue = value[metric_key]
if isinstance(subvalue, (int, float)):
values.append(float(subvalue))
if not values:
return None
# Apply aggregation function
if aggregation_func == "mean":
return sum(values) / len(values)
elif aggregation_func == "sum":
return sum(values)
elif aggregation_func == "max":
return max(values)
elif aggregation_func == "min":
return min(values)
else:
return sum(values) / len(values) # Default to mean
def create_compact_summary(
self,
layers: list[ContextLayer],
top_n_metrics: int = 5,
) -> dict[str, Any]:
"""Create a compact summary across multiple layers.
Args:
layers: List of context layers to summarize
top_n_metrics: Number of top metrics to include per layer
Returns:
Compact summary dictionary
"""
summary: dict[str, Any] = {}
for layer in layers:
layer_summary = self.summarize_layer(layer)
# Keep only top N metrics (by count/relevance)
metrics = []
for key, value in layer_summary.items():
if isinstance(value, dict) and "count" in value:
metrics.append((key, value, value["count"]))
# Sort by count (descending)
metrics.sort(key=lambda x: x[2], reverse=True)
# Keep top N
top_metrics = {m[0]: m[1] for m in metrics[:top_n_metrics]}
summary[layer.value] = top_metrics
return summary
def format_summary_for_prompt(self, summary: dict[str, Any]) -> str:
"""Format summary for inclusion in a prompt.
Args:
summary: Summary dictionary
Returns:
Formatted string for prompt
"""
lines = []
for layer, metrics in summary.items():
if not metrics:
continue
lines.append(f"{layer}:")
for key, value in metrics.items():
if isinstance(value, dict):
# Format as: key: avg=X, trend=Y
parts = []
if "avg" in value and value["avg"] is not None:
parts.append(f"avg={value['avg']:.2f}")
if "trend" in value and value["trend"]:
parts.append(f"trend={value['trend']}")
if parts:
lines.append(f" {key}: {', '.join(parts)}")
else:
lines.append(f" {key}: {value}")
return "\n".join(lines)

View File

@@ -23,7 +23,7 @@ from google import genai
from src.config import Settings
from src.db import init_db
from src.logging.decision_logger import DecisionLog, DecisionLogger
from src.logging.decision_logger import DecisionLogger
logger = logging.getLogger(__name__)

View File

@@ -21,7 +21,7 @@ from src.broker.overseas import OverseasBroker
from src.config import Settings
from src.context.layer import ContextLayer
from src.context.store import ContextStore
from src.core.criticality import CriticalityAssessor, CriticalityLevel
from src.core.criticality import CriticalityAssessor
from src.core.priority_queue import PriorityTaskQueue
from src.core.risk_manager import CircuitBreakerTripped, RiskManager
from src.db import init_db, log_trade