Skip to content

Commit

Permalink
cluster list tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandra Belousov authored and Alexandra Belousov committed Sep 11, 2024
1 parent 15b29f8 commit 0c38238
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 9 deletions.
23 changes: 16 additions & 7 deletions runhouse/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from runhouse.constants import (
BULLET_UNICODE,
DOUBLE_SPACE_UNICODE,
HOUR,
LAST_ACTIVE_AT_TIMEFRAME,
RAY_KILL_CMD,
RAY_START_CMD,
Expand Down Expand Up @@ -593,9 +594,9 @@ def status(
_print_status(cluster_status, current_cluster)


def _create_output_table(total_clusters: int, running_clusters: int):
table_title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {running_clusters}, Total: {total_clusters})[/bold cyan]"
table = Table(title=table_title)
def _create_output_table():

table = Table()

# Add columns to the table
table.add_column("Name", justify="left", no_wrap=True)
Expand Down Expand Up @@ -631,10 +632,14 @@ def _add_clusters_to_output_table(
rh_cluster["Last Active (UTC)"] = last_active_at_no_offset
rh_cluster["Status"] = get_status_color(rh_cluster.get("Status"))

# adding reset so the name will be printed nicely in the console.
rh_cluster["Name"] = f'[reset]{rh_cluster.get("Name")}'

if filters_requested:
table = _add_cluster_as_table_row(table, rh_cluster)
else:
# Default behaviour: Print Running clusters that were active it the last 24 hours
# Default behaviour: Print Running clusters that were active it the last LAST_ACTIVE_AT_TIMEFRAME timestamp.
# Currently, LAST_ACTIVE_AT_TIMEFRAME = 24 hours.
now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME:
table = _add_cluster_as_table_row(table, rh_cluster)
Expand Down Expand Up @@ -698,16 +703,19 @@ def cluster_list(
console.print("No existing clusters.")

# creating the clusters table
table = _create_output_table(
total_clusters=len(all_clusters), running_clusters=len(running_clusters)
)
table = _create_output_table()

filters_requested: bool = (
get_all_clusters or since or cluster_status or get_all_clusters
)

clusters_to_print = all_clusters if filters_requested else running_clusters

if not filters_requested:
logger.info(
f"runhouse cluster list will display clusters that were active in the last {LAST_ACTIVE_AT_TIMEFRAME/HOUR} hours."
)

if get_all_clusters:
# if user requesting all den cluster, limit print only to 50 clusters max.
clusters_to_print = clusters_to_print[: (min(len(clusters_to_print), 50))]
Expand All @@ -716,6 +724,7 @@ def cluster_list(
table=table, clusters=clusters_to_print, filters_requested=filters_requested
)

table.title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {len(running_clusters)}/{len(all_clusters)}, Total displayed: {len(all_clusters)})[/bold cyan]"
console.print(table)

_print_msg_live_sky_clusters(live_clusters_not_in_den=len(sky_live_clusters))
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2063,7 +2063,7 @@ def _get_running_and_not_running_clusters(clusters: list):

for den_cluster in clusters:
# get just name, not full rns address. reset is used so the name will be printed all in white.
cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}'
cluster_name = f'{den_cluster.get("name").split("/")[-1]}'
cluster_type = den_cluster.get("data").get("resource_subtype")
cluster_status = (
den_cluster.get("status") if den_cluster.get("status") else "unknown"
Expand Down
210 changes: 209 additions & 1 deletion tests/test_resources/test_clusters/test_cluster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json
import os
import subprocess
import time

from datetime import datetime, timezone
from threading import Thread

import pytest
Expand All @@ -12,9 +15,11 @@
DEFAULT_HTTP_PORT,
DEFAULT_HTTPS_PORT,
DEFAULT_SERVER_PORT,
HOUR,
LOCALHOST,
)

from runhouse.resources.hardware.cluster import Cluster
from runhouse.resources.hardware.utils import ResourceServerStatus

import tests.test_resources.test_resource
Expand Down Expand Up @@ -660,7 +665,6 @@ def test_rh_status_stopped(self, cluster):
@pytest.mark.level("local")
@pytest.mark.clustertest
def test_send_status_to_db(self, cluster):
import json

status = cluster.status()
env_servlet_processes = status.pop("env_servlet_processes")
Expand Down Expand Up @@ -931,3 +935,207 @@ def test_switch_default_env(self, cluster):
# set it back
cluster.default_env = test_env
cluster.delete(new_env.name)

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_pydentic(self, cluster):
all_clusters, running_clusters, _ = Cluster.list()
assert len(all_clusters) > 0
assert len(running_clusters) > 0

all_clusters_names = [den_cluster.get("Name") for den_cluster in all_clusters]
running_clusters_names = [
running_cluster.get("Name") for running_cluster in running_clusters
]

assert cluster.name in all_clusters_names
assert cluster.name in running_clusters_names

def set_cluster_status(self, cluster: rh.Cluster, status: ResourceServerStatus):
cluster_uri = rh.globals.rns_client.format_rns_address(cluster.rns_address)
headers = rh.globals.rns_client.request_headers()
api_server_url = rh.globals.rns_client.api_server_url

# updating the resource collection as well, because the cluster.list() gets the info from the resource
status_data_resource = {
"status": status,
"status_last_checked": datetime.utcnow(),
}
requests.put(
f"{api_server_url}/resource/{cluster_uri}",
data=json.dumps(status_data_resource),
headers=headers,
)

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_all_pydentic(self, cluster):

# make sure that we at least one terminated cluster for the tests, (does not matter if the status is mocked)
self.set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)

