10 changes: 10 additions & 0 deletions python/ray/_private/ray_constants.py
@@ -400,6 +400,7 @@ def env_set_by_user(key):
"RAY_EXPERIMENTAL_NOSET_NEURON_RT_VISIBLE_CORES"
)
NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES"

CUDA_VISIBLE_DEVICES_ENV_VAR = "CUDA_VISIBLE_DEVICES"
NEURON_RT_VISIBLE_CORES_ENV_VAR = "NEURON_RT_VISIBLE_CORES"
NEURON_CORES = "neuron_cores"
@@ -416,6 +417,15 @@ def env_set_by_user(key):
"inf2.24xlarge": 12,
"inf2.48xlarge": 24,
}

NOSET_ONEAPI_DEVICE_SELECTOR_ENV_VAR = "RAY_EXPERIMENTAL_NOSET_ONEAPI_DEVICE_SELECTOR"

RAY_ONEAPI_DEVICE_BACKEND_TYPE = "level_zero"
RAY_ONEAPI_DEVICE_TYPE = "gpu"

RAY_DEVICE_SUPPORT_TYPES = {"CPU", "CUDA", "XPU"}
RAY_ACCELERATOR_DEFAULT = "CUDA"

RAY_WORKER_NICENESS = "RAY_worker_niceness"

# Default max_retries option in @ray.remote for non-actor
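As a rough usage note (illustrative, not part of the diff): the new constants wire up an opt-in XPU path. A minimal sketch, assuming the RAY_EXPERIMENTAL_ACCELERATOR_TYPE variable and the selector format used later in this PR:

import os

# Opt a Ray process into the XPU code path (the default accelerator stays "CUDA").
os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "XPU"

# The selector string Ray reads/writes has the form "<backend>:<id,id,...>",
# built from RAY_ONEAPI_DEVICE_BACKEND_TYPE, e.g. the first two Level Zero GPUs:
os.environ["ONEAPI_DEVICE_SELECTOR"] = "level_zero:0,1"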
101 changes: 77 additions & 24 deletions python/ray/_private/resource_spec.py
@@ -171,35 +171,22 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
if is_head:
resources[HEAD_NODE_RESOURCE_NAME] = 1.0

# Get the number of CPUs.
num_cpus = self.num_cpus
if num_cpus is None:
num_cpus = ray._private.utils.get_num_cpus()

num_gpus = self.num_gpus
gpu_ids = ray._private.utils.get_cuda_visible_devices()
# Check that the number of GPUs that the raylet wants doesn't
# exceed the amount allowed by CUDA_VISIBLE_DEVICES.
if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids):
raise ValueError(
"Attempting to start raylet with {} GPUs, "
"but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
)
if num_gpus is None:
# Try to automatically detect the number of GPUs.
num_gpus = _autodetect_num_gpus()
# Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
if gpu_ids is not None:
num_gpus = min(num_gpus, len(gpu_ids))

try:
if importlib.util.find_spec("GPUtil") is not None:
gpu_types = _get_gpu_types_gputil()
else:
info_string = _get_gpu_info_string()
gpu_types = _constraints_from_gpu_info(info_string)
# Get accelerator device info.
acc_type = ray._private.utils.get_current_accelerator()
if acc_type == "CUDA":  # Get the CUDA device count and types.
num_gpus, gpu_types = _get_cuda_info(self.num_gpus)
resources.update(gpu_types)
elif acc_type == "XPU":  # Get the XPU device count and types.
# XPUs are exposed as GPUs so that Ray core's existing GPU scheduling
# policy applies; exposing them as a separate resource would require
# a new scheduling policy in Ray core.
num_gpus, gpu_types = _get_xpu_info(self.num_gpus)
resources.update(gpu_types)
except Exception:
logger.exception("Could not parse gpu information.")

accelerator.update_resources_with_accelerator_type(resources)

@@ -281,6 +268,72 @@ def resolve(self, is_head: bool, node_ip_address: Optional[str] = None):
return spec


def _get_cuda_info(num_gpus):
"""Attempt to determine the number and type of CUDA GPUs.

Note:
If no GPU IDs are specified in CUDA_VISIBLE_DEVICES but num_gpus
is set on the task or actor, this function returns the input
num_gpus rather than 0.

Returns:
(num_gpus, gpu_types)
"""
gpu_ids = ray._private.utils.get_cuda_visible_devices()
# Check that the number of GPUs that the raylet wants doesn't
# exceed the amount allowed by CUDA_VISIBLE_DEVICES.
if num_gpus is not None and gpu_ids is not None and num_gpus > len(gpu_ids):
raise ValueError(
"Attempting to start raylet with {} GPUs, "
"but CUDA_VISIBLE_DEVICES contains {}.".format(num_gpus, gpu_ids)
)
if num_gpus is None:
# Try to automatically detect the number of GPUs.
num_gpus = _autodetect_num_gpus()
# Don't use more GPUs than allowed by CUDA_VISIBLE_DEVICES.
if gpu_ids is not None:
num_gpus = min(num_gpus, len(gpu_ids))

gpu_types = {}
try:
if importlib.util.find_spec("GPUtil") is not None:
gpu_types = _get_gpu_types_gputil()
else:
info_string = _get_gpu_info_string()
gpu_types = _constraints_from_gpu_info(info_string)
except Exception:
logger.exception("Could not parse gpu information.")

return num_gpus, gpu_types


def _get_xpu_info(num_xpus):
"""Attempt to determine the number of XPUs.

Note:
If no XPU IDs are specified in ONEAPI_DEVICE_SELECTOR but num_gpus
is set for the node, task, or actor, this function returns that
input value rather than 0.

Returns:
(num_xpus, xpu_types)
"""
# Get the XPU IDs visible through ONEAPI_DEVICE_SELECTOR.
xpu_ids = ray._private.utils.get_xpu_visible_devices()
if num_xpus is not None and xpu_ids is not None and num_xpus > len(xpu_ids):
raise ValueError(
"Attempting to start raylet with {} XPUs, "
"but ONEAPI_DEVICE_SELECTOR contains {}.".format(num_xpus, xpu_ids)
)
if num_xpus is None:
# Try to automatically detect the total number of XPUs.
num_xpus = len(ray._private.utils.get_xpu_all_devices())
# Don't use more XPUs than allowed by ONEAPI_DEVICE_SELECTOR.
if xpu_ids is not None:
num_xpus = min(num_xpus, len(xpu_ids))

xpu_types = {f"{ray_constants.RESOURCE_CONSTRAINT_PREFIX}xpu": 1}
return num_xpus, xpu_types


def _autodetect_num_gpus():
"""Attempt to detect the number of GPUs on this machine.

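A hedged end-to-end sketch of what the new resolution path is meant to produce (illustrative only; assumes dpctl and an XPU runtime are installed so autodetection finds devices):

import os
import ray

os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "XPU"
os.environ["ONEAPI_DEVICE_SELECTOR"] = "level_zero:0,1"  # expose two devices

# With num_gpus unset, resolve() calls _get_xpu_info(None): the XPU count is
# autodetected via dpctl and capped at the two selector-visible IDs, and the
# node also advertises an accelerator-type resource constraint for XPU.
ray.init()
print(ray.cluster_resources().get("GPU"))  # expected 2.0 on such a machine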
111 changes: 104 additions & 7 deletions python/ray/_private/utils.py
@@ -290,7 +290,7 @@ def get_gpu_and_accelerator_runtime_ids() -> Mapping[str, Optional[List[str]]]:
corresponding key.
"""
return {
ray_constants.GPU: get_cuda_visible_devices(),
ray_constants.GPU: get_gpu_visible_devices(),
ray_constants.NEURON_CORES: get_aws_neuron_core_visible_ids(),
}

@@ -354,6 +354,81 @@ def _get_visible_ids(env_var: str) -> Optional[List[str]]:
last_set_neuron_core_ids = None


def get_xpu_devices():
"""Get XPU device IDs by calling the `dpctl` API for devices with the
configured backend and device type.

Returns:
device IDs (List[int]): the relative device IDs, filtered by
ONEAPI_DEVICE_SELECTOR and by the configured backend and
device type. The visible IDs are re-enumerated starting at 0.

Example:
If ONEAPI_DEVICE_SELECTOR="level_zero:2,3,4", the enumerated
device IDs will be [0, 1, 2], analogous to CUDA_VISIBLE_DEVICES.
"""
backend = ray_constants.RAY_ONEAPI_DEVICE_BACKEND_TYPE
device_type = ray_constants.RAY_ONEAPI_DEVICE_TYPE
xpu_ids = []
try:
import dpctl

for dev in dpctl.get_devices(backend=backend, device_type=device_type):
# device filter_string with format: "backend:device_type:relative_id"
xpu_ids.append(int(dev.filter_string.split(":")[-1]))
except ImportError:
raise ValueError("Failed to import dpctl; it may not be installed.")
return xpu_ids


def get_xpu_visible_devices():
"""Get the XPU device IDs filtered by the ONEAPI_DEVICE_SELECTOR environment variable.

Returns:
devices (List[int]): the relative device IDs filtered by
ONEAPI_DEVICE_SELECTOR, or None if the variable is not set.
"""
if os.environ.get("ONEAPI_DEVICE_SELECTOR", None) is None:
return None

xpu_ids = get_xpu_devices()

return xpu_ids


def get_xpu_all_devices():
"""Get the IDs of all XPU devices, ignoring the ONEAPI_DEVICE_SELECTOR filter
but still filtering by the configured backend and device type.

Returns:
devices (List[int]): zero-based device indexes for the configured
backend and device type.
"""
selector = os.environ.get("ONEAPI_DEVICE_SELECTOR", None)
# Temporarily remove ONEAPI_DEVICE_SELECTOR so dpctl enumerates every device.
os.environ.pop("ONEAPI_DEVICE_SELECTOR", None)

xpu_ids = get_xpu_devices()

# Restore the original ONEAPI_DEVICE_SELECTOR value.
if selector is not None:
os.environ["ONEAPI_DEVICE_SELECTOR"] = selector

return xpu_ids


def get_current_accelerator():
return os.environ.get(
"RAY_EXPERIMENTAL_ACCELERATOR_TYPE", ray_constants.RAY_ACCELERATOR_DEFAULT
)


def get_gpu_visible_devices():
accelerator = get_current_accelerator()
if accelerator == "XPU":
return get_xpu_visible_devices()
elif accelerator == "CUDA":
return get_cuda_visible_devices()


def set_omp_num_threads_if_unset() -> bool:
"""Set the OMP_NUM_THREADS to default to num cpus assigned to the worker

@@ -394,19 +469,41 @@ def set_omp_num_threads_if_unset() -> bool:
return True


def set_cuda_visible_devices(gpu_ids: List[str]):
def set_cuda_visible_devices(dev_ids: List[str]):
"""Set the CUDA_VISIBLE_DEVICES environment variable.

Args:
gpu_ids (List[str]): List of strings representing GPU IDs.
dev_ids (List[str]): List of strings representing GPU IDs.
"""
if os.environ.get(ray_constants.NOSET_CUDA_VISIBLE_DEVICES_ENV_VAR):
return
global last_set_gpu_ids
if last_set_gpu_ids == gpu_ids:
if last_set_gpu_ids == dev_ids:
return # optimization: already set
_set_visible_ids(gpu_ids, ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR)
last_set_gpu_ids = gpu_ids
_set_visible_ids(dev_ids, ray_constants.CUDA_VISIBLE_DEVICES_ENV_VAR)
last_set_gpu_ids = dev_ids


def set_xpu_visible_devices(dev_ids: List[str]):
"""Set the ONEAPI_DEVICE_SELECTOR environment variable.

Args:
dev_ids (List[str]): List of strings representing XPU IDs.
"""

if os.environ.get(ray_constants.NOSET_ONEAPI_DEVICE_SELECTOR_ENV_VAR):
return

backend = ray_constants.RAY_ONEAPI_DEVICE_BACKEND_TYPE
dev_str = ",".join([str(i) for i in dev_ids])
os.environ["ONEAPI_DEVICE_SELECTOR"] = backend + ":" + dev_str


def set_gpu_visible_devices(device_ids):
accelerator = get_current_accelerator()
if accelerator == "XPU":
return set_xpu_visible_devices(device_ids)
elif accelerator == "CUDA":
return set_cuda_visible_devices(device_ids)


def set_gpu_and_accelerator_runtime_ids() -> None:
@@ -418,7 +515,7 @@ def set_gpu_and_accelerator_runtime_ids() -> None:
environment variable.
"""
ids = ray.get_runtime_context().get_resource_ids()
set_cuda_visible_devices(ids[ray_constants.GPU])
set_gpu_visible_devices(ids[ray_constants.GPU])
set_aws_neuron_core_visible_ids(ids[ray_constants.NEURON_CORES])


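For orientation, a small round trip through the new helpers (a sketch, assuming dpctl is installed and at least two Level Zero GPUs are present; not part of the diff):

import os
from ray._private import utils

os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "XPU"
os.environ["ONEAPI_DEVICE_SELECTOR"] = "level_zero:0,1"

ids = utils.get_gpu_visible_devices()  # dispatches to get_xpu_visible_devices() -> [0, 1]
utils.set_gpu_visible_devices(ids)     # rewrites ONEAPI_DEVICE_SELECTOR as "level_zero:0,1"
print(os.environ["ONEAPI_DEVICE_SELECTOR"])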
28 changes: 17 additions & 11 deletions python/ray/_private/worker.py
@@ -430,6 +430,7 @@ def __init__(self):
self.original_gpu_and_accelerator_runtime_ids = (
ray._private.utils.get_gpu_and_accelerator_runtime_ids()
)

# A dictionary that maps from driver id to SerializationContext
# TODO: clean up the SerializationContext once the job finished.
self.serialization_context_map = {}
@@ -859,7 +860,6 @@ def get_resource_ids_for_resource(
if resource == resource_name or re.match(resource_regex, resource):
for resource_id, _ in assignment:
assigned_ids.add(resource_id)

# If the user had already set the environment variables
# (CUDA_VISIBLE_DEVICES, NEURON_RT_VISIBLE_CORES, ..) then respect that
# in the sense that only IDs that appear in (CUDA_VISIBLE_DEVICES,
@@ -883,18 +883,16 @@ def get_resource_ids_for_resource(
return list(assigned_ids)


@PublicAPI
@client_mode_hook
def get_gpu_ids():
"""Get the IDs of the GPUs that are available to the worker.
def get_gpu_device_ids():
"""Get the IDs of the GPUs or XPUs that are available to the worker.

If the CUDA_VISIBLE_DEVICES environment variable was set when the worker
started up, then the IDs returned by this method will be a subset of the
IDs in CUDA_VISIBLE_DEVICES. If not, the IDs will fall in the range
[0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs that the node has.
If the CUDA_VISIBLE_DEVICES or ONEAPI_DEVICE_SELECTOR environment variable was set when
the worker started up, then the IDs returned by this method will be a subset of the
IDs in CUDA_VISIBLE_DEVICES or ONEAPI_DEVICE_SELECTOR. If not, the IDs will fall in the
range [0, NUM_GPUS - 1], where NUM_GPUS is the number of GPUs or XPUs that the node has.

Returns:
A list of GPU IDs.
A list of GPU or XPU IDs.
"""
worker = global_worker
worker.check_connected()
@@ -903,6 +901,15 @@ def get_gpu_ids():
)


@PublicAPI
@client_mode_hook
def get_gpu_ids():
"""Get the IDs of the accelerator devices (GPUs or XPUs) available to the worker."""
accelerator = ray._private.utils.get_current_accelerator()
if accelerator in ["CUDA", "XPU"]:
return get_gpu_device_ids()
return []


@Deprecated(
message="Use ray.get_runtime_context().get_assigned_resources() instead.",
warning=True,
@@ -1492,7 +1499,6 @@ def init(
usage_lib.show_usage_stats_prompt(cli=False)
else:
usage_lib.set_usage_stats_enabled_via_env_var(False)

# Use a random port by not specifying Redis port / GCS server port.
ray_params = ray._private.parameter.RayParams(
node_ip_address=node_ip_address,
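User-facing behavior is unchanged at the API level; a sketch of the intended effect under XPU mode (assumes a node started with at least one XPU counted as a GPU resource):

import os
import ray

os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"] = "XPU"
ray.init()

@ray.remote(num_gpus=1)
def assigned_devices():
    # Under XPU mode this returns the XPU IDs assigned to this worker,
    # derived from ONEAPI_DEVICE_SELECTOR rather than CUDA_VISIBLE_DEVICES.
    return ray.get_gpu_ids()

print(ray.get(assigned_devices.remote()))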
3 changes: 3 additions & 0 deletions python/ray/tests/conftest.py
@@ -332,6 +332,9 @@ def shutdown_only(maybe_external_redis):
# Delete the cluster address just in case.
ray._private.utils.reset_ray_address()

if "RAY_EXPERIMENTAL_ACCELERATOR_TYPE" in os.environ:
del os.environ["RAY_EXPERIMENTAL_ACCELERATOR_TYPE"]


@pytest.fixture
def propagate_logs():