Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

**DRAFT** Service Bus Processor instrumentation #2897

Draft
wants to merge 6 commits into
base: feature/azure-service-bus-instrumentation
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Agent/NewRelic/Agent/Core/Metrics/MetricNames.cs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ public enum MessageBrokerAction
Consume,
Peek,
Purge,
Process,
Settle,
Cancel
}

public const string MessageBrokerPrefix = "MessageBroker";
Expand Down
6 changes: 6 additions & 0 deletions src/Agent/NewRelic/Agent/Core/Transactions/Transaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,12 @@ private static MetricNames.MessageBrokerAction AgentWrapperApiEnumToMetricNamesE
return MetricNames.MessageBrokerAction.Produce;
case MessageBrokerAction.Purge:
return MetricNames.MessageBrokerAction.Purge;
case MessageBrokerAction.Process:
return MetricNames.MessageBrokerAction.Process;
case MessageBrokerAction.Settle:
return MetricNames.MessageBrokerAction.Settle;
case MessageBrokerAction.Cancel:
return MetricNames.MessageBrokerAction.Cancel;
default:
throw new InvalidOperationException("Unexpected enum value: " + wrapper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public enum MessageBrokerAction
Consume,
Peek,
Purge,
Process,
Settle,
Cancel
}

///<summary>This enum must be a sequence of values starting with 0 and incrementing by 1. See MetricNames.GetEnumerationFunc</summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2020 New Relic, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

using System.Threading.Tasks;
using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Providers.Wrapper;

namespace NewRelic.Providers.Wrapper.AzureServiceBus;

public class AzureServiceBusProcessorWrapper : AzureServiceBusWrapperBase
{
public override bool IsTransactionRequired => true;

public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo)
{
var canWrap = instrumentedMethodInfo.RequestedWrapperName.Equals(nameof(AzureServiceBusProcessorWrapper));
return new CanWrapResponse(canWrap);
}

