Skip to content

Commit ec0d2c6

Browse files
authored
Reach target parallelism in .limitedParallelism even if dispatcher throws (#4330)
1 parent b9899b5 commit ec0d2c6

File tree

4 files changed

+116
-15
lines changed

4 files changed

+116
-15
lines changed

kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ public abstract class CoroutineDispatcher :
201201
*
202202
* This method should generally be exception-safe. An exception thrown from this method
203203
* may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.
204+
* It is assumed that if any exceptions do get thrown from this method, then [block] will not be executed.
204205
*
205206
* This method must not immediately call [block]. Doing so may result in `StackOverflowError`
206207
* when `dispatch` is invoked repeatedly, for example when [yield] is called in a loop.

kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,17 @@ internal class LimitedDispatcher(
6565
// `runningWorkers` when they observed an empty queue.
6666
if (!tryAllocateWorker()) return
6767
val task = obtainTaskOrDeallocateWorker() ?: return
68-
startWorker(Worker(task))
68+
try {
69+
startWorker(Worker(task))
70+
} catch (e: Throwable) {
71+
/* If we failed to start a worker, we should decrement the counter.
72+
The queue is in an inconsistent state--it's non-empty despite the target parallelism not having been
73+
reached--but at least a properly functioning worker will have a chance to correct this if some future
74+
dispatch does succeed.
75+
If we don't decrement the counter, it will be impossible to ever reach the target parallelism again. */
76+
runningWorkers.decrementAndGet()
77+
throw e
78+
}
6979
}
7080

7181
/**
@@ -107,21 +117,29 @@ internal class LimitedDispatcher(
107117
*/
108118
private inner class Worker(private var currentTask: Runnable) : Runnable {
109119
override fun run() {
110-
var fairnessCounter = 0
111-
while (true) {
112-
try {
113-
currentTask.run()
114-
} catch (e: Throwable) {
115-
handleCoroutineException(EmptyCoroutineContext, e)
120+
try {
121+
var fairnessCounter = 0
122+
while (true) {
123+
try {
124+
currentTask.run()
125+
} catch (e: Throwable) {
126+
handleCoroutineException(EmptyCoroutineContext, e)
127+
}
128+
currentTask = obtainTaskOrDeallocateWorker() ?: return
129+
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
130+
if (++fairnessCounter >= 16 && dispatcher.safeIsDispatchNeeded(this@LimitedDispatcher)) {
131+
// Do "yield" to let other views execute their runnable as well
132+
// Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
133+
dispatcher.safeDispatch(this@LimitedDispatcher, this)
134+
return
135+
}
116136
}
117-
currentTask = obtainTaskOrDeallocateWorker() ?: return
118-
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
119-
if (++fairnessCounter >= 16 && dispatcher.safeIsDispatchNeeded(this@LimitedDispatcher)) {
120-
// Do "yield" to let other views execute their runnable as well
121-
// Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
122-
dispatcher.safeDispatch(this@LimitedDispatcher, this)
123-
return
137+
} catch (e: Throwable) {
138+
// If the worker failed, we should deallocate its slot
139+
synchronized(workerAllocationLock) {
140+
runningWorkers.decrementAndGet()
124141
}
142+
throw e
125143
}
126144
}
127145
}
@@ -132,4 +150,4 @@ internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive pa
132150
internal fun CoroutineDispatcher.namedOrThis(name: String?): CoroutineDispatcher {
133151
if (name != null) return NamedDispatcher(this, name)
134152
return this
135-
}
153+
}

kotlinx-coroutines-core/common/test/LimitedParallelismSharedTest.kt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package kotlinx.coroutines
22

33
import kotlinx.coroutines.testing.*
4+
import kotlin.coroutines.CoroutineContext
5+
import kotlin.coroutines.EmptyCoroutineContext
46
import kotlin.test.*
57

68
class LimitedParallelismSharedTest : TestBase() {
@@ -28,4 +30,30 @@ class LimitedParallelismSharedTest : TestBase() {
2830
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) }
2931
Dispatchers.Default.limitedParallelism(Int.MAX_VALUE)
3032
}
33+
34+
/**
35+
* Checks that even if the dispatcher sporadically fails, the limited dispatcher will still allow reaching the
36+
* target parallelism level.
37+
*/
38+
@Test
39+
fun testLimitedParallelismOfOccasionallyFailingDispatcher() {
40+
val limit = 5
41+
var doFail = false
42+
val workerQueue = mutableListOf<Runnable>()
43+
val limited = object: CoroutineDispatcher() {
44+
override fun dispatch(context: CoroutineContext, block: Runnable) {
45+
if (doFail) throw TestException()
46+
workerQueue.add(block)
47+
}
48+
}.limitedParallelism(limit)
49+
repeat(6 * limit) {
50+
try {
51+
limited.dispatch(EmptyCoroutineContext, Runnable { /* do nothing */ })
52+
} catch (_: DispatchException) {
53+
// ignore
54+
}
55+
doFail = !doFail
56+
}
57+
assertEquals(limit, workerQueue.size)
58+
}
3159
}

kotlinx-coroutines-core/jvm/test/LimitedParallelismStressTest.kt

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import org.junit.runner.*
77
import org.junit.runners.*
88
import java.util.concurrent.*
99
import java.util.concurrent.atomic.*
10+
import kotlin.coroutines.CoroutineContext
11+
import kotlin.coroutines.EmptyCoroutineContext
1012
import kotlin.test.*
1113

1214
@RunWith(Parameterized::class)
@@ -84,6 +86,58 @@ class LimitedParallelismStressTest(private val targetParallelism: Int) : TestBas
8486
}
8587
}
8688

