Skip to content
This repository has been archived by the owner on Jun 4, 2024. It is now read-only.

Commit

Permalink
[Runtime Env] Suppress the log messages when RAY_RUNTIME_ENV_LOG_TO_D…
Browse files Browse the repository at this point in the history
…RIVER_ENABLED=0 (#21806)

There was a user request to disable runtime env logs. This is the first PR that lets users disable them through an environment variable: setting `RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED=0` disables runtime env logs.

Note that in the log monitor RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED=1 by default. This is temporary, and I'd like to make this 0 by default after improving runtime error failure messages. 

Once log messages are disabled by default, we can unify `RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED` and `RAY_RUNTIME_ENV_LOCAL_DEV_MODE`.
  • Loading branch information
rkooo567 authored Jan 25, 2022
1 parent 290f317 commit b2cd123
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 17 deletions.
8 changes: 6 additions & 2 deletions python/ray/_private/log_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
# log monitor start giving backpressure to lower cpu usages.
RAY_LOG_MONITOR_MANY_FILES_THRESHOLD = int(
    os.environ.get("RAY_LOG_MONITOR_MANY_FILES_THRESHOLD", "1000"))
# Integer on/off switch, read once at import time; an unset variable means 1
# (on). When it is 0, update_log_filenames() no longer collects the
# runtime_env*.log files.
RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED = int(
    os.environ.get("RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED", "1"))


class LogFileInfo:
Expand Down Expand Up @@ -164,8 +166,10 @@ def update_log_filenames(self):
# If gcs server restarts, there can be multiple log files.
gcs_err_path = glob.glob(f"{self.logs_dir}/gcs_server*.err")
# runtime_env setup process is logged here
runtime_env_setup_paths = glob.glob(
f"{self.logs_dir}/runtime_env*.log")
runtime_env_setup_paths = []
if RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED:
runtime_env_setup_paths = glob.glob(
f"{self.logs_dir}/runtime_env*.log")
total_files = 0
for file_path in (log_file_paths + raylet_err_paths + gcs_err_path +
monitor_log_paths + runtime_env_setup_paths):
Expand Down
46 changes: 32 additions & 14 deletions python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,18 +601,11 @@ def init_log_pubsub():
return s


def get_log_message(subscriber,
num: int = 1e6,
timeout: float = 20,
job_id: Optional[str] = None,
matcher=None) -> List[str]:
"""Gets log lines through GCS / Redis subscriber.
Returns maximum `num` lines of log messages, within `timeout`.
If `job_id` or `match` is specified, only returns log lines from `job_id`
or when `matcher` is true.
"""
def get_log_data(subscriber,
num: int = 1e6,
timeout: float = 20,
job_id: Optional[str] = None,
matcher=None) -> List[str]:
deadline = time.time() + timeout
msgs = []
while time.time() < deadline and len(msgs) < num:
Expand All @@ -632,11 +625,36 @@ def get_log_message(subscriber,
continue
if matcher and all(not matcher(line) for line in logs_data["lines"]):
continue
msgs.extend(logs_data["lines"])

msgs.append(logs_data)
return msgs


def get_log_message(subscriber,
                    num: int = int(1e6),
                    timeout: float = 20,
                    job_id: Optional[str] = None,
                    matcher=None) -> List[List[str]]:
    """Gets log lines through GCS / Redis subscriber.

    All arguments are forwarded unchanged to `get_log_data`, which collects
    published log batches until it has `num` of them or `timeout` seconds
    have elapsed.

    Args:
        subscriber: Log subscriber handed to `get_log_data`.
        num: Maximum number of log batches (not individual lines) to
            collect.
        timeout: Maximum time, in seconds, to keep collecting.
        job_id: If specified, only batches from this job are returned.
        matcher: Optional predicate applied to each log line; a batch is
            kept only if `matcher` is true for at least one of its lines.

    Returns:
        One list of log lines per collected batch, in collection order.
        The result is NOT flattened: each element is the complete
        `"lines"` list of one batch, so callers that need individual
        lines must iterate over both levels.
    """
    batches = get_log_data(subscriber, num, timeout, job_id, matcher)
    return [batch["lines"] for batch in batches]


def get_log_sources(subscriber,
                    num: int = 1e6,
                    timeout: float = 20,
                    job_id: Optional[str] = None,
                    matcher=None):
    """Return the set of distinct sources seen in the collected log data.

    Every argument is passed straight through to `get_log_data`; the
    source of a log batch is whatever is stored under its `"pid"` key.
    """
    log_batches = get_log_data(subscriber, num, timeout, job_id, matcher)
    sources = set()
    for batch in log_batches:
        sources.add(batch["pid"])
    return sources


def get_log_batch(subscriber,
num: int,
timeout: float = 20,
Expand Down
32 changes: 31 additions & 1 deletion python/ray/tests/test_runtime_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

import ray
from ray.exceptions import RuntimeEnvSetupError
from ray._private.test_utils import wait_for_condition, get_error_message
from ray._private.test_utils import (wait_for_condition, get_error_message,
get_log_sources)
from ray._private.utils import (get_wheel_filename, get_master_wheel_url,
get_release_wheel_url)

Expand Down Expand Up @@ -279,6 +280,35 @@ def f():
ray.get(a.ready.remote())


@pytest.fixture
def enable_dev_mode(local_env_var_enabled):
    """Set RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED for the duration of a test.

    The variable is set to "1" when `local_env_var_enabled` is truthy and
    to "0" otherwise. On teardown the value that was present before the
    test (or its absence) is restored, so the fixture does not leak state
    into other tests or wipe a value set by the surrounding environment.
    """
    env_var = "RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED"
    previous = os.environ.get(env_var)
    os.environ[env_var] = "1" if local_env_var_enabled else "0"
    try:
        yield
    finally:
        if previous is None:
            # pop() with a default tolerates the variable having already
            # been removed while the test was running.
            os.environ.pop(env_var, None)
        else:
            os.environ[env_var] = previous


@pytest.mark.skipif(
    sys.platform == "win32",
    reason="conda in runtime_env unsupported on Windows.")
@pytest.mark.parametrize("local_env_var_enabled", [False, True])
def test_runtime_env_log_msg(local_env_var_enabled, enable_dev_mode,
                             ray_start_cluster_head, log_pubsub):
    """Check that runtime_env logs are published only when enabled.

    `enable_dev_mode` sets RAY_RUNTIME_ENV_LOG_TO_DRIVER_ENABLED from the
    `local_env_var_enabled` parameter. The test then runs a task that
    needs a pip runtime env and inspects which log sources were seen.
    """

    @ray.remote
    def f():
        pass

    # Running a task with a pip runtime env triggers runtime env setup.
    pip_env = {"pip": ["requests"]}
    task_ref = f.options(runtime_env=pip_env).remote()
    ray.get(task_ref)

    observed_sources = get_log_sources(log_pubsub, 5)
    runtime_env_logged = "runtime_env" in observed_sources
    if not local_env_var_enabled:
        assert not runtime_env_logged
        return
    assert runtime_env_logged


if __name__ == "__main__":
    import sys

    # Run this file's tests with "-sv" and use pytest's result as the
    # process exit status.
    exit_code = pytest.main(["-sv", __file__])
    sys.exit(exit_code)

0 comments on commit b2cd123

Please sign in to comment.