33package com .azure .cosmos .implementation ;
44
55import com .azure .core .util .Context ;
6+ import com .azure .core .util .tracing .SpanKind ;
7+ import com .azure .core .util .tracing .StartSpanOptions ;
68import com .azure .core .util .tracing .Tracer ;
79import com .azure .cosmos .BridgeInternal ;
810import com .azure .cosmos .ConsistencyLevel ;
2123import org .HdrHistogram .ConcurrentDoubleHistogram ;
2224import org .slf4j .Logger ;
2325import org .slf4j .LoggerFactory ;
26+
27+ import reactor .core .CoreSubscriber ;
28+ import reactor .core .publisher .Flux ;
2429import reactor .core .publisher .Mono ;
30+ import reactor .core .publisher .Operators ;
2531import reactor .core .publisher .Signal ;
32+ import reactor .util .context .ContextView ;
2633
2734import java .time .Duration ;
2835import java .time .OffsetDateTime ;
3239import java .util .Map ;
3340import java .util .Objects ;
3441import java .util .Optional ;
35- import java .util .concurrent .atomic .AtomicReference ;
3642import java .util .function .Function ;
3743
3844import static com .azure .core .util .tracing .Tracer .AZ_TRACING_NAMESPACE_KEY ;
@@ -53,14 +59,47 @@ public class TracerProvider {
5359 public static final String RESOURCE_PROVIDER_NAME = "Microsoft.DocumentDB" ;
5460 public final Duration CRUD_THRESHOLD_FOR_DIAGNOSTICS = Duration .ofMillis (100 );
5561 public final Duration QUERY_THRESHOLD_FOR_DIAGNOSTICS = Duration .ofMillis (500 );
62+
// Key under which the azure-core tracing Context is stored in the Reactor context.
private static final String REACTOR_TRACING_CONTEXT_KEY = "tracing-context";

// Placeholder element handed to subscribers of the propagating publishers.
private static final Object DUMMY_VALUE = new Object();

// Publishers whose subscription runs with the span from the Reactor context made current.
private final Mono<Object> propagatingMono;
private final Flux<Object> propagatingFlux;

/**
 * Creates a provider that delegates to the given tracer.
 *
 * @param tracer tracer to use; when it is null {@link #isEnabled()} returns {@code false}.
 */
public TracerProvider(Tracer tracer) {
    this.propagatingMono = new PropagatingMono();
    this.propagatingFlux = new PropagatingFlux();
    this.tracer = tracer;
}
5972
6073 public boolean isEnabled () {
6174 return tracer != null ;
6275 }
6376
77+ /**
78+ * Gets {@link Context} from Reactor {@link ContextView}.
79+ *
80+ * @param reactorContext Reactor context instance.
81+ * @return {@link Context} from reactor context or null if not present.
82+ */
83+ public static Context getContextFromReactorOrNull (ContextView reactorContext ) {
84+ Object context = reactorContext .getOrDefault (REACTOR_TRACING_CONTEXT_KEY , null );
85+
86+ if (context != null && context instanceof Context ) {
87+ return (Context ) context ;
88+ }
89+
90+ return null ;
91+ }
92+
93+ /**
94+ * Stores {@link Context} in Reactor {@link reactor.util.context.Context}.
95+ *
96+ * @param traceContext {@link Context} context with trace context to store.
97+ * @return {@link reactor.util.context.Context} Reactor context with trace context.
98+ */
99+ public static reactor .util .context .Context setContextInReactor (Context traceContext ) {
100+ return reactor .util .context .Context .of (REACTOR_TRACING_CONTEXT_KEY , traceContext );
101+ }
102+
64103 /**
65104 * For each tracer plugged into the SDK a new tracing span is created.
66105 * <p>
@@ -73,16 +112,18 @@ public boolean isEnabled() {
73112 */
74113 public Context startSpan (String methodName , String databaseId , String endpoint , Context context ) {
75114 Context local = Objects .requireNonNull (context , "'context' cannot be null." );
76- local = local .addData (AZ_TRACING_NAMESPACE_KEY , RESOURCE_PROVIDER_NAME );
77- local = tracer .start (methodName , local ); // start the span and return the started span
115+
116+ StartSpanOptions spanOptions = new StartSpanOptions (SpanKind .CLIENT )
117+ .setAttribute (AZ_TRACING_NAMESPACE_KEY , RESOURCE_PROVIDER_NAME )
118+ .setAttribute (DB_TYPE , DB_TYPE_VALUE )
119+ .setAttribute (TracerProvider .DB_URL , endpoint )
120+ .setAttribute (TracerProvider .DB_STATEMENT , methodName );
78121 if (databaseId != null ) {
79- tracer .setAttribute (TracerProvider .DB_INSTANCE , databaseId , local );
122+ spanOptions .setAttribute (TracerProvider .DB_INSTANCE , databaseId );
80123 }
81124
82- tracer .setAttribute (TracerProvider .DB_TYPE , DB_TYPE_VALUE , local );
83- tracer .setAttribute (TracerProvider .DB_URL , endpoint , local );
84- tracer .setAttribute (TracerProvider .DB_STATEMENT , methodName , local );
85- return local ;
125+ // start the span and return the started span
126+ return tracer .start (methodName , spanOptions , local );
86127 }
87128
88129 /**
@@ -106,17 +147,19 @@ public void addEvent(String name, Map<String, Object> attributes, OffsetDateTime
106147 * Given a context containing the current tracing span the span is marked completed with status info from
107148 * {@link Signal}. For each tracer plugged into the SDK the current tracing span is marked as completed.
108149 *
109- * @param context Additional metadata that is passed through the call stack.
110150 * @param signal The signal indicates the status and contains the metadata we need to end the tracing span.
111151 */
112- public <T extends CosmosResponse <? extends Resource >> void endSpan (Context context ,
113- Signal <T > signal ,
114- int statusCode ) {
115- Objects .requireNonNull (context , "'context' cannot be null." );
152+ public <T > void endSpan (Signal <T > signal , int statusCode ) {
116153 Objects .requireNonNull (signal , "'signal' cannot be null." );
117154
155+ Context context = getContextFromReactorOrNull (signal .getContextView ());
156+ if (context == null ) {
157+ return ;
158+ }
159+
118160 switch (signal .getType ()) {
119161 case ON_COMPLETE :
162+ case ON_NEXT :
120163 end (statusCode , null , context );
121164 break ;
122165 case ON_ERROR :
@@ -133,7 +176,7 @@ public <T extends CosmosResponse<? extends Resource>> void endSpan(Context conte
133176 end (statusCode , throwable , context );
134177 break ;
135178 default :
136- // ON_SUBSCRIBE and ON_NEXT don 't have the information to end the span so just return.
179+ // ON_SUBSCRIBE isn 't the right state to end span
137180 break ;
138181 }
139182 }
@@ -190,6 +233,20 @@ public <T> Mono<CosmosItemResponse<T>> traceEnabledCosmosItemResponsePublisher(M
190233 thresholdForDiagnosticsOnTracer );
191234 }
192235
236+ /**
237+ * Runs given {@code Flux<T>} publisher in the scope of trace context passed in using
238+ * {@link TracerProvider#setContextInReactor(Context, reactor.util.context.Context)} in {@code contextWrite}
239+ * Populates active trace context on Reactor's hot path. Reactor's instrumentation for OpenTelemetry
240+ * (or other hypothetical solution) will take care of the cold path.
241+ *
242+ * @param publisher publisher to run.
243+ * @return wrapped publisher.
244+ */
245+ public <T > Flux <T > runUnderSpanInContext (Flux <T > publisher ) {
246+ return propagatingFlux
247+ .flatMap (ignored -> publisher );
248+ }
249+
193250 private <T > Mono <T > traceEnabledPublisher (Mono <T > resultPublisher ,
194251 Context context ,
195252 String spanName ,
@@ -198,41 +255,55 @@ private <T> Mono<T> traceEnabledPublisher(Mono<T> resultPublisher,
198255 Function <T , Integer > statusCodeFunc ,
199256 Function <T , CosmosDiagnostics > diagnosticFunc ,
200257 Duration thresholdForDiagnosticsOnTracer ) {
201- final AtomicReference <Context > parentContext = new AtomicReference <>(Context .NONE );
258+
259+ if (!isEnabled ()) {
260+ return resultPublisher ;
261+ }
262+
202263 Optional <Object > callDepth = context .getData (COSMOS_CALL_DEPTH );
203264 final boolean isNestedCall = callDepth .isPresent ();
204- return resultPublisher
205- .doOnSubscribe (ignoredValue -> {
206- if (isEnabled () && !isNestedCall ) {
207- parentContext .set (this .startSpan (spanName , databaseId , endpoint ,
208- context ));
209- }
210- }).doOnSuccess (response -> {
211- if (isEnabled () && !isNestedCall ) {
212- CosmosDiagnostics cosmosDiagnostics = diagnosticFunc .apply (response );
213- try {
214- Duration threshold = thresholdForDiagnosticsOnTracer ;
215- if (threshold == null ) {
216- threshold = CRUD_THRESHOLD_FOR_DIAGNOSTICS ;
217- }
265+ if (isNestedCall ) {
266+ return resultPublisher ;
267+ }
218268
219- if (cosmosDiagnostics != null
220- && cosmosDiagnostics .getDuration () != null
221- && cosmosDiagnostics .getDuration ().compareTo (threshold ) > 0 ) {
222- addDiagnosticsOnTracerEvent (cosmosDiagnostics , parentContext .get ());
269+ // propagatingMono ensures active span is propagated to the `resultPublisher`
270+ // subscription and hot path. OpenTelemetry reactor's instrumentation will
271+ // propagate it on the cold path.
272+ return propagatingMono
273+ .flatMap (ignored -> resultPublisher )
274+ .doOnEach (signal -> {
275+ switch (signal .getType ()) {
276+ case ON_NEXT :
277+ T response = signal .get ();
278+ Context traceContext = getContextFromReactorOrNull (signal .getContextView ());
279+ CosmosDiagnostics cosmosDiagnostics = diagnosticFunc .apply (response );
280+ try {
281+ Duration threshold = thresholdForDiagnosticsOnTracer ;
282+ if (threshold == null ) {
283+ threshold = CRUD_THRESHOLD_FOR_DIAGNOSTICS ;
284+ }
285+
286+ if (cosmosDiagnostics != null
287+ && cosmosDiagnostics .getDuration () != null
288+ && cosmosDiagnostics .getDuration ().compareTo (threshold ) > 0 ) {
289+ addDiagnosticsOnTracerEvent (cosmosDiagnostics , traceContext );
290+ }
291+ } catch (JsonProcessingException ex ) {
292+ LOGGER .warn ("Error while serializing diagnostics for tracer" , ex .getMessage ());
223293 }
224- } catch (JsonProcessingException ex ) {
225- LOGGER .warn ("Error while serializing diagnostics for tracer" , ex .getMessage ());
226- }
227- this .endSpan (parentContext .get (), Signal .complete (), statusCodeFunc .apply (response ));
228- }
229- }).doOnError (throwable -> {
230- if (isEnabled () && !isNestedCall ) {
231- // not adding diagnostics on trace event for exception as this information is already there as
232- // part of exception message
233- this .endSpan (parentContext .get (), Signal .error (throwable ), ERROR_CODE );
234- }
235- });
294+
295+ this .endSpan (signal , statusCodeFunc .apply (response ));
296+ break ;
297+ case ON_ERROR :
298+ // not adding diagnostics on trace event for exception as this information is already there as
299+ // part of exception message
300+ this .endSpan (signal , ERROR_CODE );
301+ break ;
302+ default :
303+ break ;
304+ }})
305+ .contextWrite (setContextInReactor (this .startSpan (spanName , databaseId , endpoint ,
306+ context )));
236307 }
237308
238309 private <T > Mono <T > publisherWithClientTelemetry (Mono <T > resultPublisher ,
@@ -364,7 +435,7 @@ private ReportPayload createReportPayload(CosmosAsyncClient cosmosAsyncClient,
364435 }
365436
366437 private void addDiagnosticsOnTracerEvent (CosmosDiagnostics cosmosDiagnostics , Context context ) throws JsonProcessingException {
367- if (cosmosDiagnostics == null ) {
438+ if (cosmosDiagnostics == null || context == null ) {
368439 return ;
369440 }
370441
@@ -499,4 +570,52 @@ private void addDiagnosticsOnTracerEvent(CosmosDiagnostics cosmosDiagnostics, Co
499570 this .addEvent ("ClientCfgs" , attributes ,
500571 OffsetDateTime .ofInstant (clientSideRequestStatistics .getRequestStartTimeUTC (), ZoneOffset .UTC ), context );
501572 }
573+
574+ private static void subscribe (Tracer tracer , CoreSubscriber <? super Object > actual ) {
575+ Context context = getContextFromReactorOrNull (actual .currentContext ());
576+ if (context != null ) {
577+ AutoCloseable scope = tracer .makeSpanCurrent (context );
578+ try {
579+ actual .onSubscribe (Operators .scalarSubscription (actual , DUMMY_VALUE ));
580+ } finally {
581+ try {
582+ scope .close ();
583+ } catch (Exception e ) {
584+ // can't happen
585+ }
586+ }
587+ } else {
588+ actual .onSubscribe (Operators .scalarSubscription (actual , DUMMY_VALUE ));
589+ }
590+ }
591+
592+ /**
593+ * Helper class allowing running Mono subscription (and anything on the hot path)
594+ * in scope of trace context. This enables OpenTelemetry auto-collection
595+ * to pick it up and correlate lower levels of instrumentation and logs
596+ * to logical Cosmos spans.
597+ *
598+ * OpenTelemetry reactor auto-instrumentation will take care of the cold path.
599+ */
600+ private final class PropagatingMono extends Mono <Object > {
601+ @ Override
602+ public void subscribe (CoreSubscriber <? super Object > actual ) {
603+ TracerProvider .subscribe (tracer , actual );
604+ }
605+ }
606+
607+ /**
608+ * Helper class allowing running Flux subscription (and anything on the hot path)
609+ * in scope of trace context. This enables OpenTelemetry auto-collection
610+ * to pick it up and correlate lower levels of instrumentation and logs
611+ * to logical Cosmos spans.
612+ *
613+ * OpenTelemetry reactor auto-instrumentation will take care of the cold path.
614+ */
615+ private final class PropagatingFlux extends Flux <Object > {
616+ @ Override
617+ public void subscribe (CoreSubscriber <? super Object > actual ) {
618+ TracerProvider .subscribe (tracer , actual );
619+ }
620+ }
502621}
0 commit comments