diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy index 07af8f22c2..9c15029d7b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy @@ -55,7 +55,7 @@ class ParallelPollingMonitor extends TaskPollingMonitor { @Override protected boolean canSubmit(TaskHandler handler) { - return super.canSubmit(handler) && (semaphore == null || semaphore.tryAcquire()) + return super.canSubmit(handler) && (semaphore == null || semaphore.tryAcquire(handler.getForksCount())) } protected RateLimiter createSubmitRateLimit() { @@ -95,7 +95,7 @@ class ParallelPollingMonitor extends TaskPollingMonitor { @Override boolean evict(TaskHandler handler) { - semaphore?.release() + semaphore?.release(handler.getForksCount()) return super.evict(handler) } } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy index a0f980fe7a..3aa85ec97f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy @@ -254,6 +254,13 @@ abstract class TaskHandler { return record } + /** + * Determine the number of forks consumed by the task handler. + */ + int getForksCount() { + return task instanceof TaskArrayRun ? task.getArraySize() : 1 + } + /** * Determine if a process can be forked i.e. can launch * a parallel task execution. This is only enforced when @@ -266,7 +273,7 @@ abstract class TaskHandler { */ boolean canForkProcess() { final max = task.processor.maxForks - return !max ? true : task.processor.forksCount < max + return !max ? true : task.processor.forksCount + getForksCount() <= max } /** @@ -283,14 +290,18 @@ abstract class TaskHandler { * Increment the number of current forked processes */ final void incProcessForks() { - task.processor.forksCount?.increment() + def count = task.processor.forksCount + if( count != null ) + count.add(getForksCount()) } /** * Decrement the number of current forked processes */ final void decProcessForks() { - task.processor.forksCount?.decrement() + def count = task.processor.forksCount + if( count != null ) + count.add(-getForksCount()) } /** diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy index e4301c1285..b36055bea9 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy @@ -198,7 +198,12 @@ class TaskPollingMonitor implements TaskMonitor { * by the polling monitor */ protected boolean canSubmit(TaskHandler handler) { - (capacity>0 ? runningQueue.size() < capacity : true) && handler.canForkProcess() && handler.isReady() + int slots = handler.getForksCount() + if( capacity > 0 && slots > capacity ) + throw new IllegalArgumentException("Job array ${handler.task.name} exceeds the queue size (array size: $slots, queue size: $capacity)") + if( capacity > 0 && runningQueue.size() + slots > capacity ) + return false + return handler.canForkProcess() && handler.isReady() } /** diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/ParallelPollingMonitorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/ParallelPollingMonitorTest.groovy index 76701ec5e6..a410959038 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/ParallelPollingMonitorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/ParallelPollingMonitorTest.groovy @@ -83,7 +83,7 @@ class ParallelPollingMonitorTest extends Specification { def retry = new AtomicInteger() def session = Mock(Session) - def handler = Mock(TaskHandler) + def handler = Spy(TaskHandler) def opts = new ThrottlingExecutor.Options() .retryOn(IllegalArgumentException) diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy index 41d4c2e812..2c85462ff8 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy @@ -16,11 +16,14 @@ package nextflow.processor +import java.util.concurrent.atomic.LongAdder import nextflow.Session import nextflow.util.Duration import nextflow.util.RateUnit +import nextflow.util.ThrottlingExecutor import spock.lang.Specification +import spock.lang.Unroll /** * * @author Paolo Di Tommaso @@ -148,4 +151,59 @@ class TaskPollingMonitorTest extends Specification { 3 * session.notifyTaskSubmit(handler) } + @Unroll + def 'should check whether job can be submitted' () { + given: + def session = Mock(Session) + def monitor = Spy(new TaskPollingMonitor(name: 'foo', session: session, capacity: CAPACITY, pollInterval: Duration.of('1min'))) + and: + def processor = Mock(TaskProcessor) { + getForksCount() >> new LongAdder() + getMaxForks() >> FORKS + } + def handler = Spy(TaskHandler) + handler.task = Mock(TaskRun) { + getProcessor() >> processor + } + def arrayHandler = Spy(TaskHandler) { + isReady() >> true + } + arrayHandler.task = Mock(TaskArrayRun) { + getArraySize() >> ARRAY + getProcessor() >> processor + } + + and: + SUBMIT.times { monitor.runningQueue.add(handler) } + + expect: + monitor.runningQueue.size() == SUBMIT + monitor.canSubmit(arrayHandler) == EXPECTED + where: + CAPACITY | SUBMIT | FORKS | ARRAY | EXPECTED + 0 | 0 | 0 | 2 | true + 0 | 0 | 1 | 2 | false + 10 | 5 | 0 | 5 | true + 10 | 8 | 0 | 5 | false + 10 | 0 | 1 | 5 | false + } + + def 'should throw error if job array size exceeds queue size' () { + given: + def session = Mock(Session) + def monitor = Spy(new TaskPollingMonitor(name: 'foo', session: session, capacity: 2, pollInterval: Duration.of('1min'))) + and: + def arrayHandler = Spy(TaskHandler) + arrayHandler.task = Mock(TaskArrayRun) { + getName() >> 'bar' + getArraySize() >> 4 + } + + when: + monitor.canSubmit(arrayHandler) + then: + def e = thrown(IllegalArgumentException) + e.message == 'Job array bar exceeds the queue size (array size: 4, queue size: 2)' + } + }