public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
{
if (instrumentedMethodCall.IsAsync)
transaction.AttachToAsync();

// this call wraps the client event handler callback, so start a method segment that will time the callback
var segment = transaction.StartMethodSegment(
instrumentedMethodCall.MethodCall,
instrumentedMethodCall.MethodCall.Method.Type.Name,
instrumentedMethodCall.MethodCall.Method.MethodName);

return instrumentedMethodCall.IsAsync ?
Delegates.GetAsyncDelegateFor<Task>(agent, segment)
:
Delegates.GetDelegateFor(segment);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ public class AzureServiceBusReceiveWrapper : AzureServiceBusWrapperBase
{
private static readonly ConcurrentDictionary<Type, Func<object, object>> _getResultFromGenericTask = new();

private Func<object, object> _innerReceiverAccessor;
private Func<object, bool> _innerReceiverIsProcessorAccessor;

public override bool IsTransactionRequired => false; // only partially true. See the code below...

public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo)
{
var canWrap = instrumentedMethodInfo.RequestedWrapperName.Equals(nameof(AzureServiceBusReceiveWrapper));
Expand All @@ -24,32 +29,71 @@ public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMetho

public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
{
transaction.LogFinest("AzureServiceBusReceiveWrapper.BeforeWrappedMethod() is starting.");

dynamic serviceBusReceiver = instrumentedMethodCall.MethodCall.InvocationTarget;
string queueName = serviceBusReceiver.EntityPath; // some-queue-name
string fqns = serviceBusReceiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net
string fqns = serviceBusReceiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net

_innerReceiverAccessor ??= VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(serviceBusReceiver.GetType(), "InnerReceiver");
object innerReceiver = _innerReceiverAccessor.Invoke(serviceBusReceiver);

// use reflection to access the _isProcessor field of the inner receiver
_innerReceiverIsProcessorAccessor ??= VisibilityBypasser.Instance.GenerateFieldReadAccessor<bool>(innerReceiver.GetType(), "_isProcessor");
var isProcessor = _innerReceiverIsProcessorAccessor.Invoke(innerReceiver);

var instrumentedMethodName = instrumentedMethodCall.MethodCall.Method.MethodName;

MessageBrokerAction action =
instrumentedMethodCall.MethodCall.Method.MethodName switch
instrumentedMethodName switch
{
"ReceiveMessagesAsync" => MessageBrokerAction.Consume,
"ReceiveDeferredMessagesAsync" => MessageBrokerAction.Consume,
"PeekMessagesInternalAsync" => MessageBrokerAction.Peek,
"AbandonMessageAsync" => MessageBrokerAction.Purge, // TODO is this correct ??? Abandon sends the message back to the queue for re-delivery
"CompleteMessageAsync" => MessageBrokerAction.Consume,
"DeadLetterInternalAsync" => MessageBrokerAction.Purge, // TODO is this correct ???
"DeferMessageAsync" => MessageBrokerAction.Consume, // TODO is this correct or should we extend MessageBrokerAction with more values???
"RenewMessageLockAsync" => MessageBrokerAction.Consume, // TODO is this correct or should we extend MessageBrokerAction with more values???
_ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodCall.MethodCall.Method.MethodName}")
"AbandonMessageAsync" => MessageBrokerAction.Settle,
"CompleteMessageAsync" => MessageBrokerAction.Settle,
"DeadLetterInternalAsync" => MessageBrokerAction.Settle,
"DeferMessageAsync" => MessageBrokerAction.Settle,
"RenewMessageLockAsync" => MessageBrokerAction.Consume, // TODO This doesn't quite fit. OTEL uses a default action with no name for this
_ => throw new ArgumentOutOfRangeException(nameof(action), $"Unexpected instrumented method call: {instrumentedMethodName}")
};

// start a message broker segment
// If the inner receiver is configured as a processor and this is a ReceiveMessagesAsync call, start a transaction.
// The transaction will end at the conclusion of ReceiverManager.ProcessOneMessageWithinScopeAsync()
if (isProcessor && instrumentedMethodName == "ReceiveMessagesAsync")
{
transaction = agent.CreateTransaction(
destinationType: MessageBrokerDestinationType.Queue,
BrokerVendorName,
destination: queueName);

if (instrumentedMethodCall.IsAsync)
{
transaction.DetachFromPrimary();
}

transaction.LogFinest("Created transaction for ReceiveMessagesAsync in processor mode.");
}
else
{
transaction = agent.CurrentTransaction;
transaction.LogFinest($"Using existing transaction for {instrumentedMethodName}.");
}

if (instrumentedMethodCall.IsAsync)
{
transaction.AttachToAsync();
}


// start a message broker segment (only happens if transaction is not NoOpTransaction)
var segment = transaction.StartMessageBrokerSegment(
instrumentedMethodCall.MethodCall,
MessageBrokerDestinationType.Queue,
action,
BrokerVendorName,
queueName,
serverAddress: fqns );
serverAddress: fqns);

return instrumentedMethodCall.IsAsync
?
Expand All @@ -58,71 +102,39 @@ public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMetho
agent,
segment,
false,
HandleResponse,
TaskContinuationOptions.ExecuteSynchronously)
: Delegates.GetDelegateFor<object>(
onFailure: transaction.NoticeError,
onComplete: segment.End,
onSuccess: ExtractDTHeadersIfAvailable);

void HandleResponse(Task responseTask)
{
try
{
if (responseTask.IsFaulted)
(responseTask) =>
{
transaction.NoticeError(responseTask.Exception);
return;
}

var resultObj = GetTaskResultFromObject(responseTask);
ExtractDTHeadersIfAvailable(resultObj);
}
finally
{
segment.End();
}
}



void ExtractDTHeadersIfAvailable(object resultObj)
{
if (resultObj != null)
{
switch (instrumentedMethodCall.MethodCall.Method.MethodName)
{
case "ReceiveMessagesAsync":
case "ReceiveDeferredMessagesAsync":
case "PeekMessagesInternalAsync":
// the response contains a list of messages.
// get the first message from the response and extract DT headers
dynamic messages = resultObj;
if (messages.Count > 0)
try
{
if (responseTask.IsFaulted)
{
var msg = messages[0];
if (msg.ApplicationProperties is ReadOnlyDictionary<string, object> applicationProperties)
{
transaction.AcceptDistributedTraceHeaders(applicationProperties, ProcessHeaders, TransportType.Queue);
}
transaction.NoticeError(responseTask.Exception);
}
break;
}
}
IEnumerable<string> ProcessHeaders(ReadOnlyDictionary<string, object> applicationProperties, string key)
{
var headerValues = new List<string>();
foreach (var item in applicationProperties)
{
if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase))

HandleReceiveResponse(responseTask, instrumentedMethodName, transaction, isProcessor);
}
catch (Exception ex)
{
headerValues.Add(item.Value as string);
transaction.LogFinest($"Unexpected exception: {ex.Message}");
}
}
finally
{
transaction.LogFinest($"Ending segment for {instrumentedMethodName}.");
segment.End();

return headerValues;
}
}
if (isProcessor && responseTask.IsCanceled)
{
transaction.LogFinest("ReceiveMessagesAsync task was canceled in processor mode. Ignoring transaction.");
transaction.Ignore();
}
}
},
TaskContinuationOptions.ExecuteSynchronously)
:
Delegates.GetDelegateFor<object>(
onFailure: transaction.NoticeError,
onComplete: segment.End,
onSuccess: (resultObj) => ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName, isProcessor));
}

