feat: implement blackout queue and recovery revalidation (TASK-CODE-008)

This commit is contained in:
agentson
2026-02-27 00:31:29 +09:00
parent 54d6cc3d7c
commit 356d085ab0
5 changed files with 579 additions and 0 deletions

View File

@@ -27,6 +27,11 @@ from src.context.layer import ContextLayer
from src.context.scheduler import ContextScheduler
from src.context.store import ContextStore
from src.core.criticality import CriticalityAssessor
from src.core.blackout_manager import (
BlackoutOrderManager,
QueuedOrderIntent,
parse_blackout_windows_kst,
)
from src.core.kill_switch import KillSwitchOrchestrator
from src.core.order_policy import OrderPolicyRejected, validate_order_policy
from src.core.priority_queue import PriorityTaskQueue
@@ -53,6 +58,11 @@ from src.strategy.scenario_engine import ScenarioEngine
logger = logging.getLogger(__name__)
KILL_SWITCH = KillSwitchOrchestrator()
BLACKOUT_ORDER_MANAGER = BlackoutOrderManager(
enabled=False,
windows=[],
max_queue_size=500,
)
def safe_float(value: str | float | None, default: float = 0.0) -> float:
@@ -461,6 +471,171 @@ async def build_overseas_symbol_universe(
return ordered_unique
def _build_queued_order_intent(
*,
market: MarketInfo,
stock_code: str,
order_type: str,
quantity: int,
price: float,
source: str,
) -> QueuedOrderIntent:
return QueuedOrderIntent(
market_code=market.code,
exchange_code=market.exchange_code,
stock_code=stock_code,
order_type=order_type,
quantity=quantity,
price=price,
source=source,
queued_at=datetime.now(UTC),
)
def _maybe_queue_order_intent(
*,
market: MarketInfo,
stock_code: str,
order_type: str,
quantity: int,
price: float,
source: str,
) -> bool:
if not BLACKOUT_ORDER_MANAGER.in_blackout():
return False
queued = BLACKOUT_ORDER_MANAGER.enqueue(
_build_queued_order_intent(
market=market,
stock_code=stock_code,
order_type=order_type,
quantity=quantity,
price=price,
source=source,
)
)
if queued:
logger.warning(
"Blackout active: queued order intent %s %s (%s) qty=%d price=%.4f source=%s pending=%d",
order_type,
stock_code,
market.code,
quantity,
price,
source,
BLACKOUT_ORDER_MANAGER.pending_count,
)
else:
logger.error(
"Blackout queue full: dropped order intent %s %s (%s) qty=%d source=%s",
order_type,
stock_code,
market.code,
quantity,
source,
)
return True
async def process_blackout_recovery_orders(
*,
broker: KISBroker,
overseas_broker: OverseasBroker,
db_conn: Any,
) -> None:
intents = BLACKOUT_ORDER_MANAGER.pop_recovery_batch()
if not intents:
return
logger.info(
"Blackout recovery started: processing %d queued intents",
len(intents),
)
for intent in intents:
market = MARKETS.get(intent.market_code)
if market is None:
continue
open_position = get_open_position(db_conn, intent.stock_code, market.code)
if intent.order_type == "BUY" and open_position is not None:
logger.info(
"Drop stale queued BUY %s (%s): position already open",
intent.stock_code,
market.code,
)
continue
if intent.order_type == "SELL" and open_position is None:
logger.info(
"Drop stale queued SELL %s (%s): no open position",
intent.stock_code,
market.code,
)
continue
try:
validate_order_policy(
market=market,
order_type=intent.order_type,
price=float(intent.price),
)
if market.is_domestic:
result = await broker.send_order(
stock_code=intent.stock_code,
order_type=intent.order_type,
quantity=intent.quantity,
price=intent.price,
)
else:
result = await overseas_broker.send_overseas_order(
exchange_code=market.exchange_code,
stock_code=intent.stock_code,
order_type=intent.order_type,
quantity=intent.quantity,
price=intent.price,
)
accepted = result.get("rt_cd", "0") == "0"
if accepted:
logger.info(
"Recovered queued order executed: %s %s (%s) qty=%d price=%.4f source=%s",
intent.order_type,
intent.stock_code,
market.code,
intent.quantity,
intent.price,
intent.source,
)
continue
logger.warning(
"Recovered queued order rejected: %s %s (%s) qty=%d msg=%s",
intent.order_type,
intent.stock_code,
market.code,
intent.quantity,
result.get("msg1"),
)
except Exception as exc:
if isinstance(exc, OrderPolicyRejected):
logger.info(
"Drop queued intent by policy: %s %s (%s): %s",
intent.order_type,
intent.stock_code,
market.code,
exc,
)
continue
logger.warning(
"Recovered queued order failed: %s %s (%s): %s",
intent.order_type,
intent.stock_code,
market.code,
exc,
)
if intent.attempts < 2:
intent.attempts += 1
BLACKOUT_ORDER_MANAGER.requeue(intent)
async def trading_cycle(
broker: KISBroker,
overseas_broker: OverseasBroker,
@@ -1022,6 +1197,15 @@ async def trading_cycle(
exc.session_id,
)
return
if _maybe_queue_order_intent(
market=market,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=float(order_price),
source="trading_cycle",
):
return
result = await broker.send_order(
stock_code=stock_code,
order_type=decision.action,
@@ -1060,6 +1244,15 @@ async def trading_cycle(
exc.session_id,
)
return
if _maybe_queue_order_intent(
market=market,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=float(overseas_price),
source="trading_cycle",
):
return
result = await overseas_broker.send_overseas_order(
exchange_code=market.exchange_code,
stock_code=stock_code,
@@ -1583,6 +1776,11 @@ async def run_daily_session(
# Process each open market
for market in open_markets:
await process_blackout_recovery_orders(
broker=broker,
overseas_broker=overseas_broker,
db_conn=db_conn,
)
# Use market-local date for playbook keying
market_today = datetime.now(market.timezone).date()
@@ -2079,6 +2277,15 @@ async def run_daily_session(
exc.session_id,
)
continue
if _maybe_queue_order_intent(
market=market,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=float(order_price),
source="run_daily_session",
):
continue
result = await broker.send_order(
stock_code=stock_code,
order_type=decision.action,
@@ -2107,6 +2314,15 @@ async def run_daily_session(
exc.session_id,
)
continue
if _maybe_queue_order_intent(
market=market,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=float(order_price),
source="run_daily_session",
):
continue
result = await overseas_broker.send_overseas_order(
exchange_code=market.exchange_code,
stock_code=stock_code,
@@ -2345,6 +2561,19 @@ def _apply_dashboard_flag(settings: Settings, dashboard_flag: bool) -> Settings:
async def run(settings: Settings) -> None:
"""Main async loop — iterate over open markets on a timer."""
global BLACKOUT_ORDER_MANAGER
BLACKOUT_ORDER_MANAGER = BlackoutOrderManager(
enabled=settings.ORDER_BLACKOUT_ENABLED,
windows=parse_blackout_windows_kst(settings.ORDER_BLACKOUT_WINDOWS_KST),
max_queue_size=settings.ORDER_BLACKOUT_QUEUE_MAX,
)
logger.info(
"Blackout manager initialized: enabled=%s windows=%s queue_max=%d",
settings.ORDER_BLACKOUT_ENABLED,
settings.ORDER_BLACKOUT_WINDOWS_KST,
settings.ORDER_BLACKOUT_QUEUE_MAX,
)
broker = KISBroker(settings)
overseas_broker = OverseasBroker(broker)
brain = GeminiClient(settings)
@@ -2944,6 +3173,12 @@ async def run(settings: Settings) -> None:
if shutdown.is_set():
break
await process_blackout_recovery_orders(
broker=broker,
overseas_broker=overseas_broker,
db_conn=db_conn,
)
# Notify market open if it just opened
if not _market_states.get(market.code, False):
try: