Skip to content

Commit 4908b2c

Browse files
amarzialimcculls
andauthored
Make webflux compatible with latest spring 6 (DataDog#6352)
* Make webflux compatible with latest spring 6 * webflux refactoring * enable java 17 latest dep * Update dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/asserts/TagsAssert.groovy Co-authored-by: Stuart McCulloch <[email protected]> * unformat --------- Co-authored-by: Stuart McCulloch <[email protected]>
1 parent 30789d4 commit 4908b2c

File tree

28 files changed

+328
-405
lines changed

28 files changed

+328
-405
lines changed

.circleci/config.continue.yml.j2

+14
Original file line numberDiff line numberDiff line change
@@ -1092,6 +1092,20 @@ build_test_jobs: &build_test_jobs
10921092
maxWorkers: 4
10931093
testJvm: "8"
10941094
1095+
- xlarge_tests:
1096+
requires:
1097+
- ok_to_test
1098+
- build_latestdep
1099+
name: test_17_inst_latest
1100+
gradleTarget: ":instrumentationLatestDepTest"
1101+
gradleParameters: "-PskipFlakyTests"
1102+
triggeredBy: *instrumentation_modules
1103+
stage: instrumentation
1104+
cacheType: latestdep
1105+
parallelism: 4
1106+
maxWorkers: 4
1107+
testJvm: "17"
1108+
10951109
{% if flaky %}
10961110
- tests:
10971111
requires:

dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie

+1
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@
258258
0 org.springframework.beans.factory.support.DefaultListableBeanFactory
259259
0 org.springframework.beans.factory.support.DisposableBeanAdapter
260260
1 org.springframework.boot.SpringApplicationShutdownHook$Handlers
261+
1 org.springframework.boot.autoconfigure.ssl.FileWatcher$WatcherThread
261262
2 org.springframework.boot.*
262263
0 org.apache.xalan.transformer.TransformerImpl
263264
# More runnables to deal with

dd-java-agent/instrumentation/spring-webflux-5/build.gradle

+8
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ muzzle {
4848
module = "spring-webflux"
4949
versions = "[5.1.1.RELEASE,6)"
5050
}
51+
pass {
52+
name = "webflux_6"
53+
group = "org.springframework"
54+
module = "spring-webflux"
55+
versions = "[6,)"
56+
javaVersion = "17"
57+
extraDependency "io.projectreactor:reactor-core:3.6.0"
58+
}
5159
}
5260

5361
apply from: "$rootDir/gradle/java.gradle"

dd-java-agent/instrumentation/spring-webflux-5/src/bootTest/groovy/SpringWebfluxTest.groovy

