Some checks failed
CI / test (pull_request) Has been cancelled
- Fix ambiguous variable names (l → layer) - Remove unused imports and variables - Organize import statements Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
297 lines
9.7 KiB
Python
297 lines
9.7 KiB
Python
"""Smart context selection for optimizing token usage.
|
|
|
|
This module implements intelligent selection of context layers (L1-L7) based on
|
|
decision type and market conditions:
|
|
- L7 (real-time) for normal trading decisions
|
|
- L6-L5 (daily/weekly) for strategic decisions
|
|
- L4-L1 (monthly/legacy) only for major events or policy changes
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import UTC, datetime
|
|
from enum import Enum
|
|
from typing import Any
|
|
|
|
from src.context.layer import ContextLayer
|
|
from src.context.store import ContextStore
|
|
|
|
|
|
class DecisionType(str, Enum):
|
|
"""Type of trading decision being made."""
|
|
|
|
NORMAL = "normal" # Regular trade decision
|
|
STRATEGIC = "strategic" # Strategy adjustment
|
|
MAJOR_EVENT = "major_event" # Portfolio rebalancing, policy change
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class ContextSelection:
|
|
"""Selected context layers and their relevance scores."""
|
|
|
|
layers: list[ContextLayer]
|
|
relevance_scores: dict[ContextLayer, float]
|
|
total_score: float
|
|
|
|
|
|
class ContextSelector:
|
|
"""Selects optimal context layers to minimize token usage."""
|
|
|
|
def __init__(self, store: ContextStore) -> None:
|
|
"""Initialize the context selector.
|
|
|
|
Args:
|
|
store: ContextStore instance for retrieving context data
|
|
"""
|
|
self.store = store
|
|
|
|
def select_layers(
|
|
self,
|
|
decision_type: DecisionType = DecisionType.NORMAL,
|
|
include_realtime: bool = True,
|
|
) -> list[ContextLayer]:
|
|
"""Select context layers based on decision type.
|
|
|
|
Strategy:
|
|
- NORMAL: L7 (real-time) only
|
|
- STRATEGIC: L7 + L6 + L5 (real-time + daily + weekly)
|
|
- MAJOR_EVENT: All layers L1-L7
|
|
|
|
Args:
|
|
decision_type: Type of decision being made
|
|
include_realtime: Whether to include L7 real-time data
|
|
|
|
Returns:
|
|
List of context layers to use (ordered by priority)
|
|
"""
|
|
if decision_type == DecisionType.NORMAL:
|
|
# Normal trading: only real-time data
|
|
return [ContextLayer.L7_REALTIME] if include_realtime else []
|
|
|
|
elif decision_type == DecisionType.STRATEGIC:
|
|
# Strategic decisions: real-time + recent history
|
|
layers = []
|
|
if include_realtime:
|
|
layers.append(ContextLayer.L7_REALTIME)
|
|
layers.extend([ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY])
|
|
return layers
|
|
|
|
else: # MAJOR_EVENT
|
|
# Major events: all layers for comprehensive context
|
|
layers = []
|
|
if include_realtime:
|
|
layers.append(ContextLayer.L7_REALTIME)
|
|
layers.extend(
|
|
[
|
|
ContextLayer.L6_DAILY,
|
|
ContextLayer.L5_WEEKLY,
|
|
ContextLayer.L4_MONTHLY,
|
|
ContextLayer.L3_QUARTERLY,
|
|
ContextLayer.L2_ANNUAL,
|
|
ContextLayer.L1_LEGACY,
|
|
]
|
|
)
|
|
return layers
|
|
|
|
def score_layer_relevance(
|
|
self,
|
|
layer: ContextLayer,
|
|
decision_type: DecisionType,
|
|
current_time: datetime | None = None,
|
|
) -> float:
|
|
"""Calculate relevance score for a context layer.
|
|
|
|
Relevance is based on:
|
|
1. Decision type (normal, strategic, major event)
|
|
2. Layer recency (L7 > L6 > ... > L1)
|
|
3. Data availability
|
|
|
|
Args:
|
|
layer: Context layer to score
|
|
decision_type: Type of decision being made
|
|
current_time: Current time (defaults to now)
|
|
|
|
Returns:
|
|
Relevance score (0.0 to 1.0)
|
|
"""
|
|
if current_time is None:
|
|
current_time = datetime.now(UTC)
|
|
|
|
# Base scores by decision type
|
|
base_scores = {
|
|
DecisionType.NORMAL: {
|
|
ContextLayer.L7_REALTIME: 1.0,
|
|
ContextLayer.L6_DAILY: 0.1,
|
|
ContextLayer.L5_WEEKLY: 0.05,
|
|
ContextLayer.L4_MONTHLY: 0.01,
|
|
ContextLayer.L3_QUARTERLY: 0.0,
|
|
ContextLayer.L2_ANNUAL: 0.0,
|
|
ContextLayer.L1_LEGACY: 0.0,
|
|
},
|
|
DecisionType.STRATEGIC: {
|
|
ContextLayer.L7_REALTIME: 0.9,
|
|
ContextLayer.L6_DAILY: 0.8,
|
|
ContextLayer.L5_WEEKLY: 0.7,
|
|
ContextLayer.L4_MONTHLY: 0.3,
|
|
ContextLayer.L3_QUARTERLY: 0.2,
|
|
ContextLayer.L2_ANNUAL: 0.1,
|
|
ContextLayer.L1_LEGACY: 0.05,
|
|
},
|
|
DecisionType.MAJOR_EVENT: {
|
|
ContextLayer.L7_REALTIME: 0.7,
|
|
ContextLayer.L6_DAILY: 0.7,
|
|
ContextLayer.L5_WEEKLY: 0.7,
|
|
ContextLayer.L4_MONTHLY: 0.8,
|
|
ContextLayer.L3_QUARTERLY: 0.8,
|
|
ContextLayer.L2_ANNUAL: 0.9,
|
|
ContextLayer.L1_LEGACY: 1.0,
|
|
},
|
|
}
|
|
|
|
score = base_scores[decision_type].get(layer, 0.0)
|
|
|
|
# Check data availability
|
|
latest_timeframe = self.store.get_latest_timeframe(layer)
|
|
if latest_timeframe is None:
|
|
# No data available - reduce score significantly
|
|
score *= 0.1
|
|
|
|
return score
|
|
|
|
def select_with_scoring(
|
|
self,
|
|
decision_type: DecisionType = DecisionType.NORMAL,
|
|
min_score: float = 0.5,
|
|
) -> ContextSelection:
|
|
"""Select context layers with relevance scoring.
|
|
|
|
Args:
|
|
decision_type: Type of decision being made
|
|
min_score: Minimum relevance score to include a layer
|
|
|
|
Returns:
|
|
ContextSelection with selected layers and scores
|
|
"""
|
|
all_layers = [
|
|
ContextLayer.L7_REALTIME,
|
|
ContextLayer.L6_DAILY,
|
|
ContextLayer.L5_WEEKLY,
|
|
ContextLayer.L4_MONTHLY,
|
|
ContextLayer.L3_QUARTERLY,
|
|
ContextLayer.L2_ANNUAL,
|
|
ContextLayer.L1_LEGACY,
|
|
]
|
|
|
|
scores = {
|
|
layer: self.score_layer_relevance(layer, decision_type) for layer in all_layers
|
|
}
|
|
|
|
# Filter by minimum score
|
|
selected_layers = [layer for layer, score in scores.items() if score >= min_score]
|
|
|
|
# Sort by score (descending)
|
|
selected_layers.sort(key=lambda layer: scores[layer], reverse=True)
|
|
|
|
total_score = sum(scores[layer] for layer in selected_layers)
|
|
|
|
return ContextSelection(
|
|
layers=selected_layers,
|
|
relevance_scores=scores,
|
|
total_score=total_score,
|
|
)
|
|
|
|
def get_context_data(
|
|
self,
|
|
layers: list[ContextLayer],
|
|
max_items_per_layer: int = 10,
|
|
) -> dict[str, Any]:
|
|
"""Retrieve context data for selected layers.
|
|
|
|
Args:
|
|
layers: List of context layers to retrieve
|
|
max_items_per_layer: Maximum number of items per layer
|
|
|
|
Returns:
|
|
Dictionary with context data organized by layer
|
|
"""
|
|
result: dict[str, Any] = {}
|
|
|
|
for layer in layers:
|
|
# Get latest timeframe for this layer
|
|
latest_timeframe = self.store.get_latest_timeframe(layer)
|
|
if latest_timeframe:
|
|
# Get all contexts for latest timeframe
|
|
contexts = self.store.get_all_contexts(layer, latest_timeframe)
|
|
|
|
# Limit number of items
|
|
if len(contexts) > max_items_per_layer:
|
|
# Keep only first N items
|
|
contexts = dict(list(contexts.items())[:max_items_per_layer])
|
|
|
|
result[layer.value] = contexts
|
|
|
|
return result
|
|
|
|
def estimate_context_tokens(self, context_data: dict[str, Any]) -> int:
|
|
"""Estimate total tokens for context data.
|
|
|
|
Args:
|
|
context_data: Context data dictionary
|
|
|
|
Returns:
|
|
Estimated token count
|
|
"""
|
|
import json
|
|
|
|
from src.brain.prompt_optimizer import PromptOptimizer
|
|
|
|
# Serialize to JSON and estimate tokens
|
|
json_str = json.dumps(context_data, ensure_ascii=False)
|
|
return PromptOptimizer.estimate_tokens(json_str)
|
|
|
|
def optimize_context_for_budget(
|
|
self,
|
|
decision_type: DecisionType,
|
|
max_tokens: int,
|
|
) -> dict[str, Any]:
|
|
"""Select and retrieve context data within a token budget.
|
|
|
|
Args:
|
|
decision_type: Type of decision being made
|
|
max_tokens: Maximum token budget for context
|
|
|
|
Returns:
|
|
Optimized context data within budget
|
|
"""
|
|
# Start with minimal selection
|
|
selection = self.select_with_scoring(decision_type, min_score=0.5)
|
|
|
|
# Retrieve data
|
|
context_data = self.get_context_data(selection.layers)
|
|
|
|
# Check if within budget
|
|
estimated_tokens = self.estimate_context_tokens(context_data)
|
|
|
|
if estimated_tokens <= max_tokens:
|
|
return context_data
|
|
|
|
# If over budget, progressively reduce
|
|
# 1. Reduce items per layer
|
|
for max_items in [5, 3, 1]:
|
|
context_data = self.get_context_data(selection.layers, max_items)
|
|
estimated_tokens = self.estimate_context_tokens(context_data)
|
|
if estimated_tokens <= max_tokens:
|
|
return context_data
|
|
|
|
# 2. Remove lower-priority layers
|
|
for min_score in [0.6, 0.7, 0.8, 0.9]:
|
|
selection = self.select_with_scoring(decision_type, min_score=min_score)
|
|
context_data = self.get_context_data(selection.layers, max_items_per_layer=1)
|
|
estimated_tokens = self.estimate_context_tokens(context_data)
|
|
if estimated_tokens <= max_tokens:
|
|
return context_data
|
|
|
|
# Last resort: return only L7 with minimal data
|
|
return self.get_context_data([ContextLayer.L7_REALTIME], max_items_per_layer=1)
|