From 04a8cd0bca931ae438b3c6f3c1fb37c704b9baf5 Mon Sep 17 00:00:00 2001 From: Mark Johnson Date: Mon, 2 Jun 2025 11:00:59 -0400 Subject: [PATCH 1/2] Move AnyAsyncSequence into WorkflowConcurrency as AsyncSequenceWorker --- .../Sources/AsyncSequenceWorker.swift | 72 +++ .../Tests/AsyncSequenceWorkerTests.swift | 432 ++++++++++++++++++ 2 files changed, 504 insertions(+) create mode 100644 WorkflowConcurrency/Sources/AsyncSequenceWorker.swift create mode 100644 WorkflowConcurrency/Tests/AsyncSequenceWorkerTests.swift diff --git a/WorkflowConcurrency/Sources/AsyncSequenceWorker.swift b/WorkflowConcurrency/Sources/AsyncSequenceWorker.swift new file mode 100644 index 000000000..a8533b76d --- /dev/null +++ b/WorkflowConcurrency/Sources/AsyncSequenceWorker.swift @@ -0,0 +1,72 @@ +import Foundation +import Workflow + +/// Workers define a unit of asynchronous work. +/// +/// During a render pass, a workflow can ask the context to await the result of a worker. +/// +/// When this occurs, the context checks to see if there is already a running worker of the same type. +/// If there is, and if the workers are 'equivalent', the context leaves the existing worker running. +/// +/// If there is not an existing worker of this type, the context will kick off the new worker (via `run`). +public protocol AsyncSequenceWorker: AnyWorkflowConvertible where Rendering == Void { + /// The type of output events returned by this worker. + associatedtype Output + + // In iOS 18+ we can do: + // func run() -> any AsyncSequence + // And then remove the casting in the side effect + + /// Returns an `AsyncSequence` to execute the work represented by this worker. + func run() -> any AsyncSequence + /// Returns `true` if the other worker should be considered equivalent to `self`. Equivalence should take into + /// account whatever data is meaningful to the task. For example, a worker that loads a user account from a server + /// would not be equivalent to another worker with a different user ID. + func isEquivalent(to otherWorker: Self) -> Bool +} + +extension AsyncSequenceWorker { + public func asAnyWorkflow() -> AnyWorkflow { + AsyncSequenceWorkerWorkflow(worker: self).asAnyWorkflow() + } +} + +struct AsyncSequenceWorkerWorkflow: Workflow { + let worker: WorkerType + + typealias Output = WorkerType.Output + typealias Rendering = Void + typealias State = UUID + + func makeInitialState() -> State { UUID() } + + func workflowDidChange(from previousWorkflow: AsyncSequenceWorkerWorkflow, state: inout UUID) { + if !worker.isEquivalent(to: previousWorkflow.worker) { + state = UUID() + } + } + + func render(state: State, context: RenderContext) -> Rendering { + let sink = context.makeSink(of: AnyWorkflowAction.self) + context.runSideEffect(key: state) { lifetime in + let task = Task { + for try await output in worker.run() { + // Not necessary in iOS 18+ once we can use AsyncSequence + guard let output = output as? Output else { + fatalError("Unexpected output type \(type(of: output)) from worker \(worker)") + } + await sendAction(output: output, sink: sink) + } + } + + lifetime.onEnded { + task.cancel() + } + } + } + + @MainActor + func sendAction(output: Output, sink: Sink>>) { + sink.send(AnyWorkflowAction(sendingOutput: output)) + } +} diff --git a/WorkflowConcurrency/Tests/AsyncSequenceWorkerTests.swift b/WorkflowConcurrency/Tests/AsyncSequenceWorkerTests.swift new file mode 100644 index 000000000..d0e26ebb9 --- /dev/null +++ b/WorkflowConcurrency/Tests/AsyncSequenceWorkerTests.swift @@ -0,0 +1,432 @@ +import Workflow +import WorkflowTesting +import XCTest +@testable import WorkflowConcurrency + +class AsyncSequenceWorkerTests: XCTestCase { + func testWorkerOutput() { + let host = WorkflowHost( + workflow: TestIntWorkerWorkflow(key: "", isEquivalent: true) + ) + + let expectation = XCTestExpectation() + let disposable = host.rendering.signal.observeValues { rendering in + expectation.fulfill() + } + + XCTAssertEqual(0, host.rendering.value.intValue) + + wait(for: [expectation], timeout: 1.0) + XCTAssertEqual(1, host.rendering.value.intValue) + + disposable?.dispose() + } + + func testNotEquivalentWorker() { + // Create the workflow which causes the IntWorker to run. + let host = WorkflowHost( + workflow: TestIntWorkerWorkflow(key: "", isEquivalent: false) + ) + + var expectation = XCTestExpectation() + // Set to observe renderings. + // This expectation should be called after the IntWorker runs + // and updates the state. + var disposable = host.rendering.signal.observeValues { rendering in + expectation.fulfill() + } + + // Test to make sure the initial state of the workflow is correct. + XCTAssertEqual(0, host.rendering.value.intValue) + + // Wait for the worker to run. + wait(for: [expectation], timeout: 1.0) + // Test to make sure the rendering after the worker runs is correct. + XCTAssertEqual(1, host.rendering.value.intValue) + + disposable?.dispose() + expectation = XCTestExpectation() + // Set to observe renderings. + // This expectation should be called after the add one action is sent. + disposable = host.rendering.signal.observeValues { rendering in + expectation.fulfill() + } + + // Send an addOne action to add 1 to the state. + host.rendering.value.addOne() + + // Wait for the action to trigger a render. + wait(for: [expectation], timeout: 1.0) + // Test to make sure the rendering equals 2 now that the action has run. + XCTAssertEqual(2, host.rendering.value.intValue) + + disposable?.dispose() + expectation = XCTestExpectation() + // Set to observe renderings + // Since isEquivalent is set to false in the worker + // the worker should run again and update the rendering. + disposable = host.rendering.signal.observeValues { rendering in + expectation.fulfill() + } + + // Wait for worker to run. + wait(for: [expectation], timeout: 1) + // Verify the rendering changed after the worker is run. + XCTAssertEqual(1, host.rendering.value.intValue) + + disposable?.dispose() + } + + func testEquivalentWorker() { + // Create the workflow which causes the IntWorker to run. + let host = WorkflowHost( + workflow: TestIntWorkerWorkflow(key: "", isEquivalent: true) + ) + + var expectation = XCTestExpectation() + // Set to observe renderings. + // This expectation should be called after the IntWorker runs + // and updates the state. + var disposable = host.rendering.signal.observeValues { rendering in + expectation.fulfill() + } + + // Test to make sure the initial state of the workflow is correct. + XCTAssertEqual(0, host.rendering.value.intValue) + + // Wait for the worker to run. + wait(for: [expectation], timeout: 1.0) + // Test to make sure the rendering after the worker runs is correct. + XCTAssertEqual(1, host.rendering.value.intValue) + + disposable?.dispose() + expectation = XCTestExpectation() + // Set to observe renderings. + // This expectation should be called after the add one action is sent. + disposable = host.rendering.signal.observeValues { rendering in + expectation.fulfill() + } + + // Send an addOne action to add 1 to the state. + host.rendering.value.addOne() + + // Wait for the action to trigger a render. + wait(for: [expectation], timeout: 1.0) + // Test to make sure the rendering equals 2 now that the action has run. + XCTAssertEqual(2, host.rendering.value.intValue) + + disposable?.dispose() + // Set to observe renderings + // This expectation should be called after the workflow is updated. + // After the host is updated with a new workflow instance the + // initial state should be 2. + expectation = XCTestExpectation() + disposable = host.rendering.signal.observeValues { rendering in + expectation.fulfill() + } + + // Update the workflow. + host.update(workflow: TestIntWorkerWorkflow(key: "", isEquivalent: true)) + // Wait for the workflow to render after being updated. + wait(for: [expectation], timeout: 1.0) + // Test to make sure the rendering matches the existing state. + XCTAssertEqual(2, host.rendering.value.intValue) + + disposable?.dispose() + // The workflow should not produce another rendering. + expectation = XCTestExpectation() + // The expectation is inverted because there should not be another rendering + // since the worker returned isEquivalent is true. + expectation.isInverted = true + disposable = host.rendering.signal.observeValues { rendering in + // This should not be called! + expectation.fulfill() + } + + // Wait to see if the expection is fullfulled. + wait(for: [expectation], timeout: 1) + // Verify the rendering didn't change and is still 2. + XCTAssertEqual(2, host.rendering.value.intValue) + + disposable?.dispose() + } + + func testChangingIsEquivalent() { + // Create the workflow which causes the IntWorker to run. + let host = WorkflowHost( + workflow: TestIntWorkerWorkflow(key: "", isEquivalent: true) + ) + + var expectation = XCTestExpectation() + // Set to observe renderings. + // This expectation should be called after the IntWorker runs and + // updates the state. + var disposable = host.rendering.signal.observeValues { rendering in + expectation.fulfill() + } + + // Test to make sure the initial state of the workflow is correct. + XCTAssertEqual(0, host.rendering.value.intValue) + + // Wait for the worker to run. + wait(for: [expectation], timeout: 1.0) + // Test to make sure the rendering after the worker runs is correct. + XCTAssertEqual(1, host.rendering.value.intValue) + + disposable?.dispose() + expectation = XCTestExpectation() + // Set to observe renderings. + // This expectation should be called after the add one action is sent. + disposable = host.rendering.signal.observeValues { rendering in + expectation.fulfill() + } + + // Send an addOne action to add 1 to the state. + host.rendering.value.addOne() + + // Wait for the action to trigger a render. + wait(for: [expectation], timeout: 1.0) + // Test to make sure the rendering equals 2 now that the action has run. + XCTAssertEqual(2, host.rendering.value.intValue) + + disposable?.dispose() + // Set to observe renderings. + // This expectation should be called after the workflow is updated. + // After the host is updated with a new workflow instance the + // initial state should be 2. + expectation = XCTestExpectation() + disposable = host.rendering.signal.observeValues { rendering in + expectation.fulfill() + } + + // Update the workflow to change the isEquivalent for the worker. + host.update(workflow: TestIntWorkerWorkflow(key: "", isEquivalent: false)) + // Wait for the workflow to render after being updated. + wait(for: [expectation], timeout: 1.0) + // Test to make sure the rendering matches the existing state. + XCTAssertEqual(2, host.rendering.value.intValue) + + disposable?.dispose() + expectation = XCTestExpectation() + // Set to observe renderings + // Since isEquivalent is set to false in the worker + // the worker should run again and update the rendering. + disposable = host.rendering.signal.observeValues { rendering in + expectation.fulfill() + } + + // Wait for worker to run. + wait(for: [expectation], timeout: 1) + // Verify the rendering changed after the worker is run. + XCTAssertEqual(1, host.rendering.value.intValue) + + disposable?.dispose() + } + + func testContinuousIntWorker() { + let host = WorkflowHost( + workflow: TestContinuousIntWorkerWorkflow(key: "") + ) + + let expectation = XCTestExpectation() + expectation.expectedFulfillmentCount = 5 + var expectedInt = 0 + let disposable = host.rendering.signal.observeValues { rendering in + expectedInt += 1 + XCTAssertEqual(expectedInt, rendering) + expectation.fulfill() + } + + XCTAssertEqual(0, host.rendering.value) + + wait(for: [expectation], timeout: 1.0) + XCTAssertEqual(expectedInt, host.rendering.value) + + disposable?.dispose() + } + + func testExpectedWorker() { + TestIntWorkerWorkflow(key: "123", isEquivalent: true) + .renderTester() + .expectWorkflow( + type: AsyncSequenceWorkerWorkflow.self, + key: "123", + producingRendering: (), + producingOutput: 1, + assertions: { _ in } + ) + .render { _ in } + .verifyState { state in + XCTAssertEqual(state, 1) + } + } + + // A worker declared on a first `render` pass that is not on a subsequent should have the work cancelled. + func test_cancelsWorkers() { + struct WorkerWorkflow: Workflow { + typealias State = Void + + enum Mode { + case notWorking + case working(start: XCTestExpectation, end: XCTestExpectation) + } + + let mode: Mode + + func render(state: State, context: RenderContext) -> Bool { + switch mode { + case .notWorking: + return false + case .working(start: let startExpectation, end: let endExpectation): + ExpectingWorker( + startExpectation: startExpectation, + endExpectation: endExpectation + ) + .mapOutput { _ in AnyWorkflowAction.noAction } + .running(in: context) + return true + } + } + + struct ExpectingWorker: AsyncSequenceWorker { + typealias Output = Void + + let startExpectation: XCTestExpectation + let endExpectation: XCTestExpectation + + func run() -> any AsyncSequence { + startExpectation.fulfill() + return AsyncStream { + if Task.isCancelled { + endExpectation.fulfill() + return nil + } + return () + } + onCancel: { @Sendable () in endExpectation.fulfill() } + } + + func isEquivalent(to otherWorker: WorkerWorkflow.ExpectingWorker) -> Bool { + true + } + } + } + + let startExpectation = XCTestExpectation() + let endExpectation = XCTestExpectation() + let host = WorkflowHost( + workflow: WorkerWorkflow(mode: .working( + start: startExpectation, + end: endExpectation + )) + ) + + wait(for: [startExpectation], timeout: 1.0) + + host.update(workflow: WorkerWorkflow(mode: .notWorking)) + + wait(for: [endExpectation], timeout: 1.0) + } + + private struct TestIntWorkerRendering { + let intValue: Int + let addOne: () -> Void + } + + private struct TestIntWorkerWorkflow: Workflow { + enum Action: WorkflowAction { + typealias WorkflowType = TestIntWorkerWorkflow + + case add(Int) + + func apply(toState state: inout WorkflowType.State, context: ApplyContext) -> WorkflowType.Output? { + switch self { + case .add(let value): + state += value + return nil + } + } + } + + typealias State = Int + typealias Rendering = TestIntWorkerRendering + + let key: String + let isEquivalent: Bool + + func makeInitialState() -> Int { 0 } + + func render(state: Int, context: RenderContext) -> TestIntWorkerRendering { + let sink = context.makeSink(of: Action.self) + + IntWorker(isEquivalent: isEquivalent) + .mapOutput { output in + AnyWorkflowAction { state in + state = output + return nil + } + } + .running(in: context, key: key) + + return TestIntWorkerRendering(intValue: state, addOne: { + sink.send(.add(1)) + + }) + } + } + + private struct IntWorker: AsyncSequenceWorker { + let isEquivalent: Bool + + func run() -> any AsyncSequence { + AsyncStream(Int.self) { continuation in + continuation.yield(1) + continuation.finish() + } + } + + func isEquivalent(to otherWorker: IntWorker) -> Bool { + isEquivalent + } + + typealias Output = Int + } + + private struct TestContinuousIntWorkerWorkflow: Workflow { + typealias State = Int + typealias Rendering = Int + + let key: String + + func makeInitialState() -> Int { 0 } + + func render(state: Int, context: RenderContext) -> Int { + ContinuousIntWorker() + .mapOutput { output in + AnyWorkflowAction { state in + state = output + return nil + } + } + .running(in: context, key: key) + + return state + } + } + + private struct ContinuousIntWorker: AsyncSequenceWorker { + func run() -> any AsyncSequence { + var i = 0 + return AsyncStream { + i += 1 + return i + } + } + + func isEquivalent(to otherWorker: ContinuousIntWorker) -> Bool { + true + } + + typealias Output = Int + } +} From 389e6a1601e2c98a1f71177b2a6987de41542aad Mon Sep 17 00:00:00 2001 From: Mark Johnson Date: Mon, 2 Jun 2025 12:15:07 -0400 Subject: [PATCH 2/2] Move sink.send to MainActor run block --- WorkflowConcurrency/Sources/AsyncSequenceWorker.swift | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/WorkflowConcurrency/Sources/AsyncSequenceWorker.swift b/WorkflowConcurrency/Sources/AsyncSequenceWorker.swift index a8533b76d..c4d2fcbb4 100644 --- a/WorkflowConcurrency/Sources/AsyncSequenceWorker.swift +++ b/WorkflowConcurrency/Sources/AsyncSequenceWorker.swift @@ -55,7 +55,9 @@ struct AsyncSequenceWorkerWorkflow: Workflow { guard let output = output as? Output else { fatalError("Unexpected output type \(type(of: output)) from worker \(worker)") } - await sendAction(output: output, sink: sink) + await MainActor.run { + sink.send(AnyWorkflowAction(sendingOutput: output)) + } } } @@ -64,9 +66,4 @@ struct AsyncSequenceWorkerWorkflow: Workflow { } } } - - @MainActor - func sendAction(output: Output, sink: Sink>>) { - sink.send(AnyWorkflowAction(sendingOutput: output)) - } }