From 785d167b928c19be3d53fe752a1f362aaf19e7ad Mon Sep 17 00:00:00 2001 From: Bob Vawter Date: Mon, 30 Oct 2023 17:22:51 -0400 Subject: [PATCH] stage: Track applied mutations in staging tables This change moves tracking of partially-applied resolved timestamp windows into the staging tables by adding a new `applied` column. The goal of this change is to move some state-tracking out of the cdc resolver loop into the stage package. Tracking apply status on a per-mutation basis improves idempotency of cdc-sink when the userscript has non-idempotent behaviors (e.g.: three-way merge). It also allows us to export monitoring data around mutations which may have slipped through the cracks or to detect when a migration process has completely drained. Fine-grained tracking will also be useful for unifying the non-transactional modes into a single behavior. Many unused methods in the stage API have been deleted. The "unstaging" SQL query is now generated with a golang template and is tested similarly to the apply package. The cdc package performs less work to track partial application of large individual changes. It just persists the contents of the UnstageCursor as a performance enhancement. Exactly-once behavior is provided by the applied column. The change to `server/integration_test.go` is due to the unstage processing being a one-shot. The test being performed duplicates an existing test in `cdc/handler_test.go`. Breaking change: The `--selectBatchSize` flag is deprecated in favor of two different flags `--largeTransactionLimit` and `--timestampWindowSize` which, respectively, enable partial processing of a single, over-sized transaction and a general limit on the total amount of data to be unstaged. Breaking change: A staging schema migraion is required, this is documented in the migrations directory. X-Ref: #487 X-Ref: #504 X-Ref: #565 --- internal/sinktest/all/fixture.go | 32 ++ internal/source/cdc/config.go | 85 +++-- internal/source/cdc/handler_test.go | 79 +---- internal/source/cdc/resolved_stamp.go | 72 +--- internal/source/cdc/resolver.go | 265 ++++++++------ internal/source/server/integration_test.go | 40 --- internal/staging/stage/factory.go | 263 ++++---------- internal/staging/stage/metrics.go | 6 +- internal/staging/stage/queries/unstage.tmpl | 107 ++++++ internal/staging/stage/stage.go | 260 +++++--------- internal/staging/stage/stage_test.go | 324 +++++++----------- internal/staging/stage/templates.go | 59 ++++ internal/staging/stage/templates_test.go | 94 +++++ internal/staging/stage/testdata/unstage.sql | 83 +++++ .../staging/stage/testdata/unstage_limit.sql | 91 +++++ internal/staging/version/versions.go | 1 + internal/types/types.go | 91 +++-- migrations/pr_572.md | 40 +++ 18 files changed, 1067 insertions(+), 925 deletions(-) create mode 100644 internal/staging/stage/queries/unstage.tmpl create mode 100644 internal/staging/stage/templates.go create mode 100644 internal/staging/stage/templates_test.go create mode 100644 internal/staging/stage/testdata/unstage.sql create mode 100644 internal/staging/stage/testdata/unstage_limit.sql create mode 100644 migrations/pr_572.md diff --git a/internal/sinktest/all/fixture.go b/internal/sinktest/all/fixture.go index f7c4fde35..31f525f80 100644 --- a/internal/sinktest/all/fixture.go +++ b/internal/sinktest/all/fixture.go @@ -26,7 +26,9 @@ import ( "github.com/cockroachdb/cdc-sink/internal/types" "github.com/cockroachdb/cdc-sink/internal/util/applycfg" "github.com/cockroachdb/cdc-sink/internal/util/diag" + "github.com/cockroachdb/cdc-sink/internal/util/hlc" "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/jackc/pgx/v5" "github.com/pkg/errors" ) @@ -76,3 +78,33 @@ func (f *Fixture) CreateTargetTable( } return ti, err } + +// PeekStaged peeks at the data which has been staged for the target +// table between the given timestamps. +func (f *Fixture) PeekStaged( + ctx context.Context, tbl ident.Table, startAt, endBefore hlc.Time, +) ([]types.Mutation, error) { + tx, err := f.StagingPool.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return nil, errors.WithStack(err) + } + defer func() { _ = tx.Rollback(ctx) }() + + q := &types.UnstageCursor{ + StartAt: startAt, + EndBefore: endBefore, + Targets: []ident.Table{tbl}, + } + var ret []types.Mutation + for selecting := true; selecting; { + q, selecting, err = f.Stagers.Unstage(ctx, tx, q, + func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { + ret = append(ret, mut) + return nil + }) + if err != nil { + return nil, err + } + } + return ret, nil +} diff --git a/internal/source/cdc/config.go b/internal/source/cdc/config.go index 97e98c3d0..c0cbfdde1 100644 --- a/internal/source/cdc/config.go +++ b/internal/source/cdc/config.go @@ -27,10 +27,12 @@ import ( ) const ( - defaultBackupPolling = 100 * time.Millisecond - defaultFlushBatchSize = 1_000 - defaultSelectBatchSize = 10_000 - defaultNDJsonBuffer = bufio.MaxScanTokenSize // 64k + defaultBackupPolling = 100 * time.Millisecond + defaultIdealBatchSize = 1000 + defaultMetaTable = "resolved_timestamps" + defaultLargeTransactionLimit = 250_000 // Chosen arbitrarily. + defaultNDJsonBuffer = bufio.MaxScanTokenSize // 64k + defaultTimestampWindowSize = 1000 ) // Config adds CDC-specific configuration to the core logical loop. @@ -43,27 +45,31 @@ type Config struct { // timestamps will receive the incoming resolved-timestamp message. BackupPolling time.Duration - // Don't coalesce updates from different source MVCC timestamps into - // a single destination transaction. Setting this will preserve the - // original structure of updates from the source, but may decrease - // overall throughput by increasing the total number of target - // database transactions. + // If true, the resolver loop will behave as though + // TimestampWindowSize has been set to 1. That is, each unique + // timestamp within a resolved-timestamp window will be processed, + // instead of fast-forwarding to the latest values within a batch. + // This flag is relevant for use-cases that employ a merge function + // which must see every incremental update in order to produce a + // meaningful result. FlushEveryTimestamp bool - // Coalesce timestamps within a resolved-timestamp window until - // at least this many mutations have been collected. + // This setting controls a soft maximum on the number of rows that + // will be flushed in one target transaction. IdealFlushBatchSize int - // The maximum amount of data to buffer when reading a single line - // of ndjson input. This can be increased if the source cluster - // has large blob values. - NDJsonBuffer int + // If non-zero and the number of rows in a single timestamp exceeds + // this value, allow that source transaction to be applied over + // multiple target transactions. + LargeTransactionLimit int // The name of the resolved_timestamps table. MetaTableName ident.Ident - // The number of rows to retrieve when loading staged data. - SelectBatchSize int + // The maximum amount of data to buffer when reading a single line + // of ndjson input. This can be increased if the source cluster + // has large blob values. + NDJsonBuffer int // Retain staged, applied data for an extra amount of time. This // allows, for example, additional time to validate what was staged @@ -71,6 +77,13 @@ type Config struct { // be retired as soon as there is a resolved timestamp greater than // the timestamp of the mutation. RetireOffset time.Duration + + // The maximum number of source transactions to unstage at once. + // This does not place a hard limit on the number of mutations that + // may be dequeued at once, but it does reduce the total number of + // round-trips to the staging database when the average transaction + // size is small. + TimestampWindowSize int } // Bind adds configuration flags to the set. @@ -80,19 +93,27 @@ func (c *Config) Bind(f *pflag.FlagSet) { f.DurationVar(&c.BackupPolling, "backupPolling", defaultBackupPolling, "poll for resolved timestamps from other instances of cdc-sink") f.BoolVar(&c.FlushEveryTimestamp, "flushEveryTimestamp", false, - "preserve intermediate updates from the source in transactional mode; "+ + "don't fast-forward to the latest row values within a resolved timestamp; "+ "may negatively impact throughput") - f.IntVar(&c.IdealFlushBatchSize, "idealFlushBatchSize", defaultFlushBatchSize, - "try to apply at least this many mutations per resolved-timestamp window") + f.IntVar(&c.IdealFlushBatchSize, "idealFlushBatchSize", defaultIdealBatchSize, + "a soft limit on the number of rows to send to the target at once") + f.IntVar(&c.LargeTransactionLimit, "largeTransactionLimit", defaultLargeTransactionLimit, + "if non-zero, all source transactions with more than this "+ + "number of rows may be applied in multiple target transactions") f.IntVar(&c.NDJsonBuffer, "ndjsonBufferSize", defaultNDJsonBuffer, "the maximum amount of data to buffer while reading a single line of ndjson input; "+ "increase when source cluster has large blob values") - f.Var(ident.NewValue("resolved_timestamps", &c.MetaTableName), "metaTable", + f.Var(ident.NewValue(defaultMetaTable, &c.MetaTableName), "metaTable", "the name of the table in which to store resolved timestamps") - f.IntVar(&c.SelectBatchSize, "selectBatchSize", defaultSelectBatchSize, - "the number of rows to select at once when reading staged data") f.DurationVar(&c.RetireOffset, "retireOffset", 0, - "retain staged, applied data for an extra duration") + "if non-zero, retain staged, applied data for an extra duration") + f.IntVar(&c.TimestampWindowSize, "timestampWindowSize", defaultTimestampWindowSize, + "the maximum number of source transaction timestamps to unstage at once") + + var deprecated int + const msg = "replaced by --largeTransactionLimit and --timestampWindowSize" + f.IntVar(&deprecated, "selectBatchSize", 0, "") + _ = f.MarkDeprecated("selectBatchSize", msg) } // Preflight implements logical.Config. @@ -105,20 +126,26 @@ func (c *Config) Preflight() error { c.BackupPolling = defaultBackupPolling } if c.IdealFlushBatchSize == 0 { - c.IdealFlushBatchSize = defaultFlushBatchSize + c.IdealFlushBatchSize = defaultIdealBatchSize + } + // Zero is legal; implies no limit. + if c.LargeTransactionLimit < 0 { + return errors.New("largeTransactionLimit must be >= 0") } if c.NDJsonBuffer == 0 { c.NDJsonBuffer = defaultNDJsonBuffer } if c.MetaTableName.Empty() { - return errors.New("no metadata table specified") - } - if c.SelectBatchSize == 0 { - c.SelectBatchSize = defaultSelectBatchSize + c.MetaTableName = ident.New(defaultMetaTable) } if c.RetireOffset < 0 { return errors.New("retireOffset must be >= 0") } + if c.TimestampWindowSize < 0 { + return errors.New("timestampWindowSize must be >= 0") + } else if c.TimestampWindowSize == 0 { + c.TimestampWindowSize = defaultTimestampWindowSize + } return nil } diff --git a/internal/source/cdc/handler_test.go b/internal/source/cdc/handler_test.go index bf28e873d..77f0d091d 100644 --- a/internal/source/cdc/handler_test.go +++ b/internal/source/cdc/handler_test.go @@ -160,10 +160,8 @@ func testQueryHandler(t *testing.T, htc *fixtureConfig) { // Verify staged data, if applicable. if !htc.immediate { - stager, err := fixture.Stagers.Get(ctx, tableInfo.Name()) + muts, err := fixture.PeekStaged(ctx, tableInfo.Name(), hlc.Zero(), hlc.New(1, 1)) if a.NoError(err) { - muts, err := stager.Select(ctx, fixture.StagingPool.Pool, hlc.Zero(), hlc.New(1, 0)) - a.NoError(err) a.Len(muts, 2) // The order is stable since the underlying query // is ordered, in part, by the key. @@ -229,40 +227,14 @@ func testQueryHandler(t *testing.T, htc *fixtureConfig) { `), })) - // Verify staged data, if applicable. + // Verify staged and un-applied data, if applicable. if !htc.immediate { - stager, err := fixture.Stagers.Get(ctx, tableInfo.Name()) + muts, err := fixture.PeekStaged(ctx, tableInfo.Name(), hlc.Zero(), hlc.New(10, 1)) if a.NoError(err) { - muts, err := stager.Select(ctx, fixture.StagingPool.Pool, hlc.Zero(), hlc.New(10, 0)) - a.NoError(err) - a.Len(muts, 6) + a.Len(muts, 2) // The order is stable since the underlying query // orders by HLC and key. a.Equal([]types.Mutation{ - // Original entries. - { - Data: []byte(`{"pk":42,"v":99}`), - Key: []byte(`[42]`), - Time: hlc.New(1, 0), - }, - { - Before: []byte(`{"pk":99,"v":33}`), - Data: []byte(`{"pk":99,"v":42}`), - Key: []byte(`[99]`), - Time: hlc.New(1, 0), - }, - // These are the deletes from above. - { - Data: []byte(`null`), - Key: []byte(`[42]`), - Time: hlc.New(3, 0), - }, - { - Data: []byte(`null`), - Key: []byte(`[99]`), - Time: hlc.New(3, 0), - }, - // These are the entries we just created. { Data: []byte(`{"pk":42,"v":99}`), Key: []byte(`[42]`), @@ -384,10 +356,8 @@ func testHandler(t *testing.T, cfg *fixtureConfig) { // Verify staged data, if applicable. if !cfg.immediate { - stager, err := fixture.Stagers.Get(ctx, jumbleName()) + muts, err := fixture.PeekStaged(ctx, tableInfo.Name(), hlc.Zero(), hlc.New(1, 1)) if a.NoError(err) { - muts, err := stager.Select(ctx, fixture.StagingPool.Pool, hlc.Zero(), hlc.New(1, 0)) - a.NoError(err) a.Len(muts, 2) // The order is stable since the underlying query // orders, in part, on key @@ -454,40 +424,14 @@ func testHandler(t *testing.T, cfg *fixtureConfig) { `, jumbleName().Table())), })) - // Verify staged data, if applicable. + // Verify staged and as-yet-unapplied data. if !cfg.immediate { - stager, err := fixture.Stagers.Get(ctx, jumbleName()) + muts, err := fixture.PeekStaged(ctx, tableInfo.Name(), hlc.Zero(), hlc.New(10, 1)) if a.NoError(err) { - muts, err := stager.Select(ctx, fixture.StagingPool.Pool, hlc.Zero(), hlc.New(10, 0)) - a.NoError(err) - a.Len(muts, 6) + a.Len(muts, 2) // The order is stable since the underlying query // orders by HLC and key. a.Equal([]types.Mutation{ - // Original entries. - { - Data: []byte(`{ "pk" : 42, "v" : 99 }`), - Key: []byte(`[ 42 ]`), - Time: hlc.New(1, 0), - }, - { - Before: []byte(`{ "pk" : 99, "v" : 33 }`), - Data: []byte(`{ "pk" : 99, "v" : 42 }`), - Key: []byte(`[ 99 ]`), - Time: hlc.New(1, 0), - }, - // These are the deletes from above. - { - Data: []byte(`null`), - Key: []byte(`[ 42 ]`), - Time: hlc.New(3, 0), - }, - { - Data: []byte(`null`), - Key: []byte(`[ 99 ]`), - Time: hlc.New(3, 0), - }, - // These are the entries we just created. { Data: []byte(`{ "pk" : 42, "v" : 99 }`), Key: []byte(`[ 42 ]`), @@ -655,6 +599,7 @@ func TestConcurrentHandlers(t *testing.T) { func testMassBackfillWithForeignKeys( t *testing.T, rowCount, fixtureCount int, fns ...func(*Config), ) { + t.Parallel() // We spend most of our time waiting for a resolved timestamp. r := require.New(t) baseFixture, cancel, err := all.NewFixture() @@ -675,9 +620,9 @@ func testMassBackfillWithForeignKeys( for idx := range fixtures { cfg := &Config{ - IdealFlushBatchSize: 123, // Pick weird batch sizes. - SelectBatchSize: 587, - MetaTableName: ident.New("resolved_timestamps"), + LargeTransactionLimit: rowCount / 3, // Read the same timestamp more than once. + MetaTableName: ident.New("resolved_timestamps"), + TimestampWindowSize: 100, BaseConfig: logical.BaseConfig{ ApplyTimeout: time.Second, FanShards: 16, diff --git a/internal/source/cdc/resolved_stamp.go b/internal/source/cdc/resolved_stamp.go index fe84ee5e7..95126ed0a 100644 --- a/internal/source/cdc/resolved_stamp.go +++ b/internal/source/cdc/resolved_stamp.go @@ -28,47 +28,26 @@ import ( ) // resolvedStamp tracks the progress of applying staged mutations for -// a given resolved timestamp. This type has some extra complexity to -// support backfilling of initial changefeed scans. -// -// Here's a state diagram to help: -// -// committed -// | *---* -// V V | -// proposed ---> backfill -* -// | | -// V | -// committed <----------* -// -// Tracking of FK levels and table offsets is only used in backfill mode -// and is especially relevant to an initial_scan from a changefeed. In -// this state, we have an arbitrarily large number of mutations to -// apply, all of which will have the same timestamp. -// -// Each resolvedStamp is associated with a separate database transaction -// that holds a SELECT FOR UPDATE lock on the row that holds the -// timestamp being resolved. The ownership of this transaction is -// transferred to any derived resolvedStamp that represents partial -// progress within a resolved timestamp. The resolved-timestamp -// transaction is wrapped in a txguard.Guard to ensure that it is kept -// alive. If the resolved-timestamp transaction fails for any reason, -// we can roll back to the previously committed stamp through the usual -// logical-loop error-handling code. +// a given resolved timestamp. +// Partial progress of very large batches is maintained in the staging +// database by tracking a per-mutation applied flag. type resolvedStamp struct { - Backfill bool `json:"b,omitempty"` // A resolved timestamp that represents a transactionally-consistent // point in the history of the workload. CommittedTime hlc.Time `json:"c,omitempty"` // Iteration is used to provide well-ordered behavior within a - // single backfill window. + // single backfill window. Partial progress is maintained in the + // staging tables in case cdc-sink is interrupted in the middle + // of processing a large window. Iteration int `json:"i,omitempty"` + // LargeBatchOffset allows us to jump over already-applied keys when + // processing a large batch of data (e.g. an initial backfill). This + // is merely a performance optimization, since the staging tables + // are used to track whether or not any particular mutation has been + // applied. + LargeBatchOffset *ident.TableMap[json.RawMessage] `json:"off,omitempty"` // The next resolved timestamp that we want to advance to. ProposedTime hlc.Time `json:"p,omitempty"` - - OffsetKey json.RawMessage `json:"ok,omitempty"` - OffsetTable ident.Table `json:"otbl,omitempty"` - OffsetTime hlc.Time `json:"ots,omitempty"` } // AsTime implements logical.TimeStamp to improve reporting. @@ -106,47 +85,30 @@ func (s *resolvedStamp) NewProposed(proposed hlc.Time) (*resolvedStamp, error) { return nil, errors.Errorf("proposed cannot roll back committed time: %s vs %s", proposed, s.CommittedTime) } - if hlc.Compare(proposed, s.OffsetTime) < 0 { - return nil, errors.Errorf("proposed time undoing work: %s vs %s", proposed, s.OffsetTime) - } if hlc.Compare(proposed, s.ProposedTime) < 0 { return nil, errors.Errorf("proposed time cannot go backward: %s vs %s", proposed, s.ProposedTime) } return &resolvedStamp{ - Backfill: s.Backfill, CommittedTime: s.CommittedTime, Iteration: s.Iteration + 1, - OffsetKey: s.OffsetKey, - OffsetTable: s.OffsetTable, - OffsetTime: s.OffsetTime, ProposedTime: proposed, }, nil } // NewProgress returns a resolvedStamp that represents partial progress // within the same [committed, proposed] window. -func (s *resolvedStamp) NewProgress(cursor *types.SelectManyCursor) *resolvedStamp { +func (s *resolvedStamp) NewProgress(cursor *types.UnstageCursor) *resolvedStamp { ret := &resolvedStamp{ - Backfill: s.Backfill, CommittedTime: s.CommittedTime, Iteration: s.Iteration + 1, - OffsetKey: s.OffsetKey, - OffsetTable: s.OffsetTable, - OffsetTime: s.OffsetTime, ProposedTime: s.ProposedTime, } - if cursor != nil { - ret.OffsetTime = cursor.OffsetTime - - // Only Key and Table make sense to retain in a backfill. For - // transactional mode, we always want to restart at a specific - // timestamp. - if s.Backfill { - ret.OffsetKey = cursor.OffsetKey - ret.OffsetTable = cursor.OffsetTable - } + // Record offsets to allow us to skip rows within the next query. + if cursor != nil && cursor.StartAfterKey.Len() > 0 { + ret.LargeBatchOffset = &ident.TableMap[json.RawMessage]{} + cursor.StartAfterKey.CopyInto(ret.LargeBatchOffset) } return ret diff --git a/internal/source/cdc/resolver.go b/internal/source/cdc/resolver.go index 2597bb027..c7a148702 100644 --- a/internal/source/cdc/resolver.go +++ b/internal/source/cdc/resolver.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/util/hlc" "github.com/cockroachdb/cdc-sink/internal/util/ident" "github.com/cockroachdb/cdc-sink/internal/util/notify" + "github.com/cockroachdb/cdc-sink/internal/util/retry" "github.com/cockroachdb/cdc-sink/internal/util/stamp" "github.com/cockroachdb/cdc-sink/internal/util/stopper" "github.com/jackc/pgx/v5" @@ -164,18 +165,18 @@ func (r *resolver) Mark(ctx context.Context, ts hlc.Time) error { func (r *resolver) BackfillInto( ctx context.Context, ch chan<- logical.Message, state logical.State, ) error { - return r.readInto(ctx, ch, state, true) + return r.readInto(ctx, ch, state) } // ReadInto implements logical.Dialect. func (r *resolver) ReadInto( ctx context.Context, ch chan<- logical.Message, state logical.State, ) error { - return r.readInto(ctx, ch, state, false) + return r.readInto(ctx, ch, state) } func (r *resolver) readInto( - ctx context.Context, ch chan<- logical.Message, state logical.State, backfill bool, + ctx context.Context, ch chan<- logical.Message, state logical.State, ) error { // This will either be from a previous iteration or ZeroStamp. cp, cpUpdated := state.GetConsistentPoint() @@ -211,7 +212,7 @@ func (r *resolver) readInto( } else { // We're looping around with a committed timestamp, so // look for work to do and send it. - proposed, err := r.nextProposedStamp(ctx, resumeFrom, backfill) + proposed, err := r.nextProposedStamp(ctx, resumeFrom) switch { case err == nil: log.WithFields(log.Fields{ @@ -286,7 +287,7 @@ func (r *resolver) readInto( // may be one of the sentinel values errNoWork or errBlocked that are // returned by selectTimestamp. func (r *resolver) nextProposedStamp( - ctx context.Context, prev *resolvedStamp, backfill bool, + ctx context.Context, prev *resolvedStamp, ) (*resolvedStamp, error) { // Find the next resolved timestamp to apply, starting from // a timestamp known to be committed. @@ -301,11 +302,6 @@ func (r *resolver) nextProposedStamp( return nil, err } - // Propagate the backfilling flag. - if backfill { - ret.Backfill = true - } - return ret, nil } @@ -353,8 +349,7 @@ func (r *resolver) ZeroStamp() stamp.Stamp { } // process makes incremental progress in fulfilling the given -// resolvedStamp. It returns the state to which the resolved timestamp -// has been advanced. +// resolvedStamp. func (r *resolver) process(ctx context.Context, rs *resolvedStamp, events logical.Events) error { start := time.Now() targets := r.watcher.Get().Order @@ -363,135 +358,193 @@ func (r *resolver) process(ctx context.Context, rs *resolvedStamp, events logica return errors.Errorf("no tables known in schema %s; have they been created?", r.target) } - cursor := &types.SelectManyCursor{ - Backfill: rs.Backfill, - End: rs.ProposedTime, - Limit: r.cfg.SelectBatchSize, - OffsetKey: rs.OffsetKey, - OffsetTable: rs.OffsetTable, - OffsetTime: rs.OffsetTime, - Start: rs.CommittedTime, - Targets: targets, + flattened := make([]ident.Table, 0, len(targets)) + for _, tgts := range targets { + flattened = append(flattened, tgts...) + } + + cursor := &types.UnstageCursor{ + StartAt: rs.CommittedTime, + EndBefore: rs.ProposedTime, + Targets: flattened, + TimestampLimit: r.cfg.TimestampWindowSize, + UpdateLimit: r.cfg.LargeTransactionLimit, + } + if rs.LargeBatchOffset != nil && rs.LargeBatchOffset.Len() > 0 { + rs.LargeBatchOffset.CopyInto(&cursor.StartAfterKey) } - source := script.SourceName(r.target) - flush := func(toApply *ident.TableMap[[]types.Mutation], final bool) error { + flush := func(toApply *ident.TableMap[[]types.Mutation]) error { flushStart := time.Now() ctx, cancel := context.WithTimeout(ctx, r.cfg.ApplyTimeout) defer cancel() - // Apply the retrieved data. - batch, err := events.OnBegin(ctx) - if err != nil { - return err - } - defer func() { _ = batch.OnRollback(ctx) }() - - for _, tables := range targets { - for _, table := range tables { - muts := toApply.GetZero(table) - if len(muts) == 0 { - continue - } - if err := batch.OnData(ctx, source, table, muts); err != nil { - return err - } - } - } + retries := -1 + if err := retry.Retry(ctx, func(ctx context.Context) error { + retries++ - // OnCommit is asynchronous to support (a future) - // unified-immediate mode. We'll wait for the data to be - // committed. - select { - case err := <-batch.OnCommit(ctx): + // Apply the retrieved data. + batch, err := events.OnBegin(ctx) if err != nil { return err } - case <-ctx.Done(): - return ctx.Err() - } + defer func() { _ = batch.OnRollback(ctx) }() - // Advance and save the stamp once the flush has completed. - if final { - rs, err = rs.NewCommitted() - if err != nil { - return err + for _, tables := range targets { + for _, table := range tables { + muts := toApply.GetZero(table) + if len(muts) == 0 { + continue + } + source := script.SourceName(r.target) + muts = append([]types.Mutation(nil), muts...) + if err := batch.OnData(ctx, source, table, muts); err != nil { + return err + } + } } - // Mark the timestamp has being processed. - if err := r.Record(ctx, rs.CommittedTime); err != nil { + + // OnCommit is asynchronous to support (a future) + // unified-immediate mode. We'll wait for the data to be + // committed. + select { + case err := <-batch.OnCommit(ctx): return err + case <-ctx.Done(): + return ctx.Err() } - } else { - rs = rs.NewProgress(cursor) - } - if err := events.SetConsistentPoint(ctx, rs); err != nil { + }); err != nil { return err } log.WithFields(log.Fields{ "duration": time.Since(flushStart), "schema": r.target, + "retries": retries, }).Debugf("flushed mutations") return nil } - var epoch hlc.Time - flushCounter := 0 - toApply := &ident.TableMap[[]types.Mutation]{} - total := 0 - if err := r.stagers.SelectMany(ctx, r.pool, cursor, - func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { - // Check for flush before accumulating. - var needsFlush bool - if rs.Backfill { - // We're receiving data in table-order. Just read data - // and flush it once we hit our soft limit, since we - // can resume at any point. - needsFlush = flushCounter >= r.cfg.IdealFlushBatchSize - } else if r.cfg.FlushEveryTimestamp { - // The user wants to preserve all intermediate updates - // to a row, rather than fast-forwarding the values of a - // row to the latest transactionally-consistent state. - // We'll flush on every MVCC boundary change. - needsFlush = epoch != hlc.Zero() && hlc.Compare(mut.Time, epoch) > 0 - } else { - // We're receiving data ordered by MVCC timestamp. Flush - // data when we see a new epoch after accumulating a - // minimum number of mutations. This increases - // throughput when there are many single-row - // transactions in the source database. - needsFlush = flushCounter >= r.cfg.IdealFlushBatchSize && - hlc.Compare(mut.Time, epoch) > 0 + total := 0 // Metrics. + + // We run in a loop until the attempt to unstage returns no more + // data (i.e. we've consumed all mutations within the resolving + // window). If a LargeBatchLimit is configured, we may wind up + // calling flush() several times within a particularly large MVCC + // timestamp. + for hadData := true; hadData; { + if err := retry.Retry(ctx, func(ctx context.Context) error { + unstageTX, err := r.pool.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return errors.Wrap(err, "could not open staging transaction") } - if needsFlush { - if err := flush(toApply, false /* interim */); err != nil { - return err - } - total += flushCounter - flushCounter = 0 - // Reset the slices in toApply; flush() ignores zero-length entries. - _ = toApply.Range(func(tbl ident.Table, muts []types.Mutation) error { - toApply.Put(tbl, muts[:0]) + defer func() { _ = unstageTX.Rollback(ctx) }() + + var epoch hlc.Time + var nextCursor *types.UnstageCursor + pendingMutations := 0 + toApply := &ident.TableMap[[]types.Mutation]{} + + nextCursor, hadData, err = r.stagers.Unstage(ctx, unstageTX, cursor, + func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { + total++ + + // Decorate the mutation. + script.AddMeta("cdc", tbl, &mut) + + if epoch == hlc.Zero() { + epoch = mut.Time + } else if r.cfg.FlushEveryTimestamp || + pendingMutations >= r.cfg.IdealFlushBatchSize { + // If the user wants to see all intermediate + // values for a row, we'll flush whenever + // there's a change in the timestamp value. We + // may also preemptively flush to prevent + // unbounded memory use. + if hlc.Compare(mut.Time, epoch) > 0 { + if err := flush(toApply); err != nil { + return err + } + epoch = mut.Time + pendingMutations = 0 + toApply = &ident.TableMap[[]types.Mutation]{} + } + } else if hlc.Compare(mut.Time, epoch) < 0 { + // This would imply a coding error in Unstage. + return errors.New("epoch going backwards") + } + + // Accumulate the mutation. + toApply.Put(tbl, append(toApply.GetZero(tbl), mut)) + pendingMutations++ return nil }) + if err != nil { + return errors.Wrap(err, "could not unstage mutations") } - flushCounter++ - script.AddMeta("cdc", tbl, &mut) - toApply.Put(tbl, append(toApply.GetZero(tbl), mut)) - epoch = mut.Time + if hadData { + // Final flush of the data that was unstaged. It's + // possible, although unlikely, that the final callback + // from Unstage performed a flush. If this were to + // happen, we don't want to perform a no-work flush. + if pendingMutations > 0 { + if err := flush(toApply); err != nil { + return err + } + } + + // Commit the transaction that unstaged the data. Note that + // without an X/A transaction model, there's always a + // possibility where we've committed the target transaction + // and then the unstaging transaction fails to commit. In + // general, we expect that re-applying a mutation within a + // short timeframe should be idempotent, although this might + // not necessarily be the case for data-aware merge + // functions in the absence of conflict-free datatypes. We + // could choose to keep a manifest of (table, key, + // timestamp) entries in the target database to filter + // repeated updates. + // https://github.com/cockroachdb/cdc-sink/issues/565 + if err := unstageTX.Commit(ctx); err != nil { + return errors.Wrap(err, "could not commit unstaging transaction; "+ + "mutations may be reapplied") + } + + // If we're going to continue around, update the + // consistent point with partial progress. This ensures + // that cdc-sink can make progress on large batches, + // even if it's restarted. + cursor = nextCursor + rs = rs.NewProgress(cursor) + } else { + // We read no data, which means that we're done. We'll + // let the unstaging transaction be rolled back by the + // defer above since it performed no actual work in the + // database. + rs, err = rs.NewCommitted() + if err != nil { + return err + } + // Mark the timestamp has being processed. + if err := r.Record(ctx, rs.CommittedTime); err != nil { + return err + } + } + + // Save the consistent point to be able to resume if the + // cdc-sink process is stopped. + if err := events.SetConsistentPoint(ctx, rs); err != nil { + return err + } return nil }); err != nil { - return err + return err + } } - // Final flush cycle to commit the final stamp. - if err := flush(toApply, true /* final */); err != nil { - return err - } - total += flushCounter log.WithFields(log.Fields{ "committed": rs.CommittedTime, "count": total, diff --git a/internal/source/server/integration_test.go b/internal/source/server/integration_test.go index 488738748..eccc5096e 100644 --- a/internal/source/server/integration_test.go +++ b/internal/source/server/integration_test.go @@ -32,7 +32,6 @@ import ( "github.com/cockroachdb/cdc-sink/internal/source/logical" jwtAuth "github.com/cockroachdb/cdc-sink/internal/staging/auth/jwt" "github.com/cockroachdb/cdc-sink/internal/util/diag" - "github.com/cockroachdb/cdc-sink/internal/util/hlc" "github.com/cockroachdb/cdc-sink/internal/util/ident" "github.com/cockroachdb/cdc-sink/internal/util/stdlogical" "github.com/cockroachdb/cdc-sink/internal/util/stopper" @@ -312,45 +311,6 @@ func testIntegration(t *testing.T, cfg testConfig) { time.Sleep(100 * time.Millisecond) } - if !cfg.immediate { - t.Run("inspect_staged_data", func(t *testing.T) { - a := assert.New(t) - r := require.New(t) - - stager, err := targetFixture.Stagers.Get(ctx, target) - r.NoError(err) - - // Just load all staged data. - muts, err := stager.Select(ctx, targetFixture.StagingPool, - hlc.Zero(), hlc.New(time.Now().UnixNano(), 0)) - r.NoError(err) - r.Len(muts, 3) // Two inserts and one update. - - // Classify the mutation's before value as empty/null or - // containing an object. - var objectCount, nullCount int - for _, mut := range muts { - before := string(mut.Before) - if len(before) == 0 || before == "null" { - nullCount++ - } else if len(before) > 0 && before[0] == '{' { - objectCount++ - } else { - r.Fail("unexpected before value", before) - } - } - - if cfg.diff { - a.Equal(1, objectCount) // One update. - a.Equal(2, nullCount) // Two inserts. - } else { - // We don't expect to see any diff data. - a.Equal(0, objectCount) - a.Equal(3, nullCount) - } - }) - } - metrics, err := prometheus.DefaultGatherer.Gather() a.NoError(err) log.WithField("metrics", metrics).Trace() diff --git a/internal/staging/stage/factory.go b/internal/staging/stage/factory.go index f93de0815..e35843a50 100644 --- a/internal/staging/stage/factory.go +++ b/internal/staging/stage/factory.go @@ -18,18 +18,14 @@ package stage import ( "context" - "fmt" - "strings" + "encoding/json" "sync" - "time" "github.com/cockroachdb/cdc-sink/internal/types" "github.com/cockroachdb/cdc-sink/internal/util/hlc" "github.com/cockroachdb/cdc-sink/internal/util/ident" "github.com/cockroachdb/cdc-sink/internal/util/stopper" - "github.com/jackc/pgx/v5" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" ) type factory struct { @@ -74,216 +70,75 @@ func (f *factory) getUnlocked(table ident.Table) *stage { return f.mu.instances.GetZero(table) } -// SelectMany implements types.Stagers. -func (f *factory) SelectMany( +// Unstage implements types.Stagers. +func (f *factory) Unstage( ctx context.Context, tx types.StagingQuerier, - q *types.SelectManyCursor, - fn types.SelectManyCallback, -) error { - if q.Limit == 0 { - return errors.New("limit must be set") + cursor *types.UnstageCursor, + fn types.UnstageCallback, +) (*types.UnstageCursor, bool, error) { + // Duplicate the cursor so callers can choose to advance. + cursor = cursor.Copy() + + data := &templateData{ + Cursor: cursor, + StagingSchema: f.stagingDB.Schema(), } - // Ensure offset >= start. - if hlc.Compare(q.OffsetTime, q.Start) < 0 { - q.OffsetTime = q.Start + q, err := data.Eval() + if err != nil { + return nil, false, err } - - // Each table is assigned a numeric id to simplify the queries. - tablesToIds := &ident.TableMap[int]{} - idsToTables := make(map[int]ident.Table) - var orderedTables []ident.Table - for _, tables := range q.Targets { - orderedTables = append(orderedTables, tables...) - for _, table := range tables { - id := tablesToIds.Len() - if _, duplicate := tablesToIds.Get(table); duplicate { - return errors.Errorf("duplicate table name: %s", table) - } - tablesToIds.Put(table, id) - idsToTables[id] = table - } + keyOffsets := make([]string, len(cursor.Targets)) + for idx, tbl := range cursor.Targets { + keyOffsets[idx] = string(cursor.StartAfterKey.GetZero(tbl)) } - - // Define the rows variable here, so we don't leak it from the - // variety of error code-paths below. - var rows pgx.Rows - defer func() { - if rows != nil { - rows.Close() - } - }() - - for { - start := time.Now() - - offsetTableIdx := -1 - if !q.OffsetTable.Empty() { - offsetTableIdx = tablesToIds.GetZero(q.OffsetTable) - } - - // This is a union-all construct: - // WITH data AS ( - // (SELECT ... FROM staging_table) - // UNION ALL - // (SELECT ... FROM another_staging_table) - // ) - // SELECT ... FROM data .... - // - // The generated SQL does vary based on the offset table value, - // so we can't easily reuse it across iterations. - var sb strings.Builder - sb.WriteString(`WITH -args AS (SELECT $1::INT, $2::INT, $3::INT, $4::INT, $5::INT, $6:::INT, $7::INT, $8::STRING), -data AS (`) - needsUnion := false - for _, table := range orderedTables { - id := tablesToIds.GetZero(table) - // If we're backfilling, the dominant ordering term is the - // table. Any tables that are before the starting table can - // simply be elided from the query. - if q.Backfill && id < offsetTableIdx { - continue - } - if needsUnion { - sb.WriteString(" UNION ALL ") - } else { - needsUnion = true - } - - // Mangle the real table name into its staging table name. - destination := stagingTable(f.stagingDB, table) - - // This ORDER BY should be driven by the index scan, but we - // don't want to be subject to any cross-range scan merges. - // The WHERE clauses in the sub-queries should line up with - // the ORDER BY in the top-level that we use for pagination. - if q.Backfill { - if id == offsetTableIdx { - // We're resuming in the middle of reading a table. - _, _ = fmt.Fprintf(&sb, - "(SELECT %[1]d AS t, nanos, logical, key, mut, before "+ - "FROM %[2]s "+ - "WHERE (nanos, logical, key) > ($6, $7, $8) "+ - "AND (nanos, logical) <= ($3, $4) "+ - "ORDER BY nanos, logical, key "+ - "LIMIT $5)", - id, destination) - } else { - // id must be > offsetTableIdx, since we filter out - // lesser values above. This means that we haven't - // yet read any values from this table, so we want - // to start at the beginning time. - _, _ = fmt.Fprintf(&sb, - "(SELECT %[1]d AS t, nanos, logical, key, mut, before "+ - "FROM %[2]s "+ - "WHERE (nanos, logical) >= ($1, $2) "+ - "AND (nanos, logical) <= ($3, $4) "+ - "ORDER BY nanos, logical, key "+ - "LIMIT $5)", - id, destination) - } - } else { - // The differences in these cases have to do with the - // comparison operators around (nanos,logical) to pick the - // correct starting point for the scan. - if id == offsetTableIdx { - _, _ = fmt.Fprintf(&sb, - "(SELECT %[1]d AS t, nanos, logical, key, mut, before "+ - "FROM %[2]s "+ - "WHERE ( (nanos, logical, key) > ($6, $7, $8) ) "+ - "AND (nanos, logical) <= ($3, $4) "+ - "ORDER BY nanos, logical, key "+ - "LIMIT $5)", - id, destination) - } else if id > offsetTableIdx { - _, _ = fmt.Fprintf(&sb, - "(SELECT %[1]d AS t, nanos, logical, key, mut, before "+ - "FROM %[2]s "+ - "WHERE (nanos, logical) >= ($6, $7) "+ - "AND (nanos, logical) <= ($3, $4) "+ - "ORDER BY nanos, logical, key "+ - "LIMIT $5)", - id, destination) - } else { - _, _ = fmt.Fprintf(&sb, - "(SELECT %[1]d AS t, nanos, logical, key, mut, before "+ - "FROM %[2]s "+ - "WHERE (nanos, logical) > ($6, $7) "+ - "AND (nanos, logical) <= ($3, $4) "+ - "ORDER BY nanos, logical, key "+ - "LIMIT $5)", - id, destination) - } - } + rows, err := tx.Query(ctx, q, + cursor.StartAt.Nanos(), + cursor.StartAt.Logical(), + cursor.EndBefore.Nanos(), + cursor.EndBefore.Logical(), + keyOffsets) + if err != nil { + return nil, false, errors.Wrap(err, q) + } + defer rows.Close() + + hadRows := false + // We want to reset StartAfterKey whenever we read into a new region + // of timestamps. + epoch := cursor.StartAt + for rows.Next() { + var mut types.Mutation + var tableIdx int + var nanos int64 + var logical int + if err := rows.Scan(&tableIdx, &nanos, &logical, &mut.Key, &mut.Data, &mut.Before); err != nil { + return nil, false, errors.WithStack(err) } - sb.WriteString(`) SELECT t, nanos, logical, key, mut, before FROM data `) - if q.Backfill { - sb.WriteString(`ORDER BY t, nanos, logical, key LIMIT $5`) - } else { - sb.WriteString(`ORDER BY nanos, logical, t, key LIMIT $5`) + mut.Before, err = maybeGunzip(mut.Before) + if err != nil { + return nil, false, err } - - var err error - // Rows closed in defer statement above. - rows, err = tx.Query(ctx, sb.String(), - q.Start.Nanos(), - q.Start.Logical(), - q.End.Nanos(), - q.End.Logical(), - q.Limit, - q.OffsetTime.Nanos(), - q.OffsetTime.Logical(), - string(q.OffsetKey), - ) + mut.Data, err = maybeGunzip(mut.Data) if err != nil { - return errors.Wrap(err, sb.String()) + return nil, false, err } - - count := 0 - var lastMut types.Mutation - var lastTable ident.Table - for rows.Next() { - count++ - var mut types.Mutation - var tableIdx int - var nanos int64 - var logical int - if err := rows.Scan(&tableIdx, &nanos, &logical, &mut.Key, &mut.Data, &mut.Before); err != nil { - return errors.WithStack(err) - } - mut.Before, err = maybeGunzip(mut.Before) - if err != nil { - return err - } - mut.Data, err = maybeGunzip(mut.Data) - if err != nil { - return err - } - mut.Time = hlc.New(nanos, logical) - - lastMut = mut - lastTable = idsToTables[tableIdx] - if err := fn(ctx, lastTable, mut); err != nil { - return err - } + mut.Time = hlc.New(nanos, logical) + if hlc.Compare(mut.Time, epoch) > 0 { + epoch = mut.Time + cursor.StartAt = epoch + cursor.StartAfterKey = ident.TableMap[json.RawMessage]{} } + cursor.StartAfterKey.Put(cursor.Targets[tableIdx], mut.Key) + hadRows = true - // Only update the cursor if we've successfully read the entire page. - q.OffsetKey = lastMut.Key - q.OffsetTime = lastMut.Time - q.OffsetTable = lastTable - - log.WithFields(log.Fields{ - "count": count, - "duration": time.Since(start), - "targets": q.Targets, - }).Debug("retrieved staged mutations") - - // We can stop once we've received less than a max-sized window - // of mutations. Otherwise, loop around. - if count < q.Limit { - return nil + if err := fn(ctx, cursor.Targets[tableIdx], mut); err != nil { + return nil, false, err } } + if err := rows.Err(); err != nil { + return nil, false, errors.WithStack(err) + } + + return cursor, hadRows, nil } diff --git a/internal/staging/stage/metrics.go b/internal/staging/stage/metrics.go index f4e6cc11a..1d578b86f 100644 --- a/internal/staging/stage/metrics.go +++ b/internal/staging/stage/metrics.go @@ -32,7 +32,6 @@ var ( Name: "stage_retire_errors_total", Help: "the number of times an error was encountered while retiring mutations", }, metrics.TableLabels) - stageSelectCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "stage_select_mutations_total", Help: "the number of mutations read for this table", @@ -46,7 +45,10 @@ var ( Name: "stage_select_errors_total", Help: "the number of times an error was encountered while selecting mutations", }, metrics.TableLabels) - + stageStaleMutations = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "stage_stale_mutations_count", + Help: "the number of un-applied staged mutations left after retiring applied mutations", + }, metrics.TableLabels) stageStoreCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "stage_store_mutations_total", Help: "the number of mutations stored for this table", diff --git a/internal/staging/stage/queries/unstage.tmpl b/internal/staging/stage/queries/unstage.tmpl new file mode 100644 index 000000000..4b5b169a9 --- /dev/null +++ b/internal/staging/stage/queries/unstage.tmpl @@ -0,0 +1,107 @@ +{{- /*gotype: github.com/cockroachdb/cdc-sink/internal/staging/stage.templateData*/ -}} +{{- $top := . -}} +WITH {{- sp -}} + +{{- /* +Select the minimum unapplied timestamp(s) from the source table. + +We start scanning at some timestamp, but after a starting key, which +could be a zero-length string. + +hlc_0 AS ( + SELECT nanos, logical FROM staging_table + WHERE (nanos, logical, key) > (start_at_nanos, start_at_logical, start_after_key) + AND (nanos, locical) < (end_before_nanos, end_before_logical) + AND NOT APPLIED + GROUP BY nanos, logical -- We want multi-column distinct behavior + ORDER BY nanos, logical + LIMIT N +) + +*/ -}} +{{- range $idx, $tgt := .Cursor.Targets -}} +{{- if $idx -}}, {{- nl -}}{{- end -}} +hlc_{{ $idx }} (n, l) AS ( +SELECT nanos, logical +FROM {{ $top.StagingTable $tgt }} +WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[ {{- add $idx 1 -}} ]) +AND (nanos, logical) < ($3, $4) +AND NOT applied +GROUP BY nanos, logical +ORDER BY nanos, logical +LIMIT {{ or $top.Cursor.TimestampLimit 1 }} +) +{{- end -}} + +{{- /* +Select the minimum timestamp(s) across all source tables. + +hlc_all AS ( + SELECT n, l FROM hlc_0 UNION ALL + SELECT n, l FROM hlc_1 UNION ALL ... + SELECT n, l FROM hlc_N +), +hlc_min AS (SELECT n, l FROM hlc_all GROUY BY n, l ORDER BY n, l LIMIT N) + +*/ -}} +, {{- nl -}} +hlc_all AS ( +{{- range $idx, $tgt := .Cursor.Targets -}} +{{- if $idx }} UNION ALL {{- nl -}}{{- end -}} +SELECT n, l FROM hlc_{{ $idx }} +{{- end -}} +), +hlc_min AS (SELECT n, l FROM hlc_all GROUP BY n, l ORDER BY n, l LIMIT {{ or .Cursor.TimestampLimit 1 }}) + +{{- /* +Set the applied column and return the data. + +We want to update any non-applied mutations after the starting key +and within the HLC timestamp that we're operating on. + +data_0 AS ( + UPDATE staging_table SET applied=true + FROM hlc_min + WHERE (nanos, logical) IN (hlc_min) + AND (nanos, logical, key) > (start_at_nanos, start_at_logical, start_after_key) + AND NOT APPLIED + [ ORDER BY nanos, logical, key + LIMIT n ] + RETURNING nanos, logical, key, mut, before +) +*/ -}} +{{- range $idx, $tgt := .Cursor.Targets -}} +, {{- nl -}} +data_{{ $idx }} AS ( +UPDATE {{ $top.StagingTable $tgt }} +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[ {{- add $idx 1 -}} ]) +AND NOT applied +{{- if $top.Cursor.UpdateLimit -}} {{- nl -}} +ORDER BY nanos, logical, key +LIMIT {{ $top.Cursor.UpdateLimit }} +{{- end -}} +{{- nl -}} RETURNING nanos, logical, key, mut, before) +{{- end -}} +{{- nl -}} + +{{- /* +Top-level query aggregates the updates in table order. + +SELECT 0 idx, * FROM data_0 UNION ALL +SELECT 1 idx, * FROM data_1 UNION ALL ... +SELECT N idx, * FROM data_N +ORDER BY nanos, logical, idx, key + +*/ -}} +SELECT * FROM ( {{- nl -}} +{{- range $idx, $tgt := .Cursor.Targets -}} +{{- if $idx }} UNION ALL {{- nl -}}{{- end -}} +SELECT {{ $idx }} idx, nanos, logical, key, mut, before FROM data_{{ $idx }} +{{- end -}} +) {{- nl -}} +ORDER BY nanos, logical, idx, key + +{{- /* Consume whitespace */ -}} diff --git a/internal/staging/stage/stage.go b/internal/staging/stage/stage.go index f3592a9ba..948e0e924 100644 --- a/internal/staging/stage/stage.go +++ b/internal/staging/stage/stage.go @@ -31,6 +31,8 @@ import ( "github.com/cockroachdb/cdc-sink/internal/util/hlc" "github.com/cockroachdb/cdc-sink/internal/util/ident" "github.com/cockroachdb/cdc-sink/internal/util/metrics" + "github.com/cockroachdb/cdc-sink/internal/util/msort" + "github.com/cockroachdb/cdc-sink/internal/util/notify" "github.com/cockroachdb/cdc-sink/internal/util/retry" "github.com/cockroachdb/cdc-sink/internal/util/stopper" "github.com/jackc/pgx/v5" @@ -54,29 +56,42 @@ func stagingTable(stagingDB ident.Schema, target ident.Table) ident.Table { type stage struct { // The staging table that holds the mutations. stage ident.Table - retireFrom hlc.Time // Makes subsequent calls to Retire() a bit faster. + retireFrom notify.Var[hlc.Time] // Makes subsequent calls to Retire() a bit faster. retireDuration prometheus.Observer retireError prometheus.Counter selectCount prometheus.Counter selectDuration prometheus.Observer selectError prometheus.Counter + staleCount prometheus.Gauge storeCount prometheus.Counter storeDuration prometheus.Observer storeError prometheus.Counter // Compute SQL fragments exactly once on startup. sql struct { - nextAfter string // Find a timestamp for which data is available. - retire string // Delete a batch of staged mutations - store string // store mutations - sel string // select all rows in the timeframe from the staging table - selPartial string // select limited number of rows from the staging table + retire string // Delete a batch of staged mutations. + store string // Store mutations. + unapplied string // Count stale, unapplied mutations. } } var _ types.Stager = (*stage)(nil) +const tableSchema = ` +CREATE TABLE IF NOT EXISTS %[1]s ( + nanos INT NOT NULL, + logical INT NOT NULL, + key STRING NOT NULL, + mut BYTES NOT NULL, + before BYTES NULL, + applied BOOL NOT NULL DEFAULT false, + %[2]s + PRIMARY KEY (nanos, logical, key), + FAMILY cold (mut, before), + FAMILY hot (applied) +)` + // newStore constructs a new mutation stage that will track pending // mutations to be applied to the given target table. func newStore( @@ -84,16 +99,21 @@ func newStore( ) (*stage, error) { table := stagingTable(stagingDB, target) - if err := retry.Execute(ctx, db, fmt.Sprintf(` -CREATE TABLE IF NOT EXISTS %s ( - nanos INT NOT NULL, - logical INT NOT NULL, - key STRING NOT NULL, - mut BYTES NOT NULL, - before BYTES NULL, - PRIMARY KEY (nanos, logical, key) -)`, table)); err != nil { - return nil, errors.WithStack(err) + // Try to create the staging table with a helper virtual column. We + // never query for it, so it should have essentially no cost. + if err := retry.Execute(ctx, db, fmt.Sprintf(tableSchema, table, + `source_time TIMESTAMPTZ AS (to_timestamp(nanos::float/1e9)) VIRTUAL,`)); err != nil { + + // Old versions of CRDB don't know about to_timestamp(). Try + // again without the helper column. + if pgErr := (*pgconn.PgError)(nil); errors.As(err, &pgErr) { + if pgErr.Code == "42883" /* unknown function */ { + err = retry.Execute(ctx, db, fmt.Sprintf(tableSchema, table, "")) + } + } + if err != nil { + return nil, errors.WithStack(err) + } } // Transparently upgrade older staging tables. This avoids needing @@ -112,179 +132,65 @@ ALTER TABLE %[1]s ADD COLUMN IF NOT EXISTS before BYTES NULL selectCount: stageSelectCount.WithLabelValues(labels...), selectDuration: stageSelectDurations.WithLabelValues(labels...), selectError: stageSelectErrors.WithLabelValues(labels...), + staleCount: stageStaleMutations.WithLabelValues(labels...), storeCount: stageStoreCount.WithLabelValues(labels...), storeDuration: stageStoreDurations.WithLabelValues(labels...), storeError: stageStoreErrors.WithLabelValues(labels...), } - s.sql.nextAfter = fmt.Sprintf(nextAfterTemplate, table) s.sql.retire = fmt.Sprintf(retireTemplate, table) - s.sql.sel = fmt.Sprintf(selectTemplateAll, table) - s.sql.selPartial = fmt.Sprintf(selectTemplatePartial, table) s.sql.store = fmt.Sprintf(putTemplate, table) + s.sql.unapplied = fmt.Sprintf(countTemplate, table) - return s, nil -} - -// GetTable returns the table that the stage is storing into. -func (s *stage) GetTable() ident.Table { return s.stage } - -const nextAfterTemplate = ` -SELECT DISTINCT nanos, logical - FROM %[1]s -WHERE (nanos, logical) > ($1, $2) AND (nanos, logical) <= ($3, $4) -ORDER BY nanos, logical -` + // Report unapplied mutations on a periodic basis. + ctx.Go(func() error { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() -// TransactionTimes implements types.Stager and returns timestamps for -// which data is available in the (before, after] range. -func (s *stage) TransactionTimes( - ctx context.Context, tx types.StagingQuerier, before, after hlc.Time, -) ([]hlc.Time, error) { - var ret []hlc.Time - err := retry.Retry(ctx, func(ctx context.Context) error { - ret = ret[:0] // Reset if retrying. - rows, err := tx.Query(ctx, - s.sql.nextAfter, - before.Nanos(), - before.Logical(), - after.Nanos(), - after.Logical(), - ) - if err != nil { - return errors.WithStack(err) - } + for from, changed := s.retireFrom.Get(); ; { + ct, err := s.CountUnapplied(ctx, db, from) + if err != nil { + log.WithError(err).Warnf("could not count unapplied mutations for target: %s", target) + } else { + s.staleCount.Set(float64(ct)) + } - for rows.Next() { - var nanos int64 - var logical int - if err := rows.Scan(&nanos, &logical); err != nil { - return errors.WithStack(err) + select { + case <-ctx.Stopping(): + return nil + case <-changed: + from, changed = s.retireFrom.Get() + case <-ticker.C: + // Ensure that values get reset if this instance of + // cdc-sink isn't the one that's actively resolving or + // retiring mutations. } - ret = append(ret, hlc.New(nanos, logical)) } - return errors.WithStack(rows.Err()) }) - return ret, err -} - -// ($1, $2) starting resolved timestamp -// ($3, $4) ending resolved timestamp -// -// This query drains all mutations between the given timestamps, -// returning the latest timestamped value for any given key. -const selectTemplateAll = ` -SELECT key, nanos, logical, mut, before - FROM %[1]s - WHERE (nanos, logical) BETWEEN ($1, $2) AND ($3, $4) - ORDER BY nanos, logical` - -// ($1, $2) starting resolved timestamp -// ($3, $4) ending resolved timestamp -// $5 starting key to skip -// $6 limit -// -// For the kind of very large datasets that we see in a backfill -// scenario, it's not feasible to deduplicate updates for a given key -// with in the time range. We would have to scan forward to all -// timestamps within the given timestamp range to see if there's a key -// value to be had. An index over (key, nanos, time) doesn't help, -// either for key-based pagination, since we now have to read through -// all keys to identify those with a mutation in the desired window. -const selectTemplatePartial = ` -SELECT key, nanos, logical, mut, before - FROM %[1]s - WHERE (nanos, logical, key) > ($1, $2, $5) - AND (nanos, logical) <= ($3, $4) - ORDER BY nanos, logical, key - LIMIT $6` - -// Select implements types.Stager. -func (s *stage) Select( - ctx context.Context, tx types.StagingQuerier, prev, next hlc.Time, -) ([]types.Mutation, error) { - return s.SelectPartial(ctx, tx, prev, next, nil, -1) + return s, nil } -// SelectPartial implements types.Stager. -func (s *stage) SelectPartial( - ctx context.Context, tx types.StagingQuerier, prev, next hlc.Time, afterKey []byte, limit int, -) ([]types.Mutation, error) { - if hlc.Compare(prev, next) > 0 { - return nil, errors.Errorf("timestamps out of order: %s > %s", prev, next) - } - - start := time.Now() - var ret []types.Mutation - if limit > 0 { - ret = make([]types.Mutation, 0, limit) - } +const countTemplate = ` +SELECT count(*) FROM %s +WHERE (nanos, logical) < ($1, $2) AND NOT applied +` +// CountUnapplied returns the number of dangling mutations that likely +// indicate an error condition. +func (s *stage) CountUnapplied( + ctx context.Context, db types.StagingQuerier, before hlc.Time, +) (int, error) { + var ret int err := retry.Retry(ctx, func(ctx context.Context) error { - ret = ret[:0] - var rows pgx.Rows - var err error - if limit <= 0 { - rows, err = tx.Query(ctx, s.sql.sel, - prev.Nanos(), prev.Logical(), next.Nanos(), next.Logical()) - } else { - rows, err = tx.Query(ctx, s.sql.selPartial, - prev.Nanos(), prev.Logical(), next.Nanos(), next.Logical(), string(afterKey), limit) - } - if err != nil { - return err - } - defer rows.Close() - - for rows.Next() { - var mut types.Mutation - var nanos int64 - var logical int - if err := rows.Scan(&mut.Key, &nanos, &logical, &mut.Data, &mut.Before); err != nil { - return err - } - mut.Data, err = maybeGunzip(mut.Data) - if err != nil { - return err - } - mut.Time = hlc.New(nanos, logical) - ret = append(ret, mut) - } - return nil + return db.QueryRow(ctx, s.sql.unapplied, before.Nanos(), before.Logical()).Scan(&ret) }) - - if err != nil { - if pgErr := (*pgconn.PgError)(nil); errors.As(err, &pgErr) { - // Staging table not found. Most likely cause is a reference - // table which was restored via backup for which we haven't - // seen any updates. - if pgErr.Code == "42P01" { - return nil, nil - } - } - s.selectError.Inc() - return nil, errors.Wrapf(err, "select %s [%s, %s]", s.stage, prev, next) - } - - // Don't bother recording stats about no-op selects. - if len(ret) == 0 { - return nil, nil - } - - d := time.Since(start) - s.selectDuration.Observe(d.Seconds()) - s.selectCount.Add(float64(len(ret))) - log.WithFields(log.Fields{ - "count": len(ret), - "duration": d, - "next": next, - "prev": prev, - "target": s.stage, - }).Debug("select mutations") - return ret, nil + return ret, errors.Wrap(err, s.sql.unapplied) } +// GetTable returns the table that the stage is storing into. +func (s *stage) GetTable() ident.Table { return s.stage } + // The byte-array casts on $4 and $5 are because arrays of JSONB aren't implemented: // https://github.com/cockroachdb/cockroach/issues/23468 const putTemplate = ` @@ -297,6 +203,8 @@ func (s *stage) Store( ) error { start := time.Now() + mutations = msort.UniqueByTimeKey(mutations) + // If we're working with a pool, and not a transaction, we'll stage // the data in a concurrent manner. var err error @@ -388,7 +296,7 @@ func (s *stage) putOne( const retireTemplate = ` WITH d AS ( DELETE FROM %s - WHERE (nanos, logical) BETWEEN ($1, $2) AND ($3, $4) + WHERE (nanos, logical) BETWEEN ($1, $2) AND ($3, $4) AND applied ORDER BY nanos, logical LIMIT $5 RETURNING nanos, logical) @@ -400,12 +308,13 @@ SELECT last_value(nanos) OVER (), last_value(logical) OVER () func (s *stage) Retire(ctx context.Context, db types.StagingQuerier, end hlc.Time) error { start := time.Now() err := retry.Retry(ctx, func(ctx context.Context) error { - for hlc.Compare(s.retireFrom, end) < 0 { + from, _ := s.retireFrom.Get() + for hlc.Compare(from, end) < 0 { var lastNanos int64 var lastLogical int err := db.QueryRow(ctx, s.sql.retire, - s.retireFrom.Nanos(), - s.retireFrom.Logical(), + from.Nanos(), + from.Logical(), end.Nanos(), end.Logical(), 10000, // Make configurable? @@ -417,12 +326,13 @@ func (s *stage) Retire(ctx context.Context, db types.StagingQuerier, end hlc.Tim if err != nil { return errors.WithStack(err) } - s.retireFrom = hlc.New(lastNanos, lastLogical) + from = hlc.New(lastNanos, lastLogical) } // If there was nothing to delete, still advance the marker. - if hlc.Compare(s.retireFrom, end) < 0 { - s.retireFrom = end + if hlc.Compare(from, end) < 0 { + from = end } + s.retireFrom.Set(from) return nil }) if err == nil { diff --git a/internal/staging/stage/stage_test.go b/internal/staging/stage/stage_test.go index 4160c354f..e6ef93969 100644 --- a/internal/staging/stage/stage_test.go +++ b/internal/staging/stage/stage_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/types" "github.com/cockroachdb/cdc-sink/internal/util/hlc" "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/jackc/pgx/v5" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -39,11 +40,10 @@ import ( // TestPutAndDrain will insert and dequeue a batch of Mutations. func TestPutAndDrain(t *testing.T) { a := assert.New(t) + r := require.New(t) fixture, cancel, err := all.NewFixture() - if !a.NoError(err) { - return - } + r.NoError(err) defer cancel() ctx := fixture.Context @@ -54,13 +54,16 @@ func TestPutAndDrain(t *testing.T) { dummyTarget := ident.NewTable(targetDB, ident.New("target")) s, err := fixture.Stagers.Get(ctx, dummyTarget) - if !a.NoError(err) { - return - } + r.NoError(err) a.NotNil(s) + // Not part of public API, but we want to test the metrics function. + ctr := s.(interface { + CountUnapplied(ctx context.Context, db types.StagingQuerier, before hlc.Time) (int, error) + }) + jumbledStager, err := fixture.Stagers.Get(ctx, sinktest.JumbleTable(dummyTarget)) - a.NoError(err) + r.NoError(err) a.Same(s, jumbledStager) // Steal implementation details to cross-check DB state. @@ -80,134 +83,65 @@ func TestPutAndDrain(t *testing.T) { muts[i].Before = []byte("before") } } - maxTime := muts[len(muts)-1].Time - - // Check TransactionTimes in empty state. - found, err := s.TransactionTimes(ctx, pool, hlc.Zero(), maxTime) - a.Empty(found) - a.NoError(err) // Insert. - a.NoError(s.Store(ctx, pool, muts)) - - // We should find all timestamps now. - found, err = s.TransactionTimes(ctx, pool, hlc.Zero(), maxTime) - if a.Len(found, len(muts)) { - a.Equal(muts[0].Time, found[0]) - a.Equal(maxTime, found[len(found)-1]) - } - a.NoError(err) - - // Advance once. - found, err = s.TransactionTimes(ctx, pool, muts[0].Time, maxTime) - if a.Len(found, len(muts)-1) { - a.Equal(muts[1].Time, found[0]) - a.Equal(maxTime, found[len(found)-1]) - } - a.NoError(err) - - // Make sure we don't find the last value. - found, err = s.TransactionTimes(ctx, pool, maxTime, maxTime) - a.Empty(found) - a.NoError(err) + r.NoError(s.Store(ctx, pool, muts)) // Sanity-check table. count, err := base.GetRowCount(ctx, pool, stagingTable) - a.NoError(err) + r.NoError(err) a.Equal(total, count) // Ensure that data insertion is idempotent. - a.NoError(s.Store(ctx, pool, muts)) + r.NoError(s.Store(ctx, pool, muts)) // Sanity-check table. count, err = base.GetRowCount(ctx, pool, stagingTable) - a.NoError(err) + r.NoError(err) a.Equal(total, count) - // Insert an older value for each key, we'll check that only the - // latest values are returned below. - older := make([]types.Mutation, total) - copy(older, muts) - for i := range older { - older[i].Data = []byte(`"should not see this"`) - older[i].Time = hlc.New(older[i].Time.Nanos()-1, i) - } - a.NoError(s.Store(ctx, pool, older)) + // Verify metrics query. + count, err = ctr.CountUnapplied(ctx, pool, hlc.New(math.MaxInt64, 0)) + r.NoError(err) + a.Equal(total, count) - // Sanity-check table. - count, err = base.GetRowCount(ctx, pool, stagingTable) - a.NoError(err) - a.Equal(2*total, count) - - // The two queries that we're going to run will see slightly - // different views of the data. The all-data query will provide - // deduplicated data, ordered by the target key. The incremental - // query will see all values interleaved, which allows us to - // page within a (potentially large) backfill window. - dedupOrder := make([]types.Mutation, total) - copy(dedupOrder, muts) - sort.Slice(dedupOrder, func(i, j int) bool { - return bytes.Compare(dedupOrder[i].Key, dedupOrder[j].Key) < 0 - }) - mergedOrder := make([]types.Mutation, 2*total) - for i := range muts { - mergedOrder[2*i] = older[i] - mergedOrder[2*i+1] = muts[i] + // Select all mutations to set the applied flag. This allows the + // mutations to be deleted. + cursor := &types.UnstageCursor{ + EndBefore: hlc.New(math.MaxInt64, 0), + Targets: []ident.Table{dummyTarget}, } - - // Test retrieving all data. - ret, err := s.Select(ctx, pool, hlc.Zero(), hlc.New(int64(1000*total+1), 0)) - a.NoError(err) - a.Equal(mergedOrder, ret) - - // Retrieve a few pages of partial values, validate expected boundaries. - const limit = 10 - var tail types.Mutation - for i := 0; i < 10; i++ { - ret, err = s.SelectPartial(ctx, - pool, - tail.Time, - muts[len(muts)-1].Time, - tail.Key, - limit, - ) - a.NoError(err) - a.Equalf(mergedOrder[i*limit:(i+1)*limit], ret, "at idx %d", i) - tail = ret[len(ret)-1] + unstagedCount := 0 + for unstaging := true; unstaging; { + cursor, unstaging, err = fixture.Stagers.Unstage(ctx, pool, cursor, + func(context.Context, ident.Table, types.Mutation) error { + unstagedCount++ + return nil + }) + r.NoError(err) } - a.NoError(s.Retire(ctx, pool, muts[limit-1].Time)) - - // Verify that reading from the end returns no results. - ret, err = s.SelectPartial(ctx, - pool, - muts[len(muts)-1].Time, - hlc.New(math.MaxInt64, 0), - muts[len(muts)-1].Key, - limit, - ) - a.NoError(err) - a.Empty(ret) - - // Check deletion. We have two timestamps for each - count, err = base.GetRowCount(ctx, pool, stagingTable) - a.NoError(err) - a.Equal(2*total-2*limit, count) + a.Equal(total, unstagedCount) - // Dequeue remainder. - a.NoError(s.Retire(ctx, pool, muts[len(muts)-1].Time)) + // Verify metrics query. + count, err = ctr.CountUnapplied(ctx, pool, hlc.New(math.MaxInt64, 0)) + r.NoError(err) + a.Zero(count) + + // Retire mutations. + r.NoError(s.Retire(ctx, pool, muts[len(muts)-1].Time)) // Should be empty now. count, err = base.GetRowCount(ctx, pool, stagingTable) - a.NoError(err) + r.NoError(err) a.Equal(0, count) // Verify various no-op calls are OK. - a.NoError(s.Retire(ctx, pool, hlc.Zero())) - a.NoError(s.Retire(ctx, pool, muts[len(muts)-1].Time)) - a.NoError(s.Retire(ctx, pool, hlc.New(muts[len(muts)-1].Time.Nanos()+1, 0))) + r.NoError(s.Retire(ctx, pool, hlc.Zero())) + r.NoError(s.Retire(ctx, pool, muts[len(muts)-1].Time)) + r.NoError(s.Retire(ctx, pool, hlc.New(muts[len(muts)-1].Time.Nanos()+1, 0))) } -func TestSelectMany(t *testing.T) { +func TestUnstage(t *testing.T) { const entries = 100 const tableCount = 10 a := assert.New(t) @@ -303,24 +237,34 @@ func TestSelectMany(t *testing.T) { a := assert.New(t) r := require.New(t) - q := &types.SelectManyCursor{ - End: hlc.New(100*entries, 0), // Past any existing time. - Limit: entries/2 - 1, // Validate paging - Targets: [][]ident.Table{ - {tables[0]}, - // {tables[1], tables[2]}, - // {tables[3], tables[4], tables[5]}, - // {tables[6], tables[7], tables[8], tables[9]}, - }, + q := &types.UnstageCursor{ + EndBefore: hlc.New(100*entries, 0), // Past any existing time. + Targets: tables, } - entriesByTable := &ident.TableMap[[]types.Mutation]{} - err := fixture.Stagers.SelectMany(ctx, fixture.StagingPool, q, - func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { - entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) - return nil - }) + // Run the select in a discarded transaction to avoid + // contaminating future tests with side effects. + tx, err := fixture.StagingPool.BeginTx(ctx, pgx.TxOptions{}) r.NoError(err) + defer func() { _ = tx.Rollback(ctx) }() + + entriesByTable := &ident.TableMap[[]types.Mutation]{} + numSelections := 0 + for selecting := true; selecting; { + q, selecting, err = fixture.Stagers.Unstage(ctx, tx, q, + func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { + entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) + return nil + }) + r.NoError(err) + numSelections++ + } + // Refer to comment about the distribution of timestamps. + // There are two large batches, then each mutation has two + // unique timestamps, and then there's a final call that returns + // false. + a.Equal(2+2*entries+1, numSelections) + r.NoError(entriesByTable.Range(func(_ ident.Table, seen []types.Mutation) error { if a.Len(seen, len(expectedMutOrder)) { a.Equal(expectedMutOrder, seen) @@ -334,25 +278,32 @@ func TestSelectMany(t *testing.T) { a := assert.New(t) r := require.New(t) - q := &types.SelectManyCursor{ - Start: hlc.New(2, 0), // Skip the initial transaction - End: hlc.New(10*entries, 0), // Read the second large batch - Limit: entries/2 - 1, // Validate paging - Targets: [][]ident.Table{ - {tables[0]}, - {tables[1], tables[2]}, - {tables[3], tables[4], tables[5]}, - {tables[6], tables[7], tables[8], tables[9]}, - }, + q := &types.UnstageCursor{ + StartAt: hlc.New(2, 0), // Skip the initial transaction + EndBefore: hlc.New(10*entries, 1), // Read the second large batch + Targets: tables, } - entriesByTable := &ident.TableMap[[]types.Mutation]{} - err := fixture.Stagers.SelectMany(ctx, fixture.StagingPool, q, - func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { - entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) - return nil - }) + // Run the select in a discarded transaction to avoid + // contaminating future tests with side effects. + tx, err := fixture.StagingPool.BeginTx(ctx, pgx.TxOptions{}) r.NoError(err) + defer func() { _ = tx.Rollback(ctx) }() + + entriesByTable := &ident.TableMap[[]types.Mutation]{} + numSelections := 0 + for selecting := true; selecting; { + q, selecting, err = fixture.Stagers.Unstage(ctx, tx, q, + func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { + entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) + return nil + }) + r.NoError(err) + numSelections++ + } + // We expect to see one large batch, a timestamp for each entry, + // and the final zero-results call. + a.Equal(1+entries+1, numSelections) r.NoError(entriesByTable.Range(func(_ ident.Table, seen []types.Mutation) error { if a.Len(seen, 2*entries) { a.Equal(expectedMutOrder[entries:3*entries], seen) @@ -361,40 +312,36 @@ func TestSelectMany(t *testing.T) { })) }) - // What's different about the backfill case is that we want to - // verify that if we see an update for a table in group G, then we - // already have all data for group G-1. - t.Run("backfill", func(t *testing.T) { + // Read from the staging tables using the limit, to simulate + // very large batches. + t.Run("transactional-incremental", func(t *testing.T) { a := assert.New(t) r := require.New(t) - q := &types.SelectManyCursor{ - Backfill: true, - End: hlc.New(100*entries, 0), // Read the second large batch - Limit: entries/2 - 1, // Validate paging - Targets: [][]ident.Table{ - {tables[0]}, - {tables[1], tables[2]}, - {tables[3], tables[4], tables[5]}, - {tables[6], tables[7], tables[8], tables[9]}, - }, + q := &types.UnstageCursor{ + EndBefore: hlc.New(100*entries, 0), // Past any existing time. + Targets: tables, + UpdateLimit: 20, } - entriesByTable := &ident.TableMap[[]types.Mutation]{} - err := fixture.Stagers.SelectMany(ctx, fixture.StagingPool, q, - func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { - entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) - - // Check that all data for parent groups have been received. - if group := tableToGroup.GetZero(tbl); group > 1 { - for _, tableToCheck := range tableGroups[group-1] { - r.Len(entriesByTable.GetZero(tableToCheck), len(muts)) - } - } - return nil - }) + // Run the select in a discarded transaction to avoid + // contaminating future tests with side effects. + tx, err := fixture.StagingPool.BeginTx(ctx, pgx.TxOptions{}) r.NoError(err) + defer func() { _ = tx.Rollback(ctx) }() + entriesByTable := &ident.TableMap[[]types.Mutation]{} + numSelections := 0 + for selecting := true; selecting; { + q, selecting, err = fixture.Stagers.Unstage(ctx, tx, q, + func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { + entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) + return nil + }) + r.NoError(err) + numSelections++ + } + a.Equal(211, numSelections) r.NoError(entriesByTable.Range(func(_ ident.Table, seen []types.Mutation) error { if a.Len(seen, len(expectedMutOrder)) { a.Equal(expectedMutOrder, seen) @@ -402,47 +349,6 @@ func TestSelectMany(t *testing.T) { return nil })) }) - - // Similar to the backfill test, but we read a subset of the data. - t.Run("backfill-bounded", func(t *testing.T) { - a := assert.New(t) - r := require.New(t) - - q := &types.SelectManyCursor{ - Start: hlc.New(2, 0), // Skip the initial transaction - End: hlc.New(10*entries, 0), // Read the second large batch - Backfill: true, - Limit: entries/2 - 1, // Validate paging - Targets: [][]ident.Table{ - {tables[0]}, - {tables[1], tables[2]}, - {tables[3], tables[4], tables[5]}, - {tables[6], tables[7], tables[8], tables[9]}, - }, - } - - entriesByTable := &ident.TableMap[[]types.Mutation]{} - err := fixture.Stagers.SelectMany(ctx, fixture.StagingPool, q, - func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { - entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) - - // Check that all data for parent groups have been received. - if group := tableToGroup.GetZero(tbl); group > 1 { - for _, tableToCheck := range tableGroups[group-1] { - r.Len(entriesByTable.GetZero(tableToCheck), 2*entries) - } - } - return nil - }) - r.NoError(err) - - r.NoError(entriesByTable.Range(func(_ ident.Table, seen []types.Mutation) error { - if a.Len(seen, 2*entries) { - a.Equal(expectedMutOrder[entries:3*entries], seen) - } - return nil - })) - }) } func BenchmarkStage(b *testing.B) { diff --git a/internal/staging/stage/templates.go b/internal/staging/stage/templates.go new file mode 100644 index 000000000..0a891445a --- /dev/null +++ b/internal/staging/stage/templates.go @@ -0,0 +1,59 @@ +// Copyright 2023 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package stage + +import ( + "embed" + "strings" + "text/template" + + "github.com/cockroachdb/cdc-sink/internal/types" + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/pkg/errors" +) + +//go:embed queries/*.tmpl +var templateFS embed.FS + +var ( + tmplFuncs = template.FuncMap{ + "add": func(a, b int) int { return a + b }, + "nl": func() string { return "\n" }, + "sp": func() string { return " " }, + } + templates = template.Must( + template.New("").Funcs(tmplFuncs).ParseFS(templateFS, "queries/*.tmpl")) + unstage = templates.Lookup("unstage.tmpl") +) + +type templateData struct { + Cursor *types.UnstageCursor // Required input. + StagingSchema ident.Schema // Required input. +} + +func (d *templateData) Eval() (string, error) { + var sb strings.Builder + err := unstage.Execute(&sb, d) + if err != nil { + return "", errors.WithStack(err) + } + return sb.String(), nil +} + +func (d *templateData) StagingTable(tbl ident.Table) ident.Table { + return stagingTable(d.StagingSchema, tbl) +} diff --git a/internal/staging/stage/templates_test.go b/internal/staging/stage/templates_test.go new file mode 100644 index 000000000..6f8ae0fe7 --- /dev/null +++ b/internal/staging/stage/templates_test.go @@ -0,0 +1,94 @@ +// Copyright 2023 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package stage + +import ( + "fmt" + "os" + "testing" + + "github.com/cockroachdb/cdc-sink/internal/types" + "github.com/cockroachdb/cdc-sink/internal/util/hlc" + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/stretchr/testify/require" +) + +const rewriteExpcted = false + +func TestRewriteIsFalse(t *testing.T) { + require.False(t, rewriteExpcted) +} + +func TestTimeOrderTemplate(t *testing.T) { + r := require.New(t) + + r.NotNil(unstage) + + sch := ident.MustSchema(ident.New("my_db"), ident.Public) + tbl0 := ident.NewTable(sch, ident.New("tbl0")) + tbl1 := ident.NewTable(sch, ident.New("tbl1")) + tbl2 := ident.NewTable(sch, ident.New("tbl2")) + tbl3 := ident.NewTable(sch, ident.New("tbl3")) + + staging := ident.MustSchema(ident.New("_cdc_sink"), ident.Public) + + tcs := []struct { + name string + cursor *types.UnstageCursor + }{ + { + name: "unstage", + cursor: &types.UnstageCursor{ + StartAt: hlc.New(1, 2), + EndBefore: hlc.New(3, 4), + Targets: []ident.Table{tbl0, tbl1, tbl2, tbl3}, + }, + }, + { + name: "unstage_limit", + cursor: &types.UnstageCursor{ + StartAt: hlc.New(1, 2), + EndBefore: hlc.New(3, 4), + TimestampLimit: 22, + Targets: []ident.Table{tbl0, tbl1, tbl2, tbl3}, + UpdateLimit: 10000, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + r := require.New(t) + data := &templateData{ + Cursor: tc.cursor, + StagingSchema: staging, + } + + out, err := data.Eval() + r.NoError(err) + + filename := fmt.Sprintf("./testdata/%s.sql", tc.name) + if rewriteExpcted { + r.NoError(os.WriteFile(filename, []byte(out), 0644)) + } else { + data, err := os.ReadFile(filename) + r.NoError(err) + r.Equal(string(data), out) + } + }) + } +} diff --git a/internal/staging/stage/testdata/unstage.sql b/internal/staging/stage/testdata/unstage.sql new file mode 100644 index 000000000..9e44b40d4 --- /dev/null +++ b/internal/staging/stage/testdata/unstage.sql @@ -0,0 +1,83 @@ +WITH hlc_0 (n, l) AS ( +SELECT nanos, logical +FROM "_cdc_sink"."public"."my_db_public_tbl0" +WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[1]) +AND (nanos, logical) < ($3, $4) +AND NOT applied +GROUP BY nanos, logical +ORDER BY nanos, logical +LIMIT 1 +), +hlc_1 (n, l) AS ( +SELECT nanos, logical +FROM "_cdc_sink"."public"."my_db_public_tbl1" +WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[2]) +AND (nanos, logical) < ($3, $4) +AND NOT applied +GROUP BY nanos, logical +ORDER BY nanos, logical +LIMIT 1 +), +hlc_2 (n, l) AS ( +SELECT nanos, logical +FROM "_cdc_sink"."public"."my_db_public_tbl2" +WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[3]) +AND (nanos, logical) < ($3, $4) +AND NOT applied +GROUP BY nanos, logical +ORDER BY nanos, logical +LIMIT 1 +), +hlc_3 (n, l) AS ( +SELECT nanos, logical +FROM "_cdc_sink"."public"."my_db_public_tbl3" +WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[4]) +AND (nanos, logical) < ($3, $4) +AND NOT applied +GROUP BY nanos, logical +ORDER BY nanos, logical +LIMIT 1 +), +hlc_all AS (SELECT n, l FROM hlc_0 UNION ALL +SELECT n, l FROM hlc_1 UNION ALL +SELECT n, l FROM hlc_2 UNION ALL +SELECT n, l FROM hlc_3), +hlc_min AS (SELECT n, l FROM hlc_all GROUP BY n, l ORDER BY n, l LIMIT 1), +data_0 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl0" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[1]) +AND NOT applied +RETURNING nanos, logical, key, mut, before), +data_1 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl1" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[2]) +AND NOT applied +RETURNING nanos, logical, key, mut, before), +data_2 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl2" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[3]) +AND NOT applied +RETURNING nanos, logical, key, mut, before), +data_3 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl3" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[4]) +AND NOT applied +RETURNING nanos, logical, key, mut, before) +SELECT * FROM ( +SELECT 0 idx, nanos, logical, key, mut, before FROM data_0 UNION ALL +SELECT 1 idx, nanos, logical, key, mut, before FROM data_1 UNION ALL +SELECT 2 idx, nanos, logical, key, mut, before FROM data_2 UNION ALL +SELECT 3 idx, nanos, logical, key, mut, before FROM data_3) +ORDER BY nanos, logical, idx, key \ No newline at end of file diff --git a/internal/staging/stage/testdata/unstage_limit.sql b/internal/staging/stage/testdata/unstage_limit.sql new file mode 100644 index 000000000..5067cdfd5 --- /dev/null +++ b/internal/staging/stage/testdata/unstage_limit.sql @@ -0,0 +1,91 @@ +WITH hlc_0 (n, l) AS ( +SELECT nanos, logical +FROM "_cdc_sink"."public"."my_db_public_tbl0" +WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[1]) +AND (nanos, logical) < ($3, $4) +AND NOT applied +GROUP BY nanos, logical +ORDER BY nanos, logical +LIMIT 22 +), +hlc_1 (n, l) AS ( +SELECT nanos, logical +FROM "_cdc_sink"."public"."my_db_public_tbl1" +WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[2]) +AND (nanos, logical) < ($3, $4) +AND NOT applied +GROUP BY nanos, logical +ORDER BY nanos, logical +LIMIT 22 +), +hlc_2 (n, l) AS ( +SELECT nanos, logical +FROM "_cdc_sink"."public"."my_db_public_tbl2" +WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[3]) +AND (nanos, logical) < ($3, $4) +AND NOT applied +GROUP BY nanos, logical +ORDER BY nanos, logical +LIMIT 22 +), +hlc_3 (n, l) AS ( +SELECT nanos, logical +FROM "_cdc_sink"."public"."my_db_public_tbl3" +WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[4]) +AND (nanos, logical) < ($3, $4) +AND NOT applied +GROUP BY nanos, logical +ORDER BY nanos, logical +LIMIT 22 +), +hlc_all AS (SELECT n, l FROM hlc_0 UNION ALL +SELECT n, l FROM hlc_1 UNION ALL +SELECT n, l FROM hlc_2 UNION ALL +SELECT n, l FROM hlc_3), +hlc_min AS (SELECT n, l FROM hlc_all GROUP BY n, l ORDER BY n, l LIMIT 22), +data_0 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl0" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[1]) +AND NOT applied +ORDER BY nanos, logical, key +LIMIT 10000 +RETURNING nanos, logical, key, mut, before), +data_1 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl1" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[2]) +AND NOT applied +ORDER BY nanos, logical, key +LIMIT 10000 +RETURNING nanos, logical, key, mut, before), +data_2 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl2" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[3]) +AND NOT applied +ORDER BY nanos, logical, key +LIMIT 10000 +RETURNING nanos, logical, key, mut, before), +data_3 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl3" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[4]) +AND NOT applied +ORDER BY nanos, logical, key +LIMIT 10000 +RETURNING nanos, logical, key, mut, before) +SELECT * FROM ( +SELECT 0 idx, nanos, logical, key, mut, before FROM data_0 UNION ALL +SELECT 1 idx, nanos, logical, key, mut, before FROM data_1 UNION ALL +SELECT 2 idx, nanos, logical, key, mut, before FROM data_2 UNION ALL +SELECT 3 idx, nanos, logical, key, mut, before FROM data_3) +ORDER BY nanos, logical, idx, key \ No newline at end of file diff --git a/internal/staging/version/versions.go b/internal/staging/version/versions.go index c9e7d9a53..b7feaca23 100644 --- a/internal/staging/version/versions.go +++ b/internal/staging/version/versions.go @@ -41,6 +41,7 @@ import ( var Versions = []Version{ {"Add versions table", 400}, {"Support single-level schema namespaces", 389}, + {"Track applied in staging table", 572}, } // A Version describes a breaking change in the cdc-sink metadata diff --git a/internal/types/types.go b/internal/types/types.go index 1b82f8292..5e3abe7b2 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -153,56 +153,71 @@ type Stager interface { // not occur within a single database transaction. Retire(ctx context.Context, db StagingQuerier, end hlc.Time) error - // Select will return all queued mutations between the timestamps. - Select(ctx context.Context, tx StagingQuerier, prev, next hlc.Time) ([]Mutation, error) - - // SelectPartial will return queued mutations between the - // timestamps. The after and limit arguments are used together when - // backfilling large amounts of data. - SelectPartial(ctx context.Context, tx StagingQuerier, prev, next hlc.Time, afterKey []byte, limit int) ([]Mutation, error) - // Store implementations should be idempotent. Store(ctx context.Context, db StagingQuerier, muts []Mutation) error - - // TransactionTimes returns distinct timestamps in the range - // (after, before] for which there is data in the associated table. - TransactionTimes(ctx context.Context, tx StagingQuerier, before, after hlc.Time) ([]hlc.Time, error) } -// SelectManyCallback is provided to Stagers.SelectMany to receive the +// UnstageCallback is provided to Stagers.SelectMany to receive the // incoming data. -type SelectManyCallback func(ctx context.Context, tbl ident.Table, mut Mutation) error - -// SelectManyCursor is used with Stagers.SelectMany. The After values -// will be updated by the method, allowing callers to call SelectMany -// in a loop until fewer than Limit values are returned. -type SelectManyCursor struct { - Start, End hlc.Time - Targets [][]ident.Table // The outer slice defines FK groupings. - Limit int - - // If true, we read all updates for parent tables before children, - // but make no guarantees around transactional boundaries. If false, - // we read some number of individual MVCC timestamps in their - // entirety. - Backfill bool - - OffsetKey json.RawMessage - OffsetTable ident.Table - OffsetTime hlc.Time +type UnstageCallback func(ctx context.Context, tbl ident.Table, mut Mutation) error + +// UnstageCursor is used with Stagers.SelectMany. The After values +// will be updated by the method, allowing callers to call Unstage +// in a loop until it returns false. +type UnstageCursor struct { + // A half-open interval: [ StartAt, EndBefore ) + StartAt, EndBefore hlc.Time + + // StartAfterKey is used when processing very large batches that + // occur within a single timestamp, to provide an additional offset + // for skipping already-processed rows. The implementation of + // [Stagers.SelectMany] will automatically populate this field. + StartAfterKey ident.TableMap[json.RawMessage] + + // Targets defines the order in which data for the selected tables + // will be passed to the results callback. + Targets []ident.Table + + // Limit the number of distinct MVCC timestamps that are returned. + // This will default to a single timestamp if unset. + TimestampLimit int + + // Limit the number of rows that are selected from any given staging + // table. The total number of rows that can be emitted is the limit + // multiplied by the number of tables listed in Targets. This will + // return all rows within the selected timestamp(s) if unset. + UpdateLimit int +} + +// Copy returns a copy of the cursor so that it may be updated. +func (c *UnstageCursor) Copy() *UnstageCursor { + cpy := &UnstageCursor{ + StartAt: c.StartAt, + EndBefore: c.EndBefore, + Targets: make([]ident.Table, len(c.Targets)), + TimestampLimit: c.TimestampLimit, + UpdateLimit: c.UpdateLimit, + } + c.StartAfterKey.CopyInto(&cpy.StartAfterKey) + copy(cpy.Targets, c.Targets) + return cpy } // Stagers is a factory for Stager instances. type Stagers interface { Get(ctx context.Context, target ident.Table) (Stager, error) - // SelectMany performs queries across multiple staging tables, to - // more readily support backfilling large amounts of data that may - // result from a changefeed's initial_scan. + // Unstage implements an exactly-once behavior of reading staged + // mutations. + // + // This method returns true if at least one row was returned. It + // will also return an updated cursor that allows a caller to resume + // reading. // - // This method will update the fields within the cursor so that it - // can be used to restart in case of interruption. - SelectMany(ctx context.Context, tx StagingQuerier, q *SelectManyCursor, fn SelectManyCallback) error + // Rows will be emitted in (time, table, key) order. As such, + // transaction boundaries can be detected by looking for a change in + // the timestamp values. + Unstage(ctx context.Context, tx StagingQuerier, q *UnstageCursor, fn UnstageCallback) (*UnstageCursor, bool, error) } // ColData hold SQL column metadata. diff --git a/migrations/pr_572.md b/migrations/pr_572.md new file mode 100644 index 000000000..d994a72f0 --- /dev/null +++ b/migrations/pr_572.md @@ -0,0 +1,40 @@ +# PR 572 Track Applied Mutations + +[PR 572](https://github.com/cockroachdb/cdc-sink/pull/572) + +Breaking schema change: + +Each staging table gains a new `applied` column which cdc-sink will use +to suppress duplicate updates during process restarts. The resolver loop +continues to save resolved-timestamp checkpoints, but partial progress +within a resolved-timestamp window is handled through the `applied` +column. + +Migration: +* Run the following `ALTER TABLE` command on each staging table before + upgrading cdc-sink: + +```sql +ALTER TABLE _cdc_sink.my_db_public_my_table +ADD COLUMN applied BOOL NOT NULL DEFAULT false + CREATE FAMILY hot, +ADD COLUMN source_time TIMESTAMPTZ + AS (to_timestamp(nanos::float/1e9)) VIRTUAL; +``` +* Mark the migration as complete: +```sql +UPSERT INTO _cdc_sink.memo (key, value) VALUES ('version-572','{"state":"applied"}'); +``` +* Stop all instances of cdc-sink, upgrade, and restart. +* cdc-sink will no longer delete un-applied mutations that may be before + the resolved-timestamp frontier. The mutations that were present + before the upgrade can either be manually deleted or marked as applied: + +```sql +UPDATE _cdc_sink.my_db_public_my_table + SET applied = true +WHERE nanos < ( + SELECT max(source_nanos) + FROM _cdc_sink.resolved_timestamps + WHERE target_applied_at IS NOT NULL) +```