Compare commits

..

22 Commits

Author SHA1 Message Date
agentson
7fd48c7764 feat: add strategy/playbook Pydantic models (issue #79)
Some checks failed
CI / test (pull_request) Has been cancelled
Define data contracts for the proactive strategy system:
- StockCondition: AND-combined condition fields (RSI, volume, price)
- StockScenario: condition-action rules with stop loss/take profit
- StockPlaybook: per-stock scenario collection
- GlobalRule: portfolio-level rules (e.g. REDUCE_ALL on loss limit)
- DayPlaybook: complete daily playbook per market with validation
- CrossMarketContext: cross-market awareness (KR↔US)
- ScenarioAction, MarketOutlook, PlaybookStatus enums

33 tests covering validation, serialization, edge cases.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 02:06:16 +09:00
a105bb7c1a Merge pull request 'feat: add pre-market planner config and remove static watchlists (issue #78)' (#98) from feature/issue-78-config-watchlist-removal into main
Some checks failed
CI / test (push) Has been cancelled
2026-02-08 02:04:23 +09:00
agentson
1a34a74232 feat: add pre-market planner config and remove static watchlists (issue #78)
Some checks failed
CI / test (pull_request) Has been cancelled
- Add pre-market planner settings: PRE_MARKET_MINUTES, MAX_SCENARIOS_PER_STOCK,
  PLANNER_TIMEOUT_SECONDS, DEFENSIVE_PLAYBOOK_ON_FAILURE, RESCAN_INTERVAL_SECONDS
- Change ENABLED_MARKETS default from KR to KR,US
- Remove static WATCHLISTS and STOCK_UNIVERSE dictionaries from main.py
- Replace watchlist-based trading with dynamic scanner-only stock discovery
- SmartVolatilityScanner is now the sole source of trading candidates
- Add active_stocks dict for scanner-discovered stocks per market
- Add smart_scanner parameter to run_daily_session()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 01:58:09 +09:00
a82a167915 Merge pull request 'feat: implement Smart Volatility Scanner (issue #76)' (#77) from feature/issue-76-smart-volatility-scanner into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #77
2026-02-06 07:43:54 +09:00
agentson
7725e7a8de docs: update documentation for Smart Volatility Scanner
Some checks failed
CI / test (pull_request) Has been cancelled
Update project documentation to reflect new Smart Volatility Scanner feature:

## CLAUDE.md
- Add Smart Volatility Scanner section with configuration guide
- Update project structure to include analysis/ module
- Update test count (273→343 tests)

## docs/architecture.md
- Add Analysis component (VolatilityAnalyzer + SmartVolatilityScanner)
- Add new KIS API methods (fetch_market_rankings, get_daily_prices)
- Update data flow diagram to show Python-first filtering pipeline
- Add selection_context to database schema documentation
- Add Smart Scanner configuration section
- Renumber components (Brain 2→3, Risk Manager 3→4, etc.)

## docs/requirements-log.md
- Document 2026-02-06 requirement for Smart Volatility Scanner
- Explain Python-First, AI-Last pipeline rationale
- Record implementation details and benefits
- Reference issue #76 and PR #77

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-06 07:35:25 +09:00
agentson
f0ae25c533 feat: implement Smart Volatility Scanner with RSI/volume filters (issue #76)
Some checks failed
CI / test (pull_request) Has been cancelled
Add Python-first scanning pipeline that reduces Gemini API calls by filtering
stocks before AI analysis: KIS rankings API -> RSI/volume filter -> AI judgment.

## Implementation
- Add RSI calculation (Wilder's smoothing method) to VolatilityAnalyzer
- Add KIS API methods: fetch_market_rankings() and get_daily_prices()
- Create SmartVolatilityScanner with configurable thresholds
- Integrate scanner into main.py realtime mode
- Add selection_context logging to trades table for Evolution system

## Configuration
- RSI_OVERSOLD_THRESHOLD: 30 (configurable 0-50)
- RSI_MOMENTUM_THRESHOLD: 70 (configurable 50-100)
- VOL_MULTIPLIER: 2.0 (minimum volume ratio, configurable 1-10)
- SCANNER_TOP_N: 3 (max candidates per scan, configurable 1-10)

## Benefits
- Reduces Gemini API calls (process 1-3 qualified stocks vs 20-30 ranked)
- Python-based technical filtering before expensive AI judgment
- Tracks selection criteria (RSI, volume_ratio, signal, score) for strategy optimization
- Graceful fallback to static watchlist if ranking API fails

## Tests
- 13 new tests for SmartVolatilityScanner and RSI calculation
- All existing tests updated and passing
- Coverage maintained at 73%

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-06 00:48:23 +09:00
27f581f17d Merge pull request 'fix: resolve Telegram command handler errors for /status and /positions (issue #74)' (#75) from feature/issue-74-telegram-command-fix into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #75
2026-02-05 18:56:24 +09:00
agentson
18a098d9a6 fix: resolve Telegram command handler errors for /status and /positions (issue #74)
Some checks failed
CI / test (pull_request) Has been cancelled
Fixed AttributeError exceptions in /status and /positions commands:
- Replaced invalid risk.calculate_pnl() with inline P&L calculation from balance dict
- Changed risk.circuit_breaker_threshold to risk._cb_threshold
- Replaced balance.stocks access with account summary from output2 dict
- Updated tests to match new account summary format

All 27 telegram command tests pass. Live bot testing confirms no errors.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 18:54:42 +09:00
d2b07326ed Merge pull request 'fix: remove /start command and handle @botname suffix (issue #71)' (#72) from fix/start-command-parsing into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #72
2026-02-05 17:15:14 +09:00
agentson
1c5eadc23b fix: remove /start command and handle @botname suffix
Some checks failed
CI / test (pull_request) Has been cancelled
Remove /start command as name doesn't match functionality, and fix
command parsing to handle @botname suffix for group chat compatibility.

Changes:
- Remove handle_start function and registration
- Remove /start from help command list
- Remove test_start_command_content test
- Strip @botname suffix from commands (e.g., /help@mybot → help)

Rationale:
- /start command name implies bot initialization, but it was just
  showing help text (duplicate of /help)
- Better to have one clear /help command
- @botname suffix handling needed for group chats

Test:
- 27 tests pass (1 removed, 1 added for @botname handling)
- All existing functionality preserved

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 15:59:07 +09:00
10ff718045 Merge pull request 'feat: add configuration and documentation for Telegram commands (issue #69)' (#70) from feature/issue-69-config-docs into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #70
2026-02-05 15:50:52 +09:00
agentson
0ca3fe9f5d feat: add configuration and documentation for Telegram commands (issue #69)
Some checks failed
CI / test (pull_request) Has been cancelled
Add configuration options and comprehensive documentation for the new
bidirectional command feature.

Changes:
- Add TELEGRAM_COMMANDS_ENABLED to config.py
- Add TELEGRAM_POLLING_INTERVAL to config.py
- Add extensive "Bidirectional Commands" section to README.md

Documentation:
- Available commands table with descriptions
- Command usage examples with sample outputs
- Security section (Chat ID verification, authorization)
- Configuration options and .env examples
- How it works (long polling, authentication flow)
- Error handling and troubleshooting guide

Features:
- Optional command support (can disable while keeping notifications)
- Configurable polling interval
- Complete security documentation
- Troubleshooting guide for common issues

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 15:39:02 +09:00
462f8763ab Merge pull request 'feat: implement status query commands /status and /positions (issue #67)' (#68) from feature/issue-67-status-commands into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #68
2026-02-05 15:34:16 +09:00
agentson
57a45a24cb feat: implement status query commands /status and /positions (issue #67)
Some checks failed
CI / test (pull_request) Has been cancelled
Add real-time status and portfolio monitoring via Telegram.

Changes:
- Implement /status handler (mode, markets, P&L, trading state)
- Implement /positions handler (holdings with grouping by market)
- Integrate with Broker API and RiskManager
- Add 5 comprehensive tests for status commands

Features:
- /status: Shows trading mode, enabled markets, pause state, P&L, circuit breaker
- /positions: Lists holdings grouped by market (domestic/overseas)
- Error handling: Graceful degradation on API failures
- Empty state: Handles portfolios with no positions

Integration:
- Uses broker.get_balance() for account data
- Uses risk.calculate_pnl() for P&L calculation
- Accesses pause_trading.is_set() for trading state
- Groups positions by market for better readability

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 15:29:52 +09:00
a7696568cc Merge pull request 'feat: implement trading control commands /stop and /resume (issue #65)' (#66) from feature/issue-65-trading-control into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #66
2026-02-05 15:17:35 +09:00
agentson
70701bf73a feat: implement trading control commands /stop and /resume (issue #65)
Some checks failed
CI / test (pull_request) Has been cancelled
Add pause/resume functionality for remote trading control via Telegram.

Changes:
- Add pause_trading Event to main.py
- Implement /stop handler (pause trading)
- Implement /resume handler (resume trading)
- Integrate pause logic into both daily and realtime trading loops
- Add 4 comprehensive tests for trading control

Features:
- /stop: Pauses all trading operations
- /resume: Resumes trading operations
- Idempotent: Handles repeated stop/resume gracefully
- Status feedback: Informs if already paused/active
- Works in both daily and realtime trading modes

Security:
- Commands verified by TelegramCommandHandler chat_id check
- Only authorized users can control trading

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 14:40:19 +09:00
20dbd94892 Merge pull request 'feat: implement basic commands /start and /help (issue #63)' (#64) from feature/issue-63-basic-commands into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #64
2026-02-05 13:56:51 +09:00
agentson
48a99962e3 feat: implement basic commands /start and /help (issue #63)
Some checks failed
CI / test (pull_request) Has been cancelled
Integrate TelegramCommandHandler into main.py and implement
welcome and help commands.

Changes:
- Import TelegramCommandHandler in main.py
- Initialize command handler and register /start and /help
- Start/stop command handler with proper lifecycle management
- Add tests for command content validation

Features:
- /start: Welcome message with bot introduction
- /help: Complete command reference
- Handlers respond with HTML-formatted messages
- Clean startup/shutdown integration

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 13:55:52 +09:00
ee66ecc305 Merge pull request 'feat: implement TelegramCommandHandler core structure (issue #61)' (#62) from feature/issue-61-command-handler into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #62
2026-02-05 13:51:18 +09:00
agentson
065c9daaad feat: implement TelegramCommandHandler core structure (issue #61)
Some checks failed
CI / test (pull_request) Has been cancelled
Add TelegramCommandHandler class with long polling, command routing,
and security features.

Changes:
- Add TelegramCommandHandler class to telegram_client.py
- Implement long polling with getUpdates API
- Add command registration and routing mechanism
- Implement chat ID verification for security
- Add comprehensive tests (16 tests)
- Coverage: 85% for telegram_client.py

Features:
- start_polling() / stop_polling() lifecycle management
- register_command() for handler registration
- Chat ID verification to prevent unauthorized access
- Error isolation (command failures don't crash system)
- Graceful handling of API errors and timeouts

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 13:47:27 +09:00
c76b9d5c15 Merge pull request 'feat: add generic send_message method to TelegramClient (issue #59)' (#60) from feature/issue-59-send-message into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #60
2026-02-05 13:40:06 +09:00
8e715c55cd Merge pull request 'feat: 일일 거래 모드 + 요구사항 문서화 체계 (issue #57)' (#58) from feature/issue-57-daily-trading-mode into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #58
2026-02-05 09:49:26 +09:00
18 changed files with 2811 additions and 65 deletions

View File

@@ -45,6 +45,39 @@ Get real-time alerts for trades, circuit breakers, and system events via Telegra
**Fail-safe**: Notifications never crash the trading system. Missing credentials or API errors are logged but trading continues normally. **Fail-safe**: Notifications never crash the trading system. Missing credentials or API errors are logged but trading continues normally.
## Smart Volatility Scanner (Optional)
Python-first filtering pipeline that reduces Gemini API calls by pre-filtering stocks using technical indicators.
### How It Works
1. **Fetch Rankings** — KIS API volume surge rankings (top 30 stocks)
2. **Python Filter** — RSI + volume ratio calculations (no AI)
- Volume > 200% of previous day
- RSI(14) < 30 (oversold) OR RSI(14) > 70 (momentum)
3. **AI Judgment** — Only qualified candidates (1-3 stocks) sent to Gemini
### Configuration
Add to `.env` (optional, has sensible defaults):
```bash
RSI_OVERSOLD_THRESHOLD=30 # 0-50, default 30
RSI_MOMENTUM_THRESHOLD=70 # 50-100, default 70
VOL_MULTIPLIER=2.0 # Volume threshold (2.0 = 200%)
SCANNER_TOP_N=3 # Max candidates per scan
```
### Benefits
- **Reduces API costs** — Process 1-3 stocks instead of 20-30
- **Python-based filtering** — Fast technical analysis before AI
- **Evolution-ready** — Selection context logged for strategy optimization
- **Fault-tolerant** — Falls back to static watchlist on API failure
### Realtime Mode Only
Smart Scanner runs in `TRADE_MODE=realtime` only. Daily mode uses static watchlists for batch efficiency.
## Documentation ## Documentation
- **[Workflow Guide](docs/workflow.md)** — Git workflow policy and agent-based development - **[Workflow Guide](docs/workflow.md)** — Git workflow policy and agent-based development
@@ -75,6 +108,7 @@ User requirements and feedback are tracked in [docs/requirements-log.md](docs/re
``` ```
src/ src/
├── analysis/ # Technical analysis (RSI, volatility, smart scanner)
├── broker/ # KIS API client (domestic + overseas) ├── broker/ # KIS API client (domestic + overseas)
├── brain/ # Gemini AI decision engine ├── brain/ # Gemini AI decision engine
├── core/ # Risk manager (READ-ONLY) ├── core/ # Risk manager (READ-ONLY)
@@ -85,7 +119,7 @@ src/
├── main.py # Trading loop orchestrator ├── main.py # Trading loop orchestrator
└── config.py # Settings (from .env) └── config.py # Settings (from .env)
tests/ # 273 tests across 13 files tests/ # 343 tests across 14 files
docs/ # Extended documentation docs/ # Extended documentation
``` ```

View File

@@ -64,7 +64,39 @@ High-frequency trading with individual stock analysis:
- `get_open_markets()` returns currently active markets - `get_open_markets()` returns currently active markets
- `get_next_market_open()` finds next market to open and when - `get_next_market_open()` finds next market to open and when
### 2. Brain (`src/brain/gemini_client.py`) **New API Methods** (added in v0.9.0):
- `fetch_market_rankings()` — Fetch volume surge rankings from KIS API
- `get_daily_prices()` — Fetch OHLCV history for technical analysis
### 2. Analysis (`src/analysis/`)
**VolatilityAnalyzer** (`volatility.py`) — Technical indicator calculations
- ATR (Average True Range) for volatility measurement
- RSI (Relative Strength Index) using Wilder's smoothing method
- Price change percentages across multiple timeframes
- Volume surge ratios and price-volume divergence
- Momentum scoring (0-100 scale)
- Breakout/breakdown pattern detection
**SmartVolatilityScanner** (`smart_scanner.py`) — Python-first filtering pipeline
- **Step 1**: Fetch volume rankings from KIS API (top 30 stocks)
- **Step 2**: Calculate RSI and volume ratio for each stock
- **Step 3**: Apply filters:
- Volume ratio >= `VOL_MULTIPLIER` (default 2.0x previous day)
- RSI < `RSI_OVERSOLD_THRESHOLD` (30) OR RSI > `RSI_MOMENTUM_THRESHOLD` (70)
- **Step 4**: Score candidates by RSI extremity (60%) + volume surge (40%)
- **Step 5**: Return top N candidates (default 3) for AI analysis
- **Fallback**: Uses static watchlist if ranking API unavailable
- **Realtime mode only**: Daily mode uses batch processing for API efficiency
**Benefits:**
- Reduces Gemini API calls from 20-30 stocks to 1-3 qualified candidates
- Fast Python-based filtering before expensive AI judgment
- Logs selection context (RSI, volume_ratio, signal, score) for Evolution system
### 3. Brain (`src/brain/gemini_client.py`)
**GeminiClient** — AI decision engine powered by Google Gemini **GeminiClient** — AI decision engine powered by Google Gemini
@@ -74,7 +106,7 @@ High-frequency trading with individual stock analysis:
- Falls back to safe HOLD on any parse/API error - Falls back to safe HOLD on any parse/API error
- Handles markdown-wrapped JSON, malformed responses, invalid actions - Handles markdown-wrapped JSON, malformed responses, invalid actions
### 3. Risk Manager (`src/core/risk_manager.py`) ### 4. Risk Manager (`src/core/risk_manager.py`)
**RiskManager** — Safety circuit breaker and order validation **RiskManager** — Safety circuit breaker and order validation
@@ -86,7 +118,7 @@ High-frequency trading with individual stock analysis:
- **Fat-Finger Protection**: Rejects orders exceeding 30% of available cash - **Fat-Finger Protection**: Rejects orders exceeding 30% of available cash
- Must always be enforced, cannot be disabled - Must always be enforced, cannot be disabled
### 4. Notifications (`src/notifications/telegram_client.py`) ### 5. Notifications (`src/notifications/telegram_client.py`)
**TelegramClient** — Real-time event notifications via Telegram Bot API **TelegramClient** — Real-time event notifications via Telegram Bot API
@@ -105,7 +137,7 @@ High-frequency trading with individual stock analysis:
**Setup:** See [src/notifications/README.md](../src/notifications/README.md) for bot creation and configuration. **Setup:** See [src/notifications/README.md](../src/notifications/README.md) for bot creation and configuration.
### 5. Evolution (`src/evolution/optimizer.py`) ### 6. Evolution (`src/evolution/optimizer.py`)
**StrategyOptimizer** — Self-improvement loop **StrategyOptimizer** — Self-improvement loop
@@ -117,9 +149,11 @@ High-frequency trading with individual stock analysis:
## Data Flow ## Data Flow
### Realtime Mode (with Smart Scanner)
``` ```
┌─────────────────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────────────────┐
│ Main Loop (60s cycle per stock, per market) │ Main Loop (60s cycle per market)
└─────────────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────────────┘
@@ -132,6 +166,21 @@ High-frequency trading with individual stock analysis:
┌──────────────────────────────────┐ ┌──────────────────────────────────┐
│ Smart Scanner (Python-first) │
│ - Fetch volume rankings (KIS) │
│ - Get 20d price history per stock│
│ - Calculate RSI(14) + vol ratio │
│ - Filter: vol>2x AND RSI extreme │
│ - Return top 3 qualified stocks │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ For Each Qualified Candidate │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Broker: Fetch Market Data │ │ Broker: Fetch Market Data │
│ - Domestic: orderbook + balance │ │ - Domestic: orderbook + balance │
│ - Overseas: price + balance │ │ - Overseas: price + balance │
@@ -145,7 +194,7 @@ High-frequency trading with individual stock analysis:
┌──────────────────────────────────┐ ┌──────────────────────────────────┐
│ Brain: Get Decision │ Brain: Get Decision (AI)
│ - Build prompt with market data │ │ - Build prompt with market data │
│ - Call Gemini API │ │ - Call Gemini API │
│ - Parse JSON response │ │ - Parse JSON response │
@@ -181,6 +230,9 @@ High-frequency trading with individual stock analysis:
│ - SQLite (data/trades.db) │ │ - SQLite (data/trades.db) │
│ - Track: action, confidence, │ │ - Track: action, confidence, │
│ rationale, market, exchange │ │ rationale, market, exchange │
│ - NEW: selection_context (JSON) │
│ - RSI, volume_ratio, signal │
│ - For Evolution optimization │
└───────────────────────────────────┘ └───────────────────────────────────┘
``` ```
@@ -200,11 +252,24 @@ CREATE TABLE trades (
price REAL, price REAL,
pnl REAL DEFAULT 0.0, pnl REAL DEFAULT 0.0,
market TEXT DEFAULT 'KR', -- KR | US_NASDAQ | JP | etc. market TEXT DEFAULT 'KR', -- KR | US_NASDAQ | JP | etc.
exchange_code TEXT DEFAULT 'KRX' -- KRX | NASD | NYSE | etc. exchange_code TEXT DEFAULT 'KRX', -- KRX | NASD | NYSE | etc.
selection_context TEXT -- JSON: {rsi, volume_ratio, signal, score}
); );
``` ```
Auto-migration: Adds `market` and `exchange_code` columns if missing for backward compatibility. **Selection Context** (new in v0.9.0): Stores scanner selection criteria as JSON:
```json
{
"rsi": 28.5,
"volume_ratio": 2.7,
"signal": "oversold",
"score": 85.2
}
```
Enables Evolution system to analyze correlation between selection criteria and trade outcomes.
Auto-migration: Adds `market`, `exchange_code`, and `selection_context` columns if missing for backward compatibility.
## Configuration ## Configuration
@@ -236,6 +301,12 @@ SESSION_INTERVAL_HOURS=6 # Hours between sessions (daily mode only)
TELEGRAM_BOT_TOKEN=1234567890:ABCdefGHIjklMNOpqrsTUVwxyz TELEGRAM_BOT_TOKEN=1234567890:ABCdefGHIjklMNOpqrsTUVwxyz
TELEGRAM_CHAT_ID=123456789 TELEGRAM_CHAT_ID=123456789
TELEGRAM_ENABLED=true TELEGRAM_ENABLED=true
# Smart Scanner (optional, realtime mode only)
RSI_OVERSOLD_THRESHOLD=30 # 0-50, oversold threshold
RSI_MOMENTUM_THRESHOLD=70 # 50-100, momentum threshold
VOL_MULTIPLIER=2.0 # Minimum volume ratio (2.0 = 200%)
SCANNER_TOP_N=3 # Max qualified candidates per scan
``` ```
Tests use in-memory SQLite (`DB_PATH=":memory:"`) and dummy credentials via `tests/conftest.py`. Tests use in-memory SQLite (`DB_PATH=":memory:"`) and dummy credentials via `tests/conftest.py`.

View File

@@ -26,3 +26,41 @@
### 문서화 ### 문서화
- 시스템 구조, 기능별 설명 등 코드 문서화 항상 신경쓸 것 - 시스템 구조, 기능별 설명 등 코드 문서화 항상 신경쓸 것
- 새로운 기능 추가 시 관련 문서 업데이트 필수 - 새로운 기능 추가 시 관련 문서 업데이트 필수
---
## 2026-02-06
### Smart Volatility Scanner (Python-First, AI-Last 파이프라인)
**배경:**
- 정적 종목 리스트를 순회하는 방식은 비효율적
- KIS API 거래량 순위를 통해 시장 주도주를 자동 탐지해야 함
- Gemini API 호출 전에 Python 기반 기술적 분석으로 필터링 필요
**요구사항:**
1. KIS API 거래량 순위 API 통합 (`fetch_market_rankings`)
2. 일별 가격 히스토리 API 추가 (`get_daily_prices`)
3. RSI(14) 계산 기능 구현 (Wilder's smoothing method)
4. 필터 조건:
- 거래량 > 전일 대비 200% (VOL_MULTIPLIER)
- RSI < 30 (과매도) OR RSI > 70 (모멘텀)
5. 상위 1-3개 적격 종목만 Gemini에 전달
6. 종목 선정 배경(RSI, volume_ratio, signal, score) 데이터베이스 기록
**구현 결과:**
- `src/analysis/smart_scanner.py`: SmartVolatilityScanner 클래스
- `src/analysis/volatility.py`: calculate_rsi() 메서드 추가
- `src/broker/kis_api.py`: 2개 신규 API 메서드
- `src/db.py`: selection_context 컬럼 추가
- 설정 가능한 임계값: RSI_OVERSOLD_THRESHOLD, RSI_MOMENTUM_THRESHOLD, VOL_MULTIPLIER, SCANNER_TOP_N
**효과:**
- Gemini API 호출 20-30개 → 1-3개로 감소
- Python 기반 빠른 필터링 → 비용 절감
- 선정 기준 추적 → Evolution 시스템 최적화 가능
- API 장애 시 정적 watchlist로 자동 전환
**참고:** Realtime 모드 전용. Daily 모드는 배치 효율성을 위해 정적 watchlist 사용.
**이슈/PR:** #76, #77

View File

@@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
from src.analysis.scanner import MarketScanner from src.analysis.scanner import MarketScanner
from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner
from src.analysis.volatility import VolatilityAnalyzer from src.analysis.volatility import VolatilityAnalyzer
__all__ = ["VolatilityAnalyzer", "MarketScanner"] __all__ = ["VolatilityAnalyzer", "MarketScanner", "SmartVolatilityScanner", "ScanCandidate"]

View File

@@ -0,0 +1,192 @@
"""Smart Volatility Scanner with RSI and volume filters.
Fetches market rankings from KIS API and applies technical filters
to identify high-probability trading candidates.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
from src.analysis.volatility import VolatilityAnalyzer
from src.broker.kis_api import KISBroker
from src.config import Settings
logger = logging.getLogger(__name__)
@dataclass
class ScanCandidate:
"""A qualified candidate from the smart scanner."""
stock_code: str
name: str
price: float
volume: float
volume_ratio: float # Current volume / previous day volume
rsi: float
signal: str # "oversold" or "momentum"
score: float # Composite score for ranking
class SmartVolatilityScanner:
"""Scans market rankings and applies RSI/volume filters.
Flow:
1. Fetch volume rankings from KIS API
2. For each ranked stock, fetch daily prices
3. Calculate RSI and volume ratio
4. Apply filters: volume > VOL_MULTIPLIER AND (RSI < 30 OR RSI > 70)
5. Return top N qualified candidates
"""
def __init__(
self,
broker: KISBroker,
volatility_analyzer: VolatilityAnalyzer,
settings: Settings,
) -> None:
"""Initialize the smart scanner.
Args:
broker: KIS broker for API calls
volatility_analyzer: Analyzer for RSI calculation
settings: Application settings
"""
self.broker = broker
self.analyzer = volatility_analyzer
self.settings = settings
# Extract scanner settings
self.rsi_oversold = settings.RSI_OVERSOLD_THRESHOLD
self.rsi_momentum = settings.RSI_MOMENTUM_THRESHOLD
self.vol_multiplier = settings.VOL_MULTIPLIER
self.top_n = settings.SCANNER_TOP_N
async def scan(
self,
fallback_stocks: list[str] | None = None,
) -> list[ScanCandidate]:
"""Execute smart scan and return qualified candidates.
Args:
fallback_stocks: Stock codes to use if ranking API fails
Returns:
List of ScanCandidate, sorted by score, up to top_n items
"""
# Step 1: Fetch rankings
try:
rankings = await self.broker.fetch_market_rankings(
ranking_type="volume",
limit=30, # Fetch more than needed for filtering
)
logger.info("Fetched %d stocks from volume rankings", len(rankings))
except ConnectionError as exc:
logger.warning("Ranking API failed, using fallback: %s", exc)
if fallback_stocks:
# Create minimal ranking data for fallback
rankings = [
{
"stock_code": code,
"name": code,
"price": 0,
"volume": 0,
"change_rate": 0,
"volume_increase_rate": 0,
}
for code in fallback_stocks
]
else:
return []
# Step 2: Analyze each stock
candidates: list[ScanCandidate] = []
for stock in rankings:
stock_code = stock["stock_code"]
if not stock_code:
continue
try:
# Fetch daily prices for RSI calculation
daily_prices = await self.broker.get_daily_prices(stock_code, days=20)
if len(daily_prices) < 15: # Need at least 14+1 for RSI
logger.debug("Insufficient price history for %s", stock_code)
continue
# Calculate RSI
close_prices = [p["close"] for p in daily_prices]
rsi = self.analyzer.calculate_rsi(close_prices, period=14)
# Calculate volume ratio (today vs previous day avg)
if len(daily_prices) >= 2:
prev_day_volume = daily_prices[-2]["volume"]
current_volume = stock.get("volume", 0) or daily_prices[-1]["volume"]
volume_ratio = (
current_volume / prev_day_volume if prev_day_volume > 0 else 1.0
)
else:
volume_ratio = stock.get("volume_increase_rate", 0) / 100 + 1 # Fallback
# Apply filters
volume_qualified = volume_ratio >= self.vol_multiplier
rsi_oversold = rsi < self.rsi_oversold
rsi_momentum = rsi > self.rsi_momentum
if volume_qualified and (rsi_oversold or rsi_momentum):
signal = "oversold" if rsi_oversold else "momentum"
# Calculate composite score
# Higher score for: extreme RSI + high volume
rsi_extremity = abs(rsi - 50) / 50 # 0-1 scale
volume_score = min(volume_ratio / 5, 1.0) # Cap at 5x
score = (rsi_extremity * 0.6 + volume_score * 0.4) * 100
candidates.append(
ScanCandidate(
stock_code=stock_code,
name=stock.get("name", stock_code),
price=stock.get("price", daily_prices[-1]["close"]),
volume=current_volume,
volume_ratio=volume_ratio,
rsi=rsi,
signal=signal,
score=score,
)
)
logger.info(
"Qualified: %s (%s) RSI=%.1f vol=%.1fx signal=%s score=%.1f",
stock_code,
stock.get("name", ""),
rsi,
volume_ratio,
signal,
score,
)
except ConnectionError as exc:
logger.warning("Failed to analyze %s: %s", stock_code, exc)
continue
except Exception as exc:
logger.error("Unexpected error analyzing %s: %s", stock_code, exc)
continue
# Sort by score and return top N
candidates.sort(key=lambda c: c.score, reverse=True)
return candidates[: self.top_n]
def get_stock_codes(self, candidates: list[ScanCandidate]) -> list[str]:
"""Extract stock codes from candidates for watchlist update.
Args:
candidates: List of scan candidates
Returns:
List of stock codes
"""
return [c.stock_code for c in candidates]

View File

@@ -124,6 +124,54 @@ class VolatilityAnalyzer:
return 1.0 return 1.0
return current_volume / avg_volume return current_volume / avg_volume
def calculate_rsi(
self,
close_prices: list[float],
period: int = 14,
) -> float:
"""Calculate Relative Strength Index (RSI) using Wilder's smoothing.
Args:
close_prices: List of closing prices (oldest to newest, minimum period+1 values)
period: RSI period (default 14)
Returns:
RSI value between 0 and 100, or 50.0 (neutral) if insufficient data
Examples:
>>> analyzer = VolatilityAnalyzer()
>>> prices = [100 - i * 0.5 for i in range(20)] # Downtrend
>>> rsi = analyzer.calculate_rsi(prices)
>>> assert rsi < 50 # Oversold territory
"""
if len(close_prices) < period + 1:
return 50.0 # Neutral RSI if insufficient data
# Calculate price changes
changes = [close_prices[i] - close_prices[i - 1] for i in range(1, len(close_prices))]
# Separate gains and losses
gains = [max(0.0, change) for change in changes]
losses = [max(0.0, -change) for change in changes]
# Calculate initial average gain/loss (simple average for first period)
avg_gain = sum(gains[:period]) / period
avg_loss = sum(losses[:period]) / period
# Apply Wilder's smoothing for remaining periods
for i in range(period, len(changes)):
avg_gain = (avg_gain * (period - 1) + gains[i]) / period
avg_loss = (avg_loss * (period - 1) + losses[i]) / period
# Calculate RS and RSI
if avg_loss == 0:
return 100.0 # All gains, maximum RSI
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
def calculate_pv_divergence( def calculate_pv_divergence(
self, self,
price_change: float, price_change: float,

View File

@@ -280,3 +280,153 @@ class KISBroker:
return data return data
except (TimeoutError, aiohttp.ClientError) as exc: except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError(f"Network error sending order: {exc}") from exc raise ConnectionError(f"Network error sending order: {exc}") from exc
async def fetch_market_rankings(
self,
ranking_type: str = "volume",
limit: int = 30,
) -> list[dict[str, Any]]:
"""Fetch market rankings from KIS API.
Args:
ranking_type: Type of ranking ("volume" or "fluctuation")
limit: Maximum number of results to return
Returns:
List of stock data dicts with keys: stock_code, name, price, volume,
change_rate, volume_increase_rate
Raises:
ConnectionError: If API request fails
"""
await self._rate_limiter.acquire()
session = self._get_session()
# TR_ID for volume ranking
tr_id = "FHPST01710000" if ranking_type == "volume" else "FHPST01710100"
headers = await self._auth_headers(tr_id)
params = {
"FID_COND_MRKT_DIV_CODE": "J", # Stock/ETF/ETN
"FID_COND_SCR_DIV_CODE": "20001", # Volume surge
"FID_INPUT_ISCD": "0000", # All stocks
"FID_DIV_CLS_CODE": "0", # All types
"FID_BLNG_CLS_CODE": "0",
"FID_TRGT_CLS_CODE": "111111111",
"FID_TRGT_EXLS_CLS_CODE": "000000",
"FID_INPUT_PRICE_1": "0",
"FID_INPUT_PRICE_2": "0",
"FID_VOL_CNT": "0",
"FID_INPUT_DATE_1": "",
}
url = f"{self._base_url}/uapi/domestic-stock/v1/quotations/volume-rank"
try:
async with session.get(url, headers=headers, params=params) as resp:
if resp.status != 200:
text = await resp.text()
raise ConnectionError(
f"fetch_market_rankings failed ({resp.status}): {text}"
)
data = await resp.json()
# Parse response - output is a list of ranked stocks
def _safe_float(value: str | float | None, default: float = 0.0) -> float:
if value is None or value == "":
return default
try:
return float(value)
except (ValueError, TypeError):
return default
rankings = []
for item in data.get("output", [])[:limit]:
rankings.append({
"stock_code": item.get("mksc_shrn_iscd", ""),
"name": item.get("hts_kor_isnm", ""),
"price": _safe_float(item.get("stck_prpr", "0")),
"volume": _safe_float(item.get("acml_vol", "0")),
"change_rate": _safe_float(item.get("prdy_ctrt", "0")),
"volume_increase_rate": _safe_float(item.get("vol_inrt", "0")),
})
return rankings
except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError(f"Network error fetching rankings: {exc}") from exc
async def get_daily_prices(
self,
stock_code: str,
days: int = 20,
) -> list[dict[str, Any]]:
"""Fetch daily OHLCV price history for a stock.
Args:
stock_code: 6-digit stock code
days: Number of trading days to fetch (default 20 for RSI calculation)
Returns:
List of daily price dicts with keys: date, open, high, low, close, volume
Sorted oldest to newest
Raises:
ConnectionError: If API request fails
"""
await self._rate_limiter.acquire()
session = self._get_session()
headers = await self._auth_headers("FHKST03010100")
# Calculate date range (today and N days ago)
from datetime import datetime, timedelta
end_date = datetime.now().strftime("%Y%m%d")
start_date = (datetime.now() - timedelta(days=days + 10)).strftime("%Y%m%d")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": stock_code,
"FID_INPUT_DATE_1": start_date,
"FID_INPUT_DATE_2": end_date,
"FID_PERIOD_DIV_CODE": "D", # Daily
"FID_ORG_ADJ_PRC": "0", # Adjusted price
}
url = f"{self._base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
try:
async with session.get(url, headers=headers, params=params) as resp:
if resp.status != 200:
text = await resp.text()
raise ConnectionError(
f"get_daily_prices failed ({resp.status}): {text}"
)
data = await resp.json()
# Parse response
def _safe_float(value: str | float | None, default: float = 0.0) -> float:
if value is None or value == "":
return default
try:
return float(value)
except (ValueError, TypeError):
return default
prices = []
for item in data.get("output2", []):
prices.append({
"date": item.get("stck_bsop_date", ""),
"open": _safe_float(item.get("stck_oprc", "0")),
"high": _safe_float(item.get("stck_hgpr", "0")),
"low": _safe_float(item.get("stck_lwpr", "0")),
"close": _safe_float(item.get("stck_clpr", "0")),
"volume": _safe_float(item.get("acml_vol", "0")),
})
# Sort oldest to newest (KIS returns newest first)
prices.reverse()
return prices[:days] # Return only requested number of days
except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError(f"Network error fetching daily prices: {exc}") from exc

View File

@@ -33,6 +33,12 @@ class Settings(BaseSettings):
FAT_FINGER_PCT: float = Field(default=30.0, gt=0.0, le=100.0) FAT_FINGER_PCT: float = Field(default=30.0, gt=0.0, le=100.0)
CONFIDENCE_THRESHOLD: int = Field(default=80, ge=0, le=100) CONFIDENCE_THRESHOLD: int = Field(default=80, ge=0, le=100)
# Smart Scanner Configuration
RSI_OVERSOLD_THRESHOLD: int = Field(default=30, ge=0, le=50)
RSI_MOMENTUM_THRESHOLD: int = Field(default=70, ge=50, le=100)
VOL_MULTIPLIER: float = Field(default=2.0, gt=1.0, le=10.0)
SCANNER_TOP_N: int = Field(default=3, ge=1, le=10)
# Database # Database
DB_PATH: str = "data/trade_logs.db" DB_PATH: str = "data/trade_logs.db"
@@ -49,8 +55,15 @@ class Settings(BaseSettings):
DAILY_SESSIONS: int = Field(default=4, ge=1, le=10) DAILY_SESSIONS: int = Field(default=4, ge=1, le=10)
SESSION_INTERVAL_HOURS: int = Field(default=6, ge=1, le=24) SESSION_INTERVAL_HOURS: int = Field(default=6, ge=1, le=24)
# Pre-Market Planner
PRE_MARKET_MINUTES: int = Field(default=30, ge=10, le=120)
MAX_SCENARIOS_PER_STOCK: int = Field(default=5, ge=1, le=10)
PLANNER_TIMEOUT_SECONDS: int = Field(default=60, ge=10, le=300)
DEFENSIVE_PLAYBOOK_ON_FAILURE: bool = True
RESCAN_INTERVAL_SECONDS: int = Field(default=300, ge=60, le=900)
# Market selection (comma-separated market codes) # Market selection (comma-separated market codes)
ENABLED_MARKETS: str = "KR" ENABLED_MARKETS: str = "KR,US"
# Backup and Disaster Recovery (optional) # Backup and Disaster Recovery (optional)
BACKUP_ENABLED: bool = True BACKUP_ENABLED: bool = True
@@ -66,6 +79,10 @@ class Settings(BaseSettings):
TELEGRAM_CHAT_ID: str | None = None TELEGRAM_CHAT_ID: str | None = None
TELEGRAM_ENABLED: bool = True TELEGRAM_ENABLED: bool = True
# Telegram Commands (optional)
TELEGRAM_COMMANDS_ENABLED: bool = True
TELEGRAM_POLLING_INTERVAL: float = 1.0 # seconds
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"} model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
@property @property

View File

@@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import json
import sqlite3 import sqlite3
from datetime import UTC, datetime from datetime import UTC, datetime
from pathlib import Path from pathlib import Path
@@ -38,6 +39,8 @@ def init_db(db_path: str) -> sqlite3.Connection:
conn.execute("ALTER TABLE trades ADD COLUMN market TEXT DEFAULT 'KR'") conn.execute("ALTER TABLE trades ADD COLUMN market TEXT DEFAULT 'KR'")
if "exchange_code" not in columns: if "exchange_code" not in columns:
conn.execute("ALTER TABLE trades ADD COLUMN exchange_code TEXT DEFAULT 'KRX'") conn.execute("ALTER TABLE trades ADD COLUMN exchange_code TEXT DEFAULT 'KRX'")
if "selection_context" not in columns:
conn.execute("ALTER TABLE trades ADD COLUMN selection_context TEXT")
# Context tree tables for multi-layered memory management # Context tree tables for multi-layered memory management
conn.execute( conn.execute(
@@ -118,15 +121,33 @@ def log_trade(
pnl: float = 0.0, pnl: float = 0.0,
market: str = "KR", market: str = "KR",
exchange_code: str = "KRX", exchange_code: str = "KRX",
selection_context: dict[str, any] | None = None,
) -> None: ) -> None:
"""Insert a trade record into the database.""" """Insert a trade record into the database.
Args:
conn: Database connection
stock_code: Stock code
action: Trade action (BUY/SELL/HOLD)
confidence: Confidence level (0-100)
rationale: AI decision rationale
quantity: Number of shares
price: Trade price
pnl: Profit/loss
market: Market code
exchange_code: Exchange code
selection_context: Scanner selection data (RSI, volume_ratio, signal, score)
"""
# Serialize selection context to JSON
context_json = json.dumps(selection_context) if selection_context else None
conn.execute( conn.execute(
""" """
INSERT INTO trades ( INSERT INTO trades (
timestamp, stock_code, action, confidence, rationale, timestamp, stock_code, action, confidence, rationale,
quantity, price, pnl, market, exchange_code quantity, price, pnl, market, exchange_code, selection_context
) )
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", """,
( (
datetime.now(UTC).isoformat(), datetime.now(UTC).isoformat(),
@@ -139,6 +160,7 @@ def log_trade(
pnl, pnl,
market, market,
exchange_code, exchange_code,
context_json,
), ),
) )
conn.commit() conn.commit()

View File

@@ -15,6 +15,7 @@ from datetime import UTC, datetime
from typing import Any from typing import Any
from src.analysis.scanner import MarketScanner from src.analysis.scanner import MarketScanner
from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner
from src.analysis.volatility import VolatilityAnalyzer from src.analysis.volatility import VolatilityAnalyzer
from src.brain.gemini_client import GeminiClient from src.brain.gemini_client import GeminiClient
from src.broker.kis_api import KISBroker from src.broker.kis_api import KISBroker
@@ -29,7 +30,7 @@ from src.db import init_db, log_trade
from src.logging.decision_logger import DecisionLogger from src.logging.decision_logger import DecisionLogger
from src.logging_config import setup_logging from src.logging_config import setup_logging
from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets
from src.notifications.telegram_client import TelegramClient from src.notifications.telegram_client import TelegramClient, TelegramCommandHandler
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -62,14 +63,6 @@ def safe_float(value: str | float | None, default: float = 0.0) -> float:
return default return default
# Target stock codes to monitor per market
WATCHLISTS = {
"KR": ["005930", "000660", "035420"], # Samsung, SK Hynix, NAVER
"US_NASDAQ": ["AAPL", "MSFT", "GOOGL"], # Example US stocks
"US_NYSE": ["JPM", "BAC"], # Example NYSE stocks
"JP": ["7203", "6758"], # Toyota, Sony
}
TRADE_INTERVAL_SECONDS = 60 TRADE_INTERVAL_SECONDS = 60
SCAN_INTERVAL_SECONDS = 60 # Scan markets every 60 seconds SCAN_INTERVAL_SECONDS = 60 # Scan markets every 60 seconds
MAX_CONNECTION_RETRIES = 3 MAX_CONNECTION_RETRIES = 3
@@ -78,15 +71,6 @@ MAX_CONNECTION_RETRIES = 3
DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day
TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions
# Full stock universe per market (for scanning)
# In production, this would be loaded from a database or API
STOCK_UNIVERSE = {
"KR": ["005930", "000660", "035420", "051910", "005380", "005490"],
"US_NASDAQ": ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "TSLA"],
"US_NYSE": ["JPM", "BAC", "XOM", "JNJ", "V"],
"JP": ["7203", "6758", "9984", "6861"],
}
async def trading_cycle( async def trading_cycle(
broker: KISBroker, broker: KISBroker,
@@ -100,6 +84,7 @@ async def trading_cycle(
telegram: TelegramClient, telegram: TelegramClient,
market: MarketInfo, market: MarketInfo,
stock_code: str, stock_code: str,
scan_candidates: dict[str, ScanCandidate],
) -> None: ) -> None:
"""Execute one trading cycle for a single stock.""" """Execute one trading cycle for a single stock."""
cycle_start_time = asyncio.get_event_loop().time() cycle_start_time = asyncio.get_event_loop().time()
@@ -292,7 +277,17 @@ async def trading_cycle(
except Exception as exc: except Exception as exc:
logger.warning("Telegram notification failed: %s", exc) logger.warning("Telegram notification failed: %s", exc)
# 6. Log trade # 6. Log trade with selection context
selection_context = None
if stock_code in scan_candidates:
candidate = scan_candidates[stock_code]
selection_context = {
"rsi": candidate.rsi,
"volume_ratio": candidate.volume_ratio,
"signal": candidate.signal,
"score": candidate.score,
}
log_trade( log_trade(
conn=db_conn, conn=db_conn,
stock_code=stock_code, stock_code=stock_code,
@@ -301,6 +296,7 @@ async def trading_cycle(
rationale=decision.rationale, rationale=decision.rationale,
market=market.code, market=market.code,
exchange_code=market.exchange_code, exchange_code=market.exchange_code,
selection_context=selection_context,
) )
# 7. Latency monitoring # 7. Latency monitoring
@@ -336,6 +332,7 @@ async def run_daily_session(
criticality_assessor: CriticalityAssessor, criticality_assessor: CriticalityAssessor,
telegram: TelegramClient, telegram: TelegramClient,
settings: Settings, settings: Settings,
smart_scanner: SmartVolatilityScanner | None = None,
) -> None: ) -> None:
"""Execute one daily trading session. """Execute one daily trading session.
@@ -355,15 +352,21 @@ async def run_daily_session(
# Process each open market # Process each open market
for market in open_markets: for market in open_markets:
# Get watchlist for this market # Dynamic stock discovery via scanner (no static watchlists)
watchlist = WATCHLISTS.get(market.code, []) try:
candidates = await smart_scanner.scan()
watchlist = [c.stock_code for c in candidates] if candidates else []
except Exception as exc:
logger.error("Smart Scanner failed for %s: %s", market.name, exc)
watchlist = []
if not watchlist: if not watchlist:
logger.debug("No watchlist for market %s", market.code) logger.info("No scanner candidates for market %s — skipping", market.code)
continue continue
logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist)) logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist))
# Collect market data for all stocks in the watchlist # Collect market data for all stocks from scanner
stocks_data = [] stocks_data = []
for stock_code in watchlist: for stock_code in watchlist:
try: try:
@@ -575,6 +578,142 @@ async def run(settings: Settings) -> None:
enabled=settings.TELEGRAM_ENABLED, enabled=settings.TELEGRAM_ENABLED,
) )
# Initialize Telegram command handler
command_handler = TelegramCommandHandler(telegram)
# Register basic commands
async def handle_help() -> None:
"""Handle /help command."""
message = (
"<b>📖 Available Commands</b>\n\n"
"/help - Show available commands\n"
"/status - Trading status (mode, markets, P&L)\n"
"/positions - Current holdings\n"
"/stop - Pause trading\n"
"/resume - Resume trading"
)
await telegram.send_message(message)
async def handle_stop() -> None:
"""Handle /stop command - pause trading."""
if not pause_trading.is_set():
await telegram.send_message("⏸️ Trading is already paused")
return
pause_trading.clear()
logger.info("Trading paused via Telegram command")
await telegram.send_message(
"<b>⏸️ Trading Paused</b>\n\n"
"All trading operations have been suspended.\n"
"Use /resume to restart trading."
)
async def handle_resume() -> None:
"""Handle /resume command - resume trading."""
if pause_trading.is_set():
await telegram.send_message("▶️ Trading is already active")
return
pause_trading.set()
logger.info("Trading resumed via Telegram command")
await telegram.send_message(
"<b>▶️ Trading Resumed</b>\n\n"
"Trading operations have been restarted."
)
async def handle_status() -> None:
"""Handle /status command - show trading status."""
try:
# Get trading status
trading_status = "Active" if pause_trading.is_set() else "Paused"
# Calculate P&L from balance data
try:
balance = await broker.get_balance()
output2 = balance.get("output2", [{}])
if output2:
total_eval = safe_float(output2[0].get("tot_evlu_amt", "0"))
purchase_total = safe_float(output2[0].get("pchs_amt_smtl_amt", "0"))
current_pnl = (
((total_eval - purchase_total) / purchase_total * 100)
if purchase_total > 0
else 0.0
)
pnl_str = f"{current_pnl:+.2f}%"
else:
pnl_str = "N/A"
except Exception as exc:
logger.warning("Failed to get P&L: %s", exc)
pnl_str = "N/A"
# Format market list
markets_str = ", ".join(settings.enabled_market_list)
message = (
"<b>📊 Trading Status</b>\n\n"
f"<b>Mode:</b> {settings.MODE.upper()}\n"
f"<b>Markets:</b> {markets_str}\n"
f"<b>Trading:</b> {trading_status}\n\n"
f"<b>Current P&L:</b> {pnl_str}\n"
f"<b>Circuit Breaker:</b> {risk._cb_threshold:.1f}%"
)
await telegram.send_message(message)
except Exception as exc:
logger.error("Error in /status handler: %s", exc)
await telegram.send_message(
"<b>⚠️ Error</b>\n\nFailed to retrieve trading status."
)
async def handle_positions() -> None:
"""Handle /positions command - show account summary."""
try:
# Get account balance
balance = await broker.get_balance()
output2 = balance.get("output2", [{}])
if not output2:
await telegram.send_message(
"<b>💼 Account Summary</b>\n\n"
"No balance information available."
)
return
# Extract account-level data
total_eval = safe_float(output2[0].get("tot_evlu_amt", "0"))
total_cash = safe_float(output2[0].get("dnca_tot_amt", "0"))
purchase_total = safe_float(output2[0].get("pchs_amt_smtl_amt", "0"))
# Calculate P&L
pnl_pct = (
((total_eval - purchase_total) / purchase_total * 100)
if purchase_total > 0
else 0.0
)
pnl_sign = "+" if pnl_pct >= 0 else ""
message = (
"<b>💼 Account Summary</b>\n\n"
f"<b>Total Evaluation:</b> ₩{total_eval:,.0f}\n"
f"<b>Available Cash:</b> ₩{total_cash:,.0f}\n"
f"<b>Purchase Total:</b> ₩{purchase_total:,.0f}\n"
f"<b>P&L:</b> {pnl_sign}{pnl_pct:.2f}%\n\n"
"<i>Note: Individual position details require API enhancement</i>"
)
await telegram.send_message(message)
except Exception as exc:
logger.error("Error in /positions handler: %s", exc)
await telegram.send_message(
"<b>⚠️ Error</b>\n\nFailed to retrieve positions."
)
command_handler.register_command("help", handle_help)
command_handler.register_command("stop", handle_stop)
command_handler.register_command("resume", handle_resume)
command_handler.register_command("status", handle_status)
command_handler.register_command("positions", handle_positions)
# Initialize volatility hunter # Initialize volatility hunter
volatility_analyzer = VolatilityAnalyzer(min_volume_surge=2.0, min_price_change=1.0) volatility_analyzer = VolatilityAnalyzer(min_volume_surge=2.0, min_price_change=1.0)
market_scanner = MarketScanner( market_scanner = MarketScanner(
@@ -586,6 +725,19 @@ async def run(settings: Settings) -> None:
max_concurrent_scans=1, # Fully serialized to avoid EGW00201 max_concurrent_scans=1, # Fully serialized to avoid EGW00201
) )
# Initialize smart scanner (Python-first, AI-last pipeline)
smart_scanner = SmartVolatilityScanner(
broker=broker,
volatility_analyzer=volatility_analyzer,
settings=settings,
)
# Track scan candidates for selection context logging
scan_candidates: dict[str, ScanCandidate] = {} # stock_code -> candidate
# Active stocks per market (dynamically discovered by scanner)
active_stocks: dict[str, list[str]] = {} # market_code -> [stock_codes]
# Initialize latency control system # Initialize latency control system
criticality_assessor = CriticalityAssessor( criticality_assessor = CriticalityAssessor(
critical_pnl_threshold=-2.5, # Near circuit breaker at -3.0% critical_pnl_threshold=-2.5, # Near circuit breaker at -3.0%
@@ -602,7 +754,10 @@ async def run(settings: Settings) -> None:
# Track market open/close state for notifications # Track market open/close state for notifications
_market_states: dict[str, bool] = {} # market_code -> is_open _market_states: dict[str, bool] = {} # market_code -> is_open
# Trading control events
shutdown = asyncio.Event() shutdown = asyncio.Event()
pause_trading = asyncio.Event()
pause_trading.set() # Default: trading enabled
def _signal_handler() -> None: def _signal_handler() -> None:
logger.info("Shutdown signal received") logger.info("Shutdown signal received")
@@ -621,6 +776,12 @@ async def run(settings: Settings) -> None:
except Exception as exc: except Exception as exc:
logger.warning("System startup notification failed: %s", exc) logger.warning("System startup notification failed: %s", exc)
# Start command handler
try:
await command_handler.start_polling()
except Exception as exc:
logger.warning("Failed to start command handler: %s", exc)
try: try:
# Branch based on trading mode # Branch based on trading mode
if settings.TRADE_MODE == "daily": if settings.TRADE_MODE == "daily":
@@ -634,6 +795,9 @@ async def run(settings: Settings) -> None:
session_interval = settings.SESSION_INTERVAL_HOURS * 3600 # Convert to seconds session_interval = settings.SESSION_INTERVAL_HOURS * 3600 # Convert to seconds
while not shutdown.is_set(): while not shutdown.is_set():
# Wait for trading to be unpaused
await pause_trading.wait()
try: try:
await run_daily_session( await run_daily_session(
broker, broker,
@@ -646,6 +810,7 @@ async def run(settings: Settings) -> None:
criticality_assessor, criticality_assessor,
telegram, telegram,
settings, settings,
smart_scanner=smart_scanner,
) )
except CircuitBreakerTripped: except CircuitBreakerTripped:
logger.critical("Circuit breaker tripped — shutting down") logger.critical("Circuit breaker tripped — shutting down")
@@ -666,6 +831,9 @@ async def run(settings: Settings) -> None:
logger.info("Realtime trading mode: 60s interval per stock") logger.info("Realtime trading mode: 60s interval per stock")
while not shutdown.is_set(): while not shutdown.is_set():
# Wait for trading to be unpaused
await pause_trading.wait()
# Get currently open markets # Get currently open markets
open_markets = get_open_markets(settings.enabled_market_list) open_markets = get_open_markets(settings.enabled_market_list)
@@ -716,49 +884,52 @@ async def run(settings: Settings) -> None:
logger.warning("Market open notification failed: %s", exc) logger.warning("Market open notification failed: %s", exc)
_market_states[market.code] = True _market_states[market.code] = True
# Volatility Hunter: Scan market periodically to update watchlist # Smart Scanner: dynamic stock discovery (no static watchlists)
now_timestamp = asyncio.get_event_loop().time() now_timestamp = asyncio.get_event_loop().time()
last_scan = last_scan_time.get(market.code, 0.0) last_scan = last_scan_time.get(market.code, 0.0)
if now_timestamp - last_scan >= SCAN_INTERVAL_SECONDS: if now_timestamp - last_scan >= SCAN_INTERVAL_SECONDS:
try: try:
# Scan all stocks in the universe logger.info("Smart Scanner: Scanning %s market", market.name)
stock_universe = STOCK_UNIVERSE.get(market.code, [])
if stock_universe: candidates = await smart_scanner.scan()
logger.info("Volatility Hunter: Scanning %s market", market.name)
scan_result = await market_scanner.scan_market( if candidates:
market, stock_universe # Use scanner results directly as trading candidates
active_stocks[market.code] = smart_scanner.get_stock_codes(
candidates
) )
# Update watchlist with top movers # Store candidates for selection context logging
current_watchlist = WATCHLISTS.get(market.code, []) for candidate in candidates:
updated_watchlist = market_scanner.get_updated_watchlist( scan_candidates[candidate.stock_code] = candidate
current_watchlist,
scan_result,
max_replacements=2,
)
WATCHLISTS[market.code] = updated_watchlist
logger.info( logger.info(
"Volatility Hunter: Watchlist updated for %s (%d top movers, %d breakouts)", "Smart Scanner: Found %d candidates for %s: %s",
len(candidates),
market.name, market.name,
len(scan_result.top_movers), [f"{c.stock_code}(RSI={c.rsi:.0f})" for c in candidates],
len(scan_result.breakouts),
) )
else:
logger.info(
"Smart Scanner: No candidates for %s — no trades", market.name
)
active_stocks[market.code] = []
last_scan_time[market.code] = now_timestamp last_scan_time[market.code] = now_timestamp
except Exception as exc:
logger.error("Volatility Hunter scan failed for %s: %s", market.name, exc)
# Get watchlist for this market except Exception as exc:
watchlist = WATCHLISTS.get(market.code, []) logger.error("Smart Scanner failed for %s: %s", market.name, exc)
if not watchlist:
logger.debug("No watchlist for market %s", market.code) # Get active stocks from scanner (dynamic, no static fallback)
stock_codes = active_stocks.get(market.code, [])
if not stock_codes:
logger.debug("No active stocks for market %s", market.code)
continue continue
logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist)) logger.info("Processing market: %s (%d stocks)", market.name, len(stock_codes))
# Process each stock in the watchlist # Process each stock from scanner results
for stock_code in watchlist: for stock_code in stock_codes:
if shutdown.is_set(): if shutdown.is_set():
break break
@@ -777,6 +948,7 @@ async def run(settings: Settings) -> None:
telegram, telegram,
market, market,
stock_code, stock_code,
scan_candidates,
) )
break # Success — exit retry loop break # Success — exit retry loop
except CircuitBreakerTripped as exc: except CircuitBreakerTripped as exc:
@@ -831,6 +1003,7 @@ async def run(settings: Settings) -> None:
pass # Normal — timeout means it's time for next cycle pass # Normal — timeout means it's time for next cycle
finally: finally:
# Clean up resources # Clean up resources
await command_handler.stop_polling()
await broker.close() await broker.close()
await telegram.close() await telegram.close()
db_conn.close() db_conn.close()

View File

@@ -200,14 +200,151 @@ telegram = TelegramClient(
) )
``` ```
## Bidirectional Commands
Control your trading bot remotely via Telegram commands. The bot not only sends notifications but also accepts commands for real-time control.
### Available Commands
| Command | Description |
|---------|-------------|
| `/start` | Welcome message with quick start guide |
| `/help` | List all available commands |
| `/status` | Current trading status (mode, markets, P&L, circuit breaker) |
| `/positions` | View current holdings grouped by market |
| `/stop` | Pause all trading operations |
| `/resume` | Resume trading operations |
### Command Examples
**Check Trading Status**
```
You: /status
Bot:
📊 Trading Status
Mode: PAPER
Markets: Korea, United States
Trading: Active
Current P&L: +2.50%
Circuit Breaker: -3.0%
```
**View Holdings**
```
You: /positions
Bot:
💼 Current Holdings
🇰🇷 Korea
• 005930: 10 shares @ 70,000
• 035420: 5 shares @ 200,000
🇺🇸 Overseas
• AAPL: 15 shares @ 175
• TSLA: 8 shares @ 245
Cash: ₩5,000,000
```
**Pause Trading**
```
You: /stop
Bot:
⏸️ Trading Paused
All trading operations have been suspended.
Use /resume to restart trading.
```
**Resume Trading**
```
You: /resume
Bot:
▶️ Trading Resumed
Trading operations have been restarted.
```
### Security
**Chat ID Verification**
- Commands are only accepted from the configured `TELEGRAM_CHAT_ID`
- Unauthorized users receive no response
- Command attempts from wrong chat IDs are logged
**Authorization Required**
- Only the bot owner (chat ID in `.env`) can control trading
- No way for unauthorized users to discover or use commands
- All command executions are logged for audit
### Configuration
Add to your `.env` file:
```bash
# Commands are enabled by default
TELEGRAM_COMMANDS_ENABLED=true
# Polling interval (seconds) - how often to check for commands
TELEGRAM_POLLING_INTERVAL=1.0
```
To disable commands but keep notifications:
```bash
TELEGRAM_COMMANDS_ENABLED=false
```
### How It Works
1. **Long Polling**: Bot checks Telegram API every second for new messages
2. **Command Parsing**: Messages starting with `/` are parsed as commands
3. **Authentication**: Chat ID is verified before executing any command
4. **Execution**: Command handler is called with current bot state
5. **Response**: Result is sent back via Telegram
### Error Handling
- Command parsing errors → "Unknown command" response
- API failures → Graceful degradation, error logged
- Invalid state → Appropriate message (e.g., "Trading is already paused")
- Trading loop isolation → Command errors never crash trading
### Troubleshooting Commands
**Commands not responding**
1. Check `TELEGRAM_COMMANDS_ENABLED=true` in `.env`
2. Verify you started conversation with `/start`
3. Check logs for command handler errors
4. Confirm chat ID matches `.env` configuration
**Wrong chat ID**
- Commands from unauthorized chats are silently ignored
- Check logs for "unauthorized chat_id" warnings
**Delayed responses**
- Polling interval is 1 second by default
- Network latency may add delay
- Check `TELEGRAM_POLLING_INTERVAL` setting
## API Reference ## API Reference
See `telegram_client.py` for full API documentation. See `telegram_client.py` for full API documentation.
Key methods: ### Notification Methods
- `notify_trade_execution()` - Trade alerts - `notify_trade_execution()` - Trade alerts
- `notify_circuit_breaker()` - Emergency stops - `notify_circuit_breaker()` - Emergency stops
- `notify_fat_finger()` - Order rejections - `notify_fat_finger()` - Order rejections
- `notify_market_open/close()` - Session tracking - `notify_market_open/close()` - Session tracking
- `notify_system_start/shutdown()` - Lifecycle events - `notify_system_start/shutdown()` - Lifecycle events
- `notify_error()` - Error alerts - `notify_error()` - Error alerts
### Command Handler
- `TelegramCommandHandler` - Bidirectional command processing
- `register_command()` - Register custom command handlers
- `start_polling()` / `stop_polling()` - Lifecycle management

View File

@@ -3,6 +3,7 @@
import asyncio import asyncio
import logging import logging
import time import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
@@ -339,3 +340,172 @@ class TelegramClient:
await self._send_notification( await self._send_notification(
NotificationMessage(priority=NotificationPriority.HIGH, message=message) NotificationMessage(priority=NotificationPriority.HIGH, message=message)
) )
class TelegramCommandHandler:
"""Handles incoming Telegram commands via long polling."""
def __init__(
self, client: TelegramClient, polling_interval: float = 1.0
) -> None:
"""
Initialize command handler.
Args:
client: TelegramClient instance for sending responses
polling_interval: Polling interval in seconds
"""
self._client = client
self._polling_interval = polling_interval
self._commands: dict[str, Callable[[], Awaitable[None]]] = {}
self._last_update_id = 0
self._polling_task: asyncio.Task[None] | None = None
self._running = False
def register_command(
self, command: str, handler: Callable[[], Awaitable[None]]
) -> None:
"""
Register a command handler.
Args:
command: Command name (without leading slash, e.g., "start")
handler: Async function to handle the command
"""
self._commands[command] = handler
logger.debug("Registered command handler: /%s", command)
async def start_polling(self) -> None:
"""Start long polling for commands."""
if self._running:
logger.warning("Command handler already running")
return
if not self._client._enabled:
logger.info("Command handler disabled (TelegramClient disabled)")
return
self._running = True
self._polling_task = asyncio.create_task(self._poll_loop())
logger.info("Started Telegram command polling")
async def stop_polling(self) -> None:
"""Stop polling and cancel pending tasks."""
if not self._running:
return
self._running = False
if self._polling_task:
self._polling_task.cancel()
try:
await self._polling_task
except asyncio.CancelledError:
pass
logger.info("Stopped Telegram command polling")
async def _poll_loop(self) -> None:
"""Main polling loop that fetches updates."""
while self._running:
try:
updates = await self._get_updates()
for update in updates:
await self._handle_update(update)
except asyncio.CancelledError:
break
except Exception as exc:
logger.error("Error in polling loop: %s", exc)
await asyncio.sleep(self._polling_interval)
async def _get_updates(self) -> list[dict]:
"""
Fetch updates from Telegram API.
Returns:
List of update objects
"""
try:
url = f"{self._client.API_BASE.format(token=self._client._bot_token)}/getUpdates"
payload = {
"offset": self._last_update_id + 1,
"timeout": int(self._polling_interval),
"allowed_updates": ["message"],
}
session = self._client._get_session()
async with session.post(url, json=payload) as resp:
if resp.status != 200:
error_text = await resp.text()
logger.error(
"getUpdates API error (status=%d): %s", resp.status, error_text
)
return []
data = await resp.json()
if not data.get("ok"):
logger.error("getUpdates returned ok=false: %s", data)
return []
updates = data.get("result", [])
if updates:
self._last_update_id = updates[-1]["update_id"]
return updates
except asyncio.TimeoutError:
logger.debug("getUpdates timeout (normal)")
return []
except aiohttp.ClientError as exc:
logger.error("getUpdates failed: %s", exc)
return []
except Exception as exc:
logger.error("Unexpected error in _get_updates: %s", exc)
return []
async def _handle_update(self, update: dict) -> None:
"""
Parse and handle a single update.
Args:
update: Update object from Telegram API
"""
try:
message = update.get("message")
if not message:
return
# Verify chat_id matches configured chat
chat_id = str(message.get("chat", {}).get("id", ""))
if chat_id != self._client._chat_id:
logger.warning(
"Ignoring command from unauthorized chat_id: %s", chat_id
)
return
# Extract command text
text = message.get("text", "").strip()
if not text.startswith("/"):
return
# Parse command (remove leading slash and extract command name)
command_parts = text[1:].split()
if not command_parts:
return
# Remove @botname suffix if present (for group chats)
command_name = command_parts[0].split("@")[0]
# Execute handler
handler = self._commands.get(command_name)
if handler:
logger.info("Executing command: /%s", command_name)
await handler()
else:
logger.debug("Unknown command: /%s", command_name)
await self._client.send_message(
f"Unknown command: /{command_name}\nUse /help to see available commands."
)
except Exception as exc:
logger.error("Error handling update: %s", exc)
# Don't crash the polling loop on handler errors

0
src/strategy/__init__.py Normal file
View File

164
src/strategy/models.py Normal file
View File

@@ -0,0 +1,164 @@
"""Pydantic models for pre-market scenario planning.
Defines the data contracts for the proactive strategy system:
- AI generates DayPlaybook before market open (structured JSON scenarios)
- Local ScenarioEngine matches conditions during market hours (no API calls)
"""
from __future__ import annotations
from datetime import UTC, date, datetime
from enum import Enum
from pydantic import BaseModel, Field, field_validator
class ScenarioAction(str, Enum):
"""Actions that can be taken by scenarios."""
BUY = "BUY"
SELL = "SELL"
HOLD = "HOLD"
REDUCE_ALL = "REDUCE_ALL"
class MarketOutlook(str, Enum):
"""AI's assessment of market direction."""
BULLISH = "bullish"
NEUTRAL_TO_BULLISH = "neutral_to_bullish"
NEUTRAL = "neutral"
NEUTRAL_TO_BEARISH = "neutral_to_bearish"
BEARISH = "bearish"
class PlaybookStatus(str, Enum):
"""Lifecycle status of a playbook."""
PENDING = "pending"
READY = "ready"
FAILED = "failed"
EXPIRED = "expired"
class StockCondition(BaseModel):
"""Condition fields for scenario matching (all optional, AND-combined).
The ScenarioEngine evaluates all non-None fields as AND conditions.
A condition matches only if ALL specified fields are satisfied.
"""
rsi_below: float | None = None
rsi_above: float | None = None
volume_ratio_above: float | None = None
volume_ratio_below: float | None = None
price_above: float | None = None
price_below: float | None = None
price_change_pct_above: float | None = None
price_change_pct_below: float | None = None
def has_any_condition(self) -> bool:
"""Check if at least one condition field is set."""
return any(
v is not None
for v in (
self.rsi_below,
self.rsi_above,
self.volume_ratio_above,
self.volume_ratio_below,
self.price_above,
self.price_below,
self.price_change_pct_above,
self.price_change_pct_below,
)
)
class StockScenario(BaseModel):
"""A single condition-action rule for one stock."""
condition: StockCondition
action: ScenarioAction
confidence: int = Field(ge=0, le=100)
allocation_pct: float = Field(ge=0, le=100, default=10.0)
stop_loss_pct: float = Field(le=0, default=-2.0)
take_profit_pct: float = Field(ge=0, default=3.0)
rationale: str = ""
class StockPlaybook(BaseModel):
"""All scenarios for a single stock (ordered by priority)."""
stock_code: str
stock_name: str = ""
scenarios: list[StockScenario] = Field(min_length=1)
class GlobalRule(BaseModel):
"""Portfolio-level rule (checked before stock-level scenarios)."""
condition: str # e.g. "portfolio_pnl_pct < -2.0"
action: ScenarioAction
rationale: str = ""
class CrossMarketContext(BaseModel):
"""Summary of another market's state for cross-market awareness."""
market: str # e.g. "US" or "KR"
date: str
total_pnl: float = 0.0
win_rate: float = 0.0
index_change_pct: float = 0.0 # e.g. KOSPI or S&P500 change
key_events: list[str] = Field(default_factory=list)
lessons: list[str] = Field(default_factory=list)
class DayPlaybook(BaseModel):
"""Complete playbook for a single trading day in a single market.
Generated by PreMarketPlanner (1 Gemini call per market per day).
Consumed by ScenarioEngine during market hours (0 API calls).
"""
date: date
market: str # "KR" or "US"
market_outlook: MarketOutlook = MarketOutlook.NEUTRAL
generated_at: str = "" # ISO timestamp
gemini_model: str = ""
token_count: int = 0
global_rules: list[GlobalRule] = Field(default_factory=list)
stock_playbooks: list[StockPlaybook] = Field(default_factory=list)
default_action: ScenarioAction = ScenarioAction.HOLD
context_summary: dict = Field(default_factory=dict)
cross_market: CrossMarketContext | None = None
@field_validator("stock_playbooks")
@classmethod
def validate_unique_stocks(cls, v: list[StockPlaybook]) -> list[StockPlaybook]:
codes = [pb.stock_code for pb in v]
if len(codes) != len(set(codes)):
raise ValueError("Duplicate stock codes in playbook")
return v
def get_stock_playbook(self, stock_code: str) -> StockPlaybook | None:
"""Find the playbook for a specific stock."""
for pb in self.stock_playbooks:
if pb.stock_code == stock_code:
return pb
return None
@property
def scenario_count(self) -> int:
"""Total number of scenarios across all stocks."""
return sum(len(pb.scenarios) for pb in self.stock_playbooks)
@property
def stock_count(self) -> int:
"""Number of stocks with scenarios."""
return len(self.stock_playbooks)
def model_post_init(self, __context: object) -> None:
"""Set generated_at if not provided."""
if not self.generated_at:
self.generated_at = datetime.now(UTC).isoformat()

View File

@@ -174,6 +174,7 @@ class TestTradingCycleTelegramIntegration:
telegram=mock_telegram, telegram=mock_telegram,
market=mock_market, market=mock_market,
stock_code="005930", stock_code="005930",
scan_candidates={},
) )
# Verify notification was sent # Verify notification was sent
@@ -216,6 +217,7 @@ class TestTradingCycleTelegramIntegration:
telegram=mock_telegram, telegram=mock_telegram,
market=mock_market, market=mock_market,
stock_code="005930", stock_code="005930",
scan_candidates={},
) )
# Verify notification was attempted # Verify notification was attempted
@@ -257,6 +259,7 @@ class TestTradingCycleTelegramIntegration:
telegram=mock_telegram, telegram=mock_telegram,
market=mock_market, market=mock_market,
stock_code="005930", stock_code="005930",
scan_candidates={},
) )
# Verify notification was sent # Verify notification was sent
@@ -305,6 +308,7 @@ class TestTradingCycleTelegramIntegration:
telegram=mock_telegram, telegram=mock_telegram,
market=mock_market, market=mock_market,
stock_code="005930", stock_code="005930",
scan_candidates={},
) )
# Verify notification was attempted # Verify notification was attempted
@@ -345,6 +349,7 @@ class TestTradingCycleTelegramIntegration:
telegram=mock_telegram, telegram=mock_telegram,
market=mock_market, market=mock_market,
stock_code="005930", stock_code="005930",
scan_candidates={},
) )
# Verify no trade notification sent # Verify no trade notification sent
@@ -543,6 +548,7 @@ class TestOverseasBalanceParsing:
telegram=mock_telegram, telegram=mock_telegram,
market=mock_overseas_market, market=mock_overseas_market,
stock_code="AAPL", stock_code="AAPL",
scan_candidates={},
) )
# Verify balance API was called # Verify balance API was called
@@ -577,6 +583,7 @@ class TestOverseasBalanceParsing:
telegram=mock_telegram, telegram=mock_telegram,
market=mock_overseas_market, market=mock_overseas_market,
stock_code="AAPL", stock_code="AAPL",
scan_candidates={},
) )
# Verify balance API was called # Verify balance API was called
@@ -611,6 +618,7 @@ class TestOverseasBalanceParsing:
telegram=mock_telegram, telegram=mock_telegram,
market=mock_overseas_market, market=mock_overseas_market,
stock_code="AAPL", stock_code="AAPL",
scan_candidates={},
) )
# Verify balance API was called # Verify balance API was called
@@ -645,6 +653,7 @@ class TestOverseasBalanceParsing:
telegram=mock_telegram, telegram=mock_telegram,
market=mock_overseas_market, market=mock_overseas_market,
stock_code="AAPL", stock_code="AAPL",
scan_candidates={},
) )
# Verify price API was called # Verify price API was called

