Skip to content

Commit

Permalink
Merge pull request #3061 from redpanda-data/fix-pg-cdc-license
Browse files Browse the repository at this point in the history
Add postgres_cdc license check and fix tests
  • Loading branch information
Jeffail authored Dec 5, 2024
2 parents 1a79469 + 714968f commit 8ae0c01
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ Changelog

All notable changes to this project will be documented in this file.

## 4.43.0 - 2024-12-05

### Changed

- The `pg_stream` input has been renamed to `postgres_cdc`. The old name will continue to function as an alias. (@rockwotj)

## 4.42.0 - 2024-12-02

### Added
Expand Down
5 changes: 5 additions & 0 deletions internal/impl/postgresql/input_pg_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/redpanda-data/connect/v4/internal/asyncroutine"
"github.com/redpanda-data/connect/v4/internal/impl/postgresql/pglogicalstream"
"github.com/redpanda-data/connect/v4/internal/license"
)

const (
Expand Down Expand Up @@ -134,6 +135,10 @@ func newPgStreamInput(conf *service.ParsedConfig, mgr *service.Resources) (s ser
batching service.BatchPolicy
)

if err := license.CheckRunningEnterprise(mgr); err != nil {
return nil, err
}

if dsn, err = conf.FieldString(fieldDSN); err != nil {
return nil, err
}
Expand Down
20 changes: 19 additions & 1 deletion internal/impl/postgresql/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/redpanda-data/connect/v4/internal/license"

"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
)
Expand Down Expand Up @@ -210,6 +212,8 @@ file:
streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)

license.InjectTestService(streamOut.Resources())

go func() {
_ = streamOut.Run(context.Background())
}()
Expand Down Expand Up @@ -257,6 +261,8 @@ file:
streamOut, err = streamOutBuilder.Build()
require.NoError(t, err)

license.InjectTestService(streamOut.Resources())

go func() {
assert.NoError(t, streamOut.Run(context.Background()))
}()
Expand Down Expand Up @@ -325,6 +331,8 @@ file:
streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)

license.InjectTestService(streamOut.Resources())

go func() {
_ = streamOut.Run(context.Background())
}()
Expand Down Expand Up @@ -415,6 +423,8 @@ file:
streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)

license.InjectTestService(streamOut.Resources())

go func() {
err = streamOut.Run(context.Background())
require.NoError(t, err)
Expand Down Expand Up @@ -463,6 +473,8 @@ file:
streamOut, err = streamOutBuilder.Build()
require.NoError(t, err)

license.InjectTestService(streamOut.Resources())

go func() {
assert.NoError(t, streamOut.Run(context.Background()))
}()
Expand Down Expand Up @@ -562,12 +574,14 @@ file:
streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)

license.InjectTestService(streamOut.Resources())

go func() {
err = streamOut.Run(context.Background())
require.NoError(t, err)
}()

assert.Eventually(t, func() bool {
require.Eventually(t, func() bool {
outBatchMut.Lock()
defer outBatchMut.Unlock()
return len(outBatches) == 1
Expand Down Expand Up @@ -664,6 +678,8 @@ file:
streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)

license.InjectTestService(streamOut.Resources())

go func() {
_ = streamOut.Run(context.Background())
}()
Expand Down Expand Up @@ -711,6 +727,8 @@ file:
streamOut, err = streamOutBuilder.Build()
require.NoError(t, err)

license.InjectTestService(streamOut.Resources())

go func() {
assert.NoError(t, streamOut.Run(context.Background()))
}()
Expand Down

0 comments on commit 8ae0c01

Please sign in to comment.