Compare commits

..

2 Commits

5 changed files with 579 additions and 0 deletions

View File

@@ -64,6 +64,9 @@ class Settings(BaseSettings):
TRADE_MODE: str = Field(default="daily", pattern="^(daily|realtime)$")
DAILY_SESSIONS: int = Field(default=4, ge=1, le=10)
SESSION_INTERVAL_HOURS: int = Field(default=6, ge=1, le=24)
ORDER_BLACKOUT_ENABLED: bool = True
ORDER_BLACKOUT_WINDOWS_KST: str = "23:30-00:10"
ORDER_BLACKOUT_QUEUE_MAX: int = Field(default=500, ge=10, le=5000)
# Pre-Market Planner
PRE_MARKET_MINUTES: int = Field(default=30, ge=10, le=120)

View File

@@ -0,0 +1,100 @@
"""Blackout policy and queued order-intent manager."""
from __future__ import annotations
from collections import deque
from dataclasses import dataclass
from datetime import UTC, datetime, time
from zoneinfo import ZoneInfo
@dataclass(frozen=True)
class BlackoutWindow:
start: time
end: time
def contains(self, kst_time: time) -> bool:
if self.start <= self.end:
return self.start <= kst_time < self.end
return kst_time >= self.start or kst_time < self.end
@dataclass
class QueuedOrderIntent:
market_code: str
exchange_code: str
stock_code: str
order_type: str
quantity: int
price: float
source: str
queued_at: datetime
attempts: int = 0
def parse_blackout_windows_kst(raw: str) -> list[BlackoutWindow]:
"""Parse comma-separated KST windows like '23:30-00:10,11:20-11:30'."""
windows: list[BlackoutWindow] = []
for token in raw.split(","):
span = token.strip()
if not span or "-" not in span:
continue
start_raw, end_raw = [part.strip() for part in span.split("-", 1)]
try:
start_h, start_m = [int(v) for v in start_raw.split(":", 1)]
end_h, end_m = [int(v) for v in end_raw.split(":", 1)]
except (ValueError, TypeError):
continue
if not (0 <= start_h <= 23 and 0 <= end_h <= 23):
continue
if not (0 <= start_m <= 59 and 0 <= end_m <= 59):
continue
windows.append(BlackoutWindow(start=time(start_h, start_m), end=time(end_h, end_m)))
return windows
class BlackoutOrderManager:
"""Tracks blackout mode and queues order intents until recovery."""
def __init__(
self,
*,
enabled: bool,
windows: list[BlackoutWindow],
max_queue_size: int = 500,
) -> None:
self.enabled = enabled
self._windows = windows
self._queue: deque[QueuedOrderIntent] = deque()
self._was_blackout = False
self._max_queue_size = max_queue_size
@property
def pending_count(self) -> int:
return len(self._queue)
def in_blackout(self, now: datetime | None = None) -> bool:
if not self.enabled or not self._windows:
return False
now = now or datetime.now(UTC)
kst_now = now.astimezone(ZoneInfo("Asia/Seoul")).timetz().replace(tzinfo=None)
return any(window.contains(kst_now) for window in self._windows)
def enqueue(self, intent: QueuedOrderIntent) -> bool:
if len(self._queue) >= self._max_queue_size:
return False
self._queue.append(intent)
return True
def pop_recovery_batch(self, now: datetime | None = None) -> list[QueuedOrderIntent]:
in_blackout_now = self.in_blackout(now)
batch: list[QueuedOrderIntent] = []
if not in_blackout_now and self._queue:
while self._queue:
batch.append(self._queue.popleft())
self._was_blackout = in_blackout_now
return batch
def requeue(self, intent: QueuedOrderIntent) -> None:
if len(self._queue) < self._max_queue_size:
self._queue.append(intent)

View File

