Skip to content

Commit 8ee3a8a

Browse files
Expose StartToCloseTimeout in ActivityInfo struct (#1990)
Expose StartToCloseTimeout in ActivityInfo struct
1 parent 5be3364 commit 8ee3a8a

File tree

4 files changed

+95
-79
lines changed

4 files changed

+95
-79
lines changed

internal/activity.go

Lines changed: 46 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,21 @@ type (
2525
//
2626
// Exposed as: [go.temporal.io/sdk/activity.Info]
2727
ActivityInfo struct {
28-
TaskToken []byte
29-
WorkflowType *WorkflowType
30-
WorkflowNamespace string
31-
WorkflowExecution WorkflowExecution
32-
ActivityID string
33-
ActivityType ActivityType
34-
TaskQueue string
35-
HeartbeatTimeout time.Duration // Maximum time between heartbeats. 0 means no heartbeat needed.
36-
ScheduledTime time.Time // Time of activity scheduled by a workflow
37-
StartedTime time.Time // Time of activity start
38-
Deadline time.Time // Time of activity timeout
39-
Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified.
40-
IsLocalActivity bool // true if it is a local activity
28+
TaskToken []byte
29+
WorkflowType *WorkflowType
30+
WorkflowNamespace string
31+
WorkflowExecution WorkflowExecution
32+
ActivityID string
33+
ActivityType ActivityType
34+
TaskQueue string
35+
HeartbeatTimeout time.Duration // Maximum time between heartbeats. 0 means no heartbeat needed.
36+
ScheduleToCloseTimeout time.Duration // Schedule to close timeout set by the activity options.
37+
StartToCloseTimeout time.Duration // Start to close timeout set by the activity options.
38+
ScheduledTime time.Time // Time of activity scheduled by a workflow
39+
StartedTime time.Time // Time of activity start
40+
Deadline time.Time // Time of activity timeout
41+
Attempt int32 // Attempt starts from 1, and increased by 1 for every retry if retry policy is specified.
42+
IsLocalActivity bool // true if it is a local activity
4143
// Priority settings that control relative ordering of task processing when activity tasks are backed up in a queue.
4244
// If no priority is set, the default value is the zero value.
4345
//
@@ -318,17 +320,19 @@ func WithActivityTask(
318320
workflowExecution: WorkflowExecution{
319321
RunID: task.WorkflowExecution.RunId,
320322
ID: task.WorkflowExecution.WorkflowId},
321-
logger: logger,
322-
metricsHandler: metricsHandler,
323-
deadline: deadline,
324-
heartbeatTimeout: heartbeatTimeout,
325-
scheduledTime: scheduled,
326-
startedTime: started,
327-
taskQueue: taskQueue,
328-
dataConverter: dataConverter,
329-
attempt: task.GetAttempt(),
330-
priority: task.GetPriority(),
331-
heartbeatDetails: task.HeartbeatDetails,
323+
logger: logger,
324+
metricsHandler: metricsHandler,
325+
deadline: deadline,
326+
heartbeatTimeout: heartbeatTimeout,
327+
scheduleToCloseTimeout: scheduleToCloseTimeout,
328+
startToCloseTimeout: startToCloseTimeout,
329+
scheduledTime: scheduled,
330+
startedTime: started,
331+
taskQueue: taskQueue,
332+
dataConverter: dataConverter,
333+
attempt: task.GetAttempt(),
334+
priority: task.GetPriority(),
335+
heartbeatDetails: task.HeartbeatDetails,
332336
workflowType: &WorkflowType{
333337
Name: task.WorkflowType.GetName(),
334338
},
@@ -380,22 +384,24 @@ func WithLocalActivityTask(
380384
deadline = task.expireTime
381385
}
382386
return newActivityContext(ctx, interceptors, &activityEnvironment{
383-
workflowType: &workflowTypeLocal,
384-
workflowNamespace: task.params.WorkflowInfo.Namespace,
385-
taskQueue: task.params.WorkflowInfo.TaskQueueName,
386-
activityType: ActivityType{Name: activityType},
387-
activityID: fmt.Sprintf("%v", task.activityID),
388-
workflowExecution: task.params.WorkflowInfo.WorkflowExecution,
389-
logger: logger,
390-
metricsHandler: metricsHandler,
391-
isLocalActivity: true,
392-
deadline: deadline,
393-
scheduledTime: task.scheduledTime,
394-
startedTime: startedTime,
395-
dataConverter: dataConverter,
396-
attempt: task.attempt,
397-
client: client,
398-
workerStopChannel: workerStopChannel,
387+
workflowType: &workflowTypeLocal,
388+
workflowNamespace: task.params.WorkflowInfo.Namespace,
389+
taskQueue: task.params.WorkflowInfo.TaskQueueName,
390+
activityType: ActivityType{Name: activityType},
391+
activityID: fmt.Sprintf("%v", task.activityID),
392+
workflowExecution: task.params.WorkflowInfo.WorkflowExecution,
393+
logger: logger,
394+
metricsHandler: metricsHandler,
395+
scheduleToCloseTimeout: scheduleToCloseTimeout,
396+
startToCloseTimeout: startToCloseTimeout,
397+
isLocalActivity: true,
398+
deadline: deadline,
399+
scheduledTime: task.scheduledTime,
400+
startedTime: startedTime,
401+
dataConverter: dataConverter,
402+
attempt: task.attempt,
403+
client: client,
404+
workerStopChannel: workerStopChannel,
399405
})
400406
}
401407

internal/internal_activity.go

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -104,28 +104,30 @@ type (
104104
}
105105

106106
activityEnvironment struct {
107-
taskToken []byte
108-
workflowExecution WorkflowExecution
109-
activityID string
110-
activityType ActivityType
111-
serviceInvoker ServiceInvoker
112-
logger log.Logger
113-
metricsHandler metrics.Handler
114-
isLocalActivity bool
115-
heartbeatTimeout time.Duration
116-
deadline time.Time
117-
scheduledTime time.Time
118-
startedTime time.Time
119-
taskQueue string
120-
dataConverter converter.DataConverter
121-
attempt int32 // starts from 1.
122-
heartbeatDetails *commonpb.Payloads
123-
workflowType *WorkflowType
124-
workflowNamespace string
125-
workerStopChannel <-chan struct{}
126-
contextPropagators []ContextPropagator
127-
client *WorkflowClient
128-
priority *commonpb.Priority
107+
taskToken []byte
108+
workflowExecution WorkflowExecution
109+
activityID string
110+
activityType ActivityType
111+
serviceInvoker ServiceInvoker
112+
logger log.Logger
113+
metricsHandler metrics.Handler
114+
isLocalActivity bool
115+
heartbeatTimeout time.Duration
116+
scheduleToCloseTimeout time.Duration
117+
startToCloseTimeout time.Duration
118+
deadline time.Time
119+
scheduledTime time.Time
120+
startedTime time.Time
121+
taskQueue string
122+
dataConverter converter.DataConverter
123+
attempt int32 // starts from 1.
124+
heartbeatDetails *commonpb.Payloads
125+
workflowType *WorkflowType
126+
workflowNamespace string
127+
workerStopChannel <-chan struct{}
128+
contextPropagators []ContextPropagator
129+
client *WorkflowClient
130+
priority *commonpb.Priority
129131
}
130132

131133
// context.WithValue need this type instead of basic type string to avoid lint error
@@ -349,20 +351,22 @@ func (a *activityEnvironmentInterceptor) ExecuteActivity(
349351

350352
func (a *activityEnvironmentInterceptor) GetInfo(ctx context.Context) ActivityInfo {
351353
return ActivityInfo{
352-
ActivityID: a.env.activityID,
353-
ActivityType: a.env.activityType,
354-
TaskToken: a.env.taskToken,
355-
WorkflowExecution: a.env.workflowExecution,
356-
HeartbeatTimeout: a.env.heartbeatTimeout,
357-
Deadline: a.env.deadline,
358-
ScheduledTime: a.env.scheduledTime,
359-
StartedTime: a.env.startedTime,
360-
TaskQueue: a.env.taskQueue,
361-
Attempt: a.env.attempt,
362-
WorkflowType: a.env.workflowType,
363-
WorkflowNamespace: a.env.workflowNamespace,
364-
IsLocalActivity: a.env.isLocalActivity,
365-
Priority: convertFromPBPriority(a.env.priority),
354+
ActivityID: a.env.activityID,
355+
ActivityType: a.env.activityType,
356+
TaskToken: a.env.taskToken,
357+
WorkflowExecution: a.env.workflowExecution,
358+
HeartbeatTimeout: a.env.heartbeatTimeout,
359+
ScheduleToCloseTimeout: a.env.scheduleToCloseTimeout,
360+
StartToCloseTimeout: a.env.startToCloseTimeout,
361+
Deadline: a.env.deadline,
362+
ScheduledTime: a.env.scheduledTime,
363+
StartedTime: a.env.startedTime,
364+
TaskQueue: a.env.taskQueue,
365+
Attempt: a.env.attempt,
366+
WorkflowType: a.env.workflowType,
367+
WorkflowNamespace: a.env.workflowNamespace,
368+
IsLocalActivity: a.env.isLocalActivity,
369+
Priority: convertFromPBPriority(a.env.priority),
366370
}
367371
}
368372

test/activity_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ func (a *Activities) failNTimes(_ context.Context, times int, id int) error {
192192
return errFailOnPurpose
193193
}
194194

195-
func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string, isLocalActivity bool) error {
195+
func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string, isLocalActivity bool, scheduleToCloseTimeout, startToCloseTimeout time.Duration) error {
196196
a.append("inspectActivityInfo")
197197
if !activity.IsActivity(ctx) {
198198
return fmt.Errorf("expected InActivity to return %v but got %v", true, activity.IsActivity(ctx))
@@ -220,6 +220,12 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQue
220220
if info.IsLocalActivity != isLocalActivity {
221221
return fmt.Errorf("expected IsLocalActivity %v but got %v", isLocalActivity, info.IsLocalActivity)
222222
}
223+
if info.ScheduleToCloseTimeout != scheduleToCloseTimeout {
224+
return fmt.Errorf("expected ScheduleToCloseTimeout %v but got %v", scheduleToCloseTimeout, info.ScheduleToCloseTimeout)
225+
}
226+
if info.StartToCloseTimeout != startToCloseTimeout {
227+
return fmt.Errorf("expected StartToCloseTimeout %v but got %v", startToCloseTimeout, info.StartToCloseTimeout)
228+
}
223229
return nil
224230
}
225231

test/workflow_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1494,7 +1494,7 @@ func (w *Workflows) InspectActivityInfo(ctx workflow.Context) error {
14941494
wfType := info.WorkflowType.Name
14951495
taskQueue := info.TaskQueueName
14961496
ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions())
1497-
return workflow.ExecuteActivity(ctx, "inspectActivityInfo", namespace, taskQueue, wfType, false).Get(ctx, nil)
1497+
return workflow.ExecuteActivity(ctx, "inspectActivityInfo", namespace, taskQueue, wfType, false, 5*time.Second, 5*time.Second).Get(ctx, nil)
14981498
}
14991499

15001500
func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error {
@@ -1505,7 +1505,7 @@ func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error {
15051505
ctx = workflow.WithLocalActivityOptions(ctx, w.defaultLocalActivityOptions())
15061506
var activities *Activities
15071507
return workflow.ExecuteLocalActivity(
1508-
ctx, activities.InspectActivityInfo, namespace, taskQueue, wfType, true).Get(ctx, nil)
1508+
ctx, activities.InspectActivityInfo, namespace, taskQueue, wfType, true, 5*time.Second, 5*time.Second).Get(ctx, nil)
15091509
}
15101510

15111511
func (w *Workflows) WorkflowWithLocalActivityCtxPropagation(ctx workflow.Context) (string, error) {

0 commit comments

Comments
 (0)