We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent d63baef commit 6ff59caCopy full SHA for 6ff59ca
commons-core/jvm/src/main/scala/com/avsystem/commons/concurrent/ObservableBlockingIterator.scala
@@ -67,8 +67,10 @@ class ObservableBlockingIterator[T](
67
// after the queue got full, wait until at least half of its capacity is free before letting
68
// the Observable produce more elements
69
if (promise != null && queue.remainingCapacity >= bufferSize / 2) {
70
- promise.success(Ack.Continue)
+ // unsetting promise must happen before completing the promise or otherwise we risk throwing away
71
+ // some other promise set by `onNext` before it could be completed
72
ackPromise = null
73
+ promise.success(Ack.Continue)
74
}
75
last
76
case nonEmpty =>
0 commit comments