Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][dashboard] Change file_tail_iterator to async. #47721

Merged
merged 6 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions python/ray/dashboard/modules/job/job_log_storage_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
from collections import deque
from typing import Iterator, List, Tuple
from typing import AsyncIterator, List, Tuple

import ray
from ray.dashboard.modules.job.common import JOB_LOGS_PATH_TEMPLATE
Expand All @@ -25,10 +25,10 @@ def get_logs(self, job_id: str) -> str:
except FileNotFoundError:
return ""

def tail_logs(self, job_id: str) -> Iterator[List[str]]:
def tail_logs(self, job_id: str) -> AsyncIterator[List[str]]:
return file_tail_iterator(self.get_log_file_path(job_id))

def get_last_n_log_lines(
async def get_last_n_log_lines(
self, job_id: str, num_log_lines=NUM_LOG_LINES_ON_ERROR
) -> str:
"""
Expand All @@ -39,9 +39,8 @@ def get_last_n_log_lines(
job_id: The id of the job whose logs we want to return
num_log_lines: The number of lines to return.
"""
log_tail_iter = self.tail_logs(job_id)
log_tail_deque = deque(maxlen=num_log_lines)
for lines in log_tail_iter:
async for lines in self.tail_logs(job_id):
if lines is None:
break
else:
Expand Down
6 changes: 3 additions & 3 deletions python/ray/dashboard/modules/job/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import string
import time
import traceback
from typing import Any, Dict, Iterator, Optional, Union
from typing import Any, AsyncIterator, Dict, Optional, Union

import ray
import ray._private.ray_constants as ray_constants
Expand Down Expand Up @@ -619,12 +619,12 @@ def get_job_logs(self, job_id: str) -> str:
"""Get all logs produced by a job."""
return self._log_client.get_logs(job_id)

async def tail_job_logs(self, job_id: str) -> Iterator[str]:
async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
"""Return an iterator following the logs of a job."""
if await self.get_job_status(job_id) is None:
raise RuntimeError(f"Job '{job_id}' does not exist.")

for lines in self._log_client.tail_logs(job_id):
async for lines in self._log_client.tail_logs(job_id):
if lines is None:
# Return if the job has exited and there are no new log lines.
status = await self.get_job_status(job_id)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/dashboard/modules/job/job_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ async def run(
driver_exit_code=return_code,
)
else:
log_tail = self._log_client.get_last_n_log_lines(self._job_id)
log_tail = await self._log_client.get_last_n_log_lines(self._job_id)
if log_tail is not None and log_tail != "":
message = (
"Job entrypoint command "
Expand Down
4 changes: 2 additions & 2 deletions python/ray/dashboard/modules/job/sdk.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dataclasses
import logging
from typing import Any, Dict, Iterator, List, Optional, Union
from typing import Any, AsyncIterator, Dict, List, Optional, Union

import packaging.version

Expand Down Expand Up @@ -449,7 +449,7 @@ def get_job_logs(self, job_id: str) -> str:
self._raise_error(r)

@PublicAPI(stability="stable")
async def tail_job_logs(self, job_id: str) -> Iterator[str]:
async def tail_job_logs(self, job_id: str) -> AsyncIterator[str]:
"""Get an iterator that follows the logs of a job.

Example:
Expand Down
87 changes: 53 additions & 34 deletions python/ray/dashboard/modules/job/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@
)


# Polyfill anext() function for Python 3.9 compatibility
# May raise StopAsyncIteration.
async def anext_polyfill(iterator):
return await iterator.__anext__()


# Use the built-in anext() for Python 3.10+, otherwise use our polyfilled function
if sys.version_info < (3, 10):
anext = anext_polyfill


@pytest.fixture
def tmp():
with NamedTemporaryFile() as f:
Expand Down Expand Up @@ -80,32 +91,36 @@ async def test_forward_compatibility(self):


class TestIterLine:
def test_invalid_type(self):
@pytest.mark.asyncio
async def test_invalid_type(self):
with pytest.raises(TypeError, match="path must be a string"):
next(file_tail_iterator(1))
await anext(file_tail_iterator(1))

def test_file_not_created(self, tmp):
@pytest.mark.asyncio
async def test_file_not_created(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None
f = open(tmp, "w")
f.write("hi\n")
f.flush()
assert next(it) is not None
assert await anext(it) is not None

def test_wait_for_newline(self, tmp):
@pytest.mark.asyncio
async def test_wait_for_newline(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")
f.write("no_newline_yet")
assert next(it) is None
assert await anext(it) is None
f.write("\n")
f.flush()
assert next(it) == ["no_newline_yet\n"]
assert await anext(it) == ["no_newline_yet\n"]

def test_multiple_lines(self, tmp):
@pytest.mark.asyncio
async def test_multiple_lines(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")

Expand All @@ -114,13 +129,14 @@ def test_multiple_lines(self, tmp):
s = f"{i}\n"
f.write(s)
f.flush()
assert next(it) == [s]
assert await anext(it) == [s]

assert next(it) is None
assert await anext(it) is None

def test_batching(self, tmp):
@pytest.mark.asyncio
async def test_batching(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")

Expand All @@ -131,13 +147,14 @@ def test_batching(self, tmp):
f.write(f"{i}\n")
f.flush()

assert next(it) == [f"{i}\n" for i in range(10)]
assert await anext(it) == [f"{i}\n" for i in range(10)]

assert next(it) is None
assert await anext(it) is None

def test_max_line_batching(self, tmp):
@pytest.mark.asyncio
async def test_max_line_batching(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")

Expand All @@ -148,17 +165,18 @@ def test_max_line_batching(self, tmp):
f.write(f"{i}\n")
f.flush()

assert next(it) == [f"{i}\n" for i in range(10)]
assert next(it) == [f"{i}\n" for i in range(10, 20)]
assert next(it) == [f"{i}\n" for i in range(20, 30)]
assert next(it) == [f"{i}\n" for i in range(30, 40)]
assert next(it) == [f"{i}\n" for i in range(40, 50)]
assert await anext(it) == [f"{i}\n" for i in range(10)]
assert await anext(it) == [f"{i}\n" for i in range(10, 20)]
assert await anext(it) == [f"{i}\n" for i in range(20, 30)]
assert await anext(it) == [f"{i}\n" for i in range(30, 40)]
assert await anext(it) == [f"{i}\n" for i in range(40, 50)]

assert next(it) is None
assert await anext(it) is None

def test_max_char_batching(self, tmp):
@pytest.mark.asyncio
async def test_max_char_batching(self, tmp):
it = file_tail_iterator(tmp)
assert next(it) is None
assert await anext(it) is None

f = open(tmp, "w")

Expand All @@ -170,31 +188,32 @@ def test_max_char_batching(self, tmp):
f.flush()

# First line will come in a batch of its own
assert next(it) == [f"{'1234567890' * 6000}\n"]
assert await anext(it) == [f"{'1234567890' * 6000}\n"]
# Other 4 lines will be batched together
assert (
next(it)
await anext(it)
== [
f"{'1234567890' * 500}\n",
]
* 4
)
assert next(it) is None
assert await anext(it) is None

def test_delete_file(self):
@pytest.mark.asyncio
async def test_delete_file(self):
with NamedTemporaryFile() as tmp:
it = file_tail_iterator(tmp.name)
f = open(tmp.name, "w")

assert next(it) is None
assert await anext(it) is None

f.write("hi\n")
f.flush()

assert next(it) == ["hi\n"]
assert await anext(it) == ["hi\n"]

# Calls should continue returning None after file deleted.
assert next(it) is None
assert await anext(it) is None


if __name__ == "__main__":
Expand Down
7 changes: 3 additions & 4 deletions python/ray/dashboard/modules/job/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
import logging
import os
import re
import time
import traceback
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple, Union

from ray._private import ray_constants
from ray._private.gcs_utils import GcsAioClient
Expand Down Expand Up @@ -60,7 +59,7 @@ def redact_url_password(url: str) -> str:
return url


def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]:
async def file_tail_iterator(path: str) -> AsyncIterator[Optional[List[str]]]:
"""Yield lines from a file as it's written.

Returns lines in batches of up to 10 lines or 20000 characters,
Expand Down Expand Up @@ -114,7 +113,7 @@ def file_tail_iterator(path: str) -> Iterator[Optional[List[str]]]:
chunk_char_count += len(curr_line)
else:
# If EOF is reached sleep for 1s before continuing
time.sleep(1)
await asyncio.sleep(1)


async def parse_and_validate_request(
Expand Down