Skip to content

Commit

Permalink
cluster list tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandra Belousov authored and Alexandra Belousov committed Sep 11, 2024
1 parent 4e7363b commit acc9724
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 12 deletions.
20 changes: 13 additions & 7 deletions runhouse/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from runhouse.constants import (
BULLET_UNICODE,
DOUBLE_SPACE_UNICODE,
HOUR,
LAST_ACTIVE_AT_TIMEFRAME,
RAY_KILL_CMD,
RAY_START_CMD,
Expand Down Expand Up @@ -594,9 +595,9 @@ def status(
_print_status(cluster_status, current_cluster)


def _create_output_table(total_clusters: int, running_clusters: int):
table_title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {running_clusters}, Total: {total_clusters})[/bold cyan]"
table = Table(title=table_title)
def _create_output_table():

table = Table()

# Add columns to the table
table.add_column("Name", justify="left", no_wrap=True)
Expand Down Expand Up @@ -635,7 +636,8 @@ def _add_clusters_to_output_table(
if filters_requested:
table = _add_cluster_as_table_row(table, rh_cluster)
else:
# Default behaviour: Print Running clusters that were active it the last 24 hours
# Default behaviour: Print Running clusters that were active in the last LAST_ACTIVE_AT_TIMEFRAME timeframe.
# Currently, LAST_ACTIVE_AT_TIMEFRAME = 24 hours.
now = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc)
if (now - last_active_at).total_seconds() <= LAST_ACTIVE_AT_TIMEFRAME:
table = _add_cluster_as_table_row(table, rh_cluster)
Expand Down Expand Up @@ -711,9 +713,7 @@ def cluster_list(
console.print("No existing clusters.")

# creating the clusters table
table = _create_output_table(
total_clusters=len(all_clusters), running_clusters=len(running_clusters)
)
table = _create_output_table()

filters_requested: bool = (
get_all_clusters
Expand All @@ -725,6 +725,11 @@ def cluster_list(

clusters_to_print = all_clusters if filters_requested else running_clusters

if not filters_requested:
logger.info(
f"runhouse cluster list will display clusters that were active in the last {LAST_ACTIVE_AT_TIMEFRAME/HOUR} hours."
)

if get_all_clusters:
# if user requesting all den cluster, limit print only to 50 clusters max.
clusters_to_print = clusters_to_print[: (min(len(clusters_to_print), 50))]
Expand All @@ -733,6 +738,7 @@ def cluster_list(
table=table, clusters=clusters_to_print, filters_requested=filters_requested
)

table.title = f"[bold cyan]{rns_client.username}'s Clusters (Running: {table.row_count}/{len(running_clusters)}, Total: {len(all_clusters)})[/bold cyan]"
console.print(table)

_print_msg_live_sky_clusters(live_clusters_not_in_den=len(sky_live_clusters))
Expand Down
171 changes: 166 additions & 5 deletions tests/test_resources/test_clusters/test_cluster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json
import os
import subprocess
import time

from datetime import datetime, timezone
from threading import Thread

import pytest
Expand All @@ -12,9 +15,11 @@
DEFAULT_HTTP_PORT,
DEFAULT_HTTPS_PORT,
DEFAULT_SERVER_PORT,
HOUR,
LOCALHOST,
)

from runhouse.resources.hardware.cluster import Cluster
from runhouse.resources.hardware.utils import ResourceServerStatus

import tests.test_resources.test_resource
Expand Down Expand Up @@ -106,10 +111,10 @@ class TestCluster(tests.test_resources.test_resource.TestResource):
LOCAL = {
"cluster": [
"docker_cluster_pk_ssh_no_auth", # Represents private dev use case
"docker_cluster_pk_ssh_den_auth", # Helps isolate Auth issues
"docker_cluster_pk_tls_den_auth", # Represents public app use case
"docker_cluster_pk_http_exposed", # Represents within VPC use case
"docker_cluster_pwd_ssh_no_auth",
# "docker_cluster_pk_ssh_den_auth", # Helps isolate Auth issues
# "docker_cluster_pk_tls_den_auth", # Represents public app use case
# "docker_cluster_pk_http_exposed", # Represents within VPC use case
# "docker_cluster_pwd_ssh_no_auth",
],
}
MINIMAL = {"cluster": ["static_cpu_pwd_cluster"]}
Expand Down Expand Up @@ -660,7 +665,6 @@ def test_rh_status_stopped(self, cluster):
@pytest.mark.level("local")
@pytest.mark.clustertest
def test_send_status_to_db(self, cluster):
import json

status = cluster.status()
env_servlet_processes = status.pop("env_servlet_processes")
Expand Down Expand Up @@ -931,3 +935,160 @@ def test_switch_default_env(self, cluster):
# set it back
cluster.default_env = test_env
cluster.delete(new_env.name)

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_pydentic(self, cluster):
    """Cluster.list() should include the current cluster in both the full and running lists."""
    all_clusters, running_clusters, _ = Cluster.list()
    assert all_clusters
    assert running_clusters

    username = rh.configs.username
    names_of_all = [f'/{username}/{c.get("name")}' for c in all_clusters]
    names_of_running = [f'/{username}/{c.get("name")}' for c in running_clusters]

    assert cluster.name in names_of_all
    assert cluster.name in names_of_running

def set_cluster_status(self, cluster: rh.Cluster, status: ResourceServerStatus):
    """Post a (possibly mocked) *status* for *cluster* to Den, so list() filters can be exercised."""
    info = cluster.status()
    servlet_processes = info.pop("env_servlet_processes")
    payload = {
        "status": status,
        "resource_type": info.get("cluster_config").get("resource_type"),
        "resource_info": info,
        "env_servlet_processes": servlet_processes,
    }
    rns = rh.globals.rns_client
    cluster_uri = rns.format_rns_address(cluster.rns_address)
    # Best-effort POST; the response is intentionally ignored, as in callers.
    requests.post(
        f"{rns.api_server_url}/resource/{cluster_uri}/cluster/status",
        data=json.dumps(payload),
        headers=rns.request_headers(),
    )

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_all_pydentic(self, cluster):
    """Cluster.list(get_all_clusters=True) returns clusters of every status, not just running ones."""
    # Make sure there is at least one terminated cluster for the test
    # (it does not matter that the status is mocked).
    self.set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)

    all_clusters, running_clusters, _ = Cluster.list(get_all_clusters=True)
    assert all_clusters
    assert running_clusters
    assert len(all_clusters) >= len(running_clusters)

    username = rh.configs.username
    names_of_all = [f'/{username}/{c.get("name")}' for c in all_clusters]

    statuses_seen = {c.get("Status") for c in all_clusters}

    # Both a running and a terminated cluster must be present.
    assert len(all_clusters) > 1
    assert ResourceServerStatus.running in statuses_seen
    assert ResourceServerStatus.terminated in statuses_seen

    names_of_running = [f'/{username}/{c.get("name")}' for c in running_clusters]

    assert cluster.name in names_of_all
    assert cluster.name in names_of_running

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_status_filter_pydentic(self, cluster):
    """Cluster.list(status=...) should return only clusters whose Den status matches the filter.

    Fix: removed a redundant function-local import of ResourceServerStatus,
    which is already imported at module level.
    """
    # "running" filter: every returned cluster is running, so the two lists coincide.
    all_clusters, running_clusters, _ = Cluster.list(status="running")
    assert len(all_clusters) > 0
    assert len(running_clusters) > 0
    assert len(all_clusters) == len(running_clusters)

    all_clusters_status = set(
        [den_cluster.get("Status") for den_cluster in all_clusters]
    )

    # No status other than "running" may appear in the filtered results.
    supported_statuses_without_running = [
        status
        for status in list(ResourceServerStatus.__members__.keys())
        if status != "running"
    ]

    for status in supported_statuses_without_running:
        assert status not in all_clusters_status

    assert ResourceServerStatus.running in all_clusters_status

    # Make sure that we have at least one terminated cluster for the test
    # (it does not matter that the status is mocked).
    self.set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)

    all_clusters, running_clusters, _ = Cluster.list(status="terminated")
    assert len(all_clusters) > 0
    assert len(running_clusters) == 0

    all_clusters_status = set(
        [den_cluster.get("Status") for den_cluster in all_clusters]
    )

    assert "terminated" in all_clusters_status

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_type_filter_pydentic(self):
    """Cluster.list(subtype="static") should return only static clusters (type "Cluster")."""
    all_clusters, running_clusters, _ = Cluster.list(subtype="static")
    assert all_clusters
    assert len(running_clusters) >= 0

    cluster_types = {c.get("Cluster Type") for c in all_clusters}

    # All filtered clusters share exactly one type: the static "Cluster".
    assert len(cluster_types) == 1
    assert "Cluster" in cluster_types

@pytest.mark.level("unit")
@pytest.mark.clustertest
def test_cluster_list_since_filter_pydentic(self, cluster):
    """Cluster.list(since="12h") should return only clusters active within the last 12 hours.

    Fix: use timezone-aware datetime.now(timezone.utc) instead of the
    deprecated naive datetime.utcnow() + replace().
    """
    hours_time_filter = 12

    # Make sure that we have at least one terminated cluster for the test
    # (it does not matter that the status is mocked).
    self.set_cluster_status(cluster=cluster, status=ResourceServerStatus.terminated)

    all_clusters, running_clusters, _ = Cluster.list(since=f"{hours_time_filter}h")
    assert len(all_clusters) > 0
    assert len(running_clusters) >= 0

    clusters_last_active_timestamps = set(
        [den_cluster.get("Last Active (UTC)") for den_cluster in all_clusters]
    )

    assert len(clusters_last_active_timestamps) >= 1
    current_utc_time = datetime.now(timezone.utc)

    # Every returned cluster must have been active within the requested window.
    for timestamp in clusters_last_active_timestamps:
        assert (
            current_utc_time - timestamp
        ).total_seconds() <= hours_time_filter * HOUR

@pytest.mark.level("local")
@pytest.mark.clustertest
def test_cluster_list_all_cmd(self):
    """The `runhouse cluster list` CLI should run successfully and emit non-empty output.

    Fix: removed a redundant function-local `import subprocess`
    (already imported at module level) and renamed the opaque local `a`.
    """
    output = subprocess.check_output(["runhouse", "cluster", "list"])
    assert output

0 comments on commit acc9724

Please sign in to comment.