Skip to content

Commit

Permalink
Memo-based backup of the database version
Browse files Browse the repository at this point in the history
This allows replicator to start without the database.

Added a shell script to run precommit checks at qol/fmt.sh.
  • Loading branch information
MattWhelan committed Oct 3, 2024
1 parent afae39d commit d23e8a8
Show file tree
Hide file tree
Showing 24 changed files with 666 additions and 149 deletions.
4 changes: 4 additions & 0 deletions internal/cmd/preflight/preflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"time"

"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachdb/replicator/internal/sinktest"
"github.com/cockroachdb/replicator/internal/staging/memo"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/stdpool"
"github.com/pkg/errors"
Expand Down Expand Up @@ -77,6 +79,8 @@ func testTargetConnection(ctx *stopper.Context, connString string) error {
pool, err := stdpool.OpenTarget(
ctx,
connString,
stdpool.ProvideBackup(&memo.Memory{}, nil),
sinktest.NewBreakers(),
stdpool.WithConnectionLifetime(5*time.Minute, time.Minute, 15*time.Second),
stdpool.WithTransactionTimeout(time.Minute),
)
Expand Down
17 changes: 11 additions & 6 deletions internal/cmd/workload/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type clientConfig struct {
requestTimeout time.Duration
resolvedInterval time.Duration
retryMin, retryMax time.Duration
suffix string

childTable ident.Table // Derived from targetSchema
parentTable ident.Table // Derived from targetSchema
Expand All @@ -85,6 +86,8 @@ func (c *clientConfig) Bind(flags *pflag.FlagSet) {
"the maximum delay between HTTP retry attempts")
flags.DurationVar(&c.retryMin, "retryMin", defaultRetryMin,
"the minimum delay between HTTP retry attempts")
flags.StringVar(&c.suffix, "suffix", "",
"parent/child table suffix")
flags.StringVar(&c.token, "token", "",
"JWT bearer token if security is enabled")
flags.StringVar(&c.url, "url", defaultURL,
Expand Down Expand Up @@ -133,18 +136,20 @@ func (c *clientConfig) Preflight() error {
}
}
if c.childTable.Empty() {
c.childTable = ident.NewTable(c.targetSchema, ident.New("child"))
c.childTable = ident.NewTable(c.targetSchema, ident.New("child"+c.suffix))
}
if c.parentTable.Empty() {
c.parentTable = ident.NewTable(c.targetSchema, ident.New("parent"))
c.parentTable = ident.NewTable(c.targetSchema, ident.New("parent"+c.suffix))
}
return nil
}

func (c *clientConfig) createTables(ctx *stopper.Context, targetPool *types.TargetPool) error {
// We need a 64-bit type.
bigType := "BIGINT"
uniq := fmt.Sprintf("_%d_%d", os.Getpid(), rand.Int32N(10000))
if c.suffix == "" {
c.suffix = fmt.Sprintf("_%d_%d", os.Getpid(), rand.Int32N(10000))
}

// Create the tables within the "current" schema specified on the
// command-line. We'll use uniquely-named tables to ensure that
Expand Down Expand Up @@ -179,8 +184,8 @@ func (c *clientConfig) createTables(ctx *stopper.Context, targetPool *types.Targ
if err != nil {
return err
}
c.parentTable = ident.NewTable(c.targetSchema, ident.New("parent"+uniq))
c.childTable = ident.NewTable(c.targetSchema, ident.New("child"+uniq))
c.parentTable = ident.NewTable(c.targetSchema, ident.New("parent"+c.suffix))
c.childTable = ident.NewTable(c.targetSchema, ident.New("child"+c.suffix))

if _, err := targetPool.ExecContext(ctx, fmt.Sprintf(
`CREATE TABLE %s(parent %[2]s PRIMARY KEY, val %[2]s DEFAULT 0 NOT NULL)`,
Expand All @@ -194,7 +199,7 @@ child %[4]s PRIMARY KEY,
parent %[4]s NOT NULL,
val %[4]s DEFAULT 0 NOT NULL,
CONSTRAINT parent_fk%[2]s FOREIGN KEY(parent) REFERENCES %[3]s(parent)
)`, c.childTable, uniq, c.parentTable, bigType)); err != nil {
)`, c.childTable, c.suffix, c.parentTable, bigType)); err != nil {
return errors.WithStack(err)
}

Expand Down
6 changes: 6 additions & 0 deletions internal/cmd/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package workload

import (
"fmt"

"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachdb/replicator/internal/source/cdc/server"
"github.com/cockroachdb/replicator/internal/util/workload"
Expand Down Expand Up @@ -71,6 +73,10 @@ func pcDemo() *cobra.Command {
return err
}

fmt.Printf("Changefeed target URL: %s\n", cfg.url)
fmt.Printf("Parent table: %s\n", cfg.parentTable)
fmt.Printf("Child table: %s\n", cfg.childTable)

if !serverCfg.HTTP.DisableAuth {
if err := cfg.generateJWT(ctx, svr); err != nil {
return err
Expand Down
162 changes: 162 additions & 0 deletions internal/cmd/workload/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (
"github.com/cockroachdb/field-eng-powertools/notify"
"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachdb/replicator/internal/sinkprod"
"github.com/cockroachdb/replicator/internal/sinktest"
"github.com/cockroachdb/replicator/internal/sinktest/all"
"github.com/cockroachdb/replicator/internal/sinktest/base"
"github.com/cockroachdb/replicator/internal/source/cdc/server"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/hlc"
"github.com/cockroachdb/replicator/internal/util/stdserver"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -119,6 +121,166 @@ func TestWorkload(t *testing.T) {
workload.CheckConsistent(ctx, t)
}

// initSystem assembles a complete in-process system for the white-box
// tests in this file. It creates a fresh fixture, configures a server
// that listens on 127.0.0.1 with port 0 and a self-signed certificate,
// creates a workload plus its table group, and builds a runner that is
// pointed at that server.
//
// The returned values are, in order: the workload runner, the table
// group produced by fixture.NewWorkload, the server, the workload
// itself, the fixture's stopper context, and the fixture's breakers.
//
// Any setup error fails the calling test immediately via require.
func initSystem(
	t *testing.T,
) (
	*runner,
	*types.TableGroup,
	*server.Server,
	*all.Workload,
	*stopper.Context,
	*sinktest.Breakers,
) {
	// Mark this function as a helper so that failures raised here are
	// attributed to the calling test's line in the testing output.
	t.Helper()
	r := require.New(t)

	fixture, err := all.NewFixture(t)
	r.NoError(err)
	ctx := fixture.Context

	// The server reuses the fixture's staging and target connection
	// strings, so it talks to the same databases as the fixture.
	serverCfg := &server.Config{
		HTTP: stdserver.Config{
			BindAddr:           "127.0.0.1:0",
			GenerateSelfSigned: true,
		},
		Staging: sinkprod.StagingConfig{
			CommonConfig: sinkprod.CommonConfig{
				Conn: fixture.StagingPool.ConnectionString,
			},
			CreateSchema: true,
			Schema:       fixture.StagingDB.Schema(),
		},
		Target: sinkprod.TargetConfig{
			CommonConfig: sinkprod.CommonConfig{
				Conn: fixture.TargetPool.ConnectionString,
			},
		},
	}

	workload, group, err := fixture.NewWorkload(ctx, &all.WorkloadConfig{})
	r.NoError(err)

	// Point the client at the tables that the fixture's workload
	// created, rather than letting the client derive its own names.
	cfg := &clientConfig{
		childTable:   workload.Child.Name(),
		parentTable:  workload.Parent.Name(),
		targetSchema: fixture.TargetSchema.Schema(),
	}

	svr, err := cfg.newServer(ctx, serverCfg)
	r.NoError(err)

	// The listener was bound to port 0, so the client URL can only be
	// computed once the server is up.
	r.NoError(cfg.initURL(svr.GetListener()))

	r.NoError(cfg.generateJWT(ctx, svr))

	// Create a runner, but inject the generator from above. This will
	// allow us to validate the behavior later.
	runner, err := cfg.newRunner(ctx, workload.GeneratorBase)
	r.NoError(err)

	return runner, group, svr, workload, ctx, fixture.Breakers
}

// This is a white-box test that creates a server and executes the
// workload against it for a few seconds, stops the workload, then
// starts it again, but this time with a failed connection to the
// target database. The target database connection then recovers
// after a period.
func TestColdStart(t *testing.T) {
t.Parallel()
const initialTime = 5 * time.Second
const targetDownTime = 5 * time.Second
const recoveryTime = 5 * time.Second

runner, group, svr, workload, ctx, _ := initSystem(t)

// Phase one: initial connection and run

// Create a nested stopper, so we can run the workload generator
// for a period of time.
runnerCtx := stopper.WithContext(ctx)
runnerCtx.Go(func(runnerCtx *stopper.Context) error {
return runner.Run(runnerCtx)
})

r := require.New(t)
// Wait for a bit.
select {
case <-time.After(initialTime):
log.Info("waiting for runner context to finish")
runnerCtx.Stop(time.Second)
r.NoError(runnerCtx.Wait())
case <-ctx.Stopping():
r.Fail("test context stopping")
}

var resolvedRange notify.Var[hlc.Range]
_, err := svr.Checkpoints.Start(ctx, group, &resolvedRange)
r.NoError(err)
for {
progress, changed := resolvedRange.Get()
if hlc.Compare(progress.Min(), runner.lastResolved) >= 0 {
break
}
log.Infof("waiting for resolved timestamp progress: %s vs %s", progress, runner.lastResolved)
select {
case <-changed:
case <-ctx.Done():
r.NoError(ctx.Err())
}
}
log.Infof("resolved timestamps have caught up; validating initial workload")

workload.CheckConsistent(ctx, t)

// Second phase: start the workload with the target down
runner, group, svr, workload, ctx, breakers := initSystem(t)
breakers.TargetConnectionFails.Store(true)

// Create a nested stopper, so we can run the workload generator
// for a period of time.
runnerCtx = stopper.WithContext(ctx)
runnerCtx.Go(func(runnerCtx *stopper.Context) error {
return runner.Run(runnerCtx)
})

// Wait for a bit, re-enable target connections
select {
case <-time.After(targetDownTime):
breakers.TargetConnectionFails.Store(false)
case <-ctx.Stopping():
r.Fail("test context stopping")
}

// Third phase: target recovers, and we proceed
select {
case <-time.After(recoveryTime):
log.Info("waiting for runner context to finish")
runnerCtx.Stop(time.Second)
r.NoError(runnerCtx.Wait())
case <-ctx.Stopping():
r.Fail("test context stopping")
}

var resolvedRangeFinal notify.Var[hlc.Range]
_, err = svr.Checkpoints.Start(ctx, group, &resolvedRangeFinal)
r.NoError(err)
for {
progress, changed := resolvedRangeFinal.Get()
if hlc.Compare(progress.Min(), runner.lastResolved) >= 0 {
break
}
log.Infof("waiting for resolved timestamp progress: %s vs %s", progress, runner.lastResolved)
select {
case <-changed:
case <-ctx.Done():
r.NoError(ctx.Err())
}
}
log.Infof("resolved timestamps have caught up; validating final workload")

workload.CheckConsistent(ctx, t)
}

// This is a black-box test to ensure the demo command fires up.
func TestDemoCommand(t *testing.T) {
const testTime = 5 * time.Second
Expand Down
4 changes: 4 additions & 0 deletions internal/sinkprod/sinkprod.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package sinkprod
import (
"time"

"github.com/cockroachdb/replicator/internal/sinktest"
"github.com/cockroachdb/replicator/internal/util/stdpool"
"github.com/google/wire"
)

Expand All @@ -29,7 +31,9 @@ var Set = wire.NewSet(
ProvideStagingDB,
ProvideStagingPool,
ProvideTargetPool,
stdpool.ProvideBackup,
ProvideStatementCache,
sinktest.NewBreakers,
)

const (
Expand Down
10 changes: 8 additions & 2 deletions internal/sinkprod/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachdb/replicator/internal/sinktest"
"github.com/cockroachdb/replicator/internal/staging/version"
"github.com/cockroachdb/replicator/internal/types"
"github.com/cockroachdb/replicator/internal/util/diag"
Expand Down Expand Up @@ -74,7 +75,12 @@ func (c *TargetConfig) Preflight() error {
// Adding the Replicator version checker here is a bit of a hack. If
// Wire supported eager dependencies, this would be an obvious use.
func ProvideTargetPool(
ctx *stopper.Context, check *version.Checker, config *TargetConfig, diags *diag.Diagnostics,
ctx *stopper.Context,
check *version.Checker,
config *TargetConfig,
diags *diag.Diagnostics,
backup *stdpool.Backup,
breakers *sinktest.Breakers,
) (*types.TargetPool, error) {
missing, err := check.Check(ctx)
if err != nil {
Expand All @@ -94,7 +100,7 @@ func ProvideTargetPool(
stdpool.WithTransactionTimeout(config.ApplyTimeout),
}

ret, err := stdpool.OpenTarget(ctx, config.Conn, options...)
ret, err := stdpool.OpenTarget(ctx, config.Conn, backup, breakers, options...)
if err != nil {
return nil, err
}
Expand Down
21 changes: 13 additions & 8 deletions internal/sinktest/all/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/sinktest/base/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ import (

// NewFixture constructs a self-contained test fixture.
//
// NOTE(review): the body has the shape of a Wire injector stub, where
// the call to wire.Build only declares the provider set and the panic
// keeps the stub compilable; presumably the working implementation is
// generated by Wire — confirm against this package's generated file.
//
// NOTE(review): two wire.Build lines appear here; presumably the one
// that also lists ProvideMemory supersedes the one with only TestSet —
// confirm before relying on either.
func NewFixture(t testing.TB) (*Fixture, error) {
panic(wire.Build(TestSet))
panic(wire.Build(TestSet, ProvideMemory))
}
Loading

0 comments on commit d23e8a8

Please sign in to comment.