"""TDD tests for broker/kis_api.py — written BEFORE implementation.""" from __future__ import annotations import asyncio from unittest.mock import AsyncMock, MagicMock, patch import pytest from src.broker.kis_api import KISBroker # --------------------------------------------------------------------------- # Token Management # --------------------------------------------------------------------------- class TestTokenManagement: """Access token must be auto-refreshed and cached.""" @pytest.mark.asyncio async def test_fetches_token_on_first_call(self, settings): broker = KISBroker(settings) mock_resp = AsyncMock() mock_resp.status = 200 mock_resp.json = AsyncMock( return_value={ "access_token": "tok_abc123", "token_type": "Bearer", "expires_in": 86400, } ) mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) mock_resp.__aexit__ = AsyncMock(return_value=False) with patch("aiohttp.ClientSession.post", return_value=mock_resp): token = await broker._ensure_token() assert token == "tok_abc123" await broker.close() @pytest.mark.asyncio async def test_reuses_cached_token(self, settings): broker = KISBroker(settings) broker._access_token = "cached_token" broker._token_expires_at = asyncio.get_event_loop().time() + 3600 token = await broker._ensure_token() assert token == "cached_token" await broker.close() @pytest.mark.asyncio async def test_concurrent_token_refresh_calls_api_once(self, settings): """Multiple concurrent token requests should only call API once.""" broker = KISBroker(settings) # Track how many times the mock API is called call_count = [0] def create_mock_resp(): call_count[0] += 1 mock_resp = AsyncMock() mock_resp.status = 200 mock_resp.json = AsyncMock( return_value={ "access_token": "tok_concurrent", "token_type": "Bearer", "expires_in": 86400, } ) mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) mock_resp.__aexit__ = AsyncMock(return_value=False) return mock_resp with patch("aiohttp.ClientSession.post", return_value=create_mock_resp()): # Launch 5 concurrent token requests tokens = await asyncio.gather( broker._ensure_token(), broker._ensure_token(), broker._ensure_token(), broker._ensure_token(), broker._ensure_token(), ) # All should get the same token assert all(t == "tok_concurrent" for t in tokens) # API should be called only once (due to lock) assert call_count[0] == 1 await broker.close() @pytest.mark.asyncio async def test_token_refresh_cooldown_waits_then_retries(self, settings): """Token refresh should wait out cooldown then retry (issue #54).""" broker = KISBroker(settings) broker._refresh_cooldown = 0.1 # Short cooldown for testing # All attempts fail with 403 (EGW00133) mock_resp_403 = AsyncMock() mock_resp_403.status = 403 mock_resp_403.text = AsyncMock( return_value='{"error_code":"EGW00133","error_description":"접근토큰 발급 잠시 후 다시 시도하세요(1분당 1회)"}' ) mock_resp_403.__aenter__ = AsyncMock(return_value=mock_resp_403) mock_resp_403.__aexit__ = AsyncMock(return_value=False) with patch("aiohttp.ClientSession.post", return_value=mock_resp_403): # First attempt should fail with 403 with pytest.raises(ConnectionError, match="Token refresh failed"): await broker._ensure_token() # Second attempt within cooldown should wait then retry (and still get 403) with pytest.raises(ConnectionError, match="Token refresh failed"): await broker._ensure_token() await broker.close() @pytest.mark.asyncio async def test_token_refresh_allowed_after_cooldown(self, settings): """Token refresh should be allowed after cooldown period expires.""" broker = KISBroker(settings) broker._refresh_cooldown = 0.1 # Very short cooldown for testing # First attempt fails mock_resp_403 = AsyncMock() mock_resp_403.status = 403 mock_resp_403.text = AsyncMock(return_value='{"error_code":"EGW00133"}') mock_resp_403.__aenter__ = AsyncMock(return_value=mock_resp_403) mock_resp_403.__aexit__ = AsyncMock(return_value=False) # Second attempt succeeds mock_resp_200 = AsyncMock() mock_resp_200.status = 200 mock_resp_200.json = AsyncMock( return_value={ "access_token": "tok_after_cooldown", "expires_in": 86400, } ) mock_resp_200.__aenter__ = AsyncMock(return_value=mock_resp_200) mock_resp_200.__aexit__ = AsyncMock(return_value=False) with patch("aiohttp.ClientSession.post", return_value=mock_resp_403): with pytest.raises(ConnectionError, match="Token refresh failed"): await broker._ensure_token() # Wait for cooldown to expire await asyncio.sleep(0.15) with patch("aiohttp.ClientSession.post", return_value=mock_resp_200): token = await broker._ensure_token() assert token == "tok_after_cooldown" await broker.close() # --------------------------------------------------------------------------- # Network Error Handling # --------------------------------------------------------------------------- class TestNetworkErrorHandling: """Broker must handle network timeouts and HTTP errors gracefully.""" @pytest.mark.asyncio async def test_timeout_raises_connection_error(self, settings): broker = KISBroker(settings) broker._access_token = "tok" broker._token_expires_at = asyncio.get_event_loop().time() + 3600 with patch( "aiohttp.ClientSession.get", side_effect=TimeoutError(), ): with pytest.raises(ConnectionError): await broker.get_orderbook("005930") await broker.close() @pytest.mark.asyncio async def test_http_500_raises_connection_error(self, settings): broker = KISBroker(settings) broker._access_token = "tok" broker._token_expires_at = asyncio.get_event_loop().time() + 3600 mock_resp = AsyncMock() mock_resp.status = 500 mock_resp.text = AsyncMock(return_value="Internal Server Error") mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) mock_resp.__aexit__ = AsyncMock(return_value=False) with patch("aiohttp.ClientSession.get", return_value=mock_resp): with pytest.raises(ConnectionError): await broker.get_orderbook("005930") await broker.close() # --------------------------------------------------------------------------- # Rate Limiter # --------------------------------------------------------------------------- class TestRateLimiter: """The leaky bucket rate limiter must throttle requests.""" @pytest.mark.asyncio async def test_rate_limiter_does_not_block_under_limit(self, settings): broker = KISBroker(settings) # Should complete without blocking when under limit await broker._rate_limiter.acquire() await broker.close() @pytest.mark.asyncio async def test_send_order_acquires_rate_limiter_twice(self, settings): """send_order must acquire rate limiter for both hash key and order call.""" broker = KISBroker(settings) broker._access_token = "tok" broker._token_expires_at = asyncio.get_event_loop().time() + 3600 # Mock hash key response mock_hash_resp = AsyncMock() mock_hash_resp.status = 200 mock_hash_resp.json = AsyncMock(return_value={"HASH": "abc123"}) mock_hash_resp.__aenter__ = AsyncMock(return_value=mock_hash_resp) mock_hash_resp.__aexit__ = AsyncMock(return_value=False) # Mock order response mock_order_resp = AsyncMock() mock_order_resp.status = 200 mock_order_resp.json = AsyncMock(return_value={"rt_cd": "0"}) mock_order_resp.__aenter__ = AsyncMock(return_value=mock_order_resp) mock_order_resp.__aexit__ = AsyncMock(return_value=False) with patch( "aiohttp.ClientSession.post", side_effect=[mock_hash_resp, mock_order_resp] ): with patch.object( broker._rate_limiter, "acquire", new_callable=AsyncMock ) as mock_acquire: await broker.send_order("005930", "BUY", 1, 50000) assert mock_acquire.call_count == 2 await broker.close() # --------------------------------------------------------------------------- # Hash Key Generation # --------------------------------------------------------------------------- class TestHashKey: """POST requests to KIS require a hash key.""" @pytest.mark.asyncio async def test_generates_hash_key_for_post_body(self, settings): broker = KISBroker(settings) broker._access_token = "tok" broker._token_expires_at = asyncio.get_event_loop().time() + 3600 body = {"CANO": "12345678", "ACNT_PRDT_CD": "01"} mock_resp = AsyncMock() mock_resp.status = 200 mock_resp.json = AsyncMock(return_value={"HASH": "abc123hash"}) mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) mock_resp.__aexit__ = AsyncMock(return_value=False) with patch("aiohttp.ClientSession.post", return_value=mock_resp): hash_key = await broker._get_hash_key(body) assert isinstance(hash_key, str) assert len(hash_key) > 0 await broker.close() @pytest.mark.asyncio async def test_hash_key_acquires_rate_limiter(self, settings): """_get_hash_key must go through the rate limiter to prevent burst.""" broker = KISBroker(settings) broker._access_token = "tok" broker._token_expires_at = asyncio.get_event_loop().time() + 3600 body = {"CANO": "12345678", "ACNT_PRDT_CD": "01"} mock_resp = AsyncMock() mock_resp.status = 200 mock_resp.json = AsyncMock(return_value={"HASH": "abc123hash"}) mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) mock_resp.__aexit__ = AsyncMock(return_value=False) with patch("aiohttp.ClientSession.post", return_value=mock_resp): with patch.object( broker._rate_limiter, "acquire", new_callable=AsyncMock ) as mock_acquire: await broker._get_hash_key(body) mock_acquire.assert_called_once() await broker.close() # --------------------------------------------------------------------------- # fetch_market_rankings — TR_ID, path, params (issue #155) # --------------------------------------------------------------------------- def _make_ranking_mock(items: list[dict]) -> AsyncMock: """Build a mock HTTP response returning ranking items.""" mock_resp = AsyncMock() mock_resp.status = 200 mock_resp.json = AsyncMock(return_value={"output": items}) mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) mock_resp.__aexit__ = AsyncMock(return_value=False) return mock_resp class TestFetchMarketRankings: """Verify correct TR_ID, API path, and params per ranking_type (issue #155).""" @pytest.fixture def broker(self, settings) -> KISBroker: b = KISBroker(settings) b._access_token = "tok" b._token_expires_at = float("inf") b._rate_limiter.acquire = AsyncMock() return b @pytest.mark.asyncio async def test_volume_uses_correct_tr_id_and_path(self, broker: KISBroker) -> None: mock_resp = _make_ranking_mock([]) with patch("aiohttp.ClientSession.get", return_value=mock_resp) as mock_get: await broker.fetch_market_rankings(ranking_type="volume") call_kwargs = mock_get.call_args url = call_kwargs[0][0] if call_kwargs[0] else call_kwargs[1].get("url", "") headers = call_kwargs[1].get("headers", {}) params = call_kwargs[1].get("params", {}) assert "volume-rank" in url assert headers.get("tr_id") == "FHPST01710000" assert params.get("FID_COND_SCR_DIV_CODE") == "20171" assert params.get("FID_TRGT_EXLS_CLS_CODE") == "0000000000" @pytest.mark.asyncio async def test_fluctuation_uses_correct_tr_id_and_path(self, broker: KISBroker) -> None: mock_resp = _make_ranking_mock([]) with patch("aiohttp.ClientSession.get", return_value=mock_resp) as mock_get: await broker.fetch_market_rankings(ranking_type="fluctuation") call_kwargs = mock_get.call_args url = call_kwargs[0][0] if call_kwargs[0] else call_kwargs[1].get("url", "") headers = call_kwargs[1].get("headers", {}) params = call_kwargs[1].get("params", {}) assert "ranking/fluctuation" in url assert headers.get("tr_id") == "FHPST01700000" assert params.get("fid_cond_scr_div_code") == "20170" @pytest.mark.asyncio async def test_volume_returns_parsed_rows(self, broker: KISBroker) -> None: items = [ { "mksc_shrn_iscd": "005930", "hts_kor_isnm": "삼성전자", "stck_prpr": "75000", "acml_vol": "10000000", "prdy_ctrt": "2.5", "vol_inrt": "150", } ] mock_resp = _make_ranking_mock(items) with patch("aiohttp.ClientSession.get", return_value=mock_resp): result = await broker.fetch_market_rankings(ranking_type="volume") assert len(result) == 1 assert result[0]["stock_code"] == "005930" assert result[0]["price"] == 75000.0 assert result[0]["change_rate"] == 2.5 # --------------------------------------------------------------------------- # KRX tick unit / round-down helpers (issue #157) # --------------------------------------------------------------------------- from src.broker.kis_api import kr_tick_unit, kr_round_down # noqa: E402 class TestKrTickUnit: """kr_tick_unit and kr_round_down must implement KRX price tick rules.""" @pytest.mark.parametrize( "price, expected_tick", [ (1999, 1), (2000, 5), (4999, 5), (5000, 10), (19999, 10), (20000, 50), (49999, 50), (50000, 100), (199999, 100), (200000, 500), (499999, 500), (500000, 1000), (1000000, 1000), ], ) def test_tick_unit_boundaries(self, price: int, expected_tick: int) -> None: assert kr_tick_unit(price) == expected_tick @pytest.mark.parametrize( "price, expected_rounded", [ (188150, 188100), # 100원 단위, 50원 잔여 → 내림 (188100, 188100), # 이미 정렬됨 (75050, 75000), # 100원 단위, 50원 잔여 → 내림 (49950, 49950), # 50원 단위 정렬됨 (49960, 49950), # 50원 단위, 10원 잔여 → 내림 (1999, 1999), # 1원 단위 → 그대로 (5003, 5000), # 10원 단위, 3원 잔여 → 내림 ], ) def test_round_down_to_tick(self, price: int, expected_rounded: int) -> None: assert kr_round_down(price) == expected_rounded # --------------------------------------------------------------------------- # get_current_price (issue #157) # --------------------------------------------------------------------------- class TestGetCurrentPrice: """get_current_price must use inquire-price API and return (price, change, foreigner).""" @pytest.fixture def broker(self, settings) -> KISBroker: b = KISBroker(settings) b._access_token = "tok" b._token_expires_at = float("inf") b._rate_limiter.acquire = AsyncMock() return b @pytest.mark.asyncio async def test_returns_correct_fields(self, broker: KISBroker) -> None: mock_resp = AsyncMock() mock_resp.status = 200 mock_resp.json = AsyncMock( return_value={ "rt_cd": "0", "output": { "stck_prpr": "188600", "prdy_ctrt": "3.97", "frgn_ntby_qty": "12345", }, } ) mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) mock_resp.__aexit__ = AsyncMock(return_value=False) with patch("aiohttp.ClientSession.get", return_value=mock_resp) as mock_get: price, change_pct, foreigner = await broker.get_current_price("005930") assert price == 188600.0 assert change_pct == 3.97 assert foreigner == 12345.0 call_kwargs = mock_get.call_args url = call_kwargs[0][0] if call_kwargs[0] else call_kwargs[1].get("url", "") headers = call_kwargs[1].get("headers", {}) assert "inquire-price" in url assert headers.get("tr_id") == "FHKST01010100" @pytest.mark.asyncio async def test_http_error_raises_connection_error(self, broker: KISBroker) -> None: mock_resp = AsyncMock() mock_resp.status = 500 mock_resp.text = AsyncMock(return_value="Internal Server Error") mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) mock_resp.__aexit__ = AsyncMock(return_value=False) with patch("aiohttp.ClientSession.get", return_value=mock_resp): with pytest.raises(ConnectionError, match="get_current_price failed"): await broker.get_current_price("005930") # --------------------------------------------------------------------------- # send_order tick rounding and ORD_DVSN (issue #157) # --------------------------------------------------------------------------- class TestSendOrderTickRounding: """send_order must apply KRX tick rounding and correct ORD_DVSN codes.""" @pytest.fixture def broker(self, settings) -> KISBroker: b = KISBroker(settings) b._access_token = "tok" b._token_expires_at = float("inf") b._rate_limiter.acquire = AsyncMock() return b @pytest.mark.asyncio async def test_limit_order_rounds_down_to_tick(self, broker: KISBroker) -> None: """Price 188150 (not on 100-won tick) must be rounded to 188100.""" mock_hash = AsyncMock() mock_hash.status = 200 mock_hash.json = AsyncMock(return_value={"HASH": "h"}) mock_hash.__aenter__ = AsyncMock(return_value=mock_hash) mock_hash.__aexit__ = AsyncMock(return_value=False) mock_order = AsyncMock() mock_order.status = 200 mock_order.json = AsyncMock(return_value={"rt_cd": "0"}) mock_order.__aenter__ = AsyncMock(return_value=mock_order) mock_order.__aexit__ = AsyncMock(return_value=False) with patch( "aiohttp.ClientSession.post", side_effect=[mock_hash, mock_order] ) as mock_post: await broker.send_order("005930", "BUY", 1, price=188150) order_call = mock_post.call_args_list[1] body = order_call[1].get("json", {}) assert body["ORD_UNPR"] == "188100" # rounded down assert body["ORD_DVSN"] == "00" # 지정가 @pytest.mark.asyncio async def test_limit_order_ord_dvsn_is_00(self, broker: KISBroker) -> None: """send_order with price>0 must use ORD_DVSN='00' (지정가).""" mock_hash = AsyncMock() mock_hash.status = 200 mock_hash.json = AsyncMock(return_value={"HASH": "h"}) mock_hash.__aenter__ = AsyncMock(return_value=mock_hash) mock_hash.__aexit__ = AsyncMock(return_value=False) mock_order = AsyncMock() mock_order.status = 200 mock_order.json = AsyncMock(return_value={"rt_cd": "0"}) mock_order.__aenter__ = AsyncMock(return_value=mock_order) mock_order.__aexit__ = AsyncMock(return_value=False) with patch( "aiohttp.ClientSession.post", side_effect=[mock_hash, mock_order] ) as mock_post: await broker.send_order("005930", "BUY", 1, price=50000) order_call = mock_post.call_args_list[1] body = order_call[1].get("json", {}) assert body["ORD_DVSN"] == "00" @pytest.mark.asyncio async def test_market_order_ord_dvsn_is_01(self, broker: KISBroker) -> None: """send_order with price=0 must use ORD_DVSN='01' (시장가).""" mock_hash = AsyncMock() mock_hash.status = 200 mock_hash.json = AsyncMock(return_value={"HASH": "h"}) mock_hash.__aenter__ = AsyncMock(return_value=mock_hash) mock_hash.__aexit__ = AsyncMock(return_value=False) mock_order = AsyncMock() mock_order.status = 200 mock_order.json = AsyncMock(return_value={"rt_cd": "0"}) mock_order.__aenter__ = AsyncMock(return_value=mock_order) mock_order.__aexit__ = AsyncMock(return_value=False) with patch( "aiohttp.ClientSession.post", side_effect=[mock_hash, mock_order] ) as mock_post: await broker.send_order("005930", "SELL", 1, price=0) order_call = mock_post.call_args_list[1] body = order_call[1].get("json", {}) assert body["ORD_DVSN"] == "01" # --------------------------------------------------------------------------- # TR_ID live/paper branching (issues #201, #202, #203) # --------------------------------------------------------------------------- class TestTRIDBranchingDomestic: """get_balance and send_order must use correct TR_ID for live vs paper mode.""" def _make_broker(self, settings, mode: str) -> KISBroker: from src.config import Settings s = Settings( KIS_APP_KEY=settings.KIS_APP_KEY, KIS_APP_SECRET=settings.KIS_APP_SECRET, KIS_ACCOUNT_NO=settings.KIS_ACCOUNT_NO, GEMINI_API_KEY=settings.GEMINI_API_KEY, DB_PATH=":memory:", ENABLED_MARKETS="KR", MODE=mode, ) b = KISBroker(s) b._access_token = "tok" b._token_expires_at = float("inf") b._rate_limiter.acquire = AsyncMock() return b @pytest.mark.asyncio async def test_get_balance_paper_uses_vttc8434r(self, settings) -> None: broker = self._make_broker(settings, "paper") mock_resp = AsyncMock() mock_resp.status = 200 mock_resp.json = AsyncMock( return_value={"output1": [], "output2": {}} ) mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) mock_resp.__aexit__ = AsyncMock(return_value=False) with patch("aiohttp.ClientSession.get", return_value=mock_resp) as mock_get: await broker.get_balance() headers = mock_get.call_args[1].get("headers", {}) assert headers["tr_id"] == "VTTC8434R" @pytest.mark.asyncio async def test_get_balance_live_uses_tttc8434r(self, settings) -> None: broker = self._make_broker(settings, "live") mock_resp = AsyncMock() mock_resp.status = 200 mock_resp.json = AsyncMock( return_value={"output1": [], "output2": {}} ) mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) mock_resp.__aexit__ = AsyncMock(return_value=False) with patch("aiohttp.ClientSession.get", return_value=mock_resp) as mock_get: await broker.get_balance() headers = mock_get.call_args[1].get("headers", {}) assert headers["tr_id"] == "TTTC8434R" @pytest.mark.asyncio async def test_send_order_buy_paper_uses_vttc0012u(self, settings) -> None: broker = self._make_broker(settings, "paper") mock_hash = AsyncMock() mock_hash.status = 200 mock_hash.json = AsyncMock(return_value={"HASH": "h"}) mock_hash.__aenter__ = AsyncMock(return_value=mock_hash) mock_hash.__aexit__ = AsyncMock(return_value=False) mock_order = AsyncMock() mock_order.status = 200 mock_order.json = AsyncMock(return_value={"rt_cd": "0"}) mock_order.__aenter__ = AsyncMock(return_value=mock_order) mock_order.__aexit__ = AsyncMock(return_value=False) with patch( "aiohttp.ClientSession.post", side_effect=[mock_hash, mock_order] ) as mock_post: await broker.send_order("005930", "BUY", 1) order_headers = mock_post.call_args_list[1][1].get("headers", {}) assert order_headers["tr_id"] == "VTTC0012U" @pytest.mark.asyncio async def test_send_order_buy_live_uses_tttc0012u(self, settings) -> None: broker = self._make_broker(settings, "live") mock_hash = AsyncMock() mock_hash.status = 200 mock_hash.json = AsyncMock(return_value={"HASH": "h"}) mock_hash.__aenter__ = AsyncMock(return_value=mock_hash) mock_hash.__aexit__ = AsyncMock(return_value=False) mock_order = AsyncMock() mock_order.status = 200 mock_order.json = AsyncMock(return_value={"rt_cd": "0"}) mock_order.__aenter__ = AsyncMock(return_value=mock_order) mock_order.__aexit__ = AsyncMock(return_value=False) with patch( "aiohttp.ClientSession.post", side_effect=[mock_hash, mock_order] ) as mock_post: await broker.send_order("005930", "BUY", 1) order_headers = mock_post.call_args_list[1][1].get("headers", {}) assert order_headers["tr_id"] == "TTTC0012U" @pytest.mark.asyncio async def test_send_order_sell_paper_uses_vttc0011u(self, settings) -> None: broker = self._make_broker(settings, "paper") mock_hash = AsyncMock() mock_hash.status = 200 mock_hash.json = AsyncMock(return_value={"HASH": "h"}) mock_hash.__aenter__ = AsyncMock(return_value=mock_hash) mock_hash.__aexit__ = AsyncMock(return_value=False) mock_order = AsyncMock() mock_order.status = 200 mock_order.json = AsyncMock(return_value={"rt_cd": "0"}) mock_order.__aenter__ = AsyncMock(return_value=mock_order) mock_order.__aexit__ = AsyncMock(return_value=False) with patch( "aiohttp.ClientSession.post", side_effect=[mock_hash, mock_order] ) as mock_post: await broker.send_order("005930", "SELL", 1) order_headers = mock_post.call_args_list[1][1].get("headers", {}) assert order_headers["tr_id"] == "VTTC0011U" @pytest.mark.asyncio async def test_send_order_sell_live_uses_tttc0011u(self, settings) -> None: broker = self._make_broker(settings, "live") mock_hash = AsyncMock() mock_hash.status = 200 mock_hash.json = AsyncMock(return_value={"HASH": "h"}) mock_hash.__aenter__ = AsyncMock(return_value=mock_hash) mock_hash.__aexit__ = AsyncMock(return_value=False) mock_order = AsyncMock() mock_order.status = 200 mock_order.json = AsyncMock(return_value={"rt_cd": "0"}) mock_order.__aenter__ = AsyncMock(return_value=mock_order) mock_order.__aexit__ = AsyncMock(return_value=False) with patch( "aiohttp.ClientSession.post", side_effect=[mock_hash, mock_order] ) as mock_post: await broker.send_order("005930", "SELL", 1) order_headers = mock_post.call_args_list[1][1].get("headers", {}) assert order_headers["tr_id"] == "TTTC0011U"