Compare commits

...

5 Commits

Author SHA1 Message Date
agentson
6dbc2afbf4 feat: add Telegram configuration to settings (issue #33)
Some checks failed
CI / test (pull_request) Has been cancelled
Add Telegram notification configuration:
- src/config.py: Add TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID, TELEGRAM_ENABLED
- .env.example: Add Telegram section with setup instructions

Fields added after S3_REGION (line 55).
Follows existing optional API pattern (NEWS_API_KEY, etc.).
No breaking changes to existing settings.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 21:34:05 +09:00
6c96f9ac64 Merge pull request 'test: add comprehensive TelegramClient tests (issue #32)' (#37) from feature/issue-32-telegram-tests into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #37
2026-02-04 21:33:19 +09:00
agentson
ed26915562 test: add comprehensive TelegramClient tests (issue #32)
Some checks failed
CI / test (pull_request) Has been cancelled
Add 15 tests across 5 test classes:
- TestTelegramClientInit (4 tests): disabled scenarios, enabled with credentials
- TestNotificationSending (6 tests): disabled mode, message format, API errors, timeouts, session management
- TestRateLimiting (1 test): rate limiter enforcement
- TestMessagePriorities (2 tests): priority emoji verification
- TestClientCleanup (2 tests): session cleanup

Uses pytest.mark.asyncio for async tests.
Mocks aiohttp responses with AsyncMock.
Follows test patterns from test_broker.py.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 21:32:24 +09:00
628a572c70 Merge pull request 'feat: implement TelegramClient core module (issue #31)' (#36) from feature/issue-31-telegram-client into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #36
2026-02-04 21:30:50 +09:00
agentson
73e1d0a54e feat: implement TelegramClient core module (issue #31)
Some checks failed
CI / test (pull_request) Has been cancelled
Add TelegramClient for real-time trading notifications:
- NotificationPriority enum (LOW/MEDIUM/HIGH/CRITICAL)
- LeakyBucket rate limiter (1 msg/sec)
- 8 notification methods (trade, circuit breaker, fat finger, market open/close, system start/shutdown, errors)
- Graceful degradation (optional API, never crashes)
- Session management pattern from KISBroker
- Comprehensive README with setup guide and troubleshooting

Follows NewsAPI pattern for optional APIs.
Uses existing aiohttp dependency.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 21:29:46 +09:00
6 changed files with 824 additions and 0 deletions

View File

@@ -26,3 +26,10 @@ MODE=paper
# NEWS_API_KEY=your_news_api_key_here
# NEWS_API_PROVIDER=alphavantage
# MARKET_DATA_API_KEY=your_market_data_key_here
# Telegram Notifications (optional)
# Get bot token from @BotFather on Telegram
# Get chat ID from @userinfobot or your chat
# TELEGRAM_BOT_TOKEN=1234567890:ABCdefGHIjklMNOpqrsTUVwxyz
# TELEGRAM_CHAT_ID=123456789
# TELEGRAM_ENABLED=true

View File

@@ -54,6 +54,11 @@ class Settings(BaseSettings):
S3_BUCKET_NAME: str | None = None
S3_REGION: str = "us-east-1"
# Telegram Notifications (optional)
TELEGRAM_BOT_TOKEN: str | None = None
TELEGRAM_CHAT_ID: str | None = None
TELEGRAM_ENABLED: bool = True
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
@property

213
src/notifications/README.md Normal file
View File

@@ -0,0 +1,213 @@
# Telegram Notifications
Real-time trading event notifications via Telegram Bot API.
## Setup
### 1. Create a Telegram Bot
1. Open Telegram and message [@BotFather](https://t.me/BotFather)
2. Send `/newbot` command
3. Follow prompts to name your bot
4. Save the **bot token** (looks like `1234567890:ABCdefGHIjklMNOpqrsTUVwxyz`)
### 2. Get Your Chat ID
**Option A: Using @userinfobot**
1. Message [@userinfobot](https://t.me/userinfobot) on Telegram
2. Send `/start`
3. Save your numeric **chat ID** (e.g., `123456789`)
**Option B: Using @RawDataBot**
1. Message [@RawDataBot](https://t.me/rawdatabot) on Telegram
2. Look for `"id":` in the JSON response
3. Save your numeric **chat ID**
### 3. Configure Environment
Add to your `.env` file:
```bash
TELEGRAM_BOT_TOKEN=1234567890:ABCdefGHIjklMNOpqrsTUVwxyz
TELEGRAM_CHAT_ID=123456789
TELEGRAM_ENABLED=true
```
### 4. Test the Bot
Start a conversation with your bot on Telegram first (send `/start`), then run:
```bash
python -m src.main --mode=paper
```
You should receive a startup notification.
## Message Examples
### Trade Execution
```
🟢 BUY
Symbol: AAPL (United States)
Quantity: 10 shares
Price: 150.25
Confidence: 85%
```
### Circuit Breaker
```
🚨 CIRCUIT BREAKER TRIPPED
P&L: -3.15% (threshold: -3.0%)
Trading halted for safety
```
### Fat-Finger Protection
```
⚠️ Fat-Finger Protection
Order rejected: TSLA
Attempted: 45.0% of cash
Max allowed: 30%
Amount: 45,000 / 100,000
```
### Market Open/Close
```
Market Open
Korea trading session started
Market Close
Korea trading session ended
📈 P&L: +1.25%
```
### System Status
```
📝 System Started
Mode: PAPER
Markets: KRX, NASDAQ
System Shutdown
Normal shutdown
```
## Notification Priorities
| Priority | Emoji | Use Case |
|----------|-------|----------|
| LOW | | Market open/close |
| MEDIUM | 📊 | Trade execution, system start/stop |
| HIGH | ⚠️ | Fat-finger protection, errors |
| CRITICAL | 🚨 | Circuit breaker trips |
## Rate Limiting
- Default: 1 message per second
- Prevents hitting Telegram's global rate limits
- Configurable via `rate_limit` parameter
## Troubleshooting
### No notifications received
1. **Check bot configuration**
```bash
# Verify env variables are set
grep TELEGRAM .env
```
2. **Start conversation with bot**
- Open bot in Telegram
- Send `/start` command
- Bot cannot message users who haven't started a conversation
3. **Check logs**
```bash
# Look for Telegram-related errors
python -m src.main --mode=paper 2>&1 | grep -i telegram
```
4. **Verify bot token**
```bash
curl https://api.telegram.org/bot<YOUR_TOKEN>/getMe
# Should return bot info (not 401 error)
```
5. **Verify chat ID**
```bash
curl -X POST https://api.telegram.org/bot<YOUR_TOKEN>/sendMessage \
-H 'Content-Type: application/json' \
-d '{"chat_id": "<YOUR_CHAT_ID>", "text": "Test"}'
# Should send a test message
```
### Notifications delayed
- Check rate limiter settings
- Verify network connection
- Look for timeout errors in logs
### "Chat not found" error
- Incorrect chat ID
- Bot blocked by user
- Need to send `/start` to bot first
### "Unauthorized" error
- Invalid bot token
- Token revoked (regenerate with @BotFather)
## Graceful Degradation
The system works without Telegram notifications:
- Missing credentials → notifications disabled automatically
- API errors → logged but trading continues
- Network timeouts → trading loop unaffected
- Rate limiting → messages queued, trading proceeds
**Notifications never crash the trading system.**
## Security Notes
- Never commit `.env` file with credentials
- Bot token grants full bot control
- Chat ID is not sensitive (just a number)
- Messages are sent over HTTPS
- No trading credentials in notifications
## Advanced Usage
### Group Notifications
1. Add bot to Telegram group
2. Get group chat ID (negative number like `-123456789`)
3. Use group chat ID in `TELEGRAM_CHAT_ID`
### Multiple Recipients
Create multiple bots or use a broadcast group with multiple members.
### Custom Rate Limits
Not currently exposed in config, but can be modified in code:
```python
telegram = TelegramClient(
bot_token=settings.TELEGRAM_BOT_TOKEN,
chat_id=settings.TELEGRAM_CHAT_ID,
rate_limit=2.0, # 2 messages per second
)
```
## API Reference
See `telegram_client.py` for full API documentation.
Key methods:
- `notify_trade_execution()` - Trade alerts
- `notify_circuit_breaker()` - Emergency stops
- `notify_fat_finger()` - Order rejections
- `notify_market_open/close()` - Session tracking
- `notify_system_start/shutdown()` - Lifecycle events
- `notify_error()` - Error alerts

View File

@@ -0,0 +1,5 @@
"""Real-time notifications for trading events."""
from src.notifications.telegram_client import TelegramClient
__all__ = ["TelegramClient"]

View File

@@ -0,0 +1,325 @@
"""Telegram notification client for real-time trading alerts."""
import asyncio
import logging
import time
from dataclasses import dataclass
from enum import Enum
import aiohttp
logger = logging.getLogger(__name__)
class NotificationPriority(Enum):
"""Priority levels for notifications with emoji indicators."""
LOW = ("", "info")
MEDIUM = ("📊", "medium")
HIGH = ("⚠️", "warning")
CRITICAL = ("🚨", "critical")
def __init__(self, emoji: str, label: str) -> None:
self.emoji = emoji
self.label = label
class LeakyBucket:
"""Rate limiter using leaky bucket algorithm."""
def __init__(self, rate: float, capacity: int = 1) -> None:
"""
Initialize rate limiter.
Args:
rate: Maximum requests per second
capacity: Bucket capacity (burst size)
"""
self._rate = rate
self._capacity = capacity
self._tokens = float(capacity)
self._last_update = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self) -> None:
"""Wait until a token is available, then consume it."""
async with self._lock:
now = time.monotonic()
elapsed = now - self._last_update
self._tokens = min(self._capacity, self._tokens + elapsed * self._rate)
self._last_update = now
if self._tokens < 1.0:
wait_time = (1.0 - self._tokens) / self._rate
await asyncio.sleep(wait_time)
self._tokens = 0.0
else:
self._tokens -= 1.0
@dataclass
class NotificationMessage:
"""Internal notification message structure."""
priority: NotificationPriority
message: str
class TelegramClient:
"""Telegram Bot API client for sending trading notifications."""
API_BASE = "https://api.telegram.org/bot{token}"
DEFAULT_TIMEOUT = 5.0 # seconds
DEFAULT_RATE = 1.0 # messages per second
def __init__(
self,
bot_token: str | None = None,
chat_id: str | None = None,
enabled: bool = True,
rate_limit: float = DEFAULT_RATE,
) -> None:
"""
Initialize Telegram client.
Args:
bot_token: Telegram bot token from @BotFather
chat_id: Target chat ID (user or group)
enabled: Enable/disable notifications globally
rate_limit: Maximum messages per second
"""
self._bot_token = bot_token
self._chat_id = chat_id
self._enabled = enabled
self._rate_limiter = LeakyBucket(rate=rate_limit)
self._session: aiohttp.ClientSession | None = None
if not enabled:
logger.info("Telegram notifications disabled via configuration")
elif bot_token is None or chat_id is None:
logger.warning(
"Telegram notifications disabled (missing bot_token or chat_id)"
)
self._enabled = False
else:
logger.info("Telegram notifications enabled for chat_id=%s", chat_id)
def _get_session(self) -> aiohttp.ClientSession:
"""Get or create aiohttp session."""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self.DEFAULT_TIMEOUT)
)
return self._session
async def close(self) -> None:
"""Close HTTP session."""
if self._session is not None and not self._session.closed:
await self._session.close()
async def _send_notification(self, msg: NotificationMessage) -> None:
"""
Send notification to Telegram with graceful degradation.
Args:
msg: Notification message to send
"""
if not self._enabled:
return
try:
await self._rate_limiter.acquire()
formatted_message = f"{msg.priority.emoji} {msg.message}"
url = f"{self.API_BASE.format(token=self._bot_token)}/sendMessage"
payload = {
"chat_id": self._chat_id,
"text": formatted_message,
"parse_mode": "HTML",
}
session = self._get_session()
async with session.post(url, json=payload) as resp:
if resp.status != 200:
error_text = await resp.text()
logger.error(
"Telegram API error (status=%d): %s", resp.status, error_text
)
else:
logger.debug("Telegram notification sent: %s", msg.message[:50])
except asyncio.TimeoutError:
logger.error("Telegram notification timeout")
except aiohttp.ClientError as exc:
logger.error("Telegram notification failed: %s", exc)
except Exception as exc:
logger.error("Unexpected error sending notification: %s", exc)
async def notify_trade_execution(
self,
stock_code: str,
market: str,
action: str,
quantity: int,
price: float,
confidence: float,
) -> None:
"""
Notify trade execution.
Args:
stock_code: Stock ticker symbol
market: Market name (e.g., "Korea", "United States")
action: "BUY" or "SELL"
quantity: Number of shares
price: Execution price
confidence: AI confidence level (0-100)
"""
emoji = "🟢" if action == "BUY" else "🔴"
message = (
f"<b>{emoji} {action}</b>\n"
f"Symbol: <code>{stock_code}</code> ({market})\n"
f"Quantity: {quantity:,} shares\n"
f"Price: {price:,.2f}\n"
f"Confidence: {confidence:.0f}%"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.MEDIUM, message=message)
)
async def notify_market_open(self, market_name: str) -> None:
"""
Notify market opening.
Args:
market_name: Name of the market (e.g., "Korea", "United States")
"""
message = f"<b>Market Open</b>\n{market_name} trading session started"
await self._send_notification(
NotificationMessage(priority=NotificationPriority.LOW, message=message)
)
async def notify_market_close(self, market_name: str, pnl_pct: float) -> None:
"""
Notify market closing.
Args:
market_name: Name of the market
pnl_pct: Final P&L percentage for the session
"""
pnl_sign = "+" if pnl_pct >= 0 else ""
pnl_emoji = "📈" if pnl_pct >= 0 else "📉"
message = (
f"<b>Market Close</b>\n"
f"{market_name} trading session ended\n"
f"{pnl_emoji} P&L: {pnl_sign}{pnl_pct:.2f}%"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.LOW, message=message)
)
async def notify_circuit_breaker(
self, pnl_pct: float, threshold: float
) -> None:
"""
Notify circuit breaker activation.
Args:
pnl_pct: Current P&L percentage
threshold: Circuit breaker threshold
"""
message = (
f"<b>CIRCUIT BREAKER TRIPPED</b>\n"
f"P&L: {pnl_pct:.2f}% (threshold: {threshold:.1f}%)\n"
f"Trading halted for safety"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.CRITICAL, message=message)
)
async def notify_fat_finger(
self,
stock_code: str,
order_amount: float,
total_cash: float,
max_pct: float,
) -> None:
"""
Notify fat-finger protection rejection.
Args:
stock_code: Stock ticker symbol
order_amount: Attempted order amount
total_cash: Total available cash
max_pct: Maximum allowed percentage
"""
attempted_pct = (order_amount / total_cash) * 100 if total_cash > 0 else 0
message = (
f"<b>Fat-Finger Protection</b>\n"
f"Order rejected: <code>{stock_code}</code>\n"
f"Attempted: {attempted_pct:.1f}% of cash\n"
f"Max allowed: {max_pct:.0f}%\n"
f"Amount: {order_amount:,.0f} / {total_cash:,.0f}"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.HIGH, message=message)
)
async def notify_system_start(
self, mode: str, enabled_markets: list[str]
) -> None:
"""
Notify system startup.
Args:
mode: Trading mode ("paper" or "live")
enabled_markets: List of enabled market codes
"""
mode_emoji = "📝" if mode == "paper" else "💰"
markets_str = ", ".join(enabled_markets)
message = (
f"<b>{mode_emoji} System Started</b>\n"
f"Mode: {mode.upper()}\n"
f"Markets: {markets_str}"
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.MEDIUM, message=message)
)
async def notify_system_shutdown(self, reason: str) -> None:
"""
Notify system shutdown.
Args:
reason: Reason for shutdown (e.g., "Normal shutdown", "Circuit breaker")
"""
message = f"<b>System Shutdown</b>\n{reason}"
priority = (
NotificationPriority.CRITICAL
if "circuit breaker" in reason.lower()
else NotificationPriority.MEDIUM
)
await self._send_notification(
NotificationMessage(priority=priority, message=message)
)
async def notify_error(
self, error_type: str, error_msg: str, context: str
) -> None:
"""
Notify system error.
Args:
error_type: Type of error (e.g., "Connection Error")
error_msg: Error message
context: Error context (e.g., stock code, market)
"""
message = (
f"<b>Error: {error_type}</b>\n"
f"Context: {context}\n"
f"Message: {error_msg[:200]}" # Truncate long errors
)
await self._send_notification(
NotificationMessage(priority=NotificationPriority.HIGH, message=message)
)

