feat: unify domestic scanner and sizing; update docs
Some checks failed
CI / test (pull_request) Has been cancelled

This commit is contained in:
agentson
2026-02-17 06:15:20 +09:00
parent 0659cc0aca
commit 733e6b36e9
6 changed files with 284 additions and 264 deletions

View File

@@ -87,12 +87,10 @@ High-frequency trading with individual stock analysis:
**SmartVolatilityScanner** (`smart_scanner.py`) — Python-first filtering pipeline **SmartVolatilityScanner** (`smart_scanner.py`) — Python-first filtering pipeline
- **Domestic (KR)**: - **Domestic (KR)**:
- **Step 1**: Fetch volume rankings from KIS API (top 30 stocks) - **Step 1**: Fetch domestic fluctuation ranking as primary universe
- **Step 2**: Calculate RSI and volume ratio for each stock - **Step 2**: Fetch domestic volume ranking for liquidity bonus
- **Step 3**: Apply filters: - **Step 3**: Compute volatility-first score (max of daily change% and intraday range%)
- Volume ratio >= `VOL_MULTIPLIER` (default 2.0x previous day) - **Step 4**: Apply liquidity bonus and return top N candidates
- RSI < `RSI_OVERSOLD_THRESHOLD` (30) OR RSI > `RSI_MOMENTUM_THRESHOLD` (70)
- **Step 4**: Score candidates by RSI extremity (60%) + volume surge (40%)
- **Overseas (US/JP/HK/CN/VN)**: - **Overseas (US/JP/HK/CN/VN)**:
- **Step 1**: Fetch overseas ranking universe (fluctuation rank + volume rank bonus) - **Step 1**: Fetch overseas ranking universe (fluctuation rank + volume rank bonus)
- **Step 2**: Compute volatility-first score (max of daily change% and intraday range%) - **Step 2**: Compute volatility-first score (max of daily change% and intraday range%)
@@ -102,6 +100,11 @@ High-frequency trading with individual stock analysis:
from runtime active symbols + recent traded symbols + current holdings (no static watchlist) from runtime active symbols + recent traded symbols + current holdings (no static watchlist)
- **Realtime mode only**: Daily mode uses batch processing for API efficiency - **Realtime mode only**: Daily mode uses batch processing for API efficiency
**Benefits:**
- Reduces Gemini API calls from 20-30 stocks to 1-3 qualified candidates
- Fast Python-based filtering before expensive AI judgment
- Logs selection context (RSI-compatible proxy, volume_ratio, signal, score) for Evolution system
### 3. Brain (`src/brain/`) ### 3. Brain (`src/brain/`)
**GeminiClient** (`gemini_client.py`) — AI decision engine powered by Google Gemini **GeminiClient** (`gemini_client.py`) — AI decision engine powered by Google Gemini
@@ -581,6 +584,18 @@ NEWS_API_KEY=...
NEWS_API_PROVIDER=... NEWS_API_PROVIDER=...
MARKET_DATA_API_KEY=... MARKET_DATA_API_KEY=...
# Position Sizing (optional)
POSITION_SIZING_ENABLED=true
POSITION_BASE_ALLOCATION_PCT=5.0
POSITION_MIN_ALLOCATION_PCT=1.0
POSITION_MAX_ALLOCATION_PCT=10.0
POSITION_VOLATILITY_TARGET_SCORE=50.0
# Legacy/compat scanner thresholds (kept for backward compatibility)
RSI_OVERSOLD_THRESHOLD=30
RSI_MOMENTUM_THRESHOLD=70
VOL_MULTIPLIER=2.0
# Overseas Ranking API (optional override; account-dependent) # Overseas Ranking API (optional override; account-dependent)
OVERSEAS_RANKING_ENABLED=true OVERSEAS_RANKING_ENABLED=true
OVERSEAS_RANKING_FLUCT_TR_ID=HHDFS76200100 OVERSEAS_RANKING_FLUCT_TR_ID=HHDFS76200100

View File

