Skip to content

Commit

Permalink
4.x: Unbuffered OutputStream in Http1ServerResponse (helidon-io#9276)
Browse files Browse the repository at this point in the history
Currently, having a write buffer size of 0 always leads to an exception
when an Http1 request handler tries to create an OutputStream.
  • Loading branch information
Kay Werndli authored and romain-grecourt committed Oct 17, 2024
1 parent 6a93b66 commit 868e8a3
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class Http1ServerResponse extends ServerResponseBase<Http1ServerResponse> {

private boolean streamingEntity;
private boolean isSent;
private ClosingBufferedOutputStream outputStream;
private ResponseOutputStream outputStream;
private long bytesWritten;
private String streamResult = "";
private final boolean validateHeaders;
Expand Down Expand Up @@ -421,7 +421,11 @@ private OutputStream outputStream(boolean skipEncoders) {
validateHeaders);

int writeBufferSize = ctx.listenerContext().config().writeBufferSize();
outputStream = new ClosingBufferedOutputStream(bos, writeBufferSize);
if (writeBufferSize > 0) {
outputStream = new ClosingBufferedOutputStream(bos, writeBufferSize);
} else {
outputStream = bos;
}

OutputStream encodedOutputStream = outputStream;
if (!skipEncoders) {
Expand All @@ -431,7 +435,29 @@ private OutputStream outputStream(boolean skipEncoders) {
return outputStreamFilter == null ? encodedOutputStream : outputStreamFilter.apply(encodedOutputStream);
}

static class BlockingOutputStream extends OutputStream {
abstract static class ResponseOutputStream extends OutputStream {
abstract long totalBytesWritten();

abstract void commit();

/**
* This is a noop, even when user closes the output stream, we wait for the
* call to {@link this#commit()}.
*/
@Override
public void close() {
// no-op
}

/**
* Calls the {@link OutputStream#close()}, which is currently a no-op.
*/
void superClose() throws IOException {
super.close();
}
}

static class BlockingOutputStream extends ResponseOutputStream {
private final ServerResponseHeaders headers;
private final WritableHeaders<?> trailers;
private final Supplier<Status> status;
Expand Down Expand Up @@ -521,15 +547,6 @@ public void flush() throws IOException {
}
}

/**
* This is a noop, even when user closes the output stream, we wait for the
* call to {@link this#commit()}.
*/
@Override
public void close() {
// no-op
}

/**
* Informs output stream that closing phase has started. Special handling
* for {@link this#flush()}.
Expand Down Expand Up @@ -573,12 +590,13 @@ void commit() {

responseCloseRunnable.run();
try {
super.close();
superClose();
} catch (IOException e) {
throw new ServerConnectionException("Failed to close server response stream.", e);
}
}

@Override
long totalBytesWritten() {
return responseBytesTotal;
}
Expand Down Expand Up @@ -763,7 +781,7 @@ private void writeContent(BufferData buffer) throws IOException {
* {@link io.helidon.webserver.http1.Http1ServerResponse.BlockingOutputStream}
* with a {@link java.io.BufferedOutputStream}.
*/
static class ClosingBufferedOutputStream extends OutputStream {
static class ClosingBufferedOutputStream extends ResponseOutputStream {

private final BlockingOutputStream closingDelegate;
private final OutputStream delegate;
Expand Down Expand Up @@ -803,10 +821,12 @@ public void close() {
}
}

@Override
long totalBytesWritten() {
return closingDelegate.totalBytesWritten();
}

@Override
void commit() {
try {
flush();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.webserver.http1;

import io.helidon.webserver.WebServer;
import io.helidon.webserver.http.Handler;
import io.helidon.webserver.http1.Http1ServerResponse.BlockingOutputStream;
import io.helidon.webserver.http1.Http1ServerResponse.ClosingBufferedOutputStream;
import org.junit.jupiter.api.Test;

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;

import io.helidon.webserver.http.ServerResponse;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

class WriteBufferTest {

/**
* Test that a simple response can be sent using the {@link ServerResponse#outputStream()} using the default
* (non-zero) write buffer.
*/
@Test
void defaultWriteBufferTest() throws Exception {
String path = "/test";
String response = "Hello World!";
Handler handler = (req, res) -> {
try(OutputStream out = res.outputStream()) {
assertThat(out, instanceOf(ClosingBufferedOutputStream.class));
out.write(response.getBytes(StandardCharsets.UTF_8));
}
};
WebServer server = WebServer.builder().port(0).routing(it -> it.get(path, handler)).build().start();
try {
URL url = new URI("http://localhost:" + server.port() + path).toURL();
HttpURLConnection conn = (HttpURLConnection)url.openConnection();
conn.setRequestMethod("GET");
assertThat(conn.getResponseCode(), is(200));
String received = new String(conn.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
assertThat(received, is(response));
} finally {
server.stop();
}
}

/**
* Test that a simple response can be sent using the {@link ServerResponse#outputStream()} using no write buffer
* (i.e. the write buffer size was set to {@code 0}).
*/
@Test
void noWriteBufferTest() throws Exception {
String path = "/test";
String response = "Hello World!";
Handler handler = (req, res) -> {
try(OutputStream out = res.outputStream()) {
assertThat(out, instanceOf(BlockingOutputStream.class));
out.write(response.getBytes(StandardCharsets.UTF_8));
}
};
WebServer server = WebServer.builder().port(0).writeBufferSize(0)
.routing(it -> it.get(path, handler)).build().start();
try {
URL url = new URI("http://localhost:" + server.port() + path).toURL();
HttpURLConnection conn = (HttpURLConnection)url.openConnection();
conn.setRequestMethod("GET");
assertThat(conn.getResponseCode(), is(200));
String received = new String(conn.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
assertThat(received, is(response));
} finally {
server.stop();
}
}

}

0 comments on commit 868e8a3

Please sign in to comment.