Compare commits

...

5 Commits

Author SHA1 Message Date
agentson
b961c53a92 improve: implied_rsi 계수 4.0→2.0으로 완화 — 포화 임계점 12.5%→25% (#181)
Some checks failed
CI / test (pull_request) Has been cancelled
SmartScanner의 implied_rsi 공식에서 계수를 4.0에서 2.0으로 수정.
12.5% 이상 변동률에서 RSI=100으로 포화되던 문제를 개선.

변경 전: 50 + (change_rate * 4.0) → 12.5% 변동 시 RSI=100
변경 후: 50 + (change_rate * 2.0) → 25% 변동 시 RSI=100

이제 10% 상승 → RSI=70, 12.5% 상승 → RSI=75 (의미 있는 구분 가능)
해외 소형주(NYSE American 등)의 RSI=100 집단 현상 완화.

- smart_scanner.py 3곳 동일 공식 모두 수정
- TestImpliedRSIFormula 클래스 5개 테스트 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 09:33:35 +09:00
76a7ee7cdb Merge pull request 'fix: 잔액 부족 주문 실패 후 10분간 BUY 재시도 방지 (#179)' (#183) from feature/issue-179-insufficient-balance-cooldown into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #183
2026-02-20 09:31:08 +09:00
17112b864a Merge pull request 'fix: uvicorn 미설치 시 dashboard 오해 없는 실패 처리 (#178)' (#184) from feature/issue-178-dashboard-log-order into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #184
2026-02-20 09:30:48 +09:00
agentson
28bcc7acd7 fix: uvicorn 미설치 시 dashboard 실패를 동기적으로 감지하여 오해 없는 로그 출력 (#178)
Some checks failed
CI / test (pull_request) Has been cancelled
스레드 시작 전에 uvicorn import를 검증하도록 _start_dashboard_server 수정.
uvicorn 미설치 시 "started" 로그 없이 즉시 WARNING 출력 후 None 반환.

- 사전 import 검증으로 "started" → "failed" 오해 소지 있는 로그 쌍 제거
- uvicorn 미설치 시 명확한 경고 메시지 출력
- test_start_dashboard_server_returns_none_when_uvicorn_missing 테스트 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 09:28:23 +09:00
agentson
39b9f179f4 fix: 잔액 부족 주문 실패 후 10분간 BUY 재시도 방지 (issue #179)
Some checks failed
CI / test (pull_request) Has been cancelled
잔액 부족(주문가능금액 부족) 에러 발생 시 해당 종목을 10분간 BUY 시도에서
제외하는 cooldown 메커니즘을 realtime/daily 루프 모두에 적용.

- _BUY_COOLDOWN_SECONDS = 600 상수 추가
- trading_cycle()에 buy_cooldown 파라미터 추가
- 잔액 부족 에러(주문가능금액) 감지 후 cooldown 설정
- BUY 실행 전 cooldown 체크 (realtime + daily session 모두)
- TestBuyCooldown 테스트 클래스 4개 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 09:26:09 +09:00
4 changed files with 376 additions and 8 deletions

View File

@@ -175,7 +175,7 @@ class SmartVolatilityScanner:
liquidity_score = volume_rank_bonus.get(stock_code, 0.0) liquidity_score = volume_rank_bonus.get(stock_code, 0.0)
score = min(100.0, volatility_score + liquidity_score) score = min(100.0, volatility_score + liquidity_score)
signal = "momentum" if change_rate >= 0 else "oversold" signal = "momentum" if change_rate >= 0 else "oversold"
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0))) implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 2.0)))
candidates.append( candidates.append(
ScanCandidate( ScanCandidate(
@@ -282,7 +282,7 @@ class SmartVolatilityScanner:
liquidity_score = volume_rank_bonus.get(stock_code, 0.0) liquidity_score = volume_rank_bonus.get(stock_code, 0.0)
score = min(100.0, volatility_score + liquidity_score) score = min(100.0, volatility_score + liquidity_score)
signal = "momentum" if change_rate >= 0 else "oversold" signal = "momentum" if change_rate >= 0 else "oversold"
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0))) implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 2.0)))
candidates.append( candidates.append(
ScanCandidate( ScanCandidate(
stock_code=stock_code, stock_code=stock_code,
@@ -338,7 +338,7 @@ class SmartVolatilityScanner:
score = min(volatility_pct / 10.0, 1.0) * 100.0 score = min(volatility_pct / 10.0, 1.0) * 100.0
signal = "momentum" if change_rate >= 0 else "oversold" signal = "momentum" if change_rate >= 0 else "oversold"
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0))) implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 2.0)))
candidates.append( candidates.append(
ScanCandidate( ScanCandidate(
stock_code=stock_code, stock_code=stock_code,

View File

@@ -81,6 +81,7 @@ def safe_float(value: str | float | None, default: float = 0.0) -> float:
TRADE_INTERVAL_SECONDS = 60 TRADE_INTERVAL_SECONDS = 60
SCAN_INTERVAL_SECONDS = 60 # Scan markets every 60 seconds SCAN_INTERVAL_SECONDS = 60 # Scan markets every 60 seconds
MAX_CONNECTION_RETRIES = 3 MAX_CONNECTION_RETRIES = 3
_BUY_COOLDOWN_SECONDS = 600 # 10-minute cooldown after insufficient-balance rejection
# Daily trading mode constants (for Free tier API efficiency) # Daily trading mode constants (for Free tier API efficiency)
DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day
@@ -298,6 +299,7 @@ async def trading_cycle(
stock_code: str, stock_code: str,
scan_candidates: dict[str, dict[str, ScanCandidate]], scan_candidates: dict[str, dict[str, ScanCandidate]],
settings: Settings | None = None, settings: Settings | None = None,
buy_cooldown: dict[str, float] | None = None,
) -> None: ) -> None:
"""Execute one trading cycle for a single stock.""" """Execute one trading cycle for a single stock."""
cycle_start_time = asyncio.get_event_loop().time() cycle_start_time = asyncio.get_event_loop().time()
@@ -642,7 +644,22 @@ async def trading_cycle(
return return
order_amount = current_price * quantity order_amount = current_price * quantity
# 4. Risk check BEFORE order # 4. Check BUY cooldown (set when a prior BUY failed due to insufficient balance)
if decision.action == "BUY" and buy_cooldown is not None:
cooldown_key = f"{market.code}:{stock_code}"
cooldown_until = buy_cooldown.get(cooldown_key, 0.0)
now = asyncio.get_event_loop().time()
if now < cooldown_until:
remaining = int(cooldown_until - now)
logger.info(
"Skip BUY %s (%s): insufficient-balance cooldown active (%ds remaining)",
stock_code,
market.name,
remaining,
)
return
# 5a. Risk check BEFORE order
try: try:
risk.validate_order( risk.validate_order(
current_pnl_pct=pnl_pct, current_pnl_pct=pnl_pct,
@@ -690,11 +707,23 @@ async def trading_cycle(
# Check if KIS rejected the order (rt_cd != "0") # Check if KIS rejected the order (rt_cd != "0")
if result.get("rt_cd", "") != "0": if result.get("rt_cd", "") != "0":
order_succeeded = False order_succeeded = False
msg1 = result.get("msg1") or ""
logger.warning( logger.warning(
"Overseas order not accepted for %s: rt_cd=%s msg=%s", "Overseas order not accepted for %s: rt_cd=%s msg=%s",
stock_code, stock_code,
result.get("rt_cd"), result.get("rt_cd"),
result.get("msg1"), msg1,
)
# Set BUY cooldown when the rejection is due to insufficient balance
if decision.action == "BUY" and buy_cooldown is not None and "주문가능금액" in msg1:
cooldown_key = f"{market.code}:{stock_code}"
buy_cooldown[cooldown_key] = (
asyncio.get_event_loop().time() + _BUY_COOLDOWN_SECONDS
)
logger.info(
"BUY cooldown set for %s: %.0fs (insufficient balance)",
stock_code,
_BUY_COOLDOWN_SECONDS,
) )
logger.info("Order result: %s", result.get("msg1", "OK")) logger.info("Order result: %s", result.get("msg1", "OK"))
@@ -803,6 +832,9 @@ async def run_daily_session(
logger.info("Starting daily trading session for %d markets", len(open_markets)) logger.info("Starting daily trading session for %d markets", len(open_markets))
# BUY cooldown: prevents retrying stocks rejected for insufficient balance
daily_buy_cooldown: dict[str, float] = {} # "{market_code}:{stock_code}" -> expiry timestamp
# Process each open market # Process each open market
for market in open_markets: for market in open_markets:
# Use market-local date for playbook keying # Use market-local date for playbook keying
@@ -1075,6 +1107,21 @@ async def run_daily_session(
continue continue
order_amount = stock_data["current_price"] * quantity order_amount = stock_data["current_price"] * quantity
# Check BUY cooldown (insufficient balance)
if decision.action == "BUY":
daily_cooldown_key = f"{market.code}:{stock_code}"
daily_cooldown_until = daily_buy_cooldown.get(daily_cooldown_key, 0.0)
now = asyncio.get_event_loop().time()
if now < daily_cooldown_until:
remaining = int(daily_cooldown_until - now)
logger.info(
"Skip BUY %s (%s): insufficient-balance cooldown active (%ds remaining)",
stock_code,
market.name,
remaining,
)
continue
# Risk check # Risk check
try: try:
risk.validate_order( risk.validate_order(
@@ -1131,11 +1178,22 @@ async def run_daily_session(
) )
if result.get("rt_cd", "") != "0": if result.get("rt_cd", "") != "0":
order_succeeded = False order_succeeded = False
daily_msg1 = result.get("msg1") or ""
logger.warning( logger.warning(
"Overseas order not accepted for %s: rt_cd=%s msg=%s", "Overseas order not accepted for %s: rt_cd=%s msg=%s",
stock_code, stock_code,
result.get("rt_cd"), result.get("rt_cd"),
result.get("msg1"), daily_msg1,
)
if decision.action == "BUY" and "주문가능금액" in daily_msg1:
daily_cooldown_key = f"{market.code}:{stock_code}"
daily_buy_cooldown[daily_cooldown_key] = (
asyncio.get_event_loop().time() + _BUY_COOLDOWN_SECONDS
)
logger.info(
"BUY cooldown set for %s: %.0fs (insufficient balance)",
stock_code,
_BUY_COOLDOWN_SECONDS,
) )
logger.info("Order result: %s", result.get("msg1", "OK")) logger.info("Order result: %s", result.get("msg1", "OK"))
@@ -1300,10 +1358,18 @@ def _start_dashboard_server(settings: Settings) -> threading.Thread | None:
if not settings.DASHBOARD_ENABLED: if not settings.DASHBOARD_ENABLED:
return None return None
# Validate dependencies before spawning the thread so startup failures are
# reported synchronously (avoids the misleading "started" → "failed" log pair).
try:
import uvicorn # noqa: F401
from src.dashboard import create_dashboard_app # noqa: F401
except ImportError as exc:
logger.warning("Dashboard server unavailable (missing dependency): %s", exc)
return None
def _serve() -> None: def _serve() -> None:
try: try:
import uvicorn import uvicorn
from src.dashboard import create_dashboard_app from src.dashboard import create_dashboard_app
app = create_dashboard_app(settings.DB_PATH) app = create_dashboard_app(settings.DB_PATH)
@@ -1314,7 +1380,7 @@ def _start_dashboard_server(settings: Settings) -> threading.Thread | None:
log_level="info", log_level="info",
) )
except Exception as exc: except Exception as exc:
logger.warning("Dashboard server failed to start: %s", exc) logger.warning("Dashboard server stopped unexpectedly: %s", exc)
thread = threading.Thread( thread = threading.Thread(
target=_serve, target=_serve,
@@ -1755,6 +1821,9 @@ async def run(settings: Settings) -> None:
# Active stocks per market (dynamically discovered by scanner) # Active stocks per market (dynamically discovered by scanner)
active_stocks: dict[str, list[str]] = {} # market_code -> [stock_codes] active_stocks: dict[str, list[str]] = {} # market_code -> [stock_codes]
# BUY cooldown: prevents retrying a stock rejected for insufficient balance
buy_cooldown: dict[str, float] = {} # "{market_code}:{stock_code}" -> expiry timestamp
# Initialize latency control system # Initialize latency control system
criticality_assessor = CriticalityAssessor( criticality_assessor = CriticalityAssessor(
critical_pnl_threshold=-2.5, # Near circuit breaker at -3.0% critical_pnl_threshold=-2.5, # Near circuit breaker at -3.0%
@@ -2097,6 +2166,7 @@ async def run(settings: Settings) -> None:
stock_code, stock_code,
scan_candidates, scan_candidates,
settings, settings,
buy_cooldown,
) )
break # Success — exit retry loop break # Success — exit retry loop
except CircuitBreakerTripped as exc: except CircuitBreakerTripped as exc:

View File

@@ -2194,6 +2194,268 @@ def test_start_dashboard_server_enabled_starts_thread() -> None:
mock_thread.start.assert_called_once() mock_thread.start.assert_called_once()
def test_start_dashboard_server_returns_none_when_uvicorn_missing() -> None:
"""Returns None (no thread) and logs a warning when uvicorn is not installed."""
settings = Settings(
KIS_APP_KEY="test_key",
KIS_APP_SECRET="test_secret",
KIS_ACCOUNT_NO="12345678-01",
GEMINI_API_KEY="test_gemini_key",
DASHBOARD_ENABLED=True,
)
import builtins
real_import = builtins.__import__
def mock_import(name: str, *args: object, **kwargs: object) -> object:
if name == "uvicorn":
raise ImportError("No module named 'uvicorn'")
return real_import(name, *args, **kwargs)
with patch("builtins.__import__", side_effect=mock_import):
thread = _start_dashboard_server(settings)
assert thread is None
# ---------------------------------------------------------------------------
# BUY cooldown tests (#179)
# ---------------------------------------------------------------------------
class TestBuyCooldown:
"""Tests for BUY cooldown after insufficient-balance rejection."""
@pytest.fixture
def mock_broker(self) -> MagicMock:
broker = MagicMock()
broker.get_current_price = AsyncMock(return_value=(100.0, 1.0, 0.0))
broker.get_balance = AsyncMock(
return_value={
"output2": [{"tot_evlu_amt": "1000000", "dnca_tot_amt": "500000",
"pchs_amt_smtl_amt": "500000"}]
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
return broker
@pytest.fixture
def mock_market(self) -> MagicMock:
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
return market
@pytest.fixture
def mock_overseas_market(self) -> MagicMock:
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NAS"
market.is_domestic = False
return market
@pytest.fixture
def mock_overseas_broker(self) -> MagicMock:
broker = MagicMock()
broker.get_overseas_price = AsyncMock(
return_value={"output": {"last": "1.0", "rate": "0.0",
"high": "1.05", "low": "0.95", "tvol": "1000000"}}
)
broker.get_overseas_balance = AsyncMock(return_value={
"output1": [],
"output2": [{"frcr_dncl_amt_2": "50000", "frcr_evlu_tota": "50000",
"frcr_buy_amt_smtl": "0"}],
})
broker.send_overseas_order = AsyncMock(
return_value={"rt_cd": "1", "msg1": "모의투자 주문가능금액이 부족합니다."}
)
return broker
def _make_buy_match_overseas(self, stock_code: str = "MLECW") -> ScenarioMatch:
return ScenarioMatch(
stock_code=stock_code,
matched_scenario=None,
action=ScenarioAction.BUY,
confidence=85,
rationale="Test buy",
)
@pytest.mark.asyncio
async def test_cooldown_set_on_insufficient_balance(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock,
mock_overseas_market: MagicMock,
) -> None:
"""BUY cooldown entry is created after 주문가능금액 rejection."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=self._make_buy_match_overseas("MLECW"))
buy_cooldown: dict[str, float] = {}
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
scenario_engine=engine,
playbook=_make_playbook("US_NASDAQ"),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=MagicMock(
notify_trade_execution=AsyncMock(),
notify_fat_finger=AsyncMock(),
notify_circuit_breaker=AsyncMock(),
notify_scenario_matched=AsyncMock(),
),
market=mock_overseas_market,
stock_code="MLECW",
scan_candidates={},
buy_cooldown=buy_cooldown,
)
assert "US_NASDAQ:MLECW" in buy_cooldown
assert buy_cooldown["US_NASDAQ:MLECW"] > 0
@pytest.mark.asyncio
async def test_cooldown_skips_buy(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock,
mock_overseas_market: MagicMock,
) -> None:
"""BUY is skipped when cooldown is active for the stock."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=self._make_buy_match_overseas("MLECW"))
import asyncio
# Set an active cooldown (expires far in the future)
buy_cooldown: dict[str, float] = {
"US_NASDAQ:MLECW": asyncio.get_event_loop().time() + 600
}
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
scenario_engine=engine,
playbook=_make_playbook("US_NASDAQ"),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=MagicMock(
notify_trade_execution=AsyncMock(),
notify_fat_finger=AsyncMock(),
notify_circuit_breaker=AsyncMock(),
notify_scenario_matched=AsyncMock(),
),
market=mock_overseas_market,
stock_code="MLECW",
scan_candidates={},
buy_cooldown=buy_cooldown,
)
# Order should NOT have been sent
mock_overseas_broker.send_overseas_order.assert_not_called()
@pytest.mark.asyncio
async def test_cooldown_not_set_on_other_errors(
self, mock_broker: MagicMock, mock_overseas_market: MagicMock,
) -> None:
"""Cooldown is NOT set for non-balance-related rejections."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=self._make_buy_match_overseas("MLECW"))
# Different rejection reason
overseas_broker = MagicMock()
overseas_broker.get_overseas_price = AsyncMock(
return_value={"output": {"last": "1.0", "rate": "0.0",
"high": "1.05", "low": "0.95", "tvol": "1000000"}}
)
overseas_broker.get_overseas_balance = AsyncMock(return_value={
"output1": [],
"output2": [{"frcr_dncl_amt_2": "50000", "frcr_evlu_tota": "50000",
"frcr_buy_amt_smtl": "0"}],
})
overseas_broker.send_overseas_order = AsyncMock(
return_value={"rt_cd": "1", "msg1": "기타 오류 메시지"}
)
buy_cooldown: dict[str, float] = {}
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=overseas_broker,
scenario_engine=engine,
playbook=_make_playbook("US_NASDAQ"),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=MagicMock(
notify_trade_execution=AsyncMock(),
notify_fat_finger=AsyncMock(),
notify_circuit_breaker=AsyncMock(),
notify_scenario_matched=AsyncMock(),
),
market=mock_overseas_market,
stock_code="MLECW",
scan_candidates={},
buy_cooldown=buy_cooldown,
)
# Cooldown should NOT be set for non-balance errors
assert "US_NASDAQ:MLECW" not in buy_cooldown
@pytest.mark.asyncio
async def test_no_cooldown_param_still_works(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock,
mock_overseas_market: MagicMock,
) -> None:
"""trading_cycle works normally when buy_cooldown is None (default)."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=self._make_buy_match_overseas("MLECW"))
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
scenario_engine=engine,
playbook=_make_playbook("US_NASDAQ"),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=MagicMock(
notify_trade_execution=AsyncMock(),
notify_fat_finger=AsyncMock(),
notify_circuit_breaker=AsyncMock(),
notify_scenario_matched=AsyncMock(),
),
market=mock_overseas_market,
stock_code="MLECW",
scan_candidates={},
# buy_cooldown not passed → defaults to None
)
# Should attempt the order (and fail), but not crash
mock_overseas_broker.send_overseas_order.assert_called_once()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# market_outlook BUY confidence threshold tests (#173) # market_outlook BUY confidence threshold tests (#173)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------

View File

@@ -350,6 +350,42 @@ class TestSmartVolatilityScanner:
assert [c.stock_code for c in candidates] == ["ABCD"] assert [c.stock_code for c in candidates] == ["ABCD"]
class TestImpliedRSIFormula:
"""Test the implied_rsi formula in SmartVolatilityScanner (issue #181)."""
def test_neutral_change_gives_neutral_rsi(self) -> None:
"""0% change → implied_rsi = 50 (neutral)."""
# formula: 50 + (change_rate * 2.0)
rsi = max(0.0, min(100.0, 50.0 + (0.0 * 2.0)))
assert rsi == 50.0
def test_10pct_change_gives_rsi_70(self) -> None:
"""10% upward change → implied_rsi = 70 (momentum signal)."""
rsi = max(0.0, min(100.0, 50.0 + (10.0 * 2.0)))
assert rsi == 70.0
def test_minus_10pct_gives_rsi_30(self) -> None:
"""-10% change → implied_rsi = 30 (oversold signal)."""
rsi = max(0.0, min(100.0, 50.0 + (-10.0 * 2.0)))
assert rsi == 30.0
def test_saturation_at_25pct(self) -> None:
"""Saturation occurs at >=25% change (not 12.5% as with old coefficient 4.0)."""
rsi_12pct = max(0.0, min(100.0, 50.0 + (12.5 * 2.0)))
rsi_25pct = max(0.0, min(100.0, 50.0 + (25.0 * 2.0)))
rsi_30pct = max(0.0, min(100.0, 50.0 + (30.0 * 2.0)))
# At 12.5% change: RSI = 75 (not 100, unlike old formula)
assert rsi_12pct == 75.0
# At 25%+ saturation
assert rsi_25pct == 100.0
assert rsi_30pct == 100.0 # Capped
def test_negative_saturation(self) -> None:
"""Saturation at -25% gives RSI = 0."""
rsi = max(0.0, min(100.0, 50.0 + (-25.0 * 2.0)))
assert rsi == 0.0
class TestRSICalculation: class TestRSICalculation:
"""Test RSI calculation in VolatilityAnalyzer.""" """Test RSI calculation in VolatilityAnalyzer."""