Compare commits

...

12 Commits

Author SHA1 Message Date
agentson
9a8936ab34 docs: add plan-implementation consistency check to code review checklist (#114)
Some checks failed
CI / test (pull_request) Has been cancelled
리뷰 시 플랜과 구현의 일치 여부를 필수로 확인하는 규칙 추가.
- workflow.md에 Code Review Checklist 섹션 신설
- requirements-log.md에 사용자 요구사항 기록

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 04:15:51 +09:00
9171e54652 Merge pull request 'feat: integrate scenario engine and playbook into main trading loop (issue #84)' (#110) from feature/issue-84-main-integration into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #110
2026-02-09 23:18:24 +09:00
agentson
d64e072f06 fix: PR review — DB reload, market-local date, market-scoped scan_candidates
Some checks failed
CI / test (pull_request) Has been cancelled
Address PR #110 review findings:

1. High — Realtime mode now loads playbook from DB before calling Gemini,
   preventing duplicate API calls on process restart (4/day budget).
2. Medium — Pass market-local date (via market.timezone) to
   generate_playbook() and _empty_playbook() instead of date.today().
3. Medium — scan_candidates restructured from {stock_code: candidate}
   to {market_code: {stock_code: candidate}} to prevent KR/US symbol
   collision.

New test: test_scan_candidates_market_scoped verifies cross-market
isolation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 23:00:06 +09:00
agentson
b2312fbe01 fix: resolve lint issues in main.py and test_main.py
Some checks failed
CI / test (pull_request) Has been cancelled
Remove unused imports (sys, ScenarioMatch, asyncio, StockPlaybook),
fix import ordering, and split long lines for ruff compliance.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 22:28:31 +09:00
agentson
98c4a2413c feat: integrate scenario engine and playbook into main trading loop (issue #84)
Replace brain.decide() with scenario_engine.evaluate() in trading_cycle
and brain.decide_batch() with per-stock scenario evaluation in
run_daily_session. Initialize PreMarketPlanner, ScenarioEngine, and
PlaybookStore in run(). Add pre-market playbook generation on market
open (1 Gemini call per market per day), market_data enrichment from
scanner metrics (rsi, volume_ratio), portfolio_data for global rules,
scenario match notifications, and playbook lifecycle management.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 22:24:19 +09:00
6fba7c7ae8 Merge pull request 'feat: implement pre-market planner with Gemini integration (issue #83)' (#109) from feature/issue-83-pre-market-planner into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #109
2026-02-08 22:07:36 +09:00
agentson
be695a5d7c fix: address PR review — inject today param, remove unused imports, fix lint
Some checks failed
CI / test (pull_request) Has been cancelled
Review findings addressed:
- Finding 1 (ImportError): false positive — ContextLayer is re-exported from
  src.context.store, import works correctly at runtime
- Finding 2 (timezone): generate_playbook() and build_cross_market_context()
  now accept optional today parameter for market-local date injection
- Finding 3 (lint): removed unused imports (UTC, datetime, PlaybookStatus),
  fixed line-too-long in prompt template
- Tests simplified: replaced date patching with direct today= parameter

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 21:57:39 +09:00
agentson
6471e66d89 fix: correct Settings field name in planner tests (KIS_ACCOUNT_NO)
Some checks failed
CI / test (pull_request) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 21:36:42 +09:00
agentson
149039a904 feat: implement pre-market planner with Gemini integration (issue #83)
PreMarketPlanner generates DayPlaybook via single Gemini API call per market:
- Structured JSON prompt with scan candidates + strategic context
- Cross-market context (KR reads US scorecard, US reads KR scorecard)
- Robust JSON parser with markdown fence stripping
- Unknown stock filtering (only scanner candidates allowed)
- MAX_SCENARIOS_PER_STOCK enforcement
- Defensive playbook on failure (HOLD + stop-loss)
- Empty playbook when no candidates (safe, no trades)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 21:35:57 +09:00
815d675529 Merge pull request 'feat: add Telegram playbook notifications (issue #81)' (#108) from feature/issue-81-telegram-playbook-notify into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #108
2026-02-08 21:27:46 +09:00
agentson
e8634b93c3 feat: add Telegram playbook notifications (issue #81)
Some checks failed
CI / test (pull_request) Has been cancelled
- notify_playbook_generated(): market, stock/scenario count, token usage (MEDIUM)
- notify_scenario_matched(): stock, action, condition, confidence (HIGH)
- notify_playbook_failed(): market, reason with 200-char truncation (HIGH)
- 6 new tests: 3 format + 3 priority validations

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 21:25:16 +09:00
f20736fd2a Merge pull request 'feat: add playbook persistence with DB schema and CRUD store (issue #82)' (#107) from feature/issue-82-playbook-persistence into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #107
2026-02-08 21:07:13 +09:00
8 changed files with 1924 additions and 100 deletions

View File

@@ -64,3 +64,25 @@
**참고:** Realtime 모드 전용. Daily 모드는 배치 효율성을 위해 정적 watchlist 사용. **참고:** Realtime 모드 전용. Daily 모드는 배치 효율성을 위해 정적 watchlist 사용.
**이슈/PR:** #76, #77 **이슈/PR:** #76, #77
---
## 2026-02-10
### 코드 리뷰 시 플랜-구현 일치 검증 규칙
**배경:**
- 코드 리뷰 시 플랜(EnterPlanMode에서 승인된 계획)과 실제 구현이 일치하는지 확인하는 절차가 없었음
- 플랜과 다른 구현이 리뷰 없이 통과될 위험
**요구사항:**
1. 모든 PR 리뷰에서 플랜-구현 일치 여부를 필수 체크
2. 플랜에 없는 변경은 정당한 사유 필요
3. 플랜 항목이 누락되면 PR 설명에 사유 기록
4. 스코프가 플랜과 일치하는지 확인
**구현 결과:**
- `docs/workflow.md`에 Code Review Checklist 섹션 추가
- Plan Consistency (필수), Safety & Constraints, Quality, Workflow 4개 카테고리
**이슈/PR:** #114

View File

@@ -74,3 +74,37 @@ task_tool(
``` ```
Use `run_in_background=True` for independent tasks that don't block subsequent work. Use `run_in_background=True` for independent tasks that don't block subsequent work.
## Code Review Checklist
**CRITICAL: Every PR review MUST verify plan-implementation consistency.**
Before approving any PR, the reviewer (human or agent) must check ALL of the following:
### 1. Plan Consistency (MANDATORY)
- [ ] **Implementation matches the approved plan** — Compare the actual code changes against the plan created during `EnterPlanMode`. Every item in the plan must be addressed.
- [ ] **No unplanned changes** — If the implementation includes changes not in the plan, they must be explicitly justified.
- [ ] **No plan items omitted** — If any planned item was skipped, the reason must be documented in the PR description.
- [ ] **Scope matches** — The PR does not exceed or fall short of the planned scope.
### 2. Safety & Constraints
- [ ] `src/core/risk_manager.py` is unchanged (READ-ONLY)
- [ ] Circuit breaker threshold not weakened (only stricter allowed)
- [ ] Fat-finger protection (30% max order) still enforced
- [ ] Confidence < 80 still forces HOLD
- [ ] No hardcoded API keys or secrets
### 3. Quality
- [ ] All new/modified code has corresponding tests
- [ ] Test coverage >= 80%
- [ ] `ruff check src/ tests/` passes (no lint errors)
- [ ] No `assert` statements removed from tests
### 4. Workflow
- [ ] PR references the Gitea issue number
- [ ] Feature branch follows naming convention (`feature/issue-N-description`)
- [ ] Commit messages are clear and descriptive

View File

@@ -10,14 +10,14 @@ import argparse
import asyncio import asyncio
import logging import logging
import signal import signal
import sys
from datetime import UTC, datetime from datetime import UTC, datetime
from typing import Any from typing import Any
from src.analysis.scanner import MarketScanner from src.analysis.scanner import MarketScanner
from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner
from src.analysis.volatility import VolatilityAnalyzer from src.analysis.volatility import VolatilityAnalyzer
from src.brain.gemini_client import GeminiClient from src.brain.context_selector import ContextSelector
from src.brain.gemini_client import GeminiClient, TradeDecision
from src.broker.kis_api import KISBroker from src.broker.kis_api import KISBroker
from src.broker.overseas import OverseasBroker from src.broker.overseas import OverseasBroker
from src.config import Settings from src.config import Settings
@@ -31,6 +31,10 @@ from src.logging.decision_logger import DecisionLogger
from src.logging_config import setup_logging from src.logging_config import setup_logging
from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets
from src.notifications.telegram_client import TelegramClient, TelegramCommandHandler from src.notifications.telegram_client import TelegramClient, TelegramCommandHandler
from src.strategy.models import DayPlaybook
from src.strategy.playbook_store import PlaybookStore
from src.strategy.pre_market_planner import PreMarketPlanner
from src.strategy.scenario_engine import ScenarioEngine
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -75,7 +79,8 @@ TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions
async def trading_cycle( async def trading_cycle(
broker: KISBroker, broker: KISBroker,
overseas_broker: OverseasBroker, overseas_broker: OverseasBroker,
brain: GeminiClient, scenario_engine: ScenarioEngine,
playbook: DayPlaybook,
risk: RiskManager, risk: RiskManager,
db_conn: Any, db_conn: Any,
decision_logger: DecisionLogger, decision_logger: DecisionLogger,
@@ -84,7 +89,7 @@ async def trading_cycle(
telegram: TelegramClient, telegram: TelegramClient,
market: MarketInfo, market: MarketInfo,
stock_code: str, stock_code: str,
scan_candidates: dict[str, ScanCandidate], scan_candidates: dict[str, dict[str, ScanCandidate]],
) -> None: ) -> None:
"""Execute one trading cycle for a single stock.""" """Execute one trading cycle for a single stock."""
cycle_start_time = asyncio.get_event_loop().time() cycle_start_time = asyncio.get_event_loop().time()
@@ -135,13 +140,27 @@ async def trading_cycle(
else 0.0 else 0.0
) )
market_data = { market_data: dict[str, Any] = {
"stock_code": stock_code, "stock_code": stock_code,
"market_name": market.name, "market_name": market.name,
"current_price": current_price, "current_price": current_price,
"foreigner_net": foreigner_net, "foreigner_net": foreigner_net,
} }
# Enrich market_data with scanner metrics for scenario engine
market_candidates = scan_candidates.get(market.code, {})
candidate = market_candidates.get(stock_code)
if candidate:
market_data["rsi"] = candidate.rsi
market_data["volume_ratio"] = candidate.volume_ratio
# Build portfolio data for global rule evaluation
portfolio_data = {
"portfolio_pnl_pct": pnl_pct,
"total_cash": total_cash,
"total_eval": total_eval,
}
# 1.5. Get volatility metrics from context store (L7_REALTIME) # 1.5. Get volatility metrics from context store (L7_REALTIME)
latest_timeframe = context_store.get_latest_timeframe(ContextLayer.L7_REALTIME) latest_timeframe = context_store.get_latest_timeframe(ContextLayer.L7_REALTIME)
volatility_score = 50.0 # Default normal volatility volatility_score = 50.0 # Default normal volatility
@@ -178,8 +197,13 @@ async def trading_cycle(
volume_surge, volume_surge,
) )
# 2. Ask the brain for a decision # 2. Evaluate scenario (local, no API call)
decision = await brain.decide(market_data) match = scenario_engine.evaluate(playbook, stock_code, market_data, portfolio_data)
decision = TradeDecision(
action=match.action.value,
confidence=match.confidence,
rationale=match.rationale,
)
logger.info( logger.info(
"Decision for %s (%s): %s (confidence=%d)", "Decision for %s (%s): %s (confidence=%d)",
stock_code, stock_code,
@@ -188,6 +212,19 @@ async def trading_cycle(
decision.confidence, decision.confidence,
) )
# 2.1. Notify scenario match
if match.matched_scenario is not None:
try:
condition_parts = [f"{k}={v}" for k, v in match.match_details.items()]
await telegram.notify_scenario_matched(
stock_code=stock_code,
action=decision.action,
condition_summary=", ".join(condition_parts) if condition_parts else "matched",
confidence=float(decision.confidence),
)
except Exception as exc:
logger.warning("Scenario matched notification failed: %s", exc)
# 2.5. Log decision with context snapshot # 2.5. Log decision with context snapshot
context_snapshot = { context_snapshot = {
"L1": { "L1": {
@@ -200,7 +237,7 @@ async def trading_cycle(
"purchase_total": purchase_total, "purchase_total": purchase_total,
"pnl_pct": pnl_pct, "pnl_pct": pnl_pct,
}, },
# L3-L7 will be populated when context tree is implemented "scenario_match": match.match_details,
} }
input_data = { input_data = {
"current_price": current_price, "current_price": current_price,
@@ -279,8 +316,8 @@ async def trading_cycle(
# 6. Log trade with selection context # 6. Log trade with selection context
selection_context = None selection_context = None
if stock_code in scan_candidates: if stock_code in market_candidates:
candidate = scan_candidates[stock_code] candidate = market_candidates[stock_code]
selection_context = { selection_context = {
"rsi": candidate.rsi, "rsi": candidate.rsi,
"volume_ratio": candidate.volume_ratio, "volume_ratio": candidate.volume_ratio,
@@ -324,7 +361,9 @@ async def trading_cycle(
async def run_daily_session( async def run_daily_session(
broker: KISBroker, broker: KISBroker,
overseas_broker: OverseasBroker, overseas_broker: OverseasBroker,
brain: GeminiClient, scenario_engine: ScenarioEngine,
playbook_store: PlaybookStore,
pre_market_planner: PreMarketPlanner,
risk: RiskManager, risk: RiskManager,
db_conn: Any, db_conn: Any,
decision_logger: DecisionLogger, decision_logger: DecisionLogger,
@@ -336,10 +375,8 @@ async def run_daily_session(
) -> None: ) -> None:
"""Execute one daily trading session. """Execute one daily trading session.
Designed for API efficiency with Gemini Free tier: V2 proactive strategy: 1 Gemini call for playbook generation,
- Batch decision making (1 API call per market) then local scenario evaluation per stock (0 API calls).
- Runs N times per day at fixed intervals
- Minimizes API usage while maintaining trading capability
""" """
# Get currently open markets # Get currently open markets
open_markets = get_open_markets(settings.enabled_market_list) open_markets = get_open_markets(settings.enabled_market_list)
@@ -352,27 +389,66 @@ async def run_daily_session(
# Process each open market # Process each open market
for market in open_markets: for market in open_markets:
# Use market-local date for playbook keying
market_today = datetime.now(market.timezone).date()
# Dynamic stock discovery via scanner (no static watchlists) # Dynamic stock discovery via scanner (no static watchlists)
candidates_list: list[ScanCandidate] = []
try: try:
candidates = await smart_scanner.scan() candidates_list = await smart_scanner.scan() if smart_scanner else []
watchlist = [c.stock_code for c in candidates] if candidates else []
except Exception as exc: except Exception as exc:
logger.error("Smart Scanner failed for %s: %s", market.name, exc) logger.error("Smart Scanner failed for %s: %s", market.name, exc)
watchlist = []
if not watchlist: if not candidates_list:
logger.info("No scanner candidates for market %s — skipping", market.code) logger.info("No scanner candidates for market %s — skipping", market.code)
continue continue
watchlist = [c.stock_code for c in candidates_list]
candidate_map = {c.stock_code: c for c in candidates_list}
logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist)) logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist))
# Generate or load playbook (1 Gemini API call per market per day)
playbook = playbook_store.load(market_today, market.code)
if playbook is None:
try:
playbook = await pre_market_planner.generate_playbook(
market=market.code,
candidates=candidates_list,
today=market_today,
)
playbook_store.save(playbook)
try:
await telegram.notify_playbook_generated(
market=market.code,
stock_count=playbook.stock_count,
scenario_count=playbook.scenario_count,
token_count=playbook.token_count,
)
except Exception as exc:
logger.warning("Playbook notification failed: %s", exc)
logger.info(
"Generated playbook for %s: %d stocks, %d scenarios",
market.code, playbook.stock_count, playbook.scenario_count,
)
except Exception as exc:
logger.error("Playbook generation failed for %s: %s", market.code, exc)
try:
await telegram.notify_playbook_failed(
market=market.code, reason=str(exc)[:200],
)
except Exception as notify_exc:
logger.warning("Playbook failed notification error: %s", notify_exc)
playbook = PreMarketPlanner._empty_playbook(market_today, market.code)
# Collect market data for all stocks from scanner # Collect market data for all stocks from scanner
stocks_data = [] stocks_data = []
for stock_code in watchlist: for stock_code in watchlist:
try: try:
if market.is_domestic: if market.is_domestic:
orderbook = await broker.get_orderbook(stock_code) orderbook = await broker.get_orderbook(stock_code)
current_price = safe_float(orderbook.get("output1", {}).get("stck_prpr", "0")) current_price = safe_float(
orderbook.get("output1", {}).get("stck_prpr", "0")
)
foreigner_net = safe_float( foreigner_net = safe_float(
orderbook.get("output1", {}).get("frgn_ntby_qty", "0") orderbook.get("output1", {}).get("frgn_ntby_qty", "0")
) )
@@ -380,17 +456,23 @@ async def run_daily_session(
price_data = await overseas_broker.get_overseas_price( price_data = await overseas_broker.get_overseas_price(
market.exchange_code, stock_code market.exchange_code, stock_code
) )
current_price = safe_float(price_data.get("output", {}).get("last", "0")) current_price = safe_float(
price_data.get("output", {}).get("last", "0")
)
foreigner_net = 0.0 foreigner_net = 0.0
stocks_data.append( stock_data: dict[str, Any] = {
{
"stock_code": stock_code, "stock_code": stock_code,
"market_name": market.name, "market_name": market.name,
"current_price": current_price, "current_price": current_price,
"foreigner_net": foreigner_net, "foreigner_net": foreigner_net,
} }
) # Enrich with scanner metrics
cand = candidate_map.get(stock_code)
if cand:
stock_data["rsi"] = cand.rsi
stock_data["volume_ratio"] = cand.volume_ratio
stocks_data.append(stock_data)
except Exception as exc: except Exception as exc:
logger.error("Failed to fetch data for %s: %s", stock_code, exc) logger.error("Failed to fetch data for %s: %s", stock_code, exc)
continue continue
@@ -399,17 +481,19 @@ async def run_daily_session(
logger.warning("No valid stock data for market %s", market.code) logger.warning("No valid stock data for market %s", market.code)
continue continue
# Get batch decisions (1 API call for all stocks in this market)
logger.info("Requesting batch decision for %d stocks in %s", len(stocks_data), market.name)
decisions = await brain.decide_batch(stocks_data)
# Get balance data once for the market # Get balance data once for the market
if market.is_domestic: if market.is_domestic:
balance_data = await broker.get_balance() balance_data = await broker.get_balance()
output2 = balance_data.get("output2", [{}]) output2 = balance_data.get("output2", [{}])
total_eval = safe_float(output2[0].get("tot_evlu_amt", "0")) if output2 else 0 total_eval = safe_float(
total_cash = safe_float(output2[0].get("dnca_tot_amt", "0")) if output2 else 0 output2[0].get("tot_evlu_amt", "0")
purchase_total = safe_float(output2[0].get("pchs_amt_smtl_amt", "0")) if output2 else 0 ) if output2 else 0
total_cash = safe_float(
output2[0].get("dnca_tot_amt", "0")
) if output2 else 0
purchase_total = safe_float(
output2[0].get("pchs_amt_smtl_amt", "0")
) if output2 else 0
else: else:
balance_data = await overseas_broker.get_overseas_balance(market.exchange_code) balance_data = await overseas_broker.get_overseas_balance(market.exchange_code)
output2 = balance_data.get("output2", [{}]) output2 = balance_data.get("output2", [{}])
@@ -422,21 +506,37 @@ async def run_daily_session(
total_eval = safe_float(balance_info.get("frcr_evlu_tota", "0") or "0") total_eval = safe_float(balance_info.get("frcr_evlu_tota", "0") or "0")
total_cash = safe_float(balance_info.get("frcr_dncl_amt_2", "0") or "0") total_cash = safe_float(balance_info.get("frcr_dncl_amt_2", "0") or "0")
purchase_total = safe_float(balance_info.get("frcr_buy_amt_smtl", "0") or "0") purchase_total = safe_float(
balance_info.get("frcr_buy_amt_smtl", "0") or "0"
)
# Calculate daily P&L % # Calculate daily P&L %
pnl_pct = ( pnl_pct = (
((total_eval - purchase_total) / purchase_total * 100) if purchase_total > 0 else 0.0 ((total_eval - purchase_total) / purchase_total * 100)
if purchase_total > 0
else 0.0
) )
portfolio_data = {
"portfolio_pnl_pct": pnl_pct,
"total_cash": total_cash,
"total_eval": total_eval,
}
# Execute decisions for each stock # Evaluate scenarios for each stock (local, no API calls)
logger.info(
"Evaluating %d stocks against playbook for %s",
len(stocks_data), market.name,
)
for stock_data in stocks_data: for stock_data in stocks_data:
stock_code = stock_data["stock_code"] stock_code = stock_data["stock_code"]
decision = decisions.get(stock_code) match = scenario_engine.evaluate(
playbook, stock_code, stock_data, portfolio_data,
if not decision: )
logger.warning("No decision for %s — skipping", stock_code) decision = TradeDecision(
continue action=match.action.value,
confidence=match.confidence,
rationale=match.rationale,
)
logger.info( logger.info(
"Decision for %s (%s): %s (confidence=%d)", "Decision for %s (%s): %s (confidence=%d)",
@@ -458,6 +558,7 @@ async def run_daily_session(
"purchase_total": purchase_total, "purchase_total": purchase_total,
"pnl_pct": pnl_pct, "pnl_pct": pnl_pct,
}, },
"scenario_match": match.match_details,
} }
input_data = { input_data = {
"current_price": stock_data["current_price"], "current_price": stock_data["current_price"],
@@ -509,7 +610,9 @@ async def run_daily_session(
threshold=exc.threshold, threshold=exc.threshold,
) )
except Exception as notify_exc: except Exception as notify_exc:
logger.warning("Circuit breaker notification failed: %s", notify_exc) logger.warning(
"Circuit breaker notification failed: %s", notify_exc
)
raise raise
# Send order # Send order
@@ -544,7 +647,9 @@ async def run_daily_session(
except Exception as exc: except Exception as exc:
logger.warning("Telegram notification failed: %s", exc) logger.warning("Telegram notification failed: %s", exc)
except Exception as exc: except Exception as exc:
logger.error("Order execution failed for %s: %s", stock_code, exc) logger.error(
"Order execution failed for %s: %s", stock_code, exc
)
continue continue
# Log trade # Log trade
@@ -571,6 +676,20 @@ async def run(settings: Settings) -> None:
decision_logger = DecisionLogger(db_conn) decision_logger = DecisionLogger(db_conn)
context_store = ContextStore(db_conn) context_store = ContextStore(db_conn)
# V2 proactive strategy components
context_selector = ContextSelector(context_store)
scenario_engine = ScenarioEngine()
playbook_store = PlaybookStore(db_conn)
pre_market_planner = PreMarketPlanner(
gemini_client=brain,
context_store=context_store,
context_selector=context_selector,
settings=settings,
)
# Track playbooks per market (in-memory cache)
playbooks: dict[str, DayPlaybook] = {}
# Initialize Telegram notifications # Initialize Telegram notifications
telegram = TelegramClient( telegram = TelegramClient(
bot_token=settings.TELEGRAM_BOT_TOKEN, bot_token=settings.TELEGRAM_BOT_TOKEN,
@@ -732,8 +851,8 @@ async def run(settings: Settings) -> None:
settings=settings, settings=settings,
) )
# Track scan candidates for selection context logging # Track scan candidates per market for selection context logging
scan_candidates: dict[str, ScanCandidate] = {} # stock_code -> candidate scan_candidates: dict[str, dict[str, ScanCandidate]] = {} # market -> {stock_code -> candidate}
# Active stocks per market (dynamically discovered by scanner) # Active stocks per market (dynamically discovered by scanner)
active_stocks: dict[str, list[str]] = {} # market_code -> [stock_codes] active_stocks: dict[str, list[str]] = {} # market_code -> [stock_codes]
@@ -802,7 +921,9 @@ async def run(settings: Settings) -> None:
await run_daily_session( await run_daily_session(
broker, broker,
overseas_broker, overseas_broker,
brain, scenario_engine,
playbook_store,
pre_market_planner,
risk, risk,
db_conn, db_conn,
decision_logger, decision_logger,
@@ -850,6 +971,8 @@ async def run(settings: Settings) -> None:
except Exception as exc: except Exception as exc:
logger.warning("Market close notification failed: %s", exc) logger.warning("Market close notification failed: %s", exc)
_market_states[market_code] = False _market_states[market_code] = False
# Clear playbook for closed market (new one generated next open)
playbooks.pop(market_code, None)
# No markets open — wait until next market opens # No markets open — wait until next market opens
try: try:
@@ -887,7 +1010,8 @@ async def run(settings: Settings) -> None:
# Smart Scanner: dynamic stock discovery (no static watchlists) # Smart Scanner: dynamic stock discovery (no static watchlists)
now_timestamp = asyncio.get_event_loop().time() now_timestamp = asyncio.get_event_loop().time()
last_scan = last_scan_time.get(market.code, 0.0) last_scan = last_scan_time.get(market.code, 0.0)
if now_timestamp - last_scan >= SCAN_INTERVAL_SECONDS: rescan_interval = settings.RESCAN_INTERVAL_SECONDS
if now_timestamp - last_scan >= rescan_interval:
try: try:
logger.info("Smart Scanner: Scanning %s market", market.name) logger.info("Smart Scanner: Scanning %s market", market.name)
@@ -899,9 +1023,10 @@ async def run(settings: Settings) -> None:
candidates candidates
) )
# Store candidates for selection context logging # Store candidates per market for selection context logging
for candidate in candidates: scan_candidates[market.code] = {
scan_candidates[candidate.stock_code] = candidate c.stock_code: c for c in candidates
}
logger.info( logger.info(
"Smart Scanner: Found %d candidates for %s: %s", "Smart Scanner: Found %d candidates for %s: %s",
@@ -909,6 +1034,62 @@ async def run(settings: Settings) -> None:
market.name, market.name,
[f"{c.stock_code}(RSI={c.rsi:.0f})" for c in candidates], [f"{c.stock_code}(RSI={c.rsi:.0f})" for c in candidates],
) )
# Get market-local date for playbook keying
market_today = datetime.now(
market.timezone
).date()
# Load or generate playbook (1 Gemini call per market per day)
if market.code not in playbooks:
# Try DB first (survives process restart)
stored_pb = playbook_store.load(market_today, market.code)
if stored_pb is not None:
playbooks[market.code] = stored_pb
logger.info(
"Loaded existing playbook for %s from DB"
" (%d stocks, %d scenarios)",
market.code,
stored_pb.stock_count,
stored_pb.scenario_count,
)
else:
try:
pb = await pre_market_planner.generate_playbook(
market=market.code,
candidates=candidates,
today=market_today,
)
playbook_store.save(pb)
playbooks[market.code] = pb
try:
await telegram.notify_playbook_generated(
market=market.code,
stock_count=pb.stock_count,
scenario_count=pb.scenario_count,
token_count=pb.token_count,
)
except Exception as exc:
logger.warning(
"Playbook notification failed: %s", exc
)
except Exception as exc:
logger.error(
"Playbook generation failed for %s: %s",
market.code, exc,
)
try:
await telegram.notify_playbook_failed(
market=market.code,
reason=str(exc)[:200],
)
except Exception:
pass
playbooks[market.code] = (
PreMarketPlanner._empty_playbook(
market_today, market.code
)
)
else: else:
logger.info( logger.info(
"Smart Scanner: No candidates for %s — no trades", market.name "Smart Scanner: No candidates for %s — no trades", market.name
@@ -933,13 +1114,22 @@ async def run(settings: Settings) -> None:
if shutdown.is_set(): if shutdown.is_set():
break break
# Get playbook for this market
market_playbook = playbooks.get(
market.code,
PreMarketPlanner._empty_playbook(
datetime.now(market.timezone).date(), market.code
),
)
# Retry logic for connection errors # Retry logic for connection errors
for attempt in range(1, MAX_CONNECTION_RETRIES + 1): for attempt in range(1, MAX_CONNECTION_RETRIES + 1):
try: try:
await trading_cycle( await trading_cycle(
broker, broker,
overseas_broker, overseas_broker,
brain, scenario_engine,
market_playbook,
risk, risk,
db_conn, db_conn,
decision_logger, decision_logger,
@@ -988,7 +1178,8 @@ async def run(settings: Settings) -> None:
metrics = await priority_queue.get_metrics() metrics = await priority_queue.get_metrics()
if metrics.total_enqueued > 0: if metrics.total_enqueued > 0:
logger.info( logger.info(
"Priority queue metrics: enqueued=%d, dequeued=%d, size=%d, timeouts=%d, errors=%d", "Priority queue metrics: enqueued=%d, dequeued=%d,"
" size=%d, timeouts=%d, errors=%d",
metrics.total_enqueued, metrics.total_enqueued,
metrics.total_dequeued, metrics.total_dequeued,
metrics.current_size, metrics.current_size,

View File

@@ -304,6 +304,77 @@ class TelegramClient:
NotificationMessage(priority=NotificationPriority.MEDIUM, message=message) NotificationMessage(priority=NotificationPriority.MEDIUM, message=message)
) )
async def notify_playbook_generated(
self,
market: str,
stock_count: int,
scenario_count: int,
token_count: int,
) -> None:
"""
Notify that a daily playbook was generated.
Args:
market: Market code (e.g., "KR", "US")
stock_count: Number of stocks in the playbook
scenario_count: Total number of scenarios
token_count: Gemini token usage for the playbook
"""
message = (
f"<b>Playbook Generated</b>\n"
f"Market: {market}\n"
f"Stocks: {stock_count}\n"
f"Scenarios: {scenario_count}\n"
f"Tokens: {token_count}"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.MEDIUM, message=message)
)
async def notify_scenario_matched(
self,
stock_code: str,
action: str,
condition_summary: str,
confidence: float,
) -> None:
"""
Notify that a scenario matched for a stock.
Args:
stock_code: Stock ticker symbol
action: Scenario action (BUY/SELL/HOLD/REDUCE_ALL)
condition_summary: Short summary of the matched condition
confidence: Scenario confidence (0-100)
"""
message = (
f"<b>Scenario Matched</b>\n"
f"Symbol: <code>{stock_code}</code>\n"
f"Action: {action}\n"
f"Condition: {condition_summary}\n"
f"Confidence: {confidence:.0f}%"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.HIGH, message=message)
)
async def notify_playbook_failed(self, market: str, reason: str) -> None:
"""
Notify that playbook generation failed.
Args:
market: Market code (e.g., "KR", "US")
reason: Failure reason summary
"""
message = (
f"<b>Playbook Failed</b>\n"
f"Market: {market}\n"
f"Reason: {reason[:200]}"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.HIGH, message=message)
)
async def notify_system_shutdown(self, reason: str) -> None: async def notify_system_shutdown(self, reason: str) -> None:
""" """
Notify system shutdown. Notify system shutdown.

View File

@@ -0,0 +1,419 @@
"""Pre-market planner — generates DayPlaybook via Gemini before market open.
One Gemini API call per market per day. Candidates come from SmartVolatilityScanner.
On failure, returns a defensive playbook (all HOLD, no trades).
"""
from __future__ import annotations
import json
import logging
from datetime import date
from typing import Any
from src.analysis.smart_scanner import ScanCandidate
from src.brain.context_selector import ContextSelector, DecisionType
from src.brain.gemini_client import GeminiClient
from src.config import Settings
from src.context.store import ContextLayer, ContextStore
from src.strategy.models import (
CrossMarketContext,
DayPlaybook,
GlobalRule,
MarketOutlook,
ScenarioAction,
StockCondition,
StockPlaybook,
StockScenario,
)
logger = logging.getLogger(__name__)
# Mapping from string to MarketOutlook enum
_OUTLOOK_MAP: dict[str, MarketOutlook] = {
"bullish": MarketOutlook.BULLISH,
"neutral_to_bullish": MarketOutlook.NEUTRAL_TO_BULLISH,
"neutral": MarketOutlook.NEUTRAL,
"neutral_to_bearish": MarketOutlook.NEUTRAL_TO_BEARISH,
"bearish": MarketOutlook.BEARISH,
}
_ACTION_MAP: dict[str, ScenarioAction] = {
"BUY": ScenarioAction.BUY,
"SELL": ScenarioAction.SELL,
"HOLD": ScenarioAction.HOLD,
"REDUCE_ALL": ScenarioAction.REDUCE_ALL,
}
class PreMarketPlanner:
"""Generates a DayPlaybook by calling Gemini once before market open.
Flow:
1. Collect strategic context (L5-L7) + cross-market context
2. Build a structured prompt with scan candidates
3. Call Gemini for JSON scenario generation
4. Parse and validate response into DayPlaybook
5. On failure → defensive playbook (HOLD everything)
"""
def __init__(
self,
gemini_client: GeminiClient,
context_store: ContextStore,
context_selector: ContextSelector,
settings: Settings,
) -> None:
self._gemini = gemini_client
self._context_store = context_store
self._context_selector = context_selector
self._settings = settings
async def generate_playbook(
self,
market: str,
candidates: list[ScanCandidate],
today: date | None = None,
) -> DayPlaybook:
"""Generate a DayPlaybook for a market using Gemini.
Args:
market: Market code ("KR" or "US")
candidates: Stock candidates from SmartVolatilityScanner
today: Override date (defaults to date.today()). Use market-local date.
Returns:
DayPlaybook with scenarios. Empty/defensive if no candidates or failure.
"""
if today is None:
today = date.today()
if not candidates:
logger.info("No candidates for %s — returning empty playbook", market)
return self._empty_playbook(today, market)
try:
# 1. Gather context
context_data = self._gather_context()
cross_market = self.build_cross_market_context(market, today)
# 2. Build prompt
prompt = self._build_prompt(market, candidates, context_data, cross_market)
# 3. Call Gemini
market_data = {
"stock_code": "PLANNER",
"current_price": 0,
"prompt_override": prompt,
}
decision = await self._gemini.decide(market_data)
# 4. Parse response
playbook = self._parse_response(
decision.rationale, today, market, candidates, cross_market
)
playbook_with_tokens = playbook.model_copy(
update={"token_count": decision.token_count}
)
logger.info(
"Generated playbook for %s: %d stocks, %d scenarios, %d tokens",
market,
playbook_with_tokens.stock_count,
playbook_with_tokens.scenario_count,
playbook_with_tokens.token_count,
)
return playbook_with_tokens
except Exception:
logger.exception("Playbook generation failed for %s", market)
if self._settings.DEFENSIVE_PLAYBOOK_ON_FAILURE:
return self._defensive_playbook(today, market, candidates)
return self._empty_playbook(today, market)
def build_cross_market_context(
self, target_market: str, today: date | None = None,
) -> CrossMarketContext | None:
"""Build cross-market context from the other market's L6 data.
KR planner → reads US scorecard from previous night.
US planner → reads KR scorecard from today.
Args:
target_market: The market being planned ("KR" or "US")
today: Override date (defaults to date.today()). Use market-local date.
"""
other_market = "US" if target_market == "KR" else "KR"
if today is None:
today = date.today()
timeframe = today.isoformat()
scorecard_key = f"scorecard_{other_market}"
scorecard_data = self._context_store.get_context(
ContextLayer.L6_DAILY, timeframe, scorecard_key
)
if scorecard_data is None:
logger.debug("No cross-market scorecard found for %s", other_market)
return None
if isinstance(scorecard_data, str):
try:
scorecard_data = json.loads(scorecard_data)
except (json.JSONDecodeError, TypeError):
return None
if not isinstance(scorecard_data, dict):
return None
return CrossMarketContext(
market=other_market,
date=timeframe,
total_pnl=float(scorecard_data.get("total_pnl", 0.0)),
win_rate=float(scorecard_data.get("win_rate", 0.0)),
index_change_pct=float(scorecard_data.get("index_change_pct", 0.0)),
key_events=scorecard_data.get("key_events", []),
lessons=scorecard_data.get("lessons", []),
)
def _gather_context(self) -> dict[str, Any]:
"""Gather strategic context using ContextSelector."""
layers = self._context_selector.select_layers(
decision_type=DecisionType.STRATEGIC,
include_realtime=True,
)
return self._context_selector.get_context_data(layers, max_items_per_layer=10)
def _build_prompt(
self,
market: str,
candidates: list[ScanCandidate],
context_data: dict[str, Any],
cross_market: CrossMarketContext | None,
) -> str:
"""Build a structured prompt for Gemini to generate scenario JSON."""
max_scenarios = self._settings.MAX_SCENARIOS_PER_STOCK
candidates_text = "\n".join(
f" - {c.stock_code} ({c.name}): price={c.price}, "
f"RSI={c.rsi:.1f}, volume_ratio={c.volume_ratio:.1f}, "
f"signal={c.signal}, score={c.score:.1f}"
for c in candidates
)
cross_market_text = ""
if cross_market:
cross_market_text = (
f"\n## Other Market ({cross_market.market}) Summary\n"
f"- P&L: {cross_market.total_pnl:+.2f}%\n"
f"- Win Rate: {cross_market.win_rate:.0f}%\n"
f"- Index Change: {cross_market.index_change_pct:+.2f}%\n"
)
if cross_market.lessons:
cross_market_text += f"- Lessons: {'; '.join(cross_market.lessons[:3])}\n"
context_text = ""
if context_data:
context_text = "\n## Strategic Context\n"
for layer_name, layer_data in context_data.items():
if layer_data:
context_text += f"### {layer_name}\n"
for key, value in list(layer_data.items())[:5]:
context_text += f" - {key}: {value}\n"
return (
f"You are a pre-market trading strategist for the {market} market.\n"
f"Generate structured trading scenarios for today.\n\n"
f"## Candidates (from volatility scanner)\n{candidates_text}\n"
f"{cross_market_text}"
f"{context_text}\n"
f"## Instructions\n"
f"Return a JSON object with this exact structure:\n"
f'{{\n'
f' "market_outlook": "bullish|neutral_to_bullish|neutral'
f'|neutral_to_bearish|bearish",\n'
f' "global_rules": [\n'
f' {{"condition": "portfolio_pnl_pct < -2.0",'
f' "action": "REDUCE_ALL", "rationale": "..."}}\n'
f' ],\n'
f' "stocks": [\n'
f' {{\n'
f' "stock_code": "...",\n'
f' "scenarios": [\n'
f' {{\n'
f' "condition": {{"rsi_below": 30, "volume_ratio_above": 2.0}},\n'
f' "action": "BUY|SELL|HOLD",\n'
f' "confidence": 85,\n'
f' "allocation_pct": 10.0,\n'
f' "stop_loss_pct": -2.0,\n'
f' "take_profit_pct": 3.0,\n'
f' "rationale": "..."\n'
f' }}\n'
f' ]\n'
f' }}\n'
f' ]\n'
f'}}\n\n'
f"Rules:\n"
f"- Max {max_scenarios} scenarios per stock\n"
f"- Only use stocks from the candidates list\n"
f"- Confidence 0-100 (80+ for actionable trades)\n"
f"- stop_loss_pct must be <= 0, take_profit_pct must be >= 0\n"
f"- Return ONLY the JSON, no markdown fences or explanation\n"
)
def _parse_response(
self,
response_text: str,
today: date,
market: str,
candidates: list[ScanCandidate],
cross_market: CrossMarketContext | None,
) -> DayPlaybook:
"""Parse Gemini's JSON response into a validated DayPlaybook."""
cleaned = self._extract_json(response_text)
data = json.loads(cleaned)
valid_codes = {c.stock_code for c in candidates}
# Parse market outlook
outlook_str = data.get("market_outlook", "neutral")
market_outlook = _OUTLOOK_MAP.get(outlook_str, MarketOutlook.NEUTRAL)
# Parse global rules
global_rules = []
for rule_data in data.get("global_rules", []):
action_str = rule_data.get("action", "HOLD")
action = _ACTION_MAP.get(action_str, ScenarioAction.HOLD)
global_rules.append(
GlobalRule(
condition=rule_data.get("condition", ""),
action=action,
rationale=rule_data.get("rationale", ""),
)
)
# Parse stock playbooks
stock_playbooks = []
max_scenarios = self._settings.MAX_SCENARIOS_PER_STOCK
for stock_data in data.get("stocks", []):
code = stock_data.get("stock_code", "")
if code not in valid_codes:
logger.warning("Gemini returned unknown stock %s — skipping", code)
continue
scenarios = []
for sc_data in stock_data.get("scenarios", [])[:max_scenarios]:
scenario = self._parse_scenario(sc_data)
if scenario:
scenarios.append(scenario)
if scenarios:
stock_playbooks.append(
StockPlaybook(
stock_code=code,
scenarios=scenarios,
)
)
return DayPlaybook(
date=today,
market=market,
market_outlook=market_outlook,
global_rules=global_rules,
stock_playbooks=stock_playbooks,
cross_market=cross_market,
)
def _parse_scenario(self, sc_data: dict) -> StockScenario | None:
"""Parse a single scenario from JSON data. Returns None if invalid."""
try:
cond_data = sc_data.get("condition", {})
condition = StockCondition(
rsi_below=cond_data.get("rsi_below"),
rsi_above=cond_data.get("rsi_above"),
volume_ratio_above=cond_data.get("volume_ratio_above"),
volume_ratio_below=cond_data.get("volume_ratio_below"),
price_above=cond_data.get("price_above"),
price_below=cond_data.get("price_below"),
price_change_pct_above=cond_data.get("price_change_pct_above"),
price_change_pct_below=cond_data.get("price_change_pct_below"),
)
if not condition.has_any_condition():
logger.warning("Scenario has no conditions — skipping")
return None
action_str = sc_data.get("action", "HOLD")
action = _ACTION_MAP.get(action_str, ScenarioAction.HOLD)
return StockScenario(
condition=condition,
action=action,
confidence=int(sc_data.get("confidence", 50)),
allocation_pct=float(sc_data.get("allocation_pct", 10.0)),
stop_loss_pct=float(sc_data.get("stop_loss_pct", -2.0)),
take_profit_pct=float(sc_data.get("take_profit_pct", 3.0)),
rationale=sc_data.get("rationale", ""),
)
except (ValueError, TypeError) as e:
logger.warning("Failed to parse scenario: %s", e)
return None
@staticmethod
def _extract_json(text: str) -> str:
"""Extract JSON from response, stripping markdown fences if present."""
stripped = text.strip()
if stripped.startswith("```"):
# Remove first line (```json or ```) and last line (```)
lines = stripped.split("\n")
lines = lines[1:] # Remove opening fence
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
stripped = "\n".join(lines)
return stripped.strip()
@staticmethod
def _empty_playbook(today: date, market: str) -> DayPlaybook:
"""Return an empty playbook (no stocks, no scenarios)."""
return DayPlaybook(
date=today,
market=market,
market_outlook=MarketOutlook.NEUTRAL,
stock_playbooks=[],
)
@staticmethod
def _defensive_playbook(
today: date,
market: str,
candidates: list[ScanCandidate],
) -> DayPlaybook:
"""Return a defensive playbook — HOLD everything with stop-loss ready."""
stock_playbooks = [
StockPlaybook(
stock_code=c.stock_code,
scenarios=[
StockScenario(
condition=StockCondition(price_change_pct_below=-3.0),
action=ScenarioAction.SELL,
confidence=90,
stop_loss_pct=-3.0,
rationale="Defensive stop-loss (planner failure)",
),
],
)
for c in candidates
]
return DayPlaybook(
date=today,
market=market,
market_outlook=MarketOutlook.NEUTRAL_TO_BEARISH,
default_action=ScenarioAction.HOLD,
stock_playbooks=stock_playbooks,
global_rules=[
GlobalRule(
condition="portfolio_pnl_pct < -2.0",
action=ScenarioAction.REDUCE_ALL,
rationale="Defensive: reduce on loss threshold",
),
],
)

View File

@@ -1,12 +1,46 @@
"""Tests for main trading loop telegram integration.""" """Tests for main trading loop integration."""
import asyncio from datetime import date
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
import pytest import pytest
from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected
from src.main import safe_float, trading_cycle from src.main import safe_float, trading_cycle
from src.strategy.models import (
DayPlaybook,
ScenarioAction,
StockCondition,
StockScenario,
)
from src.strategy.scenario_engine import ScenarioEngine, ScenarioMatch
def _make_playbook(market: str = "KR") -> DayPlaybook:
"""Create a minimal empty playbook for testing."""
return DayPlaybook(date=date(2026, 2, 8), market=market)
def _make_buy_match(stock_code: str = "005930") -> ScenarioMatch:
"""Create a ScenarioMatch that returns BUY."""
return ScenarioMatch(
stock_code=stock_code,
matched_scenario=None,
action=ScenarioAction.BUY,
confidence=85,
rationale="Test buy",
)
def _make_hold_match(stock_code: str = "005930") -> ScenarioMatch:
"""Create a ScenarioMatch that returns HOLD."""
return ScenarioMatch(
stock_code=stock_code,
matched_scenario=None,
action=ScenarioAction.HOLD,
confidence=0,
rationale="No scenario conditions met",
)
class TestSafeFloat: class TestSafeFloat:
@@ -81,15 +115,16 @@ class TestTradingCycleTelegramIntegration:
return broker return broker
@pytest.fixture @pytest.fixture
def mock_brain(self) -> MagicMock: def mock_scenario_engine(self) -> MagicMock:
"""Create mock brain that decides to buy.""" """Create mock scenario engine that returns BUY."""
brain = MagicMock() engine = MagicMock(spec=ScenarioEngine)
decision = MagicMock() engine.evaluate = MagicMock(return_value=_make_buy_match())
decision.action = "BUY" return engine
decision.confidence = 85
decision.rationale = "Test buy" @pytest.fixture
brain.decide = AsyncMock(return_value=decision) def mock_playbook(self) -> DayPlaybook:
return brain """Create a minimal day playbook."""
return _make_playbook()
@pytest.fixture @pytest.fixture
def mock_risk(self) -> MagicMock: def mock_risk(self) -> MagicMock:
@@ -134,6 +169,7 @@ class TestTradingCycleTelegramIntegration:
telegram.notify_trade_execution = AsyncMock() telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock() telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock() telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
return telegram return telegram
@pytest.fixture @pytest.fixture
@@ -151,7 +187,8 @@ class TestTradingCycleTelegramIntegration:
self, self,
mock_broker: MagicMock, mock_broker: MagicMock,
mock_overseas_broker: MagicMock, mock_overseas_broker: MagicMock,
mock_brain: MagicMock, mock_scenario_engine: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock, mock_risk: MagicMock,
mock_db: MagicMock, mock_db: MagicMock,
mock_decision_logger: MagicMock, mock_decision_logger: MagicMock,
@@ -165,7 +202,8 @@ class TestTradingCycleTelegramIntegration:
await trading_cycle( await trading_cycle(
broker=mock_broker, broker=mock_broker,
overseas_broker=mock_overseas_broker, overseas_broker=mock_overseas_broker,
brain=mock_brain, scenario_engine=mock_scenario_engine,
playbook=mock_playbook,
risk=mock_risk, risk=mock_risk,
db_conn=mock_db, db_conn=mock_db,
decision_logger=mock_decision_logger, decision_logger=mock_decision_logger,
@@ -190,7 +228,8 @@ class TestTradingCycleTelegramIntegration:
self, self,
mock_broker: MagicMock, mock_broker: MagicMock,
mock_overseas_broker: MagicMock, mock_overseas_broker: MagicMock,
mock_brain: MagicMock, mock_scenario_engine: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock, mock_risk: MagicMock,
mock_db: MagicMock, mock_db: MagicMock,
mock_decision_logger: MagicMock, mock_decision_logger: MagicMock,
@@ -208,7 +247,8 @@ class TestTradingCycleTelegramIntegration:
await trading_cycle( await trading_cycle(
broker=mock_broker, broker=mock_broker,
overseas_broker=mock_overseas_broker, overseas_broker=mock_overseas_broker,
brain=mock_brain, scenario_engine=mock_scenario_engine,
playbook=mock_playbook,
risk=mock_risk, risk=mock_risk,
db_conn=mock_db, db_conn=mock_db,
decision_logger=mock_decision_logger, decision_logger=mock_decision_logger,
@@ -228,7 +268,8 @@ class TestTradingCycleTelegramIntegration:
self, self,
mock_broker: MagicMock, mock_broker: MagicMock,
mock_overseas_broker: MagicMock, mock_overseas_broker: MagicMock,
mock_brain: MagicMock, mock_scenario_engine: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock, mock_risk: MagicMock,
mock_db: MagicMock, mock_db: MagicMock,
mock_decision_logger: MagicMock, mock_decision_logger: MagicMock,
@@ -250,7 +291,8 @@ class TestTradingCycleTelegramIntegration:
await trading_cycle( await trading_cycle(
broker=mock_broker, broker=mock_broker,
overseas_broker=mock_overseas_broker, overseas_broker=mock_overseas_broker,
brain=mock_brain, scenario_engine=mock_scenario_engine,
playbook=mock_playbook,
risk=mock_risk, risk=mock_risk,
db_conn=mock_db, db_conn=mock_db,
decision_logger=mock_decision_logger, decision_logger=mock_decision_logger,
@@ -275,7 +317,8 @@ class TestTradingCycleTelegramIntegration:
self, self,
mock_broker: MagicMock, mock_broker: MagicMock,
mock_overseas_broker: MagicMock, mock_overseas_broker: MagicMock,
mock_brain: MagicMock, mock_scenario_engine: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock, mock_risk: MagicMock,
mock_db: MagicMock, mock_db: MagicMock,
mock_decision_logger: MagicMock, mock_decision_logger: MagicMock,
@@ -299,7 +342,8 @@ class TestTradingCycleTelegramIntegration:
await trading_cycle( await trading_cycle(
broker=mock_broker, broker=mock_broker,
overseas_broker=mock_overseas_broker, overseas_broker=mock_overseas_broker,
brain=mock_brain, scenario_engine=mock_scenario_engine,
playbook=mock_playbook,
risk=mock_risk, risk=mock_risk,
db_conn=mock_db, db_conn=mock_db,
decision_logger=mock_decision_logger, decision_logger=mock_decision_logger,
@@ -319,7 +363,8 @@ class TestTradingCycleTelegramIntegration:
self, self,
mock_broker: MagicMock, mock_broker: MagicMock,
mock_overseas_broker: MagicMock, mock_overseas_broker: MagicMock,
mock_brain: MagicMock, mock_scenario_engine: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock, mock_risk: MagicMock,
mock_db: MagicMock, mock_db: MagicMock,
mock_decision_logger: MagicMock, mock_decision_logger: MagicMock,
@@ -329,18 +374,15 @@ class TestTradingCycleTelegramIntegration:
mock_market: MagicMock, mock_market: MagicMock,
) -> None: ) -> None:
"""Test no trade notification sent when decision is HOLD.""" """Test no trade notification sent when decision is HOLD."""
# Change brain decision to HOLD # Scenario engine returns HOLD
decision = MagicMock() mock_scenario_engine.evaluate = MagicMock(return_value=_make_hold_match())
decision.action = "HOLD"
decision.confidence = 50
decision.rationale = "Insufficient signal"
mock_brain.decide = AsyncMock(return_value=decision)
with patch("src.main.log_trade"): with patch("src.main.log_trade"):
await trading_cycle( await trading_cycle(
broker=mock_broker, broker=mock_broker,
overseas_broker=mock_overseas_broker, overseas_broker=mock_overseas_broker,
brain=mock_brain, scenario_engine=mock_scenario_engine,
playbook=mock_playbook,
risk=mock_risk, risk=mock_risk,
db_conn=mock_db, db_conn=mock_db,
decision_logger=mock_decision_logger, decision_logger=mock_decision_logger,
@@ -472,15 +514,16 @@ class TestOverseasBalanceParsing:
return market return market
@pytest.fixture @pytest.fixture
def mock_brain_hold(self) -> MagicMock: def mock_scenario_engine_hold(self) -> MagicMock:
"""Create mock brain that always holds.""" """Create mock scenario engine that always returns HOLD."""
brain = MagicMock() engine = MagicMock(spec=ScenarioEngine)
decision = MagicMock() engine.evaluate = MagicMock(return_value=_make_hold_match("AAPL"))
decision.action = "HOLD" return engine
decision.confidence = 50
decision.rationale = "Testing balance parsing" @pytest.fixture
brain.decide = AsyncMock(return_value=decision) def mock_playbook(self) -> DayPlaybook:
return brain """Create a minimal playbook."""
return _make_playbook("US")
@pytest.fixture @pytest.fixture
def mock_risk(self) -> MagicMock: def mock_risk(self) -> MagicMock:
@@ -517,14 +560,17 @@ class TestOverseasBalanceParsing:
@pytest.fixture @pytest.fixture
def mock_telegram(self) -> MagicMock: def mock_telegram(self) -> MagicMock:
"""Create mock telegram client.""" """Create mock telegram client."""
return MagicMock() telegram = MagicMock()
telegram.notify_scenario_matched = AsyncMock()
return telegram
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_overseas_balance_list_format( async def test_overseas_balance_list_format(
self, self,
mock_domestic_broker: MagicMock, mock_domestic_broker: MagicMock,
mock_overseas_broker_with_list: MagicMock, mock_overseas_broker_with_list: MagicMock,
mock_brain_hold: MagicMock, mock_scenario_engine_hold: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock, mock_risk: MagicMock,
mock_db: MagicMock, mock_db: MagicMock,
mock_decision_logger: MagicMock, mock_decision_logger: MagicMock,
@@ -539,7 +585,8 @@ class TestOverseasBalanceParsing:
await trading_cycle( await trading_cycle(
broker=mock_domestic_broker, broker=mock_domestic_broker,
overseas_broker=mock_overseas_broker_with_list, overseas_broker=mock_overseas_broker_with_list,
brain=mock_brain_hold, scenario_engine=mock_scenario_engine_hold,
playbook=mock_playbook,
risk=mock_risk, risk=mock_risk,
db_conn=mock_db, db_conn=mock_db,
decision_logger=mock_decision_logger, decision_logger=mock_decision_logger,
@@ -559,7 +606,8 @@ class TestOverseasBalanceParsing:
self, self,
mock_domestic_broker: MagicMock, mock_domestic_broker: MagicMock,
mock_overseas_broker_with_dict: MagicMock, mock_overseas_broker_with_dict: MagicMock,
mock_brain_hold: MagicMock, mock_scenario_engine_hold: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock, mock_risk: MagicMock,
mock_db: MagicMock, mock_db: MagicMock,
mock_decision_logger: MagicMock, mock_decision_logger: MagicMock,
@@ -574,7 +622,8 @@ class TestOverseasBalanceParsing:
await trading_cycle( await trading_cycle(
broker=mock_domestic_broker, broker=mock_domestic_broker,
overseas_broker=mock_overseas_broker_with_dict, overseas_broker=mock_overseas_broker_with_dict,
brain=mock_brain_hold, scenario_engine=mock_scenario_engine_hold,
playbook=mock_playbook,
risk=mock_risk, risk=mock_risk,
db_conn=mock_db, db_conn=mock_db,
decision_logger=mock_decision_logger, decision_logger=mock_decision_logger,
@@ -594,7 +643,8 @@ class TestOverseasBalanceParsing:
self, self,
mock_domestic_broker: MagicMock, mock_domestic_broker: MagicMock,
mock_overseas_broker_with_empty: MagicMock, mock_overseas_broker_with_empty: MagicMock,
mock_brain_hold: MagicMock, mock_scenario_engine_hold: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock, mock_risk: MagicMock,
mock_db: MagicMock, mock_db: MagicMock,
mock_decision_logger: MagicMock, mock_decision_logger: MagicMock,
@@ -609,7 +659,8 @@ class TestOverseasBalanceParsing:
await trading_cycle( await trading_cycle(
broker=mock_domestic_broker, broker=mock_domestic_broker,
overseas_broker=mock_overseas_broker_with_empty, overseas_broker=mock_overseas_broker_with_empty,
brain=mock_brain_hold, scenario_engine=mock_scenario_engine_hold,
playbook=mock_playbook,
risk=mock_risk, risk=mock_risk,
db_conn=mock_db, db_conn=mock_db,
decision_logger=mock_decision_logger, decision_logger=mock_decision_logger,
@@ -629,7 +680,8 @@ class TestOverseasBalanceParsing:
self, self,
mock_domestic_broker: MagicMock, mock_domestic_broker: MagicMock,
mock_overseas_broker_with_empty_price: MagicMock, mock_overseas_broker_with_empty_price: MagicMock,
mock_brain_hold: MagicMock, mock_scenario_engine_hold: MagicMock,
mock_playbook: DayPlaybook,
mock_risk: MagicMock, mock_risk: MagicMock,
mock_db: MagicMock, mock_db: MagicMock,
mock_decision_logger: MagicMock, mock_decision_logger: MagicMock,
@@ -644,7 +696,8 @@ class TestOverseasBalanceParsing:
await trading_cycle( await trading_cycle(
broker=mock_domestic_broker, broker=mock_domestic_broker,
overseas_broker=mock_overseas_broker_with_empty_price, overseas_broker=mock_overseas_broker_with_empty_price,
brain=mock_brain_hold, scenario_engine=mock_scenario_engine_hold,
playbook=mock_playbook,
risk=mock_risk, risk=mock_risk,
db_conn=mock_db, db_conn=mock_db,
decision_logger=mock_decision_logger, decision_logger=mock_decision_logger,
@@ -658,3 +711,341 @@ class TestOverseasBalanceParsing:
# Verify price API was called # Verify price API was called
mock_overseas_broker_with_empty_price.get_overseas_price.assert_called_once() mock_overseas_broker_with_empty_price.get_overseas_price.assert_called_once()
class TestScenarioEngineIntegration:
"""Test scenario engine integration in trading_cycle."""
@pytest.fixture
def mock_broker(self) -> MagicMock:
"""Create mock broker with standard domestic data."""
broker = MagicMock()
broker.get_orderbook = AsyncMock(
return_value={
"output1": {"stck_prpr": "50000", "frgn_ntby_qty": "100"}
}
)
broker.get_balance = AsyncMock(
return_value={
"output2": [
{
"tot_evlu_amt": "10000000",
"dnca_tot_amt": "5000000",
"pchs_amt_smtl_amt": "9500000",
}
]
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
return broker
@pytest.fixture
def mock_market(self) -> MagicMock:
"""Create mock KR market."""
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
return market
@pytest.fixture
def mock_telegram(self) -> MagicMock:
"""Create mock telegram with all notification methods."""
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
return telegram
@pytest.mark.asyncio
async def test_scenario_engine_called_with_enriched_market_data(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test scenario engine receives market_data enriched with scanner metrics."""
from src.analysis.smart_scanner import ScanCandidate
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_hold_match())
playbook = _make_playbook()
candidate = ScanCandidate(
stock_code="005930", name="Samsung", price=50000,
volume=1000000, volume_ratio=3.5, rsi=25.0,
signal="oversold", score=85.0,
)
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=playbook,
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={"KR": {"005930": candidate}},
)
# Verify evaluate was called
engine.evaluate.assert_called_once()
call_args = engine.evaluate.call_args
market_data = call_args[0][2] # 3rd positional arg
portfolio_data = call_args[0][3] # 4th positional arg
# Scanner data should be enriched into market_data
assert market_data["rsi"] == 25.0
assert market_data["volume_ratio"] == 3.5
assert market_data["current_price"] == 50000.0
# Portfolio data should include pnl
assert "portfolio_pnl_pct" in portfolio_data
assert "total_cash" in portfolio_data
@pytest.mark.asyncio
async def test_scan_candidates_market_scoped(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test scan_candidates uses market-scoped lookup, ignoring other markets."""
from src.analysis.smart_scanner import ScanCandidate
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_hold_match())
# Candidate stored under US market — should NOT be found for KR market
us_candidate = ScanCandidate(
stock_code="005930", name="Overlap", price=100,
volume=500000, volume_ratio=5.0, rsi=15.0,
signal="oversold", score=90.0,
)
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market, # KR market
stock_code="005930",
scan_candidates={"US": {"005930": us_candidate}}, # Wrong market
)
# Should NOT have rsi/volume_ratio because candidate is under US, not KR
market_data = engine.evaluate.call_args[0][2]
assert "rsi" not in market_data
assert "volume_ratio" not in market_data
@pytest.mark.asyncio
async def test_scenario_engine_called_without_scanner_data(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test scenario engine works when stock has no scan candidate."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_hold_match())
playbook = _make_playbook()
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=playbook,
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={}, # No scanner data
)
# Should still work, just without rsi/volume_ratio
engine.evaluate.assert_called_once()
market_data = engine.evaluate.call_args[0][2]
assert "rsi" not in market_data
assert "volume_ratio" not in market_data
assert market_data["current_price"] == 50000.0
@pytest.mark.asyncio
async def test_scenario_matched_notification_sent(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test telegram notification sent when a scenario matches."""
# Create a match with matched_scenario (not None)
scenario = StockScenario(
condition=StockCondition(rsi_below=30),
action=ScenarioAction.BUY,
confidence=88,
rationale="RSI oversold bounce",
)
match = ScenarioMatch(
stock_code="005930",
matched_scenario=scenario,
action=ScenarioAction.BUY,
confidence=88,
rationale="RSI oversold bounce",
match_details={"rsi": 25.0},
)
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=match)
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={},
)
# Scenario matched notification should be sent
mock_telegram.notify_scenario_matched.assert_called_once()
call_kwargs = mock_telegram.notify_scenario_matched.call_args.kwargs
assert call_kwargs["stock_code"] == "005930"
assert call_kwargs["action"] == "BUY"
assert "rsi=25.0" in call_kwargs["condition_summary"]
@pytest.mark.asyncio
async def test_no_scenario_matched_notification_on_default_hold(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test no scenario notification when default HOLD is returned."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_hold_match())
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={},
)
# No scenario matched notification for default HOLD
mock_telegram.notify_scenario_matched.assert_not_called()
@pytest.mark.asyncio
async def test_decision_logger_receives_scenario_match_details(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test decision logger context includes scenario match details."""
match = ScenarioMatch(
stock_code="005930",
matched_scenario=None,
action=ScenarioAction.HOLD,
confidence=0,
rationale="No match",
match_details={"rsi": 45.0, "volume_ratio": 1.2},
)
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=match)
decision_logger = MagicMock()
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=decision_logger,
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={},
)
decision_logger.log_decision.assert_called_once()
call_kwargs = decision_logger.log_decision.call_args.kwargs
assert "scenario_match" in call_kwargs["context_snapshot"]
assert call_kwargs["context_snapshot"]["scenario_match"]["rsi"] == 45.0
@pytest.mark.asyncio
async def test_reduce_all_does_not_execute_order(
self, mock_broker: MagicMock, mock_market: MagicMock, mock_telegram: MagicMock,
) -> None:
"""Test REDUCE_ALL action does not trigger order execution."""
match = ScenarioMatch(
stock_code="005930",
matched_scenario=None,
action=ScenarioAction.REDUCE_ALL,
confidence=100,
rationale="Global rule: portfolio loss > 2%",
)
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=match)
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={},
)
# REDUCE_ALL is not BUY or SELL — no order sent
mock_broker.send_order.assert_not_called()
mock_telegram.notify_trade_execution.assert_not_called()

View File

@@ -0,0 +1,552 @@
"""Tests for PreMarketPlanner — Gemini prompt builder + response parser."""
from __future__ import annotations
import json
from datetime import date
from unittest.mock import AsyncMock, MagicMock
import pytest
from src.analysis.smart_scanner import ScanCandidate
from src.brain.gemini_client import TradeDecision
from src.config import Settings
from src.context.store import ContextLayer
from src.strategy.models import (
CrossMarketContext,
DayPlaybook,
MarketOutlook,
PlaybookStatus,
ScenarioAction,
)
from src.strategy.pre_market_planner import PreMarketPlanner
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _candidate(
code: str = "005930",
name: str = "Samsung",
price: float = 71000,
rsi: float = 28.5,
volume_ratio: float = 3.2,
signal: str = "oversold",
score: float = 82.0,
) -> ScanCandidate:
return ScanCandidate(
stock_code=code,
name=name,
price=price,
volume=1_500_000,
volume_ratio=volume_ratio,
rsi=rsi,
signal=signal,
score=score,
)
def _gemini_response_json(
outlook: str = "neutral_to_bullish",
stocks: list[dict] | None = None,
global_rules: list[dict] | None = None,
) -> str:
"""Build a valid Gemini JSON response."""
if stocks is None:
stocks = [
{
"stock_code": "005930",
"scenarios": [
{
"condition": {"rsi_below": 30, "volume_ratio_above": 2.5},
"action": "BUY",
"confidence": 85,
"allocation_pct": 15.0,
"stop_loss_pct": -2.0,
"take_profit_pct": 4.0,
"rationale": "Oversold bounce with high volume",
}
],
}
]
if global_rules is None:
global_rules = [
{
"condition": "portfolio_pnl_pct < -2.0",
"action": "REDUCE_ALL",
"rationale": "Near circuit breaker",
}
]
return json.dumps(
{"market_outlook": outlook, "global_rules": global_rules, "stocks": stocks}
)
def _make_planner(
gemini_response: str = "",
token_count: int = 200,
context_data: dict | None = None,
scorecard_data: dict | None = None,
) -> PreMarketPlanner:
"""Create a PreMarketPlanner with mocked dependencies."""
if not gemini_response:
gemini_response = _gemini_response_json()
# Mock GeminiClient
gemini = AsyncMock()
gemini.decide = AsyncMock(
return_value=TradeDecision(
action="HOLD",
confidence=0,
rationale=gemini_response,
token_count=token_count,
)
)
# Mock ContextStore
store = MagicMock()
store.get_context = MagicMock(return_value=scorecard_data)
# Mock ContextSelector
selector = MagicMock()
selector.select_layers = MagicMock(return_value=[ContextLayer.L7_REALTIME, ContextLayer.L6_DAILY])
selector.get_context_data = MagicMock(return_value=context_data or {})
settings = Settings(
KIS_APP_KEY="test",
KIS_APP_SECRET="test",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test",
)
return PreMarketPlanner(gemini, store, selector, settings)
# ---------------------------------------------------------------------------
# generate_playbook
# ---------------------------------------------------------------------------
class TestGeneratePlaybook:
@pytest.mark.asyncio
async def test_basic_generation(self) -> None:
planner = _make_planner()
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert isinstance(pb, DayPlaybook)
assert pb.market == "KR"
assert pb.stock_count == 1
assert pb.scenario_count == 1
assert pb.market_outlook == MarketOutlook.NEUTRAL_TO_BULLISH
assert pb.token_count == 200
@pytest.mark.asyncio
async def test_empty_candidates_returns_empty_playbook(self) -> None:
planner = _make_planner()
pb = await planner.generate_playbook("KR", [], today=date(2026, 2, 8))
assert pb.stock_count == 0
assert pb.scenario_count == 0
assert pb.market_outlook == MarketOutlook.NEUTRAL
@pytest.mark.asyncio
async def test_gemini_failure_returns_defensive(self) -> None:
planner = _make_planner()
planner._gemini.decide = AsyncMock(side_effect=RuntimeError("API timeout"))
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert pb.default_action == ScenarioAction.HOLD
assert pb.market_outlook == MarketOutlook.NEUTRAL_TO_BEARISH
assert pb.stock_count == 1
# Defensive playbook has stop-loss scenarios
assert pb.stock_playbooks[0].scenarios[0].action == ScenarioAction.SELL
@pytest.mark.asyncio
async def test_gemini_failure_empty_when_defensive_disabled(self) -> None:
planner = _make_planner()
planner._settings.DEFENSIVE_PLAYBOOK_ON_FAILURE = False
planner._gemini.decide = AsyncMock(side_effect=RuntimeError("fail"))
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert pb.stock_count == 0
@pytest.mark.asyncio
async def test_multiple_candidates(self) -> None:
stocks = [
{
"stock_code": "005930",
"scenarios": [
{
"condition": {"rsi_below": 30},
"action": "BUY",
"confidence": 85,
"rationale": "Oversold",
}
],
},
{
"stock_code": "AAPL",
"scenarios": [
{
"condition": {"rsi_above": 75},
"action": "SELL",
"confidence": 80,
"rationale": "Overbought",
}
],
},
]
planner = _make_planner(gemini_response=_gemini_response_json(stocks=stocks))
candidates = [_candidate(), _candidate(code="AAPL", name="Apple")]
pb = await planner.generate_playbook("US", candidates, today=date(2026, 2, 8))
assert pb.stock_count == 2
codes = [sp.stock_code for sp in pb.stock_playbooks]
assert "005930" in codes
assert "AAPL" in codes
@pytest.mark.asyncio
async def test_unknown_stock_in_response_skipped(self) -> None:
stocks = [
{
"stock_code": "005930",
"scenarios": [{"condition": {"rsi_below": 30}, "action": "BUY", "confidence": 85, "rationale": "ok"}],
},
{
"stock_code": "UNKNOWN",
"scenarios": [{"condition": {"rsi_below": 20}, "action": "BUY", "confidence": 90, "rationale": "bad"}],
},
]
planner = _make_planner(gemini_response=_gemini_response_json(stocks=stocks))
candidates = [_candidate()] # Only 005930
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert pb.stock_count == 1
assert pb.stock_playbooks[0].stock_code == "005930"
@pytest.mark.asyncio
async def test_global_rules_parsed(self) -> None:
planner = _make_planner()
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert len(pb.global_rules) == 1
assert pb.global_rules[0].action == ScenarioAction.REDUCE_ALL
@pytest.mark.asyncio
async def test_token_count_from_decision(self) -> None:
planner = _make_planner(token_count=450)
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert pb.token_count == 450
# ---------------------------------------------------------------------------
# _parse_response
# ---------------------------------------------------------------------------
class TestParseResponse:
def test_parse_full_response(self) -> None:
planner = _make_planner()
response = _gemini_response_json(outlook="bearish")
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
assert pb.market_outlook == MarketOutlook.BEARISH
assert pb.stock_count == 1
assert pb.stock_playbooks[0].scenarios[0].confidence == 85
def test_parse_with_markdown_fences(self) -> None:
planner = _make_planner()
response = f"```json\n{_gemini_response_json()}\n```"
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
assert pb.stock_count == 1
def test_parse_unknown_outlook_defaults_neutral(self) -> None:
planner = _make_planner()
response = _gemini_response_json(outlook="super_bullish")
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
assert pb.market_outlook == MarketOutlook.NEUTRAL
def test_parse_scenario_with_all_condition_fields(self) -> None:
planner = _make_planner()
stocks = [
{
"stock_code": "005930",
"scenarios": [
{
"condition": {
"rsi_below": 25,
"volume_ratio_above": 3.0,
"price_change_pct_below": -2.0,
},
"action": "BUY",
"confidence": 92,
"allocation_pct": 20.0,
"stop_loss_pct": -3.0,
"take_profit_pct": 5.0,
"rationale": "Multi-condition entry",
}
],
}
]
response = _gemini_response_json(stocks=stocks)
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
sc = pb.stock_playbooks[0].scenarios[0]
assert sc.condition.rsi_below == 25
assert sc.condition.volume_ratio_above == 3.0
assert sc.condition.price_change_pct_below == -2.0
assert sc.allocation_pct == 20.0
assert sc.stop_loss_pct == -3.0
assert sc.take_profit_pct == 5.0
def test_parse_empty_condition_scenario_skipped(self) -> None:
planner = _make_planner()
stocks = [
{
"stock_code": "005930",
"scenarios": [
{
"condition": {},
"action": "BUY",
"confidence": 85,
"rationale": "No conditions",
},
{
"condition": {"rsi_below": 30},
"action": "BUY",
"confidence": 80,
"rationale": "Valid",
},
],
}
]
response = _gemini_response_json(stocks=stocks)
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
# Empty condition scenario skipped, valid one kept
assert pb.stock_count == 1
assert pb.stock_playbooks[0].scenarios[0].confidence == 80
def test_parse_max_scenarios_enforced(self) -> None:
planner = _make_planner()
# Settings default MAX_SCENARIOS_PER_STOCK = 5
scenarios = [
{
"condition": {"rsi_below": 20 + i},
"action": "BUY",
"confidence": 80 + i,
"rationale": f"Scenario {i}",
}
for i in range(8) # 8 scenarios, should be capped to 5
]
stocks = [{"stock_code": "005930", "scenarios": scenarios}]
response = _gemini_response_json(stocks=stocks)
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
assert len(pb.stock_playbooks[0].scenarios) == 5
def test_parse_invalid_json_raises(self) -> None:
planner = _make_planner()
candidates = [_candidate()]
with pytest.raises(json.JSONDecodeError):
planner._parse_response("not json at all", date(2026, 2, 8), "KR", candidates, None)
def test_parse_cross_market_preserved(self) -> None:
planner = _make_planner()
response = _gemini_response_json()
candidates = [_candidate()]
cross = CrossMarketContext(market="US", date="2026-02-07", total_pnl=1.5, win_rate=60)
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, cross)
assert pb.cross_market is not None
assert pb.cross_market.market == "US"
assert pb.cross_market.total_pnl == 1.5
# ---------------------------------------------------------------------------
# build_cross_market_context
# ---------------------------------------------------------------------------
class TestBuildCrossMarketContext:
def test_kr_reads_us_scorecard(self) -> None:
scorecard = {"total_pnl": 2.5, "win_rate": 65, "index_change_pct": 0.8, "lessons": ["Stay patient"]}
planner = _make_planner(scorecard_data=scorecard)
ctx = planner.build_cross_market_context("KR", today=date(2026, 2, 8))
assert ctx is not None
assert ctx.market == "US"
assert ctx.total_pnl == 2.5
assert ctx.win_rate == 65
assert "Stay patient" in ctx.lessons
# Verify it queried scorecard_US
planner._context_store.get_context.assert_called_once_with(
ContextLayer.L6_DAILY, "2026-02-08", "scorecard_US"
)
def test_us_reads_kr_scorecard(self) -> None:
scorecard = {"total_pnl": -1.0, "win_rate": 40, "index_change_pct": -0.5}
planner = _make_planner(scorecard_data=scorecard)
ctx = planner.build_cross_market_context("US", today=date(2026, 2, 8))
assert ctx is not None
assert ctx.market == "KR"
assert ctx.total_pnl == -1.0
planner._context_store.get_context.assert_called_once_with(
ContextLayer.L6_DAILY, "2026-02-08", "scorecard_KR"
)
def test_no_scorecard_returns_none(self) -> None:
planner = _make_planner(scorecard_data=None)
ctx = planner.build_cross_market_context("KR", today=date(2026, 2, 8))
assert ctx is None
def test_invalid_scorecard_returns_none(self) -> None:
planner = _make_planner(scorecard_data="not a dict and not json")
ctx = planner.build_cross_market_context("KR", today=date(2026, 2, 8))
assert ctx is None
# ---------------------------------------------------------------------------
# _build_prompt
# ---------------------------------------------------------------------------
class TestBuildPrompt:
def test_prompt_contains_candidates(self) -> None:
planner = _make_planner()
candidates = [_candidate(code="005930", name="Samsung")]
prompt = planner._build_prompt("KR", candidates, {}, None)
assert "005930" in prompt
assert "Samsung" in prompt
assert "RSI=" in prompt
assert "volume_ratio=" in prompt
def test_prompt_contains_cross_market(self) -> None:
planner = _make_planner()
cross = CrossMarketContext(
market="US", date="2026-02-07", total_pnl=1.5,
win_rate=60, index_change_pct=0.8, lessons=["Cut losses early"],
)
prompt = planner._build_prompt("KR", [_candidate()], {}, cross)
assert "Other Market (US)" in prompt
assert "+1.50%" in prompt
assert "Cut losses early" in prompt
def test_prompt_contains_context_data(self) -> None:
planner = _make_planner()
context = {"L6_DAILY": {"win_rate": 0.65, "total_pnl": 2.5}}
prompt = planner._build_prompt("KR", [_candidate()], context, None)
assert "Strategic Context" in prompt
assert "L6_DAILY" in prompt
assert "win_rate" in prompt
def test_prompt_contains_max_scenarios(self) -> None:
planner = _make_planner()
prompt = planner._build_prompt("KR", [_candidate()], {}, None)
assert f"Max {planner._settings.MAX_SCENARIOS_PER_STOCK} scenarios" in prompt
def test_prompt_market_name(self) -> None:
planner = _make_planner()
prompt = planner._build_prompt("US", [_candidate()], {}, None)
assert "US market" in prompt
# ---------------------------------------------------------------------------
# _extract_json
# ---------------------------------------------------------------------------
class TestExtractJson:
def test_plain_json(self) -> None:
assert PreMarketPlanner._extract_json('{"a": 1}') == '{"a": 1}'
def test_with_json_fence(self) -> None:
text = '```json\n{"a": 1}\n```'
assert PreMarketPlanner._extract_json(text) == '{"a": 1}'
def test_with_plain_fence(self) -> None:
text = '```\n{"a": 1}\n```'
assert PreMarketPlanner._extract_json(text) == '{"a": 1}'
def test_with_whitespace(self) -> None:
text = ' \n {"a": 1} \n '
assert PreMarketPlanner._extract_json(text) == '{"a": 1}'
# ---------------------------------------------------------------------------
# Defensive playbook
# ---------------------------------------------------------------------------
class TestDefensivePlaybook:
def test_defensive_has_stop_loss(self) -> None:
candidates = [_candidate(code="005930"), _candidate(code="AAPL")]
pb = PreMarketPlanner._defensive_playbook(date(2026, 2, 8), "KR", candidates)
assert pb.default_action == ScenarioAction.HOLD
assert pb.market_outlook == MarketOutlook.NEUTRAL_TO_BEARISH
assert pb.stock_count == 2
for sp in pb.stock_playbooks:
assert sp.scenarios[0].action == ScenarioAction.SELL
assert sp.scenarios[0].stop_loss_pct == -3.0
def test_defensive_has_global_rule(self) -> None:
pb = PreMarketPlanner._defensive_playbook(date(2026, 2, 8), "KR", [_candidate()])
assert len(pb.global_rules) == 1
assert pb.global_rules[0].action == ScenarioAction.REDUCE_ALL
def test_empty_playbook(self) -> None:
pb = PreMarketPlanner._empty_playbook(date(2026, 2, 8), "US")
assert pb.stock_count == 0
assert pb.market == "US"
assert pb.market_outlook == MarketOutlook.NEUTRAL

View File

@@ -160,6 +160,83 @@ class TestNotificationSending:
assert "250.50" in payload["text"] assert "250.50" in payload["text"]
assert "92%" in payload["text"] assert "92%" in payload["text"]
@pytest.mark.asyncio
async def test_playbook_generated_format(self) -> None:
"""Playbook generated notification has expected fields."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_playbook_generated(
market="KR",
stock_count=4,
scenario_count=12,
token_count=980,
)
payload = mock_post.call_args.kwargs["json"]
assert "Playbook Generated" in payload["text"]
assert "Market: KR" in payload["text"]
assert "Stocks: 4" in payload["text"]
assert "Scenarios: 12" in payload["text"]
assert "Tokens: 980" in payload["text"]
@pytest.mark.asyncio
async def test_scenario_matched_format(self) -> None:
"""Scenario matched notification has expected fields."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_scenario_matched(
stock_code="AAPL",
action="BUY",
condition_summary="RSI < 30, volume_ratio > 2.0",
confidence=88.2,
)
payload = mock_post.call_args.kwargs["json"]
assert "Scenario Matched" in payload["text"]
assert "AAPL" in payload["text"]
assert "Action: BUY" in payload["text"]
assert "RSI < 30" in payload["text"]
assert "88%" in payload["text"]
@pytest.mark.asyncio
async def test_playbook_failed_format(self) -> None:
"""Playbook failed notification has expected fields."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_playbook_failed(
market="US",
reason="Gemini timeout",
)
payload = mock_post.call_args.kwargs["json"]
assert "Playbook Failed" in payload["text"]
assert "Market: US" in payload["text"]
assert "Gemini timeout" in payload["text"]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_circuit_breaker_priority(self) -> None: async def test_circuit_breaker_priority(self) -> None:
"""Circuit breaker uses CRITICAL priority.""" """Circuit breaker uses CRITICAL priority."""
@@ -309,6 +386,73 @@ class TestMessagePriorities:
payload = mock_post.call_args.kwargs["json"] payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.CRITICAL.emoji in payload["text"] assert NotificationPriority.CRITICAL.emoji in payload["text"]
@pytest.mark.asyncio
async def test_playbook_generated_priority(self) -> None:
"""Playbook generated uses MEDIUM priority emoji."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_playbook_generated(
market="KR",
stock_count=2,
scenario_count=4,
token_count=123,
)
payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.MEDIUM.emoji in payload["text"]
@pytest.mark.asyncio
async def test_playbook_failed_priority(self) -> None:
"""Playbook failed uses HIGH priority emoji."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_playbook_failed(
market="KR",
reason="Invalid JSON",
)
payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.HIGH.emoji in payload["text"]
@pytest.mark.asyncio
async def test_scenario_matched_priority(self) -> None:
"""Scenario matched uses HIGH priority emoji."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_scenario_matched(
stock_code="AAPL",
action="BUY",
condition_summary="RSI < 30",
confidence=80.0,
)
payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.HIGH.emoji in payload["text"]
class TestClientCleanup: class TestClientCleanup:
"""Test client cleanup behavior.""" """Test client cleanup behavior."""