Skip to content

Commit 89c026a

Browse files
committed
QPIDJMS-552 Shared reference counted FJ pool
1 parent 66868fe commit 89c026a

7 files changed

+329
-41
lines changed

qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java

+12-13
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,14 @@
2222
import java.util.Set;
2323
import java.util.concurrent.ConcurrentHashMap;
2424
import java.util.concurrent.CopyOnWriteArraySet;
25-
import java.util.concurrent.ExecutionException;
26-
import java.util.concurrent.Executor;
2725
import java.util.concurrent.ExecutorService;
28-
import java.util.concurrent.Future;
29-
import java.util.concurrent.LinkedBlockingQueue;
3026
import java.util.concurrent.LinkedTransferQueue;
31-
import java.util.concurrent.SynchronousQueue;
3227
import java.util.concurrent.ThreadPoolExecutor;
3328
import java.util.concurrent.TimeUnit;
3429
import java.util.concurrent.atomic.AtomicBoolean;
3530
import java.util.concurrent.atomic.AtomicLong;
3631
import java.util.concurrent.atomic.AtomicReference;
32+
import java.util.function.Supplier;
3733

3834
import javax.jms.Connection;
3935
import javax.jms.ConnectionConsumer;
@@ -89,6 +85,7 @@
8985
import org.apache.qpid.jms.provider.ProviderSynchronization;
9086
import org.apache.qpid.jms.tracing.JmsTracer;
9187
import org.apache.qpid.jms.util.FifoMessageQueue;
88+
import org.apache.qpid.jms.util.HolderSuppliers;
9289
import org.apache.qpid.jms.util.MessageQueue;
9390
import org.apache.qpid.jms.util.PriorityMessageQueue;
9491
import org.apache.qpid.jms.util.QpidJMSThreadFactory;
@@ -125,6 +122,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
125122
private final AtomicLong transactionIdGenerator = new AtomicLong();
126123
private final AtomicLong connectionConsumerIdGenerator = new AtomicLong();
127124
private final Map<AsyncResult, AsyncResult> requests = new ConcurrentHashMap<>();
125+
private final HolderSuppliers.Holder<ExecutorService> completionExecutorService;
128126

129127
protected JmsConnection(final JmsConnectionInfo connectionInfo, Provider provider) throws JMSException {
130128

@@ -157,6 +155,7 @@ public void run() {
157155

158156
this.connectionInfo = connectionInfo;
159157
this.connectionInfo.setConnection(this);
158+
this.completionExecutorService = this.connectionInfo.getCompletionExecutorServiceFactory().map(Supplier::get).orElse(null);
160159
}
161160

162161
JmsConnection connect() throws JMSException {
@@ -217,6 +216,10 @@ public void close() throws JMSException {
217216
session.shutdown();
218217
}
219218

219+
if (completionExecutorService != null) {
220+
completionExecutorService.close();
221+
}
222+
220223
for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
221224
connectionConsumer.shutdown();
222225
}
@@ -272,6 +275,10 @@ public void close() throws JMSException {
272275
}
273276
}
274277

278+
HolderSuppliers.Holder<ExecutorService> getCompletionExecutorService() {
279+
return completionExecutorService;
280+
}
281+
275282
/**
276283
* Called to free all Connection resources.
277284
*/
@@ -1146,14 +1153,6 @@ void setMessageFactory(JmsMessageFactory factory) {
11461153
messageFactory = factory;
11471154
}
11481155

1149-
public void setUseConnectionCompletionHandler(boolean value) {
1150-
connectionInfo.setUseConnectionCompletionHandler(value);
1151-
}
1152-
1153-
public boolean isUseConnectionCompletionHandler() {
1154-
return connectionInfo.isUseConnectionCompletionHandler();
1155-
}
1156-
11571156
public boolean isForceAsyncAcks() {
11581157
return connectionInfo.isForceAsyncAcks();
11591158
}

qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java

+28-7
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222
import java.security.PrivilegedAction;
2323
import java.util.EnumMap;
2424
import java.util.Map;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.ForkJoinPool;
2527
import java.util.function.BiFunction;
28+
import java.util.function.Supplier;
2629

