Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion internal/common/metrics/capturing_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,17 @@ func (c *CapturedGauge) Value() float64 {
type CapturedTimer struct {
// Embedded metric metadata; its definition is outside this view.
CapturedMetricMeta
// value holds the most recently recorded duration as an int64 nanosecond
// count; Record and Value access it through sync/atomic.
value int64
// count is incremented once per Record call; Record and Count access it
// through sync/atomic.
count int64
}

// Record implements Timer.Record.
// It atomically overwrites the stored value with d, so only the latest
// recorded duration is retained.
func (c *CapturedTimer) Record(d time.Duration) { atomic.StoreInt64(&c.value, int64(d)) }
// Record implements Timer.Record. It stores d as the latest value and then
// increments the record count. Each update is individually atomic; the value
// is written first, then the count is bumped.
func (c *CapturedTimer) Record(d time.Duration) {
	nanos := int64(d)
	atomic.StoreInt64(&c.value, nanos)
	atomic.AddInt64(&c.count, 1)
}

// Value atomically loads the most recently recorded duration.
func (c *CapturedTimer) Value() time.Duration {
	nanos := atomic.LoadInt64(&c.value)
	return time.Duration(nanos)
}

// Count atomically loads the counter that Record increments on every call.
func (c *CapturedTimer) Count() int64 {
	recorded := atomic.LoadInt64(&c.count)
	return recorded
}
10 changes: 5 additions & 5 deletions internal/internal_public.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type (
workflowTaskHeartbeatFunc func(response interface{}, startTime time.Time) (*workflowTask, error)
workflowTaskHeartbeatFunc func(taskCompletion *workflowTaskCompletion, startTime time.Time) (*workflowTask, error)

// HistoryIterator iterator through history events
HistoryIterator interface {
Expand All @@ -32,8 +32,8 @@ type (
WorkflowExecutionContext interface {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in internal_public, but I don't see it referenced in internalbindings, and I don't see any side effects from changing this interface (some methods already accepted parameters that couldn't be instantiated externally anyway).

@Quinn-With-Two-Ns - any concerns with the altering of this interface?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this interface even used? My IDE can't find a use of it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, I doubt it is

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 don't see it being used anywhere

Lock()
Unlock(err error)
ProcessWorkflowTask(workflowTask *workflowTask) (completeRequest interface{}, err error)
ProcessLocalActivityResult(workflowTask *workflowTask, lar *localActivityResult) (interface{}, error)
ProcessWorkflowTask(workflowTask *workflowTask) (taskCompletion *workflowTaskCompletion, err error)
ProcessLocalActivityResult(workflowTask *workflowTask, lar *localActivityResult) (*workflowTaskCompletion, error)
// CompleteWorkflowTask tries to complete the current workflow task and get the response that needs to be sent back to the server.
// The waitLocalActivity flag controls whether we should wait for outstanding local activities.
// If there are no outstanding local activities or if waitLocalActivity is false, the complete will return response
Expand All @@ -42,7 +42,7 @@ type (
// - RespondWorkflowTaskFailedRequest
// - RespondQueryTaskCompletedRequest
// If waitLocalActivity is true, and there are outstanding local activities, this call will return nil.
CompleteWorkflowTask(workflowTask *workflowTask, waitLocalActivity bool) interface{}
// The result is a pointer so that nil can signal that there is no completion
// to send (for example while local activities are still outstanding); a
// struct value could never be nil.
CompleteWorkflowTask(workflowTask *workflowTask, waitLocalActivity bool) *workflowTaskCompletion
// GetWorkflowTaskTimeout returns the WorkflowTaskTimeout
GetWorkflowTaskTimeout() time.Duration
GetCurrentWorkflowTask() *workflowservice.PollWorkflowTaskQueueResponse
Expand All @@ -65,7 +65,7 @@ type (
task *workflowTask,
ctx *workflowExecutionContextImpl,
f workflowTaskHeartbeatFunc,
) (response interface{}, err error)
) (taskCompletion *workflowTaskCompletion, err error)
}

WorkflowContextManager interface {
Expand Down
59 changes: 38 additions & 21 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"

"go.temporal.io/sdk/internal/common/retry"
Expand Down Expand Up @@ -214,6 +215,11 @@ type (
sdkVersion string
sdkName string
}

// workflowTaskCompletion pairs the request produced by processing a workflow
// task with an optional callback that emits workflow completion metrics.
workflowTaskCompletion struct {
// rawRequest is the protobuf request built for the task, e.g. a
// RespondQueryTaskCompletedRequest for query tasks.
rawRequest proto.Message
// applyCompletionMetrics, when set, increments the workflow close counter
// chosen during completion (if any) and records the end-to-end latency
// timer when a close command was produced. It is left nil for query task
// completions.
// NOTE(review): presumably the caller invokes this only after the request
// has been accepted by the server — confirm against the call site.
applyCompletionMetrics func()
}
)

func newHistory(lastHandledEventID int64, task *workflowTask, eventsHandler *workflowExecutionEventHandlerImpl) *history {
Expand Down Expand Up @@ -892,7 +898,7 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
workflowTask *workflowTask,
workflowContext *workflowExecutionContextImpl,
heartbeatFunc workflowTaskHeartbeatFunc,
) (completeRequest interface{}, errRet error) {
) (taskCompletion *workflowTaskCompletion, errRet error) {
if workflowTask == nil || workflowTask.task == nil {
return nil, errors.New("nil workflow task provided")
}
Expand Down Expand Up @@ -922,7 +928,7 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask(
})

var (
response interface{}
response *workflowTaskCompletion
err error
heartbeatTimer *time.Timer
)
Expand Down Expand Up @@ -1020,11 +1026,11 @@ processWorkflowLoop:
}
}
errRet = err
completeRequest = response
taskCompletion = response
return
}

func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (interface{}, error) {
func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflowTask) (*workflowTaskCompletion, error) {
task := workflowTask.task
historyIterator := workflowTask.historyIterator
if err := w.ResetIfStale(task, historyIterator); err != nil {
Expand Down Expand Up @@ -1268,15 +1274,15 @@ ProcessEvents:
return w.applyWorkflowPanicPolicy(workflowTask, workflowError)
}

func (w *workflowExecutionContextImpl) ProcessLocalActivityResult(workflowTask *workflowTask, lar *localActivityResult) (interface{}, error) {
func (w *workflowExecutionContextImpl) ProcessLocalActivityResult(workflowTask *workflowTask, lar *localActivityResult) (*workflowTaskCompletion, error) {
if lar.err != nil && w.retryLocalActivity(lar) {
return nil, nil // nothing to do here as we are retrying...
}

return w.applyWorkflowPanicPolicy(workflowTask, w.getEventHandler().ProcessLocalActivityResult(lar))
}

func (w *workflowExecutionContextImpl) applyWorkflowPanicPolicy(workflowTask *workflowTask, workflowError error) (interface{}, error) {
func (w *workflowExecutionContextImpl) applyWorkflowPanicPolicy(workflowTask *workflowTask, workflowError error) (*workflowTaskCompletion, error) {
task := workflowTask.task

if workflowError == nil && w.err != nil {
Expand Down Expand Up @@ -1398,7 +1404,7 @@ func getRetryBackoffWithNowTime(p *RetryPolicy, attempt int32, err error, now, e
return backoffInterval
}

func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workflowTask, waitLocalActivities bool) interface{} {
func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workflowTask, waitLocalActivities bool) *workflowTaskCompletion {
if w.currentWorkflowTask == nil {
return nil
}
Expand Down Expand Up @@ -1442,7 +1448,7 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl
completeRequest := w.wth.completeWorkflow(eventHandler, w.currentWorkflowTask, w, w.newCommands, w.newMessages, !waitLocalActivities)
w.clearCurrentTask()

return completeRequest
return &completeRequest
}

func (w *workflowExecutionContextImpl) hasPendingLocalActivityWork() bool {
Expand Down Expand Up @@ -1824,7 +1830,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
commands []*commandpb.Command,
messages []*protocolpb.Message,
forceNewWorkflowTask bool,
) interface{} {
) workflowTaskCompletion {
// for query task
if task.Query != nil {
queryCompletedRequest := &workflowservice.RespondQueryTaskCompletedRequest{
Expand All @@ -1835,7 +1841,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
if errors.As(workflowContext.err, &panicErr) {
queryCompletedRequest.CompletedType = enumspb.QUERY_RESULT_TYPE_FAILED
queryCompletedRequest.ErrorMessage = "Workflow panic: " + panicErr.Error()
return queryCompletedRequest
return workflowTaskCompletion{rawRequest: queryCompletedRequest}
}

result, err := eventHandler.ProcessQuery(task.Query.GetQueryType(), task.Query.QueryArgs, task.Query.Header)
Expand All @@ -1847,27 +1853,25 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
queryCompletedRequest.CompletedType = enumspb.QUERY_RESULT_TYPE_ANSWERED
queryCompletedRequest.QueryResult = result
}
return queryCompletedRequest
return workflowTaskCompletion{rawRequest: queryCompletedRequest}
}

metricsHandler := wth.metricsHandler.WithTags(metrics.WorkflowTags(
eventHandler.workflowEnvironmentImpl.workflowInfo.WorkflowType.Name))

// complete workflow task
var closeCommand *commandpb.Command
var canceledErr *CanceledError
var contErr *ContinueAsNewError
var metricCounterToIncrement string

if errors.As(workflowContext.err, &canceledErr) {
// Workflow canceled
metricsHandler.Counter(metrics.WorkflowCanceledCounter).Inc(1)
metricCounterToIncrement = metrics.WorkflowCanceledCounter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check whether the existing test coverage for these completion metrics is sufficient to catch any potential bugs here? (I can check if you haven't; just let me know.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not check, only confirmed existing tests pass and added my check w/ unhandled command that wouldn't have passed before. Doing a quick glance, I don't think any of these completion metrics have any coverage. Do you feel that new tests covering these existing metrics should be in this PR? I don't mind doing it if I must, will just add some time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have test coverage to make sure these metrics are still being emitted correctly. Unfortunately we lack test coverage of some basic metrics in the Go SDK so we have to keep an eye out. I don't think it adds a lot of complexity to your test to iterate through the few other ways a workflow can complete.

Copy link
Member Author

@cretz cretz Nov 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do as part of this PR (may not be right away though)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test

closeCommand = createNewCommand(enumspb.COMMAND_TYPE_CANCEL_WORKFLOW_EXECUTION)
closeCommand.Attributes = &commandpb.Command_CancelWorkflowExecutionCommandAttributes{CancelWorkflowExecutionCommandAttributes: &commandpb.CancelWorkflowExecutionCommandAttributes{
Details: convertErrDetailsToPayloads(canceledErr.details, wth.dataConverter),
}}
} else if errors.As(workflowContext.err, &contErr) {
// Continue as new error.
metricsHandler.Counter(metrics.WorkflowContinueAsNewCounter).Inc(1)
metricCounterToIncrement = metrics.WorkflowContinueAsNewCounter
closeCommand = createNewCommand(enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION)

// ContinueAsNewError.RetryPolicy is optional.
Expand All @@ -1894,7 +1898,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
} else if workflowContext.err != nil {
// Workflow failures
if !isBenignApplicationError(workflowContext.err) {
metricsHandler.Counter(metrics.WorkflowFailedCounter).Inc(1)
metricCounterToIncrement = metrics.WorkflowFailedCounter
}
closeCommand = createNewCommand(enumspb.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION)
failure := wth.failureConverter.ErrorToFailure(workflowContext.err)
Expand All @@ -1903,7 +1907,7 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
}}
} else if workflowContext.isWorkflowCompleted {
// Workflow completion
metricsHandler.Counter(metrics.WorkflowCompletedCounter).Inc(1)
metricCounterToIncrement = metrics.WorkflowCompletedCounter
closeCommand = createNewCommand(enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION)
closeCommand.Attributes = &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: workflowContext.result,
Expand All @@ -1912,8 +1916,6 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(

if closeCommand != nil {
commands = append(commands, closeCommand)
elapsed := time.Since(workflowContext.workflowInfo.WorkflowStartTime)
metricsHandler.Timer(metrics.WorkflowEndToEndLatency).Record(elapsed)
forceNewWorkflowTask = false
}

Expand Down Expand Up @@ -1992,7 +1994,22 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
builtRequest.VersioningBehavior = versioningBehaviorToProto(wth.defaultVersioningBehavior)
}
}
return builtRequest

// Return request and a function that will update certain metrics
metricsHandler := wth.metricsHandler.WithTags(metrics.WorkflowTags(
eventHandler.workflowEnvironmentImpl.workflowInfo.WorkflowType.Name))
return workflowTaskCompletion{
rawRequest: builtRequest,
applyCompletionMetrics: func() {
if metricCounterToIncrement != "" {
metricsHandler.Counter(metricCounterToIncrement).Inc(1)
}
if closeCommand != nil {
elapsed := time.Since(workflowContext.workflowInfo.WorkflowStartTime)
metricsHandler.Timer(metrics.WorkflowEndToEndLatency).Record(elapsed)
}
},
}
}

func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *historypb.HistoryEvent, isInReplay bool) error {
Expand Down
8 changes: 4 additions & 4 deletions internal/internal_task_handlers_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ func (wth sampleWorkflowTaskHandler) ProcessWorkflowTask(
workflowTask *workflowTask,
_ *workflowExecutionContextImpl,
_ workflowTaskHeartbeatFunc,
) (interface{}, error) {
return &workflowservice.RespondWorkflowTaskCompletedRequest{
) (*workflowTaskCompletion, error) {
return &workflowTaskCompletion{rawRequest: &workflowservice.RespondWorkflowTaskCompletedRequest{
TaskToken: workflowTask.task.TaskToken,
}, nil
}}, nil
}

func (wth sampleWorkflowTaskHandler) GetOrCreateWorkflowContext(
Expand Down Expand Up @@ -111,7 +111,7 @@ func (s *PollLayerInterfacesTestSuite) TestProcessWorkflowTaskInterface() {
// Process task and respond to the service.
taskHandler := newSampleWorkflowTaskHandler()
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: response}, nil, nil)
completionRequest := request.(*workflowservice.RespondWorkflowTaskCompletedRequest)
completionRequest := request.rawRequest.(*workflowservice.RespondWorkflowTaskCompletedRequest)
s.NoError(err)

_, err = s.service.RespondWorkflowTaskCompleted(ctx, completionRequest)
Expand Down
Loading
Loading