Compare commits

...

59 Commits

Author SHA1 Message Date
agentson
3fdb7a29d4 feat: US market code 정합성, Telegram 명령 4종, 손절 모니터링 (#132)
Some checks failed
CI / test (pull_request) Has been cancelled
- MARKET_SHORTHAND + expand_market_codes()로 config "US" → schedule "US_NASDAQ/NYSE/AMEX" 자동 확장
- /report, /scenarios, /review, /dashboard 텔레그램 명령 추가
- price_change_pct를 trading_cycle과 run_daily_session에 주입
- HOLD시 get_open_position 기반 손절 모니터링 및 자동 SELL 오버라이드
- 대시보드 /api/status 동적 market 조회로 변경

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 20:24:01 +09:00
31b4d0bf1e Merge pull request 'fix: daily_review 테스트 날짜 불일치 수정 (#129)' (#130) from feature/issue-129-fix-daily-review-test-date into main
Some checks failed
CI / test (push) Has been cancelled
CI / test (pull_request) Has been cancelled
Reviewed-on: #130
2026-02-16 11:30:20 +09:00
agentson
e2275a23b1 fix: daily_review 테스트에서 날짜 불일치로 인한 실패 수정 (#129)
Some checks failed
CI / test (pull_request) Has been cancelled
DecisionLogger와 log_trade가 datetime.now(UTC)로 현재 날짜를 저장하는데,
테스트에서 하드코딩된 '2026-02-14'로 조회하여 0건이 반환되던 문제 수정.
generate_scorecard 호출 시 TODAY 변수를 사용하도록 변경.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 10:05:17 +09:00
7522bb7e66 Merge pull request 'feat: 대시보드 실행 통합 - CLI + 환경변수 (issue #97)' (#128) from feature/issue-97-dashboard-integration into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #128
2026-02-15 00:01:57 +09:00
agentson
63fa6841a2 feat: dashboard background thread with CLI flag (issue #97)
Some checks failed
CI / test (pull_request) Has been cancelled
Add --dashboard CLI flag and DASHBOARD_ENABLED env var to start
FastAPI dashboard in a daemon thread alongside the trading loop.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 00:01:29 +09:00
ece3c5597b Merge pull request 'feat: FastAPI 읽기 전용 대시보드 (issue #96)' (#127) from feature/issue-96-evolution-main-integration into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #127
2026-02-14 23:57:17 +09:00
agentson
63f4e49d88 feat: read-only FastAPI dashboard with 7 API endpoints (issue #96)
Some checks failed
CI / test (pull_request) Has been cancelled
Add observability dashboard: status, playbook, scorecard, performance,
context browser, decisions, and active scenarios endpoints.
SQLite read-only on separate connections from trading loop.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 23:56:10 +09:00
agentson
e0a6b307a2 fix: add error handling to evolution loop telegram notification
Wrap evolution notification in try/except so telegram failures don't
crash the evolution loop. Add integration tests for market close flow.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 23:56:04 +09:00
75320eb587 Merge pull request 'feat: 전략 진화 루프 연결 (issue #95)' (#126) from feature/issue-95-evolution-loop into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #126
2026-02-14 23:42:30 +09:00
agentson
afb31b7f4b feat: wire evolution loop into market close flow (issue #95)
Some checks failed
CI / test (pull_request) Has been cancelled
Run EvolutionOptimizer.evolve() at US market close, skip for other
markets, and notify via Telegram when a strategy PR is generated.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 23:41:41 +09:00
a429a9f4da Merge pull request 'feat: 레거시 컨텍스트 정리 스케줄러 연결 (issue #89)' (#125) from feature/issue-89-legacy-context-cleanup into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #125
2026-02-14 23:38:11 +09:00
agentson
d9763def85 feat: integrate ContextScheduler into main loop (issue #89)
Some checks failed
CI / test (pull_request) Has been cancelled
Wire up periodic context rollups (weekly/monthly/quarterly/annual/legacy)
in both daily and realtime trading loops with dedup-safe scheduling.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 23:37:30 +09:00
ab7f0444b2 Merge pull request 'feat: 플래너에 자기 시장 성적표 주입 (issue #94)' (#124) from feature/issue-94-planner-scorecard-injection into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #124
2026-02-14 23:34:09 +09:00
agentson
6b3960a3a4 feat: inject self-market scorecard into planner prompt (issue #94)
Some checks failed
CI / test (pull_request) Has been cancelled
Add build_self_market_scorecard() to read previous day's own market
performance, and include it in the Gemini planning prompt alongside
cross-market context.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 23:27:01 +09:00
6cad8e74e1 Merge pull request 'feat: 플래너 크로스마켓 날짜 보정 + 전략 컨텍스트 (issue #88)' (#123) from feat/v2-2-4-planner-context-crossmarket into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #123
2026-02-14 23:21:12 +09:00
agentson
86c94cff62 feat: cross-market date fix and strategic context selector (issue #88)
Some checks failed
CI / test (pull_request) Has been cancelled
KR planner now reads US scorecard from previous day (timezone-aware),
and generate_playbook uses STRATEGIC context selection.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 23:20:24 +09:00
692cb61991 Merge pull request 'feat: main.py에 일일 리뷰 연결 (issue #93)' (#122) from feature/issue-93-daily-review-integration into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #122
2026-02-14 23:15:26 +09:00
agentson
392422992b feat: integrate DailyReviewer into market close flow (issue #93)
Some checks failed
CI / test (pull_request) Has been cancelled
Extract _handle_market_close() helper that runs EOD aggregation,
generates scorecard with optional AI lessons, and sends Telegram summary.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 23:13:57 +09:00
cc637a9738 Merge pull request 'feat: Daily Reviewer - 시장별 성적표 생성 (issue #91)' (#121) from feature/issue-91-daily-reviewer into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #121
2026-02-14 23:08:05 +09:00
agentson
8c27473fed feat: DailyReviewer for market-scoped scorecards and AI lessons (issue #91)
Some checks failed
CI / test (pull_request) Has been cancelled
Generate per-market daily scorecards from decision_logs and trades,
optional Gemini-powered lessons, and store results in L6 context.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 23:07:12 +09:00
bde54c7487 Merge pull request 'feat: Decision outcome 업데이트 (issue #92)' (#120) from feature/issue-92-decision-outcome into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #120
2026-02-14 22:41:29 +09:00
agentson
a14f944fcc feat: link decision outcomes to trades via decision_id (issue #92)
Some checks failed
CI / test (pull_request) Has been cancelled
Add decision_id column to trades table, capture log_decision() return
value, and update original BUY decision outcome on SELL execution.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 21:36:57 +09:00
56f7405baa Merge pull request 'feat: 컨텍스트 집계 스케줄러 (issue #87)' (#119) from feature/issue-87-context-scheduler into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #119
2026-02-10 04:28:42 +09:00
agentson
e3b1ecc572 feat: context aggregation scheduler (issue #87)
Some checks failed
CI / test (pull_request) Has been cancelled
- Add ContextScheduler with run_if_due() for periodic rollups
- Weekly (Sunday), monthly (last day), quarterly, annual, legacy schedules
- Daily cleanup of expired contexts via ContextStore
- Dedup guard: each task runs at most once per day

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 04:26:51 +09:00
8acf72b22c Merge pull request 'feat: DailyScorecard 모델 정의 (issue #90)' (#118) from feature/issue-90-scorecard-model into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #118
2026-02-10 04:26:21 +09:00
agentson
c95102a0bd feat: DailyScorecard model for per-market performance review (issue #90)
Some checks failed
CI / test (pull_request) Has been cancelled
- Add DailyScorecard dataclass with market-scoped fields
- Fields: date, market, decisions, pnl, win_rate, scenario_match_rate, lessons, cross_market_note
- Export from src/evolution/__init__.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 04:25:37 +09:00
0685d62f9c Merge pull request 'feat: EOD 집계 시장 필터 추가 (issue #86)' (#117) from feature/issue-86-eod-market-filter into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #117
2026-02-10 04:24:58 +09:00
agentson
78021d4695 feat: EOD aggregation with market filter (issue #86)
Some checks failed
CI / test (pull_request) Has been cancelled
- Add market parameter to aggregate_daily_from_trades() for per-market L6 aggregation
- Store market-scoped keys (total_pnl_KR, win_rate_US, etc.) in L6/L5/L4 layers
- Hook aggregate_daily_from_trades() into market close detection in run()
- Update tests for market-scoped context keys

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 04:23:49 +09:00
3cdd10783b Merge pull request 'feat: L7 실시간 컨텍스트 시장별 기록 (issue #85)' (#116) from feature/issue-85-l7-context-write into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #116
2026-02-10 04:22:57 +09:00
agentson
c4e31be27a feat: L7 real-time context write with market-scoped keys (issue #85)
Some checks failed
CI / test (pull_request) Has been cancelled
- Add L7_REALTIME writes in trading_cycle() for volatility, price, rsi, volume_ratio
- Normalize key format to {metric}_{market}_{stock_code} across scanner and main
- Fix existing key mismatch between scanner writes and main reads
- Remove unused MarketScanner dead code

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 04:21:52 +09:00
9d9ade14eb Merge pull request 'docs: add plan-implementation consistency check to code review checklist (#114)' (#115) from feature/issue-114-review-plan-consistency into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #115
2026-02-10 04:16:30 +09:00
agentson
9a8936ab34 docs: add plan-implementation consistency check to code review checklist (#114)
Some checks failed
CI / test (pull_request) Has been cancelled
리뷰 시 플랜과 구현의 일치 여부를 필수로 확인하는 규칙 추가.
- workflow.md에 Code Review Checklist 섹션 신설
- requirements-log.md에 사용자 요구사항 기록

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 04:15:51 +09:00
c5831966ed Merge pull request 'fix: derive all aggregation timeframes from trade timestamp (#112)' (#113) from fix/test-failures into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #113
2026-02-10 00:42:39 +09:00
agentson
f03cc6039b fix: derive all aggregation timeframes from trade timestamp (#112)
Some checks failed
CI / test (pull_request) Has been cancelled
run_all_aggregations() previously used datetime.now(UTC) for weekly
through annual layers while using the trade date only for daily,
causing data misalignment on backfill. Now all layers consistently
use the latest trade timestamp. Also adds "Z" suffix handling for
fromisoformat() compatibility and strengthens test assertions to
verify L4-L2 layer values end-to-end.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 00:40:28 +09:00
9171e54652 Merge pull request 'feat: integrate scenario engine and playbook into main trading loop (issue #84)' (#110) from feature/issue-84-main-integration into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #110
2026-02-09 23:18:24 +09:00
agentson
d64e072f06 fix: PR review — DB reload, market-local date, market-scoped scan_candidates
Some checks failed
CI / test (pull_request) Has been cancelled
Address PR #110 review findings:

1. High — Realtime mode now loads playbook from DB before calling Gemini,
   preventing duplicate API calls on process restart (4/day budget).
2. Medium — Pass market-local date (via market.timezone) to
   generate_playbook() and _empty_playbook() instead of date.today().
3. Medium — scan_candidates restructured from {stock_code: candidate}
   to {market_code: {stock_code: candidate}} to prevent KR/US symbol
   collision.

New test: test_scan_candidates_market_scoped verifies cross-market
isolation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-09 23:00:06 +09:00
agentson
b2312fbe01 fix: resolve lint issues in main.py and test_main.py
Some checks failed
CI / test (pull_request) Has been cancelled
Remove unused imports (sys, ScenarioMatch, asyncio, StockPlaybook),
fix import ordering, and split long lines for ruff compliance.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 22:28:31 +09:00
agentson
98c4a2413c feat: integrate scenario engine and playbook into main trading loop (issue #84)
Replace brain.decide() with scenario_engine.evaluate() in trading_cycle
and brain.decide_batch() with per-stock scenario evaluation in
run_daily_session. Initialize PreMarketPlanner, ScenarioEngine, and
PlaybookStore in run(). Add pre-market playbook generation on market
open (1 Gemini call per market per day), market_data enrichment from
scanner metrics (rsi, volume_ratio), portfolio_data for global rules,
scenario match notifications, and playbook lifecycle management.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 22:24:19 +09:00
6fba7c7ae8 Merge pull request 'feat: implement pre-market planner with Gemini integration (issue #83)' (#109) from feature/issue-83-pre-market-planner into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #109
2026-02-08 22:07:36 +09:00
agentson
be695a5d7c fix: address PR review — inject today param, remove unused imports, fix lint
Some checks failed
CI / test (pull_request) Has been cancelled
Review findings addressed:
- Finding 1 (ImportError): false positive — ContextLayer is re-exported from
  src.context.store, import works correctly at runtime
- Finding 2 (timezone): generate_playbook() and build_cross_market_context()
  now accept optional today parameter for market-local date injection
- Finding 3 (lint): removed unused imports (UTC, datetime, PlaybookStatus),
  fixed line-too-long in prompt template
- Tests simplified: replaced date patching with direct today= parameter

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 21:57:39 +09:00
agentson
6471e66d89 fix: correct Settings field name in planner tests (KIS_ACCOUNT_NO)
Some checks failed
CI / test (pull_request) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 21:36:42 +09:00
agentson
149039a904 feat: implement pre-market planner with Gemini integration (issue #83)
PreMarketPlanner generates DayPlaybook via single Gemini API call per market:
- Structured JSON prompt with scan candidates + strategic context
- Cross-market context (KR reads US scorecard, US reads KR scorecard)
- Robust JSON parser with markdown fence stripping
- Unknown stock filtering (only scanner candidates allowed)
- MAX_SCENARIOS_PER_STOCK enforcement
- Defensive playbook on failure (HOLD + stop-loss)
- Empty playbook when no candidates (safe, no trades)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 21:35:57 +09:00
815d675529 Merge pull request 'feat: add Telegram playbook notifications (issue #81)' (#108) from feature/issue-81-telegram-playbook-notify into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #108
2026-02-08 21:27:46 +09:00
agentson
e8634b93c3 feat: add Telegram playbook notifications (issue #81)
Some checks failed
CI / test (pull_request) Has been cancelled
- notify_playbook_generated(): market, stock/scenario count, token usage (MEDIUM)
- notify_scenario_matched(): stock, action, condition, confidence (HIGH)
- notify_playbook_failed(): market, reason with 200-char truncation (HIGH)
- 6 new tests: 3 format + 3 priority validations

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 21:25:16 +09:00
f20736fd2a Merge pull request 'feat: add playbook persistence with DB schema and CRUD store (issue #82)' (#107) from feature/issue-82-playbook-persistence into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #107
2026-02-08 21:07:13 +09:00
agentson
7f2f96a819 feat: add playbook persistence with DB schema and CRUD store (issue #82)
Some checks failed
CI / test (pull_request) Has been cancelled
- Add playbooks table to src/db.py with UNIQUE(date, market) constraint
- PlaybookStore: save/load/delete, status management, match_count tracking,
  list_recent with market filter, stats without full deserialization
- DayPlaybook JSON serialization via Pydantic model_dump_json/model_validate_json
- 23 tests, 100% coverage on playbook_store.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 21:00:04 +09:00
aaa74894dd Merge pull request 'feat: implement local scenario engine for playbook execution (issue #80)' (#102) from feature/issue-80-scenario-engine into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #102
2026-02-08 20:47:34 +09:00
agentson
e711d6702a fix: deduplicate missing-key warnings and normalize match_details
Some checks failed
CI / test (pull_request) Has been cancelled
Addresses second round of PR #102 review:
- _warn_missing_key(): logs each missing key only once per engine instance
  to prevent log spam in high-frequency trading loops
- _build_match_details(): uses _safe_float() normalized values instead of
  raw market_data to ensure consistent float types in logging/analysis
- Test: verify warning fires exactly once across repeated calls
- Test: verify match_details contains normalized float values

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 20:41:20 +09:00
agentson
d2fc829380 fix: add safe type casting and missing-key warnings in ScenarioEngine
Some checks failed
CI / test (pull_request) Has been cancelled
Addresses PR #102 review findings:
- _safe_float() prevents TypeError from str/Decimal/invalid market_data values
- Warning logs when condition references a key missing from market_data
- 5 new tests: string, percent string, Decimal, mixed invalid types, log check

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 16:23:54 +09:00
de27b1af10 Merge pull request 'Require rebase after creating feature branch' (#106) from feature/issue-105-branch-rebase into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #106
2026-02-08 16:04:57 +09:00
agentson
7370220497 Require rebase after creating feature branch
Some checks failed
CI / test (pull_request) Has been cancelled
2026-02-08 16:03:41 +09:00
b01dacf328 Merge pull request 'docs: add persistent agent constraints document (issue #100)' (#103) from feature/issue-100-agent-constraints into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #103
2026-02-08 15:12:19 +09:00
agentson
1210c17989 docs: add persistent agent constraints document (issue #100)
Some checks failed
CI / test (pull_request) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 15:10:49 +09:00
agentson
9599b188e8 feat: implement local scenario engine for playbook execution (issue #80)
Some checks failed
CI / test (pull_request) Has been cancelled
ScenarioEngine evaluates pre-defined playbook scenarios against real-time
market data with sub-100ms execution (zero API calls). Supports condition
AND-matching, global portfolio rules, and first-match-wins priority.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 02:23:53 +09:00
c43660a58c Merge pull request 'feat: add strategy/playbook Pydantic models (issue #79)' (#99) from feature/issue-79-strategy-models into main
Some checks failed
CI / test (push) Has been cancelled
2026-02-08 02:19:48 +09:00
agentson
7fd48c7764 feat: add strategy/playbook Pydantic models (issue #79)
Some checks failed
CI / test (pull_request) Has been cancelled
Define data contracts for the proactive strategy system:
- StockCondition: AND-combined condition fields (RSI, volume, price)
- StockScenario: condition-action rules with stop loss/take profit
- StockPlaybook: per-stock scenario collection
- GlobalRule: portfolio-level rules (e.g. REDUCE_ALL on loss limit)
- DayPlaybook: complete daily playbook per market with validation
- CrossMarketContext: cross-market awareness (KR↔US)
- ScenarioAction, MarketOutlook, PlaybookStatus enums

33 tests covering validation, serialization, edge cases.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 02:06:16 +09:00
a105bb7c1a Merge pull request 'feat: add pre-market planner config and remove static watchlists (issue #78)' (#98) from feature/issue-78-config-watchlist-removal into main
Some checks failed
CI / test (push) Has been cancelled
2026-02-08 02:04:23 +09:00
agentson
1a34a74232 feat: add pre-market planner config and remove static watchlists (issue #78)
Some checks failed
CI / test (pull_request) Has been cancelled
- Add pre-market planner settings: PRE_MARKET_MINUTES, MAX_SCENARIOS_PER_STOCK,
  PLANNER_TIMEOUT_SECONDS, DEFENSIVE_PLAYBOOK_ON_FAILURE, RESCAN_INTERVAL_SECONDS
- Change ENABLED_MARKETS default from KR to KR,US
- Remove static WATCHLISTS and STOCK_UNIVERSE dictionaries from main.py
- Replace watchlist-based trading with dynamic scanner-only stock discovery
- SmartVolatilityScanner is now the sole source of trading candidates
- Add active_stocks dict for scanner-discovered stocks per market
- Add smart_scanner parameter to run_daily_session()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 01:58:09 +09:00
a82a167915 Merge pull request 'feat: implement Smart Volatility Scanner (issue #76)' (#77) from feature/issue-76-smart-volatility-scanner into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #77
2026-02-06 07:43:54 +09:00
39 changed files with 7009 additions and 235 deletions

45
docs/agent-constraints.md Normal file
View File

@@ -0,0 +1,45 @@
# Agent Constraints
This document records **persistent behavioral constraints** for agents working on this repository.
It is distinct from `docs/requirements-log.md`, which records **project/product requirements**.
## Scope
- Applies to all AI agents and automation that modify this repo.
- Supplements (does not replace) `docs/agents.md` and `docs/workflow.md`.
## Persistent Rules
1. **Workflow enforcement**
- Follow `docs/workflow.md` for all changes.
- Create a Gitea issue before any code or documentation change.
- Work on a feature branch `feature/issue-{N}-{short-description}` and open a PR.
- Never commit directly to `main`.
2. **Document-first routing**
- When performing work, consult relevant `docs/` files *before* making changes.
- Route decisions to the documented policy whenever applicable.
- If guidance conflicts, prefer the stricter/safety-first rule and note it in the PR.
3. **Docs with code**
- Any code change must be accompanied by relevant documentation updates.
- If no doc update is needed, state the reason explicitly in the PR.
4. **Session-persistent user constraints**
- If the user requests that a behavior should persist across sessions, record it here
(or in a dedicated policy doc) and reference it when working.
- Keep entries short and concrete, with dates.
## Change Control
- Changes to this file follow the same workflow as code changes.
- Keep the history chronological and minimize rewording of existing entries.
## History
### 2026-02-08
- Always enforce Gitea workflow: issue -> feature branch -> PR before changes.
- When work requires guidance, consult the relevant `docs/` policies first.
- Any code change must be accompanied by relevant documentation updates.
- Persist user constraints across sessions by recording them in this document.

View File

@@ -64,3 +64,25 @@
**참고:** Realtime 모드 전용. Daily 모드는 배치 효율성을 위해 정적 watchlist 사용. **참고:** Realtime 모드 전용. Daily 모드는 배치 효율성을 위해 정적 watchlist 사용.
**이슈/PR:** #76, #77 **이슈/PR:** #76, #77
---
## 2026-02-10
### 코드 리뷰 시 플랜-구현 일치 검증 규칙
**배경:**
- 코드 리뷰 시 플랜(EnterPlanMode에서 승인된 계획)과 실제 구현이 일치하는지 확인하는 절차가 없었음
- 플랜과 다른 구현이 리뷰 없이 통과될 위험
**요구사항:**
1. 모든 PR 리뷰에서 플랜-구현 일치 여부를 필수 체크
2. 플랜에 없는 변경은 정당한 사유 필요
3. 플랜 항목이 누락되면 PR 설명에 사유 기록
4. 스코프가 플랜과 일치하는지 확인
**구현 결과:**
- `docs/workflow.md`에 Code Review Checklist 섹션 추가
- Plan Consistency (필수), Safety & Constraints, Quality, Workflow 4개 카테고리
**이슈/PR:** #114

View File

@@ -6,6 +6,7 @@
1. **Create Gitea Issue First** — All features, bug fixes, and policy changes require a Gitea issue before any code is written 1. **Create Gitea Issue First** — All features, bug fixes, and policy changes require a Gitea issue before any code is written
2. **Create Feature Branch** — Branch from `main` using format `feature/issue-{N}-{short-description}` 2. **Create Feature Branch** — Branch from `main` using format `feature/issue-{N}-{short-description}`
- After creating the branch, run `git pull origin main` and rebase to ensure the branch is up to date
3. **Implement Changes** — Write code, tests, and documentation on the feature branch 3. **Implement Changes** — Write code, tests, and documentation on the feature branch
4. **Create Pull Request** — Submit PR to `main` branch referencing the issue number 4. **Create Pull Request** — Submit PR to `main` branch referencing the issue number
5. **Review & Merge** — After approval, merge via PR (squash or merge commit) 5. **Review & Merge** — After approval, merge via PR (squash or merge commit)
@@ -73,3 +74,37 @@ task_tool(
``` ```
Use `run_in_background=True` for independent tasks that don't block subsequent work. Use `run_in_background=True` for independent tasks that don't block subsequent work.
## Code Review Checklist
**CRITICAL: Every PR review MUST verify plan-implementation consistency.**
Before approving any PR, the reviewer (human or agent) must check ALL of the following:
### 1. Plan Consistency (MANDATORY)
- [ ] **Implementation matches the approved plan** — Compare the actual code changes against the plan created during `EnterPlanMode`. Every item in the plan must be addressed.
- [ ] **No unplanned changes** — If the implementation includes changes not in the plan, they must be explicitly justified.
- [ ] **No plan items omitted** — If any planned item was skipped, the reason must be documented in the PR description.
- [ ] **Scope matches** — The PR does not exceed or fall short of the planned scope.
### 2. Safety & Constraints
- [ ] `src/core/risk_manager.py` is unchanged (READ-ONLY)
- [ ] Circuit breaker threshold not weakened (only stricter allowed)
- [ ] Fat-finger protection (30% max order) still enforced
- [ ] Confidence < 80 still forces HOLD
- [ ] No hardcoded API keys or secrets
### 3. Quality
- [ ] All new/modified code has corresponding tests
- [ ] Test coverage >= 80%
- [ ] `ruff check src/ tests/` passes (no lint errors)
- [ ] No `assert` statements removed from tests
### 4. Workflow
- [ ] PR references the Gitea issue number
- [ ] Feature branch follows naming convention (`feature/issue-N-description`)
- [ ] Commit messages are clear and descriptive

View File

@@ -9,6 +9,8 @@ dependencies = [
"pydantic-settings>=2.1,<3", "pydantic-settings>=2.1,<3",
"google-genai>=1.0,<2", "google-genai>=1.0,<2",
"scipy>=1.11,<2", "scipy>=1.11,<2",
"fastapi>=0.110,<1",
"uvicorn>=0.29,<1",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View File

@@ -108,7 +108,7 @@ class MarketScanner:
self.context_store.set_context( self.context_store.set_context(
ContextLayer.L7_REALTIME, ContextLayer.L7_REALTIME,
timeframe, timeframe,
f"{market.code}_{stock_code}_volatility", f"volatility_{market.code}_{stock_code}",
{ {
"price": metrics.current_price, "price": metrics.current_price,
"atr": metrics.atr, "atr": metrics.atr,
@@ -179,7 +179,7 @@ class MarketScanner:
self.context_store.set_context( self.context_store.set_context(
ContextLayer.L7_REALTIME, ContextLayer.L7_REALTIME,
timeframe, timeframe,
f"{market.code}_scan_result", f"scan_result_{market.code}",
{ {
"total_scanned": len(valid_metrics), "total_scanned": len(valid_metrics),
"top_movers": [m.stock_code for m in top_movers], "top_movers": [m.stock_code for m in top_movers],

View File

@@ -55,8 +55,15 @@ class Settings(BaseSettings):
DAILY_SESSIONS: int = Field(default=4, ge=1, le=10) DAILY_SESSIONS: int = Field(default=4, ge=1, le=10)
SESSION_INTERVAL_HOURS: int = Field(default=6, ge=1, le=24) SESSION_INTERVAL_HOURS: int = Field(default=6, ge=1, le=24)
# Pre-Market Planner
PRE_MARKET_MINUTES: int = Field(default=30, ge=10, le=120)
MAX_SCENARIOS_PER_STOCK: int = Field(default=5, ge=1, le=10)
PLANNER_TIMEOUT_SECONDS: int = Field(default=60, ge=10, le=300)
DEFENSIVE_PLAYBOOK_ON_FAILURE: bool = True
RESCAN_INTERVAL_SECONDS: int = Field(default=300, ge=60, le=900)
# Market selection (comma-separated market codes) # Market selection (comma-separated market codes)
ENABLED_MARKETS: str = "KR" ENABLED_MARKETS: str = "KR,US"
# Backup and Disaster Recovery (optional) # Backup and Disaster Recovery (optional)
BACKUP_ENABLED: bool = True BACKUP_ENABLED: bool = True
@@ -76,6 +83,11 @@ class Settings(BaseSettings):
TELEGRAM_COMMANDS_ENABLED: bool = True TELEGRAM_COMMANDS_ENABLED: bool = True
TELEGRAM_POLLING_INTERVAL: float = 1.0 # seconds TELEGRAM_POLLING_INTERVAL: float = 1.0 # seconds
# Dashboard (optional)
DASHBOARD_ENABLED: bool = False
DASHBOARD_HOST: str = "127.0.0.1"
DASHBOARD_PORT: int = Field(default=8080, ge=1, le=65535)
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"} model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
@property @property
@@ -89,4 +101,7 @@ class Settings(BaseSettings):
@property @property
def enabled_market_list(self) -> list[str]: def enabled_market_list(self) -> list[str]:
"""Parse ENABLED_MARKETS into list of market codes.""" """Parse ENABLED_MARKETS into list of market codes."""
return [m.strip() for m in self.ENABLED_MARKETS.split(",") if m.strip()] from src.markets.schedule import expand_market_codes
raw = [m.strip() for m in self.ENABLED_MARKETS.split(",") if m.strip()]
return expand_market_codes(raw)

View File

@@ -5,6 +5,7 @@ The context tree implements Pillar 2: hierarchical memory management across
""" """
from src.context.layer import ContextLayer from src.context.layer import ContextLayer
from src.context.scheduler import ContextScheduler
from src.context.store import ContextStore from src.context.store import ContextStore
__all__ = ["ContextLayer", "ContextStore"] __all__ = ["ContextLayer", "ContextScheduler", "ContextStore"]

View File

@@ -18,52 +18,83 @@ class ContextAggregator:
self.conn = conn self.conn = conn
self.store = ContextStore(conn) self.store = ContextStore(conn)
def aggregate_daily_from_trades(self, date: str | None = None) -> None: def aggregate_daily_from_trades(
self, date: str | None = None, market: str | None = None
) -> None:
"""Aggregate L6 (daily) context from trades table. """Aggregate L6 (daily) context from trades table.
Args: Args:
date: Date in YYYY-MM-DD format. If None, uses today. date: Date in YYYY-MM-DD format. If None, uses today.
market: Market code filter (e.g., "KR", "US"). If None, aggregates all markets.
""" """
if date is None: if date is None:
date = datetime.now(UTC).date().isoformat() date = datetime.now(UTC).date().isoformat()
# Calculate daily metrics from trades if market is None:
cursor = self.conn.execute( cursor = self.conn.execute(
""" """
SELECT SELECT DISTINCT market
COUNT(*) as trade_count, FROM trades
SUM(CASE WHEN action = 'BUY' THEN 1 ELSE 0 END) as buys, WHERE DATE(timestamp) = ?
SUM(CASE WHEN action = 'SELL' THEN 1 ELSE 0 END) as sells, """,
SUM(CASE WHEN action = 'HOLD' THEN 1 ELSE 0 END) as holds, (date,),
AVG(confidence) as avg_confidence,
SUM(pnl) as total_pnl,
COUNT(DISTINCT stock_code) as unique_stocks,
SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) as wins,
SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END) as losses
FROM trades
WHERE DATE(timestamp) = ?
""",
(date,),
)
row = cursor.fetchone()
if row and row[0] > 0: # At least one trade
trade_count, buys, sells, holds, avg_conf, total_pnl, stocks, wins, losses = row
# Store daily metrics in L6
self.store.set_context(ContextLayer.L6_DAILY, date, "trade_count", trade_count)
self.store.set_context(ContextLayer.L6_DAILY, date, "buys", buys)
self.store.set_context(ContextLayer.L6_DAILY, date, "sells", sells)
self.store.set_context(ContextLayer.L6_DAILY, date, "holds", holds)
self.store.set_context(
ContextLayer.L6_DAILY, date, "avg_confidence", round(avg_conf, 2)
) )
self.store.set_context( markets = [row[0] for row in cursor.fetchall() if row[0]]
ContextLayer.L6_DAILY, date, "total_pnl", round(total_pnl, 2) else:
markets = [market]
for market_code in markets:
# Calculate daily metrics from trades for the market
cursor = self.conn.execute(
"""
SELECT
COUNT(*) as trade_count,
SUM(CASE WHEN action = 'BUY' THEN 1 ELSE 0 END) as buys,
SUM(CASE WHEN action = 'SELL' THEN 1 ELSE 0 END) as sells,
SUM(CASE WHEN action = 'HOLD' THEN 1 ELSE 0 END) as holds,
AVG(confidence) as avg_confidence,
SUM(pnl) as total_pnl,
COUNT(DISTINCT stock_code) as unique_stocks,
SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) as wins,
SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END) as losses
FROM trades
WHERE DATE(timestamp) = ? AND market = ?
""",
(date, market_code),
) )
self.store.set_context(ContextLayer.L6_DAILY, date, "unique_stocks", stocks) row = cursor.fetchone()
win_rate = round(wins / max(wins + losses, 1) * 100, 2)
self.store.set_context(ContextLayer.L6_DAILY, date, "win_rate", win_rate) if row and row[0] > 0: # At least one trade
trade_count, buys, sells, holds, avg_conf, total_pnl, stocks, wins, losses = row
key_suffix = f"_{market_code}"
# Store daily metrics in L6 with market suffix
self.store.set_context(
ContextLayer.L6_DAILY, date, f"trade_count{key_suffix}", trade_count
)
self.store.set_context(ContextLayer.L6_DAILY, date, f"buys{key_suffix}", buys)
self.store.set_context(ContextLayer.L6_DAILY, date, f"sells{key_suffix}", sells)
self.store.set_context(ContextLayer.L6_DAILY, date, f"holds{key_suffix}", holds)
self.store.set_context(
ContextLayer.L6_DAILY,
date,
f"avg_confidence{key_suffix}",
round(avg_conf, 2),
)
self.store.set_context(
ContextLayer.L6_DAILY,
date,
f"total_pnl{key_suffix}",
round(total_pnl, 2),
)
self.store.set_context(
ContextLayer.L6_DAILY, date, f"unique_stocks{key_suffix}", stocks
)
win_rate = round(wins / max(wins + losses, 1) * 100, 2)
self.store.set_context(
ContextLayer.L6_DAILY, date, f"win_rate{key_suffix}", win_rate
)
def aggregate_weekly_from_daily(self, week: str | None = None) -> None: def aggregate_weekly_from_daily(self, week: str | None = None) -> None:
"""Aggregate L5 (weekly) context from L6 (daily). """Aggregate L5 (weekly) context from L6 (daily).
@@ -92,14 +123,25 @@ class ContextAggregator:
daily_data[row[0]].append(json.loads(row[1])) daily_data[row[0]].append(json.loads(row[1]))
if daily_data: if daily_data:
# Sum all PnL values # Sum all PnL values (market-specific if suffixed)
if "total_pnl" in daily_data: if "total_pnl" in daily_data:
total_pnl = sum(daily_data["total_pnl"]) total_pnl = sum(daily_data["total_pnl"])
self.store.set_context( self.store.set_context(
ContextLayer.L5_WEEKLY, week, "weekly_pnl", round(total_pnl, 2) ContextLayer.L5_WEEKLY, week, "weekly_pnl", round(total_pnl, 2)
) )
# Average all confidence values for key, values in daily_data.items():
if key.startswith("total_pnl_"):
market_code = key.split("total_pnl_", 1)[1]
total_pnl = sum(values)
self.store.set_context(
ContextLayer.L5_WEEKLY,
week,
f"weekly_pnl_{market_code}",
round(total_pnl, 2),
)
# Average all confidence values (market-specific if suffixed)
if "avg_confidence" in daily_data: if "avg_confidence" in daily_data:
conf_values = daily_data["avg_confidence"] conf_values = daily_data["avg_confidence"]
avg_conf = sum(conf_values) / len(conf_values) avg_conf = sum(conf_values) / len(conf_values)
@@ -107,6 +149,17 @@ class ContextAggregator:
ContextLayer.L5_WEEKLY, week, "avg_confidence", round(avg_conf, 2) ContextLayer.L5_WEEKLY, week, "avg_confidence", round(avg_conf, 2)
) )
for key, values in daily_data.items():
if key.startswith("avg_confidence_"):
market_code = key.split("avg_confidence_", 1)[1]
avg_conf = sum(values) / len(values)
self.store.set_context(
ContextLayer.L5_WEEKLY,
week,
f"avg_confidence_{market_code}",
round(avg_conf, 2),
)
def aggregate_monthly_from_weekly(self, month: str | None = None) -> None: def aggregate_monthly_from_weekly(self, month: str | None = None) -> None:
"""Aggregate L4 (monthly) context from L5 (weekly). """Aggregate L4 (monthly) context from L5 (weekly).
@@ -135,8 +188,16 @@ class ContextAggregator:
if weekly_data: if weekly_data:
# Sum all weekly PnL values # Sum all weekly PnL values
total_pnl_values: list[float] = []
if "weekly_pnl" in weekly_data: if "weekly_pnl" in weekly_data:
total_pnl = sum(weekly_data["weekly_pnl"]) total_pnl_values.extend(weekly_data["weekly_pnl"])
for key, values in weekly_data.items():
if key.startswith("weekly_pnl_"):
total_pnl_values.extend(values)
if total_pnl_values:
total_pnl = sum(total_pnl_values)
self.store.set_context( self.store.set_context(
ContextLayer.L4_MONTHLY, month, "monthly_pnl", round(total_pnl, 2) ContextLayer.L4_MONTHLY, month, "monthly_pnl", round(total_pnl, 2)
) )
@@ -230,21 +291,44 @@ class ContextAggregator:
) )
def run_all_aggregations(self) -> None: def run_all_aggregations(self) -> None:
"""Run all aggregations from L7 to L1 (bottom-up).""" """Run all aggregations from L7 to L1 (bottom-up).
All timeframes are derived from the latest trade timestamp so that
past data re-aggregation produces consistent results across layers.
"""
cursor = self.conn.execute("SELECT MAX(timestamp) FROM trades")
row = cursor.fetchone()
if not row or row[0] is None:
return
ts_raw = row[0]
if ts_raw.endswith("Z"):
ts_raw = ts_raw.replace("Z", "+00:00")
latest_ts = datetime.fromisoformat(ts_raw)
trade_date = latest_ts.date()
date_str = trade_date.isoformat()
iso_year, iso_week, _ = trade_date.isocalendar()
week_str = f"{iso_year}-W{iso_week:02d}"
month_str = f"{trade_date.year}-{trade_date.month:02d}"
quarter = (trade_date.month - 1) // 3 + 1
quarter_str = f"{trade_date.year}-Q{quarter}"
year_str = str(trade_date.year)
# L7 (trades) → L6 (daily) # L7 (trades) → L6 (daily)
self.aggregate_daily_from_trades() self.aggregate_daily_from_trades(date_str)
# L6 (daily) → L5 (weekly) # L6 (daily) → L5 (weekly)
self.aggregate_weekly_from_daily() self.aggregate_weekly_from_daily(week_str)
# L5 (weekly) → L4 (monthly) # L5 (weekly) → L4 (monthly)
self.aggregate_monthly_from_weekly() self.aggregate_monthly_from_weekly(month_str)
# L4 (monthly) → L3 (quarterly) # L4 (monthly) → L3 (quarterly)
self.aggregate_quarterly_from_monthly() self.aggregate_quarterly_from_monthly(quarter_str)
# L3 (quarterly) → L2 (annual) # L3 (quarterly) → L2 (annual)
self.aggregate_annual_from_quarterly() self.aggregate_annual_from_quarterly(year_str)
# L2 (annual) → L1 (legacy) # L2 (annual) → L1 (legacy)
self.aggregate_legacy_from_annual() self.aggregate_legacy_from_annual()

135
src/context/scheduler.py Normal file
View File

@@ -0,0 +1,135 @@
"""Context aggregation scheduler for periodic rollups and cleanup."""
from __future__ import annotations
import sqlite3
from calendar import monthrange
from dataclasses import dataclass
from datetime import UTC, datetime
from src.context.aggregator import ContextAggregator
from src.context.store import ContextStore
@dataclass(frozen=True)
class ScheduleResult:
"""Represents which scheduled tasks ran."""
weekly: bool = False
monthly: bool = False
quarterly: bool = False
annual: bool = False
legacy: bool = False
cleanup: bool = False
class ContextScheduler:
"""Run periodic context aggregations and cleanup when due."""
def __init__(
self,
conn: sqlite3.Connection | None = None,
aggregator: ContextAggregator | None = None,
store: ContextStore | None = None,
) -> None:
if aggregator is None:
if conn is None:
raise ValueError("conn is required when aggregator is not provided")
aggregator = ContextAggregator(conn)
self.aggregator = aggregator
if store is None:
store = getattr(aggregator, "store", None)
if store is None:
if conn is None:
raise ValueError("conn is required when store is not provided")
store = ContextStore(conn)
self.store = store
self._last_run: dict[str, str] = {}
def run_if_due(self, now: datetime | None = None) -> ScheduleResult:
"""Run scheduled aggregations if their schedule is due.
Args:
now: Current datetime (UTC). If None, uses current time.
Returns:
ScheduleResult indicating which tasks ran.
"""
if now is None:
now = datetime.now(UTC)
today = now.date().isoformat()
result = ScheduleResult()
if self._should_run("cleanup", today):
self.store.cleanup_expired_contexts()
result = self._with(result, cleanup=True)
if self._is_sunday(now) and self._should_run("weekly", today):
week = now.strftime("%Y-W%V")
self.aggregator.aggregate_weekly_from_daily(week)
result = self._with(result, weekly=True)
if self._is_last_day_of_month(now) and self._should_run("monthly", today):
month = now.strftime("%Y-%m")
self.aggregator.aggregate_monthly_from_weekly(month)
result = self._with(result, monthly=True)
if self._is_last_day_of_quarter(now) and self._should_run("quarterly", today):
quarter = self._current_quarter(now)
self.aggregator.aggregate_quarterly_from_monthly(quarter)
result = self._with(result, quarterly=True)
if self._is_last_day_of_year(now) and self._should_run("annual", today):
year = str(now.year)
self.aggregator.aggregate_annual_from_quarterly(year)
result = self._with(result, annual=True)
# Legacy rollup runs after annual aggregation.
self.aggregator.aggregate_legacy_from_annual()
result = self._with(result, legacy=True)
return result
def _should_run(self, key: str, date_str: str) -> bool:
if self._last_run.get(key) == date_str:
return False
self._last_run[key] = date_str
return True
@staticmethod
def _is_sunday(now: datetime) -> bool:
return now.weekday() == 6
@staticmethod
def _is_last_day_of_month(now: datetime) -> bool:
last_day = monthrange(now.year, now.month)[1]
return now.day == last_day
@classmethod
def _is_last_day_of_quarter(cls, now: datetime) -> bool:
if now.month not in (3, 6, 9, 12):
return False
return cls._is_last_day_of_month(now)
@staticmethod
def _is_last_day_of_year(now: datetime) -> bool:
return now.month == 12 and now.day == 31
@staticmethod
def _current_quarter(now: datetime) -> str:
quarter = (now.month - 1) // 3 + 1
return f"{now.year}-Q{quarter}"
@staticmethod
def _with(result: ScheduleResult, **kwargs: bool) -> ScheduleResult:
return ScheduleResult(
weekly=kwargs.get("weekly", result.weekly),
monthly=kwargs.get("monthly", result.monthly),
quarterly=kwargs.get("quarterly", result.quarterly),
annual=kwargs.get("annual", result.annual),
legacy=kwargs.get("legacy", result.legacy),
cleanup=kwargs.get("cleanup", result.cleanup),
)

View File

@@ -0,0 +1,5 @@
"""FastAPI dashboard package for observability APIs."""
from src.dashboard.app import create_dashboard_app
__all__ = ["create_dashboard_app"]

361
src/dashboard/app.py Normal file
View File

@@ -0,0 +1,361 @@
"""FastAPI application for observability dashboard endpoints."""
from __future__ import annotations
import json
import sqlite3
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import FileResponse
def create_dashboard_app(db_path: str) -> FastAPI:
"""Create dashboard FastAPI app bound to a SQLite database path."""
app = FastAPI(title="The Ouroboros Dashboard", version="1.0.0")
app.state.db_path = db_path
@app.get("/")
def index() -> FileResponse:
index_path = Path(__file__).parent / "static" / "index.html"
return FileResponse(index_path)
@app.get("/api/status")
def get_status() -> dict[str, Any]:
today = datetime.now(UTC).date().isoformat()
with _connect(db_path) as conn:
market_rows = conn.execute(
"""
SELECT DISTINCT market FROM (
SELECT market FROM trades WHERE DATE(timestamp) = ?
UNION
SELECT market FROM decision_logs WHERE DATE(timestamp) = ?
UNION
SELECT market FROM playbooks WHERE date = ?
) ORDER BY market
""",
(today, today, today),
).fetchall()
markets = [row[0] for row in market_rows] if market_rows else []
market_status: dict[str, Any] = {}
total_trades = 0
total_pnl = 0.0
total_decisions = 0
for market in markets:
trade_row = conn.execute(
"""
SELECT COUNT(*) AS c, COALESCE(SUM(pnl), 0.0) AS p
FROM trades
WHERE DATE(timestamp) = ? AND market = ?
""",
(today, market),
).fetchone()
decision_row = conn.execute(
"""
SELECT COUNT(*) AS c
FROM decision_logs
WHERE DATE(timestamp) = ? AND market = ?
""",
(today, market),
).fetchone()
playbook_row = conn.execute(
"""
SELECT status
FROM playbooks
WHERE date = ? AND market = ?
LIMIT 1
""",
(today, market),
).fetchone()
market_status[market] = {
"trade_count": int(trade_row["c"] if trade_row else 0),
"total_pnl": float(trade_row["p"] if trade_row else 0.0),
"decision_count": int(decision_row["c"] if decision_row else 0),
"playbook_status": playbook_row["status"] if playbook_row else None,
}
total_trades += market_status[market]["trade_count"]
total_pnl += market_status[market]["total_pnl"]
total_decisions += market_status[market]["decision_count"]
return {
"date": today,
"markets": market_status,
"totals": {
"trade_count": total_trades,
"total_pnl": round(total_pnl, 2),
"decision_count": total_decisions,
},
}
@app.get("/api/playbook/{date_str}")
def get_playbook(date_str: str, market: str = Query("KR")) -> dict[str, Any]:
with _connect(db_path) as conn:
row = conn.execute(
"""
SELECT date, market, status, playbook_json, generated_at,
token_count, scenario_count, match_count
FROM playbooks
WHERE date = ? AND market = ?
""",
(date_str, market),
).fetchone()
if row is None:
raise HTTPException(status_code=404, detail="playbook not found")
return {
"date": row["date"],
"market": row["market"],
"status": row["status"],
"playbook": json.loads(row["playbook_json"]),
"generated_at": row["generated_at"],
"token_count": row["token_count"],
"scenario_count": row["scenario_count"],
"match_count": row["match_count"],
}
@app.get("/api/scorecard/{date_str}")
def get_scorecard(date_str: str, market: str = Query("KR")) -> dict[str, Any]:
key = f"scorecard_{market}"
with _connect(db_path) as conn:
row = conn.execute(
"""
SELECT value
FROM contexts
WHERE layer = 'L6_DAILY' AND timeframe = ? AND key = ?
""",
(date_str, key),
).fetchone()
if row is None:
raise HTTPException(status_code=404, detail="scorecard not found")
return {"date": date_str, "market": market, "scorecard": json.loads(row["value"])}
@app.get("/api/performance")
def get_performance(market: str = Query("all")) -> dict[str, Any]:
with _connect(db_path) as conn:
if market == "all":
by_market_rows = conn.execute(
"""
SELECT market,
COUNT(*) AS total_trades,
SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) AS wins,
SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END) AS losses,
COALESCE(SUM(pnl), 0.0) AS total_pnl,
COALESCE(AVG(confidence), 0.0) AS avg_confidence
FROM trades
GROUP BY market
ORDER BY market
"""
).fetchall()
combined = _performance_from_rows(by_market_rows)
return {
"market": "all",
"combined": combined,
"by_market": [
_row_to_performance(row)
for row in by_market_rows
],
}
row = conn.execute(
"""
SELECT market,
COUNT(*) AS total_trades,
SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) AS wins,
SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END) AS losses,
COALESCE(SUM(pnl), 0.0) AS total_pnl,
COALESCE(AVG(confidence), 0.0) AS avg_confidence
FROM trades
WHERE market = ?
GROUP BY market
""",
(market,),
).fetchone()
if row is None:
return {"market": market, "metrics": _empty_performance(market)}
return {"market": market, "metrics": _row_to_performance(row)}
@app.get("/api/context/{layer}")
def get_context_layer(
layer: str,
timeframe: str | None = Query(default=None),
limit: int = Query(default=100, ge=1, le=1000),
) -> dict[str, Any]:
with _connect(db_path) as conn:
if timeframe is None:
rows = conn.execute(
"""
SELECT timeframe, key, value, updated_at
FROM contexts
WHERE layer = ?
ORDER BY updated_at DESC
LIMIT ?
""",
(layer, limit),
).fetchall()
else:
rows = conn.execute(
"""
SELECT timeframe, key, value, updated_at
FROM contexts
WHERE layer = ? AND timeframe = ?
ORDER BY key
LIMIT ?
""",
(layer, timeframe, limit),
).fetchall()
entries = [
{
"timeframe": row["timeframe"],
"key": row["key"],
"value": json.loads(row["value"]),
"updated_at": row["updated_at"],
}
for row in rows
]
return {
"layer": layer,
"timeframe": timeframe,
"count": len(entries),
"entries": entries,
}
@app.get("/api/decisions")
def get_decisions(
market: str = Query("KR"),
limit: int = Query(default=50, ge=1, le=500),
) -> dict[str, Any]:
with _connect(db_path) as conn:
rows = conn.execute(
"""
SELECT decision_id, timestamp, stock_code, market, exchange_code,
action, confidence, rationale, context_snapshot, input_data,
outcome_pnl, outcome_accuracy
FROM decision_logs
WHERE market = ?
ORDER BY timestamp DESC
LIMIT ?
""",
(market, limit),
).fetchall()
decisions = []
for row in rows:
decisions.append(
{
"decision_id": row["decision_id"],
"timestamp": row["timestamp"],
"stock_code": row["stock_code"],
"market": row["market"],
"exchange_code": row["exchange_code"],
"action": row["action"],
"confidence": row["confidence"],
"rationale": row["rationale"],
"context_snapshot": json.loads(row["context_snapshot"]),
"input_data": json.loads(row["input_data"]),
"outcome_pnl": row["outcome_pnl"],
"outcome_accuracy": row["outcome_accuracy"],
}
)
return {"market": market, "count": len(decisions), "decisions": decisions}
@app.get("/api/scenarios/active")
def get_active_scenarios(
market: str = Query("US"),
date_str: str | None = Query(default=None),
limit: int = Query(default=50, ge=1, le=500),
) -> dict[str, Any]:
if date_str is None:
date_str = datetime.now(UTC).date().isoformat()
with _connect(db_path) as conn:
rows = conn.execute(
"""
SELECT timestamp, stock_code, action, confidence, rationale, context_snapshot
FROM decision_logs
WHERE market = ? AND DATE(timestamp) = ?
ORDER BY timestamp DESC
LIMIT ?
""",
(market, date_str, limit),
).fetchall()
matches: list[dict[str, Any]] = []
for row in rows:
snapshot = json.loads(row["context_snapshot"])
scenario_match = snapshot.get("scenario_match", {})
if not isinstance(scenario_match, dict) or not scenario_match:
continue
matches.append(
{
"timestamp": row["timestamp"],
"stock_code": row["stock_code"],
"action": row["action"],
"confidence": row["confidence"],
"rationale": row["rationale"],
"scenario_match": scenario_match,
}
)
return {"market": market, "date": date_str, "count": len(matches), "matches": matches}
return app
def _connect(db_path: str) -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
return conn
def _row_to_performance(row: sqlite3.Row) -> dict[str, Any]:
wins = int(row["wins"] or 0)
losses = int(row["losses"] or 0)
total = int(row["total_trades"] or 0)
win_rate = round((wins / (wins + losses) * 100), 2) if (wins + losses) > 0 else 0.0
return {
"market": row["market"],
"total_trades": total,
"wins": wins,
"losses": losses,
"win_rate": win_rate,
"total_pnl": round(float(row["total_pnl"] or 0.0), 2),
"avg_confidence": round(float(row["avg_confidence"] or 0.0), 2),
}
def _performance_from_rows(rows: list[sqlite3.Row]) -> dict[str, Any]:
total_trades = 0
wins = 0
losses = 0
total_pnl = 0.0
confidence_weighted = 0.0
for row in rows:
market_total = int(row["total_trades"] or 0)
market_conf = float(row["avg_confidence"] or 0.0)
total_trades += market_total
wins += int(row["wins"] or 0)
losses += int(row["losses"] or 0)
total_pnl += float(row["total_pnl"] or 0.0)
confidence_weighted += market_total * market_conf
win_rate = round((wins / (wins + losses) * 100), 2) if (wins + losses) > 0 else 0.0
avg_confidence = round(confidence_weighted / total_trades, 2) if total_trades > 0 else 0.0
return {
"market": "all",
"total_trades": total_trades,
"wins": wins,
"losses": losses,
"win_rate": win_rate,
"total_pnl": round(total_pnl, 2),
"avg_confidence": avg_confidence,
}
def _empty_performance(market: str) -> dict[str, Any]:
return {
"market": market,
"total_trades": 0,
"wins": 0,
"losses": 0,
"win_rate": 0.0,
"total_pnl": 0.0,
"avg_confidence": 0.0,
}

View File

@@ -0,0 +1,61 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>The Ouroboros Dashboard</title>
<style>
:root {
--bg: #0b1724;
--panel: #12263a;
--fg: #e6eef7;
--muted: #9fb3c8;
--accent: #3cb371;
}
body {
margin: 0;
font-family: ui-monospace, SFMono-Regular, Menlo, monospace;
background: radial-gradient(circle at top left, #173b58, var(--bg));
color: var(--fg);
}
.wrap {
max-width: 900px;
margin: 48px auto;
padding: 0 16px;
}
.card {
background: color-mix(in oklab, var(--panel), black 12%);
border: 1px solid #28455f;
border-radius: 12px;
padding: 20px;
}
h1 {
margin-top: 0;
}
code {
color: var(--accent);
}
li {
margin: 6px 0;
color: var(--muted);
}
</style>
</head>
<body>
<div class="wrap">
<div class="card">
<h1>The Ouroboros Dashboard API</h1>
<p>Use the following endpoints:</p>
<ul>
<li><code>/api/status</code></li>
<li><code>/api/playbook/{date}?market=KR</code></li>
<li><code>/api/scorecard/{date}?market=KR</code></li>
<li><code>/api/performance?market=all</code></li>
<li><code>/api/context/{layer}</code></li>
<li><code>/api/decisions?market=KR</code></li>
<li><code>/api/scenarios/active?market=US</code></li>
</ul>
</div>
</div>
</body>
</html>

View File

@@ -6,6 +6,7 @@ import json
import sqlite3 import sqlite3
from datetime import UTC, datetime from datetime import UTC, datetime
from pathlib import Path from pathlib import Path
from typing import Any
def init_db(db_path: str) -> sqlite3.Connection: def init_db(db_path: str) -> sqlite3.Connection:
@@ -26,7 +27,8 @@ def init_db(db_path: str) -> sqlite3.Connection:
price REAL, price REAL,
pnl REAL DEFAULT 0.0, pnl REAL DEFAULT 0.0,
market TEXT DEFAULT 'KR', market TEXT DEFAULT 'KR',
exchange_code TEXT DEFAULT 'KRX' exchange_code TEXT DEFAULT 'KRX',
decision_id TEXT
) )
""" """
) )
@@ -41,6 +43,8 @@ def init_db(db_path: str) -> sqlite3.Connection:
conn.execute("ALTER TABLE trades ADD COLUMN exchange_code TEXT DEFAULT 'KRX'") conn.execute("ALTER TABLE trades ADD COLUMN exchange_code TEXT DEFAULT 'KRX'")
if "selection_context" not in columns: if "selection_context" not in columns:
conn.execute("ALTER TABLE trades ADD COLUMN selection_context TEXT") conn.execute("ALTER TABLE trades ADD COLUMN selection_context TEXT")
if "decision_id" not in columns:
conn.execute("ALTER TABLE trades ADD COLUMN decision_id TEXT")
# Context tree tables for multi-layered memory management # Context tree tables for multi-layered memory management
conn.execute( conn.execute(
@@ -91,6 +95,27 @@ def init_db(db_path: str) -> sqlite3.Connection:
""" """
) )
# Playbook storage for pre-market strategy persistence
conn.execute(
"""
CREATE TABLE IF NOT EXISTS playbooks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
market TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
playbook_json TEXT NOT NULL,
generated_at TEXT NOT NULL,
token_count INTEGER DEFAULT 0,
scenario_count INTEGER DEFAULT 0,
match_count INTEGER DEFAULT 0,
UNIQUE(date, market)
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_playbooks_date ON playbooks(date)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_playbooks_market ON playbooks(market)")
# Create indices for efficient context queries # Create indices for efficient context queries
conn.execute("CREATE INDEX IF NOT EXISTS idx_contexts_layer ON contexts(layer)") conn.execute("CREATE INDEX IF NOT EXISTS idx_contexts_layer ON contexts(layer)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_contexts_timeframe ON contexts(timeframe)") conn.execute("CREATE INDEX IF NOT EXISTS idx_contexts_timeframe ON contexts(timeframe)")
@@ -122,6 +147,7 @@ def log_trade(
market: str = "KR", market: str = "KR",
exchange_code: str = "KRX", exchange_code: str = "KRX",
selection_context: dict[str, any] | None = None, selection_context: dict[str, any] | None = None,
decision_id: str | None = None,
) -> None: ) -> None:
"""Insert a trade record into the database. """Insert a trade record into the database.
@@ -145,9 +171,9 @@ def log_trade(
""" """
INSERT INTO trades ( INSERT INTO trades (
timestamp, stock_code, action, confidence, rationale, timestamp, stock_code, action, confidence, rationale,
quantity, price, pnl, market, exchange_code, selection_context quantity, price, pnl, market, exchange_code, selection_context, decision_id
) )
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", """,
( (
datetime.now(UTC).isoformat(), datetime.now(UTC).isoformat(),
@@ -161,6 +187,51 @@ def log_trade(
market, market,
exchange_code, exchange_code,
context_json, context_json,
decision_id,
), ),
) )
conn.commit() conn.commit()
def get_latest_buy_trade(
conn: sqlite3.Connection, stock_code: str, market: str
) -> dict[str, Any] | None:
"""Fetch the most recent BUY trade for a stock and market."""
cursor = conn.execute(
"""
SELECT decision_id, price, quantity
FROM trades
WHERE stock_code = ?
AND market = ?
AND action = 'BUY'
AND decision_id IS NOT NULL
ORDER BY timestamp DESC
LIMIT 1
""",
(stock_code, market),
)
row = cursor.fetchone()
if not row:
return None
return {"decision_id": row[0], "price": row[1], "quantity": row[2]}
def get_open_position(
conn: sqlite3.Connection, stock_code: str, market: str
) -> dict[str, Any] | None:
"""Return open position if latest trade is BUY, else None."""
cursor = conn.execute(
"""
SELECT action, decision_id, price, quantity
FROM trades
WHERE stock_code = ?
AND market = ?
ORDER BY timestamp DESC
LIMIT 1
""",
(stock_code, market),
)
row = cursor.fetchone()
if not row or row[0] != "BUY":
return None
return {"decision_id": row[1], "price": row[2], "quantity": row[3]}

View File

@@ -1,12 +1,14 @@
"""Evolution engine for self-improving trading strategies.""" """Evolution engine for self-improving trading strategies."""
from src.evolution.ab_test import ABTester, ABTestResult, StrategyPerformance from src.evolution.ab_test import ABTester, ABTestResult, StrategyPerformance
from src.evolution.daily_review import DailyReviewer
from src.evolution.optimizer import EvolutionOptimizer from src.evolution.optimizer import EvolutionOptimizer
from src.evolution.performance_tracker import ( from src.evolution.performance_tracker import (
PerformanceDashboard, PerformanceDashboard,
PerformanceTracker, PerformanceTracker,
StrategyMetrics, StrategyMetrics,
) )
from src.evolution.scorecard import DailyScorecard
__all__ = [ __all__ = [
"EvolutionOptimizer", "EvolutionOptimizer",
@@ -16,4 +18,6 @@ __all__ = [
"PerformanceTracker", "PerformanceTracker",
"PerformanceDashboard", "PerformanceDashboard",
"StrategyMetrics", "StrategyMetrics",
"DailyScorecard",
"DailyReviewer",
] ]

View File

@@ -0,0 +1,196 @@
"""Daily review generator for market-scoped end-of-day scorecards."""
from __future__ import annotations
import json
import logging
import re
import sqlite3
from dataclasses import asdict
from src.brain.gemini_client import GeminiClient
from src.context.layer import ContextLayer
from src.context.store import ContextStore
from src.evolution.scorecard import DailyScorecard
logger = logging.getLogger(__name__)
class DailyReviewer:
"""Builds daily scorecards and optional AI-generated lessons."""
def __init__(
self,
conn: sqlite3.Connection,
context_store: ContextStore,
gemini_client: GeminiClient | None = None,
) -> None:
self._conn = conn
self._context_store = context_store
self._gemini = gemini_client
def generate_scorecard(self, date: str, market: str) -> DailyScorecard:
"""Generate a market-scoped scorecard from decision logs and trades."""
decision_rows = self._conn.execute(
"""
SELECT action, confidence, context_snapshot
FROM decision_logs
WHERE DATE(timestamp) = ? AND market = ?
""",
(date, market),
).fetchall()
total_decisions = len(decision_rows)
buys = sum(1 for row in decision_rows if row[0] == "BUY")
sells = sum(1 for row in decision_rows if row[0] == "SELL")
holds = sum(1 for row in decision_rows if row[0] == "HOLD")
avg_confidence = (
round(sum(int(row[1]) for row in decision_rows) / total_decisions, 2)
if total_decisions > 0
else 0.0
)
matched = 0
for row in decision_rows:
try:
snapshot = json.loads(row[2]) if row[2] else {}
except json.JSONDecodeError:
snapshot = {}
scenario_match = snapshot.get("scenario_match", {})
if isinstance(scenario_match, dict) and scenario_match:
matched += 1
scenario_match_rate = (
round((matched / total_decisions) * 100, 2)
if total_decisions
else 0.0
)
trade_stats = self._conn.execute(
"""
SELECT
COALESCE(SUM(pnl), 0.0),
SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END),
SUM(CASE WHEN pnl < 0 THEN 1 ELSE 0 END)
FROM trades
WHERE DATE(timestamp) = ? AND market = ?
""",
(date, market),
).fetchone()
total_pnl = round(float(trade_stats[0] or 0.0), 2) if trade_stats else 0.0
wins = int(trade_stats[1] or 0) if trade_stats else 0
losses = int(trade_stats[2] or 0) if trade_stats else 0
win_rate = round((wins / (wins + losses)) * 100, 2) if (wins + losses) > 0 else 0.0
top_winners = [
row[0]
for row in self._conn.execute(
"""
SELECT stock_code, SUM(pnl) AS stock_pnl
FROM trades
WHERE DATE(timestamp) = ? AND market = ?
GROUP BY stock_code
HAVING stock_pnl > 0
ORDER BY stock_pnl DESC
LIMIT 3
""",
(date, market),
).fetchall()
]
top_losers = [
row[0]
for row in self._conn.execute(
"""
SELECT stock_code, SUM(pnl) AS stock_pnl
FROM trades
WHERE DATE(timestamp) = ? AND market = ?
GROUP BY stock_code
HAVING stock_pnl < 0
ORDER BY stock_pnl ASC
LIMIT 3
""",
(date, market),
).fetchall()
]
return DailyScorecard(
date=date,
market=market,
total_decisions=total_decisions,
buys=buys,
sells=sells,
holds=holds,
total_pnl=total_pnl,
win_rate=win_rate,
avg_confidence=avg_confidence,
scenario_match_rate=scenario_match_rate,
top_winners=top_winners,
top_losers=top_losers,
lessons=[],
cross_market_note="",
)
async def generate_lessons(self, scorecard: DailyScorecard) -> list[str]:
"""Generate concise lessons from scorecard metrics using Gemini."""
if self._gemini is None:
return []
prompt = (
"You are a trading performance reviewer.\n"
"Return ONLY a JSON array of 1-3 short lessons in English.\n"
f"Market: {scorecard.market}\n"
f"Date: {scorecard.date}\n"
f"Total decisions: {scorecard.total_decisions}\n"
f"Buys/Sells/Holds: {scorecard.buys}/{scorecard.sells}/{scorecard.holds}\n"
f"Total PnL: {scorecard.total_pnl}\n"
f"Win rate: {scorecard.win_rate}%\n"
f"Average confidence: {scorecard.avg_confidence}\n"
f"Scenario match rate: {scorecard.scenario_match_rate}%\n"
f"Top winners: {', '.join(scorecard.top_winners) or 'N/A'}\n"
f"Top losers: {', '.join(scorecard.top_losers) or 'N/A'}\n"
)
try:
decision = await self._gemini.decide(
{
"stock_code": "REVIEW",
"market_name": scorecard.market,
"current_price": 0,
"prompt_override": prompt,
}
)
return self._parse_lessons(decision.rationale)
except Exception as exc:
logger.warning("Failed to generate daily lessons: %s", exc)
return []
def store_scorecard_in_context(self, scorecard: DailyScorecard) -> None:
"""Store scorecard in L6 using market-scoped key."""
self._context_store.set_context(
ContextLayer.L6_DAILY,
scorecard.date,
f"scorecard_{scorecard.market}",
asdict(scorecard),
)
def _parse_lessons(self, raw_text: str) -> list[str]:
"""Parse lessons from JSON array response or fallback text."""
raw_text = raw_text.strip()
try:
parsed = json.loads(raw_text)
if isinstance(parsed, list):
return [str(item).strip() for item in parsed if str(item).strip()][:3]
except json.JSONDecodeError:
pass
match = re.search(r"\[.*\]", raw_text, re.DOTALL)
if match:
try:
parsed = json.loads(match.group(0))
if isinstance(parsed, list):
return [str(item).strip() for item in parsed if str(item).strip()][:3]
except json.JSONDecodeError:
pass
lines = [line.strip("-* \t") for line in raw_text.splitlines() if line.strip()]
return lines[:3]

View File

@@ -0,0 +1,25 @@
"""Daily scorecard model for end-of-day performance review."""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class DailyScorecard:
"""Structured daily performance snapshot for a single market."""
date: str
market: str
total_decisions: int
buys: int
sells: int
holds: int
total_pnl: float
win_rate: float
avg_confidence: float
scenario_match_rate: float
top_winners: list[str] = field(default_factory=list)
top_losers: list[str] = field(default_factory=list)
lessons: list[str] = field(default_factory=list)
cross_market_note: str = ""

File diff suppressed because it is too large Load Diff

View File

@@ -123,6 +123,23 @@ MARKETS: dict[str, MarketInfo] = {
), ),
} }
MARKET_SHORTHAND: dict[str, list[str]] = {
"US": ["US_NASDAQ", "US_NYSE", "US_AMEX"],
"CN": ["CN_SHA", "CN_SZA"],
"VN": ["VN_HAN", "VN_HCM"],
}
def expand_market_codes(codes: list[str]) -> list[str]:
"""Expand shorthand market codes into concrete exchange market codes."""
expanded: list[str] = []
for code in codes:
if code in MARKET_SHORTHAND:
expanded.extend(MARKET_SHORTHAND[code])
else:
expanded.append(code)
return expanded
def is_market_open(market: MarketInfo, now: datetime | None = None) -> bool: def is_market_open(market: MarketInfo, now: datetime | None = None) -> bool:
""" """

View File

@@ -304,6 +304,77 @@ class TelegramClient:
NotificationMessage(priority=NotificationPriority.MEDIUM, message=message) NotificationMessage(priority=NotificationPriority.MEDIUM, message=message)
) )
async def notify_playbook_generated(
self,
market: str,
stock_count: int,
scenario_count: int,
token_count: int,
) -> None:
"""
Notify that a daily playbook was generated.
Args:
market: Market code (e.g., "KR", "US")
stock_count: Number of stocks in the playbook
scenario_count: Total number of scenarios
token_count: Gemini token usage for the playbook
"""
message = (
f"<b>Playbook Generated</b>\n"
f"Market: {market}\n"
f"Stocks: {stock_count}\n"
f"Scenarios: {scenario_count}\n"
f"Tokens: {token_count}"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.MEDIUM, message=message)
)
async def notify_scenario_matched(
self,
stock_code: str,
action: str,
condition_summary: str,
confidence: float,
) -> None:
"""
Notify that a scenario matched for a stock.
Args:
stock_code: Stock ticker symbol
action: Scenario action (BUY/SELL/HOLD/REDUCE_ALL)
condition_summary: Short summary of the matched condition
confidence: Scenario confidence (0-100)
"""
message = (
f"<b>Scenario Matched</b>\n"
f"Symbol: <code>{stock_code}</code>\n"
f"Action: {action}\n"
f"Condition: {condition_summary}\n"
f"Confidence: {confidence:.0f}%"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.HIGH, message=message)
)
async def notify_playbook_failed(self, market: str, reason: str) -> None:
"""
Notify that playbook generation failed.
Args:
market: Market code (e.g., "KR", "US")
reason: Failure reason summary
"""
message = (
f"<b>Playbook Failed</b>\n"
f"Market: {market}\n"
f"Reason: {reason[:200]}"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.HIGH, message=message)
)
async def notify_system_shutdown(self, reason: str) -> None: async def notify_system_shutdown(self, reason: str) -> None:
""" """
Notify system shutdown. Notify system shutdown.

0
src/strategy/__init__.py Normal file
View File

164
src/strategy/models.py Normal file
View File

@@ -0,0 +1,164 @@
"""Pydantic models for pre-market scenario planning.
Defines the data contracts for the proactive strategy system:
- AI generates DayPlaybook before market open (structured JSON scenarios)
- Local ScenarioEngine matches conditions during market hours (no API calls)
"""
from __future__ import annotations
from datetime import UTC, date, datetime
from enum import Enum
from pydantic import BaseModel, Field, field_validator
class ScenarioAction(str, Enum):
"""Actions that can be taken by scenarios."""
BUY = "BUY"
SELL = "SELL"
HOLD = "HOLD"
REDUCE_ALL = "REDUCE_ALL"
class MarketOutlook(str, Enum):
"""AI's assessment of market direction."""
BULLISH = "bullish"
NEUTRAL_TO_BULLISH = "neutral_to_bullish"
NEUTRAL = "neutral"
NEUTRAL_TO_BEARISH = "neutral_to_bearish"
BEARISH = "bearish"
class PlaybookStatus(str, Enum):
"""Lifecycle status of a playbook."""
PENDING = "pending"
READY = "ready"
FAILED = "failed"
EXPIRED = "expired"
class StockCondition(BaseModel):
"""Condition fields for scenario matching (all optional, AND-combined).
The ScenarioEngine evaluates all non-None fields as AND conditions.
A condition matches only if ALL specified fields are satisfied.
"""
rsi_below: float | None = None
rsi_above: float | None = None
volume_ratio_above: float | None = None
volume_ratio_below: float | None = None
price_above: float | None = None
price_below: float | None = None
price_change_pct_above: float | None = None
price_change_pct_below: float | None = None
def has_any_condition(self) -> bool:
"""Check if at least one condition field is set."""
return any(
v is not None
for v in (
self.rsi_below,
self.rsi_above,
self.volume_ratio_above,
self.volume_ratio_below,
self.price_above,
self.price_below,
self.price_change_pct_above,
self.price_change_pct_below,
)
)
class StockScenario(BaseModel):
"""A single condition-action rule for one stock."""
condition: StockCondition
action: ScenarioAction
confidence: int = Field(ge=0, le=100)
allocation_pct: float = Field(ge=0, le=100, default=10.0)
stop_loss_pct: float = Field(le=0, default=-2.0)
take_profit_pct: float = Field(ge=0, default=3.0)
rationale: str = ""
class StockPlaybook(BaseModel):
"""All scenarios for a single stock (ordered by priority)."""
stock_code: str
stock_name: str = ""
scenarios: list[StockScenario] = Field(min_length=1)
class GlobalRule(BaseModel):
"""Portfolio-level rule (checked before stock-level scenarios)."""
condition: str # e.g. "portfolio_pnl_pct < -2.0"
action: ScenarioAction
rationale: str = ""
class CrossMarketContext(BaseModel):
"""Summary of another market's state for cross-market awareness."""
market: str # e.g. "US" or "KR"
date: str
total_pnl: float = 0.0
win_rate: float = 0.0
index_change_pct: float = 0.0 # e.g. KOSPI or S&P500 change
key_events: list[str] = Field(default_factory=list)
lessons: list[str] = Field(default_factory=list)
class DayPlaybook(BaseModel):
"""Complete playbook for a single trading day in a single market.
Generated by PreMarketPlanner (1 Gemini call per market per day).
Consumed by ScenarioEngine during market hours (0 API calls).
"""
date: date
market: str # "KR" or "US"
market_outlook: MarketOutlook = MarketOutlook.NEUTRAL
generated_at: str = "" # ISO timestamp
gemini_model: str = ""
token_count: int = 0
global_rules: list[GlobalRule] = Field(default_factory=list)
stock_playbooks: list[StockPlaybook] = Field(default_factory=list)
default_action: ScenarioAction = ScenarioAction.HOLD
context_summary: dict = Field(default_factory=dict)
cross_market: CrossMarketContext | None = None
@field_validator("stock_playbooks")
@classmethod
def validate_unique_stocks(cls, v: list[StockPlaybook]) -> list[StockPlaybook]:
codes = [pb.stock_code for pb in v]
if len(codes) != len(set(codes)):
raise ValueError("Duplicate stock codes in playbook")
return v
def get_stock_playbook(self, stock_code: str) -> StockPlaybook | None:
"""Find the playbook for a specific stock."""
for pb in self.stock_playbooks:
if pb.stock_code == stock_code:
return pb
return None
@property
def scenario_count(self) -> int:
"""Total number of scenarios across all stocks."""
return sum(len(pb.scenarios) for pb in self.stock_playbooks)
@property
def stock_count(self) -> int:
"""Number of stocks with scenarios."""
return len(self.stock_playbooks)
def model_post_init(self, __context: object) -> None:
"""Set generated_at if not provided."""
if not self.generated_at:
self.generated_at = datetime.now(UTC).isoformat()

View File

@@ -0,0 +1,184 @@
"""Playbook persistence layer — CRUD for DayPlaybook in SQLite.
Stores and retrieves market-specific daily playbooks with JSON serialization.
Designed for the pre-market strategy system (one playbook per market per day).
"""
from __future__ import annotations
import json
import logging
import sqlite3
from datetime import date
from src.strategy.models import DayPlaybook, PlaybookStatus
logger = logging.getLogger(__name__)
class PlaybookStore:
"""CRUD operations for DayPlaybook persistence."""
def __init__(self, conn: sqlite3.Connection) -> None:
self._conn = conn
def save(self, playbook: DayPlaybook) -> int:
"""Save or replace a playbook for a given date+market.
Uses INSERT OR REPLACE to enforce UNIQUE(date, market).
Returns:
The row id of the inserted/replaced record.
"""
playbook_json = playbook.model_dump_json()
cursor = self._conn.execute(
"""
INSERT OR REPLACE INTO playbooks
(date, market, status, playbook_json, generated_at,
token_count, scenario_count, match_count)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
playbook.date.isoformat(),
playbook.market,
PlaybookStatus.READY.value,
playbook_json,
playbook.generated_at,
playbook.token_count,
playbook.scenario_count,
0,
),
)
self._conn.commit()
row_id = cursor.lastrowid or 0
logger.info(
"Saved playbook for %s/%s (%d stocks, %d scenarios)",
playbook.date, playbook.market,
playbook.stock_count, playbook.scenario_count,
)
return row_id
def load(self, target_date: date, market: str) -> DayPlaybook | None:
"""Load a playbook for a specific date and market.
Returns:
DayPlaybook if found, None otherwise.
"""
row = self._conn.execute(
"SELECT playbook_json FROM playbooks WHERE date = ? AND market = ?",
(target_date.isoformat(), market),
).fetchone()
if row is None:
return None
return DayPlaybook.model_validate_json(row[0])
def get_status(self, target_date: date, market: str) -> PlaybookStatus | None:
"""Get the status of a playbook without deserializing the full JSON."""
row = self._conn.execute(
"SELECT status FROM playbooks WHERE date = ? AND market = ?",
(target_date.isoformat(), market),
).fetchone()
if row is None:
return None
return PlaybookStatus(row[0])
def update_status(self, target_date: date, market: str, status: PlaybookStatus) -> bool:
"""Update the status of a playbook.
Returns:
True if a row was updated, False if not found.
"""
cursor = self._conn.execute(
"UPDATE playbooks SET status = ? WHERE date = ? AND market = ?",
(status.value, target_date.isoformat(), market),
)
self._conn.commit()
return cursor.rowcount > 0
def increment_match_count(self, target_date: date, market: str) -> bool:
"""Increment the match_count for tracking scenario hits during the day.
Returns:
True if a row was updated, False if not found.
"""
cursor = self._conn.execute(
"UPDATE playbooks SET match_count = match_count + 1 WHERE date = ? AND market = ?",
(target_date.isoformat(), market),
)
self._conn.commit()
return cursor.rowcount > 0
def get_stats(self, target_date: date, market: str) -> dict | None:
"""Get playbook stats without full deserialization.
Returns:
Dict with status, token_count, scenario_count, match_count, or None.
"""
row = self._conn.execute(
"""
SELECT status, token_count, scenario_count, match_count, generated_at
FROM playbooks WHERE date = ? AND market = ?
""",
(target_date.isoformat(), market),
).fetchone()
if row is None:
return None
return {
"status": row[0],
"token_count": row[1],
"scenario_count": row[2],
"match_count": row[3],
"generated_at": row[4],
}
def list_recent(self, market: str | None = None, limit: int = 7) -> list[dict]:
"""List recent playbooks with summary info.
Args:
market: Filter by market code. None for all markets.
limit: Max number of results.
Returns:
List of dicts with date, market, status, scenario_count, match_count.
"""
if market is not None:
rows = self._conn.execute(
"""
SELECT date, market, status, scenario_count, match_count
FROM playbooks WHERE market = ?
ORDER BY date DESC LIMIT ?
""",
(market, limit),
).fetchall()
else:
rows = self._conn.execute(
"""
SELECT date, market, status, scenario_count, match_count
FROM playbooks
ORDER BY date DESC LIMIT ?
""",
(limit,),
).fetchall()
return [
{
"date": row[0],
"market": row[1],
"status": row[2],
"scenario_count": row[3],
"match_count": row[4],
}
for row in rows
]
def delete(self, target_date: date, market: str) -> bool:
"""Delete a playbook.
Returns:
True if a row was deleted, False if not found.
"""
cursor = self._conn.execute(
"DELETE FROM playbooks WHERE date = ? AND market = ?",
(target_date.isoformat(), market),
)
self._conn.commit()
return cursor.rowcount > 0

View File

@@ -0,0 +1,472 @@
"""Pre-market planner — generates DayPlaybook via Gemini before market open.
One Gemini API call per market per day. Candidates come from SmartVolatilityScanner.
On failure, returns a defensive playbook (all HOLD, no trades).
"""
from __future__ import annotations
import json
import logging
from datetime import date, timedelta
from typing import Any
from src.analysis.smart_scanner import ScanCandidate
from src.brain.context_selector import ContextSelector, DecisionType
from src.brain.gemini_client import GeminiClient
from src.config import Settings
from src.context.store import ContextLayer, ContextStore
from src.strategy.models import (
CrossMarketContext,
DayPlaybook,
GlobalRule,
MarketOutlook,
ScenarioAction,
StockCondition,
StockPlaybook,
StockScenario,
)
logger = logging.getLogger(__name__)
# Mapping from string to MarketOutlook enum
_OUTLOOK_MAP: dict[str, MarketOutlook] = {
"bullish": MarketOutlook.BULLISH,
"neutral_to_bullish": MarketOutlook.NEUTRAL_TO_BULLISH,
"neutral": MarketOutlook.NEUTRAL,
"neutral_to_bearish": MarketOutlook.NEUTRAL_TO_BEARISH,
"bearish": MarketOutlook.BEARISH,
}
_ACTION_MAP: dict[str, ScenarioAction] = {
"BUY": ScenarioAction.BUY,
"SELL": ScenarioAction.SELL,
"HOLD": ScenarioAction.HOLD,
"REDUCE_ALL": ScenarioAction.REDUCE_ALL,
}
class PreMarketPlanner:
"""Generates a DayPlaybook by calling Gemini once before market open.
Flow:
1. Collect strategic context (L5-L7) + cross-market context
2. Build a structured prompt with scan candidates
3. Call Gemini for JSON scenario generation
4. Parse and validate response into DayPlaybook
5. On failure → defensive playbook (HOLD everything)
"""
def __init__(
self,
gemini_client: GeminiClient,
context_store: ContextStore,
context_selector: ContextSelector,
settings: Settings,
) -> None:
self._gemini = gemini_client
self._context_store = context_store
self._context_selector = context_selector
self._settings = settings
async def generate_playbook(
self,
market: str,
candidates: list[ScanCandidate],
today: date | None = None,
) -> DayPlaybook:
"""Generate a DayPlaybook for a market using Gemini.
Args:
market: Market code ("KR" or "US")
candidates: Stock candidates from SmartVolatilityScanner
today: Override date (defaults to date.today()). Use market-local date.
Returns:
DayPlaybook with scenarios. Empty/defensive if no candidates or failure.
"""
if today is None:
today = date.today()
if not candidates:
logger.info("No candidates for %s — returning empty playbook", market)
return self._empty_playbook(today, market)
try:
# 1. Gather context
context_data = self._gather_context()
self_market_scorecard = self.build_self_market_scorecard(market, today)
cross_market = self.build_cross_market_context(market, today)
# 2. Build prompt
prompt = self._build_prompt(
market,
candidates,
context_data,
self_market_scorecard,
cross_market,
)
# 3. Call Gemini
market_data = {
"stock_code": "PLANNER",
"current_price": 0,
"prompt_override": prompt,
}
decision = await self._gemini.decide(market_data)
# 4. Parse response
playbook = self._parse_response(
decision.rationale, today, market, candidates, cross_market
)
playbook_with_tokens = playbook.model_copy(
update={"token_count": decision.token_count}
)
logger.info(
"Generated playbook for %s: %d stocks, %d scenarios, %d tokens",
market,
playbook_with_tokens.stock_count,
playbook_with_tokens.scenario_count,
playbook_with_tokens.token_count,
)
return playbook_with_tokens
except Exception:
logger.exception("Playbook generation failed for %s", market)
if self._settings.DEFENSIVE_PLAYBOOK_ON_FAILURE:
return self._defensive_playbook(today, market, candidates)
return self._empty_playbook(today, market)
def build_cross_market_context(
self, target_market: str, today: date | None = None,
) -> CrossMarketContext | None:
"""Build cross-market context from the other market's L6 data.
KR planner → reads US scorecard from previous night.
US planner → reads KR scorecard from today.
Args:
target_market: The market being planned ("KR" or "US")
today: Override date (defaults to date.today()). Use market-local date.
"""
other_market = "US" if target_market == "KR" else "KR"
if today is None:
today = date.today()
timeframe_date = today - timedelta(days=1) if target_market == "KR" else today
timeframe = timeframe_date.isoformat()
scorecard_key = f"scorecard_{other_market}"
scorecard_data = self._context_store.get_context(
ContextLayer.L6_DAILY, timeframe, scorecard_key
)
if scorecard_data is None:
logger.debug("No cross-market scorecard found for %s", other_market)
return None
if isinstance(scorecard_data, str):
try:
scorecard_data = json.loads(scorecard_data)
except (json.JSONDecodeError, TypeError):
return None
if not isinstance(scorecard_data, dict):
return None
return CrossMarketContext(
market=other_market,
date=timeframe,
total_pnl=float(scorecard_data.get("total_pnl", 0.0)),
win_rate=float(scorecard_data.get("win_rate", 0.0)),
index_change_pct=float(scorecard_data.get("index_change_pct", 0.0)),
key_events=scorecard_data.get("key_events", []),
lessons=scorecard_data.get("lessons", []),
)
def build_self_market_scorecard(
self, market: str, today: date | None = None,
) -> dict[str, Any] | None:
"""Build previous-day scorecard for the same market."""
if today is None:
today = date.today()
timeframe = (today - timedelta(days=1)).isoformat()
scorecard_key = f"scorecard_{market}"
scorecard_data = self._context_store.get_context(
ContextLayer.L6_DAILY, timeframe, scorecard_key
)
if scorecard_data is None:
return None
if isinstance(scorecard_data, str):
try:
scorecard_data = json.loads(scorecard_data)
except (json.JSONDecodeError, TypeError):
return None
if not isinstance(scorecard_data, dict):
return None
return {
"date": timeframe,
"total_pnl": float(scorecard_data.get("total_pnl", 0.0)),
"win_rate": float(scorecard_data.get("win_rate", 0.0)),
"lessons": scorecard_data.get("lessons", []),
}
def _gather_context(self) -> dict[str, Any]:
"""Gather strategic context using ContextSelector."""
layers = self._context_selector.select_layers(
decision_type=DecisionType.STRATEGIC,
include_realtime=True,
)
return self._context_selector.get_context_data(layers, max_items_per_layer=10)
def _build_prompt(
self,
market: str,
candidates: list[ScanCandidate],
context_data: dict[str, Any],
self_market_scorecard: dict[str, Any] | None,
cross_market: CrossMarketContext | None,
) -> str:
"""Build a structured prompt for Gemini to generate scenario JSON."""
max_scenarios = self._settings.MAX_SCENARIOS_PER_STOCK
candidates_text = "\n".join(
f" - {c.stock_code} ({c.name}): price={c.price}, "
f"RSI={c.rsi:.1f}, volume_ratio={c.volume_ratio:.1f}, "
f"signal={c.signal}, score={c.score:.1f}"
for c in candidates
)
cross_market_text = ""
if cross_market:
cross_market_text = (
f"\n## Other Market ({cross_market.market}) Summary\n"
f"- P&L: {cross_market.total_pnl:+.2f}%\n"
f"- Win Rate: {cross_market.win_rate:.0f}%\n"
f"- Index Change: {cross_market.index_change_pct:+.2f}%\n"
)
if cross_market.lessons:
cross_market_text += f"- Lessons: {'; '.join(cross_market.lessons[:3])}\n"
self_market_text = ""
if self_market_scorecard:
self_market_text = (
f"\n## My Market Previous Day ({market})\n"
f"- Date: {self_market_scorecard['date']}\n"
f"- P&L: {self_market_scorecard['total_pnl']:+.2f}%\n"
f"- Win Rate: {self_market_scorecard['win_rate']:.0f}%\n"
)
lessons = self_market_scorecard.get("lessons", [])
if lessons:
self_market_text += f"- Lessons: {'; '.join(lessons[:3])}\n"
context_text = ""
if context_data:
context_text = "\n## Strategic Context\n"
for layer_name, layer_data in context_data.items():
if layer_data:
context_text += f"### {layer_name}\n"
for key, value in list(layer_data.items())[:5]:
context_text += f" - {key}: {value}\n"
return (
f"You are a pre-market trading strategist for the {market} market.\n"
f"Generate structured trading scenarios for today.\n\n"
f"## Candidates (from volatility scanner)\n{candidates_text}\n"
f"{self_market_text}"
f"{cross_market_text}"
f"{context_text}\n"
f"## Instructions\n"
f"Return a JSON object with this exact structure:\n"
f'{{\n'
f' "market_outlook": "bullish|neutral_to_bullish|neutral'
f'|neutral_to_bearish|bearish",\n'
f' "global_rules": [\n'
f' {{"condition": "portfolio_pnl_pct < -2.0",'
f' "action": "REDUCE_ALL", "rationale": "..."}}\n'
f' ],\n'
f' "stocks": [\n'
f' {{\n'
f' "stock_code": "...",\n'
f' "scenarios": [\n'
f' {{\n'
f' "condition": {{"rsi_below": 30, "volume_ratio_above": 2.0}},\n'
f' "action": "BUY|SELL|HOLD",\n'
f' "confidence": 85,\n'
f' "allocation_pct": 10.0,\n'
f' "stop_loss_pct": -2.0,\n'
f' "take_profit_pct": 3.0,\n'
f' "rationale": "..."\n'
f' }}\n'
f' ]\n'
f' }}\n'
f' ]\n'
f'}}\n\n'
f"Rules:\n"
f"- Max {max_scenarios} scenarios per stock\n"
f"- Only use stocks from the candidates list\n"
f"- Confidence 0-100 (80+ for actionable trades)\n"
f"- stop_loss_pct must be <= 0, take_profit_pct must be >= 0\n"
f"- Return ONLY the JSON, no markdown fences or explanation\n"
)
def _parse_response(
self,
response_text: str,
today: date,
market: str,
candidates: list[ScanCandidate],
cross_market: CrossMarketContext | None,
) -> DayPlaybook:
"""Parse Gemini's JSON response into a validated DayPlaybook."""
cleaned = self._extract_json(response_text)
data = json.loads(cleaned)
valid_codes = {c.stock_code for c in candidates}
# Parse market outlook
outlook_str = data.get("market_outlook", "neutral")
market_outlook = _OUTLOOK_MAP.get(outlook_str, MarketOutlook.NEUTRAL)
# Parse global rules
global_rules = []
for rule_data in data.get("global_rules", []):
action_str = rule_data.get("action", "HOLD")
action = _ACTION_MAP.get(action_str, ScenarioAction.HOLD)
global_rules.append(
GlobalRule(
condition=rule_data.get("condition", ""),
action=action,
rationale=rule_data.get("rationale", ""),
)
)
# Parse stock playbooks
stock_playbooks = []
max_scenarios = self._settings.MAX_SCENARIOS_PER_STOCK
for stock_data in data.get("stocks", []):
code = stock_data.get("stock_code", "")
if code not in valid_codes:
logger.warning("Gemini returned unknown stock %s — skipping", code)
continue
scenarios = []
for sc_data in stock_data.get("scenarios", [])[:max_scenarios]:
scenario = self._parse_scenario(sc_data)
if scenario:
scenarios.append(scenario)
if scenarios:
stock_playbooks.append(
StockPlaybook(
stock_code=code,
scenarios=scenarios,
)
)
return DayPlaybook(
date=today,
market=market,
market_outlook=market_outlook,
global_rules=global_rules,
stock_playbooks=stock_playbooks,
cross_market=cross_market,
)
def _parse_scenario(self, sc_data: dict) -> StockScenario | None:
"""Parse a single scenario from JSON data. Returns None if invalid."""
try:
cond_data = sc_data.get("condition", {})
condition = StockCondition(
rsi_below=cond_data.get("rsi_below"),
rsi_above=cond_data.get("rsi_above"),
volume_ratio_above=cond_data.get("volume_ratio_above"),
volume_ratio_below=cond_data.get("volume_ratio_below"),
price_above=cond_data.get("price_above"),
price_below=cond_data.get("price_below"),
price_change_pct_above=cond_data.get("price_change_pct_above"),
price_change_pct_below=cond_data.get("price_change_pct_below"),
)
if not condition.has_any_condition():
logger.warning("Scenario has no conditions — skipping")
return None
action_str = sc_data.get("action", "HOLD")
action = _ACTION_MAP.get(action_str, ScenarioAction.HOLD)
return StockScenario(
condition=condition,
action=action,
confidence=int(sc_data.get("confidence", 50)),
allocation_pct=float(sc_data.get("allocation_pct", 10.0)),
stop_loss_pct=float(sc_data.get("stop_loss_pct", -2.0)),
take_profit_pct=float(sc_data.get("take_profit_pct", 3.0)),
rationale=sc_data.get("rationale", ""),
)
except (ValueError, TypeError) as e:
logger.warning("Failed to parse scenario: %s", e)
return None
@staticmethod
def _extract_json(text: str) -> str:
"""Extract JSON from response, stripping markdown fences if present."""
stripped = text.strip()
if stripped.startswith("```"):
# Remove first line (```json or ```) and last line (```)
lines = stripped.split("\n")
lines = lines[1:] # Remove opening fence
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
stripped = "\n".join(lines)
return stripped.strip()
@staticmethod
def _empty_playbook(today: date, market: str) -> DayPlaybook:
"""Return an empty playbook (no stocks, no scenarios)."""
return DayPlaybook(
date=today,
market=market,
market_outlook=MarketOutlook.NEUTRAL,
stock_playbooks=[],
)
@staticmethod
def _defensive_playbook(
today: date,
market: str,
candidates: list[ScanCandidate],
) -> DayPlaybook:
"""Return a defensive playbook — HOLD everything with stop-loss ready."""
stock_playbooks = [
StockPlaybook(
stock_code=c.stock_code,
scenarios=[
StockScenario(
condition=StockCondition(price_change_pct_below=-3.0),
action=ScenarioAction.SELL,
confidence=90,
stop_loss_pct=-3.0,
rationale="Defensive stop-loss (planner failure)",
),
],
)
for c in candidates
]
return DayPlaybook(
date=today,
market=market,
market_outlook=MarketOutlook.NEUTRAL_TO_BEARISH,
default_action=ScenarioAction.HOLD,
stock_playbooks=stock_playbooks,
global_rules=[
GlobalRule(
condition="portfolio_pnl_pct < -2.0",
action=ScenarioAction.REDUCE_ALL,
rationale="Defensive: reduce on loss threshold",
),
],
)

View File

@@ -0,0 +1,270 @@
"""Local scenario engine for playbook execution.
Matches real-time market conditions against pre-defined scenarios
without any API calls. Designed for sub-100ms execution.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any
from src.strategy.models import (
DayPlaybook,
GlobalRule,
ScenarioAction,
StockCondition,
StockScenario,
)
logger = logging.getLogger(__name__)
@dataclass
class ScenarioMatch:
"""Result of matching market conditions against scenarios."""
stock_code: str
matched_scenario: StockScenario | None
action: ScenarioAction
confidence: int
rationale: str
global_rule_triggered: GlobalRule | None = None
match_details: dict[str, Any] = field(default_factory=dict)
class ScenarioEngine:
"""Evaluates playbook scenarios against real-time market data.
No API calls — pure Python condition matching.
Expected market_data keys: "rsi", "volume_ratio", "current_price", "price_change_pct".
Callers must normalize data source keys to match this contract.
"""
def __init__(self) -> None:
self._warned_keys: set[str] = set()
@staticmethod
def _safe_float(value: Any) -> float | None:
"""Safely cast a value to float. Returns None on failure."""
if value is None:
return None
try:
return float(value)
except (ValueError, TypeError):
return None
def _warn_missing_key(self, key: str) -> None:
"""Log a missing-key warning once per key per engine instance."""
if key not in self._warned_keys:
self._warned_keys.add(key)
logger.warning("Condition requires '%s' but key missing from market_data", key)
def evaluate(
self,
playbook: DayPlaybook,
stock_code: str,
market_data: dict[str, Any],
portfolio_data: dict[str, Any],
) -> ScenarioMatch:
"""Match market conditions to scenarios and return a decision.
Algorithm:
1. Check global rules first (portfolio-level circuit breakers)
2. Find the StockPlaybook for the given stock_code
3. Iterate scenarios in order (first match wins)
4. If no match, return playbook.default_action (HOLD)
Args:
playbook: Today's DayPlaybook for this market
stock_code: Stock ticker to evaluate
market_data: Real-time market data (price, rsi, volume_ratio, etc.)
portfolio_data: Portfolio state (pnl_pct, total_cash, etc.)
Returns:
ScenarioMatch with the decision
"""
# 1. Check global rules
triggered_rule = self.check_global_rules(playbook, portfolio_data)
if triggered_rule is not None:
logger.info(
"Global rule triggered for %s: %s -> %s",
stock_code,
triggered_rule.condition,
triggered_rule.action.value,
)
return ScenarioMatch(
stock_code=stock_code,
matched_scenario=None,
action=triggered_rule.action,
confidence=100,
rationale=f"Global rule: {triggered_rule.rationale or triggered_rule.condition}",
global_rule_triggered=triggered_rule,
)
# 2. Find stock playbook
stock_pb = playbook.get_stock_playbook(stock_code)
if stock_pb is None:
logger.debug("No playbook for %s — defaulting to %s", stock_code, playbook.default_action)
return ScenarioMatch(
stock_code=stock_code,
matched_scenario=None,
action=playbook.default_action,
confidence=0,
rationale=f"No scenarios defined for {stock_code}",
)
# 3. Iterate scenarios (first match wins)
for scenario in stock_pb.scenarios:
if self.evaluate_condition(scenario.condition, market_data):
logger.info(
"Scenario matched for %s: %s (confidence=%d)",
stock_code,
scenario.action.value,
scenario.confidence,
)
return ScenarioMatch(
stock_code=stock_code,
matched_scenario=scenario,
action=scenario.action,
confidence=scenario.confidence,
rationale=scenario.rationale,
match_details=self._build_match_details(scenario.condition, market_data),
)
# 4. No match — default action
logger.debug("No scenario matched for %s — defaulting to %s", stock_code, playbook.default_action)
return ScenarioMatch(
stock_code=stock_code,
matched_scenario=None,
action=playbook.default_action,
confidence=0,
rationale="No scenario conditions met — holding position",
)
def check_global_rules(
self,
playbook: DayPlaybook,
portfolio_data: dict[str, Any],
) -> GlobalRule | None:
"""Check portfolio-level rules. Returns first triggered rule or None."""
for rule in playbook.global_rules:
if self._evaluate_global_condition(rule.condition, portfolio_data):
return rule
return None
def evaluate_condition(
self,
condition: StockCondition,
market_data: dict[str, Any],
) -> bool:
"""Evaluate all non-None fields in condition as AND.
Returns True only if ALL specified conditions are met.
Empty condition (no fields set) returns False for safety.
"""
if not condition.has_any_condition():
return False
checks: list[bool] = []
rsi = self._safe_float(market_data.get("rsi"))
if condition.rsi_below is not None or condition.rsi_above is not None:
if "rsi" not in market_data:
self._warn_missing_key("rsi")
if condition.rsi_below is not None:
checks.append(rsi is not None and rsi < condition.rsi_below)
if condition.rsi_above is not None:
checks.append(rsi is not None and rsi > condition.rsi_above)
volume_ratio = self._safe_float(market_data.get("volume_ratio"))
if condition.volume_ratio_above is not None or condition.volume_ratio_below is not None:
if "volume_ratio" not in market_data:
self._warn_missing_key("volume_ratio")
if condition.volume_ratio_above is not None:
checks.append(volume_ratio is not None and volume_ratio > condition.volume_ratio_above)
if condition.volume_ratio_below is not None:
checks.append(volume_ratio is not None and volume_ratio < condition.volume_ratio_below)
price = self._safe_float(market_data.get("current_price"))
if condition.price_above is not None or condition.price_below is not None:
if "current_price" not in market_data:
self._warn_missing_key("current_price")
if condition.price_above is not None:
checks.append(price is not None and price > condition.price_above)
if condition.price_below is not None:
checks.append(price is not None and price < condition.price_below)
price_change_pct = self._safe_float(market_data.get("price_change_pct"))
if condition.price_change_pct_above is not None or condition.price_change_pct_below is not None:
if "price_change_pct" not in market_data:
self._warn_missing_key("price_change_pct")
if condition.price_change_pct_above is not None:
checks.append(price_change_pct is not None and price_change_pct > condition.price_change_pct_above)
if condition.price_change_pct_below is not None:
checks.append(price_change_pct is not None and price_change_pct < condition.price_change_pct_below)
return len(checks) > 0 and all(checks)
def _evaluate_global_condition(
self,
condition_str: str,
portfolio_data: dict[str, Any],
) -> bool:
"""Evaluate a simple global condition string against portfolio data.
Supports: "field < value", "field > value", "field <= value", "field >= value"
"""
parts = condition_str.strip().split()
if len(parts) != 3:
logger.warning("Invalid global condition format: %s", condition_str)
return False
field_name, operator, value_str = parts
try:
threshold = float(value_str)
except ValueError:
logger.warning("Invalid threshold in condition: %s", condition_str)
return False
actual = portfolio_data.get(field_name)
if actual is None:
return False
try:
actual_val = float(actual)
except (ValueError, TypeError):
return False
if operator == "<":
return actual_val < threshold
elif operator == ">":
return actual_val > threshold
elif operator == "<=":
return actual_val <= threshold
elif operator == ">=":
return actual_val >= threshold
else:
logger.warning("Unknown operator in condition: %s", operator)
return False
def _build_match_details(
self,
condition: StockCondition,
market_data: dict[str, Any],
) -> dict[str, Any]:
"""Build a summary of which conditions matched and their normalized values."""
details: dict[str, Any] = {}
if condition.rsi_below is not None or condition.rsi_above is not None:
details["rsi"] = self._safe_float(market_data.get("rsi"))
if condition.volume_ratio_above is not None or condition.volume_ratio_below is not None:
details["volume_ratio"] = self._safe_float(market_data.get("volume_ratio"))
if condition.price_above is not None or condition.price_below is not None:
details["current_price"] = self._safe_float(market_data.get("current_price"))
if condition.price_change_pct_above is not None or condition.price_change_pct_below is not None:
details["price_change_pct"] = self._safe_float(market_data.get("price_change_pct"))
return details

View File

@@ -161,7 +161,7 @@ class TestContextAggregator:
self, aggregator: ContextAggregator, db_conn: sqlite3.Connection self, aggregator: ContextAggregator, db_conn: sqlite3.Connection
) -> None: ) -> None:
"""Test aggregating daily metrics from trades.""" """Test aggregating daily metrics from trades."""
date = "2026-02-04" date = datetime.now(UTC).date().isoformat()
# Create sample trades # Create sample trades
log_trade(db_conn, "005930", "BUY", 85, "Good signal", quantity=10, price=70000, pnl=500) log_trade(db_conn, "005930", "BUY", 85, "Good signal", quantity=10, price=70000, pnl=500)
@@ -175,36 +175,44 @@ class TestContextAggregator:
db_conn.commit() db_conn.commit()
# Aggregate # Aggregate
aggregator.aggregate_daily_from_trades(date) aggregator.aggregate_daily_from_trades(date, market="KR")
# Verify L6 contexts # Verify L6 contexts
store = aggregator.store store = aggregator.store
assert store.get_context(ContextLayer.L6_DAILY, date, "trade_count") == 3 assert store.get_context(ContextLayer.L6_DAILY, date, "trade_count_KR") == 3
assert store.get_context(ContextLayer.L6_DAILY, date, "buys") == 1 assert store.get_context(ContextLayer.L6_DAILY, date, "buys_KR") == 1
assert store.get_context(ContextLayer.L6_DAILY, date, "sells") == 1 assert store.get_context(ContextLayer.L6_DAILY, date, "sells_KR") == 1
assert store.get_context(ContextLayer.L6_DAILY, date, "holds") == 1 assert store.get_context(ContextLayer.L6_DAILY, date, "holds_KR") == 1
assert store.get_context(ContextLayer.L6_DAILY, date, "total_pnl") == 2000.0 assert store.get_context(ContextLayer.L6_DAILY, date, "total_pnl_KR") == 2000.0
assert store.get_context(ContextLayer.L6_DAILY, date, "unique_stocks") == 3 assert store.get_context(ContextLayer.L6_DAILY, date, "unique_stocks_KR") == 3
# 2 wins, 0 losses # 2 wins, 0 losses
assert store.get_context(ContextLayer.L6_DAILY, date, "win_rate") == 100.0 assert store.get_context(ContextLayer.L6_DAILY, date, "win_rate_KR") == 100.0
def test_aggregate_weekly_from_daily(self, aggregator: ContextAggregator) -> None: def test_aggregate_weekly_from_daily(self, aggregator: ContextAggregator) -> None:
"""Test aggregating weekly metrics from daily.""" """Test aggregating weekly metrics from daily."""
week = "2026-W06" week = "2026-W06"
# Set daily contexts # Set daily contexts
aggregator.store.set_context(ContextLayer.L6_DAILY, "2026-02-02", "total_pnl", 100.0) aggregator.store.set_context(
aggregator.store.set_context(ContextLayer.L6_DAILY, "2026-02-03", "total_pnl", 200.0) ContextLayer.L6_DAILY, "2026-02-02", "total_pnl_KR", 100.0
aggregator.store.set_context(ContextLayer.L6_DAILY, "2026-02-02", "avg_confidence", 80.0) )
aggregator.store.set_context(ContextLayer.L6_DAILY, "2026-02-03", "avg_confidence", 85.0) aggregator.store.set_context(
ContextLayer.L6_DAILY, "2026-02-03", "total_pnl_KR", 200.0
)
aggregator.store.set_context(
ContextLayer.L6_DAILY, "2026-02-02", "avg_confidence_KR", 80.0
)
aggregator.store.set_context(
ContextLayer.L6_DAILY, "2026-02-03", "avg_confidence_KR", 85.0
)
# Aggregate # Aggregate
aggregator.aggregate_weekly_from_daily(week) aggregator.aggregate_weekly_from_daily(week)
# Verify L5 contexts # Verify L5 contexts
store = aggregator.store store = aggregator.store
weekly_pnl = store.get_context(ContextLayer.L5_WEEKLY, week, "weekly_pnl") weekly_pnl = store.get_context(ContextLayer.L5_WEEKLY, week, "weekly_pnl_KR")
avg_conf = store.get_context(ContextLayer.L5_WEEKLY, week, "avg_confidence") avg_conf = store.get_context(ContextLayer.L5_WEEKLY, week, "avg_confidence_KR")
assert weekly_pnl == 300.0 assert weekly_pnl == 300.0
assert avg_conf == 82.5 assert avg_conf == 82.5
@@ -214,9 +222,15 @@ class TestContextAggregator:
month = "2026-02" month = "2026-02"
# Set weekly contexts # Set weekly contexts
aggregator.store.set_context(ContextLayer.L5_WEEKLY, "2026-W05", "weekly_pnl", 100.0) aggregator.store.set_context(
aggregator.store.set_context(ContextLayer.L5_WEEKLY, "2026-W06", "weekly_pnl", 200.0) ContextLayer.L5_WEEKLY, "2026-W05", "weekly_pnl_KR", 100.0
aggregator.store.set_context(ContextLayer.L5_WEEKLY, "2026-W07", "weekly_pnl", 150.0) )
aggregator.store.set_context(
ContextLayer.L5_WEEKLY, "2026-W06", "weekly_pnl_KR", 200.0
)
aggregator.store.set_context(
ContextLayer.L5_WEEKLY, "2026-W07", "weekly_pnl_KR", 150.0
)
# Aggregate # Aggregate
aggregator.aggregate_monthly_from_weekly(month) aggregator.aggregate_monthly_from_weekly(month)
@@ -285,7 +299,7 @@ class TestContextAggregator:
self, aggregator: ContextAggregator, db_conn: sqlite3.Connection self, aggregator: ContextAggregator, db_conn: sqlite3.Connection
) -> None: ) -> None:
"""Test running all aggregations from L7 to L1.""" """Test running all aggregations from L7 to L1."""
date = "2026-02-04" date = datetime.now(UTC).date().isoformat()
# Create sample trades # Create sample trades
log_trade(db_conn, "005930", "BUY", 85, "Good signal", quantity=10, price=70000, pnl=1000) log_trade(db_conn, "005930", "BUY", 85, "Good signal", quantity=10, price=70000, pnl=1000)
@@ -299,10 +313,18 @@ class TestContextAggregator:
# Verify data exists in each layer # Verify data exists in each layer
store = aggregator.store store = aggregator.store
assert store.get_context(ContextLayer.L6_DAILY, date, "total_pnl") == 1000.0 assert store.get_context(ContextLayer.L6_DAILY, date, "total_pnl_KR") == 1000.0
current_week = datetime.now(UTC).strftime("%Y-W%V") from datetime import date as date_cls
assert store.get_context(ContextLayer.L5_WEEKLY, current_week, "weekly_pnl") is not None trade_date = date_cls.fromisoformat(date)
# Further layers depend on time alignment, just verify no crashes iso_year, iso_week, _ = trade_date.isocalendar()
trade_week = f"{iso_year}-W{iso_week:02d}"
assert store.get_context(ContextLayer.L5_WEEKLY, trade_week, "weekly_pnl_KR") is not None
trade_month = f"{trade_date.year}-{trade_date.month:02d}"
trade_quarter = f"{trade_date.year}-Q{(trade_date.month - 1) // 3 + 1}"
trade_year = str(trade_date.year)
assert store.get_context(ContextLayer.L4_MONTHLY, trade_month, "monthly_pnl") == 1000.0
assert store.get_context(ContextLayer.L3_QUARTERLY, trade_quarter, "quarterly_pnl") == 1000.0
assert store.get_context(ContextLayer.L2_ANNUAL, trade_year, "annual_pnl") == 1000.0
class TestLayerMetadata: class TestLayerMetadata:

View File

@@ -0,0 +1,104 @@
"""Tests for ContextScheduler."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime
from src.context.scheduler import ContextScheduler
@dataclass
class StubAggregator:
"""Stub aggregator that records calls."""
weekly_calls: list[str]
monthly_calls: list[str]
quarterly_calls: list[str]
annual_calls: list[str]
legacy_calls: int
def aggregate_weekly_from_daily(self, week: str) -> None:
self.weekly_calls.append(week)
def aggregate_monthly_from_weekly(self, month: str) -> None:
self.monthly_calls.append(month)
def aggregate_quarterly_from_monthly(self, quarter: str) -> None:
self.quarterly_calls.append(quarter)
def aggregate_annual_from_quarterly(self, year: str) -> None:
self.annual_calls.append(year)
def aggregate_legacy_from_annual(self) -> None:
self.legacy_calls += 1
@dataclass
class StubStore:
"""Stub store that records cleanup calls."""
cleanup_calls: int = 0
def cleanup_expired_contexts(self) -> None:
self.cleanup_calls += 1
def make_scheduler() -> tuple[ContextScheduler, StubAggregator, StubStore]:
aggregator = StubAggregator([], [], [], [], 0)
store = StubStore()
scheduler = ContextScheduler(aggregator=aggregator, store=store)
return scheduler, aggregator, store
def test_run_if_due_weekly() -> None:
scheduler, aggregator, store = make_scheduler()
now = datetime(2026, 2, 8, 10, 0, tzinfo=UTC) # Sunday
result = scheduler.run_if_due(now)
assert result.weekly is True
assert aggregator.weekly_calls == ["2026-W06"]
assert store.cleanup_calls == 1
def test_run_if_due_monthly() -> None:
scheduler, aggregator, _store = make_scheduler()
now = datetime(2026, 2, 28, 12, 0, tzinfo=UTC) # Last day of month
result = scheduler.run_if_due(now)
assert result.monthly is True
assert aggregator.monthly_calls == ["2026-02"]
def test_run_if_due_quarterly() -> None:
scheduler, aggregator, _store = make_scheduler()
now = datetime(2026, 3, 31, 12, 0, tzinfo=UTC) # Last day of Q1
result = scheduler.run_if_due(now)
assert result.quarterly is True
assert aggregator.quarterly_calls == ["2026-Q1"]
def test_run_if_due_annual_and_legacy() -> None:
scheduler, aggregator, _store = make_scheduler()
now = datetime(2026, 12, 31, 12, 0, tzinfo=UTC)
result = scheduler.run_if_due(now)
assert result.annual is True
assert result.legacy is True
assert aggregator.annual_calls == ["2026"]
assert aggregator.legacy_calls == 1
def test_cleanup_runs_once_per_day() -> None:
scheduler, _aggregator, store = make_scheduler()
now = datetime(2026, 2, 9, 9, 0, tzinfo=UTC)
scheduler.run_if_due(now)
scheduler.run_if_due(now)
assert store.cleanup_calls == 1

387
tests/test_daily_review.py Normal file
View File

@@ -0,0 +1,387 @@
"""Tests for DailyReviewer."""
from __future__ import annotations
import json
import sqlite3
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
import pytest
from src.context.layer import ContextLayer
from src.context.store import ContextStore
from src.db import init_db, log_trade
from src.evolution.daily_review import DailyReviewer
from src.evolution.scorecard import DailyScorecard
from src.logging.decision_logger import DecisionLogger
from datetime import UTC, datetime
TODAY = datetime.now(UTC).strftime("%Y-%m-%d")
@pytest.fixture
def db_conn() -> sqlite3.Connection:
return init_db(":memory:")
@pytest.fixture
def context_store(db_conn: sqlite3.Connection) -> ContextStore:
return ContextStore(db_conn)
def _log_decision(
logger: DecisionLogger,
*,
stock_code: str,
market: str,
action: str,
confidence: int,
scenario_match: dict[str, float] | None = None,
) -> str:
return logger.log_decision(
stock_code=stock_code,
market=market,
exchange_code="KRX" if market == "KR" else "NASDAQ",
action=action,
confidence=confidence,
rationale="test",
context_snapshot={"scenario_match": scenario_match or {}},
input_data={"stock_code": stock_code},
)
def test_generate_scorecard_market_scoped(
db_conn: sqlite3.Connection, context_store: ContextStore,
) -> None:
reviewer = DailyReviewer(db_conn, context_store)
logger = DecisionLogger(db_conn)
buy_id = _log_decision(
logger,
stock_code="005930",
market="KR",
action="BUY",
confidence=90,
scenario_match={"rsi": 29.0},
)
_log_decision(
logger,
stock_code="000660",
market="KR",
action="HOLD",
confidence=60,
)
_log_decision(
logger,
stock_code="AAPL",
market="US",
action="SELL",
confidence=80,
scenario_match={"volume_ratio": 2.1},
)
log_trade(
db_conn,
"005930",
"BUY",
90,
"buy",
quantity=1,
price=100.0,
pnl=10.0,
market="KR",
exchange_code="KRX",
decision_id=buy_id,
)
log_trade(
db_conn,
"000660",
"HOLD",
60,
"hold",
quantity=0,
price=0.0,
pnl=0.0,
market="KR",
exchange_code="KRX",
)
log_trade(
db_conn,
"AAPL",
"SELL",
80,
"sell",
quantity=1,
price=200.0,
pnl=-5.0,
market="US",
exchange_code="NASDAQ",
)
scorecard = reviewer.generate_scorecard(TODAY, "KR")
assert scorecard.market == "KR"
assert scorecard.total_decisions == 2
assert scorecard.buys == 1
assert scorecard.sells == 0
assert scorecard.holds == 1
assert scorecard.total_pnl == 10.0
assert scorecard.win_rate == 100.0
assert scorecard.avg_confidence == 75.0
assert scorecard.scenario_match_rate == 50.0
def test_generate_scorecard_top_winners_and_losers(
db_conn: sqlite3.Connection, context_store: ContextStore,
) -> None:
reviewer = DailyReviewer(db_conn, context_store)
logger = DecisionLogger(db_conn)
for code, pnl in [("005930", 30.0), ("000660", 10.0), ("035420", -15.0), ("051910", -5.0)]:
decision_id = _log_decision(
logger,
stock_code=code,
market="KR",
action="BUY" if pnl >= 0 else "SELL",
confidence=80,
scenario_match={"rsi": 30.0},
)
log_trade(
db_conn,
code,
"BUY" if pnl >= 0 else "SELL",
80,
"test",
quantity=1,
price=100.0,
pnl=pnl,
market="KR",
exchange_code="KRX",
decision_id=decision_id,
)
scorecard = reviewer.generate_scorecard(TODAY, "KR")
assert scorecard.top_winners == ["005930", "000660"]
assert scorecard.top_losers == ["035420", "051910"]
def test_generate_scorecard_empty_day(
db_conn: sqlite3.Connection, context_store: ContextStore,
) -> None:
reviewer = DailyReviewer(db_conn, context_store)
scorecard = reviewer.generate_scorecard(TODAY, "KR")
assert scorecard.total_decisions == 0
assert scorecard.total_pnl == 0.0
assert scorecard.win_rate == 0.0
assert scorecard.avg_confidence == 0.0
assert scorecard.scenario_match_rate == 0.0
assert scorecard.top_winners == []
assert scorecard.top_losers == []
@pytest.mark.asyncio
async def test_generate_lessons_without_gemini_returns_empty(
db_conn: sqlite3.Connection, context_store: ContextStore,
) -> None:
reviewer = DailyReviewer(db_conn, context_store, gemini_client=None)
lessons = await reviewer.generate_lessons(
DailyScorecard(
date="2026-02-14",
market="KR",
total_decisions=1,
buys=1,
sells=0,
holds=0,
total_pnl=5.0,
win_rate=100.0,
avg_confidence=90.0,
scenario_match_rate=100.0,
)
)
assert lessons == []
@pytest.mark.asyncio
async def test_generate_lessons_parses_json_array(
db_conn: sqlite3.Connection, context_store: ContextStore,
) -> None:
mock_gemini = MagicMock()
mock_gemini.decide = AsyncMock(
return_value=SimpleNamespace(rationale='["Cut losers earlier", "Reduce midday churn"]')
)
reviewer = DailyReviewer(db_conn, context_store, gemini_client=mock_gemini)
lessons = await reviewer.generate_lessons(
DailyScorecard(
date="2026-02-14",
market="KR",
total_decisions=3,
buys=1,
sells=1,
holds=1,
total_pnl=-2.5,
win_rate=50.0,
avg_confidence=70.0,
scenario_match_rate=66.7,
)
)
assert lessons == ["Cut losers earlier", "Reduce midday churn"]
@pytest.mark.asyncio
async def test_generate_lessons_fallback_to_lines(
db_conn: sqlite3.Connection, context_store: ContextStore,
) -> None:
mock_gemini = MagicMock()
mock_gemini.decide = AsyncMock(
return_value=SimpleNamespace(rationale="- Keep risk tighter\n- Increase selectivity")
)
reviewer = DailyReviewer(db_conn, context_store, gemini_client=mock_gemini)
lessons = await reviewer.generate_lessons(
DailyScorecard(
date="2026-02-14",
market="US",
total_decisions=2,
buys=1,
sells=1,
holds=0,
total_pnl=1.0,
win_rate=50.0,
avg_confidence=75.0,
scenario_match_rate=100.0,
)
)
assert lessons == ["Keep risk tighter", "Increase selectivity"]
@pytest.mark.asyncio
async def test_generate_lessons_handles_gemini_error(
db_conn: sqlite3.Connection, context_store: ContextStore,
) -> None:
mock_gemini = MagicMock()
mock_gemini.decide = AsyncMock(side_effect=RuntimeError("boom"))
reviewer = DailyReviewer(db_conn, context_store, gemini_client=mock_gemini)
lessons = await reviewer.generate_lessons(
DailyScorecard(
date="2026-02-14",
market="US",
total_decisions=0,
buys=0,
sells=0,
holds=0,
total_pnl=0.0,
win_rate=0.0,
avg_confidence=0.0,
scenario_match_rate=0.0,
)
)
assert lessons == []
def test_store_scorecard_in_context(
db_conn: sqlite3.Connection, context_store: ContextStore,
) -> None:
reviewer = DailyReviewer(db_conn, context_store)
scorecard = DailyScorecard(
date="2026-02-14",
market="KR",
total_decisions=5,
buys=2,
sells=1,
holds=2,
total_pnl=15.0,
win_rate=66.67,
avg_confidence=82.0,
scenario_match_rate=80.0,
lessons=["Keep position sizing stable"],
cross_market_note="US risk-off",
)
reviewer.store_scorecard_in_context(scorecard)
stored = context_store.get_context(
ContextLayer.L6_DAILY,
"2026-02-14",
"scorecard_KR",
)
assert stored is not None
assert stored["market"] == "KR"
assert stored["total_pnl"] == 15.0
assert stored["lessons"] == ["Keep position sizing stable"]
def test_store_scorecard_key_is_market_scoped(
db_conn: sqlite3.Connection, context_store: ContextStore,
) -> None:
reviewer = DailyReviewer(db_conn, context_store)
kr = DailyScorecard(
date="2026-02-14",
market="KR",
total_decisions=1,
buys=1,
sells=0,
holds=0,
total_pnl=1.0,
win_rate=100.0,
avg_confidence=90.0,
scenario_match_rate=100.0,
)
us = DailyScorecard(
date="2026-02-14",
market="US",
total_decisions=1,
buys=0,
sells=1,
holds=0,
total_pnl=-1.0,
win_rate=0.0,
avg_confidence=70.0,
scenario_match_rate=100.0,
)
reviewer.store_scorecard_in_context(kr)
reviewer.store_scorecard_in_context(us)
kr_ctx = context_store.get_context(ContextLayer.L6_DAILY, "2026-02-14", "scorecard_KR")
us_ctx = context_store.get_context(ContextLayer.L6_DAILY, "2026-02-14", "scorecard_US")
assert kr_ctx["market"] == "KR"
assert us_ctx["market"] == "US"
assert kr_ctx["total_pnl"] == 1.0
assert us_ctx["total_pnl"] == -1.0
def test_generate_scorecard_handles_invalid_context_snapshot(
db_conn: sqlite3.Connection, context_store: ContextStore,
) -> None:
reviewer = DailyReviewer(db_conn, context_store)
db_conn.execute(
"""
INSERT INTO decision_logs (
decision_id, timestamp, stock_code, market, exchange_code,
action, confidence, rationale, context_snapshot, input_data
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"d1",
"2026-02-14T09:00:00+00:00",
"005930",
"KR",
"KRX",
"HOLD",
50,
"test",
"{invalid_json",
json.dumps({}),
),
)
db_conn.commit()
scorecard = reviewer.generate_scorecard("2026-02-14", "KR")
assert scorecard.total_decisions == 1
assert scorecard.scenario_match_rate == 0.0

298
tests/test_dashboard.py Normal file
View File

@@ -0,0 +1,298 @@
"""Tests for dashboard endpoint handlers."""
from __future__ import annotations
import json
import sqlite3
from collections.abc import Callable
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import pytest
from fastapi import HTTPException
from fastapi.responses import FileResponse
from src.dashboard.app import create_dashboard_app
from src.db import init_db
def _seed_db(conn: sqlite3.Connection) -> None:
today = datetime.now(UTC).date().isoformat()
conn.execute(
"""
INSERT INTO playbooks (
date, market, status, playbook_json, generated_at,
token_count, scenario_count, match_count
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"2026-02-14",
"KR",
"ready",
json.dumps({"market": "KR", "stock_playbooks": []}),
"2026-02-14T08:30:00+00:00",
123,
2,
1,
),
)
conn.execute(
"""
INSERT INTO playbooks (
date, market, status, playbook_json, generated_at,
token_count, scenario_count, match_count
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
today,
"US_NASDAQ",
"ready",
json.dumps({"market": "US_NASDAQ", "stock_playbooks": []}),
f"{today}T08:30:00+00:00",
100,
1,
0,
),
)
conn.execute(
"""
INSERT INTO contexts (layer, timeframe, key, value, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
"L6_DAILY",
"2026-02-14",
"scorecard_KR",
json.dumps({"market": "KR", "total_pnl": 1.5, "win_rate": 60.0}),
"2026-02-14T15:30:00+00:00",
"2026-02-14T15:30:00+00:00",
),
)
conn.execute(
"""
INSERT INTO contexts (layer, timeframe, key, value, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
"L7_REALTIME",
"2026-02-14T10:00:00+00:00",
"volatility_KR_005930",
json.dumps({"momentum_score": 70.0}),
"2026-02-14T10:00:00+00:00",
"2026-02-14T10:00:00+00:00",
),
)
conn.execute(
"""
INSERT INTO decision_logs (
decision_id, timestamp, stock_code, market, exchange_code,
action, confidence, rationale, context_snapshot, input_data
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"d-kr-1",
f"{today}T09:10:00+00:00",
"005930",
"KR",
"KRX",
"BUY",
85,
"signal matched",
json.dumps({"scenario_match": {"rsi": 28.0}}),
json.dumps({"current_price": 70000}),
),
)
conn.execute(
"""
INSERT INTO decision_logs (
decision_id, timestamp, stock_code, market, exchange_code,
action, confidence, rationale, context_snapshot, input_data
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"d-us-1",
f"{today}T21:10:00+00:00",
"AAPL",
"US_NASDAQ",
"NASDAQ",
"SELL",
80,
"no match",
json.dumps({"scenario_match": {}}),
json.dumps({"current_price": 200}),
),
)
conn.execute(
"""
INSERT INTO trades (
timestamp, stock_code, action, confidence, rationale,
quantity, price, pnl, market, exchange_code, selection_context, decision_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
f"{today}T09:11:00+00:00",
"005930",
"BUY",
85,
"buy",
1,
70000,
2.0,
"KR",
"KRX",
None,
"d-kr-1",
),
)
conn.execute(
"""
INSERT INTO trades (
timestamp, stock_code, action, confidence, rationale,
quantity, price, pnl, market, exchange_code, selection_context, decision_id
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
f"{today}T21:11:00+00:00",
"AAPL",
"SELL",
80,
"sell",
1,
200,
-1.0,
"US_NASDAQ",
"NASDAQ",
None,
"d-us-1",
),
)
conn.commit()
def _app(tmp_path: Path) -> Any:
db_path = tmp_path / "dashboard_test.db"
conn = init_db(str(db_path))
_seed_db(conn)
conn.close()
return create_dashboard_app(str(db_path))
def _endpoint(app: Any, path: str) -> Callable[..., Any]:
for route in app.routes:
if getattr(route, "path", None) == path:
return route.endpoint
raise AssertionError(f"route not found: {path}")
def test_index_serves_html(tmp_path: Path) -> None:
app = _app(tmp_path)
index = _endpoint(app, "/")
resp = index()
assert isinstance(resp, FileResponse)
assert "index.html" in str(resp.path)
def test_status_endpoint(tmp_path: Path) -> None:
app = _app(tmp_path)
get_status = _endpoint(app, "/api/status")
body = get_status()
assert "KR" in body["markets"]
assert "US_NASDAQ" in body["markets"]
assert "totals" in body
def test_playbook_found(tmp_path: Path) -> None:
app = _app(tmp_path)
get_playbook = _endpoint(app, "/api/playbook/{date_str}")
body = get_playbook("2026-02-14", market="KR")
assert body["market"] == "KR"
def test_playbook_not_found(tmp_path: Path) -> None:
app = _app(tmp_path)
get_playbook = _endpoint(app, "/api/playbook/{date_str}")
with pytest.raises(HTTPException, match="playbook not found"):
get_playbook("2026-02-15", market="KR")
def test_scorecard_found(tmp_path: Path) -> None:
app = _app(tmp_path)
get_scorecard = _endpoint(app, "/api/scorecard/{date_str}")
body = get_scorecard("2026-02-14", market="KR")
assert body["scorecard"]["total_pnl"] == 1.5
def test_scorecard_not_found(tmp_path: Path) -> None:
app = _app(tmp_path)
get_scorecard = _endpoint(app, "/api/scorecard/{date_str}")
with pytest.raises(HTTPException, match="scorecard not found"):
get_scorecard("2026-02-15", market="KR")
def test_performance_all(tmp_path: Path) -> None:
app = _app(tmp_path)
get_performance = _endpoint(app, "/api/performance")
body = get_performance(market="all")
assert body["market"] == "all"
assert body["combined"]["total_trades"] == 2
assert len(body["by_market"]) == 2
def test_performance_market_filter(tmp_path: Path) -> None:
app = _app(tmp_path)
get_performance = _endpoint(app, "/api/performance")
body = get_performance(market="KR")
assert body["market"] == "KR"
assert body["metrics"]["total_trades"] == 1
def test_performance_empty_market(tmp_path: Path) -> None:
app = _app(tmp_path)
get_performance = _endpoint(app, "/api/performance")
body = get_performance(market="JP")
assert body["metrics"]["total_trades"] == 0
def test_context_layer_all(tmp_path: Path) -> None:
app = _app(tmp_path)
get_context_layer = _endpoint(app, "/api/context/{layer}")
body = get_context_layer("L7_REALTIME", timeframe=None, limit=100)
assert body["layer"] == "L7_REALTIME"
assert body["count"] == 1
def test_context_layer_timeframe_filter(tmp_path: Path) -> None:
app = _app(tmp_path)
get_context_layer = _endpoint(app, "/api/context/{layer}")
body = get_context_layer("L6_DAILY", timeframe="2026-02-14", limit=100)
assert body["count"] == 1
assert body["entries"][0]["key"] == "scorecard_KR"
def test_decisions_endpoint(tmp_path: Path) -> None:
app = _app(tmp_path)
get_decisions = _endpoint(app, "/api/decisions")
body = get_decisions(market="KR", limit=50)
assert body["count"] == 1
assert body["decisions"][0]["decision_id"] == "d-kr-1"
def test_scenarios_active_filters_non_matched(tmp_path: Path) -> None:
app = _app(tmp_path)
get_active_scenarios = _endpoint(app, "/api/scenarios/active")
body = get_active_scenarios(
market="KR",
date_str=datetime.now(UTC).date().isoformat(),
limit=50,
)
assert body["count"] == 1
assert body["matches"][0]["stock_code"] == "005930"
def test_scenarios_active_empty_when_no_matches(tmp_path: Path) -> None:
app = _app(tmp_path)
get_active_scenarios = _endpoint(app, "/api/scenarios/active")
body = get_active_scenarios(market="US", date_str="2026-02-14", limit=50)
assert body["count"] == 0

60
tests/test_db.py Normal file
View File

@@ -0,0 +1,60 @@
"""Tests for database helper functions."""
from src.db import get_open_position, init_db, log_trade
def test_get_open_position_returns_latest_buy() -> None:
conn = init_db(":memory:")
log_trade(
conn=conn,
stock_code="005930",
action="BUY",
confidence=90,
rationale="entry",
quantity=2,
price=70000.0,
market="KR",
exchange_code="KRX",
decision_id="d-buy-1",
)
position = get_open_position(conn, "005930", "KR")
assert position is not None
assert position["decision_id"] == "d-buy-1"
assert position["price"] == 70000.0
assert position["quantity"] == 2
def test_get_open_position_returns_none_when_latest_is_sell() -> None:
conn = init_db(":memory:")
log_trade(
conn=conn,
stock_code="005930",
action="BUY",
confidence=90,
rationale="entry",
quantity=1,
price=70000.0,
market="KR",
exchange_code="KRX",
decision_id="d-buy-1",
)
log_trade(
conn=conn,
stock_code="005930",
action="SELL",
confidence=95,
rationale="exit",
quantity=1,
price=71000.0,
market="KR",
exchange_code="KRX",
decision_id="d-sell-1",
)
assert get_open_position(conn, "005930", "KR") is None
def test_get_open_position_returns_none_when_no_trades() -> None:
conn = init_db(":memory:")
assert get_open_position(conn, "AAPL", "US_NASDAQ") is None

File diff suppressed because it is too large Load Diff

View File

@@ -7,6 +7,7 @@ import pytest
from src.markets.schedule import ( from src.markets.schedule import (
MARKETS, MARKETS,
expand_market_codes,
get_next_market_open, get_next_market_open,
get_open_markets, get_open_markets,
is_market_open, is_market_open,
@@ -199,3 +200,28 @@ class TestGetNextMarketOpen:
enabled_markets=["INVALID", "KR"], now=test_time enabled_markets=["INVALID", "KR"], now=test_time
) )
assert market.code == "KR" assert market.code == "KR"
class TestExpandMarketCodes:
"""Test shorthand market expansion."""
def test_expand_us_shorthand(self) -> None:
assert expand_market_codes(["US"]) == ["US_NASDAQ", "US_NYSE", "US_AMEX"]
def test_expand_cn_shorthand(self) -> None:
assert expand_market_codes(["CN"]) == ["CN_SHA", "CN_SZA"]
def test_expand_vn_shorthand(self) -> None:
assert expand_market_codes(["VN"]) == ["VN_HAN", "VN_HCM"]
def test_expand_mixed_codes(self) -> None:
assert expand_market_codes(["KR", "US", "JP"]) == [
"KR",
"US_NASDAQ",
"US_NYSE",
"US_AMEX",
"JP",
]
def test_expand_preserves_unknown_code(self) -> None:
assert expand_market_codes(["KR", "UNKNOWN"]) == ["KR", "UNKNOWN"]

View File

@@ -0,0 +1,289 @@
"""Tests for playbook persistence (PlaybookStore + DB schema)."""
from __future__ import annotations
from datetime import date
import pytest
from src.db import init_db
from src.strategy.models import (
DayPlaybook,
GlobalRule,
MarketOutlook,
PlaybookStatus,
ScenarioAction,
StockCondition,
StockPlaybook,
StockScenario,
)
from src.strategy.playbook_store import PlaybookStore
@pytest.fixture
def conn():
"""Create an in-memory DB with schema."""
connection = init_db(":memory:")
yield connection
connection.close()
@pytest.fixture
def store(conn) -> PlaybookStore:
return PlaybookStore(conn)
def _make_playbook(
target_date: date = date(2026, 2, 8),
market: str = "KR",
outlook: MarketOutlook = MarketOutlook.NEUTRAL,
stock_codes: list[str] | None = None,
) -> DayPlaybook:
"""Create a test playbook with sensible defaults."""
if stock_codes is None:
stock_codes = ["005930"]
return DayPlaybook(
date=target_date,
market=market,
market_outlook=outlook,
token_count=150,
stock_playbooks=[
StockPlaybook(
stock_code=code,
scenarios=[
StockScenario(
condition=StockCondition(rsi_below=30.0),
action=ScenarioAction.BUY,
confidence=85,
rationale=f"Oversold bounce for {code}",
),
],
)
for code in stock_codes
],
global_rules=[
GlobalRule(
condition="portfolio_pnl_pct < -2.0",
action=ScenarioAction.REDUCE_ALL,
rationale="Near circuit breaker",
),
],
)
# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------
class TestSchema:
def test_playbooks_table_exists(self, conn) -> None:
row = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='playbooks'"
).fetchone()
assert row is not None
def test_unique_constraint(self, store: PlaybookStore) -> None:
pb = _make_playbook()
store.save(pb)
# Saving again for same date+market should replace, not error
pb2 = _make_playbook(stock_codes=["005930", "000660"])
store.save(pb2)
loaded = store.load(date(2026, 2, 8), "KR")
assert loaded is not None
assert loaded.stock_count == 2
# ---------------------------------------------------------------------------
# Save / Load
# ---------------------------------------------------------------------------
class TestSaveLoad:
def test_save_and_load(self, store: PlaybookStore) -> None:
pb = _make_playbook()
row_id = store.save(pb)
assert row_id > 0
loaded = store.load(date(2026, 2, 8), "KR")
assert loaded is not None
assert loaded.date == date(2026, 2, 8)
assert loaded.market == "KR"
assert loaded.stock_count == 1
assert loaded.scenario_count == 1
def test_load_not_found(self, store: PlaybookStore) -> None:
result = store.load(date(2026, 1, 1), "KR")
assert result is None
def test_save_preserves_all_fields(self, store: PlaybookStore) -> None:
pb = _make_playbook(
outlook=MarketOutlook.BULLISH,
stock_codes=["005930", "AAPL"],
)
store.save(pb)
loaded = store.load(date(2026, 2, 8), "KR")
assert loaded is not None
assert loaded.market_outlook == MarketOutlook.BULLISH
assert loaded.stock_count == 2
assert loaded.global_rules[0].action == ScenarioAction.REDUCE_ALL
assert loaded.token_count == 150
def test_save_different_markets(self, store: PlaybookStore) -> None:
kr = _make_playbook(market="KR")
us = _make_playbook(market="US", stock_codes=["AAPL"])
store.save(kr)
store.save(us)
kr_loaded = store.load(date(2026, 2, 8), "KR")
us_loaded = store.load(date(2026, 2, 8), "US")
assert kr_loaded is not None
assert us_loaded is not None
assert kr_loaded.market == "KR"
assert us_loaded.market == "US"
assert kr_loaded.stock_playbooks[0].stock_code == "005930"
assert us_loaded.stock_playbooks[0].stock_code == "AAPL"
def test_save_different_dates(self, store: PlaybookStore) -> None:
d1 = _make_playbook(target_date=date(2026, 2, 7))
d2 = _make_playbook(target_date=date(2026, 2, 8))
store.save(d1)
store.save(d2)
assert store.load(date(2026, 2, 7), "KR") is not None
assert store.load(date(2026, 2, 8), "KR") is not None
def test_replace_updates_data(self, store: PlaybookStore) -> None:
pb1 = _make_playbook(outlook=MarketOutlook.BEARISH)
store.save(pb1)
pb2 = _make_playbook(outlook=MarketOutlook.BULLISH)
store.save(pb2)
loaded = store.load(date(2026, 2, 8), "KR")
assert loaded is not None
assert loaded.market_outlook == MarketOutlook.BULLISH
# ---------------------------------------------------------------------------
# Status
# ---------------------------------------------------------------------------
class TestStatus:
def test_get_status(self, store: PlaybookStore) -> None:
store.save(_make_playbook())
status = store.get_status(date(2026, 2, 8), "KR")
assert status == PlaybookStatus.READY
def test_get_status_not_found(self, store: PlaybookStore) -> None:
assert store.get_status(date(2026, 1, 1), "KR") is None
def test_update_status(self, store: PlaybookStore) -> None:
store.save(_make_playbook())
updated = store.update_status(date(2026, 2, 8), "KR", PlaybookStatus.EXPIRED)
assert updated is True
status = store.get_status(date(2026, 2, 8), "KR")
assert status == PlaybookStatus.EXPIRED
def test_update_status_not_found(self, store: PlaybookStore) -> None:
updated = store.update_status(date(2026, 1, 1), "KR", PlaybookStatus.FAILED)
assert updated is False
# ---------------------------------------------------------------------------
# Match count
# ---------------------------------------------------------------------------
class TestMatchCount:
def test_increment_match_count(self, store: PlaybookStore) -> None:
store.save(_make_playbook())
store.increment_match_count(date(2026, 2, 8), "KR")
store.increment_match_count(date(2026, 2, 8), "KR")
stats = store.get_stats(date(2026, 2, 8), "KR")
assert stats is not None
assert stats["match_count"] == 2
def test_increment_not_found(self, store: PlaybookStore) -> None:
result = store.increment_match_count(date(2026, 1, 1), "KR")
assert result is False
# ---------------------------------------------------------------------------
# Stats
# ---------------------------------------------------------------------------
class TestStats:
def test_get_stats(self, store: PlaybookStore) -> None:
store.save(_make_playbook())
stats = store.get_stats(date(2026, 2, 8), "KR")
assert stats is not None
assert stats["status"] == "ready"
assert stats["token_count"] == 150
assert stats["scenario_count"] == 1
assert stats["match_count"] == 0
assert stats["generated_at"] != ""
def test_get_stats_not_found(self, store: PlaybookStore) -> None:
assert store.get_stats(date(2026, 1, 1), "KR") is None
# ---------------------------------------------------------------------------
# List recent
# ---------------------------------------------------------------------------
class TestListRecent:
def test_list_recent(self, store: PlaybookStore) -> None:
for day in range(5, 10):
store.save(_make_playbook(target_date=date(2026, 2, day)))
results = store.list_recent(market="KR", limit=3)
assert len(results) == 3
# Most recent first
assert results[0]["date"] == "2026-02-09"
assert results[2]["date"] == "2026-02-07"
def test_list_recent_all_markets(self, store: PlaybookStore) -> None:
store.save(_make_playbook(market="KR"))
store.save(_make_playbook(market="US", stock_codes=["AAPL"]))
results = store.list_recent(market=None, limit=10)
assert len(results) == 2
def test_list_recent_empty(self, store: PlaybookStore) -> None:
results = store.list_recent(market="KR")
assert results == []
def test_list_recent_filter_by_market(self, store: PlaybookStore) -> None:
store.save(_make_playbook(market="KR"))
store.save(_make_playbook(market="US", stock_codes=["AAPL"]))
kr_only = store.list_recent(market="KR")
assert len(kr_only) == 1
assert kr_only[0]["market"] == "KR"
# ---------------------------------------------------------------------------
# Delete
# ---------------------------------------------------------------------------
class TestDelete:
def test_delete(self, store: PlaybookStore) -> None:
store.save(_make_playbook())
deleted = store.delete(date(2026, 2, 8), "KR")
assert deleted is True
assert store.load(date(2026, 2, 8), "KR") is None
def test_delete_not_found(self, store: PlaybookStore) -> None:
deleted = store.delete(date(2026, 1, 1), "KR")
assert deleted is False
def test_delete_one_market_keeps_other(self, store: PlaybookStore) -> None:
store.save(_make_playbook(market="KR"))
store.save(_make_playbook(market="US", stock_codes=["AAPL"]))
store.delete(date(2026, 2, 8), "KR")
assert store.load(date(2026, 2, 8), "KR") is None
assert store.load(date(2026, 2, 8), "US") is not None

View File

@@ -0,0 +1,659 @@
"""Tests for PreMarketPlanner — Gemini prompt builder + response parser."""
from __future__ import annotations
import json
from datetime import date
from unittest.mock import AsyncMock, MagicMock
import pytest
from src.analysis.smart_scanner import ScanCandidate
from src.brain.context_selector import DecisionType
from src.brain.gemini_client import TradeDecision
from src.config import Settings
from src.context.store import ContextLayer
from src.strategy.models import (
CrossMarketContext,
DayPlaybook,
MarketOutlook,
ScenarioAction,
)
from src.strategy.pre_market_planner import PreMarketPlanner
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _candidate(
code: str = "005930",
name: str = "Samsung",
price: float = 71000,
rsi: float = 28.5,
volume_ratio: float = 3.2,
signal: str = "oversold",
score: float = 82.0,
) -> ScanCandidate:
return ScanCandidate(
stock_code=code,
name=name,
price=price,
volume=1_500_000,
volume_ratio=volume_ratio,
rsi=rsi,
signal=signal,
score=score,
)
def _gemini_response_json(
outlook: str = "neutral_to_bullish",
stocks: list[dict] | None = None,
global_rules: list[dict] | None = None,
) -> str:
"""Build a valid Gemini JSON response."""
if stocks is None:
stocks = [
{
"stock_code": "005930",
"scenarios": [
{
"condition": {"rsi_below": 30, "volume_ratio_above": 2.5},
"action": "BUY",
"confidence": 85,
"allocation_pct": 15.0,
"stop_loss_pct": -2.0,
"take_profit_pct": 4.0,
"rationale": "Oversold bounce with high volume",
}
],
}
]
if global_rules is None:
global_rules = [
{
"condition": "portfolio_pnl_pct < -2.0",
"action": "REDUCE_ALL",
"rationale": "Near circuit breaker",
}
]
return json.dumps(
{"market_outlook": outlook, "global_rules": global_rules, "stocks": stocks}
)
def _make_planner(
gemini_response: str = "",
token_count: int = 200,
context_data: dict | None = None,
scorecard_data: dict | None = None,
scorecard_map: dict[tuple[str, str, str], dict | None] | None = None,
) -> PreMarketPlanner:
"""Create a PreMarketPlanner with mocked dependencies."""
if not gemini_response:
gemini_response = _gemini_response_json()
# Mock GeminiClient
gemini = AsyncMock()
gemini.decide = AsyncMock(
return_value=TradeDecision(
action="HOLD",
confidence=0,
rationale=gemini_response,
token_count=token_count,
)
)
# Mock ContextStore
store = MagicMock()
if scorecard_map is not None:
store.get_context = MagicMock(
side_effect=lambda layer, timeframe, key: scorecard_map.get(
(layer.value if hasattr(layer, "value") else layer, timeframe, key)
)
)
else:
store.get_context = MagicMock(return_value=scorecard_data)
# Mock ContextSelector
selector = MagicMock()
selector.select_layers = MagicMock(
return_value=[ContextLayer.L7_REALTIME, ContextLayer.L6_DAILY]
)
selector.get_context_data = MagicMock(return_value=context_data or {})
settings = Settings(
KIS_APP_KEY="test",
KIS_APP_SECRET="test",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test",
)
return PreMarketPlanner(gemini, store, selector, settings)
# ---------------------------------------------------------------------------
# generate_playbook
# ---------------------------------------------------------------------------
class TestGeneratePlaybook:
@pytest.mark.asyncio
async def test_basic_generation(self) -> None:
planner = _make_planner()
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert isinstance(pb, DayPlaybook)
assert pb.market == "KR"
assert pb.stock_count == 1
assert pb.scenario_count == 1
assert pb.market_outlook == MarketOutlook.NEUTRAL_TO_BULLISH
assert pb.token_count == 200
@pytest.mark.asyncio
async def test_empty_candidates_returns_empty_playbook(self) -> None:
planner = _make_planner()
pb = await planner.generate_playbook("KR", [], today=date(2026, 2, 8))
assert pb.stock_count == 0
assert pb.scenario_count == 0
assert pb.market_outlook == MarketOutlook.NEUTRAL
@pytest.mark.asyncio
async def test_gemini_failure_returns_defensive(self) -> None:
planner = _make_planner()
planner._gemini.decide = AsyncMock(side_effect=RuntimeError("API timeout"))
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert pb.default_action == ScenarioAction.HOLD
assert pb.market_outlook == MarketOutlook.NEUTRAL_TO_BEARISH
assert pb.stock_count == 1
# Defensive playbook has stop-loss scenarios
assert pb.stock_playbooks[0].scenarios[0].action == ScenarioAction.SELL
@pytest.mark.asyncio
async def test_gemini_failure_empty_when_defensive_disabled(self) -> None:
planner = _make_planner()
planner._settings.DEFENSIVE_PLAYBOOK_ON_FAILURE = False
planner._gemini.decide = AsyncMock(side_effect=RuntimeError("fail"))
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert pb.stock_count == 0
@pytest.mark.asyncio
async def test_multiple_candidates(self) -> None:
stocks = [
{
"stock_code": "005930",
"scenarios": [
{
"condition": {"rsi_below": 30},
"action": "BUY",
"confidence": 85,
"rationale": "Oversold",
}
],
},
{
"stock_code": "AAPL",
"scenarios": [
{
"condition": {"rsi_above": 75},
"action": "SELL",
"confidence": 80,
"rationale": "Overbought",
}
],
},
]
planner = _make_planner(gemini_response=_gemini_response_json(stocks=stocks))
candidates = [_candidate(), _candidate(code="AAPL", name="Apple")]
pb = await planner.generate_playbook("US", candidates, today=date(2026, 2, 8))
assert pb.stock_count == 2
codes = [sp.stock_code for sp in pb.stock_playbooks]
assert "005930" in codes
assert "AAPL" in codes
@pytest.mark.asyncio
async def test_unknown_stock_in_response_skipped(self) -> None:
stocks = [
{
"stock_code": "005930",
"scenarios": [
{
"condition": {"rsi_below": 30},
"action": "BUY",
"confidence": 85,
"rationale": "ok",
}
],
},
{
"stock_code": "UNKNOWN",
"scenarios": [
{
"condition": {"rsi_below": 20},
"action": "BUY",
"confidence": 90,
"rationale": "bad",
}
],
},
]
planner = _make_planner(gemini_response=_gemini_response_json(stocks=stocks))
candidates = [_candidate()] # Only 005930
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert pb.stock_count == 1
assert pb.stock_playbooks[0].stock_code == "005930"
@pytest.mark.asyncio
async def test_global_rules_parsed(self) -> None:
planner = _make_planner()
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert len(pb.global_rules) == 1
assert pb.global_rules[0].action == ScenarioAction.REDUCE_ALL
@pytest.mark.asyncio
async def test_token_count_from_decision(self) -> None:
planner = _make_planner(token_count=450)
candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert pb.token_count == 450
@pytest.mark.asyncio
async def test_generate_playbook_uses_strategic_context_selector(self) -> None:
planner = _make_planner()
candidates = [_candidate()]
await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
planner._context_selector.select_layers.assert_called_once_with(
decision_type=DecisionType.STRATEGIC,
include_realtime=True,
)
planner._context_selector.get_context_data.assert_called_once()
@pytest.mark.asyncio
async def test_generate_playbook_injects_self_and_cross_scorecards(self) -> None:
scorecard_map = {
(ContextLayer.L6_DAILY.value, "2026-02-07", "scorecard_KR"): {
"total_pnl": -1.0,
"win_rate": 40,
"lessons": ["Tighten entries"],
},
(ContextLayer.L6_DAILY.value, "2026-02-07", "scorecard_US"): {
"total_pnl": 1.5,
"win_rate": 62,
"index_change_pct": 0.9,
"lessons": ["Follow momentum"],
},
}
planner = _make_planner(scorecard_map=scorecard_map)
await planner.generate_playbook("KR", [_candidate()], today=date(2026, 2, 8))
call_market_data = planner._gemini.decide.call_args.args[0]
prompt = call_market_data["prompt_override"]
assert "My Market Previous Day (KR)" in prompt
assert "Other Market (US)" in prompt
# ---------------------------------------------------------------------------
# _parse_response
# ---------------------------------------------------------------------------
class TestParseResponse:
def test_parse_full_response(self) -> None:
planner = _make_planner()
response = _gemini_response_json(outlook="bearish")
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
assert pb.market_outlook == MarketOutlook.BEARISH
assert pb.stock_count == 1
assert pb.stock_playbooks[0].scenarios[0].confidence == 85
def test_parse_with_markdown_fences(self) -> None:
planner = _make_planner()
response = f"```json\n{_gemini_response_json()}\n```"
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
assert pb.stock_count == 1
def test_parse_unknown_outlook_defaults_neutral(self) -> None:
planner = _make_planner()
response = _gemini_response_json(outlook="super_bullish")
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
assert pb.market_outlook == MarketOutlook.NEUTRAL
def test_parse_scenario_with_all_condition_fields(self) -> None:
planner = _make_planner()
stocks = [
{
"stock_code": "005930",
"scenarios": [
{
"condition": {
"rsi_below": 25,
"volume_ratio_above": 3.0,
"price_change_pct_below": -2.0,
},
"action": "BUY",
"confidence": 92,
"allocation_pct": 20.0,
"stop_loss_pct": -3.0,
"take_profit_pct": 5.0,
"rationale": "Multi-condition entry",
}
],
}
]
response = _gemini_response_json(stocks=stocks)
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
sc = pb.stock_playbooks[0].scenarios[0]
assert sc.condition.rsi_below == 25
assert sc.condition.volume_ratio_above == 3.0
assert sc.condition.price_change_pct_below == -2.0
assert sc.allocation_pct == 20.0
assert sc.stop_loss_pct == -3.0
assert sc.take_profit_pct == 5.0
def test_parse_empty_condition_scenario_skipped(self) -> None:
planner = _make_planner()
stocks = [
{
"stock_code": "005930",
"scenarios": [
{
"condition": {},
"action": "BUY",
"confidence": 85,
"rationale": "No conditions",
},
{
"condition": {"rsi_below": 30},
"action": "BUY",
"confidence": 80,
"rationale": "Valid",
},
],
}
]
response = _gemini_response_json(stocks=stocks)
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
# Empty condition scenario skipped, valid one kept
assert pb.stock_count == 1
assert pb.stock_playbooks[0].scenarios[0].confidence == 80
def test_parse_max_scenarios_enforced(self) -> None:
planner = _make_planner()
# Settings default MAX_SCENARIOS_PER_STOCK = 5
scenarios = [
{
"condition": {"rsi_below": 20 + i},
"action": "BUY",
"confidence": 80 + i,
"rationale": f"Scenario {i}",
}
for i in range(8) # 8 scenarios, should be capped to 5
]
stocks = [{"stock_code": "005930", "scenarios": scenarios}]
response = _gemini_response_json(stocks=stocks)
candidates = [_candidate()]
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, None)
assert len(pb.stock_playbooks[0].scenarios) == 5
def test_parse_invalid_json_raises(self) -> None:
planner = _make_planner()
candidates = [_candidate()]
with pytest.raises(json.JSONDecodeError):
planner._parse_response("not json at all", date(2026, 2, 8), "KR", candidates, None)
def test_parse_cross_market_preserved(self) -> None:
planner = _make_planner()
response = _gemini_response_json()
candidates = [_candidate()]
cross = CrossMarketContext(market="US", date="2026-02-07", total_pnl=1.5, win_rate=60)
pb = planner._parse_response(response, date(2026, 2, 8), "KR", candidates, cross)
assert pb.cross_market is not None
assert pb.cross_market.market == "US"
assert pb.cross_market.total_pnl == 1.5
# ---------------------------------------------------------------------------
# build_cross_market_context
# ---------------------------------------------------------------------------
class TestBuildCrossMarketContext:
def test_kr_reads_us_scorecard(self) -> None:
scorecard = {
"total_pnl": 2.5,
"win_rate": 65,
"index_change_pct": 0.8,
"lessons": ["Stay patient"],
}
planner = _make_planner(scorecard_data=scorecard)
ctx = planner.build_cross_market_context("KR", today=date(2026, 2, 8))
assert ctx is not None
assert ctx.market == "US"
assert ctx.total_pnl == 2.5
assert ctx.win_rate == 65
assert "Stay patient" in ctx.lessons
# Verify it queried scorecard_US
planner._context_store.get_context.assert_called_once_with(
ContextLayer.L6_DAILY, "2026-02-07", "scorecard_US"
)
assert ctx.date == "2026-02-07"
def test_us_reads_kr_scorecard(self) -> None:
scorecard = {"total_pnl": -1.0, "win_rate": 40, "index_change_pct": -0.5}
planner = _make_planner(scorecard_data=scorecard)
ctx = planner.build_cross_market_context("US", today=date(2026, 2, 8))
assert ctx is not None
assert ctx.market == "KR"
assert ctx.total_pnl == -1.0
planner._context_store.get_context.assert_called_once_with(
ContextLayer.L6_DAILY, "2026-02-08", "scorecard_KR"
)
def test_no_scorecard_returns_none(self) -> None:
planner = _make_planner(scorecard_data=None)
ctx = planner.build_cross_market_context("KR", today=date(2026, 2, 8))
assert ctx is None
def test_invalid_scorecard_returns_none(self) -> None:
planner = _make_planner(scorecard_data="not a dict and not json")
ctx = planner.build_cross_market_context("KR", today=date(2026, 2, 8))
assert ctx is None
# ---------------------------------------------------------------------------
# build_self_market_scorecard
# ---------------------------------------------------------------------------
class TestBuildSelfMarketScorecard:
def test_reads_previous_day_scorecard(self) -> None:
scorecard = {"total_pnl": -1.2, "win_rate": 45, "lessons": ["Reduce overtrading"]}
planner = _make_planner(scorecard_data=scorecard)
data = planner.build_self_market_scorecard("KR", today=date(2026, 2, 8))
assert data is not None
assert data["date"] == "2026-02-07"
assert data["total_pnl"] == -1.2
assert data["win_rate"] == 45
assert "Reduce overtrading" in data["lessons"]
planner._context_store.get_context.assert_called_once_with(
ContextLayer.L6_DAILY, "2026-02-07", "scorecard_KR"
)
def test_missing_scorecard_returns_none(self) -> None:
planner = _make_planner(scorecard_data=None)
assert planner.build_self_market_scorecard("US", today=date(2026, 2, 8)) is None
# ---------------------------------------------------------------------------
# _build_prompt
# ---------------------------------------------------------------------------
class TestBuildPrompt:
def test_prompt_contains_candidates(self) -> None:
planner = _make_planner()
candidates = [_candidate(code="005930", name="Samsung")]
prompt = planner._build_prompt("KR", candidates, {}, None, None)
assert "005930" in prompt
assert "Samsung" in prompt
assert "RSI=" in prompt
assert "volume_ratio=" in prompt
def test_prompt_contains_cross_market(self) -> None:
planner = _make_planner()
cross = CrossMarketContext(
market="US", date="2026-02-07", total_pnl=1.5,
win_rate=60, index_change_pct=0.8, lessons=["Cut losses early"],
)
prompt = planner._build_prompt("KR", [_candidate()], {}, None, cross)
assert "Other Market (US)" in prompt
assert "+1.50%" in prompt
assert "Cut losses early" in prompt
def test_prompt_contains_context_data(self) -> None:
planner = _make_planner()
context = {"L6_DAILY": {"win_rate": 0.65, "total_pnl": 2.5}}
prompt = planner._build_prompt("KR", [_candidate()], context, None, None)
assert "Strategic Context" in prompt
assert "L6_DAILY" in prompt
assert "win_rate" in prompt
def test_prompt_contains_max_scenarios(self) -> None:
planner = _make_planner()
prompt = planner._build_prompt("KR", [_candidate()], {}, None, None)
assert f"Max {planner._settings.MAX_SCENARIOS_PER_STOCK} scenarios" in prompt
def test_prompt_market_name(self) -> None:
planner = _make_planner()
prompt = planner._build_prompt("US", [_candidate()], {}, None, None)
assert "US market" in prompt
def test_prompt_contains_self_market_scorecard(self) -> None:
planner = _make_planner()
self_scorecard = {
"date": "2026-02-07",
"total_pnl": -0.8,
"win_rate": 45.0,
"lessons": ["Avoid midday entries"],
}
prompt = planner._build_prompt("KR", [_candidate()], {}, self_scorecard, None)
assert "My Market Previous Day (KR)" in prompt
assert "2026-02-07" in prompt
assert "-0.80%" in prompt
assert "Avoid midday entries" in prompt
# ---------------------------------------------------------------------------
# _extract_json
# ---------------------------------------------------------------------------
class TestExtractJson:
def test_plain_json(self) -> None:
assert PreMarketPlanner._extract_json('{"a": 1}') == '{"a": 1}'
def test_with_json_fence(self) -> None:
text = '```json\n{"a": 1}\n```'
assert PreMarketPlanner._extract_json(text) == '{"a": 1}'
def test_with_plain_fence(self) -> None:
text = '```\n{"a": 1}\n```'
assert PreMarketPlanner._extract_json(text) == '{"a": 1}'
def test_with_whitespace(self) -> None:
text = ' \n {"a": 1} \n '
assert PreMarketPlanner._extract_json(text) == '{"a": 1}'
# ---------------------------------------------------------------------------
# Defensive playbook
# ---------------------------------------------------------------------------
class TestDefensivePlaybook:
def test_defensive_has_stop_loss(self) -> None:
candidates = [_candidate(code="005930"), _candidate(code="AAPL")]
pb = PreMarketPlanner._defensive_playbook(date(2026, 2, 8), "KR", candidates)
assert pb.default_action == ScenarioAction.HOLD
assert pb.market_outlook == MarketOutlook.NEUTRAL_TO_BEARISH
assert pb.stock_count == 2
for sp in pb.stock_playbooks:
assert sp.scenarios[0].action == ScenarioAction.SELL
assert sp.scenarios[0].stop_loss_pct == -3.0
def test_defensive_has_global_rule(self) -> None:
pb = PreMarketPlanner._defensive_playbook(date(2026, 2, 8), "KR", [_candidate()])
assert len(pb.global_rules) == 1
assert pb.global_rules[0].action == ScenarioAction.REDUCE_ALL
def test_empty_playbook(self) -> None:
pb = PreMarketPlanner._empty_playbook(date(2026, 2, 8), "US")
assert pb.stock_count == 0
assert pb.market == "US"
assert pb.market_outlook == MarketOutlook.NEUTRAL

View File

@@ -0,0 +1,442 @@
"""Tests for the local scenario engine."""
from __future__ import annotations
from datetime import date
import pytest
from src.strategy.models import (
DayPlaybook,
GlobalRule,
ScenarioAction,
StockCondition,
StockPlaybook,
StockScenario,
)
from src.strategy.scenario_engine import ScenarioEngine, ScenarioMatch
@pytest.fixture
def engine() -> ScenarioEngine:
return ScenarioEngine()
def _scenario(
rsi_below: float | None = None,
rsi_above: float | None = None,
volume_ratio_above: float | None = None,
action: ScenarioAction = ScenarioAction.BUY,
confidence: int = 85,
**kwargs,
) -> StockScenario:
return StockScenario(
condition=StockCondition(
rsi_below=rsi_below,
rsi_above=rsi_above,
volume_ratio_above=volume_ratio_above,
**kwargs,
),
action=action,
confidence=confidence,
rationale=f"Test scenario: {action.value}",
)
def _playbook(
stock_code: str = "005930",
scenarios: list[StockScenario] | None = None,
global_rules: list[GlobalRule] | None = None,
default_action: ScenarioAction = ScenarioAction.HOLD,
) -> DayPlaybook:
if scenarios is None:
scenarios = [_scenario(rsi_below=30.0)]
return DayPlaybook(
date=date(2026, 2, 7),
market="KR",
stock_playbooks=[StockPlaybook(stock_code=stock_code, scenarios=scenarios)],
global_rules=global_rules or [],
default_action=default_action,
)
# ---------------------------------------------------------------------------
# evaluate_condition
# ---------------------------------------------------------------------------
class TestEvaluateCondition:
def test_rsi_below_match(self, engine: ScenarioEngine) -> None:
cond = StockCondition(rsi_below=30.0)
assert engine.evaluate_condition(cond, {"rsi": 25.0})
def test_rsi_below_no_match(self, engine: ScenarioEngine) -> None:
cond = StockCondition(rsi_below=30.0)
assert not engine.evaluate_condition(cond, {"rsi": 35.0})
def test_rsi_above_match(self, engine: ScenarioEngine) -> None:
cond = StockCondition(rsi_above=70.0)
assert engine.evaluate_condition(cond, {"rsi": 75.0})
def test_rsi_above_no_match(self, engine: ScenarioEngine) -> None:
cond = StockCondition(rsi_above=70.0)
assert not engine.evaluate_condition(cond, {"rsi": 65.0})
def test_volume_ratio_above_match(self, engine: ScenarioEngine) -> None:
cond = StockCondition(volume_ratio_above=3.0)
assert engine.evaluate_condition(cond, {"volume_ratio": 4.5})
def test_volume_ratio_below_match(self, engine: ScenarioEngine) -> None:
cond = StockCondition(volume_ratio_below=1.0)
assert engine.evaluate_condition(cond, {"volume_ratio": 0.5})
def test_price_above_match(self, engine: ScenarioEngine) -> None:
cond = StockCondition(price_above=50000)
assert engine.evaluate_condition(cond, {"current_price": 55000})
def test_price_below_match(self, engine: ScenarioEngine) -> None:
cond = StockCondition(price_below=50000)
assert engine.evaluate_condition(cond, {"current_price": 45000})
def test_price_change_pct_above_match(self, engine: ScenarioEngine) -> None:
cond = StockCondition(price_change_pct_above=2.0)
assert engine.evaluate_condition(cond, {"price_change_pct": 3.5})
def test_price_change_pct_below_match(self, engine: ScenarioEngine) -> None:
cond = StockCondition(price_change_pct_below=-3.0)
assert engine.evaluate_condition(cond, {"price_change_pct": -4.0})
def test_multiple_conditions_and_logic(self, engine: ScenarioEngine) -> None:
cond = StockCondition(rsi_below=30.0, volume_ratio_above=3.0)
# Both met
assert engine.evaluate_condition(cond, {"rsi": 25.0, "volume_ratio": 4.0})
# Only RSI met
assert not engine.evaluate_condition(cond, {"rsi": 25.0, "volume_ratio": 2.0})
# Only volume met
assert not engine.evaluate_condition(cond, {"rsi": 35.0, "volume_ratio": 4.0})
# Neither met
assert not engine.evaluate_condition(cond, {"rsi": 35.0, "volume_ratio": 2.0})
def test_empty_condition_returns_false(self, engine: ScenarioEngine) -> None:
cond = StockCondition()
assert not engine.evaluate_condition(cond, {"rsi": 25.0})
def test_missing_data_returns_false(self, engine: ScenarioEngine) -> None:
cond = StockCondition(rsi_below=30.0)
assert not engine.evaluate_condition(cond, {})
def test_none_data_returns_false(self, engine: ScenarioEngine) -> None:
cond = StockCondition(rsi_below=30.0)
assert not engine.evaluate_condition(cond, {"rsi": None})
def test_boundary_value_not_matched(self, engine: ScenarioEngine) -> None:
"""rsi_below=30 should NOT match rsi=30 (strict less than)."""
cond = StockCondition(rsi_below=30.0)
assert not engine.evaluate_condition(cond, {"rsi": 30.0})
def test_boundary_value_above_not_matched(self, engine: ScenarioEngine) -> None:
"""rsi_above=70 should NOT match rsi=70 (strict greater than)."""
cond = StockCondition(rsi_above=70.0)
assert not engine.evaluate_condition(cond, {"rsi": 70.0})
def test_string_value_no_exception(self, engine: ScenarioEngine) -> None:
"""String numeric value should not raise TypeError."""
cond = StockCondition(rsi_below=30.0)
# "25" can be cast to float → should match
assert engine.evaluate_condition(cond, {"rsi": "25"})
# "35" → should not match
assert not engine.evaluate_condition(cond, {"rsi": "35"})
def test_percent_string_returns_false(self, engine: ScenarioEngine) -> None:
"""Percent string like '30%' cannot be cast to float → False, no exception."""
cond = StockCondition(rsi_below=30.0)
assert not engine.evaluate_condition(cond, {"rsi": "30%"})
def test_decimal_value_no_exception(self, engine: ScenarioEngine) -> None:
"""Decimal values should be safely handled."""
from decimal import Decimal
cond = StockCondition(rsi_below=30.0)
assert engine.evaluate_condition(cond, {"rsi": Decimal("25.0")})
def test_mixed_invalid_types_no_exception(self, engine: ScenarioEngine) -> None:
"""Various invalid types should not raise exceptions."""
cond = StockCondition(
rsi_below=30.0, volume_ratio_above=2.0,
price_above=100, price_change_pct_below=-1.0,
)
data = {
"rsi": [25], # list
"volume_ratio": "bad", # non-numeric string
"current_price": {}, # dict
"price_change_pct": object(), # arbitrary object
}
# Should return False (invalid types → None → False), never raise
assert not engine.evaluate_condition(cond, data)
def test_missing_key_logs_warning_once(self, caplog) -> None:
"""Missing key warning should fire only once per key per engine instance."""
import logging
eng = ScenarioEngine()
cond = StockCondition(rsi_below=30.0)
with caplog.at_level(logging.WARNING):
eng.evaluate_condition(cond, {})
eng.evaluate_condition(cond, {})
eng.evaluate_condition(cond, {})
# Warning should appear exactly once despite 3 calls
assert caplog.text.count("'rsi' but key missing") == 1
# ---------------------------------------------------------------------------
# check_global_rules
# ---------------------------------------------------------------------------
class TestCheckGlobalRules:
def test_no_rules(self, engine: ScenarioEngine) -> None:
pb = _playbook(global_rules=[])
result = engine.check_global_rules(pb, {"portfolio_pnl_pct": -1.0})
assert result is None
def test_rule_triggered(self, engine: ScenarioEngine) -> None:
pb = _playbook(
global_rules=[
GlobalRule(
condition="portfolio_pnl_pct < -2.0",
action=ScenarioAction.REDUCE_ALL,
rationale="Near circuit breaker",
),
]
)
result = engine.check_global_rules(pb, {"portfolio_pnl_pct": -2.5})
assert result is not None
assert result.action == ScenarioAction.REDUCE_ALL
def test_rule_not_triggered(self, engine: ScenarioEngine) -> None:
pb = _playbook(
global_rules=[
GlobalRule(
condition="portfolio_pnl_pct < -2.0",
action=ScenarioAction.REDUCE_ALL,
),
]
)
result = engine.check_global_rules(pb, {"portfolio_pnl_pct": -1.0})
assert result is None
def test_first_rule_wins(self, engine: ScenarioEngine) -> None:
pb = _playbook(
global_rules=[
GlobalRule(condition="portfolio_pnl_pct < -2.0", action=ScenarioAction.REDUCE_ALL),
GlobalRule(condition="portfolio_pnl_pct < -1.0", action=ScenarioAction.HOLD),
]
)
result = engine.check_global_rules(pb, {"portfolio_pnl_pct": -2.5})
assert result is not None
assert result.action == ScenarioAction.REDUCE_ALL
def test_greater_than_operator(self, engine: ScenarioEngine) -> None:
pb = _playbook(
global_rules=[
GlobalRule(condition="volatility_index > 30", action=ScenarioAction.HOLD),
]
)
result = engine.check_global_rules(pb, {"volatility_index": 35})
assert result is not None
def test_missing_field_not_triggered(self, engine: ScenarioEngine) -> None:
pb = _playbook(
global_rules=[
GlobalRule(condition="unknown_field < -2.0", action=ScenarioAction.REDUCE_ALL),
]
)
result = engine.check_global_rules(pb, {"portfolio_pnl_pct": -5.0})
assert result is None
def test_invalid_condition_format(self, engine: ScenarioEngine) -> None:
pb = _playbook(
global_rules=[
GlobalRule(condition="bad format", action=ScenarioAction.HOLD),
]
)
result = engine.check_global_rules(pb, {})
assert result is None
def test_le_operator(self, engine: ScenarioEngine) -> None:
pb = _playbook(
global_rules=[
GlobalRule(condition="portfolio_pnl_pct <= -2.0", action=ScenarioAction.REDUCE_ALL),
]
)
assert engine.check_global_rules(pb, {"portfolio_pnl_pct": -2.0}) is not None
assert engine.check_global_rules(pb, {"portfolio_pnl_pct": -1.9}) is None
def test_ge_operator(self, engine: ScenarioEngine) -> None:
pb = _playbook(
global_rules=[
GlobalRule(condition="volatility >= 80.0", action=ScenarioAction.HOLD),
]
)
assert engine.check_global_rules(pb, {"volatility": 80.0}) is not None
assert engine.check_global_rules(pb, {"volatility": 79.9}) is None
# ---------------------------------------------------------------------------
# evaluate (full pipeline)
# ---------------------------------------------------------------------------
class TestEvaluate:
def test_scenario_match(self, engine: ScenarioEngine) -> None:
pb = _playbook(scenarios=[_scenario(rsi_below=30.0)])
result = engine.evaluate(pb, "005930", {"rsi": 25.0}, {})
assert result.action == ScenarioAction.BUY
assert result.confidence == 85
assert result.matched_scenario is not None
def test_no_scenario_match_returns_default(self, engine: ScenarioEngine) -> None:
pb = _playbook(scenarios=[_scenario(rsi_below=30.0)])
result = engine.evaluate(pb, "005930", {"rsi": 50.0}, {})
assert result.action == ScenarioAction.HOLD
assert result.confidence == 0
assert result.matched_scenario is None
def test_stock_not_in_playbook(self, engine: ScenarioEngine) -> None:
pb = _playbook(stock_code="005930")
result = engine.evaluate(pb, "AAPL", {"rsi": 25.0}, {})
assert result.action == ScenarioAction.HOLD
assert result.confidence == 0
def test_global_rule_takes_priority(self, engine: ScenarioEngine) -> None:
pb = _playbook(
scenarios=[_scenario(rsi_below=30.0)],
global_rules=[
GlobalRule(
condition="portfolio_pnl_pct < -2.0",
action=ScenarioAction.REDUCE_ALL,
rationale="Loss limit",
),
],
)
result = engine.evaluate(
pb,
"005930",
{"rsi": 25.0}, # Would match scenario
{"portfolio_pnl_pct": -2.5}, # But global rule triggers first
)
assert result.action == ScenarioAction.REDUCE_ALL
assert result.global_rule_triggered is not None
assert result.matched_scenario is None
def test_first_scenario_wins(self, engine: ScenarioEngine) -> None:
pb = _playbook(
scenarios=[
_scenario(rsi_below=30.0, action=ScenarioAction.BUY, confidence=90),
_scenario(rsi_below=25.0, action=ScenarioAction.BUY, confidence=95),
]
)
result = engine.evaluate(pb, "005930", {"rsi": 20.0}, {})
# Both match, but first wins
assert result.confidence == 90
def test_sell_scenario(self, engine: ScenarioEngine) -> None:
pb = _playbook(
scenarios=[
_scenario(rsi_above=75.0, action=ScenarioAction.SELL, confidence=80),
]
)
result = engine.evaluate(pb, "005930", {"rsi": 80.0}, {})
assert result.action == ScenarioAction.SELL
def test_empty_playbook(self, engine: ScenarioEngine) -> None:
pb = DayPlaybook(date=date(2026, 2, 7), market="KR", stock_playbooks=[])
result = engine.evaluate(pb, "005930", {"rsi": 25.0}, {})
assert result.action == ScenarioAction.HOLD
def test_match_details_populated(self, engine: ScenarioEngine) -> None:
pb = _playbook(scenarios=[_scenario(rsi_below=30.0, volume_ratio_above=2.0)])
result = engine.evaluate(
pb, "005930", {"rsi": 25.0, "volume_ratio": 3.0}, {}
)
assert result.match_details.get("rsi") == 25.0
assert result.match_details.get("volume_ratio") == 3.0
def test_custom_default_action(self, engine: ScenarioEngine) -> None:
pb = _playbook(
scenarios=[_scenario(rsi_below=10.0)], # Very unlikely to match
default_action=ScenarioAction.SELL,
)
result = engine.evaluate(pb, "005930", {"rsi": 50.0}, {})
assert result.action == ScenarioAction.SELL
def test_multiple_stocks_in_playbook(self, engine: ScenarioEngine) -> None:
pb = DayPlaybook(
date=date(2026, 2, 7),
market="US",
stock_playbooks=[
StockPlaybook(
stock_code="AAPL",
scenarios=[_scenario(rsi_below=25.0, confidence=90)],
),
StockPlaybook(
stock_code="MSFT",
scenarios=[_scenario(rsi_above=75.0, action=ScenarioAction.SELL, confidence=80)],
),
],
)
aapl = engine.evaluate(pb, "AAPL", {"rsi": 20.0}, {})
assert aapl.action == ScenarioAction.BUY
assert aapl.confidence == 90
msft = engine.evaluate(pb, "MSFT", {"rsi": 80.0}, {})
assert msft.action == ScenarioAction.SELL
def test_complex_multi_condition(self, engine: ScenarioEngine) -> None:
pb = _playbook(
scenarios=[
_scenario(
rsi_below=30.0,
volume_ratio_above=3.0,
price_change_pct_below=-2.0,
confidence=95,
),
]
)
# All conditions met
result = engine.evaluate(
pb,
"005930",
{"rsi": 22.0, "volume_ratio": 4.0, "price_change_pct": -3.0},
{},
)
assert result.action == ScenarioAction.BUY
assert result.confidence == 95
# One condition not met
result2 = engine.evaluate(
pb,
"005930",
{"rsi": 22.0, "volume_ratio": 4.0, "price_change_pct": -1.0},
{},
)
assert result2.action == ScenarioAction.HOLD
def test_scenario_match_returns_rationale(self, engine: ScenarioEngine) -> None:
pb = _playbook(scenarios=[_scenario(rsi_below=30.0)])
result = engine.evaluate(pb, "005930", {"rsi": 25.0}, {})
assert result.rationale != ""
def test_result_stock_code(self, engine: ScenarioEngine) -> None:
pb = _playbook()
result = engine.evaluate(pb, "005930", {"rsi": 25.0}, {})
assert result.stock_code == "005930"
def test_match_details_normalized(self, engine: ScenarioEngine) -> None:
"""match_details should contain _safe_float normalized values, not raw."""
pb = _playbook(scenarios=[_scenario(rsi_below=30.0)])
# Pass string value — should be normalized to float in match_details
result = engine.evaluate(pb, "005930", {"rsi": "25.0"}, {})
assert result.action == ScenarioAction.BUY
assert result.match_details["rsi"] == 25.0
assert isinstance(result.match_details["rsi"], float)

81
tests/test_scorecard.py Normal file
View File

@@ -0,0 +1,81 @@
"""Tests for DailyScorecard model."""
from __future__ import annotations
from src.evolution.scorecard import DailyScorecard
def test_scorecard_initialization() -> None:
scorecard = DailyScorecard(
date="2026-02-08",
market="KR",
total_decisions=10,
buys=3,
sells=2,
holds=5,
total_pnl=1234.5,
win_rate=60.0,
avg_confidence=78.5,
scenario_match_rate=70.0,
top_winners=["005930", "000660"],
top_losers=["035420"],
lessons=["Avoid chasing breakouts"],
cross_market_note="US volatility spillover",
)
assert scorecard.market == "KR"
assert scorecard.total_decisions == 10
assert scorecard.total_pnl == 1234.5
assert scorecard.top_winners == ["005930", "000660"]
assert scorecard.lessons == ["Avoid chasing breakouts"]
assert scorecard.cross_market_note == "US volatility spillover"
def test_scorecard_defaults() -> None:
scorecard = DailyScorecard(
date="2026-02-08",
market="US",
total_decisions=0,
buys=0,
sells=0,
holds=0,
total_pnl=0.0,
win_rate=0.0,
avg_confidence=0.0,
scenario_match_rate=0.0,
)
assert scorecard.top_winners == []
assert scorecard.top_losers == []
assert scorecard.lessons == []
assert scorecard.cross_market_note == ""
def test_scorecard_list_isolation() -> None:
a = DailyScorecard(
date="2026-02-08",
market="KR",
total_decisions=1,
buys=1,
sells=0,
holds=0,
total_pnl=10.0,
win_rate=100.0,
avg_confidence=90.0,
scenario_match_rate=100.0,
)
b = DailyScorecard(
date="2026-02-08",
market="US",
total_decisions=1,
buys=0,
sells=1,
holds=0,
total_pnl=-5.0,
win_rate=0.0,
avg_confidence=60.0,
scenario_match_rate=50.0,
)
a.top_winners.append("005930")
assert b.top_winners == []

View File

@@ -0,0 +1,366 @@
"""Tests for strategy/playbook Pydantic models."""
from __future__ import annotations
from datetime import date
import pytest
from pydantic import ValidationError
from src.strategy.models import (
CrossMarketContext,
DayPlaybook,
GlobalRule,
MarketOutlook,
PlaybookStatus,
ScenarioAction,
StockCondition,
StockPlaybook,
StockScenario,
)
# ---------------------------------------------------------------------------
# StockCondition
# ---------------------------------------------------------------------------
class TestStockCondition:
def test_empty_condition(self) -> None:
cond = StockCondition()
assert not cond.has_any_condition()
def test_single_field(self) -> None:
cond = StockCondition(rsi_below=30.0)
assert cond.has_any_condition()
def test_multiple_fields(self) -> None:
cond = StockCondition(rsi_below=25.0, volume_ratio_above=3.0)
assert cond.has_any_condition()
def test_all_fields(self) -> None:
cond = StockCondition(
rsi_below=30,
rsi_above=10,
volume_ratio_above=2.0,
volume_ratio_below=10.0,
price_above=1000,
price_below=50000,
price_change_pct_above=-5.0,
price_change_pct_below=5.0,
)
assert cond.has_any_condition()
# ---------------------------------------------------------------------------
# StockScenario
# ---------------------------------------------------------------------------
class TestStockScenario:
def test_valid_scenario(self) -> None:
s = StockScenario(
condition=StockCondition(rsi_below=25.0),
action=ScenarioAction.BUY,
confidence=85,
allocation_pct=15.0,
stop_loss_pct=-2.0,
take_profit_pct=3.0,
rationale="Oversold bounce expected",
)
assert s.action == ScenarioAction.BUY
assert s.confidence == 85
def test_confidence_too_high(self) -> None:
with pytest.raises(ValidationError):
StockScenario(
condition=StockCondition(),
action=ScenarioAction.BUY,
confidence=101,
)
def test_confidence_too_low(self) -> None:
with pytest.raises(ValidationError):
StockScenario(
condition=StockCondition(),
action=ScenarioAction.BUY,
confidence=-1,
)
def test_allocation_too_high(self) -> None:
with pytest.raises(ValidationError):
StockScenario(
condition=StockCondition(),
action=ScenarioAction.BUY,
confidence=80,
allocation_pct=101.0,
)
def test_stop_loss_must_be_negative(self) -> None:
with pytest.raises(ValidationError):
StockScenario(
condition=StockCondition(),
action=ScenarioAction.BUY,
confidence=80,
stop_loss_pct=1.0,
)
def test_take_profit_must_be_positive(self) -> None:
with pytest.raises(ValidationError):
StockScenario(
condition=StockCondition(),
action=ScenarioAction.BUY,
confidence=80,
take_profit_pct=-1.0,
)
def test_defaults(self) -> None:
s = StockScenario(
condition=StockCondition(),
action=ScenarioAction.HOLD,
confidence=50,
)
assert s.allocation_pct == 10.0
assert s.stop_loss_pct == -2.0
assert s.take_profit_pct == 3.0
assert s.rationale == ""
# ---------------------------------------------------------------------------
# StockPlaybook
# ---------------------------------------------------------------------------
class TestStockPlaybook:
def test_valid_playbook(self) -> None:
pb = StockPlaybook(
stock_code="005930",
stock_name="Samsung Electronics",
scenarios=[
StockScenario(
condition=StockCondition(rsi_below=25.0),
action=ScenarioAction.BUY,
confidence=85,
),
],
)
assert pb.stock_code == "005930"
assert len(pb.scenarios) == 1
def test_empty_scenarios_rejected(self) -> None:
with pytest.raises(ValidationError):
StockPlaybook(
stock_code="005930",
scenarios=[],
)
def test_multiple_scenarios(self) -> None:
pb = StockPlaybook(
stock_code="AAPL",
scenarios=[
StockScenario(
condition=StockCondition(rsi_below=25.0),
action=ScenarioAction.BUY,
confidence=85,
),
StockScenario(
condition=StockCondition(rsi_above=75.0),
action=ScenarioAction.SELL,
confidence=80,
),
],
)
assert len(pb.scenarios) == 2
# ---------------------------------------------------------------------------
# GlobalRule
# ---------------------------------------------------------------------------
class TestGlobalRule:
def test_valid_rule(self) -> None:
rule = GlobalRule(
condition="portfolio_pnl_pct < -2.0",
action=ScenarioAction.REDUCE_ALL,
rationale="Risk limit approaching",
)
assert rule.action == ScenarioAction.REDUCE_ALL
def test_hold_rule(self) -> None:
rule = GlobalRule(
condition="volatility_index > 30",
action=ScenarioAction.HOLD,
)
assert rule.rationale == ""
# ---------------------------------------------------------------------------
# CrossMarketContext
# ---------------------------------------------------------------------------
class TestCrossMarketContext:
def test_valid_context(self) -> None:
ctx = CrossMarketContext(
market="US",
date="2026-02-07",
total_pnl=-1.5,
win_rate=40.0,
index_change_pct=-2.3,
key_events=["Fed rate decision"],
lessons=["Avoid tech sector on rate hike days"],
)
assert ctx.market == "US"
assert len(ctx.key_events) == 1
def test_defaults(self) -> None:
ctx = CrossMarketContext(market="KR", date="2026-02-07")
assert ctx.total_pnl == 0.0
assert ctx.key_events == []
assert ctx.lessons == []
# ---------------------------------------------------------------------------
# DayPlaybook
# ---------------------------------------------------------------------------
def _make_scenario(rsi_below: float = 25.0) -> StockScenario:
return StockScenario(
condition=StockCondition(rsi_below=rsi_below),
action=ScenarioAction.BUY,
confidence=85,
)
def _make_playbook(**kwargs) -> DayPlaybook:
defaults = {
"date": date(2026, 2, 7),
"market": "KR",
"stock_playbooks": [
StockPlaybook(stock_code="005930", scenarios=[_make_scenario()]),
],
}
defaults.update(kwargs)
return DayPlaybook(**defaults)
class TestDayPlaybook:
def test_valid_playbook(self) -> None:
pb = _make_playbook()
assert pb.market == "KR"
assert pb.date == date(2026, 2, 7)
assert pb.default_action == ScenarioAction.HOLD
assert pb.scenario_count == 1
assert pb.stock_count == 1
def test_generated_at_auto_set(self) -> None:
pb = _make_playbook()
assert pb.generated_at != ""
def test_explicit_generated_at(self) -> None:
pb = _make_playbook(generated_at="2026-02-07T08:30:00")
assert pb.generated_at == "2026-02-07T08:30:00"
def test_duplicate_stocks_rejected(self) -> None:
with pytest.raises(ValidationError):
DayPlaybook(
date=date(2026, 2, 7),
market="KR",
stock_playbooks=[
StockPlaybook(stock_code="005930", scenarios=[_make_scenario()]),
StockPlaybook(stock_code="005930", scenarios=[_make_scenario(30)]),
],
)
def test_empty_stock_playbooks_allowed(self) -> None:
pb = DayPlaybook(
date=date(2026, 2, 7),
market="KR",
stock_playbooks=[],
)
assert pb.stock_count == 0
assert pb.scenario_count == 0
def test_get_stock_playbook_found(self) -> None:
pb = _make_playbook()
result = pb.get_stock_playbook("005930")
assert result is not None
assert result.stock_code == "005930"
def test_get_stock_playbook_not_found(self) -> None:
pb = _make_playbook()
result = pb.get_stock_playbook("AAPL")
assert result is None
def test_with_global_rules(self) -> None:
pb = _make_playbook(
global_rules=[
GlobalRule(
condition="portfolio_pnl_pct < -2.0",
action=ScenarioAction.REDUCE_ALL,
),
],
)
assert len(pb.global_rules) == 1
def test_with_cross_market_context(self) -> None:
ctx = CrossMarketContext(market="US", date="2026-02-07", total_pnl=-1.5)
pb = _make_playbook(cross_market=ctx)
assert pb.cross_market is not None
assert pb.cross_market.market == "US"
def test_market_outlook(self) -> None:
pb = _make_playbook(market_outlook=MarketOutlook.BEARISH)
assert pb.market_outlook == MarketOutlook.BEARISH
def test_multiple_stocks_multiple_scenarios(self) -> None:
pb = DayPlaybook(
date=date(2026, 2, 7),
market="US",
stock_playbooks=[
StockPlaybook(
stock_code="AAPL",
scenarios=[_make_scenario(), _make_scenario(30)],
),
StockPlaybook(
stock_code="MSFT",
scenarios=[_make_scenario()],
),
],
)
assert pb.stock_count == 2
assert pb.scenario_count == 3
def test_serialization_roundtrip(self) -> None:
pb = _make_playbook(
market_outlook=MarketOutlook.BULLISH,
cross_market=CrossMarketContext(market="US", date="2026-02-07"),
)
json_str = pb.model_dump_json()
restored = DayPlaybook.model_validate_json(json_str)
assert restored.market == pb.market
assert restored.date == pb.date
assert restored.scenario_count == pb.scenario_count
assert restored.cross_market is not None
# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------
class TestEnums:
def test_scenario_action_values(self) -> None:
assert ScenarioAction.BUY.value == "BUY"
assert ScenarioAction.SELL.value == "SELL"
assert ScenarioAction.HOLD.value == "HOLD"
assert ScenarioAction.REDUCE_ALL.value == "REDUCE_ALL"
def test_market_outlook_values(self) -> None:
assert len(MarketOutlook) == 5
def test_playbook_status_values(self) -> None:
assert PlaybookStatus.READY.value == "ready"
assert PlaybookStatus.EXPIRED.value == "expired"

View File

@@ -160,6 +160,83 @@ class TestNotificationSending:
assert "250.50" in payload["text"] assert "250.50" in payload["text"]
assert "92%" in payload["text"] assert "92%" in payload["text"]
@pytest.mark.asyncio
async def test_playbook_generated_format(self) -> None:
"""Playbook generated notification has expected fields."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_playbook_generated(
market="KR",
stock_count=4,
scenario_count=12,
token_count=980,
)
payload = mock_post.call_args.kwargs["json"]
assert "Playbook Generated" in payload["text"]
assert "Market: KR" in payload["text"]
assert "Stocks: 4" in payload["text"]
assert "Scenarios: 12" in payload["text"]
assert "Tokens: 980" in payload["text"]
@pytest.mark.asyncio
async def test_scenario_matched_format(self) -> None:
"""Scenario matched notification has expected fields."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_scenario_matched(
stock_code="AAPL",
action="BUY",
condition_summary="RSI < 30, volume_ratio > 2.0",
confidence=88.2,
)
payload = mock_post.call_args.kwargs["json"]
assert "Scenario Matched" in payload["text"]
assert "AAPL" in payload["text"]
assert "Action: BUY" in payload["text"]
assert "RSI < 30" in payload["text"]
assert "88%" in payload["text"]
@pytest.mark.asyncio
async def test_playbook_failed_format(self) -> None:
"""Playbook failed notification has expected fields."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_playbook_failed(
market="US",
reason="Gemini timeout",
)
payload = mock_post.call_args.kwargs["json"]
assert "Playbook Failed" in payload["text"]
assert "Market: US" in payload["text"]
assert "Gemini timeout" in payload["text"]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_circuit_breaker_priority(self) -> None: async def test_circuit_breaker_priority(self) -> None:
"""Circuit breaker uses CRITICAL priority.""" """Circuit breaker uses CRITICAL priority."""
@@ -309,6 +386,73 @@ class TestMessagePriorities:
payload = mock_post.call_args.kwargs["json"] payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.CRITICAL.emoji in payload["text"] assert NotificationPriority.CRITICAL.emoji in payload["text"]
@pytest.mark.asyncio
async def test_playbook_generated_priority(self) -> None:
"""Playbook generated uses MEDIUM priority emoji."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_playbook_generated(
market="KR",
stock_count=2,
scenario_count=4,
token_count=123,
)
payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.MEDIUM.emoji in payload["text"]
@pytest.mark.asyncio
async def test_playbook_failed_priority(self) -> None:
"""Playbook failed uses HIGH priority emoji."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_playbook_failed(
market="KR",
reason="Invalid JSON",
)
payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.HIGH.emoji in payload["text"]
@pytest.mark.asyncio
async def test_scenario_matched_priority(self) -> None:
"""Scenario matched uses HIGH priority emoji."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_scenario_matched(
stock_code="AAPL",
action="BUY",
condition_summary="RSI < 30",
confidence=80.0,
)
payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.HIGH.emoji in payload["text"]
class TestClientCleanup: class TestClientCleanup:
"""Test client cleanup behavior.""" """Test client cleanup behavior."""

View File

@@ -682,6 +682,10 @@ class TestBasicCommands:
"/help - Show available commands\n" "/help - Show available commands\n"
"/status - Trading status (mode, markets, P&L)\n" "/status - Trading status (mode, markets, P&L)\n"
"/positions - Current holdings\n" "/positions - Current holdings\n"
"/report - Daily summary report\n"
"/scenarios - Today's playbook scenarios\n"
"/review - Recent scorecards\n"
"/dashboard - Dashboard URL/status\n"
"/stop - Pause trading\n" "/stop - Pause trading\n"
"/resume - Resume trading" "/resume - Resume trading"
) )
@@ -707,10 +711,106 @@ class TestBasicCommands:
assert "/help" in payload["text"] assert "/help" in payload["text"]
assert "/status" in payload["text"] assert "/status" in payload["text"]
assert "/positions" in payload["text"] assert "/positions" in payload["text"]
assert "/report" in payload["text"]
assert "/scenarios" in payload["text"]
assert "/review" in payload["text"]
assert "/dashboard" in payload["text"]
assert "/stop" in payload["text"] assert "/stop" in payload["text"]
assert "/resume" in payload["text"] assert "/resume" in payload["text"]
class TestExtendedCommands:
"""Test additional bot commands."""
@pytest.mark.asyncio
async def test_report_command(self) -> None:
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_report() -> None:
await client.send_message("<b>📈 Daily Report</b>\n\nTrades: 1")
handler.register_command("report", mock_report)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await handler._handle_update(
{"update_id": 1, "message": {"chat": {"id": 456}, "text": "/report"}}
)
payload = mock_post.call_args.kwargs["json"]
assert "Daily Report" in payload["text"]
@pytest.mark.asyncio
async def test_scenarios_command(self) -> None:
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_scenarios() -> None:
await client.send_message("<b>🧠 Today's Scenarios</b>\n\n- AAPL: BUY (85)")
handler.register_command("scenarios", mock_scenarios)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await handler._handle_update(
{"update_id": 1, "message": {"chat": {"id": 456}, "text": "/scenarios"}}
)
payload = mock_post.call_args.kwargs["json"]
assert "Today's Scenarios" in payload["text"]
@pytest.mark.asyncio
async def test_review_command(self) -> None:
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_review() -> None:
await client.send_message("<b>📝 Recent Reviews</b>\n\n- 2026-02-14 KR")
handler.register_command("review", mock_review)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await handler._handle_update(
{"update_id": 1, "message": {"chat": {"id": 456}, "text": "/review"}}
)
payload = mock_post.call_args.kwargs["json"]
assert "Recent Reviews" in payload["text"]
@pytest.mark.asyncio
async def test_dashboard_command(self) -> None:
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_dashboard() -> None:
await client.send_message("<b>🖥️ Dashboard</b>\n\nURL: http://127.0.0.1:8080")
handler.register_command("dashboard", mock_dashboard)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await handler._handle_update(
{"update_id": 1, "message": {"chat": {"id": 456}, "text": "/dashboard"}}
)
payload = mock_post.call_args.kwargs["json"]
assert "Dashboard" in payload["text"]
class TestGetUpdates: class TestGetUpdates:
"""Test getUpdates API interaction.""" """Test getUpdates API interaction."""

View File

@@ -412,7 +412,7 @@ class TestMarketScanner:
scan_result = context_store.get_context( scan_result = context_store.get_context(
ContextLayer.L7_REALTIME, ContextLayer.L7_REALTIME,
latest_timeframe, latest_timeframe,
"KR_scan_result", "scan_result_KR",
) )
assert scan_result is not None assert scan_result is not None
assert scan_result["total_scanned"] == 3 assert scan_result["total_scanned"] == 3