Skip to content

Commit

Permalink
cluster list tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandra Belousov authored and Alexandra Belousov committed Sep 18, 2024
1 parent edf7c85 commit d118596
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 3 deletions.
5 changes: 4 additions & 1 deletion runhouse/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ def _create_output_table(
table = Table(title=table_title)

if not filters_requested:
table.caption = f"[reset]Showing clusters that were active in the last {LAST_ACTIVE_AT_TIMEFRAME / HOUR} hours."
table.caption = f"[reset]Note: the above clusters have registered activity in the last {int(LAST_ACTIVE_AT_TIMEFRAME / HOUR)} hours."
table.caption_justify = "left"

# Add columns to the table
Expand Down Expand Up @@ -663,6 +663,9 @@ def _add_clusters_to_output_table(

table = _add_cluster_as_table_row(table, rh_cluster)

# adding reset so the name will be printed nicely in the console.
rh_cluster["Name"] = f'[reset]{rh_cluster.get("Name")}'


def _print_msg_live_sky_clusters(sky_clusters: int):
if sky_clusters > 0:
Expand Down
2 changes: 1 addition & 1 deletion runhouse/resources/hardware/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1988,7 +1988,7 @@ def _get_running_and_not_running_clusters(clusters: list):

for den_cluster in clusters:
# get just name, not full rns address. reset is used so the name will be printed all in white.
cluster_name = f'[reset]{den_cluster.get("name").split("/")[-1]}'
cluster_name = f'{den_cluster.get("name").split("/")[-1]}'
cluster_type = den_cluster.get("data").get("resource_subtype")
cluster_status = (
den_cluster.get("status") if den_cluster.get("status") else "unknown"
Expand Down
250 changes: 249 additions & 1 deletion tests/test_resources/test_clusters/test_cluster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json
import os
import subprocess
import time

from datetime import datetime, timezone
from threading import Thread

import pytest
Expand All @@ -13,8 +16,10 @@
DEFAULT_HTTPS_PORT,
DEFAULT_SERVER_PORT,
LOCALHOST,
MINUTE,
)

from runhouse.resources.hardware.cluster import Cluster
from runhouse.resources.hardware.utils import ResourceServerStatus

import tests.test_resources.test_resource
Expand All @@ -25,6 +30,7 @@
friend_account_in_org,
get_random_str,
remove_config_keys,
set_cluster_status,
)

""" TODO:
Expand Down Expand Up @@ -665,7 +671,6 @@ def test_rh_status_stopped(self, cluster):
@pytest.mark.level("local")
@pytest.mark.clustertest
def test_send_status_to_db(self, cluster):
import json

status = cluster.status()
env_servlet_processes = status.pop("env_servlet_processes")
Expand Down Expand Up @@ -936,3 +941,246 @@ def test_switch_default_env(self, cluster):
# set it back
cluster.default_env = test_env
cluster.delete(new_env.name)

####################################################################################################
# Cluster list test
####################################################################################################

# when we pass a cluster fixture

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_pythonic(self, cluster):
    """Cluster.list() with no filters returns only running clusters, ours included."""
    with friend_account_in_org():
        cluster.save()

        listed = Cluster.list()
        every_cluster = listed.get("all_clusters", {})
        active_clusters = listed.get("running_clusters", {})

        assert len(every_cluster) > 0
        assert len(active_clusters) > 0
        # By default the listing is restricted to running clusters only.
        assert len(active_clusters) == len(every_cluster)

        names_in_all = [entry.get("Name") for entry in every_cluster]
        names_running = [entry.get("Name") for entry in active_clusters]
        assert cluster.name in names_running
        assert cluster.name in names_in_all

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_all_pythonic(self, cluster):
    """Cluster.list(show_all=True) includes non-running clusters in the output."""
    with friend_account_in_org():
        # Guarantee at least one terminated cluster exists for the test
        # (the status is mocked; actual cluster state does not matter).
        set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)

        listed = Cluster.list(show_all=True)
        every_cluster = listed.get("all_clusters", {})
        active_clusters = listed.get("running_clusters", {})

        assert 0 <= len(every_cluster) <= 200  # den limit
        assert len(every_cluster) >= len(active_clusters)

        statuses_seen = set(
            [entry.get("Status") for entry in every_cluster]
        )

        # More than just the running clusters should be present.
        assert len(every_cluster) > 1

        assert ResourceServerStatus.running in statuses_seen
        assert ResourceServerStatus.terminated in statuses_seen

        our_entry = [
            entry for entry in every_cluster if entry.get("Name") == cluster.name
        ][0]
        assert our_entry.get("Status") == "terminated"

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_status_filter_pythonic(self, cluster):
    """Cluster.list(status=...) returns only clusters with the requested status.

    Checks both the "running" filter (all results running, no other status
    present) and the "terminated" filter (after mocking our cluster's status).
    """
    # NOTE: ResourceServerStatus is imported at module level; the redundant
    # function-local re-import was removed.
    with friend_account_in_org():

        clusters = Cluster.list(status="running")
        all_clusters = clusters.get("all_clusters", {})
        running_clusters = clusters.get("running_clusters", {})
        assert len(all_clusters) > 0
        assert len(running_clusters) > 0
        # A "running" filter should make both collections identical in size.
        assert len(all_clusters) == len(running_clusters)

        all_clusters_status = set(
            [den_cluster.get("Status") for den_cluster in all_clusters]
        )

        # Every supported status other than "running" must be absent.
        supported_statuses_without_running = [
            status
            for status in list(ResourceServerStatus.__members__.keys())
            if status != "running"
        ]

        for status in supported_statuses_without_running:
            assert status not in all_clusters_status

        assert ResourceServerStatus.running in all_clusters_status

        # Make sure we have at least one terminated cluster for the test
        # (the status is mocked; actual cluster state does not matter).
        set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)

        clusters = Cluster.list(status="terminated")
        all_clusters = clusters.get("all_clusters", {})
        running_clusters = clusters.get("running_clusters", {})
        assert len(all_clusters) > 0
        # A "terminated" filter must yield no running clusters.
        assert len(running_clusters) == 0

        all_clusters_status = set(
            [den_cluster.get("Status") for den_cluster in all_clusters]
        )

        assert "terminated" in all_clusters_status

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_since_filter_pythonic(self, cluster):
    """Cluster.list(since=...) only returns clusters active within the window."""
    with friend_account_in_org():
        cluster.save()  # tls exposed local cluster is not saved by default

        minutes_time_filter = 10

        # Make sure we have at least one terminated cluster for the test
        # (the status is mocked; actual cluster state does not matter).
        set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)

        clusters = Cluster.list(since=f"{minutes_time_filter}m")
        all_clusters = clusters.get("all_clusters", {})
        running_clusters = clusters.get("running_clusters", {})
        assert len(running_clusters) >= 0
        assert len(all_clusters) > 0

        clusters_last_active_timestamps = set(
            [den_cluster.get("Last Active (UTC)") for den_cluster in all_clusters]
        )

        assert len(clusters_last_active_timestamps) >= 1
        # datetime.now(timezone.utc) replaces the deprecated
        # utcnow().replace(tzinfo=timezone.utc) pattern; the value is identical.
        current_utc_time = datetime.now(timezone.utc)

        # Every listed cluster's last-active time must fall inside the window.
        # NOTE(review): assumes "Last Active (UTC)" entries are tz-aware
        # datetimes — subtraction would raise on naive ones.
        for timestamp in clusters_last_active_timestamps:
            assert (
                current_utc_time - timestamp
            ).total_seconds() <= minutes_time_filter * MINUTE

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_cmd_output_no_filters(self, capsys):
    """`runhouse cluster list` CLI prints a titled table with the expected columns.

    Runs the CLI in a subprocess with a fixed terminal geometry so the rich
    table is rendered at a predictable width.
    """
    import re

    env = os.environ.copy()
    # Set COLUMNS and LINES to control terminal width and height, so the
    # `runhouse cluster list` output is rendered fully (not truncated).
    env["COLUMNS"] = "250"
    env["LINES"] = "40"  # COLUMNS is the key one; LINES just needs a sane value

    # subprocess.run with capture_output avoids the wait()-before-communicate()
    # deadlock the previous Popen pattern risked when the child filled the
    # pipe buffer. (subprocess is imported at module level.)
    result = subprocess.run(
        "runhouse cluster list",
        shell=True,
        capture_output=True,
        env=env,
    )
    capsys.readouterr()
    cmd_stdout = result.stdout.decode("utf-8")

    assert cmd_stdout

    # The output is printed as a table.
    # Testing that the table title is printed correctly (raw f-string so the
    # escaped parens are a valid regex, not an invalid string escape).
    regex = (
        rf".*Clusters for {rh.configs.username} "
        rf"\(Running: .*/.*, Total Displayed: .*/.*\).*"
    )
    assert re.search(regex, cmd_stdout)

    # Testing that the table column names are printed correctly.
    col_names = ["┃ Name", "┃ Cluster Type", "┃ Status", "┃ Last Active (UTC)"]
    for name in col_names:
        assert name in cmd_stdout
    assert (
        "Note: the above clusters have registered activity in the last 24 hours."
        in cmd_stdout
    )

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_cmd_output_with_filters(self, capsys, cluster):
    """`runhouse cluster list --status terminated` prints only terminated clusters."""
    import re

    with friend_account_in_org():
        cluster.save()  # tls exposed local cluster is not saved by default

        # Make sure we have at least one terminated cluster for the test
        # (the status is mocked; actual cluster state does not matter).
        set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)

        env = os.environ.copy()
        # Set COLUMNS and LINES to control terminal width and height, so the
        # `runhouse cluster list` output is rendered fully (not truncated).
        env["COLUMNS"] = "250"
        env["LINES"] = "40"  # COLUMNS is the key one; LINES just needs a sane value

        # subprocess.run with capture_output avoids the wait()-before-
        # communicate() deadlock the previous Popen pattern risked when the
        # child filled the pipe buffer. (subprocess is imported at module level.)
        result = subprocess.run(
            "runhouse cluster list --status terminated",
            shell=True,
            capture_output=True,
            env=env,
        )
        capsys.readouterr()
        cmd_stdout = result.stdout.decode("utf-8")
        assert cmd_stdout

        # The output is printed as a table.
        # Testing that the table title is printed correctly (raw f-string so
        # the escaped parens are a valid regex, not an invalid string escape).
        regex = (
            rf".*Clusters for {rh.configs.username} "
            rf"\(Running: .*/.*, Total Displayed: .*/.*\).*"
        )
        assert re.search(regex, cmd_stdout)

        # Testing that the table column names are printed correctly.
        displayed_info_names = [
            "┃ Name",
            "┃ Cluster Type",
            "┃ Status",
            "┃ Last Active (UTC)",
        ]
        for name in displayed_info_names:
            assert name in cmd_stdout

        # The "last 24 hours" caption is only shown for unfiltered listings.
        assert (
            "Note: the above clusters have registered activity in the last 24 hours."
            not in cmd_stdout
        )

        # Removing 'Running' which appears in the title of the output,
        # so we can test that no clusters with status 'Running' are printed.
        cmd_stdout = cmd_stdout.replace("Running", "")
        assert "Terminated" in cmd_stdout

        statuses = list(ResourceServerStatus.__members__.keys())
        statuses.remove("terminated")

        # No status other than "terminated" should appear in the table body.
        for status in statuses:
            assert status not in cmd_stdout
22 changes: 22 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import contextlib
import importlib
import json
import os
import uuid
from datetime import datetime
from pathlib import Path

import pytest
import requests

import runhouse as rh
import yaml
from runhouse.constants import TESTING_LOG_LEVEL

from runhouse.globals import rns_client

from runhouse.resources.hardware.utils import ResourceServerStatus
from runhouse.servers.obj_store import get_cluster_servlet, ObjStore, RaySetupOption


Expand Down Expand Up @@ -132,3 +137,20 @@ def remove_config_keys(config, keys_to_skip):
for key in keys_to_skip:
config.pop(key, None)
return config


def set_cluster_status(cluster: rh.Cluster, status: ResourceServerStatus):
    """Mock a cluster's status in Den by PUT-ing an updated resource record.

    Used by tests that need a cluster with a particular status (e.g.
    ``terminated``) without changing the actual cluster's state.

    Args:
        cluster: The cluster whose Den resource record should be updated.
        status: The status value to store.

    Raises:
        requests.HTTPError: If the API server rejects the update.
    """
    cluster_uri = rh.globals.rns_client.format_rns_address(cluster.rns_address)
    headers = rh.globals.rns_client.request_headers()
    api_server_url = rh.globals.rns_client.api_server_url

    # Updating the resource collection as well, because cluster.list() gets
    # the info from the resource record.
    # NOTE(review): keeping datetime.utcnow() (naive ISO string) on purpose —
    # switching to an aware datetime would change the serialized format the
    # server receives; confirm server expectations before modernizing.
    status_data_resource = {
        "status": status,
        "status_last_checked": datetime.utcnow().isoformat(),
    }
    resp = requests.put(
        f"{api_server_url}/resource/{cluster_uri}",
        data=json.dumps(status_data_resource),
        headers=headers,
        timeout=60,  # fail fast instead of hanging the whole test run
    )
    # Surface failures immediately — a silently ignored error here would make
    # dependent tests fail later with confusing assertions.
    resp.raise_for_status()

0 comments on commit d118596

Please sign in to comment.