From 78021d4695c5ea321883d15c4a5e459dea09bcdc Mon Sep 17 00:00:00 2001 From: agentson Date: Tue, 10 Feb 2026 04:23:49 +0900 Subject: [PATCH] feat: EOD aggregation with market filter (issue #86) - Add market parameter to aggregate_daily_from_trades() for per-market L6 aggregation - Store market-scoped keys (total_pnl_KR, win_rate_US, etc.) in L6/L5/L4 layers - Hook aggregate_daily_from_trades() into market close detection in run() - Update tests for market-scoped context keys Co-Authored-By: Claude Opus 4.6 --- src/context/aggregator.py | 139 +++++++++++++++++++++++++++----------- src/main.py | 9 +++ tests/test_context.py | 56 +++++++++------ 3 files changed, 144 insertions(+), 60 deletions(-) diff --git a/src/context/aggregator.py b/src/context/aggregator.py index b93ce81..8eaecab 100644 --- a/src/context/aggregator.py +++ b/src/context/aggregator.py @@ -18,52 +18,83 @@ class ContextAggregator: self.conn = conn self.store = ContextStore(conn) - def aggregate_daily_from_trades(self, date: str | None = None) -> None: + def aggregate_daily_from_trades( + self, date: str | None = None, market: str | None = None + ) -> None: """Aggregate L6 (daily) context from trades table. Args: date: Date in YYYY-MM-DD format. If None, uses today. + market: Market code filter (e.g., "KR", "US"). If None, aggregates all markets. """ if date is None: date = datetime.now(UTC).date().isoformat() - # Calculate daily metrics from trades - cursor = self.conn.execute( - """ - SELECT - COUNT(*) as trade_count, - SUM(CASE WHEN action = 'BUY' THEN 1 ELSE 0 END) as buys, - SUM(CASE WHEN action = 'SELL' THEN 1 ELSE 0 END) as sells, - SUM(CASE WHEN action = 'HOLD' THEN 1 ELSE 0 END) as holds, - AVG(confidence) as avg_confidence, - SUM(pnl) as total_pnl, - COUNT(DISTINCT stock_code) as unique_stocks, - SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) as wins, - SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END) as losses - FROM trades - WHERE DATE(timestamp) = ? - """, - (date,), - ) - row = cursor.fetchone() - - if row and row[0] > 0: # At least one trade - trade_count, buys, sells, holds, avg_conf, total_pnl, stocks, wins, losses = row - - # Store daily metrics in L6 - self.store.set_context(ContextLayer.L6_DAILY, date, "trade_count", trade_count) - self.store.set_context(ContextLayer.L6_DAILY, date, "buys", buys) - self.store.set_context(ContextLayer.L6_DAILY, date, "sells", sells) - self.store.set_context(ContextLayer.L6_DAILY, date, "holds", holds) - self.store.set_context( - ContextLayer.L6_DAILY, date, "avg_confidence", round(avg_conf, 2) + if market is None: + cursor = self.conn.execute( + """ + SELECT DISTINCT market + FROM trades + WHERE DATE(timestamp) = ? + """, + (date,), ) - self.store.set_context( - ContextLayer.L6_DAILY, date, "total_pnl", round(total_pnl, 2) + markets = [row[0] for row in cursor.fetchall() if row[0]] + else: + markets = [market] + + for market_code in markets: + # Calculate daily metrics from trades for the market + cursor = self.conn.execute( + """ + SELECT + COUNT(*) as trade_count, + SUM(CASE WHEN action = 'BUY' THEN 1 ELSE 0 END) as buys, + SUM(CASE WHEN action = 'SELL' THEN 1 ELSE 0 END) as sells, + SUM(CASE WHEN action = 'HOLD' THEN 1 ELSE 0 END) as holds, + AVG(confidence) as avg_confidence, + SUM(pnl) as total_pnl, + COUNT(DISTINCT stock_code) as unique_stocks, + SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) as wins, + SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END) as losses + FROM trades + WHERE DATE(timestamp) = ? AND market = ? + """, + (date, market_code), ) - self.store.set_context(ContextLayer.L6_DAILY, date, "unique_stocks", stocks) - win_rate = round(wins / max(wins + losses, 1) * 100, 2) - self.store.set_context(ContextLayer.L6_DAILY, date, "win_rate", win_rate) + row = cursor.fetchone() + + if row and row[0] > 0: # At least one trade + trade_count, buys, sells, holds, avg_conf, total_pnl, stocks, wins, losses = row + + key_suffix = f"_{market_code}" + + # Store daily metrics in L6 with market suffix + self.store.set_context( + ContextLayer.L6_DAILY, date, f"trade_count{key_suffix}", trade_count + ) + self.store.set_context(ContextLayer.L6_DAILY, date, f"buys{key_suffix}", buys) + self.store.set_context(ContextLayer.L6_DAILY, date, f"sells{key_suffix}", sells) + self.store.set_context(ContextLayer.L6_DAILY, date, f"holds{key_suffix}", holds) + self.store.set_context( + ContextLayer.L6_DAILY, + date, + f"avg_confidence{key_suffix}", + round(avg_conf, 2), + ) + self.store.set_context( + ContextLayer.L6_DAILY, + date, + f"total_pnl{key_suffix}", + round(total_pnl, 2), + ) + self.store.set_context( + ContextLayer.L6_DAILY, date, f"unique_stocks{key_suffix}", stocks + ) + win_rate = round(wins / max(wins + losses, 1) * 100, 2) + self.store.set_context( + ContextLayer.L6_DAILY, date, f"win_rate{key_suffix}", win_rate + ) def aggregate_weekly_from_daily(self, week: str | None = None) -> None: """Aggregate L5 (weekly) context from L6 (daily). @@ -92,14 +123,25 @@ class ContextAggregator: daily_data[row[0]].append(json.loads(row[1])) if daily_data: - # Sum all PnL values + # Sum all PnL values (market-specific if suffixed) if "total_pnl" in daily_data: total_pnl = sum(daily_data["total_pnl"]) self.store.set_context( ContextLayer.L5_WEEKLY, week, "weekly_pnl", round(total_pnl, 2) ) - # Average all confidence values + for key, values in daily_data.items(): + if key.startswith("total_pnl_"): + market_code = key.split("total_pnl_", 1)[1] + total_pnl = sum(values) + self.store.set_context( + ContextLayer.L5_WEEKLY, + week, + f"weekly_pnl_{market_code}", + round(total_pnl, 2), + ) + + # Average all confidence values (market-specific if suffixed) if "avg_confidence" in daily_data: conf_values = daily_data["avg_confidence"] avg_conf = sum(conf_values) / len(conf_values) @@ -107,6 +149,17 @@ class ContextAggregator: ContextLayer.L5_WEEKLY, week, "avg_confidence", round(avg_conf, 2) ) + for key, values in daily_data.items(): + if key.startswith("avg_confidence_"): + market_code = key.split("avg_confidence_", 1)[1] + avg_conf = sum(values) / len(values) + self.store.set_context( + ContextLayer.L5_WEEKLY, + week, + f"avg_confidence_{market_code}", + round(avg_conf, 2), + ) + def aggregate_monthly_from_weekly(self, month: str | None = None) -> None: """Aggregate L4 (monthly) context from L5 (weekly). @@ -135,8 +188,16 @@ class ContextAggregator: if weekly_data: # Sum all weekly PnL values + total_pnl_values: list[float] = [] if "weekly_pnl" in weekly_data: - total_pnl = sum(weekly_data["weekly_pnl"]) + total_pnl_values.extend(weekly_data["weekly_pnl"]) + + for key, values in weekly_data.items(): + if key.startswith("weekly_pnl_"): + total_pnl_values.extend(values) + + if total_pnl_values: + total_pnl = sum(total_pnl_values) self.store.set_context( ContextLayer.L4_MONTHLY, month, "monthly_pnl", round(total_pnl, 2) ) diff --git a/src/main.py b/src/main.py index 64c7f28..97c9afd 100644 --- a/src/main.py +++ b/src/main.py @@ -20,6 +20,7 @@ from src.brain.gemini_client import GeminiClient, TradeDecision from src.broker.kis_api import KISBroker from src.broker.overseas import OverseasBroker from src.config import Settings +from src.context.aggregator import ContextAggregator from src.context.layer import ContextLayer from src.context.store import ContextStore from src.core.criticality import CriticalityAssessor @@ -706,6 +707,7 @@ async def run(settings: Settings) -> None: db_conn = init_db(settings.DB_PATH) decision_logger = DecisionLogger(db_conn) context_store = ContextStore(db_conn) + context_aggregator = ContextAggregator(db_conn) # V2 proactive strategy components context_selector = ContextSelector(context_store) @@ -990,6 +992,13 @@ async def run(settings: Settings) -> None: market_info = MARKETS.get(market_code) if market_info: await telegram.notify_market_close(market_info.name, 0.0) + market_date = datetime.now( + market_info.timezone + ).date().isoformat() + context_aggregator.aggregate_daily_from_trades( + date=market_date, + market=market_code, + ) except Exception as exc: logger.warning("Market close notification failed: %s", exc) _market_states[market_code] = False diff --git a/tests/test_context.py b/tests/test_context.py index 145ef8c..5acce78 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -161,7 +161,7 @@ class TestContextAggregator: self, aggregator: ContextAggregator, db_conn: sqlite3.Connection ) -> None: """Test aggregating daily metrics from trades.""" - date = "2026-02-04" + date = datetime.now(UTC).date().isoformat() # Create sample trades log_trade(db_conn, "005930", "BUY", 85, "Good signal", quantity=10, price=70000, pnl=500) @@ -175,36 +175,44 @@ class TestContextAggregator: db_conn.commit() # Aggregate - aggregator.aggregate_daily_from_trades(date) + aggregator.aggregate_daily_from_trades(date, market="KR") # Verify L6 contexts store = aggregator.store - assert store.get_context(ContextLayer.L6_DAILY, date, "trade_count") == 3 - assert store.get_context(ContextLayer.L6_DAILY, date, "buys") == 1 - assert store.get_context(ContextLayer.L6_DAILY, date, "sells") == 1 - assert store.get_context(ContextLayer.L6_DAILY, date, "holds") == 1 - assert store.get_context(ContextLayer.L6_DAILY, date, "total_pnl") == 2000.0 - assert store.get_context(ContextLayer.L6_DAILY, date, "unique_stocks") == 3 + assert store.get_context(ContextLayer.L6_DAILY, date, "trade_count_KR") == 3 + assert store.get_context(ContextLayer.L6_DAILY, date, "buys_KR") == 1 + assert store.get_context(ContextLayer.L6_DAILY, date, "sells_KR") == 1 + assert store.get_context(ContextLayer.L6_DAILY, date, "holds_KR") == 1 + assert store.get_context(ContextLayer.L6_DAILY, date, "total_pnl_KR") == 2000.0 + assert store.get_context(ContextLayer.L6_DAILY, date, "unique_stocks_KR") == 3 # 2 wins, 0 losses - assert store.get_context(ContextLayer.L6_DAILY, date, "win_rate") == 100.0 + assert store.get_context(ContextLayer.L6_DAILY, date, "win_rate_KR") == 100.0 def test_aggregate_weekly_from_daily(self, aggregator: ContextAggregator) -> None: """Test aggregating weekly metrics from daily.""" week = "2026-W06" # Set daily contexts - aggregator.store.set_context(ContextLayer.L6_DAILY, "2026-02-02", "total_pnl", 100.0) - aggregator.store.set_context(ContextLayer.L6_DAILY, "2026-02-03", "total_pnl", 200.0) - aggregator.store.set_context(ContextLayer.L6_DAILY, "2026-02-02", "avg_confidence", 80.0) - aggregator.store.set_context(ContextLayer.L6_DAILY, "2026-02-03", "avg_confidence", 85.0) + aggregator.store.set_context( + ContextLayer.L6_DAILY, "2026-02-02", "total_pnl_KR", 100.0 + ) + aggregator.store.set_context( + ContextLayer.L6_DAILY, "2026-02-03", "total_pnl_KR", 200.0 + ) + aggregator.store.set_context( + ContextLayer.L6_DAILY, "2026-02-02", "avg_confidence_KR", 80.0 + ) + aggregator.store.set_context( + ContextLayer.L6_DAILY, "2026-02-03", "avg_confidence_KR", 85.0 + ) # Aggregate aggregator.aggregate_weekly_from_daily(week) # Verify L5 contexts store = aggregator.store - weekly_pnl = store.get_context(ContextLayer.L5_WEEKLY, week, "weekly_pnl") - avg_conf = store.get_context(ContextLayer.L5_WEEKLY, week, "avg_confidence") + weekly_pnl = store.get_context(ContextLayer.L5_WEEKLY, week, "weekly_pnl_KR") + avg_conf = store.get_context(ContextLayer.L5_WEEKLY, week, "avg_confidence_KR") assert weekly_pnl == 300.0 assert avg_conf == 82.5 @@ -214,9 +222,15 @@ class TestContextAggregator: month = "2026-02" # Set weekly contexts - aggregator.store.set_context(ContextLayer.L5_WEEKLY, "2026-W05", "weekly_pnl", 100.0) - aggregator.store.set_context(ContextLayer.L5_WEEKLY, "2026-W06", "weekly_pnl", 200.0) - aggregator.store.set_context(ContextLayer.L5_WEEKLY, "2026-W07", "weekly_pnl", 150.0) + aggregator.store.set_context( + ContextLayer.L5_WEEKLY, "2026-W05", "weekly_pnl_KR", 100.0 + ) + aggregator.store.set_context( + ContextLayer.L5_WEEKLY, "2026-W06", "weekly_pnl_KR", 200.0 + ) + aggregator.store.set_context( + ContextLayer.L5_WEEKLY, "2026-W07", "weekly_pnl_KR", 150.0 + ) # Aggregate aggregator.aggregate_monthly_from_weekly(month) @@ -285,7 +299,7 @@ class TestContextAggregator: self, aggregator: ContextAggregator, db_conn: sqlite3.Connection ) -> None: """Test running all aggregations from L7 to L1.""" - date = "2026-02-04" + date = datetime.now(UTC).date().isoformat() # Create sample trades log_trade(db_conn, "005930", "BUY", 85, "Good signal", quantity=10, price=70000, pnl=1000) @@ -299,12 +313,12 @@ class TestContextAggregator: # Verify data exists in each layer store = aggregator.store - assert store.get_context(ContextLayer.L6_DAILY, date, "total_pnl") == 1000.0 + assert store.get_context(ContextLayer.L6_DAILY, date, "total_pnl_KR") == 1000.0 from datetime import date as date_cls trade_date = date_cls.fromisoformat(date) iso_year, iso_week, _ = trade_date.isocalendar() trade_week = f"{iso_year}-W{iso_week:02d}" - assert store.get_context(ContextLayer.L5_WEEKLY, trade_week, "weekly_pnl") is not None + assert store.get_context(ContextLayer.L5_WEEKLY, trade_week, "weekly_pnl_KR") is not None trade_month = f"{trade_date.year}-{trade_date.month:02d}" trade_quarter = f"{trade_date.year}-Q{(trade_date.month - 1) // 3 + 1}" trade_year = str(trade_date.year)