diff --git a/runhouse/main.py b/runhouse/main.py
index 5991dd201..82af9e056 100644
--- a/runhouse/main.py
+++ b/runhouse/main.py
@@ -30,6 +30,8 @@ from runhouse.constants import (
     BULLET_UNICODE,
     DOUBLE_SPACE_UNICODE,
+    HOUR,
+    LAST_ACTIVE_AT_TIMEFRAME,
     RAY_KILL_CMD,
     RAY_START_CMD,
     SERVER_LOGFILE,
@@ -657,6 +659,9 @@ def _add_clusters_to_output_table(
 
+        # adding reset so the name will be printed nicely in the console.
+        rh_cluster["Name"] = f'[reset]{rh_cluster.get("Name")}'
+
         table = _add_cluster_as_table_row(table, rh_cluster)
 
 
 def _print_msg_live_sky_clusters(sky_clusters: int):
     if sky_clusters > 0:
@@ -726,6 +731,11 @@ def cluster_list(
 
     clusters_to_print = all_clusters if filters_requested else running_clusters
 
+    if not filters_requested:
+        logger.info(
+            f"runhouse cluster list will display clusters that were active in the last {LAST_ACTIVE_AT_TIMEFRAME/HOUR} hours."
+        )
+
     if show_all:
         # if user requesting all den cluster, limit print only to 50 clusters max.
         clusters_to_print = clusters_to_print[: (min(len(clusters_to_print), 50))]
diff --git a/runhouse/resources/hardware/cluster.py b/runhouse/resources/hardware/cluster.py
index c98196cb0..a47b1d155 100644
--- a/runhouse/resources/hardware/cluster.py
+++ b/runhouse/resources/hardware/cluster.py
@@ -2068,7 +2068,7 @@ def _get_running_and_not_running_clusters(clusters: list):
     for den_cluster in clusters:
         # get just name, not full rns address. reset is used so the name will be printed all in white.
-        cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}'
+        cluster_name = f'{den_cluster.get("name").split("/")[-1]}'
         cluster_type = den_cluster.get("data").get("resource_subtype")
         cluster_status = (
             den_cluster.get("status") if den_cluster.get("status") else "unknown"
         )
diff --git a/tests/test_resources/test_clusters/test_cluster.py b/tests/test_resources/test_clusters/test_cluster.py
index 5e421372d..e056976d6 100644
--- a/tests/test_resources/test_clusters/test_cluster.py
+++ b/tests/test_resources/test_clusters/test_cluster.py
@@ -1,6 +1,9 @@
+import json
 import os
 import subprocess
 import time
+
+from datetime import datetime, timezone
 from threading import Thread
 
 import pytest
@@ -13,8 +16,10 @@
     DEFAULT_HTTPS_PORT,
     DEFAULT_SERVER_PORT,
     LOCALHOST,
+    MINUTE,
 )
 
+from runhouse.resources.hardware.cluster import Cluster
 from runhouse.resources.hardware.utils import ResourceServerStatus
 
 import tests.test_resources.test_resource
@@ -665,7 +670,6 @@ def test_rh_status_stopped(self, cluster):
     @pytest.mark.level("local")
     @pytest.mark.clustertest
     def test_send_status_to_db(self, cluster):
-        import json
 
         status = cluster.status()
         env_servlet_processes = status.pop("env_servlet_processes")
@@ -936,3 +940,287 @@ def test_switch_default_env(self, cluster):
         # set it back
         cluster.default_env = test_env
         cluster.delete(new_env.name)
+
+    ####################################################################################################
+    # Cluster list tests
+    ####################################################################################################
+
+    # when we pass a cluster fixture
+
+    @pytest.mark.level("local")
+    @pytest.mark.clustertest
+    def test_cluster_list_pythonic(self, cluster):
+
+        # on-demand clusters and the tls_exposed cluster are not created as part of test-org,
+        # therefore we can check whether those clusters are displayed in the user's cluster list
+        if isinstance(cluster, rh.OnDemandCluster) or "tls_exposed" in cluster.name:
+
+            clusters = Cluster.list()
+            all_clusters = clusters.get("all_clusters", {})
+            running_clusters = clusters.get("running_clusters", {})
+            assert len(all_clusters) > 0
+            assert len(running_clusters) > 0
+            assert len(running_clusters) == len(
+                all_clusters
+            )  # by default we get only running clusters
+
+            all_clusters_names = [
+                den_cluster.get("Name") for den_cluster in all_clusters
+            ]
+            running_clusters_names = [
+                running_cluster.get("Name") for running_cluster in running_clusters
+            ]
+            assert cluster.name in all_clusters_names
+            assert cluster.name in running_clusters_names
+
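+    # Helper (not itself a test): mocks a cluster's status in Den by sending the new status
+    # directly to the resource API with requests.put, so the list filters can be exercised
+    # without actually terminating a cluster.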
+    def set_cluster_status(self, cluster: rh.Cluster, status: ResourceServerStatus):
+        cluster_uri = rh.globals.rns_client.format_rns_address(cluster.rns_address)
+        headers = rh.globals.rns_client.request_headers()
+        api_server_url = rh.globals.rns_client.api_server_url
+
+        # updating the resource collection as well, because cluster.list() gets the info from the resource
+        status_data_resource = {
+            "status": status,
+            "status_last_checked": datetime.utcnow().isoformat(),
+        }
+        requests.put(
+            f"{api_server_url}/resource/{cluster_uri}",
+            data=json.dumps(status_data_resource),
+            headers=headers,
+        )
+
+    @pytest.mark.level("local")
+    @pytest.mark.clustertest
+    def test_cluster_list_all_pythonic(self, cluster):
+
+        # on-demand clusters and the tls_exposed cluster are not created as part of test-org,
+        # therefore we can check whether those clusters are displayed in the user's cluster list
+        if isinstance(cluster, rh.OnDemandCluster) or "tls_exposed" in cluster.name:
+
+            # make sure that we have at least one terminated cluster for the tests (it does not matter that the status is mocked)
+            self.set_cluster_status(
+                cluster=cluster, status=ResourceServerStatus.terminated
+            )
+
+            clusters = Cluster.list(show_all=True)
+            all_clusters = clusters.get("all_clusters", {})
+            running_clusters = clusters.get("running_clusters", {})
+            assert len(all_clusters) == 200  # den limit
+            assert len(all_clusters) >= len(running_clusters)
+
+            all_clusters_status = set(
+                [den_cluster.get("Status") for den_cluster in all_clusters]
+            )
+
+            # testing that we don't get just running clusters
+            assert len(all_clusters) > 1
+
+            assert ResourceServerStatus.running in all_clusters_status
+            assert ResourceServerStatus.terminated in all_clusters_status
+
+            current_cluster_info = [
+                den_cluster
+                for den_cluster in all_clusters
+                if den_cluster.get("Name") == cluster.name
+            ][0]
+            assert current_cluster_info.get("Status") == "terminated"
+
+        else:
+            pytest.skip("cannot perform this test on test-org clusters")
+
+    @pytest.mark.level("local")
+    @pytest.mark.clustertest
+    def test_cluster_list_status_filter_pythonic(self, cluster):
+
+        from runhouse.resources.hardware.utils import ResourceServerStatus
+
+        # on-demand clusters and the tls_exposed cluster are not created as part of test-org,
+        # therefore we can check whether those clusters are displayed in the user's cluster list
+        if isinstance(cluster, rh.OnDemandCluster) or "tls_exposed" in cluster.name:
+
+            clusters = Cluster.list(status="running")
+            all_clusters = clusters.get("all_clusters", {})
+            running_clusters = clusters.get("running_clusters", {})
+            assert len(all_clusters) > 0
+            assert len(running_clusters) > 0
+            assert len(all_clusters) == len(running_clusters)
+
+            all_clusters_status = set(
+                [den_cluster.get("Status") for den_cluster in all_clusters]
+            )
+
+            supported_statuses_without_running = [
+                status
+                for status in list(ResourceServerStatus.__members__.keys())
+                if status != "running"
+            ]
+
+            for status in supported_statuses_without_running:
+                assert status not in all_clusters_status
+
+            assert ResourceServerStatus.running in all_clusters_status
+
+            # make sure that we have at least one terminated cluster for the tests (it does not matter that the status is mocked)
+            self.set_cluster_status(
+                cluster=cluster, status=ResourceServerStatus.terminated
+            )
+
+            clusters = Cluster.list(status="terminated")
+            all_clusters = clusters.get("all_clusters", {})
+            running_clusters = clusters.get("running_clusters", {})
+            assert len(all_clusters) > 0
+            assert len(running_clusters) == 0
+
+            all_clusters_status = set(
+                [den_cluster.get("Status") for den_cluster in all_clusters]
+            )
+
+            assert "terminated" in all_clusters_status
+
+        else:
+            pytest.skip("cannot perform this test on test-org clusters")
+
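+    # The "since" filter below takes a relative time string; "10m" restricts the listing to
+    # clusters whose "Last Active (UTC)" timestamp falls within the last 10 minutes.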
+    @pytest.mark.level("local")
+    @pytest.mark.clustertest
+    def test_cluster_list_since_filter_pythonic(self, cluster):
+
+        # on-demand clusters and the tls_exposed cluster are not created as part of test-org,
+        # therefore we can check whether those clusters are displayed in the user's cluster list
+        if isinstance(cluster, rh.OnDemandCluster) or "tls_exposed" in cluster.name:
+            cluster.save()  # tls exposed local cluster is not saved by default
+
+            minutes_time_filter = 10
+
+            # make sure that we have at least one terminated cluster for the tests (it does not matter that the status is mocked)
+            self.set_cluster_status(
+                cluster=cluster, status=ResourceServerStatus.terminated
+            )
+
+            clusters = Cluster.list(since=f"{minutes_time_filter}m")
+            all_clusters = clusters.get("all_clusters", {})
+            running_clusters = clusters.get("running_clusters", {})
+            assert len(running_clusters) >= 0
+            assert len(all_clusters) > 0
+
+            clusters_last_active_timestamps = set(
+                [den_cluster.get("Last Active (UTC)") for den_cluster in all_clusters]
+            )
+
+            assert len(clusters_last_active_timestamps) >= 1
+            current_utc_time = datetime.utcnow().replace(tzinfo=timezone.utc)
+
+            for timestamp in clusters_last_active_timestamps:
+                assert (
+                    current_utc_time - timestamp
+                ).total_seconds() <= minutes_time_filter * MINUTE
+
+        else:
+            pytest.skip("cannot perform this test on test-org clusters")
+
+    @pytest.mark.level("local")
+    @pytest.mark.clustertest
+    def test_cluster_list_cmd_output_no_filters(self, capsys):
+        import re
+        import subprocess
+
+        env = os.environ.copy()
+        # Set the COLUMNS and LINES environment variables to control terminal width and height,
+        # so we can get the runhouse cluster list output properly using subprocess
+        env["COLUMNS"] = "250"
+        env["LINES"] = "40"  # Set a height value, though COLUMNS is the key one here
+
+        process = subprocess.Popen(
+            "runhouse cluster list",
+            shell=True,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            env=env,
+        )
+        process.wait()
+        stdout, stderr = process.communicate()
+        capsys.readouterr()
+        cmd_stdout = stdout.decode("utf-8")
+
+        assert cmd_stdout
+
+        # The output is printed as a table.
+        # testing that the table title is printed correctly
+        regex = f".*{rh.configs.username}'s Clusters \(Running: .*/.*, Total\s*Displayed: .*\).*"
+        assert re.search(regex, cmd_stdout)
+
+        # testing that the table column names are printed correctly
+        col_names = ["┃ Name", "┃ Cluster Type", "┃ Status", "┃ Last Active (UTC)"]
+        for name in col_names:
+            assert name in cmd_stdout
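+
+        # 24.0 is the expected value of LAST_ACTIVE_AT_TIMEFRAME / HOUR from the logger message
+        # added to cluster_list() in runhouse/main.py above.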
+        assert (
+            "runhouse cluster list will display clusters that were active in the last 24.0 hours."
+            in cmd_stdout
+        )
+
+    @pytest.mark.level("local")
+    @pytest.mark.clustertest
+    def test_cluster_list_cmd_output_with_filters(self, capsys, cluster):
+
+        import re
+        import subprocess
+
+        if isinstance(cluster, rh.OnDemandCluster) or "tls_exposed" in cluster.name:
+            cluster.save()  # tls exposed local cluster is not saved by default
+
+            # make sure that we have at least one terminated cluster for the tests (it does not matter that the status is mocked)
+            self.set_cluster_status(
+                cluster=cluster, status=ResourceServerStatus.terminated
+            )
+
+            env = os.environ.copy()
+            # Set the COLUMNS and LINES environment variables to control terminal width and height,
+            # so we can get the runhouse cluster list output properly using subprocess
+            env["COLUMNS"] = "250"
+            env["LINES"] = "40"  # Set a height value, though COLUMNS is the key one here
+
+            process = subprocess.Popen(
+                "runhouse cluster list --status terminated",
+                shell=True,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                env=env,
+            )
+            process.wait()
+            stdout, stderr = process.communicate()
+            capsys.readouterr()
+            cmd_stdout = stdout.decode("utf-8")
+            assert cmd_stdout
+
+            # The output is printed as a table.
+            # testing that the table title is printed correctly
+            regex = f".*{rh.configs.username}'s Clusters \(Running: .*/.*, Total\s*Displayed: .*\).*"
+            assert re.search(regex, cmd_stdout)
+
+            # testing that the table column names are printed correctly
+            displayed_info_names = [
+                "┃ Name",
+                "┃ Cluster Type",
+                "┃ Status",
+                "┃ Last Active (UTC)",
+            ]
+            for name in displayed_info_names:
+                assert name in cmd_stdout
+
+            assert (
+                "runhouse cluster list will display clusters that were active in the last 24.0 hours."
+                not in cmd_stdout
+            )
+
+            # Removing 'Running', which appears in the title of the output,
+            # so we can test that no clusters with status 'Running' are printed
+            cmd_stdout = cmd_stdout.replace("Running", "")
+            assert "Terminated" in cmd_stdout
+
+            statuses = list(ResourceServerStatus.__members__.keys())
+            statuses.remove("terminated")
+
+            for status in statuses:
+                assert status not in cmd_stdout
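
Below is a minimal usage sketch (not part of the patch) of the Python API these tests exercise.
The filter values mirror the ones used in the tests above; exact argument names and return keys
should be verified against the installed runhouse version:

    from runhouse.resources.hardware.cluster import Cluster

    # Default listing: only clusters that were active within LAST_ACTIVE_AT_TIMEFRAME (24 hours).
    clusters = Cluster.list()
    running = clusters.get("running_clusters", {})

    # Filters exercised by the tests: by status, by recent activity, or everything Den returns.
    terminated = Cluster.list(status="terminated").get("all_clusters", {})
    recent = Cluster.list(since="10m").get("all_clusters", {})
    everything = Cluster.list(show_all=True).get("all_clusters", {})

    print(len(running), len(terminated), len(everything))
    for c in recent:
        print(c.get("Name"), c.get("Status"), c.get("Last Active (UTC)"))

The equivalent CLI commands checked by the subprocess tests are "runhouse cluster list" and
"runhouse cluster list --status terminated".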