Skip to content

Commit

Permalink
Merge pull request #376 from Clever/INFRANG-5393
Browse files Browse the repository at this point in the history
[INFRANG-5393]: embedded workflow manager: race condition allows more…
  • Loading branch information
ribeirophilippe authored Jan 23, 2024
2 parents 5f63f27 + f08efb6 commit 04f4599
Showing 1 changed file with 23 additions and 24 deletions.
47 changes: 23 additions & 24 deletions embedded/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,32 @@ func (e *Embedded) pollGetActivityTask(ctx context.Context, resourceName string,
continue
}
// short circuit at the configured limit
if atomic.LoadInt64(concurrentExecutions) >= e.concurrencyLimits[resourceName] {
continue
}
select {
case <-ctx.Done():
log.Info("getactivitytask-stop")
default:
log.TraceD("getactivitytask-start", logger.M{"activity-arn": activityArn, "worker-name": e.workerName})
out, err := e.sfnAPI.GetActivityTaskWithContext(ctx, &sfn.GetActivityTaskInput{
ActivityArn: aws.String(activityArn),
WorkerName: aws.String(e.workerName),
})
if err != nil {
if err == context.Canceled || awsErr(err, request.CanceledErrorCode) {
log.Info("getactivitytask-stop")
if atomic.LoadInt64(concurrentExecutions) < e.concurrencyLimits[resourceName] {
select {
case <-ctx.Done():
log.Info("getactivitytask-stop")
default:
log.TraceD("getactivitytask-start", logger.M{"activity-arn": activityArn, "worker-name": e.workerName})
out, err := e.sfnAPI.GetActivityTaskWithContext(ctx, &sfn.GetActivityTaskInput{
ActivityArn: aws.String(activityArn),
WorkerName: aws.String(e.workerName),
})
if err != nil {
if err == context.Canceled || awsErr(err, request.CanceledErrorCode) {
log.Info("getactivitytask-stop")
continue
}
log.ErrorD("getactivitytask-error", logger.M{"error": err.Error()})
continue
}
log.ErrorD("getactivitytask-error", logger.M{"error": err.Error()})
continue
}
if out.TaskToken == nil {
continue
if out.TaskToken == nil {
continue
}
input := *out.Input
token := *out.TaskToken
log.TraceD("getactivitytask", logger.M{"input": input, "token": shortToken(token)})
go e.concurrentlyHandleTask(ctx, concurrentExecutions, resourceName, resource, token, input)
}
input := *out.Input
token := *out.TaskToken
log.TraceD("getactivitytask", logger.M{"input": input, "token": shortToken(token)})
go e.concurrentlyHandleTask(ctx, concurrentExecutions, resourceName, resource, token, input)
}
}
return nil
Expand Down

0 comments on commit 04f4599

Please sign in to comment.