Compare commits

..

6 Commits

Author SHA1 Message Date
agentson
e3b1ecc572 feat: context aggregation scheduler (issue #87)
Some checks failed
CI / test (pull_request) Has been cancelled
- Add ContextScheduler with run_if_due() for periodic rollups
- Weekly (Sunday), monthly (last day), quarterly, annual, legacy schedules
- Daily cleanup of expired contexts via ContextStore
- Dedup guard: each task runs at most once per day

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 04:26:51 +09:00
8acf72b22c Merge pull request 'feat: DailyScorecard 모델 정의 (issue #90)' (#118) from feature/issue-90-scorecard-model into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #118
2026-02-10 04:26:21 +09:00
agentson
c95102a0bd feat: DailyScorecard model for per-market performance review (issue #90)
Some checks failed
CI / test (pull_request) Has been cancelled
- Add DailyScorecard dataclass with market-scoped fields
- Fields: date, market, decisions, pnl, win_rate, scenario_match_rate, lessons, cross_market_note
- Export from src/evolution/__init__.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 04:25:37 +09:00
0685d62f9c Merge pull request 'feat: EOD 집계 시장 필터 추가 (issue #86)' (#117) from feature/issue-86-eod-market-filter into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #117
2026-02-10 04:24:58 +09:00
agentson
78021d4695 feat: EOD aggregation with market filter (issue #86)
Some checks failed
CI / test (pull_request) Has been cancelled
- Add market parameter to aggregate_daily_from_trades() for per-market L6 aggregation
- Store market-scoped keys (total_pnl_KR, win_rate_US, etc.) in L6/L5/L4 layers
- Hook aggregate_daily_from_trades() into market close detection in run()
- Update tests for market-scoped context keys

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 04:23:49 +09:00
3cdd10783b Merge pull request 'feat: L7 실시간 컨텍스트 시장별 기록 (issue #85)' (#116) from feature/issue-85-l7-context-write into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #116
2026-02-10 04:22:57 +09:00
9 changed files with 493 additions and 61 deletions

View File

@@ -5,6 +5,7 @@ The context tree implements Pillar 2: hierarchical memory management across
""" """
from src.context.layer import ContextLayer from src.context.layer import ContextLayer
from src.context.scheduler import ContextScheduler
from src.context.store import ContextStore from src.context.store import ContextStore
__all__ = ["ContextLayer", "ContextStore"] __all__ = ["ContextLayer", "ContextScheduler", "ContextStore"]

View File

@@ -18,16 +18,33 @@ class ContextAggregator:
self.conn = conn self.conn = conn
self.store = ContextStore(conn) self.store = ContextStore(conn)
def aggregate_daily_from_trades(self, date: str | None = None) -> None: def aggregate_daily_from_trades(
self, date: str | None = None, market: str | None = None
) -> None:
"""Aggregate L6 (daily) context from trades table. """Aggregate L6 (daily) context from trades table.
Args: Args:
date: Date in YYYY-MM-DD format. If None, uses today. date: Date in YYYY-MM-DD format. If None, uses today.
market: Market code filter (e.g., "KR", "US"). If None, aggregates all markets.
""" """
if date is None: if date is None:
date = datetime.now(UTC).date().isoformat() date = datetime.now(UTC).date().isoformat()
# Calculate daily metrics from trades if market is None:
cursor = self.conn.execute(
"""
SELECT DISTINCT market
FROM trades
WHERE DATE(timestamp) = ?
""",
(date,),
)
markets = [row[0] for row in cursor.fetchall() if row[0]]
else:
markets = [market]
for market_code in markets:
# Calculate daily metrics from trades for the market
cursor = self.conn.execute( cursor = self.conn.execute(
""" """
SELECT SELECT
@@ -41,29 +58,43 @@ class ContextAggregator:
SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) as wins, SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) as wins,
SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END) as losses SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END) as losses
FROM trades FROM trades
WHERE DATE(timestamp) = ? WHERE DATE(timestamp) = ? AND market = ?
""", """,
(date,), (date, market_code),
) )
row = cursor.fetchone() row = cursor.fetchone()
if row and row[0] > 0: # At least one trade if row and row[0] > 0: # At least one trade
trade_count, buys, sells, holds, avg_conf, total_pnl, stocks, wins, losses = row trade_count, buys, sells, holds, avg_conf, total_pnl, stocks, wins, losses = row
# Store daily metrics in L6 key_suffix = f"_{market_code}"
self.store.set_context(ContextLayer.L6_DAILY, date, "trade_count", trade_count)
self.store.set_context(ContextLayer.L6_DAILY, date, "buys", buys) # Store daily metrics in L6 with market suffix
self.store.set_context(ContextLayer.L6_DAILY, date, "sells", sells)
self.store.set_context(ContextLayer.L6_DAILY, date, "holds", holds)
self.store.set_context( self.store.set_context(
ContextLayer.L6_DAILY, date, "avg_confidence", round(avg_conf, 2) ContextLayer.L6_DAILY, date, f"trade_count{key_suffix}", trade_count
)
self.store.set_context(ContextLayer.L6_DAILY, date, f"buys{key_suffix}", buys)
self.store.set_context(ContextLayer.L6_DAILY, date, f"sells{key_suffix}", sells)
self.store.set_context(ContextLayer.L6_DAILY, date, f"holds{key_suffix}", holds)
self.store.set_context(
ContextLayer.L6_DAILY,
date,
f"avg_confidence{key_suffix}",
round(avg_conf, 2),
) )
self.store.set_context( self.store.set_context(
ContextLayer.L6_DAILY, date, "total_pnl", round(total_pnl, 2) ContextLayer.L6_DAILY,
date,
f"total_pnl{key_suffix}",
round(total_pnl, 2),
)
self.store.set_context(
ContextLayer.L6_DAILY, date, f"unique_stocks{key_suffix}", stocks
) )
self.store.set_context(ContextLayer.L6_DAILY, date, "unique_stocks", stocks)
win_rate = round(wins / max(wins + losses, 1) * 100, 2) win_rate = round(wins / max(wins + losses, 1) * 100, 2)
self.store.set_context(ContextLayer.L6_DAILY, date, "win_rate", win_rate) self.store.set_context(
ContextLayer.L6_DAILY, date, f"win_rate{key_suffix}", win_rate
)
def aggregate_weekly_from_daily(self, week: str | None = None) -> None: def aggregate_weekly_from_daily(self, week: str | None = None) -> None:
"""Aggregate L5 (weekly) context from L6 (daily). """Aggregate L5 (weekly) context from L6 (daily).
@@ -92,14 +123,25 @@ class ContextAggregator:
daily_data[row[0]].append(json.loads(row[1])) daily_data[row[0]].append(json.loads(row[1]))
if daily_data: if daily_data:
# Sum all PnL values # Sum all PnL values (market-specific if suffixed)
if "total_pnl" in daily_data: if "total_pnl" in daily_data:
total_pnl = sum(daily_data["total_pnl"]) total_pnl = sum(daily_data["total_pnl"])
self.store.set_context( self.store.set_context(
ContextLayer.L5_WEEKLY, week, "weekly_pnl", round(total_pnl, 2) ContextLayer.L5_WEEKLY, week, "weekly_pnl", round(total_pnl, 2)
) )
# Average all confidence values for key, values in daily_data.items():
if key.startswith("total_pnl_"):
market_code = key.split("total_pnl_", 1)[1]
total_pnl = sum(values)
self.store.set_context(
ContextLayer.L5_WEEKLY,
week,
f"weekly_pnl_{market_code}",
round(total_pnl, 2),
)
# Average all confidence values (market-specific if suffixed)
if "avg_confidence" in daily_data: if "avg_confidence" in daily_data:
conf_values = daily_data["avg_confidence"] conf_values = daily_data["avg_confidence"]
avg_conf = sum(conf_values) / len(conf_values) avg_conf = sum(conf_values) / len(conf_values)
@@ -107,6 +149,17 @@ class ContextAggregator:
ContextLayer.L5_WEEKLY, week, "avg_confidence", round(avg_conf, 2) ContextLayer.L5_WEEKLY, week, "avg_confidence", round(avg_conf, 2)
) )
for key, values in daily_data.items():
if key.startswith("avg_confidence_"):
market_code = key.split("avg_confidence_", 1)[1]
avg_conf = sum(values) / len(values)
self.store.set_context(
ContextLayer.L5_WEEKLY,
week,
f"avg_confidence_{market_code}",
round(avg_conf, 2),
)
def aggregate_monthly_from_weekly(self, month: str | None = None) -> None: def aggregate_monthly_from_weekly(self, month: str | None = None) -> None:
"""Aggregate L4 (monthly) context from L5 (weekly). """Aggregate L4 (monthly) context from L5 (weekly).
@@ -135,8 +188,16 @@ class ContextAggregator:
if weekly_data: if weekly_data:
# Sum all weekly PnL values # Sum all weekly PnL values
total_pnl_values: list[float] = []
if "weekly_pnl" in weekly_data: if "weekly_pnl" in weekly_data:
total_pnl = sum(weekly_data["weekly_pnl"]) total_pnl_values.extend(weekly_data["weekly_pnl"])
for key, values in weekly_data.items():
if key.startswith("weekly_pnl_"):
total_pnl_values.extend(values)
if total_pnl_values:
total_pnl = sum(total_pnl_values)
self.store.set_context( self.store.set_context(
ContextLayer.L4_MONTHLY, month, "monthly_pnl", round(total_pnl, 2) ContextLayer.L4_MONTHLY, month, "monthly_pnl", round(total_pnl, 2)
) )

