|
19 | 19 | import boto3 |
20 | 20 | import yaml |
21 | 21 | from framework.credential_providers import run_pcluster_command |
| 22 | +from remote_command_executor import RemoteCommandExecutor |
22 | 23 | from retrying import retry |
23 | 24 | from time_utils import minutes, seconds |
24 | 25 | from utils import ( |
|
34 | 35 | retry_if_subprocess_error, |
35 | 36 | ) |
36 | 37 |
|
| 38 | +from tests.common.utils import read_remote_file |
| 39 | + |
| 40 | +TAG_CLUSTER_NAME = "parallelcluster:cluster-name" |
| 41 | +TAG_NODE_TYPE = "parallelcluster:node-type" |
| 42 | +TAG_QUEUE_NAME = "parallelcluster:queue-name" |
| 43 | +TAG_QCOMPUTE_RESOURCE_NAME = "parallelcluster:compute-resource-name" |
| 44 | + |
| 45 | +LAUNCH_TEMPLATES_CONFIG_FILE = "/opt/parallelcluster/shared/launch-templates-config.json" |
| 46 | + |
37 | 47 |
|
38 | 48 | def suppress_and_log_exception(func): |
39 | 49 | @functools.wraps(func) |
@@ -253,6 +263,42 @@ def describe_cluster_instances(self, node_type=None, queue_name=None): |
253 | 263 | logging.error("Failed when getting cluster instances with error:\n%s\nand output:\n%s", e.stderr, e.stdout) |
254 | 264 | raise |
255 | 265 |
|
def get_compute_nodes(self, queue_name: str = None, compute_resource_name: str = None, status: str = "running"):
    """
    Return the EC2 instance details for compute nodes matching the provided criteria.

    :param queue_name: when set, only nodes tagged with this queue name are returned.
    :param compute_resource_name: when set, only nodes tagged with this compute resource name are returned.
    :param status: EC2 instance state name to filter on.
    :return: list of instance description dicts taken from the EC2 DescribeInstances response;
        an empty list when no instance matches.
    """
    ec2 = boto3.client("ec2", region_name=self.region)
    filters = [
        {"Name": f"tag:{TAG_CLUSTER_NAME}", "Values": [self.cfn_name]},
        {"Name": f"tag:{TAG_NODE_TYPE}", "Values": ["Compute"]},
        {"Name": "instance-state-name", "Values": [status]},
    ]

    if queue_name:
        filters.append({"Name": f"tag:{TAG_QUEUE_NAME}", "Values": [queue_name]})
    if compute_resource_name:
        filters.append({"Name": f"tag:{TAG_QCOMPUTE_RESOURCE_NAME}", "Values": [compute_resource_name]})

    # DescribeInstances groups instances by reservation and may split the result over several pages.
    # Reading only Reservations[0] would miss instances launched in other reservations and would raise
    # IndexError when nothing matches, so every reservation of every page is flattened instead.
    paginator = ec2.get_paginator("describe_instances")
    return [
        instance
        for page in paginator.paginate(Filters=filters)
        for reservation in page.get("Reservations", [])
        for instance in reservation.get("Instances", [])
    ]
| 281 | + |
def get_compute_nodes_private_ip(
    self, queue_name: str = None, compute_resource_name: str = None, status: str = "running"
):
    """
    Return the private IP address of compute nodes matching the provided criteria.

    The criteria are forwarded unchanged to get_compute_nodes; the result holds one entry per returned
    instance, which is None for an instance description without a "PrivateIpAddress" key.
    """
    matching_nodes = self.get_compute_nodes(queue_name, compute_resource_name, status)

    private_ips = []
    for node in matching_nodes:
        private_ips.append(node.get("PrivateIpAddress"))
    return private_ips
| 287 | + |
def get_compute_nodes_launch_template_logical_id(self, queue_name: str, compute_resource_name: str):
    """
    Return the launch template logical id of compute nodes matching the provided criteria.

    The value is looked up in the JSON document read from LAUNCH_TEMPLATES_CONFIG_FILE through a
    RemoteCommandExecutor built on this cluster. None is returned when any key along the path
    Queues -> queue_name -> ComputeResources -> compute_resource_name -> LaunchTemplate -> LogicalId is absent.
    """
    remote_command_executor = RemoteCommandExecutor(self)
    raw_config = read_remote_file(remote_command_executor, LAUNCH_TEMPLATES_CONFIG_FILE)

    # Walk down the nested mapping one level at a time, falling back to an empty dict for a missing key.
    section = json.loads(raw_config)
    for key in ("Queues", queue_name, "ComputeResources", compute_resource_name, "LaunchTemplate"):
        section = section.get(key, {})
    return section.get("LogicalId")
| 301 | + |
256 | 302 | def get_cluster_instance_ids(self, node_type=None, queue_name=None): |
257 | 303 | """Run pcluster describe-cluster-instances and collect instance ids.""" |
258 | 304 | instances = self.describe_cluster_instances(node_type=node_type, queue_name=queue_name) |
|
0 commit comments