Search before asking
I had searched in the DSIP and found no similar DSIP.
Motivation
Right now, the thread model and state control in the master are very complicated. Both come from the original code and design, and they make the project hard to maintain.
Concurrent modification of state
Multiple places can modify a workflow instance state in the master.
Right now we have an RPC thread pool, a failover daemon thread, a WorkflowExecuteThreadPool, and a TaskExecuteThreadPool. All of them might modify the workflow state or metadata, and none of these modifications is atomic, which means the state of a workflow/task can easily become inconsistent.
e.g. If a task succeeds at the same moment we stop the workflow, the task state might be set to success first and then changed to stop, since the task state update is not atomic.
Furthermore, the API server also modifies the workflow state. For example, when we perform a pause or kill operation, the API server changes the workflow state to READY_PAUSE/READY_STOP and then sends a request to the master by RPC; if the RPC fails, the state is left in an unexpected condition.
No end-to-end task state consistency
Once the master receives a pause/kill request, it pauses/kills the task in the DB first and then sends a pause/kill request to the task executor. In a distributed environment that request might fail, be rejected, or be lost.
The task state then becomes inconsistent between the master and the task executor.
e.g. When we stop a workflow, its tasks should be killed, but if the master fails to send the kill request to the worker, the worker fails to handle it, or the worker fails over, the status will be killed in the DB while the task is still running on the worker or a remote server.
State control is very complex
There is no state machine in the master; state transitions rely on a lot of if-else. There are a lot of bugs, and it's impossible to write UTs for the state transitions. Most of the time, contributors fix one bug but introduce several new ones.
Adding a new state is practically impossible, because the modification scope would be the whole master.
Failover will cause a master crash
The master doesn't use paged queries to find the workflow instances that need failover; in some stress tests, the master goes straight to OOM once it begins master failover or global failover.
The global failover check is too frequent. Every global failover puts heavy pressure on the DB, since it needs to scan the whole workflow instance table, yet it is only needed when the master first starts up.
More seriously, we do failover in the registry client thread, which can block the registry client, since failover takes a lot of time.
This DSIP hopes to refactor the master and fix these problems.
Design Detail
The final architecture of the master might look like the diagram below.
Each workflow instance at runtime is loaded as a WorkflowExecutionRunnable, and each task in the WorkflowExecutionRunnable is represented as a TaskExecutionRunnable. All changes to workflows and tasks are triggered by lifecycle events.
WorkflowEventBus
The WorkflowEventBus is an event channel that belongs to a specific workflow instance. Each workflow instance has its own EventBus; all operations that affect the workflow/task execution are transformed into lifecycle events and put into the EventBus in order of arrival.
This makes it easy for the engine to handle events correctly, since all events belonging to a workflow are handled sequentially by a single thread.
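As a rough sketch only (not the actual implementation; ILifecycleEvent is an assumed marker interface for the event types below), the per-workflow bus can be as simple as a queue that is only drained by the single worker that owns it:

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Sketch only: one event queue per workflow instance, drained by exactly one EventBusFireWorker,
    // so events of the same workflow are always handled sequentially and in arrival order.
    public class WorkflowEventBusSketch {

        private final Queue<ILifecycleEvent> events = new ConcurrentLinkedQueue<>();

        // Producers (RPC handlers, failover, API operations) only enqueue lifecycle events here.
        public void publish(final ILifecycleEvent event) {
            events.offer(event);
        }

        // Called only by the single worker that owns this bus.
        public ILifecycleEvent poll() {
            return events.poll();
        }

        public boolean isEmpty() {
            return events.isEmpty();
        }
    }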
WorkflowEventBusCoordinator
The WorkflowEventBusCoordinator is responsible for managing the EventBuses and assigning a worker thread to each EventBus.
There is a configurable number of workers in the WorkflowEventBusCoordinator. After a WorkflowExecutionRunnable is created, it is assigned to an EventBusFireWorker; each WorkflowEventBus is assigned to exactly one EventBusFireWorker, and one EventBusFireWorker handles multiple WorkflowEventBuses in DFS order.
The worker number in the WorkflowEventBusCoordinator is the only configuration you need to care about, and it affects the performance of the master. A larger number of workers does not necessarily mean better performance, since it increases OS thread context switching. Most event handling relies on DB connections; the rest is in-memory work and very fast, and only a few events rely on RPC. So it's better not to use more than twice the number of threads in your service's database connection pool, and you should run some stress tests if you want to find the best configuration.
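A minimal sketch of the assignment idea, assuming the bus is pinned to a worker by hashing the workflow instance id; the method and field names here are illustrative, not the actual API:

    // Sketch: a fixed pool of EventBusFireWorkers; each WorkflowEventBus is always served by the same
    // worker, so a single workflow never sees concurrent event handling.
    public class WorkflowEventBusCoordinatorSketch {

        private final EventBusFireWorker[] fireWorkers;

        public WorkflowEventBusCoordinatorSketch(final int eventBusFireWorkerNum) {
            this.fireWorkers = new EventBusFireWorker[eventBusFireWorkerNum];
            for (int i = 0; i < eventBusFireWorkerNum; i++) {
                fireWorkers[i] = new EventBusFireWorker();
            }
        }

        // Illustrative: bind the bus of a newly created WorkflowExecutionRunnable to exactly one worker.
        public void registerWorkflowEventBus(final int workflowInstanceId, final WorkflowEventBus workflowEventBus) {
            final int index = Math.floorMod(workflowInstanceId, fireWorkers.length);
            fireWorkers[index].addWorkflowEventBus(workflowEventBus);
        }
    }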
WorkflowLifecycleEvent
The workflow lifecycle event represents an operation that might happen during the workflow runtime.
WorkflowLifecycleEventType
    public enum WorkflowLifecycleEventType implements ILifecycleEventType {

        /** Start the workflow instance. */
        START,

        /** Notify the workflow instance that a task has finished and the DAG topology logic transaction should run. */
        TOPOLOGY_LOGICAL_TRANSACTION_WITH_TASK_FINISH,

        /** Pause the workflow instance. */
        PAUSE,

        /** The workflow instance has been paused. */
        PAUSED,

        /** Stop the workflow instance. */
        STOP,

        /** The workflow instance has been stopped. */
        STOPPED,

        /** The workflow instance has succeeded. */
        SUCCEED,

        /** The workflow instance has failed. */
        FAILED,

        /** Finalize the workflow instance. */
        FINALIZE,
    }
Not every lifecycle event is related to a workflow state; the lifecycle events do not correspond one-to-one with states.
Some lifecycle changes affect the state, while others are internal context transitions.
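For illustration only (the accessor and factory names here are assumptions, not the final API), an external pause operation would not mutate the workflow state directly; it would only publish an event, and the state action of the current state decides the outcome:

    // Sketch: the pause request is translated into a lifecycle event and enqueued on the workflow's bus.
    // Whether it leads to READY_PAUSE or is ignored is decided later by the matching state action.
    workflowExecutionRunnable.getWorkflowEventBus()
            .publish(WorkflowPauseLifecycleEvent.of(workflowExecutionRunnable));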
TaskLifecycleEvent
TaskLifecycleEventType
    public enum TaskLifecycleEventType implements ILifecycleEventType {

        /** Start the Task instance. */
        START,

        /** Dispatch the task instance to target. */
        DISPATCH,

        /** The task instance is dispatched to the target executor server. */
        DISPATCHED,

        /**
         * The task instance is running at the target executor server.
         * // todo: maybe we can remove this event, once the task has been dispatched it should start
         */
        RUNNING,

        /** Do Timeout strategy of the task instance. */
        TIMEOUT,

        /** Retry the task instance. */
        RETRY,

        /** Pause the task instance. */
        PAUSE,

        /** The task instance is paused. */
        PAUSED,

        /** Failover the task instance. */
        FAILOVER,

        /** Kill the task instance. */
        KILL,

        /** The task instance is killed. */
        KILLED,

        /** The task instance is successful. */
        SUCCEEDED,

        /** The task instance is failed. */
        FAILED,
        ;
    }
WorkflowExecutionGraph VS WorkflowGraph
WorkflowExecutionGraph represents the real, expected DAG at runtime; it records the runtime state of each task chain. You can think of it as a physical graph.
WorkflowGraph represents the original DAG before you trigger a workflow; it only records the original DAG context. You can think of it as a logical graph.
e.g. If we have a DAG (A, B, C) and we run from B, the WorkflowExecutionGraph will contain only one task.
And if we run from A, the WorkflowExecutionGraph will have the same DAG as the WorkflowGraph. In addition, the WorkflowExecutionGraph contains more runtime data; if we want to know the state of the workflow at runtime, we can query the WorkflowExecutionGraph directly.
IWorkflowExecutionGraph
    /**
     * The workflow execution graph represents the real DAG in runtime, it might be a sub DAG of the workflow DAG.
     *
     * @see WorkflowExecutionGraph
     */
    public interface IWorkflowExecutionGraph {

        /** Add a new task to the graph. */
        void addNode(TaskExecutionRunnable taskExecutionRunnable);

        /**
         * Add a new edge to the graph.
         * <p> Right now, this method is called after all the tasks are added to the graph.
         */
        void addEdge(String fromTaskName, Set<String> toTaskName);

        /** Return the start tasks; the start tasks in the workflow execution graph are the tasks whose predecessors are empty. */
        List<ITaskExecutionRunnable> getStartNodes();

        /** Get the predecessor tasks of the given task. */
        List<ITaskExecutionRunnable> getPredecessors(String taskName);

        /** Return the successor tasks of the given task. */
        List<ITaskExecutionRunnable> getSuccessors(String taskName);

        /** Return the successor tasks of the given task. */
        List<ITaskExecutionRunnable> getSuccessors(ITaskExecutionRunnable taskExecutionRunnable);

        /** Get the ITaskExecutionRunnable by task name. */
        ITaskExecutionRunnable getTaskExecutionRunnableByName(String taskName);

        /** Get the ITaskExecutionRunnable by task instance id. */
        ITaskExecutionRunnable getTaskExecutionRunnableById(Integer taskInstanceId);

        /** Get the ITaskExecutionRunnable by task code. */
        ITaskExecutionRunnable getTaskExecutionRunnableByTaskCode(Long taskCode);

        /**
         * Get the active TaskExecutionRunnable list.
         * <p> An active TaskExecutionRunnable means the task is being handled in the workflow execution graph.
         */
        List<ITaskExecutionRunnable> getActiveTaskExecutionRunnable();

        /** Get all the TaskExecutionRunnable in the graph, including both active and inactive ones. */
        List<ITaskExecutionRunnable> getAllTaskExecutionRunnable();

        /**
         * Check whether the given task can be triggered now.
         * <p> The task can be triggered if all the predecessors are finished and no predecessor is failure/pause/kill.
         */
        boolean isTriggerConditionMet(ITaskExecutionRunnable taskExecutionRunnable);

        /**
         * Mark the TaskExecutionRunnable as active.
         * <p> An active TaskExecutionRunnable means the task is being handled by the workflow.
         * <p> Once we begin to handle a task, we should mark its TaskExecutionRunnable active.
         */
        void markTaskExecutionRunnableActive(ITaskExecutionRunnable taskExecutionRunnable);

        /**
         * Mark the TaskExecutionRunnable as inactive.
         * <p> An inactive TaskExecutionRunnable means the task is not being handled by the workflow.
         * <p> Once we finish handling a task, we should mark its TaskExecutionRunnable inactive.
         */
        void markTaskExecutionRunnableInActive(ITaskExecutionRunnable taskExecutionRunnable);

        /**
         * Mark the TaskExecutionRunnable as skipped.
         * <p> Once the TaskExecutionRunnable is marked as skipped, the task will not be triggered.
         */
        void markTaskSkipped(ITaskExecutionRunnable taskExecutionRunnable);

        /**
         * Mark the task as skipped.
         * <p> Once the task is marked as skipped, it will not be triggered.
         */
        void markTaskSkipped(String taskName);

        /**
         * Mark the TaskExecutionRunnable chain as failure.
         * <p> Once the chain is failure, its successors will not be triggered, and the workflow execution graph might be failure.
         */
        void markTaskExecutionRunnableChainFailure(ITaskExecutionRunnable taskExecutionRunnable);

        /**
         * Mark the TaskExecutionRunnable chain as pause.
         * <p> Once the chain is pause, its successors will not be triggered, and the workflow execution graph might be paused.
         */
        void markTaskExecutionRunnableChainPause(ITaskExecutionRunnable taskExecutionRunnable);

        /**
         * Mark the TaskExecutionRunnable chain as kill.
         * <p> Once the chain is kill, its successors will not be triggered, and the workflow execution graph might be stopped.
         */
        void markTaskExecutionRunnableChainKill(ITaskExecutionRunnable taskExecutionRunnable);

        /** Whether all the TaskExecutionRunnable chains in the graph are finished. */
        boolean isAllTaskExecutionRunnableChainFinish();

        /** Whether all the TaskExecutionRunnable chains in the graph finished with success. */
        boolean isAllTaskExecutionRunnableChainSuccess();

        /** Whether there exists a TaskExecutionRunnable chain in the graph that finished with failure. */
        boolean isExistFailureTaskExecutionRunnableChain();

        /** Whether there exists a TaskExecutionRunnable chain in the graph that finished with pause. */
        boolean isExistPauseTaskExecutionRunnableChain();

        /** Whether there exists a TaskExecutionRunnable chain in the graph that finished with kill. */
        boolean isExistKillTaskExecutionRunnableChain();

        /**
         * Check whether the given task is the end of a task chain.
         * <p> If the given task has no successor, it is the end of a task chain.
         */
        boolean isEndOfTaskChain(String taskName);

        /**
         * Check whether the given task is the end of a task chain.
         * <p> If the given task has no successor, it is the end of a task chain.
         */
        boolean isEndOfTaskChain(ITaskExecutionRunnable taskExecutionRunnable);

        /**
         * Whether the given task is skipped.
         * <p> Once we mark the task as skipped, the task will not be triggered, but its successors will be triggered.
         */
        boolean isTaskExecutionRunnableSkipped(ITaskExecutionRunnable taskExecutionRunnable);

        /**
         * Whether the given task is forbidden.
         * <p> Once the task is forbidden, it will be passed over, and its successors will be triggered.
         */
        boolean isTaskExecutionRunnableForbidden(ITaskExecutionRunnable taskExecutionRunnable);

        /**
         * Whether all predecessor tasks are skipped.
         * <p> Once all predecessors are marked as skipped, the task will be marked as skipped, and its successors will be triggered.
         */
        boolean isAllPredecessorsSkipped(ITaskExecutionRunnable taskExecutionRunnable);

        /** Whether all successor tasks are condition tasks. */
        boolean isAllSuccessorsAreConditionTask(ITaskExecutionRunnable taskExecutionRunnable);
    }
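A hypothetical usage sketch of the interface above, for a run whose execution graph is B -> C; the concrete WorkflowExecutionGraph class is named in the proposal, while the pre-built taskB/taskC runnables are assumptions made for the example:

    import java.util.List;
    import java.util.Set;

    // Sketch: nodes are added first, edges afterwards, as the interface comment above describes.
    // taskB and taskC are assumed to be TaskExecutionRunnable instances for tasks "B" and "C".
    final IWorkflowExecutionGraph graph = new WorkflowExecutionGraph();
    graph.addNode(taskB);
    graph.addNode(taskC);
    graph.addEdge("B", Set.of("C"));

    // "B" has no predecessors, so it is the start node of this execution graph.
    final List<ITaskExecutionRunnable> startNodes = graph.getStartNodes();

    // "C" can only be triggered once "B" has finished successfully.
    final boolean canTriggerC = graph.isTriggerConditionMet(taskC);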
StateMachine
Use a StateMachine to control the workflow and task state changes.
WorkflowStateMachine
The WorkflowStateMachine is used to control the state transitions of a workflow.
Each workflow state implements IWorkflowStateAction, which provides the functions for handling each WorkflowLifecycleEvent.
IWorkflowStateAction
    /**
     * Represents the action to be taken when a workflow is in a certain state and receives a target event.
     * <p> Each {@link WorkflowExecutionStatus} should have a corresponding {@link IWorkflowStateAction} implementation.
     *
     * @see WorkflowRunningStateAction
     * @see WorkflowReadyPauseStateAction
     * @see WorkflowPausedStateAction
     * @see WorkflowReadyStopStateAction
     * @see WorkflowStoppedStateAction
     * @see WorkflowSerialWaitStateAction
     * @see WorkflowFailedStateAction
     * @see WorkflowSuccessStateAction
     * @see WorkflowFailoverStateAction
     * @see WorkflowWaitToRunStateAction
     */
    public interface IWorkflowStateAction {

        /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowStartLifecycleEvent}. */
        void startEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                              final WorkflowStartLifecycleEvent workflowStartEvent);

        /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent}. */
        void topologyLogicalTransitionEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                                                  final WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent workflowTopologyLogicalTransitionWithTaskFinishEvent);

        /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowPauseLifecycleEvent}. */
        void pauseEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                              final WorkflowPauseLifecycleEvent workflowPauseEvent);

        /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowPausedLifecycleEvent}. */
        void pausedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                               final WorkflowPausedLifecycleEvent workflowPausedEvent);

        /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowStopLifecycleEvent}. */
        void stopEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                             final WorkflowStopLifecycleEvent workflowStopEvent);

        /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowStoppedLifecycleEvent}. */
        void stoppedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                                final WorkflowStoppedLifecycleEvent workflowStoppedEvent);

        /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowSucceedLifecycleEvent}. */
        void succeedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                                final WorkflowSucceedLifecycleEvent workflowSucceedEvent);

        /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowFailedLifecycleEvent}. */
        void failedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                               final WorkflowFailedLifecycleEvent workflowFailedEvent);

        /** Perform the necessary actions when the workflow in a certain state receives a {@link WorkflowFinalizeLifecycleEvent}. */
        void finalizeEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                                 final WorkflowFinalizeLifecycleEvent workflowFinalizeEvent);

        /** Get the {@link WorkflowExecutionStatus} that this action matches. */
        WorkflowExecutionStatus matchState();
    }
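A rough sketch of the dispatch idea (the real WorkflowStateMachine may be organized differently, and getState() is an assumed accessor): keep one IWorkflowStateAction per WorkflowExecutionStatus and route every incoming lifecycle event to the action that matches the workflow's current state.

    import java.util.EnumMap;
    import java.util.List;
    import java.util.Map;

    // Sketch: table-driven dispatch by current state instead of scattered if-else branches.
    public class WorkflowStateMachineSketch {

        private final Map<WorkflowExecutionStatus, IWorkflowStateAction> stateActions =
                new EnumMap<>(WorkflowExecutionStatus.class);

        public WorkflowStateMachineSketch(final List<IWorkflowStateAction> actions) {
            actions.forEach(action -> stateActions.put(action.matchState(), action));
        }

        // Route a pause event to the action registered for the workflow's current state;
        // the other lifecycle events would be routed the same way.
        public void onPauseEvent(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                                 final WorkflowPauseLifecycleEvent workflowPauseEvent) {
            final WorkflowExecutionStatus state = workflowExecutionRunnable.getState();   // assumed accessor
            stateActions.get(state).pauseEventAction(workflowExecutionRunnable, workflowPauseEvent);
        }
    }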
TaskStateMachine
The TaskStateMachine is used to control the state transitions of a task.
Each task state implements ITaskStateAction, which provides the functions for handling each TaskLifecycleEvent.
ITaskStateAction
    /**
     * Represents the action to be taken when a task is in a certain state and receives a target event.
     * <p> Each {@link TaskExecutionStatus} should have a corresponding {@link ITaskStateAction} implementation.
     *
     * @see TaskSubmittedStateAction
     * @see TaskDelayExecutionStateAction
     * @see TaskDispatchStateAction
     * @see TaskRunningStateAction
     * @see TaskPauseStateAction
     * @see TaskKillStateAction
     * @see TaskFailureStateAction
     * @see TaskSuccessStateAction
     * @see TaskForceSuccessStateAction
     * @see TaskFailoverStateAction
     */
    public interface ITaskStateAction {

        /**
         * Perform the necessary actions when the task in a certain state receives a {@link TaskStartLifecycleEvent}.
         * <p> This method is called when you want to start a task.
         */
        void startEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                              final ITaskExecutionRunnable taskExecutionRunnable,
                              final TaskStartLifecycleEvent taskStartEvent);

        /**
         * Perform the necessary actions when the task in a certain state receives a {@link TaskRunningLifecycleEvent}.
         * <p> This method is called when the master receives the task running event from the executor.
         */
        void startedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                                final ITaskExecutionRunnable taskExecutionRunnable,
                                final TaskRunningLifecycleEvent taskRunningEvent);

        /**
         * Perform the necessary actions when the task in a certain state receives a {@link TaskRetryLifecycleEvent}.
         * <p> This method is called when the task needs to retry.
         */
        void retryEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                              final ITaskExecutionRunnable taskExecutionRunnable,
                              final TaskRetryLifecycleEvent taskRetryEvent);

        /**
         * Perform the necessary actions when the task in a certain state receives a {@link TaskDispatchLifecycleEvent}.
         * <p> This method is called when you want to dispatch a task.
         */
        void dispatchEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                                 final ITaskExecutionRunnable taskExecutionRunnable,
                                 final TaskDispatchLifecycleEvent taskDispatchEvent);

        /**
         * Perform the necessary actions when the task in a certain state receives a {@link TaskDispatchedLifecycleEvent}.
         * <p> This method is called when the task has been dispatched to the executor.
         */
        void dispatchedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                                   final ITaskExecutionRunnable taskExecutionRunnable,
                                   final TaskDispatchedLifecycleEvent taskDispatchedEvent);

        /**
         * Perform the necessary actions when the task in a certain state receives a {@link TaskPauseLifecycleEvent}.
         * <p> This method is called when you want to pause a task.
         */
        void pauseEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                              final ITaskExecutionRunnable taskExecutionRunnable,
                              final TaskPauseLifecycleEvent taskPauseEvent);

        /**
         * Perform the necessary actions when the task in a certain state receives a {@link TaskPausedLifecycleEvent}.
         * <p> This method is called when the task has been paused.
         */
        void pausedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                               final ITaskExecutionRunnable taskExecutionRunnable,
                               final TaskPausedLifecycleEvent taskPausedEvent);

        /**
         * Perform the necessary actions when the task in a certain state receives a {@link TaskKillLifecycleEvent}.
         * <p> This method is called when you want to kill a task.
         */
        void killEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                             final ITaskExecutionRunnable taskExecutionRunnable,
                             final TaskKillLifecycleEvent taskKillEvent);

        /**
         * Perform the necessary actions when the task in a certain state receives a {@link TaskKilledLifecycleEvent}.
         * <p> This method is called when the task has been killed.
         */
        void killedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                               final ITaskExecutionRunnable taskExecutionRunnable,
                               final TaskKilledLifecycleEvent taskKilledEvent);

        /**
         * Perform the necessary actions when the task in a certain state receives a {@link TaskFailedLifecycleEvent}.
         * <p> This method is called when the task has failed.
         */
        void failedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                               final ITaskExecutionRunnable taskExecutionRunnable,
                               final TaskFailedLifecycleEvent taskFailedEvent);

        /**
         * Perform the necessary actions when the task in a certain state receives a {@link TaskSuccessLifecycleEvent}.
         * <p> This method is called when the task has succeeded.
         */
        void succeedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                                final ITaskExecutionRunnable taskExecutionRunnable,
                                final TaskSuccessLifecycleEvent taskSuccessEvent);

        /**
         * Perform the necessary actions when the task in a certain state receives a {@link TaskFailoverLifecycleEvent}.
         * <p> This method is called when the task needs to failover.
         */
        void failoverEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                                 final ITaskExecutionRunnable taskExecutionRunnable,
                                 final TaskFailoverLifecycleEvent taskFailoverEvent);

        /** Get the {@link TaskExecutionStatus} that this action matches. */
        TaskExecutionStatus matchState();
    }
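As a hedged illustration of how this ties back to the end-to-end consistency goal in the Motivation section, a running-state action could send the kill request on the KILL event and only treat the task as killed once the KILLED event comes back from the executor. The two methods below are taken from ITaskStateAction; the helper calls on the runnable (kill(), getWorkflowEventBus(), the event factory) are assumptions, not the actual API.

    // Sketch only, not the actual TaskRunningStateAction: the master no longer persists the task as
    // killed when it *sends* the request; it waits for the KILLED lifecycle event from the executor.
    public class TaskRunningStateActionSketch {

        public void killEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                                    final ITaskExecutionRunnable taskExecutionRunnable,
                                    final TaskKillLifecycleEvent taskKillEvent) {
            // Ask the executor to kill the task; do NOT update the task state in the DB here.
            taskExecutionRunnable.kill();   // assumed helper that performs the kill RPC
        }

        public void killedEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
                                      final ITaskExecutionRunnable taskExecutionRunnable,
                                      final TaskKilledLifecycleEvent taskKilledEvent) {
            // Only now is the task treated as finished, and the workflow is notified so the
            // DAG topology logic can continue.
            workflowExecutionRunnable.getWorkflowEventBus().publish(
                    WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent.of(taskExecutionRunnable));
        }
    }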
Failover
Failover is triggered through a SystemEventBus.
There are three kinds of failover events in DS.
GlobalMasterFailover
The GlobalMasterFailover is used to fail over the historical workflows in the system. Once a master server starts up, it publishes a GlobalMasterFailoverEvent; the GlobalMasterFailover then scans the unfinished workflow instances in the DB and checks whether they should be failed over, so GlobalMasterFailover can take a lot of time.
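A hedged sketch of this scan; the DAO and helper names are illustrative. The point, per the Motivation section, is to page through the unfinished workflow instances instead of loading the whole table at once:

    // Sketch: handle the GlobalMasterFailoverEvent by paging through unfinished workflow instances,
    // instead of loading them all into memory (which is what currently causes OOM under stress).
    int pageNumber = 1;
    final int pageSize = 100;
    while (true) {
        // Illustrative DAO call: fetch one page of workflow instances that are not yet finished.
        final List<WorkflowInstance> unfinishedPage =
                workflowInstanceDao.queryUnfinishedWorkflowInstances(pageNumber, pageSize);
        if (unfinishedPage.isEmpty()) {
            break;
        }
        for (final WorkflowInstance workflowInstance : unfinishedPage) {
            if (needFailover(workflowInstance)) {             // e.g. its owner master is no longer alive
                failoverWorkflowInstance(workflowInstance);   // illustrative helper
            }
        }
        pageNumber++;
    }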
MasterFailover
The MasterFailover is used to fail over the workflows owned by a crashed master. Once a master crashes, the other active masters receive the master-crashed event and publish a MasterFailoverEvent. If all masters crash, the failover work relies on GlobalMasterFailover.
WorkerFailover
The WorkerFailover is used to fail over the tasks running on a crashed worker. Once a worker crashes, all active masters receive the worker-crashed event and publish a WorkerFailoverEvent. Each master only fails over the tasks it holds that have been dispatched to the crashed worker.
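A hedged sketch of how a master could pick the tasks it needs to fail over when handling a WorkerFailoverEvent, using the active-task view of IWorkflowExecutionGraph; getWorkerAddress(), the surrounding variables, and the event factory are assumptions:

    // Sketch: each master only inspects the active tasks of the WorkflowExecutionRunnables it holds,
    // and fails over those that were dispatched to the crashed worker.
    for (final ITaskExecutionRunnable taskExecutionRunnable : workflowExecutionGraph.getActiveTaskExecutionRunnable()) {
        if (crashedWorkerAddress.equals(taskExecutionRunnable.getWorkerAddress())) {   // assumed accessor
            workflowEventBus.publish(TaskFailoverLifecycleEvent.of(taskExecutionRunnable));
        }
    }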
Compatibility, Deprecation, and Migration Plan
Compatible with the latest version, but some interfaces that are not used by the UI will be deprecated.
Test Plan
Test by UT, E2E, and manual testing.
Master Integration Test
Right now, there is no master integration test; this DSIP will add master integration test (IT) support.