Compare commits

..

9 Commits

Author SHA1 Message Date
agentson
1a34a74232 feat: add pre-market planner config and remove static watchlists (issue #78)
Some checks failed
CI / test (pull_request) Has been cancelled
- Add pre-market planner settings: PRE_MARKET_MINUTES, MAX_SCENARIOS_PER_STOCK,
  PLANNER_TIMEOUT_SECONDS, DEFENSIVE_PLAYBOOK_ON_FAILURE, RESCAN_INTERVAL_SECONDS
- Change ENABLED_MARKETS default from KR to KR,US
- Remove static WATCHLISTS and STOCK_UNIVERSE dictionaries from main.py
- Replace watchlist-based trading with dynamic scanner-only stock discovery
- SmartVolatilityScanner is now the sole source of trading candidates
- Add active_stocks dict for scanner-discovered stocks per market
- Add smart_scanner parameter to run_daily_session()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 01:58:09 +09:00
a82a167915 Merge pull request 'feat: implement Smart Volatility Scanner (issue #76)' (#77) from feature/issue-76-smart-volatility-scanner into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #77
2026-02-06 07:43:54 +09:00
agentson
7725e7a8de docs: update documentation for Smart Volatility Scanner
Some checks failed
CI / test (pull_request) Has been cancelled
Update project documentation to reflect new Smart Volatility Scanner feature:

## CLAUDE.md
- Add Smart Volatility Scanner section with configuration guide
- Update project structure to include analysis/ module
- Update test count (273→343 tests)

## docs/architecture.md
- Add Analysis component (VolatilityAnalyzer + SmartVolatilityScanner)
- Add new KIS API methods (fetch_market_rankings, get_daily_prices)
- Update data flow diagram to show Python-first filtering pipeline
- Add selection_context to database schema documentation
- Add Smart Scanner configuration section
- Renumber components (Brain 2→3, Risk Manager 3→4, etc.)

## docs/requirements-log.md
- Document 2026-02-06 requirement for Smart Volatility Scanner
- Explain Python-First, AI-Last pipeline rationale
- Record implementation details and benefits
- Reference issue #76 and PR #77

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-06 07:35:25 +09:00
agentson
f0ae25c533 feat: implement Smart Volatility Scanner with RSI/volume filters (issue #76)
Some checks failed
CI / test (pull_request) Has been cancelled
Add Python-first scanning pipeline that reduces Gemini API calls by filtering
stocks before AI analysis: KIS rankings API -> RSI/volume filter -> AI judgment.

## Implementation
- Add RSI calculation (Wilder's smoothing method) to VolatilityAnalyzer
- Add KIS API methods: fetch_market_rankings() and get_daily_prices()
- Create SmartVolatilityScanner with configurable thresholds
- Integrate scanner into main.py realtime mode
- Add selection_context logging to trades table for Evolution system

## Configuration
- RSI_OVERSOLD_THRESHOLD: 30 (configurable 0-50)
- RSI_MOMENTUM_THRESHOLD: 70 (configurable 50-100)
- VOL_MULTIPLIER: 2.0 (minimum volume ratio, configurable 1-10)
- SCANNER_TOP_N: 3 (max candidates per scan, configurable 1-10)

## Benefits
- Reduces Gemini API calls (process 1-3 qualified stocks vs 20-30 ranked)
- Python-based technical filtering before expensive AI judgment
- Tracks selection criteria (RSI, volume_ratio, signal, score) for strategy optimization
- Graceful fallback to static watchlist if ranking API fails

## Tests
- 13 new tests for SmartVolatilityScanner and RSI calculation
- All existing tests updated and passing
- Coverage maintained at 73%

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-06 00:48:23 +09:00
27f581f17d Merge pull request 'fix: resolve Telegram command handler errors for /status and /positions (issue #74)' (#75) from feature/issue-74-telegram-command-fix into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #75
2026-02-05 18:56:24 +09:00
agentson
18a098d9a6 fix: resolve Telegram command handler errors for /status and /positions (issue #74)
Some checks failed
CI / test (pull_request) Has been cancelled
Fixed AttributeError exceptions in /status and /positions commands:
- Replaced invalid risk.calculate_pnl() with inline P&L calculation from balance dict
- Changed risk.circuit_breaker_threshold to risk._cb_threshold
- Replaced balance.stocks access with account summary from output2 dict
- Updated tests to match new account summary format

All 27 telegram command tests pass. Live bot testing confirms no errors.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 18:54:42 +09:00
d2b07326ed Merge pull request 'fix: remove /start command and handle @botname suffix (issue #71)' (#72) from fix/start-command-parsing into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #72
2026-02-05 17:15:14 +09:00
agentson
1c5eadc23b fix: remove /start command and handle @botname suffix
Some checks failed
CI / test (pull_request) Has been cancelled
Remove /start command as name doesn't match functionality, and fix
command parsing to handle @botname suffix for group chat compatibility.

Changes:
- Remove handle_start function and registration
- Remove /start from help command list
- Remove test_start_command_content test
- Strip @botname suffix from commands (e.g., /help@mybot → help)

Rationale:
- /start command name implies bot initialization, but it was just
  showing help text (duplicate of /help)
- Better to have one clear /help command
- @botname suffix handling needed for group chats

Test:
- 27 tests pass (1 removed, 1 added for @botname handling)
- All existing functionality preserved

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 15:59:07 +09:00
10ff718045 Merge pull request 'feat: add configuration and documentation for Telegram commands (issue #69)' (#70) from feature/issue-69-config-docs into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #70
2026-02-05 15:50:52 +09:00
14 changed files with 1116 additions and 178 deletions

View File

@@ -45,6 +45,39 @@ Get real-time alerts for trades, circuit breakers, and system events via Telegra
**Fail-safe**: Notifications never crash the trading system. Missing credentials or API errors are logged but trading continues normally.
## Smart Volatility Scanner (Optional)
Python-first filtering pipeline that reduces Gemini API calls by pre-filtering stocks using technical indicators.
### How It Works
1. **Fetch Rankings** — KIS API volume surge rankings (top 30 stocks)
2. **Python Filter** — RSI + volume ratio calculations (no AI)
- Volume > 200% of previous day
- RSI(14) < 30 (oversold) OR RSI(14) > 70 (momentum)
3. **AI Judgment** — Only qualified candidates (1-3 stocks) sent to Gemini
### Configuration
Add to `.env` (optional, has sensible defaults):
```bash
RSI_OVERSOLD_THRESHOLD=30 # 0-50, default 30
RSI_MOMENTUM_THRESHOLD=70 # 50-100, default 70
VOL_MULTIPLIER=2.0 # Volume threshold (2.0 = 200%)
SCANNER_TOP_N=3 # Max candidates per scan
```
### Benefits
- **Reduces API costs** — Process 1-3 stocks instead of 20-30
- **Python-based filtering** — Fast technical analysis before AI
- **Evolution-ready** — Selection context logged for strategy optimization
- **Fault-tolerant** — Falls back to static watchlist on API failure
### Realtime Mode Only
Smart Scanner runs in `TRADE_MODE=realtime` only. Daily mode uses static watchlists for batch efficiency.
## Documentation
- **[Workflow Guide](docs/workflow.md)** — Git workflow policy and agent-based development
@@ -75,6 +108,7 @@ User requirements and feedback are tracked in [docs/requirements-log.md](docs/re
```
src/
├── analysis/ # Technical analysis (RSI, volatility, smart scanner)
├── broker/ # KIS API client (domestic + overseas)
├── brain/ # Gemini AI decision engine
├── core/ # Risk manager (READ-ONLY)
@@ -85,7 +119,7 @@ src/
├── main.py # Trading loop orchestrator
└── config.py # Settings (from .env)
tests/ # 273 tests across 13 files
tests/ # 343 tests across 14 files
docs/ # Extended documentation
```

View File

@@ -64,7 +64,39 @@ High-frequency trading with individual stock analysis:
- `get_open_markets()` returns currently active markets
- `get_next_market_open()` finds next market to open and when
### 2. Brain (`src/brain/gemini_client.py`)
**New API Methods** (added in v0.9.0):
- `fetch_market_rankings()` — Fetch volume surge rankings from KIS API
- `get_daily_prices()` — Fetch OHLCV history for technical analysis
### 2. Analysis (`src/analysis/`)
**VolatilityAnalyzer** (`volatility.py`) — Technical indicator calculations
- ATR (Average True Range) for volatility measurement
- RSI (Relative Strength Index) using Wilder's smoothing method
- Price change percentages across multiple timeframes
- Volume surge ratios and price-volume divergence
- Momentum scoring (0-100 scale)
- Breakout/breakdown pattern detection
**SmartVolatilityScanner** (`smart_scanner.py`) — Python-first filtering pipeline
- **Step 1**: Fetch volume rankings from KIS API (top 30 stocks)
- **Step 2**: Calculate RSI and volume ratio for each stock
- **Step 3**: Apply filters:
- Volume ratio >= `VOL_MULTIPLIER` (default 2.0x previous day)
- RSI < `RSI_OVERSOLD_THRESHOLD` (30) OR RSI > `RSI_MOMENTUM_THRESHOLD` (70)
- **Step 4**: Score candidates by RSI extremity (60%) + volume surge (40%)
- **Step 5**: Return top N candidates (default 3) for AI analysis
- **Fallback**: Uses static watchlist if ranking API unavailable
- **Realtime mode only**: Daily mode uses batch processing for API efficiency
**Benefits:**
- Reduces Gemini API calls from 20-30 stocks to 1-3 qualified candidates
- Fast Python-based filtering before expensive AI judgment
- Logs selection context (RSI, volume_ratio, signal, score) for Evolution system
### 3. Brain (`src/brain/gemini_client.py`)
**GeminiClient** — AI decision engine powered by Google Gemini
@@ -74,7 +106,7 @@ High-frequency trading with individual stock analysis:
- Falls back to safe HOLD on any parse/API error
- Handles markdown-wrapped JSON, malformed responses, invalid actions
### 3. Risk Manager (`src/core/risk_manager.py`)
### 4. Risk Manager (`src/core/risk_manager.py`)
**RiskManager** — Safety circuit breaker and order validation
@@ -86,7 +118,7 @@ High-frequency trading with individual stock analysis:
- **Fat-Finger Protection**: Rejects orders exceeding 30% of available cash
- Must always be enforced, cannot be disabled
### 4. Notifications (`src/notifications/telegram_client.py`)
### 5. Notifications (`src/notifications/telegram_client.py`)
**TelegramClient** — Real-time event notifications via Telegram Bot API
@@ -105,7 +137,7 @@ High-frequency trading with individual stock analysis:
**Setup:** See [src/notifications/README.md](../src/notifications/README.md) for bot creation and configuration.
### 5. Evolution (`src/evolution/optimizer.py`)
### 6. Evolution (`src/evolution/optimizer.py`)
**StrategyOptimizer** — Self-improvement loop
@@ -117,9 +149,11 @@ High-frequency trading with individual stock analysis:
## Data Flow
### Realtime Mode (with Smart Scanner)
```
┌─────────────────────────────────────────────────────────────┐
│ Main Loop (60s cycle per stock, per market)
│ Main Loop (60s cycle per market)
└─────────────────────────────────────────────────────────────┘
@@ -132,6 +166,21 @@ High-frequency trading with individual stock analysis:
┌──────────────────────────────────┐
│ Smart Scanner (Python-first) │
│ - Fetch volume rankings (KIS) │
│ - Get 20d price history per stock│
│ - Calculate RSI(14) + vol ratio │
│ - Filter: vol>2x AND RSI extreme │
│ - Return top 3 qualified stocks │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ For Each Qualified Candidate │
└──────────────────┬────────────────┘
┌──────────────────────────────────┐
│ Broker: Fetch Market Data │
│ - Domestic: orderbook + balance │
│ - Overseas: price + balance │
@@ -145,7 +194,7 @@ High-frequency trading with individual stock analysis:
┌──────────────────────────────────┐
│ Brain: Get Decision
│ Brain: Get Decision (AI)
│ - Build prompt with market data │
│ - Call Gemini API │
│ - Parse JSON response │
@@ -181,6 +230,9 @@ High-frequency trading with individual stock analysis:
│ - SQLite (data/trades.db) │
│ - Track: action, confidence, │
│ rationale, market, exchange │
│ - NEW: selection_context (JSON) │
│ - RSI, volume_ratio, signal │
│ - For Evolution optimization │
└───────────────────────────────────┘
```
@@ -200,11 +252,24 @@ CREATE TABLE trades (
price REAL,
pnl REAL DEFAULT 0.0,
market TEXT DEFAULT 'KR', -- KR | US_NASDAQ | JP | etc.
exchange_code TEXT DEFAULT 'KRX' -- KRX | NASD | NYSE | etc.
exchange_code TEXT DEFAULT 'KRX', -- KRX | NASD | NYSE | etc.
selection_context TEXT -- JSON: {rsi, volume_ratio, signal, score}
);
```
Auto-migration: Adds `market` and `exchange_code` columns if missing for backward compatibility.
**Selection Context** (new in v0.9.0): Stores scanner selection criteria as JSON:
```json
{
"rsi": 28.5,
"volume_ratio": 2.7,
"signal": "oversold",
"score": 85.2
}
```
Enables Evolution system to analyze correlation between selection criteria and trade outcomes.
Auto-migration: Adds `market`, `exchange_code`, and `selection_context` columns if missing for backward compatibility.
## Configuration
@@ -236,6 +301,12 @@ SESSION_INTERVAL_HOURS=6 # Hours between sessions (daily mode only)
TELEGRAM_BOT_TOKEN=1234567890:ABCdefGHIjklMNOpqrsTUVwxyz
TELEGRAM_CHAT_ID=123456789
TELEGRAM_ENABLED=true
# Smart Scanner (optional, realtime mode only)
RSI_OVERSOLD_THRESHOLD=30 # 0-50, oversold threshold
RSI_MOMENTUM_THRESHOLD=70 # 50-100, momentum threshold
VOL_MULTIPLIER=2.0 # Minimum volume ratio (2.0 = 200%)
SCANNER_TOP_N=3 # Max qualified candidates per scan
```
Tests use in-memory SQLite (`DB_PATH=":memory:"`) and dummy credentials via `tests/conftest.py`.

View File

@@ -26,3 +26,41 @@
### 문서화
- 시스템 구조, 기능별 설명 등 코드 문서화 항상 신경쓸 것
- 새로운 기능 추가 시 관련 문서 업데이트 필수
---
## 2026-02-06
### Smart Volatility Scanner (Python-First, AI-Last 파이프라인)
**배경:**
- 정적 종목 리스트를 순회하는 방식은 비효율적
- KIS API 거래량 순위를 통해 시장 주도주를 자동 탐지해야 함
- Gemini API 호출 전에 Python 기반 기술적 분석으로 필터링 필요
**요구사항:**
1. KIS API 거래량 순위 API 통합 (`fetch_market_rankings`)
2. 일별 가격 히스토리 API 추가 (`get_daily_prices`)
3. RSI(14) 계산 기능 구현 (Wilder's smoothing method)
4. 필터 조건:
- 거래량 > 전일 대비 200% (VOL_MULTIPLIER)
- RSI < 30 (과매도) OR RSI > 70 (모멘텀)
5. 상위 1-3개 적격 종목만 Gemini에 전달
6. 종목 선정 배경(RSI, volume_ratio, signal, score) 데이터베이스 기록
**구현 결과:**
- `src/analysis/smart_scanner.py`: SmartVolatilityScanner 클래스
- `src/analysis/volatility.py`: calculate_rsi() 메서드 추가
- `src/broker/kis_api.py`: 2개 신규 API 메서드
- `src/db.py`: selection_context 컬럼 추가
- 설정 가능한 임계값: RSI_OVERSOLD_THRESHOLD, RSI_MOMENTUM_THRESHOLD, VOL_MULTIPLIER, SCANNER_TOP_N
**효과:**
- Gemini API 호출 20-30개 → 1-3개로 감소
- Python 기반 빠른 필터링 → 비용 절감
- 선정 기준 추적 → Evolution 시스템 최적화 가능
- API 장애 시 정적 watchlist로 자동 전환
**참고:** Realtime 모드 전용. Daily 모드는 배치 효율성을 위해 정적 watchlist 사용.
**이슈/PR:** #76, #77

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from src.analysis.scanner import MarketScanner
from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner
from src.analysis.volatility import VolatilityAnalyzer
__all__ = ["VolatilityAnalyzer", "MarketScanner"]
__all__ = ["VolatilityAnalyzer", "MarketScanner", "SmartVolatilityScanner", "ScanCandidate"]

View File

@@ -0,0 +1,192 @@
"""Smart Volatility Scanner with RSI and volume filters.
Fetches market rankings from KIS API and applies technical filters
to identify high-probability trading candidates.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
from src.analysis.volatility import VolatilityAnalyzer
from src.broker.kis_api import KISBroker
from src.config import Settings
logger = logging.getLogger(__name__)
@dataclass
class ScanCandidate:
"""A qualified candidate from the smart scanner."""
stock_code: str
name: str
price: float
volume: float
volume_ratio: float # Current volume / previous day volume
rsi: float
signal: str # "oversold" or "momentum"
score: float # Composite score for ranking
class SmartVolatilityScanner:
"""Scans market rankings and applies RSI/volume filters.
Flow:
1. Fetch volume rankings from KIS API
2. For each ranked stock, fetch daily prices
3. Calculate RSI and volume ratio
4. Apply filters: volume > VOL_MULTIPLIER AND (RSI < 30 OR RSI > 70)
5. Return top N qualified candidates
"""
def __init__(
self,
broker: KISBroker,
volatility_analyzer: VolatilityAnalyzer,
settings: Settings,
) -> None:
"""Initialize the smart scanner.
Args:
broker: KIS broker for API calls
volatility_analyzer: Analyzer for RSI calculation
settings: Application settings
"""
self.broker = broker
self.analyzer = volatility_analyzer
self.settings = settings
# Extract scanner settings
self.rsi_oversold = settings.RSI_OVERSOLD_THRESHOLD
self.rsi_momentum = settings.RSI_MOMENTUM_THRESHOLD
self.vol_multiplier = settings.VOL_MULTIPLIER
self.top_n = settings.SCANNER_TOP_N
async def scan(
self,
fallback_stocks: list[str] | None = None,
) -> list[ScanCandidate]:
"""Execute smart scan and return qualified candidates.
Args:
fallback_stocks: Stock codes to use if ranking API fails
Returns:
List of ScanCandidate, sorted by score, up to top_n items
"""
# Step 1: Fetch rankings
try:
rankings = await self.broker.fetch_market_rankings(
ranking_type="volume",
limit=30, # Fetch more than needed for filtering
)
logger.info("Fetched %d stocks from volume rankings", len(rankings))
except ConnectionError as exc:
logger.warning("Ranking API failed, using fallback: %s", exc)
if fallback_stocks:
# Create minimal ranking data for fallback
rankings = [
{
"stock_code": code,
"name": code,
"price": 0,
"volume": 0,
"change_rate": 0,
"volume_increase_rate": 0,
}
for code in fallback_stocks
]
else:
return []
# Step 2: Analyze each stock
candidates: list[ScanCandidate] = []
for stock in rankings:
stock_code = stock["stock_code"]
if not stock_code:
continue
try:
# Fetch daily prices for RSI calculation
daily_prices = await self.broker.get_daily_prices(stock_code, days=20)
if len(daily_prices) < 15: # Need at least 14+1 for RSI
logger.debug("Insufficient price history for %s", stock_code)
continue
# Calculate RSI
close_prices = [p["close"] for p in daily_prices]
rsi = self.analyzer.calculate_rsi(close_prices, period=14)
# Calculate volume ratio (today vs previous day avg)
if len(daily_prices) >= 2:
prev_day_volume = daily_prices[-2]["volume"]
current_volume = stock.get("volume", 0) or daily_prices[-1]["volume"]
volume_ratio = (
current_volume / prev_day_volume if prev_day_volume > 0 else 1.0
)
else:
volume_ratio = stock.get("volume_increase_rate", 0) / 100 + 1 # Fallback
# Apply filters
volume_qualified = volume_ratio >= self.vol_multiplier
rsi_oversold = rsi < self.rsi_oversold
rsi_momentum = rsi > self.rsi_momentum
if volume_qualified and (rsi_oversold or rsi_momentum):
signal = "oversold" if rsi_oversold else "momentum"
# Calculate composite score
# Higher score for: extreme RSI + high volume
rsi_extremity = abs(rsi - 50) / 50 # 0-1 scale
volume_score = min(volume_ratio / 5, 1.0) # Cap at 5x
score = (rsi_extremity * 0.6 + volume_score * 0.4) * 100
candidates.append(
ScanCandidate(
stock_code=stock_code,
name=stock.get("name", stock_code),
price=stock.get("price", daily_prices[-1]["close"]),
volume=current_volume,
volume_ratio=volume_ratio,
rsi=rsi,
signal=signal,
score=score,
)
)
logger.info(
"Qualified: %s (%s) RSI=%.1f vol=%.1fx signal=%s score=%.1f",
stock_code,
stock.get("name", ""),
rsi,
volume_ratio,
signal,
score,
)
except ConnectionError as exc:
logger.warning("Failed to analyze %s: %s", stock_code, exc)
continue
except Exception as exc:
logger.error("Unexpected error analyzing %s: %s", stock_code, exc)
continue
# Sort by score and return top N
candidates.sort(key=lambda c: c.score, reverse=True)
return candidates[: self.top_n]
def get_stock_codes(self, candidates: list[ScanCandidate]) -> list[str]:
"""Extract stock codes from candidates for watchlist update.
Args:
candidates: List of scan candidates
Returns:
List of stock codes
"""
return [c.stock_code for c in candidates]

View File

@@ -124,6 +124,54 @@ class VolatilityAnalyzer:
return 1.0
return current_volume / avg_volume
def calculate_rsi(
self,
close_prices: list[float],
period: int = 14,
) -> float:
"""Calculate Relative Strength Index (RSI) using Wilder's smoothing.
Args:
close_prices: List of closing prices (oldest to newest, minimum period+1 values)
period: RSI period (default 14)
Returns:
RSI value between 0 and 100, or 50.0 (neutral) if insufficient data
Examples:
>>> analyzer = VolatilityAnalyzer()
>>> prices = [100 - i * 0.5 for i in range(20)] # Downtrend
>>> rsi = analyzer.calculate_rsi(prices)
>>> assert rsi < 50 # Oversold territory
"""
if len(close_prices) < period + 1:
return 50.0 # Neutral RSI if insufficient data
# Calculate price changes
changes = [close_prices[i] - close_prices[i - 1] for i in range(1, len(close_prices))]
# Separate gains and losses
gains = [max(0.0, change) for change in changes]
losses = [max(0.0, -change) for change in changes]
# Calculate initial average gain/loss (simple average for first period)
avg_gain = sum(gains[:period]) / period
avg_loss = sum(losses[:period]) / period
# Apply Wilder's smoothing for remaining periods
for i in range(period, len(changes)):
avg_gain = (avg_gain * (period - 1) + gains[i]) / period
avg_loss = (avg_loss * (period - 1) + losses[i]) / period
# Calculate RS and RSI
if avg_loss == 0:
return 100.0 # All gains, maximum RSI
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
def calculate_pv_divergence(
self,
price_change: float,

View File

@@ -280,3 +280,153 @@ class KISBroker:
return data
except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError(f"Network error sending order: {exc}") from exc
async def fetch_market_rankings(
self,
ranking_type: str = "volume",
limit: int = 30,
) -> list[dict[str, Any]]:
"""Fetch market rankings from KIS API.
Args:
ranking_type: Type of ranking ("volume" or "fluctuation")
limit: Maximum number of results to return
Returns:
List of stock data dicts with keys: stock_code, name, price, volume,
change_rate, volume_increase_rate
Raises:
ConnectionError: If API request fails
"""
await self._rate_limiter.acquire()
session = self._get_session()
# TR_ID for volume ranking
tr_id = "FHPST01710000" if ranking_type == "volume" else "FHPST01710100"
headers = await self._auth_headers(tr_id)
params = {
"FID_COND_MRKT_DIV_CODE": "J", # Stock/ETF/ETN
"FID_COND_SCR_DIV_CODE": "20001", # Volume surge
"FID_INPUT_ISCD": "0000", # All stocks
"FID_DIV_CLS_CODE": "0", # All types
"FID_BLNG_CLS_CODE": "0",
"FID_TRGT_CLS_CODE": "111111111",
"FID_TRGT_EXLS_CLS_CODE": "000000",
"FID_INPUT_PRICE_1": "0",
"FID_INPUT_PRICE_2": "0",
"FID_VOL_CNT": "0",
"FID_INPUT_DATE_1": "",
}
url = f"{self._base_url}/uapi/domestic-stock/v1/quotations/volume-rank"
try:
async with session.get(url, headers=headers, params=params) as resp:
if resp.status != 200:
text = await resp.text()
raise ConnectionError(
f"fetch_market_rankings failed ({resp.status}): {text}"
)
data = await resp.json()
# Parse response - output is a list of ranked stocks
def _safe_float(value: str | float | None, default: float = 0.0) -> float:
if value is None or value == "":
return default
try:
return float(value)
except (ValueError, TypeError):
return default
rankings = []
for item in data.get("output", [])[:limit]:
rankings.append({
"stock_code": item.get("mksc_shrn_iscd", ""),
"name": item.get("hts_kor_isnm", ""),
"price": _safe_float(item.get("stck_prpr", "0")),
"volume": _safe_float(item.get("acml_vol", "0")),
"change_rate": _safe_float(item.get("prdy_ctrt", "0")),
"volume_increase_rate": _safe_float(item.get("vol_inrt", "0")),
})
return rankings
except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError(f"Network error fetching rankings: {exc}") from exc
async def get_daily_prices(
self,
stock_code: str,
days: int = 20,
) -> list[dict[str, Any]]:
"""Fetch daily OHLCV price history for a stock.
Args:
stock_code: 6-digit stock code
days: Number of trading days to fetch (default 20 for RSI calculation)
Returns:
List of daily price dicts with keys: date, open, high, low, close, volume
Sorted oldest to newest
Raises:
ConnectionError: If API request fails
"""
await self._rate_limiter.acquire()
session = self._get_session()
headers = await self._auth_headers("FHKST03010100")
# Calculate date range (today and N days ago)
from datetime import datetime, timedelta
end_date = datetime.now().strftime("%Y%m%d")
start_date = (datetime.now() - timedelta(days=days + 10)).strftime("%Y%m%d")
params = {
"FID_COND_MRKT_DIV_CODE": "J",
"FID_INPUT_ISCD": stock_code,
"FID_INPUT_DATE_1": start_date,
"FID_INPUT_DATE_2": end_date,
"FID_PERIOD_DIV_CODE": "D", # Daily
"FID_ORG_ADJ_PRC": "0", # Adjusted price
}
url = f"{self._base_url}/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice"
try:
async with session.get(url, headers=headers, params=params) as resp:
if resp.status != 200:
text = await resp.text()
raise ConnectionError(
f"get_daily_prices failed ({resp.status}): {text}"
)
data = await resp.json()
# Parse response
def _safe_float(value: str | float | None, default: float = 0.0) -> float:
if value is None or value == "":
return default
try:
return float(value)
except (ValueError, TypeError):
return default
prices = []
for item in data.get("output2", []):
prices.append({
"date": item.get("stck_bsop_date", ""),
"open": _safe_float(item.get("stck_oprc", "0")),
"high": _safe_float(item.get("stck_hgpr", "0")),
"low": _safe_float(item.get("stck_lwpr", "0")),
"close": _safe_float(item.get("stck_clpr", "0")),
"volume": _safe_float(item.get("acml_vol", "0")),
})
# Sort oldest to newest (KIS returns newest first)
prices.reverse()
return prices[:days] # Return only requested number of days
except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError(f"Network error fetching daily prices: {exc}") from exc

View File

@@ -33,6 +33,12 @@ class Settings(BaseSettings):
FAT_FINGER_PCT: float = Field(default=30.0, gt=0.0, le=100.0)
CONFIDENCE_THRESHOLD: int = Field(default=80, ge=0, le=100)
# Smart Scanner Configuration
RSI_OVERSOLD_THRESHOLD: int = Field(default=30, ge=0, le=50)
RSI_MOMENTUM_THRESHOLD: int = Field(default=70, ge=50, le=100)
VOL_MULTIPLIER: float = Field(default=2.0, gt=1.0, le=10.0)
SCANNER_TOP_N: int = Field(default=3, ge=1, le=10)
# Database
DB_PATH: str = "data/trade_logs.db"
@@ -49,8 +55,15 @@ class Settings(BaseSettings):
DAILY_SESSIONS: int = Field(default=4, ge=1, le=10)
SESSION_INTERVAL_HOURS: int = Field(default=6, ge=1, le=24)
# Pre-Market Planner
PRE_MARKET_MINUTES: int = Field(default=30, ge=10, le=120)
MAX_SCENARIOS_PER_STOCK: int = Field(default=5, ge=1, le=10)
PLANNER_TIMEOUT_SECONDS: int = Field(default=60, ge=10, le=300)
DEFENSIVE_PLAYBOOK_ON_FAILURE: bool = True
RESCAN_INTERVAL_SECONDS: int = Field(default=300, ge=60, le=900)
# Market selection (comma-separated market codes)
ENABLED_MARKETS: str = "KR"
ENABLED_MARKETS: str = "KR,US"
# Backup and Disaster Recovery (optional)
BACKUP_ENABLED: bool = True

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import json
import sqlite3
from datetime import UTC, datetime
from pathlib import Path
@@ -38,6 +39,8 @@ def init_db(db_path: str) -> sqlite3.Connection:
conn.execute("ALTER TABLE trades ADD COLUMN market TEXT DEFAULT 'KR'")
if "exchange_code" not in columns:
conn.execute("ALTER TABLE trades ADD COLUMN exchange_code TEXT DEFAULT 'KRX'")
if "selection_context" not in columns:
conn.execute("ALTER TABLE trades ADD COLUMN selection_context TEXT")
# Context tree tables for multi-layered memory management
conn.execute(
@@ -118,15 +121,33 @@ def log_trade(
pnl: float = 0.0,
market: str = "KR",
exchange_code: str = "KRX",
selection_context: dict[str, any] | None = None,
) -> None:
"""Insert a trade record into the database."""
"""Insert a trade record into the database.
Args:
conn: Database connection
stock_code: Stock code
action: Trade action (BUY/SELL/HOLD)
confidence: Confidence level (0-100)
rationale: AI decision rationale
quantity: Number of shares
price: Trade price
pnl: Profit/loss
market: Market code
exchange_code: Exchange code
selection_context: Scanner selection data (RSI, volume_ratio, signal, score)
"""
# Serialize selection context to JSON
context_json = json.dumps(selection_context) if selection_context else None
conn.execute(
"""
INSERT INTO trades (
timestamp, stock_code, action, confidence, rationale,
quantity, price, pnl, market, exchange_code
quantity, price, pnl, market, exchange_code, selection_context
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
datetime.now(UTC).isoformat(),
@@ -139,6 +160,7 @@ def log_trade(
pnl,
market,
exchange_code,
context_json,
),
)
conn.commit()

View File

@@ -15,6 +15,7 @@ from datetime import UTC, datetime
from typing import Any
from src.analysis.scanner import MarketScanner
from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner
from src.analysis.volatility import VolatilityAnalyzer
from src.brain.gemini_client import GeminiClient
from src.broker.kis_api import KISBroker
@@ -62,14 +63,6 @@ def safe_float(value: str | float | None, default: float = 0.0) -> float:
return default
# Target stock codes to monitor per market
WATCHLISTS = {
"KR": ["005930", "000660", "035420"], # Samsung, SK Hynix, NAVER
"US_NASDAQ": ["AAPL", "MSFT", "GOOGL"], # Example US stocks
"US_NYSE": ["JPM", "BAC"], # Example NYSE stocks
"JP": ["7203", "6758"], # Toyota, Sony
}
TRADE_INTERVAL_SECONDS = 60
SCAN_INTERVAL_SECONDS = 60 # Scan markets every 60 seconds
MAX_CONNECTION_RETRIES = 3
@@ -78,15 +71,6 @@ MAX_CONNECTION_RETRIES = 3
DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day
TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions
# Full stock universe per market (for scanning)
# In production, this would be loaded from a database or API
STOCK_UNIVERSE = {
"KR": ["005930", "000660", "035420", "051910", "005380", "005490"],
"US_NASDAQ": ["AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "TSLA"],
"US_NYSE": ["JPM", "BAC", "XOM", "JNJ", "V"],
"JP": ["7203", "6758", "9984", "6861"],
}
async def trading_cycle(
broker: KISBroker,
@@ -100,6 +84,7 @@ async def trading_cycle(
telegram: TelegramClient,
market: MarketInfo,
stock_code: str,
scan_candidates: dict[str, ScanCandidate],
) -> None:
"""Execute one trading cycle for a single stock."""
cycle_start_time = asyncio.get_event_loop().time()
@@ -292,7 +277,17 @@ async def trading_cycle(
except Exception as exc:
logger.warning("Telegram notification failed: %s", exc)
# 6. Log trade
# 6. Log trade with selection context
selection_context = None
if stock_code in scan_candidates:
candidate = scan_candidates[stock_code]
selection_context = {
"rsi": candidate.rsi,
"volume_ratio": candidate.volume_ratio,
"signal": candidate.signal,
"score": candidate.score,
}
log_trade(
conn=db_conn,
stock_code=stock_code,
@@ -301,6 +296,7 @@ async def trading_cycle(
rationale=decision.rationale,
market=market.code,
exchange_code=market.exchange_code,
selection_context=selection_context,
)
# 7. Latency monitoring
@@ -336,6 +332,7 @@ async def run_daily_session(
criticality_assessor: CriticalityAssessor,
telegram: TelegramClient,
settings: Settings,
smart_scanner: SmartVolatilityScanner | None = None,
) -> None:
"""Execute one daily trading session.
@@ -355,15 +352,21 @@ async def run_daily_session(
# Process each open market
for market in open_markets:
# Get watchlist for this market
watchlist = WATCHLISTS.get(market.code, [])
# Dynamic stock discovery via scanner (no static watchlists)
try:
candidates = await smart_scanner.scan()
watchlist = [c.stock_code for c in candidates] if candidates else []
except Exception as exc:
logger.error("Smart Scanner failed for %s: %s", market.name, exc)
watchlist = []
if not watchlist:
logger.debug("No watchlist for market %s", market.code)
logger.info("No scanner candidates for market %s — skipping", market.code)
continue
logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist))
# Collect market data for all stocks in the watchlist
# Collect market data for all stocks from scanner
stocks_data = []
for stock_code in watchlist:
try:
@@ -579,25 +582,10 @@ async def run(settings: Settings) -> None:
command_handler = TelegramCommandHandler(telegram)
# Register basic commands
async def handle_start() -> None:
"""Handle /start command."""
message = (
"<b>🤖 The Ouroboros Trading Bot</b>\n\n"
"AI-powered global stock trading agent with real-time notifications.\n\n"
"<b>Available commands:</b>\n"
"/help - Show this help message\n"
"/status - Current trading status\n"
"/positions - View holdings\n"
"/stop - Pause trading\n"
"/resume - Resume trading"
)
await telegram.send_message(message)
async def handle_help() -> None:
"""Handle /help command."""
message = (
"<b>📖 Available Commands</b>\n\n"
"/start - Welcome message\n"
"/help - Show available commands\n"
"/status - Trading status (mode, markets, P&L)\n"
"/positions - Current holdings\n"
@@ -639,11 +627,21 @@ async def run(settings: Settings) -> None:
# Get trading status
trading_status = "Active" if pause_trading.is_set() else "Paused"
# Get current P&L from risk manager
# Calculate P&L from balance data
try:
balance = await broker.get_balance()
current_pnl = risk.calculate_pnl(balance)
output2 = balance.get("output2", [{}])
if output2:
total_eval = safe_float(output2[0].get("tot_evlu_amt", "0"))
purchase_total = safe_float(output2[0].get("pchs_amt_smtl_amt", "0"))
current_pnl = (
((total_eval - purchase_total) / purchase_total * 100)
if purchase_total > 0
else 0.0
)
pnl_str = f"{current_pnl:+.2f}%"
else:
pnl_str = "N/A"
except Exception as exc:
logger.warning("Failed to get P&L: %s", exc)
pnl_str = "N/A"
@@ -657,7 +655,7 @@ async def run(settings: Settings) -> None:
f"<b>Markets:</b> {markets_str}\n"
f"<b>Trading:</b> {trading_status}\n\n"
f"<b>Current P&L:</b> {pnl_str}\n"
f"<b>Circuit Breaker:</b> {risk.circuit_breaker_threshold:.1f}%"
f"<b>Circuit Breaker:</b> {risk._cb_threshold:.1f}%"
)
await telegram.send_message(message)
@@ -668,52 +666,40 @@ async def run(settings: Settings) -> None:
)
async def handle_positions() -> None:
"""Handle /positions command - show current holdings."""
"""Handle /positions command - show account summary."""
try:
# Get account balance
balance = await broker.get_balance()
output2 = balance.get("output2", [{}])
# Check if there are any positions
if not balance.stocks:
if not output2:
await telegram.send_message(
"<b>💼 Current Holdings</b>\n\n"
"No positions currently held."
"<b>💼 Account Summary</b>\n\n"
"No balance information available."
)
return
# Group positions by market (domestic vs overseas)
domestic_positions = []
overseas_positions = []
# Extract account-level data
total_eval = safe_float(output2[0].get("tot_evlu_amt", "0"))
total_cash = safe_float(output2[0].get("dnca_tot_amt", "0"))
purchase_total = safe_float(output2[0].get("pchs_amt_smtl_amt", "0"))
for stock in balance.stocks:
position_str = (
f"{stock.code}: {stock.quantity} shares @ "
f"{stock.avg_price:,.0f}"
# Calculate P&L
pnl_pct = (
((total_eval - purchase_total) / purchase_total * 100)
if purchase_total > 0
else 0.0
)
pnl_sign = "+" if pnl_pct >= 0 else ""
# Simple heuristic: if code is 6 digits, it's domestic (Korea)
if len(stock.code) == 6 and stock.code.isdigit():
domestic_positions.append(position_str)
else:
overseas_positions.append(position_str)
# Build message
message_parts = ["<b>💼 Current Holdings</b>\n"]
if domestic_positions:
message_parts.append("\n🇰🇷 <b>Korea</b>")
message_parts.extend(domestic_positions)
if overseas_positions:
message_parts.append("\n🇺🇸 <b>Overseas</b>")
message_parts.extend(overseas_positions)
# Add total cash
message_parts.append(
f"\n<b>Cash:</b> ₩{balance.total_cash:,.0f}"
message = (
"<b>💼 Account Summary</b>\n\n"
f"<b>Total Evaluation:</b> ₩{total_eval:,.0f}\n"
f"<b>Available Cash:</b> ₩{total_cash:,.0f}\n"
f"<b>Purchase Total:</b> ₩{purchase_total:,.0f}\n"
f"<b>P&L:</b> {pnl_sign}{pnl_pct:.2f}%\n\n"
"<i>Note: Individual position details require API enhancement</i>"
)
message = "\n".join(message_parts)
await telegram.send_message(message)
except Exception as exc:
@@ -722,7 +708,6 @@ async def run(settings: Settings) -> None:
"<b>⚠️ Error</b>\n\nFailed to retrieve positions."
)
command_handler.register_command("start", handle_start)
command_handler.register_command("help", handle_help)
command_handler.register_command("stop", handle_stop)
command_handler.register_command("resume", handle_resume)
@@ -740,6 +725,19 @@ async def run(settings: Settings) -> None:
max_concurrent_scans=1, # Fully serialized to avoid EGW00201
)
# Initialize smart scanner (Python-first, AI-last pipeline)
smart_scanner = SmartVolatilityScanner(
broker=broker,
volatility_analyzer=volatility_analyzer,
settings=settings,
)
# Track scan candidates for selection context logging
scan_candidates: dict[str, ScanCandidate] = {} # stock_code -> candidate
# Active stocks per market (dynamically discovered by scanner)
active_stocks: dict[str, list[str]] = {} # market_code -> [stock_codes]
# Initialize latency control system
criticality_assessor = CriticalityAssessor(
critical_pnl_threshold=-2.5, # Near circuit breaker at -3.0%
@@ -812,6 +810,7 @@ async def run(settings: Settings) -> None:
criticality_assessor,
telegram,
settings,
smart_scanner=smart_scanner,
)
except CircuitBreakerTripped:
logger.critical("Circuit breaker tripped — shutting down")
@@ -885,49 +884,52 @@ async def run(settings: Settings) -> None:
logger.warning("Market open notification failed: %s", exc)
_market_states[market.code] = True
# Volatility Hunter: Scan market periodically to update watchlist
# Smart Scanner: dynamic stock discovery (no static watchlists)
now_timestamp = asyncio.get_event_loop().time()
last_scan = last_scan_time.get(market.code, 0.0)
if now_timestamp - last_scan >= SCAN_INTERVAL_SECONDS:
try:
# Scan all stocks in the universe
stock_universe = STOCK_UNIVERSE.get(market.code, [])
if stock_universe:
logger.info("Volatility Hunter: Scanning %s market", market.name)
scan_result = await market_scanner.scan_market(
market, stock_universe
logger.info("Smart Scanner: Scanning %s market", market.name)
candidates = await smart_scanner.scan()
if candidates:
# Use scanner results directly as trading candidates
active_stocks[market.code] = smart_scanner.get_stock_codes(
candidates
)
# Update watchlist with top movers
current_watchlist = WATCHLISTS.get(market.code, [])
updated_watchlist = market_scanner.get_updated_watchlist(
current_watchlist,
scan_result,
max_replacements=2,
)
WATCHLISTS[market.code] = updated_watchlist
# Store candidates for selection context logging
for candidate in candidates:
scan_candidates[candidate.stock_code] = candidate
logger.info(
"Volatility Hunter: Watchlist updated for %s (%d top movers, %d breakouts)",
"Smart Scanner: Found %d candidates for %s: %s",
len(candidates),
market.name,
len(scan_result.top_movers),
len(scan_result.breakouts),
[f"{c.stock_code}(RSI={c.rsi:.0f})" for c in candidates],
)
else:
logger.info(
"Smart Scanner: No candidates for %s — no trades", market.name
)
active_stocks[market.code] = []
last_scan_time[market.code] = now_timestamp
except Exception as exc:
logger.error("Volatility Hunter scan failed for %s: %s", market.name, exc)
# Get watchlist for this market
watchlist = WATCHLISTS.get(market.code, [])
if not watchlist:
logger.debug("No watchlist for market %s", market.code)
except Exception as exc:
logger.error("Smart Scanner failed for %s: %s", market.name, exc)
# Get active stocks from scanner (dynamic, no static fallback)
stock_codes = active_stocks.get(market.code, [])
if not stock_codes:
logger.debug("No active stocks for market %s", market.code)
continue
logger.info("Processing market: %s (%d stocks)", market.name, len(watchlist))
logger.info("Processing market: %s (%d stocks)", market.name, len(stock_codes))
# Process each stock in the watchlist
for stock_code in watchlist:
# Process each stock from scanner results
for stock_code in stock_codes:
if shutdown.is_set():
break
@@ -946,6 +948,7 @@ async def run(settings: Settings) -> None:
telegram,
market,
stock_code,
scan_candidates,
)
break # Success — exit retry loop
except CircuitBreakerTripped as exc:

View File

@@ -492,7 +492,8 @@ class TelegramCommandHandler:
if not command_parts:
return
command_name = command_parts[0]
# Remove @botname suffix if present (for group chats)
command_name = command_parts[0].split("@")[0]
# Execute handler
handler = self._commands.get(command_name)

View File

@@ -174,6 +174,7 @@ class TestTradingCycleTelegramIntegration:
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={},
)
# Verify notification was sent
@@ -216,6 +217,7 @@ class TestTradingCycleTelegramIntegration:
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={},
)
# Verify notification was attempted
@@ -257,6 +259,7 @@ class TestTradingCycleTelegramIntegration:
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={},
)
# Verify notification was sent
@@ -305,6 +308,7 @@ class TestTradingCycleTelegramIntegration:
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={},
)
# Verify notification was attempted
@@ -345,6 +349,7 @@ class TestTradingCycleTelegramIntegration:
telegram=mock_telegram,
market=mock_market,
stock_code="005930",
scan_candidates={},
)
# Verify no trade notification sent
@@ -543,6 +548,7 @@ class TestOverseasBalanceParsing:
telegram=mock_telegram,
market=mock_overseas_market,
stock_code="AAPL",
scan_candidates={},
)
# Verify balance API was called
@@ -577,6 +583,7 @@ class TestOverseasBalanceParsing:
telegram=mock_telegram,
market=mock_overseas_market,
stock_code="AAPL",
scan_candidates={},
)
# Verify balance API was called
@@ -611,6 +618,7 @@ class TestOverseasBalanceParsing:
telegram=mock_telegram,
market=mock_overseas_market,
stock_code="AAPL",
scan_candidates={},
)
# Verify balance API was called
@@ -645,6 +653,7 @@ class TestOverseasBalanceParsing:
telegram=mock_telegram,
market=mock_overseas_market,
stock_code="AAPL",
scan_candidates={},
)
# Verify price API was called

377
tests/test_smart_scanner.py Normal file
View File

@@ -0,0 +1,377 @@
"""Tests for SmartVolatilityScanner."""
from __future__ import annotations
import pytest
from unittest.mock import AsyncMock, MagicMock
from src.analysis.smart_scanner import ScanCandidate, SmartVolatilityScanner
from src.analysis.volatility import VolatilityAnalyzer
from src.broker.kis_api import KISBroker
from src.config import Settings
@pytest.fixture
def mock_settings() -> Settings:
"""Create test settings."""
return Settings(
KIS_APP_KEY="test",
KIS_APP_SECRET="test",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test",
RSI_OVERSOLD_THRESHOLD=30,
RSI_MOMENTUM_THRESHOLD=70,
VOL_MULTIPLIER=2.0,
SCANNER_TOP_N=3,
DB_PATH=":memory:",
)
@pytest.fixture
def mock_broker(mock_settings: Settings) -> MagicMock:
"""Create mock broker."""
broker = MagicMock(spec=KISBroker)
broker._settings = mock_settings
broker.fetch_market_rankings = AsyncMock()
broker.get_daily_prices = AsyncMock()
return broker
@pytest.fixture
def scanner(mock_broker: MagicMock, mock_settings: Settings) -> SmartVolatilityScanner:
"""Create smart scanner instance."""
analyzer = VolatilityAnalyzer()
return SmartVolatilityScanner(
broker=mock_broker,
volatility_analyzer=analyzer,
settings=mock_settings,
)
class TestSmartVolatilityScanner:
"""Test suite for SmartVolatilityScanner."""
@pytest.mark.asyncio
async def test_scan_finds_oversold_candidates(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that scanner identifies oversold stocks with high volume."""
# Mock rankings
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "005930",
"name": "Samsung",
"price": 70000,
"volume": 5000000,
"change_rate": -3.5,
"volume_increase_rate": 250,
},
]
# Mock daily prices - trending down (oversold)
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 75000 - i * 200,
"high": 75500 - i * 200,
"low": 74500 - i * 200,
"close": 75000 - i * 250, # Steady decline
"volume": 2000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
# Should find at least one candidate (depending on exact RSI calculation)
mock_broker.fetch_market_rankings.assert_called_once()
mock_broker.get_daily_prices.assert_called_once_with("005930", days=20)
# If qualified, should have oversold signal
if candidates:
assert candidates[0].signal in ["oversold", "momentum"]
assert candidates[0].volume_ratio >= scanner.vol_multiplier
@pytest.mark.asyncio
async def test_scan_finds_momentum_candidates(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that scanner identifies momentum stocks with high volume."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "035420",
"name": "NAVER",
"price": 250000,
"volume": 3000000,
"change_rate": 5.0,
"volume_increase_rate": 300,
},
]
# Mock daily prices - trending up (momentum)
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 230000 + i * 500,
"high": 231000 + i * 500,
"low": 229000 + i * 500,
"close": 230500 + i * 500, # Steady rise
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
mock_broker.fetch_market_rankings.assert_called_once()
@pytest.mark.asyncio
async def test_scan_filters_low_volume(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that stocks with low volume ratio are filtered out."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "000660",
"name": "SK Hynix",
"price": 150000,
"volume": 500000,
"change_rate": -5.0,
"volume_increase_rate": 50, # Only 50% increase (< 200%)
},
]
# Low volume
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 150000 - i * 100,
"high": 151000 - i * 100,
"low": 149000 - i * 100,
"close": 150000 - i * 150, # Declining (would be oversold)
"volume": 1000000, # Current 500k < 2x prev day 1M
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
# Should be filtered out due to low volume ratio
assert len(candidates) == 0
@pytest.mark.asyncio
async def test_scan_filters_neutral_rsi(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that stocks with neutral RSI are filtered out."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "051910",
"name": "LG Chem",
"price": 500000,
"volume": 3000000,
"change_rate": 0.5,
"volume_increase_rate": 300, # High volume
},
]
# Flat prices (neutral RSI ~50)
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 500000 + (i % 2) * 100, # Small oscillation
"high": 500500,
"low": 499500,
"close": 500000 + (i % 2) * 50,
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
# Should be filtered out (RSI ~50, not < 30 or > 70)
assert len(candidates) == 0
@pytest.mark.asyncio
async def test_scan_uses_fallback_on_api_error(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test fallback to static list when ranking API fails."""
mock_broker.fetch_market_rankings.side_effect = ConnectionError("API unavailable")
# Fallback stocks should still be analyzed
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 50000 - i * 50,
"high": 51000 - i * 50,
"low": 49000 - i * 50,
"close": 50000 - i * 75, # Declining
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan(fallback_stocks=["005930", "000660"])
# Should not crash
assert isinstance(candidates, list)
@pytest.mark.asyncio
async def test_scan_returns_top_n_only(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that scan returns at most top_n candidates."""
# Return many stocks
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": f"00{i}000",
"name": f"Stock{i}",
"price": 10000 * i,
"volume": 5000000,
"change_rate": -10,
"volume_increase_rate": 500,
}
for i in range(1, 10)
]
# All oversold with high volume
def make_prices(code: str) -> list[dict]:
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 10000 - i * 100,
"high": 10500 - i * 100,
"low": 9500 - i * 100,
"close": 10000 - i * 150,
"volume": 1000000,
})
return prices
mock_broker.get_daily_prices.side_effect = make_prices
candidates = await scanner.scan()
# Should respect top_n limit (3)
assert len(candidates) <= scanner.top_n
@pytest.mark.asyncio
async def test_scan_skips_insufficient_price_history(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that stocks with insufficient history are skipped."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "005930",
"name": "Samsung",
"price": 70000,
"volume": 5000000,
"change_rate": -5.0,
"volume_increase_rate": 300,
},
]
# Only 5 days of data (need 15+ for RSI)
mock_broker.get_daily_prices.return_value = [
{
"date": f"2026020{i:02d}",
"open": 70000,
"high": 71000,
"low": 69000,
"close": 70000,
"volume": 2000000,
}
for i in range(5)
]
candidates = await scanner.scan()
# Should skip due to insufficient data
assert len(candidates) == 0
@pytest.mark.asyncio
async def test_get_stock_codes(
self, scanner: SmartVolatilityScanner
) -> None:
"""Test extraction of stock codes from candidates."""
candidates = [
ScanCandidate(
stock_code="005930",
name="Samsung",
price=70000,
volume=5000000,
volume_ratio=2.5,
rsi=28,
signal="oversold",
score=85.0,
),
ScanCandidate(
stock_code="035420",
name="NAVER",
price=250000,
volume=3000000,
volume_ratio=3.0,
rsi=75,
signal="momentum",
score=88.0,
),
]
codes = scanner.get_stock_codes(candidates)
assert codes == ["005930", "035420"]
class TestRSICalculation:
"""Test RSI calculation in VolatilityAnalyzer."""
def test_rsi_oversold(self) -> None:
"""Test RSI calculation for downtrending prices."""
analyzer = VolatilityAnalyzer()
# Steadily declining prices
prices = [100 - i * 0.5 for i in range(20)]
rsi = analyzer.calculate_rsi(prices, period=14)
assert rsi < 50 # Should be oversold territory
def test_rsi_overbought(self) -> None:
"""Test RSI calculation for uptrending prices."""
analyzer = VolatilityAnalyzer()
# Steadily rising prices
prices = [100 + i * 0.5 for i in range(20)]
rsi = analyzer.calculate_rsi(prices, period=14)
assert rsi > 50 # Should be overbought territory
def test_rsi_neutral(self) -> None:
"""Test RSI calculation for flat prices."""
analyzer = VolatilityAnalyzer()
# Flat prices with small oscillation
prices = [100 + (i % 2) * 0.1 for i in range(20)]
rsi = analyzer.calculate_rsi(prices, period=14)
assert 40 < rsi < 60 # Should be near neutral
def test_rsi_insufficient_data(self) -> None:
"""Test RSI returns neutral when insufficient data."""
analyzer = VolatilityAnalyzer()
prices = [100, 101, 102] # Only 3 prices, need 15+
rsi = analyzer.calculate_rsi(prices, period=14)
assert rsi == 50.0 # Default neutral
def test_rsi_all_gains(self) -> None:
"""Test RSI returns 100 when all gains (no losses)."""
analyzer = VolatilityAnalyzer()
# Monotonic increase
prices = [100 + i for i in range(20)]
rsi = analyzer.calculate_rsi(prices, period=14)
assert rsi == 100.0 # Maximum RSI

View File

@@ -230,6 +230,31 @@ class TestUpdateHandling:
await handler._handle_update(update)
assert executed is False
@pytest.mark.asyncio
async def test_handle_command_with_botname(self) -> None:
"""Commands with @botname suffix are handled correctly."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
executed = False
async def test_command() -> None:
nonlocal executed
executed = True
handler.register_command("start", test_command)
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/start@mybot",
},
}
await handler._handle_update(update)
assert executed is True
@pytest.mark.asyncio
async def test_handle_update_error_isolation(self) -> None:
"""Errors in handlers don't crash the system."""
@@ -524,7 +549,7 @@ class TestStatusCommands:
@pytest.mark.asyncio
async def test_positions_command_shows_holdings(self) -> None:
"""Positions command displays current holdings."""
"""Positions command displays account summary."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
@@ -536,12 +561,12 @@ class TestStatusCommands:
async def mock_positions() -> None:
"""Mock /positions handler."""
message = (
"<b>💼 Current Holdings</b>\n"
"\n🇰🇷 <b>Korea</b>\n"
"• 005930: 10 shares @ 70,000\n"
"\n🇺🇸 <b>Overseas</b>\n"
"• AAPL: 15 shares @ 175\n"
"\n<b>Cash:</b> ₩5,000,000"
"<b>💼 Account Summary</b>\n\n"
"<b>Total Evaluation:</b> ₩10,500,000\n"
"<b>Available Cash:</b> ₩5,000,000\n"
"<b>Purchase Total:</b> ₩10,000,000\n"
"<b>P&L:</b> +5.00%\n\n"
"<i>Note: Individual position details require API enhancement</i>"
)
await client.send_message(message)
@@ -561,8 +586,9 @@ class TestStatusCommands:
# Verify message was sent
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Current Holdings" in payload["text"]
assert "shares" in payload["text"]
assert "Account Summary" in payload["text"]
assert "Total Evaluation" in payload["text"]
assert "P&L" in payload["text"]
@pytest.mark.asyncio
async def test_positions_command_empty_holdings(self) -> None:
@@ -578,8 +604,8 @@ class TestStatusCommands:
async def mock_positions_empty() -> None:
"""Mock /positions handler with no positions."""
message = (
"<b>💼 Current Holdings</b>\n\n"
"No positions currently held."
"<b>💼 Account Summary</b>\n\n"
"No balance information available."
)
await client.send_message(message)
@@ -598,7 +624,7 @@ class TestStatusCommands:
# Verify message was sent
payload = mock_post.call_args.kwargs["json"]
assert "No positions" in payload["text"]
assert "No balance information available" in payload["text"]
@pytest.mark.asyncio
async def test_positions_command_error_handling(self) -> None:
@@ -638,51 +664,6 @@ class TestStatusCommands:
class TestBasicCommands:
"""Test basic command implementations."""
@pytest.mark.asyncio
async def test_start_command_content(self) -> None:
"""Start command contains welcome message and command list."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_start() -> None:
"""Mock /start handler."""
message = (
"<b>🤖 The Ouroboros Trading Bot</b>\n\n"
"AI-powered global stock trading agent with real-time notifications.\n\n"
"<b>Available commands:</b>\n"
"/help - Show this help message\n"
"/status - Current trading status\n"
"/positions - View holdings\n"
"/stop - Pause trading\n"
"/resume - Resume trading"
)
await client.send_message(message)
handler.register_command("start", mock_start)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/start",
},
}
await handler._handle_update(update)
# Verify message was sent
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Ouroboros Trading Bot" in payload["text"]
assert "/help" in payload["text"]
assert "/status" in payload["text"]
@pytest.mark.asyncio
async def test_help_command_content(self) -> None:
"""Help command lists all available commands."""
@@ -698,7 +679,6 @@ class TestBasicCommands:
"""Mock /help handler."""
message = (
"<b>📖 Available Commands</b>\n\n"
"/start - Welcome message\n"
"/help - Show available commands\n"
"/status - Trading status (mode, markets, P&L)\n"
"/positions - Current holdings\n"
@@ -724,7 +704,6 @@ class TestBasicCommands:
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Available Commands" in payload["text"]
assert "/start" in payload["text"]
assert "/help" in payload["text"]
assert "/status" in payload["text"]
assert "/positions" in payload["text"]