Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DynamoDB replay rejected events #1275

Merged
merged 28 commits into from
Nov 29, 2024
Merged
Changes from 13 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f172c54
fix: DynamoDB Opt-in to replay rejected events
patriknw Nov 28, 2024
85b2450
better recover
patriknw Nov 28, 2024
56b5552
more providers
patriknw Nov 28, 2024
8424450
bump: akka-persistence-dynamodb 2.0.3
patriknw Nov 28, 2024
cb9063c
test
patriknw Nov 28, 2024
a882e01
verify that it finds all events
patriknw Nov 28, 2024
5281789
unrelated test fail
patriknw Nov 28, 2024
df90091
add test for clock skew on event writes
pvlugter Nov 29, 2024
5984826
move replayIfPossible to adapted handler implementation
pvlugter Nov 29, 2024
5d9dcb8
add opt-in config setting for replay
pvlugter Nov 29, 2024
05e65ec
throw on replay failed, fix off-by-one, add offset store log prefix
pvlugter Nov 29, 2024
3bd2dc8
only fail replay for queries on missing replay (unexpected count)
pvlugter Nov 29, 2024
886fba0
add initial replay and tests for at-least-once grouped
pvlugter Nov 29, 2024
fa7f1d7
remove unnecessary implicits
pvlugter Nov 29, 2024
67fa7de
add initial replay implementation and tests for flow handler
pvlugter Nov 29, 2024
2576390
downgrade expected replay count warning to debug for flow handler
pvlugter Nov 29, 2024
df0217f
cleanup envelope source, log
patriknw Nov 29, 2024
5bdf207
replay sequentially in grouped at-least-once
patriknw Nov 29, 2024
3e0425e
additional validation to avoid duplicates for grouped at-least-once
patriknw Nov 29, 2024
362e543
no need to loadEnvelope of replayed
patriknw Nov 29, 2024
fa031e1
minor test cleanup
patriknw Nov 29, 2024
c68d93d
enable by default
patriknw Nov 29, 2024
673e606
comments in validation
patriknw Nov 29, 2024
2f237b9
log warning every 1000
patriknw Nov 29, 2024
ecf092c
exactly once
patriknw Nov 29, 2024
ccfbf4b
exactly once grouped
patriknw Nov 29, 2024
ce483e5
extract some trivial logging
patriknw Nov 29, 2024
2951934
enable mima
patriknw Nov 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -52,6 +52,7 @@ import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Pid
import akka.projection.dynamodb.internal.DynamoDBOffsetStore.SeqNr
import akka.projection.dynamodb.scaladsl.DynamoDBProjection
import akka.projection.dynamodb.scaladsl.DynamoDBTransactHandler
import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider
import akka.projection.scaladsl.Handler
import akka.projection.scaladsl.SourceProvider
import akka.projection.testkit.scaladsl.ProjectionTestKit
@@ -104,11 +105,13 @@ object DynamoDBTimestampOffsetProjectionSpec {
class TestTimestampSourceProvider(
envelopes: immutable.IndexedSeq[EventEnvelope[String]],
testSourceProvider: TestSourceProvider[Offset, EventEnvelope[String]],
override val maxSlice: Int)
override val maxSlice: Int,
enableCurrentEventsByPersistenceId: Boolean)
extends SourceProvider[Offset, EventEnvelope[String]]
with BySlicesSourceProvider
with EventTimestampQuery
with LoadEventQuery {
with LoadEventQuery
with LoadEventsByPersistenceIdSourceProvider[String] {

override def source(offset: () => Future[Option[Offset]]): Future[Source[EventEnvelope[String], NotUsed]] =
testSourceProvider.source(offset)
@@ -142,6 +145,18 @@ object DynamoDBTimestampOffsetProjectionSpec {
s"Event with persistenceId [$persistenceId] and sequenceNr [$sequenceNr] not found."))
}
}

override private[akka] def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Option[Source[EventEnvelope[String], NotUsed]] = {
if (enableCurrentEventsByPersistenceId)
Some(Source(envelopes.filter { env =>
env.persistenceId == persistenceId && env.sequenceNr >= fromSequenceNr && env.sequenceNr <= toSequenceNr
}))
else
None
}
}

// test model is as simple as a text that gets other string concatenated to it
@@ -281,6 +296,16 @@ class DynamoDBTimestampOffsetProjectionSpec

def createSourceProvider(
envelopes: immutable.IndexedSeq[EventEnvelope[String]],
enableCurrentEventsByPersistenceId: Boolean = false,
complete: Boolean = true): TestTimestampSourceProvider = {
createSourceProviderWithMoreEnvelopes(envelopes, envelopes, enableCurrentEventsByPersistenceId, complete)
}