private static object GetTaskResultFromObject(object taskObj)
Expand All @@ -136,8 +148,63 @@ private static object GetTaskResultFromObject(object taskObj)
{
return null;
}
if (task.IsCanceled)
{
return null;
}

var getResponse = _getResultFromGenericTask.GetOrAdd(task.GetType(), t => VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(t, "Result"));
return getResponse(task);
}

private static void HandleReceiveResponse(Task responseTask, string instrumentedMethodName, ITransaction transaction, bool isProcessor)
{
var resultObj = GetTaskResultFromObject(responseTask);
ExtractDTHeadersIfAvailable(resultObj, transaction, instrumentedMethodName, isProcessor);
}
private static void ExtractDTHeadersIfAvailable(object resultObj, ITransaction transaction, string instrumentedMethodName, bool isProcessor)
{
if (resultObj != null)
{
switch (instrumentedMethodName)
{
case "ReceiveMessagesAsync":
case "ReceiveDeferredMessagesAsync":
case "PeekMessagesInternalAsync":
// the response contains a list of messages.
// get the first message from the response and extract DT headers
dynamic messages = resultObj;
if (messages.Count > 0)
{
transaction.LogFinest($"Received {messages.Count} message(s). Accepting DT headers.");
var msg = messages[0];
if (msg.ApplicationProperties is ReadOnlyDictionary<string, object> applicationProperties)
{
transaction.AcceptDistributedTraceHeaders(applicationProperties, ProcessHeaders, TransportType.Queue);
}
}
else if (messages.Count == 0 && isProcessor) // if there are no messages and the receiver is a processor, ignore the transaction we created
{
transaction.LogFinest("No messages received. Ignoring transaction.");
transaction.Ignore();
}
break;
}
}
}

private static IEnumerable<string> ProcessHeaders(ReadOnlyDictionary<string, object> applicationProperties, string key)
{
var headerValues = new List<string>();
foreach (var item in applicationProperties)
{
if (item.Key.Equals(key, StringComparison.OrdinalIgnoreCase))
{
headerValues.Add(item.Value as string);
}
}

return headerValues;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2020 New Relic, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

using System;
using System.Threading.Tasks;
using NewRelic.Agent.Api;
using NewRelic.Agent.Extensions.Providers.Wrapper;
using NewRelic.Reflection;

namespace NewRelic.Providers.Wrapper.AzureServiceBus
{
public class AzureServiceBusReceiverManagerWrapper : AzureServiceBusWrapperBase
{
private Func<object, object> _receiverAccessor;
public override bool IsTransactionRequired => true;

public override CanWrapResponse CanWrap(InstrumentedMethodInfo instrumentedMethodInfo)
{
var canWrap = instrumentedMethodInfo.RequestedWrapperName.Equals(nameof(AzureServiceBusReceiverManagerWrapper));
return new CanWrapResponse(canWrap);
}

public override AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall instrumentedMethodCall, IAgent agent, ITransaction transaction)
{
var receiverManager = instrumentedMethodCall.MethodCall.InvocationTarget;
_receiverAccessor ??= VisibilityBypasser.Instance.GeneratePropertyAccessor<object>(receiverManager.GetType(), "Receiver");
dynamic receiver = _receiverAccessor(receiverManager);

string queueName = receiver.EntityPath; // some-queue-name
string fqns = receiver.FullyQualifiedNamespace; // some-service-bus-entity.servicebus.windows.net

if (instrumentedMethodCall.IsAsync)
transaction.AttachToAsync();

// start a new MessageBroker segment that wraps ProcessOneMessageWithinScopeAsync
var segment = transaction.StartMessageBrokerSegment(
instrumentedMethodCall.MethodCall,
MessageBrokerDestinationType.Queue,
MessageBrokerAction.Process, // TODO: This is a new action, added for this instrumentation
BrokerVendorName,
queueName,
serverAddress: fqns);

return instrumentedMethodCall.IsAsync
?
Delegates.GetAsyncDelegateFor<Task>(
agent,
segment,
false,
onComplete: _ =>
{
segment.End();
transaction.End();
}, TaskContinuationOptions.ExecuteSynchronously)
:
Delegates.GetDelegateFor(onComplete: () =>
{
segment.End();
transaction.End();
});
}
}
}
Loading
Loading