Skip to content

Commit df77102

Browse files
authored
debounce at HTTP 429 response (#367)
1 parent 5a6b911 commit df77102

File tree

4 files changed

+56
-11
lines changed

4 files changed

+56
-11
lines changed

runpod/http_client.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,17 @@
55
import os
66

77
import requests
8-
from aiohttp import ClientSession, ClientTimeout, TCPConnector
8+
from aiohttp import ClientSession, ClientTimeout, TCPConnector, ClientResponseError
99

1010
from .cli.groups.config.functions import get_credentials
1111
from .tracer import create_aiohttp_tracer, create_request_tracer
1212
from .user_agent import USER_AGENT
1313

1414

15+
class TooManyRequests(ClientResponseError):
16+
pass
17+
18+
1519
def get_auth_header():
1620
"""
1721
Produce a header dict with the `Authorization` key derived from

runpod/serverless/modules/rp_job.py

+12-7
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import traceback
99
from typing import Any, AsyncGenerator, Callable, Dict, Optional, Union, List
1010

11-
from runpod.http_client import ClientSession
11+
from runpod.http_client import ClientSession, TooManyRequests
1212
from runpod.serverless.modules.rp_logger import RunPodLogger
1313

1414
from ...version import __version__ as runpod_version
@@ -70,15 +70,20 @@ async def get_job(
7070
log.debug("- Received 400 status, expected when FlashBoot is enabled.")
7171
return
7272

73-
try:
74-
response.raise_for_status()
75-
except Exception:
76-
log.error(f"- Failed to get job, status code: {response.status}")
77-
return
73+
if response.status == 429:
74+
raise TooManyRequests(
75+
response.request_info,
76+
response.history,
77+
status=response.status,
78+
message=response.reason
79+
)
80+
81+
# All other errors should raise an exception
82+
response.raise_for_status()
7883

7984
# Verify if the content type is JSON
8085
if response.content_type != "application/json":
81-
log.error(f"- Unexpected content type: {response.content_type}")
86+
log.debug(f"- Unexpected content type: {response.content_type}")
8287
return
8388

8489
# Check if there is a non-empty content to parse

runpod/serverless/modules/rp_scale.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import signal
88
from typing import Any, Dict
99

10-
from ...http_client import AsyncClientSession, ClientSession
10+
from ...http_client import AsyncClientSession, ClientSession, TooManyRequests
1111
from .rp_job import get_job, handle_job
1212
from .rp_logger import RunPodLogger
1313
from .worker_state import JobsQueue, JobsProgress
@@ -150,6 +150,10 @@ async def get_jobs(self, session: ClientSession):
150150
acquired_jobs = await asyncio.wait_for(
151151
get_job(session, jobs_needed), timeout=30
152152
)
153+
except TooManyRequests:
154+
log.debug(f"JobScaler.get_jobs | Too many requests. Debounce for 5 seconds.")
155+
await asyncio.sleep(5) # debounce for 5 seconds
156+
continue
153157
except asyncio.CancelledError:
154158
log.debug("JobScaler.get_jobs | Request was cancelled.")
155159
continue

tests/test_serverless/test_modules/test_job.py

+34-2
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22
Test Serverless Job Module
33
"""
44

5-
import asyncio
65
from unittest.mock import Mock, patch
76

87
from unittest import IsolatedAsyncioTestCase
9-
from aiohttp import ClientResponse
8+
from aiohttp import ClientResponse, ClientResponseError
109
from aiohttp.test_utils import make_mocked_coro
1110

11+
from runpod.http_client import TooManyRequests
1212
from runpod.serverless.modules import rp_job
1313

1414

@@ -63,6 +63,38 @@ async def test_get_job_400(self):
6363
job = await rp_job.get_job(mock_session)
6464
self.assertIsNone(job)
6565

66+
async def test_get_job_429(self):
67+
"""Tests the get_job function with a 429 response."""
68+
response = Mock(ClientResponse)
69+
response.raise_for_status.side_effect = TooManyRequests(
70+
request_info=None,
71+
history=(),
72+
status=429,
73+
)
74+
75+
with patch("aiohttp.ClientSession") as mock_session, patch(
76+
"runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"
77+
):
78+
mock_session.get.return_value.__aenter__.return_value = response
79+
with self.assertRaises(ClientResponseError):
80+
await rp_job.get_job(mock_session)
81+
82+
async def test_get_job_500(self):
83+
"""Tests the get_job function with a 500 response."""
84+
# Mock 500 response
85+
response = Mock(ClientResponse)
86+
response.raise_for_status.side_effect = TooManyRequests(
87+
request_info=None, # Not needed for the test
88+
history=(), # Not needed for the test
89+
status=500,
90+
)
91+
with patch("aiohttp.ClientSession") as mock_session, patch(
92+
"runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"
93+
):
94+
mock_session.get.return_value.__aenter__.return_value = response
95+
with self.assertRaises(Exception):
96+
await rp_job.get_job(mock_session)
97+
6698
async def test_get_job_no_id(self):
6799
"""Tests the get_job function with a 200 response but no 'id' field."""
68100
response = Mock(ClientResponse)

0 commit comments

Comments
 (0)