Commit 96c98c3

Cosmos Spark: Fixes a bug where bulk responses with mixed results don't handle 400/409 for individual item operations properly (Azure#26069)
* Cosmos Spark: Fixes a bug where bulk responses with mixed results don't handle 400/409 for individual item operations properly
* Fix scalastyle violations
* Avoid logging the payload in error cases; PK + id should be sufficient to debug any failures
1 parent 911f7f1 commit 96c98c3
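
The crux of the fix: in bulk mode, a batch response can report per-item failures (for example 400 or 409) on the item response itself without any exception being surfaced on the operation, so inspecting resp.getException alone silently counts those items as successes. A minimal, self-contained sketch of that decision logic — ItemResult and classify are illustrative stand-ins, not SDK types:

object MixedBulkResultsSketch extends App {
  // Models the two failure channels a bulk item result can have:
  // an exception, or a non-success HTTP status code with no exception.
  final case class ItemResult(statusCode: Int, exception: Option[Throwable]) {
    def isSuccessStatusCode: Boolean = statusCode >= 200 && statusCode < 300
  }

  def classify(result: ItemResult): String = result match {
    case ItemResult(_, Some(e))      => s"failed with exception: ${e.getMessage}"
    case r if !r.isSuccessStatusCode => s"failed with status ${r.statusCode}" // the previously missed case
    case _                           => "succeeded"
  }

  Seq(
    ItemResult(201, None),                                // created
    ItemResult(409, None),                                // conflict, no exception raised
    ItemResult(400, None),                                // bad request, no exception raised
    ItemResult(0, Some(new RuntimeException("timeout")))  // SDK-level failure
  ).foreach(r => println(classify(r)))
}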

7 files changed: +288 −100 lines changed

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/BulkWriter.scala

Lines changed: 110 additions & 68 deletions
@@ -3,10 +3,9 @@
 package com.azure.cosmos.spark
 
 // scalastyle:off underscore.import
-import com.azure.cosmos.implementation.CosmosSchedulers
 import com.azure.cosmos.{models, _}
-import com.azure.cosmos.models.{CosmosBulkExecutionOptions, CosmosBulkExecutionThresholdsState, CosmosBulkItemRequestOptions, CosmosBulkOperations}
-import com.azure.cosmos.spark.BulkWriter.{bulkWriterBoundedElastic, getThreadInfo}
+import com.azure.cosmos.models._
+import com.azure.cosmos.spark.BulkWriter.{BulkOperationFailedException, bulkWriterBoundedElastic, getThreadInfo}
 import com.azure.cosmos.spark.diagnostics.DefaultDiagnostics
 import reactor.core.scheduler.Scheduler
 
@@ -38,6 +37,7 @@ import scala.collection.JavaConverters._
 
 //scalastyle:off null
 //scalastyle:off multiple.string.literals
+//scalastyle:off file.size.limit
 class BulkWriter(container: CosmosAsyncContainer,
                  writeConfig: CosmosWriteConfig,
                  diagnosticsConfig: DiagnosticsConfig)
@@ -135,81 +135,38 @@ class BulkWriter(container: CosmosAsyncContainer,
 
     bulkOperationResponseFlux.subscribe(
       resp => {
-        var isGettingRetried = false
+        var isGettingRetried = new AtomicBoolean(false)
         try {
          val itemOperation = resp.getOperation
          val itemOperationFound = activeOperations.remove(itemOperation)
          assume(itemOperationFound) // can't find the item operation in list of active operations!
          val context = itemOperation.getContext[OperationContext]
+         val itemResponse = resp.getResponse
 
          if (resp.getException != null) {
            Option(resp.getException) match {
              case Some(cosmosException: CosmosException) =>
-               log.logDebug(s"encountered ${cosmosException.getStatusCode}, " +
-                 s"Context: ${operationContext.toString} ${getThreadInfo}")
-               if (shouldIgnore(cosmosException, context)) {
-                 log.logDebug(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
-                   s"ignored encountered ${cosmosException.getStatusCode}, Context: ${operationContext.toString}")
-                 totalSuccessfulIngestionMetrics.getAndIncrement()
-                 // work done
-               } else if (shouldRetry(cosmosException, context)) {
-                 // requeue
-                 log.logWarning(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
-                   s"encountered ${cosmosException.getStatusCode}, will retry! " +
-                   s"attemptNumber=${context.attemptNumber}, exceptionMessage=${cosmosException.getMessage}, " +
-                   s"Context: {${operationContext.toString}} ${getThreadInfo}")
-
-                 // this is to ensure the submission will happen on a different thread in background
-                 // and doesn't block the active thread
-                 val deferredRetryMono = SMono.defer(() => {
-                   scheduleWriteInternal(itemOperation.getPartitionKeyValue,
-                     itemOperation.getItem.asInstanceOf[ObjectNode],
-                     OperationContext(context.itemId, context.partitionKeyValue, context.eTag, context.attemptNumber + 1))
-                   SMono.empty
-                 })
-
-                 if (Exceptions.isTimeout(cosmosException)) {
-                   deferredRetryMono
-                     .delaySubscription(
-                       Duration(
-                         BulkWriter.minDelayOn408RequestTimeoutInMs +
-                           scala.util.Random.nextInt(
-                             BulkWriter.maxDelayOn408RequestTimeoutInMs - BulkWriter.minDelayOn408RequestTimeoutInMs),
-                         TimeUnit.MILLISECONDS),
-                       Schedulers.boundedElastic())
-                     .subscribeOn(Schedulers.boundedElastic())
-                     .subscribe()
-
-                 } else {
-                   deferredRetryMono
-                     .subscribeOn(Schedulers.boundedElastic())
-                     .subscribe()
-                 }
-
-                 isGettingRetried = true
-               } else {
-                 log.logWarning(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
-                   s"encountered ${cosmosException.getStatusCode}, all retries exhausted! " +
-                   s"attemptNumber=${context.attemptNumber}, exceptionMessage=${cosmosException.getMessage}, " +
-                   s"Context: {${operationContext.toString} ${getThreadInfo}")
-                 captureIfFirstFailure(cosmosException)
-                 cancelWork()
-               }
+               handleNonSuccessfulStatusCode(
+                 context, itemOperation, itemResponse, isGettingRetried, Some(cosmosException))
              case _ =>
-               log.logWarning(s"unexpected failure: itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
-                 s"encountered , attemptNumber=${context.attemptNumber}, exceptionMessage=${resp.getException.getMessage}, " +
+               log.logWarning(
+                 s"unexpected failure: itemId=[${context.itemId}], partitionKeyValue=[" +
+                 s"${context.partitionKeyValue}], encountered , attemptNumber=${context.attemptNumber}, " +
+                 s"exceptionMessage=${resp.getException.getMessage}, " +
                  s"Context: ${operationContext.toString} ${getThreadInfo}", resp.getException)
               captureIfFirstFailure(resp.getException)
               cancelWork()
            }
+         } else if (!itemResponse.isSuccessStatusCode) {
+           handleNonSuccessfulStatusCode(context, itemOperation, itemResponse, isGettingRetried, None)
          } else {
            // no error case
            totalSuccessfulIngestionMetrics.getAndIncrement()
          }
 
        }
        finally {
-         if (!isGettingRetried) {
+         if (!isGettingRetried.get) {
           semaphore.release()
          }
        }
@@ -305,6 +262,88 @@ class BulkWriter(container: CosmosAsyncContainer,
     bulkInputEmitter.emitNext(bulkItemOperation, emitFailureHandler)
   }
 
+  //scalastyle:off method.length
+  private[this] def handleNonSuccessfulStatusCode
+  (
+    context: OperationContext,
+    itemOperation: CosmosItemOperation,
+    itemResponse: CosmosBulkItemResponse,
+    isGettingRetried: AtomicBoolean,
+    responseException: Option[CosmosException]
+  ) : Unit = {
+
+    val exceptionMessage = responseException match {
+      case Some(e) => e.getMessage
+      case None => ""
+    }
+
+    log.logDebug(s"encountered item operation response with status code " +
+      s"${itemResponse.getStatusCode}:${itemResponse.getSubStatusCode}, " +
+      s"Context: ${operationContext.toString} ${getThreadInfo}")
+    if (shouldIgnore(itemResponse.getStatusCode, itemResponse.getSubStatusCode, context)) {
+      log.logDebug(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
+        s"ignored encountered status code '${itemResponse.getStatusCode}:${itemResponse.getSubStatusCode}', " +
+        s"Context: ${operationContext.toString}")
+      totalSuccessfulIngestionMetrics.getAndIncrement()
+      // work done
+    } else if (shouldRetry(itemResponse.getStatusCode, itemResponse.getSubStatusCode, context)) {
+      // requeue
+      log.logWarning(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
+        s"encountered status code '${itemResponse.getStatusCode}:${itemResponse.getSubStatusCode}', will retry! " +
+        s"attemptNumber=${context.attemptNumber}, exceptionMessage=${exceptionMessage}, " +
+        s"Context: {${operationContext.toString}} ${getThreadInfo}")
+
+      // this is to ensure the submission will happen on a different thread in background
+      // and doesn't block the active thread
+      val deferredRetryMono = SMono.defer(() => {
+        scheduleWriteInternal(itemOperation.getPartitionKeyValue,
+          itemOperation.getItem.asInstanceOf[ObjectNode],
+          OperationContext(context.itemId, context.partitionKeyValue, context.eTag, context.attemptNumber + 1))
+        SMono.empty
+      })
+
+      if (Exceptions.isTimeout(itemResponse.getStatusCode)) {
+        deferredRetryMono
+          .delaySubscription(
+            Duration(
+              BulkWriter.minDelayOn408RequestTimeoutInMs +
+                scala.util.Random.nextInt(
+                  BulkWriter.maxDelayOn408RequestTimeoutInMs - BulkWriter.minDelayOn408RequestTimeoutInMs),
+              TimeUnit.MILLISECONDS),
+            Schedulers.boundedElastic())
+          .subscribeOn(Schedulers.boundedElastic())
+          .subscribe()
+
+      } else {
+        deferredRetryMono
+          .subscribeOn(Schedulers.boundedElastic())
+          .subscribe()
+      }
+
+      isGettingRetried.set(true)
+    } else {
+      log.logError(s"for itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}], " +
+        s"encountered status code '${itemResponse.getStatusCode}:${itemResponse.getSubStatusCode}', all retries exhausted! " +
+        s"attemptNumber=${context.attemptNumber}, exceptionMessage=${exceptionMessage}, " +
+        s"Context: {${operationContext.toString} ${getThreadInfo}")
+
+      val message = s"All retries exhausted for '${itemOperation.getOperationType}' bulk operation - " +
+        s"statusCode=[${itemResponse.getStatusCode}:${itemResponse.getSubStatusCode}] " +
+        s"itemId=[${context.itemId}], partitionKeyValue=[${context.partitionKeyValue}]"
+
+      val exceptionToBeThrown = responseException match {
+        case Some(e) =>
+          new BulkOperationFailedException(itemResponse.getStatusCode, itemResponse.getSubStatusCode, message, e)
+        case None =>
+          new BulkOperationFailedException(itemResponse.getStatusCode, itemResponse.getSubStatusCode, message, null)
+      }
+
+      captureIfFirstFailure(exceptionToBeThrown)
+      cancelWork()
+    }
+  }
+  //scalastyle:on method.length
+
   private[this] def throwIfCapturedExceptionExists(): Unit = {
     val errorSnapshot = errorCaptureFirstException.get()
     if (errorSnapshot != null) {
@@ -476,26 +515,23 @@ class BulkWriter(container: CosmosAsyncContainer,
     subscriptionDisposable.dispose()
   }
 
-  private def shouldIgnore(cosmosException: CosmosException, operationContext: OperationContext): Boolean = {
+  private def shouldIgnore(statusCode: Int, subStatusCode: Int, operationContext: OperationContext): Boolean = {
     val returnValue = writeConfig.itemWriteStrategy match {
-      case ItemWriteStrategy.ItemAppend => Exceptions.isResourceExistsException(cosmosException)
-      case ItemWriteStrategy.ItemDelete => Exceptions.isNotFoundExceptionCore(cosmosException)
-      case ItemWriteStrategy.ItemDeleteIfNotModified => Exceptions.isNotFoundExceptionCore(cosmosException) ||
-        Exceptions.isPreconditionFailedException(cosmosException)
+      case ItemWriteStrategy.ItemAppend => Exceptions.isResourceExistsException(statusCode)
+      case ItemWriteStrategy.ItemDelete => Exceptions.isNotFoundExceptionCore(statusCode, subStatusCode)
+      case ItemWriteStrategy.ItemDeleteIfNotModified => Exceptions.isNotFoundExceptionCore(statusCode, subStatusCode) ||
+        Exceptions.isPreconditionFailedException(statusCode)
       case _ => false
     }
 
-    log.logDebug(s"Should ignore exception '$cosmosException' -> $returnValue, " +
-      s"Context: ${operationContext.toString} ${getThreadInfo}")
-
     returnValue
   }
 
-  private def shouldRetry(cosmosException: CosmosException, operationContext: OperationContext): Boolean = {
+  private def shouldRetry(statusCode: Int, subStatusCode: Int, operationContext: OperationContext): Boolean = {
     val returnValue = operationContext.attemptNumber < writeConfig.maxRetryCount &&
-      Exceptions.canBeTransientFailure(cosmosException)
+      Exceptions.canBeTransientFailure(statusCode, subStatusCode)
 
-    log.logDebug(s"Should retry exception '$cosmosException' -> $returnValue, " +
+    log.logDebug(s"Should retry statusCode '$statusCode:$subStatusCode' -> $returnValue, " +
      s"Context: ${operationContext.toString} ${getThreadInfo}")
 
     returnValue
@@ -599,7 +635,13 @@ private object BulkWriter {
     }
     s"Thread[Name: ${t.getName}, Group: $group, IsDaemon: ${t.isDaemon} Id: ${t.getId}]"
   }
+
+  private class BulkOperationFailedException(statusCode: Int, subStatusCode: Int, message:String, cause: Throwable)
+    extends CosmosException(statusCode, message, null, cause) {
+    BridgeInternal.setSubStatusCode(this, subStatusCode)
+  }
 }
 
 //scalastyle:on multiple.string.literals
 //scalastyle:on null
+//scalastyle:on file.size.limit
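
A side note on the change from "var isGettingRetried = false" to an AtomicBoolean: the retry decision now lives in the extracted handleNonSuccessfulStatusCode helper, and a Scala method cannot reassign a caller's local var, so the flag has to be a mutable cell the helper can set. A standalone sketch of the pattern (names are illustrative, not the shipped code):

import java.util.concurrent.atomic.AtomicBoolean

object RetryFlagSketch extends App {
  // The AtomicBoolean acts as a shared mutable cell; a plain Boolean argument
  // would be copied, so the helper could not flip the caller's flag.
  def markRetried(isGettingRetried: AtomicBoolean): Unit =
    isGettingRetried.set(true)

  val isGettingRetried = new AtomicBoolean(false)
  markRetried(isGettingRetried)
  println(isGettingRetried.get) // prints: true
}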

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/CosmosConstants.scala

Lines changed: 7 additions & 6 deletions
@@ -32,12 +32,13 @@ private object CosmosConstants {
   }
 
   object StatusCodes {
-    val Conflict = 409
-    val ServiceUnavailable = 503
-    val InternalServerError = 500
-    val Gone = 410
-    val Timeout = 408
-    val PreconditionFailed = 412
+    val Conflict = HttpConstants.StatusCodes.CONFLICT
+    val ServiceUnavailable = HttpConstants.StatusCodes.SERVICE_UNAVAILABLE
+    val InternalServerError = HttpConstants.StatusCodes.INTERNAL_SERVER_ERROR
+    val Gone = HttpConstants.StatusCodes.GONE
+    val Timeout = HttpConstants.StatusCodes.REQUEST_TIMEOUT
+    val PreconditionFailed = HttpConstants.StatusCodes.PRECONDITION_FAILED
+    val NotFound = HttpConstants.StatusCodes.NOTFOUND
   }
 
   object SystemProperties {

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/Exceptions.scala

Lines changed: 17 additions & 15 deletions
@@ -4,41 +4,43 @@ package com.azure.cosmos.spark
 
 import com.azure.cosmos.CosmosException
 import com.azure.cosmos.implementation.HttpConstants
+import com.azure.cosmos.implementation.HttpConstants.SubStatusCodes
 import reactor.core.scala.publisher.SMono
 
 private object Exceptions {
-  def isResourceExistsException(cosmosException: CosmosException): Boolean = {
-    cosmosException.getStatusCode == CosmosConstants.StatusCodes.Conflict
+  def isResourceExistsException(statusCode: Int): Boolean = {
+    statusCode == CosmosConstants.StatusCodes.Conflict
   }
 
-  def isPreconditionFailedException(cosmosException: CosmosException): Boolean = {
-    cosmosException.getStatusCode == CosmosConstants.StatusCodes.PreconditionFailed
+  def isPreconditionFailedException(statusCode: Int): Boolean = {
+    statusCode == CosmosConstants.StatusCodes.PreconditionFailed
  }
 
-  def canBeTransientFailure(cosmosException: CosmosException): Boolean = {
+  def canBeTransientFailure(statusCode: Int, subStatusCode: Int): Boolean = {
    // TODO: moderakh SDK should only throw 503 and not 410,
    // however due a bug in core SDK we currently may throw 410 on write
    // once that's fixed remove GONE here
-    cosmosException.getStatusCode == CosmosConstants.StatusCodes.Gone ||
-      cosmosException.getStatusCode == CosmosConstants.StatusCodes.ServiceUnavailable ||
-      cosmosException.getStatusCode == CosmosConstants.StatusCodes.InternalServerError ||
-      cosmosException.getStatusCode == CosmosConstants.StatusCodes.Timeout
+    statusCode == CosmosConstants.StatusCodes.Gone ||
+      statusCode == CosmosConstants.StatusCodes.ServiceUnavailable ||
+      statusCode == CosmosConstants.StatusCodes.InternalServerError ||
+      statusCode == CosmosConstants.StatusCodes.Timeout ||
+      statusCode == CosmosConstants.StatusCodes.NotFound && subStatusCode == 1002
  }
 
-  def isTimeout(cosmosException: CosmosException): Boolean = {
-    cosmosException.getStatusCode == CosmosConstants.StatusCodes.Timeout
+  def isTimeout(statusCode: Int): Boolean = {
+    statusCode == CosmosConstants.StatusCodes.Timeout
  }
 
  def isNotFoundException(throwable: Throwable): Boolean = {
    throwable match {
      case cosmosException: CosmosException =>
-        isNotFoundExceptionCore(cosmosException)
+        isNotFoundExceptionCore(cosmosException.getStatusCode, cosmosException.getSubStatusCode)
      case _ => false
    }
  }
 
-  def isNotFoundExceptionCore(cosmosException: CosmosException): Boolean = {
-    cosmosException.getStatusCode == HttpConstants.StatusCodes.NOTFOUND &&
-      cosmosException.getSubStatusCode == 0
+  def isNotFoundExceptionCore(statusCode: Int, subStatusCode: Int): Boolean = {
+    statusCode == CosmosConstants.StatusCodes.NotFound &&
+      subStatusCode == 0
  }
 }
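
The signature change from CosmosException to raw (statusCode, subStatusCode) lets one classification serve both thrown exceptions and per-item bulk responses, and a 404 with sub-status 1002 (assumed here to be Cosmos DB's "read session not available" sub-status) is now retried as transient. An illustrative, self-contained sketch with the constants inlined — not the shipped code:

object TransientClassificationSketch extends App {
  val Gone = 410; val ServiceUnavailable = 503; val InternalServerError = 500
  val Timeout = 408; val NotFound = 404

  def canBeTransientFailure(statusCode: Int, subStatusCode: Int): Boolean =
    statusCode == Gone ||
      statusCode == ServiceUnavailable ||
      statusCode == InternalServerError ||
      statusCode == Timeout ||
      (statusCode == NotFound && subStatusCode == 1002) // assumed: read session not available

  // Both call sites now reduce to the same primitive inputs:
  println(canBeTransientFailure(503, 0))    // exception path: e.getStatusCode, e.getSubStatusCode
  println(canBeTransientFailure(404, 1002)) // bulk item path: itemResponse.getStatusCode/getSubStatusCode
  println(canBeTransientFailure(404, 0))    // a plain 404 stays non-transient
}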

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/PointWriter.scala

Lines changed: 6 additions & 6 deletions
@@ -176,12 +176,12 @@ class PointWriter(container: CosmosAsyncContainer, cosmosWriteConfig: CosmosWrit
         container.createItem(objectNode, partitionKeyValue, getOptions).block()
         return
       } catch {
-        case e: CosmosException if Exceptions.isResourceExistsException(e) =>
+        case e: CosmosException if Exceptions.isResourceExistsException(e.getStatusCode) =>
          // TODO: what should we do on unique index violation? should we ignore or throw?
          // TODO moderakh we need to add log messages extract identifier (id, pk) and log
          log.logItemWriteDetails(createOperation, "item already exists")
          return
-        case e: CosmosException if Exceptions.canBeTransientFailure(e) =>
+        case e: CosmosException if Exceptions.canBeTransientFailure(e.getStatusCode, e.getSubStatusCode) =>
          log.logWarning(
            s"create item $createOperation attempt #$attempt max remaining retries"
              + s"${cosmosWriteConfig.maxRetryCount + 1 - attempt}, encountered ${e.getMessage}")
@@ -209,7 +209,7 @@ class PointWriter(container: CosmosAsyncContainer, cosmosWriteConfig: CosmosWrit
           .block()
         return
       } catch {
-        case e: CosmosException if Exceptions.canBeTransientFailure(e) =>
+        case e: CosmosException if Exceptions.canBeTransientFailure(e.getStatusCode, e.getSubStatusCode) =>
          log.logWarning(
            s"upsert item $upsertOperation attempt #$attempt max remaining retries "
              + s"${cosmosWriteConfig.maxRetryCount + 1 - attempt}, encountered ${e.getMessage}")
@@ -250,13 +250,13 @@ class PointWriter(container: CosmosAsyncContainer, cosmosWriteConfig: CosmosWrit
           .block()
         return
       } catch {
-        case e: CosmosException if Exceptions.isNotFoundExceptionCore(e) =>
+        case e: CosmosException if Exceptions.isNotFoundExceptionCore(e.getStatusCode, e.getSubStatusCode) =>
          log.logItemWriteSkipped(deleteOperation, "notFound")
          return
-        case e: CosmosException if Exceptions.isPreconditionFailedException(e) && onlyIfNotModified =>
+        case e: CosmosException if Exceptions.isPreconditionFailedException(e.getStatusCode) && onlyIfNotModified =>
          log.logItemWriteSkipped(deleteOperation, "preConditionNotMet")
          return
-        case e: CosmosException if Exceptions.canBeTransientFailure(e) =>
+        case e: CosmosException if Exceptions.canBeTransientFailure(e.getStatusCode, e.getSubStatusCode) =>
          log.logWarning(
            s"delete item attempt #$attempt max remaining retries"
              + s"${cosmosWriteConfig.maxRetryCount + 1 - attempt}, encountered ${e.getMessage}")

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/TransientErrorsRetryPolicy.scala

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ private[spark] object TransientErrorsRetryPolicy extends BasicLoggingTrait {
     }
     catch {
       case cosmosException: CosmosException =>
-        if (Exceptions.canBeTransientFailure(cosmosException)) {
+        if (Exceptions.canBeTransientFailure(cosmosException.getStatusCode, cosmosException.getSubStatusCode)) {
          val retryCountSnapshot = retryCount.incrementAndGet()
          if (retryCountSnapshot > maxRetryCount) {
            logError(

sdk/cosmos/azure-cosmos-spark_3-1_2-12/src/main/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIterator.scala

Lines changed: 1 addition & 1 deletion
@@ -85,7 +85,7 @@ private class TransientIOErrorsRetryingIterator
     }
     catch {
       case cosmosException: CosmosException =>
-        if (Exceptions.canBeTransientFailure(cosmosException)) {
+        if (Exceptions.canBeTransientFailure(cosmosException.getStatusCode, cosmosException.getSubStatusCode)) {
          val retryCountSnapshot = retryCount.incrementAndGet()
          if (retryCountSnapshot > maxRetryCount) {
            logError(
