feat: enforce operational kill switch callbacks in runtime flow (TASK-CODE-003)

This commit is contained in:
agentson
2026-02-27 00:38:26 +09:00
parent 1381b140ab
commit 25401ac132
3 changed files with 249 additions and 25 deletions

View File

@@ -98,3 +98,8 @@ class BlackoutOrderManager:
def requeue(self, intent: QueuedOrderIntent) -> None: def requeue(self, intent: QueuedOrderIntent) -> None:
if len(self._queue) < self._max_queue_size: if len(self._queue) < self._max_queue_size:
self._queue.append(intent) self._queue.append(intent)
def clear(self) -> int:
count = len(self._queue)
self._queue.clear()
return count

View File

@@ -636,6 +636,167 @@ async def process_blackout_recovery_orders(
BLACKOUT_ORDER_MANAGER.requeue(intent) BLACKOUT_ORDER_MANAGER.requeue(intent)
def _resolve_kill_switch_markets(
*,
settings: Settings | None,
current_market: MarketInfo | None,
) -> list[MarketInfo]:
if settings is not None:
markets: list[MarketInfo] = []
seen: set[str] = set()
for market_code in settings.enabled_market_list:
market = MARKETS.get(market_code)
if market is None or market.code in seen:
continue
markets.append(market)
seen.add(market.code)
if markets:
return markets
if current_market is not None:
return [current_market]
return []
async def _cancel_pending_orders_for_kill_switch(
*,
broker: KISBroker,
overseas_broker: OverseasBroker,
markets: list[MarketInfo],
) -> None:
domestic = [m for m in markets if m.is_domestic]
overseas = [m for m in markets if not m.is_domestic]
if domestic:
try:
orders = await broker.get_domestic_pending_orders()
except Exception as exc:
logger.warning("KillSwitch: failed to fetch domestic pending orders: %s", exc)
orders = []
for order in orders:
try:
stock_code = order.get("pdno", "")
orgn_odno = order.get("orgn_odno", "")
krx_fwdg_ord_orgno = order.get("ord_gno_brno", "")
psbl_qty = int(order.get("psbl_qty", "0") or "0")
if not stock_code or not orgn_odno or psbl_qty <= 0:
continue
await broker.cancel_domestic_order(
stock_code=stock_code,
orgn_odno=orgn_odno,
krx_fwdg_ord_orgno=krx_fwdg_ord_orgno,
qty=psbl_qty,
)
except Exception as exc:
logger.warning("KillSwitch: domestic cancel failed: %s", exc)
us_exchanges = frozenset({"NASD", "NYSE", "AMEX"})
exchange_codes: list[str] = []
seen_us = False
for market in overseas:
exc_code = market.exchange_code
if exc_code in us_exchanges:
if not seen_us:
exchange_codes.append("NASD")
seen_us = True
elif exc_code not in exchange_codes:
exchange_codes.append(exc_code)
for exchange_code in exchange_codes:
try:
orders = await overseas_broker.get_overseas_pending_orders(exchange_code)
except Exception as exc:
logger.warning(
"KillSwitch: failed to fetch overseas pending orders for %s: %s",
exchange_code,
exc,
)
continue
for order in orders:
try:
stock_code = order.get("pdno", "")
odno = order.get("odno", "")
nccs_qty = int(order.get("nccs_qty", "0") or "0")
order_exchange = order.get("ovrs_excg_cd") or exchange_code
if not stock_code or not odno or nccs_qty <= 0:
continue
await overseas_broker.cancel_overseas_order(
exchange_code=order_exchange,
stock_code=stock_code,
odno=odno,
qty=nccs_qty,
)
except Exception as exc:
logger.warning("KillSwitch: overseas cancel failed: %s", exc)
async def _refresh_order_state_for_kill_switch(
*,
broker: KISBroker,
overseas_broker: OverseasBroker,
markets: list[MarketInfo],
) -> None:
seen_overseas: set[str] = set()
for market in markets:
try:
if market.is_domestic:
await broker.get_balance()
elif market.exchange_code not in seen_overseas:
seen_overseas.add(market.exchange_code)
await overseas_broker.get_overseas_balance(market.exchange_code)
except Exception as exc:
logger.warning(
"KillSwitch: refresh state failed for %s/%s: %s",
market.code,
market.exchange_code,
exc,
)
def _reduce_risk_for_kill_switch() -> None:
dropped = BLACKOUT_ORDER_MANAGER.clear()
logger.critical("KillSwitch: reduced queued order risk by clearing %d queued intents", dropped)
async def _trigger_emergency_kill_switch(
*,
reason: str,
broker: KISBroker,
overseas_broker: OverseasBroker,
telegram: TelegramClient,
settings: Settings | None,
current_market: MarketInfo | None,
stock_code: str,
pnl_pct: float,
threshold: float,
) -> Any:
markets = _resolve_kill_switch_markets(settings=settings, current_market=current_market)
return await KILL_SWITCH.trigger(
reason=reason,
cancel_pending_orders=lambda: _cancel_pending_orders_for_kill_switch(
broker=broker,
overseas_broker=overseas_broker,
markets=markets,
),
refresh_order_state=lambda: _refresh_order_state_for_kill_switch(
broker=broker,
overseas_broker=overseas_broker,
markets=markets,
),
reduce_risk=_reduce_risk_for_kill_switch,
snapshot_state=lambda: logger.critical(
"KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f",
current_market.code if current_market else "UNKNOWN",
stock_code,
pnl_pct,
threshold,
),
notify=lambda: telegram.notify_circuit_breaker(
pnl_pct=pnl_pct,
threshold=threshold,
),
)
async def trading_cycle( async def trading_cycle(
broker: KISBroker, broker: KISBroker,
overseas_broker: OverseasBroker, overseas_broker: OverseasBroker,
@@ -1151,15 +1312,16 @@ async def trading_cycle(
logger.warning("Fat finger notification failed: %s", notify_exc) logger.warning("Fat finger notification failed: %s", notify_exc)
raise # Re-raise to prevent trade raise # Re-raise to prevent trade
except CircuitBreakerTripped as exc: except CircuitBreakerTripped as exc:
ks_report = await KILL_SWITCH.trigger( ks_report = await _trigger_emergency_kill_switch(
reason=f"circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}", reason=f"circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}",
snapshot_state=lambda: logger.critical( broker=broker,
"KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f", overseas_broker=overseas_broker,
market.code, telegram=telegram,
stock_code, settings=settings,
exc.pnl_pct, current_market=market,
exc.threshold, stock_code=stock_code,
), pnl_pct=exc.pnl_pct,
threshold=exc.threshold,
) )
if ks_report.errors: if ks_report.errors:
logger.critical( logger.critical(
@@ -2218,26 +2380,18 @@ async def run_daily_session(
logger.warning("Fat finger notification failed: %s", notify_exc) logger.warning("Fat finger notification failed: %s", notify_exc)
continue # Skip this order continue # Skip this order
except CircuitBreakerTripped as exc: except CircuitBreakerTripped as exc:
ks_report = await KILL_SWITCH.trigger( ks_report = await _trigger_emergency_kill_switch(
reason=f"daily_circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}", reason=f"daily_circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}",
snapshot_state=lambda: logger.critical( broker=broker,
"Daily KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f", overseas_broker=overseas_broker,
market.code, telegram=telegram,
stock_code, settings=settings,
exc.pnl_pct, current_market=market,
exc.threshold, stock_code=stock_code,
),
)
logger.critical("Circuit breaker tripped — stopping session")
try:
await telegram.notify_circuit_breaker(
pnl_pct=exc.pnl_pct, pnl_pct=exc.pnl_pct,
threshold=exc.threshold, threshold=exc.threshold,
) )
except Exception as notify_exc: logger.critical("Circuit breaker tripped — stopping session")
logger.warning(
"Circuit breaker notification failed: %s", notify_exc
)
if ks_report.errors: if ks_report.errors:
logger.critical( logger.critical(
"Daily KillSwitch step errors for %s/%s: %s", "Daily KillSwitch step errors for %s/%s: %s",

View File

@@ -15,6 +15,7 @@ from src.evolution.scorecard import DailyScorecard
from src.logging.decision_logger import DecisionLogger from src.logging.decision_logger import DecisionLogger
from src.main import ( from src.main import (
KILL_SWITCH, KILL_SWITCH,
_trigger_emergency_kill_switch,
_apply_dashboard_flag, _apply_dashboard_flag,
_determine_order_quantity, _determine_order_quantity,
_extract_avg_price_from_balance, _extract_avg_price_from_balance,
@@ -5349,3 +5350,67 @@ async def test_process_blackout_recovery_drops_policy_rejected_intent() -> None:
broker.send_order.assert_not_called() broker.send_order.assert_not_called()
blackout_manager.requeue.assert_not_called() blackout_manager.requeue.assert_not_called()
@pytest.mark.asyncio
async def test_trigger_emergency_kill_switch_executes_operational_steps() -> None:
"""Emergency kill switch should execute cancel/refresh/reduce/notify callbacks."""
broker = MagicMock()
broker.get_domestic_pending_orders = AsyncMock(
return_value=[
{
"pdno": "005930",
"orgn_odno": "1",
"ord_gno_brno": "01",
"psbl_qty": "3",
}
]
)
broker.cancel_domestic_order = AsyncMock(return_value={"rt_cd": "0"})
broker.get_balance = AsyncMock(return_value={"output1": [], "output2": []})
overseas_broker = MagicMock()
overseas_broker.get_overseas_pending_orders = AsyncMock(return_value=[])
overseas_broker.get_overseas_balance = AsyncMock(return_value={"output1": [], "output2": []})
telegram = MagicMock()
telegram.notify_circuit_breaker = AsyncMock()
settings = MagicMock()
settings.enabled_market_list = ["KR"]
market = MagicMock()
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
with (
patch("src.main.MARKETS", {"KR": market}),
patch("src.main.BLACKOUT_ORDER_MANAGER.clear", return_value=2),
):
report = await _trigger_emergency_kill_switch(
reason="test",
broker=broker,
overseas_broker=overseas_broker,
telegram=telegram,
settings=settings,
current_market=market,
stock_code="005930",
pnl_pct=-3.2,
threshold=-3.0,
)
assert report.steps == [
"block_new_orders",
"cancel_pending_orders",
"refresh_order_state",
"reduce_risk",
"snapshot_state",
"notify",
]
broker.cancel_domestic_order.assert_called_once()
broker.get_balance.assert_called_once()
telegram.notify_circuit_breaker.assert_called_once_with(
pnl_pct=-3.2,
threshold=-3.0,
)