Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,727 changes: 961 additions & 766 deletions docs/span_attribute_schema/v0.md

Large diffs are not rendered by default.

2,363 changes: 1,303 additions & 1,060 deletions docs/span_attribute_schema/v1.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,38 @@ internal static class ConsumerCache
// A map between Kafka Consumer<TKey,TValue> and the corresponding consumer group
private static readonly ConditionalWeakTable<object, string> ConsumerToGroupIdMap = new();
private static readonly ConditionalWeakTable<object, string> ConsumerToBootstrapServersMap = new();
private static readonly ConditionalWeakTable<object, string> ConsumerToClusterIdMap = new();
private static readonly ConditionalWeakTable<object, Type?> ConsumerToOffsetsCommittedHandlerMap = new();

public static void SetConsumerGroup(object consumer, string groupId, string bootstrapServers)
public static void SetConsumerGroup(object consumer, string groupId, string bootstrapServers, string clusterId)
{
#if NETCOREAPP3_1_OR_GREATER
ConsumerToGroupIdMap.AddOrUpdate(consumer, groupId);
ConsumerToBootstrapServersMap.AddOrUpdate(consumer, bootstrapServers);
ConsumerToClusterIdMap.AddOrUpdate(consumer, clusterId);
#else
ConsumerToGroupIdMap.GetValue(consumer, _ => groupId);
ConsumerToBootstrapServersMap.GetValue(consumer, _ => bootstrapServers);
ConsumerToClusterIdMap.GetValue(consumer, _ => clusterId);
#endif
}

public static bool TryGetConsumerGroup(object consumer, out string? groupId, out string? bootstrapServers)
public static bool TryGetConsumerGroup(object consumer, out string? groupId, out string? bootstrapServers, out string? clusterId)
{
groupId = null;
bootstrapServers = null;
clusterId = null;

return ConsumerToGroupIdMap.TryGetValue(consumer, out groupId) && ConsumerToBootstrapServersMap.TryGetValue(consumer, out bootstrapServers);
return ConsumerToGroupIdMap.TryGetValue(consumer, out groupId)
&& ConsumerToBootstrapServersMap.TryGetValue(consumer, out bootstrapServers)
&& ConsumerToClusterIdMap.TryGetValue(consumer, out clusterId);
}

