Skip to content

Solve queueSize exceeded when using job arrays #6047

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ParallelPollingMonitor extends TaskPollingMonitor {

@Override
protected boolean canSubmit(TaskHandler handler) {
return super.canSubmit(handler) && (semaphore == null || semaphore.tryAcquire())
return super.canSubmit(handler) && (semaphore == null || semaphore.tryAcquire(handler.getForksCount()))
}

protected RateLimiter createSubmitRateLimit() {
Expand Down Expand Up @@ -95,7 +95,7 @@ class ParallelPollingMonitor extends TaskPollingMonitor {

@Override
boolean evict(TaskHandler handler) {
semaphore?.release()
semaphore?.release(handler.getForksCount())
return super.evict(handler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,13 @@ abstract class TaskHandler {
return record
}

/**
* Determine the number of forks consumed by the task handler.
*/
int getForksCount() {
return task instanceof TaskArrayRun ? task.getArraySize() : 1
}

/**
* Determine if a process can be forked i.e. can launch
* a parallel task execution. This is only enforced when
Expand All @@ -266,7 +273,7 @@ abstract class TaskHandler {
*/
boolean canForkProcess() {
final max = task.processor.maxForks
return !max ? true : task.processor.forksCount < max
return !max ? true : task.processor.forksCount + getForksCount() <= max
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not be task.processor.forksCount * getForksCount(), I mean multiplied?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why would it be multiplied?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I was totally confused by the getForksCount naming. This new attribute should be named differently because it overlaps with processor.forksCount that has a complete different meaning. Something like getRunsCount and getArraySize would be much better

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how are they different in meaning?

}

/**
Expand All @@ -283,14 +290,18 @@ abstract class TaskHandler {
* Increment the number of current forked processes
*/
final void incProcessForks() {
task.processor.forksCount?.increment()
def count = task.processor.forksCount
if( count != null )
count.add(getForksCount())
}

/**
* Decrement the number of current forked processes
*/
final void decProcessForks() {
task.processor.forksCount?.decrement()
def count = task.processor.forksCount
if( count != null )
count.add(-getForksCount())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,12 @@ class TaskPollingMonitor implements TaskMonitor {
* by the polling monitor
*/
protected boolean canSubmit(TaskHandler handler) {
(capacity>0 ? runningQueue.size() < capacity : true) && handler.canForkProcess() && handler.isReady()
int slots = handler.getForksCount()
if( capacity > 0 && slots > capacity )
throw new IllegalArgumentException("Job array ${handler.task.name} exceeds the queue size (array size: $slots, queue size: $capacity)")
if( capacity > 0 && runningQueue.size() + slots > capacity )
return false
return handler.canForkProcess() && handler.isReady()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class ParallelPollingMonitorTest extends Specification {
def retry = new AtomicInteger()

def session = Mock(Session)
def handler = Mock(TaskHandler)
def handler = Spy(TaskHandler)

def opts = new ThrottlingExecutor.Options()
.retryOn(IllegalArgumentException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

package nextflow.processor

import java.util.concurrent.atomic.LongAdder

import nextflow.Session
import nextflow.util.Duration
import nextflow.util.RateUnit
import nextflow.util.ThrottlingExecutor
import spock.lang.Specification
import spock.lang.Unroll
/**
*
* @author Paolo Di Tommaso <[email protected]>
Expand Down Expand Up @@ -148,4 +151,59 @@ class TaskPollingMonitorTest extends Specification {
3 * session.notifyTaskSubmit(handler)
}

@Unroll
def 'should check whether job can be submitted' () {
given:
def session = Mock(Session)
def monitor = Spy(new TaskPollingMonitor(name: 'foo', session: session, capacity: CAPACITY, pollInterval: Duration.of('1min')))
and:
def processor = Mock(TaskProcessor) {
getForksCount() >> new LongAdder()
getMaxForks() >> FORKS
}
def handler = Spy(TaskHandler)
handler.task = Mock(TaskRun) {
getProcessor() >> processor
}
def arrayHandler = Spy(TaskHandler) {
isReady() >> true
}
arrayHandler.task = Mock(TaskArrayRun) {
getArraySize() >> ARRAY
getProcessor() >> processor
}

and:
SUBMIT.times { monitor.runningQueue.add(handler) }

expect:
monitor.runningQueue.size() == SUBMIT
monitor.canSubmit(arrayHandler) == EXPECTED
where:
CAPACITY | SUBMIT | FORKS | ARRAY | EXPECTED
0 | 0 | 0 | 2 | true
0 | 0 | 1 | 2 | false
10 | 5 | 0 | 5 | true
10 | 8 | 0 | 5 | false
10 | 0 | 1 | 5 | false
}

def 'should throw error if job array size exceeds queue size' () {
given:
def session = Mock(Session)
def monitor = Spy(new TaskPollingMonitor(name: 'foo', session: session, capacity: 2, pollInterval: Duration.of('1min')))
and:
def arrayHandler = Spy(TaskHandler)
arrayHandler.task = Mock(TaskArrayRun) {
getName() >> 'bar'
getArraySize() >> 4
}

when:
monitor.canSubmit(arrayHandler)
then:
def e = thrown(IllegalArgumentException)
e.message == 'Job array bar exceeds the queue size (array size: 4, queue size: 2)'
}

}