fix: add token refresh lock to prevent concurrent API calls (issue #42)
Some checks failed
CI / test (pull_request) Has been cancelled

Add asyncio.Lock to prevent multiple coroutines from simultaneously
refreshing the KIS access token, which hits the 1-per-minute rate
limit (EGW00133: "접근토큰 발급 잠시 후 다시 시도하세요").

Changes:
- Add self._token_lock in KISBroker.__init__
- Wrap token refresh in async with self._token_lock
- Re-check token validity after acquiring lock (double-check pattern)
- Add concurrent token refresh test (5 parallel requests → 1 API call)

The lock ensures that when multiple coroutines detect an expired token,
only the first one refreshes while others wait and reuse the result.

Fixes: #42

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
agentson
2026-02-05 00:08:56 +09:00
parent 0087a6b20a
commit 95f540e5df
2 changed files with 71 additions and 18 deletions

View File

@@ -55,6 +55,7 @@ class KISBroker:
self._session: aiohttp.ClientSession | None = None
self._access_token: str | None = None
self._token_expires_at: float = 0.0
self._token_lock = asyncio.Lock()
self._rate_limiter = LeakyBucket(settings.RATE_LIMIT_RPS)
def _get_session(self) -> aiohttp.ClientSession:
@@ -80,30 +81,42 @@ class KISBroker:
# ------------------------------------------------------------------
async def _ensure_token(self) -> str:
"""Return a valid access token, refreshing if expired."""
"""Return a valid access token, refreshing if expired.
Uses a lock to prevent concurrent token refresh attempts that would
hit the API's 1-per-minute rate limit (EGW00133).
"""
# Fast path: check without lock
now = asyncio.get_event_loop().time()
if self._access_token and now < self._token_expires_at:
return self._access_token
logger.info("Refreshing KIS access token")
session = self._get_session()
url = f"{self._base_url}/oauth2/tokenP"
body = {
"grant_type": "client_credentials",
"appkey": self._app_key,
"appsecret": self._app_secret,
}
# Slow path: acquire lock and refresh
async with self._token_lock:
# Re-check after acquiring lock (another coroutine may have refreshed)
now = asyncio.get_event_loop().time()
if self._access_token and now < self._token_expires_at:
return self._access_token
async with session.post(url, json=body) as resp:
if resp.status != 200:
text = await resp.text()
raise ConnectionError(f"Token refresh failed ({resp.status}): {text}")
data = await resp.json()
logger.info("Refreshing KIS access token")
session = self._get_session()
url = f"{self._base_url}/oauth2/tokenP"
body = {
"grant_type": "client_credentials",
"appkey": self._app_key,
"appsecret": self._app_secret,
}
self._access_token = data["access_token"]
self._token_expires_at = now + data.get("expires_in", 86400) - 60 # 1-min buffer
logger.info("Token refreshed successfully")
return self._access_token
async with session.post(url, json=body) as resp:
if resp.status != 200:
text = await resp.text()
raise ConnectionError(f"Token refresh failed ({resp.status}): {text}")
data = await resp.json()
self._access_token = data["access_token"]
self._token_expires_at = now + data.get("expires_in", 86400) - 60 # 1-min buffer
logger.info("Token refreshed successfully")
return self._access_token
# ------------------------------------------------------------------
# Hash Key (required for POST bodies)