Compare commits

..

1 Commits

Author SHA1 Message Date
agentson
f7d33e69d1 docs: 실전 전환 체크리스트 작성 (issue #218)
Some checks failed
CI / test (pull_request) Has been cancelled
docs/live-trading-checklist.md 신규 작성:
- 사전 조건: KIS 실전 계좌/OpenAPI 신청, 리스크 파라미터 검토
- 환경 설정: .env 수정 가이드, TR_ID 분기표 (모의/실전)
- 최종 확인: DB 백업, 실행 명령, 시작 직후 점검
- 비상 정지: Ctrl+C / /stop 명령 / CB 발동
- 롤백 절차: MODE=paper 복원

CLAUDE.md: 문서 목록에 체크리스트 링크 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-23 12:55:37 +09:00
12 changed files with 139 additions and 1299 deletions

View File

@@ -94,6 +94,7 @@ Smart Scanner runs in `TRADE_MODE=realtime` only. Daily mode uses static watchli
- **[Testing](docs/testing.md)** — Test structure, coverage requirements, writing tests
- **[Agent Policies](docs/agents.md)** — Prime directives, constraints, prohibited actions
- **[Requirements Log](docs/requirements-log.md)** — User requirements and feedback tracking
- **[Live Trading Checklist](docs/live-trading-checklist.md)** — 모의→실전 전환 체크리스트
## Core Principles

View File

@@ -0,0 +1,131 @@
# 실전 전환 체크리스트
모의 거래(paper)에서 실전(live)으로 전환하기 전에 아래 항목을 **순서대로** 모두 확인하세요.
---
## 1. 사전 조건
### 1-1. KIS OpenAPI 실전 계좌 준비
- [ ] 한국투자증권 계좌 개설 완료 (일반 위탁 계좌)
- [ ] OpenAPI 실전 사용 신청 (KIS 홈페이지 → Open API → 서비스 신청)
- [ ] 실전용 APP_KEY / APP_SECRET 발급 완료
- [ ] KIS_ACCOUNT_NO 형식 확인: `XXXXXXXX-XX` (8자리-2자리)
### 1-2. 리스크 파라미터 검토
- [ ] `CIRCUIT_BREAKER_PCT` 확인: 기본값 -3.0% (더 엄격하게 조정 권장)
- [ ] `FAT_FINGER_PCT` 확인: 기본값 30.0% (1회 주문 최대 잔고 대비 %)
- [ ] `CONFIDENCE_THRESHOLD` 확인: BEARISH ≥ 90, NEUTRAL ≥ 80, BULLISH ≥ 75
- [ ] 초기 투자금 결정 및 해외 주식 운용 한도 설정
### 1-3. 시스템 요건
- [ ] 커버리지 80% 이상 유지 확인: `pytest --cov=src`
- [ ] 타입 체크 통과: `mypy src/ --strict`
- [ ] Lint 통과: `ruff check src/ tests/`
---
## 2. 환경 설정
### 2-1. `.env` 파일 수정
```bash
# 1. KIS 실전 URL로 변경 (모의: openapivts 포트 29443)
KIS_BASE_URL=https://openapi.koreainvestment.com:9443
# 2. 실전 APP_KEY / APP_SECRET으로 교체
KIS_APP_KEY=<실전_APP_KEY>
KIS_APP_SECRET=<실전_APP_SECRET>
KIS_ACCOUNT_NO=<실전_계좌번호>
# 3. 모드를 live로 변경
MODE=live
# 4. PAPER_OVERSEAS_CASH 비활성화 (live 모드에선 무시되지만 명시적으로 0 설정)
PAPER_OVERSEAS_CASH=0
```
> ⚠️ `KIS_BASE_URL` 포트 주의:
> - **모의(VTS)**: `https://openapivts.koreainvestment.com:29443`
> - **실전**: `https://openapi.koreainvestment.com:9443`
### 2-2. TR_ID 자동 분기 확인
아래 TR_ID는 `MODE` 값에 따라 코드에서 **자동으로 선택**됩니다.
별도 설정 불필요하나, 문제 발생 시 아래 표를 참조하세요.
| 구분 | 모의 TR_ID | 실전 TR_ID |
|------|-----------|-----------|
| 국내 잔고 조회 | `VTTC8434R` | `TTTC8434R` |
| 국내 현금 매수 | `VTTC0012U` | `TTTC0012U` |
| 국내 현금 매도 | `VTTC0011U` | `TTTC0011U` |
| 해외 잔고 조회 | `VTTS3012R` | `TTTS3012R` |
| 해외 매수 | `VTTT1002U` | `TTTT1002U` |
| 해외 매도 | `VTTT1001U` | `TTTT1006U` |
> **출처**: `docs/한국투자증권_오픈API_전체문서_20260221_030000.xlsx` (공식 문서 기준)
---
## 3. 최종 확인
### 3-1. 실전 시작 전 점검
- [ ] DB 백업 완료: `data/trade_logs.db``data/backups/`
- [ ] Telegram 알림 설정 확인 (실전에서는 알림이 더욱 중요)
- [ ] 소액으로 첫 거래 진행 후 TR_ID/계좌 정상 동작 확인
### 3-2. 실행 명령
```bash
# 실전 모드로 실행
python -m src.main --mode=live
# 대시보드 함께 실행 (별도 터미널에서 모니터링)
python -m src.main --mode=live --dashboard
```
### 3-3. 실전 시작 직후 확인 사항
- [ ] 로그에 `MODE=live` 출력 확인
- [ ] 첫 잔고 조회 성공 (ConnectionError 없음)
- [ ] Telegram 알림 수신 확인 ("System started")
- [ ] 첫 주문 후 KIS 앱에서 체결 내역 확인
---
## 4. 비상 정지 방법
### 즉각 정지
```bash
# 터미널에서 Ctrl+C (정상 종료 트리거)
# 또는 Telegram 봇 명령:
/stop
```
### Circuit Breaker 발동 시
- CB가 발동되면 자동으로 거래 중단 및 Telegram 알림 전송
- CB 임계값: `CIRCUIT_BREAKER_PCT` (기본 -3.0%)
- **임계값은 엄격하게만 조정 가능** (더 낮은 음수 값으로만 변경)
---
## 5. 롤백 절차
실전 전환 후 문제 발생 시:
```bash
# 1. 즉시 .env에서 MODE=paper로 복원
# 2. 재시작
python -m src.main --mode=paper
# 3. DB에서 최근 거래 확인
sqlite3 data/trade_logs.db "SELECT * FROM trades ORDER BY id DESC LIMIT 20;"
```
---
## 관련 문서
- [시스템 아키텍처](architecture.md)
- [워크플로우 가이드](workflow.md)
- [재해 복구](disaster_recovery.md)
- [Agent 제약 조건](agents.md)

View File

@@ -17,7 +17,7 @@ class Settings(BaseSettings):
# Google Gemini
GEMINI_API_KEY: str
GEMINI_MODEL: str = "gemini-2.0-flash"
GEMINI_MODEL: str = "gemini-pro"
# External Data APIs (optional — for data-driven decisions)
NEWS_API_KEY: str | None = None

View File

@@ -88,47 +88,6 @@ DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day
TRADE_SESSION_INTERVAL_HOURS = 6 # Hours between sessions
async def _retry_connection(coro_factory: Any, *args: Any, label: str = "", **kwargs: Any) -> Any:
"""Call an async function retrying on ConnectionError with exponential backoff.
Retries up to MAX_CONNECTION_RETRIES times (exclusive of the first attempt),
sleeping 2^attempt seconds between attempts. Use only for idempotent read
operations — never for order submission.
Args:
coro_factory: Async callable (method or function) to invoke.
*args: Positional arguments forwarded to coro_factory.
label: Human-readable label for log messages.
**kwargs: Keyword arguments forwarded to coro_factory.
Raises:
ConnectionError: If all retries are exhausted.
"""
for attempt in range(1, MAX_CONNECTION_RETRIES + 1):
try:
return await coro_factory(*args, **kwargs)
except ConnectionError as exc:
if attempt < MAX_CONNECTION_RETRIES:
wait_secs = 2 ** attempt
logger.warning(
"Connection error %s (attempt %d/%d), retrying in %ds: %s",
label,
attempt,
MAX_CONNECTION_RETRIES,
wait_secs,
exc,
)
await asyncio.sleep(wait_secs)
else:
logger.error(
"Connection error %s — all %d retries exhausted: %s",
label,
MAX_CONNECTION_RETRIES,
exc,
)
raise
def _extract_symbol_from_holding(item: dict[str, Any]) -> str:
"""Extract symbol from overseas holding payload variants."""
for key in (
@@ -1005,18 +964,11 @@ async def run_daily_session(
try:
if market.is_domestic:
current_price, price_change_pct, foreigner_net = (
await _retry_connection(
broker.get_current_price,
stock_code,
label=stock_code,
)
await broker.get_current_price(stock_code)
)
else:
price_data = await _retry_connection(
overseas_broker.get_overseas_price,
market.exchange_code,
stock_code,
label=f"{stock_code}@{market.exchange_code}",
price_data = await overseas_broker.get_overseas_price(
market.exchange_code, stock_code
)
current_price = safe_float(
price_data.get("output", {}).get("last", "0")
@@ -1067,27 +1019,9 @@ async def run_daily_session(
logger.warning("No valid stock data for market %s", market.code)
continue
# Get balance data once for the market (read-only — safe to retry)
try:
if market.is_domestic:
balance_data = await _retry_connection(
broker.get_balance, label=f"balance:{market.code}"
)
else:
balance_data = await _retry_connection(
overseas_broker.get_overseas_balance,
market.exchange_code,
label=f"overseas_balance:{market.exchange_code}",
)
except ConnectionError as exc:
logger.error(
"Balance fetch failed for market %s after all retries — skipping market: %s",
market.code,
exc,
)
continue
# Get balance data once for the market
if market.is_domestic:
balance_data = await broker.get_balance()
output2 = balance_data.get("output2", [{}])
total_eval = safe_float(
output2[0].get("tot_evlu_amt", "0")
@@ -1099,6 +1033,7 @@ async def run_daily_session(
output2[0].get("pchs_amt_smtl_amt", "0")
) if output2 else 0
else:
balance_data = await overseas_broker.get_overseas_balance(market.exchange_code)
output2 = balance_data.get("output2", [{}])
if isinstance(output2, list) and output2:
balance_info = output2[0]

View File

@@ -1,114 +0,0 @@
"""Auto-generated strategy: v20260220_210124
Generated at: 2026-02-20T21:01:24.706847+00:00
Rationale: Auto-evolved from 6 failures. Primary failure markets: ['US_AMEX', 'US_NYSE', 'US_NASDAQ']. Average loss: -194.69
"""
from __future__ import annotations
from typing import Any
from src.strategies.base import BaseStrategy
class Strategy_v20260220_210124(BaseStrategy):
"""Strategy: v20260220_210124"""
def evaluate(self, market_data: dict[str, Any]) -> dict[str, Any]:
import datetime
# --- Strategy Constants ---
# Minimum price for a stock to be considered for trading (avoids penny stocks)
MIN_PRICE = 5.0
# Momentum signal thresholds (stricter than previous failures)
MOMENTUM_PRICE_CHANGE_THRESHOLD = 7.0 # % price change
MOMENTUM_VOLUME_RATIO_THRESHOLD = 4.0 # X times average volume
# Oversold signal thresholds (more conservative)
OVERSOLD_RSI_THRESHOLD = 25.0 # RSI value (lower means more oversold)
# Confidence levels
CONFIDENCE_HOLD = 30
CONFIDENCE_BUY_OVERSOLD = 65
CONFIDENCE_BUY_MOMENTUM = 85
CONFIDENCE_BUY_STRONG_MOMENTUM = 90 # For higher-priced stocks with strong momentum
# Market hours in UTC (9:30 AM ET to 4:00 PM ET)
MARKET_OPEN_UTC = datetime.time(14, 30)
MARKET_CLOSE_UTC = datetime.time(21, 0)
# Volatile periods within market hours (UTC) to avoid
# First hour after open (14:30 UTC - 15:30 UTC)
VOLATILE_OPEN_END_UTC = datetime.time(15, 30)
# Last 30 minutes before close (20:30 UTC - 21:00 UTC)
VOLATILE_CLOSE_START_UTC = datetime.time(20, 30)
current_price = market_data.get('current_price')
price_change_pct = market_data.get('price_change_pct')
volume_ratio = market_data.get('volume_ratio') # Assumed pre-computed indicator
rsi = market_data.get('rsi') # Assumed pre-computed indicator
timestamp_str = market_data.get('timestamp')
action = "HOLD"
confidence = CONFIDENCE_HOLD
rationale = "Initial HOLD: No clear signal or conditions not met."
# --- 1. Basic Data Validation ---
if current_price is None or price_change_pct is None:
return {"action": "HOLD", "confidence": CONFIDENCE_HOLD,
"rationale": "Insufficient core data (price or price change) to evaluate."}
# --- 2. Price Filter: Avoid low-priced/penny stocks ---
if current_price < MIN_PRICE:
return {"action": "HOLD", "confidence": CONFIDENCE_HOLD,
"rationale": f"Avoiding low-priced stock (${current_price:.2f} < ${MIN_PRICE:.2f})."}
# --- 3. Time Filter: Only trade during core market hours ---
if timestamp_str:
try:
dt_object = datetime.datetime.fromisoformat(timestamp_str)
current_time_utc = dt_object.time()
if not (MARKET_OPEN_UTC <= current_time_utc < MARKET_CLOSE_UTC):
return {"action": "HOLD", "confidence": CONFIDENCE_HOLD,
"rationale": f"Avoiding trade outside core market hours ({current_time_utc} UTC)."}
if (MARKET_OPEN_UTC <= current_time_utc < VOLATILE_OPEN_END_UTC) or \
(VOLATILE_CLOSE_START_UTC <= current_time_utc < MARKET_CLOSE_UTC):
return {"action": "HOLD", "confidence": CONFIDENCE_HOLD,
"rationale": f"Avoiding trade during volatile market open/close periods ({current_time_utc} UTC)."}
except ValueError:
rationale += " (Warning: Malformed timestamp, time filters skipped)"
# --- Initialize signal states ---
has_momentum_buy_signal = False
has_oversold_buy_signal = False
# --- 4. Evaluate Enhanced Buy Signals ---
# Momentum Buy Signal
if volume_ratio is not None and \
price_change_pct > MOMENTUM_PRICE_CHANGE_THRESHOLD and \
volume_ratio > MOMENTUM_VOLUME_RATIO_THRESHOLD:
has_momentum_buy_signal = True
rationale = f"Momentum BUY: Price change {price_change_pct:.2f}%, Volume {volume_ratio:.2f}x."
confidence = CONFIDENCE_BUY_MOMENTUM
if current_price >= 10.0:
confidence = CONFIDENCE_BUY_STRONG_MOMENTUM
# Oversold Buy Signal
if rsi is not None and rsi < OVERSOLD_RSI_THRESHOLD:
has_oversold_buy_signal = True
if not has_momentum_buy_signal:
rationale = f"Oversold BUY: RSI {rsi:.2f}."
confidence = CONFIDENCE_BUY_OVERSOLD
if current_price >= 10.0:
confidence = min(CONFIDENCE_BUY_OVERSOLD + 5, 80)
# --- 5. Decision Logic ---
if has_momentum_buy_signal:
action = "BUY"
elif has_oversold_buy_signal:
action = "BUY"
return {"action": action, "confidence": confidence, "rationale": rationale}

View File

@@ -1,97 +0,0 @@
"""Auto-generated strategy: v20260220_210159
Generated at: 2026-02-20T21:01:59.391523+00:00
Rationale: Auto-evolved from 6 failures. Primary failure markets: ['US_AMEX', 'US_NYSE', 'US_NASDAQ']. Average loss: -194.69
"""
from __future__ import annotations
from typing import Any
from src.strategies.base import BaseStrategy
class Strategy_v20260220_210159(BaseStrategy):
"""Strategy: v20260220_210159"""
def evaluate(self, market_data: dict[str, Any]) -> dict[str, Any]:
import datetime
current_price = market_data.get('current_price')
price_change_pct = market_data.get('price_change_pct')
volume_ratio = market_data.get('volume_ratio')
rsi = market_data.get('rsi')
timestamp_str = market_data.get('timestamp')
market_name = market_data.get('market')
# Default action
action = "HOLD"
confidence = 0
rationale = "No strong signal or conditions not met."
# --- FAILURE PATTERN AVOIDANCE ---
# 1. Avoid low-priced/penny stocks
MIN_PRICE_THRESHOLD = 5.0 # USD
if current_price is not None and current_price < MIN_PRICE_THRESHOLD:
rationale = (
f"HOLD: Stock price (${current_price:.2f}) is below minimum threshold "
f"(${MIN_PRICE_THRESHOLD:.2f}). Past failures consistently involved low-priced stocks."
)
return {"action": action, "confidence": confidence, "rationale": rationale}
# 2. Avoid early market hour volatility
if timestamp_str:
try:
dt_obj = datetime.datetime.fromisoformat(timestamp_str)
utc_hour = dt_obj.hour
utc_minute = dt_obj.minute
if (utc_hour == 14 and utc_minute < 45) or (utc_hour == 13 and utc_minute >= 30):
rationale = (
f"HOLD: Trading during early market hours (UTC {utc_hour}:{utc_minute}), "
f"a period identified with past failures due to high volatility."
)
return {"action": action, "confidence": confidence, "rationale": rationale}
except ValueError:
pass
# --- IMPROVED BUY STRATEGY ---
# Momentum BUY signal
if volume_ratio is not None and price_change_pct is not None:
if price_change_pct > 7.0 and volume_ratio > 3.0:
action = "BUY"
confidence = 70
rationale = "Improved BUY: Momentum signal with high volume and above price threshold."
if market_name == 'US_AMEX':
confidence = max(55, confidence - 5)
rationale += " (Adjusted lower for AMEX market's higher risk profile)."
elif market_name == 'US_NASDAQ' and price_change_pct > 20:
confidence = max(50, confidence - 10)
rationale += " (Adjusted lower for aggressive NASDAQ momentum volatility)."
if price_change_pct > 15.0:
confidence = max(50, confidence - 5)
rationale += " (Caution: Very high daily price change, potential for reversal)."
return {"action": action, "confidence": confidence, "rationale": rationale}
# Oversold BUY signal
if rsi is not None and price_change_pct is not None:
if rsi < 30 and price_change_pct < -3.0:
action = "BUY"
confidence = 65
rationale = "Improved BUY: Oversold signal with recent decline and above price threshold."
if market_name == 'US_AMEX':
confidence = max(50, confidence - 5)
rationale += " (Adjusted lower for AMEX market's higher risk on oversold assets)."
if price_change_pct < -10.0:
confidence = max(45, confidence - 10)
rationale += " (Caution: Very steep decline, potential falling knife)."
return {"action": action, "confidence": confidence, "rationale": rationale}
# If no specific BUY signal, default to HOLD
return {"action": action, "confidence": confidence, "rationale": rationale}

View File

@@ -1,88 +0,0 @@
"""Auto-generated strategy: v20260220_210244
Generated at: 2026-02-20T21:02:44.387355+00:00
Rationale: Auto-evolved from 6 failures. Primary failure markets: ['US_AMEX', 'US_NYSE', 'US_NASDAQ']. Average loss: -194.69
"""
from __future__ import annotations
from typing import Any
from src.strategies.base import BaseStrategy
class Strategy_v20260220_210244(BaseStrategy):
"""Strategy: v20260220_210244"""
def evaluate(self, market_data: dict[str, Any]) -> dict[str, Any]:
from datetime import datetime
# Extract required data points safely
current_price = market_data.get("current_price")
price_change_pct = market_data.get("price_change_pct")
volume_ratio = market_data.get("volume_ratio")
rsi = market_data.get("rsi")
timestamp_str = market_data.get("timestamp")
market_name = market_data.get("market")
stock_code = market_data.get("stock_code", "UNKNOWN")
# Default action is HOLD with conservative confidence and rationale
action = "HOLD"
confidence = 50
rationale = f"No strong BUY signal for {stock_code} or awaiting more favorable conditions after avoiding known failure patterns."
# --- 1. Failure Pattern Avoidance Filters ---
# A. Avoid low-priced (penny) stocks
if current_price is not None and current_price < 5.0:
return {
"action": "HOLD",
"confidence": 50,
"rationale": f"AVOID {stock_code}: Stock price (${current_price:.2f}) is below minimum threshold ($5.00) for BUY action. Identified past failures on highly volatile, low-priced stocks."
}
# B. Avoid initiating BUY trades during identified high-volatility hours
if timestamp_str:
try:
trade_hour = datetime.fromisoformat(timestamp_str).hour
if trade_hour in [14, 20]:
return {
"action": "HOLD",
"confidence": 50,
"rationale": f"AVOID {stock_code}: Trading during historically volatile hour ({trade_hour} UTC) where previous BUYs resulted in losses. Prefer to observe market stability."
}
except ValueError:
pass
# C. Be cautious with extreme momentum spikes
if volume_ratio is not None and price_change_pct is not None:
if volume_ratio >= 9.0 and price_change_pct >= 15.0:
return {
"action": "HOLD",
"confidence": 50,
"rationale": f"AVOID {stock_code}: Extreme short-term momentum detected (price change: +{price_change_pct:.2f}%, volume ratio: {volume_ratio:.1f}x). Historical failures indicate buying into such rapid spikes often leads to reversals."
}
# D. Be cautious with "oversold" signals without further confirmation
if rsi is not None and rsi < 30:
return {
"action": "HOLD",
"confidence": 50,
"rationale": f"AVOID {stock_code}: Oversold signal (RSI={rsi:.1f}) detected. While often a BUY signal, historical failures on similar 'oversold' trades suggest waiting for stronger confirmation."
}
# --- 2. Improved BUY Signal Generation ---
if volume_ratio is not None and 2.0 <= volume_ratio < 9.0 and \
price_change_pct is not None and 2.0 <= price_change_pct < 15.0:
action = "BUY"
confidence = 70
rationale = f"BUY {stock_code}: Moderate momentum detected (price change: +{price_change_pct:.2f}%, volume ratio: {volume_ratio:.1f}x). Passed filters for price and extreme momentum, avoiding past failure patterns."
if market_name in ["US_AMEX", "US_NASDAQ"]:
confidence = max(60, confidence - 5)
rationale += f" Adjusted confidence for {market_name} market characteristics."
elif market_name == "US_NYSE":
confidence = max(65, confidence)
confidence = max(50, min(85, confidence))
return {"action": action, "confidence": confidence, "rationale": rationale}

View File

@@ -3,11 +3,9 @@
from __future__ import annotations
import sqlite3
import sys
import tempfile
from datetime import UTC, datetime, timedelta
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
@@ -365,435 +363,3 @@ class TestHealthMonitor:
assert "timestamp" in report
assert "checks" in report
assert len(report["checks"]) == 3
# ---------------------------------------------------------------------------
# BackupExporter — additional coverage for previously uncovered branches
# ---------------------------------------------------------------------------
@pytest.fixture
def empty_db(tmp_path: Path) -> Path:
"""Create a temporary database with NO trade records."""
db_path = tmp_path / "empty_trades.db"
conn = sqlite3.connect(str(db_path))
conn.execute(
"""CREATE TABLE trades (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
stock_code TEXT NOT NULL,
action TEXT NOT NULL,
quantity INTEGER NOT NULL,
price REAL NOT NULL,
confidence INTEGER NOT NULL,
rationale TEXT,
pnl REAL DEFAULT 0.0
)"""
)
conn.commit()
conn.close()
return db_path
class TestBackupExporterAdditional:
"""Cover branches missed in the original TestBackupExporter suite."""
def test_export_all_default_formats(self, temp_db: Path, tmp_path: Path) -> None:
"""export_all with formats=None must default to JSON+CSV+Parquet path."""
exporter = BackupExporter(str(temp_db))
# formats=None triggers the default list assignment (line 62)
results = exporter.export_all(tmp_path / "out", formats=None, compress=False)
# JSON and CSV must always succeed; Parquet needs pyarrow
assert ExportFormat.JSON in results
assert ExportFormat.CSV in results
def test_export_all_logs_error_on_failure(
self, temp_db: Path, tmp_path: Path
) -> None:
"""export_all must log an error and continue when one format fails."""
exporter = BackupExporter(str(temp_db))
# Patch _export_format to raise on JSON, succeed on CSV
original = exporter._export_format
def failing_export(fmt, *args, **kwargs): # type: ignore[no-untyped-def]
if fmt == ExportFormat.JSON:
raise RuntimeError("simulated failure")
return original(fmt, *args, **kwargs)
exporter._export_format = failing_export # type: ignore[method-assign]
results = exporter.export_all(
tmp_path / "out",
formats=[ExportFormat.JSON, ExportFormat.CSV],
compress=False,
)
# JSON failed → not in results; CSV succeeded → in results
assert ExportFormat.JSON not in results
assert ExportFormat.CSV in results
def test_export_csv_empty_trades_no_compress(
self, empty_db: Path, tmp_path: Path
) -> None:
"""CSV export with no trades and compress=False must write header row only."""
exporter = BackupExporter(str(empty_db))
results = exporter.export_all(
tmp_path / "out",
formats=[ExportFormat.CSV],
compress=False,
)
assert ExportFormat.CSV in results
out = results[ExportFormat.CSV]
assert out.exists()
content = out.read_text()
assert "timestamp" in content
def test_export_csv_empty_trades_compressed(
self, empty_db: Path, tmp_path: Path
) -> None:
"""CSV export with no trades and compress=True must write gzipped header."""
import gzip
exporter = BackupExporter(str(empty_db))
results = exporter.export_all(
tmp_path / "out",
formats=[ExportFormat.CSV],
compress=True,
)
assert ExportFormat.CSV in results
out = results[ExportFormat.CSV]
assert out.suffix == ".gz"
with gzip.open(out, "rt", encoding="utf-8") as f:
content = f.read()
assert "timestamp" in content
def test_export_csv_with_data_compressed(
self, temp_db: Path, tmp_path: Path
) -> None:
"""CSV export with data and compress=True must write gzipped rows."""
import gzip
exporter = BackupExporter(str(temp_db))
results = exporter.export_all(
tmp_path / "out",
formats=[ExportFormat.CSV],
compress=True,
)
assert ExportFormat.CSV in results
out = results[ExportFormat.CSV]
with gzip.open(out, "rt", encoding="utf-8") as f:
lines = f.readlines()
# Header + 3 data rows
assert len(lines) == 4
def test_export_parquet_raises_import_error_without_pyarrow(
self, temp_db: Path, tmp_path: Path
) -> None:
"""Parquet export must raise ImportError when pyarrow is not installed."""
exporter = BackupExporter(str(temp_db))
with patch.dict(sys.modules, {"pyarrow": None, "pyarrow.parquet": None}):
try:
import pyarrow # noqa: F401
pytest.skip("pyarrow is installed; cannot test ImportError path")
except ImportError:
pass
results = exporter.export_all(
tmp_path / "out",
formats=[ExportFormat.PARQUET],
compress=False,
)
# Parquet export fails gracefully; result dict should not contain it
assert ExportFormat.PARQUET not in results
# ---------------------------------------------------------------------------
# CloudStorage — mocked boto3 tests
# ---------------------------------------------------------------------------
@pytest.fixture
def mock_boto3_module():
"""Inject a fake boto3 into sys.modules for the duration of the test."""
mock = MagicMock()
with patch.dict(sys.modules, {"boto3": mock}):
yield mock
@pytest.fixture
def s3_config():
"""Minimal S3Config for tests."""
from src.backup.cloud_storage import S3Config
return S3Config(
endpoint_url="http://localhost:9000",
access_key="minioadmin",
secret_key="minioadmin",
bucket_name="test-bucket",
region="us-east-1",
)
class TestCloudStorage:
"""Test CloudStorage using mocked boto3."""
def test_init_creates_s3_client(self, mock_boto3_module, s3_config) -> None:
"""CloudStorage.__init__ must call boto3.client with the correct args."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
mock_boto3_module.client.assert_called_once()
call_kwargs = mock_boto3_module.client.call_args[1]
assert call_kwargs["aws_access_key_id"] == "minioadmin"
assert call_kwargs["aws_secret_access_key"] == "minioadmin"
assert storage.config == s3_config
def test_init_raises_if_boto3_missing(self, s3_config) -> None:
"""CloudStorage.__init__ must raise ImportError when boto3 is absent."""
with patch.dict(sys.modules, {"boto3": None}): # type: ignore[dict-item]
with pytest.raises((ImportError, TypeError)):
# Re-import to trigger the try/except inside __init__
import importlib
import src.backup.cloud_storage as m
importlib.reload(m)
m.CloudStorage(s3_config)
def test_upload_file_success(
self, mock_boto3_module, s3_config, tmp_path: Path
) -> None:
"""upload_file must call client.upload_file and return the object key."""
from src.backup.cloud_storage import CloudStorage
test_file = tmp_path / "backup.json.gz"
test_file.write_bytes(b"data")
storage = CloudStorage(s3_config)
key = storage.upload_file(test_file, object_key="backups/backup.json.gz")
assert key == "backups/backup.json.gz"
storage.client.upload_file.assert_called_once()
def test_upload_file_default_key(
self, mock_boto3_module, s3_config, tmp_path: Path
) -> None:
"""upload_file without object_key must use the filename as key."""
from src.backup.cloud_storage import CloudStorage
test_file = tmp_path / "myfile.gz"
test_file.write_bytes(b"data")
storage = CloudStorage(s3_config)
key = storage.upload_file(test_file)
assert key == "myfile.gz"
def test_upload_file_not_found(
self, mock_boto3_module, s3_config, tmp_path: Path
) -> None:
"""upload_file must raise FileNotFoundError for missing files."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
with pytest.raises(FileNotFoundError):
storage.upload_file(tmp_path / "nonexistent.gz")
def test_upload_file_propagates_client_error(
self, mock_boto3_module, s3_config, tmp_path: Path
) -> None:
"""upload_file must re-raise exceptions from the boto3 client."""
from src.backup.cloud_storage import CloudStorage
test_file = tmp_path / "backup.gz"
test_file.write_bytes(b"data")
storage = CloudStorage(s3_config)
storage.client.upload_file.side_effect = RuntimeError("network error")
with pytest.raises(RuntimeError, match="network error"):
storage.upload_file(test_file)
def test_download_file_success(
self, mock_boto3_module, s3_config, tmp_path: Path
) -> None:
"""download_file must call client.download_file and return local path."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
dest = tmp_path / "downloads" / "backup.gz"
result = storage.download_file("backups/backup.gz", dest)
assert result == dest
storage.client.download_file.assert_called_once()
def test_download_file_propagates_error(
self, mock_boto3_module, s3_config, tmp_path: Path
) -> None:
"""download_file must re-raise exceptions from the boto3 client."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.download_file.side_effect = RuntimeError("timeout")
with pytest.raises(RuntimeError, match="timeout"):
storage.download_file("key", tmp_path / "dest.gz")
def test_list_files_returns_objects(
self, mock_boto3_module, s3_config
) -> None:
"""list_files must return parsed file metadata from S3 response."""
from datetime import timezone
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.list_objects_v2.return_value = {
"Contents": [
{
"Key": "backups/a.gz",
"Size": 1024,
"LastModified": datetime(2026, 1, 1, tzinfo=timezone.utc),
"ETag": '"abc123"',
}
]
}
files = storage.list_files(prefix="backups/")
assert len(files) == 1
assert files[0]["key"] == "backups/a.gz"
assert files[0]["size_bytes"] == 1024
def test_list_files_empty_bucket(
self, mock_boto3_module, s3_config
) -> None:
"""list_files must return empty list when bucket has no objects."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.list_objects_v2.return_value = {}
files = storage.list_files()
assert files == []
def test_list_files_propagates_error(
self, mock_boto3_module, s3_config
) -> None:
"""list_files must re-raise exceptions from the boto3 client."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.list_objects_v2.side_effect = RuntimeError("auth error")
with pytest.raises(RuntimeError):
storage.list_files()
def test_delete_file_success(
self, mock_boto3_module, s3_config
) -> None:
"""delete_file must call client.delete_object with the correct key."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.delete_file("backups/old.gz")
storage.client.delete_object.assert_called_once_with(
Bucket="test-bucket", Key="backups/old.gz"
)
def test_delete_file_propagates_error(
self, mock_boto3_module, s3_config
) -> None:
"""delete_file must re-raise exceptions from the boto3 client."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.delete_object.side_effect = RuntimeError("permission denied")
with pytest.raises(RuntimeError):
storage.delete_file("backups/old.gz")
def test_get_storage_stats_success(
self, mock_boto3_module, s3_config
) -> None:
"""get_storage_stats must aggregate file sizes correctly."""
from datetime import timezone
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.list_objects_v2.return_value = {
"Contents": [
{
"Key": "a.gz",
"Size": 1024 * 1024,
"LastModified": datetime(2026, 1, 1, tzinfo=timezone.utc),
"ETag": '"x"',
},
{
"Key": "b.gz",
"Size": 1024 * 1024,
"LastModified": datetime(2026, 1, 2, tzinfo=timezone.utc),
"ETag": '"y"',
},
]
}
stats = storage.get_storage_stats()
assert stats["total_files"] == 2
assert stats["total_size_bytes"] == 2 * 1024 * 1024
assert stats["total_size_mb"] == pytest.approx(2.0)
def test_get_storage_stats_on_error(
self, mock_boto3_module, s3_config
) -> None:
"""get_storage_stats must return error dict without raising on failure."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.list_objects_v2.side_effect = RuntimeError("no connection")
stats = storage.get_storage_stats()
assert "error" in stats
assert stats["total_files"] == 0
def test_verify_connection_success(
self, mock_boto3_module, s3_config
) -> None:
"""verify_connection must return True when head_bucket succeeds."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
result = storage.verify_connection()
assert result is True
def test_verify_connection_failure(
self, mock_boto3_module, s3_config
) -> None:
"""verify_connection must return False when head_bucket raises."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.head_bucket.side_effect = RuntimeError("no such bucket")
result = storage.verify_connection()
assert result is False
def test_enable_versioning(
self, mock_boto3_module, s3_config
) -> None:
"""enable_versioning must call put_bucket_versioning."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.enable_versioning()
storage.client.put_bucket_versioning.assert_called_once()
def test_enable_versioning_propagates_error(
self, mock_boto3_module, s3_config
) -> None:
"""enable_versioning must re-raise exceptions from the boto3 client."""
from src.backup.cloud_storage import CloudStorage
storage = CloudStorage(s3_config)
storage.client.put_bucket_versioning.side_effect = RuntimeError("denied")
with pytest.raises(RuntimeError):
storage.enable_versioning()

View File

@@ -10,7 +10,6 @@ import pytest
from src.context.aggregator import ContextAggregator
from src.context.layer import LAYER_CONFIG, ContextLayer
from src.context.store import ContextStore
from src.context.summarizer import ContextSummarizer
from src.db import init_db, log_trade
@@ -371,259 +370,3 @@ class TestLayerMetadata:
# L1 aggregates from L2
assert LAYER_CONFIG[ContextLayer.L1_LEGACY].aggregation_source == ContextLayer.L2_ANNUAL
# ---------------------------------------------------------------------------
# ContextSummarizer tests
# ---------------------------------------------------------------------------
@pytest.fixture
def summarizer(db_conn: sqlite3.Connection) -> ContextSummarizer:
"""Provide a ContextSummarizer backed by an in-memory store."""
return ContextSummarizer(ContextStore(db_conn))
class TestContextSummarizer:
"""Test suite for ContextSummarizer."""
# ------------------------------------------------------------------
# summarize_numeric_values
# ------------------------------------------------------------------
def test_summarize_empty_values(self, summarizer: ContextSummarizer) -> None:
"""Empty list must return SummaryStats with count=0 and no other fields."""
stats = summarizer.summarize_numeric_values([])
assert stats.count == 0
assert stats.mean is None
assert stats.min is None
assert stats.max is None
def test_summarize_single_value(self, summarizer: ContextSummarizer) -> None:
"""Single-element list must return correct stats with std=0 and trend=flat."""
stats = summarizer.summarize_numeric_values([42.0])
assert stats.count == 1
assert stats.mean == 42.0
assert stats.std == 0.0
assert stats.trend == "flat"
def test_summarize_upward_trend(self, summarizer: ContextSummarizer) -> None:
"""Increasing values must produce trend='up'."""
values = [1.0, 2.0, 3.0, 10.0, 20.0, 30.0]
stats = summarizer.summarize_numeric_values(values)
assert stats.trend == "up"
def test_summarize_downward_trend(self, summarizer: ContextSummarizer) -> None:
"""Decreasing values must produce trend='down'."""
values = [30.0, 20.0, 10.0, 3.0, 2.0, 1.0]
stats = summarizer.summarize_numeric_values(values)
assert stats.trend == "down"
def test_summarize_flat_trend(self, summarizer: ContextSummarizer) -> None:
"""Stable values must produce trend='flat'."""
values = [100.0, 100.1, 99.9, 100.0, 100.2, 99.8]
stats = summarizer.summarize_numeric_values(values)
assert stats.trend == "flat"
# ------------------------------------------------------------------
# summarize_layer
# ------------------------------------------------------------------
def test_summarize_layer_no_data(
self, summarizer: ContextSummarizer
) -> None:
"""summarize_layer with no data must return the 'No data' sentinel."""
result = summarizer.summarize_layer(ContextLayer.L6_DAILY)
assert result["count"] == 0
assert "No data" in result["summary"]
def test_summarize_layer_numeric(
self, summarizer: ContextSummarizer, db_conn: sqlite3.Connection
) -> None:
"""summarize_layer must collect numeric values and produce stats."""
store = summarizer.store
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "total_pnl", 100.0)
store.set_context(ContextLayer.L6_DAILY, "2026-02-02", "total_pnl", 200.0)
result = summarizer.summarize_layer(ContextLayer.L6_DAILY)
assert "total_entries" in result
def test_summarize_layer_with_dict_values(
self, summarizer: ContextSummarizer
) -> None:
"""summarize_layer must handle dict values by extracting numeric subkeys."""
store = summarizer.store
# set_context serialises the value as JSON, so passing a dict works
store.set_context(
ContextLayer.L6_DAILY, "2026-02-01", "metrics",
{"win_rate": 65.0, "label": "good"}
)
result = summarizer.summarize_layer(ContextLayer.L6_DAILY)
assert "total_entries" in result
# numeric subkey "win_rate" should appear as "metrics.win_rate"
assert "metrics.win_rate" in result
def test_summarize_layer_with_string_values(
self, summarizer: ContextSummarizer
) -> None:
"""summarize_layer must count string values separately."""
store = summarizer.store
# set_context stores string values as JSON-encoded strings
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "outlook", "BULLISH")
result = summarizer.summarize_layer(ContextLayer.L6_DAILY)
# String fields contribute a `<key>_count` entry
assert "outlook_count" in result
# ------------------------------------------------------------------
# rolling_window_summary
# ------------------------------------------------------------------
def test_rolling_window_summary_basic(
self, summarizer: ContextSummarizer
) -> None:
"""rolling_window_summary must return the expected structure."""
store = summarizer.store
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "pnl", 500.0)
result = summarizer.rolling_window_summary(ContextLayer.L6_DAILY)
assert "window_days" in result
assert "recent_data" in result
assert "historical_summary" in result
def test_rolling_window_summary_no_older_data(
self, summarizer: ContextSummarizer
) -> None:
"""rolling_window_summary with summarize_older=False skips history."""
result = summarizer.rolling_window_summary(
ContextLayer.L6_DAILY, summarize_older=False
)
assert result["historical_summary"] == {}
# ------------------------------------------------------------------
# aggregate_to_higher_layer
# ------------------------------------------------------------------
def test_aggregate_to_higher_layer_mean(
self, summarizer: ContextSummarizer
) -> None:
"""aggregate_to_higher_layer with 'mean' via dict subkeys returns average."""
store = summarizer.store
# Use different outer keys but same inner metric key so get_all_contexts
# returns multiple rows with the target subkey.
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day1", {"pnl": 100.0})
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day2", {"pnl": 200.0})
result = summarizer.aggregate_to_higher_layer(
ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, "pnl", "mean"
)
assert result == pytest.approx(150.0)
def test_aggregate_to_higher_layer_sum(
self, summarizer: ContextSummarizer
) -> None:
"""aggregate_to_higher_layer with 'sum' must return the total."""
store = summarizer.store
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day1", {"pnl": 100.0})
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day2", {"pnl": 200.0})
result = summarizer.aggregate_to_higher_layer(
ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, "pnl", "sum"
)
assert result == pytest.approx(300.0)
def test_aggregate_to_higher_layer_max(
self, summarizer: ContextSummarizer
) -> None:
"""aggregate_to_higher_layer with 'max' must return the maximum."""
store = summarizer.store
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day1", {"pnl": 100.0})
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day2", {"pnl": 200.0})
result = summarizer.aggregate_to_higher_layer(
ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, "pnl", "max"
)
assert result == pytest.approx(200.0)
def test_aggregate_to_higher_layer_min(
self, summarizer: ContextSummarizer
) -> None:
"""aggregate_to_higher_layer with 'min' must return the minimum."""
store = summarizer.store
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day1", {"pnl": 100.0})
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day2", {"pnl": 200.0})
result = summarizer.aggregate_to_higher_layer(
ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, "pnl", "min"
)
assert result == pytest.approx(100.0)
def test_aggregate_to_higher_layer_no_data(
self, summarizer: ContextSummarizer
) -> None:
"""aggregate_to_higher_layer with no matching key must return None."""
result = summarizer.aggregate_to_higher_layer(
ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, "nonexistent", "mean"
)
assert result is None
def test_aggregate_to_higher_layer_unknown_func_defaults_to_mean(
self, summarizer: ContextSummarizer
) -> None:
"""Unknown aggregation function must fall back to mean."""
store = summarizer.store
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day1", {"pnl": 100.0})
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "day2", {"pnl": 200.0})
result = summarizer.aggregate_to_higher_layer(
ContextLayer.L6_DAILY, ContextLayer.L5_WEEKLY, "pnl", "unknown_func"
)
assert result == pytest.approx(150.0)
# ------------------------------------------------------------------
# create_compact_summary + format_summary_for_prompt
# ------------------------------------------------------------------
def test_create_compact_summary(
self, summarizer: ContextSummarizer
) -> None:
"""create_compact_summary must produce a dict keyed by layer value."""
store = summarizer.store
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "pnl", 100.0)
result = summarizer.create_compact_summary([ContextLayer.L6_DAILY])
assert ContextLayer.L6_DAILY.value in result
def test_format_summary_for_prompt_with_numeric_metrics(
self, summarizer: ContextSummarizer
) -> None:
"""format_summary_for_prompt must render avg/trend fields."""
store = summarizer.store
store.set_context(ContextLayer.L6_DAILY, "2026-02-01", "pnl", 100.0)
store.set_context(ContextLayer.L6_DAILY, "2026-02-02", "pnl", 200.0)
compact = summarizer.create_compact_summary([ContextLayer.L6_DAILY])
text = summarizer.format_summary_for_prompt(compact)
assert isinstance(text, str)
def test_format_summary_for_prompt_skips_empty_layers(
self, summarizer: ContextSummarizer
) -> None:
"""format_summary_for_prompt must skip layers with no metrics."""
summary = {ContextLayer.L6_DAILY.value: {}}
text = summarizer.format_summary_for_prompt(summary)
assert text == ""
def test_format_summary_non_dict_value(
self, summarizer: ContextSummarizer
) -> None:
"""format_summary_for_prompt must render non-dict values as plain text."""
summary = {
"daily": {
"plain_count": 42,
}
}
text = summarizer.format_summary_for_prompt(summary)
assert "plain_count" in text
assert "42" in text

View File

@@ -1,117 +0,0 @@
"""Tests for JSON structured logging configuration."""
from __future__ import annotations
import json
import logging
import sys
from src.logging_config import JSONFormatter, setup_logging
class TestJSONFormatter:
"""Test JSONFormatter output."""
def test_basic_log_record(self) -> None:
"""JSONFormatter must emit valid JSON with required fields."""
formatter = JSONFormatter()
record = logging.LogRecord(
name="test.logger",
level=logging.INFO,
pathname="",
lineno=0,
msg="Hello %s",
args=("world",),
exc_info=None,
)
output = formatter.format(record)
data = json.loads(output)
assert data["level"] == "INFO"
assert data["logger"] == "test.logger"
assert data["message"] == "Hello world"
assert "timestamp" in data
def test_includes_exception_info(self) -> None:
"""JSONFormatter must include exception info when present."""
formatter = JSONFormatter()
try:
raise ValueError("test error")
except ValueError:
exc_info = sys.exc_info()
record = logging.LogRecord(
name="test",
level=logging.ERROR,
pathname="",
lineno=0,
msg="oops",
args=(),
exc_info=exc_info,
)
output = formatter.format(record)
data = json.loads(output)
assert "exception" in data
assert "ValueError" in data["exception"]
def test_extra_trading_fields_included(self) -> None:
"""Extra trading fields attached to the record must appear in JSON."""
formatter = JSONFormatter()
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="",
lineno=0,
msg="trade",
args=(),
exc_info=None,
)
record.stock_code = "005930" # type: ignore[attr-defined]
record.action = "BUY" # type: ignore[attr-defined]
record.confidence = 85 # type: ignore[attr-defined]
record.pnl_pct = -1.5 # type: ignore[attr-defined]
record.order_amount = 1_000_000 # type: ignore[attr-defined]
output = formatter.format(record)
data = json.loads(output)
assert data["stock_code"] == "005930"
assert data["action"] == "BUY"
assert data["confidence"] == 85
assert data["pnl_pct"] == -1.5
assert data["order_amount"] == 1_000_000
def test_none_extra_fields_excluded(self) -> None:
"""Extra fields that are None must not appear in JSON output."""
formatter = JSONFormatter()
record = logging.LogRecord(
name="test",
level=logging.INFO,
pathname="",
lineno=0,
msg="no extras",
args=(),
exc_info=None,
)
output = formatter.format(record)
data = json.loads(output)
assert "stock_code" not in data
assert "action" not in data
assert "confidence" not in data
class TestSetupLogging:
"""Test setup_logging function."""
def test_configures_root_logger(self) -> None:
"""setup_logging must attach a JSON handler to the root logger."""
setup_logging(level=logging.DEBUG)
root = logging.getLogger()
json_handlers = [
h for h in root.handlers if isinstance(h.formatter, JSONFormatter)
]
assert len(json_handlers) == 1
assert root.level == logging.DEBUG
def test_avoids_duplicate_handlers(self) -> None:
"""Calling setup_logging twice must not add duplicate handlers."""
setup_logging()
setup_logging()
root = logging.getLogger()
assert len(root.handlers) == 1

View File

@@ -18,7 +18,6 @@ from src.main import (
_extract_held_codes_from_balance,
_extract_held_qty_from_balance,
_handle_market_close,
_retry_connection,
_run_context_scheduler,
_run_evolution_loop,
_start_dashboard_server,
@@ -3184,90 +3183,3 @@ class TestOverseasBrokerIntegration:
# DB도 브로커도 보유 없음 → BUY 주문이 실행되어야 함 (회귀 테스트)
overseas_broker.send_overseas_order.assert_called_once()
# ---------------------------------------------------------------------------
# _retry_connection — unit tests (issue #209)
# ---------------------------------------------------------------------------
class TestRetryConnection:
"""Unit tests for the _retry_connection helper (issue #209)."""
@pytest.mark.asyncio
async def test_success_on_first_attempt(self) -> None:
"""Returns the result immediately when the first call succeeds."""
async def ok() -> str:
return "data"
result = await _retry_connection(ok, label="test")
assert result == "data"
@pytest.mark.asyncio
async def test_succeeds_after_one_connection_error(self) -> None:
"""Retries once on ConnectionError and returns result on 2nd attempt."""
call_count = 0
async def flaky() -> str:
nonlocal call_count
call_count += 1
if call_count < 2:
raise ConnectionError("timeout")
return "ok"
with patch("src.main.asyncio.sleep") as mock_sleep:
mock_sleep.return_value = None
result = await _retry_connection(flaky, label="flaky")
assert result == "ok"
assert call_count == 2
mock_sleep.assert_called_once()
@pytest.mark.asyncio
async def test_raises_after_all_retries_exhausted(self) -> None:
"""Raises ConnectionError after MAX_CONNECTION_RETRIES attempts."""
from src.main import MAX_CONNECTION_RETRIES
call_count = 0
async def always_fail() -> None:
nonlocal call_count
call_count += 1
raise ConnectionError("unreachable")
with patch("src.main.asyncio.sleep") as mock_sleep:
mock_sleep.return_value = None
with pytest.raises(ConnectionError, match="unreachable"):
await _retry_connection(always_fail, label="always_fail")
assert call_count == MAX_CONNECTION_RETRIES
@pytest.mark.asyncio
async def test_passes_args_and_kwargs_to_factory(self) -> None:
"""Forwards positional and keyword arguments to the callable."""
received: dict = {}
async def capture(a: int, b: int, *, key: str) -> str:
received["a"] = a
received["b"] = b
received["key"] = key
return "captured"
result = await _retry_connection(capture, 1, 2, key="val", label="test")
assert result == "captured"
assert received == {"a": 1, "b": 2, "key": "val"}
@pytest.mark.asyncio
async def test_non_connection_error_not_retried(self) -> None:
"""Non-ConnectionError exceptions propagate immediately without retry."""
call_count = 0
async def bad_input() -> None:
nonlocal call_count
call_count += 1
raise ValueError("bad data")
with pytest.raises(ValueError, match="bad data"):
await _retry_connection(bad_input, label="bad")
assert call_count == 1 # No retry for non-ConnectionError

View File

@@ -1,32 +0,0 @@
"""Tests for BaseStrategy abstract class."""
from __future__ import annotations
from typing import Any
import pytest
from src.strategies.base import BaseStrategy
class ConcreteStrategy(BaseStrategy):
"""Minimal concrete strategy for testing."""
def evaluate(self, market_data: dict[str, Any]) -> dict[str, Any]:
return {"action": "HOLD", "confidence": 50, "rationale": "test"}
def test_base_strategy_cannot_be_instantiated() -> None:
"""BaseStrategy cannot be instantiated directly (it's abstract)."""
with pytest.raises(TypeError):
BaseStrategy() # type: ignore[abstract]
def test_concrete_strategy_evaluate_returns_decision() -> None:
"""Concrete subclass must implement evaluate and return a dict."""
strategy = ConcreteStrategy()
result = strategy.evaluate({"close": [100.0, 101.0]})
assert isinstance(result, dict)
assert result["action"] == "HOLD"
assert result["confidence"] == 50
assert "rationale" in result