From b2e55afbdfea9feeb8aedd3755a38ba65887f862 Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Wed, 23 Oct 2024 12:01:43 -0400 Subject: [PATCH] Adjust CRDB watch to set the checkpoint frequency equal to the resolved Should reduce watch pressure from the caller to CRDB --- internal/datastore/crdb/crdb.go | 2 ++ internal/datastore/crdb/watch.go | 10 ++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index 16f67030f1..29ebee2083 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -200,6 +200,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas ), CommonDecoder: revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}, dburl: url, + version: version, watchBufferLength: config.watchBufferLength, watchBufferWriteTimeout: config.watchBufferWriteTimeout, watchConnectTimeout: config.watchConnectTimeout, @@ -286,6 +287,7 @@ type crdbDatastore struct { revisions.CommonDecoder dburl string + version crdbVersion readPool, writePool *pool.RetryPool watchBufferLength uint16 watchBufferWriteTimeout time.Duration diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index 883d14978a..3e93e23ff6 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -24,7 +24,7 @@ import ( ) const ( - queryChangefeed = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '0';" + queryChangefeed = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '%s';" queryChangefeedPreV22 = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s';" ) @@ -140,7 +140,13 @@ func (cds *crdbDatastore) watch( } resolvedDurationString := strconv.FormatInt(resolvedDuration.Milliseconds(), 10) + "ms" - interpolated := fmt.Sprintf(cds.beginChangefeedQuery, strings.Join(tableNames, ","), afterRevision, resolvedDurationString) + + var interpolated string + if cds.version.Major >= 22 { + interpolated = fmt.Sprintf(cds.beginChangefeedQuery, strings.Join(tableNames, ","), afterRevision, resolvedDurationString, resolvedDurationString) + } else { + interpolated = fmt.Sprintf(cds.beginChangefeedQuery, strings.Join(tableNames, ","), afterRevision, resolvedDurationString) + } sendError := func(err error) { if errors.Is(ctx.Err(), context.Canceled) {