fix: DynamoDB - Avoid dropping sequence number progress (#1260)
* retry unprocessed items in BatchWriteItemRequest
* cleanup: configurability, backoffs for retries, and more idiomatic failures
* feedback
* add retry delay to debug log

---------

Co-authored-by: Peter Vlugter <[email protected]>
leviramsey and pvlugter authored Nov 22, 2024
1 parent 786d43a commit 17cd73d
Showing 3 changed files with 182 additions and 16 deletions.
8 changes: 8 additions & 0 deletions akka-projection-dynamodb/src/main/resources/reference.conf
@@ -27,6 +27,14 @@ akka.projection.dynamodb {
# reducing this may result in fewer restarts of the projection due to failure to query
# starting offsets.
offset-slice-read-parallelism = 64

# Batch writes are not automatically retried by the underlying SDK, so these settings govern those retries
retries {
max-retries = 3
min-backoff = 200ms
max-backoff = 2s
random-factor = 0.3
}
}

# By default it shares DynamoDB client with akka-persistence-dynamodb (write side).
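
A minimal sketch of overriding these retry settings from application config; the full path is inferred from the offset-store.retries lookup in the settings loader below, not spelled out in the diff:

import com.typesafe.config.ConfigFactory

// Sketch: application-level override of the new batch-write retry settings.
// The akka.projection.dynamodb.offset-store.retries path is an inference from
// this reference.conf fragment and the settings code below.
val retryOverrides = ConfigFactory.parseString("""
  akka.projection.dynamodb.offset-store.retries {
    max-retries = 5
    min-backoff = 500ms
    max-backoff = 5s
    random-factor = 0.3
  }
  """)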
@@ -46,7 +46,8 @@ object DynamoDBProjectionSettings {
warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"),
offsetBatchSize = config.getInt("offset-store.offset-batch-size"),
offsetSliceReadParallelism = config.getInt("offset-store.offset-slice-read-parallelism"),
timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")))
timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")),
retrySettings = RetrySettings(config.getConfig("offset-store.retries")))
}

/**
@@ -68,7 +69,8 @@ final class DynamoDBProjectionSettings private (
val warnAboutFilteredEventsInFlow: Boolean,
val offsetBatchSize: Int,
val offsetSliceReadParallelism: Int,
val timeToLiveSettings: TimeToLiveSettings) {
val timeToLiveSettings: TimeToLiveSettings,
val retrySettings: RetrySettings) {

def withTimestampOffsetTable(timestampOffsetTable: String): DynamoDBProjectionSettings =
copy(timestampOffsetTable = timestampOffsetTable)
@@ -106,6 +108,9 @@ final class DynamoDBProjectionSettings private (
def withTimeToLiveSettings(timeToLiveSettings: TimeToLiveSettings): DynamoDBProjectionSettings =
copy(timeToLiveSettings = timeToLiveSettings)

def withRetrySettings(retrySettings: RetrySettings): DynamoDBProjectionSettings =
copy(retrySettings = retrySettings)

@nowarn("msg=deprecated")
private def copy(
timestampOffsetTable: String = timestampOffsetTable,
@@ -114,7 +119,8 @@
warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
offsetBatchSize: Int = offsetBatchSize,
offsetSliceReadParallelism: Int = offsetSliceReadParallelism,
timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings) =
timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings,
retrySettings: RetrySettings = retrySettings) =
new DynamoDBProjectionSettings(
timestampOffsetTable,
useClient,
@@ -124,7 +130,8 @@
warnAboutFilteredEventsInFlow,
offsetBatchSize,
offsetSliceReadParallelism,
timeToLiveSettings)
timeToLiveSettings,
retrySettings)

override def toString =
s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $warnAboutFilteredEventsInFlow, $offsetBatchSize)"
@@ -200,3 +207,48 @@ final class ProjectionTimeToLiveSettings private (val offsetTimeToLive: Option[F
private def copy(offsetTimeToLive: Option[FiniteDuration]): ProjectionTimeToLiveSettings =
new ProjectionTimeToLiveSettings(offsetTimeToLive)
}

object RetrySettings {
val defaults: RetrySettings =
new RetrySettings(maxRetries = 3, minBackoff = 200.millis, maxBackoff = 2.seconds, randomFactor = 0.3)

def apply(config: Config): RetrySettings = {
new RetrySettings(
maxRetries = config.getInt("max-retries"),
minBackoff = config.getDuration("min-backoff").toScala,
maxBackoff = config.getDuration("max-backoff").toScala,
randomFactor = config.getDouble("random-factor"))
}
}

final class RetrySettings private (
val maxRetries: Int,
val minBackoff: FiniteDuration,
val maxBackoff: FiniteDuration,
val randomFactor: Double) {

def withMaxRetries(maxRetries: Int): RetrySettings =
copy(maxRetries = maxRetries)

def withMinBackoff(minBackoff: FiniteDuration): RetrySettings =
copy(minBackoff = minBackoff)

def withMinBackoff(minBackoff: JDuration): RetrySettings =
copy(minBackoff = minBackoff.toScala)

def withMaxBackoff(maxBackoff: FiniteDuration): RetrySettings =
copy(maxBackoff = maxBackoff)

def withMaxBackoff(maxBackoff: JDuration): RetrySettings =
copy(maxBackoff = maxBackoff.toScala)

def withRandomFactor(randomFactor: Double): RetrySettings =
copy(randomFactor = randomFactor)

private def copy(
maxRetries: Int = maxRetries,
minBackoff: FiniteDuration = minBackoff,
maxBackoff: FiniteDuration = maxBackoff,
randomFactor: Double = randomFactor) =
new RetrySettings(maxRetries, minBackoff, maxBackoff, randomFactor)
}
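
A minimal sketch of tuning the same retries programmatically via the withRetrySettings setter added above; the DynamoDBProjectionSettings(system) factory and an ActorSystem[_] named system are assumed, they are not shown in this diff:

import scala.concurrent.duration._

// Sketch: building settings with custom retry behaviour using the API added in this commit.
val tunedSettings =
  DynamoDBProjectionSettings(system) // assumed factory, loads the reference.conf defaults
    .withRetrySettings(
      RetrySettings.defaults
        .withMaxRetries(5)
        .withMinBackoff(500.millis)
        .withMaxBackoff(5.seconds)
        .withRandomFactor(0.2))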
@@ -17,6 +17,8 @@ import scala.jdk.FutureConverters._
import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.pattern.BackoffSupervisor
import akka.pattern.after
import akka.persistence.dynamodb.internal.InstantFactory
import akka.persistence.query.TimestampOffset
import akka.projection.ProjectionId
@@ -35,6 +37,7 @@ import software.amazon.awssdk.services.dynamodb.model.ReturnConsumedCapacity
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItemsRequest
import software.amazon.awssdk.services.dynamodb.model.WriteRequest
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse

/**
* INTERNAL API
@@ -59,6 +62,8 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
val timestampBySlicePid = AttributeValue.fromS("_")
val managementStateBySlicePid = AttributeValue.fromS("_mgmt")
}

final class BatchWriteFailed(val lastResponse: BatchWriteItemResponse) extends Exception
}

/**
@@ -70,6 +75,7 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
projectionId: ProjectionId,
client: DynamoDbAsyncClient) {
import OffsetStoreDao.log
import OffsetStoreDao.BatchWriteFailed
import OffsetStoreDao.MaxBatchSize
import OffsetStoreDao.MaxTransactItems
import system.executionContext
@@ -113,6 +119,47 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
}(ExecutionContext.parasitic)
}

implicit def sys: ActorSystem[_] = system

private def writeBatchWithRetries(
batchReq: BatchWriteItemRequest,
retries: Int = 0): Future[List[BatchWriteItemResponse]] = {
val result = client.batchWriteItem(batchReq).asScala

result.flatMap { response =>
if (response.hasUnprocessedItems && !response.unprocessedItems.isEmpty) {
if (retries >= settings.retrySettings.maxRetries) {
Future.failed(new BatchWriteFailed(response))
} else { // retry after exponential backoff
val unprocessed = response.unprocessedItems
val newReq = batchReq.toBuilder.requestItems(unprocessed).build()
val nextRetry = retries + 1
val delay = BackoffSupervisor.calculateDelay(
retries,
settings.retrySettings.minBackoff,
settings.retrySettings.maxBackoff,
settings.retrySettings.randomFactor)

if (log.isDebugEnabled) {
val count = unprocessed.asScala.valuesIterator.map(_.size).sum
log.debug(
"Not all writes in batch were applied, retrying in [{}]: [{}] unapplied writes, [{}/{}] retries",
delay.toCoarsest,
count,
nextRetry,
settings.retrySettings.maxRetries)
}

after(delay) {
writeBatchWithRetries(newReq, nextRetry)
}.map { responses => response :: responses }(ExecutionContext.parasitic)
}
} else {
Future.successful(List(response))
}
}
}
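
For context, the delay passed to "after" above grows roughly exponentially from min-backoff towards max-backoff, with jitter from random-factor. A rough sketch of the calculation for illustration only (an approximation, not the exact BackoffSupervisor.calculateDelay implementation and not part of this commit):

import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.duration._

// Approximation: double min-backoff per retry, apply random jitter, cap at max-backoff.
def approximateDelay(
    retry: Int,
    minBackoff: FiniteDuration,
    maxBackoff: FiniteDuration,
    randomFactor: Double): FiniteDuration = {
  val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor
  val exponential = minBackoff * math.pow(2, retry) min maxBackoff
  exponential * rnd min maxBackoff
}

// With the defaults (200ms, 2s, 0.3), retries 0, 1 and 2 wait roughly
// 200-260ms, 400-520ms and 800ms-1.04s respectively.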

def storeTimestampOffsets(offsetsBySlice: Map[Int, TimestampOffset]): Future[Done] = {
import OffsetStoreDao.OffsetStoreAttributes._

@@ -163,22 +210,37 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build()

val result = client.batchWriteItem(req).asScala
val result = writeBatchWithRetries(req)

if (log.isDebugEnabled()) {
result.foreach { response =>
result.foreach { responses =>
log.debug(
"Wrote latest timestamps for [{}] slices, consumed [{}] WCU",
offsetsBatch.size,
response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
}
}
result
.map(_ => Done)(ExecutionContext.parasitic)
.recoverWith {
case failed: BatchWriteFailed =>
val unprocessedSliceItems = failed.lastResponse.unprocessedItems
.get(settings.timestampOffsetTable)
.asScala
.toVector
.map(_.putRequest.item)

val unprocessedSlices = unprocessedSliceItems.map(_.get(NameSlice).s)
log.warn(
"Failed to write latest timestamps for [{}] slices: [{}]",
unprocessedSlices.size,
unprocessedSlices.mkString(", "))

Future.failed(failed)

case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContext.parasitic)
}
}

if (offsetsBySlice.size <= MaxBatchSize) {
@@ -221,18 +283,43 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build()

val result = client.batchWriteItem(req).asScala
val result = writeBatchWithRetries(req)

if (log.isDebugEnabled()) {
result.foreach { response =>
result.foreach { responses =>
log.debug(
"Wrote [{}] sequence numbers, consumed [{}] WCU",
recordsBatch.size,
response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
}
}

result.map(_ => Done)(ExecutionContext.parasitic)
result
.map(_ => Done)(ExecutionContext.parasitic)
.recoverWith {
case failed: BatchWriteFailed =>
val unprocessedSeqNrItems =
failed.lastResponse.unprocessedItems
.get(settings.timestampOffsetTable)
.asScala
.toVector
.map(_.putRequest.item)

val unprocessedSeqNrs = unprocessedSeqNrItems.map { item =>
import OffsetStoreDao.OffsetStoreAttributes._
s"${item.get(NameSlice).s}: ${item.get(Pid).s}"
}

log.warn(
"Failed to write sequence numbers for [{}] persistence IDs: [{}]",
unprocessedSeqNrs.size,
unprocessedSeqNrs.mkString(", "))

Future.failed(failed)

case c: CompletionException =>
Future.failed(c.getCause)
}
}

if (records.size <= MaxBatchSize) {
@@ -409,22 +496,41 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build()

val result = client.batchWriteItem(req).asScala
val result = writeBatchWithRetries(req)

if (log.isDebugEnabled()) {
result.foreach { response =>
result.foreach { responses =>
log.debug(
"Wrote management state for [{}] slices, consumed [{}] WCU",
slices.size,
response.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue()).sum)
responses.iterator.flatMap(_.consumedCapacity.iterator.asScala.map(_.capacityUnits.doubleValue)).sum)
}
}
result
.map(_ => Done)(ExecutionContext.parasitic)
.recoverWith {
case failed: BatchWriteFailed =>
val unprocessedStateItems =
failed.lastResponse.unprocessedItems
.get(settings.timestampOffsetTable)
.asScala
.toVector
.map(_.putRequest.item)

val unprocessedStates = unprocessedStateItems.map { item =>
s"${item.get(NameSlice).s}-${item.get(Paused).bool}"
}

log.warn(
"Failed to write management state for [{}] slices: [{}]",
unprocessedStates.size,
unprocessedStates.mkString(", "))

Future.failed(failed)

case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContext.parasitic)
}
}

val sliceRange = (minSlice to maxSlice).toVector
