From 4805a78b89eb1b254fef848bbeca51e578bfc460 Mon Sep 17 00:00:00 2001 From: ZhouXing19 Date: Thu, 21 Nov 2024 11:17:43 -0500 Subject: [PATCH] passed first part of user script --- internal/sequencer/besteffort/best_effort.go | 50 ++++--- internal/sequencer/besteffort/router.go | 130 ++++++++++--------- internal/sequencer/script/script_test.go | 53 ++++++-- internal/sequencer/script/source_acceptor.go | 9 ++ internal/sequencer/sequtil/copier.go | 13 +- internal/sequencer/staging/staging.go | 10 ++ internal/source/objstore/conn_test.go | 1 + internal/staging/stage/stage.go | 6 +- internal/staging/stage/table_reader.go | 4 +- internal/target/schemawatch/watcher.go | 1 + internal/types/batch_reader.go | 2 + 11 files changed, 175 insertions(+), 104 deletions(-) diff --git a/internal/sequencer/besteffort/best_effort.go b/internal/sequencer/besteffort/best_effort.go index 5338b265..32b23a76 100644 --- a/internal/sequencer/besteffort/best_effort.go +++ b/internal/sequencer/besteffort/best_effort.go @@ -23,12 +23,12 @@ import ( "github.com/cockroachdb/field-eng-powertools/notify" "github.com/cockroachdb/field-eng-powertools/stopper" - "github.com/cockroachdb/field-eng-powertools/stopvar" "github.com/cockroachdb/replicator/internal/sequencer" "github.com/cockroachdb/replicator/internal/sequencer/scheduler" "github.com/cockroachdb/replicator/internal/types" "github.com/cockroachdb/replicator/internal/util/hlc" "github.com/cockroachdb/replicator/internal/util/ident" + "github.com/cockroachdb/replicator/internal/util/stopvar" "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -148,26 +148,33 @@ func (s *bestEffort) Start( ctx.Go(func(ctx *stopper.Context) error { _, err := stopvar.DoWhenChanged(ctx, schemaData, watcher.GetNotify(), func(ctx *stopper.Context, _, schemaData *types.SchemaData) error { - cfg, err := s.startGeneration(ctx, opts, schemaData, stats) + if cfg != nil { + if err := cfg.shutdown(); err != nil { + log.WithError(err).Warn("error while shutting down previous BestEffort") + } else { + log.Infof("closed previous BestEffort") + } + } + newCfg, err := s.startGeneration(ctx, opts, schemaData, stats) if err != nil { log.WithError(err).Warn("could not create new BestEffort sequencers") return nil } - oldCfg, _ := ret.config.Swap(cfg) + _, _ = ret.config.Swap(newCfg) // Notify test code. if s.schemaChanged != nil { s.schemaChanged.Notify() } - log.Debug("reconfigured BestEffort due to schema change") - if err := oldCfg.shutdown(); err != nil { - log.WithError(err).Warn("error while shutting down previous BestEffort") - } return nil }) return err }) - return ret, stats, nil + ctx.Go(func(ctx *stopper.Context) error { + return ret.Fanout(ctx) + }) + + return nil, stats, nil } // startGeneration creates the delegate sequences and returns a routing @@ -186,7 +193,7 @@ func (s *bestEffort) startGeneration( stats *notify.Var[sequencer.Stat], ) (*routerConfig, error) { // Create a nested context. - ctx = stopper.WithContext(ctx) + childCtx := stopper.WithContext(ctx) if opts.BatchReader == nil { return nil, errors.New("batch reader is required for best effort mode") @@ -197,8 +204,12 @@ func (s *bestEffort) startGeneration( routes: make(map[*types.SchemaComponent]types.BatchReader), schemaData: schemaData, shutdown: func() error { - ctx.Stop(s.cfg.TaskGracePeriod) - return ctx.Wait() + childCtx.Stop(s.cfg.TaskGracePeriod) + if err := childCtx.Wait(); err != nil { + return err + } + log.Infof("best effort shut down finished") + return nil }, } @@ -210,9 +221,14 @@ func (s *bestEffort) startGeneration( // Initialize the batch reader for the specific component. subOpts.Group.Tables = comp.Order subOpts.MaxDeferred = s.cfg.TimestampLimit - cfg.routes[comp] = &routePipe{batchChan: make(chan *types.BatchCursor)} + cfg.routes[comp] = &routePipe{ + comp: comp, + outputChan: make(chan *types.BatchCursor), + } + subOpts.BatchReader = cfg.routes[comp] - subAcc, subStats, err := s.delegate.Start(ctx, subOpts) + // s.delegate is staging + _, subStats, err := s.delegate.Start(childCtx, subOpts) if err != nil { log.WithError(err).Warnf( "BestEffort.Start: could not start nested Sequencer for %s", comp.Order) @@ -225,10 +241,9 @@ func (s *bestEffort) startGeneration( // TODO(janexing): no need to consider it at the first pass. if len(comp.Order) == 1 { log.Tracef("enabling direct path for %s", comp.Order[0]) - subAcc = &directAcceptor{ + _ = &directAcceptor{ BestEffort: s.BestEffort, apply: subOpts.Delegate, - fallback: subAcc, } } @@ -236,9 +251,9 @@ func (s *bestEffort) startGeneration( // cfg.routes[comp] = subAcc // Start a helper to aggregate the progress values together. - ctx.Go(func(ctx *stopper.Context) error { + childCtx.Go(func(inCtx *stopper.Context) error { // Ignoring error since innermost callback returns nil. - _, _ = stopvar.DoWhenChanged(ctx, nil, subStats, func(ctx *stopper.Context, _, subStat sequencer.Stat) error { + _, _ = stopvar.DoWhenChanged(inCtx, nil, subStats, func(ctx *stopper.Context, _, subStat sequencer.Stat) error { _, _, err := stats.Update(func(old sequencer.Stat) (sequencer.Stat, error) { next := old.Copy() subStat.Progress().CopyInto(next.Progress()) @@ -254,6 +269,5 @@ func (s *bestEffort) startGeneration( return nil }) } - return cfg, nil } diff --git a/internal/sequencer/besteffort/router.go b/internal/sequencer/besteffort/router.go index ec876bd8..a7285ff1 100644 --- a/internal/sequencer/besteffort/router.go +++ b/internal/sequencer/besteffort/router.go @@ -17,13 +17,12 @@ package besteffort import ( - "context" - "github.com/cockroachdb/field-eng-powertools/notify" "github.com/cockroachdb/field-eng-powertools/stopper" "github.com/cockroachdb/replicator/internal/types" "github.com/cockroachdb/replicator/internal/util/hlc" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" ) // A router sends mutations to the sequencer associated with a target @@ -45,82 +44,87 @@ type routerConfig struct { } type routePipe struct { - batchChan chan *types.BatchCursor + outputChan chan *types.BatchCursor + comp *types.SchemaComponent } func (p *routePipe) Read(ctx *stopper.Context) (<-chan *types.BatchCursor, error) { - return p.batchChan, nil + return p.outputChan, nil } // inputChan -> router --> component_1/output_chan_1 // --> component_2/output_chan_2 -var _ types.MultiAcceptor = (*router)(nil) - // For each route.Read(), they should be able to notify the sibling route to stop // when it encounters an error. It can notify the parent ctx to stop the ctx and thus // stop the ctx for all children route.Read(). // AcceptMultiBatch splits the batch based on destination tables. -func (r *router) AcceptMultiBatch( - ctx context.Context, batch *types.MultiBatch, opts *types.AcceptOptions, -) error { +func (r *router) Fanout(ctx *stopper.Context) error { cfg, _ := r.config.Get() - componentBatches := make(map[*types.SchemaComponent]*types.MultiBatch, len(cfg.routes)) - - // Aggregate sub-batches by destination table group. - for table, mut := range batch.Mutations() { - comp, ok := cfg.schemaData.TableComponents.Get(table) - if !ok { - return errors.Errorf("unknown table %s", table) - } - - acc, ok := componentBatches[comp] - if !ok { - acc = &types.MultiBatch{} - componentBatches[comp] = acc - } - if err := acc.Accumulate(table, mut); err != nil { - return err - } + inputChan, err := cfg.intputChan.Read(ctx) + if err != nil { + return err } - - for comp, acc := range componentBatches { - destination, ok := cfg.routes[comp] - if !ok { - return errors.Errorf("encountered unmapped schema component %s", comp) - } - if err := destination.AcceptMultiBatch(ctx, acc, opts); err != nil { - return err + cursorCnt := 0 + for { + componentBatches := make(map[*types.SchemaComponent]*types.TemporalBatch, len(cfg.routes)) + select { + // Read from the source input. + case cursor, ok := <-inputChan: + cursorCnt++ + for name, mut := range cursor.Batch.Data.All() { + log.Infof("[fanOut]received %d cursor: %s, %s", cursorCnt, name, mut.Time) + } + if !ok { + return nil + } + if cursor.Batch == nil { + continue + } + for table, mut := range cursor.Batch.Mutations() { + comp, ok := cfg.schemaData.TableComponents.Get(table) + if !ok { + return errors.Errorf("unknown table %s", table) + } + acc, ok := componentBatches[comp] + if !ok { + acc = &types.TemporalBatch{Time: mut.Time} + componentBatches[comp] = acc + } + if err := acc.Accumulate(table, mut); err != nil { + log.Errorf(err.Error()) + return err + } + } + + // Fan out the mutation to destinations. + for comp, tempBatch := range componentBatches { + destination, ok := cfg.routes[comp] + if !ok { + err = errors.Errorf("encountered unmapped schema component %s", comp) + log.Error(err.Error()) + return err + } + compPipe := destination.(*routePipe) + + cur := &types.BatchCursor{ + Batch: tempBatch, + Progress: hlc.RangeIncluding(hlc.Zero(), tempBatch.Time), + } + + select { + // To staging. + case compPipe.outputChan <- cur: + for name, mut := range cur.Batch.Data.All() { + log.Infof("[fanOut]pushed %d cursor: %s, %s", cursorCnt, name, mut.Time) + } + case <-ctx.Stopping(): + } + } + + case <-ctx.Stopping(): + return nil } } - - return nil -} - -// AcceptTableBatch passes through to the table's acceptor. -func (r *router) AcceptTableBatch( - ctx context.Context, batch *types.TableBatch, opts *types.AcceptOptions, -) error { - cfg, _ := r.config.Get() - comp, ok := cfg.schemaData.TableComponents.Get(batch.Table) - if !ok { - return errors.Errorf("unknown table %s", batch.Table) - } - dest, ok := cfg.routes[comp] - if !ok { - return errors.Errorf("unknown component %s", comp) - } - return dest.AcceptTableBatch(ctx, batch, opts) -} - -// AcceptTemporalBatch delegates to AcceptMultiBatch. -func (r *router) AcceptTemporalBatch( - ctx context.Context, batch *types.TemporalBatch, opts *types.AcceptOptions, -) error { - multi := &types.MultiBatch{ - ByTime: map[hlc.Time]*types.TemporalBatch{batch.Time: batch}, - Data: []*types.TemporalBatch{batch}, - } - return r.AcceptMultiBatch(ctx, multi, opts) } diff --git a/internal/sequencer/script/script_test.go b/internal/sequencer/script/script_test.go index a19478c9..a4fc0a31 100644 --- a/internal/sequencer/script/script_test.go +++ b/internal/sequencer/script/script_test.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/replicator/internal/sinktest" "github.com/cockroachdb/replicator/internal/sinktest/all" "github.com/cockroachdb/replicator/internal/sinktest/base" + "github.com/cockroachdb/replicator/internal/staging/stage" "github.com/cockroachdb/replicator/internal/types" "github.com/cockroachdb/replicator/internal/util/hlc" "github.com/cockroachdb/replicator/internal/util/ident" @@ -251,6 +252,20 @@ func TestUserScriptSequencer(t *testing.T) { } } +func requireStagingTblRowCnt( + ctx *stopper.Context, stagingTbl ident.Table, expectedCnt int, pool *types.StagingPool, +) error { + var res int + if err := pool.QueryRow(ctx, "SELECT COUNT(*) FROM %s", stagingTbl.Table()).Scan(&res); err != nil { + return errors.Wrapf(err, "failed to query rows for staging table %s", stagingTbl.Table()) + } + if res != expectedCnt { + return fmt.Errorf("expected %d rows, got %d", expectedCnt, res) + } + log.Infof("finished verify rows for staging table %s", stagingTbl.Table()) + return nil +} + func testUserScriptSequencer(t *testing.T, baseMode switcher.Mode) { r := require.New(t) @@ -367,6 +382,14 @@ api.configureTable("t_2", { }) r.NoError(err) + progress, progressMade := stats.Get() + + retryAttempt, err := retry.NewRetry(retry.Settings{ + InitialBackoff: 1 * time.Second, + Multiplier: 1, + MaxRetries: 300, + }) + const numEmits = 5 endTime := hlc.New(numEmits+1, 0) for i := 0; i < numEmits; i++ { @@ -380,17 +403,28 @@ api.configureTable("t_2", { }, }, )) - t.Logf("pushed for k %d", i) + // t.Logf("pushed for k %d", i) } - fakeSrc.PushTemporalBatch(ctx, &types.TemporalBatch{ - Time: endTime, - }) + r.NoError(retryAttempt.Do(ctx, func() error { + for _, tgt := range tgts { + res, err := base.GetRowCount(ctx, fixture.StagingPool, stage.StagingTable(fixture.StagingDB.Schema(), tgt)) + if err != nil { + return err + } + if res != numEmits { + return fmt.Errorf("expected %d rows for table %s, got %d", numEmits, tgt, res) + } + } + return nil + }, func(err error) { + t.Logf("retrying checking staging tbl count: %s", err.Error()) + })) - t.Logf("pushed for sentinel") + require.NoError(t, checkpointGroup.Advance(ctx, checkpointGroup.TableGroup().Name, endTime)) + t.Logf("advanced bounds to %s", endTime) // Wait for (async) replication. - progress, progressMade := stats.Get() for { targetProgress := sequencer.CommonProgress(progress) if hlc.Compare(targetProgress.MaxInclusive(), endTime) >= 0 { @@ -406,12 +440,6 @@ api.configureTable("t_2", { } } - retryAttempt, err := retry.NewRetry(retry.Settings{ - InitialBackoff: 1 * time.Second, - Multiplier: 1, - MaxRetries: 300, - }) - r.NoError(err) t.Logf("Verifying that the script transformed the values") @@ -454,6 +482,7 @@ api.configureTable("t_2", { }, }, )) + log.Infof("[fakeSrc] pushed delete for %d", i+1) } fakeSrc.PushTemporalBatch(ctx, &types.TemporalBatch{ diff --git a/internal/sequencer/script/source_acceptor.go b/internal/sequencer/script/source_acceptor.go index 878d0ef0..ab316173 100644 --- a/internal/sequencer/script/source_acceptor.go +++ b/internal/sequencer/script/source_acceptor.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/replicator/internal/types" "github.com/cockroachdb/replicator/internal/util/ident" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" ) // A sourceAcceptor is responsible for the actions wired up to a @@ -53,6 +54,10 @@ func (a *sourceAcceptor) Read(ctx *stopper.Context) (<-chan *types.BatchCursor, if !open { return nil } + for name, mut := range batchCursor.Batch.Data.All() { + log.Infof("[source acceptor] received for table %s with cursor %s", name, mut.Time) + } + postProcessedBatch, err := a.processMutsFromCursor(ctx, batchCursor) if err != nil { return err @@ -63,9 +68,13 @@ func (a *sourceAcceptor) Read(ctx *stopper.Context) (<-chan *types.BatchCursor, } select { case retCh <- cur: + for name, mut := range cur.Batch.Data.All() { + log.Infof("[source acceptor] pushed for table %s with cursor %s", name, mut.Time) + } case <-ctx.Stopping(): return nil } + cur.Ack.Set(true) case <-ctx.Stopping(): return nil } diff --git a/internal/sequencer/sequtil/copier.go b/internal/sequencer/sequtil/copier.go index 3f44ad59..e52cea6b 100644 --- a/internal/sequencer/sequtil/copier.go +++ b/internal/sequencer/sequtil/copier.go @@ -115,13 +115,12 @@ top: } // TODO(janexing): debugging only, remove later. - if cursor.Batch == nil || (cursor.Batch != nil && cursor.Batch.Data.Len() != 0) { - batchData := cursor.Batch.Data - for table, mut := range batchData.All() { - log.Infof("got cursor: table %s, time: %s", table.String(), mut.Time) - } + if cursor == nil || cursor.Batch == nil || cursor.Batch.Data.Len() == 0 { + log.Infof("[copier] got empty batch, with progress %s", cursor.Progress) } else { - log.Infof("got empty batch, with progress %s", cursor.Progress) + for table, mut := range cursor.Batch.Data.All() { + log.Infof("[copier] got cursor: table %s, time: %s", table.String(), mut.Time) + } } // There was an error reading from staging. @@ -173,6 +172,8 @@ top: return ret, nil } + cursor.Ack.Set(true) + // We can continue to accumulate data in subsequent passes. // This will start a timer on the second pass to ensure that // we flush within a reasonable time period. diff --git a/internal/sequencer/staging/staging.go b/internal/sequencer/staging/staging.go index 1d6f41c0..e181670b 100644 --- a/internal/sequencer/staging/staging.go +++ b/internal/sequencer/staging/staging.go @@ -101,9 +101,11 @@ func (s *staging) StageMutations( } ctx.Go(func(ctx *stopper.Context) error { + log.Infof("[stageMutations] start lisening to mutations") // TODO(janexing): debug only, remove later. cntMap := make(map[ident.Table]int) + cursorCnt := 0 for { select { case cursor := <-input: @@ -112,6 +114,7 @@ func (s *staging) StageMutations( // to continue reading the staged data. // TODO(janexing): make it test only. if cursor.Batch.Data.Len() == 0 { + log.Infof("[stageMutations]sentinel received, advancing bounds") if err := cpGroup.Advance(ctx, cpGroup.TableGroup().Name, cursor.Progress.Max()); err != nil { log.Errorf("failed to advanced the checkpoint for %s to %s", cpGroup.TableGroup().Name, cursor.Progress.Max()) } else { @@ -119,6 +122,11 @@ func (s *staging) StageMutations( } continue } + cursorCnt++ + for name, mut := range cursor.Batch.Data.All() { + log.Infof("[stageMutations]received %d cursor: %s, %s", cursorCnt, name, mut.Time) + } + for table, tableBatch := range cursor.Batch.Data.All() { // TODO(janexing): parallize across tables. stager, err := s.stagers.Get(ctx, table) @@ -156,8 +164,10 @@ func (s *staging) StageMutations( } log.Infof("staged table %s with %d elements", table.Table(), cntMap[table]) } + cursor.Ack.Set(true) case <-ctx.Stopping(): + log.Infof("[stageMutations]received signal to stop") return nil } } diff --git a/internal/source/objstore/conn_test.go b/internal/source/objstore/conn_test.go index d7e2026a..00bd9562 100644 --- a/internal/source/objstore/conn_test.go +++ b/internal/source/objstore/conn_test.go @@ -136,6 +136,7 @@ func TestApplyWithinRange(t *testing.T) { a.NotNil(res) a.NotNil(res.Batch) tracker.storeBatchTime(ctx, res.Batch.Time, nil) + res.Ack.Set(true) case <-ctx.Stopping(): break } diff --git a/internal/staging/stage/stage.go b/internal/staging/stage/stage.go index ec2ca026..f496a1a2 100644 --- a/internal/staging/stage/stage.go +++ b/internal/staging/stage/stage.go @@ -47,9 +47,9 @@ import ( // having been applied without having been previously staged. var stubSentinel = json.RawMessage(`{"__stub__":true}`) -// stagingTable returns the staging table name that will store mutations +// StagingTable returns the staging table name that will store mutations // for the given target table. -func stagingTable(stagingDB ident.Schema, target ident.Table) ident.Table { +func StagingTable(stagingDB ident.Schema, target ident.Table) ident.Table { target = target.Canonical() // Use lower-cased version of the table. mangled := ident.Join(target, ident.Raw, '_') return ident.NewTable(stagingDB, ident.New(mangled)) @@ -113,7 +113,7 @@ CREATE TABLE IF NOT EXISTS %[1]s ( // mutations to be applied to the given target table. func (f *factory) newStage(target ident.Table) (*stage, error) { ctx := f.stop - table := stagingTable(f.stagingDB, target) + table := StagingTable(f.stagingDB, target) keyIdx := ident.New(table.Table().Raw() + "_key_applied") // Try to create the staging table with a helper virtual column. We // never query for it, so it should have essentially no cost. diff --git a/internal/staging/stage/table_reader.go b/internal/staging/stage/table_reader.go index 233973e7..8f86bfc9 100644 --- a/internal/staging/stage/table_reader.go +++ b/internal/staging/stage/table_reader.go @@ -92,7 +92,7 @@ func newTableReader( bounds: bounds, db: db, fragmentSize: fragmentSize, - sqlQ: fmt.Sprintf(readTableTemplate, stagingTable(stagingDB, target)), + sqlQ: fmt.Sprintf(readTableTemplate, StagingTable(stagingDB, target)), out: out, table: target, @@ -129,7 +129,7 @@ func (r *tableReader) run(ctx *stopper.Context) { if !bounds.Empty() { cursor := r.nextCursor(ctx) - log.Infof("[%d]read %d muts for table %s", iterCnt, cursor.Batch.Count(), r.table) + log.Infof("[tableReader.run][%d]read %d muts for table %s", iterCnt, cursor.Batch.Count(), r.table) // Indicate discontinuity. cursor.Jump = didJump diff --git a/internal/target/schemawatch/watcher.go b/internal/target/schemawatch/watcher.go index bef61956..b32bd78d 100644 --- a/internal/target/schemawatch/watcher.go +++ b/internal/target/schemawatch/watcher.go @@ -211,6 +211,7 @@ func (w *watcher) getTables(ctx context.Context, tx *types.TargetPool) (*types.S // Normalize the database name to however it's defined in // the target. var dbNameRaw string + // fmt.Printf("checking database name: %s\n", parts[0]) if err := tx.QueryRowContext(ctx, databaseTemplateCrdb, parts[0].Raw()).Scan(&dbNameRaw); err != nil { if errors.Is(err, sql.ErrNoRows) { return errors.Errorf("unknown schema: %s", w.schema) diff --git a/internal/types/batch_reader.go b/internal/types/batch_reader.go index 07f33e6b..f0bd63ba 100644 --- a/internal/types/batch_reader.go +++ b/internal/types/batch_reader.go @@ -59,6 +59,8 @@ type BatchCursor struct { // staging tables. Progress hlc.Range + Ack notify.Var[bool] + // TODO(janexing): a notify.Var[error] // If not nil, Outcome.Set() should be called Outcome *notify.Var[error]