Compare commits

..

1 Commits

Author SHA1 Message Date
agentson
e7de9aa894 Add scanner diagnostics for zero-candidate trade stalls 2026-02-18 00:11:53 +09:00
13 changed files with 202 additions and 1159 deletions

View File

@@ -0,0 +1,29 @@
# Issue: Realtime 모드에서 거래가 지속적으로 0건
## Summary
`realtime` 실행 중 주문 단계까지 진입하지 못하고, 스캐너 단계에서 후보가 0건으로 반복 종료된다.
## Observed
- 로그에서 반복적으로 `Smart Scanner: No candidates ... — no trades` 출력
- 해외 시장에서 `Overseas ranking endpoint unavailable (404)` 다수 발생
- fallback 심볼 스캔도 `0 candidates`로 종료
- `data/trade_logs.db` 기준 최근 구간에 `BUY/SELL` 없음
## Impact
- 매매 전략 품질과 무관하게 주문 경로가 실행되지 않아 실질 거래 불가
- 장애 원인을 로그만으로 즉시 분해하기 어려움
## Root-Cause Hypothesis
- 스캐너 필터(가격/변동성) 단계에서 대부분 탈락
- 해외 랭킹 API 불가 시 입력 유니버스가 빈 상태가 되어 후보 생성 실패
- 기존 로그는 최종 결과(0 candidates)만 보여 원인별 분해가 어려움
## Acceptance Criteria
- 스캔 1회마다 탈락 사유가 구조화되어 로그에 남아야 함
- 국내/해외(랭킹/폴백) 경로 모두 동일한 진단 지표를 제공해야 함
- 운영자가 로그만 보고 `왜 0 candidates인지`를 즉시 판단 가능해야 함
## Scope
- 이번 이슈는 **진단 가능성 개선(Observability)** 에 한정
- 후보 생성 전략 변경(기본 유니버스 강제 추가 등)은 별도 이슈로 분리

View File

@@ -0,0 +1,32 @@
# PR: Smart Scanner 진단 로그 추가 (0 candidates 원인 분해)
## Linked Issue
- `docs/issues/ISSUE-2026-02-17-no-trades-zero-candidates.md`
## What Changed
- `src/analysis/smart_scanner.py`에 스캔 진단 카운터 추가
- 국내 스캔 진단 로그 추가
- 해외 랭킹 스캔 진단 로그 추가
- 해외 fallback 심볼 스캔 진단 로그 추가
## Diagnostics Keys
- `total_rows`
- `missing_code`
- `invalid_price`
- `low_volatility`
- `connection_error` (해당 경로에서만)
- `unexpected_error` (해당 경로에서만)
- `qualified`
## Expected Log Examples
- `Domestic scan diagnostics: {...}`
- `Overseas ranking scan diagnostics for US_NASDAQ: {...}`
- `Overseas fallback scan diagnostics for US_NYSE: {...}`
## Out of Scope
- 해외 랭킹 404 시 기본 심볼 유니버스 강제 주입
- 국내 경로 fallback 정책 변경
## Validation
- `.venv/bin/python -m py_compile src/analysis/smart_scanner.py`

View File

@@ -165,39 +165,3 @@
**효과:** **효과:**
- 국내/해외 스캐너 기준이 변동성 중심으로 일관화 - 국내/해외 스캐너 기준이 변동성 중심으로 일관화
- 고변동 구간에서 자동 익스포저 축소, 저변동 구간에서 과소진입 완화 - 고변동 구간에서 자동 익스포저 축소, 저변동 구간에서 과소진입 완화
## 2026-02-18
### KIS 해외 랭킹 API 404 에러 수정
**배경:**
- KIS 해외주식 랭킹 API(`fetch_overseas_rankings`)가 모든 거래소에서 HTTP 404를 반환
- Smart Scanner가 해외 시장 후보 종목을 찾지 못해 거래가 전혀 실행되지 않음
**근본 원인:**
- TR_ID, API 경로, 거래소 코드가 모두 KIS 공식 문서와 불일치
**구현 결과:**
- `src/config.py`: TR_ID/Path 기본값을 KIS 공식 스펙으로 수정
- `src/broker/overseas.py`: 랭킹 API 전용 거래소 코드 매핑 추가 (NASD→NAS, NYSE→NYS, AMEX→AMS), 올바른 API 파라미터 사용
- `tests/test_overseas_broker.py`: 19개 단위 테스트 추가
**효과:**
- 해외 시장 랭킹 스캔이 정상 동작하여 Smart Scanner가 후보 종목 탐지 가능
### Gemini prompt_override 미적용 버그 수정
**배경:**
- `run_overnight` 실행 시 모든 시장에서 Playbook 생성 실패 (`JSONDecodeError`)
- defensive playbook으로 폴백되어 모든 종목이 HOLD 처리
**근본 원인:**
- `pre_market_planner.py``market_data["prompt_override"]`에 Playbook 전용 프롬프트를 넣어 `gemini.decide()` 호출
- `gemini_client.py``decide()` 메서드가 `prompt_override` 키를 전혀 확인하지 않고 항상 일반 트레이드 결정 프롬프트 생성
- Gemini가 Playbook JSON 대신 일반 트레이드 결정을 반환하여 파싱 실패
**구현 결과:**
- `src/brain/gemini_client.py`: `decide()` 메서드에서 `prompt_override` 우선 사용 로직 추가
- `tests/test_brain.py`: 3개 테스트 추가 (override 전달, optimization 우회, 미지정 시 기존 동작 유지)
**이슈/PR:** #143

View File

