From ff9c4d608263a6661f3d1564a9d2835ef326f565 Mon Sep 17 00:00:00 2001 From: agentson Date: Mon, 23 Feb 2026 16:57:13 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=EC=8B=9C=EC=9E=91=20=EC=8B=9C=20?= =?UTF-8?q?=EB=B8=8C=EB=A1=9C=EC=BB=A4=20=ED=8F=AC=EC=A7=80=EC=85=98=20?= =?UTF-8?q?=E2=86=92=20DB=20=EB=8F=99=EA=B8=B0=ED=99=94=20=EB=B0=8F=20?= =?UTF-8?q?=EA=B5=AD=EB=82=B4=EC=A3=BC=EC=8B=9D=20=EC=9D=B4=EC=A4=91=20?= =?UTF-8?q?=EB=A7=A4=EC=88=98=20=EB=B0=A9=EC=A7=80=20(#206)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - sync_positions_from_broker() 함수 추가 - 시스템 시작 시 브로커 잔고를 조회해 DB에 없는 포지션을 BUY 레코드로 삽입 - 국내: get_balance(), 해외: get_overseas_balance(exchange_code) 순회 - ConnectionError는 경고 로그만 남기고 계속 진행 (non-fatal) - 동일 exchange_code 중복 조회 방지 (seen_exchange_codes 집합) - run() 초기화 후 최초 한 번 자동 호출 - 국내주식 BUY 이중 방지 로직 확장 - trading_cycle 및 run_daily_session에서 기존에 해외 전용(not market.is_domestic) 으로만 적용하던 broker balance 체크를 국내/해외 공통으로 변경 - _extract_held_qty_from_balance(is_domestic=market.is_domestic) - 테스트 (827 passed) - TestSyncPositionsFromBroker (6개): 국내/해외 동기화, 중복 skip, 공란, ConnectionError, dedup - TestDomesticBuyDoublePreventionTradingCycle (1개): 국내 보유 주식 BUY 억제 Co-Authored-By: Claude Sonnet 4.6 --- src/main.py | 102 ++++++++++++++-- tests/test_main.py | 282 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 376 insertions(+), 8 deletions(-) diff --git a/src/main.py b/src/main.py index 0268db0..bdf187d 100644 --- a/src/main.py +++ b/src/main.py @@ -40,7 +40,7 @@ from src.evolution.daily_review import DailyReviewer from src.evolution.optimizer import EvolutionOptimizer from src.logging.decision_logger import DecisionLogger from src.logging_config import setup_logging -from src.markets.schedule import MarketInfo, get_next_market_open, get_open_markets +from src.markets.schedule import MARKETS, MarketInfo, get_next_market_open, get_open_markets from src.notifications.telegram_client import NotificationFilter, TelegramClient, TelegramCommandHandler from src.strategy.models import DayPlaybook, MarketOutlook from src.strategy.playbook_store import PlaybookStore @@ -129,6 +129,88 @@ async def _retry_connection(coro_factory: Any, *args: Any, label: str = "", **kw raise +async def sync_positions_from_broker( + broker: Any, + overseas_broker: Any, + db_conn: Any, + settings: "Settings", +) -> int: + """Sync open positions from the live broker into the local DB at startup. + + Fetches current holdings from the broker for all configured markets and + inserts a synthetic BUY record for any position that the DB does not + already know about. This prevents double-buy when positions were opened + in a previous session or entered manually outside the system. + + Returns: + Number of new positions synced. + """ + synced = 0 + seen_exchange_codes: set[str] = set() + + for market_code in settings.enabled_market_list: + market = MARKETS.get(market_code) + if market is None: + continue + + try: + if market.is_domestic: + balance_data = await broker.get_balance() + log_market = market_code # "KR" + else: + if market.exchange_code in seen_exchange_codes: + continue + seen_exchange_codes.add(market.exchange_code) + balance_data = await overseas_broker.get_overseas_balance( + market.exchange_code + ) + log_market = market_code # e.g. "US_NASDAQ" + except ConnectionError as exc: + logger.warning( + "Startup sync: balance fetch failed for %s — skipping: %s", + market_code, + exc, + ) + continue + + held_codes = _extract_held_codes_from_balance( + balance_data, is_domestic=market.is_domestic + ) + for stock_code in held_codes: + if get_open_position(db_conn, stock_code, log_market): + continue # already tracked + qty = _extract_held_qty_from_balance( + balance_data, stock_code, is_domestic=market.is_domestic + ) + log_trade( + conn=db_conn, + stock_code=stock_code, + action="BUY", + confidence=0, + rationale="[startup-sync] Position detected from broker at startup", + quantity=qty, + price=0.0, + market=log_market, + exchange_code=market.exchange_code, + mode=settings.MODE, + ) + logger.info( + "Startup sync: %s/%s recorded as open position (qty=%d)", + log_market, + stock_code, + qty, + ) + synced += 1 + + if synced: + logger.info( + "Startup sync complete: %d position(s) synced from broker", synced + ) + else: + logger.info("Startup sync: no new positions to sync from broker") + return synced + + def _extract_symbol_from_holding(item: dict[str, Any]) -> str: """Extract symbol from overseas holding payload variants.""" for key in ( @@ -571,11 +653,11 @@ async def trading_cycle( # BUY 결정 전 기존 포지션 체크 (중복 매수 방지) if decision.action == "BUY": existing_position = get_open_position(db_conn, stock_code, market.code) - if not existing_position and not market.is_domestic: + if not existing_position: # SELL 지정가 접수 후 미체결 시 DB는 종료로 기록되나 브로커는 여전히 보유 중. - # 이중 매수 방지를 위해 라이브 브로커 잔고를 authoritative source로 사용. + # 국내/해외 모두 라이브 브로커 잔고를 authoritative source로 사용. broker_qty = _extract_held_qty_from_balance( - balance_data, stock_code, is_domestic=False + balance_data, stock_code, is_domestic=market.is_domestic ) if broker_qty > 0: existing_position = {"price": 0.0, "quantity": broker_qty} @@ -1187,11 +1269,11 @@ async def run_daily_session( # BUY 중복 방지: 브로커 잔고 기반 (미체결 SELL 리밋 주문 보호) if decision.action == "BUY": daily_existing = get_open_position(db_conn, stock_code, market.code) - if not daily_existing and not market.is_domestic: + if not daily_existing: # SELL 지정가 접수 후 미체결 시 DB는 종료로 기록되나 브로커는 여전히 보유 중. - # 이중 매수 방지를 위해 라이브 브로커 잔고를 authoritative source로 사용. + # 국내/해외 모두 라이브 브로커 잔고를 authoritative source로 사용. broker_qty = _extract_held_qty_from_balance( - balance_data, stock_code, is_domestic=False + balance_data, stock_code, is_domestic=market.is_domestic ) if broker_qty > 0: daily_existing = {"price": 0.0, "quantity": broker_qty} @@ -2040,6 +2122,12 @@ async def run(settings: Settings) -> None: except Exception as exc: logger.warning("System startup notification failed: %s", exc) + # Sync broker positions → DB to prevent double-buy on restart + try: + await sync_positions_from_broker(broker, overseas_broker, db_conn, settings) + except Exception as exc: + logger.warning("Startup position sync failed (non-fatal): %s", exc) + # Start command handler try: await command_handler.start_polling() diff --git a/tests/test_main.py b/tests/test_main.py index 3a58e20..f5d2e9a 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -24,6 +24,7 @@ from src.main import ( _start_dashboard_server, run_daily_session, safe_float, + sync_positions_from_broker, trading_cycle, ) from src.strategy.models import ( @@ -3274,7 +3275,6 @@ class TestRetryConnection: assert call_count == 1 # No retry for non-ConnectionError -# --------------------------------------------------------------------------- # run_daily_session — daily CB baseline (daily_start_eval) tests (issue #207) # --------------------------------------------------------------------------- @@ -3512,3 +3512,283 @@ class TestDailyCBBaseline: # Must return the original baseline, NOT the new total_eval (58000) assert result == 55000.0 + + +# --------------------------------------------------------------------------- +# sync_positions_from_broker — startup DB sync tests (issue #206) +# --------------------------------------------------------------------------- + + +class TestSyncPositionsFromBroker: + """Tests for sync_positions_from_broker() startup position sync (issue #206). + + The function queries broker balances at startup and inserts synthetic BUY + records for any holdings that the local DB is unaware of, preventing + double-buy when positions were opened in a previous session or manually. + """ + + def _make_settings(self, enabled_markets: str = "KR") -> Settings: + return Settings( + KIS_APP_KEY="k", + KIS_APP_SECRET="s", + KIS_ACCOUNT_NO="12345678-01", + GEMINI_API_KEY="g", + ENABLED_MARKETS=enabled_markets, + MODE="paper", + ) + + def _domestic_balance( + self, + stock_code: str = "005930", + qty: int = 5, + ) -> dict: + return { + "output1": [{"pdno": stock_code, "ord_psbl_qty": str(qty)}], + "output2": [ + { + "tot_evlu_amt": "1000000", + "dnca_tot_amt": "500000", + "pchs_amt_smtl_amt": "500000", + } + ], + } + + def _overseas_balance( + self, + stock_code: str = "AAPL", + qty: int = 10, + ) -> dict: + return { + "output1": [{"ovrs_pdno": stock_code, "ovrs_cblc_qty": str(qty)}], + "output2": [ + { + "frcr_evlu_tota": "50000", + "frcr_dncl_amt_2": "10000", + "frcr_buy_amt_smtl": "40000", + } + ], + } + + @pytest.mark.asyncio + async def test_syncs_domestic_position_not_in_db(self) -> None: + """A domestic holding found in broker but absent from DB is inserted.""" + settings = self._make_settings("KR") + db_conn = init_db(":memory:") + + broker = MagicMock() + broker.get_balance = AsyncMock( + return_value=self._domestic_balance("005930", qty=7) + ) + overseas_broker = MagicMock() + + synced = await sync_positions_from_broker( + broker, overseas_broker, db_conn, settings + ) + + assert synced == 1 + from src.db import get_open_position + pos = get_open_position(db_conn, "005930", "KR") + assert pos is not None + assert pos["quantity"] == 7 + + @pytest.mark.asyncio + async def test_skips_position_already_in_db(self) -> None: + """No duplicate record is created when the position already exists in DB.""" + settings = self._make_settings("KR") + db_conn = init_db(":memory:") + # Pre-insert a BUY record + log_trade( + conn=db_conn, + stock_code="005930", + action="BUY", + confidence=85, + rationale="existing position", + quantity=5, + price=70000.0, + market="KR", + exchange_code="KRX", + ) + + broker = MagicMock() + broker.get_balance = AsyncMock( + return_value=self._domestic_balance("005930", qty=5) + ) + overseas_broker = MagicMock() + + synced = await sync_positions_from_broker( + broker, overseas_broker, db_conn, settings + ) + + assert synced == 0 + + @pytest.mark.asyncio + async def test_syncs_overseas_position_not_in_db(self) -> None: + """An overseas holding found in broker but absent from DB is inserted.""" + settings = self._make_settings("US_NASDAQ") + db_conn = init_db(":memory:") + + broker = MagicMock() + overseas_broker = MagicMock() + overseas_broker.get_overseas_balance = AsyncMock( + return_value=self._overseas_balance("AAPL", qty=10) + ) + + synced = await sync_positions_from_broker( + broker, overseas_broker, db_conn, settings + ) + + assert synced == 1 + from src.db import get_open_position + pos = get_open_position(db_conn, "AAPL", "US_NASDAQ") + assert pos is not None + assert pos["quantity"] == 10 + + @pytest.mark.asyncio + async def test_returns_zero_when_broker_has_no_holdings(self) -> None: + """Returns 0 when broker reports empty holdings.""" + settings = self._make_settings("KR") + db_conn = init_db(":memory:") + + broker = MagicMock() + broker.get_balance = AsyncMock( + return_value={"output1": [], "output2": [{}]} + ) + overseas_broker = MagicMock() + + synced = await sync_positions_from_broker( + broker, overseas_broker, db_conn, settings + ) + + assert synced == 0 + + @pytest.mark.asyncio + async def test_handles_connection_error_gracefully(self) -> None: + """ConnectionError during balance fetch is logged but does not raise.""" + settings = self._make_settings("KR") + db_conn = init_db(":memory:") + + broker = MagicMock() + broker.get_balance = AsyncMock( + side_effect=ConnectionError("KIS unreachable") + ) + overseas_broker = MagicMock() + + synced = await sync_positions_from_broker( + broker, overseas_broker, db_conn, settings + ) + + assert synced == 0 # Failure treated as no-op + + @pytest.mark.asyncio + async def test_deduplicates_exchange_codes_for_overseas(self) -> None: + """Each exchange code is queried at most once even if multiple market + codes share the same exchange (defensive deduplication).""" + # Both US_NASDAQ and a hypothetical duplicate would share "NASD" + # Use two DIFFERENT overseas markets (NASD vs NYSE) to verify each is + # queried separately. + settings = self._make_settings("US_NASDAQ,US_NYSE") + db_conn = init_db(":memory:") + + broker = MagicMock() + overseas_broker = MagicMock() + overseas_broker.get_overseas_balance = AsyncMock( + return_value={"output1": [], "output2": [{}]} + ) + + await sync_positions_from_broker( + broker, overseas_broker, db_conn, settings + ) + + # Two distinct exchange codes (NASD, NYSE) → 2 calls + assert overseas_broker.get_overseas_balance.call_count == 2 + + +# --------------------------------------------------------------------------- +# Domestic BUY double-prevention (issue #206) — trading_cycle integration +# --------------------------------------------------------------------------- + + +class TestDomesticBuyDoublePreventionTradingCycle: + """Verify domestic BUY suppression using broker balance in trading_cycle. + + Issue #206: the broker-balance check was overseas-only; domestic stocks + were not protected against double-buy caused by untracked positions. + """ + + @pytest.mark.asyncio + async def test_domestic_buy_suppressed_when_broker_holds_stock( + self, + ) -> None: + """BUY for a domestic stock must be suppressed when broker holds it, + even if the DB shows no open position.""" + db_conn = init_db(":memory:") + # DB: no open position for 005930 + + broker = MagicMock() + broker.get_current_price = AsyncMock(return_value=(70000.0, 1.0, 0.0)) + # Broker balance: holds 5 shares of 005930 + broker.get_balance = AsyncMock( + return_value={ + "output1": [{"pdno": "005930", "ord_psbl_qty": "5"}], + "output2": [ + { + "tot_evlu_amt": "1000000", + "dnca_tot_amt": "500000", + "pchs_amt_smtl_amt": "500000", + } + ], + } + ) + broker.send_order = AsyncMock(return_value={"msg1": "주문접수"}) + + market = MagicMock() + market.name = "KR" + market.code = "KR" + market.exchange_code = "KRX" + market.is_domestic = True + + engine = MagicMock(spec=ScenarioEngine) + engine.evaluate = MagicMock(return_value=_make_buy_match("005930")) + + telegram = MagicMock() + telegram.notify_trade_execution = AsyncMock() + telegram.notify_fat_finger = AsyncMock() + telegram.notify_circuit_breaker = AsyncMock() + telegram.notify_scenario_matched = AsyncMock() + + decision_logger = MagicMock() + decision_logger.log_decision = MagicMock(return_value="d1") + + settings = Settings( + KIS_APP_KEY="k", + KIS_APP_SECRET="s", + KIS_ACCOUNT_NO="12345678-01", + GEMINI_API_KEY="g", + MODE="paper", + ) + + await trading_cycle( + broker=broker, + overseas_broker=MagicMock(), + scenario_engine=engine, + playbook=_make_playbook(market="KR"), + risk=MagicMock(), + db_conn=db_conn, + decision_logger=decision_logger, + context_store=MagicMock( + get_latest_timeframe=MagicMock(return_value=None), + set_context=MagicMock(), + ), + criticality_assessor=MagicMock( + assess_market_conditions=MagicMock(return_value=MagicMock(value="NORMAL")), + get_timeout=MagicMock(return_value=5.0), + ), + telegram=telegram, + settings=settings, + market=market, + stock_code="005930", + scan_candidates={"KR": {}}, + ) + + # BUY must NOT have been executed because broker still holds the stock + broker.send_order.assert_not_called()