Skip to content

Commit

Permalink
cluster list filters
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandra Belousov authored and Alexandra Belousov committed Sep 18, 2024
1 parent b98bad0 commit edf7c85
Show file tree
Hide file tree
Showing 4 changed files with 335 additions and 108 deletions.
4 changes: 4 additions & 0 deletions runhouse/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
SECOND = 1
MINUTE = 60
HOUR = 3600
DAY = HOUR * 24
DEFAULT_STATUS_CHECK_INTERVAL = 1 * MINUTE
INCREASED_STATUS_CHECK_INTERVAL = 1 * HOUR
GPU_COLLECTION_INTERVAL = 5 * SECOND
Expand All @@ -104,3 +105,6 @@

# Constants runhouse cluster list
LAST_ACTIVE_AT_TIMEFRAME = 24 * HOUR

# in seconds
TIME_UNITS = {"s": SECOND, "m": MINUTE, "h": HOUR, "d": DAY}
219 changes: 111 additions & 108 deletions runhouse/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from runhouse.constants import (
BULLET_UNICODE,
DOUBLE_SPACE_UNICODE,
HOUR,
LAST_ACTIVE_AT_TIMEFRAME,
RAY_KILL_CMD,
RAY_START_CMD,
Expand All @@ -36,6 +37,7 @@
SERVER_STOP_CMD,
START_NOHUP_CMD,
START_SCREEN_CMD,
TIME_UNITS,
)
from runhouse.globals import rns_client
from runhouse.logger import get_logger
Expand Down Expand Up @@ -607,148 +609,149 @@ def get_clusters_from_den():
return asyncio.run(aget_clusters_from_den())


@cluster_app.command("list")
def cluster_list():
"""List all Runhouse clusters saved in Den"""
import sky
def _create_output_table(
total_clusters: int,
running_clusters: int,
displayed_clusters: int,
filters_requested: bool,
):
displayed_running_clusters = (
running_clusters
if running_clusters < displayed_clusters
else displayed_clusters
)
table_title = f"[bold cyan]Clusters for {rns_client.username} (Running: {displayed_running_clusters}/{running_clusters}, Total Displayed: {displayed_clusters}/{total_clusters})[/bold cyan]"

# logged out case
if not rh.configs.token:
# TODO [SB]: adjust msg formatting (coloring etc)
sky_cli_command_formatted = f"{italic_bold_ansi}sky status -r{reset_format}" # will be printed bold and italic
console.print(
f"Runhouse token required to view all clusters. Please run `runhouse login` to load your token. To view on-demand clusters run {sky_cli_command_formatted}"
)
return
table = Table(title=table_title)

sky_clusters = sky.status(refresh=True)
if not filters_requested:
table.caption = f"[reset]Showing clusters that were active in the last {LAST_ACTIVE_AT_TIMEFRAME / HOUR} hours."
table.caption_justify = "left"

clusters_in_den_resp = get_clusters_from_den()
# Add columns to the table
table.add_column("Name", justify="left", no_wrap=True)
table.add_column("Cluster Type", justify="left", no_wrap=True)
table.add_column("Status", justify="left")
table.add_column("Last Active (UTC)", justify="left")

if clusters_in_den_resp.status_code != 200:
logger.error(
f"Failed to load clusters from Den for username: {rns_client.username}"
)
clusters_in_den = []
else:
clusters_in_den = clusters_in_den_resp.json().get("data")
return table

clusters_in_den_names = [cluster.get("name") for cluster in clusters_in_den]

if not sky_clusters and not clusters_in_den:
console.print("No existing clusters.")
def _add_cluster_as_table_row(table: Table, rh_cluster: dict):
last_active = rh_cluster.get("Last Active (UTC)")
last_active = last_active if last_active != "1970-01-01 00:00:00" else "Unknown"
table.add_row(
rh_cluster.get("Name"),
rh_cluster.get("Cluster Type"),
rh_cluster.get("Status"),
last_active,
)

if sky_clusters:
# getting the on-demand clusters that are not saved in den.
sky_clusters = [
cluster
for cluster in sky_clusters
if f'/{rns_client.username}/{cluster.get("name")}'
not in clusters_in_den_names
]
return table

total_clusters = len(clusters_in_den)
table_title = f"[bold cyan]Clusters for {rns_client.username} (Total: {total_clusters})[/bold cyan]"

table = Table(title=table_title, title_justify="left")
def _add_clusters_to_output_table(
table: Table, clusters: list[Dict], filters_requested: dict = None
):
for rh_cluster in clusters:
last_active_at = rh_cluster.get("Last Active (UTC)")
last_active_at_no_offset = str(last_active_at).split("+")[
0
] # The split is required to remove the offset (according to UTC)
rh_cluster["Last Active (UTC)"] = last_active_at_no_offset
rh_cluster["Status"] = StatusColors.get_status_color(rh_cluster.get("Status"))

# Add columns to the table
table.add_column("Name", justify="left", no_wrap=True)
table.add_column("Cluster Type", justify="center", no_wrap=True)
table.add_column("Status", justify="left")
table = _add_cluster_as_table_row(table, rh_cluster)

running_clusters = []
not_running_clusters = []

