Skip to content

Commit d9d2f62

Browse files
authored
Mutable state changes for workflow pause. (#8560)
## What changed? **Note**: This depends on temporalio/api#653. Sending it for some early feedback. - Made changes to `WorkflowPauseInfo` (in WorkflowExecutionInfo). Reusing unused proto fields. - Added mutablestate.IsWorkflowExecutionPaused() - Added mutablestate.AddWorkflowExecutionPausedEvent() ## Why? - These changes are needed to implement pause/unpause features. ## How did you test it? - [x] built - [ ] run locally and tested manually - [ ] covered by existing tests - [x] added new unit test(s) - [ ] added new functional test(s) <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Adds workflow-level pause/unpause with new `WorkflowPauseInfo` fields, client endpoints, history event handling, state/status validation for `PAUSED`, and tests; removes activity-level pause info. > > - **Protocol/API**: > - Redefine `persistence.v1.WorkflowPauseInfo` to `{pause_time, identity, reason, request_id}`; remove `ActivityPauseInfo` and related helpers. > - Bump dependency `go.temporal.io/api` and adjust generated code indexes. > - Update RPC metadata, redirection maps, quotas, and log tags for `PauseWorkflowExecution`/`UnpauseWorkflowExecution`. > - **Frontend Clients**: > - Add `PauseWorkflowExecution` and `UnpauseWorkflowExecution` to `client_impl`, `metric_client`, `retryable_client`, and mocks. > - **History/State**: > - Add `WorkflowExecutionPaused` event creation (`Create/Add...PausedEvent`) with buffering rules (paused allowed to buffer; unpaused not buffered). > - Implement `ApplyWorkflowExecutionPausedEvent`: set status `PAUSED`, populate `executionInfo.PauseInfo`, invalidate pending activities and workflow task via stamps. > - Rebuilder applies paused event; state transition validation supports `PAUSED` across CREATED/RUNNING/ZOMBIE. > - **Validation/Tests**: > - Allow `WORKFLOW_EXECUTION_STATUS_PAUSED` in validators; expand unit tests for create/update state/status and paused behavior. > - **Tooling**: > - `buf.yaml` breaking ignore for `executions.proto`. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 951c9c4. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 2c9211f commit d9d2f62

29 files changed

+922
-381
lines changed

api/persistence/v1/executions.go-helpers.pb.go

Lines changed: 0 additions & 37 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/persistence/v1/executions.pb.go

Lines changed: 244 additions & 303 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/frontend/client_gen.go

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/frontend/metric_client_gen.go

Lines changed: 28 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/frontend/retryable_client_gen.go

Lines changed: 30 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/api/metadata.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ var (
162162
"UpdateTaskQueueConfig": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone},
163163
"FetchWorkerConfig": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone},
164164
"UpdateWorkerConfig": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone},
165+
"PauseWorkflowExecution": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone},
166+
"UnpauseWorkflowExecution": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone},
165167
}
166168
operatorServiceMetadata = map[string]MethodMetadata{
167169
"AddSearchAttributes": {Scope: ScopeNamespace, Access: AccessAdmin, Polling: PollingNone},

common/log/tag/values.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ var (
1010
WorkflowActionWorkflowTimeout = workflowAction("add-workflow-timeout-event")
1111
WorkflowActionWorkflowTerminated = workflowAction("add-workflow-terminated-event")
1212
WorkflowActionWorkflowContinueAsNew = workflowAction("add-workflow-continue-as-new-event")
13+
WorkflowActionWorkflowPaused = workflowAction("add-workflow-paused-event")
1314

1415
// workflow cancellation / sign / update-options
1516
WorkflowActionWorkflowCancelRequested = workflowAction("add-workflow-cancel-requested-event")

common/persistence/workflow_state_status_validator.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ var (
2323
enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED: {},
2424
enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW: {},
2525
enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT: {},
26+
enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED: {},
2627
}
2728
)
2829

@@ -40,9 +41,14 @@ func ValidateCreateWorkflowStateStatus(
4041
}
4142

4243
// validate workflow state & status
43-
if (state == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED && status == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING) ||
44-
(state != enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED && status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING) {
45-
return serviceerror.NewInternalf("Create workflow with invalid state: %v or status: %v", state, status)
44+
if state == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED {
45+
if status == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING || status == enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED {
46+
return serviceerror.NewInternalf("Create workflow with invalid state: %v or status: %v", state, status)
47+
}
48+
} else {
49+
if status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
50+
return serviceerror.NewInternalf("Create workflow with invalid state: %v or status: %v", state, status)
51+
}
4652
}
4753
return nil
4854
}
@@ -61,9 +67,19 @@ func ValidateUpdateWorkflowStateStatus(
6167
}
6268

