Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
c5ed15e
exploration
L-Applin Jul 22, 2025
067db30
hide cloudwatch logs for s3 regression tests
L-Applin Jul 23, 2025
41be0a6
Merge remote-tracking branch 'origin/master'
L-Applin Aug 5, 2025
51357ee
Multipart dowload implementation
L-Applin Aug 6, 2025
a9d25ec
Multipart dowload implementation
L-Applin Aug 11, 2025
b70ab8b
Merge remote-tracking branch 'origin/master'
L-Applin Aug 11, 2025
0a8f6c5
Merge remote-tracking branch 'origin/master'
L-Applin Aug 12, 2025
7c82e2b
Update non-linear download logic
L-Applin Aug 19, 2025
978f553
request max-in-flight at first request
L-Applin Aug 20, 2025
3e29b8c
remove stateful calls to FileAsyncResponseTransformer by creating it …
L-Applin Aug 21, 2025
c44a520
Wiremock test for s3AsyncClient with AsyncResponseTransformer.toFile
L-Applin Aug 22, 2025
418dcbd
added 10_000 parts tests
L-Applin Aug 22, 2025
db7d301
add missing assertions to errorOnMiddlePart_retryable
L-Applin Aug 22, 2025
2f65a8c
handle fileWriteOption
L-Applin Aug 25, 2025
0bfbdb2
prevent over-requesting and fix potential race condition in FileSubsc…
L-Applin Aug 27, 2025
0a20a9c
TM integration
L-Applin Sep 4, 2025
e1aebb5
fix tests
L-Applin Sep 4, 2025
3a0a0b1
debugging for long stop before completing future
L-Applin Sep 4, 2025
2f3a14e
added outstanding demand tracking, assert with checksum in integ test
L-Applin Sep 9, 2025
e49aadc
Merge remote-tracking branch 'origin/master'
L-Applin Sep 10, 2025
aa0caf1
Fix pendingTransformer polling
L-Applin Sep 11, 2025
29f899d
Make download method of TM work properly with progressUpdater
L-Applin Sep 11, 2025
0aa1529
remove logs
L-Applin Sep 12, 2025
426d82d
add ParallelConfiguration for maxInFlightParts
L-Applin Sep 16, 2025
5bdc142
javadoc and logs
L-Applin Sep 16, 2025
9551671
Merge remote-tracking branch 'origin/master'
L-Applin Sep 16, 2025
7e19d14
Merge remote-tracking branch 'origin/master' into feature/master/hagr…
L-Applin Sep 16, 2025
d37056d
Merge branch 'master' into feature/master/hagrid-multi
L-Applin Sep 16, 2025
474111b
cleanup and changelog
L-Applin Sep 16, 2025
9ad9c56
cleanup
L-Applin Sep 16, 2025
861d64d
fix test utils after merge
L-Applin Sep 16, 2025
c684d4c
remove comment code in test
L-Applin Sep 16, 2025
e34dcc3
checkstyle
L-Applin Sep 16, 2025
8506c43
rename initialPosition to position
L-Applin Sep 16, 2025
f3d3773
use firstMatchingHeader() instead of headers() in FileAsyncResponseTr…
L-Applin Sep 16, 2025
e1360f7
add handle error method
L-Applin Sep 16, 2025
d1db3c8
fix FileAsyncResponseTransformerPublisherTest wrong content-range met…
L-Applin Sep 17, 2025
50631f5
use succeedsWithin assertion for future
L-Applin Sep 17, 2025
8f36d18
fix checkstyle violations
L-Applin Sep 17, 2025
a1f8d23
fix checkstyle violations
L-Applin Sep 17, 2025
1716e46
PR comments - first pass
L-Applin Sep 18, 2025
289554e
PR comments - keep transformerCount in FileAsyncResponseTransformerPu…
L-Applin Sep 18, 2025
e025fb3
PR comments - keep transformerCount in FileAsyncResponseTransformerPu…
L-Applin Sep 18, 2025
fe7985f
rename supportsNonSerial to parallelSplitSupported, and other PR comm…
L-Applin Sep 18, 2025
af94ef1
Added file config options to FileAsyncResponseTransformerPublisherTest
L-Applin Sep 18, 2025
9902ec8
PR comments
L-Applin Sep 18, 2025
e6c824c
checkstyle
L-Applin Sep 19, 2025
13098d3
checkstyle
L-Applin Sep 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-S3-12a0b55.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "S3",
"contributor": "",
"description": "Add support for parallel download for individual part-get for multipart GetObject in s3 async client and Transfer Manager"
}
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,15 @@ interface SplitResult<ResponseT, ResultT>
*/
CompletableFuture<ResultT> resultFuture();

/**
* Indicates if the split async response transformer supports sending individual transformer non-serially, as well as
* receiving back data from the many {@link AsyncResponseTransformer#onStream(SdkPublisher) publishers} non-serially.
* @return true if non-serial data is supported, false otherwise
*/
default Boolean parallelSplitSupported() {
return false;
}

