Compare commits

..

9 Commits

10 changed files with 586 additions and 1 deletions

View File

@@ -149,6 +149,7 @@ TPM 티켓 운영 규칙:
- TPM은 합의된 변경을 이슈로 등록하고 우선순위(`P0/P1/P2`)를 지정한다. - TPM은 합의된 변경을 이슈로 등록하고 우선순위(`P0/P1/P2`)를 지정한다.
- PR 본문에는 TPM이 지정한 우선순위와 범위가 그대로 반영되어야 한다. - PR 본문에는 TPM이 지정한 우선순위와 범위가 그대로 반영되어야 한다.
- 우선순위 변경은 TPM 제안 + Main Agent 승인으로만 가능하다. - 우선순위 변경은 TPM 제안 + Main Agent 승인으로만 가능하다.
- PM/TPM/Dev/Reviewer/Verifier/Runtime Verifier는 주요 의사결정 시점마다 PR 코멘트를 남겨 결정 근거를 추적 가능 상태로 유지한다.
브랜치 운영 규칙: 브랜치 운영 규칙:
- TPM은 각 티켓에 대해 `ticket temp branch -> program feature branch` PR 경로를 지정한다. - TPM은 각 티켓에 대해 `ticket temp branch -> program feature branch` PR 경로를 지정한다.

View File

@@ -50,6 +50,7 @@ Updated: 2026-02-26
- PR 본문에 `REQ-*`, `TASK-*`, `TEST-*` 매핑 표 존재 - PR 본문에 `REQ-*`, `TASK-*`, `TEST-*` 매핑 표 존재
- `src/core/risk_manager.py` 변경 없음 - `src/core/risk_manager.py` 변경 없음
- 주요 의사결정 체크포인트(DCP-01~04) 중 해당 단계 Main Agent 확인 기록 존재 - 주요 의사결정 체크포인트(DCP-01~04) 중 해당 단계 Main Agent 확인 기록 존재
- 주요 의사결정(리뷰 지적/수정 합의/검증 승인)에 대한 에이전트 PR 코멘트 존재
- 티켓 PR의 base가 `main`이 아닌 program feature branch인지 확인 - 티켓 PR의 base가 `main`이 아닌 program feature branch인지 확인
자동 점검: 자동 점검:

View File

@@ -22,6 +22,7 @@
- Ticket-level development happens only on **ticket temp branches** cut from the program feature branch. - Ticket-level development happens only on **ticket temp branches** cut from the program feature branch.
- Ticket PR merges into program feature branch are allowed after verifier approval. - Ticket PR merges into program feature branch are allowed after verifier approval.
- Until final user sign-off, `main` merge is prohibited. - Until final user sign-off, `main` merge is prohibited.
- 각 에이전트는 주요 의사결정(리뷰 지적, 수정 방향, 검증 승인)마다 PR 코멘트를 적극 작성해 의사결정 과정을 남긴다.
## Gitea CLI Formatting Troubleshooting ## Gitea CLI Formatting Troubleshooting

View File

