From 733e6b36e924aac1fa2a39f8aedac0a9126982a8 Mon Sep 17 00:00:00 2001 From: agentson Date: Tue, 17 Feb 2026 06:15:20 +0900 Subject: [PATCH] feat: unify domestic scanner and sizing; update docs --- docs/architecture.md | 27 +++- docs/requirements-log.md | 26 ++++ src/analysis/smart_scanner.py | 191 +++++++++++++++-------------- src/config.py | 5 + src/main.py | 74 ++++++++++- tests/test_smart_scanner.py | 225 +++++++++------------------------- 6 files changed, 284 insertions(+), 264 deletions(-) diff --git a/docs/architecture.md b/docs/architecture.md index 0410ec5..a334e2d 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -87,12 +87,10 @@ High-frequency trading with individual stock analysis: **SmartVolatilityScanner** (`smart_scanner.py`) — Python-first filtering pipeline - **Domestic (KR)**: - - **Step 1**: Fetch volume rankings from KIS API (top 30 stocks) - - **Step 2**: Calculate RSI and volume ratio for each stock - - **Step 3**: Apply filters: - - Volume ratio >= `VOL_MULTIPLIER` (default 2.0x previous day) - - RSI < `RSI_OVERSOLD_THRESHOLD` (30) OR RSI > `RSI_MOMENTUM_THRESHOLD` (70) - - **Step 4**: Score candidates by RSI extremity (60%) + volume surge (40%) + - **Step 1**: Fetch domestic fluctuation ranking as primary universe + - **Step 2**: Fetch domestic volume ranking for liquidity bonus + - **Step 3**: Compute volatility-first score (max of daily change% and intraday range%) + - **Step 4**: Apply liquidity bonus and return top N candidates - **Overseas (US/JP/HK/CN/VN)**: - **Step 1**: Fetch overseas ranking universe (fluctuation rank + volume rank bonus) - **Step 2**: Compute volatility-first score (max of daily change% and intraday range%) @@ -102,6 +100,11 @@ High-frequency trading with individual stock analysis: from runtime active symbols + recent traded symbols + current holdings (no static watchlist) - **Realtime mode only**: Daily mode uses batch processing for API efficiency +**Benefits:** +- Reduces Gemini API calls from 20-30 stocks to 1-3 qualified candidates +- Fast Python-based filtering before expensive AI judgment +- Logs selection context (RSI-compatible proxy, volume_ratio, signal, score) for Evolution system + ### 3. Brain (`src/brain/`) **GeminiClient** (`gemini_client.py`) — AI decision engine powered by Google Gemini @@ -581,6 +584,18 @@ NEWS_API_KEY=... NEWS_API_PROVIDER=... MARKET_DATA_API_KEY=... +# Position Sizing (optional) +POSITION_SIZING_ENABLED=true +POSITION_BASE_ALLOCATION_PCT=5.0 +POSITION_MIN_ALLOCATION_PCT=1.0 +POSITION_MAX_ALLOCATION_PCT=10.0 +POSITION_VOLATILITY_TARGET_SCORE=50.0 + +# Legacy/compat scanner thresholds (kept for backward compatibility) +RSI_OVERSOLD_THRESHOLD=30 +RSI_MOMENTUM_THRESHOLD=70 +VOL_MULTIPLIER=2.0 + # Overseas Ranking API (optional override; account-dependent) OVERSEAS_RANKING_ENABLED=true OVERSEAS_RANKING_FLUCT_TR_ID=HHDFS76200100 diff --git a/docs/requirements-log.md b/docs/requirements-log.md index 81a82f8..a7c4ca2 100644 --- a/docs/requirements-log.md +++ b/docs/requirements-log.md @@ -139,3 +139,29 @@ - 해외 시장에서 스캐너 후보 0개로 정지되는 상황 완화 - 종목 선정 기준이 단순 상승률 중심에서 변동성 중심으로 개선 - 고정 티커 없이도 시장 주도 변동 종목 탐지 가능 + +### 국내 스캐너/주문수량 정렬: 변동성 우선 + 리스크 타기팅 + +**배경:** +- 해외만 변동성 우선으로 동작하고, 국내는 RSI/거래량 필터 중심으로 동작해 시장 간 전략 일관성이 낮았음 +- 매수 수량이 고정 1주라서 변동성 구간별 익스포저 관리가 어려웠음 + +**요구사항:** +1. 국내 스캐너도 변동성 우선 선별로 해외와 통일 +2. 고변동 종목일수록 포지션 크기를 줄이는 수량 산식 적용 + +**구현 결과:** +- `src/analysis/smart_scanner.py` + - 국내: `fluctuation ranking + volume ranking bonus` 기반 점수화로 전환 + - 점수는 `max(abs(change_rate), intraday_range_pct)` 중심으로 계산 + - 국내 랭킹 응답 스키마 키(`price`, `change_rate`, `volume`) 파싱 보강 +- `src/main.py` + - `_determine_order_quantity()` 추가 + - BUY 시 변동성 점수 기반 동적 수량 산정 적용 + - `trading_cycle`, `run_daily_session` 경로 모두 동일 수량 로직 사용 +- `src/config.py` + - `POSITION_SIZING_*` 설정 추가 + +**효과:** +- 국내/해외 스캐너 기준이 변동성 중심으로 일관화 +- 고변동 구간에서 자동 익스포저 축소, 저변동 구간에서 과소진입 완화 diff --git a/src/analysis/smart_scanner.py b/src/analysis/smart_scanner.py index 7201d54..0f4f8c8 100644 --- a/src/analysis/smart_scanner.py +++ b/src/analysis/smart_scanner.py @@ -1,8 +1,4 @@ -"""Smart Volatility Scanner with RSI and volume filters. - -Fetches market rankings from KIS API and applies technical filters -to identify high-probability trading candidates. -""" +"""Smart Volatility Scanner with volatility-first market ranking logic.""" from __future__ import annotations @@ -34,14 +30,13 @@ class ScanCandidate: class SmartVolatilityScanner: - """Scans market rankings and applies RSI/volume filters. + """Scans market rankings and applies volatility-first filters. Flow: - 1. Fetch volume rankings from KIS API - 2. For each ranked stock, fetch daily prices - 3. Calculate RSI and volume ratio - 4. Apply filters: volume > VOL_MULTIPLIER AND (RSI < 30 OR RSI > 70) - 5. Return top N qualified candidates + 1. Fetch fluctuation rankings as primary universe + 2. Fetch volume rankings for liquidity bonus + 3. Score by volatility first, liquidity second + 4. Return top N qualified candidates """ def __init__( @@ -92,98 +87,108 @@ class SmartVolatilityScanner: self, fallback_stocks: list[str] | None = None, ) -> list[ScanCandidate]: - """Scan domestic market using ranking API + RSI/volume filters.""" - # Step 1: Fetch rankings + """Scan domestic market using volatility-first ranking + liquidity bonus.""" + # 1) Primary universe from fluctuation ranking. try: - rankings = await self.broker.fetch_market_rankings( - ranking_type="volume", - limit=30, # Fetch more than needed for filtering + fluct_rows = await self.broker.fetch_market_rankings( + ranking_type="fluctuation", + limit=50, ) - logger.info("Fetched %d stocks from volume rankings", len(rankings)) except ConnectionError as exc: - logger.warning("Ranking API failed, using fallback: %s", exc) - if fallback_stocks: - # Create minimal ranking data for fallback - rankings = [ - { - "stock_code": code, - "name": code, - "price": 0, - "volume": 0, - "change_rate": 0, - "volume_increase_rate": 0, - } - for code in fallback_stocks - ] - else: - return [] + logger.warning("Domestic fluctuation ranking failed: %s", exc) + fluct_rows = [] + + # 2) Liquidity bonus from volume ranking. + try: + volume_rows = await self.broker.fetch_market_rankings( + ranking_type="volume", + limit=50, + ) + except ConnectionError as exc: + logger.warning("Domestic volume ranking failed: %s", exc) + volume_rows = [] + + if not fluct_rows and fallback_stocks: + logger.info( + "Domestic ranking unavailable; using fallback symbols (%d)", + len(fallback_stocks), + ) + fluct_rows = [ + { + "stock_code": code, + "name": code, + "price": 0.0, + "volume": 0.0, + "change_rate": 0.0, + "volume_increase_rate": 0.0, + } + for code in fallback_stocks + ] + + if not fluct_rows: + return [] + + volume_rank_bonus: dict[str, float] = {} + for idx, row in enumerate(volume_rows): + code = _extract_stock_code(row) + if not code: + continue + volume_rank_bonus[code] = max(0.0, 15.0 - idx * 0.3) - # Step 2: Analyze each stock candidates: list[ScanCandidate] = [] - - for stock in rankings: - stock_code = stock["stock_code"] + for stock in fluct_rows: + stock_code = _extract_stock_code(stock) if not stock_code: continue try: - # Fetch daily prices for RSI calculation - daily_prices = await self.broker.get_daily_prices(stock_code, days=20) + price = _extract_last_price(stock) + change_rate = _extract_change_rate_pct(stock) + volume = _extract_volume(stock) - if len(daily_prices) < 15: # Need at least 14+1 for RSI - logger.debug("Insufficient price history for %s", stock_code) + intraday_range_pct = 0.0 + volume_ratio = _safe_float(stock.get("volume_increase_rate"), 0.0) / 100.0 + 1.0 + + # Use daily chart to refine range/volume when available. + daily_prices = await self.broker.get_daily_prices(stock_code, days=2) + if daily_prices: + latest = daily_prices[-1] + latest_close = _safe_float(latest.get("close"), default=price) + if price <= 0: + price = latest_close + latest_high = _safe_float(latest.get("high")) + latest_low = _safe_float(latest.get("low")) + if latest_close > 0 and latest_high > 0 and latest_low > 0 and latest_high >= latest_low: + intraday_range_pct = (latest_high - latest_low) / latest_close * 100.0 + if volume <= 0: + volume = _safe_float(latest.get("volume")) + if len(daily_prices) >= 2: + prev_day_volume = _safe_float(daily_prices[-2].get("volume")) + if prev_day_volume > 0: + volume_ratio = max(volume_ratio, volume / prev_day_volume) + + volatility_pct = max(abs(change_rate), intraday_range_pct) + if price <= 0 or volatility_pct < 0.8: continue - # Calculate RSI - close_prices = [p["close"] for p in daily_prices] - rsi = self.analyzer.calculate_rsi(close_prices, period=14) + volatility_score = min(volatility_pct / 10.0, 1.0) * 85.0 + liquidity_score = volume_rank_bonus.get(stock_code, 0.0) + score = min(100.0, volatility_score + liquidity_score) + signal = "momentum" if change_rate >= 0 else "oversold" + implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0))) - # Calculate volume ratio (today vs previous day avg) - if len(daily_prices) >= 2: - prev_day_volume = daily_prices[-2]["volume"] - current_volume = stock.get("volume", 0) or daily_prices[-1]["volume"] - volume_ratio = ( - current_volume / prev_day_volume if prev_day_volume > 0 else 1.0 - ) - else: - volume_ratio = stock.get("volume_increase_rate", 0) / 100 + 1 # Fallback - - # Apply filters - volume_qualified = volume_ratio >= self.vol_multiplier - rsi_oversold = rsi < self.rsi_oversold - rsi_momentum = rsi > self.rsi_momentum - - if volume_qualified and (rsi_oversold or rsi_momentum): - signal = "oversold" if rsi_oversold else "momentum" - - # Calculate composite score - # Higher score for: extreme RSI + high volume - rsi_extremity = abs(rsi - 50) / 50 # 0-1 scale - volume_score = min(volume_ratio / 5, 1.0) # Cap at 5x - score = (rsi_extremity * 0.6 + volume_score * 0.4) * 100 - - candidates.append( - ScanCandidate( - stock_code=stock_code, - name=stock.get("name", stock_code), - price=stock.get("price", daily_prices[-1]["close"]), - volume=current_volume, - volume_ratio=volume_ratio, - rsi=rsi, - signal=signal, - score=score, - ) - ) - - logger.info( - "Qualified: %s (%s) RSI=%.1f vol=%.1fx signal=%s score=%.1f", - stock_code, - stock.get("name", ""), - rsi, - volume_ratio, - signal, - score, + candidates.append( + ScanCandidate( + stock_code=stock_code, + name=stock.get("name", stock_code), + price=price, + volume=volume, + volume_ratio=max(1.0, volume_ratio, volatility_pct / 2.0), + rsi=implied_rsi, + signal=signal, + score=score, ) + ) except ConnectionError as exc: logger.warning("Failed to analyze %s: %s", stock_code, exc) @@ -192,7 +197,7 @@ class SmartVolatilityScanner: logger.error("Unexpected error analyzing %s: %s", stock_code, exc) continue - # Sort by score and return top N + logger.info("Domestic ranking scan found %d candidates", len(candidates)) candidates.sort(key=lambda c: c.score, reverse=True) return candidates[: self.top_n] @@ -390,6 +395,7 @@ def _extract_last_price(row: dict[str, Any]) -> float: row.get("last") or row.get("ovrs_nmix_prpr") or row.get("stck_prpr") + or row.get("price") or row.get("close") ) @@ -398,6 +404,7 @@ def _extract_change_rate_pct(row: dict[str, Any]) -> float: """Extract daily change rate (%) from API schema variants.""" return _safe_float( row.get("rate") + or row.get("change_rate") or row.get("prdy_ctrt") or row.get("evlu_pfls_rt") or row.get("chg_rt") @@ -406,7 +413,9 @@ def _extract_change_rate_pct(row: dict[str, Any]) -> float: def _extract_volume(row: dict[str, Any]) -> float: """Extract volume/traded-amount proxy from schema variants.""" - return _safe_float(row.get("tvol") or row.get("acml_vol") or row.get("vol")) + return _safe_float( + row.get("tvol") or row.get("acml_vol") or row.get("vol") or row.get("volume") + ) def _extract_intraday_range_pct(row: dict[str, Any], price: float) -> float: diff --git a/src/config.py b/src/config.py index 8516cb8..6d79119 100644 --- a/src/config.py +++ b/src/config.py @@ -38,6 +38,11 @@ class Settings(BaseSettings): RSI_MOMENTUM_THRESHOLD: int = Field(default=70, ge=50, le=100) VOL_MULTIPLIER: float = Field(default=2.0, gt=1.0, le=10.0) SCANNER_TOP_N: int = Field(default=3, ge=1, le=10) + POSITION_SIZING_ENABLED: bool = True + POSITION_BASE_ALLOCATION_PCT: float = Field(default=5.0, gt=0.0, le=30.0) + POSITION_MIN_ALLOCATION_PCT: float = Field(default=1.0, gt=0.0, le=20.0) + POSITION_MAX_ALLOCATION_PCT: float = Field(default=10.0, gt=0.0, le=50.0) + POSITION_VOLATILITY_TARGET_SCORE: float = Field(default=50.0, gt=0.0, le=100.0) # Database DB_PATH: str = "data/trade_logs.db" diff --git a/src/main.py b/src/main.py index da6f658..b4f147c 100644 --- a/src/main.py +++ b/src/main.py @@ -106,6 +106,41 @@ def _extract_symbol_from_holding(item: dict[str, Any]) -> str: return "" +def _determine_order_quantity( + *, + action: str, + current_price: float, + total_cash: float, + candidate: ScanCandidate | None, + settings: Settings | None, +) -> int: + """Determine order quantity using volatility-aware position sizing.""" + if action != "BUY": + return 1 + if current_price <= 0 or total_cash <= 0: + return 0 + + if settings is None or not settings.POSITION_SIZING_ENABLED: + return 1 + + target_score = max(1.0, settings.POSITION_VOLATILITY_TARGET_SCORE) + observed_score = candidate.score if candidate else target_score + observed_score = max(1.0, min(100.0, observed_score)) + + # Higher observed volatility score => smaller allocation. + scaled_pct = settings.POSITION_BASE_ALLOCATION_PCT * (target_score / observed_score) + allocation_pct = min( + settings.POSITION_MAX_ALLOCATION_PCT, + max(settings.POSITION_MIN_ALLOCATION_PCT, scaled_pct), + ) + + budget = total_cash * (allocation_pct / 100.0) + quantity = int(budget // current_price) + if quantity <= 0: + return 0 + return quantity + + async def build_overseas_symbol_universe( db_conn: Any, overseas_broker: OverseasBroker, @@ -162,6 +197,7 @@ async def trading_cycle( market: MarketInfo, stock_code: str, scan_candidates: dict[str, dict[str, ScanCandidate]], + settings: Settings | None = None, ) -> None: """Execute one trading cycle for a single stock.""" cycle_start_time = asyncio.get_event_loop().time() @@ -399,8 +435,23 @@ async def trading_cycle( trade_price = current_price trade_pnl = 0.0 if decision.action in ("BUY", "SELL"): - # Determine order size (simplified: 1 lot) - quantity = 1 + quantity = _determine_order_quantity( + action=decision.action, + current_price=current_price, + total_cash=total_cash, + candidate=candidate, + settings=settings, + ) + if quantity <= 0: + logger.info( + "Skip %s %s (%s): no affordable quantity (cash=%.2f, price=%.2f)", + decision.action, + stock_code, + market.name, + total_cash, + current_price, + ) + return order_amount = current_price * quantity # 4. Risk check BEFORE order @@ -766,7 +817,23 @@ async def run_daily_session( trade_price = stock_data["current_price"] trade_pnl = 0.0 if decision.action in ("BUY", "SELL"): - quantity = 1 + quantity = _determine_order_quantity( + action=decision.action, + current_price=stock_data["current_price"], + total_cash=total_cash, + candidate=candidate_map.get(stock_code), + settings=settings, + ) + if quantity <= 0: + logger.info( + "Skip %s %s (%s): no affordable quantity (cash=%.2f, price=%.2f)", + decision.action, + stock_code, + market.name, + total_cash, + stock_data["current_price"], + ) + continue order_amount = stock_data["current_price"] * quantity # Risk check @@ -1672,6 +1739,7 @@ async def run(settings: Settings) -> None: market, stock_code, scan_candidates, + settings, ) break # Success — exit retry loop except CircuitBreakerTripped as exc: diff --git a/tests/test_smart_scanner.py b/tests/test_smart_scanner.py index 31655bc..18f3873 100644 --- a/tests/test_smart_scanner.py +++ b/tests/test_smart_scanner.py @@ -63,52 +63,51 @@ class TestSmartVolatilityScanner: """Test suite for SmartVolatilityScanner.""" @pytest.mark.asyncio - async def test_scan_finds_oversold_candidates( + async def test_scan_domestic_prefers_volatility_with_liquidity_bonus( self, scanner: SmartVolatilityScanner, mock_broker: MagicMock ) -> None: - """Test that scanner identifies oversold stocks with high volume.""" - # Mock rankings - mock_broker.fetch_market_rankings.return_value = [ + """Domestic scan should score by volatility first and volume rank second.""" + fluctuation_rows = [ { "stock_code": "005930", "name": "Samsung", "price": 70000, "volume": 5000000, - "change_rate": -3.5, + "change_rate": -5.0, "volume_increase_rate": 250, }, + { + "stock_code": "035420", + "name": "NAVER", + "price": 250000, + "volume": 3000000, + "change_rate": 3.0, + "volume_increase_rate": 200, + }, + ] + volume_rows = [ + {"stock_code": "035420", "name": "NAVER", "price": 250000, "volume": 3000000}, + {"stock_code": "005930", "name": "Samsung", "price": 70000, "volume": 5000000}, + ] + mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, volume_rows] + mock_broker.get_daily_prices.return_value = [ + {"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000}, + {"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000}, ] - - # Mock daily prices - trending down (oversold) - prices = [] - for i in range(20): - prices.append({ - "date": f"2026020{i:02d}", - "open": 75000 - i * 200, - "high": 75500 - i * 200, - "low": 74500 - i * 200, - "close": 75000 - i * 250, # Steady decline - "volume": 2000000, - }) - mock_broker.get_daily_prices.return_value = prices candidates = await scanner.scan() - # Should find at least one candidate (depending on exact RSI calculation) - mock_broker.fetch_market_rankings.assert_called_once() - mock_broker.get_daily_prices.assert_called_once_with("005930", days=20) - - # If qualified, should have oversold signal - if candidates: - assert candidates[0].signal in ["oversold", "momentum"] - assert candidates[0].volume_ratio >= scanner.vol_multiplier + assert len(candidates) >= 1 + # Samsung has higher absolute move, so it should lead despite lower volume rank bonus. + assert candidates[0].stock_code == "005930" + assert candidates[0].signal == "oversold" @pytest.mark.asyncio - async def test_scan_finds_momentum_candidates( + async def test_scan_domestic_finds_momentum_candidate( self, scanner: SmartVolatilityScanner, mock_broker: MagicMock ) -> None: - """Test that scanner identifies momentum stocks with high volume.""" - mock_broker.fetch_market_rankings.return_value = [ + """Positive change should be represented as momentum signal.""" + fluctuation_rows = [ { "stock_code": "035420", "name": "NAVER", @@ -118,124 +117,67 @@ class TestSmartVolatilityScanner: "volume_increase_rate": 300, }, ] - - # Mock daily prices - trending up (momentum) - prices = [] - for i in range(20): - prices.append({ - "date": f"2026020{i:02d}", - "open": 230000 + i * 500, - "high": 231000 + i * 500, - "low": 229000 + i * 500, - "close": 230500 + i * 500, # Steady rise - "volume": 1000000, - }) - mock_broker.get_daily_prices.return_value = prices + mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, fluctuation_rows] + mock_broker.get_daily_prices.return_value = [ + {"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000}, + {"open": 1, "high": 1, "low": 1, "close": 1, "volume": 1000000}, + ] candidates = await scanner.scan() - mock_broker.fetch_market_rankings.assert_called_once() + assert [c.stock_code for c in candidates] == ["035420"] + assert candidates[0].signal == "momentum" @pytest.mark.asyncio - async def test_scan_filters_low_volume( + async def test_scan_domestic_filters_low_volatility( self, scanner: SmartVolatilityScanner, mock_broker: MagicMock ) -> None: - """Test that stocks with low volume ratio are filtered out.""" - mock_broker.fetch_market_rankings.return_value = [ + """Domestic scan should drop symbols below volatility threshold.""" + fluctuation_rows = [ { "stock_code": "000660", "name": "SK Hynix", "price": 150000, "volume": 500000, - "change_rate": -5.0, - "volume_increase_rate": 50, # Only 50% increase (< 200%) + "change_rate": 0.2, + "volume_increase_rate": 50, }, ] - - # Low volume - prices = [] - for i in range(20): - prices.append({ - "date": f"2026020{i:02d}", - "open": 150000 - i * 100, - "high": 151000 - i * 100, - "low": 149000 - i * 100, - "close": 150000 - i * 150, # Declining (would be oversold) - "volume": 1000000, # Current 500k < 2x prev day 1M - }) - mock_broker.get_daily_prices.return_value = prices + mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, fluctuation_rows] + mock_broker.get_daily_prices.return_value = [ + {"open": 1, "high": 150100, "low": 149900, "close": 150000, "volume": 1000000}, + {"open": 1, "high": 150100, "low": 149900, "close": 150000, "volume": 1000000}, + ] candidates = await scanner.scan() - # Should be filtered out due to low volume ratio - assert len(candidates) == 0 - - @pytest.mark.asyncio - async def test_scan_filters_neutral_rsi( - self, scanner: SmartVolatilityScanner, mock_broker: MagicMock - ) -> None: - """Test that stocks with neutral RSI are filtered out.""" - mock_broker.fetch_market_rankings.return_value = [ - { - "stock_code": "051910", - "name": "LG Chem", - "price": 500000, - "volume": 3000000, - "change_rate": 0.5, - "volume_increase_rate": 300, # High volume - }, - ] - - # Flat prices (neutral RSI ~50) - prices = [] - for i in range(20): - prices.append({ - "date": f"2026020{i:02d}", - "open": 500000 + (i % 2) * 100, # Small oscillation - "high": 500500, - "low": 499500, - "close": 500000 + (i % 2) * 50, - "volume": 1000000, - }) - mock_broker.get_daily_prices.return_value = prices - - candidates = await scanner.scan() - - # Should be filtered out (RSI ~50, not < 30 or > 70) assert len(candidates) == 0 @pytest.mark.asyncio async def test_scan_uses_fallback_on_api_error( self, scanner: SmartVolatilityScanner, mock_broker: MagicMock ) -> None: - """Test fallback to static list when ranking API fails.""" - mock_broker.fetch_market_rankings.side_effect = ConnectionError("API unavailable") - - # Fallback stocks should still be analyzed - prices = [] - for i in range(20): - prices.append({ - "date": f"2026020{i:02d}", - "open": 50000 - i * 50, - "high": 51000 - i * 50, - "low": 49000 - i * 50, - "close": 50000 - i * 75, # Declining - "volume": 1000000, - }) - mock_broker.get_daily_prices.return_value = prices + """Domestic scan should remain operational using fallback symbols.""" + mock_broker.fetch_market_rankings.side_effect = [ + ConnectionError("API unavailable"), + ConnectionError("API unavailable"), + ] + mock_broker.get_daily_prices.return_value = [ + {"open": 1, "high": 103, "low": 97, "close": 100, "volume": 1000000}, + {"open": 1, "high": 103, "low": 97, "close": 100, "volume": 800000}, + ] candidates = await scanner.scan(fallback_stocks=["005930", "000660"]) - # Should not crash assert isinstance(candidates, list) + assert len(candidates) >= 1 @pytest.mark.asyncio async def test_scan_returns_top_n_only( self, scanner: SmartVolatilityScanner, mock_broker: MagicMock ) -> None: """Test that scan returns at most top_n candidates.""" - # Return many stocks - mock_broker.fetch_market_rankings.return_value = [ + fluctuation_rows = [ { "stock_code": f"00{i}000", "name": f"Stock{i}", @@ -246,62 +188,17 @@ class TestSmartVolatilityScanner: } for i in range(1, 10) ] - - # All oversold with high volume - def make_prices(code: str) -> list[dict]: - prices = [] - for i in range(20): - prices.append({ - "date": f"2026020{i:02d}", - "open": 10000 - i * 100, - "high": 10500 - i * 100, - "low": 9500 - i * 100, - "close": 10000 - i * 150, - "volume": 1000000, - }) - return prices - - mock_broker.get_daily_prices.side_effect = make_prices + mock_broker.fetch_market_rankings.side_effect = [fluctuation_rows, fluctuation_rows] + mock_broker.get_daily_prices.return_value = [ + {"open": 1, "high": 105, "low": 95, "close": 100, "volume": 1000000}, + {"open": 1, "high": 105, "low": 95, "close": 100, "volume": 900000}, + ] candidates = await scanner.scan() # Should respect top_n limit (3) assert len(candidates) <= scanner.top_n - @pytest.mark.asyncio - async def test_scan_skips_insufficient_price_history( - self, scanner: SmartVolatilityScanner, mock_broker: MagicMock - ) -> None: - """Test that stocks with insufficient history are skipped.""" - mock_broker.fetch_market_rankings.return_value = [ - { - "stock_code": "005930", - "name": "Samsung", - "price": 70000, - "volume": 5000000, - "change_rate": -5.0, - "volume_increase_rate": 300, - }, - ] - - # Only 5 days of data (need 15+ for RSI) - mock_broker.get_daily_prices.return_value = [ - { - "date": f"2026020{i:02d}", - "open": 70000, - "high": 71000, - "low": 69000, - "close": 70000, - "volume": 2000000, - } - for i in range(5) - ] - - candidates = await scanner.scan() - - # Should skip due to insufficient data - assert len(candidates) == 0 - @pytest.mark.asyncio async def test_get_stock_codes( self, scanner: SmartVolatilityScanner