Compare commits

...

21 Commits

Author SHA1 Message Date
agentson
42c06929ea test: add session-risk reload edge-case coverage (#327)
Some checks are pending
Gitea CI / test (push) Waiting to run
Gitea CI / test (pull_request) Waiting to run
2026-02-28 22:20:59 +09:00
agentson
5facd22ef9 feat: reload session risk profile on session transitions (#327)
Some checks failed
Gitea CI / test (pull_request) Waiting to run
Gitea CI / test (push) Has been cancelled
2026-02-28 21:04:06 +09:00
3af62ce598 Merge pull request 'feat: v2 staged exit에 실제 피처(ATR, pred_down_prob) 공급 (#325)' (#343) from feature/issue-325-staged-exit-real-features into feature/v3-session-policy-stream
Some checks are pending
Gitea CI / test (push) Waiting to run
Reviewed-on: #343
2026-02-28 20:59:38 +09:00
agentson
62cd8a81a4 feat: feed staged-exit with ATR/RSI runtime features (#325)
Some checks are pending
Gitea CI / test (push) Waiting to run
Gitea CI / test (pull_request) Waiting to run
2026-02-28 20:58:23 +09:00
dd8549b912 Merge pull request 'feat: KR ATR-based dynamic hard-stop threshold (#318)' (#342) from feature/issue-318-kr-atr-dynamic-stoploss into feature/v3-session-policy-stream
Some checks failed
Gitea CI / test (push) Has been cancelled
Reviewed-on: #342
2026-02-28 20:56:18 +09:00
agentson
8bba85da1e feat: add KR ATR-based dynamic hard-stop threshold (#318)
Some checks are pending
Gitea CI / test (push) Waiting to run
Gitea CI / test (pull_request) Waiting to run
2026-02-28 18:30:52 +09:00
fc6083bd2a Merge pull request 'feat: stop-loss reentry cooldown guard (#319)' (#341) from feature/issue-319-stoploss-reentry-cooldown into feature/v3-session-policy-stream
Some checks failed
Gitea CI / test (push) Has been cancelled
Reviewed-on: #341
2026-02-28 18:27:12 +09:00
agentson
5f53b02da8 test: add stop-loss reentry cooldown behavioral coverage (#319)
Some checks are pending
Gitea CI / test (pull_request) Waiting to run
Gitea CI / test (push) Waiting to run
2026-02-28 18:24:28 +09:00
agentson
82808a8493 feat: enforce stop-loss reentry cooldown window (#319) 2026-02-28 18:24:28 +09:00
9456d66de4 Merge pull request 'feat: US minimum price entry filter (#320)' (#340) from feature/issue-320-us-min-price-filter into feature/v3-session-policy-stream
Some checks failed
Gitea CI / test (push) Has been cancelled
Reviewed-on: #340
2026-02-28 18:22:28 +09:00
33b97f21ac Merge pull request 'fix: log blackout recovery executions to DB (#324)' (#339) from feature/issue-324-blackout-recovery-trade-log into feature/v3-session-policy-stream
Some checks failed
Gitea CI / test (push) Has been cancelled
Reviewed-on: #339
2026-02-28 18:22:11 +09:00
3b135c3080 Merge pull request 'fix: SELL outcome PnL uses sell quantity (#322)' (#337) from feature/issue-322-sell-pnl-sell-qty into feature/v3-session-policy-stream
Some checks failed
Gitea CI / test (push) Has been cancelled
Reviewed-on: #337
2026-02-28 18:21:34 +09:00
1b0d5568d3 Merge pull request 'infra: governance registry sync gate in CI (#330)' (#335) from feature/issue-330-governance-ci-guard into feature/v3-session-policy-stream
Some checks failed
Gitea CI / test (push) Has been cancelled
Reviewed-on: #335
2026-02-28 18:21:10 +09:00
agentson
2406a80782 test: add governance validator unit coverage (#330)
Some checks are pending
Gitea CI / test (push) Waiting to run
Gitea CI / test (pull_request) Waiting to run
2026-02-28 17:40:51 +09:00
b8569d9de1 Merge pull request 'fix: exchange-aware latest BUY matching (#323)' (#338) from feature/issue-323-buy-match-exchange-code into feature/v3-session-policy-stream
Some checks failed
Gitea CI / test (push) Has been cancelled
Reviewed-on: #338
2026-02-28 17:37:43 +09:00
agentson
9267f1fb77 test: add US minimum price boundary and KR-scope coverage (#320)
Some checks are pending
Gitea CI / test (push) Waiting to run
Gitea CI / test (pull_request) Waiting to run
2026-02-28 17:15:10 +09:00
agentson
fd0246769a test: add sell qty fallback guard and quantity-basis coverage (#322)
Some checks are pending
Gitea CI / test (push) Waiting to run
Gitea CI / test (pull_request) Waiting to run
2026-02-28 17:13:56 +09:00
agentson
08607eaa56 feat: block US BUY entries below minimum price threshold (#320)
Some checks failed
Gitea CI / test (pull_request) Waiting to run
Gitea CI / test (push) Has been cancelled
2026-02-28 14:40:19 +09:00
agentson
5c107d2435 fix: persist blackout recovery executions to trades log (#324)
Some checks are pending
Gitea CI / test (push) Waiting to run
Gitea CI / test (pull_request) Waiting to run
2026-02-28 14:39:30 +09:00
agentson
6d7e6557d2 fix: compute SELL decision outcome using sell quantity (#322)
Some checks failed
Gitea CI / test (pull_request) Waiting to run
Gitea CI / test (push) Has been cancelled
2026-02-28 14:38:10 +09:00
agentson
2e394cd17c infra: enforce governance registry sync checks in CI (#330)
Some checks failed
Gitea CI / test (pull_request) Waiting to run
Gitea CI / test (push) Has been cancelled
2026-02-28 14:36:05 +09:00
6 changed files with 1181 additions and 12 deletions

View File

@@ -13,6 +13,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Set up Python
uses: actions/setup-python@v5
@@ -26,7 +28,18 @@ jobs:
run: python3 scripts/session_handover_check.py --strict
- name: Validate governance assets
run: python3 scripts/validate_governance_assets.py
run: |
RANGE=""
if [ "${{ github.event_name }}" = "pull_request" ] && [ -n "${{ github.event.pull_request.base.sha }}" ]; then
RANGE="${{ github.event.pull_request.base.sha }}...${{ github.sha }}"
elif [ -n "${{ github.event.before }}" ] && [ "${{ github.event.before }}" != "0000000000000000000000000000000000000000" ]; then
RANGE="${{ github.event.before }}...${{ github.sha }}"
fi
if [ -n "$RANGE" ]; then
python3 scripts/validate_governance_assets.py "$RANGE"
else
python3 scripts/validate_governance_assets.py
fi
- name: Validate Ouroboros docs
run: python3 scripts/validate_ouroboros_docs.py

View File

@@ -3,9 +3,12 @@
from __future__ import annotations
import subprocess
import sys
from pathlib import Path
REQUIREMENTS_REGISTRY = "docs/ouroboros/01_requirements_registry.md"
def must_contain(path: Path, required: list[str], errors: list[str]) -> None:
if not path.exists():
@@ -17,8 +20,64 @@ def must_contain(path: Path, required: list[str], errors: list[str]) -> None:
errors.append(f"{path}: missing required token -> {token}")
def normalize_changed_path(path: str) -> str:
normalized = path.strip().replace("\\", "/")
if normalized.startswith("./"):
normalized = normalized[2:]
return normalized
def is_policy_file(path: str) -> bool:
normalized = normalize_changed_path(path)
if not normalized.endswith(".md"):
return False
if not normalized.startswith("docs/ouroboros/"):
return False
return normalized != REQUIREMENTS_REGISTRY
def load_changed_files(args: list[str], errors: list[str]) -> list[str]:
if not args:
return []
# Single range input (e.g. BASE..HEAD or BASE...HEAD)
if len(args) == 1 and ".." in args[0]:
range_spec = args[0]
try:
completed = subprocess.run(
["git", "diff", "--name-only", range_spec],
check=True,
capture_output=True,
text=True,
)
except (subprocess.CalledProcessError, FileNotFoundError) as exc:
errors.append(f"failed to load changed files from range '{range_spec}': {exc}")
return []
return [
normalize_changed_path(line)
for line in completed.stdout.splitlines()
if line.strip()
]
return [normalize_changed_path(path) for path in args if path.strip()]
def validate_registry_sync(changed_files: list[str], errors: list[str]) -> None:
if not changed_files:
return
changed_set = set(changed_files)
policy_changed = any(is_policy_file(path) for path in changed_set)
registry_changed = REQUIREMENTS_REGISTRY in changed_set
if policy_changed and not registry_changed:
errors.append(
"policy file changed without updating docs/ouroboros/01_requirements_registry.md"
)
def main() -> int:
errors: list[str] = []
changed_files = load_changed_files(sys.argv[1:], errors)
pr_template = Path(".gitea/PULL_REQUEST_TEMPLATE.md")
issue_template = Path(".gitea/ISSUE_TEMPLATE/runtime_verification.md")
@@ -81,6 +140,8 @@ def main() -> int:
if not handover_script.exists():
errors.append(f"missing file: {handover_script}")
validate_registry_sync(changed_files, errors)
if errors:
print("[FAIL] governance asset validation failed")
for err in errors:

View File

@@ -60,7 +60,16 @@ class Settings(BaseSettings):
# This value is used as a fallback when the balance API returns 0 in paper mode.
PAPER_OVERSEAS_CASH: float = Field(default=50000.0, ge=0.0)
USD_BUFFER_MIN: float = Field(default=1000.0, ge=0.0)
US_MIN_PRICE: float = Field(default=5.0, ge=0.0)
STAGED_EXIT_BE_ARM_PCT: float = Field(default=1.2, gt=0.0, le=30.0)
STAGED_EXIT_ARM_PCT: float = Field(default=3.0, gt=0.0, le=100.0)
STOPLOSS_REENTRY_COOLDOWN_MINUTES: int = Field(default=120, ge=1, le=1440)
KR_ATR_STOP_MULTIPLIER_K: float = Field(default=2.0, ge=0.1, le=10.0)
KR_ATR_STOP_MIN_PCT: float = Field(default=-2.0, le=0.0)
KR_ATR_STOP_MAX_PCT: float = Field(default=-7.0, le=0.0)
OVERNIGHT_EXCEPTION_ENABLED: bool = True
SESSION_RISK_RELOAD_ENABLED: bool = True
SESSION_RISK_PROFILES_JSON: str = "{}"
# Trading frequency mode (daily = batch API calls, realtime = per-stock calls)
TRADE_MODE: str = Field(default="daily", pattern="^(daily|realtime)$")

View File

@@ -70,6 +70,12 @@ BLACKOUT_ORDER_MANAGER = BlackoutOrderManager(
_SESSION_CLOSE_WINDOWS = {"NXT_AFTER", "US_AFTER"}
_RUNTIME_EXIT_STATES: dict[str, PositionState] = {}
_RUNTIME_EXIT_PEAKS: dict[str, float] = {}
_STOPLOSS_REENTRY_COOLDOWN_UNTIL: dict[str, float] = {}
_VOLATILITY_ANALYZER = VolatilityAnalyzer()
_SESSION_RISK_PROFILES_RAW = "{}"
_SESSION_RISK_PROFILES_MAP: dict[str, dict[str, Any]] = {}
_SESSION_RISK_LAST_BY_MARKET: dict[str, str] = {}
_SESSION_RISK_OVERRIDES_BY_MARKET: dict[str, dict[str, Any]] = {}
def safe_float(value: str | float | None, default: float = 0.0) -> float:
@@ -110,6 +116,258 @@ DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day
TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions
def _resolve_sell_qty_for_pnl(*, sell_qty: int | None, buy_qty: int | None) -> int:
"""Choose quantity basis for SELL outcome PnL with safe fallback."""
resolved_sell = int(sell_qty or 0)
if resolved_sell > 0:
return resolved_sell
return max(0, int(buy_qty or 0))
def _compute_kr_dynamic_stop_loss_pct(
*,
market: MarketInfo | None = None,
entry_price: float,
atr_value: float,
fallback_stop_loss_pct: float,
settings: Settings | None,
) -> float:
"""Compute KR dynamic hard-stop threshold in percent."""
if entry_price <= 0 or atr_value <= 0:
return fallback_stop_loss_pct
k = _resolve_market_setting(
market=market,
settings=settings,
key="KR_ATR_STOP_MULTIPLIER_K",
default=2.0,
)
min_pct = _resolve_market_setting(
market=market,
settings=settings,
key="KR_ATR_STOP_MIN_PCT",
default=-2.0,
)
max_pct = _resolve_market_setting(
market=market,
settings=settings,
key="KR_ATR_STOP_MAX_PCT",
default=-7.0,
)
if max_pct > min_pct:
min_pct, max_pct = max_pct, min_pct
dynamic_stop_pct = -((k * atr_value) / entry_price) * 100.0
return max(max_pct, min(min_pct, dynamic_stop_pct))
def _stoploss_cooldown_key(*, market: MarketInfo, stock_code: str) -> str:
return f"{market.code}:{stock_code}"
def _parse_session_risk_profiles(settings: Settings | None) -> dict[str, dict[str, Any]]:
if settings is None:
return {}
global _SESSION_RISK_PROFILES_RAW, _SESSION_RISK_PROFILES_MAP
raw = str(getattr(settings, "SESSION_RISK_PROFILES_JSON", "{}") or "{}")
if raw == _SESSION_RISK_PROFILES_RAW:
return _SESSION_RISK_PROFILES_MAP
parsed_map: dict[str, dict[str, Any]] = {}
try:
decoded = json.loads(raw)
if isinstance(decoded, dict):
for session_id, session_values in decoded.items():
if isinstance(session_id, str) and isinstance(session_values, dict):
parsed_map[session_id] = session_values
except (ValueError, TypeError) as exc:
logger.warning("Invalid SESSION_RISK_PROFILES_JSON; using defaults: %s", exc)
parsed_map = {}
_SESSION_RISK_PROFILES_RAW = raw
_SESSION_RISK_PROFILES_MAP = parsed_map
return _SESSION_RISK_PROFILES_MAP
def _coerce_setting_value(*, value: Any, default: Any) -> Any:
if isinstance(default, bool):
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.strip().lower() in {"1", "true", "yes", "on"}
if isinstance(value, (int, float)):
return value != 0
return default
if isinstance(default, int) and not isinstance(default, bool):
try:
return int(value)
except (ValueError, TypeError):
return default
if isinstance(default, float):
return safe_float(value, float(default))
if isinstance(default, str):
return str(value)
return value
def _session_risk_overrides(
*,
market: MarketInfo | None,
settings: Settings | None,
) -> dict[str, Any]:
if market is None or settings is None:
return {}
if not bool(getattr(settings, "SESSION_RISK_RELOAD_ENABLED", True)):
return {}
session_id = get_session_info(market).session_id
previous_session = _SESSION_RISK_LAST_BY_MARKET.get(market.code)
if previous_session == session_id:
return _SESSION_RISK_OVERRIDES_BY_MARKET.get(market.code, {})
profile_map = _parse_session_risk_profiles(settings)
merged: dict[str, Any] = {}
default_profile = profile_map.get("default")
if isinstance(default_profile, dict):
merged.update(default_profile)
session_profile = profile_map.get(session_id)
if isinstance(session_profile, dict):
merged.update(session_profile)
_SESSION_RISK_LAST_BY_MARKET[market.code] = session_id
_SESSION_RISK_OVERRIDES_BY_MARKET[market.code] = merged
if previous_session is None:
logger.info(
"Session risk profile initialized for %s: %s (overrides=%s)",
market.code,
session_id,
",".join(sorted(merged.keys())) if merged else "none",
)
else:
logger.info(
"Session risk profile reloaded for %s: %s -> %s (overrides=%s)",
market.code,
previous_session,
session_id,
",".join(sorted(merged.keys())) if merged else "none",
)
return merged
def _resolve_market_setting(
*,
market: MarketInfo | None,
settings: Settings | None,
key: str,
default: Any,
) -> Any:
if settings is None:
return default
fallback = getattr(settings, key, default)
overrides = _session_risk_overrides(market=market, settings=settings)
if key not in overrides:
return fallback
return _coerce_setting_value(value=overrides[key], default=fallback)
def _stoploss_cooldown_minutes(
settings: Settings | None,
market: MarketInfo | None = None,
) -> int:
minutes = _resolve_market_setting(
market=market,
settings=settings,
key="STOPLOSS_REENTRY_COOLDOWN_MINUTES",
default=120,
)
return max(1, int(minutes))
def _estimate_pred_down_prob_from_rsi(rsi: float | str | None) -> float:
"""Estimate downside probability from RSI using a simple linear mapping."""
if rsi is None:
return 0.5
rsi_value = max(0.0, min(100.0, safe_float(rsi, 50.0)))
return rsi_value / 100.0
async def _compute_kr_atr_value(
*,
broker: KISBroker,
stock_code: str,
period: int = 14,
) -> float:
"""Compute ATR(period) for KR stocks using daily OHLC."""
days = max(period + 1, 30)
try:
daily_prices = await _retry_connection(
broker.get_daily_prices,
stock_code,
days=days,
label=f"daily_prices:{stock_code}",
)
except ConnectionError as exc:
logger.warning("ATR source unavailable for %s: %s", stock_code, exc)
return 0.0
except Exception as exc:
logger.warning("Unexpected ATR fetch failure for %s: %s", stock_code, exc)
return 0.0
if not isinstance(daily_prices, list):
return 0.0
highs: list[float] = []
lows: list[float] = []
closes: list[float] = []
for row in daily_prices:
if not isinstance(row, dict):
continue
high = safe_float(row.get("high"), 0.0)
low = safe_float(row.get("low"), 0.0)
close = safe_float(row.get("close"), 0.0)
if high <= 0 or low <= 0 or close <= 0:
continue
highs.append(high)
lows.append(low)
closes.append(close)
if len(highs) < period + 1 or len(lows) < period + 1 or len(closes) < period + 1:
return 0.0
return max(0.0, _VOLATILITY_ANALYZER.calculate_atr(highs, lows, closes, period=period))
async def _inject_staged_exit_features(
*,
market: MarketInfo,
stock_code: str,
open_position: dict[str, Any] | None,
market_data: dict[str, Any],
broker: KISBroker | None,
) -> None:
"""Inject ATR/pred_down_prob used by staged exit evaluation."""
if not open_position:
return
if "pred_down_prob" not in market_data:
market_data["pred_down_prob"] = _estimate_pred_down_prob_from_rsi(
market_data.get("rsi")
)
existing_atr = safe_float(market_data.get("atr_value"), 0.0)
if existing_atr > 0:
return
if market.is_domestic and broker is not None:
market_data["atr_value"] = await _compute_kr_atr_value(
broker=broker,
stock_code=stock_code,
)
return
market_data["atr_value"] = 0.0
async def _retry_connection(coro_factory: Any, *args: Any, label: str = "", **kwargs: Any) -> Any:
"""Call an async function retrying on ConnectionError with exponential backoff.
@@ -453,7 +711,14 @@ def _should_block_overseas_buy_for_fx_buffer(
):
return False, total_cash - order_amount, 0.0
remaining = total_cash - order_amount
required = settings.USD_BUFFER_MIN
required = float(
_resolve_market_setting(
market=market,
settings=settings,
key="USD_BUFFER_MIN",
default=1000.0,
)
)
return remaining < required, remaining, required
@@ -469,7 +734,13 @@ def _should_force_exit_for_overnight(
return True
if settings is None:
return False
return not settings.OVERNIGHT_EXCEPTION_ENABLED
overnight_enabled = _resolve_market_setting(
market=market,
settings=settings,
key="OVERNIGHT_EXCEPTION_ENABLED",
default=True,
)
return not bool(overnight_enabled)
def _build_runtime_position_key(
@@ -499,6 +770,7 @@ def _apply_staged_exit_override_for_hold(
open_position: dict[str, Any] | None,
market_data: dict[str, Any],
stock_playbook: Any | None,
settings: Settings | None = None,
) -> TradeDecision:
"""Apply v2 staged exit semantics for HOLD positions using runtime state."""
if decision.action != "HOLD" or not open_position:
@@ -514,6 +786,41 @@ def _apply_staged_exit_override_for_hold(
if stock_playbook and stock_playbook.scenarios:
stop_loss_threshold = stock_playbook.scenarios[0].stop_loss_pct
take_profit_threshold = stock_playbook.scenarios[0].take_profit_pct
atr_value = safe_float(market_data.get("atr_value"), 0.0)
if market.code == "KR":
stop_loss_threshold = _compute_kr_dynamic_stop_loss_pct(
market=market,
entry_price=entry_price,
atr_value=atr_value,
fallback_stop_loss_pct=stop_loss_threshold,
settings=settings,
)
if settings is None:
be_arm_pct = max(0.5, take_profit_threshold * 0.4)
arm_pct = take_profit_threshold
else:
be_arm_pct = max(
0.1,
float(
_resolve_market_setting(
market=market,
settings=settings,
key="STAGED_EXIT_BE_ARM_PCT",
default=1.2,
)
),
)
arm_pct = max(
be_arm_pct,
float(
_resolve_market_setting(
market=market,
settings=settings,
key="STAGED_EXIT_ARM_PCT",
default=3.0,
)
),
)
runtime_key = _build_runtime_position_key(
market_code=market.code,
@@ -532,14 +839,14 @@ def _apply_staged_exit_override_for_hold(
current_state=current_state,
config=ExitRuleConfig(
hard_stop_pct=stop_loss_threshold,
be_arm_pct=max(0.5, take_profit_threshold * 0.4),
arm_pct=take_profit_threshold,
be_arm_pct=be_arm_pct,
arm_pct=arm_pct,
),
inp=ExitRuleInput(
current_price=current_price,
entry_price=entry_price,
peak_price=peak_price,
atr_value=safe_float(market_data.get("atr_value"), 0.0),
atr_value=atr_value,
pred_down_prob=safe_float(market_data.get("pred_down_prob"), 0.0),
liquidity_weak=safe_float(market_data.get("volume_ratio"), 1.0) < 1.0,
),
@@ -559,7 +866,7 @@ def _apply_staged_exit_override_for_hold(
elif exit_eval.reason == "arm_take_profit":
rationale = (
f"Take-profit triggered ({pnl_pct:.2f}% >= "
f"{take_profit_threshold:.2f}%)"
f"{arm_pct:.2f}%)"
)
elif exit_eval.reason == "atr_trailing_stop":
rationale = "ATR trailing-stop triggered"
@@ -751,6 +1058,20 @@ async def process_blackout_recovery_orders(
accepted = result.get("rt_cd", "0") == "0"
if accepted:
runtime_session_id = get_session_info(market).session_id
log_trade(
conn=db_conn,
stock_code=intent.stock_code,
action=intent.order_type,
confidence=0,
rationale=f"[blackout-recovery] {intent.source}",
quantity=intent.quantity,
price=float(intent.price),
pnl=0.0,
market=market.code,
exchange_code=market.exchange_code,
session_id=runtime_session_id,
)
logger.info(
"Recovered queued order executed: %s %s (%s) qty=%d price=%.4f source=%s",
intent.order_type,
@@ -991,6 +1312,7 @@ async def trading_cycle(
) -> None:
"""Execute one trading cycle for a single stock."""
cycle_start_time = asyncio.get_event_loop().time()
_session_risk_overrides(market=market, settings=settings)
# 1. Fetch market data
price_output: dict[str, Any] = {} # Populated for overseas markets; used for fallback metrics
@@ -1240,7 +1562,14 @@ async def trading_cycle(
# 2.1. Apply market_outlook-based BUY confidence threshold
if decision.action == "BUY":
base_threshold = (settings.CONFIDENCE_THRESHOLD if settings else 80)
base_threshold = int(
_resolve_market_setting(
market=market,
settings=settings,
key="CONFIDENCE_THRESHOLD",
default=80,
)
)
outlook = playbook.market_outlook
if outlook == MarketOutlook.BEARISH:
min_confidence = 90
@@ -1292,6 +1621,48 @@ async def trading_cycle(
stock_code,
market.name,
)
elif market.code.startswith("US"):
min_price = float(
_resolve_market_setting(
market=market,
settings=settings,
key="US_MIN_PRICE",
default=5.0,
)
)
if current_price <= min_price:
decision = TradeDecision(
action="HOLD",
confidence=decision.confidence,
rationale=(
f"US minimum price filter blocked BUY "
f"(price={current_price:.4f} <= {min_price:.4f})"
),
)
logger.info(
"BUY suppressed for %s (%s): US min price filter %.4f <= %.4f",
stock_code,
market.name,
current_price,
min_price,
)
if decision.action == "BUY":
cooldown_key = _stoploss_cooldown_key(market=market, stock_code=stock_code)
now_epoch = datetime.now(UTC).timestamp()
cooldown_until = _STOPLOSS_REENTRY_COOLDOWN_UNTIL.get(cooldown_key, 0.0)
if now_epoch < cooldown_until:
remaining = int(cooldown_until - now_epoch)
decision = TradeDecision(
action="HOLD",
confidence=decision.confidence,
rationale=f"Stop-loss reentry cooldown active ({remaining}s remaining)",
)
logger.info(
"BUY suppressed for %s (%s): stop-loss cooldown active (%ds remaining)",
stock_code,
market.name,
remaining,
)
if decision.action == "HOLD":
open_position = get_open_position(db_conn, stock_code, market.code)
@@ -1300,6 +1671,13 @@ async def trading_cycle(
market_code=market.code,
stock_code=stock_code,
)
await _inject_staged_exit_features(
market=market,
stock_code=stock_code,
open_position=open_position,
market_data=market_data,
broker=broker,
)
decision = _apply_staged_exit_override_for_hold(
decision=decision,
market=market,
@@ -1307,6 +1685,7 @@ async def trading_cycle(
open_position=open_position,
market_data=market_data,
stock_playbook=stock_playbook,
settings=settings,
)
if open_position and decision.action == "HOLD" and _should_force_exit_for_overnight(
market=market,
@@ -1667,13 +2046,26 @@ async def trading_cycle(
)
if buy_trade and buy_trade.get("price") is not None:
buy_price = float(buy_trade["price"])
buy_qty = int(buy_trade.get("quantity") or 1)
trade_pnl = (trade_price - buy_price) * buy_qty
buy_qty = int(buy_trade.get("quantity") or 0)
sell_qty = _resolve_sell_qty_for_pnl(sell_qty=quantity, buy_qty=buy_qty)
trade_pnl = (trade_price - buy_price) * sell_qty
decision_logger.update_outcome(
decision_id=buy_trade["decision_id"],
pnl=trade_pnl,
accuracy=1 if trade_pnl > 0 else 0,
)
if trade_pnl < 0:
cooldown_key = _stoploss_cooldown_key(market=market, stock_code=stock_code)
cooldown_minutes = _stoploss_cooldown_minutes(settings, market=market)
_STOPLOSS_REENTRY_COOLDOWN_UNTIL[cooldown_key] = (
datetime.now(UTC).timestamp() + cooldown_minutes * 60
)
logger.info(
"Stop-loss cooldown set for %s (%s): %d minutes",
stock_code,
market.name,
cooldown_minutes,
)
# 6. Log trade with selection context (skip if order was rejected)
if decision.action in ("BUY", "SELL") and not order_succeeded:
@@ -2116,6 +2508,7 @@ async def run_daily_session(
# Process each open market
for market in open_markets:
_session_risk_overrides(market=market, settings=settings)
await process_blackout_recovery_orders(
broker=broker,
overseas_broker=overseas_broker,
@@ -2452,6 +2845,48 @@ async def run_daily_session(
stock_code,
market.name,
)
elif market.code.startswith("US"):
min_price = float(
_resolve_market_setting(
market=market,
settings=settings,
key="US_MIN_PRICE",
default=5.0,
)
)
if stock_data["current_price"] <= min_price:
decision = TradeDecision(
action="HOLD",
confidence=decision.confidence,
rationale=(
f"US minimum price filter blocked BUY "
f"(price={stock_data['current_price']:.4f} <= {min_price:.4f})"
),
)
logger.info(
"BUY suppressed for %s (%s): US min price filter %.4f <= %.4f",
stock_code,
market.name,
stock_data["current_price"],
min_price,
)
if decision.action == "BUY":
cooldown_key = _stoploss_cooldown_key(market=market, stock_code=stock_code)
now_epoch = datetime.now(UTC).timestamp()
cooldown_until = _STOPLOSS_REENTRY_COOLDOWN_UNTIL.get(cooldown_key, 0.0)
if now_epoch < cooldown_until:
remaining = int(cooldown_until - now_epoch)
decision = TradeDecision(
action="HOLD",
confidence=decision.confidence,
rationale=f"Stop-loss reentry cooldown active ({remaining}s remaining)",
)
logger.info(
"BUY suppressed for %s (%s): stop-loss cooldown active (%ds remaining)",
stock_code,
market.name,
remaining,
)
if decision.action == "HOLD":
daily_open = get_open_position(db_conn, stock_code, market.code)
if not daily_open:
@@ -2459,6 +2894,13 @@ async def run_daily_session(
market_code=market.code,
stock_code=stock_code,
)
await _inject_staged_exit_features(
market=market,
stock_code=stock_code,
open_position=daily_open,
market_data=stock_data,
broker=broker,
)
decision = _apply_staged_exit_override_for_hold(
decision=decision,
market=market,
@@ -2466,6 +2908,7 @@ async def run_daily_session(
open_position=daily_open,
market_data=stock_data,
stock_playbook=stock_playbook,
settings=settings,
)
if daily_open and decision.action == "HOLD" and _should_force_exit_for_overnight(
market=market,
@@ -2772,13 +3215,32 @@ async def run_daily_session(
)
if buy_trade and buy_trade.get("price") is not None:
buy_price = float(buy_trade["price"])
buy_qty = int(buy_trade.get("quantity") or 1)
trade_pnl = (trade_price - buy_price) * buy_qty
buy_qty = int(buy_trade.get("quantity") or 0)
sell_qty = _resolve_sell_qty_for_pnl(
sell_qty=quantity,
buy_qty=buy_qty,
)
trade_pnl = (trade_price - buy_price) * sell_qty
decision_logger.update_outcome(
decision_id=buy_trade["decision_id"],
pnl=trade_pnl,
accuracy=1 if trade_pnl > 0 else 0,
)
if trade_pnl < 0:
cooldown_key = _stoploss_cooldown_key(market=market, stock_code=stock_code)
cooldown_minutes = _stoploss_cooldown_minutes(
settings,
market=market,
)
_STOPLOSS_REENTRY_COOLDOWN_UNTIL[cooldown_key] = (
datetime.now(UTC).timestamp() + cooldown_minutes * 60
)
logger.info(
"Stop-loss cooldown set for %s (%s): %d minutes",
stock_code,
market.name,
cooldown_minutes,
)
# Log trade (skip if order was rejected by API)
if decision.action in ("BUY", "SELL") and not order_succeeded:
@@ -3577,6 +4039,7 @@ async def run(settings: Settings) -> None:
break
session_info = get_session_info(market)
_session_risk_overrides(market=market, settings=settings)
logger.info(
"Market session active: %s (%s) session=%s",
market.code,

View File

@@ -4,6 +4,7 @@ from datetime import UTC, date, datetime
from unittest.mock import ANY, AsyncMock, MagicMock, patch
import pytest
import src.main as main_module
from src.config import Settings
from src.context.layer import ContextLayer
@@ -15,6 +16,14 @@ from src.evolution.scorecard import DailyScorecard
from src.logging.decision_logger import DecisionLogger
from src.main import (
KILL_SWITCH,
_SESSION_RISK_LAST_BY_MARKET,
_SESSION_RISK_OVERRIDES_BY_MARKET,
_SESSION_RISK_PROFILES_MAP,
_STOPLOSS_REENTRY_COOLDOWN_UNTIL,
_apply_staged_exit_override_for_hold,
_compute_kr_atr_value,
_estimate_pred_down_prob_from_rsi,
_inject_staged_exit_features,
_RUNTIME_EXIT_PEAKS,
_RUNTIME_EXIT_STATES,
_should_force_exit_for_overnight,
@@ -27,9 +36,13 @@ from src.main import (
_extract_held_qty_from_balance,
_handle_market_close,
_retry_connection,
_resolve_market_setting,
_resolve_sell_qty_for_pnl,
_run_context_scheduler,
_run_evolution_loop,
_start_dashboard_server,
_stoploss_cooldown_minutes,
_compute_kr_dynamic_stop_loss_pct,
handle_domestic_pending_orders,
handle_overseas_pending_orders,
process_blackout_recovery_orders,
@@ -92,10 +105,20 @@ def _reset_kill_switch_state() -> None:
KILL_SWITCH.clear_block()
_RUNTIME_EXIT_STATES.clear()
_RUNTIME_EXIT_PEAKS.clear()
_SESSION_RISK_LAST_BY_MARKET.clear()
_SESSION_RISK_OVERRIDES_BY_MARKET.clear()
_SESSION_RISK_PROFILES_MAP.clear()
main_module._SESSION_RISK_PROFILES_RAW = "__reset__"
_STOPLOSS_REENTRY_COOLDOWN_UNTIL.clear()
yield
KILL_SWITCH.clear_block()
_RUNTIME_EXIT_STATES.clear()
_RUNTIME_EXIT_PEAKS.clear()
_SESSION_RISK_LAST_BY_MARKET.clear()
_SESSION_RISK_OVERRIDES_BY_MARKET.clear()
_SESSION_RISK_PROFILES_MAP.clear()
main_module._SESSION_RISK_PROFILES_RAW = "__reset__"
_STOPLOSS_REENTRY_COOLDOWN_UNTIL.clear()
class TestExtractAvgPriceFromBalance:
@@ -119,6 +142,266 @@ class TestExtractAvgPriceFromBalance:
result = _extract_avg_price_from_balance(balance, "005930", is_domestic=True)
assert result == 0.0
def test_resolve_sell_qty_for_pnl_prefers_sell_qty() -> None:
assert _resolve_sell_qty_for_pnl(sell_qty=30, buy_qty=100) == 30
def test_resolve_sell_qty_for_pnl_uses_buy_qty_fallback_when_sell_qty_missing() -> None:
assert _resolve_sell_qty_for_pnl(sell_qty=None, buy_qty=12) == 12
def test_resolve_sell_qty_for_pnl_returns_zero_when_both_missing() -> None:
assert _resolve_sell_qty_for_pnl(sell_qty=None, buy_qty=None) == 0
def test_compute_kr_dynamic_stop_loss_pct_falls_back_without_atr() -> None:
out = _compute_kr_dynamic_stop_loss_pct(
entry_price=100.0,
atr_value=0.0,
fallback_stop_loss_pct=-2.0,
settings=None,
)
assert out == -2.0
def test_compute_kr_dynamic_stop_loss_pct_clamps_to_min_and_max() -> None:
# Small ATR -> clamp to min (-2%)
out_small = _compute_kr_dynamic_stop_loss_pct(
entry_price=100.0,
atr_value=0.2,
fallback_stop_loss_pct=-2.0,
settings=None,
)
assert out_small == -2.0
# Large ATR -> clamp to max (-7%)
out_large = _compute_kr_dynamic_stop_loss_pct(
entry_price=100.0,
atr_value=10.0,
fallback_stop_loss_pct=-2.0,
settings=None,
)
assert out_large == -7.0
def test_compute_kr_dynamic_stop_loss_pct_uses_settings_values() -> None:
settings = MagicMock(
KR_ATR_STOP_MULTIPLIER_K=3.0,
KR_ATR_STOP_MIN_PCT=-1.5,
KR_ATR_STOP_MAX_PCT=-6.0,
)
out = _compute_kr_dynamic_stop_loss_pct(
entry_price=100.0,
atr_value=1.0,
fallback_stop_loss_pct=-2.0,
settings=settings,
)
assert out == -3.0
def test_resolve_market_setting_uses_session_profile_override() -> None:
settings = Settings(
KIS_APP_KEY="k",
KIS_APP_SECRET="s",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="g",
SESSION_RISK_PROFILES_JSON='{"US_PRE": {"US_MIN_PRICE": 7.5}}',
)
market = MagicMock()
market.code = "US_NASDAQ"
with patch("src.main.get_session_info", return_value=MagicMock(session_id="US_PRE")):
value = _resolve_market_setting(
market=market,
settings=settings,
key="US_MIN_PRICE",
default=5.0,
)
assert value == pytest.approx(7.5)
def test_stoploss_cooldown_minutes_uses_session_override() -> None:
settings = Settings(
KIS_APP_KEY="k",
KIS_APP_SECRET="s",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="g",
STOPLOSS_REENTRY_COOLDOWN_MINUTES=120,
SESSION_RISK_PROFILES_JSON='{"NXT_AFTER": {"STOPLOSS_REENTRY_COOLDOWN_MINUTES": 45}}',
)
market = MagicMock()
market.code = "KR"
with patch("src.main.get_session_info", return_value=MagicMock(session_id="NXT_AFTER")):
value = _stoploss_cooldown_minutes(settings, market=market)
assert value == 45
def test_resolve_market_setting_ignores_profile_when_reload_disabled() -> None:
settings = Settings(
KIS_APP_KEY="k",
KIS_APP_SECRET="s",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="g",
US_MIN_PRICE=5.0,
SESSION_RISK_RELOAD_ENABLED=False,
SESSION_RISK_PROFILES_JSON='{"US_PRE": {"US_MIN_PRICE": 9.5}}',
)
market = MagicMock()
market.code = "US_NASDAQ"
with patch("src.main.get_session_info", return_value=MagicMock(session_id="US_PRE")):
value = _resolve_market_setting(
market=market,
settings=settings,
key="US_MIN_PRICE",
default=5.0,
)
assert value == pytest.approx(5.0)
def test_resolve_market_setting_falls_back_on_invalid_profile_json() -> None:
settings = Settings(
KIS_APP_KEY="k",
KIS_APP_SECRET="s",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="g",
US_MIN_PRICE=5.0,
SESSION_RISK_PROFILES_JSON="{invalid-json",
)
market = MagicMock()
market.code = "US_NASDAQ"
with patch("src.main.get_session_info", return_value=MagicMock(session_id="US_PRE")):
value = _resolve_market_setting(
market=market,
settings=settings,
key="US_MIN_PRICE",
default=5.0,
)
assert value == pytest.approx(5.0)
def test_resolve_market_setting_coerces_bool_string_override() -> None:
settings = Settings(
KIS_APP_KEY="k",
KIS_APP_SECRET="s",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="g",
OVERNIGHT_EXCEPTION_ENABLED=True,
SESSION_RISK_PROFILES_JSON='{"US_AFTER": {"OVERNIGHT_EXCEPTION_ENABLED": "false"}}',
)
market = MagicMock()
market.code = "US_NASDAQ"
with patch("src.main.get_session_info", return_value=MagicMock(session_id="US_AFTER")):
value = _resolve_market_setting(
market=market,
settings=settings,
key="OVERNIGHT_EXCEPTION_ENABLED",
default=True,
)
assert value is False
def test_estimate_pred_down_prob_from_rsi_uses_linear_mapping() -> None:
assert _estimate_pred_down_prob_from_rsi(None) == 0.5
assert _estimate_pred_down_prob_from_rsi(0.0) == 0.0
assert _estimate_pred_down_prob_from_rsi(50.0) == 0.5
assert _estimate_pred_down_prob_from_rsi(100.0) == 1.0
@pytest.mark.asyncio
async def test_compute_kr_atr_value_returns_zero_on_short_series() -> None:
broker = MagicMock()
broker.get_daily_prices = AsyncMock(
return_value=[{"high": 101.0, "low": 99.0, "close": 100.0}] * 10
)
atr = await _compute_kr_atr_value(broker=broker, stock_code="005930")
assert atr == 0.0
@pytest.mark.asyncio
async def test_inject_staged_exit_features_sets_pred_down_prob_and_atr_for_kr() -> None:
market = MagicMock()
market.is_domestic = True
stock_data: dict[str, float] = {"rsi": 65.0}
broker = MagicMock()
broker.get_daily_prices = AsyncMock(
return_value=[
{"high": 102.0 + i, "low": 98.0 + i, "close": 100.0 + i}
for i in range(40)
]
)
await _inject_staged_exit_features(
market=market,
stock_code="005930",
open_position={"price": 100.0, "quantity": 1},
market_data=stock_data,
broker=broker,
)
assert stock_data["pred_down_prob"] == pytest.approx(0.65)
assert stock_data["atr_value"] > 0.0
def test_apply_staged_exit_uses_independent_arm_threshold_settings() -> None:
market = MagicMock()
market.code = "KR"
market.name = "Korea"
decision = MagicMock()
decision.action = "HOLD"
decision.confidence = 70
decision.rationale = "hold"
settings = Settings(
KIS_APP_KEY="k",
KIS_APP_SECRET="s",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="g",
STAGED_EXIT_BE_ARM_PCT=2.2,
STAGED_EXIT_ARM_PCT=5.4,
)
captured: dict[str, float] = {}
def _fake_eval(**kwargs): # type: ignore[no-untyped-def]
cfg = kwargs["config"]
captured["be_arm_pct"] = cfg.be_arm_pct
captured["arm_pct"] = cfg.arm_pct
class _Out:
should_exit = False
reason = "none"
state = PositionState.HOLDING
return _Out()
with patch("src.main.evaluate_exit", side_effect=_fake_eval):
out = _apply_staged_exit_override_for_hold(
decision=decision,
market=market,
stock_code="005930",
open_position={"price": 100.0, "quantity": 1, "decision_id": "d1", "timestamp": "t1"},
market_data={"current_price": 101.0, "rsi": 60.0, "pred_down_prob": 0.6},
stock_playbook=None,
settings=settings,
)
assert out is decision
assert captured["be_arm_pct"] == pytest.approx(2.2)
assert captured["arm_pct"] == pytest.approx(5.4)
def test_returns_zero_when_field_empty_string(self) -> None:
"""Returns 0.0 when pchs_avg_pric is an empty string."""
balance = {"output1": [{"pdno": "005930", "pchs_avg_pric": ""}]}
@@ -2040,6 +2323,105 @@ async def test_sell_updates_original_buy_decision_outcome() -> None:
assert updated_buy is not None
assert updated_buy.outcome_pnl == 20.0
assert updated_buy.outcome_accuracy == 1
assert "KR:005930" not in _STOPLOSS_REENTRY_COOLDOWN_UNTIL
@pytest.mark.asyncio
async def test_stoploss_reentry_cooldown_blocks_buy_when_active() -> None:
_STOPLOSS_REENTRY_COOLDOWN_UNTIL["KR:005930"] = datetime.now(UTC).timestamp() + 300
db_conn = init_db(":memory:")
broker = MagicMock()
broker.get_current_price = AsyncMock(return_value=(100.0, 0.0, 0.0))
broker.get_balance = AsyncMock(
return_value={
"output1": [],
"output2": [{"tot_evlu_amt": "100000", "dnca_tot_amt": "50000", "pchs_amt_smtl_amt": "50000"}],
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
await trading_cycle(
broker=broker,
overseas_broker=MagicMock(),
scenario_engine=MagicMock(evaluate=MagicMock(return_value=_make_buy_match("005930"))),
playbook=_make_playbook(),
risk=MagicMock(validate_order=MagicMock(), check_circuit_breaker=MagicMock()),
db_conn=db_conn,
decision_logger=DecisionLogger(db_conn),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None), set_context=MagicMock()),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=MagicMock(
notify_trade_execution=AsyncMock(),
notify_fat_finger=AsyncMock(),
notify_circuit_breaker=AsyncMock(),
notify_scenario_matched=AsyncMock(),
),
market=market,
stock_code="005930",
scan_candidates={},
settings=MagicMock(POSITION_SIZING_ENABLED=False, CONFIDENCE_THRESHOLD=80, MODE="paper"),
)
broker.send_order.assert_not_called()
@pytest.mark.asyncio
async def test_stoploss_reentry_cooldown_allows_buy_after_expiry() -> None:
_STOPLOSS_REENTRY_COOLDOWN_UNTIL["KR:005930"] = datetime.now(UTC).timestamp() - 10
db_conn = init_db(":memory:")
broker = MagicMock()
broker.get_current_price = AsyncMock(return_value=(100.0, 0.0, 0.0))
broker.get_balance = AsyncMock(
return_value={
"output1": [],
"output2": [{"tot_evlu_amt": "100000", "dnca_tot_amt": "50000", "pchs_amt_smtl_amt": "50000"}],
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
await trading_cycle(
broker=broker,
overseas_broker=MagicMock(),
scenario_engine=MagicMock(evaluate=MagicMock(return_value=_make_buy_match("005930"))),
playbook=_make_playbook(),
risk=MagicMock(validate_order=MagicMock(), check_circuit_breaker=MagicMock()),
db_conn=db_conn,
decision_logger=DecisionLogger(db_conn),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None), set_context=MagicMock()),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=MagicMock(
notify_trade_execution=AsyncMock(),
notify_fat_finger=AsyncMock(),
notify_circuit_breaker=AsyncMock(),
notify_scenario_matched=AsyncMock(),
),
market=market,
stock_code="005930",
scan_candidates={},
settings=MagicMock(POSITION_SIZING_ENABLED=False, CONFIDENCE_THRESHOLD=80, MODE="paper"),
)
broker.send_order.assert_called_once()
@pytest.mark.asyncio
@@ -2750,6 +3132,9 @@ async def test_sell_order_uses_broker_balance_qty_not_db() -> None:
assert call_kwargs["order_type"] == "SELL"
# Must use broker-confirmed qty (5), NOT DB-recorded ordered qty (10)
assert call_kwargs["quantity"] == 5
updated_buy = decision_logger.get_decision_by_id(buy_decision_id)
assert updated_buy is not None
assert updated_buy.outcome_pnl == -25.0
@pytest.mark.asyncio
@@ -5654,6 +6039,149 @@ async def test_order_policy_rejection_skips_order_execution() -> None:
broker.send_order.assert_not_called()
@pytest.mark.asyncio
@pytest.mark.parametrize(
("price", "should_block"),
[
(4.99, True),
(5.00, True),
(5.01, False),
],
)
async def test_us_min_price_filter_boundary(price: float, should_block: bool) -> None:
db_conn = init_db(":memory:")
decision_logger = DecisionLogger(db_conn)
broker = MagicMock()
broker.get_balance = AsyncMock(return_value={"output1": [], "output2": [{}]})
overseas_broker = MagicMock()
overseas_broker.get_overseas_price = AsyncMock(
return_value={"output": {"last": str(price), "rate": "0.0"}}
)
overseas_broker.get_overseas_balance = AsyncMock(
return_value={"output1": [], "output2": [{"frcr_evlu_tota": "10000", "frcr_buy_amt_smtl": "0"}]}
)
overseas_broker.get_overseas_buying_power = AsyncMock(
return_value={"output": {"ovrs_ord_psbl_amt": "10000"}}
)
overseas_broker.send_overseas_order = AsyncMock(return_value={"rt_cd": "0", "msg1": "OK"})
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NASD"
market.is_domestic = False
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
settings = MagicMock()
settings.POSITION_SIZING_ENABLED = False
settings.CONFIDENCE_THRESHOLD = 80
settings.MODE = "paper"
settings.PAPER_OVERSEAS_CASH = 50000
settings.US_MIN_PRICE = 5.0
settings.USD_BUFFER_MIN = 1000.0
await trading_cycle(
broker=broker,
overseas_broker=overseas_broker,
scenario_engine=MagicMock(evaluate=MagicMock(return_value=_make_buy_match("AAPL"))),
playbook=_make_playbook("US_NASDAQ"),
risk=MagicMock(validate_order=MagicMock(), check_circuit_breaker=MagicMock()),
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(
get_latest_timeframe=MagicMock(return_value=None),
set_context=MagicMock(),
),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=telegram,
market=market,
stock_code="AAPL",
scan_candidates={},
settings=settings,
)
if should_block:
overseas_broker.send_overseas_order.assert_not_called()
else:
overseas_broker.send_overseas_order.assert_called_once()
@pytest.mark.asyncio
async def test_us_min_price_filter_not_applied_to_kr_market() -> None:
db_conn = init_db(":memory:")
decision_logger = DecisionLogger(db_conn)
broker = MagicMock()
broker.get_current_price = AsyncMock(return_value=(4.0, 0.0, 0.0))
broker.get_balance = AsyncMock(
return_value={
"output1": [],
"output2": [
{
"tot_evlu_amt": "100000",
"dnca_tot_amt": "50000",
"pchs_amt_smtl_amt": "50000",
}
],
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
settings = MagicMock()
settings.POSITION_SIZING_ENABLED = False
settings.CONFIDENCE_THRESHOLD = 80
settings.MODE = "paper"
settings.US_MIN_PRICE = 5.0
settings.USD_BUFFER_MIN = 1000.0
await trading_cycle(
broker=broker,
overseas_broker=MagicMock(),
scenario_engine=MagicMock(evaluate=MagicMock(return_value=_make_buy_match("005930"))),
playbook=_make_playbook(),
risk=MagicMock(validate_order=MagicMock(), check_circuit_breaker=MagicMock()),
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(
get_latest_timeframe=MagicMock(return_value=None),
set_context=MagicMock(),
),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=telegram,
market=market,
stock_code="005930",
scan_candidates={},
settings=settings,
)
broker.send_order.assert_called_once()
def test_overnight_policy_prioritizes_killswitch_over_exception() -> None:
market = MagicMock()
with patch("src.main.get_session_info", return_value=MagicMock(session_id="US_AFTER")):
@@ -5837,6 +6365,7 @@ async def test_process_blackout_recovery_executes_valid_intents() -> None:
patch("src.main.MARKETS", {"KR": market}),
patch("src.main.get_open_position", return_value=None),
patch("src.main.validate_order_policy"),
patch("src.main.get_session_info", return_value=MagicMock(session_id="KRX_REG")),
):
await process_blackout_recovery_orders(
broker=broker,
@@ -5845,6 +6374,19 @@ async def test_process_blackout_recovery_executes_valid_intents() -> None:
)
broker.send_order.assert_called_once()
row = db_conn.execute(
"""
SELECT action, quantity, session_id, rationale
FROM trades
WHERE stock_code = '005930'
ORDER BY id DESC LIMIT 1
"""
).fetchone()
assert row is not None
assert row[0] == "BUY"
assert row[1] == 1
assert row[2] == "KRX_REG"
assert row[3].startswith("[blackout-recovery]")
@pytest.mark.asyncio

View File

@@ -0,0 +1,81 @@
from __future__ import annotations
import importlib.util
from pathlib import Path
from types import SimpleNamespace
def _load_module():
script_path = Path(__file__).resolve().parents[1] / "scripts" / "validate_governance_assets.py"
spec = importlib.util.spec_from_file_location("validate_governance_assets", script_path)
assert spec is not None
assert spec.loader is not None
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def test_is_policy_file_detects_ouroboros_policy_docs() -> None:
module = _load_module()
assert module.is_policy_file("docs/ouroboros/85_loss_recovery_action_plan.md")
assert not module.is_policy_file("docs/ouroboros/01_requirements_registry.md")
assert not module.is_policy_file("docs/workflow.md")
assert not module.is_policy_file("docs/ouroboros/notes.txt")
def test_validate_registry_sync_requires_registry_update_when_policy_changes() -> None:
module = _load_module()
errors: list[str] = []
module.validate_registry_sync(
["docs/ouroboros/85_loss_recovery_action_plan.md"],
errors,
)
assert errors
assert "policy file changed without updating" in errors[0]
def test_validate_registry_sync_passes_when_registry_included() -> None:
module = _load_module()
errors: list[str] = []
module.validate_registry_sync(
[
"docs/ouroboros/85_loss_recovery_action_plan.md",
"docs/ouroboros/01_requirements_registry.md",
],
errors,
)
assert errors == []
def test_load_changed_files_supports_explicit_paths() -> None:
module = _load_module()
errors: list[str] = []
changed = module.load_changed_files(
["./docs/ouroboros/85_loss_recovery_action_plan.md", " src/main.py "],
errors,
)
assert errors == []
assert changed == [
"docs/ouroboros/85_loss_recovery_action_plan.md",
"src/main.py",
]
def test_load_changed_files_with_range_uses_git_diff(monkeypatch) -> None:
module = _load_module()
errors: list[str] = []
def fake_run(cmd, check, capture_output, text): # noqa: ANN001
assert cmd[:3] == ["git", "diff", "--name-only"]
assert check is True
assert capture_output is True
assert text is True
return SimpleNamespace(stdout="docs/ouroboros/85_loss_recovery_action_plan.md\nsrc/main.py\n")
monkeypatch.setattr(module.subprocess, "run", fake_run)
changed = module.load_changed_files(["abc...def"], errors)
assert errors == []
assert changed == [
"docs/ouroboros/85_loss_recovery_action_plan.md",
"src/main.py",
]