Skip to content

Commit

Permalink
Adds the InputStreamInterceptor interface to allow users to plug in c…
Browse files Browse the repository at this point in the history
…ustom interceptors for formats like Zstd. (#930)

* Adds a StreamInterceptor interface to allow users to plug in custom interceptors for formats like Zstd.

* Adds support for detection of interceptors on the classpath; renames StreamInterceptor to InputStreamInterceptor.

* Addresses PR feedback on the addition of InputStreamInterceptor.

* Adds support for detecting format headers from InputStreams that require multiple read() calls to produce enough bytes to fill a header.

* Addresses more PR feedback.

* Uses static initialization of stream interceptors instead of caching.

* Makes manually added stream interceptors follow detected stream interceptors, rather than replacing them.

* Renames InputStreamInterceptor methods to avoid referring to 'headers'.

* Minor cleanups.
  • Loading branch information
tgregg authored Jan 22, 2025
1 parent 66acb7a commit 4ccee4d
Show file tree
Hide file tree
Showing 9 changed files with 1,204 additions and 583 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ dependencies {
testImplementation("org.hamcrest:hamcrest:2.2")
testImplementation("pl.pragmatists:JUnitParams:1.1.1")
testImplementation("com.google.code.tempus-fugit:tempus-fugit:1.1")
testImplementation("com.github.luben:zstd-jni:1.5.6-5")
}

group = "com.amazon.ion"
Expand Down
946 changes: 454 additions & 492 deletions config/spotbugs/baseline.xml

Large diffs are not rendered by default.

142 changes: 98 additions & 44 deletions src/main/java/com/amazon/ion/impl/_Private_IonReaderBuilder.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package com.amazon.ion.impl;

import com.amazon.ion.IonCatalog;
import com.amazon.ion.IonException;
import com.amazon.ion.IonReader;
import com.amazon.ion.IonTextReader;
import com.amazon.ion.IonValue;
import com.amazon.ion.util.InputStreamInterceptor;
import com.amazon.ion.system.IonReaderBuilder;
import com.amazon.ion.util.IonStreamUtils;

Expand All @@ -16,7 +16,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.util.zip.GZIPInputStream;
import java.util.Collections;
import java.util.List;

import static com.amazon.ion.impl.LocalSymbolTable.DEFAULT_LST_FACTORY;
import static com.amazon.ion.impl._Private_IonReaderFactory.makeReader;
Expand Down Expand Up @@ -99,7 +100,7 @@ protected void mutationCheck() {
* The TwoElementSequenceInputStream allows the second delegate InputStream to return valid data if it subsequently
* receives more data, which is common when performing continuable reads.
*/
private static final class TwoElementSequenceInputStream extends InputStream {
private static final class TwoElementInputStream extends InputStream {

/**
* The first InputStream in the sequence.
Expand All @@ -121,7 +122,7 @@ private static final class TwoElementSequenceInputStream extends InputStream {
* @param first first InputStream in the sequence.
* @param last last InputStream in the sequence.
*/
private TwoElementSequenceInputStream(final InputStream first, final InputStream last) {
private TwoElementInputStream(final InputStream first, final InputStream last) {
this.first = first;
this.last = last;
this.in = first;
Expand Down Expand Up @@ -192,6 +193,18 @@ interface IonReaderFromBytesFactoryBinary {
IonReader makeReader(_Private_IonReaderBuilder builder, byte[] ionData, int offset, int length);
}

private static void validateHeaderLength(int maxHeaderLength) {
if (maxHeaderLength > _Private_IonConstants.ARRAY_MAXIMUM_SIZE) {
// Note: we could choose an arbitrary limit lower than this. The purpose at this point is to avoid OOM
// in the case where Java cannot allocate an array of the requested size.
throw new IonException(String.format(
"The maximum header length %d exceeds the maximum array size %d.",
maxHeaderLength,
_Private_IonConstants.ARRAY_MAXIMUM_SIZE
));
}
}

static IonReader buildReader(
_Private_IonReaderBuilder builder,
byte[] ionData,
Expand All @@ -200,16 +213,29 @@ static IonReader buildReader(
IonReaderFromBytesFactoryBinary binary,
IonReaderFromBytesFactoryText text
) {
if (IonStreamUtils.isGzip(ionData, offset, length)) {
try {
return buildReader(
builder,
new GZIPInputStream(new ByteArrayInputStream(ionData, offset, length)),
_Private_IonReaderFactory::makeReaderBinary,
_Private_IonReaderFactory::makeReaderText
);
} catch (IOException e) {
throw new IonException(e);
List<InputStreamInterceptor> streamInterceptors = builder.getInputStreamInterceptors();
for (InputStreamInterceptor streamInterceptor : streamInterceptors) {
int headerLength = streamInterceptor.numberOfBytesNeededToDetermineMatch();
validateHeaderLength(headerLength);
if (length < headerLength) {
continue;
}
if (streamInterceptor.isMatch(ionData, offset, length)) {
try {
return buildReader(
builder,
streamInterceptor.newInputStream(new ByteArrayInputStream(ionData, offset, length)),
_Private_IonReaderFactory::makeReaderBinary,
_Private_IonReaderFactory::makeReaderText,
// The builder provides only one level of detection, e.g. GZIP-compressed binary Ion *or*
// zstd-compressed binary Ion; *not* GZIP-compressed zstd-compressed binary Ion. Users that
// need to intercept multiple format layers can provide a custom InputStreamInterceptor to
// achieve this.
/*inputStreamInterceptors=*/ Collections.emptyList()
);
} catch (IOException e) {
throw new IonException(e);
}
}
}
if (IonStreamUtils.isIonBinary(ionData, offset, length)) {
Expand Down Expand Up @@ -247,15 +273,6 @@ private static boolean startsWithIvm(byte[] buffer, int length) {
return true;
}

static final byte[] GZIP_HEADER = {0x1F, (byte) 0x8B};

private static boolean startsWithGzipHeader(byte[] buffer, int length) {
if (length >= GZIP_HEADER.length) {
return buffer[0] == GZIP_HEADER[0] && buffer[1] == GZIP_HEADER[1];
}
return false;
}

@FunctionalInterface
interface IonReaderFromInputStreamFactoryText {
IonReader makeReader(IonCatalog catalog, InputStream source, _Private_LocalSymbolTableFactory lstFactory);
Expand All @@ -266,27 +283,62 @@ interface IonReaderFromInputStreamFactoryBinary {
IonReader makeReader(_Private_IonReaderBuilder builder, InputStream source, byte[] alreadyRead, int alreadyReadOff, int alreadyReadLen);
}

/**
* Reads from the given source into the given byte array, stopping once either
* <ol>
* <li>`length` bytes have been read, or</li>
* <li>the end of the source stream has been reached, or</li>
* <li>the source stream throws an exception.</li>
* </ol>
* @param source the source of the bytes to read.
* @param destination the destination for the bytes read.
* @param length the number of bytes to attempt to read.
* @return the number of bytes read into `destination`.
*/
private static int fillToLengthOrStreamEnd(InputStream source, byte[] destination, int length) {
int bytesRead = 0;
while (bytesRead < length) {
int bytesToRead = length - bytesRead;
int bytesReadThisIteration;
try {
bytesReadThisIteration = source.read(destination, bytesRead, bytesToRead);
} catch (EOFException e) {
// Some InputStream implementations throw EOFException in certain cases to convey
// that the end of the stream has been reached.
break;
} catch (IOException e) {
throw new IonException(e);
}
if (bytesReadThisIteration < 0) { // This indicates the end of the stream.
break;
}
bytesRead += bytesReadThisIteration;
}
return bytesRead;
}

static IonReader buildReader(
_Private_IonReaderBuilder builder,
InputStream source,
IonReaderFromInputStreamFactoryBinary binary,
IonReaderFromInputStreamFactoryText text
IonReaderFromInputStreamFactoryText text,
List<InputStreamInterceptor> inputStreamInterceptors
) {
if (source == null) {
throw new NullPointerException("Cannot build a reader from a null InputStream.");
}
int maxHeaderLength = Math.max(
_Private_IonConstants.BINARY_VERSION_MARKER_SIZE,
inputStreamInterceptors.stream().mapToInt(InputStreamInterceptor::numberOfBytesNeededToDetermineMatch).max().orElse(0)
);
validateHeaderLength(maxHeaderLength);
// Note: this can create a lot of layers of InputStream wrappers. For example, if this method is called
// from build(byte[]) and the bytes contain GZIP, the chain will be SequenceInputStream(ByteArrayInputStream,
// GZIPInputStream -> PushbackInputStream -> ByteArrayInputStream). If this creates a drag on efficiency,
// alternatives should be evaluated.
byte[] possibleIVM = new byte[_Private_IonConstants.BINARY_VERSION_MARKER_SIZE];
byte[] possibleIVM = new byte[maxHeaderLength];
InputStream ionData = source;
int bytesRead;
try {
bytesRead = ionData.read(possibleIVM);
} catch (IOException e) {
throw new IonException(e);
}
int bytesRead = fillToLengthOrStreamEnd(ionData, possibleIVM, maxHeaderLength);
// If the input stream is growing, it is possible that fewer than BINARY_VERSION_MARKER_SIZE bytes are
// available yet. Simply check whether the stream *could* contain binary Ion based on the available bytes.
// If it can't, fall back to text.
Expand All @@ -296,27 +348,28 @@ static IonReader buildReader(
// stream will always be empty (in which case it doesn't matter whether a text or binary reader is used)
// or it's a binary stream (in which case the correct reader was created) or it's a growing text stream
// (which has always been unsupported).
if (startsWithGzipHeader(possibleIVM, bytesRead)) {
try {
ionData = new GZIPInputStream(
new TwoElementSequenceInputStream(new ByteArrayInputStream(possibleIVM, 0, bytesRead), ionData)
);
for (InputStreamInterceptor streamInterceptor : inputStreamInterceptors) {
if (bytesRead < streamInterceptor.numberOfBytesNeededToDetermineMatch()) {
continue;
}
if (streamInterceptor.isMatch(possibleIVM, 0, bytesRead)) {
try {
bytesRead = ionData.read(possibleIVM);
} catch (EOFException e) {
// Only a GZIP header was available, so this may be a binary Ion stream.
bytesRead = 0;
ionData = streamInterceptor.newInputStream(
new TwoElementInputStream(new ByteArrayInputStream(possibleIVM, 0, bytesRead), ionData)
);
} catch (IOException e) {
throw new IonException(e);
}
} catch (IOException e) {
throw new IonException(e);
bytesRead = fillToLengthOrStreamEnd(ionData, possibleIVM, _Private_IonConstants.BINARY_VERSION_MARKER_SIZE);
break;
}
}
if (startsWithIvm(possibleIVM, bytesRead)) {
return binary.makeReader(builder, ionData, possibleIVM, 0, bytesRead);
}
InputStream wrapper;
if (bytesRead > 0) {
wrapper = new TwoElementSequenceInputStream(
wrapper = new TwoElementInputStream(
new ByteArrayInputStream(possibleIVM, 0, bytesRead),
ionData
);
Expand All @@ -333,7 +386,8 @@ public IonReader build(InputStream source)
this,
source,
_Private_IonReaderFactory::makeReaderBinary,
_Private_IonReaderFactory::makeReaderText
_Private_IonReaderFactory::makeReaderText,
getInputStreamInterceptors()
);
}

Expand Down
23 changes: 6 additions & 17 deletions src/main/java/com/amazon/ion/impl/_Private_IonReaderFactory.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,5 @@
/*
* Copyright 2007-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package com.amazon.ion.impl;

import static com.amazon.ion.impl.UnifiedInputStreamX.makeStream;
Expand Down Expand Up @@ -105,11 +92,13 @@ public static final IonReader makeReaderText(IonCatalog catalog,

public static IonReader makeSystemReaderText(InputStream is)
{
_Private_IonReaderBuilder builder = (_Private_IonReaderBuilder) _Private_IonReaderBuilder.standard();
return _Private_IonReaderBuilder.buildReader(
(_Private_IonReaderBuilder) _Private_IonReaderBuilder.standard(),
builder,
is,
_Private_IonReaderFactory::makeSystemReaderBinary,
_Private_IonReaderFactory::makeSystemReaderText
_Private_IonReaderFactory::makeSystemReaderText,
builder.getInputStreamInterceptors()
);
}

Expand Down
Loading

0 comments on commit 4ccee4d

Please sign in to comment.