"""Smart context selection for optimizing token usage. This module implements intelligent selection of context layers (L1-L7) based on decision type and market conditions: - L7 (real-time) for normal trading decisions - L6-L5 (daily/weekly) for strategic decisions - L4-L1 (monthly/legacy) only for major events or policy changes """ from __future__ import annotations from dataclasses import dataclass from datetime import UTC, datetime from enum import StrEnum from typing import Any from src.context.layer import ContextLayer from src.context.store import ContextStore class DecisionType(StrEnum): """Type of trading decision being made.""" NORMAL = "normal" # Regular trade decision STRATEGIC = "strategic" # Strategy adjustment MAJOR_EVENT = "major_event" # Portfolio rebalancing, policy change @dataclass(frozen=True) class ContextSelection: """Selected context layers and their relevance scores.""" layers: list[ContextLayer] relevance_scores: dict[ContextLayer, float] total_score: float class ContextSelector: """Selects optimal context layers to minimize token usage.""" def __init__(self, store: ContextStore) -> None: """Initialize the context selector. Args: store: ContextStore instance for retrieving context data """ self.store = store def select_layers( self, decision_type: DecisionType = DecisionType.NORMAL, include_realtime: bool = True, ) -> list[ContextLayer]: """Select context layers based on decision type. Strategy: - NORMAL: L7 (real-time) only - STRATEGIC: L7 + L6 + L5 (real-time + daily + weekly) - MAJOR_EVENT: All layers L1-L7 Args: decision_type: Type of decision being made include_realtime: Whether to include L7 real-time data Returns: List of context layers to use (ordered by priority) """ if decision_type == DecisionType.NORMAL: # Normal trading: only real-time data return [ContextLayer.L7_REALTIME] if include_realtime else [] elif decision_type == DecisionType.STRATEGIC: # Strategic decisions: real-time + recent history layers = [] if include_realtime: layers.append(ContextLayer.L7_REALTIME) layers.extend([ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY]) return layers else: # MAJOR_EVENT # Major events: all layers for comprehensive context layers = [] if include_realtime: layers.append(ContextLayer.L7_REALTIME) layers.extend( [ ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, ContextLayer.L4_MONTHLY, ContextLayer.L3_QUARTERLY, ContextLayer.L2_ANNUAL, ContextLayer.L1_LEGACY, ] ) return layers def score_layer_relevance( self, layer: ContextLayer, decision_type: DecisionType, current_time: datetime | None = None, ) -> float: """Calculate relevance score for a context layer. Relevance is based on: 1. Decision type (normal, strategic, major event) 2. Layer recency (L7 > L6 > ... > L1) 3. Data availability Args: layer: Context layer to score decision_type: Type of decision being made current_time: Current time (defaults to now) Returns: Relevance score (0.0 to 1.0) """ if current_time is None: current_time = datetime.now(UTC) # Base scores by decision type base_scores = { DecisionType.NORMAL: { ContextLayer.L7_REALTIME: 1.0, ContextLayer.L6_DAILY: 0.1, ContextLayer.L5_WEEKLY: 0.05, ContextLayer.L4_MONTHLY: 0.01, ContextLayer.L3_QUARTERLY: 0.0, ContextLayer.L2_ANNUAL: 0.0, ContextLayer.L1_LEGACY: 0.0, }, DecisionType.STRATEGIC: { ContextLayer.L7_REALTIME: 0.9, ContextLayer.L6_DAILY: 0.8, ContextLayer.L5_WEEKLY: 0.7, ContextLayer.L4_MONTHLY: 0.3, ContextLayer.L3_QUARTERLY: 0.2, ContextLayer.L2_ANNUAL: 0.1, ContextLayer.L1_LEGACY: 0.05, }, DecisionType.MAJOR_EVENT: { ContextLayer.L7_REALTIME: 0.7, ContextLayer.L6_DAILY: 0.7, ContextLayer.L5_WEEKLY: 0.7, ContextLayer.L4_MONTHLY: 0.8, ContextLayer.L3_QUARTERLY: 0.8, ContextLayer.L2_ANNUAL: 0.9, ContextLayer.L1_LEGACY: 1.0, }, } score = base_scores[decision_type].get(layer, 0.0) # Check data availability latest_timeframe = self.store.get_latest_timeframe(layer) if latest_timeframe is None: # No data available - reduce score significantly score *= 0.1 return score def select_with_scoring( self, decision_type: DecisionType = DecisionType.NORMAL, min_score: float = 0.5, ) -> ContextSelection: """Select context layers with relevance scoring. Args: decision_type: Type of decision being made min_score: Minimum relevance score to include a layer Returns: ContextSelection with selected layers and scores """ all_layers = [ ContextLayer.L7_REALTIME, ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, ContextLayer.L4_MONTHLY, ContextLayer.L3_QUARTERLY, ContextLayer.L2_ANNUAL, ContextLayer.L1_LEGACY, ] scores = {layer: self.score_layer_relevance(layer, decision_type) for layer in all_layers} # Filter by minimum score selected_layers = [layer for layer, score in scores.items() if score >= min_score] # Sort by score (descending) selected_layers.sort(key=lambda layer: scores[layer], reverse=True) total_score = sum(scores[layer] for layer in selected_layers) return ContextSelection( layers=selected_layers, relevance_scores=scores, total_score=total_score, ) def get_context_data( self, layers: list[ContextLayer], max_items_per_layer: int = 10, ) -> dict[str, Any]: """Retrieve context data for selected layers. Args: layers: List of context layers to retrieve max_items_per_layer: Maximum number of items per layer Returns: Dictionary with context data organized by layer """ result: dict[str, Any] = {} for layer in layers: # Get latest timeframe for this layer latest_timeframe = self.store.get_latest_timeframe(layer) if latest_timeframe: # Get all contexts for latest timeframe contexts = self.store.get_all_contexts(layer, latest_timeframe) # Limit number of items if len(contexts) > max_items_per_layer: # Keep only first N items contexts = dict(list(contexts.items())[:max_items_per_layer]) result[layer.value] = contexts return result def estimate_context_tokens(self, context_data: dict[str, Any]) -> int: """Estimate total tokens for context data. Args: context_data: Context data dictionary Returns: Estimated token count """ import json from src.brain.prompt_optimizer import PromptOptimizer # Serialize to JSON and estimate tokens json_str = json.dumps(context_data, ensure_ascii=False) return PromptOptimizer.estimate_tokens(json_str) def optimize_context_for_budget( self, decision_type: DecisionType, max_tokens: int, ) -> dict[str, Any]: """Select and retrieve context data within a token budget. Args: decision_type: Type of decision being made max_tokens: Maximum token budget for context Returns: Optimized context data within budget """ # Start with minimal selection selection = self.select_with_scoring(decision_type, min_score=0.5) # Retrieve data context_data = self.get_context_data(selection.layers) # Check if within budget estimated_tokens = self.estimate_context_tokens(context_data) if estimated_tokens <= max_tokens: return context_data # If over budget, progressively reduce # 1. Reduce items per layer for max_items in [5, 3, 1]: context_data = self.get_context_data(selection.layers, max_items) estimated_tokens = self.estimate_context_tokens(context_data) if estimated_tokens <= max_tokens: return context_data # 2. Remove lower-priority layers for min_score in [0.6, 0.7, 0.8, 0.9]: selection = self.select_with_scoring(decision_type, min_score=min_score) context_data = self.get_context_data(selection.layers, max_items_per_layer=1) estimated_tokens = self.estimate_context_tokens(context_data) if estimated_tokens <= max_tokens: return context_data # Last resort: return only L7 with minimal data return self.get_context_data([ContextLayer.L7_REALTIME], max_items_per_layer=1)