diff --git a/src/config.py b/src/config.py index 80e3203..1968d05 100644 --- a/src/config.py +++ b/src/config.py @@ -93,6 +93,16 @@ class Settings(BaseSettings): TELEGRAM_COMMANDS_ENABLED: bool = True TELEGRAM_POLLING_INTERVAL: float = 1.0 # seconds + # Telegram notification type filters (granular control) + # circuit_breaker is always sent regardless — safety-critical + TELEGRAM_NOTIFY_TRADES: bool = True # BUY/SELL execution alerts + TELEGRAM_NOTIFY_MARKET_OPEN_CLOSE: bool = True # Market open/close alerts + TELEGRAM_NOTIFY_FAT_FINGER: bool = True # Fat-finger rejection alerts + TELEGRAM_NOTIFY_SYSTEM_EVENTS: bool = True # System start/shutdown alerts + TELEGRAM_NOTIFY_PLAYBOOK: bool = True # Playbook generated/failed alerts + TELEGRAM_NOTIFY_SCENARIO_MATCH: bool = True # Scenario matched alerts (most frequent) + TELEGRAM_NOTIFY_ERRORS: bool = True # Error alerts + # Overseas ranking API (KIS endpoint/TR_ID may vary by account/product) # Override these from .env if your account uses different specs. OVERSEAS_RANKING_ENABLED: bool = True diff --git a/src/main.py b/src/main.py index 3f78ae9..ca7c11e 100644 --- a/src/main.py +++ b/src/main.py @@ -41,7 +41,7 @@ from src.evolution.optimizer import EvolutionOptimizer from src.logging.decision_logger import DecisionLogger from src.logging_config import setup_logging from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets -from src.notifications.telegram_client import TelegramClient, TelegramCommandHandler +from src.notifications.telegram_client import NotificationFilter, TelegramClient, TelegramCommandHandler from src.strategy.models import DayPlaybook from src.strategy.playbook_store import PlaybookStore from src.strategy.pre_market_planner import PreMarketPlanner @@ -1208,6 +1208,15 @@ async def run(settings: Settings) -> None: bot_token=settings.TELEGRAM_BOT_TOKEN, chat_id=settings.TELEGRAM_CHAT_ID, enabled=settings.TELEGRAM_ENABLED, + notification_filter=NotificationFilter( + trades=settings.TELEGRAM_NOTIFY_TRADES, + market_open_close=settings.TELEGRAM_NOTIFY_MARKET_OPEN_CLOSE, + fat_finger=settings.TELEGRAM_NOTIFY_FAT_FINGER, + system_events=settings.TELEGRAM_NOTIFY_SYSTEM_EVENTS, + playbook=settings.TELEGRAM_NOTIFY_PLAYBOOK, + scenario_match=settings.TELEGRAM_NOTIFY_SCENARIO_MATCH, + errors=settings.TELEGRAM_NOTIFY_ERRORS, + ), ) # Initialize Telegram command handler @@ -1226,7 +1235,11 @@ async def run(settings: Settings) -> None: "/review - Recent scorecards\n" "/dashboard - Dashboard URL/status\n" "/stop - Pause trading\n" - "/resume - Resume trading" + "/resume - Resume trading\n" + "/notify - Show notification filter status\n" + "/notify [key] [on|off] - Toggle notification type\n" + " Keys: trades, market, scenario, playbook,\n" + " system, fatfinger, errors, all" ) await telegram.send_message(message) @@ -1479,6 +1492,63 @@ async def run(settings: Settings) -> None: "⚠️ Error\n\nFailed to retrieve reviews." ) + async def handle_notify(args: list[str]) -> None: + """Handle /notify [key] [on|off] — query or change notification filters.""" + status = telegram.filter_status() + + # /notify — show current state + if not args: + lines = ["🔔 알림 필터 현재 상태\n"] + for key, enabled in status.items(): + icon = "✅" if enabled else "❌" + lines.append(f"{icon} {key}") + lines.append("\n예) /notify scenario off") + lines.append("예) /notify all off") + await telegram.send_message("\n".join(lines)) + return + + # /notify [key] — missing on/off + if len(args) == 1: + key = args[0].lower() + if key == "all": + lines = ["🔔 알림 필터 현재 상태\n"] + for k, enabled in status.items(): + icon = "✅" if enabled else "❌" + lines.append(f"{icon} {k}") + await telegram.send_message("\n".join(lines)) + elif key in status: + icon = "✅" if status[key] else "❌" + await telegram.send_message( + f"🔔 {key}: {icon} {'켜짐' if status[key] else '꺼짐'}\n" + f"/notify {key} on 또는 /notify {key} off" + ) + else: + valid = ", ".join(list(status.keys()) + ["all"]) + await telegram.send_message( + f"❌ 알 수 없는 키: {key}\n" + f"유효한 키: {valid}" + ) + return + + # /notify [key] [on|off] + key, toggle = args[0].lower(), args[1].lower() + if toggle not in ("on", "off"): + await telegram.send_message("❌ on 또는 off 를 입력해 주세요.") + return + value = toggle == "on" + if telegram.set_notification(key, value): + icon = "✅" if value else "❌" + label = f"전체 알림" if key == "all" else f"{key} 알림" + state = "켜짐" if value else "꺼짐" + await telegram.send_message(f"{icon} {label} → {state}") + logger.info("Notification filter changed via Telegram: %s=%s", key, value) + else: + valid = ", ".join(list(telegram.filter_status().keys()) + ["all"]) + await telegram.send_message( + f"❌ 알 수 없는 키: {key}\n" + f"유효한 키: {valid}" + ) + async def handle_dashboard() -> None: """Handle /dashboard command - show dashboard URL if enabled.""" if not settings.DASHBOARD_ENABLED: @@ -1502,6 +1572,7 @@ async def run(settings: Settings) -> None: command_handler.register_command("scenarios", handle_scenarios) command_handler.register_command("review", handle_review) command_handler.register_command("dashboard", handle_dashboard) + command_handler.register_command_with_args("notify", handle_notify) # Initialize volatility hunter volatility_analyzer = VolatilityAnalyzer(min_volume_surge=2.0, min_price_change=1.0) diff --git a/src/notifications/telegram_client.py b/src/notifications/telegram_client.py index 70c6dfc..ee61bce 100644 --- a/src/notifications/telegram_client.py +++ b/src/notifications/telegram_client.py @@ -4,8 +4,9 @@ import asyncio import logging import time from collections.abc import Awaitable, Callable -from dataclasses import dataclass +from dataclasses import dataclass, fields from enum import Enum +from typing import ClassVar import aiohttp @@ -58,6 +59,45 @@ class LeakyBucket: self._tokens -= 1.0 +@dataclass +class NotificationFilter: + """Granular on/off flags for each notification type. + + circuit_breaker is intentionally omitted — it is always sent regardless. + """ + + # Maps user-facing command keys to dataclass field names + KEYS: ClassVar[dict[str, str]] = { + "trades": "trades", + "market": "market_open_close", + "fatfinger": "fat_finger", + "system": "system_events", + "playbook": "playbook", + "scenario": "scenario_match", + "errors": "errors", + } + + trades: bool = True + market_open_close: bool = True + fat_finger: bool = True + system_events: bool = True + playbook: bool = True + scenario_match: bool = True + errors: bool = True + + def set_flag(self, key: str, value: bool) -> bool: + """Set a filter flag by user-facing key. Returns False if key is unknown.""" + field = self.KEYS.get(key.lower()) + if field is None: + return False + setattr(self, field, value) + return True + + def as_dict(self) -> dict[str, bool]: + """Return {user_key: current_value} for display.""" + return {k: getattr(self, field) for k, field in self.KEYS.items()} + + @dataclass class NotificationMessage: """Internal notification message structure.""" @@ -79,6 +119,7 @@ class TelegramClient: chat_id: str | None = None, enabled: bool = True, rate_limit: float = DEFAULT_RATE, + notification_filter: NotificationFilter | None = None, ) -> None: """ Initialize Telegram client. @@ -88,12 +129,14 @@ class TelegramClient: chat_id: Target chat ID (user or group) enabled: Enable/disable notifications globally rate_limit: Maximum messages per second + notification_filter: Granular per-type on/off flags """ self._bot_token = bot_token self._chat_id = chat_id self._enabled = enabled self._rate_limiter = LeakyBucket(rate=rate_limit) self._session: aiohttp.ClientSession | None = None + self._filter = notification_filter if notification_filter is not None else NotificationFilter() if not enabled: logger.info("Telegram notifications disabled via configuration") @@ -118,6 +161,26 @@ class TelegramClient: if self._session is not None and not self._session.closed: await self._session.close() + def set_notification(self, key: str, value: bool) -> bool: + """Toggle a notification type by user-facing key at runtime. + + Args: + key: User-facing key (e.g. "scenario", "market", "all") + value: True to enable, False to disable + + Returns: + True if key was valid, False if unknown. + """ + if key == "all": + for k in NotificationFilter.KEYS: + self._filter.set_flag(k, value) + return True + return self._filter.set_flag(key, value) + + def filter_status(self) -> dict[str, bool]: + """Return current per-type filter state keyed by user-facing names.""" + return self._filter.as_dict() + async def send_message(self, text: str, parse_mode: str = "HTML") -> bool: """ Send a generic text message to Telegram. @@ -193,6 +256,8 @@ class TelegramClient: price: Execution price confidence: AI confidence level (0-100) """ + if not self._filter.trades: + return emoji = "🟢" if action == "BUY" else "🔴" message = ( f"{emoji} {action}\n" @@ -212,6 +277,8 @@ class TelegramClient: Args: market_name: Name of the market (e.g., "Korea", "United States") """ + if not self._filter.market_open_close: + return message = f"Market Open\n{market_name} trading session started" await self._send_notification( NotificationMessage(priority=NotificationPriority.LOW, message=message) @@ -225,6 +292,8 @@ class TelegramClient: market_name: Name of the market pnl_pct: Final P&L percentage for the session """ + if not self._filter.market_open_close: + return pnl_sign = "+" if pnl_pct >= 0 else "" pnl_emoji = "📈" if pnl_pct >= 0 else "📉" message = ( @@ -271,6 +340,8 @@ class TelegramClient: total_cash: Total available cash max_pct: Maximum allowed percentage """ + if not self._filter.fat_finger: + return attempted_pct = (order_amount / total_cash) * 100 if total_cash > 0 else 0 message = ( f"Fat-Finger Protection\n" @@ -293,6 +364,8 @@ class TelegramClient: mode: Trading mode ("paper" or "live") enabled_markets: List of enabled market codes """ + if not self._filter.system_events: + return mode_emoji = "📝" if mode == "paper" else "💰" markets_str = ", ".join(enabled_markets) message = ( @@ -320,6 +393,8 @@ class TelegramClient: scenario_count: Total number of scenarios token_count: Gemini token usage for the playbook """ + if not self._filter.playbook: + return message = ( f"Playbook Generated\n" f"Market: {market}\n" @@ -347,6 +422,8 @@ class TelegramClient: condition_summary: Short summary of the matched condition confidence: Scenario confidence (0-100) """ + if not self._filter.scenario_match: + return message = ( f"Scenario Matched\n" f"Symbol: {stock_code}\n" @@ -366,6 +443,8 @@ class TelegramClient: market: Market code (e.g., "KR", "US") reason: Failure reason summary """ + if not self._filter.playbook: + return message = ( f"Playbook Failed\n" f"Market: {market}\n" @@ -382,6 +461,8 @@ class TelegramClient: Args: reason: Reason for shutdown (e.g., "Normal shutdown", "Circuit breaker") """ + if not self._filter.system_events: + return message = f"System Shutdown\n{reason}" priority = ( NotificationPriority.CRITICAL @@ -403,6 +484,8 @@ class TelegramClient: error_msg: Error message context: Error context (e.g., stock code, market) """ + if not self._filter.errors: + return message = ( f"Error: {error_type}\n" f"Context: {context}\n" @@ -429,6 +512,7 @@ class TelegramCommandHandler: self._client = client self._polling_interval = polling_interval self._commands: dict[str, Callable[[], Awaitable[None]]] = {} + self._commands_with_args: dict[str, Callable[[list[str]], Awaitable[None]]] = {} self._last_update_id = 0 self._polling_task: asyncio.Task[None] | None = None self._running = False @@ -437,7 +521,7 @@ class TelegramCommandHandler: self, command: str, handler: Callable[[], Awaitable[None]] ) -> None: """ - Register a command handler. + Register a command handler (no arguments). Args: command: Command name (without leading slash, e.g., "start") @@ -446,6 +530,19 @@ class TelegramCommandHandler: self._commands[command] = handler logger.debug("Registered command handler: /%s", command) + def register_command_with_args( + self, command: str, handler: Callable[[list[str]], Awaitable[None]] + ) -> None: + """ + Register a command handler that receives trailing arguments. + + Args: + command: Command name (without leading slash, e.g., "notify") + handler: Async function receiving list of argument tokens + """ + self._commands_with_args[command] = handler + logger.debug("Registered command handler (with args): /%s", command) + async def start_polling(self) -> None: """Start long polling for commands.""" if self._running: @@ -566,11 +663,14 @@ class TelegramCommandHandler: # Remove @botname suffix if present (for group chats) command_name = command_parts[0].split("@")[0] - # Execute handler - handler = self._commands.get(command_name) - if handler: + # Execute handler (args-aware handlers take priority) + args_handler = self._commands_with_args.get(command_name) + if args_handler: + logger.info("Executing command: /%s %s", command_name, command_parts[1:]) + await args_handler(command_parts[1:]) + elif command_name in self._commands: logger.info("Executing command: /%s", command_name) - await handler() + await self._commands[command_name]() else: logger.debug("Unknown command: /%s", command_name) await self._client.send_message( diff --git a/tests/test_telegram.py b/tests/test_telegram.py index 4aae621..606b4e7 100644 --- a/tests/test_telegram.py +++ b/tests/test_telegram.py @@ -5,7 +5,7 @@ from unittest.mock import AsyncMock, patch import aiohttp import pytest -from src.notifications.telegram_client import NotificationPriority, TelegramClient +from src.notifications.telegram_client import NotificationFilter, NotificationPriority, TelegramClient class TestTelegramClientInit: @@ -481,3 +481,187 @@ class TestClientCleanup: # Should not raise exception await client.close() + + +class TestNotificationFilter: + """Test granular notification filter behavior.""" + + def test_default_filter_allows_all(self) -> None: + """Default NotificationFilter has all flags enabled.""" + f = NotificationFilter() + assert f.trades is True + assert f.market_open_close is True + assert f.fat_finger is True + assert f.system_events is True + assert f.playbook is True + assert f.scenario_match is True + assert f.errors is True + + def test_client_uses_default_filter_when_none_given(self) -> None: + """TelegramClient creates a default NotificationFilter when none provided.""" + client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True) + assert isinstance(client._filter, NotificationFilter) + assert client._filter.scenario_match is True + + def test_client_stores_provided_filter(self) -> None: + """TelegramClient stores a custom NotificationFilter.""" + nf = NotificationFilter(scenario_match=False, trades=False) + client = TelegramClient( + bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf + ) + assert client._filter.scenario_match is False + assert client._filter.trades is False + assert client._filter.market_open_close is True # default still True + + @pytest.mark.asyncio + async def test_scenario_match_filtered_does_not_send(self) -> None: + """notify_scenario_matched skips send when scenario_match=False.""" + nf = NotificationFilter(scenario_match=False) + client = TelegramClient( + bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf + ) + with patch("aiohttp.ClientSession.post") as mock_post: + await client.notify_scenario_matched( + stock_code="005930", action="BUY", condition_summary="rsi<30", confidence=85.0 + ) + mock_post.assert_not_called() + + @pytest.mark.asyncio + async def test_trades_filtered_does_not_send(self) -> None: + """notify_trade_execution skips send when trades=False.""" + nf = NotificationFilter(trades=False) + client = TelegramClient( + bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf + ) + with patch("aiohttp.ClientSession.post") as mock_post: + await client.notify_trade_execution( + stock_code="005930", market="KR", action="BUY", + quantity=10, price=70000.0, confidence=85.0 + ) + mock_post.assert_not_called() + + @pytest.mark.asyncio + async def test_market_open_close_filtered_does_not_send(self) -> None: + """notify_market_open/close skip send when market_open_close=False.""" + nf = NotificationFilter(market_open_close=False) + client = TelegramClient( + bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf + ) + with patch("aiohttp.ClientSession.post") as mock_post: + await client.notify_market_open("Korea") + await client.notify_market_close("Korea", pnl_pct=1.5) + mock_post.assert_not_called() + + @pytest.mark.asyncio + async def test_circuit_breaker_always_sends_regardless_of_filter(self) -> None: + """notify_circuit_breaker always sends (no filter flag).""" + nf = NotificationFilter( + trades=False, market_open_close=False, fat_finger=False, + system_events=False, playbook=False, scenario_match=False, errors=False, + ) + client = TelegramClient( + bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf + ) + mock_resp = AsyncMock() + mock_resp.status = 200 + mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) + mock_resp.__aexit__ = AsyncMock(return_value=False) + + with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post: + await client.notify_circuit_breaker(pnl_pct=-3.5, threshold=-3.0) + assert mock_post.call_count == 1 + + @pytest.mark.asyncio + async def test_errors_filtered_does_not_send(self) -> None: + """notify_error skips send when errors=False.""" + nf = NotificationFilter(errors=False) + client = TelegramClient( + bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf + ) + with patch("aiohttp.ClientSession.post") as mock_post: + await client.notify_error("TestError", "something went wrong", "KR") + mock_post.assert_not_called() + + @pytest.mark.asyncio + async def test_playbook_filtered_does_not_send(self) -> None: + """notify_playbook_generated/failed skip send when playbook=False.""" + nf = NotificationFilter(playbook=False) + client = TelegramClient( + bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf + ) + with patch("aiohttp.ClientSession.post") as mock_post: + await client.notify_playbook_generated("KR", 3, 10, 1200) + await client.notify_playbook_failed("KR", "timeout") + mock_post.assert_not_called() + + @pytest.mark.asyncio + async def test_system_events_filtered_does_not_send(self) -> None: + """notify_system_start/shutdown skip send when system_events=False.""" + nf = NotificationFilter(system_events=False) + client = TelegramClient( + bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf + ) + with patch("aiohttp.ClientSession.post") as mock_post: + await client.notify_system_start("paper", ["KR"]) + await client.notify_system_shutdown("Normal shutdown") + mock_post.assert_not_called() + + def test_set_flag_valid_key(self) -> None: + """set_flag returns True and updates field for a known key.""" + nf = NotificationFilter() + assert nf.set_flag("scenario", False) is True + assert nf.scenario_match is False + + def test_set_flag_invalid_key(self) -> None: + """set_flag returns False for an unknown key.""" + nf = NotificationFilter() + assert nf.set_flag("unknown_key", False) is False + + def test_as_dict_keys_match_KEYS(self) -> None: + """as_dict() returns every key defined in KEYS.""" + nf = NotificationFilter() + d = nf.as_dict() + assert set(d.keys()) == set(NotificationFilter.KEYS.keys()) + + def test_set_notification_valid_key(self) -> None: + """TelegramClient.set_notification toggles filter at runtime.""" + client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True) + assert client._filter.scenario_match is True + assert client.set_notification("scenario", False) is True + assert client._filter.scenario_match is False + + def test_set_notification_all_off(self) -> None: + """set_notification('all', False) disables every filter flag.""" + client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True) + assert client.set_notification("all", False) is True + for v in client.filter_status().values(): + assert v is False + + def test_set_notification_all_on(self) -> None: + """set_notification('all', True) enables every filter flag.""" + client = TelegramClient( + bot_token="123:abc", chat_id="456", enabled=True, + notification_filter=NotificationFilter( + trades=False, market_open_close=False, scenario_match=False, + fat_finger=False, system_events=False, playbook=False, errors=False, + ), + ) + assert client.set_notification("all", True) is True + for v in client.filter_status().values(): + assert v is True + + def test_set_notification_unknown_key(self) -> None: + """set_notification returns False for an unknown key.""" + client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True) + assert client.set_notification("unknown", False) is False + + def test_filter_status_reflects_current_state(self) -> None: + """filter_status() matches the current NotificationFilter state.""" + nf = NotificationFilter(trades=False, scenario_match=False) + client = TelegramClient( + bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf + ) + status = client.filter_status() + assert status["trades"] is False + assert status["scenario"] is False + assert status["market"] is True diff --git a/tests/test_telegram_commands.py b/tests/test_telegram_commands.py index c0f0b98..bf9b437 100644 --- a/tests/test_telegram_commands.py +++ b/tests/test_telegram_commands.py @@ -875,3 +875,91 @@ class TestGetUpdates: updates = await handler._get_updates() assert updates == [] + + +class TestCommandWithArgs: + """Test register_command_with_args and argument dispatch.""" + + def test_register_command_with_args_stored(self) -> None: + """register_command_with_args stores handler in _commands_with_args.""" + client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True) + handler = TelegramCommandHandler(client) + + async def my_handler(args: list[str]) -> None: + pass + + handler.register_command_with_args("notify", my_handler) + assert "notify" in handler._commands_with_args + assert handler._commands_with_args["notify"] is my_handler + + @pytest.mark.asyncio + async def test_args_handler_receives_arguments(self) -> None: + """Args handler is called with the trailing tokens.""" + client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True) + handler = TelegramCommandHandler(client) + + received: list[list[str]] = [] + + async def capture(args: list[str]) -> None: + received.append(args) + + handler.register_command_with_args("notify", capture) + + update = { + "message": { + "chat": {"id": "456"}, + "text": "/notify scenario off", + } + } + await handler._handle_update(update) + assert received == [["scenario", "off"]] + + @pytest.mark.asyncio + async def test_args_handler_takes_priority_over_no_args_handler(self) -> None: + """When both handlers exist for same command, args handler wins.""" + client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True) + handler = TelegramCommandHandler(client) + + no_args_called = [] + args_called = [] + + async def no_args_handler() -> None: + no_args_called.append(True) + + async def args_handler(args: list[str]) -> None: + args_called.append(args) + + handler.register_command("notify", no_args_handler) + handler.register_command_with_args("notify", args_handler) + + update = { + "message": { + "chat": {"id": "456"}, + "text": "/notify all off", + } + } + await handler._handle_update(update) + assert args_called == [["all", "off"]] + assert no_args_called == [] + + @pytest.mark.asyncio + async def test_args_handler_with_no_trailing_args(self) -> None: + """/notify with no args still dispatches to args handler with empty list.""" + client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True) + handler = TelegramCommandHandler(client) + + received: list[list[str]] = [] + + async def capture(args: list[str]) -> None: + received.append(args) + + handler.register_command_with_args("notify", capture) + + update = { + "message": { + "chat": {"id": "456"}, + "text": "/notify", + } + } + await handler._handle_update(update) + assert received == [[]]