Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/executor.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Resource requests and other job characteristics can be controlled via the follow
- {ref}`process-arch` (only when using Fargate platform type for AWS Batch)
- {ref}`process-container`
- {ref}`process-containerOptions`
- {ref}`process-consumableresources`
- {ref}`process-cpus`
- {ref}`process-disk` (only when using Fargate platform type for AWS Batch)
- {ref}`process-memory`
Expand Down
52 changes: 52 additions & 0 deletions docs/reference/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,58 @@ process hello_docker {
This feature is not supported by the {ref}`k8s-executor` executor.
:::

(process-consumableresources)=

### consumableResources

:::{versionadded} 25.04.0-edge
:::

The `consumableResources` directive allows you to specify custom consumable resources for AWS Batch jobs. Consumable resources enable resource-aware scheduling for rate-limited resources such as software licenses or other constrained resources.

This directive is only supported when using the {ref}`awsbatch-executor` executor.

For example:

```nextflow
process licensed_software {
consumableResources [
[type: 'my-software-license', value: 1]
]

script:
"""
run_licensed_software
"""
}
```

Multiple consumable resources can be specified:

```nextflow
process multi_resource {
consumableResources [
[type: 'license-type-a', value: 2],
[type: 'license-type-b', value: 1]
]

script:
"""
your_command_here
"""
}
```

Each consumable resource must be defined as a map with:
- `type`: The name of the consumable resource (must match a resource configured in your AWS Batch job queue)
- `value`: The quantity of the resource required by the task

:::{note}
Consumable resources must be configured in your AWS Batch compute environment before they can be used. See the [AWS Batch documentation](https://docs.aws.amazon.com/batch/latest/userguide/consumable-resources.html) for more information on setting up consumable resources.
:::

See also: [AWS Batch Consumable Resources](https://aws.amazon.com/blogs/hpc/how-to-use-rate-limited-resources-in-aws-batch-jobs-with-resource-aware-scheduling/)

(process-cpus)=

### cpus
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,28 @@ class TaskConfig extends LazyMap implements Cloneable {
return null
}

List<Map<String,Object>> getConsumableResources() {
final value = get('consumableResources')
if( value == null )
return null
if( value instanceof List ) {
// Validate the list contains maps with 'type' and 'value'
final result = new ArrayList<Map<String,Object>>()
value.each { item ->
if( !(item instanceof Map) )
throw new IllegalArgumentException("Invalid `consumableResources` item: $item [${item.getClass().getName()}] - each item must be a map with 'type' and 'value' keys")
final map = item as Map
if( !map.containsKey('type') )
throw new IllegalArgumentException("Invalid `consumableResources` item: missing 'type' key")
if( !map.containsKey('value') )
throw new IllegalArgumentException("Invalid `consumableResources` item: missing 'value' key")
result.add(map)
}
return result
}
throw new IllegalArgumentException("Invalid `consumableResources` directive value: $value [${value.getClass().getName()}] - must be a list of maps")
}

String getMachineType() {
return get('machineType')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class ProcessBuilder {
'cache',
'clusterOptions',
'conda',
'consumableResources',
'container',
'containerOptions',
'cpus',
Expand Down Expand Up @@ -168,6 +169,13 @@ class ProcessBuilder {
throw new IllegalArgumentException("Not a valid `arch` directive value: $value [${value.getClass().getName()}]")
}

void consumableResources( value ) {
if( value instanceof List || value instanceof Closure )
config.put('consumableResources', value)
else if( value != null )
throw new IllegalArgumentException("Not a valid `consumableResources` directive value: $value [${value.getClass().getName()}] - expected a list of maps with 'type' and 'value' keys")
}

void debug(boolean value) {
config.debug = value
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,21 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
resources << ResourceRequirement.builder().type(ResourceType.GPU).value(accelerator.request.toString()).build()
}

// set consumable resources
final consumableResources = task.config.getConsumableResources()
if( consumableResources ) {
for( Map<String,Object> resource : consumableResources ) {
final type = resource.get('type')?.toString()
final value = resource.get('value')
if( type && value ) {
resources << ResourceRequirement.builder()
.type(type)
.value(value.toString())
.build()
}
}
}

if( resources )
container.resourceRequirements(resources)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,44 @@ class AwsBatchTaskHandlerTest extends Specification {
req.containerOverrides().resourceRequirements().find { it.type() == ResourceType.GPU}.value() == '2'
}

def 'should set consumable resources' () {
given:
def task = Mock(TaskRun)
task.getName() >> 'batch-task'
task.getConfig() >> new TaskConfig(
memory: '2GB',
cpus: 2,
consumableResources: [
[type: 'FOO_LICENSES', value: 1],
[type: 'OTHER_LICENSE', value: 3]
]
)
task.getWorkDirStr() >> 's3://my-bucket/work/dir'
and:
def executor = Spy(AwsBatchExecutor) { getAwsOptions()>> new AwsOptions() }
and:
def handler = Spy(new AwsBatchTaskHandler(executor: executor))

when:
def req = handler.newSubmitRequest(task)
then:
handler.getAwsOptions() >> { new AwsOptions(awsConfig: new AwsConfig(batch:[cliPath: '/bin/aws'],region: 'eu-west-1')) }
and:
_ * handler.getTask() >> task
_ * handler.fusionEnabled() >> false
1 * handler.maxSpotAttempts() >> 0
1 * handler.getJobQueue(task) >> 'queue1'
1 * handler.getJobDefinition(task) >> 'job-def:1'
and:
def res = req.containerOverrides().resourceRequirements()
res.size()==4 // VCPU, MEMORY, and 2 consumable resources
and:
req.containerOverrides().resourceRequirements().find { it.type() == ResourceType.VCPU}.value() == '2'
req.containerOverrides().resourceRequirements().find { it.type() == ResourceType.MEMORY}.value() == '2048'
req.containerOverrides().resourceRequirements().find { it.typeAsString() == 'FOO_LICENSES'}.value() == '1'
req.containerOverrides().resourceRequirements().find { it.typeAsString() == 'OTHER_LICENSE'}.value() == '3'
}

def 'should create an aws submit request with a timeout'() {
given:
def task = Mock(TaskRun)
Expand Down