From 08dbfa19e254f3c9668bcfadb39cb9621b320535 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Thu, 5 Dec 2024 11:29:23 +0000 Subject: [PATCH 1/3] pgcdc: consolidate mode into operation --- internal/impl/postgresql/input_pg_stream.go | 7 +++---- .../impl/postgresql/pglogicalstream/logical_stream.go | 10 ++++------ .../pglogicalstream/replication_message_decoders.go | 2 +- .../impl/postgresql/pglogicalstream/stream_message.go | 11 ++++++----- 4 files changed, 14 insertions(+), 16 deletions(-) diff --git a/internal/impl/postgresql/input_pg_stream.go b/internal/impl/postgresql/input_pg_stream.go index 47d33e175a..5c52274196 100644 --- a/internal/impl/postgresql/input_pg_stream.go +++ b/internal/impl/postgresql/input_pg_stream.go @@ -61,9 +61,9 @@ Additionally, if ` + "`" + fieldStreamSnapshot + "`" + ` is set to true, then th == Metadata This input adds the following metadata fields to each message: -- mode (Either "streaming" or "snapshot" indicating whether the message is part of a streaming operation or snapshot processing) - table (Name of the table that the message originated from) -- operation (Type of operation that generated the message: "insert", "update", or "delete". This will also be "begin" and "commit" if ` + "`" + fieldIncludeTxnMarkers + "`" + ` is enabled) +- operation (Type of operation that generated the message: "read", "insert", "update", or "delete". "read" is from messages that are read in the initial snapshot phase. This will also be "begin" and "commit" if ` + "`" + fieldIncludeTxnMarkers + "`" + ` is enabled) +- lsn (the log sequence number in postgres) `). Field(service.NewStringField(fieldDSN). Description("The Data Source Name for the PostgreSQL database in the form of `postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]`. Please note that Postgres enforces SSL by default, you can override this with the parameter `sslmode=disable` if required."). 
@@ -259,7 +259,7 @@ func newPgStreamInput(conf *service.ParsedConfig, mgr *service.Resources) (s ser return nil, err } - return conf.WrapBatchInputExtractTracingSpanMapping("pg_stream", r) + return conf.WrapBatchInputExtractTracingSpanMapping("postgres_cdc", r) } // validateSimpleString ensures we aren't vuln to SQL injection @@ -367,7 +367,6 @@ func (p *pgStreamInput) processStream(pgStream *pglogicalstream.Stream, batcher break } batchMsg := service.NewMessage(mb) - batchMsg.MetaSet("mode", string(message.Mode)) batchMsg.MetaSet("table", message.Table) batchMsg.MetaSet("operation", string(message.Operation)) if message.LSN != nil { diff --git a/internal/impl/postgresql/pglogicalstream/logical_stream.go b/internal/impl/postgresql/pglogicalstream/logical_stream.go index 5d46407877..3bde5984e0 100644 --- a/internal/impl/postgresql/pglogicalstream/logical_stream.go +++ b/internal/impl/postgresql/pglogicalstream/logical_stream.go @@ -580,12 +580,10 @@ func (s *Stream) processSnapshot() error { snapshotChangePacket := StreamMessage{ LSN: nil, - Mode: StreamModeSnapshot, - Operation: InsertOpType, - - Table: tableWithoutSchema, - Schema: s.schema, - Data: data, + Operation: ReadOpType, + Table: tableWithoutSchema, + Schema: s.schema, + Data: data, } if rowsCount%100 == 0 { diff --git a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go index b82202e4d6..5a98789800 100644 --- a/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go +++ b/internal/impl/postgresql/pglogicalstream/replication_message_decoders.go @@ -47,7 +47,7 @@ func isCommitMessage(WALData []byte) (bool, *CommitMessage, error) { // before the change message. 
func decodePgOutput(WALData []byte, relations map[uint32]*RelationMessage, typeMap *pgtype.Map) (*StreamMessage, error) { logicalMsg, err := Parse(WALData) - message := &StreamMessage{Mode: StreamModeStreaming} + message := &StreamMessage{} if err != nil { return nil, err diff --git a/internal/impl/postgresql/pglogicalstream/stream_message.go b/internal/impl/postgresql/pglogicalstream/stream_message.go index bf75faaa0a..c01537f7de 100644 --- a/internal/impl/postgresql/pglogicalstream/stream_message.go +++ b/internal/impl/postgresql/pglogicalstream/stream_message.go @@ -22,6 +22,8 @@ const ( type OpType string const ( + // ReadOpType is a snapshot read + ReadOpType OpType = "read" // InsertOpType is a database insert InsertOpType OpType = "insert" // UpdateOpType is a database update @@ -36,11 +38,10 @@ const ( // StreamMessage represents a single change from the database type StreamMessage struct { - LSN *string `json:"lsn"` - Operation OpType `json:"operation"` - Schema string `json:"schema"` - Table string `json:"table"` - Mode StreamMode `json:"mode"` + LSN *string `json:"lsn"` + Operation OpType `json:"operation"` + Schema string `json:"schema"` + Table string `json:"table"` // For deleted messages - there will be old changes if replica identity set to full or empty changes Data any `json:"data"` } From b8d25e018172800c079ee829f753ce16c1d56bb8 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Thu, 5 Dec 2024 11:31:30 +0000 Subject: [PATCH 2/3] pgcdc: add changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7be34880b6..af55573768 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file. ### Changed - The `pg_stream` input has been renamed to `postgres_cdc`. The old name will continue to function as an alias. 
(@rockwotj) +- The `postgres_cdc` input no longer emits `mode` metadata; snapshot reads now set the `operation` metadata to `read` rather than `insert`. (@rockwotj) ## 4.42.0 - 2024-12-02 From 181ad119daaa0c628a8b6b4f82cbfce18342aa08 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Thu, 5 Dec 2024 11:35:22 +0000 Subject: [PATCH 3/3] pgcdc: update docs --- docs/modules/components/pages/inputs/pg_stream.adoc | 4 ++-- docs/modules/components/pages/inputs/postgres_cdc.adoc | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/modules/components/pages/inputs/pg_stream.adoc b/docs/modules/components/pages/inputs/pg_stream.adoc index 3dd54827dc..ffa3a21ecd 100644 --- a/docs/modules/components/pages/inputs/pg_stream.adoc +++ b/docs/modules/components/pages/inputs/pg_stream.adoc @@ -106,9 +106,9 @@ Additionally, if `stream_snapshot` is set to true, then the existing data in the == Metadata This input adds the following metadata fields to each message: -- mode (Either "streaming" or "snapshot" indicating whether the message is part of a streaming operation or snapshot processing) - table (Name of the table that the message originated from) -- operation (Type of operation that generated the message: "insert", "update", or "delete". This will also be "begin" and "commit" if `include_transaction_markers` is enabled) +- operation (Type of operation that generated the message: "read", "insert", "update", or "delete". "read" is from messages that are read in the initial snapshot phase.
This will also be "begin" and "commit" if `include_transaction_markers` is enabled) +- lsn (the log sequence number in postgres) == Fields diff --git a/docs/modules/components/pages/inputs/postgres_cdc.adoc b/docs/modules/components/pages/inputs/postgres_cdc.adoc index fd366f117c..8bf208ac9b 100644 --- a/docs/modules/components/pages/inputs/postgres_cdc.adoc +++ b/docs/modules/components/pages/inputs/postgres_cdc.adoc @@ -101,9 +101,9 @@ Additionally, if `stream_snapshot` is set to true, then the existing data in the == Metadata This input adds the following metadata fields to each message: -- mode (Either "streaming" or "snapshot" indicating whether the message is part of a streaming operation or snapshot processing) - table (Name of the table that the message originated from) -- operation (Type of operation that generated the message: "insert", "update", or "delete". This will also be "begin" and "commit" if `include_transaction_markers` is enabled) +- operation (Type of operation that generated the message: "read", "insert", "update", or "delete". "read" is from messages that are read in the initial snapshot phase. This will also be "begin" and "commit" if `include_transaction_markers` is enabled) +- lsn (the log sequence number in postgres) == Fields