Skip to content

Commit 4118877

Browse files
authored Aug 18, 2023
Merge pull request #97 from runpod/connectionpool-fix
Connectionpool fix
2 parents 0273994 + 5897304 commit 4118877

File tree

2 files changed

+40
-19
lines changed

2 files changed

+40
-19
lines changed
 

‎runpod/serverless/modules/rp_ping.py

+33-14
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import threading
88

99
import requests
10+
from urllib3.util.retry import Retry
1011

1112
from runpod.serverless.modules.rp_logger import RunPodLogger
1213
from .worker_state import Jobs, WORKER_ID
@@ -22,8 +23,27 @@ class Heartbeat:
2223
PING_URL = PING_URL.replace('$RUNPOD_POD_ID', WORKER_ID)
2324
PING_INTERVAL = int(os.environ.get('RUNPOD_PING_INTERVAL', 10000))//1000
2425

25-
_session = requests.Session()
26-
_session.headers.update({"Authorization": f"{os.environ.get('RUNPOD_AI_API_KEY')}"})
def __init__(self, pool_connections=100, retries=3) -> None:
    '''
    Initializes the Heartbeat class.

    Creates a per-instance requests session that sends the value of the
    RUNPOD_AI_API_KEY environment variable as the Authorization header,
    then mounts one HTTP adapter (sized connection pool + automatic
    retries) for both the http:// and https:// schemes.

    Args:
        pool_connections: Passed to the adapter as both the number of
            connection pools to cache and the maximum size of each pool.
        retries: Total number of retry attempts allowed per request.
    '''
    self._session = requests.Session()
    self._session.headers.update({"Authorization": f"{os.environ.get('RUNPOD_AI_API_KEY')}"})

    # Retry GET requests on throttling / transient server errors.
    retry_options = {
        "total": retries,
        "status_forcelist": [429, 500, 502, 503, 504],
        "backoff_factor": 1,
    }
    try:
        # `allowed_methods` is the keyword from urllib3 1.26 onwards;
        # the old `method_whitelist` keyword was removed in urllib3 2.0.
        retry_strategy = Retry(allowed_methods=["GET"], **retry_options)
    except TypeError:
        # urllib3 < 1.26 only understands the legacy keyword name.
        retry_strategy = Retry(method_whitelist=["GET"], **retry_options)

    adapter = requests.adapters.HTTPAdapter(
        pool_connections=pool_connections,
        pool_maxsize=pool_connections,
        max_retries=retry_strategy
    )
    # The same adapter serves both schemes so they share pool and retry settings.
    self._session.mount('http://', adapter)
    self._session.mount('https://', adapter)
2747

2848
def start_ping(self, test=False):
2949
'''
@@ -40,13 +60,8 @@ def ping_loop(self, test=False):
4060
Sends heartbeat pings to the Runpod server.
4161
'''
4262
while True:
43-
try:
44-
self._send_ping()
45-
time.sleep(self.PING_INTERVAL)
46-
except requests.RequestException as err:
47-
log.error(f"Ping Error: {err}, attempting to restart ping.")
48-
if test:
49-
return
63+
self._send_ping()
64+
time.sleep(self.PING_INTERVAL)
5065

5166
if test:
5267
return
@@ -58,9 +73,13 @@ def _send_ping(self):
5873
job_ids = jobs.get_job_list()
5974
ping_params = {'job_id': job_ids} if job_ids is not None else None
6075

61-
result = self._session.get(
62-
self.PING_URL, params=ping_params,
63-
timeout=self.PING_INTERVAL
64-
)
76+
try:
77+
result = self._session.get(
78+
self.PING_URL, params=ping_params,
79+
timeout=self.PING_INTERVAL
80+
)
81+
82+
log.debug(f"Heartbeat Sent | URL: {self.PING_URL} | Status: {result.status_code}")
6583

66-
log.debug(f"Heartbeat Sent | URL: {self.PING_URL} | Status: {result.status_code}")
84+
except requests.RequestException as err:
85+
log.error(f"Ping Request Error: {err}, attempting to restart ping.")

‎runpod/serverless/worker.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,18 @@ async def run_worker(config: Dict[str, Any]) -> None:
9292
Args:
9393
config (Dict[str, Any]): Configuration parameters for the worker.
9494
"""
95-
connector = aiohttp.TCPConnector(limit=None)
96-
async with aiohttp.ClientSession(
97-
connector=connector, headers=_get_auth_header(), timeout=_TIMEOUT) as session:
95+
heartbeat.start_ping()
9896

97+
client_session = aiohttp.ClientSession(
98+
connector=aiohttp.TCPConnector(limit=None),
99+
headers=_get_auth_header(), timeout=_TIMEOUT
100+
)
101+
102+
async with client_session as session:
99103
job_scaler = JobScaler(
100104
concurrency_controller=config.get('concurrency_controller', None)
101105
)
102106

103-
heartbeat.start_ping()
104-
105107
while job_scaler.is_alive():
106108

107109
async for job in job_scaler.get_jobs(session):

0 commit comments

Comments
 (0)
Please sign in to comment.