Skip to content

Set tolerance to zero when using Task.sleep #2225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ extension ClientRPCExecutor.HedgingExecutor {
if let deadline = self.deadline {
group.addTask {
let result = await Result {
try await Task.sleep(until: deadline, clock: .continuous)
try await Task.sleep(until: deadline, tolerance: .zero, clock: .continuous)
}
return .timedOut(result)
}
Expand Down Expand Up @@ -533,7 +533,7 @@ extension ClientRPCExecutor.HedgingExecutor {
self._isPushback = pushback
self._handle = group.addCancellableTask {
do {
try await Task.sleep(for: delay, clock: .continuous)
try await Task.sleep(for: delay, tolerance: .zero, clock: .continuous)
return .scheduledAttemptFired(.ran)
} catch {
return .scheduledAttemptFired(.cancelled)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func withDeadline<Result: Sendable>(
return await withTaskGroup(of: _DeadlineChildTaskResult<Result>.self) { group in
group.addTask {
do {
try await Task.sleep(until: deadline)
try await Task.sleep(until: deadline, tolerance: .zero)
return .deadlinePassed
} catch {
return .timeoutCancelled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ extension ClientRPCExecutor.RetryExecutor {
if let deadline = self.deadline {
group.addTask {
let result = await Result {
try await Task.sleep(until: deadline, clock: .continuous)
try await Task.sleep(until: deadline, tolerance: .zero, clock: .continuous)
}
return .timedOut(result)
}
Expand Down Expand Up @@ -155,11 +155,16 @@ extension ClientRPCExecutor.RetryExecutor {
// If the delay is overridden with server pushback then reset the iterator for the
// next retry.
delayIterator = delaySequence.makeIterator()
try? await Task.sleep(until: .now.advanced(by: delayOverride), clock: .continuous)
try? await Task.sleep(
until: .now.advanced(by: delayOverride),
tolerance: .zero,
clock: .continuous
)
} else {
// The delay iterator never terminates.
try? await Task.sleep(
until: .now.advanced(by: delayIterator.next()!),
tolerance: .zero,
clock: .continuous
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ struct ServerRPCExecutor {
await withTaskGroup(of: Void.self) { group in
group.addTask {
do {
try await Task.sleep(for: timeout, clock: .continuous)
try await Task.sleep(for: timeout, tolerance: .zero, clock: .continuous)
context.cancellation.cancel()
} catch {
() // Only cancel the RPC if the timeout completes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ extension ClientRPCExecutorTestHarness.ServerStreamHandler {

static func sleepFor(duration: Duration, then handler: Self) -> Self {
return Self { stream in
try await Task.sleep(until: .now.advanced(by: duration), clock: .continuous)
try await Task.sleep(until: .now.advanced(by: duration), tolerance: .zero, clock: .continuous)
try await handler.handle(stream: stream)
}
}
Expand Down
8 changes: 4 additions & 4 deletions Tests/GRPCCoreTests/GRPCClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ final class GRPCClientTests: XCTestCase {
transport: inProcess.client,
interceptorPipeline: interceptorPipeline
) { client in
try await Task.sleep(for: .milliseconds(100))
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
try await body(client, server)
}
}
Expand Down Expand Up @@ -341,7 +341,7 @@ final class GRPCClientTests: XCTestCase {
let task = Task {
try await client.clientStreaming(
request: StreamingClientRequest { writer in
try await Task.sleep(for: .seconds(5))
try await Task.sleep(for: .seconds(5), tolerance: .zero)
},
descriptor: BinaryEcho.Methods.collect,
serializer: IdentitySerializer(),
Expand Down Expand Up @@ -382,7 +382,7 @@ final class GRPCClientTests: XCTestCase {
// Run the client.
let task = Task { try await client.runConnections() }
// Make sure the client is run for the first time here.
try await Task.sleep(for: .milliseconds(10))
try await Task.sleep(for: .milliseconds(10), tolerance: .zero)

// Client is already running, should throw an error.
await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
Expand Down Expand Up @@ -545,7 +545,7 @@ struct ClientTests {
}

// Make sure both server and client are running
try await Task.sleep(for: .milliseconds(100))
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
try await body(client, server)
client.beginGracefulShutdown()
server.beginGracefulShutdown()
Expand Down
2 changes: 1 addition & 1 deletion Tests/GRPCCoreTests/Internal/Result+CatchingTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import XCTest
final class ResultCatchingTests: XCTestCase {
func testResultCatching() async {
let result = await Result {
try? await Task.sleep(nanoseconds: 1)
try? await Task.sleep(for: .nanoseconds(1), tolerance: .zero)
throw RPCError(code: .unknown, message: "foo")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ final class InProcessClientTransportTests: XCTestCase {
try await client.connect()
}
group.addTask {
try await Task.sleep(for: .milliseconds(100))
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
}

try await group.next()
Expand Down Expand Up @@ -97,7 +97,7 @@ final class InProcessClientTransportTests: XCTestCase {
try await client.connect()
}
group.addTask {
try await Task.sleep(for: .milliseconds(100))
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
}

try await group.next()
Expand All @@ -121,7 +121,7 @@ final class InProcessClientTransportTests: XCTestCase {
group.addTask {
// Add a sleep to make sure connection happens after `withStream` has been called,
// to test pending streams are handled correctly.
try await Task.sleep(for: .milliseconds(100))
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
try await client.connect()
}

Expand Down Expand Up @@ -171,7 +171,7 @@ final class InProcessClientTransportTests: XCTestCase {
}

group.addTask {
try await Task.sleep(for: .milliseconds(100))
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
client.beginGracefulShutdown()
}

Expand Down Expand Up @@ -252,18 +252,18 @@ final class InProcessClientTransportTests: XCTestCase {

group.addTask {
try await client.withStream(descriptor: .testTest, options: .defaults) { stream, _ in
try await Task.sleep(for: .milliseconds(100))
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
}
}

group.addTask {
try await client.withStream(descriptor: .testTest, options: .defaults) { stream, _ in
try await Task.sleep(for: .milliseconds(100))
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
}
}

group.addTask {
try await Task.sleep(for: .milliseconds(50))
try await Task.sleep(for: .milliseconds(50), tolerance: .zero)
client.beginGracefulShutdown()
}

Expand Down