Compare commits

..

1 Commits

Author SHA1 Message Date
agentson
d19e5b0de6 feat: include current holdings in realtime trading loop for exit evaluation (#165)
Some checks failed
CI / test (pull_request) Has been cancelled
스캐너 후보 종목뿐 아니라 현재 보유 종목도 매 사이클마다 평가해
stop-loss / take-profit이 실제로 동작하도록 개선.

- db.py: get_open_positions_by_market() 추가
  - net BUY - SELL 집계 쿼리로 실제 보유 종목 코드 목록 반환
  - 단순 "최신 레코드 = BUY" 방식보다 안전 (이중 매도 방지)
- main.py: 실시간 루프에서 스캐너 후보 + 보유 종목을 union으로 구성
  - dict.fromkeys로 순서 유지하며 중복 제거
  - 스캐너에 없는 보유 종목은 로그로 명시
  - 보유 종목은 Playbook 없으면 HOLD → stop-loss/take-profit 체크
- tests/test_db.py: get_open_positions_by_market 테스트 5개 추가
  - net 양수 종목 포함, 전량 매도 제외, 부분 매도 포함
  - 마켓 범위 격리, 거래 없을 때 빈 리스트

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 03:08:49 +09:00
17 changed files with 155 additions and 2216 deletions

View File

@@ -192,27 +192,6 @@ When `TELEGRAM_COMMANDS_ENABLED=true` (default), the bot accepts these interacti
Commands are only processed from the authorized `TELEGRAM_CHAT_ID`.
## KIS API TR_ID 참조 문서
**TR_ID를 추가하거나 수정할 때 반드시 공식 문서를 먼저 확인할 것.**
공식 문서: `docs/한국투자증권_오픈API_전체문서_20260221_030000.xlsx`
> ⚠️ 커뮤니티 블로그, GitHub 예제 등 비공식 자료의 TR_ID는 오래되거나 틀릴 수 있음.
> 실제로 `VTTT1006U`(미국 매도 — 잘못됨)가 오랫동안 코드에 남아있던 사례가 있음 (Issue #189).
### 주요 TR_ID 목록
| 구분 | 모의투자 TR_ID | 실전투자 TR_ID | 시트명 |
|------|---------------|---------------|--------|
| 해외주식 매수 (미국) | `VTTT1002U` | `TTTT1002U` | 해외주식 주문 |
| 해외주식 매도 (미국) | `VTTT1001U` | `TTTT1006U` | 해외주식 주문 |
새로운 TR_ID가 필요할 때:
1. 위 xlsx 파일에서 해당 거래 유형의 시트를 찾는다.
2. 모의투자(`VTTT`) / 실전투자(`TTTT`) 컬럼을 구분하여 정확한 값을 사용한다.
3. 코드에 출처 주석을 남긴다: `# Source: 한국투자증권_오픈API_전체문서 — '<시트명>' 시트`
## Environment Setup
```bash

View File

@@ -7,32 +7,6 @@
---
## 2026-02-21
### 거래 상태 확인 중 발견된 버그 (#187)
- 거래 상태 점검 요청 → SELL 주문(손절/익절)이 Fat Finger에 막혀 전혀 실행 안 됨 발견
- **#187 (Critical)**: SELL 주문에서 Fat Finger 오탐 — `order_amount/total_cash > 30%`가 SELL에도 적용되어 대형 포지션 매도 불가
- JELD stop-loss -6.20% → 차단, RXT take-profit +46.13% → 차단
- 수정: SELL은 `check_circuit_breaker`만 호출, `validate_order`(Fat Finger 포함) 미호출
---
## 2026-02-20
### 지속적 모니터링 및 개선점 도출 (이슈 #178~#182)
- Dashboard 포함해서 실행하며 간헐적 문제 모니터링 및 개선점 자동 도출 요청
- 모니터링 결과 발견된 이슈 목록:
- **#178**: uvicorn 미설치 → dashboard 미작동 + 오해의 소지 있는 시작 로그 → uvicorn 설치 완료
- **#179 (Critical)**: 잔액 부족 주문 실패 후 매 사이클마다 무한 재시도 (MLECW 20분 이상 반복)
- **#180**: 다중 인스턴스 실행 시 Telegram 409 충돌
- **#181**: implied_rsi 공식 포화 문제 (change_rate≥12.5% → RSI=100)
- **#182 (Critical)**: 보유 종목이 SmartScanner 변동성 필터에 걸려 SELL 신호 미생성 → SELL 체결 0건, 잔고 소진
- 요구사항: 모니터링 자동화 및 주기적 개선점 리포트 도출
---
## 2026-02-05
### API 효율화

View File

@@ -175,7 +175,7 @@ class SmartVolatilityScanner:
liquidity_score = volume_rank_bonus.get(stock_code, 0.0)
score = min(100.0, volatility_score + liquidity_score)
signal = "momentum" if change_rate >= 0 else "oversold"
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 2.0)))
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0)))
candidates.append(
ScanCandidate(
@@ -282,7 +282,7 @@ class SmartVolatilityScanner:
liquidity_score = volume_rank_bonus.get(stock_code, 0.0)
score = min(100.0, volatility_score + liquidity_score)
signal = "momentum" if change_rate >= 0 else "oversold"
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 2.0)))
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0)))
candidates.append(
ScanCandidate(
stock_code=stock_code,
@@ -338,7 +338,7 @@ class SmartVolatilityScanner:
score = min(volatility_pct / 10.0, 1.0) * 100.0
signal = "momentum" if change_rate >= 0 else "oversold"
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 2.0)))
implied_rsi = max(0.0, min(100.0, 50.0 + (change_rate * 4.0)))
candidates.append(
ScanCandidate(
stock_code=stock_code,

View File

@@ -230,9 +230,7 @@ class OverseasBroker:
session = self._broker._get_session()
# Virtual trading TR_IDs for overseas orders
# Source: 한국투자증권 오픈API 전체문서 (20260221) — '해외주식 주문' 시트
# VTTT1002U: 모의투자 미국 매수, VTTT1001U: 모의투자 미국 매도
tr_id = "VTTT1002U" if order_type == "BUY" else "VTTT1001U"
tr_id = "VTTT1002U" if order_type == "BUY" else "VTTT1006U"
body = {
"CANO": self._broker._account_no,

View File

@@ -237,6 +237,28 @@ def get_open_position(
return {"decision_id": row[1], "price": row[2], "quantity": row[3]}
def get_open_positions_by_market(
conn: sqlite3.Connection, market: str
) -> list[str]:
"""Return stock codes with a net positive position in the given market.
Uses net BUY - SELL quantity aggregation to avoid false positives from
the simpler "latest record is BUY" heuristic. A stock is considered
open only when the bot's own recorded trades leave a positive net quantity.
"""
cursor = conn.execute(
"""
SELECT stock_code
FROM trades
WHERE market = ?
GROUP BY stock_code
HAVING SUM(CASE WHEN action = 'BUY' THEN quantity ELSE -quantity END) > 0
""",
(market,),
)
return [row[0] for row in cursor.fetchall()]
def get_recent_symbols(
conn: sqlite3.Connection, market: str, limit: int = 30
) -> list[str]:

View File

@@ -32,6 +32,7 @@ from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected, Risk
from src.db import (
get_latest_buy_trade,
get_open_position,
get_open_positions_by_market,
get_recent_symbols,
init_db,
log_trade,
@@ -42,7 +43,7 @@ from src.logging.decision_logger import DecisionLogger
from src.logging_config import setup_logging
from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets
from src.notifications.telegram_client import NotificationFilter, TelegramClient, TelegramCommandHandler
from src.strategy.models import DayPlaybook, MarketOutlook
from src.strategy.models import DayPlaybook
from src.strategy.playbook_store import PlaybookStore
from src.strategy.pre_market_planner import PreMarketPlanner
from src.strategy.scenario_engine import ScenarioEngine
@@ -81,7 +82,6 @@ def safe_float(value: str | float | None, default: float = 0.0) -> float:
TRADE_INTERVAL_SECONDS = 60
SCAN_INTERVAL_SECONDS = 60 # Scan markets every 60 seconds
MAX_CONNECTION_RETRIES = 3
_BUY_COOLDOWN_SECONDS = 600 # 10-minute cooldown after insufficient-balance rejection
# Daily trading mode constants (for Free tier API efficiency)
DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day
@@ -107,82 +107,6 @@ def _extract_symbol_from_holding(item: dict[str, Any]) -> str:
return ""
def _extract_held_codes_from_balance(
balance_data: dict[str, Any],
*,
is_domestic: bool,
) -> list[str]:
"""Return stock codes with a positive orderable quantity from a balance response.
Uses the broker's live output1 as the source of truth so that partial fills
and manual external trades are always reflected correctly.
"""
output1 = balance_data.get("output1", [])
if isinstance(output1, dict):
output1 = [output1]
if not isinstance(output1, list):
return []
codes: list[str] = []
for holding in output1:
if not isinstance(holding, dict):
continue
code_key = "pdno" if is_domestic else "ovrs_pdno"
code = str(holding.get(code_key, "")).strip().upper()
if not code:
continue
if is_domestic:
qty = int(holding.get("ord_psbl_qty") or holding.get("hldg_qty") or 0)
else:
qty = int(holding.get("ovrs_cblc_qty") or holding.get("hldg_qty") or 0)
if qty > 0:
codes.append(code)
return codes
def _extract_held_qty_from_balance(
balance_data: dict[str, Any],
stock_code: str,
*,
is_domestic: bool,
) -> int:
"""Extract the broker-confirmed orderable quantity for a stock.
Uses the broker's live balance response (output1) as the source of truth
rather than the local DB, because DB records reflect order quantity which
may differ from actual fill quantity due to partial fills.
Domestic fields (VTTC8434R output1):
pdno — 종목코드
ord_psbl_qty — 주문가능수량 (preferred: excludes unsettled)
hldg_qty — 보유수량 (fallback)
Overseas fields (output1):
ovrs_pdno — 종목코드
ovrs_cblc_qty — 해외잔고수량 (preferred)
hldg_qty — 보유수량 (fallback)
"""
output1 = balance_data.get("output1", [])
if isinstance(output1, dict):
output1 = [output1]
if not isinstance(output1, list):
return 0
for holding in output1:
if not isinstance(holding, dict):
continue
code_key = "pdno" if is_domestic else "ovrs_pdno"
held_code = str(holding.get(code_key, "")).strip().upper()
if held_code != stock_code.strip().upper():
continue
if is_domestic:
qty = int(holding.get("ord_psbl_qty") or holding.get("hldg_qty") or 0)
else:
qty = int(holding.get("ovrs_cblc_qty") or holding.get("hldg_qty") or 0)
return qty
return 0
def _determine_order_quantity(
*,
action: str,
@@ -190,40 +114,16 @@ def _determine_order_quantity(
total_cash: float,
candidate: ScanCandidate | None,
settings: Settings | None,
broker_held_qty: int = 0,
playbook_allocation_pct: float | None = None,
scenario_confidence: int = 80,
) -> int:
"""Determine order quantity using volatility-aware position sizing.
Priority:
1. playbook_allocation_pct (AI-specified) scaled by scenario_confidence
2. Fallback: volatility-score-based allocation from scanner candidate
"""
if action == "SELL":
return broker_held_qty
"""Determine order quantity using volatility-aware position sizing."""
if action != "BUY":
return 1
if current_price <= 0 or total_cash <= 0:
return 0
if settings is None or not settings.POSITION_SIZING_ENABLED:
return 1
# Use AI-specified allocation_pct if available
if playbook_allocation_pct is not None:
# Confidence scaling: confidence 80 → 1.0x, confidence 95 → 1.19x
confidence_scale = scenario_confidence / 80.0
effective_pct = min(
settings.POSITION_MAX_ALLOCATION_PCT,
max(
settings.POSITION_MIN_ALLOCATION_PCT,
playbook_allocation_pct * confidence_scale,
),
)
budget = total_cash * (effective_pct / 100.0)
quantity = int(budget // current_price)
return max(0, quantity)
# Fallback: volatility-score-based allocation
target_score = max(1.0, settings.POSITION_VOLATILITY_TARGET_SCORE)
observed_score = candidate.score if candidate else target_score
observed_score = max(1.0, min(100.0, observed_score))
@@ -299,7 +199,6 @@ async def trading_cycle(
stock_code: str,
scan_candidates: dict[str, dict[str, ScanCandidate]],
settings: Settings | None = None,
buy_cooldown: dict[str, float] | None = None,
) -> None:
"""Execute one trading cycle for a single stock."""
cycle_start_time = asyncio.get_event_loop().time()
@@ -482,53 +381,6 @@ async def trading_cycle(
)
stock_playbook = playbook.get_stock_playbook(stock_code)
# 2.1. Apply market_outlook-based BUY confidence threshold
if decision.action == "BUY":
base_threshold = (settings.CONFIDENCE_THRESHOLD if settings else 80)
outlook = playbook.market_outlook
if outlook == MarketOutlook.BEARISH:
min_confidence = 90
elif outlook == MarketOutlook.BULLISH:
min_confidence = 75
else:
min_confidence = base_threshold
if match.confidence < min_confidence:
logger.info(
"BUY suppressed for %s (%s): confidence %d < %d (market_outlook=%s)",
stock_code,
market.name,
match.confidence,
min_confidence,
outlook.value,
)
decision = TradeDecision(
action="HOLD",
confidence=match.confidence,
rationale=(
f"BUY confidence {match.confidence} < {min_confidence} "
f"(market_outlook={outlook.value})"
),
)
# BUY 결정 전 기존 포지션 체크 (중복 매수 방지)
if decision.action == "BUY":
existing_position = get_open_position(db_conn, stock_code, market.code)
if existing_position:
decision = TradeDecision(
action="HOLD",
confidence=decision.confidence,
rationale=(
f"Already holding {stock_code} "
f"(entry={existing_position['price']:.4f}, "
f"qty={existing_position['quantity']})"
),
)
logger.info(
"BUY suppressed for %s (%s): already holding open position",
stock_code,
market.name,
)
if decision.action == "HOLD":
open_position = get_open_position(db_conn, stock_code, market.code)
if open_position:
@@ -536,10 +388,8 @@ async def trading_cycle(
if entry_price > 0:
loss_pct = (current_price - entry_price) / entry_price * 100
stop_loss_threshold = -2.0
take_profit_threshold = 3.0
if stock_playbook and stock_playbook.scenarios:
stop_loss_threshold = stock_playbook.scenarios[0].stop_loss_pct
take_profit_threshold = stock_playbook.scenarios[0].take_profit_pct
if loss_pct <= stop_loss_threshold:
decision = TradeDecision(
@@ -557,22 +407,6 @@ async def trading_cycle(
loss_pct,
stop_loss_threshold,
)
elif loss_pct >= take_profit_threshold:
decision = TradeDecision(
action="SELL",
confidence=90,
rationale=(
f"Take-profit triggered ({loss_pct:.2f}% >= "
f"{take_profit_threshold:.2f}%)"
),
)
logger.info(
"Take-profit override for %s (%s): %.2f%% >= %.2f%%",
stock_code,
market.name,
loss_pct,
take_profit_threshold,
)
logger.info(
"Decision for %s (%s): %s (confidence=%d)",
stock_code,
@@ -633,23 +467,12 @@ async def trading_cycle(
trade_price = current_price
trade_pnl = 0.0
if decision.action in ("BUY", "SELL"):
broker_held_qty = (
_extract_held_qty_from_balance(
balance_data, stock_code, is_domestic=market.is_domestic
)
if decision.action == "SELL"
else 0
)
matched_scenario = match.matched_scenario
quantity = _determine_order_quantity(
action=decision.action,
current_price=current_price,
total_cash=total_cash,
candidate=candidate,
settings=settings,
broker_held_qty=broker_held_qty,
playbook_allocation_pct=matched_scenario.allocation_pct if matched_scenario else None,
scenario_confidence=match.confidence,
)
if quantity <= 0:
logger.info(
@@ -663,33 +486,13 @@ async def trading_cycle(
return
order_amount = current_price * quantity
# 4. Check BUY cooldown (set when a prior BUY failed due to insufficient balance)
if decision.action == "BUY" and buy_cooldown is not None:
cooldown_key = f"{market.code}:{stock_code}"
cooldown_until = buy_cooldown.get(cooldown_key, 0.0)
now = asyncio.get_event_loop().time()
if now < cooldown_until:
remaining = int(cooldown_until - now)
logger.info(
"Skip BUY %s (%s): insufficient-balance cooldown active (%ds remaining)",
stock_code,
market.name,
remaining,
)
return
# 5a. Risk check BEFORE order
# SELL orders do not consume cash (they receive it), so fat-finger check
# is skipped for SELLs — only circuit breaker applies.
# 4. Risk check BEFORE order
try:
if decision.action == "SELL":
risk.check_circuit_breaker(pnl_pct)
else:
risk.validate_order(
current_pnl_pct=pnl_pct,
order_amount=order_amount,
total_cash=total_cash,
)
risk.validate_order(
current_pnl_pct=pnl_pct,
order_amount=order_amount,
total_cash=total_cash,
)
except FatFingerRejected as exc:
try:
await telegram.notify_fat_finger(
@@ -731,24 +534,12 @@ async def trading_cycle(
# Check if KIS rejected the order (rt_cd != "0")
if result.get("rt_cd", "") != "0":
order_succeeded = False
msg1 = result.get("msg1") or ""
logger.warning(
"Overseas order not accepted for %s: rt_cd=%s msg=%s",
stock_code,
result.get("rt_cd"),
msg1,
result.get("msg1"),
)
# Set BUY cooldown when the rejection is due to insufficient balance
if decision.action == "BUY" and buy_cooldown is not None and "주문가능금액" in msg1:
cooldown_key = f"{market.code}:{stock_code}"
buy_cooldown[cooldown_key] = (
asyncio.get_event_loop().time() + _BUY_COOLDOWN_SECONDS
)
logger.info(
"BUY cooldown set for %s: %.0fs (insufficient balance)",
stock_code,
_BUY_COOLDOWN_SECONDS,
)
logger.info("Order result: %s", result.get("msg1", "OK"))
# 5.5. Notify trade execution (only on success)
@@ -856,9 +647,6 @@ async def run_daily_session(
logger.info("Starting daily trading session for %d markets", len(open_markets))
# BUY cooldown: prevents retrying stocks rejected for insufficient balance
daily_buy_cooldown: dict[str, float] = {} # "{market_code}:{stock_code}" -> expiry timestamp
# Process each open market
for market in open_markets:
# Use market-local date for playbook keying
@@ -1104,20 +892,12 @@ async def run_daily_session(
trade_pnl = 0.0
order_succeeded = True
if decision.action in ("BUY", "SELL"):
daily_broker_held_qty = (
_extract_held_qty_from_balance(
balance_data, stock_code, is_domestic=market.is_domestic
)
if decision.action == "SELL"
else 0
)
quantity = _determine_order_quantity(
action=decision.action,
current_price=stock_data["current_price"],
total_cash=total_cash,
candidate=candidate_map.get(stock_code),
settings=settings,
broker_held_qty=daily_broker_held_qty,
)
if quantity <= 0:
logger.info(
@@ -1131,33 +911,13 @@ async def run_daily_session(
continue
order_amount = stock_data["current_price"] * quantity
# Check BUY cooldown (insufficient balance)
if decision.action == "BUY":
daily_cooldown_key = f"{market.code}:{stock_code}"
daily_cooldown_until = daily_buy_cooldown.get(daily_cooldown_key, 0.0)
now = asyncio.get_event_loop().time()
if now < daily_cooldown_until:
remaining = int(daily_cooldown_until - now)
logger.info(
"Skip BUY %s (%s): insufficient-balance cooldown active (%ds remaining)",
stock_code,
market.name,
remaining,
)
continue
# Risk check
# SELL orders do not consume cash (they receive it), so fat-finger
# check is skipped for SELLs — only circuit breaker applies.
try:
if decision.action == "SELL":
risk.check_circuit_breaker(pnl_pct)
else:
risk.validate_order(
current_pnl_pct=pnl_pct,
order_amount=order_amount,
total_cash=total_cash,
)
risk.validate_order(
current_pnl_pct=pnl_pct,
order_amount=order_amount,
total_cash=total_cash,
)
except FatFingerRejected as exc:
try:
await telegram.notify_fat_finger(
@@ -1207,23 +967,12 @@ async def run_daily_session(
)
if result.get("rt_cd", "") != "0":
order_succeeded = False
daily_msg1 = result.get("msg1") or ""
logger.warning(
"Overseas order not accepted for %s: rt_cd=%s msg=%s",
stock_code,
result.get("rt_cd"),
daily_msg1,
result.get("msg1"),
)
if decision.action == "BUY" and "주문가능금액" in daily_msg1:
daily_cooldown_key = f"{market.code}:{stock_code}"
daily_buy_cooldown[daily_cooldown_key] = (
asyncio.get_event_loop().time() + _BUY_COOLDOWN_SECONDS
)
logger.info(
"BUY cooldown set for %s: %.0fs (insufficient balance)",
stock_code,
_BUY_COOLDOWN_SECONDS,
)
logger.info("Order result: %s", result.get("msg1", "OK"))
# Notify trade execution (only on success)
@@ -1387,18 +1136,10 @@ def _start_dashboard_server(settings: Settings) -> threading.Thread | None:
if not settings.DASHBOARD_ENABLED:
return None
# Validate dependencies before spawning the thread so startup failures are
# reported synchronously (avoids the misleading "started" → "failed" log pair).
try:
import uvicorn # noqa: F401
from src.dashboard import create_dashboard_app # noqa: F401
except ImportError as exc:
logger.warning("Dashboard server unavailable (missing dependency): %s", exc)
return None
def _serve() -> None:
try:
import uvicorn
from src.dashboard import create_dashboard_app
app = create_dashboard_app(settings.DB_PATH)
@@ -1409,7 +1150,7 @@ def _start_dashboard_server(settings: Settings) -> threading.Thread | None:
log_level="info",
)
except Exception as exc:
logger.warning("Dashboard server stopped unexpectedly: %s", exc)
logger.warning("Dashboard server failed to start: %s", exc)
thread = threading.Thread(
target=_serve,
@@ -1850,9 +1591,6 @@ async def run(settings: Settings) -> None:
# Active stocks per market (dynamically discovered by scanner)
active_stocks: dict[str, list[str]] = {} # market_code -> [stock_codes]
# BUY cooldown: prevents retrying a stock rejected for insufficient balance
buy_cooldown: dict[str, float] = {} # "{market_code}:{stock_code}" -> expiry timestamp
# Initialize latency control system
criticality_assessor = CriticalityAssessor(
critical_pnl_threshold=-2.5, # Near circuit breaker at -3.0%
@@ -2126,38 +1864,22 @@ async def run(settings: Settings) -> None:
except Exception as exc:
logger.error("Smart Scanner failed for %s: %s", market.name, exc)
# Get active stocks from scanner (dynamic, no static fallback).
# Also include currently-held positions so stop-loss /
# take-profit can fire even when a holding drops off the
# scanner. Broker balance is the source of truth here —
# unlike the local DB it reflects actual fills and any
# manual trades done outside the bot.
# Get active stocks from scanner (dynamic, no static fallback)
# Also include current holdings so stop-loss / take-profit
# can trigger even when a position drops off the scanner.
scanner_codes = active_stocks.get(market.code, [])
try:
if market.is_domestic:
held_balance = await broker.get_balance()
else:
held_balance = await overseas_broker.get_overseas_balance(
market.exchange_code
)
held_codes = _extract_held_codes_from_balance(
held_balance, is_domestic=market.is_domestic
)
except Exception as exc:
logger.warning(
"Failed to fetch holdings for %s: %s — skipping holdings merge",
market.name, exc,
)
held_codes = []
held_codes = get_open_positions_by_market(db_conn, market.code)
# Union: scanner candidates first, then holdings not already present.
# dict.fromkeys preserves insertion order and removes duplicates.
stock_codes = list(dict.fromkeys(scanner_codes + held_codes))
extra_held = [c for c in held_codes if c not in set(scanner_codes)]
if extra_held:
logger.info(
"Holdings added to loop for %s (not in scanner): %s",
market.name, extra_held,
)
if held_codes:
new_held = [c for c in held_codes if c not in set(scanner_codes)]
if new_held:
logger.info(
"Holdings added to loop for %s (not in scanner): %s",
market.name,
new_held,
)
if not stock_codes:
logger.debug("No active stocks for market %s", market.code)
continue
@@ -2195,7 +1917,6 @@ async def run(settings: Settings) -> None:
stock_code,
scan_candidates,
settings,
buy_cooldown,
)
break # Success — exit retry loop
except CircuitBreakerTripped as exc:

View File

@@ -604,19 +604,9 @@ class TelegramCommandHandler:
async with session.post(url, json=payload) as resp:
if resp.status != 200:
error_text = await resp.text()
if resp.status == 409:
# Another bot instance is already polling — stop this poller entirely.
# Retrying would keep conflicting with the other instance.
self._running = False
logger.warning(
"Telegram conflict (409): another instance is already polling. "
"Disabling Telegram commands for this process. "
"Ensure only one instance of The Ouroboros is running at a time.",
)
else:
logger.error(
"getUpdates API error (status=%d): %s", resp.status, error_text
)
logger.error(
"getUpdates API error (status=%d): %s", resp.status, error_text
)
return []
data = await resp.json()

View File

@@ -46,18 +46,6 @@ class StockCondition(BaseModel):
The ScenarioEngine evaluates all non-None fields as AND conditions.
A condition matches only if ALL specified fields are satisfied.
Technical indicator fields:
rsi_below / rsi_above — RSI threshold
volume_ratio_above / volume_ratio_below — volume vs previous day
price_above / price_below — absolute price level
price_change_pct_above / price_change_pct_below — intraday % change
Position-aware fields (require market_data enrichment from open position):
unrealized_pnl_pct_above — matches if unrealized P&L > threshold (e.g. 3.0 → +3%)
unrealized_pnl_pct_below — matches if unrealized P&L < threshold (e.g. -2.0 → -2%)
holding_days_above — matches if position held for more than N days
holding_days_below — matches if position held for fewer than N days
"""
rsi_below: float | None = None
@@ -68,10 +56,6 @@ class StockCondition(BaseModel):
price_below: float | None = None
price_change_pct_above: float | None = None
price_change_pct_below: float | None = None
unrealized_pnl_pct_above: float | None = None
unrealized_pnl_pct_below: float | None = None
holding_days_above: int | None = None
holding_days_below: int | None = None
def has_any_condition(self) -> bool:
"""Check if at least one condition field is set."""
@@ -86,10 +70,6 @@ class StockCondition(BaseModel):
self.price_below,
self.price_change_pct_above,
self.price_change_pct_below,
self.unrealized_pnl_pct_above,
self.unrealized_pnl_pct_below,
self.holding_days_above,
self.holding_days_below,
)
)

View File

@@ -75,7 +75,6 @@ class PreMarketPlanner:
market: str,
candidates: list[ScanCandidate],
today: date | None = None,
current_holdings: list[dict] | None = None,
) -> DayPlaybook:
"""Generate a DayPlaybook for a market using Gemini.
@@ -83,10 +82,6 @@ class PreMarketPlanner:
market: Market code ("KR" or "US")
candidates: Stock candidates from SmartVolatilityScanner
today: Override date (defaults to date.today()). Use market-local date.
current_holdings: Currently held positions with entry_price and unrealized_pnl_pct.
Each dict: {"stock_code": str, "name": str, "qty": int,
"entry_price": float, "unrealized_pnl_pct": float,
"holding_days": int}
Returns:
DayPlaybook with scenarios. Empty/defensive if no candidates or failure.
@@ -111,7 +106,6 @@ class PreMarketPlanner:
context_data,
self_market_scorecard,
cross_market,
current_holdings=current_holdings,
)
# 3. Call Gemini
@@ -124,8 +118,7 @@ class PreMarketPlanner:
# 4. Parse response
playbook = self._parse_response(
decision.rationale, today, market, candidates, cross_market,
current_holdings=current_holdings,
decision.rationale, today, market, candidates, cross_market
)
playbook_with_tokens = playbook.model_copy(
update={"token_count": decision.token_count}
@@ -237,7 +230,6 @@ class PreMarketPlanner:
context_data: dict[str, Any],
self_market_scorecard: dict[str, Any] | None,
cross_market: CrossMarketContext | None,
current_holdings: list[dict] | None = None,
) -> str:
"""Build a structured prompt for Gemini to generate scenario JSON."""
max_scenarios = self._settings.MAX_SCENARIOS_PER_STOCK
@@ -249,26 +241,6 @@ class PreMarketPlanner:
for c in candidates
)
holdings_text = ""
if current_holdings:
lines = []
for h in current_holdings:
code = h.get("stock_code", "")
name = h.get("name", "")
qty = h.get("qty", 0)
entry_price = h.get("entry_price", 0.0)
pnl_pct = h.get("unrealized_pnl_pct", 0.0)
holding_days = h.get("holding_days", 0)
lines.append(
f" - {code} ({name}): {qty}주 @ {entry_price:,.0f}, "
f"미실현손익 {pnl_pct:+.2f}%, 보유 {holding_days}"
)
holdings_text = (
"\n## Current Holdings (보유 중 — SELL/HOLD 전략 고려 필요)\n"
+ "\n".join(lines)
+ "\n"
)
cross_market_text = ""
if cross_market:
cross_market_text = (
@@ -301,20 +273,10 @@ class PreMarketPlanner:
for key, value in list(layer_data.items())[:5]:
context_text += f" - {key}: {value}\n"
holdings_instruction = ""
if current_holdings:
holding_codes = [h.get("stock_code", "") for h in current_holdings]
holdings_instruction = (
f"- Also include SELL/HOLD scenarios for held stocks: "
f"{', '.join(holding_codes)} "
f"(even if not in candidates list)\n"
)
return (
f"You are a pre-market trading strategist for the {market} market.\n"
f"Generate structured trading scenarios for today.\n\n"
f"## Candidates (from volatility scanner)\n{candidates_text}\n"
f"{holdings_text}"
f"{self_market_text}"
f"{cross_market_text}"
f"{context_text}\n"
@@ -332,8 +294,7 @@ class PreMarketPlanner:
f' "stock_code": "...",\n'
f' "scenarios": [\n'
f' {{\n'
f' "condition": {{"rsi_below": 30, "volume_ratio_above": 2.0,'
f' "unrealized_pnl_pct_above": 3.0, "holding_days_above": 5}},\n'
f' "condition": {{"rsi_below": 30, "volume_ratio_above": 2.0}},\n'
f' "action": "BUY|SELL|HOLD",\n'
f' "confidence": 85,\n'
f' "allocation_pct": 10.0,\n'
@@ -347,8 +308,7 @@ class PreMarketPlanner:
f'}}\n\n'
f"Rules:\n"
f"- Max {max_scenarios} scenarios per stock\n"
f"- Candidates list is the primary source for BUY candidates\n"
f"{holdings_instruction}"
f"- Only use stocks from the candidates list\n"
f"- Confidence 0-100 (80+ for actionable trades)\n"
f"- stop_loss_pct must be <= 0, take_profit_pct must be >= 0\n"
f"- Return ONLY the JSON, no markdown fences or explanation\n"
@@ -361,19 +321,12 @@ class PreMarketPlanner:
market: str,
candidates: list[ScanCandidate],
cross_market: CrossMarketContext | None,
current_holdings: list[dict] | None = None,
) -> DayPlaybook:
"""Parse Gemini's JSON response into a validated DayPlaybook."""
cleaned = self._extract_json(response_text)
data = json.loads(cleaned)
valid_codes = {c.stock_code for c in candidates}
# Holdings are also valid — AI may generate SELL/HOLD scenarios for them
if current_holdings:
for h in current_holdings:
code = h.get("stock_code", "")
if code:
valid_codes.add(code)
# Parse market outlook
outlook_str = data.get("market_outlook", "neutral")
@@ -437,10 +390,6 @@ class PreMarketPlanner:
price_below=cond_data.get("price_below"),
price_change_pct_above=cond_data.get("price_change_pct_above"),
price_change_pct_below=cond_data.get("price_change_pct_below"),
unrealized_pnl_pct_above=cond_data.get("unrealized_pnl_pct_above"),
unrealized_pnl_pct_below=cond_data.get("unrealized_pnl_pct_below"),
holding_days_above=cond_data.get("holding_days_above"),
holding_days_below=cond_data.get("holding_days_below"),
)
if not condition.has_any_condition():

View File

@@ -206,37 +206,6 @@ class ScenarioEngine:
if condition.price_change_pct_below is not None:
checks.append(price_change_pct is not None and price_change_pct < condition.price_change_pct_below)
# Position-aware conditions
unrealized_pnl_pct = self._safe_float(market_data.get("unrealized_pnl_pct"))
if condition.unrealized_pnl_pct_above is not None or condition.unrealized_pnl_pct_below is not None:
if "unrealized_pnl_pct" not in market_data:
self._warn_missing_key("unrealized_pnl_pct")
if condition.unrealized_pnl_pct_above is not None:
checks.append(
unrealized_pnl_pct is not None
and unrealized_pnl_pct > condition.unrealized_pnl_pct_above
)
if condition.unrealized_pnl_pct_below is not None:
checks.append(
unrealized_pnl_pct is not None
and unrealized_pnl_pct < condition.unrealized_pnl_pct_below
)
holding_days = self._safe_float(market_data.get("holding_days"))
if condition.holding_days_above is not None or condition.holding_days_below is not None:
if "holding_days" not in market_data:
self._warn_missing_key("holding_days")
if condition.holding_days_above is not None:
checks.append(
holding_days is not None
and holding_days > condition.holding_days_above
)
if condition.holding_days_below is not None:
checks.append(
holding_days is not None
and holding_days < condition.holding_days_below
)
return len(checks) > 0 and all(checks)
def _evaluate_global_condition(
@@ -297,9 +266,5 @@ class ScenarioEngine:
details["current_price"] = self._safe_float(market_data.get("current_price"))
if condition.price_change_pct_above is not None or condition.price_change_pct_below is not None:
details["price_change_pct"] = self._safe_float(market_data.get("price_change_pct"))
if condition.unrealized_pnl_pct_above is not None or condition.unrealized_pnl_pct_below is not None:
details["unrealized_pnl_pct"] = self._safe_float(market_data.get("unrealized_pnl_pct"))
if condition.holding_days_above is not None or condition.holding_days_below is not None:
details["holding_days"] = self._safe_float(market_data.get("holding_days"))
return details

View File

@@ -1,6 +1,6 @@
"""Tests for database helper functions."""
from src.db import get_open_position, init_db, log_trade
from src.db import get_open_position, get_open_positions_by_market, init_db, log_trade
def test_get_open_position_returns_latest_buy() -> None:
@@ -58,3 +58,87 @@ def test_get_open_position_returns_none_when_latest_is_sell() -> None:
def test_get_open_position_returns_none_when_no_trades() -> None:
conn = init_db(":memory:")
assert get_open_position(conn, "AAPL", "US_NASDAQ") is None
# --- get_open_positions_by_market tests ---
def test_get_open_positions_by_market_returns_net_positive_stocks() -> None:
"""Stocks with net BUY quantity > 0 are included."""
conn = init_db(":memory:")
log_trade(
conn=conn, stock_code="005930", action="BUY", confidence=90,
rationale="entry", quantity=5, price=70000.0, market="KR",
exchange_code="KRX", decision_id="d1",
)
log_trade(
conn=conn, stock_code="000660", action="BUY", confidence=85,
rationale="entry", quantity=3, price=100000.0, market="KR",
exchange_code="KRX", decision_id="d2",
)
result = get_open_positions_by_market(conn, "KR")
assert set(result) == {"005930", "000660"}
def test_get_open_positions_by_market_excludes_fully_sold_stocks() -> None:
"""Stocks where BUY qty == SELL qty are excluded (net qty = 0)."""
conn = init_db(":memory:")
log_trade(
conn=conn, stock_code="005930", action="BUY", confidence=90,
rationale="entry", quantity=3, price=70000.0, market="KR",
exchange_code="KRX", decision_id="d1",
)
log_trade(
conn=conn, stock_code="005930", action="SELL", confidence=95,
rationale="exit", quantity=3, price=71000.0, market="KR",
exchange_code="KRX", decision_id="d2",
)
result = get_open_positions_by_market(conn, "KR")
assert "005930" not in result
def test_get_open_positions_by_market_includes_partially_sold_stocks() -> None:
"""Stocks with partial SELL (net qty > 0) are still included."""
conn = init_db(":memory:")
log_trade(
conn=conn, stock_code="005930", action="BUY", confidence=90,
rationale="entry", quantity=5, price=70000.0, market="KR",
exchange_code="KRX", decision_id="d1",
)
log_trade(
conn=conn, stock_code="005930", action="SELL", confidence=95,
rationale="partial exit", quantity=2, price=71000.0, market="KR",
exchange_code="KRX", decision_id="d2",
)
result = get_open_positions_by_market(conn, "KR")
assert "005930" in result
def test_get_open_positions_by_market_is_market_scoped() -> None:
"""Only stocks from the specified market are returned."""
conn = init_db(":memory:")
log_trade(
conn=conn, stock_code="005930", action="BUY", confidence=90,
rationale="entry", quantity=3, price=70000.0, market="KR",
exchange_code="KRX", decision_id="d1",
)
log_trade(
conn=conn, stock_code="AAPL", action="BUY", confidence=85,
rationale="entry", quantity=2, price=200.0, market="NASD",
exchange_code="NAS", decision_id="d2",
)
kr_result = get_open_positions_by_market(conn, "KR")
nasd_result = get_open_positions_by_market(conn, "NASD")
assert kr_result == ["005930"]
assert nasd_result == ["AAPL"]
def test_get_open_positions_by_market_returns_empty_when_no_trades() -> None:
"""Empty list returned when no trades exist for the market."""
conn = init_db(":memory:")
assert get_open_positions_by_market(conn, "KR") == []

File diff suppressed because it is too large Load Diff

View File

@@ -414,7 +414,7 @@ class TestSendOverseasOrder:
@pytest.mark.asyncio
async def test_sell_limit_order(self, overseas_broker: OverseasBroker) -> None:
"""Limit sell order should use VTTT1001U and ORD_DVSN=00."""
"""Limit sell order should use VTTT1006U and ORD_DVSN=00."""
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"rt_cd": "0"})
@@ -428,7 +428,7 @@ class TestSendOverseasOrder:
result = await overseas_broker.send_overseas_order("NYSE", "MSFT", "SELL", 5, price=350.0)
assert result["rt_cd"] == "0"
overseas_broker._broker._auth_headers.assert_called_with("VTTT1001U")
overseas_broker._broker._auth_headers.assert_called_with("VTTT1006U")
call_args = mock_session.post.call_args
body = call_args[1]["json"]

View File

@@ -830,171 +830,3 @@ class TestSmartFallbackPlaybook:
]
assert len(buy_scenarios) == 1
assert buy_scenarios[0].condition.volume_ratio_above == 2.0 # VOL_MULTIPLIER default
# ---------------------------------------------------------------------------
# Holdings in prompt (#170)
# ---------------------------------------------------------------------------
class TestHoldingsInPrompt:
"""Tests for current_holdings parameter in generate_playbook / _build_prompt."""
def _make_holdings(self) -> list[dict]:
return [
{
"stock_code": "005930",
"name": "Samsung",
"qty": 10,
"entry_price": 71000.0,
"unrealized_pnl_pct": 2.3,
"holding_days": 3,
}
]
def test_build_prompt_includes_holdings_section(self) -> None:
"""Prompt should contain a Current Holdings section when holdings are given."""
planner = _make_planner()
candidates = [_candidate()]
holdings = self._make_holdings()
prompt = planner._build_prompt(
"KR",
candidates,
context_data={},
self_market_scorecard=None,
cross_market=None,
current_holdings=holdings,
)
assert "## Current Holdings" in prompt
assert "005930" in prompt
assert "+2.30%" in prompt
assert "보유 3일" in prompt
def test_build_prompt_no_holdings_omits_section(self) -> None:
"""Prompt should NOT contain a Current Holdings section when holdings=None."""
planner = _make_planner()
candidates = [_candidate()]
prompt = planner._build_prompt(
"KR",
candidates,
context_data={},
self_market_scorecard=None,
cross_market=None,
current_holdings=None,
)
assert "## Current Holdings" not in prompt
def test_build_prompt_empty_holdings_omits_section(self) -> None:
"""Empty list should also omit the holdings section."""
planner = _make_planner()
candidates = [_candidate()]
prompt = planner._build_prompt(
"KR",
candidates,
context_data={},
self_market_scorecard=None,
cross_market=None,
current_holdings=[],
)
assert "## Current Holdings" not in prompt
def test_build_prompt_holdings_instruction_included(self) -> None:
"""Prompt should include instruction to generate scenarios for held stocks."""
planner = _make_planner()
candidates = [_candidate()]
holdings = self._make_holdings()
prompt = planner._build_prompt(
"KR",
candidates,
context_data={},
self_market_scorecard=None,
cross_market=None,
current_holdings=holdings,
)
assert "005930" in prompt
assert "SELL/HOLD" in prompt
@pytest.mark.asyncio
async def test_generate_playbook_passes_holdings_to_prompt(self) -> None:
"""generate_playbook should pass current_holdings through to the prompt."""
planner = _make_planner()
candidates = [_candidate()]
holdings = self._make_holdings()
# Capture the actual prompt sent to Gemini
captured_prompts: list[str] = []
original_decide = planner._gemini.decide
async def capture_and_call(data: dict) -> TradeDecision:
captured_prompts.append(data.get("prompt_override", ""))
return await original_decide(data)
planner._gemini.decide = capture_and_call # type: ignore[method-assign]
await planner.generate_playbook(
"KR", candidates, today=date(2026, 2, 8), current_holdings=holdings
)
assert len(captured_prompts) == 1
assert "## Current Holdings" in captured_prompts[0]
assert "005930" in captured_prompts[0]
@pytest.mark.asyncio
async def test_holdings_stock_allowed_in_parse_response(self) -> None:
"""Holdings stocks not in candidates list should be accepted in the response."""
holding_code = "000660" # Not in candidates
stocks = [
{
"stock_code": "005930", # candidate
"scenarios": [
{
"condition": {"rsi_below": 30},
"action": "BUY",
"confidence": 85,
"rationale": "oversold",
}
],
},
{
"stock_code": holding_code, # holding only
"scenarios": [
{
"condition": {"price_change_pct_below": -2.0},
"action": "SELL",
"confidence": 90,
"rationale": "stop-loss",
}
],
},
]
planner = _make_planner(gemini_response=_gemini_response_json(stocks=stocks))
candidates = [_candidate()] # only 005930
holdings = [
{
"stock_code": holding_code,
"name": "SK Hynix",
"qty": 5,
"entry_price": 180000.0,
"unrealized_pnl_pct": -1.5,
"holding_days": 7,
}
]
pb = await planner.generate_playbook(
"KR",
candidates,
today=date(2026, 2, 8),
current_holdings=holdings,
)
codes = [sp.stock_code for sp in pb.stock_playbooks]
assert "005930" in codes
assert holding_code in codes

View File

@@ -440,135 +440,3 @@ class TestEvaluate:
assert result.action == ScenarioAction.BUY
assert result.match_details["rsi"] == 25.0
assert isinstance(result.match_details["rsi"], float)
# ---------------------------------------------------------------------------
# Position-aware condition tests (#171)
# ---------------------------------------------------------------------------
class TestPositionAwareConditions:
"""Tests for unrealized_pnl_pct and holding_days condition fields."""
def test_evaluate_condition_unrealized_pnl_above_matches(
self, engine: ScenarioEngine
) -> None:
"""unrealized_pnl_pct_above should match when P&L exceeds threshold."""
condition = StockCondition(unrealized_pnl_pct_above=3.0)
assert engine.evaluate_condition(condition, {"unrealized_pnl_pct": 5.0}) is True
def test_evaluate_condition_unrealized_pnl_above_no_match(
self, engine: ScenarioEngine
) -> None:
"""unrealized_pnl_pct_above should NOT match when P&L is below threshold."""
condition = StockCondition(unrealized_pnl_pct_above=3.0)
assert engine.evaluate_condition(condition, {"unrealized_pnl_pct": 2.0}) is False
def test_evaluate_condition_unrealized_pnl_below_matches(
self, engine: ScenarioEngine
) -> None:
"""unrealized_pnl_pct_below should match when P&L is under threshold."""
condition = StockCondition(unrealized_pnl_pct_below=-2.0)
assert engine.evaluate_condition(condition, {"unrealized_pnl_pct": -3.5}) is True
def test_evaluate_condition_unrealized_pnl_below_no_match(
self, engine: ScenarioEngine
) -> None:
"""unrealized_pnl_pct_below should NOT match when P&L is above threshold."""
condition = StockCondition(unrealized_pnl_pct_below=-2.0)
assert engine.evaluate_condition(condition, {"unrealized_pnl_pct": -1.0}) is False
def test_evaluate_condition_holding_days_above_matches(
self, engine: ScenarioEngine
) -> None:
"""holding_days_above should match when position held longer than threshold."""
condition = StockCondition(holding_days_above=5)
assert engine.evaluate_condition(condition, {"holding_days": 7}) is True
def test_evaluate_condition_holding_days_above_no_match(
self, engine: ScenarioEngine
) -> None:
"""holding_days_above should NOT match when position held shorter."""
condition = StockCondition(holding_days_above=5)
assert engine.evaluate_condition(condition, {"holding_days": 3}) is False
def test_evaluate_condition_holding_days_below_matches(
self, engine: ScenarioEngine
) -> None:
"""holding_days_below should match when position held fewer days."""
condition = StockCondition(holding_days_below=3)
assert engine.evaluate_condition(condition, {"holding_days": 1}) is True
def test_evaluate_condition_holding_days_below_no_match(
self, engine: ScenarioEngine
) -> None:
"""holding_days_below should NOT match when held more days."""
condition = StockCondition(holding_days_below=3)
assert engine.evaluate_condition(condition, {"holding_days": 5}) is False
def test_combined_pnl_and_holding_days(self, engine: ScenarioEngine) -> None:
"""Combined position-aware conditions should AND-evaluate correctly."""
condition = StockCondition(
unrealized_pnl_pct_above=3.0,
holding_days_above=5,
)
# Both met → match
assert engine.evaluate_condition(
condition,
{"unrealized_pnl_pct": 4.5, "holding_days": 7},
) is True
# Only pnl met → no match
assert engine.evaluate_condition(
condition,
{"unrealized_pnl_pct": 4.5, "holding_days": 3},
) is False
def test_missing_unrealized_pnl_does_not_match(
self, engine: ScenarioEngine
) -> None:
"""Missing unrealized_pnl_pct key should not match the condition."""
condition = StockCondition(unrealized_pnl_pct_above=3.0)
assert engine.evaluate_condition(condition, {}) is False
def test_missing_holding_days_does_not_match(
self, engine: ScenarioEngine
) -> None:
"""Missing holding_days key should not match the condition."""
condition = StockCondition(holding_days_above=5)
assert engine.evaluate_condition(condition, {}) is False
def test_match_details_includes_position_fields(
self, engine: ScenarioEngine
) -> None:
"""match_details should include position fields when condition specifies them."""
pb = _playbook(
scenarios=[
StockScenario(
condition=StockCondition(unrealized_pnl_pct_above=3.0),
action=ScenarioAction.SELL,
confidence=90,
rationale="Take profit",
)
]
)
result = engine.evaluate(
pb,
"005930",
{"unrealized_pnl_pct": 5.0},
{},
)
assert result.action == ScenarioAction.SELL
assert "unrealized_pnl_pct" in result.match_details
assert result.match_details["unrealized_pnl_pct"] == 5.0
def test_position_conditions_parse_from_planner(self) -> None:
"""StockCondition should accept and store new fields from JSON parsing."""
condition = StockCondition(
unrealized_pnl_pct_above=3.0,
unrealized_pnl_pct_below=None,
holding_days_above=5,
holding_days_below=None,
)
assert condition.unrealized_pnl_pct_above == 3.0
assert condition.holding_days_above == 5
assert condition.has_any_condition() is True

View File

@@ -350,42 +350,6 @@ class TestSmartVolatilityScanner:
assert [c.stock_code for c in candidates] == ["ABCD"]
class TestImpliedRSIFormula:
"""Test the implied_rsi formula in SmartVolatilityScanner (issue #181)."""
def test_neutral_change_gives_neutral_rsi(self) -> None:
"""0% change → implied_rsi = 50 (neutral)."""
# formula: 50 + (change_rate * 2.0)
rsi = max(0.0, min(100.0, 50.0 + (0.0 * 2.0)))
assert rsi == 50.0
def test_10pct_change_gives_rsi_70(self) -> None:
"""10% upward change → implied_rsi = 70 (momentum signal)."""
rsi = max(0.0, min(100.0, 50.0 + (10.0 * 2.0)))
assert rsi == 70.0
def test_minus_10pct_gives_rsi_30(self) -> None:
"""-10% change → implied_rsi = 30 (oversold signal)."""
rsi = max(0.0, min(100.0, 50.0 + (-10.0 * 2.0)))
assert rsi == 30.0
def test_saturation_at_25pct(self) -> None:
"""Saturation occurs at >=25% change (not 12.5% as with old coefficient 4.0)."""
rsi_12pct = max(0.0, min(100.0, 50.0 + (12.5 * 2.0)))
rsi_25pct = max(0.0, min(100.0, 50.0 + (25.0 * 2.0)))
rsi_30pct = max(0.0, min(100.0, 50.0 + (30.0 * 2.0)))
# At 12.5% change: RSI = 75 (not 100, unlike old formula)
assert rsi_12pct == 75.0
# At 25%+ saturation
assert rsi_25pct == 100.0
assert rsi_30pct == 100.0 # Capped
def test_negative_saturation(self) -> None:
"""Saturation at -25% gives RSI = 0."""
rsi = max(0.0, min(100.0, 50.0 + (-25.0 * 2.0)))
assert rsi == 0.0
class TestRSICalculation:
"""Test RSI calculation in VolatilityAnalyzer."""

View File

@@ -876,54 +876,6 @@ class TestGetUpdates:
assert updates == []
@pytest.mark.asyncio
async def test_get_updates_409_stops_polling(self) -> None:
"""409 Conflict response stops the poller (_running = False) and returns empty list."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
handler._running = True # simulate active poller
mock_resp = AsyncMock()
mock_resp.status = 409
mock_resp.text = AsyncMock(
return_value='{"ok":false,"error_code":409,"description":"Conflict"}'
)
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp):
updates = await handler._get_updates()
assert updates == []
assert handler._running is False # poller stopped
@pytest.mark.asyncio
async def test_poll_loop_exits_after_409(self) -> None:
"""_poll_loop exits naturally after _running is set to False by a 409 response."""
import asyncio as _asyncio
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
call_count = 0
async def mock_get_updates_409() -> list[dict]:
nonlocal call_count
call_count += 1
# Simulate 409 stopping the poller
handler._running = False
return []
handler._get_updates = mock_get_updates_409 # type: ignore[method-assign]
handler._running = True
task = _asyncio.create_task(handler._poll_loop())
await _asyncio.wait_for(task, timeout=2.0)
# _get_updates called exactly once, then loop exited
assert call_count == 1
assert handler._running is False
class TestCommandWithArgs:
"""Test register_command_with_args and argument dispatch."""