|
18 | 18 |
|
19 | 19 | import boto3 |
20 | 20 | import yaml |
| 21 | +from assertpy import assert_that |
21 | 22 | from framework.credential_providers import run_pcluster_command |
| 23 | +from remote_command_executor import RemoteCommandExecutor |
22 | 24 | from retrying import retry |
23 | 25 | from time_utils import minutes, seconds |
24 | 26 | from utils import ( |
|
34 | 36 | retry_if_subprocess_error, |
35 | 37 | ) |
36 | 38 |
|
| 39 | +from tests.common.utils import read_remote_file |
| 40 | + |
| 41 | +TAG_CLUSTER_NAME = "parallelcluster:cluster-name" |
| 42 | +TAG_NODE_TYPE = "parallelcluster:node-type" |
| 43 | +TAG_QUEUE_NAME = "parallelcluster:queue-name" |
| 44 | +TAG_COMPUTE_RESOURCE_NAME = "parallelcluster:compute-resource-name" |
| 45 | + |
| 46 | +LAUNCH_TEMPLATES_CONFIG_FILE = "/opt/parallelcluster/shared/launch-templates-config.json" |
| 47 | + |
37 | 48 |
|
38 | 49 | def suppress_and_log_exception(func): |
39 | 50 | @functools.wraps(func) |
@@ -253,6 +264,63 @@ def describe_cluster_instances(self, node_type=None, queue_name=None): |
253 | 264 | logging.error("Failed when getting cluster instances with error:\n%s\nand output:\n%s", e.stderr, e.stdout) |
254 | 265 | raise |
255 | 266 |
|
| 267 | + @retry(wait_fixed=seconds(5), stop_max_delay=minutes(1)) |
| 268 | + def get_compute_nodes( |
| 269 | + self, |
| 270 | + queue_name: str = None, |
| 271 | + compute_resource_name: str = None, |
| 272 | + state: list = None, |
| 273 | + expected_num_nodes: int = None, |
| 274 | + ): |
| 275 | + """Return the EC2 instance details for compute nodes matching the provided criteria.""" |
| 276 | + state = ["running"] if state is None else state |
| 277 | + ec2 = boto3.client("ec2", region_name=self.region) |
| 278 | + filters = [ |
| 279 | + {"Name": f"tag:{TAG_CLUSTER_NAME}", "Values": [self.cfn_name]}, |
| 280 | + {"Name": f"tag:{TAG_NODE_TYPE}", "Values": ["Compute"]}, |
| 281 | + {"Name": "instance-state-name", "Values": state}, |
| 282 | + ] |
| 283 | + |
| 284 | + if queue_name: |
| 285 | + filters.append({"Name": f"tag:{TAG_QUEUE_NAME}", "Values": [queue_name]}) |
| 286 | + if compute_resource_name: |
| 287 | + filters.append({"Name": f"tag:{TAG_COMPUTE_RESOURCE_NAME}", "Values": [compute_resource_name]}) |
| 288 | + |
| 289 | + instances = [] |
| 290 | + for reservation in ec2.describe_instances(Filters=filters).get("Reservations"): |
| 291 | + instances.extend(reservation.get("Instances", [])) |
| 292 | + |
| 293 | + if expected_num_nodes: |
| 294 | + assert_that(instances).is_length(expected_num_nodes) |
| 295 | + |
| 296 | + return instances |
| 297 | + |
| 298 | + def get_compute_nodes_private_ip( |
| 299 | + self, |
| 300 | + queue_name: str = None, |
| 301 | + compute_resource_name: str = None, |
| 302 | + state: list = None, |
| 303 | + expected_num_nodes: int = None, |
| 304 | + ): |
| 305 | + """Return the private IP address of compute nodes matching the provided criteria.""" |
| 306 | + instances = self.get_compute_nodes(queue_name, compute_resource_name, state, expected_num_nodes) |
| 307 | + return [i.get("PrivateIpAddress") for i in instances] |
| 308 | + |
| 309 | + def get_compute_nodes_launch_template_logical_id(self, queue_name: str, compute_resource_name: str): |
| 310 | + """Return the launch template logical id of compute nodes matching the provided criteria.""" |
| 311 | + launch_templates_config = json.loads( |
| 312 | + read_remote_file(RemoteCommandExecutor(self), LAUNCH_TEMPLATES_CONFIG_FILE) |
| 313 | + ) |
| 314 | + logging.info(f"Read launch template config from {LAUNCH_TEMPLATES_CONFIG_FILE}: {launch_templates_config}") |
| 315 | + return ( |
| 316 | + launch_templates_config.get("Queues", {}) |
| 317 | + .get(queue_name, {}) |
| 318 | + .get("ComputeResources", {}) |
| 319 | + .get(compute_resource_name, {}) |
| 320 | + .get("LaunchTemplate", {}) |
| 321 | + .get("LogicalId") |
| 322 | + ) |
| 323 | + |
256 | 324 | def get_cluster_instance_ids(self, node_type=None, queue_name=None): |
257 | 325 | """Run pcluster describe-cluster-instances and collect instance ids.""" |
258 | 326 | instances = self.describe_cluster_instances(node_type=node_type, queue_name=queue_name) |
|
0 commit comments