Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Sources/DispatchAsync/DispatchGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ extension DispatchAsync {
}
}

func wait() async {
public func wait() async {
await withCheckedContinuation { continuation in
queue.enqueue { [weak self] in
guard let self else { return }
Expand Down
99 changes: 99 additions & 0 deletions Tests/DispatchAsyncTests/AsyncSemaphoreTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2025 PassiveLogic, Inc.
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of Swift.org project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Testing

@testable import DispatchAsync

nonisolated(unsafe) private var sharedPoolCompletionCount = 0

@Suite("AsyncSemaphore Tests")
class AsyncSemaphoreTests {
@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *)
@Test(.timeLimit(.minutes(1)))
func asyncSemaphoreWaitSignal() async throws {
let semaphore = AsyncSemaphore(value: 1)

// First wait should succeed immediately and bring the count to 0
await semaphore.wait()

// Launch a task that tries to wait – it should be suspended until we signal
nonisolated(unsafe) var didEnterCriticalSection = false
await withCheckedContinuation { continuation in
Task { @Sendable in
// Ensure the rest of this test doesn't
// proceed until the Task block has started executing
continuation.resume()

await semaphore.wait()
didEnterCriticalSection = true
await semaphore.signal()
}
}

// Allow the task a few cycles to reach the initial semaphore.wait()
try? await Task.sleep(nanoseconds: 1_000)

#expect(!didEnterCriticalSection) // should still be waiting

// Now release the semaphore – the waiter should proceed
await semaphore.signal()

// Wait for second signal to fire from inside the task above
// There is a timeout on this test, so if there is a problem
// we'll either hit the timeout and fail, or didEnterCriticalSection
// will be false below
await semaphore.wait()

#expect(didEnterCriticalSection) // waiter must have run
}

@Test func basicAsyncSemaphoreTest() async throws {
sharedPoolCompletionCount = 0 // Reset to 0 for each test run
let totalConcurrentPools = 10

let semaphore = AsyncSemaphore(value: 1)

await withTaskGroup(of: Void.self) { group in
for _ in 0 ..< totalConcurrentPools {
group.addTask {
// Wait for any other pools currently holding the semaphore
await semaphore.wait()

// Only one task should mutate counter at a time
//
// If there are issues with the semaphore, then
// we would expect to grab incorrect values here occasionally,
// which would result in an incorrect final completion count.
//
let existingPoolCompletionCount = sharedPoolCompletionCount

// Add artificial delay to amplify race conditions
// Pools started shortly after this "semaphore-locked"
// pool starts will run before this line, unless
// this pool contains a valid lock.
try? await Task.sleep(nanoseconds: 100)

sharedPoolCompletionCount = existingPoolCompletionCount + 1

// When we exit this flow, release our hold on the semaphore
await semaphore.signal()
}
}
}

// After all tasks are done, counter should be 10
#expect(sharedPoolCompletionCount == totalConcurrentPools)
}
}
257 changes: 168 additions & 89 deletions Tests/DispatchAsyncTests/DispatchGroupTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,109 +15,188 @@
@_spi(DispatchAsync) import DispatchAsync
import Testing

import func Foundation.sin

#if !os(WASI)
import class Foundation.Thread
#endif

private typealias DispatchGroup = DispatchAsync.DispatchGroup
private typealias DispatchQueue = DispatchAsync.DispatchQueue

@Test(arguments: [100])
func dispatchGroupOrderCleanliness(repetitions: Int) async throws {
// Repeating this `repetitions` number of times to help rule out
// edge cases that only show up some of the time
for index in 0 ..< repetitions {
Task {
actor Result {
private(set) var value = ""

func append(value: String) {
self.value.append(value)
@Suite("DispatchGroup Tests")
struct DispatchGroupTests {
@Test(arguments: [1000])
@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *)
func dispatchGroupOrderCleanliness(repetitions: Int) async throws {
// Repeating this `repetitions` number of times to help rule out
// edge cases that only show up some of the time
for index in 0 ..< repetitions {
Task {
actor Result {
private(set) var value = ""

func append(value: String) {
self.value.append(value)
}
}
}

let result = Result()
let result = Result()

let group = DispatchGroup()
await result.append(value: "|🔵\(index)")
let group = DispatchGroup()
await result.append(value: "|🔵\(iteration)")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, interesting. Compiler error caused by this change. Something in the rebase and merged didn't flag this as a conflict, and interestingly enough the CI didn't run on this PR after re-targeting the stacked PR to main. Will fix soon, and will check that a normal PR runs the expected checks.


group.enter()
Task {
await result.append(value: "🟣/")
group.leave()
}
group.enter()
Task {
await result.append(value: "🟣/")
group.leave()
}

group.enter()
Task {
await result.append(value: "🟣^")
group.leave()
}
group.enter()
Task {
await result.append(value: "🟣^")
group.leave()
}

group.enter()
Task {
await result.append(value: "🟣\\")
group.leave()
group.enter()
Task {
await result.append(value: "🟣\\")
group.leave()
}

await withCheckedContinuation { continuation in
group.notify(queue: .main) {
Task {
await result.append(value: "🟢\(iteration)=")
continuation.resume()
}
}
}

let finalValue = await result.value

/// NOTE: If you need to visually debug issues, you can uncomment
/// the following to watch a visual representation of the group ordering.
///
/// In general, you'll see something like the following printed over and over
/// to the console:
///
/// ```
/// |🔵42🟣/🟣^🟣\🟢42=
/// ```
///
/// What you should observe:
///
/// - The index number be the same at the beginning and end of each line, and it
/// should always increment by one.
/// - The 🔵 should always be first, and the 🟢 should always be last for each line.
/// - There should always be 3 🟣's in between the 🔵 and 🟢.
/// - The ordering of the 🟣 can be random, and that is fine.
///
/// For example, for of the following are valid outputs:
///
/// ```
/// // GOOD
/// |🔵42🟣/🟣^🟣\🟢42=
/// ```
///
/// ```
/// // GOOD
/// |🔵42🟣/🟣\🟣^🟢42=
/// ```
///
/// But the following would not be valid:
///
/// ```
/// // BAD! (43 comes before 42)
/// |🔵43🟣/🟣^🟣\🟢43=
/// |🔵42🟣/🟣^🟣\🟢42=
/// |🔵44🟣/🟣^🟣\🟢44=
/// ```
///
/// ```
/// // BAD! (green globe comes before a purle one)
/// |🔵42🟣/🟣^🟢42🟣\=
/// ```
///

// NOTE: Uncomment to use troubleshooting method above:
// print(finalValue)

#expect(finalValue.prefix(1) == "|")
#expect(finalValue.count { $0 == "🟣" } == 3)
#expect(finalValue.count { $0 == "🟢" } == 1)
#expect(finalValue.lastIndex(of: "🟣")! < finalValue.firstIndex(of: "🟢")!)
#expect(finalValue.suffix(1) == "=")
}
}
}

/// Swift port of libdispatch/tests/dispatch_group.c
///
/// See https://github.com/swiftlang/swift-corelibs-libdispatch/blob/686475721aca13d98d2eab3a0c439403d33b6e2d/tests/dispatch_group.c
///
/// The original C test stresses `dispatch_group_wait` by enqueuing a bunch of
/// math-heavy blocks on a global queue, then waiting for them to finish with a
/// timeout. It also verifies that `notify` is invoked exactly once.
@Test(.timeLimit(.minutes(1)))
@available(macOS 13, iOS 16, tvOS 16, watchOS 9, *)
func dispatchGroupStress() async throws {
let iterations = 1000
// We use a separate concurrent queue rather than the global queue to avoid interference issues
// with other tests running in parallel
let workQueue = DispatchQueue(attributes: .concurrent)
let group = DispatchGroup()

await withCheckedContinuation { continuation in
group.notify(queue: .main) {
Task {
await result.append(value: "🟢\(index)=")
continuation.resume()
let isolationQueue = DispatchQueue(label: "isolationQueue")
nonisolated(unsafe) var counter = 0

for _ in 0 ..< iterations {
group.enter()
workQueue.async {
// We alternate between two options for workload. One is a simple
// math function, the other is a thread sleep.
//
// Alternating between those two approaches provides variance to
// increases failure chances if there are race conditions subject to timing
// and load.
if Bool.random() {
#if !os(WASI)
Thread.sleep(forTimeInterval: 0.00001) // 10_000 nanoseconds
#endif
} else {
// A small math workload similar to the original C test which used
// sin(random()). We iterate a couple thousand times to keep the CPU
// busy long enough for the group scheduling to matter.
var x = Double.random(in: 0.0 ... Double.pi)
for _ in 0 ..< 2_000 {
x = sin(x)
}
}

isolationQueue.async {
counter += 1
group.leave()
}
}
}

// NOTE: The test has a 1 minute time limit that will time out. In
// the original code, this timeout was 5 seconds, but currently
// the shortest timeout Swift Testing provides is 1 minute.
await group.wait()

let finalValue = await result.value

/// NOTE: If you need to visually debug issues, you can uncomment
/// the following to watch a visual representation of the group ordering.
///
/// In general, you'll see something like the following printed over and over
/// to the console:
///
/// ```
/// |🔵42🟣/🟣^🟣\🟢42=
/// ```
///
/// What you should observe:
///
/// - The index number be the same at the beginning and end of each line, and it
/// should always increment by one.
/// - The 🔵 should always be first, and the 🟢 should always be last for each line.
/// - There should always be 3 🟣's in between the 🔵 and 🟢.
/// - The ordering of the 🟣 can be random, and that is fine.
///
/// For example, for of the following are valid outputs:
///
/// ```
/// // GOOD
/// |🔵42🟣/🟣^🟣\🟢42=
/// ```
///
/// ```
/// // GOOD
/// |🔵42🟣/🟣\🟣^🟢42=
/// ```
///
/// But the following would not be valid:
///
/// ```
/// // BAD!
/// |🔵43🟣/🟣^🟣\🟢43=
/// |🔵42🟣/🟣^🟣\🟢42=
/// |🔵44🟣/🟣^🟣\🟢44=
/// ```
///
/// ```
/// // BAD!
/// |🔵42🟣/🟣^🟢42🟣\=
/// ```
///

// Uncomment to use troubleshooting method above:
// print(finalValue)

#expect(finalValue.prefix(1) == "|")
#expect(finalValue.count { $0 == "🟣" } == 3)
#expect(finalValue.count { $0 == "🟢" } == 1)
#expect(finalValue.lastIndex(of: "🟣")! < finalValue.firstIndex(of: "🟢")!)
#expect(finalValue.suffix(1) == "=")
// Verify notify fires exactly once.
nonisolated(unsafe) var notifyHits = 0
await withCheckedContinuation { k in
group.notify(queue: .main) {
notifyHits += 1
k.resume()
}
}
#expect(notifyHits == 1)

let finalCount = counter
#expect(finalCount == iterations)
}
}
Loading