Skip to content

Commit 0b7f8f1

Browse files
committed
Defer updating workflow completion metrics until completion accepted by server
Fixes #2111
1 parent 7c0ebcf commit 0b7f8f1

8 files changed

+165
-73
lines changed

internal/internal_public.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
)
1616

1717
type (
18-
workflowTaskHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error)
18+
workflowTaskHeartbeatFunc func(taskCompletion *workflowTaskCompletion, startTime time.Time) (*workflowTask, error)
1919

2020
// HistoryIterator iterates through history events
2121
HistoryIterator interface {
@@ -32,8 +32,8 @@ type (
3232
WorkflowExecutionContext interface {
3333
Lock()
3434
Unlock(err error)
35-
ProcessWorkflowTask(workflowTask *workflowTask) (completeRequest interface{}, err error)
36-
ProcessLocalActivityResult(workflowTask *workflowTask, lar *localActivityResult) (interface{}, error)
35+
ProcessWorkflowTask(workflowTask *workflowTask) (taskCompletion *workflowTaskCompletion, err error)
36+
ProcessLocalActivityResult(workflowTask *workflowTask, lar *localActivityResult) (*workflowTaskCompletion, error)
3737
// CompleteWorkflowTask tries to complete the current workflow task and gets the response that needs to be sent back to the server.
3838
// The waitLocalActivity is used to control if we should wait for outstanding local activities.
3939
// If there are no outstanding local activities or if waitLocalActivity is false, the complete will return response
@@ -42,7 +42,7 @@ type (
4242
// - RespondWorkflowTaskFailedRequest
4343
// - RespondQueryTaskCompletedRequest
4444
// If waitLocalActivity is true, and there are outstanding local activities, this call will return nil.
45-
CompleteWorkflowTask(workflowTask *workflowTask, waitLocalActivity bool) interface{}
45+
// The result is a pointer so that the documented nil result is representable and so that the
// signature matches the workflowExecutionContextImpl implementation, which returns *workflowTaskCompletion.
CompleteWorkflowTask(workflowTask *workflowTask, waitLocalActivity bool) *workflowTaskCompletion
4646
// GetWorkflowTaskTimeout returns the WorkflowTaskTimeout
4747
GetWorkflowTaskTimeout() time.Duration
4848
GetCurrentWorkflowTask() *workflowservice.PollWorkflowTaskQueueResponse
@@ -65,7 +65,7 @@ type (
6565
task *workflowTask,
6666
ctx *workflowExecutionContextImpl,
6767
f workflowTaskHeartbeatFunc,
68-
) (response interface{}, err error)
68+
) (taskCompletion *workflowTaskCompletion, err error)
6969
}
7070

7171
WorkflowContextManager interface {

internal/internal_task_handlers.go

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"go.temporal.io/api/serviceerror"
2525
taskqueuepb "go.temporal.io/api/taskqueue/v1"
2626
"go.temporal.io/api/workflowservice/v1"
27+
"google.golang.org/protobuf/proto"
2728
"google.golang.org/protobuf/types/known/durationpb"
2829

2930
"go.temporal.io/sdk/internal/common/retry"
@@ -214,6 +215,11 @@ type (
214215
sdkVersion string
215216
sdkName string
216217
}
218+
219+
workflowTaskCompletion struct {
220+
rawRequest proto.Message
221+
applyCompletionMetrics func()
222+
}
217223
)
218224

219225
func newHistory(lastHandledEventID int64, task *workflowTask, eventsHandler *workflowExecutionEventHandlerImpl) *history {
@@ -892,7 +898,7 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
892898
workflowTask *workflowTask,
893899
workflowContext *workflowExecutionContextImpl,
894900
heartbeatFunc workflowTaskHeartbeatFunc,
895-
) (completeRequest interface{}, errRet error) {
901+
) (taskCompletion *workflowTaskCompletion, errRet error) {
896902
if workflowTask == nil || workflowTask.task == nil {
897903
return nil, errors.New("nil workflow task provided")
898904
}
@@ -922,7 +928,7 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
922928
})
923929

924930
var (
925-
response interface{}
931+
response *workflowTaskCompletion
926932
err error
927933
heartbeatTimer *time.Timer
928934
)
@@ -1020,11 +1026,11 @@ processWorkflowLoop:
10201026
}
10211027
}
10221028
errRet = err
1023-
completeRequest = response
1029+
taskCompletion = response
10241030
return
10251031
}
10261032

