Skip to content

Commit

Permalink
cluster list filters
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandra Belousov authored and Alexandra Belousov committed Sep 9, 2024
1 parent 1633300 commit 1c535c0
Show file tree
Hide file tree
Showing 3 changed files with 338 additions and 122 deletions.
9 changes: 9 additions & 0 deletions runhouse/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
SECOND = 1
MINUTE = 60
HOUR = 3600
DAY = HOUR * 24
DEFAULT_STATUS_CHECK_INTERVAL = 1 * MINUTE
INCREASED_STATUS_CHECK_INTERVAL = 1 * HOUR
GPU_COLLECTION_INTERVAL = 5 * SECOND
Expand All @@ -104,3 +105,11 @@

# Constants used by the `runhouse cluster list` CLI command

# Window, in seconds, within which a cluster must have been active to be listed by default
LAST_ACTIVE_AT_TIMEFRAME = 24 * HOUR
# Keys are the cluster types accepted by the `--type` filter of `cluster list`.
# NOTE(review): the values look like the resource subtype names each key maps to — confirm.
SUPPORTED_CLUSTER_SUBTYPES = {
"onDemand": "OnDemandCluster",
"static": "Cluster",
"k8": "kubernetes",
}

# Time-unit suffixes accepted by the `--since` filter, mapped to their length in seconds
TIME_UNITS = {"s": SECOND, "m": MINUTE, "h": HOUR, "d": DAY}
235 changes: 113 additions & 122 deletions runhouse/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import datetime
import importlib
import logging
Expand All @@ -10,8 +9,6 @@
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx

import ray

import requests
Expand All @@ -38,13 +35,16 @@
SERVER_STOP_CMD,
START_NOHUP_CMD,
START_SCREEN_CMD,
SUPPORTED_CLUSTER_SUBTYPES,
TIME_UNITS,
)
from runhouse.globals import rns_client
from runhouse.logger import get_logger
from runhouse.resources.hardware.ray_utils import (
check_for_existing_ray_instance,
kill_actors,
)
from runhouse.resources.hardware.utils import ResourceServerStatus

from runhouse.utils import get_status_color

