From b079ce57f2a1606e999732e9466e263823b0c982 Mon Sep 17 00:00:00 2001
From: Tyler Rockwood
Date: Tue, 3 Dec 2024 15:39:36 +0000
Subject: [PATCH] pgcdc: rename component to have CDC in the name

---
 .../components/pages/inputs/pg_stream.adoc    |   7 +-
 .../components/pages/inputs/postgres_cdc.adoc | 393 ++++++++++++++++++
 internal/impl/postgresql/input_pg_stream.go   | 115 ++---
 internal/plugins/info.csv                     |   3 +-
 4 files changed, 462 insertions(+), 56 deletions(-)
 create mode 100644 docs/modules/components/pages/inputs/postgres_cdc.adoc

diff --git a/docs/modules/components/pages/inputs/pg_stream.adoc b/docs/modules/components/pages/inputs/pg_stream.adoc
index 5a9274eb4d..3dd54827dc 100644
--- a/docs/modules/components/pages/inputs/pg_stream.adoc
+++ b/docs/modules/components/pages/inputs/pg_stream.adoc
@@ -1,6 +1,6 @@
 = pg_stream
 :type: input
-:status: beta
+:status: deprecated
 :categories: ["Services"]
 
 
@@ -23,6 +23,11 @@ component_type_dropdown::[]
 
 
+[WARNING]
+.Deprecated
+====
+This component is deprecated and will be removed in the next major version release. Please consider moving to the xref:components:inputs/postgres_cdc.adoc[`postgres_cdc`] input instead.
+====
 Streams changes from a PostgreSQL database using logical replication.
 
 Introduced in version 4.39.0.
 
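For anyone migrating, this is purely a rename: as the Go changes below show, `postgres_cdc` registers the same configuration spec and constructor that `pg_stream` used. A minimal before/after sketch (the DSN and table name are illustrative):

```yml
input:
  # Previously: pg_stream. All fields keep their existing names and defaults.
  postgres_cdc:
    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
    schema: public
    tables:
      - my_table
```
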
diff --git a/docs/modules/components/pages/inputs/postgres_cdc.adoc b/docs/modules/components/pages/inputs/postgres_cdc.adoc
new file mode 100644
index 0000000000..fd366f117c
--- /dev/null
+++ b/docs/modules/components/pages/inputs/postgres_cdc.adoc
@@ -0,0 +1,393 @@
+= postgres_cdc
+:type: input
+:status: beta
+:categories: ["Services"]
+
+
+
+////
+     THIS FILE IS AUTOGENERATED!
+
+     To make changes, edit the corresponding source file under:
+
+     https://github.com/redpanda-data/connect/tree/main/internal/impl/.
+
+     And:
+
+     https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
+////
+
+// © 2024 Redpanda Data Inc.
+
+
+component_type_dropdown::[]
+
+
+Streams changes from a PostgreSQL database using logical replication.
+
+Introduced in version 4.39.0.
+
+
+[tabs]
+======
+Common::
++
+--
+
+```yml
+# Common config fields, showing default values
+input:
+  label: ""
+  postgres_cdc:
+    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable # No default (required)
+    include_transaction_markers: false
+    stream_snapshot: false
+    snapshot_memory_safety_factor: 1
+    snapshot_batch_size: 0
+    schema: public # No default (required)
+    tables: [] # No default (required)
+    checkpoint_limit: 1024
+    temporary_slot: false
+    slot_name: ""
+    pg_standby_timeout: 10s
+    pg_wal_monitor_interval: 3s
+    max_parallel_snapshot_tables: 1
+    auto_replay_nacks: true
+    batching:
+      count: 0
+      byte_size: 0
+      period: ""
+      check: ""
+```
+
+--
+Advanced::
++
+--
+
+```yml
+# All config fields, showing default values
+input:
+  label: ""
+  postgres_cdc:
+    dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable # No default (required)
+    include_transaction_markers: false
+    stream_snapshot: false
+    snapshot_memory_safety_factor: 1
+    snapshot_batch_size: 0
+    schema: public # No default (required)
+    tables: [] # No default (required)
+    checkpoint_limit: 1024
+    temporary_slot: false
+    slot_name: ""
+    pg_standby_timeout: 10s
+    pg_wal_monitor_interval: 3s
+    max_parallel_snapshot_tables: 1
+    auto_replay_nacks: true
+    batching:
+      count: 0
+      byte_size: 0
+      period: ""
+      check: ""
+      processors: [] # No default (optional)
+```
+
+--
+======
+
+Streams changes from a PostgreSQL database for Change Data Capture (CDC).
+Additionally, if `stream_snapshot` is set to true, then the existing data in the database is also streamed.
+
+== Metadata
+
+This input adds the following metadata fields to each message:
+- mode (Either "streaming" or "snapshot" indicating whether the message is part of a streaming operation or snapshot processing)
+- table (Name of the table that the message originated from)
+- operation (Type of operation that generated the message: "insert", "update", or "delete". This will also be "begin" and "commit" if `include_transaction_markers` is enabled)
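+
+As a hypothetical illustration (the `mapping` processor and the `@metadata` Bloblang syntax are standard; the `_table` and `_operation` payload field names are invented for this sketch), these fields can be copied into each change event downstream:
+
+```yml
+pipeline:
+  processors:
+    - mapping: |
+        # Annotate each change event with its CDC metadata.
+        root = this
+        root._table = @table
+        root._operation = @operation
+```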
+
+
+== Fields
+
+=== `dsn`
+
+The Data Source Name for the PostgreSQL database in the form of `postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]`. Please note that Postgres enforces SSL by default; you can override this with the parameter `sslmode=disable` if required.
+
+
+*Type*: `string`
+
+
+```yml
+# Examples
+
+dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
+```
+
+=== `include_transaction_markers`
+
+When set to true, empty messages with operation types BEGIN and COMMIT are generated for the beginning and end of each transaction. Messages with operation metadata set to "begin" or "commit" will have null message payloads.
+
+
+*Type*: `bool`
+
+*Default*: `false`
+
+=== `stream_snapshot`
+
+When set to true, the plugin will first stream a snapshot of all existing data in the database before streaming changes. In order to use this, the tables being snapshotted MUST have a primary key set so that reading from the table can be parallelized.
+
+
+*Type*: `bool`
+
+*Default*: `false`
+
+```yml
+# Examples
+
+stream_snapshot: true
+```
+
+=== `snapshot_memory_safety_factor`
+
+Determines the fraction of available memory that can be used for streaming the snapshot. Values between 0 and 1 represent the percentage of memory to use. Lower values make initial streaming slower but help prevent out-of-memory errors.
+
+
+*Type*: `float`
+
+*Default*: `1`
+
+```yml
+# Examples
+
+snapshot_memory_safety_factor: 0.2
+```
+
+=== `snapshot_batch_size`
+
+The number of rows to fetch in each batch when querying the snapshot. A value of 0 lets the plugin determine the batch size based on the `snapshot_memory_safety_factor` property.
+
+
+*Type*: `int`
+
+*Default*: `0`
+
+```yml
+# Examples
+
+snapshot_batch_size: 10000
+```
+
+=== `schema`
+
+The PostgreSQL schema from which to replicate data.
+
+
+*Type*: `string`
+
+
+```yml
+# Examples
+
+schema: public
+```
+
+=== `tables`
+
+A list of table names to include in the logical replication. Each table should be specified as a separate item.
+
+
+*Type*: `array`
+
+
+```yml
+# Examples
+
+tables: |2-
+  - my_table
+  - my_table_2
+
+```
+
+=== `checkpoint_limit`
+
+The maximum number of messages that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level. Any given LSN will not be acknowledged unless all messages under that offset are delivered, in order to preserve at-least-once delivery guarantees.
+
+
+*Type*: `int`
+
+*Default*: `1024`
+
+=== `temporary_slot`
+
+If set to true, creates a temporary replication slot that is automatically dropped when the connection is closed.
+
+
+*Type*: `bool`
+
+*Default*: `false`
+
+=== `slot_name`
+
+The name of the PostgreSQL logical replication slot to use. If not provided, a random name will be generated. You can create this slot manually before starting replication if desired.
+
+
+*Type*: `string`
+
+*Default*: `""`
+
+```yml
+# Examples
+
+slot_name: my_test_slot
+```
+
+=== `pg_standby_timeout`
+
+Specify the standby timeout before refreshing an idle connection.
+
+
+*Type*: `string`
+
+*Default*: `"10s"`
+
+```yml
+# Examples
+
+pg_standby_timeout: 30s
+```
+
+=== `pg_wal_monitor_interval`
+
+How often to report changes to the replication lag.
+
+
+*Type*: `string`
+
+*Default*: `"3s"`
+
+```yml
+# Examples
+
+pg_wal_monitor_interval: 6s
+```
+
+=== `max_parallel_snapshot_tables`
+
+The number of tables to process in parallel during the snapshot processing stage.
+
+
+*Type*: `int`
+
+*Default*: `1`
+
+=== `auto_replay_nacks`
+
+Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
+
+
+*Type*: `bool`
+
+*Default*: `true`
+
+=== `batching`
+
+Allows you to configure a xref:configuration:batching.adoc[batching policy].
+
+
+*Type*: `object`
+
+
+```yml
+# Examples
+
+batching:
+  byte_size: 5000
+  count: 0
+  period: 1s
+
+batching:
+  count: 10
+  period: 1s
+
+batching:
+  check: this.contains("END BATCH")
+  count: 0
+  period: 1m
+```
+
+=== `batching.count`
+
+A number of messages at which the batch should be flushed. If `0` disables count based batching.
+
+
+*Type*: `int`
+
+*Default*: `0`
+
+=== `batching.byte_size`
+
+An amount of bytes at which the batch should be flushed. If `0` disables size based batching.
+
+
+*Type*: `int`
+
+*Default*: `0`
+
+=== `batching.period`
+
+A period in which an incomplete batch should be flushed regardless of its size.
+
+
+*Type*: `string`
+
+*Default*: `""`
+
+```yml
+# Examples
+
+period: 1s
+
+period: 1m
+
+period: 500ms
+```
+
+=== `batching.check`
+
+A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.
+
+
+*Type*: `string`
+
+*Default*: `""`
+
+```yml
+# Examples
+
+check: this.type == "end_of_transaction"
+```
+
+=== `batching.processors`
+
+A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
+
+
+*Type*: `array`
+
+
+```yml
+# Examples
+
+processors:
+  - archive:
+      format: concatenate
+
+processors:
+  - archive:
+      format: lines
+
+processors:
+  - archive:
+      format: json_array
+```
+
+
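Pulling the field reference together, here is a slightly fuller sketch of a CDC pipeline input; the DSN, slot and table names are illustrative, and flushing on "commit" relies on the `include_transaction_markers` behavior documented above:

```yml
input:
  postgres_cdc:
    dsn: postgres://foouser:foopass@localhost:5432/foodb
    schema: public
    tables:
      - my_table
      - my_table_2
    slot_name: my_test_slot
    stream_snapshot: true
    include_transaction_markers: true
    batching:
      # Flush when a transaction commit marker arrives, or after 5s
      # so that long-running transactions do not stall the batch.
      check: '@operation == "commit"'
      period: 5s
```
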
diff --git a/internal/impl/postgresql/input_pg_stream.go b/internal/impl/postgresql/input_pg_stream.go
index d5cba76b16..d451287946 100644
--- a/internal/impl/postgresql/input_pg_stream.go
+++ b/internal/impl/postgresql/input_pg_stream.go
@@ -48,12 +48,13 @@ type asyncMessage struct {
 	ackFn service.AckFunc
 }
 
-var pgStreamConfigSpec = service.NewConfigSpec().
-	Beta().
-	Categories("Services").
-	Version("4.39.0").
-	Summary(`Streams changes from a PostgreSQL database using logical replication.`).
-	Description(`Streams changes from a PostgreSQL database for Change Data Capture (CDC).
+func newPostgresCDCConfig() *service.ConfigSpec {
+	return service.NewConfigSpec().
+		Beta().
+		Categories("Services").
+		Version("4.39.0").
+		Summary(`Streams changes from a PostgreSQL database using logical replication.`).
+		Description(`Streams changes from a PostgreSQL database for Change Data Capture (CDC).
 Additionally, if ` + "`" + fieldStreamSnapshot + "`" + ` is set to true, then the existing data in the database is also streamed.
 
 == Metadata
@@ -63,56 +64,57 @@ This input adds the following metadata fields to each message:
 - table (Name of the table that the message originated from)
 - operation (Type of operation that generated the message: "insert", "update", or "delete". This will also be "begin" and "commit" if ` + "`" + fieldIncludeTxnMarkers + "`" + ` is enabled)
 `).
-	Field(service.NewStringField(fieldDSN).
-		Description("The Data Source Name for the PostgreSQL database in the form of `postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]`. Please note that Postgres enforces SSL by default, you can override this with the parameter `sslmode=disable` if required.").
-		Example("postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable")).
-	Field(service.NewBoolField(fieldIncludeTxnMarkers).
-		Description(`When set to true, empty messages with operation types BEGIN and COMMIT are generated for the beginning and end of each transaction. Messages with operation metadata set to "begin" or "commit" will have null message payloads.`).
-		Default(false)).
-	Field(service.NewBoolField(fieldStreamSnapshot).
-		Description("When set to true, the plugin will first stream a snapshot of all existing data in the database before streaming changes. In order to use this the tables that are being snapshot MUST have a primary key set so that reading from the table can be parallelized.").
-		Example(true).
-		Default(false)).
-	Field(service.NewFloatField(fieldSnapshotMemSafetyFactor).
-		Description("Determines the fraction of available memory that can be used for streaming the snapshot. Values between 0 and 1 represent the percentage of memory to use. Lower values make initial streaming slower but help prevent out-of-memory errors.").
-		Example(0.2).
-		Default(1)).
-	Field(service.NewIntField(fieldSnapshotBatchSize).
-		Description("The number of rows to fetch in each batch when querying the snapshot. A value of 0 lets the plugin determine the batch size based on `snapshot_memory_safety_factor` property.").
-		Example(10000).
-		Default(0)).
-	Field(service.NewStringField(fieldSchema).
-		Description("The PostgreSQL schema from which to replicate data.").
-		Example("public")).
-	Field(service.NewStringListField(fieldTables).
-		Description("A list of table names to include in the logical replication. Each table should be specified as a separate item.").
-		Example(`
+		Field(service.NewStringField(fieldDSN).
+			Description("The Data Source Name for the PostgreSQL database in the form of `postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]`. Please note that Postgres enforces SSL by default; you can override this with the parameter `sslmode=disable` if required.").
+			Example("postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable")).
+		Field(service.NewBoolField(fieldIncludeTxnMarkers).
+			Description(`When set to true, empty messages with operation types BEGIN and COMMIT are generated for the beginning and end of each transaction. Messages with operation metadata set to "begin" or "commit" will have null message payloads.`).
+			Default(false)).
+		Field(service.NewBoolField(fieldStreamSnapshot).
+			Description("When set to true, the plugin will first stream a snapshot of all existing data in the database before streaming changes. In order to use this, the tables being snapshotted MUST have a primary key set so that reading from the table can be parallelized.").
+			Example(true).
+			Default(false)).
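+		// Snapshot sizing: when snapshot_batch_size is 0 the batch size is
+		// derived from snapshot_memory_safety_factor instead (see the field
+		// descriptions below).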
+		Field(service.NewFloatField(fieldSnapshotMemSafetyFactor).
+			Description("Determines the fraction of available memory that can be used for streaming the snapshot. Values between 0 and 1 represent the percentage of memory to use. Lower values make initial streaming slower but help prevent out-of-memory errors.").
+			Example(0.2).
+			Default(1)).
+		Field(service.NewIntField(fieldSnapshotBatchSize).
+			Description("The number of rows to fetch in each batch when querying the snapshot. A value of 0 lets the plugin determine the batch size based on the `snapshot_memory_safety_factor` property.").
+			Example(10000).
+			Default(0)).
+		Field(service.NewStringField(fieldSchema).
+			Description("The PostgreSQL schema from which to replicate data.").
+			Example("public")).
+		Field(service.NewStringListField(fieldTables).
+			Description("A list of table names to include in the logical replication. Each table should be specified as a separate item.").
+			Example(`
 - my_table
 - my_table_2
 `)).
-	Field(service.NewIntField(fieldCheckpointLimit).
-		Description("The maximum number of messages that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level. Any given LSN will not be acknowledged unless all messages under that offset are delivered in order to preserve at least once delivery guarantees.").
-		Default(1024)).
-	Field(service.NewBoolField(fieldTemporarySlot).
-		Description("If set to true, creates a temporary replication slot that is automatically dropped when the connection is closed.").
-		Default(false)).
-	Field(service.NewStringField(fieldSlotName).
-		Description("The name of the PostgreSQL logical replication slot to use. If not provided, a random name will be generated. You can create this slot manually before starting replication if desired.").
-		Example("my_test_slot").
-		Default("")).
-	Field(service.NewDurationField(fieldPgStandbyTimeout).
-		Description("Specify the standby timeout before refreshing an idle connection.").
-		Example("30s").
-		Default("10s")).
-	Field(service.NewDurationField(fieldWalMonitorInterval).
-		Description("How often to report changes to the replication lag.").
-		Example("6s").
-		Default("3s")).
-	Field(service.NewIntField(fieldMaxParallelSnapshotTables).
-		Description("Int specifies a number of tables that will be processed in parallel during the snapshot processing stage").
-		Default(1)).
-	Field(service.NewAutoRetryNacksToggleField()).
-	Field(service.NewBatchPolicyField(fieldBatching))
+		Field(service.NewIntField(fieldCheckpointLimit).
+			Description("The maximum number of messages that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level. Any given LSN will not be acknowledged unless all messages under that offset are delivered, in order to preserve at-least-once delivery guarantees.").
+			Default(1024)).
+		Field(service.NewBoolField(fieldTemporarySlot).
+			Description("If set to true, creates a temporary replication slot that is automatically dropped when the connection is closed.").
+			Default(false)).
+		Field(service.NewStringField(fieldSlotName).
+			Description("The name of the PostgreSQL logical replication slot to use. If not provided, a random name will be generated. You can create this slot manually before starting replication if desired.").
+			Example("my_test_slot").
+			Default("")).
+		Field(service.NewDurationField(fieldPgStandbyTimeout).
+			Description("Specify the standby timeout before refreshing an idle connection.").
+			Example("30s").
+			Default("10s")).
+		Field(service.NewDurationField(fieldWalMonitorInterval).
+			Description("How often to report changes to the replication lag.").
+			Example("6s").
+			Default("3s")).
+		Field(service.NewIntField(fieldMaxParallelSnapshotTables).
+			Description("The number of tables to process in parallel during the snapshot processing stage.").
+			Default(1)).
+		Field(service.NewAutoRetryNacksToggleField()).
+		Field(service.NewBatchPolicyField(fieldBatching))
+}
 
 func newPgStreamInput(conf *service.ParsedConfig, mgr *service.Resources) (s service.BatchInput, err error) {
 	var (
@@ -270,7 +272,12 @@ func validateSimpleString(s string) error {
 }
 
 func init() {
-	err := service.RegisterBatchInput("pg_stream", pgStreamConfigSpec, newPgStreamInput)
+	err := service.RegisterBatchInput("postgres_cdc", newPostgresCDCConfig(), newPgStreamInput)
+	if err != nil {
+		panic(err)
+	}
+	// Legacy naming
+	err = service.RegisterBatchInput("pg_stream", newPostgresCDCConfig().Deprecated(), newPgStreamInput)
 	if err != nil {
 		panic(err)
 	}
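One practical note on the snapshot fields defined above: `snapshot_batch_size` and `snapshot_memory_safety_factor` are alternatives, with a batch size of 0 deferring to the safety factor. A conservative snapshot setup might look like this sketch (values are illustrative, not recommendations):

```yml
input:
  postgres_cdc:
    dsn: postgres://foouser:foopass@localhost:5432/foodb
    schema: public
    tables: [my_table]
    stream_snapshot: true
    snapshot_memory_safety_factor: 0.2 # cap snapshot batches at ~20% of memory
    snapshot_batch_size: 0             # 0 = derive batch size from the factor above
    max_parallel_snapshot_tables: 4    # snapshot four tables concurrently
```
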
diff --git a/internal/plugins/info.csv b/internal/plugins/info.csv
index a26760f7b0..397eb732c5 100644
--- a/internal/plugins/info.csv
+++ b/internal/plugins/info.csv
@@ -170,8 +170,9 @@ parquet                 ,processor ,parquet                 ,3.62.0   ,commun
 parquet_decode          ,processor ,parquet_decode          ,4.4.0    ,certified ,n ,y ,y
 parquet_encode          ,processor ,parquet_encode          ,4.4.0    ,certified ,n ,y ,y
 parse_log               ,processor ,parse_log               ,0.0.0    ,community ,n ,y ,y
-pg_stream               ,input     ,pg_stream               ,0.0.0    ,enterprise ,n ,y ,y
+pg_stream               ,input     ,pg_stream               ,0.0.0    ,enterprise ,y ,y ,y
 pinecone                ,output    ,pinecone                ,4.31.0   ,certified ,n ,y ,y
+postgres_cdc            ,input     ,postgres_cdc            ,4.43.0   ,enterprise ,n ,y ,y
 processors              ,processor ,processors              ,0.0.0    ,certified ,n ,y ,y
 prometheus              ,metric    ,prometheus              ,0.0.0    ,certified ,n ,y ,y
 protobuf                ,processor ,Protobuf                ,0.0.0    ,certified ,n ,y ,y
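Finally, since every message carries the `operation` metadata described in the new docs page, deletes can be routed without inspecting payloads. A hypothetical sketch using the standard `switch` output (both downstream outputs are placeholders):

```yml
output:
  switch:
    cases:
      - check: '@operation == "delete"'
        output:
          stdout: {} # placeholder: route tombstones wherever deletes belong
      - output:
          stdout: {} # placeholder: inserts, updates, and snapshot rows
```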