Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

examples: add a postgres replication example using CDC #3081

Merged
merged 1 commit into from
Dec 17, 2024

Conversation

rockwotj
Copy link
Collaborator

Note that we can't dynamically do anything with the schema at the
moment, we don't emit schema changes and creating the migration
statements would require a complicated sql_raw output.

Note that we can't dynamically do anything with the schema at the
moment, we don't emit schema changes and creating the migration
statements would require a complicated sql_raw output.
Copy link
Collaborator

@mihaitodor mihaitodor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just one small nitpick. Feel free to :shipit:

config/examples/cdc_replication.yaml Show resolved Hide resolved
@ligfx
Copy link

ligfx commented Dec 16, 2024

Two thoughts:

  • If @operation == "begin" || @operation == "commit" isn't handled explicitly (i.e. they don't care about restraints, as many analytical warehouses don't), what do those messages look like? Will they mess up the following SQL code or are they just empty messages?

  • Relying on @operation to choose INSERT and UPDATE is in general not safe and will do weird things if messages come out of order or get replayed. Usually, people will either do INSERT ... ON CONFLICT or MERGE to perform an upsert (and that's what the Kafka Connect JDBC Sink connector does in upsert mode).

    Possible approach using MERGE (requires Postgres >= 15):

sql_raw:
  driver: postgres
  dsn: postgres://michaelmaltese@localhost:5432/michaelmaltese?sslmode=disable
  args_mapping: |
    root = [ this.id, this.name, this.updated_at ]
  query: |
    MERGE INTO journey_apps3_cdc AS old
    USING (SELECT
      $1::integer id,
      $2 name,
      $3::timestamptz updated_at,
    ) AS new
    ON new.id = old.id
    WHEN MATCHED THEN
      UPDATE SET
        name = case when new.updated_at > old.updated_at OR old.updated_at is null THEN new.name ELSE old.name END,
        updated_at = greatest(new.updated_at, old.updated_at)
    WHEN NOT MATCHED THEN
      INSERT (id, name, updated_at) VALUES (
        new.id,
        new.name,
        new.updated_at
      )
    ;

Copy link

@ligfx ligfx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upsert comment above. Sorry, I'm a bit rusty with GitHub!

@rockwotj
Copy link
Collaborator Author

Will they mess up the following SQL code or are they just empty messages?

Yes they are empty and will mess up stuff downstream.

@rockwotj
Copy link
Collaborator Author

If you don't have foreign key restraints than batching by transaction is quite useless indeed.

@rockwotj rockwotj merged commit e63612a into main Dec 17, 2024
4 checks passed
@rockwotj rockwotj deleted the cdc_replication branch December 17, 2024 05:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants