diff --git a/CHANGELOG.md b/CHANGELOG.md index a2eef18b36..7be34880b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,12 @@ Changelog All notable changes to this project will be documented in this file. +## 4.43.0 - 2024-12-05 + +### Changed + +- The `pg_stream` input has been renamed to `postgres_cdc`. The old name will continue to function as an alias. (@rockwotj) + ## 4.42.0 - 2024-12-02 ### Added diff --git a/internal/impl/postgresql/input_pg_stream.go b/internal/impl/postgresql/input_pg_stream.go index f3c9b17a61..47d33e175a 100644 --- a/internal/impl/postgresql/input_pg_stream.go +++ b/internal/impl/postgresql/input_pg_stream.go @@ -22,6 +22,7 @@ import ( "github.com/redpanda-data/connect/v4/internal/asyncroutine" "github.com/redpanda-data/connect/v4/internal/impl/postgresql/pglogicalstream" + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -134,6 +135,10 @@ func newPgStreamInput(conf *service.ParsedConfig, mgr *service.Resources) (s ser batching service.BatchPolicy ) + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + if dsn, err = conf.FieldString(fieldDSN); err != nil { return nil, err } diff --git a/internal/impl/postgresql/integration_test.go b/internal/impl/postgresql/integration_test.go index a59c4b1761..c13b04b5fc 100644 --- a/internal/impl/postgresql/integration_test.go +++ b/internal/impl/postgresql/integration_test.go @@ -26,6 +26,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/redpanda-data/connect/v4/internal/license" + "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" ) @@ -210,6 +212,8 @@ file: streamOut, err := streamOutBuilder.Build() require.NoError(t, err) + license.InjectTestService(streamOut.Resources()) + go func() { _ = streamOut.Run(context.Background()) }() @@ -257,6 +261,8 @@ file: streamOut, err = streamOutBuilder.Build() require.NoError(t, err) + license.InjectTestService(streamOut.Resources()) + go func() { assert.NoError(t, streamOut.Run(context.Background())) }() @@ -325,6 +331,8 @@ file: streamOut, err := streamOutBuilder.Build() require.NoError(t, err) + license.InjectTestService(streamOut.Resources()) + go func() { _ = streamOut.Run(context.Background()) }() @@ -415,6 +423,8 @@ file: streamOut, err := streamOutBuilder.Build() require.NoError(t, err) + license.InjectTestService(streamOut.Resources()) + go func() { err = streamOut.Run(context.Background()) require.NoError(t, err) @@ -463,6 +473,8 @@ file: streamOut, err = streamOutBuilder.Build() require.NoError(t, err) + license.InjectTestService(streamOut.Resources()) + go func() { assert.NoError(t, streamOut.Run(context.Background())) }() @@ -562,12 +574,14 @@ file: streamOut, err := streamOutBuilder.Build() require.NoError(t, err) + license.InjectTestService(streamOut.Resources()) + go func() { err = streamOut.Run(context.Background()) require.NoError(t, err) }() - assert.Eventually(t, func() bool { + require.Eventually(t, func() bool { outBatchMut.Lock() defer outBatchMut.Unlock() return len(outBatches) == 1 @@ -664,6 +678,8 @@ file: streamOut, err := streamOutBuilder.Build() require.NoError(t, err) + license.InjectTestService(streamOut.Resources()) + go func() { _ = streamOut.Run(context.Background()) }() @@ -711,6 +727,8 @@ file: streamOut, err = streamOutBuilder.Build() require.NoError(t, err) + license.InjectTestService(streamOut.Resources()) + go func() { assert.NoError(t, streamOut.Run(context.Background())) }()