Skip to content

Commit a0e78bc

Browse files
authored
Reduce Buffering in SDKs (Azure#26821)
Reduce Buffering in SDKs
1 parent 2c6c8ed commit a0e78bc

File tree

24 files changed

+320
-96
lines changed

24 files changed

+320
-96
lines changed

eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@
396396
<Class name="com.azure.core.util.FluxUtil"/>
397397
<Class name="com.azure.core.util.FluxUtil$1"/>
398398
<Class name="com.azure.core.implementation.FileWriteSubscriber"/>
399+
<Class name="com.azure.core.implementation.OutputStreamWriteSubscriber"/>
399400
</Or>
400401
<Bug pattern="UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"/>
401402
</Match>

eng/versioning/version_client.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,8 @@ com.azure.tools:azure-sdk-archetype;1.0.0;1.0.0
345345
# In the pom, the version update tag after the version should name the unreleased package and the dependency version:
346346
# <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
347347

348+
unreleased_com.azure:azure-core;1.26.0-beta.1
349+
348350
# Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current
349351
# version and set the version to the released beta. Released beta dependencies are only valid
350352
# for dependency versions. These entries are specifically for when we've released a beta for

sdk/communication/azure-communication-callingserver/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
<dependency>
5454
<groupId>com.azure</groupId>
5555
<artifactId>azure-core</artifactId>
56-
<version>1.25.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
56+
<version>1.26.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
5757
</dependency>
5858
<dependency>
5959
<groupId>com.azure</groupId>

sdk/communication/azure-communication-callingserver/src/main/java/com/azure/communication/callingserver/ContentDownloader.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import com.azure.core.util.Context;
1616
import com.azure.core.util.FluxUtil;
1717
import com.azure.core.util.logging.ClientLogger;
18-
import reactor.core.Exceptions;
1918
import reactor.core.publisher.Flux;
2019
import reactor.core.publisher.Mono;
2120
import reactor.core.publisher.SignalType;
@@ -54,15 +53,9 @@ Mono<Response<Void>> downloadToStreamWithResponse(
5453
HttpRange httpRange,
5554
Context context) {
5655
return downloadStreamWithResponse(sourceEndpoint, httpRange, context)
57-
.flatMap(response -> response.getValue().reduce(destinationStream, (outputStream, buffer) -> {
58-
try {
59-
outputStream.write(FluxUtil.byteBufferToArray(buffer));
60-
return outputStream;
61-
} catch (IOException ex) {
62-
throw logger.logExceptionAsError(Exceptions.propagate(new UncheckedIOException(ex)));
63-
}
64-
}).thenReturn(new SimpleResponse<>(response.getRequest(), response.getStatusCode(),
65-
response.getHeaders(), null)));
56+
.flatMap(response -> FluxUtil.writeToOutputStream(response.getValue(), destinationStream)
57+
.thenReturn(new SimpleResponse<>(response.getRequest(), response.getStatusCode(),
58+
response.getHeaders(), null)));
6659
}
6760

6861
Mono<Response<Flux<ByteBuffer>>> downloadStreamWithResponse(

sdk/communication/azure-communication-callingserver/src/test/java/com/azure/communication/callingserver/DownloadContentLiveTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import java.io.ByteArrayOutputStream;
1515
import java.io.IOException;
16+
import java.io.OutputStream;
1617
import java.io.UncheckedIOException;
1718
import java.io.UnsupportedEncodingException;
1819
import java.nio.charset.StandardCharsets;
@@ -130,12 +131,12 @@ public void downloadContentStreamFailure(HttpClient httpClient) throws IOExcepti
130131
CallingServerClientBuilder builder = getCallingServerClientUsingConnectionString(httpClient);
131132
CallingServerClient conversationClient = setupClient(builder, "downloadContent404");
132133

133-
ByteArrayOutputStream byteArrayOutputStream = Mockito.mock(ByteArrayOutputStream.class);
134-
doThrow(IOException.class).when(byteArrayOutputStream).write(Mockito.any());
134+
OutputStream outputStream = Mockito.mock(OutputStream.class);
135+
doThrow(IOException.class).when(outputStream).write(Mockito.any(), Mockito.anyInt(), Mockito.anyInt());
135136
assertThrows(
136137
UncheckedIOException.class,
137138
() -> conversationClient
138-
.downloadTo(METADATA_URL, byteArrayOutputStream, null));
139+
.downloadTo(METADATA_URL, outputStream, null));
139140
}
140141

