|
63 | 63 | NODE_BOOTSTRAP_TIMEOUT, |
64 | 64 | ONTAP, |
65 | 65 | OPENZFS, |
| 66 | + ULTRASERVER_INSTANCE_PREFIX_LIST, |
66 | 67 | Feature, |
67 | 68 | ) |
68 | | -from pcluster.utils import get_partition, get_resource_name_from_resource_arn, to_snake_case |
| 69 | +from pcluster.utils import ( |
| 70 | + get_partition, |
| 71 | + get_resource_name_from_resource_arn, |
| 72 | + to_snake_case, |
| 73 | +) |
69 | 74 | from pcluster.validators.awsbatch_validators import ( |
70 | 75 | AwsBatchComputeInstanceTypeValidator, |
71 | 76 | AwsBatchComputeResourceSizeValidator, |
|
141 | 146 | ) |
142 | 147 | from pcluster.validators.ec2_validators import ( |
143 | 148 | AmiOsCompatibleValidator, |
| 149 | + CapacityBlockHealthStatusValidator, |
144 | 150 | CapacityReservationResourceGroupValidator, |
145 | 151 | CapacityReservationSizeValidator, |
146 | 152 | CapacityReservationValidator, |
@@ -2409,7 +2415,16 @@ def instance_types(self) -> List[str]: |
def instance_type(self):
    """
    Instance type of this compute resource.

    The value is resolved on first access and stored in ``self._instance_type``; later accesses
    return the stored value without querying EC2 again. Resolution asks the EC2 client for the
    instance type of the configured capacity reservation (``None`` is passed as the reservation id
    when no capacity reservation target is set) and discards the reservation type returned with it.
    """
    # Guard clause: an already-resolved (truthy) value short-circuits the lookup.
    if self._instance_type:
        return self._instance_type

    target = self.capacity_reservation_target
    reservation_id = target.capacity_reservation_id if target else None
    ec2_client = AWSApi.instance().ec2
    # Only the instance type is needed here; the reservation type in the second slot is ignored.
    resolved_type, _ = ec2_client.get_instance_type_and_reservation_type_from_capacity_reservation(reservation_id)
    self._instance_type = Resource.init_param(resolved_type)
    return self._instance_type
2414 | 2429 |
|
2415 | 2430 | def _register_validators(self, context: ValidatorContext = None): |
@@ -2453,18 +2468,6 @@ def disable_simultaneous_multithreading_manually(self) -> bool: |
2453 | 2468 | """Return true if simultaneous multithreading must be disabled with a cookbook script.""" |
2454 | 2469 | return self.disable_simultaneous_multithreading and self._instance_type_info.default_threads_per_core() > 1 |
2455 | 2470 |
|
2456 | | - def _instance_type_from_capacity_reservation(self): |
2457 | | - """Return the instance type from the configured CapacityReservationId, if any.""" |
2458 | | - instance_type = None |
2459 | | - capacity_reservation_id = ( |
2460 | | - self.capacity_reservation_target.capacity_reservation_id if self.capacity_reservation_target else None |
2461 | | - ) |
2462 | | - if capacity_reservation_id: |
2463 | | - capacity_reservations = AWSApi.instance().ec2.describe_capacity_reservations([capacity_reservation_id]) |
2464 | | - if capacity_reservations: |
2465 | | - instance_type = capacity_reservations[0].instance_type() |
2466 | | - return instance_type |
2467 | | - |
2468 | 2471 |
|
2469 | 2472 | class _CommonQueue(BaseQueue): |
2470 | 2473 | """Represent the Common Queue resource between Slurm and future scheduler implementation.""" |
@@ -2931,6 +2934,7 @@ def __init__( |
2931 | 2934 | pool.ssh.allowed_ips = self.head_node.ssh.allowed_ips |
2932 | 2935 |
|
2933 | 2936 | self.__image_dict = None |
| 2937 | + self.__ultraserver_capacity_block_dict = None |
2934 | 2938 | # Cache capacity reservations information together to reduce number of boto3 calls. |
2935 | 2939 | # Since this cache is only used for validation, if AWSClientError happens |
2936 | 2940 | # (e.g insufficient IAM permissions to describe the capacity reservations), we catch the exception to avoid |
@@ -2986,6 +2990,53 @@ def login_nodes_subnet_ids(self): |
2986 | 2990 | subnet_ids_set.add(subnet_id) |
2987 | 2991 | return list(subnet_ids_set) |
2988 | 2992 |
|
@property
def ultraserver_capacity_block_dict(self):
    """
    Return a dictionary mapping ultraserver instance prefixes to their capacity block reservation IDs.

    This property collects all capacity block reservations used by ultraserver instances
    (e.g., p6e-gb200) across all queues and compute resources in the cluster configuration.
    A compute resource's own capacity reservation target takes precedence over the one set on its queue.

    The result is computed on first access and cached on the instance. The cache is only set once the
    whole scan has completed, so an error raised by the EC2 lookup never leaves a partial result behind.

    Returns:
        dict: A dictionary where keys are ultraserver instance prefixes (e.g., 'p6e-gb200')
              and values are lists of capacity reservation IDs for that instance type.
              Every prefix in ULTRASERVER_INSTANCE_PREFIX_LIST is present, with an empty list
              when no matching capacity block is configured.

    Example:
        {
            'p6e-gb200': ['cr-123456', 'cr-789012']
        }
    """
    # Compare against None rather than relying on truthiness: an empty dict is still a valid cached result.
    if self.__ultraserver_capacity_block_dict is not None:
        return self.__ultraserver_capacity_block_dict

    # Build into a local dict first; it becomes the cached value only after the scan succeeds.
    capacity_blocks_by_prefix = {prefix: [] for prefix in ULTRASERVER_INSTANCE_PREFIX_LIST}

    # Iterate through all queues and compute resources to find ultraserver capacity blocks
    for queue in self.scheduling.queues:
        for compute_resource in queue.compute_resources:
            cr_target = compute_resource.capacity_reservation_target or queue.capacity_reservation_target
            if not (cr_target and cr_target.capacity_reservation_id):
                continue

            # Get instance type and reservation type from the capacity reservation
            (
                instance_type,
                reservation_type,
            ) = AWSApi.instance().ec2.get_instance_type_and_reservation_type_from_capacity_reservation(
                cr_target.capacity_reservation_id
            )
            # NOTE(review): the lookup presumably yields no instance type when the reservation cannot be
            # described; guard so that case is skipped instead of failing on None.split() - confirm.
            if reservation_type != "capacity-block" or not instance_type:
                continue

            # Extract instance prefix (e.g., 'p6e-gb200' from 'p6e-gb200.36xlarge')
            instance_prefix = instance_type.split(".")[0]
            # Only collect capacity blocks for ultraserver instances.
            # NOTE(review): a reservation shared by several compute resources is appended once per
            # compute resource, as before - confirm whether consumers expect de-duplicated IDs.
            if instance_prefix in capacity_blocks_by_prefix:
                capacity_blocks_by_prefix[instance_prefix].append(cr_target.capacity_reservation_id)

    self.__ultraserver_capacity_block_dict = capacity_blocks_by_prefix
    return capacity_blocks_by_prefix
| 3039 | + |
2989 | 3040 | def _register_login_node_validators(self): |
2990 | 3041 | """Register all login node validators to ensure that the resource parameters are valid.""" |
2991 | 3042 | # Check if all subnets(head node, Login nodes, compute nodes) are in the same VPC and support DNS. |
@@ -3223,6 +3274,13 @@ def _register_validators(self, context: ValidatorContext = None): # noqa: C901 |
3223 | 3274 | num_of_instances=num_of_instances, |
3224 | 3275 | ) |
3225 | 3276 |
|
| 3277 | + for ultraserver_instance_prefix in ULTRASERVER_INSTANCE_PREFIX_LIST: |
| 3278 | + if self.ultraserver_capacity_block_dict.get(ultraserver_instance_prefix): |
| 3279 | + self._register_validator( |
| 3280 | + CapacityBlockHealthStatusValidator, |
| 3281 | + capacity_reservation_ids=self.ultraserver_capacity_block_dict.get(ultraserver_instance_prefix), |
| 3282 | + ) |
| 3283 | + |
3226 | 3284 | @property |
3227 | 3285 | def image_dict(self): |
3228 | 3286 | """Return image dict of queues, key is queue name, value is image id.""" |
|
0 commit comments