all_clusters, running_clusters, _ = Cluster.list(get_all_clusters=True)
assert len(all_clusters) > 0
assert len(running_clusters) > 0
assert len(all_clusters) >= len(running_clusters)

all_clusters_names = [den_cluster.get("Name") for den_cluster in all_clusters]

all_clusters_status = set(
[den_cluster.get("Status") for den_cluster in all_clusters]
)

assert len(all_clusters) > 1
assert ResourceServerStatus.running in all_clusters_status
assert ResourceServerStatus.terminated in all_clusters_status

running_clusters_names = [
running_cluster.get("Name") for running_cluster in running_clusters
]

assert cluster.name in all_clusters_names
assert cluster.name in running_clusters_names

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_status_filter_pydentic(self, cluster):

from runhouse.resources.hardware.utils import ResourceServerStatus

all_clusters, running_clusters, _ = Cluster.list(status="running")
assert len(all_clusters) > 0
assert len(running_clusters) > 0
assert len(all_clusters) == len(running_clusters)

all_clusters_status = set(
[den_cluster.get("Status") for den_cluster in all_clusters]
)

supported_statuses_without_running = [
status
for status in list(ResourceServerStatus.__members__.keys())
if status != "running"
]

for status in supported_statuses_without_running:
assert status not in all_clusters_status

assert ResourceServerStatus.running in all_clusters_status

# make sure that we at least one terminated cluster for the tests, (does not matter if the status is mocked)
self.set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)

all_clusters, running_clusters, _ = Cluster.list(status="terminated")
assert len(all_clusters) > 0
assert len(running_clusters) == 0

all_clusters_status = set(
[den_cluster.get("Status") for den_cluster in all_clusters]
)

assert "terminated" in all_clusters_status

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_since_filter_pydentic(self, cluster):

hours_time_filter = 12

# make sure that we at least one terminated cluster for the tests, (does not matter if the status is mocked)
self.set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)

all_clusters, running_clusters, _ = Cluster.list(since=f"{hours_time_filter}h")
assert len(all_clusters) > 0
assert len(running_clusters) >= 0

clusters_last_active_timestamps = set(
[den_cluster.get("Last Active (UTC)") for den_cluster in all_clusters]
)

assert len(clusters_last_active_timestamps) >= 1
current_utc_time = datetime.utcnow().replace(tzinfo=timezone.utc)

for timestamp in clusters_last_active_timestamps:
assert (
current_utc_time - timestamp
).total_seconds() <= hours_time_filter * HOUR

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_cmd_output_no_filters(self):
import re
import subprocess

process = subprocess.Popen(
["runhouse", "cluster", "list"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)

process.wait()

cmd_stdout = "\n".join([line.strip() for line in process.stdout])
assert cmd_stdout

# The output is printed as a table.
# testing that the table name is printed correctly
regex = f".*{rh.configs.username}'s Clusters \(Running: .*/.*, Total displayed: .*\).*"
assert re.search(regex, cmd_stdout)

# testing that the table column names is printed correctly
col_names = ["┃ Name ", "┃ Cluster Type", "┃ Status", "┃ Last Active (UTC)"]
for name in col_names:
assert name in cmd_stdout
assert (
"runhouse cluster list will display clusters that were active in the last 24.0 hours."
in cmd_stdout
)

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_cmd_output_with_filters(self):
import re
import subprocess

process = subprocess.Popen(
["runhouse", "cluster", "list", "--status", "terminated"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)

process.wait()

# The output is printed as a table.
# testing that the table name is printed correctly
cmd_stdout = "\n".join([line.strip() for line in process.stdout])
assert cmd_stdout
regex = f".*{rh.configs.username}'s Clusters \(Running: 0/.*, Total displayed: .*\).*"
assert re.search(regex, cmd_stdout)

# testing that the table column names is printed correctly
displayed_info_names = [
"┃ Name ",
"┃ Cluster Type",
"┃ Status",
"┃ Last Active (UTC)",
]
for name in displayed_info_names:
assert name in cmd_stdout

assert (
"runhouse cluster list will display clusters that were active in the last 24.0 hours."
not in cmd_stdout
)

# Removing 'Running' which appearing in the title of the output,
# so we could test the no clusters with status 'Running' is printed
cmd_stdout = cmd_stdout.replace("Running", "")
assert "Terminated" in cmd_stdout

statues = list(ResourceServerStatus.__members__.keys())
statues.remove("terminated")

for status in statues:
assert status not in cmd_stdout

0 comments on commit 0c38238

Please sign in to comment.