@@ -139,3 +139,29 @@
- 해외 시장에서 스캐너 후보 0개로 정지되는 상황 완화 - 해외 시장에서 스캐너 후보 0개로 정지되는 상황 완화
- 종목 선정 기준이 단순 상승률 중심에서 변동성 중심으로 개선 - 종목 선정 기준이 단순 상승률 중심에서 변동성 중심으로 개선
- 고정 티커 없이도 시장 주도 변동 종목 탐지 가능 - 고정 티커 없이도 시장 주도 변동 종목 탐지 가능
### 국내 스캐너/주문수량 정렬: 변동성 우선 + 리스크 타기팅
**배경:**
- 해외만 변동성 우선으로 동작하고, 국내는 RSI/거래량 필터 중심으로 동작해 시장 간 전략 일관성이 낮았음
- 매수 수량이 고정 1주라서 변동성 구간별 익스포저 관리가 어려웠음
**요구사항:**
1. 국내 스캐너도 변동성 우선 선별로 해외와 통일
2. 고변동 종목일수록 포지션 크기를 줄이는 수량 산식 적용
**구현 결과:**
- `src/analysis/smart_scanner.py`
- 국내: `fluctuation ranking + volume ranking bonus` 기반 점수화로 전환
- 점수는 `max(abs(change_rate), intraday_range_pct)` 중심으로 계산
- 국내 랭킹 응답 스키마 키(`price`, `change_rate`, `volume`) 파싱 보강
- `src/main.py`
- `_determine_order_quantity()` 추가
- BUY 시 변동성 점수 기반 동적 수량 산정 적용
- `trading_cycle`, `run_daily_session` 경로 모두 동일 수량 로직 사용
- `src/config.py`
- `POSITION_SIZING_*` 설정 추가
**효과:**
- 국내/해외 스캐너 기준이 변동성 중심으로 일관화
- 고변동 구간에서 자동 익스포저 축소, 저변동 구간에서 과소진입 완화

View File

