
Commit 7658ca2

Buffered JSON parsing (#8803)
Currently, we use a non-blocking parser to transform input JSON to a JsonNode, then send it downstream, and finally convert it to the desired request type when the route arguments are bound. This patch delays parsing until the conversion to the route type.

It consists of these parts:

- A change to JsonContentProcessor. The JsonContentProcessor has to handle certain scenarios (`application/x-json-stream`, or our support for streaming a JSON array as a Publisher) where the input contains multiple JSON values. Instead of forwarding the input bytes to an async parser, they are first split into individual JSON values, which are then forwarded or parsed in bulk.
- A new JsonCounter class. This class handles splitting the input into its individual JSON nodes. It is basically a limited JSON parser that counts braces. It is already very optimized, but it also has a special optimization when the content type is `application/json`, in which case it assumes only a single JSON value in the input (this is technically a breaking change, but breaks no tests). I also wrote fuzz tests for JsonCounter; I will submit them in a separate PR to https://github.com/micronaut-projects/micronaut-fuzzing .
- A benchmark. It's not run by the build tools, but it is good to have around.
- An option in NettyHttpServerConfiguration to re-enable eager parsing to JsonNode. JsonCounter is still used, so it's not 100% the old behavior, but it should be more compatible in some respects because it doesn't change the conversion logic anymore.
- A new LazyJsonNode class. It contains a ByteBuffer with the actual unparsed bytes. It's a bit complicated because it contains a reference counted buffer that has to be released after conversion, but also sometimes multiple converters are called for the same LazyJsonNode (once to JsonNode, once to a user-defined type). To solve this, when there is a conversion to JsonNode, it keeps that JsonNode around (and releases the buffer). If there are future conversions to a specific type, it will use the JsonNode as the source instead.
- A new JsonSyntaxException class. JsonMappers can throw this to signal a syntax error in the JSON, which will be reported differently than a data binding error.
- Changes to the JsonMapper API: The asynchronous parser is deprecated. There is a new readValue method that takes a ByteBuffer. The Jackson implementation has an optimization for Netty ByteBuffers.
- New converters for LazyJsonNode. Also removed some old superfluous converters for JsonNode.
- A change to RequestLifecycle to show JSON syntax error messages like we did previously, instead of the opaque 400 that we usually send for conversion errors.
- Some test changes to reflect changes in error messages. Because we now parse the input in bulk, in some cases Jackson can decorate errors with short snippets of the failing input data. Is this an issue?

There are a few potential incompatibilities with this change:

- Non-standard JSON features, when combined with JsonCounter (i.e. when streaming a JSON array, or when using `application/x-json-stream`), will break even when the JSON parser is configured to support them. E.g. if the user configured Jackson to ignore comments, that may fail now.
- When the input body is never bound, JSON syntax errors may not be caught.
- JsonCounter only supports UTF-8, by design. This is permitted by the JSON standard; however, Jackson also supports UTF-16 and UTF-32. imo this is a net benefit, to avoid potential parser differential vulnerabilities. I also made a Jackson feature to match this, though JsonCounter is sufficient now: FasterXML/jackson-core#921
- JsonMapper implementations that do not use JsonSyntaxException yet will have less verbose HTTP errors until they switch to JsonSyntaxException.
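The commit message describes JsonCounter as a limited JSON parser that counts braces in order to split the input into individual values. The following is only a rough sketch of that idea, not the JsonCounter class from this commit: the class name `BraceCounterSketch` and its `split` method are hypothetical, it operates on a complete byte array rather than on streamed Netty buffers, it only recognises top-level objects and arrays (top-level scalars are skipped), and an incomplete trailing value is dropped rather than buffered for the next chunk.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not the JsonCounter class from this commit. */
public class BraceCounterSketch {

    /** Splits UTF-8 input into its top-level JSON objects and arrays by tracking nesting depth. */
    public static List<String> split(byte[] input) {
        List<String> values = new ArrayList<>();
        int depth = 0;            // current nesting level of {} and []
        boolean inString = false; // braces inside string literals must not be counted
        boolean escaped = false;  // previous byte was a backslash inside a string
        int start = -1;           // index where the current top-level value began

        for (int i = 0; i < input.length; i++) {
            byte b = input[i];
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (b == '\\') {
                    escaped = true;
                } else if (b == '"') {
                    inString = false;
                }
                continue;
            }
            if (b == '"') {
                inString = true;
            } else if (b == '{' || b == '[') {
                if (depth == 0) {
                    start = i;
                }
                depth++;
            } else if (b == '}' || b == ']') {
                if (depth == 0) {
                    throw new IllegalArgumentException("Unbalanced closing bracket at index " + i);
                }
                depth--;
                if (depth == 0) {
                    // A complete top-level value ends here; emit it without parsing its contents.
                    values.add(new String(input, start, i - start + 1, StandardCharsets.UTF_8));
                    start = -1;
                }
            }
        }
        return values;
    }

    public static void main(String[] args) {
        byte[] input = "{\"a\":1}[2,3]".getBytes(StandardCharsets.UTF_8);
        System.out.println(split(input)); // prints [{"a":1}, [2,3]]
    }
}
```

Because the splitter never interprets the bytes between brackets, each emitted region can be handed to a full JSON parser later, which is the property the delayed-parsing design relies on.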
1 parent 4398d51 commit 7658ca2

28 files changed: +1612 -214 lines changed


buffer-netty/src/main/java/io/micronaut/buffer/netty/NettyByteBuffer.java

Lines changed: 1 addition & 1 deletion
@@ -50,7 +50,7 @@ class NettyByteBuffer implements ByteBuffer<ByteBuf>, ReferenceCounted {
     }

     @Override
-    public ByteBuffer retain() {
+    public NettyByteBuffer retain() {
         delegate.retain();
         return this;
     }

core/src/main/java/io/micronaut/core/io/buffer/ReferenceCounted.java

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ public interface ReferenceCounted {
      *
      * @return this
      */
-    ByteBuffer retain();
+    ReferenceCounted retain();

     /**
      * Release a reference to this object.

http-server-netty/build.gradle

Lines changed: 4 additions & 0 deletions
@@ -31,6 +31,10 @@ dependencies {
     compileOnly libs.kotlin.stdlib
     compileOnly libs.managed.netty.transport.native.unix.common

+    testImplementation 'org.openjdk.jmh:jmh-core:1.36'
+    testAnnotationProcessor 'org.openjdk.jmh:jmh-generator-annprocess:1.36'
+
+
     testCompileOnly project(":inject-groovy")
     testCompileOnly(libs.jetbrains.annotations)
     testAnnotationProcessor project(":inject-java")

http-server-netty/src/main/java/io/micronaut/http/server/netty/configuration/NettyHttpServerConfiguration.java

Lines changed: 63 additions & 0 deletions
@@ -114,6 +114,22 @@ public class NettyHttpServerConfiguration extends HttpServerConfiguration {
     @SuppressWarnings("WeakerAccess")
     public static final boolean DEFAULT_KEEP_ALIVE_ON_SERVER_ERROR = true;

+    /**
+     * The default value for eager parsing.
+     *
+     * @since 4.0.0
+     */
+    @SuppressWarnings("WeakerAccess")
+    public static final boolean DEFAULT_EAGER_PARSING = false;
+
+    /**
+     * The default value for eager parsing.
+     *
+     * @since 4.0.0
+     */
+    @SuppressWarnings("WeakerAccess")
+    public static final int DEFAULT_JSON_BUFFER_MAX_COMPONENTS = 4096;
+
     private static final Logger LOG = LoggerFactory.getLogger(NettyHttpServerConfiguration.class);

     private final List<ChannelPipelineListener> pipelineCustomizers;
@@ -140,6 +156,8 @@ public class NettyHttpServerConfiguration extends HttpServerConfiguration {
     private boolean keepAliveOnServerError = DEFAULT_KEEP_ALIVE_ON_SERVER_ERROR;
     private String pcapLoggingPathPattern = null;
     private List<NettyListenerConfiguration> listeners = null;
+    private boolean eagerParsing = DEFAULT_EAGER_PARSING;
+    private int jsonBufferMaxComponents = DEFAULT_JSON_BUFFER_MAX_COMPONENTS;

     /**
      * Default empty constructor.
@@ -564,6 +582,51 @@ public void setListeners(List<NettyListenerConfiguration> listeners) {
         this.listeners = listeners;
     }

+    /**
+     * Parse incoming JSON data eagerly, before route binding. Default value
+     * {@value DEFAULT_EAGER_PARSING}.
+     *
+     * @return Whether to parse incoming JSON data eagerly before route binding
+     * @since 4.0.0
+     */
+    public boolean isEagerParsing() {
+        return eagerParsing;
+    }
+
+    /**
+     * Parse incoming JSON data eagerly, before route binding. Default value
+     * {@value DEFAULT_EAGER_PARSING}.
+     *
+     * @param eagerParsing Whether to parse incoming JSON data eagerly before route binding
+     * @since 4.0.0
+     */
+    public void setEagerParsing(boolean eagerParsing) {
+        this.eagerParsing = eagerParsing;
+    }
+
+    /**
+     * Maximum number of buffers to keep around in JSON parsing before they should be consolidated.
+     * Defaults to {@value #DEFAULT_JSON_BUFFER_MAX_COMPONENTS}.
+     *
+     * @return The maximum number of components
+     * @since 4.0.0
+     */
+    public int getJsonBufferMaxComponents() {
+        return jsonBufferMaxComponents;
+    }
+
+
+    /**
+     * Maximum number of buffers to keep around in JSON parsing before they should be consolidated.
+     * Defaults to {@value #DEFAULT_JSON_BUFFER_MAX_COMPONENTS}.
+     *
+     * @param jsonBufferMaxComponents The maximum number of components
+     * @since 4.0.0
+     */
+    public void setJsonBufferMaxComponents(int jsonBufferMaxComponents) {
+        this.jsonBufferMaxComponents = jsonBufferMaxComponents;
+    }
+
     /**
      * Http2 settings.
      */

http-server-netty/src/main/java/io/micronaut/http/server/netty/jackson/JsonContentProcessor.java

Lines changed: 75 additions & 81 deletions
@@ -15,24 +15,24 @@
  */
 package io.micronaut.http.server.netty.jackson;

+import io.micronaut.buffer.netty.NettyByteBufferFactory;
 import io.micronaut.core.annotation.Internal;
 import io.micronaut.core.async.publisher.Publishers;
-import io.micronaut.core.async.subscriber.CompletionAwareSubscriber;
+import io.micronaut.core.io.buffer.ByteBuffer;
 import io.micronaut.core.type.Argument;
 import io.micronaut.http.MediaType;
-import io.micronaut.http.server.HttpServerConfiguration;
 import io.micronaut.http.server.netty.AbstractHttpContentProcessor;
 import io.micronaut.http.server.netty.HttpContentProcessor;
 import io.micronaut.http.server.netty.NettyHttpRequest;
+import io.micronaut.http.server.netty.configuration.NettyHttpServerConfiguration;
 import io.micronaut.json.JsonMapper;
+import io.micronaut.json.convert.LazyJsonNode;
 import io.micronaut.json.tree.JsonNode;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufHolder;
-import io.netty.buffer.ByteBufUtil;
-import io.netty.util.ReferenceCountUtil;
-import org.reactivestreams.Processor;
-import org.reactivestreams.Subscription;
+import io.netty.buffer.CompositeByteBuf;

+import java.io.IOException;
 import java.util.Collection;
 import java.util.Optional;

@@ -47,9 +47,8 @@
 public class JsonContentProcessor extends AbstractHttpContentProcessor {

     private final JsonMapper jsonMapper;
-    private Processor<byte[], JsonNode> jacksonProcessor;
-    private Collection<Object> out;
-    private Throwable failure = null;
+    private final JsonCounter counter = new JsonCounter();
+    private CompositeByteBuf buffer;

     /**
      * @param nettyHttpRequest The Netty Http request
@@ -58,111 +57,106 @@ public class JsonContentProcessor extends AbstractHttpContentProcessor {
      */
     public JsonContentProcessor(
             NettyHttpRequest<?> nettyHttpRequest,
-            HttpServerConfiguration configuration,
+            NettyHttpServerConfiguration configuration,
             JsonMapper jsonMapper) {
         super(nettyHttpRequest, configuration);
         this.jsonMapper = jsonMapper;
+
+        if (hasContentType(MediaType.APPLICATION_JSON_TYPE)) {
+
+            // if the content type is application/json, we can only have one root-level value
+            counter.noTokenization();
+        }
     }

     @Override
     public HttpContentProcessor resultType(Argument<?> type) {
-        boolean streamArray = false;

-        boolean isJsonStream = nettyHttpRequest.getContentType()
-                .map(mediaType -> mediaType.equals(MediaType.APPLICATION_JSON_STREAM_TYPE))
-                .orElse(false);
+        boolean isJsonStream = hasContentType(MediaType.APPLICATION_JSON_STREAM_TYPE);

         if (type != null) {
             Class<?> targetType = type.getType();
             if (Publishers.isConvertibleToPublisher(targetType) && !Publishers.isSingle(targetType)) {
                 Optional<Argument<?>> genericArgument = type.getFirstTypeVariable();
                 if (genericArgument.isPresent() && !Iterable.class.isAssignableFrom(genericArgument.get().getType()) && !isJsonStream) {
                     // if the generic argument is not a iterable type them stream the array into the publisher
-                    streamArray = true;
+                    counter.unwrapTopLevelArray();
                 }
             }
         }
+        return this;
+    }

-        this.jacksonProcessor = jsonMapper.createReactiveParser(p -> {
-        }, streamArray);
-        this.jacksonProcessor.subscribe(new CompletionAwareSubscriber<>() {
-
-            @Override
-            protected void doOnSubscribe(Subscription jsonSubscription) {
-                jsonSubscription.request(Long.MAX_VALUE);
-            }
-
-            @Override
-            protected void doOnNext(JsonNode message) {
-                if (out == null) {
-                    throw new IllegalStateException("Concurrent access not allowed");
-                }
-                out.add(message);
-            }
-
-            @Override
-            protected void doOnError(Throwable t) {
-                if (out == null) {
-                    throw new IllegalStateException("Concurrent access not allowed");
-                }
-                failure = t;
-            }
+    private boolean hasContentType(MediaType expected) {
+        Optional<MediaType> actual = nettyHttpRequest.getContentType();
+        return actual.isPresent() && actual.get().equals(expected);
+    }

-            @Override
-            protected void doOnComplete() {
-                if (out == null) {
-                    throw new IllegalStateException("Concurrent access not allowed");
-                }
-            }
-        });
-        this.jacksonProcessor.onSubscribe(new Subscription() {
-            @Override
-            public void request(long n) {
+    @Override
+    protected void onData(ByteBufHolder message, Collection<Object> out) throws Throwable {
+        ByteBuf content = message.content();
+        try {
+            countLoop(out, content);
+        } catch (Exception e) {
+            if (this.buffer != null) {
+                this.buffer.release();
+                this.buffer = null;
             }
+            throw e;
+        } finally {
+            content.release();
+        }
+    }

-            @Override
-            public void cancel() {
-                // happens on error, ignore
+    private void countLoop(Collection<Object> out, ByteBuf content) throws IOException {
+        long initialPosition = counter.position();
+        long bias = initialPosition - content.readerIndex();
+        while (content.isReadable()) {
+            counter.feed(content);
+            JsonCounter.BufferRegion bufferRegion = counter.pollFlushedRegion();
+            if (bufferRegion != null) {
+                long start = Math.max(initialPosition, bufferRegion.start());
+                flush(out, content.retainedSlice(
+                        Math.toIntExact(start - bias),
+                        Math.toIntExact(bufferRegion.end() - start)
+                ));
             }
-        });
-        return this;
+        }
+        if (counter.isBuffering()) {
+            int currentBufferStart = Math.toIntExact(Math.max(initialPosition, counter.bufferStart()) - bias);
+            bufferForNextRun(content.retainedSlice(currentBufferStart, content.writerIndex() - currentBufferStart));
+        }
     }

-    @Override
-    protected void onData(ByteBufHolder message, Collection<Object> out) throws Throwable {
-        if (jacksonProcessor == null) {
-            resultType(null);
+    private void bufferForNextRun(ByteBuf buffer) {
+        if (this.buffer == null) {
+            // number of components should not be too small to avoid unnecessary consolidation
+            this.buffer = buffer.alloc().compositeBuffer(((NettyHttpServerConfiguration) configuration).getJsonBufferMaxComponents());
         }
+        this.buffer.addComponent(true, buffer);
+    }

-        this.out = out;
-        ByteBuf content = message.content();
-        try {
-            byte[] bytes = ByteBufUtil.getBytes(content);
-            jacksonProcessor.onNext(bytes);
-        } finally {
-            ReferenceCountUtil.release(content);
-            this.out = null;
+    private void flush(Collection<Object> out, ByteBuf completedNode) throws IOException {
+        if (this.buffer != null) {
+            completedNode = completedNode == null ? this.buffer : this.buffer.addComponent(true, completedNode);
+            this.buffer = null;
         }
-        Throwable f = failure;
-        if (f != null) {
-            failure = null;
-            throw f;
+        ByteBuffer<ByteBuf> wrapped = NettyByteBufferFactory.DEFAULT.wrap(completedNode);
+        if (((NettyHttpServerConfiguration) configuration).isEagerParsing()) {
+            try {
+                out.add(jsonMapper.readValue(wrapped, Argument.of(JsonNode.class)));
+            } finally {
+                completedNode.release();
+            }
+        } else {
+            out.add(new LazyJsonNode(wrapped));
         }
     }

     @Override
     public void complete(Collection<Object> out) throws Throwable {
-        if (jacksonProcessor == null) {
-            resultType(null);
-        }
-
-        this.out = out;
-        jacksonProcessor.onComplete();
-        this.out = null;
-        Throwable f = failure;
-        if (f != null) {
-            failure = null;
-            throw f;
+        if (this.buffer != null) {
+            flush(out, null);
         }
     }
 }
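
In the lazy path, `flush` wraps each completed value in a `LazyJsonNode`, whose source is not part of this excerpt. The commit message says it keeps the unparsed bytes, caches the JsonNode after the first tree conversion (releasing the buffer), and serves later typed conversions from that cached tree. A minimal sketch of that caching behaviour might look as follows; `LazyNodeSketch`, its methods, and the use of a plain `byte[]` in place of a reference-counted buffer are all assumptions made for illustration, and the "tree" here is just an `Integer` so that the example needs no JSON library.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Hypothetical illustration only; not the LazyJsonNode class from this commit. */
public class LazyNodeSketch<T> {
    private byte[] buffer;          // stand-in for a reference-counted buffer; null once released
    private T tree;                 // cached tree representation
    private boolean parsed = false; // true once the tree has been produced
    private final Function<byte[], T> treeParser;

    public LazyNodeSketch(byte[] buffer, Function<byte[], T> treeParser) {
        this.buffer = buffer;
        this.treeParser = treeParser;
    }

    /** Converts to the tree form once, caches it, and lets go of the raw bytes. */
    public synchronized T toTree() {
        if (!parsed) {
            tree = treeParser.apply(requireBuffer());
            parsed = true;
            buffer = null; // "release" the buffer; the cached tree is now the source of truth
        }
        return tree;
    }

    /** Converts to a specific type, from the cached tree if present, else from the raw bytes. */
    public synchronized <R> R convert(Function<byte[], R> fromBytes, Function<T, R> fromTree) {
        return parsed ? fromTree.apply(tree) : fromBytes.apply(requireBuffer());
    }

    /** Explicit release for callers that finish without ever building the tree. */
    public synchronized void release() {
        buffer = null;
    }

    public synchronized boolean holdsBuffer() {
        return buffer != null;
    }

    private byte[] requireBuffer() {
        if (buffer == null) {
            throw new IllegalStateException("Buffer already released");
        }
        return buffer;
    }

    public static void main(String[] args) {
        LazyNodeSketch<Integer> node = new LazyNodeSketch<>(
                "42".getBytes(StandardCharsets.UTF_8),
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));
        // Before any tree conversion, typed conversions read the raw bytes.
        System.out.println(node.convert(b -> new String(b, StandardCharsets.UTF_8), t -> "tree:" + t));
        // After toTree(), the bytes are gone and conversions go through the cached tree.
        System.out.println(node.toTree());
        System.out.println(node.convert(b -> new String(b, StandardCharsets.UTF_8), t -> "tree:" + t));
    }
}
```

Caching the tree is what makes it safe to drop the buffer after the first conversion while still allowing a second converter to run on the same node.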
