Skip to content

Commit

Permalink
Merge pull request #3063 from redpanda-data/mihaitodor-fix-panic-redp…
Browse files Browse the repository at this point in the history
…anda-migrator-output

Fix panic in redpanda_migrator output
  • Loading branch information
mihaitodor authored Dec 5, 2024
2 parents 8ae0c01 + cfd8161 commit a03a02a
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ All notable changes to this project will be documented in this file.

- The `pg_stream` input has been renamed to `postgres_cdc`. The old name will continue to function as an alias. (@rockwotj)

### Fixed

- The `redpanda_migrator_bundle` output no longer attempts to translate schema IDs when a schema registry is not configured. (@mihaitodor)

## 4.42.0 - 2024-12-02

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ mapping: |
# Exclude metadata fields which start with `kafka_`
"^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*)"
]
}
},
"translate_schema_ids": this.schema_registry.length() != 0
}
)
Expand Down Expand Up @@ -172,6 +173,7 @@ tests:
metadata:
include_patterns:
- ^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*)
translate_schema_ids: true
processors:
- mapping: |
meta input_label = deleted()
Expand Down Expand Up @@ -234,6 +236,7 @@ tests:
metadata:
include_patterns:
- ^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*)
translate_schema_ids: false
processors:
- mapping: |
meta input_label = deleted()
Expand Down
4 changes: 2 additions & 2 deletions internal/impl/kafka/enterprise/redpanda_migrator_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (w *RedpandaMigratorWriter) Connect(ctx context.Context) error {
if res, ok := w.mgr.GetGeneric(w.schemaRegistryOutputResource); ok {
w.schemaRegistryOutput = res.(*schemaRegistryOutput)
} else {
return fmt.Errorf("schema_registry output resource %q not found", w.schemaRegistryOutputResource)
w.mgr.Logger().Warnf("schema_registry output resource %q not found; skipping schema ID translation", w.schemaRegistryOutputResource)
}
}

Expand All @@ -275,7 +275,7 @@ func (w *RedpandaMigratorWriter) WriteBatch(ctx context.Context, b service.Messa
}

var ch franz_sr.ConfluentHeader
if w.translateSchemaIDs {
if w.translateSchemaIDs && w.schemaRegistryOutput != nil {
for recordIdx, record := range records {
schemaID, _, err := ch.DecodeID(record.Value)
if err != nil {
Expand Down

0 comments on commit a03a02a

Please sign in to comment.