Skip to content

Commit f6971a8

Browse files
Merge pull request #298 from runpod/fastai-clarity
Fastai clarity
2 parents cce9474 + d992c63 commit f6971a8

File tree

7 files changed

+85
-23
lines changed

7 files changed

+85
-23
lines changed

CHANGELOG.md

+9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
# Change Log
22

3+
## Release 1.6.2 (2/12/24)
4+
5+
### Fixed
6+
7+
- Reorder FastAPI endpoints for clarity.
8+
- Truncate long logs to avoid overloading the server.
9+
10+
---
11+
312
## Release 1.6.1 (2/11/24)
413

514
### Added

runpod/serverless/modules/rp_fastapi.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ def _send_webhook(url: str, payload: Dict[str, Any]) -> bool:
163163
response.raise_for_status() # Raises exception for 4xx/5xx responses
164164
return True
165165
except requests.RequestException as err:
166-
print(f"Request to {url} failed: {err}")
166+
print(f"WEBHOOK | Request to {url} failed: {err}")
167167
return False
168168

169169

@@ -186,17 +186,17 @@ def __init__(self, config: Dict[str, Any]):
186186
self.config = config
187187

188188
tags_metadata = [
189+
{
190+
"name": "Synchronously Submit Request & Get Job Results",
191+
"description": "Endpoints for submitting job requests and getting the results."
192+
},
189193
{
190194
"name": "Submit Job Requests",
191195
"description": "Endpoints for submitting job requests."
192196
},
193197
{
194198
"name": "Check Job Results",
195199
"description": "Endpoints for checking the status of a job and getting the results."
196-
},
197-
{
198-
"name": "Synchronously Submit Request & Get Job Results",
199-
"description": "Endpoints for submitting job requests and getting the results."
200200
}
201201
]
202202

runpod/serverless/modules/rp_job.py

