Some checks failed
CI / test (pull_request) Has been cancelled
- Fix ambiguous variable names (l → layer) - Remove unused imports and variables - Organize import statements Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
329 lines
11 KiB
Python
329 lines
11 KiB
Python
"""Context summarization for efficient historical data representation.
|
|
|
|
This module summarizes old context data instead of including raw details:
|
|
- Key metrics only (averages, trends, not details)
|
|
- Rolling window (keep last N days detailed, summarize older)
|
|
- Aggregate historical data efficiently
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import UTC, datetime, timedelta
|
|
from typing import Any
|
|
|
|
from src.context.layer import ContextLayer
|
|
from src.context.store import ContextStore
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class SummaryStats:
|
|
"""Statistical summary of historical data."""
|
|
|
|
count: int
|
|
mean: float | None = None
|
|
min: float | None = None
|
|
max: float | None = None
|
|
std: float | None = None
|
|
trend: str | None = None # "up", "down", "flat"
|
|
|
|
|
|
class ContextSummarizer:
|
|
"""Summarizes historical context data to reduce token usage."""
|
|
|
|
def __init__(self, store: ContextStore) -> None:
|
|
"""Initialize the context summarizer.
|
|
|
|
Args:
|
|
store: ContextStore instance for retrieving context data
|
|
"""
|
|
self.store = store
|
|
|
|
def summarize_numeric_values(self, values: list[float]) -> SummaryStats:
|
|
"""Summarize a list of numeric values.
|
|
|
|
Args:
|
|
values: List of numeric values to summarize
|
|
|
|
Returns:
|
|
SummaryStats with mean, min, max, std, and trend
|
|
"""
|
|
if not values:
|
|
return SummaryStats(count=0)
|
|
|
|
count = len(values)
|
|
mean = sum(values) / count
|
|
min_val = min(values)
|
|
max_val = max(values)
|
|
|
|
# Calculate standard deviation
|
|
if count > 1:
|
|
variance = sum((x - mean) ** 2 for x in values) / (count - 1)
|
|
std = variance**0.5
|
|
else:
|
|
std = 0.0
|
|
|
|
# Determine trend
|
|
trend = "flat"
|
|
if count >= 3:
|
|
# Simple trend: compare first third vs last third
|
|
first_third = values[: count // 3]
|
|
last_third = values[-(count // 3) :]
|
|
first_avg = sum(first_third) / len(first_third)
|
|
last_avg = sum(last_third) / len(last_third)
|
|
|
|
# Trend threshold: 5% change
|
|
threshold = 0.05 * abs(first_avg) if first_avg != 0 else 0.01
|
|
|
|
if last_avg > first_avg + threshold:
|
|
trend = "up"
|
|
elif last_avg < first_avg - threshold:
|
|
trend = "down"
|
|
|
|
return SummaryStats(
|
|
count=count,
|
|
mean=round(mean, 4),
|
|
min=round(min_val, 4),
|
|
max=round(max_val, 4),
|
|
std=round(std, 4),
|
|
trend=trend,
|
|
)
|
|
|
|
def summarize_layer(
|
|
self,
|
|
layer: ContextLayer,
|
|
start_date: datetime | None = None,
|
|
end_date: datetime | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Summarize all context data for a layer within a date range.
|
|
|
|
Args:
|
|
layer: Context layer to summarize
|
|
start_date: Start date (inclusive), None for all
|
|
end_date: End date (inclusive), None for now
|
|
|
|
Returns:
|
|
Dictionary with summarized metrics
|
|
"""
|
|
if end_date is None:
|
|
end_date = datetime.now(UTC)
|
|
|
|
# Get all contexts for this layer
|
|
all_contexts = self.store.get_all_contexts(layer)
|
|
|
|
if not all_contexts:
|
|
return {"summary": "No data available", "count": 0}
|
|
|
|
# Group numeric values by key
|
|
numeric_data: dict[str, list[float]] = {}
|
|
text_data: dict[str, list[str]] = {}
|
|
|
|
for key, value in all_contexts.items():
|
|
# Try to extract numeric values
|
|
if isinstance(value, (int, float)):
|
|
if key not in numeric_data:
|
|
numeric_data[key] = []
|
|
numeric_data[key].append(float(value))
|
|
elif isinstance(value, dict):
|
|
# Extract numeric fields from dict
|
|
for subkey, subvalue in value.items():
|
|
if isinstance(subvalue, (int, float)):
|
|
full_key = f"{key}.{subkey}"
|
|
if full_key not in numeric_data:
|
|
numeric_data[full_key] = []
|
|
numeric_data[full_key].append(float(subvalue))
|
|
elif isinstance(value, str):
|
|
if key not in text_data:
|
|
text_data[key] = []
|
|
text_data[key].append(value)
|
|
|
|
# Summarize numeric data
|
|
summary: dict[str, Any] = {}
|
|
|
|
for key, values in numeric_data.items():
|
|
stats = self.summarize_numeric_values(values)
|
|
summary[key] = {
|
|
"count": stats.count,
|
|
"avg": stats.mean,
|
|
"range": [stats.min, stats.max],
|
|
"trend": stats.trend,
|
|
}
|
|
|
|
# Summarize text data (just counts)
|
|
for key, values in text_data.items():
|
|
summary[f"{key}_count"] = len(values)
|
|
|
|
summary["total_entries"] = len(all_contexts)
|
|
|
|
return summary
|
|
|
|
def rolling_window_summary(
|
|
self,
|
|
layer: ContextLayer,
|
|
window_days: int = 30,
|
|
summarize_older: bool = True,
|
|
) -> dict[str, Any]:
|
|
"""Create a rolling window summary.
|
|
|
|
Recent data (within window) is kept detailed.
|
|
Older data is summarized to key metrics.
|
|
|
|
Args:
|
|
layer: Context layer to summarize
|
|
window_days: Number of days to keep detailed
|
|
summarize_older: Whether to summarize data older than window
|
|
|
|
Returns:
|
|
Dictionary with recent (detailed) and historical (summary) data
|
|
"""
|
|
result: dict[str, Any] = {
|
|
"window_days": window_days,
|
|
"recent_data": {},
|
|
"historical_summary": {},
|
|
}
|
|
|
|
# Get all contexts
|
|
all_contexts = self.store.get_all_contexts(layer)
|
|
|
|
recent_values: dict[str, list[float]] = {}
|
|
historical_values: dict[str, list[float]] = {}
|
|
|
|
for key, value in all_contexts.items():
|
|
# For simplicity, treat all numeric values
|
|
if isinstance(value, (int, float)):
|
|
# Note: We don't have timestamps in context keys
|
|
# This is a simplified implementation
|
|
# In practice, would need to check timeframe field
|
|
|
|
# For now, put recent data in window
|
|
if key not in recent_values:
|
|
recent_values[key] = []
|
|
recent_values[key].append(float(value))
|
|
|
|
# Detailed recent data
|
|
result["recent_data"] = {key: values[-10:] for key, values in recent_values.items()}
|
|
|
|
# Summarized historical data
|
|
if summarize_older:
|
|
for key, values in historical_values.items():
|
|
stats = self.summarize_numeric_values(values)
|
|
result["historical_summary"][key] = {
|
|
"count": stats.count,
|
|
"avg": stats.mean,
|
|
"trend": stats.trend,
|
|
}
|
|
|
|
return result
|
|
|
|
def aggregate_to_higher_layer(
|
|
self,
|
|
source_layer: ContextLayer,
|
|
target_layer: ContextLayer,
|
|
metric_key: str,
|
|
aggregation_func: str = "mean",
|
|
) -> float | None:
|
|
"""Aggregate data from source layer to target layer.
|
|
|
|
Args:
|
|
source_layer: Source context layer (more granular)
|
|
target_layer: Target context layer (less granular)
|
|
metric_key: Key of metric to aggregate
|
|
aggregation_func: Aggregation function ("mean", "sum", "max", "min")
|
|
|
|
Returns:
|
|
Aggregated value, or None if no data available
|
|
"""
|
|
# Get all contexts from source layer
|
|
source_contexts = self.store.get_all_contexts(source_layer)
|
|
|
|
# Extract values for metric_key
|
|
values = []
|
|
for key, value in source_contexts.items():
|
|
if key == metric_key and isinstance(value, (int, float)):
|
|
values.append(float(value))
|
|
elif isinstance(value, dict) and metric_key in value:
|
|
subvalue = value[metric_key]
|
|
if isinstance(subvalue, (int, float)):
|
|
values.append(float(subvalue))
|
|
|
|
if not values:
|
|
return None
|
|
|
|
# Apply aggregation function
|
|
if aggregation_func == "mean":
|
|
return sum(values) / len(values)
|
|
elif aggregation_func == "sum":
|
|
return sum(values)
|
|
elif aggregation_func == "max":
|
|
return max(values)
|
|
elif aggregation_func == "min":
|
|
return min(values)
|
|
else:
|
|
return sum(values) / len(values) # Default to mean
|
|
|
|
def create_compact_summary(
|
|
self,
|
|
layers: list[ContextLayer],
|
|
top_n_metrics: int = 5,
|
|
) -> dict[str, Any]:
|
|
"""Create a compact summary across multiple layers.
|
|
|
|
Args:
|
|
layers: List of context layers to summarize
|
|
top_n_metrics: Number of top metrics to include per layer
|
|
|
|
Returns:
|
|
Compact summary dictionary
|
|
"""
|
|
summary: dict[str, Any] = {}
|
|
|
|
for layer in layers:
|
|
layer_summary = self.summarize_layer(layer)
|
|
|
|
# Keep only top N metrics (by count/relevance)
|
|
metrics = []
|
|
for key, value in layer_summary.items():
|
|
if isinstance(value, dict) and "count" in value:
|
|
metrics.append((key, value, value["count"]))
|
|
|
|
# Sort by count (descending)
|
|
metrics.sort(key=lambda x: x[2], reverse=True)
|
|
|
|
# Keep top N
|
|
top_metrics = {m[0]: m[1] for m in metrics[:top_n_metrics]}
|
|
|
|
summary[layer.value] = top_metrics
|
|
|
|
return summary
|
|
|
|
def format_summary_for_prompt(self, summary: dict[str, Any]) -> str:
|
|
"""Format summary for inclusion in a prompt.
|
|
|
|
Args:
|
|
summary: Summary dictionary
|
|
|
|
Returns:
|
|
Formatted string for prompt
|
|
"""
|
|
lines = []
|
|
|
|
for layer, metrics in summary.items():
|
|
if not metrics:
|
|
continue
|
|
|
|
lines.append(f"{layer}:")
|
|
for key, value in metrics.items():
|
|
if isinstance(value, dict):
|
|
# Format as: key: avg=X, trend=Y
|
|
parts = []
|
|
if "avg" in value and value["avg"] is not None:
|
|
parts.append(f"avg={value['avg']:.2f}")
|
|
if "trend" in value and value["trend"]:
|
|
parts.append(f"trend={value['trend']}")
|
|
if parts:
|
|
lines.append(f" {key}: {', '.join(parts)}")
|
|
else:
|
|
lines.append(f" {key}: {value}")
|
|
|
|
return "\n".join(lines)
|