Expand Down Expand Up @@ -601,154 +601,145 @@ def status(
_print_status(cluster_status, current_cluster)


async def aget_clusters_from_den():
httpx_client = httpx.AsyncClient()
def _create_output_table(total_clusters: int, running_clusters: int):
    """Build the empty table in which the user's clusters are displayed.

    The table title carries the current username together with the running
    and total cluster counts. Only the columns are set up here; rows are
    added by the caller.

    Args:
        total_clusters: Number shown as "Total" in the title.
        running_clusters: Number shown as "Running" in the title.

    Returns:
        The newly created table, with its four left-justified columns.
    """
    table = Table(
        title=(
            f"[bold cyan]{rns_client.username}'s Clusters "
            f"(Running: {running_clusters}, Total: {total_clusters})[/bold cyan]"
        )
    )

    # Column header -> extra keyword arguments on top of the shared left justification.
    column_options = {
        "Name": {"no_wrap": True},
        "Cluster Type": {"no_wrap": True},
        "Status": {},
        "Last Active (UTC)": {},
    }
    for header, extra_options in column_options.items():
        table.add_column(header, justify="left", **extra_options)

    return table


get_clusters_params = {"resource_type": "cluster", "folder": rns_client.username}
clusters_in_den_resp = await httpx_client.get(
f"{rns_client.api_server_url}/resource",
params=get_clusters_params,
headers=rns_client.request_headers(),
def _add_cluster_as_table_row(table: Table, rh_cluster: dict):
last_active = rh_cluster.get("Last Active (UTC)")
last_active = last_active if last_active != "1970-01-01 00:00:00" else "Unknown"
table.add_row(
rh_cluster.get("Name"),
rh_cluster.get("Cluster Type"),
rh_cluster.get("Status"),
last_active,
)

return clusters_in_den_resp
return table


def get_clusters_from_den():
return asyncio.run(aget_clusters_from_den())
def _add_clusters_to_output_table(
    table: Table, clusters: List[Dict], filters_requested: Any = None
):
    """Add the given clusters to the output table, one row per cluster.

    Each cluster dict is updated in place before it is rendered: its
    "Last Active (UTC)" value is replaced by a string without the UTC offset,
    and its "Status" value is replaced by the result of ``get_status_color``.

    Args:
        table: The table to add rows to.
        clusters: Cluster info dicts with the keys "Name", "Cluster Type",
            "Status" and "Last Active (UTC)".
            # NOTE(review): "Last Active (UTC)" is assumed to be a timezone-aware
            # datetime, since it is subtracted from an aware "now" — confirm.
        filters_requested: Any truthy value means the caller applied explicit
            filters, so every cluster is added. When falsy, only clusters
            active within the last ``LAST_ACTIVE_AT_TIMEFRAME`` seconds are added.
    """
    # Computed once so that every cluster is compared against the same instant.
    now = datetime.datetime.now(datetime.timezone.utc)

    for rh_cluster in clusters:
        last_active_at = rh_cluster.get("Last Active (UTC)")
        # The split removes the UTC offset (e.g. "+00:00") from the displayed timestamp.
        rh_cluster["Last Active (UTC)"] = str(last_active_at).split("+")[0]
        rh_cluster["Status"] = get_status_color(rh_cluster.get("Status"))

        if filters_requested:
            table = _add_cluster_as_table_row(table, rh_cluster)
            continue

        # Default behaviour: print only clusters that were active in the last 24 hours.
        if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME:
            table = _add_cluster_as_table_row(table, rh_cluster)

@cluster_app.command("list")
def cluster_list():
"""Load Runhouse clusters"""
import sky

# logged out case
if not rh.configs.token:
# TODO [SB]: adjust msg formatting (coloring etc)
sky_cli_command_formatted = f"{italic_bold_ansi}sky status -r{reset_format}" # will be printed bold and italic
def _print_msg_live_sky_clusters(live_clusters_not_in_den: int):
if live_clusters_not_in_den > 0:
live_sky_clusters_str = (
f"There are {live_clusters_not_in_den} live clusters that are not saved in Den."
if live_clusters_not_in_den > 1
else "There is 1 live cluster that is not saved in Den."
)

clusters_pronoun = "them" if live_clusters_not_in_den > 1 else "it"

console.print(
f"This feature is available only for Den users. Please run {sky_cli_command_formatted} to get on-demand cluster(s) information or sign-up Den."
f"{live_sky_clusters_str} To get information about {clusters_pronoun}, please run [bold italic]sky status -r[/bold italic]."
)
return

on_demand_clusters_sky = sky.status(refresh=True)

clusters_in_den_resp = get_clusters_from_den()
def supported_cluster_statuses():
    """Return the names of all ``ResourceServerStatus`` members, comma-separated.

    The result is embedded in the help text of the ``--status`` option of
    ``cluster list``, so this function runs while that command is being defined.
    """
    # Iterating ``__members__`` yields the member names directly. This avoids the
    # dict-view ``.mapping`` attribute, which only exists on Python 3.10+.
    return ", ".join(ResourceServerStatus.__members__)

if clusters_in_den_resp.status_code != 200:
logger.error(f"Failed to load {rns_client.username}'s clusters from Den")
clusters_in_den = []
else:
clusters_in_den = clusters_in_den_resp.json().get("data")

clusters_in_den_names = [cluster.get("name") for cluster in clusters_in_den]
def supported_cluster_subtypes():
    """Return the keys of ``SUPPORTED_CLUSTER_SUBTYPES`` as a comma-separated string."""
    # Iterating a dict yields its keys in insertion order.
    return ", ".join(SUPPORTED_CLUSTER_SUBTYPES)

if not on_demand_clusters_sky and not clusters_in_den:
console.print("No existing clusters.")

if on_demand_clusters_sky:
# getting the on-demand clusters that are not saved in den.
on_demand_clusters_sky = [
cluster
for cluster in on_demand_clusters_sky
if f'/{rns_client.username}/{cluster.get("name")}'
not in clusters_in_den_names
]
def supported_time_units():
    """Return the keys of ``TIME_UNITS`` as a comma-separated string."""
    # Iterating a dict yields its keys in insertion order.
    return ", ".join(TIME_UNITS)

running_clusters = []
not_running_clusters = []

for den_cluster in clusters_in_den:
# get just name, not full rns address. reset is used so the name will be printed all in white.
cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}'
cluster_type = den_cluster.get("data").get("resource_subtype")
cluster_status = (
den_cluster.get("status") if den_cluster.get("status") else "unknown"
@cluster_app.command("list")
def cluster_list(
get_all_clusters: bool = typer.Option(
False,
"--a",
"-a",
help="Get all den clusters. Up to 50 most recently clusters will be returnted",
),
since: Optional[str] = typer.Option(
None,
"--since",
help=f"Time duration to filter on. Supported time units: {supported_time_units()}. Examples: 30s, 15m, 2h, 3d.",
),
cluster_status: Optional[str] = typer.Option(
None,
"--status",
help=f"Cluster status to filter on. Supported statues: {supported_cluster_statuses()}.",
),
cluster_subtype: Optional[str] = typer.Option(
None,
"--type",
help=f"Runhouse cluster type to filter on. Supported types: {supported_cluster_subtypes()}.",
),
):
"""Load Runhouse clusters"""

# logged out case
if not rh.configs.token:
# TODO [SB]: adjust msg formatting (coloring etc)
sky_cli_command_formatted = f"{italic_bold_ansi}sky status -r{reset_format}" # will be printed bold and italic
console.print(
f"This feature is available only for Den users. Please run {sky_cli_command_formatted} to get on-demand cluster(s) information or sign-up Den."
)
return

# currently relying on status pings to den as a sign of cluster activity.
# The split is required to remove milliseconds and the offset (according to UTC) from the timestamp.
# (status_last_checked is in the following format: YYYY-MM-DD HH:MM:SS.ssssss±HH:MM)

last_active_at = den_cluster.get("status_last_checked")
last_active_at = (
datetime.datetime.fromisoformat(last_active_at.split(".")[0])
if isinstance(last_active_at, str)
else datetime.datetime(1970, 1, 1)
) # Convert to datetime
last_active_at = last_active_at.replace(tzinfo=datetime.timezone.utc)

if cluster_status == "running" and not last_active_at:
# For BC, in case there are clusters that were saved and created before we introduced sending cluster status to den.
cluster_status = "unknown"

cluster_info = {
"Name": cluster_name,
"Cluster Type": cluster_type,
"Status": cluster_status,
"Last Active (UTC)": last_active_at,
}
running_clusters.append(
cluster_info
) if cluster_status == "running" else not_running_clusters.append(cluster_info)

# TODO: will be used if we'll need to print not-running clusters
# Sort not-running clusters by the 'Status' column
# not_running_clusters = sorted(not_running_clusters, key=lambda x: x["Status"])

# Sort clusters by the 'Last Active (UTC)' column
not_running_clusters = sorted(
not_running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True
)
running_clusters = sorted(
running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True
all_clusters, running_clusters, sky_live_clusters = Cluster.list(
get_all_clusters=get_all_clusters,
since=since,
status=cluster_status,
subtype=cluster_subtype,
)
if not all_clusters:
console.print("No existing clusters.")

# creating the clusters table
total_clusters = len(clusters_in_den)
table_title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {len(running_clusters)}, Total: {total_clusters})[/bold cyan]"
table = Table(title=table_title)

# Add columns to the table
table.add_column("Name", justify="left", no_wrap=True)
table.add_column("Cluster Type", justify="center", no_wrap=True)
table.add_column("Status", justify="left")
table.add_column("Last Active (UTC)", justify="left")
table = _create_output_table(
total_clusters=len(all_clusters), running_clusters=len(running_clusters)
)
clusters_to_print = all_clusters if get_all_clusters else running_clusters

# TODO: will be used if we'll need to print not-running clusters
# all_clusters = running_clusters + not_running_clusters
if get_all_clusters:
# if user requesting all den cluster, limit print only to 50 clusters max.
clusters_to_print = clusters_to_print[: (min(len(clusters_to_print), 50))]

for rh_cluster in running_clusters:
last_active_at = rh_cluster.get("Last Active (UTC)")
filters_requested: bool = (
get_all_clusters or since or cluster_status or cluster_subtype
)

# Print Running clusters that were active it the last 24 hours
now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME:
table.add_row(
rh_cluster.get("Name"),
rh_cluster.get("Cluster Type"),
get_status_color(rh_cluster.get("Status")),
str(last_active_at).split("+")[
0
], # The split is required to remove the offset (according to UTC)
)
_add_clusters_to_output_table(
table=table, clusters=clusters_to_print, filters_requested=filters_requested
)

console.print(table)

live_clusters_not_in_den = len(on_demand_clusters_sky)
if live_clusters_not_in_den > 0:
live_sky_clusters_str = (
f"There are {live_clusters_not_in_den} live clusters that are not saved in Den."
if live_clusters_not_in_den > 1
else "There is 1 live cluster that is not saved in Den."
)

clusters_pronoun = "them" if live_clusters_not_in_den > 1 else "it"

console.print(
f"{live_sky_clusters_str} To get information about {clusters_pronoun}, please run [bold italic]sky status -r[/bold italic]."
)
_print_msg_live_sky_clusters(live_clusters_not_in_den=len(sky_live_clusters))


# Register the 'cluster' command group with the main runhouse application
Expand Down
Loading

0 comments on commit 1c535c0

Please sign in to comment.