Skip to content

Commit

Permalink
wire: Make stopper.Context globally available
Browse files Browse the repository at this point in the history
This change cleans up some longstanding lifecycle issues around graceful
draining. In the past, we relied on the ability to return a cancel function to
Wire to shut down background goroutines that might be started by some service
or another. This works, but it would be nice to standardize on the
stopper.Context utility type for running goroutines that can benefit from a
soft-exit condition.

The crux of this change is to make `*stopper.Context` available from all
injectors. The `context.Context` type remains injectable, but it becomes an
alias for the `*stopper.Context`. The top-level injectors now require a
`*stopper.Context`, which gives the callers the ability to trigger a soft
shutdown of background processes before tearing down the entire stack. This
should reduce the amount of log-spam during shutdown.

This change will also make it far more straightforward to implement metrics
that tick (e.g. #560) since a service or object that needs to produce a ticking
metric has the option to use the stack-global `*stopper.Context.Go()`. Increase
use of our `Go()` method also provides a future opportunity to improve
observabiity around background processes within a cdc-sink binary.
  • Loading branch information
bobvawter committed Oct 31, 2023
1 parent 7e1d5da commit b8a2f9f
Show file tree
Hide file tree
Showing 33 changed files with 230 additions and 197 deletions.
4 changes: 3 additions & 1 deletion internal/cmd/fslogical/fslogical.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package fslogical
import (
"github.com/cockroachdb/cdc-sink/internal/source/fslogical"
"github.com/cockroachdb/cdc-sink/internal/util/stdlogical"
"github.com/cockroachdb/cdc-sink/internal/util/stopper"
"github.com/spf13/cobra"
)

Expand All @@ -31,7 +32,8 @@ func Command() *cobra.Command {
Bind: cfg.Bind,
Short: "start a Google Cloud Firestore logical replication feed",
Start: func(cmd *cobra.Command) (any, func(), error) {
return fslogical.Start(cmd.Context(), cfg)
// main.go provides this stopper.
return fslogical.Start(stopper.From(cmd.Context()), cfg)
},
Use: "fslogical",
})
Expand Down
4 changes: 3 additions & 1 deletion internal/cmd/mylogical/mylogical.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package mylogical
import (
"github.com/cockroachdb/cdc-sink/internal/source/mylogical"
"github.com/cockroachdb/cdc-sink/internal/util/stdlogical"
"github.com/cockroachdb/cdc-sink/internal/util/stopper"
"github.com/spf13/cobra"
)

Expand All @@ -31,7 +32,8 @@ func Command() *cobra.Command {
Bind: cfg.Bind,
Short: "start a mySQL replication feed",
Start: func(cmd *cobra.Command) (any, func(), error) {
return mylogical.Start(cmd.Context(), cfg)
// main.go provides a stopper.
return mylogical.Start(stopper.From(cmd.Context()), cfg)
},
Use: "mylogical",
})
Expand Down
4 changes: 3 additions & 1 deletion internal/cmd/pglogical/pglogical.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package pglogical
import (
"github.com/cockroachdb/cdc-sink/internal/source/pglogical"
"github.com/cockroachdb/cdc-sink/internal/util/stdlogical"
"github.com/cockroachdb/cdc-sink/internal/util/stopper"
"github.com/spf13/cobra"
)

Expand All @@ -31,7 +32,8 @@ func Command() *cobra.Command {
Bind: cfg.Bind,
Short: "start a pg logical replication feed",
Start: func(cmd *cobra.Command) (any, func(), error) {
return pglogical.Start(cmd.Context(), cfg)
// main.go provides a stopper.
return pglogical.Start(stopper.From(cmd.Context()), cfg)
},
Use: "pglogical",
})
Expand Down
4 changes: 3 additions & 1 deletion internal/cmd/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package start
import (
"github.com/cockroachdb/cdc-sink/internal/source/server"
"github.com/cockroachdb/cdc-sink/internal/util/stdlogical"
"github.com/cockroachdb/cdc-sink/internal/util/stopper"
"github.com/spf13/cobra"
)

