Skip to content

Commit 596d3e5

Browse files
authored
Fix AzureSeekableByteChannel bugs related to buffer/channel boundaries (Azure#24723)
* AzureSeekableByteChannel - fix off-by-one & return -1 (EOF) when position() >= size() * AzureSeekableByteChannel - read() must respect ByteBuffer.position() not assume 0 * AzureSeekableByteChannel - write() must respect ByteBuffer.position() and .limit() * AzureSeekableByteChannel - fix test Groovy code compile failure on Java 8 * AzureSeekableByteChannel - attempt to fix Groovy test MissingMethodException Error reported only by Azure Pipelines `windows2019_117_surefiretest` build, was: [ERROR] Read respect dest buffer pos Time elapsed: 0.019 s <<< ERROR! groovy.lang.MissingMethodException: No signature of method: [B.clone() is applicable for argument types: () values: [] Possible solutions: count(java.lang.Object), collect(), collect(groovy.lang.Closure), collect(java.util.Collection, groovy.lang.Closure), toSet(), size() at com.azure.storage.blob.nio.AzureSeekableByteChannelTest.Read respect dest buffer pos(AzureSeekableByteChannelTest.groovy:101) Line 101 was: def destArray = randArray.clone() Other build checks using other platforms/versions pass, and cannot reproduce locally. Changed to an explicit `new` followed by an `arraycopy`. * AzureSeekableByteChannel - Added comments for requested clarification
1 parent 111df7e commit 596d3e5

File tree

6 files changed

+1252
-27
lines changed

6 files changed

+1252
-27
lines changed

sdk/storage/azure-storage-blob-nio/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,12 @@
131131
<version>1.7.4</version> <!-- {x-version-update;com.azure:azure-core-http-okhttp;dependency} -->
132132
<scope>test</scope>
133133
</dependency>
134+
<dependency>
135+
<groupId>org.mockito</groupId>
136+
<artifactId>mockito-core</artifactId>
137+
<version>3.12.4</version> <!-- {x-version-update;org.mockito:mockito-core;external_dependency} -->
138+
<scope>test</scope>
139+
</dependency>
134140
</dependencies>
135141

136142
<build>

sdk/storage/azure-storage-blob-nio/src/main/java/com/azure/storage/blob/nio/AzureSeekableByteChannel.java

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -63,42 +63,49 @@ public int read(ByteBuffer dst) throws IOException {
6363
validateOpen();
6464
validateReadMode();
6565

66-
// See comments in position()
67-
if (this.position > this.size()) {
68-
return -1;
66+
// See comments in position(), remember that position is 0-based and size() is exclusive
67+
if (this.position >= this.size()) {
68+
return -1; // at or past EOF
6969
}
7070

71-
int count = 0;
72-
73-
int len = dst.remaining();
74-
byte[] buf;
75-
7671
// If the buffer is backed by an array, we can write directly to that instead of allocating new memory.
72+
int pos;
73+
final int limit;
74+
final byte[] buf;
7775
if (dst.hasArray()) {
76+
// ByteBuffer has a position and limit that define the bounds of the writeable area, and that
77+
// area can be both smaller than the backing array and might not begin at array index 0.
78+
pos = dst.position();
79+
limit = pos + dst.remaining();
7880
buf = dst.array();
7981
} else {
80-
buf = new byte[len];
82+
pos = 0;
83+
limit = dst.remaining();
84+
buf = new byte[limit];
8185
}
8286

83-
while (count < len) {
84-
int retCount = this.reader.read(buf, count, len - count);
85-
if (retCount == -1) {
87+
while (pos < limit) {
88+
int byteCount = this.reader.read(buf, pos, limit - pos);
89+
if (byteCount == -1) {
8690
break;
8791
}
88-
count += retCount;
92+
pos += byteCount;
8993
}
9094

9195
/*
9296
Either write to the destination if we had to buffer separately or just set the position correctly if we wrote
9397
underneath the buffer
9498
*/
95-
if (!dst.hasArray()) {
96-
dst.put(buf, 0, count);
99+
int count;
100+
if (dst.hasArray()) {
101+
count = pos - dst.position();
102+
dst.position(pos);
97103
} else {
98-
dst.position(dst.position() + count); //
104+
count = pos; // original position was 0
105+
dst.put(buf, 0, count);
99106
}
100-
this.position += count;
101107

108+
this.position += count;
102109
return count;
103110
}
104111

@@ -108,25 +115,30 @@ public int write(ByteBuffer src) throws IOException {
108115
validateOpen();
109116
validateWriteMode();
110117

111-
int length = src.remaining();
112-
113-
this.position += src.remaining();
118+
final int length = src.remaining();
119+
this.position += length;
114120

115121
/*
116122
If the buffer is backed by an array, we can read directly from that instead of allocating new memory.
117-
118123
Set the position correctly if we read from underneath the buffer
119124
*/
125+
int pos;
120126
byte[] buf;
121127
if (src.hasArray()) {
128+
// ByteBuffer has a position and limit that define the bounds of the readable area, and that
129+
// area can be both smaller than the backing array and might not begin at array index 0.
130+
pos = src.position();
122131
buf = src.array();
123-
src.position(src.position() + length);
132+
src.position(pos + length);
124133
} else {
134+
pos = 0;
125135
buf = new byte[length];
126-
src.get(buf);
136+
src.get(buf); // advances src.position()
127137
}
128-
this.writer.write(buf);
129-
138+
// Either way, the src.position() and this.position have been updated before we know if this write
139+
// will succeed. (Original behavior.) It may be better to update position(s) only *after* success,
140+
// but then on IOException would we know if there was a partial write, and if so how much?
141+
this.writer.write(buf, pos, length);
130142
return length;
131143
}
132144

@@ -209,6 +221,10 @@ public void close() throws IOException {
209221
this.closed = true;
210222
}
211223

224+
Path getPath() {
225+
return this.path;
226+
}
227+
212228
private void validateOpen() throws ClosedChannelException {
213229
if (this.closed) {
214230
throw LoggingUtility.logError(logger, new ClosedChannelException());

sdk/storage/azure-storage-blob-nio/src/test/java/com/azure/storage/blob/nio/AzureSeekableByteChannelTest.groovy

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
package com.azure.storage.blob.nio;
55

66
import com.azure.storage.blob.BlobClient
7+
import com.azure.storage.blob.specialized.BlobOutputStream
8+
import org.mockito.Answers
9+
import org.mockito.Mockito
710
import spock.lang.Unroll
811

912
import java.nio.ByteBuffer
@@ -63,6 +66,68 @@ class AzureSeekableByteChannelTest extends APISpec {
6366
os.toByteArray() == fileContent
6467
}
6568

69+
def "Read loop until EOF"() {
70+
setup:
71+
def fileContent = new byte[sourceFileSize]
72+
fileStream.read(fileContent)
73+
def os = new ByteArrayOutputStream(sourceFileSize)
74+
def rand = new Random()
75+
long timeLimit = System.currentTimeMillis() + 60_000 // fail if test runs >= 1 minute
76+
77+
when:
78+
while (System.currentTimeMillis() < timeLimit) { // ensures test duration is bounded
79+
def buffer = ByteBuffer.allocate(rand.nextInt(1024 * 1024))
80+
int readAmount = readByteChannel.read(buffer)
81+
if (readAmount == -1) {
82+
break; // reached EOF
83+
}
84+
os.write(buffer.array(), 0, readAmount) // limit the write in case we allocated more than we needed
85+
}
86+
87+
then:
88+
os.toByteArray() == fileContent
89+
System.currentTimeMillis() < timeLimit // else potential inf. loop if read() always returns 0
90+
}
91+
92+
def "Read respect dest buffer pos"() {
93+
setup:
94+
def fileContent = new byte[sourceFileSize]
95+
fileStream.read(fileContent)
96+
97+
def rand = new Random()
98+
int initialOffset = rand.nextInt(512) + 1 // always > 0
99+
byte[] randArray = new byte[2 * initialOffset + sourceFileSize]
100+
rand.nextBytes(randArray) // fill with random bytes
101+
102+
// copy same random bytes, but in this copy some will eventually be overwritten by read()
103+
byte[] destArray = new byte[randArray.length]
104+
System.arraycopy(randArray, 0, destArray, 0, randArray.length)
105+
def dest = ByteBuffer.wrap(destArray)
106+
dest.position(initialOffset) // will have capacity on either side that should not be touched
107+
108+
when:
109+
int readAmount = 0;
110+
while (readAmount != -1) {
111+
assert dest.position() != 0
112+
readAmount = readByteChannel.read(dest) // backed by an array, but position != 0
113+
}
114+
115+
then:
116+
dest.position() == initialOffset + sourceFileSize
117+
compareInputStreams( // destination content should match file content at initial read position
118+
new ByteArrayInputStream(destArray, initialOffset, sourceFileSize),
119+
new ByteArrayInputStream(fileContent),
120+
sourceFileSize)
121+
compareInputStreams( // destination content should be untouched prior to initial position
122+
new ByteArrayInputStream(destArray, 0, initialOffset),
123+
new ByteArrayInputStream(randArray, 0, initialOffset),
124+
initialOffset)
125+
compareInputStreams( // destination content should be untouched past end of read
126+
new ByteArrayInputStream(destArray, initialOffset + sourceFileSize, initialOffset),
127+
new ByteArrayInputStream(randArray, initialOffset + sourceFileSize, initialOffset),
128+
initialOffset)
129+
}
130+
66131
def "Read fs close"() {
67132
when:
68133
fs.close()
@@ -96,6 +161,61 @@ class AzureSeekableByteChannelTest extends APISpec {
96161
compareInputStreams(writeBc.openInputStream(), new ByteArrayInputStream(fileContent), sourceFileSize)
97162
}
98163

164+
def "Write respect src buffer pos"() {
165+
setup:
166+
def rand = new Random()
167+
int initialOffset = rand.nextInt(512) + 1 // always > 0
168+
def srcBufferContent = new byte[2 * initialOffset + sourceFileSize]
169+
rand.nextBytes(srcBufferContent) // fill with random bytes
170+
171+
def fileContent = new byte[sourceFileSize]
172+
fileStream.read(fileContent)
173+
174+
// place expected file content into source buffer at random location, retain other random bytes
175+
System.arraycopy(fileContent, 0, srcBufferContent, initialOffset, sourceFileSize)
176+
def srcBuffer = ByteBuffer.wrap(srcBufferContent)
177+
srcBuffer.position(initialOffset)
178+
srcBuffer.limit(initialOffset + sourceFileSize)
179+
180+
// This test aims to observe the actual bytes written by the ByteChannel to the underlying OutputStream,
181+
// not just the number of bytes allegedly written as reported by its position. It would prefer to examine
182+
// the OutputStream directly, but the channel requires the specific NioBlobOutputStream implementation
183+
// and does not accept something generic like a ByteArrayOutputStream. NioBlobOutputStream is final, so
184+
// it cannot be subclassed or mocked and has little state of its own -- writes go to a BlobOutputStream.
185+
// That class is abstract, but its constructor is not accessible outside its package and cannot normally
186+
// be subclassed to provide custom behavior, but a runtime mocking framework like Mockito can. This is
187+
// the nearest accessible observation point, so the test mocks a BlobOutputStream such that all write
188+
// methods store data in ByteArrayOutputStream which it can later examine for its size and content.
189+
def actualOutput = new ByteArrayOutputStream(sourceFileSize)
190+
def blobOutputStream = Mockito.mock(
191+
BlobOutputStream.class, Mockito.withSettings().useConstructor(4096 /* block size */))
192+
Mockito.doAnswer( { invoked -> actualOutput.write(invoked.getArgument(0)) } )
193+
.when(blobOutputStream).write(Mockito.anyInt())
194+
Mockito.doAnswer( { invoked -> actualOutput.writeBytes(invoked.getArgument(0)) } )
195+
.when(blobOutputStream).write(Mockito.any(byte[].class))
196+
Mockito.doAnswer( { invoked -> actualOutput.write(
197+
invoked.getArgument(0), invoked.getArgument(1), invoked.getArgument(2)) } )
198+
.when(blobOutputStream).write(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt())
199+
def path = writeByteChannel.getPath()
200+
writeByteChannel = new AzureSeekableByteChannel(new NioBlobOutputStream(blobOutputStream, path), path)
201+
202+
when:
203+
int written = 0
204+
while (written < sourceFileSize) {
205+
written += writeByteChannel.write(srcBuffer)
206+
}
207+
writeByteChannel.close()
208+
209+
then:
210+
srcBuffer.position() == initialOffset + sourceFileSize // src buffer position SHOULD be updated
211+
srcBuffer.limit() == srcBuffer.position() // limit SHOULD be unchanged (still at end of content)
212+
// the above report back to the caller, but this verifies the correct bytes are going to the blob:
213+
compareInputStreams(
214+
new ByteArrayInputStream(actualOutput.toByteArray()),
215+
new ByteArrayInputStream(fileContent),
216+
sourceFileSize)
217+
}
218+
99219
def "Write fs close"() {
100220
when:
101221
fs.close()
@@ -219,10 +339,10 @@ class AzureSeekableByteChannelTest extends APISpec {
219339
thrown(IllegalArgumentException)
220340

221341
when:
222-
readByteChannel.position(sourceFileSize + 1)
342+
readByteChannel.position(sourceFileSize) // position is 0-based, so seeking to size --> EOF
223343

224344
then:
225-
readByteChannel.read(ByteBuffer.allocate(1)) == -1 // Seeking past the end and then reading should indicate EOF
345+
readByteChannel.read(ByteBuffer.allocate(1)) == -1 // Seeking to the end and then reading should indicate EOF
226346
}
227347

228348
def "Seek fs close"() {

0 commit comments

Comments
 (0)