diff --git a/src/core/blackout_manager.py b/src/core/blackout_manager.py index 9078735..b0bc68b 100644 --- a/src/core/blackout_manager.py +++ b/src/core/blackout_manager.py @@ -98,3 +98,8 @@ class BlackoutOrderManager: def requeue(self, intent: QueuedOrderIntent) -> None: if len(self._queue) < self._max_queue_size: self._queue.append(intent) + + def clear(self) -> int: + count = len(self._queue) + self._queue.clear() + return count diff --git a/src/main.py b/src/main.py index 61aee8a..2fe9ef4 100644 --- a/src/main.py +++ b/src/main.py @@ -636,6 +636,187 @@ async def process_blackout_recovery_orders( BLACKOUT_ORDER_MANAGER.requeue(intent) +def _resolve_kill_switch_markets( + *, + settings: Settings | None, + current_market: MarketInfo | None, +) -> list[MarketInfo]: + if settings is not None: + markets: list[MarketInfo] = [] + seen: set[str] = set() + for market_code in settings.enabled_market_list: + market = MARKETS.get(market_code) + if market is None or market.code in seen: + continue + markets.append(market) + seen.add(market.code) + if markets: + return markets + if current_market is not None: + return [current_market] + return [] + + +async def _cancel_pending_orders_for_kill_switch( + *, + broker: KISBroker, + overseas_broker: OverseasBroker, + markets: list[MarketInfo], +) -> None: + failures: list[str] = [] + domestic = [m for m in markets if m.is_domestic] + overseas = [m for m in markets if not m.is_domestic] + + if domestic: + try: + orders = await broker.get_domestic_pending_orders() + except Exception as exc: + logger.warning("KillSwitch: failed to fetch domestic pending orders: %s", exc) + orders = [] + for order in orders: + stock_code = str(order.get("pdno", "")) + try: + orgn_odno = order.get("orgn_odno", "") + krx_fwdg_ord_orgno = order.get("ord_gno_brno", "") + psbl_qty = int(order.get("psbl_qty", "0") or "0") + if not stock_code or not orgn_odno or psbl_qty <= 0: + continue + cancel_result = await broker.cancel_domestic_order( + stock_code=stock_code, + orgn_odno=orgn_odno, + krx_fwdg_ord_orgno=krx_fwdg_ord_orgno, + qty=psbl_qty, + ) + if cancel_result.get("rt_cd") != "0": + failures.append( + "domestic cancel failed for" + f" {stock_code}: rt_cd={cancel_result.get('rt_cd')}" + f" msg={cancel_result.get('msg1')}" + ) + except Exception as exc: + logger.warning("KillSwitch: domestic cancel failed: %s", exc) + failures.append(f"domestic cancel exception for {stock_code}: {exc}") + + us_exchanges = frozenset({"NASD", "NYSE", "AMEX"}) + exchange_codes: list[str] = [] + seen_us = False + for market in overseas: + exc_code = market.exchange_code + if exc_code in us_exchanges: + if not seen_us: + exchange_codes.append("NASD") + seen_us = True + elif exc_code not in exchange_codes: + exchange_codes.append(exc_code) + + for exchange_code in exchange_codes: + try: + orders = await overseas_broker.get_overseas_pending_orders(exchange_code) + except Exception as exc: + logger.warning( + "KillSwitch: failed to fetch overseas pending orders for %s: %s", + exchange_code, + exc, + ) + continue + for order in orders: + stock_code = str(order.get("pdno", "")) + order_exchange = str(order.get("ovrs_excg_cd") or exchange_code) + try: + odno = order.get("odno", "") + nccs_qty = int(order.get("nccs_qty", "0") or "0") + if not stock_code or not odno or nccs_qty <= 0: + continue + cancel_result = await overseas_broker.cancel_overseas_order( + exchange_code=order_exchange, + stock_code=stock_code, + odno=odno, + qty=nccs_qty, + ) + if cancel_result.get("rt_cd") != "0": + failures.append( + "overseas cancel failed for" + f" {order_exchange}/{stock_code}: rt_cd={cancel_result.get('rt_cd')}" + f" msg={cancel_result.get('msg1')}" + ) + except Exception as exc: + logger.warning("KillSwitch: overseas cancel failed: %s", exc) + failures.append( + f"overseas cancel exception for {order_exchange}/{stock_code}: {exc}" + ) + + if failures: + raise RuntimeError("; ".join(failures[:3])) + + +async def _refresh_order_state_for_kill_switch( + *, + broker: KISBroker, + overseas_broker: OverseasBroker, + markets: list[MarketInfo], +) -> None: + seen_overseas: set[str] = set() + for market in markets: + try: + if market.is_domestic: + await broker.get_balance() + elif market.exchange_code not in seen_overseas: + seen_overseas.add(market.exchange_code) + await overseas_broker.get_overseas_balance(market.exchange_code) + except Exception as exc: + logger.warning( + "KillSwitch: refresh state failed for %s/%s: %s", + market.code, + market.exchange_code, + exc, + ) + + +def _reduce_risk_for_kill_switch() -> None: + dropped = BLACKOUT_ORDER_MANAGER.clear() + logger.critical("KillSwitch: reduced queued order risk by clearing %d queued intents", dropped) + + +async def _trigger_emergency_kill_switch( + *, + reason: str, + broker: KISBroker, + overseas_broker: OverseasBroker, + telegram: TelegramClient, + settings: Settings | None, + current_market: MarketInfo | None, + stock_code: str, + pnl_pct: float, + threshold: float, +) -> Any: + markets = _resolve_kill_switch_markets(settings=settings, current_market=current_market) + return await KILL_SWITCH.trigger( + reason=reason, + cancel_pending_orders=lambda: _cancel_pending_orders_for_kill_switch( + broker=broker, + overseas_broker=overseas_broker, + markets=markets, + ), + refresh_order_state=lambda: _refresh_order_state_for_kill_switch( + broker=broker, + overseas_broker=overseas_broker, + markets=markets, + ), + reduce_risk=_reduce_risk_for_kill_switch, + snapshot_state=lambda: logger.critical( + "KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f", + current_market.code if current_market else "UNKNOWN", + stock_code, + pnl_pct, + threshold, + ), + notify=lambda: telegram.notify_circuit_breaker( + pnl_pct=pnl_pct, + threshold=threshold, + ), + ) + + async def trading_cycle( broker: KISBroker, overseas_broker: OverseasBroker, @@ -1151,15 +1332,16 @@ async def trading_cycle( logger.warning("Fat finger notification failed: %s", notify_exc) raise # Re-raise to prevent trade except CircuitBreakerTripped as exc: - ks_report = await KILL_SWITCH.trigger( + ks_report = await _trigger_emergency_kill_switch( reason=f"circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}", - snapshot_state=lambda: logger.critical( - "KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f", - market.code, - stock_code, - exc.pnl_pct, - exc.threshold, - ), + broker=broker, + overseas_broker=overseas_broker, + telegram=telegram, + settings=settings, + current_market=market, + stock_code=stock_code, + pnl_pct=exc.pnl_pct, + threshold=exc.threshold, ) if ks_report.errors: logger.critical( @@ -2218,26 +2400,18 @@ async def run_daily_session( logger.warning("Fat finger notification failed: %s", notify_exc) continue # Skip this order except CircuitBreakerTripped as exc: - ks_report = await KILL_SWITCH.trigger( + ks_report = await _trigger_emergency_kill_switch( reason=f"daily_circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}", - snapshot_state=lambda: logger.critical( - "Daily KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f", - market.code, - stock_code, - exc.pnl_pct, - exc.threshold, - ), + broker=broker, + overseas_broker=overseas_broker, + telegram=telegram, + settings=settings, + current_market=market, + stock_code=stock_code, + pnl_pct=exc.pnl_pct, + threshold=exc.threshold, ) logger.critical("Circuit breaker tripped — stopping session") - try: - await telegram.notify_circuit_breaker( - pnl_pct=exc.pnl_pct, - threshold=exc.threshold, - ) - except Exception as notify_exc: - logger.warning( - "Circuit breaker notification failed: %s", notify_exc - ) if ks_report.errors: logger.critical( "Daily KillSwitch step errors for %s/%s: %s", diff --git a/tests/test_main.py b/tests/test_main.py index 934b113..3dee447 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -15,6 +15,7 @@ from src.evolution.scorecard import DailyScorecard from src.logging.decision_logger import DecisionLogger from src.main import ( KILL_SWITCH, + _trigger_emergency_kill_switch, _apply_dashboard_flag, _determine_order_quantity, _extract_avg_price_from_balance, @@ -5349,3 +5350,118 @@ async def test_process_blackout_recovery_drops_policy_rejected_intent() -> None: broker.send_order.assert_not_called() blackout_manager.requeue.assert_not_called() + + +@pytest.mark.asyncio +async def test_trigger_emergency_kill_switch_executes_operational_steps() -> None: + """Emergency kill switch should execute cancel/refresh/reduce/notify callbacks.""" + broker = MagicMock() + broker.get_domestic_pending_orders = AsyncMock( + return_value=[ + { + "pdno": "005930", + "orgn_odno": "1", + "ord_gno_brno": "01", + "psbl_qty": "3", + } + ] + ) + broker.cancel_domestic_order = AsyncMock(return_value={"rt_cd": "0"}) + broker.get_balance = AsyncMock(return_value={"output1": [], "output2": []}) + + overseas_broker = MagicMock() + overseas_broker.get_overseas_pending_orders = AsyncMock(return_value=[]) + overseas_broker.get_overseas_balance = AsyncMock(return_value={"output1": [], "output2": []}) + + telegram = MagicMock() + telegram.notify_circuit_breaker = AsyncMock() + + settings = MagicMock() + settings.enabled_market_list = ["KR"] + + market = MagicMock() + market.code = "KR" + market.exchange_code = "KRX" + market.is_domestic = True + + with ( + patch("src.main.MARKETS", {"KR": market}), + patch("src.main.BLACKOUT_ORDER_MANAGER.clear", return_value=2), + ): + report = await _trigger_emergency_kill_switch( + reason="test", + broker=broker, + overseas_broker=overseas_broker, + telegram=telegram, + settings=settings, + current_market=market, + stock_code="005930", + pnl_pct=-3.2, + threshold=-3.0, + ) + + assert report.steps == [ + "block_new_orders", + "cancel_pending_orders", + "refresh_order_state", + "reduce_risk", + "snapshot_state", + "notify", + ] + broker.cancel_domestic_order.assert_called_once() + broker.get_balance.assert_called_once() + telegram.notify_circuit_breaker.assert_called_once_with( + pnl_pct=-3.2, + threshold=-3.0, + ) + + +@pytest.mark.asyncio +async def test_trigger_emergency_kill_switch_records_cancel_failure() -> None: + """Cancel API rejection should be captured in kill switch errors.""" + broker = MagicMock() + broker.get_domestic_pending_orders = AsyncMock( + return_value=[ + { + "pdno": "005930", + "orgn_odno": "1", + "ord_gno_brno": "01", + "psbl_qty": "3", + } + ] + ) + broker.cancel_domestic_order = AsyncMock(return_value={"rt_cd": "1", "msg1": "fail"}) + broker.get_balance = AsyncMock(return_value={"output1": [], "output2": []}) + + overseas_broker = MagicMock() + overseas_broker.get_overseas_pending_orders = AsyncMock(return_value=[]) + overseas_broker.get_overseas_balance = AsyncMock(return_value={"output1": [], "output2": []}) + + telegram = MagicMock() + telegram.notify_circuit_breaker = AsyncMock() + + settings = MagicMock() + settings.enabled_market_list = ["KR"] + + market = MagicMock() + market.code = "KR" + market.exchange_code = "KRX" + market.is_domestic = True + + with ( + patch("src.main.MARKETS", {"KR": market}), + patch("src.main.BLACKOUT_ORDER_MANAGER.clear", return_value=0), + ): + report = await _trigger_emergency_kill_switch( + reason="test-fail", + broker=broker, + overseas_broker=overseas_broker, + telegram=telegram, + settings=settings, + current_market=market, + stock_code="005930", + pnl_pct=-3.2, + threshold=-3.0, + ) + + assert any(err.startswith("cancel_pending_orders:") for err in report.errors)