From 15b29f8621d421c2050c4f0cc81e198906ab0bfe Mon Sep 17 00:00:00 2001
From: Alexandra Belousov
Date: Wed, 4 Sep 2024 17:31:26 +0300
Subject: [PATCH] cluster list filters

---
 runhouse/constants.py                  |   4 +
 runhouse/main.py                       | 220 +++++++++++--------------
 runhouse/resources/hardware/cluster.py | 199 ++++++++++++++++++++++
 3 files changed, 301 insertions(+), 122 deletions(-)

diff --git a/runhouse/constants.py b/runhouse/constants.py
index 0ed3b35b7..eba582dc9 100644
--- a/runhouse/constants.py
+++ b/runhouse/constants.py
@@ -80,6 +80,7 @@
 SECOND = 1
 MINUTE = 60
 HOUR = 3600
+DAY = HOUR * 24
 DEFAULT_STATUS_CHECK_INTERVAL = 1 * MINUTE
 INCREASED_STATUS_CHECK_INTERVAL = 1 * HOUR
 GPU_COLLECTION_INTERVAL = 5 * SECOND
@@ -104,3 +105,6 @@
 
 # Constants runhouse cluster list
 LAST_ACTIVE_AT_TIMEFRAME = 24 * HOUR
+
+# in seconds
+TIME_UNITS = {"s": SECOND, "m": MINUTE, "h": HOUR, "d": DAY}
diff --git a/runhouse/main.py b/runhouse/main.py
index 664409bc8..84022c955 100644
--- a/runhouse/main.py
+++ b/runhouse/main.py
@@ -1,4 +1,3 @@
-import asyncio
 import datetime
 import importlib
 import logging
@@ -10,8 +9,6 @@
 from pathlib import Path
 from typing import Any, Dict, List, Optional
 
-import httpx
-
 import ray
 
 import requests
@@ -38,6 +35,7 @@
     SERVER_STOP_CMD,
     START_NOHUP_CMD,
     START_SCREEN_CMD,
+    TIME_UNITS,
 )
 from runhouse.globals import rns_client
 from runhouse.logger import get_logger
@@ -595,27 +593,94 @@ def status(
     _print_status(cluster_status, current_cluster)
 
 
-async def aget_clusters_from_den():
-    httpx_client = httpx.AsyncClient()
+def _create_output_table(total_clusters: int, running_clusters: int):
+    table_title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {running_clusters}, Total: {total_clusters})[/bold cyan]"
+    table = Table(title=table_title)
+
+    # Add columns to the table
+    table.add_column("Name", justify="left", no_wrap=True)
+    table.add_column("Cluster Type", justify="left", no_wrap=True)
+    table.add_column("Status", justify="left")
+    table.add_column("Last Active (UTC)", justify="left")
+
+    return table
+
 
-    get_clusters_params = {"resource_type": "cluster", "folder": rns_client.username}
-    clusters_in_den_resp = await httpx_client.get(
-        f"{rns_client.api_server_url}/resource",
-        params=get_clusters_params,
-        headers=rns_client.request_headers(),
+def _add_cluster_as_table_row(table: Table, rh_cluster: dict):
+    last_active = rh_cluster.get("Last Active (UTC)")
+    last_active = last_active if last_active != "1970-01-01 00:00:00" else "Unknown"
+    table.add_row(
+        rh_cluster.get("Name"),
+        rh_cluster.get("Cluster Type"),
+        rh_cluster.get("Status"),
+        last_active,
     )
-    return clusters_in_den_resp
+    return table
+
+
+def _add_clusters_to_output_table(
+    table: Table, clusters: List[Dict], filters_requested: bool = False
+):
+
+    for rh_cluster in clusters:
+        last_active_at = rh_cluster.get("Last Active (UTC)")
+        last_active_at_no_offset = str(last_active_at).split("+")[
+            0
+        ]  # The split is required to remove the offset (according to UTC)
+        rh_cluster["Last Active (UTC)"] = last_active_at_no_offset
+        rh_cluster["Status"] = get_status_color(rh_cluster.get("Status"))
+
+        if filters_requested:
+            table = _add_cluster_as_table_row(table, rh_cluster)
+        else:
+            # Default behaviour: print running clusters that were active in the last 24 hours
+            now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
+            if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME:
+                table = _add_cluster_as_table_row(table, rh_cluster)
+
+
+def _print_msg_live_sky_clusters(live_clusters_not_in_den: int):
+    if live_clusters_not_in_den > 0:
+        live_sky_clusters_str = (
+            f"There are {live_clusters_not_in_den} live clusters that are not saved in Den."
+            if live_clusters_not_in_den > 1
+            else "There is 1 live cluster that is not saved in Den."
+        )
+
+        clusters_pronoun = "them" if live_clusters_not_in_den > 1 else "it"
+
+        console.print(
+            f"{live_sky_clusters_str} To get information about {clusters_pronoun}, please run [bold italic]sky status -r[/bold italic]."
+        )
 
 
-def get_clusters_from_den():
-    return asyncio.run(aget_clusters_from_den())
+def supported_time_units():
+    return ", ".join(list(TIME_UNITS.keys()))
 
 
 @cluster_app.command("list")
-def cluster_list():
+def cluster_list(
+    get_all_clusters: bool = typer.Option(
+        False,
+        "--a",
+        "-a",
+        help="Get all Den clusters. Up to 50 most recently active clusters will be displayed.",
+    ),
+    since: Optional[str] = typer.Option(
+        None,
+        "--since",
+        help="Time duration to filter on. Minimum time to filter on is 1 minute. If the provided value is less "
+        "than that, the filter will be set to 1 minute by default. You may filter by seconds (s), "
+        "minutes (m), hours (h) or days (d). Examples: 30s, 15m, 2h, 3d.",
+    ),
+    cluster_status: Optional[str] = typer.Option(
+        None,
+        "--status",
+        help="Cluster status to filter on. Supported filter values: running, terminated, runhouse_daemon_down.",
+    ),
+):
     """Load Runhouse clusters"""
-    import sky
 
     # logged out case
     if not rh.configs.token:
@@ -626,123 +691,34 @@ def cluster_list():
         )
         return
 
-    on_demand_clusters_sky = sky.status(refresh=True)
-
-    clusters_in_den_resp = get_clusters_from_den()
-
-    if clusters_in_den_resp.status_code != 200:
-        logger.error(f"Failed to load {rns_client.username}'s clusters from Den")
-        clusters_in_den = []
-    else:
-        clusters_in_den = clusters_in_den_resp.json().get("data")
-
-    clusters_in_den_names = [cluster.get("name") for cluster in clusters_in_den]
-
-    if not on_demand_clusters_sky and not clusters_in_den:
-        console.print("No existing clusters.")
-
-    if on_demand_clusters_sky:
-        # getting the on-demand clusters that are not saved in den.
-        on_demand_clusters_sky = [
-            cluster
-            for cluster in on_demand_clusters_sky
-            if f'/{rns_client.username}/{cluster.get("name")}'
-            not in clusters_in_den_names
-        ]
-
-    running_clusters = []
-    not_running_clusters = []
-
-    for den_cluster in clusters_in_den:
-        # get just name, not full rns address. reset is used so the name will be printed all in white.
-        cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}'
-        cluster_type = den_cluster.get("data").get("resource_subtype")
-        cluster_status = (
-            den_cluster.get("status") if den_cluster.get("status") else "unknown"
-        )
-
-        # currently relying on status pings to den as a sign of cluster activity.
-        # The split is required to remove milliseconds and the offset (according to UTC) from the timestamp.
-        # (status_last_checked is in the following format: YYYY-MM-DD HH:MM:SS.ssssss±HH:MM)
-
-        last_active_at = den_cluster.get("status_last_checked")
-        last_active_at = (
-            datetime.datetime.fromisoformat(last_active_at.split(".")[0])
-            if isinstance(last_active_at, str)
-            else datetime.datetime(1970, 1, 1)
-        )  # Convert to datetime
-        last_active_at = last_active_at.replace(tzinfo=datetime.timezone.utc)
-
-        if cluster_status == "running" and not last_active_at:
-            # For BC, in case there are clusters that were saved and created before we introduced sending cluster status to den.
-            cluster_status = "unknown"
-
-        cluster_info = {
-            "Name": cluster_name,
-            "Cluster Type": cluster_type,
-            "Status": cluster_status,
-            "Last Active (UTC)": last_active_at,
-        }
-        running_clusters.append(
-            cluster_info
-        ) if cluster_status == "running" else not_running_clusters.append(cluster_info)
-
-    # TODO: will be used if we'll need to print not-running clusters
-    # Sort not-running clusters by the 'Status' column
-    # not_running_clusters = sorted(not_running_clusters, key=lambda x: x["Status"])
-
-    # Sort clusters by the 'Last Active (UTC)' column
-    not_running_clusters = sorted(
-        not_running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True
-    )
-    running_clusters = sorted(
-        running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True
+    all_clusters, running_clusters, sky_live_clusters = Cluster.list(
+        get_all_clusters=get_all_clusters, since=since, status=cluster_status
     )
+    if not all_clusters:
+        console.print("No existing clusters.")
 
     # creating the clusters table
-    total_clusters = len(clusters_in_den)
-    table_title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {len(running_clusters)}, Total: {total_clusters})[/bold cyan]"
-    table = Table(title=table_title)
+    table = _create_output_table(
+        total_clusters=len(all_clusters), running_clusters=len(running_clusters)
+    )
 
-    # Add columns to the table
-    table.add_column("Name", justify="left", no_wrap=True)
-    table.add_column("Cluster Type", justify="center", no_wrap=True)
-    table.add_column("Status", justify="left")
-    table.add_column("Last Active (UTC)", justify="left")
+    filters_requested: bool = bool(
+        get_all_clusters or since or cluster_status
+    )
 
-    # TODO: will be used if we'll need to print not-running clusters
-    # all_clusters = running_clusters + not_running_clusters
+    clusters_to_print = all_clusters if filters_requested else running_clusters
 
-    for rh_cluster in running_clusters:
-        last_active_at = rh_cluster.get("Last Active (UTC)")
+    if get_all_clusters:
+        # If the user requested all Den clusters, limit the output to at most 50 clusters.
+        clusters_to_print = clusters_to_print[:50]
 
-        # Print Running clusters that were active it the last 24 hours
-        now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
-        if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME:
-            table.add_row(
-                rh_cluster.get("Name"),
-                rh_cluster.get("Cluster Type"),
-                get_status_color(rh_cluster.get("Status")),
-                str(last_active_at).split("+")[
-                    0
-                ],  # The split is required to remove the offset (according to UTC)
-            )
+    _add_clusters_to_output_table(
+        table=table, clusters=clusters_to_print, filters_requested=filters_requested
+    )
 
     console.print(table)
 
-    live_clusters_not_in_den = len(on_demand_clusters_sky)
-    if live_clusters_not_in_den > 0:
-        live_sky_clusters_str = (
-            f"There are {live_clusters_not_in_den} live clusters that are not saved in Den."
-            if live_clusters_not_in_den > 1
-            else "There is 1 live cluster that is not saved in Den."
-        )
-
-        clusters_pronoun = "them" if live_clusters_not_in_den > 1 else "it"
-
-        console.print(
-            f"{live_sky_clusters_str} To get information about {clusters_pronoun}, please run [bold italic]sky status -r[/bold italic]."
-        )
+    _print_msg_live_sky_clusters(live_clusters_not_in_den=len(sky_live_clusters))
 
 
 # Register the 'cluster' command group with the main runhouse application
diff --git a/runhouse/resources/hardware/cluster.py b/runhouse/resources/hardware/cluster.py
index 77c20c8c7..fec17c08b 100644
--- a/runhouse/resources/hardware/cluster.py
+++ b/runhouse/resources/hardware/cluster.py
@@ -1,5 +1,7 @@
+import asyncio
 import contextlib
 import copy
+import datetime
 import importlib
 import json
 import os
@@ -10,10 +12,14 @@
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple, Union
 
+import httpx
+
 import requests
 
 import yaml
 
+from runhouse.resources.hardware.utils import ResourceServerStatus
+
 from runhouse.rns.utils.api import ResourceAccess, ResourceVisibility
 from runhouse.servers.http.certs import TLSCertConfig
 from runhouse.utils import (
@@ -41,6 +47,7 @@
     LOCALHOST,
     NUM_PORTS_TO_TRY,
     RESERVED_SYSTEM_NAMES,
+    TIME_UNITS,
 )
 from runhouse.globals import configs, obj_store, rns_client
 from runhouse.logger import get_logger
@@ -1957,3 +1964,195 @@ def _folder_mv(
 
     def _folder_exists(self, path: Union[str, Path]):
         return self.client.folder_exists(path=path)
+
+    ###################################
+    # Cluster list helping methods
+    ###################################
+
+    @staticmethod
+    def _parse_time_duration(duration: str):
+        # A simple parser for durations like "15m", "2h", "3d"
+        unit = duration[-1]
+        value = int(duration[:-1])
+
+        # If the user provides a "--since" time filter that is less than a minute, default it to one minute.
+        if unit == "s" and value < 60:
+            value = 60
+
+        time_duration = value * TIME_UNITS.get(unit, 0)
+
+        if time_duration == 0 and value != 0:
+            logger.warning(
+                f"Filter is not applied, invalid time unit provided ({unit})."
+            )
+            return
+
+        return time_duration
+
+    @staticmethod
+    def _parse_filters(since: str, cluster_status: str):
+        cluster_filters = {}
+
+        if since:
+            last_active_in: int = Cluster._parse_time_duration(
+                duration=since
+            )  # returns an int representing the "since" filter, in seconds
+            if last_active_in:
+                cluster_filters["last_active"] = last_active_in
+
+        if cluster_status:
+            if cluster_status.lower() not in ResourceServerStatus.__members__.keys():
+                logger.warning(
+                    f"Filter is not applied, invalid cluster status provided ({cluster_status})."
+                )
+            else:
+                cluster_filters["cluster_status"] = cluster_status
+
+        return cluster_filters
+
+    @staticmethod
+    async def aget_clusters_from_den(cluster_filters: dict):
+        httpx_client = httpx.AsyncClient()
+
+        get_clusters_params = {
+            "resource_type": "cluster",
+            "folder": rns_client.username,
+        }
+
+        # send den request with filters if the user specifies filters.
+        # If "all" filter is specified - get all clusters (no filters are added to get_clusters_params)
+        if cluster_filters and "all" not in cluster_filters.keys():
+            get_clusters_params.update(cluster_filters)
+
+        # If no filters are specified, get only running clusters.
+        elif not cluster_filters:
+            get_clusters_params.update({"cluster_status": "running"})
+
+        clusters_in_den_resp = await httpx_client.get(
+            f"{rns_client.api_server_url}/resource",
+            params=get_clusters_params,
+            headers=rns_client.request_headers(),
+        )
+
+        return clusters_in_den_resp
+
+    @staticmethod
+    def get_clusters_from_den(cluster_filters: dict):
+        return asyncio.run(
+            Cluster.aget_clusters_from_den(cluster_filters=cluster_filters)
+        )
+
+    @staticmethod
+    def _get_unsaved_live_clusters(den_clusters: List[Dict], sky_live_clusters: list):
+        den_clusters_names = [c.get("name") for c in den_clusters]
+
+        # getting the on-demand clusters that are not saved in den.
+        if sky_live_clusters:
+            return [
+                cluster
+                for cluster in sky_live_clusters
+                if f'/{rns_client.username}/{cluster.get("name")}'
+                not in den_clusters_names
+            ]
+        else:
+            return []
+
+    @staticmethod
+    def _get_running_and_not_running_clusters(clusters: list):
+        running_clusters, not_running_clusters = [], []
+
+        for den_cluster in clusters:
+            # get just name, not full rns address. reset is used so the name will be printed all in white.
+            cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}'
+            cluster_type = den_cluster.get("data").get("resource_subtype")
+            cluster_status = (
+                den_cluster.get("status") if den_cluster.get("status") else "unknown"
+            )
+
+            # currently relying on status pings to den as a sign of cluster activity.
+            # The split is required to remove milliseconds and the offset (according to UTC) from the timestamp.
+            # (status_last_checked is in the following format: YYYY-MM-DD HH:MM:SS.ssssss±HH:MM)
+
+            last_active_at = den_cluster.get("status_last_checked")
+            last_active_at = (
+                datetime.datetime.fromisoformat(last_active_at.split(".")[0])
+                if isinstance(last_active_at, str)
+                else datetime.datetime(1970, 1, 1)
+            )  # Convert to datetime
+            last_active_at = last_active_at.replace(tzinfo=datetime.timezone.utc)
+
+            if cluster_status == "running" and not last_active_at:
+                # For BC, in case there are clusters that were saved and created before we introduced sending cluster status to den.
+                cluster_status = "unknown"
+
+            cluster_info = {
+                "Name": cluster_name,
+                "Cluster Type": cluster_type,
+                "Status": cluster_status,
+                "Last Active (UTC)": last_active_at,
+            }
+            running_clusters.append(
+                cluster_info
+            ) if cluster_status == "running" else not_running_clusters.append(
+                cluster_info
+            )
+
+        # TODO: will be used if we'll need to print not-running clusters
+
+        # Sort clusters by the 'Last Active (UTC)' and 'Status' columns
+        not_running_clusters = sorted(
+            not_running_clusters,
+            key=lambda x: (x["Last Active (UTC)"], x["Status"]),
+            reverse=True,
+        )
+        running_clusters = sorted(
+            running_clusters, key=lambda x: x["Last Active (UTC)"], reverse=True
+        )
+
+        return running_clusters, not_running_clusters
+
+    ###############################
+    # Cluster list
+    ###############################
+    @staticmethod
+    def list(
+        get_all_clusters: Optional[bool] = False,
+        since: Optional[str] = None,
+        status: Optional[Union[str, ResourceServerStatus]] = None,
+    ):
+
+        import sky
+
+        cluster_filters = (
+            Cluster._parse_filters(since=since, cluster_status=status)
+            if not get_all_clusters
+            else {"all": "all"}
+        )
+
+        # get clusters from den
+        den_clusters_resp = Cluster.get_clusters_from_den(
+            cluster_filters=cluster_filters
+        )
+        if den_clusters_resp.status_code != 200:
+            logger.error(f"Failed to load {rns_client.username}'s clusters from Den")
+            den_clusters = []
+        else:
+            den_clusters = den_clusters_resp.json().get("data")
+
+        # get sky live clusters
+        sky_live_clusters = sky.status(refresh=True)
+
+        if not sky_live_clusters and not den_clusters:
+            return [], [], []
+
+        sky_live_clusters = Cluster._get_unsaved_live_clusters(
+            den_clusters=den_clusters, sky_live_clusters=sky_live_clusters
+        )
+
+        (
+            running_clusters,
+            not_running_clusters,
+        ) = Cluster._get_running_and_not_running_clusters(clusters=den_clusters)
+        all_clusters = running_clusters + not_running_clusters
+
+        return all_clusters, running_clusters, sky_live_clusters
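
Usage sketch, assuming the Typer options and the Cluster.list signature land exactly as defined in this patch. The local variable names below are illustrative, and _parse_filters is a private helper called here only to show the filter dict it is expected to build.

    # CLI, as wired up through cluster_app:
    #   runhouse cluster list                      # default: running clusters active in the last 24 hours
    #   runhouse cluster list --since 2h           # clusters whose last Den status ping is within 2 hours
    #   runhouse cluster list --status terminated  # filter by cluster status
    #   runhouse cluster list -a                   # up to 50 most recently active Den clusters

    # Roughly equivalent programmatic calls:
    from runhouse.resources.hardware.cluster import Cluster

    filters = Cluster._parse_filters(since="30s", cluster_status="running")
    # "30s" is bumped to the 1-minute minimum, so filters == {"last_active": 60, "cluster_status": "running"}

    all_clusters, running_clusters, unsaved_sky_clusters = Cluster.list(
        since="2h", status="running"
    )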