269
tests/test_telegram.py Normal file
View File

@@ -0,0 +1,269 @@
"""Tests for Telegram notification client."""
from unittest.mock import AsyncMock, patch
import aiohttp
import pytest
from src.notifications.telegram_client import NotificationPriority, TelegramClient
class TestTelegramClientInit:
"""Test client initialization scenarios."""
def test_disabled_via_flag(self) -> None:
"""Client disabled via enabled=False flag."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=False
)
assert client._enabled is False
def test_disabled_missing_token(self) -> None:
"""Client disabled when bot_token is None."""
client = TelegramClient(bot_token=None, chat_id="456", enabled=True)
assert client._enabled is False
def test_disabled_missing_chat_id(self) -> None:
"""Client disabled when chat_id is None."""
client = TelegramClient(bot_token="123:abc", chat_id=None, enabled=True)
assert client._enabled is False
def test_enabled_with_credentials(self) -> None:
"""Client enabled when credentials provided."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
assert client._enabled is True
class TestNotificationSending:
"""Test notification sending behavior."""
@pytest.mark.asyncio
async def test_no_send_when_disabled(self) -> None:
"""Notifications not sent when client disabled."""
client = TelegramClient(enabled=False)
with patch("aiohttp.ClientSession.post") as mock_post:
await client.notify_trade_execution(
stock_code="AAPL",
market="United States",
action="BUY",
quantity=10,
price=150.0,
confidence=85.0,
)
mock_post.assert_not_called()
@pytest.mark.asyncio
async def test_trade_execution_format(self) -> None:
"""Trade notification has correct format."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_trade_execution(
stock_code="TSLA",
market="United States",
action="SELL",
quantity=5,
price=250.50,
confidence=92.0,
)
# Verify API call was made
assert mock_post.call_count == 1
call_args = mock_post.call_args
# Check payload structure
payload = call_args.kwargs["json"]
assert payload["chat_id"] == "456"
assert "TSLA" in payload["text"]
assert "SELL" in payload["text"]
assert "5" in payload["text"]
assert "250.50" in payload["text"]
assert "92%" in payload["text"]
@pytest.mark.asyncio
async def test_circuit_breaker_priority(self) -> None:
"""Circuit breaker uses CRITICAL priority."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_circuit_breaker(pnl_pct=-3.15, threshold=-3.0)
payload = mock_post.call_args.kwargs["json"]
# CRITICAL priority has 🚨 emoji
assert NotificationPriority.CRITICAL.emoji in payload["text"]
assert "-3.15%" in payload["text"]
@pytest.mark.asyncio
async def test_api_error_handling(self) -> None:
"""API errors logged but don't crash."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 400
mock_resp.text = AsyncMock(return_value="Bad Request")
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp):
# Should not raise exception
await client.notify_system_start(mode="paper", enabled_markets=["KR"])
@pytest.mark.asyncio
async def test_timeout_handling(self) -> None:
"""Timeouts logged but don't crash."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
with patch(
"aiohttp.ClientSession.post",
side_effect=aiohttp.ClientError("Connection timeout"),
):
# Should not raise exception
await client.notify_error(
error_type="Test Error", error_msg="Test", context="test"
)
@pytest.mark.asyncio
async def test_session_management(self) -> None:
"""Session created and reused correctly."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
# Session should be None initially
assert client._session is None
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp):
await client.notify_market_open("Korea")
# Session should be created
assert client._session is not None
session1 = client._session
await client.notify_market_close("Korea", 1.5)
# Same session should be reused
assert client._session is session1
class TestRateLimiting:
"""Test rate limiter behavior."""
@pytest.mark.asyncio
async def test_rate_limiter_enforced(self) -> None:
"""Rate limiter delays rapid requests."""
import time
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True, rate_limit=2.0
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp):
start = time.monotonic()
# Send 3 messages (rate: 2/sec = 0.5s per message)
await client.notify_market_open("Korea")
await client.notify_market_open("United States")
await client.notify_market_open("Japan")
elapsed = time.monotonic() - start
# Should take at least 0.4 seconds (3 msgs at 2/sec with some tolerance)
assert elapsed >= 0.4
class TestMessagePriorities:
"""Test priority-based messaging."""
@pytest.mark.asyncio
async def test_low_priority_uses_info_emoji(self) -> None:
"""LOW priority uses emoji."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_market_open("Korea")
payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.LOW.emoji in payload["text"]
@pytest.mark.asyncio
async def test_critical_priority_uses_alarm_emoji(self) -> None:
"""CRITICAL priority uses 🚨 emoji."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_system_shutdown("Circuit breaker tripped")
payload = mock_post.call_args.kwargs["json"]
assert NotificationPriority.CRITICAL.emoji in payload["text"]
class TestClientCleanup:
"""Test client cleanup behavior."""
@pytest.mark.asyncio
async def test_close_closes_session(self) -> None:
"""close() closes the HTTP session."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_session = AsyncMock()
mock_session.closed = False
mock_session.close = AsyncMock()
client._session = mock_session
await client.close()
mock_session.close.assert_called_once()
@pytest.mark.asyncio
async def test_close_handles_no_session(self) -> None:
"""close() handles None session gracefully."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
# Should not raise exception
await client.close()