Compare commits

..

1 Commits

Author SHA1 Message Date
agentson
3fdb7a29d4 feat: US market code 정합성, Telegram 명령 4종, 손절 모니터링 (#132)
Some checks failed
CI / test (pull_request) Has been cancelled
- MARKET_SHORTHAND + expand_market_codes()로 config "US" → schedule "US_NASDAQ/NYSE/AMEX" 자동 확장
- /report, /scenarios, /review, /dashboard 텔레그램 명령 추가
- price_change_pct를 trading_cycle과 run_daily_session에 주입
- HOLD시 get_open_position 기반 손절 모니터링 및 자동 SELL 오버라이드
- 대시보드 /api/status 동적 market 조회로 변경

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 20:24:01 +09:00
17 changed files with 1682 additions and 460 deletions

182
README.md
View File

@@ -1,126 +1,154 @@
# The Ouroboros — 자가 진화형 AI 투자 시스템
KIS API 기반 자동매매 + Gemini 기반 장전 전략 생성 + 장중 로컬 시나리오 실행 + 장후 리뷰/진화 루프를 결합한 시스템입니다.
KIS(한국투자증권) API로 매매하고, Google Gemini로 판단하며, 자체 전략 코드를 TDD 기반으로 진화시키는 자율 주식 트레이딩 에이전트.
## 현재 상태 (2026-02-16)
## 아키텍처
- V2 계획 기준 완료: **18/20**
- 부분 완료: **1/20** (`1-7` 일부 항목)
- 미완료: **1/20** (`4-1` Telegram 확장 명령어)
```
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ KIS Broker │◄───►│ Main │◄───►│ Gemini Brain│
│ (매매 실행) │ │ (거래 루프) │ │ (의사결정) │
└─────────────┘ └──────┬──────┘ └─────────────┘
┌──────┴──────┐
│Risk Manager │
│ (안전장치) │
└──────┬──────┘
┌──────┴──────┐
│ Evolution │
│ (전략 진화) │
└─────────────┘
```
핵심 전환은 이미 반영되었습니다.
## 핵심 모듈
- 기존: 장중 `brain.decide()` 실시간 의존
- 현재: 장전 `DayPlaybook` 생성 + 장중 `ScenarioEngine` 로컬 매칭
| 모듈 | 파일 | 설명 |
|------|------|------|
| 설정 | `src/config.py` | Pydantic 기반 환경변수 로딩 및 타입 검증 |
| 브로커 | `src/broker/kis_api.py` | KIS API 비동기 래퍼 (토큰 갱신, 레이트 리미터, 해시키) |
| 두뇌 | `src/brain/gemini_client.py` | Gemini 프롬프트 구성 및 JSON 응답 파싱 |
| 방패 | `src/core/risk_manager.py` | 서킷 브레이커 + 팻 핑거 체크 |
| 알림 | `src/notifications/telegram_client.py` | 텔레그램 실시간 거래 알림 (선택사항) |
| 진화 | `src/evolution/optimizer.py` | 실패 패턴 분석 → 새 전략 생성 → 테스트 → PR |
| DB | `src/db.py` | SQLite 거래 로그 기록 |
## 핵심 구성
## 안전장치
- `src/main.py`: 시장 루프, 플레이북 생성/적용, EOD 집계, 리뷰/진화 연결
- `src/strategy/`: `models`, `pre_market_planner`, `scenario_engine`, `playbook_store`
- `src/context/`: `store`, `aggregator`, `scheduler` (L1~L7)
- `src/evolution/daily_review.py`: 시장별 scorecard/lessons 생성
- `src/dashboard/app.py`: FastAPI 관측 API
- `src/notifications/telegram_client.py`: 알림 및 명령 핸들러
| 규칙 | 내용 |
|------|------|
| 서킷 브레이커 | 일일 손실률 -3.0% 초과 시 전체 매매 중단 (`SystemExit`) |
| 팻 핑거 방지 | 주문 금액이 보유 현금의 30% 초과 시 주문 거부 |
| 신뢰도 임계값 | Gemini 신뢰도 80 미만이면 강제 HOLD |
| 레이트 리미터 | Leaky Bucket 알고리즘으로 API 호출 제한 |
| 토큰 자동 갱신 | 만료 1분 전 자동으로 Access Token 재발급 |
## Quick Start
## 빠른 시작
### 1. 환경 설정
```bash
cp .env.example .env
# .env 파일에 KIS API 키와 Gemini API 키 입력
```
필수 값:
- `KIS_APP_KEY`
- `KIS_APP_SECRET`
- `KIS_ACCOUNT_NO`
- `GEMINI_API_KEY`
### 2. 의존성 설치
```bash
pip install -e ".[dev]"
pip install ".[dev]"
```
### 3. 테스트
### 3. 테스트 실행
```bash
pytest -v --cov=src
ruff check src/ tests/
mypy src/ --strict
pytest -v --cov=src --cov-report=term-missing
```
## 실행
### 기본 실행
### 4. 실행 (모의투자)
```bash
python -m src.main --mode=paper
```
### 대시보드 포함 실행
### 5. Docker 실행
```bash
python -m src.main --mode=paper --dashboard
docker compose up -d ouroboros
```
또는 환경변수:
## 텔레그램 알림 (선택사항)
```bash
DASHBOARD_ENABLED=true
DASHBOARD_HOST=127.0.0.1
DASHBOARD_PORT=8080
```
거래 실행, 서킷 브레이커 발동, 시스템 상태 등을 텔레그램으로 실시간 알림 받을 수 있습니다.
## 주요 API/기능
### 빠른 설정
- 플레이북 저장: `playbooks` 테이블 (`date + market` UNIQUE)
- 의사결정/결과 연결: `trades.decision_id` + `DecisionLogger.update_outcome()`
- 시장별 scorecard 컨텍스트: `scorecard_KR`, `scorecard_US`
- 컨텍스트 스케줄러: weekly/monthly/quarterly/annual/legacy + cleanup
- 대시보드 API:
- `/api/status`
- `/api/playbook/{date}?market=KR`
- `/api/scorecard/{date}?market=KR`
- `/api/performance?market=all`
- `/api/context/{layer}`
- `/api/decisions?market=KR`
- `/api/scenarios/active?market=US`
1. **봇 생성**: 텔레그램에서 [@BotFather](https://t.me/BotFather) 메시지 → `/newbot` 명령
2. **채팅 ID 확인**: [@userinfobot](https://t.me/userinfobot) 메시지 → `/start` 명령
3. **환경변수 설정**: `.env` 파일에 추가
```bash
TELEGRAM_BOT_TOKEN=1234567890:ABCdefGHIjklMNOpqrsTUVwxyz
TELEGRAM_CHAT_ID=123456789
TELEGRAM_ENABLED=true
```
4. **테스트**: 봇과 대화 시작 (`/start` 전송) 후 에이전트 실행
## 현재 갭 (코드 기준)
**상세 문서**: [src/notifications/README.md](src/notifications/README.md)
- `Issue 4-1` 미구현: `/report`, `/scenarios`, `/review`, `/dashboard` Telegram 명령 미등록
- `Issue 1-7` 일부 미완:
- `price_change_pct` 정규화 어댑터 명시 구현 없음
- 영향: `price_change_pct_above/below` 조건을 사용하는 시나리오는 사실상 매칭 불가(dead path)
- HOLD 시 별도 손절 모니터링 플래그 처리 분리 미흡
- 시장 코드 정합성 이슈:
- 설정 기본값은 `ENABLED_MARKETS="KR,US"`
- 스케줄 정의는 `US_NASDAQ`, `US_NYSE` 중심
- 영향: `get_open_markets(["KR", "US"])`에서 `US` 미정의로 US 시장이 누락될 수 있음(런타임 영향)
### 알림 종류
- 🟢 거래 체결 알림 (BUY/SELL + 신뢰도)
- 🚨 서킷 브레이커 발동 (자동 거래 중단)
- ⚠️ 팻 핑거 차단 (과도한 주문 차단)
- 장 시작/종료 알림
- 📝 시스템 시작/종료 상태
**안전장치**: 알림 실패해도 거래는 계속 진행됩니다. 텔레그램 API 오류나 설정 누락이 있어도 거래 시스템은 정상 작동합니다.
## 테스트
로컬 수집 기준:
35개 테스트가 TDD 방식으로 구현 전에 먼저 작성되었습니다.
```bash
pytest --collect-only -q
# 538 tests collected
```
tests/test_risk.py — 서킷 브레이커, 팻 핑거, 통합 검증 (11개)
tests/test_broker.py — 토큰 관리, 타임아웃, HTTP 에러, 해시키 (6개)
tests/test_brain.py — JSON 파싱, 신뢰도 임계값, 비정상 응답 처리 (15개)
```
권장 검증:
## 기술 스택
```bash
pytest -v --cov=src
ruff check src/ tests/
mypy src/ --strict
- **언어**: Python 3.11+ (asyncio 기반)
- **브로커**: KIS Open API (REST)
- **AI**: Google Gemini Pro
- **DB**: SQLite
- **검증**: pytest + coverage
- **CI/CD**: GitHub Actions
- **배포**: Docker + Docker Compose
## 프로젝트 구조
```
The-Ouroboros/
├── .github/workflows/ci.yml # CI 파이프라인
├── docs/
│ ├── agents.md # AI 에이전트 페르소나 정의
│ └── skills.md # 사용 가능한 도구 목록
├── src/
│ ├── config.py # Pydantic 설정
│ ├── logging_config.py # JSON 구조화 로깅
│ ├── db.py # SQLite 거래 기록
│ ├── main.py # 비동기 거래 루프
│ ├── broker/kis_api.py # KIS API 클라이언트
│ ├── brain/gemini_client.py # Gemini 의사결정 엔진
│ ├── core/risk_manager.py # 리스크 관리
│ ├── notifications/telegram_client.py # 텔레그램 알림
│ ├── evolution/optimizer.py # 전략 진화 엔진
│ └── strategies/base.py # 전략 베이스 클래스
├── tests/ # TDD 테스트 스위트
├── Dockerfile # 멀티스테이지 빌드
├── docker-compose.yml # 서비스 오케스트레이션
└── pyproject.toml # 의존성 및 도구 설정
```
## 문서
## 라이선스
- 아키텍처: `docs/architecture.md`
- 컨텍스트 트리: `docs/context-tree.md`
- 워크플로우: `docs/workflow.md`
- 요구사항 로그: `docs/requirements-log.md`
- 명령 레퍼런스: `docs/commands.md`
이 프로젝트의 라이선스는 [LICENSE](LICENSE) 파일을 참조하세요.

View File

@@ -2,140 +2,342 @@
## Overview
The Ouroboros V2는 `Proactive` 구조를 중심으로 동작합니다.
Self-evolving AI trading agent for global stock markets via KIS (Korea Investment & Securities) API. The main loop in `src/main.py` orchestrates four components across multiple markets with two trading modes: daily (batch API calls) or realtime (per-stock decisions).
- 장전: Gemini 1회 호출로 시장별 `DayPlaybook` 생성
- 장중: `ScenarioEngine`이 로컬 조건 매칭으로 의사결정
- 장후: `ContextAggregator` + `DailyReviewer`로 성과 집계/교훈 생성
## Trading Modes
`main.py`가 아래 컴포넌트를 오케스트레이션합니다.
The system supports two trading frequency modes controlled by the `TRADE_MODE` environment variable:
- `KISBroker` / `OverseasBroker`
- `PreMarketPlanner` / `ScenarioEngine` / `PlaybookStore`
- `ContextStore` / `ContextAggregator` / `ContextScheduler`
- `DailyReviewer` / `EvolutionOptimizer`
- `TelegramClient` / `TelegramCommandHandler`
### Daily Mode (default)
안전/운영 컴포넌트도 핵심입니다.
Optimized for Gemini Free tier API limits (20 calls/day):
- `RiskManager`: circuit breaker, fat-finger 검증
- `PriorityTaskQueue` + `CriticalityAssessor`: 우선순위/지연 제어
- **Batch decisions**: 1 API call per market per session
- **Fixed schedule**: 4 sessions per day at 6-hour intervals (configurable)
- **API efficiency**: Processes all stocks in a market simultaneously
- **Use case**: Free tier users, cost-conscious deployments
- **Configuration**:
```bash
TRADE_MODE=daily
DAILY_SESSIONS=4 # Sessions per day (1-10)
SESSION_INTERVAL_HOURS=6 # Hours between sessions (1-24)
```
## Market Scope
**Example**: With 2 markets (US, KR) and 4 sessions/day = 8 API calls/day (within 20 call limit)
V2 기본 설정은 `ENABLED_MARKETS="KR,US"` 입니다.
### Realtime Mode
현재 코드 기준 주의점(런타임 영향):
High-frequency trading with individual stock analysis:
- 설정은 `KR,US`를 기본값으로 사용
- 스케줄 레이어(`src/markets/schedule.py`)는 `US_NASDAQ`, `US_NYSE` 구조를 아직 유지
- `US` 코드가 스케줄에 직접 정의되지 않아 US 시장 누락 가능성이 있음
- **Per-stock decisions**: 1 API call per stock per cycle
- **60-second interval**: Continuous monitoring
- **Use case**: Production deployments with Gemini paid tier
- **Configuration**:
```bash
TRADE_MODE=realtime
```
## Decision Flow
**Note**: Realtime mode requires Gemini API subscription due to high call volume.
### 1) Pre-market
## Core Components
1. `SmartVolatilityScanner.scan()`으로 후보 종목 수집
2. `PreMarketPlanner.generate_playbook(market, candidates)` 호출
3. 결과를 `PlaybookStore.save()`로 DB 저장
4. 실패 시 empty/defensive playbook 사용
### 1. Broker (`src/broker/`)
### 2) In-market
**KISBroker** (`kis_api.py`) — Async KIS API client for domestic Korean market
1. 시장 데이터 + 스캐너 메트릭(`rsi`, `volume_ratio`) 구성
2. `ScenarioEngine.evaluate(playbook, stock_code, market_data, portfolio_data)`
3. `TradeDecision` 변환 후 주문/로그/알림 처리
4. `decision_logs``trades``decision_id`로 연결
- Automatic OAuth token refresh (valid for 24 hours)
- Leaky-bucket rate limiter (10 requests per second)
- POST body hash-key signing for order authentication
- Custom SSL context with disabled hostname verification for VTS (virtual trading) endpoint due to known certificate mismatch
### 3) End-of-day
**OverseasBroker** (`overseas.py`) — KIS overseas stock API wrapper
1. `ContextAggregator.aggregate_daily_from_trades(date, market)`
2. `DailyReviewer.generate_scorecard(date, market)`
3. `store_scorecard_in_context()``scorecard_{market}` 저장
4. `generate_lessons()`로 장후 교훈 생성
5. (US 종료 시) `EvolutionOptimizer.evolve()` 실행
- Reuses KISBroker infrastructure (session, token, rate limiter) via composition
- Supports 9 global markets: US (NASDAQ/NYSE/AMEX), Japan, Hong Kong, China (Shanghai/Shenzhen), Vietnam (Hanoi/HCM)
- Different API endpoints for overseas price/balance/order operations
## Risk Policy
**Market Schedule** (`src/markets/schedule.py`) — Timezone-aware market management
- `RiskManager`는 주문 전 검증을 강제합니다.
- circuit breaker: 손실 임계치 하회 시 거래 중단
- fat-finger: 주문 금액 과대 시 주문 차단
- 실패 시 알림은 보내되, 예외 처리로 루프 안정성 유지
- `MarketInfo` dataclass with timezone, trading hours, lunch breaks
- Automatic DST handling via `zoneinfo.ZoneInfo`
- `is_market_open()` checks weekends, trading hours, lunch breaks
- `get_open_markets()` returns currently active markets
- `get_next_market_open()` finds next market to open and when
## Error Handling Strategy
**New API Methods** (added in v0.9.0):
- `fetch_market_rankings()` — Fetch volume surge rankings from KIS API
- `get_daily_prices()` — Fetch OHLCV history for technical analysis
- API 호출 실패: 재시도(지수 백오프) 후 종목/사이클 스킵
- 시나리오/플래너 실패: empty 또는 defensive playbook으로 안전 폴백
- Telegram 실패: warning 로깅 후 거래 루프 지속
- 대시보드 스레드 실패: warning 로깅 후 메인 트레이딩 루프와 분리 유지
### 2. Analysis (`src/analysis/`)
## Configuration Reference
**VolatilityAnalyzer** (`volatility.py`) — Technical indicator calculations
상세 설정은 `src/config.py`를 기준으로 합니다.
- ATR (Average True Range) for volatility measurement
- RSI (Relative Strength Index) using Wilder's smoothing method
- Price change percentages across multiple timeframes
- Volume surge ratios and price-volume divergence
- Momentum scoring (0-100 scale)
- Breakout/breakdown pattern detection
- 거래 모드: `TRADE_MODE`, `DAILY_SESSIONS`, `SESSION_INTERVAL_HOURS`
- 전략: `PRE_MARKET_MINUTES`, `MAX_SCENARIOS_PER_STOCK`, `RESCAN_INTERVAL_SECONDS`
- 시장: `ENABLED_MARKETS`
- 대시보드: `DASHBOARD_ENABLED`, `DASHBOARD_HOST`, `DASHBOARD_PORT`
- 알림: `TELEGRAM_*`
**SmartVolatilityScanner** (`smart_scanner.py`) — Python-first filtering pipeline
## Context Tree
- **Step 1**: Fetch volume rankings from KIS API (top 30 stocks)
- **Step 2**: Calculate RSI and volume ratio for each stock
- **Step 3**: Apply filters:
- Volume ratio >= `VOL_MULTIPLIER` (default 2.0x previous day)
- RSI < `RSI_OVERSOLD_THRESHOLD` (30) OR RSI > `RSI_MOMENTUM_THRESHOLD` (70)
- **Step 4**: Score candidates by RSI extremity (60%) + volume surge (40%)
- **Step 5**: Return top N candidates (default 3) for AI analysis
- **Fallback**: Uses static watchlist if ranking API unavailable
- **Realtime mode only**: Daily mode uses batch processing for API efficiency
레이어 전략:
**Benefits:**
- Reduces Gemini API calls from 20-30 stocks to 1-3 qualified candidates
- Fast Python-based filtering before expensive AI judgment
- Logs selection context (RSI, volume_ratio, signal, score) for Evolution system
- `L7~L5`: 시장별 키
- `L4~L1`: 글로벌 통합 롤업
### 3. Brain (`src/brain/gemini_client.py`)
구현 포인트:
**GeminiClient** — AI decision engine powered by Google Gemini
- `L7` 쓰기: `volatility_{market}_{stock}`
- `L6` 집계: `total_pnl_KR`, `trade_count_US`
- `ContextScheduler.run_if_due()`:
- 주간/월간/분기/연간/legacy 집계
- 일 1회 `cleanup_expired_contexts()` 호출
- Constructs structured prompts from market data
- Parses JSON responses into `TradeDecision` objects (`action`, `confidence`, `rationale`)
- Forces HOLD when confidence < threshold (default 80)
- Falls back to safe HOLD on any parse/API error
- Handles markdown-wrapped JSON, malformed responses, invalid actions
## Data Model (핵심)
### 4. Risk Manager (`src/core/risk_manager.py`)
### `trades`
**RiskManager** — Safety circuit breaker and order validation
- `market`, `exchange_code`, `selection_context`, `decision_id` 포함
- SELL 시 `get_latest_buy_trade()`를 통해 원본 BUY `decision_id`를 찾아 결과 업데이트
⚠️ **READ-ONLY by policy** (see [`docs/agents.md`](./agents.md))
### `decision_logs`
- **Circuit Breaker**: Halts all trading via `SystemExit` when daily P&L drops below -3.0%
- Threshold may only be made stricter, never relaxed
- Calculated as `(total_eval - purchase_total) / purchase_total * 100`
- **Fat-Finger Protection**: Rejects orders exceeding 30% of available cash
- Must always be enforced, cannot be disabled
- 의사결정 입력/컨텍스트 스냅샷 저장
- `outcome_pnl`, `outcome_accuracy` 업데이트 가능
### 5. Notifications (`src/notifications/telegram_client.py`)
### `playbooks`
**TelegramClient** — Real-time event notifications via Telegram Bot API
- `UNIQUE(date, market)`
- `status`, `token_count`, `scenario_count`, `match_count` 관리
- Sends alerts for trades, circuit breakers, fat-finger rejections, system events
- Non-blocking: failures are logged but never crash trading
- Rate-limited: 1 message/second default to respect Telegram API limits
- Auto-disabled when credentials missing
- Gracefully handles API errors, network timeouts, invalid tokens
## Dashboard
**Notification Types:**
- Trade execution (BUY/SELL with confidence)
- Circuit breaker trips (critical alert)
- Fat-finger protection triggers (order rejection)
- Market open/close events
- System startup/shutdown status
`src/dashboard/app.py`의 FastAPI 앱이 SQLite를 직접 조회합니다.
**Setup:** See [src/notifications/README.md](../src/notifications/README.md) for bot creation and configuration.
엔드포인트:
### 6. Evolution (`src/evolution/optimizer.py`)
- `GET /api/status`
- `GET /api/playbook/{date}?market=KR`
- `GET /api/scorecard/{date}?market=KR`
- `GET /api/performance?market=all`
- `GET /api/context/{layer}`
- `GET /api/decisions?market=KR`
- `GET /api/scenarios/active?market=US`
**StrategyOptimizer** — Self-improvement loop
실행 통합:
- Analyzes high-confidence losing trades from SQLite
- Asks Gemini to generate new `BaseStrategy` subclasses
- Validates generated strategies by running full pytest suite
- Simulates PR creation for human review
- Only activates strategies that pass all tests
- CLI `--dashboard`
- 또는 `DASHBOARD_ENABLED=true`
- `main.py`에서 daemon thread로 uvicorn 실행
## Data Flow
## Known Gaps (2026-02-16)
### Realtime Mode (with Smart Scanner)
- `Issue 4-1` Telegram 확장 명령 미구현 (`/report`, `/scenarios`, `/review`, `/dashboard`)
- `Issue 1-7` 일부 미완:
- `price_change_pct` 정규화 계층 명시 미흡
- 영향: `price_change_pct` 기반 조건은 현재 사실상 매칭되지 않음
- HOLD 시 별도 손절 모니터링 플래그 처리 미완
- US 스캐닝 확장(`fetch_overseas_rankings`) 미구현
```
┌─────────────────────────────────────────────────────────────┐
│ Main Loop (60s cycle per market) │
└─────────────────────────────────────────────────────────────┘
┌──────────────────────────────────┐
│ Market Schedule Check │
│ - Get open markets │
│ - Filter by enabled markets │
│ - Wait if all closed │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Smart Scanner (Python-first) │
│ - Fetch volume rankings (KIS) │
│ - Get 20d price history per stock│
│ - Calculate RSI(14) + vol ratio │
│ - Filter: vol>2x AND RSI extreme │
│ - Return top 3 qualified stocks │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ For Each Qualified Candidate │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Broker: Fetch Market Data │
│ - Domestic: orderbook + balance │
│ - Overseas: price + balance │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Calculate P&L │
│ pnl_pct = (eval - cost) / cost │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Brain: Get Decision (AI) │
│ - Build prompt with market data │
│ - Call Gemini API │
│ - Parse JSON response │
│ - Return TradeDecision │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Risk Manager: Validate Order │
│ - Check circuit breaker │
│ - Check fat-finger limit │
│ - Raise if validation fails │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Broker: Execute Order │
│ - Domestic: send_order() │
│ - Overseas: send_overseas_order() │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Notifications: Send Alert │
│ - Trade execution notification │
│ - Non-blocking (errors logged) │
│ - Rate-limited to 1/sec │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Database: Log Trade │
│ - SQLite (data/trades.db) │
│ - Track: action, confidence, │
│ rationale, market, exchange │
│ - NEW: selection_context (JSON) │
│ - RSI, volume_ratio, signal │
│ - For Evolution optimization │
└───────────────────────────────────┘
```
## Database Schema
**SQLite** (`src/db.py`)
```sql
CREATE TABLE trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
stock_code TEXT NOT NULL,
action TEXT NOT NULL, -- BUY | SELL | HOLD
confidence INTEGER NOT NULL, -- 0-100
rationale TEXT,
quantity INTEGER,
price REAL,
pnl REAL DEFAULT 0.0,
market TEXT DEFAULT 'KR', -- KR | US_NASDAQ | JP | etc.
exchange_code TEXT DEFAULT 'KRX', -- KRX | NASD | NYSE | etc.
selection_context TEXT -- JSON: {rsi, volume_ratio, signal, score}
);
```
**Selection Context** (new in v0.9.0): Stores scanner selection criteria as JSON:
```json
{
"rsi": 28.5,
"volume_ratio": 2.7,
"signal": "oversold",
"score": 85.2
}
```
Enables Evolution system to analyze correlation between selection criteria and trade outcomes.
Auto-migration: Adds `market`, `exchange_code`, and `selection_context` columns if missing for backward compatibility.
## Configuration
**Pydantic Settings** (`src/config.py`)
Loaded from `.env` file:
```bash
# Required
KIS_APP_KEY=your_app_key
KIS_APP_SECRET=your_app_secret
KIS_ACCOUNT_NO=XXXXXXXX-XX
GEMINI_API_KEY=your_gemini_key
# Optional
MODE=paper # paper | live
DB_PATH=data/trades.db
CONFIDENCE_THRESHOLD=80
MAX_LOSS_PCT=3.0
MAX_ORDER_PCT=30.0
ENABLED_MARKETS=KR,US_NASDAQ # Comma-separated market codes
# Trading Mode (API efficiency)
TRADE_MODE=daily # daily | realtime
DAILY_SESSIONS=4 # Sessions per day (daily mode only)
SESSION_INTERVAL_HOURS=6 # Hours between sessions (daily mode only)
# Telegram Notifications (optional)
TELEGRAM_BOT_TOKEN=1234567890:ABCdefGHIjklMNOpqrsTUVwxyz
TELEGRAM_CHAT_ID=123456789
TELEGRAM_ENABLED=true
# Smart Scanner (optional, realtime mode only)
RSI_OVERSOLD_THRESHOLD=30 # 0-50, oversold threshold
RSI_MOMENTUM_THRESHOLD=70 # 50-100, momentum threshold
VOL_MULTIPLIER=2.0 # Minimum volume ratio (2.0 = 200%)
SCANNER_TOP_N=3 # Max qualified candidates per scan
```
Tests use in-memory SQLite (`DB_PATH=":memory:"`) and dummy credentials via `tests/conftest.py`.
## Error Handling
### Connection Errors (Broker API)
- Retry with exponential backoff (2^attempt seconds)
- Max 3 retries per stock
- After exhaustion, skip stock and continue with next
### API Quota Errors (Gemini)
- Return safe HOLD decision with confidence=0
- Log error but don't crash
- Agent continues trading on next cycle
### Circuit Breaker Tripped
- Immediately halt via `SystemExit`
- Log critical message
- Requires manual intervention to restart
### Market Closed
- Wait until next market opens
- Use `get_next_market_open()` to calculate wait time
- Sleep until market open time
### Telegram API Errors
- Log warning but continue trading
- Missing credentials → auto-disable notifications
- Network timeout → skip notification, no retry
- Invalid token → log error, trading unaffected
- Rate limit exceeded → queued via rate limiter
**Guarantee**: Notification failures never interrupt trading operations.

View File

@@ -1,82 +1,156 @@
# Command Reference
## Core Runtime Commands
## Common Command Failures
**Critical: Learn from failures. Never repeat the same failed command without modification.**
### tea CLI (Gitea Command Line Tool)
#### ❌ TTY Error - Interactive Confirmation Fails
```bash
~/bin/tea issues create --repo X --title "Y" --description "Z"
# Error: huh: could not open a new TTY: open /dev/tty: no such device or address
```
**💡 Reason:** tea tries to open `/dev/tty` for interactive confirmation prompts, which is unavailable in non-interactive environments.
**✅ Solution:** Use `YES=""` environment variable to bypass confirmation
```bash
YES="" ~/bin/tea issues create --repo jihoson/The-Ouroboros --title "Title" --description "Body"
YES="" ~/bin/tea issues edit <number> --repo jihoson/The-Ouroboros --description "Updated body"
YES="" ~/bin/tea pulls create --repo jihoson/The-Ouroboros --head feature-branch --base main --title "Title" --description "Body"
```
**📝 Notes:**
- Always set default login: `~/bin/tea login default local`
- Use `--repo jihoson/The-Ouroboros` when outside repo directory
- tea is preferred over direct Gitea API calls for consistency
#### ❌ Wrong Parameter Name
```bash
tea issues create --body "text"
# Error: flag provided but not defined: -body
```
**💡 Reason:** Parameter is `--description`, not `--body`.
**✅ Solution:** Use correct parameter name
```bash
YES="" ~/bin/tea issues create --description "text"
```
### Gitea API (Direct HTTP Calls)
#### ❌ Wrong Hostname
```bash
curl http://gitea.local:3000/api/v1/...
# Error: Could not resolve host: gitea.local
```
**💡 Reason:** Gitea instance runs on `localhost:3000`, not `gitea.local`.
**✅ Solution:** Use correct hostname (but prefer tea CLI)
```bash
curl http://localhost:3000/api/v1/repos/jihoson/The-Ouroboros/issues \
-H "Authorization: token $GITEA_TOKEN" \
-H "Content-Type: application/json" \
-d '{"title":"...", "body":"..."}'
```
**📝 Notes:**
- Prefer `tea` CLI over direct API calls
- Only use curl for operations tea doesn't support
### Git Commands
#### ❌ User Not Configured
```bash
git commit -m "message"
# Error: Author identity unknown
```
**💡 Reason:** Git user.name and user.email not set.
**✅ Solution:** Configure git user
```bash
git config user.name "agentson"
git config user.email "agentson@localhost"
```
#### ❌ Permission Denied on Push
```bash
git push origin branch
# Error: User permission denied for writing
```
**💡 Reason:** Repository access token lacks write permissions or user lacks repo write access.
**✅ Solution:**
1. Verify user has write access to repository (admin grants this)
2. Ensure git credential has correct token with `write:repository` scope
3. Check remote URL uses correct authentication
### Python/Pytest
#### ❌ Module Import Error
```bash
pytest tests/test_foo.py
# ModuleNotFoundError: No module named 'src'
```
**💡 Reason:** Package not installed in development mode.
**✅ Solution:** Install package with dev dependencies
```bash
pip install -e ".[dev]"
```
#### ❌ Async Test Hangs
```python
async def test_something(): # Hangs forever
result = await async_function()
```
**💡 Reason:** Missing pytest-asyncio or wrong configuration.
**✅ Solution:** Already configured in pyproject.toml
```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
```
No decorator needed for async tests.
## Build & Test Commands
```bash
# run (paper)
python -m src.main --mode=paper
# Install all dependencies (production + dev)
pip install -e ".[dev]"
# run with dashboard thread
python -m src.main --mode=paper --dashboard
# Run full test suite with coverage
pytest -v --cov=src --cov-report=term-missing
# tests
pytest -v --cov=src
# Run a single test file
pytest tests/test_risk.py -v
# lint
# Run a single test by name
pytest tests/test_brain.py -k "test_parse_valid_json" -v
# Lint
ruff check src/ tests/
# type-check
# Type check (strict mode, non-blocking in CI)
mypy src/ --strict
# Run the trading agent
python -m src.main --mode=paper
# Docker
docker compose up -d ouroboros # Run agent
docker compose --profile test up test # Run tests in container
```
## Dashboard Runtime Controls
`Issue 4-3` 기준 반영:
- CLI: `--dashboard`
- ENV: `DASHBOARD_ENABLED=true`
- Host/Port:
- `DASHBOARD_HOST` (default `127.0.0.1`)
- `DASHBOARD_PORT` (default `8080`)
## Telegram Commands (현재 구현)
`main.py` 등록 기준:
- `/help`
- `/status`
- `/positions`
- `/stop`
- `/resume`
## Telegram Commands (미구현 상태)
V2 플랜 `Issue 4-1` 항목은 아직 미구현:
- `/report [KR|US]`
- `/scenarios [KR|US]`
- `/review [KR|US]`
- `/dashboard`
## Gitea / tea Workflow Commands
이슈 선등록 후 작업 시작:
## Environment Setup
```bash
YES="" ~/bin/tea issues create \
--repo jihoson/The-Ouroboros \
--title "..." \
--description "..."
# Create .env file from example
cp .env.example .env
# Edit .env with your credentials
# Required: KIS_APP_KEY, KIS_APP_SECRET, KIS_ACCOUNT_NO, GEMINI_API_KEY
# Verify configuration
python -c "from src.config import Settings; print(Settings())"
```
작업은 `worktree` 기준 권장:
```bash
git worktree add ../The-Ouroboros-issue-<N> feature/issue-<N>-<slug>
```
PR 생성:
```bash
YES="" ~/bin/tea pulls create \
--repo jihoson/The-Ouroboros \
--head feature/issue-<N>-<slug> \
--base main \
--title "..." \
--description "..."
```
## Known tea CLI Gotcha
TTY 없는 환경에서는 `tea` 확인 프롬프트가 실패할 수 있습니다.
항상 `YES=""`를 붙여 비대화식으로 실행하세요.

View File

@@ -1,81 +1,243 @@
# Context Tree: Multi-Layered Memory Management
## Summary
The context tree implements **Pillar 2** of The Ouroboros: hierarchical memory management across 7 time horizons, from real-time market data to generational trading wisdom.
컨텍스트 트리는 L7(실시간)부터 L1(레거시)까지 계층화된 메모리 구조입니다.
## Overview
- L7~L5: 시장별 독립 데이터 중심
- L4~L1: 글로벌 포트폴리오 통합 데이터
Instead of a flat memory structure, The Ouroboros maintains a **7-tier context tree** where each layer represents a different time horizon and level of abstraction:
## Layer Policy
```
L1 (Legacy) ← Cumulative wisdom across generations
L2 (Annual) ← Yearly performance metrics
L3 (Quarterly) ← Quarterly strategy adjustments
L4 (Monthly) ← Monthly portfolio rebalancing
L5 (Weekly) ← Weekly stock selection
L6 (Daily) ← Daily trade logs
L7 (Real-time) ← Live market data
```
### L7_REALTIME (시장+종목 스코프)
Data flows **bottom-up**: real-time trades aggregate into daily summaries, which roll up to weekly, then monthly, quarterly, annual, and finally into permanent legacy knowledge.
- 주요 키 패턴:
- `volatility_{market}_{stock_code}`
- `price_{market}_{stock_code}`
- `rsi_{market}_{stock_code}`
- `volume_ratio_{market}_{stock_code}`
## The 7 Layers
`trading_cycle()`에서 실시간으로 기록합니다.
### L7: Real-time
**Retention**: 7 days
**Timeframe format**: `YYYY-MM-DD` (same-day)
**Content**: Current positions, live quotes, orderbook snapshots, tick-by-tick volatility
### L6_DAILY (시장 스코프)
**Use cases**:
- Immediate execution decisions
- Stop-loss triggers
- Real-time P&L tracking
EOD 집계 결과를 시장별 키로 저장합니다.
**Example keys**:
- `current_position_{stock_code}`: Current holdings
- `live_price_{stock_code}`: Latest quote
- `volatility_5m_{stock_code}`: 5-minute rolling volatility
- `trade_count_KR`, `buys_KR`, `sells_KR`, `holds_KR`
- `avg_confidence_US`, `total_pnl_US`, `win_rate_US`
- scorecard 저장 키: `scorecard_KR`, `scorecard_US`
### L6: Daily
**Retention**: 90 days
**Timeframe format**: `YYYY-MM-DD`
**Content**: Daily trade logs, end-of-day P&L, market summaries, decision accuracy
### L5_WEEKLY
**Use cases**:
- Daily performance review
- Identify patterns in recent trading
- Backtest strategy adjustments
L6 일일 데이터에서 시장별 주간 합계를 생성합니다.
**Example keys**:
- `total_pnl`: Daily profit/loss
- `trade_count`: Number of trades
- `win_rate`: Percentage of profitable trades
- `avg_confidence`: Average Gemini confidence
- `weekly_pnl_KR`, `weekly_pnl_US`
- `avg_confidence_KR`, `avg_confidence_US`
### L5: Weekly
**Retention**: 1 year
**Timeframe format**: `YYYY-Www` (ISO week, e.g., `2026-W06`)
**Content**: Weekly stock selection, sector rotation, volatility regime classification
### L4_MONTHLY 이상
**Use cases**:
- Weekly strategy adjustment
- Sector momentum tracking
- Identify hot/cold markets
글로벌 통합 롤업입니다.
**Example keys**:
- `weekly_pnl`: Week's total P&L
- `top_performers`: Best-performing stocks
- `sector_focus`: Dominant sectors
- `avg_confidence`: Weekly average confidence
- L5 → L4: `monthly_pnl`
- L4 → L3: `quarterly_pnl`
- L3 → L2: `annual_pnl`
- L2 → L1: `total_pnl`, `years_traded`, `avg_annual_pnl`
### L4: Monthly
**Retention**: 2 years
**Timeframe format**: `YYYY-MM`
**Content**: Monthly portfolio rebalancing, risk exposure analysis, drawdown recovery
## Aggregation Flow
**Use cases**:
- Monthly performance reporting
- Risk exposure adjustment
- Correlation analysis
- EOD: `ContextAggregator.aggregate_daily_from_trades(date, market)`
- 주기 롤업: `ContextScheduler.run_if_due()`
**Example keys**:
- `monthly_pnl`: Month's total P&L
- `sharpe_ratio`: Risk-adjusted return
- `max_drawdown`: Largest peak-to-trough decline
- `rebalancing_notes`: Manual insights
`ContextScheduler`는 다음을 처리합니다.
### L3: Quarterly
**Retention**: 3 years
**Timeframe format**: `YYYY-Qn` (e.g., `2026-Q1`)
**Content**: Quarterly strategy pivots, market phase detection (bull/bear/sideways), macro regime changes
- weekly/monthly/quarterly/annual/legacy 집계
- 일 1회 `ContextStore.cleanup_expired_contexts()` 실행
- 동일 날짜 중복 실행 방지(`_last_run`)
**Use cases**:
- Strategic pivots (e.g., growth → value)
- Macro regime classification
- Long-term pattern recognition
**Example keys**:
- `quarterly_pnl`: Quarter's total P&L
- `market_phase`: Bull/Bear/Sideways
- `strategy_adjustments`: Major changes made
- `lessons_learned`: Key insights
### L2: Annual
**Retention**: 10 years
**Timeframe format**: `YYYY`
**Content**: Yearly returns, Sharpe ratio, max drawdown, win rate, strategy effectiveness
**Use cases**:
- Annual performance review
- Multi-year trend analysis
- Strategy benchmarking
**Example keys**:
- `annual_pnl`: Year's total P&L
- `sharpe_ratio`: Annual risk-adjusted return
- `win_rate`: Yearly win percentage
- `best_strategy`: Most successful strategy
- `worst_mistake`: Biggest lesson learned
### L1: Legacy
**Retention**: Forever
**Timeframe format**: `LEGACY` (single timeframe)
**Content**: Cumulative trading history, core principles, generational wisdom
**Use cases**:
- Long-term philosophy
- Foundational rules
- Lessons that transcend market cycles
**Example keys**:
- `total_pnl`: All-time profit/loss
- `years_traded`: Trading longevity
- `avg_annual_pnl`: Long-term average return
- `core_principles`: Immutable trading rules
- `greatest_trades`: Hall of fame
- `never_again`: Permanent warnings
## Usage
### Setting Context
```python
from datetime import UTC, datetime
from src.context import ContextLayer, ContextStore
from src.db import init_db
from src.context.aggregator import ContextAggregator
from src.context.scheduler import ContextScheduler
conn = init_db("data/ouroboros.db")
store = ContextStore(conn)
aggregator = ContextAggregator(conn)
scheduler = ContextScheduler(aggregator=aggregator, store=context_store)
# Store daily P&L
store.set_context(
layer=ContextLayer.L6_DAILY,
timeframe="2026-02-04",
key="total_pnl",
value=1234.56
)
# EOD market-scoped daily aggregation
aggregator.aggregate_daily_from_trades(date="2026-02-16", market="KR")
# Store weekly insight
store.set_context(
layer=ContextLayer.L5_WEEKLY,
timeframe="2026-W06",
key="top_performers",
value=["005930", "000660", "035720"] # JSON-serializable
)
# Run scheduled rollups when due
scheduler.run_if_due(now=datetime.now(UTC))
# Store legacy wisdom
store.set_context(
layer=ContextLayer.L1_LEGACY,
timeframe="LEGACY",
key="core_principles",
value=[
"Cut losses fast",
"Let winners run",
"Never average down on losing positions"
]
)
```
## Retention
### Retrieving Context
`src/context/layer.py` 기준:
```python
# Get a specific value
pnl = store.get_context(ContextLayer.L6_DAILY, "2026-02-04", "total_pnl")
# Returns: 1234.56
# Get all keys for a timeframe
daily_summary = store.get_all_contexts(ContextLayer.L6_DAILY, "2026-02-04")
# Returns: {"total_pnl": 1234.56, "trade_count": 10, "win_rate": 60.0, ...}
# Get all data for a layer (any timeframe)
all_daily = store.get_all_contexts(ContextLayer.L6_DAILY)
# Returns: {"total_pnl": 1234.56, "trade_count": 10, ...} (latest timeframes first)
# Get the latest timeframe
latest = store.get_latest_timeframe(ContextLayer.L6_DAILY)
# Returns: "2026-02-04"
```
### Automatic Aggregation
The `ContextAggregator` rolls up data from lower to higher layers:
```python
from src.context.aggregator import ContextAggregator
aggregator = ContextAggregator(conn)
# Aggregate daily metrics from trades
aggregator.aggregate_daily_from_trades("2026-02-04")
# Roll up weekly from daily
aggregator.aggregate_weekly_from_daily("2026-W06")
# Roll up all layers at once (bottom-up)
aggregator.run_all_aggregations()
```
**Aggregation schedule** (recommended):
- **L7 → L6**: Every midnight (daily rollup)
- **L6 → L5**: Every Sunday (weekly rollup)
- **L5 → L4**: First day of each month (monthly rollup)
- **L4 → L3**: First day of quarter (quarterly rollup)
- **L3 → L2**: January 1st (annual rollup)
- **L2 → L1**: On demand (major milestones)
### Context Cleanup
Expired contexts are automatically deleted based on retention policies:
```python
# Manual cleanup
deleted = store.cleanup_expired_contexts()
# Returns: {ContextLayer.L7_REALTIME: 42, ContextLayer.L6_DAILY: 15, ...}
```
**Retention policies** (defined in `src/context/layer.py`):
- L1: Forever
- L2: 10 years
- L3: 3 years
@@ -84,8 +246,93 @@ scheduler.run_if_due(now=datetime.now(UTC))
- L6: 90 days
- L7: 7 days
## Current Notes (2026-02-16)
## Integration with Gemini Brain
- L7 쓰기와 L6 시장별 집계는 `main.py`에 연결됨
- scheduler 기반 cleanup/rollup도 연결됨
- cross-market scorecard 조회는 `PreMarketPlanner`에서 사용 중
The context tree provides hierarchical memory for decision-making:
```python
from src.brain.gemini_client import GeminiClient
# Build prompt with multi-layer context
def build_enhanced_prompt(stock_code: str, store: ContextStore) -> str:
# L7: Real-time data
current_price = store.get_context(ContextLayer.L7_REALTIME, "2026-02-04", f"live_price_{stock_code}")
# L6: Recent daily performance
yesterday_pnl = store.get_context(ContextLayer.L6_DAILY, "2026-02-03", "total_pnl")
# L5: Weekly trend
weekly_data = store.get_all_contexts(ContextLayer.L5_WEEKLY, "2026-W06")
# L1: Core principles
principles = store.get_context(ContextLayer.L1_LEGACY, "LEGACY", "core_principles")
return f"""
Analyze {stock_code} for trading decision.
Current price: {current_price}
Yesterday's P&L: {yesterday_pnl}
This week: {weekly_data}
Core principles:
{chr(10).join(f'- {p}' for p in principles)}
Decision (BUY/SELL/HOLD):
"""
```
## Database Schema
```sql
-- Context storage
CREATE TABLE contexts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
layer TEXT NOT NULL, -- L1_LEGACY, L2_ANNUAL, ..., L7_REALTIME
timeframe TEXT NOT NULL, -- "LEGACY", "2026", "2026-Q1", "2026-02", "2026-W06", "2026-02-04"
key TEXT NOT NULL, -- "total_pnl", "win_rate", "core_principles", etc.
value TEXT NOT NULL, -- JSON-serialized value
created_at TEXT NOT NULL, -- ISO 8601 timestamp
updated_at TEXT NOT NULL, -- ISO 8601 timestamp
UNIQUE(layer, timeframe, key)
);
-- Layer metadata
CREATE TABLE context_metadata (
layer TEXT PRIMARY KEY,
description TEXT NOT NULL,
retention_days INTEGER, -- NULL = keep forever
aggregation_source TEXT -- Parent layer for rollup
);
-- Indices for fast queries
CREATE INDEX idx_contexts_layer ON contexts(layer);
CREATE INDEX idx_contexts_timeframe ON contexts(timeframe);
CREATE INDEX idx_contexts_updated ON contexts(updated_at);
```
## Best Practices
1. **Write to leaf layers only** — Never manually write to L1-L5; let aggregation populate them
2. **Aggregate regularly** — Schedule aggregation jobs to keep higher layers fresh
3. **Query specific timeframes** — Use `get_context(layer, timeframe, key)` for precise retrieval
4. **Clean up periodically** — Run `cleanup_expired_contexts()` weekly to free space
5. **Preserve L1 forever** — Legacy wisdom should never expire
6. **Use JSON-serializable values** — Store dicts, lists, strings, numbers (not custom objects)
## Testing
See `tests/test_context.py` for comprehensive test coverage (18 tests, 100% coverage on context modules).
```bash
pytest tests/test_context.py -v
```
## References
- **Implementation**: `src/context/`
- `layer.py`: Layer definitions and metadata
- `store.py`: CRUD operations
- `aggregator.py`: Bottom-up aggregation logic
- **Database**: `src/db.py` (table initialization)
- **Tests**: `tests/test_context.py`
- **Related**: Pillar 2 (Multi-layered Context Management)

View File

@@ -86,48 +86,3 @@
- Plan Consistency (필수), Safety & Constraints, Quality, Workflow 4개 카테고리
**이슈/PR:** #114
---
## 2026-02-16
### V2 진행상태 재정렬 + 문서 동기화
**배경:**
- V2 이슈 다수가 병렬로 진행되며 구현/문서 간 상태 불일치가 발생
- 사용자 요청으로 "현재 코드 기준 사실"에 맞춘 전면 문서 갱신 필요
**확인된 상태(코드 기준):**
- 완료: 18/20
- 부분 완료: `1-7`
- 미완료: `4-1`
**핵심 반영 사항:**
1. 대시보드 실행 통합(`Issue 4-3`) 반영
- `--dashboard` 플래그
- `DASHBOARD_ENABLED`, `DASHBOARD_HOST`, `DASHBOARD_PORT`
2. 컨텍스트 스케줄러 및 시장 스코프 키 정책 반영
3. scorecard/review/evolution 연결 상태 반영
4. 미완료 갭 명시
- Telegram 확장 명령어(`4-1`) 미구현
- `1-7` 잔여 항목(키 정규화/HOLD 손절 모니터링/US 코드 정합성)
**프로세스 요구사항 강화:**
- 모든 문서 작업도 Gitea 이슈 선등록 후 진행
- 병렬 작업 후 상태 정합성 점검 결과를 `requirements-log`에 기록
**이슈/브랜치:**
- Issue: #131
- Branch(worktree): `feature/issue-131-docs-v2-status-sync`
### 문서 보강 2차 (리뷰 반영)
**리뷰 피드백 반영:**
- README에 Quick Start(환경설정/설치/검증) 복원
- architecture에 RiskManager/에러 처리/설정 레퍼런스 복원
- testing 문서에 기존 핵심 테스트 파일 설명 복원
- 시장 코드 불일치(`KR,US` vs `US_NASDAQ/US_NYSE`)를 "런타임 영향"으로 격상 명시
- `price_change_pct` 누락 영향(조건 dead path)을 명시
**의도:**
- V2 상태 반영과 기존 온보딩/운영 문서 가치를 동시에 유지

View File

@@ -1,63 +1,213 @@
# Testing Guidelines
## Current Test Baseline (2026-02-16)
## Test Structure
수집 기준:
**54 tests** across four files. `asyncio_mode = "auto"` in pyproject.toml — async tests need no special decorator.
```bash
pytest --collect-only -q
# 538 tests collected
The `settings` fixture in `conftest.py` provides safe defaults with test credentials and in-memory DB.
### Test Files
#### `tests/test_risk.py` (11 tests)
- Circuit breaker boundaries
- Fat-finger edge cases
- P&L calculation edge cases
- Order validation logic
**Example:**
```python
def test_circuit_breaker_exact_threshold(risk_manager):
"""Circuit breaker should trip at exactly -3.0%."""
with pytest.raises(CircuitBreakerTripped):
risk_manager.validate_order(
current_pnl_pct=-3.0,
order_amount=1000,
total_cash=10000
)
```
V2 핵심 영역 테스트가 포함되어 있습니다.
#### `tests/test_broker.py` (6 tests)
- OAuth token lifecycle
- Rate limiting enforcement
- Hash key generation
- Network error handling
- SSL context configuration
- `tests/test_strategy_models.py`
- `tests/test_pre_market_planner.py`
- `tests/test_scenario_engine.py`
- `tests/test_playbook_store.py`
- `tests/test_context_scheduler.py`
- `tests/test_daily_review.py`
- `tests/test_scorecard.py`
- `tests/test_dashboard.py`
- `tests/test_main.py`
기존 핵심 영역 테스트도 유지됩니다.
- `tests/test_risk.py`: circuit breaker/fat-finger 안전장치 검증
- `tests/test_broker.py`: KIS API 호출/에러 처리/인증 흐름 검증
- `tests/test_brain.py`: Gemini 응답 파싱/신뢰도 게이트 검증
- `tests/test_market_schedule.py`: 시장 오픈/클로즈/타임존 로직 검증
## Required Checks
```bash
pytest -v --cov=src
ruff check src/ tests/
mypy src/ --strict
**Example:**
```python
async def test_rate_limiter(broker):
"""Rate limiter should delay requests to stay under 10 RPS."""
start = time.monotonic()
for _ in range(15): # 15 requests
await broker._rate_limiter.acquire()
elapsed = time.monotonic() - start
assert elapsed >= 1.0 # Should take at least 1 second
```
## FastAPI Note
#### `tests/test_brain.py` (18 tests)
- Valid JSON parsing
- Markdown-wrapped JSON handling
- Malformed JSON fallback
- Missing fields handling
- Invalid action validation
- Confidence threshold enforcement
- Empty response handling
- Prompt construction for different markets
대시보드 테스트(`tests/test_dashboard.py`)는 `fastapi`가 환경에 없으면 skip될 수 있습니다.
의도된 동작이며 CI/개발환경에서 의존성 설치 여부를 확인하세요.
## Targeted Smoke Commands
```bash
# dashboard integration
pytest -q tests/test_main.py -k "dashboard"
# planner/scenario/review paths
pytest -q tests/test_pre_market_planner.py tests/test_scenario_engine.py tests/test_daily_review.py
# context rollup/scheduler
pytest -q tests/test_context.py tests/test_context_scheduler.py
**Example:**
```python
async def test_confidence_below_threshold_forces_hold(brain):
"""Decisions below confidence threshold should force HOLD."""
decision = brain.parse_response('{"action":"BUY","confidence":70,"rationale":"test"}')
assert decision.action == "HOLD"
assert decision.confidence == 70
```
## Review Checklist (테스트 관점)
#### `tests/test_market_schedule.py` (19 tests)
- Market open/close logic
- Timezone handling (UTC, Asia/Seoul, America/New_York, etc.)
- DST (Daylight Saving Time) transitions
- Weekend handling
- Lunch break logic
- Multiple market filtering
- Next market open calculation
- 플랜 항목별 테스트 존재 여부 확인
- 시장 스코프 키(`*_KR`, `*_US`) 검증 확인
- EOD 흐름(`aggregate_daily_from_trades`, `scorecard_{market}` 저장) 검증
- decision outcome 연결(`decision_id`) 검증
- 대시보드 API market filter 검증
**Example:**
```python
def test_is_market_open_during_trading_hours():
"""Market should be open during regular trading hours."""
# KRX: 9:00-15:30 KST, no lunch break
market = MARKETS["KR"]
trading_time = datetime(2026, 2, 3, 10, 0, tzinfo=ZoneInfo("Asia/Seoul")) # Monday 10:00
assert is_market_open(market, trading_time) is True
```
## Coverage Requirements
**Minimum coverage: 80%**
Check coverage:
```bash
pytest -v --cov=src --cov-report=term-missing
```
Expected output:
```
Name Stmts Miss Cover Missing
-----------------------------------------------------------
src/brain/gemini_client.py 85 5 94% 165-169
src/broker/kis_api.py 120 12 90% ...
src/core/risk_manager.py 35 2 94% ...
src/db.py 25 1 96% ...
src/main.py 150 80 47% (excluded from CI)
src/markets/schedule.py 95 3 97% ...
-----------------------------------------------------------
TOTAL 510 103 80%
```
**Note:** `main.py` has lower coverage as it contains the main loop which is tested via integration/manual testing.
## Test Configuration
### `pyproject.toml`
```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
```
### `tests/conftest.py`
```python
@pytest.fixture
def settings() -> Settings:
"""Provide test settings with safe defaults."""
return Settings(
KIS_APP_KEY="test_key",
KIS_APP_SECRET="test_secret",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test_gemini_key",
MODE="paper",
DB_PATH=":memory:", # In-memory SQLite
CONFIDENCE_THRESHOLD=80,
ENABLED_MARKETS="KR",
)
```
## Writing New Tests
### Naming Convention
- Test files: `test_<module>.py`
- Test functions: `test_<feature>_<scenario>()`
- Use descriptive names that explain what is being tested
### Good Test Example
```python
async def test_send_order_with_market_price(broker, settings):
"""Market orders should use price=0 and ORD_DVSN='01'."""
# Arrange
stock_code = "005930"
order_type = "BUY"
quantity = 10
# Act
with patch.object(broker._session, 'post') as mock_post:
mock_post.return_value.__aenter__.return_value.status = 200
mock_post.return_value.__aenter__.return_value.json = AsyncMock(
return_value={"rt_cd": "0", "msg1": "OK"}
)
await broker.send_order(stock_code, order_type, quantity, price=0)
# Assert
call_args = mock_post.call_args
body = call_args.kwargs['json']
assert body['ORD_DVSN'] == '01' # Market order
assert body['ORD_UNPR'] == '0' # Price 0
```
### Test Checklist
- [ ] Test passes in isolation (`pytest tests/test_foo.py::test_bar -v`)
- [ ] Test has clear docstring explaining what it tests
- [ ] Arrange-Act-Assert structure
- [ ] Uses appropriate fixtures from conftest.py
- [ ] Mocks external dependencies (API calls, network)
- [ ] Tests edge cases and error conditions
- [ ] Doesn't rely on test execution order
## Running Tests
```bash
# All tests
pytest -v
# Specific file
pytest tests/test_risk.py -v
# Specific test
pytest tests/test_brain.py::test_parse_valid_json -v
# With coverage
pytest -v --cov=src --cov-report=term-missing
# Stop on first failure
pytest -x
# Verbose output with print statements
pytest -v -s
```
## CI/CD Integration
Tests run automatically on:
- Every commit to feature branches
- Every PR to main
- Scheduled daily runs
**Blocking conditions:**
- Test failures → PR blocked
- Coverage < 80% → PR blocked (warning only for main.py)
**Non-blocking:**
- `mypy --strict` errors (type hints encouraged but not enforced)
- `ruff check` warnings (must be acknowledged)

View File

@@ -8,9 +8,8 @@
2. **Create Feature Branch** — Branch from `main` using format `feature/issue-{N}-{short-description}`
- After creating the branch, run `git pull origin main` and rebase to ensure the branch is up to date
3. **Implement Changes** — Write code, tests, and documentation on the feature branch
4. **Sync Status Docs** — Before PR, update `README.md` and relevant `docs/*.md` so implementation status/gaps are explicit
5. **Create Pull Request** — Submit PR to `main` branch referencing the issue number
6. **Review & Merge** — After approval, merge via PR (squash or merge commit)
4. **Create Pull Request** — Submit PR to `main` branch referencing the issue number
5. **Review & Merge** — After approval, merge via PR (squash or merge commit)
**Never commit directly to `main`.** This policy applies to all changes, no exceptions.

View File

@@ -101,4 +101,7 @@ class Settings(BaseSettings):
@property
def enabled_market_list(self) -> list[str]:
"""Parse ENABLED_MARKETS into list of market codes."""
return [m.strip() for m in self.ENABLED_MARKETS.split(",") if m.strip()]
from src.markets.schedule import expand_market_codes
raw = [m.strip() for m in self.ENABLED_MARKETS.split(",") if m.strip()]
return expand_market_codes(raw)

View File

@@ -26,7 +26,19 @@ def create_dashboard_app(db_path: str) -> FastAPI:
def get_status() -> dict[str, Any]:
today = datetime.now(UTC).date().isoformat()
with _connect(db_path) as conn:
markets = ["KR", "US"]
market_rows = conn.execute(
"""
SELECT DISTINCT market FROM (
SELECT market FROM trades WHERE DATE(timestamp) = ?
UNION
SELECT market FROM decision_logs WHERE DATE(timestamp) = ?
UNION
SELECT market FROM playbooks WHERE date = ?
) ORDER BY market
""",
(today, today, today),
).fetchall()
markets = [row[0] for row in market_rows] if market_rows else []
market_status: dict[str, Any] = {}
total_trades = 0
total_pnl = 0.0

View File

@@ -214,3 +214,24 @@ def get_latest_buy_trade(
if not row:
return None
return {"decision_id": row[0], "price": row[1], "quantity": row[2]}
def get_open_position(
conn: sqlite3.Connection, stock_code: str, market: str
) -> dict[str, Any] | None:
"""Return open position if latest trade is BUY, else None."""
cursor = conn.execute(
"""
SELECT action, decision_id, price, quantity
FROM trades
WHERE stock_code = ?
AND market = ?
ORDER BY timestamp DESC
LIMIT 1
""",
(stock_code, market),
)
row = cursor.fetchone()
if not row or row[0] != "BUY":
return None
return {"decision_id": row[1], "price": row[2], "quantity": row[3]}

View File

@@ -8,6 +8,7 @@ from __future__ import annotations
import argparse
import asyncio
import json
import logging
import signal
import threading
@@ -28,7 +29,7 @@ from src.context.store import ContextStore
from src.core.criticality import CriticalityAssessor
from src.core.priority_queue import PriorityTaskQueue
from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected, RiskManager
from src.db import get_latest_buy_trade, init_db, log_trade
from src.db import get_latest_buy_trade, get_open_position, init_db, log_trade
from src.evolution.daily_review import DailyReviewer
from src.evolution.optimizer import EvolutionOptimizer
from src.logging.decision_logger import DecisionLogger
@@ -114,6 +115,7 @@ async def trading_cycle(
current_price = safe_float(orderbook.get("output1", {}).get("stck_prpr", "0"))
foreigner_net = safe_float(orderbook.get("output1", {}).get("frgn_ntby_qty", "0"))
price_change_pct = safe_float(orderbook.get("output1", {}).get("prdy_ctrt", "0"))
else:
# Overseas market
price_data = await overseas_broker.get_overseas_price(
@@ -136,6 +138,7 @@ async def trading_cycle(
current_price = safe_float(price_data.get("output", {}).get("last", "0"))
foreigner_net = 0.0 # Not available for overseas
price_change_pct = safe_float(price_data.get("output", {}).get("rate", "0"))
# Calculate daily P&L %
pnl_pct = (
@@ -149,6 +152,7 @@ async def trading_cycle(
"market_name": market.name,
"current_price": current_price,
"foreigner_net": foreigner_net,
"price_change_pct": price_change_pct,
}
# Enrich market_data with scanner metrics for scenario engine
@@ -240,6 +244,34 @@ async def trading_cycle(
confidence=match.confidence,
rationale=match.rationale,
)
stock_playbook = playbook.get_stock_playbook(stock_code)
if decision.action == "HOLD":
open_position = get_open_position(db_conn, stock_code, market.code)
if open_position:
entry_price = safe_float(open_position.get("price"), 0.0)
if entry_price > 0:
loss_pct = (current_price - entry_price) / entry_price * 100
stop_loss_threshold = -2.0
if stock_playbook and stock_playbook.scenarios:
stop_loss_threshold = stock_playbook.scenarios[0].stop_loss_pct
if loss_pct <= stop_loss_threshold:
decision = TradeDecision(
action="SELL",
confidence=95,
rationale=(
f"Stop-loss triggered ({loss_pct:.2f}% <= "
f"{stop_loss_threshold:.2f}%)"
),
)
logger.info(
"Stop-loss override for %s (%s): %.2f%% <= %.2f%%",
stock_code,
market.name,
loss_pct,
stop_loss_threshold,
)
logger.info(
"Decision for %s (%s): %s (confidence=%d)",
stock_code,
@@ -278,6 +310,7 @@ async def trading_cycle(
input_data = {
"current_price": current_price,
"foreigner_net": foreigner_net,
"price_change_pct": price_change_pct,
"total_eval": total_eval,
"total_cash": total_cash,
"pnl_pct": pnl_pct,
@@ -507,6 +540,9 @@ async def run_daily_session(
foreigner_net = safe_float(
orderbook.get("output1", {}).get("frgn_ntby_qty", "0")
)
price_change_pct = safe_float(
orderbook.get("output1", {}).get("prdy_ctrt", "0")
)
else:
price_data = await overseas_broker.get_overseas_price(
market.exchange_code, stock_code
@@ -515,12 +551,16 @@ async def run_daily_session(
price_data.get("output", {}).get("last", "0")
)
foreigner_net = 0.0
price_change_pct = safe_float(
price_data.get("output", {}).get("rate", "0")
)
stock_data: dict[str, Any] = {
"stock_code": stock_code,
"market_name": market.name,
"current_price": current_price,
"foreigner_net": foreigner_net,
"price_change_pct": price_change_pct,
}
# Enrich with scanner metrics
cand = candidate_map.get(stock_code)
@@ -820,7 +860,7 @@ async def _run_evolution_loop(
market_date: str,
) -> None:
"""Run evolution loop once at US close (end of trading day)."""
if market_code != "US":
if not market_code.startswith("US"):
return
try:
@@ -936,6 +976,10 @@ async def run(settings: Settings) -> None:
"/help - Show available commands\n"
"/status - Trading status (mode, markets, P&L)\n"
"/positions - Current holdings\n"
"/report - Daily summary report\n"
"/scenarios - Today's playbook scenarios\n"
"/review - Recent scorecards\n"
"/dashboard - Dashboard URL/status\n"
"/stop - Pause trading\n"
"/resume - Resume trading"
)
@@ -1055,11 +1099,164 @@ async def run(settings: Settings) -> None:
"<b>⚠️ Error</b>\n\nFailed to retrieve positions."
)
async def handle_report() -> None:
"""Handle /report command - show daily summary metrics."""
try:
today = datetime.now(UTC).date().isoformat()
trade_row = db_conn.execute(
"""
SELECT COUNT(*) AS trade_count,
COALESCE(SUM(pnl), 0.0) AS total_pnl,
SUM(CASE WHEN pnl > 0 THEN 1 ELSE 0 END) AS wins
FROM trades
WHERE DATE(timestamp) = ?
""",
(today,),
).fetchone()
decision_row = db_conn.execute(
"""
SELECT COUNT(*) AS decision_count,
COALESCE(AVG(confidence), 0.0) AS avg_confidence
FROM decision_logs
WHERE DATE(timestamp) = ?
""",
(today,),
).fetchone()
trade_count = int(trade_row[0] if trade_row else 0)
total_pnl = float(trade_row[1] if trade_row else 0.0)
wins = int(trade_row[2] if trade_row and trade_row[2] is not None else 0)
decision_count = int(decision_row[0] if decision_row else 0)
avg_confidence = float(decision_row[1] if decision_row else 0.0)
win_rate = (wins / trade_count * 100.0) if trade_count > 0 else 0.0
await telegram.send_message(
"<b>📈 Daily Report</b>\n\n"
f"<b>Date:</b> {today}\n"
f"<b>Trades:</b> {trade_count}\n"
f"<b>Total P&L:</b> {total_pnl:+.2f}\n"
f"<b>Win Rate:</b> {win_rate:.2f}%\n"
f"<b>Decisions:</b> {decision_count}\n"
f"<b>Avg Confidence:</b> {avg_confidence:.2f}"
)
except Exception as exc:
logger.error("Error in /report handler: %s", exc)
await telegram.send_message(
"<b>⚠️ Error</b>\n\nFailed to generate daily report."
)
async def handle_scenarios() -> None:
"""Handle /scenarios command - show today's playbook scenarios."""
try:
today = datetime.now(UTC).date().isoformat()
rows = db_conn.execute(
"""
SELECT market, playbook_json
FROM playbooks
WHERE date = ?
ORDER BY market
""",
(today,),
).fetchall()
if not rows:
await telegram.send_message(
"<b>🧠 Today's Scenarios</b>\n\nNo playbooks found for today."
)
return
lines = ["<b>🧠 Today's Scenarios</b>", ""]
for market, playbook_json in rows:
lines.append(f"<b>{market}</b>")
playbook_data = {}
try:
playbook_data = json.loads(playbook_json)
except Exception:
playbook_data = {}
stock_playbooks = playbook_data.get("stock_playbooks", [])
if not stock_playbooks:
lines.append("- No scenarios")
lines.append("")
continue
for stock_pb in stock_playbooks:
stock_code = stock_pb.get("stock_code", "N/A")
scenarios = stock_pb.get("scenarios", [])
for sc in scenarios:
action = sc.get("action", "HOLD")
confidence = sc.get("confidence", 0)
lines.append(f"- {stock_code}: {action} ({confidence})")
lines.append("")
await telegram.send_message("\n".join(lines).strip())
except Exception as exc:
logger.error("Error in /scenarios handler: %s", exc)
await telegram.send_message(
"<b>⚠️ Error</b>\n\nFailed to retrieve scenarios."
)
async def handle_review() -> None:
"""Handle /review command - show recent scorecards."""
try:
rows = db_conn.execute(
"""
SELECT timeframe, key, value
FROM contexts
WHERE layer = 'L6_DAILY' AND key LIKE 'scorecard_%'
ORDER BY updated_at DESC
LIMIT 5
"""
).fetchall()
if not rows:
await telegram.send_message(
"<b>📝 Recent Reviews</b>\n\nNo scorecards available."
)
return
lines = ["<b>📝 Recent Reviews</b>", ""]
for timeframe, key, value in rows:
scorecard = json.loads(value)
market = key.replace("scorecard_", "")
total_pnl = float(scorecard.get("total_pnl", 0.0))
win_rate = float(scorecard.get("win_rate", 0.0))
decisions = int(scorecard.get("total_decisions", 0))
lines.append(
f"- {timeframe} {market}: P&L {total_pnl:+.2f}, "
f"Win {win_rate:.2f}%, Decisions {decisions}"
)
await telegram.send_message("\n".join(lines))
except Exception as exc:
logger.error("Error in /review handler: %s", exc)
await telegram.send_message(
"<b>⚠️ Error</b>\n\nFailed to retrieve reviews."
)
async def handle_dashboard() -> None:
"""Handle /dashboard command - show dashboard URL if enabled."""
if not settings.DASHBOARD_ENABLED:
await telegram.send_message(
"<b>🖥️ Dashboard</b>\n\nDashboard is not enabled."
)
return
url = f"http://{settings.DASHBOARD_HOST}:{settings.DASHBOARD_PORT}"
await telegram.send_message(
"<b>🖥️ Dashboard</b>\n\n"
f"<b>URL:</b> {url}"
)
command_handler.register_command("help", handle_help)
command_handler.register_command("stop", handle_stop)
command_handler.register_command("resume", handle_resume)
command_handler.register_command("status", handle_status)
command_handler.register_command("positions", handle_positions)
command_handler.register_command("report", handle_report)
command_handler.register_command("scenarios", handle_scenarios)
command_handler.register_command("review", handle_review)
command_handler.register_command("dashboard", handle_dashboard)
# Initialize volatility hunter
volatility_analyzer = VolatilityAnalyzer(min_volume_surge=2.0, min_price_change=1.0)

View File

@@ -123,6 +123,23 @@ MARKETS: dict[str, MarketInfo] = {
),
}
MARKET_SHORTHAND: dict[str, list[str]] = {
"US": ["US_NASDAQ", "US_NYSE", "US_AMEX"],
"CN": ["CN_SHA", "CN_SZA"],
"VN": ["VN_HAN", "VN_HCM"],
}
def expand_market_codes(codes: list[str]) -> list[str]:
"""Expand shorthand market codes into concrete exchange market codes."""
expanded: list[str] = []
for code in codes:
if code in MARKET_SHORTHAND:
expanded.extend(MARKET_SHORTHAND[code])
else:
expanded.append(code)
return expanded
def is_market_open(market: MarketInfo, now: datetime | None = None) -> bool:
"""

View File

@@ -1,21 +1,25 @@
"""Tests for FastAPI dashboard endpoints."""
"""Tests for dashboard endpoint handlers."""
from __future__ import annotations
import json
import sqlite3
from collections.abc import Callable
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import pytest
pytest.importorskip("fastapi")
from fastapi.testclient import TestClient
from fastapi import HTTPException
from fastapi.responses import FileResponse
from src.dashboard.app import create_dashboard_app
from src.db import init_db
def _seed_db(conn: sqlite3.Connection) -> None:
today = datetime.now(UTC).date().isoformat()
conn.execute(
"""
INSERT INTO playbooks (
@@ -34,6 +38,24 @@ def _seed_db(conn: sqlite3.Connection) -> None:
1,
),
)
conn.execute(
"""
INSERT INTO playbooks (
date, market, status, playbook_json, generated_at,
token_count, scenario_count, match_count
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
today,
"US_NASDAQ",
"ready",
json.dumps({"market": "US_NASDAQ", "stock_playbooks": []}),
f"{today}T08:30:00+00:00",
100,
1,
0,
),
)
conn.execute(
"""
INSERT INTO contexts (layer, timeframe, key, value, created_at, updated_at)
@@ -71,7 +93,7 @@ def _seed_db(conn: sqlite3.Connection) -> None:
""",
(
"d-kr-1",
"2026-02-14T09:10:00+00:00",
f"{today}T09:10:00+00:00",
"005930",
"KR",
"KRX",
@@ -91,9 +113,9 @@ def _seed_db(conn: sqlite3.Connection) -> None:
""",
(
"d-us-1",
"2026-02-14T21:10:00+00:00",
f"{today}T21:10:00+00:00",
"AAPL",
"US",
"US_NASDAQ",
"NASDAQ",
"SELL",
80,
@@ -110,7 +132,7 @@ def _seed_db(conn: sqlite3.Connection) -> None:
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"2026-02-14T09:11:00+00:00",
f"{today}T09:11:00+00:00",
"005930",
"BUY",
85,
@@ -132,7 +154,7 @@ def _seed_db(conn: sqlite3.Connection) -> None:
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
"2026-02-14T21:11:00+00:00",
f"{today}T21:11:00+00:00",
"AAPL",
"SELL",
80,
@@ -140,7 +162,7 @@ def _seed_db(conn: sqlite3.Connection) -> None:
1,
200,
-1.0,
"US",
"US_NASDAQ",
"NASDAQ",
None,
"d-us-1",
@@ -149,122 +171,128 @@ def _seed_db(conn: sqlite3.Connection) -> None:
conn.commit()
def _client(tmp_path: Path) -> TestClient:
def _app(tmp_path: Path) -> Any:
db_path = tmp_path / "dashboard_test.db"
conn = init_db(str(db_path))
_seed_db(conn)
conn.close()
app = create_dashboard_app(str(db_path))
return TestClient(app)
return create_dashboard_app(str(db_path))
def _endpoint(app: Any, path: str) -> Callable[..., Any]:
for route in app.routes:
if getattr(route, "path", None) == path:
return route.endpoint
raise AssertionError(f"route not found: {path}")
def test_index_serves_html(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/")
assert resp.status_code == 200
assert "The Ouroboros Dashboard API" in resp.text
app = _app(tmp_path)
index = _endpoint(app, "/")
resp = index()
assert isinstance(resp, FileResponse)
assert "index.html" in str(resp.path)
def test_status_endpoint(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/api/status")
assert resp.status_code == 200
body = resp.json()
app = _app(tmp_path)
get_status = _endpoint(app, "/api/status")
body = get_status()
assert "KR" in body["markets"]
assert "US" in body["markets"]
assert "US_NASDAQ" in body["markets"]
assert "totals" in body
def test_playbook_found(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/api/playbook/2026-02-14?market=KR")
assert resp.status_code == 200
assert resp.json()["market"] == "KR"
app = _app(tmp_path)
get_playbook = _endpoint(app, "/api/playbook/{date_str}")
body = get_playbook("2026-02-14", market="KR")
assert body["market"] == "KR"
def test_playbook_not_found(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/api/playbook/2026-02-15?market=KR")
assert resp.status_code == 404
app = _app(tmp_path)
get_playbook = _endpoint(app, "/api/playbook/{date_str}")
with pytest.raises(HTTPException, match="playbook not found"):
get_playbook("2026-02-15", market="KR")
def test_scorecard_found(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/api/scorecard/2026-02-14?market=KR")
assert resp.status_code == 200
assert resp.json()["scorecard"]["total_pnl"] == 1.5
app = _app(tmp_path)
get_scorecard = _endpoint(app, "/api/scorecard/{date_str}")
body = get_scorecard("2026-02-14", market="KR")
assert body["scorecard"]["total_pnl"] == 1.5
def test_scorecard_not_found(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/api/scorecard/2026-02-15?market=KR")
assert resp.status_code == 404
app = _app(tmp_path)
get_scorecard = _endpoint(app, "/api/scorecard/{date_str}")
with pytest.raises(HTTPException, match="scorecard not found"):
get_scorecard("2026-02-15", market="KR")
def test_performance_all(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/api/performance?market=all")
assert resp.status_code == 200
body = resp.json()
app = _app(tmp_path)
get_performance = _endpoint(app, "/api/performance")
body = get_performance(market="all")
assert body["market"] == "all"
assert body["combined"]["total_trades"] == 2
assert len(body["by_market"]) == 2
def test_performance_market_filter(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/api/performance?market=KR")
assert resp.status_code == 200
body = resp.json()
app = _app(tmp_path)
get_performance = _endpoint(app, "/api/performance")
body = get_performance(market="KR")
assert body["market"] == "KR"
assert body["metrics"]["total_trades"] == 1
def test_performance_empty_market(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/api/performance?market=JP")
assert resp.status_code == 200
assert resp.json()["metrics"]["total_trades"] == 0
app = _app(tmp_path)
get_performance = _endpoint(app, "/api/performance")
body = get_performance(market="JP")
assert body["metrics"]["total_trades"] == 0
def test_context_layer_all(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/api/context/L7_REALTIME")
assert resp.status_code == 200
body = resp.json()
app = _app(tmp_path)
get_context_layer = _endpoint(app, "/api/context/{layer}")
body = get_context_layer("L7_REALTIME", timeframe=None, limit=100)
assert body["layer"] == "L7_REALTIME"
assert body["count"] == 1
def test_context_layer_timeframe_filter(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/api/context/L6_DAILY?timeframe=2026-02-14")
assert resp.status_code == 200
body = resp.json()
app = _app(tmp_path)
get_context_layer = _endpoint(app, "/api/context/{layer}")
body = get_context_layer("L6_DAILY", timeframe="2026-02-14", limit=100)
assert body["count"] == 1
assert body["entries"][0]["key"] == "scorecard_KR"
def test_decisions_endpoint(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/api/decisions?market=KR")
assert resp.status_code == 200
body = resp.json()
app = _app(tmp_path)
get_decisions = _endpoint(app, "/api/decisions")
body = get_decisions(market="KR", limit=50)
assert body["count"] == 1
assert body["decisions"][0]["decision_id"] == "d-kr-1"
def test_scenarios_active_filters_non_matched(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/api/scenarios/active?market=KR&date_str=2026-02-14")
assert resp.status_code == 200
body = resp.json()
app = _app(tmp_path)
get_active_scenarios = _endpoint(app, "/api/scenarios/active")
body = get_active_scenarios(
market="KR",
date_str=datetime.now(UTC).date().isoformat(),
limit=50,
)
assert body["count"] == 1
assert body["matches"][0]["stock_code"] == "005930"
def test_scenarios_active_empty_when_no_matches(tmp_path: Path) -> None:
client = _client(tmp_path)
resp = client.get("/api/scenarios/active?market=US&date_str=2026-02-14")
assert resp.status_code == 200
assert resp.json()["count"] == 0
app = _app(tmp_path)
get_active_scenarios = _endpoint(app, "/api/scenarios/active")
body = get_active_scenarios(market="US", date_str="2026-02-14", limit=50)
assert body["count"] == 0

60
tests/test_db.py Normal file
View File

@@ -0,0 +1,60 @@
"""Tests for database helper functions."""
from src.db import get_open_position, init_db, log_trade
def test_get_open_position_returns_latest_buy() -> None:
conn = init_db(":memory:")
log_trade(
conn=conn,
stock_code="005930",
action="BUY",
confidence=90,
rationale="entry",
quantity=2,
price=70000.0,
market="KR",
exchange_code="KRX",
decision_id="d-buy-1",
)
position = get_open_position(conn, "005930", "KR")
assert position is not None
assert position["decision_id"] == "d-buy-1"
assert position["price"] == 70000.0
assert position["quantity"] == 2
def test_get_open_position_returns_none_when_latest_is_sell() -> None:
conn = init_db(":memory:")
log_trade(
conn=conn,
stock_code="005930",
action="BUY",
confidence=90,
rationale="entry",
quantity=1,
price=70000.0,
market="KR",
exchange_code="KRX",
decision_id="d-buy-1",
)
log_trade(
conn=conn,
stock_code="005930",
action="SELL",
confidence=95,
rationale="exit",
quantity=1,
price=71000.0,
market="KR",
exchange_code="KRX",
decision_id="d-sell-1",
)
assert get_open_position(conn, "005930", "KR") is None
def test_get_open_position_returns_none_when_no_trades() -> None:
conn = init_db(":memory:")
assert get_open_position(conn, "AAPL", "US_NASDAQ") is None

View File

@@ -116,6 +116,7 @@ class TestTradingCycleTelegramIntegration:
"output1": {
"stck_prpr": "50000",
"frgn_ntby_qty": "100",
"prdy_ctrt": "1.23",
}
}
)
@@ -747,7 +748,7 @@ class TestScenarioEngineIntegration:
broker = MagicMock()
broker.get_orderbook = AsyncMock(
return_value={
"output1": {"stck_prpr": "50000", "frgn_ntby_qty": "100"}
"output1": {"stck_prpr": "50000", "frgn_ntby_qty": "100", "prdy_ctrt": "2.50"}
}
)
broker.get_balance = AsyncMock(
@@ -830,6 +831,7 @@ class TestScenarioEngineIntegration:
assert market_data["rsi"] == 25.0
assert market_data["volume_ratio"] == 3.5
assert market_data["current_price"] == 50000.0
assert market_data["price_change_pct"] == 2.5
# Portfolio data should include pnl
assert "portfolio_pnl_pct" in portfolio_data
@@ -1232,6 +1234,107 @@ async def test_sell_updates_original_buy_decision_outcome() -> None:
assert updated_buy.outcome_accuracy == 1
@pytest.mark.asyncio
async def test_hold_overridden_to_sell_when_stop_loss_triggered() -> None:
"""HOLD decision should be overridden to SELL when stop-loss threshold is breached."""
db_conn = init_db(":memory:")
decision_logger = DecisionLogger(db_conn)
buy_decision_id = decision_logger.log_decision(
stock_code="005930",
market="KR",
exchange_code="KRX",
action="BUY",
confidence=90,
rationale="entry",
context_snapshot={},
input_data={},
)
log_trade(
conn=db_conn,
stock_code="005930",
action="BUY",
confidence=90,
rationale="entry",
quantity=1,
price=100.0,
market="KR",
exchange_code="KRX",
decision_id=buy_decision_id,
)
broker = MagicMock()
broker.get_orderbook = AsyncMock(
return_value={"output1": {"stck_prpr": "95", "frgn_ntby_qty": "0", "prdy_ctrt": "-5.0"}}
)
broker.get_balance = AsyncMock(
return_value={
"output2": [
{
"tot_evlu_amt": "100000",
"dnca_tot_amt": "10000",
"pchs_amt_smtl_amt": "90000",
}
]
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
scenario = StockScenario(
condition=StockCondition(rsi_below=30),
action=ScenarioAction.BUY,
confidence=88,
stop_loss_pct=-2.0,
rationale="stop loss policy",
)
playbook = DayPlaybook(
date=date(2026, 2, 8),
market="KR",
stock_playbooks=[
{"stock_code": "005930", "stock_name": "Samsung", "scenarios": [scenario]}
],
)
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=_make_hold_match())
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
telegram = MagicMock()
telegram.notify_trade_execution = AsyncMock()
telegram.notify_fat_finger = AsyncMock()
telegram.notify_circuit_breaker = AsyncMock()
telegram.notify_scenario_matched = AsyncMock()
await trading_cycle(
broker=broker,
overseas_broker=MagicMock(),
scenario_engine=engine,
playbook=playbook,
risk=MagicMock(),
db_conn=db_conn,
decision_logger=decision_logger,
context_store=MagicMock(
get_latest_timeframe=MagicMock(return_value=None),
set_context=MagicMock(),
),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=telegram,
market=market,
stock_code="005930",
scan_candidates={},
)
broker.send_order.assert_called_once()
assert broker.send_order.call_args.kwargs["order_type"] == "SELL"
@pytest.mark.asyncio
async def test_handle_market_close_runs_daily_review_flow() -> None:
"""Market close should aggregate, create scorecard, lessons, and notify."""
@@ -1427,7 +1530,7 @@ async def test_run_evolution_loop_notifies_when_pr_generated() -> None:
await _run_evolution_loop(
evolution_optimizer=optimizer,
telegram=telegram,
market_code="US",
market_code="US_NASDAQ",
market_date="2026-02-14",
)
@@ -1451,7 +1554,7 @@ async def test_run_evolution_loop_notification_error_is_ignored() -> None:
await _run_evolution_loop(
evolution_optimizer=optimizer,
telegram=telegram,
market_code="US",
market_code="US_NYSE",
market_date="2026-02-14",
)

View File

@@ -7,6 +7,7 @@ import pytest
from src.markets.schedule import (
MARKETS,
expand_market_codes,
get_next_market_open,
get_open_markets,
is_market_open,
@@ -199,3 +200,28 @@ class TestGetNextMarketOpen:
enabled_markets=["INVALID", "KR"], now=test_time
)
assert market.code == "KR"
class TestExpandMarketCodes:
"""Test shorthand market expansion."""
def test_expand_us_shorthand(self) -> None:
assert expand_market_codes(["US"]) == ["US_NASDAQ", "US_NYSE", "US_AMEX"]
def test_expand_cn_shorthand(self) -> None:
assert expand_market_codes(["CN"]) == ["CN_SHA", "CN_SZA"]
def test_expand_vn_shorthand(self) -> None:
assert expand_market_codes(["VN"]) == ["VN_HAN", "VN_HCM"]
def test_expand_mixed_codes(self) -> None:
assert expand_market_codes(["KR", "US", "JP"]) == [
"KR",
"US_NASDAQ",
"US_NYSE",
"US_AMEX",
"JP",
]
def test_expand_preserves_unknown_code(self) -> None:
assert expand_market_codes(["KR", "UNKNOWN"]) == ["KR", "UNKNOWN"]

View File

@@ -682,6 +682,10 @@ class TestBasicCommands:
"/help - Show available commands\n"
"/status - Trading status (mode, markets, P&L)\n"
"/positions - Current holdings\n"
"/report - Daily summary report\n"
"/scenarios - Today's playbook scenarios\n"
"/review - Recent scorecards\n"
"/dashboard - Dashboard URL/status\n"
"/stop - Pause trading\n"
"/resume - Resume trading"
)
@@ -707,10 +711,106 @@ class TestBasicCommands:
assert "/help" in payload["text"]
assert "/status" in payload["text"]
assert "/positions" in payload["text"]
assert "/report" in payload["text"]
assert "/scenarios" in payload["text"]
assert "/review" in payload["text"]
assert "/dashboard" in payload["text"]
assert "/stop" in payload["text"]
assert "/resume" in payload["text"]
class TestExtendedCommands:
"""Test additional bot commands."""
@pytest.mark.asyncio
async def test_report_command(self) -> None:
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_report() -> None:
await client.send_message("<b>📈 Daily Report</b>\n\nTrades: 1")
handler.register_command("report", mock_report)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await handler._handle_update(
{"update_id": 1, "message": {"chat": {"id": 456}, "text": "/report"}}
)
payload = mock_post.call_args.kwargs["json"]
assert "Daily Report" in payload["text"]
@pytest.mark.asyncio
async def test_scenarios_command(self) -> None:
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_scenarios() -> None:
await client.send_message("<b>🧠 Today's Scenarios</b>\n\n- AAPL: BUY (85)")
handler.register_command("scenarios", mock_scenarios)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await handler._handle_update(
{"update_id": 1, "message": {"chat": {"id": 456}, "text": "/scenarios"}}
)
payload = mock_post.call_args.kwargs["json"]
assert "Today's Scenarios" in payload["text"]
@pytest.mark.asyncio
async def test_review_command(self) -> None:
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_review() -> None:
await client.send_message("<b>📝 Recent Reviews</b>\n\n- 2026-02-14 KR")
handler.register_command("review", mock_review)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await handler._handle_update(
{"update_id": 1, "message": {"chat": {"id": 456}, "text": "/review"}}
)
payload = mock_post.call_args.kwargs["json"]
assert "Recent Reviews" in payload["text"]
@pytest.mark.asyncio
async def test_dashboard_command(self) -> None:
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_dashboard() -> None:
await client.send_message("<b>🖥️ Dashboard</b>\n\nURL: http://127.0.0.1:8080")
handler.register_command("dashboard", mock_dashboard)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await handler._handle_update(
{"update_id": 1, "message": {"chat": {"id": 456}, "text": "/dashboard"}}
)
payload = mock_post.call_args.kwargs["json"]
assert "Dashboard" in payload["text"]
class TestGetUpdates:
"""Test getUpdates API interaction."""