Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

source: use pull model for all frontends #1065

Draft
wants to merge 26 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d1f24b5
source: use pull model for mylogical and pglogical
ZhouXing19 Nov 6, 2024
d2e6bec
TestMYLogical/consistent passed
ZhouXing19 Nov 6, 2024
ceb8e5b
passed for both pg and mysql integration tests
ZhouXing19 Nov 7, 2024
81285d6
return correct acceptor for script/wrapper.Start()
ZhouXing19 Nov 7, 2024
b781637
draft for script, passed for integration test (mylogical/pglogical)
ZhouXing19 Nov 8, 2024
2043edd
removed mu.acceptor
ZhouXing19 Nov 8, 2024
064a782
draft for webhook
ZhouXing19 Nov 8, 2024
95ef9e1
fixed for TestCommand with webhook, but slow
ZhouXing19 Nov 11, 2024
10b48f3
finished fixing for TestHandler/immediate-feed/webhookEndpoints
ZhouXing19 Nov 11, 2024
2a79057
fixed for TestMergeInt
ZhouXing19 Nov 12, 2024
27d2702
fixed TestHandler/immediate
ZhouXing19 Nov 12, 2024
d1240d4
fixed TestHandler/feed
ZhouXing19 Nov 12, 2024
521e6ae
fixed TestHandler
ZhouXing19 Nov 13, 2024
7502537
TestKafka passed
ZhouXing19 Nov 13, 2024
37fe3bf
fixed TestApplyWithinRange
ZhouXing19 Nov 14, 2024
95c7bb1
fixed TestLocalProcess
ZhouXing19 Nov 14, 2024
648828a
fixed TestSmoke
ZhouXing19 Nov 15, 2024
2c4fd4f
draft for TestUserScriptSequencer/ModeConsistent
ZhouXing19 Nov 16, 2024
c31e535
[stagingErrChans] working for TestUserScriptSequencer/ModeConsistent
ZhouXing19 Nov 18, 2024
2387598
convo with bob about staging and besteffort
ZhouXing19 Nov 19, 2024
4805a78
passed first part of user script
ZhouXing19 Nov 21, 2024
9848464
worked for besteffort and consistent, broke immediate
ZhouXing19 Nov 22, 2024
38d5e44
TestUserScriptSequencer fully passed
ZhouXing19 Nov 23, 2024
013905a
passed TestHandler/feed/deferred/webhookEndpoints with outcome check
ZhouXing19 Nov 23, 2024
cd92bfc
fixed TestHandler and TestKafka
ZhouXing19 Nov 25, 2024
73c414f
add some noisy logging
ZhouXing19 Dec 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/cmd/workload/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func TestDemoCommand(t *testing.T) {

// This is a black-box test to ensure the run command fires up.
func TestRunCommand(t *testing.T) {
log.SetLevel(log.TraceLevel)
const testTime = 5 * time.Second
t.Parallel()
r := require.New(t)
Expand Down
67 changes: 42 additions & 25 deletions internal/conveyor/conveyor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,19 @@ type Conveyors struct {
}
}

// Get returns a conveyor for a specific schema.
func (c *Conveyors) Get(schema ident.Schema) (*Conveyor, error) {
// CheckpointBatchReader is a types.BatchReader that can additionally be
// handed the Conveyor it is being used with.
type CheckpointBatchReader interface {
types.BatchReader
// MaybeSetConveyor only applies to structs which are schema-bound, such
// as kafka.Conn. For structs that are not schema-bound, such as
// cdc.Handler, this func should be a no-op.
MaybeSetConveyor(conveyor *Conveyor)
}

func (c *Conveyors) NewGet(
schema ident.Schema,
batchReader CheckpointBatchReader,
seqStartOptFuncs ...sequencer.StartOptionsFunc,
) (*Conveyor, error) {
c.mu.RLock()
found, ok := c.mu.targets.Get(schema)
c.mu.RUnlock()
Expand Down Expand Up @@ -120,25 +131,35 @@ func (c *Conveyors) Get(schema ident.Schema) (*Conveyor, error) {
return nil, err
}

ret.acceptor, ret.stat, err = seq.Start(
c.stopper,
&sequencer.StartOptions{
Bounds: &ret.resolvingRange,
Delegate: types.OrderedAcceptorFrom(c.tableAcceptor, c.watchers),
Group: tableGroup,
})
if err != nil {
return nil, err
}

// Add top-of-funnel reporting.
orderedAcceptor := types.OrderedAcceptorFrom(c.tableAcceptor, c.watchers)
labels := []string{c.kind, schema.Raw()}
ret.acceptor = types.CountingAcceptor(ret.acceptor,
dekegateAcceptor := types.CountingAcceptor(orderedAcceptor,
mutationsErrorCount.WithLabelValues(labels...),
mutationsReceivedCount.WithLabelValues(labels...),
mutationsSuccessCount.WithLabelValues(labels...),
)

seqStartOpts := &sequencer.StartOptions{
BatchReader: batchReader,
Bounds: &ret.resolvingRange,
Delegate: dekegateAcceptor,
Group: tableGroup,
}

for _, act := range seqStartOptFuncs {
act(seqStartOpts)
}

batchReader.MaybeSetConveyor(ret)

_, ret.stat, err = seq.Start(
c.stopper,
seqStartOpts,
)
if err != nil {
return nil, err
}

// Advance the stored resolved timestamps.
ret.updateResolved(c.stopper)

Expand All @@ -149,6 +170,7 @@ func (c *Conveyors) Get(schema ident.Schema) (*Conveyor, error) {
ret.metrics(c.stopper)

c.mu.targets.Put(schema, ret)

return ret, nil
}

Expand All @@ -160,13 +182,16 @@ func (c *Conveyors) WithKind(kind string) *Conveyors {
}

// Bootstrap existing schemas for recovery cases.
func (c *Conveyors) Bootstrap() error {
// TODO(janexing): Is this correct?
func (c *Conveyors) Bootstrap(
batchReader CheckpointBatchReader, seqStartOptFuncs ...sequencer.StartOptionsFunc,
) error {
schemas, err := c.checkpoints.ScanForTargetSchemas(c.stopper)
if err != nil {
return err
}
for _, sch := range schemas {
_, err := c.Get(sch)
_, err := c.NewGet(sch, batchReader, seqStartOptFuncs...)
if err != nil {
return err
}
Expand All @@ -193,7 +218,6 @@ func (c *Conveyors) clone() *Conveyors {
// It provides an abstraction over various delivery strategies and it
// manages checkpoints across multiple partitions for a table group.
type Conveyor struct {
acceptor types.MultiAcceptor // Possibly-async writes to the target.
checkpoint *checkpoint.Group // Persistence of checkpoint (fka. resolved) timestamps
factory *Conveyors // Factory that created this conveyor.
mode notify.Var[switcher.Mode] // Switchable strategies.
Expand All @@ -203,13 +227,6 @@ type Conveyor struct {
watcher types.Watcher // Schema info.
}

// AcceptMultiBatch transmits the batch by delegating to the conveyor's
// acceptor, returning whatever error that acceptor reports. The options
// may be nil.
func (c *Conveyor) AcceptMultiBatch(
ctx context.Context, batch *types.MultiBatch, options *types.AcceptOptions,
) error {
return c.acceptor.AcceptMultiBatch(ctx, batch, options)
}

// Advance the checkpoint for all the named partitions.
func (c *Conveyor) Advance(ctx context.Context, partition ident.Ident, ts hlc.Time) error {
return c.checkpoint.Advance(ctx, partition, ts)
Expand Down
2 changes: 2 additions & 0 deletions internal/script/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/replicator/internal/util/pjson"
"github.com/dop251/goja"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

// A JS function which is provided with an array of applyOp.
Expand Down Expand Up @@ -74,6 +75,7 @@ func newApplier(parent *UserScript, table ident.Table, apply applyJS) *applier {
func (a *applier) AcceptTableBatch(
ctx context.Context, batch *types.TableBatch, opts *types.AcceptOptions,
) error {
log.Debug("entered applier.AcceptTableBatch, going to run apply in script")
if opts == nil || opts.TargetQuerier == nil {
return errors.New("UserScript apply function cannot be called without an explicit transaction")
}
Expand Down
3 changes: 3 additions & 0 deletions internal/script/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/dop251/goja"
"github.com/google/uuid"
"github.com/google/wire"
log "github.com/sirupsen/logrus"
)

// Set is used by Wire.
Expand Down Expand Up @@ -119,6 +120,8 @@ func ProvideLoader(
return nil, err
}

log.Infof("script loader set")

return l, nil
}

Expand Down
67 changes: 48 additions & 19 deletions internal/sequencer/besteffort/best_effort.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (

"github.com/cockroachdb/field-eng-powertools/notify"
"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachdb/field-eng-powertools/stopvar"
"github.com/cockroachdb/replicator/internal/sequencer"
"github.com/cockroachdb/replicator/internal/sequencer/scheduler"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/hlc"
"github.com/cockroachdb/replicator/internal/util/ident"
"github.com/cockroachdb/replicator/internal/util/stopvar"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -147,46 +148,68 @@ func (s *bestEffort) Start(
ctx.Go(func(ctx *stopper.Context) error {
_, err := stopvar.DoWhenChanged(ctx, schemaData, watcher.GetNotify(),
func(ctx *stopper.Context, _, schemaData *types.SchemaData) error {
cfg, err := s.startGeneration(ctx, opts, schemaData, stats)
if cfg != nil {
if err := cfg.shutdown(); err != nil {
log.WithError(err).Warn("error while shutting down previous BestEffort")
} else {
log.Infof("closed previous BestEffort")
}
}
newCfg, err := s.startGeneration(ctx, opts, schemaData, stats)
if err != nil {
log.WithError(err).Warn("could not create new BestEffort sequencers")
return nil
}
oldCfg, _ := ret.config.Swap(cfg)
_, _ = ret.config.Swap(newCfg)
// Notify test code.
if s.schemaChanged != nil {
s.schemaChanged.Notify()
}
log.Debug("reconfigured BestEffort due to schema change")
if err := oldCfg.shutdown(); err != nil {
log.WithError(err).Warn("error while shutting down previous BestEffort")
}
return nil
})
return err
})

return ret, stats, nil
ctx.Go(func(ctx *stopper.Context) error {
return ret.Fanout(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of doing a fanout here? Does this allow us to perform batching across different tables? How is this fanout affected if the tables have FK constraints?

})

return nil, stats, nil
}

// startGeneration creates the delegate sequences and returns a routing
// configuration to map incoming requests. The delegates will execute
// with a nested stopper.
//
// Input Pipe 1 --->
// \
// --> [ Component Core ]
// /
// Input Pipe 2 --->
func (s *bestEffort) startGeneration(
ctx *stopper.Context,
opts *sequencer.StartOptions,
schemaData *types.SchemaData,
stats *notify.Var[sequencer.Stat],
) (*routerConfig, error) {
// Create a nested context.
ctx = stopper.WithContext(ctx)
childCtx := stopper.WithContext(ctx)

if opts.BatchReader == nil {
return nil, errors.New("batch reader is required for best effort mode")
}

cfg := &routerConfig{
routes: make(map[*types.SchemaComponent]types.MultiAcceptor),
intputChan: opts.BatchReader,
routes: make(map[*types.SchemaComponent]types.BatchReader),
schemaData: schemaData,
shutdown: func() error {
ctx.Stop(s.cfg.TaskGracePeriod)
return ctx.Wait()
childCtx.Stop(s.cfg.TaskGracePeriod)
if err := childCtx.Wait(); err != nil {
return err
}
log.Infof("best effort shut down finished")
return nil
},
}

Expand All @@ -195,10 +218,17 @@ func (s *bestEffort) startGeneration(
// relationships can be swept in a coordinated fashion.
for _, comp := range schemaData.Components {
subOpts := opts.Copy()
// Initialize the batch reader for the specific component.
subOpts.Group.Tables = comp.Order
subOpts.MaxDeferred = s.cfg.TimestampLimit
cfg.routes[comp] = &routePipe{
comp: comp,
outputChan: make(chan *types.BatchCursor),
}
subOpts.BatchReader = cfg.routes[comp]

subAcc, subStats, err := s.delegate.Start(ctx, subOpts)
// s.delegate is staging
_, subStats, err := s.delegate.Start(childCtx, subOpts)
if err != nil {
log.WithError(err).Warnf(
"BestEffort.Start: could not start nested Sequencer for %s", comp.Order)
Expand All @@ -208,22 +238,22 @@ func (s *bestEffort) startGeneration(
// This is a special case for single-table groups, where
// we'll try to write directly to the target table, rather
// than wait for an entire stage-apply cycle.
// TODO(janexing): no need to consider it at the first pass.
if len(comp.Order) == 1 {
log.Tracef("enabling direct path for %s", comp.Order[0])
subAcc = &directAcceptor{
_ = &directAcceptor{
BestEffort: s.BestEffort,
apply: subOpts.Delegate,
fallback: subAcc,
}
}

// Route incoming mutations to the component's sequencer.
cfg.routes[comp] = subAcc
// cfg.routes[comp] = subAcc

// Start a helper to aggregate the progress values together.
ctx.Go(func(ctx *stopper.Context) error {
childCtx.Go(func(inCtx *stopper.Context) error {
// Ignoring error since innermost callback returns nil.
_, _ = stopvar.DoWhenChanged(ctx, nil, subStats, func(ctx *stopper.Context, _, subStat sequencer.Stat) error {
_, _ = stopvar.DoWhenChanged(inCtx, nil, subStats, func(ctx *stopper.Context, _, subStat sequencer.Stat) error {
_, _, err := stats.Update(func(old sequencer.Stat) (sequencer.Stat, error) {
next := old.Copy()
subStat.Progress().CopyInto(next.Progress())
Expand All @@ -239,6 +269,5 @@ func (s *bestEffort) startGeneration(
return nil
})
}

return cfg, nil
}
7 changes: 7 additions & 0 deletions internal/sequencer/besteffort/direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"time"

"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/lockset"
"github.com/cockroachdb/replicator/internal/util/metrics"
Expand All @@ -34,6 +35,12 @@ type directAcceptor struct {
*BestEffort
apply types.MultiAcceptor
fallback types.MultiAcceptor

batchChan chan *types.BatchCursor
}

// Read returns the acceptor's batchChan as a receive-only channel. It
// always returns a nil error and does not consult ctx.
//
// NOTE(review): nothing visible here initializes batchChan; if it is left
// nil, receivers on the returned channel will block forever. Confirm that
// it is set wherever a directAcceptor is constructed.
func (a *directAcceptor) Read(ctx *stopper.Context) (<-chan *types.BatchCursor, error) {
return a.batchChan, nil
}

var _ types.TableAcceptor = (*directAcceptor)(nil)
Expand Down
Loading
Loading