@@ -27,6 +27,11 @@ from src.context.layer import ContextLayer
from src.context.scheduler import ContextScheduler
from src.context.store import ContextStore
from src.core.criticality import CriticalityAssessor
from src.core.blackout_manager import (
BlackoutOrderManager,
QueuedOrderIntent,
parse_blackout_windows_kst,
)
from src.core.kill_switch import KillSwitchOrchestrator
from src.core.order_policy import OrderPolicyRejected, validate_order_policy
from src.core.priority_queue import PriorityTaskQueue
@@ -53,6 +58,11 @@ from src.strategy.scenario_engine import ScenarioEngine
logger = logging.getLogger(__name__)
KILL_SWITCH = KillSwitchOrchestrator()
BLACKOUT_ORDER_MANAGER = BlackoutOrderManager(
enabled=False,
windows=[],
max_queue_size=500,
)
def safe_float(value: str | float | None, default: float = 0.0) -> float:
@@ -461,6 +471,171 @@ async def build_overseas_symbol_universe(
return ordered_unique
def _build_queued_order_intent(
*,
market: MarketInfo,
stock_code: str,
order_type: str,
quantity: int,
price: float,
source: str,
) -> QueuedOrderIntent:
return QueuedOrderIntent(
market_code=market.code,
exchange_code=market.exchange_code,
stock_code=stock_code,
order_type=order_type,
quantity=quantity,
price=price,
source=source,
queued_at=datetime.now(UTC),
)
def _maybe_queue_order_intent(
*,
market: MarketInfo,
stock_code: str,
order_type: str,
quantity: int,
price: float,
source: str,
) -> bool:
if not BLACKOUT_ORDER_MANAGER.in_blackout():
return False
queued = BLACKOUT_ORDER_MANAGER.enqueue(
_build_queued_order_intent(
market=market,
stock_code=stock_code,
order_type=order_type,
quantity=quantity,
price=price,
source=source,
)
)
if queued:
logger.warning(
"Blackout active: queued order intent %s %s (%s) qty=%d price=%.4f source=%s pending=%d",
order_type,
stock_code,
market.code,
quantity,
price,
source,
BLACKOUT_ORDER_MANAGER.pending_count,
)
else:
logger.error(
"Blackout queue full: dropped order intent %s %s (%s) qty=%d source=%s",
order_type,
stock_code,
market.code,
quantity,
source,
)
return True
async def process_blackout_recovery_orders(
*,
broker: KISBroker,
overseas_broker: OverseasBroker,
db_conn: Any,
) -> None:
intents = BLACKOUT_ORDER_MANAGER.pop_recovery_batch()
if not intents:
return
logger.info(
"Blackout recovery started: processing %d queued intents",
len(intents),
)
for intent in intents:
market = MARKETS.get(intent.market_code)
if market is None:
continue
open_position = get_open_position(db_conn, intent.stock_code, market.code)
if intent.order_type == "BUY" and open_position is not None:
logger.info(
"Drop stale queued BUY %s (%s): position already open",
intent.stock_code,
market.code,
)
continue
if intent.order_type == "SELL" and open_position is None:
logger.info(
"Drop stale queued SELL %s (%s): no open position",
intent.stock_code,
market.code,
)
continue
try:
validate_order_policy(
market=market,
order_type=intent.order_type,
price=float(intent.price),
)
if market.is_domestic:
result = await broker.send_order(
stock_code=intent.stock_code,
order_type=intent.order_type,
quantity=intent.quantity,
price=intent.price,
)
else:
result = await overseas_broker.send_overseas_order(
exchange_code=market.exchange_code,
stock_code=intent.stock_code,
order_type=intent.order_type,
quantity=intent.quantity,
price=intent.price,
)
accepted = result.get("rt_cd", "0") == "0"
if accepted:
logger.info(
"Recovered queued order executed: %s %s (%s) qty=%d price=%.4f source=%s",
intent.order_type,
intent.stock_code,
market.code,
intent.quantity,
intent.price,
intent.source,
)
continue
logger.warning(
"Recovered queued order rejected: %s %s (%s) qty=%d msg=%s",
intent.order_type,
intent.stock_code,
market.code,
intent.quantity,
result.get("msg1"),
)
except Exception as exc:
if isinstance(exc, OrderPolicyRejected):
logger.info(
"Drop queued intent by policy: %s %s (%s): %s",
intent.order_type,
intent.stock_code,
market.code,
exc,
)
continue
logger.warning(
"Recovered queued order failed: %s %s (%s): %s",
intent.order_type,
intent.stock_code,
market.code,
exc,
)
if intent.attempts < 2:
intent.attempts += 1
BLACKOUT_ORDER_MANAGER.requeue(intent)
async def trading_cycle(
broker: KISBroker,
overseas_broker: OverseasBroker,
@@ -1022,6 +1197,15 @@ async def trading_cycle(
exc.session_id,
)
return
if _maybe_queue_order_intent(
market=market,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=float(order_price),
source="trading_cycle",
):
return
result = await broker.send_order(
stock_code=stock_code,
order_type=decision.action,
@@ -1060,6 +1244,15 @@ async def trading_cycle(
exc.session_id,
)
return
if _maybe_queue_order_intent(
market=market,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=float(overseas_price),
source="trading_cycle",
):
return
result = await overseas_broker.send_overseas_order(
exchange_code=market.exchange_code,
stock_code=stock_code,
@@ -1583,6 +1776,11 @@ async def run_daily_session(
# Process each open market
for market in open_markets:
await process_blackout_recovery_orders(
broker=broker,
overseas_broker=overseas_broker,
db_conn=db_conn,
)
# Use market-local date for playbook keying
market_today = datetime.now(market.timezone).date()
@@ -2079,6 +2277,15 @@ async def run_daily_session(
exc.session_id,
)
continue
if _maybe_queue_order_intent(
market=market,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=float(order_price),
source="run_daily_session",
):
continue
result = await broker.send_order(
stock_code=stock_code,
order_type=decision.action,
@@ -2107,6 +2314,15 @@ async def run_daily_session(
exc.session_id,
)
continue
if _maybe_queue_order_intent(
market=market,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=float(order_price),
source="run_daily_session",
):
continue
result = await overseas_broker.send_overseas_order(
exchange_code=market.exchange_code,
stock_code=stock_code,
@@ -2345,6 +2561,19 @@ def _apply_dashboard_flag(settings: Settings, dashboard_flag: bool) -> Settings:
async def run(settings: Settings) -> None:
"""Main async loop — iterate over open markets on a timer."""
global BLACKOUT_ORDER_MANAGER
BLACKOUT_ORDER_MANAGER = BlackoutOrderManager(
enabled=settings.ORDER_BLACKOUT_ENABLED,
windows=parse_blackout_windows_kst(settings.ORDER_BLACKOUT_WINDOWS_KST),
max_queue_size=settings.ORDER_BLACKOUT_QUEUE_MAX,
)
logger.info(
"Blackout manager initialized: enabled=%s windows=%s queue_max=%d",
settings.ORDER_BLACKOUT_ENABLED,
settings.ORDER_BLACKOUT_WINDOWS_KST,
settings.ORDER_BLACKOUT_QUEUE_MAX,
)
broker = KISBroker(settings)
overseas_broker = OverseasBroker(broker)
brain = GeminiClient(settings)
@@ -2944,6 +3173,12 @@ async def run(settings: Settings) -> None:
if shutdown.is_set():
break
await process_blackout_recovery_orders(
broker=broker,
overseas_broker=overseas_broker,
db_conn=db_conn,
)
# Notify market open if it just opened
if not _market_states.get(market.code, False):
try:

View File

@@ -0,0 +1,81 @@
from __future__ import annotations
from datetime import UTC, datetime
from src.core.blackout_manager import (
BlackoutOrderManager,
QueuedOrderIntent,
parse_blackout_windows_kst,
)
def test_parse_blackout_windows_kst() -> None:
windows = parse_blackout_windows_kst("23:30-00:10,11:20-11:30,invalid")
assert len(windows) == 2
def test_blackout_manager_handles_cross_midnight_window() -> None:
manager = BlackoutOrderManager(
enabled=True,
windows=parse_blackout_windows_kst("23:30-00:10"),
max_queue_size=10,
)
# 2026-01-01 23:40 KST = 2026-01-01 14:40 UTC
assert manager.in_blackout(datetime(2026, 1, 1, 14, 40, tzinfo=UTC))
# 2026-01-02 00:20 KST = 2026-01-01 15:20 UTC
assert not manager.in_blackout(datetime(2026, 1, 1, 15, 20, tzinfo=UTC))
def test_recovery_batch_only_after_blackout_exit() -> None:
manager = BlackoutOrderManager(
enabled=True,
windows=parse_blackout_windows_kst("23:30-00:10"),
max_queue_size=10,
)
intent = QueuedOrderIntent(
market_code="KR",
exchange_code="KRX",
stock_code="005930",
order_type="BUY",
quantity=1,
price=100.0,
source="test",
queued_at=datetime.now(UTC),
)
assert manager.enqueue(intent)
# Inside blackout: no pop yet
inside_blackout = datetime(2026, 1, 1, 14, 40, tzinfo=UTC)
assert manager.pop_recovery_batch(inside_blackout) == []
# Outside blackout: pop full batch once
outside_blackout = datetime(2026, 1, 1, 15, 20, tzinfo=UTC)
batch = manager.pop_recovery_batch(outside_blackout)
assert len(batch) == 1
assert manager.pending_count == 0
def test_requeued_intent_is_processed_next_non_blackout_cycle() -> None:
manager = BlackoutOrderManager(
enabled=True,
windows=parse_blackout_windows_kst("23:30-00:10"),
max_queue_size=10,
)
intent = QueuedOrderIntent(
market_code="KR",
exchange_code="KRX",
stock_code="005930",
order_type="BUY",
quantity=1,
price=100.0,
source="test",
queued_at=datetime.now(UTC),
)
manager.enqueue(intent)
outside_blackout = datetime(2026, 1, 1, 15, 20, tzinfo=UTC)
first_batch = manager.pop_recovery_batch(outside_blackout)
assert len(first_batch) == 1
manager.requeue(first_batch[0])
second_batch = manager.pop_recovery_batch(outside_blackout)
assert len(second_batch) == 1

View File

@@ -27,6 +27,7 @@ from src.main import (
_start_dashboard_server,
handle_domestic_pending_orders,
handle_overseas_pending_orders,
process_blackout_recovery_orders,
run_daily_session,
safe_float,
sync_positions_from_broker,
@@ -5189,3 +5190,162 @@ async def test_order_policy_rejection_skips_order_execution() -> None:
)
broker.send_order.assert_not_called()
@pytest.mark.asyncio
async def test_blackout_queues_order_and_skips_submission() -> None:
"""When blackout is active, order submission is replaced by queueing."""
db_conn = init_db(":memory:")
decision_logger = DecisionLogger(db_conn)
broker = MagicMock()
broker.get_current_price = AsyncMock(return_value=(100.0, 0.5, 0.0))
broker.get_balance = AsyncMock(
return_value={
"output1": [],
"output2": [
{
"tot_evlu_amt": "100000",
"dnca_tot_amt": "50000",
"pchs_amt_smtl_amt": "50000",
}
],
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
settings = MagicMock()
settings.POSITION_SIZING_ENABLED = False
settings.CONFIDENCE_THRESHOLD = 80
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
blackout_manager = MagicMock()
blackout_manager.in_blackout.return_value = True
blackout_manager.enqueue.return_value = True
blackout_manager.pending_count = 1
with patch("src.main.BLACKOUT_ORDER_MANAGER", blackout_manager):
await trading_cycle(
broker=broker,
overseas_broker=MagicMock(),
scenario_engine=MagicMock(evaluate=MagicMock(return_value=_make_buy_match())),
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(
get_latest_timeframe=MagicMock(return_value=None),
set_context=MagicMock(),
),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=telegram,
market=market,
stock_code="005930",
scan_candidates={},
settings=settings,
)
broker.send_order.assert_not_called()
blackout_manager.enqueue.assert_called_once()
@pytest.mark.asyncio
async def test_process_blackout_recovery_executes_valid_intents() -> None:
"""Recovery must execute queued intents that pass revalidation."""
db_conn = init_db(":memory:")
broker = MagicMock()
broker.send_order = AsyncMock(return_value={"rt_cd": "0", "msg1": "OK"})
overseas_broker = MagicMock()
market = MagicMock()
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
intent = MagicMock()
intent.market_code = "KR"
intent.stock_code = "005930"
intent.order_type = "BUY"
intent.quantity = 1
intent.price = 100.0
intent.source = "test"
intent.attempts = 0
blackout_manager = MagicMock()
blackout_manager.pop_recovery_batch.return_value = [intent]
with (
patch("src.main.BLACKOUT_ORDER_MANAGER", blackout_manager),
patch("src.main.MARKETS", {"KR": market}),
patch("src.main.get_open_position", return_value=None),
patch("src.main.validate_order_policy"),
):
await process_blackout_recovery_orders(
broker=broker,
overseas_broker=overseas_broker,
db_conn=db_conn,
)
broker.send_order.assert_called_once()
@pytest.mark.asyncio
async def test_process_blackout_recovery_drops_policy_rejected_intent() -> None:
"""Policy-rejected queued intents must not be requeued."""
db_conn = init_db(":memory:")
broker = MagicMock()
broker.send_order = AsyncMock(return_value={"rt_cd": "0", "msg1": "OK"})
overseas_broker = MagicMock()
market = MagicMock()
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
intent = MagicMock()
intent.market_code = "KR"
intent.stock_code = "005930"
intent.order_type = "BUY"
intent.quantity = 1
intent.price = 100.0
intent.source = "test"
intent.attempts = 0
blackout_manager = MagicMock()
blackout_manager.pop_recovery_batch.return_value = [intent]
with (
patch("src.main.BLACKOUT_ORDER_MANAGER", blackout_manager),
patch("src.main.MARKETS", {"KR": market}),
patch("src.main.get_open_position", return_value=None),
patch(
"src.main.validate_order_policy",
side_effect=OrderPolicyRejected(
"blocked",
session_id="NXT_AFTER",
market_code="KR",
),
),
):
await process_blackout_recovery_orders(
broker=broker,
overseas_broker=overseas_broker,
db_conn=db_conn,
)
broker.send_order.assert_not_called()
blackout_manager.requeue.assert_not_called()