141142
private CallingServerClient setupClient(CallingServerClientBuilder builder, String testName) {

sdk/core/azure-core/src/main/java/com/azure/core/implementation/ImplUtils.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,12 @@
66
import com.azure.core.http.HttpHeaders;
77
import com.azure.core.util.CoreUtils;
88
import com.azure.core.util.DateTimeRfc1123;
9+
import com.azure.core.util.FluxUtil;
910

11+
import java.io.FileOutputStream;
12+
import java.io.IOException;
13+
import java.io.OutputStream;
14+
import java.nio.ByteBuffer;
1015
import java.time.DateTimeException;
1116
import java.time.Duration;
1217
import java.time.OffsetDateTime;
@@ -94,6 +99,47 @@ private static long tryParseLong(String value) {
9499
}
95100
}
96101

102+
/**
103+
* Writes a {@link ByteBuffer} into an {@link OutputStream}.
104+
* <p>
105+
* This method provides writing optimization based on the type of {@link ByteBuffer} and {@link OutputStream}
106+
* passed. For example, if the {@link ByteBuffer} has a backing {@code byte[]} this method will access that directly
107+
* to write to the {@code stream} instead of buffering the contents of the {@link ByteBuffer} into a temporary
108+
* buffer.
109+
*
110+
* @param buffer The {@link ByteBuffer} to write into the {@code stream}.
111+
* @param stream The {@link OutputStream} where the {@code buffer} will be written.
112+
* @throws IOException If an I/O occurs while writing the {@code buffer} into the {@code stream}.
113+
*/
114+
public static void writeByteBufferToStream(ByteBuffer buffer, OutputStream stream) throws IOException {
115+
// First check if the buffer has a backing byte[]. The backing byte[] can be accessed directly and written
116+
// without an additional buffering byte[].
117+
if (buffer.hasArray()) {
118+
// Write the byte[] from the current view position to the length remaining in the view.
119+
stream.write(buffer.array(), buffer.position(), buffer.remaining());
120+
121+
// Update the position of the ByteBuffer to treat this the same as getting from the buffer.
122+
buffer.position(buffer.position() + buffer.remaining());
123+
return;
124+
}
125+
126+
// Next begin checking for specific instances of OutputStream that may provide better writing options for
127+
// direct ByteBuffers.
128+
if (stream instanceof FileOutputStream) {
129+
FileOutputStream fileOutputStream = (FileOutputStream) stream;
130+
131+
// Writing to the FileChannel directly may provide native optimizations for moving the OS managed memory
132+
// into the file.
133+
// Write will move both the OutputStream's and ByteBuffer's position so there is no need to perform
134+
// additional updates that are required when using the backing array.
135+
fileOutputStream.getChannel().write(buffer);
136+
return;
137+
}
138+
139+
// All optimizations have been exhausted, fallback to buffering write.
140+
stream.write(FluxUtil.byteBufferToArray(buffer));
141+
}
142+
97143
private ImplUtils() {
98144
}
99145
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.azure.core.implementation;
4+
5+
import com.azure.core.util.logging.ClientLogger;
6+
import org.reactivestreams.Subscriber;
7+
import org.reactivestreams.Subscription;
8+
import reactor.core.publisher.MonoSink;
9+
import reactor.core.publisher.Operators;
10+
11+
import java.io.IOException;
12+
import java.io.OutputStream;
13+
import java.io.UncheckedIOException;
14+
import java.nio.ByteBuffer;
15+
16+
/**
17+
* Subscriber that writes a stream of {@link ByteBuffer ByteBuffers} to an {@link OutputStream}.
18+
*/
19+
@SuppressWarnings("ReactiveStreamsSubscriberImplementation")
20+
public final class OutputStreamWriteSubscriber implements Subscriber<ByteBuffer> {
21+
private final MonoSink<Void> emitter;
22+
private final OutputStream stream;
23+
private final ClientLogger logger;
24+
25+
private Subscription subscription;
26+
27+
public OutputStreamWriteSubscriber(MonoSink<Void> emitter, OutputStream stream, ClientLogger logger) {
28+
this.emitter = emitter;
29+
this.stream = stream;
30+
this.logger = logger;
31+
}
32+
33+
@Override
34+
public void onSubscribe(Subscription s) {
35+
// Only set the Subscription if one has not been previously set.
36+
// Any additional Subscriptions will be cancelled.
37+
if (Operators.validate(this.subscription, s)) {
38+
subscription = s;
39+
40+
s.request(1);
41+
}
42+
}
43+
44+
@Override
45+
public void onNext(ByteBuffer byteBuffer) {
46+
try {
47+
ImplUtils.writeByteBufferToStream(byteBuffer, stream);
48+
subscription.request(1);
49+
} catch (IOException ex) {
50+
// Emit IOExceptions as UncheckIOExceptions as that is the pattern used in SDKs.
51+
onError(new UncheckedIOException(ex));
52+
}
53+
}
54+
55+
@Override
56+
public void onError(Throwable throwable) {
57+
subscription.cancel();
58+
emitter.error(logger.logThrowableAsError(throwable));
59+
}
60+
61+
@Override
62+
public void onComplete() {
63+
emitter.success();
64+
}
65+
}

