Skip to content

Commit

Permalink
PostgreSQL CDC Plugin (#2917)
Browse files Browse the repository at this point in the history
* Move repos into connect

* Add placeholders for logging and TODOs on panics

* feat(pgstream): added support for pgoutput native plugin

* feat(pgstream): added support for pgoutput native plugin

* chore(pg_stream): updated table filtering

* chore(): updated tests for pglogical stream

* chore(): fmt applied

* chore(): code re-org

* chore(): added temp. replication slot and removed outdated code

* chore(): fixed eslint errors && tests

* chore(): removed panics

* fix(): table name in snapshotter

* chore(): working on stream uncomited changes

* fix(postgres): correct order for message LSN ack

* chore(): removed log line

* chore(): working on metrics

* chore(): removed test case && working on monitor testing

* chore(): monitor testing

* chore(): added backward compatibility for postgresql

* chore(): updated tests for different pg versions && working on metrics

* chore(): added WAL lag streaming

* chore(): added snapshot metrics

* chore(): added snapshot metrics streaming

* chore(): added explicit value for snapshot batch size

* chore(): updated docs

* chore(): updated docs

* chore(): applieds golangci-lint notes

* chore(): working on faster snapshot processing

* chore(): experimenting with object pool

* Revert "chore(): experimenting with object pool"

This reverts commit 041a55c.

* chore(): use common pool to process snapshot

* chore(): added snapshot message rate counter

* chore(): working on batches

* fixed(): test

* fix(): metrics

* chore(): removed unused struct

* chore(): stabilised batches

* chore(): removed debug lines; fixed linter

* chore(): updated tls config field && small refactoring

* ref(): use context when create publication

* pgcdc: cleanup configuration

* By default we were just using a replication slot name of `rs_`.
* Cleanup description
* Fix typos

* pgcdc: simplify stream setup

Just have the user give us a DSN that is standard and our SQL* plugins
already expect this format. That fixes bugs we have with special
characters that need escaping, and generally simplfies setup.

Also fixes:
- Don't os.Exit, but bubble an error up
- Use provided context instead of context.Background
- Prevent SQL injection attacks in slot names

* more review feedback. This got to be a lot so just checkpointing so Vlad
can see where I am going.

* Chan cleanup WIP

Signed-off-by: Mihai Todor <[email protected]>

* chore(): addressed pull requests changes

* chore(): updated tests

* chore(): removed unused vars

* chore(): run make deps to fix ci pipeline

* fix(postgres_cdc): monitor tests

* chore(postgres_cdc): added integration test skip check

* fix(postgres_cdc): lint warnings

* chore(): specify monitoring && standby intervals via config

* chore(): removed redundant tests + deps

* chore(): updated docs

* pgstream: create batcher in foreground

* pgstream: only check for done once

* pgcdc: remove bool for operation

* pgcdc: update docs for mode

* pgcdc: validate slot names can't cause SQL injection

* pgcdc: use error type for error handling, not bool

* pgcdc: import sanitization code from pgx

We are forced to use the simple query protocol for pg in replication mode,
which means we need to sanitize stuff. Import some internal code from
pgx for that.

* pgcdc: add note about pk in snapshot reading

* pgcdc: properly sanitize query

* pgcdc add note about how waiting for commit is buggy

* pgcdc: drop unused param

* pgcdc: actually remove unused param

* pgcdc: update docs

* ref(): small code refactoring

* feat(): added max_parallel_snapshot_tables config field

* chore(): added pk ordering to consume snapshot

* fix(): enabled integration tests

* chore(): small fixes && pr notes

* chore(): updated docs && fixed lint

* chore(): revert integration tests

* chore(): added publication updates instead of re-creation

* pgcdc: prefix stat names

* pgcdc: remove lsnrestart field

* pgcdc: add a high watermark utility

* pgcdc: use watermark for log position

* pgcdc: remove layer of nesting from switch

* pgcdc: use typed duration fields

* pgcdc: fix waiting for txn ack

* pgcdc: dedup config fields

* pgcdc: fix config field defaults

* pgcdc: properly implement watermark

We need to be able to be cancelled if we never reach the watermark

* pgcdc: properly ack only on commit messages, once everything is
processed

* pgcdc: there are actually 3 handlers

* pgcdc: simplify plugin handling code

* pgcdc: fix randomized ID

uuid is invalid because we can't use dashes

* pgcdc: remove unused import

* pgcdc: always include mode

* pgcdc: fix period batching and cleanup logic

* pgcdc: fix lint error

* pgcdc: regen docs

* chore(): added +1 to standby update to follow postgresql requirements

* chore: goimports

* pgcdc: simplify shutdown in the input

Still need to simplify this in the internal logical_stream package, but
this is a first step

* pgcdc: localize the pg stream

To make lifetime semantics and handling ErrNotConnected better

* pgcdc: simplify internal flow control

Simplify the internal flow control of the logical stream by just
returning and handling errors at the top level.

* pgcdc: don't produce 0 messages

* pgcdc: rename stream uncommitted to batch transactions

* pgcdc: fix config name

* pgcdc: add some TODOs

* pgcdc: update docs

* pgcdc: review feedback

* pgcdc: cleanup monitor with periodic utility

* pgcdc: fmt

* pgcdc: check for non-zero duration

* chore(): sanitized queries && fixed tests

* chore(): removed wal2json support

* chore(): updated pgstream docs

* feat(): added support for composite primary keys

* pgcdc: mark as enterprise licensed

* chore(): applied make fmt

* pgcdc/snapshot: use context for cancellation

* pgcdc: fix primary key order by clause

* pgcdc: fix zero batch check

* update changelog

---------

Signed-off-by: Mihai Todor <[email protected]>
Co-authored-by: Ashley Jeffs <[email protected]>
Co-authored-by: Tyler Rockwood <[email protected]>
Co-authored-by: Mihai Todor <[email protected]>
  • Loading branch information
4 people authored Nov 18, 2024
1 parent 37eef63 commit 5f28795
Show file tree
Hide file tree
Showing 28 changed files with 6,716 additions and 3 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@ Changelog

All notable changes to this project will be documented in this file.

## 4.40.0 - TBD

### Added

- New `pg_stream` input supporting change data capture (CDC) from PostgreSQL (@le-vlad)

### Changed

- `snowflake_streaming` with `schema_evolution.enabled` set to true can now autocreate tables.

## 4.39.0 - 2024-11-07

### Added
Expand Down
Loading

0 comments on commit 5f28795

Please sign in to comment.