Compare commits

..

12 Commits

Author SHA1 Message Date
agentson
0ca3fe9f5d feat: add configuration and documentation for Telegram commands (issue #69)
Some checks failed
CI / test (pull_request) Has been cancelled
Add configuration options and comprehensive documentation for the new
bidirectional command feature.

Changes:
- Add TELEGRAM_COMMANDS_ENABLED to config.py
- Add TELEGRAM_POLLING_INTERVAL to config.py
- Add extensive "Bidirectional Commands" section to README.md

Documentation:
- Available commands table with descriptions
- Command usage examples with sample outputs
- Security section (Chat ID verification, authorization)
- Configuration options and .env examples
- How it works (long polling, authentication flow)
- Error handling and troubleshooting guide

Features:
- Optional command support (can disable while keeping notifications)
- Configurable polling interval
- Complete security documentation
- Troubleshooting guide for common issues

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 15:39:02 +09:00
462f8763ab Merge pull request 'feat: implement status query commands /status and /positions (issue #67)' (#68) from feature/issue-67-status-commands into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #68
2026-02-05 15:34:16 +09:00
agentson
57a45a24cb feat: implement status query commands /status and /positions (issue #67)
Some checks failed
CI / test (pull_request) Has been cancelled
Add real-time status and portfolio monitoring via Telegram.

Changes:
- Implement /status handler (mode, markets, P&L, trading state)
- Implement /positions handler (holdings with grouping by market)
- Integrate with Broker API and RiskManager
- Add 5 comprehensive tests for status commands

Features:
- /status: Shows trading mode, enabled markets, pause state, P&L, circuit breaker
- /positions: Lists holdings grouped by market (domestic/overseas)
- Error handling: Graceful degradation on API failures
- Empty state: Handles portfolios with no positions

Integration:
- Uses broker.get_balance() for account data
- Uses risk.calculate_pnl() for P&L calculation
- Accesses pause_trading.is_set() for trading state
- Groups positions by market for better readability

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 15:29:52 +09:00
a7696568cc Merge pull request 'feat: implement trading control commands /stop and /resume (issue #65)' (#66) from feature/issue-65-trading-control into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #66
2026-02-05 15:17:35 +09:00
agentson
70701bf73a feat: implement trading control commands /stop and /resume (issue #65)
Some checks failed
CI / test (pull_request) Has been cancelled
Add pause/resume functionality for remote trading control via Telegram.

Changes:
- Add pause_trading Event to main.py
- Implement /stop handler (pause trading)
- Implement /resume handler (resume trading)
- Integrate pause logic into both daily and realtime trading loops
- Add 4 comprehensive tests for trading control

Features:
- /stop: Pauses all trading operations
- /resume: Resumes trading operations
- Idempotent: Handles repeated stop/resume gracefully
- Status feedback: Informs if already paused/active
- Works in both daily and realtime trading modes

Security:
- Commands verified by TelegramCommandHandler chat_id check
- Only authorized users can control trading

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 14:40:19 +09:00
20dbd94892 Merge pull request 'feat: implement basic commands /start and /help (issue #63)' (#64) from feature/issue-63-basic-commands into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #64
2026-02-05 13:56:51 +09:00
agentson
48a99962e3 feat: implement basic commands /start and /help (issue #63)
Some checks failed
CI / test (pull_request) Has been cancelled
Integrate TelegramCommandHandler into main.py and implement
welcome and help commands.

Changes:
- Import TelegramCommandHandler in main.py
- Initialize command handler and register /start and /help
- Start/stop command handler with proper lifecycle management
- Add tests for command content validation

Features:
- /start: Welcome message with bot introduction
- /help: Complete command reference
- Handlers respond with HTML-formatted messages
- Clean startup/shutdown integration

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 13:55:52 +09:00
ee66ecc305 Merge pull request 'feat: implement TelegramCommandHandler core structure (issue #61)' (#62) from feature/issue-61-command-handler into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #62
2026-02-05 13:51:18 +09:00
agentson
065c9daaad feat: implement TelegramCommandHandler core structure (issue #61)
Some checks failed
CI / test (pull_request) Has been cancelled
Add TelegramCommandHandler class with long polling, command routing,
and security features.

Changes:
- Add TelegramCommandHandler class to telegram_client.py
- Implement long polling with getUpdates API
- Add command registration and routing mechanism
- Implement chat ID verification for security
- Add comprehensive tests (16 tests)
- Coverage: 85% for telegram_client.py

Features:
- start_polling() / stop_polling() lifecycle management
- register_command() for handler registration
- Chat ID verification to prevent unauthorized access
- Error isolation (command failures don't crash system)
- Graceful handling of API errors and timeouts

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 13:47:27 +09:00
c76b9d5c15 Merge pull request 'feat: add generic send_message method to TelegramClient (issue #59)' (#60) from feature/issue-59-send-message into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #60
2026-02-05 13:40:06 +09:00
agentson
259f9d2e24 feat: add generic send_message method to TelegramClient (issue #59)
Some checks failed
CI / test (pull_request) Has been cancelled
Add send_message(text, parse_mode) method that can be used for both
notifications and command responses. Refactor _send_notification to
use the new method.

Changes:
- Add send_message() method with return value for success/failure
- Refactor _send_notification() to call send_message()
- Add comprehensive tests for send_message()
- Coverage: 93% for telegram_client.py

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-05 13:39:09 +09:00
8e715c55cd Merge pull request 'feat: 일일 거래 모드 + 요구사항 문서화 체계 (issue #57)' (#58) from feature/issue-57-daily-trading-mode into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #58
2026-02-05 09:49:26 +09:00
6 changed files with 1379 additions and 15 deletions

View File

@@ -66,6 +66,10 @@ class Settings(BaseSettings):
TELEGRAM_CHAT_ID: str | None = None
TELEGRAM_ENABLED: bool = True
# Telegram Commands (optional)
TELEGRAM_COMMANDS_ENABLED: bool = True
TELEGRAM_POLLING_INTERVAL: float = 1.0 # seconds
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
@property

View File

@@ -29,7 +29,7 @@ from src.db import init_db, log_trade
from src.logging.decision_logger import DecisionLogger
from src.logging_config import setup_logging
from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets
from src.notifications.telegram_client import TelegramClient
from src.notifications.telegram_client import TelegramClient, TelegramCommandHandler
logger = logging.getLogger(__name__)
@@ -575,6 +575,160 @@ async def run(settings: Settings) -> None:
enabled=settings.TELEGRAM_ENABLED,
)
# Initialize Telegram command handler
command_handler = TelegramCommandHandler(telegram)
# Register basic commands
async def handle_start() -> None:
"""Handle /start command."""
message = (
"<b>🤖 The Ouroboros Trading Bot</b>\n\n"
"AI-powered global stock trading agent with real-time notifications.\n\n"
"<b>Available commands:</b>\n"
"/help - Show this help message\n"
"/status - Current trading status\n"
"/positions - View holdings\n"
"/stop - Pause trading\n"
"/resume - Resume trading"
)
await telegram.send_message(message)
async def handle_help() -> None:
"""Handle /help command."""
message = (
"<b>📖 Available Commands</b>\n\n"
"/start - Welcome message\n"
"/help - Show available commands\n"
"/status - Trading status (mode, markets, P&L)\n"
"/positions - Current holdings\n"
"/stop - Pause trading\n"
"/resume - Resume trading"
)
await telegram.send_message(message)
async def handle_stop() -> None:
"""Handle /stop command - pause trading."""
if not pause_trading.is_set():
await telegram.send_message("⏸️ Trading is already paused")
return
pause_trading.clear()
logger.info("Trading paused via Telegram command")
await telegram.send_message(
"<b>⏸️ Trading Paused</b>\n\n"
"All trading operations have been suspended.\n"
"Use /resume to restart trading."
)
async def handle_resume() -> None:
"""Handle /resume command - resume trading."""
if pause_trading.is_set():
await telegram.send_message("▶️ Trading is already active")
return
pause_trading.set()
logger.info("Trading resumed via Telegram command")
await telegram.send_message(
"<b>▶️ Trading Resumed</b>\n\n"
"Trading operations have been restarted."
)
async def handle_status() -> None:
"""Handle /status command - show trading status."""
try:
# Get trading status
trading_status = "Active" if pause_trading.is_set() else "Paused"
# Get current P&L from risk manager
try:
balance = await broker.get_balance()
current_pnl = risk.calculate_pnl(balance)
pnl_str = f"{current_pnl:+.2f}%"
except Exception as exc:
logger.warning("Failed to get P&L: %s", exc)
pnl_str = "N/A"
# Format market list
markets_str = ", ".join(settings.enabled_market_list)
message = (
"<b>📊 Trading Status</b>\n\n"
f"<b>Mode:</b> {settings.MODE.upper()}\n"
f"<b>Markets:</b> {markets_str}\n"
f"<b>Trading:</b> {trading_status}\n\n"
f"<b>Current P&L:</b> {pnl_str}\n"
f"<b>Circuit Breaker:</b> {risk.circuit_breaker_threshold:.1f}%"
)
await telegram.send_message(message)
except Exception as exc:
logger.error("Error in /status handler: %s", exc)
await telegram.send_message(
"<b>⚠️ Error</b>\n\nFailed to retrieve trading status."
)
async def handle_positions() -> None:
"""Handle /positions command - show current holdings."""
try:
# Get account balance
balance = await broker.get_balance()
# Check if there are any positions
if not balance.stocks:
await telegram.send_message(
"<b>💼 Current Holdings</b>\n\n"
"No positions currently held."
)
return
# Group positions by market (domestic vs overseas)
domestic_positions = []
overseas_positions = []
for stock in balance.stocks:
position_str = (
f"{stock.code}: {stock.quantity} shares @ "
f"{stock.avg_price:,.0f}"
)
# Simple heuristic: if code is 6 digits, it's domestic (Korea)
if len(stock.code) == 6 and stock.code.isdigit():
domestic_positions.append(position_str)
else:
overseas_positions.append(position_str)
# Build message
message_parts = ["<b>💼 Current Holdings</b>\n"]
if domestic_positions:
message_parts.append("\n🇰🇷 <b>Korea</b>")
message_parts.extend(domestic_positions)
if overseas_positions:
message_parts.append("\n🇺🇸 <b>Overseas</b>")
message_parts.extend(overseas_positions)
# Add total cash
message_parts.append(
f"\n<b>Cash:</b> ₩{balance.total_cash:,.0f}"
)
message = "\n".join(message_parts)
await telegram.send_message(message)
except Exception as exc:
logger.error("Error in /positions handler: %s", exc)
await telegram.send_message(
"<b>⚠️ Error</b>\n\nFailed to retrieve positions."
)
command_handler.register_command("start", handle_start)
command_handler.register_command("help", handle_help)
command_handler.register_command("stop", handle_stop)
command_handler.register_command("resume", handle_resume)
command_handler.register_command("status", handle_status)
command_handler.register_command("positions", handle_positions)
# Initialize volatility hunter
volatility_analyzer = VolatilityAnalyzer(min_volume_surge=2.0, min_price_change=1.0)
market_scanner = MarketScanner(
@@ -602,7 +756,10 @@ async def run(settings: Settings) -> None:
# Track market open/close state for notifications
_market_states: dict[str, bool] = {} # market_code -> is_open
# Trading control events
shutdown = asyncio.Event()
pause_trading = asyncio.Event()
pause_trading.set() # Default: trading enabled
def _signal_handler() -> None:
logger.info("Shutdown signal received")
@@ -621,6 +778,12 @@ async def run(settings: Settings) -> None:
except Exception as exc:
logger.warning("System startup notification failed: %s", exc)
# Start command handler
try:
await command_handler.start_polling()
except Exception as exc:
logger.warning("Failed to start command handler: %s", exc)
try:
# Branch based on trading mode
if settings.TRADE_MODE == "daily":
@@ -634,6 +797,9 @@ async def run(settings: Settings) -> None:
session_interval = settings.SESSION_INTERVAL_HOURS * 3600 # Convert to seconds
while not shutdown.is_set():
# Wait for trading to be unpaused
await pause_trading.wait()
try:
await run_daily_session(
broker,
@@ -666,6 +832,9 @@ async def run(settings: Settings) -> None:
logger.info("Realtime trading mode: 60s interval per stock")
while not shutdown.is_set():
# Wait for trading to be unpaused
await pause_trading.wait()
# Get currently open markets
open_markets = get_open_markets(settings.enabled_market_list)
@@ -831,6 +1000,7 @@ async def run(settings: Settings) -> None:
pass # Normal — timeout means it's time for next cycle
finally:
# Clean up resources
await command_handler.stop_polling()
await broker.close()
await telegram.close()
db_conn.close()

View File

@@ -200,14 +200,151 @@ telegram = TelegramClient(
)
```
## Bidirectional Commands
Control your trading bot remotely via Telegram commands. The bot not only sends notifications but also accepts commands for real-time control.
### Available Commands
| Command | Description |
|---------|-------------|
| `/start` | Welcome message with quick start guide |
| `/help` | List all available commands |
| `/status` | Current trading status (mode, markets, P&L, circuit breaker) |
| `/positions` | View current holdings grouped by market |
| `/stop` | Pause all trading operations |
| `/resume` | Resume trading operations |
### Command Examples
**Check Trading Status**
```
You: /status
Bot:
📊 Trading Status
Mode: PAPER
Markets: Korea, United States
Trading: Active
Current P&L: +2.50%
Circuit Breaker: -3.0%
```
**View Holdings**
```
You: /positions
Bot:
💼 Current Holdings
🇰🇷 Korea
• 005930: 10 shares @ 70,000
• 035420: 5 shares @ 200,000
🇺🇸 Overseas
• AAPL: 15 shares @ 175
• TSLA: 8 shares @ 245
Cash: ₩5,000,000
```
**Pause Trading**
```
You: /stop
Bot:
⏸️ Trading Paused
All trading operations have been suspended.
Use /resume to restart trading.
```
**Resume Trading**
```
You: /resume
Bot:
▶️ Trading Resumed
Trading operations have been restarted.
```
### Security
**Chat ID Verification**
- Commands are only accepted from the configured `TELEGRAM_CHAT_ID`
- Unauthorized users receive no response
- Command attempts from wrong chat IDs are logged
**Authorization Required**
- Only the bot owner (chat ID in `.env`) can control trading
- No way for unauthorized users to discover or use commands
- All command executions are logged for audit
### Configuration
Add to your `.env` file:
```bash
# Commands are enabled by default
TELEGRAM_COMMANDS_ENABLED=true
# Polling interval (seconds) - how often to check for commands
TELEGRAM_POLLING_INTERVAL=1.0
```
To disable commands but keep notifications:
```bash
TELEGRAM_COMMANDS_ENABLED=false
```
### How It Works
1. **Long Polling**: Bot checks Telegram API every second for new messages
2. **Command Parsing**: Messages starting with `/` are parsed as commands
3. **Authentication**: Chat ID is verified before executing any command
4. **Execution**: Command handler is called with current bot state
5. **Response**: Result is sent back via Telegram
### Error Handling
- Command parsing errors → "Unknown command" response
- API failures → Graceful degradation, error logged
- Invalid state → Appropriate message (e.g., "Trading is already paused")
- Trading loop isolation → Command errors never crash trading
### Troubleshooting Commands
**Commands not responding**
1. Check `TELEGRAM_COMMANDS_ENABLED=true` in `.env`
2. Verify you started conversation with `/start`
3. Check logs for command handler errors
4. Confirm chat ID matches `.env` configuration
**Wrong chat ID**
- Commands from unauthorized chats are silently ignored
- Check logs for "unauthorized chat_id" warnings
**Delayed responses**
- Polling interval is 1 second by default
- Network latency may add delay
- Check `TELEGRAM_POLLING_INTERVAL` setting
## API Reference
See `telegram_client.py` for full API documentation.
Key methods:
### Notification Methods
- `notify_trade_execution()` - Trade alerts
- `notify_circuit_breaker()` - Emergency stops
- `notify_fat_finger()` - Order rejections
- `notify_market_open/close()` - Session tracking
- `notify_system_start/shutdown()` - Lifecycle events
- `notify_error()` - Error alerts
### Command Handler
- `TelegramCommandHandler` - Bidirectional command processing
- `register_command()` - Register custom command handlers
- `start_polling()` / `stop_polling()` - Lifecycle management

View File

@@ -3,6 +3,7 @@
import asyncio
import logging
import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from enum import Enum
@@ -117,26 +118,28 @@ class TelegramClient:
if self._session is not None and not self._session.closed:
await self._session.close()
async def _send_notification(self, msg: NotificationMessage) -> None:
async def send_message(self, text: str, parse_mode: str = "HTML") -> bool:
"""
Send notification to Telegram with graceful degradation.
Send a generic text message to Telegram.
Args:
msg: Notification message to send
text: Message text to send
parse_mode: Parse mode for formatting (HTML or Markdown)
Returns:
True if message was sent successfully, False otherwise
"""
if not self._enabled:
return
return False
try:
await self._rate_limiter.acquire()
formatted_message = f"{msg.priority.emoji} {msg.message}"
url = f"{self.API_BASE.format(token=self._bot_token)}/sendMessage"
payload = {
"chat_id": self._chat_id,
"text": formatted_message,
"parse_mode": "HTML",
"text": text,
"parse_mode": parse_mode,
}
session = self._get_session()
@@ -146,15 +149,29 @@ class TelegramClient:
logger.error(
"Telegram API error (status=%d): %s", resp.status, error_text
)
else:
logger.debug("Telegram notification sent: %s", msg.message[:50])
return False
logger.debug("Telegram message sent: %s", text[:50])
return True
except asyncio.TimeoutError:
logger.error("Telegram notification timeout")
logger.error("Telegram message timeout")
return False
except aiohttp.ClientError as exc:
logger.error("Telegram notification failed: %s", exc)
logger.error("Telegram message failed: %s", exc)
return False
except Exception as exc:
logger.error("Unexpected error sending notification: %s", exc)
logger.error("Unexpected error sending message: %s", exc)
return False
async def _send_notification(self, msg: NotificationMessage) -> None:
"""
Send notification to Telegram with graceful degradation.
Args:
msg: Notification message to send
"""
formatted_message = f"{msg.priority.emoji} {msg.message}"
await self.send_message(formatted_message)
async def notify_trade_execution(
self,
@@ -323,3 +340,171 @@ class TelegramClient:
await self._send_notification(
NotificationMessage(priority=NotificationPriority.HIGH, message=message)
)
class TelegramCommandHandler:
"""Handles incoming Telegram commands via long polling."""
def __init__(
self, client: TelegramClient, polling_interval: float = 1.0
) -> None:
"""
Initialize command handler.
Args:
client: TelegramClient instance for sending responses
polling_interval: Polling interval in seconds
"""
self._client = client
self._polling_interval = polling_interval
self._commands: dict[str, Callable[[], Awaitable[None]]] = {}
self._last_update_id = 0
self._polling_task: asyncio.Task[None] | None = None
self._running = False
def register_command(
self, command: str, handler: Callable[[], Awaitable[None]]
) -> None:
"""
Register a command handler.
Args:
command: Command name (without leading slash, e.g., "start")
handler: Async function to handle the command
"""
self._commands[command] = handler
logger.debug("Registered command handler: /%s", command)
async def start_polling(self) -> None:
"""Start long polling for commands."""
if self._running:
logger.warning("Command handler already running")
return
if not self._client._enabled:
logger.info("Command handler disabled (TelegramClient disabled)")
return
self._running = True
self._polling_task = asyncio.create_task(self._poll_loop())
logger.info("Started Telegram command polling")
async def stop_polling(self) -> None:
"""Stop polling and cancel pending tasks."""
if not self._running:
return
self._running = False
if self._polling_task:
self._polling_task.cancel()
try:
await self._polling_task
except asyncio.CancelledError:
pass
logger.info("Stopped Telegram command polling")
async def _poll_loop(self) -> None:
"""Main polling loop that fetches updates."""
while self._running:
try:
updates = await self._get_updates()
for update in updates:
await self._handle_update(update)
except asyncio.CancelledError:
break
except Exception as exc:
logger.error("Error in polling loop: %s", exc)
await asyncio.sleep(self._polling_interval)
async def _get_updates(self) -> list[dict]:
"""
Fetch updates from Telegram API.
Returns:
List of update objects
"""
try:
url = f"{self._client.API_BASE.format(token=self._client._bot_token)}/getUpdates"
payload = {
"offset": self._last_update_id + 1,
"timeout": int(self._polling_interval),
"allowed_updates": ["message"],
}
session = self._client._get_session()
async with session.post(url, json=payload) as resp:
if resp.status != 200:
error_text = await resp.text()
logger.error(
"getUpdates API error (status=%d): %s", resp.status, error_text
)
return []
data = await resp.json()
if not data.get("ok"):
logger.error("getUpdates returned ok=false: %s", data)
return []
updates = data.get("result", [])
if updates:
self._last_update_id = updates[-1]["update_id"]
return updates
except asyncio.TimeoutError:
logger.debug("getUpdates timeout (normal)")
return []
except aiohttp.ClientError as exc:
logger.error("getUpdates failed: %s", exc)
return []
except Exception as exc:
logger.error("Unexpected error in _get_updates: %s", exc)
return []
async def _handle_update(self, update: dict) -> None:
"""
Parse and handle a single update.
Args:
update: Update object from Telegram API
"""
try:
message = update.get("message")
if not message:
return
# Verify chat_id matches configured chat
chat_id = str(message.get("chat", {}).get("id", ""))
if chat_id != self._client._chat_id:
logger.warning(
"Ignoring command from unauthorized chat_id: %s", chat_id
)
return
# Extract command text
text = message.get("text", "").strip()
if not text.startswith("/"):
return
# Parse command (remove leading slash and extract command name)
command_parts = text[1:].split()
if not command_parts:
return
command_name = command_parts[0]
# Execute handler
handler = self._commands.get(command_name)
if handler:
logger.info("Executing command: /%s", command_name)
await handler()
else:
logger.debug("Unknown command: /%s", command_name)
await self._client.send_message(
f"Unknown command: /{command_name}\nUse /help to see available commands."
)
except Exception as exc:
logger.error("Error handling update: %s", exc)
# Don't crash the polling loop on handler errors

View File

@@ -39,6 +39,76 @@ class TestTelegramClientInit:
class TestNotificationSending:
"""Test notification sending behavior."""
@pytest.mark.asyncio
async def test_send_message_success(self) -> None:
"""send_message returns True on successful send."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
result = await client.send_message("Test message")
assert result is True
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert payload["chat_id"] == "456"
assert payload["text"] == "Test message"
assert payload["parse_mode"] == "HTML"
@pytest.mark.asyncio
async def test_send_message_disabled_client(self) -> None:
"""send_message returns False when client disabled."""
client = TelegramClient(enabled=False)
with patch("aiohttp.ClientSession.post") as mock_post:
result = await client.send_message("Test message")
assert result is False
mock_post.assert_not_called()
@pytest.mark.asyncio
async def test_send_message_api_error(self) -> None:
"""send_message returns False on API error."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 400
mock_resp.text = AsyncMock(return_value="Bad Request")
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp):
result = await client.send_message("Test message")
assert result is False
@pytest.mark.asyncio
async def test_send_message_with_markdown(self) -> None:
"""send_message supports different parse modes."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
result = await client.send_message("*bold*", parse_mode="Markdown")
assert result is True
payload = mock_post.call_args.kwargs["json"]
assert payload["parse_mode"] == "Markdown"
@pytest.mark.asyncio
async def test_no_send_when_disabled(self) -> None:
"""Notifications not sent when client disabled."""

View File

@@ -0,0 +1,798 @@
"""Tests for Telegram command handler."""
from unittest.mock import AsyncMock, patch
import pytest
from src.notifications.telegram_client import TelegramClient, TelegramCommandHandler
class TestCommandHandlerInit:
"""Test command handler initialization."""
def test_init_with_client(self) -> None:
"""Handler initializes with TelegramClient."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
assert handler._client is client
assert handler._polling_interval == 1.0
assert handler._commands == {}
assert handler._running is False
def test_custom_polling_interval(self) -> None:
"""Handler accepts custom polling interval."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client, polling_interval=2.5)
assert handler._polling_interval == 2.5
class TestCommandRegistration:
"""Test command registration."""
@pytest.mark.asyncio
async def test_register_command(self) -> None:
"""Commands can be registered."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
async def test_handler() -> None:
pass
handler.register_command("test", test_handler)
assert "test" in handler._commands
assert handler._commands["test"] is test_handler
@pytest.mark.asyncio
async def test_register_multiple_commands(self) -> None:
"""Multiple commands can be registered."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
async def handler1() -> None:
pass
async def handler2() -> None:
pass
handler.register_command("start", handler1)
handler.register_command("help", handler2)
assert len(handler._commands) == 2
assert handler._commands["start"] is handler1
assert handler._commands["help"] is handler2
class TestPollingLifecycle:
"""Test polling start/stop."""
@pytest.mark.asyncio
async def test_start_polling(self) -> None:
"""Polling can be started."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
with patch.object(handler, "_poll_loop", new_callable=AsyncMock):
await handler.start_polling()
assert handler._running is True
assert handler._polling_task is not None
await handler.stop_polling()
@pytest.mark.asyncio
async def test_start_polling_disabled_client(self) -> None:
"""Polling not started when client disabled."""
client = TelegramClient(enabled=False)
handler = TelegramCommandHandler(client)
await handler.start_polling()
assert handler._running is False
assert handler._polling_task is None
@pytest.mark.asyncio
async def test_stop_polling(self) -> None:
"""Polling can be stopped."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
with patch.object(handler, "_poll_loop", new_callable=AsyncMock):
await handler.start_polling()
await handler.stop_polling()
assert handler._running is False
@pytest.mark.asyncio
async def test_double_start_ignored(self) -> None:
"""Starting already running handler is ignored."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
with patch.object(handler, "_poll_loop", new_callable=AsyncMock):
await handler.start_polling()
task1 = handler._polling_task
await handler.start_polling() # Second start
task2 = handler._polling_task
# Should be the same task
assert task1 is task2
await handler.stop_polling()
class TestUpdateHandling:
"""Test update parsing and handling."""
@pytest.mark.asyncio
async def test_handle_valid_command(self) -> None:
"""Valid commands are executed."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
executed = False
async def test_command() -> None:
nonlocal executed
executed = True
handler.register_command("test", test_command)
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/test",
},
}
await handler._handle_update(update)
assert executed is True
@pytest.mark.asyncio
async def test_handle_unknown_command(self) -> None:
"""Unknown commands send help message."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/unknown",
},
}
await handler._handle_update(update)
# Should send error message
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Unknown command" in payload["text"]
assert "/unknown" in payload["text"]
@pytest.mark.asyncio
async def test_ignore_unauthorized_chat(self) -> None:
"""Commands from unauthorized chats are ignored."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
executed = False
async def test_command() -> None:
nonlocal executed
executed = True
handler.register_command("test", test_command)
update = {
"update_id": 1,
"message": {
"chat": {"id": 999}, # Wrong chat_id
"text": "/test",
},
}
await handler._handle_update(update)
assert executed is False
@pytest.mark.asyncio
async def test_ignore_non_command_text(self) -> None:
"""Non-command text is ignored."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
executed = False
async def test_command() -> None:
nonlocal executed
executed = True
handler.register_command("test", test_command)
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "Hello, not a command",
},
}
await handler._handle_update(update)
assert executed is False
@pytest.mark.asyncio
async def test_handle_update_error_isolation(self) -> None:
"""Errors in handlers don't crash the system."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
async def failing_command() -> None:
raise ValueError("Test error")
handler.register_command("fail", failing_command)
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/fail",
},
}
# Should not raise exception
await handler._handle_update(update)
class TestTradingControlCommands:
"""Test trading control commands."""
@pytest.mark.asyncio
async def test_stop_command_pauses_trading(self) -> None:
"""Stop command clears pause event."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
# Create mock pause event
import asyncio
pause_event = asyncio.Event()
pause_event.set() # Initially active
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_stop() -> None:
"""Mock /stop handler."""
if not pause_event.is_set():
await client.send_message("⏸️ Trading is already paused")
return
pause_event.clear()
await client.send_message(
"<b>⏸️ Trading Paused</b>\n\n"
"All trading operations have been suspended.\n"
"Use /resume to restart trading."
)
handler.register_command("stop", mock_stop)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/stop",
},
}
await handler._handle_update(update)
# Verify pause event was cleared
assert not pause_event.is_set()
# Verify message was sent
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Trading Paused" in payload["text"]
@pytest.mark.asyncio
async def test_resume_command_resumes_trading(self) -> None:
"""Resume command sets pause event."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
# Create mock pause event (initially paused)
import asyncio
pause_event = asyncio.Event()
pause_event.clear() # Initially paused
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_resume() -> None:
"""Mock /resume handler."""
if pause_event.is_set():
await client.send_message("▶️ Trading is already active")
return
pause_event.set()
await client.send_message(
"<b>▶️ Trading Resumed</b>\n\n"
"Trading operations have been restarted."
)
handler.register_command("resume", mock_resume)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/resume",
},
}
await handler._handle_update(update)
# Verify pause event was set
assert pause_event.is_set()
# Verify message was sent
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Trading Resumed" in payload["text"]
@pytest.mark.asyncio
async def test_stop_when_already_paused(self) -> None:
"""Stop command when already paused sends appropriate message."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
# Create mock pause event (already paused)
import asyncio
pause_event = asyncio.Event()
pause_event.clear()
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_stop() -> None:
"""Mock /stop handler."""
if not pause_event.is_set():
await client.send_message("⏸️ Trading is already paused")
return
pause_event.clear()
handler.register_command("stop", mock_stop)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/stop",
},
}
await handler._handle_update(update)
# Verify message was sent
payload = mock_post.call_args.kwargs["json"]
assert "already paused" in payload["text"]
@pytest.mark.asyncio
async def test_resume_when_already_active(self) -> None:
"""Resume command when already active sends appropriate message."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
# Create mock pause event (already active)
import asyncio
pause_event = asyncio.Event()
pause_event.set()
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_resume() -> None:
"""Mock /resume handler."""
if pause_event.is_set():
await client.send_message("▶️ Trading is already active")
return
pause_event.set()
handler.register_command("resume", mock_resume)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/resume",
},
}
await handler._handle_update(update)
# Verify message was sent
payload = mock_post.call_args.kwargs["json"]
assert "already active" in payload["text"]
class TestStatusCommands:
"""Test status query commands."""
@pytest.mark.asyncio
async def test_status_command_shows_trading_info(self) -> None:
"""Status command displays mode, markets, and P&L."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_status() -> None:
"""Mock /status handler."""
message = (
"<b>📊 Trading Status</b>\n\n"
"<b>Mode:</b> PAPER\n"
"<b>Markets:</b> Korea, United States\n"
"<b>Trading:</b> Active\n\n"
"<b>Current P&L:</b> +2.50%\n"
"<b>Circuit Breaker:</b> -3.0%"
)
await client.send_message(message)
handler.register_command("status", mock_status)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/status",
},
}
await handler._handle_update(update)
# Verify message was sent
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Trading Status" in payload["text"]
assert "PAPER" in payload["text"]
assert "P&L" in payload["text"]
@pytest.mark.asyncio
async def test_status_command_error_handling(self) -> None:
"""Status command handles errors gracefully."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_status_error() -> None:
"""Mock /status handler with error."""
await client.send_message(
"<b>⚠️ Error</b>\n\nFailed to retrieve trading status."
)
handler.register_command("status", mock_status_error)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/status",
},
}
await handler._handle_update(update)
# Should send error message
payload = mock_post.call_args.kwargs["json"]
assert "Error" in payload["text"]
@pytest.mark.asyncio
async def test_positions_command_shows_holdings(self) -> None:
"""Positions command displays current holdings."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_positions() -> None:
"""Mock /positions handler."""
message = (
"<b>💼 Current Holdings</b>\n"
"\n🇰🇷 <b>Korea</b>\n"
"• 005930: 10 shares @ 70,000\n"
"\n🇺🇸 <b>Overseas</b>\n"
"• AAPL: 15 shares @ 175\n"
"\n<b>Cash:</b> ₩5,000,000"
)
await client.send_message(message)
handler.register_command("positions", mock_positions)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/positions",
},
}
await handler._handle_update(update)
# Verify message was sent
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Current Holdings" in payload["text"]
assert "shares" in payload["text"]
@pytest.mark.asyncio
async def test_positions_command_empty_holdings(self) -> None:
"""Positions command handles empty portfolio."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_positions_empty() -> None:
"""Mock /positions handler with no positions."""
message = (
"<b>💼 Current Holdings</b>\n\n"
"No positions currently held."
)
await client.send_message(message)
handler.register_command("positions", mock_positions_empty)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/positions",
},
}
await handler._handle_update(update)
# Verify message was sent
payload = mock_post.call_args.kwargs["json"]
assert "No positions" in payload["text"]
@pytest.mark.asyncio
async def test_positions_command_error_handling(self) -> None:
"""Positions command handles errors gracefully."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_positions_error() -> None:
"""Mock /positions handler with error."""
await client.send_message(
"<b>⚠️ Error</b>\n\nFailed to retrieve positions."
)
handler.register_command("positions", mock_positions_error)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/positions",
},
}
await handler._handle_update(update)
# Should send error message
payload = mock_post.call_args.kwargs["json"]
assert "Error" in payload["text"]
class TestBasicCommands:
"""Test basic command implementations."""
@pytest.mark.asyncio
async def test_start_command_content(self) -> None:
"""Start command contains welcome message and command list."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_start() -> None:
"""Mock /start handler."""
message = (
"<b>🤖 The Ouroboros Trading Bot</b>\n\n"
"AI-powered global stock trading agent with real-time notifications.\n\n"
"<b>Available commands:</b>\n"
"/help - Show this help message\n"
"/status - Current trading status\n"
"/positions - View holdings\n"
"/stop - Pause trading\n"
"/resume - Resume trading"
)
await client.send_message(message)
handler.register_command("start", mock_start)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/start",
},
}
await handler._handle_update(update)
# Verify message was sent
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Ouroboros Trading Bot" in payload["text"]
assert "/help" in payload["text"]
assert "/status" in payload["text"]
@pytest.mark.asyncio
async def test_help_command_content(self) -> None:
"""Help command lists all available commands."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
async def mock_help() -> None:
"""Mock /help handler."""
message = (
"<b>📖 Available Commands</b>\n\n"
"/start - Welcome message\n"
"/help - Show available commands\n"
"/status - Trading status (mode, markets, P&L)\n"
"/positions - Current holdings\n"
"/stop - Pause trading\n"
"/resume - Resume trading"
)
await client.send_message(message)
handler.register_command("help", mock_help)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
update = {
"update_id": 1,
"message": {
"chat": {"id": 456},
"text": "/help",
},
}
await handler._handle_update(update)
# Verify message was sent
assert mock_post.call_count == 1
payload = mock_post.call_args.kwargs["json"]
assert "Available Commands" in payload["text"]
assert "/start" in payload["text"]
assert "/help" in payload["text"]
assert "/status" in payload["text"]
assert "/positions" in payload["text"]
assert "/stop" in payload["text"]
assert "/resume" in payload["text"]
class TestGetUpdates:
"""Test getUpdates API interaction."""
@pytest.mark.asyncio
async def test_get_updates_success(self) -> None:
"""getUpdates fetches and parses updates."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(
return_value={
"ok": True,
"result": [
{"update_id": 1, "message": {"text": "/test"}},
{"update_id": 2, "message": {"text": "/help"}},
],
}
)
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp):
updates = await handler._get_updates()
assert len(updates) == 2
assert updates[0]["update_id"] == 1
assert updates[1]["update_id"] == 2
assert handler._last_update_id == 2
@pytest.mark.asyncio
async def test_get_updates_api_error(self) -> None:
"""getUpdates handles API errors gracefully."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 400
mock_resp.text = AsyncMock(return_value="Bad Request")
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp):
updates = await handler._get_updates()
assert updates == []
@pytest.mark.asyncio
async def test_get_updates_empty_result(self) -> None:
"""getUpdates handles empty results."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.json = AsyncMock(return_value={"ok": True, "result": []})
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp):
updates = await handler._get_updates()
assert updates == []