Commit
passed first part of user script
ZhouXing19 committed Nov 21, 2024
1 parent 2387598 commit 4805a78
Showing 11 changed files with 175 additions and 104 deletions.
50 changes: 32 additions & 18 deletions internal/sequencer/besteffort/best_effort.go
@@ -23,12 +23,12 @@ import (

"github.com/cockroachdb/field-eng-powertools/notify"
"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachdb/field-eng-powertools/stopvar"
"github.com/cockroachdb/replicator/internal/sequencer"
"github.com/cockroachdb/replicator/internal/sequencer/scheduler"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/hlc"
"github.com/cockroachdb/replicator/internal/util/ident"
"github.com/cockroachdb/replicator/internal/util/stopvar"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
@@ -148,26 +148,33 @@ func (s *bestEffort) Start(
ctx.Go(func(ctx *stopper.Context) error {
_, err := stopvar.DoWhenChanged(ctx, schemaData, watcher.GetNotify(),
func(ctx *stopper.Context, _, schemaData *types.SchemaData) error {
cfg, err := s.startGeneration(ctx, opts, schemaData, stats)
if cfg != nil {
if err := cfg.shutdown(); err != nil {
log.WithError(err).Warn("error while shutting down previous BestEffort")
} else {
log.Infof("closed previous BestEffort")
}
}
newCfg, err := s.startGeneration(ctx, opts, schemaData, stats)
if err != nil {
log.WithError(err).Warn("could not create new BestEffort sequencers")
return nil
}
oldCfg, _ := ret.config.Swap(cfg)
_, _ = ret.config.Swap(newCfg)
// Notify test code.
if s.schemaChanged != nil {
s.schemaChanged.Notify()
}
log.Debug("reconfigured BestEffort due to schema change")
if err := oldCfg.shutdown(); err != nil {
log.WithError(err).Warn("error while shutting down previous BestEffort")
}
return nil
})
return err
})

return ret, stats, nil
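// Run the router's fan-out loop for the lifetime of this sequencer.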
ctx.Go(func(ctx *stopper.Context) error {
return ret.Fanout(ctx)
})

return nil, stats, nil
}

// startGeneration creates the delegate sequences and returns a routing
@@ -186,7 +193,7 @@ func (s *bestEffort) startGeneration(
stats *notify.Var[sequencer.Stat],
) (*routerConfig, error) {
// Create a nested context.
ctx = stopper.WithContext(ctx)
childCtx := stopper.WithContext(ctx)

if opts.BatchReader == nil {
return nil, errors.New("batch reader is required for best effort mode")
@@ -197,8 +204,12 @@
routes: make(map[*types.SchemaComponent]types.BatchReader),
schemaData: schemaData,
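// shutdown stops this generation's child context and waits for its
// tasks to drain before returning.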
shutdown: func() error {
ctx.Stop(s.cfg.TaskGracePeriod)
return ctx.Wait()
childCtx.Stop(s.cfg.TaskGracePeriod)
if err := childCtx.Wait(); err != nil {
return err
}
log.Infof("best effort shut down finished")
return nil
},
}

@@ -210,9 +221,14 @@
// Initialize the batch reader for the specific component.
subOpts.Group.Tables = comp.Order
subOpts.MaxDeferred = s.cfg.TimestampLimit
cfg.routes[comp] = &routePipe{batchChan: make(chan *types.BatchCursor)}
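// Give each schema component its own pipe; the nested sequencer reads
// batches from it via the BatchReader option below.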
cfg.routes[comp] = &routePipe{
comp: comp,
outputChan: make(chan *types.BatchCursor),
}
subOpts.BatchReader = cfg.routes[comp]

subAcc, subStats, err := s.delegate.Start(ctx, subOpts)
// s.delegate is the staging sequencer.
_, subStats, err := s.delegate.Start(childCtx, subOpts)
if err != nil {
log.WithError(err).Warnf(
"BestEffort.Start: could not start nested Sequencer for %s", comp.Order)
@@ -225,20 +241,19 @@
// TODO(janexing): no need to consider it at the first pass.
if len(comp.Order) == 1 {
log.Tracef("enabling direct path for %s", comp.Order[0])
subAcc = &directAcceptor{
_ = &directAcceptor{
BestEffort: s.BestEffort,
apply: subOpts.Delegate,
fallback: subAcc,
}
}

// Route incoming mutations to the component's sequencer.
// cfg.routes[comp] = subAcc

// Start a helper to aggregate the progress values together.
ctx.Go(func(ctx *stopper.Context) error {
childCtx.Go(func(inCtx *stopper.Context) error {
// Ignoring error since innermost callback returns nil.
_, _ = stopvar.DoWhenChanged(ctx, nil, subStats, func(ctx *stopper.Context, _, subStat sequencer.Stat) error {
_, _ = stopvar.DoWhenChanged(inCtx, nil, subStats, func(ctx *stopper.Context, _, subStat sequencer.Stat) error {
_, _, err := stats.Update(func(old sequencer.Stat) (sequencer.Stat, error) {
next := old.Copy()
subStat.Progress().CopyInto(next.Progress())
@@ -254,6 +269,5 @@
return nil
})
}

return cfg, nil
}
130 changes: 67 additions & 63 deletions internal/sequencer/besteffort/router.go
@@ -17,13 +17,12 @@
package besteffort

import (
"context"

"github.com/cockroachdb/field-eng-powertools/notify"
"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/hlc"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

// A router sends mutations to the sequencer associated with a target
@@ -45,82 +44,87 @@ type routerConfig struct {
}

type routePipe struct {
batchChan chan *types.BatchCursor
outputChan chan *types.BatchCursor
comp *types.SchemaComponent
}

func (p *routePipe) Read(ctx *stopper.Context) (<-chan *types.BatchCursor, error) {
return p.batchChan, nil
return p.outputChan, nil
}

// inputChan -> router --> component_1/output_chan_1
// --> component_2/output_chan_2
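// Each routePipe is the BatchReader handed to one component's nested
// sequencer; its outputChan carries the cursors produced by the router's
// Fanout loop.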

var _ types.MultiAcceptor = (*router)(nil)

// When any route's Read() encounters an error, it should be able to stop its
// sibling routes. It can do so by stopping the parent context, which in turn
// stops the context used by every child route's Read().

// AcceptMultiBatch splits the batch based on destination tables.
func (r *router) AcceptMultiBatch(
ctx context.Context, batch *types.MultiBatch, opts *types.AcceptOptions,
) error {
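// Fanout pumps the router's input channel until it closes or the context
// stops: each incoming cursor's batch is split by destination schema
// component and forwarded to that component's pipe.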
func (r *router) Fanout(ctx *stopper.Context) error {
cfg, _ := r.config.Get()
componentBatches := make(map[*types.SchemaComponent]*types.MultiBatch, len(cfg.routes))

// Aggregate sub-batches by destination table group.
for table, mut := range batch.Mutations() {
comp, ok := cfg.schemaData.TableComponents.Get(table)
if !ok {
return errors.Errorf("unknown table %s", table)
}

acc, ok := componentBatches[comp]
if !ok {
acc = &types.MultiBatch{}
componentBatches[comp] = acc
}
if err := acc.Accumulate(table, mut); err != nil {
return err
}
inputChan, err := cfg.intputChan.Read(ctx)
if err != nil {
return err
}

for comp, acc := range componentBatches {
destination, ok := cfg.routes[comp]
if !ok {
return errors.Errorf("encountered unmapped schema component %s", comp)
}
if err := destination.AcceptMultiBatch(ctx, acc, opts); err != nil {
return err
cursorCnt := 0
for {
componentBatches := make(map[*types.SchemaComponent]*types.TemporalBatch, len(cfg.routes))
select {
// Read from the source input.
case cursor, ok := <-inputChan:
if !ok {
return nil
}
cursorCnt++
if cursor.Batch == nil {
continue
}
for name, mut := range cursor.Batch.Data.All() {
log.Infof("[fanOut] received %d cursor: %s, %s", cursorCnt, name, mut.Time)
}
for table, mut := range cursor.Batch.Mutations() {
comp, ok := cfg.schemaData.TableComponents.Get(table)
if !ok {
return errors.Errorf("unknown table %s", table)
}
acc, ok := componentBatches[comp]
if !ok {
acc = &types.TemporalBatch{Time: mut.Time}
componentBatches[comp] = acc
}
if err := acc.Accumulate(table, mut); err != nil {
log.Errorf(err.Error())
return err
}
}

// Fan out the per-component batches to their destinations.
for comp, tempBatch := range componentBatches {
destination, ok := cfg.routes[comp]
if !ok {
err = errors.Errorf("encountered unmapped schema component %s", comp)
log.Error(err.Error())
return err
}
compPipe := destination.(*routePipe)

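// Wrap the per-component batch in a cursor whose progress range runs
// from hlc.Zero() through the batch's timestamp, inclusive.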
cur := &types.BatchCursor{
Batch: tempBatch,
Progress: hlc.RangeIncluding(hlc.Zero(), tempBatch.Time),
}

select {
// To staging.
case compPipe.outputChan <- cur:
for name, mut := range cur.Batch.Data.All() {
log.Infof("[fanOut]pushed %d cursor: %s, %s", cursorCnt, name, mut.Time)
}
case <-ctx.Stopping():
}
}

case <-ctx.Stopping():
return nil
}
}

return nil
}

// AcceptTableBatch passes through to the table's acceptor.
func (r *router) AcceptTableBatch(
ctx context.Context, batch *types.TableBatch, opts *types.AcceptOptions,
) error {
cfg, _ := r.config.Get()
comp, ok := cfg.schemaData.TableComponents.Get(batch.Table)
if !ok {
return errors.Errorf("unknown table %s", batch.Table)
}
dest, ok := cfg.routes[comp]
if !ok {
return errors.Errorf("unknown component %s", comp)
}
return dest.AcceptTableBatch(ctx, batch, opts)
}

// AcceptTemporalBatch delegates to AcceptMultiBatch.
func (r *router) AcceptTemporalBatch(
ctx context.Context, batch *types.TemporalBatch, opts *types.AcceptOptions,
) error {
multi := &types.MultiBatch{
ByTime: map[hlc.Time]*types.TemporalBatch{batch.Time: batch},
Data: []*types.TemporalBatch{batch},
}
return r.AcceptMultiBatch(ctx, multi, opts)
}
53 changes: 41 additions & 12 deletions internal/sequencer/script/script_test.go
@@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/replicator/internal/sinktest"
"github.com/cockroachdb/replicator/internal/sinktest/all"
"github.com/cockroachdb/replicator/internal/sinktest/base"
"github.com/cockroachdb/replicator/internal/staging/stage"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/hlc"
"github.com/cockroachdb/replicator/internal/util/ident"
@@ -251,6 +252,20 @@ func TestUserScriptSequencer(t *testing.T) {
}
}

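// requireStagingTblRowCnt returns an error if the staging table does not
// contain exactly expectedCnt rows; returning an error (instead of failing
// the test) lets callers poll it from a retry loop.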
func requireStagingTblRowCnt(
ctx *stopper.Context, stagingTbl ident.Table, expectedCnt int, pool *types.StagingPool,
) error {
var res int
if err := pool.QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %s", stagingTbl.Table())).Scan(&res); err != nil {
return errors.Wrapf(err, "failed to query rows for staging table %s", stagingTbl.Table())
}
if res != expectedCnt {
return fmt.Errorf("expected %d rows, got %d", expectedCnt, res)
}
log.Infof("finished verify rows for staging table %s", stagingTbl.Table())
return nil
}

func testUserScriptSequencer(t *testing.T, baseMode switcher.Mode) {
r := require.New(t)

@@ -367,6 +382,14 @@ api.configureTable("t_2", {
})
r.NoError(err)

progress, progressMade := stats.Get()

retryAttempt, err := retry.NewRetry(retry.Settings{
InitialBackoff: 1 * time.Second,
Multiplier: 1,
MaxRetries: 300,
})

const numEmits = 5
endTime := hlc.New(numEmits+1, 0)
for i := 0; i < numEmits; i++ {
@@ -380,17 +403,28 @@ api.configureTable("t_2", {
},
},
))
t.Logf("pushed for k %d", i)
// t.Logf("pushed for k %d", i)
}

fakeSrc.PushTemporalBatch(ctx, &types.TemporalBatch{
Time: endTime,
})
r.NoError(retryAttempt.Do(ctx, func() error {
for _, tgt := range tgts {
res, err := base.GetRowCount(ctx, fixture.StagingPool, stage.StagingTable(fixture.StagingDB.Schema(), tgt))
if err != nil {
return err
}
if res != numEmits {
return fmt.Errorf("expected %d rows for table %s, got %d", numEmits, tgt, res)
}
}
return nil
}, func(err error) {
t.Logf("retrying checking staging tbl count: %s", err.Error())
}))

t.Logf("pushed for sentinel")
require.NoError(t, checkpointGroup.Advance(ctx, checkpointGroup.TableGroup().Name, endTime))
t.Logf("advanced bounds to %s", endTime)

// Wait for (async) replication.
progress, progressMade := stats.Get()
for {
targetProgress := sequencer.CommonProgress(progress)
if hlc.Compare(targetProgress.MaxInclusive(), endTime) >= 0 {
Expand All @@ -406,12 +440,6 @@ api.configureTable("t_2", {
}
}

retryAttempt, err := retry.NewRetry(retry.Settings{
InitialBackoff: 1 * time.Second,
Multiplier: 1,
MaxRetries: 300,
})

r.NoError(err)

t.Logf("Verifying that the script transformed the values")
@@ -454,6 +482,7 @@ api.configureTable("t_2", {
},
},
))
log.Infof("[fakeSrc] pushed delete for %d", i+1)
}

fakeSrc.PushTemporalBatch(ctx, &types.TemporalBatch{