Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine RequestBody delegate, source state to fix hang #666

Merged
merged 8 commits into from
Jan 29, 2025
108 changes: 89 additions & 19 deletions Sources/HummingbirdCore/Request/RequestBody.swift
Original file line number Diff line number Diff line change
Expand Up @@ -134,36 +134,112 @@
/// Delegate for NIOThrowingAsyncSequenceProducer
@usableFromInline
final class Delegate: NIOAsyncSequenceProducerDelegate, Sendable {
let checkedContinuations: NIOLockedValueBox<Deque<CheckedContinuation<Void, Never>>>
enum State {
case produceMore
case waitingForProduceMore(CheckedContinuation<Void, Never>?)
case multipleWaitingForProduceMore(Deque<CheckedContinuation<Void, Never>>)
case terminated
}
let state: NIOLockedValueBox<State>

@usableFromInline
init() {
self.checkedContinuations = .init([])
self.state = .init(.produceMore)
}

@usableFromInline
func produceMore() {
self.checkedContinuations.withLockedValue {
if let cont = $0.popFirst() {
cont.resume()
self.state.withLockedValue { state in
switch state {
case .produceMore:
break
case .waitingForProduceMore(let continuation):
if let continuation {
continuation.resume()
}
state = .produceMore

case .multipleWaitingForProduceMore(var continuations):
if let cont = continuations.popFirst() {
cont.resume()
}
if continuations.count > 0 {
state = .multipleWaitingForProduceMore(continuations)
} else {
state = .produceMore

Check warning on line 169 in Sources/HummingbirdCore/Request/RequestBody.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Request/RequestBody.swift#L163-L169

Added lines #L163 - L169 were not covered by tests
}
case .terminated:
preconditionFailure("Unexpected state")

Check warning on line 172 in Sources/HummingbirdCore/Request/RequestBody.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Request/RequestBody.swift#L172

Added line #L172 was not covered by tests
}
}
}

@usableFromInline
func didTerminate() {
self.checkedContinuations.withLockedValue {
while let cont = $0.popFirst() {
cont.resume()
self.state.withLockedValue { state in
switch state {
case .produceMore:
break
case .waitingForProduceMore(let continuation):
if let continuation {
continuation.resume()

Check warning on line 185 in Sources/HummingbirdCore/Request/RequestBody.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Request/RequestBody.swift#L185

Added line #L185 was not covered by tests
}
state = .terminated
case .multipleWaitingForProduceMore(var continuations):
while let cont = continuations.popFirst() {
cont.resume()
}
state = .terminated

Check warning on line 192 in Sources/HummingbirdCore/Request/RequestBody.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Request/RequestBody.swift#L189-L192

Added lines #L189 - L192 were not covered by tests
case .terminated:
preconditionFailure("Unexpected state")

Check warning on line 194 in Sources/HummingbirdCore/Request/RequestBody.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Request/RequestBody.swift#L194

Added line #L194 was not covered by tests
}
}
}

@usableFromInline
func waitForProduceMore() async {
await withCheckedContinuation { (cont: CheckedContinuation<Void, Never>) in
self.checkedContinuations.withLockedValue {
$0.append(cont)
switch self.state.withLockedValue({ $0 }) {
case .produceMore, .terminated:
adam-fowler marked this conversation as resolved.
Show resolved Hide resolved
break
case .waitingForProduceMore, .multipleWaitingForProduceMore:
await withCheckedContinuation { (newContinuation: CheckedContinuation<Void, Never>) in
self.state.withLockedValue { state in
switch state {
case .produceMore:
newContinuation.resume()

Check warning on line 209 in Sources/HummingbirdCore/Request/RequestBody.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Request/RequestBody.swift#L209

Added line #L209 was not covered by tests
case .waitingForProduceMore(let firstContinuation):
if let firstContinuation {
var continuations = Deque<CheckedContinuation<Void, Never>>()
continuations.reserveCapacity(2)
continuations.append(firstContinuation)
continuations.append(newContinuation)
state = .multipleWaitingForProduceMore(continuations)

Check warning on line 216 in Sources/HummingbirdCore/Request/RequestBody.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Request/RequestBody.swift#L212-L216

Added lines #L212 - L216 were not covered by tests
} else {
state = .waitingForProduceMore(newContinuation)
}
case .multipleWaitingForProduceMore(var continuations):
continuations.append(newContinuation)
state = .multipleWaitingForProduceMore(continuations)

Check warning on line 222 in Sources/HummingbirdCore/Request/RequestBody.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Request/RequestBody.swift#L221-L222

Added lines #L221 - L222 were not covered by tests
case .terminated:
newContinuation.resume()

Check warning on line 224 in Sources/HummingbirdCore/Request/RequestBody.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Request/RequestBody.swift#L224

Added line #L224 was not covered by tests
}
}
}
}
}

@usableFromInline
func stopProducing() {
self.state.withLockedValue { state in
switch state {
case .produceMore:
state = .waitingForProduceMore(nil)
case .waitingForProduceMore:
break

Check warning on line 238 in Sources/HummingbirdCore/Request/RequestBody.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Request/RequestBody.swift#L238

Added line #L238 was not covered by tests
case .multipleWaitingForProduceMore:
break

Check warning on line 240 in Sources/HummingbirdCore/Request/RequestBody.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Request/RequestBody.swift#L240

Added line #L240 was not covered by tests
case .terminated:
break

Check warning on line 242 in Sources/HummingbirdCore/Request/RequestBody.swift

View check run for this annotation

Codecov / codecov/patch

Sources/HummingbirdCore/Request/RequestBody.swift#L242

Added line #L242 was not covered by tests
}
}
}
Expand All @@ -175,14 +251,11 @@
let source: Producer.Source
@usableFromInline
let delegate: Delegate
@usableFromInline
let waitForProduceMore: NIOLockedValueBox<Bool>

@usableFromInline
init(source: Producer.Source, delegate: Delegate) {
self.source = source
self.delegate = delegate
self.waitForProduceMore = .init(false)
}

/// Yields the element to the inbound stream.
Expand All @@ -195,13 +268,10 @@
public func yield(_ element: ByteBuffer) async throws {
// if previous call indicated we should stop producing wait until the delegate
// says we can start producing again
if self.waitForProduceMore.withLockedValue({ $0 }) {
await self.delegate.waitForProduceMore()
self.waitForProduceMore.withLockedValue { $0 = false }
}
await self.delegate.waitForProduceMore()
let result = self.source.yield(element)
if result == .stopProducing {
self.waitForProduceMore.withLockedValue { $0 = true }
self.delegate.stopProducing()
}
}

Expand Down
Loading