Compare commits

..

2 Commits

Author SHA1 Message Date
agentson
1ef5dcb2b3 docs: README.md v2 현행화 (#131)
Some checks failed
CI / test (pull_request) Has been cancelled
- 아키텍처 다이어그램에 v2 컴포넌트 (Strategy, Context, Evolution) 추가
- 핵심 모듈 테이블: 6개 → 14개 모듈 반영
- 테스트: 35개/3파일 → 551개/25파일
- 지원 시장 10개 거래소 테이블 추가
- 텔레그램 양방향 명령어 9종 레퍼런스
- 프로젝트 구조 트리 전면 갱신
- 문서 링크 섹션 추가

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 21:48:49 +09:00
agentson
d105a3ff5e docs: v2 상태 반영 - 전체 문서 현행화 (#131)
Some checks failed
CI / test (pull_request) Has been cancelled
- testing.md: 54 tests/4 files → 551 tests/25 files 반영, 전체 테스트 파일 설명
- architecture.md: v2 컴포넌트 추가 (Strategy, Context, Dashboard, Decision Logger 등),
  Playbook Mode 데이터 플로우, DB 스키마 5개 테이블, v2 환경변수
- commands.md: Dashboard 실행, Telegram 명령어 9종 레퍼런스
- CLAUDE.md: Project Structure 확장, 테스트 수 업데이트, --dashboard 플래그
- skills.md: DB 파일명 trades.db로 통일, Dashboard 명령어 추가
- requirements-log.md: 2026-02-16 문서 v2 동기화 요구사항 기록

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 21:44:59 +09:00
13 changed files with 983 additions and 1098 deletions

View File

@@ -15,6 +15,9 @@ pytest -v --cov=src
# Run (paper trading)
python -m src.main --mode=paper
# Run with dashboard
python -m src.main --mode=paper --dashboard
```
## Telegram Notifications (Optional)
@@ -43,6 +46,10 @@ Get real-time alerts for trades, circuit breakers, and system events via Telegra
- Market open/close notifications
- 📝 System startup/shutdown status
### Interactive Commands
With `TELEGRAM_COMMANDS_ENABLED=true` (default), the bot supports 9 bidirectional commands: `/help`, `/status`, `/positions`, `/report`, `/scenarios`, `/review`, `/dashboard`, `/stop`, `/resume`.
**Fail-safe**: Notifications never crash the trading system. Missing credentials or API errors are logged but trading continues normally.
## Smart Volatility Scanner (Optional)
@@ -109,17 +116,23 @@ User requirements and feedback are tracked in [docs/requirements-log.md](docs/re
```
src/
├── analysis/ # Technical analysis (RSI, volatility, smart scanner)
├── backup/ # Disaster recovery (scheduler, cloud storage, health)
├── brain/ # Gemini AI decision engine (prompt optimizer, context selector)
├── broker/ # KIS API client (domestic + overseas)
├── brain/ # Gemini AI decision engine
├── context/ # L1-L7 hierarchical memory system
├── core/ # Risk manager (READ-ONLY)
├── evolution/ # Self-improvement optimizer
├── dashboard/ # FastAPI read-only monitoring (8 API endpoints)
├── data/ # External data integration (news, market data, calendar)
├── evolution/ # Self-improvement (optimizer, daily review, scorecard)
├── logging/ # Decision logger (audit trail)
├── markets/ # Market schedules and timezone handling
├── notifications/ # Telegram real-time alerts
├── notifications/ # Telegram alerts + bidirectional commands (9 commands)
├── strategy/ # Pre-market planner, scenario engine, playbook store
├── db.py # SQLite trade logging
├── main.py # Trading loop orchestrator
└── config.py # Settings (from .env)
tests/ # 343 tests across 14 files
tests/ # 551 tests across 25 files
docs/ # Extended documentation
```
@@ -131,6 +144,7 @@ ruff check src/ tests/ # Lint
mypy src/ --strict # Type check
python -m src.main --mode=paper # Paper trading
python -m src.main --mode=paper --dashboard # With dashboard
python -m src.main --mode=live # Live trading (⚠️ real money)
# Gitea workflow (requires tea CLI)

160
README.md
View File

@@ -10,28 +10,41 @@ KIS(한국투자증권) API로 매매하고, Google Gemini로 판단하며, 자
│ (매매 실행) │ │ (거래 루프) │ │ (의사결정) │
└─────────────┘ └──────┬──────┘ └─────────────┘
┌────────────┐
│Risk Manager
│ (안전장치) │
└──────┬──────┘
────────────
│ Evolution
│ (전략 진화) │
└─────────────┘
┌────────────┼────────────┐
│ │
┌──────┴──────┐ ┌──┴───┐ ┌──────┴──────┐
│Risk Manager │ │ DB │ │ Telegram │
(안전장치) │ │ │ │ (알림+명령)
└──────┬──────┘ └──────┘ └─────────────
┌────────┼────────┐
┌────┴────┐┌──┴──┐┌────┴─────┐
│Strategy ││Ctx ││Evolution │
│(플레이북)││(메모리)││ (진화) │
└─────────┘└─────┘└──────────┘
```
**v2 핵심**: "Plan Once, Execute Locally" — 장 시작 전 AI가 시나리오 플레이북을 1회 생성하고, 거래 시간에는 로컬 시나리오 매칭만 수행하여 API 비용과 지연 시간을 대폭 절감.
## 핵심 모듈
| 모듈 | 파일 | 설명 |
| 모듈 | 위치 | 설명 |
|------|------|------|
| 설정 | `src/config.py` | Pydantic 기반 환경변수 로딩 및 타입 검증 |
| 브로커 | `src/broker/kis_api.py` | KIS API 비동기 래퍼 (토큰 갱신, 레이트 리미터, 해시키) |
| 두뇌 | `src/brain/gemini_client.py` | Gemini 프롬프트 구성 JSON 응답 파싱 |
| 방패 | `src/core/risk_manager.py` | 서킷 브레이커 + 팻 핑거 체크 |
| 알림 | `src/notifications/telegram_client.py` | 텔레그램 실시간 거래 알림 (선택사항) |
| 진화 | `src/evolution/optimizer.py` | 실패 패턴 분석 → 새 전략 생성 → 테스트 → PR |
| DB | `src/db.py` | SQLite 거래 로그 기록 |
| 설정 | `src/config.py` | Pydantic 기반 환경변수 로딩 및 타입 검증 (35+ 변수) |
| 브로커 | `src/broker/` | KIS API 비동기 래퍼 (국내 + 해외 9개 시장) |
| 두뇌 | `src/brain/` | Gemini 프롬프트 구성, JSON 파싱, 토큰 최적화 |
| 방패 | `src/core/risk_manager.py` | 서킷 브레이커 + 팻 핑거 체크 (READ-ONLY) |
| 전략 | `src/strategy/` | Pre-Market Planner, Scenario Engine, Playbook Store |
| 컨텍스트 | `src/context/` | L1-L7 계층형 메모리 시스템 |
| 분석 | `src/analysis/` | RSI, ATR, Smart Volatility Scanner |
| 알림 | `src/notifications/` | 텔레그램 양방향 (알림 + 9개 명령어) |
| 대시보드 | `src/dashboard/` | FastAPI 읽기 전용 모니터링 (8개 API) |
| 진화 | `src/evolution/` | 전략 진화 + Daily Review + Scorecard |
| 의사결정 로그 | `src/logging/` | 전체 거래 결정 감사 추적 |
| 데이터 | `src/data/` | 뉴스, 시장 데이터, 경제 캘린더 연동 |
| 백업 | `src/backup/` | 자동 백업, S3 클라우드, 무결성 검증 |
| DB | `src/db.py` | SQLite 거래 로그 (5개 테이블) |
## 안전장치
@@ -42,6 +55,7 @@ KIS(한국투자증권) API로 매매하고, Google Gemini로 판단하며, 자
| 신뢰도 임계값 | Gemini 신뢰도 80 미만이면 강제 HOLD |
| 레이트 리미터 | Leaky Bucket 알고리즘으로 API 호출 제한 |
| 토큰 자동 갱신 | 만료 1분 전 자동으로 Access Token 재발급 |
| 손절 모니터링 | 플레이북 시나리오 기반 실시간 포지션 보호 |
## 빠른 시작
@@ -67,7 +81,11 @@ pytest -v --cov=src --cov-report=term-missing
### 4. 실행 (모의투자)
```bash
# 기본 실행
python -m src.main --mode=paper
# 대시보드 활성화
python -m src.main --mode=paper --dashboard
```
### 5. Docker 실행
@@ -76,7 +94,20 @@ python -m src.main --mode=paper
docker compose up -d ouroboros
```
## 텔레그램 알림 (선택사항)
## 지원 시장
| 국가 | 거래소 | 코드 |
|------|--------|------|
| 🇰🇷 한국 | KRX | KR |
| 🇺🇸 미국 | NASDAQ, NYSE, AMEX | US_NASDAQ, US_NYSE, US_AMEX |
| 🇯🇵 일본 | TSE | JP |
| 🇭🇰 홍콩 | SEHK | HK |
| 🇨🇳 중국 | 상하이, 선전 | CN_SHA, CN_SZA |
| 🇻🇳 베트남 | 하노이, 호치민 | VN_HNX, VN_HSX |
`ENABLED_MARKETS` 환경변수로 활성 시장 선택 (기본: `KR,US`).
## 텔레그램 (선택사항)
거래 실행, 서킷 브레이커 발동, 시스템 상태 등을 텔레그램으로 실시간 알림 받을 수 있습니다.
@@ -102,25 +133,51 @@ docker compose up -d ouroboros
- 장 시작/종료 알림
- 📝 시스템 시작/종료 상태
**안전장치**: 알림 실패해도 거래는 계속 진행됩니다. 텔레그램 API 오류나 설정 누락이 있어도 거래 시스템은 정상 작동합니다.
### 양방향 명령어
`TELEGRAM_COMMANDS_ENABLED=true` (기본값) 설정 시 9개 대화형 명령어 지원:
| 명령어 | 설명 |
|--------|------|
| `/help` | 사용 가능한 명령어 목록 |
| `/status` | 거래 상태 (모드, 시장, P&L) |
| `/positions` | 계좌 요약 (잔고, 현금, P&L) |
| `/report` | 일일 요약 (거래 수, P&L, 승률) |
| `/scenarios` | 오늘의 플레이북 시나리오 |
| `/review` | 최근 스코어카드 (L6_DAILY) |
| `/dashboard` | 대시보드 URL 표시 |
| `/stop` | 거래 일시 정지 |
| `/resume` | 거래 재개 |
**안전장치**: 알림 실패해도 거래는 계속 진행됩니다.
## 테스트
35개 테스트가 TDD 방식으로 구현 전에 먼저 작성되었습니다.
551개 테스트가 25개 파일에 걸쳐 구현되어 있습니다. 최소 커버리지 80%.
```
tests/test_risk.py — 서킷 브레이커, 팻 핑거, 통합 검증 (11개)
tests/test_broker.py — 토큰 관리, 타임아웃, HTTP 에러, 해시키 (6개)
tests/test_brain.py JSON 파싱, 신뢰도 임계값, 비정상 응답 처리 (15개)
tests/test_scenario_engine.py 시나리오 매칭 (44개)
tests/test_data_integration.py — 외부 데이터 연동 (38개)
tests/test_pre_market_planner.py — 플레이북 생성 (37개)
tests/test_main.py — 거래 루프 통합 (37개)
tests/test_token_efficiency.py — 토큰 최적화 (34개)
tests/test_strategy_models.py — 전략 모델 검증 (33개)
tests/test_telegram_commands.py — 텔레그램 명령어 (31개)
tests/test_latency_control.py — 지연시간 제어 (30개)
tests/test_telegram.py — 텔레그램 알림 (25개)
... 외 16개 파일
```
**상세**: [docs/testing.md](docs/testing.md)
## 기술 스택
- **언어**: Python 3.11+ (asyncio 기반)
- **브로커**: KIS Open API (REST)
- **브로커**: KIS Open API (REST, 국내+해외)
- **AI**: Google Gemini Pro
- **DB**: SQLite
- **검증**: pytest + coverage
- **DB**: SQLite (5개 테이블: trades, contexts, decision_logs, playbooks, context_metadata)
- **대시보드**: FastAPI + uvicorn
- **검증**: pytest + coverage (551 tests)
- **CI/CD**: GitHub Actions
- **배포**: Docker + Docker Compose
@@ -128,27 +185,50 @@ tests/test_brain.py — JSON 파싱, 신뢰도 임계값, 비정상 응답 처
```
The-Ouroboros/
├── .github/workflows/ci.yml # CI 파이프라인
├── docs/
│ ├── agents.md # AI 에이전트 페르소나 정의
── skills.md # 사용 가능한 도구 목록
│ ├── architecture.md # 시스템 아키텍처
── testing.md # 테스트 가이드
│ ├── commands.md # 명령어 레퍼런스
│ ├── context-tree.md # L1-L7 메모리 시스템
│ ├── workflow.md # Git 워크플로우
│ ├── agents.md # 에이전트 정책
│ ├── skills.md # 도구 목록
│ ├── disaster_recovery.md # 백업/복구
│ └── requirements-log.md # 요구사항 기록
├── src/
│ ├── config.py # Pydantic 설정
│ ├── logging_config.py # JSON 구조화 로깅
│ ├── db.py # SQLite 거래 기록
│ ├── main.py # 비동기 거래 루프
│ ├── broker/kis_api.py # KIS API 클라이언트
│ ├── brain/gemini_client.py # Gemini 의사결정 엔진
│ ├── core/risk_manager.py # 리스크 관리
│ ├── notifications/telegram_client.py # 텔레그램 알림
│ ├── evolution/optimizer.py # 전략 진화 엔진
── strategies/base.py # 전략 베이스 클래스
├── tests/ # TDD 테스트 스위트
│ ├── analysis/ # 기술적 분석 (RSI, ATR, Smart Scanner)
│ ├── backup/ # 백업 (스케줄러, S3, 무결성 검증)
│ ├── brain/ # Gemini 의사결정 (프롬프트 최적화, 컨텍스트 선택)
│ ├── broker/ # KIS API (국내 + 해외)
│ ├── context/ # L1-L7 계층 메모리
│ ├── core/ # 리스크 관리 (READ-ONLY)
│ ├── dashboard/ # FastAPI 모니터링 대시보드
│ ├── data/ # 외부 데이터 연동
│ ├── evolution/ # 전략 진화 + Daily Review
── logging/ # 의사결정 감사 추적
│ ├── markets/ # 시장 스케줄 + 타임존
│ ├── notifications/ # 텔레그램 알림 + 명령어
│ ├── strategy/ # 플레이북 (Planner, Scenario Engine)
│ ├── config.py # Pydantic 설정
│ ├── db.py # SQLite 데이터베이스
│ └── main.py # 비동기 거래 루프
├── tests/ # 551개 테스트 (25개 파일)
├── Dockerfile # 멀티스테이지 빌드
├── docker-compose.yml # 서비스 오케스트레이션
└── pyproject.toml # 의존성 및 도구 설정
```
## 문서
- **[아키텍처](docs/architecture.md)** — 시스템 설계, 컴포넌트, 데이터 흐름
- **[테스트](docs/testing.md)** — 테스트 구조, 커버리지, 작성 가이드
- **[명령어](docs/commands.md)** — CLI, Dashboard, Telegram 명령어
- **[컨텍스트 트리](docs/context-tree.md)** — L1-L7 계층 메모리
- **[워크플로우](docs/workflow.md)** — Git 워크플로우 정책
- **[에이전트 정책](docs/agents.md)** — 안전 제약, 금지 행위
- **[백업/복구](docs/disaster_recovery.md)** — 재해 복구 절차
- **[요구사항](docs/requirements-log.md)** — 사용자 요구사항 추적
## 라이선스
이 프로젝트의 라이선스는 [LICENSE](LICENSE) 파일을 참조하세요.

View File

@@ -2,7 +2,9 @@
## Overview
Self-evolving AI trading agent for global stock markets via KIS (Korea Investment & Securities) API. The main loop in `src/main.py` orchestrates four components across multiple markets with two trading modes: daily (batch API calls) or realtime (per-stock decisions).
Self-evolving AI trading agent for global stock markets via KIS (Korea Investment & Securities) API. The main loop in `src/main.py` orchestrates components across multiple markets with two trading modes: daily (batch API calls) or realtime (per-stock decisions).
**v2 Proactive Playbook Architecture**: The system uses a "plan once, execute locally" approach. Pre-market, the AI generates a playbook of scenarios (one Gemini API call per market per day). During trading hours, a local scenario engine matches live market data against these pre-computed scenarios — no additional AI calls needed. This dramatically reduces API costs and latency.
## Trading Modes
@@ -46,9 +48,11 @@ High-frequency trading with individual stock analysis:
**KISBroker** (`kis_api.py`) — Async KIS API client for domestic Korean market
- Automatic OAuth token refresh (valid for 24 hours)
- Leaky-bucket rate limiter (10 requests per second)
- Leaky-bucket rate limiter (configurable RPS, default 2.0)
- POST body hash-key signing for order authentication
- Custom SSL context with disabled hostname verification for VTS (virtual trading) endpoint due to known certificate mismatch
- `fetch_market_rankings()` — Fetch volume surge rankings from KIS API
- `get_daily_prices()` — Fetch OHLCV history for technical analysis
**OverseasBroker** (`overseas.py`) — KIS overseas stock API wrapper
@@ -63,14 +67,7 @@ High-frequency trading with individual stock analysis:
- `is_market_open()` checks weekends, trading hours, lunch breaks
- `get_open_markets()` returns currently active markets
- `get_next_market_open()` finds next market to open and when
**New API Methods** (added in v0.9.0):
- `fetch_market_rankings()` — Fetch volume surge rankings from KIS API
- `get_daily_prices()` — Fetch OHLCV history for technical analysis
**Overseas Ranking API Methods** (added in v0.10.x):
- `fetch_overseas_rankings()` — Fetch overseas ranking universe (fluctuation / volume)
- Ranking endpoint paths and TR_IDs are configurable via environment variables
- 10 global markets defined (KR, US_NASDAQ, US_NYSE, US_AMEX, JP, HK, CN_SHA, CN_SZA, VN_HNX, VN_HSX)
### 2. Analysis (`src/analysis/`)
@@ -85,28 +82,19 @@ High-frequency trading with individual stock analysis:
**SmartVolatilityScanner** (`smart_scanner.py`) — Python-first filtering pipeline
- **Domestic (KR)**:
- **Step 1**: Fetch domestic fluctuation ranking as primary universe
- **Step 2**: Fetch domestic volume ranking for liquidity bonus
- **Step 3**: Compute volatility-first score (max of daily change% and intraday range%)
- **Step 4**: Apply liquidity bonus and return top N candidates
- **Overseas (US/JP/HK/CN/VN)**:
- **Step 1**: Fetch overseas ranking universe (fluctuation rank + volume rank bonus)
- **Step 2**: Compute volatility-first score (max of daily change% and intraday range%)
- **Step 3**: Apply liquidity bonus from volume ranking
- **Step 4**: Return top N candidates (default 3)
- **Fallback (overseas only)**: If ranking API is unavailable, uses dynamic universe
from runtime active symbols + recent traded symbols + current holdings (no static watchlist)
- **Step 1**: Fetch volume rankings from KIS API (top 30 stocks)
- **Step 2**: Calculate RSI and volume ratio for each stock
- **Step 3**: Apply filters:
- Volume ratio >= `VOL_MULTIPLIER` (default 2.0x previous day)
- RSI < `RSI_OVERSOLD_THRESHOLD` (30) OR RSI > `RSI_MOMENTUM_THRESHOLD` (70)
- **Step 4**: Score candidates by RSI extremity (60%) + volume surge (40%)
- **Step 5**: Return top N candidates (default 3) for AI analysis
- **Fallback**: Uses static watchlist if ranking API unavailable
- **Realtime mode only**: Daily mode uses batch processing for API efficiency
**Benefits:**
- Reduces Gemini API calls from 20-30 stocks to 1-3 qualified candidates
- Fast Python-based filtering before expensive AI judgment
- Logs selection context (RSI-compatible proxy, volume_ratio, signal, score) for Evolution system
### 3. Brain (`src/brain/`)
### 3. Brain (`src/brain/gemini_client.py`)
**GeminiClient** — AI decision engine powered by Google Gemini
**GeminiClient** (`gemini_client.py`) — AI decision engine powered by Google Gemini
- Constructs structured prompts from market data
- Parses JSON responses into `TradeDecision` objects (`action`, `confidence`, `rationale`)
@@ -114,11 +102,20 @@ High-frequency trading with individual stock analysis:
- Falls back to safe HOLD on any parse/API error
- Handles markdown-wrapped JSON, malformed responses, invalid actions
**PromptOptimizer** (`prompt_optimizer.py`) — Token efficiency optimization
- Reduces prompt size while preserving decision quality
- Caches optimized prompts
**ContextSelector** (`context_selector.py`) — Relevant context selection for prompts
- Selects appropriate context layers for current market conditions
### 4. Risk Manager (`src/core/risk_manager.py`)
**RiskManager** — Safety circuit breaker and order validation
⚠️ **READ-ONLY by policy** (see [`docs/agents.md`](./agents.md))
> **READ-ONLY by policy** (see [`docs/agents.md`](./agents.md))
- **Circuit Breaker**: Halts all trading via `SystemExit` when daily P&L drops below -3.0%
- Threshold may only be made stricter, never relaxed
@@ -126,7 +123,79 @@ High-frequency trading with individual stock analysis:
- **Fat-Finger Protection**: Rejects orders exceeding 30% of available cash
- Must always be enforced, cannot be disabled
### 5. Notifications (`src/notifications/telegram_client.py`)
### 5. Strategy (`src/strategy/`)
**Pre-Market Planner** (`pre_market_planner.py`) — AI playbook generation
- Runs before market open (configurable `PRE_MARKET_MINUTES`, default 30)
- Generates scenario-based playbooks via single Gemini API call per market
- Handles timeout (`PLANNER_TIMEOUT_SECONDS`, default 60) with defensive playbook fallback
- Persists playbooks to database for audit trail
**Scenario Engine** (`scenario_engine.py`) — Local scenario matching
- Matches live market data against pre-computed playbook scenarios
- No AI calls during trading hours — pure Python matching logic
- Returns matched scenarios with confidence scores
- Configurable `MAX_SCENARIOS_PER_STOCK` (default 5)
- Periodic rescan at `RESCAN_INTERVAL_SECONDS` (default 300)
**Playbook Store** (`playbook_store.py`) — Playbook persistence
- SQLite-backed storage for daily playbooks
- Date and market-based retrieval
- Status tracking (generated, active, expired)
**Models** (`models.py`) — Pydantic data models
- Scenario, Playbook, MatchResult, and related type definitions
### 6. Context System (`src/context/`)
**Context Store** (`store.py`) — L1-L7 hierarchical memory
- 7-layer context system (see [docs/context-tree.md](./context-tree.md)):
- L1: Tick-level (real-time price)
- L2: Intraday (session summary)
- L3: Daily (end-of-day)
- L4: Weekly (trend analysis)
- L5: Monthly (strategy review)
- L6: Daily Review (scorecard)
- L7: Evolution (long-term learning)
- Key-value storage with timeframe tagging
- SQLite persistence in `contexts` table
**Context Scheduler** (`scheduler.py`) — Periodic aggregation
- Scheduled summarization from lower to higher layers
- Configurable aggregation intervals
**Context Summarizer** (`summarizer.py`) — Layer summarization
- Aggregates lower-layer data into higher-layer summaries
### 7. Dashboard (`src/dashboard/`)
**FastAPI App** (`app.py`) — Read-only monitoring dashboard
- Runs as daemon thread when enabled (`--dashboard` CLI flag or `DASHBOARD_ENABLED=true`)
- Configurable host/port (`DASHBOARD_HOST`, `DASHBOARD_PORT`, default `127.0.0.1:8080`)
- Serves static HTML frontend
**8 API Endpoints:**
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/` | GET | Static HTML dashboard |
| `/api/status` | GET | Daily trading status by market |
| `/api/playbook/{date}` | GET | Playbook for specific date and market |
| `/api/scorecard/{date}` | GET | Daily scorecard from L6_DAILY context |
| `/api/performance` | GET | Trading performance metrics (by market + combined) |
| `/api/context/{layer}` | GET | Query context by layer (L1-L7) |
| `/api/decisions` | GET | Decision log entries with outcomes |
| `/api/scenarios/active` | GET | Today's matched scenarios |
### 8. Notifications (`src/notifications/telegram_client.py`)
**TelegramClient** — Real-time event notifications via Telegram Bot API
@@ -134,7 +203,13 @@ High-frequency trading with individual stock analysis:
- Non-blocking: failures are logged but never crash trading
- Rate-limited: 1 message/second default to respect Telegram API limits
- Auto-disabled when credentials missing
- Gracefully handles API errors, network timeouts, invalid tokens
**TelegramCommandHandler** — Bidirectional command interface
- Long polling from Telegram API (configurable `TELEGRAM_POLLING_INTERVAL`)
- 9 interactive commands: `/help`, `/status`, `/positions`, `/report`, `/scenarios`, `/review`, `/dashboard`, `/stop`, `/resume`
- Authorization filtering by `TELEGRAM_CHAT_ID`
- Enable/disable via `TELEGRAM_COMMANDS_ENABLED` (default: true)
**Notification Types:**
- Trade execution (BUY/SELL with confidence)
@@ -142,12 +217,12 @@ High-frequency trading with individual stock analysis:
- Fat-finger protection triggers (order rejection)
- Market open/close events
- System startup/shutdown status
- Playbook generation results
- Stop-loss monitoring alerts
**Setup:** See [src/notifications/README.md](../src/notifications/README.md) for bot creation and configuration.
### 9. Evolution (`src/evolution/`)
### 6. Evolution (`src/evolution/optimizer.py`)
**StrategyOptimizer** — Self-improvement loop
**StrategyOptimizer** (`optimizer.py`) — Self-improvement loop
- Analyzes high-confidence losing trades from SQLite
- Asks Gemini to generate new `BaseStrategy` subclasses
@@ -155,101 +230,196 @@ High-frequency trading with individual stock analysis:
- Simulates PR creation for human review
- Only activates strategies that pass all tests
**DailyReview** (`daily_review.py`) — End-of-day review
- Generates comprehensive trade performance summary
- Stores results in L6_DAILY context layer
- Tracks win rate, P&L, confidence accuracy
**DailyScorecard** (`scorecard.py`) — Performance scoring
- Calculates daily metrics (trades, P&L, win rate, avg confidence)
- Enables trend tracking across days
**Stop-Loss Monitoring** — Real-time position protection
- Monitors positions against stop-loss levels from playbook scenarios
- Sends Telegram alerts when thresholds approached or breached
### 10. Decision Logger (`src/logging/decision_logger.py`)
**DecisionLogger** — Comprehensive audit trail
- Logs every trading decision with full context snapshot
- Captures input data, rationale, confidence, and outcomes
- Supports outcome tracking (P&L, accuracy) for post-analysis
- Stored in `decision_logs` table with indexed queries
- Review workflow support (reviewed flag, review notes)
### 11. Data Integration (`src/data/`)
**External Data Sources** (optional):
- `news_api.py` — News sentiment data
- `market_data.py` — Extended market data
- `economic_calendar.py` — Economic event calendar
### 12. Backup (`src/backup/`)
**Disaster Recovery** (see [docs/disaster_recovery.md](./disaster_recovery.md)):
- `scheduler.py` — Automated backup scheduling
- `exporter.py` — Data export to various formats
- `cloud_storage.py` — S3-compatible cloud backup
- `health_monitor.py` — Backup integrity verification
## Data Flow
### Playbook Mode (Daily — Primary v2 Flow)
```
┌─────────────────────────────────────────────────────────────┐
│ Pre-Market Phase (before market open) │
└─────────────────────────────────────────────────────────────┘
┌──────────────────────────────────┐
│ Pre-Market Planner │
│ - 1 Gemini API call per market │
│ - Generate scenario playbook │
│ - Store in playbooks table │
└──────────────────┬───────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Trading Hours (market open → close) │
└─────────────────────────────────────────────────────────────┘
┌──────────────────────────────────┐
│ Market Schedule Check │
│ - Get open markets │
│ - Filter by enabled markets │
└──────────────────┬───────────────┘
┌──────────────────────────────────┐
│ Scenario Engine (local) │
│ - Match live data vs playbook │
│ - No AI calls needed │
│ - Return matched scenarios │
└──────────────────┬───────────────┘
┌──────────────────────────────────┐
│ Risk Manager: Validate Order │
│ - Check circuit breaker │
│ - Check fat-finger limit │
└──────────────────┬───────────────┘
┌──────────────────────────────────┐
│ Broker: Execute Order │
│ - Domestic: send_order() │
│ - Overseas: send_overseas_order()│
└──────────────────┬───────────────┘
┌──────────────────────────────────┐
│ Decision Logger + DB │
│ - Full audit trail │
│ - Context snapshot │
│ - Telegram notification │
└──────────────────┬───────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Post-Market Phase │
└─────────────────────────────────────────────────────────────┘
┌──────────────────────────────────┐
│ Daily Review + Scorecard │
│ - Performance summary │
│ - Store in L6_DAILY context │
│ - Evolution learning │
└──────────────────────────────────┘
```
### Realtime Mode (with Smart Scanner)
```
┌─────────────────────────────────────────────────────────────┐
│ Main Loop (60s cycle per market) │
│ Main Loop (60s cycle per market)
└─────────────────────────────────────────────────────────────┘
┌──────────────────────────────────┐
│ Market Schedule Check
│ - Get open markets
│ - Filter by enabled markets
│ - Wait if all closed
└──────────────────┬───────────────
│ Market Schedule Check │
│ - Get open markets │
│ - Filter by enabled markets │
│ - Wait if all closed │
└──────────────────┬───────────────┘
┌──────────────────────────────────┐
│ Smart Scanner (Python-first)
│ - Domestic: fluctuation rank
+ volume rank bonus
+ volatility-first scoring
│ - Overseas: ranking universe
│ + volatility-first scoring │
│ - Fallback: dynamic universe │
│ Smart Scanner (Python-first) │
│ - Fetch volume rankings (KIS)
- Get 20d price history per stock
- Calculate RSI(14) + vol ratio
│ - Filter: vol>2x AND RSI extreme
│ - Return top 3 qualified stocks │
└──────────────────┬───────────────
└──────────────────┬───────────────┘
┌──────────────────────────────────┐
│ For Each Qualified Candidate
└──────────────────┬───────────────
│ For Each Qualified Candidate │
└──────────────────┬───────────────┘
┌──────────────────────────────────┐
│ Broker: Fetch Market Data │
│ - Domestic: orderbook + balance │
│ - Overseas: price + balance │
└──────────────────┬───────────────
└──────────────────┬───────────────┘
┌──────────────────────────────────┐
Calculate P&L
pnl_pct = (eval - cost) / cost
└──────────────────┬────────────────┘
Brain: Get Decision (AI)
- Build prompt with market data
│ - Call Gemini API │
│ - Parse JSON response │
│ - Return TradeDecision │
└──────────────────┬───────────────┘
┌──────────────────────────────────┐
Brain: Get Decision (AI)
│ - Build prompt with market data
│ - Call Gemini API
│ - Parse JSON response │
│ - Return TradeDecision │
└──────────────────┬────────────────┘
Risk Manager: Validate Order
│ - Check circuit breaker
│ - Check fat-finger limit
└──────────────────┬───────────────┘
┌──────────────────────────────────┐
Risk Manager: Validate Order │
│ - Check circuit breaker
│ - Check fat-finger limit
│ - Raise if validation fails │
└──────────────────┬────────────────┘
Broker: Execute Order
│ - Domestic: send_order()
│ - Overseas: send_overseas_order()
└──────────────────┬───────────────┘
┌──────────────────────────────────┐
Broker: Execute Order
│ - Domestic: send_order()
│ - Overseas: send_overseas_order()
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Notifications: Send Alert │
│ - Trade execution notification │
│ - Non-blocking (errors logged) │
│ - Rate-limited to 1/sec │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Database: Log Trade │
│ - SQLite (data/trades.db) │
│ - Track: action, confidence, │
│ rationale, market, exchange │
│ - NEW: selection_context (JSON) │
│ - RSI, volume_ratio, signal │
│ - For Evolution optimization │
└───────────────────────────────────┘
Decision Logger + Notifications
│ - Log trade to SQLite
│ - selection_context (JSON)
│ - Telegram notification │
└──────────────────────────────────┘
```
## Database Schema
**SQLite** (`src/db.py`)
**SQLite** (`src/db.py`) — Database: `data/trades.db`
### trades
```sql
CREATE TABLE trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -261,25 +431,73 @@ CREATE TABLE trades (
quantity INTEGER,
price REAL,
pnl REAL DEFAULT 0.0,
market TEXT DEFAULT 'KR', -- KR | US_NASDAQ | JP | etc.
exchange_code TEXT DEFAULT 'KRX', -- KRX | NASD | NYSE | etc.
selection_context TEXT -- JSON: {rsi, volume_ratio, signal, score}
market TEXT DEFAULT 'KR',
exchange_code TEXT DEFAULT 'KRX',
selection_context TEXT, -- JSON: {rsi, volume_ratio, signal, score}
decision_id TEXT -- Links to decision_logs
);
```
**Selection Context** (new in v0.9.0): Stores scanner selection criteria as JSON:
```json
{
"rsi": 28.5,
"volume_ratio": 2.7,
"signal": "oversold",
"score": 85.2
}
### contexts
```sql
CREATE TABLE contexts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
layer TEXT NOT NULL, -- L1 through L7
timeframe TEXT,
key TEXT NOT NULL,
value TEXT NOT NULL, -- JSON data
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
-- Indices: idx_contexts_layer, idx_contexts_timeframe, idx_contexts_updated
```
Enables Evolution system to analyze correlation between selection criteria and trade outcomes.
### decision_logs
```sql
CREATE TABLE decision_logs (
decision_id TEXT PRIMARY KEY,
timestamp TEXT NOT NULL,
stock_code TEXT,
market TEXT,
exchange_code TEXT,
action TEXT,
confidence INTEGER,
rationale TEXT,
context_snapshot TEXT, -- JSON: full context at decision time
input_data TEXT, -- JSON: market data used
outcome_pnl REAL,
outcome_accuracy REAL,
reviewed INTEGER DEFAULT 0,
review_notes TEXT
);
-- Indices: idx_decision_logs_timestamp, idx_decision_logs_reviewed, idx_decision_logs_confidence
```
Auto-migration: Adds `market`, `exchange_code`, and `selection_context` columns if missing for backward compatibility.
### playbooks
```sql
CREATE TABLE playbooks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
market TEXT NOT NULL,
status TEXT DEFAULT 'generated',
playbook_json TEXT NOT NULL, -- Full playbook with scenarios
generated_at TEXT NOT NULL,
token_count INTEGER,
scenario_count INTEGER,
match_count INTEGER DEFAULT 0
);
-- Indices: idx_playbooks_date, idx_playbooks_market
```
### context_metadata
```sql
CREATE TABLE context_metadata (
layer TEXT PRIMARY KEY,
description TEXT,
retention_days INTEGER,
aggregation_source TEXT
);
```
## Configuration
@@ -294,42 +512,62 @@ KIS_APP_SECRET=your_app_secret
KIS_ACCOUNT_NO=XXXXXXXX-XX
GEMINI_API_KEY=your_gemini_key
# Optional
# Optional — Trading Mode
MODE=paper # paper | live
DB_PATH=data/trades.db
CONFIDENCE_THRESHOLD=80
MAX_LOSS_PCT=3.0
MAX_ORDER_PCT=30.0
ENABLED_MARKETS=KR,US_NASDAQ # Comma-separated market codes
# Trading Mode (API efficiency)
TRADE_MODE=daily # daily | realtime
DAILY_SESSIONS=4 # Sessions per day (daily mode only)
SESSION_INTERVAL_HOURS=6 # Hours between sessions (daily mode only)
# Telegram Notifications (optional)
# Optional — Database
DB_PATH=data/trades.db
# Optional — Risk
CONFIDENCE_THRESHOLD=80
MAX_LOSS_PCT=3.0
MAX_ORDER_PCT=30.0
# Optional — Markets
ENABLED_MARKETS=KR,US # Comma-separated market codes
RATE_LIMIT_RPS=2.0 # KIS API requests per second
# Optional — Pre-Market Planner (v2)
PRE_MARKET_MINUTES=30 # Minutes before market open to generate playbook
MAX_SCENARIOS_PER_STOCK=5 # Max scenarios per stock in playbook
PLANNER_TIMEOUT_SECONDS=60 # Timeout for playbook generation
DEFENSIVE_PLAYBOOK_ON_FAILURE=true # Fallback on AI failure
RESCAN_INTERVAL_SECONDS=300 # Scenario rescan interval during trading
# Optional — Smart Scanner (realtime mode only)
RSI_OVERSOLD_THRESHOLD=30 # 0-50, oversold threshold
RSI_MOMENTUM_THRESHOLD=70 # 50-100, momentum threshold
VOL_MULTIPLIER=2.0 # Minimum volume ratio (2.0 = 200%)
SCANNER_TOP_N=3 # Max qualified candidates per scan
# Optional — Dashboard
DASHBOARD_ENABLED=false # Enable FastAPI dashboard
DASHBOARD_HOST=127.0.0.1 # Dashboard bind address
DASHBOARD_PORT=8080 # Dashboard port (1-65535)
# Optional — Telegram
TELEGRAM_BOT_TOKEN=1234567890:ABCdefGHIjklMNOpqrsTUVwxyz
TELEGRAM_CHAT_ID=123456789
TELEGRAM_ENABLED=true
TELEGRAM_COMMANDS_ENABLED=true # Enable bidirectional commands
TELEGRAM_POLLING_INTERVAL=1.0 # Command polling interval (seconds)
# Smart Scanner (optional, realtime mode only)
SCANNER_TOP_N=3 # Max qualified candidates per scan
POSITION_SIZING_ENABLED=true
POSITION_BASE_ALLOCATION_PCT=5.0
POSITION_MIN_ALLOCATION_PCT=1.0
POSITION_MAX_ALLOCATION_PCT=10.0
POSITION_VOLATILITY_TARGET_SCORE=50.0
# Legacy/compat scanner thresholds (kept for backward compatibility)
RSI_OVERSOLD_THRESHOLD=30
RSI_MOMENTUM_THRESHOLD=70
VOL_MULTIPLIER=2.0
# Optional — Backup
BACKUP_ENABLED=false
BACKUP_DIR=data/backups
S3_ENDPOINT_URL=...
S3_ACCESS_KEY=...
S3_SECRET_KEY=...
S3_BUCKET_NAME=...
S3_REGION=...
# Overseas Ranking API (optional override; account-dependent)
OVERSEAS_RANKING_ENABLED=true
OVERSEAS_RANKING_FLUCT_TR_ID=HHDFS76200100
OVERSEAS_RANKING_VOLUME_TR_ID=HHDFS76200200
OVERSEAS_RANKING_FLUCT_PATH=/uapi/overseas-price/v1/quotations/inquire-updown-rank
OVERSEAS_RANKING_VOLUME_PATH=/uapi/overseas-price/v1/quotations/inquire-volume-rank
# Optional — External Data
NEWS_API_KEY=...
NEWS_API_PROVIDER=...
MARKET_DATA_API_KEY=...
```
Tests use in-memory SQLite (`DB_PATH=":memory:"`) and dummy credentials via `tests/conftest.py`.
@@ -363,4 +601,9 @@ Tests use in-memory SQLite (`DB_PATH=":memory:"`) and dummy credentials via `tes
- Invalid token → log error, trading unaffected
- Rate limit exceeded → queued via rate limiter
**Guarantee**: Notification failures never interrupt trading operations.
### Playbook Generation Failure
- Timeout → fall back to defensive playbook (`DEFENSIVE_PLAYBOOK_ON_FAILURE`)
- API error → use previous day's playbook if available
- No playbook → skip pre-market phase, fall back to direct AI calls
**Guarantee**: Notification and dashboard failures never interrupt trading operations.

View File

@@ -119,7 +119,7 @@ No decorator needed for async tests.
# Install all dependencies (production + dev)
pip install -e ".[dev]"
# Run full test suite with coverage
# Run full test suite with coverage (551 tests across 25 files)
pytest -v --cov=src --cov-report=term-missing
# Run a single test file
@@ -137,11 +137,61 @@ mypy src/ --strict
# Run the trading agent
python -m src.main --mode=paper
# Run with dashboard enabled
python -m src.main --mode=paper --dashboard
# Docker
docker compose up -d ouroboros # Run agent
docker compose --profile test up test # Run tests in container
```
## Dashboard
The FastAPI dashboard provides read-only monitoring of the trading system.
### Starting the Dashboard
```bash
# Via CLI flag
python -m src.main --mode=paper --dashboard
# Via environment variable
DASHBOARD_ENABLED=true python -m src.main --mode=paper
```
Dashboard runs as a daemon thread on `DASHBOARD_HOST:DASHBOARD_PORT` (default: `127.0.0.1:8080`).
### API Endpoints
| Endpoint | Description |
|----------|-------------|
| `GET /` | HTML dashboard UI |
| `GET /api/status` | Daily trading status by market |
| `GET /api/playbook/{date}` | Playbook for specific date (query: `market`) |
| `GET /api/scorecard/{date}` | Daily scorecard from L6_DAILY context |
| `GET /api/performance` | Performance metrics by market and combined |
| `GET /api/context/{layer}` | Context data by layer L1-L7 (query: `timeframe`) |
| `GET /api/decisions` | Decision log entries (query: `limit`, `market`) |
| `GET /api/scenarios/active` | Today's matched scenarios |
## Telegram Commands
When `TELEGRAM_COMMANDS_ENABLED=true` (default), the bot accepts these interactive commands:
| Command | Description |
|---------|-------------|
| `/help` | List available commands |
| `/status` | Show trading status (mode, markets, P&L) |
| `/positions` | Display account summary (balance, cash, P&L) |
| `/report` | Daily summary metrics (trades, P&L, win rate) |
| `/scenarios` | Show today's playbook scenarios |
| `/review` | Display recent scorecards (L6_DAILY layer) |
| `/dashboard` | Show dashboard URL if enabled |
| `/stop` | Pause trading |
| `/resume` | Resume trading |
Commands are only processed from the authorized `TELEGRAM_CHAT_ID`.
## Environment Setup
```bash

View File

@@ -91,56 +91,23 @@
## 2026-02-16
### 해외 스캐너 개선: 랭킹 연동 + 변동성 우선 선별
### 문서 v2 동기화 (전체 문서 현행화)
**배경:**
- `run_overnight` 실운영에서 미국장 동안 거래가 0건 지속
- 원인: 해외 시장에서도 국내 랭킹/일봉 API 경로를 사용하던 구조적 불일치
- v2 기능 구현 완료 후 문서가 실제 코드 상태와 크게 괴리
- 문서에는 54 tests / 4 files로 기록되었으나 실제로는 551 tests / 25 files
- v2 핵심 기능(Playbook, Scenario Engine, Dashboard, Telegram Commands, Daily Review, Context System, Backup) 문서화 누락
**요구사항:**
1. 해외 시장도 랭킹 API 기반 유니버스 탐색 지원
2. 단순 상승률/거래대금 상위가 아니라, **변동성이 큰 종목**을 우선 선별
3. 고정 티커 fallback 금지
1. `docs/testing.md` — 551 tests / 25 files 반영, 전체 테스트 파일 설명
2. `docs/architecture.md` — v2 컴포넌트(Strategy, Context, Dashboard, Decision Logger 등) 추가, Playbook Mode 데이터 플로우, DB 스키마 5개 테이블, v2 환경변수
3. `docs/commands.md` — Dashboard 실행 명령어, Telegram 명령어 9종 레퍼런스
4. `CLAUDE.md` — Project Structure 트리 확장, 테스트 수 업데이트, `--dashboard` 플래그
5. `docs/skills.md` — DB 파일명 `trades.db`로 통일, Dashboard 명령어 추가
6. 기존에 유효한 트러블슈팅, 코드 예제 등은 유지
**구현 결과:**
- `src/broker/overseas.py`
- `fetch_overseas_rankings()` 추가 (fluctuation / volume)
- 해외 랭킹 API 경로/TR_ID를 설정값으로 오버라이드 가능하게 구현
- `src/analysis/smart_scanner.py`
- market-aware 스캔(국내/해외 분리)
- 해외: 랭킹 API 유니버스 + 변동성 우선 점수(일변동률 vs 장중 고저폭)
- 거래대금/거래량 랭킹은 유동성 보정 점수로 활용
- 랭킹 실패 시에는 동적 유니버스(active/recent/holdings)만 사용
- `src/config.py`
- `OVERSEAS_RANKING_*` 설정 추가
- 6개 문서 파일 업데이트
- 이전 시도(2개 커밋)는 기존 내용을 과도하게 삭제하여 폐기, main 기준으로 재작업
**효과:**
- 해외 시장에서 스캐너 후보 0개로 정지되는 상황 완화
- 종목 선정 기준이 단순 상승률 중심에서 변동성 중심으로 개선
- 고정 티커 없이도 시장 주도 변동 종목 탐지 가능
### 국내 스캐너/주문수량 정렬: 변동성 우선 + 리스크 타기팅
**배경:**
- 해외만 변동성 우선으로 동작하고, 국내는 RSI/거래량 필터 중심으로 동작해 시장 간 전략 일관성이 낮았음
- 매수 수량이 고정 1주라서 변동성 구간별 익스포저 관리가 어려웠음
**요구사항:**
1. 국내 스캐너도 변동성 우선 선별로 해외와 통일
2. 고변동 종목일수록 포지션 크기를 줄이는 수량 산식 적용
**구현 결과:**
- `src/analysis/smart_scanner.py`
- 국내: `fluctuation ranking + volume ranking bonus` 기반 점수화로 전환
- 점수는 `max(abs(change_rate), intraday_range_pct)` 중심으로 계산
- 국내 랭킹 응답 스키마 키(`price`, `change_rate`, `volume`) 파싱 보강
- `src/main.py`
- `_determine_order_quantity()` 추가
- BUY 시 변동성 점수 기반 동적 수량 산정 적용
- `trading_cycle`, `run_daily_session` 경로 모두 동일 수량 로직 사용
- `src/config.py`
- `POSITION_SIZING_*` 설정 추가
**효과:**
- 국내/해외 스캐너 기준이 변동성 중심으로 일관화
- 고변동 구간에서 자동 익스포저 축소, 저변동 구간에서 과소진입 완화
**이슈/PR:** #131, PR #134

View File

@@ -34,6 +34,12 @@ python -m src.main --mode=paper
```
Runs the agent in paper-trading mode (no real orders).
### Start Trading Agent with Dashboard
```bash
python -m src.main --mode=paper --dashboard
```
Runs the agent with FastAPI dashboard on `127.0.0.1:8080` (configurable via `DASHBOARD_HOST`/`DASHBOARD_PORT`).
### Start Trading Agent (Production)
```bash
docker compose up -d ouroboros
@@ -59,7 +65,7 @@ Analyze the last 30 days of trade logs and generate performance metrics.
python -m src.evolution.optimizer --evolve
```
Triggers the evolution engine to:
1. Analyze `trade_logs.db` for failing patterns
1. Analyze `trades.db` for failing patterns
2. Ask Gemini to generate a new strategy
3. Run tests on the new strategy
4. Create a PR if tests pass
@@ -91,12 +97,12 @@ curl http://localhost:8080/health
### View Trade Logs
```bash
sqlite3 data/trade_logs.db "SELECT * FROM trades ORDER BY timestamp DESC LIMIT 20;"
sqlite3 data/trades.db "SELECT * FROM trades ORDER BY timestamp DESC LIMIT 20;"
```
### Export Trade History
```bash
sqlite3 -header -csv data/trade_logs.db "SELECT * FROM trades;" > trades_export.csv
sqlite3 -header -csv data/trades.db "SELECT * FROM trades;" > trades_export.csv
```
## Safety Checklist (Pre-Deploy)

View File

@@ -2,51 +2,29 @@
## Test Structure
**54 tests** across four files. `asyncio_mode = "auto"` in pyproject.toml — async tests need no special decorator.
**551 tests** across **25 files**. `asyncio_mode = "auto"` in pyproject.toml — async tests need no special decorator.
The `settings` fixture in `conftest.py` provides safe defaults with test credentials and in-memory DB.
### Test Files
#### `tests/test_risk.py` (11 tests)
- Circuit breaker boundaries
- Fat-finger edge cases
#### Core Components
##### `tests/test_risk.py` (14 tests)
- Circuit breaker boundaries and exact threshold triggers
- Fat-finger edge cases and percentage validation
- P&L calculation edge cases
- Order validation logic
**Example:**
```python
def test_circuit_breaker_exact_threshold(risk_manager):
"""Circuit breaker should trip at exactly -3.0%."""
with pytest.raises(CircuitBreakerTripped):
risk_manager.validate_order(
current_pnl_pct=-3.0,
order_amount=1000,
total_cash=10000
)
```
#### `tests/test_broker.py` (6 tests)
##### `tests/test_broker.py` (11 tests)
- OAuth token lifecycle
- Rate limiting enforcement
- Hash key generation
- Network error handling
- SSL context configuration
**Example:**
```python
async def test_rate_limiter(broker):
"""Rate limiter should delay requests to stay under 10 RPS."""
start = time.monotonic()
for _ in range(15): # 15 requests
await broker._rate_limiter.acquire()
elapsed = time.monotonic() - start
assert elapsed >= 1.0 # Should take at least 1 second
```
#### `tests/test_brain.py` (18 tests)
- Valid JSON parsing
- Markdown-wrapped JSON handling
##### `tests/test_brain.py` (24 tests)
- Valid JSON parsing and markdown-wrapped JSON handling
- Malformed JSON fallback
- Missing fields handling
- Invalid action validation
@@ -54,33 +32,143 @@ async def test_rate_limiter(broker):
- Empty response handling
- Prompt construction for different markets
**Example:**
```python
async def test_confidence_below_threshold_forces_hold(brain):
"""Decisions below confidence threshold should force HOLD."""
decision = brain.parse_response('{"action":"BUY","confidence":70,"rationale":"test"}')
assert decision.action == "HOLD"
assert decision.confidence == 70
```
#### `tests/test_market_schedule.py` (19 tests)
##### `tests/test_market_schedule.py` (24 tests)
- Market open/close logic
- Timezone handling (UTC, Asia/Seoul, America/New_York, etc.)
- DST (Daylight Saving Time) transitions
- Weekend handling
- Lunch break logic
- Weekend handling and lunch break logic
- Multiple market filtering
- Next market open calculation
**Example:**
```python
def test_is_market_open_during_trading_hours():
"""Market should be open during regular trading hours."""
# KRX: 9:00-15:30 KST, no lunch break
market = MARKETS["KR"]
trading_time = datetime(2026, 2, 3, 10, 0, tzinfo=ZoneInfo("Asia/Seoul")) # Monday 10:00
assert is_market_open(market, trading_time) is True
```
##### `tests/test_db.py` (3 tests)
- Database initialization and table creation
- Trade logging with all fields (market, exchange_code, decision_id)
- Query and retrieval operations
##### `tests/test_main.py` (37 tests)
- Trading loop orchestration
- Market iteration and stock processing
- Dashboard integration (`--dashboard` flag)
- Telegram command handler wiring
- Error handling and graceful shutdown
#### Strategy & Playbook (v2)
##### `tests/test_pre_market_planner.py` (37 tests)
- Pre-market playbook generation
- Gemini API integration for scenario creation
- Timeout handling and defensive playbook fallback
- Multi-market playbook generation
##### `tests/test_scenario_engine.py` (44 tests)
- Scenario matching against live market data
- Confidence scoring and threshold filtering
- Multiple scenario type handling
- Edge cases (no match, partial match, expired scenarios)
##### `tests/test_playbook_store.py` (23 tests)
- Playbook persistence to SQLite
- Date-based retrieval and market filtering
- Playbook status management (generated, active, expired)
- JSON serialization/deserialization
##### `tests/test_strategy_models.py` (33 tests)
- Pydantic model validation for scenarios, playbooks, decisions
- Field constraints and default values
- Serialization round-trips
#### Analysis & Scanning
##### `tests/test_volatility.py` (24 tests)
- ATR and RSI calculation accuracy
- Volume surge ratio computation
- Momentum scoring
- Breakout/breakdown pattern detection
- Market scanner watchlist management
##### `tests/test_smart_scanner.py` (13 tests)
- Python-first filtering pipeline
- RSI and volume ratio filter logic
- Candidate scoring and ranking
- Fallback to static watchlist
#### Context & Memory
##### `tests/test_context.py` (18 tests)
- L1-L7 layer storage and retrieval
- Context key-value CRUD operations
- Timeframe-based queries
- Layer metadata management
##### `tests/test_context_scheduler.py` (5 tests)
- Periodic context aggregation scheduling
- Layer summarization triggers
#### Evolution & Review
##### `tests/test_evolution.py` (24 tests)
- Strategy optimization loop
- High-confidence losing trade analysis
- Generated strategy validation
##### `tests/test_daily_review.py` (10 tests)
- End-of-day review generation
- Trade performance summarization
- Context layer (L6_DAILY) integration
##### `tests/test_scorecard.py` (3 tests)
- Daily scorecard metrics calculation
- Win rate, P&L, confidence tracking
#### Notifications & Commands
##### `tests/test_telegram.py` (25 tests)
- Message sending and formatting
- Rate limiting (leaky bucket)
- Error handling (network timeout, invalid token)
- Auto-disable on missing credentials
- Notification types (trade, circuit breaker, fat-finger, market events)
##### `tests/test_telegram_commands.py` (31 tests)
- 9 command handlers (/help, /status, /positions, /report, /scenarios, /review, /dashboard, /stop, /resume)
- Long polling and command dispatch
- Authorization filtering by chat_id
- Command response formatting
#### Dashboard
##### `tests/test_dashboard.py` (14 tests)
- FastAPI endpoint responses (8 API routes)
- Status, playbook, scorecard, performance, context, decisions, scenarios
- Query parameter handling (market, date, limit)
#### Performance & Quality
##### `tests/test_token_efficiency.py` (34 tests)
- Gemini token usage optimization
- Prompt size reduction verification
- Cache effectiveness
##### `tests/test_latency_control.py` (30 tests)
- API call latency measurement
- Rate limiter timing accuracy
- Async operation overhead
##### `tests/test_decision_logger.py` (9 tests)
- Decision audit trail completeness
- Context snapshot capture
- Outcome tracking (P&L, accuracy)
##### `tests/test_data_integration.py` (38 tests)
- External data source integration
- News API, market data, economic calendar
- Error handling for API failures
##### `tests/test_backup.py` (23 tests)
- Backup scheduler and execution
- Cloud storage (S3) upload
- Health monitoring
- Data export functionality
## Coverage Requirements
@@ -91,20 +179,6 @@ Check coverage:
pytest -v --cov=src --cov-report=term-missing
```
Expected output:
```
Name Stmts Miss Cover Missing
-----------------------------------------------------------
src/brain/gemini_client.py 85 5 94% 165-169
src/broker/kis_api.py 120 12 90% ...
src/core/risk_manager.py 35 2 94% ...
src/db.py 25 1 96% ...
src/main.py 150 80 47% (excluded from CI)
src/markets/schedule.py 95 3 97% ...
-----------------------------------------------------------
TOTAL 510 103 80%
```
**Note:** `main.py` has lower coverage as it contains the main loop which is tested via integration/manual testing.
## Test Configuration

View File

@@ -1,4 +1,8 @@
"""Smart Volatility Scanner with volatility-first market ranking logic."""
"""Smart Volatility Scanner with RSI and volume filters.
Fetches market rankings from KIS API and applies technical filters
to identify high-probability trading candidates.
"""
from __future__ import annotations
@@ -8,9 +12,7 @@ from typing import Any
from src.analysis.volatility import VolatilityAnalyzer
from src.broker.kis_api import KISBroker
from src.broker.overseas import OverseasBroker
from src.config import Settings
from src.markets.schedule import MarketInfo
logger = logging.getLogger(__name__)
@@ -30,19 +32,19 @@ class ScanCandidate:
class SmartVolatilityScanner:
"""Scans market rankings and applies volatility-first filters.
"""Scans market rankings and applies RSI/volume filters.
Flow:
1. Fetch fluctuation rankings as primary universe
2. Fetch volume rankings for liquidity bonus
3. Score by volatility first, liquidity second
4. Return top N qualified candidates
1. Fetch volume rankings from KIS API
2. For each ranked stock, fetch daily prices
3. Calculate RSI and volume ratio
4. Apply filters: volume > VOL_MULTIPLIER AND (RSI < 30 OR RSI > 70)
5. Return top N qualified candidates
"""
def __init__(
self,
broker: KISBroker,
overseas_broker: OverseasBroker | None,
volatility_analyzer: VolatilityAnalyzer,
settings: Settings,
) -> None:
@@ -54,7 +56,6 @@ class SmartVolatilityScanner:
settings: Application settings
"""
self.broker = broker
self.overseas_broker = overseas_broker
self.analyzer = volatility_analyzer
self.settings = settings
@@ -66,129 +67,107 @@ class SmartVolatilityScanner:
async def scan(
self,
market: MarketInfo | None = None,
fallback_stocks: list[str] | None = None,
) -> list[ScanCandidate]:
"""Execute smart scan and return qualified candidates.
Args:
market: Target market info (domestic vs overseas behavior)
fallback_stocks: Stock codes to use if ranking API fails
Returns:
List of ScanCandidate, sorted by score, up to top_n items
"""
if market and not market.is_domestic:
return await self._scan_overseas(market, fallback_stocks)
return await self._scan_domestic(fallback_stocks)
async def _scan_domestic(
self,
fallback_stocks: list[str] | None = None,
) -> list[ScanCandidate]:
"""Scan domestic market using volatility-first ranking + liquidity bonus."""
# 1) Primary universe from fluctuation ranking.
# Step 1: Fetch rankings
try:
fluct_rows = await self.broker.fetch_market_rankings(
ranking_type="fluctuation",
limit=50,
)
except ConnectionError as exc:
logger.warning("Domestic fluctuation ranking failed: %s", exc)
fluct_rows = []
# 2) Liquidity bonus from volume ranking.
try:
volume_rows = await self.broker.fetch_market_rankings(
rankings = await self.broker.fetch_market_rankings(
ranking_type="volume",
limit=50,
limit=30, # Fetch more than needed for filtering
)
logger.info("Fetched %d stocks from volume rankings", len(rankings))
except ConnectionError as exc:
logger.warning("Domestic volume ranking failed: %s", exc)
volume_rows = []
if not fluct_rows and fallback_stocks:
logger.info(
"Domestic ranking unavailable; using fallback symbols (%d)",
len(fallback_stocks),
)
fluct_rows = [
{
"stock_code": code,
"name": code,
"price": 0.0,
"volume": 0.0,
"change_rate": 0.0,
"volume_increase_rate": 0.0,
}
for code in fallback_stocks
]
if not fluct_rows:
return []
volume_rank_bonus: dict[str, float] = {}
for idx, row in enumerate(volume_rows):
code = _extract_stock_code(row)
if not code:
continue
volume_rank_bonus[code] = max(0.0, 15.0 - idx * 0.3)
logger.warning("Ranking API failed, using fallback: %s", exc)
if fallback_stocks:
# Create minimal ranking data for fallback
rankings = [
{
"stock_code": code,
"name": code,
"price": 0,
"volume": 0,
"change_rate": 0,
"volume_increase_rate": 0,
}
for code in fallback_stocks
]
else:
return []
# Step 2: Analyze each stock
candidates: list[ScanCandidate] = []
for stock in fluct_rows:
stock_code = _extract_stock_code(stock)
for stock in rankings:
stock_code = stock["stock_code"]
if not stock_code:
continue
try:
price = _extract_last_price(stock)
change_rate = _extract_change_rate_pct(stock)
volume = _extract_volume(stock)
# Fetch daily prices for RSI calculation
daily_prices = await self.broker.get_daily_prices(stock_code, days=20)
intraday_range_pct = 0.0
volume_ratio = _safe_float(stock.get("volume_increase_rate"), 0.0) / 100.0 + 1.0
# Use daily chart to refine range/volume when available.
daily_prices = await self.broker.get_daily_prices(stock_code, days=2)
if daily_prices:
latest = daily_prices[-1]
latest_close = _safe_float(latest.get("close"), default=price)
if price <= 0:
price = latest_close
latest_high = _safe_float(latest.get("high"))
latest_low = _safe_float(latest.get("low"))
if latest_close > 0 and latest_high > 0 and latest_low > 0 and latest_high >= latest_low:
intraday_range_pct = (latest_high - latest_low) / latest_close * 100.0
if volume <= 0:
volume = _safe_float(latest.get("volume"))
if len(daily_prices) >= 2:
prev_day_volume = _safe_float(daily_prices[-2].get("volume"))
if prev_day_volume > 0:
volume_ratio = max(volume_ratio, volume / prev_day_volume)
volatility_pct = max(abs(change_rate), intraday_range_pct)
if price <= 0 or volatility_pct < 0.8:
if len(daily_prices) < 15: # Need at least 14+1 for RSI
logger.debug("Insufficient price history for %s", stock_code)
continue
volatility_score = min(volatility_pct / 10.0, 1.0) * 85.0
liquidity_score = volume_rank_bonus.get(stock_code, 0.0)
score = min(100.0, volatility_score + liquidity_score)
signal = "momentum" if change_rate >= 0 else "oversold"
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0)))
# Calculate RSI
close_prices = [p["close"] for p in daily_prices]
rsi = self.analyzer.calculate_rsi(close_prices, period=14)
candidates.append(
ScanCandidate(
stock_code=stock_code,
name=stock.get("name", stock_code),
price=price,
volume=volume,
volume_ratio=max(1.0, volume_ratio, volatility_pct / 2.0),
rsi=implied_rsi,
signal=signal,
score=score,
# Calculate volume ratio (today vs previous day avg)
if len(daily_prices) >= 2:
prev_day_volume = daily_prices[-2]["volume"]
current_volume = stock.get("volume", 0) or daily_prices[-1]["volume"]
volume_ratio = (
current_volume / prev_day_volume if prev_day_volume > 0 else 1.0
)
else:
volume_ratio = stock.get("volume_increase_rate", 0) / 100 + 1 # Fallback
# Apply filters
volume_qualified = volume_ratio >= self.vol_multiplier
rsi_oversold = rsi < self.rsi_oversold
rsi_momentum = rsi > self.rsi_momentum
if volume_qualified and (rsi_oversold or rsi_momentum):
signal = "oversold" if rsi_oversold else "momentum"
# Calculate composite score
# Higher score for: extreme RSI + high volume
rsi_extremity = abs(rsi - 50) / 50 # 0-1 scale
volume_score = min(volume_ratio / 5, 1.0) # Cap at 5x
score = (rsi_extremity * 0.6 + volume_score * 0.4) * 100
candidates.append(
ScanCandidate(
stock_code=stock_code,
name=stock.get("name", stock_code),
price=stock.get("price", daily_prices[-1]["close"]),
volume=current_volume,
volume_ratio=volume_ratio,
rsi=rsi,
signal=signal,
score=score,
)
)
logger.info(
"Qualified: %s (%s) RSI=%.1f vol=%.1fx signal=%s score=%.1f",
stock_code,
stock.get("name", ""),
rsi,
volume_ratio,
signal,
score,
)
)
except ConnectionError as exc:
logger.warning("Failed to analyze %s: %s", stock_code, exc)
@@ -197,161 +176,10 @@ class SmartVolatilityScanner:
logger.error("Unexpected error analyzing %s: %s", stock_code, exc)
continue
logger.info("Domestic ranking scan found %d candidates", len(candidates))
# Sort by score and return top N
candidates.sort(key=lambda c: c.score, reverse=True)
return candidates[: self.top_n]
async def _scan_overseas(
self,
market: MarketInfo,
fallback_stocks: list[str] | None = None,
) -> list[ScanCandidate]:
"""Scan overseas symbols using ranking API first, then fallback universe."""
if self.overseas_broker is None:
logger.warning(
"Overseas scanner unavailable for %s: overseas broker not configured",
market.name,
)
return []
candidates = await self._scan_overseas_from_rankings(market)
if not candidates:
candidates = await self._scan_overseas_from_symbols(market, fallback_stocks)
candidates.sort(key=lambda c: c.score, reverse=True)
return candidates[: self.top_n]
async def _scan_overseas_from_rankings(
self,
market: MarketInfo,
) -> list[ScanCandidate]:
"""Build overseas candidates from ranking APIs using volatility-first scoring."""
assert self.overseas_broker is not None
try:
fluct_rows = await self.overseas_broker.fetch_overseas_rankings(
exchange_code=market.exchange_code,
ranking_type="fluctuation",
limit=50,
)
except Exception as exc:
logger.warning(
"Overseas fluctuation ranking failed for %s: %s", market.code, exc
)
fluct_rows = []
if not fluct_rows:
return []
volume_rank_bonus: dict[str, float] = {}
try:
volume_rows = await self.overseas_broker.fetch_overseas_rankings(
exchange_code=market.exchange_code,
ranking_type="volume",
limit=50,
)
except Exception as exc:
logger.warning(
"Overseas volume ranking failed for %s: %s", market.code, exc
)
volume_rows = []
for idx, row in enumerate(volume_rows):
code = _extract_stock_code(row)
if not code:
continue
# Top-ranked by traded value/volume gets higher liquidity bonus.
volume_rank_bonus[code] = max(0.0, 15.0 - idx * 0.3)
candidates: list[ScanCandidate] = []
for row in fluct_rows:
stock_code = _extract_stock_code(row)
if not stock_code:
continue
price = _extract_last_price(row)
change_rate = _extract_change_rate_pct(row)
volume = _extract_volume(row)
intraday_range_pct = _extract_intraday_range_pct(row, price)
volatility_pct = max(abs(change_rate), intraday_range_pct)
# Volatility-first filter (not simple gainers/value ranking).
if price <= 0 or volatility_pct < 0.8:
continue
volatility_score = min(volatility_pct / 10.0, 1.0) * 85.0
liquidity_score = volume_rank_bonus.get(stock_code, 0.0)
score = min(100.0, volatility_score + liquidity_score)
signal = "momentum" if change_rate >= 0 else "oversold"
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0)))
candidates.append(
ScanCandidate(
stock_code=stock_code,
name=str(row.get("name") or row.get("ovrs_item_name") or stock_code),
price=price,
volume=volume,
volume_ratio=max(1.0, volatility_pct / 2.0),
rsi=implied_rsi,
signal=signal,
score=score,
)
)
if candidates:
logger.info(
"Overseas ranking scan found %d candidates for %s",
len(candidates),
market.name,
)
return candidates
async def _scan_overseas_from_symbols(
self,
market: MarketInfo,
symbols: list[str] | None,
) -> list[ScanCandidate]:
"""Fallback overseas scan from dynamic symbol universe."""
assert self.overseas_broker is not None
if not symbols:
logger.info("Overseas scanner: no symbol universe for %s", market.name)
return []
candidates: list[ScanCandidate] = []
for stock_code in symbols:
try:
price_data = await self.overseas_broker.get_overseas_price(
market.exchange_code, stock_code
)
output = price_data.get("output", {})
price = _extract_last_price(output)
change_rate = _extract_change_rate_pct(output)
volume = _extract_volume(output)
intraday_range_pct = _extract_intraday_range_pct(output, price)
volatility_pct = max(abs(change_rate), intraday_range_pct)
if price <= 0 or volatility_pct < 0.8:
continue
score = min(volatility_pct / 10.0, 1.0) * 100.0
signal = "momentum" if change_rate >= 0 else "oversold"
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0)))
candidates.append(
ScanCandidate(
stock_code=stock_code,
name=stock_code,
price=price,
volume=volume,
volume_ratio=max(1.0, volatility_pct / 2.0),
rsi=implied_rsi,
signal=signal,
score=score,
)
)
except ConnectionError as exc:
logger.warning("Failed to analyze overseas %s: %s", stock_code, exc)
except Exception as exc:
logger.error("Unexpected error analyzing overseas %s: %s", stock_code, exc)
return candidates
def get_stock_codes(self, candidates: list[ScanCandidate]) -> list[str]:
"""Extract stock codes from candidates for watchlist update.
@@ -362,78 +190,3 @@ class SmartVolatilityScanner:
List of stock codes
"""
return [c.stock_code for c in candidates]
def _safe_float(value: Any, default: float = 0.0) -> float:
"""Convert arbitrary values to float safely."""
if value in (None, ""):
return default
try:
return float(value)
except (TypeError, ValueError):
return default
def _extract_stock_code(row: dict[str, Any]) -> str:
"""Extract normalized stock code from various API schemas."""
return (
str(
row.get("symb")
or row.get("ovrs_pdno")
or row.get("stock_code")
or row.get("pdno")
or ""
)
.strip()
.upper()
)
def _extract_last_price(row: dict[str, Any]) -> float:
"""Extract last/close-like price from API schema variants."""
return _safe_float(
row.get("last")
or row.get("ovrs_nmix_prpr")
or row.get("stck_prpr")
or row.get("price")
or row.get("close")
)
def _extract_change_rate_pct(row: dict[str, Any]) -> float:
"""Extract daily change rate (%) from API schema variants."""
return _safe_float(
row.get("rate")
or row.get("change_rate")
or row.get("prdy_ctrt")
or row.get("evlu_pfls_rt")
or row.get("chg_rt")
)
def _extract_volume(row: dict[str, Any]) -> float:
"""Extract volume/traded-amount proxy from schema variants."""
return _safe_float(
row.get("tvol") or row.get("acml_vol") or row.get("vol") or row.get("volume")
)
def _extract_intraday_range_pct(row: dict[str, Any], price: float) -> float:
"""Estimate intraday range percentage from high/low fields."""
if price <= 0:
return 0.0
high = _safe_float(
row.get("high")
or row.get("ovrs_hgpr")
or row.get("stck_hgpr")
or row.get("day_hgpr")
)
low = _safe_float(
row.get("low")
or row.get("ovrs_lwpr")
or row.get("stck_lwpr")
or row.get("day_lwpr")
)
if high <= 0 or low <= 0 or high < low:
return 0.0
return (high - low) / price * 100.0

View File

@@ -64,65 +64,6 @@ class OverseasBroker:
f"Network error fetching overseas price: {exc}"
) from exc
async def fetch_overseas_rankings(
self,
exchange_code: str,
ranking_type: str = "fluctuation",
limit: int = 30,
) -> list[dict[str, Any]]:
"""Fetch overseas rankings (price change or volume amount).
Ranking API specs may differ by account/product. Endpoint paths and
TR_IDs are configurable via settings and can be overridden in .env.
"""
if not self._broker._settings.OVERSEAS_RANKING_ENABLED:
return []
await self._broker._rate_limiter.acquire()
session = self._broker._get_session()
if ranking_type == "volume":
tr_id = self._broker._settings.OVERSEAS_RANKING_VOLUME_TR_ID
path = self._broker._settings.OVERSEAS_RANKING_VOLUME_PATH
else:
tr_id = self._broker._settings.OVERSEAS_RANKING_FLUCT_TR_ID
path = self._broker._settings.OVERSEAS_RANKING_FLUCT_PATH
headers = await self._broker._auth_headers(tr_id)
url = f"{self._broker._base_url}{path}"
# Try common param variants used by KIS overseas quotation APIs.
param_variants = [
{"AUTH": "", "EXCD": exchange_code, "NREC": str(max(limit, 30))},
{"AUTH": "", "OVRS_EXCG_CD": exchange_code, "NREC": str(max(limit, 30))},
{"AUTH": "", "EXCD": exchange_code},
{"AUTH": "", "OVRS_EXCG_CD": exchange_code},
]
last_error: str | None = None
for params in param_variants:
try:
async with session.get(url, headers=headers, params=params) as resp:
text = await resp.text()
if resp.status != 200:
last_error = f"HTTP {resp.status}: {text}"
continue
data = await resp.json()
rows = self._extract_ranking_rows(data)
if rows:
return rows[:limit]
# keep trying another param variant if response has no usable rows
last_error = f"empty output (keys={list(data.keys())})"
except (TimeoutError, aiohttp.ClientError) as exc:
last_error = str(exc)
continue
raise ConnectionError(
f"fetch_overseas_rankings failed for {exchange_code}/{ranking_type}: {last_error}"
)
async def get_overseas_balance(self, exchange_code: str) -> dict[str, Any]:
"""
Fetch overseas account balance.
@@ -257,11 +198,3 @@ class OverseasBroker:
"HSX": "VND",
}
return currency_map.get(exchange_code, "USD")
def _extract_ranking_rows(self, data: dict[str, Any]) -> list[dict[str, Any]]:
"""Extract list rows from ranking response across schema variants."""
candidates = [data.get("output"), data.get("output1"), data.get("output2")]
for value in candidates:
if isinstance(value, list):
return [row for row in value if isinstance(row, dict)]
return []

View File

@@ -38,11 +38,6 @@ class Settings(BaseSettings):
RSI_MOMENTUM_THRESHOLD: int = Field(default=70, ge=50, le=100)
VOL_MULTIPLIER: float = Field(default=2.0, gt=1.0, le=10.0)
SCANNER_TOP_N: int = Field(default=3, ge=1, le=10)
POSITION_SIZING_ENABLED: bool = True
POSITION_BASE_ALLOCATION_PCT: float = Field(default=5.0, gt=0.0, le=30.0)
POSITION_MIN_ALLOCATION_PCT: float = Field(default=1.0, gt=0.0, le=20.0)
POSITION_MAX_ALLOCATION_PCT: float = Field(default=10.0, gt=0.0, le=50.0)
POSITION_VOLATILITY_TARGET_SCORE: float = Field(default=50.0, gt=0.0, le=100.0)
# Database
DB_PATH: str = "data/trade_logs.db"
@@ -88,18 +83,6 @@ class Settings(BaseSettings):
TELEGRAM_COMMANDS_ENABLED: bool = True
TELEGRAM_POLLING_INTERVAL: float = 1.0 # seconds
# Overseas ranking API (KIS endpoint/TR_ID may vary by account/product)
# Override these from .env if your account uses different specs.
OVERSEAS_RANKING_ENABLED: bool = True
OVERSEAS_RANKING_FLUCT_TR_ID: str = "HHDFS76200100"
OVERSEAS_RANKING_VOLUME_TR_ID: str = "HHDFS76200200"
OVERSEAS_RANKING_FLUCT_PATH: str = (
"/uapi/overseas-price/v1/quotations/inquire-updown-rank"
)
OVERSEAS_RANKING_VOLUME_PATH: str = (
"/uapi/overseas-price/v1/quotations/inquire-volume-rank"
)
# Dashboard (optional)
DASHBOARD_ENABLED: bool = False
DASHBOARD_HOST: str = "127.0.0.1"

View File

@@ -235,21 +235,3 @@ def get_open_position(
if not row or row[0] != "BUY":
return None
return {"decision_id": row[1], "price": row[2], "quantity": row[3]}
def get_recent_symbols(
conn: sqlite3.Connection, market: str, limit: int = 30
) -> list[str]:
"""Return recent unique symbols for a market, newest first."""
cursor = conn.execute(
"""
SELECT stock_code, MAX(timestamp) AS last_ts
FROM trades
WHERE market = ?
GROUP BY stock_code
ORDER BY last_ts DESC
LIMIT ?
""",
(market, limit),
)
return [row[0] for row in cursor.fetchall() if row and row[0]]

View File

@@ -29,13 +29,7 @@ from src.context.store import ContextStore
from src.core.criticality import CriticalityAssessor
from src.core.priority_queue import PriorityTaskQueue
from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected, RiskManager
from src.db import (
get_latest_buy_trade,
get_open_position,
get_recent_symbols,
init_db,
log_trade,
)
from src.db import get_latest_buy_trade, get_open_position, init_db, log_trade
from src.evolution.daily_review import DailyReviewer
from src.evolution.optimizer import EvolutionOptimizer
from src.logging.decision_logger import DecisionLogger
@@ -87,102 +81,6 @@ DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day
TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions
def _extract_symbol_from_holding(item: dict[str, Any]) -> str:
"""Extract symbol from overseas holding payload variants."""
for key in (
"ovrs_pdno",
"pdno",
"ovrs_item_name",
"prdt_name",
"symb",
"symbol",
"stock_code",
):
value = item.get(key)
if isinstance(value, str):
symbol = value.strip().upper()
if symbol and symbol.replace(".", "").replace("-", "").isalnum():
return symbol
return ""
def _determine_order_quantity(
*,
action: str,
current_price: float,
total_cash: float,
candidate: ScanCandidate | None,
settings: Settings | None,
) -> int:
"""Determine order quantity using volatility-aware position sizing."""
if action != "BUY":
return 1
if current_price <= 0 or total_cash <= 0:
return 0
if settings is None or not settings.POSITION_SIZING_ENABLED:
return 1
target_score = max(1.0, settings.POSITION_VOLATILITY_TARGET_SCORE)
observed_score = candidate.score if candidate else target_score
observed_score = max(1.0, min(100.0, observed_score))
# Higher observed volatility score => smaller allocation.
scaled_pct = settings.POSITION_BASE_ALLOCATION_PCT * (target_score / observed_score)
allocation_pct = min(
settings.POSITION_MAX_ALLOCATION_PCT,
max(settings.POSITION_MIN_ALLOCATION_PCT, scaled_pct),
)
budget = total_cash * (allocation_pct / 100.0)
quantity = int(budget // current_price)
if quantity <= 0:
return 0
return quantity
async def build_overseas_symbol_universe(
db_conn: Any,
overseas_broker: OverseasBroker,
market: MarketInfo,
active_stocks: dict[str, list[str]],
) -> list[str]:
"""Build dynamic overseas symbol universe from runtime, DB, and holdings."""
symbols: list[str] = []
# 1) Keep current active stocks first to avoid sudden churn between cycles.
symbols.extend(active_stocks.get(market.code, []))
# 2) Add recent symbols from own trading history (no fixed list).
symbols.extend(get_recent_symbols(db_conn, market.code, limit=30))
# 3) Add current overseas holdings from broker balance if available.
try:
balance_data = await overseas_broker.get_overseas_balance(market.exchange_code)
output1 = balance_data.get("output1", [])
if isinstance(output1, dict):
output1 = [output1]
if isinstance(output1, list):
for row in output1:
if not isinstance(row, dict):
continue
symbol = _extract_symbol_from_holding(row)
if symbol:
symbols.append(symbol)
except Exception as exc:
logger.warning("Failed to build overseas holdings universe for %s: %s", market.code, exc)
seen: set[str] = set()
ordered_unique: list[str] = []
for symbol in symbols:
normalized = symbol.strip().upper()
if not normalized or normalized in seen:
continue
seen.add(normalized)
ordered_unique.append(normalized)
return ordered_unique
async def trading_cycle(
broker: KISBroker,
overseas_broker: OverseasBroker,
@@ -197,7 +95,6 @@ async def trading_cycle(
market: MarketInfo,
stock_code: str,
scan_candidates: dict[str, dict[str, ScanCandidate]],
settings: Settings | None = None,
) -> None:
"""Execute one trading cycle for a single stock."""
cycle_start_time = asyncio.get_event_loop().time()
@@ -435,23 +332,8 @@ async def trading_cycle(
trade_price = current_price
trade_pnl = 0.0
if decision.action in ("BUY", "SELL"):
quantity = _determine_order_quantity(
action=decision.action,
current_price=current_price,
total_cash=total_cash,
candidate=candidate,
settings=settings,
)
if quantity <= 0:
logger.info(
"Skip %s %s (%s): no affordable quantity (cash=%.2f, price=%.2f)",
decision.action,
stock_code,
market.name,
total_cash,
current_price,
)
return
# Determine order size (simplified: 1 lot)
quantity = 1
order_amount = current_price * quantity
# 4. Risk check BEFORE order
@@ -600,28 +482,8 @@ async def run_daily_session(
# Dynamic stock discovery via scanner (no static watchlists)
candidates_list: list[ScanCandidate] = []
fallback_stocks: list[str] | None = None
if not market.is_domestic:
fallback_stocks = await build_overseas_symbol_universe(
db_conn=db_conn,
overseas_broker=overseas_broker,
market=market,
active_stocks={},
)
if not fallback_stocks:
logger.warning(
"No dynamic overseas symbol universe for %s; scanner cannot run",
market.code,
)
try:
candidates_list = (
await smart_scanner.scan(
market=market,
fallback_stocks=fallback_stocks,
)
if smart_scanner
else []
)
candidates_list = await smart_scanner.scan() if smart_scanner else []
except Exception as exc:
logger.error("Smart Scanner failed for %s: %s", market.name, exc)
@@ -817,23 +679,7 @@ async def run_daily_session(
trade_price = stock_data["current_price"]
trade_pnl = 0.0
if decision.action in ("BUY", "SELL"):
quantity = _determine_order_quantity(
action=decision.action,
current_price=stock_data["current_price"],
total_cash=total_cash,
candidate=candidate_map.get(stock_code),
settings=settings,
)
if quantity <= 0:
logger.info(
"Skip %s %s (%s): no affordable quantity (cash=%.2f, price=%.2f)",
decision.action,
stock_code,
market.name,
total_cash,
stock_data["current_price"],
)
continue
quantity = 1
order_amount = stock_data["current_price"] * quantity
# Risk check
@@ -1417,7 +1263,6 @@ async def run(settings: Settings) -> None:
# Initialize smart scanner (Python-first, AI-last pipeline)
smart_scanner = SmartVolatilityScanner(
broker=broker,
overseas_broker=overseas_broker,
volatility_analyzer=volatility_analyzer,
settings=settings,
)
@@ -1597,25 +1442,7 @@ async def run(settings: Settings) -> None:
try:
logger.info("Smart Scanner: Scanning %s market", market.name)
fallback_stocks: list[str] | None = None
if not market.is_domestic:
fallback_stocks = await build_overseas_symbol_universe(
db_conn=db_conn,
overseas_broker=overseas_broker,
market=market,
active_stocks=active_stocks,
)
if not fallback_stocks:
logger.warning(
"No dynamic overseas symbol universe for %s;"
" scanner cannot run",
market.code,
)
candidates = await smart_scanner.scan(
market=market,
fallback_stocks=fallback_stocks,
)
candidates = await smart_scanner.scan()
if candidates:
# Use scanner results directly as trading candidates
@@ -1739,7 +1566,6 @@ async def run(settings: Settings) -> None:
market,
stock_code,
scan_candidates,
settings,
)
break # Success — exit retry loop
except CircuitBreakerTripped as exc:

View File

@@ -8,7 +8,6 @@ from unittest.mock import AsyncMock, MagicMock
from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner
from src.analysis.volatility import VolatilityAnalyzer
from src.broker.kis_api import KISBroker
from src.broker.overseas import OverseasBroker
from src.config import Settings
@@ -44,70 +43,61 @@ def scanner(mock_broker: MagicMock, mock_settings: Settings) -> SmartVolatilityS
analyzer = VolatilityAnalyzer()
return SmartVolatilityScanner(
broker=mock_broker,
overseas_broker=None,
volatility_analyzer=analyzer,
settings=mock_settings,
)
@pytest.fixture
def mock_overseas_broker() -> MagicMock:
"""Create mock overseas broker."""
broker = MagicMock(spec=OverseasBroker)
broker.get_overseas_price = AsyncMock()
broker.fetch_overseas_rankings = AsyncMock(return_value=[])
return broker
class TestSmartVolatilityScanner:
"""Test suite for SmartVolatilityScanner."""
@pytest.mark.asyncio
async def test_scan_domestic_prefers_volatility_with_liquidity_bonus(
async def test_scan_finds_oversold_candidates(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Domestic scan should score by volatility first and volume rank second."""
fluctuation_rows = [
"""Test that scanner identifies oversold stocks with high volume."""
# Mock rankings
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "005930",
"name": "Samsung",
"price": 70000,
"volume": 5000000,
"change_rate": -5.0,
"change_rate": -3.5,
"volume_increase_rate": 250,
},
{
"stock_code": "035420",
"name": "NAVER",
"price": 250000,
"volume": 3000000,
"change_rate": 3.0,
"volume_increase_rate": 200,
},
]
volume_rows = [
{"stock_code": "035420", "name": "NAVER", "price": 250000, "volume": 3000000},
{"stock_code": "005930", "name": "Samsung", "price": 70000, "volume": 5000000},
]
mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, volume_rows]
mock_broker.get_daily_prices.return_value = [
{"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000},
{"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000},
]
# Mock daily prices - trending down (oversold)
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 75000 - i * 200,
"high": 75500 - i * 200,
"low": 74500 - i * 200,
"close": 75000 - i * 250, # Steady decline
"volume": 2000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
assert len(candidates) >= 1
# Samsung has higher absolute move, so it should lead despite lower volume rank bonus.
assert candidates[0].stock_code == "005930"
assert candidates[0].signal == "oversold"
# Should find at least one candidate (depending on exact RSI calculation)
mock_broker.fetch_market_rankings.assert_called_once()
mock_broker.get_daily_prices.assert_called_once_with("005930", days=20)
# If qualified, should have oversold signal
if candidates:
assert candidates[0].signal in ["oversold", "momentum"]
assert candidates[0].volume_ratio >= scanner.vol_multiplier
@pytest.mark.asyncio
async def test_scan_domestic_finds_momentum_candidate(
async def test_scan_finds_momentum_candidates(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Positive change should be represented as momentum signal."""
fluctuation_rows = [
"""Test that scanner identifies momentum stocks with high volume."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "035420",
"name": "NAVER",
@@ -117,67 +107,124 @@ class TestSmartVolatilityScanner:
"volume_increase_rate": 300,
},
]
mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, fluctuation_rows]
mock_broker.get_daily_prices.return_value = [
{"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000},
{"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000},
]
# Mock daily prices - trending up (momentum)
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 230000 + i * 500,
"high": 231000 + i * 500,
"low": 229000 + i * 500,
"close": 230500 + i * 500, # Steady rise
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
assert [c.stock_code for c in candidates] == ["035420"]
assert candidates[0].signal == "momentum"
mock_broker.fetch_market_rankings.assert_called_once()
@pytest.mark.asyncio
async def test_scan_domestic_filters_low_volatility(
async def test_scan_filters_low_volume(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Domestic scan should drop symbols below volatility threshold."""
fluctuation_rows = [
"""Test that stocks with low volume ratio are filtered out."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "000660",
"name": "SK Hynix",
"price": 150000,
"volume": 500000,
"change_rate": 0.2,
"volume_increase_rate": 50,
"change_rate": -5.0,
"volume_increase_rate": 50, # Only 50% increase (< 200%)
},
]
mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, fluctuation_rows]
mock_broker.get_daily_prices.return_value = [
{"open": 1, "high": 150100, "low": 149900, "close": 150000, "volume": 1000000},
{"open": 1, "high": 150100, "low": 149900, "close": 150000, "volume": 1000000},
]
# Low volume
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 150000 - i * 100,
"high": 151000 - i * 100,
"low": 149000 - i * 100,
"close": 150000 - i * 150, # Declining (would be oversold)
"volume": 1000000, # Current 500k < 2x prev day 1M
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
# Should be filtered out due to low volume ratio
assert len(candidates) == 0
@pytest.mark.asyncio
async def test_scan_filters_neutral_rsi(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that stocks with neutral RSI are filtered out."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "051910",
"name": "LG Chem",
"price": 500000,
"volume": 3000000,
"change_rate": 0.5,
"volume_increase_rate": 300, # High volume
},
]
# Flat prices (neutral RSI ~50)
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 500000 + (i % 2) * 100, # Small oscillation
"high": 500500,
"low": 499500,
"close": 500000 + (i % 2) * 50,
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
# Should be filtered out (RSI ~50, not < 30 or > 70)
assert len(candidates) == 0
@pytest.mark.asyncio
async def test_scan_uses_fallback_on_api_error(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Domestic scan should remain operational using fallback symbols."""
mock_broker.fetch_market_rankings.side_effect = [
ConnectionError("API unavailable"),
ConnectionError("API unavailable"),
]
mock_broker.get_daily_prices.return_value = [
{"open": 1, "high": 103, "low": 97, "close": 100, "volume": 1000000},
{"open": 1, "high": 103, "low": 97, "close": 100, "volume": 800000},
]
"""Test fallback to static list when ranking API fails."""
mock_broker.fetch_market_rankings.side_effect = ConnectionError("API unavailable")
# Fallback stocks should still be analyzed
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 50000 - i * 50,
"high": 51000 - i * 50,
"low": 49000 - i * 50,
"close": 50000 - i * 75, # Declining
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan(fallback_stocks=["005930", "000660"])
# Should not crash
assert isinstance(candidates, list)
assert len(candidates) >= 1
@pytest.mark.asyncio
async def test_scan_returns_top_n_only(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that scan returns at most top_n candidates."""
fluctuation_rows = [
# Return many stocks
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": f"00{i}000",
"name": f"Stock{i}",
@@ -188,17 +235,62 @@ class TestSmartVolatilityScanner:
}
for i in range(1, 10)
]
mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, fluctuation_rows]
mock_broker.get_daily_prices.return_value = [
{"open": 1, "high": 105, "low": 95, "close": 100, "volume": 1000000},
{"open": 1, "high": 105, "low": 95, "close": 100, "volume": 900000},
]
# All oversold with high volume
def make_prices(code: str) -> list[dict]:
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 10000 - i * 100,
"high": 10500 - i * 100,
"low": 9500 - i * 100,
"close": 10000 - i * 150,
"volume": 1000000,
})
return prices
mock_broker.get_daily_prices.side_effect = make_prices
candidates = await scanner.scan()
# Should respect top_n limit (3)
assert len(candidates) <= scanner.top_n
@pytest.mark.asyncio
async def test_scan_skips_insufficient_price_history(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that stocks with insufficient history are skipped."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "005930",
"name": "Samsung",
"price": 70000,
"volume": 5000000,
"change_rate": -5.0,
"volume_increase_rate": 300,
},
]
# Only 5 days of data (need 15+ for RSI)
mock_broker.get_daily_prices.return_value = [
{
"date": f"2026020{i:02d}",
"open": 70000,
"high": 71000,
"low": 69000,
"close": 70000,
"volume": 2000000,
}
for i in range(5)
]
candidates = await scanner.scan()
# Should skip due to insufficient data
assert len(candidates) == 0
@pytest.mark.asyncio
async def test_get_stock_codes(
self, scanner: SmartVolatilityScanner
@@ -231,124 +323,6 @@ class TestSmartVolatilityScanner:
assert codes == ["005930", "035420"]
@pytest.mark.asyncio
async def test_scan_overseas_uses_dynamic_symbols(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock, mock_settings: Settings
) -> None:
"""Overseas scan should use provided dynamic universe symbols."""
analyzer = VolatilityAnalyzer()
scanner = SmartVolatilityScanner(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
volatility_analyzer=analyzer,
settings=mock_settings,
)
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NASD"
market.is_domestic = False
mock_overseas_broker.get_overseas_price.side_effect = [
{"output": {"last": "210.5", "rate": "1.6", "tvol": "1500000"}},
{"output": {"last": "330.1", "rate": "0.2", "tvol": "900000"}},
]
candidates = await scanner.scan(
market=market,
fallback_stocks=["AAPL", "MSFT"],
)
assert [c.stock_code for c in candidates] == ["AAPL"]
assert candidates[0].signal == "momentum"
assert candidates[0].price == 210.5
@pytest.mark.asyncio
async def test_scan_overseas_uses_ranking_api_first(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock, mock_settings: Settings
) -> None:
"""Overseas scan should prioritize ranking API when available."""
analyzer = VolatilityAnalyzer()
scanner = SmartVolatilityScanner(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
volatility_analyzer=analyzer,
settings=mock_settings,
)
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NASD"
market.is_domestic = False
mock_overseas_broker.fetch_overseas_rankings.return_value = [
{"symb": "NVDA", "last": "780.2", "rate": "2.4", "tvol": "1200000"},
{"symb": "MSFT", "last": "420.0", "rate": "0.3", "tvol": "900000"},
]
candidates = await scanner.scan(market=market, fallback_stocks=["AAPL", "TSLA"])
assert mock_overseas_broker.fetch_overseas_rankings.call_count >= 1
mock_overseas_broker.get_overseas_price.assert_not_called()
assert [c.stock_code for c in candidates] == ["NVDA"]
@pytest.mark.asyncio
async def test_scan_overseas_without_symbols_returns_empty(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock, mock_settings: Settings
) -> None:
"""Overseas scan should return empty list when no symbol universe exists."""
analyzer = VolatilityAnalyzer()
scanner = SmartVolatilityScanner(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
volatility_analyzer=analyzer,
settings=mock_settings,
)
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NASD"
market.is_domestic = False
candidates = await scanner.scan(market=market, fallback_stocks=[])
assert candidates == []
@pytest.mark.asyncio
async def test_scan_overseas_picks_high_intraday_range_even_with_low_change(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock, mock_settings: Settings
) -> None:
"""Volatility selection should consider intraday range, not only change rate."""
analyzer = VolatilityAnalyzer()
scanner = SmartVolatilityScanner(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
volatility_analyzer=analyzer,
settings=mock_settings,
)
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NASD"
market.is_domestic = False
# change rate is tiny, but high-low range is large (15%).
mock_overseas_broker.fetch_overseas_rankings.return_value = [
{
"symb": "ABCD",
"last": "100",
"rate": "0.2",
"high": "110",
"low": "95",
"tvol": "800000",
}
]
candidates = await scanner.scan(market=market, fallback_stocks=[])
assert [c.stock_code for c in candidates] == ["ABCD"]
class TestRSICalculation:
"""Test RSI calculation in VolatilityAnalyzer."""