diff --git a/runpod/serverless/modules/rp_ping.py b/runpod/serverless/modules/rp_ping.py index 5a5ec954..77fc069c 100644 --- a/runpod/serverless/modules/rp_ping.py +++ b/runpod/serverless/modules/rp_ping.py @@ -7,6 +7,7 @@ import threading import requests +from urllib3.util.retry import Retry from runpod.serverless.modules.rp_logger import RunPodLogger from .worker_state import Jobs, WORKER_ID @@ -22,8 +23,27 @@ class Heartbeat: PING_URL = PING_URL.replace('$RUNPOD_POD_ID', WORKER_ID) PING_INTERVAL = int(os.environ.get('RUNPOD_PING_INTERVAL', 10000))//1000 - _session = requests.Session() - _session.headers.update({"Authorization": f"{os.environ.get('RUNPOD_AI_API_KEY')}"}) + def __init__(self, pool_connections=100, retries=3) -> None: + ''' + Initializes the Heartbeat class. + ''' + self._session = requests.Session() + self._session.headers.update({"Authorization": f"{os.environ.get('RUNPOD_AI_API_KEY')}"}) + + retry_strategy = Retry( + total=retries, + status_forcelist=[429, 500, 502, 503, 504], + method_whitelist=["GET"], + backoff_factor=1 + ) + + adapter = requests.adapters.HTTPAdapter( + pool_connections=pool_connections, + pool_maxsize=pool_connections, + max_retries=retry_strategy + ) + self._session.mount('http://', adapter) + self._session.mount('https://', adapter) def start_ping(self, test=False): ''' @@ -40,13 +60,8 @@ def ping_loop(self, test=False): Sends heartbeat pings to the Runpod server. ''' while True: - try: - self._send_ping() - time.sleep(self.PING_INTERVAL) - except requests.RequestException as err: - log.error(f"Ping Error: {err}, attempting to restart ping.") - if test: - return + self._send_ping() + time.sleep(self.PING_INTERVAL) if test: return @@ -58,9 +73,13 @@ def _send_ping(self): job_ids = jobs.get_job_list() ping_params = {'job_id': job_ids} if job_ids is not None else None - result = self._session.get( - self.PING_URL, params=ping_params, - timeout=self.PING_INTERVAL - ) + try: + result = self._session.get( + self.PING_URL, params=ping_params, + timeout=self.PING_INTERVAL + ) + + log.debug(f"Heartbeat Sent | URL: {self.PING_URL} | Status: {result.status_code}") - log.debug(f"Heartbeat Sent | URL: {self.PING_URL} | Status: {result.status_code}") + except requests.RequestException as err: + log.error(f"Ping Request Error: {err}, attempting to restart ping.") diff --git a/runpod/serverless/worker.py b/runpod/serverless/worker.py index 1f65d1e6..82d6883f 100644 --- a/runpod/serverless/worker.py +++ b/runpod/serverless/worker.py @@ -92,16 +92,18 @@ async def run_worker(config: Dict[str, Any]) -> None: Args: config (Dict[str, Any]): Configuration parameters for the worker. """ - connector = aiohttp.TCPConnector(limit=None) - async with aiohttp.ClientSession( - connector=connector, headers=_get_auth_header(), timeout=_TIMEOUT) as session: + heartbeat.start_ping() + client_session = aiohttp.ClientSession( + connector=aiohttp.TCPConnector(limit=None), + headers=_get_auth_header(), timeout=_TIMEOUT + ) + + async with client_session as session: job_scaler = JobScaler( concurrency_controller=config.get('concurrency_controller', None) ) - heartbeat.start_ping() - while job_scaler.is_alive(): async for job in job_scaler.get_jobs(session):