From c5ed15eaad178303c99b63d028637901a39a9740 Mon Sep 17 00:00:00 2001 From: Olivier Lepage-Applin Date: Tue, 22 Jul 2025 18:15:56 -0400 Subject: [PATCH 01/41] exploration --- .../async/FileAsyncResponseTransformer.java | 129 ++-------------- .../async/FileSplittingTransformer.java | 19 +++ .../core/internal/async/FileSubscriber.java | 141 ++++++++++++++++++ .../internal/async/FileSubscriberTckTest.java | 1 - .../progress/TransferProgressUpdater.java | 1 + .../progress/ContentRangeParserTest.java | 17 ++- .../multipart/DownloadObjectHelper.java | 4 + ...MultipartFileAsyncResponseTransformer.java | 54 +++++++ .../4195d6e3-8849-4e5a-848d-04f810577cd3 | 2 +- .../awssdk/utils}/ContentRangeParser.java | 35 ++++- .../amazon/awssdk/utils/ToString.java | 1 + 11 files changed, 282 insertions(+), 122 deletions(-) create mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileSplittingTransformer.java create mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileSubscriber.java create mode 100644 services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartFileAsyncResponseTransformer.java rename {services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress => utils/src/main/java/software/amazon/awssdk/utils}/ContentRangeParser.java (71%) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformer.java index 4348355fa5d8..b3fc0babe791 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformer.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; -import java.nio.channels.CompletionHandler; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.OpenOption; @@ -34,18 +33,17 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.FileTransformerConfiguration; import software.amazon.awssdk.core.FileTransformerConfiguration.FailureBehavior; +import software.amazon.awssdk.core.SplittingTransformerConfiguration; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Validate; +import software.amazon.awssdk.utils.async.SimplePublisher; /** * {@link AsyncResponseTransformer} that writes the data to the specified file. @@ -59,7 +57,7 @@ public final class FileAsyncResponseTransformer implements AsyncRespo private volatile AsynchronousFileChannel fileChannel; private volatile CompletableFuture cf; private volatile ResponseT response; - private final long position; + private long position; private final FileTransformerConfiguration configuration; public FileAsyncResponseTransformer(Path path) { @@ -89,7 +87,7 @@ private static long determineFilePositionToWrite(Path path, FileTransformerConfi if (fileConfiguration.fileWriteOption() == WRITE_TO_POSITION) { return Validate.getOrDefault(fileConfiguration.position(), () -> 0L); } - return 0L; + return 0L; } private AsynchronousFileChannel createChannel(Path path) throws IOException { @@ -134,6 +132,15 @@ public void onResponse(ResponseT response) { this.response = response; } + // to be overridden by subclasses + public void setOffsetPosition(long position) { + this.position = position; + } + + public FileTransformerConfiguration getConfiguration() { + return this.configuration; + } + @Override public void onStream(SdkPublisher publisher) { try { @@ -174,114 +181,4 @@ public String name() { return TransformerType.FILE.getName(); } - /** - * {@link Subscriber} implementation that writes chunks to a file. - */ - static class FileSubscriber implements Subscriber { - private final AtomicLong position; - private final AsynchronousFileChannel fileChannel; - private final Path path; - private final CompletableFuture future; - private final Consumer onErrorMethod; - - private volatile boolean writeInProgress = false; - private volatile boolean closeOnLastWrite = false; - private Subscription subscription; - - FileSubscriber(AsynchronousFileChannel fileChannel, Path path, CompletableFuture future, - Consumer onErrorMethod, long startingPosition) { - this.fileChannel = fileChannel; - this.path = path; - this.future = future; - this.onErrorMethod = onErrorMethod; - this.position = new AtomicLong(startingPosition); - } - - @Override - public void onSubscribe(Subscription s) { - if (this.subscription != null) { - s.cancel(); - return; - } - this.subscription = s; - // Request the first chunk to start producing content - s.request(1); - } - - @Override - public void onNext(ByteBuffer byteBuffer) { - if (byteBuffer == null) { - throw new NullPointerException("Element must not be null"); - } - - performWrite(byteBuffer); - } - - private void performWrite(ByteBuffer byteBuffer) { - writeInProgress = true; - - fileChannel.write(byteBuffer, position.get(), byteBuffer, new CompletionHandler() { - @Override - public void completed(Integer result, ByteBuffer attachment) { - position.addAndGet(result); - - if (byteBuffer.hasRemaining()) { - performWrite(byteBuffer); - } else { - synchronized (FileSubscriber.this) { - writeInProgress = false; - if (closeOnLastWrite) { - close(); - } else { - subscription.request(1); - } - } - } - } - - @Override - public void failed(Throwable exc, ByteBuffer attachment) { - subscription.cancel(); - future.completeExceptionally(exc); - } - }); - } - - @Override - public void onError(Throwable t) { - onErrorMethod.accept(t); - } - - @Override - public void onComplete() { - log.trace(() -> "onComplete"); - // if write in progress, tell write to close on finish. - synchronized (this) { - if (writeInProgress) { - log.trace(() -> "writeInProgress = true, not closing"); - closeOnLastWrite = true; - } else { - log.trace(() -> "writeInProgress = false, closing"); - close(); - } - } - } - - private void close() { - try { - if (fileChannel != null) { - invokeSafely(fileChannel::close); - } - log.trace(() -> "Completing File async transformer future future"); - future.complete(null); - } catch (RuntimeException exception) { - future.completeExceptionally(exception); - } - } - - @Override - public String toString() { - return getClass() + ":" + path.toString(); - } - } } \ No newline at end of file diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileSplittingTransformer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileSplittingTransformer.java new file mode 100644 index 000000000000..eb4a5b21b193 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileSplittingTransformer.java @@ -0,0 +1,19 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +public class FileSplittingTransformer { +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileSubscriber.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileSubscriber.java new file mode 100644 index 000000000000..844e78549481 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileSubscriber.java @@ -0,0 +1,141 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; + +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; +import java.nio.file.Path; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.utils.Logger; + +/** + * {@link Subscriber} implementation that writes chunks to a file. + */ +public class FileSubscriber implements Subscriber { + private static final Logger log = Logger.loggerFor(FileSubscriber.class); + private final AtomicLong position; + private final AsynchronousFileChannel fileChannel; + private final Path path; + private final CompletableFuture future; + private final Consumer onErrorMethod; + + private volatile boolean writeInProgress = false; + private volatile boolean closeOnLastWrite = false; + private Subscription subscription; + + public FileSubscriber(AsynchronousFileChannel fileChannel, Path path, CompletableFuture future, + Consumer onErrorMethod, long startingPosition) { + this.fileChannel = fileChannel; + this.path = path; + this.future = future; + this.onErrorMethod = onErrorMethod; + this.position = new AtomicLong(startingPosition); + } + + @Override + public void onSubscribe(Subscription s) { + if (this.subscription != null) { + s.cancel(); + return; + } + this.subscription = s; + // Request the first chunk to start producing content + s.request(1); + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + if (byteBuffer == null) { + throw new NullPointerException("Element must not be null"); + } + + performWrite(byteBuffer); + } + + private void performWrite(ByteBuffer byteBuffer) { + writeInProgress = true; + + fileChannel.write(byteBuffer, position.get(), byteBuffer, new CompletionHandler() { + @Override + public void completed(Integer result, ByteBuffer attachment) { + position.addAndGet(result); + + if (byteBuffer.hasRemaining()) { + performWrite(byteBuffer); + } else { + synchronized (FileSubscriber.this) { + writeInProgress = false; + if (closeOnLastWrite) { + close(); + } else { + subscription.request(1); + } + } + } + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + subscription.cancel(); + future.completeExceptionally(exc); + } + }); + } + + @Override + public void onError(Throwable t) { + onErrorMethod.accept(t); + } + + @Override + public void onComplete() { + log.trace(() -> "onComplete"); + // if write in progress, tell write to close on finish. + synchronized (this) { + if (writeInProgress) { + log.trace(() -> "writeInProgress = true, not closing"); + closeOnLastWrite = true; + } else { + log.trace(() -> "writeInProgress = false, closing"); + close(); + } + } + } + + private void close() { + try { + if (fileChannel != null) { + invokeSafely(fileChannel::close); + } + log.trace(() -> "Completing File async transformer future future"); + future.complete(null); + } catch (RuntimeException exception) { + future.completeExceptionally(exception); + } + } + + @Override + public String toString() { + return getClass() + ":" + path.toString(); + } +} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileSubscriberTckTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileSubscriberTckTest.java index 8810c3459f25..7690afaeadce 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileSubscriberTckTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileSubscriberTckTest.java @@ -31,7 +31,6 @@ import org.reactivestreams.Subscription; import org.reactivestreams.tck.SubscriberWhiteboxVerification; import org.reactivestreams.tck.TestEnvironment; -import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer.FileSubscriber; /** * TCK verification test for {@link FileSubscriber}. diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java index 2cab45039d97..aaf1ee84a9ad 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java @@ -35,6 +35,7 @@ import software.amazon.awssdk.transfer.s3.progress.TransferListener; import software.amazon.awssdk.transfer.s3.progress.TransferProgress; import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot; +import software.amazon.awssdk.utils.ContentRangeParser; /** * An SDK-internal helper class that facilitates updating a {@link TransferProgress} and invoking {@link TransferListener}s. diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/progress/ContentRangeParserTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/progress/ContentRangeParserTest.java index 6dc7a7fc3ce8..61a686c012d7 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/progress/ContentRangeParserTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/progress/ContentRangeParserTest.java @@ -17,12 +17,14 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.Optional; import java.util.OptionalLong; import java.util.stream.Stream; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import software.amazon.awssdk.utils.ContentRangeParser; +import software.amazon.awssdk.utils.Pair; class ContentRangeParserTest { @@ -51,4 +53,17 @@ static Stream argumentProvider() { Arguments.of("bla bla bla", OptionalLong.empty())); } + @ParameterizedTest + @MethodSource("testRange") + void testRange(String contentRange, Optional> expected) { + assertThat(ContentRangeParser.range(contentRange)).isEqualTo(expected); + } + + static Stream testRange() { + return Stream.of( + Arguments.of("bytes 0-9/10", Optional.of(Pair.of(0L, 9L))), + Arguments.of("bytes 12000000-17999999/30000000", Optional.of(Pair.of(12000000L, 17999999L))) + ); + } + } \ No newline at end of file diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java index 2d6fadc5f505..95ff55fac84f 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/DownloadObjectHelper.java @@ -20,6 +20,7 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.SplittingTransformerConfiguration; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -42,6 +43,9 @@ public CompletableFuture downloadObject( if (getObjectRequest.range() != null || getObjectRequest.partNumber() != null) { logSinglePartMessage(getObjectRequest); return s3AsyncClient.getObject(getObjectRequest, asyncResponseTransformer); + } + if (asyncResponseTransformer instanceof FileAsyncResponseTransformer) { + } AsyncResponseTransformer.SplitResult split = asyncResponseTransformer.split(SplittingTransformerConfiguration.builder() diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartFileAsyncResponseTransformer.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartFileAsyncResponseTransformer.java new file mode 100644 index 000000000000..f94fdd11e067 --- /dev/null +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartFileAsyncResponseTransformer.java @@ -0,0 +1,54 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.internal.multipart; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.async.SdkPublisher; +import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.utils.ContentRangeParser; + +public class MultipartFileAsyncResponseTransformer implements AsyncResponseTransformer { + private final FileAsyncResponseTransformer delegate; + + public MultipartFileAsyncResponseTransformer(FileAsyncResponseTransformer delegate) { + this.delegate = delegate; + } + + @Override + public CompletableFuture prepare() { + return delegate.prepare(); + } + + @Override + public void onResponse(GetObjectResponse response) { + delegate.onResponse(response); + ContentRangeParser.range(response.contentRange()) + .ifPresent(pair -> delegate.setOffsetPosition(pair.left())); + } + + @Override + public void onStream(SdkPublisher publisher) { + delegate.onStream(publisher); + } + + @Override + public void exceptionOccurred(Throwable error) { + delegate.exceptionOccurred(error); + } +} diff --git a/test/architecture-tests/archunit_store/4195d6e3-8849-4e5a-848d-04f810577cd3 b/test/architecture-tests/archunit_store/4195d6e3-8849-4e5a-848d-04f810577cd3 index 8c3b1f284548..c1c17fce7485 100644 --- a/test/architecture-tests/archunit_store/4195d6e3-8849-4e5a-848d-04f810577cd3 +++ b/test/architecture-tests/archunit_store/4195d6e3-8849-4e5a-848d-04f810577cd3 @@ -41,7 +41,7 @@ Method calls method in (ReceiveSqsMessageHelper.java:132) Method calls method in (AsyncBufferingSubscriber.java:67) Method calls method in (PauseResumeHelper.java:59) -Method calls method in (ContentRangeParser.java:71) +Method calls method in (ContentRangeParser.java:71) Method calls method in (LoggingTransferListener.java:76) Method calls method in (Logger.java:205) Method calls method in (AddingTrailingDataSubscriber.java:73) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/ContentRangeParser.java b/utils/src/main/java/software/amazon/awssdk/utils/ContentRangeParser.java similarity index 71% rename from services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/ContentRangeParser.java rename to utils/src/main/java/software/amazon/awssdk/utils/ContentRangeParser.java index 03e67c402a56..da65b46b594c 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/ContentRangeParser.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/ContentRangeParser.java @@ -13,12 +13,11 @@ * permissions and limitations under the License. */ -package software.amazon.awssdk.transfer.s3.internal.progress; +package software.amazon.awssdk.utils; +import java.util.Optional; import java.util.OptionalLong; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.utils.Logger; -import software.amazon.awssdk.utils.StringUtils; /** * Parse a Content-Range header value into a total byte count. The expected format is the following:

@@ -72,4 +71,34 @@ public static OptionalLong totalBytes(String contentRange) { return OptionalLong.empty(); } } + + public static Optional> range(String contentRange) { + if (StringUtils.isEmpty(contentRange)) { + return Optional.empty(); + } + + String trimmed = contentRange.trim(); + if (!trimmed.startsWith("bytes ")) { + return Optional.empty(); + } + String withoutBytes = trimmed.substring("bytes ".length()); + int hyphen = withoutBytes.indexOf('-'); + if (hyphen == -1) { + return Optional.empty(); + } + String begin = withoutBytes.substring(0, hyphen); + int slash = withoutBytes.indexOf('/'); + if (slash == -1) { + return Optional.empty(); + } + String end = withoutBytes.substring(hyphen + 1, slash); + try { + long startInt = Long.parseLong(begin); + long endInt = Long.parseLong(end); + return Optional.of(Pair.of(startInt, endInt)); + } catch (Exception e) { + return Optional.empty(); + } + } + } diff --git a/utils/src/main/java/software/amazon/awssdk/utils/ToString.java b/utils/src/main/java/software/amazon/awssdk/utils/ToString.java index 6b6fbbbe7a55..a066ae68e6e6 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/ToString.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/ToString.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.utils; import java.util.Arrays; +import java.util.concurrent.locks.ReentrantLock; import software.amazon.awssdk.annotations.NotThreadSafe; import software.amazon.awssdk.annotations.SdkProtectedApi; From 067db3049b67c8b987068b365b36b1934f622226 Mon Sep 17 00:00:00 2001 From: Olivier Lepage-Applin Date: Wed, 23 Jul 2025 09:17:28 -0400 Subject: [PATCH 02/41] hide cloudwatch logs for s3 regression tests --- .github/workflows/s3-regression-tests.yml | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/.github/workflows/s3-regression-tests.yml b/.github/workflows/s3-regression-tests.yml index abad4b4e675f..9b5f4fa047c9 100644 --- a/.github/workflows/s3-regression-tests.yml +++ b/.github/workflows/s3-regression-tests.yml @@ -51,7 +51,8 @@ jobs: with: project-name: aws-sdk-java-v2-s3-regression-tests env-vars-for-codebuild: REGRESSION_TEST,HAS_S3_CHANGES - + hide-cloudwatch-logs: true + s3-regression-tests-control-plane: needs: check-s3-related-changes if: github.repository == 'aws/aws-sdk-java-v2' @@ -71,7 +72,8 @@ jobs: with: project-name: aws-sdk-java-v2-s3-regression-tests env-vars-for-codebuild: REGRESSION_TEST,HAS_S3_CHANGES - + hide-cloudwatch-logs: true + s3-regression-tests-upload-sync: needs: check-s3-related-changes if: github.repository == 'aws/aws-sdk-java-v2' @@ -91,7 +93,8 @@ jobs: with: project-name: aws-sdk-java-v2-s3-regression-tests env-vars-for-codebuild: REGRESSION_TEST,HAS_S3_CHANGES - + hide-cloudwatch-logs: true + s3-regression-tests-upload-async: needs: check-s3-related-changes if: github.repository == 'aws/aws-sdk-java-v2' @@ -111,7 +114,8 @@ jobs: with: project-name: aws-sdk-java-v2-s3-regression-tests env-vars-for-codebuild: REGRESSION_TEST,HAS_S3_CHANGES - + hide-cloudwatch-logs: true + s3-regression-tests-upload-crt: needs: check-s3-related-changes if: github.repository == 'aws/aws-sdk-java-v2' @@ -131,7 +135,8 @@ jobs: with: project-name: aws-sdk-java-v2-s3-regression-tests env-vars-for-codebuild: REGRESSION_TEST,HAS_S3_CHANGES - + hide-cloudwatch-logs: true + s3-regression-tests-upload-multi: needs: check-s3-related-changes if: github.repository == 'aws/aws-sdk-java-v2' @@ -151,3 +156,4 @@ jobs: with: project-name: aws-sdk-java-v2-s3-regression-tests env-vars-for-codebuild: REGRESSION_TEST,HAS_S3_CHANGES + hide-cloudwatch-logs: true \ No newline at end of file From 51357eed964f55250996ea856007a96c015dd1c7 Mon Sep 17 00:00:00 2001 From: Olivier Lepage-Applin Date: Wed, 6 Aug 2025 16:04:27 -0400 Subject: [PATCH 03/41] Multipart dowload implementation --- .idea/codeStyles/Project.xml | 1 + .../core/async/AsyncResponseTransformer.java | 18 ++ ...ltAsyncResponseTransformerSplitResult.java | 20 ++ .../async/FileAsyncResponseTransformer.java | 22 +- ...FileAsyncResponseTransformerPublisher.java | 126 +++++++++ .../async/ThreadSafeEmittingSubscription.java | 96 +++++++ .../progress/ContentRangeParserTest.java | 1 + .../multipart/DownloadObjectHelper.java | 29 +- ...MultipartFileAsyncResponseTransformer.java | 54 ---- ...onLinearMultipartDownloaderSubscriber.java | 253 ++++++++++++++++++ .../awssdk/utils/ContentRangeParser.java | 12 + 11 files changed, 569 insertions(+), 63 deletions(-) create mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerPublisher.java create mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/ThreadSafeEmittingSubscription.java delete mode 100644 services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartFileAsyncResponseTransformer.java create mode 100644 services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/NonLinearMultipartDownloaderSubscriber.java diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml index 82511b2671f7..31e3dd29a01f 100644 --- a/.idea/codeStyles/Project.xml +++ b/.idea/codeStyles/Project.xml @@ -10,6 +10,7 @@