Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Simplify and consolidate progress bar outputs #47692

Merged
merged 14 commits into from
Sep 23, 2024
8 changes: 4 additions & 4 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,11 @@ def get_op_usage(self, op: PhysicalOperator) -> ExecutionResources:
def get_op_usage_str(self, op: PhysicalOperator) -> str:
"""Return a human-readable string representation of the resource usage of
the given operator."""
usage_str = f"cpu: {self._op_running_usages[op].cpu:.1f}"
usage_str = f"{self._op_running_usages[op].cpu:.1f} CPU"
if self._op_running_usages[op].gpu:
usage_str += f", gpu: {self._op_running_usages[op].gpu:.1f}"
usage_str += f", {self._op_running_usages[op].gpu:.1f} GPU"
usage_str += (
f", objects: {self._op_running_usages[op].object_store_memory_str()}"
f", {self._op_running_usages[op].object_store_memory_str()} object store"
)
if self._debug:
usage_str += (
Expand All @@ -231,7 +231,7 @@ def get_op_usage_str(self, op: PhysicalOperator) -> str:
budget = self._op_resource_allocator._op_budgets[op]
usage_str += f", budget=(cpu={budget.cpu:.1f}"
usage_str += f",gpu={budget.gpu:.1f}"
usage_str += f",objects={budget.object_store_memory_str()})"
usage_str += f",object store={budget.object_store_memory_str()})"
return usage_str

def get_downstream_fraction(self, op: PhysicalOperator) -> float:
Expand Down
7 changes: 4 additions & 3 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,13 @@ def _report_current_usage(self) -> None:
pending_usage = self._resource_manager.get_global_pending_usage()
limits = self._resource_manager.get_global_limits()
resources_status = (
"Running. Resources: "
# TODO(scottjlee): Add dataset name/ID to progress bar output.
"Running Dataset. Resource usage: "
f"{running_usage.cpu:.4g}/{limits.cpu:.4g} CPU, "
f"{running_usage.gpu:.4g}/{limits.gpu:.4g} GPU, "
f"{running_usage.object_store_memory_str()}/"
f"{limits.object_store_memory_str()} object_store_memory "
"(pending: "
f"{limits.object_store_memory_str()} object store "
"(: "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we hide the pending section if all are 0? (forgot to mention in the previous PR)

f"{pending_usage.cpu:.4g} CPU, "
f"{pending_usage.gpu:.4g} GPU)"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,16 +259,16 @@ def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
def summary_str(self, resource_manager: ResourceManager) -> str:
queued = self.num_queued() + self.op.internal_queue_size()
active = self.op.num_active_tasks()
desc = f"- {self.op.name}: {active} active, {queued} queued"
desc = f"- {self.op.name}. Tasks: {active} 🟢, {queued} 🟡"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"N queued" actually means N blocks in the input buffer, not number of tasks.
(the previous was already a bit confusing)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also since we have a "Tasks: " section here. I'm wondering maybe we can also move the actor info after this. Instead of at the very end.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe format it as "Tasks ..., Actors ..., Input blocks ..."

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, and blocked sign can be part of "tasks"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe format it as "Tasks ..., Actors ..., Input blocks ..."

+1 something like this seems reasonable to me

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do the green and yellow circles represent?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

green = active, yellow = queued

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

although after reworking the progress bar as suggested above, i have removed the green/yellow emoji

if (
self.op._in_task_submission_backpressure
or self.op._in_task_output_backpressure
):
desc += " 🚧"
desc += f", [{resource_manager.get_op_usage_str(self.op)}]"
desc += f"; Resource usage: {resource_manager.get_op_usage_str(self.op)}"
suffix = self.op.progress_str()
if suffix:
desc += f", {suffix}"
desc += f"; {suffix}"
return desc

def dispatch_next_task(self) -> None:
Expand Down