From 385542ca917f0310909274cd50f4b03808cd9d4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Mon, 25 Nov 2024 11:42:24 +0000 Subject: [PATCH] postgres watch: checkpoints should move the high watermark revision In https://github.com/authzed/spicedb/pull/2139 we introduced the emission of checkpoint RevisionChanges when the postgres snapshot moves up out of band (outside of application changes). The fix missed to also move the locally-cached high watermark revision. As a consequence, on a PG-backed SpiceDB were the pg snapshot does not have an associated SpiceDB transaction, the Watch API will emit checkpoints at the same revision over and over, because its own internal high watermark hasn't moved. This changes the code so that `currentTxn`, which keeps track of the last checked revision, moves as well with an out-of-band snapshot revision. --- .../postgres/postgres_shared_test.go | 30 ++++++++++++++----- internal/datastore/postgres/watch.go | 2 ++ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go index 495eb3c593..b1e59a7943 100644 --- a/internal/datastore/postgres/postgres_shared_test.go +++ b/internal/datastore/postgres/postgres_shared_test.go @@ -1636,6 +1636,8 @@ func CheckpointsOnOutOfBandChangesTest(t *testing.T, ds datastore.Datastore) { // Run the watch API. changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{ Content: datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints, + // loop quickly over the changes to make sure we don't re-issue checkpoints + CheckpointInterval: 10 * time.Nanosecond, }) require.Zero(len(errchan)) @@ -1651,29 +1653,41 @@ func CheckpointsOnOutOfBandChangesTest(t *testing.T, ds datastore.Datastore) { require.NoError(err) require.True(newRevision.GreaterThan(lowestRevision)) - var checkedCheckpoint bool + awaitManyCheckpoints := time.NewTimer(1 * time.Second) + checkpointCount := make(map[string]int) for { - if checkedCheckpoint { - break - } - changeWait := time.NewTimer(waitForChangesTimeout) select { case change, ok := <-changes: if !ok { - require.True(checkedCheckpoint, "expected checkpoint to be emitted") + for _, count := range checkpointCount { + require.Equal(1, count, "found duplicated checkpoint revision event") + } + return } if change.IsCheckpoint { - checkedCheckpoint = change.Revision.GreaterThan(lowestRevision) + if change.Revision.Equal(newRevision) || change.Revision.GreaterThan(newRevision) { + checkpointCount[change.Revision.String()] = checkpointCount[change.Revision.String()] + 1 + } } time.Sleep(10 * time.Millisecond) case <-changeWait.C: require.Fail("timed out waiting for checkpoint for out of band change") + // we want to make sure checkpoints are not reissued when moving out-of-band, so with a short poll interval + // we wait a bit before checking if we received checkpoints, + case <-awaitManyCheckpoints.C: + if len(checkpointCount) > 0 { + for _, count := range checkpointCount { + require.Equal(1, count, "found duplicated checkpoint revision event") + } + + return + } } } } -const waitForChangesTimeout = 5 * time.Second +const waitForChangesTimeout = 30 * time.Second diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index f409bccef3..d91e2fa78c 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -201,6 +201,8 @@ func (pgd *pgDatastore) Watch( }) { return } + + currentTxn = *optionalHeadRevision } }