From 436a61ad11c0329a578ddb83315c700c99dbb64f Mon Sep 17 00:00:00 2001 From: Alexandra Belousov Date: Wed, 4 Sep 2024 17:31:26 +0300 Subject: [PATCH] cluster list filters --- runhouse/constants.py | 4 + runhouse/main.py | 227 ++++++++++++------------- runhouse/resources/hardware/cluster.py | 163 ++++++++++++++++++ runhouse/resources/hardware/utils.py | 57 +++++++ 4 files changed, 334 insertions(+), 117 deletions(-) diff --git a/runhouse/constants.py b/runhouse/constants.py index 0ed3b35b7..eba582dc9 100644 --- a/runhouse/constants.py +++ b/runhouse/constants.py @@ -80,6 +80,7 @@ SECOND = 1 MINUTE = 60 HOUR = 3600 +DAY = HOUR * 24 DEFAULT_STATUS_CHECK_INTERVAL = 1 * MINUTE INCREASED_STATUS_CHECK_INTERVAL = 1 * HOUR GPU_COLLECTION_INTERVAL = 5 * SECOND @@ -104,3 +105,6 @@ # Constants runhouse cluster list LAST_ACTIVE_AT_TIMEFRAME = 24 * HOUR + +# in seconds +TIME_UNITS = {"s": SECOND, "m": MINUTE, "h": HOUR, "d": DAY} diff --git a/runhouse/main.py b/runhouse/main.py index 2cfaac22c..33805f630 100644 --- a/runhouse/main.py +++ b/runhouse/main.py @@ -1,3 +1,4 @@ +import asyncio import datetime import importlib import logging @@ -27,6 +28,7 @@ from runhouse.constants import ( BULLET_UNICODE, DOUBLE_SPACE_UNICODE, + HOUR, LAST_ACTIVE_AT_TIMEFRAME, RAY_KILL_CMD, RAY_START_CMD, @@ -35,6 +37,7 @@ SERVER_STOP_CMD, START_NOHUP_CMD, START_SCREEN_CMD, + TIME_UNITS, ) from runhouse.globals import rns_client from runhouse.logger import get_logger @@ -590,159 +593,149 @@ def status( _print_status(cluster_status, current_cluster) -def get_clusters_from_den(): - get_clusters_params = {"resource_type": "cluster", "folder": rns_client.username} - clusters_in_den_resp = rns_client.session.get( - f"{rns_client.api_server_url}/resource", - params=get_clusters_params, - headers=rns_client.request_headers(), +def _create_output_table( + total_clusters: int, + running_clusters: int, + displayed_clusters: int, + filters_requested: bool, +): + displayed_running_clusters = ( + running_clusters + if running_clusters < displayed_clusters + else displayed_clusters ) + table_title = f"[bold cyan]Clusters for {rns_client.username} (Running: {displayed_running_clusters}/{running_clusters}, Total Displayed: {displayed_clusters}/{total_clusters})[/bold cyan]" - return clusters_in_den_resp + table = Table(title=table_title) + if not filters_requested: + table.caption = f"[reset]Showing clusters that were active in the last {LAST_ACTIVE_AT_TIMEFRAME / HOUR} hours." + table.caption_justify = "left" -@cluster_app.command("list") -def cluster_list(): - """List all Runhouse clusters saved in Den""" - import sky + # Add columns to the table + table.add_column("Name", justify="left", no_wrap=True) + table.add_column("Cluster Type", justify="left", no_wrap=True) + table.add_column("Status", justify="left") + table.add_column("Last Active (UTC)", justify="left") - # logged out case - if not rh.configs.token: - # TODO [SB]: adjust msg formatting (coloring etc) - sky_cli_command_formatted = f"{italic_bold_ansi}sky status -r{reset_format}" # will be printed bold and italic - console.print( - f"Runhouse token required to view all clusters. Please run `runhouse login` to load your token. 
To view on-demand clusters run {sky_cli_command_formatted}" - ) - return + return table - sky_clusters = sky.status() - clusters_in_den_resp = get_clusters_from_den() +def _add_cluster_as_table_row(table: Table, rh_cluster: dict): + last_active = rh_cluster.get("Last Active (UTC)") + last_active = last_active if last_active != "1970-01-01 00:00:00" else "Unknown" + table.add_row( + rh_cluster.get("Name"), + rh_cluster.get("Cluster Type"), + rh_cluster.get("Status"), + last_active, + ) - if clusters_in_den_resp.status_code != 200: - logger.error( - f"Failed to load clusters from Den for username: {rns_client.username}" - ) - clusters_in_den = [] - else: - clusters_in_den = clusters_in_den_resp.json().get("data") + return table - clusters_in_den_names = [cluster.get("name") for cluster in clusters_in_den] - if not sky_clusters and not clusters_in_den: - console.print("No existing clusters.") +def _add_clusters_to_output_table( + table: Table, clusters: list[Dict], filters_requested: dict = None +): + for rh_cluster in clusters: + last_active_at = rh_cluster.get("Last Active (UTC)") + last_active_at_no_offset = str(last_active_at).split("+")[ + 0 + ] # The split is required to remove the offset (according to UTC) + rh_cluster["Last Active (UTC)"] = last_active_at_no_offset + rh_cluster["Status"] = StatusColors.get_status_color(rh_cluster.get("Status")) - if sky_clusters: - # getting the on-demand clusters that are not saved in den. - sky_clusters = [ - cluster - for cluster in sky_clusters - if f'/{rns_client.username}/{cluster.get("name")}' - not in clusters_in_den_names - ] + table = _add_cluster_as_table_row(table, rh_cluster) - total_clusters = len(clusters_in_den) - table_title = f"[bold cyan]Clusters for {rns_client.username} (Total: {total_clusters})[/bold cyan]" - table = Table(title=table_title, title_justify="left") +def _print_msg_live_sky_clusters(sky_clusters: int): + if sky_clusters > 0: + console.print( + f"There are {sky_clusters} live cluster(s) that are not saved in Den. For more information, please run [bold italic]sky status -r[/bold italic]." + ) - # Add columns to the table - table.add_column("Name", justify="left", no_wrap=True) - table.add_column("Cluster Type", justify="center", no_wrap=True) - table.add_column("Status", justify="left") - running_clusters = [] - not_running_clusters = [] +def supported_time_units(): + return ", ".join(list(TIME_UNITS.keys())) - for den_cluster in clusters_in_den: - # get just name, not full rns address. reset is used so the name will be printed all in white. - cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}' - cluster_type = den_cluster.get("data").get("resource_subtype") - cluster_status = ( - den_cluster.get("status") if den_cluster.get("status") else None - ) - # currently relying on status pings to den as a sign of cluster activity. - # The split is required to remove milliseconds and the offset (according to UTC) from the timestamp. - # (status_last_checked is in the following format: YYYY-MM-DD HH:MM:SS.ssssss±HH:MM) +@cluster_app.command("list") +def cluster_list( + show_all: bool = typer.Option( + False, + "--a", + "--all", + help="Get all den clusters. Up to 50 most recently active clusters will be displayed.", + ), + since: Optional[str] = typer.Option( + None, + "--since", + help="Time duration to filter on. Minimum allowable filter is 1 minute. If the provided value will be less " + "than that, the filter will be set to 1 minute by default. 
You may filter by seconds (s), " + "minutes (m), hours (h) or days (s). Examples: 30s, 15m, 2h, 3d.", + ), + cluster_status: Optional[str] = typer.Option( + None, + "--status", + help="Cluster status to filter on. Supported filter values: running, terminated (cluster is not live), down (runhouser server is down, but the cluster might be live).", + ), +): + """Load Runhouse clusters""" - last_active_at = den_cluster.get("status_last_checked") - last_active_at = ( - datetime.datetime.fromisoformat(last_active_at.split(".")[0]) - if isinstance(last_active_at, str) - else None - ) - last_active_at = ( - last_active_at.replace(tzinfo=datetime.timezone.utc) - if last_active_at - else None + # logged out case + if not rh.configs.token: + # TODO [SB]: adjust msg formatting (coloring etc) + sky_cli_command_formatted = f"{italic_bold_ansi}sky status -r{reset_format}" # will be printed bold and italic + console.print( + f"Listing clusters requires a Runhouse token. Please run `runhouse login` to get your token, or run {sky_cli_command_formatted} to list locally stored on-demand clusters." ) + return - if cluster_status == "running" and not last_active_at: - # For BC, in case there are clusters that were saved and created before we introduced sending cluster status to den. - cluster_status = "unknown" + clusters = Cluster.list(show_all=show_all, since=since, status=cluster_status) - cluster_info = { - "Name": cluster_name, - "Cluster Type": cluster_type, - "Status": cluster_status, - "Last Active (UTC)": last_active_at, - } - running_clusters.append( - cluster_info - ) if cluster_status == "running" else not_running_clusters.append(cluster_info) + all_clusters = clusters.get("all_clusters", None) + running_clusters = clusters.get("running_clusters", None) + sky_clusters = clusters.get("sky_clusters", None) + + if not all_clusters: + console.print("No clusters found.") + return # TODO: will be used if we'll need to print not-running clusters # Sort not-running clusters by the 'Status' column # not_running_clusters = sorted(not_running_clusters, key=lambda x: x["Status"]) # Sort clusters by the 'Last Active (UTC)' column - not_running_clusters = sorted( - not_running_clusters, - key=lambda x: (x["Last Active (UTC)"] is None, x["Last Active (UTC)"]), - reverse=True, - ) + # not_running_clusters = sorted( + # not_running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True + # ) running_clusters = sorted( - running_clusters, - key=lambda x: (x["Last Active (UTC)"] is None, x["Last Active (UTC)"]), - reverse=True, + running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True ) - # creating the clusters table - total_clusters = len(clusters_in_den) - table_title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {len(running_clusters)}, Total: {total_clusters})[/bold cyan]" - table = Table(title=table_title) + filters_requested: bool = show_all or since or cluster_status - # Add columns to the table - table.add_column("Name", justify="left", no_wrap=True) - table.add_column("Cluster Type", justify="center", no_wrap=True) - table.add_column("Status", justify="left") - table.add_column("Last Active (UTC)", justify="left") + clusters_to_print = all_clusters if filters_requested else running_clusters - # TODO: will be used if we'll need to print not-running clusters - # all_clusters = running_clusters + not_running_clusters + if show_all: + # if user requesting all den cluster, limit print only to 50 clusters max. 
+ clusters_to_print = clusters_to_print[: (min(len(clusters_to_print), 50))] - for rh_cluster in running_clusters: - last_active_at = rh_cluster.get("Last Active (UTC)") + # creating the clusters table + table = _create_output_table( + total_clusters=len(all_clusters), + running_clusters=len(running_clusters), + displayed_clusters=len(clusters_to_print), + filters_requested=filters_requested, + ) - # Print Running clusters that were active it the last 24 hours - now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc) - if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME: - table.add_row( - rh_cluster.get("Name"), - rh_cluster.get("Cluster Type"), - StatusColors.get_status_color(rh_cluster.get("Status")), - str(last_active_at).split("+")[ - 0 - ], # The split is required to remove the offset (according to UTC) - ) + _add_clusters_to_output_table( + table=table, clusters=clusters_to_print, filters_requested=filters_requested + ) console.print(table) - if len(sky_clusters) > 0: - console.print( - f"There are {len(sky_clusters)} live cluster(s) that are not saved in Den. For more information, please run [bold italic]sky status -r[/bold italic]." - ) + _print_msg_live_sky_clusters(sky_clusters=len(sky_clusters)) # Register the 'cluster' command group with the main runhouse application @@ -1157,4 +1150,4 @@ def main( if version: print(f"{__version__}") elif ctx.invoked_subcommand is None: - subprocess.run("runhouse --help", shell=True) + subprocess.run("runhouse --help", shell=True) \ No newline at end of file diff --git a/runhouse/resources/hardware/cluster.py b/runhouse/resources/hardware/cluster.py index bb48ddf31..dbf8e0fc2 100644 --- a/runhouse/resources/hardware/cluster.py +++ b/runhouse/resources/hardware/cluster.py @@ -1,5 +1,6 @@ import contextlib import copy +import datetime import importlib import json import os @@ -14,6 +15,8 @@ import yaml +from runhouse.resources.hardware.utils import parse_filters, ResourceServerStatus + from runhouse.rns.utils.api import ResourceAccess, ResourceVisibility from runhouse.servers.http.certs import TLSCertConfig from runhouse.utils import ( @@ -38,6 +41,7 @@ DEFAULT_SERVER_PORT, DEFAULT_STATUS_CHECK_INTERVAL, EMPTY_DEFAULT_ENV_NAME, + LAST_ACTIVE_AT_TIMEFRAME, LOCALHOST, NUM_PORTS_TO_TRY, RESERVED_SYSTEM_NAMES, @@ -1936,3 +1940,162 @@ def _folder_mv( def _folder_exists(self, path: Union[str, Path]): return self.client.folder_exists(path=path) + + @staticmethod + def get_clusters_from_den(cluster_filters: dict): + get_clusters_params = { + "resource_type": "cluster", + "folder": rns_client.username, + } + + # send den request with filters if the user specifies filters. + # If "all" filter is specified - get all clusters (no filters are added to get_clusters_params) + if cluster_filters and "all" not in cluster_filters.keys(): + get_clusters_params.update(cluster_filters) + + # If not filters are specified, get only running clusters. + elif not cluster_filters: + get_clusters_params.update( + {"cluster_status": "running", "since": LAST_ACTIVE_AT_TIMEFRAME} + ) + + clusters_in_den_resp = rns_client.session.get( + f"{rns_client.api_server_url}/resource", + params=get_clusters_params, + headers=rns_client.request_headers(), + ) + + return clusters_in_den_resp + + @staticmethod + def _get_unsaved_live_clusters(den_clusters: list[Dict], sky_live_clusters: list): + den_clusters_names = [c.get("name") for c in den_clusters] + + # getting the on-demand clusters that are not saved in den. 
+ if sky_live_clusters: + return [ + cluster + for cluster in sky_live_clusters + if f'/{rns_client.username}/{cluster.get("name")}' + not in den_clusters_names + ] + else: + return [] + + @staticmethod + def _get_running_and_not_running_clusters(clusters: list): + running_clusters, not_running_clusters = [], [] + + for den_cluster in clusters: + # get just name, not full rns address. reset is used so the name will be printed all in white. + cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}' + cluster_type = den_cluster.get("data").get("resource_subtype") + cluster_status = ( + den_cluster.get("status") if den_cluster.get("status") else "unknown" + ) + + # currently relying on status pings to den as a sign of cluster activity. + # The split is required to remove milliseconds and the offset (according to UTC) from the timestamp. + # (status_last_checked is in the following format: YYYY-MM-DD HH:MM:SS.ssssss±HH:MM) + + last_active_at = den_cluster.get("status_last_checked") + last_active_at = ( + datetime.datetime.fromisoformat(last_active_at.split(".")[0]) + if isinstance(last_active_at, str) + else datetime.datetime(1970, 1, 1) + ) # Convert to datetime + last_active_at = last_active_at.replace(tzinfo=datetime.timezone.utc) + + if cluster_status == "running" and not last_active_at: + # For BC, in case there are clusters that were saved and created before we introduced sending cluster status to den. + cluster_status = "unknown" + + cluster_info = { + "Name": cluster_name, + "Cluster Type": cluster_type, + "Status": cluster_status, + "Last Active (UTC)": last_active_at, + } + running_clusters.append( + cluster_info + ) if cluster_status == "running" else not_running_clusters.append( + cluster_info + ) + + # TODO: will be used if we'll need to print not-running clusters + + # Sort clusters by the 'Last Active (UTC)' and 'Status' column + not_running_clusters = sorted( + not_running_clusters, + key=lambda x: (x["Last Active (UTC)"], x["Status"]), + reverse=True, + ) + running_clusters = sorted( + running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True + ) + + return running_clusters, not_running_clusters + + ############################### + # Cluster list + ############################### + @classmethod + def list( + cls, + show_all: Optional[bool] = False, + since: Optional[str] = None, + status: Optional[Union[str, ResourceServerStatus]] = None, + ): + """ + Returns user's runhouse clusters saved in Den. If filters are provided, only clusters that are matching the + filters are returned. If not filters are provided, clusters that were active in the last 24 hours are returned. + + Args: + show_all Optional[bool]: Whether to get ALL user's clusters saved in den. + since Optional[int]: Clusters that were active in the last N seconds (= the provided value) will be returned. + status Optional[str]: Clusters with the provided status will be returned. 
+ """ + + import sky + + cluster_filters = ( + parse_filters(since=since, cluster_status=status) + if not show_all + else {"all": "all"} + ) + + # get clusters from den + den_clusters_resp = Cluster.get_clusters_from_den( + cluster_filters=cluster_filters + ) + if den_clusters_resp.status_code != 200: + logger.error(f"Failed to load {rns_client.username}'s clusters from Den") + den_clusters = [] + else: + den_clusters = den_clusters_resp.json().get("data") + + # get sky live clusters + sky_live_clusters = sky.status(refresh=True) + + if not sky_live_clusters and not den_clusters: + return {} + + # sky_live_clusters = sky clusters found in the local sky DB but not saved in den + sky_live_clusters = Cluster._get_unsaved_live_clusters( + den_clusters=den_clusters, sky_live_clusters=sky_live_clusters + ) + + # running_clusters: running clusters which are saved in Den + # not running clusters: clusters that are terminated / unknown / down which are also saved in Den. + ( + running_clusters, + not_running_clusters, + ) = Cluster._get_running_and_not_running_clusters(clusters=den_clusters) + all_clusters = running_clusters + not_running_clusters + + clusters = { + "all_clusters": all_clusters, + "running_clusters": running_clusters, + "sky_clusters": sky_live_clusters, + } + return clusters diff --git a/runhouse/resources/hardware/utils.py b/runhouse/resources/hardware/utils.py index e98af49be..6880903bb 100644 --- a/runhouse/resources/hardware/utils.py +++ b/runhouse/resources/hardware/utils.py @@ -9,7 +9,9 @@ from runhouse.constants import ( CLUSTER_CONFIG_PATH, EMPTY_DEFAULT_ENV_NAME, + LAST_ACTIVE_AT_TIMEFRAME, RESERVED_SYSTEM_NAMES, + TIME_UNITS, ) from runhouse.logger import get_logger @@ -248,3 +250,58 @@ def up_cluster_helper(cluster, capture_output: Union[bool, str] = True): f.write(outfile.output) else: cluster.up() + + +################################### +# Cluster list helping methods +################################### + + +def parse_time_duration(duration: str): + # A simple parser for duration like "15m", "2h", "3d" + try: + unit = duration[-1] + value = int(duration[:-1]) + + # if the user provides a "--since" time filter than is less than a minute, set it by default to one minute. + if unit == "s" and value < 60: + value = 60 + + time_duration = value * TIME_UNITS.get(unit, 0) + + if time_duration == 0 and value != 0: + logger.warning( + f"Filter is not applied, invalid time unit provided ({unit})." + ) + return + + return time_duration + except Exception: + return LAST_ACTIVE_AT_TIMEFRAME + + +def parse_filters(since: str, cluster_status: str): + cluster_filters = {} + + if since: + last_active_in: int = parse_time_duration( + duration=since + ) # return in representing the "since" filter in seconds + if last_active_in: + cluster_filters["since"] = last_active_in + + if cluster_status: + + if cluster_status.lower() == "down": + # added server_down for BC + cluster_status = "runhouse_daemon_down" + + elif cluster_status.lower() not in ResourceServerStatus.__members__.keys(): + logger.warning( + f"Invalid cluster status provided ({cluster_status}), status filter set to 'Running'." + ) + cluster_status = ResourceServerStatus.running.value + + cluster_filters["cluster_status"] = cluster_status + + return cluster_filters
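Note on the header math in the new _create_output_table helper (a sketch outside the patch; the counts are made-up sample values):

    # Sketch: how the counts shown in the table title are derived.
    SECOND, MINUTE, HOUR = 1, 60, 3600
    DAY = HOUR * 24
    TIME_UNITS = {"s": SECOND, "m": MINUTE, "h": HOUR, "d": DAY}  # mirrors runhouse/constants.py

    total_clusters = 12        # all clusters returned by Den for the user
    running_clusters = 5       # subset with status "running"
    displayed_clusters = 3     # rows actually added to the table after filtering/capping

    # The title reports "displayed running / running" and "displayed / total".
    displayed_running = min(running_clusters, displayed_clusters)
    title = (
        f"Clusters for <username> "
        f"(Running: {displayed_running}/{running_clusters}, "
        f"Total Displayed: {displayed_clusters}/{total_clusters})"
    )
    print(title)  # Clusters for <username> (Running: 3/5, Total Displayed: 3/12)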
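A minimal sketch of how the "Last Active (UTC)" value is normalized before a cluster is added as a table row; the timestamp below is an illustrative stand-in, not taken from the patch:

    import datetime

    last_active_at = datetime.datetime(2024, 9, 4, 14, 30, 0, tzinfo=datetime.timezone.utc)

    # str() of an aware datetime ends with the UTC offset ("+00:00"); the table only
    # shows the date/time portion, so the offset is split off.
    last_active_no_offset = str(last_active_at).split("+")[0]
    print(last_active_no_offset)  # 2024-09-04 14:30:00

    # The epoch placeholder used for clusters that never reported status renders as "Unknown".
    rendered = (
        last_active_no_offset if last_active_no_offset != "1970-01-01 00:00:00" else "Unknown"
    )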
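For reference, the selection logic of the new `runhouse cluster list` command, condensed into a standalone (hypothetical) helper that mirrors the body of cluster_list:

    # If any filter flag is passed, every matching cluster is shown; otherwise only the
    # running ones. With --all, output is capped at the 50 most recently active clusters.
    def select_clusters_to_print(all_clusters, running_clusters, show_all, since, cluster_status):
        filters_requested = bool(show_all or since or cluster_status)
        clusters_to_print = all_clusters if filters_requested else running_clusters
        if show_all:
            clusters_to_print = clusters_to_print[: min(len(clusters_to_print), 50)]
        return clusters_to_print

    # Example with hypothetical data: no flags set, so only the running cluster is printed.
    select_clusters_to_print(
        all_clusters=[{"Name": "a"}, {"Name": "b"}],
        running_clusters=[{"Name": "a"}],
        show_all=False,
        since=None,
        cluster_status=None,
    )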
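A sketch of the query parameters Cluster.get_clusters_from_den builds for the Den /resource endpoint; the username and filter values are illustrative:

    LAST_ACTIVE_AT_TIMEFRAME = 24 * 3600  # mirrors runhouse/constants.py

    def build_params(username, cluster_filters):
        params = {"resource_type": "cluster", "folder": username}
        if cluster_filters and "all" not in cluster_filters:
            params.update(cluster_filters)          # explicit --since / --status filters
        elif not cluster_filters:
            # default view: running clusters active within the last 24 hours
            params.update({"cluster_status": "running", "since": LAST_ACTIVE_AT_TIMEFRAME})
        return params

    build_params("my-user", {})                                            # default view
    build_params("my-user", {"since": 7200, "cluster_status": "running"})  # --since 2h --status running
    build_params("my-user", {"all": "all"})                                # --all: no extra filters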
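A sketch of the partition-and-sort step performed by _get_running_and_not_running_clusters; the cluster dicts are illustrative stand-ins for the entries built from Den responses:

    import datetime

    utc = datetime.timezone.utc
    clusters = [
        {"Name": "a", "Status": "running",    "Last Active (UTC)": datetime.datetime(2024, 9, 4, 12, tzinfo=utc)},
        {"Name": "b", "Status": "terminated", "Last Active (UTC)": datetime.datetime(2024, 9, 3, 8,  tzinfo=utc)},
        {"Name": "c", "Status": "running",    "Last Active (UTC)": datetime.datetime(2024, 9, 4, 15, tzinfo=utc)},
    ]

    running = [c for c in clusters if c["Status"] == "running"]
    not_running = [c for c in clusters if c["Status"] != "running"]

    # Running clusters: most recently active first.
    running = sorted(running, key=lambda x: x["Last Active (UTC)"], reverse=True)
    # Not-running clusters: newest activity first, ties broken by status.
    not_running = sorted(not_running, key=lambda x: (x["Last Active (UTC)"], x["Status"]), reverse=True)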
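A condensed sketch of the --since parsing added in utils.py (the logger warning is omitted here and replaced with a plain None return; constants mirror runhouse/constants.py):

    SECOND, MINUTE, HOUR = 1, 60, 3600
    DAY = 24 * HOUR
    TIME_UNITS = {"s": SECOND, "m": MINUTE, "h": HOUR, "d": DAY}
    LAST_ACTIVE_AT_TIMEFRAME = 24 * HOUR

    def parse_time_duration(duration: str):
        try:
            unit, value = duration[-1], int(duration[:-1])
            if unit == "s" and value < 60:
                value = 60                      # enforce the 1-minute minimum filter
            seconds = value * TIME_UNITS.get(unit, 0)
            if seconds == 0 and value != 0:
                return None                     # unrecognized unit: filter not applied
            return seconds
        except Exception:
            return LAST_ACTIVE_AT_TIMEFRAME     # malformed input: default 24-hour window

    assert parse_time_duration("30s") == 60      # bumped up to the minimum
    assert parse_time_duration("15m") == 900
    assert parse_time_duration("2h") == 7200
    assert parse_time_duration("3d") == 259200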