135
src/context/scheduler.py Normal file
View File

@@ -0,0 +1,135 @@
"""Context aggregation scheduler for periodic rollups and cleanup."""
from __future__ import annotations
import sqlite3
from calendar import monthrange
from dataclasses import dataclass
from datetime import UTC, datetime
from src.context.aggregator import ContextAggregator
from src.context.store import ContextStore
@dataclass(frozen=True)
class ScheduleResult:
"""Represents which scheduled tasks ran."""
weekly: bool = False
monthly: bool = False
quarterly: bool = False
annual: bool = False
legacy: bool = False
cleanup: bool = False
class ContextScheduler:
"""Run periodic context aggregations and cleanup when due."""
def __init__(
self,
conn: sqlite3.Connection | None = None,
aggregator: ContextAggregator | None = None,
store: ContextStore | None = None,
) -> None:
if aggregator is None:
if conn is None:
raise ValueError("conn is required when aggregator is not provided")
aggregator = ContextAggregator(conn)
self.aggregator = aggregator
if store is None:
store = getattr(aggregator, "store", None)
if store is None:
if conn is None:
raise ValueError("conn is required when store is not provided")
store = ContextStore(conn)
self.store = store
self._last_run: dict[str, str] = {}
def run_if_due(self, now: datetime | None = None) -> ScheduleResult:
"""Run scheduled aggregations if their schedule is due.
Args:
now: Current datetime (UTC). If None, uses current time.
Returns:
ScheduleResult indicating which tasks ran.
"""
if now is None:
now = datetime.now(UTC)
today = now.date().isoformat()
result = ScheduleResult()
if self._should_run("cleanup", today):
self.store.cleanup_expired_contexts()
result = self._with(result, cleanup=True)
if self._is_sunday(now) and self._should_run("weekly", today):
week = now.strftime("%Y-W%V")
self.aggregator.aggregate_weekly_from_daily(week)
result = self._with(result, weekly=True)
if self._is_last_day_of_month(now) and self._should_run("monthly", today):
month = now.strftime("%Y-%m")
self.aggregator.aggregate_monthly_from_weekly(month)
result = self._with(result, monthly=True)
if self._is_last_day_of_quarter(now) and self._should_run("quarterly", today):
quarter = self._current_quarter(now)
self.aggregator.aggregate_quarterly_from_monthly(quarter)
result = self._with(result, quarterly=True)
if self._is_last_day_of_year(now) and self._should_run("annual", today):
year = str(now.year)
self.aggregator.aggregate_annual_from_quarterly(year)
result = self._with(result, annual=True)
# Legacy rollup runs after annual aggregation.
self.aggregator.aggregate_legacy_from_annual()
result = self._with(result, legacy=True)
return result
def _should_run(self, key: str, date_str: str) -> bool:
if self._last_run.get(key) == date_str:
return False
self._last_run[key] = date_str
return True
@staticmethod
def _is_sunday(now: datetime) -> bool:
return now.weekday() == 6
@staticmethod
def _is_last_day_of_month(now: datetime) -> bool:
last_day = monthrange(now.year, now.month)[1]
return now.day == last_day
@classmethod
def _is_last_day_of_quarter(cls, now: datetime) -> bool:
if now.month not in (3, 6, 9, 12):
return False
return cls._is_last_day_of_month(now)
@staticmethod
def _is_last_day_of_year(now: datetime) -> bool:
return now.month == 12 and now.day == 31
@staticmethod
def _current_quarter(now: datetime) -> str:
quarter = (now.month - 1) // 3 + 1
return f"{now.year}-Q{quarter}"
@staticmethod
def _with(result: ScheduleResult, **kwargs: bool) -> ScheduleResult:
return ScheduleResult(
weekly=kwargs.get("weekly", result.weekly),
monthly=kwargs.get("monthly", result.monthly),
quarterly=kwargs.get("quarterly", result.quarterly),
annual=kwargs.get("annual", result.annual),
legacy=kwargs.get("legacy", result.legacy),
cleanup=kwargs.get("cleanup", result.cleanup),
)