public static void RemoveConsumerGroup(object consumer)
{
ConsumerToGroupIdMap.Remove(consumer);
ConsumerToBootstrapServersMap.Remove(consumer);
ConsumerToClusterIdMap.Remove(consumer);
ConsumerToOffsetsCommittedHandlerMap.Remove(consumer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// <copyright file="IAdminClient.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System;
using Datadog.Trace.DuckTyping;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka;

/// <summary>
/// Duck Type for Confluent.Kafka.IAdminClient
/// </summary>
internal interface IAdminClient : IDuckType, IDisposable
{
/// <summary>
/// Describes the cluster metadata including the cluster ID
/// </summary>
/// <param name="options">Optional configuration options</param>
/// <returns>Duck typed task containing the cluster description result</returns>
IDuckTypeTask<IDescribeClusterResult> DescribeClusterAsync(IDescribeClusterOptions? options = null);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// <copyright file="IAdminClientBuilder.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka;

/// <summary>
/// Duck Type for Confluent.Kafka.AdminClientBuilder
/// Used to build an AdminClient for cluster_id extraction
/// </summary>
internal interface IAdminClientBuilder
{
IAdminClient Build();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// <copyright file="IAdminClientConfig.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka;

/// <summary>
/// Duck Type for Confluent.Kafka.AdminClientConfig
/// Used to create an AdminClient for cluster_id extraction
/// </summary>
internal interface IAdminClientConfig
{
string BootstrapServers { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// <copyright file="IAdminClientExtensions.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System;
using System.Threading.Tasks;
using Datadog.Trace.DuckTyping;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka;

/// <summary>
/// Duck Type for Confluent.Kafka.IAdminClientExtensions
/// We only need this to get the underlying instance for reflection calls to extension methods
/// </summary>
internal interface IAdminClientExtensions : IDuckType
{
Task<IDescribeClusterResult> DescribeClusterAsync(IAdminClient adminClient, IDescribeClusterOptions? options = null);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// <copyright file="IDescribeClusterOptions.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka;

/// <summary>
/// Duck Type for Confluent.Kafka.Admin.DescribeClusterOptions
/// Used as an optional parameter for DescribeClusterAsync extension method
/// </summary>
internal interface IDescribeClusterOptions
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// <copyright file="IDescribeClusterResult.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka;

/// <summary>
/// Duck Type for Confluent.Kafka.Admin.DescribeClusterResult
/// Used to access ClusterId from cluster description
/// </summary>
internal interface IDescribeClusterResult
{
string ClusterId { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTar
var dataStreams = Tracer.Instance.TracerManager.DataStreamsManager;
if (exception is null && response.Instance is not null && dataStreams.IsEnabled && instance != null)
{
ConsumerCache.TryGetConsumerGroup(instance, out var groupId, out var _);
ConsumerCache.TryGetConsumerGroup(instance, out var groupId, out var _, out var _);

for (var i = 0; i < response.Count; i++)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ internal static CallTargetReturn OnMethodEnd<TTarget>(TTarget instance, Exceptio
var dataStreams = Tracer.Instance.TracerManager.DataStreamsManager;
if (exception is null && state.State is IEnumerable<object> offsets && dataStreams.IsEnabled && instance != null)
{
ConsumerCache.TryGetConsumerGroup(instance, out var groupId, out var _);
ConsumerCache.TryGetConsumerGroup(instance, out var groupId, out var _, out var _);

foreach (var offset in offsets)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
using System.ComponentModel;
using Datadog.Trace.ClrProfiler.CallTarget;
using Datadog.Trace.DuckTyping;
using Datadog.Trace.Logging;
using Datadog.Trace.Util.Delegates;
using Console = System.Console;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka;

Expand All @@ -27,6 +29,8 @@ namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Kafka;
[EditorBrowsable(EditorBrowsableState.Never)]
public class KafkaConsumerConstructorIntegration
{
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(typeof(KafkaConsumerConstructorIntegration));

internal static CallTargetState OnMethodBegin<TTarget, TConsumerBuilder>(TTarget instance, TConsumerBuilder consumer)
where TConsumerBuilder : IConsumerBuilder
{
Expand Down Expand Up @@ -63,8 +67,22 @@ internal static CallTargetState OnMethodBegin<TTarget, TConsumerBuilder>(TTarget
// Only config setting "group.id" is required, so assert that the value is non-null before adding to the ConsumerGroup cache
if (groupId is not null)
{
// Save the map between this consumer and a consumer group
ConsumerCache.SetConsumerGroup(instance, groupId, bootstrapServers);
// Extract cluster_id using AdminClient API with bootstrap servers
var clusterId = KafkaHelper.GetClusterId(bootstrapServers) ?? string.Empty;

if (!string.IsNullOrEmpty(clusterId))
{
Log.Information("ROBC: Kafka consumer config retrieved - GroupId: {GroupId}, BootstrapServers: {BootstrapServers}, ClusterId: {ClusterId}", groupId, bootstrapServers, clusterId);
Console.WriteLine($"ROBC: Kafka consumer config retrieved - GroupId: {groupId}, BootstrapServers: {bootstrapServers}, ClusterId: {clusterId}");
}
else
{
Log.Information("ROBC: Kafka consumer config retrieved but no cluster_id could be obtained - GroupId: {GroupId}, BootstrapServers: {BootstrapServers}", groupId, bootstrapServers);
Console.WriteLine($"ROBC: Kafka consumer config retrieved but no cluster_id could be obtained - GroupId: {groupId}, BootstrapServers: {bootstrapServers}");
}

// Save the map between this consumer and its metadata
ConsumerCache.SetConsumerGroup(instance, groupId, bootstrapServers, clusterId);
return new CallTargetState(scope: null, state: instance);
}
}
Expand Down
Loading
Loading