Skip to content

Commit

Permalink
Fix postgres_cdc input
Browse files Browse the repository at this point in the history
Allow quoted identifiers for the table names

Signed-off-by: Mihai Todor <[email protected]>
  • Loading branch information
mihaitodor committed Dec 13, 2024
1 parent 65aadd2 commit c8148a6
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file.
### Fixed

- `gcp_bigquery` output with parquet format no longer returns errors incorrectly. (@rockwotj)
- `postgres_cdc` input now allows quoted identifiers for the table names. (@mihaitodor)

## 4.43.1 - 2024-12-09

Expand Down
2 changes: 1 addition & 1 deletion internal/impl/postgresql/pglogicalstream/pglogrepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ func CreatePublication(ctx context.Context, conn *pgconn.PgConn, publicationName

// remove tables from publication
for _, dropTable := range tablesToRemoveFromPublication {
sq, err := sanitize.SQLQuery(fmt.Sprintf("ALTER PUBLICATION %s DROP TABLE %s;", publicationName, dropTable))
sq, err := sanitize.SQLQuery(fmt.Sprintf(`ALTER PUBLICATION %s DROP TABLE "%s";`, publicationName, dropTable))
if err != nil {
return fmt.Errorf("failed to sanitize drop table query: %w", err)
}
Expand Down
12 changes: 12 additions & 0 deletions internal/impl/postgresql/pglogicalstream/sanitize/sanitize.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,18 @@ func SQLQuery(sql string, args ...any) (string, error) {
// ValidatePostgresIdentifier checks if a string is a valid PostgreSQL identifier
// This follows PostgreSQL's standard naming rules
func ValidatePostgresIdentifier(name string) error {
if parts := strings.Split(name, "."); len(parts) == 2 {
if err := ValidatePostgresIdentifier(parts[0]); err != nil {
return fmt.Errorf("invalid schema identifier: %s", err)
}
name = parts[1]
}

// Strip quotes if they are present
if strings.HasPrefix(name, "\"") && strings.HasSuffix(name, "\"") {
name = strings.Trim(name, "\"")
}

if len(name) == 0 {
return errors.New("empty identifier is not allowed")
}
Expand Down

0 comments on commit c8148a6

Please sign in to comment.