@@ -0,0 +1,111 @@
"""Triple barrier labeler utilities.
Implements first-touch labeling with upper/lower/time barriers.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Literal, Sequence
TieBreakMode = Literal["stop_first", "take_first"]
@dataclass(frozen=True)
class TripleBarrierSpec:
take_profit_pct: float
stop_loss_pct: float
max_holding_bars: int
tie_break: TieBreakMode = "stop_first"
@dataclass(frozen=True)
class TripleBarrierLabel:
label: int # +1 take-profit first, -1 stop-loss first, 0 timeout
touched: Literal["take_profit", "stop_loss", "time"]
touch_bar: int
entry_price: float
upper_barrier: float
lower_barrier: float
def label_with_triple_barrier(
*,
highs: Sequence[float],
lows: Sequence[float],
closes: Sequence[float],
entry_index: int,
side: int,
spec: TripleBarrierSpec,
) -> TripleBarrierLabel:
"""Label one entry using triple-barrier first-touch rule.
Args:
highs/lows/closes: OHLC components with identical length.
entry_index: Entry bar index in the sequences.
side: +1 for long, -1 for short.
spec: Barrier specification.
"""
if side not in {1, -1}:
raise ValueError("side must be +1 or -1")
if len(highs) != len(lows) or len(highs) != len(closes):
raise ValueError("highs, lows, closes lengths must match")
if entry_index < 0 or entry_index >= len(closes):
raise IndexError("entry_index out of range")
if spec.max_holding_bars <= 0:
raise ValueError("max_holding_bars must be positive")
entry_price = float(closes[entry_index])
if entry_price <= 0:
raise ValueError("entry price must be positive")
if side == 1:
upper = entry_price * (1.0 + spec.take_profit_pct)
lower = entry_price * (1.0 - spec.stop_loss_pct)
else:
# For short side, favorable move is down.
upper = entry_price * (1.0 + spec.stop_loss_pct)
lower = entry_price * (1.0 - spec.take_profit_pct)
last_index = min(len(closes) - 1, entry_index + spec.max_holding_bars)
for idx in range(entry_index + 1, last_index + 1):
h = float(highs[idx])
l = float(lows[idx])
up_touch = h >= upper
down_touch = l <= lower
if not up_touch and not down_touch:
continue
if up_touch and down_touch:
if spec.tie_break == "stop_first":
touched = "stop_loss"
label = -1
else:
touched = "take_profit"
label = 1
elif up_touch:
touched = "take_profit" if side == 1 else "stop_loss"
label = 1 if side == 1 else -1
else:
touched = "stop_loss" if side == 1 else "take_profit"
label = -1 if side == 1 else 1
return TripleBarrierLabel(
label=label,
touched=touched,
touch_bar=idx,
entry_price=entry_price,
upper_barrier=upper,
lower_barrier=lower,
)
return TripleBarrierLabel(
label=0,
touched="time",
touch_bar=last_index,
entry_price=entry_price,
upper_barrier=upper,
lower_barrier=lower,
)

View File

@@ -0,0 +1,74 @@
"""Walk-forward splitter with purge/embargo controls."""
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class WalkForwardFold:
train_indices: list[int]
test_indices: list[int]
@property
def train_size(self) -> int:
return len(self.train_indices)
@property
def test_size(self) -> int:
return len(self.test_indices)
def generate_walk_forward_splits(
*,
n_samples: int,
train_size: int,
test_size: int,
step_size: int | None = None,
purge_size: int = 0,
embargo_size: int = 0,
min_train_size: int = 1,
) -> list[WalkForwardFold]:
"""Generate chronological folds with purge/embargo leakage controls."""
if n_samples <= 0:
raise ValueError("n_samples must be positive")
if train_size <= 0 or test_size <= 0:
raise ValueError("train_size and test_size must be positive")
if purge_size < 0 or embargo_size < 0:
raise ValueError("purge_size and embargo_size must be >= 0")
if min_train_size <= 0:
raise ValueError("min_train_size must be positive")
step = step_size if step_size is not None else test_size
if step <= 0:
raise ValueError("step_size must be positive")
folds: list[WalkForwardFold] = []
prev_test_end: int | None = None
test_start = train_size + purge_size
while test_start + test_size <= n_samples:
test_end = test_start + test_size - 1
train_end = test_start - purge_size - 1
if train_end < 0:
break
train_start = max(0, train_end - train_size + 1)
train_indices = list(range(train_start, train_end + 1))
if prev_test_end is not None and embargo_size > 0:
emb_from = prev_test_end + 1
emb_to = prev_test_end + embargo_size
train_indices = [i for i in train_indices if i < emb_from or i > emb_to]
if len(train_indices) >= min_train_size:
folds.append(
WalkForwardFold(
train_indices=train_indices,
test_indices=list(range(test_start, test_end + 1)),
)
)
prev_test_end = test_end
test_start += step
return folds

View File

@@ -59,6 +59,7 @@ class Settings(BaseSettings):
# KIS VTS overseas balance API returns errors for most accounts. # KIS VTS overseas balance API returns errors for most accounts.
# This value is used as a fallback when the balance API returns 0 in paper mode. # This value is used as a fallback when the balance API returns 0 in paper mode.
PAPER_OVERSEAS_CASH: float = Field(default=50000.0, ge=0.0) PAPER_OVERSEAS_CASH: float = Field(default=50000.0, ge=0.0)
USD_BUFFER_MIN: float = Field(default=1000.0, ge=0.0)
# Trading frequency mode (daily = batch API calls, realtime = per-stock calls) # Trading frequency mode (daily = batch API calls, realtime = per-stock calls)
TRADE_MODE: str = Field(default="daily", pattern="^(daily|realtime)$") TRADE_MODE: str = Field(default="daily", pattern="^(daily|realtime)$")

View File

@@ -429,6 +429,26 @@ def _determine_order_quantity(
return quantity return quantity
def _should_block_overseas_buy_for_fx_buffer(
*,
market: MarketInfo,
action: str,
total_cash: float,
order_amount: float,
settings: Settings | None,
) -> tuple[bool, float, float]:
if (
market.is_domestic
or not market.code.startswith("US")
or action != "BUY"
or settings is None
):
return False, total_cash - order_amount, 0.0
remaining = total_cash - order_amount
required = settings.USD_BUFFER_MIN
return remaining < required, remaining, required
async def build_overseas_symbol_universe( async def build_overseas_symbol_universe(
db_conn: Any, db_conn: Any,
overseas_broker: OverseasBroker, overseas_broker: OverseasBroker,
@@ -1292,6 +1312,24 @@ async def trading_cycle(
) )
return return
order_amount = current_price * quantity order_amount = current_price * quantity
fx_blocked, remaining_cash, required_buffer = _should_block_overseas_buy_for_fx_buffer(
market=market,
action=decision.action,
total_cash=total_cash,
order_amount=order_amount,
settings=settings,
)
if fx_blocked:
logger.warning(
"Skip BUY %s (%s): FX buffer guard (remaining=%.2f, required=%.2f, cash=%.2f, order=%.2f)",
stock_code,
market.name,
remaining_cash,
required_buffer,
total_cash,
order_amount,
)
return
# 4. Check BUY cooldown (set when a prior BUY failed due to insufficient balance) # 4. Check BUY cooldown (set when a prior BUY failed due to insufficient balance)
if decision.action == "BUY" and buy_cooldown is not None: if decision.action == "BUY" and buy_cooldown is not None:
@@ -2360,6 +2398,24 @@ async def run_daily_session(
) )
continue continue
order_amount = stock_data["current_price"] * quantity order_amount = stock_data["current_price"] * quantity
fx_blocked, remaining_cash, required_buffer = _should_block_overseas_buy_for_fx_buffer(
market=market,
action=decision.action,
total_cash=total_cash,
order_amount=order_amount,
settings=settings,
)
if fx_blocked:
logger.warning(
"Skip BUY %s (%s): FX buffer guard (remaining=%.2f, required=%.2f, cash=%.2f, order=%.2f)",
stock_code,
market.name,
remaining_cash,
required_buffer,
total_cash,
order_amount,
)
continue
# Check BUY cooldown (insufficient balance) # Check BUY cooldown (insufficient balance)
if decision.action == "BUY": if decision.action == "BUY":

View File

@@ -15,6 +15,7 @@ from src.evolution.scorecard import DailyScorecard
from src.logging.decision_logger import DecisionLogger from src.logging.decision_logger import DecisionLogger
from src.main import ( from src.main import (
KILL_SWITCH, KILL_SWITCH,
_should_block_overseas_buy_for_fx_buffer,
_trigger_emergency_kill_switch, _trigger_emergency_kill_switch,
_apply_dashboard_flag, _apply_dashboard_flag,
_determine_order_quantity, _determine_order_quantity,
@@ -3690,6 +3691,81 @@ class TestOverseasBrokerIntegration:
# DB도 브로커도 보유 없음 → BUY 주문이 실행되어야 함 (회귀 테스트) # DB도 브로커도 보유 없음 → BUY 주문이 실행되어야 함 (회귀 테스트)
overseas_broker.send_overseas_order.assert_called_once() overseas_broker.send_overseas_order.assert_called_once()
@pytest.mark.asyncio
async def test_overseas_buy_blocked_by_usd_buffer_guard(self) -> None:
"""Overseas BUY must be blocked when USD buffer would be breached."""
db_conn = init_db(":memory:")
overseas_broker = MagicMock()
overseas_broker.get_overseas_price = AsyncMock(
return_value={"output": {"last": "182.50"}}
)
overseas_broker.get_overseas_balance = AsyncMock(
return_value={
"output1": [],
"output2": [
{
"frcr_evlu_tota": "50000.00",
"frcr_buy_amt_smtl": "0.00",
}
],
}
)
overseas_broker.get_overseas_buying_power = AsyncMock(
return_value={"output": {"ovrs_ord_psbl_amt": "50000.00"}}
)
overseas_broker.send_overseas_order = AsyncMock(return_value={"msg1": "주문접수"})
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_buy_match("AAPL"))
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NASD"
market.is_domestic = False
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
decision_logger = MagicMock()
decision_logger.log_decision = MagicMock(return_value="decision-id")
settings = MagicMock()
settings.POSITION_SIZING_ENABLED = False
settings.CONFIDENCE_THRESHOLD = 80
settings.USD_BUFFER_MIN = 49900.0
settings.MODE = "paper"
settings.PAPER_OVERSEAS_CASH = 50000.0
await trading_cycle(
broker=MagicMock(),
overseas_broker=overseas_broker,
scenario_engine=engine,
playbook=_make_playbook(market="US"),
risk=MagicMock(),
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(
get_latest_timeframe=MagicMock(return_value=None),
set_context=MagicMock(),
),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=telegram,
market=market,
stock_code="AAPL",
scan_candidates={},
settings=settings,
)
overseas_broker.send_overseas_order.assert_not_called()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# _retry_connection — unit tests (issue #209) # _retry_connection — unit tests (issue #209)
@@ -3723,7 +3799,6 @@ class TestRetryConnection:
with patch("src.main.asyncio.sleep") as mock_sleep: with patch("src.main.asyncio.sleep") as mock_sleep:
mock_sleep.return_value = None mock_sleep.return_value = None
result = await _retry_connection(flaky, label="flaky") result = await _retry_connection(flaky, label="flaky")
assert result == "ok" assert result == "ok"
assert call_count == 2 assert call_count == 2
mock_sleep.assert_called_once() mock_sleep.assert_called_once()
@@ -3778,6 +3853,48 @@ class TestRetryConnection:
assert call_count == 1 # No retry for non-ConnectionError assert call_count == 1 # No retry for non-ConnectionError
def test_fx_buffer_guard_applies_only_to_us_and_respects_boundary() -> None:
settings = MagicMock()
settings.USD_BUFFER_MIN = 1000.0
us_market = MagicMock()
us_market.is_domestic = False
us_market.code = "US_NASDAQ"
blocked, remaining, required = _should_block_overseas_buy_for_fx_buffer(
market=us_market,
action="BUY",
total_cash=5000.0,
order_amount=4001.0,
settings=settings,
)
assert blocked
assert remaining == 999.0
assert required == 1000.0
blocked_eq, _, _ = _should_block_overseas_buy_for_fx_buffer(
market=us_market,
action="BUY",
total_cash=5000.0,
order_amount=4000.0,
settings=settings,
)
assert not blocked_eq
jp_market = MagicMock()
jp_market.is_domestic = False
jp_market.code = "JP"
blocked_jp, _, required_jp = _should_block_overseas_buy_for_fx_buffer(
market=jp_market,
action="BUY",
total_cash=5000.0,
order_amount=4500.0,
settings=settings,
)
assert not blocked_jp
assert required_jp == 0.0
# run_daily_session — daily CB baseline (daily_start_eval) tests (issue #207) # run_daily_session — daily CB baseline (daily_start_eval) tests (issue #207)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@@ -0,0 +1,131 @@
from __future__ import annotations
from src.analysis.triple_barrier import TripleBarrierSpec, label_with_triple_barrier
def test_long_take_profit_first() -> None:
highs = [100, 101, 103]
lows = [100, 99.6, 100]
closes = [100, 100, 102]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == 1
assert out.touched == "take_profit"
assert out.touch_bar == 2
def test_long_stop_loss_first() -> None:
highs = [100, 100.5, 101]
lows = [100, 98.8, 99]
closes = [100, 99.5, 100]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == -1
assert out.touched == "stop_loss"
assert out.touch_bar == 1
def test_time_barrier_timeout() -> None:
highs = [100, 100.8, 100.7]
lows = [100, 99.3, 99.4]
closes = [100, 100, 100]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.02, max_holding_bars=2)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == 0
assert out.touched == "time"
assert out.touch_bar == 2
def test_tie_break_stop_first_default() -> None:
highs = [100, 102.1]
lows = [100, 98.9]
closes = [100, 100]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=1)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=1,
spec=spec,
)
assert out.label == -1
assert out.touched == "stop_loss"
def test_short_side_inverts_barrier_semantics() -> None:
highs = [100, 100.5, 101.2]
lows = [100, 97.8, 98.0]
closes = [100, 99, 99]
spec = TripleBarrierSpec(take_profit_pct=0.02, stop_loss_pct=0.01, max_holding_bars=3)
out = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=-1,
spec=spec,
)
assert out.label == 1
assert out.touched == "take_profit"
def test_short_tie_break_modes() -> None:
highs = [100, 101.1]
lows = [100, 97.9]
closes = [100, 100]
stop_first = TripleBarrierSpec(
take_profit_pct=0.02,
stop_loss_pct=0.01,
max_holding_bars=1,
tie_break="stop_first",
)
out_stop = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=-1,
spec=stop_first,
)
assert out_stop.label == -1
assert out_stop.touched == "stop_loss"
take_first = TripleBarrierSpec(
take_profit_pct=0.02,
stop_loss_pct=0.01,
max_holding_bars=1,
tie_break="take_first",
)
out_take = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=0,
side=-1,
spec=take_first,
)
assert out_take.label == 1
assert out_take.touched == "take_profit"

View File

@@ -0,0 +1,92 @@
from __future__ import annotations
import pytest
from src.analysis.walk_forward_split import generate_walk_forward_splits
def test_generates_sequential_folds() -> None:
folds = generate_walk_forward_splits(
n_samples=30,
train_size=10,
test_size=5,
)
assert len(folds) == 4
assert folds[0].train_indices == list(range(0, 10))
assert folds[0].test_indices == list(range(10, 15))
assert folds[1].train_indices == list(range(5, 15))
assert folds[1].test_indices == list(range(15, 20))
def test_purge_removes_boundary_samples_before_test() -> None:
folds = generate_walk_forward_splits(
n_samples=25,
train_size=8,
test_size=4,
purge_size=2,
)
first = folds[0]
# test starts at 10, purge=2 => train end must be 7
assert first.train_indices == list(range(0, 8))
assert first.test_indices == list(range(10, 14))
def test_embargo_excludes_post_test_samples_from_next_train() -> None:
folds = generate_walk_forward_splits(
n_samples=45,
train_size=15,
test_size=5,
step_size=10,
embargo_size=3,
)
assert len(folds) >= 2
# Fold1 test: 15..19, next fold train window: 10..24.
# embargo_size=3 should remove 20,21,22 from fold2 train.
second_train = folds[1].train_indices
assert 20 not in second_train
assert 21 not in second_train
assert 22 not in second_train
assert 23 in second_train
def test_respects_min_train_size_and_returns_empty_when_impossible() -> None:
folds = generate_walk_forward_splits(
n_samples=15,
train_size=5,
test_size=5,
min_train_size=6,
)
assert folds == []
def test_embargo_uses_last_accepted_fold_when_intermediate_fold_skips() -> None:
folds = generate_walk_forward_splits(
n_samples=30,
train_size=5,
test_size=3,
step_size=5,
embargo_size=1,
min_train_size=5,
)
# 1st fold accepted, 2nd skipped by min_train_size, subsequent folds still generated.
assert len(folds) == 3
assert folds[0].test_indices == [5, 6, 7]
assert folds[1].test_indices == [15, 16, 17]
assert folds[2].test_indices == [25, 26, 27]
@pytest.mark.parametrize(
("n_samples", "train_size", "test_size"),
[
(0, 10, 2),
(10, 0, 2),
(10, 5, 0),
],
)
def test_invalid_args_raise(n_samples: int, train_size: int, test_size: int) -> None:
with pytest.raises(ValueError):
generate_walk_forward_splits(
n_samples=n_samples,
train_size=train_size,
test_size=test_size,
)