# Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file.
# This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import json
import logging

import boto3
import pytest
from assertpy import assert_that
from clusters_factory import Cluster
from remote_command_executor import RemoteCommandExecutor

from tests.common.assertions import assert_regex_in_file
from tests.common.schedulers_common import SlurmCommands
from tests.common.utils import read_remote_file, terminate_nodes_manually


def submit_job_imex_status(rce: RemoteCommandExecutor, launch_template_id: str, queue_name: str, max_nodes: int = 1):
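    """
    Submit the nvidia-imex-status.job script with the IMEX config of the given launch template
    and wait for it to complete successfully.

    Returns the Slurm job id, which callers can use to locate the job's per-node result files.
    """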
    logging.info("Submitting job to check IMEX status")
    slurm = SlurmCommands(rce)
    job_id = slurm.submit_command_and_assert_job_accepted(
        submit_command_args={
            "command": "/opt/parallelcluster/shared/nvidia-imex-status.job "
            f"/opt/parallelcluster/shared/nvidia-imex/config_{launch_template_id}.cfg",
            "partition": queue_name,
            "nodes": max_nodes,
        }
    )
    slurm.wait_job_completed(job_id)
    slurm.assert_job_succeeded(job_id)
    return job_id


def assert_imex_nodes_config_is_correct(rce: RemoteCommandExecutor, launch_template_id: str, expected_ips: list):
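    """
    Assert that the IMEX nodes config file generated for the given launch template
    contains exactly the expected IPs, one per line.
    """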
    logging.info(f"Checking IMEX nodes config contains the expected nodes: {expected_ips}")
    imex_nodes_config_file = f"/opt/parallelcluster/shared/nvidia-imex/nodes_config_{launch_template_id}.cfg"
    imex_config_content = read_remote_file(rce, imex_nodes_config_file)
    actual_ips = [ip.strip() for ip in imex_config_content.strip().split("\n")]
    assert_that(actual_ips).contains_only(*expected_ips)
    logging.info(f"IMEX nodes config contains the expected nodes: {expected_ips}")


def assert_imex_status_ok(rce: RemoteCommandExecutor, job_id: str, ips: list):
    """
    Assert that the output returned by the nvidia-imex-ctl command represents a healthy status for IMEX.

    IMEX is considered healthy if every node of the domain reports a healthy status, i.e.:
      * every node is READY
      * every node is CONNECTED to every other node

    Example of healthy IMEX status:
    {
        "nodes": {
            "0": {
                "status": "READY",
                "host": "192.168.103.159",
                "connections": {
                    "1": {
                        "host": "192.168.107.187",
                        "status": "CONNECTED",
                        "changed": true
                    },
                    "0": {
                        "host": "192.168.103.159",
                        "status": "CONNECTED",
                        "changed": true
                    }
                },
                "changed": true,
                "version": "570.172.08"
            },
            "1": {
                "status": "READY",
                "host": "192.168.107.187",
                "connections": {
                    "0": {
                        "host": "192.168.103.159",
                        "status": "CONNECTED",
                        "changed": true
                    },
                    "1": {
                        "host": "192.168.107.187",
                        "status": "CONNECTED",
                        "changed": true
                    }
                },
                "changed": true,
                "version": "570.172.08"
            }
        },
        "timestamp": "8/8/2025 17:38:02.641",
        "status": "UP"
    }
    """
    slurm = SlurmCommands(rce)

    for reporting_node_ip in ips:
        reporting_node_name = slurm.get_nodename_from_ip(reporting_node_ip)
        logging.info(f"Retrieving IMEX status reported by node {reporting_node_ip} with hostname {reporting_node_name}")
        result_stdout = rce.run_remote_command(f"cat result_{job_id}_{reporting_node_name}.out").stdout
        result_stderr = rce.run_remote_command(f"cat result_{job_id}_{reporting_node_name}.err").stdout
        assert_that(result_stderr).is_empty()
        logging.info(
            f"IMEX status reported by node {reporting_node_ip} with hostname {reporting_node_name}: {result_stdout}"
        )
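        # Parse the JSON status and verify that every expected node is READY and reports
        # a CONNECTED link to every other expected node (see the example in the docstring).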
        logging.info(f"Checking that IMEX is able to interconnect the expected nodes: {ips}")
        imex_status = json.loads(result_stdout)
        assert_that(imex_status["status"]).is_equal_to("UP")
        for ip_source in ips:
            node_item = next(filter(lambda i: i["host"] == ip_source, imex_status["nodes"].values()), None)
            assert_that(node_item).is_not_none()
            assert_that(node_item["status"]).is_equal_to("READY")
            for ip_destination in ips:
                connection_item = next(
                    filter(lambda i: i["host"] == ip_destination, node_item["connections"].values()), None
                )
                assert_that(connection_item).is_not_none()
                assert_that(connection_item["status"]).is_equal_to("CONNECTED")


def assert_imex_healthy(cluster: Cluster, queue_name: str, compute_resource_name: str, max_nodes: int = 1):
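    """
    Assert that IMEX is fully configured and healthy on the given compute resource:
    the IMEX nodes config lists the compute nodes' private IPs, the reported IMEX status is UP,
    and neither the IMEX log nor the prolog log contains errors.
    """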
    rce = RemoteCommandExecutor(cluster)

    launch_template_id = cluster.get_compute_nodes_launch_template_logical_id(queue_name, compute_resource_name)
    logging.info(
        f"Launch template for compute nodes in queue {queue_name} and compute resource {compute_resource_name}: {launch_template_id}"
    )

    ips = cluster.get_compute_nodes_private_ip(queue_name, compute_resource_name)
    logging.info(
        f"Private IP addresses for compute nodes in queue {queue_name} and compute resource {compute_resource_name}: {ips}"
    )

    job_id = submit_job_imex_status(rce, launch_template_id, queue_name, max_nodes)

    assert_imex_nodes_config_is_correct(rce, launch_template_id, ips)
    assert_imex_status_ok(rce, job_id, ips)
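
    # Check the IMEX and prolog logs on every compute node: the negative lookahead regex
    # matches only lines that contain none of "err", "warn" or "fail".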
    for compute_node_ip in cluster.get_compute_nodes_private_ip(queue_name, compute_resource_name):
        for file_name in ["/var/log/nvidia-imex-verbose.log", "/var/log/parallelcluster/nvidia-imex-prolog.log"]:
            logging.info(f"Checking that log file {file_name} does not contain any error")
            assert_regex_in_file(cluster, compute_node_ip, file_name, r"^(?!.*(?:err|warn|fail)).*$")


def assert_imex_not_configured(cluster: Cluster, queue_name: str, compute_resource_name: str, max_nodes: int = 1):
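    """
    Assert that the prolog did not configure IMEX for the given compute resource:
    the IMEX nodes config must keep the default placeholder entries (0.0.0.0).
    """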
    rce = RemoteCommandExecutor(cluster)

    launch_template_id = cluster.get_compute_nodes_launch_template_logical_id(queue_name, compute_resource_name)
    logging.info(
        f"Launch template for compute nodes in queue {queue_name} and compute resource {compute_resource_name}: {launch_template_id}"
    )

    submit_job_imex_status(rce, launch_template_id, queue_name, max_nodes)

    assert_imex_nodes_config_is_correct(rce, launch_template_id, ["0.0.0.0", "0.0.0.0"])
| 154 | + |
| 155 | + |
| 156 | +@pytest.mark.usefixtures("region", "os", "instance", "scheduler") |
| 157 | +def test_gb200(pcluster_config_reader, file_reader, clusters_factory, test_datadir, s3_bucket_factory, region): |
    """
    Test automated configuration of NVIDIA IMEX.

    This test creates a cluster with the necessary custom actions to configure NVIDIA IMEX and verifies the following:
    1. On the compute resource supporting IMEX (q1-cr1), the IMEX nodes file is configured by the prolog,
       the IMEX service is healthy and no errors are reported in IMEX's or the prolog's logs.
       Also, IMEX gets reconfigured when nodes belonging to the same compute resource get replaced.
    2. On the compute resource not supporting IMEX (q1-cr2), the IMEX nodes file is not configured by the prolog,
       keeping the default values, and IMEX is not started.

    The test prints the full IMEX status in the test log to facilitate troubleshooting.
    The test uses instance type g4dn to simulate a p6e-gb200 instance.
    This is a reasonable approximation because the focus of the test is on IMEX configuration,
    which can be executed on g4dn as well.
    """
    max_queue_size = 2

    # Create an S3 bucket for custom action scripts
    bucket_name = s3_bucket_factory()
    bucket = boto3.resource("s3", region_name=region).Bucket(bucket_name)

    # Upload files to test bucket
    headnode_start_filename = "head_node_start.sh"
    prolog_filename = "90-nvidia-imex.prolog.sh"
    job_filename = "nvidia-imex-status.job"
    bucket.upload_file(str(test_datadir / prolog_filename), prolog_filename)
    bucket.upload_file(str(test_datadir / job_filename), job_filename)
    head_node_start_script_rendered = file_reader(
        input_file=headnode_start_filename,
        output_file=f"{headnode_start_filename}.rendered",
        bucket_name=bucket_name,
        prolog_filename=prolog_filename,
        job_filename=job_filename,
    )
    bucket.upload_file(head_node_start_script_rendered, headnode_start_filename)

    # TODO: Remove after testing: BEGIN: added compute custom action to force the configuration of IMEX
    compute_configured_filename = "compute_node_configured.sh"
    bucket.upload_file(str(test_datadir / compute_configured_filename), compute_configured_filename)
    # TODO: Remove after testing: END

    queue_name = "q1"
    compute_resource_with_imex = "cr1"
    compute_resource_without_imex = "cr2"

    cluster_config = pcluster_config_reader(
        bucket_name=bucket_name,
        head_node_start_script=headnode_start_filename,
        compute_node_configured_script=compute_configured_filename,
        max_queue_size=max_queue_size,
        queue_name=queue_name,
        compute_resource_with_imex=compute_resource_with_imex,
        compute_resource_without_imex=compute_resource_without_imex,
    )
    cluster = clusters_factory(cluster_config)

    assert_imex_healthy(cluster, queue_name, compute_resource_with_imex, max_queue_size)

    # IMEX is not configured on compute resources that do not support it
    assert_imex_not_configured(cluster, queue_name, compute_resource_without_imex)

    # Forcefully terminate a compute node in the compute resource supporting IMEX
    # to simulate an outage that forces the node to be replaced and, consequently, IMEX to be reconfigured.
    terminate_nodes_manually(
        [cluster.get_compute_nodes(queue_name, compute_resource_with_imex)[0].get("InstanceId")], region
    )
    assert_imex_healthy(cluster, queue_name, compute_resource_with_imex, max_queue_size)