1027-
func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) {
1033+
func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (*workflowTaskCompletion, error) {
10281034
task := workflowTask.task
10291035
historyIterator := workflowTask.historyIterator
10301036
if err := w.ResetIfStale(task, historyIterator); err != nil {
@@ -1268,15 +1274,15 @@ ProcessEvents:
12681274
return w.applyWorkflowPanicPolicy(workflowTask, workflowError)
12691275
}
12701276

1271-
func (w *workflowExecutionContextImpl) ProcessLocalActivityResult(workflowTask *workflowTask, lar *localActivityResult) (interface{}, error) {
1277+
func (w *workflowExecutionContextImpl) ProcessLocalActivityResult(workflowTask *workflowTask, lar *localActivityResult) (*workflowTaskCompletion, error) {
12721278
if lar.err != nil && w.retryLocalActivity(lar) {
12731279
return nil, nil // nothing to do here as we are retrying...
12741280
}
12751281

12761282
return w.applyWorkflowPanicPolicy(workflowTask, w.getEventHandler().ProcessLocalActivityResult(lar))
12771283
}
12781284

1279-
func (w *workflowExecutionContextImpl) applyWorkflowPanicPolicy(workflowTask *workflowTask, workflowError error) (interface{}, error) {
1285+
func (w *workflowExecutionContextImpl) applyWorkflowPanicPolicy(workflowTask *workflowTask, workflowError error) (*workflowTaskCompletion, error) {
12801286
task := workflowTask.task
12811287

12821288
if workflowError == nil && w.err != nil {
@@ -1398,7 +1404,7 @@ func getRetryBackoffWithNowTime(p *RetryPolicy, attempt int32, err error, now, e
13981404
return backoffInterval
13991405
}
14001406

1401-
func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workflowTask, waitLocalActivities bool) interface{} {
1407+
func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workflowTask, waitLocalActivities bool) *workflowTaskCompletion {
14021408
if w.currentWorkflowTask == nil {
14031409
return nil
14041410
}
@@ -1442,7 +1448,7 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl
14421448
completeRequest := w.wth.completeWorkflow(eventHandler, w.currentWorkflowTask, w, w.newCommands, w.newMessages, !waitLocalActivities)
14431449
w.clearCurrentTask()
14441450

1445-
return completeRequest
1451+
return &completeRequest
14461452
}
14471453

14481454
func (w *workflowExecutionContextImpl) hasPendingLocalActivityWork() bool {
@@ -1824,7 +1830,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
18241830
commands []*commandpb.Command,
18251831
messages []*protocolpb.Message,
18261832
forceNewWorkflowTask bool,
1827-
) interface{} {
1833+
) workflowTaskCompletion {
18281834
// for query task
18291835
if task.Query != nil {
18301836
queryCompletedRequest := &workflowservice.RespondQueryTaskCompletedRequest{
@@ -1835,7 +1841,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
18351841
if errors.As(workflowContext.err, &panicErr) {
18361842
queryCompletedRequest.CompletedType = enumspb.QUERY_RESULT_TYPE_FAILED
18371843
queryCompletedRequest.ErrorMessage = "Workflow panic: " + panicErr.Error()
1838-
return queryCompletedRequest
1844+
return workflowTaskCompletion{rawRequest: queryCompletedRequest}
18391845
}
18401846

18411847
result, err := eventHandler.ProcessQuery(task.Query.GetQueryType(), task.Query.QueryArgs, task.Query.Header)
@@ -1847,27 +1853,25 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
18471853
queryCompletedRequest.CompletedType = enumspb.QUERY_RESULT_TYPE_ANSWERED
18481854
queryCompletedRequest.QueryResult = result
18491855
}
1850-
return queryCompletedRequest
1856+
return workflowTaskCompletion{rawRequest: queryCompletedRequest}
18511857
}
18521858

1853-
metricsHandler := wth.metricsHandler.WithTags(metrics.WorkflowTags(
1854-
eventHandler.workflowEnvironmentImpl.workflowInfo.WorkflowType.Name))
1855-
18561859
// complete workflow task
18571860
var closeCommand *commandpb.Command
18581861
var canceledErr *CanceledError
18591862
var contErr *ContinueAsNewError
1863+
var metricCounterToIncrement string
18601864

18611865
if errors.As(workflowContext.err, &canceledErr) {
18621866
// Workflow canceled
1863-
metricsHandler.Counter(metrics.WorkflowCanceledCounter).Inc(1)
1867+
metricCounterToIncrement = metrics.WorkflowCanceledCounter
18641868
closeCommand = createNewCommand(enumspb.COMMAND_TYPE_CANCEL_WORKFLOW_EXECUTION)
18651869
closeCommand.Attributes = &commandpb.Command_CancelWorkflowExecutionCommandAttributes{CancelWorkflowExecutionCommandAttributes: &commandpb.CancelWorkflowExecutionCommandAttributes{
18661870
Details: convertErrDetailsToPayloads(canceledErr.details, wth.dataConverter),
18671871
}}
18681872
} else if errors.As(workflowContext.err, &contErr) {
18691873
// Continue as new error.
1870-
metricsHandler.Counter(metrics.WorkflowContinueAsNewCounter).Inc(1)
1874+
metricCounterToIncrement = metrics.WorkflowContinueAsNewCounter
18711875
closeCommand = createNewCommand(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION)
18721876

18731877
// ContinueAsNewError.RetryPolicy is optional.
@@ -1894,7 +1898,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
18941898
} else if workflowContext.err != nil {
18951899
// Workflow failures
18961900
if !isBenignApplicationError(workflowContext.err) {
1897-
metricsHandler.Counter(metrics.WorkflowFailedCounter).Inc(1)
1901+
metricCounterToIncrement = metrics.WorkflowFailedCounter
18981902
}
18991903
closeCommand = createNewCommand(enumspb.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION)
19001904
failure := wth.failureConverter.ErrorToFailure(workflowContext.err)
@@ -1903,7 +1907,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
19031907
}}
19041908
} else if workflowContext.isWorkflowCompleted {
19051909
// Workflow completion
1906-
metricsHandler.Counter(metrics.WorkflowCompletedCounter).Inc(1)
1910+
metricCounterToIncrement = metrics.WorkflowCompletedCounter
19071911
closeCommand = createNewCommand(enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION)
19081912
closeCommand.Attributes = &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
19091913
Result: workflowContext.result,
@@ -1912,8 +1916,6 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
19121916

19131917
if closeCommand != nil {
19141918
commands = append(commands, closeCommand)
1915-
elapsed := time.Since(workflowContext.workflowInfo.WorkflowStartTime)
1916-
metricsHandler.Timer(metrics.WorkflowEndToEndLatency).Record(elapsed)
19171919
forceNewWorkflowTask = false
19181920
}
19191921

@@ -1992,7 +1994,22 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
19921994
builtRequest.VersioningBehavior = versioningBehaviorToProto(wth.defaultVersioningBehavior)
19931995
}
19941996
}
1995-
return builtRequest
1997+
1998+
// Return request and a function that will update certain metrics
1999+
metricsHandler := wth.metricsHandler.WithTags(metrics.WorkflowTags(
2000+
eventHandler.workflowEnvironmentImpl.workflowInfo.WorkflowType.Name))
2001+
return workflowTaskCompletion{
2002+
rawRequest: builtRequest,
2003+
applyCompletionMetrics: func() {
2004+
if metricCounterToIncrement != "" {
2005+
metricsHandler.Counter(metricCounterToIncrement).Inc(1)
2006+
}
2007+
if closeCommand != nil {
2008+
elapsed := time.Since(workflowContext.workflowInfo.WorkflowStartTime)
2009+
metricsHandler.Timer(metrics.WorkflowEndToEndLatency).Record(elapsed)
2010+
}
2011+
},
2012+
}
19962013
}
19972014

19982015
func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *historypb.HistoryEvent, isInReplay bool) error {

internal/internal_task_handlers_interfaces_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@ func (wth sampleWorkflowTaskHandler) ProcessWorkflowTask(
3232
workflowTask *workflowTask,
3333
_ *workflowExecutionContextImpl,
3434
_ workflowTaskHeartbeatFunc,
35-
) (interface{}, error) {
36-
return &workflowservice.RespondWorkflowTaskCompletedRequest{
35+
) (*workflowTaskCompletion, error) {
36+
return &workflowTaskCompletion{rawRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
3737
TaskToken: workflowTask.task.TaskToken,
38-
}, nil
38+
}}, nil
3939
}
4040

4141
func (wth sampleWorkflowTaskHandler) GetOrCreateWorkflowContext(
@@ -111,7 +111,7 @@ func (s *PollLayerInterfacesTestSuite) TestProcessWorkflowTaskInterface() {
111111
// Process task and respond to the service.
112112
taskHandler := newSampleWorkflowTaskHandler()
113113
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: response}, nil, nil)
114-
completionRequest := request.(*workflowservice.RespondWorkflowTaskCompletedRequest)
114+
completionRequest := request.rawRequest.(*workflowservice.RespondWorkflowTaskCompletedRequest)
115115
s.NoError(err)
116116

117117
_, err = s.service.RespondWorkflowTaskCompleted(ctx, completionRequest)

0 commit comments

Comments
 (0)