diff --git a/src/config.py b/src/config.py index 0a10619..7ff1c4d 100644 --- a/src/config.py +++ b/src/config.py @@ -64,6 +64,9 @@ class Settings(BaseSettings): TRADE_MODE: str = Field(default="daily", pattern="^(daily|realtime)$") DAILY_SESSIONS: int = Field(default=4, ge=1, le=10) SESSION_INTERVAL_HOURS: int = Field(default=6, ge=1, le=24) + ORDER_BLACKOUT_ENABLED: bool = True + ORDER_BLACKOUT_WINDOWS_KST: str = "23:30-00:10" + ORDER_BLACKOUT_QUEUE_MAX: int = Field(default=500, ge=10, le=5000) # Pre-Market Planner PRE_MARKET_MINUTES: int = Field(default=30, ge=10, le=120) diff --git a/src/core/blackout_manager.py b/src/core/blackout_manager.py new file mode 100644 index 0000000..9078735 --- /dev/null +++ b/src/core/blackout_manager.py @@ -0,0 +1,100 @@ +"""Blackout policy and queued order-intent manager.""" + +from __future__ import annotations + +from collections import deque +from dataclasses import dataclass +from datetime import UTC, datetime, time +from zoneinfo import ZoneInfo + + +@dataclass(frozen=True) +class BlackoutWindow: + start: time + end: time + + def contains(self, kst_time: time) -> bool: + if self.start <= self.end: + return self.start <= kst_time < self.end + return kst_time >= self.start or kst_time < self.end + + +@dataclass +class QueuedOrderIntent: + market_code: str + exchange_code: str + stock_code: str + order_type: str + quantity: int + price: float + source: str + queued_at: datetime + attempts: int = 0 + + +def parse_blackout_windows_kst(raw: str) -> list[BlackoutWindow]: + """Parse comma-separated KST windows like '23:30-00:10,11:20-11:30'.""" + windows: list[BlackoutWindow] = [] + for token in raw.split(","): + span = token.strip() + if not span or "-" not in span: + continue + start_raw, end_raw = [part.strip() for part in span.split("-", 1)] + try: + start_h, start_m = [int(v) for v in start_raw.split(":", 1)] + end_h, end_m = [int(v) for v in end_raw.split(":", 1)] + except (ValueError, TypeError): + continue + if not (0 <= start_h <= 23 and 0 <= end_h <= 23): + continue + if not (0 <= start_m <= 59 and 0 <= end_m <= 59): + continue + windows.append(BlackoutWindow(start=time(start_h, start_m), end=time(end_h, end_m))) + return windows + + +class BlackoutOrderManager: + """Tracks blackout mode and queues order intents until recovery.""" + + def __init__( + self, + *, + enabled: bool, + windows: list[BlackoutWindow], + max_queue_size: int = 500, + ) -> None: + self.enabled = enabled + self._windows = windows + self._queue: deque[QueuedOrderIntent] = deque() + self._was_blackout = False + self._max_queue_size = max_queue_size + + @property + def pending_count(self) -> int: + return len(self._queue) + + def in_blackout(self, now: datetime | None = None) -> bool: + if not self.enabled or not self._windows: + return False + now = now or datetime.now(UTC) + kst_now = now.astimezone(ZoneInfo("Asia/Seoul")).timetz().replace(tzinfo=None) + return any(window.contains(kst_now) for window in self._windows) + + def enqueue(self, intent: QueuedOrderIntent) -> bool: + if len(self._queue) >= self._max_queue_size: + return False + self._queue.append(intent) + return True + + def pop_recovery_batch(self, now: datetime | None = None) -> list[QueuedOrderIntent]: + in_blackout_now = self.in_blackout(now) + batch: list[QueuedOrderIntent] = [] + if not in_blackout_now and self._queue: + while self._queue: + batch.append(self._queue.popleft()) + self._was_blackout = in_blackout_now + return batch + + def requeue(self, intent: QueuedOrderIntent) -> None: + if len(self._queue) < self._max_queue_size: + self._queue.append(intent) diff --git a/src/main.py b/src/main.py index c9f36b9..61aee8a 100644 --- a/src/main.py +++ b/src/main.py @@ -27,6 +27,11 @@ from src.context.layer import ContextLayer from src.context.scheduler import ContextScheduler from src.context.store import ContextStore from src.core.criticality import CriticalityAssessor +from src.core.blackout_manager import ( + BlackoutOrderManager, + QueuedOrderIntent, + parse_blackout_windows_kst, +) from src.core.kill_switch import KillSwitchOrchestrator from src.core.order_policy import OrderPolicyRejected, validate_order_policy from src.core.priority_queue import PriorityTaskQueue @@ -53,6 +58,11 @@ from src.strategy.scenario_engine import ScenarioEngine logger = logging.getLogger(__name__) KILL_SWITCH = KillSwitchOrchestrator() +BLACKOUT_ORDER_MANAGER = BlackoutOrderManager( + enabled=False, + windows=[], + max_queue_size=500, +) def safe_float(value: str | float | None, default: float = 0.0) -> float: @@ -461,6 +471,171 @@ async def build_overseas_symbol_universe( return ordered_unique +def _build_queued_order_intent( + *, + market: MarketInfo, + stock_code: str, + order_type: str, + quantity: int, + price: float, + source: str, +) -> QueuedOrderIntent: + return QueuedOrderIntent( + market_code=market.code, + exchange_code=market.exchange_code, + stock_code=stock_code, + order_type=order_type, + quantity=quantity, + price=price, + source=source, + queued_at=datetime.now(UTC), + ) + + +def _maybe_queue_order_intent( + *, + market: MarketInfo, + stock_code: str, + order_type: str, + quantity: int, + price: float, + source: str, +) -> bool: + if not BLACKOUT_ORDER_MANAGER.in_blackout(): + return False + + queued = BLACKOUT_ORDER_MANAGER.enqueue( + _build_queued_order_intent( + market=market, + stock_code=stock_code, + order_type=order_type, + quantity=quantity, + price=price, + source=source, + ) + ) + if queued: + logger.warning( + "Blackout active: queued order intent %s %s (%s) qty=%d price=%.4f source=%s pending=%d", + order_type, + stock_code, + market.code, + quantity, + price, + source, + BLACKOUT_ORDER_MANAGER.pending_count, + ) + else: + logger.error( + "Blackout queue full: dropped order intent %s %s (%s) qty=%d source=%s", + order_type, + stock_code, + market.code, + quantity, + source, + ) + return True + + +async def process_blackout_recovery_orders( + *, + broker: KISBroker, + overseas_broker: OverseasBroker, + db_conn: Any, +) -> None: + intents = BLACKOUT_ORDER_MANAGER.pop_recovery_batch() + if not intents: + return + + logger.info( + "Blackout recovery started: processing %d queued intents", + len(intents), + ) + for intent in intents: + market = MARKETS.get(intent.market_code) + if market is None: + continue + + open_position = get_open_position(db_conn, intent.stock_code, market.code) + if intent.order_type == "BUY" and open_position is not None: + logger.info( + "Drop stale queued BUY %s (%s): position already open", + intent.stock_code, + market.code, + ) + continue + if intent.order_type == "SELL" and open_position is None: + logger.info( + "Drop stale queued SELL %s (%s): no open position", + intent.stock_code, + market.code, + ) + continue + + try: + validate_order_policy( + market=market, + order_type=intent.order_type, + price=float(intent.price), + ) + if market.is_domestic: + result = await broker.send_order( + stock_code=intent.stock_code, + order_type=intent.order_type, + quantity=intent.quantity, + price=intent.price, + ) + else: + result = await overseas_broker.send_overseas_order( + exchange_code=market.exchange_code, + stock_code=intent.stock_code, + order_type=intent.order_type, + quantity=intent.quantity, + price=intent.price, + ) + + accepted = result.get("rt_cd", "0") == "0" + if accepted: + logger.info( + "Recovered queued order executed: %s %s (%s) qty=%d price=%.4f source=%s", + intent.order_type, + intent.stock_code, + market.code, + intent.quantity, + intent.price, + intent.source, + ) + continue + logger.warning( + "Recovered queued order rejected: %s %s (%s) qty=%d msg=%s", + intent.order_type, + intent.stock_code, + market.code, + intent.quantity, + result.get("msg1"), + ) + except Exception as exc: + if isinstance(exc, OrderPolicyRejected): + logger.info( + "Drop queued intent by policy: %s %s (%s): %s", + intent.order_type, + intent.stock_code, + market.code, + exc, + ) + continue + logger.warning( + "Recovered queued order failed: %s %s (%s): %s", + intent.order_type, + intent.stock_code, + market.code, + exc, + ) + if intent.attempts < 2: + intent.attempts += 1 + BLACKOUT_ORDER_MANAGER.requeue(intent) + + async def trading_cycle( broker: KISBroker, overseas_broker: OverseasBroker, @@ -1022,6 +1197,15 @@ async def trading_cycle( exc.session_id, ) return + if _maybe_queue_order_intent( + market=market, + stock_code=stock_code, + order_type=decision.action, + quantity=quantity, + price=float(order_price), + source="trading_cycle", + ): + return result = await broker.send_order( stock_code=stock_code, order_type=decision.action, @@ -1060,6 +1244,15 @@ async def trading_cycle( exc.session_id, ) return + if _maybe_queue_order_intent( + market=market, + stock_code=stock_code, + order_type=decision.action, + quantity=quantity, + price=float(overseas_price), + source="trading_cycle", + ): + return result = await overseas_broker.send_overseas_order( exchange_code=market.exchange_code, stock_code=stock_code, @@ -1583,6 +1776,11 @@ async def run_daily_session( # Process each open market for market in open_markets: + await process_blackout_recovery_orders( + broker=broker, + overseas_broker=overseas_broker, + db_conn=db_conn, + ) # Use market-local date for playbook keying market_today = datetime.now(market.timezone).date() @@ -2079,6 +2277,15 @@ async def run_daily_session( exc.session_id, ) continue + if _maybe_queue_order_intent( + market=market, + stock_code=stock_code, + order_type=decision.action, + quantity=quantity, + price=float(order_price), + source="run_daily_session", + ): + continue result = await broker.send_order( stock_code=stock_code, order_type=decision.action, @@ -2107,6 +2314,15 @@ async def run_daily_session( exc.session_id, ) continue + if _maybe_queue_order_intent( + market=market, + stock_code=stock_code, + order_type=decision.action, + quantity=quantity, + price=float(order_price), + source="run_daily_session", + ): + continue result = await overseas_broker.send_overseas_order( exchange_code=market.exchange_code, stock_code=stock_code, @@ -2345,6 +2561,19 @@ def _apply_dashboard_flag(settings: Settings, dashboard_flag: bool) -> Settings: async def run(settings: Settings) -> None: """Main async loop — iterate over open markets on a timer.""" + global BLACKOUT_ORDER_MANAGER + BLACKOUT_ORDER_MANAGER = BlackoutOrderManager( + enabled=settings.ORDER_BLACKOUT_ENABLED, + windows=parse_blackout_windows_kst(settings.ORDER_BLACKOUT_WINDOWS_KST), + max_queue_size=settings.ORDER_BLACKOUT_QUEUE_MAX, + ) + logger.info( + "Blackout manager initialized: enabled=%s windows=%s queue_max=%d", + settings.ORDER_BLACKOUT_ENABLED, + settings.ORDER_BLACKOUT_WINDOWS_KST, + settings.ORDER_BLACKOUT_QUEUE_MAX, + ) + broker = KISBroker(settings) overseas_broker = OverseasBroker(broker) brain = GeminiClient(settings) @@ -2944,6 +3173,12 @@ async def run(settings: Settings) -> None: if shutdown.is_set(): break + await process_blackout_recovery_orders( + broker=broker, + overseas_broker=overseas_broker, + db_conn=db_conn, + ) + # Notify market open if it just opened if not _market_states.get(market.code, False): try: diff --git a/tests/test_blackout_manager.py b/tests/test_blackout_manager.py new file mode 100644 index 0000000..0a1bd5e --- /dev/null +++ b/tests/test_blackout_manager.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +from datetime import UTC, datetime + +from src.core.blackout_manager import ( + BlackoutOrderManager, + QueuedOrderIntent, + parse_blackout_windows_kst, +) + + +def test_parse_blackout_windows_kst() -> None: + windows = parse_blackout_windows_kst("23:30-00:10,11:20-11:30,invalid") + assert len(windows) == 2 + + +def test_blackout_manager_handles_cross_midnight_window() -> None: + manager = BlackoutOrderManager( + enabled=True, + windows=parse_blackout_windows_kst("23:30-00:10"), + max_queue_size=10, + ) + # 2026-01-01 23:40 KST = 2026-01-01 14:40 UTC + assert manager.in_blackout(datetime(2026, 1, 1, 14, 40, tzinfo=UTC)) + # 2026-01-02 00:20 KST = 2026-01-01 15:20 UTC + assert not manager.in_blackout(datetime(2026, 1, 1, 15, 20, tzinfo=UTC)) + + +def test_recovery_batch_only_after_blackout_exit() -> None: + manager = BlackoutOrderManager( + enabled=True, + windows=parse_blackout_windows_kst("23:30-00:10"), + max_queue_size=10, + ) + intent = QueuedOrderIntent( + market_code="KR", + exchange_code="KRX", + stock_code="005930", + order_type="BUY", + quantity=1, + price=100.0, + source="test", + queued_at=datetime.now(UTC), + ) + assert manager.enqueue(intent) + + # Inside blackout: no pop yet + inside_blackout = datetime(2026, 1, 1, 14, 40, tzinfo=UTC) + assert manager.pop_recovery_batch(inside_blackout) == [] + + # Outside blackout: pop full batch once + outside_blackout = datetime(2026, 1, 1, 15, 20, tzinfo=UTC) + batch = manager.pop_recovery_batch(outside_blackout) + assert len(batch) == 1 + assert manager.pending_count == 0 + + +def test_requeued_intent_is_processed_next_non_blackout_cycle() -> None: + manager = BlackoutOrderManager( + enabled=True, + windows=parse_blackout_windows_kst("23:30-00:10"), + max_queue_size=10, + ) + intent = QueuedOrderIntent( + market_code="KR", + exchange_code="KRX", + stock_code="005930", + order_type="BUY", + quantity=1, + price=100.0, + source="test", + queued_at=datetime.now(UTC), + ) + manager.enqueue(intent) + outside_blackout = datetime(2026, 1, 1, 15, 20, tzinfo=UTC) + first_batch = manager.pop_recovery_batch(outside_blackout) + assert len(first_batch) == 1 + + manager.requeue(first_batch[0]) + second_batch = manager.pop_recovery_batch(outside_blackout) + assert len(second_batch) == 1 diff --git a/tests/test_main.py b/tests/test_main.py index 8c540ca..934b113 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -27,6 +27,7 @@ from src.main import ( _start_dashboard_server, handle_domestic_pending_orders, handle_overseas_pending_orders, + process_blackout_recovery_orders, run_daily_session, safe_float, sync_positions_from_broker, @@ -5189,3 +5190,162 @@ async def test_order_policy_rejection_skips_order_execution() -> None: ) broker.send_order.assert_not_called() + + +@pytest.mark.asyncio +async def test_blackout_queues_order_and_skips_submission() -> None: + """When blackout is active, order submission is replaced by queueing.""" + db_conn = init_db(":memory:") + decision_logger = DecisionLogger(db_conn) + + broker = MagicMock() + broker.get_current_price = AsyncMock(return_value=(100.0, 0.5, 0.0)) + broker.get_balance = AsyncMock( + return_value={ + "output1": [], + "output2": [ + { + "tot_evlu_amt": "100000", + "dnca_tot_amt": "50000", + "pchs_amt_smtl_amt": "50000", + } + ], + } + ) + broker.send_order = AsyncMock(return_value={"msg1": "OK"}) + + market = MagicMock() + market.name = "Korea" + market.code = "KR" + market.exchange_code = "KRX" + market.is_domestic = True + + settings = MagicMock() + settings.POSITION_SIZING_ENABLED = False + settings.CONFIDENCE_THRESHOLD = 80 + + telegram = MagicMock() + telegram.notify_trade_execution = AsyncMock() + telegram.notify_fat_finger = AsyncMock() + telegram.notify_circuit_breaker = AsyncMock() + telegram.notify_scenario_matched = AsyncMock() + + blackout_manager = MagicMock() + blackout_manager.in_blackout.return_value = True + blackout_manager.enqueue.return_value = True + blackout_manager.pending_count = 1 + + with patch("src.main.BLACKOUT_ORDER_MANAGER", blackout_manager): + await trading_cycle( + broker=broker, + overseas_broker=MagicMock(), + scenario_engine=MagicMock(evaluate=MagicMock(return_value=_make_buy_match())), + playbook=_make_playbook(), + risk=MagicMock(), + db_conn=db_conn, + decision_logger=decision_logger, + context_store=MagicMock( + get_latest_timeframe=MagicMock(return_value=None), + set_context=MagicMock(), + ), + criticality_assessor=MagicMock( + assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")), + get_timeout=MagicMock(return_value=5.0), + ), + telegram=telegram, + market=market, + stock_code="005930", + scan_candidates={}, + settings=settings, + ) + + broker.send_order.assert_not_called() + blackout_manager.enqueue.assert_called_once() + + +@pytest.mark.asyncio +async def test_process_blackout_recovery_executes_valid_intents() -> None: + """Recovery must execute queued intents that pass revalidation.""" + db_conn = init_db(":memory:") + broker = MagicMock() + broker.send_order = AsyncMock(return_value={"rt_cd": "0", "msg1": "OK"}) + overseas_broker = MagicMock() + + market = MagicMock() + market.code = "KR" + market.exchange_code = "KRX" + market.is_domestic = True + + intent = MagicMock() + intent.market_code = "KR" + intent.stock_code = "005930" + intent.order_type = "BUY" + intent.quantity = 1 + intent.price = 100.0 + intent.source = "test" + intent.attempts = 0 + + blackout_manager = MagicMock() + blackout_manager.pop_recovery_batch.return_value = [intent] + + with ( + patch("src.main.BLACKOUT_ORDER_MANAGER", blackout_manager), + patch("src.main.MARKETS", {"KR": market}), + patch("src.main.get_open_position", return_value=None), + patch("src.main.validate_order_policy"), + ): + await process_blackout_recovery_orders( + broker=broker, + overseas_broker=overseas_broker, + db_conn=db_conn, + ) + + broker.send_order.assert_called_once() + + +@pytest.mark.asyncio +async def test_process_blackout_recovery_drops_policy_rejected_intent() -> None: + """Policy-rejected queued intents must not be requeued.""" + db_conn = init_db(":memory:") + broker = MagicMock() + broker.send_order = AsyncMock(return_value={"rt_cd": "0", "msg1": "OK"}) + overseas_broker = MagicMock() + + market = MagicMock() + market.code = "KR" + market.exchange_code = "KRX" + market.is_domestic = True + + intent = MagicMock() + intent.market_code = "KR" + intent.stock_code = "005930" + intent.order_type = "BUY" + intent.quantity = 1 + intent.price = 100.0 + intent.source = "test" + intent.attempts = 0 + + blackout_manager = MagicMock() + blackout_manager.pop_recovery_batch.return_value = [intent] + + with ( + patch("src.main.BLACKOUT_ORDER_MANAGER", blackout_manager), + patch("src.main.MARKETS", {"KR": market}), + patch("src.main.get_open_position", return_value=None), + patch( + "src.main.validate_order_policy", + side_effect=OrderPolicyRejected( + "blocked", + session_id="NXT_AFTER", + market_code="KR", + ), + ), + ): + await process_blackout_recovery_orders( + broker=broker, + overseas_broker=overseas_broker, + db_conn=db_conn, + ) + + broker.send_order.assert_not_called() + blackout_manager.requeue.assert_not_called()