Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DynamoDB Timestamp validation of previous seqNr #1258

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -100,13 +100,15 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
private def createOffsetStore(
projectionId: ProjectionId,
customSettings: DynamoDBProjectionSettings = settings,
offsetStoreClock: TestClock = clock,
eventTimestampQueryClock: TestClock = clock) =
new DynamoDBOffsetStore(
projectionId,
Some(new TestTimestampSourceProvider(0, persistenceExt.numberOfSlices - 1, eventTimestampQueryClock)),
system,
customSettings,
client)
client,
offsetStoreClock)

def createEnvelope(pid: Pid, seqNr: SeqNr, timestamp: Instant, event: String): EventEnvelope[String] = {
val entityType = PersistenceId.extractEntityType(pid)
@@ -559,6 +561,11 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)

// some validations require the startTimestamp, which is set from readOffset
offsetStore.getState().startTimestampBySlice.size shouldBe 0
offsetStore.readOffset().futureValue
offsetStore.getState().startTimestampBySlice.values.toSet shouldBe Set(clock.instant())

val startTime = TestClock.nowMicros().instant()
val offset1 = TimestampOffset(startTime, Map("p1" -> 3L, "p2" -> 1L, "p3" -> 5L))
offsetStore.saveOffset(OffsetPidSeqNr(offset1, "p1", 3L)).futureValue
@@ -1033,5 +1040,59 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
// "set offset" in {
// "clear offset" in {

