diff --git a/src/config.py b/src/config.py
index 80e3203..1968d05 100644
--- a/src/config.py
+++ b/src/config.py
@@ -93,6 +93,16 @@ class Settings(BaseSettings):
TELEGRAM_COMMANDS_ENABLED: bool = True
TELEGRAM_POLLING_INTERVAL: float = 1.0 # seconds
+ # Telegram notification type filters (granular control)
+ # circuit_breaker is always sent regardless — safety-critical
+ TELEGRAM_NOTIFY_TRADES: bool = True # BUY/SELL execution alerts
+ TELEGRAM_NOTIFY_MARKET_OPEN_CLOSE: bool = True # Market open/close alerts
+ TELEGRAM_NOTIFY_FAT_FINGER: bool = True # Fat-finger rejection alerts
+ TELEGRAM_NOTIFY_SYSTEM_EVENTS: bool = True # System start/shutdown alerts
+ TELEGRAM_NOTIFY_PLAYBOOK: bool = True # Playbook generated/failed alerts
+ TELEGRAM_NOTIFY_SCENARIO_MATCH: bool = True # Scenario matched alerts (most frequent)
+ TELEGRAM_NOTIFY_ERRORS: bool = True # Error alerts
+
# Overseas ranking API (KIS endpoint/TR_ID may vary by account/product)
# Override these from .env if your account uses different specs.
OVERSEAS_RANKING_ENABLED: bool = True
diff --git a/src/main.py b/src/main.py
index 3f78ae9..8e2a805 100644
--- a/src/main.py
+++ b/src/main.py
@@ -41,7 +41,7 @@ from src.evolution.optimizer import EvolutionOptimizer
from src.logging.decision_logger import DecisionLogger
from src.logging_config import setup_logging
from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets
-from src.notifications.telegram_client import TelegramClient, TelegramCommandHandler
+from src.notifications.telegram_client import NotificationFilter, TelegramClient, TelegramCommandHandler
from src.strategy.models import DayPlaybook
from src.strategy.playbook_store import PlaybookStore
from src.strategy.pre_market_planner import PreMarketPlanner
@@ -1208,6 +1208,15 @@ async def run(settings: Settings) -> None:
bot_token=settings.TELEGRAM_BOT_TOKEN,
chat_id=settings.TELEGRAM_CHAT_ID,
enabled=settings.TELEGRAM_ENABLED,
+ notification_filter=NotificationFilter(
+ trades=settings.TELEGRAM_NOTIFY_TRADES,
+ market_open_close=settings.TELEGRAM_NOTIFY_MARKET_OPEN_CLOSE,
+ fat_finger=settings.TELEGRAM_NOTIFY_FAT_FINGER,
+ system_events=settings.TELEGRAM_NOTIFY_SYSTEM_EVENTS,
+ playbook=settings.TELEGRAM_NOTIFY_PLAYBOOK,
+ scenario_match=settings.TELEGRAM_NOTIFY_SCENARIO_MATCH,
+ errors=settings.TELEGRAM_NOTIFY_ERRORS,
+ ),
)
# Initialize Telegram command handler
diff --git a/src/notifications/telegram_client.py b/src/notifications/telegram_client.py
index 70c6dfc..bbfff00 100644
--- a/src/notifications/telegram_client.py
+++ b/src/notifications/telegram_client.py
@@ -58,6 +58,22 @@ class LeakyBucket:
self._tokens -= 1.0
+@dataclass
+class NotificationFilter:
+ """Granular on/off flags for each notification type.
+
+ circuit_breaker is intentionally omitted — it is always sent regardless.
+ """
+
+ trades: bool = True
+ market_open_close: bool = True
+ fat_finger: bool = True
+ system_events: bool = True
+ playbook: bool = True
+ scenario_match: bool = True
+ errors: bool = True
+
+
@dataclass
class NotificationMessage:
"""Internal notification message structure."""
@@ -79,6 +95,7 @@ class TelegramClient:
chat_id: str | None = None,
enabled: bool = True,
rate_limit: float = DEFAULT_RATE,
+ notification_filter: NotificationFilter | None = None,
) -> None:
"""
Initialize Telegram client.
@@ -88,12 +105,14 @@ class TelegramClient:
chat_id: Target chat ID (user or group)
enabled: Enable/disable notifications globally
rate_limit: Maximum messages per second
+ notification_filter: Granular per-type on/off flags
"""
self._bot_token = bot_token
self._chat_id = chat_id
self._enabled = enabled
self._rate_limiter = LeakyBucket(rate=rate_limit)
self._session: aiohttp.ClientSession | None = None
+ self._filter = notification_filter if notification_filter is not None else NotificationFilter()
if not enabled:
logger.info("Telegram notifications disabled via configuration")
@@ -193,6 +212,8 @@ class TelegramClient:
price: Execution price
confidence: AI confidence level (0-100)
"""
+ if not self._filter.trades:
+ return
emoji = "🟢" if action == "BUY" else "🔴"
message = (
f"{emoji} {action}\n"
@@ -212,6 +233,8 @@ class TelegramClient:
Args:
market_name: Name of the market (e.g., "Korea", "United States")
"""
+ if not self._filter.market_open_close:
+ return
message = f"Market Open\n{market_name} trading session started"
await self._send_notification(
NotificationMessage(priority=NotificationPriority.LOW, message=message)
@@ -225,6 +248,8 @@ class TelegramClient:
market_name: Name of the market
pnl_pct: Final P&L percentage for the session
"""
+ if not self._filter.market_open_close:
+ return
pnl_sign = "+" if pnl_pct >= 0 else ""
pnl_emoji = "📈" if pnl_pct >= 0 else "📉"
message = (
@@ -271,6 +296,8 @@ class TelegramClient:
total_cash: Total available cash
max_pct: Maximum allowed percentage
"""
+ if not self._filter.fat_finger:
+ return
attempted_pct = (order_amount / total_cash) * 100 if total_cash > 0 else 0
message = (
f"Fat-Finger Protection\n"
@@ -293,6 +320,8 @@ class TelegramClient:
mode: Trading mode ("paper" or "live")
enabled_markets: List of enabled market codes
"""
+ if not self._filter.system_events:
+ return
mode_emoji = "📝" if mode == "paper" else "💰"
markets_str = ", ".join(enabled_markets)
message = (
@@ -320,6 +349,8 @@ class TelegramClient:
scenario_count: Total number of scenarios
token_count: Gemini token usage for the playbook
"""
+ if not self._filter.playbook:
+ return
message = (
f"Playbook Generated\n"
f"Market: {market}\n"
@@ -347,6 +378,8 @@ class TelegramClient:
condition_summary: Short summary of the matched condition
confidence: Scenario confidence (0-100)
"""
+ if not self._filter.scenario_match:
+ return
message = (
f"Scenario Matched\n"
f"Symbol: {stock_code}\n"
@@ -366,6 +399,8 @@ class TelegramClient:
market: Market code (e.g., "KR", "US")
reason: Failure reason summary
"""
+ if not self._filter.playbook:
+ return
message = (
f"Playbook Failed\n"
f"Market: {market}\n"
@@ -382,6 +417,8 @@ class TelegramClient:
Args:
reason: Reason for shutdown (e.g., "Normal shutdown", "Circuit breaker")
"""
+ if not self._filter.system_events:
+ return
message = f"System Shutdown\n{reason}"
priority = (
NotificationPriority.CRITICAL
@@ -403,6 +440,8 @@ class TelegramClient:
error_msg: Error message
context: Error context (e.g., stock code, market)
"""
+ if not self._filter.errors:
+ return
message = (
f"Error: {error_type}\n"
f"Context: {context}\n"
diff --git a/tests/test_telegram.py b/tests/test_telegram.py
index 4aae621..46ef8cb 100644
--- a/tests/test_telegram.py
+++ b/tests/test_telegram.py
@@ -5,7 +5,7 @@ from unittest.mock import AsyncMock, patch
import aiohttp
import pytest
-from src.notifications.telegram_client import NotificationPriority, TelegramClient
+from src.notifications.telegram_client import NotificationFilter, NotificationPriority, TelegramClient
class TestTelegramClientInit:
@@ -481,3 +481,127 @@ class TestClientCleanup:
# Should not raise exception
await client.close()
+
+
+class TestNotificationFilter:
+ """Test granular notification filter behavior."""
+
+ def test_default_filter_allows_all(self) -> None:
+ """Default NotificationFilter has all flags enabled."""
+ f = NotificationFilter()
+ assert f.trades is True
+ assert f.market_open_close is True
+ assert f.fat_finger is True
+ assert f.system_events is True
+ assert f.playbook is True
+ assert f.scenario_match is True
+ assert f.errors is True
+
+ def test_client_uses_default_filter_when_none_given(self) -> None:
+ """TelegramClient creates a default NotificationFilter when none provided."""
+ client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
+ assert isinstance(client._filter, NotificationFilter)
+ assert client._filter.scenario_match is True
+
+ def test_client_stores_provided_filter(self) -> None:
+ """TelegramClient stores a custom NotificationFilter."""
+ nf = NotificationFilter(scenario_match=False, trades=False)
+ client = TelegramClient(
+ bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
+ )
+ assert client._filter.scenario_match is False
+ assert client._filter.trades is False
+ assert client._filter.market_open_close is True # default still True
+
+ @pytest.mark.asyncio
+ async def test_scenario_match_filtered_does_not_send(self) -> None:
+ """notify_scenario_matched skips send when scenario_match=False."""
+ nf = NotificationFilter(scenario_match=False)
+ client = TelegramClient(
+ bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
+ )
+ with patch("aiohttp.ClientSession.post") as mock_post:
+ await client.notify_scenario_matched(
+ stock_code="005930", action="BUY", condition_summary="rsi<30", confidence=85.0
+ )
+ mock_post.assert_not_called()
+
+ @pytest.mark.asyncio
+ async def test_trades_filtered_does_not_send(self) -> None:
+ """notify_trade_execution skips send when trades=False."""
+ nf = NotificationFilter(trades=False)
+ client = TelegramClient(
+ bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
+ )
+ with patch("aiohttp.ClientSession.post") as mock_post:
+ await client.notify_trade_execution(
+ stock_code="005930", market="KR", action="BUY",
+ quantity=10, price=70000.0, confidence=85.0
+ )
+ mock_post.assert_not_called()
+
+ @pytest.mark.asyncio
+ async def test_market_open_close_filtered_does_not_send(self) -> None:
+ """notify_market_open/close skip send when market_open_close=False."""
+ nf = NotificationFilter(market_open_close=False)
+ client = TelegramClient(
+ bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
+ )
+ with patch("aiohttp.ClientSession.post") as mock_post:
+ await client.notify_market_open("Korea")
+ await client.notify_market_close("Korea", pnl_pct=1.5)
+ mock_post.assert_not_called()
+
+ @pytest.mark.asyncio
+ async def test_circuit_breaker_always_sends_regardless_of_filter(self) -> None:
+ """notify_circuit_breaker always sends (no filter flag)."""
+ nf = NotificationFilter(
+ trades=False, market_open_close=False, fat_finger=False,
+ system_events=False, playbook=False, scenario_match=False, errors=False,
+ )
+ client = TelegramClient(
+ bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
+ )
+ mock_resp = AsyncMock()
+ mock_resp.status = 200
+ mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
+ mock_resp.__aexit__ = AsyncMock(return_value=False)
+
+ with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
+ await client.notify_circuit_breaker(pnl_pct=-3.5, threshold=-3.0)
+ assert mock_post.call_count == 1
+
+ @pytest.mark.asyncio
+ async def test_errors_filtered_does_not_send(self) -> None:
+ """notify_error skips send when errors=False."""
+ nf = NotificationFilter(errors=False)
+ client = TelegramClient(
+ bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
+ )
+ with patch("aiohttp.ClientSession.post") as mock_post:
+ await client.notify_error("TestError", "something went wrong", "KR")
+ mock_post.assert_not_called()
+
+ @pytest.mark.asyncio
+ async def test_playbook_filtered_does_not_send(self) -> None:
+ """notify_playbook_generated/failed skip send when playbook=False."""
+ nf = NotificationFilter(playbook=False)
+ client = TelegramClient(
+ bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
+ )
+ with patch("aiohttp.ClientSession.post") as mock_post:
+ await client.notify_playbook_generated("KR", 3, 10, 1200)
+ await client.notify_playbook_failed("KR", "timeout")
+ mock_post.assert_not_called()
+
+ @pytest.mark.asyncio
+ async def test_system_events_filtered_does_not_send(self) -> None:
+ """notify_system_start/shutdown skip send when system_events=False."""
+ nf = NotificationFilter(system_events=False)
+ client = TelegramClient(
+ bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
+ )
+ with patch("aiohttp.ClientSession.post") as mock_post:
+ await client.notify_system_start("paper", ["KR"])
+ await client.notify_system_shutdown("Normal shutdown")
+ mock_post.assert_not_called()