diff --git a/internal/cmd/votr/README.md b/internal/cmd/votr/README.md
new file mode 100644
index 000000000..74c3d57fa
--- /dev/null
+++ b/internal/cmd/votr/README.md
@@ -0,0 +1,113 @@
+# VOTR
+
+The VOTR workload demonstrates a hub-and-spoke replication topology
+that uses cdc-sink's merge features.
+
+The core workload models a ballot-casting system. There are three tables:
+* `candidates` is an infrequently-updated table that uses a version
+  column and last-one-wins behavior
+* `ballots` is an append-only table with a composite primary key of
+  `(candidate uuid, ballot uuid)`
+* `totals` maintains per-candidate running totals. It is intended to
+  show how a high-contention table can be logically replicated; this is
+  not an ideal pattern for CockroachDB.
+
+VOTR shows a hub-and-spoke replication model, but it can also operate
+in a two-datacenter (2-DC) mode. The `votr_0` database is used as the
+hub, and an arbitrary number of "spoke" databases, `votr_N`, run the
+workload. Each hub-spoke pair has two unidirectional changefeeds, each
+of which is processed by an independent `cdc-sink` instance. A total of
+six independent feeds (two per hub-spoke pair) operate in this
+demonstration.
+
+![votr-diagram.png](votr-diagram.png)
+
+The worker attached to each spoke selects a random number of candidates
+and casts one or more ballots for each of them. The VOTR worker knows
+nothing about the replication model. The workload is roughly:
+* `BEGIN`
+* For random Candidate and Number of votes:
+  * `UPDATE totals SET total = total + N WHERE candidate = C`
+  * `INSERT INTO ballots (candidate) SELECT C FROM generate_series(1, N)`
+* `COMMIT`
+
+Each spoke also has a validation loop that checks whether the total
+number of ballots equals the sum of the running totals. That is,
+`SELECT count(*) FROM ballots` should equal `SELECT sum(total) FROM
+totals` at any given point in time.
+
+The only SQL client to interact with the `votr_0` hub is cdc-sink.
+
+The tables used by the VOTR workload are augmented with a vector-clock
+scheme. That is, each table has a `whence JSONB` column that shows which
+worker(s) a row incorporates state from. A `src INT` column also allows
+the `vectorMerge()` function defined in the [votr.ts user
+script](./script/votr.ts) to ensure idempotency by guaranteeing that
+the clock for a particular source only ever rolls forward. Note that
+this functionality does *not* depend on clock synchronization between
+the workers, only on each worker having a reasonably well-behaved
+cluster timestamp.
+
+# Running the demo
+
+Start one or more local CockroachDB instances. The VOTR workload can use
+multiple SQL databases on a single CockroachDB node, or the databases
+can be split across multiple clusters.
+> `cockroach start-single-node --store type=mem,size=2G --advertise-addr 127.0.0.1:26257 --insecure`
+
+Initialize the VOTR schema on each instance. The number of regions is
+specified by repeating the `--connect` argument. It is valid to repeat
+a connection string in order to use the same CockroachDB node or
+cluster to host multiple VOTR databases. This command line shows the
+local CockroachDB node being used to host the hub and three spokes.
+> `cdc-sink votr init
+> --connect 'postgresql://root@localhost:26257/?sslmode=disable'
+> --connect 'postgresql://root@localhost:26257/?sslmode=disable'
+> --connect 'postgresql://root@localhost:26257/?sslmode=disable'
+> --connect 'postgresql://root@localhost:26257/?sslmode=disable'`
+
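+After `init` completes, each database holds the three tables described
+above. As a quick sanity check (a sketch; `votr_1` assumes the default
+`votr` schema name), inspect the vector-clock bookkeeping columns on a
+spoke:
+```sql
+-- whence maps worker ids to the most recent cluster timestamp
+-- incorporated from each worker; src records the last writer.
+SELECT candidate, src, whence FROM votr_1.totals LIMIT 5;
+```
+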
+Run the workload. The `--drainDelay` flag sets a grace period after
+receiving a `^C`: the workers stop immediately, while the cdc-sink
+instances continue to process in-flight messages for that duration.
+> `cdc-sink votr run
+> --candidates 128 --workers 1 --drainDelay 1m
+> --connect 'postgresql://root@localhost:26257/?sslmode=disable'
+> --connect 'postgresql://root@localhost:26257/?sslmode=disable'
+> --connect 'postgresql://root@localhost:26257/?sslmode=disable'
+> --connect 'postgresql://root@localhost:26257/?sslmode=disable'`
+
+# Useful demo query
+
+Show the count of ballots and the sum of totals across multiple VOTR
+databases on the same node:
+```sql
+WITH t (n, t) AS (
+    SELECT 1 AS n, sum(total)
+      FROM votr_1.totals
+    UNION ALL SELECT 2 AS n, sum(total)
+      FROM votr_2.totals
+    UNION ALL SELECT 3 AS n, sum(total)
+      FROM votr_3.totals
+    UNION ALL SELECT 0 AS n, sum(total)
+      FROM votr_0.totals
+    ),
+    c (n, c) AS (
+    SELECT 1, count(*) FROM votr_1.ballots
+    UNION ALL SELECT 2, count(*)
+      FROM votr_2.ballots
+    UNION ALL SELECT 3, count(*)
+      FROM votr_3.ballots
+    UNION ALL SELECT 0, count(*)
+      FROM votr_0.ballots
+    )
+SELECT *
+  FROM t JOIN c USING (n) ORDER BY n;
+```
+
+# Known limitations
+
+* VOTR is a one-shot demo, since the changefeeds are deleted on exit.
+  The cdc-sink HTTP ports are ephemeral, so it's not straightforward to
+  continue from a previous run. With more development effort, VOTR could
+  query the jobs table to determine which port it should attempt to bind
+  a listener to, in order to reuse the existing state.
+* VOTR should demonstrate adding a new region.
\ No newline at end of file
diff --git a/internal/cmd/votr/schema.go b/internal/cmd/votr/schema.go
new file mode 100644
index 000000000..87c730999
--- /dev/null
+++ b/internal/cmd/votr/schema.go
@@ -0,0 +1,297 @@
+// Copyright 2023 The Cockroach Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// +// SPDX-License-Identifier: Apache-2.0 + +package votr + +import ( + "context" + "database/sql" + "fmt" + "math/rand" + + "github.com/cockroachdb/cdc-sink/internal/types" + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/cockroachdb/cdc-sink/internal/util/retry" + "github.com/google/uuid" + "github.com/pkg/errors" +) + +var ( + ballots = ident.New("ballots") + candidates = ident.New("candidates") + totals = ident.New("totals") + + names = [...]string{ + "Alice", "Bob", "Carol", "David", "Eve", "Frank", "Gil", + "Hillary", "Indira", "Jill", "Kyle", "Louis", "Mike", "Nancy", + "Oscar", "Paul", "Queen", "Romeo", "Sierra", "Toni", "Ursula", + "Vik", "Walter", "Xerxes", "Yolanda", "Zola", + } + connectors = [...]string{"le", "the"} + epithets = [...]string{ + "Awesome", "Boor", "Concerned", "Dependable", "Elated", "Fancy", + "Grouch", "Hapless", "Indecisive", "Joyful", "Kleptocrat", + "Lesser", "Mannered", "Nice", "Opulent", "Purposeful", "Quiet", + "Remote", "Sulky", "Truthful", "Unfortunate", "Victorious", + "Wastrel", "XIVth", "Yankee", "Zoologist", + } + moods = [...]string{ + "Active", "Bad", "Cheerful", "Down", "Elated", "Frightened", + "Good", "Happy", "Introspective", "Justified", "Kind", "Liked", + "Mad", "Naughty", "Open", "Puzzled", "Questioning", "Romantic", + "Sleepy", "Trusting", "Watchful", "XOXO", "Zen", + } +) + +const ( + // ballots are append-only. + ballotsSchema = `CREATE TABLE IF NOT EXISTS %[1]s ( +candidate UUID NOT NULL REFERENCES %[2]s ON DELETE CASCADE, +ballot UUID NOT NULL DEFAULT gen_random_uuid(), +whence JSONB NOT NULL +DEFAULT jsonb_build_object(%[3]d, cluster_logical_timestamp()::string) +ON UPDATE jsonb_build_object(%[3]d, cluster_logical_timestamp()::string), +src INT NOT NULL DEFAULT %[3]d ON UPDATE %[3]d, +xyzzy INT NOT NULL DEFAULT 0, -- Hack to force a conflict, until CAS mode has "always" mode. +PRIMARY KEY (candidate, ballot) +)` + + // candidates might be updated occasionally in a last-one-wins manner. + candidatesSchema = `CREATE TABLE IF NOT EXISTS %[1]s ( +candidate UUID PRIMARY KEY, +whence JSONB NOT NULL +DEFAULT jsonb_build_object(%[2]d, cluster_logical_timestamp()::string) +ON UPDATE jsonb_build_object(%[2]d, cluster_logical_timestamp()::string), +src INT NOT NULL DEFAULT %[2]d ON UPDATE %[2]d, +name TEXT NOT NULL, +mood TEXT NOT NULL, +xyzzy INT NOT NULL DEFAULT 0 -- Hack to force a conflict, until CAS mode has "always" mode. +)` + + // totals will show a high-conflict table with custom merge logic. + totalsSchema = `CREATE TABLE IF NOT EXISTS %[1]s ( +candidate UUID PRIMARY KEY REFERENCES %[2]s ON DELETE CASCADE, +whence JSONB NOT NULL +DEFAULT jsonb_build_object(%[3]d, cluster_logical_timestamp()::string) +ON UPDATE jsonb_build_object(%[3]d, cluster_logical_timestamp()::string), +src INT NOT NULL DEFAULT %[3]d ON UPDATE %[3]d, +total INT NOT NULL DEFAULT 0, +xyzzy INT NOT NULL DEFAULT 0 -- Hack to force a conflict, until CAS mode has "always" mode. 
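+-- The whence and src defaults stamp locally-originated writes with
+-- this region's id and the current cluster timestamp.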
+)`
+)
+
+type schema struct {
+	ballots    ident.Table
+	candidates ident.Table
+	enclosing  ident.Ident
+	totals     ident.Table
+
+	candidateIds map[uuid.UUID]struct{}
+	db           *types.SourcePool
+	region       region
+}
+
+func newSchema(db *types.SourcePool, enclosing ident.Ident, r region) *schema {
+	enclosing = ident.New(enclosing.Raw() + "_" + r.String())
+	s := ident.MustSchema(enclosing, ident.Public)
+	return &schema{
+		ballots:      ident.NewTable(s, ballots),
+		candidateIds: make(map[uuid.UUID]struct{}),
+		candidates:   ident.NewTable(s, candidates),
+		db:           db,
+		enclosing:    enclosing,
+		region:       r,
+		totals:       ident.NewTable(s, totals),
+	}
+}
+
+func (s *schema) create(ctx context.Context) error {
+	return retry.Retry(ctx, func(ctx context.Context) error {
+		if _, err := s.db.ExecContext(ctx, fmt.Sprintf(
+			`DROP DATABASE IF EXISTS %s CASCADE`, s.enclosing)); err != nil {
+			return errors.WithStack(err)
+		}
+
+		if _, err := s.db.ExecContext(ctx, fmt.Sprintf(
+			`CREATE DATABASE %s`, s.enclosing)); err != nil {
+			return errors.WithStack(err)
+		}
+
+		if _, err := s.db.ExecContext(ctx, fmt.Sprintf(
+			candidatesSchema, s.candidates, s.region,
+		)); err != nil {
+			return errors.WithStack(err)
+		}
+
+		if _, err := s.db.ExecContext(ctx, fmt.Sprintf(
+			ballotsSchema, s.ballots, s.candidates, s.region,
+		)); err != nil {
+			return errors.WithStack(err)
+		}
+
+		if _, err := s.db.ExecContext(ctx, fmt.Sprintf(
+			totalsSchema, s.totals, s.candidates, s.region,
+		)); err != nil {
+			return errors.WithStack(err)
+		}
+		return nil
+	})
+}
+
+// doStuff chooses a random subset of candidates, distributes the
+// requested number of votes across them, and inserts the ballots.
+func (s *schema) doStuff(ctx context.Context, votes int) error {
+	if votes <= 0 {
+		return nil
+	}
+	numCandidates := rand.Intn(votes) + 1 // Intn returns [0,n).
+
+	winners := make([]uuid.UUID, 0, numCandidates)
+	// Iteration over a map is random enough for our purposes.
+	for id := range s.candidateIds {
+		winners = append(winners, id)
+		if len(winners) == numCandidates {
+			break
+		}
+	}
+
+	voteAllocation := make(map[uuid.UUID]int)
+	for i := 0; i < votes; i++ {
+		winnerIdx := i % len(winners)
+		voteAllocation[winners[winnerIdx]]++
+	}
+
+	ballotQ := fmt.Sprintf(`INSERT INTO %s (candidate)
+SELECT candidate FROM
+(SELECT $1::UUID candidate, generate_series(1, $2))`, s.ballots)
+	totalQ := fmt.Sprintf(`INSERT INTO %s AS tbl (candidate, total)
+VALUES ($1, $2)
+ON CONFLICT(candidate)
+DO UPDATE SET total=tbl.total+excluded.total`, s.totals)
+
+	return retry.Retry(ctx, func(ctx context.Context) error {
+		tx, err := s.db.BeginTx(ctx, nil)
+		if err != nil {
+			return errors.WithStack(err)
+		}
+		defer func() { _ = tx.Rollback() }()
+
+		for candidate, count := range voteAllocation {
+			if _, err := tx.ExecContext(ctx, totalQ, candidate, count); err != nil {
+				return errors.WithStack(err)
+			}
+			if _, err := tx.ExecContext(ctx, ballotQ, candidate, count); err != nil {
+				return errors.WithStack(err)
+			}
+		}
+		return errors.WithStack(tx.Commit())
+	})
+}
+
+func (s *schema) ensureCandidates(ctx context.Context, count int) error {
+	seed := int64(0)
+	rnd := rand.New(rand.NewSource(seed))
+
+	nextMood := func() string {
+		return moods[rnd.Intn(len(moods))]
+	}
+	nextName := func(deconflict int) string {
+		return fmt.Sprintf("%s %s %s (%d)",
+			names[rnd.Intn(len(names))],
+			connectors[rnd.Intn(len(connectors))],
+			epithets[rnd.Intn(len(epithets))],
+			deconflict)
+	}
+
+	// Rows are inserted with deterministic ids.
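+	// uuid_generate_v5 hashes its (namespace, name) arguments into a
+	// stable UUIDv5, so repeated runs of the workload upsert the same
+	// candidate rows rather than creating duplicates.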
+ q := fmt.Sprintf(`UPSERT INTO %s (candidate, name, mood) +VALUES (uuid_generate_v5('455E049E-54B6-41C9-BBCE-1587CC394851', $1), $1, $2) +RETURNING candidate`, s.candidates) + + for i := 0; i < count; i++ { + name := nextName(i) + mood := nextMood() + if err := retry.Retry(ctx, func(ctx context.Context) error { + var id uuid.UUID + if err := s.db.QueryRowContext(ctx, q, name, mood).Scan(&id); err != nil { + return errors.WithStack(err) + } + s.candidateIds[id] = struct{}{} + return nil + }); err != nil { + return err + } + } + + return nil +} + +func (s *schema) sumTotal(ctx context.Context) (int, error) { + var total sql.NullInt64 + err := s.db.QueryRowContext(ctx, + fmt.Sprintf("SELECT sum(total) FROM %s", s.totals), + ).Scan(&total) + return int(total.Int64), errors.Wrapf(err, "sum total in region %s", s.region) +} + +func (s *schema) validate(ctx context.Context, aost bool) ([]string, error) { + asOf := "" + if aost { + asOf = "AS OF SYSTEM TIME follower_read_timestamp()" + } + + q := fmt.Sprintf(` + WITH counted AS (SELECT candidate, count(*) AS count FROM %s GROUP BY candidate), + verify AS ( + SELECT candidate, + IFNULL(counted.count, 0) expected, + IFNULL(totals.total, 0) actual + FROM counted FULL JOIN %s USING (candidate) + ) +SELECT candidate, expected, actual, name + FROM verify + JOIN %s USING (candidate) + %s + WHERE expected != actual`, s.ballots, s.totals, s.candidates, asOf) + + var ret []string + err := retry.Retry(ctx, func(ctx context.Context) error { + ret = nil // Reset if looping. + + rows, err := s.db.QueryContext(ctx, q) + if err != nil { + return errors.WithStack(err) + } + defer func() { _ = rows.Close() }() + + for rows.Next() { + var id uuid.UUID + var expected, actual int + var name string + if err := rows.Scan(&id, &expected, &actual, &name); err != nil { + return errors.WithStack(err) + } + ret = append(ret, fmt.Sprintf("%s: expected %d had %d (%s)", + id, expected, actual, name)) + } + // Final error check. + return errors.WithStack(rows.Err()) + }) + return ret, err +} diff --git a/internal/cmd/votr/schema_test.go b/internal/cmd/votr/schema_test.go new file mode 100644 index 000000000..b8ba5232f --- /dev/null +++ b/internal/cmd/votr/schema_test.go @@ -0,0 +1,72 @@ +// Copyright 2023 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package votr + +import ( + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/cdc-sink/internal/sinktest/base" + "github.com/cockroachdb/cdc-sink/internal/types" + "github.com/stretchr/testify/require" +) + +func TestSchema(t *testing.T) { + const numCandidates = 128 + r := require.New(t) + + fixture, err := base.NewFixture(t) + r.NoError(err) + + checkSupported(t, fixture.SourcePool) + + ctx := fixture.Context + + // Steal the enclosing DATABASE name, since we're basically running + // the votr init command. 
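+	// Schema().Idents(nil) decomposes the schema into its name parts;
+	// the first element is the enclosing database.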
+	enclosingDB := fixture.SourceSchema.Schema().Idents(nil)[0]
+	sch := newSchema(fixture.SourcePool, enclosingDB, 0)
+
+	// Set up the schema, insert some votes, and ensure that everything
+	// is consistent.
+	r.NoError(sch.create(ctx))
+	r.NoError(sch.ensureCandidates(ctx, numCandidates))
+	for i := 0; i < 10; i++ {
+		r.NoError(sch.doStuff(ctx, 10))
+	}
+	failures, err := sch.validate(ctx, false)
+	r.NoError(err)
+	r.Empty(failures)
+
+	// Break the totals table.
+	_, err = fixture.SourcePool.ExecContext(ctx, fmt.Sprintf(`UPDATE %s SET total=total+1 WHERE true`, sch.totals))
+	r.NoError(err)
+	failures, err = sch.validate(ctx, false)
+	r.NoError(err)
+	r.NotEmpty(failures)
+}
+
+// checkSupported calls t.Skip() if VOTR cannot run on the target
+// CockroachDB version.
+func checkSupported(t *testing.T, target *types.SourcePool) {
+	if target.Product != types.ProductCockroachDB ||
+		strings.Contains(target.Version, "v21.") ||
+		strings.Contains(target.Version, "v22.") {
+		t.Skipf("VOTR only runs on CRDB 23.1 or later")
+	}
+}
diff --git a/internal/cmd/votr/script/votr.ts b/internal/cmd/votr/script/votr.ts
new file mode 100644
index 000000000..5ca8c2c78
--- /dev/null
+++ b/internal/cmd/votr/script/votr.ts
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2023 The Cockroach Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import * as api from "cdc-sink@v1";
+
+// These constants are set by the votr script loader.
+
+const destination: number = DESTINATION_INDEX;
+const votrDB: string = DESTINATION_DB;
+
+// If the document has already passed through the destination, we don't
+// want it to continue looping around. Since this decision can be made
+// without loading the row in the destination, we can use the per-table
+// map function, rather than filtering in the merge function.
+const filterWhence = (doc: api.Document) => doc.src !== destination ? doc : null;
+
+// vectorMerge is a helper function to admit an incoming mutation based
+// on its vector clock versus the destination's knowledge of the clock.
+const vectorMerge =
+    (fallback?: api.MergeFunction): api.MergeFunction =>
+        (op: api.MergeOperation): api.MergeResult => {
+            // Replay prevention: Don't process messages from older
+            // clock values in the src column.
+            let src: number = op.proposed.src;
+            if (src === undefined) {
+                throw new Error("src column missing");
+            }
+            let incomingClock: string = op.proposed.whence[src] ?? "0";
+            let existingClock: string = op.target.whence[src] ?? "0";
+            if (incomingClock <= existingClock) {
+                return {drop: true};
+            }
+
+            // Use the target's view of the vector clock, but update it
+            // with the mutation source's value.
+            op.proposed.whence = op.target.whence;
+            op.proposed.whence[src] = incomingClock;
+
+            // Absent a fallback, act like last-one-wins.
+            if (!fallback) {
+                return {apply: op.proposed};
+            }
+
+            // Delegate to specialty logic.
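+            // (The totals table below, for example, supplies a
+            // delta-accumulating fallback.)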
+ return fallback(op); + }; + + +api.configureTable(`${votrDB}.public.ballots`, { + cas: ["xyzzy"], // TODO: The CAS option needs an "always" setting + map: filterWhence, + merge: vectorMerge(), +}); + +api.configureTable(`${votrDB}.public.candidates`, { + cas: ["xyzzy"], + map: filterWhence, + merge: vectorMerge(), +}); + +api.configureTable(`${votrDB}.public.totals`, { + cas: ["xyzzy"], + map: filterWhence, + merge: vectorMerge((op: api.MergeOperation): api.MergeResult => { + // Apply a delta based on before and proposed values. + let a: number = op.proposed.total ?? 0; + let b: number = op.before?.total ?? 0; + let delta = a - b; + if (delta === 0) { + return {drop: true}; + } + + op.proposed.total = op.target.total + delta; + return {apply: op.proposed}; + }), +}) \ No newline at end of file diff --git a/internal/cmd/votr/votr-diagram.png b/internal/cmd/votr/votr-diagram.png new file mode 100644 index 000000000..cf038aeb3 Binary files /dev/null and b/internal/cmd/votr/votr-diagram.png differ diff --git a/internal/cmd/votr/votr.go b/internal/cmd/votr/votr.go new file mode 100644 index 000000000..e4410bc2a --- /dev/null +++ b/internal/cmd/votr/votr.go @@ -0,0 +1,609 @@ +// Copyright 2023 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +// Package votr contains a demonstration workload. +package votr + +import ( + "context" + "embed" + "fmt" + "math" + "math/rand" + "net" + "net/url" + "os" + "strconv" + "strings" + "sync/atomic" + "time" + + "github.com/cockroachdb/cdc-sink/internal/script" + "github.com/cockroachdb/cdc-sink/internal/source/cdc" + "github.com/cockroachdb/cdc-sink/internal/source/logical" + "github.com/cockroachdb/cdc-sink/internal/source/server" + "github.com/cockroachdb/cdc-sink/internal/types" + "github.com/cockroachdb/cdc-sink/internal/util/ident" + "github.com/cockroachdb/cdc-sink/internal/util/retry" + "github.com/cockroachdb/cdc-sink/internal/util/stdpool" + "github.com/cockroachdb/cdc-sink/internal/util/stopper" + "github.com/cockroachdb/cdc-sink/internal/util/subfs" + "github.com/jackc/pgx/v5" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/spf13/pflag" +) + +type region int + +func (r region) String() string { + return strconv.Itoa(int(r)) +} + +type config struct { + BallotBatch int + Candidates int + Connect []string + DrainDelay time.Duration + Enclosing ident.Ident + ErrInconsistent bool + MaxBallots int + ReportInterval time.Duration + SinkHost string + StopAfter time.Duration + ValidationDelay time.Duration + WorkerDelay time.Duration + Workers int + + schemas []*schema // Used by test code to post-validate. 
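+	// remaining counts down the number of ballots left to insert when
+	// --maxBallots is set; otherwise it starts at math.MaxInt64.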
+ remaining atomic.Int64 +} + +func (c *config) Bind(f *pflag.FlagSet) { + f.IntVar(&c.BallotBatch, "ballotBatch", 10, + "the number of ballots to record in a single batch") + f.IntVar(&c.Candidates, "candidates", 128, + "the number of candidate rows") + f.StringSliceVar(&c.Connect, "connect", + []string{ + "postgresql://root@localhost:26257/?sslmode=disable", + "postgresql://root@localhost:26258/?sslmode=disable", + }, + "two or more CockroachDB connection strings") + f.DurationVar(&c.DrainDelay, "drainDelay", time.Minute, + "pause between stopping workload and stopping cdc-sink processes") + f.Var(ident.NewValue("votr", &c.Enclosing), "schema", + "the enclosing database schema") + f.BoolVar(&c.ErrInconsistent, "errorIfInconsistent", false, + "exit immediately if the database is inconsistent") + f.IntVar(&c.MaxBallots, "maxBallots", 0, + "if non-zero, the total number of ballots to be cast") + f.DurationVar(&c.ReportInterval, "reportAfter", 5*time.Second, + "report number of ballots inserted") + f.StringVar(&c.SinkHost, "sinkHost", "127.0.0.1", + "the hostname to use when creating changefeeds") + f.DurationVar(&c.StopAfter, "stopAfter", 0, + "if non-zero, exit after running for this long") + f.DurationVar(&c.ValidationDelay, "validationDelay", 15*time.Second, + "sleep time between validation cycles") + f.DurationVar(&c.WorkerDelay, "workerDelay", 100*time.Millisecond, + "sleep time between ballot stuffing") + f.IntVar(&c.Workers, "workers", 1, + "the number of concurrent ballot stuffers") +} + +// Command returns the VOTR workload. +func Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "votr", + Short: "a workload to demonstrate async, active-active replication", + } + cmd.AddCommand(commandInit(), commandRun()) + return cmd +} + +// commandInit returns a Command wrapper around doInit. +func commandInit() *cobra.Command { + cfg := &config{} + cmd := &cobra.Command{ + Use: "init", + Short: "initialize the VOTR schema", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + // main.go provides the stopper. + return doInit(stopper.From(cmd.Context()), cfg) + }, + } + cfg.Bind(cmd.Flags()) + return cmd +} + +// doInit initializes the VOTR schema in the configured targets. +func doInit(ctx *stopper.Context, cfg *config) error { + if len(cfg.Connect) < 2 { + return errors.New("at least two connection strings are required") + } + + schemas := make([]*schema, 0, len(cfg.Connect)) + + for idx, conn := range cfg.Connect { + sch, err := openSchema(ctx, cfg, region(idx)) + if err != nil { + return errors.Wrapf(err, "could not connect to region %d at %s", idx, conn) + } + schemas = append(schemas, sch) + } + + for _, sch := range schemas { + if err := sch.create(ctx); err != nil { + return errors.Wrapf(err, "could not create VOTR schema in %s", sch.region) + } + } + + return nil +} + +// commandRun returns a Command wrapper around doRun. +func commandRun() *cobra.Command { + cfg := &config{} + cmd := &cobra.Command{ + Use: "run", + Short: "run the VOTR workload", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) error { + // main.go provides the stopper. + return doRun(stopper.From(cmd.Context()), cfg) + }, + } + cfg.Bind(cmd.Flags()) + return cmd +} + +// doRun is the main votr demo. +func doRun(ctx *stopper.Context, cfg *config) error { + if len(cfg.Connect) < 2 { + return errors.New("at least two connection strings are required") + } + + // Allow scripted shutdown, also used by test code. 
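+	// ctx.Stop initiates a graceful stop, passing the drain delay as
+	// the grace period for goroutines registered on the stopper.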
+ if cfg.StopAfter > 0 { + ctx.Go(func() error { + select { + case <-ctx.Stopping(): + // Do nothing. + case <-time.After(cfg.StopAfter): + ctx.Stop(cfg.DrainDelay) + } + return nil + }) + } + + // Run the requested number of workers. + log.Infof("inserting with %d workers across %d candidates", cfg.Workers, cfg.Candidates) + if cfg.WorkerDelay > 0 { + log.Infof("theoretical max regional ballots per reporting interval: %d", + int64(cfg.Workers)*int64(cfg.BallotBatch)* + cfg.ReportInterval.Nanoseconds()/cfg.WorkerDelay.Nanoseconds()) + log.Info("low performance may indicate contention due to too few candidates") + } + + // We want to run the servers with their own lifecycle in + // order to allow in-flight mutations a chance to drain. + svrStopper := stopper.WithContext(context.Background()) + defer svrStopper.Stop(0) + + // Allow a limit on the number of ballots to be set. + if cfg.MaxBallots > 0 { + cfg.remaining.Store(int64(cfg.MaxBallots)) + ctx.Go(func() error { + // Wait for all ballots to be inserted. + for cfg.remaining.Load() > 0 { + select { + case <-ctx.Stopping(): + return nil + case <-time.After(time.Second): + } + } + log.Infof("hit maximum number of ballots, waiting for replication") + + // Loop until we see the right number of ballots in the + // target databases. + top: + for { + select { + case <-ctx.Stopping(): + return nil + case <-time.After(time.Second): + } + + schema: + for _, s := range cfg.schemas { + total, err := s.sumTotal(ctx) + if err != nil { + return err + } + if total < cfg.MaxBallots { + // Waiting for replication. + continue top + } else if total == cfg.MaxBallots { + // Success! + continue schema + } else { + // Over-replication or other error. + return errors.Errorf( + "region %s had %d ballots, but expecting at most %d", + s.region, total, cfg.MaxBallots) + } + } + + log.Infof("%d ballots inserted in all regions", cfg.MaxBallots) + // Fast stop, since replication is done. + ctx.Stop(10 * time.Millisecond) + svrStopper.Stop(10 * time.Millisecond) + return nil + } + }) + } else { + cfg.remaining.Store(math.MaxInt64) + } + + schemas := make([]*schema, 0, len(cfg.Connect)) + for idx := range cfg.Connect { + sch, err := worker(ctx, cfg, region(idx)) + if err != nil { + return err + } + schemas = append(schemas, sch) + } + cfg.schemas = schemas // Make available for post-hoc testing. + + // Create a connection between the hub and each of the + // spokes. We could offer additional topologies, such as a + // uni- or bi-directional ring or a fully-connected graph. + hubSch := schemas[0] + schemas = schemas[1:] + + for idx, leafSch := range schemas { + toLeaf, err := startServer(svrStopper, cfg, hubSch, leafSch) + if err != nil { + return errors.Wrapf(err, "starting server %s -> %s", hubSch.region, leafSch.region) + } + + // Each leaf needs its own server that will write + // updates to the hub until such time as cdc-sink can + // support multiple different changefeeds writing to the + // same destination schema. See + // https://github.com/cockroachdb/cockroach/issues/112880 + // for a way that this might become trivially easy. 
+		leafSch := leafSch // Shadow the loop variable for the deferred closures below.
+		toHub, err := startServer(svrStopper, cfg, leafSch, hubSch)
+		if err != nil {
+			return errors.Wrapf(err, "starting server %s -> %s", leafSch.region, hubSch.region)
+		}
+
+		var toLeafJob int64
+		if toLeafJob, err = createFeed(ctx, hubSch, toLeaf); err != nil {
+			return errors.Wrapf(err, "feed %s -> %s", hubSch.region, toLeaf)
+		}
+		svrStopper.Defer(func() {
+			if err := cancelFeed(stopper.Background(), hubSch, toLeafJob); err != nil {
+				log.WithError(err).Warnf("could not cancel changefeed job %d in %s",
+					toLeafJob, hubSch.region)
+			}
+		})
+
+		var toHubJob int64
+		if toHubJob, err = createFeed(ctx, leafSch, toHub); err != nil {
+			return errors.Wrapf(err, "feed %s -> %s", leafSch.region, toHub)
+		}
+		svrStopper.Defer(func() {
+			if err := cancelFeed(stopper.Background(), leafSch, toHubJob); err != nil {
+				log.WithError(err).Warnf("could not cancel changefeed job %d in %s",
+					toHubJob, leafSch.region)
+			}
+		})
+	}
+
+	// Wait for the workers to be shut down.
+	if workerErr := ctx.Wait(); workerErr != nil {
+		return workerErr
+	}
+
+	// This won't execute if the servers were already stopped by
+	// reaching the maximum number of ballots.
+	svrStopper.Go(func() error {
+		log.Infof("workload stopped, pausing for %s to drain", cfg.DrainDelay)
+		time.Sleep(cfg.DrainDelay)
+		svrStopper.Stop(5 * time.Second)
+		return nil
+	})
+	return svrStopper.Wait()
+}
+
+// cancelFeed cancels the changefeed job.
+func cancelFeed(ctx *stopper.Context, in *schema, job int64) error {
+	return retry.Retry(ctx, func(ctx context.Context) error {
+		// We need to open a one-shot connection, since the worker pool
+		// may already be closed at this point.
+		conn, err := pgx.Connect(ctx, in.db.ConnectionString)
+		if err != nil {
+			return errors.Wrapf(err, "could not connect to %s to cancel job %d", in.region, job)
+		}
+		defer func() { _ = conn.Close(ctx) }()
+		_, err = conn.Exec(ctx, "CANCEL JOB $1", job)
+		if err == nil {
+			log.Infof("cleaned up changefeed %d in %s", job, in.region)
+		}
+		return err
+	})
+}
+
+// createFeed creates a changefeed from the given source to the given
+// server. It returns the job id of the changefeed.
+func createFeed(ctx *stopper.Context, from *schema, to *url.URL) (int64, error) {
+	// Set the cluster settings once, if we need to.
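+	// Changefeeds are built on top of rangefeeds, so CREATE CHANGEFEED
+	// fails unless kv.rangefeed.enabled is set on the source cluster.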
+	var enabled bool
+	if err := retry.Retry(ctx, func(ctx context.Context) error {
+		return from.db.QueryRowContext(ctx, "SHOW CLUSTER SETTING kv.rangefeed.enabled").Scan(&enabled)
+	}); err != nil {
+		return 0, errors.Wrap(err, "could not check cluster setting")
+	}
+	if !enabled {
+		if err := retry.Retry(ctx, func(ctx context.Context) error {
+			_, err := from.db.ExecContext(ctx, "SET CLUSTER SETTING kv.rangefeed.enabled = true")
+			return errors.Wrapf(err, "%s: could not enable rangefeeds", from.region)
+		}); err != nil {
+			return 0, err
+		}
+	}
+
+	var hasLicense bool
+	if err := from.db.QueryRowContext(ctx,
+		"SELECT (SELECT * FROM [SHOW CLUSTER SETTING enterprise.license]) <> ''",
+	).Scan(&hasLicense); err != nil {
+		return 0, errors.Wrapf(err, "%s: could not query for presence of license", from.region)
+	}
+	if !hasLicense {
+		lic, hasLic := os.LookupEnv("COCKROACH_DEV_LICENSE")
+		org, hasOrg := os.LookupEnv("COCKROACH_DEV_ORGANIZATION")
+		if hasLic && hasOrg {
+			if err := retry.Retry(ctx, func(ctx context.Context) error {
+				_, err := from.db.ExecContext(ctx, "SET CLUSTER SETTING enterprise.license = $1", lic)
+				return errors.Wrapf(err, "%s: could not set cluster license", from.region)
+			}); err != nil {
+				return 0, err
+			}
+
+			if err := retry.Retry(ctx, func(ctx context.Context) error {
+				_, err := from.db.ExecContext(ctx, "SET CLUSTER SETTING cluster.organization = $1", org)
+				return errors.Wrapf(err, "%s: could not set cluster organization", from.region)
+			}); err != nil {
+				return 0, err
+			}
+		} else {
+			return 0, errors.New("changefeeds require an enterprise license; use the 'cockroach demo' command")
+		}
+	}
+
+	q := fmt.Sprintf(`CREATE CHANGEFEED FOR TABLE %s, %s, %s INTO '%s'
+WITH diff, updated, resolved='1s', min_checkpoint_frequency='1s',
+webhook_sink_config='{"Flush":{"Messages":1000,"Frequency":"1s"}}'`,
+		from.ballots, from.candidates, from.totals, to.String(),
+	)
+
+	var job int64
+	if err := retry.Retry(ctx, func(ctx context.Context) error {
+		err := from.db.QueryRowContext(ctx, q).Scan(&job)
+		return errors.Wrapf(err, "%s: could not create changefeed", from.region)
+	}); err != nil {
+		return 0, err
+	}
+
+	log.Infof("created feed from %s into %s", from.region, to)
+	return job, nil
+}
+
+func openSchema(ctx *stopper.Context, cfg *config, r region) (*schema, error) {
+	conn := cfg.Connect[r]
+
+	pool, err := stdpool.OpenPgxAsTarget(ctx, conn,
+		stdpool.WithConnectionLifetime(5*time.Minute),
+		stdpool.WithPoolSize(cfg.Workers+1),
+		stdpool.WithTransactionTimeout(time.Minute),
+	)
+	if err != nil {
+		return nil, errors.Wrapf(err, "could not connect to %s database", r)
+	}
+
+	return newSchema((*types.SourcePool)(pool), cfg.Enclosing, r), nil
+}
+
+//go:embed script/*
+var scriptFS embed.FS
+
+// startServer runs an instance of cdc-sink which will feed into
+// the given destination. It returns the base URL for delivering
+// messages to the sink.
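+// The URL is shaped roughly like (port and database name illustrative):
+//
+//	webhook-https://127.0.0.1:30004/votr_1/public?insecure_tls_skip_verify=true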
+func startServer(ctx *stopper.Context, cfg *config, src, dest *schema) (*url.URL, error) { + targetConn := cfg.Connect[dest.region] + + stagingName := ident.New(fmt.Sprintf("cdc_sink_%d_%s_%s", os.Getpid(), src.region, dest.region)) + stagingSchema := ident.MustSchema(dest.enclosing, stagingName) + + if _, err := dest.db.ExecContext(ctx, fmt.Sprintf( + `CREATE SCHEMA IF NOT EXISTS %s`, stagingSchema)); err != nil { + return nil, errors.Wrap(err, dest.region.String()) + } + + srvConfig := &server.Config{ + CDC: cdc.Config{ + BaseConfig: logical.BaseConfig{ + BackfillWindow: 0, + ForeignKeysEnabled: true, + RetryDelay: 10 * time.Second, + ScriptConfig: script.Config{ + FS: &subfs.SubstitutingFS{ + FS: scriptFS, + Replacer: strings.NewReplacer( + "DESTINATION_DB", fmt.Sprintf(`"%s_%s"`, + cfg.Enclosing.Raw(), dest.region.String()), + "DESTINATION_INDEX", dest.region.String(), + "SOURCE_INDEX", src.region.String(), + ), + }, + MainPath: "/script/votr.ts", + }, + StagingConn: targetConn, + StagingSchema: stagingSchema, + TargetConn: targetConn, + }, + FlushEveryTimestamp: true, // Needed for delta behavior. + MetaTableName: ident.New("resolved_timestamps"), + RetireOffset: 24 * time.Hour, // For debugging. + }, + BindAddr: fmt.Sprintf("%s:0", cfg.SinkHost), + DisableAuth: true, + GenerateSelfSigned: true, + } + if err := srvConfig.Preflight(); err != nil { + return nil, errors.Wrap(err, dest.region.String()) + } + srv, err := server.NewServer(ctx, srvConfig) + if err != nil { + return nil, errors.Wrap(err, dest.region.String()) + } + _, port, err := net.SplitHostPort(srv.GetAddr().String()) + if err != nil { + return nil, errors.Wrap(err, dest.region.String()) + } + + sink := &url.URL{ + Scheme: "webhook-https", + Host: fmt.Sprintf("%s:%s", cfg.SinkHost, port), + Path: ident.Join(dest.candidates.Schema(), ident.Raw, '/'), + RawQuery: "insecure_tls_skip_verify=true", + } + + return sink, nil +} + +// worker will launch a number of goroutines into the context to insert +// ballots and to verify the consistency of the dataset. +func worker(ctx *stopper.Context, cfg *config, r region) (*schema, error) { + sch, err := openSchema(ctx, cfg, r) + if err != nil { + return nil, err + } + + if err := sch.ensureCandidates(ctx, cfg.Candidates); err != nil { + return nil, errors.Wrapf(err, "%s: could not create candidate entries", r) + } + + warnings, err := sch.validate(ctx, false) + if err != nil { + return nil, errors.Wrapf(err, "%s: could not perform initial validation", r) + } + if len(warnings) > 0 { + log.WithField( + "inconsistent", warnings, + ).Warnf("%s: workload starting from inconsistent state", r) + if cfg.ErrInconsistent { + return nil, errors.Errorf("exiting due to inconsistency in %s", r) + } + } + + // Don't run the workload on the hub if we're in a hub-and-spoke + // model. The ON UPDATE clause in the table definitions isn't able + // to refer to the existing value in the column. Thus, we cannot + // transparently patch the vector clock without making the workload + // itself aware of the vector clock column. + // + // In a two-region setup, we only need to be able to prevent a + // mutation from cycling between the regions, so the vector clock is + // an over-complication. + if len(cfg.Connect) == 2 || r > 0 { + ballotsToReport := &atomic.Int64{} + for i := 0; i < cfg.Workers; i++ { + ctx.Go(func() error { + // Stagger start by a random amount. 
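+				// After the first wakeup, workers settle into a steady
+				// cadence of one ballot batch per --workerDelay.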
+ sleep := time.Duration(rand.Int63n(cfg.WorkerDelay.Nanoseconds())) + + for { + select { + case <-time.After(sleep): + sleep = cfg.WorkerDelay + case <-ctx.Stopping(): + return nil + } + + batchSize := int64(cfg.BallotBatch) + remaining := cfg.remaining.Add(-batchSize) + batchSize + if remaining <= 0 { + return nil + } + if remaining < batchSize { + batchSize = remaining + } + + if err := sch.doStuff(ctx, int(batchSize)); err != nil { + return errors.Wrap(err, "could not stuff ballots") + } + ballotsToReport.Add(batchSize) + } + }) + } + + // Print status. + ctx.Go(func() error { + for { + select { + case <-time.After(cfg.ReportInterval): + log.Infof("%s: inserted %d ballots", r, ballotsToReport.Swap(0)) + case <-ctx.Stopping(): + return nil + } + } + }) + } + + // Start a background validation loop. + ctx.Go(func() error { + for { + select { + case <-time.After(cfg.ValidationDelay): + case <-ctx.Stopping(): + return nil + } + + warnings, err := sch.validate(ctx, true) + if err != nil { + return errors.Wrap(err, "could not validate results") + } + if len(warnings) == 0 { + log.Infof("%s: workload is consistent", r) + continue + } + log.WithField( + "inconsistent", warnings, + ).Warnf("%s: workload in inconsistent state", r) + if cfg.ErrInconsistent { + return errors.Errorf("exiting due to inconsistency in %s", r) + } + } + }) + + return sch, nil +} diff --git a/internal/cmd/votr/votr_test.go b/internal/cmd/votr/votr_test.go new file mode 100644 index 000000000..4a82f4188 --- /dev/null +++ b/internal/cmd/votr/votr_test.go @@ -0,0 +1,90 @@ +// Copyright 2023 The Cockroach Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// SPDX-License-Identifier: Apache-2.0 + +package votr + +import ( + "fmt" + "testing" + "time" + + "github.com/cockroachdb/cdc-sink/internal/sinktest/base" + "github.com/cockroachdb/cdc-sink/internal/util/stopper" + "github.com/spf13/pflag" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestVOTR(t *testing.T) { + if testing.Short() { + t.Skip("long test") + return + } + t.Run("2DC", func(t *testing.T) { + testVOTR(t, 2) + }) + t.Run("hub_spoke", func(t *testing.T) { + testVOTR(t, 4) + }) +} + +func testVOTR(t *testing.T, numDBs int) { + a := assert.New(t) + r := require.New(t) + + fixture, err := base.NewFixture(t) + r.NoError(err) + ctx := fixture.Context + + checkSupported(t, fixture.SourcePool) + + // Use all default values, except for the connect strings. + flags := pflag.NewFlagSet("test", pflag.ContinueOnError) + cfg := &config{} + cfg.Bind(flags) + r.NoError(flags.Parse(nil)) + cfg.Connect = make([]string, numDBs) + for i := range cfg.Connect { + cfg.Connect[i] = fixture.SourcePool.ConnectionString + } + cfg.DrainDelay = 30 * time.Second + cfg.Enclosing, _ = fixture.TargetSchema.Split() + cfg.ErrInconsistent = true + cfg.MaxBallots = 2048 + + // Initialize the schema. 
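+	// doInit needs a stopper.Context of its own; stop it as soon as
+	// the schema exists so its connection pools shut down before the
+	// run phase begins.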
+ initCtx := stopper.WithContext(ctx) + r.NoError(doInit(initCtx, cfg)) + initCtx.Stop(100 * time.Millisecond) + r.NoError(initCtx.Wait()) + + // doRun will exit on its own due to cfg.StopAfter. This function + // will return an error if any inconsistencies were seen. + runCtx := stopper.WithContext(ctx) + r.NoError(doRun(runCtx, cfg)) + r.NoError(runCtx.Wait()) + + // Final validation of totals. We know that ballots and totals are + // consistent within a destination schema, but we want to check that + // the totals are consistent across the various SQL databases. + for idx, s := range cfg.schemas { + var total int + r.NoError(fixture.SourcePool.QueryRowContext(ctx, + fmt.Sprintf("SELECT sum(total) FROM %s", s.totals), + ).Scan(&total)) + a.Equalf(cfg.MaxBallots, total, "idx %d", idx) + } +} diff --git a/internal/source/server/provider.go b/internal/source/server/provider.go index 986c2b594..f49a4c9b1 100644 --- a/internal/source/server/provider.go +++ b/internal/source/server/provider.go @@ -160,7 +160,7 @@ func ProvideServer( return nil }) - return &Server{auth, diags, mux} + return &Server{listener.Addr(), auth, diags, mux} } // ProvideTLSConfig is called by Wire to load the certificate and key diff --git a/internal/source/server/server.go b/internal/source/server/server.go index b1793c83b..1547c35f6 100644 --- a/internal/source/server/server.go +++ b/internal/source/server/server.go @@ -21,6 +21,7 @@ package server // This file contains code repackaged from main.go import ( + "net" "net/http" "github.com/cockroachdb/cdc-sink/internal/types" @@ -31,6 +32,7 @@ import ( // A Server receives incoming CockroachDB changefeed messages and // applies them to a target cluster. type Server struct { + addr net.Addr auth types.Authenticator diags *diag.Diagnostics mux *http.ServeMux @@ -42,6 +44,11 @@ var ( _ stdlogical.HasServeMux = (*Server)(nil) ) +// GetAddr returns the address that the server is attached to. +func (s *Server) GetAddr() net.Addr { + return s.addr +} + // GetAuthenticator implements [stdlogical.HasAuthenticator]. func (s *Server) GetAuthenticator() types.Authenticator { return s.auth diff --git a/internal/target/apply/apply.go b/internal/target/apply/apply.go index fce7eb158..7767a89f5 100644 --- a/internal/target/apply/apply.go +++ b/internal/target/apply/apply.go @@ -180,7 +180,7 @@ func (a *apply) Apply(ctx context.Context, tx types.TargetQuerier, muts []types. if err != nil { a.errors.Inc() } - return err + return errors.Wrap(err, a.target.Raw()) } a.mu.RLock() @@ -535,7 +535,15 @@ func (a *apply) upsertBagsLocked( // Copy the conflicting data from the table into the Conflict. for idx, col := range a.mu.templates.Columns { - c.Target.Put(col.Name, blockingData[idx]) + v := blockingData[idx] + // Allow runtime type fixups + if col.Parse != nil { + v, err = col.Parse(v) + if err != nil { + return errors.Wrapf(err, "could not invoke type helper on column %s", col.Name) + } + } + c.Target.Put(col.Name, v) } // Supply before data if we received it from upstream. diff --git a/internal/target/schemawatch/parse_helpers.go b/internal/target/schemawatch/parse_helpers.go index 8ab74f379..39272eeda 100644 --- a/internal/target/schemawatch/parse_helpers.go +++ b/internal/target/schemawatch/parse_helpers.go @@ -121,10 +121,26 @@ func coerceJSON(a any) (any, error) { return json.Marshal(a) } +// reifyJSON converts an incoming byte array to a reified type. 
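+// The SQL driver typically scans JSONB columns into []byte; downstream
+// merge logic wants the decoded value, e.g. []byte(`{"0":"1.0"}`)
+// becomes map[string]any{"0": "1.0"}.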
+func reifyJSON(v any) (any, error) { + if buf, ok := v.([]byte); ok { + if err := json.Unmarshal(buf, &v); err != nil { + return nil, err + } + } + return v, nil +} + func parseHelper(product types.Product, typeName string) func(any) (any, error) { switch product { case types.ProductCockroachDB, types.ProductPostgreSQL: - // Just pass through, since we have similar representations. + switch typeName { + case "JSON", "JSONB": + // Ensure that data bound for a JSON column is reified. + return reifyJSON + default: + return nil + } case types.ProductMariaDB, types.ProductMySQL: // Coerce types to the what the mysql driver expects. switch typeName { diff --git a/main.go b/main.go index 865f290b0..ed5438511 100644 --- a/main.go +++ b/main.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cdc-sink/internal/cmd/preflight" "github.com/cockroachdb/cdc-sink/internal/cmd/start" "github.com/cockroachdb/cdc-sink/internal/cmd/version" + "github.com/cockroachdb/cdc-sink/internal/cmd/votr" "github.com/cockroachdb/cdc-sink/internal/script" "github.com/cockroachdb/cdc-sink/internal/util/logfmt" "github.com/cockroachdb/cdc-sink/internal/util/stopper" @@ -115,6 +116,7 @@ func main() { script.HelpCommand(), start.Command(), version.Command(), + votr.Command(), ) stop := stopper.WithContext(context.Background())