Skip to content

Connectionpool fix #97

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 33 additions & 14 deletions runpod/serverless/modules/rp_ping.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import threading

import requests
from urllib3.util.retry import Retry

from runpod.serverless.modules.rp_logger import RunPodLogger
from .worker_state import Jobs, WORKER_ID
Expand All @@ -22,8 +23,27 @@ class Heartbeat:
PING_URL = PING_URL.replace('$RUNPOD_POD_ID', WORKER_ID)
PING_INTERVAL = int(os.environ.get('RUNPOD_PING_INTERVAL', 10000))//1000

_session = requests.Session()
_session.headers.update({"Authorization": f"{os.environ.get('RUNPOD_AI_API_KEY')}"})
def __init__(self, pool_connections=100, retries=3) -> None:
'''
Initializes the Heartbeat class.
'''
self._session = requests.Session()
self._session.headers.update({"Authorization": f"{os.environ.get('RUNPOD_AI_API_KEY')}"})

retry_strategy = Retry(
total=retries,
status_forcelist=[429, 500, 502, 503, 504],
method_whitelist=["GET"],
backoff_factor=1
)

adapter = requests.adapters.HTTPAdapter(
pool_connections=pool_connections,
pool_maxsize=pool_connections,
max_retries=retry_strategy
)
self._session.mount('http://', adapter)
self._session.mount('https://', adapter)

def start_ping(self, test=False):
'''
Expand All @@ -40,13 +60,8 @@ def ping_loop(self, test=False):
Sends heartbeat pings to the Runpod server.
'''
while True:
try:
self._send_ping()
time.sleep(self.PING_INTERVAL)
except requests.RequestException as err:
log.error(f"Ping Error: {err}, attempting to restart ping.")
if test:
return
self._send_ping()
time.sleep(self.PING_INTERVAL)

if test:
return
Expand All @@ -58,9 +73,13 @@ def _send_ping(self):
job_ids = jobs.get_job_list()
ping_params = {'job_id': job_ids} if job_ids is not None else None

result = self._session.get(
self.PING_URL, params=ping_params,
timeout=self.PING_INTERVAL
)
try:
result = self._session.get(
self.PING_URL, params=ping_params,
timeout=self.PING_INTERVAL
)

log.debug(f"Heartbeat Sent | URL: {self.PING_URL} | Status: {result.status_code}")

log.debug(f"Heartbeat Sent | URL: {self.PING_URL} | Status: {result.status_code}")
except requests.RequestException as err:
log.error(f"Ping Request Error: {err}, attempting to restart ping.")
12 changes: 7 additions & 5 deletions runpod/serverless/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,18 @@ async def run_worker(config: Dict[str, Any]) -> None:
Args:
config (Dict[str, Any]): Configuration parameters for the worker.
"""
connector = aiohttp.TCPConnector(limit=None)
async with aiohttp.ClientSession(
connector=connector, headers=_get_auth_header(), timeout=_TIMEOUT) as session:
heartbeat.start_ping()

client_session = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit=None),
headers=_get_auth_header(), timeout=_TIMEOUT
)

async with client_session as session:
job_scaler = JobScaler(
concurrency_controller=config.get('concurrency_controller', None)
)

heartbeat.start_ping()

while job_scaler.is_alive():

async for job in job_scaler.get_jobs(session):
Expand Down