Compare commits

..

18 Commits

Author SHA1 Message Date
0542e78f90 Merge pull request 'process: automate backtest gate for PR/push/schedule (#314)' (#315) from feature/issue-314-backtest-gate-automation into feature/v3-session-policy-stream
Some checks are pending
Gitea CI / test (push) Waiting to run
2026-02-28 03:25:45 +09:00
agentson
8396dc1606 process: automate backtest gate for PR/push/schedule (#314)
Some checks are pending
Gitea CI / test (push) Waiting to run
Gitea CI / test (pull_request) Waiting to run
2026-02-28 03:25:00 +09:00
343631a935 Merge pull request 'feat: integrate v2 backtest validation pipeline (#305)' (#313) from feature/issue-305-backtest-pipeline-integration into feature/v3-session-policy-stream
Some checks failed
Gitea CI / test (push) Has been cancelled
2026-02-27 23:59:34 +09:00
agentson
c00525eb4d feat: integrate v2 backtest pipeline for triple barrier and walk-forward (#305)
Some checks are pending
Gitea CI / test (push) Waiting to run
Gitea CI / test (pull_request) Waiting to run
2026-02-27 23:58:52 +09:00
1ae12f92f6 Merge pull request 'fix: runtime staged exit semantics in trading_cycle and run_daily_session (#304)' (#312) from feature/issue-304-runtime-staged-exit-semantics into feature/v3-session-policy-stream
Some checks failed
Gitea CI / test (push) Has been cancelled
2026-02-27 23:49:59 +09:00
agentson
98dab2e06e fix: apply staged exit semantics in runtime paths (#304)
Some checks are pending
Gitea CI / test (push) Waiting to run
Gitea CI / test (pull_request) Waiting to run
2026-02-27 23:48:52 +09:00
a63d23fab9 Merge pull request 'process: harden implementation-start gate before coding (#310)' (#311) from feature/issue-310-implementation-start-gate into feature/v3-session-policy-stream
Some checks failed
Gitea CI / test (push) Has been cancelled
2026-02-27 23:24:40 +09:00
agentson
85a59542f8 process: harden implementation-start gate before coding
Some checks are pending
Gitea CI / test (push) Waiting to run
Gitea CI / test (pull_request) Waiting to run
2026-02-27 23:21:54 +09:00
5830791355 Merge pull request 'process: enforce session handover gate across sessions (#308)' (#309) from feature/issue-308-session-handover-gate into feature/v3-session-policy-stream
Some checks failed
Gitea CI / test (push) Has been cancelled
2026-02-27 23:09:04 +09:00
agentson
b1610f14c5 process: enforce session handover gate across sessions (#308)
Some checks are pending
Gitea CI / test (push) Waiting to run
Gitea CI / test (pull_request) Waiting to run
2026-02-27 23:08:29 +09:00
1984065499 Merge pull request 'process: enforce process-change-first and staged acceptance gates (#306)' (#307) from feature/issue-306-process-change-first into feature/v3-session-policy-stream 2026-02-27 22:46:33 +09:00
agentson
d912471d0e process: enforce process-change-first and staged ticket maturity (#306) 2026-02-27 22:46:18 +09:00
5f337e2ebc Merge pull request 'fix: realtime include extended KR/US sessions (#301)' (#303) from feature/issue-301-extended-session-schedule into feature/v3-session-policy-stream 2026-02-27 22:30:26 +09:00
agentson
4a404875a9 fix: include extended KR/US sessions in realtime market scheduling (#301) 2026-02-27 22:30:13 +09:00
cdd3814781 Merge pull request 'governance: enforce runtime NOT_OBSERVED recovery gates (#301)' (#302) from feature/issue-301-runtime-verify-recovery into feature/v3-session-policy-stream 2026-02-27 22:14:03 +09:00
agentson
dbf57b5068 governance: enforce runtime verification coverage gates (#301) 2026-02-27 22:13:11 +09:00
7efc254ab5 Merge pull request '[RISK-EMERGENCY] TKT-P1-008 오버나잇 예외 vs Kill Switch 우선순위' (#300) from feature/issue-tkt-p1-008-overnight-killswitch-priority into feature/v3-session-policy-stream 2026-02-27 08:57:25 +09:00
d60fd8947b Merge pull request '[EXEC-POLICY] TKT-P1-007 session_id 로그 원장 강제' (#298) from feature/issue-tkt-p1-007-session-id-ledger into feature/v3-session-policy-stream 2026-02-27 08:51:27 +09:00
23 changed files with 1878 additions and 82 deletions

View File

@@ -0,0 +1,41 @@
---
name: Runtime Verification Incident
about: 실운영/스테이징 동작 검증 중 발견된 이상 징후 등록
title: "[RUNTIME-VERIFY][SCN-XXX] "
labels: runtime, verification
---
## Summary
- 현상:
- 최초 관측 시각(UTC):
## Reproduction / Observation
- 실행 모드(`live`/`paper`):
- 세션(`NXT`, `US_PRE`, `US_DAY`, `US_AFTER`, ...):
- 실행 커맨드:
- 로그 경로:
## Expected vs Actual
- Expected:
- Actual:
## Requirement Mapping
- REQ:
- TASK:
- TEST:
## Temporary Mitigation
- 즉시 완화책:
## Close Criteria
- [ ] Dev 수정 반영
- [ ] Verifier 재검증 PASS
- [ ] Runtime Verifier 재관측 PASS
- [ ] `NOT_OBSERVED = 0`

View File

@@ -0,0 +1,53 @@
## Linked Issue
- Closes #N
## Scope
- REQ: `REQ-...`
- TASK: `TASK-...`
- TEST: `TEST-...`
## Ticket Stage
- Current stage: `Implemented` / `Integrated` / `Observed` / `Accepted`
- Previous stage evidence link:
## Main -> Verifier Directive Contract
- Scope: 대상 요구사항/코드/로그 경로
- Method: 실행 커맨드 + 관측 포인트
- PASS criteria:
- FAIL criteria:
- NOT_OBSERVED criteria:
- Evidence format: PR 코멘트 `Coverage Matrix`
## Verifier Coverage Matrix (Required)
| Item | Evidence | Status (PASS/FAIL/NOT_OBSERVED) |
|---|---|---|
| REQ-... | 링크/로그 | PASS |
`NOT_OBSERVED`가 1개라도 있으면 승인/머지 금지.
## Gitea Preflight
- [ ] `docs/commands.md``docs/workflow.md` 트러블슈팅 선확인
- [ ] `tea` 사용 (`gh` 미사용)
## Session Handover Gate
- [ ] `python3 scripts/session_handover_check.py --strict` 통과
- [ ] `workflow/session-handover.md` 최신 엔트리가 현재 브랜치/당일(UTC) 기준으로 갱신됨
- 최신 handover 엔트리 heading:
## Runtime Evidence
- 시스템 실제 구동 커맨드:
- 모니터링 로그 경로:
- 이상 징후/이슈 링크:
## Approval Gate
- [ ] Static Verifier approval comment linked
- [ ] Runtime Verifier approval comment linked

38
.gitea/workflows/ci.yml Normal file
View File

@@ -0,0 +1,38 @@
name: Gitea CI
on:
pull_request:
push:
branches:
- main
- feature/**
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install dependencies
run: pip install ".[dev]"
- name: Session handover gate
run: python3 scripts/session_handover_check.py --strict
- name: Validate governance assets
run: python3 scripts/validate_governance_assets.py
- name: Validate Ouroboros docs
run: python3 scripts/validate_ouroboros_docs.py
- name: Lint
run: ruff check src/ tests/
- name: Run tests with coverage
run: pytest -v --cov=src --cov-report=term-missing --cov-fail-under=80

66
.github/workflows/backtest-gate.yml vendored Normal file
View File

@@ -0,0 +1,66 @@
name: Backtest Gate
on:
pull_request:
branches: ["**"]
push:
branches:
- "feature/**"
schedule:
# Daily scheduled gate (KST 01:20)
- cron: "20 16 * * *"
workflow_dispatch:
inputs:
mode:
description: "backtest mode (auto|smoke|full)"
required: false
default: "auto"
base_ref:
description: "git base ref for changed-file diff"
required: false
default: "origin/main"
jobs:
backtest-gate:
runs-on: ubuntu-latest
concurrency:
group: backtest-gate-${{ github.ref }}
cancel-in-progress: true
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Set up Python 3.11
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install dependencies
run: pip install ".[dev]"
- name: Resolve base ref
id: base
run: |
if [ "${{ github.event_name }}" = "pull_request" ]; then
echo "ref=origin/${{ github.base_ref }}" >> "$GITHUB_OUTPUT"
elif [ "${{ github.event_name }}" = "workflow_dispatch" ] && [ -n "${{ github.event.inputs.base_ref }}" ]; then
echo "ref=${{ github.event.inputs.base_ref }}" >> "$GITHUB_OUTPUT"
else
echo "ref=origin/main" >> "$GITHUB_OUTPUT"
fi
- name: Run backtest gate
env:
BASE_REF: ${{ steps.base.outputs.ref }}
BACKTEST_MODE: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.mode || 'auto' }}
FORCE_FULL_BACKTEST: ${{ github.event_name == 'schedule' && 'true' || 'false' }}
run: bash scripts/backtest_gate.sh
- name: Upload backtest logs
if: always()
uses: actions/upload-artifact@v4
with:
name: backtest-gate-logs
path: data/backtest-gate/*.log

View File

@@ -21,6 +21,15 @@ jobs:
- name: Install dependencies
run: pip install ".[dev]"
- name: Session handover gate
run: python3 scripts/session_handover_check.py --strict
- name: Validate governance assets
run: python3 scripts/validate_governance_assets.py
- name: Validate Ouroboros docs
run: python3 scripts/validate_ouroboros_docs.py
- name: Lint
run: ruff check src/ tests/

View File

@@ -12,6 +12,8 @@ It is distinct from `docs/requirements-log.md`, which records **project/product
1. **Workflow enforcement**
- Follow `docs/workflow.md` for all changes.
- Before any Gitea issue/PR/comment operation, read `docs/commands.md` and `docs/workflow.md` troubleshooting section.
- Use `tea` for Gitea operations; do not use GitHub CLI (`gh`) in this repository workflow.
- Create a Gitea issue before any code or documentation change.
- Work on a feature branch `feature/issue-{N}-{short-description}` and open a PR.
- Never commit directly to `main`.
@@ -30,6 +32,16 @@ It is distinct from `docs/requirements-log.md`, which records **project/product
(or in a dedicated policy doc) and reference it when working.
- Keep entries short and concrete, with dates.
5. **Session start handover gate**
- Before implementation/verification work, run `python3 scripts/session_handover_check.py --strict`.
- Keep `workflow/session-handover.md` updated with a same-day entry for the active branch.
- If the check fails, stop and fix handover artifacts first.
6. **Process-change-first execution gate**
- If process/governance change is required, merge the process ticket to the feature branch first.
- Do not start code/test edits for implementation tickets until process merge evidence is confirmed.
- Subagents must be constrained to read-only exploration until the process gate is satisfied.
## Change Control
- Changes to this file follow the same workflow as code changes.
@@ -43,3 +55,15 @@ It is distinct from `docs/requirements-log.md`, which records **project/product
- When work requires guidance, consult the relevant `docs/` policies first.
- Any code change must be accompanied by relevant documentation updates.
- Persist user constraints across sessions by recording them in this document.
### 2026-02-27
- All agents must pre-read `docs/commands.md` and `docs/workflow.md` troubleshooting before running Gitea issue/PR/comment commands.
- `gh` CLI is prohibited for repository ticket/PR operations; use `tea` (or documented Gitea API fallback only).
- Session start must pass `python3 scripts/session_handover_check.py --strict`, with branch-matched entry in `workflow/session-handover.md`.
### 2026-02-27
- Apply process-change-first as an execution gate: process ticket must be merged before implementation ticket coding.
- Handover entry must record concrete `next_ticket` and `process_gate_checked`; placeholders are not allowed in strict gate.
- Before process merge confirmation, all subagent tasks must remain read-only (analysis only).

View File

@@ -4,6 +4,23 @@
**Critical: Learn from failures. Never repeat the same failed command without modification.**
## Repository VCS Rule (Mandatory)
- 이 저장소의 티켓/PR/코멘트 작업은 Gitea 기준으로 수행한다.
- `gh`(GitHub CLI) 명령 사용은 금지한다.
- 기본 도구는 `tea`이며, `tea` 미지원 케이스만 Gitea API를 fallback으로 사용한다.
- 실행 전 `docs/workflow.md``Gitea CLI Formatting Troubleshooting`을 반드시 확인한다.
## Session Handover Preflight (Mandatory)
- 세션 시작 직후(코드 변경 전) 아래 명령을 먼저 실행한다.
```bash
python3 scripts/session_handover_check.py --strict
```
- 실패 시 `workflow/session-handover.md` 최신 엔트리를 보강한 뒤 재실행한다.
### tea CLI (Gitea Command Line Tool)
#### ❌ TTY Error - Interactive Confirmation Fails
@@ -140,6 +157,15 @@ python -m src.main --mode=paper
# Run with dashboard enabled
python -m src.main --mode=paper --dashboard
# Runtime verification monitor (NOT_OBSERVED detection)
bash scripts/runtime_verify_monitor.sh
# Session handover gate (must pass before implementation)
python3 scripts/session_handover_check.py --strict
# Follow runtime verification log
tail -f data/overnight/runtime_verify_*.log
# Docker
docker compose up -d ouroboros # Run agent
docker compose --profile test up test # Run tests in container

View File

@@ -34,6 +34,12 @@ Main Agent 아이디에이션 책임:
- DCP-03 구현 착수: Phase 2 종료 전 Main Agent 승인 필수
- DCP-04 배포 승인: Phase 4 종료 후 Main Agent 최종 승인 필수
Main/Verifier 사고 재발 방지 규칙:
- Main Agent는 검증 위임 시 `Directive Contract`를 충족하지 않으면 검증 착수 금지
- Verifier Agent는 지시 누락/모호성 발견 시 즉시 `BLOCKED`를 선언하고 보완 요청
- Verifier Agent는 `미관측(NOT_OBSERVED)` 항목을 PASS로 보고할 수 없다
- Runtime 검증에서 요구 세션 증적이 없으면 "정상"이 아니라 `미검증 이상`으로 이슈화한다
## Phase Control Gates
### Phase 0: Scenario Intake and Scope Lock
@@ -112,7 +118,10 @@ Exit criteria:
Control checks:
- Verifier가 테스트 증적(로그/리포트/실행 커맨드) 첨부
- Verifier가 `Coverage Matrix`(`REQ/TASK/TEST` x `PASS/FAIL/NOT_OBSERVED`) 첨부
- `NOT_OBSERVED` 항목 수가 0인지 확인(0이 아니면 Gate 실패)
- Runtime Verifier가 스테이징/실운영 모니터링 계획 승인
- 정적 Verifier 승인 + Runtime Verifier 승인 2개 모두 확인
- 산출물: 수용 승인 레코드
### Phase 5: Release and Post-Release Control
@@ -150,6 +159,17 @@ TPM 티켓 운영 규칙:
- PR 본문에는 TPM이 지정한 우선순위와 범위가 그대로 반영되어야 한다.
- 우선순위 변경은 TPM 제안 + Main Agent 승인으로만 가능하다.
- PM/TPM/Dev/Reviewer/Verifier/Runtime Verifier는 주요 의사결정 시점마다 PR 코멘트를 남겨 결정 근거를 추적 가능 상태로 유지한다.
- PM/TPM/Dev/Reviewer/Verifier/Runtime Verifier는 이슈/PR/코멘트 조작 전에 `docs/commands.md``docs/workflow.md`의 Gitea 트러블슈팅 섹션을 선참조해야 한다.
- 저장소 협업에서 GitHub CLI(`gh`) 사용은 금지하며, Gitea 작업은 `tea`(필요 시 문서화된 API fallback)만 허용한다.
- 재발 방지/운영 규칙 변경이 합의되면, 기능 구현 이전에 process 티켓을 먼저 생성/머지해야 한다.
- process 티켓 미반영 상태에서 구현 티켓 진행 시 TPM이 즉시 `BLOCKED` 처리한다.
티켓 성숙도 단계 (Mandatory):
- `Implemented`: 코드/문서 변경 완료
- `Integrated`: 호출 경로/파이프라인 연결 확인
- `Observed`: 런타임/실행 증적 확보
- `Accepted`: Verifier + Runtime Verifier 승인 완료
- 단계는 순차 전진만 허용되며, 단계 점프는 허용되지 않는다.
브랜치 운영 규칙:
- TPM은 각 티켓에 대해 `ticket temp branch -> program feature branch` PR 경로를 지정한다.
@@ -168,6 +188,8 @@ TPM 티켓 운영 규칙:
- 시스템 실제 구동(스테이징/로컬 실운영 모드) 실행
- 모니터링 체크리스트(핵심 경보/주문 경로/예외 로그) 수행
- 결과를 티켓/PR 코멘트에 증적으로 첨부하지 않으면 완료로 간주하지 않음
- 세션별 필수 관측 포인트(`NXT`, `US_PRE`, `US_DAY`, `US_AFTER` 등) 중 미관측 항목은 `NOT_OBSERVED`로 기록
- `NOT_OBSERVED` 존재 시 승인 금지 + Runtime 이슈 발행
## Server Reflection Rule

View File

@@ -3,7 +3,7 @@ Doc-ID: DOC-OPS-002
Version: 1.0.0
Status: active
Owner: tpm
Updated: 2026-02-26
Updated: 2026-02-27
-->
# 저장소 강제 설정 체크리스트
@@ -48,6 +48,8 @@ Updated: 2026-02-26
병합 전 체크리스트:
- 이슈 연결(`Closes #N`) 존재
- PR 본문에 `REQ-*`, `TASK-*`, `TEST-*` 매핑 표 존재
- Main -> Verifier Directive Contract(범위/방법/합격/실패/미관측/증적 형식) 기재
- process-change-first 대상이면 process 티켓 PR이 선머지됨
- `src/core/risk_manager.py` 변경 없음
- 주요 의사결정 체크포인트(DCP-01~04) 중 해당 단계 Main Agent 확인 기록 존재
- 주요 의사결정(리뷰 지적/수정 합의/검증 승인)에 대한 에이전트 PR 코멘트 존재
@@ -56,7 +58,14 @@ Updated: 2026-02-26
자동 점검:
- 문서 검증 스크립트 통과
- 테스트 통과
- `python3 scripts/session_handover_check.py --strict` 통과
- 개발 완료 시 시스템 구동/모니터링 증적 코멘트 존재
- 이슈/PR 조작 전에 `docs/commands.md``docs/workflow.md` 트러블슈팅 확인 코멘트 존재
- `gh` CLI 미사용, `tea` 사용 증적 존재
- Verifier `Coverage Matrix` 첨부(PASS/FAIL/NOT_OBSERVED)
- `NOT_OBSERVED` 항목 0 확인(0이 아니면 머지 금지)
- 티켓 단계 기록(`Implemented` -> `Integrated` -> `Observed` -> `Accepted`) 존재
- 정적 Verifier 승인 + Runtime Verifier 승인 2개 확인
## 5) 감사 추적

View File

@@ -355,3 +355,36 @@ Order result: 모의투자 매수주문이 완료 되었습니다. ✓
- `TestOverseasGhostPositionClose` 2개: ghost-close 로그 확인, 일반 오류 무시
**이슈/PR:** #235, PR #236
---
## 2026-02-27
### v2 백테스트 파이프라인 통합 (#305)
**배경:**
- `TripleBarrier`, `WalkForward`, `BacktestCostGuard`는 개별 모듈로 존재했으나,
하나의 실행 경로로 연결된 파이프라인이 없어 통합 검증이 불가능했다.
**구현 내용:**
1. `src/analysis/backtest_pipeline.py`
- `run_v2_backtest_pipeline()` 추가:
- `validate_backtest_cost_model()` 선검증(fail-fast)
- `label_with_triple_barrier()`로 entry 라벨 생성
- `generate_walk_forward_splits()`로 fold 생성
- fold별 baseline(`B0`, `B1`, `M1`) score 산출
- 결과 아티팩트 계약 구조(`BacktestPipelineResult`) 정의
- leakage 검사 유틸 `fold_has_leakage()` 제공
2. `tests/test_backtest_pipeline_integration.py` 신규
- happy path 통합 검증
- cost guard 실패 fail-fast 검증
- purge/embargo 기반 누수 방지 검증
- 동일 입력 재실행 결정성 검증
**검증:**
- `pytest -q tests/test_backtest_pipeline_integration.py tests/test_triple_barrier.py tests/test_walk_forward_split.py tests/test_backtest_cost_guard.py tests/test_backtest_execution_model.py`
- `ruff check src/analysis/backtest_pipeline.py tests/test_backtest_pipeline_integration.py`
**이슈/PR:** #305

View File

@@ -181,6 +181,29 @@ pytest -v --cov=src --cov-report=term-missing
**Note:** `main.py` has lower coverage as it contains the main loop which is tested via integration/manual testing.
## Backtest Automation Gate
백테스트 관련 검증은 `scripts/backtest_gate.sh``.github/workflows/backtest-gate.yml`로 자동 실행된다.
- PR: 변경 파일 기준 `auto` 모드
- `feature/**` push: 변경 파일 기준 `auto` 모드
- Daily schedule: `full` 강제 실행
- Manual dispatch: `mode`(`auto|smoke|full`) 지정 가능
실행 기준:
- `src/analysis/`, `src/strategy/`, `src/strategies/`, `src/main.py`, `src/markets/`, `src/broker/`
- 백테스트 핵심 테스트 파일 변경
- `docs/ouroboros/` 변경
`auto` 모드에서 백테스트 민감 영역 변경이 없으면 게이트는 `skip` 처리되며 실패로 간주하지 않는다.
로컬 수동 실행:
```bash
bash scripts/backtest_gate.sh
BACKTEST_MODE=full bash scripts/backtest_gate.sh
BASE_REF=origin/feature/v3-session-policy-stream BACKTEST_MODE=auto bash scripts/backtest_gate.sh
```
## Test Configuration
### `pyproject.toml`

View File

@@ -16,6 +16,33 @@
**Never commit directly to `main`.** This policy applies to all changes, no exceptions.
## Agent Gitea Preflight (Mandatory)
Gitea 이슈/PR/코멘트 작업 전에 모든 에이전트는 아래를 먼저 확인해야 한다.
1. `docs/commands.md``tea CLI` 실패 사례/해결 패턴 확인
2. 본 문서의 `Gitea CLI Formatting Troubleshooting` 확인
3. 명령 실행 전 `gh`(GitHub CLI) 사용 금지 확인
강제 규칙:
- 이 저장소 협업 명령은 `tea`를 기본으로 사용한다.
- `gh issue`, `gh pr` 등 GitHub CLI 명령은 사용 금지다.
- `tea` 실패 시 동일 명령 재시도 전에 원인/수정사항을 PR 코멘트에 남긴다.
- 필요한 경우에만 Gitea API(`localhost:3000`)를 fallback으로 사용한다.
## Session Handover Gate (Mandatory)
새 세션에서 구현/검증을 시작하기 전에 아래를 선행해야 한다.
1. `docs/workflow.md`, `docs/commands.md`, `docs/agent-constraints.md` 재확인
2. `workflow/session-handover.md`에 최신 세션 엔트리 추가
3. `python3 scripts/session_handover_check.py --strict` 통과 확인
강제 규칙:
- handover check 실패 상태에서 코드 수정/이슈 상태 전이/PR 생성 금지
- 최신 handover 엔트리는 현재 작업 브랜치를 명시해야 한다
- 최신 handover 엔트리는 당일(UTC) 날짜를 포함해야 한다
## Branch Strategy (Mandatory)
- Team operation default branch is the **program feature branch**, not `main`.
@@ -24,6 +51,21 @@
- Until final user sign-off, `main` merge is prohibited.
- 각 에이전트는 주요 의사결정(리뷰 지적, 수정 방향, 검증 승인)마다 PR 코멘트를 적극 작성해 의사결정 과정을 남긴다.
## Backtest Gate Policy (Mandatory)
사람 의존도를 줄이기 위해 백테스트 검증은 자동 게이트를 기본으로 한다.
- 워크플로우: `.github/workflows/backtest-gate.yml`
- 실행 스크립트: `scripts/backtest_gate.sh`
- 기본 모드: `auto` (변경 파일 기반 실행/skip 판정)
- 정기 스케줄: daily `full` 강제 실행
- 수동 재실행: workflow dispatch + `mode` 지정
강제 규칙:
- 백테스트 민감 변경(PR/feature push)에서 게이트 실패 시 머지 금지
- 스케줄 게이트 실패 시 이슈 등록 후 원인/복구 계획 기록
- `python` 대신 `python3` 기준으로 실행한다
## Gitea CLI Formatting Troubleshooting
Issue/PR 본문 작성 시 줄바꿈(`\n`)이 문자열 그대로 저장되는 문제가 반복될 수 있다. 원인은 `-d "...\n..."` 형태에서 쉘/CLI가 이스케이프를 실제 개행으로 해석하지 않기 때문이다.
@@ -137,6 +179,57 @@ task_tool(
Use `run_in_background=True` for independent tasks that don't block subsequent work.
### Main -> Verifier Directive Contract (Mandatory)
메인 에이전트가 검증 에이전트에 작업을 위임할 때, 아래 6개를 누락하면 지시가 무효다.
1. 검증 대상 범위: `REQ-*`, `TASK-*`, 코드/로그 경로
2. 검증 방법: 실행 커맨드와 관측 포인트(예: 세션별 로그 키워드)
3. 합격 기준: PASS 조건을 수치/문구로 명시
4. 실패 기준: FAIL 조건을 수치/문구로 명시
5. 미관측 기준: `NOT_OBSERVED` 조건과 즉시 에스컬레이션 규칙
6. 증적 형식: PR 코멘트에 `Coverage Matrix` 표로 제출
`NOT_OBSERVED` 처리 규칙:
- 요구사항 항목이 관측되지 않았으면 PASS로 간주 금지
- `NOT_OBSERVED`는 운영상 `FAIL`과 동일하게 처리
- `NOT_OBSERVED`가 하나라도 있으면 승인/머지 금지
### Process-Change-First Rule (Mandatory)
재발 방지/운영 규칙 변경이 결정되면, 기능 구현 티켓보다 먼저 서버(feature branch)에 반영해야 한다.
- 순서: `process ticket merge` -> `implementation ticket start`
- process ticket 미반영 상태에서 기능 티켓 코딩/머지 금지
- 세션 전환 시에도 동일 규칙 유지
### Implementation Start Gate (Mandatory)
구현 티켓을 시작하기 전에 아래 3개를 모두 만족해야 한다.
1. `process ticket merge` 증적 확인 (feature branch 반영 커밋/PR)
2. `workflow/session-handover.md` 최신 엔트리에 `next_ticket``process_gate_checked` 기록
3. `python3 scripts/session_handover_check.py --strict` 통과
강제 규칙:
- 위 3개 중 하나라도 불충족이면 코드/테스트 수정 금지
- 서브에이전트 지시도 동일하게 제한한다 (`process merged 확인 전 read-only 탐색만 허용`)
- 성급 착수 발견 시 구현 작업을 즉시 중단하고 handover/proces gate부터 복구한다
### Ticket Maturity Stages (Mandatory)
모든 티켓은 아래 4단계를 순서대로 통과해야 한다.
1. `Implemented`: 코드/문서 변경 완료
2. `Integrated`: 호출 경로/파이프라인 연결 완료
3. `Observed`: 런타임/실행 증적 확보 완료
4. `Accepted`: 정적 Verifier + Runtime Verifier 승인 완료
강제 규칙:
- 단계 점프 금지 (예: Implemented -> Accepted 금지)
- `Observed` 전에는 완료 선언 금지
- `Accepted` 전에는 머지 금지
## Code Review Checklist
**CRITICAL: Every PR review MUST verify plan-implementation consistency.**
@@ -170,3 +263,10 @@ Before approving any PR, the reviewer (human or agent) must check ALL of the fol
- [ ] PR references the Gitea issue number
- [ ] Feature branch follows naming convention (`feature/issue-N-description`)
- [ ] Commit messages are clear and descriptive
- [ ] 이슈/PR 작업 전에 `docs/commands.md`와 본 문서 트러블슈팅 섹션을 확인했다
- [ ] `gh` 명령을 사용하지 않고 `tea`(또는 허용된 Gitea API fallback)만 사용했다
- [ ] Main -> Verifier 지시가 Directive Contract 6개 항목을 모두 포함한다
- [ ] Verifier 결과에 `Coverage Matrix`(PASS/FAIL/NOT_OBSERVED)가 있고, `NOT_OBSERVED=0`이다
- [ ] Process-change-first 대상이면 해당 process PR이 먼저 머지되었다
- [ ] 티켓 단계가 `Implemented -> Integrated -> Observed -> Accepted` 순서로 기록되었다
- [ ] 정적 Verifier와 Runtime Verifier 승인 코멘트가 모두 존재한다

106
scripts/backtest_gate.sh Executable file
View File

@@ -0,0 +1,106 @@
#!/usr/bin/env bash
# Backtest gate for PR/push/scheduled verification.
set -euo pipefail
MODE="${BACKTEST_MODE:-auto}" # auto | smoke | full
BASE_REF="${BASE_REF:-origin/main}" # used when MODE=auto
FORCE_FULL="${FORCE_FULL_BACKTEST:-false}"
LOG_DIR="${LOG_DIR:-data/backtest-gate}"
mkdir -p "$LOG_DIR"
STAMP="$(date -u +%Y%m%d_%H%M%S)"
LOG_FILE="$LOG_DIR/backtest_gate_${STAMP}.log"
log() {
printf '%s %s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$1" | tee -a "$LOG_FILE"
}
run_cmd() {
log "[RUN] $*"
"$@" 2>&1 | tee -a "$LOG_FILE"
}
resolve_mode_from_changes() {
if [ "$FORCE_FULL" = "true" ]; then
echo "full"
return
fi
if ! git rev-parse --verify "$BASE_REF" >/dev/null 2>&1; then
log "[WARN] BASE_REF not found: $BASE_REF; fallback to full"
echo "full"
return
fi
changed_files="$(git diff --name-only "$BASE_REF"...HEAD || true)"
if [ -z "$changed_files" ]; then
log "[INFO] no changed files between $BASE_REF...HEAD; skip backtest gate"
echo "skip"
return
fi
log "[INFO] changed files from $BASE_REF...HEAD:"
while IFS= read -r line; do
[ -n "$line" ] && log " - $line"
done <<< "$changed_files"
# Backtest-sensitive areas: analysis/strategy/runtime execution semantics.
if printf '%s\n' "$changed_files" | rg -q \
'^(src/analysis/|src/strategy/|src/strategies/|src/main.py|src/markets/|src/broker/|tests/test_backtest_|tests/test_triple_barrier.py|tests/test_walk_forward_split.py|tests/test_main.py|docs/ouroboros/)'
then
echo "full"
else
echo "skip"
fi
}
SMOKE_TESTS=(
tests/test_backtest_pipeline_integration.py
tests/test_triple_barrier.py
tests/test_walk_forward_split.py
tests/test_backtest_cost_guard.py
tests/test_backtest_execution_model.py
)
FULL_TESTS=(
tests/test_backtest_pipeline_integration.py
tests/test_triple_barrier.py
tests/test_walk_forward_split.py
tests/test_backtest_cost_guard.py
tests/test_backtest_execution_model.py
tests/test_main.py
)
main() {
log "[INFO] backtest gate started mode=$MODE base_ref=$BASE_REF force_full=$FORCE_FULL"
selected_mode="$MODE"
if [ "$MODE" = "auto" ]; then
selected_mode="$(resolve_mode_from_changes)"
fi
case "$selected_mode" in
skip)
log "[PASS] backtest gate skipped (no backtest-sensitive changes)"
exit 0
;;
smoke)
run_cmd python3 -m pytest -q "${SMOKE_TESTS[@]}"
log "[PASS] smoke backtest gate passed"
;;
full)
run_cmd python3 -m pytest -q "${SMOKE_TESTS[@]}"
# Runtime semantics tied to v2 staged-exit must remain covered in full gate.
run_cmd python3 -m pytest -q tests/test_main.py -k \
"staged_exit_override or runtime_exit_cache_cleared or run_daily_session_applies_staged_exit_override_on_hold"
log "[PASS] full backtest gate passed"
;;
*)
log "[FAIL] invalid BACKTEST_MODE=$selected_mode (expected auto|smoke|full)"
exit 2
;;
esac
}
main "$@"

View File

@@ -0,0 +1,78 @@
#!/usr/bin/env bash
# Runtime verification monitor with NOT_OBSERVED detection.
set -euo pipefail
ROOT_DIR="${ROOT_DIR:-/home/agentson/repos/The-Ouroboros}"
LOG_DIR="${LOG_DIR:-$ROOT_DIR/data/overnight}"
INTERVAL_SEC="${INTERVAL_SEC:-60}"
MAX_HOURS="${MAX_HOURS:-24}"
cd "$ROOT_DIR"
OUT_LOG="$LOG_DIR/runtime_verify_$(date +%Y%m%d_%H%M%S).log"
END_TS=$(( $(date +%s) + MAX_HOURS*3600 ))
log() {
printf '%s %s\n' "$(date -u +%Y-%m-%dT%H:%M:%SZ)" "$1" | tee -a "$OUT_LOG" >/dev/null
}
check_signal() {
local name="$1"
local pattern="$2"
local run_log="$3"
if rg -q "$pattern" "$run_log"; then
log "[COVERAGE] ${name}=PASS pattern=${pattern}"
return 0
fi
log "[COVERAGE] ${name}=NOT_OBSERVED pattern=${pattern}"
return 1
}
log "[INFO] runtime verify monitor started interval=${INTERVAL_SEC}s max_hours=${MAX_HOURS}"
while true; do
now=$(date +%s)
if [ "$now" -ge "$END_TS" ]; then
log "[INFO] monitor completed (time window reached)"
exit 0
fi
latest_run="$(ls -t "$LOG_DIR"/run_*.log 2>/dev/null | head -n1 || true)"
if [ -z "$latest_run" ]; then
log "[ANOMALY] no run log found"
sleep "$INTERVAL_SEC"
continue
fi
# Basic liveness hints.
app_pid="$(cat "$LOG_DIR/app.pid" 2>/dev/null || true)"
wd_pid="$(cat "$LOG_DIR/watchdog.pid" 2>/dev/null || true)"
app_alive=0
wd_alive=0
port_alive=0
[ -n "$app_pid" ] && kill -0 "$app_pid" 2>/dev/null && app_alive=1
[ -n "$wd_pid" ] && kill -0 "$wd_pid" 2>/dev/null && wd_alive=1
ss -ltnp 2>/dev/null | rg -q ':8080' && port_alive=1
log "[HEARTBEAT] run_log=$latest_run app_alive=$app_alive watchdog_alive=$wd_alive port8080=$port_alive"
# Coverage matrix rows (session paths and policy gate evidence).
not_observed=0
check_signal "LIVE_MODE" "Mode: live" "$latest_run" || not_observed=$((not_observed+1))
check_signal "KR_LOOP" "Processing market: Korea Exchange" "$latest_run" || not_observed=$((not_observed+1))
check_signal "NXT_PATH" "NXT_PRE|NXT_AFTER|session=NXT_" "$latest_run" || not_observed=$((not_observed+1))
check_signal "US_PRE_PATH" "US_PRE|session=US_PRE" "$latest_run" || not_observed=$((not_observed+1))
check_signal "US_DAY_PATH" "US_DAY|session=US_DAY|Processing market: .*NASDAQ|Processing market: .*NYSE|Processing market: .*AMEX" "$latest_run" || not_observed=$((not_observed+1))
check_signal "US_AFTER_PATH" "US_AFTER|session=US_AFTER" "$latest_run" || not_observed=$((not_observed+1))
check_signal "ORDER_POLICY_SESSION" "Order policy rejected .*\\[session=" "$latest_run" || not_observed=$((not_observed+1))
if [ "$not_observed" -gt 0 ]; then
log "[ANOMALY] coverage_not_observed=$not_observed (treat as FAIL)"
else
log "[OK] coverage complete (NOT_OBSERVED=0)"
fi
sleep "$INTERVAL_SEC"
done

146
scripts/session_handover_check.py Executable file
View File

@@ -0,0 +1,146 @@
#!/usr/bin/env python3
"""Session handover preflight gate.
This script enforces a minimal handover record per working branch so that
new sessions cannot start implementation without reading the required docs
and recording current intent.
"""
from __future__ import annotations
import argparse
import subprocess
import sys
from datetime import UTC, datetime
from pathlib import Path
REQUIRED_DOCS = (
Path("docs/workflow.md"),
Path("docs/commands.md"),
Path("docs/agent-constraints.md"),
)
HANDOVER_LOG = Path("workflow/session-handover.md")
def _run_git(*args: str) -> str:
try:
return (
subprocess.check_output(["git", *args], stderr=subprocess.DEVNULL)
.decode("utf-8")
.strip()
)
except Exception:
return ""
def _current_branch() -> str:
branch = _run_git("branch", "--show-current")
if branch:
return branch
return _run_git("rev-parse", "--abbrev-ref", "HEAD")
def _latest_entry(text: str) -> str:
chunks = text.split("\n### ")
if not chunks:
return ""
if chunks[0].startswith("### "):
chunks[0] = chunks[0][4:]
latest = chunks[-1].strip()
if not latest:
return ""
if not latest.startswith("### "):
latest = f"### {latest}"
return latest
def _check_required_files(errors: list[str]) -> None:
for path in REQUIRED_DOCS:
if not path.exists():
errors.append(f"missing required document: {path}")
if not HANDOVER_LOG.exists():
errors.append(f"missing handover log: {HANDOVER_LOG}")
def _check_handover_entry(
*,
branch: str,
strict: bool,
errors: list[str],
) -> None:
if not HANDOVER_LOG.exists():
return
text = HANDOVER_LOG.read_text(encoding="utf-8")
latest = _latest_entry(text)
if not latest:
errors.append("handover log has no session entry")
return
required_tokens = (
"- branch:",
"- docs_checked:",
"- open_issues_reviewed:",
"- next_ticket:",
"- process_gate_checked:",
)
for token in required_tokens:
if token not in latest:
errors.append(f"latest handover entry missing token: {token}")
if strict:
today_utc = datetime.now(UTC).date().isoformat()
if today_utc not in latest:
errors.append(
f"latest handover entry must contain today's UTC date ({today_utc})"
)
branch_token = f"- branch: {branch}"
if branch_token not in latest:
errors.append(
"latest handover entry must target current branch "
f"({branch_token})"
)
if "- next_ticket: #TBD" in latest:
errors.append("latest handover entry must not use placeholder next_ticket (#TBD)")
if "merged_to_feature_branch=no" in latest:
errors.append(
"process gate indicates not merged; implementation must stay blocked "
"(merged_to_feature_branch=no)"
)
def main() -> int:
parser = argparse.ArgumentParser(
description="Validate session handover gate requirements."
)
parser.add_argument(
"--strict",
action="store_true",
help="Enforce today-date and current-branch match on latest handover entry.",
)
args = parser.parse_args()
errors: list[str] = []
_check_required_files(errors)
branch = _current_branch()
if not branch:
errors.append("cannot resolve current git branch")
elif branch in {"main", "master"}:
errors.append(f"working branch must not be {branch}")
_check_handover_entry(branch=branch, strict=args.strict, errors=errors)
if errors:
print("[FAIL] session handover check failed")
for err in errors:
print(f"- {err}")
return 1
print("[OK] session handover check passed")
print(f"[OK] branch={branch}")
print(f"[OK] handover_log={HANDOVER_LOG}")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""Validate persistent governance assets for agent workflow safety."""
from __future__ import annotations
import sys
from pathlib import Path
def must_contain(path: Path, required: list[str], errors: list[str]) -> None:
if not path.exists():
errors.append(f"missing file: {path}")
return
text = path.read_text(encoding="utf-8")
for token in required:
if token not in text:
errors.append(f"{path}: missing required token -> {token}")
def main() -> int:
errors: list[str] = []
pr_template = Path(".gitea/PULL_REQUEST_TEMPLATE.md")
issue_template = Path(".gitea/ISSUE_TEMPLATE/runtime_verification.md")
workflow_doc = Path("docs/workflow.md")
commands_doc = Path("docs/commands.md")
handover_script = Path("scripts/session_handover_check.py")
handover_log = Path("workflow/session-handover.md")
must_contain(
pr_template,
[
"Closes #N",
"Main -> Verifier Directive Contract",
"Coverage Matrix",
"NOT_OBSERVED",
"tea",
"gh",
"Session Handover Gate",
"session_handover_check.py --strict",
],
errors,
)
must_contain(
issue_template,
[
"[RUNTIME-VERIFY][SCN-XXX]",
"Requirement Mapping",
"Close Criteria",
"NOT_OBSERVED = 0",
],
errors,
)
must_contain(
workflow_doc,
[
"Session Handover Gate (Mandatory)",
"session_handover_check.py --strict",
],
errors,
)
must_contain(
commands_doc,
[
"Session Handover Preflight (Mandatory)",
"session_handover_check.py --strict",
],
errors,
)
must_contain(
handover_log,
[
"Session Handover Log",
"- branch:",
"- docs_checked:",
"- open_issues_reviewed:",
"- next_ticket:",
],
errors,
)
if not handover_script.exists():
errors.append(f"missing file: {handover_script}")
if errors:
print("[FAIL] governance asset validation failed")
for err in errors:
print(f"- {err}")
return 1
print("[OK] governance assets validated")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,187 @@
"""Integrated v2 backtest pipeline.
Wires TripleBarrier labeling + WalkForward split + CostGuard validation
into a single deterministic orchestration path.
"""
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass
from statistics import mean
from typing import Literal
from src.analysis.backtest_cost_guard import BacktestCostModel, validate_backtest_cost_model
from src.analysis.triple_barrier import TripleBarrierSpec, label_with_triple_barrier
from src.analysis.walk_forward_split import WalkForwardFold, generate_walk_forward_splits
@dataclass(frozen=True)
class BacktestBar:
high: float
low: float
close: float
session_id: str
@dataclass(frozen=True)
class WalkForwardConfig:
train_size: int
test_size: int
step_size: int | None = None
purge_size: int = 0
embargo_size: int = 0
min_train_size: int = 1
@dataclass(frozen=True)
class BaselineScore:
name: Literal["B0", "B1", "M1"]
accuracy: float
@dataclass(frozen=True)
class BacktestFoldResult:
fold_index: int
train_indices: list[int]
test_indices: list[int]
train_label_distribution: dict[int, int]
test_label_distribution: dict[int, int]
baseline_scores: list[BaselineScore]
@dataclass(frozen=True)
class BacktestPipelineResult:
run_id: str
n_bars: int
n_entries: int
required_sessions: list[str]
label_distribution: dict[int, int]
folds: list[BacktestFoldResult]
def run_v2_backtest_pipeline(
*,
bars: Sequence[BacktestBar],
entry_indices: Sequence[int],
side: int,
triple_barrier_spec: TripleBarrierSpec,
walk_forward: WalkForwardConfig,
cost_model: BacktestCostModel,
required_sessions: list[str] | None = None,
) -> BacktestPipelineResult:
"""Run v2 integrated pipeline (cost guard -> labels -> walk-forward baselines)."""
if not bars:
raise ValueError("bars must not be empty")
if not entry_indices:
raise ValueError("entry_indices must not be empty")
resolved_sessions = (
sorted(set(required_sessions))
if required_sessions is not None
else sorted({bar.session_id for bar in bars})
)
validate_backtest_cost_model(model=cost_model, required_sessions=resolved_sessions)
highs = [float(bar.high) for bar in bars]
lows = [float(bar.low) for bar in bars]
closes = [float(bar.close) for bar in bars]
normalized_entries = sorted(set(int(i) for i in entry_indices))
if normalized_entries[0] < 0 or normalized_entries[-1] >= len(bars):
raise IndexError("entry index out of range")
labels_by_bar_index: dict[int, int] = {}
for idx in normalized_entries:
labels_by_bar_index[idx] = label_with_triple_barrier(
highs=highs,
lows=lows,
closes=closes,
entry_index=idx,
side=side,
spec=triple_barrier_spec,
).label
ordered_labels = [labels_by_bar_index[idx] for idx in normalized_entries]
folds = generate_walk_forward_splits(
n_samples=len(normalized_entries),
train_size=walk_forward.train_size,
test_size=walk_forward.test_size,
step_size=walk_forward.step_size,
purge_size=walk_forward.purge_size,
embargo_size=walk_forward.embargo_size,
min_train_size=walk_forward.min_train_size,
)
fold_results: list[BacktestFoldResult] = []
for fold_idx, fold in enumerate(folds):
train_labels = [ordered_labels[i] for i in fold.train_indices]
test_labels = [ordered_labels[i] for i in fold.test_indices]
if not test_labels:
continue
fold_results.append(
BacktestFoldResult(
fold_index=fold_idx,
train_indices=fold.train_indices,
test_indices=fold.test_indices,
train_label_distribution=_label_dist(train_labels),
test_label_distribution=_label_dist(test_labels),
baseline_scores=[
BaselineScore(name="B0", accuracy=_baseline_b0(train_labels, test_labels)),
BaselineScore(name="B1", accuracy=_score_constant(1, test_labels)),
BaselineScore(
name="M1",
accuracy=_score_constant(_m1_pred(train_labels), test_labels),
),
],
)
)
return BacktestPipelineResult(
run_id=_build_run_id(
n_entries=len(normalized_entries),
n_folds=len(fold_results),
sessions=resolved_sessions,
),
n_bars=len(bars),
n_entries=len(normalized_entries),
required_sessions=resolved_sessions,
label_distribution=_label_dist(ordered_labels),
folds=fold_results,
)
def _label_dist(labels: Sequence[int]) -> dict[int, int]:
dist: dict[int, int] = {-1: 0, 0: 0, 1: 0}
for val in labels:
if val in dist:
dist[val] += 1
return dist
def _score_constant(pred: int, actual: Sequence[int]) -> float:
return mean(1.0 if pred == label else 0.0 for label in actual)
def _baseline_b0(train_labels: Sequence[int], test_labels: Sequence[int]) -> float:
if not train_labels:
return _score_constant(0, test_labels)
# Majority-class baseline from training fold.
choices = (-1, 0, 1)
pred = max(choices, key=lambda c: train_labels.count(c))
return _score_constant(pred, test_labels)
def _m1_pred(train_labels: Sequence[int]) -> int:
if not train_labels:
return 0
return train_labels[-1]
def _build_run_id(*, n_entries: int, n_folds: int, sessions: Sequence[str]) -> str:
sess_key = "_".join(sessions)
return f"v2p-e{n_entries}-f{n_folds}-s{sess_key}"
def fold_has_leakage(fold: WalkForwardFold) -> bool:
"""Utility for tests/verification: True when train/test overlap exists."""
return bool(set(fold.train_indices).intersection(fold.test_indices))

View File

@@ -68,6 +68,8 @@ BLACKOUT_ORDER_MANAGER = BlackoutOrderManager(
max_queue_size=500,
)
_SESSION_CLOSE_WINDOWS = {"NXT_AFTER", "US_AFTER"}
_RUNTIME_EXIT_STATES: dict[str, PositionState] = {}
_RUNTIME_EXIT_PEAKS: dict[str, float] = {}
def safe_float(value: str | float | None, default: float = 0.0) -> float:
@@ -469,6 +471,118 @@ def _should_force_exit_for_overnight(
return not settings.OVERNIGHT_EXCEPTION_ENABLED
def _build_runtime_position_key(
*,
market_code: str,
stock_code: str,
open_position: dict[str, Any],
) -> str:
decision_id = str(open_position.get("decision_id") or "")
timestamp = str(open_position.get("timestamp") or "")
return f"{market_code}:{stock_code}:{decision_id}:{timestamp}"
def _clear_runtime_exit_cache_for_symbol(*, market_code: str, stock_code: str) -> None:
prefix = f"{market_code}:{stock_code}:"
stale_keys = [key for key in _RUNTIME_EXIT_STATES if key.startswith(prefix)]
for key in stale_keys:
_RUNTIME_EXIT_STATES.pop(key, None)
_RUNTIME_EXIT_PEAKS.pop(key, None)
def _apply_staged_exit_override_for_hold(
*,
decision: TradeDecision,
market: MarketInfo,
stock_code: str,
open_position: dict[str, Any] | None,
market_data: dict[str, Any],
stock_playbook: Any | None,
) -> TradeDecision:
"""Apply v2 staged exit semantics for HOLD positions using runtime state."""
if decision.action != "HOLD" or not open_position:
return decision
entry_price = safe_float(open_position.get("price"), 0.0)
current_price = safe_float(market_data.get("current_price"), 0.0)
if entry_price <= 0 or current_price <= 0:
return decision
stop_loss_threshold = -2.0
take_profit_threshold = 3.0
if stock_playbook and stock_playbook.scenarios:
stop_loss_threshold = stock_playbook.scenarios[0].stop_loss_pct
take_profit_threshold = stock_playbook.scenarios[0].take_profit_pct
runtime_key = _build_runtime_position_key(
market_code=market.code,
stock_code=stock_code,
open_position=open_position,
)
current_state = _RUNTIME_EXIT_STATES.get(runtime_key, PositionState.HOLDING)
prev_peak = _RUNTIME_EXIT_PEAKS.get(runtime_key, 0.0)
peak_hint = max(
safe_float(market_data.get("peak_price"), 0.0),
safe_float(market_data.get("session_high_price"), 0.0),
)
peak_price = max(entry_price, current_price, prev_peak, peak_hint)
exit_eval = evaluate_exit(
current_state=current_state,
config=ExitRuleConfig(
hard_stop_pct=stop_loss_threshold,
be_arm_pct=max(0.5, take_profit_threshold * 0.4),
arm_pct=take_profit_threshold,
),
inp=ExitRuleInput(
current_price=current_price,
entry_price=entry_price,
peak_price=peak_price,
atr_value=safe_float(market_data.get("atr_value"), 0.0),
pred_down_prob=safe_float(market_data.get("pred_down_prob"), 0.0),
liquidity_weak=safe_float(market_data.get("volume_ratio"), 1.0) < 1.0,
),
)
_RUNTIME_EXIT_STATES[runtime_key] = exit_eval.state
_RUNTIME_EXIT_PEAKS[runtime_key] = peak_price
if not exit_eval.should_exit:
return decision
pnl_pct = (current_price - entry_price) / entry_price * 100.0
if exit_eval.reason == "hard_stop":
rationale = (
f"Stop-loss triggered ({pnl_pct:.2f}% <= "
f"{stop_loss_threshold:.2f}%)"
)
elif exit_eval.reason == "arm_take_profit":
rationale = (
f"Take-profit triggered ({pnl_pct:.2f}% >= "
f"{take_profit_threshold:.2f}%)"
)
elif exit_eval.reason == "atr_trailing_stop":
rationale = "ATR trailing-stop triggered"
elif exit_eval.reason == "be_lock_threat":
rationale = "Break-even lock threat detected"
elif exit_eval.reason == "model_liquidity_exit":
rationale = "Model/liquidity exit triggered"
else:
rationale = f"Exit rule triggered ({exit_eval.reason})"
logger.info(
"Staged exit override for %s (%s): HOLD -> SELL (reason=%s, state=%s)",
stock_code,
market.name,
exit_eval.reason,
exit_eval.state.value,
)
return TradeDecision(
action="SELL",
confidence=max(decision.confidence, 90),
rationale=rationale,
)
async def build_overseas_symbol_universe(
db_conn: Any,
overseas_broker: OverseasBroker,
@@ -977,6 +1091,11 @@ async def trading_cycle(
"foreigner_net": foreigner_net,
"price_change_pct": price_change_pct,
}
session_high_price = safe_float(
price_output.get("high") or price_output.get("ovrs_hgpr") or price_output.get("stck_hgpr")
)
if session_high_price > 0:
market_data["session_high_price"] = session_high_price
# Enrich market_data with scanner metrics for scenario engine
market_candidates = scan_candidates.get(market.code, {})
@@ -1175,82 +1294,36 @@ async def trading_cycle(
if decision.action == "HOLD":
open_position = get_open_position(db_conn, stock_code, market.code)
if open_position:
entry_price = safe_float(open_position.get("price"), 0.0)
if entry_price > 0 and current_price > 0:
loss_pct = (current_price - entry_price) / entry_price * 100
stop_loss_threshold = -2.0
take_profit_threshold = 3.0
if stock_playbook and stock_playbook.scenarios:
stop_loss_threshold = stock_playbook.scenarios[0].stop_loss_pct
take_profit_threshold = stock_playbook.scenarios[0].take_profit_pct
exit_eval = evaluate_exit(
current_state=PositionState.HOLDING,
config=ExitRuleConfig(
hard_stop_pct=stop_loss_threshold,
be_arm_pct=max(0.5, take_profit_threshold * 0.4),
arm_pct=take_profit_threshold,
),
inp=ExitRuleInput(
current_price=current_price,
entry_price=entry_price,
peak_price=max(entry_price, current_price),
atr_value=0.0,
pred_down_prob=0.0,
liquidity_weak=market_data.get("volume_ratio", 1.0) < 1.0,
),
)
if exit_eval.reason == "hard_stop":
decision = TradeDecision(
action="SELL",
confidence=95,
rationale=(
f"Stop-loss triggered ({loss_pct:.2f}% <= "
f"{stop_loss_threshold:.2f}%)"
),
)
logger.info(
"Stop-loss override for %s (%s): %.2f%% <= %.2f%%",
stock_code,
market.name,
loss_pct,
stop_loss_threshold,
)
elif exit_eval.reason == "arm_take_profit":
decision = TradeDecision(
action="SELL",
confidence=90,
rationale=(
f"Take-profit triggered ({loss_pct:.2f}% >= "
f"{take_profit_threshold:.2f}%)"
),
)
logger.info(
"Take-profit override for %s (%s): %.2f%% >= %.2f%%",
stock_code,
market.name,
loss_pct,
take_profit_threshold,
)
if decision.action == "HOLD" and _should_force_exit_for_overnight(
if not open_position:
_clear_runtime_exit_cache_for_symbol(
market_code=market.code,
stock_code=stock_code,
)
decision = _apply_staged_exit_override_for_hold(
decision=decision,
market=market,
stock_code=stock_code,
open_position=open_position,
market_data=market_data,
stock_playbook=stock_playbook,
)
if open_position and decision.action == "HOLD" and _should_force_exit_for_overnight(
market=market,
settings=settings,
):
decision = TradeDecision(
action="SELL",
confidence=max(decision.confidence, 85),
rationale=(
"Forced exit by overnight policy"
" (session close window / kill switch priority)"
),
)
logger.info(
"Overnight policy override for %s (%s): HOLD -> SELL",
stock_code,
market.name,
)
):
decision = TradeDecision(
action="SELL",
confidence=max(decision.confidence, 85),
rationale=(
"Forced exit by overnight policy"
" (session close window / kill switch priority)"
),
)
logger.info(
"Overnight policy override for %s (%s): HOLD -> SELL",
stock_code,
market.name,
)
logger.info(
"Decision for %s (%s): %s (confidence=%d)",
stock_code,
@@ -2190,6 +2263,14 @@ async def run_daily_session(
"foreigner_net": foreigner_net,
"price_change_pct": price_change_pct,
}
if not market.is_domestic:
session_high_price = safe_float(
price_data.get("output", {}).get("high")
or price_data.get("output", {}).get("ovrs_hgpr")
or price_data.get("output", {}).get("stck_hgpr")
)
if session_high_price > 0:
stock_data["session_high_price"] = session_high_price
# Enrich with scanner metrics
cand = candidate_map.get(stock_code)
if cand:
@@ -2317,6 +2398,7 @@ async def run_daily_session(
)
for stock_data in stocks_data:
stock_code = stock_data["stock_code"]
stock_playbook = playbook.get_stock_playbook(stock_code)
match = scenario_engine.evaluate(
playbook, stock_code, stock_data, portfolio_data,
)
@@ -2362,7 +2444,20 @@ async def run_daily_session(
)
if decision.action == "HOLD":
daily_open = get_open_position(db_conn, stock_code, market.code)
if daily_open and _should_force_exit_for_overnight(
if not daily_open:
_clear_runtime_exit_cache_for_symbol(
market_code=market.code,
stock_code=stock_code,
)
decision = _apply_staged_exit_override_for_hold(
decision=decision,
market=market,
stock_code=stock_code,
open_position=daily_open,
market_data=stock_data,
stock_playbook=stock_playbook,
)
if daily_open and decision.action == "HOLD" and _should_force_exit_for_overnight(
market=market,
settings=settings,
):
@@ -3408,7 +3503,10 @@ async def run(settings: Settings) -> None:
_run_context_scheduler(context_scheduler, now=datetime.now(UTC))
# Get currently open markets
open_markets = get_open_markets(settings.enabled_market_list)
open_markets = get_open_markets(
settings.enabled_market_list,
include_extended_sessions=True,
)
if not open_markets:
# Notify market close for any markets that were open
@@ -3437,7 +3535,8 @@ async def run(settings: Settings) -> None:
# No markets open — wait until next market opens
try:
next_market, next_open_time = get_next_market_open(
settings.enabled_market_list
settings.enabled_market_list,
include_extended_sessions=True,
)
now = datetime.now(UTC)
wait_seconds = (next_open_time - now).total_seconds()
@@ -3459,6 +3558,14 @@ async def run(settings: Settings) -> None:
if shutdown.is_set():
break
session_info = get_session_info(market)
logger.info(
"Market session active: %s (%s) session=%s",
market.code,
market.name,
session_info.session_id,
)
await process_blackout_recovery_orders(
broker=broker,
overseas_broker=overseas_broker,

View File

@@ -1,7 +1,7 @@
"""Market schedule management with timezone support."""
from dataclasses import dataclass
from datetime import datetime, time, timedelta
from datetime import UTC, datetime, time, timedelta
from zoneinfo import ZoneInfo
@@ -181,7 +181,10 @@ def is_market_open(market: MarketInfo, now: datetime | None = None) -> bool:
def get_open_markets(
enabled_markets: list[str] | None = None, now: datetime | None = None
enabled_markets: list[str] | None = None,
now: datetime | None = None,
*,
include_extended_sessions: bool = False,
) -> list[MarketInfo]:
"""
Get list of currently open markets.
@@ -196,17 +199,31 @@ def get_open_markets(
if enabled_markets is None:
enabled_markets = list(MARKETS.keys())
def is_available(market: MarketInfo) -> bool:
if not include_extended_sessions:
return is_market_open(market, now)
if market.code == "KR" or market.code.startswith("US"):
# Import lazily to avoid module cycle at import-time.
from src.core.order_policy import classify_session_id
session_id = classify_session_id(market, now)
return session_id not in {"KR_OFF", "US_OFF"}
return is_market_open(market, now)
open_markets = [
MARKETS[code]
for code in enabled_markets
if code in MARKETS and is_market_open(MARKETS[code], now)
if code in MARKETS and is_available(MARKETS[code])
]
return sorted(open_markets, key=lambda m: m.code)
def get_next_market_open(
enabled_markets: list[str] | None = None, now: datetime | None = None
enabled_markets: list[str] | None = None,
now: datetime | None = None,
*,
include_extended_sessions: bool = False,
) -> tuple[MarketInfo, datetime]:
"""
Find the next market that will open and when.
@@ -233,6 +250,21 @@ def get_next_market_open(
next_open_time: datetime | None = None
next_market: MarketInfo | None = None
def first_extended_open_after(market: MarketInfo, start_utc: datetime) -> datetime | None:
# Search minute-by-minute for KR/US session transition into active window.
# Bounded to 7 days to match existing behavior.
from src.core.order_policy import classify_session_id
ts = start_utc.astimezone(ZoneInfo("UTC")).replace(second=0, microsecond=0)
prev_active = classify_session_id(market, ts) not in {"KR_OFF", "US_OFF"}
for _ in range(7 * 24 * 60):
ts = ts + timedelta(minutes=1)
active = classify_session_id(market, ts) not in {"KR_OFF", "US_OFF"}
if active and not prev_active:
return ts
prev_active = active
return None
for code in enabled_markets:
if code not in MARKETS:
continue
@@ -240,6 +272,13 @@ def get_next_market_open(
market = MARKETS[code]
market_now = now.astimezone(market.timezone)
if include_extended_sessions and (market.code == "KR" or market.code.startswith("US")):
ext_open = first_extended_open_after(market, now.astimezone(UTC))
if ext_open and (next_open_time is None or ext_open < next_open_time):
next_open_time = ext_open
next_market = market
continue
# Calculate next open time for this market
for days_ahead in range(7): # Check next 7 days
check_date = market_now.date() + timedelta(days=days_ahead)

View File

@@ -0,0 +1,136 @@
from __future__ import annotations
from src.analysis.backtest_cost_guard import BacktestCostModel
from src.analysis.backtest_pipeline import (
BacktestBar,
WalkForwardConfig,
fold_has_leakage,
run_v2_backtest_pipeline,
)
from src.analysis.triple_barrier import TripleBarrierSpec
from src.analysis.walk_forward_split import generate_walk_forward_splits
def _bars() -> list[BacktestBar]:
closes = [100.0, 101.0, 102.0, 101.5, 103.0, 102.5, 104.0, 103.5, 105.0, 104.5, 106.0, 105.5]
bars: list[BacktestBar] = []
for i, close in enumerate(closes):
bars.append(
BacktestBar(
high=close + 1.0,
low=close - 1.0,
close=close,
session_id="KRX_REG" if i % 2 == 0 else "US_PRE",
)
)
return bars
def _cost_model() -> BacktestCostModel:
return BacktestCostModel(
commission_bps=3.0,
slippage_bps_by_session={"KRX_REG": 10.0, "US_PRE": 50.0},
failure_rate_by_session={"KRX_REG": 0.01, "US_PRE": 0.08},
unfavorable_fill_required=True,
)
def test_pipeline_happy_path_returns_fold_and_artifact_contract() -> None:
out = run_v2_backtest_pipeline(
bars=_bars(),
entry_indices=[0, 1, 2, 3, 4, 5, 6, 7],
side=1,
triple_barrier_spec=TripleBarrierSpec(
take_profit_pct=0.02,
stop_loss_pct=0.01,
max_holding_bars=3,
),
walk_forward=WalkForwardConfig(
train_size=4,
test_size=2,
step_size=2,
purge_size=1,
embargo_size=1,
min_train_size=3,
),
cost_model=_cost_model(),
)
assert out.run_id.startswith("v2p-e8-f")
assert out.n_bars == 12
assert out.n_entries == 8
assert out.required_sessions == ["KRX_REG", "US_PRE"]
assert len(out.folds) > 0
assert set(out.label_distribution) == {-1, 0, 1}
for fold in out.folds:
names = {score.name for score in fold.baseline_scores}
assert names == {"B0", "B1", "M1"}
for score in fold.baseline_scores:
assert 0.0 <= score.accuracy <= 1.0
def test_pipeline_cost_guard_fail_fast() -> None:
bad = BacktestCostModel(
commission_bps=3.0,
slippage_bps_by_session={"KRX_REG": 10.0},
failure_rate_by_session={"KRX_REG": 0.01},
unfavorable_fill_required=True,
)
try:
run_v2_backtest_pipeline(
bars=_bars(),
entry_indices=[0, 1, 2, 3],
side=1,
triple_barrier_spec=TripleBarrierSpec(
take_profit_pct=0.02,
stop_loss_pct=0.01,
max_holding_bars=3,
),
walk_forward=WalkForwardConfig(train_size=2, test_size=1),
cost_model=bad,
required_sessions=["KRX_REG", "US_PRE"],
)
except ValueError as exc:
assert "missing slippage_bps_by_session" in str(exc)
else:
raise AssertionError("expected cost guard validation error")
def test_pipeline_fold_leakage_guard() -> None:
folds = generate_walk_forward_splits(
n_samples=12,
train_size=6,
test_size=2,
step_size=2,
purge_size=1,
embargo_size=1,
min_train_size=5,
)
assert folds
for fold in folds:
assert not fold_has_leakage(fold)
def test_pipeline_deterministic_seed_free_deterministic_result() -> None:
cfg = dict(
bars=_bars(),
entry_indices=[0, 1, 2, 3, 4, 5, 6, 7],
side=1,
triple_barrier_spec=TripleBarrierSpec(
take_profit_pct=0.02,
stop_loss_pct=0.01,
max_holding_bars=3,
),
walk_forward=WalkForwardConfig(
train_size=4,
test_size=2,
step_size=2,
purge_size=1,
embargo_size=1,
min_train_size=3,
),
cost_model=_cost_model(),
)
out1 = run_v2_backtest_pipeline(**cfg)
out2 = run_v2_backtest_pipeline(**cfg)
assert out1 == out2

View File

@@ -15,6 +15,8 @@ from src.evolution.scorecard import DailyScorecard
from src.logging.decision_logger import DecisionLogger
from src.main import (
KILL_SWITCH,
_RUNTIME_EXIT_PEAKS,
_RUNTIME_EXIT_STATES,
_should_force_exit_for_overnight,
_should_block_overseas_buy_for_fx_buffer,
_trigger_emergency_kill_switch,
@@ -42,6 +44,7 @@ from src.strategy.models import (
StockCondition,
StockScenario,
)
from src.strategy.position_state_machine import PositionState
from src.strategy.scenario_engine import ScenarioEngine, ScenarioMatch
@@ -87,8 +90,12 @@ def _make_sell_match(stock_code: str = "005930") -> ScenarioMatch:
def _reset_kill_switch_state() -> None:
"""Prevent cross-test leakage from global kill-switch state."""
KILL_SWITCH.clear_block()
_RUNTIME_EXIT_STATES.clear()
_RUNTIME_EXIT_PEAKS.clear()
yield
KILL_SWITCH.clear_block()
_RUNTIME_EXIT_STATES.clear()
_RUNTIME_EXIT_PEAKS.clear()
class TestExtractAvgPriceFromBalance:
@@ -2337,6 +2344,218 @@ async def test_hold_not_overridden_when_between_stop_loss_and_take_profit() -> N
broker.send_order.assert_not_called()
@pytest.mark.asyncio
async def test_hold_overridden_to_sell_on_be_lock_threat_after_state_arms() -> None:
"""Staged exit must use runtime state (BE_LOCK -> be_lock_threat -> SELL)."""
db_conn = init_db(":memory:")
decision_logger = DecisionLogger(db_conn)
buy_decision_id = decision_logger.log_decision(
stock_code="005930",
market="KR",
exchange_code="KRX",
action="BUY",
confidence=90,
rationale="entry",
context_snapshot={},
input_data={},
)
log_trade(
conn=db_conn,
stock_code="005930",
action="BUY",
confidence=90,
rationale="entry",
quantity=1,
price=100.0,
market="KR",
exchange_code="KRX",
decision_id=buy_decision_id,
)
broker = MagicMock()
broker.get_current_price = AsyncMock(side_effect=[(102.0, 2.0, 0.0), (99.0, -1.0, 0.0)])
broker.get_balance = AsyncMock(
return_value={
"output1": [{"pdno": "005930", "ord_psbl_qty": "1"}],
"output2": [
{
"tot_evlu_amt": "100000",
"dnca_tot_amt": "10000",
"pchs_amt_smtl_amt": "90000",
}
],
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
scenario = StockScenario(
condition=StockCondition(rsi_below=30),
action=ScenarioAction.BUY,
confidence=88,
stop_loss_pct=-5.0,
take_profit_pct=3.0,
rationale="staged exit policy",
)
playbook = DayPlaybook(
date=date(2026, 2, 8),
market="KR",
stock_playbooks=[
{"stock_code": "005930", "stock_name": "Samsung", "scenarios": [scenario]}
],
)
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_hold_match())
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
for _ in range(2):
await trading_cycle(
broker=broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=playbook,
risk=MagicMock(),
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(
get_latest_timeframe=MagicMock(return_value=None),
set_context=MagicMock(),
),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=telegram,
market=market,
stock_code="005930",
scan_candidates={},
)
broker.send_order.assert_called_once()
assert broker.send_order.call_args.kwargs["order_type"] == "SELL"
@pytest.mark.asyncio
async def test_runtime_exit_cache_cleared_when_position_closed() -> None:
"""Runtime staged-exit cache must be cleared when no open position exists."""
db_conn = init_db(":memory:")
decision_logger = DecisionLogger(db_conn)
buy_decision_id = decision_logger.log_decision(
stock_code="005930",
market="KR",
exchange_code="KRX",
action="BUY",
confidence=90,
rationale="entry",
context_snapshot={},
input_data={},
)
log_trade(
conn=db_conn,
stock_code="005930",
action="BUY",
confidence=90,
rationale="entry",
quantity=1,
price=100.0,
market="KR",
exchange_code="KRX",
decision_id=buy_decision_id,
)
broker = MagicMock()
broker.get_current_price = AsyncMock(return_value=(100.0, 0.0, 0.0))
broker.get_balance = AsyncMock(
return_value={
"output1": [{"pdno": "005930", "ord_psbl_qty": "1"}],
"output2": [
{
"tot_evlu_amt": "100000",
"dnca_tot_amt": "10000",
"pchs_amt_smtl_amt": "90000",
}
],
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
_RUNTIME_EXIT_STATES[f"{market.code}:005930:{buy_decision_id}:dummy-ts"] = PositionState.BE_LOCK
_RUNTIME_EXIT_PEAKS[f"{market.code}:005930:{buy_decision_id}:dummy-ts"] = 120.0
# Close position first so trading_cycle observes no open position.
sell_decision_id = decision_logger.log_decision(
stock_code="005930",
market="KR",
exchange_code="KRX",
action="SELL",
confidence=90,
rationale="manual close",
context_snapshot={},
input_data={},
)
log_trade(
conn=db_conn,
stock_code="005930",
action="SELL",
confidence=90,
rationale="manual close",
quantity=1,
price=100.0,
market="KR",
exchange_code="KRX",
decision_id=sell_decision_id,
)
await trading_cycle(
broker=broker,
overseas_broker=MagicMock(),
scenario_engine=MagicMock(evaluate=MagicMock(return_value=_make_hold_match())),
playbook=_make_playbook(),
risk=MagicMock(),
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(
get_latest_timeframe=MagicMock(return_value=None),
set_context=MagicMock(),
),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=telegram,
market=market,
stock_code="005930",
scan_candidates={},
)
assert not [k for k in _RUNTIME_EXIT_STATES if k.startswith("KR:005930:")]
assert not [k for k in _RUNTIME_EXIT_PEAKS if k.startswith("KR:005930:")]
@pytest.mark.asyncio
async def test_stop_loss_not_triggered_when_current_price_is_zero() -> None:
"""HOLD must stay HOLD when current_price=0 even if entry_price is set (issue #251).
@@ -4135,6 +4354,130 @@ class TestDailyCBBaseline:
assert result == 55000.0
@pytest.mark.asyncio
async def test_run_daily_session_applies_staged_exit_override_on_hold() -> None:
"""run_daily_session must apply HOLD staged exit semantics (issue #304)."""
from src.analysis.smart_scanner import ScanCandidate
db_conn = init_db(":memory:")
log_trade(
conn=db_conn,
stock_code="005930",
action="BUY",
confidence=90,
rationale="entry",
quantity=1,
price=100.0,
market="KR",
exchange_code="KRX",
decision_id="buy-d1",
)
settings = Settings(
KIS_APP_KEY="k",
KIS_APP_SECRET="s",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="g",
MODE="paper",
)
broker = MagicMock()
broker.get_balance = AsyncMock(
return_value={
"output1": [{"pdno": "005930", "ord_psbl_qty": "1"}],
"output2": [
{
"tot_evlu_amt": "100000",
"dnca_tot_amt": "10000",
"pchs_amt_smtl_amt": "90000",
}
],
}
)
broker.get_current_price = AsyncMock(return_value=(95.0, -5.0, 0.0))
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
market.timezone = __import__("zoneinfo").ZoneInfo("Asia/Seoul")
scenario = StockScenario(
condition=StockCondition(rsi_below=30),
action=ScenarioAction.BUY,
confidence=88,
stop_loss_pct=-2.0,
take_profit_pct=3.0,
rationale="stop loss policy",
)
playbook = DayPlaybook(
date=date(2026, 2, 8),
market="KR",
stock_playbooks=[
{"stock_code": "005930", "stock_name": "Samsung", "scenarios": [scenario]}
],
)
playbook_store = MagicMock()
playbook_store.load = MagicMock(return_value=playbook)
smart_scanner = MagicMock()
smart_scanner.scan = AsyncMock(
return_value=[
ScanCandidate(
stock_code="005930",
name="Samsung",
price=95.0,
volume=1_000_000.0,
volume_ratio=2.0,
rsi=42.0,
signal="momentum",
score=80.0,
)
]
)
scenario_engine = MagicMock(spec=ScenarioEngine)
scenario_engine.evaluate = MagicMock(return_value=_make_hold_match("005930"))
risk = MagicMock()
risk.check_circuit_breaker = MagicMock()
risk.validate_order = MagicMock()
decision_logger = MagicMock()
decision_logger.log_decision = MagicMock(return_value="d1")
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
async def _passthrough(fn, *a, label: str = "", **kw): # type: ignore[override]
return await fn(*a, **kw)
with patch("src.main.get_open_markets", return_value=[market]), \
patch("src.main._retry_connection", new=_passthrough):
await run_daily_session(
broker=broker,
overseas_broker=MagicMock(),
scenario_engine=scenario_engine,
playbook_store=playbook_store,
pre_market_planner=MagicMock(),
risk=risk,
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(),
criticality_assessor=MagicMock(),
telegram=telegram,
settings=settings,
smart_scanner=smart_scanner,
daily_start_eval=0.0,
)
broker.send_order.assert_called_once()
assert broker.send_order.call_args.kwargs["order_type"] == "SELL"
# ---------------------------------------------------------------------------
# sync_positions_from_broker — startup DB sync tests (issue #206)
# ---------------------------------------------------------------------------

View File

@@ -147,6 +147,24 @@ class TestGetOpenMarkets:
codes = [m.code for m in open_markets]
assert codes == sorted(codes)
def test_get_open_markets_us_pre_extended_session(self) -> None:
"""US premarket should be considered open when extended sessions enabled."""
# Monday 2026-02-02 08:30 EST = 13:30 UTC (premarket window)
test_time = datetime(2026, 2, 2, 13, 30, tzinfo=ZoneInfo("UTC"))
regular = get_open_markets(
enabled_markets=["US_NASDAQ", "US_NYSE", "US_AMEX"],
now=test_time,
)
assert regular == []
extended = get_open_markets(
enabled_markets=["US_NASDAQ", "US_NYSE", "US_AMEX"],
now=test_time,
include_extended_sessions=True,
)
assert {m.code for m in extended} == {"US_NASDAQ", "US_NYSE", "US_AMEX"}
class TestGetNextMarketOpen:
"""Test get_next_market_open function."""
@@ -201,6 +219,20 @@ class TestGetNextMarketOpen:
)
assert market.code == "KR"
def test_get_next_market_open_prefers_extended_session(self) -> None:
"""Extended lookup should return premarket open time before regular open."""
# Monday 2026-02-02 07:00 EST = 12:00 UTC
# By v3 KST session rules, US is OFF only in KST 07:00-10:00 (UTC 22:00-01:00).
# At 12:00 UTC market is active, so next OFF->ON transition is 01:00 UTC next day.
test_time = datetime(2026, 2, 2, 12, 0, tzinfo=ZoneInfo("UTC"))
market, next_open = get_next_market_open(
enabled_markets=["US_NASDAQ"],
now=test_time,
include_extended_sessions=True,
)
assert market.code == "US_NASDAQ"
assert next_open == datetime(2026, 2, 3, 1, 0, tzinfo=ZoneInfo("UTC"))
class TestExpandMarketCodes:
"""Test shorthand market expansion."""

View File

@@ -0,0 +1,83 @@
# Session Handover Log
목적: 세션 시작 시 인수인계 확인을 기록하고, 구현/검증 작업 시작 전에 공통 컨텍스트를 강제한다.
작성 규칙:
- 세션 시작마다 최신 엔트리를 맨 아래에 추가한다.
- `docs/workflow.md`, `docs/commands.md`, `docs/agent-constraints.md`를 먼저 확인한 뒤 기록한다.
- 각 엔트리는 현재 작업 브랜치 기준으로 작성한다.
템플릿:
```md
### YYYY-MM-DD | session=<id or short label>
- branch: <current-branch>
- docs_checked: docs/workflow.md, docs/commands.md, docs/agent-constraints.md
- open_issues_reviewed: #...
- next_ticket: #...
- process_gate_checked: process_ticket=#..., merged_to_feature_branch=yes|no|n/a
- risks_or_notes: ...
```
### 2026-02-27 | session=handover-gate-bootstrap
- branch: feature/v3-session-policy-stream
- docs_checked: docs/workflow.md, docs/commands.md, docs/agent-constraints.md
- open_issues_reviewed: #304, #305, #306
- next_ticket: #304
- risks_or_notes: 세션 시작 게이트를 문서/스크립트/CI로 강제 적용
### 2026-02-27 | session=codex-handover-start
- branch: feature/v3-session-policy-stream
- docs_checked: docs/workflow.md, docs/commands.md, docs/agent-constraints.md
- open_issues_reviewed: #306, #308, #309
- next_ticket: #304
- process_gate_checked: process_ticket=#306,#308 merged_to_feature_branch=yes
- risks_or_notes: 미추적 로컬 파일 존재(문서/DB/lock)로 커밋 범위 분리 필요
### 2026-02-27 | session=codex-process-gate-hardening
- branch: feature/issue-304-runtime-staged-exit-semantics
- docs_checked: docs/workflow.md, docs/commands.md, docs/agent-constraints.md
- open_issues_reviewed: #304, #305
- next_ticket: #304
- process_gate_checked: process_ticket=#306,#308 merged_to_feature_branch=yes
- risks_or_notes: process-change-first 실행 게이트를 문서+스크립트로 강화
### 2026-02-27 | session=codex-handover-start-2
- branch: feature/issue-304-runtime-staged-exit-semantics
- docs_checked: docs/workflow.md, docs/commands.md, docs/agent-constraints.md
- open_issues_reviewed: #304, #305
- next_ticket: #304
- process_gate_checked: process_ticket=#306,#308 merged_to_feature_branch=yes
- risks_or_notes: handover 재시작 요청으로 세션 엔트리 추가, 미추적 산출물(AMS/NAS/NYS, DB, lock, xlsx) 커밋 분리 필요
### 2026-02-27 | session=codex-issue305-start
- branch: feature/v3-session-policy-stream
- docs_checked: docs/workflow.md, docs/commands.md, docs/agent-constraints.md
- open_issues_reviewed: #305
- next_ticket: #305
- process_gate_checked: process_ticket=#306,#308 merged_to_feature_branch=yes
- risks_or_notes: #305 구현을 위해 분석/백테스트 모듈 통합 경로 점검 시작
### 2026-02-27 | session=codex-issue305-ticket-branch
- branch: feature/issue-305-backtest-pipeline-integration
- docs_checked: docs/workflow.md, docs/commands.md, docs/agent-constraints.md
- open_issues_reviewed: #305
- next_ticket: #305
- process_gate_checked: process_ticket=#306,#308 merged_to_feature_branch=yes
- risks_or_notes: 티켓 브랜치 분기 후 strict gate 재통과를 위한 엔트리 추가
### 2026-02-27 | session=codex-backtest-gate-automation
- branch: feature/v3-session-policy-stream
- docs_checked: docs/workflow.md, docs/commands.md, docs/agent-constraints.md
- open_issues_reviewed: #304, #305
- next_ticket: (create) backtest automation gate
- process_gate_checked: process_ticket=#306,#308 merged_to_feature_branch=yes
- risks_or_notes: 백테스트 자동화 누락 재발 방지 위해 이슈/티켓 브랜치/PR 절차로 즉시 정규화
### 2026-02-27 | session=codex-issue314-ticket-branch
- branch: feature/issue-314-backtest-gate-automation
- docs_checked: docs/workflow.md, docs/commands.md, docs/agent-constraints.md
- open_issues_reviewed: #314
- next_ticket: #314
- process_gate_checked: process_ticket=#306,#308 merged_to_feature_branch=yes
- risks_or_notes: 백테스트 자동 게이트 도입 티켓 브랜치 strict gate 통과용 엔트리