@@ -128,6 +128,16 @@ class SmartVolatilityScanner:
if not fluct_rows: if not fluct_rows:
return [] return []
diagnostics: dict[str, int | float] = {
"total_rows": len(fluct_rows),
"missing_code": 0,
"invalid_price": 0,
"low_volatility": 0,
"connection_error": 0,
"unexpected_error": 0,
"qualified": 0,
}
volume_rank_bonus: dict[str, float] = {} volume_rank_bonus: dict[str, float] = {}
for idx, row in enumerate(volume_rows): for idx, row in enumerate(volume_rows):
code = _extract_stock_code(row) code = _extract_stock_code(row)
@@ -139,6 +149,7 @@ class SmartVolatilityScanner:
for stock in fluct_rows: for stock in fluct_rows:
stock_code = _extract_stock_code(stock) stock_code = _extract_stock_code(stock)
if not stock_code: if not stock_code:
diagnostics["missing_code"] += 1
continue continue
try: try:
@@ -168,7 +179,11 @@ class SmartVolatilityScanner:
volume_ratio = max(volume_ratio, volume / prev_day_volume) volume_ratio = max(volume_ratio, volume / prev_day_volume)
volatility_pct = max(abs(change_rate), intraday_range_pct) volatility_pct = max(abs(change_rate), intraday_range_pct)
if price <= 0 or volatility_pct < 0.8: if price <= 0:
diagnostics["invalid_price"] += 1
continue
if volatility_pct < 0.8:
diagnostics["low_volatility"] += 1
continue continue
volatility_score = min(volatility_pct / 10.0, 1.0) * 85.0 volatility_score = min(volatility_pct / 10.0, 1.0) * 85.0
@@ -189,14 +204,22 @@ class SmartVolatilityScanner:
score=score, score=score,
) )
) )
diagnostics["qualified"] += 1
except ConnectionError as exc: except ConnectionError as exc:
diagnostics["connection_error"] += 1
logger.warning("Failed to analyze %s: %s", stock_code, exc) logger.warning("Failed to analyze %s: %s", stock_code, exc)
continue continue
except Exception as exc: except Exception as exc:
diagnostics["unexpected_error"] += 1
logger.error("Unexpected error analyzing %s: %s", stock_code, exc) logger.error("Unexpected error analyzing %s: %s", stock_code, exc)
continue continue
logger.info(
"Domestic scan diagnostics: %s (volatility_threshold=0.8, top_n=%d)",
diagnostics,
self.top_n,
)
logger.info("Domestic ranking scan found %d candidates", len(candidates)) logger.info("Domestic ranking scan found %d candidates", len(candidates))
candidates.sort(key=lambda c: c.score, reverse=True) candidates.sort(key=lambda c: c.score, reverse=True)
return candidates[: self.top_n] return candidates[: self.top_n]
@@ -242,6 +265,14 @@ class SmartVolatilityScanner:
if not fluct_rows: if not fluct_rows:
return [] return []
diagnostics: dict[str, int | float] = {
"total_rows": len(fluct_rows),
"missing_code": 0,
"invalid_price": 0,
"low_volatility": 0,
"qualified": 0,
}
volume_rank_bonus: dict[str, float] = {} volume_rank_bonus: dict[str, float] = {}
try: try:
volume_rows = await self.overseas_broker.fetch_overseas_rankings( volume_rows = await self.overseas_broker.fetch_overseas_rankings(
@@ -266,6 +297,7 @@ class SmartVolatilityScanner:
for row in fluct_rows: for row in fluct_rows:
stock_code = _extract_stock_code(row) stock_code = _extract_stock_code(row)
if not stock_code: if not stock_code:
diagnostics["missing_code"] += 1
continue continue
price = _extract_last_price(row) price = _extract_last_price(row)
@@ -275,7 +307,11 @@ class SmartVolatilityScanner:
volatility_pct = max(abs(change_rate), intraday_range_pct) volatility_pct = max(abs(change_rate), intraday_range_pct)
# Volatility-first filter (not simple gainers/value ranking). # Volatility-first filter (not simple gainers/value ranking).
if price <= 0 or volatility_pct < 0.8: if price <= 0:
diagnostics["invalid_price"] += 1
continue
if volatility_pct < 0.8:
diagnostics["low_volatility"] += 1
continue continue
volatility_score = min(volatility_pct / 10.0, 1.0) * 85.0 volatility_score = min(volatility_pct / 10.0, 1.0) * 85.0
@@ -295,7 +331,14 @@ class SmartVolatilityScanner:
score=score, score=score,
) )
) )
diagnostics["qualified"] += 1
logger.info(
"Overseas ranking scan diagnostics for %s: %s (volatility_threshold=0.8, top_n=%d)",
market.code,
diagnostics,
self.top_n,
)
if candidates: if candidates:
logger.info( logger.info(
"Overseas ranking scan found %d candidates for %s", "Overseas ranking scan found %d candidates for %s",
@@ -320,6 +363,14 @@ class SmartVolatilityScanner:
len(symbols), len(symbols),
market.name, market.name,
) )
diagnostics: dict[str, int | float] = {
"total_rows": len(symbols),
"invalid_price": 0,
"low_volatility": 0,
"connection_error": 0,
"unexpected_error": 0,
"qualified": 0,
}
candidates: list[ScanCandidate] = [] candidates: list[ScanCandidate] = []
for stock_code in symbols: for stock_code in symbols:
try: try:
@@ -333,7 +384,11 @@ class SmartVolatilityScanner:
intraday_range_pct = _extract_intraday_range_pct(output, price) intraday_range_pct = _extract_intraday_range_pct(output, price)
volatility_pct = max(abs(change_rate), intraday_range_pct) volatility_pct = max(abs(change_rate), intraday_range_pct)
if price <= 0 or volatility_pct < 0.8: if price <= 0:
diagnostics["invalid_price"] += 1
continue
if volatility_pct < 0.8:
diagnostics["low_volatility"] += 1
continue continue
score = min(volatility_pct / 10.0, 1.0) * 100.0 score = min(volatility_pct / 10.0, 1.0) * 100.0
@@ -351,10 +406,19 @@ class SmartVolatilityScanner:
score=score, score=score,
) )
) )
diagnostics["qualified"] += 1
except ConnectionError as exc: except ConnectionError as exc:
diagnostics["connection_error"] += 1
logger.warning("Failed to analyze overseas %s: %s", stock_code, exc) logger.warning("Failed to analyze overseas %s: %s", stock_code, exc)
except Exception as exc: except Exception as exc:
diagnostics["unexpected_error"] += 1
logger.error("Unexpected error analyzing overseas %s: %s", stock_code, exc) logger.error("Unexpected error analyzing overseas %s: %s", stock_code, exc)
logger.info(
"Overseas fallback scan diagnostics for %s: %s (volatility_threshold=0.8, top_n=%d)",
market.code,
diagnostics,
self.top_n,
)
logger.info( logger.info(
"Overseas symbol fallback scan found %d candidates for %s", "Overseas symbol fallback scan found %d candidates for %s",
len(candidates), len(candidates),

View File

@@ -410,10 +410,8 @@ class GeminiClient:
cached=True, cached=True,
) )
# Build prompt (prompt_override takes priority for callers like pre_market_planner) # Build optimized prompt
if "prompt_override" in market_data: if self._enable_optimization:
prompt = market_data["prompt_override"]
elif self._enable_optimization:
prompt = self._optimizer.build_compressed_prompt(market_data) prompt = self._optimizer.build_compressed_prompt(market_data)
else: else:
prompt = await self.build_prompt(market_data, news_sentiment) prompt = await self.build_prompt(market_data, news_sentiment)

View File