// envelopes are emitted by the "query" source, but allEnvelopes can be loaded
def createSourceProviderWithMoreEnvelopes(
envelopes: immutable.IndexedSeq[EventEnvelope[String]],
allEnvelopes: immutable.IndexedSeq[EventEnvelope[String]],
enableCurrentEventsByPersistenceId: Boolean,
complete: Boolean = true): TestTimestampSourceProvider = {
val sp =
TestSourceProvider[Offset, EventEnvelope[String]](Source(envelopes), _.offset)
@@ -294,7 +319,11 @@ class DynamoDBTimestampOffsetProjectionSpec
}
.withAllowCompletion(complete)

new TestTimestampSourceProvider(envelopes, sp, persistenceExt.numberOfSlices - 1)
new TestTimestampSourceProvider(
allEnvelopes,
sp,
persistenceExt.numberOfSlices - 1,
enableCurrentEventsByPersistenceId)
}

def createBacktrackingSourceProvider(
@@ -304,7 +333,11 @@ class DynamoDBTimestampOffsetProjectionSpec
TestSourceProvider[Offset, EventEnvelope[String]](Source(envelopes), _.offset)
.withStartSourceFrom { (_, _) => false } // include all
.withAllowCompletion(complete)
new TestTimestampSourceProvider(envelopes, sp, persistenceExt.numberOfSlices - 1)
new TestTimestampSourceProvider(
envelopes,
sp,
persistenceExt.numberOfSlices - 1,
enableCurrentEventsByPersistenceId = false)
}

private def latestOffsetShouldBe(expected: Any)(implicit offsetStore: DynamoDBOffsetStore) = {
@@ -857,6 +890,101 @@ class DynamoDBTimestampOffsetProjectionSpec
latestOffsetShouldBe(envelopes.last.offset)
}
}

"replay rejected sequence numbers" in {
val pid1 = UUID.randomUUID().toString
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val allEnvelopes = createEnvelopes(pid1, 6) ++ createEnvelopes(pid2, 3)
val envelopes = allEnvelopes.filterNot { env =>
(env.persistenceId == pid1 && (env.sequenceNr == 3 || env.sequenceNr == 4 || env.sequenceNr == 5)) ||
(env.persistenceId == pid2 && (env.sequenceNr == 1))
}

val sourceProvider =
createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true)

implicit val offsetStore: DynamoDBOffsetStore =
new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client)

val projectionRef = spawn(
ProjectionBehavior(
DynamoDBProjection
.atLeastOnce(
projectionId,
Some(settings.withReplayOnRejectedSequenceNumbers(true)),
sourceProvider,
handler = () => new ConcatHandler(repository))))

eventually {
projectedValueShouldBe("e1|e2|e3|e4|e5|e6")(pid1)
projectedValueShouldBe("e1|e2|e3")(pid2)
}

eventually {
latestOffsetShouldBe(allEnvelopes.last.offset)
}
projectionRef ! ProjectionBehavior.Stop
}

"replay rejected sequence numbers due to clock skew on event write" in {
val pid1 = UUID.randomUUID().toString
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val start = tick().instant()

def createEnvelopesFor(
pid: Pid,
fromSeqNr: Int,
toSeqNr: Int,
fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = {
(fromSeqNr to toSeqNr).map { n =>
createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n")
}
}

val envelopes1 =
createEnvelopesFor(pid1, 1, 2, start) ++
createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap
createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping

val envelopes2 =
createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++
createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9
createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap

val allEnvelopes = envelopes1 ++ envelopes2

val envelopes = allEnvelopes.sortBy(_.timestamp)

val sourceProvider =
createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true)

implicit val offsetStore: DynamoDBOffsetStore =
new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client)

val projectionRef = spawn(
ProjectionBehavior(
DynamoDBProjection
.atLeastOnce(
projectionId,
Some(settings.withReplayOnRejectedSequenceNumbers(true)),
sourceProvider,
handler = () => new ConcatHandler(repository))))

eventually {
projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid1)
projectedValueShouldBe("e1|e2|e3|e4|e5|e6|e7|e8|e9")(pid2)
}

eventually {
latestOffsetShouldBe(envelopes.last.offset)
}

projectionRef ! ProjectionBehavior.Stop
}
}

"A DynamoDB exactly-once projection with TimestampOffset" must {
@@ -1113,6 +1241,7 @@ class DynamoDBTimestampOffsetProjectionSpec
offsetShouldBeEmpty()
projectionTestKit.run(projection) {
projectedTestValueShouldBe("e1|e2|e5")
offsetStore.storedSeqNr(pid).futureValue shouldBe 6
}
latestOffsetShouldBe(envelopes.last.offset)
}
@@ -1436,6 +1565,126 @@ class DynamoDBTimestampOffsetProjectionSpec
latestOffsetShouldBe(envelopes.last.offset)
}
}

"replay rejected sequence numbers for at-least-once grouped" in {
val pid1 = UUID.randomUUID().toString
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val allEnvelopes = createEnvelopes(pid1, 6) ++ createEnvelopes(pid2, 3)
val envelopes = allEnvelopes.filterNot { env =>
(env.persistenceId == pid1 && (env.sequenceNr == 3 || env.sequenceNr == 4 || env.sequenceNr == 5)) ||
(env.persistenceId == pid2 && (env.sequenceNr == 1))
}

val sourceProvider =
createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true)

