Compare commits

..

5 Commits

Author SHA1 Message Date
agentson
d19e5b0de6 feat: include current holdings in realtime trading loop for exit evaluation (#165)
Some checks failed
CI / test (pull_request) Has been cancelled
스캐너 후보 종목뿐 아니라 현재 보유 종목도 매 사이클마다 평가해
stop-loss / take-profit이 실제로 동작하도록 개선.

- db.py: get_open_positions_by_market() 추가
  - net BUY - SELL 집계 쿼리로 실제 보유 종목 코드 목록 반환
  - 단순 "최신 레코드 = BUY" 방식보다 안전 (이중 매도 방지)
- main.py: 실시간 루프에서 스캐너 후보 + 보유 종목을 union으로 구성
  - dict.fromkeys로 순서 유지하며 중복 제거
  - 스캐너에 없는 보유 종목은 로그로 명시
  - 보유 종목은 Playbook 없으면 HOLD → stop-loss/take-profit 체크
- tests/test_db.py: get_open_positions_by_market 테스트 5개 추가
  - net 양수 종목 포함, 전량 매도 제외, 부분 매도 포함
  - 마켓 범위 격리, 거래 없을 때 빈 리스트

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 03:08:49 +09:00
ff5ff736d8 Merge pull request 'feat: granular Telegram notification filters via .env (#161)' (#162) from feature/issue-161-telegram-notification-filters into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #162
2026-02-20 02:33:56 +09:00
agentson
4a59d7e66d feat: /notify command for runtime notification filter control (#161)
Some checks failed
CI / test (pull_request) Has been cancelled
Add /notify Telegram command for adjusting notification filters at runtime
without restarting the service:

  /notify                  → show current filter state
  /notify scenario off     → disable scenario match alerts
  /notify market off       → disable market open/close alerts
  /notify all off          → disable all (circuit_breaker always on)
  /notify trades on        → re-enable trade execution alerts

Changes:
- NotificationFilter: add KEYS class var, set_flag(), as_dict()
- TelegramClient: add set_notification(), filter_status()
- TelegramCommandHandler: add register_command_with_args() + args dispatch
- main.py: handle_notify() handler + register /notify command + /help update
- Tests: 12 new tests (set_flag, set_notification, register_command_with_args)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 02:33:03 +09:00
agentson
8dd625bfd1 feat: granular Telegram notification filters via .env (#161)
Some checks failed
CI / test (pull_request) Has been cancelled
Add NotificationFilter dataclass to TelegramClient allowing per-type
on/off control via .env variables. circuit_breaker always sends regardless.

New .env options (all default true):
- TELEGRAM_NOTIFY_TRADES
- TELEGRAM_NOTIFY_MARKET_OPEN_CLOSE
- TELEGRAM_NOTIFY_FAT_FINGER
- TELEGRAM_NOTIFY_SYSTEM_EVENTS
- TELEGRAM_NOTIFY_PLAYBOOK
- TELEGRAM_NOTIFY_SCENARIO_MATCH  (most frequent — set false to reduce noise)
- TELEGRAM_NOTIFY_ERRORS

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-20 02:26:28 +09:00
b50977aa76 Merge pull request 'feat: improve dashboard UI with P&L chart and decisions log (#159)' (#160) from feature/issue-159-dashboard-ui-improvement into main
Some checks failed
CI / test (push) Has been cancelled
Reviewed-on: #160
2026-02-20 02:20:12 +09:00
7 changed files with 585 additions and 11 deletions

View File

@@ -93,6 +93,16 @@ class Settings(BaseSettings):
TELEGRAM_COMMANDS_ENABLED: bool = True TELEGRAM_COMMANDS_ENABLED: bool = True
TELEGRAM_POLLING_INTERVAL: float = 1.0 # seconds TELEGRAM_POLLING_INTERVAL: float = 1.0 # seconds
# Telegram notification type filters (granular control)
# circuit_breaker is always sent regardless — safety-critical
TELEGRAM_NOTIFY_TRADES: bool = True # BUY/SELL execution alerts
TELEGRAM_NOTIFY_MARKET_OPEN_CLOSE: bool = True # Market open/close alerts
TELEGRAM_NOTIFY_FAT_FINGER: bool = True # Fat-finger rejection alerts
TELEGRAM_NOTIFY_SYSTEM_EVENTS: bool = True # System start/shutdown alerts
TELEGRAM_NOTIFY_PLAYBOOK: bool = True # Playbook generated/failed alerts
TELEGRAM_NOTIFY_SCENARIO_MATCH: bool = True # Scenario matched alerts (most frequent)
TELEGRAM_NOTIFY_ERRORS: bool = True # Error alerts
# Overseas ranking API (KIS endpoint/TR_ID may vary by account/product) # Overseas ranking API (KIS endpoint/TR_ID may vary by account/product)
# Override these from .env if your account uses different specs. # Override these from .env if your account uses different specs.
OVERSEAS_RANKING_ENABLED: bool = True OVERSEAS_RANKING_ENABLED: bool = True

View File

@@ -237,6 +237,28 @@ def get_open_position(
return {"decision_id": row[1], "price": row[2], "quantity": row[3]} return {"decision_id": row[1], "price": row[2], "quantity": row[3]}
def get_open_positions_by_market(
conn: sqlite3.Connection, market: str
) -> list[str]:
"""Return stock codes with a net positive position in the given market.
Uses net BUY - SELL quantity aggregation to avoid false positives from
the simpler "latest record is BUY" heuristic. A stock is considered
open only when the bot's own recorded trades leave a positive net quantity.
"""
cursor = conn.execute(
"""
SELECT stock_code
FROM trades
WHERE market = ?
GROUP BY stock_code
HAVING SUM(CASE WHEN action = 'BUY' THEN quantity ELSE -quantity END) > 0
""",
(market,),
)
return [row[0] for row in cursor.fetchall()]
def get_recent_symbols( def get_recent_symbols(
conn: sqlite3.Connection, market: str, limit: int = 30 conn: sqlite3.Connection, market: str, limit: int = 30
) -> list[str]: ) -> list[str]:

View File

@@ -32,6 +32,7 @@ from src.core.risk_manager import CircuitBreakerTripped, FatFingerRejected, Risk
from src.db import ( from src.db import (
get_latest_buy_trade, get_latest_buy_trade,
get_open_position, get_open_position,
get_open_positions_by_market,
get_recent_symbols, get_recent_symbols,
init_db, init_db,
log_trade, log_trade,
@@ -41,7 +42,7 @@ from src.evolution.optimizer import EvolutionOptimizer
from src.logging.decision_logger import DecisionLogger from src.logging.decision_logger import DecisionLogger
from src.logging_config import setup_logging from src.logging_config import setup_logging
from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets
from src.notifications.telegram_client import TelegramClient, TelegramCommandHandler from src.notifications.telegram_client import NotificationFilter, TelegramClient, TelegramCommandHandler
from src.strategy.models import DayPlaybook from src.strategy.models import DayPlaybook
from src.strategy.playbook_store import PlaybookStore from src.strategy.playbook_store import PlaybookStore
from src.strategy.pre_market_planner import PreMarketPlanner from src.strategy.pre_market_planner import PreMarketPlanner
@@ -1208,6 +1209,15 @@ async def run(settings: Settings) -> None:
bot_token=settings.TELEGRAM_BOT_TOKEN, bot_token=settings.TELEGRAM_BOT_TOKEN,
chat_id=settings.TELEGRAM_CHAT_ID, chat_id=settings.TELEGRAM_CHAT_ID,
enabled=settings.TELEGRAM_ENABLED, enabled=settings.TELEGRAM_ENABLED,
notification_filter=NotificationFilter(
trades=settings.TELEGRAM_NOTIFY_TRADES,
market_open_close=settings.TELEGRAM_NOTIFY_MARKET_OPEN_CLOSE,
fat_finger=settings.TELEGRAM_NOTIFY_FAT_FINGER,
system_events=settings.TELEGRAM_NOTIFY_SYSTEM_EVENTS,
playbook=settings.TELEGRAM_NOTIFY_PLAYBOOK,
scenario_match=settings.TELEGRAM_NOTIFY_SCENARIO_MATCH,
errors=settings.TELEGRAM_NOTIFY_ERRORS,
),
) )
# Initialize Telegram command handler # Initialize Telegram command handler
@@ -1226,7 +1236,11 @@ async def run(settings: Settings) -> None:
"/review - Recent scorecards\n" "/review - Recent scorecards\n"
"/dashboard - Dashboard URL/status\n" "/dashboard - Dashboard URL/status\n"
"/stop - Pause trading\n" "/stop - Pause trading\n"
"/resume - Resume trading" "/resume - Resume trading\n"
"/notify - Show notification filter status\n"
"/notify [key] [on|off] - Toggle notification type\n"
" Keys: trades, market, scenario, playbook,\n"
" system, fatfinger, errors, all"
) )
await telegram.send_message(message) await telegram.send_message(message)
@@ -1479,6 +1493,63 @@ async def run(settings: Settings) -> None:
"<b>⚠️ Error</b>\n\nFailed to retrieve reviews." "<b>⚠️ Error</b>\n\nFailed to retrieve reviews."
) )
async def handle_notify(args: list[str]) -> None:
"""Handle /notify [key] [on|off] — query or change notification filters."""
status = telegram.filter_status()
# /notify — show current state
if not args:
lines = ["<b>🔔 알림 필터 현재 상태</b>\n"]
for key, enabled in status.items():
icon = "" if enabled else ""
lines.append(f"{icon} <code>{key}</code>")
lines.append("\n<i>예) /notify scenario off</i>")
lines.append("<i>예) /notify all off</i>")
await telegram.send_message("\n".join(lines))
return
# /notify [key] — missing on/off
if len(args) == 1:
key = args[0].lower()
if key == "all":
lines = ["<b>🔔 알림 필터 현재 상태</b>\n"]
for k, enabled in status.items():
icon = "" if enabled else ""
lines.append(f"{icon} <code>{k}</code>")
await telegram.send_message("\n".join(lines))
elif key in status:
icon = "" if status[key] else ""
await telegram.send_message(
f"<b>🔔 {key}</b>: {icon} {'켜짐' if status[key] else '꺼짐'}\n"
f"<i>/notify {key} on 또는 /notify {key} off</i>"
)
else:
valid = ", ".join(list(status.keys()) + ["all"])
await telegram.send_message(
f"❌ 알 수 없는 키: <code>{key}</code>\n"
f"유효한 키: {valid}"
)
return
# /notify [key] [on|off]
key, toggle = args[0].lower(), args[1].lower()
if toggle not in ("on", "off"):
await telegram.send_message("❌ on 또는 off 를 입력해 주세요.")
return
value = toggle == "on"
if telegram.set_notification(key, value):
icon = "" if value else ""
label = f"전체 알림" if key == "all" else f"<code>{key}</code> 알림"
state = "켜짐" if value else "꺼짐"
await telegram.send_message(f"{icon} {label}{state}")
logger.info("Notification filter changed via Telegram: %s=%s", key, value)
else:
valid = ", ".join(list(telegram.filter_status().keys()) + ["all"])
await telegram.send_message(
f"❌ 알 수 없는 키: <code>{key}</code>\n"
f"유효한 키: {valid}"
)
async def handle_dashboard() -> None: async def handle_dashboard() -> None:
"""Handle /dashboard command - show dashboard URL if enabled.""" """Handle /dashboard command - show dashboard URL if enabled."""
if not settings.DASHBOARD_ENABLED: if not settings.DASHBOARD_ENABLED:
@@ -1502,6 +1573,7 @@ async def run(settings: Settings) -> None:
command_handler.register_command("scenarios", handle_scenarios) command_handler.register_command("scenarios", handle_scenarios)
command_handler.register_command("review", handle_review) command_handler.register_command("review", handle_review)
command_handler.register_command("dashboard", handle_dashboard) command_handler.register_command("dashboard", handle_dashboard)
command_handler.register_command_with_args("notify", handle_notify)
# Initialize volatility hunter # Initialize volatility hunter
volatility_analyzer = VolatilityAnalyzer(min_volume_surge=2.0, min_price_change=1.0) volatility_analyzer = VolatilityAnalyzer(min_volume_surge=2.0, min_price_change=1.0)
@@ -1793,7 +1865,21 @@ async def run(settings: Settings) -> None:
logger.error("Smart Scanner failed for %s: %s", market.name, exc) logger.error("Smart Scanner failed for %s: %s", market.name, exc)
# Get active stocks from scanner (dynamic, no static fallback) # Get active stocks from scanner (dynamic, no static fallback)
stock_codes = active_stocks.get(market.code, []) # Also include current holdings so stop-loss / take-profit
# can trigger even when a position drops off the scanner.
scanner_codes = active_stocks.get(market.code, [])
held_codes = get_open_positions_by_market(db_conn, market.code)
# Union: scanner candidates first, then holdings not already present.
# dict.fromkeys preserves insertion order and removes duplicates.
stock_codes = list(dict.fromkeys(scanner_codes + held_codes))
if held_codes:
new_held = [c for c in held_codes if c not in set(scanner_codes)]
if new_held:
logger.info(
"Holdings added to loop for %s (not in scanner): %s",
market.name,
new_held,
)
if not stock_codes: if not stock_codes:
logger.debug("No active stocks for market %s", market.code) logger.debug("No active stocks for market %s", market.code)
continue continue

View File

@@ -4,8 +4,9 @@ import asyncio
import logging import logging
import time import time
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from dataclasses import dataclass from dataclasses import dataclass, fields
from enum import Enum from enum import Enum
from typing import ClassVar
import aiohttp import aiohttp
@@ -58,6 +59,45 @@ class LeakyBucket:
self._tokens -= 1.0 self._tokens -= 1.0
@dataclass
class NotificationFilter:
"""Granular on/off flags for each notification type.
circuit_breaker is intentionally omitted — it is always sent regardless.
"""
# Maps user-facing command keys to dataclass field names
KEYS: ClassVar[dict[str, str]] = {
"trades": "trades",
"market": "market_open_close",
"fatfinger": "fat_finger",
"system": "system_events",
"playbook": "playbook",
"scenario": "scenario_match",
"errors": "errors",
}
trades: bool = True
market_open_close: bool = True
fat_finger: bool = True
system_events: bool = True
playbook: bool = True
scenario_match: bool = True
errors: bool = True
def set_flag(self, key: str, value: bool) -> bool:
"""Set a filter flag by user-facing key. Returns False if key is unknown."""
field = self.KEYS.get(key.lower())
if field is None:
return False
setattr(self, field, value)
return True
def as_dict(self) -> dict[str, bool]:
"""Return {user_key: current_value} for display."""
return {k: getattr(self, field) for k, field in self.KEYS.items()}
@dataclass @dataclass
class NotificationMessage: class NotificationMessage:
"""Internal notification message structure.""" """Internal notification message structure."""
@@ -79,6 +119,7 @@ class TelegramClient:
chat_id: str | None = None, chat_id: str | None = None,
enabled: bool = True, enabled: bool = True,
rate_limit: float = DEFAULT_RATE, rate_limit: float = DEFAULT_RATE,
notification_filter: NotificationFilter | None = None,
) -> None: ) -> None:
""" """
Initialize Telegram client. Initialize Telegram client.
@@ -88,12 +129,14 @@ class TelegramClient:
chat_id: Target chat ID (user or group) chat_id: Target chat ID (user or group)
enabled: Enable/disable notifications globally enabled: Enable/disable notifications globally
rate_limit: Maximum messages per second rate_limit: Maximum messages per second
notification_filter: Granular per-type on/off flags
""" """
self._bot_token = bot_token self._bot_token = bot_token
self._chat_id = chat_id self._chat_id = chat_id
self._enabled = enabled self._enabled = enabled
self._rate_limiter = LeakyBucket(rate=rate_limit) self._rate_limiter = LeakyBucket(rate=rate_limit)
self._session: aiohttp.ClientSession | None = None self._session: aiohttp.ClientSession | None = None
self._filter = notification_filter if notification_filter is not None else NotificationFilter()
if not enabled: if not enabled:
logger.info("Telegram notifications disabled via configuration") logger.info("Telegram notifications disabled via configuration")
@@ -118,6 +161,26 @@ class TelegramClient:
if self._session is not None and not self._session.closed: if self._session is not None and not self._session.closed:
await self._session.close() await self._session.close()
def set_notification(self, key: str, value: bool) -> bool:
"""Toggle a notification type by user-facing key at runtime.
Args:
key: User-facing key (e.g. "scenario", "market", "all")
value: True to enable, False to disable
Returns:
True if key was valid, False if unknown.
"""
if key == "all":
for k in NotificationFilter.KEYS:
self._filter.set_flag(k, value)
return True
return self._filter.set_flag(key, value)
def filter_status(self) -> dict[str, bool]:
"""Return current per-type filter state keyed by user-facing names."""
return self._filter.as_dict()
async def send_message(self, text: str, parse_mode: str = "HTML") -> bool: async def send_message(self, text: str, parse_mode: str = "HTML") -> bool:
""" """
Send a generic text message to Telegram. Send a generic text message to Telegram.
@@ -193,6 +256,8 @@ class TelegramClient:
price: Execution price price: Execution price
confidence: AI confidence level (0-100) confidence: AI confidence level (0-100)
""" """
if not self._filter.trades:
return
emoji = "🟢" if action == "BUY" else "🔴" emoji = "🟢" if action == "BUY" else "🔴"
message = ( message = (
f"<b>{emoji} {action}</b>\n" f"<b>{emoji} {action}</b>\n"
@@ -212,6 +277,8 @@ class TelegramClient:
Args: Args:
market_name: Name of the market (e.g., "Korea", "United States") market_name: Name of the market (e.g., "Korea", "United States")
""" """
if not self._filter.market_open_close:
return
message = f"<b>Market Open</b>\n{market_name} trading session started" message = f"<b>Market Open</b>\n{market_name} trading session started"
await self._send_notification( await self._send_notification(
NotificationMessage(priority=NotificationPriority.LOW, message=message) NotificationMessage(priority=NotificationPriority.LOW, message=message)
@@ -225,6 +292,8 @@ class TelegramClient:
market_name: Name of the market market_name: Name of the market
pnl_pct: Final P&L percentage for the session pnl_pct: Final P&L percentage for the session
""" """
if not self._filter.market_open_close:
return
pnl_sign = "+" if pnl_pct >= 0 else "" pnl_sign = "+" if pnl_pct >= 0 else ""
pnl_emoji = "📈" if pnl_pct >= 0 else "📉" pnl_emoji = "📈" if pnl_pct >= 0 else "📉"
message = ( message = (
@@ -271,6 +340,8 @@ class TelegramClient:
total_cash: Total available cash total_cash: Total available cash
max_pct: Maximum allowed percentage max_pct: Maximum allowed percentage
""" """
if not self._filter.fat_finger:
return
attempted_pct = (order_amount / total_cash) * 100 if total_cash > 0 else 0 attempted_pct = (order_amount / total_cash) * 100 if total_cash > 0 else 0
message = ( message = (
f"<b>Fat-Finger Protection</b>\n" f"<b>Fat-Finger Protection</b>\n"
@@ -293,6 +364,8 @@ class TelegramClient:
mode: Trading mode ("paper" or "live") mode: Trading mode ("paper" or "live")
enabled_markets: List of enabled market codes enabled_markets: List of enabled market codes
""" """
if not self._filter.system_events:
return
mode_emoji = "📝" if mode == "paper" else "💰" mode_emoji = "📝" if mode == "paper" else "💰"
markets_str = ", ".join(enabled_markets) markets_str = ", ".join(enabled_markets)
message = ( message = (
@@ -320,6 +393,8 @@ class TelegramClient:
scenario_count: Total number of scenarios scenario_count: Total number of scenarios
token_count: Gemini token usage for the playbook token_count: Gemini token usage for the playbook
""" """
if not self._filter.playbook:
return
message = ( message = (
f"<b>Playbook Generated</b>\n" f"<b>Playbook Generated</b>\n"
f"Market: {market}\n" f"Market: {market}\n"
@@ -347,6 +422,8 @@ class TelegramClient:
condition_summary: Short summary of the matched condition condition_summary: Short summary of the matched condition
confidence: Scenario confidence (0-100) confidence: Scenario confidence (0-100)
""" """
if not self._filter.scenario_match:
return
message = ( message = (
f"<b>Scenario Matched</b>\n" f"<b>Scenario Matched</b>\n"
f"Symbol: <code>{stock_code}</code>\n" f"Symbol: <code>{stock_code}</code>\n"
@@ -366,6 +443,8 @@ class TelegramClient:
market: Market code (e.g., "KR", "US") market: Market code (e.g., "KR", "US")
reason: Failure reason summary reason: Failure reason summary
""" """
if not self._filter.playbook:
return
message = ( message = (
f"<b>Playbook Failed</b>\n" f"<b>Playbook Failed</b>\n"
f"Market: {market}\n" f"Market: {market}\n"
@@ -382,6 +461,8 @@ class TelegramClient:
Args: Args:
reason: Reason for shutdown (e.g., "Normal shutdown", "Circuit breaker") reason: Reason for shutdown (e.g., "Normal shutdown", "Circuit breaker")
""" """
if not self._filter.system_events:
return
message = f"<b>System Shutdown</b>\n{reason}" message = f"<b>System Shutdown</b>\n{reason}"
priority = ( priority = (
NotificationPriority.CRITICAL NotificationPriority.CRITICAL
@@ -403,6 +484,8 @@ class TelegramClient:
error_msg: Error message error_msg: Error message
context: Error context (e.g., stock code, market) context: Error context (e.g., stock code, market)
""" """
if not self._filter.errors:
return
message = ( message = (
f"<b>Error: {error_type}</b>\n" f"<b>Error: {error_type}</b>\n"
f"Context: {context}\n" f"Context: {context}\n"
@@ -429,6 +512,7 @@ class TelegramCommandHandler:
self._client = client self._client = client
self._polling_interval = polling_interval self._polling_interval = polling_interval
self._commands: dict[str, Callable[[], Awaitable[None]]] = {} self._commands: dict[str, Callable[[], Awaitable[None]]] = {}
self._commands_with_args: dict[str, Callable[[list[str]], Awaitable[None]]] = {}
self._last_update_id = 0 self._last_update_id = 0
self._polling_task: asyncio.Task[None] | None = None self._polling_task: asyncio.Task[None] | None = None
self._running = False self._running = False
@@ -437,7 +521,7 @@ class TelegramCommandHandler:
self, command: str, handler: Callable[[], Awaitable[None]] self, command: str, handler: Callable[[], Awaitable[None]]
) -> None: ) -> None:
""" """
Register a command handler. Register a command handler (no arguments).
Args: Args:
command: Command name (without leading slash, e.g., "start") command: Command name (without leading slash, e.g., "start")
@@ -446,6 +530,19 @@ class TelegramCommandHandler:
self._commands[command] = handler self._commands[command] = handler
logger.debug("Registered command handler: /%s", command) logger.debug("Registered command handler: /%s", command)
def register_command_with_args(
self, command: str, handler: Callable[[list[str]], Awaitable[None]]
) -> None:
"""
Register a command handler that receives trailing arguments.
Args:
command: Command name (without leading slash, e.g., "notify")
handler: Async function receiving list of argument tokens
"""
self._commands_with_args[command] = handler
logger.debug("Registered command handler (with args): /%s", command)
async def start_polling(self) -> None: async def start_polling(self) -> None:
"""Start long polling for commands.""" """Start long polling for commands."""
if self._running: if self._running:
@@ -566,11 +663,14 @@ class TelegramCommandHandler:
# Remove @botname suffix if present (for group chats) # Remove @botname suffix if present (for group chats)
command_name = command_parts[0].split("@")[0] command_name = command_parts[0].split("@")[0]
# Execute handler # Execute handler (args-aware handlers take priority)
handler = self._commands.get(command_name) args_handler = self._commands_with_args.get(command_name)
if handler: if args_handler:
logger.info("Executing command: /%s %s", command_name, command_parts[1:])
await args_handler(command_parts[1:])
elif command_name in self._commands:
logger.info("Executing command: /%s", command_name) logger.info("Executing command: /%s", command_name)
await handler() await self._commands[command_name]()
else: else:
logger.debug("Unknown command: /%s", command_name) logger.debug("Unknown command: /%s", command_name)
await self._client.send_message( await self._client.send_message(

View File

@@ -1,6 +1,6 @@
"""Tests for database helper functions.""" """Tests for database helper functions."""
from src.db import get_open_position, init_db, log_trade from src.db import get_open_position, get_open_positions_by_market, init_db, log_trade
def test_get_open_position_returns_latest_buy() -> None: def test_get_open_position_returns_latest_buy() -> None:
@@ -58,3 +58,87 @@ def test_get_open_position_returns_none_when_latest_is_sell() -> None:
def test_get_open_position_returns_none_when_no_trades() -> None: def test_get_open_position_returns_none_when_no_trades() -> None:
conn = init_db(":memory:") conn = init_db(":memory:")
assert get_open_position(conn, "AAPL", "US_NASDAQ") is None assert get_open_position(conn, "AAPL", "US_NASDAQ") is None
# --- get_open_positions_by_market tests ---
def test_get_open_positions_by_market_returns_net_positive_stocks() -> None:
"""Stocks with net BUY quantity > 0 are included."""
conn = init_db(":memory:")
log_trade(
conn=conn, stock_code="005930", action="BUY", confidence=90,
rationale="entry", quantity=5, price=70000.0, market="KR",
exchange_code="KRX", decision_id="d1",
)
log_trade(
conn=conn, stock_code="000660", action="BUY", confidence=85,
rationale="entry", quantity=3, price=100000.0, market="KR",
exchange_code="KRX", decision_id="d2",
)
result = get_open_positions_by_market(conn, "KR")
assert set(result) == {"005930", "000660"}
def test_get_open_positions_by_market_excludes_fully_sold_stocks() -> None:
"""Stocks where BUY qty == SELL qty are excluded (net qty = 0)."""
conn = init_db(":memory:")
log_trade(
conn=conn, stock_code="005930", action="BUY", confidence=90,
rationale="entry", quantity=3, price=70000.0, market="KR",
exchange_code="KRX", decision_id="d1",
)
log_trade(
conn=conn, stock_code="005930", action="SELL", confidence=95,
rationale="exit", quantity=3, price=71000.0, market="KR",
exchange_code="KRX", decision_id="d2",
)
result = get_open_positions_by_market(conn, "KR")
assert "005930" not in result
def test_get_open_positions_by_market_includes_partially_sold_stocks() -> None:
"""Stocks with partial SELL (net qty > 0) are still included."""
conn = init_db(":memory:")
log_trade(
conn=conn, stock_code="005930", action="BUY", confidence=90,
rationale="entry", quantity=5, price=70000.0, market="KR",
exchange_code="KRX", decision_id="d1",
)
log_trade(
conn=conn, stock_code="005930", action="SELL", confidence=95,
rationale="partial exit", quantity=2, price=71000.0, market="KR",
exchange_code="KRX", decision_id="d2",
)
result = get_open_positions_by_market(conn, "KR")
assert "005930" in result
def test_get_open_positions_by_market_is_market_scoped() -> None:
"""Only stocks from the specified market are returned."""
conn = init_db(":memory:")
log_trade(
conn=conn, stock_code="005930", action="BUY", confidence=90,
rationale="entry", quantity=3, price=70000.0, market="KR",
exchange_code="KRX", decision_id="d1",
)
log_trade(
conn=conn, stock_code="AAPL", action="BUY", confidence=85,
rationale="entry", quantity=2, price=200.0, market="NASD",
exchange_code="NAS", decision_id="d2",
)
kr_result = get_open_positions_by_market(conn, "KR")
nasd_result = get_open_positions_by_market(conn, "NASD")
assert kr_result == ["005930"]
assert nasd_result == ["AAPL"]
def test_get_open_positions_by_market_returns_empty_when_no_trades() -> None:
"""Empty list returned when no trades exist for the market."""
conn = init_db(":memory:")
assert get_open_positions_by_market(conn, "KR") == []

View File

@@ -5,7 +5,7 @@ from unittest.mock import AsyncMock, patch
import aiohttp import aiohttp
import pytest import pytest
from src.notifications.telegram_client import NotificationPriority, TelegramClient from src.notifications.telegram_client import NotificationFilter, NotificationPriority, TelegramClient
class TestTelegramClientInit: class TestTelegramClientInit:
@@ -481,3 +481,187 @@ class TestClientCleanup:
# Should not raise exception # Should not raise exception
await client.close() await client.close()
class TestNotificationFilter:
"""Test granular notification filter behavior."""
def test_default_filter_allows_all(self) -> None:
"""Default NotificationFilter has all flags enabled."""
f = NotificationFilter()
assert f.trades is True
assert f.market_open_close is True
assert f.fat_finger is True
assert f.system_events is True
assert f.playbook is True
assert f.scenario_match is True
assert f.errors is True
def test_client_uses_default_filter_when_none_given(self) -> None:
"""TelegramClient creates a default NotificationFilter when none provided."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
assert isinstance(client._filter, NotificationFilter)
assert client._filter.scenario_match is True
def test_client_stores_provided_filter(self) -> None:
"""TelegramClient stores a custom NotificationFilter."""
nf = NotificationFilter(scenario_match=False, trades=False)
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
)
assert client._filter.scenario_match is False
assert client._filter.trades is False
assert client._filter.market_open_close is True # default still True
@pytest.mark.asyncio
async def test_scenario_match_filtered_does_not_send(self) -> None:
"""notify_scenario_matched skips send when scenario_match=False."""
nf = NotificationFilter(scenario_match=False)
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
)
with patch("aiohttp.ClientSession.post") as mock_post:
await client.notify_scenario_matched(
stock_code="005930", action="BUY", condition_summary="rsi<30", confidence=85.0
)
mock_post.assert_not_called()
@pytest.mark.asyncio
async def test_trades_filtered_does_not_send(self) -> None:
"""notify_trade_execution skips send when trades=False."""
nf = NotificationFilter(trades=False)
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
)
with patch("aiohttp.ClientSession.post") as mock_post:
await client.notify_trade_execution(
stock_code="005930", market="KR", action="BUY",
quantity=10, price=70000.0, confidence=85.0
)
mock_post.assert_not_called()
@pytest.mark.asyncio
async def test_market_open_close_filtered_does_not_send(self) -> None:
"""notify_market_open/close skip send when market_open_close=False."""
nf = NotificationFilter(market_open_close=False)
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
)
with patch("aiohttp.ClientSession.post") as mock_post:
await client.notify_market_open("Korea")
await client.notify_market_close("Korea", pnl_pct=1.5)
mock_post.assert_not_called()
@pytest.mark.asyncio
async def test_circuit_breaker_always_sends_regardless_of_filter(self) -> None:
"""notify_circuit_breaker always sends (no filter flag)."""
nf = NotificationFilter(
trades=False, market_open_close=False, fat_finger=False,
system_events=False, playbook=False, scenario_match=False, errors=False,
)
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
with patch("aiohttp.ClientSession.post", return_value=mock_resp) as mock_post:
await client.notify_circuit_breaker(pnl_pct=-3.5, threshold=-3.0)
assert mock_post.call_count == 1
@pytest.mark.asyncio
async def test_errors_filtered_does_not_send(self) -> None:
"""notify_error skips send when errors=False."""
nf = NotificationFilter(errors=False)
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
)
with patch("aiohttp.ClientSession.post") as mock_post:
await client.notify_error("TestError", "something went wrong", "KR")
mock_post.assert_not_called()
@pytest.mark.asyncio
async def test_playbook_filtered_does_not_send(self) -> None:
"""notify_playbook_generated/failed skip send when playbook=False."""
nf = NotificationFilter(playbook=False)
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
)
with patch("aiohttp.ClientSession.post") as mock_post:
await client.notify_playbook_generated("KR", 3, 10, 1200)
await client.notify_playbook_failed("KR", "timeout")
mock_post.assert_not_called()
@pytest.mark.asyncio
async def test_system_events_filtered_does_not_send(self) -> None:
"""notify_system_start/shutdown skip send when system_events=False."""
nf = NotificationFilter(system_events=False)
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
)
with patch("aiohttp.ClientSession.post") as mock_post:
await client.notify_system_start("paper", ["KR"])
await client.notify_system_shutdown("Normal shutdown")
mock_post.assert_not_called()
def test_set_flag_valid_key(self) -> None:
"""set_flag returns True and updates field for a known key."""
nf = NotificationFilter()
assert nf.set_flag("scenario", False) is True
assert nf.scenario_match is False
def test_set_flag_invalid_key(self) -> None:
"""set_flag returns False for an unknown key."""
nf = NotificationFilter()
assert nf.set_flag("unknown_key", False) is False
def test_as_dict_keys_match_KEYS(self) -> None:
"""as_dict() returns every key defined in KEYS."""
nf = NotificationFilter()
d = nf.as_dict()
assert set(d.keys()) == set(NotificationFilter.KEYS.keys())
def test_set_notification_valid_key(self) -> None:
"""TelegramClient.set_notification toggles filter at runtime."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
assert client._filter.scenario_match is True
assert client.set_notification("scenario", False) is True
assert client._filter.scenario_match is False
def test_set_notification_all_off(self) -> None:
"""set_notification('all', False) disables every filter flag."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
assert client.set_notification("all", False) is True
for v in client.filter_status().values():
assert v is False
def test_set_notification_all_on(self) -> None:
"""set_notification('all', True) enables every filter flag."""
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True,
notification_filter=NotificationFilter(
trades=False, market_open_close=False, scenario_match=False,
fat_finger=False, system_events=False, playbook=False, errors=False,
),
)
assert client.set_notification("all", True) is True
for v in client.filter_status().values():
assert v is True
def test_set_notification_unknown_key(self) -> None:
"""set_notification returns False for an unknown key."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
assert client.set_notification("unknown", False) is False
def test_filter_status_reflects_current_state(self) -> None:
"""filter_status() matches the current NotificationFilter state."""
nf = NotificationFilter(trades=False, scenario_match=False)
client = TelegramClient(
bot_token="123:abc", chat_id="456", enabled=True, notification_filter=nf
)
status = client.filter_status()
assert status["trades"] is False
assert status["scenario"] is False
assert status["market"] is True

View File

@@ -875,3 +875,91 @@ class TestGetUpdates:
updates = await handler._get_updates() updates = await handler._get_updates()
assert updates == [] assert updates == []
class TestCommandWithArgs:
"""Test register_command_with_args and argument dispatch."""
def test_register_command_with_args_stored(self) -> None:
"""register_command_with_args stores handler in _commands_with_args."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
async def my_handler(args: list[str]) -> None:
pass
handler.register_command_with_args("notify", my_handler)
assert "notify" in handler._commands_with_args
assert handler._commands_with_args["notify"] is my_handler
@pytest.mark.asyncio
async def test_args_handler_receives_arguments(self) -> None:
"""Args handler is called with the trailing tokens."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
received: list[list[str]] = []
async def capture(args: list[str]) -> None:
received.append(args)
handler.register_command_with_args("notify", capture)
update = {
"message": {
"chat": {"id": "456"},
"text": "/notify scenario off",
}
}
await handler._handle_update(update)
assert received == [["scenario", "off"]]
@pytest.mark.asyncio
async def test_args_handler_takes_priority_over_no_args_handler(self) -> None:
"""When both handlers exist for same command, args handler wins."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
no_args_called = []
args_called = []
async def no_args_handler() -> None:
no_args_called.append(True)
async def args_handler(args: list[str]) -> None:
args_called.append(args)
handler.register_command("notify", no_args_handler)
handler.register_command_with_args("notify", args_handler)
update = {
"message": {
"chat": {"id": "456"},
"text": "/notify all off",
}
}
await handler._handle_update(update)
assert args_called == [["all", "off"]]
assert no_args_called == []
@pytest.mark.asyncio
async def test_args_handler_with_no_trailing_args(self) -> None:
"""/notify with no args still dispatches to args handler with empty list."""
client = TelegramClient(bot_token="123:abc", chat_id="456", enabled=True)
handler = TelegramCommandHandler(client)
received: list[list[str]] = []
async def capture(args: list[str]) -> None:
received.append(args)
handler.register_command_with_args("notify", capture)
update = {
"message": {
"chat": {"id": "456"},
"text": "/notify",
}
}
await handler._handle_update(update)
assert received == [[]]