Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine RequestBody delegate, source state to fix hang #666

Merged
merged 8 commits into from
Jan 29, 2025
78 changes: 60 additions & 18 deletions Sources/HummingbirdCore/Request/RequestBody.swift
Original file line number Diff line number Diff line change
Expand Up @@ -134,36 +134,84 @@ extension RequestBody {
/// Delegate for NIOThrowingAsyncSequenceProducer
@usableFromInline
final class Delegate: NIOAsyncSequenceProducerDelegate, Sendable {
let checkedContinuations: NIOLockedValueBox<Deque<CheckedContinuation<Void, Never>>>
enum State {
case produceMore
case waitingForProduceMore(Deque<CheckedContinuation<Void, Never>>)
case terminated
}
let state: NIOLockedValueBox<State>

@usableFromInline
init() {
self.checkedContinuations = .init([])
self.state = .init(.produceMore)
}

@usableFromInline
func produceMore() {
self.checkedContinuations.withLockedValue {
if let cont = $0.popFirst() {
cont.resume()

self.state.withLockedValue { state in
switch state {
case .produceMore:
break
case .waitingForProduceMore(var continuations):
if let cont = continuations.popFirst() {
cont.resume()
}
if continuations.count > 0 {
state = .waitingForProduceMore(continuations)
} else {
state = .produceMore
}
adam-fowler marked this conversation as resolved.
Show resolved Hide resolved
case .terminated:
preconditionFailure("Unexpected state")
}
}
}

@usableFromInline
func didTerminate() {
self.checkedContinuations.withLockedValue {
while let cont = $0.popFirst() {
cont.resume()
self.state.withLockedValue { state in
switch state {
case .produceMore:
break
case .waitingForProduceMore(var continuations):
while let cont = continuations.popFirst() {
cont.resume()
}
state = .terminated
case .terminated:
preconditionFailure("Unexpected state")
}
}
}

@usableFromInline
func waitForProduceMore() async {
await withCheckedContinuation { (cont: CheckedContinuation<Void, Never>) in
self.checkedContinuations.withLockedValue {
$0.append(cont)
self.state.withLockedValue { state in
switch state {
case .produceMore:
cont.resume()
case .waitingForProduceMore(var continuations):
continuations.append(cont)
state = .waitingForProduceMore(continuations)
case .terminated:
cont.resume()
}
}
}
}

@usableFromInline
func stopProducing() {
self.state.withLockedValue { state in
switch state {
case .produceMore:
state = .waitingForProduceMore([])
case .waitingForProduceMore:
break
case .terminated:
break
}
}
}
Expand All @@ -175,14 +223,11 @@ extension RequestBody {
let source: Producer.Source
@usableFromInline
let delegate: Delegate
@usableFromInline
let waitForProduceMore: NIOLockedValueBox<Bool>

@usableFromInline
init(source: Producer.Source, delegate: Delegate) {
self.source = source
self.delegate = delegate
self.waitForProduceMore = .init(false)
}

/// Yields the element to the inbound stream.
Expand All @@ -195,13 +240,10 @@ extension RequestBody {
public func yield(_ element: ByteBuffer) async throws {
// if previous call indicated we should stop producing wait until the delegate
// says we can start producing again
if self.waitForProduceMore.withLockedValue({ $0 }) {
await self.delegate.waitForProduceMore()
self.waitForProduceMore.withLockedValue { $0 = false }
}
await self.delegate.waitForProduceMore()
let result = self.source.yield(element)
if result == .stopProducing {
self.waitForProduceMore.withLockedValue { $0 = true }
self.delegate.stopProducing()
}
}

Expand Down
Loading