89+
/**
90+
* Checks that dispatcher failures during fairness redispatches don't prevent reaching the target parallelism.
91+
*/
92+
@Test
93+
fun testLimitedFailingDispatcherReachesTargetParallelism() = runTest {
94+
val keepFailing = AtomicBoolean(true)
95+
val occasionallyFailing = object: CoroutineDispatcher() {
96+
override fun dispatch(context: CoroutineContext, block: Runnable) {
97+
if (keepFailing.get() && ThreadLocalRandom.current().nextBoolean()) throw TestException()
98+
executor.dispatch(context, block)
99+
}
100+
}.limitedParallelism(targetParallelism)
101+
doStress {
102+
repeat(1000) {
103+
keepFailing.set(true) // we want the next tasks to sporadically fail
104+
// Start some tasks to make sure redispatching for fairness is happening
105+
repeat(targetParallelism * 16 + 1) {
106+
// targetParallelism * 16 + 1 because we need at least one worker to go through a fairness yield
107+
// with high probability.
108+
try {
109+
occasionallyFailing.dispatch(EmptyCoroutineContext, Runnable {
110+
// do nothing.
111+
})
112+
} catch (_: DispatchException) {
113+
// ignore
114+
}
115+
}
116+
keepFailing.set(false) // we want the next tasks to succeed
117+
val barrier = CyclicBarrier(targetParallelism + 1)
118+
repeat(targetParallelism) {
119+
launch(occasionallyFailing) {
120+
barrier.await()
121+
}
122+
}
123+
val success = launch(Dispatchers.Default) {
124+
// Successfully awaited parallelism + 1
125+
barrier.await()
126+
}
127+
// Feed the dispatcher with more tasks to make sure it's not stuck
128+
while (success.isActive) {
129+
Thread.sleep(1)
130+
repeat(targetParallelism) {
131+
occasionallyFailing.dispatch(EmptyCoroutineContext, Runnable {
132+
// do nothing.
133+
})
134+
}
135+
}
136+
coroutineContext.job.children.toList().joinAll()
137+
}
138+
}
139+
}
140+
87141
private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) {
88142
repeat(stressTestMultiplier) {
89143
coroutineScope {

0 commit comments

Comments
 (0)