2730
import javax.jms.Connection;
2831
import javax.jms.ConnectionFactory;
@@ -55,13 +58,18 @@
5558
import org.apache.qpid.jms.tracing.JmsNoOpTracer;
5659
import org.apache.qpid.jms.tracing.JmsTracer;
5760
import org.apache.qpid.jms.tracing.JmsTracerFactory;
61+
import org.apache.qpid.jms.util.HolderSuppliers.Holder;
5862
import org.apache.qpid.jms.util.IdGenerator;
5963
import org.apache.qpid.jms.util.PropertyUtil;
64+
import org.apache.qpid.jms.util.QpidJMSForkJoinWorkerThreadFactory;
65+
import org.apache.qpid.jms.util.ThreadPoolUtils;
6066
import org.apache.qpid.jms.util.URISupport;
6167
import org.apache.qpid.jms.util.URISupport.CompositeData;
6268
import org.slf4j.Logger;
6369
import org.slf4j.LoggerFactory;
6470

71+
import static org.apache.qpid.jms.util.HolderSuppliers.sharedRefCnt;
72+
6573
/**
6674
* JMS ConnectionFactory Implementation.
6775
*/
@@ -102,15 +110,15 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
102110
private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
103111
private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
104112
private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT;
105-
private boolean useConnectionCompletionHandler = JmsConnectionInfo.DEFAULT_USE_CONNECTION_COMPLETION_HANDLER;
113+
private int completionThreads = JmsConnectionInfo.DEFAULT_COMPLETION_THREADS;
106114
private IdGenerator clientIdGenerator;
107115
private String clientIDPrefix;
108116
private IdGenerator connectionIdGenerator;
109117
private String connectionIDPrefix;
110118
private ExceptionListener exceptionListener;
111119
private String tracing;
112120
private JmsTracer tracer;
113-
121+
private Supplier<Holder<ExecutorService>> completionExecutorServiceFactory;
114122
private JmsPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy();
115123
private JmsRedeliveryPolicy redeliveryPolicy = new JmsDefaultRedeliveryPolicy();
116124
private JmsPresettlePolicy presettlePolicy = new JmsDefaultPresettlePolicy();
@@ -283,7 +291,7 @@ protected JmsConnectionInfo configureConnectionInfo(String username, String pass
283291
implicitTracer = JmsTracerFactory.create(remoteURI, tracing);
284292
connectionInfo.setTracer(implicitTracer);
285293
}
286-
294+
connectionInfo.setCompletionExecutorServiceFactory(getCompletionExecutorServiceFactory());
287295
// Set properties to make additional configuration changes
288296
PropertyUtil.setProperties(connectionInfo, properties);
289297

@@ -369,6 +377,19 @@ protected static URI createURI(String name) {
369377
return null;
370378
}
371379

