feat: integrate scenario engine and playbook into main trading loop (issue #84)
Replace brain.decide() with scenario_engine.evaluate() in trading_cycle and brain.decide_batch() with per-stock scenario evaluation in run_daily_session. Initialize PreMarketPlanner, ScenarioEngine, and PlaybookStore in run(). Add pre-market playbook generation on market open (1 Gemini call per market per day), market_data enrichment from scanner metrics (rsi, volume_ratio), portfolio_data for global rules, scenario match notifications, and playbook lifecycle management. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
260
src/main.py
260
src/main.py
@@ -11,13 +11,12 @@ import asyncio
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
from datetime import UTC, datetime
|
||||
from datetime import UTC, date, datetime
|
||||
from typing import Any
|
||||
|
||||
from src.analysis.scanner import MarketScanner
|
||||
from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner
|
||||
from src.analysis.volatility import VolatilityAnalyzer
|
||||
from src.brain.gemini_client import GeminiClient
|
||||
from src.broker.kis_api import KISBroker
|
||||
from src.broker.overseas import OverseasBroker
|
||||
from src.config import Settings
|
||||
@@ -30,7 +29,13 @@ from src.db import init_db, log_trade
|
||||
from src.logging.decision_logger import DecisionLogger
|
||||
from src.logging_config import setup_logging
|
||||
from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets
|
||||
from src.brain.context_selector import ContextSelector
|
||||
from src.brain.gemini_client import GeminiClient, TradeDecision
|
||||
from src.notifications.telegram_client import TelegramClient, TelegramCommandHandler
|
||||
from src.strategy.models import DayPlaybook
|
||||
from src.strategy.playbook_store import PlaybookStore
|
||||
from src.strategy.pre_market_planner import PreMarketPlanner
|
||||
from src.strategy.scenario_engine import ScenarioEngine, ScenarioMatch
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -75,7 +80,8 @@ TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions
|
||||
async def trading_cycle(
|
||||
broker: KISBroker,
|
||||
overseas_broker: OverseasBroker,
|
||||
brain: GeminiClient,
|
||||
scenario_engine: ScenarioEngine,
|
||||
playbook: DayPlaybook,
|
||||
risk: RiskManager,
|
||||
db_conn: Any,
|
||||
decision_logger: DecisionLogger,
|
||||
@@ -135,13 +141,26 @@ async def trading_cycle(
|
||||
else 0.0
|
||||
)
|
||||
|
||||
market_data = {
|
||||
market_data: dict[str, Any] = {
|
||||
"stock_code": stock_code,
|
||||
"market_name": market.name,
|
||||
"current_price": current_price,
|
||||
"foreigner_net": foreigner_net,
|
||||
}
|
||||
|
||||
# Enrich market_data with scanner metrics for scenario engine
|
||||
candidate = scan_candidates.get(stock_code)
|
||||
if candidate:
|
||||
market_data["rsi"] = candidate.rsi
|
||||
market_data["volume_ratio"] = candidate.volume_ratio
|
||||
|
||||
# Build portfolio data for global rule evaluation
|
||||
portfolio_data = {
|
||||
"portfolio_pnl_pct": pnl_pct,
|
||||
"total_cash": total_cash,
|
||||
"total_eval": total_eval,
|
||||
}
|
||||
|
||||
# 1.5. Get volatility metrics from context store (L7_REALTIME)
|
||||
latest_timeframe = context_store.get_latest_timeframe(ContextLayer.L7_REALTIME)
|
||||
volatility_score = 50.0 # Default normal volatility
|
||||
@@ -178,8 +197,13 @@ async def trading_cycle(
|
||||
volume_surge,
|
||||
)
|
||||
|
||||
# 2. Ask the brain for a decision
|
||||
decision = await brain.decide(market_data)
|
||||
# 2. Evaluate scenario (local, no API call)
|
||||
match = scenario_engine.evaluate(playbook, stock_code, market_data, portfolio_data)
|
||||
decision = TradeDecision(
|
||||
action=match.action.value,
|
||||
confidence=match.confidence,
|
||||
rationale=match.rationale,
|
||||
)
|
||||
logger.info(
|
||||
"Decision for %s (%s): %s (confidence=%d)",
|
||||
stock_code,
|
||||
@@ -188,6 +212,19 @@ async def trading_cycle(
|
||||
decision.confidence,
|
||||
)
|
||||
|
||||
# 2.1. Notify scenario match
|
||||
if match.matched_scenario is not None:
|
||||
try:
|
||||
condition_parts = [f"{k}={v}" for k, v in match.match_details.items()]
|
||||
await telegram.notify_scenario_matched(
|
||||
stock_code=stock_code,
|
||||
action=decision.action,
|
||||
condition_summary=", ".join(condition_parts) if condition_parts else "matched",
|
||||
confidence=float(decision.confidence),
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Scenario matched notification failed: %s", exc)
|
||||
|
||||
# 2.5. Log decision with context snapshot
|
||||
context_snapshot = {
|
||||
"L1": {
|
||||
@@ -200,7 +237,7 @@ async def trading_cycle(
|
||||
"purchase_total": purchase_total,
|
||||
"pnl_pct": pnl_pct,
|
||||
},
|
||||
# L3-L7 will be populated when context tree is implemented
|
||||
"scenario_match": match.match_details,
|
||||
}
|
||||
input_data = {
|
||||
"current_price": current_price,
|
||||
@@ -324,7 +361,9 @@ async def trading_cycle(
|
||||
async def run_daily_session(
|
||||
broker: KISBroker,
|
||||
overseas_broker: OverseasBroker,
|
||||
brain: GeminiClient,
|
||||
scenario_engine: ScenarioEngine,
|
||||
playbook_store: PlaybookStore,
|
||||
pre_market_planner: PreMarketPlanner,
|
||||
risk: RiskManager,
|
||||
db_conn: Any,
|
||||
decision_logger: DecisionLogger,
|
||||
@@ -336,10 +375,8 @@ async def run_daily_session(
|
||||
) -> None:
|
||||
"""Execute one daily trading session.
|
||||
|
||||
Designed for API efficiency with Gemini Free tier:
|
||||
- Batch decision making (1 API call per market)
|
||||
- Runs N times per day at fixed intervals
|
||||
- Minimizes API usage while maintaining trading capability
|
||||
V2 proactive strategy: 1 Gemini call for playbook generation,
|
||||
then local scenario evaluation per stock (0 API calls).
|
||||
"""
|
||||
# Get currently open markets
|
||||
open_markets = get_open_markets(settings.enabled_market_list)
|
||||
@@ -350,29 +387,67 @@ async def run_daily_session(
|
||||
|
||||
logger.info("Starting daily trading session for %d markets", len(open_markets))
|
||||
|
||||
today = date.today()
|
||||
|
||||
# Process each open market
|
||||
for market in open_markets:
|
||||
# Dynamic stock discovery via scanner (no static watchlists)
|
||||
candidates_list: list[ScanCandidate] = []
|
||||
try:
|
||||
candidates = await smart_scanner.scan()
|
||||
watchlist = [c.stock_code for c in candidates] if candidates else []
|
||||
candidates_list = await smart_scanner.scan() if smart_scanner else []
|
||||
except Exception as exc:
|
||||
logger.error("Smart Scanner failed for %s: %s", market.name, exc)
|
||||
watchlist = []
|
||||
|
||||
if not watchlist:
|
||||
if not candidates_list:
|
||||
logger.info("No scanner candidates for market %s — skipping", market.code)
|
||||
continue
|
||||
|
||||
watchlist = [c.stock_code for c in candidates_list]
|
||||
candidate_map = {c.stock_code: c for c in candidates_list}
|
||||
logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist))
|
||||
|
||||
# Generate or load playbook (1 Gemini API call per market per day)
|
||||
playbook = playbook_store.load(today, market.code)
|
||||
if playbook is None:
|
||||
try:
|
||||
playbook = await pre_market_planner.generate_playbook(
|
||||
market=market.code,
|
||||
candidates=candidates_list,
|
||||
today=today,
|
||||
)
|
||||
playbook_store.save(playbook)
|
||||
try:
|
||||
await telegram.notify_playbook_generated(
|
||||
market=market.code,
|
||||
stock_count=playbook.stock_count,
|
||||
scenario_count=playbook.scenario_count,
|
||||
token_count=playbook.token_count,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Playbook notification failed: %s", exc)
|
||||
logger.info(
|
||||
"Generated playbook for %s: %d stocks, %d scenarios",
|
||||
market.code, playbook.stock_count, playbook.scenario_count,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.error("Playbook generation failed for %s: %s", market.code, exc)
|
||||
try:
|
||||
await telegram.notify_playbook_failed(
|
||||
market=market.code, reason=str(exc)[:200],
|
||||
)
|
||||
except Exception as notify_exc:
|
||||
logger.warning("Playbook failed notification error: %s", notify_exc)
|
||||
playbook = PreMarketPlanner._empty_playbook(today, market.code)
|
||||
|
||||
# Collect market data for all stocks from scanner
|
||||
stocks_data = []
|
||||
for stock_code in watchlist:
|
||||
try:
|
||||
if market.is_domestic:
|
||||
orderbook = await broker.get_orderbook(stock_code)
|
||||
current_price = safe_float(orderbook.get("output1", {}).get("stck_prpr", "0"))
|
||||
current_price = safe_float(
|
||||
orderbook.get("output1", {}).get("stck_prpr", "0")
|
||||
)
|
||||
foreigner_net = safe_float(
|
||||
orderbook.get("output1", {}).get("frgn_ntby_qty", "0")
|
||||
)
|
||||
@@ -380,17 +455,23 @@ async def run_daily_session(
|
||||
price_data = await overseas_broker.get_overseas_price(
|
||||
market.exchange_code, stock_code
|
||||
)
|
||||
current_price = safe_float(price_data.get("output", {}).get("last", "0"))
|
||||
current_price = safe_float(
|
||||
price_data.get("output", {}).get("last", "0")
|
||||
)
|
||||
foreigner_net = 0.0
|
||||
|
||||
stocks_data.append(
|
||||
{
|
||||
"stock_code": stock_code,
|
||||
"market_name": market.name,
|
||||
"current_price": current_price,
|
||||
"foreigner_net": foreigner_net,
|
||||
}
|
||||
)
|
||||
stock_data: dict[str, Any] = {
|
||||
"stock_code": stock_code,
|
||||
"market_name": market.name,
|
||||
"current_price": current_price,
|
||||
"foreigner_net": foreigner_net,
|
||||
}
|
||||
# Enrich with scanner metrics
|
||||
cand = candidate_map.get(stock_code)
|
||||
if cand:
|
||||
stock_data["rsi"] = cand.rsi
|
||||
stock_data["volume_ratio"] = cand.volume_ratio
|
||||
stocks_data.append(stock_data)
|
||||
except Exception as exc:
|
||||
logger.error("Failed to fetch data for %s: %s", stock_code, exc)
|
||||
continue
|
||||
@@ -399,17 +480,19 @@ async def run_daily_session(
|
||||
logger.warning("No valid stock data for market %s", market.code)
|
||||
continue
|
||||
|
||||
# Get batch decisions (1 API call for all stocks in this market)
|
||||
logger.info("Requesting batch decision for %d stocks in %s", len(stocks_data), market.name)
|
||||
decisions = await brain.decide_batch(stocks_data)
|
||||
|
||||
# Get balance data once for the market
|
||||
if market.is_domestic:
|
||||
balance_data = await broker.get_balance()
|
||||
output2 = balance_data.get("output2", [{}])
|
||||
total_eval = safe_float(output2[0].get("tot_evlu_amt", "0")) if output2 else 0
|
||||
total_cash = safe_float(output2[0].get("dnca_tot_amt", "0")) if output2 else 0
|
||||
purchase_total = safe_float(output2[0].get("pchs_amt_smtl_amt", "0")) if output2 else 0
|
||||
total_eval = safe_float(
|
||||
output2[0].get("tot_evlu_amt", "0")
|
||||
) if output2 else 0
|
||||
total_cash = safe_float(
|
||||
output2[0].get("dnca_tot_amt", "0")
|
||||
) if output2 else 0
|
||||
purchase_total = safe_float(
|
||||
output2[0].get("pchs_amt_smtl_amt", "0")
|
||||
) if output2 else 0
|
||||
else:
|
||||
balance_data = await overseas_broker.get_overseas_balance(market.exchange_code)
|
||||
output2 = balance_data.get("output2", [{}])
|
||||
@@ -422,21 +505,37 @@ async def run_daily_session(
|
||||
|
||||
total_eval = safe_float(balance_info.get("frcr_evlu_tota", "0") or "0")
|
||||
total_cash = safe_float(balance_info.get("frcr_dncl_amt_2", "0") or "0")
|
||||
purchase_total = safe_float(balance_info.get("frcr_buy_amt_smtl", "0") or "0")
|
||||
purchase_total = safe_float(
|
||||
balance_info.get("frcr_buy_amt_smtl", "0") or "0"
|
||||
)
|
||||
|
||||
# Calculate daily P&L %
|
||||
pnl_pct = (
|
||||
((total_eval - purchase_total) / purchase_total * 100) if purchase_total > 0 else 0.0
|
||||
((total_eval - purchase_total) / purchase_total * 100)
|
||||
if purchase_total > 0
|
||||
else 0.0
|
||||
)
|
||||
portfolio_data = {
|
||||
"portfolio_pnl_pct": pnl_pct,
|
||||
"total_cash": total_cash,
|
||||
"total_eval": total_eval,
|
||||
}
|
||||
|
||||
# Execute decisions for each stock
|
||||
# Evaluate scenarios for each stock (local, no API calls)
|
||||
logger.info(
|
||||
"Evaluating %d stocks against playbook for %s",
|
||||
len(stocks_data), market.name,
|
||||
)
|
||||
for stock_data in stocks_data:
|
||||
stock_code = stock_data["stock_code"]
|
||||
decision = decisions.get(stock_code)
|
||||
|
||||
if not decision:
|
||||
logger.warning("No decision for %s — skipping", stock_code)
|
||||
continue
|
||||
match = scenario_engine.evaluate(
|
||||
playbook, stock_code, stock_data, portfolio_data,
|
||||
)
|
||||
decision = TradeDecision(
|
||||
action=match.action.value,
|
||||
confidence=match.confidence,
|
||||
rationale=match.rationale,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Decision for %s (%s): %s (confidence=%d)",
|
||||
@@ -458,6 +557,7 @@ async def run_daily_session(
|
||||
"purchase_total": purchase_total,
|
||||
"pnl_pct": pnl_pct,
|
||||
},
|
||||
"scenario_match": match.match_details,
|
||||
}
|
||||
input_data = {
|
||||
"current_price": stock_data["current_price"],
|
||||
@@ -509,7 +609,9 @@ async def run_daily_session(
|
||||
threshold=exc.threshold,
|
||||
)
|
||||
except Exception as notify_exc:
|
||||
logger.warning("Circuit breaker notification failed: %s", notify_exc)
|
||||
logger.warning(
|
||||
"Circuit breaker notification failed: %s", notify_exc
|
||||
)
|
||||
raise
|
||||
|
||||
# Send order
|
||||
@@ -544,7 +646,9 @@ async def run_daily_session(
|
||||
except Exception as exc:
|
||||
logger.warning("Telegram notification failed: %s", exc)
|
||||
except Exception as exc:
|
||||
logger.error("Order execution failed for %s: %s", stock_code, exc)
|
||||
logger.error(
|
||||
"Order execution failed for %s: %s", stock_code, exc
|
||||
)
|
||||
continue
|
||||
|
||||
# Log trade
|
||||
@@ -571,6 +675,20 @@ async def run(settings: Settings) -> None:
|
||||
decision_logger = DecisionLogger(db_conn)
|
||||
context_store = ContextStore(db_conn)
|
||||
|
||||
# V2 proactive strategy components
|
||||
context_selector = ContextSelector(context_store)
|
||||
scenario_engine = ScenarioEngine()
|
||||
playbook_store = PlaybookStore(db_conn)
|
||||
pre_market_planner = PreMarketPlanner(
|
||||
gemini_client=brain,
|
||||
context_store=context_store,
|
||||
context_selector=context_selector,
|
||||
settings=settings,
|
||||
)
|
||||
|
||||
# Track playbooks per market (in-memory cache)
|
||||
playbooks: dict[str, DayPlaybook] = {}
|
||||
|
||||
# Initialize Telegram notifications
|
||||
telegram = TelegramClient(
|
||||
bot_token=settings.TELEGRAM_BOT_TOKEN,
|
||||
@@ -802,7 +920,9 @@ async def run(settings: Settings) -> None:
|
||||
await run_daily_session(
|
||||
broker,
|
||||
overseas_broker,
|
||||
brain,
|
||||
scenario_engine,
|
||||
playbook_store,
|
||||
pre_market_planner,
|
||||
risk,
|
||||
db_conn,
|
||||
decision_logger,
|
||||
@@ -850,6 +970,8 @@ async def run(settings: Settings) -> None:
|
||||
except Exception as exc:
|
||||
logger.warning("Market close notification failed: %s", exc)
|
||||
_market_states[market_code] = False
|
||||
# Clear playbook for closed market (new one generated next open)
|
||||
playbooks.pop(market_code, None)
|
||||
|
||||
# No markets open — wait until next market opens
|
||||
try:
|
||||
@@ -887,7 +1009,8 @@ async def run(settings: Settings) -> None:
|
||||
# Smart Scanner: dynamic stock discovery (no static watchlists)
|
||||
now_timestamp = asyncio.get_event_loop().time()
|
||||
last_scan = last_scan_time.get(market.code, 0.0)
|
||||
if now_timestamp - last_scan >= SCAN_INTERVAL_SECONDS:
|
||||
rescan_interval = settings.RESCAN_INTERVAL_SECONDS
|
||||
if now_timestamp - last_scan >= rescan_interval:
|
||||
try:
|
||||
logger.info("Smart Scanner: Scanning %s market", market.name)
|
||||
|
||||
@@ -909,6 +1032,44 @@ async def run(settings: Settings) -> None:
|
||||
market.name,
|
||||
[f"{c.stock_code}(RSI={c.rsi:.0f})" for c in candidates],
|
||||
)
|
||||
|
||||
# Generate playbook on first scan (1 Gemini call per market)
|
||||
if market.code not in playbooks:
|
||||
try:
|
||||
pb = await pre_market_planner.generate_playbook(
|
||||
market=market.code,
|
||||
candidates=candidates,
|
||||
)
|
||||
playbook_store.save(pb)
|
||||
playbooks[market.code] = pb
|
||||
try:
|
||||
await telegram.notify_playbook_generated(
|
||||
market=market.code,
|
||||
stock_count=pb.stock_count,
|
||||
scenario_count=pb.scenario_count,
|
||||
token_count=pb.token_count,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Playbook notification failed: %s", exc
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
"Playbook generation failed for %s: %s",
|
||||
market.code, exc,
|
||||
)
|
||||
try:
|
||||
await telegram.notify_playbook_failed(
|
||||
market=market.code,
|
||||
reason=str(exc)[:200],
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
playbooks[market.code] = (
|
||||
PreMarketPlanner._empty_playbook(
|
||||
date.today(), market.code
|
||||
)
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"Smart Scanner: No candidates for %s — no trades", market.name
|
||||
@@ -933,13 +1094,20 @@ async def run(settings: Settings) -> None:
|
||||
if shutdown.is_set():
|
||||
break
|
||||
|
||||
# Get playbook for this market
|
||||
market_playbook = playbooks.get(
|
||||
market.code,
|
||||
PreMarketPlanner._empty_playbook(date.today(), market.code),
|
||||
)
|
||||
|
||||
# Retry logic for connection errors
|
||||
for attempt in range(1, MAX_CONNECTION_RETRIES + 1):
|
||||
try:
|
||||
await trading_cycle(
|
||||
broker,
|
||||
overseas_broker,
|
||||
brain,
|
||||
scenario_engine,
|
||||
market_playbook,
|
||||
risk,
|
||||
db_conn,
|
||||
decision_logger,
|
||||
|
||||
Reference in New Issue
Block a user