View File

@@ -7,6 +7,7 @@ from src.evolution.performance_tracker import (
PerformanceTracker, PerformanceTracker,
StrategyMetrics, StrategyMetrics,
) )
from src.evolution.scorecard import DailyScorecard
__all__ = [ __all__ = [
"EvolutionOptimizer", "EvolutionOptimizer",
@@ -16,4 +17,5 @@ __all__ = [
"PerformanceTracker", "PerformanceTracker",
"PerformanceDashboard", "PerformanceDashboard",
"StrategyMetrics", "StrategyMetrics",
"DailyScorecard",
] ]

View File

@@ -0,0 +1,25 @@
"""Daily scorecard model for end-of-day performance review."""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class DailyScorecard:
"""Structured daily performance snapshot for a single market."""
date: str
market: str
total_decisions: int
buys: int
sells: int
holds: int
total_pnl: float
win_rate: float
avg_confidence: float
scenario_match_rate: float
top_winners: list[str] = field(default_factory=list)
top_losers: list[str] = field(default_factory=list)
lessons: list[str] = field(default_factory=list)
cross_market_note: str = ""

View File

@@ -20,6 +20,7 @@ from src.brain.gemini_client import GeminiClient, TradeDecision
from src.broker.kis_api import KISBroker from src.broker.kis_api import KISBroker
from src.broker.overseas import OverseasBroker from src.broker.overseas import OverseasBroker
from src.config import Settings from src.config import Settings
from src.context.aggregator import ContextAggregator
from src.context.layer import ContextLayer from src.context.layer import ContextLayer
from src.context.store import ContextStore from src.context.store import ContextStore
from src.core.criticality import CriticalityAssessor from src.core.criticality import CriticalityAssessor
@@ -706,6 +707,7 @@ async def run(settings: Settings) -> None:
db_conn = init_db(settings.DB_PATH) db_conn = init_db(settings.DB_PATH)
decision_logger = DecisionLogger(db_conn) decision_logger = DecisionLogger(db_conn)
context_store = ContextStore(db_conn) context_store = ContextStore(db_conn)
context_aggregator = ContextAggregator(db_conn)
# V2 proactive strategy components # V2 proactive strategy components
context_selector = ContextSelector(context_store) context_selector = ContextSelector(context_store)
@@ -990,6 +992,13 @@ async def run(settings: Settings) -> None:
market_info = MARKETS.get(market_code) market_info = MARKETS.get(market_code)
if market_info: if market_info:
await telegram.notify_market_close(market_info.name, 0.0) await telegram.notify_market_close(market_info.name, 0.0)
market_date = datetime.now(
market_info.timezone
).date().isoformat()
context_aggregator.aggregate_daily_from_trades(
date=market_date,
market=market_code,
)
except Exception as exc: except Exception as exc:
logger.warning("Market close notification failed: %s", exc) logger.warning("Market close notification failed: %s", exc)
_market_states[market_code] = False _market_states[market_code] = False

View File

@@ -161,7 +161,7 @@ class TestContextAggregator:
self, aggregator: ContextAggregator, db_conn: sqlite3.Connection self, aggregator: ContextAggregator, db_conn: sqlite3.Connection
) -> None: ) -> None:
"""Test aggregating daily metrics from trades.""" """Test aggregating daily metrics from trades."""
date = "2026-02-04" date = datetime.now(UTC).date().isoformat()
# Create sample trades # Create sample trades
log_trade(db_conn, "005930", "BUY", 85, "Good signal", quantity=10, price=70000, pnl=500) log_trade(db_conn, "005930", "BUY", 85, "Good signal", quantity=10, price=70000, pnl=500)
@@ -175,36 +175,44 @@ class TestContextAggregator:
db_conn.commit() db_conn.commit()
# Aggregate # Aggregate
aggregator.aggregate_daily_from_trades(date) aggregator.aggregate_daily_from_trades(date, market="KR")
# Verify L6 contexts # Verify L6 contexts
store = aggregator.store store = aggregator.store
assert store.get_context(ContextLayer.L6_DAILY, date, "trade_count") == 3 assert store.get_context(ContextLayer.L6_DAILY, date, "trade_count_KR") == 3
assert store.get_context(ContextLayer.L6_DAILY, date, "buys") == 1 assert store.get_context(ContextLayer.L6_DAILY, date, "buys_KR") == 1
assert store.get_context(ContextLayer.L6_DAILY, date, "sells") == 1 assert store.get_context(ContextLayer.L6_DAILY, date, "sells_KR") == 1
assert store.get_context(ContextLayer.L6_DAILY, date, "holds") == 1 assert store.get_context(ContextLayer.L6_DAILY, date, "holds_KR") == 1
assert store.get_context(ContextLayer.L6_DAILY, date, "total_pnl") == 2000.0 assert store.get_context(ContextLayer.L6_DAILY, date, "total_pnl_KR") == 2000.0
assert store.get_context(ContextLayer.L6_DAILY, date, "unique_stocks") == 3 assert store.get_context(ContextLayer.L6_DAILY, date, "unique_stocks_KR") == 3
# 2 wins, 0 losses # 2 wins, 0 losses
assert store.get_context(ContextLayer.L6_DAILY, date, "win_rate") == 100.0 assert store.get_context(ContextLayer.L6_DAILY, date, "win_rate_KR") == 100.0
def test_aggregate_weekly_from_daily(self, aggregator: ContextAggregator) -> None: def test_aggregate_weekly_from_daily(self, aggregator: ContextAggregator) -> None:
"""Test aggregating weekly metrics from daily.""" """Test aggregating weekly metrics from daily."""
week = "2026-W06" week = "2026-W06"
# Set daily contexts # Set daily contexts
aggregator.store.set_context(ContextLayer.L6_DAILY, "2026-02-02", "total_pnl", 100.0) aggregator.store.set_context(
aggregator.store.set_context(ContextLayer.L6_DAILY, "2026-02-03", "total_pnl", 200.0) ContextLayer.L6_DAILY, "2026-02-02", "total_pnl_KR", 100.0
aggregator.store.set_context(ContextLayer.L6_DAILY, "2026-02-02", "avg_confidence", 80.0) )
aggregator.store.set_context(ContextLayer.L6_DAILY, "2026-02-03", "avg_confidence", 85.0) aggregator.store.set_context(
ContextLayer.L6_DAILY, "2026-02-03", "total_pnl_KR", 200.0
)
aggregator.store.set_context(
ContextLayer.L6_DAILY, "2026-02-02", "avg_confidence_KR", 80.0
)
aggregator.store.set_context(
ContextLayer.L6_DAILY, "2026-02-03", "avg_confidence_KR", 85.0
)
# Aggregate # Aggregate
aggregator.aggregate_weekly_from_daily(week) aggregator.aggregate_weekly_from_daily(week)
# Verify L5 contexts # Verify L5 contexts
store = aggregator.store store = aggregator.store
weekly_pnl = store.get_context(ContextLayer.L5_WEEKLY, week, "weekly_pnl") weekly_pnl = store.get_context(ContextLayer.L5_WEEKLY, week, "weekly_pnl_KR")
avg_conf = store.get_context(ContextLayer.L5_WEEKLY, week, "avg_confidence") avg_conf = store.get_context(ContextLayer.L5_WEEKLY, week, "avg_confidence_KR")
assert weekly_pnl == 300.0 assert weekly_pnl == 300.0
assert avg_conf == 82.5 assert avg_conf == 82.5
@@ -214,9 +222,15 @@ class TestContextAggregator:
month = "2026-02" month = "2026-02"
# Set weekly contexts # Set weekly contexts
aggregator.store.set_context(ContextLayer.L5_WEEKLY, "2026-W05", "weekly_pnl", 100.0) aggregator.store.set_context(
aggregator.store.set_context(ContextLayer.L5_WEEKLY, "2026-W06", "weekly_pnl", 200.0) ContextLayer.L5_WEEKLY, "2026-W05", "weekly_pnl_KR", 100.0
aggregator.store.set_context(ContextLayer.L5_WEEKLY, "2026-W07", "weekly_pnl", 150.0) )
aggregator.store.set_context(
ContextLayer.L5_WEEKLY, "2026-W06", "weekly_pnl_KR", 200.0
)
aggregator.store.set_context(
ContextLayer.L5_WEEKLY, "2026-W07", "weekly_pnl_KR", 150.0
)
# Aggregate # Aggregate
aggregator.aggregate_monthly_from_weekly(month) aggregator.aggregate_monthly_from_weekly(month)
@@ -285,7 +299,7 @@ class TestContextAggregator:
self, aggregator: ContextAggregator, db_conn: sqlite3.Connection self, aggregator: ContextAggregator, db_conn: sqlite3.Connection
) -> None: ) -> None:
"""Test running all aggregations from L7 to L1.""" """Test running all aggregations from L7 to L1."""
date = "2026-02-04" date = datetime.now(UTC).date().isoformat()
# Create sample trades # Create sample trades
log_trade(db_conn, "005930", "BUY", 85, "Good signal", quantity=10, price=70000, pnl=1000) log_trade(db_conn, "005930", "BUY", 85, "Good signal", quantity=10, price=70000, pnl=1000)
@@ -299,12 +313,12 @@ class TestContextAggregator:
# Verify data exists in each layer # Verify data exists in each layer
store = aggregator.store store = aggregator.store
assert store.get_context(ContextLayer.L6_DAILY, date, "total_pnl") == 1000.0 assert store.get_context(ContextLayer.L6_DAILY, date, "total_pnl_KR") == 1000.0
from datetime import date as date_cls from datetime import date as date_cls
trade_date = date_cls.fromisoformat(date) trade_date = date_cls.fromisoformat(date)
iso_year, iso_week, _ = trade_date.isocalendar() iso_year, iso_week, _ = trade_date.isocalendar()
trade_week = f"{iso_year}-W{iso_week:02d}" trade_week = f"{iso_year}-W{iso_week:02d}"
assert store.get_context(ContextLayer.L5_WEEKLY, trade_week, "weekly_pnl") is not None assert store.get_context(ContextLayer.L5_WEEKLY, trade_week, "weekly_pnl_KR") is not None
trade_month = f"{trade_date.year}-{trade_date.month:02d}" trade_month = f"{trade_date.year}-{trade_date.month:02d}"
trade_quarter = f"{trade_date.year}-Q{(trade_date.month - 1) // 3 + 1}" trade_quarter = f"{trade_date.year}-Q{(trade_date.month - 1) // 3 + 1}"
trade_year = str(trade_date.year) trade_year = str(trade_date.year)