@@ -1,8 +1,4 @@
"""Smart Volatility Scanner with RSI and volume filters. """Smart Volatility Scanner with volatility-first market ranking logic."""
Fetches market rankings from KIS API and applies technical filters
to identify high-probability trading candidates.
"""
from __future__ import annotations from __future__ import annotations
@@ -34,14 +30,13 @@ class ScanCandidate:
class SmartVolatilityScanner: class SmartVolatilityScanner:
"""Scans market rankings and applies RSI/volume filters. """Scans market rankings and applies volatility-first filters.
Flow: Flow:
1. Fetch volume rankings from KIS API 1. Fetch fluctuation rankings as primary universe
2. For each ranked stock, fetch daily prices 2. Fetch volume rankings for liquidity bonus
3. Calculate RSI and volume ratio 3. Score by volatility first, liquidity second
4. Apply filters: volume > VOL_MULTIPLIER AND (RSI < 30 OR RSI > 70) 4. Return top N qualified candidates
5. Return top N qualified candidates
""" """
def __init__( def __init__(
@@ -92,99 +87,109 @@ class SmartVolatilityScanner:
self, self,
fallback_stocks: list[str] | None = None, fallback_stocks: list[str] | None = None,
) -> list[ScanCandidate]: ) -> list[ScanCandidate]:
"""Scan domestic market using ranking API + RSI/volume filters.""" """Scan domestic market using volatility-first ranking + liquidity bonus."""
# Step 1: Fetch rankings # 1) Primary universe from fluctuation ranking.
try: try:
rankings = await self.broker.fetch_market_rankings( fluct_rows = await self.broker.fetch_market_rankings(
ranking_type="volume", ranking_type="fluctuation",
limit=30, # Fetch more than needed for filtering limit=50,
) )
logger.info("Fetched %d stocks from volume rankings", len(rankings))
except ConnectionError as exc: except ConnectionError as exc:
logger.warning("Ranking API failed, using fallback: %s", exc) logger.warning("Domestic fluctuation ranking failed: %s", exc)
if fallback_stocks: fluct_rows = []
# Create minimal ranking data for fallback
rankings = [ # 2) Liquidity bonus from volume ranking.
try:
volume_rows = await self.broker.fetch_market_rankings(
ranking_type="volume",
limit=50,
)
except ConnectionError as exc:
logger.warning("Domestic volume ranking failed: %s", exc)
volume_rows = []
if not fluct_rows and fallback_stocks:
logger.info(
"Domestic ranking unavailable; using fallback symbols (%d)",
len(fallback_stocks),
)
fluct_rows = [
{ {
"stock_code": code, "stock_code": code,
"name": code, "name": code,
"price": 0, "price": 0.0,
"volume": 0, "volume": 0.0,
"change_rate": 0, "change_rate": 0.0,
"volume_increase_rate": 0, "volume_increase_rate": 0.0,
} }
for code in fallback_stocks for code in fallback_stocks
] ]
else:
if not fluct_rows:
return [] return []
# Step 2: Analyze each stock volume_rank_bonus: dict[str, float] = {}
candidates: list[ScanCandidate] = [] for idx, row in enumerate(volume_rows):
code = _extract_stock_code(row)
if not code:
continue
volume_rank_bonus[code] = max(0.0, 15.0 - idx * 0.3)
for stock in rankings: candidates: list[ScanCandidate] = []
stock_code = stock["stock_code"] for stock in fluct_rows:
stock_code = _extract_stock_code(stock)
if not stock_code: if not stock_code:
continue continue
try: try:
# Fetch daily prices for RSI calculation price = _extract_last_price(stock)
daily_prices = await self.broker.get_daily_prices(stock_code, days=20) change_rate = _extract_change_rate_pct(stock)
volume = _extract_volume(stock)
if len(daily_prices) < 15: # Need at least 14+1 for RSI intraday_range_pct = 0.0
logger.debug("Insufficient price history for %s", stock_code) volume_ratio = _safe_float(stock.get("volume_increase_rate"), 0.0) / 100.0 + 1.0
# Use daily chart to refine range/volume when available.
daily_prices = await self.broker.get_daily_prices(stock_code, days=2)
if daily_prices:
latest = daily_prices[-1]
latest_close = _safe_float(latest.get("close"), default=price)
if price <= 0:
price = latest_close
latest_high = _safe_float(latest.get("high"))
latest_low = _safe_float(latest.get("low"))
if latest_close > 0 and latest_high > 0 and latest_low > 0 and latest_high >= latest_low:
intraday_range_pct = (latest_high - latest_low) / latest_close * 100.0
if volume <= 0:
volume = _safe_float(latest.get("volume"))
if len(daily_prices) >= 2:
prev_day_volume = _safe_float(daily_prices[-2].get("volume"))
if prev_day_volume > 0:
volume_ratio = max(volume_ratio, volume / prev_day_volume)
volatility_pct = max(abs(change_rate), intraday_range_pct)
if price <= 0 or volatility_pct < 0.8:
continue continue
# Calculate RSI volatility_score = min(volatility_pct / 10.0, 1.0) * 85.0
close_prices = [p["close"] for p in daily_prices] liquidity_score = volume_rank_bonus.get(stock_code, 0.0)
rsi = self.analyzer.calculate_rsi(close_prices, period=14) score = min(100.0, volatility_score + liquidity_score)
signal = "momentum" if change_rate >= 0 else "oversold"
# Calculate volume ratio (today vs previous day avg) implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0)))
if len(daily_prices) >= 2:
prev_day_volume = daily_prices[-2]["volume"]
current_volume = stock.get("volume", 0) or daily_prices[-1]["volume"]
volume_ratio = (
current_volume / prev_day_volume if prev_day_volume > 0 else 1.0
)
else:
volume_ratio = stock.get("volume_increase_rate", 0) / 100 + 1 # Fallback
# Apply filters
volume_qualified = volume_ratio >= self.vol_multiplier
rsi_oversold = rsi < self.rsi_oversold
rsi_momentum = rsi > self.rsi_momentum
if volume_qualified and (rsi_oversold or rsi_momentum):
signal = "oversold" if rsi_oversold else "momentum"
# Calculate composite score
# Higher score for: extreme RSI + high volume
rsi_extremity = abs(rsi - 50) / 50 # 0-1 scale
volume_score = min(volume_ratio / 5, 1.0) # Cap at 5x
score = (rsi_extremity * 0.6 + volume_score * 0.4) * 100
candidates.append( candidates.append(
ScanCandidate( ScanCandidate(
stock_code=stock_code, stock_code=stock_code,
name=stock.get("name", stock_code), name=stock.get("name", stock_code),
price=stock.get("price", daily_prices[-1]["close"]), price=price,
volume=current_volume, volume=volume,
volume_ratio=volume_ratio, volume_ratio=max(1.0, volume_ratio, volatility_pct / 2.0),
rsi=rsi, rsi=implied_rsi,
signal=signal, signal=signal,
score=score, score=score,
) )
) )
logger.info(
"Qualified: %s (%s) RSI=%.1f vol=%.1fx signal=%s score=%.1f",
stock_code,
stock.get("name", ""),
rsi,
volume_ratio,
signal,
score,
)
except ConnectionError as exc: except ConnectionError as exc:
logger.warning("Failed to analyze %s: %s", stock_code, exc) logger.warning("Failed to analyze %s: %s", stock_code, exc)
continue continue
@@ -192,7 +197,7 @@ class SmartVolatilityScanner:
logger.error("Unexpected error analyzing %s: %s", stock_code, exc) logger.error("Unexpected error analyzing %s: %s", stock_code, exc)
continue continue
# Sort by score and return top N logger.info("Domestic ranking scan found %d candidates", len(candidates))
candidates.sort(key=lambda c: c.score, reverse=True) candidates.sort(key=lambda c: c.score, reverse=True)
return candidates[: self.top_n] return candidates[: self.top_n]
@@ -390,6 +395,7 @@ def _extract_last_price(row: dict[str, Any]) -> float:
row.get("last") row.get("last")
or row.get("ovrs_nmix_prpr") or row.get("ovrs_nmix_prpr")
or row.get("stck_prpr") or row.get("stck_prpr")
or row.get("price")
or row.get("close") or row.get("close")
) )
@@ -398,6 +404,7 @@ def _extract_change_rate_pct(row: dict[str, Any]) -> float:
"""Extract daily change rate (%) from API schema variants.""" """Extract daily change rate (%) from API schema variants."""
return _safe_float( return _safe_float(
row.get("rate") row.get("rate")
or row.get("change_rate")
or row.get("prdy_ctrt") or row.get("prdy_ctrt")
or row.get("evlu_pfls_rt") or row.get("evlu_pfls_rt")
or row.get("chg_rt") or row.get("chg_rt")
@@ -406,7 +413,9 @@ def _extract_change_rate_pct(row: dict[str, Any]) -> float:
def _extract_volume(row: dict[str, Any]) -> float: def _extract_volume(row: dict[str, Any]) -> float:
"""Extract volume/traded-amount proxy from schema variants.""" """Extract volume/traded-amount proxy from schema variants."""
return _safe_float(row.get("tvol") or row.get("acml_vol") or row.get("vol")) return _safe_float(
row.get("tvol") or row.get("acml_vol") or row.get("vol") or row.get("volume")
)
def _extract_intraday_range_pct(row: dict[str, Any], price: float) -> float: def _extract_intraday_range_pct(row: dict[str, Any], price: float) -> float:

View File

@@ -38,6 +38,11 @@ class Settings(BaseSettings):
RSI_MOMENTUM_THRESHOLD: int = Field(default=70, ge=50, le=100) RSI_MOMENTUM_THRESHOLD: int = Field(default=70, ge=50, le=100)
VOL_MULTIPLIER: float = Field(default=2.0, gt=1.0, le=10.0) VOL_MULTIPLIER: float = Field(default=2.0, gt=1.0, le=10.0)
SCANNER_TOP_N: int = Field(default=3, ge=1, le=10) SCANNER_TOP_N: int = Field(default=3, ge=1, le=10)
POSITION_SIZING_ENABLED: bool = True
POSITION_BASE_ALLOCATION_PCT: float = Field(default=5.0, gt=0.0, le=30.0)
POSITION_MIN_ALLOCATION_PCT: float = Field(default=1.0, gt=0.0, le=20.0)
POSITION_MAX_ALLOCATION_PCT: float = Field(default=10.0, gt=0.0, le=50.0)
POSITION_VOLATILITY_TARGET_SCORE: float = Field(default=50.0, gt=0.0, le=100.0)
# Database # Database
DB_PATH: str = "data/trade_logs.db" DB_PATH: str = "data/trade_logs.db"

View File

@@ -106,6 +106,41 @@ def _extract_symbol_from_holding(item: dict[str, Any]) -> str:
return "" return ""
def _determine_order_quantity(
*,
action: str,
current_price: float,
total_cash: float,
candidate: ScanCandidate | None,
settings: Settings | None,
) -> int:
"""Determine order quantity using volatility-aware position sizing."""
if action != "BUY":
return 1
if current_price <= 0 or total_cash <= 0:
return 0
if settings is None or not settings.POSITION_SIZING_ENABLED:
return 1
target_score = max(1.0, settings.POSITION_VOLATILITY_TARGET_SCORE)
observed_score = candidate.score if candidate else target_score
observed_score = max(1.0, min(100.0, observed_score))
# Higher observed volatility score => smaller allocation.
scaled_pct = settings.POSITION_BASE_ALLOCATION_PCT * (target_score / observed_score)
allocation_pct = min(
settings.POSITION_MAX_ALLOCATION_PCT,
max(settings.POSITION_MIN_ALLOCATION_PCT, scaled_pct),
)
budget = total_cash * (allocation_pct / 100.0)
quantity = int(budget // current_price)
if quantity <= 0:
return 0
return quantity
async def build_overseas_symbol_universe( async def build_overseas_symbol_universe(
db_conn: Any, db_conn: Any,
overseas_broker: OverseasBroker, overseas_broker: OverseasBroker,
@@ -162,6 +197,7 @@ async def trading_cycle(
market: MarketInfo, market: MarketInfo,
stock_code: str, stock_code: str,
scan_candidates: dict[str, dict[str, ScanCandidate]], scan_candidates: dict[str, dict[str, ScanCandidate]],
settings: Settings | None = None,
) -> None: ) -> None:
"""Execute one trading cycle for a single stock.""" """Execute one trading cycle for a single stock."""
cycle_start_time = asyncio.get_event_loop().time() cycle_start_time = asyncio.get_event_loop().time()
@@ -399,8 +435,23 @@ async def trading_cycle(
trade_price = current_price trade_price = current_price
trade_pnl = 0.0 trade_pnl = 0.0
if decision.action in ("BUY", "SELL"): if decision.action in ("BUY", "SELL"):
# Determine order size (simplified: 1 lot) quantity = _determine_order_quantity(
quantity = 1 action=decision.action,
current_price=current_price,
total_cash=total_cash,
candidate=candidate,
settings=settings,
)
if quantity <= 0:
logger.info(
"Skip %s %s (%s): no affordable quantity (cash=%.2f, price=%.2f)",
decision.action,
stock_code,
market.name,
total_cash,
current_price,
)
return
order_amount = current_price * quantity order_amount = current_price * quantity
# 4. Risk check BEFORE order # 4. Risk check BEFORE order
@@ -766,7 +817,23 @@ async def run_daily_session(
trade_price = stock_data["current_price"] trade_price = stock_data["current_price"]
trade_pnl = 0.0 trade_pnl = 0.0
if decision.action in ("BUY", "SELL"): if decision.action in ("BUY", "SELL"):
quantity = 1 quantity = _determine_order_quantity(
action=decision.action,
current_price=stock_data["current_price"],
total_cash=total_cash,
candidate=candidate_map.get(stock_code),
settings=settings,
)
if quantity <= 0:
logger.info(
"Skip %s %s (%s): no affordable quantity (cash=%.2f, price=%.2f)",
decision.action,
stock_code,
market.name,
total_cash,
stock_data["current_price"],
)
continue
order_amount = stock_data["current_price"] * quantity order_amount = stock_data["current_price"] * quantity
# Risk check # Risk check
@@ -1672,6 +1739,7 @@ async def run(settings: Settings) -> None:
market, market,
stock_code, stock_code,
scan_candidates, scan_candidates,
settings,
) )
break # Success — exit retry loop break # Success — exit retry loop
except CircuitBreakerTripped as exc: except CircuitBreakerTripped as exc:

View File

@@ -63,52 +63,51 @@ class TestSmartVolatilityScanner:
"""Test suite for SmartVolatilityScanner.""" """Test suite for SmartVolatilityScanner."""
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_scan_finds_oversold_candidates( async def test_scan_domestic_prefers_volatility_with_liquidity_bonus(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None: ) -> None:
"""Test that scanner identifies oversold stocks with high volume.""" """Domestic scan should score by volatility first and volume rank second."""
# Mock rankings fluctuation_rows = [
mock_broker.fetch_market_rankings.return_value = [
{ {
"stock_code": "005930", "stock_code": "005930",
"name": "Samsung", "name": "Samsung",
"price": 70000, "price": 70000,
"volume": 5000000, "volume": 5000000,
"change_rate": -3.5, "change_rate": -5.0,
"volume_increase_rate": 250, "volume_increase_rate": 250,
}, },
{
"stock_code": "035420",
"name": "NAVER",
"price": 250000,
"volume": 3000000,
"change_rate": 3.0,
"volume_increase_rate": 200,
},
]
volume_rows = [
{"stock_code": "035420", "name": "NAVER", "price": 250000, "volume": 3000000},
{"stock_code": "005930", "name": "Samsung", "price": 70000, "volume": 5000000},
]
mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, volume_rows]
mock_broker.get_daily_prices.return_value = [
{"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000},
{"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000},
] ]
# Mock daily prices - trending down (oversold)
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 75000 - i * 200,
"high": 75500 - i * 200,
"low": 74500 - i * 200,
"close": 75000 - i * 250, # Steady decline
"volume": 2000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan() candidates = await scanner.scan()
# Should find at least one candidate (depending on exact RSI calculation) assert len(candidates) >= 1
mock_broker.fetch_market_rankings.assert_called_once() # Samsung has higher absolute move, so it should lead despite lower volume rank bonus.
mock_broker.get_daily_prices.assert_called_once_with("005930", days=20) assert candidates[0].stock_code == "005930"
assert candidates[0].signal == "oversold"
# If qualified, should have oversold signal
if candidates:
assert candidates[0].signal in ["oversold", "momentum"]
assert candidates[0].volume_ratio >= scanner.vol_multiplier
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_scan_finds_momentum_candidates( async def test_scan_domestic_finds_momentum_candidate(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None: ) -> None:
"""Test that scanner identifies momentum stocks with high volume.""" """Positive change should be represented as momentum signal."""
mock_broker.fetch_market_rankings.return_value = [ fluctuation_rows = [
{ {
"stock_code": "035420", "stock_code": "035420",
"name": "NAVER", "name": "NAVER",
@@ -118,124 +117,67 @@ class TestSmartVolatilityScanner:
"volume_increase_rate": 300, "volume_increase_rate": 300,
}, },
] ]
mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, fluctuation_rows]
# Mock daily prices - trending up (momentum) mock_broker.get_daily_prices.return_value = [
prices = [] {"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000},
for i in range(20): {"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000},
prices.append({ ]
"date": f"2026020{i:02d}",
"open": 230000 + i * 500,
"high": 231000 + i * 500,
"low": 229000 + i * 500,
"close": 230500 + i * 500, # Steady rise
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan() candidates = await scanner.scan()
mock_broker.fetch_market_rankings.assert_called_once() assert [c.stock_code for c in candidates] == ["035420"]
assert candidates[0].signal == "momentum"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_scan_filters_low_volume( async def test_scan_domestic_filters_low_volatility(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None: ) -> None:
"""Test that stocks with low volume ratio are filtered out.""" """Domestic scan should drop symbols below volatility threshold."""
mock_broker.fetch_market_rankings.return_value = [ fluctuation_rows = [
{ {
"stock_code": "000660", "stock_code": "000660",
"name": "SK Hynix", "name": "SK Hynix",
"price": 150000, "price": 150000,
"volume": 500000, "volume": 500000,
"change_rate": -5.0, "change_rate": 0.2,
"volume_increase_rate": 50, # Only 50% increase (< 200%) "volume_increase_rate": 50,
}, },
] ]
mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, fluctuation_rows]
# Low volume mock_broker.get_daily_prices.return_value = [
prices = [] {"open": 1, "high": 150100, "low": 149900, "close": 150000, "volume": 1000000},
for i in range(20): {"open": 1, "high": 150100, "low": 149900, "close": 150000, "volume": 1000000},
prices.append({ ]
"date": f"2026020{i:02d}",
"open": 150000 - i * 100,
"high": 151000 - i * 100,
"low": 149000 - i * 100,
"close": 150000 - i * 150, # Declining (would be oversold)
"volume": 1000000, # Current 500k < 2x prev day 1M
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan() candidates = await scanner.scan()
# Should be filtered out due to low volume ratio
assert len(candidates) == 0
@pytest.mark.asyncio
async def test_scan_filters_neutral_rsi(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that stocks with neutral RSI are filtered out."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "051910",
"name": "LG Chem",
"price": 500000,
"volume": 3000000,
"change_rate": 0.5,
"volume_increase_rate": 300, # High volume
},
]
# Flat prices (neutral RSI ~50)
prices = []
for i in range(20):
prices.append({
"date": f"2026020{i:02d}",
"open": 500000 + (i % 2) * 100, # Small oscillation
"high": 500500,
"low": 499500,
"close": 500000 + (i % 2) * 50,
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan()
# Should be filtered out (RSI ~50, not < 30 or > 70)
assert len(candidates) == 0 assert len(candidates) == 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_scan_uses_fallback_on_api_error( async def test_scan_uses_fallback_on_api_error(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None: ) -> None:
"""Test fallback to static list when ranking API fails.""" """Domestic scan should remain operational using fallback symbols."""
mock_broker.fetch_market_rankings.side_effect = ConnectionError("API unavailable") mock_broker.fetch_market_rankings.side_effect = [
ConnectionError("API unavailable"),
# Fallback stocks should still be analyzed ConnectionError("API unavailable"),
prices = [] ]
for i in range(20): mock_broker.get_daily_prices.return_value = [
prices.append({ {"open": 1, "high": 103, "low": 97, "close": 100, "volume": 1000000},
"date": f"2026020{i:02d}", {"open": 1, "high": 103, "low": 97, "close": 100, "volume": 800000},
"open": 50000 - i * 50, ]
"high": 51000 - i * 50,
"low": 49000 - i * 50,
"close": 50000 - i * 75, # Declining
"volume": 1000000,
})
mock_broker.get_daily_prices.return_value = prices
candidates = await scanner.scan(fallback_stocks=["005930", "000660"]) candidates = await scanner.scan(fallback_stocks=["005930", "000660"])
# Should not crash
assert isinstance(candidates, list) assert isinstance(candidates, list)
assert len(candidates) >= 1
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_scan_returns_top_n_only( async def test_scan_returns_top_n_only(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None: ) -> None:
"""Test that scan returns at most top_n candidates.""" """Test that scan returns at most top_n candidates."""
# Return many stocks fluctuation_rows = [
mock_broker.fetch_market_rankings.return_value = [
{ {
"stock_code": f"00{i}000", "stock_code": f"00{i}000",
"name": f"Stock{i}", "name": f"Stock{i}",
@@ -246,62 +188,17 @@ class TestSmartVolatilityScanner:
} }
for i in range(1, 10) for i in range(1, 10)
] ]
mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, fluctuation_rows]
# All oversold with high volume mock_broker.get_daily_prices.return_value = [
def make_prices(code: str) -> list[dict]: {"open": 1, "high": 105, "low": 95, "close": 100, "volume": 1000000},
prices = [] {"open": 1, "high": 105, "low": 95, "close": 100, "volume": 900000},
for i in range(20): ]
prices.append({
"date": f"2026020{i:02d}",
"open": 10000 - i * 100,
"high": 10500 - i * 100,
"low": 9500 - i * 100,
"close": 10000 - i * 150,
"volume": 1000000,
})
return prices
mock_broker.get_daily_prices.side_effect = make_prices
candidates = await scanner.scan() candidates = await scanner.scan()
# Should respect top_n limit (3) # Should respect top_n limit (3)
assert len(candidates) <= scanner.top_n assert len(candidates) <= scanner.top_n
@pytest.mark.asyncio
async def test_scan_skips_insufficient_price_history(
self, scanner: SmartVolatilityScanner, mock_broker: MagicMock
) -> None:
"""Test that stocks with insufficient history are skipped."""
mock_broker.fetch_market_rankings.return_value = [
{
"stock_code": "005930",
"name": "Samsung",
"price": 70000,
"volume": 5000000,
"change_rate": -5.0,
"volume_increase_rate": 300,
},
]
# Only 5 days of data (need 15+ for RSI)
mock_broker.get_daily_prices.return_value = [
{
"date": f"2026020{i:02d}",
"open": 70000,
"high": 71000,
"low": 69000,
"close": 70000,
"volume": 2000000,
}
for i in range(5)
]
candidates = await scanner.scan()
# Should skip due to insufficient data
assert len(candidates) == 0
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_stock_codes( async def test_get_stock_codes(
self, scanner: SmartVolatilityScanner self, scanner: SmartVolatilityScanner