diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 769de04e389d..129a7b82cbeb 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -243,7 +243,7 @@ /pkg/ccl/jobsccl/ @cockroachdb/jobs-prs @cockroachdb/disaster-recovery /pkg/ccl/changefeedccl/ @cockroachdb/cdc-prs -/pkg/ccl/streamingccl/ @cockroachdb/disaster-recovery +/pkg/ccl/crosscluster/ @cockroachdb/disaster-recovery /pkg/ccl/backupccl/ @cockroachdb/disaster-recovery /pkg/ccl/backupccl/*_job.go @cockroachdb/disaster-recovery @cockroachdb/jobs-prs /pkg/ccl/revertccl/ @cockroachdb/disaster-recovery diff --git a/build/bazelutil/nogo_config.json b/build/bazelutil/nogo_config.json index 2c5a7238e9ee..c15a472ce358 100644 --- a/build/bazelutil/nogo_config.json +++ b/build/bazelutil/nogo_config.json @@ -45,7 +45,7 @@ "pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go": "flagged by linter, should be evaluated", "pkg/ccl/sqlproxyccl/acl/watcher.go": "flagged by linter, should be evaluated", "pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go": "flagged by linter, should be evaluated", - "pkg/ccl/streamingccl/streamclient/random_stream_client.go": "flagged by linter, should be evaluated", + "pkg/ccl/crosscluster/streamclient/random_stream_client.go": "flagged by linter, should be evaluated", "pkg/cli/clisqlexec/format_csv_tsv.go": "flagged by linter, should be evaluated", "pkg/cli/clisqlshell/sql.go": "flagged by linter, should be evaluated", "pkg/cli/syncbench/syncbench.go": "flagged by linter, should be evaluated", diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index c3fc4f6cb157..9ca27ba57000 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -40,6 +40,11 @@ ALL_TESTS = [ "//pkg/ccl/cloudccl/cloudprivilege:cloudprivilege_test", "//pkg/ccl/cloudccl/externalconn:externalconn_test", "//pkg/ccl/cloudccl/gcp:gcp_test", + "//pkg/ccl/crosscluster/logical:logical_test", + "//pkg/ccl/crosscluster/physical:physical_test", + "//pkg/ccl/crosscluster/producer:producer_test", + "//pkg/ccl/crosscluster/replicationutils:replicationutils_test", + "//pkg/ccl/crosscluster/streamclient:streamclient_test", "//pkg/ccl/importerccl:importerccl_test", "//pkg/ccl/jobsccl/jobsprotectedtsccl:jobsprotectedtsccl_test", "//pkg/ccl/jwtauthccl:jwtauthccl_test", @@ -95,11 +100,6 @@ ALL_TESTS = [ "//pkg/ccl/sqlproxyccl:sqlproxyccl_test", "//pkg/ccl/storageccl/engineccl:engineccl_test", "//pkg/ccl/storageccl:storageccl_test", - "//pkg/ccl/streamingccl/logical:logical_test", - "//pkg/ccl/streamingccl/replicationutils:replicationutils_test", - "//pkg/ccl/streamingccl/streamclient:streamclient_test", - "//pkg/ccl/streamingccl/streamingest:streamingest_test", - "//pkg/ccl/streamingccl/streamproducer:streamproducer_test", "//pkg/ccl/telemetryccl:telemetryccl_test", "//pkg/ccl/testccl/authccl:authccl_test", "//pkg/ccl/testccl/sqlccl:sqlccl_test", @@ -870,6 +870,18 @@ GO_TARGETS = [ "//pkg/ccl/cmdccl/clusterrepl:clusterrepl_lib", "//pkg/ccl/cmdccl/stub-schema-registry:stub-schema-registry", "//pkg/ccl/cmdccl/stub-schema-registry:stub-schema-registry_lib", + "//pkg/ccl/crosscluster/logical:logical", + "//pkg/ccl/crosscluster/logical:logical_test", + "//pkg/ccl/crosscluster/physical:physical", + "//pkg/ccl/crosscluster/physical:physical_test", + "//pkg/ccl/crosscluster/producer:producer", + "//pkg/ccl/crosscluster/producer:producer_test", + "//pkg/ccl/crosscluster/replicationtestutils:replicationtestutils", + "//pkg/ccl/crosscluster/replicationutils:replicationutils", + "//pkg/ccl/crosscluster/replicationutils:replicationutils_test", + "//pkg/ccl/crosscluster/streamclient:streamclient", + "//pkg/ccl/crosscluster/streamclient:streamclient_test", + "//pkg/ccl/crosscluster:crosscluster", "//pkg/ccl/gssapiccl:gssapiccl", "//pkg/ccl/importerccl:importerccl_test", "//pkg/ccl/jobsccl/jobsprotectedtsccl:jobsprotectedtsccl_test", @@ -960,18 +972,6 @@ GO_TARGETS = [ "//pkg/ccl/storageccl/engineccl:engineccl_test", "//pkg/ccl/storageccl:storageccl", "//pkg/ccl/storageccl:storageccl_test", - "//pkg/ccl/streamingccl/logical:logical", - "//pkg/ccl/streamingccl/logical:logical_test", - "//pkg/ccl/streamingccl/replicationtestutils:replicationtestutils", - "//pkg/ccl/streamingccl/replicationutils:replicationutils", - "//pkg/ccl/streamingccl/replicationutils:replicationutils_test", - "//pkg/ccl/streamingccl/streamclient:streamclient", - "//pkg/ccl/streamingccl/streamclient:streamclient_test", - "//pkg/ccl/streamingccl/streamingest:streamingest", - "//pkg/ccl/streamingccl/streamingest:streamingest_test", - "//pkg/ccl/streamingccl/streamproducer:streamproducer", - "//pkg/ccl/streamingccl/streamproducer:streamproducer_test", - "//pkg/ccl/streamingccl:streamingccl", "//pkg/ccl/telemetryccl:telemetryccl_test", "//pkg/ccl/testccl/authccl:authccl_test", "//pkg/ccl/testccl/sqlccl:sqlccl_test", diff --git a/pkg/ccl/BUILD.bazel b/pkg/ccl/BUILD.bazel index a364c2d69be0..66c894316e98 100644 --- a/pkg/ccl/BUILD.bazel +++ b/pkg/ccl/BUILD.bazel @@ -12,6 +12,9 @@ go_library( "//pkg/ccl/buildccl", "//pkg/ccl/changefeedccl", "//pkg/ccl/cliccl", + "//pkg/ccl/crosscluster/logical", + "//pkg/ccl/crosscluster/physical", + "//pkg/ccl/crosscluster/producer", "//pkg/ccl/gssapiccl", "//pkg/ccl/jwtauthccl", "//pkg/ccl/kvccl", @@ -25,9 +28,6 @@ go_library( "//pkg/ccl/securityccl/fipsccl", "//pkg/ccl/storageccl", "//pkg/ccl/storageccl/engineccl", - "//pkg/ccl/streamingccl/logical", - "//pkg/ccl/streamingccl/streamingest", - "//pkg/ccl/streamingccl/streamproducer", "//pkg/ccl/utilccl", "//pkg/ccl/workloadccl", "//pkg/server", diff --git a/pkg/ccl/ccl_init.go b/pkg/ccl/ccl_init.go index 9acad433310b..2f7e2fb5cc65 100644 --- a/pkg/ccl/ccl_init.go +++ b/pkg/ccl/ccl_init.go @@ -18,6 +18,9 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/ccl/buildccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/cliccl" + _ "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/logical" + _ "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/physical" + _ "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/producer" _ "github.com/cockroachdb/cockroach/pkg/ccl/gssapiccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/jwtauthccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl" @@ -31,9 +34,6 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/ccl/securityccl/fipsccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl" - _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/logical" - _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest" - _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl" "github.com/cockroachdb/cockroach/pkg/server" diff --git a/pkg/ccl/cmdccl/clusterrepl/BUILD.bazel b/pkg/ccl/cmdccl/clusterrepl/BUILD.bazel index be62d34fef55..7b76f1b93038 100644 --- a/pkg/ccl/cmdccl/clusterrepl/BUILD.bazel +++ b/pkg/ccl/cmdccl/clusterrepl/BUILD.bazel @@ -6,8 +6,8 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/ccl/cmdccl/clusterrepl", visibility = ["//visibility:private"], deps = [ - "//pkg/ccl/streamingccl", - "//pkg/ccl/streamingccl/streamclient", + "//pkg/ccl/crosscluster", + "//pkg/ccl/crosscluster/streamclient", "//pkg/cli/exit", "//pkg/keys", "//pkg/repstream/streampb", diff --git a/pkg/ccl/cmdccl/clusterrepl/main.go b/pkg/ccl/cmdccl/clusterrepl/main.go index e1a492a7e961..7ad34694583c 100644 --- a/pkg/ccl/cmdccl/clusterrepl/main.go +++ b/pkg/ccl/cmdccl/clusterrepl/main.go @@ -17,8 +17,8 @@ import ( "os/signal" "time" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/cli/exit" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" @@ -243,16 +243,16 @@ func subscriptionConsumer( return sub.Err() } switch event.Type() { - case streamingccl.KVEvent: + case crosscluster.KVEvent: sz = 0 for _, kv := range event.GetKVs() { sz += kv.Size() } - case streamingccl.SSTableEvent: + case crosscluster.SSTableEvent: ssTab := event.GetSSTable() sz = ssTab.Size() - case streamingccl.DeleteRangeEvent: - case streamingccl.CheckpointEvent: + case crosscluster.DeleteRangeEvent: + case crosscluster.CheckpointEvent: fmt.Printf("%s checkpoint\n", timeutil.Now().Format(time.RFC3339)) resolved := event.GetResolvedSpans() for _, r := range resolved { diff --git a/pkg/ccl/streamingccl/BUILD.bazel b/pkg/ccl/crosscluster/BUILD.bazel similarity index 78% rename from pkg/ccl/streamingccl/BUILD.bazel rename to pkg/ccl/crosscluster/BUILD.bazel index 610c56a2d34d..14bc5b9c8700 100644 --- a/pkg/ccl/streamingccl/BUILD.bazel +++ b/pkg/ccl/crosscluster/BUILD.bazel @@ -1,14 +1,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( - name = "streamingccl", + name = "crosscluster", srcs = [ "addresses.go", "errors.go", "event.go", "settings.go", ], - importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl", + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster", visibility = ["//visibility:public"], deps = [ "//pkg/jobs/jobspb", diff --git a/pkg/ccl/streamingccl/addresses.go b/pkg/ccl/crosscluster/addresses.go similarity index 98% rename from pkg/ccl/streamingccl/addresses.go rename to pkg/ccl/crosscluster/addresses.go index 185feac23a02..c9a4199f7353 100644 --- a/pkg/ccl/streamingccl/addresses.go +++ b/pkg/ccl/crosscluster/addresses.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingccl +package crosscluster import "net/url" diff --git a/pkg/ccl/streamingccl/errors.go b/pkg/ccl/crosscluster/errors.go similarity index 98% rename from pkg/ccl/streamingccl/errors.go rename to pkg/ccl/crosscluster/errors.go index f6e98bcb94a2..b6404120194d 100644 --- a/pkg/ccl/streamingccl/errors.go +++ b/pkg/ccl/crosscluster/errors.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingccl +package crosscluster import ( "fmt" diff --git a/pkg/ccl/streamingccl/event.go b/pkg/ccl/crosscluster/event.go similarity index 99% rename from pkg/ccl/streamingccl/event.go rename to pkg/ccl/crosscluster/event.go index c3f1f3c7a634..a424e30afb97 100644 --- a/pkg/ccl/streamingccl/event.go +++ b/pkg/ccl/crosscluster/event.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingccl +package crosscluster import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" diff --git a/pkg/ccl/streamingccl/logical/BUILD.bazel b/pkg/ccl/crosscluster/logical/BUILD.bazel similarity index 91% rename from pkg/ccl/streamingccl/logical/BUILD.bazel rename to pkg/ccl/crosscluster/logical/BUILD.bazel index 8c155004007b..089a3cc991aa 100644 --- a/pkg/ccl/streamingccl/logical/BUILD.bazel +++ b/pkg/ccl/crosscluster/logical/BUILD.bazel @@ -9,15 +9,15 @@ go_library( "lww_row_processor.go", "metrics.go", ], - importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/logical", + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/logical", visibility = ["//visibility:public"], deps = [ "//pkg/base", "//pkg/ccl/changefeedccl/cdcevent", "//pkg/ccl/changefeedccl/changefeedbase", - "//pkg/ccl/streamingccl", - "//pkg/ccl/streamingccl/streamclient", - "//pkg/ccl/streamingccl/streamingest", + "//pkg/ccl/crosscluster", + "//pkg/ccl/crosscluster/physical", + "//pkg/ccl/crosscluster/streamclient", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/jobs/jobsprofiler", @@ -71,8 +71,8 @@ go_test( deps = [ "//pkg/base", "//pkg/ccl", + "//pkg/ccl/crosscluster/replicationtestutils", "//pkg/ccl/storageccl", - "//pkg/ccl/streamingccl/replicationtestutils", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/kv/kvpb", diff --git a/pkg/ccl/streamingccl/logical/logical_replication_dist.go b/pkg/ccl/crosscluster/logical/logical_replication_dist.go similarity index 90% rename from pkg/ccl/streamingccl/logical/logical_replication_dist.go rename to pkg/ccl/crosscluster/logical/logical_replication_dist.go index 9fd513665154..6baeec1a0d66 100644 --- a/pkg/ccl/streamingccl/logical/logical_replication_dist.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_dist.go @@ -12,9 +12,9 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/physical" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -27,7 +27,7 @@ import ( func constructLogicalReplicationWriterSpecs( ctx context.Context, - streamAddress streamingccl.StreamAddress, + streamAddress crosscluster.StreamAddress, topology streamclient.Topology, destSQLInstances []sql.InstanceLocality, initialScanTimestamp hlc.Timestamp, @@ -51,7 +51,7 @@ func constructLogicalReplicationWriterSpecs( writerSpecs := make(map[base.SQLInstanceID][]execinfrapb.LogicalReplicationWriterSpec, len(destSQLInstances)) // Update stream ingestion specs with their matched source node. - matcher := streamingest.MakeNodeMatcher(destSQLInstances) + matcher := physical.MakeNodeMatcher(destSQLInstances) for _, candidate := range matcher.FindSourceNodePriority(topology) { destID := matcher.FindMatch(candidate.ClosestDestIDs) partition := candidate.Partition diff --git a/pkg/ccl/streamingccl/logical/logical_replication_job.go b/pkg/ccl/crosscluster/logical/logical_replication_job.go similarity index 97% rename from pkg/ccl/streamingccl/logical/logical_replication_job.go rename to pkg/ccl/crosscluster/logical/logical_replication_job.go index d9a40de5ab6e..3a335157a844 100644 --- a/pkg/ccl/streamingccl/logical/logical_replication_job.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_job.go @@ -15,9 +15,9 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/physical" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" @@ -195,13 +195,13 @@ func (r *logicalReplicationResumer) ingest( if err != nil { return err } - destNodeLocalities, err := streamingest.GetDestNodeLocalities(ctx, distSQLPlanner, nodes) + destNodeLocalities, err := physical.GetDestNodeLocalities(ctx, distSQLPlanner, nodes) if err != nil { return err } specs, err := constructLogicalReplicationWriterSpecs(ctx, - streamingccl.StreamAddress(payload.TargetClusterConnStr), + crosscluster.StreamAddress(payload.TargetClusterConnStr), plan.Topology, destNodeLocalities, payload.ReplicationStartTime, diff --git a/pkg/ccl/streamingccl/logical/logical_replication_job_test.go b/pkg/ccl/crosscluster/logical/logical_replication_job_test.go similarity index 100% rename from pkg/ccl/streamingccl/logical/logical_replication_job_test.go rename to pkg/ccl/crosscluster/logical/logical_replication_job_test.go diff --git a/pkg/ccl/streamingccl/logical/logical_replication_writer_processor.go b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go similarity index 97% rename from pkg/ccl/streamingccl/logical/logical_replication_writer_processor.go rename to pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go index 3f59b93da8d2..1fde815fb416 100644 --- a/pkg/ccl/streamingccl/logical/logical_replication_writer_processor.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go @@ -14,8 +14,8 @@ import ( "sync/atomic" "time" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" @@ -190,7 +190,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) { if redactedErr != nil { log.Warning(lrw.Ctx(), "could not redact stream address") } - streamClient, err := streamclient.NewStreamClient(ctx, streamingccl.StreamAddress(addr), db, + streamClient, err := streamclient.NewStreamClient(ctx, crosscluster.StreamAddress(addr), db, streamclient.WithStreamID(streampb.StreamID(lrw.spec.StreamID)), streamclient.WithCompression(true), ) @@ -348,10 +348,10 @@ func (lrw *logicalReplicationWriterProcessor) consumeEvents(ctx context.Context) } func (lrw *logicalReplicationWriterProcessor) handleEvent( - ctx context.Context, event streamingccl.Event, + ctx context.Context, event crosscluster.Event, ) error { switch event.Type() { - case streamingccl.KVEvent: + case crosscluster.KVEvent: ts := event.GetKVs()[0].KeyValue.Value.Timestamp.GoTime() lrw.metrics.AdmitLatency.RecordValue( timeutil.Since(ts).Nanoseconds()) @@ -366,21 +366,21 @@ func (lrw *logicalReplicationWriterProcessor) handleEvent( } switch event.Type() { - case streamingccl.KVEvent: + case crosscluster.KVEvent: if err := lrw.flushBuffer(ctx, event.GetKVs()); err != nil { return err } - case streamingccl.CheckpointEvent: + case crosscluster.CheckpointEvent: if err := lrw.checkpoint(ctx, event); err != nil { return err } - case streamingccl.SSTableEvent, streamingccl.DeleteRangeEvent: + case crosscluster.SSTableEvent, crosscluster.DeleteRangeEvent: // TODO(ssd): Handle SSTableEvent here eventually. I'm not sure // we'll ever want to truly handle DeleteRangeEvent since // currently those are only used by DROP which should be handled // via whatever mechanism handles schema changes. return errors.Newf("unexpected event for online stream: %v", event) - case streamingccl.SplitEvent: + case crosscluster.SplitEvent: log.Infof(lrw.Ctx(), "SplitEvent received on logical replication stream") default: return errors.Newf("unknown streaming event type %v", event.Type()) @@ -389,7 +389,7 @@ func (lrw *logicalReplicationWriterProcessor) handleEvent( } func (lrw *logicalReplicationWriterProcessor) checkpoint( - ctx context.Context, event streamingccl.Event, + ctx context.Context, event crosscluster.Event, ) error { if streamingKnobs, ok := lrw.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok { if streamingKnobs != nil && streamingKnobs.ElideCheckpointEvent != nil { diff --git a/pkg/ccl/streamingccl/logical/lww_row_processor.go b/pkg/ccl/crosscluster/logical/lww_row_processor.go similarity index 100% rename from pkg/ccl/streamingccl/logical/lww_row_processor.go rename to pkg/ccl/crosscluster/logical/lww_row_processor.go diff --git a/pkg/ccl/streamingccl/logical/lww_row_processor_test.go b/pkg/ccl/crosscluster/logical/lww_row_processor_test.go similarity index 98% rename from pkg/ccl/streamingccl/logical/lww_row_processor_test.go rename to pkg/ccl/crosscluster/logical/lww_row_processor_test.go index d18a9d324969..0117652c8d89 100644 --- a/pkg/ccl/streamingccl/logical/lww_row_processor_test.go +++ b/pkg/ccl/crosscluster/logical/lww_row_processor_test.go @@ -14,7 +14,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" diff --git a/pkg/ccl/streamingccl/logical/main_test.go b/pkg/ccl/crosscluster/logical/main_test.go similarity index 100% rename from pkg/ccl/streamingccl/logical/main_test.go rename to pkg/ccl/crosscluster/logical/main_test.go diff --git a/pkg/ccl/streamingccl/logical/metrics.go b/pkg/ccl/crosscluster/logical/metrics.go similarity index 100% rename from pkg/ccl/streamingccl/logical/metrics.go rename to pkg/ccl/crosscluster/logical/metrics.go diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/crosscluster/physical/BUILD.bazel similarity index 90% rename from pkg/ccl/streamingccl/streamingest/BUILD.bazel rename to pkg/ccl/crosscluster/physical/BUILD.bazel index 5721778a0117..b22e5cdb24da 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/crosscluster/physical/BUILD.bazel @@ -1,7 +1,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "streamingest", + name = "physical", srcs = [ "alter_replication_job.go", "external_connection.go", @@ -17,16 +17,16 @@ go_library( "stream_ingestion_planning.go", "stream_ingestion_processor.go", ], - importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest", + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/physical", visibility = ["//visibility:public"], deps = [ "//pkg/base", "//pkg/ccl/backupccl", + "//pkg/ccl/crosscluster", + "//pkg/ccl/crosscluster/producer", + "//pkg/ccl/crosscluster/replicationutils", + "//pkg/ccl/crosscluster/streamclient", "//pkg/ccl/revertccl", - "//pkg/ccl/streamingccl", - "//pkg/ccl/streamingccl/replicationutils", - "//pkg/ccl/streamingccl/streamclient", - "//pkg/ccl/streamingccl/streamproducer", "//pkg/ccl/utilccl", "//pkg/cloud/externalconn", "//pkg/cloud/externalconn/connectionpb", @@ -89,7 +89,7 @@ go_library( ) go_test( - name = "streamingest_test", + name = "physical_test", size = "large", srcs = [ "alter_replication_job_test.go", @@ -109,7 +109,7 @@ go_test( "stream_ingestion_processor_test.go", ], data = glob(["testdata/**"]), - embed = [":streamingest"], + embed = [":physical"], exec_properties = select({ "//build/toolchains:is_heavy": {"test.Pool": "heavy"}, "//conditions:default": {"test.Pool": "large"}, @@ -121,13 +121,13 @@ go_test( "//pkg/ccl/backupccl", "//pkg/ccl/changefeedccl", "//pkg/ccl/changefeedccl/cdctest", + "//pkg/ccl/crosscluster", + "//pkg/ccl/crosscluster/producer", + "//pkg/ccl/crosscluster/replicationtestutils", + "//pkg/ccl/crosscluster/replicationutils", + "//pkg/ccl/crosscluster/streamclient", "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/storageccl", - "//pkg/ccl/streamingccl", - "//pkg/ccl/streamingccl/replicationtestutils", - "//pkg/ccl/streamingccl/replicationutils", - "//pkg/ccl/streamingccl/streamclient", - "//pkg/ccl/streamingccl/streamproducer", "//pkg/cloud/impl:cloudimpl", "//pkg/cloud/nodelocal", "//pkg/jobs", diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go b/pkg/ccl/crosscluster/physical/alter_replication_job.go similarity index 98% rename from pkg/ccl/streamingccl/streamingest/alter_replication_job.go rename to pkg/ccl/crosscluster/physical/alter_replication_job.go index a18b31fe9fe6..67874134734a 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go +++ b/pkg/ccl/crosscluster/physical/alter_replication_job.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" @@ -14,9 +14,9 @@ import ( "math" "time" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -337,14 +337,14 @@ func alterTenantRestartReplication( return CannotSetExpirationWindowErr } - streamAddress := streamingccl.StreamAddress(srcAddr) + streamAddress := crosscluster.StreamAddress(srcAddr) streamURL, err := streamAddress.URL() if err != nil { return errors.Wrap(err, "url") } - streamAddress = streamingccl.StreamAddress(streamURL.String()) + streamAddress = crosscluster.StreamAddress(streamURL.String()) - client, err := streamclient.NewStreamClient(ctx, streamingccl.StreamAddress(srcAddr), p.ExecCfg().InternalDB) + client, err := streamclient.NewStreamClient(ctx, crosscluster.StreamAddress(srcAddr), p.ExecCfg().InternalDB) if err != nil { return errors.Wrap(err, "creating client") } diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go b/pkg/ccl/crosscluster/physical/alter_replication_job_test.go similarity index 99% rename from pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go rename to pkg/ccl/crosscluster/physical/alter_replication_job_test.go index 407d828f36f1..1192a16f4572 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go +++ b/pkg/ccl/crosscluster/physical/alter_replication_job_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" @@ -16,7 +16,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" "github.com/cockroachdb/cockroach/pkg/cloud/nodelocal" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" diff --git a/pkg/ccl/streamingccl/streamingest/datadriven_test.go b/pkg/ccl/crosscluster/physical/datadriven_test.go similarity index 99% rename from pkg/ccl/streamingccl/streamingest/datadriven_test.go rename to pkg/ccl/crosscluster/physical/datadriven_test.go index 9de71a6cab91..6db0d677c5d9 100644 --- a/pkg/ccl/streamingccl/streamingest/datadriven_test.go +++ b/pkg/ccl/crosscluster/physical/datadriven_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" @@ -18,8 +18,8 @@ import ( "testing" "time" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" diff --git a/pkg/ccl/streamingccl/streamingest/external_connection.go b/pkg/ccl/crosscluster/physical/external_connection.go similarity index 94% rename from pkg/ccl/streamingccl/streamingest/external_connection.go rename to pkg/ccl/crosscluster/physical/external_connection.go index 7832c632339b..7660cf43e5c0 100644 --- a/pkg/ccl/streamingccl/streamingest/external_connection.go +++ b/pkg/ccl/crosscluster/physical/external_connection.go @@ -6,13 +6,13 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" "net/url" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/cloud/externalconn" "github.com/cockroachdb/cockroach/pkg/cloud/externalconn/connectionpb" ) diff --git a/pkg/ccl/streamingccl/streamingest/ingest_span_configs.go b/pkg/ccl/crosscluster/physical/ingest_span_configs.go similarity index 98% rename from pkg/ccl/streamingccl/streamingest/ingest_span_configs.go rename to pkg/ccl/crosscluster/physical/ingest_span_configs.go index 7b48322deeba..a019349424d6 100644 --- a/pkg/ccl/streamingccl/streamingest/ingest_span_configs.go +++ b/pkg/ccl/crosscluster/physical/ingest_span_configs.go @@ -6,15 +6,15 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" "time" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -156,11 +156,11 @@ func (sc *spanConfigIngestor) consumeSpanConfigs( } } -func (sc *spanConfigIngestor) consumeEvent(ctx context.Context, event streamingccl.Event) error { +func (sc *spanConfigIngestor) consumeEvent(ctx context.Context, event crosscluster.Event) error { switch event.Type() { - case streamingccl.SpanConfigEvent: + case crosscluster.SpanConfigEvent: return sc.bufferRecord(ctx, event.GetSpanConfigEvent()) - case streamingccl.CheckpointEvent: + case crosscluster.CheckpointEvent: if sc.bufferingFullScan && sc.bufferIsEmpty() { return errors.AssertionFailedf("a flush after the full scan checkpoint must have data in it") } diff --git a/pkg/ccl/streamingccl/streamingest/ingest_span_configs_test.go b/pkg/ccl/crosscluster/physical/ingest_span_configs_test.go similarity index 99% rename from pkg/ccl/streamingccl/streamingest/ingest_span_configs_test.go rename to pkg/ccl/crosscluster/physical/ingest_span_configs_test.go index 617a2abc4b37..96a5d07fe63a 100644 --- a/pkg/ccl/streamingccl/streamingest/ingest_span_configs_test.go +++ b/pkg/ccl/crosscluster/physical/ingest_span_configs_test.go @@ -6,15 +6,15 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" "testing" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" "github.com/cockroachdb/cockroach/pkg/roachpb" diff --git a/pkg/ccl/streamingccl/streamingest/main_test.go b/pkg/ccl/crosscluster/physical/main_test.go similarity index 97% rename from pkg/ccl/streamingccl/streamingest/main_test.go rename to pkg/ccl/crosscluster/physical/main_test.go index 7a3bd14fcf39..94cd7926f311 100644 --- a/pkg/ccl/streamingccl/streamingest/main_test.go +++ b/pkg/ccl/crosscluster/physical/main_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest_test +package physical_test import ( "os" diff --git a/pkg/ccl/streamingccl/streamingest/merged_subscription.go b/pkg/ccl/crosscluster/physical/merged_subscription.go similarity index 95% rename from pkg/ccl/streamingccl/streamingest/merged_subscription.go rename to pkg/ccl/crosscluster/physical/merged_subscription.go index ca72fa65e19b..6b6f12a49e39 100644 --- a/pkg/ccl/streamingccl/streamingest/merged_subscription.go +++ b/pkg/ccl/crosscluster/physical/merged_subscription.go @@ -6,12 +6,12 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" ) diff --git a/pkg/ccl/streamingccl/streamingest/merged_subscription_test.go b/pkg/ccl/crosscluster/physical/merged_subscription_test.go similarity index 87% rename from pkg/ccl/streamingccl/streamingest/merged_subscription_test.go rename to pkg/ccl/crosscluster/physical/merged_subscription_test.go index bf556654ed52..f047cb191bf8 100644 --- a/pkg/ccl/streamingccl/streamingest/merged_subscription_test.go +++ b/pkg/ccl/crosscluster/physical/merged_subscription_test.go @@ -6,15 +6,15 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" "sort" "testing" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -28,18 +28,18 @@ func TestMergeSubscriptionsRun(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - events := func(partition string) []streamingccl.Event { - return []streamingccl.Event{ - streamingccl.MakeKVEventFromKVs([]roachpb.KeyValue{{ + events := func(partition string) []crosscluster.Event { + return []crosscluster.Event{ + crosscluster.MakeKVEventFromKVs([]roachpb.KeyValue{{ Key: []byte(partition + "_key1"), }}), - streamingccl.MakeKVEventFromKVs([]roachpb.KeyValue{{ + crosscluster.MakeKVEventFromKVs([]roachpb.KeyValue{{ Key: []byte(partition + "_key2"), }}), } } mockClient := &streamclient.MockStreamClient{ - PartitionEvents: map[string][]streamingccl.Event{ + PartitionEvents: map[string][]crosscluster.Event{ "partition1": events("partition1"), "partition2": events("partition2"), }, diff --git a/pkg/ccl/streamingccl/streamingest/metrics.go b/pkg/ccl/crosscluster/physical/metrics.go similarity index 99% rename from pkg/ccl/streamingccl/streamingest/metrics.go rename to pkg/ccl/crosscluster/physical/metrics.go index e74ce48b6665..3795370ff388 100644 --- a/pkg/ccl/streamingccl/streamingest/metrics.go +++ b/pkg/ccl/crosscluster/physical/metrics.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "time" diff --git a/pkg/ccl/streamingccl/streamingest/node_lag_detector.go b/pkg/ccl/crosscluster/physical/node_lag_detector.go similarity index 99% rename from pkg/ccl/streamingccl/streamingest/node_lag_detector.go rename to pkg/ccl/crosscluster/physical/node_lag_detector.go index 992da52eaa12..64058af66dc9 100644 --- a/pkg/ccl/streamingccl/streamingest/node_lag_detector.go +++ b/pkg/ccl/crosscluster/physical/node_lag_detector.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" diff --git a/pkg/ccl/streamingccl/streamingest/node_lag_detector_test.go b/pkg/ccl/crosscluster/physical/node_lag_detector_test.go similarity index 99% rename from pkg/ccl/streamingccl/streamingest/node_lag_detector_test.go rename to pkg/ccl/crosscluster/physical/node_lag_detector_test.go index b4cc6265adfa..1e5b3af223c4 100644 --- a/pkg/ccl/streamingccl/streamingest/node_lag_detector_test.go +++ b/pkg/ccl/crosscluster/physical/node_lag_detector_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" diff --git a/pkg/ccl/streamingccl/streamingest/rangekey_batcher_test.go b/pkg/ccl/crosscluster/physical/rangekey_batcher_test.go similarity index 99% rename from pkg/ccl/streamingccl/streamingest/rangekey_batcher_test.go rename to pkg/ccl/crosscluster/physical/rangekey_batcher_test.go index 5eded07be102..c38eca824705 100644 --- a/pkg/ccl/streamingccl/streamingest/rangekey_batcher_test.go +++ b/pkg/ccl/crosscluster/physical/rangekey_batcher_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" diff --git a/pkg/ccl/streamingccl/streamingest/replication_execution_details.go b/pkg/ccl/crosscluster/physical/replication_execution_details.go similarity index 99% rename from pkg/ccl/streamingccl/streamingest/replication_execution_details.go rename to pkg/ccl/crosscluster/physical/replication_execution_details.go index fd54782cd63a..1e4127a11bfa 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_execution_details.go +++ b/pkg/ccl/crosscluster/physical/replication_execution_details.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "bytes" diff --git a/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go b/pkg/ccl/crosscluster/physical/replication_execution_details_test.go similarity index 99% rename from pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go rename to pkg/ccl/crosscluster/physical/replication_execution_details_test.go index f650596dd9f2..183774894d0d 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go +++ b/pkg/ccl/crosscluster/physical/replication_execution_details_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "bytes" diff --git a/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go b/pkg/ccl/crosscluster/physical/replication_random_client_test.go similarity index 98% rename from pkg/ccl/streamingccl/streamingest/replication_random_client_test.go rename to pkg/ccl/crosscluster/physical/replication_random_client_test.go index 0ff7fd270162..72a69ce7e098 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go +++ b/pkg/ccl/crosscluster/physical/replication_random_client_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" @@ -18,10 +18,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // To start tenants. - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/crosscluster/physical/replication_stream_e2e_test.go similarity index 99% rename from pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go rename to pkg/ccl/crosscluster/physical/replication_stream_e2e_test.go index 7d5beac83808..8db392fd1a42 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go +++ b/pkg/ccl/crosscluster/physical/replication_stream_e2e_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" @@ -18,9 +18,9 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go b/pkg/ccl/crosscluster/physical/stream_ingest_manager.go similarity index 96% rename from pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go rename to pkg/ccl/crosscluster/physical/stream_ingest_manager.go index ab7b3ae1220d..33723c44dd79 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingest_manager.go +++ b/pkg/ccl/crosscluster/physical/stream_ingest_manager.go @@ -6,14 +6,14 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/ccl/revertccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go similarity index 98% rename from pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go rename to pkg/ccl/crosscluster/physical/stream_ingestion_dist.go index 13a55d536524..2b7009564408 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_dist.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" @@ -16,9 +16,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/ccl/revertccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" @@ -148,7 +148,7 @@ func startDistIngestion( replanOracle := sql.ReplanOnCustomFunc( measurePlanChange, func() float64 { - return streamingccl.ReplanThreshold.Get(execCtx.ExecCfg().SV()) + return crosscluster.ReplanThreshold.Get(execCtx.ExecCfg().SV()) }, ) @@ -157,7 +157,7 @@ func startDistIngestion( planner.generatePlan, execCtx, replanOracle, - func() time.Duration { return streamingccl.ReplanFrequency.Get(execCtx.ExecCfg().SV()) }, + func() time.Duration { return crosscluster.ReplanFrequency.Get(execCtx.ExecCfg().SV()) }, ) tracingAggCh := make(chan *execinfrapb.TracingAggregatorEvents) @@ -179,7 +179,7 @@ func startDistIngestion( spanConfigIngestStopper := make(chan struct{}) streamSpanConfigs := func(ctx context.Context) error { - if !streamingccl.ReplicateSpanConfigsEnabled.Get(&execCtx.ExecCfg().Settings.SV) { + if !crosscluster.ReplicateSpanConfigsEnabled.Get(&execCtx.ExecCfg().Settings.SV) { log.Warningf(ctx, "span config replication is disabled") return nil } @@ -342,7 +342,7 @@ func createInitialSplits( destNodeCount int, destTenantID roachpb.TenantID, ) error { - ctx, sp := tracing.ChildSpan(ctx, "streamingest.createInitialSplits") + ctx, sp := tracing.ChildSpan(ctx, "physical.createInitialSplits") defer sp.Finish() rekeyer, err := backupccl.MakeKeyRewriterFromRekeys(codec, @@ -534,7 +534,7 @@ func (p *replicationFlowPlanner) constructPlanGenerator( streamIngestionSpecs, streamIngestionFrontierSpec, err := constructStreamIngestionPlanSpecs( ctx, - streamingccl.StreamAddress(details.StreamAddress), + crosscluster.StreamAddress(details.StreamAddress), topology, destNodeLocalities, initialScanTimestamp, @@ -757,7 +757,7 @@ func GetDestNodeLocalities( func constructStreamIngestionPlanSpecs( ctx context.Context, - streamAddress streamingccl.StreamAddress, + streamAddress crosscluster.StreamAddress, topology streamclient.Topology, destSQLInstances []sql.InstanceLocality, initialScanTimestamp hlc.Timestamp, diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist_test.go b/pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go similarity index 98% rename from pkg/ccl/streamingccl/streamingest/stream_ingestion_dist_test.go rename to pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go index 7b885745d939..ee59feeb8818 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist_test.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_dist_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" @@ -16,8 +16,8 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" @@ -318,7 +318,7 @@ func TestSourceDestMatching(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - fakeStreamAddress := streamingccl.StreamAddress("") + fakeStreamAddress := crosscluster.StreamAddress("") sipSpecs, _, err := constructStreamIngestionPlanSpecs( ctx, fakeStreamAddress, diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/crosscluster/physical/stream_ingestion_frontier_processor.go similarity index 97% rename from pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go rename to pkg/ccl/crosscluster/physical/stream_ingestion_frontier_processor.go index 1bdfd79cf48c..82dc7fcc07c3 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_frontier_processor.go @@ -6,14 +6,14 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" "time" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" @@ -126,7 +126,7 @@ func newStreamIngestionFrontierProcessor( metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().StreamIngest.(*Metrics), client: streamClient, heartbeatSender: streamclient.NewHeartbeatSender(ctx, streamClient, streamID, func() time.Duration { - return streamingccl.StreamReplicationConsumerHeartbeatFrequency.Get(&flowCtx.EvalCtx.Settings.SV) + return crosscluster.StreamReplicationConsumerHeartbeatFrequency.Get(&flowCtx.EvalCtx.Settings.SV) }), persistedReplicatedTime: spec.ReplicatedTimeAtStart, } @@ -303,7 +303,7 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps( // the status of each partition. func (sf *streamIngestionFrontier) maybeUpdateProgress() error { ctx := sf.Ctx() - updateFreq := streamingccl.JobCheckpointFrequency.Get(&sf.flowCtx.Cfg.Settings.SV) + updateFreq := crosscluster.JobCheckpointFrequency.Get(&sf.flowCtx.Cfg.Settings.SV) if updateFreq == 0 || timeutil.Since(sf.lastPartitionUpdate) < updateFreq { return nil } @@ -404,7 +404,7 @@ func (sf *streamIngestionFrontier) maybeUpdateProgress() error { // we always persist the entries to the same info key and so we never have more // than one row describing the state of the frontier at a given point in time. func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error { - dumpFreq := streamingccl.DumpFrontierEntries.Get(&sf.FlowCtx.Cfg.Settings.SV) + dumpFreq := crosscluster.DumpFrontierEntries.Get(&sf.FlowCtx.Cfg.Settings.SV) if dumpFreq == 0 || timeutil.Since(sf.lastFrontierDump) < dumpFreq { return nil } @@ -439,8 +439,8 @@ func (sf *streamIngestionFrontier) maybeCheckForLaggingNodes() error { // We halve the frequency relative to the ReplanFrequency setting (i.e. // check twice as often), because the node lag checker will only restart the // distSQL plan if a node is lagging for 2 checks in a row. - checkFreq := streamingccl.ReplanFrequency.Get(&sf.FlowCtx.Cfg.Settings.SV) / 2 - maxLag := streamingccl.InterNodeLag.Get(&sf.FlowCtx.Cfg.Settings.SV) + checkFreq := crosscluster.ReplanFrequency.Get(&sf.FlowCtx.Cfg.Settings.SV) / 2 + maxLag := crosscluster.InterNodeLag.Get(&sf.FlowCtx.Cfg.Settings.SV) if checkFreq == 0 || maxLag == 0 || timeutil.Since(sf.lastNodeLagCheck) < checkFreq { log.VEventf(ctx, 2, "skipping lag replanning check: maxLag %d; checkFreq %.2f; last node check %s; time since last check %.2f", maxLag, checkFreq.Minutes(), sf.lastNodeLagCheck, timeutil.Since(sf.lastNodeLagCheck).Minutes()) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go b/pkg/ccl/crosscluster/physical/stream_ingestion_frontier_processor_test.go similarity index 99% rename from pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go rename to pkg/ccl/crosscluster/physical/stream_ingestion_frontier_processor_test.go index fc16f40f328e..a5600f5ab5a9 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_frontier_processor_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/crosscluster/physical/stream_ingestion_job.go similarity index 98% rename from pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go rename to pkg/ccl/crosscluster/physical/stream_ingestion_job.go index 70e8a559e26c..6615553ba1bb 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_job.go @@ -6,17 +6,17 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" "fmt" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/producer" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/ccl/revertccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" @@ -185,7 +185,7 @@ func startPostCutoverRetentionJob( req := streampb.ReplicationProducerRequest{ ReplicationStartTime: cutoverTime, } - _, err = streamproducer.StartReplicationProducerJob(ctx, evalCtx, txn, info.Name, req, true) + _, err = producer.StartReplicationProducerJob(ctx, evalCtx, txn, info.Name, req, true) return err }) } @@ -430,7 +430,7 @@ func maybeRevertToCutoverTimestamp( ctx context.Context, p sql.JobExecContext, ingestionJob *jobs.Job, ) (hlc.Timestamp, bool, error) { - ctx, span := tracing.ChildSpan(ctx, "streamingest.revertToCutoverTimestamp") + ctx, span := tracing.ChildSpan(ctx, "physical.revertToCutoverTimestamp") defer span.Finish() // The update below sets the ReplicationStatus to diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/crosscluster/physical/stream_ingestion_job_test.go similarity index 99% rename from pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go rename to pkg/ccl/crosscluster/physical/stream_ingestion_job_test.go index abe7f6748fc1..a66f2d8a36d0 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_job_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" @@ -17,9 +17,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" _ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" - _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer" + _ "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/producer" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_manager_test.go b/pkg/ccl/crosscluster/physical/stream_ingestion_manager_test.go similarity index 99% rename from pkg/ccl/streamingccl/streamingest/stream_ingestion_manager_test.go rename to pkg/ccl/crosscluster/physical/stream_ingestion_manager_test.go index 81aac617cc32..71b15c8fc927 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_manager_test.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_manager_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/crosscluster/physical/stream_ingestion_planning.go similarity index 97% rename from pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go rename to pkg/ccl/crosscluster/physical/stream_ingestion_planning.go index 57d345a55986..f3138e3fde8f 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_planning.go @@ -6,13 +6,13 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -159,12 +159,12 @@ func ingestionPlanHook( return err } - streamAddress := streamingccl.StreamAddress(from) + streamAddress := crosscluster.StreamAddress(from) streamURL, err := streamAddress.URL() if err != nil { return err } - streamAddress = streamingccl.StreamAddress(streamURL.String()) + streamAddress = crosscluster.StreamAddress(streamURL.String()) if roachpb.IsSystemTenantName(roachpb.TenantName(dstTenantName)) || roachpb.IsSystemTenantID(dstTenantID) { @@ -233,7 +233,7 @@ func ingestionPlanHook( func createReplicationJob( ctx context.Context, p sql.PlanHookState, - streamAddress streamingccl.StreamAddress, + streamAddress crosscluster.StreamAddress, sourceTenant string, destinationTenantID roachpb.TenantID, retentionTTLSeconds int32, diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/crosscluster/physical/stream_ingestion_processor.go similarity index 98% rename from pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go rename to pkg/ccl/crosscluster/physical/stream_ingestion_processor.go index 654dccda6965..b9c28ee9fa7b 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_processor.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" @@ -15,9 +15,9 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -292,7 +292,7 @@ type streamIngestionProcessor struct { // PartitionEvent augments a normal event with the partition it came from. type PartitionEvent struct { - streamingccl.Event + crosscluster.Event partition string } @@ -441,7 +441,7 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) { streamClient = sip.forceClientForTests log.Infof(ctx, "using testing client") } else { - streamClient, err = streamclient.NewStreamClient(ctx, streamingccl.StreamAddress(addr), db, + streamClient, err = streamclient.NewStreamClient(ctx, crosscluster.StreamAddress(addr), db, streamclient.WithStreamID(streampb.StreamID(sip.spec.StreamID)), streamclient.WithCompression(compress.Get(&evalCtx.Settings.SV))) if err != nil { @@ -715,7 +715,7 @@ func (sip *streamIngestionProcessor) consumeEvents(ctx context.Context) error { func (sip *streamIngestionProcessor) handleEvent(event PartitionEvent) error { sv := &sip.FlowCtx.Cfg.Settings.SV - if event.Type() == streamingccl.KVEvent { + if event.Type() == crosscluster.KVEvent { sip.metrics.AdmitLatency.RecordValue( timeutil.Since(event.GetKVs()[0].KeyValue.Value.Timestamp.GoTime()).Nanoseconds()) } @@ -729,19 +729,19 @@ func (sip *streamIngestionProcessor) handleEvent(event PartitionEvent) error { } switch event.Type() { - case streamingccl.KVEvent: + case crosscluster.KVEvent: if err := sip.bufferKVs(event.GetKVs()); err != nil { return err } - case streamingccl.SSTableEvent: + case crosscluster.SSTableEvent: if err := sip.bufferSST(event.GetSSTable()); err != nil { return err } - case streamingccl.DeleteRangeEvent: + case crosscluster.DeleteRangeEvent: if err := sip.bufferDelRange(event.GetDeleteRange()); err != nil { return err } - case streamingccl.CheckpointEvent: + case crosscluster.CheckpointEvent: if err := sip.bufferCheckpoint(event); err != nil { return err } @@ -759,7 +759,7 @@ func (sip *streamIngestionProcessor) handleEvent(event PartitionEvent) error { return err } return nil - case streamingccl.SplitEvent: + case crosscluster.SplitEvent: if err := sip.handleSplitEvent(event.GetSplitEvent()); err != nil { return err } @@ -968,7 +968,7 @@ type rangeKeySST struct { // Flush all the range keys buffered so far into storage as an SST. func (r *rangeKeyBatcher) flush(ctx context.Context, toFlush mvccRangeKeyValues) error { - _, sp := tracing.ChildSpan(ctx, "streamingest.rangeKeyBatcher.flush") + _, sp := tracing.ChildSpan(ctx, "physical.rangeKeyBatcher.flush") defer sp.Finish() if len(toFlush) == 0 { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/crosscluster/physical/stream_ingestion_processor_test.go similarity index 91% rename from pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go rename to pkg/ccl/crosscluster/physical/stream_ingestion_processor_test.go index 74bdfdec72c3..aef756760a31 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/crosscluster/physical/stream_ingestion_processor_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingest +package physical import ( "context" @@ -19,9 +19,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -136,19 +136,19 @@ func TestStreamIngestionProcessor(t *testing.T) { } t.Run("finite stream client", func(t *testing.T) { - events := func() []streamingccl.Event { - return []streamingccl.Event{ - streamingccl.MakeKVEventFromKVs(sampleKV()), - streamingccl.MakeKVEventFromKVs(sampleKV()), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(p1Span, 2)), - streamingccl.MakeKVEventFromKVs(sampleKV()), - streamingccl.MakeKVEventFromKVs(sampleKV()), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(p1Span, 4)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(p2Span, 5)), + events := func() []crosscluster.Event { + return []crosscluster.Event{ + crosscluster.MakeKVEventFromKVs(sampleKV()), + crosscluster.MakeKVEventFromKVs(sampleKV()), + crosscluster.MakeCheckpointEvent(sampleCheckpoint(p1Span, 2)), + crosscluster.MakeKVEventFromKVs(sampleKV()), + crosscluster.MakeKVEventFromKVs(sampleKV()), + crosscluster.MakeCheckpointEvent(sampleCheckpoint(p1Span, 4)), + crosscluster.MakeCheckpointEvent(sampleCheckpoint(p2Span, 5)), } } mockClient := &streamclient.MockStreamClient{ - PartitionEvents: map[string][]streamingccl.Event{string(p1): events(), string(p2): events()}, + PartitionEvents: map[string][]crosscluster.Event{string(p1): events(), string(p2): events()}, } initialScanTimestamp := hlc.Timestamp{WallTime: 1} @@ -175,15 +175,15 @@ func TestStreamIngestionProcessor(t *testing.T) { "partition 2 should advance to timestamp 5") }) t.Run("time-based-flush", func(t *testing.T) { - events := func() []streamingccl.Event { - return []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(p1Span, 2)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(p1Span, 4)), + events := func() []crosscluster.Event { + return []crosscluster.Event{ + crosscluster.MakeCheckpointEvent(sampleCheckpoint(p1Span, 2)), + crosscluster.MakeCheckpointEvent(sampleCheckpoint(p1Span, 4)), } } mockClient := &streamclient.MockStreamClient{ DoneCh: make(chan struct{}), - PartitionEvents: map[string][]streamingccl.Event{string(p1): events()}, + PartitionEvents: map[string][]crosscluster.Event{string(p1): events()}, } initialScanTimestamp := hlc.Timestamp{WallTime: 1} @@ -218,16 +218,16 @@ func TestStreamIngestionProcessor(t *testing.T) { require.NoError(t, g.Wait()) }) t.Run("kv-size-based-flush", func(t *testing.T) { - events := func() []streamingccl.Event { - return []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(p1Span, 2)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(p1Span, 4)), - streamingccl.MakeKVEventFromKVs(sampleKV()), + events := func() []crosscluster.Event { + return []crosscluster.Event{ + crosscluster.MakeCheckpointEvent(sampleCheckpoint(p1Span, 2)), + crosscluster.MakeCheckpointEvent(sampleCheckpoint(p1Span, 4)), + crosscluster.MakeKVEventFromKVs(sampleKV()), } } mockClient := &streamclient.MockStreamClient{ DoneCh: make(chan struct{}), - PartitionEvents: map[string][]streamingccl.Event{string(p1): events()}, + PartitionEvents: map[string][]crosscluster.Event{string(p1): events()}, } initialScanTimestamp := hlc.Timestamp{WallTime: 1} @@ -265,11 +265,11 @@ func TestStreamIngestionProcessor(t *testing.T) { t.Run("range-kv-size-based-flush", func(t *testing.T) { key, err := keys.RewriteKeyToTenantPrefix(p1Key, keys.MakeTenantPrefix(roachpb.MustMakeTenantID(tenantID))) require.NoError(t, err) - events := func() []streamingccl.Event { - return []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(p1Span, 2)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(p1Span, 4)), - streamingccl.MakeDeleteRangeEvent(kvpb.RangeFeedDeleteRange{ + events := func() []crosscluster.Event { + return []crosscluster.Event{ + crosscluster.MakeCheckpointEvent(sampleCheckpoint(p1Span, 2)), + crosscluster.MakeCheckpointEvent(sampleCheckpoint(p1Span, 4)), + crosscluster.MakeDeleteRangeEvent(kvpb.RangeFeedDeleteRange{ Span: roachpb.Span{Key: key, EndKey: key.Next()}, Timestamp: hlc.Timestamp{WallTime: 5}, }), @@ -277,7 +277,7 @@ func TestStreamIngestionProcessor(t *testing.T) { } mockClient := &streamclient.MockStreamClient{ DoneCh: make(chan struct{}), - PartitionEvents: map[string][]streamingccl.Event{string(p1): events()}, + PartitionEvents: map[string][]crosscluster.Event{string(p1): events()}, } initialScanTimestamp := hlc.Timestamp{WallTime: 1} @@ -316,16 +316,16 @@ func TestStreamIngestionProcessor(t *testing.T) { // Two partitions, checkpoint for each, client start time for each should match t.Run("resume from checkpoint", func(t *testing.T) { - events := func() []streamingccl.Event { - return []streamingccl.Event{ - streamingccl.MakeKVEventFromKVs(sampleKV()), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(p2Span, 2)), - streamingccl.MakeKVEventFromKVs(sampleKV()), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(p1Span, 6)), + events := func() []crosscluster.Event { + return []crosscluster.Event{ + crosscluster.MakeKVEventFromKVs(sampleKV()), + crosscluster.MakeCheckpointEvent(sampleCheckpoint(p2Span, 2)), + crosscluster.MakeKVEventFromKVs(sampleKV()), + crosscluster.MakeCheckpointEvent(sampleCheckpoint(p1Span, 6)), } } mockClient := &streamclient.MockStreamClient{ - PartitionEvents: map[string][]streamingccl.Event{string(p1): events(), string(p2): events()}, + PartitionEvents: map[string][]crosscluster.Event{string(p1): events(), string(p2): events()}, } initialScanTimestamp := hlc.Timestamp{WallTime: 1} @@ -532,7 +532,7 @@ func TestRandomClientGeneration(t *testing.T) { streamAddr := getTestRandomClientURI(tenantID, tenantName) // The random client returns system and table data partitions. - streamClient, err := streamclient.NewStreamClient(ctx, streamingccl.StreamAddress(streamAddr), nil) + streamClient, err := streamclient.NewStreamClient(ctx, crosscluster.StreamAddress(streamAddr), nil) require.NoError(t, err) randomStreamClient, ok := streamClient.(*streamclient.RandomStreamClient) @@ -777,16 +777,16 @@ func noteKeyVal( func validateFnWithValidator( t *testing.T, validator *streamClientValidator, -) func(event streamingccl.Event, spec streamclient.SubscriptionToken) { - return func(event streamingccl.Event, spec streamclient.SubscriptionToken) { +) func(event crosscluster.Event, spec streamclient.SubscriptionToken) { + return func(event crosscluster.Event, spec streamclient.SubscriptionToken) { switch event.Type() { - case streamingccl.CheckpointEvent: + case crosscluster.CheckpointEvent: resolvedTS := resolvedSpansMinTS(event.GetResolvedSpans()) err := validator.noteResolved(string(spec), resolvedTS) if err != nil { panic(err.Error()) } - case streamingccl.SSTableEvent: + case crosscluster.SSTableEvent: kvs := storageutils.ScanSST(t, event.GetSSTable().Data) for _, keyVal := range kvs.MVCCKeyValues() { noteKeyVal(validator, []streampb.StreamEvent_KV{{ @@ -798,9 +798,9 @@ func validateFnWithValidator( }, }}}, spec) } - case streamingccl.KVEvent: + case crosscluster.KVEvent: noteKeyVal(validator, event.GetKVs(), spec) - case streamingccl.DeleteRangeEvent: + case crosscluster.DeleteRangeEvent: panic(errors.New("unsupported event type")) } } @@ -810,15 +810,15 @@ func validateFnWithValidator( // checkpoint events. func makeCheckpointEventCounter( mu *syncutil.Mutex, threshold int, f func(), -) func(streamingccl.Event, streamclient.SubscriptionToken) { +) func(crosscluster.Event, streamclient.SubscriptionToken) { mu.Lock() defer mu.Unlock() numCheckpointEventsGenerated := 0 - return func(event streamingccl.Event, _ streamclient.SubscriptionToken) { + return func(event crosscluster.Event, _ streamclient.SubscriptionToken) { mu.Lock() defer mu.Unlock() switch event.Type() { - case streamingccl.CheckpointEvent: + case crosscluster.CheckpointEvent: numCheckpointEventsGenerated++ if numCheckpointEventsGenerated == threshold { f() diff --git a/pkg/ccl/streamingccl/streamingest/testdata/add_early_cutover b/pkg/ccl/crosscluster/physical/testdata/add_early_cutover similarity index 100% rename from pkg/ccl/streamingccl/streamingest/testdata/add_early_cutover rename to pkg/ccl/crosscluster/physical/testdata/add_early_cutover diff --git a/pkg/ccl/streamingccl/streamingest/testdata/alter_tenant b/pkg/ccl/crosscluster/physical/testdata/alter_tenant similarity index 100% rename from pkg/ccl/streamingccl/streamingest/testdata/alter_tenant rename to pkg/ccl/crosscluster/physical/testdata/alter_tenant diff --git a/pkg/ccl/streamingccl/streamingest/testdata/backup_schedule_pauses_after_cutover b/pkg/ccl/crosscluster/physical/testdata/backup_schedule_pauses_after_cutover similarity index 100% rename from pkg/ccl/streamingccl/streamingest/testdata/backup_schedule_pauses_after_cutover rename to pkg/ccl/crosscluster/physical/testdata/backup_schedule_pauses_after_cutover diff --git a/pkg/ccl/streamingccl/streamingest/testdata/cutover_after_pause b/pkg/ccl/crosscluster/physical/testdata/cutover_after_pause similarity index 100% rename from pkg/ccl/streamingccl/streamingest/testdata/cutover_after_pause rename to pkg/ccl/crosscluster/physical/testdata/cutover_after_pause diff --git a/pkg/ccl/streamingccl/streamingest/testdata/expected_replication_frontier.txt b/pkg/ccl/crosscluster/physical/testdata/expected_replication_frontier.txt similarity index 100% rename from pkg/ccl/streamingccl/streamingest/testdata/expected_replication_frontier.txt rename to pkg/ccl/crosscluster/physical/testdata/expected_replication_frontier.txt diff --git a/pkg/ccl/streamingccl/streamingest/testdata/initial_scan_span_configs b/pkg/ccl/crosscluster/physical/testdata/initial_scan_span_configs similarity index 100% rename from pkg/ccl/streamingccl/streamingest/testdata/initial_scan_span_configs rename to pkg/ccl/crosscluster/physical/testdata/initial_scan_span_configs diff --git a/pkg/ccl/streamingccl/streamingest/testdata/jobs_post_cutover b/pkg/ccl/crosscluster/physical/testdata/jobs_post_cutover similarity index 100% rename from pkg/ccl/streamingccl/streamingest/testdata/jobs_post_cutover rename to pkg/ccl/crosscluster/physical/testdata/jobs_post_cutover diff --git a/pkg/ccl/streamingccl/streamingest/testdata/show_without_pts b/pkg/ccl/crosscluster/physical/testdata/show_without_pts similarity index 100% rename from pkg/ccl/streamingccl/streamingest/testdata/show_without_pts rename to pkg/ccl/crosscluster/physical/testdata/show_without_pts diff --git a/pkg/ccl/streamingccl/streamingest/testdata/simple b/pkg/ccl/crosscluster/physical/testdata/simple similarity index 100% rename from pkg/ccl/streamingccl/streamingest/testdata/simple rename to pkg/ccl/crosscluster/physical/testdata/simple diff --git a/pkg/ccl/streamingccl/streamingest/testdata/span_configs b/pkg/ccl/crosscluster/physical/testdata/span_configs similarity index 100% rename from pkg/ccl/streamingccl/streamingest/testdata/span_configs rename to pkg/ccl/crosscluster/physical/testdata/span_configs diff --git a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel b/pkg/ccl/crosscluster/producer/BUILD.bazel similarity index 91% rename from pkg/ccl/streamingccl/streamproducer/BUILD.bazel rename to pkg/ccl/crosscluster/producer/BUILD.bazel index 433aa368f8d3..7211e3e800de 100644 --- a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel +++ b/pkg/ccl/crosscluster/producer/BUILD.bazel @@ -1,7 +1,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( - name = "streamproducer", + name = "producer", srcs = [ "event_stream.go", "producer_job.go", @@ -10,12 +10,12 @@ go_library( "stream_event_batcher.go", "stream_lifetime.go", ], - importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer", + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/producer", visibility = ["//visibility:public"], deps = [ + "//pkg/ccl/crosscluster", + "//pkg/ccl/crosscluster/replicationutils", "//pkg/ccl/kvccl/kvfollowerreadsccl", - "//pkg/ccl/streamingccl", - "//pkg/ccl/streamingccl/replicationutils", "//pkg/ccl/utilccl", "//pkg/jobs", "//pkg/jobs/jobspb", @@ -69,7 +69,7 @@ go_library( ) go_test( - name = "streamproducer_test", + name = "producer_test", size = "large", srcs = [ "main_test.go", @@ -79,15 +79,15 @@ go_test( "stream_event_batcher_test.go", "stream_lifetime_test.go", ], - embed = [":streamproducer"], + embed = [":producer"], deps = [ "//pkg/base", "//pkg/ccl", "//pkg/ccl/changefeedccl", + "//pkg/ccl/crosscluster", + "//pkg/ccl/crosscluster/replicationtestutils", "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/storageccl", - "//pkg/ccl/streamingccl", - "//pkg/ccl/streamingccl/replicationtestutils", "//pkg/cloud/impl:cloudimpl", "//pkg/jobs", "//pkg/jobs/jobspb", diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/crosscluster/producer/event_stream.go similarity index 99% rename from pkg/ccl/streamingccl/streamproducer/event_stream.go rename to pkg/ccl/crosscluster/producer/event_stream.go index 47aa18cfe20e..8972133dbc46 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/crosscluster/producer/event_stream.go @@ -6,14 +6,14 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamproducer +package producer import ( "context" "fmt" "time" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" diff --git a/pkg/ccl/streamingccl/streamproducer/main_test.go b/pkg/ccl/crosscluster/producer/main_test.go similarity index 97% rename from pkg/ccl/streamingccl/streamproducer/main_test.go rename to pkg/ccl/crosscluster/producer/main_test.go index bae661f68be8..693151a5d90c 100644 --- a/pkg/ccl/streamingccl/streamproducer/main_test.go +++ b/pkg/ccl/crosscluster/producer/main_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamproducer_test +package producer_test import ( "os" diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job.go b/pkg/ccl/crosscluster/producer/producer_job.go similarity index 97% rename from pkg/ccl/streamingccl/streamproducer/producer_job.go rename to pkg/ccl/crosscluster/producer/producer_job.go index 0df686aff8aa..7316058b8abd 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job.go +++ b/pkg/ccl/crosscluster/producer/producer_job.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamproducer +package producer import ( "context" @@ -14,8 +14,8 @@ import ( "os/exec" "time" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -142,7 +142,7 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) er return ctx.Err() case <-p.timer.Ch(): p.timer.MarkRead() - p.timer.Reset(streamingccl.StreamReplicationStreamLivenessTrackFrequency.Get(execCfg.SV())) + p.timer.Reset(crosscluster.StreamReplicationStreamLivenessTrackFrequency.Get(execCfg.SV())) progress, err := replicationutils.LoadReplicationProgress(ctx, execCfg.InternalDB, p.job.ID()) if knobs := execCfg.StreamingTestingKnobs; knobs != nil && knobs.AfterResumerJobLoad != nil { err = knobs.AfterResumerJobLoad(err) diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go b/pkg/ccl/crosscluster/producer/producer_job_test.go similarity index 99% rename from pkg/ccl/streamingccl/streamproducer/producer_job_test.go rename to pkg/ccl/crosscluster/producer/producer_job_test.go index 40e8ffb1daea..35c99853696c 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go +++ b/pkg/ccl/crosscluster/producer/producer_job_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamproducer +package producer import ( "context" diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager.go b/pkg/ccl/crosscluster/producer/replication_manager.go similarity index 99% rename from pkg/ccl/streamingccl/streamproducer/replication_manager.go rename to pkg/ccl/crosscluster/producer/replication_manager.go index 5e5cf14fb9bf..e74ae5a4e1e7 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_manager.go +++ b/pkg/ccl/crosscluster/producer/replication_manager.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamproducer +package producer import ( "context" diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go b/pkg/ccl/crosscluster/producer/replication_manager_test.go similarity index 99% rename from pkg/ccl/streamingccl/streamproducer/replication_manager_test.go rename to pkg/ccl/crosscluster/producer/replication_manager_test.go index 145895ecac6a..74c9de6dd769 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_manager_test.go +++ b/pkg/ccl/crosscluster/producer/replication_manager_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamproducer +package producer import ( "context" diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/crosscluster/producer/replication_stream_test.go similarity index 98% rename from pkg/ccl/streamingccl/streamproducer/replication_stream_test.go rename to pkg/ccl/crosscluster/producer/replication_stream_test.go index 0e6020606ab1..8b31ba14e30c 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go +++ b/pkg/ccl/crosscluster/producer/replication_stream_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamproducer_test +package producer_test import ( "bytes" @@ -20,10 +20,10 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - _ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl" // Ensure changefeed init hooks run. + _ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl" // Ensure changefeed init hooks run. + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // Ensure we can start tenant. - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -75,7 +75,7 @@ var _ replicationtestutils.FeedSource = (*pgConnReplicationFeedSource)(nil) type pgConnEventDecoder interface { decode() - pop() streamingccl.Event + pop() crosscluster.Event } type eventDecoderFactory func(t *testing.T, rows pgx.Rows) pgConnEventDecoder @@ -104,18 +104,18 @@ func makePartitionStreamDecoder(t *testing.T, rows pgx.Rows) pgConnEventDecoder } } -func (d *partitionStreamDecoder) pop() streamingccl.Event { +func (d *partitionStreamDecoder) pop() crosscluster.Event { if d.e.Checkpoint != nil { // TODO(yevgeniy): Fix checkpoint handling and support backfill checkpoints. // For now, check that we only have one span in the checkpoint, and use that timestamp. require.Equal(d.t, 1, len(d.e.Checkpoint.ResolvedSpans)) - event := streamingccl.MakeCheckpointEvent(d.e.Checkpoint.ResolvedSpans) + event := crosscluster.MakeCheckpointEvent(d.e.Checkpoint.ResolvedSpans) d.e.Checkpoint = nil return event } if d.e.Batch != nil { - event := streamingccl.MakeKVEvent(d.e.Batch.KVs[0:1]) + event := crosscluster.MakeKVEvent(d.e.Batch.KVs[0:1]) d.e.Batch.KVs = d.e.Batch.KVs[1:] if len(d.e.Batch.KVs) == 0 { d.e.Batch = nil @@ -138,7 +138,7 @@ func (d *partitionStreamDecoder) decode() { } // Next implements the streamingtest.FeedSource interface. -func (f *pgConnReplicationFeedSource) Next() (streamingccl.Event, bool) { +func (f *pgConnReplicationFeedSource) Next() (crosscluster.Event, bool) { f.mu.Lock() defer f.mu.Unlock() // First check if there exists a decoded event. diff --git a/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go b/pkg/ccl/crosscluster/producer/span_config_event_stream.go similarity index 99% rename from pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go rename to pkg/ccl/crosscluster/producer/span_config_event_stream.go index 5150497aef1d..feb2c73c5be6 100644 --- a/pkg/ccl/streamingccl/streamproducer/span_config_event_stream.go +++ b/pkg/ccl/crosscluster/producer/span_config_event_stream.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamproducer +package producer import ( "context" diff --git a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go b/pkg/ccl/crosscluster/producer/stream_event_batcher.go similarity index 99% rename from pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go rename to pkg/ccl/crosscluster/producer/stream_event_batcher.go index 36b74b604de9..2bb2a469c8f6 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher.go +++ b/pkg/ccl/crosscluster/producer/stream_event_batcher.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamproducer +package producer import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" diff --git a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go b/pkg/ccl/crosscluster/producer/stream_event_batcher_test.go similarity index 98% rename from pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go rename to pkg/ccl/crosscluster/producer/stream_event_batcher_test.go index 789421752909..21370fc2bf22 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_event_batcher_test.go +++ b/pkg/ccl/crosscluster/producer/stream_event_batcher_test.go @@ -6,13 +6,13 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamproducer +package producer import ( "fmt" "testing" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/crosscluster/producer/stream_lifetime.go similarity index 98% rename from pkg/ccl/streamingccl/streamproducer/stream_lifetime.go rename to pkg/ccl/crosscluster/producer/stream_lifetime.go index 4d61054e6e33..6bf93356124c 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go +++ b/pkg/ccl/crosscluster/producer/stream_lifetime.go @@ -6,14 +6,14 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamproducer +package producer import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvfollowerreadsccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" @@ -342,7 +342,7 @@ func buildReplicationStreamSpec( PartitionSpec: &streampb.StreamPartitionSpec{ Spans: sp.Spans, Config: streampb.StreamPartitionSpec_ExecutionConfig{ - MinCheckpointFrequency: streamingccl.StreamReplicationMinCheckpointFrequency.Get(&evalCtx.Settings.SV), + MinCheckpointFrequency: crosscluster.StreamReplicationMinCheckpointFrequency.Get(&evalCtx.Settings.SV), }, }, }) @@ -458,7 +458,7 @@ func setupSpanConfigsStream( spec := streampb.SpanConfigEventStreamSpec{ Span: span, TenantID: tenantID, - MinCheckpointFrequency: streamingccl.StreamReplicationMinCheckpointFrequency.Get(&evalCtx.Settings.SV), + MinCheckpointFrequency: crosscluster.StreamReplicationMinCheckpointFrequency.Get(&evalCtx.Settings.SV), WrappedEvents: true, } return streamSpanConfigs(evalCtx, spec) diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime_test.go b/pkg/ccl/crosscluster/producer/stream_lifetime_test.go similarity index 98% rename from pkg/ccl/streamingccl/streamproducer/stream_lifetime_test.go rename to pkg/ccl/crosscluster/producer/stream_lifetime_test.go index 27fd1fe28b46..a005568fcb6d 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime_test.go +++ b/pkg/ccl/crosscluster/producer/stream_lifetime_test.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamproducer +package producer import ( "fmt" diff --git a/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel b/pkg/ccl/crosscluster/replicationtestutils/BUILD.bazel similarity index 90% rename from pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel rename to pkg/ccl/crosscluster/replicationtestutils/BUILD.bazel index 0c1e1993baac..b0a974af8e78 100644 --- a/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel +++ b/pkg/ccl/crosscluster/replicationtestutils/BUILD.bazel @@ -9,12 +9,12 @@ go_library( "span_config_helpers.go", "testutils.go", ], - importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils", + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils", visibility = ["//visibility:public"], deps = [ "//pkg/base", - "//pkg/ccl/streamingccl", - "//pkg/ccl/streamingccl/replicationutils", + "//pkg/ccl/crosscluster", + "//pkg/ccl/crosscluster/replicationutils", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", diff --git a/pkg/ccl/streamingccl/replicationtestutils/encoding.go b/pkg/ccl/crosscluster/replicationtestutils/encoding.go similarity index 100% rename from pkg/ccl/streamingccl/replicationtestutils/encoding.go rename to pkg/ccl/crosscluster/replicationtestutils/encoding.go diff --git a/pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go b/pkg/ccl/crosscluster/replicationtestutils/replication_helpers.go similarity index 94% rename from pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go rename to pkg/ccl/crosscluster/replicationtestutils/replication_helpers.go index 8a66a7b4cec1..3be09cdabfdc 100644 --- a/pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go +++ b/pkg/ccl/crosscluster/replicationtestutils/replication_helpers.go @@ -18,7 +18,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" @@ -36,15 +36,15 @@ import ( ) // FeedEventPredicate allows tests to search a ReplicationFeed. -type FeedEventPredicate func(message streamingccl.Event) bool +type FeedEventPredicate func(message crosscluster.Event) bool // FeedErrorPredicate allows tests to match an error from ReplicationFeed. type FeedErrorPredicate func(err error) bool // KeyMatches makes a FeedEventPredicate that matches a given key in a kv batch. func KeyMatches(key roachpb.Key) FeedEventPredicate { - return func(msg streamingccl.Event) bool { - if msg.Type() != streamingccl.KVEvent { + return func(msg crosscluster.Event) bool { + if msg.Type() != crosscluster.KVEvent { return false } for _, kv := range msg.GetKVs() { @@ -57,8 +57,8 @@ func KeyMatches(key roachpb.Key) FeedEventPredicate { } func AnySpanConfigMatches() FeedEventPredicate { - return func(msg streamingccl.Event) bool { - return msg.Type() == streamingccl.SpanConfigEvent + return func(msg crosscluster.Event) bool { + return msg.Type() == crosscluster.SpanConfigEvent } } @@ -75,8 +75,8 @@ func minResolvedTimestamp(resolvedSpans []jobspb.ResolvedSpan) hlc.Timestamp { // ResolvedAtLeast makes a FeedEventPredicate that matches when a timestamp has been // reached. func ResolvedAtLeast(lo hlc.Timestamp) FeedEventPredicate { - return func(msg streamingccl.Event) bool { - if msg.Type() != streamingccl.CheckpointEvent { + return func(msg crosscluster.Event) bool { + if msg.Type() != crosscluster.CheckpointEvent { return false } return lo.LessEq(minResolvedTimestamp(msg.GetResolvedSpans())) @@ -87,7 +87,7 @@ func ResolvedAtLeast(lo hlc.Timestamp) FeedEventPredicate { type FeedSource interface { // Next returns the next event, and a flag indicating if there are more events // to consume. - Next() (streamingccl.Event, bool) + Next() (crosscluster.Event, bool) // Error returns the error encountered in the feed. If present, it // is set after Next() indicates there is no more event to consume. @@ -101,7 +101,7 @@ type FeedSource interface { type ReplicationFeed struct { t *testing.T f FeedSource - msg streamingccl.Event + msg crosscluster.Event } // MakeReplicationFeed creates a ReplicationFeed based on a given FeedSource. @@ -147,7 +147,7 @@ func (rf *ReplicationFeed) ObserveResolved(ctx context.Context, lo hlc.Timestamp // ObserveError consumes the feed until the feed is exhausted, and the final error should // match 'errPred'. func (rf *ReplicationFeed) ObserveError(ctx context.Context, errPred FeedErrorPredicate) { - rf.consumeUntil(ctx, func(message streamingccl.Event) bool { + rf.consumeUntil(ctx, func(message crosscluster.Event) bool { return false }, errPred) } diff --git a/pkg/ccl/streamingccl/replicationtestutils/span_config_helpers.go b/pkg/ccl/crosscluster/replicationtestutils/span_config_helpers.go similarity index 100% rename from pkg/ccl/streamingccl/replicationtestutils/span_config_helpers.go rename to pkg/ccl/crosscluster/replicationtestutils/span_config_helpers.go diff --git a/pkg/ccl/streamingccl/replicationtestutils/testutils.go b/pkg/ccl/crosscluster/replicationtestutils/testutils.go similarity index 99% rename from pkg/ccl/streamingccl/replicationtestutils/testutils.go rename to pkg/ccl/crosscluster/replicationtestutils/testutils.go index 9fd6d8902398..70b6a1905486 100644 --- a/pkg/ccl/streamingccl/replicationtestutils/testutils.go +++ b/pkg/ccl/crosscluster/replicationtestutils/testutils.go @@ -20,7 +20,7 @@ import ( apd "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" diff --git a/pkg/ccl/streamingccl/replicationutils/BUILD.bazel b/pkg/ccl/crosscluster/replicationutils/BUILD.bazel similarity index 93% rename from pkg/ccl/streamingccl/replicationutils/BUILD.bazel rename to pkg/ccl/crosscluster/replicationutils/BUILD.bazel index 4491661b39d0..2a1a6614d33f 100644 --- a/pkg/ccl/streamingccl/replicationutils/BUILD.bazel +++ b/pkg/ccl/crosscluster/replicationutils/BUILD.bazel @@ -3,7 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "replicationutils", srcs = ["utils.go"], - importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils", + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils", visibility = ["//visibility:public"], deps = [ "//pkg/jobs", diff --git a/pkg/ccl/streamingccl/replicationutils/utils.go b/pkg/ccl/crosscluster/replicationutils/utils.go similarity index 100% rename from pkg/ccl/streamingccl/replicationutils/utils.go rename to pkg/ccl/crosscluster/replicationutils/utils.go diff --git a/pkg/ccl/streamingccl/replicationutils/utils_test.go b/pkg/ccl/crosscluster/replicationutils/utils_test.go similarity index 100% rename from pkg/ccl/streamingccl/replicationutils/utils_test.go rename to pkg/ccl/crosscluster/replicationutils/utils_test.go diff --git a/pkg/ccl/streamingccl/settings.go b/pkg/ccl/crosscluster/settings.go similarity index 99% rename from pkg/ccl/streamingccl/settings.go rename to pkg/ccl/crosscluster/settings.go index 1459450ff702..7e51a3b2f682 100644 --- a/pkg/ccl/streamingccl/settings.go +++ b/pkg/ccl/crosscluster/settings.go @@ -6,7 +6,7 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package streamingccl +package crosscluster import ( "time" diff --git a/pkg/ccl/streamingccl/streamclient/BUILD.bazel b/pkg/ccl/crosscluster/streamclient/BUILD.bazel similarity index 93% rename from pkg/ccl/streamingccl/streamclient/BUILD.bazel rename to pkg/ccl/crosscluster/streamclient/BUILD.bazel index 0d6cbe356d54..63fef3d374f4 100644 --- a/pkg/ccl/streamingccl/streamclient/BUILD.bazel +++ b/pkg/ccl/crosscluster/streamclient/BUILD.bazel @@ -12,10 +12,10 @@ go_library( "random_stream_client.go", "span_config_stream_client.go", ], - importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient", + importpath = "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient", visibility = ["//visibility:public"], deps = [ - "//pkg/ccl/streamingccl", + "//pkg/ccl/crosscluster", "//pkg/cloud/externalconn", "//pkg/jobs/jobspb", "//pkg/keys", @@ -67,10 +67,10 @@ go_test( deps = [ "//pkg/base", "//pkg/ccl", + "//pkg/ccl/crosscluster", + "//pkg/ccl/crosscluster/replicationtestutils", "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/storageccl", - "//pkg/ccl/streamingccl", - "//pkg/ccl/streamingccl/replicationtestutils", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/keys", diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/crosscluster/streamclient/client.go similarity index 96% rename from pkg/ccl/streamingccl/streamclient/client.go rename to pkg/ccl/crosscluster/streamclient/client.go index 40d14f72bf11..bd0b489de8be 100644 --- a/pkg/ccl/streamingccl/streamclient/client.go +++ b/pkg/ccl/crosscluster/streamclient/client.go @@ -13,7 +13,7 @@ import ( "fmt" "net/url" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/cloud/externalconn" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -184,7 +184,7 @@ type PartitionInfo struct { ID string SubscriptionToken SrcInstanceID int - SrcAddr streamingccl.PartitionAddress + SrcAddr crosscluster.PartitionAddress SrcLocality roachpb.Locality Spans []roachpb.Span } @@ -205,7 +205,7 @@ type Subscription interface { // Events is a channel receiving streaming events. // This channel is closed when no additional values will be sent to this channel. - Events() <-chan streamingccl.Event + Events() <-chan crosscluster.Event // Err is set once when Events channel closed -- must not be called before // the channel closes. @@ -214,7 +214,7 @@ type Subscription interface { // NewStreamClient creates a new stream client based on the stream address. func NewStreamClient( - ctx context.Context, streamAddress streamingccl.StreamAddress, db isql.DB, opts ...Option, + ctx context.Context, streamAddress crosscluster.StreamAddress, db isql.DB, opts ...Option, ) (Client, error) { var streamClient Client streamURL, err := streamAddress.URL() @@ -250,7 +250,7 @@ func NewStreamClient( func lookupExternalConnection( ctx context.Context, name string, localDB isql.DB, -) (streamingccl.StreamAddress, error) { +) (crosscluster.StreamAddress, error) { // Retrieve the external connection object from the system table. var ec externalconn.ExternalConnection if err := localDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { @@ -260,7 +260,7 @@ func lookupExternalConnection( }); err != nil { return "", errors.Wrap(err, "failed to load external connection object") } - return streamingccl.StreamAddress(ec.ConnectionProto().UnredactedURI()), nil + return crosscluster.StreamAddress(ec.ConnectionProto().UnredactedURI()), nil } // GetFirstActiveClient iterates through each provided stream address @@ -269,7 +269,7 @@ func GetFirstActiveClient( ctx context.Context, streamAddresses []string, db isql.DB, opts ...Option, ) (Client, error) { - newClient := func(ctx context.Context, address streamingccl.StreamAddress) (Dialer, error) { + newClient := func(ctx context.Context, address crosscluster.StreamAddress) (Dialer, error) { return NewStreamClient(ctx, address, db, opts...) } dialer, err := getFirstDialer(ctx, streamAddresses, newClient) @@ -279,7 +279,7 @@ func GetFirstActiveClient( return dialer.(Client), err } -type dialerFactory func(ctx context.Context, address streamingccl.StreamAddress) (Dialer, error) +type dialerFactory func(ctx context.Context, address crosscluster.StreamAddress) (Dialer, error) func getFirstDialer( ctx context.Context, streamAddresses []string, getNewDialer dialerFactory, @@ -289,7 +289,7 @@ func getFirstDialer( } var combinedError error = nil for _, address := range streamAddresses { - streamAddress := streamingccl.StreamAddress(address) + streamAddress := crosscluster.StreamAddress(address) clientCandidate, err := getNewDialer(ctx, streamAddress) if err == nil { err = clientCandidate.Dial(ctx) diff --git a/pkg/ccl/streamingccl/streamclient/client_helpers.go b/pkg/ccl/crosscluster/streamclient/client_helpers.go similarity index 83% rename from pkg/ccl/streamingccl/streamclient/client_helpers.go rename to pkg/ccl/crosscluster/streamclient/client_helpers.go index 7e0d83634394..925cf0cefca5 100644 --- a/pkg/ccl/streamingccl/streamclient/client_helpers.go +++ b/pkg/ccl/crosscluster/streamclient/client_helpers.go @@ -11,7 +11,7 @@ package streamclient import ( "context" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/golang/snappy" @@ -22,13 +22,13 @@ import ( func subscribeInternal( ctx context.Context, feed pgx.Rows, - eventCh chan streamingccl.Event, + eventCh chan crosscluster.Event, closeCh chan struct{}, compressed bool, ) error { // Get the next event from the cursor. var bufferedEvent *streampb.StreamEvent - getNextEvent := func() (streamingccl.Event, error) { + getNextEvent := func() (crosscluster.Event, error) { if e := parseEvent(bufferedEvent); e != nil { return e, nil } @@ -88,37 +88,37 @@ func subscribeInternal( } // parseEvent parses next event from the batch of events inside streampb.StreamEvent. -func parseEvent(streamEvent *streampb.StreamEvent) streamingccl.Event { +func parseEvent(streamEvent *streampb.StreamEvent) crosscluster.Event { if streamEvent == nil { return nil } if streamEvent.Checkpoint != nil { - event := streamingccl.MakeCheckpointEvent(streamEvent.Checkpoint.ResolvedSpans) + event := crosscluster.MakeCheckpointEvent(streamEvent.Checkpoint.ResolvedSpans) streamEvent.Checkpoint = nil return event } - var event streamingccl.Event + var event crosscluster.Event if streamEvent.Batch != nil { switch { case len(streamEvent.Batch.Ssts) > 0: - event = streamingccl.MakeSSTableEvent(streamEvent.Batch.Ssts[0]) + event = crosscluster.MakeSSTableEvent(streamEvent.Batch.Ssts[0]) streamEvent.Batch.Ssts = streamEvent.Batch.Ssts[1:] case len(streamEvent.Batch.DeprecatedKeyValues) > 0: - event = streamingccl.MakeKVEventFromKVs(streamEvent.Batch.DeprecatedKeyValues) + event = crosscluster.MakeKVEventFromKVs(streamEvent.Batch.DeprecatedKeyValues) streamEvent.Batch.DeprecatedKeyValues = nil case len(streamEvent.Batch.KVs) > 0: - event = streamingccl.MakeKVEvent(streamEvent.Batch.KVs) + event = crosscluster.MakeKVEvent(streamEvent.Batch.KVs) streamEvent.Batch.KVs = nil case len(streamEvent.Batch.DelRanges) > 0: - event = streamingccl.MakeDeleteRangeEvent(streamEvent.Batch.DelRanges[0]) + event = crosscluster.MakeDeleteRangeEvent(streamEvent.Batch.DelRanges[0]) streamEvent.Batch.DelRanges = streamEvent.Batch.DelRanges[1:] case len(streamEvent.Batch.SpanConfigs) > 0: - event = streamingccl.MakeSpanConfigEvent(streamEvent.Batch.SpanConfigs[0]) + event = crosscluster.MakeSpanConfigEvent(streamEvent.Batch.SpanConfigs[0]) streamEvent.Batch.SpanConfigs = streamEvent.Batch.SpanConfigs[1:] case len(streamEvent.Batch.SplitPoints) > 0: - event = streamingccl.MakeSplitEvent(streamEvent.Batch.SplitPoints[0]) + event = crosscluster.MakeSplitEvent(streamEvent.Batch.SplitPoints[0]) streamEvent.Batch.SplitPoints = streamEvent.Batch.SplitPoints[1:] } diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/crosscluster/streamclient/client_test.go similarity index 94% rename from pkg/ccl/streamingccl/streamclient/client_test.go rename to pkg/ccl/crosscluster/streamclient/client_test.go index 17c2348d8759..d203695e8749 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/crosscluster/streamclient/client_test.go @@ -16,7 +16,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -104,9 +104,9 @@ func (sc testStreamClient) Subscribe( Timestamp: hlc.Timestamp{WallTime: 100}, } - events := make(chan streamingccl.Event, 2) - events <- streamingccl.MakeKVEventFromKVs([]roachpb.KeyValue{sampleKV}) - events <- streamingccl.MakeCheckpointEvent([]jobspb.ResolvedSpan{sampleResolvedSpan}) + events := make(chan crosscluster.Event, 2) + events <- crosscluster.MakeKVEventFromKVs([]roachpb.KeyValue{sampleKV}) + events <- crosscluster.MakeCheckpointEvent([]jobspb.ResolvedSpan{sampleResolvedSpan}) close(events) return &testStreamSubscription{ @@ -127,7 +127,7 @@ func (sc testStreamClient) PriorReplicationDetails( } type testStreamSubscription struct { - eventCh chan streamingccl.Event + eventCh chan crosscluster.Event } // Subscribe implements the Subscription interface. @@ -136,7 +136,7 @@ func (t testStreamSubscription) Subscribe(_ context.Context) error { } // Events implements the Subscription interface. -func (t testStreamSubscription) Events() <-chan streamingccl.Event { +func (t testStreamSubscription) Events() <-chan crosscluster.Event { return t.eventCh } @@ -177,8 +177,8 @@ func TestExternalConnectionClient(t *testing.T) { sql.Exec(t, fmt.Sprintf(`CREATE EXTERNAL CONNECTION "%s" AS "%s"`, externalConnection, pgURL.String())) nonExistentConnection := "i-dont-exist" - address := streamingccl.StreamAddress(fmt.Sprintf("external://%s", externalConnection)) - dontExistAddress := streamingccl.StreamAddress(fmt.Sprintf("external://%s", nonExistentConnection)) + address := crosscluster.StreamAddress(fmt.Sprintf("external://%s", externalConnection)) + dontExistAddress := crosscluster.StreamAddress(fmt.Sprintf("external://%s", nonExistentConnection)) isqlDB := srv.InternalDB().(isql.DB) client, err := NewStreamClient(ctx, address, isqlDB) @@ -305,18 +305,18 @@ func ExampleClient() { // cutting over to the new cluster to move to the next stage. for event := range sub.Events() { switch event.Type() { - case streamingccl.KVEvent: + case crosscluster.KVEvent: kvs := event.GetKVs() for _, kv := range kvs { fmt.Printf("kv: %s->%s@%d\n", kv.KeyValue.Key.String(), string(kv.KeyValue.Value.RawBytes), kv.KeyValue.Value.Timestamp.WallTime) } - case streamingccl.SSTableEvent: + case crosscluster.SSTableEvent: sst := event.GetSSTable() fmt.Printf("sst: %s->%s@%d\n", sst.Span.String(), string(sst.Data), sst.WriteTS.WallTime) - case streamingccl.DeleteRangeEvent: + case crosscluster.DeleteRangeEvent: delRange := event.GetDeleteRange() fmt.Printf("delRange: %s@%d\n", delRange.Span.String(), delRange.Timestamp.WallTime) - case streamingccl.CheckpointEvent: + case crosscluster.CheckpointEvent: minTS := hlc.MaxTimestamp for _, rs := range event.GetResolvedSpans() { if rs.Timestamp.Less(minTS) { diff --git a/pkg/ccl/streamingccl/streamclient/heartbeat_sender.go b/pkg/ccl/crosscluster/streamclient/heartbeat_sender.go similarity index 97% rename from pkg/ccl/streamingccl/streamclient/heartbeat_sender.go rename to pkg/ccl/crosscluster/streamclient/heartbeat_sender.go index 0e37b5e24eb9..0f29a09d8d35 100644 --- a/pkg/ccl/streamingccl/streamclient/heartbeat_sender.go +++ b/pkg/ccl/crosscluster/streamclient/heartbeat_sender.go @@ -12,7 +12,7 @@ import ( "context" "time" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -98,7 +98,7 @@ func (h *HeartbeatSender) Start(ctx context.Context, ts timeutil.TimeSource) { continue } // The replication stream is either paused or inactive. - return streamingccl.NewStreamStatusErr(h.streamID, streamStatus.StreamStatus) + return crosscluster.NewStreamStatusErr(h.streamID, streamStatus.StreamStatus) } } err := sendHeartbeats() diff --git a/pkg/ccl/streamingccl/streamclient/heartbeat_sender_test.go b/pkg/ccl/crosscluster/streamclient/heartbeat_sender_test.go similarity index 100% rename from pkg/ccl/streamingccl/streamclient/heartbeat_sender_test.go rename to pkg/ccl/crosscluster/streamclient/heartbeat_sender_test.go diff --git a/pkg/ccl/streamingccl/streamclient/main_test.go b/pkg/ccl/crosscluster/streamclient/main_test.go similarity index 100% rename from pkg/ccl/streamingccl/streamclient/main_test.go rename to pkg/ccl/crosscluster/streamclient/main_test.go diff --git a/pkg/ccl/streamingccl/streamclient/mock_stream_client.go b/pkg/ccl/crosscluster/streamclient/mock_stream_client.go similarity index 93% rename from pkg/ccl/streamingccl/streamclient/mock_stream_client.go rename to pkg/ccl/crosscluster/streamclient/mock_stream_client.go index f431b02caa9e..d86c9c0c8a73 100644 --- a/pkg/ccl/streamingccl/streamclient/mock_stream_client.go +++ b/pkg/ccl/crosscluster/streamclient/mock_stream_client.go @@ -11,7 +11,7 @@ package streamclient import ( "context" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -24,7 +24,7 @@ import ( // partition being consumed. Stream partitions are identified by unique // partition addresses. type MockStreamClient struct { - PartitionEvents map[string][]streamingccl.Event + PartitionEvents map[string][]crosscluster.Event DoneCh chan struct{} HeartbeatErr error HeartbeatStatus streampb.StreamReplicationStatus @@ -63,7 +63,7 @@ func (m *MockStreamClient) PlanPhysicalReplication( } type mockSubscription struct { - eventsCh chan streamingccl.Event + eventsCh chan crosscluster.Event } // Subscribe implements the Subscription interface. @@ -72,7 +72,7 @@ func (m *mockSubscription) Subscribe(_ context.Context) error { } // Events implements the Subscription interface. -func (m *mockSubscription) Events() <-chan streamingccl.Event { +func (m *mockSubscription) Events() <-chan crosscluster.Event { return m.eventsCh } @@ -91,7 +91,7 @@ func (m *MockStreamClient) Subscribe( _ span.Frontier, _ ...SubscribeOption, ) (Subscription, error) { - var events []streamingccl.Event + var events []crosscluster.Event var ok bool if events, ok = m.PartitionEvents[string(token)]; !ok { return nil, errors.Newf("no events found for partition %s", string(token)) @@ -99,7 +99,7 @@ func (m *MockStreamClient) Subscribe( log.Infof(ctx, "%q beginning subscription from time %v ", string(token), initialScanTime) log.Infof(ctx, "%q emitting %d events", string(token), len(events)) - eventCh := make(chan streamingccl.Event, len(events)) + eventCh := make(chan crosscluster.Event, len(events)) for _, event := range events { log.Infof(ctx, "%q emitting event %v", string(token), event) eventCh <- event diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go similarity index 97% rename from pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go rename to pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go index 6ab6f0b9bf6c..e43ce57ce89f 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/crosscluster/streamclient/partitioned_stream_client.go @@ -16,7 +16,7 @@ import ( "net/url" "github.com/cockroachdb/apd/v3" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -181,7 +181,7 @@ func (p *partitionedStreamClient) createTopology( ID: sp.NodeID.String(), SubscriptionToken: SubscriptionToken(rawSpec), SrcInstanceID: int(sp.NodeID), - SrcAddr: streamingccl.PartitionAddress(pgURL.String()), + SrcAddr: crosscluster.PartitionAddress(pgURL.String()), SrcLocality: sp.Locality, Spans: sp.PartitionSpec.Spans, }) @@ -250,7 +250,7 @@ func (p *partitionedStreamClient) Subscribe( } res := &partitionedStreamSubscription{ - eventsChan: make(chan streamingccl.Event), + eventsChan: make(chan crosscluster.Event), srcConnConfig: p.pgxConfig, specBytes: specBytes, streamID: streamID, @@ -388,7 +388,7 @@ func (p *partitionedStreamClient) PriorReplicationDetails( type partitionedStreamSubscription struct { err error srcConnConfig *pgx.ConnConfig - eventsChan chan streamingccl.Event + eventsChan chan crosscluster.Event // Channel to send signal to close the subscription. closeChan chan struct{} @@ -433,7 +433,7 @@ func (p *partitionedStreamSubscription) Subscribe(ctx context.Context) error { } // Events implements the Subscription interface. -func (p *partitionedStreamSubscription) Events() <-chan streamingccl.Event { +func (p *partitionedStreamSubscription) Events() <-chan crosscluster.Event { return p.eventsChan } diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go b/pkg/ccl/crosscluster/streamclient/partitioned_stream_client_test.go similarity index 98% rename from pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go rename to pkg/ccl/crosscluster/streamclient/partitioned_stream_client_test.go index 2f2ad9c97b88..efa6e1b7e9c4 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go +++ b/pkg/ccl/crosscluster/streamclient/partitioned_stream_client_test.go @@ -16,10 +16,10 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // Ensure we can start tenant. - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -50,7 +50,7 @@ type subscriptionFeedSource struct { var _ replicationtestutils.FeedSource = (*subscriptionFeedSource)(nil) // Next implements the streamingtest.FeedSource interface. -func (f *subscriptionFeedSource) Next() (streamingccl.Event, bool) { +func (f *subscriptionFeedSource) Next() (crosscluster.Event, bool) { event, hasMore := <-f.sub.Events() return event, hasMore } @@ -320,7 +320,7 @@ INSERT INTO d.t2 VALUES (2); // Ignore table t2 and only subscribe to the changes to table t1. require.Equal(t, len(top.Partitions), 1) - url, err := streamingccl.StreamAddress(top.Partitions[0].SrcAddr).URL() + url, err := crosscluster.StreamAddress(top.Partitions[0].SrcAddr).URL() require.NoError(t, err) // Create a new stream client with the given partition address. subClient, err := streamclient.NewPartitionedStreamClient(ctx, url) diff --git a/pkg/ccl/streamingccl/streamclient/pgconn.go b/pkg/ccl/crosscluster/streamclient/pgconn.go similarity index 100% rename from pkg/ccl/streamingccl/streamclient/pgconn.go rename to pkg/ccl/crosscluster/streamclient/pgconn.go diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/crosscluster/streamclient/random_stream_client.go similarity index 95% rename from pkg/ccl/streamingccl/streamclient/random_stream_client.go rename to pkg/ccl/crosscluster/streamclient/random_stream_client.go index eb18c72227fe..ce520e601b69 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/crosscluster/streamclient/random_stream_client.go @@ -16,7 +16,7 @@ import ( "strconv" "time" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -98,7 +98,7 @@ func GetRandomStreamClientSingletonForTesting() *RandomStreamClient { // InterceptFn is a function that will intercept events emitted by // an InterceptableStreamClient -type InterceptFn func(event streamingccl.Event, spec SubscriptionToken) +type InterceptFn func(event crosscluster.Event, spec SubscriptionToken) // DialInterceptFn is a function that will intercept Dial calls made to an // InterceptableStreamClient @@ -260,8 +260,8 @@ func newRandomEventGenerator( }, nil } -func (r *randomEventGenerator) generateNewEvent() streamingccl.Event { - var event streamingccl.Event +func (r *randomEventGenerator) generateNewEvent() crosscluster.Event { + var event crosscluster.Event if r.numEventsSinceLastResolved == r.config.eventsPerCheckpoint { sp := r.tableDesc.TableSpan(r.codec) if r.config.startTenant { @@ -274,14 +274,14 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event { resolvedTime := timeutil.Now() hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()} resolvedSpan := jobspb.ResolvedSpan{Span: sp, Timestamp: hlcResolvedTime} - event = streamingccl.MakeCheckpointEvent([]jobspb.ResolvedSpan{resolvedSpan}) + event = crosscluster.MakeCheckpointEvent([]jobspb.ResolvedSpan{resolvedSpan}) r.numEventsSinceLastResolved = 0 } else { // If there are system KVs to emit, prioritize those. if len(r.systemKVs) > 0 { systemKV := r.systemKVs[0] systemKV.Value.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - event = streamingccl.MakeKVEventFromKVs([]roachpb.KeyValue{systemKV}) + event = crosscluster.MakeKVEventFromKVs([]roachpb.KeyValue{systemKV}) r.systemKVs = r.systemKVs[1:] return event } @@ -294,9 +294,9 @@ func (r *randomEventGenerator) generateNewEvent() streamingccl.Event { for i := 0; i < size; i++ { keyVals = append(keyVals, makeRandomKey(r.rng, r.config, r.codec, r.tableDesc)) } - event = streamingccl.MakeSSTableEvent(r.sstMaker(keyVals)) + event = crosscluster.MakeSSTableEvent(r.sstMaker(keyVals)) } else { - event = streamingccl.MakeKVEventFromKVs([]roachpb.KeyValue{makeRandomKey(r.rng, r.config, r.codec, r.tableDesc)}) + event = crosscluster.MakeKVEventFromKVs([]roachpb.KeyValue{makeRandomKey(r.rng, r.config, r.codec, r.tableDesc)}) } r.numEventsSinceLastResolved++ } @@ -412,7 +412,7 @@ func (m *RandomStreamClient) PlanPhysicalReplication( topology.Partitions = append(topology.Partitions, PartitionInfo{ ID: strconv.Itoa(i), - SrcAddr: streamingccl.PartitionAddress(partitionURI), + SrcAddr: crosscluster.PartitionAddress(partitionURI), SubscriptionToken: []byte(partitionURI), Spans: []roachpb.Span{tableDesc.TableSpan(srcCodec)}, }) @@ -521,7 +521,7 @@ func (m *RandomStreamClient) Subscribe( return nil, err } - eventCh := make(chan streamingccl.Event) + eventCh := make(chan crosscluster.Event) now := timeutil.Now() startWalltime := timeutil.Unix(0 /* sec */, initialScanTime.WallTime) if startWalltime.After(now) { @@ -543,9 +543,9 @@ func (m *RandomStreamClient) Subscribe( defer close(eventCh) dataEventInterval := config.eventFrequency - var lastEventCopy streamingccl.Event + var lastEventCopy crosscluster.Event for { - var event streamingccl.Event + var event crosscluster.Event if lastEventCopy != nil && rng.Float64() < config.dupProbability { event = duplicateEvent(lastEventCopy) } else { @@ -598,7 +598,7 @@ func (p *RandomStreamClient) PriorReplicationDetails( type randomStreamSubscription struct { receiveFn func(ctx context.Context) error - eventCh chan streamingccl.Event + eventCh chan crosscluster.Event } // Subscribe implements the Subscription interface. @@ -607,7 +607,7 @@ func (r *randomStreamSubscription) Subscribe(ctx context.Context) error { } // Events implements the Subscription interface. -func (r *randomStreamSubscription) Events() <-chan streamingccl.Event { +func (r *randomStreamSubscription) Events() <-chan crosscluster.Event { return r.eventCh } @@ -670,14 +670,14 @@ func makeRandomKey( } } -func duplicateEvent(event streamingccl.Event) streamingccl.Event { - var dup streamingccl.Event +func duplicateEvent(event crosscluster.Event) crosscluster.Event { + var dup crosscluster.Event switch event.Type() { - case streamingccl.CheckpointEvent: + case crosscluster.CheckpointEvent: resolvedSpans := make([]jobspb.ResolvedSpan, len(event.GetResolvedSpans())) copy(resolvedSpans, event.GetResolvedSpans()) - dup = streamingccl.MakeCheckpointEvent(resolvedSpans) - case streamingccl.KVEvent: + dup = crosscluster.MakeCheckpointEvent(resolvedSpans) + case crosscluster.KVEvent: kvs := event.GetKVs() res := make([]roachpb.KeyValue, len(kvs)) var a bufalloc.ByteAllocator @@ -686,12 +686,12 @@ func duplicateEvent(event streamingccl.Event) streamingccl.Event { res[i].Value.Timestamp = kvs[i].KeyValue.Value.Timestamp a, res[i].Value.RawBytes = a.Copy(kvs[i].KeyValue.Value.RawBytes, 0) } - dup = streamingccl.MakeKVEventFromKVs(res) - case streamingccl.SSTableEvent: + dup = crosscluster.MakeKVEventFromKVs(res) + case crosscluster.SSTableEvent: sst := event.GetSSTable() dataCopy := make([]byte, len(sst.Data)) copy(dataCopy, sst.Data) - dup = streamingccl.MakeSSTableEvent(kvpb.RangeFeedSSTable{ + dup = crosscluster.MakeSSTableEvent(kvpb.RangeFeedSSTable{ Data: dataCopy, Span: sst.Span.Clone(), WriteTS: sst.WriteTS, diff --git a/pkg/ccl/streamingccl/streamclient/span_config_stream_client.go b/pkg/ccl/crosscluster/streamclient/span_config_stream_client.go similarity index 95% rename from pkg/ccl/streamingccl/streamclient/span_config_stream_client.go rename to pkg/ccl/crosscluster/streamclient/span_config_stream_client.go index 8067c9c0bb4f..7859e203e888 100644 --- a/pkg/ccl/streamingccl/streamclient/span_config_stream_client.go +++ b/pkg/ccl/crosscluster/streamclient/span_config_stream_client.go @@ -12,7 +12,7 @@ import ( "context" "net/url" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -79,7 +79,7 @@ func GetFirstActiveSpanConfigClient( ctx context.Context, streamAddresses []string, db isql.DB, opts ...Option, ) (SpanConfigClient, error) { - newClient := func(ctx context.Context, address streamingccl.StreamAddress) (Dialer, error) { + newClient := func(ctx context.Context, address crosscluster.StreamAddress) (Dialer, error) { streamURL, err := address.URL() if err != nil { return nil, err @@ -106,7 +106,7 @@ func (p *spanConfigClient) Close(ctx context.Context) error { func (p *spanConfigClient) SetupSpanConfigsStream(tenant roachpb.TenantName) (Subscription, error) { p.subscription = spanConfigStreamSubscription{ - eventsChan: make(chan streamingccl.Event), + eventsChan: make(chan crosscluster.Event), srcConnConfig: p.pgxConfig, tenantName: tenant, closeChan: make(chan struct{}), @@ -118,7 +118,7 @@ func (p *spanConfigClient) SetupSpanConfigsStream(tenant roachpb.TenantName) (Su type spanConfigStreamSubscription struct { err error srcConnConfig *pgx.ConnConfig - eventsChan chan streamingccl.Event + eventsChan chan crosscluster.Event tenantName roachpb.TenantName // Channel to send signal to close the subscription. closeChan chan struct{} @@ -179,7 +179,7 @@ func (p *spanConfigStreamSubscription) Subscribe(ctx context.Context) error { } // Events implements the Subscription interface. -func (p *spanConfigStreamSubscription) Events() <-chan streamingccl.Event { +func (p *spanConfigStreamSubscription) Events() <-chan crosscluster.Event { return p.eventsChan } diff --git a/pkg/ccl/streamingccl/streamclient/span_config_stream_client_test.go b/pkg/ccl/crosscluster/streamclient/span_config_stream_client_test.go similarity index 95% rename from pkg/ccl/streamingccl/streamclient/span_config_stream_client_test.go rename to pkg/ccl/crosscluster/streamclient/span_config_stream_client_test.go index 3c0d43ee0851..94842202d305 100644 --- a/pkg/ccl/streamingccl/streamclient/span_config_stream_client_test.go +++ b/pkg/ccl/crosscluster/streamclient/span_config_stream_client_test.go @@ -14,8 +14,8 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" diff --git a/pkg/cmd/github-pull-request-make/main_test.go b/pkg/cmd/github-pull-request-make/main_test.go index 258e9f139145..ae5a1d0d222b 100644 --- a/pkg/cmd/github-pull-request-make/main_test.go +++ b/pkg/cmd/github-pull-request-make/main_test.go @@ -24,12 +24,12 @@ func TestPkgsFromDiff(t *testing.T) { "pkg/ccl/storageccl": makePkg([]string{"TestPutS3"}), }, datapathutils.TestDataPath(t, "modified.diff"): { - "pkg/ccl/streamingccl/streamingest": makePkg([]string{"TestStreamingAutoReplan"}), + "pkg/ccl/crosscluster/physical": makePkg([]string{"TestStreamingAutoReplan"}), }, datapathutils.TestDataPath(t, "removed.diff"): {}, datapathutils.TestDataPath(t, "not_go.diff"): {}, datapathutils.TestDataPath(t, "new_test.diff"): { - "pkg/ccl/streamingccl/streamclient": makePkg([]string{ + "pkg/ccl/crosscluster/streamclient": makePkg([]string{ "TestExternalConnectionClient", "TestGetFirstActiveClientEmpty", }), diff --git a/pkg/cmd/github-pull-request-make/testdata/modified.diff b/pkg/cmd/github-pull-request-make/testdata/modified.diff index ca048d2d0e8e..3b16f1143e2f 100644 --- a/pkg/cmd/github-pull-request-make/testdata/modified.diff +++ b/pkg/cmd/github-pull-request-make/testdata/modified.diff @@ -1,7 +1,7 @@ -diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go +diff --git a/pkg/ccl/crosscluster/physical/replication_stream_e2e_test.go b/pkg/ccl/crosscluster/physical/replication_stream_e2e_test.go index b92047e8651..a76b862ea8c 100644 ---- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go -+++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go +--- a/pkg/ccl/crosscluster/physical/replication_stream_e2e_test.go ++++ b/pkg/ccl/crosscluster/physical/replication_stream_e2e_test.go @@ -729,6 +729,7 @@ func TestStreamingAutoReplan(t *testing.T) { defer cleanup() // Don't allow for replanning until the new nodes and scattered table have been created. diff --git a/pkg/cmd/github-pull-request-make/testdata/new_test.diff b/pkg/cmd/github-pull-request-make/testdata/new_test.diff index d5755a30efd9..74d1554465e8 100644 --- a/pkg/cmd/github-pull-request-make/testdata/new_test.diff +++ b/pkg/cmd/github-pull-request-make/testdata/new_test.diff @@ -1,12 +1,12 @@ index 6c20afaef23..879494bdadd 100644 ---- a/pkg/ccl/streamingccl/streamclient/client_test.go -+++ b/pkg/ccl/streamingccl/streamclient/client_test.go +--- a/pkg/ccl/crosscluster/streamclient/client_test.go ++++ b/pkg/ccl/crosscluster/streamclient/client_test.go @@ -15,13 +15,19 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -43,8 +43,8 @@ abled}) + sql.Exec(t, fmt.Sprintf(`CREATE EXTERNAL CONNECTION "%s" AS "%s"`, + externalConnection, pgURL.String())) + nonExistentConnection := "i-dont-exist" -+ address := streamingccl.StreamAddress(fmt.Sprintf("external://%s", externalConnection)) -+ dontExistAddress := streamingccl.StreamAddress(fmt.Sprintf("external://%s", nonExistentConnection)) ++ address := crosscluster.StreamAddress(fmt.Sprintf("external://%s", externalConnection)) ++ dontExistAddress := crosscluster.StreamAddress(fmt.Sprintf("external://%s", nonExistentConnection)) + + isqlDB := srv.InternalDB().(isql.DB) + client, err := NewStreamClient(ctx, address, isqlDB) diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index 8a77082211ce..afd82b69d5a6 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -206,8 +206,8 @@ go_library( "//pkg/ccl/changefeedccl", "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/crosscluster/replicationutils", "//pkg/ccl/storageccl/engineccl/enginepbccl", - "//pkg/ccl/streamingccl/replicationutils", "//pkg/cli", "//pkg/cli/clisqlclient", "//pkg/cloud", diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index 24911ca537f6..0e6dc68fe0d1 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -23,7 +23,7 @@ import ( "time" apd "github.com/cockroachdb/apd/v3" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" diff --git a/pkg/server/application_api/BUILD.bazel b/pkg/server/application_api/BUILD.bazel index 4d881fa70768..15ac49cc1540 100644 --- a/pkg/server/application_api/BUILD.bazel +++ b/pkg/server/application_api/BUILD.bazel @@ -40,7 +40,7 @@ go_test( deps = [ "//pkg/base", "//pkg/ccl", - "//pkg/ccl/streamingccl", + "//pkg/ccl/crosscluster", "//pkg/config/zonepb", "//pkg/jobs", "//pkg/jobs/jobspb", diff --git a/pkg/server/application_api/config_test.go b/pkg/server/application_api/config_test.go index 5387b2f4c89e..b244cf5be2e9 100644 --- a/pkg/server/application_api/config_test.go +++ b/pkg/server/application_api/config_test.go @@ -22,7 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl" // To ensure the streaming replication cluster setting is defined. - _ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + _ "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster" "github.com/cockroachdb/cockroach/pkg/server/apiconstants" "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/server/srvtestutils" diff --git a/pkg/sql/ttl/ttljob/BUILD.bazel b/pkg/sql/ttl/ttljob/BUILD.bazel index 34ce6ec2ca38..8f8df52fd466 100644 --- a/pkg/sql/ttl/ttljob/BUILD.bazel +++ b/pkg/sql/ttl/ttljob/BUILD.bazel @@ -71,8 +71,8 @@ go_test( shard_count = 4, deps = [ "//pkg/base", + "//pkg/ccl/crosscluster/replicationtestutils", "//pkg/ccl/kvccl/kvtenantccl", - "//pkg/ccl/streamingccl/replicationtestutils", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/jobs/jobstest", diff --git a/pkg/sql/ttl/ttljob/ttljob_processor_test.go b/pkg/sql/ttl/ttljob/ttljob_processor_test.go index 647ce28d9244..4bf2d867881b 100644 --- a/pkg/sql/ttl/ttljob/ttljob_processor_test.go +++ b/pkg/sql/ttl/ttljob/ttljob_processor_test.go @@ -16,7 +16,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index bdc8808fcc73..d4d182b12022 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -2466,13 +2466,13 @@ func TestLint(t *testing.T) { ":!ccl/partitionccl/partition_test.go", ":!ccl/partitionccl/zone_test.go", ":!ccl/serverccl/admin_test.go", - ":!ccl/streamingccl/replicationtestutils/testutils.go", - ":!ccl/streamingccl/streamclient/partitioned_stream_client_test.go", - ":!ccl/streamingccl/streamingest/replication_random_client_test.go", - ":!ccl/streamingccl/streamingest/stream_ingestion_job_test.go", - ":!ccl/streamingccl/streamingest/stream_ingestion_processor_test.go", - ":!ccl/streamingccl/streamproducer/producer_job_test.go", - ":!ccl/streamingccl/streamproducer/replication_stream_test.go", + ":!ccl/crosscluster/replicationtestutils/testutils.go", + ":!ccl/crosscluster/streamclient/partitioned_stream_client_test.go", + ":!ccl/crosscluster/physical/replication_random_client_test.go", + ":!ccl/crosscluster/physical/stream_ingestion_job_test.go", + ":!ccl/crosscluster/physical/stream_ingestion_processor_test.go", + ":!ccl/crosscluster/producer/producer_job_test.go", + ":!ccl/crosscluster/producer/replication_stream_test.go", ":!ccl/workloadccl/allccl/all_test.go", ":!cli/democluster/demo_cluster.go", ":!cli/democluster/demo_cluster_test.go",