View File

@@ -0,0 +1,104 @@
"""Tests for ContextScheduler."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime
from src.context.scheduler import ContextScheduler
@dataclass
class StubAggregator:
"""Stub aggregator that records calls."""
weekly_calls: list[str]
monthly_calls: list[str]
quarterly_calls: list[str]
annual_calls: list[str]
legacy_calls: int
def aggregate_weekly_from_daily(self, week: str) -> None:
self.weekly_calls.append(week)
def aggregate_monthly_from_weekly(self, month: str) -> None:
self.monthly_calls.append(month)
def aggregate_quarterly_from_monthly(self, quarter: str) -> None:
self.quarterly_calls.append(quarter)
def aggregate_annual_from_quarterly(self, year: str) -> None:
self.annual_calls.append(year)
def aggregate_legacy_from_annual(self) -> None:
self.legacy_calls += 1
@dataclass
class StubStore:
"""Stub store that records cleanup calls."""
cleanup_calls: int = 0
def cleanup_expired_contexts(self) -> None:
self.cleanup_calls += 1
def make_scheduler() -> tuple[ContextScheduler, StubAggregator, StubStore]:
aggregator = StubAggregator([], [], [], [], 0)
store = StubStore()
scheduler = ContextScheduler(aggregator=aggregator, store=store)
return scheduler, aggregator, store
def test_run_if_due_weekly() -> None:
scheduler, aggregator, store = make_scheduler()
now = datetime(2026, 2, 8, 10, 0, tzinfo=UTC) # Sunday
result = scheduler.run_if_due(now)
assert result.weekly is True
assert aggregator.weekly_calls == ["2026-W06"]
assert store.cleanup_calls == 1
def test_run_if_due_monthly() -> None:
scheduler, aggregator, _store = make_scheduler()
now = datetime(2026, 2, 28, 12, 0, tzinfo=UTC) # Last day of month
result = scheduler.run_if_due(now)
assert result.monthly is True
assert aggregator.monthly_calls == ["2026-02"]
def test_run_if_due_quarterly() -> None:
scheduler, aggregator, _store = make_scheduler()
now = datetime(2026, 3, 31, 12, 0, tzinfo=UTC) # Last day of Q1
result = scheduler.run_if_due(now)
assert result.quarterly is True
assert aggregator.quarterly_calls == ["2026-Q1"]
def test_run_if_due_annual_and_legacy() -> None:
scheduler, aggregator, _store = make_scheduler()
now = datetime(2026, 12, 31, 12, 0, tzinfo=UTC)
result = scheduler.run_if_due(now)
assert result.annual is True
assert result.legacy is True
assert aggregator.annual_calls == ["2026"]
assert aggregator.legacy_calls == 1
def test_cleanup_runs_once_per_day() -> None:
scheduler, _aggregator, store = make_scheduler()
now = datetime(2026, 2, 9, 9, 0, tzinfo=UTC)
scheduler.run_if_due(now)
scheduler.run_if_due(now)
assert store.cleanup_calls == 1

81
tests/test_scorecard.py Normal file
View File

@@ -0,0 +1,81 @@
"""Tests for DailyScorecard model."""
from __future__ import annotations
from src.evolution.scorecard import DailyScorecard
def test_scorecard_initialization() -> None:
scorecard = DailyScorecard(
date="2026-02-08",
market="KR",
total_decisions=10,
buys=3,
sells=2,
holds=5,
total_pnl=1234.5,
win_rate=60.0,
avg_confidence=78.5,
scenario_match_rate=70.0,
top_winners=["005930", "000660"],
top_losers=["035420"],
lessons=["Avoid chasing breakouts"],
cross_market_note="US volatility spillover",
)
assert scorecard.market == "KR"
assert scorecard.total_decisions == 10
assert scorecard.total_pnl == 1234.5
assert scorecard.top_winners == ["005930", "000660"]
assert scorecard.lessons == ["Avoid chasing breakouts"]
assert scorecard.cross_market_note == "US volatility spillover"
def test_scorecard_defaults() -> None:
scorecard = DailyScorecard(
date="2026-02-08",
market="US",
total_decisions=0,
buys=0,
sells=0,
holds=0,
total_pnl=0.0,
win_rate=0.0,
avg_confidence=0.0,
scenario_match_rate=0.0,
)
assert scorecard.top_winners == []
assert scorecard.top_losers == []
assert scorecard.lessons == []
assert scorecard.cross_market_note == ""
def test_scorecard_list_isolation() -> None:
a = DailyScorecard(
date="2026-02-08",
market="KR",
total_decisions=1,
buys=1,
sells=0,
holds=0,
total_pnl=10.0,
win_rate=100.0,
avg_confidence=90.0,
scenario_match_rate=100.0,
)
b = DailyScorecard(
date="2026-02-08",
market="US",
total_decisions=1,
buys=0,
sells=1,
holds=0,
total_pnl=-5.0,
win_rate=0.0,
avg_confidence=60.0,
scenario_match_rate=50.0,
)
a.top_winners.append("005930")
assert b.top_winners == []