Skip to content

Commit

Permalink
cluster list filters
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandra Belousov authored and Alexandra Belousov committed Sep 12, 2024
1 parent 447a12c commit 3a3f0a4
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 122 deletions.
4 changes: 4 additions & 0 deletions runhouse/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
SECOND = 1
MINUTE = 60
HOUR = 3600
DAY = HOUR * 24
DEFAULT_STATUS_CHECK_INTERVAL = 1 * MINUTE
INCREASED_STATUS_CHECK_INTERVAL = 1 * HOUR
GPU_COLLECTION_INTERVAL = 5 * SECOND
Expand All @@ -104,3 +105,6 @@

# Constants runhouse cluster list
LAST_ACTIVE_AT_TIMEFRAME = 24 * HOUR

# in seconds
TIME_UNITS = {"s": SECOND, "m": MINUTE, "h": HOUR, "d": DAY}
220 changes: 98 additions & 122 deletions runhouse/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import datetime
import importlib
import logging
Expand All @@ -10,8 +9,6 @@
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx

import ray

import requests
Expand All @@ -38,6 +35,7 @@
SERVER_STOP_CMD,
START_NOHUP_CMD,
START_SCREEN_CMD,
TIME_UNITS,
)
from runhouse.globals import rns_client
from runhouse.logger import get_logger
Expand Down Expand Up @@ -595,27 +593,94 @@ def status(
_print_status(cluster_status, current_cluster)


async def aget_clusters_from_den():
httpx_client = httpx.AsyncClient()
def _create_output_table(total_clusters: int, running_clusters: int):
table_title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {running_clusters}, Total: {total_clusters})[/bold cyan]"
table = Table(title=table_title)

# Add columns to the table
table.add_column("Name", justify="left", no_wrap=True)
table.add_column("Cluster Type", justify="left", no_wrap=True)
table.add_column("Status", justify="left")
table.add_column("Last Active (UTC)", justify="left")

return table


get_clusters_params = {"resource_type": "cluster", "folder": rns_client.username}
clusters_in_den_resp = await httpx_client.get(
f"{rns_client.api_server_url}/resource",
params=get_clusters_params,
headers=rns_client.request_headers(),
def _add_cluster_as_table_row(table: Table, rh_cluster: dict):
last_active = rh_cluster.get("Last Active (UTC)")
last_active = last_active if last_active != "1970-01-01 00:00:00" else "Unknown"
table.add_row(
rh_cluster.get("Name"),
rh_cluster.get("Cluster Type"),
rh_cluster.get("Status"),
last_active,
)

return clusters_in_den_resp
return table


def _add_clusters_to_output_table(
table: Table, clusters: list[Dict], filters_requested: dict = None
):

for rh_cluster in clusters:
last_active_at = rh_cluster.get("Last Active (UTC)")
last_active_at_no_offset = str(last_active_at).split("+")[
0
] # The split is required to remove the offset (according to UTC)
rh_cluster["Last Active (UTC)"] = last_active_at_no_offset
rh_cluster["Status"] = get_status_color(rh_cluster.get("Status"))

if filters_requested:
table = _add_cluster_as_table_row(table, rh_cluster)
else:
# Default behaviour: Print Running clusters that were active it the last 24 hours
now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME:
table = _add_cluster_as_table_row(table, rh_cluster)


def _print_msg_live_sky_clusters(live_clusters_not_in_den: int):
if live_clusters_not_in_den > 0:
live_sky_clusters_str = (
f"There are {live_clusters_not_in_den} live clusters that are not saved in Den."
if live_clusters_not_in_den > 1
else "There is 1 live cluster that is not saved in Den."
)

clusters_pronoun = "them" if live_clusters_not_in_den > 1 else "it"

console.print(
f"{live_sky_clusters_str} To get information about {clusters_pronoun}, please run [bold italic]sky status -r[/bold italic]."
)


def get_clusters_from_den():
return asyncio.run(aget_clusters_from_den())
def supported_time_units():
return ", ".join(list(TIME_UNITS.keys()))


@cluster_app.command("list")
def cluster_list():
def cluster_list(
get_all_clusters: bool = typer.Option(
False,
"--a",
"-a",
help="Get all den clusters. Up to 50 most recently active clusters will be displayed.",
),
since: Optional[str] = typer.Option(
None,
"--since",
help="Time duration to filter on. Minimum time to filter on is 1 minute. If the provided value will be less "
"than that, the filter will be set to 1 minute by default. You may filter by seconds (s), "
"minutes (m), hours (h) or days (s). Examples: 30s, 15m, 2h, 3d.",
),
cluster_status: Optional[str] = typer.Option(
None,
"--status",
help="Cluster status to filter on. Supported filter values: running, terminated, runhouse_daemon_down.",
),
):
"""Load Runhouse clusters"""
import sky

# logged out case
if not rh.configs.token:
Expand All @@ -626,123 +691,34 @@ def cluster_list():
)
return

