"""The Ouroboros — main trading loop. Orchestrates the broker, brain, and risk manager into a continuous trading cycle with configurable intervals. """ from __future__ import annotations import argparse import asyncio import json import logging import signal import threading from datetime import UTC, datetime from typing import Any from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner from src.analysis.volatility import VolatilityAnalyzer from src.brain.context_selector import ContextSelector from src.brain.gemini_client import GeminiClient, TradeDecision from src.broker.kis_api import KISBroker, kr_round_down from src.broker.overseas import OverseasBroker from src.config import Settings from src.context.aggregator import ContextAggregator from src.context.layer import ContextLayer from src.context.scheduler import ContextScheduler from src.context.store import ContextStore from src.core.criticality import CriticalityAssessor from src.core.blackout_manager import ( BlackoutOrderManager, QueuedOrderIntent, parse_blackout_windows_kst, ) from src.core.kill_switch import KillSwitchOrchestrator from src.core.order_policy import OrderPolicyRejected, validate_order_policy from src.core.priority_queue import PriorityTaskQueue from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected, RiskManager from src.db import ( get_latest_buy_trade, get_open_position, get_recent_symbols, init_db, log_trade, ) from src.evolution.daily_review import DailyReviewer from src.evolution.optimizer import EvolutionOptimizer from src.logging.decision_logger import DecisionLogger from src.logging_config import setup_logging from src.markets.schedule import MARKETS, MarketInfo, get_next_market_open, get_open_markets from src.notifications.telegram_client import NotificationFilter, TelegramClient, TelegramCommandHandler from src.strategy.models import DayPlaybook, MarketOutlook from src.strategy.exit_rules import ExitRuleConfig, ExitRuleInput, evaluate_exit from src.strategy.playbook_store import PlaybookStore from src.strategy.pre_market_planner import PreMarketPlanner from src.strategy.position_state_machine import PositionState from src.strategy.scenario_engine import ScenarioEngine logger = logging.getLogger(__name__) KILL_SWITCH = KillSwitchOrchestrator() BLACKOUT_ORDER_MANAGER = BlackoutOrderManager( enabled=False, windows=[], max_queue_size=500, ) def safe_float(value: str | float | None, default: float = 0.0) -> float: """Convert to float, handling empty strings and None. Args: value: Value to convert (string, float, or None) default: Default value if conversion fails Returns: Converted float or default value Examples: >>> safe_float("123.45") 123.45 >>> safe_float("") 0.0 >>> safe_float(None) 0.0 >>> safe_float("invalid", 99.0) 99.0 """ if value is None or value == "": return default try: return float(value) except (ValueError, TypeError): return default TRADE_INTERVAL_SECONDS = 60 SCAN_INTERVAL_SECONDS = 60 # Scan markets every 60 seconds MAX_CONNECTION_RETRIES = 3 _BUY_COOLDOWN_SECONDS = 600 # 10-minute cooldown after insufficient-balance rejection # Daily trading mode constants (for Free tier API efficiency) DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions async def _retry_connection(coro_factory: Any, *args: Any, label: str = "", **kwargs: Any) -> Any: """Call an async function retrying on ConnectionError with exponential backoff. Retries up to MAX_CONNECTION_RETRIES times (exclusive of the first attempt), sleeping 2^attempt seconds between attempts. Use only for idempotent read operations — never for order submission. Args: coro_factory: Async callable (method or function) to invoke. *args: Positional arguments forwarded to coro_factory. label: Human-readable label for log messages. **kwargs: Keyword arguments forwarded to coro_factory. Raises: ConnectionError: If all retries are exhausted. """ for attempt in range(1, MAX_CONNECTION_RETRIES + 1): try: return await coro_factory(*args, **kwargs) except ConnectionError as exc: if attempt < MAX_CONNECTION_RETRIES: wait_secs = 2 ** attempt logger.warning( "Connection error %s (attempt %d/%d), retrying in %ds: %s", label, attempt, MAX_CONNECTION_RETRIES, wait_secs, exc, ) await asyncio.sleep(wait_secs) else: logger.error( "Connection error %s — all %d retries exhausted: %s", label, MAX_CONNECTION_RETRIES, exc, ) raise async def sync_positions_from_broker( broker: Any, overseas_broker: Any, db_conn: Any, settings: "Settings", ) -> int: """Sync open positions from the live broker into the local DB at startup. Fetches current holdings from the broker for all configured markets and inserts a synthetic BUY record for any position that the DB does not already know about. This prevents double-buy when positions were opened in a previous session or entered manually outside the system. Returns: Number of new positions synced. """ synced = 0 seen_exchange_codes: set[str] = set() for market_code in settings.enabled_market_list: market = MARKETS.get(market_code) if market is None: continue try: if market.is_domestic: balance_data = await broker.get_balance() log_market = market_code # "KR" else: if market.exchange_code in seen_exchange_codes: continue seen_exchange_codes.add(market.exchange_code) balance_data = await overseas_broker.get_overseas_balance( market.exchange_code ) log_market = market_code # e.g. "US_NASDAQ" except ConnectionError as exc: logger.warning( "Startup sync: balance fetch failed for %s — skipping: %s", market_code, exc, ) continue held_codes = _extract_held_codes_from_balance( balance_data, is_domestic=market.is_domestic ) for stock_code in held_codes: if get_open_position(db_conn, stock_code, log_market): continue # already tracked qty = _extract_held_qty_from_balance( balance_data, stock_code, is_domestic=market.is_domestic ) avg_price = _extract_avg_price_from_balance( balance_data, stock_code, is_domestic=market.is_domestic ) log_trade( conn=db_conn, stock_code=stock_code, action="BUY", confidence=0, rationale="[startup-sync] Position detected from broker at startup", quantity=qty, price=avg_price, market=log_market, exchange_code=market.exchange_code, mode=settings.MODE, ) logger.info( "Startup sync: %s/%s recorded as open position (qty=%d)", log_market, stock_code, qty, ) synced += 1 if synced: logger.info( "Startup sync complete: %d position(s) synced from broker", synced ) else: logger.info("Startup sync: no new positions to sync from broker") return synced def _extract_symbol_from_holding(item: dict[str, Any]) -> str: """Extract symbol from overseas holding payload variants.""" for key in ( "ovrs_pdno", "pdno", "ovrs_item_name", "prdt_name", "symb", "symbol", "stock_code", ): value = item.get(key) if isinstance(value, str): symbol = value.strip().upper() if symbol and symbol.replace(".", "").replace("-", "").isalnum(): return symbol return "" def _extract_held_codes_from_balance( balance_data: dict[str, Any], *, is_domestic: bool, ) -> list[str]: """Return stock codes with a positive orderable quantity from a balance response. Uses the broker's live output1 as the source of truth so that partial fills and manual external trades are always reflected correctly. """ output1 = balance_data.get("output1", []) if isinstance(output1, dict): output1 = [output1] if not isinstance(output1, list): return [] codes: list[str] = [] for holding in output1: if not isinstance(holding, dict): continue code_key = "pdno" if is_domestic else "ovrs_pdno" code = str(holding.get(code_key, "")).strip().upper() if not code: continue if is_domestic: qty = int(holding.get("ord_psbl_qty") or holding.get("hldg_qty") or 0) else: # ord_psbl_qty (주문가능수량) is the actual sellable quantity. # ovrs_cblc_qty (해외잔고수량) includes unsettled/expired holdings # that cannot actually be sold (e.g. expired warrants). qty = int( holding.get("ord_psbl_qty") or holding.get("ovrs_cblc_qty") or holding.get("hldg_qty") or 0 ) if qty > 0: codes.append(code) return codes def _extract_held_qty_from_balance( balance_data: dict[str, Any], stock_code: str, *, is_domestic: bool, ) -> int: """Extract the broker-confirmed orderable quantity for a stock. Uses the broker's live balance response (output1) as the source of truth rather than the local DB, because DB records reflect order quantity which may differ from actual fill quantity due to partial fills. Domestic fields (VTTC8434R output1): pdno — 종목코드 ord_psbl_qty — 주문가능수량 (preferred: excludes unsettled) hldg_qty — 보유수량 (fallback) Overseas fields (VTTS3012R / TTTS3012R output1): ovrs_pdno — 종목코드 ord_psbl_qty — 주문가능수량 (preferred: actual sellable qty) ovrs_cblc_qty — 해외잔고수량 (fallback: total holding, may include unsettled or expired positions with ord_psbl_qty=0) hldg_qty — 보유수량 (last-resort fallback) """ output1 = balance_data.get("output1", []) if isinstance(output1, dict): output1 = [output1] if not isinstance(output1, list): return 0 for holding in output1: if not isinstance(holding, dict): continue code_key = "pdno" if is_domestic else "ovrs_pdno" held_code = str(holding.get(code_key, "")).strip().upper() if held_code != stock_code.strip().upper(): continue if is_domestic: qty = int(holding.get("ord_psbl_qty") or holding.get("hldg_qty") or 0) else: qty = int( holding.get("ord_psbl_qty") or holding.get("ovrs_cblc_qty") or holding.get("hldg_qty") or 0 ) return qty return 0 def _extract_avg_price_from_balance( balance_data: dict[str, Any], stock_code: str, *, is_domestic: bool, ) -> float: """Extract the broker-reported average purchase price for a stock. Uses ``pchs_avg_pric`` (매입평균가격) from the balance response (output1). Returns 0.0 when absent so callers can use ``if price > 0`` as sentinel. Domestic fields (VTTC8434R output1): pdno, pchs_avg_pric Overseas fields (VTTS3012R output1): ovrs_pdno, pchs_avg_pric """ output1 = balance_data.get("output1", []) if isinstance(output1, dict): output1 = [output1] if not isinstance(output1, list): return 0.0 for holding in output1: if not isinstance(holding, dict): continue code_key = "pdno" if is_domestic else "ovrs_pdno" held_code = str(holding.get(code_key, "")).strip().upper() if held_code != stock_code.strip().upper(): continue return safe_float(holding.get("pchs_avg_pric"), 0.0) return 0.0 def _determine_order_quantity( *, action: str, current_price: float, total_cash: float, candidate: ScanCandidate | None, settings: Settings | None, broker_held_qty: int = 0, playbook_allocation_pct: float | None = None, scenario_confidence: int = 80, ) -> int: """Determine order quantity using volatility-aware position sizing. Priority: 1. playbook_allocation_pct (AI-specified) scaled by scenario_confidence 2. Fallback: volatility-score-based allocation from scanner candidate """ if action == "SELL": return broker_held_qty if current_price <= 0 or total_cash <= 0: return 0 if settings is None or not settings.POSITION_SIZING_ENABLED: return 1 # Use AI-specified allocation_pct if available if playbook_allocation_pct is not None: # Confidence scaling: confidence 80 → 1.0x, confidence 95 → 1.19x confidence_scale = scenario_confidence / 80.0 effective_pct = min( settings.POSITION_MAX_ALLOCATION_PCT, max( settings.POSITION_MIN_ALLOCATION_PCT, playbook_allocation_pct * confidence_scale, ), ) budget = total_cash * (effective_pct / 100.0) quantity = int(budget // current_price) return max(0, quantity) # Fallback: volatility-score-based allocation target_score = max(1.0, settings.POSITION_VOLATILITY_TARGET_SCORE) observed_score = candidate.score if candidate else target_score observed_score = max(1.0, min(100.0, observed_score)) # Higher observed volatility score => smaller allocation. scaled_pct = settings.POSITION_BASE_ALLOCATION_PCT * (target_score / observed_score) allocation_pct = min( settings.POSITION_MAX_ALLOCATION_PCT, max(settings.POSITION_MIN_ALLOCATION_PCT, scaled_pct), ) budget = total_cash * (allocation_pct / 100.0) quantity = int(budget // current_price) if quantity <= 0: return 0 return quantity async def build_overseas_symbol_universe( db_conn: Any, overseas_broker: OverseasBroker, market: MarketInfo, active_stocks: dict[str, list[str]], ) -> list[str]: """Build dynamic overseas symbol universe from runtime, DB, and holdings.""" symbols: list[str] = [] # 1) Keep current active stocks first to avoid sudden churn between cycles. symbols.extend(active_stocks.get(market.code, [])) # 2) Add recent symbols from own trading history (no fixed list). symbols.extend(get_recent_symbols(db_conn, market.code, limit=30)) # 3) Add current overseas holdings from broker balance if available. try: balance_data = await overseas_broker.get_overseas_balance(market.exchange_code) output1 = balance_data.get("output1", []) if isinstance(output1, dict): output1 = [output1] if isinstance(output1, list): for row in output1: if not isinstance(row, dict): continue symbol = _extract_symbol_from_holding(row) if symbol: symbols.append(symbol) except Exception as exc: logger.warning("Failed to build overseas holdings universe for %s: %s", market.code, exc) seen: set[str] = set() ordered_unique: list[str] = [] for symbol in symbols: normalized = symbol.strip().upper() if not normalized or normalized in seen: continue seen.add(normalized) ordered_unique.append(normalized) return ordered_unique def _build_queued_order_intent( *, market: MarketInfo, stock_code: str, order_type: str, quantity: int, price: float, source: str, ) -> QueuedOrderIntent: return QueuedOrderIntent( market_code=market.code, exchange_code=market.exchange_code, stock_code=stock_code, order_type=order_type, quantity=quantity, price=price, source=source, queued_at=datetime.now(UTC), ) def _maybe_queue_order_intent( *, market: MarketInfo, stock_code: str, order_type: str, quantity: int, price: float, source: str, ) -> bool: if not BLACKOUT_ORDER_MANAGER.in_blackout(): return False queued = BLACKOUT_ORDER_MANAGER.enqueue( _build_queued_order_intent( market=market, stock_code=stock_code, order_type=order_type, quantity=quantity, price=price, source=source, ) ) if queued: logger.warning( "Blackout active: queued order intent %s %s (%s) qty=%d price=%.4f source=%s pending=%d", order_type, stock_code, market.code, quantity, price, source, BLACKOUT_ORDER_MANAGER.pending_count, ) else: logger.error( "Blackout queue full: dropped order intent %s %s (%s) qty=%d source=%s", order_type, stock_code, market.code, quantity, source, ) return True async def process_blackout_recovery_orders( *, broker: KISBroker, overseas_broker: OverseasBroker, db_conn: Any, ) -> None: intents = BLACKOUT_ORDER_MANAGER.pop_recovery_batch() if not intents: return logger.info( "Blackout recovery started: processing %d queued intents", len(intents), ) for intent in intents: market = MARKETS.get(intent.market_code) if market is None: continue open_position = get_open_position(db_conn, intent.stock_code, market.code) if intent.order_type == "BUY" and open_position is not None: logger.info( "Drop stale queued BUY %s (%s): position already open", intent.stock_code, market.code, ) continue if intent.order_type == "SELL" and open_position is None: logger.info( "Drop stale queued SELL %s (%s): no open position", intent.stock_code, market.code, ) continue try: validate_order_policy( market=market, order_type=intent.order_type, price=float(intent.price), ) if market.is_domestic: result = await broker.send_order( stock_code=intent.stock_code, order_type=intent.order_type, quantity=intent.quantity, price=intent.price, ) else: result = await overseas_broker.send_overseas_order( exchange_code=market.exchange_code, stock_code=intent.stock_code, order_type=intent.order_type, quantity=intent.quantity, price=intent.price, ) accepted = result.get("rt_cd", "0") == "0" if accepted: logger.info( "Recovered queued order executed: %s %s (%s) qty=%d price=%.4f source=%s", intent.order_type, intent.stock_code, market.code, intent.quantity, intent.price, intent.source, ) continue logger.warning( "Recovered queued order rejected: %s %s (%s) qty=%d msg=%s", intent.order_type, intent.stock_code, market.code, intent.quantity, result.get("msg1"), ) except Exception as exc: if isinstance(exc, OrderPolicyRejected): logger.info( "Drop queued intent by policy: %s %s (%s): %s", intent.order_type, intent.stock_code, market.code, exc, ) continue logger.warning( "Recovered queued order failed: %s %s (%s): %s", intent.order_type, intent.stock_code, market.code, exc, ) if intent.attempts < 2: intent.attempts += 1 BLACKOUT_ORDER_MANAGER.requeue(intent) async def trading_cycle( broker: KISBroker, overseas_broker: OverseasBroker, scenario_engine: ScenarioEngine, playbook: DayPlaybook, risk: RiskManager, db_conn: Any, decision_logger: DecisionLogger, context_store: ContextStore, criticality_assessor: CriticalityAssessor, telegram: TelegramClient, market: MarketInfo, stock_code: str, scan_candidates: dict[str, dict[str, ScanCandidate]], settings: Settings | None = None, buy_cooldown: dict[str, float] | None = None, ) -> None: """Execute one trading cycle for a single stock.""" cycle_start_time = asyncio.get_event_loop().time() # 1. Fetch market data price_output: dict[str, Any] = {} # Populated for overseas markets; used for fallback metrics if market.is_domestic: current_price, price_change_pct, foreigner_net = await broker.get_current_price( stock_code ) balance_data = await broker.get_balance() output2 = balance_data.get("output2", [{}]) total_eval = safe_float(output2[0].get("tot_evlu_amt", "0")) if output2 else 0 total_cash = safe_float( balance_data.get("output2", [{}])[0].get("dnca_tot_amt", "0") if output2 else "0" ) purchase_total = safe_float(output2[0].get("pchs_amt_smtl_amt", "0")) if output2 else 0 else: # Overseas market price_data = await overseas_broker.get_overseas_price( market.exchange_code, stock_code ) balance_data = await overseas_broker.get_overseas_balance(market.exchange_code) output2 = balance_data.get("output2", [{}]) # Handle both list and dict response formats if isinstance(output2, list) and output2: balance_info = output2[0] elif isinstance(output2, dict): balance_info = output2 else: balance_info = {} total_eval = safe_float(balance_info.get("frcr_evlu_tota", "0") or "0") purchase_total = safe_float(balance_info.get("frcr_buy_amt_smtl", "0") or "0") # Resolve current price first (needed for buying power API) price_output = price_data.get("output", {}) current_price = safe_float(price_output.get("last", "0")) if current_price <= 0: market_candidates_lookup = scan_candidates.get(market.code, {}) cand_lookup = market_candidates_lookup.get(stock_code) if cand_lookup and cand_lookup.price > 0: logger.debug( "Price API returned 0 for %s; using scanner candidate price %.4f", stock_code, cand_lookup.price, ) current_price = cand_lookup.price foreigner_net = 0.0 # Not available for overseas price_change_pct = safe_float(price_output.get("rate", "0")) # Fetch available foreign currency cash via inquire-psamount (TTTS3007R/VTTS3007R). # TTTS3012R output2 does not include a cash/deposit field — frcr_dncl_amt_2 does not exist. # Source: 한국투자증권 오픈API 전체문서 (20260221) — '해외주식 매수가능금액조회' 시트 total_cash = 0.0 if current_price > 0: try: ps_data = await overseas_broker.get_overseas_buying_power( market.exchange_code, stock_code, current_price ) total_cash = safe_float( ps_data.get("output", {}).get("ovrs_ord_psbl_amt", "0") or "0" ) except ConnectionError as exc: logger.warning( "Could not fetch overseas buying power for %s/%s: %s", market.exchange_code, stock_code, exc, ) # Paper mode fallback: VTS overseas balance API often fails for many accounts. # Only activate in paper mode — live mode must use real balance from KIS. if ( total_cash <= 0 and settings and settings.MODE == "paper" and settings.PAPER_OVERSEAS_CASH > 0 ): logger.debug( "Overseas cash balance is 0 for %s; using paper fallback %.2f USD", market.exchange_code, settings.PAPER_OVERSEAS_CASH, ) total_cash = settings.PAPER_OVERSEAS_CASH # Calculate daily P&L % pnl_pct = ( ((total_eval - purchase_total) / purchase_total * 100) if purchase_total > 0 else 0.0 ) market_data: dict[str, Any] = { "stock_code": stock_code, "market_name": market.name, "current_price": current_price, "foreigner_net": foreigner_net, "price_change_pct": price_change_pct, } # Enrich market_data with scanner metrics for scenario engine market_candidates = scan_candidates.get(market.code, {}) candidate = market_candidates.get(stock_code) if candidate: market_data["rsi"] = candidate.rsi market_data["volume_ratio"] = candidate.volume_ratio else: # Holding stocks not in scanner: derive metrics from price API data already fetched. # For overseas stocks, price_output contains high/low/rate from get_overseas_price. # For domestic stocks, only price_change_pct is available from get_current_price. market_data["rsi"] = max(0.0, min(100.0, 50.0 + price_change_pct * 2.0)) if price_output and current_price > 0: pr_high = safe_float( price_output.get("high") or price_output.get("ovrs_hgpr") or price_output.get("stck_hgpr") ) pr_low = safe_float( price_output.get("low") or price_output.get("ovrs_lwpr") or price_output.get("stck_lwpr") ) if pr_high > 0 and pr_low > 0 and pr_high >= pr_low: intraday_range_pct = (pr_high - pr_low) / current_price * 100.0 volatility_pct = max(abs(price_change_pct), intraday_range_pct) market_data["volume_ratio"] = max(1.0, volatility_pct / 2.0) else: market_data["volume_ratio"] = 1.0 else: market_data["volume_ratio"] = 1.0 # Enrich market_data with holding info for SELL/HOLD scenario conditions open_pos = get_open_position(db_conn, stock_code, market.code) if open_pos and current_price > 0: entry_price = safe_float(open_pos.get("price"), 0.0) if entry_price > 0: market_data["unrealized_pnl_pct"] = ( (current_price - entry_price) / entry_price * 100 ) entry_ts = open_pos.get("timestamp") if entry_ts: try: entry_date = datetime.fromisoformat(entry_ts).date() market_data["holding_days"] = (datetime.now(UTC).date() - entry_date).days except (ValueError, TypeError): pass # 1.3. Record L7 real-time context (market-scoped keys) timeframe = datetime.now(UTC).isoformat() context_store.set_context( ContextLayer.L7_REALTIME, timeframe, f"volatility_{market.code}_{stock_code}", { "momentum_score": 50.0, "volume_surge": 1.0, "price_change_1m": 0.0, }, ) context_store.set_context( ContextLayer.L7_REALTIME, timeframe, f"price_{market.code}_{stock_code}", {"current_price": current_price}, ) if candidate: context_store.set_context( ContextLayer.L7_REALTIME, timeframe, f"rsi_{market.code}_{stock_code}", {"rsi": candidate.rsi}, ) context_store.set_context( ContextLayer.L7_REALTIME, timeframe, f"volume_ratio_{market.code}_{stock_code}", {"volume_ratio": candidate.volume_ratio}, ) # Write pnl_pct to system_metrics (dashboard-only table, separate from AI context tree) db_conn.execute( "INSERT OR REPLACE INTO system_metrics (key, value, updated_at) VALUES (?, ?, ?)", ( f"portfolio_pnl_pct_{market.code}", json.dumps({"pnl_pct": round(pnl_pct, 4)}), datetime.now(UTC).isoformat(), ), ) db_conn.commit() # Build portfolio data for global rule evaluation portfolio_data = { "portfolio_pnl_pct": pnl_pct, "total_cash": total_cash, "total_eval": total_eval, } # 1.5. Get volatility metrics from context store (L7_REALTIME) latest_timeframe = context_store.get_latest_timeframe(ContextLayer.L7_REALTIME) volatility_score = 50.0 # Default normal volatility volume_surge = 1.0 price_change_1m = 0.0 if latest_timeframe: volatility_data = context_store.get_context( ContextLayer.L7_REALTIME, latest_timeframe, f"volatility_{market.code}_{stock_code}", ) if volatility_data: volatility_score = volatility_data.get("momentum_score", 50.0) volume_surge = volatility_data.get("volume_surge", 1.0) price_change_1m = volatility_data.get("price_change_1m", 0.0) # 1.6. Assess criticality based on market conditions criticality = criticality_assessor.assess_market_conditions( pnl_pct=pnl_pct, volatility_score=volatility_score, volume_surge=volume_surge, price_change_1m=price_change_1m, is_market_open=True, ) logger.info( "Criticality for %s (%s): %s (pnl=%.2f%%, volatility=%.1f, volume_surge=%.1fx)", stock_code, market.name, criticality.value, pnl_pct, volatility_score, volume_surge, ) # 2. Evaluate scenario (local, no API call) match = scenario_engine.evaluate(playbook, stock_code, market_data, portfolio_data) decision = TradeDecision( action=match.action.value, confidence=match.confidence, rationale=match.rationale, ) stock_playbook = playbook.get_stock_playbook(stock_code) # 2.1. Apply market_outlook-based BUY confidence threshold if decision.action == "BUY": base_threshold = (settings.CONFIDENCE_THRESHOLD if settings else 80) outlook = playbook.market_outlook if outlook == MarketOutlook.BEARISH: min_confidence = 90 elif outlook == MarketOutlook.BULLISH: min_confidence = 75 else: min_confidence = base_threshold if match.confidence < min_confidence: logger.info( "BUY suppressed for %s (%s): confidence %d < %d (market_outlook=%s)", stock_code, market.name, match.confidence, min_confidence, outlook.value, ) decision = TradeDecision( action="HOLD", confidence=match.confidence, rationale=( f"BUY confidence {match.confidence} < {min_confidence} " f"(market_outlook={outlook.value})" ), ) # BUY 결정 전 기존 포지션 체크 (중복 매수 방지) if decision.action == "BUY": existing_position = get_open_position(db_conn, stock_code, market.code) if not existing_position: # SELL 지정가 접수 후 미체결 시 DB는 종료로 기록되나 브로커는 여전히 보유 중. # 국내/해외 모두 라이브 브로커 잔고를 authoritative source로 사용. broker_qty = _extract_held_qty_from_balance( balance_data, stock_code, is_domestic=market.is_domestic ) if broker_qty > 0: existing_position = {"price": 0.0, "quantity": broker_qty} if existing_position: decision = TradeDecision( action="HOLD", confidence=decision.confidence, rationale=( f"Already holding {stock_code} " f"(entry={existing_position['price']:.4f}, " f"qty={existing_position['quantity']})" ), ) logger.info( "BUY suppressed for %s (%s): already holding open position", stock_code, market.name, ) if decision.action == "HOLD": open_position = get_open_position(db_conn, stock_code, market.code) if open_position: entry_price = safe_float(open_position.get("price"), 0.0) if entry_price > 0 and current_price > 0: loss_pct = (current_price - entry_price) / entry_price * 100 stop_loss_threshold = -2.0 take_profit_threshold = 3.0 if stock_playbook and stock_playbook.scenarios: stop_loss_threshold = stock_playbook.scenarios[0].stop_loss_pct take_profit_threshold = stock_playbook.scenarios[0].take_profit_pct exit_eval = evaluate_exit( current_state=PositionState.HOLDING, config=ExitRuleConfig( hard_stop_pct=stop_loss_threshold, be_arm_pct=max(0.5, take_profit_threshold * 0.4), arm_pct=take_profit_threshold, ), inp=ExitRuleInput( current_price=current_price, entry_price=entry_price, peak_price=max(entry_price, current_price), atr_value=0.0, pred_down_prob=0.0, liquidity_weak=market_data.get("volume_ratio", 1.0) < 1.0, ), ) if exit_eval.reason == "hard_stop": decision = TradeDecision( action="SELL", confidence=95, rationale=( f"Stop-loss triggered ({loss_pct:.2f}% <= " f"{stop_loss_threshold:.2f}%)" ), ) logger.info( "Stop-loss override for %s (%s): %.2f%% <= %.2f%%", stock_code, market.name, loss_pct, stop_loss_threshold, ) elif exit_eval.reason == "arm_take_profit": decision = TradeDecision( action="SELL", confidence=90, rationale=( f"Take-profit triggered ({loss_pct:.2f}% >= " f"{take_profit_threshold:.2f}%)" ), ) logger.info( "Take-profit override for %s (%s): %.2f%% >= %.2f%%", stock_code, market.name, loss_pct, take_profit_threshold, ) logger.info( "Decision for %s (%s): %s (confidence=%d)", stock_code, market.name, decision.action, decision.confidence, ) # 2.1. Notify scenario match if match.matched_scenario is not None: try: condition_parts = [f"{k}={v}" for k, v in match.match_details.items()] await telegram.notify_scenario_matched( stock_code=stock_code, action=decision.action, condition_summary=", ".join(condition_parts) if condition_parts else "matched", confidence=float(decision.confidence), ) except Exception as exc: logger.warning("Scenario matched notification failed: %s", exc) # 2.5. Log decision with context snapshot context_snapshot = { "L1": { "current_price": current_price, "foreigner_net": foreigner_net, }, "L2": { "total_eval": total_eval, "total_cash": total_cash, "purchase_total": purchase_total, "pnl_pct": pnl_pct, }, "scenario_match": match.match_details, } input_data = { "current_price": current_price, "foreigner_net": foreigner_net, "price_change_pct": price_change_pct, "total_eval": total_eval, "total_cash": total_cash, "pnl_pct": pnl_pct, } decision_id = decision_logger.log_decision( stock_code=stock_code, market=market.code, exchange_code=market.exchange_code, action=decision.action, confidence=decision.confidence, rationale=decision.rationale, context_snapshot=context_snapshot, input_data=input_data, ) # 3. Execute if actionable quantity = 0 trade_price = current_price trade_pnl = 0.0 if decision.action in ("BUY", "SELL"): if KILL_SWITCH.new_orders_blocked: logger.critical( "KillSwitch block active: skip %s order for %s (%s)", decision.action, stock_code, market.name, ) return broker_held_qty = ( _extract_held_qty_from_balance( balance_data, stock_code, is_domestic=market.is_domestic ) if decision.action == "SELL" else 0 ) matched_scenario = match.matched_scenario quantity = _determine_order_quantity( action=decision.action, current_price=current_price, total_cash=total_cash, candidate=candidate, settings=settings, broker_held_qty=broker_held_qty, playbook_allocation_pct=matched_scenario.allocation_pct if matched_scenario else None, scenario_confidence=match.confidence, ) if quantity <= 0: logger.info( "Skip %s %s (%s): no affordable quantity (cash=%.2f, price=%.2f)", decision.action, stock_code, market.name, total_cash, current_price, ) return order_amount = current_price * quantity # 4. Check BUY cooldown (set when a prior BUY failed due to insufficient balance) if decision.action == "BUY" and buy_cooldown is not None: cooldown_key = f"{market.code}:{stock_code}" cooldown_until = buy_cooldown.get(cooldown_key, 0.0) now = asyncio.get_event_loop().time() if now < cooldown_until: remaining = int(cooldown_until - now) logger.info( "Skip BUY %s (%s): insufficient-balance cooldown active (%ds remaining)", stock_code, market.name, remaining, ) return # 5a. Risk check BEFORE order # SELL orders do not consume cash (they receive it), so fat-finger check # is skipped for SELLs — only circuit breaker applies. try: if decision.action == "SELL": risk.check_circuit_breaker(pnl_pct) else: risk.validate_order( current_pnl_pct=pnl_pct, order_amount=order_amount, total_cash=total_cash, ) except FatFingerRejected as exc: try: await telegram.notify_fat_finger( stock_code=stock_code, order_amount=exc.order_amount, total_cash=exc.total_cash, max_pct=exc.max_pct, ) except Exception as notify_exc: logger.warning("Fat finger notification failed: %s", notify_exc) raise # Re-raise to prevent trade except CircuitBreakerTripped as exc: ks_report = await KILL_SWITCH.trigger( reason=f"circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}", snapshot_state=lambda: logger.critical( "KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f", market.code, stock_code, exc.pnl_pct, exc.threshold, ), ) if ks_report.errors: logger.critical( "KillSwitch step errors for %s/%s: %s", market.code, stock_code, "; ".join(ks_report.errors), ) raise # 5. Send order order_succeeded = True if market.is_domestic: # Use limit orders (지정가) for domestic stocks to avoid market order # quantity calculation issues. KRX tick rounding applied via kr_round_down. # BUY: +0.2% — ensures fill even when ask is slightly above last price. # SELL: -0.2% — ensures fill even when bid is slightly below last price. if decision.action == "BUY": order_price = kr_round_down(current_price * 1.002) else: order_price = kr_round_down(current_price * 0.998) try: validate_order_policy( market=market, order_type=decision.action, price=float(order_price), ) except OrderPolicyRejected as exc: logger.warning( "Order policy rejected %s %s (%s): %s [session=%s]", decision.action, stock_code, market.name, exc, exc.session_id, ) return if _maybe_queue_order_intent( market=market, stock_code=stock_code, order_type=decision.action, quantity=quantity, price=float(order_price), source="trading_cycle", ): return result = await broker.send_order( stock_code=stock_code, order_type=decision.action, quantity=quantity, price=order_price, ) else: # For overseas orders, always use limit orders (지정가): # - KIS market orders (ORD_DVSN=01) calculate quantity based on upper limit # price (상한가 기준), resulting in only 60-80% of intended cash being used. # - BUY: +0.2% above last price — tight enough to minimise overpayment while # achieving >90% fill rate on large-cap US stocks. # - SELL: -0.2% below last price — ensures fill even when price dips slightly # (placing at exact last price risks no-fill if the bid is just below). overseas_price: float # KIS requires at most 2 decimal places for prices >= $1 (≥1달러 소수점 2자리 제한). # Penny stocks (< $1) keep 4 decimal places to preserve price precision. _price_decimals = 2 if current_price >= 1.0 else 4 if decision.action == "BUY": overseas_price = round(current_price * 1.002, _price_decimals) else: overseas_price = round(current_price * 0.998, _price_decimals) try: validate_order_policy( market=market, order_type=decision.action, price=float(overseas_price), ) except OrderPolicyRejected as exc: logger.warning( "Order policy rejected %s %s (%s): %s [session=%s]", decision.action, stock_code, market.name, exc, exc.session_id, ) return if _maybe_queue_order_intent( market=market, stock_code=stock_code, order_type=decision.action, quantity=quantity, price=float(overseas_price), source="trading_cycle", ): return result = await overseas_broker.send_overseas_order( exchange_code=market.exchange_code, stock_code=stock_code, order_type=decision.action, quantity=quantity, price=overseas_price, # limit order ) # Check if KIS rejected the order (rt_cd != "0") if result.get("rt_cd", "") != "0": order_succeeded = False msg1 = result.get("msg1") or "" logger.warning( "Overseas order not accepted for %s: rt_cd=%s msg=%s", stock_code, result.get("rt_cd"), msg1, ) # Set BUY cooldown when the rejection is due to insufficient balance if decision.action == "BUY" and buy_cooldown is not None and "주문가능금액" in msg1: cooldown_key = f"{market.code}:{stock_code}" buy_cooldown[cooldown_key] = ( asyncio.get_event_loop().time() + _BUY_COOLDOWN_SECONDS ) logger.info( "BUY cooldown set for %s: %.0fs (insufficient balance)", stock_code, _BUY_COOLDOWN_SECONDS, ) # Close ghost position when broker has no matching balance. # This prevents infinite SELL retry cycles for positions that # exist in the DB (from startup sync) but are no longer # sellable at the broker (expired warrants, delisted stocks, etc.) if decision.action == "SELL" and "잔고내역이 없습니다" in msg1: logger.warning( "Ghost position detected for %s (%s): broker reports no balance." " Closing DB position to prevent infinite retry.", stock_code, market.exchange_code, ) log_trade( conn=db_conn, stock_code=stock_code, action="SELL", confidence=0, rationale=( "[ghost-close] Broker reported no balance;" " position closed without fill" ), quantity=0, price=0.0, pnl=0.0, market=market.code, exchange_code=market.exchange_code, mode=settings.MODE if settings else "paper", ) logger.info("Order result: %s", result.get("msg1", "OK")) # 5.5. Notify trade execution (only on success) if order_succeeded: try: await telegram.notify_trade_execution( stock_code=stock_code, market=market.name, action=decision.action, quantity=quantity, price=current_price, confidence=decision.confidence, ) except Exception as exc: logger.warning("Telegram notification failed: %s", exc) if decision.action == "SELL" and order_succeeded: buy_trade = get_latest_buy_trade(db_conn, stock_code, market.code) if buy_trade and buy_trade.get("price") is not None: buy_price = float(buy_trade["price"]) buy_qty = int(buy_trade.get("quantity") or 1) trade_pnl = (trade_price - buy_price) * buy_qty decision_logger.update_outcome( decision_id=buy_trade["decision_id"], pnl=trade_pnl, accuracy=1 if trade_pnl > 0 else 0, ) # 6. Log trade with selection context (skip if order was rejected) if decision.action in ("BUY", "SELL") and not order_succeeded: return selection_context = None if stock_code in market_candidates: candidate = market_candidates[stock_code] selection_context = { "rsi": candidate.rsi, "volume_ratio": candidate.volume_ratio, "signal": candidate.signal, "score": candidate.score, } log_trade( conn=db_conn, stock_code=stock_code, action=decision.action, confidence=decision.confidence, rationale=decision.rationale, quantity=quantity, price=trade_price, pnl=trade_pnl, market=market.code, exchange_code=market.exchange_code, selection_context=selection_context, decision_id=decision_id, mode=settings.MODE if settings else "paper", ) # 7. Latency monitoring cycle_end_time = asyncio.get_event_loop().time() cycle_latency = cycle_end_time - cycle_start_time timeout = criticality_assessor.get_timeout(criticality) if timeout and cycle_latency > timeout: logger.warning( "Trading cycle exceeded timeout for %s (criticality=%s, latency=%.2fs, timeout=%.2fs)", stock_code, criticality.value, cycle_latency, timeout, ) else: logger.debug( "Trading cycle completed within timeout for %s (criticality=%s, latency=%.2fs)", stock_code, criticality.value, cycle_latency, ) async def handle_domestic_pending_orders( broker: KISBroker, telegram: TelegramClient, settings: Settings, sell_resubmit_counts: dict[str, int], buy_cooldown: dict[str, float] | None = None, ) -> None: """Check and handle unfilled (pending) domestic limit orders. Called once per market loop iteration before new orders are considered. In paper mode the KIS pending-orders API (TTTC0084R) is unsupported, so ``get_domestic_pending_orders`` returns [] immediately and this function exits without making further API calls. BUY pending → cancel (to free up balance) + optionally set cooldown. SELL pending → cancel then resubmit at a wider spread (-0.4% from last price, kr_round_down applied). Resubmission is attempted at most once per key per session to avoid infinite loops. Args: broker: KISBroker instance. telegram: TelegramClient for notifications. settings: Application settings. sell_resubmit_counts: Mutable dict tracking SELL resubmission attempts per "KR:{stock_code}" key. Passed by reference so counts persist across calls within the same session. buy_cooldown: Optional cooldown dict shared with the main trading loop. When provided, cancelled BUY orders are added with a _BUY_COOLDOWN_SECONDS expiry. """ try: orders = await broker.get_domestic_pending_orders() except Exception as exc: logger.warning("Failed to fetch domestic pending orders: %s", exc) return now = asyncio.get_event_loop().time() for order in orders: try: stock_code = order.get("pdno", "") orgn_odno = order.get("orgn_odno", "") krx_fwdg_ord_orgno = order.get("ord_gno_brno", "") sll_buy = order.get("sll_buy_dvsn_cd", "") # "01"=SELL, "02"=BUY psbl_qty = int(order.get("psbl_qty", "0") or "0") key = f"KR:{stock_code}" if not stock_code or not orgn_odno or psbl_qty <= 0: continue # Cancel the pending order first regardless of direction. cancel_result = await broker.cancel_domestic_order( stock_code=stock_code, orgn_odno=orgn_odno, krx_fwdg_ord_orgno=krx_fwdg_ord_orgno, qty=psbl_qty, ) if cancel_result.get("rt_cd") != "0": logger.warning( "Cancel failed for KR %s: rt_cd=%s msg=%s", stock_code, cancel_result.get("rt_cd"), cancel_result.get("msg1"), ) continue if sll_buy == "02": # BUY pending → cancelled; set cooldown to avoid immediate re-buy. if buy_cooldown is not None: buy_cooldown[key] = now + _BUY_COOLDOWN_SECONDS try: await telegram.notify_unfilled_order( stock_code=stock_code, market="KR", action="BUY", quantity=psbl_qty, outcome="cancelled", ) except Exception as notify_exc: logger.warning("notify_unfilled_order failed: %s", notify_exc) elif sll_buy == "01": # SELL pending — attempt one resubmit at a wider spread. if sell_resubmit_counts.get(key, 0) >= 1: # Already resubmitted once — only cancel (already done above). logger.warning( "SELL KR %s already resubmitted once — no further resubmit", stock_code, ) try: await telegram.notify_unfilled_order( stock_code=stock_code, market="KR", action="SELL", quantity=psbl_qty, outcome="cancelled", ) except Exception as notify_exc: logger.warning( "notify_unfilled_order failed: %s", notify_exc ) else: # First unfilled SELL → resubmit at last * 0.996 (-0.4%). try: last_price, _, _ = await broker.get_current_price(stock_code) if last_price <= 0: raise ValueError( f"Invalid price ({last_price}) for {stock_code}" ) new_price = kr_round_down(last_price * 0.996) validate_order_policy( market=MARKETS["KR"], order_type="SELL", price=float(new_price), ) await broker.send_order( stock_code=stock_code, order_type="SELL", quantity=psbl_qty, price=new_price, ) sell_resubmit_counts[key] = ( sell_resubmit_counts.get(key, 0) + 1 ) try: await telegram.notify_unfilled_order( stock_code=stock_code, market="KR", action="SELL", quantity=psbl_qty, outcome="resubmitted", new_price=float(new_price), ) except Exception as notify_exc: logger.warning( "notify_unfilled_order failed: %s", notify_exc ) except Exception as exc: logger.error( "SELL resubmit failed for KR %s: %s", stock_code, exc, ) except Exception as exc: logger.error( "Error handling domestic pending order for %s: %s", order.get("pdno", "?"), exc, ) async def handle_overseas_pending_orders( overseas_broker: OverseasBroker, telegram: TelegramClient, settings: Settings, sell_resubmit_counts: dict[str, int], buy_cooldown: dict[str, float] | None = None, ) -> None: """Check and handle unfilled (pending) overseas limit orders. Called once per market loop iteration before new orders are considered. In paper mode the KIS pending-orders API (TTTS3018R) is unsupported, so this function returns immediately without making any API calls. BUY pending → cancel (to free up balance) + optionally set cooldown. SELL pending → cancel then resubmit at a wider spread (-0.4% from last price). Resubmission is attempted at most once per key per session to avoid infinite retry loops. Args: overseas_broker: OverseasBroker instance. telegram: TelegramClient for notifications. settings: Application settings (MODE, ENABLED_MARKETS). sell_resubmit_counts: Mutable dict tracking SELL resubmission attempts per "{exchange_code}:{stock_code}" key. Passed by reference so counts persist across calls within the same session. buy_cooldown: Optional cooldown dict shared with the main trading loop. When provided, cancelled BUY orders are added with a _BUY_COOLDOWN_SECONDS expiry. """ # Determine which exchange codes to query, deduplicating US exchanges. # NASD alone returns all US (NASD/NYSE/AMEX) pending orders. us_exchanges = frozenset({"NASD", "NYSE", "AMEX"}) exchange_codes: list[str] = [] seen_us = False for market_code in settings.enabled_market_list: market_info = MARKETS.get(market_code) if market_info is None or market_info.is_domestic: continue exc_code = market_info.exchange_code if exc_code in us_exchanges: if not seen_us: exchange_codes.append("NASD") seen_us = True elif exc_code not in exchange_codes: exchange_codes.append(exc_code) now = asyncio.get_event_loop().time() for exchange_code in exchange_codes: try: orders = await overseas_broker.get_overseas_pending_orders(exchange_code) except Exception as exc: logger.warning( "Failed to fetch pending orders for %s: %s", exchange_code, exc ) continue for order in orders: try: stock_code = order.get("pdno", "") odno = order.get("odno", "") sll_buy = order.get("sll_buy_dvsn_cd", "") # "01"=SELL, "02"=BUY nccs_qty = int(order.get("nccs_qty", "0") or "0") order_exchange = order.get("ovrs_excg_cd") or exchange_code key = f"{order_exchange}:{stock_code}" if not stock_code or not odno or nccs_qty <= 0: continue # Cancel the pending order first regardless of direction. cancel_result = await overseas_broker.cancel_overseas_order( exchange_code=order_exchange, stock_code=stock_code, odno=odno, qty=nccs_qty, ) if cancel_result.get("rt_cd") != "0": logger.warning( "Cancel failed for %s %s: rt_cd=%s msg=%s", order_exchange, stock_code, cancel_result.get("rt_cd"), cancel_result.get("msg1"), ) continue if sll_buy == "02": # BUY pending → cancelled; set cooldown to avoid immediate re-buy. if buy_cooldown is not None: buy_cooldown[key] = now + _BUY_COOLDOWN_SECONDS try: await telegram.notify_unfilled_order( stock_code=stock_code, market=order_exchange, action="BUY", quantity=nccs_qty, outcome="cancelled", ) except Exception as notify_exc: logger.warning("notify_unfilled_order failed: %s", notify_exc) elif sll_buy == "01": # SELL pending — attempt one resubmit at a wider spread. if sell_resubmit_counts.get(key, 0) >= 1: # Already resubmitted once — only cancel (already done above). logger.warning( "SELL %s %s already resubmitted once — no further resubmit", order_exchange, stock_code, ) try: await telegram.notify_unfilled_order( stock_code=stock_code, market=order_exchange, action="SELL", quantity=nccs_qty, outcome="cancelled", ) except Exception as notify_exc: logger.warning( "notify_unfilled_order failed: %s", notify_exc ) else: # First unfilled SELL → resubmit at last * 0.996 (-0.4%). try: price_data = await overseas_broker.get_overseas_price( order_exchange, stock_code ) last_price = float( price_data.get("output", {}).get("last", "0") or "0" ) if last_price <= 0: raise ValueError( f"Invalid price ({last_price}) for {stock_code}" ) new_price = round(last_price * 0.996, 4) market_info = next( ( m for m in MARKETS.values() if m.exchange_code == order_exchange and not m.is_domestic ), None, ) if market_info is not None: validate_order_policy( market=market_info, order_type="SELL", price=float(new_price), ) await overseas_broker.send_overseas_order( exchange_code=order_exchange, stock_code=stock_code, order_type="SELL", quantity=nccs_qty, price=new_price, ) sell_resubmit_counts[key] = ( sell_resubmit_counts.get(key, 0) + 1 ) try: await telegram.notify_unfilled_order( stock_code=stock_code, market=order_exchange, action="SELL", quantity=nccs_qty, outcome="resubmitted", new_price=new_price, ) except Exception as notify_exc: logger.warning( "notify_unfilled_order failed: %s", notify_exc ) except Exception as exc: logger.error( "SELL resubmit failed for %s %s: %s", order_exchange, stock_code, exc, ) except Exception as exc: logger.error( "Error handling pending order for %s: %s", order.get("pdno", "?"), exc, ) async def run_daily_session( broker: KISBroker, overseas_broker: OverseasBroker, scenario_engine: ScenarioEngine, playbook_store: PlaybookStore, pre_market_planner: PreMarketPlanner, risk: RiskManager, db_conn: Any, decision_logger: DecisionLogger, context_store: ContextStore, criticality_assessor: CriticalityAssessor, telegram: TelegramClient, settings: Settings, smart_scanner: SmartVolatilityScanner | None = None, daily_start_eval: float = 0.0, ) -> float: """Execute one daily trading session. V2 proactive strategy: 1 Gemini call for playbook generation, then local scenario evaluation per stock (0 API calls). Args: daily_start_eval: Portfolio evaluation at the start of the trading day. Used to compute intra-day P&L for the Circuit Breaker. Pass 0.0 on the first session of each day; the function will set it from the first balance query and return it for subsequent sessions. Returns: The daily_start_eval value that should be forwarded to the next session of the same trading day. """ # Get currently open markets open_markets = get_open_markets(settings.enabled_market_list) if not open_markets: logger.info("No markets open for this session") return daily_start_eval logger.info("Starting daily trading session for %d markets", len(open_markets)) # BUY cooldown: prevents retrying stocks rejected for insufficient balance daily_buy_cooldown: dict[str, float] = {} # "{market_code}:{stock_code}" -> expiry timestamp # Tracks SELL resubmission attempts per "{exchange_code}:{stock_code}" (max 1 per session). sell_resubmit_counts: dict[str, int] = {} # Process each open market for market in open_markets: await process_blackout_recovery_orders( broker=broker, overseas_broker=overseas_broker, db_conn=db_conn, ) # Use market-local date for playbook keying market_today = datetime.now(market.timezone).date() # Check and handle domestic pending (unfilled) limit orders before new decisions. if market.is_domestic: try: await handle_domestic_pending_orders( broker, telegram, settings, sell_resubmit_counts, daily_buy_cooldown, ) except Exception as exc: logger.warning("Domestic pending order check failed: %s", exc) # Check and handle overseas pending (unfilled) limit orders before new decisions. if not market.is_domestic: try: await handle_overseas_pending_orders( overseas_broker, telegram, settings, sell_resubmit_counts, daily_buy_cooldown, ) except Exception as exc: logger.warning("Pending order check failed: %s", exc) # Dynamic stock discovery via scanner (no static watchlists) candidates_list: list[ScanCandidate] = [] fallback_stocks: list[str] | None = None if not market.is_domestic: fallback_stocks = await build_overseas_symbol_universe( db_conn=db_conn, overseas_broker=overseas_broker, market=market, active_stocks={}, ) if not fallback_stocks: logger.debug( "No dynamic overseas symbol universe for %s;" " scanner will use overseas ranking API", market.code, ) try: candidates_list = ( await smart_scanner.scan( market=market, fallback_stocks=fallback_stocks, ) if smart_scanner else [] ) except Exception as exc: logger.error("Smart Scanner failed for %s: %s", market.name, exc) if not candidates_list: logger.info("No scanner candidates for market %s — skipping", market.code) continue watchlist = [c.stock_code for c in candidates_list] candidate_map = {c.stock_code: c for c in candidates_list} logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist)) # Generate or load playbook (1 Gemini API call per market per day) playbook = playbook_store.load(market_today, market.code) if playbook is None: try: playbook = await pre_market_planner.generate_playbook( market=market.code, candidates=candidates_list, today=market_today, ) playbook_store.save(playbook) try: await telegram.notify_playbook_generated( market=market.code, stock_count=playbook.stock_count, scenario_count=playbook.scenario_count, token_count=playbook.token_count, ) except Exception as exc: logger.warning("Playbook notification failed: %s", exc) logger.info( "Generated playbook for %s: %d stocks, %d scenarios", market.code, playbook.stock_count, playbook.scenario_count, ) except Exception as exc: logger.error("Playbook generation failed for %s: %s", market.code, exc) try: await telegram.notify_playbook_failed( market=market.code, reason=str(exc)[:200], ) except Exception as notify_exc: logger.warning("Playbook failed notification error: %s", notify_exc) playbook = PreMarketPlanner._empty_playbook(market_today, market.code) # Collect market data for all stocks from scanner stocks_data = [] for stock_code in watchlist: try: if market.is_domestic: current_price, price_change_pct, foreigner_net = ( await _retry_connection( broker.get_current_price, stock_code, label=stock_code, ) ) else: price_data = await _retry_connection( overseas_broker.get_overseas_price, market.exchange_code, stock_code, label=f"{stock_code}@{market.exchange_code}", ) current_price = safe_float( price_data.get("output", {}).get("last", "0") ) # Fallback: if price API returns 0, use scanner candidate price if current_price <= 0: cand_lookup = candidate_map.get(stock_code) if cand_lookup and cand_lookup.price > 0: logger.debug( "Price API returned 0 for %s; using scanner candidate price %.4f", stock_code, cand_lookup.price, ) current_price = cand_lookup.price foreigner_net = 0.0 price_change_pct = safe_float( price_data.get("output", {}).get("rate", "0") ) # Fall back to scanner candidate price if API returns 0. if current_price <= 0: cand_lookup = candidate_map.get(stock_code) if cand_lookup and cand_lookup.price > 0: current_price = cand_lookup.price logger.debug( "Price API returned 0 for %s; using scanner price %.4f", stock_code, current_price, ) stock_data: dict[str, Any] = { "stock_code": stock_code, "market_name": market.name, "current_price": current_price, "foreigner_net": foreigner_net, "price_change_pct": price_change_pct, } # Enrich with scanner metrics cand = candidate_map.get(stock_code) if cand: stock_data["rsi"] = cand.rsi stock_data["volume_ratio"] = cand.volume_ratio stocks_data.append(stock_data) except Exception as exc: logger.error("Failed to fetch data for %s: %s", stock_code, exc) continue if not stocks_data: logger.warning("No valid stock data for market %s", market.code) continue # Get balance data once for the market (read-only — safe to retry) try: if market.is_domestic: balance_data = await _retry_connection( broker.get_balance, label=f"balance:{market.code}" ) else: balance_data = await _retry_connection( overseas_broker.get_overseas_balance, market.exchange_code, label=f"overseas_balance:{market.exchange_code}", ) except ConnectionError as exc: logger.error( "Balance fetch failed for market %s after all retries — skipping market: %s", market.code, exc, ) continue if market.is_domestic: output2 = balance_data.get("output2", [{}]) total_eval = safe_float( output2[0].get("tot_evlu_amt", "0") ) if output2 else 0 total_cash = safe_float( output2[0].get("dnca_tot_amt", "0") ) if output2 else 0 purchase_total = safe_float( output2[0].get("pchs_amt_smtl_amt", "0") ) if output2 else 0 else: output2 = balance_data.get("output2", [{}]) if isinstance(output2, list) and output2: balance_info = output2[0] elif isinstance(output2, dict): balance_info = output2 else: balance_info = {} total_eval = safe_float(balance_info.get("frcr_evlu_tota", "0") or "0") purchase_total = safe_float( balance_info.get("frcr_buy_amt_smtl", "0") or "0" ) # Fetch available foreign currency cash via inquire-psamount (TTTS3007R/VTTS3007R). # TTTS3012R output2 does not include a cash/deposit field — frcr_dncl_amt_2 does not exist. # Use the first stock with a valid price as the reference for the buying power query. # Source: 한국투자증권 오픈API 전체문서 (20260221) — '해외주식 매수가능금액조회' 시트 total_cash = 0.0 ref_stock = next( (s for s in stocks_data if s.get("current_price", 0) > 0), None ) if ref_stock: try: ps_data = await overseas_broker.get_overseas_buying_power( market.exchange_code, ref_stock["stock_code"], ref_stock["current_price"], ) total_cash = safe_float( ps_data.get("output", {}).get("ovrs_ord_psbl_amt", "0") or "0" ) except ConnectionError as exc: logger.warning( "Could not fetch overseas buying power for %s: %s", market.exchange_code, exc, ) # Paper mode fallback: VTS overseas balance API often fails for many accounts. # Only activate in paper mode — live mode must use real balance from KIS. if ( total_cash <= 0 and settings.MODE == "paper" and settings.PAPER_OVERSEAS_CASH > 0 ): total_cash = settings.PAPER_OVERSEAS_CASH # Capture the day's opening portfolio value on the first market processed # in this session. Used to compute intra-day P&L for the CB instead of # the cumulative purchase_total which spans the entire account history. if daily_start_eval <= 0 and total_eval > 0: daily_start_eval = total_eval logger.info( "Daily CB baseline set: total_eval=%.2f (first balance of the day)", daily_start_eval, ) # Daily P&L: compare current eval vs start-of-day eval. # Falls back to purchase_total if daily_start_eval is unavailable (e.g. paper # mode where balance API returns 0 for all values). if daily_start_eval > 0: pnl_pct = (total_eval - daily_start_eval) / daily_start_eval * 100 else: pnl_pct = ( ((total_eval - purchase_total) / purchase_total * 100) if purchase_total > 0 else 0.0 ) portfolio_data = { "portfolio_pnl_pct": pnl_pct, "total_cash": total_cash, "total_eval": total_eval, } # Evaluate scenarios for each stock (local, no API calls) logger.info( "Evaluating %d stocks against playbook for %s", len(stocks_data), market.name, ) for stock_data in stocks_data: stock_code = stock_data["stock_code"] match = scenario_engine.evaluate( playbook, stock_code, stock_data, portfolio_data, ) decision = TradeDecision( action=match.action.value, confidence=match.confidence, rationale=match.rationale, ) logger.info( "Decision for %s (%s): %s (confidence=%d)", stock_code, market.name, decision.action, decision.confidence, ) # BUY 중복 방지: 브로커 잔고 기반 (미체결 SELL 리밋 주문 보호) if decision.action == "BUY": daily_existing = get_open_position(db_conn, stock_code, market.code) if not daily_existing: # SELL 지정가 접수 후 미체결 시 DB는 종료로 기록되나 브로커는 여전히 보유 중. # 국내/해외 모두 라이브 브로커 잔고를 authoritative source로 사용. broker_qty = _extract_held_qty_from_balance( balance_data, stock_code, is_domestic=market.is_domestic ) if broker_qty > 0: daily_existing = {"price": 0.0, "quantity": broker_qty} if daily_existing: decision = TradeDecision( action="HOLD", confidence=decision.confidence, rationale=( f"Already holding {stock_code} " f"(entry={daily_existing['price']:.4f}, " f"qty={daily_existing['quantity']})" ), ) logger.info( "BUY suppressed for %s (%s): already holding open position", stock_code, market.name, ) # Log decision context_snapshot = { "L1": { "current_price": stock_data["current_price"], "foreigner_net": stock_data["foreigner_net"], }, "L2": { "total_eval": total_eval, "total_cash": total_cash, "purchase_total": purchase_total, "pnl_pct": pnl_pct, }, "scenario_match": match.match_details, } input_data = { "current_price": stock_data["current_price"], "foreigner_net": stock_data["foreigner_net"], "total_eval": total_eval, "total_cash": total_cash, "pnl_pct": pnl_pct, } decision_id = decision_logger.log_decision( stock_code=stock_code, market=market.code, exchange_code=market.exchange_code, action=decision.action, confidence=decision.confidence, rationale=decision.rationale, context_snapshot=context_snapshot, input_data=input_data, ) # Execute if actionable quantity = 0 trade_price = stock_data["current_price"] trade_pnl = 0.0 order_succeeded = True if decision.action in ("BUY", "SELL"): if KILL_SWITCH.new_orders_blocked: logger.critical( "KillSwitch block active: skip %s order for %s (%s)", decision.action, stock_code, market.name, ) continue daily_broker_held_qty = ( _extract_held_qty_from_balance( balance_data, stock_code, is_domestic=market.is_domestic ) if decision.action == "SELL" else 0 ) quantity = _determine_order_quantity( action=decision.action, current_price=stock_data["current_price"], total_cash=total_cash, candidate=candidate_map.get(stock_code), settings=settings, broker_held_qty=daily_broker_held_qty, ) if quantity <= 0: logger.info( "Skip %s %s (%s): no affordable quantity (cash=%.2f, price=%.2f)", decision.action, stock_code, market.name, total_cash, stock_data["current_price"], ) continue order_amount = stock_data["current_price"] * quantity # Check BUY cooldown (insufficient balance) if decision.action == "BUY": daily_cooldown_key = f"{market.code}:{stock_code}" daily_cooldown_until = daily_buy_cooldown.get(daily_cooldown_key, 0.0) now = asyncio.get_event_loop().time() if now < daily_cooldown_until: remaining = int(daily_cooldown_until - now) logger.info( "Skip BUY %s (%s): insufficient-balance cooldown active (%ds remaining)", stock_code, market.name, remaining, ) continue # Risk check # SELL orders do not consume cash (they receive it), so fat-finger # check is skipped for SELLs — only circuit breaker applies. try: if decision.action == "SELL": risk.check_circuit_breaker(pnl_pct) else: risk.validate_order( current_pnl_pct=pnl_pct, order_amount=order_amount, total_cash=total_cash, ) except FatFingerRejected as exc: try: await telegram.notify_fat_finger( stock_code=stock_code, order_amount=exc.order_amount, total_cash=exc.total_cash, max_pct=exc.max_pct, ) except Exception as notify_exc: logger.warning("Fat finger notification failed: %s", notify_exc) continue # Skip this order except CircuitBreakerTripped as exc: ks_report = await KILL_SWITCH.trigger( reason=f"daily_circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}", snapshot_state=lambda: logger.critical( "Daily KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f", market.code, stock_code, exc.pnl_pct, exc.threshold, ), ) logger.critical("Circuit breaker tripped — stopping session") try: await telegram.notify_circuit_breaker( pnl_pct=exc.pnl_pct, threshold=exc.threshold, ) except Exception as notify_exc: logger.warning( "Circuit breaker notification failed: %s", notify_exc ) if ks_report.errors: logger.critical( "Daily KillSwitch step errors for %s/%s: %s", market.code, stock_code, "; ".join(ks_report.errors), ) raise # Send order order_succeeded = True try: if market.is_domestic: # Use limit orders (지정가) for domestic stocks. # KRX tick rounding applied via kr_round_down. if decision.action == "BUY": order_price = kr_round_down( stock_data["current_price"] * 1.002 ) else: order_price = kr_round_down( stock_data["current_price"] * 0.998 ) try: validate_order_policy( market=market, order_type=decision.action, price=float(order_price), ) except OrderPolicyRejected as exc: logger.warning( "Order policy rejected %s %s (%s): %s [session=%s]", decision.action, stock_code, market.name, exc, exc.session_id, ) continue if _maybe_queue_order_intent( market=market, stock_code=stock_code, order_type=decision.action, quantity=quantity, price=float(order_price), source="run_daily_session", ): continue result = await broker.send_order( stock_code=stock_code, order_type=decision.action, quantity=quantity, price=order_price, ) else: # KIS VTS only accepts limit orders; use 0.5% premium for BUY if decision.action == "BUY": order_price = round(stock_data["current_price"] * 1.005, 4) else: order_price = stock_data["current_price"] try: validate_order_policy( market=market, order_type=decision.action, price=float(order_price), ) except OrderPolicyRejected as exc: logger.warning( "Order policy rejected %s %s (%s): %s [session=%s]", decision.action, stock_code, market.name, exc, exc.session_id, ) continue if _maybe_queue_order_intent( market=market, stock_code=stock_code, order_type=decision.action, quantity=quantity, price=float(order_price), source="run_daily_session", ): continue result = await overseas_broker.send_overseas_order( exchange_code=market.exchange_code, stock_code=stock_code, order_type=decision.action, quantity=quantity, price=order_price, # limit order ) if result.get("rt_cd", "") != "0": order_succeeded = False daily_msg1 = result.get("msg1") or "" logger.warning( "Overseas order not accepted for %s: rt_cd=%s msg=%s", stock_code, result.get("rt_cd"), daily_msg1, ) if decision.action == "BUY" and "주문가능금액" in daily_msg1: daily_cooldown_key = f"{market.code}:{stock_code}" daily_buy_cooldown[daily_cooldown_key] = ( asyncio.get_event_loop().time() + _BUY_COOLDOWN_SECONDS ) logger.info( "BUY cooldown set for %s: %.0fs (insufficient balance)", stock_code, _BUY_COOLDOWN_SECONDS, ) logger.info("Order result: %s", result.get("msg1", "OK")) # Notify trade execution (only on success) if order_succeeded: try: await telegram.notify_trade_execution( stock_code=stock_code, market=market.name, action=decision.action, quantity=quantity, price=stock_data["current_price"], confidence=decision.confidence, ) except Exception as exc: logger.warning("Telegram notification failed: %s", exc) except Exception as exc: logger.error( "Order execution failed for %s: %s", stock_code, exc ) continue if decision.action == "SELL" and order_succeeded: buy_trade = get_latest_buy_trade(db_conn, stock_code, market.code) if buy_trade and buy_trade.get("price") is not None: buy_price = float(buy_trade["price"]) buy_qty = int(buy_trade.get("quantity") or 1) trade_pnl = (trade_price - buy_price) * buy_qty decision_logger.update_outcome( decision_id=buy_trade["decision_id"], pnl=trade_pnl, accuracy=1 if trade_pnl > 0 else 0, ) # Log trade (skip if order was rejected by API) if decision.action in ("BUY", "SELL") and not order_succeeded: continue log_trade( conn=db_conn, stock_code=stock_code, action=decision.action, confidence=decision.confidence, rationale=decision.rationale, quantity=quantity, price=trade_price, pnl=trade_pnl, market=market.code, exchange_code=market.exchange_code, decision_id=decision_id, mode=settings.MODE, ) logger.info("Daily trading session completed") return daily_start_eval async def _handle_market_close( market_code: str, market_name: str, market_timezone: Any, telegram: TelegramClient, context_aggregator: ContextAggregator, daily_reviewer: DailyReviewer, evolution_optimizer: EvolutionOptimizer | None = None, ) -> None: """Handle market-close tasks: notify, aggregate, review, and store context.""" await telegram.notify_market_close(market_name, 0.0) market_date = datetime.now(market_timezone).date().isoformat() context_aggregator.aggregate_daily_from_trades( date=market_date, market=market_code, ) scorecard = daily_reviewer.generate_scorecard(market_date, market_code) daily_reviewer.store_scorecard_in_context(scorecard) lessons = await daily_reviewer.generate_lessons(scorecard) if lessons: scorecard.lessons = lessons daily_reviewer.store_scorecard_in_context(scorecard) await telegram.send_message( f"Daily Review ({market_code})\n" f"Date: {scorecard.date}\n" f"Decisions: {scorecard.total_decisions}\n" f"P&L: {scorecard.total_pnl:+.2f}\n" f"Win Rate: {scorecard.win_rate:.2f}%\n" f"Lessons: {', '.join(scorecard.lessons) if scorecard.lessons else 'N/A'}" ) if evolution_optimizer is not None: await _run_evolution_loop( evolution_optimizer=evolution_optimizer, telegram=telegram, market_code=market_code, market_date=market_date, ) def _run_context_scheduler( scheduler: ContextScheduler, now: datetime | None = None, ) -> None: """Run periodic context scheduler tasks and log when anything executes.""" result = scheduler.run_if_due(now=now) if any( [ result.weekly, result.monthly, result.quarterly, result.annual, result.legacy, result.cleanup, ] ): logger.info( ( "Context scheduler ran (weekly=%s, monthly=%s, quarterly=%s, " "annual=%s, legacy=%s, cleanup=%s)" ), result.weekly, result.monthly, result.quarterly, result.annual, result.legacy, result.cleanup, ) async def _run_evolution_loop( evolution_optimizer: EvolutionOptimizer, telegram: TelegramClient, market_code: str, market_date: str, ) -> None: """Run evolution loop once at US close (end of trading day).""" if not market_code.startswith("US"): return try: pr_info = await evolution_optimizer.evolve() except Exception as exc: logger.warning("Evolution loop failed on %s: %s", market_date, exc) return if pr_info is None: logger.info("Evolution loop skipped on %s (no actionable failures)", market_date) return try: await telegram.send_message( "Evolution Update\n" f"Date: {market_date}\n" f"PR: {pr_info.get('title', 'N/A')}\n" f"Branch: {pr_info.get('branch', 'N/A')}\n" f"Status: {pr_info.get('status', 'N/A')}" ) except Exception as exc: logger.warning("Evolution notification failed on %s: %s", market_date, exc) def _start_dashboard_server(settings: Settings) -> threading.Thread | None: """Start FastAPI dashboard in a daemon thread when enabled.""" if not settings.DASHBOARD_ENABLED: return None # Validate dependencies before spawning the thread so startup failures are # reported synchronously (avoids the misleading "started" → "failed" log pair). try: import uvicorn # noqa: F401 from src.dashboard import create_dashboard_app # noqa: F401 except ImportError as exc: logger.warning("Dashboard server unavailable (missing dependency): %s", exc) return None def _serve() -> None: try: import uvicorn from src.dashboard import create_dashboard_app app = create_dashboard_app(settings.DB_PATH, mode=settings.MODE) uvicorn.run( app, host=settings.DASHBOARD_HOST, port=settings.DASHBOARD_PORT, log_level="info", ) except Exception as exc: logger.warning("Dashboard server stopped unexpectedly: %s", exc) thread = threading.Thread( target=_serve, name="dashboard-server", daemon=True, ) thread.start() logger.info( "Dashboard server started at http://%s:%d", settings.DASHBOARD_HOST, settings.DASHBOARD_PORT, ) return thread def _apply_dashboard_flag(settings: Settings, dashboard_flag: bool) -> Settings: """Apply CLI dashboard flag over environment settings.""" if dashboard_flag and not settings.DASHBOARD_ENABLED: return settings.model_copy(update={"DASHBOARD_ENABLED": True}) return settings async def run(settings: Settings) -> None: """Main async loop — iterate over open markets on a timer.""" global BLACKOUT_ORDER_MANAGER BLACKOUT_ORDER_MANAGER = BlackoutOrderManager( enabled=settings.ORDER_BLACKOUT_ENABLED, windows=parse_blackout_windows_kst(settings.ORDER_BLACKOUT_WINDOWS_KST), max_queue_size=settings.ORDER_BLACKOUT_QUEUE_MAX, ) logger.info( "Blackout manager initialized: enabled=%s windows=%s queue_max=%d", settings.ORDER_BLACKOUT_ENABLED, settings.ORDER_BLACKOUT_WINDOWS_KST, settings.ORDER_BLACKOUT_QUEUE_MAX, ) broker = KISBroker(settings) overseas_broker = OverseasBroker(broker) brain = GeminiClient(settings) risk = RiskManager(settings) db_conn = init_db(settings.DB_PATH) decision_logger = DecisionLogger(db_conn) context_store = ContextStore(db_conn) context_aggregator = ContextAggregator(db_conn) context_scheduler = ContextScheduler( aggregator=context_aggregator, store=context_store, ) evolution_optimizer = EvolutionOptimizer(settings) # V2 proactive strategy components context_selector = ContextSelector(context_store) scenario_engine = ScenarioEngine() playbook_store = PlaybookStore(db_conn) daily_reviewer = DailyReviewer(db_conn, context_store, gemini_client=brain) pre_market_planner = PreMarketPlanner( gemini_client=brain, context_store=context_store, context_selector=context_selector, settings=settings, ) # Track playbooks per market (in-memory cache) playbooks: dict[str, DayPlaybook] = {} # Initialize Telegram notifications telegram = TelegramClient( bot_token=settings.TELEGRAM_BOT_TOKEN, chat_id=settings.TELEGRAM_CHAT_ID, enabled=settings.TELEGRAM_ENABLED, notification_filter=NotificationFilter( trades=settings.TELEGRAM_NOTIFY_TRADES, market_open_close=settings.TELEGRAM_NOTIFY_MARKET_OPEN_CLOSE, fat_finger=settings.TELEGRAM_NOTIFY_FAT_FINGER, system_events=settings.TELEGRAM_NOTIFY_SYSTEM_EVENTS, playbook=settings.TELEGRAM_NOTIFY_PLAYBOOK, scenario_match=settings.TELEGRAM_NOTIFY_SCENARIO_MATCH, errors=settings.TELEGRAM_NOTIFY_ERRORS, ), ) # Initialize Telegram command handler command_handler = TelegramCommandHandler(telegram) # Register basic commands async def handle_help() -> None: """Handle /help command.""" message = ( "📖 Available Commands\n\n" "/help - Show available commands\n" "/status - Trading status (mode, markets, P&L)\n" "/positions - Current holdings\n" "/report - Daily summary report\n" "/scenarios - Today's playbook scenarios\n" "/review - Recent scorecards\n" "/dashboard - Dashboard URL/status\n" "/stop - Pause trading\n" "/resume - Resume trading\n" "/notify - Show notification filter status\n" "/notify [key] [on|off] - Toggle notification type\n" " Keys: trades, market, scenario, playbook,\n" " system, fatfinger, errors, all" ) await telegram.send_message(message) async def handle_stop() -> None: """Handle /stop command - pause trading.""" if not pause_trading.is_set(): await telegram.send_message("⏸️ Trading is already paused") return pause_trading.clear() logger.info("Trading paused via Telegram command") await telegram.send_message( "⏸️ Trading Paused\n\n" "All trading operations have been suspended.\n" "Use /resume to restart trading." ) async def handle_resume() -> None: """Handle /resume command - resume trading.""" if pause_trading.is_set(): await telegram.send_message("▶️ Trading is already active") return pause_trading.set() logger.info("Trading resumed via Telegram command") await telegram.send_message( "▶️ Trading Resumed\n\n" "Trading operations have been restarted." ) async def handle_status() -> None: """Handle /status command - show trading status.""" try: # Get trading status trading_status = "Active" if pause_trading.is_set() else "Paused" # Calculate P&L from balance data try: balance = await broker.get_balance() output2 = balance.get("output2", [{}]) if output2: total_eval = safe_float(output2[0].get("tot_evlu_amt", "0")) purchase_total = safe_float(output2[0].get("pchs_amt_smtl_amt", "0")) current_pnl = ( ((total_eval - purchase_total) / purchase_total * 100) if purchase_total > 0 else 0.0 ) pnl_str = f"{current_pnl:+.2f}%" else: pnl_str = "N/A" except Exception as exc: logger.warning("Failed to get P&L: %s", exc) pnl_str = "N/A" # Format market list markets_str = ", ".join(settings.enabled_market_list) message = ( "📊 Trading Status\n\n" f"Mode: {settings.MODE.upper()}\n" f"Markets: {markets_str}\n" f"Trading: {trading_status}\n\n" f"Current P&L: {pnl_str}\n" f"Circuit Breaker: {risk._cb_threshold:.1f}%" ) await telegram.send_message(message) except Exception as exc: logger.error("Error in /status handler: %s", exc) await telegram.send_message( "⚠️ Error\n\nFailed to retrieve trading status." ) async def handle_positions() -> None: """Handle /positions command - show account summary.""" try: # Get account balance balance = await broker.get_balance() output2 = balance.get("output2", [{}]) if not output2: await telegram.send_message( "💼 Account Summary\n\n" "No balance information available." ) return # Extract account-level data total_eval = safe_float(output2[0].get("tot_evlu_amt", "0")) total_cash = safe_float(output2[0].get("dnca_tot_amt", "0")) purchase_total = safe_float(output2[0].get("pchs_amt_smtl_amt", "0")) # Calculate P&L pnl_pct = ( ((total_eval - purchase_total) / purchase_total * 100) if purchase_total > 0 else 0.0 ) pnl_sign = "+" if pnl_pct >= 0 else "" message = ( "💼 Account Summary\n\n" f"Total Evaluation: ₩{total_eval:,.0f}\n" f"Available Cash: ₩{total_cash:,.0f}\n" f"Purchase Total: ₩{purchase_total:,.0f}\n" f"P&L: {pnl_sign}{pnl_pct:.2f}%\n\n" "Note: Individual position details require API enhancement" ) await telegram.send_message(message) except Exception as exc: logger.error("Error in /positions handler: %s", exc) await telegram.send_message( "⚠️ Error\n\nFailed to retrieve positions." ) async def handle_report() -> None: """Handle /report command - show daily summary metrics.""" try: today = datetime.now(UTC).date().isoformat() trade_row = db_conn.execute( """ SELECT COUNT(*) AS trade_count, COALESCE(SUM(pnl), 0.0) AS total_pnl, SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) AS wins FROM trades WHERE DATE(timestamp) = ? """, (today,), ).fetchone() decision_row = db_conn.execute( """ SELECT COUNT(*) AS decision_count, COALESCE(AVG(confidence), 0.0) AS avg_confidence FROM decision_logs WHERE DATE(timestamp) = ? """, (today,), ).fetchone() trade_count = int(trade_row[0] if trade_row else 0) total_pnl = float(trade_row[1] if trade_row else 0.0) wins = int(trade_row[2] if trade_row and trade_row[2] is not None else 0) decision_count = int(decision_row[0] if decision_row else 0) avg_confidence = float(decision_row[1] if decision_row else 0.0) win_rate = (wins / trade_count * 100.0) if trade_count > 0 else 0.0 await telegram.send_message( "📈 Daily Report\n\n" f"Date: {today}\n" f"Trades: {trade_count}\n" f"Total P&L: {total_pnl:+.2f}\n" f"Win Rate: {win_rate:.2f}%\n" f"Decisions: {decision_count}\n" f"Avg Confidence: {avg_confidence:.2f}" ) except Exception as exc: logger.error("Error in /report handler: %s", exc) await telegram.send_message( "⚠️ Error\n\nFailed to generate daily report." ) async def handle_scenarios() -> None: """Handle /scenarios command - show today's playbook scenarios.""" try: today = datetime.now(UTC).date().isoformat() rows = db_conn.execute( """ SELECT market, playbook_json FROM playbooks WHERE date = ? ORDER BY market """, (today,), ).fetchall() if not rows: await telegram.send_message( "🧠 Today's Scenarios\n\nNo playbooks found for today." ) return lines = ["🧠 Today's Scenarios", ""] for market, playbook_json in rows: lines.append(f"{market}") playbook_data = {} try: playbook_data = json.loads(playbook_json) except Exception: playbook_data = {} stock_playbooks = playbook_data.get("stock_playbooks", []) if not stock_playbooks: lines.append("- No scenarios") lines.append("") continue for stock_pb in stock_playbooks: stock_code = stock_pb.get("stock_code", "N/A") scenarios = stock_pb.get("scenarios", []) for sc in scenarios: action = sc.get("action", "HOLD") confidence = sc.get("confidence", 0) lines.append(f"- {stock_code}: {action} ({confidence})") lines.append("") await telegram.send_message("\n".join(lines).strip()) except Exception as exc: logger.error("Error in /scenarios handler: %s", exc) await telegram.send_message( "⚠️ Error\n\nFailed to retrieve scenarios." ) async def handle_review() -> None: """Handle /review command - show recent scorecards.""" try: rows = db_conn.execute( """ SELECT timeframe, key, value FROM contexts WHERE layer = 'L6_DAILY' AND key LIKE 'scorecard_%' ORDER BY updated_at DESC LIMIT 5 """ ).fetchall() if not rows: await telegram.send_message( "📝 Recent Reviews\n\nNo scorecards available." ) return lines = ["📝 Recent Reviews", ""] for timeframe, key, value in rows: scorecard = json.loads(value) market = key.replace("scorecard_", "") total_pnl = float(scorecard.get("total_pnl", 0.0)) win_rate = float(scorecard.get("win_rate", 0.0)) decisions = int(scorecard.get("total_decisions", 0)) lines.append( f"- {timeframe} {market}: P&L {total_pnl:+.2f}, " f"Win {win_rate:.2f}%, Decisions {decisions}" ) await telegram.send_message("\n".join(lines)) except Exception as exc: logger.error("Error in /review handler: %s", exc) await telegram.send_message( "⚠️ Error\n\nFailed to retrieve reviews." ) async def handle_notify(args: list[str]) -> None: """Handle /notify [key] [on|off] — query or change notification filters.""" status = telegram.filter_status() # /notify — show current state if not args: lines = ["🔔 알림 필터 현재 상태\n"] for key, enabled in status.items(): icon = "✅" if enabled else "❌" lines.append(f"{icon} {key}") lines.append("\n예) /notify scenario off") lines.append("예) /notify all off") await telegram.send_message("\n".join(lines)) return # /notify [key] — missing on/off if len(args) == 1: key = args[0].lower() if key == "all": lines = ["🔔 알림 필터 현재 상태\n"] for k, enabled in status.items(): icon = "✅" if enabled else "❌" lines.append(f"{icon} {k}") await telegram.send_message("\n".join(lines)) elif key in status: icon = "✅" if status[key] else "❌" await telegram.send_message( f"🔔 {key}: {icon} {'켜짐' if status[key] else '꺼짐'}\n" f"/notify {key} on 또는 /notify {key} off" ) else: valid = ", ".join(list(status.keys()) + ["all"]) await telegram.send_message( f"❌ 알 수 없는 키: {key}\n" f"유효한 키: {valid}" ) return # /notify [key] [on|off] key, toggle = args[0].lower(), args[1].lower() if toggle not in ("on", "off"): await telegram.send_message("❌ on 또는 off 를 입력해 주세요.") return value = toggle == "on" if telegram.set_notification(key, value): icon = "✅" if value else "❌" label = f"전체 알림" if key == "all" else f"{key} 알림" state = "켜짐" if value else "꺼짐" await telegram.send_message(f"{icon} {label} → {state}") logger.info("Notification filter changed via Telegram: %s=%s", key, value) else: valid = ", ".join(list(telegram.filter_status().keys()) + ["all"]) await telegram.send_message( f"❌ 알 수 없는 키: {key}\n" f"유효한 키: {valid}" ) async def handle_dashboard() -> None: """Handle /dashboard command - show dashboard URL if enabled.""" if not settings.DASHBOARD_ENABLED: await telegram.send_message( "🖥️ Dashboard\n\nDashboard is not enabled." ) return url = f"http://{settings.DASHBOARD_HOST}:{settings.DASHBOARD_PORT}" await telegram.send_message( "🖥️ Dashboard\n\n" f"URL: {url}" ) command_handler.register_command("help", handle_help) command_handler.register_command("stop", handle_stop) command_handler.register_command("resume", handle_resume) command_handler.register_command("status", handle_status) command_handler.register_command("positions", handle_positions) command_handler.register_command("report", handle_report) command_handler.register_command("scenarios", handle_scenarios) command_handler.register_command("review", handle_review) command_handler.register_command("dashboard", handle_dashboard) command_handler.register_command_with_args("notify", handle_notify) # Initialize volatility hunter volatility_analyzer = VolatilityAnalyzer(min_volume_surge=2.0, min_price_change=1.0) # Initialize smart scanner (Python-first, AI-last pipeline) smart_scanner = SmartVolatilityScanner( broker=broker, overseas_broker=overseas_broker, volatility_analyzer=volatility_analyzer, settings=settings, ) # Track scan candidates per market for selection context logging scan_candidates: dict[str, dict[str, ScanCandidate]] = {} # market -> {stock_code -> candidate} # Active stocks per market (dynamically discovered by scanner) active_stocks: dict[str, list[str]] = {} # market_code -> [stock_codes] # BUY cooldown: prevents retrying a stock rejected for insufficient balance buy_cooldown: dict[str, float] = {} # "{market_code}:{stock_code}" -> expiry timestamp # Tracks SELL resubmission attempts per "{exchange_code}:{stock_code}" (max 1 until restart). sell_resubmit_counts: dict[str, int] = {} # Initialize latency control system criticality_assessor = CriticalityAssessor( critical_pnl_threshold=-2.5, # Near circuit breaker at -3.0% critical_price_change_threshold=5.0, # 5% in 1 minute critical_volume_surge_threshold=10.0, # 10x average high_volatility_threshold=70.0, low_volatility_threshold=30.0, ) priority_queue = PriorityTaskQueue(max_size=1000) _start_dashboard_server(settings) # Track last scan time for each market last_scan_time: dict[str, float] = {} # Track market open/close state for notifications _market_states: dict[str, bool] = {} # market_code -> is_open # Trading control events shutdown = asyncio.Event() pause_trading = asyncio.Event() pause_trading.set() # Default: trading enabled def _signal_handler() -> None: logger.info("Shutdown signal received") shutdown.set() loop = asyncio.get_running_loop() for sig in (signal.SIGINT, signal.SIGTERM): loop.add_signal_handler(sig, _signal_handler) logger.info("The Ouroboros is alive. Mode: %s, Trading: %s", settings.MODE, settings.TRADE_MODE) logger.info("Enabled markets: %s", settings.enabled_market_list) # Notify system startup try: await telegram.notify_system_start(settings.MODE, settings.enabled_market_list) except Exception as exc: logger.warning("System startup notification failed: %s", exc) # Sync broker positions → DB to prevent double-buy on restart try: await sync_positions_from_broker(broker, overseas_broker, db_conn, settings) except Exception as exc: logger.warning("Startup position sync failed (non-fatal): %s", exc) # Start command handler try: await command_handler.start_polling() except Exception as exc: logger.warning("Failed to start command handler: %s", exc) try: # Branch based on trading mode if settings.TRADE_MODE == "daily": # Daily trading mode: batch decisions at fixed intervals logger.info( "Daily trading mode: %d sessions every %d hours", settings.DAILY_SESSIONS, settings.SESSION_INTERVAL_HOURS, ) session_interval = settings.SESSION_INTERVAL_HOURS * 3600 # Convert to seconds # daily_start_eval: portfolio eval captured at the first session of each # trading day. Reset on calendar-date change so the CB measures only # today's drawdown, not cumulative account history. _cb_daily_start_eval: float = 0.0 _cb_last_date: str = "" while not shutdown.is_set(): # Wait for trading to be unpaused await pause_trading.wait() _run_context_scheduler(context_scheduler, now=datetime.now(UTC)) # Reset intra-day CB baseline on a new calendar date today_str = datetime.now(UTC).date().isoformat() if today_str != _cb_last_date: _cb_last_date = today_str _cb_daily_start_eval = 0.0 logger.info("New trading day %s — daily CB baseline reset", today_str) try: _cb_daily_start_eval = await run_daily_session( broker, overseas_broker, scenario_engine, playbook_store, pre_market_planner, risk, db_conn, decision_logger, context_store, criticality_assessor, telegram, settings, smart_scanner=smart_scanner, daily_start_eval=_cb_daily_start_eval, ) except CircuitBreakerTripped: logger.critical("Circuit breaker tripped — shutting down") await telegram.notify_circuit_breaker( pnl_pct=settings.CIRCUIT_BREAKER_PCT, threshold=settings.CIRCUIT_BREAKER_PCT, ) shutdown.set() break except Exception as exc: logger.exception("Daily session error: %s", exc) # Wait for next session or shutdown logger.info("Next session in %.1f hours", session_interval / 3600) try: await asyncio.wait_for(shutdown.wait(), timeout=session_interval) except TimeoutError: pass # Normal — time for next session else: # Realtime trading mode: original per-stock loop logger.info("Realtime trading mode: 60s interval per stock") while not shutdown.is_set(): # Wait for trading to be unpaused await pause_trading.wait() _run_context_scheduler(context_scheduler, now=datetime.now(UTC)) # Get currently open markets open_markets = get_open_markets(settings.enabled_market_list) if not open_markets: # Notify market close for any markets that were open for market_code, is_open in list(_market_states.items()): if is_open: try: from src.markets.schedule import MARKETS market_info = MARKETS.get(market_code) if market_info: await _handle_market_close( market_code=market_code, market_name=market_info.name, market_timezone=market_info.timezone, telegram=telegram, context_aggregator=context_aggregator, daily_reviewer=daily_reviewer, evolution_optimizer=evolution_optimizer, ) except Exception as exc: logger.warning("Market close notification failed: %s", exc) _market_states[market_code] = False # Clear playbook for closed market (new one generated next open) playbooks.pop(market_code, None) # No markets open — wait until next market opens try: next_market, next_open_time = get_next_market_open( settings.enabled_market_list ) now = datetime.now(UTC) wait_seconds = (next_open_time - now).total_seconds() logger.info( "No markets open. Next market: %s, opens in %.1f hours", next_market.name, wait_seconds / 3600, ) await asyncio.wait_for(shutdown.wait(), timeout=wait_seconds) except TimeoutError: continue # Market should be open now except ValueError as exc: logger.error("Failed to find next market open: %s", exc) await asyncio.sleep(TRADE_INTERVAL_SECONDS) continue # Process each open market for market in open_markets: if shutdown.is_set(): break await process_blackout_recovery_orders( broker=broker, overseas_broker=overseas_broker, db_conn=db_conn, ) # Notify market open if it just opened if not _market_states.get(market.code, False): try: await telegram.notify_market_open(market.name) except Exception as exc: logger.warning("Market open notification failed: %s", exc) _market_states[market.code] = True # Check and handle domestic pending (unfilled) limit orders. if market.is_domestic: try: await handle_domestic_pending_orders( broker, telegram, settings, sell_resubmit_counts, buy_cooldown, ) except Exception as exc: logger.warning("Domestic pending order check failed: %s", exc) # Check and handle overseas pending (unfilled) limit orders. if not market.is_domestic: try: await handle_overseas_pending_orders( overseas_broker, telegram, settings, sell_resubmit_counts, buy_cooldown, ) except Exception as exc: logger.warning("Pending order check failed: %s", exc) # Smart Scanner: dynamic stock discovery (no static watchlists) now_timestamp = asyncio.get_event_loop().time() last_scan = last_scan_time.get(market.code, 0.0) rescan_interval = settings.RESCAN_INTERVAL_SECONDS if now_timestamp - last_scan >= rescan_interval: try: logger.info("Smart Scanner: Scanning %s market", market.name) fallback_stocks: list[str] | None = None if not market.is_domestic: fallback_stocks = await build_overseas_symbol_universe( db_conn=db_conn, overseas_broker=overseas_broker, market=market, active_stocks=active_stocks, ) if not fallback_stocks: logger.debug( "No dynamic overseas symbol universe for %s;" " scanner will use overseas ranking API", market.code, ) candidates = await smart_scanner.scan( market=market, fallback_stocks=fallback_stocks, ) if candidates: # Use scanner results directly as trading candidates active_stocks[market.code] = smart_scanner.get_stock_codes( candidates ) # Store candidates per market for selection context logging scan_candidates[market.code] = { c.stock_code: c for c in candidates } logger.info( "Smart Scanner: Found %d candidates for %s: %s", len(candidates), market.name, [f"{c.stock_code}(RSI={c.rsi:.0f})" for c in candidates], ) # Get market-local date for playbook keying market_today = datetime.now( market.timezone ).date() # Load or generate playbook (1 Gemini call per market per day) if market.code not in playbooks: # Try DB first (survives process restart) stored_pb = playbook_store.load(market_today, market.code) if stored_pb is not None: playbooks[market.code] = stored_pb logger.info( "Loaded existing playbook for %s from DB" " (%d stocks, %d scenarios)", market.code, stored_pb.stock_count, stored_pb.scenario_count, ) else: try: pb = await pre_market_planner.generate_playbook( market=market.code, candidates=candidates, today=market_today, ) playbook_store.save(pb) playbooks[market.code] = pb try: await telegram.notify_playbook_generated( market=market.code, stock_count=pb.stock_count, scenario_count=pb.scenario_count, token_count=pb.token_count, ) except Exception as exc: logger.warning( "Playbook notification failed: %s", exc ) except Exception as exc: logger.error( "Playbook generation failed for %s: %s", market.code, exc, ) try: await telegram.notify_playbook_failed( market=market.code, reason=str(exc)[:200], ) except Exception: pass playbooks[market.code] = ( PreMarketPlanner._empty_playbook( market_today, market.code ) ) else: logger.info( "Smart Scanner: No candidates for %s — no trades", market.name ) active_stocks[market.code] = [] last_scan_time[market.code] = now_timestamp except Exception as exc: logger.error("Smart Scanner failed for %s: %s", market.name, exc) # Get active stocks from scanner (dynamic, no static fallback). # Also include currently-held positions so stop-loss / # take-profit can fire even when a holding drops off the # scanner. Broker balance is the source of truth here — # unlike the local DB it reflects actual fills and any # manual trades done outside the bot. scanner_codes = active_stocks.get(market.code, []) try: if market.is_domestic: held_balance = await broker.get_balance() else: held_balance = await overseas_broker.get_overseas_balance( market.exchange_code ) held_codes = _extract_held_codes_from_balance( held_balance, is_domestic=market.is_domestic ) except Exception as exc: logger.warning( "Failed to fetch holdings for %s: %s — skipping holdings merge", market.name, exc, ) held_codes = [] stock_codes = list(dict.fromkeys(scanner_codes + held_codes)) extra_held = [c for c in held_codes if c not in set(scanner_codes)] if extra_held: logger.info( "Holdings added to loop for %s (not in scanner): %s", market.name, extra_held, ) if not stock_codes: logger.debug("No active stocks for market %s", market.code) continue logger.info("Processing market: %s (%d stocks)", market.name, len(stock_codes)) # Process each stock from scanner results for stock_code in stock_codes: if shutdown.is_set(): break # Get playbook for this market market_playbook = playbooks.get( market.code, PreMarketPlanner._empty_playbook( datetime.now(market.timezone).date(), market.code ), ) # Retry logic for connection errors for attempt in range(1, MAX_CONNECTION_RETRIES + 1): try: await trading_cycle( broker, overseas_broker, scenario_engine, market_playbook, risk, db_conn, decision_logger, context_store, criticality_assessor, telegram, market, stock_code, scan_candidates, settings, buy_cooldown, ) break # Success — exit retry loop except CircuitBreakerTripped as exc: logger.critical("Circuit breaker tripped — shutting down") try: await telegram.notify_circuit_breaker( pnl_pct=exc.pnl_pct, threshold=exc.threshold, ) except Exception as notify_exc: logger.warning( "Circuit breaker notification failed: %s", notify_exc ) raise except ConnectionError as exc: if attempt < MAX_CONNECTION_RETRIES: logger.warning( "Connection error for %s (attempt %d/%d): %s", stock_code, attempt, MAX_CONNECTION_RETRIES, exc, ) await asyncio.sleep(2**attempt) # Exponential backoff else: logger.error( "Connection error for %s (all retries exhausted): %s", stock_code, exc, ) break # Give up on this stock except Exception as exc: logger.exception("Unexpected error for %s: %s", stock_code, exc) break # Don't retry on unexpected errors # Log priority queue metrics periodically metrics = await priority_queue.get_metrics() if metrics.total_enqueued > 0: logger.info( "Priority queue metrics: enqueued=%d, dequeued=%d," " size=%d, timeouts=%d, errors=%d", metrics.total_enqueued, metrics.total_dequeued, metrics.current_size, metrics.total_timeouts, metrics.total_errors, ) # Wait for next cycle or shutdown try: await asyncio.wait_for(shutdown.wait(), timeout=TRADE_INTERVAL_SECONDS) except TimeoutError: pass # Normal — timeout means it's time for next cycle finally: # Notify shutdown before closing resources await telegram.notify_system_shutdown("Normal shutdown") # Clean up resources await command_handler.stop_polling() await broker.close() await telegram.close() db_conn.close() logger.info("The Ouroboros rests.") def main() -> None: parser = argparse.ArgumentParser(description="The Ouroboros Trading Agent") parser.add_argument( "--mode", choices=["paper", "live"], default="paper", help="Trading mode (default: paper)", ) parser.add_argument( "--dashboard", action="store_true", help="Enable FastAPI dashboard server in background thread", ) args = parser.parse_args() setup_logging() settings = Settings(MODE=args.mode) # type: ignore[call-arg] settings = _apply_dashboard_flag(settings, args.dashboard) asyncio.run(run(settings)) if __name__ == "__main__": main()