Skip to content

Commit ccf73f8

Browse files
committed
Add core async HTTP/2 request-timeout test asserting HttpStreamResetException
1 parent 9a39015 commit ccf73f8

File tree

4 files changed

+372
-68
lines changed

4 files changed

+372
-68
lines changed

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.net.SocketAddress;
3131
import java.nio.BufferOverflowException;
3232
import java.nio.ByteBuffer;
33+
import java.nio.channels.CancelledKeyException;
3334
import java.nio.channels.SelectionKey;
3435
import java.nio.charset.StandardCharsets;
3536
import java.util.Deque;
@@ -271,7 +272,12 @@ private void commitFrameInternal(final RawFrame frame) throws IOException {
271272
} else {
272273
outputQueue.addLast(frame);
273274
}
274-
ioSession.setEvent(SelectionKey.OP_WRITE);
275+
try {
276+
ioSession.setEvent(SelectionKey.OP_WRITE);
277+
} catch (final CancelledKeyException ex) {
278+
connState = ConnectionHandshake.SHUTDOWN;
279+
ioSession.close(CloseMode.IMMEDIATE);
280+
}
275281
}
276282

277283
private void commitFrame(final RawFrame frame) throws IOException {
@@ -413,7 +419,12 @@ private void incrementInputCapacity(
413419

414420
void requestSessionOutput() {
415421
outputRequests.incrementAndGet();
416-
ioSession.setEvent(SelectionKey.OP_WRITE);
422+
try {
423+
ioSession.setEvent(SelectionKey.OP_WRITE);
424+
} catch (final CancelledKeyException ex) {
425+
connState = ConnectionHandshake.SHUTDOWN;
426+
ioSession.close(CloseMode.IMMEDIATE);
427+
}
417428
}
418429

419430
public final void onConnect() throws HttpException, IOException {
@@ -439,10 +450,6 @@ public final void onInput(final ByteBuffer src) throws HttpException, IOExceptio
439450
for (;;) {
440451
final RawFrame frame = inputBuffer.read(src, ioSession);
441452
if (frame != null) {
442-
if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {
443-
checkStreamTimeouts(System.nanoTime());
444-
}
445-
446453
if (streamListener != null) {
447454
streamListener.onFrameInput(this, frame.getStreamId(), frame);
448455
}
@@ -1377,9 +1384,12 @@ H2StreamChannel createChannel(final int streamId) {
13771384
}
13781385

13791386
private void initializeStreamTimeouts(final H2Stream stream) {
1380-
final Timeout socketTimeout = ioSession.getSocketTimeout();
1381-
if (socketTimeout != null && socketTimeout.isEnabled()) {
1382-
stream.setIdleTimeout(socketTimeout);
1387+
final Timeout streamIdleTimeout = stream.getIdleTimeout();
1388+
if (streamIdleTimeout == null || !streamIdleTimeout.isEnabled()) {
1389+
final Timeout socketTimeout = ioSession.getSocketTimeout();
1390+
if (socketTimeout != null && socketTimeout.isEnabled()) {
1391+
stream.setIdleTimeout(socketTimeout);
1392+
}
13831393
}
13841394
}
13851395

@@ -1633,7 +1643,6 @@ private void checkStreamTimeouts(final long nowNanos) throws IOException {
16331643
idleTimeout,
16341644
true);
16351645
stream.localReset(ex, H2Error.CANCEL);
1636-
// Once reset due to idle timeout, we do not care about lifetime anymore
16371646
continue;
16381647
}
16391648
}

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/H2Stream.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,13 @@ boolean isOutputReady() {
213213

214214
void produceOutput() throws HttpException, IOException {
215215
try {
216+
if (channel.isLocalReset()) {
217+
return;
218+
}
219+
if (cancelled.get()) {
220+
localResetCancelled();
221+
return;
222+
}
216223
touch();
217224

218225
handler.produceOutput();
@@ -222,6 +229,13 @@ void produceOutput() throws HttpException, IOException {
222229
}
223230

224231
void produceInputCapacityUpdate() throws IOException {
232+
if (channel.isLocalReset()) {
233+
return;
234+
}
235+
if (cancelled.get()) {
236+
localResetCancelled();
237+
return;
238+
}
225239
touch();
226240
handler.updateInputCapacity();
227241
}

httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2StreamTimeoutClientExample.java

Lines changed: 141 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,16 @@
3232
import java.util.List;
3333
import java.util.concurrent.CountDownLatch;
3434
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.atomic.AtomicBoolean;
3536

3637
import org.apache.hc.core5.http.EntityDetails;
3738
import org.apache.hc.core5.http.Header;
3839
import org.apache.hc.core5.http.HttpConnection;
3940
import org.apache.hc.core5.http.HttpException;
41+
import org.apache.hc.core5.http.HttpHost;
4042
import org.apache.hc.core5.http.HttpResponse;
4143
import org.apache.hc.core5.http.impl.bootstrap.HttpAsyncRequester;
44+
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
4245
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
4346
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
4447
import org.apache.hc.core5.http.nio.CapacityChannel;
@@ -50,6 +53,7 @@
5053
import org.apache.hc.core5.http.protocol.HttpContext;
5154
import org.apache.hc.core5.http.protocol.HttpCoreContext;
5255
import org.apache.hc.core5.http2.H2StreamResetException;
56+
import org.apache.hc.core5.http2.H2StreamTimeoutException;
5357
import org.apache.hc.core5.http2.HttpVersionPolicy;
5458
import org.apache.hc.core5.http2.config.H2Config;
5559
import org.apache.hc.core5.http2.frame.RawFrame;
@@ -60,23 +64,13 @@
6064
import org.apache.hc.core5.util.Timeout;
6165

6266
/**
63-
* Example of an HTTP/2 client where a "slow" request gets aborted when the
64-
* underlying HTTP/2 connection times out due to inactivity (socket timeout).
67+
* Example of an HTTP/2 client where a "slow" request gets aborted by a
68+
* per-stream idle timeout enforced by the HTTP/2 multiplexer.
6569
* <p>
66-
* The client opens a single HTTP/2 connection to {@code nghttp2.org} and
67-
* executes two concurrent requests:
68-
* <ul>
69-
* <li>a "fast" request ({@code /httpbin/ip}), which completes before
70-
* the connection idle timeout, and</li>
71-
* <li>a "slow" request ({@code /httpbin/delay/5}), which keeps the
72-
* connection idle long enough for the I/O reactor to trigger a timeout
73-
* and close the HTTP/2 connection.</li>
74-
* </ul>
75-
* <p>
76-
* When the reactor closes the connection due to inactivity, all active
77-
* streams fail with {@link H2StreamResetException} reporting
78-
* {@code "Timeout due to inactivity (...)"}. The already completed stream
79-
* is not affected.
70+
* The connection socket timeout is set to 2 seconds and is used as the initial / default
71+
* per-stream idle timeout value. The example keeps the connection active by sending
72+
* small "keep-alive" requests on separate streams, so the connection itself does not time out.
73+
* The "slow" stream remains idle long enough to exceed the per-stream idle timeout and gets reset.
8074
*
8175
* @since 5.4
8276
*/
@@ -85,8 +79,6 @@ public class H2StreamTimeoutClientExample {
8579
public static void main(final String[] args) throws Exception {
8680

8781
final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
88-
// Connection-level inactivity timeout: keep it short so that
89-
// /httpbin/delay/5 reliably triggers it.
9082
.setSoTimeout(2, TimeUnit.SECONDS)
9183
.build();
9284

@@ -167,52 +159,150 @@ public void onOutputFlowControl(
167159

168160
requester.start();
169161

170-
final URI fastUri = new URI("https://nghttp2.org/httpbin/ip");
171-
final URI slowUri = new URI("https://nghttp2.org/httpbin/delay/5");
162+
final HttpHost target = new HttpHost("https", "nghttp2.org", 443);
163+
final AsyncClientEndpoint endpoint = requester.connect(target, Timeout.ofSeconds(10)).get();
164+
165+
try {
166+
final URI keepAliveUri = new URI("https://nghttp2.org/httpbin/ip");
167+
final URI slowUri = new URI("https://nghttp2.org/httpbin/delay/5");
168+
169+
final CountDownLatch latch = new CountDownLatch(2);
170+
final AtomicBoolean stop = new AtomicBoolean(false);
171+
172+
// Keep the connection active with short requests on new streams,
173+
// so the connection does NOT hit "Timeout due to inactivity".
174+
final Thread keepAliveThread = new Thread(() -> {
175+
try {
176+
while (!stop.get()) {
177+
executeKeepAliveOnce(endpoint, keepAliveUri);
178+
Thread.sleep(500);
179+
}
180+
} catch (final Exception ignore) {
181+
} finally {
182+
latch.countDown();
183+
}
184+
});
185+
keepAliveThread.setDaemon(true);
186+
keepAliveThread.start();
187+
188+
// Slow stream: should be reset by per-stream idle timeout while the connection stays active.
189+
executeWithLogging(
190+
endpoint,
191+
slowUri,
192+
"[slow]",
193+
latch,
194+
stop);
195+
196+
latch.await(30, TimeUnit.SECONDS);
197+
198+
} finally {
199+
endpoint.releaseAndReuse();
200+
System.out.println("Shutting down I/O reactor");
201+
requester.initiateShutdown();
202+
}
203+
}
204+
205+
private static void executeKeepAliveOnce(
206+
final AsyncClientEndpoint endpoint,
207+
final URI requestUri) throws InterruptedException {
208+
209+
final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri).build();
210+
final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(
211+
new StringAsyncEntityConsumer());
212+
213+
final CountDownLatch done = new CountDownLatch(1);
214+
215+
endpoint.execute(new AsyncClientExchangeHandler() {
216+
217+
@Override
218+
public void releaseResources() {
219+
requestProducer.releaseResources();
220+
responseConsumer.releaseResources();
221+
done.countDown();
222+
}
223+
224+
@Override
225+
public void cancel() {
226+
done.countDown();
227+
}
228+
229+
@Override
230+
public void failed(final Exception cause) {
231+
done.countDown();
232+
}
233+
234+
@Override
235+
public void produceRequest(
236+
final RequestChannel channel,
237+
final HttpContext httpContext) throws HttpException, IOException {
238+
requestProducer.sendRequest(channel, httpContext);
239+
}
240+
241+
@Override
242+
public int available() {
243+
return requestProducer.available();
244+
}
245+
246+
@Override
247+
public void produce(final DataStreamChannel channel) throws IOException {
248+
requestProducer.produce(channel);
249+
}
250+
251+
@Override
252+
public void consumeInformation(
253+
final HttpResponse response,
254+
final HttpContext httpContext) throws HttpException, IOException {
255+
// No-op
256+
}
257+
258+
@Override
259+
public void consumeResponse(
260+
final HttpResponse response,
261+
final EntityDetails entityDetails,
262+
final HttpContext httpContext) throws HttpException, IOException {
263+
responseConsumer.consumeResponse(response, entityDetails, httpContext, null);
264+
}
172265

173-
final CountDownLatch latch = new CountDownLatch(2);
266+
@Override
267+
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
268+
responseConsumer.updateCapacity(capacityChannel);
269+
}
174270

175-
// --- Fast stream: expected to succeed
176-
executeWithLogging(
177-
requester,
178-
fastUri,
179-
"[fast]",
180-
latch,
181-
false);
271+
@Override
272+
public void consume(final ByteBuffer src) throws IOException {
273+
responseConsumer.consume(src);
274+
}
182275

183-
// --- Slow stream: /delay/5 sleeps 5 seconds and should exceed
184-
// the 2-second connection idle timeout, resulting in a reset.
185-
executeWithLogging(
186-
requester,
187-
slowUri,
188-
"[slow]",
189-
latch,
190-
true);
276+
@Override
277+
public void streamEnd(final List<? extends Header> trailers)
278+
throws HttpException, IOException {
279+
responseConsumer.streamEnd(trailers);
280+
}
191281

192-
latch.await();
282+
}, HttpCoreContext.create());
193283

194-
System.out.println("Shutting down I/O reactor");
195-
requester.initiateShutdown();
284+
done.await(5, TimeUnit.SECONDS);
196285
}
197286

198287
private static void executeWithLogging(
199-
final HttpAsyncRequester requester,
288+
final AsyncClientEndpoint endpoint,
200289
final URI requestUri,
201290
final String label,
202291
final CountDownLatch latch,
203-
final boolean expectTimeout) {
292+
final AtomicBoolean stop) {
204293

205294
final AsyncRequestProducer requestProducer = AsyncRequestBuilder.get(requestUri)
206295
.build();
207296
final BasicResponseConsumer<String> responseConsumer = new BasicResponseConsumer<>(
208297
new StringAsyncEntityConsumer());
209298

210-
requester.execute(new AsyncClientExchangeHandler() {
299+
endpoint.execute(new AsyncClientExchangeHandler() {
211300

212301
@Override
213302
public void releaseResources() {
214303
requestProducer.releaseResources();
215304
responseConsumer.releaseResources();
305+
stop.set(true);
216306
latch.countDown();
217307
}
218308

@@ -223,11 +313,12 @@ public void cancel() {
223313

224314
@Override
225315
public void failed(final Exception cause) {
226-
if (expectTimeout && cause instanceof H2StreamResetException) {
227-
final H2StreamResetException ex = (H2StreamResetException) cause;
228-
System.out.println(label + " expected timeout reset: "
229-
+ requestUri
230-
+ " -> " + ex);
316+
if (cause instanceof H2StreamTimeoutException) {
317+
System.out.println(label + " expected per-stream timeout reset: "
318+
+ requestUri + " -> " + cause);
319+
} else if (cause instanceof H2StreamResetException) {
320+
System.out.println(label + " stream reset: "
321+
+ requestUri + " -> " + cause);
231322
} else {
232323
System.out.println(label + " failure: "
233324
+ requestUri + " -> " + cause);
@@ -265,13 +356,8 @@ public void consumeResponse(
265356
final HttpResponse response,
266357
final EntityDetails entityDetails,
267358
final HttpContext httpContext) throws HttpException, IOException {
268-
if (expectTimeout) {
269-
System.out.println(label + " UNEXPECTED success: "
270-
+ requestUri + " -> " + response.getCode());
271-
} else {
272-
System.out.println(label + " response: "
273-
+ requestUri + " -> " + response.getCode());
274-
}
359+
System.out.println(label + " response: "
360+
+ requestUri + " -> " + response.getCode());
275361
responseConsumer.consumeResponse(response, entityDetails, httpContext, null);
276362
}
277363

@@ -289,12 +375,9 @@ public void consume(final ByteBuffer src) throws IOException {
289375
public void streamEnd(final List<? extends Header> trailers)
290376
throws HttpException, IOException {
291377
responseConsumer.streamEnd(trailers);
292-
if (!expectTimeout) {
293-
System.out.println(label + " body completed for " + requestUri);
294-
}
295378
}
296379

297-
}, Timeout.ofSeconds(10), HttpCoreContext.create());
380+
}, HttpCoreContext.create());
298381
}
299382

300383
}

0 commit comments

Comments
 (0)