Skip to content

Commit

Permalink
[pinpoint-apm#11980] Add async nested of reactor plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim committed Jan 20, 2025
1 parent f5efe2a commit 20739d8
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,5 @@ public interface Trace extends StackOperation {

TraceScope addScope(String name);

TraceScope addBoundaryScope(String name);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright 2017 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.bootstrap.interceptor;

import com.navercorp.pinpoint.bootstrap.context.AsyncContext;
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.context.scope.TraceScope;
import com.navercorp.pinpoint.bootstrap.util.ScopeUtils;

import java.util.Objects;

public abstract class AsyncContextSpanEventNestedApiIdAwareAroundInterceptor extends AbstractAsyncContextSpanEventInterceptor implements ApiIdAwareAroundInterceptor {

private final String traceScopeName;

public AsyncContextSpanEventNestedApiIdAwareAroundInterceptor(TraceContext traceContext, String traceScopeName) {
super(traceContext);
this.traceScopeName = Objects.requireNonNull(traceScopeName, "traceScopeName");
}

@Override
public void before(Object target, int apiId, Object[] args) {
if (isDebug) {
logger.beforeInterceptor(target, args);
}

final AsyncContext asyncContext = getAsyncContext(target, args);
if (asyncContext == null) {
return;
}

final Trace trace = getAsyncTrace(asyncContext);
if (trace == null) {
return;
}

// try entry API.
if (Boolean.FALSE == tryEnter(trace)) {
// skip nested API.
if (isDebug) {
logger.debug("Skip nested async API.before(), traceScopeName={}", traceScopeName);
}
return;
}

// entry scope.
ScopeUtils.entryAsyncTraceScope(trace);

try {
// trace event for default & async.
final SpanEventRecorder recorder = trace.traceBlockBegin();
beforeTrace(asyncContext, trace, recorder, target, apiId, args);
doInBeforeTrace(recorder, asyncContext, target, apiId, args);
} catch (Throwable th) {
if (logger.isWarnEnabled()) {
logger.warn("BEFORE. Caused:{}", th.getMessage(), th);
}
}
}

protected void beforeTrace(final AsyncContext asyncContext, final Trace trace, final SpanEventRecorder recorder, final Object target, final int apiId, final Object[] args) {
}

protected abstract void doInBeforeTrace(SpanEventRecorder recorder, AsyncContext asyncContext, Object target, int apiId, Object[] args);

@Override
public void after(Object target, int apiId, Object[] args, Object result, Throwable throwable) {
if (isDebug) {
logger.afterInterceptor(target, args, result, throwable);
}

final AsyncContext asyncContext = getAsyncContext(target, args, result, throwable);
if (asyncContext == null) {
if (isTrace) {
logger.trace("AsyncContext not found");
}
return;
}

final Trace trace = asyncContext.currentAsyncTraceObject();
if (trace == null) {
return;
}

if (Boolean.FALSE == canLeave(trace)) {
// skip nested API.
if (isDebug) {
logger.debug("Skip nested async API.after(), traceScopeName={}", traceScopeName);
}
return;
}

// leave scope.
if (!ScopeUtils.leaveAsyncTraceScope(trace)) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to leave scope of async trace {}.", trace);
}
// delete unstable trace.
deleteAsyncContext(trace, asyncContext);
return;
}

try {
final SpanEventRecorder recorder = trace.currentSpanEventRecorder();
afterTrace(asyncContext, trace, recorder, target, apiId, args, result, throwable);
doInAfterTrace(recorder, target, apiId, args, result, throwable);
} catch (Throwable th) {
if (logger.isWarnEnabled()) {
logger.warn("AFTER error. Caused:{}", th.getMessage(), th);
}
} finally {
trace.traceBlockEnd();
if (ScopeUtils.isAsyncTraceEndScope(trace)) {
deleteAsyncContext(trace, asyncContext);
}
}
}

protected void afterTrace(final AsyncContext asyncContext, final Trace trace, final SpanEventRecorder recorder, final Object target, int apiId, final Object[] args, final Object result, final Throwable throwable) {
}

protected abstract void doInAfterTrace(SpanEventRecorder recorder, Object target, int apiId, Object[] args, Object result, Throwable throwable);

public boolean tryEnter(final Trace trace) {
TraceScope scope = trace.getScope(traceScopeName);
if (scope == null) {
trace.addBoundaryScope(traceScopeName);
scope = trace.getScope(traceScopeName);
}
if (scope != null) {
return scope.tryEnter();
}
return false;
}

public boolean canLeave(final Trace trace) {
TraceScope scope = trace.getScope(traceScopeName);
if (scope != null) {
if (scope.canLeave()) {
scope.leave();
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import com.navercorp.pinpoint.bootstrap.context.SpanEventRecorder;
import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.interceptor.AsyncContextSpanEventApiIdAwareAroundInterceptor;
import com.navercorp.pinpoint.bootstrap.interceptor.AsyncContextSpanEventNestedApiIdAwareAroundInterceptor;
import com.navercorp.pinpoint.common.trace.ServiceType;

public class CoreSubscriberOnNextInterceptor extends AsyncContextSpanEventApiIdAwareAroundInterceptor {
public class CoreSubscriberOnNextInterceptor extends AsyncContextSpanEventNestedApiIdAwareAroundInterceptor {
public static final String REACTOR_ON_NEXT_TRACE_SCOPE = "##REACTOR_ON_NEXT_TRACE_SCOPE";
private final ServiceType serviceType;

public CoreSubscriberOnNextInterceptor(TraceContext traceContext, ServiceType serviceType) {
super(traceContext);
super(traceContext, REACTOR_ON_NEXT_TRACE_SCOPE);
this.serviceType = serviceType;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin
}
final InstrumentMethod onNextMethod = target.getDeclaredMethod("onNext", "java.lang.Object");
if (onNextMethod != null) {
onNextMethod.addScopedInterceptor(CoreSubscriberOnNextInterceptor.class, va(ReactorConstants.REACTOR), "CoreSubscriberOnNext");
onNextMethod.addInterceptor(CoreSubscriberOnNextInterceptor.class, va(ReactorConstants.REACTOR));
}

return target.toBytecode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,14 @@ public TraceScope addScope(String name) {
return scopePool.add(name);
}

@Override
public TraceScope addBoundaryScope(String name) {
if (scopePool == null) {
this.scopePool = new DefaultTraceScopePool();
}
return scopePool.addBoundary(name);
}

@Override
public String toString() {
return "ChildTrace{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,13 @@ public TraceScope addScope(String name) {
return scopePool.add(name);
}

@Override
public TraceScope addBoundaryScope(String name) {
if (scopePool == null) {
this.scopePool = new DefaultTraceScopePool();
}
return scopePool.addBoundary(name);
}

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ public TraceScope addScope(String name) {
return scopePool.add(name);
}

@Override
public TraceScope addBoundaryScope(String name) {
if (scopePool == null) {
this.scopePool = new DefaultTraceScopePool();
}
return scopePool.addBoundary(name);
}

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,11 @@ public TraceScope addScope(String name) {
return scopePool.add(name);
}

@Override
public TraceScope addBoundaryScope(String name) {
if (scopePool == null) {
this.scopePool = new DefaultTraceScopePool();
}
return scopePool.addBoundary(name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2015 NAVER Corp.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.profiler.context.scope;

import com.navercorp.pinpoint.bootstrap.context.scope.TraceScope;

import java.util.Objects;

/**
* @author jaehong.kim
*/
public class BoundaryTraceScope implements TraceScope {
private final String name;
private int depth = 0;
private int skippedBoundary = 0;

public BoundaryTraceScope(String name) {
this.name = Objects.requireNonNull(name, "name");
}

@Override
public String getName() {
return name;
}

public boolean tryEnter() {
if (isActive()) {
skippedBoundary++;
return false;
} else {
depth++;
return true;
}
}

public boolean canLeave() {
if (skippedBoundary == 0 && depth == 1) {
return true;
} else {
skippedBoundary--;
return false;
}
}

public void leave() {
if (!isActive()) {
throw new IllegalStateException("cannot leave with trace scope. depth: " + depth);
}

if (skippedBoundary != 0 || depth != 1) {
throw new IllegalStateException("cannot leave with BOUNDARY trace scope. depth: " + depth);
}
depth--;
}

@Override
public boolean isActive() {
return depth > 0;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DefaultTraceScope{");
sb.append("name='").append(name).append('\'');
sb.append(", depth=").append(depth);
sb.append('}');
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public TraceScope add(String name) {
return map.put(name, new DefaultTraceScope(name));
}

public TraceScope addBoundary(String name) {
Objects.requireNonNull(name, "name");

return map.put(name, new BoundaryTraceScope(name));
}

public void clear() {
map.clear();
}
Expand Down

0 comments on commit 20739d8

Please sign in to comment.