6369
// validate workflow state & status
64-
if (state == enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED && status == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING) ||
65-
(state != enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED && status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING) {
66-
return serviceerror.NewInternalf("Update workflow with invalid state: %v or status: %v", state, status)
70+
switch state {
71+
case enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE:
72+
if status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING && status != enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED {
73+
return serviceerror.NewInternalf("Update workflow with invalid state: %v or status: %v", state, status)
74+
}
75+
case enumsspb.WORKFLOW_EXECUTION_STATE_CREATED:
76+
if status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
77+
return serviceerror.NewInternalf("Update workflow with invalid state: %v or status: %v", state, status)
78+
}
79+
default:
80+
if status == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING || status == enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED {
81+
return serviceerror.NewInternalf("Update workflow with invalid state: %v or status: %v", state, status)
82+
}
6783
}
6884
return nil
6985
}

common/persistence/workflow_state_status_validator_test.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,16 @@ func (s *workflowStateStatusSuite) TestCreateWorkflowStateStatus_WorkflowStateCr
4444
enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED,
4545
enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
4646
enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT,
47+
enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED,
4748
}
4849

50+
// Creating workflow in State: CREATED and status: RUNNING is allowed
4951
s.NoError(ValidateCreateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_CREATED, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING))
5052

53+
// Cannot be created in State: COMPLETED and status: {RUNNING ,PAUSED}
54+
s.Error(ValidateCreateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING))
55+
s.Error(ValidateCreateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED))
56+
5157
for _, status := range statuses {
5258
s.NotNil(ValidateCreateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_CREATED, status))
5359
}
@@ -105,24 +111,26 @@ func (s *workflowStateStatusSuite) TestCreateWorkflowStateStatus_WorkflowStateZo
105111
}
106112

107113
func (s *workflowStateStatusSuite) TestUpdateWorkflowStateStatus_WorkflowStateCreated() {
108-
statuses := []enumspb.WorkflowExecutionStatus{
114+
disallowedStatuses := []enumspb.WorkflowExecutionStatus{
109115
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
110116
enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
111117
enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED,
112118
enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED,
113119
enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW,
114120
enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT,
121+
enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED,
115122
}
116123

124+
// Updating workflow to State: CREATED and status: RUNNING is allowed
117125
s.NoError(ValidateUpdateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_CREATED, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING))
118126

119-
for _, status := range statuses {
127+
for _, status := range disallowedStatuses {
120128
s.Error(ValidateUpdateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_CREATED, status))
121129
}
122130
}
123131

124132
func (s *workflowStateStatusSuite) TestUpdateWorkflowStateStatus_WorkflowStateRunning() {
125-
statuses := []enumspb.WorkflowExecutionStatus{
133+
disallowedStatuses := []enumspb.WorkflowExecutionStatus{
126134
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
127135
enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
128136
enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED,
@@ -131,15 +139,17 @@ func (s *workflowStateStatusSuite) TestUpdateWorkflowStateStatus_WorkflowStateRu
131139
enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT,
132140
}
133141

142+
// Updating workflow to State: RUNNING and status: {RUNNING, PAUSED} is allowed
134143
s.NoError(ValidateUpdateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING))
144+
s.NoError(ValidateUpdateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED))
135145

136-
for _, status := range statuses {
146+
for _, status := range disallowedStatuses {
137147
s.Error(ValidateUpdateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, status))
138148
}
139149
}
140150

141151
func (s *workflowStateStatusSuite) TestUpdateWorkflowStateStatus_WorkflowStateCompleted() {
142-
statuses := []enumspb.WorkflowExecutionStatus{
152+
allowedStatuses := []enumspb.WorkflowExecutionStatus{
143153
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
144154
enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
145155
enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED,
@@ -148,15 +158,17 @@ func (s *workflowStateStatusSuite) TestUpdateWorkflowStateStatus_WorkflowStateCo
148158
enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT,
149159
}
150160

161+
// Updating workflow to State: COMPLETED and status: {RUNNING, PAUSED} is *not* allowed
151162
s.Error(ValidateUpdateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING))
163+
s.Error(ValidateUpdateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED))
152164

153-
for _, status := range statuses {
165+
for _, status := range allowedStatuses {
154166
s.NoError(ValidateUpdateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, status))
155167
}
156168
}
157169

158170
func (s *workflowStateStatusSuite) TestUpdateWorkflowStateStatus_WorkflowStateZombie() {
159-
statuses := []enumspb.WorkflowExecutionStatus{
171+
disallowedStatuses := []enumspb.WorkflowExecutionStatus{
160172
enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED,
161173
enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
162174
enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED,
@@ -165,9 +177,11 @@ func (s *workflowStateStatusSuite) TestUpdateWorkflowStateStatus_WorkflowStateZo
165177
enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT,
166178
}
167179

180+
// Updating workflow to State: ZOMBIE and status: {RUNNING, PAUSED} is allowed
168181
s.NoError(ValidateUpdateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE, enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING))
182+
s.NoError(ValidateUpdateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE, enumspb.WORKFLOW_EXECUTION_STATUS_PAUSED))
169183

170-
for _, status := range statuses {
184+
for _, status := range disallowedStatuses {
171185
s.Error(ValidateUpdateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE, status))
172186
}
173187
}

common/rpc/interceptor/logtags/workflow_service_server_gen.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)