Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions Workflow/Sources/SubtreeManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,10 @@ extension WorkflowNode.SubtreeManager {
func makeSink<Action: WorkflowAction>(
of actionType: Action.Type
) -> Sink<Action> where WorkflowType == Action.WorkflowType {
let reusableSink = sinkStore.findOrCreate(actionType: Action.self)
let reusableSink = sinkStore.findOrCreate(
actionType: Action.self,
onSinkEvent: hostContext.onSinkEvent
)

let sink = Sink<Action> { [weak reusableSink] action in
WorkflowLogger.logSinkEvent(ref: SignpostRef(), action: action)
Expand Down Expand Up @@ -320,7 +323,10 @@ extension WorkflowNode.SubtreeManager {
self.usedSinks = [:]
}

mutating func findOrCreate<Action: WorkflowAction>(actionType: Action.Type) -> ReusableSink<Action> {
mutating func findOrCreate<Action: WorkflowAction>(
actionType: Action.Type,
onSinkEvent: @escaping OnSinkEvent
) -> ReusableSink<Action> {
let key = ObjectIdentifier(actionType)

let reusableSink: ReusableSink<Action>
Expand All @@ -334,7 +340,7 @@ extension WorkflowNode.SubtreeManager {
reusableSink = usedSink
} else {
// Create a new reusable sink.
reusableSink = ReusableSink<Action>()
reusableSink = ReusableSink<Action>(onSinkEvent: onSinkEvent)
}

usedSinks[key] = reusableSink
Expand All @@ -345,30 +351,33 @@ extension WorkflowNode.SubtreeManager {

/// Type-erased base class for reusable sinks.
fileprivate class AnyReusableSink {
/// The callback to invoke when an event is to be handled.
let onSinkEvent: OnSinkEvent
var eventPipe: EventPipe

init() {
init(onSinkEvent: @escaping OnSinkEvent) {
self.onSinkEvent = onSinkEvent
self.eventPipe = EventPipe()
}
}

fileprivate final class ReusableSink<Action: WorkflowAction>: AnyReusableSink where Action.WorkflowType == WorkflowType {
func handle(action: Action) {
let output = Output.update(
action,
source: .external,
subtreeInvalidated: false // initial state
)
let perform: () -> Void = {
let output = Output.update(
action,
source: .external,
subtreeInvalidated: false // initial state
)

if case .pending = eventPipe.validationState {
// Workflow is currently processing an `event`.
// Scheduling it to be processed after.
DispatchQueue.workflowExecution.async { [weak self] in
self?.eventPipe.handle(event: output)
}
return
Comment on lines 363 to 369
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trying to think through if we need this check anymore... the pending state should only have been possible to be in after a node was rendered during a render pass, but before the rendering & output had finished being emitted. i guess it's maybe conceivable that a child node could do something that would trigger some side effect that sent a sink event to a parent node in the pending state... 🤔. i think the 'happy path' case is that it would just be enqueued b/c we'd be re-rendering due to handling an event (and so would hit the 'enqueue' case in the onSinkEvent callback), but there is maybe an edge case to consider where the workflow host updates the root node independently (no event being handled) so maybe we should be a bit more cautious here...

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you've convinced me that it still makes sense to handle this possible edge case, what's the downside of leaving this? Does the dispatch approach not work with the new paradigm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i agree, and think leaving something like this (at least for now1) makes sense. i think we do still need to change the logic that gets enqueued – the way the existing code works fails in the 'someone spun the run loop before you finished a render pass' case, because those manual run loop turns can run that enqueued block, but it doesn't recurse into the ReusableSink method itself and re-check the validation state; it just unconditionally forwards through to the event pipe and we hope for the best (which seems to often crash in that edge case).

there's also an API design question here in my mind – who is responsible for making the check? i'm inclined to also move that logic out to the new SinkEventHandler type so the decision making is basically all in one place, but we'll need to pass the node-local state through to do that. it's a little awkward b/c the validation state enum is generic over various things, but we could just pass a isPending flag in the callback i suppose.

Footnotes

  1. it's conceivable to me that much of the node-local state could probably be moved out to the 'tree level', but haven't thought of a compelling reason to work though that at the moment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i made a couple changes to address this:

  1. added a new method withEventHandlingSuspended (better names welcome) that can be used by the workflow host to explicitly ensure no synchronous event handlers will be run when updating the root node of the tree
  2. restored the original handling logic & made the new event handling code paths conditional on a runtime configuration, so they will be opt-in and we can enable it via a feature flag

self.eventPipe.handle(event: output)
}
eventPipe.handle(event: output)

let enqueue: () -> Void = { [weak self] in
self?.handle(action: action)
}

onSinkEvent(perform, enqueue)
}
}
}
Expand Down
89 changes: 87 additions & 2 deletions Workflow/Sources/WorkflowHost.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

import Dispatch
import ReactiveSwift

/// Defines a type that receives debug information about a running workflow hierarchy.
Expand Down Expand Up @@ -50,6 +51,8 @@ public final class WorkflowHost<WorkflowType: Workflow> {
context.debugger
}

let eventHandler: SinkEventHandler

/// Initializes a new host with the given workflow at the root.
///
/// - Parameter workflow: The root workflow in the hierarchy
Expand All @@ -61,6 +64,13 @@ public final class WorkflowHost<WorkflowType: Workflow> {
observers: [WorkflowObserver] = [],
debugger: WorkflowDebugger? = nil
) {
self.eventHandler = SinkEventHandler()
assert(
eventHandler.state == .initializing,
"EventHandler must begin in the `.initializing` state"
)
defer { eventHandler.state = .ready }

let observer = WorkflowObservation
.sharedObserversInterceptor
.workflowObservers(for: observers)
Expand All @@ -69,7 +79,8 @@ public final class WorkflowHost<WorkflowType: Workflow> {
self.context = HostContext(
observer: observer,
debugger: debugger,
runtimeConfig: Runtime.configuration
runtimeConfig: Runtime.configuration,
onSinkEvent: eventHandler.makeOnSinkEventCallback()
)

self.rootNode = WorkflowNode(
Expand Down Expand Up @@ -158,14 +169,19 @@ struct HostContext {
let debugger: WorkflowDebugger?
let runtimeConfig: Runtime.Configuration

/// Event handler to be plumbed through the runtime down to the Sinks
let onSinkEvent: OnSinkEvent

init(
observer: WorkflowObserver?,
debugger: WorkflowDebugger?,
runtimeConfig: Runtime.Configuration
runtimeConfig: Runtime.Configuration,
onSinkEvent: @escaping OnSinkEvent
) {
self.observer = observer
self.debugger = debugger
self.runtimeConfig = runtimeConfig
self.onSinkEvent = onSinkEvent
}
}

Expand All @@ -176,3 +192,72 @@ extension HostContext {
debugger != nil ? perform() : nil
}
}

// MARK: - EventHandler

/// Callback signature for the internal `ReusableSink` types to invoke when
/// they receive an event from the 'outside world'.
/// - Parameter perform: The event handler to invoke if the event can be processed immediately.
/// - Parameter enqueue: The event handler to invoke in the future if the event cannot currently be processed.
typealias OnSinkEvent = (
_ perform: () -> Void,
_ enqueue: @escaping () -> Void
) -> Void

/// Handles events from 'Sinks' such that runtime-level event handling state is appropriately
/// managed, and attempts to perform reentrant action handling can be detected and dealt with.
final class SinkEventHandler {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this type's API and behavior very intuitive, I like it!

enum State {
/// The handler (and related components) are being
/// initialized, and are not yet ready to process events.
/// Attempts to do so in this state will fail with a fatal error.
case initializing

/// An event is currently being processed.
case processingEvent

/// Ready to handle an event.
case ready
}

fileprivate(set) var state: State = .initializing

/// Synchronously performs or enqueues the specified event handlers based on the current
/// event handler state.
/// - Parameters:
/// - perform: The event handling action to perform immediately if possible.
/// - enqueue: The event handling action to enqueue if the event handler is already processing an event.
func performOrEnqueueEvent(
perform: () -> Void,
enqueue: @escaping () -> Void
) {
switch state {
case .initializing:
fatalError("Tried to handle event before finishing initialization.")

case .processingEvent:
DispatchQueue.workflowExecution.async(execute: enqueue)

case .ready:
state = .processingEvent
defer { state = .ready }
perform()
}
}

/// Creates the callback that should be invoked by Sinks to handle their event appropriately
/// given the `EventHandler`'s current state.
/// - Returns: The callback that should be invoked.
func makeOnSinkEventCallback() -> OnSinkEvent {
// TODO: do we need the weak ref?
let onSinkEvent: OnSinkEvent = { [weak self] perform, enqueue in
guard let self else {
return // TODO: what's the appropriate handling?
}

performOrEnqueueEvent(perform: perform, enqueue: enqueue)
}

return onSinkEvent
}
}
3 changes: 2 additions & 1 deletion Workflow/Tests/TestUtilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ extension HostContext {
HostContext(
observer: observer,
debugger: debugger,
runtimeConfig: runtimeConfig
runtimeConfig: runtimeConfig,
onSinkEvent: { perform, _ in perform() }
)
}
}
Expand Down
121 changes: 107 additions & 14 deletions Workflow/Tests/WorkflowHostTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
* limitations under the License.
*/

import ReactiveSwift
import XCTest

@_spi(WorkflowRuntimeConfig) @testable import Workflow

final class WorkflowHostTests: XCTestCase {
Expand Down Expand Up @@ -58,32 +60,84 @@ final class WorkflowHostTests: XCTestCase {
final class WorkflowHost_EventEmissionTests: XCTestCase {
// Previous versions of Workflow would fatalError under this scenario
func test_event_sent_to_invalidated_sink_during_action_handling() {
let root = Parent()
let host = WorkflowHost(workflow: root)
let host = WorkflowHost(workflow: Parent())
let (lifetime, token) = ReactiveSwift.Lifetime.make()
defer { _ = token }
let initialRendering = host.rendering.value
var observedRenderCount = 0

XCTAssertEqual(initialRendering.eventCount, 0)

let disposable = host.rendering.signal.observeValues { rendering in
XCTAssertEqual(rendering.eventCount, 1)
host
.rendering
.signal
.take(during: lifetime)
.observeValues { rendering in
XCTAssertEqual(rendering.eventCount, 1)

// emit another event using an old rendering
// while the first is still being processed, but
// the workflow that handles the event has been
// removed from the tree
if observedRenderCount == 0 {
initialRendering.eventHandler()
}

// emit another event using an old rendering
// while the first is still being processed, but
// the workflow that handles the event has been
// removed from the tree
if observedRenderCount == 0 {
initialRendering.eventHandler()
observedRenderCount += 1
}

observedRenderCount += 1
}
defer { disposable?.dispose() }

// send an event and cause a re-render
initialRendering.eventHandler()

XCTAssertEqual(observedRenderCount, 1)

drainMainQueueBySpinningRunLoop()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Helpful!


// Ensure the invalidated sink doesn't process the event
let nextRendering = host.rendering.value
XCTAssertEqual(nextRendering.eventCount, 1)
XCTAssertEqual(observedRenderCount, 1)
}

func test_reentrant_event_during_render() {
let host = WorkflowHost(workflow: ReentrancyWorkflow())
let (lifetime, token) = ReactiveSwift.Lifetime.make()
defer { _ = token }
let initialRendering = host.rendering.value

var emitReentrantEvent = false

let renderExpectation = expectation(description: "render")
renderExpectation.expectedFulfillmentCount = 2

host
.rendering
.signal
.take(during: lifetime)
.observeValues { val in
defer { renderExpectation.fulfill() }
defer { emitReentrantEvent = true }
guard !emitReentrantEvent else { return }

// In a prior implementation, this would check state local
// to the underlying EventPipe and defer event handling
// into the future. If the RunLoop was spun after that
// point, the action could attempt to be handled and an
// we'd hit a trap when sending a sink an action in an
// invalid state.
//
// 'Real world' code could hit this case as there are some
// UI bindings that fire when a rendering/output is updated
// that call into system API that do sometimes spin the
// RunLoop manually (e.g. stuff calling into WebKit).
initialRendering.sink.send(.event)
drainMainQueueBySpinningRunLoop()
}

// Send an event and cause a re-render
initialRendering.sink.send(.event)

waitForExpectations(timeout: 1)
}
}

Expand Down Expand Up @@ -115,6 +169,35 @@ extension WorkflowHostTests {

// MARK: Utility Types

extension WorkflowHost_EventEmissionTests {
struct ReentrancyWorkflow: Workflow {
typealias State = Void
typealias Output = Never

struct Rendering {
var sink: Sink<Action>!
}

func render(state: Void, context: RenderContext<Self>) -> Rendering {
let sink = context.makeSink(of: Action.self)
return Rendering(sink: sink)
}

enum Action: WorkflowAction {
typealias WorkflowType = ReentrancyWorkflow

case event

func apply(
toState state: inout WorkflowType.State,
context: ApplyContext<WorkflowType>
) -> WorkflowType.Output? {
nil
}
}
}
}

extension WorkflowHost_EventEmissionTests {
struct Parent: Workflow {
struct Rendering {
Expand Down Expand Up @@ -182,3 +265,13 @@ extension WorkflowHost_EventEmissionTests {
}
}
}

private func drainMainQueueBySpinningRunLoop(timeoutSeconds: UInt = 1) {
var done = false
DispatchQueue.main.async { done = true }

let deadline = ContinuousClock.now + .seconds(timeoutSeconds)
while !done, ContinuousClock.now < deadline {
RunLoop.current.run(until: .now.addingTimeInterval(0.01))
}
}
Loading