"validate timestamp of previous sequence number" in {
  import DynamoDBOffsetStore.Validation._

  val projectionName = UUID.randomUUID().toString

  // Creates an offset store covering the given slice range. All stores created here share the
  // same projection name and the shared test `clock`, which is used both as the offset store
  // clock and as the clock of the TestTimestampSourceProvider.
  def offsetStore(minSlice: Int, maxSlice: Int) =
    new DynamoDBOffsetStore(
      ProjectionId(projectionName, s"$minSlice-$maxSlice"),
      Some(new TestTimestampSourceProvider(minSlice, maxSlice, clock)),
      system,
      settings,
      client,
      clock)

  // one projection at lower scale
  val offsetStore1 = offsetStore(512, 1023)

  // two projections at higher scale
  val offsetStore2 = offsetStore(512, 767)

  val p1 = "p-0960" // slice 576
  val p2 = "p-6009" // slice 640
  val p3 = "p-3039" // slice 832

  val t0 = clock.instant().minusSeconds(100)
  def time(step: Int) = t0.plusSeconds(step)

  // starting with 2 projections, testing 512-1023
  offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(2), Map(p1 -> 1L)), p1, 1L)).futureValue
  offsetStore1.saveOffset(OffsetPidSeqNr(TimestampOffset(time(100), Map(p3 -> 1L)), p3, 1L)).futureValue

  // scaled up to 4 projections, testing 512-767
  offsetStore2.readOffset().futureValue
  // slice 576 has a stored offset, so its start timestamp is that offset's timestamp
  offsetStore2.getState().startTimestampBySlice(576) shouldBe time(2)
  // slice 640 has no stored offset, so its start timestamp is the clock time at readOffset
  val slice640StartTimestamp = offsetStore2.getState().startTimestampBySlice(640)
  slice640StartTimestamp shouldBe clock.instant()
  val latestTime = time(10)
  offsetStore2.saveOffset(OffsetPidSeqNr(TimestampOffset(latestTime, Map(p1 -> 2L)), p1, 2L)).futureValue
  offsetStore2.getState().latestTimestamp shouldBe latestTime

  // clock is used by TestTimestampSourceProvider.timestampOf for timestamp of previous seqNr.
  // rejected if timestamp of previous seqNr is after start timestamp minus backtracking window
  clock.setInstant(slice640StartTimestamp.minus(settings.backtrackingWindow.minusSeconds(1)))
  offsetStore2
    .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
    .futureValue shouldBe RejectedBacktrackingSeqNr
  // accepted if timestamp of previous seqNr is before start timestamp minus backtracking window.
  // Uses the backtracking window (not the time window) so that the acceptance boundary checked
  // here is the same one that the rejected case above sits just inside of.
  clock.setInstant(slice640StartTimestamp.minus(settings.backtrackingWindow.plusSeconds(1)))
  offsetStore2
    .validate(backtrackingEnvelope(createEnvelope(p2, 4L, latestTime.minusSeconds(20), "event4")))
    .futureValue shouldBe Accepted

}

}
}
6 changes: 6 additions & 0 deletions akka-projection-dynamodb/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -12,6 +12,12 @@ akka.projection.dynamodb {
# within this time window from latest offset.
time-window = 5 minutes

# Backtracking window of the source (query). Should be equal to
# the akka.persistence.dynamodb.query.backtracking.window that is used for the
# SourceProvider.
# It should not be larger than the akka.projection.dynamodb.offset-store.time-window.
backtracking-window = ${akka.persistence.dynamodb.query.backtracking.window}

# Trying to batch insert offsets in batches of this size.
offset-batch-size = 20

Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@ object DynamoDBProjectionSettings {
timestampOffsetTable = config.getString("offset-store.timestamp-offset-table"),
useClient = config.getString("use-client"),
timeWindow = config.getDuration("offset-store.time-window"),
backtrackingWindow = config.getDuration("offset-store.backtracking-window"),
keepNumberOfEntries = 0,
evictInterval = JDuration.ZERO,
warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"),
@@ -61,6 +62,7 @@ final class DynamoDBProjectionSettings private (
val timestampOffsetTable: String,
val useClient: String,
val timeWindow: JDuration,
val backtrackingWindow: JDuration,
@deprecated("Not used, evict is only based on time window", "1.6.2")
val keepNumberOfEntries: Int,
@deprecated("Not used, evict is not periodic", "1.6.2")
@@ -82,6 +84,12 @@ final class DynamoDBProjectionSettings private (
/** Returns the result of `copy` with `timeWindow` replaced by the given duration. */
def withTimeWindow(timeWindow: JDuration): DynamoDBProjectionSettings =
copy(timeWindow = timeWindow)

/**
 * Sets the backtracking window from a Scala duration. The value is converted with `toJava`
 * and handed to the `java.time.Duration` overload.
 */
def withBacktrackingWindow(backtrackingWindow: FiniteDuration): DynamoDBProjectionSettings =
  withBacktrackingWindow(backtrackingWindow.toJava)

/** Sets the backtracking window from a `java.time.Duration`, leaving all other settings as they are. */
def withBacktrackingWindow(backtrackingWindow: JDuration): DynamoDBProjectionSettings = {
  val updatedWindow: JDuration = backtrackingWindow
  copy(backtrackingWindow = updatedWindow)
}

/** Deprecated no-op: the `keepNumberOfEntries` argument is ignored and this instance is returned unchanged. */
@deprecated("Not used, evict is only based on time window", "1.6.2")
def withKeepNumberOfEntries(keepNumberOfEntries: Int): DynamoDBProjectionSettings =
this
@@ -111,6 +119,7 @@ final class DynamoDBProjectionSettings private (
timestampOffsetTable: String = timestampOffsetTable,
useClient: String = useClient,
timeWindow: JDuration = timeWindow,
backtrackingWindow: JDuration = backtrackingWindow,
warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
offsetBatchSize: Int = offsetBatchSize,
offsetSliceReadParallelism: Int = offsetSliceReadParallelism,
@@ -119,6 +128,7 @@ final class DynamoDBProjectionSettings private (
timestampOffsetTable,
useClient,
timeWindow,
backtrackingWindow,
keepNumberOfEntries,
evictInterval,
warnAboutFilteredEventsInFlow,
@@ -127,7 +137,7 @@ final class DynamoDBProjectionSettings private (
timeToLiveSettings)

override def toString =
s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $warnAboutFilteredEventsInFlow, $offsetBatchSize)"
s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $backtrackingWindow, $warnAboutFilteredEventsInFlow, $offsetBatchSize)"
}

object TimeToLiveSettings {
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@

package akka.projection.dynamodb.internal

import java.time.Clock
import java.time.Instant
import java.time.{ Duration => JDuration }
import java.util.concurrent.atomic.AtomicReference
@@ -73,18 +74,21 @@ private[projection] object DynamoDBOffsetStore {
fromSnapshot: Boolean)

object State {
val empty: State = State(Map.empty, Map.empty, Map.empty)
val empty: State = State(Map.empty, Map.empty, Map.empty, Map.empty)

def apply(offsetBySlice: Map[Int, TimestampOffset]): State =
if (offsetBySlice.isEmpty) empty
else new State(Map.empty, Map.empty, offsetBySlice)
def apply(offsetBySlice: Map[Int, TimestampOffset], startTimestampBySlice: Map[Int, Instant]): State =
if (offsetBySlice.isEmpty && startTimestampBySlice.isEmpty)
empty
else
new State(Map.empty, Map.empty, offsetBySlice, startTimestampBySlice)

}

final case class State(
byPid: Map[Pid, Record],
bySliceSorted: Map[Int, TreeSet[Record]],
offsetBySlice: Map[Int, TimestampOffset]) {
offsetBySlice: Map[Int, TimestampOffset],
startTimestampBySlice: Map[Int, Instant]) {

def size: Int = byPid.size

@@ -191,7 +195,8 @@ private[projection] class DynamoDBOffsetStore(
sourceProvider: Option[BySlicesSourceProvider],
system: ActorSystem[_],
settings: DynamoDBProjectionSettings,
client: DynamoDbAsyncClient) {
client: DynamoDbAsyncClient,
clock: Clock = Clock.systemUTC()) {

import DynamoDBOffsetStore._

@@ -282,7 +287,13 @@ private[projection] class DynamoDBOffsetStore(
})

offsetBySliceFut.map { offsetBySlice =>
val newState = State(offsetBySlice)
val now = clock.instant()
val startTimestampBySlice =
(minSlice to maxSlice).map { slice =>
slice -> offsetBySlice.get(slice).map(_.timestamp).getOrElse(now)
}.toMap

val newState = State(offsetBySlice, startTimestampBySlice)

if (!state.compareAndSet(oldState, newState))
throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.")
@@ -560,32 +571,6 @@ private[projection] class DynamoDBOffsetStore(
recordWithOffset.offset)
}

// Logs the rejection of an envelope whose sequence number is unknown. The level and wording
// depend on where the envelope came from: pub-sub and regular query envelopes are logged at
// debug, backtracking envelopes at warn.
def logUnknown(): Unit =
  if (recordWithOffset.fromPubSub)
    logger.debug(
      "{} Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
      logPrefix,
      seqNr,
      pid,
      recordWithOffset.offset)
  else if (recordWithOffset.fromBacktracking)
    logger.warn(
      "{} Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}",
      logPrefix,
      seqNr,
      pid,
      recordWithOffset.offset)
  else
    // Neither pub-sub nor backtracking.
    // This may happen rather frequently when using `publish-events`, after reconnecting and such.
    logger.debug(
      "{} Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
      logPrefix,
      seqNr,
      pid,
      recordWithOffset.offset)

if (prevSeqNr > 0) {
// expecting seqNr to be +1 of previously known
val ok = seqNr == prevSeqNr + 1
@@ -613,35 +598,7 @@ private[projection] class DynamoDBOffsetStore(
// always accept starting from snapshots when there was no previous event seen
FutureAccepted
} else {
// Haven't seen this pid within the time window. Since events can be missed
// when read at the tail we will only accept it if the event with previous seqNr has timestamp
// before the time window of the offset store.
// Backtracking will emit missed event again.
timestampOf(pid, seqNr - 1).map {
case Some(previousTimestamp) =>
val before = currentState.latestTimestamp.minus(settings.timeWindow)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This latestTimestamp is not good for DynamoDB since some slices may progress further than others.

I don't really expect this to happen often with DynamoDB since we don't delete offsets in the same way, but if we keep the check it's better that it's the same as in r2dbc.

if (previousTimestamp.isBefore(before)) {
logger.debug(
"{} Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
"is before time window [{}].",
logPrefix,
pid,
seqNr,
previousTimestamp,
before)
Accepted
} else if (!recordWithOffset.fromBacktracking) {
logUnknown()
RejectedSeqNr
} else {
logUnknown()
// This will result in projection restart (with normal configuration)
RejectedBacktrackingSeqNr
}
case None =>
// previous not found, could have been deleted
Accepted
}
validateEventTimestamp(currentState, recordWithOffset)
}
} else {
// strictSeqNr == false is for durable state where each revision might not be visible
@@ -663,6 +620,66 @@ private[projection] class DynamoDBOffsetStore(
}
}

/**
 * Validates an envelope by comparing the timestamp of the event with the previous sequence
 * number (looked up with `timestampOf(pid, seqNr - 1)`) against the start timestamp of the
 * envelope's slice minus `settings.backtrackingWindow`.
 *
 * The resulting future completes with:
 *  - `Accepted` when the previous event's timestamp is before that threshold, or when no
 *    timestamp is found for the previous sequence number,
 *  - `RejectedBacktrackingSeqNr` when the envelope is from backtracking (and not from pub-sub)
 *    and the previous timestamp is not before the threshold,
 *  - `RejectedSeqNr` otherwise.
 *
 * Note: `currentState.startTimestampBySlice(slice)` is a plain map lookup and fails if the
 * slice has no entry; this assumes `readOffset` has populated the start timestamps for the
 * slice range of this offset store.
 *
 * @param currentState the offset store state providing the start timestamp per slice
 * @param recordWithOffset the record (pid, seqNr, slice) and origin flags of the envelope
 */
private def validateEventTimestamp(currentState: State, recordWithOffset: RecordWithOffset) = {
  import Validation._
  val pid = recordWithOffset.record.pid
  val seqNr = recordWithOffset.record.seqNr
  val slice = recordWithOffset.record.slice

  // Haven't seen this pid within the time window. Since events can be missed
  // when read at the tail we will only accept it if the event with previous seqNr has timestamp
  // before the startTimestamp minus backtracking window
  timestampOf(pid, seqNr - 1).map {
    case Some(previousTimestamp) =>
      // looked up once and reused for the threshold and for the log messages below
      val startTimestamp = currentState.startTimestampBySlice(slice)
      val acceptBefore = startTimestamp.minus(settings.backtrackingWindow)

      if (previousTimestamp.isBefore(acceptBefore)) {
        logger.debug(
          "{} Accepting envelope with pid [{}], seqNr [{}], where previous event timestamp [{}] " +
          "is before start timestamp [{}] minus backtracking window [{}].",
          logPrefix,
          pid,
          seqNr,
          previousTimestamp,
          startTimestamp,
          settings.backtrackingWindow)
        Accepted
      } else if (recordWithOffset.fromPubSub) {
        logger.debug(
          "{} Rejecting pub-sub envelope, unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
          logPrefix,
          seqNr,
          pid,
          recordWithOffset.offset)
        RejectedSeqNr
      } else if (recordWithOffset.fromBacktracking) {
        // This will result in projection restart (with normal configuration)
        logger.warn(
          "{} Rejecting unknown sequence number [{}] for pid [{}]. Offset: {}, where previous event timestamp [{}] " +
          "is after start timestamp [{}] minus backtracking window [{}].",
          logPrefix,
          seqNr,
          pid,
          recordWithOffset.offset,
          previousTimestamp,
          startTimestamp,
          settings.backtrackingWindow)
        RejectedBacktrackingSeqNr
      } else {
        // This may happen rather frequently when using `publish-events`, after reconnecting and such.
        logger.debug(
          "{} Rejecting unknown sequence number [{}] for pid [{}] (might be accepted later): {}",
          logPrefix,
          seqNr,
          pid,
          recordWithOffset.offset)
        // Backtracking will emit missed event again.
        RejectedSeqNr
      }
    case None =>
      // previous not found, could have been deleted
      logger.debug(
        "{} Accepting envelope with pid [{}], seqNr [{}], where previous event not found.",
        logPrefix,
        pid,
        seqNr)
      Accepted
  }
}

@tailrec final def addInflight[Envelope](envelope: Envelope): Unit = {
createRecordWithOffset(envelope) match {
case Some(recordWithOffset) =>