From 4987b6393ae8d70e19b7b15b6ffccc7f5e121c1a Mon Sep 17 00:00:00 2001 From: agentson Date: Thu, 26 Feb 2026 23:22:58 +0900 Subject: [PATCH 1/2] feat: implement phase1 state machine, composite exits, and kill-switch orchestration (#275) --- src/core/kill_switch.py | 71 +++++++++++++++++ src/main.py | 58 +++++++++++++- src/strategy/exit_rules.py | 104 +++++++++++++++++++++++++ src/strategy/position_state_machine.py | 70 +++++++++++++++++ tests/test_kill_switch.py | 55 +++++++++++++ tests/test_strategy_exit_rules.py | 38 +++++++++ tests/test_strategy_state_machine.py | 30 +++++++ 7 files changed, 424 insertions(+), 2 deletions(-) create mode 100644 src/core/kill_switch.py create mode 100644 src/strategy/exit_rules.py create mode 100644 src/strategy/position_state_machine.py create mode 100644 tests/test_kill_switch.py create mode 100644 tests/test_strategy_exit_rules.py create mode 100644 tests/test_strategy_state_machine.py diff --git a/src/core/kill_switch.py b/src/core/kill_switch.py new file mode 100644 index 0000000..9f2231b --- /dev/null +++ b/src/core/kill_switch.py @@ -0,0 +1,71 @@ +"""Kill switch orchestration for emergency risk actions. + +Order is fixed: +1) block new orders +2) cancel pending orders +3) refresh order state +4) reduce risk +5) snapshot and notify +""" + +from __future__ import annotations + +import inspect +from dataclasses import dataclass, field +from typing import Any, Awaitable, Callable + +StepCallable = Callable[[], Any | Awaitable[Any]] + + +@dataclass +class KillSwitchReport: + reason: str + steps: list[str] = field(default_factory=list) + errors: list[str] = field(default_factory=list) + + +class KillSwitchOrchestrator: + def __init__(self) -> None: + self.new_orders_blocked = False + + async def _run_step( + self, + report: KillSwitchReport, + name: str, + fn: StepCallable | None, + ) -> None: + report.steps.append(name) + if fn is None: + return + try: + result = fn() + if inspect.isawaitable(result): + await result + except Exception as exc: # pragma: no cover - intentionally resilient + report.errors.append(f"{name}: {exc}") + + async def trigger( + self, + *, + reason: str, + cancel_pending_orders: StepCallable | None = None, + refresh_order_state: StepCallable | None = None, + reduce_risk: StepCallable | None = None, + snapshot_state: StepCallable | None = None, + notify: StepCallable | None = None, + ) -> KillSwitchReport: + report = KillSwitchReport(reason=reason) + + self.new_orders_blocked = True + report.steps.append("block_new_orders") + + await self._run_step(report, "cancel_pending_orders", cancel_pending_orders) + await self._run_step(report, "refresh_order_state", refresh_order_state) + await self._run_step(report, "reduce_risk", reduce_risk) + await self._run_step(report, "snapshot_state", snapshot_state) + await self._run_step(report, "notify", notify) + + return report + + def clear_block(self) -> None: + self.new_orders_blocked = False diff --git a/src/main.py b/src/main.py index 3f16c75..eaa2642 100644 --- a/src/main.py +++ b/src/main.py @@ -27,6 +27,7 @@ from src.context.layer import ContextLayer from src.context.scheduler import ContextScheduler from src.context.store import ContextStore from src.core.criticality import CriticalityAssessor +from src.core.kill_switch import KillSwitchOrchestrator from src.core.priority_queue import PriorityTaskQueue from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected, RiskManager from src.db import ( @@ -43,11 +44,14 @@ from src.logging_config import setup_logging from src.markets.schedule import MARKETS, MarketInfo, get_next_market_open, get_open_markets from src.notifications.telegram_client import NotificationFilter, TelegramClient, TelegramCommandHandler from src.strategy.models import DayPlaybook, MarketOutlook +from src.strategy.exit_rules import ExitRuleConfig, ExitRuleInput, evaluate_exit from src.strategy.playbook_store import PlaybookStore from src.strategy.pre_market_planner import PreMarketPlanner +from src.strategy.position_state_machine import PositionState from src.strategy.scenario_engine import ScenarioEngine logger = logging.getLogger(__name__) +KILL_SWITCH = KillSwitchOrchestrator() def safe_float(value: str | float | None, default: float = 0.0) -> float: @@ -784,7 +788,24 @@ async def trading_cycle( stop_loss_threshold = stock_playbook.scenarios[0].stop_loss_pct take_profit_threshold = stock_playbook.scenarios[0].take_profit_pct - if loss_pct <= stop_loss_threshold: + exit_eval = evaluate_exit( + current_state=PositionState.HOLDING, + config=ExitRuleConfig( + hard_stop_pct=stop_loss_threshold, + be_arm_pct=max(0.5, take_profit_threshold * 0.4), + arm_pct=take_profit_threshold, + ), + inp=ExitRuleInput( + current_price=current_price, + entry_price=entry_price, + peak_price=max(entry_price, current_price), + atr_value=0.0, + pred_down_prob=0.0, + liquidity_weak=market_data.get("volume_ratio", 1.0) < 1.0, + ), + ) + + if exit_eval.reason == "hard_stop": decision = TradeDecision( action="SELL", confidence=95, @@ -800,7 +821,7 @@ async def trading_cycle( loss_pct, stop_loss_threshold, ) - elif loss_pct >= take_profit_threshold: + elif exit_eval.reason == "arm_take_profit": decision = TradeDecision( action="SELL", confidence=90, @@ -944,6 +965,28 @@ async def trading_cycle( except Exception as notify_exc: logger.warning("Fat finger notification failed: %s", notify_exc) raise # Re-raise to prevent trade + except CircuitBreakerTripped as exc: + await KILL_SWITCH.trigger( + reason=f"circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}", + snapshot_state=lambda: logger.critical( + "KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f", + market.code, + stock_code, + exc.pnl_pct, + exc.threshold, + ), + ) + try: + await telegram.notify_circuit_breaker( + pnl_pct=exc.pnl_pct, + threshold=exc.threshold, + ) + except Exception as notify_exc: + logger.warning( + "Circuit breaker notification failed: %s", notify_exc + ) + KILL_SWITCH.clear_block() + raise # 5. Send order order_succeeded = True @@ -1911,6 +1954,16 @@ async def run_daily_session( logger.warning("Fat finger notification failed: %s", notify_exc) continue # Skip this order except CircuitBreakerTripped as exc: + await KILL_SWITCH.trigger( + reason=f"daily_circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}", + snapshot_state=lambda: logger.critical( + "Daily KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f", + market.code, + stock_code, + exc.pnl_pct, + exc.threshold, + ), + ) logger.critical("Circuit breaker tripped — stopping session") try: await telegram.notify_circuit_breaker( @@ -1921,6 +1974,7 @@ async def run_daily_session( logger.warning( "Circuit breaker notification failed: %s", notify_exc ) + KILL_SWITCH.clear_block() raise # Send order diff --git a/src/strategy/exit_rules.py b/src/strategy/exit_rules.py new file mode 100644 index 0000000..efc037d --- /dev/null +++ b/src/strategy/exit_rules.py @@ -0,0 +1,104 @@ +"""Composite exit rules: hard stop, break-even lock, ATR trailing, model assist.""" + +from __future__ import annotations + +from dataclasses import dataclass + +from src.strategy.position_state_machine import PositionState, StateTransitionInput, promote_state + + +@dataclass(frozen=True) +class ExitRuleConfig: + hard_stop_pct: float = -2.0 + be_arm_pct: float = 1.2 + arm_pct: float = 3.0 + atr_multiplier_k: float = 2.2 + model_prob_threshold: float = 0.62 + + +@dataclass(frozen=True) +class ExitRuleInput: + current_price: float + entry_price: float + peak_price: float + atr_value: float = 0.0 + pred_down_prob: float = 0.0 + liquidity_weak: bool = False + + +@dataclass(frozen=True) +class ExitEvaluation: + state: PositionState + should_exit: bool + reason: str + unrealized_pnl_pct: float + trailing_stop_price: float | None + + +def evaluate_exit( + *, + current_state: PositionState, + config: ExitRuleConfig, + inp: ExitRuleInput, +) -> ExitEvaluation: + """Evaluate composite exit logic and return updated state.""" + if inp.entry_price <= 0 or inp.current_price <= 0: + return ExitEvaluation( + state=current_state, + should_exit=False, + reason="invalid_price", + unrealized_pnl_pct=0.0, + trailing_stop_price=None, + ) + + unrealized = (inp.current_price - inp.entry_price) / inp.entry_price * 100.0 + hard_stop_hit = unrealized <= config.hard_stop_pct + take_profit_hit = unrealized >= config.arm_pct + + trailing_stop_price: float | None = None + trailing_stop_hit = False + if inp.atr_value > 0 and inp.peak_price > 0: + trailing_stop_price = inp.peak_price - (config.atr_multiplier_k * inp.atr_value) + trailing_stop_hit = inp.current_price <= trailing_stop_price + + be_lock_threat = current_state in (PositionState.BE_LOCK, PositionState.ARMED) and ( + inp.current_price <= inp.entry_price + ) + model_exit_signal = inp.pred_down_prob >= config.model_prob_threshold and inp.liquidity_weak + + next_state = promote_state( + current=current_state, + inp=StateTransitionInput( + unrealized_pnl_pct=unrealized, + be_arm_pct=config.be_arm_pct, + arm_pct=config.arm_pct, + hard_stop_hit=hard_stop_hit, + trailing_stop_hit=trailing_stop_hit, + model_exit_signal=model_exit_signal, + be_lock_threat=be_lock_threat, + ), + ) + + if hard_stop_hit: + reason = "hard_stop" + elif trailing_stop_hit: + reason = "atr_trailing_stop" + elif be_lock_threat: + reason = "be_lock_threat" + elif model_exit_signal: + reason = "model_liquidity_exit" + elif take_profit_hit: + # Backward-compatible immediate profit-taking path. + reason = "arm_take_profit" + else: + reason = "hold" + + should_exit = next_state == PositionState.EXITED or take_profit_hit + + return ExitEvaluation( + state=next_state, + should_exit=should_exit, + reason=reason, + unrealized_pnl_pct=unrealized, + trailing_stop_price=trailing_stop_price, + ) diff --git a/src/strategy/position_state_machine.py b/src/strategy/position_state_machine.py new file mode 100644 index 0000000..6a9e3a6 --- /dev/null +++ b/src/strategy/position_state_machine.py @@ -0,0 +1,70 @@ +"""Position state machine for staged exit control. + +State progression is monotonic (promotion-only) except terminal EXITED. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum + + +class PositionState(str, Enum): + HOLDING = "HOLDING" + BE_LOCK = "BE_LOCK" + ARMED = "ARMED" + EXITED = "EXITED" + + +_STATE_RANK: dict[PositionState, int] = { + PositionState.HOLDING: 0, + PositionState.BE_LOCK: 1, + PositionState.ARMED: 2, + PositionState.EXITED: 3, +} + + +@dataclass(frozen=True) +class StateTransitionInput: + unrealized_pnl_pct: float + be_arm_pct: float + arm_pct: float + hard_stop_hit: bool = False + trailing_stop_hit: bool = False + model_exit_signal: bool = False + be_lock_threat: bool = False + + +def evaluate_exit_first(inp: StateTransitionInput) -> bool: + """Return True when terminal exit conditions are met. + + EXITED must be evaluated before any promotion. + """ + return ( + inp.hard_stop_hit + or inp.trailing_stop_hit + or inp.model_exit_signal + or inp.be_lock_threat + ) + + +def promote_state(current: PositionState, inp: StateTransitionInput) -> PositionState: + """Promote to highest admissible state for current tick/bar. + + Rules: + - EXITED has highest precedence and is terminal. + - Promotions are monotonic (no downgrade). + """ + if current == PositionState.EXITED: + return PositionState.EXITED + + if evaluate_exit_first(inp): + return PositionState.EXITED + + target = PositionState.HOLDING + if inp.unrealized_pnl_pct >= inp.arm_pct: + target = PositionState.ARMED + elif inp.unrealized_pnl_pct >= inp.be_arm_pct: + target = PositionState.BE_LOCK + + return target if _STATE_RANK[target] > _STATE_RANK[current] else current diff --git a/tests/test_kill_switch.py b/tests/test_kill_switch.py new file mode 100644 index 0000000..b4c47f9 --- /dev/null +++ b/tests/test_kill_switch.py @@ -0,0 +1,55 @@ +import pytest + +from src.core.kill_switch import KillSwitchOrchestrator + + +@pytest.mark.asyncio +async def test_kill_switch_executes_steps_in_order() -> None: + ks = KillSwitchOrchestrator() + calls: list[str] = [] + + async def _cancel() -> None: + calls.append("cancel") + + def _refresh() -> None: + calls.append("refresh") + + def _reduce() -> None: + calls.append("reduce") + + def _snapshot() -> None: + calls.append("snapshot") + + def _notify() -> None: + calls.append("notify") + + report = await ks.trigger( + reason="test", + cancel_pending_orders=_cancel, + refresh_order_state=_refresh, + reduce_risk=_reduce, + snapshot_state=_snapshot, + notify=_notify, + ) + + assert report.steps == [ + "block_new_orders", + "cancel_pending_orders", + "refresh_order_state", + "reduce_risk", + "snapshot_state", + "notify", + ] + assert calls == ["cancel", "refresh", "reduce", "snapshot", "notify"] + assert report.errors == [] + + +@pytest.mark.asyncio +async def test_kill_switch_collects_step_errors() -> None: + ks = KillSwitchOrchestrator() + + def _boom() -> None: + raise RuntimeError("boom") + + report = await ks.trigger(reason="test", cancel_pending_orders=_boom) + assert any(err.startswith("cancel_pending_orders:") for err in report.errors) diff --git a/tests/test_strategy_exit_rules.py b/tests/test_strategy_exit_rules.py new file mode 100644 index 0000000..6e23214 --- /dev/null +++ b/tests/test_strategy_exit_rules.py @@ -0,0 +1,38 @@ +from src.strategy.exit_rules import ExitRuleConfig, ExitRuleInput, evaluate_exit +from src.strategy.position_state_machine import PositionState + + +def test_hard_stop_exit() -> None: + out = evaluate_exit( + current_state=PositionState.HOLDING, + config=ExitRuleConfig(hard_stop_pct=-2.0, arm_pct=3.0), + inp=ExitRuleInput(current_price=97.0, entry_price=100.0, peak_price=100.0), + ) + assert out.should_exit is True + assert out.reason == "hard_stop" + + +def test_take_profit_exit_for_backward_compatibility() -> None: + out = evaluate_exit( + current_state=PositionState.HOLDING, + config=ExitRuleConfig(hard_stop_pct=-2.0, arm_pct=3.0), + inp=ExitRuleInput(current_price=104.0, entry_price=100.0, peak_price=104.0), + ) + assert out.should_exit is True + assert out.reason == "arm_take_profit" + + +def test_model_assist_exit_signal() -> None: + out = evaluate_exit( + current_state=PositionState.ARMED, + config=ExitRuleConfig(model_prob_threshold=0.62, arm_pct=10.0), + inp=ExitRuleInput( + current_price=101.0, + entry_price=100.0, + peak_price=105.0, + pred_down_prob=0.8, + liquidity_weak=True, + ), + ) + assert out.should_exit is True + assert out.reason == "model_liquidity_exit" diff --git a/tests/test_strategy_state_machine.py b/tests/test_strategy_state_machine.py new file mode 100644 index 0000000..648d28d --- /dev/null +++ b/tests/test_strategy_state_machine.py @@ -0,0 +1,30 @@ +from src.strategy.position_state_machine import ( + PositionState, + StateTransitionInput, + promote_state, +) + + +def test_gap_jump_promotes_to_armed_directly() -> None: + state = promote_state( + PositionState.HOLDING, + StateTransitionInput( + unrealized_pnl_pct=4.0, + be_arm_pct=1.2, + arm_pct=2.8, + ), + ) + assert state == PositionState.ARMED + + +def test_exited_has_priority_over_promotion() -> None: + state = promote_state( + PositionState.HOLDING, + StateTransitionInput( + unrealized_pnl_pct=5.0, + be_arm_pct=1.2, + arm_pct=2.8, + hard_stop_hit=True, + ), + ) + assert state == PositionState.EXITED From 5050a4cf849adacee4b1327c43bfac11024f22e0 Mon Sep 17 00:00:00 2001 From: agentson Date: Thu, 26 Feb 2026 23:46:02 +0900 Subject: [PATCH 2/2] fix: address reviewer feedback for kill-switch enforcement and observability (#275) --- src/main.py | 45 +++++++++++++++++++-------- tests/test_main.py | 77 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 12 deletions(-) diff --git a/src/main.py b/src/main.py index eaa2642..f1679a4 100644 --- a/src/main.py +++ b/src/main.py @@ -897,6 +897,15 @@ async def trading_cycle( trade_price = current_price trade_pnl = 0.0 if decision.action in ("BUY", "SELL"): + if KILL_SWITCH.new_orders_blocked: + logger.critical( + "KillSwitch block active: skip %s order for %s (%s)", + decision.action, + stock_code, + market.name, + ) + return + broker_held_qty = ( _extract_held_qty_from_balance( balance_data, stock_code, is_domestic=market.is_domestic @@ -966,7 +975,7 @@ async def trading_cycle( logger.warning("Fat finger notification failed: %s", notify_exc) raise # Re-raise to prevent trade except CircuitBreakerTripped as exc: - await KILL_SWITCH.trigger( + ks_report = await KILL_SWITCH.trigger( reason=f"circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}", snapshot_state=lambda: logger.critical( "KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f", @@ -976,16 +985,13 @@ async def trading_cycle( exc.threshold, ), ) - try: - await telegram.notify_circuit_breaker( - pnl_pct=exc.pnl_pct, - threshold=exc.threshold, + if ks_report.errors: + logger.critical( + "KillSwitch step errors for %s/%s: %s", + market.code, + stock_code, + "; ".join(ks_report.errors), ) - except Exception as notify_exc: - logger.warning( - "Circuit breaker notification failed: %s", notify_exc - ) - KILL_SWITCH.clear_block() raise # 5. Send order @@ -1888,6 +1894,15 @@ async def run_daily_session( trade_pnl = 0.0 order_succeeded = True if decision.action in ("BUY", "SELL"): + if KILL_SWITCH.new_orders_blocked: + logger.critical( + "KillSwitch block active: skip %s order for %s (%s)", + decision.action, + stock_code, + market.name, + ) + continue + daily_broker_held_qty = ( _extract_held_qty_from_balance( balance_data, stock_code, is_domestic=market.is_domestic @@ -1954,7 +1969,7 @@ async def run_daily_session( logger.warning("Fat finger notification failed: %s", notify_exc) continue # Skip this order except CircuitBreakerTripped as exc: - await KILL_SWITCH.trigger( + ks_report = await KILL_SWITCH.trigger( reason=f"daily_circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}", snapshot_state=lambda: logger.critical( "Daily KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f", @@ -1974,7 +1989,13 @@ async def run_daily_session( logger.warning( "Circuit breaker notification failed: %s", notify_exc ) - KILL_SWITCH.clear_block() + if ks_report.errors: + logger.critical( + "Daily KillSwitch step errors for %s/%s: %s", + market.code, + stock_code, + "; ".join(ks_report.errors), + ) raise # Send order diff --git a/tests/test_main.py b/tests/test_main.py index 7a5e74c..8d7cb33 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -13,6 +13,7 @@ from src.db import init_db, log_trade from src.evolution.scorecard import DailyScorecard from src.logging.decision_logger import DecisionLogger from src.main import ( + KILL_SWITCH, _apply_dashboard_flag, _determine_order_quantity, _extract_avg_price_from_balance, @@ -77,6 +78,14 @@ def _make_sell_match(stock_code: str = "005930") -> ScenarioMatch: ) +@pytest.fixture(autouse=True) +def _reset_kill_switch_state() -> None: + """Prevent cross-test leakage from global kill-switch state.""" + KILL_SWITCH.clear_block() + yield + KILL_SWITCH.clear_block() + + class TestExtractAvgPriceFromBalance: """Tests for _extract_avg_price_from_balance() (issue #249).""" @@ -5039,3 +5048,71 @@ class TestOverseasGhostPositionClose: and "[ghost-close]" in (c.kwargs.get("rationale") or "") ] assert not ghost_close_calls, "Ghost-close must NOT be triggered for non-잔고없음 errors" + + +@pytest.mark.asyncio +async def test_kill_switch_block_skips_actionable_order_execution() -> None: + """Active kill-switch must prevent actionable order execution.""" + db_conn = init_db(":memory:") + decision_logger = DecisionLogger(db_conn) + + broker = MagicMock() + broker.get_current_price = AsyncMock(return_value=(100.0, 0.5, 0.0)) + broker.get_balance = AsyncMock( + return_value={ + "output1": [], + "output2": [ + { + "tot_evlu_amt": "100000", + "dnca_tot_amt": "50000", + "pchs_amt_smtl_amt": "50000", + } + ], + } + ) + broker.send_order = AsyncMock(return_value={"msg1": "OK"}) + + market = MagicMock() + market.name = "Korea" + market.code = "KR" + market.exchange_code = "KRX" + market.is_domestic = True + + telegram = MagicMock() + telegram.notify_trade_execution = AsyncMock() + telegram.notify_fat_finger = AsyncMock() + telegram.notify_circuit_breaker = AsyncMock() + telegram.notify_scenario_matched = AsyncMock() + + settings = MagicMock() + settings.POSITION_SIZING_ENABLED = False + settings.CONFIDENCE_THRESHOLD = 80 + + try: + KILL_SWITCH.new_orders_blocked = True + await trading_cycle( + broker=broker, + overseas_broker=MagicMock(), + scenario_engine=MagicMock(evaluate=MagicMock(return_value=_make_buy_match())), + playbook=_make_playbook(), + risk=MagicMock(), + db_conn=db_conn, + decision_logger=decision_logger, + context_store=MagicMock( + get_latest_timeframe=MagicMock(return_value=None), + set_context=MagicMock(), + ), + criticality_assessor=MagicMock( + assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")), + get_timeout=MagicMock(return_value=5.0), + ), + telegram=telegram, + market=market, + stock_code="005930", + scan_candidates={}, + settings=settings, + ) + finally: + KILL_SWITCH.clear_block() + + broker.send_order.assert_not_called()