Skip to content

Commit

Permalink
cluster list tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandra Belousov authored and Alexandra Belousov committed Sep 12, 2024
1 parent e3195ba commit c91f406
Show file tree
Hide file tree
Showing 3 changed files with 285 additions and 9 deletions.
23 changes: 16 additions & 7 deletions runhouse/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from runhouse.constants import (
BULLET_UNICODE,
DOUBLE_SPACE_UNICODE,
HOUR,
LAST_ACTIVE_AT_TIMEFRAME,
RAY_KILL_CMD,
RAY_START_CMD,
Expand Down Expand Up @@ -593,9 +594,9 @@ def status(
_print_status(cluster_status, current_cluster)


def _create_output_table(total_clusters: int, running_clusters: int):
table_title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {running_clusters}, Total: {total_clusters})[/bold cyan]"
table = Table(title=table_title)
def _create_output_table():

table = Table()

# Add columns to the table
table.add_column("Name", justify="left", no_wrap=True)
Expand Down Expand Up @@ -631,10 +632,14 @@ def _add_clusters_to_output_table(
rh_cluster["Last Active (UTC)"] = last_active_at_no_offset
rh_cluster["Status"] = get_status_color(rh_cluster.get("Status"))

# adding reset so the name will be printed nicely in the console.
rh_cluster["Name"] = f'[reset]{rh_cluster.get("Name")}'

if filters_requested:
table = _add_cluster_as_table_row(table, rh_cluster)
else:
# Default behaviour: Print Running clusters that were active it the last 24 hours
# Default behaviour: Print Running clusters that were active in the last LAST_ACTIVE_AT_TIMEFRAME timeframe.
# Currently, LAST_ACTIVE_AT_TIMEFRAME = 24 hours.
now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME:
table = _add_cluster_as_table_row(table, rh_cluster)
Expand Down Expand Up @@ -698,16 +703,19 @@ def cluster_list(
console.print("No existing clusters.")

# creating the clusters table
table = _create_output_table(
total_clusters=len(all_clusters), running_clusters=len(running_clusters)
)
table = _create_output_table()

filters_requested: bool = (
get_all_clusters or since or cluster_status or get_all_clusters
)

clusters_to_print = all_clusters if filters_requested else running_clusters

if not filters_requested:
logger.info(
f"runhouse cluster list will display clusters that were active in the last {LAST_ACTIVE_AT_TIMEFRAME/HOUR} hours."
)

if get_all_clusters:
# if the user requests all Den clusters, limit the printout to 50 clusters max.
clusters_to_print = clusters_to_print[: (min(len(clusters_to_print), 50))]
Expand All @@ -716,6 +724,7 @@ def cluster_list(
table=table, clusters=clusters_to_print, filters_requested=filters_requested
)

table.title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {len(running_clusters)}/{len(all_clusters)}, Total displayed: {len(all_clusters)})[/bold cyan]"
console.print(table)

_print_msg_live_sky_clusters(live_clusters_not_in_den=len(sky_live_clusters))
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2068,7 +2068,7 @@ def _get_running_and_not_running_clusters(clusters: list):

for den_cluster in clusters:
# get just name, not full rns address. reset is used so the name will be printed all in white.
cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}'
cluster_name = f'{den_cluster.get("name").split("/")[-1]}'
cluster_type = den_cluster.get("data").get("resource_subtype")
cluster_status = (
den_cluster.get("status") if den_cluster.get("status") else "unknown"
Expand Down
269 changes: 268 additions & 1 deletion tests/test_resources/test_clusters/test_cluster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json
import os
import subprocess
import time

from datetime import datetime, timezone
from threading import Thread

import pytest
Expand All @@ -13,8 +16,10 @@
DEFAULT_HTTPS_PORT,
DEFAULT_SERVER_PORT,
LOCALHOST,
MINUTE,
)

from runhouse.resources.hardware.cluster import Cluster
from runhouse.resources.hardware.utils import ResourceServerStatus

import tests.test_resources.test_resource
Expand Down Expand Up @@ -660,7 +665,6 @@ def test_rh_status_stopped(self, cluster):
@pytest.mark.level("local")
@pytest.mark.clustertest
def test_send_status_to_db(self, cluster):
import json

status = cluster.status()
env_servlet_processes = status.pop("env_servlet_processes")
Expand Down Expand Up @@ -931,3 +935,266 @@ def test_switch_default_env(self, cluster):
# set it back
cluster.default_env = test_env
cluster.delete(new_env.name)

####################################################################################################
# Cluster list test
####################################################################################################

# when we pass a cluster fixture

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_pydentic(self, cluster):

# on-demand clusters and the tls_exposed cluster are not created as a part of test-org,
# therefore we can check if those clusters are displayed on the users cluster list
if isinstance(cluster, rh.OnDemandCluster) or "tls_exposed" in cluster.name:

all_clusters, running_clusters, _ = Cluster.list()
assert len(all_clusters) > 0
assert len(running_clusters) > 0
assert len(running_clusters) == len(
all_clusters
) # by default we get only running clusters

all_clusters_names = [
den_cluster.get("Name") for den_cluster in all_clusters
]
running_clusters_names = [
running_cluster.get("Name") for running_cluster in running_clusters
]
assert cluster.name in all_clusters_names
assert cluster.name in running_clusters_names

def set_cluster_status(self, cluster: rh.Cluster, status: ResourceServerStatus):
cluster_uri = rh.globals.rns_client.format_rns_address(cluster.rns_address)
headers = rh.globals.rns_client.request_headers()
api_server_url = rh.globals.rns_client.api_server_url

# updating the resource collection as well, because the cluster.list() gets the info from the resource
status_data_resource = {
"status": status,
"status_last_checked": datetime.utcnow().isoformat(),
}
requests.put(
f"{api_server_url}/resource/{cluster_uri}",
data=json.dumps(status_data_resource),
headers=headers,
)

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_all_pydentic(self, cluster):

# on-demand clusters and the tls_exposed cluster are not created as a part of test-org,
# therefore we can check if those clusters are displayed on the users cluster list
if isinstance(cluster, rh.OnDemandCluster) or "tls_exposed" in cluster.name:

# make sure that we at least one terminated cluster for the tests, (does not matter if the status is mocked)
self.set_cluster_status(
cluster=cluster, status=ResourceServerStatus.terminated
)

all_clusters, running_clusters, _ = Cluster.list(get_all_clusters=True)
assert len(all_clusters) == 200 # den limit
assert len(all_clusters) >= len(running_clusters)

all_clusters_status = set(
[den_cluster.get("Status") for den_cluster in all_clusters]
)

# testing that we don't get just running clusters
assert len(all_clusters) > 1

assert ResourceServerStatus.running in all_clusters_status
assert ResourceServerStatus.terminated in all_clusters_status

current_cluster_info = [
cluster
for cluster in all_clusters
if cluster.get("Name") == cluster.name
][0]
assert current_cluster_info.get("Status") == "terminated"

else:
pytest.skip("can preform this test on test-org clusters")

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_status_filter_pydentic(self, cluster):

from runhouse.resources.hardware.utils import ResourceServerStatus

# on-demand clusters and the tls_exposed cluster are not created as a part of test-org,
# therefore we can check if those clusters are displayed on the users cluster list
if isinstance(cluster, rh.OnDemandCluster) or "tls_exposed" in cluster.name:

all_clusters, running_clusters, _ = Cluster.list(status="running")
assert len(all_clusters) > 0
assert len(running_clusters) > 0
assert len(all_clusters) == len(running_clusters)

all_clusters_status = set(
[den_cluster.get("Status") for den_cluster in all_clusters]
)

supported_statuses_without_running = [
status
for status in list(ResourceServerStatus.__members__.keys())
if status != "running"
]

for status in supported_statuses_without_running:
assert status not in all_clusters_status

assert ResourceServerStatus.running in all_clusters_status

# make sure that we at least one terminated cluster for the tests, (does not matter if the status is mocked)
self.set_cluster_status(
cluster=cluster, status=ResourceServerStatus.terminated
)

all_clusters, running_clusters, _ = Cluster.list(status="terminated")
assert len(all_clusters) > 0
assert len(running_clusters) == 0

all_clusters_status = set(
[den_cluster.get("Status") for den_cluster in all_clusters]
)

assert "terminated" in all_clusters_status

else:
pytest.skip("can preform this test on test-org clusters")

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_since_filter_pydentic(self, cluster):

# on-demand clusters and the tls_exposed cluster are not created as a part of test-org,
# therefore we can check if those clusters are displayed on the users cluster list
if isinstance(cluster, rh.OnDemandCluster) or "tls_exposed" in cluster.name:
minutes_time_filter = 10

# make sure that we at least one terminated cluster for the tests, (does not matter if the status is mocked)
self.set_cluster_status(
cluster=cluster, status=ResourceServerStatus.terminated
)

all_clusters, running_clusters, _ = Cluster.list(
since=f"{minutes_time_filter}m"
)
assert len(all_clusters) > 0
assert len(running_clusters) >= 0

clusters_last_active_timestamps = set(
[den_cluster.get("Last Active (UTC)") for den_cluster in all_clusters]
)

assert len(clusters_last_active_timestamps) >= 1
current_utc_time = datetime.utcnow().replace(tzinfo=timezone.utc)

for timestamp in clusters_last_active_timestamps:
assert (
current_utc_time - timestamp
).total_seconds() <= minutes_time_filter * MINUTE

else:
pytest.skip("can preform this test on test-org clusters")

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_cmd_output_no_filters(self, capsys):
import re
import subprocess

env = os.environ.copy()
# Set the COLUMNS and LINES environment variables to control terminal width and height,
# so we could get the runhouse cluster list output properly using subprocess
env["COLUMNS"] = "250"
env["LINES"] = "40" # Set a height value, though COLUMNS is the key one here

process = subprocess.Popen(
"runhouse cluster list",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
)
process.wait()
stdout, stderr = process.communicate()
capsys.readouterr()
cmd_stdout = stdout.decode("utf-8")

assert cmd_stdout

# The output is printed as a table.
# testing that the table name is printed correctly
regex = f".*{rh.configs.username}'s Clusters \(Running: .*/.*, Total\s*displayed: .*\).*"
assert re.search(regex, cmd_stdout)

# testing that the table column names is printed correctly
col_names = ["┃ Name", "┃ Cluster Type", "┃ Status", "┃ Last Active (UTC)"]
for name in col_names:
assert name in cmd_stdout
assert (
"runhouse cluster list will display clusters that were active in the last 24.0 hours."
in cmd_stdout
)

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_cmd_output_with_filters(self, capsys):
import re
import subprocess

env = os.environ.copy()
# Set the COLUMNS and LINES environment variables to control terminal width and height,
# so we could get the runhouse cluster list output properly using subprocess
env["COLUMNS"] = "250"
env["LINES"] = "40" # Set a height value, though COLUMNS is the key one here

process = subprocess.Popen(
"runhouse cluster list --status terminated",
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
)
process.wait()
stdout, stderr = process.communicate()
capsys.readouterr()
cmd_stdout = stdout.decode("utf-8")
assert cmd_stdout

# The output is printed as a table.
# testing that the table name is printed correctly

regex = f".*{rh.configs.username}'s Clusters \(Running: .*/.*, Total\s*displayed: .*\).*"
assert re.search(regex, cmd_stdout)

# testing that the table column names is printed correctly
displayed_info_names = [
"┃ Name",
"┃ Cluster Type",
"┃ Status",
"┃ Last Active (UTC)",
]
for name in displayed_info_names:
assert name in cmd_stdout

assert (
"runhouse cluster list will display clusters that were active in the last 24.0 hours."
not in cmd_stdout
)

# Removing 'Running' which appearing in the title of the output,
# so we could test the no clusters with status 'Running' is printed
cmd_stdout = cmd_stdout.replace("Running", "")
assert "Terminated" in cmd_stdout

statues = list(ResourceServerStatus.__members__.keys())
statues.remove("terminated")

for status in statues:
assert status not in cmd_stdout

0 comments on commit c91f406

Please sign in to comment.