@@ -12,24 +12,6 @@ from src.broker.kis_api import KISBroker
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Ranking API uses different exchange codes than order/quote APIs.
_RANKING_EXCHANGE_MAP: dict[str, str] = {
"NASD": "NAS",
"NYSE": "NYS",
"AMEX": "AMS",
"SEHK": "HKS",
"SHAA": "SHS",
"SZAA": "SZS",
"HSX": "HSX",
"HNX": "HNX",
"TSE": "TSE",
}
# Price inquiry API (HHDFS00000300) uses the same short exchange codes as rankings.
# NASD → NAS, NYSE → NYS, AMEX → AMS (confirmed: AMEX returns empty, AMS returns price).
_PRICE_EXCHANGE_MAP: dict[str, str] = _RANKING_EXCHANGE_MAP
class OverseasBroker: class OverseasBroker:
"""KIS Overseas Stock API wrapper that reuses KISBroker infrastructure.""" """KIS Overseas Stock API wrapper that reuses KISBroker infrastructure."""
@@ -62,11 +44,9 @@ class OverseasBroker:
session = self._broker._get_session() session = self._broker._get_session()
headers = await self._broker._auth_headers("HHDFS00000300") headers = await self._broker._auth_headers("HHDFS00000300")
# Map internal exchange codes to the short form expected by the price API.
price_excd = _PRICE_EXCHANGE_MAP.get(exchange_code, exchange_code)
params = { params = {
"AUTH": "", "AUTH": "",
"EXCD": price_excd, "EXCD": exchange_code,
"SYMB": stock_code, "SYMB": stock_code,
} }
url = f"{self._broker._base_url}/uapi/overseas-price/v1/quotations/price" url = f"{self._broker._base_url}/uapi/overseas-price/v1/quotations/price"
@@ -90,7 +70,7 @@ class OverseasBroker:
ranking_type: str = "fluctuation", ranking_type: str = "fluctuation",
limit: int = 30, limit: int = 30,
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
"""Fetch overseas rankings (price change or volume surge). """Fetch overseas rankings (price change or volume amount).
Ranking API specs may differ by account/product. Endpoint paths and Ranking API specs may differ by account/product. Endpoint paths and
TR_IDs are configurable via settings and can be overridden in .env. TR_IDs are configurable via settings and can be overridden in .env.
@@ -101,63 +81,66 @@ class OverseasBroker:
await self._broker._rate_limiter.acquire() await self._broker._rate_limiter.acquire()
session = self._broker._get_session() session = self._broker._get_session()
ranking_excd = _RANKING_EXCHANGE_MAP.get(exchange_code, exchange_code)
if ranking_type == "volume": if ranking_type == "volume":
tr_id = self._broker._settings.OVERSEAS_RANKING_VOLUME_TR_ID configured_tr_id = self._broker._settings.OVERSEAS_RANKING_VOLUME_TR_ID
path = self._broker._settings.OVERSEAS_RANKING_VOLUME_PATH configured_path = self._broker._settings.OVERSEAS_RANKING_VOLUME_PATH
params: dict[str, str] = { default_tr_id = "HHDFS76200200"
"AUTH": "", default_path = "/uapi/overseas-price/v1/quotations/inquire-volume-rank"
"EXCD": ranking_excd,
"MIXN": "0",
"VOL_RANG": "0",
}
else: else:
tr_id = self._broker._settings.OVERSEAS_RANKING_FLUCT_TR_ID configured_tr_id = self._broker._settings.OVERSEAS_RANKING_FLUCT_TR_ID
path = self._broker._settings.OVERSEAS_RANKING_FLUCT_PATH configured_path = self._broker._settings.OVERSEAS_RANKING_FLUCT_PATH
params = { default_tr_id = "HHDFS76200100"
"AUTH": "", default_path = "/uapi/overseas-price/v1/quotations/inquire-updown-rank"
"EXCD": ranking_excd,
"NDAY": "0",
"GUBN": "1",
"VOL_RANG": "0",
}
headers = await self._broker._auth_headers(tr_id) endpoint_specs: list[tuple[str, str]] = [(configured_tr_id, configured_path)]
url = f"{self._broker._base_url}{path}" if (configured_tr_id, configured_path) != (default_tr_id, default_path):
endpoint_specs.append((default_tr_id, default_path))
try: # Try common param variants used by KIS overseas quotation APIs.
async with session.get(url, headers=headers, params=params) as resp: param_variants = [
if resp.status != 200: {"AUTH": "", "EXCD": exchange_code, "NREC": str(max(limit, 30))},
text = await resp.text() {"AUTH": "", "OVRS_EXCG_CD": exchange_code, "NREC": str(max(limit, 30))},
if resp.status == 404: {"AUTH": "", "EXCD": exchange_code},
logger.warning( {"AUTH": "", "OVRS_EXCG_CD": exchange_code},
"Overseas ranking endpoint unavailable (404) for %s/%s; " ]
"using symbol fallback scan",
exchange_code,
ranking_type,
)
return []
raise ConnectionError(
f"fetch_overseas_rankings failed ({resp.status}): {text}"
)
data = await resp.json() last_error: str | None = None
rows = self._extract_ranking_rows(data) saw_http_404 = False
if rows: for tr_id, path in endpoint_specs:
return rows[:limit] headers = await self._broker._auth_headers(tr_id)
url = f"{self._broker._base_url}{path}"
for params in param_variants:
try:
async with session.get(url, headers=headers, params=params) as resp:
text = await resp.text()
if resp.status != 200:
last_error = f"HTTP {resp.status}: {text}"
if resp.status == 404:
saw_http_404 = True
continue
logger.debug( data = await resp.json()
"Overseas ranking returned empty for %s/%s (keys=%s)", rows = self._extract_ranking_rows(data)
exchange_code, if rows:
ranking_type, return rows[:limit]
list(data.keys()),
) # keep trying another param variant if response has no usable rows
return [] last_error = f"empty output (keys={list(data.keys())})"
except (TimeoutError, aiohttp.ClientError) as exc: except (TimeoutError, aiohttp.ClientError) as exc:
raise ConnectionError( last_error = str(exc)
f"Network error fetching overseas rankings: {exc}" continue
) from exc
if saw_http_404:
logger.warning(
"Overseas ranking endpoint unavailable (404) for %s/%s; using symbol fallback scan",
exchange_code,
ranking_type,
)
return []
raise ConnectionError(
f"fetch_overseas_rankings failed for {exchange_code}/{ranking_type}: {last_error}"
)
async def get_overseas_balance(self, exchange_code: str) -> dict[str, Any]: async def get_overseas_balance(self, exchange_code: str) -> dict[str, Any]:
""" """

View File

@@ -55,11 +55,6 @@ class Settings(BaseSettings):
# Trading mode # Trading mode
MODE: str = Field(default="paper", pattern="^(paper|live)$") MODE: str = Field(default="paper", pattern="^(paper|live)$")
# Simulated USD cash for VTS (paper) overseas trading.
# KIS VTS overseas balance API returns errors for most accounts.
# This value is used as a fallback when the balance API returns 0 in paper mode.
PAPER_OVERSEAS_CASH: float = Field(default=50000.0, ge=0.0)
# Trading frequency mode (daily = batch API calls, realtime = per-stock calls) # Trading frequency mode (daily = batch API calls, realtime = per-stock calls)
TRADE_MODE: str = Field(default="daily", pattern="^(daily|realtime)$") TRADE_MODE: str = Field(default="daily", pattern="^(daily|realtime)$")
DAILY_SESSIONS: int = Field(default=4, ge=1, le=10) DAILY_SESSIONS: int = Field(default=4, ge=1, le=10)
@@ -96,13 +91,13 @@ class Settings(BaseSettings):
# Overseas ranking API (KIS endpoint/TR_ID may vary by account/product) # Overseas ranking API (KIS endpoint/TR_ID may vary by account/product)
# Override these from .env if your account uses different specs. # Override these from .env if your account uses different specs.
OVERSEAS_RANKING_ENABLED: bool = True OVERSEAS_RANKING_ENABLED: bool = True
OVERSEAS_RANKING_FLUCT_TR_ID: str = "HHDFS76290000" OVERSEAS_RANKING_FLUCT_TR_ID: str = "HHDFS76200100"
OVERSEAS_RANKING_VOLUME_TR_ID: str = "HHDFS76270000" OVERSEAS_RANKING_VOLUME_TR_ID: str = "HHDFS76200200"
OVERSEAS_RANKING_FLUCT_PATH: str = ( OVERSEAS_RANKING_FLUCT_PATH: str = (
"/uapi/overseas-stock/v1/ranking/updown-rate" "/uapi/overseas-price/v1/quotations/inquire-updown-rank"
) )
OVERSEAS_RANKING_VOLUME_PATH: str = ( OVERSEAS_RANKING_VOLUME_PATH: str = (
"/uapi/overseas-stock/v1/ranking/volume-surge" "/uapi/overseas-price/v1/quotations/inquire-volume-rank"
) )
# Dashboard (optional) # Dashboard (optional)

View File

