Skip to content

Commit 081958b

Browse files
authored
Add priority annotations (#1792)
1 parent e7a505f commit 081958b

14 files changed

+133
-7
lines changed

internal/activity.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type (
6262
Deadline time.Time // Time of activity timeout
6363
Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified.
6464
IsLocalActivity bool // true if it is a local activity
65+
Priority Priority
6566
}
6667

6768
// RegisterActivityOptions consists of options for registering an activity.
@@ -165,6 +166,10 @@ type (
165166
//
166167
// NOTE: Experimental
167168
Summary string
169+
170+
// Priority - Optional priority settings that control relative ordering of
171+
// task processing when tasks are backed up in a queue.
172+
Priority Priority
168173
}
169174

170175
// LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.
@@ -320,6 +325,7 @@ func WithActivityTask(
320325
taskQueue: taskQueue,
321326
dataConverter: dataConverter,
322327
attempt: task.GetAttempt(),
328+
priority: task.GetPriority(),
323329
heartbeatDetails: task.HeartbeatDetails,
324330
workflowType: &WorkflowType{
325331
Name: task.WorkflowType.GetName(),

internal/client.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,10 @@ type (
774774
// NOTE: Experimental
775775
VersioningOverride VersioningOverride
776776

777+
// Priority - Optional priority settings that control relative ordering of
778+
// task processing when tasks are backed up in a queue.
779+
Priority Priority
780+
777781
// request ID. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
778782
requestID string
779783
// workflow completion callback. Only settable by the SDK - e.g. [temporalnexus.workflowRunOperation].
@@ -835,6 +839,33 @@ type (
835839
NonRetryableErrorTypes []string
836840
}
837841

842+
// Priority contains metadata that controls the relative ordering of task processing
843+
// when tasks are backed up in a queue. The affected queues depend on the
844+
// server version.
845+
//
846+
// Priority is attached to workflows and activities. By default, activities
847+
// and child workflows inherit Priority from the workflow that created them, but may
848+
// override fields when an activity is started or modified.
849+
//
850+
// For all fields, the field not present or equal to zero/empty string means to
851+
// inherit the value from the calling workflow, or if there is no calling
852+
// workflow, then use the default value.
853+
//
854+
// Exposed as: [go.temporal.io/sdk/temporal.Priority]
855+
Priority struct {
856+
// PriorityKey is a positive integer from 1 to n, where smaller integers
857+
// correspond to higher priorities (tasks run sooner). In general, tasks in
858+
// a queue should be processed in close to priority order, although small
859+
// deviations are possible.
860+
//
861+
// The maximum priority value (minimum priority) is determined by server
862+
// configuration, and defaults to 5.
863+
//
864+
// The default value when unset or 0 is calculated by (min+max)/2. With the
865+
// default max of 5, and min of 1, that comes out to 3.
866+
PriorityKey int
867+
}
868+
838869
// NamespaceClient is the client for managing operations on the namespace.
839870
// CLI, tools, ... can use this layer to manager operations on namespace.
840871
NamespaceClient interface {

internal/internal_activity.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ type (
7474
DisableEagerExecution bool
7575
VersioningIntent VersioningIntent
7676
Summary string
77+
Priority *commonpb.Priority
7778
}
7879

7980
// ExecuteLocalActivityOptions options for executing a local activity
@@ -146,6 +147,7 @@ type (
146147
workflowNamespace string
147148
workerStopChannel <-chan struct{}
148149
contextPropagators []ContextPropagator
150+
priority *commonpb.Priority
149151
}
150152

151153
// context.WithValue need this type instead of basic type string to avoid lint error
@@ -382,6 +384,7 @@ func (a *activityEnvironmentInterceptor) GetInfo(ctx context.Context) ActivityIn
382384
WorkflowType: a.env.workflowType,
383385
WorkflowNamespace: a.env.workflowNamespace,
384386
IsLocalActivity: a.env.isLocalActivity,
387+
Priority: convertFromPBPriority(a.env.priority),
385388
}
386389
}
387390

internal/internal_event_handlers.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
593593
attributes.WorkflowIdReusePolicy = params.WorkflowIDReusePolicy
594594
attributes.ParentClosePolicy = params.ParentClosePolicy
595595
attributes.RetryPolicy = params.RetryPolicy
596+
attributes.Priority = params.Priority
596597
attributes.Header = params.Header
597598
attributes.Memo = memo
598599
attributes.SearchAttributes = searchAttr
@@ -758,6 +759,7 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivityPar
758759
scheduleTaskAttr.RequestEagerExecution = !parameters.DisableEagerExecution
759760
scheduleTaskAttr.UseWorkflowBuildId = determineInheritBuildIdFlagForCommand(
760761
parameters.VersioningIntent, wc.workflowInfo.TaskQueueName, parameters.TaskQueueName)
762+
scheduleTaskAttr.Priority = parameters.Priority
761763

762764
startMetadata, err := buildUserMetadata(parameters.Summary, "", wc.dataConverter)
763765
if err != nil {

internal/internal_schedule_client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,7 @@ func convertToPBScheduleAction(
661661
Header: header,
662662
UserMetadata: userMetadata,
663663
VersioningOverride: versioningOverrideToProto(action.VersioningOverride),
664+
Priority: convertToPBPriority(action.Priority),
664665
},
665666
},
666667
}, nil

internal/internal_task_handlers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,7 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice.
744744
Memo: attributes.Memo,
745745
SearchAttributes: attributes.SearchAttributes,
746746
RetryPolicy: convertFromPBRetryPolicy(attributes.RetryPolicy),
747+
Priority: convertFromPBPriority(attributes.Priority),
747748
}
748749

749750
return newWorkflowExecutionContext(workflowInfo, wth), nil

internal/internal_workflow.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ type (
218218
WorkflowIDConflictPolicy enumspb.WorkflowIdConflictPolicy
219219
DataConverter converter.DataConverter
220220
RetryPolicy *commonpb.RetryPolicy
221+
Priority *commonpb.Priority
221222
CronSchedule string
222223
ContextPropagators []ContextPropagator
223224
Memo map[string]interface{}

internal/internal_workflow_client.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -740,11 +740,12 @@ func (wc *WorkflowClient) DescribeWorkflowExecution(ctx context.Context, workflo
740740

741741
// QueryWorkflow queries a given workflow execution
742742
// workflowID and queryType are required, other parameters are optional.
743-
// - workflow ID of the workflow.
744-
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
745-
// - taskQueue can be default(empty string). If empty string then it will pick the taskQueue of the running execution of that workflow ID.
746-
// - queryType is the type of the query.
747-
// - args... are the optional query parameters.
743+
// - workflow ID of the workflow.
744+
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
745+
// - taskQueue can be default(empty string). If empty string then it will pick the taskQueue of the running execution of that workflow ID.
746+
// - queryType is the type of the query.
747+
// - args... are the optional query parameters.
748+
//
748749
// The errors it can return:
749750
// - serviceerror.InvalidArgument
750751
// - serviceerror.Internal
@@ -943,8 +944,9 @@ func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request
943944

944945
// DescribeTaskQueue returns information about the target taskqueue, right now this API returns the
945946
// pollers which polled this taskqueue in last few minutes.
946-
// - taskqueue name of taskqueue
947-
// - taskqueueType type of taskqueue, can be workflow or activity
947+
// - taskqueue name of taskqueue
948+
// - taskqueueType type of taskqueue, can be workflow or activity
949+
//
948950
// The errors it can return:
949951
// - serviceerror.InvalidArgument
950952
// - serviceerror.Internal
@@ -1684,6 +1686,7 @@ func (w *workflowClientInterceptor) createStartWorkflowRequest(
16841686
CompletionCallbacks: in.Options.callbacks,
16851687
Links: in.Options.links,
16861688
VersioningOverride: versioningOverrideToProto(in.Options.VersioningOverride),
1689+
Priority: convertToPBPriority(in.Options.Priority),
16871690
}
16881691

16891692
startRequest.UserMetadata, err = buildUserMetadata(in.Options.StaticSummary, in.Options.StaticDetails, dataConverter)
@@ -2056,6 +2059,7 @@ func (w *workflowClientInterceptor) SignalWithStartWorkflow(
20562059
WorkflowIdConflictPolicy: in.Options.WorkflowIDConflictPolicy,
20572060
Header: header,
20582061
VersioningOverride: versioningOverrideToProto(in.Options.VersioningOverride),
2062+
Priority: convertToPBPriority(in.Options.Priority),
20592063
}
20602064

20612065
if in.Options.StartDelay != 0 {

internal/internal_workflow_testsuite.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2155,6 +2155,7 @@ func newTestActivityTask(workflowID, runID, workflowTypeName, namespace string,
21552155
},
21562156
WorkflowNamespace: namespace,
21572157
Header: attr.GetHeader(),
2158+
Priority: attr.Priority,
21582159
}
21592160
return task
21602161
}

internal/nexus_operations.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ func (t *testSuiteClientForNexusOperations) ExecuteWorkflow(ctx context.Context,
241241
Memo: options.Memo,
242242
CronSchedule: options.CronSchedule,
243243
RetryPolicy: convertToPBRetryPolicy(options.RetryPolicy),
244+
Priority: convertToPBPriority(options.Priority),
244245
},
245246
}, func(result *commonpb.Payloads, wfErr error) {
246247
ncb := callback.GetNexus()

0 commit comments

Comments
 (0)