Skip to content

Commit

Permalink
Fix postgres_cdc input
Browse files Browse the repository at this point in the history
Allow quoted identifiers for the table names

Signed-off-by: Mihai Todor <[email protected]>
  • Loading branch information
mihaitodor committed Dec 13, 2024
1 parent 65aadd2 commit 22076dc
Show file tree
Hide file tree
Showing 6 changed files with 435 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file.
### Fixed

- `gcp_bigquery` output with parquet format no longer returns errors incorrectly. (@rockwotj)
- `postgres_cdc` input now allows quoted identifiers for the table names. (@mihaitodor)

## 4.43.1 - 2024-12-09

Expand Down
360 changes: 360 additions & 0 deletions internal/impl/kafka/enterprise/integration_foobar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,360 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Licensed as a Redpanda Enterprise file under the Redpanda Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md

package enterprise_test

import (
"context"
"fmt"
"log"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

_ "github.com/redpanda-data/benthos/v4/public/components/pure"
"github.com/redpanda-data/benthos/v4/public/service"

"github.com/redpanda-data/connect/v4/internal/license"
_ "github.com/redpanda-data/connect/v4/public/components/confluent"
)

func Foobar(t *testing.T) {
// pool, err := dockertest.NewPool("")
// require.NoError(t, err)
// pool.MaxWait = time.Minute

// source, err := startRedpanda(t, pool, true, true)
// require.NoError(t, err)

var source redpandaEndpoints

Check failure on line 36 in internal/impl/kafka/enterprise/integration_foobar_test.go

View workflow job for this annotation

GitHub Actions / test

undefined: redpandaEndpoints

Check failure on line 36 in internal/impl/kafka/enterprise/integration_foobar_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: redpandaEndpoints
source.brokerAddr = "localhost:9092"
source.schemaRegistryURL = "http://localhost:8081"

dummyTopic := "test"

// Create a schema associated with the test topic
createSchema(t, source.schemaRegistryURL, dummyTopic, fmt.Sprintf(`{"name":"%s", "type": "record", "fields":[{"name":"test", "type": "string"}]}`, dummyTopic), nil)

// Produce one message
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
pipeline:
processors:
- schema_registry_encode:
url: %s
subject: %s
avro_raw_json: true
output:
kafka_franz:
seed_brokers: [ %s ]
topic: %s
`, source.schemaRegistryURL, dummyTopic, source.brokerAddr, dummyTopic)))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`))

inFunc, err := streamBuilder.AddProducerFunc()
require.NoError(t, err)

stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

ctx, done := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(done)

go func() {
require.NoError(t, inFunc(ctx, service.NewMessage([]byte(`{"test":"foobar"}`))))

require.NoError(t, stream.StopWithin(1*time.Second))
}()

err = stream.Run(ctx)
require.NoError(t, err)

// Read the message using a consumer group
dummyConsumerGroup := "test"

streamBuilder = service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
input:
redpanda:
seed_brokers: [ %s ]
topics: [ %s ]
consumer_group: %s
start_from_oldest: true
processors:
- schema_registry_decode:
url: %s
avro_raw_json: true
`, source.brokerAddr, dummyTopic, dummyConsumerGroup, source.schemaRegistryURL)))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: info`))

recvChan := make(chan struct{})
err = streamBuilder.AddConsumerFunc(func(ctx context.Context, m *service.Message) error {
b, err := m.AsBytes()
require.NoError(t, err)
assert.Equal(t, `{"test":"foobar"}`, string(b))

close(recvChan)
return nil
})
require.NoError(t, err)

stream, err = streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

ctx, done = context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(done)

go func() {
require.NoError(t, stream.Run(ctx))

println("WTF")
}()

<-recvChan

require.NoError(t, stream.StopWithin(3*time.Second))

time.Sleep(3 * time.Second)

println("WTF3")
}

func TestFoobarWorks(t *testing.T) {
var source redpandaEndpoints

Check failure on line 134 in internal/impl/kafka/enterprise/integration_foobar_test.go

View workflow job for this annotation

GitHub Actions / test

undefined: redpandaEndpoints

Check failure on line 134 in internal/impl/kafka/enterprise/integration_foobar_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: redpandaEndpoints
source.brokerAddr = "localhost:9092"
source.schemaRegistryURL = "http://localhost:8081"

var destination redpandaEndpoints

Check failure on line 138 in internal/impl/kafka/enterprise/integration_foobar_test.go

View workflow job for this annotation

GitHub Actions / test

undefined: redpandaEndpoints

Check failure on line 138 in internal/impl/kafka/enterprise/integration_foobar_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: redpandaEndpoints
destination.brokerAddr = "localhost:9093"
destination.schemaRegistryURL = "http://localhost:8082"

dummyTopic := "test"

// Create a schema associated with the test topic
createSchema(t, source.schemaRegistryURL, dummyTopic, fmt.Sprintf(`{"name":"%s", "type": "record", "fields":[{"name":"test", "type": "string"}]}`, dummyTopic), nil)

{
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
input:
broker:
inputs:
- redpanda_migrator:
seed_brokers: [ %s ]
topics: [ %s ]
consumer_group: blobfish
start_from_oldest: true
replication_factor_override: true
replication_factor: -1
- redpanda_migrator_offsets:
seed_brokers: [ %s ]
topics: [ %s ]
processors:
- log:
message: meta ${! @ }
- log:
message: content ${! content() }
output:
redpanda_migrator:
seed_brokers: [ %s ]
topic: test
replication_factor_override: true
replication_factor: -1
`, source.brokerAddr, dummyTopic, source.brokerAddr, dummyTopic, destination.brokerAddr)))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: INFO`))

stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

// Run stream in the background and shut it down when the test is finished
migratorCloseChan := make(chan struct{})
go func() {
err = stream.Run(context.Background())
require.NoError(t, err)

t.Log("Migrator shut down")

close(migratorCloseChan)
}()
t.Cleanup(func() {
require.NoError(t, stream.StopWithin(3*time.Second))

<-migratorCloseChan
})
}

for {
log.Println("loop")
time.Sleep(1 * time.Second)
}
}

func TestFoobarFails(t *testing.T) {
// pool, err := dockertest.NewPool("")
// require.NoError(t, err)
// pool.MaxWait = time.Minute

// source, err := startRedpanda(t, pool, true, true)
// require.NoError(t, err)

var source redpandaEndpoints

Check failure on line 214 in internal/impl/kafka/enterprise/integration_foobar_test.go

View workflow job for this annotation

GitHub Actions / test

undefined: redpandaEndpoints

Check failure on line 214 in internal/impl/kafka/enterprise/integration_foobar_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: redpandaEndpoints
source.brokerAddr = "localhost:9092"
source.schemaRegistryURL = "http://localhost:8081"

var destination redpandaEndpoints

Check failure on line 218 in internal/impl/kafka/enterprise/integration_foobar_test.go

View workflow job for this annotation

GitHub Actions / test

undefined: redpandaEndpoints

Check failure on line 218 in internal/impl/kafka/enterprise/integration_foobar_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: redpandaEndpoints (typecheck)
destination.brokerAddr = "localhost:9093"
destination.schemaRegistryURL = "http://localhost:8082"

dummyTopic := "test"

// Create a schema associated with the test topic
createSchema(t, source.schemaRegistryURL, dummyTopic, fmt.Sprintf(`{"name":"%s", "type": "record", "fields":[{"name":"test", "type": "string"}]}`, dummyTopic), nil)

{
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
input:
redpanda_migrator_bundle:
redpanda_migrator:
seed_brokers: [ %s ]
topics: [ %s ]
consumer_group: blobfish
start_from_oldest: true
replication_factor_override: true
replication_factor: -1
schema_registry:
url: %s
processors:
- log:
message: meta ${! @ }
- log:
message: content ${! content() }
output:
redpanda_migrator_bundle:
redpanda_migrator:
seed_brokers: [ %s ]
replication_factor_override: true
replication_factor: -1
schema_registry:
url: %s
`, source.brokerAddr, dummyTopic, source.schemaRegistryURL, destination.brokerAddr, destination.schemaRegistryURL)))
require.NoError(t, streamBuilder.SetLoggerYAML(`level: INFO`))

stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

// Run stream in the background and shut it down when the test is finished
migratorCloseChan := make(chan struct{})
go func() {
err = stream.Run(context.Background())
require.NoError(t, err)

t.Log("Migrator shut down")

close(migratorCloseChan)
}()
t.Cleanup(func() {
require.NoError(t, stream.StopWithin(3*time.Second))

<-migratorCloseChan
})
}

for {
log.Println("loop")
time.Sleep(1 * time.Second)
}
return
// Produce one message
{
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
pipeline:
processors:
- schema_registry_encode:
url: %s
subject: %s
avro_raw_json: true
output:
kafka_franz:
seed_brokers: [ %s ]
topic: %s
`, source.schemaRegistryURL, dummyTopic, source.brokerAddr, dummyTopic)))
// require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`))

inFunc, err := streamBuilder.AddProducerFunc()
require.NoError(t, err)

stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

ctx, done := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(done)

go func() {
require.NoError(t, inFunc(ctx, service.NewMessage([]byte(`{"test":"foobar"}`))))

require.NoError(t, stream.StopWithin(1*time.Second))
}()

err = stream.Run(ctx)
require.NoError(t, err)
}
return
{
streamBuilder := service.NewStreamBuilder()
require.NoError(t, streamBuilder.SetYAML(fmt.Sprintf(`
pipeline:
processors:
- schema_registry_encode:
url: %s
subject: %s
avro_raw_json: true
output:
kafka_franz:
seed_brokers: [ %s ]
topic: %s
`, source.schemaRegistryURL, dummyTopic, source.brokerAddr, dummyTopic)))
// require.NoError(t, streamBuilder.SetLoggerYAML(`level: OFF`))

inFunc, err := streamBuilder.AddProducerFunc()
require.NoError(t, err)

stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

ctx, done := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(done)

go func() {
require.NoError(t, inFunc(ctx, service.NewMessage([]byte(`{"test":"foobar"}`))))

require.NoError(t, stream.StopWithin(1*time.Second))
}()

err = stream.Run(ctx)
require.NoError(t, err)
}

}
2 changes: 1 addition & 1 deletion internal/impl/postgresql/pglogicalstream/logical_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func NewPgStream(ctx context.Context, config *Config) (*Stream, error) {

pubName := "pglog_stream_" + config.ReplicationSlotName
stream.logger.Infof("Creating publication %s for tables: %s", pubName, tableNames)
if err = CreatePublication(ctx, stream.pgConn, pubName, tableNames); err != nil {
if err = CreatePublication(ctx, stream.pgConn, pubName, config.DBSchema, tableNames); err != nil {
return nil, err
}
cleanups = append(cleanups, func() {
Expand Down
Loading

0 comments on commit 22076dc

Please sign in to comment.