Compare commits

...

22 Commits

Author SHA1 Message Date
agentson
2742628b78 feat: prioritize kill-switch over overnight exception policy (TASK-CODE-012) 2026-02-27 08:55:24 +09:00
agentson
694d73b212 fix: lazy session resolver and one-time session_id backfill 2026-02-27 08:51:00 +09:00
agentson
b2b02b6f57 feat: enforce session_id persistence in trade ledger (TASK-CODE-007) 2026-02-27 08:49:04 +09:00
2dbe98615d Merge pull request '[FX-ACCOUNTING] TKT-P1-006 전략/환율 PnL 분리 회계' (#296) from feature/issue-tkt-p1-006-fx-pnl-separation into feature/v3-session-policy-stream 2026-02-27 08:46:56 +09:00
agentson
34cf081c96 fix: backfill split pnl migration and harden partial pnl inputs 2026-02-27 08:46:22 +09:00
agentson
7bc4e88335 feat: separate strategy and fx pnl fields in trade logs (TASK-CODE-011) 2026-02-27 08:44:05 +09:00
386e039ff6 Merge pull request '[BACKTEST-MODEL] TKT-P1-005 보수적 체결 모델 구현' (#294) from feature/issue-tkt-p1-005-conservative-fill-model into feature/v3-session-policy-stream 2026-02-27 08:42:22 +09:00
agentson
13ba9e8081 fix: validate execution assumption ranges in backtest model 2026-02-27 08:41:56 +09:00
agentson
5b52f593a8 feat: add conservative backtest execution simulator (TASK-CODE-010) 2026-02-27 08:40:23 +09:00
2798558bf3 Merge pull request '[BACKTEST-MODEL] TKT-P1-002 백테스트 비용/슬리피지 옵션 필수화' (#292) from feature/issue-tkt-p1-002-backtest-cost-mandatory into feature/v3-session-policy-stream 2026-02-27 08:37:15 +09:00
agentson
2331d80915 fix: reject non-finite backtest cost assumptions 2026-02-27 08:36:38 +09:00
agentson
7d72669cb8 feat: enforce mandatory backtest cost assumptions (TASK-CODE-006) 2026-02-27 08:34:44 +09:00
74a4784b7a Merge pull request '[BACKTEST-MODEL] TKT-P1-004 Walk-forward + Purge/Embargo 분할 유틸' (#290) from feature/issue-tkt-p1-004-walkforward-purge-embargo into feature/v3-session-policy-stream 2026-02-27 08:33:01 +09:00
agentson
dc70311aed fix: keep embargo tied to accepted folds and enforce PR-comment decision logs 2026-02-27 08:32:09 +09:00
agentson
e56819e9e2 feat: add walk-forward splitter with purge and embargo controls (TASK-CODE-005) 2026-02-27 08:28:11 +09:00
cfd5351b58 Merge pull request '[FX-ACCOUNTING] TKT-P1-001 USD/KRW 버퍼 진입 제한' (#288) from feature/issue-tkt-p1-001-fx-buffer-guard into feature/v3-session-policy-stream 2026-02-27 00:53:21 +09:00
agentson
b206c23fc9 fix: scope USD buffer guard to US markets and add boundary tests 2026-02-27 00:52:44 +09:00
agentson
4d9f3e2cfc feat: enforce overseas buy guard with USD buffer threshold (TASK-V3-014) 2026-02-27 00:50:12 +09:00
a93a5c616b Merge pull request '[BACKTEST-MODEL] TKT-P1-003 Triple Barrier 라벨러 구현' (#286) from feature/issue-tkt-p1-003-triple-barrier-labeler into feature/v3-session-policy-stream 2026-02-27 00:47:37 +09:00
agentson
9f64c9944a fix: correct short-side tie-break semantics in triple barrier 2026-02-27 00:47:09 +09:00
agentson
bb391d502c feat: add triple barrier labeler with first-touch logic (TASK-CODE-004) 2026-02-27 00:45:18 +09:00
b0100fde10 Merge pull request '[RISK-EMERGENCY][SCN-FAIL-003] TKT-P0-002 Kill Switch 순서 강제 검증 자동화' (#284) from feature/issue-tkt-p0-002-killswitch-ordering into feature/v3-session-policy-stream 2026-02-27 00:42:16 +09:00
16 changed files with 1279 additions and 8 deletions

View File

@@ -149,6 +149,7 @@ TPM 티켓 운영 규칙:
- TPM은 합의된 변경을 이슈로 등록하고 우선순위(`P0/P1/P2`)를 지정한다.
- PR 본문에는 TPM이 지정한 우선순위와 범위가 그대로 반영되어야 한다.
- 우선순위 변경은 TPM 제안 + Main Agent 승인으로만 가능하다.
- PM/TPM/Dev/Reviewer/Verifier/Runtime Verifier는 주요 의사결정 시점마다 PR 코멘트를 남겨 결정 근거를 추적 가능 상태로 유지한다.
브랜치 운영 규칙:
- TPM은 각 티켓에 대해 `ticket temp branch -> program feature branch` PR 경로를 지정한다.

View File

@@ -50,6 +50,7 @@ Updated: 2026-02-26
- PR 본문에 `REQ-*`, `TASK-*`, `TEST-*` 매핑 표 존재
- `src/core/risk_manager.py` 변경 없음
- 주요 의사결정 체크포인트(DCP-01~04) 중 해당 단계 Main Agent 확인 기록 존재
- 주요 의사결정(리뷰 지적/수정 합의/검증 승인)에 대한 에이전트 PR 코멘트 존재
- 티켓 PR의 base가 `main`이 아닌 program feature branch인지 확인
자동 점검:

View File

@@ -22,6 +22,7 @@
- Ticket-level development happens only on **ticket temp branches** cut from the program feature branch.
- Ticket PR merges into program feature branch are allowed after verifier approval.
- Until final user sign-off, `main` merge is prohibited.
- 각 에이전트는 주요 의사결정(리뷰 지적, 수정 방향, 검증 승인)마다 PR 코멘트를 적극 작성해 의사결정 과정을 남긴다.
## Gitea CLI Formatting Troubleshooting

View File

@@ -0,0 +1,52 @@
"""Backtest cost/slippage/failure validation guard."""
from __future__ import annotations
from dataclasses import dataclass
import math
@dataclass(frozen=True)
class BacktestCostModel:
commission_bps: float | None = None
slippage_bps_by_session: dict[str, float] | None = None
failure_rate_by_session: dict[str, float] | None = None
unfavorable_fill_required: bool = True
def validate_backtest_cost_model(
*,
model: BacktestCostModel,
required_sessions: list[str],
) -> None:
"""Raise ValueError when required cost assumptions are missing/invalid."""
if (
model.commission_bps is None
or not math.isfinite(model.commission_bps)
or model.commission_bps < 0
):
raise ValueError("commission_bps must be provided and >= 0")
if not model.unfavorable_fill_required:
raise ValueError("unfavorable_fill_required must be True")
slippage = model.slippage_bps_by_session or {}
failure = model.failure_rate_by_session or {}
missing_slippage = [s for s in required_sessions if s not in slippage]
if missing_slippage:
raise ValueError(
f"missing slippage_bps_by_session for sessions: {', '.join(missing_slippage)}"
)
missing_failure = [s for s in required_sessions if s not in failure]
if missing_failure:
raise ValueError(
f"missing failure_rate_by_session for sessions: {', '.join(missing_failure)}"
)
for sess, bps in slippage.items():
if not math.isfinite(bps) or bps < 0:
raise ValueError(f"slippage bps must be >= 0 for session={sess}")
for sess, rate in failure.items():
if not math.isfinite(rate) or rate < 0 or rate > 1:
raise ValueError(f"failure rate must be within [0,1] for session={sess}")

View File

@@ -0,0 +1,103 @@
"""Conservative backtest execution model."""
from __future__ import annotations
from dataclasses import dataclass
import math
from random import Random
from typing import Literal
OrderSide = Literal["BUY", "SELL"]
@dataclass(frozen=True)
class ExecutionRequest:
side: OrderSide
session_id: str
qty: int
reference_price: float
@dataclass(frozen=True)
class ExecutionAssumptions:
slippage_bps_by_session: dict[str, float]
failure_rate_by_session: dict[str, float]
partial_fill_rate_by_session: dict[str, float]
partial_fill_min_ratio: float = 0.3
partial_fill_max_ratio: float = 0.8
seed: int = 0
@dataclass(frozen=True)
class ExecutionResult:
status: Literal["FILLED", "PARTIAL", "REJECTED"]
filled_qty: int
avg_price: float
slippage_bps: float
reason: str
class BacktestExecutionModel:
"""Execution simulator with conservative unfavorable fill assumptions."""
def __init__(self, assumptions: ExecutionAssumptions) -> None:
self.assumptions = assumptions
self._rng = Random(assumptions.seed)
if assumptions.partial_fill_min_ratio <= 0 or assumptions.partial_fill_max_ratio > 1:
raise ValueError("partial fill ratios must be within (0,1]")
if assumptions.partial_fill_min_ratio > assumptions.partial_fill_max_ratio:
raise ValueError("partial_fill_min_ratio must be <= partial_fill_max_ratio")
for sess, bps in assumptions.slippage_bps_by_session.items():
if not math.isfinite(bps) or bps < 0:
raise ValueError(f"slippage_bps must be finite and >= 0 for session={sess}")
for sess, rate in assumptions.failure_rate_by_session.items():
if not math.isfinite(rate) or rate < 0 or rate > 1:
raise ValueError(f"failure_rate must be in [0,1] for session={sess}")
for sess, rate in assumptions.partial_fill_rate_by_session.items():
if not math.isfinite(rate) or rate < 0 or rate > 1:
raise ValueError(f"partial_fill_rate must be in [0,1] for session={sess}")
def simulate(self, request: ExecutionRequest) -> ExecutionResult:
if request.qty <= 0:
raise ValueError("qty must be positive")
if request.reference_price <= 0:
raise ValueError("reference_price must be positive")
slippage_bps = self.assumptions.slippage_bps_by_session.get(request.session_id, 0.0)
failure_rate = self.assumptions.failure_rate_by_session.get(request.session_id, 0.0)
partial_rate = self.assumptions.partial_fill_rate_by_session.get(request.session_id, 0.0)
if self._rng.random() < failure_rate:
return ExecutionResult(
status="REJECTED",
filled_qty=0,
avg_price=0.0,
slippage_bps=slippage_bps,
reason="execution_failure",
)
slip_mult = 1.0 + (slippage_bps / 10000.0 if request.side == "BUY" else -slippage_bps / 10000.0)
exec_price = request.reference_price * slip_mult
if self._rng.random() < partial_rate:
ratio = self._rng.uniform(
self.assumptions.partial_fill_min_ratio,
self.assumptions.partial_fill_max_ratio,
)
filled = max(1, min(request.qty - 1, int(request.qty * ratio)))
return ExecutionResult(
status="PARTIAL",
filled_qty=filled,
avg_price=exec_price,
slippage_bps=slippage_bps,
reason="partial_fill",
)
return ExecutionResult(
status="FILLED",
filled_qty=request.qty,
avg_price=exec_price,
slippage_bps=slippage_bps,
reason="filled",
)

View File

@@ -0,0 +1,111 @@
"""Triple barrier labeler utilities.
Implements first-touch labeling with upper/lower/time barriers.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal, Sequence
TieBreakMode = Literal["stop_first", "take_first"]
@dataclass(frozen=True)
class TripleBarrierSpec:
take_profit_pct: float
stop_loss_pct: float
max_holding_bars: int
tie_break: TieBreakMode = "stop_first"
@dataclass(frozen=True)
class TripleBarrierLabel:
label: int # +1 take-profit first, -1 stop-loss first, 0 timeout
touched: Literal["take_profit", "stop_loss", "time"]
touch_bar: int
entry_price: float
upper_barrier: float
lower_barrier: float
def label_with_triple_barrier(
*,
highs: Sequence[float],
lows: Sequence[float],
closes: Sequence[float],
entry_index: int,
side: int,
spec: TripleBarrierSpec,
) -> TripleBarrierLabel:
"""Label one entry using triple-barrier first-touch rule.
Args:
highs/lows/closes: OHLC components with identical length.
entry_index: Entry bar index in the sequences.
side: +1 for long, -1 for short.
spec: Barrier specification.
"""
if side not in {1, -1}:
raise ValueError("side must be +1 or -1")
if len(highs) != len(lows) or len(highs) != len(closes):
raise ValueError("highs, lows, closes lengths must match")
if entry_index < 0 or entry_index >= len(closes):
raise IndexError("entry_index out of range")
if spec.max_holding_bars <= 0:
raise ValueError("max_holding_bars must be positive")
entry_price = float(closes[entry_index])
if entry_price <= 0:
raise ValueError("entry price must be positive")
if side == 1:
upper = entry_price * (1.0 + spec.take_profit_pct)
lower = entry_price * (1.0 - spec.stop_loss_pct)
else:
# For short side, favorable move is down.
upper = entry_price * (1.0 + spec.stop_loss_pct)
lower = entry_price * (1.0 - spec.take_profit_pct)
last_index = min(len(closes) - 1, entry_index + spec.max_holding_bars)
for idx in range(entry_index + 1, last_index + 1):
h = float(highs[idx])
l = float(lows[idx])
up_touch = h >= upper
down_touch = l <= lower
if not up_touch and not down_touch:
continue
if up_touch and down_touch:
if spec.tie_break == "stop_first":
touched = "stop_loss"
label = -1
else:
touched = "take_profit"
label = 1
elif up_touch:
touched = "take_profit" if side == 1 else "stop_loss"
label = 1 if side == 1 else -1
else:
touched = "stop_loss" if side == 1 else "take_profit"
label = -1 if side == 1 else 1
return TripleBarrierLabel(
label=label,
touched=touched,
touch_bar=idx,
entry_price=entry_price,
upper_barrier=upper,
lower_barrier=lower,
)
return TripleBarrierLabel(
label=0,
touched="time",
touch_bar=last_index,
entry_price=entry_price,
upper_barrier=upper,
lower_barrier=lower,
)

View File

@@ -0,0 +1,74 @@
"""Walk-forward splitter with purge/embargo controls."""
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class WalkForwardFold:
train_indices: list[int]
test_indices: list[int]
@property
def train_size(self) -> int:
return len(self.train_indices)
@property
def test_size(self) -> int:
return len(self.test_indices)
def generate_walk_forward_splits(
*,
n_samples: int,
train_size: int,
test_size: int,
step_size: int | None = None,
purge_size: int = 0,
embargo_size: int = 0,
min_train_size: int = 1,
) -> list[WalkForwardFold]:
"""Generate chronological folds with purge/embargo leakage controls."""
if n_samples <= 0:
raise ValueError("n_samples must be positive")
if train_size <= 0 or test_size <= 0:
raise ValueError("train_size and test_size must be positive")
if purge_size < 0 or embargo_size < 0:
raise ValueError("purge_size and embargo_size must be >= 0")
if min_train_size <= 0:
raise ValueError("min_train_size must be positive")
step = step_size if step_size is not None else test_size
if step <= 0:
raise ValueError("step_size must be positive")
folds: list[WalkForwardFold] = []
prev_test_end: int | None = None
test_start = train_size + purge_size
while test_start + test_size <= n_samples:
test_end = test_start + test_size - 1
train_end = test_start - purge_size - 1
if train_end < 0:
break
train_start = max(0, train_end - train_size + 1)
train_indices = list(range(train_start, train_end + 1))
if prev_test_end is not None and embargo_size > 0:
emb_from = prev_test_end + 1
emb_to = prev_test_end + embargo_size
train_indices = [i for i in train_indices if i < emb_from or i > emb_to]
if len(train_indices) >= min_train_size:
folds.append(
WalkForwardFold(
train_indices=train_indices,
test_indices=list(range(test_start, test_end + 1)),
)
)
prev_test_end = test_end
test_start += step
return folds

View File

@@ -59,6 +59,8 @@ class Settings(BaseSettings):
# KIS VTS overseas balance API returns errors for most accounts.
# This value is used as a fallback when the balance API returns 0 in paper mode.
PAPER_OVERSEAS_CASH: float = Field(default=50000.0, ge=0.0)
USD_BUFFER_MIN: float = Field(default=1000.0, ge=0.0)
OVERNIGHT_EXCEPTION_ENABLED: bool = True
# Trading frequency mode (daily = batch API calls, realtime = per-stock calls)
TRADE_MODE: str = Field(default="daily", pattern="^(daily|realtime)$")

View File

@@ -31,8 +31,12 @@ def init_db(db_path: str) -> sqlite3.Connection:
quantity INTEGER,
price REAL,
pnl REAL DEFAULT 0.0,
strategy_pnl REAL DEFAULT 0.0,
fx_pnl REAL DEFAULT 0.0,
market TEXT DEFAULT 'KR',
exchange_code TEXT DEFAULT 'KRX',
session_id TEXT DEFAULT 'UNKNOWN',
selection_context TEXT,
decision_id TEXT,
mode TEXT DEFAULT 'paper'
)
@@ -53,6 +57,32 @@ def init_db(db_path: str) -> sqlite3.Connection:
conn.execute("ALTER TABLE trades ADD COLUMN decision_id TEXT")
if "mode" not in columns:
conn.execute("ALTER TABLE trades ADD COLUMN mode TEXT DEFAULT 'paper'")
session_id_added = False
if "session_id" not in columns:
conn.execute("ALTER TABLE trades ADD COLUMN session_id TEXT DEFAULT 'UNKNOWN'")
session_id_added = True
if "strategy_pnl" not in columns:
conn.execute("ALTER TABLE trades ADD COLUMN strategy_pnl REAL DEFAULT 0.0")
if "fx_pnl" not in columns:
conn.execute("ALTER TABLE trades ADD COLUMN fx_pnl REAL DEFAULT 0.0")
# Backfill legacy rows where only pnl existed before split accounting columns.
conn.execute(
"""
UPDATE trades
SET strategy_pnl = pnl, fx_pnl = 0.0
WHERE pnl != 0.0
AND strategy_pnl = 0.0
AND fx_pnl = 0.0
"""
)
if session_id_added:
conn.execute(
"""
UPDATE trades
SET session_id = 'UNKNOWN'
WHERE session_id IS NULL OR session_id = ''
"""
)
# Context tree tables for multi-layered memory management
conn.execute(
@@ -171,8 +201,11 @@ def log_trade(
quantity: int = 0,
price: float = 0.0,
pnl: float = 0.0,
strategy_pnl: float | None = None,
fx_pnl: float | None = None,
market: str = "KR",
exchange_code: str = "KRX",
session_id: str | None = None,
selection_context: dict[str, any] | None = None,
decision_id: str | None = None,
mode: str = "paper",
@@ -187,24 +220,37 @@ def log_trade(
rationale: AI decision rationale
quantity: Number of shares
price: Trade price
pnl: Profit/loss
pnl: Total profit/loss (backward compatibility)
strategy_pnl: Strategy PnL component
fx_pnl: FX PnL component
market: Market code
exchange_code: Exchange code
session_id: Session identifier (if omitted, auto-derived from market)
selection_context: Scanner selection data (RSI, volume_ratio, signal, score)
decision_id: Unique decision identifier for audit linking
mode: Trading mode ('paper' or 'live') for data separation
"""
# Serialize selection context to JSON
context_json = json.dumps(selection_context) if selection_context else None
resolved_session_id = _resolve_session_id(market=market, session_id=session_id)
if strategy_pnl is None and fx_pnl is None:
strategy_pnl = pnl
fx_pnl = 0.0
elif strategy_pnl is None:
strategy_pnl = pnl - float(fx_pnl or 0.0) if pnl != 0.0 else 0.0
elif fx_pnl is None:
fx_pnl = pnl - float(strategy_pnl) if pnl != 0.0 else 0.0
if pnl == 0.0 and (strategy_pnl or fx_pnl):
pnl = float(strategy_pnl) + float(fx_pnl)
conn.execute(
"""
INSERT INTO trades (
timestamp, stock_code, action, confidence, rationale,
quantity, price, pnl, market, exchange_code, selection_context, decision_id,
mode
quantity, price, pnl, strategy_pnl, fx_pnl,
market, exchange_code, session_id, selection_context, decision_id, mode
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
datetime.now(UTC).isoformat(),
@@ -215,8 +261,11 @@ def log_trade(
quantity,
price,
pnl,
strategy_pnl,
fx_pnl,
market,
exchange_code,
resolved_session_id,
context_json,
decision_id,
mode,
@@ -225,6 +274,21 @@ def log_trade(
conn.commit()
def _resolve_session_id(*, market: str, session_id: str | None) -> str:
if session_id:
return session_id
try:
from src.core.order_policy import classify_session_id
from src.markets.schedule import MARKETS
market_info = MARKETS.get(market)
if market_info is not None:
return classify_session_id(market_info)
except Exception:
pass
return "UNKNOWN"
def get_latest_buy_trade(
conn: sqlite3.Connection, stock_code: str, market: str
) -> dict[str, Any] | None:

View File

@@ -33,7 +33,11 @@ from src.core.blackout_manager import (
parse_blackout_windows_kst,
)
from src.core.kill_switch import KillSwitchOrchestrator
from src.core.order_policy import OrderPolicyRejected, validate_order_policy
from src.core.order_policy import (
OrderPolicyRejected,
get_session_info,
validate_order_policy,
)
from src.core.priority_queue import PriorityTaskQueue
from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected, RiskManager
from src.db import (
@@ -63,6 +67,7 @@ BLACKOUT_ORDER_MANAGER = BlackoutOrderManager(
windows=[],
max_queue_size=500,
)
_SESSION_CLOSE_WINDOWS = {"NXT_AFTER", "US_AFTER"}
def safe_float(value: str | float | None, default: float = 0.0) -> float:
@@ -429,6 +434,41 @@ def _determine_order_quantity(
return quantity
def _should_block_overseas_buy_for_fx_buffer(
*,
market: MarketInfo,
action: str,
total_cash: float,
order_amount: float,
settings: Settings | None,
) -> tuple[bool, float, float]:
if (
market.is_domestic
or not market.code.startswith("US")
or action != "BUY"
or settings is None
):
return False, total_cash - order_amount, 0.0
remaining = total_cash - order_amount
required = settings.USD_BUFFER_MIN
return remaining < required, remaining, required
def _should_force_exit_for_overnight(
*,
market: MarketInfo,
settings: Settings | None,
) -> bool:
session_id = get_session_info(market).session_id
if session_id not in _SESSION_CLOSE_WINDOWS:
return False
if KILL_SWITCH.new_orders_blocked:
return True
if settings is None:
return False
return not settings.OVERNIGHT_EXCEPTION_ENABLED
async def build_overseas_symbol_universe(
db_conn: Any,
overseas_broker: OverseasBroker,
@@ -1194,6 +1234,23 @@ async def trading_cycle(
loss_pct,
take_profit_threshold,
)
if decision.action == "HOLD" and _should_force_exit_for_overnight(
market=market,
settings=settings,
):
decision = TradeDecision(
action="SELL",
confidence=max(decision.confidence, 85),
rationale=(
"Forced exit by overnight policy"
" (session close window / kill switch priority)"
),
)
logger.info(
"Overnight policy override for %s (%s): HOLD -> SELL",
stock_code,
market.name,
)
logger.info(
"Decision for %s (%s): %s (confidence=%d)",
stock_code,
@@ -1254,7 +1311,7 @@ async def trading_cycle(
trade_price = current_price
trade_pnl = 0.0
if decision.action in ("BUY", "SELL"):
if KILL_SWITCH.new_orders_blocked:
if KILL_SWITCH.new_orders_blocked and decision.action == "BUY":
logger.critical(
"KillSwitch block active: skip %s order for %s (%s)",
decision.action,
@@ -1292,6 +1349,24 @@ async def trading_cycle(
)
return
order_amount = current_price * quantity
fx_blocked, remaining_cash, required_buffer = _should_block_overseas_buy_for_fx_buffer(
market=market,
action=decision.action,
total_cash=total_cash,
order_amount=order_amount,
settings=settings,
)
if fx_blocked:
logger.warning(
"Skip BUY %s (%s): FX buffer guard (remaining=%.2f, required=%.2f, cash=%.2f, order=%.2f)",
stock_code,
market.name,
remaining_cash,
required_buffer,
total_cash,
order_amount,
)
return
# 4. Check BUY cooldown (set when a prior BUY failed due to insufficient balance)
if decision.action == "BUY" and buy_cooldown is not None:
@@ -2285,6 +2360,25 @@ async def run_daily_session(
stock_code,
market.name,
)
if decision.action == "HOLD":
daily_open = get_open_position(db_conn, stock_code, market.code)
if daily_open and _should_force_exit_for_overnight(
market=market,
settings=settings,
):
decision = TradeDecision(
action="SELL",
confidence=max(decision.confidence, 85),
rationale=(
"Forced exit by overnight policy"
" (session close window / kill switch priority)"
),
)
logger.info(
"Daily overnight policy override for %s (%s): HOLD -> SELL",
stock_code,
market.name,
)
# Log decision
context_snapshot = {
@@ -2325,7 +2419,7 @@ async def run_daily_session(
trade_pnl = 0.0
order_succeeded = True
if decision.action in ("BUY", "SELL"):
if KILL_SWITCH.new_orders_blocked:
if KILL_SWITCH.new_orders_blocked and decision.action == "BUY":
logger.critical(
"KillSwitch block active: skip %s order for %s (%s)",
decision.action,
@@ -2360,6 +2454,24 @@ async def run_daily_session(
)
continue
order_amount = stock_data["current_price"] * quantity
fx_blocked, remaining_cash, required_buffer = _should_block_overseas_buy_for_fx_buffer(
market=market,
action=decision.action,
total_cash=total_cash,
order_amount=order_amount,
settings=settings,
)
if fx_blocked:
logger.warning(
"Skip BUY %s (%s): FX buffer guard (remaining=%.2f, required=%.2f, cash=%.2f, order=%.2f)",
stock_code,
market.name,
remaining_cash,
required_buffer,
total_cash,
order_amount,
)
continue
# Check BUY cooldown (insufficient balance)
if decision.action == "BUY":

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
import pytest
from src.analysis.backtest_cost_guard import BacktestCostModel, validate_backtest_cost_model
def test_valid_backtest_cost_model_passes() -> None:
model = BacktestCostModel(
commission_bps=5.0,
slippage_bps_by_session={"KRX_REG": 10.0, "US_PRE": 50.0},
failure_rate_by_session={"KRX_REG": 0.01, "US_PRE": 0.08},
unfavorable_fill_required=True,
)
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG", "US_PRE"])
def test_missing_required_slippage_session_raises() -> None:
model = BacktestCostModel(
commission_bps=5.0,
slippage_bps_by_session={"KRX_REG": 10.0},
failure_rate_by_session={"KRX_REG": 0.01, "US_PRE": 0.08},
unfavorable_fill_required=True,
)
with pytest.raises(ValueError, match="missing slippage_bps_by_session.*US_PRE"):
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG", "US_PRE"])
def test_missing_required_failure_rate_session_raises() -> None:
model = BacktestCostModel(
commission_bps=5.0,
slippage_bps_by_session={"KRX_REG": 10.0, "US_PRE": 50.0},
failure_rate_by_session={"KRX_REG": 0.01},
unfavorable_fill_required=True,
)
with pytest.raises(ValueError, match="missing failure_rate_by_session.*US_PRE"):
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG", "US_PRE"])
def test_invalid_failure_rate_range_raises() -> None:
model = BacktestCostModel(
commission_bps=5.0,
slippage_bps_by_session={"KRX_REG": 10.0},
failure_rate_by_session={"KRX_REG": 1.2},
unfavorable_fill_required=True,
)
with pytest.raises(ValueError, match="failure rate must be within"):
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG"])
def test_unfavorable_fill_requirement_cannot_be_disabled() -> None:
model = BacktestCostModel(
commission_bps=5.0,
slippage_bps_by_session={"KRX_REG": 10.0},
failure_rate_by_session={"KRX_REG": 0.02},
unfavorable_fill_required=False,
)
with pytest.raises(ValueError, match="unfavorable_fill_required must be True"):
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG"])
@pytest.mark.parametrize("bad_commission", [float("nan"), float("inf"), float("-inf")])
def test_non_finite_commission_rejected(bad_commission: float) -> None:
model = BacktestCostModel(
commission_bps=bad_commission,
slippage_bps_by_session={"KRX_REG": 10.0},
failure_rate_by_session={"KRX_REG": 0.02},
unfavorable_fill_required=True,
)
with pytest.raises(ValueError, match="commission_bps"):
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG"])
@pytest.mark.parametrize("bad_slippage", [float("nan"), float("inf"), float("-inf")])
def test_non_finite_slippage_rejected(bad_slippage: float) -> None:
model = BacktestCostModel(
commission_bps=5.0,
slippage_bps_by_session={"KRX_REG": bad_slippage},
failure_rate_by_session={"KRX_REG": 0.02},
unfavorable_fill_required=True,
)
with pytest.raises(ValueError, match="slippage bps"):
validate_backtest_cost_model(model=model, required_sessions=["KRX_REG"])

View File

@@ -0,0 +1,108 @@
from __future__ import annotations
import pytest
from src.analysis.backtest_execution_model import (
BacktestExecutionModel,
ExecutionAssumptions,
ExecutionRequest,
)
def test_buy_uses_unfavorable_slippage_direction() -> None:
model = BacktestExecutionModel(
ExecutionAssumptions(
slippage_bps_by_session={"US_PRE": 50.0},
failure_rate_by_session={"US_PRE": 0.0},
partial_fill_rate_by_session={"US_PRE": 0.0},
seed=1,
)
)
out = model.simulate(
ExecutionRequest(side="BUY", session_id="US_PRE", qty=10, reference_price=100.0)
)
assert out.status == "FILLED"
assert out.avg_price == pytest.approx(100.5)
def test_sell_uses_unfavorable_slippage_direction() -> None:
model = BacktestExecutionModel(
ExecutionAssumptions(
slippage_bps_by_session={"US_PRE": 50.0},
failure_rate_by_session={"US_PRE": 0.0},
partial_fill_rate_by_session={"US_PRE": 0.0},
seed=1,
)
)
out = model.simulate(
ExecutionRequest(side="SELL", session_id="US_PRE", qty=10, reference_price=100.0)
)
assert out.status == "FILLED"
assert out.avg_price == pytest.approx(99.5)
def test_failure_rate_can_reject_order() -> None:
model = BacktestExecutionModel(
ExecutionAssumptions(
slippage_bps_by_session={"KRX_REG": 10.0},
failure_rate_by_session={"KRX_REG": 1.0},
partial_fill_rate_by_session={"KRX_REG": 0.0},
seed=42,
)
)
out = model.simulate(
ExecutionRequest(side="BUY", session_id="KRX_REG", qty=10, reference_price=100.0)
)
assert out.status == "REJECTED"
assert out.filled_qty == 0
def test_partial_fill_applies_when_rate_is_one() -> None:
model = BacktestExecutionModel(
ExecutionAssumptions(
slippage_bps_by_session={"KRX_REG": 0.0},
failure_rate_by_session={"KRX_REG": 0.0},
partial_fill_rate_by_session={"KRX_REG": 1.0},
partial_fill_min_ratio=0.4,
partial_fill_max_ratio=0.4,
seed=0,
)
)
out = model.simulate(
ExecutionRequest(side="BUY", session_id="KRX_REG", qty=10, reference_price=100.0)
)
assert out.status == "PARTIAL"
assert out.filled_qty == 4
assert out.avg_price == 100.0
@pytest.mark.parametrize("bad_slip", [-1.0, float("nan"), float("inf")])
def test_invalid_slippage_is_rejected(bad_slip: float) -> None:
with pytest.raises(ValueError, match="slippage_bps"):
BacktestExecutionModel(
ExecutionAssumptions(
slippage_bps_by_session={"US_PRE": bad_slip},
failure_rate_by_session={"US_PRE": 0.0},
partial_fill_rate_by_session={"US_PRE": 0.0},
)
)
@pytest.mark.parametrize("bad_rate", [-0.1, 1.1, float("nan")])
def test_invalid_failure_or_partial_rates_are_rejected(bad_rate: float) -> None:
with pytest.raises(ValueError, match="failure_rate"):
BacktestExecutionModel(
ExecutionAssumptions(
slippage_bps_by_session={"US_PRE": 10.0},
failure_rate_by_session={"US_PRE": bad_rate},
partial_fill_rate_by_session={"US_PRE": 0.0},
)
)
with pytest.raises(ValueError, match="partial_fill_rate"):
BacktestExecutionModel(
ExecutionAssumptions(
slippage_bps_by_session={"US_PRE": 10.0},
failure_rate_by_session={"US_PRE": 0.0},
partial_fill_rate_by_session={"US_PRE": bad_rate},
)
)

View File

@@ -155,6 +155,9 @@ def test_mode_column_exists_in_schema() -> None:
cursor = conn.execute("PRAGMA table_info(trades)")
columns = {row[1] for row in cursor.fetchall()}
assert "mode" in columns
assert "session_id" in columns
assert "strategy_pnl" in columns
assert "fx_pnl" in columns
def test_mode_migration_adds_column_to_existing_db() -> None:
@@ -182,6 +185,13 @@ def test_mode_migration_adds_column_to_existing_db() -> None:
decision_id TEXT
)"""
)
old_conn.execute(
"""
INSERT INTO trades (
timestamp, stock_code, action, confidence, rationale, quantity, price, pnl
) VALUES ('2026-01-01T00:00:00+00:00', 'AAPL', 'SELL', 90, 'legacy', 1, 100.0, 123.45)
"""
)
old_conn.commit()
old_conn.close()
@@ -190,6 +200,132 @@ def test_mode_migration_adds_column_to_existing_db() -> None:
cursor = conn.execute("PRAGMA table_info(trades)")
columns = {row[1] for row in cursor.fetchall()}
assert "mode" in columns
assert "session_id" in columns
assert "strategy_pnl" in columns
assert "fx_pnl" in columns
migrated = conn.execute(
"SELECT pnl, strategy_pnl, fx_pnl, session_id FROM trades WHERE stock_code='AAPL' LIMIT 1"
).fetchone()
assert migrated is not None
assert migrated[0] == 123.45
assert migrated[1] == 123.45
assert migrated[2] == 0.0
assert migrated[3] == "UNKNOWN"
conn.close()
finally:
os.unlink(db_path)
def test_log_trade_stores_strategy_and_fx_pnl_separately() -> None:
conn = init_db(":memory:")
log_trade(
conn=conn,
stock_code="AAPL",
action="SELL",
confidence=90,
rationale="fx split",
pnl=120.0,
strategy_pnl=100.0,
fx_pnl=20.0,
market="US_NASDAQ",
exchange_code="NASD",
)
row = conn.execute(
"SELECT pnl, strategy_pnl, fx_pnl FROM trades ORDER BY id DESC LIMIT 1"
).fetchone()
assert row is not None
assert row[0] == 120.0
assert row[1] == 100.0
assert row[2] == 20.0
def test_log_trade_backward_compat_sets_strategy_pnl_from_pnl() -> None:
conn = init_db(":memory:")
log_trade(
conn=conn,
stock_code="005930",
action="SELL",
confidence=80,
rationale="legacy",
pnl=50.0,
market="KR",
exchange_code="KRX",
)
row = conn.execute(
"SELECT pnl, strategy_pnl, fx_pnl FROM trades ORDER BY id DESC LIMIT 1"
).fetchone()
assert row is not None
assert row[0] == 50.0
assert row[1] == 50.0
assert row[2] == 0.0
def test_log_trade_partial_fx_input_does_not_infer_negative_strategy_pnl() -> None:
conn = init_db(":memory:")
log_trade(
conn=conn,
stock_code="AAPL",
action="SELL",
confidence=70,
rationale="fx only",
pnl=0.0,
fx_pnl=10.0,
market="US_NASDAQ",
exchange_code="NASD",
)
row = conn.execute(
"SELECT pnl, strategy_pnl, fx_pnl FROM trades ORDER BY id DESC LIMIT 1"
).fetchone()
assert row is not None
assert row[0] == 10.0
assert row[1] == 0.0
assert row[2] == 10.0
def test_log_trade_persists_explicit_session_id() -> None:
conn = init_db(":memory:")
log_trade(
conn=conn,
stock_code="AAPL",
action="BUY",
confidence=70,
rationale="session test",
market="US_NASDAQ",
exchange_code="NASD",
session_id="US_PRE",
)
row = conn.execute("SELECT session_id FROM trades ORDER BY id DESC LIMIT 1").fetchone()
assert row is not None
assert row[0] == "US_PRE"
def test_log_trade_auto_derives_session_id_when_not_provided() -> None:
conn = init_db(":memory:")
log_trade(
conn=conn,
stock_code="005930",
action="BUY",
confidence=70,
rationale="auto session",
market="KR",
exchange_code="KRX",
)
row = conn.execute("SELECT session_id FROM trades ORDER BY id DESC LIMIT 1").fetchone()
assert row is not None
assert row[0] != "UNKNOWN"
def test_log_trade_unknown_market_falls_back_to_unknown_session() -> None:
conn = init_db(":memory:")
log_trade(
conn=conn,
stock_code="X",
action="BUY",
confidence=70,
rationale="unknown market",
market="MARS",
exchange_code="MARS",
)
row = conn.execute("SELECT session_id FROM trades ORDER BY id DESC LIMIT 1").fetchone()
assert row is not None
assert row[0] == "UNKNOWN"

View File

@@ -15,6 +15,8 @@ from src.evolution.scorecard import DailyScorecard
from src.logging.decision_logger import DecisionLogger
from src.main import (
KILL_SWITCH,
_should_force_exit_for_overnight,
_should_block_overseas_buy_for_fx_buffer,
_trigger_emergency_kill_switch,
_apply_dashboard_flag,
_determine_order_quantity,
@@ -3690,6 +3692,81 @@ class TestOverseasBrokerIntegration:
# DB도 브로커도 보유 없음 → BUY 주문이 실행되어야 함 (회귀 테스트)
overseas_broker.send_overseas_order.assert_called_once()
@pytest.mark.asyncio
async def test_overseas_buy_blocked_by_usd_buffer_guard(self) -> None:
"""Overseas BUY must be blocked when USD buffer would be breached."""
db_conn = init_db(":memory:")
overseas_broker = MagicMock()
overseas_broker.get_overseas_price = AsyncMock(
return_value={"output": {"last": "182.50"}}
)
overseas_broker.get_overseas_balance = AsyncMock(
return_value={
"output1": [],
"output2": [
{
"frcr_evlu_tota": "50000.00",
"frcr_buy_amt_smtl": "0.00",
}
],
}
)
overseas_broker.get_overseas_buying_power = AsyncMock(
return_value={"output": {"ovrs_ord_psbl_amt": "50000.00"}}
)
overseas_broker.send_overseas_order = AsyncMock(return_value={"msg1": "주문접수"})
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_buy_match("AAPL"))
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NASD"
market.is_domestic = False
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
decision_logger = MagicMock()
decision_logger.log_decision = MagicMock(return_value="decision-id")
settings = MagicMock()
settings.POSITION_SIZING_ENABLED = False
settings.CONFIDENCE_THRESHOLD = 80
settings.USD_BUFFER_MIN = 49900.0
settings.MODE = "paper"
settings.PAPER_OVERSEAS_CASH = 50000.0
await trading_cycle(
broker=MagicMock(),
overseas_broker=overseas_broker,
scenario_engine=engine,
playbook=_make_playbook(market="US"),
risk=MagicMock(),
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(
get_latest_timeframe=MagicMock(return_value=None),
set_context=MagicMock(),
),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=telegram,
market=market,
stock_code="AAPL",
scan_candidates={},
settings=settings,
)
overseas_broker.send_overseas_order.assert_not_called()
# ---------------------------------------------------------------------------
# _retry_connection — unit tests (issue #209)
@@ -3723,7 +3800,6 @@ class TestRetryConnection:
with patch("src.main.asyncio.sleep") as mock_sleep:
mock_sleep.return_value = None
result = await _retry_connection(flaky, label="flaky")
assert result == "ok"
assert call_count == 2
mock_sleep.assert_called_once()
@@ -3778,6 +3854,48 @@ class TestRetryConnection:
assert call_count == 1 # No retry for non-ConnectionError
def test_fx_buffer_guard_applies_only_to_us_and_respects_boundary() -> None:
settings = MagicMock()
settings.USD_BUFFER_MIN = 1000.0
us_market = MagicMock()
us_market.is_domestic = False
us_market.code = "US_NASDAQ"
blocked, remaining, required = _should_block_overseas_buy_for_fx_buffer(
market=us_market,
action="BUY",
total_cash=5000.0,
order_amount=4001.0,
settings=settings,
)
assert blocked
assert remaining == 999.0
assert required == 1000.0
blocked_eq, _, _ = _should_block_overseas_buy_for_fx_buffer(
market=us_market,
action="BUY",
total_cash=5000.0,
order_amount=4000.0,
settings=settings,
)
assert not blocked_eq
jp_market = MagicMock()
jp_market.is_domestic = False
jp_market.code = "JP"
blocked_jp, _, required_jp = _should_block_overseas_buy_for_fx_buffer(
market=jp_market,
action="BUY",
total_cash=5000.0,
order_amount=4500.0,
settings=settings,
)
assert not blocked_jp
assert required_jp == 0.0
# run_daily_session — daily CB baseline (daily_start_eval) tests (issue #207)
# ---------------------------------------------------------------------------
@@ -5193,6 +5311,88 @@ async def test_order_policy_rejection_skips_order_execution() -> None:
broker.send_order.assert_not_called()
def test_overnight_policy_prioritizes_killswitch_over_exception() -> None:
market = MagicMock()
with patch("src.main.get_session_info", return_value=MagicMock(session_id="US_AFTER")):
settings = MagicMock()
settings.OVERNIGHT_EXCEPTION_ENABLED = True
try:
KILL_SWITCH.new_orders_blocked = True
assert _should_force_exit_for_overnight(market=market, settings=settings)
finally:
KILL_SWITCH.clear_block()
@pytest.mark.asyncio
async def test_kill_switch_block_does_not_block_sell_reduction() -> None:
"""KillSwitch should block BUY entries, but allow SELL risk reduction orders."""
db_conn = init_db(":memory:")
decision_logger = DecisionLogger(db_conn)
broker = MagicMock()
broker.get_current_price = AsyncMock(return_value=(100.0, 0.5, 0.0))
broker.get_balance = AsyncMock(
return_value={
"output1": [{"pdno": "005930", "ord_psbl_qty": "3"}],
"output2": [
{
"tot_evlu_amt": "100000",
"dnca_tot_amt": "50000",
"pchs_amt_smtl_amt": "50000",
}
],
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
settings = MagicMock()
settings.POSITION_SIZING_ENABLED = False
settings.CONFIDENCE_THRESHOLD = 80
settings.OVERNIGHT_EXCEPTION_ENABLED = True
settings.MODE = "paper"
try:
KILL_SWITCH.new_orders_blocked = True
await trading_cycle(
broker=broker,
overseas_broker=MagicMock(),
scenario_engine=MagicMock(evaluate=MagicMock(return_value=_make_sell_match())),
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(
get_latest_timeframe=MagicMock(return_value=None),
set_context=MagicMock(),
),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=telegram,
market=market,
stock_code="005930",
scan_candidates={},
settings=settings,
)
finally:
KILL_SWITCH.clear_block()
broker.send_order.assert_called_once()
@pytest.mark.asyncio
async def test_blackout_queues_order_and_skips_submission() -> None:
"""When blackout is active, order submission is replaced by queueing."""

View File

@@ -0,0 +1,131 @@
from __future__ import annotations
from src.analysis.triple_barrier import TripleBarrierSpec, label_with_triple_barrier
def test_long_take_profit_first() -> None:
highs = [100, 101, 103]
lows = [100, 99.6, 100]
closes = [100, 100, 102]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == 1
assert out.touched == "take_profit"
assert out.touch_bar == 2
def test_long_stop_loss_first() -> None:
highs = [100, 100.5, 101]
lows = [100, 98.8, 99]
closes = [100, 99.5, 100]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == -1
assert out.touched == "stop_loss"
assert out.touch_bar == 1
def test_time_barrier_timeout() -> None:
highs = [100, 100.8, 100.7]
lows = [100, 99.3, 99.4]
closes = [100, 100, 100]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.02, max_holding_bars=2)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == 0
assert out.touched == "time"
assert out.touch_bar == 2
def test_tie_break_stop_first_default() -> None:
highs = [100, 102.1]
lows = [100, 98.9]
closes = [100, 100]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=1)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == -1
assert out.touched == "stop_loss"
def test_short_side_inverts_barrier_semantics() -> None:
highs = [100, 100.5, 101.2]
lows = [100, 97.8, 98.0]
closes = [100, 99, 99]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=-1,
spec=spec,
)
assert out.label == 1
assert out.touched == "take_profit"
def test_short_tie_break_modes() -> None:
highs = [100, 101.1]
lows = [100, 97.9]
closes = [100, 100]
stop_first = TripleBarrierSpec(
take_profit_pct=0.02,
stop_loss_pct=0.01,
max_holding_bars=1,
tie_break="stop_first",
)
out_stop = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=-1,
spec=stop_first,
)
assert out_stop.label == -1
assert out_stop.touched == "stop_loss"
take_first = TripleBarrierSpec(
take_profit_pct=0.02,
stop_loss_pct=0.01,
max_holding_bars=1,
tie_break="take_first",
)
out_take = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=-1,
spec=take_first,
)
assert out_take.label == 1
assert out_take.touched == "take_profit"

View File

@@ -0,0 +1,92 @@
from __future__ import annotations
import pytest
from src.analysis.walk_forward_split import generate_walk_forward_splits
def test_generates_sequential_folds() -> None:
folds = generate_walk_forward_splits(
n_samples=30,
train_size=10,
test_size=5,
)
assert len(folds) == 4
assert folds[0].train_indices == list(range(0, 10))
assert folds[0].test_indices == list(range(10, 15))
assert folds[1].train_indices == list(range(5, 15))
assert folds[1].test_indices == list(range(15, 20))
def test_purge_removes_boundary_samples_before_test() -> None:
folds = generate_walk_forward_splits(
n_samples=25,
train_size=8,
test_size=4,
purge_size=2,
)
first = folds[0]
# test starts at 10, purge=2 => train end must be 7
assert first.train_indices == list(range(0, 8))
assert first.test_indices == list(range(10, 14))
def test_embargo_excludes_post_test_samples_from_next_train() -> None:
folds = generate_walk_forward_splits(
n_samples=45,
train_size=15,
test_size=5,
step_size=10,
embargo_size=3,
)
assert len(folds) >= 2
# Fold1 test: 15..19, next fold train window: 10..24.
# embargo_size=3 should remove 20,21,22 from fold2 train.
second_train = folds[1].train_indices
assert 20 not in second_train
assert 21 not in second_train
assert 22 not in second_train
assert 23 in second_train
def test_respects_min_train_size_and_returns_empty_when_impossible() -> None:
folds = generate_walk_forward_splits(
n_samples=15,
train_size=5,
test_size=5,
min_train_size=6,
)
assert folds == []
def test_embargo_uses_last_accepted_fold_when_intermediate_fold_skips() -> None:
folds = generate_walk_forward_splits(
n_samples=30,
train_size=5,
test_size=3,
step_size=5,
embargo_size=1,
min_train_size=5,
)
# 1st fold accepted, 2nd skipped by min_train_size, subsequent folds still generated.
assert len(folds) == 3
assert folds[0].test_indices == [5, 6, 7]
assert folds[1].test_indices == [15, 16, 17]
assert folds[2].test_indices == [25, 26, 27]
@pytest.mark.parametrize(
("n_samples", "train_size", "test_size"),
[
(0, 10, 2),
(10, 0, 2),
(10, 5, 0),
],
)
def test_invalid_args_raise(n_samples: int, train_size: int, test_size: int) -> None:
with pytest.raises(ValueError):
generate_walk_forward_splits(
n_samples=n_samples,
train_size=train_size,
test_size=test_size,
)