Skip to content

Commit 228b434

Browse files
authored
Fix BlobBatch Not Emitting Responses (Azure#23304)
Fix BlobBatch Not Emitting Responses
1 parent 918fbe3 commit 228b434

File tree

59 files changed

+2956
-2860
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+2956
-2860
lines changed

sdk/storage/azure-storage-blob-batch/src/main/java/com/azure/storage/blob/batch/BlobBatchHelper.java

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,16 @@ class BlobBatchHelper {
5151
private static final Pattern APPLICATION_HTTP_PATTERN = Pattern
5252
.compile("application\\/http", Pattern.CASE_INSENSITIVE);
5353

54+
/*
55+
* The following patterns were previously used in 'String.split' calls. 'String.split' internally compiles the
56+
* String pattern into a Pattern regex if it is larger than one character, or two if-and-only-if it is an escaped
57+
* single character. Compiling these patterns here will help reduce the number of regex compilations, greatly
58+
* improving performance.
59+
*/
60+
private static final Pattern HTTP_NEWLINE_PATTERN = Pattern.compile(HTTP_NEWLINE);
61+
private static final Pattern HTTP_DOUBLE_NEWLINE_PATTERN = Pattern.compile(HTTP_NEWLINE + HTTP_NEWLINE_PATTERN);
62+
private static final Pattern HTTP_HEADER_SPLIT_PATTERN = Pattern.compile(":\\s*");
63+
5464
// This method connects the batch response values to the individual batch operations based on their Content-Id
5565
static Mono<SimpleResponse<Void>> mapBatchResponse(BlobBatchOperationInfo batchOperationInfo,
5666
Response<Flux<ByteBuffer>> rawResponse, boolean throwOnAnyFailure, ClientLogger logger) {
@@ -70,19 +80,29 @@ static Mono<SimpleResponse<Void>> mapBatchResponse(BlobBatchOperationInfo batchO
7080
String boundary = boundaryPieces[1];
7181

7282
return FluxUtil.collectBytesInByteBufferStream(rawResponse.getValue())
73-
.flatMap(byteArrayBody -> Mono.fromRunnable(() -> {
83+
/*
84+
* This has been changed from using 'Mono.fromRunnable' to 'Mono.create' to resolve an issue where iterating
85+
* the responses resulted in 0 responses being returned. The reason that this occurred is that
86+
* 'Mono.fromRunnable' only returns an 'onComplete' trigger with no response value, resulting on all
87+
* downstream operators being skipped as there is no 'onNext' trigger. Effectively, the request and response
88+
* worked correctly but emitting the responses failed.
89+
*
90+
* This change has an additional benefit in that the MonoSink used in 'Mono.create' now allows for 'onError'
91+
* emissions to occur instead of escaping the reactive stream boundaries when throwing an exception.
92+
*/
93+
.flatMap(byteArrayBody -> Mono.create(sink -> {
7494
String body = new String(byteArrayBody, StandardCharsets.UTF_8);
7595
List<BlobStorageException> exceptions = new ArrayList<>();
7696

7797
String[] subResponses = body.split("--" + boundary);
7898
if (subResponses.length == 3 && batchOperationInfo.getOperationCount() != 1) {
79-
String[] exceptionSections = subResponses[1].split(HTTP_NEWLINE + HTTP_NEWLINE);
99+
String[] exceptionSections = HTTP_DOUBLE_NEWLINE_PATTERN.split(subResponses[1]);
80100
int statusCode = getStatusCode(exceptionSections[1], logger);
81101
HttpHeaders headers = getHttpHeaders(exceptionSections[1]);
82102

83-
throw logger.logExceptionAsError(new BlobStorageException(
103+
sink.error(logger.logExceptionAsError(new BlobStorageException(
84104
headers.getValue(Constants.HeaderConstants.ERROR_CODE),
85-
createHttpResponse(rawResponse.getRequest(), statusCode, headers, body), body));
105+
createHttpResponse(rawResponse.getRequest(), statusCode, headers, body), body)));
86106
}
87107

88108
// Split the batch response body into batch operation responses.
@@ -93,7 +113,7 @@ static Mono<SimpleResponse<Void>> mapBatchResponse(BlobBatchOperationInfo batchO
93113
}
94114

95115
// The batch operation response will be delimited by two new lines.
96-
String[] subResponseSections = subResponse.split(HTTP_NEWLINE + HTTP_NEWLINE);
116+
String[] subResponseSections = HTTP_DOUBLE_NEWLINE_PATTERN.split(subResponse);
97117

98118
// The first section will contain batching metadata.
99119
BlobBatchOperationResponse<?> batchOperationResponse =
@@ -111,11 +131,11 @@ static Mono<SimpleResponse<Void>> mapBatchResponse(BlobBatchOperationInfo batchO
111131
}
112132

113133
if (throwOnAnyFailure && exceptions.size() != 0) {
114-
throw logger.logExceptionAsError(new BlobBatchStorageException("Batch had operation failures.",
115-
createHttpResponse(rawResponse), exceptions));
134+
sink.error(logger.logExceptionAsError(new BlobBatchStorageException("Batch had operation failures.",
135+
createHttpResponse(rawResponse), exceptions)));
116136
}
117137

118-
new SimpleResponse<>(rawResponse, null);
138+
sink.success(new SimpleResponse<>(rawResponse, null));
119139
}));
120140
}
121141

@@ -146,16 +166,16 @@ private static int getStatusCode(String responseMetadata, ClientLogger logger) {
146166
private static HttpHeaders getHttpHeaders(String responseMetadata) {
147167
HttpHeaders headers = new HttpHeaders();
148168

149-
for (String line : responseMetadata.split(HTTP_NEWLINE)) {
169+
for (String line : HTTP_NEWLINE_PATTERN.split(responseMetadata)) {
150170
if (CoreUtils.isNullOrEmpty(line) || (line.startsWith("HTTP") && !line.contains(":"))) {
151171
continue;
152172
}
153173

154-
String[] headerPieces = line.split(":\\s*", 2);
174+
String[] headerPieces = HTTP_HEADER_SPLIT_PATTERN.split(line, 2);
155175
if (headerPieces.length == 1) {
156-
headers.put(headerPieces[0], null);
176+
headers.set(headerPieces[0], (String) null);
157177
} else {
158-
headers.put(headerPieces[0], headerPieces[1]);
178+
headers.set(headerPieces[0], headerPieces[1]);
159179
}
160180
}
161181

@@ -165,7 +185,7 @@ private static HttpHeaders getHttpHeaders(String responseMetadata) {
165185
private static void setBodyOrAddException(BlobBatchOperationResponse<?> batchOperationResponse,
166186
String responseBody, List<BlobStorageException> exceptions, ClientLogger logger) {
167187
/*
168-
* Currently no batching operations will return a success body, they will only return a body on an exception.
188+
* Currently, no batching operations will return a success body, they will only return a body on an exception.
169189
* For now this will only construct the exception and throw if it should throw on an error.
170190
*/
171191
BlobStorageException exception = new BlobStorageException(responseBody,

sdk/storage/azure-storage-blob-batch/src/test/java/com/azure/storage/blob/batch/BatchAPITest.groovy

Lines changed: 92 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,29 @@
11
package com.azure.storage.blob.batch
22

3-
import com.azure.core.http.HttpPipelineBuilder
4-
import com.azure.core.http.policy.HttpPipelinePolicy
3+
54
import com.azure.core.http.rest.Response
65
import com.azure.core.test.TestMode
76
import com.azure.core.util.Context
8-
import com.azure.storage.blob.BlobServiceAsyncClient
97
import com.azure.storage.blob.BlobServiceVersion
108
import com.azure.storage.blob.batch.options.BlobBatchSetBlobAccessTierOptions
119
import com.azure.storage.blob.models.AccessTier
1210
import com.azure.storage.blob.models.BlobStorageException
1311
import com.azure.storage.blob.models.DeleteSnapshotsOptionType
1412
import com.azure.storage.blob.models.RehydratePriority
1513
import com.azure.storage.blob.sas.BlobContainerSasPermission
16-
import com.azure.storage.blob.sas.BlobSasPermission
1714
import com.azure.storage.blob.sas.BlobServiceSasSignatureValues
1815
import com.azure.storage.common.sas.AccountSasPermission
1916
import com.azure.storage.common.sas.AccountSasResourceType
2017
import com.azure.storage.common.sas.AccountSasService
2118
import com.azure.storage.common.sas.AccountSasSignatureValues
22-
import com.azure.storage.common.sas.SasIpRange
2319
import com.azure.storage.common.sas.SasProtocol
2420
import com.azure.storage.common.test.shared.extensions.RequiredServiceVersion
21+
import reactor.test.StepVerifier
2522
import spock.lang.Unroll
2623

2724
import java.nio.charset.StandardCharsets
25+
import java.time.Duration
26+
import java.util.stream.Collectors
2827

2928
class BatchAPITest extends APISpec {
3029
/*
@@ -33,6 +32,7 @@ class BatchAPITest extends APISpec {
3332
* as these requests will be properly associated to the response by their `Content-ID` but this causes issues in
3433
* playback as we are using a static response that cannot handle changes in operation order.
3534
*/
35+
3636
static def assertExpectedOrException(Response<?> response, int expectedStatusCode) {
3737
try {
3838
def statusCode = response.getStatusCode()
@@ -45,10 +45,13 @@ class BatchAPITest extends APISpec {
4545
}
4646

4747
BlobBatchClient batchClient
48+
BlobBatchAsyncClient batchAsyncClient
4849
BlobBatchClient oauthBatchClient
4950

5051
def setup() {
51-
batchClient = new BlobBatchClientBuilder(primaryBlobServiceClient).buildClient()
52+
def blobBatchClientBuilder = new BlobBatchClientBuilder(primaryBlobServiceAsyncClient)
53+
batchClient = blobBatchClientBuilder.buildClient()
54+
batchAsyncClient = blobBatchClientBuilder.buildAsyncClient()
5255
oauthBatchClient = new BlobBatchClientBuilder(getOAuthServiceClient()).buildClient()
5356
}
5457

@@ -189,9 +192,39 @@ class BatchAPITest extends APISpec {
189192
primaryBlobServiceClient.deleteBlobContainer(containerName)
190193

191194
where:
192-
leaseId | tags
193-
garbageLeaseID | null
194-
null | "\"notfoo\" = 'notbar'"
195+
leaseId | tags
196+
garbageLeaseID | null
197+
null | "\"notfoo\" = 'notbar'"
198+
}
199+
200+
// Ensures errors in the batch using BlobBatchAsyncClient are emitted as onError and are not thrown.
201+
@Unroll
202+
def "Set tier AC fail async"() {
203+
setup:
204+
def containerName = generateContainerName()
205+
def blobName1 = generateBlobName()
206+
def batch = batchAsyncClient.getBlobBatch()
207+
def containerClient = primaryBlobServiceClient.createBlobContainer(containerName)
208+
def blobClient1 = containerClient.getBlobClient(blobName1)
209+
blobClient1.getBlockBlobClient().upload(data.defaultInputStream, data.defaultDataSize)
210+
211+
when:
212+
batch.setBlobAccessTier(new BlobBatchSetBlobAccessTierOptions(blobClient1.getBlobUrl(), AccessTier.HOT)
213+
.setLeaseId(leaseId).setTagsConditions(tags))
214+
def request = batchAsyncClient.submitBatch(batch)
215+
216+
then:
217+
StepVerifier.create(request)
218+
.expectError(BlobBatchStorageException.class)
219+
.verify(Duration.ofSeconds(30))
220+
221+
cleanup:
222+
primaryBlobServiceClient.deleteBlobContainer(containerName)
223+
224+
where:
225+
leaseId | tags
226+
garbageLeaseID | null
227+
null | "\"notfoo\" = 'notbar'"
195228
}
196229

197230
def "Set tier some succeed throw on any error"() {
@@ -227,6 +260,41 @@ class BatchAPITest extends APISpec {
227260
primaryBlobServiceClient.deleteBlobContainer(containerName)
228261
}
229262

263+
def "Set tier some succeed throw on any error async"() {
264+
setup:
265+
def containerName = generateContainerName()
266+
def blobName1 = generateBlobName()
267+
def blobName2 = generateBlobName()
268+
def batch = batchAsyncClient.getBlobBatch()
269+
def containerClient = primaryBlobServiceClient.createBlobContainer(containerName)
270+
containerClient.getBlobClient(blobName1).getBlockBlobClient().upload(data.defaultInputStream, data.defaultDataSize)
271+
272+
when:
273+
def response1 = batch.setBlobAccessTier(containerName, blobName1, AccessTier.HOT)
274+
def response2 = batch.setBlobAccessTier(containerName, blobName2, AccessTier.COOL)
275+
def request = batchAsyncClient.submitBatch(batch)
276+
277+
then:
278+
StepVerifier.create(request)
279+
.expectError(BlobBatchStorageException.class)
280+
.verify(Duration.ofSeconds(30))
281+
282+
// In PLAYBACK check responses in an order invariant fashion.
283+
if (env.testMode == TestMode.PLAYBACK) {
284+
assert (assertExpectedOrException(response1, 200) + assertExpectedOrException(response2, 200)) == 1
285+
} else {
286+
assert response1.getStatusCode() == 200
287+
try {
288+
response2.getStatusCode()
289+
} catch (def exception) {
290+
assert exception instanceof BlobStorageException
291+
}
292+
}
293+
294+
cleanup:
295+
primaryBlobServiceClient.deleteBlobContainer(containerName)
296+
}
297+
230298
def "Set tier some succeed do not throw on any error"() {
231299
setup:
232300
def containerName = generateContainerName()
@@ -507,7 +575,9 @@ class BatchAPITest extends APISpec {
507575
def responses = batchClient.deleteBlobs(blobUrls, DeleteSnapshotsOptionType.INCLUDE)
508576

509577
then:
510-
for (def response : responses) {
578+
def responseList = responses.stream().collect(Collectors.toList())
579+
assert responseList.size() == 10
580+
for (def response : responseList) {
511581
assert response.getStatusCode() == 202
512582
}
513583

@@ -530,7 +600,9 @@ class BatchAPITest extends APISpec {
530600
def responses = batchClient.setBlobsAccessTier(blobUrls, AccessTier.HOT)
531601

532602
then:
533-
for (def response : responses) {
603+
def responseList = responses.stream().collect(Collectors.toList())
604+
assert responseList.size() == 10
605+
for (def response : responseList) {
534606
assert response.getStatusCode() == 200
535607
}
536608

@@ -553,7 +625,9 @@ class BatchAPITest extends APISpec {
553625
def responses = batchClient.setBlobsAccessTier(blobUrls, AccessTier.HOT)
554626

555627
then:
556-
for (def response : responses) {
628+
def responseList = responses.stream().collect(Collectors.toList())
629+
assert responseList.size() == 1
630+
for (def response : responseList) {
557631
assert response.getStatusCode() == 200
558632
}
559633

@@ -579,7 +653,9 @@ class BatchAPITest extends APISpec {
579653
def responses = batchClient.setBlobsAccessTier(blobUrls, AccessTier.HOT)
580654

581655
then:
582-
for (def response : responses) {
656+
def responseList = responses.stream().collect(Collectors.toList())
657+
assert responseList.size() == 1
658+
for (def response : responseList) {
583659
assert response.getStatusCode() == 200
584660
}
585661

@@ -601,7 +677,8 @@ class BatchAPITest extends APISpec {
601677
batchClient.deleteBlobs(blobUrls, DeleteSnapshotsOptionType.INCLUDE).iterator().next()
602678

603679
then:
604-
thrown(BlobStorageException)
680+
def ex = thrown(RuntimeException)
681+
assert ex instanceof BlobStorageException || ex.getCause() instanceof BlobStorageException
605682

606683
cleanup:
607684
primaryBlobServiceClient.deleteBlobContainer(containerName)
@@ -711,7 +788,7 @@ class BatchAPITest extends APISpec {
711788
def sas = primaryBlobServiceClient.generateAccountSas(sasValues)
712789

713790
def batchClient = new BlobBatchClientBuilder(getServiceClient(sas, primaryBlobServiceClient.getAccountUrl()))
714-
.buildClient()
791+
.buildClient()
715792

716793
def batch = batchClient.getBlobBatch()
717794

0 commit comments

Comments
 (0)