@@ -239,33 +239,10 @@ async def trading_cycle(
total_cash = safe_float(balance_info.get("frcr_dncl_amt_2", "0") or "0") total_cash = safe_float(balance_info.get("frcr_dncl_amt_2", "0") or "0")
purchase_total = safe_float(balance_info.get("frcr_buy_amt_smtl", "0") or "0") purchase_total = safe_float(balance_info.get("frcr_buy_amt_smtl", "0") or "0")
# VTS (paper trading) overseas balance API often returns 0 or errors.
# Fall back to configured paper cash so BUY orders can be sized.
if total_cash <= 0 and settings and settings.PAPER_OVERSEAS_CASH > 0:
logger.debug(
"Overseas cash balance is 0 for %s; using paper fallback %.2f",
stock_code,
settings.PAPER_OVERSEAS_CASH,
)
total_cash = settings.PAPER_OVERSEAS_CASH
current_price = safe_float(price_data.get("output", {}).get("last", "0")) current_price = safe_float(price_data.get("output", {}).get("last", "0"))
foreigner_net = 0.0 # Not available for overseas foreigner_net = 0.0 # Not available for overseas
price_change_pct = safe_float(price_data.get("output", {}).get("rate", "0")) price_change_pct = safe_float(price_data.get("output", {}).get("rate", "0"))
# Price API may return 0/empty for certain VTS exchange codes.
# Fall back to the scanner candidate's price so order sizing still works.
if current_price <= 0:
market_candidates_lookup = scan_candidates.get(market.code, {})
cand_lookup = market_candidates_lookup.get(stock_code)
if cand_lookup and cand_lookup.price > 0:
current_price = cand_lookup.price
logger.debug(
"Price API returned 0 for %s; using scanner price %.4f",
stock_code,
current_price,
)
# Calculate daily P&L % # Calculate daily P&L %
pnl_pct = ( pnl_pct = (
((total_eval - purchase_total) / purchase_total * 100) ((total_eval - purchase_total) / purchase_total * 100)
@@ -715,16 +692,6 @@ async def run_daily_session(
price_change_pct = safe_float( price_change_pct = safe_float(
price_data.get("output", {}).get("rate", "0") price_data.get("output", {}).get("rate", "0")
) )
# Fall back to scanner candidate price if API returns 0.
if current_price <= 0:
cand_lookup = candidate_map.get(stock_code)
if cand_lookup and cand_lookup.price > 0:
current_price = cand_lookup.price
logger.debug(
"Price API returned 0 for %s; using scanner price %.4f",
stock_code,
current_price,
)
stock_data: dict[str, Any] = { stock_data: dict[str, Any] = {
"stock_code": stock_code, "stock_code": stock_code,
@@ -776,10 +743,6 @@ async def run_daily_session(
balance_info.get("frcr_buy_amt_smtl", "0") or "0" balance_info.get("frcr_buy_amt_smtl", "0") or "0"
) )
# VTS overseas balance API often returns 0; use paper fallback.
if total_cash <= 0 and settings.PAPER_OVERSEAS_CASH > 0:
total_cash = settings.PAPER_OVERSEAS_CASH
# Calculate daily P&L % # Calculate daily P&L %
pnl_pct = ( pnl_pct = (
((total_eval - purchase_total) / purchase_total * 100) ((total_eval - purchase_total) / purchase_total * 100)

View File

@@ -1,8 +1,7 @@
"""Pre-market planner — generates DayPlaybook via Gemini before market open. """Pre-market planner — generates DayPlaybook via Gemini before market open.
One Gemini API call per market per day. Candidates come from SmartVolatilityScanner. One Gemini API call per market per day. Candidates come from SmartVolatilityScanner.
On failure, returns a smart rule-based fallback playbook that uses scanner signals On failure, returns a defensive playbook (all HOLD, no trades).
(momentum/oversold) to generate BUY conditions, avoiding the all-HOLD problem.
""" """
from __future__ import annotations from __future__ import annotations
@@ -135,7 +134,7 @@ class PreMarketPlanner:
except Exception: except Exception:
logger.exception("Playbook generation failed for %s", market) logger.exception("Playbook generation failed for %s", market)
if self._settings.DEFENSIVE_PLAYBOOK_ON_FAILURE: if self._settings.DEFENSIVE_PLAYBOOK_ON_FAILURE:
return self._smart_fallback_playbook(today, market, candidates, self._settings) return self._defensive_playbook(today, market, candidates)
return self._empty_playbook(today, market) return self._empty_playbook(today, market)
def build_cross_market_context( def build_cross_market_context(
@@ -471,99 +470,3 @@ class PreMarketPlanner:
), ),
], ],
) )
@staticmethod
def _smart_fallback_playbook(
today: date,
market: str,
candidates: list[ScanCandidate],
settings: Settings,
) -> DayPlaybook:
"""Rule-based fallback playbook when Gemini is unavailable.
Uses scanner signals (RSI, volume_ratio) to generate meaningful BUY
conditions instead of the all-SELL defensive playbook. Candidates are
already pre-qualified by SmartVolatilityScanner, so we trust their
signals and build actionable scenarios from them.
Scenario logic per candidate:
- momentum signal: BUY when volume_ratio exceeds scanner threshold
- oversold signal: BUY when RSI is below oversold threshold
- always: SELL stop-loss at -3.0% as guard
"""
stock_playbooks = []
for c in candidates:
scenarios: list[StockScenario] = []
if c.signal == "momentum":
scenarios.append(
StockScenario(
condition=StockCondition(
volume_ratio_above=settings.VOL_MULTIPLIER,
),
action=ScenarioAction.BUY,
confidence=80,
allocation_pct=10.0,
stop_loss_pct=-3.0,
take_profit_pct=5.0,
rationale=(
f"Rule-based BUY: momentum signal, "
f"volume={c.volume_ratio:.1f}x (fallback planner)"
),
)
)
elif c.signal == "oversold":
scenarios.append(
StockScenario(
condition=StockCondition(
rsi_below=settings.RSI_OVERSOLD_THRESHOLD,
),
action=ScenarioAction.BUY,
confidence=80,
allocation_pct=10.0,
stop_loss_pct=-3.0,
take_profit_pct=5.0,
rationale=(
f"Rule-based BUY: oversold signal, "
f"RSI={c.rsi:.0f} (fallback planner)"
),
)
)
# Always add stop-loss guard
scenarios.append(
StockScenario(
condition=StockCondition(price_change_pct_below=-3.0),
action=ScenarioAction.SELL,
confidence=90,
stop_loss_pct=-3.0,
rationale="Rule-based stop-loss (fallback planner)",
)
)
stock_playbooks.append(
StockPlaybook(
stock_code=c.stock_code,
scenarios=scenarios,
)
)
logger.info(
"Smart fallback playbook for %s: %d stocks with rule-based BUY/SELL conditions",
market,
len(stock_playbooks),
)
return DayPlaybook(
date=today,
market=market,
market_outlook=MarketOutlook.NEUTRAL,
default_action=ScenarioAction.HOLD,
stock_playbooks=stock_playbooks,
global_rules=[
GlobalRule(
condition="portfolio_pnl_pct < -2.0",
action=ScenarioAction.REDUCE_ALL,
rationale="Defensive: reduce on loss threshold",
),
],
)

View File

@@ -2,10 +2,6 @@
from __future__ import annotations from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from src.brain.gemini_client import GeminiClient from src.brain.gemini_client import GeminiClient
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -274,97 +270,3 @@ class TestBatchDecisionParsing:
assert decisions["AAPL"].action == "HOLD" assert decisions["AAPL"].action == "HOLD"
assert decisions["AAPL"].confidence == 0 assert decisions["AAPL"].confidence == 0
# ---------------------------------------------------------------------------
# Prompt Override (used by pre_market_planner)
# ---------------------------------------------------------------------------
class TestPromptOverride:
"""decide() must use prompt_override when present in market_data."""
@pytest.mark.asyncio
async def test_prompt_override_is_sent_to_gemini(self, settings):
"""When prompt_override is in market_data, it should be used as the prompt."""
client = GeminiClient(settings)
custom_prompt = "You are a playbook generator. Return JSON with scenarios."
mock_response = MagicMock()
mock_response.text = '{"action": "HOLD", "confidence": 50, "rationale": "test"}'
with patch.object(
client._client.aio.models,
"generate_content",
new_callable=AsyncMock,
return_value=mock_response,
) as mock_generate:
market_data = {
"stock_code": "PLANNER",
"current_price": 0,
"prompt_override": custom_prompt,
}
await client.decide(market_data)
# Verify the custom prompt was sent, not a built prompt
mock_generate.assert_called_once()
actual_prompt = mock_generate.call_args[1].get(
"contents", mock_generate.call_args[0][1] if len(mock_generate.call_args[0]) > 1 else None
)
assert actual_prompt == custom_prompt
@pytest.mark.asyncio
async def test_prompt_override_skips_optimization(self, settings):
"""prompt_override should bypass prompt optimization."""
client = GeminiClient(settings)
client._enable_optimization = True
custom_prompt = "Custom playbook prompt"
mock_response = MagicMock()
mock_response.text = '{"action": "HOLD", "confidence": 50, "rationale": "ok"}'
with patch.object(
client._client.aio.models,
"generate_content",
new_callable=AsyncMock,
return_value=mock_response,
) as mock_generate:
market_data = {
"stock_code": "PLANNER",
"current_price": 0,
"prompt_override": custom_prompt,
}
await client.decide(market_data)
actual_prompt = mock_generate.call_args[1].get(
"contents", mock_generate.call_args[0][1] if len(mock_generate.call_args[0]) > 1 else None
)
assert actual_prompt == custom_prompt
@pytest.mark.asyncio
async def test_without_prompt_override_uses_build_prompt(self, settings):
"""Without prompt_override, decide() should use build_prompt as before."""
client = GeminiClient(settings)
mock_response = MagicMock()
mock_response.text = '{"action": "HOLD", "confidence": 50, "rationale": "ok"}'
with patch.object(
client._client.aio.models,
"generate_content",
new_callable=AsyncMock,
return_value=mock_response,
) as mock_generate:
market_data = {
"stock_code": "005930",
"current_price": 72000,
}
await client.decide(market_data)
actual_prompt = mock_generate.call_args[1].get(
"contents", mock_generate.call_args[0][1] if len(mock_generate.call_args[0]) > 1 else None
)
# Should contain stock code from build_prompt, not be a custom override
assert "005930" in actual_prompt

View File

@@ -90,12 +90,12 @@ class TestTokenManagement:
await broker.close() await broker.close()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_token_refresh_cooldown_waits_then_retries(self, settings): async def test_token_refresh_cooldown_prevents_rapid_retries(self, settings):
"""Token refresh should wait out cooldown then retry (issue #54).""" """Token refresh should enforce cooldown after failure (issue #54)."""
broker = KISBroker(settings) broker = KISBroker(settings)
broker._refresh_cooldown = 0.1 # Short cooldown for testing broker._refresh_cooldown = 2.0 # Short cooldown for testing
# All attempts fail with 403 (EGW00133) # First refresh attempt fails with 403 (EGW00133)
mock_resp_403 = AsyncMock() mock_resp_403 = AsyncMock()
mock_resp_403.status = 403 mock_resp_403.status = 403
mock_resp_403.text = AsyncMock( mock_resp_403.text = AsyncMock(
@@ -109,8 +109,8 @@ class TestTokenManagement:
with pytest.raises(ConnectionError, match="Token refresh failed"): with pytest.raises(ConnectionError, match="Token refresh failed"):
await broker._ensure_token() await broker._ensure_token()
# Second attempt within cooldown should wait then retry (and still get 403) # Second attempt within cooldown should fail with cooldown error
with pytest.raises(ConnectionError, match="Token refresh failed"): with pytest.raises(ConnectionError, match="Token refresh on cooldown"):
await broker._ensure_token() await broker._ensure_token()
await broker.close() await broker.close()

View File

@@ -1,617 +0,0 @@
"""Tests for OverseasBroker — rankings, price, balance, order, and helpers."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import aiohttp
import pytest
from src.broker.kis_api import KISBroker
from src.broker.overseas import OverseasBroker, _PRICE_EXCHANGE_MAP, _RANKING_EXCHANGE_MAP
from src.config import Settings
def _make_async_cm(mock_resp: AsyncMock) -> MagicMock:
"""Create an async context manager that returns mock_resp on __aenter__."""
cm = MagicMock()
cm.__aenter__ = AsyncMock(return_value=mock_resp)
cm.__aexit__ = AsyncMock(return_value=False)
return cm
@pytest.fixture
def mock_settings() -> Settings:
"""Provide mock settings with correct default TR_IDs/paths."""
return Settings(
KIS_APP_KEY="test_key",
KIS_APP_SECRET="test_secret",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test_gemini_key",
)
@pytest.fixture
def mock_broker(mock_settings: Settings) -> KISBroker:
"""Provide a mock KIS broker."""
broker = KISBroker(mock_settings)
broker.get_orderbook = AsyncMock() # type: ignore[method-assign]
return broker
@pytest.fixture
def overseas_broker(mock_broker: KISBroker) -> OverseasBroker:
"""Provide an OverseasBroker wrapping a mock KISBroker."""
return OverseasBroker(mock_broker)
def _setup_broker_mocks(overseas_broker: OverseasBroker, mock_session: MagicMock) -> None:
"""Wire up common broker mocks."""
overseas_broker._broker._rate_limiter.acquire = AsyncMock()
overseas_broker._broker._get_session = MagicMock(return_value=mock_session)
overseas_broker._broker._auth_headers = AsyncMock(return_value={})
class TestRankingExchangeMap:
"""Test exchange code mapping for ranking API."""
def test_nasd_maps_to_nas(self) -> None:
assert _RANKING_EXCHANGE_MAP["NASD"] == "NAS"
def test_nyse_maps_to_nys(self) -> None:
assert _RANKING_EXCHANGE_MAP["NYSE"] == "NYS"
def test_amex_maps_to_ams(self) -> None:
assert _RANKING_EXCHANGE_MAP["AMEX"] == "AMS"
def test_sehk_maps_to_hks(self) -> None:
assert _RANKING_EXCHANGE_MAP["SEHK"] == "HKS"
def test_unmapped_exchange_passes_through(self) -> None:
assert _RANKING_EXCHANGE_MAP.get("UNKNOWN", "UNKNOWN") == "UNKNOWN"
def test_tse_unchanged(self) -> None:
assert _RANKING_EXCHANGE_MAP["TSE"] == "TSE"
class TestConfigDefaults:
"""Test that config defaults match KIS official API specs."""
def test_fluct_tr_id(self, mock_settings: Settings) -> None:
assert mock_settings.OVERSEAS_RANKING_FLUCT_TR_ID == "HHDFS76290000"
def test_volume_tr_id(self, mock_settings: Settings) -> None:
assert mock_settings.OVERSEAS_RANKING_VOLUME_TR_ID == "HHDFS76270000"
def test_fluct_path(self, mock_settings: Settings) -> None:
assert mock_settings.OVERSEAS_RANKING_FLUCT_PATH == "/uapi/overseas-stock/v1/ranking/updown-rate"
def test_volume_path(self, mock_settings: Settings) -> None:
assert mock_settings.OVERSEAS_RANKING_VOLUME_PATH == "/uapi/overseas-stock/v1/ranking/volume-surge"
class TestFetchOverseasRankings:
"""Test fetch_overseas_rankings method."""
@pytest.mark.asyncio
async def test_fluctuation_uses_correct_params(
self, overseas_broker: OverseasBroker
) -> None:
"""Fluctuation ranking should use HHDFS76290000, updown-rate path, and correct params."""
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(
return_value={"output": [{"symb": "AAPL", "name": "Apple"}]}
)
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
overseas_broker._broker._auth_headers = AsyncMock(
return_value={"authorization": "Bearer test"}
)
result = await overseas_broker.fetch_overseas_rankings("NASD", "fluctuation")
assert len(result) == 1
assert result[0]["symb"] == "AAPL"
call_args = mock_session.get.call_args
url = call_args[0][0]
params = call_args[1]["params"]
assert "/uapi/overseas-stock/v1/ranking/updown-rate" in url
assert params["EXCD"] == "NAS"
assert params["NDAY"] == "0"
assert params["GUBN"] == "1"
assert params["VOL_RANG"] == "0"
overseas_broker._broker._auth_headers.assert_called_with("HHDFS76290000")
@pytest.mark.asyncio
async def test_volume_uses_correct_params(
self, overseas_broker: OverseasBroker
) -> None:
"""Volume ranking should use HHDFS76270000, volume-surge path, and correct params."""
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(
return_value={"output": [{"symb": "TSLA", "name": "Tesla"}]}
)
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
overseas_broker._broker._auth_headers = AsyncMock(
return_value={"authorization": "Bearer test"}
)
result = await overseas_broker.fetch_overseas_rankings("NYSE", "volume")
assert len(result) == 1
call_args = mock_session.get.call_args
url = call_args[0][0]
params = call_args[1]["params"]
assert "/uapi/overseas-stock/v1/ranking/volume-surge" in url
assert params["EXCD"] == "NYS"
assert params["MIXN"] == "0"
assert params["VOL_RANG"] == "0"
assert "NDAY" not in params
assert "GUBN" not in params
overseas_broker._broker._auth_headers.assert_called_with("HHDFS76270000")
@pytest.mark.asyncio
async def test_404_returns_empty_list(
self, overseas_broker: OverseasBroker
) -> None:
"""HTTP 404 should return empty list (fallback) instead of raising."""
mock_resp = AsyncMock()
mock_resp.status = 404
mock_resp.text = AsyncMock(return_value="Not Found")
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
result = await overseas_broker.fetch_overseas_rankings("AMEX", "fluctuation")
assert result == []
@pytest.mark.asyncio
async def test_non_404_error_raises(
self, overseas_broker: OverseasBroker
) -> None:
"""Non-404 HTTP errors should raise ConnectionError."""
mock_resp = AsyncMock()
mock_resp.status = 500
mock_resp.text = AsyncMock(return_value="Internal Server Error")
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
with pytest.raises(ConnectionError, match="500"):
await overseas_broker.fetch_overseas_rankings("NASD")
@pytest.mark.asyncio
async def test_empty_response_returns_empty(
self, overseas_broker: OverseasBroker
) -> None:
"""Empty output in response should return empty list."""
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"output": []})
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
result = await overseas_broker.fetch_overseas_rankings("NASD")
assert result == []
@pytest.mark.asyncio
async def test_ranking_disabled_returns_empty(
self, overseas_broker: OverseasBroker
) -> None:
"""When OVERSEAS_RANKING_ENABLED=False, should return empty immediately."""
overseas_broker._broker._settings.OVERSEAS_RANKING_ENABLED = False
result = await overseas_broker.fetch_overseas_rankings("NASD")
assert result == []
@pytest.mark.asyncio
async def test_limit_truncates_results(
self, overseas_broker: OverseasBroker
) -> None:
"""Results should be truncated to the specified limit."""
rows = [{"symb": f"SYM{i}"} for i in range(20)]
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"output": rows})
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
result = await overseas_broker.fetch_overseas_rankings("NASD", limit=5)
assert len(result) == 5
@pytest.mark.asyncio
async def test_network_error_raises(
self, overseas_broker: OverseasBroker
) -> None:
"""Network errors should raise ConnectionError."""
cm = MagicMock()
cm.__aenter__ = AsyncMock(side_effect=aiohttp.ClientError("timeout"))
cm.__aexit__ = AsyncMock(return_value=False)
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=cm)
_setup_broker_mocks(overseas_broker, mock_session)
with pytest.raises(ConnectionError, match="Network error"):
await overseas_broker.fetch_overseas_rankings("NASD")
@pytest.mark.asyncio
async def test_exchange_code_mapping_applied(
self, overseas_broker: OverseasBroker
) -> None:
"""All major exchanges should use mapped codes in API params."""
for original, mapped in [("NASD", "NAS"), ("NYSE", "NYS"), ("AMEX", "AMS")]:
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"output": [{"symb": "X"}]})
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
await overseas_broker.fetch_overseas_rankings(original)
call_params = mock_session.get.call_args[1]["params"]
assert call_params["EXCD"] == mapped, f"{original} should map to {mapped}"
class TestGetOverseasPrice:
"""Test get_overseas_price method."""
@pytest.mark.asyncio
async def test_success(self, overseas_broker: OverseasBroker) -> None:
"""Successful price fetch returns JSON data."""
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"output": {"last": "150.00"}})
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
overseas_broker._broker._auth_headers = AsyncMock(return_value={"authorization": "Bearer t"})
result = await overseas_broker.get_overseas_price("NASD", "AAPL")
assert result["output"]["last"] == "150.00"
call_args = mock_session.get.call_args
params = call_args[1]["params"]
# NASD is mapped to NAS for the price inquiry API (same as ranking API).
assert params["EXCD"] == "NAS"
assert params["SYMB"] == "AAPL"
@pytest.mark.asyncio
async def test_http_error_raises(self, overseas_broker: OverseasBroker) -> None:
"""Non-200 response should raise ConnectionError."""
mock_resp = AsyncMock()
mock_resp.status = 400
mock_resp.text = AsyncMock(return_value="Bad Request")
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
with pytest.raises(ConnectionError, match="get_overseas_price failed"):
await overseas_broker.get_overseas_price("NASD", "AAPL")
@pytest.mark.asyncio
async def test_network_error_raises(self, overseas_broker: OverseasBroker) -> None:
"""Network error should raise ConnectionError."""
cm = MagicMock()
cm.__aenter__ = AsyncMock(side_effect=aiohttp.ClientError("conn refused"))
cm.__aexit__ = AsyncMock(return_value=False)
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=cm)
_setup_broker_mocks(overseas_broker, mock_session)
with pytest.raises(ConnectionError, match="Network error"):
await overseas_broker.get_overseas_price("NASD", "AAPL")
class TestGetOverseasBalance:
"""Test get_overseas_balance method."""
@pytest.mark.asyncio
async def test_success(self, overseas_broker: OverseasBroker) -> None:
"""Successful balance fetch returns JSON data."""
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"output1": [{"pdno": "AAPL"}]})
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
result = await overseas_broker.get_overseas_balance("NASD")
assert result["output1"][0]["pdno"] == "AAPL"
@pytest.mark.asyncio
async def test_http_error_raises(self, overseas_broker: OverseasBroker) -> None:
"""Non-200 should raise ConnectionError."""
mock_resp = AsyncMock()
mock_resp.status = 500
mock_resp.text = AsyncMock(return_value="Server Error")
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
with pytest.raises(ConnectionError, match="get_overseas_balance failed"):
await overseas_broker.get_overseas_balance("NASD")
@pytest.mark.asyncio
async def test_network_error_raises(self, overseas_broker: OverseasBroker) -> None:
"""Network error should raise ConnectionError."""
cm = MagicMock()
cm.__aenter__ = AsyncMock(side_effect=TimeoutError("timeout"))
cm.__aexit__ = AsyncMock(return_value=False)
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=cm)
_setup_broker_mocks(overseas_broker, mock_session)
with pytest.raises(ConnectionError, match="Network error"):
await overseas_broker.get_overseas_balance("NYSE")
class TestSendOverseasOrder:
"""Test send_overseas_order method."""
@pytest.mark.asyncio
async def test_buy_market_order(self, overseas_broker: OverseasBroker) -> None:
"""Market buy order should use VTTT1002U and ORD_DVSN=01."""
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"rt_cd": "0"})
mock_session = MagicMock()
mock_session.post = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
overseas_broker._broker._get_hash_key = AsyncMock(return_value="hashval")
result = await overseas_broker.send_overseas_order("NASD", "AAPL", "BUY", 10)
assert result["rt_cd"] == "0"
# Verify BUY TR_ID
overseas_broker._broker._auth_headers.assert_called_with("VTTT1002U")
call_args = mock_session.post.call_args
body = call_args[1]["json"]
assert body["ORD_DVSN"] == "01" # market order
assert body["OVRS_ORD_UNPR"] == "0"
@pytest.mark.asyncio
async def test_sell_limit_order(self, overseas_broker: OverseasBroker) -> None:
"""Limit sell order should use VTTT1006U and ORD_DVSN=00."""
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"rt_cd": "0"})
mock_session = MagicMock()
mock_session.post = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
overseas_broker._broker._get_hash_key = AsyncMock(return_value="hashval")
result = await overseas_broker.send_overseas_order("NYSE", "MSFT", "SELL", 5, price=350.0)
assert result["rt_cd"] == "0"
overseas_broker._broker._auth_headers.assert_called_with("VTTT1006U")
call_args = mock_session.post.call_args
body = call_args[1]["json"]
assert body["ORD_DVSN"] == "00" # limit order
assert body["OVRS_ORD_UNPR"] == "350.0"
@pytest.mark.asyncio
async def test_order_http_error_raises(self, overseas_broker: OverseasBroker) -> None:
"""Non-200 should raise ConnectionError."""
mock_resp = AsyncMock()
mock_resp.status = 400
mock_resp.text = AsyncMock(return_value="Bad Request")
mock_session = MagicMock()
mock_session.post = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
overseas_broker._broker._get_hash_key = AsyncMock(return_value="hashval")
with pytest.raises(ConnectionError, match="send_overseas_order failed"):
await overseas_broker.send_overseas_order("NASD", "AAPL", "BUY", 1)
@pytest.mark.asyncio
async def test_order_network_error_raises(self, overseas_broker: OverseasBroker) -> None:
"""Network error should raise ConnectionError."""
cm = MagicMock()
cm.__aenter__ = AsyncMock(side_effect=aiohttp.ClientError("conn reset"))
cm.__aexit__ = AsyncMock(return_value=False)
mock_session = MagicMock()
mock_session.post = MagicMock(return_value=cm)
_setup_broker_mocks(overseas_broker, mock_session)
overseas_broker._broker._get_hash_key = AsyncMock(return_value="hashval")
with pytest.raises(ConnectionError, match="Network error"):
await overseas_broker.send_overseas_order("NASD", "TSLA", "SELL", 2)
class TestGetCurrencyCode:
"""Test _get_currency_code mapping."""
def test_us_exchanges(self, overseas_broker: OverseasBroker) -> None:
assert overseas_broker._get_currency_code("NASD") == "USD"
assert overseas_broker._get_currency_code("NYSE") == "USD"
assert overseas_broker._get_currency_code("AMEX") == "USD"
def test_japan(self, overseas_broker: OverseasBroker) -> None:
assert overseas_broker._get_currency_code("TSE") == "JPY"
def test_hong_kong(self, overseas_broker: OverseasBroker) -> None:
assert overseas_broker._get_currency_code("SEHK") == "HKD"
def test_china(self, overseas_broker: OverseasBroker) -> None:
assert overseas_broker._get_currency_code("SHAA") == "CNY"
assert overseas_broker._get_currency_code("SZAA") == "CNY"
def test_vietnam(self, overseas_broker: OverseasBroker) -> None:
assert overseas_broker._get_currency_code("HNX") == "VND"
assert overseas_broker._get_currency_code("HSX") == "VND"
def test_unknown_defaults_usd(self, overseas_broker: OverseasBroker) -> None:
assert overseas_broker._get_currency_code("UNKNOWN") == "USD"
class TestExtractRankingRows:
"""Test _extract_ranking_rows helper."""
def test_output_key(self, overseas_broker: OverseasBroker) -> None:
data = {"output": [{"a": 1}, {"b": 2}]}
assert overseas_broker._extract_ranking_rows(data) == [{"a": 1}, {"b": 2}]
def test_output1_key(self, overseas_broker: OverseasBroker) -> None:
data = {"output1": [{"c": 3}]}
assert overseas_broker._extract_ranking_rows(data) == [{"c": 3}]
def test_output2_key(self, overseas_broker: OverseasBroker) -> None:
data = {"output2": [{"d": 4}]}
assert overseas_broker._extract_ranking_rows(data) == [{"d": 4}]
def test_no_list_returns_empty(self, overseas_broker: OverseasBroker) -> None:
data = {"output": "not a list"}
assert overseas_broker._extract_ranking_rows(data) == []
def test_empty_data(self, overseas_broker: OverseasBroker) -> None:
assert overseas_broker._extract_ranking_rows({}) == []
def test_filters_non_dict_rows(self, overseas_broker: OverseasBroker) -> None:
data = {"output": [{"a": 1}, "invalid", {"b": 2}]}
assert overseas_broker._extract_ranking_rows(data) == [{"a": 1}, {"b": 2}]
# ---------------------------------------------------------------------------
# Price exchange code mapping
# ---------------------------------------------------------------------------
class TestPriceExchangeMap:
"""Test that get_overseas_price uses the short exchange codes."""
def test_price_map_equals_ranking_map(self) -> None:
assert _PRICE_EXCHANGE_MAP is _RANKING_EXCHANGE_MAP
def test_nasd_maps_to_nas(self) -> None:
assert _PRICE_EXCHANGE_MAP["NASD"] == "NAS"
def test_amex_maps_to_ams(self) -> None:
assert _PRICE_EXCHANGE_MAP["AMEX"] == "AMS"
def test_nyse_maps_to_nys(self) -> None:
assert _PRICE_EXCHANGE_MAP["NYSE"] == "NYS"
@pytest.mark.asyncio
async def test_get_overseas_price_uses_mapped_excd(
self, overseas_broker: OverseasBroker
) -> None:
"""AMEX should be sent as AMS to the price API."""
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"output": {"last": "44.30"}})
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
overseas_broker._broker._auth_headers = AsyncMock(return_value={})
await overseas_broker.get_overseas_price("AMEX", "EWUS")
params = mock_session.get.call_args[1]["params"]
assert params["EXCD"] == "AMS" # mapped, not raw "AMEX"
assert params["SYMB"] == "EWUS"
@pytest.mark.asyncio
async def test_get_overseas_price_nasd_uses_nas(
self, overseas_broker: OverseasBroker
) -> None:
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"output": {"last": "220.00"}})
mock_session = MagicMock()
mock_session.get = MagicMock(return_value=_make_async_cm(mock_resp))
_setup_broker_mocks(overseas_broker, mock_session)
overseas_broker._broker._auth_headers = AsyncMock(return_value={})
await overseas_broker.get_overseas_price("NASD", "AAPL")
params = mock_session.get.call_args[1]["params"]
assert params["EXCD"] == "NAS"
# ---------------------------------------------------------------------------
# PAPER_OVERSEAS_CASH config default
# ---------------------------------------------------------------------------
class TestPaperOverseasCash:
def test_default_value(self) -> None:
settings = Settings(
KIS_APP_KEY="x",
KIS_APP_SECRET="x",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="x",
)
assert settings.PAPER_OVERSEAS_CASH == 50000.0
def test_can_be_set_via_env(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("PAPER_OVERSEAS_CASH", "100000.0")
settings = Settings(
KIS_APP_KEY="x",
KIS_APP_SECRET="x",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="x",
)
assert settings.PAPER_OVERSEAS_CASH == 100000.0
def test_zero_disables_fallback(self) -> None:
settings = Settings(
KIS_APP_KEY="x",
KIS_APP_SECRET="x",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="x",
PAPER_OVERSEAS_CASH=0.0,
)
assert settings.PAPER_OVERSEAS_CASH == 0.0

View File

@@ -164,23 +164,18 @@ class TestGeneratePlaybook:
assert pb.market_outlook == MarketOutlook.NEUTRAL assert pb.market_outlook == MarketOutlook.NEUTRAL
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_gemini_failure_returns_smart_fallback(self) -> None: async def test_gemini_failure_returns_defensive(self) -> None:
planner = _make_planner() planner = _make_planner()
planner._gemini.decide = AsyncMock(side_effect=RuntimeError("API timeout")) planner._gemini.decide = AsyncMock(side_effect=RuntimeError("API timeout"))
# oversold candidate (signal="oversold", rsi=28.5)
candidates = [_candidate()] candidates = [_candidate()]
pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8)) pb = await planner.generate_playbook("KR", candidates, today=date(2026, 2, 8))
assert pb.default_action == ScenarioAction.HOLD assert pb.default_action == ScenarioAction.HOLD
# Smart fallback uses NEUTRAL outlook (not NEUTRAL_TO_BEARISH) assert pb.market_outlook == MarketOutlook.NEUTRAL_TO_BEARISH
assert pb.market_outlook == MarketOutlook.NEUTRAL
assert pb.stock_count == 1 assert pb.stock_count == 1
# Oversold candidate → first scenario is BUY, second is SELL stop-loss # Defensive playbook has stop-loss scenarios
scenarios = pb.stock_playbooks[0].scenarios assert pb.stock_playbooks[0].scenarios[0].action == ScenarioAction.SELL
assert scenarios[0].action == ScenarioAction.BUY
assert scenarios[0].condition.rsi_below == 30
assert scenarios[1].action == ScenarioAction.SELL
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_gemini_failure_empty_when_defensive_disabled(self) -> None: async def test_gemini_failure_empty_when_defensive_disabled(self) -> None:
@@ -662,171 +657,3 @@ class TestDefensivePlaybook:
assert pb.stock_count == 0 assert pb.stock_count == 0
assert pb.market == "US" assert pb.market == "US"
assert pb.market_outlook == MarketOutlook.NEUTRAL assert pb.market_outlook == MarketOutlook.NEUTRAL
# ---------------------------------------------------------------------------
# Smart fallback playbook
# ---------------------------------------------------------------------------
class TestSmartFallbackPlaybook:
"""Tests for _smart_fallback_playbook — rule-based BUY/SELL on Gemini failure."""
def _make_settings(self) -> Settings:
return Settings(
KIS_APP_KEY="test",
KIS_APP_SECRET="test",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test",
RSI_OVERSOLD_THRESHOLD=30,
VOL_MULTIPLIER=2.0,
)
def test_momentum_candidate_gets_buy_on_volume(self) -> None:
candidates = [
_candidate(code="CHOW", signal="momentum", volume_ratio=13.64, rsi=100.0)
]
settings = self._make_settings()
pb = PreMarketPlanner._smart_fallback_playbook(
date(2026, 2, 17), "US_AMEX", candidates, settings
)
assert pb.stock_count == 1
sp = pb.stock_playbooks[0]
assert sp.stock_code == "CHOW"
# First scenario: BUY with volume_ratio_above
buy_sc = sp.scenarios[0]
assert buy_sc.action == ScenarioAction.BUY
assert buy_sc.condition.volume_ratio_above == 2.0
assert buy_sc.condition.rsi_below is None
assert buy_sc.confidence == 80
# Second scenario: stop-loss SELL
sell_sc = sp.scenarios[1]
assert sell_sc.action == ScenarioAction.SELL
assert sell_sc.condition.price_change_pct_below == -3.0
def test_oversold_candidate_gets_buy_on_rsi(self) -> None:
candidates = [
_candidate(code="005930", signal="oversold", rsi=22.0, volume_ratio=3.5)
]
settings = self._make_settings()
pb = PreMarketPlanner._smart_fallback_playbook(
date(2026, 2, 17), "KR", candidates, settings
)
sp = pb.stock_playbooks[0]
buy_sc = sp.scenarios[0]
assert buy_sc.action == ScenarioAction.BUY
assert buy_sc.condition.rsi_below == 30
assert buy_sc.condition.volume_ratio_above is None
def test_all_candidates_have_stop_loss_sell(self) -> None:
candidates = [
_candidate(code="AAA", signal="momentum", volume_ratio=5.0),
_candidate(code="BBB", signal="oversold", rsi=25.0),
]
settings = self._make_settings()
pb = PreMarketPlanner._smart_fallback_playbook(
date(2026, 2, 17), "US_NASDAQ", candidates, settings
)
assert pb.stock_count == 2
for sp in pb.stock_playbooks:
sell_scenarios = [s for s in sp.scenarios if s.action == ScenarioAction.SELL]
assert len(sell_scenarios) == 1
assert sell_scenarios[0].condition.price_change_pct_below == -3.0
assert sell_scenarios[0].condition.price_change_pct_below == -3.0
def test_market_outlook_is_neutral(self) -> None:
candidates = [_candidate(signal="momentum", volume_ratio=5.0)]
settings = self._make_settings()
pb = PreMarketPlanner._smart_fallback_playbook(
date(2026, 2, 17), "US_AMEX", candidates, settings
)
assert pb.market_outlook == MarketOutlook.NEUTRAL
def test_default_action_is_hold(self) -> None:
candidates = [_candidate(signal="momentum", volume_ratio=5.0)]
settings = self._make_settings()
pb = PreMarketPlanner._smart_fallback_playbook(
date(2026, 2, 17), "US_AMEX", candidates, settings
)
assert pb.default_action == ScenarioAction.HOLD
def test_has_global_reduce_all_rule(self) -> None:
candidates = [_candidate(signal="momentum", volume_ratio=5.0)]
settings = self._make_settings()
pb = PreMarketPlanner._smart_fallback_playbook(
date(2026, 2, 17), "US_AMEX", candidates, settings
)
assert len(pb.global_rules) == 1
rule = pb.global_rules[0]
assert rule.action == ScenarioAction.REDUCE_ALL
assert "portfolio_pnl_pct" in rule.condition
def test_empty_candidates_returns_empty_playbook(self) -> None:
settings = self._make_settings()
pb = PreMarketPlanner._smart_fallback_playbook(
date(2026, 2, 17), "US_AMEX", [], settings
)
assert pb.stock_count == 0
def test_vol_multiplier_applied_from_settings(self) -> None:
"""VOL_MULTIPLIER=3.0 should set volume_ratio_above=3.0 for momentum."""
candidates = [_candidate(signal="momentum", volume_ratio=5.0)]
settings = self._make_settings()
settings = settings.model_copy(update={"VOL_MULTIPLIER": 3.0})
pb = PreMarketPlanner._smart_fallback_playbook(
date(2026, 2, 17), "US_AMEX", candidates, settings
)
buy_sc = pb.stock_playbooks[0].scenarios[0]
assert buy_sc.condition.volume_ratio_above == 3.0
def test_rsi_oversold_threshold_applied_from_settings(self) -> None:
"""RSI_OVERSOLD_THRESHOLD=25 should set rsi_below=25 for oversold."""
candidates = [_candidate(signal="oversold", rsi=22.0)]
settings = self._make_settings()
settings = settings.model_copy(update={"RSI_OVERSOLD_THRESHOLD": 25})
pb = PreMarketPlanner._smart_fallback_playbook(
date(2026, 2, 17), "KR", candidates, settings
)
buy_sc = pb.stock_playbooks[0].scenarios[0]
assert buy_sc.condition.rsi_below == 25
@pytest.mark.asyncio
async def test_generate_playbook_uses_smart_fallback_on_gemini_error(self) -> None:
"""generate_playbook() should use smart fallback (not defensive) on API failure."""
planner = _make_planner()
planner._gemini.decide = AsyncMock(side_effect=ConnectionError("429 quota exceeded"))
# momentum candidate
candidates = [
_candidate(code="CHOW", signal="momentum", volume_ratio=13.64, rsi=100.0)
]
pb = await planner.generate_playbook(
"US_AMEX", candidates, today=date(2026, 2, 18)
)
# Should NOT be all-SELL defensive; should have BUY for momentum
assert pb.stock_count == 1
buy_scenarios = [
s for s in pb.stock_playbooks[0].scenarios
if s.action == ScenarioAction.BUY
]
assert len(buy_scenarios) == 1
assert buy_scenarios[0].condition.volume_ratio_above == 2.0 # VOL_MULTIPLIER default