for den_cluster in clusters_in_den:
# get just name, not full rns address. reset is used so the name will be printed all in white.
cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}'
cluster_type = den_cluster.get("data").get("resource_subtype")
cluster_status = (
den_cluster.get("status") if den_cluster.get("status") else None
def _print_msg_live_sky_clusters(sky_clusters: int):
if sky_clusters > 0:
console.print(
f"There are {sky_clusters} live cluster(s) that are not saved in Den. For more information, please run [bold italic]sky status -r[/bold italic]."
)

# currently relying on status pings to den as a sign of cluster activity.
# The split is required to remove milliseconds and the offset (according to UTC) from the timestamp.
# (status_last_checked is in the following format: YYYY-MM-DD HH:MM:SS.ssssss±HH:MM)

last_active_at = den_cluster.get("status_last_checked")
last_active_at = (
datetime.datetime.fromisoformat(last_active_at.split(".")[0])
if isinstance(last_active_at, str)
else None
)
last_active_at = (
last_active_at.replace(tzinfo=datetime.timezone.utc)
if last_active_at
else None
def supported_time_units():
return ", ".join(list(TIME_UNITS.keys()))


@cluster_app.command("list")
def cluster_list(
show_all: bool = typer.Option(
False,
"--a",
"--all",
help="Get all den clusters. Up to 50 most recently active clusters will be displayed.",
),
since: Optional[str] = typer.Option(
None,
"--since",
help="Time duration to filter on. Minimum allowable filter is 1 minute. If the provided value will be less "
"than that, the filter will be set to 1 minute by default. You may filter by seconds (s), "
"minutes (m), hours (h) or days (s). Examples: 30s, 15m, 2h, 3d.",
),
cluster_status: Optional[str] = typer.Option(
None,
"--status",
help="Cluster status to filter on. Supported filter values: running, terminated (cluster is not live), down (runhouser server is down, but the cluster might be live).",
),
):
"""Load Runhouse clusters"""

# logged out case
if not rh.configs.token:
# TODO [SB]: adjust msg formatting (coloring etc)
sky_cli_command_formatted = f"{italic_bold_ansi}sky status -r{reset_format}" # will be printed bold and italic
console.print(
f"Listing clusters requires a Runhouse token. Please run `runhouse login` to get your token, or run {sky_cli_command_formatted} to list locally stored on-demand clusters."
)
return

clusters = Cluster.list(show_all=show_all, since=since, status=cluster_status)

if cluster_status == "running" and not last_active_at:
# For BC, in case there are clusters that were saved and created before we introduced sending cluster status to den.
cluster_status = "unknown"
all_clusters = clusters.get("all_clusters", None)
running_clusters = clusters.get("running_clusters", None)
sky_clusters = clusters.get("sky_clusters", None)

cluster_info = {
"Name": cluster_name,
"Cluster Type": cluster_type,
"Status": cluster_status,
"Last Active (UTC)": last_active_at,
}
running_clusters.append(
cluster_info
) if cluster_status == "running" else not_running_clusters.append(cluster_info)
if not all_clusters:
console.print("No clusters found.")
return

# TODO: will be used if we'll need to print not-running clusters
# Sort not-running clusters by the 'Status' column
# not_running_clusters = sorted(not_running_clusters, key=lambda x: x["Status"])

# Sort clusters by the 'Last Active (UTC)' column
not_running_clusters = sorted(
not_running_clusters,
key=lambda x: (x["Last Active (UTC)"] is None, x["Last Active (UTC)"]),
reverse=True,
)
# not_running_clusters = sorted(
# not_running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True
# )
running_clusters = sorted(
running_clusters,
key=lambda x: (x["Last Active (UTC)"] is None, x["Last Active (UTC)"]),
reverse=True,
running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True
)

# creating the clusters table
total_clusters = len(clusters_in_den)
table_title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {len(running_clusters)}, Total: {total_clusters})[/bold cyan]"
table = Table(title=table_title)
filters_requested: bool = show_all or since or cluster_status

# Add columns to the table
table.add_column("Name", justify="left", no_wrap=True)
table.add_column("Cluster Type", justify="center", no_wrap=True)
table.add_column("Status", justify="left")
table.add_column("Last Active (UTC)", justify="left")
clusters_to_print = all_clusters if filters_requested else running_clusters

# TODO: will be used if we'll need to print not-running clusters
# all_clusters = running_clusters + not_running_clusters
if show_all:
# if user requesting all den cluster, limit print only to 50 clusters max.
clusters_to_print = clusters_to_print[: (min(len(clusters_to_print), 50))]

for rh_cluster in running_clusters:
last_active_at = rh_cluster.get("Last Active (UTC)")
# creating the clusters table
table = _create_output_table(
total_clusters=len(all_clusters),
running_clusters=len(running_clusters),
displayed_clusters=len(clusters_to_print),
filters_requested=filters_requested,
)

# Print Running clusters that were active it the last 24 hours
now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME:
table.add_row(
rh_cluster.get("Name"),
rh_cluster.get("Cluster Type"),
StatusColors.get_status_color(rh_cluster.get("Status")),
str(last_active_at).split("+")[
0
], # The split is required to remove the offset (according to UTC)
)
_add_clusters_to_output_table(
table=table, clusters=clusters_to_print, filters_requested=filters_requested
)

console.print(table)

if len(sky_clusters) > 0:
console.print(
f"There are {len(sky_clusters)} live cluster(s) that are not saved in Den. For more information, please run [bold italic]sky status -r[/bold italic]."
)
_print_msg_live_sky_clusters(sky_clusters=len(sky_clusters))


# Register the 'cluster' command group with the main runhouse application
Expand Down
Loading

0 comments on commit edf7c85

Please sign in to comment.