diff --git a/internal/sinktest/all/fixture.go b/internal/sinktest/all/fixture.go index f7c4fde35..31f525f80 100644 --- a/internal/sinktest/all/fixture.go +++ b/internal/sinktest/all/fixture.go @@ -26,7 +26,9 @@ import ( "github.com/cockroachdb/cdc-sink/internal/types" "github.com/cockroachdb/cdc-sink/internal/util/applycfg" "github.com/cockroachdb/cdc-sink/internal/util/diag" + "github.com/cockroachdb/cdc-sink/internal/util/hlc" "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/jackc/pgx/v5" "github.com/pkg/errors" ) @@ -76,3 +78,33 @@ func (f *Fixture) CreateTargetTable( } return ti, err } + +// PeekStaged peeks at the data which has been staged for the target +// table between the given timestamps. +func (f *Fixture) PeekStaged( + ctx context.Context, tbl ident.Table, startAt, endBefore hlc.Time, +) ([]types.Mutation, error) { + tx, err := f.StagingPool.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return nil, errors.WithStack(err) + } + defer func() { _ = tx.Rollback(ctx) }() + + q := &types.UnstageCursor{ + StartAt: startAt, + EndBefore: endBefore, + Targets: []ident.Table{tbl}, + } + var ret []types.Mutation + for selecting := true; selecting; { + q, selecting, err = f.Stagers.Unstage(ctx, tx, q, + func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { + ret = append(ret, mut) + return nil + }) + if err != nil { + return nil, err + } + } + return ret, nil +} diff --git a/internal/source/cdc/config.go b/internal/source/cdc/config.go index 97e98c3d0..c0cbfdde1 100644 --- a/internal/source/cdc/config.go +++ b/internal/source/cdc/config.go @@ -27,10 +27,12 @@ import ( ) const ( - defaultBackupPolling = 100 * time.Millisecond - defaultFlushBatchSize = 1_000 - defaultSelectBatchSize = 10_000 - defaultNDJsonBuffer = bufio.MaxScanTokenSize // 64k + defaultBackupPolling = 100 * time.Millisecond + defaultIdealBatchSize = 1000 + defaultMetaTable = "resolved_timestamps" + defaultLargeTransactionLimit = 250_000 // Chosen arbitrarily. + defaultNDJsonBuffer = bufio.MaxScanTokenSize // 64k + defaultTimestampWindowSize = 1000 ) // Config adds CDC-specific configuration to the core logical loop. @@ -43,27 +45,31 @@ type Config struct { // timestamps will receive the incoming resolved-timestamp message. BackupPolling time.Duration - // Don't coalesce updates from different source MVCC timestamps into - // a single destination transaction. Setting this will preserve the - // original structure of updates from the source, but may decrease - // overall throughput by increasing the total number of target - // database transactions. + // If true, the resolver loop will behave as though + // TimestampWindowSize has been set to 1. That is, each unique + // timestamp within a resolved-timestamp window will be processed, + // instead of fast-forwarding to the latest values within a batch. + // This flag is relevant for use-cases that employ a merge function + // which must see every incremental update in order to produce a + // meaningful result. FlushEveryTimestamp bool - // Coalesce timestamps within a resolved-timestamp window until - // at least this many mutations have been collected. + // This setting controls a soft maximum on the number of rows that + // will be flushed in one target transaction. IdealFlushBatchSize int - // The maximum amount of data to buffer when reading a single line - // of ndjson input. This can be increased if the source cluster - // has large blob values. 
- NDJsonBuffer int + // If non-zero and the number of rows in a single timestamp exceeds + // this value, allow that source transaction to be applied over + // multiple target transactions. + LargeTransactionLimit int // The name of the resolved_timestamps table. MetaTableName ident.Ident - // The number of rows to retrieve when loading staged data. - SelectBatchSize int + // The maximum amount of data to buffer when reading a single line + // of ndjson input. This can be increased if the source cluster + // has large blob values. + NDJsonBuffer int // Retain staged, applied data for an extra amount of time. This // allows, for example, additional time to validate what was staged @@ -71,6 +77,13 @@ type Config struct { // be retired as soon as there is a resolved timestamp greater than // the timestamp of the mutation. RetireOffset time.Duration + + // The maximum number of source transactions to unstage at once. + // This does not place a hard limit on the number of mutations that + // may be dequeued at once, but it does reduce the total number of + // round-trips to the staging database when the average transaction + // size is small. + TimestampWindowSize int } // Bind adds configuration flags to the set. @@ -80,19 +93,27 @@ func (c *Config) Bind(f *pflag.FlagSet) { f.DurationVar(&c.BackupPolling, "backupPolling", defaultBackupPolling, "poll for resolved timestamps from other instances of cdc-sink") f.BoolVar(&c.FlushEveryTimestamp, "flushEveryTimestamp", false, - "preserve intermediate updates from the source in transactional mode; "+ + "don't fast-forward to the latest row values within a resolved timestamp; "+ "may negatively impact throughput") - f.IntVar(&c.IdealFlushBatchSize, "idealFlushBatchSize", defaultFlushBatchSize, - "try to apply at least this many mutations per resolved-timestamp window") + f.IntVar(&c.IdealFlushBatchSize, "idealFlushBatchSize", defaultIdealBatchSize, + "a soft limit on the number of rows to send to the target at once") + f.IntVar(&c.LargeTransactionLimit, "largeTransactionLimit", defaultLargeTransactionLimit, + "if non-zero, all source transactions with more than this "+ + "number of rows may be applied in multiple target transactions") f.IntVar(&c.NDJsonBuffer, "ndjsonBufferSize", defaultNDJsonBuffer, "the maximum amount of data to buffer while reading a single line of ndjson input; "+ "increase when source cluster has large blob values") - f.Var(ident.NewValue("resolved_timestamps", &c.MetaTableName), "metaTable", + f.Var(ident.NewValue(defaultMetaTable, &c.MetaTableName), "metaTable", "the name of the table in which to store resolved timestamps") - f.IntVar(&c.SelectBatchSize, "selectBatchSize", defaultSelectBatchSize, - "the number of rows to select at once when reading staged data") f.DurationVar(&c.RetireOffset, "retireOffset", 0, - "retain staged, applied data for an extra duration") + "if non-zero, retain staged, applied data for an extra duration") + f.IntVar(&c.TimestampWindowSize, "timestampWindowSize", defaultTimestampWindowSize, + "the maximum number of source transaction timestamps to unstage at once") + + var deprecated int + const msg = "replaced by --largeTransactionLimit and --timestampWindowSize" + f.IntVar(&deprecated, "selectBatchSize", 0, "") + _ = f.MarkDeprecated("selectBatchSize", msg) } // Preflight implements logical.Config. 
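The flag changes above replace the single --selectBatchSize knob with two orthogonal limits (--timestampWindowSize and --largeTransactionLimit). As a hedged illustration of how the new fields are meant to be combined when the Config is built programmatically rather than via pflag: the helper function, its name, and the numeric values below are invented for this sketch and are not part of the change, and the embedded logical.BaseConfig wiring is omitted.

```go
// exampleConfig is a hypothetical helper showing the intended use of the
// new CDC knobs added in this diff; the values are arbitrary.
func exampleConfig() (*Config, error) {
	cfg := &Config{
		MetaTableName: ident.New("resolved_timestamps"),
		// Unstage up to 100 distinct source-transaction timestamps per
		// round-trip to the staging database.
		TimestampWindowSize: 100,
		// Allow any single source transaction with more than 10,000
		// rows to be applied across several target transactions; zero
		// disables splitting.
		LargeTransactionLimit: 10_000,
		// FlushEveryTimestamp stays false unless a merge function must
		// observe every intermediate version of a row.
	}
	// Preflight fills in remaining defaults (e.g. IdealFlushBatchSize)
	// and rejects negative limits.
	if err := cfg.Preflight(); err != nil {
		return nil, err
	}
	return cfg, nil
}
```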
@@ -105,20 +126,26 @@ func (c *Config) Preflight() error { c.BackupPolling = defaultBackupPolling } if c.IdealFlushBatchSize == 0 { - c.IdealFlushBatchSize = defaultFlushBatchSize + c.IdealFlushBatchSize = defaultIdealBatchSize + } + // Zero is legal; implies no limit. + if c.LargeTransactionLimit < 0 { + return errors.New("largeTransactionLimit must be >= 0") } if c.NDJsonBuffer == 0 { c.NDJsonBuffer = defaultNDJsonBuffer } if c.MetaTableName.Empty() { - return errors.New("no metadata table specified") - } - if c.SelectBatchSize == 0 { - c.SelectBatchSize = defaultSelectBatchSize + c.MetaTableName = ident.New(defaultMetaTable) } if c.RetireOffset < 0 { return errors.New("retireOffset must be >= 0") } + if c.TimestampWindowSize < 0 { + return errors.New("timestampWindowSize must be >= 0") + } else if c.TimestampWindowSize == 0 { + c.TimestampWindowSize = defaultTimestampWindowSize + } return nil } diff --git a/internal/source/cdc/handler_test.go b/internal/source/cdc/handler_test.go index bf28e873d..77f0d091d 100644 --- a/internal/source/cdc/handler_test.go +++ b/internal/source/cdc/handler_test.go @@ -160,10 +160,8 @@ func testQueryHandler(t *testing.T, htc *fixtureConfig) { // Verify staged data, if applicable. if !htc.immediate { - stager, err := fixture.Stagers.Get(ctx, tableInfo.Name()) + muts, err := fixture.PeekStaged(ctx, tableInfo.Name(), hlc.Zero(), hlc.New(1, 1)) if a.NoError(err) { - muts, err := stager.Select(ctx, fixture.StagingPool.Pool, hlc.Zero(), hlc.New(1, 0)) - a.NoError(err) a.Len(muts, 2) // The order is stable since the underlying query // is ordered, in part, by the key. @@ -229,40 +227,14 @@ func testQueryHandler(t *testing.T, htc *fixtureConfig) { `), })) - // Verify staged data, if applicable. + // Verify staged and un-applied data, if applicable. if !htc.immediate { - stager, err := fixture.Stagers.Get(ctx, tableInfo.Name()) + muts, err := fixture.PeekStaged(ctx, tableInfo.Name(), hlc.Zero(), hlc.New(10, 1)) if a.NoError(err) { - muts, err := stager.Select(ctx, fixture.StagingPool.Pool, hlc.Zero(), hlc.New(10, 0)) - a.NoError(err) - a.Len(muts, 6) + a.Len(muts, 2) // The order is stable since the underlying query // orders by HLC and key. a.Equal([]types.Mutation{ - // Original entries. - { - Data: []byte(`{"pk":42,"v":99}`), - Key: []byte(`[42]`), - Time: hlc.New(1, 0), - }, - { - Before: []byte(`{"pk":99,"v":33}`), - Data: []byte(`{"pk":99,"v":42}`), - Key: []byte(`[99]`), - Time: hlc.New(1, 0), - }, - // These are the deletes from above. - { - Data: []byte(`null`), - Key: []byte(`[42]`), - Time: hlc.New(3, 0), - }, - { - Data: []byte(`null`), - Key: []byte(`[99]`), - Time: hlc.New(3, 0), - }, - // These are the entries we just created. { Data: []byte(`{"pk":42,"v":99}`), Key: []byte(`[42]`), @@ -384,10 +356,8 @@ func testHandler(t *testing.T, cfg *fixtureConfig) { // Verify staged data, if applicable. if !cfg.immediate { - stager, err := fixture.Stagers.Get(ctx, jumbleName()) + muts, err := fixture.PeekStaged(ctx, tableInfo.Name(), hlc.Zero(), hlc.New(1, 1)) if a.NoError(err) { - muts, err := stager.Select(ctx, fixture.StagingPool.Pool, hlc.Zero(), hlc.New(1, 0)) - a.NoError(err) a.Len(muts, 2) // The order is stable since the underlying query // orders, in part, on key @@ -454,40 +424,14 @@ func testHandler(t *testing.T, cfg *fixtureConfig) { `, jumbleName().Table())), })) - // Verify staged data, if applicable. + // Verify staged and as-yet-unapplied data. 
if !cfg.immediate { - stager, err := fixture.Stagers.Get(ctx, jumbleName()) + muts, err := fixture.PeekStaged(ctx, tableInfo.Name(), hlc.Zero(), hlc.New(10, 1)) if a.NoError(err) { - muts, err := stager.Select(ctx, fixture.StagingPool.Pool, hlc.Zero(), hlc.New(10, 0)) - a.NoError(err) - a.Len(muts, 6) + a.Len(muts, 2) // The order is stable since the underlying query // orders by HLC and key. a.Equal([]types.Mutation{ - // Original entries. - { - Data: []byte(`{ "pk" : 42, "v" : 99 }`), - Key: []byte(`[ 42 ]`), - Time: hlc.New(1, 0), - }, - { - Before: []byte(`{ "pk" : 99, "v" : 33 }`), - Data: []byte(`{ "pk" : 99, "v" : 42 }`), - Key: []byte(`[ 99 ]`), - Time: hlc.New(1, 0), - }, - // These are the deletes from above. - { - Data: []byte(`null`), - Key: []byte(`[ 42 ]`), - Time: hlc.New(3, 0), - }, - { - Data: []byte(`null`), - Key: []byte(`[ 99 ]`), - Time: hlc.New(3, 0), - }, - // These are the entries we just created. { Data: []byte(`{ "pk" : 42, "v" : 99 }`), Key: []byte(`[ 42 ]`), @@ -655,6 +599,7 @@ func TestConcurrentHandlers(t *testing.T) { func testMassBackfillWithForeignKeys( t *testing.T, rowCount, fixtureCount int, fns ...func(*Config), ) { + t.Parallel() // We spend most of our time waiting for a resolved timestamp. r := require.New(t) baseFixture, cancel, err := all.NewFixture() @@ -675,9 +620,9 @@ func testMassBackfillWithForeignKeys( for idx := range fixtures { cfg := &Config{ - IdealFlushBatchSize: 123, // Pick weird batch sizes. - SelectBatchSize: 587, - MetaTableName: ident.New("resolved_timestamps"), + LargeTransactionLimit: rowCount / 3, // Read the same timestamp more than once. + MetaTableName: ident.New("resolved_timestamps"), + TimestampWindowSize: 100, BaseConfig: logical.BaseConfig{ ApplyTimeout: time.Second, FanShards: 16, diff --git a/internal/source/cdc/resolved_stamp.go b/internal/source/cdc/resolved_stamp.go index fe84ee5e7..95126ed0a 100644 --- a/internal/source/cdc/resolved_stamp.go +++ b/internal/source/cdc/resolved_stamp.go @@ -28,47 +28,26 @@ import ( ) // resolvedStamp tracks the progress of applying staged mutations for -// a given resolved timestamp. This type has some extra complexity to -// support backfilling of initial changefeed scans. -// -// Here's a state diagram to help: -// -// committed -// | *---* -// V V | -// proposed ---> backfill -* -// | | -// V | -// committed <----------* -// -// Tracking of FK levels and table offsets is only used in backfill mode -// and is especially relevant to an initial_scan from a changefeed. In -// this state, we have an arbitrarily large number of mutations to -// apply, all of which will have the same timestamp. -// -// Each resolvedStamp is associated with a separate database transaction -// that holds a SELECT FOR UPDATE lock on the row that holds the -// timestamp being resolved. The ownership of this transaction is -// transferred to any derived resolvedStamp that represents partial -// progress within a resolved timestamp. The resolved-timestamp -// transaction is wrapped in a txguard.Guard to ensure that it is kept -// alive. If the resolved-timestamp transaction fails for any reason, -// we can roll back to the previously committed stamp through the usual -// logical-loop error-handling code. +// a given resolved timestamp. +// Partial progress of very large batches is maintained in the staging +// database by tracking a per-mutation applied flag. 
type resolvedStamp struct { - Backfill bool `json:"b,omitempty"` // A resolved timestamp that represents a transactionally-consistent // point in the history of the workload. CommittedTime hlc.Time `json:"c,omitempty"` // Iteration is used to provide well-ordered behavior within a - // single backfill window. + // single backfill window. Partial progress is maintained in the + // staging tables in case cdc-sink is interrupted in the middle + // of processing a large window. Iteration int `json:"i,omitempty"` + // LargeBatchOffset allows us to jump over already-applied keys when + // processing a large batch of data (e.g. an initial backfill). This + // is merely a performance optimization, since the staging tables + // are used to track whether or not any particular mutation has been + // applied. + LargeBatchOffset *ident.TableMap[json.RawMessage] `json:"off,omitempty"` // The next resolved timestamp that we want to advance to. ProposedTime hlc.Time `json:"p,omitempty"` - - OffsetKey json.RawMessage `json:"ok,omitempty"` - OffsetTable ident.Table `json:"otbl,omitempty"` - OffsetTime hlc.Time `json:"ots,omitempty"` } // AsTime implements logical.TimeStamp to improve reporting. @@ -106,47 +85,30 @@ func (s *resolvedStamp) NewProposed(proposed hlc.Time) (*resolvedStamp, error) { return nil, errors.Errorf("proposed cannot roll back committed time: %s vs %s", proposed, s.CommittedTime) } - if hlc.Compare(proposed, s.OffsetTime) < 0 { - return nil, errors.Errorf("proposed time undoing work: %s vs %s", proposed, s.OffsetTime) - } if hlc.Compare(proposed, s.ProposedTime) < 0 { return nil, errors.Errorf("proposed time cannot go backward: %s vs %s", proposed, s.ProposedTime) } return &resolvedStamp{ - Backfill: s.Backfill, CommittedTime: s.CommittedTime, Iteration: s.Iteration + 1, - OffsetKey: s.OffsetKey, - OffsetTable: s.OffsetTable, - OffsetTime: s.OffsetTime, ProposedTime: proposed, }, nil } // NewProgress returns a resolvedStamp that represents partial progress // within the same [committed, proposed] window. -func (s *resolvedStamp) NewProgress(cursor *types.SelectManyCursor) *resolvedStamp { +func (s *resolvedStamp) NewProgress(cursor *types.UnstageCursor) *resolvedStamp { ret := &resolvedStamp{ - Backfill: s.Backfill, CommittedTime: s.CommittedTime, Iteration: s.Iteration + 1, - OffsetKey: s.OffsetKey, - OffsetTable: s.OffsetTable, - OffsetTime: s.OffsetTime, ProposedTime: s.ProposedTime, } - if cursor != nil { - ret.OffsetTime = cursor.OffsetTime - - // Only Key and Table make sense to retain in a backfill. For - // transactional mode, we always want to restart at a specific - // timestamp. - if s.Backfill { - ret.OffsetKey = cursor.OffsetKey - ret.OffsetTable = cursor.OffsetTable - } + // Record offsets to allow us to skip rows within the next query. 
+ if cursor != nil && cursor.StartAfterKey.Len() > 0 { + ret.LargeBatchOffset = &ident.TableMap[json.RawMessage]{} + cursor.StartAfterKey.CopyInto(ret.LargeBatchOffset) } return ret diff --git a/internal/source/cdc/resolver.go b/internal/source/cdc/resolver.go index 2597bb027..c7a148702 100644 --- a/internal/source/cdc/resolver.go +++ b/internal/source/cdc/resolver.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/util/hlc" "github.com/cockroachdb/cdc-sink/internal/util/ident" "github.com/cockroachdb/cdc-sink/internal/util/notify" + "github.com/cockroachdb/cdc-sink/internal/util/retry" "github.com/cockroachdb/cdc-sink/internal/util/stamp" "github.com/cockroachdb/cdc-sink/internal/util/stopper" "github.com/jackc/pgx/v5" @@ -164,18 +165,18 @@ func (r *resolver) Mark(ctx context.Context, ts hlc.Time) error { func (r *resolver) BackfillInto( ctx context.Context, ch chan<- logical.Message, state logical.State, ) error { - return r.readInto(ctx, ch, state, true) + return r.readInto(ctx, ch, state) } // ReadInto implements logical.Dialect. func (r *resolver) ReadInto( ctx context.Context, ch chan<- logical.Message, state logical.State, ) error { - return r.readInto(ctx, ch, state, false) + return r.readInto(ctx, ch, state) } func (r *resolver) readInto( - ctx context.Context, ch chan<- logical.Message, state logical.State, backfill bool, + ctx context.Context, ch chan<- logical.Message, state logical.State, ) error { // This will either be from a previous iteration or ZeroStamp. cp, cpUpdated := state.GetConsistentPoint() @@ -211,7 +212,7 @@ func (r *resolver) readInto( } else { // We're looping around with a committed timestamp, so // look for work to do and send it. - proposed, err := r.nextProposedStamp(ctx, resumeFrom, backfill) + proposed, err := r.nextProposedStamp(ctx, resumeFrom) switch { case err == nil: log.WithFields(log.Fields{ @@ -286,7 +287,7 @@ func (r *resolver) readInto( // may be one of the sentinel values errNoWork or errBlocked that are // returned by selectTimestamp. func (r *resolver) nextProposedStamp( - ctx context.Context, prev *resolvedStamp, backfill bool, + ctx context.Context, prev *resolvedStamp, ) (*resolvedStamp, error) { // Find the next resolved timestamp to apply, starting from // a timestamp known to be committed. @@ -301,11 +302,6 @@ func (r *resolver) nextProposedStamp( return nil, err } - // Propagate the backfilling flag. - if backfill { - ret.Backfill = true - } - return ret, nil } @@ -353,8 +349,7 @@ func (r *resolver) ZeroStamp() stamp.Stamp { } // process makes incremental progress in fulfilling the given -// resolvedStamp. It returns the state to which the resolved timestamp -// has been advanced. +// resolvedStamp. func (r *resolver) process(ctx context.Context, rs *resolvedStamp, events logical.Events) error { start := time.Now() targets := r.watcher.Get().Order @@ -363,135 +358,193 @@ func (r *resolver) process(ctx context.Context, rs *resolvedStamp, events logica return errors.Errorf("no tables known in schema %s; have they been created?", r.target) } - cursor := &types.SelectManyCursor{ - Backfill: rs.Backfill, - End: rs.ProposedTime, - Limit: r.cfg.SelectBatchSize, - OffsetKey: rs.OffsetKey, - OffsetTable: rs.OffsetTable, - OffsetTime: rs.OffsetTime, - Start: rs.CommittedTime, - Targets: targets, + flattened := make([]ident.Table, 0, len(targets)) + for _, tgts := range targets { + flattened = append(flattened, tgts...) 
+ } + + cursor := &types.UnstageCursor{ + StartAt: rs.CommittedTime, + EndBefore: rs.ProposedTime, + Targets: flattened, + TimestampLimit: r.cfg.TimestampWindowSize, + UpdateLimit: r.cfg.LargeTransactionLimit, + } + if rs.LargeBatchOffset != nil && rs.LargeBatchOffset.Len() > 0 { + rs.LargeBatchOffset.CopyInto(&cursor.StartAfterKey) } - source := script.SourceName(r.target) - flush := func(toApply *ident.TableMap[[]types.Mutation], final bool) error { + flush := func(toApply *ident.TableMap[[]types.Mutation]) error { flushStart := time.Now() ctx, cancel := context.WithTimeout(ctx, r.cfg.ApplyTimeout) defer cancel() - // Apply the retrieved data. - batch, err := events.OnBegin(ctx) - if err != nil { - return err - } - defer func() { _ = batch.OnRollback(ctx) }() - - for _, tables := range targets { - for _, table := range tables { - muts := toApply.GetZero(table) - if len(muts) == 0 { - continue - } - if err := batch.OnData(ctx, source, table, muts); err != nil { - return err - } - } - } + retries := -1 + if err := retry.Retry(ctx, func(ctx context.Context) error { + retries++ - // OnCommit is asynchronous to support (a future) - // unified-immediate mode. We'll wait for the data to be - // committed. - select { - case err := <-batch.OnCommit(ctx): + // Apply the retrieved data. + batch, err := events.OnBegin(ctx) if err != nil { return err } - case <-ctx.Done(): - return ctx.Err() - } + defer func() { _ = batch.OnRollback(ctx) }() - // Advance and save the stamp once the flush has completed. - if final { - rs, err = rs.NewCommitted() - if err != nil { - return err + for _, tables := range targets { + for _, table := range tables { + muts := toApply.GetZero(table) + if len(muts) == 0 { + continue + } + source := script.SourceName(r.target) + muts = append([]types.Mutation(nil), muts...) + if err := batch.OnData(ctx, source, table, muts); err != nil { + return err + } + } } - // Mark the timestamp has being processed. - if err := r.Record(ctx, rs.CommittedTime); err != nil { + + // OnCommit is asynchronous to support (a future) + // unified-immediate mode. We'll wait for the data to be + // committed. + select { + case err := <-batch.OnCommit(ctx): return err + case <-ctx.Done(): + return ctx.Err() } - } else { - rs = rs.NewProgress(cursor) - } - if err := events.SetConsistentPoint(ctx, rs); err != nil { + }); err != nil { return err } log.WithFields(log.Fields{ "duration": time.Since(flushStart), "schema": r.target, + "retries": retries, }).Debugf("flushed mutations") return nil } - var epoch hlc.Time - flushCounter := 0 - toApply := &ident.TableMap[[]types.Mutation]{} - total := 0 - if err := r.stagers.SelectMany(ctx, r.pool, cursor, - func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { - // Check for flush before accumulating. - var needsFlush bool - if rs.Backfill { - // We're receiving data in table-order. Just read data - // and flush it once we hit our soft limit, since we - // can resume at any point. - needsFlush = flushCounter >= r.cfg.IdealFlushBatchSize - } else if r.cfg.FlushEveryTimestamp { - // The user wants to preserve all intermediate updates - // to a row, rather than fast-forwarding the values of a - // row to the latest transactionally-consistent state. - // We'll flush on every MVCC boundary change. - needsFlush = epoch != hlc.Zero() && hlc.Compare(mut.Time, epoch) > 0 - } else { - // We're receiving data ordered by MVCC timestamp. Flush - // data when we see a new epoch after accumulating a - // minimum number of mutations. 
This increases - // throughput when there are many single-row - // transactions in the source database. - needsFlush = flushCounter >= r.cfg.IdealFlushBatchSize && - hlc.Compare(mut.Time, epoch) > 0 + total := 0 // Metrics. + + // We run in a loop until the attempt to unstage returns no more + // data (i.e. we've consumed all mutations within the resolving + // window). If a LargeBatchLimit is configured, we may wind up + // calling flush() several times within a particularly large MVCC + // timestamp. + for hadData := true; hadData; { + if err := retry.Retry(ctx, func(ctx context.Context) error { + unstageTX, err := r.pool.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return errors.Wrap(err, "could not open staging transaction") } - if needsFlush { - if err := flush(toApply, false /* interim */); err != nil { - return err - } - total += flushCounter - flushCounter = 0 - // Reset the slices in toApply; flush() ignores zero-length entries. - _ = toApply.Range(func(tbl ident.Table, muts []types.Mutation) error { - toApply.Put(tbl, muts[:0]) + defer func() { _ = unstageTX.Rollback(ctx) }() + + var epoch hlc.Time + var nextCursor *types.UnstageCursor + pendingMutations := 0 + toApply := &ident.TableMap[[]types.Mutation]{} + + nextCursor, hadData, err = r.stagers.Unstage(ctx, unstageTX, cursor, + func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { + total++ + + // Decorate the mutation. + script.AddMeta("cdc", tbl, &mut) + + if epoch == hlc.Zero() { + epoch = mut.Time + } else if r.cfg.FlushEveryTimestamp || + pendingMutations >= r.cfg.IdealFlushBatchSize { + // If the user wants to see all intermediate + // values for a row, we'll flush whenever + // there's a change in the timestamp value. We + // may also preemptively flush to prevent + // unbounded memory use. + if hlc.Compare(mut.Time, epoch) > 0 { + if err := flush(toApply); err != nil { + return err + } + epoch = mut.Time + pendingMutations = 0 + toApply = &ident.TableMap[[]types.Mutation]{} + } + } else if hlc.Compare(mut.Time, epoch) < 0 { + // This would imply a coding error in Unstage. + return errors.New("epoch going backwards") + } + + // Accumulate the mutation. + toApply.Put(tbl, append(toApply.GetZero(tbl), mut)) + pendingMutations++ return nil }) + if err != nil { + return errors.Wrap(err, "could not unstage mutations") } - flushCounter++ - script.AddMeta("cdc", tbl, &mut) - toApply.Put(tbl, append(toApply.GetZero(tbl), mut)) - epoch = mut.Time + if hadData { + // Final flush of the data that was unstaged. It's + // possible, although unlikely, that the final callback + // from Unstage performed a flush. If this were to + // happen, we don't want to perform a no-work flush. + if pendingMutations > 0 { + if err := flush(toApply); err != nil { + return err + } + } + + // Commit the transaction that unstaged the data. Note that + // without an X/A transaction model, there's always a + // possibility where we've committed the target transaction + // and then the unstaging transaction fails to commit. In + // general, we expect that re-applying a mutation within a + // short timeframe should be idempotent, although this might + // not necessarily be the case for data-aware merge + // functions in the absence of conflict-free datatypes. We + // could choose to keep a manifest of (table, key, + // timestamp) entries in the target database to filter + // repeated updates. 
+ // https://github.com/cockroachdb/cdc-sink/issues/565 + if err := unstageTX.Commit(ctx); err != nil { + return errors.Wrap(err, "could not commit unstaging transaction; "+ + "mutations may be reapplied") + } + + // If we're going to continue around, update the + // consistent point with partial progress. This ensures + // that cdc-sink can make progress on large batches, + // even if it's restarted. + cursor = nextCursor + rs = rs.NewProgress(cursor) + } else { + // We read no data, which means that we're done. We'll + // let the unstaging transaction be rolled back by the + // defer above since it performed no actual work in the + // database. + rs, err = rs.NewCommitted() + if err != nil { + return err + } + // Mark the timestamp has being processed. + if err := r.Record(ctx, rs.CommittedTime); err != nil { + return err + } + } + + // Save the consistent point to be able to resume if the + // cdc-sink process is stopped. + if err := events.SetConsistentPoint(ctx, rs); err != nil { + return err + } return nil }); err != nil { - return err + return err + } } - // Final flush cycle to commit the final stamp. - if err := flush(toApply, true /* final */); err != nil { - return err - } - total += flushCounter log.WithFields(log.Fields{ "committed": rs.CommittedTime, "count": total, diff --git a/internal/source/server/integration_test.go b/internal/source/server/integration_test.go index 488738748..eccc5096e 100644 --- a/internal/source/server/integration_test.go +++ b/internal/source/server/integration_test.go @@ -32,7 +32,6 @@ import ( "github.com/cockroachdb/cdc-sink/internal/source/logical" jwtAuth "github.com/cockroachdb/cdc-sink/internal/staging/auth/jwt" "github.com/cockroachdb/cdc-sink/internal/util/diag" - "github.com/cockroachdb/cdc-sink/internal/util/hlc" "github.com/cockroachdb/cdc-sink/internal/util/ident" "github.com/cockroachdb/cdc-sink/internal/util/stdlogical" "github.com/cockroachdb/cdc-sink/internal/util/stopper" @@ -312,45 +311,6 @@ func testIntegration(t *testing.T, cfg testConfig) { time.Sleep(100 * time.Millisecond) } - if !cfg.immediate { - t.Run("inspect_staged_data", func(t *testing.T) { - a := assert.New(t) - r := require.New(t) - - stager, err := targetFixture.Stagers.Get(ctx, target) - r.NoError(err) - - // Just load all staged data. - muts, err := stager.Select(ctx, targetFixture.StagingPool, - hlc.Zero(), hlc.New(time.Now().UnixNano(), 0)) - r.NoError(err) - r.Len(muts, 3) // Two inserts and one update. - - // Classify the mutation's before value as empty/null or - // containing an object. - var objectCount, nullCount int - for _, mut := range muts { - before := string(mut.Before) - if len(before) == 0 || before == "null" { - nullCount++ - } else if len(before) > 0 && before[0] == '{' { - objectCount++ - } else { - r.Fail("unexpected before value", before) - } - } - - if cfg.diff { - a.Equal(1, objectCount) // One update. - a.Equal(2, nullCount) // Two inserts. - } else { - // We don't expect to see any diff data. 
- a.Equal(0, objectCount) - a.Equal(3, nullCount) - } - }) - } - metrics, err := prometheus.DefaultGatherer.Gather() a.NoError(err) log.WithField("metrics", metrics).Trace() diff --git a/internal/staging/stage/factory.go b/internal/staging/stage/factory.go index f93de0815..e35843a50 100644 --- a/internal/staging/stage/factory.go +++ b/internal/staging/stage/factory.go @@ -18,18 +18,14 @@ package stage import ( "context" - "fmt" - "strings" + "encoding/json" "sync" - "time" "github.com/cockroachdb/cdc-sink/internal/types" "github.com/cockroachdb/cdc-sink/internal/util/hlc" "github.com/cockroachdb/cdc-sink/internal/util/ident" "github.com/cockroachdb/cdc-sink/internal/util/stopper" - "github.com/jackc/pgx/v5" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" ) type factory struct { @@ -74,216 +70,75 @@ func (f *factory) getUnlocked(table ident.Table) *stage { return f.mu.instances.GetZero(table) } -// SelectMany implements types.Stagers. -func (f *factory) SelectMany( +// Unstage implements types.Stagers. +func (f *factory) Unstage( ctx context.Context, tx types.StagingQuerier, - q *types.SelectManyCursor, - fn types.SelectManyCallback, -) error { - if q.Limit == 0 { - return errors.New("limit must be set") + cursor *types.UnstageCursor, + fn types.UnstageCallback, +) (*types.UnstageCursor, bool, error) { + // Duplicate the cursor so callers can choose to advance. + cursor = cursor.Copy() + + data := &templateData{ + Cursor: cursor, + StagingSchema: f.stagingDB.Schema(), } - // Ensure offset >= start. - if hlc.Compare(q.OffsetTime, q.Start) < 0 { - q.OffsetTime = q.Start + q, err := data.Eval() + if err != nil { + return nil, false, err } - - // Each table is assigned a numeric id to simplify the queries. - tablesToIds := &ident.TableMap[int]{} - idsToTables := make(map[int]ident.Table) - var orderedTables []ident.Table - for _, tables := range q.Targets { - orderedTables = append(orderedTables, tables...) - for _, table := range tables { - id := tablesToIds.Len() - if _, duplicate := tablesToIds.Get(table); duplicate { - return errors.Errorf("duplicate table name: %s", table) - } - tablesToIds.Put(table, id) - idsToTables[id] = table - } + keyOffsets := make([]string, len(cursor.Targets)) + for idx, tbl := range cursor.Targets { + keyOffsets[idx] = string(cursor.StartAfterKey.GetZero(tbl)) } - - // Define the rows variable here, so we don't leak it from the - // variety of error code-paths below. - var rows pgx.Rows - defer func() { - if rows != nil { - rows.Close() - } - }() - - for { - start := time.Now() - - offsetTableIdx := -1 - if !q.OffsetTable.Empty() { - offsetTableIdx = tablesToIds.GetZero(q.OffsetTable) - } - - // This is a union-all construct: - // WITH data AS ( - // (SELECT ... FROM staging_table) - // UNION ALL - // (SELECT ... FROM another_staging_table) - // ) - // SELECT ... FROM data .... - // - // The generated SQL does vary based on the offset table value, - // so we can't easily reuse it across iterations. - var sb strings.Builder - sb.WriteString(`WITH -args AS (SELECT $1::INT, $2::INT, $3::INT, $4::INT, $5::INT, $6:::INT, $7::INT, $8::STRING), -data AS (`) - needsUnion := false - for _, table := range orderedTables { - id := tablesToIds.GetZero(table) - // If we're backfilling, the dominant ordering term is the - // table. Any tables that are before the starting table can - // simply be elided from the query. 
- if q.Backfill && id < offsetTableIdx { - continue - } - if needsUnion { - sb.WriteString(" UNION ALL ") - } else { - needsUnion = true - } - - // Mangle the real table name into its staging table name. - destination := stagingTable(f.stagingDB, table) - - // This ORDER BY should be driven by the index scan, but we - // don't want to be subject to any cross-range scan merges. - // The WHERE clauses in the sub-queries should line up with - // the ORDER BY in the top-level that we use for pagination. - if q.Backfill { - if id == offsetTableIdx { - // We're resuming in the middle of reading a table. - _, _ = fmt.Fprintf(&sb, - "(SELECT %[1]d AS t, nanos, logical, key, mut, before "+ - "FROM %[2]s "+ - "WHERE (nanos, logical, key) > ($6, $7, $8) "+ - "AND (nanos, logical) <= ($3, $4) "+ - "ORDER BY nanos, logical, key "+ - "LIMIT $5)", - id, destination) - } else { - // id must be > offsetTableIdx, since we filter out - // lesser values above. This means that we haven't - // yet read any values from this table, so we want - // to start at the beginning time. - _, _ = fmt.Fprintf(&sb, - "(SELECT %[1]d AS t, nanos, logical, key, mut, before "+ - "FROM %[2]s "+ - "WHERE (nanos, logical) >= ($1, $2) "+ - "AND (nanos, logical) <= ($3, $4) "+ - "ORDER BY nanos, logical, key "+ - "LIMIT $5)", - id, destination) - } - } else { - // The differences in these cases have to do with the - // comparison operators around (nanos,logical) to pick the - // correct starting point for the scan. - if id == offsetTableIdx { - _, _ = fmt.Fprintf(&sb, - "(SELECT %[1]d AS t, nanos, logical, key, mut, before "+ - "FROM %[2]s "+ - "WHERE ( (nanos, logical, key) > ($6, $7, $8) ) "+ - "AND (nanos, logical) <= ($3, $4) "+ - "ORDER BY nanos, logical, key "+ - "LIMIT $5)", - id, destination) - } else if id > offsetTableIdx { - _, _ = fmt.Fprintf(&sb, - "(SELECT %[1]d AS t, nanos, logical, key, mut, before "+ - "FROM %[2]s "+ - "WHERE (nanos, logical) >= ($6, $7) "+ - "AND (nanos, logical) <= ($3, $4) "+ - "ORDER BY nanos, logical, key "+ - "LIMIT $5)", - id, destination) - } else { - _, _ = fmt.Fprintf(&sb, - "(SELECT %[1]d AS t, nanos, logical, key, mut, before "+ - "FROM %[2]s "+ - "WHERE (nanos, logical) > ($6, $7) "+ - "AND (nanos, logical) <= ($3, $4) "+ - "ORDER BY nanos, logical, key "+ - "LIMIT $5)", - id, destination) - } - } + rows, err := tx.Query(ctx, q, + cursor.StartAt.Nanos(), + cursor.StartAt.Logical(), + cursor.EndBefore.Nanos(), + cursor.EndBefore.Logical(), + keyOffsets) + if err != nil { + return nil, false, errors.Wrap(err, q) + } + defer rows.Close() + + hadRows := false + // We want to reset StartAfterKey whenever we read into a new region + // of timestamps. + epoch := cursor.StartAt + for rows.Next() { + var mut types.Mutation + var tableIdx int + var nanos int64 + var logical int + if err := rows.Scan(&tableIdx, &nanos, &logical, &mut.Key, &mut.Data, &mut.Before); err != nil { + return nil, false, errors.WithStack(err) } - sb.WriteString(`) SELECT t, nanos, logical, key, mut, before FROM data `) - if q.Backfill { - sb.WriteString(`ORDER BY t, nanos, logical, key LIMIT $5`) - } else { - sb.WriteString(`ORDER BY nanos, logical, t, key LIMIT $5`) + mut.Before, err = maybeGunzip(mut.Before) + if err != nil { + return nil, false, err } - - var err error - // Rows closed in defer statement above. 
- rows, err = tx.Query(ctx, sb.String(), - q.Start.Nanos(), - q.Start.Logical(), - q.End.Nanos(), - q.End.Logical(), - q.Limit, - q.OffsetTime.Nanos(), - q.OffsetTime.Logical(), - string(q.OffsetKey), - ) + mut.Data, err = maybeGunzip(mut.Data) if err != nil { - return errors.Wrap(err, sb.String()) + return nil, false, err } - - count := 0 - var lastMut types.Mutation - var lastTable ident.Table - for rows.Next() { - count++ - var mut types.Mutation - var tableIdx int - var nanos int64 - var logical int - if err := rows.Scan(&tableIdx, &nanos, &logical, &mut.Key, &mut.Data, &mut.Before); err != nil { - return errors.WithStack(err) - } - mut.Before, err = maybeGunzip(mut.Before) - if err != nil { - return err - } - mut.Data, err = maybeGunzip(mut.Data) - if err != nil { - return err - } - mut.Time = hlc.New(nanos, logical) - - lastMut = mut - lastTable = idsToTables[tableIdx] - if err := fn(ctx, lastTable, mut); err != nil { - return err - } + mut.Time = hlc.New(nanos, logical) + if hlc.Compare(mut.Time, epoch) > 0 { + epoch = mut.Time + cursor.StartAt = epoch + cursor.StartAfterKey = ident.TableMap[json.RawMessage]{} } + cursor.StartAfterKey.Put(cursor.Targets[tableIdx], mut.Key) + hadRows = true - // Only update the cursor if we've successfully read the entire page. - q.OffsetKey = lastMut.Key - q.OffsetTime = lastMut.Time - q.OffsetTable = lastTable - - log.WithFields(log.Fields{ - "count": count, - "duration": time.Since(start), - "targets": q.Targets, - }).Debug("retrieved staged mutations") - - // We can stop once we've received less than a max-sized window - // of mutations. Otherwise, loop around. - if count < q.Limit { - return nil + if err := fn(ctx, cursor.Targets[tableIdx], mut); err != nil { + return nil, false, err } } + if err := rows.Err(); err != nil { + return nil, false, errors.WithStack(err) + } + + return cursor, hadRows, nil } diff --git a/internal/staging/stage/metrics.go b/internal/staging/stage/metrics.go index f4e6cc11a..1d578b86f 100644 --- a/internal/staging/stage/metrics.go +++ b/internal/staging/stage/metrics.go @@ -32,7 +32,6 @@ var ( Name: "stage_retire_errors_total", Help: "the number of times an error was encountered while retiring mutations", }, metrics.TableLabels) - stageSelectCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "stage_select_mutations_total", Help: "the number of mutations read for this table", @@ -46,7 +45,10 @@ var ( Name: "stage_select_errors_total", Help: "the number of times an error was encountered while selecting mutations", }, metrics.TableLabels) - + stageStaleMutations = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "stage_stale_mutations_count", + Help: "the number of un-applied staged mutations left after retiring applied mutations", + }, metrics.TableLabels) stageStoreCount = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "stage_store_mutations_total", Help: "the number of mutations stored for this table", diff --git a/internal/staging/stage/queries/unstage.tmpl b/internal/staging/stage/queries/unstage.tmpl new file mode 100644 index 000000000..4b5b169a9 --- /dev/null +++ b/internal/staging/stage/queries/unstage.tmpl @@ -0,0 +1,107 @@ +{{- /*gotype: github.com/cockroachdb/cdc-sink/internal/staging/stage.templateData*/ -}} +{{- $top := . -}} +WITH {{- sp -}} + +{{- /* +Select the minimum unapplied timestamp(s) from the source table. + +We start scanning at some timestamp, but after a starting key, which +could be a zero-length string. 
+
+hlc_0 AS (
+ SELECT nanos, logical FROM staging_table
+ WHERE (nanos, logical, key) > (start_at_nanos, start_at_logical, start_after_key)
+ AND (nanos, logical) < (end_before_nanos, end_before_logical)
+ AND NOT APPLIED
+ GROUP BY nanos, logical -- We want multi-column distinct behavior
+ ORDER BY nanos, logical
+ LIMIT N
+)
+
+*/ -}}
+{{- range $idx, $tgt := .Cursor.Targets -}}
+{{- if $idx -}}, {{- nl -}}{{- end -}}
+hlc_{{ $idx }} (n, l) AS (
+SELECT nanos, logical
+FROM {{ $top.StagingTable $tgt }}
+WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[ {{- add $idx 1 -}} ])
+AND (nanos, logical) < ($3, $4)
+AND NOT applied
+GROUP BY nanos, logical
+ORDER BY nanos, logical
+LIMIT {{ or $top.Cursor.TimestampLimit 1 }}
+)
+{{- end -}}
+
+{{- /*
+Select the minimum timestamp(s) across all source tables.
+
+hlc_all AS (
+ SELECT n, l FROM hlc_0 UNION ALL
+ SELECT n, l FROM hlc_1 UNION ALL ...
+ SELECT n, l FROM hlc_N
+),
+hlc_min AS (SELECT n, l FROM hlc_all GROUP BY n, l ORDER BY n, l LIMIT N)
+
+*/ -}}
+, {{- nl -}}
+hlc_all AS (
+{{- range $idx, $tgt := .Cursor.Targets -}}
+{{- if $idx }} UNION ALL {{- nl -}}{{- end -}}
+SELECT n, l FROM hlc_{{ $idx }}
+{{- end -}}
+),
+hlc_min AS (SELECT n, l FROM hlc_all GROUP BY n, l ORDER BY n, l LIMIT {{ or .Cursor.TimestampLimit 1 }})
+
+{{- /*
+Set the applied column and return the data.
+
+We want to update any non-applied mutations after the starting key
+and within the HLC timestamp that we're operating on.
+
+data_0 AS (
+ UPDATE staging_table SET applied=true
+ FROM hlc_min
+ WHERE (nanos, logical) IN (hlc_min)
+ AND (nanos, logical, key) > (start_at_nanos, start_at_logical, start_after_key)
+ AND NOT APPLIED
+ [ ORDER BY nanos, logical, key
+ LIMIT n ]
+ RETURNING nanos, logical, key, mut, before
+)
+*/ -}}
+{{- range $idx, $tgt := .Cursor.Targets -}}
+, {{- nl -}}
+data_{{ $idx }} AS (
+UPDATE {{ $top.StagingTable $tgt }}
+SET applied=true
+FROM hlc_min
+WHERE (nanos,logical) = (n, l)
+AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[ {{- add $idx 1 -}} ])
+AND NOT applied
+{{- if $top.Cursor.UpdateLimit -}} {{- nl -}}
+ORDER BY nanos, logical, key
+LIMIT {{ $top.Cursor.UpdateLimit }}
+{{- end -}}
+{{- nl -}} RETURNING nanos, logical, key, mut, before)
+{{- end -}}
+{{- nl -}}
+
+{{- /*
+Top-level query aggregates the updates in table order.
+
+SELECT 0 idx, * FROM data_0 UNION ALL
+SELECT 1 idx, * FROM data_1 UNION ALL ...
+SELECT N idx, * FROM data_N +ORDER BY nanos, logical, idx, key + +*/ -}} +SELECT * FROM ( {{- nl -}} +{{- range $idx, $tgt := .Cursor.Targets -}} +{{- if $idx }} UNION ALL {{- nl -}}{{- end -}} +SELECT {{ $idx }} idx, nanos, logical, key, mut, before FROM data_{{ $idx }} +{{- end -}} +) {{- nl -}} +ORDER BY nanos, logical, idx, key + +{{- /* Consume whitespace */ -}} diff --git a/internal/staging/stage/stage.go b/internal/staging/stage/stage.go index f3592a9ba..948e0e924 100644 --- a/internal/staging/stage/stage.go +++ b/internal/staging/stage/stage.go @@ -31,6 +31,8 @@ import ( "github.com/cockroachdb/cdc-sink/internal/util/hlc" "github.com/cockroachdb/cdc-sink/internal/util/ident" "github.com/cockroachdb/cdc-sink/internal/util/metrics" + "github.com/cockroachdb/cdc-sink/internal/util/msort" + "github.com/cockroachdb/cdc-sink/internal/util/notify" "github.com/cockroachdb/cdc-sink/internal/util/retry" "github.com/cockroachdb/cdc-sink/internal/util/stopper" "github.com/jackc/pgx/v5" @@ -54,29 +56,42 @@ func stagingTable(stagingDB ident.Schema, target ident.Table) ident.Table { type stage struct { // The staging table that holds the mutations. stage ident.Table - retireFrom hlc.Time // Makes subsequent calls to Retire() a bit faster. + retireFrom notify.Var[hlc.Time] // Makes subsequent calls to Retire() a bit faster. retireDuration prometheus.Observer retireError prometheus.Counter selectCount prometheus.Counter selectDuration prometheus.Observer selectError prometheus.Counter + staleCount prometheus.Gauge storeCount prometheus.Counter storeDuration prometheus.Observer storeError prometheus.Counter // Compute SQL fragments exactly once on startup. sql struct { - nextAfter string // Find a timestamp for which data is available. - retire string // Delete a batch of staged mutations - store string // store mutations - sel string // select all rows in the timeframe from the staging table - selPartial string // select limited number of rows from the staging table + retire string // Delete a batch of staged mutations. + store string // Store mutations. + unapplied string // Count stale, unapplied mutations. } } var _ types.Stager = (*stage)(nil) +const tableSchema = ` +CREATE TABLE IF NOT EXISTS %[1]s ( + nanos INT NOT NULL, + logical INT NOT NULL, + key STRING NOT NULL, + mut BYTES NOT NULL, + before BYTES NULL, + applied BOOL NOT NULL DEFAULT false, + %[2]s + PRIMARY KEY (nanos, logical, key), + FAMILY cold (mut, before), + FAMILY hot (applied) +)` + // newStore constructs a new mutation stage that will track pending // mutations to be applied to the given target table. func newStore( @@ -84,16 +99,21 @@ func newStore( ) (*stage, error) { table := stagingTable(stagingDB, target) - if err := retry.Execute(ctx, db, fmt.Sprintf(` -CREATE TABLE IF NOT EXISTS %s ( - nanos INT NOT NULL, - logical INT NOT NULL, - key STRING NOT NULL, - mut BYTES NOT NULL, - before BYTES NULL, - PRIMARY KEY (nanos, logical, key) -)`, table)); err != nil { - return nil, errors.WithStack(err) + // Try to create the staging table with a helper virtual column. We + // never query for it, so it should have essentially no cost. + if err := retry.Execute(ctx, db, fmt.Sprintf(tableSchema, table, + `source_time TIMESTAMPTZ AS (to_timestamp(nanos::float/1e9)) VIRTUAL,`)); err != nil { + + // Old versions of CRDB don't know about to_timestamp(). Try + // again without the helper column. 
+ if pgErr := (*pgconn.PgError)(nil); errors.As(err, &pgErr) { + if pgErr.Code == "42883" /* unknown function */ { + err = retry.Execute(ctx, db, fmt.Sprintf(tableSchema, table, "")) + } + } + if err != nil { + return nil, errors.WithStack(err) + } } // Transparently upgrade older staging tables. This avoids needing @@ -112,179 +132,65 @@ ALTER TABLE %[1]s ADD COLUMN IF NOT EXISTS before BYTES NULL selectCount: stageSelectCount.WithLabelValues(labels...), selectDuration: stageSelectDurations.WithLabelValues(labels...), selectError: stageSelectErrors.WithLabelValues(labels...), + staleCount: stageStaleMutations.WithLabelValues(labels...), storeCount: stageStoreCount.WithLabelValues(labels...), storeDuration: stageStoreDurations.WithLabelValues(labels...), storeError: stageStoreErrors.WithLabelValues(labels...), } - s.sql.nextAfter = fmt.Sprintf(nextAfterTemplate, table) s.sql.retire = fmt.Sprintf(retireTemplate, table) - s.sql.sel = fmt.Sprintf(selectTemplateAll, table) - s.sql.selPartial = fmt.Sprintf(selectTemplatePartial, table) s.sql.store = fmt.Sprintf(putTemplate, table) + s.sql.unapplied = fmt.Sprintf(countTemplate, table) - return s, nil -} - -// GetTable returns the table that the stage is storing into. -func (s *stage) GetTable() ident.Table { return s.stage } - -const nextAfterTemplate = ` -SELECT DISTINCT nanos, logical - FROM %[1]s -WHERE (nanos, logical) > ($1, $2) AND (nanos, logical) <= ($3, $4) -ORDER BY nanos, logical -` + // Report unapplied mutations on a periodic basis. + ctx.Go(func() error { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() -// TransactionTimes implements types.Stager and returns timestamps for -// which data is available in the (before, after] range. -func (s *stage) TransactionTimes( - ctx context.Context, tx types.StagingQuerier, before, after hlc.Time, -) ([]hlc.Time, error) { - var ret []hlc.Time - err := retry.Retry(ctx, func(ctx context.Context) error { - ret = ret[:0] // Reset if retrying. - rows, err := tx.Query(ctx, - s.sql.nextAfter, - before.Nanos(), - before.Logical(), - after.Nanos(), - after.Logical(), - ) - if err != nil { - return errors.WithStack(err) - } + for from, changed := s.retireFrom.Get(); ; { + ct, err := s.CountUnapplied(ctx, db, from) + if err != nil { + log.WithError(err).Warnf("could not count unapplied mutations for target: %s", target) + } else { + s.staleCount.Set(float64(ct)) + } - for rows.Next() { - var nanos int64 - var logical int - if err := rows.Scan(&nanos, &logical); err != nil { - return errors.WithStack(err) + select { + case <-ctx.Stopping(): + return nil + case <-changed: + from, changed = s.retireFrom.Get() + case <-ticker.C: + // Ensure that values get reset if this instance of + // cdc-sink isn't the one that's actively resolving or + // retiring mutations. } - ret = append(ret, hlc.New(nanos, logical)) } - return errors.WithStack(rows.Err()) }) - return ret, err -} - -// ($1, $2) starting resolved timestamp -// ($3, $4) ending resolved timestamp -// -// This query drains all mutations between the given timestamps, -// returning the latest timestamped value for any given key. 
-const selectTemplateAll = ` -SELECT key, nanos, logical, mut, before - FROM %[1]s - WHERE (nanos, logical) BETWEEN ($1, $2) AND ($3, $4) - ORDER BY nanos, logical` - -// ($1, $2) starting resolved timestamp -// ($3, $4) ending resolved timestamp -// $5 starting key to skip -// $6 limit -// -// For the kind of very large datasets that we see in a backfill -// scenario, it's not feasible to deduplicate updates for a given key -// with in the time range. We would have to scan forward to all -// timestamps within the given timestamp range to see if there's a key -// value to be had. An index over (key, nanos, time) doesn't help, -// either for key-based pagination, since we now have to read through -// all keys to identify those with a mutation in the desired window. -const selectTemplatePartial = ` -SELECT key, nanos, logical, mut, before - FROM %[1]s - WHERE (nanos, logical, key) > ($1, $2, $5) - AND (nanos, logical) <= ($3, $4) - ORDER BY nanos, logical, key - LIMIT $6` - -// Select implements types.Stager. -func (s *stage) Select( - ctx context.Context, tx types.StagingQuerier, prev, next hlc.Time, -) ([]types.Mutation, error) { - return s.SelectPartial(ctx, tx, prev, next, nil, -1) + return s, nil } -// SelectPartial implements types.Stager. -func (s *stage) SelectPartial( - ctx context.Context, tx types.StagingQuerier, prev, next hlc.Time, afterKey []byte, limit int, -) ([]types.Mutation, error) { - if hlc.Compare(prev, next) > 0 { - return nil, errors.Errorf("timestamps out of order: %s > %s", prev, next) - } - - start := time.Now() - var ret []types.Mutation - if limit > 0 { - ret = make([]types.Mutation, 0, limit) - } +const countTemplate = ` +SELECT count(*) FROM %s +WHERE (nanos, logical) < ($1, $2) AND NOT applied +` +// CountUnapplied returns the number of dangling mutations that likely +// indicate an error condition. +func (s *stage) CountUnapplied( + ctx context.Context, db types.StagingQuerier, before hlc.Time, +) (int, error) { + var ret int err := retry.Retry(ctx, func(ctx context.Context) error { - ret = ret[:0] - var rows pgx.Rows - var err error - if limit <= 0 { - rows, err = tx.Query(ctx, s.sql.sel, - prev.Nanos(), prev.Logical(), next.Nanos(), next.Logical()) - } else { - rows, err = tx.Query(ctx, s.sql.selPartial, - prev.Nanos(), prev.Logical(), next.Nanos(), next.Logical(), string(afterKey), limit) - } - if err != nil { - return err - } - defer rows.Close() - - for rows.Next() { - var mut types.Mutation - var nanos int64 - var logical int - if err := rows.Scan(&mut.Key, &nanos, &logical, &mut.Data, &mut.Before); err != nil { - return err - } - mut.Data, err = maybeGunzip(mut.Data) - if err != nil { - return err - } - mut.Time = hlc.New(nanos, logical) - ret = append(ret, mut) - } - return nil + return db.QueryRow(ctx, s.sql.unapplied, before.Nanos(), before.Logical()).Scan(&ret) }) - - if err != nil { - if pgErr := (*pgconn.PgError)(nil); errors.As(err, &pgErr) { - // Staging table not found. Most likely cause is a reference - // table which was restored via backup for which we haven't - // seen any updates. - if pgErr.Code == "42P01" { - return nil, nil - } - } - s.selectError.Inc() - return nil, errors.Wrapf(err, "select %s [%s, %s]", s.stage, prev, next) - } - - // Don't bother recording stats about no-op selects. 
- if len(ret) == 0 { - return nil, nil - } - - d := time.Since(start) - s.selectDuration.Observe(d.Seconds()) - s.selectCount.Add(float64(len(ret))) - log.WithFields(log.Fields{ - "count": len(ret), - "duration": d, - "next": next, - "prev": prev, - "target": s.stage, - }).Debug("select mutations") - return ret, nil + return ret, errors.Wrap(err, s.sql.unapplied) } +// GetTable returns the table that the stage is storing into. +func (s *stage) GetTable() ident.Table { return s.stage } + // The byte-array casts on $4 and $5 are because arrays of JSONB aren't implemented: // https://github.com/cockroachdb/cockroach/issues/23468 const putTemplate = ` @@ -297,6 +203,8 @@ func (s *stage) Store( ) error { start := time.Now() + mutations = msort.UniqueByTimeKey(mutations) + // If we're working with a pool, and not a transaction, we'll stage // the data in a concurrent manner. var err error @@ -388,7 +296,7 @@ func (s *stage) putOne( const retireTemplate = ` WITH d AS ( DELETE FROM %s - WHERE (nanos, logical) BETWEEN ($1, $2) AND ($3, $4) + WHERE (nanos, logical) BETWEEN ($1, $2) AND ($3, $4) AND applied ORDER BY nanos, logical LIMIT $5 RETURNING nanos, logical) @@ -400,12 +308,13 @@ SELECT last_value(nanos) OVER (), last_value(logical) OVER () func (s *stage) Retire(ctx context.Context, db types.StagingQuerier, end hlc.Time) error { start := time.Now() err := retry.Retry(ctx, func(ctx context.Context) error { - for hlc.Compare(s.retireFrom, end) < 0 { + from, _ := s.retireFrom.Get() + for hlc.Compare(from, end) < 0 { var lastNanos int64 var lastLogical int err := db.QueryRow(ctx, s.sql.retire, - s.retireFrom.Nanos(), - s.retireFrom.Logical(), + from.Nanos(), + from.Logical(), end.Nanos(), end.Logical(), 10000, // Make configurable? @@ -417,12 +326,13 @@ func (s *stage) Retire(ctx context.Context, db types.StagingQuerier, end hlc.Tim if err != nil { return errors.WithStack(err) } - s.retireFrom = hlc.New(lastNanos, lastLogical) + from = hlc.New(lastNanos, lastLogical) } // If there was nothing to delete, still advance the marker. - if hlc.Compare(s.retireFrom, end) < 0 { - s.retireFrom = end + if hlc.Compare(from, end) < 0 { + from = end } + s.retireFrom.Set(from) return nil }) if err == nil { diff --git a/internal/staging/stage/stage_test.go b/internal/staging/stage/stage_test.go index 4160c354f..e6ef93969 100644 --- a/internal/staging/stage/stage_test.go +++ b/internal/staging/stage/stage_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/types" "github.com/cockroachdb/cdc-sink/internal/util/hlc" "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/jackc/pgx/v5" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -39,11 +40,10 @@ import ( // TestPutAndDrain will insert and dequeue a batch of Mutations. func TestPutAndDrain(t *testing.T) { a := assert.New(t) + r := require.New(t) fixture, cancel, err := all.NewFixture() - if !a.NoError(err) { - return - } + r.NoError(err) defer cancel() ctx := fixture.Context @@ -54,13 +54,16 @@ func TestPutAndDrain(t *testing.T) { dummyTarget := ident.NewTable(targetDB, ident.New("target")) s, err := fixture.Stagers.Get(ctx, dummyTarget) - if !a.NoError(err) { - return - } + r.NoError(err) a.NotNil(s) + // Not part of public API, but we want to test the metrics function. 
+ ctr := s.(interface { + CountUnapplied(ctx context.Context, db types.StagingQuerier, before hlc.Time) (int, error) + }) + jumbledStager, err := fixture.Stagers.Get(ctx, sinktest.JumbleTable(dummyTarget)) - a.NoError(err) + r.NoError(err) a.Same(s, jumbledStager) // Steal implementation details to cross-check DB state. @@ -80,134 +83,65 @@ func TestPutAndDrain(t *testing.T) { muts[i].Before = []byte("before") } } - maxTime := muts[len(muts)-1].Time - - // Check TransactionTimes in empty state. - found, err := s.TransactionTimes(ctx, pool, hlc.Zero(), maxTime) - a.Empty(found) - a.NoError(err) // Insert. - a.NoError(s.Store(ctx, pool, muts)) - - // We should find all timestamps now. - found, err = s.TransactionTimes(ctx, pool, hlc.Zero(), maxTime) - if a.Len(found, len(muts)) { - a.Equal(muts[0].Time, found[0]) - a.Equal(maxTime, found[len(found)-1]) - } - a.NoError(err) - - // Advance once. - found, err = s.TransactionTimes(ctx, pool, muts[0].Time, maxTime) - if a.Len(found, len(muts)-1) { - a.Equal(muts[1].Time, found[0]) - a.Equal(maxTime, found[len(found)-1]) - } - a.NoError(err) - - // Make sure we don't find the last value. - found, err = s.TransactionTimes(ctx, pool, maxTime, maxTime) - a.Empty(found) - a.NoError(err) + r.NoError(s.Store(ctx, pool, muts)) // Sanity-check table. count, err := base.GetRowCount(ctx, pool, stagingTable) - a.NoError(err) + r.NoError(err) a.Equal(total, count) // Ensure that data insertion is idempotent. - a.NoError(s.Store(ctx, pool, muts)) + r.NoError(s.Store(ctx, pool, muts)) // Sanity-check table. count, err = base.GetRowCount(ctx, pool, stagingTable) - a.NoError(err) + r.NoError(err) a.Equal(total, count) - // Insert an older value for each key, we'll check that only the - // latest values are returned below. - older := make([]types.Mutation, total) - copy(older, muts) - for i := range older { - older[i].Data = []byte(`"should not see this"`) - older[i].Time = hlc.New(older[i].Time.Nanos()-1, i) - } - a.NoError(s.Store(ctx, pool, older)) + // Verify metrics query. + count, err = ctr.CountUnapplied(ctx, pool, hlc.New(math.MaxInt64, 0)) + r.NoError(err) + a.Equal(total, count) - // Sanity-check table. - count, err = base.GetRowCount(ctx, pool, stagingTable) - a.NoError(err) - a.Equal(2*total, count) - - // The two queries that we're going to run will see slightly - // different views of the data. The all-data query will provide - // deduplicated data, ordered by the target key. The incremental - // query will see all values interleaved, which allows us to - // page within a (potentially large) backfill window. - dedupOrder := make([]types.Mutation, total) - copy(dedupOrder, muts) - sort.Slice(dedupOrder, func(i, j int) bool { - return bytes.Compare(dedupOrder[i].Key, dedupOrder[j].Key) < 0 - }) - mergedOrder := make([]types.Mutation, 2*total) - for i := range muts { - mergedOrder[2*i] = older[i] - mergedOrder[2*i+1] = muts[i] + // Select all mutations to set the applied flag. This allows the + // mutations to be deleted. + cursor := &types.UnstageCursor{ + EndBefore: hlc.New(math.MaxInt64, 0), + Targets: []ident.Table{dummyTarget}, } - - // Test retrieving all data. - ret, err := s.Select(ctx, pool, hlc.Zero(), hlc.New(int64(1000*total+1), 0)) - a.NoError(err) - a.Equal(mergedOrder, ret) - - // Retrieve a few pages of partial values, validate expected boundaries. 
- const limit = 10 - var tail types.Mutation - for i := 0; i < 10; i++ { - ret, err = s.SelectPartial(ctx, - pool, - tail.Time, - muts[len(muts)-1].Time, - tail.Key, - limit, - ) - a.NoError(err) - a.Equalf(mergedOrder[i*limit:(i+1)*limit], ret, "at idx %d", i) - tail = ret[len(ret)-1] + unstagedCount := 0 + for unstaging := true; unstaging; { + cursor, unstaging, err = fixture.Stagers.Unstage(ctx, pool, cursor, + func(context.Context, ident.Table, types.Mutation) error { + unstagedCount++ + return nil + }) + r.NoError(err) } - a.NoError(s.Retire(ctx, pool, muts[limit-1].Time)) - - // Verify that reading from the end returns no results. - ret, err = s.SelectPartial(ctx, - pool, - muts[len(muts)-1].Time, - hlc.New(math.MaxInt64, 0), - muts[len(muts)-1].Key, - limit, - ) - a.NoError(err) - a.Empty(ret) - - // Check deletion. We have two timestamps for each - count, err = base.GetRowCount(ctx, pool, stagingTable) - a.NoError(err) - a.Equal(2*total-2*limit, count) + a.Equal(total, unstagedCount) - // Dequeue remainder. - a.NoError(s.Retire(ctx, pool, muts[len(muts)-1].Time)) + // Verify metrics query. + count, err = ctr.CountUnapplied(ctx, pool, hlc.New(math.MaxInt64, 0)) + r.NoError(err) + a.Zero(count) + + // Retire mutations. + r.NoError(s.Retire(ctx, pool, muts[len(muts)-1].Time)) // Should be empty now. count, err = base.GetRowCount(ctx, pool, stagingTable) - a.NoError(err) + r.NoError(err) a.Equal(0, count) // Verify various no-op calls are OK. - a.NoError(s.Retire(ctx, pool, hlc.Zero())) - a.NoError(s.Retire(ctx, pool, muts[len(muts)-1].Time)) - a.NoError(s.Retire(ctx, pool, hlc.New(muts[len(muts)-1].Time.Nanos()+1, 0))) + r.NoError(s.Retire(ctx, pool, hlc.Zero())) + r.NoError(s.Retire(ctx, pool, muts[len(muts)-1].Time)) + r.NoError(s.Retire(ctx, pool, hlc.New(muts[len(muts)-1].Time.Nanos()+1, 0))) } -func TestSelectMany(t *testing.T) { +func TestUnstage(t *testing.T) { const entries = 100 const tableCount = 10 a := assert.New(t) @@ -303,24 +237,34 @@ func TestSelectMany(t *testing.T) { a := assert.New(t) r := require.New(t) - q := &types.SelectManyCursor{ - End: hlc.New(100*entries, 0), // Past any existing time. - Limit: entries/2 - 1, // Validate paging - Targets: [][]ident.Table{ - {tables[0]}, - // {tables[1], tables[2]}, - // {tables[3], tables[4], tables[5]}, - // {tables[6], tables[7], tables[8], tables[9]}, - }, + q := &types.UnstageCursor{ + EndBefore: hlc.New(100*entries, 0), // Past any existing time. + Targets: tables, } - entriesByTable := &ident.TableMap[[]types.Mutation]{} - err := fixture.Stagers.SelectMany(ctx, fixture.StagingPool, q, - func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { - entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) - return nil - }) + // Run the select in a discarded transaction to avoid + // contaminating future tests with side effects. + tx, err := fixture.StagingPool.BeginTx(ctx, pgx.TxOptions{}) r.NoError(err) + defer func() { _ = tx.Rollback(ctx) }() + + entriesByTable := &ident.TableMap[[]types.Mutation]{} + numSelections := 0 + for selecting := true; selecting; { + q, selecting, err = fixture.Stagers.Unstage(ctx, tx, q, + func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { + entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) + return nil + }) + r.NoError(err) + numSelections++ + } + // Refer to comment about the distribution of timestamps. 
+ // There are two large batches, then each mutation has two + // unique timestamps, and then there's a final call that returns + // false. + a.Equal(2+2*entries+1, numSelections) + r.NoError(entriesByTable.Range(func(_ ident.Table, seen []types.Mutation) error { if a.Len(seen, len(expectedMutOrder)) { a.Equal(expectedMutOrder, seen) @@ -334,25 +278,32 @@ func TestSelectMany(t *testing.T) { a := assert.New(t) r := require.New(t) - q := &types.SelectManyCursor{ - Start: hlc.New(2, 0), // Skip the initial transaction - End: hlc.New(10*entries, 0), // Read the second large batch - Limit: entries/2 - 1, // Validate paging - Targets: [][]ident.Table{ - {tables[0]}, - {tables[1], tables[2]}, - {tables[3], tables[4], tables[5]}, - {tables[6], tables[7], tables[8], tables[9]}, - }, + q := &types.UnstageCursor{ + StartAt: hlc.New(2, 0), // Skip the initial transaction + EndBefore: hlc.New(10*entries, 1), // Read the second large batch + Targets: tables, } - entriesByTable := &ident.TableMap[[]types.Mutation]{} - err := fixture.Stagers.SelectMany(ctx, fixture.StagingPool, q, - func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { - entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) - return nil - }) + // Run the select in a discarded transaction to avoid + // contaminating future tests with side effects. + tx, err := fixture.StagingPool.BeginTx(ctx, pgx.TxOptions{}) r.NoError(err) + defer func() { _ = tx.Rollback(ctx) }() + + entriesByTable := &ident.TableMap[[]types.Mutation]{} + numSelections := 0 + for selecting := true; selecting; { + q, selecting, err = fixture.Stagers.Unstage(ctx, tx, q, + func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { + entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) + return nil + }) + r.NoError(err) + numSelections++ + } + // We expect to see one large batch, a timestamp for each entry, + // and the final zero-results call. + a.Equal(1+entries+1, numSelections) r.NoError(entriesByTable.Range(func(_ ident.Table, seen []types.Mutation) error { if a.Len(seen, 2*entries) { a.Equal(expectedMutOrder[entries:3*entries], seen) @@ -361,40 +312,36 @@ func TestSelectMany(t *testing.T) { })) }) - // What's different about the backfill case is that we want to - // verify that if we see an update for a table in group G, then we - // already have all data for group G-1. - t.Run("backfill", func(t *testing.T) { + // Read from the staging tables using the limit, to simulate + // very large batches. + t.Run("transactional-incremental", func(t *testing.T) { a := assert.New(t) r := require.New(t) - q := &types.SelectManyCursor{ - Backfill: true, - End: hlc.New(100*entries, 0), // Read the second large batch - Limit: entries/2 - 1, // Validate paging - Targets: [][]ident.Table{ - {tables[0]}, - {tables[1], tables[2]}, - {tables[3], tables[4], tables[5]}, - {tables[6], tables[7], tables[8], tables[9]}, - }, + q := &types.UnstageCursor{ + EndBefore: hlc.New(100*entries, 0), // Past any existing time. + Targets: tables, + UpdateLimit: 20, } - entriesByTable := &ident.TableMap[[]types.Mutation]{} - err := fixture.Stagers.SelectMany(ctx, fixture.StagingPool, q, - func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { - entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) - - // Check that all data for parent groups have been received. 
- if group := tableToGroup.GetZero(tbl); group > 1 { - for _, tableToCheck := range tableGroups[group-1] { - r.Len(entriesByTable.GetZero(tableToCheck), len(muts)) - } - } - return nil - }) + // Run the select in a discarded transaction to avoid + // contaminating future tests with side effects. + tx, err := fixture.StagingPool.BeginTx(ctx, pgx.TxOptions{}) r.NoError(err) + defer func() { _ = tx.Rollback(ctx) }() + entriesByTable := &ident.TableMap[[]types.Mutation]{} + numSelections := 0 + for selecting := true; selecting; { + q, selecting, err = fixture.Stagers.Unstage(ctx, tx, q, + func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { + entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) + return nil + }) + r.NoError(err) + numSelections++ + } + a.Equal(211, numSelections) r.NoError(entriesByTable.Range(func(_ ident.Table, seen []types.Mutation) error { if a.Len(seen, len(expectedMutOrder)) { a.Equal(expectedMutOrder, seen) @@ -402,47 +349,6 @@ func TestSelectMany(t *testing.T) { return nil })) }) - - // Similar to the backfill test, but we read a subset of the data. - t.Run("backfill-bounded", func(t *testing.T) { - a := assert.New(t) - r := require.New(t) - - q := &types.SelectManyCursor{ - Start: hlc.New(2, 0), // Skip the initial transaction - End: hlc.New(10*entries, 0), // Read the second large batch - Backfill: true, - Limit: entries/2 - 1, // Validate paging - Targets: [][]ident.Table{ - {tables[0]}, - {tables[1], tables[2]}, - {tables[3], tables[4], tables[5]}, - {tables[6], tables[7], tables[8], tables[9]}, - }, - } - - entriesByTable := &ident.TableMap[[]types.Mutation]{} - err := fixture.Stagers.SelectMany(ctx, fixture.StagingPool, q, - func(ctx context.Context, tbl ident.Table, mut types.Mutation) error { - entriesByTable.Put(tbl, append(entriesByTable.GetZero(tbl), mut)) - - // Check that all data for parent groups have been received. - if group := tableToGroup.GetZero(tbl); group > 1 { - for _, tableToCheck := range tableGroups[group-1] { - r.Len(entriesByTable.GetZero(tableToCheck), 2*entries) - } - } - return nil - }) - r.NoError(err) - - r.NoError(entriesByTable.Range(func(_ ident.Table, seen []types.Mutation) error { - if a.Len(seen, 2*entries) { - a.Equal(expectedMutOrder[entries:3*entries], seen) - } - return nil - })) - }) } func BenchmarkStage(b *testing.B) { diff --git a/internal/staging/stage/templates.go b/internal/staging/stage/templates.go new file mode 100644 index 000000000..0a891445a --- /dev/null +++ b/internal/staging/stage/templates.go @@ -0,0 +1,59 @@ +// Copyright 2023 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// SPDX-License-Identifier: Apache-2.0 + +package stage + +import ( + "embed" + "strings" + "text/template" + + "github.com/cockroachdb/cdc-sink/internal/types" + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/pkg/errors" +) + +//go:embed queries/*.tmpl +var templateFS embed.FS + +var ( + tmplFuncs = template.FuncMap{ + "add": func(a, b int) int { return a + b }, + "nl": func() string { return "\n" }, + "sp": func() string { return " " }, + } + templates = template.Must( + template.New("").Funcs(tmplFuncs).ParseFS(templateFS, "queries/*.tmpl")) + unstage = templates.Lookup("unstage.tmpl") +) + +type templateData struct { + Cursor *types.UnstageCursor // Required input. + StagingSchema ident.Schema // Required input. +} + +func (d *templateData) Eval() (string, error) { + var sb strings.Builder + err := unstage.Execute(&sb, d) + if err != nil { + return "", errors.WithStack(err) + } + return sb.String(), nil +} + +func (d *templateData) StagingTable(tbl ident.Table) ident.Table { + return stagingTable(d.StagingSchema, tbl) +} diff --git a/internal/staging/stage/templates_test.go b/internal/staging/stage/templates_test.go new file mode 100644 index 000000000..6f8ae0fe7 --- /dev/null +++ b/internal/staging/stage/templates_test.go @@ -0,0 +1,94 @@ +// Copyright 2023 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// 
+// SPDX-License-Identifier: Apache-2.0 
+ 
+package stage 
+ 
+import ( 
+	"fmt" 
+	"os" 
+	"testing" 
+ 
+	"github.com/cockroachdb/cdc-sink/internal/types" 
+	"github.com/cockroachdb/cdc-sink/internal/util/hlc" 
+	"github.com/cockroachdb/cdc-sink/internal/util/ident" 
+	"github.com/stretchr/testify/require" 
+) 
+ 
+const rewriteExpected = false 
+ 
+func TestRewriteIsFalse(t *testing.T) { 
+	require.False(t, rewriteExpected) 
+} 
+ 
+func TestTimeOrderTemplate(t *testing.T) { 
+	r := require.New(t) 
+ 
+	r.NotNil(unstage) 
+ 
+	sch := ident.MustSchema(ident.New("my_db"), ident.Public) 
+	tbl0 := ident.NewTable(sch, ident.New("tbl0")) 
+	tbl1 := ident.NewTable(sch, ident.New("tbl1")) 
+	tbl2 := ident.NewTable(sch, ident.New("tbl2")) 
+	tbl3 := ident.NewTable(sch, ident.New("tbl3")) 
+ 
+	staging := ident.MustSchema(ident.New("_cdc_sink"), ident.Public) 
+ 
+	tcs := []struct { 
+		name   string 
+		cursor *types.UnstageCursor 
+	}{ 
+		{ 
+			name: "unstage", 
+			cursor: &types.UnstageCursor{ 
+				StartAt:   hlc.New(1, 2), 
+				EndBefore: hlc.New(3, 4), 
+				Targets:   []ident.Table{tbl0, tbl1, tbl2, tbl3}, 
+			}, 
+		}, 
+		{ 
+			name: "unstage_limit", 
+			cursor: &types.UnstageCursor{ 
+				StartAt:        hlc.New(1, 2), 
+				EndBefore:      hlc.New(3, 4), 
+				TimestampLimit: 22, 
+				Targets:        []ident.Table{tbl0, tbl1, tbl2, tbl3}, 
+				UpdateLimit:    10000, 
+			}, 
+		}, 
+	} 
+ 
+	for _, tc := range tcs { 
+		t.Run(tc.name, func(t *testing.T) { 
+			r := require.New(t) 
+			data := &templateData{ 
+				Cursor:        tc.cursor, 
+				StagingSchema: staging, 
+			} 
+ 
+			out, err := data.Eval() 
+			r.NoError(err) 
+ 
+			filename := fmt.Sprintf("./testdata/%s.sql", tc.name) 
+			if rewriteExpected { 
+				r.NoError(os.WriteFile(filename, []byte(out), 0644)) 
+			} else { 
+				data, err := os.ReadFile(filename) 
+				r.NoError(err) 
+				r.Equal(string(data), out) 
+			} 
+		}) 
+	} 
+} 
diff --git a/internal/staging/stage/testdata/unstage.sql b/internal/staging/stage/testdata/unstage.sql 
new file mode 100644 
index 000000000..9e44b40d4 
--- /dev/null 
+++ b/internal/staging/stage/testdata/unstage.sql 
@@ -0,0 +1,83 @@ 
+WITH hlc_0 (n, l) AS ( 
+SELECT nanos, logical 
+FROM "_cdc_sink"."public"."my_db_public_tbl0" 
+WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[1]) 
+AND (nanos, logical) < ($3, $4) 
+AND NOT applied 
+GROUP BY nanos, logical 
+ORDER BY nanos, logical 
+LIMIT 1 
+), 
+hlc_1 (n, l) AS ( 
+SELECT nanos, logical 
+FROM "_cdc_sink"."public"."my_db_public_tbl1" 
+WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[2]) 
+AND (nanos, logical) < ($3, $4) 
+AND NOT applied 
+GROUP BY nanos, logical 
+ORDER BY nanos, logical 
+LIMIT 1 
+), 
+hlc_2 (n, l) AS ( 
+SELECT nanos, logical 
+FROM "_cdc_sink"."public"."my_db_public_tbl2" 
+WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[3]) 
+AND (nanos, logical) < ($3, $4) 
+AND NOT applied 
+GROUP BY nanos, logical 
+ORDER BY nanos, logical 
+LIMIT 1 
+), 
+hlc_3 (n, l) AS ( 
+SELECT nanos, logical 
+FROM "_cdc_sink"."public"."my_db_public_tbl3" 
+WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[4]) 
+AND (nanos, logical) < ($3, $4) 
+AND NOT applied 
+GROUP BY nanos, logical 
+ORDER BY nanos, logical 
+LIMIT 1 
+), 
+hlc_all AS (SELECT n, l FROM hlc_0 UNION ALL 
+SELECT n, l FROM hlc_1 UNION ALL 
+SELECT n, l FROM hlc_2 UNION ALL 
+SELECT n, l FROM hlc_3), 
+hlc_min AS (SELECT n, l FROM hlc_all GROUP BY n, l ORDER BY n, l LIMIT 1), 
+data_0 AS ( 
+UPDATE "_cdc_sink"."public"."my_db_public_tbl0" 
+SET applied=true 
+FROM hlc_min 
+WHERE (nanos,logical) = (n, l) 
+AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[1]) 
+AND NOT applied 
+RETURNING nanos, logical, key, mut, before), 
+data_1 AS ( 
+UPDATE
"_cdc_sink"."public"."my_db_public_tbl1" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[2]) +AND NOT applied +RETURNING nanos, logical, key, mut, before), +data_2 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl2" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[3]) +AND NOT applied +RETURNING nanos, logical, key, mut, before), +data_3 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl3" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[4]) +AND NOT applied +RETURNING nanos, logical, key, mut, before) +SELECT * FROM ( +SELECT 0 idx, nanos, logical, key, mut, before FROM data_0 UNION ALL +SELECT 1 idx, nanos, logical, key, mut, before FROM data_1 UNION ALL +SELECT 2 idx, nanos, logical, key, mut, before FROM data_2 UNION ALL +SELECT 3 idx, nanos, logical, key, mut, before FROM data_3) +ORDER BY nanos, logical, idx, key \ No newline at end of file diff --git a/internal/staging/stage/testdata/unstage_limit.sql b/internal/staging/stage/testdata/unstage_limit.sql new file mode 100644 index 000000000..5067cdfd5 --- /dev/null +++ b/internal/staging/stage/testdata/unstage_limit.sql @@ -0,0 +1,91 @@ +WITH hlc_0 (n, l) AS ( +SELECT nanos, logical +FROM "_cdc_sink"."public"."my_db_public_tbl0" +WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[1]) +AND (nanos, logical) < ($3, $4) +AND NOT applied +GROUP BY nanos, logical +ORDER BY nanos, logical +LIMIT 22 +), +hlc_1 (n, l) AS ( +SELECT nanos, logical +FROM "_cdc_sink"."public"."my_db_public_tbl1" +WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[2]) +AND (nanos, logical) < ($3, $4) +AND NOT applied +GROUP BY nanos, logical +ORDER BY nanos, logical +LIMIT 22 +), +hlc_2 (n, l) AS ( +SELECT nanos, logical +FROM "_cdc_sink"."public"."my_db_public_tbl2" +WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[3]) +AND (nanos, logical) < ($3, $4) +AND NOT applied +GROUP BY nanos, logical +ORDER BY nanos, logical +LIMIT 22 +), +hlc_3 (n, l) AS ( +SELECT nanos, logical +FROM "_cdc_sink"."public"."my_db_public_tbl3" +WHERE (nanos, logical, key) > ($1, $2, ($5::STRING[])[4]) +AND (nanos, logical) < ($3, $4) +AND NOT applied +GROUP BY nanos, logical +ORDER BY nanos, logical +LIMIT 22 +), +hlc_all AS (SELECT n, l FROM hlc_0 UNION ALL +SELECT n, l FROM hlc_1 UNION ALL +SELECT n, l FROM hlc_2 UNION ALL +SELECT n, l FROM hlc_3), +hlc_min AS (SELECT n, l FROM hlc_all GROUP BY n, l ORDER BY n, l LIMIT 22), +data_0 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl0" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[1]) +AND NOT applied +ORDER BY nanos, logical, key +LIMIT 10000 +RETURNING nanos, logical, key, mut, before), +data_1 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl1" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[2]) +AND NOT applied +ORDER BY nanos, logical, key +LIMIT 10000 +RETURNING nanos, logical, key, mut, before), +data_2 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl2" +SET applied=true +FROM hlc_min +WHERE (nanos,logical) = (n, l) +AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[3]) +AND NOT applied +ORDER BY nanos, logical, key +LIMIT 10000 +RETURNING nanos, logical, key, mut, before), +data_3 AS ( +UPDATE "_cdc_sink"."public"."my_db_public_tbl3" +SET applied=true +FROM hlc_min +WHERE 
(nanos,logical) = (n, l) 
+AND (nanos, logical, key) > ($1, $2, ($5::STRING[])[4]) 
+AND NOT applied 
+ORDER BY nanos, logical, key 
+LIMIT 10000 
+RETURNING nanos, logical, key, mut, before) 
+SELECT * FROM ( 
+SELECT 0 idx, nanos, logical, key, mut, before FROM data_0 UNION ALL 
+SELECT 1 idx, nanos, logical, key, mut, before FROM data_1 UNION ALL 
+SELECT 2 idx, nanos, logical, key, mut, before FROM data_2 UNION ALL 
+SELECT 3 idx, nanos, logical, key, mut, before FROM data_3) 
+ORDER BY nanos, logical, idx, key 
\ No newline at end of file 
diff --git a/internal/staging/version/versions.go b/internal/staging/version/versions.go 
index c9e7d9a53..b7feaca23 100644 
--- a/internal/staging/version/versions.go 
+++ b/internal/staging/version/versions.go 
@@ -41,6 +41,7 @@ import ( 
 var Versions = []Version{ 
 	{"Add versions table", 400}, 
 	{"Support single-level schema namespaces", 389}, 
+	{"Track applied in staging table", 572}, 
 } 
 
 // A Version describes a breaking change in the cdc-sink metadata 
diff --git a/internal/types/types.go b/internal/types/types.go 
index 1b82f8292..5e3abe7b2 100644 
--- a/internal/types/types.go 
+++ b/internal/types/types.go 
@@ -153,56 +153,71 @@ type Stager interface { 
 	// not occur within a single database transaction. 
 	Retire(ctx context.Context, db StagingQuerier, end hlc.Time) error 
 
-	// Select will return all queued mutations between the timestamps. 
-	Select(ctx context.Context, tx StagingQuerier, prev, next hlc.Time) ([]Mutation, error) 
- 
-	// SelectPartial will return queued mutations between the 
-	// timestamps. The after and limit arguments are used together when 
-	// backfilling large amounts of data. 
-	SelectPartial(ctx context.Context, tx StagingQuerier, prev, next hlc.Time, afterKey []byte, limit int) ([]Mutation, error) 
- 
 	// Store implementations should be idempotent. 
 	Store(ctx context.Context, db StagingQuerier, muts []Mutation) error 
- 
-	// TransactionTimes returns distinct timestamps in the range 
-	// (after, before] for which there is data in the associated table. 
-	TransactionTimes(ctx context.Context, tx StagingQuerier, before, after hlc.Time) ([]hlc.Time, error) 
 } 
 
-// SelectManyCallback is provided to Stagers.SelectMany to receive the 
+// UnstageCallback is provided to Stagers.Unstage to receive the 
 // incoming data. 
-type SelectManyCallback func(ctx context.Context, tbl ident.Table, mut Mutation) error 
- 
-// SelectManyCursor is used with Stagers.SelectMany. The After values 
-// will be updated by the method, allowing callers to call SelectMany 
-// in a loop until fewer than Limit values are returned. 
-type SelectManyCursor struct { 
-	Start, End hlc.Time 
-	Targets    [][]ident.Table // The outer slice defines FK groupings. 
-	Limit      int 
- 
-	// If true, we read all updates for parent tables before children, 
-	// but make no guarantees around transactional boundaries. If false, 
-	// we read some number of individual MVCC timestamps in their 
-	// entirety. 
-	Backfill bool 
- 
-	OffsetKey   json.RawMessage 
-	OffsetTable ident.Table 
-	OffsetTime  hlc.Time 
+type UnstageCallback func(ctx context.Context, tbl ident.Table, mut Mutation) error 
+ 
+// UnstageCursor is used with Stagers.Unstage. An updated cursor is 
+// returned by the method, allowing callers to call Unstage 
+// in a loop until it returns false. 
+type UnstageCursor struct { 
+	// A half-open interval: [ StartAt, EndBefore ) 
+	StartAt, EndBefore hlc.Time 
+ 
+	// StartAfterKey is used when processing very large batches that 
+	// occur within a single timestamp, to provide an additional offset 
+	// for skipping already-processed rows. The implementation of 
+	// [Stagers.Unstage] will automatically populate this field. 
+	StartAfterKey ident.TableMap[json.RawMessage] 
+ 
+	// Targets defines the order in which data for the selected tables 
+	// will be passed to the results callback. 
+	Targets []ident.Table 
+ 
+	// Limit the number of distinct MVCC timestamps that are returned. 
+	// This will default to a single timestamp if unset. 
+	TimestampLimit int 
+ 
+	// Limit the number of rows that are selected from any given staging 
+	// table. The total number of rows that can be emitted is the limit 
+	// multiplied by the number of tables listed in Targets. This will 
+	// return all rows within the selected timestamp(s) if unset. 
+	UpdateLimit int 
+} 
+ 
+// Copy returns a copy of the cursor so that it may be updated. 
+func (c *UnstageCursor) Copy() *UnstageCursor { 
+	cpy := &UnstageCursor{ 
+		StartAt:        c.StartAt, 
+		EndBefore:      c.EndBefore, 
+		Targets:        make([]ident.Table, len(c.Targets)), 
+		TimestampLimit: c.TimestampLimit, 
+		UpdateLimit:    c.UpdateLimit, 
+	} 
+	c.StartAfterKey.CopyInto(&cpy.StartAfterKey) 
+	copy(cpy.Targets, c.Targets) 
+	return cpy 
+} 
 
 // Stagers is a factory for Stager instances. 
 type Stagers interface { 
 	Get(ctx context.Context, target ident.Table) (Stager, error) 
 
-	// SelectMany performs queries across multiple staging tables, to 
-	// more readily support backfilling large amounts of data that may 
-	// result from a changefeed's initial_scan. 
+	// Unstage implements an exactly-once behavior of reading staged 
+	// mutations. 
+	// 
+	// This method returns true if at least one row was returned. It 
+	// will also return an updated cursor that allows a caller to resume 
+	// reading. 
 	// 
-	// This method will update the fields within the cursor so that it 
-	// can be used to restart in case of interruption. 
-	SelectMany(ctx context.Context, tx StagingQuerier, q *SelectManyCursor, fn SelectManyCallback) error 
+	// Rows will be emitted in (time, table, key) order. As such, 
+	// transaction boundaries can be detected by looking for a change in 
+	// the timestamp values. 
+	Unstage(ctx context.Context, tx StagingQuerier, q *UnstageCursor, fn UnstageCallback) (*UnstageCursor, bool, error) 
 } 
 
 // ColData hold SQL column metadata. 
diff --git a/migrations/pr_572.md b/migrations/pr_572.md 
new file mode 100644 
index 000000000..d994a72f0 
--- /dev/null 
+++ b/migrations/pr_572.md 
@@ -0,0 +1,40 @@ 
+# PR 572 Track Applied Mutations 
+ 
+[PR 572](https://github.com/cockroachdb/cdc-sink/pull/572) 
+ 
+Breaking schema change: 
+ 
+Each staging table gains a new `applied` column which cdc-sink will use 
+to suppress duplicate updates during process restarts. The resolver loop 
+continues to save resolved-timestamp checkpoints, but partial progress 
+within a resolved-timestamp window is handled through the `applied` 
+column. 
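+ 
+In rough terms, the unstage query now marks rows as applied in the same 
+statement that reads them, so a restart does not replay rows that were 
+already delivered to the target. The statement below is only a 
+simplified sketch of that idea; the real query is generated from 
+`queries/unstage.tmpl` and spans every staging table in the window, and 
+the table name and placeholders here are illustrative examples: 
+ 
+```sql 
+-- Simplified sketch only: read and mark rows in a single statement so 
+-- that a later restart skips anything already applied. 
+UPDATE _cdc_sink.my_db_public_my_table 
+   SET applied = true 
+ WHERE (nanos, logical) < ($1, $2) -- illustrative end of the window 
+   AND NOT applied 
+RETURNING nanos, logical, key, mut, before; 
+``` 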
+ +Migration: +* Run the following `ALTER TABLE` command on each staging table before + upgrading cdc-sink: + +```sql +ALTER TABLE _cdc_sink.my_db_public_my_table +ADD COLUMN applied BOOL NOT NULL DEFAULT false + CREATE FAMILY hot, +ADD COLUMN source_time TIMESTAMPTZ + AS (to_timestamp(nanos::float/1e9)) VIRTUAL; +``` +* Mark the migration as complete: +```sql +UPSERT INTO _cdc_sink.memo (key, value) VALUES ('version-572','{"state":"applied"}'); +``` +* Stop all instances of cdc-sink, upgrade, and restart. +* cdc-sink will no longer delete un-applied mutations that may be before + the resolved-timestamp frontier. The mutations that were present + before the upgrade can either be manually deleted or marked as applied: + +```sql +UPDATE _cdc_sink.my_db_public_my_table + SET applied = true +WHERE nanos < ( + SELECT max(source_nanos) + FROM _cdc_sink.resolved_timestamps + WHERE target_applied_at IS NOT NULL) +```
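+ 
+* Optionally, verify the cleanup. The query below is illustrative 
+  (substitute each staging table for the example name); it should return 
+  zero once the pre-upgrade backlog has been deleted or marked as 
+  applied: 
+ 
+```sql 
+SELECT count(*) 
+FROM _cdc_sink.my_db_public_my_table 
+WHERE NOT applied 
+  AND nanos < ( 
+    SELECT max(source_nanos) 
+    FROM _cdc_sink.resolved_timestamps 
+    WHERE target_applied_at IS NOT NULL); 
+``` 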