-
Notifications
You must be signed in to change notification settings - Fork 317
Handle poisoned work items by marking them as failed #1175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| // --------------------------------------------------------------- | ||
| // Copyright (c) Microsoft Corporation. All rights reserved. | ||
| // --------------------------------------------------------------- | ||
|
|
||
| namespace DurableTask.Core.Exceptions | ||
| { | ||
| using System; | ||
|
|
||
| /// <summary> | ||
| /// Represents a work item that is poisoned and should not be retried. | ||
| /// </summary> | ||
| public class WorkItemPoisonedException : Exception | ||
| { | ||
| /// <summary> | ||
| /// Represents a work item that is poisoned and should not be retried. | ||
| /// </summary> | ||
| public WorkItemPoisonedException( | ||
| string message = "Work item is poisoned", | ||
| Exception innerException = null | ||
| ) : base(message, innerException) | ||
| { | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -599,6 +599,26 @@ internal void OrchestrationAborted( | |
| } | ||
| } | ||
|
|
||
| [Event(EventIds.OrchestrationPoisoned, Level = EventLevel.Warning, Version = 1)] | ||
| internal void OrchestrationPoisoned( | ||
| string InstanceId, | ||
| string ExecutionId, | ||
| string Details, | ||
| string AppName, | ||
| string ExtensionVersion) | ||
| { | ||
| if (this.IsEnabled(EventLevel.Warning)) | ||
| { | ||
| this.WriteEvent( | ||
| EventIds.OrchestrationPoisoned, | ||
| InstanceId, | ||
| ExecutionId, | ||
| Details, | ||
| AppName, | ||
| ExtensionVersion); | ||
| } | ||
| } | ||
|
|
||
| [Event(EventIds.DiscardingMessage, Level = EventLevel.Warning, Version = 1)] | ||
| internal void DiscardingMessage( | ||
| string InstanceId, | ||
|
|
@@ -712,7 +732,7 @@ internal void EntityLockReleased( | |
| // TODO: Use WriteEventCore for better performance | ||
| this.WriteEvent( | ||
| EventIds.EntityLockReleased, | ||
| EntityId, | ||
| EntityId, | ||
| InstanceId, | ||
| Id, | ||
| AppName, | ||
|
|
@@ -818,6 +838,32 @@ internal void TaskActivityAborted( | |
| } | ||
| } | ||
|
|
||
| [Event(EventIds.TaskActivityPoisoned, Level = EventLevel.Warning, Version = 1)] | ||
| internal void TaskActivityPoisoned( | ||
| string InstanceId, | ||
| string ExecutionId, | ||
| string Name, | ||
| int TaskEventId, | ||
| string Details, | ||
| string AppName, | ||
| string ExtensionVersion | ||
| ) | ||
| { | ||
| if (this.IsEnabled(EventLevel.Warning)) | ||
| { | ||
| // TODO: Use WriteEventCore for better performance | ||
| this.WriteEvent( | ||
| EventIds.TaskActivityPoisoned, | ||
| InstanceId, | ||
| ExecutionId, | ||
| Name, | ||
| TaskEventId, | ||
| Details, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question about null checks. |
||
| AppName, | ||
| ExtensionVersion); | ||
| } | ||
| } | ||
|
|
||
| [Event(EventIds.TaskActivityDispatcherError, Level = EventLevel.Error, Version = 1)] | ||
| internal void TaskActivityDispatcherError( | ||
| string InstanceId, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -97,6 +97,19 @@ public OrchestrationRuntimeState(IList<HistoryEvent>? events) | |
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Returns a deep copy of the object. | ||
| /// </summary> | ||
| /// <returns>Cloned object</returns> | ||
| public OrchestrationRuntimeState Clone() | ||
| { | ||
| return new OrchestrationRuntimeState(this.Events) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is technically a deep clone operation. The state of the two objects may potentially be different. For example, if the object being cloned as a set of history events in both the I suggest either we carefully document that this is not an exact clone or give this method a different name. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you reckon the issue is with the description/comment only, or should the behavior be changed too? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't have a strong opinion. Here's what I think the code would need to be to do a proper deep clone*. var cloned = new OrchestrationRuntimeState(this.PastEvents)
{
CompressedSize = this.CompressedSize,
Size = this.Size,
Status = this.Status,
};
foreach (HistoryEvent e in this.NewEvents)
{
cloned.AddEvent(e);
}
return cloned;*this is not technically a "full deep clone" since we're not cloning the history events and I expect some history events may be mutable. Might be worth mentioning that in the comments. |
||
| { | ||
| Size = this.Size, | ||
| Status = this.Status, | ||
| }; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Gets the execution started event | ||
| /// </summary> | ||
|
|
@@ -188,7 +201,7 @@ public OrchestrationStatus OrchestrationStatus | |
| /// <remarks> | ||
| /// An invalid orchestration runtime state means that the history is somehow corrupted. | ||
| /// </remarks> | ||
| public bool IsValid => | ||
| public bool IsValid => | ||
| this.Events.Count == 0 || | ||
| this.Events.Count == 1 && this.Events[0].EventType == EventType.OrchestratorStarted || | ||
| this.ExecutionStartedEvent != null; | ||
|
|
@@ -253,8 +266,8 @@ bool IsDuplicateEvent(HistoryEvent historyEvent) | |
| historyEvent.EventType == EventType.TaskCompleted && | ||
| !completedEventIds.Add(historyEvent.EventId)) | ||
| { | ||
| TraceHelper.Trace(TraceEventType.Warning, | ||
| "OrchestrationRuntimeState-DuplicateEvent", | ||
| TraceHelper.Trace(TraceEventType.Warning, | ||
| "OrchestrationRuntimeState-DuplicateEvent", | ||
| "The orchestration '{0}' has already seen a completed task with id {1}.", | ||
| this.OrchestrationInstance?.InstanceId ?? "", | ||
| historyEvent.EventId); | ||
|
|
@@ -287,7 +300,7 @@ void SetMarkerEvents(HistoryEvent historyEvent) | |
| // It's not generally expected to receive multiple execution completed events for a given orchestrator, but it's possible under certain race conditions. | ||
| // For example: when an orchestrator is signaled to terminate at the same time as it attempts to continue-as-new. | ||
| var log = $"Received new {completedEvent.GetType().Name} event despite the orchestration being already in the {orchestrationStatus} state."; | ||
|
|
||
| if (orchestrationStatus == OrchestrationStatus.ContinuedAsNew && completedEvent.OrchestrationStatus == OrchestrationStatus.Terminated) | ||
| { | ||
| // If the orchestration planned to continue-as-new but termination is requested, we transition to the terminated state. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ | |
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
| // ---------------------------------------------------------------------------------- | ||
|
|
||
| #nullable enable | ||
| namespace DurableTask.Core | ||
| { | ||
|
|
@@ -117,13 +118,12 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem) | |
| if (taskMessage.Event.EventType != EventType.TaskScheduled) | ||
| { | ||
| this.logHelper.TaskActivityDispatcherError( | ||
| workItem, | ||
| workItem, | ||
| $"The activity worker received an event of type '{taskMessage.Event.EventType}' but only '{EventType.TaskScheduled}' is supported."); | ||
| throw TraceHelper.TraceException( | ||
| TraceEventType.Critical, | ||
| "TaskActivityDispatcher-UnsupportedEventType", | ||
| new NotSupportedException("Activity worker does not support event of type: " + | ||
| taskMessage.Event.EventType)); | ||
| new NotSupportedException("Activity worker does not support event of type: " + taskMessage.Event.EventType)); | ||
| } | ||
|
|
||
| scheduledEvent = (TaskScheduledEvent)taskMessage.Event; | ||
|
|
@@ -268,6 +268,22 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ => | |
| TraceHelper.TraceInstance(TraceEventType.Warning, "TaskActivityDispatcher-ExecutionAborted", orchestrationInstance, "{0}: {1}", scheduledEvent?.Name ?? "", e.Message); | ||
| await this.orchestrationService.AbandonTaskActivityWorkItemAsync(workItem); | ||
| } | ||
| catch (WorkItemPoisonedException poisonedException) when (scheduledEvent is not null) | ||
| { | ||
| // The task activity is poisoned and should be marked as failed | ||
| this.logHelper.TaskActivityPoisoned(orchestrationInstance, scheduledEvent!, poisonedException.Message); | ||
| TraceHelper.TraceInstance(TraceEventType.Warning, "TaskActivityDispatcher-ExecutionPoisoned", orchestrationInstance, "{0}: {1}", scheduledEvent?.Name ?? "", poisonedException.Message); | ||
| await this.orchestrationService.CompleteTaskActivityWorkItemAsync( | ||
| workItem, new TaskMessage() | ||
| { | ||
| Event = new TaskFailedEvent( | ||
| -1, | ||
| // Guaranteed to be not null because of the "when" clause in the catch block | ||
| scheduledEvent!.EventId, | ||
| poisonedException.Message, string.Empty), | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to also populate the |
||
| OrchestrationInstance = orchestrationInstance, | ||
| }); | ||
| } | ||
| finally | ||
| { | ||
| diagnosticActivity?.Stop(); // Ensure the activity is stopped here to prevent it from leaking out. | ||
|
|
@@ -349,4 +365,4 @@ DateTime AdjustRenewAt(DateTime renewAt) | |
| return renewAt > maxRenewAt ? maxRenewAt : renewAt; | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to check for null here? Passing
nulltoWriteEventfor any parameter will cause the entire log event to be lost. If I'm reading the code correctly, this comes from theWorkItemPoisonedException.Messageproperty, which doesn't have any guards against null values.