diff --git a/docs/executor.md b/docs/executor.md index 9e841fd6fe..249fa27c80 100644 --- a/docs/executor.md +++ b/docs/executor.md @@ -26,6 +26,7 @@ Resource requests and other job characteristics can be controlled via the follow - {ref}`process-arch` (only when using Fargate platform type for AWS Batch) - {ref}`process-container` - {ref}`process-containerOptions` +- {ref}`process-consumableresources` - {ref}`process-cpus` - {ref}`process-disk` (only when using Fargate platform type for AWS Batch) - {ref}`process-memory` diff --git a/docs/reference/process.md b/docs/reference/process.md index 9de71d2b73..749f1dac58 100644 --- a/docs/reference/process.md +++ b/docs/reference/process.md @@ -603,6 +603,58 @@ process hello_docker { This feature is not supported by the {ref}`k8s-executor` executor. ::: +(process-consumableresources)= + +### consumableResources + +:::{versionadded} 25.04.0-edge +::: + +The `consumableResources` directive allows you to specify custom consumable resources for AWS Batch jobs. Consumable resources enable resource-aware scheduling for rate-limited resources such as software licenses or other constrained resources. + +This directive is only supported when using the {ref}`awsbatch-executor` executor. + +For example: + +```nextflow +process licensed_software { + consumableResources [ + [type: 'my-software-license', value: 1] + ] + + script: + """ + run_licensed_software + """ +} +``` + +Multiple consumable resources can be specified: + +```nextflow +process multi_resource { + consumableResources [ + [type: 'license-type-a', value: 2], + [type: 'license-type-b', value: 1] + ] + + script: + """ + your_command_here + """ +} +``` + +Each consumable resource must be defined as a map with: +- `type`: The name of the consumable resource (must match a resource configured in your AWS Batch job queue) +- `value`: The quantity of the resource required by the task + +:::{note} +Consumable resources must be configured in your AWS Batch compute environment before they can be used. See the [AWS Batch documentation](https://docs.aws.amazon.com/batch/latest/userguide/consumable-resources.html) for more information on setting up consumable resources. +::: + +See also: [AWS Batch Consumable Resources](https://aws.amazon.com/blogs/hpc/how-to-use-rate-limited-resources-in-aws-batch-jobs-with-resource-aware-scheduling/) + (process-cpus)= ### cpus diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy index e60998e248..10d314be70 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy @@ -509,6 +509,28 @@ class TaskConfig extends LazyMap implements Cloneable { return null } + List> getConsumableResources() { + final value = get('consumableResources') + if( value == null ) + return null + if( value instanceof List ) { + // Validate the list contains maps with 'type' and 'value' + final result = new ArrayList>() + value.each { item -> + if( !(item instanceof Map) ) + throw new IllegalArgumentException("Invalid `consumableResources` item: $item [${item.getClass().getName()}] - each item must be a map with 'type' and 'value' keys") + final map = item as Map + if( !map.containsKey('type') ) + throw new IllegalArgumentException("Invalid `consumableResources` item: missing 'type' key") + if( !map.containsKey('value') ) + throw new IllegalArgumentException("Invalid `consumableResources` item: missing 'value' key") + result.add(map) + } + return result + } + throw new IllegalArgumentException("Invalid `consumableResources` directive value: $value [${value.getClass().getName()}] - must be a list of maps") + } + String getMachineType() { return get('machineType') } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy index b34958918e..08ae603a9d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/dsl/ProcessBuilder.groovy @@ -50,6 +50,7 @@ class ProcessBuilder { 'cache', 'clusterOptions', 'conda', + 'consumableResources', 'container', 'containerOptions', 'cpus', @@ -168,6 +169,13 @@ class ProcessBuilder { throw new IllegalArgumentException("Not a valid `arch` directive value: $value [${value.getClass().getName()}]") } + void consumableResources( value ) { + if( value instanceof List || value instanceof Closure ) + config.put('consumableResources', value) + else if( value != null ) + throw new IllegalArgumentException("Not a valid `consumableResources` directive value: $value [${value.getClass().getName()}] - expected a list of maps with 'type' and 'value' keys") + } + void debug(boolean value) { config.debug = value } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy index 636e28c0ef..c8431640fe 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy @@ -841,6 +841,21 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler resource : consumableResources ) { + final type = resource.get('type')?.toString() + final value = resource.get('value') + if( type && value ) { + resources << ResourceRequirement.builder() + .type(type) + .value(value.toString()) + .build() + } + } + } + if( resources ) container.resourceRequirements(resources) diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy index f865f2e386..0a260a55f9 100644 --- a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy @@ -210,6 +210,44 @@ class AwsBatchTaskHandlerTest extends Specification { req.containerOverrides().resourceRequirements().find { it.type() == ResourceType.GPU}.value() == '2' } + def 'should set consumable resources' () { + given: + def task = Mock(TaskRun) + task.getName() >> 'batch-task' + task.getConfig() >> new TaskConfig( + memory: '2GB', + cpus: 2, + consumableResources: [ + [type: 'FOO_LICENSES', value: 1], + [type: 'OTHER_LICENSE', value: 3] + ] + ) + task.getWorkDirStr() >> 's3://my-bucket/work/dir' + and: + def executor = Spy(AwsBatchExecutor) { getAwsOptions()>> new AwsOptions() } + and: + def handler = Spy(new AwsBatchTaskHandler(executor: executor)) + + when: + def req = handler.newSubmitRequest(task) + then: + handler.getAwsOptions() >> { new AwsOptions(awsConfig: new AwsConfig(batch:[cliPath: '/bin/aws'],region: 'eu-west-1')) } + and: + _ * handler.getTask() >> task + _ * handler.fusionEnabled() >> false + 1 * handler.maxSpotAttempts() >> 0 + 1 * handler.getJobQueue(task) >> 'queue1' + 1 * handler.getJobDefinition(task) >> 'job-def:1' + and: + def res = req.containerOverrides().resourceRequirements() + res.size()==4 // VCPU, MEMORY, and 2 consumable resources + and: + req.containerOverrides().resourceRequirements().find { it.type() == ResourceType.VCPU}.value() == '2' + req.containerOverrides().resourceRequirements().find { it.type() == ResourceType.MEMORY}.value() == '2048' + req.containerOverrides().resourceRequirements().find { it.typeAsString() == 'FOO_LICENSES'}.value() == '1' + req.containerOverrides().resourceRequirements().find { it.typeAsString() == 'OTHER_LICENSE'}.value() == '3' + } + def 'should create an aws submit request with a timeout'() { given: def task = Mock(TaskRun)