From 5e4befa18f757ea16b05fea31436840d0372dc20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?= Date: Mon, 25 Nov 2024 11:42:24 +0000 Subject: [PATCH] postgres watch: checkpoints should move the high watermark revision In https://github.com/authzed/spicedb/pull/2139 we introduced the emission of checkpoint RevisionChanges when the postgres snapshot moves up out of band (outside of application changes). The fix missed to also move the locally-cached high watermark revision. As a consequence, on a PG-backed SpiceDB were the pg snapshot does not have an associated SpiceDB transaction, the Watch API will emit checkpoints at the same revision over and over, because its own internal high watermark hasn't moved. This changes the code so that `currentTxn`, which keeps track of the last checked revision, moves as well with an out-of-band snapshot revision. --- .../postgres/postgres_shared_test.go | 22 +++++++++++++------ internal/datastore/postgres/watch.go | 2 ++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go index 495eb3c593..0ba5783af6 100644 --- a/internal/datastore/postgres/postgres_shared_test.go +++ b/internal/datastore/postgres/postgres_shared_test.go @@ -1636,6 +1636,8 @@ func CheckpointsOnOutOfBandChangesTest(t *testing.T, ds datastore.Datastore) { // Run the watch API. changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{ Content: datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints, + // loop quickly over the changes to make sure we don't re-issue checkpoints + CheckpointInterval: 1 * time.Nanosecond, }) require.Zero(len(errchan)) @@ -1651,27 +1653,33 @@ func CheckpointsOnOutOfBandChangesTest(t *testing.T, ds datastore.Datastore) { require.NoError(err) require.True(newRevision.GreaterThan(lowestRevision)) - var checkedCheckpoint bool + awaitManyCheckpoints := time.NewTimer(1 * time.Second) + var checkpointCount int for { - if checkedCheckpoint { - break - } - changeWait := time.NewTimer(waitForChangesTimeout) select { case change, ok := <-changes: if !ok { - require.True(checkedCheckpoint, "expected checkpoint to be emitted") + require.Equal(1, checkpointCount, "expected checkpoint to be emitted") return } if change.IsCheckpoint { - checkedCheckpoint = change.Revision.GreaterThan(lowestRevision) + if change.Revision.Equal(newRevision) { + checkpointCount++ + } } time.Sleep(10 * time.Millisecond) case <-changeWait.C: require.Fail("timed out waiting for checkpoint for out of band change") + // we want to make sure checkpoints are not reissued when moving out-of-band, so with a short poll interval + // we wait a bit before checking if we received checkpoints, + case <-awaitManyCheckpoints.C: + if checkpointCount > 0 { + require.Equal(1, checkpointCount, "expected checkpoint to be emitted") + return + } } } } diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index f409bccef3..d91e2fa78c 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -201,6 +201,8 @@ func (pgd *pgDatastore) Watch( }) { return } + + currentTxn = *optionalHeadRevision } }