implicit val offsetStore: DynamoDBOffsetStore =
new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client)

val results = new ConcurrentHashMap[String, String]()

val handler: Handler[Seq[EventEnvelope[String]]] =
(envelopes: Seq[EventEnvelope[String]]) => {
Future {
envelopes.foreach { envelope =>
results.putIfAbsent(envelope.persistenceId, "|")
results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|")
}
}.map(_ => Done)
}

val projection =
DynamoDBProjection
.atLeastOnceGroupedWithin(
projectionId,
Some(settings.withReplayOnRejectedSequenceNumbers(true)),
sourceProvider,
handler = () => handler)
.withGroup(2, 3.seconds)

offsetShouldBeEmpty()

projectionTestKit.run(projection) {
results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|"
results.get(pid2) shouldBe "|e1|e2|e3|"
}

eventually {
latestOffsetShouldBe(allEnvelopes.last.offset)
}
}

"replay rejected sequence numbers due to clock skew on event write for at-least-once grouped" in {
val pid1 = UUID.randomUUID().toString
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val start = tick().instant()

def createEnvelopesFor(
pid: Pid,
fromSeqNr: Int,
toSeqNr: Int,
fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = {
(fromSeqNr to toSeqNr).map { n =>
createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n")
}
}

val envelopes1 =
createEnvelopesFor(pid1, 1, 2, start) ++
createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap
createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping

val envelopes2 =
createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++
createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9
createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap

val allEnvelopes = envelopes1 ++ envelopes2

val envelopes = allEnvelopes.sortBy(_.timestamp)

val sourceProvider =
createSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true)

implicit val offsetStore: DynamoDBOffsetStore =
new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client)

val results = new ConcurrentHashMap[String, String]()

val handler: Handler[Seq[EventEnvelope[String]]] =
(envelopes: Seq[EventEnvelope[String]]) => {
Future {
envelopes.foreach { envelope =>
results.putIfAbsent(envelope.persistenceId, "|")
results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|")
}
}.map(_ => Done)
}

val projection =
DynamoDBProjection
.atLeastOnceGroupedWithin(
projectionId,
Some(settings.withReplayOnRejectedSequenceNumbers(true)),
sourceProvider,
handler = () => handler)
.withGroup(2, 3.seconds)

offsetShouldBeEmpty()

projectionTestKit.run(projection) {
results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|"
results.get(pid2) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|"
}

eventually {
latestOffsetShouldBe(allEnvelopes.last.offset)
}
}
}

"A DynamoDB flow projection with TimestampOffset" must {
Original file line number Diff line number Diff line change
@@ -23,15 +23,19 @@ import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.EventTimestampQuery
import akka.persistence.query.typed.scaladsl.LoadEventQuery
import akka.projection.BySlicesSourceProvider
import akka.projection.eventsourced.scaladsl.EventSourcedProvider.LoadEventsByPersistenceIdSourceProvider
import akka.projection.scaladsl.SourceProvider
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

class TestSourceProviderWithInput()(implicit val system: ActorSystem[_])
class TestSourceProviderWithInput(enableCurrentEventsByPersistenceId: Boolean)(implicit val system: ActorSystem[_])
extends SourceProvider[TimestampOffset, EventEnvelope[String]]
with BySlicesSourceProvider
with EventTimestampQuery
with LoadEventQuery {
with LoadEventQuery
with LoadEventsByPersistenceIdSourceProvider[String] {

def this()(implicit system: ActorSystem[_]) = this(enableCurrentEventsByPersistenceId = false)

private implicit val ec: ExecutionContext = system.executionContext
private val persistenceExt = Persistence(system)
@@ -96,4 +100,22 @@ class TestSourceProviderWithInput()(implicit val system: ActorSystem[_])
s"Event with persistenceId [$persistenceId] and sequenceNr [$sequenceNr] not found."))
}
}

override private[akka] def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Option[Source[EventEnvelope[String], NotUsed]] = {
if (enableCurrentEventsByPersistenceId)
Some(
Source(
envelopes
.iterator()
.asScala
.filter { env =>
env.persistenceId == persistenceId && env.sequenceNr >= fromSequenceNr && env.sequenceNr <= toSequenceNr
}
.toVector))
else
None
}
}
3 changes: 3 additions & 0 deletions akka-projection-dynamodb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -44,6 +44,9 @@ akka.projection.dynamodb {
}
}

# Replay missed events for a particular persistence id when a sequence number is rejected by validation.
replay-on-rejected-sequence-numbers = off

# By default it shares DynamoDB client with akka-persistence-dynamodb (write side).
# To use a separate client for projections this can be
# set to another config path that defines the config based on
Loading