sdk/core/azure-core/src/main/java/com/azure/core/util/FluxUtil.java

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.azure.core.http.rest.Response;
99
import com.azure.core.implementation.ByteBufferCollector;
1010
import com.azure.core.implementation.FileWriteSubscriber;
11+
import com.azure.core.implementation.OutputStreamWriteSubscriber;
1112
import com.azure.core.implementation.RetriableDownloadFlux;
1213
import com.azure.core.implementation.TypeUtil;
1314
import com.azure.core.util.logging.ClientLogger;
@@ -21,12 +22,15 @@
2122
import reactor.core.publisher.Operators;
2223
import reactor.util.context.ContextView;
2324

25+
import java.io.FileInputStream;
2426
import java.io.IOException;
2527
import java.io.InputStream;
28+
import java.io.OutputStream;
2629
import java.lang.reflect.Type;
2730
import java.nio.ByteBuffer;
2831
import java.nio.channels.AsynchronousFileChannel;
2932
import java.nio.channels.CompletionHandler;
33+
import java.nio.channels.FileChannel;
3034
import java.nio.file.StandardOpenOption;
3135
import java.util.Collections;
3236
import java.util.Map;
@@ -42,6 +46,7 @@
4246
*/
4347
public final class FluxUtil {
4448
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
49+
private static final ClientLogger LOGGER = new ClientLogger(FluxUtil.class);
4550

4651
/**
4752
* Checks if a type is Flux&lt;ByteBuffer&gt;.
@@ -203,6 +208,40 @@ public static Flux<ByteBuffer> toFluxByteBuffer(InputStream inputStream, int chu
203208
return Flux.empty();
204209
}
205210

211+
// If the InputStream is an instance of FileInputStream we should be able to leverage the FileChannel backing
212+
// the FileInputStream to generated MappedByteBuffers which aren't loaded into memory until the content is
213+
// consumed. This at least defers the memory usage until later and may also provide downstream calls ways to
214+
// optimize if they have special cases for MappedByteBuffer.
215+
if (inputStream instanceof FileInputStream) {
216+
FileChannel fileChannel = ((FileInputStream) inputStream).getChannel();
217+
218+
return Flux.<ByteBuffer, FileChannel>generate(() -> fileChannel, (channel, sink) -> {
219+
try {
220+
long channelPosition = channel.position();
221+
long channelSize = channel.size();
222+
223+
if (channelPosition == channelSize) {
224+
// End of File has been reached, signal completion.
225+
channel.close();
226+
sink.complete();
227+
} else {
228+
// Determine the size of the next MappedByteBuffer, either the remaining File contents or the
229+
// expected chunk size.
230+
int nextByteBufferSize = (int) Math.min(chunkSize, channelSize - channelPosition);
231+
sink.next(channel.map(FileChannel.MapMode.READ_ONLY, channelPosition, nextByteBufferSize));
232+
233+
// FileChannel.map doesn't update the FileChannel's position as reading would, so the position
234+
// needs to be updated based on the number of bytes mapped.
235+
channel.position(channelPosition + nextByteBufferSize);
236+
}
237+
} catch (IOException ex) {
238+
sink.error(ex);
239+
}
240+
241+
return channel;
242+
});
243+
}
244+
206245
return Flux.<ByteBuffer, InputStream>generate(() -> inputStream, (stream, sink) -> {
207246
byte[] buffer = new byte[chunkSize];
208247

@@ -420,6 +459,32 @@ public static reactor.util.context.Context toReactorContext(Context context) {
420459
return returnContext;
421460
}
422461

462+
/**
463+
* Writes the {@link ByteBuffer ByteBuffers} emitted by a {@link Flux} of {@link ByteBuffer} to an {@link
464+
* OutputStream}.
465+
* <p>
466+
* The {@code stream} is not closed by this call, closing of the {@code stream} is managed by the caller.
467+
* <p>
468+
* The response {@link Mono} will emit an error if {@code content} or {@code stream} are null. Additionally, an
469+
* error will be emitted if an exception occurs while writing the {@code content} to the {@code stream}.
470+
*
471+
* @param content The {@link Flux} of {@link ByteBuffer} content.
472+
* @param stream The {@link OutputStream} being written into.
473+
* @return A {@link Mono} which emits a completion status once the {@link Flux} has been written to the {@link
474+
* OutputStream}, or an error status if writing fails.
475+
*/
476+
public static Mono<Void> writeToOutputStream(Flux<ByteBuffer> content, OutputStream stream) {
477+
if (content == null && stream == null) {
478+
return monoError(LOGGER, new NullPointerException("'content' and 'stream' cannot be null."));
479+
} else if (content == null) {
480+
return monoError(LOGGER, new NullPointerException("'content' cannot be null."));
481+
} else if (stream == null) {
482+
return monoError(LOGGER, new NullPointerException("'stream' cannot be null."));
483+
}
484+
485+
return Mono.create(emitter -> content.subscribe(new OutputStreamWriteSubscriber(emitter, stream, LOGGER)));
486+
}
487+
423488
/**
424489
* Writes the {@link ByteBuffer ByteBuffers} emitted by a {@link Flux} of {@link ByteBuffer} to an {@link
425490
* AsynchronousFileChannel}.
@@ -456,24 +521,17 @@ public static Mono<Void> writeFile(Flux<ByteBuffer> content, AsynchronousFileCha
456521
* AsynchronousFileChannel}.
457522
*/
458523
public static Mono<Void> writeFile(Flux<ByteBuffer> content, AsynchronousFileChannel outFile, long position) {
459-
return Mono.create(emitter -> {
460-
if (content == null) {
461-
emitter.error(new NullPointerException("'content' cannot be null."));
462-
return;
463-
}
464-
465-
if (outFile == null) {
466-
emitter.error(new NullPointerException("'outFile' cannot be null."));
467-
return;
468-
}
469-
470-
if (position < 0) {
471-
emitter.error(new IllegalArgumentException("'position' cannot be less than 0."));
472-
return;
473-
}
524+
if (content == null && outFile == null) {
525+
return monoError(LOGGER, new NullPointerException("'content' and 'outFile' cannot be null."));
526+
} else if (content == null) {
527+
return monoError(LOGGER, new NullPointerException("'content' cannot be null."));
528+
} else if (outFile == null) {
529+
return monoError(LOGGER, new NullPointerException("'outFile' cannot be null."));
530+
} else if (position < 0) {
531+
return monoError(LOGGER, new IllegalArgumentException("'position' cannot be less than 0."));
532+
}
474533

475-
content.subscribe(new FileWriteSubscriber(outFile, position, emitter));
476-
});
534+
return Mono.create(emitter -> content.subscribe(new FileWriteSubscriber(outFile, position, emitter)));
477535
}
478536

479537
/**

0 commit comments

Comments
 (0)