diff --git a/python/ray/dashboard/modules/job/job_log_storage_client.py b/python/ray/dashboard/modules/job/job_log_storage_client.py index 2112c4e35510..a0a8ef39ebcc 100644 --- a/python/ray/dashboard/modules/job/job_log_storage_client.py +++ b/python/ray/dashboard/modules/job/job_log_storage_client.py @@ -1,6 +1,6 @@ import os from collections import deque -from typing import Iterator, List, Tuple +from typing import AsyncIterator, List, Tuple import ray from ray.dashboard.modules.job.common import JOB_LOGS_PATH_TEMPLATE @@ -25,10 +25,10 @@ def get_logs(self, job_id: str) -> str: except FileNotFoundError: return "" - def tail_logs(self, job_id: str) -> Iterator[List[str]]: + def tail_logs(self, job_id: str) -> AsyncIterator[List[str]]: return file_tail_iterator(self.get_log_file_path(job_id)) - def get_last_n_log_lines( + async def get_last_n_log_lines( self, job_id: str, num_log_lines=NUM_LOG_LINES_ON_ERROR ) -> str: """ @@ -39,9 +39,8 @@ def get_last_n_log_lines( job_id: The id of the job whose logs we want to return num_log_lines: The number of lines to return. """ - log_tail_iter = self.tail_logs(job_id) log_tail_deque = deque(maxlen=num_log_lines) - for lines in log_tail_iter: + async for lines in self.tail_logs(job_id): if lines is None: break else: diff --git a/python/ray/dashboard/modules/job/job_manager.py b/python/ray/dashboard/modules/job/job_manager.py index 075e6405131c..ff14215e03a3 100644 --- a/python/ray/dashboard/modules/job/job_manager.py +++ b/python/ray/dashboard/modules/job/job_manager.py @@ -6,7 +6,7 @@ import string import time import traceback -from typing import Any, Dict, Iterator, Optional, Union +from typing import Any, AsyncIterator, Dict, Optional, Union import ray import ray._private.ray_constants as ray_constants @@ -619,12 +619,12 @@ def get_job_logs(self, job_id: str) -> str: """Get all logs produced by a job.""" return self._log_client.get_logs(job_id) - async def tail_job_logs(self, job_id: str) -> Iterator[str]: + async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]: """Return an iterator following the logs of a job.""" if await self.get_job_status(job_id) is None: raise RuntimeError(f"Job '{job_id}' does not exist.") - for lines in self._log_client.tail_logs(job_id): + async for lines in self._log_client.tail_logs(job_id): if lines is None: # Return if the job has exited and there are no new log lines. status = await self.get_job_status(job_id) diff --git a/python/ray/dashboard/modules/job/job_supervisor.py b/python/ray/dashboard/modules/job/job_supervisor.py index 4fc5073f907a..18974cd1ff4b 100644 --- a/python/ray/dashboard/modules/job/job_supervisor.py +++ b/python/ray/dashboard/modules/job/job_supervisor.py @@ -421,7 +421,7 @@ async def run( driver_exit_code=return_code, ) else: - log_tail = self._log_client.get_last_n_log_lines(self._job_id) + log_tail = await self._log_client.get_last_n_log_lines(self._job_id) if log_tail is not None and log_tail != "": message = ( "Job entrypoint command " diff --git a/python/ray/dashboard/modules/job/sdk.py b/python/ray/dashboard/modules/job/sdk.py index 7b1497ca566c..b3b25e936fa0 100644 --- a/python/ray/dashboard/modules/job/sdk.py +++ b/python/ray/dashboard/modules/job/sdk.py @@ -1,6 +1,6 @@ import dataclasses import logging -from typing import Any, Dict, Iterator, List, Optional, Union +from typing import Any, AsyncIterator, Dict, List, Optional, Union import packaging.version @@ -449,7 +449,7 @@ def get_job_logs(self, job_id: str) -> str: self._raise_error(r) @PublicAPI(stability="stable") - async def tail_job_logs(self, job_id: str) -> Iterator[str]: + async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]: """Get an iterator that follows the logs of a job. Example: diff --git a/python/ray/dashboard/modules/job/tests/test_utils.py b/python/ray/dashboard/modules/job/tests/test_utils.py index 9524141e9a04..732aceea5d92 100644 --- a/python/ray/dashboard/modules/job/tests/test_utils.py +++ b/python/ray/dashboard/modules/job/tests/test_utils.py @@ -12,6 +12,17 @@ ) +# Polyfill anext() function for Python 3.9 compatibility +# May raise StopAsyncIteration. +async def anext_polyfill(iterator): + return await iterator.__anext__() + + +# Use the built-in anext() for Python 3.10+, otherwise use our polyfilled function +if sys.version_info < (3, 10): + anext = anext_polyfill + + @pytest.fixture def tmp(): with NamedTemporaryFile() as f: @@ -80,32 +91,36 @@ async def test_forward_compatibility(self): class TestIterLine: - def test_invalid_type(self): + @pytest.mark.asyncio + async def test_invalid_type(self): with pytest.raises(TypeError, match="path must be a string"): - next(file_tail_iterator(1)) + await anext(file_tail_iterator(1)) - def test_file_not_created(self, tmp): + @pytest.mark.asyncio + async def test_file_not_created(self, tmp): it = file_tail_iterator(tmp) - assert next(it) is None + assert await anext(it) is None f = open(tmp, "w") f.write("hi\n") f.flush() - assert next(it) is not None + assert await anext(it) is not None - def test_wait_for_newline(self, tmp): + @pytest.mark.asyncio + async def test_wait_for_newline(self, tmp): it = file_tail_iterator(tmp) - assert next(it) is None + assert await anext(it) is None f = open(tmp, "w") f.write("no_newline_yet") - assert next(it) is None + assert await anext(it) is None f.write("\n") f.flush() - assert next(it) == ["no_newline_yet\n"] + assert await anext(it) == ["no_newline_yet\n"] - def test_multiple_lines(self, tmp): + @pytest.mark.asyncio + async def test_multiple_lines(self, tmp): it = file_tail_iterator(tmp) - assert next(it) is None + assert await anext(it) is None f = open(tmp, "w") @@ -114,13 +129,14 @@ def test_multiple_lines(self, tmp): s = f"{i}\n" f.write(s) f.flush() - assert next(it) == [s] + assert await anext(it) == [s] - assert next(it) is None + assert await anext(it) is None - def test_batching(self, tmp): + @pytest.mark.asyncio + async def test_batching(self, tmp): it = file_tail_iterator(tmp) - assert next(it) is None + assert await anext(it) is None f = open(tmp, "w") @@ -131,13 +147,14 @@ def test_batching(self, tmp): f.write(f"{i}\n") f.flush() - assert next(it) == [f"{i}\n" for i in range(10)] + assert await anext(it) == [f"{i}\n" for i in range(10)] - assert next(it) is None + assert await anext(it) is None - def test_max_line_batching(self, tmp): + @pytest.mark.asyncio + async def test_max_line_batching(self, tmp): it = file_tail_iterator(tmp) - assert next(it) is None + assert await anext(it) is None f = open(tmp, "w") @@ -148,17 +165,18 @@ def test_max_line_batching(self, tmp): f.write(f"{i}\n") f.flush() - assert next(it) == [f"{i}\n" for i in range(10)] - assert next(it) == [f"{i}\n" for i in range(10, 20)] - assert next(it) == [f"{i}\n" for i in range(20, 30)] - assert next(it) == [f"{i}\n" for i in range(30, 40)] - assert next(it) == [f"{i}\n" for i in range(40, 50)] + assert await anext(it) == [f"{i}\n" for i in range(10)] + assert await anext(it) == [f"{i}\n" for i in range(10, 20)] + assert await anext(it) == [f"{i}\n" for i in range(20, 30)] + assert await anext(it) == [f"{i}\n" for i in range(30, 40)] + assert await anext(it) == [f"{i}\n" for i in range(40, 50)] - assert next(it) is None + assert await anext(it) is None - def test_max_char_batching(self, tmp): + @pytest.mark.asyncio + async def test_max_char_batching(self, tmp): it = file_tail_iterator(tmp) - assert next(it) is None + assert await anext(it) is None f = open(tmp, "w") @@ -170,31 +188,32 @@ def test_max_char_batching(self, tmp): f.flush() # First line will come in a batch of its own - assert next(it) == [f"{'1234567890' * 6000}\n"] + assert await anext(it) == [f"{'1234567890' * 6000}\n"] # Other 4 lines will be batched together assert ( - next(it) + await anext(it) == [ f"{'1234567890' * 500}\n", ] * 4 ) - assert next(it) is None + assert await anext(it) is None - def test_delete_file(self): + @pytest.mark.asyncio + async def test_delete_file(self): with NamedTemporaryFile() as tmp: it = file_tail_iterator(tmp.name) f = open(tmp.name, "w") - assert next(it) is None + assert await anext(it) is None f.write("hi\n") f.flush() - assert next(it) == ["hi\n"] + assert await anext(it) == ["hi\n"] # Calls should continue returning None after file deleted. - assert next(it) is None + assert await anext(it) is None if __name__ == "__main__": diff --git a/python/ray/dashboard/modules/job/utils.py b/python/ray/dashboard/modules/job/utils.py index 0235063dacf3..8c00a7014cec 100644 --- a/python/ray/dashboard/modules/job/utils.py +++ b/python/ray/dashboard/modules/job/utils.py @@ -3,10 +3,9 @@ import logging import os import re -import time import traceback from dataclasses import dataclass -from typing import Any, Dict, Iterator, List, Optional, Tuple, Union +from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, Union from ray._private import ray_constants from ray._private.gcs_utils import GcsAioClient @@ -60,7 +59,7 @@ def redact_url_password(url: str) -> str: return url -def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]: +async def file_tail_iterator(path: str) -> AsyncIterator[Optional[List[str]]]: """Yield lines from a file as it's written. Returns lines in batches of up to 10 lines or 20000 characters, @@ -114,7 +113,7 @@ def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]: chunk_char_count += len(curr_line) else: # If EOF is reached sleep for 1s before continuing - time.sleep(1) + await asyncio.sleep(1) async def parse_and_validate_request(