Compare commits

..

11 Commits

Author SHA1 Message Date
agentson
9f64c9944a fix: correct short-side tie-break semantics in triple barrier 2026-02-27 00:47:09 +09:00
agentson
bb391d502c feat: add triple barrier labeler with first-touch logic (TASK-CODE-004) 2026-02-27 00:45:18 +09:00
b0100fde10 Merge pull request '[RISK-EMERGENCY][SCN-FAIL-003] TKT-P0-002 Kill Switch 순서 강제 검증 자동화' (#284) from feature/issue-tkt-p0-002-killswitch-ordering into feature/v3-session-policy-stream 2026-02-27 00:42:16 +09:00
agentson
0a4e69d40c fix: record kill switch cancel failures and add failure-path tests 2026-02-27 00:41:13 +09:00
agentson
25401ac132 feat: enforce operational kill switch callbacks in runtime flow (TASK-CODE-003) 2026-02-27 00:38:26 +09:00
1381b140ab Merge pull request '[EXEC-POLICY][SCN-FAIL-001] TKT-P0-001 블랙아웃 차단/큐/복구 재검증' (#282) from feature/issue-tkt-p0-001-blackout-queue-revalidate into feature/v3-session-policy-stream 2026-02-27 00:32:59 +09:00
agentson
356d085ab0 feat: implement blackout queue and recovery revalidation (TASK-CODE-008) 2026-02-27 00:31:29 +09:00
54d6cc3d7c Merge pull request 'docs: feature-branch 팀 운영 규칙 및 모니터링 검증 게이트 반영 (#279)' (#280) from feature/issue-279-session-order-policy-guard into feature/v3-session-policy-stream 2026-02-27 00:19:55 +09:00
agentson
3ffad58d57 docs: allow ticket->feature merges without user approval; keep main gated (#279) 2026-02-27 00:19:51 +09:00
agentson
df6baee7f1 feat: add session-aware order policy guard for low-liquidity market-order rejection (#279) 2026-02-27 00:13:47 +09:00
agentson
c31a6a569d docs: enforce feature-branch team flow and mandatory runtime monitoring validation (#279) 2026-02-27 00:05:01 +09:00
13 changed files with 1475 additions and 38 deletions

View File

@@ -43,6 +43,11 @@ Updated: 2026-02-26
- 기존 `tests/` 스위트 전량 실행
- 신규 기능 플래그 ON/OFF 비교
4. 구동/모니터링 검증 (필수)
- 개발 완료 후 시스템을 실제 구동해 핵심 경로를 관찰
- 필수 관찰 항목: 주문 차단 정책, Kill Switch 동작, 경보/예외 로그, 세션 전환 로그
- Runtime Verifier 코멘트로 증적(실행 명령/요약 로그) 첨부
## 실행 명령
```bash
@@ -55,3 +60,4 @@ python3 scripts/validate_ouroboros_docs.py
- 문서 검증 실패 시 구현 PR 병합 금지
- `REQ-*` 변경 후 테스트 매핑 누락 시 병합 금지
- 회귀 실패 시 원인 모듈 분리 후 재검증
- 구동/모니터링 증적 누락 시 검증 승인 금지

View File

@@ -150,6 +150,10 @@ TPM 티켓 운영 규칙:
- PR 본문에는 TPM이 지정한 우선순위와 범위가 그대로 반영되어야 한다.
- 우선순위 변경은 TPM 제안 + Main Agent 승인으로만 가능하다.
브랜치 운영 규칙:
- TPM은 각 티켓에 대해 `ticket temp branch -> program feature branch` PR 경로를 지정한다.
- 티켓 머지 대상은 항상 program feature branch이며, `main`은 최종 통합 단계에서만 사용한다.
## Runtime Verification Protocol
- Runtime Verifier는 테스트 통과 이후 실제 동작(스테이징/실운영)을 모니터링한다.
@@ -159,12 +163,16 @@ TPM 티켓 운영 규칙:
- 이슈 클로즈 규칙:
- Dev 수정 완료 + Verifier 재검증 통과 + Runtime Verifier 재관측 정상
- 최종 클로즈 승인자는 Main Agent
- 개발 완료 필수 절차:
- 시스템 실제 구동(스테이징/로컬 실운영 모드) 실행
- 모니터링 체크리스트(핵심 경보/주문 경로/예외 로그) 수행
- 결과를 티켓/PR 코멘트에 증적으로 첨부하지 않으면 완료로 간주하지 않음
## Server Reflection Rule (No-Merge by Default)
## Server Reflection Rule
- 서버 반영 기본 규칙은 `브랜치 푸시 + PR 생성/코멘트`까지로 제한한다.
- 기본 흐름에서 검증 승인 후 자동/수동 머지 실행은 금지한다.
- 예외는 사용자 명시 승인 시에만 허용되며, Main Agent가 예외 근거를 PR에 기록한다.
- `ticket temp branch -> program feature branch` 머지는 검증 승인 후 자동/수동 진행 가능하다.
- `program feature branch -> main` 머지는 사용자 명시 승인 시에만 허용한다.
- Main 병합 시 Main Agent가 승인 근거를 PR 코멘트에 기록한다.
## Acceptance Matrix (PM Scenario -> Dev Tasks -> Verifier Checks)

View File

@@ -50,10 +50,12 @@ Updated: 2026-02-26
- PR 본문에 `REQ-*`, `TASK-*`, `TEST-*` 매핑 표 존재
- `src/core/risk_manager.py` 변경 없음
- 주요 의사결정 체크포인트(DCP-01~04) 중 해당 단계 Main Agent 확인 기록 존재
- 티켓 PR의 base가 `main`이 아닌 program feature branch인지 확인
자동 점검:
- 문서 검증 스크립트 통과
- 테스트 통과
- 개발 완료 시 시스템 구동/모니터링 증적 코멘트 존재
## 5) 감사 추적
@@ -87,8 +89,14 @@ Updated: 2026-02-26
- `REPLAN-REQUEST`는 Main Agent 승인 전 \"제안\" 상태로 유지
- 승인된 재계획은 `REQ/TASK/TEST` 문서를 동시 갱신해야 유효
## 9) 서버 반영 규칙 (No-Merge by Default)
## 9) 서버 반영 규칙
- 서버 반영은 `브랜치 푸시 + PR 코멘트(리뷰/논의/검증승인)`까지를 기본으로 한다.
- 기본 규칙에서 `tea pulls merge` 실행 금지한다.
- 사용자 명시 승인 시에만 예외적으로 머지를 허용한다(예외 근거를 PR 코멘트에 기록).
- 티켓 PR(`feature/issue-* -> feature/{stream}`)은 검증 승인 후 머지 가능하다.
- 최종 통합 PR(`feature/{stream} -> main`)은 사용자 명시 승인 전 `tea pulls merge` 실행 금지.
- Main 병합 시 승인 근거 코멘트 필수.
## 10) 최종 main 병합 조건
- 모든 티켓이 program feature branch로 병합 완료
- Runtime Verifier의 구동/모니터링 검증 완료
- 사용자 최종 승인 코멘트 확인 후에만 `feature -> main` PR 머지 허용

View File

@@ -5,14 +5,24 @@
**CRITICAL: All code changes MUST follow this workflow. Direct pushes to `main` are ABSOLUTELY PROHIBITED.**
1. **Create Gitea Issue First** — All features, bug fixes, and policy changes require a Gitea issue before any code is written
2. **Create Feature Branch** — Branch from `main` using format `feature/issue-{N}-{short-description}`
- After creating the branch, run `git pull origin main` and rebase to ensure the branch is up to date
3. **Implement Changes** — Write code, tests, and documentation on the feature branch
4. **Create Pull Request** — Submit PR to `main` branch referencing the issue number
5. **Review & Merge** — After approval, merge via PR (squash or merge commit)
2. **Create Program Feature Branch** — Branch from `main` for the whole development stream
- Format: `feature/{epic-or-stream-name}`
3. **Create Ticket Temp Branch** — Branch from the program feature branch per ticket
- Format: `feature/issue-{N}-{short-description}`
4. **Implement Per Ticket** — Write code, tests, and documentation on the ticket temp branch
5. **Create Pull Request to Program Feature Branch**`feature/issue-N-* -> feature/{stream}`
6. **Review/Verify and Merge into Program Feature Branch** — user approval not required
7. **Final Integration PR to main** — Only after all ticket stages complete and explicit user approval
**Never commit directly to `main`.** This policy applies to all changes, no exceptions.
## Branch Strategy (Mandatory)
- Team operation default branch is the **program feature branch**, not `main`.
- Ticket-level development happens only on **ticket temp branches** cut from the program feature branch.
- Ticket PR merges into program feature branch are allowed after verifier approval.
- Until final user sign-off, `main` merge is prohibited.
## Gitea CLI Formatting Troubleshooting
Issue/PR 본문 작성 시 줄바꿈(`\n`)이 문자열 그대로 저장되는 문제가 반복될 수 있다. 원인은 `-d "...\n..."` 형태에서 쉘/CLI가 이스케이프를 실제 개행으로 해석하지 않기 때문이다.

View File

@@ -0,0 +1,111 @@
"""Triple barrier labeler utilities.
Implements first-touch labeling with upper/lower/time barriers.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal, Sequence
TieBreakMode = Literal["stop_first", "take_first"]
@dataclass(frozen=True)
class TripleBarrierSpec:
take_profit_pct: float
stop_loss_pct: float
max_holding_bars: int
tie_break: TieBreakMode = "stop_first"
@dataclass(frozen=True)
class TripleBarrierLabel:
label: int # +1 take-profit first, -1 stop-loss first, 0 timeout
touched: Literal["take_profit", "stop_loss", "time"]
touch_bar: int
entry_price: float
upper_barrier: float
lower_barrier: float
def label_with_triple_barrier(
*,
highs: Sequence[float],
lows: Sequence[float],
closes: Sequence[float],
entry_index: int,
side: int,
spec: TripleBarrierSpec,
) -> TripleBarrierLabel:
"""Label one entry using triple-barrier first-touch rule.
Args:
highs/lows/closes: OHLC components with identical length.
entry_index: Entry bar index in the sequences.
side: +1 for long, -1 for short.
spec: Barrier specification.
"""
if side not in {1, -1}:
raise ValueError("side must be +1 or -1")
if len(highs) != len(lows) or len(highs) != len(closes):
raise ValueError("highs, lows, closes lengths must match")
if entry_index < 0 or entry_index >= len(closes):
raise IndexError("entry_index out of range")
if spec.max_holding_bars <= 0:
raise ValueError("max_holding_bars must be positive")
entry_price = float(closes[entry_index])
if entry_price <= 0:
raise ValueError("entry price must be positive")
if side == 1:
upper = entry_price * (1.0 + spec.take_profit_pct)
lower = entry_price * (1.0 - spec.stop_loss_pct)
else:
# For short side, favorable move is down.
upper = entry_price * (1.0 + spec.stop_loss_pct)
lower = entry_price * (1.0 - spec.take_profit_pct)
last_index = min(len(closes) - 1, entry_index + spec.max_holding_bars)
for idx in range(entry_index + 1, last_index + 1):
h = float(highs[idx])
l = float(lows[idx])
up_touch = h >= upper
down_touch = l <= lower
if not up_touch and not down_touch:
continue
if up_touch and down_touch:
if spec.tie_break == "stop_first":
touched = "stop_loss"
label = -1
else:
touched = "take_profit"
label = 1
elif up_touch:
touched = "take_profit" if side == 1 else "stop_loss"
label = 1 if side == 1 else -1
else:
touched = "stop_loss" if side == 1 else "take_profit"
label = -1 if side == 1 else 1
return TripleBarrierLabel(
label=label,
touched=touched,
touch_bar=idx,
entry_price=entry_price,
upper_barrier=upper,
lower_barrier=lower,
)
return TripleBarrierLabel(
label=0,
touched="time",
touch_bar=last_index,
entry_price=entry_price,
upper_barrier=upper,
lower_barrier=lower,
)

View File

@@ -64,6 +64,9 @@ class Settings(BaseSettings):
TRADE_MODE: str = Field(default="daily", pattern="^(daily|realtime)$")
DAILY_SESSIONS: int = Field(default=4, ge=1, le=10)
SESSION_INTERVAL_HOURS: int = Field(default=6, ge=1, le=24)
ORDER_BLACKOUT_ENABLED: bool = True
ORDER_BLACKOUT_WINDOWS_KST: str = "23:30-00:10"
ORDER_BLACKOUT_QUEUE_MAX: int = Field(default=500, ge=10, le=5000)
# Pre-Market Planner
PRE_MARKET_MINUTES: int = Field(default=30, ge=10, le=120)

View File

@@ -0,0 +1,105 @@
"""Blackout policy and queued order-intent manager."""
from __future__ import annotations
from collections import deque
from dataclasses import dataclass
from datetime import UTC, datetime, time
from zoneinfo import ZoneInfo
@dataclass(frozen=True)
class BlackoutWindow:
start: time
end: time
def contains(self, kst_time: time) -> bool:
if self.start <= self.end:
return self.start <= kst_time < self.end
return kst_time >= self.start or kst_time < self.end
@dataclass
class QueuedOrderIntent:
market_code: str
exchange_code: str
stock_code: str
order_type: str
quantity: int
price: float
source: str
queued_at: datetime
attempts: int = 0
def parse_blackout_windows_kst(raw: str) -> list[BlackoutWindow]:
"""Parse comma-separated KST windows like '23:30-00:10,11:20-11:30'."""
windows: list[BlackoutWindow] = []
for token in raw.split(","):
span = token.strip()
if not span or "-" not in span:
continue
start_raw, end_raw = [part.strip() for part in span.split("-", 1)]
try:
start_h, start_m = [int(v) for v in start_raw.split(":", 1)]
end_h, end_m = [int(v) for v in end_raw.split(":", 1)]
except (ValueError, TypeError):
continue
if not (0 <= start_h <= 23 and 0 <= end_h <= 23):
continue
if not (0 <= start_m <= 59 and 0 <= end_m <= 59):
continue
windows.append(BlackoutWindow(start=time(start_h, start_m), end=time(end_h, end_m)))
return windows
class BlackoutOrderManager:
"""Tracks blackout mode and queues order intents until recovery."""
def __init__(
self,
*,
enabled: bool,
windows: list[BlackoutWindow],
max_queue_size: int = 500,
) -> None:
self.enabled = enabled
self._windows = windows
self._queue: deque[QueuedOrderIntent] = deque()
self._was_blackout = False
self._max_queue_size = max_queue_size
@property
def pending_count(self) -> int:
return len(self._queue)
def in_blackout(self, now: datetime | None = None) -> bool:
if not self.enabled or not self._windows:
return False
now = now or datetime.now(UTC)
kst_now = now.astimezone(ZoneInfo("Asia/Seoul")).timetz().replace(tzinfo=None)
return any(window.contains(kst_now) for window in self._windows)
def enqueue(self, intent: QueuedOrderIntent) -> bool:
if len(self._queue) >= self._max_queue_size:
return False
self._queue.append(intent)
return True
def pop_recovery_batch(self, now: datetime | None = None) -> list[QueuedOrderIntent]:
in_blackout_now = self.in_blackout(now)
batch: list[QueuedOrderIntent] = []
if not in_blackout_now and self._queue:
while self._queue:
batch.append(self._queue.popleft())
self._was_blackout = in_blackout_now
return batch
def requeue(self, intent: QueuedOrderIntent) -> None:
if len(self._queue) < self._max_queue_size:
self._queue.append(intent)
def clear(self) -> int:
count = len(self._queue)
self._queue.clear()
return count

93
src/core/order_policy.py Normal file
View File

@@ -0,0 +1,93 @@
"""Session-aware order policy guards.
Default policy:
- Low-liquidity sessions must reject market orders (price <= 0).
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import UTC, datetime, time
from zoneinfo import ZoneInfo
from src.markets.schedule import MarketInfo
_LOW_LIQUIDITY_SESSIONS = {"NXT_AFTER", "US_PRE", "US_DAY", "US_AFTER"}
class OrderPolicyRejected(Exception):
"""Raised when an order violates session policy."""
def __init__(self, message: str, *, session_id: str, market_code: str) -> None:
super().__init__(message)
self.session_id = session_id
self.market_code = market_code
@dataclass(frozen=True)
class SessionInfo:
session_id: str
is_low_liquidity: bool
def classify_session_id(market: MarketInfo, now: datetime | None = None) -> str:
"""Classify current session by KST schedule used in v3 docs."""
now = now or datetime.now(UTC)
# v3 session tables are explicitly defined in KST perspective.
kst_time = now.astimezone(ZoneInfo("Asia/Seoul")).timetz().replace(tzinfo=None)
if market.code == "KR":
if time(8, 0) <= kst_time < time(8, 50):
return "NXT_PRE"
if time(9, 0) <= kst_time < time(15, 30):
return "KRX_REG"
if time(15, 30) <= kst_time < time(20, 0):
return "NXT_AFTER"
return "KR_OFF"
if market.code.startswith("US"):
if time(10, 0) <= kst_time < time(18, 0):
return "US_DAY"
if time(18, 0) <= kst_time < time(23, 30):
return "US_PRE"
if time(23, 30) <= kst_time or kst_time < time(6, 0):
return "US_REG"
if time(6, 0) <= kst_time < time(7, 0):
return "US_AFTER"
return "US_OFF"
return "GENERIC_REG"
def get_session_info(market: MarketInfo, now: datetime | None = None) -> SessionInfo:
session_id = classify_session_id(market, now)
return SessionInfo(session_id=session_id, is_low_liquidity=session_id in _LOW_LIQUIDITY_SESSIONS)
def validate_order_policy(
*,
market: MarketInfo,
order_type: str,
price: float,
now: datetime | None = None,
) -> SessionInfo:
"""Validate order against session policy and return resolved session info."""
info = get_session_info(market, now)
is_market_order = price <= 0
if info.is_low_liquidity and is_market_order:
raise OrderPolicyRejected(
f"Market order is forbidden in low-liquidity session ({info.session_id})",
session_id=info.session_id,
market_code=market.code,
)
# Guard against accidental unsupported actions.
if order_type not in {"BUY", "SELL"}:
raise OrderPolicyRejected(
f"Unsupported order_type={order_type}",
session_id=info.session_id,
market_code=market.code,
)
return info

View File

@@ -27,7 +27,13 @@ from src.context.layer import ContextLayer
from src.context.scheduler import ContextScheduler
from src.context.store import ContextStore
from src.core.criticality import CriticalityAssessor
from src.core.blackout_manager import (
BlackoutOrderManager,
QueuedOrderIntent,
parse_blackout_windows_kst,
)
from src.core.kill_switch import KillSwitchOrchestrator
from src.core.order_policy import OrderPolicyRejected, validate_order_policy
from src.core.priority_queue import PriorityTaskQueue
from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected, RiskManager
from src.db import (
@@ -52,6 +58,11 @@ from src.strategy.scenario_engine import ScenarioEngine
logger = logging.getLogger(__name__)
KILL_SWITCH = KillSwitchOrchestrator()
BLACKOUT_ORDER_MANAGER = BlackoutOrderManager(
enabled=False,
windows=[],
max_queue_size=500,
)
def safe_float(value: str | float | None, default: float = 0.0) -> float:
@@ -460,6 +471,352 @@ async def build_overseas_symbol_universe(
return ordered_unique
def _build_queued_order_intent(
*,
market: MarketInfo,
stock_code: str,
order_type: str,
quantity: int,
price: float,
source: str,
) -> QueuedOrderIntent:
return QueuedOrderIntent(
market_code=market.code,
exchange_code=market.exchange_code,
stock_code=stock_code,
order_type=order_type,
quantity=quantity,
price=price,
source=source,
queued_at=datetime.now(UTC),
)
def _maybe_queue_order_intent(
*,
market: MarketInfo,
stock_code: str,
order_type: str,
quantity: int,
price: float,
source: str,
) -> bool:
if not BLACKOUT_ORDER_MANAGER.in_blackout():
return False
queued = BLACKOUT_ORDER_MANAGER.enqueue(
_build_queued_order_intent(
market=market,
stock_code=stock_code,
order_type=order_type,
quantity=quantity,
price=price,
source=source,
)
)
if queued:
logger.warning(
"Blackout active: queued order intent %s %s (%s) qty=%d price=%.4f source=%s pending=%d",
order_type,
stock_code,
market.code,
quantity,
price,
source,
BLACKOUT_ORDER_MANAGER.pending_count,
)
else:
logger.error(
"Blackout queue full: dropped order intent %s %s (%s) qty=%d source=%s",
order_type,
stock_code,
market.code,
quantity,
source,
)
return True
async def process_blackout_recovery_orders(
*,
broker: KISBroker,
overseas_broker: OverseasBroker,
db_conn: Any,
) -> None:
intents = BLACKOUT_ORDER_MANAGER.pop_recovery_batch()
if not intents:
return
logger.info(
"Blackout recovery started: processing %d queued intents",
len(intents),
)
for intent in intents:
market = MARKETS.get(intent.market_code)
if market is None:
continue
open_position = get_open_position(db_conn, intent.stock_code, market.code)
if intent.order_type == "BUY" and open_position is not None:
logger.info(
"Drop stale queued BUY %s (%s): position already open",
intent.stock_code,
market.code,
)
continue
if intent.order_type == "SELL" and open_position is None:
logger.info(
"Drop stale queued SELL %s (%s): no open position",
intent.stock_code,
market.code,
)
continue
try:
validate_order_policy(
market=market,
order_type=intent.order_type,
price=float(intent.price),
)
if market.is_domestic:
result = await broker.send_order(
stock_code=intent.stock_code,
order_type=intent.order_type,
quantity=intent.quantity,
price=intent.price,
)
else:
result = await overseas_broker.send_overseas_order(
exchange_code=market.exchange_code,
stock_code=intent.stock_code,
order_type=intent.order_type,
quantity=intent.quantity,
price=intent.price,
)
accepted = result.get("rt_cd", "0") == "0"
if accepted:
logger.info(
"Recovered queued order executed: %s %s (%s) qty=%d price=%.4f source=%s",
intent.order_type,
intent.stock_code,
market.code,
intent.quantity,
intent.price,
intent.source,
)
continue
logger.warning(
"Recovered queued order rejected: %s %s (%s) qty=%d msg=%s",
intent.order_type,
intent.stock_code,
market.code,
intent.quantity,
result.get("msg1"),
)
except Exception as exc:
if isinstance(exc, OrderPolicyRejected):
logger.info(
"Drop queued intent by policy: %s %s (%s): %s",
intent.order_type,
intent.stock_code,
market.code,
exc,
)
continue
logger.warning(
"Recovered queued order failed: %s %s (%s): %s",
intent.order_type,
intent.stock_code,
market.code,
exc,
)
if intent.attempts < 2:
intent.attempts += 1
BLACKOUT_ORDER_MANAGER.requeue(intent)
def _resolve_kill_switch_markets(
*,
settings: Settings | None,
current_market: MarketInfo | None,
) -> list[MarketInfo]:
if settings is not None:
markets: list[MarketInfo] = []
seen: set[str] = set()
for market_code in settings.enabled_market_list:
market = MARKETS.get(market_code)
if market is None or market.code in seen:
continue
markets.append(market)
seen.add(market.code)
if markets:
return markets
if current_market is not None:
return [current_market]
return []
async def _cancel_pending_orders_for_kill_switch(
*,
broker: KISBroker,
overseas_broker: OverseasBroker,
markets: list[MarketInfo],
) -> None:
failures: list[str] = []
domestic = [m for m in markets if m.is_domestic]
overseas = [m for m in markets if not m.is_domestic]
if domestic:
try:
orders = await broker.get_domestic_pending_orders()
except Exception as exc:
logger.warning("KillSwitch: failed to fetch domestic pending orders: %s", exc)
orders = []
for order in orders:
stock_code = str(order.get("pdno", ""))
try:
orgn_odno = order.get("orgn_odno", "")
krx_fwdg_ord_orgno = order.get("ord_gno_brno", "")
psbl_qty = int(order.get("psbl_qty", "0") or "0")
if not stock_code or not orgn_odno or psbl_qty <= 0:
continue
cancel_result = await broker.cancel_domestic_order(
stock_code=stock_code,
orgn_odno=orgn_odno,
krx_fwdg_ord_orgno=krx_fwdg_ord_orgno,
qty=psbl_qty,
)
if cancel_result.get("rt_cd") != "0":
failures.append(
"domestic cancel failed for"
f" {stock_code}: rt_cd={cancel_result.get('rt_cd')}"
f" msg={cancel_result.get('msg1')}"
)
except Exception as exc:
logger.warning("KillSwitch: domestic cancel failed: %s", exc)
failures.append(f"domestic cancel exception for {stock_code}: {exc}")
us_exchanges = frozenset({"NASD", "NYSE", "AMEX"})
exchange_codes: list[str] = []
seen_us = False
for market in overseas:
exc_code = market.exchange_code
if exc_code in us_exchanges:
if not seen_us:
exchange_codes.append("NASD")
seen_us = True
elif exc_code not in exchange_codes:
exchange_codes.append(exc_code)
for exchange_code in exchange_codes:
try:
orders = await overseas_broker.get_overseas_pending_orders(exchange_code)
except Exception as exc:
logger.warning(
"KillSwitch: failed to fetch overseas pending orders for %s: %s",
exchange_code,
exc,
)
continue
for order in orders:
stock_code = str(order.get("pdno", ""))
order_exchange = str(order.get("ovrs_excg_cd") or exchange_code)
try:
odno = order.get("odno", "")
nccs_qty = int(order.get("nccs_qty", "0") or "0")
if not stock_code or not odno or nccs_qty <= 0:
continue
cancel_result = await overseas_broker.cancel_overseas_order(
exchange_code=order_exchange,
stock_code=stock_code,
odno=odno,
qty=nccs_qty,
)
if cancel_result.get("rt_cd") != "0":
failures.append(
"overseas cancel failed for"
f" {order_exchange}/{stock_code}: rt_cd={cancel_result.get('rt_cd')}"
f" msg={cancel_result.get('msg1')}"
)
except Exception as exc:
logger.warning("KillSwitch: overseas cancel failed: %s", exc)
failures.append(
f"overseas cancel exception for {order_exchange}/{stock_code}: {exc}"
)
if failures:
raise RuntimeError("; ".join(failures[:3]))
async def _refresh_order_state_for_kill_switch(
*,
broker: KISBroker,
overseas_broker: OverseasBroker,
markets: list[MarketInfo],
) -> None:
seen_overseas: set[str] = set()
for market in markets:
try:
if market.is_domestic:
await broker.get_balance()
elif market.exchange_code not in seen_overseas:
seen_overseas.add(market.exchange_code)
await overseas_broker.get_overseas_balance(market.exchange_code)
except Exception as exc:
logger.warning(
"KillSwitch: refresh state failed for %s/%s: %s",
market.code,
market.exchange_code,
exc,
)
def _reduce_risk_for_kill_switch() -> None:
dropped = BLACKOUT_ORDER_MANAGER.clear()
logger.critical("KillSwitch: reduced queued order risk by clearing %d queued intents", dropped)
async def _trigger_emergency_kill_switch(
*,
reason: str,
broker: KISBroker,
overseas_broker: OverseasBroker,
telegram: TelegramClient,
settings: Settings | None,
current_market: MarketInfo | None,
stock_code: str,
pnl_pct: float,
threshold: float,
) -> Any:
markets = _resolve_kill_switch_markets(settings=settings, current_market=current_market)
return await KILL_SWITCH.trigger(
reason=reason,
cancel_pending_orders=lambda: _cancel_pending_orders_for_kill_switch(
broker=broker,
overseas_broker=overseas_broker,
markets=markets,
),
refresh_order_state=lambda: _refresh_order_state_for_kill_switch(
broker=broker,
overseas_broker=overseas_broker,
markets=markets,
),
reduce_risk=_reduce_risk_for_kill_switch,
snapshot_state=lambda: logger.critical(
"KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f",
current_market.code if current_market else "UNKNOWN",
stock_code,
pnl_pct,
threshold,
),
notify=lambda: telegram.notify_circuit_breaker(
pnl_pct=pnl_pct,
threshold=threshold,
),
)
async def trading_cycle(
broker: KISBroker,
overseas_broker: OverseasBroker,
@@ -975,15 +1332,16 @@ async def trading_cycle(
logger.warning("Fat finger notification failed: %s", notify_exc)
raise # Re-raise to prevent trade
except CircuitBreakerTripped as exc:
ks_report = await KILL_SWITCH.trigger(
ks_report = await _trigger_emergency_kill_switch(
reason=f"circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}",
snapshot_state=lambda: logger.critical(
"KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f",
market.code,
stock_code,
exc.pnl_pct,
exc.threshold,
),
broker=broker,
overseas_broker=overseas_broker,
telegram=telegram,
settings=settings,
current_market=market,
stock_code=stock_code,
pnl_pct=exc.pnl_pct,
threshold=exc.threshold,
)
if ks_report.errors:
logger.critical(
@@ -1005,6 +1363,31 @@ async def trading_cycle(
order_price = kr_round_down(current_price * 1.002)
else:
order_price = kr_round_down(current_price * 0.998)
try:
validate_order_policy(
market=market,
order_type=decision.action,
price=float(order_price),
)
except OrderPolicyRejected as exc:
logger.warning(
"Order policy rejected %s %s (%s): %s [session=%s]",
decision.action,
stock_code,
market.name,
exc,
exc.session_id,
)
return
if _maybe_queue_order_intent(
market=market,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=float(order_price),
source="trading_cycle",
):
return
result = await broker.send_order(
stock_code=stock_code,
order_type=decision.action,
@@ -1027,6 +1410,31 @@ async def trading_cycle(
overseas_price = round(current_price * 1.002, _price_decimals)
else:
overseas_price = round(current_price * 0.998, _price_decimals)
try:
validate_order_policy(
market=market,
order_type=decision.action,
price=float(overseas_price),
)
except OrderPolicyRejected as exc:
logger.warning(
"Order policy rejected %s %s (%s): %s [session=%s]",
decision.action,
stock_code,
market.name,
exc,
exc.session_id,
)
return
if _maybe_queue_order_intent(
market=market,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=float(overseas_price),
source="trading_cycle",
):
return
result = await overseas_broker.send_overseas_order(
exchange_code=market.exchange_code,
stock_code=stock_code,
@@ -1271,6 +1679,11 @@ async def handle_domestic_pending_orders(
f"Invalid price ({last_price}) for {stock_code}"
)
new_price = kr_round_down(last_price * 0.996)
validate_order_policy(
market=MARKETS["KR"],
order_type="SELL",
price=float(new_price),
)
await broker.send_order(
stock_code=stock_code,
order_type="SELL",
@@ -1444,6 +1857,19 @@ async def handle_overseas_pending_orders(
f"Invalid price ({last_price}) for {stock_code}"
)
new_price = round(last_price * 0.996, 4)
market_info = next(
(
m for m in MARKETS.values()
if m.exchange_code == order_exchange and not m.is_domestic
),
None,
)
if market_info is not None:
validate_order_policy(
market=market_info,
order_type="SELL",
price=float(new_price),
)
await overseas_broker.send_overseas_order(
exchange_code=order_exchange,
stock_code=stock_code,
@@ -1532,6 +1958,11 @@ async def run_daily_session(
# Process each open market
for market in open_markets:
await process_blackout_recovery_orders(
broker=broker,
overseas_broker=overseas_broker,
db_conn=db_conn,
)
# Use market-local date for playbook keying
market_today = datetime.now(market.timezone).date()
@@ -1969,26 +2400,18 @@ async def run_daily_session(
logger.warning("Fat finger notification failed: %s", notify_exc)
continue # Skip this order
except CircuitBreakerTripped as exc:
ks_report = await KILL_SWITCH.trigger(
ks_report = await _trigger_emergency_kill_switch(
reason=f"daily_circuit_breaker:{market.code}:{stock_code}:{exc.pnl_pct:.2f}",
snapshot_state=lambda: logger.critical(
"Daily KillSwitch snapshot %s/%s pnl=%.2f threshold=%.2f",
market.code,
stock_code,
exc.pnl_pct,
exc.threshold,
),
)
logger.critical("Circuit breaker tripped — stopping session")
try:
await telegram.notify_circuit_breaker(
broker=broker,
overseas_broker=overseas_broker,
telegram=telegram,
settings=settings,
current_market=market,
stock_code=stock_code,
pnl_pct=exc.pnl_pct,
threshold=exc.threshold,
)
except Exception as notify_exc:
logger.warning(
"Circuit breaker notification failed: %s", notify_exc
)
logger.critical("Circuit breaker tripped — stopping session")
if ks_report.errors:
logger.critical(
"Daily KillSwitch step errors for %s/%s: %s",
@@ -2012,6 +2435,31 @@ async def run_daily_session(
order_price = kr_round_down(
stock_data["current_price"] * 0.998
)
try:
validate_order_policy(
market=market,
order_type=decision.action,
price=float(order_price),
)
except OrderPolicyRejected as exc:
logger.warning(
"Order policy rejected %s %s (%s): %s [session=%s]",
decision.action,
stock_code,
market.name,
exc,
exc.session_id,
)
continue
if _maybe_queue_order_intent(
market=market,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=float(order_price),
source="run_daily_session",
):
continue
result = await broker.send_order(
stock_code=stock_code,
order_type=decision.action,
@@ -2024,6 +2472,31 @@ async def run_daily_session(
order_price = round(stock_data["current_price"] * 1.005, 4)
else:
order_price = stock_data["current_price"]
try:
validate_order_policy(
market=market,
order_type=decision.action,
price=float(order_price),
)
except OrderPolicyRejected as exc:
logger.warning(
"Order policy rejected %s %s (%s): %s [session=%s]",
decision.action,
stock_code,
market.name,
exc,
exc.session_id,
)
continue
if _maybe_queue_order_intent(
market=market,
stock_code=stock_code,
order_type=decision.action,
quantity=quantity,
price=float(order_price),
source="run_daily_session",
):
continue
result = await overseas_broker.send_overseas_order(
exchange_code=market.exchange_code,
stock_code=stock_code,
@@ -2262,6 +2735,19 @@ def _apply_dashboard_flag(settings: Settings, dashboard_flag: bool) -> Settings:
async def run(settings: Settings) -> None:
"""Main async loop — iterate over open markets on a timer."""
global BLACKOUT_ORDER_MANAGER
BLACKOUT_ORDER_MANAGER = BlackoutOrderManager(
enabled=settings.ORDER_BLACKOUT_ENABLED,
windows=parse_blackout_windows_kst(settings.ORDER_BLACKOUT_WINDOWS_KST),
max_queue_size=settings.ORDER_BLACKOUT_QUEUE_MAX,
)
logger.info(
"Blackout manager initialized: enabled=%s windows=%s queue_max=%d",
settings.ORDER_BLACKOUT_ENABLED,
settings.ORDER_BLACKOUT_WINDOWS_KST,
settings.ORDER_BLACKOUT_QUEUE_MAX,
)
broker = KISBroker(settings)
overseas_broker = OverseasBroker(broker)
brain = GeminiClient(settings)
@@ -2861,6 +3347,12 @@ async def run(settings: Settings) -> None:
if shutdown.is_set():
break
await process_blackout_recovery_orders(
broker=broker,
overseas_broker=overseas_broker,
db_conn=db_conn,
)
# Notify market open if it just opened
if not _market_states.get(market.code, False):
try:

View File

@@ -0,0 +1,81 @@
from __future__ import annotations
from datetime import UTC, datetime
from src.core.blackout_manager import (
BlackoutOrderManager,
QueuedOrderIntent,
parse_blackout_windows_kst,
)
def test_parse_blackout_windows_kst() -> None:
windows = parse_blackout_windows_kst("23:30-00:10,11:20-11:30,invalid")
assert len(windows) == 2
def test_blackout_manager_handles_cross_midnight_window() -> None:
manager = BlackoutOrderManager(
enabled=True,
windows=parse_blackout_windows_kst("23:30-00:10"),
max_queue_size=10,
)
# 2026-01-01 23:40 KST = 2026-01-01 14:40 UTC
assert manager.in_blackout(datetime(2026, 1, 1, 14, 40, tzinfo=UTC))
# 2026-01-02 00:20 KST = 2026-01-01 15:20 UTC
assert not manager.in_blackout(datetime(2026, 1, 1, 15, 20, tzinfo=UTC))
def test_recovery_batch_only_after_blackout_exit() -> None:
manager = BlackoutOrderManager(
enabled=True,
windows=parse_blackout_windows_kst("23:30-00:10"),
max_queue_size=10,
)
intent = QueuedOrderIntent(
market_code="KR",
exchange_code="KRX",
stock_code="005930",
order_type="BUY",
quantity=1,
price=100.0,
source="test",
queued_at=datetime.now(UTC),
)
assert manager.enqueue(intent)
# Inside blackout: no pop yet
inside_blackout = datetime(2026, 1, 1, 14, 40, tzinfo=UTC)
assert manager.pop_recovery_batch(inside_blackout) == []
# Outside blackout: pop full batch once
outside_blackout = datetime(2026, 1, 1, 15, 20, tzinfo=UTC)
batch = manager.pop_recovery_batch(outside_blackout)
assert len(batch) == 1
assert manager.pending_count == 0
def test_requeued_intent_is_processed_next_non_blackout_cycle() -> None:
manager = BlackoutOrderManager(
enabled=True,
windows=parse_blackout_windows_kst("23:30-00:10"),
max_queue_size=10,
)
intent = QueuedOrderIntent(
market_code="KR",
exchange_code="KRX",
stock_code="005930",
order_type="BUY",
quantity=1,
price=100.0,
source="test",
queued_at=datetime.now(UTC),
)
manager.enqueue(intent)
outside_blackout = datetime(2026, 1, 1, 15, 20, tzinfo=UTC)
first_batch = manager.pop_recovery_batch(outside_blackout)
assert len(first_batch) == 1
manager.requeue(first_batch[0])
second_batch = manager.pop_recovery_batch(outside_blackout)
assert len(second_batch) == 1

View File

@@ -8,12 +8,14 @@ import pytest
from src.config import Settings
from src.context.layer import ContextLayer
from src.context.scheduler import ScheduleResult
from src.core.order_policy import OrderPolicyRejected
from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected
from src.db import init_db, log_trade
from src.evolution.scorecard import DailyScorecard
from src.logging.decision_logger import DecisionLogger
from src.main import (
KILL_SWITCH,
_trigger_emergency_kill_switch,
_apply_dashboard_flag,
_determine_order_quantity,
_extract_avg_price_from_balance,
@@ -26,6 +28,7 @@ from src.main import (
_start_dashboard_server,
handle_domestic_pending_orders,
handle_overseas_pending_orders,
process_blackout_recovery_orders,
run_daily_session,
safe_float,
sync_positions_from_broker,
@@ -5116,3 +5119,349 @@ async def test_kill_switch_block_skips_actionable_order_execution() -> None:
KILL_SWITCH.clear_block()
broker.send_order.assert_not_called()
@pytest.mark.asyncio
async def test_order_policy_rejection_skips_order_execution() -> None:
"""Order policy rejection must prevent order submission."""
db_conn = init_db(":memory:")
decision_logger = DecisionLogger(db_conn)
broker = MagicMock()
broker.get_current_price = AsyncMock(return_value=(100.0, 0.5, 0.0))
broker.get_balance = AsyncMock(
return_value={
"output1": [],
"output2": [
{
"tot_evlu_amt": "100000",
"dnca_tot_amt": "50000",
"pchs_amt_smtl_amt": "50000",
}
],
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
settings = MagicMock()
settings.POSITION_SIZING_ENABLED = False
settings.CONFIDENCE_THRESHOLD = 80
with patch(
"src.main.validate_order_policy",
side_effect=OrderPolicyRejected(
"rejected",
session_id="NXT_AFTER",
market_code="KR",
),
):
await trading_cycle(
broker=broker,
overseas_broker=MagicMock(),
scenario_engine=MagicMock(evaluate=MagicMock(return_value=_make_buy_match())),
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(
get_latest_timeframe=MagicMock(return_value=None),
set_context=MagicMock(),
),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=telegram,
market=market,
stock_code="005930",
scan_candidates={},
settings=settings,
)
broker.send_order.assert_not_called()
@pytest.mark.asyncio
async def test_blackout_queues_order_and_skips_submission() -> None:
"""When blackout is active, order submission is replaced by queueing."""
db_conn = init_db(":memory:")
decision_logger = DecisionLogger(db_conn)
broker = MagicMock()
broker.get_current_price = AsyncMock(return_value=(100.0, 0.5, 0.0))
broker.get_balance = AsyncMock(
return_value={
"output1": [],
"output2": [
{
"tot_evlu_amt": "100000",
"dnca_tot_amt": "50000",
"pchs_amt_smtl_amt": "50000",
}
],
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
settings = MagicMock()
settings.POSITION_SIZING_ENABLED = False
settings.CONFIDENCE_THRESHOLD = 80
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
blackout_manager = MagicMock()
blackout_manager.in_blackout.return_value = True
blackout_manager.enqueue.return_value = True
blackout_manager.pending_count = 1
with patch("src.main.BLACKOUT_ORDER_MANAGER", blackout_manager):
await trading_cycle(
broker=broker,
overseas_broker=MagicMock(),
scenario_engine=MagicMock(evaluate=MagicMock(return_value=_make_buy_match())),
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(
get_latest_timeframe=MagicMock(return_value=None),
set_context=MagicMock(),
),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=telegram,
market=market,
stock_code="005930",
scan_candidates={},
settings=settings,
)
broker.send_order.assert_not_called()
blackout_manager.enqueue.assert_called_once()
@pytest.mark.asyncio
async def test_process_blackout_recovery_executes_valid_intents() -> None:
"""Recovery must execute queued intents that pass revalidation."""
db_conn = init_db(":memory:")
broker = MagicMock()
broker.send_order = AsyncMock(return_value={"rt_cd": "0", "msg1": "OK"})
overseas_broker = MagicMock()
market = MagicMock()
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
intent = MagicMock()
intent.market_code = "KR"
intent.stock_code = "005930"
intent.order_type = "BUY"
intent.quantity = 1
intent.price = 100.0
intent.source = "test"
intent.attempts = 0
blackout_manager = MagicMock()
blackout_manager.pop_recovery_batch.return_value = [intent]
with (
patch("src.main.BLACKOUT_ORDER_MANAGER", blackout_manager),
patch("src.main.MARKETS", {"KR": market}),
patch("src.main.get_open_position", return_value=None),
patch("src.main.validate_order_policy"),
):
await process_blackout_recovery_orders(
broker=broker,
overseas_broker=overseas_broker,
db_conn=db_conn,
)
broker.send_order.assert_called_once()
@pytest.mark.asyncio
async def test_process_blackout_recovery_drops_policy_rejected_intent() -> None:
"""Policy-rejected queued intents must not be requeued."""
db_conn = init_db(":memory:")
broker = MagicMock()
broker.send_order = AsyncMock(return_value={"rt_cd": "0", "msg1": "OK"})
overseas_broker = MagicMock()
market = MagicMock()
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
intent = MagicMock()
intent.market_code = "KR"
intent.stock_code = "005930"
intent.order_type = "BUY"
intent.quantity = 1
intent.price = 100.0
intent.source = "test"
intent.attempts = 0
blackout_manager = MagicMock()
blackout_manager.pop_recovery_batch.return_value = [intent]
with (
patch("src.main.BLACKOUT_ORDER_MANAGER", blackout_manager),
patch("src.main.MARKETS", {"KR": market}),
patch("src.main.get_open_position", return_value=None),
patch(
"src.main.validate_order_policy",
side_effect=OrderPolicyRejected(
"blocked",
session_id="NXT_AFTER",
market_code="KR",
),
),
):
await process_blackout_recovery_orders(
broker=broker,
overseas_broker=overseas_broker,
db_conn=db_conn,
)
broker.send_order.assert_not_called()
blackout_manager.requeue.assert_not_called()
@pytest.mark.asyncio
async def test_trigger_emergency_kill_switch_executes_operational_steps() -> None:
"""Emergency kill switch should execute cancel/refresh/reduce/notify callbacks."""
broker = MagicMock()
broker.get_domestic_pending_orders = AsyncMock(
return_value=[
{
"pdno": "005930",
"orgn_odno": "1",
"ord_gno_brno": "01",
"psbl_qty": "3",
}
]
)
broker.cancel_domestic_order = AsyncMock(return_value={"rt_cd": "0"})
broker.get_balance = AsyncMock(return_value={"output1": [], "output2": []})
overseas_broker = MagicMock()
overseas_broker.get_overseas_pending_orders = AsyncMock(return_value=[])
overseas_broker.get_overseas_balance = AsyncMock(return_value={"output1": [], "output2": []})
telegram = MagicMock()
telegram.notify_circuit_breaker = AsyncMock()
settings = MagicMock()
settings.enabled_market_list = ["KR"]
market = MagicMock()
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
with (
patch("src.main.MARKETS", {"KR": market}),
patch("src.main.BLACKOUT_ORDER_MANAGER.clear", return_value=2),
):
report = await _trigger_emergency_kill_switch(
reason="test",
broker=broker,
overseas_broker=overseas_broker,
telegram=telegram,
settings=settings,
current_market=market,
stock_code="005930",
pnl_pct=-3.2,
threshold=-3.0,
)
assert report.steps == [
"block_new_orders",
"cancel_pending_orders",
"refresh_order_state",
"reduce_risk",
"snapshot_state",
"notify",
]
broker.cancel_domestic_order.assert_called_once()
broker.get_balance.assert_called_once()
telegram.notify_circuit_breaker.assert_called_once_with(
pnl_pct=-3.2,
threshold=-3.0,
)
@pytest.mark.asyncio
async def test_trigger_emergency_kill_switch_records_cancel_failure() -> None:
"""Cancel API rejection should be captured in kill switch errors."""
broker = MagicMock()
broker.get_domestic_pending_orders = AsyncMock(
return_value=[
{
"pdno": "005930",
"orgn_odno": "1",
"ord_gno_brno": "01",
"psbl_qty": "3",
}
]
)
broker.cancel_domestic_order = AsyncMock(return_value={"rt_cd": "1", "msg1": "fail"})
broker.get_balance = AsyncMock(return_value={"output1": [], "output2": []})
overseas_broker = MagicMock()
overseas_broker.get_overseas_pending_orders = AsyncMock(return_value=[])
overseas_broker.get_overseas_balance = AsyncMock(return_value={"output1": [], "output2": []})
telegram = MagicMock()
telegram.notify_circuit_breaker = AsyncMock()
settings = MagicMock()
settings.enabled_market_list = ["KR"]
market = MagicMock()
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
with (
patch("src.main.MARKETS", {"KR": market}),
patch("src.main.BLACKOUT_ORDER_MANAGER.clear", return_value=0),
):
report = await _trigger_emergency_kill_switch(
reason="test-fail",
broker=broker,
overseas_broker=overseas_broker,
telegram=telegram,
settings=settings,
current_market=market,
stock_code="005930",
pnl_pct=-3.2,
threshold=-3.0,
)
assert any(err.startswith("cancel_pending_orders:") for err in report.errors)

View File

@@ -0,0 +1,40 @@
from datetime import UTC, datetime
import pytest
from src.core.order_policy import OrderPolicyRejected, classify_session_id, validate_order_policy
from src.markets.schedule import MARKETS
def test_classify_kr_nxt_after() -> None:
# 2026-02-26 16:00 KST == 07:00 UTC
now = datetime(2026, 2, 26, 7, 0, tzinfo=UTC)
assert classify_session_id(MARKETS["KR"], now) == "NXT_AFTER"
def test_classify_us_pre() -> None:
# 2026-02-26 19:00 KST == 10:00 UTC
now = datetime(2026, 2, 26, 10, 0, tzinfo=UTC)
assert classify_session_id(MARKETS["US_NASDAQ"], now) == "US_PRE"
def test_reject_market_order_in_low_liquidity_session() -> None:
now = datetime(2026, 2, 26, 10, 0, tzinfo=UTC) # 19:00 KST -> US_PRE
with pytest.raises(OrderPolicyRejected):
validate_order_policy(
market=MARKETS["US_NASDAQ"],
order_type="BUY",
price=0.0,
now=now,
)
def test_allow_limit_order_in_low_liquidity_session() -> None:
now = datetime(2026, 2, 26, 10, 0, tzinfo=UTC) # 19:00 KST -> US_PRE
info = validate_order_policy(
market=MARKETS["US_NASDAQ"],
order_type="BUY",
price=100.0,
now=now,
)
assert info.session_id == "US_PRE"

View File

@@ -0,0 +1,131 @@
from __future__ import annotations
from src.analysis.triple_barrier import TripleBarrierSpec, label_with_triple_barrier
def test_long_take_profit_first() -> None:
highs = [100, 101, 103]
lows = [100, 99.6, 100]
closes = [100, 100, 102]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == 1
assert out.touched == "take_profit"
assert out.touch_bar == 2
def test_long_stop_loss_first() -> None:
highs = [100, 100.5, 101]
lows = [100, 98.8, 99]
closes = [100, 99.5, 100]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == -1
assert out.touched == "stop_loss"
assert out.touch_bar == 1
def test_time_barrier_timeout() -> None:
highs = [100, 100.8, 100.7]
lows = [100, 99.3, 99.4]
closes = [100, 100, 100]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.02, max_holding_bars=2)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == 0
assert out.touched == "time"
assert out.touch_bar == 2
def test_tie_break_stop_first_default() -> None:
highs = [100, 102.1]
lows = [100, 98.9]
closes = [100, 100]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=1)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == -1
assert out.touched == "stop_loss"
def test_short_side_inverts_barrier_semantics() -> None:
highs = [100, 100.5, 101.2]
lows = [100, 97.8, 98.0]
closes = [100, 99, 99]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=-1,
spec=spec,
)
assert out.label == 1
assert out.touched == "take_profit"
def test_short_tie_break_modes() -> None:
highs = [100, 101.1]
lows = [100, 97.9]
closes = [100, 100]
stop_first = TripleBarrierSpec(
take_profit_pct=0.02,
stop_loss_pct=0.01,
max_holding_bars=1,
tie_break="stop_first",
)
out_stop = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=-1,
spec=stop_first,
)
assert out_stop.label == -1
assert out_stop.touched == "stop_loss"
take_first = TripleBarrierSpec(
take_profit_pct=0.02,
stop_loss_pct=0.01,
max_holding_bars=1,
tie_break="take_first",
)
out_take = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=-1,
spec=take_first,
)
assert out_take.label == 1
assert out_take.touched == "take_profit"