377
tests/test_smart_scanner.py Normal file
View File

@@ -0,0 +1,377 @@
"""Tests for SmartVolatilityScanner."""
from __future__ import annotations
import pytest
from unittest.mock import AsyncMock, MagicMock
from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner
from src.analysis.volatility import VolatilityAnalyzer
from src.broker.kis_api import KISBroker
from src.config import Settings
@pytest.fixture
def mock_settings() -> Settings:
"""Create test settings."""
return Settings(
KIS_APP_KEY="test",
KIS_APP_SECRET="test",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test",
RSI_OVERSOLD_THRESHOLD=30,
RSI_MOMENTUM_THRESHOLD=70,
VOL_MULTIPLIER=2.0,
SCANNER_TOP_N=3,
DB_PATH=":memory:",
)
@pytest.fixture
def mock_broker(mock_settings: Settings) -> MagicMock:
"""Create mock broker."""
broker = MagicMock(spec=KISBroker)
broker._settings = mock_settings
broker.fetch_market_rankings = AsyncMock()
broker.get_daily_prices = AsyncMock()
return broker
@pytest.fixture
def scanner(mock_broker: MagicMock, mock_settings: Settings) -> SmartVolatilityScanner:
"""Create smart scanner instance."""
analyzer = VolatilityAnalyzer()
return SmartVolatilityScanner(
broker=mock_broker,
volatility_analyzer=analyzer,
settings=mock_settings,
)
class TestSmartVolatilityScanner:
"""Test suite for SmartVolatilityScanner."""
@pytest.mark.asyncio
async def test_scan_finds_oversold_candidates(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that scanner identifies oversold stocks with high volume."""
# Mock rankings
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "005930",
"name": "Samsung",
"price": 70000,
"volume": 5000000,
"change_rate": -3.5,
"volume_increase_rate": 250,
},
]
# Mock daily prices - trending down (oversold)
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 75000 - i * 200,
"high": 75500 - i * 200,
"low": 74500 - i * 200,
"close": 75000 - i * 250, # Steady decline
"volume": 2000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
# Should find at least one candidate (depending on exact RSI calculation)
mock_broker.fetch_market_rankings.assert_called_once()
mock_broker.get_daily_prices.assert_called_once_with("005930", days=20)
# If qualified, should have oversold signal
if candidates:
assert candidates[0].signal in ["oversold", "momentum"]
assert candidates[0].volume_ratio >= scanner.vol_multiplier
@pytest.mark.asyncio
async def test_scan_finds_momentum_candidates(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that scanner identifies momentum stocks with high volume."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "035420",
"name": "NAVER",
"price": 250000,
"volume": 3000000,
"change_rate": 5.0,
"volume_increase_rate": 300,
},
]
# Mock daily prices - trending up (momentum)
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 230000 + i * 500,
"high": 231000 + i * 500,
"low": 229000 + i * 500,
"close": 230500 + i * 500, # Steady rise
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
mock_broker.fetch_market_rankings.assert_called_once()
@pytest.mark.asyncio
async def test_scan_filters_low_volume(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that stocks with low volume ratio are filtered out."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "000660",
"name": "SK Hynix",
"price": 150000,
"volume": 500000,
"change_rate": -5.0,
"volume_increase_rate": 50, # Only 50% increase (< 200%)
},
]
# Low volume
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 150000 - i * 100,
"high": 151000 - i * 100,
"low": 149000 - i * 100,
"close": 150000 - i * 150, # Declining (would be oversold)
"volume": 1000000, # Current 500k < 2x prev day 1M
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
# Should be filtered out due to low volume ratio
assert len(candidates) == 0
@pytest.mark.asyncio
async def test_scan_filters_neutral_rsi(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that stocks with neutral RSI are filtered out."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "051910",
"name": "LG Chem",
"price": 500000,
"volume": 3000000,
"change_rate": 0.5,
"volume_increase_rate": 300, # High volume
},
]
# Flat prices (neutral RSI ~50)
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 500000 + (i % 2) * 100, # Small oscillation
"high": 500500,
"low": 499500,
"close": 500000 + (i % 2) * 50,
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
# Should be filtered out (RSI ~50, not < 30 or > 70)
assert len(candidates) == 0
@pytest.mark.asyncio
async def test_scan_uses_fallback_on_api_error(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test fallback to static list when ranking API fails."""
mock_broker.fetch_market_rankings.side_effect = ConnectionError("API unavailable")
# Fallback stocks should still be analyzed
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 50000 - i * 50,
"high": 51000 - i * 50,
"low": 49000 - i * 50,
"close": 50000 - i * 75, # Declining
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan(fallback_stocks=["005930", "000660"])
# Should not crash
assert isinstance(candidates, list)
@pytest.mark.asyncio
async def test_scan_returns_top_n_only(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that scan returns at most top_n candidates."""
# Return many stocks
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": f"00{i}000",
"name": f"Stock{i}",
"price": 10000 * i,
"volume": 5000000,
"change_rate": -10,
"volume_increase_rate": 500,
}
for i in range(1, 10)
]
# All oversold with high volume
def make_prices(code: str) -> list[dict]:
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 10000 - i * 100,
"high": 10500 - i * 100,
"low": 9500 - i * 100,
"close": 10000 - i * 150,
"volume": 1000000,
})
return prices
mock_broker.get_daily_prices.side_effect = make_prices
candidates = await scanner.scan()
# Should respect top_n limit (3)
assert len(candidates) <= scanner.top_n
@pytest.mark.asyncio
async def test_scan_skips_insufficient_price_history(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that stocks with insufficient history are skipped."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "005930",
"name": "Samsung",
"price": 70000,
"volume": 5000000,
"change_rate": -5.0,
"volume_increase_rate": 300,
},
]
# Only 5 days of data (need 15+ for RSI)
mock_broker.get_daily_prices.return_value = [
{
"date": f"2026020{i:02d}",
"open": 70000,
"high": 71000,
"low": 69000,
"close": 70000,
"volume": 2000000,
}
for i in range(5)
]
candidates = await scanner.scan()
# Should skip due to insufficient data
assert len(candidates) == 0
@pytest.mark.asyncio
async def test_get_stock_codes(
self, scanner: SmartVolatilityScanner
) -> None:
"""Test extraction of stock codes from candidates."""
candidates = [
ScanCandidate(
stock_code="005930",
name="Samsung",
price=70000,
volume=5000000,
volume_ratio=2.5,
rsi=28,
signal="oversold",
score=85.0,
),
ScanCandidate(
stock_code="035420",
name="NAVER",
price=250000,
volume=3000000,
volume_ratio=3.0,
rsi=75,
signal="momentum",
score=88.0,
),
]
codes = scanner.get_stock_codes(candidates)
assert codes == ["005930", "035420"]
class TestRSICalculation:
"""Test RSI calculation in VolatilityAnalyzer."""
def test_rsi_oversold(self) -> None:
"""Test RSI calculation for downtrending prices."""
analyzer = VolatilityAnalyzer()
# Steadily declining prices
prices = [100 - i * 0.5 for i in range(20)]
rsi = analyzer.calculate_rsi(prices, period=14)
assert rsi < 50 # Should be oversold territory
def test_rsi_overbought(self) -> None:
"""Test RSI calculation for uptrending prices."""
analyzer = VolatilityAnalyzer()
# Steadily rising prices
prices = [100 + i * 0.5 for i in range(20)]
rsi = analyzer.calculate_rsi(prices, period=14)
assert rsi > 50 # Should be overbought territory
def test_rsi_neutral(self) -> None:
"""Test RSI calculation for flat prices."""
analyzer = VolatilityAnalyzer()
# Flat prices with small oscillation
prices = [100 + (i % 2) * 0.1 for i in range(20)]
rsi = analyzer.calculate_rsi(prices, period=14)
assert 40 < rsi < 60 # Should be near neutral
def test_rsi_insufficient_data(self) -> None:
"""Test RSI returns neutral when insufficient data."""
analyzer = VolatilityAnalyzer()
prices = [100, 101, 102] # Only 3 prices, need 15+
rsi = analyzer.calculate_rsi(prices, period=14)
assert rsi == 50.0 # Default neutral
def test_rsi_all_gains(self) -> None:
"""Test RSI returns 100 when all gains (no losses)."""
analyzer = VolatilityAnalyzer()
# Monotonic increase
prices = [100 + i for i in range(20)]
rsi = analyzer.calculate_rsi(prices, period=14)
assert rsi == 100.0 # Maximum RSI

View File

@@ -0,0 +1,366 @@
"""Tests for strategy/playbook Pydantic models."""
from __future__ import annotations
from datetime import date
import pytest
from pydantic import ValidationError
from src.strategy.models import (
CrossMarketContext,
DayPlaybook,
GlobalRule,
MarketOutlook,
PlaybookStatus,
ScenarioAction,
StockCondition,
StockPlaybook,
StockScenario,
)
# ---------------------------------------------------------------------------
# StockCondition
# ---------------------------------------------------------------------------
class TestStockCondition:
def test_empty_condition(self) -> None:
cond = StockCondition()
assert not cond.has_any_condition()
def test_single_field(self) -> None:
cond = StockCondition(rsi_below=30.0)
assert cond.has_any_condition()
def test_multiple_fields(self) -> None:
cond = StockCondition(rsi_below=25.0, volume_ratio_above=3.0)
assert cond.has_any_condition()
def test_all_fields(self) -> None:
cond = StockCondition(
rsi_below=30,
rsi_above=10,
volume_ratio_above=2.0,
volume_ratio_below=10.0,
price_above=1000,
price_below=50000,
price_change_pct_above=-5.0,
price_change_pct_below=5.0,
)
assert cond.has_any_condition()
# ---------------------------------------------------------------------------
# StockScenario
# ---------------------------------------------------------------------------
class TestStockScenario:
def test_valid_scenario(self) -> None:
s = StockScenario(
condition=StockCondition(rsi_below=25.0),
action=ScenarioAction.BUY,
confidence=85,
allocation_pct=15.0,
stop_loss_pct=-2.0,
take_profit_pct=3.0,
rationale="Oversold bounce expected",
)
assert s.action == ScenarioAction.BUY
assert s.confidence == 85
def test_confidence_too_high(self) -> None:
with pytest.raises(ValidationError):
StockScenario(
condition=StockCondition(),
action=ScenarioAction.BUY,
confidence=101,
)
def test_confidence_too_low(self) -> None:
with pytest.raises(ValidationError):
StockScenario(
condition=StockCondition(),
action=ScenarioAction.BUY,
confidence=-1,
)
def test_allocation_too_high(self) -> None:
with pytest.raises(ValidationError):
StockScenario(
condition=StockCondition(),
action=ScenarioAction.BUY,
confidence=80,
allocation_pct=101.0,
)
def test_stop_loss_must_be_negative(self) -> None:
with pytest.raises(ValidationError):
StockScenario(
condition=StockCondition(),
action=ScenarioAction.BUY,
confidence=80,
stop_loss_pct=1.0,
)
def test_take_profit_must_be_positive(self) -> None:
with pytest.raises(ValidationError):
StockScenario(
condition=StockCondition(),
action=ScenarioAction.BUY,
confidence=80,
take_profit_pct=-1.0,
)
def test_defaults(self) -> None:
s = StockScenario(
condition=StockCondition(),
action=ScenarioAction.HOLD,
confidence=50,
)
assert s.allocation_pct == 10.0
assert s.stop_loss_pct == -2.0
assert s.take_profit_pct == 3.0
assert s.rationale == ""
# ---------------------------------------------------------------------------
# StockPlaybook
# ---------------------------------------------------------------------------
class TestStockPlaybook:
def test_valid_playbook(self) -> None:
pb = StockPlaybook(
stock_code="005930",
stock_name="Samsung Electronics",
scenarios=[
StockScenario(
condition=StockCondition(rsi_below=25.0),
action=ScenarioAction.BUY,
confidence=85,
),
],
)
assert pb.stock_code == "005930"
assert len(pb.scenarios) == 1
def test_empty_scenarios_rejected(self) -> None:
with pytest.raises(ValidationError):
StockPlaybook(
stock_code="005930",
scenarios=[],
)
def test_multiple_scenarios(self) -> None:
pb = StockPlaybook(
stock_code="AAPL",
scenarios=[
StockScenario(
condition=StockCondition(rsi_below=25.0),
action=ScenarioAction.BUY,
confidence=85,
),
StockScenario(
condition=StockCondition(rsi_above=75.0),
action=ScenarioAction.SELL,
confidence=80,
),
],
)
assert len(pb.scenarios) == 2
# ---------------------------------------------------------------------------
# GlobalRule
# ---------------------------------------------------------------------------
class TestGlobalRule:
def test_valid_rule(self) -> None:
rule = GlobalRule(
condition="portfolio_pnl_pct < -2.0",
action=ScenarioAction.REDUCE_ALL,
rationale="Risk limit approaching",
)
assert rule.action == ScenarioAction.REDUCE_ALL
def test_hold_rule(self) -> None:
rule = GlobalRule(
condition="volatility_index > 30",
action=ScenarioAction.HOLD,
)
assert rule.rationale == ""
# ---------------------------------------------------------------------------
# CrossMarketContext
# ---------------------------------------------------------------------------
class TestCrossMarketContext:
def test_valid_context(self) -> None:
ctx = CrossMarketContext(
market="US",
date="2026-02-07",
total_pnl=-1.5,
win_rate=40.0,
index_change_pct=-2.3,
key_events=["Fed rate decision"],
lessons=["Avoid tech sector on rate hike days"],
)
assert ctx.market == "US"
assert len(ctx.key_events) == 1
def test_defaults(self) -> None:
ctx = CrossMarketContext(market="KR", date="2026-02-07")
assert ctx.total_pnl == 0.0
assert ctx.key_events == []
assert ctx.lessons == []
# ---------------------------------------------------------------------------
# DayPlaybook
# ---------------------------------------------------------------------------
def _make_scenario(rsi_below: float = 25.0) -> StockScenario:
return StockScenario(
condition=StockCondition(rsi_below=rsi_below),
action=ScenarioAction.BUY,
confidence=85,
)
def _make_playbook(**kwargs) -> DayPlaybook:
defaults = {
"date": date(2026, 2, 7),
"market": "KR",
"stock_playbooks": [
StockPlaybook(stock_code="005930", scenarios=[_make_scenario()]),
],
}
defaults.update(kwargs)
return DayPlaybook(**defaults)
class TestDayPlaybook:
def test_valid_playbook(self) -> None:
pb = _make_playbook()
assert pb.market == "KR"
assert pb.date == date(2026, 2, 7)
assert pb.default_action == ScenarioAction.HOLD
assert pb.scenario_count == 1
assert pb.stock_count == 1
def test_generated_at_auto_set(self) -> None:
pb = _make_playbook()
assert pb.generated_at != ""
def test_explicit_generated_at(self) -> None:
pb = _make_playbook(generated_at="2026-02-07T08:30:00")
assert pb.generated_at == "2026-02-07T08:30:00"
def test_duplicate_stocks_rejected(self) -> None:
with pytest.raises(ValidationError):
DayPlaybook(
date=date(2026, 2, 7),
market="KR",
stock_playbooks=[
StockPlaybook(stock_code="005930", scenarios=[_make_scenario()]),
StockPlaybook(stock_code="005930", scenarios=[_make_scenario(30)]),
],
)
def test_empty_stock_playbooks_allowed(self) -> None:
pb = DayPlaybook(
date=date(2026, 2, 7),
market="KR",
stock_playbooks=[],
)
assert pb.stock_count == 0
assert pb.scenario_count == 0
def test_get_stock_playbook_found(self) -> None:
pb = _make_playbook()
result = pb.get_stock_playbook("005930")
assert result is not None
assert result.stock_code == "005930"
def test_get_stock_playbook_not_found(self) -> None:
pb = _make_playbook()
result = pb.get_stock_playbook("AAPL")
assert result is None
def test_with_global_rules(self) -> None:
pb = _make_playbook(
global_rules=[
GlobalRule(
condition="portfolio_pnl_pct < -2.0",
action=ScenarioAction.REDUCE_ALL,
),
],
)
assert len(pb.global_rules) == 1
def test_with_cross_market_context(self) -> None:
ctx = CrossMarketContext(market="US", date="2026-02-07", total_pnl=-1.5)
pb = _make_playbook(cross_market=ctx)
assert pb.cross_market is not None
assert pb.cross_market.market == "US"
def test_market_outlook(self) -> None:
pb = _make_playbook(market_outlook=MarketOutlook.BEARISH)
assert pb.market_outlook == MarketOutlook.BEARISH
def test_multiple_stocks_multiple_scenarios(self) -> None:
pb = DayPlaybook(
date=date(2026, 2, 7),
market="US",
stock_playbooks=[
StockPlaybook(
stock_code="AAPL",
scenarios=[_make_scenario(), _make_scenario(30)],
),
StockPlaybook(
stock_code="MSFT",
scenarios=[_make_scenario()],
),
],
)
assert pb.stock_count == 2
assert pb.scenario_count == 3
def test_serialization_roundtrip(self) -> None:
pb = _make_playbook(
market_outlook=MarketOutlook.BULLISH,
cross_market=CrossMarketContext(market="US", date="2026-02-07"),
)
json_str = pb.model_dump_json()
restored = DayPlaybook.model_validate_json(json_str)
assert restored.market == pb.market
assert restored.date == pb.date
assert restored.scenario_count == pb.scenario_count
assert restored.cross_market is not None
# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------
class TestEnums:
def test_scenario_action_values(self) -> None:
assert ScenarioAction.BUY.value == "BUY"
assert ScenarioAction.SELL.value == "SELL"
assert ScenarioAction.HOLD.value == "HOLD"
assert ScenarioAction.REDUCE_ALL.value == "REDUCE_ALL"
def test_market_outlook_values(self) -> None:
assert len(MarketOutlook) == 5
def test_playbook_status_values(self) -> None:
assert PlaybookStatus.READY.value == "ready"
assert PlaybookStatus.EXPIRED.value == "expired"

View File

@@ -0,0 +1,777 @@
"""Tests for Telegram command handler."""
from unittest.mock import AsyncMock, patch
import pytest
from src.notifications.telegram_client import TelegramClient, TelegramCommandHandler
class TestCommandHandlerInit:
"""Test command handler initialization."""
def test_init_with_client(self) -> None:
"""Handler initializes with TelegramClient."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
assert handler._client is client
assert handler._polling_interval == 1.0
assert handler._commands == {}
assert handler._running is False
def test_custom_polling_interval(self) -> None:
"""Handler accepts custom polling interval."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client, polling_interval=2.5)
assert handler._polling_interval == 2.5
class TestCommandRegistration:
"""Test command registration."""
@pytest.mark.asyncio
async def test_register_command(self) -> None:
"""Commands can be registered."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
async def test_handler() -> None:
pass
handler.register_command("test", test_handler)
assert "test" in handler._commands
assert handler._commands["test"] is test_handler
@pytest.mark.asyncio
async def test_register_multiple_commands(self) -> None:
"""Multiple commands can be registered."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
async def handler1() -> None:
pass
async def handler2() -> None:
pass
handler.register_command("start", handler1)
handler.register_command("help", handler2)
assert len(handler._commands) == 2
assert handler._commands["start"] is handler1
assert handler._commands["help"] is handler2
class TestPollingLifecycle:
"""Test polling start/stop."""
@pytest.mark.asyncio
async def test_start_polling(self) -> None:
"""Polling can be started."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
with patch.object(handler, "_poll_loop", new_callable=AsyncMock):
await handler.start_polling()
assert handler._running is True
assert handler._polling_task is not None
await handler.stop_polling()
@pytest.mark.asyncio
async def test_start_polling_disabled_client(self) -> None:
"""Polling not started when client disabled."""
client = TelegramClient(enabled=False)
handler = TelegramCommandHandler(client)
await handler.start_polling()
assert handler._running is False
assert handler._polling_task is None
@pytest.mark.asyncio
async def test_stop_polling(self) -> None:
"""Polling can be stopped."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
with patch.object(handler, "_poll_loop", new_callable=AsyncMock):
await handler.start_polling()
await handler.stop_polling()
assert handler._running is False
@pytest.mark.asyncio
async def test_double_start_ignored(self) -> None:
"""Starting already running handler is ignored."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
with patch.object(handler, "_poll_loop", new_callable=AsyncMock):
await handler.start_polling()
task1 = handler._polling_task
await handler.start_polling() # Second start
task2 = handler._polling_task
# Should be the same task
assert task1 is task2
await handler.stop_polling()
class TestUpdateHandling:
"""Test update parsing and handling."""
@pytest.mark.asyncio
async def test_handle_valid_command(self) -> None:
"""Valid commands are executed."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
executed = False
async def test_command() -> None:
nonlocal executed
executed = True
handler.register_command("test", test_command)
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/test",
},
}
await handler._handle_update(update)
assert executed is True
@pytest.mark.asyncio
async def test_handle_unknown_command(self) -> None:
"""Unknown commands send help message."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/unknown",
},
}
await handler._handle_update(update)
# Should send error message
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Unknown command" in payload["text"]
assert "/unknown" in payload["text"]
@pytest.mark.asyncio
async def test_ignore_unauthorized_chat(self) -> None:
"""Commands from unauthorized chats are ignored."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
executed = False
async def test_command() -> None:
nonlocal executed
executed = True
handler.register_command("test", test_command)
update = {
"update_id": 1,
"message": {
"chat": {"id": 999}, # Wrong chat_id
"text": "/test",
},
}
await handler._handle_update(update)
assert executed is False
@pytest.mark.asyncio
async def test_ignore_non_command_text(self) -> None:
"""Non-command text is ignored."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
executed = False
async def test_command() -> None:
nonlocal executed
executed = True
handler.register_command("test", test_command)
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "Hello, not a command",
},
}
await handler._handle_update(update)
assert executed is False
@pytest.mark.asyncio
async def test_handle_command_with_botname(self) -> None:
"""Commands with @botname suffix are handled correctly."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
executed = False
async def test_command() -> None:
nonlocal executed
executed = True
handler.register_command("start", test_command)
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/start@mybot",
},
}
await handler._handle_update(update)
assert executed is True
@pytest.mark.asyncio
async def test_handle_update_error_isolation(self) -> None:
"""Errors in handlers don't crash the system."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
async def failing_command() -> None:
raise ValueError("Test error")
handler.register_command("fail", failing_command)
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/fail",
},
}
# Should not raise exception
await handler._handle_update(update)
class TestTradingControlCommands:
"""Test trading control commands."""
@pytest.mark.asyncio
async def test_stop_command_pauses_trading(self) -> None:
"""Stop command clears pause event."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
# Create mock pause event
import asyncio
pause_event = asyncio.Event()
pause_event.set() # Initially active
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_stop() -> None:
"""Mock /stop handler."""
if not pause_event.is_set():
await client.send_message("⏸️ Trading is already paused")
return
pause_event.clear()
await client.send_message(
"<b>⏸️ Trading Paused</b>\n\n"
"All trading operations have been suspended.\n"
"Use /resume to restart trading."
)
handler.register_command("stop", mock_stop)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/stop",
},
}
await handler._handle_update(update)
# Verify pause event was cleared
assert not pause_event.is_set()
# Verify message was sent
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Trading Paused" in payload["text"]
@pytest.mark.asyncio
async def test_resume_command_resumes_trading(self) -> None:
"""Resume command sets pause event."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
# Create mock pause event (initially paused)
import asyncio
pause_event = asyncio.Event()
pause_event.clear() # Initially paused
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_resume() -> None:
"""Mock /resume handler."""
if pause_event.is_set():
await client.send_message("▶️ Trading is already active")
return
pause_event.set()
await client.send_message(
"<b>▶️ Trading Resumed</b>\n\n"
"Trading operations have been restarted."
)
handler.register_command("resume", mock_resume)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/resume",
},
}
await handler._handle_update(update)
# Verify pause event was set
assert pause_event.is_set()
# Verify message was sent
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Trading Resumed" in payload["text"]
@pytest.mark.asyncio
async def test_stop_when_already_paused(self) -> None:
"""Stop command when already paused sends appropriate message."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
# Create mock pause event (already paused)
import asyncio
pause_event = asyncio.Event()
pause_event.clear()
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_stop() -> None:
"""Mock /stop handler."""
if not pause_event.is_set():
await client.send_message("⏸️ Trading is already paused")
return
pause_event.clear()
handler.register_command("stop", mock_stop)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/stop",
},
}
await handler._handle_update(update)
# Verify message was sent
payload = mock_post.call_args.kwargs["json"]
assert "already paused" in payload["text"]
@pytest.mark.asyncio
async def test_resume_when_already_active(self) -> None:
"""Resume command when already active sends appropriate message."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
# Create mock pause event (already active)
import asyncio
pause_event = asyncio.Event()
pause_event.set()
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_resume() -> None:
"""Mock /resume handler."""
if pause_event.is_set():
await client.send_message("▶️ Trading is already active")
return
pause_event.set()
handler.register_command("resume", mock_resume)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/resume",
},
}
await handler._handle_update(update)
# Verify message was sent
payload = mock_post.call_args.kwargs["json"]
assert "already active" in payload["text"]
class TestStatusCommands:
"""Test status query commands."""
@pytest.mark.asyncio
async def test_status_command_shows_trading_info(self) -> None:
"""Status command displays mode, markets, and P&L."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_status() -> None:
"""Mock /status handler."""
message = (
"<b>📊 Trading Status</b>\n\n"
"<b>Mode:</b> PAPER\n"
"<b>Markets:</b> Korea, United States\n"
"<b>Trading:</b> Active\n\n"
"<b>Current P&L:</b> +2.50%\n"
"<b>Circuit Breaker:</b> -3.0%"
)
await client.send_message(message)
handler.register_command("status", mock_status)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/status",
},
}
await handler._handle_update(update)
# Verify message was sent
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Trading Status" in payload["text"]
assert "PAPER" in payload["text"]
assert "P&L" in payload["text"]
@pytest.mark.asyncio
async def test_status_command_error_handling(self) -> None:
"""Status command handles errors gracefully."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_status_error() -> None:
"""Mock /status handler with error."""
await client.send_message(
"<b>⚠️ Error</b>\n\nFailed to retrieve trading status."
)
handler.register_command("status", mock_status_error)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/status",
},
}
await handler._handle_update(update)
# Should send error message
payload = mock_post.call_args.kwargs["json"]
assert "Error" in payload["text"]
@pytest.mark.asyncio
async def test_positions_command_shows_holdings(self) -> None:
"""Positions command displays account summary."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_positions() -> None:
"""Mock /positions handler."""
message = (
"<b>💼 Account Summary</b>\n\n"
"<b>Total Evaluation:</b> ₩10,500,000\n"
"<b>Available Cash:</b> ₩5,000,000\n"
"<b>Purchase Total:</b> ₩10,000,000\n"
"<b>P&L:</b> +5.00%\n\n"
"<i>Note: Individual position details require API enhancement</i>"
)
await client.send_message(message)
handler.register_command("positions", mock_positions)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/positions",
},
}
await handler._handle_update(update)
# Verify message was sent
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Account Summary" in payload["text"]
assert "Total Evaluation" in payload["text"]
assert "P&L" in payload["text"]
@pytest.mark.asyncio
async def test_positions_command_empty_holdings(self) -> None:
"""Positions command handles empty portfolio."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_positions_empty() -> None:
"""Mock /positions handler with no positions."""
message = (
"<b>💼 Account Summary</b>\n\n"
"No balance information available."
)
await client.send_message(message)
handler.register_command("positions", mock_positions_empty)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/positions",
},
}
await handler._handle_update(update)
# Verify message was sent
payload = mock_post.call_args.kwargs["json"]
assert "No balance information available" in payload["text"]
@pytest.mark.asyncio
async def test_positions_command_error_handling(self) -> None:
"""Positions command handles errors gracefully."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_positions_error() -> None:
"""Mock /positions handler with error."""
await client.send_message(
"<b>⚠️ Error</b>\n\nFailed to retrieve positions."
)
handler.register_command("positions", mock_positions_error)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/positions",
},
}
await handler._handle_update(update)
# Should send error message
payload = mock_post.call_args.kwargs["json"]
assert "Error" in payload["text"]
class TestBasicCommands:
"""Test basic command implementations."""
@pytest.mark.asyncio
async def test_help_command_content(self) -> None:
"""Help command lists all available commands."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_help() -> None:
"""Mock /help handler."""
message = (
"<b>📖 Available Commands</b>\n\n"
"/help - Show available commands\n"
"/status - Trading status (mode, markets, P&L)\n"
"/positions - Current holdings\n"
"/stop - Pause trading\n"
"/resume - Resume trading"
)
await client.send_message(message)
handler.register_command("help", mock_help)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/help",
},
}
await handler._handle_update(update)
# Verify message was sent
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Available Commands" in payload["text"]
assert "/help" in payload["text"]
assert "/status" in payload["text"]
assert "/positions" in payload["text"]
assert "/stop" in payload["text"]
assert "/resume" in payload["text"]
class TestGetUpdates:
"""Test getUpdates API interaction."""
@pytest.mark.asyncio
async def test_get_updates_success(self) -> None:
"""getUpdates fetches and parses updates."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(
return_value={
"ok": True,
"result": [
{"update_id": 1, "message": {"text": "/test"}},
{"update_id": 2, "message": {"text": "/help"}},
],
}
)
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp):
updates = await handler._get_updates()
assert len(updates) == 2
assert updates[0]["update_id"] == 1
assert updates[1]["update_id"] == 2
assert handler._last_update_id == 2
@pytest.mark.asyncio
async def test_get_updates_api_error(self) -> None:
"""getUpdates handles API errors gracefully."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 400
mock_resp.text = AsyncMock(return_value="Bad Request")
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp):
updates = await handler._get_updates()
assert updates == []
@pytest.mark.asyncio
async def test_get_updates_empty_result(self) -> None:
"""getUpdates handles empty results."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"ok": True, "result": []})
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp):
updates = await handler._get_updates()
assert updates == []