static <ResponseT, ResultT> Builder<ResponseT, ResultT> builder() {
return DefaultAsyncResponseTransformerSplitResult.builder();
}
Expand Down Expand Up @@ -413,6 +422,20 @@ interface Builder<ResponseT, ResultT>
* @return an instance of this Builder
*/
Builder<ResponseT, ResultT> resultFuture(CompletableFuture<ResultT> future);

/**
* If the AsyncResponseTransformers returned by the {@link SplitResult#publisher()} support concurrent
* parallel streaming of multiple content body concurrently.
* @return
*/
Boolean parallelSplitSupported();

/**
* Sets whether the AsyncResponseTransformers returned by the {@link SplitResult#publisher()} support concurrent
* parallel streaming of multiple content body concurrently
* @return
*/
Builder<ResponseT, ResultT> parallelSplitSupported(Boolean parallelSplitSupported);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.reactivestreams.Subscriber;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.utils.Logger;
Expand Down Expand Up @@ -108,6 +109,11 @@ public String name() {
return delegate.name();
}

@Override
public SplitResult<ResponseT, ResultT> split(SplittingTransformerConfiguration splitConfig) {
return delegate.split(splitConfig);
}

static void invoke(Runnable runnable, String callbackName) {
try {
runnable.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ public final class DefaultAsyncResponseTransformerSplitResult<ResponseT, ResultT

private final SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> publisher;
private final CompletableFuture<ResultT> future;
private final Boolean parallelSplitSupported;

private DefaultAsyncResponseTransformerSplitResult(Builder<ResponseT, ResultT> builder) {
this.publisher = Validate.paramNotNull(
builder.publisher(), "asyncResponseTransformerPublisher");
this.future = Validate.paramNotNull(
builder.resultFuture(), "future");
this.parallelSplitSupported = Validate.getOrDefault(builder.parallelSplitSupported(), () -> false);
}

/**
Expand All @@ -52,6 +54,11 @@ public CompletableFuture<ResultT> resultFuture() {
return this.future;
}

@Override
public Boolean parallelSplitSupported() {
return this.parallelSplitSupported;
}

@Override
public AsyncResponseTransformer.SplitResult.Builder<ResponseT, ResultT> toBuilder() {
return new DefaultBuilder<>(this);
Expand All @@ -65,13 +72,15 @@ public static class DefaultBuilder<ResponseT, ResultT>
implements AsyncResponseTransformer.SplitResult.Builder<ResponseT, ResultT> {
private SdkPublisher<AsyncResponseTransformer<ResponseT, ResponseT>> publisher;
private CompletableFuture<ResultT> future;
private Boolean parallelSplitSupported;

DefaultBuilder() {
}

DefaultBuilder(DefaultAsyncResponseTransformerSplitResult<ResponseT, ResultT> split) {
this.publisher = split.publisher;
this.future = split.future;
this.parallelSplitSupported = split.parallelSplitSupported;
}

@Override
Expand All @@ -92,14 +101,28 @@ public CompletableFuture<ResultT> resultFuture() {
}

@Override
public AsyncResponseTransformer.SplitResult.Builder<ResponseT, ResultT> resultFuture(CompletableFuture<ResultT> future) {
public AsyncResponseTransformer.SplitResult.Builder<ResponseT, ResultT> resultFuture(
CompletableFuture<ResultT> future) {
this.future = future;
return this;
}

@Override
public Boolean parallelSplitSupported() {
return parallelSplitSupported;
}

@Override
public AsyncResponseTransformer.SplitResult.Builder<ResponseT, ResultT> parallelSplitSupported(
Boolean parallelSplitSupported) {
this.parallelSplitSupported = parallelSplitSupported;
return this;
}

@Override
public AsyncResponseTransformer.SplitResult<ResponseT, ResultT> build() {
return new DefaultAsyncResponseTransformerSplitResult<>(this);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.ThreadSafe;
import software.amazon.awssdk.utils.Logger;

/**
* Subscription which can emit {@link Subscriber#onNext(T)} signals to a subscriber, based on the demand received with the
* {@link Subscription#request(long)}. It tracks the outstandingDemand that has not yet been fulfilled and used a Supplier
* passed to it to create the object it needs to emit.
* @param <T> the type of obejct to emit to the subscriber.
*/
@SdkInternalApi
@ThreadSafe
public final class EmittingSubscription<T> implements Subscription {

private Subscriber<? super T> downstreamSubscriber;
private final AtomicBoolean emitting = new AtomicBoolean(false);
private final AtomicLong outstandingDemand;
private final Runnable onCancel;
private final AtomicBoolean isCancelled;
private final Supplier<T> supplier;
private final Logger log;


private EmittingSubscription(Builder<T> builder) {
this.downstreamSubscriber = builder.downstreamSubscriber;
this.outstandingDemand = builder.outstandingDemand;
this.onCancel = builder.onCancel;
this.isCancelled = builder.isCancelled;
this.log = builder.log;
this.supplier = builder.supplier;
}

public static <T> Builder<T> builder() {
return new Builder<>();
}

@Override
public void request(long n) {
if (n <= 0) {
downstreamSubscriber.onError(new IllegalArgumentException("Amount requested must be positive"));
return;
}
long newDemand = outstandingDemand.updateAndGet(current -> {
if (Long.MAX_VALUE - current < n) {
return Long.MAX_VALUE;
}
return current + n;
});
log.trace(() -> String.format("new outstanding demand: %s", newDemand));
emit();
}

@Override
public void cancel() {
isCancelled.set(true);
downstreamSubscriber = null;
onCancel.run();
}

private void emit() {
do {
if (!emitting.compareAndSet(false, true)) {
return;
}
try {
if (doEmit()) {
return;
}
} finally {
emitting.compareAndSet(true, false);
}
} while (outstandingDemand.get() > 0);
}

private boolean doEmit() {
long demand = outstandingDemand.get();

while (demand > 0) {
if (isCancelled.get()) {
return true;
}
if (outstandingDemand.get() > 0) {
demand = outstandingDemand.decrementAndGet();
T value;
try {
value = supplier.get();
} catch (Exception e) {
downstreamSubscriber.onError(e);
return true;
}
downstreamSubscriber.onNext(value);
}
}
return false;
}

public static class Builder<T> {
private Subscriber<? super T> downstreamSubscriber;
private AtomicLong outstandingDemand = new AtomicLong(0);
private AtomicBoolean isCancelled = new AtomicBoolean(false);
private Logger log = Logger.loggerFor(EmittingSubscription.class);
private Runnable onCancel;
private Supplier<T> supplier;

public Builder<T> downstreamSubscriber(Subscriber<? super T> subscriber) {
this.downstreamSubscriber = subscriber;
return this;
}

public Builder<T> outstandingDemand(AtomicLong outstandingDemand) {
this.outstandingDemand = outstandingDemand;
return this;
}

public Builder<T> onCancel(Runnable onCancel) {
this.onCancel = onCancel;
return this;
}

public Builder<T> isCancelled(AtomicBoolean isCancelled) {
this.isCancelled = isCancelled;
return this;
}

public Builder<T> log(Logger log) {
this.log = log;
return this;
}

public Builder<T> supplier(Supplier<T> supplier) {
this.supplier = supplier;
return this;
}

public EmittingSubscription<T> build() {
return new EmittingSubscription<>(this);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.FileTransformerConfiguration;
import software.amazon.awssdk.core.FileTransformerConfiguration.FailureBehavior;
import software.amazon.awssdk.core.SplittingTransformerConfiguration;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.exception.SdkClientException;
Expand Down Expand Up @@ -76,6 +77,18 @@ private FileAsyncResponseTransformer(Path path, FileTransformerConfiguration fil
this.position = position;
}

FileTransformerConfiguration config() {
return configuration.toBuilder().build();
}

Path path() {
return path;
}

long position() {
return position;
}

private static long determineFilePositionToWrite(Path path, FileTransformerConfiguration fileConfiguration) {
if (fileConfiguration.fileWriteOption() == CREATE_OR_APPEND_TO_EXISTING) {
try {
Expand All @@ -89,7 +102,7 @@ private static long determineFilePositionToWrite(Path path, FileTransformerConfi
if (fileConfiguration.fileWriteOption() == WRITE_TO_POSITION) {
return Validate.getOrDefault(fileConfiguration.position(), () -> 0L);
}
return 0L;
return 0L;
}

private AsynchronousFileChannel createChannel(Path path) throws IOException {
Expand Down Expand Up @@ -183,6 +196,7 @@ static class FileSubscriber implements Subscriber<ByteBuffer> {
private final Path path;
private final CompletableFuture<Void> future;
private final Consumer<Throwable> onErrorMethod;
private final Object closeLock = new Object();

private volatile boolean writeInProgress = false;
private volatile boolean closeOnLastWrite = false;
Expand Down Expand Up @@ -228,7 +242,7 @@ public void completed(Integer result, ByteBuffer attachment) {
if (byteBuffer.hasRemaining()) {
performWrite(byteBuffer);
} else {
synchronized (FileSubscriber.this) {
synchronized (closeLock) {
writeInProgress = false;
if (closeOnLastWrite) {
close();
Expand Down Expand Up @@ -256,7 +270,7 @@ public void onError(Throwable t) {
public void onComplete() {
log.trace(() -> "onComplete");
// if write in progress, tell write to close on finish.
synchronized (this) {
synchronized (closeLock) {
if (writeInProgress) {
log.trace(() -> "writeInProgress = true, not closing");
closeOnLastWrite = true;
Expand Down Expand Up @@ -284,4 +298,18 @@ public String toString() {
return getClass() + ":" + path.toString();
}
}
}


@Override
public SplitResult<ResponseT, ResponseT> split(SplittingTransformerConfiguration splitConfig) {
if (configuration.fileWriteOption() == CREATE_OR_APPEND_TO_EXISTING) {
return AsyncResponseTransformer.super.split(splitConfig);
}
CompletableFuture<ResponseT> future = new CompletableFuture<>();
return (SplitResult<ResponseT, ResponseT>) SplitResult.<ResponseT, ResponseT>builder()
.publisher(new FileAsyncResponseTransformerPublisher(this))
.resultFuture(future)
.parallelSplitSupported(true)
.build();
}
}
Loading
Loading