Skip to content

Commit

Permalink
Merge pull request #3026 from redpanda-data/mihaitodor-add-redpanda-m…
Browse files Browse the repository at this point in the history
…igrator-offset-metadata

Refactor Redpanda Migrator components
  • Loading branch information
mihaitodor authored Jan 15, 2025
2 parents d32a89f + 037d93b commit e9a056c
Show file tree
Hide file tree
Showing 30 changed files with 2,207 additions and 1,178 deletions.
47 changes: 35 additions & 12 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,50 @@ All notable changes to this project will be documented in this file.

## 4.45.0 - TBD

### Fixed

- The `code` and `file` fields on the `javascript` processor docs no longer erroneously mention interpolation support. (@mihaitodor)
- The `postgres_cdc` now correctly handles `null` values. (@rockwotj)
- The `redpanda_migrator` output no longer rejects messages if it can't perform schema ID translation. (@mihaitodor)
- The `redpanda_migrator` input no longer converts the kafka key to string. (@mihaitodor)

### Added

- `aws_sqs` now has a `max_outstanding` field to prevent unbounded memory usage. (@rockwotj)
- `aws_sqs` input now has a `max_outstanding` field to prevent unbounded memory usage. (@rockwotj)
- `avro` scanner now emits metadata for the Avro schema it used along with the schema fingerprint. (@rockwotj)
- Field `content_type` added to the `amqp_1` output. (@timo102)
- `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, `redpanda_migrator` now support `fetch_max_wait` configuration field.
- `snowpipe_streaming` now supports interpolating table names. (@rockwotj)
- `snowpipe_streaming` now supports interpolating channel names. (@rockwotj)
- `snowpipe_streaming` now supports exactly once delivery using `offset_token`. (@rockwotj)
- `ollama_chat` now supports tool calling. (@rockwotj)
- New `ollama_moderation` which allows using LlamaGuard or ShieldGemma to check if LLM responses are safe. (@rockwotj)
- Field `fetch_max_wait` added to the `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common` and `redpanda_migrator` inputs. (@birdayz)
- `snowpipe_streaming` output now supports interpolating table names. (@rockwotj)
- `snowpipe_streaming` output now supports interpolating channel names. (@rockwotj)
- `snowpipe_streaming` output now supports exactly once delivery using `offset_token`. (@rockwotj)
- `ollama_chat` processor now supports tool calling. (@rockwotj)
- New `ollama_moderation` processor which allows using LlamaGuard or ShieldGemma to check if LLM responses are safe. (@rockwotj)
- Field `queries` added to `sql_raw` processor and output to support rummong multiple SQL statements transactionally. (@rockwotj)
- New `redpanda_migrator_offsets` input. (@mihaitodor)
- Fields `offset_topic`, `offset_group`, `offset_partition`, `offset_commit_timestamp` and `offset_metadata` added to the `redpanda_migrator_offsets` output. (@mihaitodor)
- Field `topic_lag_refresh_period` added to the `redpanda` and `redpanda_common` inputs. (@mihaitodor)
- Metric `redpanda_lag` now emitted by the `redpanda` and `redpanda_common` inputs. (@mihaitodor)
- Metadata `kafka_lag` now emitted by the `redpanda` and `redpanda_common` inputs. (@mihaitodor)
- The `redpanda_migrator_bundle` input and output now set labels for their subcomponents. (@mihaitodor)
- (Benthos) Field `label` added to the template tests definitions. (@mihaitodor)
- (Benthos) Metadata field `label` can now be utilized within a template's `mapping` field to access the label that is associated with the template instantiation in a config. (@mihaitodor)
- (Benthos) `bloblang` scalar type added to template fields. (@mihaitodor)
- (Benthos) Go API: Method `SetOutputBrokerPattern` added to the `StreamBuilder` type. (@mihaitodor)
- (Benthos) New `error_source_name`, `error_source_label` and `error_source_path` bloblang functions. (@mihaitodor)
- (Benthos) Flag `--verbose` added to the `benthos lint` and `benthos template lint` commands. (@mihaitodor)

### Fixed
### Changed

- The `code` and `file` fields on the `javascript` processor docs no longer erroneously mention interpolation support. (@mihaitodor)
- The `postgres_cdc` now correctly handles `null` values. (@rockwotj)
- Fix an issue in `aws_sqs` with refreshing in-flight message leases which could prevent acks from processed. (@rockwotj)
- Fix an issue with `postgres_cdc` with TOAST values not being propagated with `REPLICA IDENTITY FULL`. (@rockwotj)
- Fix a initial snapshot streaming consistency issue with `postgres_cdc`. (@rockwotj)
- Fix bug in `sftp` input where the last file was not deleted when `watcher` and `delete_on_finish` were enabled (@ooesili)
- Fix bug in `sftp` input where the last file was not deleted when `watcher` and `delete_on_finish` were enabled. (@ooesili)
- Fields `batch_size`, `multi_header`, `replication_factor`, `replication_factor_override` and `output_resource` for the `redpanda_migrator` input are now deprecated. (@mihaitodor)
- Fields `kafka_key` and `max_in_flight` for the `redpanda_migrator_offsets` output are now deprecated. (@mihaitodor)
- Field `batching` for the `redpanda_migrator` output is now deprecated. (@mihaitodor)
- The `redpanda_migrator` input no longer emits tombstone messages. (@mihaitodor)
- (Benthos) The `branch` processor no longer emits an entry in the log at error level when the child processors throw errors. (@mihaitodor)
- (Benthos) Streams and the StreamBuilder API now use `reject` by default when no output is specified in the config and `stdout` isn't registered (for example when the `io` components are not imported). (@mihaitodor)

## 4.44.0 - 2024-12-13

Expand Down Expand Up @@ -72,7 +95,7 @@ All notable changes to this project will be documented in this file.

- Add support for `spanner` driver to SQL plugins. (@yufeng-deng)
- Add support for complex database types (JSONB, TEXT[], INET, TSVECTOR, TSRANGE, POINT, INTEGER[]) for `pg_stream` input. (@le-vlad)
- Add support for Parquet files to `bigquery` output (@rockwotj)
- Add support for Parquet files to `bigquery` output. (@rockwotj)
- (Benthos) New `exists` operator added to the `cache` processor. (@mihaitodor)
- New CLI flag `redpanda-license` added as an alternative way to specify a Redpanda license. (@Jeffail)

Expand Down
15 changes: 15 additions & 0 deletions docs/modules/components/pages/inputs/redpanda.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ input:
consumer_group: "" # No default (optional)
commit_period: 5s
partition_buffer_bytes: 1MB
topic_lag_refresh_period: 5s
auto_replay_nacks: true
```
Expand Down Expand Up @@ -115,6 +116,10 @@ output:
Records are processed and delivered from each partition in batches as received from brokers. These batch sizes are therefore dynamically sized in order to optimise throughput, but can be tuned with the config fields `fetch_max_partition_bytes` and `fetch_max_bytes`. Batches can be further broken down using the xref:components:processors/split.adoc[`split`] processor.
== Metrics
Emits a `redpanda_lag` metric with `topic` and `partition` labels for each consumed topic.
== Metadata
This input adds the following metadata fields to each message:
Expand All @@ -124,6 +129,7 @@ This input adds the following metadata fields to each message:
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_lag
- kafka_timestamp_ms
- kafka_timestamp_unix
- kafka_tombstone_message
Expand Down Expand Up @@ -645,6 +651,15 @@ A buffer size (in bytes) for each consumed partition, allowing records to be que
*Default*: `"1MB"`
=== `topic_lag_refresh_period`
The period of time between each topic lag refresh cycle.
*Type*: `string`
*Default*: `"5s"`
=== `auto_replay_nacks`
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
Expand Down
15 changes: 15 additions & 0 deletions docs/modules/components/pages/inputs/redpanda_common.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ input:
consumer_group: "" # No default (optional)
commit_period: 5s
partition_buffer_bytes: 1MB
topic_lag_refresh_period: 5s
auto_replay_nacks: true
```
Expand Down Expand Up @@ -101,6 +102,10 @@ output:
Records are processed and delivered from each partition in batches as received from brokers. These batch sizes are therefore dynamically sized in order to optimise throughput, but can be tuned with the config fields `fetch_max_partition_bytes` and `fetch_max_bytes`. Batches can be further broken down using the xref:components:processors/split.adoc[`split`] processor.
== Metrics
Emits a `redpanda_lag` metric with `topic` and `partition` labels for each consumed topic.
== Metadata
This input adds the following metadata fields to each message:
Expand All @@ -110,6 +115,7 @@ This input adds the following metadata fields to each message:
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_lag
- kafka_timestamp_ms
- kafka_timestamp_unix
- kafka_tombstone_message
Expand Down Expand Up @@ -245,6 +251,15 @@ A buffer size (in bytes) for each consumed partition, allowing records to be que
*Default*: `"1MB"`
=== `topic_lag_refresh_period`
The period of time between each topic lag refresh cycle.
*Type*: `string`
*Default*: `"5s"`
=== `auto_replay_nacks`
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
Expand Down
60 changes: 10 additions & 50 deletions docs/modules/components/pages/inputs/redpanda_migrator.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -77,25 +77,21 @@ input:
fetch_max_partition_bytes: 1MiB
consumer_group: "" # No default (optional)
commit_period: 5s
multi_header: false
batch_size: 1024
auto_replay_nacks: true
partition_buffer_bytes: 1MB
topic_lag_refresh_period: 5s
output_resource: redpanda_migrator_output
replication_factor_override: true
replication_factor: 3
auto_replay_nacks: true
```
--
======
Reads a batch of messages from a Kafka broker and waits for the output to acknowledge the writes before updating the Kafka consumer group offset.
This input should be used in combination with a `redpanda_migrator` output which it can query for existing topics.
This input should be used in combination with a `redpanda_migrator` output.
When a consumer group is specified this input consumes one or more topics where partitions will automatically balance across any other connected clients with the same consumer group. When a consumer group is not specified topics can either be consumed in their entirety or with explicit partitions.
It attempts to create all selected topics it along with their associated ACLs in the broker that the `redpanda_migrator` output points to identified by the label specified in `output_resource`.
It provides the same delivery guarantees and ordering semantics as the `redpanda` input.
== Metrics
Expand Down Expand Up @@ -623,32 +619,14 @@ The period of time between each commit of the current partition offsets. Offsets
*Default*: `"5s"`
=== `multi_header`
Decode headers into lists to allow handling of multiple values with the same key
*Type*: `bool`
*Default*: `false`
=== `batch_size`
The maximum number of messages that should be accumulated into each batch.
*Type*: `int`
=== `partition_buffer_bytes`
*Default*: `1024`
=== `auto_replay_nacks`
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
A buffer size (in bytes) for each consumed partition, allowing records to be queued internally before flushing. Increasing this may improve throughput at the cost of higher memory utilisation. Note that each buffer can grow slightly beyond this value.
*Type*: `bool`
*Type*: `string`
*Default*: `true`
*Default*: `"1MB"`
=== `topic_lag_refresh_period`
Expand All @@ -659,31 +637,13 @@ The period of time between each topic lag refresh cycle.
*Default*: `"5s"`
=== `output_resource`
The label of the redpanda_migrator output in which the currently selected topics need to be created before attempting to read messages.
*Type*: `string`
*Default*: `"redpanda_migrator_output"`
=== `replication_factor_override`
=== `auto_replay_nacks`
Use the specified replication factor when creating topics.
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
*Type*: `bool`
*Default*: `true`
=== `replication_factor`
Replication factor for created topics. This is only used when `replication_factor_override` is set to `true`.
*Type*: `int`
*Default*: `3`
Loading

0 comments on commit e9a056c

Please sign in to comment.