Refactor Redpanda Migrator components #3026

Merged · 27 commits · Jan 15, 2025

Commits
75a0800
Do not reject messages for which we can't perform schema ID translation
mihaitodor Dec 6, 2024
1c7c440
Update `redpanda_migrator` input to not convert keys to string
mihaitodor Dec 8, 2024
3da7b0d
Add Redpanda Migrator offset metadata
mihaitodor Nov 21, 2024
0fec988
Fix error message in `redpanda_migrator_offsets` input
mihaitodor Dec 8, 2024
27fbecf
Make some more changes to the integration test
mihaitodor Dec 6, 2024
8904bd2
Switch the `redpanda_migrator` to the new ack mechanism
mihaitodor Dec 11, 2024
2cc32c8
Check for cluster connectivity in the `kafka_franz` input
mihaitodor Dec 16, 2024
ce5d4bb
Update github.com/twmb/franz-go
mihaitodor Dec 12, 2024
d714b35
Make sure we exit the preflight hook if the context is cancelled
mihaitodor Dec 12, 2024
373506c
Cleanup `redpanda_migrator_offsets` input
mihaitodor Dec 16, 2024
fda303d
Cleanup NewFranzReaderOrderedFromConfig constructor
mihaitodor Dec 31, 2024
cd2422e
Refactor Migrator integration test
mihaitodor Dec 12, 2024
634b069
Fix Migrator integration test
mihaitodor Dec 16, 2024
ca9f984
Add labels to Redpanda Migrator bundle subcomponents
mihaitodor Dec 31, 2024
6a3dd6c
Don't emit tombstone messages from redpanda_migrator input
mihaitodor Dec 31, 2024
b53ee62
Update info.csv
mihaitodor Dec 12, 2024
6d31cc3
Address review feedback
mihaitodor Jan 4, 2025
47bc779
Refactor matchesTopic helper as suggested by Tyler
mihaitodor Jan 12, 2025
6fd03aa
Simplify Migrator logic
mihaitodor Jan 13, 2025
ef9736e
Update changelog
mihaitodor Dec 12, 2024
6d5e681
Rework recordToMessageFn to return bool instead of error
mihaitodor Jan 13, 2025
adab534
Remove hooks from FranzReaderOrdered
mihaitodor Jan 13, 2025
cd990b9
Refactor FranzWriter hooks
mihaitodor Jan 13, 2025
6933aaf
Update the Benthos framework to v4.43.0
mihaitodor Jan 13, 2025
b292c36
Enhance Kafka Migrator integration test
mihaitodor Jan 14, 2025
9f67fea
Remove topic filter log
mihaitodor Jan 14, 2025
037d93b
Create topics when getting the first message in the Migrator output
mihaitodor Jan 15, 2025
47 changes: 35 additions & 12 deletions CHANGELOG.md
@@ -15,27 +15,50 @@ All notable changes to this project will be documented in this file.

## 4.45.0 - TBD

### Fixed

- The `code` and `file` fields on the `javascript` processor docs no longer erroneously mention interpolation support. (@mihaitodor)
- The `postgres_cdc` now correctly handles `null` values. (@rockwotj)
- The `redpanda_migrator` output no longer rejects messages if it can't perform schema ID translation. (@mihaitodor)
- The `redpanda_migrator` input no longer converts the kafka key to string. (@mihaitodor)

### Added

- `aws_sqs` now has a `max_outstanding` field to prevent unbounded memory usage. (@rockwotj)
- `aws_sqs` input now has a `max_outstanding` field to prevent unbounded memory usage. (@rockwotj)
- `avro` scanner now emits metadata for the Avro schema it used along with the schema fingerprint. (@rockwotj)
- Field `content_type` added to the `amqp_1` output. (@timo102)
- `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common`, `redpanda_migrator` now support `fetch_max_wait` configuration field.
- `snowpipe_streaming` now supports interpolating table names. (@rockwotj)
- `snowpipe_streaming` now supports interpolating channel names. (@rockwotj)
- `snowpipe_streaming` now supports exactly once delivery using `offset_token`. (@rockwotj)
- `ollama_chat` now supports tool calling. (@rockwotj)
- New `ollama_moderation` which allows using LlamaGuard or ShieldGemma to check if LLM responses are safe. (@rockwotj)
- Field `fetch_max_wait` added to the `kafka_franz`, `ockam_kafka`, `redpanda`, `redpanda_common` and `redpanda_migrator` inputs. (@birdayz)
- `snowpipe_streaming` output now supports interpolating table names. (@rockwotj)
- `snowpipe_streaming` output now supports interpolating channel names. (@rockwotj)
- `snowpipe_streaming` output now supports exactly once delivery using `offset_token`. (@rockwotj)
- `ollama_chat` processor now supports tool calling. (@rockwotj)
- New `ollama_moderation` processor which allows using LlamaGuard or ShieldGemma to check if LLM responses are safe. (@rockwotj)
- Field `queries` added to `sql_raw` processor and output to support running multiple SQL statements transactionally. (@rockwotj)
- New `redpanda_migrator_offsets` input. (@mihaitodor)
- Fields `offset_topic`, `offset_group`, `offset_partition`, `offset_commit_timestamp` and `offset_metadata` added to the `redpanda_migrator_offsets` output. (@mihaitodor)
- Field `topic_lag_refresh_period` added to the `redpanda` and `redpanda_common` inputs. (@mihaitodor)
- Metric `redpanda_lag` now emitted by the `redpanda` and `redpanda_common` inputs. (@mihaitodor)
- Metadata `kafka_lag` now emitted by the `redpanda` and `redpanda_common` inputs. (@mihaitodor)
- The `redpanda_migrator_bundle` input and output now set labels for their subcomponents. (@mihaitodor)
- (Benthos) Field `label` added to the template tests definitions. (@mihaitodor)
- (Benthos) Metadata field `label` can now be utilized within a template's `mapping` field to access the label that is associated with the template instantiation in a config. (@mihaitodor)
- (Benthos) `bloblang` scalar type added to template fields. (@mihaitodor)
- (Benthos) Go API: Method `SetOutputBrokerPattern` added to the `StreamBuilder` type. (@mihaitodor)
- (Benthos) New `error_source_name`, `error_source_label` and `error_source_path` bloblang functions. (@mihaitodor)
- (Benthos) Flag `--verbose` added to the `benthos lint` and `benthos template lint` commands. (@mihaitodor)

### Fixed
### Changed

- The `code` and `file` fields on the `javascript` processor docs no longer erroneously mention interpolation support. (@mihaitodor)
- The `postgres_cdc` now correctly handles `null` values. (@rockwotj)
- Fix an issue in `aws_sqs` with refreshing in-flight message leases which could prevent acks from being processed. (@rockwotj)
- Fix an issue in `postgres_cdc` where TOAST values were not propagated with `REPLICA IDENTITY FULL`. (@rockwotj)
- Fix an initial snapshot streaming consistency issue with `postgres_cdc`. (@rockwotj)
- Fix bug in `sftp` input where the last file was not deleted when `watcher` and `delete_on_finish` were enabled (@ooesili)
- Fix bug in `sftp` input where the last file was not deleted when `watcher` and `delete_on_finish` were enabled. (@ooesili)
- Fields `batch_size`, `multi_header`, `replication_factor`, `replication_factor_override` and `output_resource` for the `redpanda_migrator` input are now deprecated. (@mihaitodor)
- Fields `kafka_key` and `max_in_flight` for the `redpanda_migrator_offsets` output are now deprecated. (@mihaitodor)
- Field `batching` for the `redpanda_migrator` output is now deprecated. (@mihaitodor)
- The `redpanda_migrator` input no longer emits tombstone messages. (@mihaitodor)
- (Benthos) The `branch` processor no longer emits an entry in the log at error level when the child processors throw errors. (@mihaitodor)
- (Benthos) Streams and the StreamBuilder API now use `reject` by default when no output is specified in the config and `stdout` isn't registered (for example when the `io` components are not imported). (@mihaitodor)

## 4.44.0 - 2024-12-13

@@ -72,7 +95,7 @@ All notable changes to this project will be documented in this file.

- Add support for `spanner` driver to SQL plugins. (@yufeng-deng)
- Add support for complex database types (JSONB, TEXT[], INET, TSVECTOR, TSRANGE, POINT, INTEGER[]) for `pg_stream` input. (@le-vlad)
- Add support for Parquet files to `bigquery` output (@rockwotj)
- Add support for Parquet files to `bigquery` output. (@rockwotj)
- (Benthos) New `exists` operator added to the `cache` processor. (@mihaitodor)
- New CLI flag `redpanda-license` added as an alternative way to specify a Redpanda license. (@Jeffail)

15 changes: 15 additions & 0 deletions docs/modules/components/pages/inputs/redpanda.adoc
@@ -76,6 +76,7 @@ input:
consumer_group: "" # No default (optional)
commit_period: 5s
partition_buffer_bytes: 1MB
topic_lag_refresh_period: 5s
auto_replay_nacks: true
```

@@ -115,6 +116,10 @@ output:

Records are processed and delivered from each partition in batches as received from brokers. These batch sizes are therefore dynamically sized in order to optimise throughput, but can be tuned with the config fields `fetch_max_partition_bytes` and `fetch_max_bytes`. Batches can be further broken down using the xref:components:processors/split.adoc[`split`] processor.

== Metrics

Emits a `redpanda_lag` metric with `topic` and `partition` labels for each consumed topic.
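
For illustration, one way this metric might be surfaced, a minimal sketch assuming the standard `metrics` section with a Prometheus exporter and a metric-name `mapping` (all values are illustrative):

```yaml
metrics:
  mapping: |
    # `this` holds the metric name; keep only the consumer lag metric.
    root = if this != "redpanda_lag" { deleted() }
  prometheus: {}
```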

== Metadata

This input adds the following metadata fields to each message:
@@ -124,6 +129,7 @@
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_lag
- kafka_timestamp_ms
- kafka_timestamp_unix
- kafka_tombstone_message
@@ -645,6 +651,15 @@ A buffer size (in bytes) for each consumed partition, allowing records to be que

*Default*: `"1MB"`

=== `topic_lag_refresh_period`

The period of time between each topic lag refresh cycle.


*Type*: `string`

*Default*: `"5s"`
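
A hedged sketch of relaxing this cadence for a cluster with many partitions (broker address, topic, group and period are illustrative):

```yaml
input:
  redpanda:
    seed_brokers: [ "localhost:9092" ]
    topics: [ "orders" ]
    consumer_group: "orders_reader"
    topic_lag_refresh_period: 30s # refresh lag every 30s instead of the default 5s
```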

=== `auto_replay_nacks`

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
15 changes: 15 additions & 0 deletions docs/modules/components/pages/inputs/redpanda_common.adoc
@@ -64,6 +64,7 @@ input:
consumer_group: "" # No default (optional)
commit_period: 5s
partition_buffer_bytes: 1MB
topic_lag_refresh_period: 5s
auto_replay_nacks: true
```

@@ -101,6 +102,10 @@ output:

Records are processed and delivered from each partition in batches as received from brokers. These batch sizes are therefore dynamically sized in order to optimise throughput, but can be tuned with the config fields `fetch_max_partition_bytes` and `fetch_max_bytes`. Batches can be further broken down using the xref:components:processors/split.adoc[`split`] processor.
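
As a minimal sketch of that last step, a `split` processor with an illustrative batch size:

```yaml
pipeline:
  processors:
    - split:
        size: 10 # break each consumed batch into sub-batches of at most 10 messages
```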

== Metrics

Emits a `redpanda_lag` metric with `topic` and `partition` labels for each consumed topic.

== Metadata

This input adds the following metadata fields to each message:
@@ -110,6 +115,7 @@
- kafka_topic
- kafka_partition
- kafka_offset
- kafka_lag
- kafka_timestamp_ms
- kafka_timestamp_unix
- kafka_tombstone_message
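
One hedged example of consuming the new `kafka_lag` field, a `mapping` processor that copies it into each document (the target field name `consumer_lag` is an assumption):

```yaml
pipeline:
  processors:
    - mapping: |
        # Attach the consumer lag reported by the input to the payload.
        root = this
        root.consumer_lag = @kafka_lag.number() # hypothetical destination field
```
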
@@ -245,6 +251,15 @@ A buffer size (in bytes) for each consumed partition, allowing records to be que

*Default*: `"1MB"`

=== `topic_lag_refresh_period`

The period of time between each topic lag refresh cycle.


*Type*: `string`

*Default*: `"5s"`

=== `auto_replay_nacks`

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
60 changes: 10 additions & 50 deletions docs/modules/components/pages/inputs/redpanda_migrator.adoc
@@ -77,25 +77,21 @@ input:
fetch_max_partition_bytes: 1MiB
consumer_group: "" # No default (optional)
commit_period: 5s
multi_header: false
batch_size: 1024
auto_replay_nacks: true
partition_buffer_bytes: 1MB
topic_lag_refresh_period: 5s
output_resource: redpanda_migrator_output
replication_factor_override: true
replication_factor: 3
auto_replay_nacks: true
```

--
======

Reads a batch of messages from a Kafka broker and waits for the output to acknowledge the writes before updating the Kafka consumer group offset.

This input should be used in combination with a `redpanda_migrator` output which it can query for existing topics.
This input should be used in combination with a `redpanda_migrator` output.

When a consumer group is specified this input consumes one or more topics where partitions will automatically balance across any other connected clients with the same consumer group. When a consumer group is not specified topics can either be consumed in their entirety or with explicit partitions.

It attempts to create all selected topics it along with their associated ACLs in the broker that the `redpanda_migrator` output points to identified by the label specified in `output_resource`.
It provides the same delivery guarantees and ordering semantics as the `redpanda` input.
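
A minimal pairing sketch, assuming the field names follow the `redpanda`/`kafka_franz` family shown above (broker addresses, topic pattern and consumer group are illustrative):

```yaml
input:
  redpanda_migrator:
    seed_brokers: [ "source-cluster:9092" ]
    topics: [ '.*' ]
    regexp_topics: true
    consumer_group: migrator_cg

output:
  redpanda_migrator:
    seed_brokers: [ "destination-cluster:9092" ]
    topic: ${! metadata("kafka_topic") } # mirror into the same topic name on the destination
```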

== Metrics

@@ -623,32 +619,14 @@ The period of time between each commit of the current partition offsets. Offsets

*Default*: `"5s"`

=== `multi_header`

Decode headers into lists to allow handling of multiple values with the same key


*Type*: `bool`

*Default*: `false`

=== `batch_size`

The maximum number of messages that should be accumulated into each batch.


*Type*: `int`

*Default*: `1024`

=== `auto_replay_nacks`

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.


*Type*: `bool`

*Default*: `true`

=== `partition_buffer_bytes`

A buffer size (in bytes) for each consumed partition, allowing records to be queued internally before flushing. Increasing this may improve throughput at the cost of higher memory utilisation. Note that each buffer can grow slightly beyond this value.


*Type*: `string`

*Default*: `"1MB"`

=== `topic_lag_refresh_period`

@@ -659,31 +637,13 @@ The period of time between each topic lag refresh cycle.

*Default*: `"5s"`

=== `output_resource`

The label of the redpanda_migrator output in which the currently selected topics need to be created before attempting to read messages.


*Type*: `string`

*Default*: `"redpanda_migrator_output"`

=== `replication_factor_override`

Use the specified replication factor when creating topics.


*Type*: `bool`

*Default*: `true`

=== `auto_replay_nacks`

Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.


*Type*: `bool`

*Default*: `true`

=== `replication_factor`

Replication factor for created topics. This is only used when `replication_factor_override` is set to `true`.


*Type*: `int`

*Default*: `3`

