Skip to content

Commit eb53fe3

Browse files
authored
Add helper class to capture context using ScheduledExecutorService (#6712)
Signed-off-by: Adriano Machado <[email protected]>
1 parent 0f859b4 commit eb53fe3

File tree

6 files changed

+383
-49
lines changed

6 files changed

+383
-49
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Gradle
22
build
33
.gradle
4+
.kotlin
45
local.properties
56
out/
67

context/src/main/java/io/opentelemetry/context/Context.java

+29
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.Executor;
2828
import java.util.concurrent.ExecutorService;
2929
import java.util.concurrent.ScheduledExecutorService;
30+
import java.util.concurrent.TimeUnit;
3031
import java.util.function.BiConsumer;
3132
import java.util.function.BiFunction;
3233
import java.util.function.Consumer;
@@ -135,9 +136,37 @@ static Executor taskWrapping(Executor executor) {
135136
* @since 1.1.0
136137
*/
137138
static ExecutorService taskWrapping(ExecutorService executorService) {
139+
if (executorService instanceof CurrentContextExecutorService) {
140+
return executorService;
141+
}
138142
return new CurrentContextExecutorService(executorService);
139143
}
140144

145+
/**
146+
* Returns an {@link ScheduledExecutorService} which delegates to the provided {@code
147+
* executorService}, wrapping all invocations of {@link ExecutorService} methods such as {@link
148+
* ExecutorService#execute(Runnable)} or {@link ExecutorService#submit(Runnable)} with the
149+
* {@linkplain Context#current() current context} at the time of invocation.
150+
*
151+
* <p>This is generally used to create an {@link ScheduledExecutorService} which will forward the
152+
* {@link Context} during an invocation to another thread. For example, you may use something like
153+
* {@code ScheduledExecutorService dbExecutor = Context.wrapTasks(threadPool)} to ensure calls
154+
* like {@code dbExecutor.execute(() -> database.query())} have {@link Context} available on the
155+
* thread executing database queries.
156+
*
157+
* <p>Note: The context will not be propagated for {@link
158+
* ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} and {@link
159+
* ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)} calls.
160+
*
161+
* @since 1.43.0
162+
*/
163+
static ScheduledExecutorService taskWrapping(ScheduledExecutorService executorService) {
164+
if (executorService instanceof CurrentContextScheduledExecutorService) {
165+
return executorService;
166+
}
167+
return new CurrentContextScheduledExecutorService(executorService);
168+
}
169+
141170
/**
142171
* Returns the value stored in this {@link Context} for the given {@link ContextKey}, or {@code
143172
* null} if there is no value for the key in this context.

context/src/main/java/io/opentelemetry/context/CurrentContextExecutorService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import java.util.concurrent.TimeUnit;
1515
import java.util.concurrent.TimeoutException;
1616

17-
final class CurrentContextExecutorService extends ForwardingExecutorService {
17+
class CurrentContextExecutorService extends ForwardingExecutorService {
1818

1919
CurrentContextExecutorService(ExecutorService delegate) {
2020
super(delegate);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.context;
7+
8+
import java.util.concurrent.Callable;
9+
import java.util.concurrent.ScheduledExecutorService;
10+
import java.util.concurrent.ScheduledFuture;
11+
import java.util.concurrent.TimeUnit;
12+
13+
final class CurrentContextScheduledExecutorService extends CurrentContextExecutorService
14+
implements ScheduledExecutorService {
15+
16+
private final ScheduledExecutorService delegate;
17+
18+
CurrentContextScheduledExecutorService(ScheduledExecutorService delegate) {
19+
super(delegate);
20+
this.delegate = delegate;
21+
}
22+
23+
@Override
24+
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
25+
return delegate.schedule(Context.current().wrap(command), delay, unit);
26+
}
27+
28+
@Override
29+
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
30+
return delegate.schedule(Context.current().wrap(callable), delay, unit);
31+
}
32+
33+
@Override
34+
public ScheduledFuture<?> scheduleAtFixedRate(
35+
Runnable command, long initialDelay, long period, TimeUnit unit) {
36+
return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
37+
}
38+
39+
@Override
40+
public ScheduledFuture<?> scheduleWithFixedDelay(
41+
Runnable command, long initialDelay, long delay, TimeUnit unit) {
42+
return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
43+
}
44+
}

0 commit comments

Comments
 (0)