+11-4
Original file line numberDiff line numberDiff line change
@@ -184,18 +184,25 @@ async def run_job_generator(
184184
Run generator job used to stream output.
185185
Yields output partials from the generator.
186186
'''
187+
is_async_gen = inspect.isasyncgenfunction(handler)
188+
log.debug('Using Async Generator' if is_async_gen else 'Using Standard Generator', job["id"])
189+
187190
try:
188191
job_output = handler(job)
189-
if inspect.isasyncgenfunction(handler):
190-
log.debug('Async generator', job["id"])
192+
193+
if is_async_gen:
191194
async for output_partial in job_output:
195+
log.debug(f"Async Generator output: {output_partial}", job["id"])
192196
yield {"output": output_partial}
193197
else:
194-
log.debug('Generator', job["id"])
195198
for output_partial in job_output:
199+
log.debug(f"Generator output: {output_partial}", job["id"])
196200
yield {"output": output_partial}
201+
197202
except Exception as err: # pylint: disable=broad-except
198203
log.error(err, job["id"])
199-
yield {"error": f"handler: {str(err)} \ntraceback: {traceback.format_exc()}"}
204+
yield {
205+
"error": f"handler: {str(err)} \ntraceback: {traceback.format_exc()}"
206+
}
200207
finally:
201208
log.info('Finished running generator.', job["id"])

runpod/serverless/modules/rp_logger.py

+9
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from typing import Optional
1616

1717

18+
MAX_MESSAGE_LENGTH = 4096
1819
LOG_LEVELS = ['NOTSET', 'DEBUG', 'INFO', 'WARN', 'ERROR']
1920

2021

@@ -72,6 +73,14 @@ def log(self, message, message_level='INFO', job_id=None):
7273
if level_index > LOG_LEVELS.index(message_level) and message_level != 'TIP':
7374
return
7475

76+
message = str(message)
77+
# Truncate messages longer than MAX_MESSAGE_LENGTH characters by removing a chunk from the middle
78+
if len(message) > MAX_MESSAGE_LENGTH:
79+
half_max_length = MAX_MESSAGE_LENGTH // 2
80+
truncated_amount = len(message) - MAX_MESSAGE_LENGTH
81+
truncation_note = f'\n...TRUNCATED {truncated_amount} CHARACTERS...\n'
82+
message = message[:half_max_length] + truncation_note + message[-half_max_length:]
83+
7584
if os.environ.get('RUNPOD_ENDPOINT_ID'):
7685
log_json = {
7786
'requestId': job_id,

runpod/serverless/worker.py

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ async def _process_job(job, session, job_scaler, config):
5050

5151
job_result = {'output': []}
5252
async for stream_output in generator_output:
53+
log.debug(f"Stream output: {stream_output}", job['id'])
5354
if 'error' in stream_output:
5455
job_result = stream_output
5556
break

tests/test_serverless/test_modules/test_logger.py

+19-1
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,27 @@ def test_log_job_id(self):
127127
# Test with endpoint id set
128128
os.environ["RUNPOD_ENDPOINT_ID"] = "test_endpoint_id"
129129
logger.log("test_message", "INFO", job_id)
130+
os.environ.pop("RUNPOD_ENDPOINT_ID")
130131

131132
mock_print.assert_called_with(
132133
'{"requestId": "test_job_id", "message": "test_message", "level": "INFO"}',
133134
flush=True
134135
)
135-
os.environ.pop("RUNPOD_ENDPOINT_ID")
136+
137+
def test_log_truncate(self):
138+
"""Tests that the log method truncates messages longer than MAX_MESSAGE_LENGTH."""
139+
logger = rp_logger.RunPodLogger()
140+
job_id = "test_job_id"
141+
long_message = "a" * (rp_logger.MAX_MESSAGE_LENGTH + 100)
142+
expected_start = "a" * (rp_logger.MAX_MESSAGE_LENGTH // 2)
143+
expected_end = "a" * (rp_logger.MAX_MESSAGE_LENGTH // 2)
144+
truncated_amount = len(long_message) - rp_logger.MAX_MESSAGE_LENGTH
145+
truncation_note = f'\n...TRUNCATED {truncated_amount} CHARACTERS...\n'
146+
truncated_message = expected_start + truncation_note + expected_end
147+
148+
with patch("builtins.print") as mock_print:
149+
logger.log(long_message, "INFO", job_id)
150+
151+
expected_log_output = f'INFO | {job_id} | {truncated_message}'
152+
153+
mock_print.assert_called_once_with(expected_log_output, flush=True)

tests/test_serverless/test_worker.py

+31-13
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,27 @@ def generator_handler_exception(job):
160160
'''
161161
print(job)
162162
yield "test1"
163+
print("Raise exception")
163164
raise Exception() # pylint: disable=broad-exception-raised
164165

165166

167+
def test_generator_handler_exception():
168+
""" Test generator_handler_exception """
169+
job = {"id": "test_job"}
170+
gen = generator_handler_exception(job)
171+
172+
# Process the first yielded value
173+
output = next(gen)
174+
assert output == "test1", "First output should be 'test1'"
175+
176+
# Attempt to get the next value, expecting an exception
177+
try:
178+
next(gen)
179+
assert False, "Expected an exception to be raised"
180+
except Exception: # pylint: disable=broad-except
181+
assert True, "Exception was caught as expected"
182+
183+
166184
class TestRunWorker(IsolatedAsyncioTestCase):
167185
""" Tests for runpod | serverless| worker """
168186

@@ -254,27 +272,27 @@ async def test_run_worker_generator_handler_exception(
254272
'''
255273
Test run_worker with generator handler.
256274
257-
Args:
258-
mock_stream_result (_type_): _description_
259-
mock_run_job_generator (_type_): _description_
260-
mock_run_job (_type_): _description_
261-
mock_get_job (_type_): _description_
275+
This test verifies that:
276+
- `stream_result` is called exactly once before an exception occurs.
277+
- `run_job` is never called since `handler` is a generator function.
278+
- An error is correctly reported back via `send_result`.
262279
'''
263-
# Define the mock behaviors
264-
mock_get_job.return_value = {
265-
"id": "generator-123-exception", "input": {"number": 1}}
280+
RunPodLogger().set_level("DEBUG")
266281

267-
# Test generator handler
268-
generator_config = {
269-
"handler": generator_handler_exception, "refresh_worker": True}
270-
runpod.serverless.start(generator_config)
282+
# Setup: Mock `get_job` to return a predefined job.
283+
mock_get_job.return_value = {"id": "generator-123-exception", "input": {"number": 1}}
284+
285+
runpod.serverless.start({
286+
"handler": generator_handler_exception,
287+
"refresh_worker": True
288+
})
271289

272290
assert mock_stream_result.call_count == 1
273291
assert not mock_run_job.called
274292

275293
# Since return_aggregate_stream is NOT activated, we should not submit any outputs.
276294
_, args, _ = mock_send_result.mock_calls[0]
277-
assert 'error' in args[1]
295+
assert 'error' in args[1], "Expected the error to be reported in the results."
278296

279297
@patch("runpod.serverless.modules.rp_scale.get_job")
280298
@patch("runpod.serverless.worker.run_job")

0 commit comments

Comments
 (0)