Expand All @@ -30,7 +31,8 @@ func Command() *cobra.Command {
Bind: cfg.Bind,
Short: "start the server",
Start: func(cmd *cobra.Command) (any, func(), error) {
return server.NewServer(cmd.Context(), &cfg)
// main.go gives us a stopper, just unwrap it.
return server.NewServer(stopper.From(cmd.Context()), &cfg)
},
Use: "start",
})
Expand Down
9 changes: 7 additions & 2 deletions internal/script/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,29 @@ import (
"github.com/cockroachdb/cdc-sink/internal/types"
"github.com/cockroachdb/cdc-sink/internal/util/applycfg"
"github.com/cockroachdb/cdc-sink/internal/util/diag"
"github.com/cockroachdb/cdc-sink/internal/util/stopper"
"github.com/google/wire"
)

// Evaluate the loaded script.
func Evaluate(
ctx context.Context,
ctx *stopper.Context,
loader *Loader,
configs *applycfg.Configs,
diags *diag.Diagnostics,
targetSchema TargetSchema,
watchers types.Watchers,
) (*UserScript, error) {
panic(wire.Build(ProvideUserScript))
panic(wire.Build(
ProvideUserScript,
wire.Bind(new(context.Context), new(*stopper.Context)),
))
}

func newScriptFromFixture(*all.Fixture, *Config, TargetSchema) (*UserScript, error) {
panic(wire.Build(
Set,
wire.Bind(new(context.Context), new(*stopper.Context)),
wire.FieldsOf(new(*all.Fixture), "Diagnostics", "Fixture", "Configs", "Watchers"),
wire.FieldsOf(new(*base.Fixture), "Context"),
))
Expand Down
8 changes: 4 additions & 4 deletions internal/script/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions internal/sinktest/all/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 16 additions & 4 deletions internal/sinktest/base/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cdc-sink/internal/util/retry"
"github.com/cockroachdb/cdc-sink/internal/util/stdpool"
"github.com/cockroachdb/cdc-sink/internal/util/stmtcache"
"github.com/cockroachdb/cdc-sink/internal/util/stopper"
"github.com/google/wire"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -95,14 +96,15 @@ var TestSet = wire.NewSet(
ProvideTargetStatements,
diag.New,

wire.Bind(new(context.Context), new(*stopper.Context)),
wire.Struct(new(Fixture), "*"),
)

// Fixture can be used for tests that "just need a database",
// without the other services provided by the target package. One can be
// constructed by calling NewFixture.
type Fixture struct {
Context context.Context // The context for the test.
Context *stopper.Context // The context for the test.
SourcePool *types.SourcePool // Access to user-data tables and changefeed creation.
SourceSchema sinktest.SourceSchema // A container for tables within SourcePool.
StagingPool *types.StagingPool // Access to __cdc_sink database.
Expand Down Expand Up @@ -138,9 +140,19 @@ var caseTimout = flag.Duration(

// ProvideContext returns an execution context that is associated with a
// singleton connection to a CockroachDB cluster.
func ProvideContext() (context.Context, func(), error) {
ctx, cancel := context.WithTimeout(context.Background(), *caseTimout)
return ctx, cancel, nil
func ProvideContext() (*stopper.Context, func()) {
ctx := stopper.WithContext(context.Background())
ctx.Go(func() error {
select {
case <-ctx.Stopping():
// Clean shutdown, do nothing.
case <-time.After(*caseTimout):
// Just cancel immediately.
ctx.Stop(0)
}
return nil
})
return ctx, func() { ctx.Stop(100 * time.Millisecond) }
}

// ProvideSourcePool connects to the source database. If the source is a
Expand Down
5 changes: 1 addition & 4 deletions internal/sinktest/base/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions internal/source/cdc/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package cdc

import (
"context"
"fmt"

"github.com/cockroachdb/cdc-sink/internal/source/logical"
"github.com/cockroachdb/cdc-sink/internal/types"
"github.com/cockroachdb/cdc-sink/internal/util/ident"
"github.com/cockroachdb/cdc-sink/internal/util/stopper"
"github.com/google/wire"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -55,17 +55,17 @@ func ProvideMetaTable(cfg *Config) MetaTable {

// ProvideResolvers is called by Wire.
func ProvideResolvers(
ctx context.Context,
ctx *stopper.Context,
cfg *Config,
leases types.Leases,
loops *logical.Factory,
metaTable MetaTable,
pool *types.StagingPool,
stagers types.Stagers,
watchers types.Watchers,
) (*Resolvers, func(), error) {
) (*Resolvers, error) {
if _, err := pool.Exec(ctx, fmt.Sprintf(schema, metaTable.Table())); err != nil {
return nil, nil, errors.WithStack(err)
return nil, errors.WithStack(err)
}

ret := &Resolvers{
Expand All @@ -75,20 +75,21 @@ func ProvideResolvers(
metaTable: metaTable.Table(),
pool: pool,
stagers: stagers,
stop: ctx,
watchers: watchers,
}
ret.mu.instances = &ident.SchemaMap[*logical.Loop]{}

// Resume from previous state.
schemas, err := ScanForTargetSchemas(ctx, pool, ret.metaTable)
if err != nil {
return nil, nil, err
return nil, err
}
for _, schema := range schemas {
if _, _, err := ret.get(ctx, schema); err != nil {
return nil, nil, errors.Wrapf(err, "could not bootstrap resolver for schema %s", schema)
return nil, errors.Wrapf(err, "could not bootstrap resolver for schema %s", schema)
}
}

return ret, ret.close, nil
return ret, nil
}
29 changes: 3 additions & 26 deletions internal/source/cdc/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,33 +595,15 @@ type Resolvers struct {
metaTable ident.Table
pool *types.StagingPool
stagers types.Stagers
stop *stopper.Context // Manage lifecycle of background processes.
watchers types.Watchers

mu struct {
sync.Mutex
cleanups []func()
instances *ident.SchemaMap[*logical.Loop]
}
}

// close will drain any running resolver loops.
func (r *Resolvers) close() {
r.mu.Lock()
defer r.mu.Unlock()

// Cancel each loop.
for _, cancel := range r.mu.cleanups {
cancel()
}
// Wait for shutdown.
_ = r.mu.instances.Range(func(_ ident.Schema, l *logical.Loop) error {
<-l.Stopped()
return nil
})
r.mu.cleanups = nil
r.mu.instances = nil
}

// get creates or returns the [logical.Loop] and the enclosed resolver.
func (r *Resolvers) get(
ctx context.Context, target ident.Schema,
Expand All @@ -643,7 +625,7 @@ func (r *Resolvers) get(
return nil, ret, nil
}

loop, cleanup, err := r.loops.Start(&logical.LoopConfig{
loop, err := r.loops.Start(r.stop, &logical.LoopConfig{
Dialect: ret,
LoopName: "changefeed-" + target.Raw(),
TargetSchema: target,
Expand All @@ -655,13 +637,8 @@ func (r *Resolvers) get(
r.mu.instances.Put(target, loop)

// Start a goroutine to retire old data.
stop := stopper.WithContext(context.Background())
ret.retireLoop(stop)
ret.retireLoop(r.stop)

r.mu.cleanups = append(r.mu.cleanups,
cleanup,
func() { stop.Stop(time.Second) },
)
return loop, ret, nil
}

Expand Down
4 changes: 4 additions & 0 deletions internal/source/cdc/test_fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package cdc

import (
"context"

"github.com/cockroachdb/cdc-sink/internal/script"
"github.com/cockroachdb/cdc-sink/internal/sinktest/all"
"github.com/cockroachdb/cdc-sink/internal/sinktest/base"
Expand All @@ -28,6 +30,7 @@ import (
"github.com/cockroachdb/cdc-sink/internal/staging/leases"
"github.com/cockroachdb/cdc-sink/internal/target"
"github.com/cockroachdb/cdc-sink/internal/util/diag"
"github.com/cockroachdb/cdc-sink/internal/util/stopper"
"github.com/google/wire"
)

Expand All @@ -50,6 +53,7 @@ func newTestFixture(*all.Fixture, *Config) (*testFixture, func(), error) {
target.Set,
trust.New, // Is valid to use as a provider.
wire.Struct(new(testFixture), "*"),
wire.Bind(new(context.Context), new(*stopper.Context)),
wire.Bind(new(logical.Config), new(*Config)),
))
}
3 changes: 1 addition & 2 deletions internal/source/cdc/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b8a2f9f

Please sign in to comment.