Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples: add a postgres replication example using CDC #3081

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions config/examples/cdc_replication.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
input:
postgres_cdc:
dsn: postgres://me:foobar@localhost:5432?sslmode=disable
include_transaction_markers: true
slot_name: test_slot_native_decoder
stream_snapshot: true
schema: public
tables: [my_src_table]
# Group by transaction, each message batch is all rows changed in a transaction
# this might be massive, but might be required for foreign key constraints
batching:
check: '@operation == "commit"'
period: 10s
rockwotj marked this conversation as resolved.
Show resolved Hide resolved
processors:
# But drop the placeholder messages for start/end transaction
- mapping: |
root = if @operation == "begin" || @operation == "commit" {
deleted()
} else {
this
}
output:
# Dispatch the write based on the operation metadata
switch:
strict_mode: true
cases:
- check: '@operation == "read" || @operation == "insert"'
output:
sql_insert:
driver: postgres
dsn: postgres://me:foobar@localhost:5432?sslmode=disable
table: my_dst_table
columns: [id, foo, bar]
args_mapping: root = [this.id, this.foo, this.bar]
init_statement: |
CREATE TABLE IF NOT EXISTS my_dst_table (
id serial PRIMARY KEY,
foo text,
bar timestamp
);
- check: '@operation == "update"'
output:
sql_raw:
driver: postgres
dsn: postgres://me:foobar@localhost:5432?sslmode=disable
query: UPDATE my_dst_table SET foo = $1, bar = $2 WHERE id = $3
args_mapping: root = [this.foo, this.bar, this.id]
- check: '@operation == "delete"'
output:
sql_raw:
driver: postgres
dsn: postgres://me:foobar@localhost:5432?sslmode=disable
query: DELETE FROM my_dst_table WHERE id = $1
args_mapping: root = [this.id]
Loading