From f2e23d3ad70836a9a70bb4efe97b22d54e7f31d0 Mon Sep 17 00:00:00 2001
From: Alexandra Belousov
Date: Mon, 9 Sep 2024 14:08:38 +0300
Subject: [PATCH] cluster list tests

---
 runhouse/main.py                              |   5 +-
 runhouse/resources/hardware/cluster.py        |   9 +-
 .../test_clusters/test_cluster.py             | 249 +++++++++++++++++-
 tests/utils.py                                |  22 ++
 4 files changed, 279 insertions(+), 6 deletions(-)

diff --git a/runhouse/main.py b/runhouse/main.py
index 5febb5e48..6106203cd 100644
--- a/runhouse/main.py
+++ b/runhouse/main.py
@@ -608,7 +608,7 @@ def _create_output_table(
     table = Table(title=table_title)
 
     if not filters_requested:
-        table.caption = f"[reset]Showing clusters that were active in the last {LAST_ACTIVE_AT_TIMEFRAME / HOUR} hours."
+        table.caption = f"[reset]Note: the above clusters have registered activity in the last {int(LAST_ACTIVE_AT_TIMEFRAME / HOUR)} hours."
         table.caption_justify = "left"
 
     # Add columns to the table
@@ -646,6 +646,9 @@ def _add_clusters_to_output_table(
 
+        # Add [reset] so the cluster name is printed with default styling in the console.
+        rh_cluster["Name"] = f'[reset]{rh_cluster.get("Name")}'
+
         table = _add_cluster_as_table_row(table, rh_cluster)
 
 
 def _print_msg_live_sky_clusters(sky_clusters: int):
     if sky_clusters > 0:
diff --git a/runhouse/resources/hardware/cluster.py b/runhouse/resources/hardware/cluster.py
index 66d68553f..ba406ea51 100644
--- a/runhouse/resources/hardware/cluster.py
+++ b/runhouse/resources/hardware/cluster.py
@@ -1942,10 +1942,10 @@ def _folder_exists(self, path: Union[str, Path]):
         return self.client.folder_exists(path=path)
 
     @staticmethod
-    def get_clusters_from_den(cluster_filters: dict):
+    def get_clusters_from_den(cluster_filters: dict, org_name: Optional[str] = None):
         get_clusters_params = {
             "resource_type": "cluster",
-            "folder": rns_client.username,
+            "folder": org_name if org_name else rns_client.username,
         }
 
         # send den request with filters if the user specifies filters.
@@ -1988,7 +1988,7 @@ def _get_running_and_not_running_clusters(clusters: list):
         for den_cluster in clusters:
             # get just name, not full rns address. reset is used so the name will be printed all in white.
-            cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}'
+            cluster_name = f'{den_cluster.get("name").split("/")[-1]}'
             cluster_type = den_cluster.get("data").get("resource_subtype")
             cluster_status = (
                 den_cluster.get("status") if den_cluster.get("status") else "unknown"
             )
@@ -2045,6 +2045,7 @@ def list(
         show_all: Optional[bool] = False,
         since: Optional[str] = None,
         status: Optional[Union[str, ResourceServerStatus]] = None,
+        org_name: Optional[str] = None,
     ):
         """
         Returns user's runhouse clusters saved in Den. If filters are provided, only clusters that are matching the
@@ -2066,7 +2067,7 @@ def list(
 
         # get clusters from den
         den_clusters_resp = Cluster.get_clusters_from_den(
-            cluster_filters=cluster_filters
+            cluster_filters=cluster_filters, org_name=org_name
         )
         if den_clusters_resp.status_code != 200:
             logger.error(f"Failed to load {rns_client.username}'s clusters from Den")
diff --git a/tests/test_resources/test_clusters/test_cluster.py b/tests/test_resources/test_clusters/test_cluster.py
index 5e421372d..1e8be6a63 100644
--- a/tests/test_resources/test_clusters/test_cluster.py
+++ b/tests/test_resources/test_clusters/test_cluster.py
@@ -1,6 +1,9 @@
+import json
 import os
 import subprocess
 import time
+
+from datetime import datetime, timezone
 from threading import Thread
 
 import pytest
@@ -13,8 +16,10 @@
     DEFAULT_HTTPS_PORT,
     DEFAULT_SERVER_PORT,
     LOCALHOST,
+    MINUTE,
 )
 
+from runhouse.resources.hardware.cluster import Cluster
 from runhouse.resources.hardware.utils import ResourceServerStatus
 
 import tests.test_resources.test_resource
@@ -25,6 +30,7 @@
     friend_account_in_org,
     get_random_str,
     remove_config_keys,
+    set_cluster_status,
 )
 
 """ TODO:
@@ -665,7 +671,6 @@ def test_rh_status_stopped(self, cluster):
     @pytest.mark.level("local")
     @pytest.mark.clustertest
     def test_send_status_to_db(self, cluster):
-        import json
 
         status = cluster.status()
         env_servlet_processes = status.pop("env_servlet_processes")
@@ -936,3 +941,245 @@ def test_switch_default_env(self, cluster):
         # set it back
         cluster.default_env = test_env
         cluster.delete(new_env.name)
+
+    ####################################################################################################
+    # Cluster list tests
+    ####################################################################################################
+
+    # The following tests run against the cluster fixture provided by the test suite.
+
+    @pytest.mark.level("local")
+    @pytest.mark.clustertest
+    def test_cluster_list_pythonic(self, cluster):
+        cluster.save()
+        with friend_account_in_org():
+            org_name = None if "test_org_member" in cluster.rns_address else "test-org"
+            clusters = Cluster.list(org_name=org_name)
+        all_clusters = clusters.get("all_clusters", {})
+        running_clusters = clusters.get("running_clusters", {})
+        assert len(all_clusters) > 0
+        assert len(running_clusters) > 0
+        assert len(running_clusters) == len(
+            all_clusters
+        )  # by default, only running clusters are returned
+
+        all_clusters_names = [den_cluster.get("Name") for den_cluster in all_clusters]
+        running_clusters_names = [
+            running_cluster.get("Name") for running_cluster in running_clusters
+        ]
+        assert cluster.name in running_clusters_names
+        assert cluster.name in all_clusters_names
+
+    @pytest.mark.level("local")
+    @pytest.mark.clustertest
+    def test_cluster_list_all_pythonic(self, cluster):
+
+        # make sure we have at least one terminated cluster for the test (it doesn't matter that the status is mocked)
+        set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)
+
+        with friend_account_in_org():
+            org_name = None if "test_org_member" in cluster.rns_address else "test-org"
+            clusters = Cluster.list(org_name=org_name, show_all=True)
+        all_clusters = clusters.get("all_clusters", {})
+        running_clusters = clusters.get("running_clusters", {})
+        assert 0 <= len(all_clusters) <= 200  # Den limit
+        assert len(all_clusters) >= len(running_clusters)
+
+        all_clusters_status = set(
+            [den_cluster.get("Status") for den_cluster in all_clusters]
+        )
+
+        # check that we get more than just the running clusters
+        assert len(all_clusters) > 1
+
+        assert ResourceServerStatus.running in all_clusters_status
+        assert ResourceServerStatus.terminated in all_clusters_status
+
+        current_cluster_info = [
+            den_cluster
+            for den_cluster in all_clusters
+            if den_cluster.get("Name") == cluster.name
+        ][0]
+        assert current_cluster_info.get("Status") == "terminated"
+
+    @pytest.mark.level("local")
+    @pytest.mark.clustertest
+    def test_cluster_list_status_filter_pythonic(self, cluster):
+
+        from runhouse.resources.hardware.utils import ResourceServerStatus
+
+        with friend_account_in_org():
+            org_name = None if "test_org_member" in cluster.rns_address else "test-org"
+            clusters = Cluster.list(status="running", org_name=org_name)
+        all_clusters = clusters.get("all_clusters", {})
+        running_clusters = clusters.get("running_clusters", {})
+        assert len(all_clusters) > 0
+        assert len(running_clusters) > 0
+        assert len(all_clusters) == len(running_clusters)
+
+        all_clusters_status = set(
+            [den_cluster.get("Status") for den_cluster in all_clusters]
+        )
+
+        supported_statuses_without_running = [
+            status
+            for status in list(ResourceServerStatus.__members__.keys())
+            if status != "running"
+        ]
+
+        for status in supported_statuses_without_running:
+            assert status not in all_clusters_status
+
+        assert ResourceServerStatus.running in all_clusters_status
+
+        # make sure we have at least one terminated cluster for the test (it doesn't matter that the status is mocked)
+        set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)
+
+        with friend_account_in_org():
+            org_name = None if "test_org_member" in cluster.rns_address else "test-org"
+            clusters = Cluster.list(status="terminated", org_name=org_name)
+        all_clusters = clusters.get("all_clusters", {})
+        running_clusters = clusters.get("running_clusters", {})
+        assert len(all_clusters) > 0
+        assert len(running_clusters) == 0
+
+        all_clusters_status = set(
+            [den_cluster.get("Status") for den_cluster in all_clusters]
+        )
+
+        assert "terminated" in all_clusters_status
+
+    @pytest.mark.level("local")
+    @pytest.mark.clustertest
+    def test_cluster_list_since_filter_pythonic(self, cluster):
+        cluster.save()  # the TLS-exposed local cluster is not saved by default
+        minutes_time_filter = 10
+
+        # make sure we have at least one terminated cluster for the test (it doesn't matter that the status is mocked)
+        set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)
+
+        with friend_account_in_org():
+            org_name = None if "test_org_member" in cluster.rns_address else "test-org"
+            clusters = Cluster.list(since=f"{minutes_time_filter}m", org_name=org_name)
+        all_clusters = clusters.get("all_clusters", {})
+        running_clusters = clusters.get("running_clusters", {})
+        assert len(running_clusters) >= 0
+        assert len(all_clusters) > 0
+
+        clusters_last_active_timestamps = set(
+            [den_cluster.get("Last Active (UTC)") for den_cluster in all_clusters]
+        )
+
+        assert len(clusters_last_active_timestamps) >= 1
+        current_utc_time = datetime.utcnow().replace(tzinfo=timezone.utc)
+
+        for timestamp in clusters_last_active_timestamps:
+            assert (
+                current_utc_time - timestamp
+            ).total_seconds() <= minutes_time_filter * MINUTE
+
+    @pytest.mark.level("local")
+    @pytest.mark.clustertest
+    def test_cluster_list_cmd_output_no_filters(self, capsys):
+        import re
+        import subprocess
+
+        env = os.environ.copy()
+        # Set the COLUMNS and LINES environment variables to control the terminal width and height,
+        # so that the full `runhouse cluster list` output is captured by the subprocess call.
+        env["COLUMNS"] = "250"
+        env["LINES"] = "40"  # Set a height value as well, though COLUMNS is the key setting here
+
+        process = subprocess.Popen(
+            "runhouse cluster list",
+            shell=True,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            env=env,
+        )
+        process.wait()
+        stdout, stderr = process.communicate()
+        capsys.readouterr()
+        cmd_stdout = stdout.decode("utf-8")
+
+        assert cmd_stdout
+
+        # The output is printed as a table.
+        # Check that the table title is printed correctly.
+        regex = rf".*Clusters for {rh.configs.username}.*\(Running: \d+/\d+, Total Displayed: \d+/\d+\).*"
+        assert re.search(regex, cmd_stdout)
+
+        # Check that the table column names are printed correctly.
+        col_names = ["┃ Name", "┃ Cluster Type", "┃ Status", "┃ Last Active (UTC)"]
+        for name in col_names:
+            assert name in cmd_stdout
+        assert (
+            "Note: the above clusters have registered activity in the last 24 hours."
+            in cmd_stdout
+        )
+
+    @pytest.mark.level("local")
+    @pytest.mark.clustertest
+    def test_cluster_list_cmd_output_with_filters(self, capsys, cluster):
+
+        import re
+        import subprocess
+
+        cluster.save()  # the TLS-exposed local cluster is not saved by default
+
+        # make sure we have at least one terminated cluster for the test (it doesn't matter that the status is mocked)
+        set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)
+
+        env = os.environ.copy()
+        # Set the COLUMNS and LINES environment variables to control the terminal width and height,
+        # so that the full `runhouse cluster list` output is captured by the subprocess call.
+        env["COLUMNS"] = "250"
+        env["LINES"] = "40"  # Set a height value as well, though COLUMNS is the key setting here
+
+        filter_by_org = (
+            "--org-name test-org" if "test-org" in cluster.rns_address else ""
+        )
+        process = subprocess.Popen(
+            f"runhouse cluster list --status terminated {filter_by_org}",
+            shell=True,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            env=env,
+        )
+        process.wait()
+        stdout, stderr = process.communicate()
+        capsys.readouterr()
+        cmd_stdout = stdout.decode("utf-8")
+        assert cmd_stdout
+
+        # The output is printed as a table.
+        # Check that the table title is printed correctly.
+
+        regex = rf".*Clusters for {rh.configs.username}.*\(Running: \d+/\d+, Total Displayed: \d+/\d+\).*"
+        assert re.search(regex, cmd_stdout)
+
+        # Check that the table column names are printed correctly.
+        displayed_info_names = [
+            "┃ Name",
+            "┃ Cluster Type",
+            "┃ Status",
+            "┃ Last Active (UTC)",
+        ]
+        for name in displayed_info_names:
+            assert name in cmd_stdout
+
+        assert (
+            "Note: the above clusters have registered activity in the last 24 hours."
+            not in cmd_stdout
+        )
+
+        # Remove 'Running', which appears in the table title,
+        # so we can check that no clusters with the status 'Running' are printed.
+        cmd_stdout = cmd_stdout.replace("Running", "")
+        assert "Terminated" in cmd_stdout
+
+        statuses = list(ResourceServerStatus.__members__.keys())
+        statuses.remove("terminated")
+
+        for status in statuses:
+            assert status not in cmd_stdout
diff --git a/tests/utils.py b/tests/utils.py
index bb9cecbe5..685065b56 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -1,16 +1,21 @@
 import contextlib
 import importlib
+import json
 import os
 import uuid
+
+from datetime import datetime
 from pathlib import Path
 
 import pytest
+import requests
 
 import runhouse as rh
 import yaml
 
 from runhouse.constants import TESTING_LOG_LEVEL
 from runhouse.globals import rns_client
+
+from runhouse.resources.hardware.utils import ResourceServerStatus
 from runhouse.servers.obj_store import get_cluster_servlet, ObjStore, RaySetupOption
 
 
@@ -132,3 +137,20 @@ def remove_config_keys(config, keys_to_skip):
     for key in keys_to_skip:
         config.pop(key, None)
     return config
+
+
+def set_cluster_status(cluster: rh.Cluster, status: ResourceServerStatus):
+    cluster_uri = rh.globals.rns_client.format_rns_address(cluster.rns_address)
+    headers = rh.globals.rns_client.request_headers()
+    api_server_url = rh.globals.rns_client.api_server_url
+
+    # Update the resource record in Den, since Cluster.list() reads the cluster's status from the resource collection.
+    status_data_resource = {
+        "status": status,
+        "status_last_checked": datetime.utcnow().isoformat(),
+    }
+    requests.put(
+        f"{api_server_url}/resource/{cluster_uri}",
+        data=json.dumps(status_data_resource),
+        headers=headers,
+    )
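Reviewer note (illustrative only, not part of the patch): a minimal sketch of how the new `org_name` argument and the list filters exercised by these tests are expected to be used, assuming a Den account with access to the "test-org" org referenced above:

    from runhouse.resources.hardware.cluster import Cluster

    # Default behavior: only the current user's clusters that are currently running.
    clusters = Cluster.list()

    # Clusters saved under an org folder instead of the user's folder, including
    # non-running ones (up to the Den limit asserted in the tests above).
    org_clusters = Cluster.list(org_name="test-org", show_all=True)
    for c in org_clusters.get("all_clusters", []):
        print(c.get("Name"), c.get("Status"), c.get("Last Active (UTC)"))

    # CLI invocation used by the CLI tests above:
    #   runhouse cluster list --status terminated --org-name test-org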