+1-1
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ class SpringWebfluxTest extends AgentTestRunner {
198198
resourceName "TestController.tracedMethod"
199199
operationName "trace.annotation"
200200
}
201-
childOf(span(0)) // FIXME this is wrong
201+
childOfPrevious()
202202
errored false
203203
tags {
204204
"$Tags.COMPONENT" "trace"

dd-java-agent/instrumentation/spring-webflux-5/src/main/java/datadog/trace/instrumentation/springwebflux/client/SpringWebfluxHttpClientDecorator.java

+2-21
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
44
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
55
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
6-
import java.lang.invoke.MethodHandle;
7-
import java.lang.invoke.MethodHandles;
8-
import java.lang.invoke.MethodType;
96
import java.net.URI;
107
import org.springframework.web.reactive.function.client.ClientRequest;
118
import org.springframework.web.reactive.function.client.ClientResponse;
@@ -18,8 +15,6 @@ public class SpringWebfluxHttpClientDecorator
1815
public static final CharSequence CANCELLED_MESSAGE =
1916
UTF8BytesString.create("The subscription was cancelled");
2017

21-
private static final MethodHandle RAW_STATUS_CODE = findRawStatusCode();
22-
2318
public static final SpringWebfluxHttpClientDecorator DECORATE =
2419
new SpringWebfluxHttpClientDecorator();
2520

@@ -52,13 +47,8 @@ protected URI url(final ClientRequest httpRequest) {
5247

5348
@Override
5449
protected int status(final ClientResponse httpResponse) {
55-
if (null != RAW_STATUS_CODE) {
56-
try {
57-
return (int) RAW_STATUS_CODE.invokeExact(httpResponse);
58-
} catch (Throwable ignored) {
59-
}
60-
}
61-
return httpResponse.statusCode().value();
50+
final Integer code = StatusCodes.STATUS_CODE_FUNCTION.apply(httpResponse);
51+
return code != null ? code : 0;
6252
}
6353

6454
@Override
@@ -70,13 +60,4 @@ protected String getRequestHeader(ClientRequest request, String headerName) {
7060
protected String getResponseHeader(ClientResponse response, String headerName) {
7161
return response.headers().asHttpHeaders().getFirst(headerName);
7262
}
73-
74-
private static MethodHandle findRawStatusCode() {
75-
try {
76-
return MethodHandles.publicLookup()
77-
.findVirtual(ClientResponse.class, "rawStatusCode", MethodType.methodType(int.class));
78-
} catch (IllegalAccessException | NoSuchMethodException e) {
79-
return null;
80-
}
81-
}
8263
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package datadog.trace.instrumentation.springwebflux.client;
2+
3+
import java.lang.invoke.MethodHandle;
4+
import java.lang.invoke.MethodHandles;
5+
import java.lang.invoke.MethodType;
6+
import java.util.function.Function;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
import org.springframework.http.HttpStatus;
10+
import org.springframework.web.reactive.function.client.ClientResponse;
11+
12+
// Adapted from Opentelemetry
13+
public final class StatusCodes {
14+
private static final Logger LOGGER = LoggerFactory.getLogger(StatusCodes.class);
15+
public static final Function<ClientResponse, Integer> STATUS_CODE_FUNCTION =
16+
getStatusCodeFunction();
17+
18+
private static Function<ClientResponse, Integer> getStatusCodeFunction() {
19+
Function<ClientResponse, Integer> statusCodeFunction = getStatusCodeFunction60();
20+
if (statusCodeFunction == null) {
21+
statusCodeFunction = getStatusCodeFunction51();
22+
}
23+
if (statusCodeFunction == null) {
24+
statusCodeFunction = getStatusCodeFunction50();
25+
}
26+
if (statusCodeFunction == null) {
27+
LOGGER.debug(
28+
"Unable to find a status code extractor function working for the current webflux client version. "
29+
+ "Status codes will not be tagged on webflux client spans");
30+
}
31+
return statusCodeFunction;
32+
}
33+
34+
// in webflux 6.0, HttpStatusCode class was introduced, and statusCode() was changed to return
35+
// HttpStatusCode instead of HttpStatus
36+
private static Function<ClientResponse, Integer> getStatusCodeFunction60() {
37+
MethodHandle statusCode;
38+
MethodHandle value;
39+
try {
40+
Class<?> httpStatusCodeClass =
41+
Class.forName(
42+
"org.springframework.http.HttpStatusCode", false, StatusCodes.class.getClassLoader());
43+
statusCode =
44+
MethodHandles.publicLookup()
45+
.findVirtual(
46+
ClientResponse.class, "statusCode", MethodType.methodType(httpStatusCodeClass));
47+
value =
48+
MethodHandles.publicLookup()
49+
.findVirtual(httpStatusCodeClass, "value", MethodType.methodType(int.class));
50+
} catch (ClassNotFoundException | IllegalAccessException | NoSuchMethodException e) {
51+
return null;
52+
}
53+
54+
return response -> {
55+
try {
56+
Object httpStatusCode = statusCode.invoke(response);
57+
return (int) value.invoke(httpStatusCode);
58+
} catch (Throwable e) {
59+
return null;
60+
}
61+
};
62+
}
63+
64+
// in webflux 5.1, rawStatusCode() was introduced to retrieve the exact status code
65+
// note: rawStatusCode() was deprecated in 6.0
66+
private static Function<ClientResponse, Integer> getStatusCodeFunction51() {
67+
MethodHandle rawStatusCode;
68+
try {
69+
rawStatusCode =
70+
MethodHandles.publicLookup()
71+
.findVirtual(ClientResponse.class, "rawStatusCode", MethodType.methodType(int.class));
72+
} catch (IllegalAccessException | NoSuchMethodException e) {
73+
return null;
74+
}
75+
76+
return response -> {
77+
try {
78+
return (int) rawStatusCode.invoke(response);
79+
} catch (Throwable e) {
80+
return null;
81+
}
82+
};
83+
}
84+
85+
// in webflux 5.0, statusCode() returns HttpStatus, which only represents standard status codes
86+
// (there's no way to capture arbitrary status codes)
87+
private static Function<ClientResponse, Integer> getStatusCodeFunction50() {
88+
MethodHandle statusCode;
89+
MethodHandle value;
90+
try {
91+
statusCode =
92+
MethodHandles.publicLookup()
93+
.findVirtual(
94+
ClientResponse.class, "statusCode", MethodType.methodType(HttpStatus.class));
95+
value =
96+
MethodHandles.publicLookup()
97+
.findVirtual(HttpStatus.class, "value", MethodType.methodType(int.class));
98+
} catch (IllegalAccessException | NoSuchMethodException e) {
99+
return null;
100+
}
101+
102+
return response -> {
103+
try {
104+
Object httpStatusCode = statusCode.invoke(response);
105+
return (int) value.invoke(httpStatusCode);
106+
} catch (Throwable e) {
107+
return null;
108+
}
109+
};
110+
}
111+
112+
private StatusCodes() {}
113+
}

dd-java-agent/instrumentation/spring-webflux-5/src/main/java/datadog/trace/instrumentation/springwebflux/client/WebClientFilterInstrumentation.java

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public WebClientFilterInstrumentation() {
2020
public String[] helperClassNames() {
2121
return new String[] {
2222
packageName + ".SpringWebfluxHttpClientDecorator",
23+
packageName + ".StatusCodes",
2324
packageName + ".TraceWebClientSubscriber",
2425
packageName + ".WebClientTracingFilter",
2526
packageName + ".WebClientTracingFilter$MonoWebClientTrace",

dd-java-agent/instrumentation/spring-webflux-5/src/main/java/datadog/trace/instrumentation/springwebflux/server/AbstractWebfluxInstrumentation.java

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public String[] helperClassNames() {
1313
return new String[] {
1414
packageName + ".SpringWebfluxHttpServerDecorator",
1515
packageName + ".AdviceUtils",
16+
packageName + ".AdviceUtils$SpanSubscriber",
1617
packageName + ".AdviceUtils$SpanFinishingSubscriber",
1718
packageName + ".RouteOnSuccessOrError"
1819
};

dd-java-agent/instrumentation/spring-webflux-5/src/main/java/datadog/trace/instrumentation/springwebflux/server/AdviceUtils.java

+51-21
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ public static <T> Mono<T> setPublisherSpan(Mono<T> mono, AgentSpan span) {
4343
return mono.<T>transform(finishSpanNextOrError(span));
4444
}
4545

46+
public static <T> Mono<T> wrapMonoWithScope(Mono<T> mono, AgentSpan span) {
47+
return mono.<T>transform(wrapPublisher(span));
48+
}
49+
4650
/**
4751
* Idea for this has been lifted from https://github.com/reactor/reactor-core/issues/947. Newer
4852
* versions of reactor-core have easier way to access context but we want to support older
@@ -54,23 +58,18 @@ public static <T> Mono<T> setPublisherSpan(Mono<T> mono, AgentSpan span) {
5458
(scannable, subscriber) -> new SpanFinishingSubscriber<>(subscriber, span));
5559
}
5660

57-
/**
58-
* This makes sure any callback is wrapped in suspend/resume checkpoints. Otherwise, we may end up
59-
* executing these callbacks in different threads without being resumed first.
60-
*/
61-
private static final class SpanFinishingSubscriber<T> implements CoreSubscriber<T>, Subscription {
61+
private static <T> Function<? super Publisher<T>, ? extends Publisher<T>> wrapPublisher(
62+
AgentSpan span) {
63+
return Operators.lift((scannable, subscriber) -> new SpanSubscriber<>(subscriber, span));
64+
}
6265

66+
private static class SpanSubscriber<T> implements CoreSubscriber<T>, Subscription {
6367
private final CoreSubscriber<? super T> subscriber;
64-
private final AgentSpan span;
68+
protected final AgentSpan span;
6569
private final Context context;
6670
private volatile Subscription subscription;
67-
private volatile int completed;
6871

69-
@SuppressWarnings("rawtypes")
70-
private static final AtomicIntegerFieldUpdater<SpanFinishingSubscriber> COMPLETED =
71-
AtomicIntegerFieldUpdater.newUpdater(SpanFinishingSubscriber.class, "completed");
72-
73-
public SpanFinishingSubscriber(CoreSubscriber<? super T> subscriber, AgentSpan span) {
72+
public SpanSubscriber(CoreSubscriber<? super T> subscriber, AgentSpan span) {
7473
this.subscriber = subscriber;
7574
this.span = span;
7675
this.context = subscriber.currentContext().put(AgentSpan.class, span);
@@ -93,19 +92,11 @@ public void onNext(T t) {
9392

9493
@Override
9594
public void onError(Throwable t) {
96-
if (null != span && COMPLETED.compareAndSet(this, 0, 1)) {
97-
span.setError(true);
98-
span.addThrowable(t);
99-
span.finish();
100-
}
10195
subscriber.onError(t);
10296
}
10397

10498
@Override
10599
public void onComplete() {
106-
if (null != span && COMPLETED.compareAndSet(this, 0, 1)) {
107-
span.finish();
108-
}
109100
subscriber.onComplete();
110101
}
111102

@@ -119,12 +110,51 @@ public void request(long n) {
119110
subscription.request(n);
120111
}
121112

113+
@Override
114+
public void cancel() {
115+
subscription.cancel();
116+
}
117+
}
118+
119+
/**
120+
* This makes sure any callback is wrapped in suspend/resume checkpoints. Otherwise, we may end up
121+
* executing these callbacks in different threads without being resumed first.
122+
*/
123+
private static final class SpanFinishingSubscriber<T> extends SpanSubscriber<T> {
124+
private volatile int completed;
125+
126+
@SuppressWarnings("rawtypes")
127+
private static final AtomicIntegerFieldUpdater<SpanFinishingSubscriber> COMPLETED =
128+
AtomicIntegerFieldUpdater.newUpdater(SpanFinishingSubscriber.class, "completed");
129+
130+
public SpanFinishingSubscriber(CoreSubscriber<? super T> subscriber, AgentSpan span) {
131+
super(subscriber, span);
132+
}
133+
134+
@Override
135+
public void onError(Throwable t) {
136+
if (null != span && COMPLETED.compareAndSet(this, 0, 1)) {
137+
span.setError(true);
138+
span.addThrowable(t);
139+
span.finish();
140+
}
141+
super.onError(t);
142+
}
143+
144+
@Override
145+
public void onComplete() {
146+
if (null != span && COMPLETED.compareAndSet(this, 0, 1)) {
147+
span.finish();
148+
}
149+
super.onComplete();
150+
}
151+
122152
@Override
123153
public void cancel() {
124154
if (null != span && COMPLETED.compareAndSet(this, 0, 1)) {
125155
span.finish();
126156
}
127-
subscription.cancel();
157+
super.cancel();
128158
}
129159
}
130160
}

dd-java-agent/instrumentation/spring-webflux-5/src/main/java/datadog/trace/instrumentation/springwebflux/server/DispatcherHandlerInstrumentation.java

+5
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,10 @@ public void adviceTransformations(AdviceTransformation transformation) {
2828
.and(takesArguments(1)),
2929
// Cannot reference class directly here because it would lead to class load failure on Java7
3030
packageName + ".DispatcherHandlerAdvice");
31+
transformation.applyAdvice(
32+
isMethod()
33+
.and(named("handleResult"))
34+
.and(takesArgument(0, named("org.springframework.web.server.ServerWebExchange"))),
35+
packageName + ".HandleResultAdvice");
3136
}
3237
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package datadog.trace.instrumentation.springwebflux.server;
2+
3+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
4+
import net.bytebuddy.asm.Advice;
5+
import org.springframework.web.server.ServerWebExchange;
6+
import reactor.core.publisher.Mono;
7+
8+
public class HandleResultAdvice {
9+
@Advice.OnMethodExit(suppress = Throwable.class)
10+
public static void methodExit(
11+
@Advice.Argument(0) ServerWebExchange exchange,
12+
@Advice.Return(readOnly = false) Mono<Void> mono) {
13+
final AgentSpan span = exchange.getAttribute(AdviceUtils.SPAN_ATTRIBUTE);
14+
if (span != null && mono != null) {
15+
mono = AdviceUtils.wrapMonoWithScope(mono, span);
16+
}
17+
}
18+
}

0 commit comments

Comments
 (0)