diff --git a/src/strategy/pre_market_planner.py b/src/strategy/pre_market_planner.py new file mode 100644 index 0000000..cb03589 --- /dev/null +++ b/src/strategy/pre_market_planner.py @@ -0,0 +1,419 @@ +"""Pre-market planner — generates DayPlaybook via Gemini before market open. + +One Gemini API call per market per day. Candidates come from SmartVolatilityScanner. +On failure, returns a defensive playbook (all HOLD, no trades). +""" + +from __future__ import annotations + +import json +import logging +from datetime import date +from typing import Any + +from src.analysis.smart_scanner import ScanCandidate +from src.brain.context_selector import ContextSelector, DecisionType +from src.brain.gemini_client import GeminiClient +from src.config import Settings +from src.context.store import ContextLayer, ContextStore +from src.strategy.models import ( + CrossMarketContext, + DayPlaybook, + GlobalRule, + MarketOutlook, + ScenarioAction, + StockCondition, + StockPlaybook, + StockScenario, +) + +logger = logging.getLogger(__name__) + +# Mapping from string to MarketOutlook enum +_OUTLOOK_MAP: dict[str, MarketOutlook] = { + "bullish": MarketOutlook.BULLISH, + "neutral_to_bullish": MarketOutlook.NEUTRAL_TO_BULLISH, + "neutral": MarketOutlook.NEUTRAL, + "neutral_to_bearish": MarketOutlook.NEUTRAL_TO_BEARISH, + "bearish": MarketOutlook.BEARISH, +} + +_ACTION_MAP: dict[str, ScenarioAction] = { + "BUY": ScenarioAction.BUY, + "SELL": ScenarioAction.SELL, + "HOLD": ScenarioAction.HOLD, + "REDUCE_ALL": ScenarioAction.REDUCE_ALL, +} + + +class PreMarketPlanner: + """Generates a DayPlaybook by calling Gemini once before market open. + + Flow: + 1. Collect strategic context (L5-L7) + cross-market context + 2. Build a structured prompt with scan candidates + 3. Call Gemini for JSON scenario generation + 4. Parse and validate response into DayPlaybook + 5. On failure → defensive playbook (HOLD everything) + """ + + def __init__( + self, + gemini_client: GeminiClient, + context_store: ContextStore, + context_selector: ContextSelector, + settings: Settings, + ) -> None: + self._gemini = gemini_client + self._context_store = context_store + self._context_selector = context_selector + self._settings = settings + + async def generate_playbook( + self, + market: str, + candidates: list[ScanCandidate], + today: date | None = None, + ) -> DayPlaybook: + """Generate a DayPlaybook for a market using Gemini. + + Args: + market: Market code ("KR" or "US") + candidates: Stock candidates from SmartVolatilityScanner + today: Override date (defaults to date.today()). Use market-local date. + + Returns: + DayPlaybook with scenarios. Empty/defensive if no candidates or failure. + """ + if today is None: + today = date.today() + + if not candidates: + logger.info("No candidates for %s — returning empty playbook", market) + return self._empty_playbook(today, market) + + try: + # 1. Gather context + context_data = self._gather_context() + cross_market = self.build_cross_market_context(market, today) + + # 2. Build prompt + prompt = self._build_prompt(market, candidates, context_data, cross_market) + + # 3. Call Gemini + market_data = { + "stock_code": "PLANNER", + "current_price": 0, + "prompt_override": prompt, + } + decision = await self._gemini.decide(market_data) + + # 4. Parse response + playbook = self._parse_response( + decision.rationale, today, market, candidates, cross_market + ) + playbook_with_tokens = playbook.model_copy( + update={"token_count": decision.token_count} + ) + logger.info( + "Generated playbook for %s: %d stocks, %d scenarios, %d tokens", + market, + playbook_with_tokens.stock_count, + playbook_with_tokens.scenario_count, + playbook_with_tokens.token_count, + ) + return playbook_with_tokens + + except Exception: + logger.exception("Playbook generation failed for %s", market) + if self._settings.DEFENSIVE_PLAYBOOK_ON_FAILURE: + return self._defensive_playbook(today, market, candidates) + return self._empty_playbook(today, market) + + def build_cross_market_context( + self, target_market: str, today: date | None = None, + ) -> CrossMarketContext | None: + """Build cross-market context from the other market's L6 data. + + KR planner → reads US scorecard from previous night. + US planner → reads KR scorecard from today. + + Args: + target_market: The market being planned ("KR" or "US") + today: Override date (defaults to date.today()). Use market-local date. + """ + other_market = "US" if target_market == "KR" else "KR" + if today is None: + today = date.today() + timeframe = today.isoformat() + + scorecard_key = f"scorecard_{other_market}" + scorecard_data = self._context_store.get_context( + ContextLayer.L6_DAILY, timeframe, scorecard_key + ) + + if scorecard_data is None: + logger.debug("No cross-market scorecard found for %s", other_market) + return None + + if isinstance(scorecard_data, str): + try: + scorecard_data = json.loads(scorecard_data) + except (json.JSONDecodeError, TypeError): + return None + + if not isinstance(scorecard_data, dict): + return None + + return CrossMarketContext( + market=other_market, + date=timeframe, + total_pnl=float(scorecard_data.get("total_pnl", 0.0)), + win_rate=float(scorecard_data.get("win_rate", 0.0)), + index_change_pct=float(scorecard_data.get("index_change_pct", 0.0)), + key_events=scorecard_data.get("key_events", []), + lessons=scorecard_data.get("lessons", []), + ) + + def _gather_context(self) -> dict[str, Any]: + """Gather strategic context using ContextSelector.""" + layers = self._context_selector.select_layers( + decision_type=DecisionType.STRATEGIC, + include_realtime=True, + ) + return self._context_selector.get_context_data(layers, max_items_per_layer=10) + + def _build_prompt( + self, + market: str, + candidates: list[ScanCandidate], + context_data: dict[str, Any], + cross_market: CrossMarketContext | None, + ) -> str: + """Build a structured prompt for Gemini to generate scenario JSON.""" + max_scenarios = self._settings.MAX_SCENARIOS_PER_STOCK + + candidates_text = "\n".join( + f" - {c.stock_code} ({c.name}): price={c.price}, " + f"RSI={c.rsi:.1f}, volume_ratio={c.volume_ratio:.1f}, " + f"signal={c.signal}, score={c.score:.1f}" + for c in candidates + ) + + cross_market_text = "" + if cross_market: + cross_market_text = ( + f"\n## Other Market ({cross_market.market}) Summary\n" + f"- P&L: {cross_market.total_pnl:+.2f}%\n" + f"- Win Rate: {cross_market.win_rate:.0f}%\n" + f"- Index Change: {cross_market.index_change_pct:+.2f}%\n" + ) + if cross_market.lessons: + cross_market_text += f"- Lessons: {'; '.join(cross_market.lessons[:3])}\n" + + context_text = "" + if context_data: + context_text = "\n## Strategic Context\n" + for layer_name, layer_data in context_data.items(): + if layer_data: + context_text += f"### {layer_name}\n" + for key, value in list(layer_data.items())[:5]: + context_text += f" - {key}: {value}\n" + + return ( + f"You are a pre-market trading strategist for the {market} market.\n" + f"Generate structured trading scenarios for today.\n\n" + f"## Candidates (from volatility scanner)\n{candidates_text}\n" + f"{cross_market_text}" + f"{context_text}\n" + f"## Instructions\n" + f"Return a JSON object with this exact structure:\n" + f'{{\n' + f' "market_outlook": "bullish|neutral_to_bullish|neutral' + f'|neutral_to_bearish|bearish",\n' + f' "global_rules": [\n' + f' {{"condition": "portfolio_pnl_pct < -2.0",' + f' "action": "REDUCE_ALL", "rationale": "..."}}\n' + f' ],\n' + f' "stocks": [\n' + f' {{\n' + f' "stock_code": "...",\n' + f' "scenarios": [\n' + f' {{\n' + f' "condition": {{"rsi_below": 30, "volume_ratio_above": 2.0}},\n' + f' "action": "BUY|SELL|HOLD",\n' + f' "confidence": 85,\n' + f' "allocation_pct": 10.0,\n' + f' "stop_loss_pct": -2.0,\n' + f' "take_profit_pct": 3.0,\n' + f' "rationale": "..."\n' + f' }}\n' + f' ]\n' + f' }}\n' + f' ]\n' + f'}}\n\n' + f"Rules:\n" + f"- Max {max_scenarios} scenarios per stock\n" + f"- Only use stocks from the candidates list\n" + f"- Confidence 0-100 (80+ for actionable trades)\n" + f"- stop_loss_pct must be <= 0, take_profit_pct must be >= 0\n" + f"- Return ONLY the JSON, no markdown fences or explanation\n" + ) + + def _parse_response( + self, + response_text: str, + today: date, + market: str, + candidates: list[ScanCandidate], + cross_market: CrossMarketContext | None, + ) -> DayPlaybook: + """Parse Gemini's JSON response into a validated DayPlaybook.""" + cleaned = self._extract_json(response_text) + data = json.loads(cleaned) + + valid_codes = {c.stock_code for c in candidates} + + # Parse market outlook + outlook_str = data.get("market_outlook", "neutral") + market_outlook = _OUTLOOK_MAP.get(outlook_str, MarketOutlook.NEUTRAL) + + # Parse global rules + global_rules = [] + for rule_data in data.get("global_rules", []): + action_str = rule_data.get("action", "HOLD") + action = _ACTION_MAP.get(action_str, ScenarioAction.HOLD) + global_rules.append( + GlobalRule( + condition=rule_data.get("condition", ""), + action=action, + rationale=rule_data.get("rationale", ""), + ) + ) + + # Parse stock playbooks + stock_playbooks = [] + max_scenarios = self._settings.MAX_SCENARIOS_PER_STOCK + for stock_data in data.get("stocks", []): + code = stock_data.get("stock_code", "") + if code not in valid_codes: + logger.warning("Gemini returned unknown stock %s — skipping", code) + continue + + scenarios = [] + for sc_data in stock_data.get("scenarios", [])[:max_scenarios]: + scenario = self._parse_scenario(sc_data) + if scenario: + scenarios.append(scenario) + + if scenarios: + stock_playbooks.append( + StockPlaybook( + stock_code=code, + scenarios=scenarios, + ) + ) + + return DayPlaybook( + date=today, + market=market, + market_outlook=market_outlook, + global_rules=global_rules, + stock_playbooks=stock_playbooks, + cross_market=cross_market, + ) + + def _parse_scenario(self, sc_data: dict) -> StockScenario | None: + """Parse a single scenario from JSON data. Returns None if invalid.""" + try: + cond_data = sc_data.get("condition", {}) + condition = StockCondition( + rsi_below=cond_data.get("rsi_below"), + rsi_above=cond_data.get("rsi_above"), + volume_ratio_above=cond_data.get("volume_ratio_above"), + volume_ratio_below=cond_data.get("volume_ratio_below"), + price_above=cond_data.get("price_above"), + price_below=cond_data.get("price_below"), + price_change_pct_above=cond_data.get("price_change_pct_above"), + price_change_pct_below=cond_data.get("price_change_pct_below"), + ) + + if not condition.has_any_condition(): + logger.warning("Scenario has no conditions — skipping") + return None + + action_str = sc_data.get("action", "HOLD") + action = _ACTION_MAP.get(action_str, ScenarioAction.HOLD) + + return StockScenario( + condition=condition, + action=action, + confidence=int(sc_data.get("confidence", 50)), + allocation_pct=float(sc_data.get("allocation_pct", 10.0)), + stop_loss_pct=float(sc_data.get("stop_loss_pct", -2.0)), + take_profit_pct=float(sc_data.get("take_profit_pct", 3.0)), + rationale=sc_data.get("rationale", ""), + ) + except (ValueError, TypeError) as e: + logger.warning("Failed to parse scenario: %s", e) + return None + + @staticmethod + def _extract_json(text: str) -> str: + """Extract JSON from response, stripping markdown fences if present.""" + stripped = text.strip() + if stripped.startswith("```"): + # Remove first line (```json or ```) and last line (```) + lines = stripped.split("\n") + lines = lines[1:] # Remove opening fence + if lines and lines[-1].strip() == "```": + lines = lines[:-1] + stripped = "\n".join(lines) + return stripped.strip() + + @staticmethod + def _empty_playbook(today: date, market: str) -> DayPlaybook: + """Return an empty playbook (no stocks, no scenarios).""" + return DayPlaybook( + date=today, + market=market, + market_outlook=MarketOutlook.NEUTRAL, + stock_playbooks=[], + ) + + @staticmethod + def _defensive_playbook( + today: date, + market: str, + candidates: list[ScanCandidate], + ) -> DayPlaybook: + """Return a defensive playbook — HOLD everything with stop-loss ready.""" + stock_playbooks = [ + StockPlaybook( + stock_code=c.stock_code, + scenarios=[ + StockScenario( + condition=StockCondition(price_change_pct_below=-3.0), + action=ScenarioAction.SELL, + confidence=90, + stop_loss_pct=-3.0, + rationale="Defensive stop-loss (planner failure)", + ), + ], + ) + for c in candidates + ] + return DayPlaybook( + date=today, + market=market, + market_outlook=MarketOutlook.NEUTRAL_TO_BEARISH, + default_action=ScenarioAction.HOLD, + stock_playbooks=stock_playbooks, + global_rules=[ + GlobalRule( + condition="portfolio_pnl_pct < -2.0", + action=ScenarioAction.REDUCE_ALL, + rationale="Defensive: reduce on loss threshold", + ), + ], + ) diff --git a/tests/test_pre_market_planner.py b/tests/test_pre_market_planner.py new file mode 100644 index 0000000..aa3a662 --- /dev/null +++ b/tests/test_pre_market_planner.py @@ -0,0 +1,552 @@ +"""Tests for PreMarketPlanner — Gemini prompt builder + response parser.""" + +from __future__ import annotations + +import json +from datetime import date +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from src.analysis.smart_scanner import ScanCandidate +from src.brain.gemini_client import TradeDecision +from src.config import Settings +from src.context.store import ContextLayer +from src.strategy.models import ( + CrossMarketContext, + DayPlaybook, + MarketOutlook, + PlaybookStatus, + ScenarioAction, +) +from src.strategy.pre_market_planner import PreMarketPlanner + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +def _candidate( + code: str = "005930", + name: str = "Samsung", + price: float = 71000, + rsi: float = 28.5, + volume_ratio: float = 3.2, + signal: str = "oversold", + score: float = 82.0, +) -> ScanCandidate: + return ScanCandidate( + stock_code=code, + name=name, + price=price, + volume=1_500_000, + volume_ratio=volume_ratio, + rsi=rsi, + signal=signal, + score=score, + ) + + +def _gemini_response_json( + outlook: str = "neutral_to_bullish", + stocks: list[dict] | None = None, + global_rules: list[dict] | None = None, +) -> str: + """Build a valid Gemini JSON response.""" + if stocks is None: + stocks = [ + { + "stock_code": "005930", + "scenarios": [ + { + "condition": {"rsi_below": 30, "volume_ratio_above": 2.5}, + "action": "BUY", + "confidence": 85, + "allocation_pct": 15.0, + "stop_loss_pct": -2.0, + "take_profit_pct": 4.0, + "rationale": "Oversold bounce with high volume", + } + ], + } + ] + if global_rules is None: + global_rules = [ + { + "condition": "portfolio_pnl_pct < -2.0", + "action": "REDUCE_ALL", + "rationale": "Near circuit breaker", + } + ] + return json.dumps( + {"market_outlook": outlook, "global_rules": global_rules, "stocks": stocks} + ) + + +def _make_planner( + gemini_response: str = "", + token_count: int = 200, + context_data: dict | None = None, + scorecard_data: dict | None = None, +) -> PreMarketPlanner: + """Create a PreMarketPlanner with mocked dependencies.""" + if not gemini_response: + gemini_response = _gemini_response_json() + + # Mock GeminiClient + gemini = AsyncMock() + gemini.decide = AsyncMock( + return_value=TradeDecision( + action="HOLD", + confidence=0, + rationale=gemini_response, + token_count=token_count, + ) + ) + + # Mock ContextStore + store = MagicMock() + store.get_context = MagicMock(return_value=scorecard_data) + + # Mock ContextSelector + selector = MagicMock() + selector.select_layers = MagicMock(return_value=[ContextLayer.L7_REALTIME, ContextLayer.L6_DAILY]) + selector.get_context_data = MagicMock(return_value=context_data or {}) + + settings = Settings( + KIS_APP_KEY="test", + KIS_APP_SECRET="test", + KIS_ACCOUNT_NO="12345678-01", + GEMINI_API_KEY="test", + ) + + return PreMarketPlanner(gemini, store, selector, settings) + + +# --------------------------------------------------------------------------- +# generate_playbook +# --------------------------------------------------------------------------- + + +class TestGeneratePlaybook: + @pytest.mark.asyncio + async def test_basic_generation(self) -> None: + planner = _make_planner() + candidates = [_candidate()] + + pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8)) + + assert isinstance(pb, DayPlaybook) + assert pb.market == "KR" + assert pb.stock_count == 1 + assert pb.scenario_count == 1 + assert pb.market_outlook == MarketOutlook.NEUTRAL_TO_BULLISH + assert pb.token_count == 200 + + @pytest.mark.asyncio + async def test_empty_candidates_returns_empty_playbook(self) -> None: + planner = _make_planner() + + pb = await planner.generate_playbook("KR", [], today=date(2026, 2, 8)) + + assert pb.stock_count == 0 + assert pb.scenario_count == 0 + assert pb.market_outlook == MarketOutlook.NEUTRAL + + @pytest.mark.asyncio + async def test_gemini_failure_returns_defensive(self) -> None: + planner = _make_planner() + planner._gemini.decide = AsyncMock(side_effect=RuntimeError("API timeout")) + candidates = [_candidate()] + + pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8)) + + assert pb.default_action == ScenarioAction.HOLD + assert pb.market_outlook == MarketOutlook.NEUTRAL_TO_BEARISH + assert pb.stock_count == 1 + # Defensive playbook has stop-loss scenarios + assert pb.stock_playbooks[0].scenarios[0].action == ScenarioAction.SELL + + @pytest.mark.asyncio + async def test_gemini_failure_empty_when_defensive_disabled(self) -> None: + planner = _make_planner() + planner._settings.DEFENSIVE_PLAYBOOK_ON_FAILURE = False + planner._gemini.decide = AsyncMock(side_effect=RuntimeError("fail")) + candidates = [_candidate()] + + pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8)) + + assert pb.stock_count == 0 + + @pytest.mark.asyncio + async def test_multiple_candidates(self) -> None: + stocks = [ + { + "stock_code": "005930", + "scenarios": [ + { + "condition": {"rsi_below": 30}, + "action": "BUY", + "confidence": 85, + "rationale": "Oversold", + } + ], + }, + { + "stock_code": "AAPL", + "scenarios": [ + { + "condition": {"rsi_above": 75}, + "action": "SELL", + "confidence": 80, + "rationale": "Overbought", + } + ], + }, + ] + planner = _make_planner(gemini_response=_gemini_response_json(stocks=stocks)) + candidates = [_candidate(), _candidate(code="AAPL", name="Apple")] + + pb = await planner.generate_playbook("US", candidates, today=date(2026, 2, 8)) + + assert pb.stock_count == 2 + codes = [sp.stock_code for sp in pb.stock_playbooks] + assert "005930" in codes + assert "AAPL" in codes + + @pytest.mark.asyncio + async def test_unknown_stock_in_response_skipped(self) -> None: + stocks = [ + { + "stock_code": "005930", + "scenarios": [{"condition": {"rsi_below": 30}, "action": "BUY", "confidence": 85, "rationale": "ok"}], + }, + { + "stock_code": "UNKNOWN", + "scenarios": [{"condition": {"rsi_below": 20}, "action": "BUY", "confidence": 90, "rationale": "bad"}], + }, + ] + planner = _make_planner(gemini_response=_gemini_response_json(stocks=stocks)) + candidates = [_candidate()] # Only 005930 + + pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8)) + + assert pb.stock_count == 1 + assert pb.stock_playbooks[0].stock_code == "005930" + + @pytest.mark.asyncio + async def test_global_rules_parsed(self) -> None: + planner = _make_planner() + candidates = [_candidate()] + + pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8)) + + assert len(pb.global_rules) == 1 + assert pb.global_rules[0].action == ScenarioAction.REDUCE_ALL + + @pytest.mark.asyncio + async def test_token_count_from_decision(self) -> None: + planner = _make_planner(token_count=450) + candidates = [_candidate()] + + pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8)) + + assert pb.token_count == 450 + + +# --------------------------------------------------------------------------- +# _parse_response +# --------------------------------------------------------------------------- + + +class TestParseResponse: + def test_parse_full_response(self) -> None: + planner = _make_planner() + response = _gemini_response_json(outlook="bearish") + candidates = [_candidate()] + + pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None) + + assert pb.market_outlook == MarketOutlook.BEARISH + assert pb.stock_count == 1 + assert pb.stock_playbooks[0].scenarios[0].confidence == 85 + + def test_parse_with_markdown_fences(self) -> None: + planner = _make_planner() + response = f"```json\n{_gemini_response_json()}\n```" + candidates = [_candidate()] + + pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None) + + assert pb.stock_count == 1 + + def test_parse_unknown_outlook_defaults_neutral(self) -> None: + planner = _make_planner() + response = _gemini_response_json(outlook="super_bullish") + candidates = [_candidate()] + + pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None) + + assert pb.market_outlook == MarketOutlook.NEUTRAL + + def test_parse_scenario_with_all_condition_fields(self) -> None: + planner = _make_planner() + stocks = [ + { + "stock_code": "005930", + "scenarios": [ + { + "condition": { + "rsi_below": 25, + "volume_ratio_above": 3.0, + "price_change_pct_below": -2.0, + }, + "action": "BUY", + "confidence": 92, + "allocation_pct": 20.0, + "stop_loss_pct": -3.0, + "take_profit_pct": 5.0, + "rationale": "Multi-condition entry", + } + ], + } + ] + response = _gemini_response_json(stocks=stocks) + candidates = [_candidate()] + + pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None) + + sc = pb.stock_playbooks[0].scenarios[0] + assert sc.condition.rsi_below == 25 + assert sc.condition.volume_ratio_above == 3.0 + assert sc.condition.price_change_pct_below == -2.0 + assert sc.allocation_pct == 20.0 + assert sc.stop_loss_pct == -3.0 + assert sc.take_profit_pct == 5.0 + + def test_parse_empty_condition_scenario_skipped(self) -> None: + planner = _make_planner() + stocks = [ + { + "stock_code": "005930", + "scenarios": [ + { + "condition": {}, + "action": "BUY", + "confidence": 85, + "rationale": "No conditions", + }, + { + "condition": {"rsi_below": 30}, + "action": "BUY", + "confidence": 80, + "rationale": "Valid", + }, + ], + } + ] + response = _gemini_response_json(stocks=stocks) + candidates = [_candidate()] + + pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None) + + # Empty condition scenario skipped, valid one kept + assert pb.stock_count == 1 + assert pb.stock_playbooks[0].scenarios[0].confidence == 80 + + def test_parse_max_scenarios_enforced(self) -> None: + planner = _make_planner() + # Settings default MAX_SCENARIOS_PER_STOCK = 5 + scenarios = [ + { + "condition": {"rsi_below": 20 + i}, + "action": "BUY", + "confidence": 80 + i, + "rationale": f"Scenario {i}", + } + for i in range(8) # 8 scenarios, should be capped to 5 + ] + stocks = [{"stock_code": "005930", "scenarios": scenarios}] + response = _gemini_response_json(stocks=stocks) + candidates = [_candidate()] + + pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None) + + assert len(pb.stock_playbooks[0].scenarios) == 5 + + def test_parse_invalid_json_raises(self) -> None: + planner = _make_planner() + candidates = [_candidate()] + + with pytest.raises(json.JSONDecodeError): + planner._parse_response("not json at all", date(2026, 2, 8), "KR", candidates, None) + + def test_parse_cross_market_preserved(self) -> None: + planner = _make_planner() + response = _gemini_response_json() + candidates = [_candidate()] + cross = CrossMarketContext(market="US", date="2026-02-07", total_pnl=1.5, win_rate=60) + + pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, cross) + + assert pb.cross_market is not None + assert pb.cross_market.market == "US" + assert pb.cross_market.total_pnl == 1.5 + + +# --------------------------------------------------------------------------- +# build_cross_market_context +# --------------------------------------------------------------------------- + + +class TestBuildCrossMarketContext: + def test_kr_reads_us_scorecard(self) -> None: + scorecard = {"total_pnl": 2.5, "win_rate": 65, "index_change_pct": 0.8, "lessons": ["Stay patient"]} + planner = _make_planner(scorecard_data=scorecard) + + ctx = planner.build_cross_market_context("KR", today=date(2026, 2, 8)) + + assert ctx is not None + assert ctx.market == "US" + assert ctx.total_pnl == 2.5 + assert ctx.win_rate == 65 + assert "Stay patient" in ctx.lessons + + # Verify it queried scorecard_US + planner._context_store.get_context.assert_called_once_with( + ContextLayer.L6_DAILY, "2026-02-08", "scorecard_US" + ) + + def test_us_reads_kr_scorecard(self) -> None: + scorecard = {"total_pnl": -1.0, "win_rate": 40, "index_change_pct": -0.5} + planner = _make_planner(scorecard_data=scorecard) + + ctx = planner.build_cross_market_context("US", today=date(2026, 2, 8)) + + assert ctx is not None + assert ctx.market == "KR" + assert ctx.total_pnl == -1.0 + + planner._context_store.get_context.assert_called_once_with( + ContextLayer.L6_DAILY, "2026-02-08", "scorecard_KR" + ) + + def test_no_scorecard_returns_none(self) -> None: + planner = _make_planner(scorecard_data=None) + + ctx = planner.build_cross_market_context("KR", today=date(2026, 2, 8)) + + assert ctx is None + + def test_invalid_scorecard_returns_none(self) -> None: + planner = _make_planner(scorecard_data="not a dict and not json") + + ctx = planner.build_cross_market_context("KR", today=date(2026, 2, 8)) + + assert ctx is None + + +# --------------------------------------------------------------------------- +# _build_prompt +# --------------------------------------------------------------------------- + + +class TestBuildPrompt: + def test_prompt_contains_candidates(self) -> None: + planner = _make_planner() + candidates = [_candidate(code="005930", name="Samsung")] + + prompt = planner._build_prompt("KR", candidates, {}, None) + + assert "005930" in prompt + assert "Samsung" in prompt + assert "RSI=" in prompt + assert "volume_ratio=" in prompt + + def test_prompt_contains_cross_market(self) -> None: + planner = _make_planner() + cross = CrossMarketContext( + market="US", date="2026-02-07", total_pnl=1.5, + win_rate=60, index_change_pct=0.8, lessons=["Cut losses early"], + ) + + prompt = planner._build_prompt("KR", [_candidate()], {}, cross) + + assert "Other Market (US)" in prompt + assert "+1.50%" in prompt + assert "Cut losses early" in prompt + + def test_prompt_contains_context_data(self) -> None: + planner = _make_planner() + context = {"L6_DAILY": {"win_rate": 0.65, "total_pnl": 2.5}} + + prompt = planner._build_prompt("KR", [_candidate()], context, None) + + assert "Strategic Context" in prompt + assert "L6_DAILY" in prompt + assert "win_rate" in prompt + + def test_prompt_contains_max_scenarios(self) -> None: + planner = _make_planner() + prompt = planner._build_prompt("KR", [_candidate()], {}, None) + + assert f"Max {planner._settings.MAX_SCENARIOS_PER_STOCK} scenarios" in prompt + + def test_prompt_market_name(self) -> None: + planner = _make_planner() + prompt = planner._build_prompt("US", [_candidate()], {}, None) + assert "US market" in prompt + + +# --------------------------------------------------------------------------- +# _extract_json +# --------------------------------------------------------------------------- + + +class TestExtractJson: + def test_plain_json(self) -> None: + assert PreMarketPlanner._extract_json('{"a": 1}') == '{"a": 1}' + + def test_with_json_fence(self) -> None: + text = '```json\n{"a": 1}\n```' + assert PreMarketPlanner._extract_json(text) == '{"a": 1}' + + def test_with_plain_fence(self) -> None: + text = '```\n{"a": 1}\n```' + assert PreMarketPlanner._extract_json(text) == '{"a": 1}' + + def test_with_whitespace(self) -> None: + text = ' \n {"a": 1} \n ' + assert PreMarketPlanner._extract_json(text) == '{"a": 1}' + + +# --------------------------------------------------------------------------- +# Defensive playbook +# --------------------------------------------------------------------------- + + +class TestDefensivePlaybook: + def test_defensive_has_stop_loss(self) -> None: + candidates = [_candidate(code="005930"), _candidate(code="AAPL")] + pb = PreMarketPlanner._defensive_playbook(date(2026, 2, 8), "KR", candidates) + + assert pb.default_action == ScenarioAction.HOLD + assert pb.market_outlook == MarketOutlook.NEUTRAL_TO_BEARISH + assert pb.stock_count == 2 + for sp in pb.stock_playbooks: + assert sp.scenarios[0].action == ScenarioAction.SELL + assert sp.scenarios[0].stop_loss_pct == -3.0 + + def test_defensive_has_global_rule(self) -> None: + pb = PreMarketPlanner._defensive_playbook(date(2026, 2, 8), "KR", [_candidate()]) + + assert len(pb.global_rules) == 1 + assert pb.global_rules[0].action == ScenarioAction.REDUCE_ALL + + def test_empty_playbook(self) -> None: + pb = PreMarketPlanner._empty_playbook(date(2026, 2, 8), "US") + + assert pb.stock_count == 0 + assert pb.market == "US" + assert pb.market_outlook == MarketOutlook.NEUTRAL