stage: Track applied mutations in staging tables

This change moves tracking of partially-applied resolved-timestamp windows
into the staging tables by adding a new `applied` column. The goal is to move
some state-tracking out of the cdc resolver loop and into the stage package.
Tracking apply status on a per-mutation basis improves the idempotency of
cdc-sink when the userscript has non-idempotent behaviors (e.g., a three-way
merge). It also allows us to export monitoring data about mutations that may
have slipped through the cracks and to detect when a migration process has
completely drained. Fine-grained tracking will also be useful for unifying
the non-transactional modes into a single behavior.
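
For example, a deployment could watch for stragglers with a query along
these lines. This is only a sketch: the staging-table layout it assumes
(a `nanos` column plus the new `applied` flag) is illustrative rather than
a documented API.

    package monitor

    import (
    	"context"

    	"github.com/jackc/pgx/v5/pgxpool"
    )

    // countUnapplied counts staged mutations below the resolved frontier
    // that were never marked as applied. The table and column names are
    // assumptions for this sketch.
    func countUnapplied(
    	ctx context.Context, db *pgxpool.Pool, stagingTable string, frontierNanos int64,
    ) (int, error) {
    	var count int
    	err := db.QueryRow(ctx,
    		`SELECT count(*) FROM `+stagingTable+` WHERE NOT applied AND nanos < $1`,
    		frontierNanos,
    	).Scan(&count)
    	return count, err
    }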

Many unused methods in the stage API have been deleted. The "unstaging" SQL
query is now generated with a Go template and is tested in the same manner as
the apply package.
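
A minimal sketch of that approach, assuming a simplified staging layout
(the real template in the stage package is more elaborate):

    package main

    import (
    	"os"
    	"text/template"
    )

    // unstageTmpl is a toy version of the generated query: it marks
    // staged rows as applied and returns them in a single statement.
    // Column names and the query's exact shape are assumptions.
    var unstageTmpl = template.Must(template.New("unstage").Parse(`
    UPDATE {{.StagingTable}}
       SET applied = true
     WHERE NOT applied
       AND (nanos, logical) >= ({{.StartNanos}}, {{.StartLogical}})
       AND (nanos, logical) < ({{.EndNanos}}, {{.EndLogical}})
    RETURNING nanos, logical, key, mut`))

    func main() {
    	// Render the query for one resolved-timestamp window.
    	_ = unstageTmpl.Execute(os.Stdout, map[string]any{
    		"StagingTable": "_cdc_sink.my_table_staging",
    		"StartNanos":   0, "StartLogical": 0,
    		"EndNanos":     9, "EndLogical": 0,
    	})
    }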

The cdc package now performs less work to track the partial application of
large individual changes: it simply persists the contents of the
UnstageCursor as a performance optimization. Exactly-once behavior is
provided by the applied column.
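
In other words, the resolver-side work reduces to a loop like the sketch
below. The Unstage call shape mirrors the new PeekStaged test helper in
this commit; the types.Stagers name, the pgx.Tx parameter type, and the
saveCursor checkpoint are assumptions.

    package sketch

    import (
    	"context"

    	"github.com/cockroachdb/cdc-sink/internal/types"
    	"github.com/cockroachdb/cdc-sink/internal/util/ident"
    	"github.com/jackc/pgx/v5"
    )

    // saveCursor would persist progress, e.g. into a memo table.
    func saveCursor(*types.UnstageCursor) {}

    // drainWindow repeatedly unstages within one resolved-timestamp
    // window. Persisting the cursor is only a performance hint; a
    // restart that re-reads the window is harmless because unstaging
    // skips rows already marked as applied.
    func drainWindow(
    	ctx context.Context,
    	stagers types.Stagers,
    	tx pgx.Tx,
    	cur *types.UnstageCursor,
    	apply func(context.Context, ident.Table, types.Mutation) error,
    ) error {
    	for selecting := true; selecting; {
    		var err error
    		cur, selecting, err = stagers.Unstage(ctx, tx, cur, apply)
    		if err != nil {
    			return err
    		}
    		saveCursor(cur) // Checkpoint so a restart can fast-forward.
    	}
    	return nil
    }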

The change to `server/integration_test.go` is due to unstage processing now
being one-shot. The test it performed duplicates an existing test in
`cdc/handler_test.go`.

Breaking change: The `--selectBatchSize` flag is deprecated in favor of two
new flags, `--largeTransactionLimit` and `--timestampWindowSize`, which,
respectively, enable partial processing of a single over-sized transaction
and impose a general limit on the total amount of data to be unstaged.
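
To illustrate the intended interplay of the two new settings (the values
are arbitrary; the fields mirror the Config changes below):

    package example

    import "github.com/cockroachdb/cdc-sink/internal/source/cdc"

    // exampleConfig sketches the new knobs; values are arbitrary.
    func exampleConfig() *cdc.Config {
    	return &cdc.Config{
    		// Unstage at most 1000 distinct source-transaction
    		// timestamps per round-trip to the staging database.
    		TimestampWindowSize: 1000,
    		// Allow one over-sized source transaction to be applied
    		// across several target transactions of ~250k rows each.
    		LargeTransactionLimit: 250_000,
    	}
    }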

Breaking change: A staging schema migration is required; it is documented in
the migrations directory.
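
The shape of that migration is roughly as follows; the staging table name
here is hypothetical, and the authoritative DDL lives in the migrations
directory:

    // migrationSketch approximates the required staging-schema change.
    const migrationSketch = `
    ALTER TABLE _cdc_sink.my_db_public_tbl
      ADD COLUMN applied BOOL NOT NULL DEFAULT false;
    `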

X-Ref: #487
X-Ref: #504
X-Ref: #565
bobvawter committed Oct 31, 2023
1 parent b8a2f9f commit 785d167
Showing 18 changed files with 1,067 additions and 925 deletions.
32 changes: 32 additions & 0 deletions internal/sinktest/all/fixture.go
@@ -26,7 +26,9 @@ import (
 	"github.com/cockroachdb/cdc-sink/internal/types"
 	"github.com/cockroachdb/cdc-sink/internal/util/applycfg"
 	"github.com/cockroachdb/cdc-sink/internal/util/diag"
+	"github.com/cockroachdb/cdc-sink/internal/util/hlc"
 	"github.com/cockroachdb/cdc-sink/internal/util/ident"
+	"github.com/jackc/pgx/v5"
 	"github.com/pkg/errors"
 )

@@ -76,3 +78,33 @@ func (f *Fixture) CreateTargetTable(
 	}
 	return ti, err
 }
+
+// PeekStaged peeks at the data which has been staged for the target
+// table between the given timestamps.
+func (f *Fixture) PeekStaged(
+	ctx context.Context, tbl ident.Table, startAt, endBefore hlc.Time,
+) ([]types.Mutation, error) {
+	tx, err := f.StagingPool.BeginTx(ctx, pgx.TxOptions{})
+	if err != nil {
+		return nil, errors.WithStack(err)
+	}
+	defer func() { _ = tx.Rollback(ctx) }()
+
+	q := &types.UnstageCursor{
+		StartAt:   startAt,
+		EndBefore: endBefore,
+		Targets:   []ident.Table{tbl},
+	}
+	var ret []types.Mutation
+	for selecting := true; selecting; {
+		q, selecting, err = f.Stagers.Unstage(ctx, tx, q,
+			func(ctx context.Context, tbl ident.Table, mut types.Mutation) error {
+				ret = append(ret, mut)
+				return nil
+			})
+		if err != nil {
+			return nil, err
+		}
+	}
+	return ret, nil
+}
85 changes: 56 additions & 29 deletions internal/source/cdc/config.go
@@ -27,10 +27,12 @@ import (
 )
 
 const (
-	defaultBackupPolling   = 100 * time.Millisecond
-	defaultFlushBatchSize  = 1_000
-	defaultSelectBatchSize = 10_000
-	defaultNDJsonBuffer    = bufio.MaxScanTokenSize // 64k
+	defaultBackupPolling         = 100 * time.Millisecond
+	defaultIdealBatchSize        = 1000
+	defaultMetaTable             = "resolved_timestamps"
+	defaultLargeTransactionLimit = 250_000 // Chosen arbitrarily.
+	defaultNDJsonBuffer          = bufio.MaxScanTokenSize // 64k
+	defaultTimestampWindowSize   = 1000
 )
 
 // Config adds CDC-specific configuration to the core logical loop.
@@ -43,34 +45,45 @@ type Config struct {
 	// timestamps will receive the incoming resolved-timestamp message.
 	BackupPolling time.Duration
 
-	// Don't coalesce updates from different source MVCC timestamps into
-	// a single destination transaction. Setting this will preserve the
-	// original structure of updates from the source, but may decrease
-	// overall throughput by increasing the total number of target
-	// database transactions.
+	// If true, the resolver loop will behave as though
+	// TimestampWindowSize has been set to 1. That is, each unique
+	// timestamp within a resolved-timestamp window will be processed,
+	// instead of fast-forwarding to the latest values within a batch.
+	// This flag is relevant for use-cases that employ a merge function
+	// which must see every incremental update in order to produce a
+	// meaningful result.
 	FlushEveryTimestamp bool
 
-	// Coalesce timestamps within a resolved-timestamp window until
-	// at least this many mutations have been collected.
+	// This setting controls a soft maximum on the number of rows that
+	// will be flushed in one target transaction.
 	IdealFlushBatchSize int
 
-	// The maximum amount of data to buffer when reading a single line
-	// of ndjson input. This can be increased if the source cluster
-	// has large blob values.
-	NDJsonBuffer int
+	// If non-zero and the number of rows in a single timestamp exceeds
+	// this value, allow that source transaction to be applied over
+	// multiple target transactions.
+	LargeTransactionLimit int
 
 	// The name of the resolved_timestamps table.
 	MetaTableName ident.Ident
 
-	// The number of rows to retrieve when loading staged data.
-	SelectBatchSize int
+	// The maximum amount of data to buffer when reading a single line
+	// of ndjson input. This can be increased if the source cluster
+	// has large blob values.
+	NDJsonBuffer int
 
 	// Retain staged, applied data for an extra amount of time. This
 	// allows, for example, additional time to validate what was staged
 	// versus what was applied. When set to zero, staged mutations may
 	// be retired as soon as there is a resolved timestamp greater than
 	// the timestamp of the mutation.
 	RetireOffset time.Duration
 
+	// The maximum number of source transactions to unstage at once.
+	// This does not place a hard limit on the number of mutations that
+	// may be dequeued at once, but it does reduce the total number of
+	// round-trips to the staging database when the average transaction
+	// size is small.
+	TimestampWindowSize int
 }
 
 // Bind adds configuration flags to the set.
@@ -80,19 +93,27 @@ func (c *Config) Bind(f *pflag.FlagSet) {
 	f.DurationVar(&c.BackupPolling, "backupPolling", defaultBackupPolling,
 		"poll for resolved timestamps from other instances of cdc-sink")
 	f.BoolVar(&c.FlushEveryTimestamp, "flushEveryTimestamp", false,
-		"preserve intermediate updates from the source in transactional mode; "+
+		"don't fast-forward to the latest row values within a resolved timestamp; "+
 			"may negatively impact throughput")
-	f.IntVar(&c.IdealFlushBatchSize, "idealFlushBatchSize", defaultFlushBatchSize,
-		"try to apply at least this many mutations per resolved-timestamp window")
+	f.IntVar(&c.IdealFlushBatchSize, "idealFlushBatchSize", defaultIdealBatchSize,
+		"a soft limit on the number of rows to send to the target at once")
+	f.IntVar(&c.LargeTransactionLimit, "largeTransactionLimit", defaultLargeTransactionLimit,
+		"if non-zero, all source transactions with more than this "+
+			"number of rows may be applied in multiple target transactions")
 	f.IntVar(&c.NDJsonBuffer, "ndjsonBufferSize", defaultNDJsonBuffer,
 		"the maximum amount of data to buffer while reading a single line of ndjson input; "+
 			"increase when source cluster has large blob values")
-	f.Var(ident.NewValue("resolved_timestamps", &c.MetaTableName), "metaTable",
+	f.Var(ident.NewValue(defaultMetaTable, &c.MetaTableName), "metaTable",
 		"the name of the table in which to store resolved timestamps")
-	f.IntVar(&c.SelectBatchSize, "selectBatchSize", defaultSelectBatchSize,
-		"the number of rows to select at once when reading staged data")
 	f.DurationVar(&c.RetireOffset, "retireOffset", 0,
-		"retain staged, applied data for an extra duration")
+		"if non-zero, retain staged, applied data for an extra duration")
+	f.IntVar(&c.TimestampWindowSize, "timestampWindowSize", defaultTimestampWindowSize,
+		"the maximum number of source transaction timestamps to unstage at once")
+
+	var deprecated int
+	const msg = "replaced by --largeTransactionLimit and --timestampWindowSize"
+	f.IntVar(&deprecated, "selectBatchSize", 0, "")
+	_ = f.MarkDeprecated("selectBatchSize", msg)
 }
 
 // Preflight implements logical.Config.
@@ -105,20 +126,26 @@ func (c *Config) Preflight() error {
 		c.BackupPolling = defaultBackupPolling
 	}
 	if c.IdealFlushBatchSize == 0 {
-		c.IdealFlushBatchSize = defaultFlushBatchSize
+		c.IdealFlushBatchSize = defaultIdealBatchSize
 	}
+	// Zero is legal; implies no limit.
+	if c.LargeTransactionLimit < 0 {
+		return errors.New("largeTransactionLimit must be >= 0")
+	}
 	if c.NDJsonBuffer == 0 {
 		c.NDJsonBuffer = defaultNDJsonBuffer
 	}
 	if c.MetaTableName.Empty() {
-		return errors.New("no metadata table specified")
-	}
-	if c.SelectBatchSize == 0 {
-		c.SelectBatchSize = defaultSelectBatchSize
+		c.MetaTableName = ident.New(defaultMetaTable)
 	}
 	if c.RetireOffset < 0 {
 		return errors.New("retireOffset must be >= 0")
 	}
+	if c.TimestampWindowSize < 0 {
+		return errors.New("timestampWindowSize must be >= 0")
+	} else if c.TimestampWindowSize == 0 {
+		c.TimestampWindowSize = defaultTimestampWindowSize
+	}
 
 	return nil
 }
79 changes: 12 additions & 67 deletions internal/source/cdc/handler_test.go
@@ -160,10 +160,8 @@ func testQueryHandler(t *testing.T, htc *fixtureConfig) {
 
 		// Verify staged data, if applicable.
		if !htc.immediate {
-			stager, err := fixture.Stagers.Get(ctx, tableInfo.Name())
+			muts, err := fixture.PeekStaged(ctx, tableInfo.Name(), hlc.Zero(), hlc.New(1, 1))
 			if a.NoError(err) {
-				muts, err := stager.Select(ctx, fixture.StagingPool.Pool, hlc.Zero(), hlc.New(1, 0))
-				a.NoError(err)
 				a.Len(muts, 2)
 				// The order is stable since the underlying query
 				// is ordered, in part, by the key.
@@ -229,40 +227,14 @@ func testQueryHandler(t *testing.T, htc *fixtureConfig) {
 			`),
 		}))
 
-		// Verify staged data, if applicable.
+		// Verify staged and un-applied data, if applicable.
 		if !htc.immediate {
-			stager, err := fixture.Stagers.Get(ctx, tableInfo.Name())
+			muts, err := fixture.PeekStaged(ctx, tableInfo.Name(), hlc.Zero(), hlc.New(10, 1))
 			if a.NoError(err) {
-				muts, err := stager.Select(ctx, fixture.StagingPool.Pool, hlc.Zero(), hlc.New(10, 0))
-				a.NoError(err)
-				a.Len(muts, 6)
+				a.Len(muts, 2)
 				// The order is stable since the underlying query
 				// orders by HLC and key.
 				a.Equal([]types.Mutation{
-					// Original entries.
-					{
-						Data: []byte(`{"pk":42,"v":99}`),
-						Key:  []byte(`[42]`),
-						Time: hlc.New(1, 0),
-					},
-					{
-						Before: []byte(`{"pk":99,"v":33}`),
-						Data:   []byte(`{"pk":99,"v":42}`),
-						Key:    []byte(`[99]`),
-						Time:   hlc.New(1, 0),
-					},
-					// These are the deletes from above.
-					{
-						Data: []byte(`null`),
-						Key:  []byte(`[42]`),
-						Time: hlc.New(3, 0),
-					},
-					{
-						Data: []byte(`null`),
-						Key:  []byte(`[99]`),
-						Time: hlc.New(3, 0),
-					},
 					// These are the entries we just created.
 					{
 						Data: []byte(`{"pk":42,"v":99}`),
 						Key:  []byte(`[42]`),
@@ -384,10 +356,8 @@ func testHandler(t *testing.T, cfg *fixtureConfig) {
 
 	// Verify staged data, if applicable.
 	if !cfg.immediate {
-		stager, err := fixture.Stagers.Get(ctx, jumbleName())
+		muts, err := fixture.PeekStaged(ctx, tableInfo.Name(), hlc.Zero(), hlc.New(1, 1))
 		if a.NoError(err) {
-			muts, err := stager.Select(ctx, fixture.StagingPool.Pool, hlc.Zero(), hlc.New(1, 0))
-			a.NoError(err)
 			a.Len(muts, 2)
 			// The order is stable since the underlying query
 			// orders, in part, on key
@@ -454,40 +424,14 @@ func testHandler(t *testing.T, cfg *fixtureConfig) {
 	`, jumbleName().Table())),
 	}))
 
-	// Verify staged data, if applicable.
+	// Verify staged and as-yet-unapplied data.
 	if !cfg.immediate {
-		stager, err := fixture.Stagers.Get(ctx, jumbleName())
+		muts, err := fixture.PeekStaged(ctx, tableInfo.Name(), hlc.Zero(), hlc.New(10, 1))
 		if a.NoError(err) {
-			muts, err := stager.Select(ctx, fixture.StagingPool.Pool, hlc.Zero(), hlc.New(10, 0))
-			a.NoError(err)
-			a.Len(muts, 6)
+			a.Len(muts, 2)
 			// The order is stable since the underlying query
 			// orders by HLC and key.
 			a.Equal([]types.Mutation{
-				// Original entries.
-				{
-					Data: []byte(`{ "pk" : 42, "v" : 99 }`),
-					Key:  []byte(`[ 42 ]`),
-					Time: hlc.New(1, 0),
-				},
-				{
-					Before: []byte(`{ "pk" : 99, "v" : 33 }`),
-					Data:   []byte(`{ "pk" : 99, "v" : 42 }`),
-					Key:    []byte(`[ 99 ]`),
-					Time:   hlc.New(1, 0),
-				},
-				// These are the deletes from above.
-				{
-					Data: []byte(`null`),
-					Key:  []byte(`[ 42 ]`),
-					Time: hlc.New(3, 0),
-				},
-				{
-					Data: []byte(`null`),
-					Key:  []byte(`[ 99 ]`),
-					Time: hlc.New(3, 0),
-				},
 				// These are the entries we just created.
 				{
 					Data: []byte(`{ "pk" : 42, "v" : 99 }`),
 					Key:  []byte(`[ 42 ]`),
@@ -655,6 +599,7 @@ func TestConcurrentHandlers(t *testing.T) {
 func testMassBackfillWithForeignKeys(
 	t *testing.T, rowCount, fixtureCount int, fns ...func(*Config),
 ) {
+	t.Parallel() // We spend most of our time waiting for a resolved timestamp.
 	r := require.New(t)
 
 	baseFixture, cancel, err := all.NewFixture()
@@ -675,9 +620,9 @@ func testMassBackfillWithForeignKeys(
 
 	for idx := range fixtures {
 		cfg := &Config{
-			IdealFlushBatchSize: 123, // Pick weird batch sizes.
-			SelectBatchSize:     587,
-			MetaTableName:       ident.New("resolved_timestamps"),
+			LargeTransactionLimit: rowCount / 3, // Read the same timestamp more than once.
+			MetaTableName:         ident.New("resolved_timestamps"),
+			TimestampWindowSize:   100,
 			BaseConfig: logical.BaseConfig{
 				ApplyTimeout: time.Second,
 				FanShards:    16,