From e9f88553d662558252c8209aeed18dcdbe5a18ce Mon Sep 17 00:00:00 2001 From: harborn Date: Thu, 17 Aug 2023 09:58:19 +0000 Subject: [PATCH 01/11] Support Intel GPU Signed-off-by: harborn --- python/ray/_private/ray_constants.py | 10 +++ python/ray/_private/resource_spec.py | 100 ++++++++++++++++------ python/ray/_private/utils.py | 101 ++++++++++++++++++++++- python/ray/_private/worker.py | 67 ++++++++++++++- python/ray/tests/test_actor_resources.py | 30 +++++-- python/ray/tests/test_advanced_2.py | 96 ++++++++++++++++++++- python/ray/tests/test_advanced_6.py | 52 +++++++++++- python/ray/tests/test_basic.py | 23 +++++- 8 files changed, 437 insertions(+), 42 deletions(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 78bff0dfd134..cd8a1d19c076 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -400,6 +400,7 @@ def env_set_by_user(key): "RAY_EXPERIMENTAL_NOSET_NEURON_RT_VISIBLE_CORES" ) NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES" + CUDA_VISIBLE_DEVICES_ENV_VAR = "CUDA_VISIBLE_DEVICES" NEURON_RT_VISIBLE_CORES_ENV_VAR = "NEURON_RT_VISIBLE_CORES" NEURON_CORES = "neuron_cores" @@ -416,6 +417,15 @@ def env_set_by_user(key): "inf2.24xlarge": 12, "inf2.48xlarge": 24, } + +NOSET_ONEAPI_DEVICE_SELECTOR_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_ONEAPI_DEVICE_SELECTOR" + +RAY_ONEAPI_DEVICE_BACKEND_TYPE = "level_zero" +RAY_ONEAPI_DEVICE_TYPE = "gpu" + +RAY_DEVICE_SUPPORT_TYPES = {"CPU", "CUDA", "XPU"} +RAY_ACCELERATOR_DEFAULT = "CUDA" + RAY_WORKER_NICENESS = "RAY_worker_niceness" # Default max_retries option in @ray.remote for non-actor diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index 4cf5b01a0338..be457ca27e84 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -171,35 +171,21 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): if is_head: resources[HEAD_NODE_RESOURCE_NAME] = 1.0 + # Get cpu num num_cpus = self.num_cpus if num_cpus is None: num_cpus = ray._private.utils.get_num_cpus() - num_gpus = self.num_gpus - gpu_ids = ray._private.utils.get_cuda_visible_devices() - # Check that the number of GPUs that the raylet wants doesn't - # exceed the amount allowed by CUDA_VISIBLE_DEVICES. - if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids): - raise ValueError( - "Attempting to start raylet with {} GPUs, " - "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids) - ) - if num_gpus is None: - # Try to automatically detect the number of GPUs. - num_gpus = _autodetect_num_gpus() - # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES. 
- if gpu_ids is not None: - num_gpus = min(num_gpus, len(gpu_ids)) - - try: - if importlib.util.find_spec("GPUtil") is not None: - gpu_types = _get_gpu_types_gputil() - else: - info_string = _get_gpu_info_string() - gpu_types = _constraints_from_gpu_info(info_string) + # Get accelerate device info + accelerator = ray._private.utils.get_current_accelerator() + if accelerator == "CUDA": # get cuda device num + num_gpus, gpu_types = _get_cuda_info(self.num_gpus) + resources.update(gpu_types) + elif accelerator == "XPU": # get xpu device num + # here we take xpu as gpu, so no need to develop core's scheduling policy + # If we don't want to take xpu as gpu, ray core need to develop new scheduling policy + num_gpus, gpu_types = _get_xpu_info(self.num_gpus) resources.update(gpu_types) - except Exception: - logger.exception("Could not parse gpu information.") accelerator.update_resources_with_accelerator_type(resources) @@ -281,6 +267,72 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): return spec +def _get_cuda_info(num_gpus): + """Attempt to process the number and type of GPUs + Notice: + If GPU IDs are not specified in CUDA_VISIBLE_DEVICES, + and num_gpus is defined on the task or actor, + this function will return the input num_gpus, not 0 + + Returns: + (num_gpus, gpu_types) + """ + gpu_ids = ray._private.utils.get_cuda_visible_devices() + # Check that the number of GPUs that the raylet wants doesn't + # exceed the amount allowed by CUDA_VISIBLE_DEVICES. + if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids): + raise ValueError( + "Attempting to start raylet with {} GPUs, " + "but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids) + ) + if num_gpus is None: + # Try to automatically detect the number of GPUs. + num_gpus = _autodetect_num_gpus() + # Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES. + if gpu_ids is not None: + num_gpus = min(num_gpus, len(gpu_ids)) + + gpu_types = "" + try: + if importlib.util.find_spec("GPUtil") is not None: + gpu_types = _get_gpu_types_gputil() + else: + info_string = _get_gpu_info_string() + gpu_types = _constraints_from_gpu_info(info_string) + except Exception: + logger.exception("Could not parse gpu information.") + + return num_gpus, gpu_types + + +def _get_xpu_info(num_xpus): + """Attempt to process the number of XPUs + Notice: + If XPU IDs are not specified in ONEAPI_DEVICE_SELECTOR, + and num_gpus is defined on the task or actor, + this function will return the input num_gpus, not 0 + + Returns: + (num_xpus, xpu_types) + """ + # get visible xpu ids + xpu_ids = ray._private.utils.get_xpu_visible_devices() + if num_xpus is not None and xpu_ids is not None and num_xpus > len(xpu_ids): + raise ValueError( + "Attempting to start raylet with {} XPUs, " + "but ONEAPI_DEVICE_SELECTOR contains {}.".format(num_xpus, xpu_ids) + ) + if num_xpus is None: + # Try to detect the total number of XPUs. + num_xpus = len(ray._private.utils.get_xpu_all_devices()) + # Don't use more XPUs than allowed by ONEAPI_DEVICE_SELECTOR. + if xpu_ids is not None: + num_xpus = min(num_xpus, len(xpu_ids)) + + xpu_types = {f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}" "xpu": 1} + return num_xpus, xpu_types + + +def _autodetect_num_gpus(): + """Attempt to detect the number of GPUs on this machine.
diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 4d90dc4610c0..18132d5b6bc4 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -354,6 +354,81 @@ def _get_visible_ids(env_var: str) -> Optional[List[str]]: last_set_neuron_core_ids = None +def get_xpu_devices(): + """Get xpu device IDs by calling `dpctl` api + device with specific backend and device_type + Returns: + devices IDs (List[str]): return the list of string representing + the relative IDs filtered by + ONEAPI_DEVICE_SELECTOR, specific backend and device_type + returned visible IDs start with index 0 + Example: + if ONEAPI_DEVICE_SELECTOR="level_zero:2,3,4" + the device IDs enumerated will be [0,1,2] + same with CUDA_VISIBLE_DEVICES + """ + backend = ray_constants.RAY_ONEAPI_DEVICE_BACKEND_TYPE + device_type = ray_constants.RAY_ONEAPI_DEVICE_TYPE + xpu_ids = [] + try: + import dpctl + + for dev in dpctl.get_devices(backend=backend, device_type=device_type): + # device filter_string with format: "backend:device_type:relative_id" + xpu_ids.append(int(dev.filter_string.split(":")[-1])) + except ImportError: + ValueError("Import dpctl error, maybe dpctl not installed.") + return xpu_ids + + +def get_xpu_visible_devices(): + """Get xpu devices IDs filtered by ONEAPI_DEVICE_SELECTOR environment variable. + Returns: + devices (List[str]): return the list of string representing the relative IDS + filtered by ONEAPI_DEVICE_SELECTOR. + """ + if os.environ.get("ONEAPI_DEVICE_SELECTOR", None) is None: + return None + + xpu_ids = get_xpu_devices() + + return xpu_ids + + +def get_xpu_all_devices(): + """Get all xpu device IDS without ONEAPI_DEVICE_SELECTOR filter, + But all xpu device still filtered by specific backend and device_type + Returns: + devices (List[str]): list of strings representing the numeric index (zero-based), + with sepcific backend and device_type + """ + selector = os.environ.get("ONEAPI_DEVICE_SELECTOR", None) + # unset "ONEAPI_DEVICE_SELECTOR" + os.unsetenv("ONEAPI_DEVICE_SELECTOR") + + xpu_ids = get_xpu_devices() + + # set "ONEAPI_DEVICE_SELECTOR" value back + if selector is not None: + os.environ["ONEAPI_DEVICE_SELECTOR"] = selector + + return xpu_ids + + +def get_current_accelerator(): + return os.environ.get( + "RAY_EXPERIMENTAL_ACCELERATOR_TYPE", ray_constants.RAY_ACCELERATOR_DEFAULT + ) + + +def get_gpu_visible_devices(): + accelerator = get_current_accelerator() + if get_current_accelerator() == "XPU": + return get_xpu_visible_devices() + elif accelerator == "CUDA": + return get_cuda_visible_devices() + + def set_omp_num_threads_if_unset() -> bool: """Set the OMP_NUM_THREADS to default to num cpus assigned to the worker @@ -398,17 +473,39 @@ def set_cuda_visible_devices(gpu_ids: List[str]): """Set the CUDA_VISIBLE_DEVICES environment variable. Args: - gpu_ids (List[str]): List of strings representing GPU IDs. + dev_ids (List[str]): List of strings representing GPU IDs. """ if os.environ.get(ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR): return global last_set_gpu_ids - if last_set_gpu_ids == gpu_ids: + if last_set_gpu_ids == dev_ids: return # optimization: already set _set_visible_ids(gpu_ids, ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR) last_set_gpu_ids = gpu_ids +def set_xpu_visible_devices(dev_ids): + """Set the ONEAPI_DEVICE_SELECTOR environment variable. 
+ Args: + dev_ids (List[str]): List of strings representing GPU IDs + """ + + if os.environ.get(ray_constants.NOSET_ONEAPI_DEVICE_SELECTOR_ENV_VAR): + return + + backend = ray_constants.RAY_ONEAPI_DEVICE_BACKEND_TYPE + dev_str = ",".join([str(i) for i in dev_ids]) + os.environ["ONEAPI_DEVICE_SELECTOR"] = backend + ":" + dev_str + + +def set_gpu_visible_devices(device_ids): + accelerator = get_current_accelerator() + if accelerator == "XPU": + return set_xpu_visible_devices(device_ids) + elif accelerator == "CUDA": + return set_cuda_visible_devices(device_ids) + + def set_gpu_and_accelerator_runtime_ids() -> None: """Set (CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES, ..) environment variables based on the accelerator runtime. diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index d8389d3ae36e..32c3da808f9d 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -430,6 +430,7 @@ def __init__(self): self.original_gpu_and_accelerator_runtime_ids = ( ray._private.utils.get_gpu_and_accelerator_runtime_ids() ) + # A dictionary that maps from driver id to SerializationContext # TODO: clean up the SerializationContext once the job finished. self.serialization_context_map = {} @@ -883,9 +884,7 @@ def get_resource_ids_for_resource( return list(assigned_ids) -@PublicAPI -@client_mode_hook -def get_gpu_ids(): +def get_cuda_ids(): """Get the IDs of the GPUs that are available to the worker. If the CUDA_VISIBLE_DEVICES environment variable was set when the worker @@ -903,6 +902,67 @@ def get_gpu_ids(): ) +def get_xpu_ids(): + """Get the IDs of the XPUs that are available to the worker. + + If the ONEAPI_DEVICE_SELECTOR environment variable was set before the worker + started up, then the IDs returned by this method will be a subset of the + IDs in ONEAPI_DEVICE_SELECTOR. If not, the IDs will fall in the range + [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has. + + Returns: + A list of XPU IDs + """ + worker = global_worker + worker.check_connected() + + if worker.mode != WORKER_MODE: + if log_once("worker_get_gpu_ids_empty_from_driver"): + logger.warning( + "`ray.get_xpu_ids()` will always return the empty list when " + "called from the driver. This is because Ray does not manage " + "XPU allocations to the driver process." + ) + + # Get all resources from global core worker + all_resource_ids = global_worker.core_worker.resource_ids() + assigned_ids = set() + for resource, assignment in all_resource_ids.items(): + # Handle both normal and placement group GPU resources. + # Note: We should only get the GPU ids from the placement + # group resource that does not contain the bundle index! + import re + + if resource == "GPU" or re.match(r"^GPU_group_[0-9A-Za-z]+$", resource): + for resource_id, _ in assignment: + assigned_ids.add(resource_id) + assigned_ids = list(assigned_ids) + # If the user had already set ONEAPI_DEVICE_SELECTOR, then respect that (in + # the sense that only GPU IDs that appear in ONEAPI_DEVICE_SELECTOR should be + # returned). + if global_worker.original_gpu_ids is not None: + assigned_ids = [ + global_worker.original_gpu_ids[gpu_id] for gpu_id in assigned_ids + ] + # Give all GPUs in local_mode. 
+ if global_worker.mode == LOCAL_MODE: + max_gpus = global_worker.node.get_resource_spec().num_gpus + assigned_ids = global_worker.original_gpu_ids[:max_gpus] + + return assigned_ids + + +@PublicAPI +@client_mode_hook +def get_gpu_ids(): + accelerator = ray._private.utils.get_current_accelerator() + if accelerator == "CUDA": + return get_cuda_ids() + elif accelerator == "XPU": + return get_xpu_ids() + return [] + + @Deprecated( message="Use ray.get_runtime_context().get_assigned_resources() instead.", warning=True, @@ -1492,7 +1552,6 @@ def init( usage_lib.show_usage_stats_prompt(cli=False) else: usage_lib.set_usage_stats_enabled_via_env_var(False) - # Use a random port by not specifying Redis port / GCS server port. ray_params = ray._private.parameter.RayParams( node_ip_address=node_ip_address, diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index e501fb658b45..a5196b2b6c06 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -8,6 +8,8 @@ import ray import ray.cluster_utils +import ray._private.ray_constants as ray_constants + try: import pytest_timeout except ImportError: @@ -80,7 +82,9 @@ def echo(self, value): @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") -def test_actor_gpus(ray_start_cluster): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_actor_gpus(ray_start_cluster, ACCELERATOR_TYPE): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 4 @@ -123,7 +127,9 @@ def get_location_and_ids(self): assert ready_ids == [] -def test_actor_multiple_gpus(ray_start_cluster): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_actor_multiple_gpus(ray_start_cluster, ACCELERATOR_TYPE): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 5 @@ -201,7 +207,9 @@ def get_location_and_ids(self): @pytest.mark.skipif(sys.platform == "win32", reason="Very flaky.") -def test_actor_different_numbers_of_gpus(ray_start_cluster): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_actor_different_numbers_of_gpus(ray_start_cluster, ACCELERATOR_TYPE): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE # Test that we can create actors on two nodes that have different # numbers of GPUs. 
cluster = ray_start_cluster @@ -243,7 +251,9 @@ def get_location_and_ids(self): assert ready_ids == [] -def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster, ACCELERATOR_TYPE): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 2 @@ -320,7 +330,9 @@ def get_location_and_ids(self): assert ready_ids == [] -def test_actors_and_tasks_with_gpus(ray_start_cluster): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_actors_and_tasks_with_gpus(ray_start_cluster, ACCELERATOR_TYPE): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE cluster = ray_start_cluster num_nodes = 3 num_gpus_per_raylet = 2 @@ -439,7 +451,9 @@ def locations_to_intervals_for_many_tasks(): assert len(ready_ids) == 0 -def test_actors_and_tasks_with_gpus_version_two(shutdown_only): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_actors_and_tasks_with_gpus_version_two(shutdown_only, ACCELERATOR_TYPE): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE # Create tasks and actors that both use GPUs and make sure that they # are given different GPUs num_gpus = 4 @@ -616,7 +630,9 @@ def get_location(self): assert location == custom_resource2_node.unique_id -def test_creating_more_actors_than_resources(shutdown_only): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_creating_more_actors_than_resources(shutdown_only, ACCELERATOR_TYPE): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE ray.init(num_cpus=10, num_gpus=2, resources={"CustomResource1": 1}) @ray.remote(num_gpus=1) diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index 9be2733a2649..7bd4e1f7e1f5 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -9,6 +9,7 @@ import ray import ray.cluster_utils +from ray._private.ray_constants import RAY_ONEAPI_DEVICE_BACKEND_TYPE as XPU_BACKEND from ray._private.test_utils import RayTestTimeoutException, wait_for_condition from ray.util.placement_group import placement_group from ray.util.accelerators import AWS_NEURON_CORE @@ -17,7 +18,96 @@ logger = logging.getLogger(__name__) -def test_gpu_ids(shutdown_only): +def test_xpu_ids(shutdown_only): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "XPU" + + num_gpus = 3 + ray.init(num_cpus=num_gpus, num_gpus=num_gpus) + + def get_gpu_ids(num_gpus_per_worker): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == num_gpus_per_worker + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] # noqa + ) + for gpu_id in gpu_ids: + assert gpu_id in range(num_gpus) + return gpu_ids + + f0 = ray.remote(num_gpus=0)(lambda: get_gpu_ids(0)) + f1 = ray.remote(num_gpus=1)(lambda: get_gpu_ids(1)) + f2 = ray.remote(num_gpus=2)(lambda: get_gpu_ids(2)) + f3 = ray.remote(num_gpus=3)(lambda: get_gpu_ids(3)) + + # Wait for all workers to start up. + @ray.remote + def f(): + time.sleep(0.2) + return os.getpid() + + start_time = time.time() + while True: + num_workers_started = len(set(ray.get([f.remote() for _ in range(num_gpus)]))) + if num_workers_started == num_gpus: + break + if time.time() > start_time + 10: + raise RayTestTimeoutException( + "Timed out while waiting for workers to start up." 
+ ) + + list_of_ids = ray.get([f0.remote() for _ in range(10)]) + assert list_of_ids == 10 * [[]] + ray.get([f1.remote() for _ in range(10)]) + ray.get([f2.remote() for _ in range(10)]) + + # Test that actors have ONEAPI_DEVICE_SELECTOR set properly. + + @ray.remote + class Actor0: + def __init__(self): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 0 + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] # noqa + ) + # Set self.x to make sure that we got here. + self.x = 1 + + def test(self): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 0 + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] + ) + return self.x + + @ray.remote(num_gpus=1) + class Actor1: + def __init__(self): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 1 + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] + ) + # Set self.x to make sure that we got here. + self.x = 1 + + def test(self): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 1 + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] + ) + return self.x + + a0 = Actor0.remote() + ray.get(a0.test.remote()) + + a1 = Actor1.remote() + ray.get(a1.test.remote()) + + +def test_cuda_ids(shutdown_only): num_gpus = 3 ray.init(num_cpus=num_gpus, num_gpus=num_gpus) @@ -146,7 +236,9 @@ def method(self): assert valid_node.unique_id == ray.get(a.method.remote()) -def test_fractional_resources(shutdown_only): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_fractional_resources(shutdown_only, ACCELERATOR_TYPE): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE ray.init(num_cpus=6, num_gpus=3, resources={"Custom": 1}) @ray.remote(num_gpus=0.5) diff --git a/python/ray/tests/test_advanced_6.py b/python/ray/tests/test_advanced_6.py index 3e916f7eebcb..baea4fc73fd0 100644 --- a/python/ray/tests/test_advanced_6.py +++ b/python/ray/tests/test_advanced_6.py @@ -11,12 +11,17 @@ import ray import ray.cluster_utils + from ray._private.test_utils import ( run_string_as_driver_nonblocking, wait_for_condition, wait_for_pid_to_exit, ) +from ray._private.ray_constants import RAY_ONEAPI_DEVICE_BACKEND_TYPE as XPU_BACKEND + +from unittest.mock import Mock, MagicMock + logger = logging.getLogger(__name__) @@ -37,10 +42,29 @@ def save_gpu_ids_shutdown_only(): del os.environ["CUDA_VISIBLE_DEVICES"] +@pytest.fixture +def save_xpu_ids_shutdown_only(): + # Record the curent value of this environment variable so that we can + # reset it after the test. + selector = os.environ.get("ONEAPI_DEVICE_SELECTOR", None) + + yield None + + # The code after the yield will run as teardown code. + ray.shutdown() + # Reset the environment variable. 
+ if selector is not None: + os.environ["ONEAPI_DEVICE_SELECTOR"] = selector + else: + del os.environ["ONEAPI_DEVICE_SELECTOR"] + + @pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows") -def test_specific_gpus(save_gpu_ids_shutdown_only): +def test_specific_cudas(save_gpu_ids_shutdown_only): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "CUDA" allowed_gpu_ids = [4, 5, 6] os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_gpu_ids]) + ray.init(num_gpus=3) @ray.remote(num_gpus=1) @@ -60,7 +84,7 @@ def g(): ray.get([g.remote() for _ in range(100)]) -def test_local_mode_gpus(save_gpu_ids_shutdown_only): +def test_local_mode_cudas(save_gpu_ids_shutdown_only): allowed_gpu_ids = [4, 5, 6, 7, 8] os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in allowed_gpu_ids]) @@ -80,6 +104,30 @@ def f(): ray.get([f.remote() for _ in range(100)]) +def test_local_mode_xpus(save_xpu_ids_shutdown_only): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "XPU" + allowed_xpu_ids = [0, 1, 2, 3, 4, 5] + os.environ["ONEAPI_DEVICE_SELECTOR"] = ( + XPU_BACKEND + ":" + ",".join([str(i) for i in allowed_xpu_ids]) + ) + + ray._private.utils.get_xpu_devices = Mock(return_value=allowed_xpu_ids) + from importlib import reload + + reload(ray._private.worker) + + ray.init(num_gpus=3, local_mode=True) + + @ray.remote + def f(): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 3 + for gpu in gpu_ids: + assert int(gpu) in allowed_xpu_ids + + ray.get([f.remote() for _ in range(100)]) + + def test_blocking_tasks(ray_start_regular): @ray.remote def f(i, j): diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 193a847d8b4d..32b34523e978 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -230,7 +230,9 @@ def g(): assert ray.get(f.options(num_cpus=4).remote()) == "1" -def test_submit_api(shutdown_only): +@pytest.mark.parametrize("ACCELERATOR_TYPE", ["CUDA", "XPU"]) +def test_submit_api(shutdown_only, ACCELERATOR_TYPE): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = ACCELERATOR_TYPE ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1}) @ray.remote @@ -540,6 +542,25 @@ def check(): ) +def test_disable_xpu_devices(): + script = """ +import ray +ray.init() + +@ray.remote +def check(): + import os + assert "ONEAPI_DEVICE_SELECTOR" not in os.environ + +print("remote", ray.get(check.remote())) +""" + + run_string_as_driver( + script, + dict(os.environ, **{"RAY_EXPERIMENTAL_NOSET_ONEAPI_DEVICE_SELECTOR": "1"}), + ) + + def test_put_get(shutdown_only): ray.init(num_cpus=0) From 6b1db6db4d328844a090a43a1ef82f2c7fcd5d36 Mon Sep 17 00:00:00 2001 From: harborn Date: Thu, 17 Aug 2023 10:08:55 +0000 Subject: [PATCH 02/11] add requirements_xpu.txt for xpu Signed-off-by: harborn --- python/requirements_xpu.txt | 1 + 1 file changed, 1 insertion(+) create mode 100644 python/requirements_xpu.txt diff --git a/python/requirements_xpu.txt b/python/requirements_xpu.txt new file mode 100644 index 000000000000..3e7ffe68d330 --- /dev/null +++ b/python/requirements_xpu.txt @@ -0,0 +1 @@ +dpctl From 58a49931347ee7461da7a070c412d66478683d65 Mon Sep 17 00:00:00 2001 From: harborn Date: Thu, 17 Aug 2023 10:36:16 +0000 Subject: [PATCH 03/11] fix format Signed-off-by: harborn --- python/ray/_private/resource_spec.py | 3 ++- python/ray/_private/utils.py | 4 ++-- python/ray/tests/test_actor_resources.py | 2 -- python/ray/tests/test_advanced_2.py | 1 - python/ray/tests/test_advanced_6.py | 2 +- 5 files changed, 5 insertions(+), 7 
deletions(-) diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index be457ca27e84..66c55da31bcd 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -183,7 +183,8 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): resources.update(gpu_types) elif accelerator == "XPU": # get xpu device num # here we take xpu as gpu, so no need to develop core's scheduling policy - # If we don't want to take xpu as gpu, ray core need to develop new scheduling policy + # If we don't want to take xpu as gpu, + # ray core need to develop new scheduling policy num_gpus, gpu_types = _get_xpu_info(self.num_gpus) resources.update(gpu_types) diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 18132d5b6bc4..1e139c5dbfaf 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -399,8 +399,8 @@ def get_xpu_all_devices(): """Get all xpu device IDS without ONEAPI_DEVICE_SELECTOR filter, But all xpu device still filtered by specific backend and device_type Returns: - devices (List[str]): list of strings representing the numeric index (zero-based), - with sepcific backend and device_type + devices (List[str]): list of strings representing + the numeric index (zero-based), with sepcific backend and device_type """ selector = os.environ.get("ONEAPI_DEVICE_SELECTOR", None) # unset "ONEAPI_DEVICE_SELECTOR" diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index a5196b2b6c06..3e5386c4a4f7 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -8,8 +8,6 @@ import ray import ray.cluster_utils -import ray._private.ray_constants as ray_constants - try: import pytest_timeout except ImportError: diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index 7bd4e1f7e1f5..d07944b37312 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -37,7 +37,6 @@ def get_gpu_ids(num_gpus_per_worker): f0 = ray.remote(num_gpus=0)(lambda: get_gpu_ids(0)) f1 = ray.remote(num_gpus=1)(lambda: get_gpu_ids(1)) f2 = ray.remote(num_gpus=2)(lambda: get_gpu_ids(2)) - f3 = ray.remote(num_gpus=3)(lambda: get_gpu_ids(3)) # Wait for all workers to start up. 
@ray.remote diff --git a/python/ray/tests/test_advanced_6.py b/python/ray/tests/test_advanced_6.py index baea4fc73fd0..b10e930f6883 100644 --- a/python/ray/tests/test_advanced_6.py +++ b/python/ray/tests/test_advanced_6.py @@ -20,7 +20,7 @@ from ray._private.ray_constants import RAY_ONEAPI_DEVICE_BACKEND_TYPE as XPU_BACKEND -from unittest.mock import Mock, MagicMock +from unittest.mock import Mock logger = logging.getLogger(__name__) From 4b8afcd9f84b90289d5654f1717e17863bf58cf4 Mon Sep 17 00:00:00 2001 From: harborn Date: Thu, 17 Aug 2023 13:37:37 +0000 Subject: [PATCH 04/11] fix UT error Signed-off-by: harborn --- python/ray/tests/test_advanced_2.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index d07944b37312..dfe0b9bb3ff9 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -107,6 +107,7 @@ def test(self): def test_cuda_ids(shutdown_only): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "CUDA" num_gpus = 3 ray.init(num_cpus=num_gpus, num_gpus=num_gpus) @@ -803,6 +804,8 @@ def test(self): # TODO: 5 retry attempts may be too little for Travis and we may need to # increase it if this test begins to be flaky on Travis. def test_zero_capacity_deletion_semantics(shutdown_only): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "CUDA" + ray.init(num_cpus=2, num_gpus=1, resources={"test_resource": 1}) def delete_miscellaneous_item(resources): From 44a95fdc97b37be3323594b549387a3471458130 Mon Sep 17 00:00:00 2001 From: harborn Date: Thu, 17 Aug 2023 09:58:19 +0000 Subject: [PATCH 05/11] Support Intel GPU Signed-off-by: harborn --- python/ray/_private/utils.py | 97 ++++++++++++++++++++++++ python/ray/_private/worker.py | 51 +++++++++++++ python/ray/tests/test_actor_resources.py | 2 + python/ray/tests/test_advanced_2.py | 87 +++++++++++++++++++++ 4 files changed, 237 insertions(+) diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 1e139c5dbfaf..e03f5bcb44bd 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -429,6 +429,81 @@ def get_gpu_visible_devices(): return get_cuda_visible_devices() +def get_xpu_devices(): + """Get xpu device IDs by calling `dpctl` api + device with specific backend and device_type + Returns: + devices IDs (List[str]): return the list of string representing + the relative IDs filtered by + ONEAPI_DEVICE_SELECTOR, specific backend and device_type + returned visible IDs start with index 0 + Example: + if ONEAPI_DEVICE_SELECTOR="level_zero:2,3,4" + the device IDs enumerated will be [0,1,2] + same with CUDA_VISIBLE_DEVICES + """ + backend = ray_constants.RAY_ONEAPI_DEVICE_BACKEND_TYPE + device_type = ray_constants.RAY_ONEAPI_DEVICE_TYPE + xpu_ids = [] + try: + import dpctl + + for dev in dpctl.get_devices(backend=backend, device_type=device_type): + # device filter_string with format: "backend:device_type:relative_id" + xpu_ids.append(int(dev.filter_string.split(":")[-1])) + except ImportError: + ValueError("Import dpctl error, maybe dpctl not installed.") + return xpu_ids + + +def get_xpu_visible_devices(): + """Get xpu devices IDs filtered by ONEAPI_DEVICE_SELECTOR environment variable. + Returns: + devices (List[str]): return the list of string representing the relative IDS + filtered by ONEAPI_DEVICE_SELECTOR. 
+ """ + if os.environ.get("ONEAPI_DEVICE_SELECTOR", None) is None: + return None + + xpu_ids = get_xpu_devices() + + return xpu_ids + + +def get_xpu_all_devices(): + """Get all xpu device IDS without ONEAPI_DEVICE_SELECTOR filter, + But all xpu device still filtered by specific backend and device_type + Returns: + devices (List[str]): list of strings representing the numeric index (zero-based), + with sepcific backend and device_type + """ + selector = os.environ.get("ONEAPI_DEVICE_SELECTOR", None) + # unset "ONEAPI_DEVICE_SELECTOR" + os.unsetenv("ONEAPI_DEVICE_SELECTOR") + + xpu_ids = get_xpu_devices() + + # set "ONEAPI_DEVICE_SELECTOR" value back + if selector is not None: + os.environ["ONEAPI_DEVICE_SELECTOR"] = selector + + return xpu_ids + + +def get_current_accelerator(): + return os.environ.get( + "RAY_EXPERIMENTAL_ACCELERATOR_TYPE", ray_constants.RAY_ACCELERATOR_DEFAULT + ) + + +def get_gpu_visible_devices(): + accelerator = get_current_accelerator() + if get_current_accelerator() == "XPU": + return get_xpu_visible_devices() + elif accelerator == "CUDA": + return get_cuda_visible_devices() + + def set_omp_num_threads_if_unset() -> bool: """Set the OMP_NUM_THREADS to default to num cpus assigned to the worker @@ -506,6 +581,28 @@ def set_gpu_visible_devices(device_ids): return set_cuda_visible_devices(device_ids) +def set_xpu_visible_devices(dev_ids): + """Set the ONEAPI_DEVICE_SELECTOR environment variable. + Args: + dev_ids (List[str]): List of strings representing GPU IDs + """ + + if os.environ.get(ray_constants.NOSET_ONEAPI_DEVICE_SELECTOR_ENV_VAR): + return + + backend = ray_constants.RAY_ONEAPI_DEVICE_BACKEND_TYPE + dev_str = ",".join([str(i) for i in dev_ids]) + os.environ["ONEAPI_DEVICE_SELECTOR"] = backend + ":" + dev_str + + +def set_gpu_visible_devices(device_ids): + accelerator = get_current_accelerator() + if accelerator == "XPU": + return set_xpu_visible_devices(device_ids) + elif accelerator == "CUDA": + return set_cuda_visible_devices(device_ids) + + def set_gpu_and_accelerator_runtime_ids() -> None: """Set (CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES, ..) environment variables based on the accelerator runtime. diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 32c3da808f9d..6a1d249e87a5 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -840,6 +840,7 @@ def get_resource_ids_for_resource( ) -> Union[List[str], List[int]]: """Get the resource IDs that are assigned to the given resource. +<<<<<<< HEAD Args: resource_name: The name of the resource. resource_regex: The regex of the resource. @@ -952,6 +953,56 @@ def get_xpu_ids(): return assigned_ids +def get_xpu_ids(): + """Get the IDs of the XPUs that are available to the worker. + + If the ONEAPI_DEVICE_SELECTOR environment variable was set before the worker + started up, then the IDs returned by this method will be a subset of the + IDs in ONEAPI_DEVICE_SELECTOR. If not, the IDs will fall in the range + [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has. + + Returns: + A list of XPU IDs + """ + worker = global_worker + worker.check_connected() + + if worker.mode != WORKER_MODE: + if log_once("worker_get_gpu_ids_empty_from_driver"): + logger.warning( + "`ray.get_xpu_ids()` will always return the empty list when " + "called from the driver. This is because Ray does not manage " + "XPU allocations to the driver process." 
+ ) + + # Get all resources from global core worker + all_resource_ids = global_worker.core_worker.resource_ids() + assigned_ids = set() + for resource, assignment in all_resource_ids.items(): + # Handle both normal and placement group GPU resources. + # Note: We should only get the GPU ids from the placement + # group resource that does not contain the bundle index! + import re + + if resource == "GPU" or re.match(r"^GPU_group_[0-9A-Za-z]+$", resource): + for resource_id, _ in assignment: + assigned_ids.add(resource_id) + assigned_ids = list(assigned_ids) + # If the user had already set ONEAPI_DEVICE_SELECTOR, then respect that (in + # the sense that only GPU IDs that appear in ONEAPI_DEVICE_SELECTOR should be + # returned). + if global_worker.original_gpu_ids is not None: + assigned_ids = [ + global_worker.original_gpu_ids[gpu_id] for gpu_id in assigned_ids + ] + # Give all GPUs in local_mode. + if global_worker.mode == LOCAL_MODE: + max_gpus = global_worker.node.get_resource_spec().num_gpus + assigned_ids = global_worker.original_gpu_ids[:max_gpus] + + return assigned_ids + + @PublicAPI @client_mode_hook def get_gpu_ids(): diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index 3e5386c4a4f7..a5196b2b6c06 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -8,6 +8,8 @@ import ray import ray.cluster_utils +import ray._private.ray_constants as ray_constants + try: import pytest_timeout except ImportError: diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index dfe0b9bb3ff9..3957301b419c 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -24,6 +24,93 @@ def test_xpu_ids(shutdown_only): num_gpus = 3 ray.init(num_cpus=num_gpus, num_gpus=num_gpus) + def get_gpu_ids(num_gpus_per_worker): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == num_gpus_per_worker + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] # noqa + ) + for gpu_id in gpu_ids: + assert gpu_id in range(num_gpus) + return gpu_ids + + f0 = ray.remote(num_gpus=0)(lambda: get_gpu_ids(0)) + f1 = ray.remote(num_gpus=1)(lambda: get_gpu_ids(1)) + f2 = ray.remote(num_gpus=2)(lambda: get_gpu_ids(2)) + f3 = ray.remote(num_gpus=3)(lambda: get_gpu_ids(3)) + + # Wait for all workers to start up. + @ray.remote + def f(): + time.sleep(0.2) + return os.getpid() + + start_time = time.time() + while True: + num_workers_started = len(set(ray.get([f.remote() for _ in range(num_gpus)]))) + if num_workers_started == num_gpus: + break + if time.time() > start_time + 10: + raise RayTestTimeoutException( + "Timed out while waiting for workers to start up." + ) + + list_of_ids = ray.get([f0.remote() for _ in range(10)]) + assert list_of_ids == 10 * [[]] + ray.get([f1.remote() for _ in range(10)]) + ray.get([f2.remote() for _ in range(10)]) + + # Test that actors have ONEAPI_DEVICE_SELECTOR set properly. + + @ray.remote + class Actor0: + def __init__(self): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 0 + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] # noqa + ) + # Set self.x to make sure that we got here. 
+ self.x = 1 + + def test(self): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 0 + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] + ) + return self.x + + @ray.remote(num_gpus=1) + class Actor1: + def __init__(self): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 1 + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] + ) + # Set self.x to make sure that we got here. + self.x = 1 + + def test(self): + gpu_ids = ray.get_gpu_ids() + assert len(gpu_ids) == 1 + assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( + [str(i) for i in gpu_ids] + ) + return self.x + + a0 = Actor0.remote() + ray.get(a0.test.remote()) + + a1 = Actor1.remote() + ray.get(a1.test.remote()) + + +def test_cuda_ids(shutdown_only): + num_gpus = 3 + ray.init(num_cpus=num_gpus, num_gpus=num_gpus) + def get_gpu_ids(num_gpus_per_worker): gpu_ids = ray.get_gpu_ids() assert len(gpu_ids) == num_gpus_per_worker From a8e202d964fbad209a4da5dc7be8ed3e08f29576 Mon Sep 17 00:00:00 2001 From: harborn Date: Thu, 17 Aug 2023 10:36:16 +0000 Subject: [PATCH 06/11] fix format Signed-off-by: harborn --- python/ray/_private/utils.py | 4 ++-- python/ray/tests/test_actor_resources.py | 2 -- python/ray/tests/test_advanced_2.py | 1 - 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index e03f5bcb44bd..8ab98c908d85 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -474,8 +474,8 @@ def get_xpu_all_devices(): """Get all xpu device IDS without ONEAPI_DEVICE_SELECTOR filter, But all xpu device still filtered by specific backend and device_type Returns: - devices (List[str]): list of strings representing the numeric index (zero-based), - with sepcific backend and device_type + devices (List[str]): list of strings representing + the numeric index (zero-based), with sepcific backend and device_type """ selector = os.environ.get("ONEAPI_DEVICE_SELECTOR", None) # unset "ONEAPI_DEVICE_SELECTOR" diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index a5196b2b6c06..3e5386c4a4f7 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -8,8 +8,6 @@ import ray import ray.cluster_utils -import ray._private.ray_constants as ray_constants - try: import pytest_timeout except ImportError: diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index 3957301b419c..d1aea7ee115c 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -37,7 +37,6 @@ def get_gpu_ids(num_gpus_per_worker): f0 = ray.remote(num_gpus=0)(lambda: get_gpu_ids(0)) f1 = ray.remote(num_gpus=1)(lambda: get_gpu_ids(1)) f2 = ray.remote(num_gpus=2)(lambda: get_gpu_ids(2)) - f3 = ray.remote(num_gpus=3)(lambda: get_gpu_ids(3)) # Wait for all workers to start up. 
@ray.remote From a6e22a7e5c81756c660d595f743e20dd139d5bd7 Mon Sep 17 00:00:00 2001 From: harborn Date: Thu, 17 Aug 2023 13:37:37 +0000 Subject: [PATCH 07/11] fix UT error Signed-off-by: harborn --- python/ray/tests/test_advanced_2.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index d1aea7ee115c..e875c3d1ecf1 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -107,6 +107,7 @@ def test(self): def test_cuda_ids(shutdown_only): + os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "CUDA" num_gpus = 3 ray.init(num_cpus=num_gpus, num_gpus=num_gpus) From cb24f353ec6f52684cd9c101c2b39842479fa913 Mon Sep 17 00:00:00 2001 From: harborn Date: Fri, 18 Aug 2023 08:44:56 +0000 Subject: [PATCH 08/11] fix conflicts after rabase master Signed-off-by: harborn --- python/ray/_private/resource_spec.py | 6 +- python/ray/_private/utils.py | 109 ++------------------------- python/ray/_private/worker.py | 58 ++------------ python/ray/tests/conftest.py | 3 + python/ray/tests/test_advanced_2.py | 90 ---------------------- python/ray/tests/test_advanced_6.py | 9 ++- 6 files changed, 23 insertions(+), 252 deletions(-) diff --git a/python/ray/_private/resource_spec.py b/python/ray/_private/resource_spec.py index 66c55da31bcd..bb4352c18f94 100644 --- a/python/ray/_private/resource_spec.py +++ b/python/ray/_private/resource_spec.py @@ -177,11 +177,11 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None): num_cpus = ray._private.utils.get_num_cpus() # Get accelerate device info - accelerator = ray._private.utils.get_current_accelerator() - if accelerator == "CUDA": # get cuda device num + acc_type = ray._private.utils.get_current_accelerator() + if acc_type == "CUDA": # get cuda device num num_gpus, gpu_types = _get_cuda_info(self.num_gpus) resources.update(gpu_types) - elif accelerator == "XPU": # get xpu device num + elif acc_type == "XPU": # get xpu device num # here we take xpu as gpu, so no need to develop core's scheduling policy # If we don't want to take xpu as gpu, # ray core need to develop new scheduling policy diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index 8ab98c908d85..7cd90c2c09ca 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -290,7 +290,7 @@ def get_gpu_and_accelerator_runtime_ids() -> Mapping[str, Optional[List[str]]]: corresponding key. 
""" return { - ray_constants.GPU: get_cuda_visible_devices(), + ray_constants.GPU: get_gpu_visible_devices(), ray_constants.NEURON_CORES: get_aws_neuron_core_visible_ids(), } @@ -354,81 +354,6 @@ def _get_visible_ids(env_var: str) -> Optional[List[str]]: last_set_neuron_core_ids = None -def get_xpu_devices(): - """Get xpu device IDs by calling `dpctl` api - device with specific backend and device_type - Returns: - devices IDs (List[str]): return the list of string representing - the relative IDs filtered by - ONEAPI_DEVICE_SELECTOR, specific backend and device_type - returned visible IDs start with index 0 - Example: - if ONEAPI_DEVICE_SELECTOR="level_zero:2,3,4" - the device IDs enumerated will be [0,1,2] - same with CUDA_VISIBLE_DEVICES - """ - backend = ray_constants.RAY_ONEAPI_DEVICE_BACKEND_TYPE - device_type = ray_constants.RAY_ONEAPI_DEVICE_TYPE - xpu_ids = [] - try: - import dpctl - - for dev in dpctl.get_devices(backend=backend, device_type=device_type): - # device filter_string with format: "backend:device_type:relative_id" - xpu_ids.append(int(dev.filter_string.split(":")[-1])) - except ImportError: - ValueError("Import dpctl error, maybe dpctl not installed.") - return xpu_ids - - -def get_xpu_visible_devices(): - """Get xpu devices IDs filtered by ONEAPI_DEVICE_SELECTOR environment variable. - Returns: - devices (List[str]): return the list of string representing the relative IDS - filtered by ONEAPI_DEVICE_SELECTOR. - """ - if os.environ.get("ONEAPI_DEVICE_SELECTOR", None) is None: - return None - - xpu_ids = get_xpu_devices() - - return xpu_ids - - -def get_xpu_all_devices(): - """Get all xpu device IDS without ONEAPI_DEVICE_SELECTOR filter, - But all xpu device still filtered by specific backend and device_type - Returns: - devices (List[str]): list of strings representing - the numeric index (zero-based), with sepcific backend and device_type - """ - selector = os.environ.get("ONEAPI_DEVICE_SELECTOR", None) - # unset "ONEAPI_DEVICE_SELECTOR" - os.unsetenv("ONEAPI_DEVICE_SELECTOR") - - xpu_ids = get_xpu_devices() - - # set "ONEAPI_DEVICE_SELECTOR" value back - if selector is not None: - os.environ["ONEAPI_DEVICE_SELECTOR"] = selector - - return xpu_ids - - -def get_current_accelerator(): - return os.environ.get( - "RAY_EXPERIMENTAL_ACCELERATOR_TYPE", ray_constants.RAY_ACCELERATOR_DEFAULT - ) - - -def get_gpu_visible_devices(): - accelerator = get_current_accelerator() - if get_current_accelerator() == "XPU": - return get_xpu_visible_devices() - elif accelerator == "CUDA": - return get_cuda_visible_devices() - - def get_xpu_devices(): """Get xpu device IDs by calling `dpctl` api device with specific backend and device_type @@ -544,7 +469,7 @@ def set_omp_num_threads_if_unset() -> bool: return True -def set_cuda_visible_devices(gpu_ids: List[str]): +def set_cuda_visible_devices(dev_ids: List[str]): """Set the CUDA_VISIBLE_DEVICES environment variable. Args: @@ -555,33 +480,11 @@ def set_cuda_visible_devices(gpu_ids: List[str]): global last_set_gpu_ids if last_set_gpu_ids == dev_ids: return # optimization: already set - _set_visible_ids(gpu_ids, ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR) - last_set_gpu_ids = gpu_ids - - -def set_xpu_visible_devices(dev_ids): - """Set the ONEAPI_DEVICE_SELECTOR environment variable. 
- Args: - dev_ids (List[str]): List of strings representing GPU IDs - """ - - if os.environ.get(ray_constants.NOSET_ONEAPI_DEVICE_SELECTOR_ENV_VAR): - return - - backend = ray_constants.RAY_ONEAPI_DEVICE_BACKEND_TYPE - dev_str = ",".join([str(i) for i in dev_ids]) - os.environ["ONEAPI_DEVICE_SELECTOR"] = backend + ":" + dev_str - - -def set_gpu_visible_devices(device_ids): - accelerator = get_current_accelerator() - if accelerator == "XPU": - return set_xpu_visible_devices(device_ids) - elif accelerator == "CUDA": - return set_cuda_visible_devices(device_ids) + _set_visible_ids(dev_ids, ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR) + last_set_gpu_ids = dev_ids -def set_xpu_visible_devices(dev_ids): +def set_xpu_visible_devices(dev_ids: List[str]): """Set the ONEAPI_DEVICE_SELECTOR environment variable. Args: dev_ids (List[str]): List of strings representing GPU IDs @@ -612,7 +515,7 @@ def set_gpu_and_accelerator_runtime_ids() -> None: environment variable. """ ids = ray.get_runtime_context().get_resource_ids() - set_cuda_visible_devices(ids[ray_constants.GPU]) + set_gpu_visible_devices(ids[ray_constants.GPU]) set_aws_neuron_core_visible_ids(ids[ray_constants.NEURON_CORES]) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 6a1d249e87a5..d9d19ae679ab 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -840,7 +840,6 @@ def get_resource_ids_for_resource( ) -> Union[List[str], List[int]]: """Get the resource IDs that are assigned to the given resource. -<<<<<<< HEAD Args: resource_name: The name of the resource. resource_regex: The regex of the resource. @@ -861,7 +860,6 @@ def get_resource_ids_for_resource( if resource == resource_name or re.match(resource_regex, resource): for resource_id, _ in assignment: assigned_ids.add(resource_id) - # If the user had already set the environment variables # (CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES, ..) then respect that # in the sense that only IDs that appear in (CUDA_VISIBLE_DEVICES, @@ -917,6 +915,10 @@ def get_xpu_ids(): worker = global_worker worker.check_connected() + return worker.get_resource_ids_for_resource( + ray_constants.GPU, f"^{ray_constants.GPU}_group_[0-9A-Za-z]+$" + ) +""" if worker.mode != WORKER_MODE: if log_once("worker_get_gpu_ids_empty_from_driver"): logger.warning( @@ -951,57 +953,7 @@ def get_xpu_ids(): assigned_ids = global_worker.original_gpu_ids[:max_gpus] return assigned_ids - - -def get_xpu_ids(): - """Get the IDs of the XPUs that are available to the worker. - - If the ONEAPI_DEVICE_SELECTOR environment variable was set before the worker - started up, then the IDs returned by this method will be a subset of the - IDs in ONEAPI_DEVICE_SELECTOR. If not, the IDs will fall in the range - [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has. - - Returns: - A list of XPU IDs - """ - worker = global_worker - worker.check_connected() - - if worker.mode != WORKER_MODE: - if log_once("worker_get_gpu_ids_empty_from_driver"): - logger.warning( - "`ray.get_xpu_ids()` will always return the empty list when " - "called from the driver. This is because Ray does not manage " - "XPU allocations to the driver process." - ) - - # Get all resources from global core worker - all_resource_ids = global_worker.core_worker.resource_ids() - assigned_ids = set() - for resource, assignment in all_resource_ids.items(): - # Handle both normal and placement group GPU resources. 
- # Note: We should only get the GPU ids from the placement - # group resource that does not contain the bundle index! - import re - - if resource == "GPU" or re.match(r"^GPU_group_[0-9A-Za-z]+$", resource): - for resource_id, _ in assignment: - assigned_ids.add(resource_id) - assigned_ids = list(assigned_ids) - # If the user had already set ONEAPI_DEVICE_SELECTOR, then respect that (in - # the sense that only GPU IDs that appear in ONEAPI_DEVICE_SELECTOR should be - # returned). - if global_worker.original_gpu_ids is not None: - assigned_ids = [ - global_worker.original_gpu_ids[gpu_id] for gpu_id in assigned_ids - ] - # Give all GPUs in local_mode. - if global_worker.mode == LOCAL_MODE: - max_gpus = global_worker.node.get_resource_spec().num_gpus - assigned_ids = global_worker.original_gpu_ids[:max_gpus] - - return assigned_ids - +""" @PublicAPI @client_mode_hook diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 3773b0981380..c34c3080344c 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -332,6 +332,9 @@ def shutdown_only(maybe_external_redis): # Delete the cluster address just in case. ray._private.utils.reset_ray_address() + if "RAY_EXPERIMENTAL_ACCELERATOR_TYPE" in os.environ: + del os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] + @pytest.fixture def propagate_logs(): diff --git a/python/ray/tests/test_advanced_2.py b/python/ray/tests/test_advanced_2.py index e875c3d1ecf1..d07944b37312 100644 --- a/python/ray/tests/test_advanced_2.py +++ b/python/ray/tests/test_advanced_2.py @@ -107,94 +107,6 @@ def test(self): def test_cuda_ids(shutdown_only): - os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "CUDA" - num_gpus = 3 - ray.init(num_cpus=num_gpus, num_gpus=num_gpus) - - def get_gpu_ids(num_gpus_per_worker): - gpu_ids = ray.get_gpu_ids() - assert len(gpu_ids) == num_gpus_per_worker - assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( - [str(i) for i in gpu_ids] # noqa - ) - for gpu_id in gpu_ids: - assert gpu_id in range(num_gpus) - return gpu_ids - - f0 = ray.remote(num_gpus=0)(lambda: get_gpu_ids(0)) - f1 = ray.remote(num_gpus=1)(lambda: get_gpu_ids(1)) - f2 = ray.remote(num_gpus=2)(lambda: get_gpu_ids(2)) - - # Wait for all workers to start up. - @ray.remote - def f(): - time.sleep(0.2) - return os.getpid() - - start_time = time.time() - while True: - num_workers_started = len(set(ray.get([f.remote() for _ in range(num_gpus)]))) - if num_workers_started == num_gpus: - break - if time.time() > start_time + 10: - raise RayTestTimeoutException( - "Timed out while waiting for workers to start up." - ) - - list_of_ids = ray.get([f0.remote() for _ in range(10)]) - assert list_of_ids == 10 * [[]] - ray.get([f1.remote() for _ in range(10)]) - ray.get([f2.remote() for _ in range(10)]) - - # Test that actors have ONEAPI_DEVICE_SELECTOR set properly. - - @ray.remote - class Actor0: - def __init__(self): - gpu_ids = ray.get_gpu_ids() - assert len(gpu_ids) == 0 - assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( - [str(i) for i in gpu_ids] # noqa - ) - # Set self.x to make sure that we got here. 
- self.x = 1 - - def test(self): - gpu_ids = ray.get_gpu_ids() - assert len(gpu_ids) == 0 - assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( - [str(i) for i in gpu_ids] - ) - return self.x - - @ray.remote(num_gpus=1) - class Actor1: - def __init__(self): - gpu_ids = ray.get_gpu_ids() - assert len(gpu_ids) == 1 - assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( - [str(i) for i in gpu_ids] - ) - # Set self.x to make sure that we got here. - self.x = 1 - - def test(self): - gpu_ids = ray.get_gpu_ids() - assert len(gpu_ids) == 1 - assert os.environ["ONEAPI_DEVICE_SELECTOR"] == XPU_BACKEND + ":" + ",".join( - [str(i) for i in gpu_ids] - ) - return self.x - - a0 = Actor0.remote() - ray.get(a0.test.remote()) - - a1 = Actor1.remote() - ray.get(a1.test.remote()) - - -def test_cuda_ids(shutdown_only): - os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "CUDA" num_gpus = 3 ray.init(num_cpus=num_gpus, num_gpus=num_gpus) @@ -891,8 +803,6 @@ def test(self): # TODO: 5 retry attempts may be too little for Travis and we may need to # increase it if this test begins to be flaky on Travis. def test_zero_capacity_deletion_semantics(shutdown_only): - os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "CUDA" - ray.init(num_cpus=2, num_gpus=1, resources={"test_resource": 1}) def delete_miscellaneous_item(resources): diff --git a/python/ray/tests/test_advanced_6.py b/python/ray/tests/test_advanced_6.py index b10e930f6883..1f64466c0581 100644 --- a/python/ray/tests/test_advanced_6.py +++ b/python/ray/tests/test_advanced_6.py @@ -41,6 +41,8 @@ def save_gpu_ids_shutdown_only(): else: del os.environ["CUDA_VISIBLE_DEVICES"] + if "RAY_EXPERIMENTAL_ACCELERATOR_TYPE" in os.environ: + del os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] @pytest.fixture def save_xpu_ids_shutdown_only(): @@ -58,6 +60,9 @@ def save_xpu_ids_shutdown_only(): else: del os.environ["ONEAPI_DEVICE_SELECTOR"] + if "RAY_EXPERIMENTAL_ACCELERATOR_TYPE" in os.environ: + del os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] + @pytest.mark.skipif(platform.system() == "Windows", reason="Hangs on Windows") def test_specific_cudas(save_gpu_ids_shutdown_only): @@ -107,9 +112,7 @@ def f(): def test_local_mode_xpus(save_xpu_ids_shutdown_only): os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "XPU" allowed_xpu_ids = [0, 1, 2, 3, 4, 5] - os.environ["ONEAPI_DEVICE_SELECTOR"] = ( - XPU_BACKEND + ":" + ",".join([str(i) for i in allowed_xpu_ids]) - ) + os.environ["ONEAPI_DEVICE_SELECTOR"] = XPU_BACKEND + ":" + ",".join([str(i) for i in allowed_xpu_ids]) ray._private.utils.get_xpu_devices = Mock(return_value=allowed_xpu_ids) from importlib import reload From 548cd35d1300f507f0fd1f4e67543df8ce9e64a1 Mon Sep 17 00:00:00 2001 From: harborn Date: Fri, 18 Aug 2023 08:48:12 +0000 Subject: [PATCH 09/11] remove unused codes Signed-off-by: harborn --- python/ray/_private/worker.py | 35 ----------------------------------- 1 file changed, 35 deletions(-) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index d9d19ae679ab..c3e72d441e3d 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -918,42 +918,7 @@ def get_xpu_ids(): return worker.get_resource_ids_for_resource( ray_constants.GPU, f"^{ray_constants.GPU}_group_[0-9A-Za-z]+$" ) -""" - if worker.mode != WORKER_MODE: - if log_once("worker_get_gpu_ids_empty_from_driver"): - logger.warning( - "`ray.get_xpu_ids()` will always return the empty list when " - "called from the driver. 
This is because Ray does not manage " - "XPU allocations to the driver process." - ) - # Get all resources from global core worker - all_resource_ids = global_worker.core_worker.resource_ids() - assigned_ids = set() - for resource, assignment in all_resource_ids.items(): - # Handle both normal and placement group GPU resources. - # Note: We should only get the GPU ids from the placement - # group resource that does not contain the bundle index! - import re - - if resource == "GPU" or re.match(r"^GPU_group_[0-9A-Za-z]+$", resource): - for resource_id, _ in assignment: - assigned_ids.add(resource_id) - assigned_ids = list(assigned_ids) - # If the user had already set ONEAPI_DEVICE_SELECTOR, then respect that (in - # the sense that only GPU IDs that appear in ONEAPI_DEVICE_SELECTOR should be - # returned). - if global_worker.original_gpu_ids is not None: - assigned_ids = [ - global_worker.original_gpu_ids[gpu_id] for gpu_id in assigned_ids - ] - # Give all GPUs in local_mode. - if global_worker.mode == LOCAL_MODE: - max_gpus = global_worker.node.get_resource_spec().num_gpus - assigned_ids = global_worker.original_gpu_ids[:max_gpus] - - return assigned_ids -""" @PublicAPI @client_mode_hook From 314d44b241a808e2863f1068db7c06f2f8b4b727 Mon Sep 17 00:00:00 2001 From: Abhilash Majumder <30946547+abhilash1910@users.noreply.github.com> Date: Fri, 18 Aug 2023 17:13:22 +0530 Subject: [PATCH 10/11] cleanup Signed-off-by: Abhilash Majumder <30946547+abhilash1910@users.noreply.github.com> --- python/ray/_private/worker.py | 39 ++++++++--------------------------- 1 file changed, 9 insertions(+), 30 deletions(-) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index c3e72d441e3d..764866b410b1 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -883,16 +883,16 @@ def get_resource_ids_for_resource( return list(assigned_ids) -def get_cuda_ids(): - """Get the IDs of the GPUs that are available to the worker. +def get_gpu_device_ids(): + """Get the IDs of the GPUs or XPUs that are available to the worker. - If the CUDA_VISIBLE_DEVICES environment variable was set when the worker - started up, then the IDs returned by this method will be a subset of the - IDs in CUDA_VISIBLE_DEVICES. If not, the IDs will fall in the range - [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has. + If the CUDA_VISIBLE_DEVICES or ONEAPI_DEVICE_SELECTOR environment variable was set when + the worker started up, then the IDs returned by this method will be a subset of the + IDs in CUDA_VISIBLE_DEVICES or ONEAPI_DEVICE_SELECTOR. If not, the IDs will fall in the + range [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs or XPUs that the node has. Returns: - A list of GPU IDs. + A list of GPU or XPU IDs. """ worker = global_worker worker.check_connected() @@ -901,33 +901,12 @@ def get_cuda_ids(): ) -def get_xpu_ids(): - """Get the IDs of the XPUs that are available to the worker. - - If the ONEAPI_DEVICE_SELECTOR environment variable was set before the worker - started up, then the IDs returned by this method will be a subset of the - IDs in ONEAPI_DEVICE_SELECTOR. If not, the IDs will fall in the range - [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has. 
- - Returns: - A list of XPU IDs - """ - worker = global_worker - worker.check_connected() - - return worker.get_resource_ids_for_resource( - ray_constants.GPU, f"^{ray_constants.GPU}_group_[0-9A-Za-z]+$" - ) - - @PublicAPI @client_mode_hook def get_gpu_ids(): accelerator = ray._private.utils.get_current_accelerator() - if accelerator == "CUDA": - return get_cuda_ids() - elif accelerator == "XPU": - return get_xpu_ids() + if accelerator in ["CUDA","XPU"]: + return get_gpu_device_ids() return [] From 851af30dc7dcba9f4284892f6970ccd03ba6a5f4 Mon Sep 17 00:00:00 2001 From: Abhilash Majumder <30946547+abhilash1910@users.noreply.github.com> Date: Fri, 18 Aug 2023 17:19:32 +0530 Subject: [PATCH 11/11] align Signed-off-by: Abhilash Majumder <30946547+abhilash1910@users.noreply.github.com> --- python/ray/tests/test_basic.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 32b34523e978..c6c2d8b31fc5 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -31,16 +31,16 @@ def test_http_proxy(start_http_proxy, shutdown_only): # run driver as a separate process to make sure the correct config value # is initialized. script = """ -import ray - -ray.init(num_cpus=1) - -@ray.remote -def f(): - return 1 - -assert ray.get(f.remote()) == 1 -""" + import ray + + ray.init(num_cpus=1) + + @ray.remote + def f(): + return 1 + + assert ray.get(f.remote()) == 1 + """ env = start_http_proxy run_string_as_driver(script, dict(os.environ, **env))
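A minimal usage sketch of the XPU support this series adds, assuming the patches are applied as written: the environment variable names (RAY_EXPERIMENTAL_ACCELERATOR_TYPE, ONEAPI_DEVICE_SELECTOR) and the behaviour of ray.get_gpu_ids() come from the patches themselves, while the selector value "level_zero:0,1" and the two-device sizing are made-up example values that assume a machine with Intel GPUs, the Level Zero runtime, and dpctl installed.

import os
import ray

# Opt in to the experimental XPU (Intel GPU) accelerator path added by this series.
os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "XPU"
# Restrict Ray to two Level Zero devices; this mirrors CUDA_VISIBLE_DEVICES semantics.
os.environ["ONEAPI_DEVICE_SELECTOR"] = "level_zero:0,1"

# XPUs are surfaced through the existing GPU resource, so num_gpus still drives scheduling.
ray.init(num_gpus=2)


@ray.remote(num_gpus=1)
def which_device():
    # With the patches applied, ray.get_gpu_ids() returns the XPU indices assigned to this
    # worker, and ONEAPI_DEVICE_SELECTOR is rewritten to cover only those assigned devices.
    return ray.get_gpu_ids(), os.environ.get("ONEAPI_DEVICE_SELECTOR")


print(ray.get(which_device.remote()))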