From 73e1d0a54e99bb1556a508a1758dc8bf1f389287 Mon Sep 17 00:00:00 2001 From: agentson Date: Wed, 4 Feb 2026 21:29:46 +0900 Subject: [PATCH] feat: implement TelegramClient core module (issue #31) Add TelegramClient for real-time trading notifications: - NotificationPriority enum (LOW/MEDIUM/HIGH/CRITICAL) - LeakyBucket rate limiter (1 msg/sec) - 8 notification methods (trade, circuit breaker, fat finger, market open/close, system start/shutdown, errors) - Graceful degradation (optional API, never crashes) - Session management pattern from KISBroker - Comprehensive README with setup guide and troubleshooting Follows NewsAPI pattern for optional APIs. Uses existing aiohttp dependency. Co-Authored-By: Claude Sonnet 4.5 --- src/notifications/README.md | 213 ++++++++++++++++++ src/notifications/__init__.py | 5 + src/notifications/telegram_client.py | 325 +++++++++++++++++++++++++++ 3 files changed, 543 insertions(+) create mode 100644 src/notifications/README.md create mode 100644 src/notifications/__init__.py create mode 100644 src/notifications/telegram_client.py diff --git a/src/notifications/README.md b/src/notifications/README.md new file mode 100644 index 0000000..c866ebb --- /dev/null +++ b/src/notifications/README.md @@ -0,0 +1,213 @@ +# Telegram Notifications + +Real-time trading event notifications via Telegram Bot API. + +## Setup + +### 1. Create a Telegram Bot + +1. Open Telegram and message [@BotFather](https://t.me/BotFather) +2. Send `/newbot` command +3. Follow prompts to name your bot +4. Save the **bot token** (looks like `1234567890:ABCdefGHIjklMNOpqrsTUVwxyz`) + +### 2. Get Your Chat ID + +**Option A: Using @userinfobot** +1. Message [@userinfobot](https://t.me/userinfobot) on Telegram +2. Send `/start` +3. Save your numeric **chat ID** (e.g., `123456789`) + +**Option B: Using @RawDataBot** +1. Message [@RawDataBot](https://t.me/rawdatabot) on Telegram +2. Look for `"id":` in the JSON response +3. Save your numeric **chat ID** + +### 3. Configure Environment + +Add to your `.env` file: + +```bash +TELEGRAM_BOT_TOKEN=1234567890:ABCdefGHIjklMNOpqrsTUVwxyz +TELEGRAM_CHAT_ID=123456789 +TELEGRAM_ENABLED=true +``` + +### 4. Test the Bot + +Start a conversation with your bot on Telegram first (send `/start`), then run: + +```bash +python -m src.main --mode=paper +``` + +You should receive a startup notification. + +## Message Examples + +### Trade Execution +``` +đŸŸĸ BUY +Symbol: AAPL (United States) +Quantity: 10 shares +Price: 150.25 +Confidence: 85% +``` + +### Circuit Breaker +``` +🚨 CIRCUIT BREAKER TRIPPED +P&L: -3.15% (threshold: -3.0%) +Trading halted for safety +``` + +### Fat-Finger Protection +``` +âš ī¸ Fat-Finger Protection +Order rejected: TSLA +Attempted: 45.0% of cash +Max allowed: 30% +Amount: 45,000 / 100,000 +``` + +### Market Open/Close +``` +â„šī¸ Market Open +Korea trading session started + +â„šī¸ Market Close +Korea trading session ended +📈 P&L: +1.25% +``` + +### System Status +``` +📝 System Started +Mode: PAPER +Markets: KRX, NASDAQ + +System Shutdown +Normal shutdown +``` + +## Notification Priorities + +| Priority | Emoji | Use Case | +|----------|-------|----------| +| LOW | â„šī¸ | Market open/close | +| MEDIUM | 📊 | Trade execution, system start/stop | +| HIGH | âš ī¸ | Fat-finger protection, errors | +| CRITICAL | 🚨 | Circuit breaker trips | + +## Rate Limiting + +- Default: 1 message per second +- Prevents hitting Telegram's global rate limits +- Configurable via `rate_limit` parameter + +## Troubleshooting + +### No notifications received + +1. **Check bot configuration** + ```bash + # Verify env variables are set + grep TELEGRAM .env + ``` + +2. **Start conversation with bot** + - Open bot in Telegram + - Send `/start` command + - Bot cannot message users who haven't started a conversation + +3. **Check logs** + ```bash + # Look for Telegram-related errors + python -m src.main --mode=paper 2>&1 | grep -i telegram + ``` + +4. **Verify bot token** + ```bash + curl https://api.telegram.org/bot/getMe + # Should return bot info (not 401 error) + ``` + +5. **Verify chat ID** + ```bash + curl -X POST https://api.telegram.org/bot/sendMessage \ + -H 'Content-Type: application/json' \ + -d '{"chat_id": "", "text": "Test"}' + # Should send a test message + ``` + +### Notifications delayed + +- Check rate limiter settings +- Verify network connection +- Look for timeout errors in logs + +### "Chat not found" error + +- Incorrect chat ID +- Bot blocked by user +- Need to send `/start` to bot first + +### "Unauthorized" error + +- Invalid bot token +- Token revoked (regenerate with @BotFather) + +## Graceful Degradation + +The system works without Telegram notifications: + +- Missing credentials → notifications disabled automatically +- API errors → logged but trading continues +- Network timeouts → trading loop unaffected +- Rate limiting → messages queued, trading proceeds + +**Notifications never crash the trading system.** + +## Security Notes + +- Never commit `.env` file with credentials +- Bot token grants full bot control +- Chat ID is not sensitive (just a number) +- Messages are sent over HTTPS +- No trading credentials in notifications + +## Advanced Usage + +### Group Notifications + +1. Add bot to Telegram group +2. Get group chat ID (negative number like `-123456789`) +3. Use group chat ID in `TELEGRAM_CHAT_ID` + +### Multiple Recipients + +Create multiple bots or use a broadcast group with multiple members. + +### Custom Rate Limits + +Not currently exposed in config, but can be modified in code: + +```python +telegram = TelegramClient( + bot_token=settings.TELEGRAM_BOT_TOKEN, + chat_id=settings.TELEGRAM_CHAT_ID, + rate_limit=2.0, # 2 messages per second +) +``` + +## API Reference + +See `telegram_client.py` for full API documentation. + +Key methods: +- `notify_trade_execution()` - Trade alerts +- `notify_circuit_breaker()` - Emergency stops +- `notify_fat_finger()` - Order rejections +- `notify_market_open/close()` - Session tracking +- `notify_system_start/shutdown()` - Lifecycle events +- `notify_error()` - Error alerts diff --git a/src/notifications/__init__.py b/src/notifications/__init__.py new file mode 100644 index 0000000..17e1996 --- /dev/null +++ b/src/notifications/__init__.py @@ -0,0 +1,5 @@ +"""Real-time notifications for trading events.""" + +from src.notifications.telegram_client import TelegramClient + +__all__ = ["TelegramClient"] diff --git a/src/notifications/telegram_client.py b/src/notifications/telegram_client.py new file mode 100644 index 0000000..910e853 --- /dev/null +++ b/src/notifications/telegram_client.py @@ -0,0 +1,325 @@ +"""Telegram notification client for real-time trading alerts.""" + +import asyncio +import logging +import time +from dataclasses import dataclass +from enum import Enum + +import aiohttp + +logger = logging.getLogger(__name__) + + +class NotificationPriority(Enum): + """Priority levels for notifications with emoji indicators.""" + + LOW = ("â„šī¸", "info") + MEDIUM = ("📊", "medium") + HIGH = ("âš ī¸", "warning") + CRITICAL = ("🚨", "critical") + + def __init__(self, emoji: str, label: str) -> None: + self.emoji = emoji + self.label = label + + +class LeakyBucket: + """Rate limiter using leaky bucket algorithm.""" + + def __init__(self, rate: float, capacity: int = 1) -> None: + """ + Initialize rate limiter. + + Args: + rate: Maximum requests per second + capacity: Bucket capacity (burst size) + """ + self._rate = rate + self._capacity = capacity + self._tokens = float(capacity) + self._last_update = time.monotonic() + self._lock = asyncio.Lock() + + async def acquire(self) -> None: + """Wait until a token is available, then consume it.""" + async with self._lock: + now = time.monotonic() + elapsed = now - self._last_update + self._tokens = min(self._capacity, self._tokens + elapsed * self._rate) + self._last_update = now + + if self._tokens < 1.0: + wait_time = (1.0 - self._tokens) / self._rate + await asyncio.sleep(wait_time) + self._tokens = 0.0 + else: + self._tokens -= 1.0 + + +@dataclass +class NotificationMessage: + """Internal notification message structure.""" + + priority: NotificationPriority + message: str + + +class TelegramClient: + """Telegram Bot API client for sending trading notifications.""" + + API_BASE = "https://api.telegram.org/bot{token}" + DEFAULT_TIMEOUT = 5.0 # seconds + DEFAULT_RATE = 1.0 # messages per second + + def __init__( + self, + bot_token: str | None = None, + chat_id: str | None = None, + enabled: bool = True, + rate_limit: float = DEFAULT_RATE, + ) -> None: + """ + Initialize Telegram client. + + Args: + bot_token: Telegram bot token from @BotFather + chat_id: Target chat ID (user or group) + enabled: Enable/disable notifications globally + rate_limit: Maximum messages per second + """ + self._bot_token = bot_token + self._chat_id = chat_id + self._enabled = enabled + self._rate_limiter = LeakyBucket(rate=rate_limit) + self._session: aiohttp.ClientSession | None = None + + if not enabled: + logger.info("Telegram notifications disabled via configuration") + elif bot_token is None or chat_id is None: + logger.warning( + "Telegram notifications disabled (missing bot_token or chat_id)" + ) + self._enabled = False + else: + logger.info("Telegram notifications enabled for chat_id=%s", chat_id) + + def _get_session(self) -> aiohttp.ClientSession: + """Get or create aiohttp session.""" + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=self.DEFAULT_TIMEOUT) + ) + return self._session + + async def close(self) -> None: + """Close HTTP session.""" + if self._session is not None and not self._session.closed: + await self._session.close() + + async def _send_notification(self, msg: NotificationMessage) -> None: + """ + Send notification to Telegram with graceful degradation. + + Args: + msg: Notification message to send + """ + if not self._enabled: + return + + try: + await self._rate_limiter.acquire() + + formatted_message = f"{msg.priority.emoji} {msg.message}" + url = f"{self.API_BASE.format(token=self._bot_token)}/sendMessage" + + payload = { + "chat_id": self._chat_id, + "text": formatted_message, + "parse_mode": "HTML", + } + + session = self._get_session() + async with session.post(url, json=payload) as resp: + if resp.status != 200: + error_text = await resp.text() + logger.error( + "Telegram API error (status=%d): %s", resp.status, error_text + ) + else: + logger.debug("Telegram notification sent: %s", msg.message[:50]) + + except asyncio.TimeoutError: + logger.error("Telegram notification timeout") + except aiohttp.ClientError as exc: + logger.error("Telegram notification failed: %s", exc) + except Exception as exc: + logger.error("Unexpected error sending notification: %s", exc) + + async def notify_trade_execution( + self, + stock_code: str, + market: str, + action: str, + quantity: int, + price: float, + confidence: float, + ) -> None: + """ + Notify trade execution. + + Args: + stock_code: Stock ticker symbol + market: Market name (e.g., "Korea", "United States") + action: "BUY" or "SELL" + quantity: Number of shares + price: Execution price + confidence: AI confidence level (0-100) + """ + emoji = "đŸŸĸ" if action == "BUY" else "🔴" + message = ( + f"{emoji} {action}\n" + f"Symbol: {stock_code} ({market})\n" + f"Quantity: {quantity:,} shares\n" + f"Price: {price:,.2f}\n" + f"Confidence: {confidence:.0f}%" + ) + await self._send_notification( + NotificationMessage(priority=NotificationPriority.MEDIUM, message=message) + ) + + async def notify_market_open(self, market_name: str) -> None: + """ + Notify market opening. + + Args: + market_name: Name of the market (e.g., "Korea", "United States") + """ + message = f"Market Open\n{market_name} trading session started" + await self._send_notification( + NotificationMessage(priority=NotificationPriority.LOW, message=message) + ) + + async def notify_market_close(self, market_name: str, pnl_pct: float) -> None: + """ + Notify market closing. + + Args: + market_name: Name of the market + pnl_pct: Final P&L percentage for the session + """ + pnl_sign = "+" if pnl_pct >= 0 else "" + pnl_emoji = "📈" if pnl_pct >= 0 else "📉" + message = ( + f"Market Close\n" + f"{market_name} trading session ended\n" + f"{pnl_emoji} P&L: {pnl_sign}{pnl_pct:.2f}%" + ) + await self._send_notification( + NotificationMessage(priority=NotificationPriority.LOW, message=message) + ) + + async def notify_circuit_breaker( + self, pnl_pct: float, threshold: float + ) -> None: + """ + Notify circuit breaker activation. + + Args: + pnl_pct: Current P&L percentage + threshold: Circuit breaker threshold + """ + message = ( + f"CIRCUIT BREAKER TRIPPED\n" + f"P&L: {pnl_pct:.2f}% (threshold: {threshold:.1f}%)\n" + f"Trading halted for safety" + ) + await self._send_notification( + NotificationMessage(priority=NotificationPriority.CRITICAL, message=message) + ) + + async def notify_fat_finger( + self, + stock_code: str, + order_amount: float, + total_cash: float, + max_pct: float, + ) -> None: + """ + Notify fat-finger protection rejection. + + Args: + stock_code: Stock ticker symbol + order_amount: Attempted order amount + total_cash: Total available cash + max_pct: Maximum allowed percentage + """ + attempted_pct = (order_amount / total_cash) * 100 if total_cash > 0 else 0 + message = ( + f"Fat-Finger Protection\n" + f"Order rejected: {stock_code}\n" + f"Attempted: {attempted_pct:.1f}% of cash\n" + f"Max allowed: {max_pct:.0f}%\n" + f"Amount: {order_amount:,.0f} / {total_cash:,.0f}" + ) + await self._send_notification( + NotificationMessage(priority=NotificationPriority.HIGH, message=message) + ) + + async def notify_system_start( + self, mode: str, enabled_markets: list[str] + ) -> None: + """ + Notify system startup. + + Args: + mode: Trading mode ("paper" or "live") + enabled_markets: List of enabled market codes + """ + mode_emoji = "📝" if mode == "paper" else "💰" + markets_str = ", ".join(enabled_markets) + message = ( + f"{mode_emoji} System Started\n" + f"Mode: {mode.upper()}\n" + f"Markets: {markets_str}" + ) + await self._send_notification( + NotificationMessage(priority=NotificationPriority.MEDIUM, message=message) + ) + + async def notify_system_shutdown(self, reason: str) -> None: + """ + Notify system shutdown. + + Args: + reason: Reason for shutdown (e.g., "Normal shutdown", "Circuit breaker") + """ + message = f"System Shutdown\n{reason}" + priority = ( + NotificationPriority.CRITICAL + if "circuit breaker" in reason.lower() + else NotificationPriority.MEDIUM + ) + await self._send_notification( + NotificationMessage(priority=priority, message=message) + ) + + async def notify_error( + self, error_type: str, error_msg: str, context: str + ) -> None: + """ + Notify system error. + + Args: + error_type: Type of error (e.g., "Connection Error") + error_msg: Error message + context: Error context (e.g., stock code, market) + """ + message = ( + f"Error: {error_type}\n" + f"Context: {context}\n" + f"Message: {error_msg[:200]}" # Truncate long errors + ) + await self._send_notification( + NotificationMessage(priority=NotificationPriority.HIGH, message=message) + )