From 1b3e7ed08f8e4217dbbc451a18d870694ba2e2aa Mon Sep 17 00:00:00 2001 From: skashin Date: Fri, 22 Feb 2019 11:44:25 -0500 Subject: [PATCH] - Fixed a bug in the method getBytesInBufferAvailable() - Added explicit abort() calls on the S3ObjectInputStream, to avoid WARN messages "S3AbortableInputStream - Not all bytes were read from the S3ObjectInputStream..." --- .../s3fs/S3ReadOnlySeekableByteChannel.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/upplication/s3fs/S3ReadOnlySeekableByteChannel.java b/src/main/java/com/upplication/s3fs/S3ReadOnlySeekableByteChannel.java index 149145a..190c4d4 100644 --- a/src/main/java/com/upplication/s3fs/S3ReadOnlySeekableByteChannel.java +++ b/src/main/java/com/upplication/s3fs/S3ReadOnlySeekableByteChannel.java @@ -2,6 +2,7 @@ import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; @@ -27,7 +28,7 @@ public class S3ReadOnlySeekableByteChannel implements SeekableByteChannel { private S3Path path; private Set options; private long length; - private ExtBufferedInputStream bufferedStream; + private ExtBufferedInputStream bufferedStream = null; private ReadableByteChannel rbc; private long position = 0; @@ -72,6 +73,9 @@ public S3ReadOnlySeekableByteChannel(S3Path path, Set opti } private void openStreamAt(long position) throws IOException { + if (bufferedStream != null) { + bufferedStream.getObjectStream().abort(); + } if (rbc != null) { rbc.close(); } @@ -143,18 +147,28 @@ public long size() { } public void close() throws IOException { + if (bufferedStream != null) { + bufferedStream.getObjectStream().abort(); + } rbc.close(); } private class ExtBufferedInputStream extends BufferedInputStream { - private ExtBufferedInputStream(final InputStream inputStream, final int size) { - super(inputStream, size); + S3ObjectInputStream objectStream; + + private ExtBufferedInputStream(final S3ObjectInputStream objectStream, final int size) { + super(objectStream, size); + + this.objectStream = objectStream; + } + + private S3ObjectInputStream getObjectStream() { + return this.objectStream ; } /** Returns the number of bytes that can be read from the buffer without reading more into the buffer. */ int getBytesInBufferAvailable() { - if (this.count == this.pos) return 0; - else return this.buf.length - this.pos; + return (this.count - this.pos); } } }