Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: redpanda-data/connect
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: c8148a6500d998878f0e14687f07a23a59f4e074
Choose a base ref
..
head repository: redpanda-data/connect
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 22076dcf0c4aff01cd6d7e399ff6e08710098677
Choose a head ref
360 changes: 360 additions & 0 deletions internal/impl/kafka/enterprise/integration_foobar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,360 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Licensed as a Redpanda Enterprise file under the Redpanda Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md

package enterprise_test

import (
"context"
"fmt"
"log"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

_ "github.com/redpanda-data/benthos/v4/public/components/pure"
"github.com/redpanda-data/benthos/v4/public/service"

"github.com/redpanda-data/connect/v4/internal/license"
_ "github.com/redpanda-data/connect/v4/public/components/confluent"
)

func Foobar(t *testing.T) {
// pool, err := dockertest.NewPool("")
// require.NoError(t, err)
// pool.MaxWait = time.Minute

// source, err := startRedpanda(t, pool, true, true)
// require.NoError(t, err)

var source redpandaEndpoints

Check failure on line 36 in internal/impl/kafka/enterprise/integration_foobar_test.go

GitHub Actions / test

undefined: redpandaEndpoints

Check failure on line 36 in internal/impl/kafka/enterprise/integration_foobar_test.go

GitHub Actions / golangci-lint

undefined: redpandaEndpoints
source.brokerAddr = "localhost:9092"
source.schemaRegistryURL = "http://localhost:8081"

dummyTopic := "test"

// Create a schema associated with the test topic
createSchema(t, source.schemaRegistryURL, dummyTopic, fmt.Sprintf(`{"name":"%s", "type": "record", "fields":[{"name":"test", "type": "string"}]}`, dummyTopic), nil)

// Produce one message
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
pipeline:
processors:
- schema_registry_encode:
url: %s
subject: %s
avro_raw_json: true
output:
kafka_franz:
seed_brokers: [ %s ]
topic: %s
`, source.schemaRegistryURL, dummyTopic, source.brokerAddr, dummyTopic)))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`))

inFunc, err := streamBuilder.AddProducerFunc()
require.NoError(t, err)

stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

ctx, done := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(done)

go func() {
require.NoError(t, inFunc(ctx, service.NewMessage([]byte(`{"test":"foobar"}`))))

require.NoError(t, stream.StopWithin(1*time.Second))
}()

err = stream.Run(ctx)
require.NoError(t, err)

// Read the message using a consumer group
dummyConsumerGroup := "test"

streamBuilder = service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
input:
redpanda:
seed_brokers: [ %s ]
topics: [ %s ]
consumer_group: %s
start_from_oldest: true
processors:
- schema_registry_decode:
url: %s
avro_raw_json: true
`, source.brokerAddr, dummyTopic, dummyConsumerGroup, source.schemaRegistryURL)))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: info`))

recvChan := make(chan struct{})
err = streamBuilder.AddConsumerFunc(func(ctx context.Context, m *service.Message) error {
b, err := m.AsBytes()
require.NoError(t, err)
assert.Equal(t, `{"test":"foobar"}`, string(b))

close(recvChan)
return nil
})
require.NoError(t, err)

stream, err = streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

ctx, done = context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(done)

go func() {
require.NoError(t, stream.Run(ctx))

println("WTF")
}()

<-recvChan

require.NoError(t, stream.StopWithin(3*time.Second))

time.Sleep(3 * time.Second)

println("WTF3")
}

func TestFoobarWorks(t *testing.T) {
var source redpandaEndpoints

Check failure on line 134 in internal/impl/kafka/enterprise/integration_foobar_test.go

GitHub Actions / test

undefined: redpandaEndpoints

Check failure on line 134 in internal/impl/kafka/enterprise/integration_foobar_test.go

GitHub Actions / golangci-lint

undefined: redpandaEndpoints
source.brokerAddr = "localhost:9092"
source.schemaRegistryURL = "http://localhost:8081"

var destination redpandaEndpoints

Check failure on line 138 in internal/impl/kafka/enterprise/integration_foobar_test.go

GitHub Actions / test

undefined: redpandaEndpoints

Check failure on line 138 in internal/impl/kafka/enterprise/integration_foobar_test.go

GitHub Actions / golangci-lint

undefined: redpandaEndpoints
destination.brokerAddr = "localhost:9093"
destination.schemaRegistryURL = "http://localhost:8082"

dummyTopic := "test"

// Create a schema associated with the test topic
createSchema(t, source.schemaRegistryURL, dummyTopic, fmt.Sprintf(`{"name":"%s", "type": "record", "fields":[{"name":"test", "type": "string"}]}`, dummyTopic), nil)

{
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
input:
broker:
inputs:
- redpanda_migrator:
seed_brokers: [ %s ]
topics: [ %s ]
consumer_group: blobfish
start_from_oldest: true
replication_factor_override: true
replication_factor: -1
- redpanda_migrator_offsets:
seed_brokers: [ %s ]
topics: [ %s ]
processors:
- log:
message: meta ${! @ }
- log:
message: content ${! content() }
output:
redpanda_migrator:
seed_brokers: [ %s ]
topic: test
replication_factor_override: true
replication_factor: -1
`, source.brokerAddr, dummyTopic, source.brokerAddr, dummyTopic, destination.brokerAddr)))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: INFO`))

stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

// Run stream in the background and shut it down when the test is finished
migratorCloseChan := make(chan struct{})
go func() {
err = stream.Run(context.Background())
require.NoError(t, err)

t.Log("Migrator shut down")

close(migratorCloseChan)
}()
t.Cleanup(func() {
require.NoError(t, stream.StopWithin(3*time.Second))

<-migratorCloseChan
})
}

for {
log.Println("loop")
time.Sleep(1 * time.Second)
}
}

func TestFoobarFails(t *testing.T) {
// pool, err := dockertest.NewPool("")
// require.NoError(t, err)
// pool.MaxWait = time.Minute

// source, err := startRedpanda(t, pool, true, true)
// require.NoError(t, err)

var source redpandaEndpoints

Check failure on line 214 in internal/impl/kafka/enterprise/integration_foobar_test.go

GitHub Actions / test

undefined: redpandaEndpoints

Check failure on line 214 in internal/impl/kafka/enterprise/integration_foobar_test.go

GitHub Actions / golangci-lint

undefined: redpandaEndpoints
source.brokerAddr = "localhost:9092"
source.schemaRegistryURL = "http://localhost:8081"

var destination redpandaEndpoints

Check failure on line 218 in internal/impl/kafka/enterprise/integration_foobar_test.go

GitHub Actions / test

undefined: redpandaEndpoints

Check failure on line 218 in internal/impl/kafka/enterprise/integration_foobar_test.go

GitHub Actions / golangci-lint

undefined: redpandaEndpoints (typecheck)
destination.brokerAddr = "localhost:9093"
destination.schemaRegistryURL = "http://localhost:8082"

dummyTopic := "test"

// Create a schema associated with the test topic
createSchema(t, source.schemaRegistryURL, dummyTopic, fmt.Sprintf(`{"name":"%s", "type": "record", "fields":[{"name":"test", "type": "string"}]}`, dummyTopic), nil)

{
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
input:
redpanda_migrator_bundle:
redpanda_migrator:
seed_brokers: [ %s ]
topics: [ %s ]
consumer_group: blobfish
start_from_oldest: true
replication_factor_override: true
replication_factor: -1
schema_registry:
url: %s
processors:
- log:
message: meta ${! @ }
- log:
message: content ${! content() }
output:
redpanda_migrator_bundle:
redpanda_migrator:
seed_brokers: [ %s ]
replication_factor_override: true
replication_factor: -1
schema_registry:
url: %s
`, source.brokerAddr, dummyTopic, source.schemaRegistryURL, destination.brokerAddr, destination.schemaRegistryURL)))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: INFO`))

stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

// Run stream in the background and shut it down when the test is finished
migratorCloseChan := make(chan struct{})
go func() {
err = stream.Run(context.Background())
require.NoError(t, err)

t.Log("Migrator shut down")

close(migratorCloseChan)
}()
t.Cleanup(func() {
require.NoError(t, stream.StopWithin(3*time.Second))

<-migratorCloseChan
})
}

for {
log.Println("loop")
time.Sleep(1 * time.Second)
}
return
// Produce one message
{
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
pipeline:
processors:
- schema_registry_encode:
url: %s
subject: %s
avro_raw_json: true
output:
kafka_franz:
seed_brokers: [ %s ]
topic: %s
`, source.schemaRegistryURL, dummyTopic, source.brokerAddr, dummyTopic)))
// require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`))

inFunc, err := streamBuilder.AddProducerFunc()
require.NoError(t, err)

stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

ctx, done := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(done)

go func() {
require.NoError(t, inFunc(ctx, service.NewMessage([]byte(`{"test":"foobar"}`))))

require.NoError(t, stream.StopWithin(1*time.Second))
}()

err = stream.Run(ctx)
require.NoError(t, err)
}
return
{
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
pipeline:
processors:
- schema_registry_encode:
url: %s
subject: %s
avro_raw_json: true
output:
kafka_franz:
seed_brokers: [ %s ]
topic: %s
`, source.schemaRegistryURL, dummyTopic, source.brokerAddr, dummyTopic)))
// require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`))

inFunc, err := streamBuilder.AddProducerFunc()
require.NoError(t, err)

stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

ctx, done := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(done)

go func() {
require.NoError(t, inFunc(ctx, service.NewMessage([]byte(`{"test":"foobar"}`))))

require.NoError(t, stream.StopWithin(1*time.Second))
}()

err = stream.Run(ctx)
require.NoError(t, err)
}

}
2 changes: 1 addition & 1 deletion internal/impl/postgresql/pglogicalstream/logical_stream.go
Original file line number Diff line number Diff line change
@@ -144,7 +144,7 @@ func NewPgStream(ctx context.Context, config *Config) (*Stream, error) {

pubName := "pglog_stream_" + config.ReplicationSlotName
stream.logger.Infof("Creating publication %s for tables: %s", pubName, tableNames)
if err = CreatePublication(ctx, stream.pgConn, pubName, tableNames); err != nil {
if err = CreatePublication(ctx, stream.pgConn, pubName, config.DBSchema, tableNames); err != nil {
return nil, err
}
cleanups = append(cleanups, func() {
Loading