Compare commits

..

2 Commits

Author SHA1 Message Date
agentson
aceba86186 fix: Telegram 409 감지 시 백오프 대신 polling 즉시 종료 (#180)
Some checks failed
CI / test (pull_request) Has been cancelled
409 충돌 감지 시 30초 백오프 후 재시도하는 방식에서
_running = False로 polling을 즉시 중단하는 방식으로 변경.

다중 인스턴스가 실행 중인 경우 재시도는 의미 없고 충돌만 반복됨.
이제 409 발생 시 이 프로세스의 Telegram 명령어 polling을 완전히 비활성화.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 09:35:33 +09:00
agentson
77577f3f4d fix: Telegram 409 충돌 시 WARNING 로그 + 30초 백오프 적용 (#180)
Some checks failed
CI / test (pull_request) Has been cancelled
다중 인스턴스 실행 시 Telegram getUpdates 409 응답을 ERROR가 아닌 WARNING으로
처리하고, 30초 동안 polling을 일시 중단하여 충돌을 완화.

- _conflict_backoff_until 속성 추가
- 409 감지 시 명확한 "another instance is polling" 메시지 출력
- poll_loop에서 백오프 활성 중 polling 스킵
- TestGetUpdates에 409 관련 테스트 2개 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 09:31:04 +09:00
4 changed files with 64 additions and 307 deletions

View File

@@ -81,7 +81,6 @@ def safe_float(value: str | float | None, default: float = 0.0) -> float:
TRADE_INTERVAL_SECONDS = 60
SCAN_INTERVAL_SECONDS = 60 # Scan markets every 60 seconds
MAX_CONNECTION_RETRIES = 3
_BUY_COOLDOWN_SECONDS = 600 # 10-minute cooldown after insufficient-balance rejection
# Daily trading mode constants (for Free tier API efficiency)
DAILY_TRADE_SESSIONS = 4 # Number of trading sessions per day
@@ -299,7 +298,6 @@ async def trading_cycle(
stock_code: str,
scan_candidates: dict[str, dict[str, ScanCandidate]],
settings: Settings | None = None,
buy_cooldown: dict[str, float] | None = None,
) -> None:
"""Execute one trading cycle for a single stock."""
cycle_start_time = asyncio.get_event_loop().time()
@@ -644,22 +642,7 @@ async def trading_cycle(
return
order_amount = current_price * quantity
# 4. Check BUY cooldown (set when a prior BUY failed due to insufficient balance)
if decision.action == "BUY" and buy_cooldown is not None:
cooldown_key = f"{market.code}:{stock_code}"
cooldown_until = buy_cooldown.get(cooldown_key, 0.0)
now = asyncio.get_event_loop().time()
if now < cooldown_until:
remaining = int(cooldown_until - now)
logger.info(
"Skip BUY %s (%s): insufficient-balance cooldown active (%ds remaining)",
stock_code,
market.name,
remaining,
)
return
# 5a. Risk check BEFORE order
# 4. Risk check BEFORE order
try:
risk.validate_order(
current_pnl_pct=pnl_pct,
@@ -707,24 +690,12 @@ async def trading_cycle(
# Check if KIS rejected the order (rt_cd != "0")
if result.get("rt_cd", "") != "0":
order_succeeded = False
msg1 = result.get("msg1") or ""
logger.warning(
"Overseas order not accepted for %s: rt_cd=%s msg=%s",
stock_code,
result.get("rt_cd"),
msg1,
result.get("msg1"),
)
# Set BUY cooldown when the rejection is due to insufficient balance
if decision.action == "BUY" and buy_cooldown is not None and "주문가능금액" in msg1:
cooldown_key = f"{market.code}:{stock_code}"
buy_cooldown[cooldown_key] = (
asyncio.get_event_loop().time() + _BUY_COOLDOWN_SECONDS
)
logger.info(
"BUY cooldown set for %s: %.0fs (insufficient balance)",
stock_code,
_BUY_COOLDOWN_SECONDS,
)
logger.info("Order result: %s", result.get("msg1", "OK"))
# 5.5. Notify trade execution (only on success)
@@ -832,9 +803,6 @@ async def run_daily_session(
logger.info("Starting daily trading session for %d markets", len(open_markets))
# BUY cooldown: prevents retrying stocks rejected for insufficient balance
daily_buy_cooldown: dict[str, float] = {} # "{market_code}:{stock_code}" -> expiry timestamp
# Process each open market
for market in open_markets:
# Use market-local date for playbook keying
@@ -1107,21 +1075,6 @@ async def run_daily_session(
continue
order_amount = stock_data["current_price"] * quantity
# Check BUY cooldown (insufficient balance)
if decision.action == "BUY":
daily_cooldown_key = f"{market.code}:{stock_code}"
daily_cooldown_until = daily_buy_cooldown.get(daily_cooldown_key, 0.0)
now = asyncio.get_event_loop().time()
if now < daily_cooldown_until:
remaining = int(daily_cooldown_until - now)
logger.info(
"Skip BUY %s (%s): insufficient-balance cooldown active (%ds remaining)",
stock_code,
market.name,
remaining,
)
continue
# Risk check
try:
risk.validate_order(
@@ -1178,23 +1131,12 @@ async def run_daily_session(
)
if result.get("rt_cd", "") != "0":
order_succeeded = False
daily_msg1 = result.get("msg1") or ""
logger.warning(
"Overseas order not accepted for %s: rt_cd=%s msg=%s",
stock_code,
result.get("rt_cd"),
daily_msg1,
result.get("msg1"),
)
if decision.action == "BUY" and "주문가능금액" in daily_msg1:
daily_cooldown_key = f"{market.code}:{stock_code}"
daily_buy_cooldown[daily_cooldown_key] = (
asyncio.get_event_loop().time() + _BUY_COOLDOWN_SECONDS
)
logger.info(
"BUY cooldown set for %s: %.0fs (insufficient balance)",
stock_code,
_BUY_COOLDOWN_SECONDS,
)
logger.info("Order result: %s", result.get("msg1", "OK"))
# Notify trade execution (only on success)
@@ -1813,9 +1755,6 @@ async def run(settings: Settings) -> None:
# Active stocks per market (dynamically discovered by scanner)
active_stocks: dict[str, list[str]] = {} # market_code -> [stock_codes]
# BUY cooldown: prevents retrying a stock rejected for insufficient balance
buy_cooldown: dict[str, float] = {} # "{market_code}:{stock_code}" -> expiry timestamp
# Initialize latency control system
criticality_assessor = CriticalityAssessor(
critical_pnl_threshold=-2.5, # Near circuit breaker at -3.0%
@@ -2158,7 +2097,6 @@ async def run(settings: Settings) -> None:
stock_code,
scan_candidates,
settings,
buy_cooldown,
)
break # Success — exit retry loop
except CircuitBreakerTripped as exc:

View File

@@ -604,9 +604,19 @@ class TelegramCommandHandler:
async with session.post(url, json=payload) as resp:
if resp.status != 200:
error_text = await resp.text()
logger.error(
"getUpdates API error (status=%d): %s", resp.status, error_text
)
if resp.status == 409:
# Another bot instance is already polling — stop this poller entirely.
# Retrying would keep conflicting with the other instance.
self._running = False
logger.warning(
"Telegram conflict (409): another instance is already polling. "
"Disabling Telegram commands for this process. "
"Ensure only one instance of The Ouroboros is running at a time.",
)
else:
logger.error(
"getUpdates API error (status=%d): %s", resp.status, error_text
)
return []
data = await resp.json()

View File

@@ -2194,245 +2194,6 @@ def test_start_dashboard_server_enabled_starts_thread() -> None:
mock_thread.start.assert_called_once()
# ---------------------------------------------------------------------------
# BUY cooldown tests (#179)
# ---------------------------------------------------------------------------
class TestBuyCooldown:
"""Tests for BUY cooldown after insufficient-balance rejection."""
@pytest.fixture
def mock_broker(self) -> MagicMock:
broker = MagicMock()
broker.get_current_price = AsyncMock(return_value=(100.0, 1.0, 0.0))
broker.get_balance = AsyncMock(
return_value={
"output2": [{"tot_evlu_amt": "1000000", "dnca_tot_amt": "500000",
"pchs_amt_smtl_amt": "500000"}]
}
)
broker.send_order = AsyncMock(return_value={"msg1": "OK"})
return broker
@pytest.fixture
def mock_market(self) -> MagicMock:
market = MagicMock()
market.name = "Korea"
market.code = "KR"
market.exchange_code = "KRX"
market.is_domestic = True
return market
@pytest.fixture
def mock_overseas_market(self) -> MagicMock:
market = MagicMock()
market.name = "NASDAQ"
market.code = "US_NASDAQ"
market.exchange_code = "NAS"
market.is_domestic = False
return market
@pytest.fixture
def mock_overseas_broker(self) -> MagicMock:
broker = MagicMock()
broker.get_overseas_price = AsyncMock(
return_value={"output": {"last": "1.0", "rate": "0.0",
"high": "1.05", "low": "0.95", "tvol": "1000000"}}
)
broker.get_overseas_balance = AsyncMock(return_value={
"output1": [],
"output2": [{"frcr_dncl_amt_2": "50000", "frcr_evlu_tota": "50000",
"frcr_buy_amt_smtl": "0"}],
})
broker.send_overseas_order = AsyncMock(
return_value={"rt_cd": "1", "msg1": "모의투자 주문가능금액이 부족합니다."}
)
return broker
def _make_buy_match_overseas(self, stock_code: str = "MLECW") -> ScenarioMatch:
return ScenarioMatch(
stock_code=stock_code,
matched_scenario=None,
action=ScenarioAction.BUY,
confidence=85,
rationale="Test buy",
)
@pytest.mark.asyncio
async def test_cooldown_set_on_insufficient_balance(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock,
mock_overseas_market: MagicMock,
) -> None:
"""BUY cooldown entry is created after 주문가능금액 rejection."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=self._make_buy_match_overseas("MLECW"))
buy_cooldown: dict[str, float] = {}
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
scenario_engine=engine,
playbook=_make_playbook("US_NASDAQ"),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=MagicMock(
notify_trade_execution=AsyncMock(),
notify_fat_finger=AsyncMock(),
notify_circuit_breaker=AsyncMock(),
notify_scenario_matched=AsyncMock(),
),
market=mock_overseas_market,
stock_code="MLECW",
scan_candidates={},
buy_cooldown=buy_cooldown,
)
assert "US_NASDAQ:MLECW" in buy_cooldown
assert buy_cooldown["US_NASDAQ:MLECW"] > 0
@pytest.mark.asyncio
async def test_cooldown_skips_buy(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock,
mock_overseas_market: MagicMock,
) -> None:
"""BUY is skipped when cooldown is active for the stock."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=self._make_buy_match_overseas("MLECW"))
import asyncio
# Set an active cooldown (expires far in the future)
buy_cooldown: dict[str, float] = {
"US_NASDAQ:MLECW": asyncio.get_event_loop().time() + 600
}
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
scenario_engine=engine,
playbook=_make_playbook("US_NASDAQ"),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=MagicMock(
notify_trade_execution=AsyncMock(),
notify_fat_finger=AsyncMock(),
notify_circuit_breaker=AsyncMock(),
notify_scenario_matched=AsyncMock(),
),
market=mock_overseas_market,
stock_code="MLECW",
scan_candidates={},
buy_cooldown=buy_cooldown,
)
# Order should NOT have been sent
mock_overseas_broker.send_overseas_order.assert_not_called()
@pytest.mark.asyncio
async def test_cooldown_not_set_on_other_errors(
self, mock_broker: MagicMock, mock_overseas_market: MagicMock,
) -> None:
"""Cooldown is NOT set for non-balance-related rejections."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=self._make_buy_match_overseas("MLECW"))
# Different rejection reason
overseas_broker = MagicMock()
overseas_broker.get_overseas_price = AsyncMock(
return_value={"output": {"last": "1.0", "rate": "0.0",
"high": "1.05", "low": "0.95", "tvol": "1000000"}}
)
overseas_broker.get_overseas_balance = AsyncMock(return_value={
"output1": [],
"output2": [{"frcr_dncl_amt_2": "50000", "frcr_evlu_tota": "50000",
"frcr_buy_amt_smtl": "0"}],
})
overseas_broker.send_overseas_order = AsyncMock(
return_value={"rt_cd": "1", "msg1": "기타 오류 메시지"}
)
buy_cooldown: dict[str, float] = {}
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=overseas_broker,
scenario_engine=engine,
playbook=_make_playbook("US_NASDAQ"),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=MagicMock(
notify_trade_execution=AsyncMock(),
notify_fat_finger=AsyncMock(),
notify_circuit_breaker=AsyncMock(),
notify_scenario_matched=AsyncMock(),
),
market=mock_overseas_market,
stock_code="MLECW",
scan_candidates={},
buy_cooldown=buy_cooldown,
)
# Cooldown should NOT be set for non-balance errors
assert "US_NASDAQ:MLECW" not in buy_cooldown
@pytest.mark.asyncio
async def test_no_cooldown_param_still_works(
self, mock_broker: MagicMock, mock_overseas_broker: MagicMock,
mock_overseas_market: MagicMock,
) -> None:
"""trading_cycle works normally when buy_cooldown is None (default)."""
engine = MagicMock(spec=ScenarioEngine)
engine.evaluate = MagicMock(return_value=self._make_buy_match_overseas("MLECW"))
with patch("src.main.log_trade"):
await trading_cycle(
broker=mock_broker,
overseas_broker=mock_overseas_broker,
scenario_engine=engine,
playbook=_make_playbook("US_NASDAQ"),
risk=MagicMock(),
db_conn=MagicMock(),
decision_logger=MagicMock(),
context_store=MagicMock(get_latest_timeframe=MagicMock(return_value=None)),
criticality_assessor=MagicMock(
assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")),
get_timeout=MagicMock(return_value=5.0),
),
telegram=MagicMock(
notify_trade_execution=AsyncMock(),
notify_fat_finger=AsyncMock(),
notify_circuit_breaker=AsyncMock(),
notify_scenario_matched=AsyncMock(),
),
market=mock_overseas_market,
stock_code="MLECW",
scan_candidates={},
# buy_cooldown not passed → defaults to None
)
# Should attempt the order (and fail), but not crash
mock_overseas_broker.send_overseas_order.assert_called_once()
# ---------------------------------------------------------------------------
# market_outlook BUY confidence threshold tests (#173)
# ---------------------------------------------------------------------------

View File

@@ -876,6 +876,54 @@ class TestGetUpdates:
assert updates == []
@pytest.mark.asyncio
async def test_get_updates_409_stops_polling(self) -> None:
"""409 Conflict response stops the poller (_running = False) and returns empty list."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
handler._running = True # simulate active poller
mock_resp = AsyncMock()
mock_resp.status = 409
mock_resp.text = AsyncMock(
return_value='{"ok":false,"error_code":409,"description":"Conflict"}'
)
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp):
updates = await handler._get_updates()
assert updates == []
assert handler._running is False # poller stopped
@pytest.mark.asyncio
async def test_poll_loop_exits_after_409(self) -> None:
"""_poll_loop exits naturally after _running is set to False by a 409 response."""
import asyncio as _asyncio
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
call_count = 0
async def mock_get_updates_409() -> list[dict]:
nonlocal call_count
call_count += 1
# Simulate 409 stopping the poller
handler._running = False
return []
handler._get_updates = mock_get_updates_409 # type: ignore[method-assign]
handler._running = True
task = _asyncio.create_task(handler._poll_loop())
await _asyncio.wait_for(task, timeout=2.0)
# _get_updates called exactly once, then loop exited
assert call_count == 1
assert handler._running is False
class TestCommandWithArgs:
"""Test register_command_with_args and argument dispatch."""