diff --git a/Package.swift b/Package.swift index c6a2894c..62595603 100644 --- a/Package.swift +++ b/Package.swift @@ -6,9 +6,13 @@ import CompilerPluginSupport let AsyncAlgorithms_v1_0 = "AvailabilityMacro=AsyncAlgorithms 1.0:macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0" #if compiler(>=6.0) && swift(>=6.0) // 5.10 doesnt support visionOS availability let AsyncAlgorithms_v1_1 = - "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0" + "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 11.0, iOS 14.0, tvOS 14.0, watchOS 7.0, visionOS 1.0" +let AsyncAlgorithms_v1_2 = + "AvailabilityMacro=AsyncAlgorithms 1.2:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0" #else -let AsyncAlgorithms_v1_1 = "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0" +let AsyncAlgorithms_v1_1 = "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 11.0, iOS 14.0, tvOS 14.0, watchOS 7.0" +let AsyncAlgorithms_v1_2 = + "AvailabilityMacro=AsyncAlgorithms 1.2:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0" #endif let availabilityMacros: [SwiftSetting] = [ @@ -18,6 +22,9 @@ let availabilityMacros: [SwiftSetting] = [ .enableExperimentalFeature( AsyncAlgorithms_v1_1 ), + .enableExperimentalFeature( + AsyncAlgorithms_v1_2 + ) ] let package = Package( diff --git a/Package@swift-5.8.swift b/Package@swift-5.8.swift index 8161e475..e86de6c2 100644 --- a/Package@swift-5.8.swift +++ b/Package@swift-5.8.swift @@ -10,7 +10,10 @@ let availabilityMacros: [SwiftSetting] = [ "AvailabilityMacro=AsyncAlgorithms 1.0:macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0" ), .enableExperimentalFeature( - "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0" + "AvailabilityMacro=AsyncAlgorithms 1.1:macOS 11.0, iOS 14.0, tvOS 14.0, watchOS 7.0, visionOS 1.0" + ), + .enableExperimentalFeature( + "AvailabilityMacro=AsyncAlgorithms 1.2:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0" ), ] diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index ebe3492d..55277668 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -14,7 +14,7 @@ import Synchronization import DequeModule -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.2, *) extension AsyncSequence where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype { /// Creates a shared async sequence that allows multiple concurrent iterations over a single source. @@ -114,7 +114,7 @@ where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype // // This type is typically not used directly; instead, use the `share()` method on any // async sequence that meets the sendability requirements. -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.2, *) struct AsyncShareSequence: Sendable where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: SendableMetatype { // Represents a single consumer's connection to the shared sequence. @@ -697,7 +697,7 @@ where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: Sendab } } -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.2, *) extension AsyncShareSequence: AsyncSequence { typealias Element = Base.Element typealias Failure = Base.Failure diff --git a/Sources/AsyncAlgorithms/LegacyAsyncShareSequence.swift b/Sources/AsyncAlgorithms/LegacyAsyncShareSequence.swift new file mode 100644 index 00000000..fb8d1152 --- /dev/null +++ b/Sources/AsyncAlgorithms/LegacyAsyncShareSequence.swift @@ -0,0 +1,740 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +import Synchronization +import DequeModule + +@available(AsyncAlgorithms 1.1, *) +extension AsyncSequence +where Element: Sendable, Self: _SendableMetatype, AsyncIterator: _SendableMetatype { + /// A backported version of the `share` operator creating a shared async sequence + /// that allows multiple concurrent iterations over a single source. + /// + /// The `legacyShare` method transforms an async sequence into a shareable sequence that can be + /// safely iterated by multiple concurrent tasks. This is useful when you want to broadcast elements from + /// a single source to multiple consumers without duplicating work or creating separate iterations. + /// + /// Each element from the source sequence is delivered to all active iterators. + /// Elements are buffered according to the specified buffering policy to handle timing differences + /// between consumers. + /// + /// The base sequence is iterated in it's own task to ensure that cancellation is not polluted from + /// one side of iteration to another. + /// + /// > Note: the `.share` operator should be always used instead of this one whenever possible. + /// This backport is available when targeting "legacy" operating systems where such operator is not available. + /// + /// ## Example Usage + /// + /// ```swift + /// let numbers = [1, 2, 3, 4, 5].async.map { + /// try? await Task.sleep(for: .seconds(1)) + /// return $0 + /// } + /// + /// let shared = numbers.legacyShare() + /// + /// // Multiple tasks can iterate concurrently + /// let consumer1 = Task { + /// for await value in shared { + /// print("Consumer 1: \(value)") + /// } + /// } + /// + /// let consumer2 = Task { + /// for await value in shared { + /// print("Consumer 2: \(value)") + /// } + /// } + /// + /// await consumer1.value + /// await consumer2.value + /// ``` + /// + /// - Parameter bufferingPolicy: The policy controlling how elements are enqueued to the shared buffer. Defaults to `.bounded(1)`. + /// - `.bounded(n)`: Limits the buffer to `n` elements, applying backpressure to the source when that limit is reached + /// - `.bufferingOldest(n)`: Keeps the oldest `n` elements, discarding newer ones when full + /// - `.bufferingNewest(n)`: Keeps the newest `n` elements, discarding older ones when full + /// - `.unbounded`: Allows unlimited buffering (use with caution) + /// + /// - Returns: A sendable async sequence that can be safely shared across multiple concurrent tasks. + /// + /// + public func legacyShare( + bufferingPolicy: AsyncBufferSequencePolicy = .bounded(1) + ) -> LegacyAsyncShareSequence { + + // The iterator is transferred to the isolation of the iterating task + // this has to be done "unsafely" since we cannot annotate the transfer + // however since iterating an AsyncSequence types twice has been defined + // as invalid and one creation of the iterator is virtually a consuming + // operation so this is safe at runtime. + // The general principal of `.share()` is to provide a mecahnism for non- + // shared AsyncSequence types to be shared. The parlance for those is + // that the base AsyncSequence type is not Sendable. If the iterator + // is not marked as `nonisolated(unsafe)` the compiler will claim that + // the value is "Capture of 'iterator' with non-Sendable type 'Self.AsyncIterator' in a '@Sendable' closure;" + // Since the closure returns a disconnected non-sendable value there is no + // distinct problem here and the compiler just needs to be informed + // that the diagnostic is overly pessimistic. + nonisolated(unsafe) let iterator = makeAsyncIterator() + return LegacyAsyncShareSequence( + { + iterator + }, + bufferingPolicy: bufferingPolicy + ) + } +} + +// An async sequence that enables safe concurrent sharing of a single source sequence. +// +// `AsyncShareSequence` wraps a base async sequence and allows multiple concurrent iterators +// to consume elements from the same source. It handles all the complexity of coordinating +// between multiple consumers, buffering elements, and managing the lifecycle of the underlying +// iteration. +// +// ## Key Features +// +// **Single Source Iteration**: The base sequence's iterator is created and consumed only once +// **Concurrent Safe**: Multiple tasks can safely iterate simultaneously +// **Configurable Buffering**: Supports various buffering strategies for different use cases +// **Automatic Cleanup**: Properly manages resources and cancellation across all consumers +// +// ## Internal Architecture +// +// The implementation uses several key components: +// `Side`: Represents a single consumer's iteration state +// `Iteration`: Coordinates all consumers and manages the shared buffer +// `Extent`: Manages the overall lifecycle and cleanup +// +// This type is typically not used directly; instead, use the `share()` method on any +// async sequence that meets the sendability requirements. +@available(AsyncAlgorithms 1.1, *) +public struct LegacyAsyncShareSequence: Sendable +where Base.Element: Sendable, Base: _SendableMetatype, Base.AsyncIterator: _SendableMetatype { + // Represents a single consumer's connection to the shared sequence. + // + // Each iterator of the shared sequence creates its own `Side` instance, which tracks + // that consumer's position in the shared buffer and manages its continuation for + // async iteration. The `Side` automatically registers itself with the central + // `Iteration` coordinator and cleans up when deallocated. + // + // ## Lifecycle + // + // **Creation**: Automatically registers with the iteration coordinator + // **Usage**: Tracks buffer position and manages async continuations + // **Cleanup**: Automatically unregisters and cancels pending operations on deinit + final class Side { + // Due to a runtime crash in 1.0 compatible versions, it's not possible to handle + // a generic failure constrained to Base.Failure. We handle inner failure with a `any Error` + // and force unwrap it to the generic 1.2 generic type on the outside Iterator. + typealias Failure = any Error + // Tracks the state of a single consumer's iteration. + // + // - `continuation`: The continuation waiting for the next element (nil if not waiting) + // - `position`: The consumer's current position in the shared buffer + struct State { + var continuation: UnsafeContinuation, Never>? + var position = 0 + + // Creates a new state with the position adjusted by the given offset. + // + // This is used when the shared buffer is trimmed to maintain correct + // relative positioning for this consumer. + // + // - Parameter adjustment: The number of positions to subtract from the current position + // - Returns: A new `State` with the adjusted position + func offset(_ adjustment: Int) -> State { + State(continuation: continuation, position: position - adjustment) + } + } + + let iteration: Iteration + let id: Int + + init(_ iteration: Iteration) { + self.iteration = iteration + id = iteration.registerSide() + } + + deinit { + iteration.unregisterSide(id) + } + + func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Base.Element? { + try await iteration.next(isolation: actor, id: id) + } + } + + // The central coordinator that manages the shared iteration state. + // + // `Iteration` is responsible for: + // Managing the single background task that consumes the source sequence + // Coordinating between multiple consumer sides + // Buffering elements according to the specified policy + // Handling backpressure and flow control + // Managing cancellation and cleanup + // + // ## Thread Safety + // + // All operations are synchronized using a `Mutex` to ensure thread-safe access + // to the shared state across multiple concurrent consumers. + final class Iteration: Sendable { + typealias Failure = Side.Failure + // Represents the state of the background task that consumes the source sequence. + // + // The iteration task goes through several states during its lifecycle: + // `pending`: Initial state, holds the factory to create the iterator + // `starting`: Transitional state while the task is being created + // `running`: Active state with a running background task + // `cancelled`: Terminal state when the iteration has been cancelled + enum IteratingTask { + case pending(@Sendable () -> sending Base.AsyncIterator) + case starting + case running(Task) + case cancelled + + var isStarting: Bool { + switch self { + case .starting: true + default: false + } + } + + func cancel() { + switch self { + case .running(let task): + task.cancel() + default: + break + } + } + } + // The complete shared state for coordinating all aspects of the shared iteration. + // + // This state is protected by a mutex and contains all the information needed + // to coordinate between multiple consumers, manage buffering, and control + // the background iteration task. + struct State: Sendable { + // Defines how elements are stored and potentially discarded in the shared buffer. + // + // `unbounded`: Store all elements without limit (may cause memory growth) + // `bufferingOldest(Int)`: Keep only the oldest N elements, ignore newer ones when full + // `bufferingNewest(Int)`: Keep only the newest N elements, discard older ones when full + enum StoragePolicy: Sendable { + case unbounded + case bufferingOldest(Int) + case bufferingNewest(Int) + } + + var generation = 0 + var sides = [Int: Side.State]() + var iteratingTask: IteratingTask + private(set) var buffer = Deque() + private(set) var finished = false + private(set) var failure: Failure? + var cancelled = false + var limit: UnsafeContinuation? + var demand: UnsafeContinuation? + + let storagePolicy: StoragePolicy + + init( + _ iteratorFactory: @escaping @Sendable () -> sending Base.AsyncIterator, + bufferingPolicy: AsyncBufferSequencePolicy + ) { + self.iteratingTask = .pending(iteratorFactory) + switch bufferingPolicy.policy { + case .bounded: self.storagePolicy = .unbounded + case .bufferingOldest(let bound): self.storagePolicy = .bufferingOldest(bound) + case .bufferingNewest(let bound): self.storagePolicy = .bufferingNewest(bound) + case .unbounded: self.storagePolicy = .unbounded + } + } + + // Removes elements from the front of the buffer that all consumers have already processed. + // + // This method finds the minimum position across all active consumers and removes + // that many elements from the front of the buffer. It then adjusts all consumer + // positions to account for the removed elements, maintaining their relative positions. + // + // This optimization prevents the buffer from growing indefinitely when all consumers + // are keeping pace with each other. + mutating func trimBuffer() { + if let minimumIndex = sides.values.map({ $0.position }).min(), minimumIndex > 0 { + buffer.removeFirst(minimumIndex) + sides = sides.mapValues { + $0.offset(minimumIndex) + } + } + } + + // Private state machine transitions for the emission of a given value. + // + // This method ensures the continuations are properly consumed when emitting values + // and returns those continuations for resumption. + private mutating func _emit( + _ value: T, + limit: Int + ) -> (T, UnsafeContinuation?, UnsafeContinuation?, Bool) { + let belowLimit = buffer.count < limit || limit == 0 + defer { + if belowLimit { + self.limit = nil + } + demand = nil + } + guard case .cancelled = iteratingTask else { + return (value, belowLimit ? self.limit : nil, demand, false) + } + return (value, belowLimit ? self.limit : nil, demand, true) + } + + // Internal state machine transitions for the emission of a given value. + // + // This method ensures the continuations are properly consumed when emitting values + // and returns those continuations for resumption. + // + // If no limit is specified it interprets that as an unbounded limit. + mutating func emit( + _ value: T, + limit: Int? + ) -> (T, UnsafeContinuation?, UnsafeContinuation?, Bool) { + return _emit(value, limit: limit ?? .max) + } + + // Adds an element to the buffer according to the configured storage policy. + // + // The behavior depends on the storage policy: + // **Unbounded**: Always appends the element + // **Buffering Oldest**: Appends only if under the limit, otherwise ignores the element + // **Buffering Newest**: Appends if under the limit, otherwise removes the oldest and appends + // + // - Parameter element: The element to add to the buffer + mutating func enqueue(_ element: Base.Element) { + let count = buffer.count + + switch storagePolicy { + case .unbounded: + buffer.append(element) + case .bufferingOldest(let limit): + if count < limit { + buffer.append(element) + } + case .bufferingNewest(let limit): + if count < limit { + buffer.append(element) + } else if count > 0 { + buffer.removeFirst() + buffer.append(element) + } + } + } + + mutating func finish() { + finished = true + } + + mutating func fail(_ error: Failure) { + finished = true + failure = error + } + } + + let state: ManagedCriticalState + let limit: Int? + + init( + _ iteratorFactory: @escaping @Sendable () -> sending Base.AsyncIterator, + bufferingPolicy: AsyncBufferSequencePolicy + ) { + state = ManagedCriticalState(State(iteratorFactory, bufferingPolicy: bufferingPolicy)) + switch bufferingPolicy.policy { + case .bounded(let limit): + self.limit = limit + default: + self.limit = nil + } + } + + func cancel() { + let (task, limitContinuation, demand, cancelled) = state.withLock { + state -> (IteratingTask?, UnsafeContinuation?, UnsafeContinuation?, Bool) in + guard state.sides.count == 0 else { + state.cancelled = true + return state.emit(nil, limit: limit) + } + defer { + state.iteratingTask = .cancelled + state.cancelled = true + } + return state.emit(state.iteratingTask, limit: limit) + } + task?.cancel() + limitContinuation?.resume(returning: cancelled) + demand?.resume() + } + + func registerSide() -> Int { + state.withLock { state in + defer { state.generation += 1 } + state.sides[state.generation] = Side.State() + return state.generation + } + } + + func unregisterSide(_ id: Int) { + let (side, continuation, cancelled, iteratingTaskToCancel) = state.withLock { + state -> (Side.State?, UnsafeContinuation?, Bool, IteratingTask?) in + let side = state.sides.removeValue(forKey: id) + state.trimBuffer() + let cancelRequested = state.sides.count == 0 && state.cancelled + guard let limit, state.buffer.count < limit else { + guard case .cancelled = state.iteratingTask else { + defer { + if cancelRequested { + state.iteratingTask = .cancelled + } + } + return (side, nil, false, cancelRequested ? state.iteratingTask : nil) + } + return (side, nil, true, nil) + } + defer { state.limit = nil } + guard case .cancelled = state.iteratingTask else { + defer { + if cancelRequested { + state.iteratingTask = .cancelled + } + } + return (side, state.limit, false, cancelRequested ? state.iteratingTask : nil) + } + return (side, state.limit, true, nil) + } + if let continuation { + continuation.resume(returning: cancelled) + } + if let side { + side.continuation?.resume(returning: .success(nil)) + } + if let iteratingTaskToCancel { + iteratingTaskToCancel.cancel() + } + } + + func iterate() async -> Bool { + if let limit { + let cancelled = await withUnsafeContinuation { (continuation: UnsafeContinuation) in + let (resume, cancelled) = state.withLock { state -> (UnsafeContinuation?, Bool) in + guard state.buffer.count >= limit else { + assert(state.limit == nil) + guard case .cancelled = state.iteratingTask else { + return (continuation, false) + } + return (continuation, true) + } + state.limit = continuation + guard case .cancelled = state.iteratingTask else { + return (nil, false) + } + return (nil, true) + } + if let resume { + resume.resume(returning: cancelled) + } + } + if cancelled { + return false + } + } + + // await a demand + await withUnsafeContinuation { (continuation: UnsafeContinuation) in + let hasPendingDemand = state.withLock { state in + for (_, side) in state.sides { + if side.continuation != nil { + return true + } + } + state.demand = continuation + return false + } + if hasPendingDemand { + continuation.resume() + } + } + return state.withLock { state in + switch state.iteratingTask { + case .cancelled: + return false + default: + return true + } + } + } + + func cancel(id: Int) { + unregisterSide(id) // doubly unregistering is idempotent but has a side effect of emitting nil if present + } + + struct Resumption { + let continuation: UnsafeContinuation, Never> + let result: Result + + func resume() { + continuation.resume(returning: result) + } + } + + func emit(_ result: Result) { + let (resumptions, limitContinuation, demandContinuation, cancelled) = state.withLock { + state -> ([Resumption], UnsafeContinuation?, UnsafeContinuation?, Bool) in + var resumptions = [Resumption]() + switch result { + case .success(let element): + if let element { + state.enqueue(element) + } else { + state.finish() + } + case .failure(let failure): + state.fail(failure) + } + for (id, side) in state.sides { + if let continuation = side.continuation { + if side.position < state.buffer.count { + resumptions.append(Resumption(continuation: continuation, result: .success(state.buffer[side.position]))) + state.sides[id]?.position += 1 + state.sides[id]?.continuation = nil + } else if state.finished { + state.sides[id]?.continuation = nil + if let failure = state.failure { + resumptions.append(Resumption(continuation: continuation, result: .failure(failure))) + } else { + resumptions.append(Resumption(continuation: continuation, result: .success(nil))) + } + } + } + } + state.trimBuffer() + return state.emit(resumptions, limit: limit) + } + + if let limitContinuation { + limitContinuation.resume(returning: cancelled) + } + if let demandContinuation { + demandContinuation.resume() + } + for resumption in resumptions { + resumption.resume() + } + } + + private func nextIteration( + _ id: Int + ) async -> Result { + return await withTaskCancellationHandler { + await withUnsafeContinuation { continuation in + let (res, limitContinuation, demandContinuation, cancelled) = state.withLock { + state -> ( + Result?, UnsafeContinuation?, UnsafeContinuation?, Bool + ) in + guard let side = state.sides[id] else { + return state.emit(.success(nil), limit: limit) + } + if side.position < state.buffer.count { + // There's an element available at this position + let element = state.buffer[side.position] + state.sides[id]?.position += 1 + state.trimBuffer() + return state.emit(.success(element), limit: limit) + } else { + // Position is beyond the buffer + if let failure = state.failure { + return state.emit(.failure(failure), limit: limit) + } else if state.finished { + return state.emit(.success(nil), limit: limit) + } else { + state.sides[id]?.continuation = continuation + return state.emit(nil, limit: limit) + } + } + } + if let limitContinuation { + limitContinuation.resume(returning: cancelled) + } + if let demandContinuation { + demandContinuation.resume() + } + if let res { + continuation.resume(returning: res) + } + } + } onCancel: { + cancel(id: id) + } + } + + private func iterationLoop(factory: @Sendable () -> sending Base.AsyncIterator) async { + var iterator = factory() + do { + while await iterate() { + if let element = try await iterator.next() { + emit(.success(element)) + } else { + emit(.success(nil)) + } + } + } catch { + emit(.failure(error)) + } + } + + func next(isolation actor: isolated (any Actor)?, id: Int) async throws(Failure) -> Base.Element? { + let iteratingTask = state.withLock { state -> IteratingTask in + defer { + if case .pending = state.iteratingTask { + state.iteratingTask = .starting + } + } + return state.iteratingTask + } + + if case .cancelled = iteratingTask { return nil } + + if case .pending(let factory) = iteratingTask { + let task: Task + // for the fancy dance of availability and canImport see the comment on the next check for details + #if swift(>=6.2) + if #available(macOS 26.0, iOS 26.0, tvOS 26.0, visionOS 26.0, *) { + task = Task(name: "Share Iteration") { [factory, self] in + await iterationLoop(factory: factory) + } + } else { + task = Task.detached(name: "Share Iteration") { [factory, self] in + await iterationLoop(factory: factory) + } + } + #else + task = Task.detached { [factory, self] in + await iterationLoop(factory: factory) + } + #endif + // Known Issue: there is a very small race where the task may not get a priority escalation during startup + // this unfortuantely cannot be avoided since the task should ideally not be formed within the critical + // region of the state. Since that could lead to potential deadlocks in low-core-count systems. + // That window is relatively small and can be revisited if a suitable proof of safe behavior can be + // determined. + state.withLock { state in + precondition(state.iteratingTask.isStarting) + state.iteratingTask = .running(task) + } + } + + // withTaskPriorityEscalationHandler is only available for the '26 releases and the 6.2 version of + // the _Concurrency library. This menas for Darwin based OSes we have to have a fallback at runtime, + // and for non-darwin OSes we need to verify against the ability to import that version. + // Using this priority escalation means that the base task can avoid being detached. + // + // This is disabled for now until the 9999 availability is removed from `withTaskPriorityEscalationHandler` + #if false // TODO: remove when this is resolved + guard #available(macOS 26.0, iOS 26.0, tvOS 26.0, visionOS 26.0, *) else { + return try await nextIteration(id).get() + } + return try await withTaskPriorityEscalationHandler { + return await nextIteration(id) + } onPriorityEscalated: { old, new in + let task = state.withLock { state -> Task? in + switch state.iteratingTask { + case .running(let task): + return task + default: + return nil + } + } + task?.escalatePriority(to: new) + }.get() + #else + return try await nextIteration(id).get() + #endif + } + } + + // Manages the lifecycle of the shared iteration. + // + // `Extent` serves as the ownership boundary for the shared sequence. When the + // `AsyncShareSequence` itself is deallocated, the `Extent` ensures that the + // background iteration task is properly cancelled and all resources are cleaned up. + // + // This design allows multiple iterators to safely reference the same underlying + // iteration coordinator while ensuring proper cleanup when the shared sequence + // is no longer needed. + final class Extent: Sendable { + let iteration: Iteration + + init( + _ iteratorFactory: @escaping @Sendable () -> sending Base.AsyncIterator, + bufferingPolicy: AsyncBufferSequencePolicy + ) { + iteration = Iteration(iteratorFactory, bufferingPolicy: bufferingPolicy) + } + + deinit { + iteration.cancel() + } + } + + let extent: Extent + + init( + _ iteratorFactory: @escaping @Sendable () -> sending Base.AsyncIterator, + bufferingPolicy: AsyncBufferSequencePolicy + ) { + extent = Extent(iteratorFactory, bufferingPolicy: bufferingPolicy) + } +} + +@available(AsyncAlgorithms 1.1, *) +extension LegacyAsyncShareSequence: AsyncSequence { + public typealias Element = Base.Element + @available(AsyncAlgorithms 1.2, *) + public typealias Failure = Base.Failure + public struct Iterator: AsyncIteratorProtocol, _SendableMetatype { + let side: Side + + init(_ iteration: Iteration) { + side = Side(iteration) + } + + mutating public func next() async rethrows -> Element? { + try await side.next(isolation: nil) + } + + @available(AsyncAlgorithms 1.2, *) + mutating public func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? { + do { + return try await side.next(isolation: actor) + } catch { + // It's guaranteed to match `Failure` but we are keeping the internal `Side` and `Iteration` + // constrained to `any Error` to prevent a compiler bug visible at runtime + // on pre 1.2 operating systems + throw error as! Failure + } + } + } + + public func makeAsyncIterator() -> Iterator { + Iterator(extent.iteration) + } +} + diff --git a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift index ef2567dd..5b5c04b8 100644 --- a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift +++ b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel+Internal.swift @@ -13,7 +13,7 @@ import DequeModule import Synchronization -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.2, *) extension MultiProducerSingleConsumerAsyncChannel { @usableFromInline enum _InternalBackpressureStrategy: Sendable, CustomStringConvertible { @@ -111,7 +111,7 @@ extension MultiProducerSingleConsumerAsyncChannel { } } -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.2, *) extension MultiProducerSingleConsumerAsyncChannel { @usableFromInline final class _Storage: Sendable { @@ -1713,7 +1713,7 @@ extension MultiProducerSingleConsumerAsyncChannel._Storage._StateMachine { } } -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.2, *) @usableFromInline enum _MultiProducerSingleConsumerSuspendedProducer { case closure((Result) -> Void) diff --git a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift index acc9a72a..cfc4d94b 100644 --- a/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift +++ b/Sources/AsyncAlgorithms/MultiProducerSingleConsumerChannel/MultiProducerSingleConsumerAsyncChannel.swift @@ -15,7 +15,7 @@ /// /// This error is thrown when the channel is already finished when /// trying to send new elements to the source. -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.2, *) public struct MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError: Error { @usableFromInline init() {} @@ -115,7 +115,7 @@ public struct MultiProducerSingleConsumerAsyncChannelAlreadyFinishedError: Error /// When the consumer stops consumption by either deiniting the channel or the task calling ``next(isolation:)`` /// getting cancelled, the source will get notified about the termination if a termination callback has been set /// before by calling ``Source/setOnTerminationCallback(_:)``. -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.2, *) public struct MultiProducerSingleConsumerAsyncChannel: ~Copyable { /// The backing storage. @usableFromInline @@ -214,7 +214,7 @@ public struct MultiProducerSingleConsumerAsyncChannel: } } -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.2, *) extension MultiProducerSingleConsumerAsyncChannel { /// A struct to send values to the channel. /// @@ -640,7 +640,7 @@ extension MultiProducerSingleConsumerAsyncChannel { } } -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.2, *) extension MultiProducerSingleConsumerAsyncChannel where Element: Copyable { struct ChannelAsyncSequence: AsyncSequence { @usableFromInline @@ -675,7 +675,7 @@ extension MultiProducerSingleConsumerAsyncChannel where Element: Copyable { } } -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.2, *) extension MultiProducerSingleConsumerAsyncChannel.ChannelAsyncSequence where Element: Copyable { struct Iterator: AsyncIteratorProtocol { @usableFromInline @@ -722,6 +722,6 @@ extension MultiProducerSingleConsumerAsyncChannel.ChannelAsyncSequence where Ele } } -@available(AsyncAlgorithms 1.1, *) +@available(AsyncAlgorithms 1.2, *) extension MultiProducerSingleConsumerAsyncChannel.ChannelAsyncSequence: Sendable {} #endif diff --git a/Sources/AsyncAlgorithms/Shims.swift b/Sources/AsyncAlgorithms/Shims.swift new file mode 100644 index 00000000..e4b10f78 --- /dev/null +++ b/Sources/AsyncAlgorithms/Shims.swift @@ -0,0 +1,18 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +import Foundation + +#if compiler(>=6.2) +public typealias _SendableMetatype = SendableMetatype +#else +public typealias _SendableMetatype = Any +#endif diff --git a/Tests/AsyncAlgorithmsTests/Support/FailingSequence.swift b/Tests/AsyncAlgorithmsTests/Support/FailingSequence.swift new file mode 100644 index 00000000..d05bb682 --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/Support/FailingSequence.swift @@ -0,0 +1,30 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@available(AsyncAlgorithms 1.2, *) +struct FailingSequence: AsyncSequence, Sendable { + typealias Element = Void + let error: Failure + init(_ error: Failure) { + self.error = error + } + func makeAsyncIterator() -> AsyncIterator { AsyncIterator(error: error) } + + struct AsyncIterator: AsyncIteratorProtocol, Sendable { + let error: Failure + func next() async throws(Failure) -> Void? { + throw error + } + mutating func next(completion: @escaping (Result) -> Void) async throws(Failure) -> Element? { + throw error + } + } +} diff --git a/Tests/AsyncAlgorithmsTests/Support/Task+Extensions.swift b/Tests/AsyncAlgorithmsTests/Support/Task+Extensions.swift new file mode 100644 index 00000000..447e43bb --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/Support/Task+Extensions.swift @@ -0,0 +1,35 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2025 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +/// Backportable versions of `sleep(for:)` for legacy platforms where this kind of method is not available +extension Task where Success == Never, Failure == Never { + + static func sleep(milliseconds duration: UInt64) async throws { + try await sleep(duration, multiplier: 1_000_000) + } + static func sleep(microseconds duration: UInt64) async throws { + try await sleep(duration, multiplier: 1_000) + } + static func sleep(seconds duration: UInt64) async throws { + try await sleep(duration, multiplier: 1_000_000_000) + } + + private static func sleep(_ value: UInt64, multiplier: UInt64) async throws { + guard UInt64.max / multiplier > value else { + throw SleepError.durationOutOfBounds + } + try await sleep(nanoseconds: value * multiplier) + } +} + +fileprivate enum SleepError: Error { + case durationOutOfBounds +} diff --git a/Tests/AsyncAlgorithmsTests/TestLegacyShare.swift b/Tests/AsyncAlgorithmsTests/TestLegacyShare.swift new file mode 100644 index 00000000..0833f35a --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/TestLegacyShare.swift @@ -0,0 +1,607 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +import XCTest +import AsyncAlgorithms +import Synchronization + +@available(AsyncAlgorithms 1.1, *) +final class TestLegacyShare: XCTestCase { + + // MARK: - Basic Functionality Tests + + func test_share_delivers_elements_to_multiple_consumers() async throws { + let source = [1, 2, 3, 4, 5] + let shared = source.async.legacyShare() + let gate1 = Gate() + let gate2 = Gate() + + async let consumer1 = Task.detached { + var results = [Int]() + var iterator = shared.makeAsyncIterator() + gate1.open() + await gate2.enter() + while let value = await iterator.next() { + results.append(value) + } + return results + } + + async let consumer2 = Task.detached { + var results = [Int]() + var iterator = shared.makeAsyncIterator() + gate2.open() + await gate1.enter() + while let value = await iterator.next() { + results.append(value) + } + return results + } + let results1 = await consumer1.value + let results2 = await consumer2.value + + XCTAssertEqual(results1, [1, 2, 3, 4, 5]) + XCTAssertEqual(results2, [1, 2, 3, 4, 5]) + } + + func test_share_with_single_consumer() async { + let source = [1, 2, 3, 4, 5] + let shared = source.async.legacyShare() + + var results = [Int]() + for await value in shared { + results.append(value) + } + + XCTAssertEqual(results, [1, 2, 3, 4, 5]) + } + + func test_share_with_empty_source() async { + let source = [Int]() + let shared = source.async.legacyShare() + + var results = [Int]() + for await value in shared { + results.append(value) + } + + XCTAssertEqual(results, []) + } + + // MARK: - Buffering Policy Tests + + func test_share_with_bounded_buffering() async throws { + var gated = GatedSequence([1, 2, 3, 4, 5]) + let shared = gated.legacyShare(bufferingPolicy: .bounded(2)) + + let results1 = ManagedCriticalState([Int]()) + let results2 = ManagedCriticalState([Int]()) + let gate1 = Gate() + let gate2 = Gate() + + let consumer1 = Task { + var iterator = shared.makeAsyncIterator() + gate1.open() + await gate2.enter() + // Consumer 1 reads first element + if let value = await iterator.next() { + results1.withLock { $0.append(value) } + } + // Delay to allow consumer 2 to get ahead + try? await Task.sleep(milliseconds: 10) + // Continue reading + while let value = await iterator.next() { + results1.withLock { $0.append(value) } + } + } + + let consumer2 = Task { + var iterator = shared.makeAsyncIterator() + gate2.open() + await gate1.enter() + // Consumer 2 reads all elements quickly + while let value = await iterator.next() { + results2.withLock { $0.append(value) } + } + } + + // Advance the gated sequence to make elements available + gated.advance() // 1 + gated.advance() // 2 + gated.advance() // 3 + gated.advance() // 4 + gated.advance() // 5 + + await consumer1.value + await consumer2.value + + // Both consumers should receive all elements + XCTAssertEqual(results1.withLock { $0 }.sorted(), [1, 2, 3, 4, 5]) + XCTAssertEqual(results2.withLock { $0 }.sorted(), [1, 2, 3, 4, 5]) + } + + func test_share_with_unbounded_buffering() async throws { + let source = [1, 2, 3, 4, 5] + let shared = source.async.legacyShare(bufferingPolicy: .unbounded) + + let results1 = ManagedCriticalState([Int]()) + let results2 = ManagedCriticalState([Int]()) + let gate1 = Gate() + let gate2 = Gate() + + let consumer1 = Task { + var iterator = shared.makeAsyncIterator() + gate2.open() + await gate1.enter() + while let value = await iterator.next() { + results1.withLock { $0.append(value) } + // Add some delay to consumer 1 + try? await Task.sleep(milliseconds: 1) + } + } + + let consumer2 = Task { + var iterator = shared.makeAsyncIterator() + gate1.open() + await gate2.enter() + while let value = await iterator.next() { + results2.withLock { $0.append(value) } + } + } + + await consumer1.value + await consumer2.value + + XCTAssertEqual(results1.withLock { $0 }, [1, 2, 3, 4, 5]) + XCTAssertEqual(results2.withLock { $0 }, [1, 2, 3, 4, 5]) + } + + func test_share_with_bufferingLatest_buffering() async throws { + var gated = GatedSequence([1, 2, 3, 4, 5]) + let shared = gated.legacyShare(bufferingPolicy: .bufferingLatest(2)) + + let fastResults = ManagedCriticalState([Int]()) + let slowResults = ManagedCriticalState([Int]()) + let gate1 = Gate() + let gate2 = Gate() + + let fastConsumer = Task.detached { + var iterator = shared.makeAsyncIterator() + gate2.open() + await gate1.enter() + while let value = await iterator.next() { + fastResults.withLock { $0.append(value) } + } + } + + let slowConsumer = Task.detached { + var iterator = shared.makeAsyncIterator() + gate1.open() + await gate2.enter() + // Read first element immediately + if let value = await iterator.next() { + slowResults.withLock { $0.append(value) } + } + // Add significant delay to let buffer fill up and potentially overflow + try? await Task.sleep(milliseconds: 50) + // Continue reading remaining elements + while let value = await iterator.next() { + slowResults.withLock { $0.append(value) } + } + } + + // Release all elements quickly to test buffer overflow behavior + gated.advance() // 1 + try? await Task.sleep(milliseconds: 5) + gated.advance() // 2 + try? await Task.sleep(milliseconds: 5) + gated.advance() // 3 + try? await Task.sleep(milliseconds: 5) + gated.advance() // 4 + try? await Task.sleep(milliseconds: 5) + gated.advance() // 5 + + await fastConsumer.value + await slowConsumer.value + + let slowResultsArray = slowResults.withLock { $0 } + + // Slow consumer should get the first element plus the latest elements in buffer + // With bufferingLatest(2), when buffer overflows, older elements are discarded + XCTAssertTrue(slowResultsArray.count >= 1, "Should have at least the first element") + XCTAssertEqual(slowResultsArray.first, 1, "Should start with first element") + + // Due to bufferingLatest policy, the slow consumer should favor newer elements + // It may miss some middle elements but should get the latest ones + let receivedSet = Set(slowResultsArray) + XCTAssertTrue(receivedSet.isSubset(of: Set([1, 2, 3, 4, 5]))) + + // With bufferingLatest, we expect the slow consumer to get newer elements + // when it finally catches up after the delay + if slowResultsArray.count > 1 { + let laterElements = Set(slowResultsArray.dropFirst()) + // Should have received some of the later elements (4, 5) due to bufferingLatest + XCTAssertTrue( + laterElements.contains(4) || laterElements.contains(5) || laterElements.contains(3), + "BufferingLatest should favor keeping newer elements" + ) + } + } + + func test_share_with_bufferingOldest_buffering() async throws { + var gated = GatedSequence([1, 2, 3, 4, 5]) + let shared = gated.legacyShare(bufferingPolicy: .bufferingOldest(2)) + + let fastResults = ManagedCriticalState([Int]()) + let slowResults = ManagedCriticalState([Int]()) + let gate1 = Gate() + let gate2 = Gate() + + let fastConsumer = Task { + var iterator = shared.makeAsyncIterator() + gate2.open() + await gate1.enter() + while let value = await iterator.next() { + fastResults.withLock { $0.append(value) } + } + } + + let slowConsumer = Task { + var iterator = shared.makeAsyncIterator() + gate1.open() + await gate2.enter() + // Read first element immediately + if let value = await iterator.next() { + slowResults.withLock { $0.append(value) } + } + // Add significant delay to let buffer fill up and potentially overflow + try? await Task.sleep(milliseconds: 50) + // Continue reading remaining elements + while let value = await iterator.next() { + slowResults.withLock { $0.append(value) } + } + } + + // Release all elements quickly to test buffer overflow behavior + gated.advance() // 1 + try? await Task.sleep(milliseconds: 5) + gated.advance() // 2 + try? await Task.sleep(milliseconds: 5) + gated.advance() // 3 + try? await Task.sleep(milliseconds: 5) + gated.advance() // 4 + try? await Task.sleep(milliseconds: 5) + gated.advance() // 5 + + await fastConsumer.value + await slowConsumer.value + + let slowResultsArray = slowResults.withLock { $0 } + + // Slow consumer should get the first element plus the oldest elements that fit in buffer + // With bufferingOldest(2), when buffer overflows, newer elements are ignored + XCTAssertTrue(slowResultsArray.count >= 1, "Should have at least the first element") + XCTAssertEqual(slowResultsArray.first, 1, "Should start with first element") + + // Due to bufferingOldest policy, the slow consumer should favor older elements + let receivedSet = Set(slowResultsArray) + XCTAssertTrue(receivedSet.isSubset(of: Set([1, 2, 3, 4, 5]))) + + // With bufferingOldest, when the buffer is full, newer elements are ignored + // So the slow consumer should be more likely to receive earlier elements + if slowResultsArray.count > 1 { + let laterElements = Array(slowResultsArray.dropFirst()) + // Should have received earlier elements due to bufferingOldest policy + // Elements 4 and 5 are less likely to be received since they're newer + let hasEarlierElements = laterElements.contains(2) || laterElements.contains(3) + let hasLaterElements = laterElements.contains(4) && laterElements.contains(5) + + // BufferingOldest should favor keeping older elements when buffer is full + // So we should be more likely to see earlier elements than later ones + XCTAssertTrue( + hasEarlierElements || !hasLaterElements, + "BufferingOldest should favor keeping older elements over newer ones" + ) + } + } + + // MARK: - Cancellation Tests + + func test_share_cancellation_of_single_consumer() async { + let shared = Indefinite(value: 42).async.legacyShare() + + let finished = expectation(description: "finished") + let iterated = expectation(description: "iterated") + + let task = Task { + var firstIteration = false + for await _ in shared { + if !firstIteration { + firstIteration = true + iterated.fulfill() + } + } + finished.fulfill() + } + + // Wait for the task to start iterating + await fulfillment(of: [iterated], timeout: 1.0) + + // Cancel the task + task.cancel() + + // Verify the task finishes + await fulfillment(of: [finished], timeout: 1.0) + } + + func test_share_cancellation_with_multiple_consumers() async { + let shared = Indefinite(value: 42).async.legacyShare() + + let consumer1Finished = expectation(description: "consumer1Finished") + let consumer2Finished = expectation(description: "consumer2Finished") + let consumer1Iterated = expectation(description: "consumer1Iterated") + let consumer2Iterated = expectation(description: "consumer2Iterated") + + let consumer1 = Task { + var firstIteration = false + for await _ in shared { + if !firstIteration { + firstIteration = true + consumer1Iterated.fulfill() + } + } + consumer1Finished.fulfill() + } + + let consumer2 = Task { + var firstIteration = false + for await _ in shared { + if !firstIteration { + firstIteration = true + consumer2Iterated.fulfill() + } + } + consumer2Finished.fulfill() + } + + // Wait for both consumers to start + await fulfillment(of: [consumer1Iterated, consumer2Iterated], timeout: 1.0) + + // Cancel only consumer1 + consumer1.cancel() + + // Consumer1 should finish + await fulfillment(of: [consumer1Finished], timeout: 1.0) + + // Consumer2 should still be running, so cancel it too + consumer2.cancel() + await fulfillment(of: [consumer2Finished], timeout: 1.0) + } + + func test_share_cancellation_cancels_source_when_no_consumers() async throws { + let source = Indefinite(value: 1).async + let shared = source.legacyShare() + + let finished = expectation(description: "finished") + let iterated = expectation(description: "iterated") + + let task = Task { + var iterator = shared.makeAsyncIterator() + if await iterator.next() != nil { + iterated.fulfill() + } + // Task will be cancelled here, so iteration should stop + while await iterator.next() != nil { + // Continue iterating until cancelled + } + finished.fulfill() + } + + await fulfillment(of: [iterated], timeout: 1.0) + task.cancel() + await fulfillment(of: [finished], timeout: 1.0) + } + + // MARK: - Error Handling Tests + + func test_share_propagates_errors_to_all_consumers() async { + let source = [1, 2, 3, 4, 5].async.map { value in + if value == 3 { + throw TestError.failure + } + return value + } + let shared = source.legacyShare() + + let consumer1Results = ManagedCriticalState([Int]()) + let consumer2Results = ManagedCriticalState([Int]()) + let consumer1Error = ManagedCriticalState(nil) + let consumer2Error = ManagedCriticalState(nil) + let gate1 = Gate() + let gate2 = Gate() + + let consumer1 = Task { + do { + var iterator = shared.makeAsyncIterator() + gate2.open() + await gate1.enter() + while let value = try await iterator.next() { + consumer1Results.withLock { $0.append(value) } + } + } catch { + consumer1Error.withLock { $0 = error } + } + } + + let consumer2 = Task { + do { + var iterator = shared.makeAsyncIterator() + gate1.open() + await gate2.enter() + while let value = try await iterator.next() { + consumer2Results.withLock { $0.append(value) } + } + } catch { + consumer2Error.withLock { $0 = error } + } + } + + await consumer1.value + await consumer2.value + + // Both consumers should receive the first two elements + XCTAssertEqual(consumer1Results.withLock { $0 }, [1, 2]) + XCTAssertEqual(consumer2Results.withLock { $0 }, [1, 2]) + + // Both consumers should receive the error + XCTAssertTrue(consumer1Error.withLock { $0 is TestError }) + XCTAssertTrue(consumer2Error.withLock { $0 is TestError }) + } + + // MARK: - Timing and Race Condition Tests + + func test_share_with_late_joining_consumer() async throws { + var gated = GatedSequence([1, 2, 3, 4, 5]) + let shared = gated.legacyShare(bufferingPolicy: .unbounded) + + let earlyResults = ManagedCriticalState([Int]()) + let lateResults = ManagedCriticalState([Int]()) + + // Start early consumer + let earlyConsumer = Task { + var iterator = shared.makeAsyncIterator() + while let value = await iterator.next() { + earlyResults.withLock { $0.append(value) } + } + } + + // Advance some elements + gated.advance() // 1 + gated.advance() // 2 + + // Give early consumer time to consume + try? await Task.sleep(milliseconds: 10) + + // Start late consumer + let lateConsumer = Task { + var iterator = shared.makeAsyncIterator() + while let value = await iterator.next() { + lateResults.withLock { $0.append(value) } + } + } + + // Advance remaining elements + gated.advance() // 3 + gated.advance() // 4 + gated.advance() // 5 + + await earlyConsumer.value + await lateConsumer.value + + // Early consumer gets all elements + XCTAssertEqual(earlyResults.withLock { $0 }, [1, 2, 3, 4, 5]) + // Late consumer only gets elements from when it joined + XCTAssertTrue(lateResults.withLock { $0.count <= 5 }) + } + + func test_share_iterator_independence() async throws { + let source = [1, 2, 3, 4, 5] + let shared = source.async.legacyShare() + + var iterator1 = shared.makeAsyncIterator() + var iterator2 = shared.makeAsyncIterator() + + // Both iterators should independently get the same elements + let value1a = await iterator1.next() + let value2a = await iterator2.next() + + let value1b = await iterator1.next() + let value2b = await iterator2.next() + + XCTAssertEqual(value1a, 1) + XCTAssertEqual(value2a, 1) + XCTAssertEqual(value1b, 2) + XCTAssertEqual(value2b, 2) + } + + // MARK: - Memory and Resource Management Tests + + func test_share_cleans_up_when_all_consumers_finish() async throws { + let source = [1, 2, 3] + let shared = source.async.legacyShare() + + var results = [Int]() + for await value in shared { + results.append(value) + } + + XCTAssertEqual(results, [1, 2, 3]) + + // Create a new iterator after the sequence finished + var newIterator = shared.makeAsyncIterator() + let value = await newIterator.next() + XCTAssertNil(value) // Should return nil since source is exhausted + } + + func test_share_multiple_sequential_consumers() async { + let source = [1, 2, 3, 4, 5] + let shared = source.async.legacyShare(bufferingPolicy: .unbounded) + + // First consumer + var results1 = [Int]() + for await value in shared { + results1.append(value) + } + + // Second consumer (starting after first finished) + var results2 = [Int]() + for await value in shared { + results2.append(value) + } + + XCTAssertEqual(results1, [1, 2, 3, 4, 5]) + XCTAssertEqual(results2, []) // Should be empty since source is exhausted + } + + func test_share_rethrows_failure_type_on_backported() async { + let shared = AsyncThrowingStream { + $0.finish(throwing: TestError.failure) + }.legacyShare() + do { + for try await _ in shared { + XCTFail("Expected to not get here") + } + } catch { + XCTAssertEqual(error as? TestError, .failure) + } + } + + func test_share_rethrows_failure_type_without_falling_back_to_any_error() async { + guard #available(AsyncAlgorithms 1.2, *) else { + return + } + // Ensure - at compile time - that error is effectively a TestError + let shared: some AsyncSequence = FailingSequence(TestError.failure).legacyShare() + do { + for try await _ in shared { + XCTFail("Expected to not get here") + } + } catch { + XCTAssertEqual(error, TestError.failure) + } + } +} + +// MARK: - Helper Types + +private enum TestError: Error, Equatable { + case failure +}