From 95f540e5df1c172f57b2d03d06d12f287205300e Mon Sep 17 00:00:00 2001 From: agentson Date: Thu, 5 Feb 2026 00:08:56 +0900 Subject: [PATCH] fix: add token refresh lock to prevent concurrent API calls (issue #42) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add asyncio.Lock to prevent multiple coroutines from simultaneously refreshing the KIS access token, which hits the 1-per-minute rate limit (EGW00133: "접근토큰 발급 잠시 후 다시 시도하세요"). Changes: - Add self._token_lock in KISBroker.__init__ - Wrap token refresh in async with self._token_lock - Re-check token validity after acquiring lock (double-check pattern) - Add concurrent token refresh test (5 parallel requests → 1 API call) The lock ensures that when multiple coroutines detect an expired token, only the first one refreshes while others wait and reuse the result. Fixes: #42 Co-Authored-By: Claude Sonnet 4.5 --- src/broker/kis_api.py | 49 +++++++++++++++++++++++++++---------------- tests/test_broker.py | 40 +++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 18 deletions(-) diff --git a/src/broker/kis_api.py b/src/broker/kis_api.py index e3d88ba..bfec86b 100644 --- a/src/broker/kis_api.py +++ b/src/broker/kis_api.py @@ -55,6 +55,7 @@ class KISBroker: self._session: aiohttp.ClientSession | None = None self._access_token: str | None = None self._token_expires_at: float = 0.0 + self._token_lock = asyncio.Lock() self._rate_limiter = LeakyBucket(settings.RATE_LIMIT_RPS) def _get_session(self) -> aiohttp.ClientSession: @@ -80,30 +81,42 @@ class KISBroker: # ------------------------------------------------------------------ async def _ensure_token(self) -> str: - """Return a valid access token, refreshing if expired.""" + """Return a valid access token, refreshing if expired. + + Uses a lock to prevent concurrent token refresh attempts that would + hit the API's 1-per-minute rate limit (EGW00133). + """ + # Fast path: check without lock now = asyncio.get_event_loop().time() if self._access_token and now < self._token_expires_at: return self._access_token - logger.info("Refreshing KIS access token") - session = self._get_session() - url = f"{self._base_url}/oauth2/tokenP" - body = { - "grant_type": "client_credentials", - "appkey": self._app_key, - "appsecret": self._app_secret, - } + # Slow path: acquire lock and refresh + async with self._token_lock: + # Re-check after acquiring lock (another coroutine may have refreshed) + now = asyncio.get_event_loop().time() + if self._access_token and now < self._token_expires_at: + return self._access_token - async with session.post(url, json=body) as resp: - if resp.status != 200: - text = await resp.text() - raise ConnectionError(f"Token refresh failed ({resp.status}): {text}") - data = await resp.json() + logger.info("Refreshing KIS access token") + session = self._get_session() + url = f"{self._base_url}/oauth2/tokenP" + body = { + "grant_type": "client_credentials", + "appkey": self._app_key, + "appsecret": self._app_secret, + } - self._access_token = data["access_token"] - self._token_expires_at = now + data.get("expires_in", 86400) - 60 # 1-min buffer - logger.info("Token refreshed successfully") - return self._access_token + async with session.post(url, json=body) as resp: + if resp.status != 200: + text = await resp.text() + raise ConnectionError(f"Token refresh failed ({resp.status}): {text}") + data = await resp.json() + + self._access_token = data["access_token"] + self._token_expires_at = now + data.get("expires_in", 86400) - 60 # 1-min buffer + logger.info("Token refreshed successfully") + return self._access_token # ------------------------------------------------------------------ # Hash Key (required for POST bodies) diff --git a/tests/test_broker.py b/tests/test_broker.py index fc88996..bacde6d 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -49,6 +49,46 @@ class TestTokenManagement: await broker.close() + @pytest.mark.asyncio + async def test_concurrent_token_refresh_calls_api_once(self, settings): + """Multiple concurrent token requests should only call API once.""" + broker = KISBroker(settings) + + # Track how many times the mock API is called + call_count = [0] + + def create_mock_resp(): + call_count[0] += 1 + mock_resp = AsyncMock() + mock_resp.status = 200 + mock_resp.json = AsyncMock( + return_value={ + "access_token": "tok_concurrent", + "token_type": "Bearer", + "expires_in": 86400, + } + ) + mock_resp.__aenter__ = AsyncMock(return_value=mock_resp) + mock_resp.__aexit__ = AsyncMock(return_value=False) + return mock_resp + + with patch("aiohttp.ClientSession.post", return_value=create_mock_resp()): + # Launch 5 concurrent token requests + tokens = await asyncio.gather( + broker._ensure_token(), + broker._ensure_token(), + broker._ensure_token(), + broker._ensure_token(), + broker._ensure_token(), + ) + + # All should get the same token + assert all(t == "tok_concurrent" for t in tokens) + # API should be called only once (due to lock) + assert call_count[0] == 1 + + await broker.close() + # --------------------------------------------------------------------------- # Network Error Handling -- 2.49.1