380+
protected Supplier<Holder<ExecutorService>> getCompletionExecutorServiceFactory() {
381+
if (this.completionThreads == 0) {
382+
return null;
383+
}
384+
synchronized (this) {
385+
if (completionExecutorServiceFactory == null) {
386+
QpidJMSForkJoinWorkerThreadFactory fjThreadFactory = new QpidJMSForkJoinWorkerThreadFactory("completion thread pool", true);
387+
completionExecutorServiceFactory = sharedRefCnt(() -> new ForkJoinPool(completionThreads, fjThreadFactory, null, false), ThreadPoolUtils::shutdown);
388+
}
389+
return completionExecutorServiceFactory;
390+
}
391+
}
392+
372393
protected synchronized IdGenerator getConnectionIdGenerator() {
373394
if (connectionIdGenerator == null) {
374395
if (connectionIDPrefix != null) {
@@ -592,12 +613,12 @@ public long getConnectTimeout() {
592613
return this.connectTimeout;
593614
}
594615

595-
public void setUseConnectionCompletionHandler(final boolean useConnectionCompletionHandler) {
596-
this.useConnectionCompletionHandler = useConnectionCompletionHandler;
616+
public void setCompletionThreads(final int completionThreads) {
617+
this.completionThreads = completionThreads;
597618
}
598619

599-
public boolean isUseConnectionCompletionHandler() {
600-
return useConnectionCompletionHandler;
620+
public int getCompletionThreads() {
621+
return completionThreads;
601622
}
602623

603624
/**

qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Map;
2727
import java.util.concurrent.ConcurrentHashMap;
2828
import java.util.concurrent.ConcurrentLinkedDeque;
29+
import java.util.concurrent.ConcurrentLinkedQueue;
2930
import java.util.concurrent.CountDownLatch;
3031
import java.util.concurrent.ExecutionException;
3132
import java.util.concurrent.Executor;
@@ -72,7 +73,6 @@
7273
import javax.jms.TopicSession;
7374
import javax.jms.TopicSubscriber;
7475

75-
import io.netty.util.internal.PlatformDependent;
7676
import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
7777
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
7878
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
@@ -127,7 +127,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
127127
private final JmsSessionInfo sessionInfo;
128128
private final ReentrantLock sendLock = new ReentrantLock();
129129
private volatile ThreadPoolExecutor deliveryExecutor;
130-
private volatile ThreadPoolExecutor completionExcecutor;
130+
private volatile ExecutorService completionExecutor;
131131
private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
132132
private boolean deliveryThreadCheckEnabled = true;
133133
private volatile Thread completionThread = null;
@@ -139,7 +139,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
139139
private boolean sessionRecovered;
140140
private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
141141
private final Deque<SendCompletion> asyncSendQueue = new ConcurrentLinkedDeque<SendCompletion>();
142-
private final java.util.Queue<Runnable> completionTasks = PlatformDependent.newMpscQueue();
142+
private final ConcurrentLinkedQueue<Runnable> completionTasks = new ConcurrentLinkedQueue<>();
143143

144144
protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
145145
this.connection = connection;
@@ -222,7 +222,7 @@ private void processCompletions() {
222222
if (!processCompletion.compareAndSet(false, true)) {
223223
return;
224224
}
225-
if (!connection.isUseConnectionCompletionHandler()) {
225+
if (connection.getCompletionExecutorService() == null) {
226226
// an exclusive per-session executor doesn't need to guarantee fair completion processing
227227
// and can just keep on processing completion on this same thread
228228
continue;
@@ -428,11 +428,11 @@ protected boolean shutdown(Throwable cause) throws JMSException {
428428
cause = new JMSException("Session closed remotely before message transfer result was notified");
429429
}
430430
asyncProcessCompletion(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)), true);
431-
if (!connection.isUseConnectionCompletionHandler()) {
431+
if (connection.getCompletionExecutorService() == null) {
432432
getCompletionExecutor().shutdown();
433433
}
434434
}
435-
if (connection.isUseConnectionCompletionHandler()) {
435+
if (connection.getCompletionExecutorService() != null) {
436436
final CountDownLatch completed = new CountDownLatch(1);
437437
try {
438438
asyncProcessCompletion(completed::countDown, true);
@@ -1260,13 +1260,13 @@ Executor getDispatcherExecutor() {
12601260
}
12611261

12621262
private ExecutorService getCompletionExecutor() {
1263-
ThreadPoolExecutor exec = completionExcecutor;
1263+
ExecutorService exec = completionExecutor;
12641264
if (exec == null) {
12651265
synchronized (sessionInfo) {
1266-
exec = completionExcecutor;
1266+
exec = completionExecutor;
12671267
if (exec == null) {
1268-
if (connection.isUseConnectionCompletionHandler()) {
1269-
exec = connection.executor;
1268+
if (connection.getCompletionExecutorService() != null) {
1269+
exec = connection.getCompletionExecutorService().ref();
12701270
} else {
12711271
exec = createExecutor("completion dispatcher", null);
12721272
}
@@ -1280,7 +1280,7 @@ private ExecutorService getCompletionExecutor() {
12801280
LOG.trace("Completion Executor starter task failed: {}", e.getMessage());
12811281
}
12821282

1283-
completionExcecutor = exec;
1283+
completionExecutor = exec;
12841284
}
12851285
}
12861286
}

qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java

+18-10
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
import java.nio.charset.Charset;
2121
import java.util.EnumMap;
2222
import java.util.Objects;
23+
import java.util.Optional;
24+
import java.util.concurrent.ExecutorService;
2325
import java.util.function.BiFunction;
26+
import java.util.function.Supplier;
2427

2528
import javax.jms.Connection;
2629

@@ -38,6 +41,8 @@
3841
import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
3942
import org.apache.qpid.jms.tracing.JmsNoOpTracer;
4043
import org.apache.qpid.jms.tracing.JmsTracer;
44+
import org.apache.qpid.jms.util.HolderSuppliers;
45+
import org.apache.qpid.jms.util.HolderSuppliers.Holder;
4146

4247
/**
4348
* Meta object that contains the JmsConnection identification and configuration
@@ -50,7 +55,7 @@ public final class JmsConnectionInfo extends JmsAbstractResource implements Comp
5055
public static final long DEFAULT_CLOSE_TIMEOUT = 60000;
5156
public static final long DEFAULT_SEND_TIMEOUT = INFINITE;
5257
public static final long DEFAULT_REQUEST_TIMEOUT = INFINITE;
53-
public static final boolean DEFAULT_USE_CONNECTION_COMPLETION_HANDLER = false;
58+
public static final int DEFAULT_COMPLETION_THREADS = 0;
5459

5560
private final JmsConnectionId connectionId;
5661
private final EnumMap<JmsConnectionExtensions, BiFunction<Connection, URI, Object>> extensionMap = new EnumMap<>(JmsConnectionExtensions.class);
@@ -79,7 +84,6 @@ public final class JmsConnectionInfo extends JmsAbstractResource implements Comp
7984
private long requestTimeout = DEFAULT_REQUEST_TIMEOUT;
8085
private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
8186
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
82-
private boolean useConnectionCompletionHandler;
8387
private String queuePrefix = null;
8488
private String topicPrefix = null;
8589

@@ -91,6 +95,7 @@ public final class JmsConnectionInfo extends JmsAbstractResource implements Comp
9195

9296
private volatile byte[] encodedUserId;
9397
private JmsTracer tracer = JmsNoOpTracer.INSTANCE;
98+
private Optional<Supplier<Holder<ExecutorService>>> completionExecutorServiceFactory;
9499

95100
public JmsConnectionInfo(JmsConnectionId connectionId) {
96101
if (connectionId == null) {
@@ -261,14 +266,6 @@ public void setRequestTimeout(long requestTimeout) {
261266
this.requestTimeout = requestTimeout;
262267
}
263268

264-
public void setUseConnectionCompletionHandler(boolean value) {
265-
this.useConnectionCompletionHandler = value;
266-
}
267-
268-
public boolean isUseConnectionCompletionHandler() {
269-
return useConnectionCompletionHandler;
270-
}
271-
272269
public boolean isLocalMessagePriority() {
273270
return localMessagePriority;
274271
}
@@ -462,4 +459,15 @@ public void setTracer(JmsTracer tracer) {
462459
public JmsTracer getTracer() {
463460
return tracer;
464461
}
462+
463+
public void setCompletionExecutorServiceFactory(final Supplier<Holder<ExecutorService>> completionExecutorServiceFactory) {
464+
this.completionExecutorServiceFactory = Optional.ofNullable(completionExecutorServiceFactory);
465+
}
466+
467+
public Optional<Supplier<Holder<ExecutorService>>> getCompletionExecutorServiceFactory() {
468+
if (completionExecutorServiceFactory == null) {
469+
return Optional.empty();
470+
}
471+
return completionExecutorServiceFactory;
472+
}
465473
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.qpid.jms.util;
18+
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
import java.util.function.Consumer;
21+
import java.util.function.Supplier;
22+
23+
public class HolderSuppliers {
24+
25+
public interface Holder<T> extends AutoCloseable {
26+
27+
T ref();
28+
29+
@Override
30+
void close();
31+
}
32+
33+
public static <T> Supplier<Holder<T>> sharedRefCnt(final Supplier<? extends T> create,
34+
final Consumer<? super T> onDispose) {
35+
return new SharedDisposable<>(create, onDispose);
36+
}
37+
38+
public static <T> Supplier<Holder<T>> exclusiveDisposable(final Supplier<? extends T> create,
39+
final Consumer<? super T> onDispose) {
40+
return () -> new Holder<T>() {
41+
42+
private final T ref = create.get();
43+
private final AtomicBoolean closed = new AtomicBoolean(false);
44+
45+
@Override
46+
public T ref() {
47+
if (closed.get()) {
48+
throw new IllegalStateException("closed lease");
49+
}
50+
return ref;
51+
}
52+
53+
@Override
54+
public void close() {
55+
if (closed.compareAndSet(false, true)) {
56+
onDispose.accept(ref);
57+
}
58+
}
59+
};
60+
}
61+
}

0 commit comments

Comments
 (0)