From 25de0fb04060857cc85493a59d0ba301ad00a998 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Fri, 6 Dec 2024 17:52:45 +0000 Subject: [PATCH] Fix schema ID translation in `redpanda_migrator_bundle` output We want to skip this when `translate_schema_ids: false` even if there's `schema_registry` component configured under `redpanda_migrator_bundle`. Signed-off-by: Mihai Todor --- CHANGELOG.md | 1 + .../redpanda_migrator_bundle_output.tmpl.yaml | 136 +++++++++++++++++- 2 files changed, 136 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 83d72ba1f5..15bb54f47c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ All notable changes to this project will be documented in this file. ### Fixed - Trial Redpanda Enterprise licenses are now considered valid. (@Jeffail) +- The `redpanda_migrator_bundle` output now skips schema ID translation when `translate_schema_ids: false` and `schema_registry` is configured. (@mihaitodor) ## 4.43.0 - 2024-12-05 diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_bundle_output.tmpl.yaml b/internal/impl/kafka/enterprise/redpanda_migrator_bundle_output.tmpl.yaml index 99873d77df..3e04ed91d1 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_bundle_output.tmpl.yaml +++ b/internal/impl/kafka/enterprise/redpanda_migrator_bundle_output.tmpl.yaml @@ -51,7 +51,7 @@ mapping: | "^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*)" ] }, - "translate_schema_ids": this.schema_registry.length() != 0 + "translate_schema_ids": this.redpanda_migrator.translate_schema_ids.or(true) && this.schema_registry.length() != 0 } ) @@ -212,6 +212,140 @@ tests: - output: reject: ${! @fallback_error } + - name: Migrate messages, offsets and schemas but skip schema ID translation + config: + redpanda_migrator: + seed_brokers: [ "127.0.0.1:9092" ] + translate_schema_ids: false + max_in_flight: 1 + schema_registry: + url: http://localhost:8081 + max_in_flight: 1 + + expected: + switch: + cases: + - check: metadata("input_label") == "redpanda_migrator" + output: + fallback: + - redpanda_migrator: + key: ${! metadata("kafka_key") } + max_in_flight: 1 + partition: ${! metadata("kafka_partition").or(throw("missing kafka_partition metadata")) } + partitioner: manual + seed_brokers: + - 127.0.0.1:9092 + timestamp_ms: ${! metadata("kafka_timestamp_ms").or(timestamp_unix_milli()) } + topic: ${! metadata("kafka_topic").or(throw("missing kafka_topic metadata")) } + metadata: + include_patterns: + - ^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*) + translate_schema_ids: false + processors: + - mapping: | + meta input_label = deleted() + - drop: {} + processors: + - log: + message: | + Dropping message: ${! content() } / ${! metadata() } + - check: metadata("input_label") == "redpanda_migrator_offsets" + output: + fallback: + - redpanda_migrator_offsets: + seed_brokers: + - 127.0.0.1:9092 + - drop: {} + processors: + - log: + message: | + Dropping message: ${! content() } / ${! metadata() } + - check: metadata("input_label") == "schema_registry" + output: + fallback: + - schema_registry: + subject: ${! @schema_registry_subject } + url: http://localhost:8081 + max_in_flight: 1 + - switch: + cases: + - check: '@fallback_error == "request returned status: 422"' + output: + drop: {} + processors: + - log: + message: | + Subject '${! @schema_registry_subject }' version ${! @schema_registry_version } already has schema: ${! content() } + - output: + reject: ${! @fallback_error } + + - name: Migrate messages, offsets and schemas and do schema ID translation when requested explicitly + config: + redpanda_migrator: + seed_brokers: [ "127.0.0.1:9092" ] + translate_schema_ids: true + max_in_flight: 1 + schema_registry: + url: http://localhost:8081 + max_in_flight: 1 + + expected: + switch: + cases: + - check: metadata("input_label") == "redpanda_migrator" + output: + fallback: + - redpanda_migrator: + key: ${! metadata("kafka_key") } + max_in_flight: 1 + partition: ${! metadata("kafka_partition").or(throw("missing kafka_partition metadata")) } + partitioner: manual + seed_brokers: + - 127.0.0.1:9092 + timestamp_ms: ${! metadata("kafka_timestamp_ms").or(timestamp_unix_milli()) } + topic: ${! metadata("kafka_topic").or(throw("missing kafka_topic metadata")) } + metadata: + include_patterns: + - ^(?:[^k].*|k[^a].*|ka[^f].*|kaf[^k].*|kafk[^a].*|kafka[^_].*) + translate_schema_ids: true + processors: + - mapping: | + meta input_label = deleted() + - drop: {} + processors: + - log: + message: | + Dropping message: ${! content() } / ${! metadata() } + - check: metadata("input_label") == "redpanda_migrator_offsets" + output: + fallback: + - redpanda_migrator_offsets: + seed_brokers: + - 127.0.0.1:9092 + - drop: {} + processors: + - log: + message: | + Dropping message: ${! content() } / ${! metadata() } + - check: metadata("input_label") == "schema_registry" + output: + fallback: + - schema_registry: + subject: ${! @schema_registry_subject } + url: http://localhost:8081 + max_in_flight: 1 + - switch: + cases: + - check: '@fallback_error == "request returned status: 422"' + output: + drop: {} + processors: + - log: + message: | + Subject '${! @schema_registry_subject }' version ${! @schema_registry_version } already has schema: ${! content() } + - output: + reject: ${! @fallback_error } + - name: Migrate only messages and offsets config: redpanda_migrator: