Skip to content

Commit

Permalink
up
Browse files Browse the repository at this point in the history
Signed-off-by: Rui Qiao <[email protected]>
  • Loading branch information
ruisearch42 committed Sep 19, 2024
1 parent f7b1b97 commit 424a3e2
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 25 deletions.
26 changes: 13 additions & 13 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ def __init__(
asyncio_max_queue_size: Optional[int] = None,
max_buffered_results: Optional[int] = None,
max_inflight_executions: Optional[int] = None,
overlapping_factor: Optional[float] = None,
overlap_gpu_communication: Optional[bool] = None,
):
"""
Args:
Expand Down Expand Up @@ -827,11 +827,11 @@ def __init__(
are allowed to be sent to this DAG. Before submitting more requests,
the caller is responsible for calling ray.get to get the result,
otherwise, RayAdagCapacityExceeded is raised.
overlapping_factor: Controls the degree of overlapping computation and
communication in aDAG execution. If None, the default value is used.
If 0, no overlapping is allowed. If 1, the communication and
computation are overlapped with the minimal degree. No other values
are supported at the moment.
overlap_gpu_communication: Whether to overlap GPU communication with
computation during DAG execution. If True, the communication
and computation can be overlapped, which can improve the
performance of the DAG execution. If None, the default value
will be used.
Returns:
Channel: A wrapper around ray.ObjectRef.
Expand All @@ -857,9 +857,9 @@ def __init__(
self._buffer_size_bytes: Optional[int] = buffer_size_bytes
if self._buffer_size_bytes is None:
self._buffer_size_bytes = ctx.buffer_size_bytes
self._overlapping_factor: Optional[float] = overlapping_factor
if self._overlapping_factor is None:
self._overlapping_factor = ctx.overlapping_factor
self._overlap_gpu_communication: Optional[bool] = overlap_gpu_communication
if self._overlap_gpu_communication is None:
self._overlap_gpu_communication = ctx.overlap_gpu_communication

self._default_type_hint: ChannelOutputType = SharedMemoryType(
self._buffer_size_bytes,
Expand Down Expand Up @@ -1597,7 +1597,7 @@ def _get_or_compile(

if RAY_ADAG_ENABLE_PROFILING:
exec_task_func = do_profile_tasks
elif self._overlapping_factor:
elif self._overlap_gpu_communication:
if RAY_ADAG_ENABLE_TORCH_PROFILING:
exec_task_func = do_profile_stream_tasks
else:
Expand Down Expand Up @@ -1767,7 +1767,7 @@ def _build_execution_schedule(

# Step 3: Optimize the execution schedule based on overlapping factor
actor_to_optimized_schedule = _optimize_execution_schedule(
actor_to_execution_schedule, self._overlapping_factor
actor_to_execution_schedule, self._overlap_gpu_communication
)
return actor_to_optimized_schedule

Expand Down Expand Up @@ -2348,7 +2348,7 @@ def build_compiled_dag_from_ray_dag(
asyncio_max_queue_size: Optional[int] = None,
max_buffered_results: Optional[int] = None,
max_inflight_executions: Optional[int] = None,
overlapping_factor: Optional[int] = None,
overlap_gpu_communication: Optional[bool] = None,
) -> "CompiledDAG":
compiled_dag = CompiledDAG(
execution_timeout,
Expand All @@ -2357,7 +2357,7 @@ def build_compiled_dag_from_ray_dag(
asyncio_max_queue_size,
max_buffered_results,
max_inflight_executions,
overlapping_factor,
overlap_gpu_communication,
)

def _build_compiled_dag(node):
Expand Down
12 changes: 8 additions & 4 deletions python/ray/dag/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
os.environ.get("RAY_DAG_max_inflight_executions", 10)
)

DEFAULT_OVERLAPPING_FACTOR = int(os.environ.get("RAY_DAG_overlapping_factor", 0))
DEFAULT_OVERLAPPING_FACTOR = bool(
os.environ.get("RAY_DAG_overlap_gpu_communication", 0)
)


@DeveloperAPI
Expand Down Expand Up @@ -62,8 +64,10 @@ class DAGContext:
enforced when it is smaller than the DAG capacity.
max_inflight_executions: The maximum number of in-flight executions
that can be submitted before consuming the output.
overlapping_factor: Determines the degree to which the DAG execution
can overlap communication and computation.
overlap_gpu_communication: Whether to overlap GPU communication with
computation during DAG execution. If True, the communication
and computation can be overlapped, which can improve the
performance of the DAG execution.
"""

execution_timeout: int = DEFAULT_EXECUTION_TIMEOUT_S
Expand All @@ -72,7 +76,7 @@ class DAGContext:
asyncio_max_queue_size: int = DEFAULT_ASYNCIO_MAX_QUEUE_SIZE
max_buffered_results: int = DEFAULT_MAX_BUFFERED_RESULTS
max_inflight_executions: int = DEFAULT_MAX_INFLIGHT_EXECUTIONS
overlapping_factor: int = DEFAULT_OVERLAPPING_FACTOR
overlap_gpu_communication: bool = DEFAULT_OVERLAPPING_FACTOR

@staticmethod
def get_current() -> "DAGContext":
Expand Down
14 changes: 7 additions & 7 deletions python/ray/dag/dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def experimental_compile(
_asyncio_max_queue_size: Optional[int] = None,
_max_buffered_results: Optional[int] = None,
_max_inflight_executions: Optional[int] = None,
_overlapping_factor: Optional[int] = None,
_overlap_gpu_communication: Optional[bool] = None,
) -> "ray.dag.CompiledDAG":
"""Compile an accelerated execution path for this DAG.
Expand All @@ -192,11 +192,11 @@ def experimental_compile(
are allowed to be sent to this DAG. Before submitting more requests,
the caller is responsible for calling ray.get to clear finished
in-flight requests.
_overlapping_factor: Controls the degree of overlapping computation and
communication in aDAG execution. If None, the default value is used.
If 0, no overlapping is allowed. If 1, the communication and
computation are overlapped with the minimal degree. No other values
are supported at the moment.
overlap_gpu_communication: Whether to overlap GPU communication with
computation during DAG execution. If True, the communication
and computation can be overlapped, which can improve the
performance of the DAG execution. If None, the default value
will be used.
Returns:
A compiled DAG.
Expand Down Expand Up @@ -230,7 +230,7 @@ def experimental_compile(
_asyncio_max_queue_size,
_max_buffered_results,
_max_inflight_executions,
_overlapping_factor,
_overlap_gpu_communication,
)

def execute(
Expand Down
2 changes: 1 addition & 1 deletion python/ray/dag/tests/experimental/test_torch_tensor_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ def test_torch_tensor_nccl_overlap(ray_start_regular):
dag = MultiOutputNode([branch1, branch2])

# Test normal execution.
compiled_dag = dag.experimental_compile(_overlapping_factor=1)
compiled_dag = dag.experimental_compile(_overlap_gpu_communication=True)

for i in range(3):
ref = compiled_dag.execute(i)
Expand Down

0 comments on commit 424a3e2

Please sign in to comment.