diff --git a/src/DurableTask.Core/Exceptions/WorkItemPoisonedException.cs b/src/DurableTask.Core/Exceptions/WorkItemPoisonedException.cs new file mode 100644 index 000000000..beff99826 --- /dev/null +++ b/src/DurableTask.Core/Exceptions/WorkItemPoisonedException.cs @@ -0,0 +1,24 @@ +// --------------------------------------------------------------- +// Copyright (c) Microsoft Corporation. All rights reserved. +// --------------------------------------------------------------- + +namespace DurableTask.Core.Exceptions +{ + using System; + + /// + /// Represents a work item that is poisoned and should not be retried. + /// + public class WorkItemPoisonedException : Exception + { + /// + /// Initializes a new instance of the WorkItemPoisonedException class with an optional message and inner exception. + /// + public WorkItemPoisonedException( + string message = "Work item is poisoned", + Exception innerException = null + ) : base(message, innerException) + { + } + } +} diff --git a/src/DurableTask.Core/Logging/EventIds.cs b/src/DurableTask.Core/Logging/EventIds.cs index f7386eb99..7de9aa0d4 100644 --- a/src/DurableTask.Core/Logging/EventIds.cs +++ b/src/DurableTask.Core/Logging/EventIds.cs @@ -51,6 +51,7 @@ static class EventIds public const int EntityBatchExecuted = 56; public const int EntityLockAcquired = 57; public const int EntityLockReleased = 58; + public const int OrchestrationPoisoned = 59; public const int TaskActivityStarting = 60; public const int TaskActivityCompleted = 61; @@ -60,6 +61,7 @@ static class EventIds public const int RenewActivityMessageStarting = 65; public const int RenewActivityMessageCompleted = 66; public const int RenewActivityMessageFailed = 67; + public const int TaskActivityPoisoned = 71; /* was 68, which duplicated SuspendingInstance below and would give two events the same ID; NOTE(review): confirm 71 is unused in the part of EventIds not shown in this hunk */ public const int SuspendingInstance = 68; public const int ResumingInstance = 69; diff --git a/src/DurableTask.Core/Logging/LogEvents.cs b/src/DurableTask.Core/Logging/LogEvents.cs index fd626d3be..e443ea1fc 100644 --- a/src/DurableTask.Core/Logging/LogEvents.cs +++ 
b/src/DurableTask.Core/Logging/LogEvents.cs @@ -1,4 +1,4 @@ -// ---------------------------------------------------------------------------------- +// ---------------------------------------------------------------------------------- // Copyright Microsoft Corporation // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -1137,6 +1137,45 @@ void IEventSourceEvent.WriteEventSource() => Utils.PackageVersion); } + /// + /// Log event representing an orchestration becoming poisoned and being marked as failed. + /// + internal class OrchestrationPoisoned : StructuredLogEvent, IEventSourceEvent + { + public OrchestrationPoisoned(OrchestrationInstance instance, string reason) + { + this.InstanceId = instance.InstanceId; + this.ExecutionId = instance.ExecutionId; + this.Details = reason; + } + + [StructuredLogField] + public string InstanceId { get; } + + [StructuredLogField] + public string ExecutionId { get; } + + [StructuredLogField] + public string Details { get; } + + public override EventId EventId => new EventId( + EventIds.OrchestrationPoisoned, + nameof(EventIds.OrchestrationPoisoned)); + + public override LogLevel Level => LogLevel.Warning; + + protected override string CreateLogMessage() => + $"{this.InstanceId}: Orchestration execution is poisoned and was marked as failed: {this.Details}"; + + void IEventSourceEvent.WriteEventSource() => + StructuredEventSource.Log.OrchestrationPoisoned( + this.InstanceId, + this.ExecutionId, + this.Details, + Utils.AppName, + Utils.PackageVersion); + } + /// /// Log event representing the discarding of an orchestration message that cannot be processed. 
/// @@ -1600,6 +1639,52 @@ void IEventSourceEvent.WriteEventSource() => Utils.PackageVersion); } + internal class TaskActivityPoisoned : StructuredLogEvent, IEventSourceEvent + { + public TaskActivityPoisoned(OrchestrationInstance instance, TaskScheduledEvent taskEvent, string details) + { + this.InstanceId = instance.InstanceId; + this.ExecutionId = instance.ExecutionId; + this.Name = taskEvent.Name; + this.TaskEventId = taskEvent.EventId; + this.Details = details; + } + + [StructuredLogField] + public string InstanceId { get; } + + [StructuredLogField] + public string ExecutionId { get; } + + [StructuredLogField] + public string Name { get; } + + [StructuredLogField] + public int TaskEventId { get; } + + [StructuredLogField] + public string Details { get; } + + public override EventId EventId => new EventId( + EventIds.TaskActivityPoisoned, + nameof(EventIds.TaskActivityPoisoned)); + + public override LogLevel Level => LogLevel.Warning; + + protected override string CreateLogMessage() => + $"{this.InstanceId}: Task activity {GetEventDescription(this.Name, this.TaskEventId)} is poisoned and was marked as failed: {this.Details}"; + + void IEventSourceEvent.WriteEventSource() => + StructuredEventSource.Log.TaskActivityPoisoned( + this.InstanceId, + this.ExecutionId, + this.Name, + this.TaskEventId, + this.Details, + Utils.AppName, + Utils.PackageVersion); + } + internal class TaskActivityDispatcherError : StructuredLogEvent, IEventSourceEvent { public TaskActivityDispatcherError(TaskActivityWorkItem workItem, string details) diff --git a/src/DurableTask.Core/Logging/LogHelper.cs b/src/DurableTask.Core/Logging/LogHelper.cs index 109c14c42..fdf527a63 100644 --- a/src/DurableTask.Core/Logging/LogHelper.cs +++ b/src/DurableTask.Core/Logging/LogHelper.cs @@ -10,6 +10,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
// ---------------------------------------------------------------------------------- + #nullable enable namespace DurableTask.Core.Logging { @@ -34,6 +35,7 @@ public LogHelper(ILogger? log) bool IsStructuredLoggingEnabled => this.log != null; #region TaskHubWorker + /// /// Logs that a is starting. /// @@ -79,6 +81,7 @@ internal void TaskHubWorkerStopped(TimeSpan latency) this.WriteStructuredLog(new LogEvents.TaskHubWorkerStopped(latency)); } } + #endregion #region WorkItemDispatcher traces @@ -497,6 +500,19 @@ internal void OrchestrationAborted(OrchestrationInstance instance, string reason } } + /// + /// Logs a warning indicating that the orchestration execution is poisoned and was marked as failed. + /// + /// The orchestration instance that failed. + /// The reason for the orchestration execution becoming poisoned. + internal void OrchestrationPoisoned(OrchestrationInstance instance, string reason) + { + if (this.IsStructuredLoggingEnabled) + { + this.WriteStructuredLog(new LogEvents.OrchestrationPoisoned(instance, reason)); + } + } + /// /// Helper method for logging the dropping of all messages associated with the specified work item. /// @@ -616,6 +632,7 @@ internal void EntityLockReleased(string entityId, Core.Entities.EventFormat.Rele #endregion #region Activity dispatcher + /// /// Logs that a task activity is about to begin execution. /// @@ -678,6 +695,20 @@ internal void TaskActivityAborted(OrchestrationInstance instance, TaskScheduledE } } + /// + /// Logs a warning indicating that the task activity execution is poisoned and was marked as failed. + /// + /// The orchestration instance that scheduled this task activity. + /// The history event associated with this activity execution. + /// More information about why the execution failed. 
+ internal void TaskActivityPoisoned(OrchestrationInstance instance, TaskScheduledEvent taskEvent, string details) + { + if (this.IsStructuredLoggingEnabled) + { + this.WriteStructuredLog(new LogEvents.TaskActivityPoisoned(instance, taskEvent, details)); + } + } + /// /// Logs that an error occurred when attempting to dispatch an activity work item. /// @@ -728,6 +759,7 @@ internal void RenewActivityMessageFailed(TaskActivityWorkItem workItem, Exceptio this.WriteStructuredLog(new LogEvents.RenewActivityMessageFailed(workItem, exception), exception); } } + #endregion internal void OrchestrationDebugTrace(string instanceId, string executionId, string details) diff --git a/src/DurableTask.Core/Logging/StructuredEventSource.cs b/src/DurableTask.Core/Logging/StructuredEventSource.cs index 162a226f2..913800aef 100644 --- a/src/DurableTask.Core/Logging/StructuredEventSource.cs +++ b/src/DurableTask.Core/Logging/StructuredEventSource.cs @@ -599,6 +599,26 @@ internal void OrchestrationAborted( } } + [Event(EventIds.OrchestrationPoisoned, Level = EventLevel.Warning, Version = 1)] + internal void OrchestrationPoisoned( + string InstanceId, + string ExecutionId, + string Details, + string AppName, + string ExtensionVersion) + { + if (this.IsEnabled(EventLevel.Warning)) + { + this.WriteEvent( + EventIds.OrchestrationPoisoned, + InstanceId, + ExecutionId, + Details, + AppName, + ExtensionVersion); + } + } + [Event(EventIds.DiscardingMessage, Level = EventLevel.Warning, Version = 1)] internal void DiscardingMessage( string InstanceId, @@ -712,7 +732,7 @@ internal void EntityLockReleased( // TODO: Use WriteEventCore for better performance this.WriteEvent( EventIds.EntityLockReleased, - EntityId, + EntityId, InstanceId, Id, AppName, @@ -818,6 +838,32 @@ internal void TaskActivityAborted( } } + [Event(EventIds.TaskActivityPoisoned, Level = EventLevel.Warning, Version = 1)] + internal void TaskActivityPoisoned( + string InstanceId, + string ExecutionId, + string Name, + int 
TaskEventId, + string Details, + string AppName, + string ExtensionVersion + ) + { + if (this.IsEnabled(EventLevel.Warning)) + { + // TODO: Use WriteEventCore for better performance + this.WriteEvent( + EventIds.TaskActivityPoisoned, + InstanceId, + ExecutionId, + Name, + TaskEventId, + Details, + AppName, + ExtensionVersion); + } + } + [Event(EventIds.TaskActivityDispatcherError, Level = EventLevel.Error, Version = 1)] internal void TaskActivityDispatcherError( string InstanceId, diff --git a/src/DurableTask.Core/OrchestrationRuntimeState.cs b/src/DurableTask.Core/OrchestrationRuntimeState.cs index 494071dbd..11706b40b 100644 --- a/src/DurableTask.Core/OrchestrationRuntimeState.cs +++ b/src/DurableTask.Core/OrchestrationRuntimeState.cs @@ -97,6 +97,19 @@ public OrchestrationRuntimeState(IList? events) } } + /// + /// Returns a new runtime state constructed from the current Events, with Size and Status carried over explicitly. NOTE(review): the event objects are handed to the constructor as-is and no other members are set here, so this may not be a true deep copy - confirm against the constructor. + /// + /// A new OrchestrationRuntimeState instance; the caller's instance is not modified by this method. + public OrchestrationRuntimeState Clone() + { + return new OrchestrationRuntimeState(this.Events) + { + Size = this.Size, + Status = this.Status, + }; + } + /// /// Gets the execution started event /// @@ -188,7 +201,7 @@ public OrchestrationStatus OrchestrationStatus /// /// An invalid orchestration runtime state means that the history is somehow corrupted. /// - public bool IsValid => + public bool IsValid => this.Events.Count == 0 || this.Events.Count == 1 && this.Events[0].EventType == EventType.OrchestratorStarted || this.ExecutionStartedEvent != null; @@ -253,8 +266,8 @@ bool IsDuplicateEvent(HistoryEvent historyEvent) historyEvent.EventType == EventType.TaskCompleted && !completedEventIds.Add(historyEvent.EventId)) { - TraceHelper.Trace(TraceEventType.Warning, - "OrchestrationRuntimeState-DuplicateEvent", + TraceHelper.Trace(TraceEventType.Warning, + "OrchestrationRuntimeState-DuplicateEvent", "The orchestration '{0}' has already seen a completed task with id {1}.", this.OrchestrationInstance?.InstanceId ?? 
"", historyEvent.EventId); @@ -287,7 +300,7 @@ void SetMarkerEvents(HistoryEvent historyEvent) // It's not generally expected to receive multiple execution completed events for a given orchestrator, but it's possible under certain race conditions. // For example: when an orchestrator is signaled to terminate at the same time as it attempts to continue-as-new. var log = $"Received new {completedEvent.GetType().Name} event despite the orchestration being already in the {orchestrationStatus} state."; - + if (orchestrationStatus == OrchestrationStatus.ContinuedAsNew && completedEvent.OrchestrationStatus == OrchestrationStatus.Terminated) { // If the orchestration planned to continue-as-new but termination is requested, we transition to the terminated state. diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs index 1c22c307f..1c21fa24e 100644 --- a/src/DurableTask.Core/TaskActivityDispatcher.cs +++ b/src/DurableTask.Core/TaskActivityDispatcher.cs @@ -10,6 +10,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
// ---------------------------------------------------------------------------------- + #nullable enable namespace DurableTask.Core { @@ -117,13 +118,12 @@ async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem) if (taskMessage.Event.EventType != EventType.TaskScheduled) { this.logHelper.TaskActivityDispatcherError( - workItem, + workItem, $"The activity worker received an event of type '{taskMessage.Event.EventType}' but only '{EventType.TaskScheduled}' is supported."); throw TraceHelper.TraceException( TraceEventType.Critical, "TaskActivityDispatcher-UnsupportedEventType", - new NotSupportedException("Activity worker does not support event of type: " + - taskMessage.Event.EventType)); + new NotSupportedException("Activity worker does not support event of type: " + taskMessage.Event.EventType)); } scheduledEvent = (TaskScheduledEvent)taskMessage.Event; @@ -268,6 +268,22 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ => TraceHelper.TraceInstance(TraceEventType.Warning, "TaskActivityDispatcher-ExecutionAborted", orchestrationInstance, "{0}: {1}", scheduledEvent?.Name ?? "", e.Message); await this.orchestrationService.AbandonTaskActivityWorkItemAsync(workItem); } + catch (WorkItemPoisonedException poisonedException) when (scheduledEvent is not null) + { + // The task activity is poisoned and should be marked as failed + this.logHelper.TaskActivityPoisoned(orchestrationInstance, scheduledEvent!, poisonedException.Message); + TraceHelper.TraceInstance(TraceEventType.Warning, "TaskActivityDispatcher-ExecutionPoisoned", orchestrationInstance, "{0}: {1}", scheduledEvent?.Name ?? 
"", poisonedException.Message); + await this.orchestrationService.CompleteTaskActivityWorkItemAsync( + workItem, new TaskMessage() + { + Event = new TaskFailedEvent( + -1, + // Guaranteed to be not null because of the "when" clause in the catch block + scheduledEvent!.EventId, + poisonedException.Message, string.Empty), + OrchestrationInstance = orchestrationInstance, + }); + } finally { diagnosticActivity?.Stop(); // Ensure the activity is stopped here to prevent it from leaking out. @@ -349,4 +365,4 @@ DateTime AdjustRenewAt(DateTime renewAt) return renewAt > maxRenewAt ? maxRenewAt : renewAt; } } -} \ No newline at end of file +} diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 61864a1a5..75472c369 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -155,7 +155,7 @@ void EnsureExecutionStartedIsFirst(IList batch) { // Keep track of orchestrator generation changes, maybe update target position string executionId = message.OrchestrationInstance.ExecutionId; - if(previousExecutionId != executionId) + if (previousExecutionId != executionId) { // We want to re-position the ExecutionStarted event after the "right-most" // event with a non-null executionID that came before it. @@ -217,7 +217,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) CorrelationTraceClient.Propagate( () => - { + { // Check if it is extended session. 
// TODO: Remove this code - it looks incorrect and dangerous isExtendedSession = this.concurrentSessionLock.Acquire(); @@ -305,7 +305,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work var isCompleted = false; var continuedAsNew = false; var isInterrupted = false; - + // correlation CorrelationTraceClient.Propagate(() => CorrelationTraceContext.Current = workItem.TraceContext); @@ -316,11 +316,10 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work workItem.OrchestrationRuntimeState.LogHelper = this.logHelper; OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState; + OrchestrationRuntimeState originalOrchestrationRuntimeState = runtimeState.Clone(); runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); - OrchestrationRuntimeState originalOrchestrationRuntimeState = runtimeState; - // Distributed tracing support: each orchestration execution is a trace activity // that derives from an established parent trace context. It is expected that some // listener will receive these events and publish them to a distributed trace logger. @@ -363,7 +362,6 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work continuedAsNew = false; continuedAsNewMessage = null; - this.logHelper.OrchestrationExecuting(runtimeState.OrchestrationInstance!, runtimeState.Name); TraceHelper.TraceInstance( TraceEventType.Verbose, @@ -600,7 +598,9 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work if (workItem.RestoreOriginalRuntimeStateDuringCompletion) { // some backends expect the original runtime state object - workItem.OrchestrationRuntimeState = originalOrchestrationRuntimeState; + // NOTE: In a previous version of the code, originalOrchestrationRuntimeState was pointing to the same object/memory as runtimeState + // We use here runtimeState to preserve the original behavior. 
+ workItem.OrchestrationRuntimeState = runtimeState; } runtimeState.Status = runtimeState.Status ?? carryOverStatus; @@ -610,15 +610,44 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work instanceState.Status = runtimeState.Status; } - await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( - workItem, - runtimeState, - continuedAsNew ? null : messagesToSend, - orchestratorMessages, - continuedAsNew ? null : timerMessages, - continuedAsNewMessage, - instanceState); - + try + { + await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( + workItem, + runtimeState, + continuedAsNew ? null : messagesToSend, + orchestratorMessages, + continuedAsNew ? null : timerMessages, + continuedAsNewMessage, + instanceState); + } + catch (WorkItemPoisonedException poisonedException) + { + // The orchestration is poisoned and should be marked as failed: the work item is completed a second time with a state rebuilt from originalOrchestrationRuntimeState plus a Failed ExecutionCompletedEvent, and with no outbound, orchestrator, or timer messages. NOTE(review): this fallback completion call is not guarded, so an exception thrown by it propagates out of this catch block; instanceState is passed as prepared for the normal completion path - confirm both are intended. + OrchestrationInstance instance = workItem.OrchestrationRuntimeState?.OrchestrationInstance ?? new OrchestrationInstance { InstanceId = workItem.InstanceId }; + this.logHelper.OrchestrationPoisoned(instance, poisonedException.Message); + TraceHelper.TraceInstance(TraceEventType.Warning, "TaskOrchestrationDispatcher-ExecutionPoisoned", instance, "{0}", poisonedException.Message); + + OrchestrationRuntimeState failedRuntimeState = originalOrchestrationRuntimeState.Clone(); + failedRuntimeState.AddEvent(new OrchestratorStartedEvent(-1)); + failedRuntimeState.AddEvent(new ExecutionCompletedEvent( + -1, poisonedException.Message, OrchestrationStatus.Failed, new FailureDetails(poisonedException))); + failedRuntimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( + workItem, + failedRuntimeState, + outboundMessages: Array.Empty(), + orchestratorMessages: Array.Empty(), + timerMessages: Array.Empty(), + continuedAsNewMessage: null, + instanceState); + + isCompleted = false; + continuedAsNew = false; + 
isInterrupted = true; + } + if (workItem.RestoreOriginalRuntimeStateDuringCompletion) { workItem.OrchestrationRuntimeState = runtimeState; @@ -1143,11 +1172,11 @@ TaskMessage ProcessSendEventDecision( { var historyEvent = new EventSentEvent(sendEventAction.Id) { - InstanceId = sendEventAction.Instance?.InstanceId, - Name = sendEventAction.EventName, - Input = sendEventAction.EventData + InstanceId = sendEventAction.Instance?.InstanceId, + Name = sendEventAction.EventName, + Input = sendEventAction.EventData }; - + runtimeState.AddEvent(historyEvent); EventRaisedEvent eventRaisedEvent = new EventRaisedEvent(-1, sendEventAction.EventData) @@ -1169,7 +1198,7 @@ TaskMessage ProcessSendEventDecision( Event = eventRaisedEvent }; } - + internal class NonBlockingCountdownLock { int available;