Compare commits

...

10 Commits

Author SHA1 Message Date
agentson
d64e072f06 fix: PR review — DB reload, market-local date, market-scoped scan_candidates
Some checks failed
CI / test (pull_request) Has been cancelled
Address PR #110 review findings:

1. High — Realtime mode now loads playbook from DB before calling Gemini,
   preventing duplicate API calls on process restart (4/day budget).
2. Medium — Pass market-local date (via market.timezone) to
   generate_playbook() and _empty_playbook() instead of date.today().
3. Medium — scan_candidates restructured from {stock_code: candidate}
   to {market_code: {stock_code: candidate}} to prevent KR/US symbol
   collision.

New test: test_scan_candidates_market_scoped verifies cross-market
isolation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 23:00:06 +09:00
agentson
b2312fbe01 fix: resolve lint issues in main.py and test_main.py
Some checks failed
CI / test (pull_request) Has been cancelled
Remove unused imports (sys, ScenarioMatch, asyncio, StockPlaybook),
fix import ordering, and split long lines for ruff compliance.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 22:28:31 +09:00
agentson
98c4a2413c feat: integrate scenario engine and playbook into main trading loop (issue #84)
Replace brain.decide() with scenario_engine.evaluate() in trading_cycle
and brain.decide_batch() with per-stock scenario evaluation in
run_daily_session. Initialize PreMarketPlanner, ScenarioEngine, and
PlaybookStore in run(). Add pre-market playbook generation on market
open (1 Gemini call per market per day), market_data enrichment from
scanner metrics (rsi, volume_ratio), portfolio_data for global rules,
scenario match notifications, and playbook lifecycle management.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 22:24:19 +09:00
6fba7c7ae8 Merge pull request 'feat: implement pre-market planner with Gemini integration (issue #83)' (#109) from feature/issue-83-pre-market-planner into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #109
2026-02-08 22:07:36 +09:00
agentson
be695a5d7c fix: address PR review — inject today param, remove unused imports, fix lint
Some checks failed
CI / test (pull_request) Has been cancelled
Review findings addressed:
- Finding 1 (ImportError): false positive — ContextLayer is re-exported from
  src.context.store, import works correctly at runtime
- Finding 2 (timezone): generate_playbook() and build_cross_market_context()
  now accept optional today parameter for market-local date injection
- Finding 3 (lint): removed unused imports (UTC, datetime, PlaybookStatus),
  fixed line-too-long in prompt template
- Tests simplified: replaced date patching with direct today= parameter

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 21:57:39 +09:00
agentson
6471e66d89 fix: correct Settings field name in planner tests (KIS_ACCOUNT_NO)
Some checks failed
CI / test (pull_request) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 21:36:42 +09:00
agentson
149039a904 feat: implement pre-market planner with Gemini integration (issue #83)
PreMarketPlanner generates DayPlaybook via single Gemini API call per market:
- Structured JSON prompt with scan candidates + strategic context
- Cross-market context (KR reads US scorecard, US reads KR scorecard)
- Robust JSON parser with markdown fence stripping
- Unknown stock filtering (only scanner candidates allowed)
- MAX_SCENARIOS_PER_STOCK enforcement
- Defensive playbook on failure (HOLD + stop-loss)
- Empty playbook when no candidates (safe, no trades)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 21:35:57 +09:00
815d675529 Merge pull request 'feat: add Telegram playbook notifications (issue #81)' (#108) from feature/issue-81-telegram-playbook-notify into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #108
2026-02-08 21:27:46 +09:00
agentson
e8634b93c3 feat: add Telegram playbook notifications (issue #81)
Some checks failed
CI / test (pull_request) Has been cancelled
- notify_playbook_generated(): market, stock/scenario count, token usage (MEDIUM)
- notify_scenario_matched(): stock, action, condition, confidence (HIGH)
- notify_playbook_failed(): market, reason with 200-char truncation (HIGH)
- 6 new tests: 3 format + 3 priority validations

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 21:25:16 +09:00
f20736fd2a Merge pull request 'feat: add playbook persistence with DB schema and CRUD store (issue #82)' (#107) from feature/issue-82-playbook-persistence into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #107
2026-02-08 21:07:13 +09:00
6 changed files with 1868 additions and 100 deletions

View File

@@ -10,14 +10,14 @@ import argparse
import asyncio
import logging
import signal
import sys
from datetime import UTC, datetime
from typing import Any
from src.analysis.scanner import MarketScanner
from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner
from src.analysis.volatility import VolatilityAnalyzer
from src.brain.gemini_client import GeminiClient
from src.brain.context_selector import ContextSelector
from src.brain.gemini_client import GeminiClient, TradeDecision
from src.broker.kis_api import KISBroker
from src.broker.overseas import OverseasBroker
from src.config import Settings
@@ -31,6 +31,10 @@ from src.logging.decision_logger import DecisionLogger
from src.logging_config import setup_logging
from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets
from src.notifications.telegram_client import TelegramClient, TelegramCommandHandler
from src.strategy.models import DayPlaybook
from src.strategy.playbook_store import PlaybookStore
from src.strategy.pre_market_planner import PreMarketPlanner
from src.strategy.scenario_engine import ScenarioEngine
logger = logging.getLogger(__name__)
@@ -75,7 +79,8 @@ TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions
async def trading_cycle(
broker: KISBroker,
overseas_broker: OverseasBroker,
brain: GeminiClient,
scenario_engine: ScenarioEngine,
playbook: DayPlaybook,
risk: RiskManager,
db_conn: Any,
decision_logger: DecisionLogger,
@@ -84,7 +89,7 @@ async def trading_cycle(
telegram: TelegramClient,
market: MarketInfo,
stock_code: str,
scan_candidates: dict[str, ScanCandidate],
scan_candidates: dict[str, dict[str, ScanCandidate]],
) -> None:
"""Execute one trading cycle for a single stock."""
cycle_start_time = asyncio.get_event_loop().time()
@@ -135,13 +140,27 @@ async def trading_cycle(
else 0.0
)
market_data = {
market_data: dict[str, Any] = {
"stock_code": stock_code,
"market_name": market.name,
"current_price": current_price,
"foreigner_net": foreigner_net,
}
# Enrich market_data with scanner metrics for scenario engine
market_candidates = scan_candidates.get(market.code, {})
candidate = market_candidates.get(stock_code)
if candidate:
market_data["rsi"] = candidate.rsi
market_data["volume_ratio"] = candidate.volume_ratio
# Build portfolio data for global rule evaluation
portfolio_data = {
"portfolio_pnl_pct": pnl_pct,
"total_cash": total_cash,
"total_eval": total_eval,
}
# 1.5. Get volatility metrics from context store (L7_REALTIME)
latest_timeframe = context_store.get_latest_timeframe(ContextLayer.L7_REALTIME)
volatility_score = 50.0 # Default normal volatility
@@ -178,8 +197,13 @@ async def trading_cycle(
volume_surge,
)
# 2. Ask the brain for a decision
decision = await brain.decide(market_data)
# 2. Evaluate scenario (local, no API call)
match = scenario_engine.evaluate(playbook, stock_code, market_data, portfolio_data)
decision = TradeDecision(
action=match.action.value,
confidence=match.confidence,
rationale=match.rationale,
)
logger.info(
"Decision for %s (%s): %s (confidence=%d)",
stock_code,
@@ -188,6 +212,19 @@ async def trading_cycle(
decision.confidence,
)
# 2.1. Notify scenario match
if match.matched_scenario is not None:
try:
condition_parts = [f"{k}={v}" for k, v in match.match_details.items()]
await telegram.notify_scenario_matched(
stock_code=stock_code,
action=decision.action,
condition_summary=", ".join(condition_parts) if condition_parts else "matched",
confidence=float(decision.confidence),
)
except Exception as exc:
logger.warning("Scenario matched notification failed: %s", exc)
# 2.5. Log decision with context snapshot
context_snapshot = {
"L1": {
@@ -200,7 +237,7 @@ async def trading_cycle(
"purchase_total": purchase_total,
"pnl_pct": pnl_pct,
},
# L3-L7 will be populated when context tree is implemented
"scenario_match": match.match_details,
}
input_data = {
"current_price": current_price,
@@ -279,8 +316,8 @@ async def trading_cycle(
# 6. Log trade with selection context
selection_context = None
if stock_code in scan_candidates:
candidate = scan_candidates[stock_code]
if stock_code in market_candidates:
candidate = market_candidates[stock_code]
selection_context = {
"rsi": candidate.rsi,
"volume_ratio": candidate.volume_ratio,
@@ -324,7 +361,9 @@ async def trading_cycle(
async def run_daily_session(
broker: KISBroker,
overseas_broker: OverseasBroker,
brain: GeminiClient,
scenario_engine: ScenarioEngine,
playbook_store: PlaybookStore,
pre_market_planner: PreMarketPlanner,
risk: RiskManager,
db_conn: Any,
decision_logger: DecisionLogger,
@@ -336,10 +375,8 @@ async def run_daily_session(
) -> None:
"""Execute one daily trading session.
Designed for API efficiency with Gemini Free tier:
- Batch decision making (1 API call per market)
- Runs N times per day at fixed intervals
- Minimizes API usage while maintaining trading capability
V2 proactive strategy: 1 Gemini call for playbook generation,
then local scenario evaluation per stock (0 API calls).
"""
# Get currently open markets
open_markets = get_open_markets(settings.enabled_market_list)
@@ -352,27 +389,66 @@ async def run_daily_session(
# Process each open market
for market in open_markets:
# Use market-local date for playbook keying
market_today = datetime.now(market.timezone).date()
# Dynamic stock discovery via scanner (no static watchlists)
candidates_list: list[ScanCandidate] = []
try:
candidates = await smart_scanner.scan()
watchlist = [c.stock_code for c in candidates] if candidates else []
candidates_list = await smart_scanner.scan() if smart_scanner else []
except Exception as exc:
logger.error("Smart Scanner failed for %s: %s", market.name, exc)
watchlist = []
if not watchlist:
if not candidates_list:
logger.info("No scanner candidates for market %s — skipping", market.code)
continue
watchlist = [c.stock_code for c in candidates_list]
candidate_map = {c.stock_code: c for c in candidates_list}
logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist))
# Generate or load playbook (1 Gemini API call per market per day)
playbook = playbook_store.load(market_today, market.code)
if playbook is None:
try:
playbook = await pre_market_planner.generate_playbook(
market=market.code,
candidates=candidates_list,
today=market_today,
)
playbook_store.save(playbook)
try:
await telegram.notify_playbook_generated(
market=market.code,
stock_count=playbook.stock_count,
scenario_count=playbook.scenario_count,
token_count=playbook.token_count,
)
except Exception as exc:
logger.warning("Playbook notification failed: %s", exc)
logger.info(
"Generated playbook for %s: %d stocks, %d scenarios",
market.code, playbook.stock_count, playbook.scenario_count,
)
except Exception as exc:
logger.error("Playbook generation failed for %s: %s", market.code, exc)
try:
await telegram.notify_playbook_failed(
market=market.code, reason=str(exc)[:200],
)
except Exception as notify_exc:
logger.warning("Playbook failed notification error: %s", notify_exc)
playbook = PreMarketPlanner._empty_playbook(market_today, market.code)
# Collect market data for all stocks from scanner
stocks_data = []
for stock_code in watchlist:
try:
if market.is_domestic:
orderbook = await broker.get_orderbook(stock_code)
current_price = safe_float(orderbook.get("output1", {}).get("stck_prpr", "0"))
current_price = safe_float(
orderbook.get("output1", {}).get("stck_prpr", "0")
)
foreigner_net = safe_float(
orderbook.get("output1", {}).get("frgn_ntby_qty", "0")
)
@@ -380,17 +456,23 @@ async def run_daily_session(
price_data = await overseas_broker.get_overseas_price(
market.exchange_code, stock_code
)
current_price = safe_float(price_data.get("output", {}).get("last", "0"))
current_price = safe_float(
price_data.get("output", {}).get("last", "0")
)
foreigner_net = 0.0
stocks_data.append(
{
stock_data: dict[str, Any] = {
"stock_code": stock_code,
"market_name": market.name,
"current_price": current_price,
"foreigner_net": foreigner_net,
}
)
# Enrich with scanner metrics
cand = candidate_map.get(stock_code)
if cand:
stock_data["rsi"] = cand.rsi
stock_data["volume_ratio"] = cand.volume_ratio
stocks_data.append(stock_data)
except Exception as exc:
logger.error("Failed to fetch data for %s: %s", stock_code, exc)
continue
@@ -399,17 +481,19 @@ async def run_daily_session(
logger.warning("No valid stock data for market %s", market.code)
continue
# Get batch decisions (1 API call for all stocks in this market)
logger.info("Requesting batch decision for %d stocks in %s", len(stocks_data), market.name)
decisions = await brain.decide_batch(stocks_data)
# Get balance data once for the market
if market.is_domestic:
balance_data = await broker.get_balance()
output2 = balance_data.get("output2", [{}])
total_eval = safe_float(output2[0].get("tot_evlu_amt", "0")) if output2 else 0
total_cash = safe_float(output2[0].get("dnca_tot_amt", "0")) if output2 else 0
purchase_total = safe_float(output2[0].get("pchs_amt_smtl_amt", "0")) if output2 else 0
total_eval = safe_float(
output2[0].get("tot_evlu_amt", "0")
) if output2 else 0
total_cash = safe_float(
output2[0].get("dnca_tot_amt", "0")
) if output2 else 0
purchase_total = safe_float(
output2[0].get("pchs_amt_smtl_amt", "0")
) if output2 else 0
else:
balance_data = await overseas_broker.get_overseas_balance(market.exchange_code)
output2 = balance_data.get("output2", [{}])
@@ -422,21 +506,37 @@ async def run_daily_session(
total_eval = safe_float(balance_info.get("frcr_evlu_tota", "0") or "0")
total_cash = safe_float(balance_info.get("frcr_dncl_amt_2", "0") or "0")
purchase_total = safe_float(balance_info.get("frcr_buy_amt_smtl", "0") or "0")
purchase_total = safe_float(
balance_info.get("frcr_buy_amt_smtl", "0") or "0"
)
# Calculate daily P&L %
pnl_pct = (
((total_eval - purchase_total) / purchase_total * 100) if purchase_total > 0 else 0.0
((total_eval - purchase_total) / purchase_total * 100)
if purchase_total > 0
else 0.0
)
portfolio_data = {
"portfolio_pnl_pct": pnl_pct,
"total_cash": total_cash,
"total_eval": total_eval,
}
# Execute decisions for each stock
# Evaluate scenarios for each stock (local, no API calls)
logger.info(
"Evaluating %d stocks against playbook for %s",
len(stocks_data), market.name,
)
for stock_data in stocks_data:
stock_code = stock_data["stock_code"]
decision = decisions.get(stock_code)
if not decision:
logger.warning("No decision for %s — skipping", stock_code)
continue
match = scenario_engine.evaluate(
playbook, stock_code, stock_data, portfolio_data,
)
decision = TradeDecision(
action=match.action.value,
confidence=match.confidence,
rationale=match.rationale,
)
logger.info(
"Decision for %s (%s): %s (confidence=%d)",
@@ -458,6 +558,7 @@ async def run_daily_session(
"purchase_total": purchase_total,
"pnl_pct": pnl_pct,
},
"scenario_match": match.match_details,
}
input_data = {
"current_price": stock_data["current_price"],
@@ -509,7 +610,9 @@ async def run_daily_session(
threshold=exc.threshold,
)
except Exception as notify_exc:
logger.warning("Circuit breaker notification failed: %s", notify_exc)
logger.warning(
"Circuit breaker notification failed: %s", notify_exc
)
raise
# Send order
@@ -544,7 +647,9 @@ async def run_daily_session(
except Exception as exc:
logger.warning("Telegram notification failed: %s", exc)
except Exception as exc:
logger.error("Order execution failed for %s: %s", stock_code, exc)
logger.error(
"Order execution failed for %s: %s", stock_code, exc
)
continue
# Log trade
@@ -571,6 +676,20 @@ async def run(settings: Settings) -> None:
decision_logger = DecisionLogger(db_conn)
context_store = ContextStore(db_conn)
# V2 proactive strategy components
context_selector = ContextSelector(context_store)
scenario_engine = ScenarioEngine()
playbook_store = PlaybookStore(db_conn)
pre_market_planner = PreMarketPlanner(
gemini_client=brain,
context_store=context_store,
context_selector=context_selector,
settings=settings,
)
# Track playbooks per market (in-memory cache)
playbooks: dict[str, DayPlaybook] = {}
# Initialize Telegram notifications
telegram = TelegramClient(
bot_token=settings.TELEGRAM_BOT_TOKEN,
@@ -732,8 +851,8 @@ async def run(settings: Settings) -> None:
settings=settings,
)
# Track scan candidates for selection context logging
scan_candidates: dict[str, ScanCandidate] = {} # stock_code -> candidate
# Track scan candidates per market for selection context logging
scan_candidates: dict[str, dict[str, ScanCandidate]] = {} # market -> {stock_code -> candidate}
# Active stocks per market (dynamically discovered by scanner)
active_stocks: dict[str, list[str]] = {} # market_code -> [stock_codes]
@@ -802,7 +921,9 @@ async def run(settings: Settings) -> None:
await run_daily_session(
broker,
overseas_broker,
brain,
scenario_engine,
playbook_store,
pre_market_planner,
risk,
db_conn,
decision_logger,
@@ -850,6 +971,8 @@ async def run(settings: Settings) -> None:
except Exception as exc:
logger.warning("Market close notification failed: %s", exc)
_market_states[market_code] = False
# Clear playbook for closed market (new one generated next open)
playbooks.pop(market_code, None)
# No markets open — wait until next market opens
try:
@@ -887,7 +1010,8 @@ async def run(settings: Settings) -> None:
# Smart Scanner: dynamic stock discovery (no static watchlists)
now_timestamp = asyncio.get_event_loop().time()
last_scan = last_scan_time.get(market.code, 0.0)
if now_timestamp - last_scan >= SCAN_INTERVAL_SECONDS:
rescan_interval = settings.RESCAN_INTERVAL_SECONDS
if now_timestamp - last_scan >= rescan_interval:
try:
logger.info("Smart Scanner: Scanning %s market", market.name)
@@ -899,9 +1023,10 @@ async def run(settings: Settings) -> None:
candidates
)
# Store candidates for selection context logging
for candidate in candidates:
scan_candidates[candidate.stock_code] = candidate
# Store candidates per market for selection context logging
scan_candidates[market.code] = {
c.stock_code: c for c in candidates
}
logger.info(
"Smart Scanner: Found %d candidates for %s: %s",
@@ -909,6 +1034,62 @@ async def run(settings: Settings) -> None:
market.name,
[f"{c.stock_code}(RSI={c.rsi:.0f})" for c in candidates],
)
# Get market-local date for playbook keying
market_today = datetime.now(
market.timezone
).date()
# Load or generate playbook (1 Gemini call per market per day)
if market.code not in playbooks:
# Try DB first (survives process restart)
stored_pb = playbook_store.load(market_today, market.code)
if stored_pb is not None:
playbooks[market.code] = stored_pb
logger.info(
"Loaded existing playbook for %s from DB"
" (%d stocks, %d scenarios)",
market.code,
stored_pb.stock_count,
stored_pb.scenario_count,
)
else:
try:
pb = await pre_market_planner.generate_playbook(
market=market.code,
candidates=candidates,
today=market_today,
)
playbook_store.save(pb)
playbooks[market.code] = pb
try:
await telegram.notify_playbook_generated(
market=market.code,
stock_count=pb.stock_count,
scenario_count=pb.scenario_count,
token_count=pb.token_count,
)
except Exception as exc:
logger.warning(
"Playbook notification failed: %s", exc
)
except Exception as exc:
logger.error(
"Playbook generation failed for %s: %s",
market.code, exc,
)
try:
await telegram.notify_playbook_failed(
market=market.code,
reason=str(exc)[:200],
)
except Exception:
pass
playbooks[market.code] = (
PreMarketPlanner._empty_playbook(
market_today, market.code
)
)
else:
logger.info(
"Smart Scanner: No candidates for %s — no trades", market.name
@@ -933,13 +1114,22 @@ async def run(settings: Settings) -> None:
if shutdown.is_set():
break
# Get playbook for this market
market_playbook = playbooks.get(
market.code,
PreMarketPlanner._empty_playbook(
datetime.now(market.timezone).date(), market.code
),
)
# Retry logic for connection errors
for attempt in range(1, MAX_CONNECTION_RETRIES + 1):
try:
await trading_cycle(
broker,
overseas_broker,
brain,
scenario_engine,
market_playbook,
risk,
db_conn,
decision_logger,
@@ -988,7 +1178,8 @@ async def run(settings: Settings) -> None:
metrics = await priority_queue.get_metrics()
if metrics.total_enqueued > 0:
logger.info(
"Priority queue metrics: enqueued=%d, dequeued=%d, size=%d, timeouts=%d, errors=%d",
"Priority queue metrics: enqueued=%d, dequeued=%d,"
" size=%d, timeouts=%d, errors=%d",
metrics.total_enqueued,
metrics.total_dequeued,
metrics.current_size,

View File

@@ -304,6 +304,77 @@ class TelegramClient:
NotificationMessage(priority=NotificationPriority.MEDIUM, message=message)
)
async def notify_playbook_generated(
self,
market: str,
stock_count: int,
scenario_count: int,
token_count: int,
) -> None:
"""
Notify that a daily playbook was generated.
Args:
market: Market code (e.g., "KR", "US")
stock_count: Number of stocks in the playbook
scenario_count: Total number of scenarios
token_count: Gemini token usage for the playbook
"""
message = (
f"<b>Playbook Generated</b>\n"
f"Market: {market}\n"
f"Stocks: {stock_count}\n"
f"Scenarios: {scenario_count}\n"
f"Tokens: {token_count}"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.MEDIUM, message=message)
)
async def notify_scenario_matched(
self,
stock_code: str,
action: str,
condition_summary: str,
confidence: float,
) -> None:
"""
Notify that a scenario matched for a stock.
Args:
stock_code: Stock ticker symbol
action: Scenario action (BUY/SELL/HOLD/REDUCE_ALL)
condition_summary: Short summary of the matched condition
confidence: Scenario confidence (0-100)
"""
message = (
f"<b>Scenario Matched</b>\n"
f"Symbol: <code>{stock_code}</code>\n"
f"Action: {action}\n"
f"Condition: {condition_summary}\n"
f"Confidence: {confidence:.0f}%"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.HIGH, message=message)
)
async def notify_playbook_failed(self, market: str, reason: str) -> None:
"""
Notify that playbook generation failed.
Args:
market: Market code (e.g., "KR", "US")
reason: Failure reason summary
"""
message = (
f"<b>Playbook Failed</b>\n"
f"Market: {market}\n"
f"Reason: {reason[:200]}"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.HIGH, message=message)
)
async def notify_system_shutdown(self, reason: str) -> None:
"""
Notify system shutdown.

View File

@@ -0,0 +1,419 @@
"""Pre-market planner — generates DayPlaybook via Gemini before market open.
One Gemini API call per market per day. Candidates come from SmartVolatilityScanner.
On failure, returns a defensive playbook (all HOLD, no trades).
"""
from __future__ import annotations
import json
import logging
from datetime import date
from typing import Any
from src.analysis.smart_scanner import ScanCandidate
from src.brain.context_selector import ContextSelector, DecisionType
from src.brain.gemini_client import GeminiClient
from src.config import Settings
from src.context.store import ContextLayer, ContextStore
from src.strategy.models import (
CrossMarketContext,
DayPlaybook,
GlobalRule,
MarketOutlook,
ScenarioAction,
StockCondition,
StockPlaybook,
StockScenario,
)
logger = logging.getLogger(__name__)
# Mapping from string to MarketOutlook enum
_OUTLOOK_MAP: dict[str, MarketOutlook] = {
"bullish": MarketOutlook.BULLISH,
"neutral_to_bullish": MarketOutlook.NEUTRAL_TO_BULLISH,
"neutral": MarketOutlook.NEUTRAL,
"neutral_to_bearish": MarketOutlook.NEUTRAL_TO_BEARISH,
"bearish": MarketOutlook.BEARISH,
}
_ACTION_MAP: dict[str, ScenarioAction] = {
"BUY": ScenarioAction.BUY,
"SELL": ScenarioAction.SELL,
"HOLD": ScenarioAction.HOLD,
"REDUCE_ALL": ScenarioAction.REDUCE_ALL,
}
class PreMarketPlanner:
"""Generates a DayPlaybook by calling Gemini once before market open.
Flow:
1. Collect strategic context (L5-L7) + cross-market context
2. Build a structured prompt with scan candidates
3. Call Gemini for JSON scenario generation
4. Parse and validate response into DayPlaybook
5. On failure → defensive playbook (HOLD everything)
"""
def __init__(
self,
gemini_client: GeminiClient,
context_store: ContextStore,
context_selector: ContextSelector,
settings: Settings,
) -> None:
self._gemini = gemini_client
self._context_store = context_store
self._context_selector = context_selector
self._settings = settings
async def generate_playbook(
self,
market: str,
candidates: list[ScanCandidate],
today: date | None = None,
) -> DayPlaybook:
"""Generate a DayPlaybook for a market using Gemini.
Args:
market: Market code ("KR" or "US")
candidates: Stock candidates from SmartVolatilityScanner
today: Override date (defaults to date.today()). Use market-local date.
Returns:
DayPlaybook with scenarios. Empty/defensive if no candidates or failure.
"""
if today is None:
today = date.today()
if not candidates:
logger.info("No candidates for %s — returning empty playbook", market)
return self._empty_playbook(today, market)
try:
# 1. Gather context
context_data = self._gather_context()
cross_market = self.build_cross_market_context(market, today)
# 2. Build prompt
prompt = self._build_prompt(market, candidates, context_data, cross_market)
# 3. Call Gemini
market_data = {
"stock_code": "PLANNER",
"current_price": 0,
"prompt_override": prompt,
}
decision = await self._gemini.decide(market_data)
# 4. Parse response
playbook = self._parse_response(
decision.rationale, today, market, candidates, cross_market
)
playbook_with_tokens = playbook.model_copy(
update={"token_count": decision.token_count}
)
logger.info(
"Generated playbook for %s: %d stocks, %d scenarios, %d tokens",
market,
playbook_with_tokens.stock_count,
playbook_with_tokens.scenario_count,
playbook_with_tokens.token_count,
)
return playbook_with_tokens
except Exception:
logger.exception("Playbook generation failed for %s", market)
if self._settings.DEFENSIVE_PLAYBOOK_ON_FAILURE:
return self._defensive_playbook(today, market, candidates)
return self._empty_playbook(today, market)
def build_cross_market_context(
self, target_market: str, today: date | None = None,
) -> CrossMarketContext | None:
"""Build cross-market context from the other market's L6 data.
KR planner → reads US scorecard from previous night.
US planner → reads KR scorecard from today.
Args:
target_market: The market being planned ("KR" or "US")
today: Override date (defaults to date.today()). Use market-local date.
"""
other_market = "US" if target_market == "KR" else "KR"
if today is None:
today = date.today()
timeframe = today.isoformat()
scorecard_key = f"scorecard_{other_market}"
scorecard_data = self._context_store.get_context(
ContextLayer.L6_DAILY, timeframe, scorecard_key
)
if scorecard_data is None:
logger.debug("No cross-market scorecard found for %s", other_market)
return None
if isinstance(scorecard_data, str):
try:
scorecard_data = json.loads(scorecard_data)
except (json.JSONDecodeError, TypeError):
return None
if not isinstance(scorecard_data, dict):
return None
return CrossMarketContext(
market=other_market,
date=timeframe,
total_pnl=float(scorecard_data.get("total_pnl", 0.0)),
win_rate=float(scorecard_data.get("win_rate", 0.0)),
index_change_pct=float(scorecard_data.get("index_change_pct", 0.0)),
key_events=scorecard_data.get("key_events", []),
lessons=scorecard_data.get("lessons", []),
)
def _gather_context(self) -> dict[str, Any]:
"""Gather strategic context using ContextSelector."""
layers = self._context_selector.select_layers(
decision_type=DecisionType.STRATEGIC,
include_realtime=True,
)
return self._context_selector.get_context_data(layers, max_items_per_layer=10)
def _build_prompt(
self,
market: str,
candidates: list[ScanCandidate],
context_data: dict[str, Any],
cross_market: CrossMarketContext | None,
) -> str:
"""Build a structured prompt for Gemini to generate scenario JSON."""
max_scenarios = self._settings.MAX_SCENARIOS_PER_STOCK
candidates_text = "\n".join(
f" - {c.stock_code} ({c.name}): price={c.price}, "
f"RSI={c.rsi:.1f}, volume_ratio={c.volume_ratio:.1f}, "
f"signal={c.signal}, score={c.score:.1f}"
for c in candidates
)
cross_market_text = ""
if cross_market:
cross_market_text = (
f"\n## Other Market ({cross_market.market}) Summary\n"
f"- P&L: {cross_market.total_pnl:+.2f}%\n"
f"- Win Rate: {cross_market.win_rate:.0f}%\n"
f"- Index Change: {cross_market.index_change_pct:+.2f}%\n"
)
if cross_market.lessons:
cross_market_text += f"- Lessons: {'; '.join(cross_market.lessons[:3])}\n"
context_text = ""
if context_data:
context_text = "\n## Strategic Context\n"
for layer_name, layer_data in context_data.items():
if layer_data:
context_text += f"### {layer_name}\n"
for key, value in list(layer_data.items())[:5]:
context_text += f" - {key}: {value}\n"
return (
f"You are a pre-market trading strategist for the {market} market.\n"
f"Generate structured trading scenarios for today.\n\n"
f"## Candidates (from volatility scanner)\n{candidates_text}\n"
f"{cross_market_text}"
f"{context_text}\n"
f"## Instructions\n"
f"Return a JSON object with this exact structure:\n"
f'{{\n'
f' "market_outlook": "bullish|neutral_to_bullish|neutral'
f'|neutral_to_bearish|bearish",\n'
f' "global_rules": [\n'
f' {{"condition": "portfolio_pnl_pct < -2.0",'
f' "action": "REDUCE_ALL", "rationale": "..."}}\n'
f' ],\n'
f' "stocks": [\n'
f' {{\n'
f' "stock_code": "...",\n'
f' "scenarios": [\n'
f' {{\n'
f' "condition": {{"rsi_below": 30, "volume_ratio_above": 2.0}},\n'
f' "action": "BUY|SELL|HOLD",\n'
f' "confidence": 85,\n'
f' "allocation_pct": 10.0,\n'
f' "stop_loss_pct": -2.0,\n'
f' "take_profit_pct": 3.0,\n'
f' "rationale": "..."\n'
f' }}\n'
f' ]\n'
f' }}\n'
f' ]\n'
f'}}\n\n'
f"Rules:\n"
f"- Max {max_scenarios} scenarios per stock\n"
f"- Only use stocks from the candidates list\n"
f"- Confidence 0-100 (80+ for actionable trades)\n"
f"- stop_loss_pct must be <= 0, take_profit_pct must be >= 0\n"
f"- Return ONLY the JSON, no markdown fences or explanation\n"
)
def _parse_response(
self,
response_text: str,
today: date,
market: str,
candidates: list[ScanCandidate],
cross_market: CrossMarketContext | None,
) -> DayPlaybook:
"""Parse Gemini's JSON response into a validated DayPlaybook."""
cleaned = self._extract_json(response_text)
data = json.loads(cleaned)
valid_codes = {c.stock_code for c in candidates}
# Parse market outlook
outlook_str = data.get("market_outlook", "neutral")
market_outlook = _OUTLOOK_MAP.get(outlook_str, MarketOutlook.NEUTRAL)
# Parse global rules
global_rules = []
for rule_data in data.get("global_rules", []):
action_str = rule_data.get("action", "HOLD")
action = _ACTION_MAP.get(action_str, ScenarioAction.HOLD)
global_rules.append(
GlobalRule(
condition=rule_data.get("condition", ""),
action=action,
rationale=rule_data.get("rationale", ""),
)
)
# Parse stock playbooks
stock_playbooks = []
max_scenarios = self._settings.MAX_SCENARIOS_PER_STOCK
for stock_data in data.get("stocks", []):
code = stock_data.get("stock_code", "")
if code not in valid_codes:
logger.warning("Gemini returned unknown stock %s — skipping", code)
continue
scenarios = []
for sc_data in stock_data.get("scenarios", [])[:max_scenarios]:
scenario = self._parse_scenario(sc_data)
if scenario:
scenarios.append(scenario)
if scenarios:
stock_playbooks.append(
StockPlaybook(
stock_code=code,
scenarios=scenarios,
)
)
return DayPlaybook(
date=today,
market=market,
market_outlook=market_outlook,
global_rules=global_rules,
stock_playbooks=stock_playbooks,
cross_market=cross_market,
)
def _parse_scenario(self, sc_data: dict) -> StockScenario | None:
"""Parse a single scenario from JSON data. Returns None if invalid."""
try:
cond_data = sc_data.get("condition", {})
condition = StockCondition(
rsi_below=cond_data.get("rsi_below"),
rsi_above=cond_data.get("rsi_above"),
volume_ratio_above=cond_data.get("volume_ratio_above"),
volume_ratio_below=cond_data.get("volume_ratio_below"),
price_above=cond_data.get("price_above"),
price_below=cond_data.get("price_below"),
price_change_pct_above=cond_data.get("price_change_pct_above"),
price_change_pct_below=cond_data.get("price_change_pct_below"),
)
if not condition.has_any_condition():
logger.warning("Scenario has no conditions — skipping")
return None
action_str = sc_data.get("action", "HOLD")
action = _ACTION_MAP.get(action_str, ScenarioAction.HOLD)
return StockScenario(
condition=condition,
action=action,
confidence=int(sc_data.get("confidence", 50)),
allocation_pct=float(sc_data.get("allocation_pct", 10.0)),
stop_loss_pct=float(sc_data.get("stop_loss_pct", -2.0)),
take_profit_pct=float(sc_data.get("take_profit_pct", 3.0)),
rationale=sc_data.get("rationale", ""),
)
except (ValueError, TypeError) as e:
logger.warning("Failed to parse scenario: %s", e)
return None
@staticmethod
def _extract_json(text: str) -> str:
"""Extract JSON from response, stripping markdown fences if present."""
stripped = text.strip()
if stripped.startswith("```"):
# Remove first line (```json or ```) and last line (```)
lines = stripped.split("\n")
lines = lines[1:] # Remove opening fence
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
stripped = "\n".join(lines)
return stripped.strip()
@staticmethod
def _empty_playbook(today: date, market: str) -> DayPlaybook:
"""Return an empty playbook (no stocks, no scenarios)."""
return DayPlaybook(
date=today,
market=market,
market_outlook=MarketOutlook.NEUTRAL,
stock_playbooks=[],
)
@staticmethod
def _defensive_playbook(
today: date,
market: str,
candidates: list[ScanCandidate],
) -> DayPlaybook:
"""Return a defensive playbook — HOLD everything with stop-loss ready."""
stock_playbooks = [
StockPlaybook(
stock_code=c.stock_code,
scenarios=[
StockScenario(
condition=StockCondition(price_change_pct_below=-3.0),
action=ScenarioAction.SELL,
confidence=90,
stop_loss_pct=-3.0,
rationale="Defensive stop-loss (planner failure)",
),
],
)
for c in candidates
]
return DayPlaybook(
date=today,
market=market,
market_outlook=MarketOutlook.NEUTRAL_TO_BEARISH,
default_action=ScenarioAction.HOLD,
stock_playbooks=stock_playbooks,
global_rules=[
GlobalRule(
condition="portfolio_pnl_pct < -2.0",
action=ScenarioAction.REDUCE_ALL,
rationale="Defensive: reduce on loss threshold",
),
],
)

View File

@@ -1,12 +1,46 @@
"""Tests for main trading loop telegram integration."""
"""Tests for main trading loop integration."""
import asyncio
from datetime import date
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected
from src.main import safe_float, trading_cycle
from src.strategy.models import (
DayPlaybook,
ScenarioAction,
StockCondition,
StockScenario,
)
from src.strategy.scenario_engine import ScenarioEngine, ScenarioMatch
def _make_playbook(market: str = "KR") -> DayPlaybook:
"""Create a minimal empty playbook for testing."""
return DayPlaybook(date=date(2026, 2, 8), market=market)
def _make_buy_match(stock_code: str = "005930") -> ScenarioMatch:
"""Create a ScenarioMatch that returns BUY."""
return ScenarioMatch(
stock_code=stock_code,
matched_scenario=None,
action=ScenarioAction.BUY,
confidence=85,
rationale="Test buy",
)
def _make_hold_match(stock_code: str = "005930") -> ScenarioMatch:
"""Create a ScenarioMatch that returns HOLD."""
return ScenarioMatch(
stock_code=stock_code,
matched_scenario=None,
action=ScenarioAction.HOLD,
confidence=0,
rationale="No scenario conditions met",
)
class TestSafeFloat:
@@ -81,15 +115,16 @@ class TestTradingCycleTelegramIntegration:
return broker
@pytest.fixture
def mock_brain(self) -> MagicMock:
"""Create mock brain that decides to buy."""
brain = MagicMock()
decision = MagicMock()
decision.action = "BUY"
decision.confidence = 85
decision.rationale = "Test buy"
brain.decide = AsyncMock(return_value=decision)
return brain
def mock_scenario_engine(self) -> MagicMock:
"""Create mock scenario engine that returns BUY."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_buy_match())
return engine
@pytest.fixture
def mock_playbook(self) -> DayPlaybook:
"""Create a minimal day playbook."""
return _make_playbook()
@pytest.fixture
def mock_risk(self) -> MagicMock:
@@ -134,6 +169,7 @@ class TestTradingCycleTelegramIntegration:
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
return telegram
@pytest.fixture
@@ -151,7 +187,8 @@ class TestTradingCycleTelegramIntegration:
self,
mock_broker: MagicMock,
mock_overseas_broker: MagicMock,
mock_brain: MagicMock,
mock_scenario_engine: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock,
mock_db: MagicMock,
mock_decision_logger: MagicMock,
@@ -165,7 +202,8 @@ class TestTradingCycleTelegramIntegration:
await trading_cycle(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
brain=mock_brain,
scenario_engine=mock_scenario_engine,
playbook=mock_playbook,
risk=mock_risk,
db_conn=mock_db,
decision_logger=mock_decision_logger,
@@ -190,7 +228,8 @@ class TestTradingCycleTelegramIntegration:
self,
mock_broker: MagicMock,
mock_overseas_broker: MagicMock,
mock_brain: MagicMock,
mock_scenario_engine: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock,
mock_db: MagicMock,
mock_decision_logger: MagicMock,
@@ -208,7 +247,8 @@ class TestTradingCycleTelegramIntegration:
await trading_cycle(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
brain=mock_brain,
scenario_engine=mock_scenario_engine,
playbook=mock_playbook,
risk=mock_risk,
db_conn=mock_db,
decision_logger=mock_decision_logger,
@@ -228,7 +268,8 @@ class TestTradingCycleTelegramIntegration:
self,
mock_broker: MagicMock,
mock_overseas_broker: MagicMock,
mock_brain: MagicMock,
mock_scenario_engine: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock,
mock_db: MagicMock,
mock_decision_logger: MagicMock,
@@ -250,7 +291,8 @@ class TestTradingCycleTelegramIntegration:
await trading_cycle(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
brain=mock_brain,
scenario_engine=mock_scenario_engine,
playbook=mock_playbook,
risk=mock_risk,
db_conn=mock_db,
decision_logger=mock_decision_logger,
@@ -275,7 +317,8 @@ class TestTradingCycleTelegramIntegration:
self,
mock_broker: MagicMock,
mock_overseas_broker: MagicMock,
mock_brain: MagicMock,
mock_scenario_engine: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock,
mock_db: MagicMock,
mock_decision_logger: MagicMock,
@@ -299,7 +342,8 @@ class TestTradingCycleTelegramIntegration:
await trading_cycle(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
brain=mock_brain,
scenario_engine=mock_scenario_engine,
playbook=mock_playbook,
risk=mock_risk,
db_conn=mock_db,
decision_logger=mock_decision_logger,
@@ -319,7 +363,8 @@ class TestTradingCycleTelegramIntegration:
self,
mock_broker: MagicMock,
mock_overseas_broker: MagicMock,
mock_brain: MagicMock,
mock_scenario_engine: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock,
mock_db: MagicMock,
mock_decision_logger: MagicMock,
@@ -329,18 +374,15 @@ class TestTradingCycleTelegramIntegration:
mock_market: MagicMock,
) -> None:
"""Test no trade notification sent when decision is HOLD."""
# Change brain decision to HOLD
decision = MagicMock()
decision.action = "HOLD"
decision.confidence = 50
decision.rationale = "Insufficient signal"
mock_brain.decide = AsyncMock(return_value=decision)
# Scenario engine returns HOLD
mock_scenario_engine.evaluate = MagicMock(return_value=_make_hold_match())
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
brain=mock_brain,
scenario_engine=mock_scenario_engine,
playbook=mock_playbook,
risk=mock_risk,
db_conn=mock_db,
decision_logger=mock_decision_logger,
@@ -472,15 +514,16 @@ class TestOverseasBalanceParsing:
return market
@pytest.fixture
def mock_brain_hold(self) -> MagicMock:
"""Create mock brain that always holds."""
brain = MagicMock()
decision = MagicMock()
decision.action = "HOLD"
decision.confidence = 50
decision.rationale = "Testing balance parsing"
brain.decide = AsyncMock(return_value=decision)
return brain
def mock_scenario_engine_hold(self) -> MagicMock:
"""Create mock scenario engine that always returns HOLD."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_hold_match("AAPL"))
return engine
@pytest.fixture
def mock_playbook(self) -> DayPlaybook:
"""Create a minimal playbook."""
return _make_playbook("US")
@pytest.fixture
def mock_risk(self) -> MagicMock:
@@ -517,14 +560,17 @@ class TestOverseasBalanceParsing:
@pytest.fixture
def mock_telegram(self) -> MagicMock:
"""Create mock telegram client."""
return MagicMock()
telegram = MagicMock()
telegram.notify_scenario_matched = AsyncMock()
return telegram
@pytest.mark.asyncio
async def test_overseas_balance_list_format(
self,
mock_domestic_broker: MagicMock,
mock_overseas_broker_with_list: MagicMock,
mock_brain_hold: MagicMock,
mock_scenario_engine_hold: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock,
mock_db: MagicMock,
mock_decision_logger: MagicMock,
@@ -539,7 +585,8 @@ class TestOverseasBalanceParsing:
await trading_cycle(
broker=mock_domestic_broker,
overseas_broker=mock_overseas_broker_with_list,
brain=mock_brain_hold,
scenario_engine=mock_scenario_engine_hold,
playbook=mock_playbook,
risk=mock_risk,
db_conn=mock_db,
decision_logger=mock_decision_logger,
@@ -559,7 +606,8 @@ class TestOverseasBalanceParsing:
self,
mock_domestic_broker: MagicMock,
mock_overseas_broker_with_dict: MagicMock,
mock_brain_hold: MagicMock,
mock_scenario_engine_hold: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock,
mock_db: MagicMock,
mock_decision_logger: MagicMock,
@@ -574,7 +622,8 @@ class TestOverseasBalanceParsing:
await trading_cycle(
broker=mock_domestic_broker,
overseas_broker=mock_overseas_broker_with_dict,
brain=mock_brain_hold,
scenario_engine=mock_scenario_engine_hold,
playbook=mock_playbook,
risk=mock_risk,
db_conn=mock_db,
decision_logger=mock_decision_logger,
@@ -594,7 +643,8 @@ class TestOverseasBalanceParsing:
self,
mock_domestic_broker: MagicMock,
mock_overseas_broker_with_empty: MagicMock,
mock_brain_hold: MagicMock,
mock_scenario_engine_hold: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock,
mock_db: MagicMock,
mock_decision_logger: MagicMock,
@@ -609,7 +659,8 @@ class TestOverseasBalanceParsing:
await trading_cycle(
broker=mock_domestic_broker,
overseas_broker=mock_overseas_broker_with_empty,
brain=mock_brain_hold,
scenario_engine=mock_scenario_engine_hold,
playbook=mock_playbook,
risk=mock_risk,
db_conn=mock_db,
decision_logger=mock_decision_logger,
@@ -629,7 +680,8 @@ class TestOverseasBalanceParsing:
self,
mock_domestic_broker: MagicMock,
mock_overseas_broker_with_empty_price: MagicMock,
mock_brain_hold: MagicMock,
mock_scenario_engine_hold: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock,
mock_db: MagicMock,
mock_decision_logger: MagicMock,
@@ -644,7 +696,8 @@ class TestOverseasBalanceParsing:
await trading_cycle(
broker=mock_domestic_broker,
overseas_broker=mock_overseas_broker_with_empty_price,
brain=mock_brain_hold,
scenario_engine=mock_scenario_engine_hold,
playbook=mock_playbook,
risk=mock_risk,
db_conn=mock_db,
decision_logger=mock_decision_logger,
@@ -658,3 +711,341 @@ class TestOverseasBalanceParsing:
# Verify price API was called
mock_overseas_broker_with_empty_price.get_overseas_price.assert_called_once()
class TestScenarioEngineIntegration:
"""Test scenario engine integration in trading_cycle."""
@pytest.fixture
def mock_broker(self) -> MagicMock:
"""Create mock broker with standard domestic data."""
broker = MagicMock()
broker.get_orderbook = AsyncMock(
return_value={
"output1": {"stck_prpr": "50000", "frgn_ntby_qty": "100"}
}
)
broker.get_balance = AsyncMock(
return_value={
"output2": [
{
"tot_evlu_amt": "10000000",
"dnca_tot_amt": "5000000",
"pchs_amt_smtl_amt": "9500000",
}
]
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
return broker
@pytest.fixture
def mock_market(self) -> MagicMock:
"""Create mock KR market."""
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
return market
@pytest.fixture
def mock_telegram(self) -> MagicMock:
"""Create mock telegram with all notification methods."""
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
return telegram
@pytest.mark.asyncio
async def test_scenario_engine_called_with_enriched_market_data(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test scenario engine receives market_data enriched with scanner metrics."""
from src.analysis.smart_scanner import ScanCandidate
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_hold_match())
playbook = _make_playbook()
candidate = ScanCandidate(
stock_code="005930", name="Samsung", price=50000,
volume=1000000, volume_ratio=3.5, rsi=25.0,
signal="oversold", score=85.0,
)
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=playbook,
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={"KR": {"005930": candidate}},
)
# Verify evaluate was called
engine.evaluate.assert_called_once()
call_args = engine.evaluate.call_args
market_data = call_args[0][2] # 3rd positional arg
portfolio_data = call_args[0][3] # 4th positional arg
# Scanner data should be enriched into market_data
assert market_data["rsi"] == 25.0
assert market_data["volume_ratio"] == 3.5
assert market_data["current_price"] == 50000.0
# Portfolio data should include pnl
assert "portfolio_pnl_pct" in portfolio_data
assert "total_cash" in portfolio_data
@pytest.mark.asyncio
async def test_scan_candidates_market_scoped(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test scan_candidates uses market-scoped lookup, ignoring other markets."""
from src.analysis.smart_scanner import ScanCandidate
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_hold_match())
# Candidate stored under US market — should NOT be found for KR market
us_candidate = ScanCandidate(
stock_code="005930", name="Overlap", price=100,
volume=500000, volume_ratio=5.0, rsi=15.0,
signal="oversold", score=90.0,
)
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market, # KR market
stock_code="005930",
scan_candidates={"US": {"005930": us_candidate}}, # Wrong market
)
# Should NOT have rsi/volume_ratio because candidate is under US, not KR
market_data = engine.evaluate.call_args[0][2]
assert "rsi" not in market_data
assert "volume_ratio" not in market_data
@pytest.mark.asyncio
async def test_scenario_engine_called_without_scanner_data(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test scenario engine works when stock has no scan candidate."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_hold_match())
playbook = _make_playbook()
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=playbook,
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={}, # No scanner data
)
# Should still work, just without rsi/volume_ratio
engine.evaluate.assert_called_once()
market_data = engine.evaluate.call_args[0][2]
assert "rsi" not in market_data
assert "volume_ratio" not in market_data
assert market_data["current_price"] == 50000.0
@pytest.mark.asyncio
async def test_scenario_matched_notification_sent(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test telegram notification sent when a scenario matches."""
# Create a match with matched_scenario (not None)
scenario = StockScenario(
condition=StockCondition(rsi_below=30),
action=ScenarioAction.BUY,
confidence=88,
rationale="RSI oversold bounce",
)
match = ScenarioMatch(
stock_code="005930",
matched_scenario=scenario,
action=ScenarioAction.BUY,
confidence=88,
rationale="RSI oversold bounce",
match_details={"rsi": 25.0},
)
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=match)
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={},
)
# Scenario matched notification should be sent
mock_telegram.notify_scenario_matched.assert_called_once()
call_kwargs = mock_telegram.notify_scenario_matched.call_args.kwargs
assert call_kwargs["stock_code"] == "005930"
assert call_kwargs["action"] == "BUY"
assert "rsi=25.0" in call_kwargs["condition_summary"]
@pytest.mark.asyncio
async def test_no_scenario_matched_notification_on_default_hold(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test no scenario notification when default HOLD is returned."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_hold_match())
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={},
)
# No scenario matched notification for default HOLD
mock_telegram.notify_scenario_matched.assert_not_called()
@pytest.mark.asyncio
async def test_decision_logger_receives_scenario_match_details(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test decision logger context includes scenario match details."""
match = ScenarioMatch(
stock_code="005930",
matched_scenario=None,
action=ScenarioAction.HOLD,
confidence=0,
rationale="No match",
match_details={"rsi": 45.0, "volume_ratio": 1.2},
)
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=match)
decision_logger = MagicMock()
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=decision_logger,
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={},
)
decision_logger.log_decision.assert_called_once()
call_kwargs = decision_logger.log_decision.call_args.kwargs
assert "scenario_match" in call_kwargs["context_snapshot"]
assert call_kwargs["context_snapshot"]["scenario_match"]["rsi"] == 45.0
@pytest.mark.asyncio
async def test_reduce_all_does_not_execute_order(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test REDUCE_ALL action does not trigger order execution."""
match = ScenarioMatch(
stock_code="005930",
matched_scenario=None,
action=ScenarioAction.REDUCE_ALL,
confidence=100,
rationale="Global rule: portfolio loss > 2%",
)
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=match)
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={},
)
# REDUCE_ALL is not BUY or SELL — no order sent
mock_broker.send_order.assert_not_called()
mock_telegram.notify_trade_execution.assert_not_called()

View File

@@ -0,0 +1,552 @@
"""Tests for PreMarketPlanner — Gemini prompt builder + response parser."""
from __future__ import annotations
import json
from datetime import date
from unittest.mock import AsyncMock, MagicMock
import pytest
from src.analysis.smart_scanner import ScanCandidate
from src.brain.gemini_client import TradeDecision
from src.config import Settings
from src.context.store import ContextLayer
from src.strategy.models import (
CrossMarketContext,
DayPlaybook,
MarketOutlook,
PlaybookStatus,
ScenarioAction,
)
from src.strategy.pre_market_planner import PreMarketPlanner
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _candidate(
code: str = "005930",
name: str = "Samsung",
price: float = 71000,
rsi: float = 28.5,
volume_ratio: float = 3.2,
signal: str = "oversold",
score: float = 82.0,
) -> ScanCandidate:
return ScanCandidate(
stock_code=code,
name=name,
price=price,
volume=1_500_000,
volume_ratio=volume_ratio,
rsi=rsi,
signal=signal,
score=score,
)
def _gemini_response_json(
outlook: str = "neutral_to_bullish",
stocks: list[dict] | None = None,
global_rules: list[dict] | None = None,
) -> str:
"""Build a valid Gemini JSON response."""
if stocks is None:
stocks = [
{
"stock_code": "005930",
"scenarios": [
{
"condition": {"rsi_below": 30, "volume_ratio_above": 2.5},
"action": "BUY",
"confidence": 85,
"allocation_pct": 15.0,
"stop_loss_pct": -2.0,
"take_profit_pct": 4.0,
"rationale": "Oversold bounce with high volume",
}
],
}
]
if global_rules is None:
global_rules = [
{
"condition": "portfolio_pnl_pct < -2.0",
"action": "REDUCE_ALL",
"rationale": "Near circuit breaker",
}
]
return json.dumps(
{"market_outlook": outlook, "global_rules": global_rules, "stocks": stocks}
)
def _make_planner(
gemini_response: str = "",
token_count: int = 200,
context_data: dict | None = None,
scorecard_data: dict | None = None,
) -> PreMarketPlanner:
"""Create a PreMarketPlanner with mocked dependencies."""
if not gemini_response:
gemini_response = _gemini_response_json()
# Mock GeminiClient
gemini = AsyncMock()
gemini.decide = AsyncMock(
return_value=TradeDecision(
action="HOLD",
confidence=0,
rationale=gemini_response,
token_count=token_count,
)
)
# Mock ContextStore
store = MagicMock()
store.get_context = MagicMock(return_value=scorecard_data)
# Mock ContextSelector
selector = MagicMock()
selector.select_layers = MagicMock(return_value=[ContextLayer.L7_REALTIME, ContextLayer.L6_DAILY])
selector.get_context_data = MagicMock(return_value=context_data or {})
settings = Settings(
KIS_APP_KEY="test",
KIS_APP_SECRET="test",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test",
)
return PreMarketPlanner(gemini, store, selector, settings)
# ---------------------------------------------------------------------------
# generate_playbook
# ---------------------------------------------------------------------------
class TestGeneratePlaybook:
@pytest.mark.asyncio
async def test_basic_generation(self) -> None:
planner = _make_planner()
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert isinstance(pb, DayPlaybook)
assert pb.market == "KR"
assert pb.stock_count == 1
assert pb.scenario_count == 1
assert pb.market_outlook == MarketOutlook.NEUTRAL_TO_BULLISH
assert pb.token_count == 200
@pytest.mark.asyncio
async def test_empty_candidates_returns_empty_playbook(self) -> None:
planner = _make_planner()
pb = await planner.generate_playbook("KR", [], today=date(2026, 2, 8))
assert pb.stock_count == 0
assert pb.scenario_count == 0
assert pb.market_outlook == MarketOutlook.NEUTRAL
@pytest.mark.asyncio
async def test_gemini_failure_returns_defensive(self) -> None:
planner = _make_planner()
planner._gemini.decide = AsyncMock(side_effect=RuntimeError("API timeout"))
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert pb.default_action == ScenarioAction.HOLD
assert pb.market_outlook == MarketOutlook.NEUTRAL_TO_BEARISH
assert pb.stock_count == 1
# Defensive playbook has stop-loss scenarios
assert pb.stock_playbooks[0].scenarios[0].action == ScenarioAction.SELL
@pytest.mark.asyncio
async def test_gemini_failure_empty_when_defensive_disabled(self) -> None:
planner = _make_planner()
planner._settings.DEFENSIVE_PLAYBOOK_ON_FAILURE = False
planner._gemini.decide = AsyncMock(side_effect=RuntimeError("fail"))
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert pb.stock_count == 0
@pytest.mark.asyncio
async def test_multiple_candidates(self) -> None:
stocks = [
{
"stock_code": "005930",
"scenarios": [
{
"condition": {"rsi_below": 30},
"action": "BUY",
"confidence": 85,
"rationale": "Oversold",
}
],
},
{
"stock_code": "AAPL",
"scenarios": [
{
"condition": {"rsi_above": 75},
"action": "SELL",
"confidence": 80,
"rationale": "Overbought",
}
],
},
]
planner = _make_planner(gemini_response=_gemini_response_json(stocks=stocks))
candidates = [_candidate(), _candidate(code="AAPL", name="Apple")]
pb = await planner.generate_playbook("US", candidates, today=date(2026, 2, 8))
assert pb.stock_count == 2
codes = [sp.stock_code for sp in pb.stock_playbooks]
assert "005930" in codes
assert "AAPL" in codes
@pytest.mark.asyncio
async def test_unknown_stock_in_response_skipped(self) -> None:
stocks = [
{
"stock_code": "005930",
"scenarios": [{"condition": {"rsi_below": 30}, "action": "BUY", "confidence": 85, "rationale": "ok"}],
},
{
"stock_code": "UNKNOWN",
"scenarios": [{"condition": {"rsi_below": 20}, "action": "BUY", "confidence": 90, "rationale": "bad"}],
},
]
planner = _make_planner(gemini_response=_gemini_response_json(stocks=stocks))
candidates = [_candidate()] # Only 005930
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert pb.stock_count == 1
assert pb.stock_playbooks[0].stock_code == "005930"
@pytest.mark.asyncio
async def test_global_rules_parsed(self) -> None:
planner = _make_planner()
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert len(pb.global_rules) == 1
assert pb.global_rules[0].action == ScenarioAction.REDUCE_ALL
@pytest.mark.asyncio
async def test_token_count_from_decision(self) -> None:
planner = _make_planner(token_count=450)
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert pb.token_count == 450
# ---------------------------------------------------------------------------
# _parse_response
# ---------------------------------------------------------------------------
class TestParseResponse:
def test_parse_full_response(self) -> None:
planner = _make_planner()
response = _gemini_response_json(outlook="bearish")
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
assert pb.market_outlook == MarketOutlook.BEARISH
assert pb.stock_count == 1
assert pb.stock_playbooks[0].scenarios[0].confidence == 85
def test_parse_with_markdown_fences(self) -> None:
planner = _make_planner()
response = f"```json\n{_gemini_response_json()}\n```"
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
assert pb.stock_count == 1
def test_parse_unknown_outlook_defaults_neutral(self) -> None:
planner = _make_planner()
response = _gemini_response_json(outlook="super_bullish")
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
assert pb.market_outlook == MarketOutlook.NEUTRAL
def test_parse_scenario_with_all_condition_fields(self) -> None:
planner = _make_planner()
stocks = [
{
"stock_code": "005930",
"scenarios": [
{
"condition": {
"rsi_below": 25,
"volume_ratio_above": 3.0,
"price_change_pct_below": -2.0,
},
"action": "BUY",
"confidence": 92,
"allocation_pct": 20.0,
"stop_loss_pct": -3.0,
"take_profit_pct": 5.0,
"rationale": "Multi-condition entry",
}
],
}
]
response = _gemini_response_json(stocks=stocks)
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
sc = pb.stock_playbooks[0].scenarios[0]
assert sc.condition.rsi_below == 25
assert sc.condition.volume_ratio_above == 3.0
assert sc.condition.price_change_pct_below == -2.0
assert sc.allocation_pct == 20.0
assert sc.stop_loss_pct == -3.0
assert sc.take_profit_pct == 5.0
def test_parse_empty_condition_scenario_skipped(self) -> None:
planner = _make_planner()
stocks = [
{
"stock_code": "005930",
"scenarios": [
{
"condition": {},
"action": "BUY",
"confidence": 85,
"rationale": "No conditions",
},
{
"condition": {"rsi_below": 30},
"action": "BUY",
"confidence": 80,
"rationale": "Valid",
},
],
}
]
response = _gemini_response_json(stocks=stocks)
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
# Empty condition scenario skipped, valid one kept
assert pb.stock_count == 1
assert pb.stock_playbooks[0].scenarios[0].confidence == 80
def test_parse_max_scenarios_enforced(self) -> None:
planner = _make_planner()
# Settings default MAX_SCENARIOS_PER_STOCK = 5
scenarios = [
{
"condition": {"rsi_below": 20 + i},
"action": "BUY",
"confidence": 80 + i,
"rationale": f"Scenario {i}",
}
for i in range(8) # 8 scenarios, should be capped to 5
]
stocks = [{"stock_code": "005930", "scenarios": scenarios}]
response = _gemini_response_json(stocks=stocks)
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
assert len(pb.stock_playbooks[0].scenarios) == 5
def test_parse_invalid_json_raises(self) -> None:
planner = _make_planner()
candidates = [_candidate()]
with pytest.raises(json.JSONDecodeError):
planner._parse_response("not json at all", date(2026, 2, 8), "KR", candidates, None)
def test_parse_cross_market_preserved(self) -> None:
planner = _make_planner()
response = _gemini_response_json()
candidates = [_candidate()]
cross = CrossMarketContext(market="US", date="2026-02-07", total_pnl=1.5, win_rate=60)
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, cross)
assert pb.cross_market is not None
assert pb.cross_market.market == "US"
assert pb.cross_market.total_pnl == 1.5
# ---------------------------------------------------------------------------
# build_cross_market_context
# ---------------------------------------------------------------------------
class TestBuildCrossMarketContext:
def test_kr_reads_us_scorecard(self) -> None:
scorecard = {"total_pnl": 2.5, "win_rate": 65, "index_change_pct": 0.8, "lessons": ["Stay patient"]}
planner = _make_planner(scorecard_data=scorecard)
ctx = planner.build_cross_market_context("KR", today=date(2026, 2, 8))
assert ctx is not None
assert ctx.market == "US"
assert ctx.total_pnl == 2.5
assert ctx.win_rate == 65
assert "Stay patient" in ctx.lessons
# Verify it queried scorecard_US
planner._context_store.get_context.assert_called_once_with(
ContextLayer.L6_DAILY, "2026-02-08", "scorecard_US"
)
def test_us_reads_kr_scorecard(self) -> None:
scorecard = {"total_pnl": -1.0, "win_rate": 40, "index_change_pct": -0.5}
planner = _make_planner(scorecard_data=scorecard)
ctx = planner.build_cross_market_context("US", today=date(2026, 2, 8))
assert ctx is not None
assert ctx.market == "KR"
assert ctx.total_pnl == -1.0
planner._context_store.get_context.assert_called_once_with(
ContextLayer.L6_DAILY, "2026-02-08", "scorecard_KR"
)
def test_no_scorecard_returns_none(self) -> None:
planner = _make_planner(scorecard_data=None)
ctx = planner.build_cross_market_context("KR", today=date(2026, 2, 8))
assert ctx is None
def test_invalid_scorecard_returns_none(self) -> None:
planner = _make_planner(scorecard_data="not a dict and not json")
ctx = planner.build_cross_market_context("KR", today=date(2026, 2, 8))
assert ctx is None
# ---------------------------------------------------------------------------
# _build_prompt
# ---------------------------------------------------------------------------
class TestBuildPrompt:
def test_prompt_contains_candidates(self) -> None:
planner = _make_planner()
candidates = [_candidate(code="005930", name="Samsung")]
prompt = planner._build_prompt("KR", candidates, {}, None)
assert "005930" in prompt
assert "Samsung" in prompt
assert "RSI=" in prompt
assert "volume_ratio=" in prompt
def test_prompt_contains_cross_market(self) -> None:
planner = _make_planner()
cross = CrossMarketContext(
market="US", date="2026-02-07", total_pnl=1.5,
win_rate=60, index_change_pct=0.8, lessons=["Cut losses early"],
)
prompt = planner._build_prompt("KR", [_candidate()], {}, cross)
assert "Other Market (US)" in prompt
assert "+1.50%" in prompt
assert "Cut losses early" in prompt
def test_prompt_contains_context_data(self) -> None:
planner = _make_planner()
context = {"L6_DAILY": {"win_rate": 0.65, "total_pnl": 2.5}}
prompt = planner._build_prompt("KR", [_candidate()], context, None)
assert "Strategic Context" in prompt
assert "L6_DAILY" in prompt
assert "win_rate" in prompt
def test_prompt_contains_max_scenarios(self) -> None:
planner = _make_planner()
prompt = planner._build_prompt("KR", [_candidate()], {}, None)
assert f"Max {planner._settings.MAX_SCENARIOS_PER_STOCK} scenarios" in prompt
def test_prompt_market_name(self) -> None:
planner = _make_planner()
prompt = planner._build_prompt("US", [_candidate()], {}, None)
assert "US market" in prompt
# ---------------------------------------------------------------------------
# _extract_json
# ---------------------------------------------------------------------------
class TestExtractJson:
def test_plain_json(self) -> None:
assert PreMarketPlanner._extract_json('{"a": 1}') == '{"a": 1}'
def test_with_json_fence(self) -> None:
text = '```json\n{"a": 1}\n```'
assert PreMarketPlanner._extract_json(text) == '{"a": 1}'
def test_with_plain_fence(self) -> None:
text = '```\n{"a": 1}\n```'
assert PreMarketPlanner._extract_json(text) == '{"a": 1}'
def test_with_whitespace(self) -> None:
text = ' \n {"a": 1} \n '
assert PreMarketPlanner._extract_json(text) == '{"a": 1}'
# ---------------------------------------------------------------------------
# Defensive playbook
# ---------------------------------------------------------------------------
class TestDefensivePlaybook:
def test_defensive_has_stop_loss(self) -> None:
candidates = [_candidate(code="005930"), _candidate(code="AAPL")]
pb = PreMarketPlanner._defensive_playbook(date(2026, 2, 8), "KR", candidates)
assert pb.default_action == ScenarioAction.HOLD
assert pb.market_outlook == MarketOutlook.NEUTRAL_TO_BEARISH
assert pb.stock_count == 2
for sp in pb.stock_playbooks:
assert sp.scenarios[0].action == ScenarioAction.SELL
assert sp.scenarios[0].stop_loss_pct == -3.0
def test_defensive_has_global_rule(self) -> None:
pb = PreMarketPlanner._defensive_playbook(date(2026, 2, 8), "KR", [_candidate()])
assert len(pb.global_rules) == 1
assert pb.global_rules[0].action == ScenarioAction.REDUCE_ALL
def test_empty_playbook(self) -> None:
pb = PreMarketPlanner._empty_playbook(date(2026, 2, 8), "US")
assert pb.stock_count == 0
assert pb.market == "US"
assert pb.market_outlook == MarketOutlook.NEUTRAL

View File

@@ -160,6 +160,83 @@ class TestNotificationSending:
assert "250.50" in payload["text"]
assert "92%" in payload["text"]
@pytest.mark.asyncio
async def test_playbook_generated_format(self) -> None:
"""Playbook generated notification has expected fields."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_playbook_generated(
market="KR",
stock_count=4,
scenario_count=12,
token_count=980,
)
payload = mock_post.call_args.kwargs["json"]
assert "Playbook Generated" in payload["text"]
assert "Market: KR" in payload["text"]
assert "Stocks: 4" in payload["text"]
assert "Scenarios: 12" in payload["text"]
assert "Tokens: 980" in payload["text"]
@pytest.mark.asyncio
async def test_scenario_matched_format(self) -> None:
"""Scenario matched notification has expected fields."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_scenario_matched(
stock_code="AAPL",
action="BUY",
condition_summary="RSI < 30, volume_ratio > 2.0",
confidence=88.2,
)
payload = mock_post.call_args.kwargs["json"]
assert "Scenario Matched" in payload["text"]
assert "AAPL" in payload["text"]
assert "Action: BUY" in payload["text"]
assert "RSI < 30" in payload["text"]
assert "88%" in payload["text"]
@pytest.mark.asyncio
async def test_playbook_failed_format(self) -> None:
"""Playbook failed notification has expected fields."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_playbook_failed(
market="US",
reason="Gemini timeout",
)
payload = mock_post.call_args.kwargs["json"]
assert "Playbook Failed" in payload["text"]
assert "Market: US" in payload["text"]
assert "Gemini timeout" in payload["text"]
@pytest.mark.asyncio
async def test_circuit_breaker_priority(self) -> None:
"""Circuit breaker uses CRITICAL priority."""
@@ -309,6 +386,73 @@ class TestMessagePriorities:
payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.CRITICAL.emoji in payload["text"]
@pytest.mark.asyncio
async def test_playbook_generated_priority(self) -> None:
"""Playbook generated uses MEDIUM priority emoji."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_playbook_generated(
market="KR",
stock_count=2,
scenario_count=4,
token_count=123,
)
payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.MEDIUM.emoji in payload["text"]
@pytest.mark.asyncio
async def test_playbook_failed_priority(self) -> None:
"""Playbook failed uses HIGH priority emoji."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_playbook_failed(
market="KR",
reason="Invalid JSON",
)
payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.HIGH.emoji in payload["text"]
@pytest.mark.asyncio
async def test_scenario_matched_priority(self) -> None:
"""Scenario matched uses HIGH priority emoji."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_scenario_matched(
stock_code="AAPL",
action="BUY",
condition_summary="RSI < 30",
confidence=80.0,
)
payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.HIGH.emoji in payload["text"]
class TestClientCleanup:
"""Test client cleanup behavior."""