Skip to content

Commit 319e3d8

Browse files
committed
HTTPCLIENT-2398 - cap async execution queue to break recursive re-entry. Add configurable maxQueuedRequests (default unlimited). Release slot on fail/cancel/close to avoid leaks
1 parent 79992d8 commit 319e3d8

10 files changed

+1335
-21
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.http.impl.async;
28+
29+
import java.lang.reflect.InvocationHandler;
30+
import java.lang.reflect.InvocationTargetException;
31+
import java.lang.reflect.Method;
32+
import java.lang.reflect.Proxy;
33+
import java.util.concurrent.atomic.AtomicBoolean;
34+
35+
import org.apache.hc.core5.annotation.Internal;
36+
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
37+
38+
@Internal
39+
final class AsyncClientExchangeHandlerProxy implements InvocationHandler {
40+
41+
private final AsyncClientExchangeHandler handler;
42+
private final Runnable onRelease;
43+
private final AtomicBoolean released;
44+
45+
private AsyncClientExchangeHandlerProxy(
46+
final AsyncClientExchangeHandler handler,
47+
final Runnable onRelease) {
48+
this.handler = handler;
49+
this.onRelease = onRelease;
50+
this.released = new AtomicBoolean(false);
51+
}
52+
53+
static AsyncClientExchangeHandler newProxy(
54+
final AsyncClientExchangeHandler handler,
55+
final Runnable onRelease) {
56+
return (AsyncClientExchangeHandler) Proxy.newProxyInstance(
57+
AsyncClientExchangeHandler.class.getClassLoader(),
58+
new Class<?>[]{AsyncClientExchangeHandler.class},
59+
new AsyncClientExchangeHandlerProxy(handler, onRelease));
60+
}
61+
62+
@Override
63+
public Object invoke(
64+
final Object proxy,
65+
final Method method,
66+
final Object[] args) throws Throwable {
67+
if ("releaseResources".equals(method.getName())
68+
&& method.getParameterCount() == 0) {
69+
try {
70+
return method.invoke(handler, args);
71+
} catch (final InvocationTargetException ex) {
72+
throw ex.getCause();
73+
} finally {
74+
if (released.compareAndSet(false, true)) {
75+
onRelease.run();
76+
}
77+
}
78+
}
79+
try {
80+
return method.invoke(handler, args);
81+
} catch (final InvocationTargetException ex) {
82+
throw ex.getCause();
83+
}
84+
}
85+
86+
}

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,8 @@ private ExecInterceptorEntry(
218218

219219
private boolean priorityHeaderDisabled;
220220

221+
private int maxQueuedRequests = -1;
222+
221223
public static H2AsyncClientBuilder create() {
222224
return new H2AsyncClientBuilder();
223225
}
@@ -324,6 +326,21 @@ public final H2AsyncClientBuilder disableRequestPriority() {
324326
return this;
325327
}
326328

329+
/**
330+
* Sets a hard cap on the number of requests allowed to be queued/in-flight
331+
* within the internal async execution pipeline. When the limit is reached,
332+
* new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}.
333+
* A value {@code <= 0} means unlimited (default).
334+
*
335+
* @param max maximum number of queued requests; {@code <= 0} to disable the cap
336+
* @return this builder
337+
* @since 5.6
338+
*/
339+
public final H2AsyncClientBuilder setMaxQueuedRequests(final int max) {
340+
this.maxQueuedRequests = max;
341+
return this;
342+
}
343+
327344
/**
328345
* Adds this protocol interceptor to the head of the protocol processing list.
329346
*
@@ -976,7 +993,9 @@ public CloseableHttpAsyncClient build() {
976993
cookieStoreCopy,
977994
credentialsProviderCopy,
978995
defaultRequestConfig,
979-
closeablesCopy);
996+
closeablesCopy,
997+
maxQueuedRequests);
998+
980999
}
9811000

9821001
static class IdleConnectionEvictor implements Closeable {

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,8 @@ private ExecInterceptorEntry(
267267

268268
private ProxySelector proxySelector;
269269

270+
private int maxQueuedRequests = -1;
271+
270272
private EarlyHintsListener earlyHintsListener;
271273

272274
private boolean priorityHeaderDisabled;
@@ -899,6 +901,22 @@ public HttpAsyncClientBuilder disableContentCompression() {
899901
return this;
900902
}
901903

904+
/**
905+
* Sets a hard cap on the number of requests allowed to be queued/in-flight
906+
* within the internal async execution pipeline. When the limit is reached,
907+
* new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}.
908+
* A value <= 0 means unlimited (default).
909+
*
910+
* @param max maximum number of queued requests; <= 0 to disable the cap
911+
* @return this builder
912+
* @since 5.6
913+
*/
914+
public HttpAsyncClientBuilder setMaxQueuedRequests(final int max) {
915+
this.maxQueuedRequests = max;
916+
return this;
917+
}
918+
919+
902920
/**
903921
* Disable installing the HTTP/2 Priority header interceptor by default.
904922
* @since 5.6
@@ -1260,7 +1278,8 @@ public CloseableHttpAsyncClient build() {
12601278
credentialsProviderCopy,
12611279
contextAdaptor(),
12621280
defaultRequestConfig,
1263-
closeablesCopy);
1281+
closeablesCopy,
1282+
maxQueuedRequests);
12641283
}
12651284

12661285
}

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.Closeable;
3030
import java.util.List;
3131
import java.util.concurrent.ThreadFactory;
32+
import java.util.concurrent.atomic.AtomicInteger;
3233

3334
import org.apache.hc.client5.http.HttpRoute;
3435
import org.apache.hc.client5.http.async.AsyncExecRuntime;
@@ -69,6 +70,8 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient
6970
private static final Logger LOG = LoggerFactory.getLogger(InternalH2AsyncClient.class);
7071
private final HttpRoutePlanner routePlanner;
7172
private final InternalH2ConnPool connPool;
73+
private final int maxQueuedRequests;
74+
private final AtomicInteger queuedRequests;
7275

7376
InternalH2AsyncClient(
7477
final DefaultConnectingIOReactor ioReactor,
@@ -82,21 +85,27 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient
8285
final CookieStore cookieStore,
8386
final CredentialsProvider credentialsProvider,
8487
final RequestConfig defaultConfig,
85-
final List<Closeable> closeables) {
88+
final List<Closeable> closeables,
89+
final int maxQueuedRequests) {
8690
super(ioReactor, pushConsumerRegistry, threadFactory, execChain,
8791
cookieSpecRegistry, authSchemeRegistry, cookieStore, credentialsProvider, HttpClientContext::castOrCreate,
8892
defaultConfig, closeables);
8993
this.connPool = connPool;
9094
this.routePlanner = routePlanner;
95+
this.maxQueuedRequests = maxQueuedRequests;
96+
this.queuedRequests = maxQueuedRequests > 0 ? new AtomicInteger(0) : null;
9197
}
9298

9399
@Override
94100
AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
95-
return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory);
101+
return new InternalH2AsyncExecRuntime(LOG, connPool, pushHandlerFactory, maxQueuedRequests, queuedRequests);
96102
}
97103

98104
@Override
99-
HttpRoute determineRoute(final HttpHost httpHost, final HttpRequest request, final HttpClientContext clientContext) throws HttpException {
105+
HttpRoute determineRoute(
106+
final HttpHost httpHost,
107+
final HttpRequest request,
108+
final HttpClientContext clientContext) throws HttpException {
100109
final HttpRoute route = routePlanner.determineRoute(httpHost, request, clientContext);
101110
if (route.isTunnelled()) {
102111
throw new HttpException("HTTP/2 tunneling not supported");

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncExecRuntime.java

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
package org.apache.hc.client5.http.impl.async;
2929

3030
import java.io.InterruptedIOException;
31+
import java.util.concurrent.RejectedExecutionException;
32+
import java.util.concurrent.atomic.AtomicInteger;
3133
import java.util.concurrent.atomic.AtomicReference;
3234

3335
import org.apache.hc.client5.http.EndpointInfo;
@@ -61,17 +63,30 @@ class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
6163
private final InternalH2ConnPool connPool;
6264
private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
6365
private final AtomicReference<Endpoint> sessionRef;
66+
private final int maxQueued;
67+
private final AtomicInteger sharedQueued;
6468
private volatile boolean reusable;
6569

6670
InternalH2AsyncExecRuntime(
6771
final Logger log,
6872
final InternalH2ConnPool connPool,
6973
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
74+
this(log, connPool, pushHandlerFactory, -1, null);
75+
}
76+
77+
InternalH2AsyncExecRuntime(
78+
final Logger log,
79+
final InternalH2ConnPool connPool,
80+
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
81+
final int maxQueued,
82+
final AtomicInteger sharedQueued) {
7083
super();
7184
this.log = log;
7285
this.connPool = connPool;
7386
this.pushHandlerFactory = pushHandlerFactory;
7487
this.sessionRef = new AtomicReference<>();
88+
this.maxQueued = maxQueued;
89+
this.sharedQueued = sharedQueued;
7590
}
7691

7792
@Override
@@ -246,20 +261,49 @@ public EndpointInfo getEndpointInfo() {
246261
return null;
247262
}
248263

264+
private boolean tryAcquireSlot() {
265+
if (sharedQueued == null || maxQueued <= 0) {
266+
return true;
267+
}
268+
for (;;) {
269+
final int q = sharedQueued.get();
270+
if (q >= maxQueued) {
271+
return false;
272+
}
273+
if (sharedQueued.compareAndSet(q, q + 1)) {
274+
return true;
275+
}
276+
}
277+
}
278+
279+
private void releaseSlot() {
280+
if (sharedQueued != null && maxQueued > 0) {
281+
sharedQueued.decrementAndGet();
282+
}
283+
}
284+
249285
@Override
250286
public Cancellable execute(
251287
final String id,
252288
final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
253-
final ComplexCancellable complexCancellable = new ComplexCancellable();
254289
final Endpoint endpoint = ensureValid();
290+
if (!tryAcquireSlot()) {
291+
exchangeHandler.failed(new RejectedExecutionException(
292+
"Execution pipeline queue limit reached (max=" + maxQueued + ")"));
293+
return Operations.nonCancellable();
294+
}
295+
final AsyncClientExchangeHandler actual = sharedQueued != null
296+
? AsyncClientExchangeHandlerProxy.newProxy(exchangeHandler, this::releaseSlot)
297+
: exchangeHandler;
298+
final ComplexCancellable complexCancellable = new ComplexCancellable();
255299
final IOSession session = endpoint.session;
256300
if (session.isOpen()) {
257301
if (log.isDebugEnabled()) {
258302
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
259303
}
260304
context.setProtocolVersion(HttpVersion.HTTP_2);
261305
session.enqueue(
262-
new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
306+
new RequestExecutionCommand(actual, pushHandlerFactory, complexCancellable, context),
263307
Command.Priority.NORMAL);
264308
} else {
265309
final HttpRoute route = endpoint.route;
@@ -276,19 +320,19 @@ public void completed(final IOSession ioSession) {
276320
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
277321
}
278322
context.setProtocolVersion(HttpVersion.HTTP_2);
279-
session.enqueue(
280-
new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, complexCancellable, context),
323+
ioSession.enqueue(
324+
new RequestExecutionCommand(actual, pushHandlerFactory, complexCancellable, context),
281325
Command.Priority.NORMAL);
282326
}
283327

284328
@Override
285329
public void failed(final Exception ex) {
286-
exchangeHandler.failed(ex);
330+
actual.failed(ex);
287331
}
288332

289333
@Override
290334
public void cancelled() {
291-
exchangeHandler.failed(new InterruptedIOException());
335+
actual.failed(new InterruptedIOException());
292336
}
293337

294338
});
@@ -325,7 +369,7 @@ public String getId() {
325369

326370
@Override
327371
public AsyncExecRuntime fork() {
328-
return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory);
372+
return new InternalH2AsyncExecRuntime(log, connPool, pushHandlerFactory, maxQueued, sharedQueued);
329373
}
330374

331375
}

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncClient.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.Closeable;
3030
import java.util.List;
3131
import java.util.concurrent.ThreadFactory;
32+
import java.util.concurrent.atomic.AtomicInteger;
3233
import java.util.function.Function;
3334

3435
import org.apache.hc.client5.http.HttpRoute;
@@ -75,6 +76,8 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie
7576
private final AsyncClientConnectionManager manager;
7677
private final HttpRoutePlanner routePlanner;
7778
private final TlsConfig tlsConfig;
79+
private final int maxQueuedRequests;
80+
private final AtomicInteger queuedCounter;
7881

7982
InternalHttpAsyncClient(
8083
final DefaultConnectingIOReactor ioReactor,
@@ -90,18 +93,21 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie
9093
final CredentialsProvider credentialsProvider,
9194
final Function<HttpContext, HttpClientContext> contextAdaptor,
9295
final RequestConfig defaultConfig,
93-
final List<Closeable> closeables) {
96+
final List<Closeable> closeables,
97+
final int maxQueuedRequests) {
9498
super(ioReactor, pushConsumerRegistry, threadFactory, execChain,
9599
cookieSpecRegistry, authSchemeRegistry, cookieStore, credentialsProvider, contextAdaptor,
96100
defaultConfig, closeables);
97101
this.manager = manager;
98102
this.routePlanner = routePlanner;
99103
this.tlsConfig = tlsConfig;
104+
this.maxQueuedRequests = maxQueuedRequests;
105+
this.queuedCounter = maxQueuedRequests > 0 ? new AtomicInteger(0) : null;
100106
}
101107

102108
@Override
103109
AsyncExecRuntime createAsyncExecRuntime(final HandlerFactory<AsyncPushConsumer> pushHandlerFactory) {
104-
return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig);
110+
return new InternalHttpAsyncExecRuntime(LOG, manager, getConnectionInitiator(), pushHandlerFactory, tlsConfig, maxQueuedRequests, queuedCounter);
105111
}
106112

107113
@Override

0 commit comments

Comments
 (0)