on_demand_clusters_sky = sky.status(refresh=True)

clusters_in_den_resp = get_clusters_from_den()

if clusters_in_den_resp.status_code != 200:
logger.error(f"Failed to load {rns_client.username}'s clusters from Den")
clusters_in_den = []
else:
clusters_in_den = clusters_in_den_resp.json().get("data")

clusters_in_den_names = [cluster.get("name") for cluster in clusters_in_den]

if not on_demand_clusters_sky and not clusters_in_den:
console.print("No existing clusters.")

if on_demand_clusters_sky:
# getting the on-demand clusters that are not saved in den.
on_demand_clusters_sky = [
cluster
for cluster in on_demand_clusters_sky
if f'/{rns_client.username}/{cluster.get("name")}'
not in clusters_in_den_names
]

running_clusters = []
not_running_clusters = []

for den_cluster in clusters_in_den:
# get just name, not full rns address. reset is used so the name will be printed all in white.
cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}'
cluster_type = den_cluster.get("data").get("resource_subtype")
cluster_status = (
den_cluster.get("status") if den_cluster.get("status") else "unknown"
)

# currently relying on status pings to den as a sign of cluster activity.
# The split is required to remove milliseconds and the offset (according to UTC) from the timestamp.
# (status_last_checked is in the following format: YYYY-MM-DD HH:MM:SS.ssssss±HH:MM)

last_active_at = den_cluster.get("status_last_checked")
last_active_at = (
datetime.datetime.fromisoformat(last_active_at.split(".")[0])
if isinstance(last_active_at, str)
else datetime.datetime(1970, 1, 1)
) # Convert to datetime
last_active_at = last_active_at.replace(tzinfo=datetime.timezone.utc)

if cluster_status == "running" and not last_active_at:
# For BC, in case there are clusters that were saved and created before we introduced sending cluster status to den.
cluster_status = "unknown"

cluster_info = {
"Name": cluster_name,
"Cluster Type": cluster_type,
"Status": cluster_status,
"Last Active (UTC)": last_active_at,
}
running_clusters.append(
cluster_info
) if cluster_status == "running" else not_running_clusters.append(cluster_info)

# TODO: will be used if we'll need to print not-running clusters
# Sort not-running clusters by the 'Status' column
# not_running_clusters = sorted(not_running_clusters, key=lambda x: x["Status"])

# Sort clusters by the 'Last Active (UTC)' column
not_running_clusters = sorted(
not_running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True
)
running_clusters = sorted(
running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True
all_clusters, running_clusters, sky_live_clusters = Cluster.list(
get_all_clusters=get_all_clusters, since=since, status=cluster_status
)
if not all_clusters:
console.print("No existing clusters.")

# creating the clusters table
total_clusters = len(clusters_in_den)
table_title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {len(running_clusters)}, Total: {total_clusters})[/bold cyan]"
table = Table(title=table_title)
table = _create_output_table(
total_clusters=len(all_clusters), running_clusters=len(running_clusters)
)

# Add columns to the table
table.add_column("Name", justify="left", no_wrap=True)
table.add_column("Cluster Type", justify="center", no_wrap=True)
table.add_column("Status", justify="left")
table.add_column("Last Active (UTC)", justify="left")
filters_requested: bool = (
get_all_clusters or since or cluster_status or get_all_clusters
)

# TODO: will be used if we'll need to print not-running clusters
# all_clusters = running_clusters + not_running_clusters
clusters_to_print = all_clusters if filters_requested else running_clusters

for rh_cluster in running_clusters:
last_active_at = rh_cluster.get("Last Active (UTC)")
if get_all_clusters:
# if user requesting all den cluster, limit print only to 50 clusters max.
clusters_to_print = clusters_to_print[: (min(len(clusters_to_print), 50))]

# Print Running clusters that were active it the last 24 hours
now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME:
table.add_row(
rh_cluster.get("Name"),
rh_cluster.get("Cluster Type"),
get_status_color(rh_cluster.get("Status")),
str(last_active_at).split("+")[
0
], # The split is required to remove the offset (according to UTC)
)
_add_clusters_to_output_table(
table=table, clusters=clusters_to_print, filters_requested=filters_requested
)

console.print(table)

live_clusters_not_in_den = len(on_demand_clusters_sky)
if live_clusters_not_in_den > 0:
live_sky_clusters_str = (
f"There are {live_clusters_not_in_den} live clusters that are not saved in Den."
if live_clusters_not_in_den > 1
else "There is 1 live cluster that is not saved in Den."
)

clusters_pronoun = "them" if live_clusters_not_in_den > 1 else "it"

console.print(
f"{live_sky_clusters_str} To get information about {clusters_pronoun}, please run [bold italic]sky status -r[/bold italic]."
)
_print_msg_live_sky_clusters(live_clusters_not_in_den=len(sky_live_clusters))


# Register the 'cluster' command group with the main runhouse application
Expand Down
Loading

0 comments on commit 3a3f0a4

Please sign in to comment.