From 0308fa81bda9728b62a16a8fd3d5b7cc5aab0be9 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Fri, 22 Nov 2024 16:44:46 +0000 Subject: [PATCH] Add max records per request field to sqs output (#3031) * Add max records per request field to sqs output * Fix test * Add lint rule --- CHANGELOG.md | 6 +++ .../components/pages/outputs/aws_sqs.adoc | 10 +++++ internal/impl/aws/integration_sqs_test.go | 43 +++++++++++++++++++ internal/impl/aws/output_sqs.go | 25 ++++++++--- internal/impl/aws/output_sqs_test.go | 9 ++-- 5 files changed, 84 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b088934aef..9f8839501a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,12 @@ Changelog All notable changes to this project will be documented in this file. +## 4.41.0 - TBD + +### Added + +- Field `max_records_per_request` added to the `aws_sqs` output. (@Jeffail) + ## 4.40.0 - 2024-11-21 ### Added diff --git a/docs/modules/components/pages/outputs/aws_sqs.adoc b/docs/modules/components/pages/outputs/aws_sqs.adoc index 09de443ed3..d94ddf2823 100644 --- a/docs/modules/components/pages/outputs/aws_sqs.adoc +++ b/docs/modules/components/pages/outputs/aws_sqs.adoc @@ -76,6 +76,7 @@ output: period: "" check: "" processors: [] # No default (optional) + max_records_per_request: 10 region: "" endpoint: "" credentials: @@ -276,6 +277,15 @@ processors: format: json_array ``` +=== `max_records_per_request` + +Customize the maximum number of records delivered in a single SQS request. This value must be greater than 0 but no greater than 10. + + +*Type*: `int` + +*Default*: `10` + === `region` The AWS region to target. diff --git a/internal/impl/aws/integration_sqs_test.go b/internal/impl/aws/integration_sqs_test.go index 166f33901d..d981698015 100644 --- a/internal/impl/aws/integration_sqs_test.go +++ b/internal/impl/aws/integration_sqs_test.go @@ -64,4 +64,47 @@ input: }), integration.StreamTestOptPort(lsPort), ) + + t.Run("batch_limited", func(t *testing.T) { + template := ` +output: + aws_sqs: + url: http://localhost:$PORT/000000000000/queue-$ID + endpoint: http://localhost:$PORT + region: eu-west-1 + credentials: + id: xxxxx + secret: xxxxx + token: xxxxx + max_in_flight: $MAX_IN_FLIGHT + batching: + count: $OUTPUT_BATCH_COUNT + max_records_per_request: 1 + +input: + aws_sqs: + url: http://localhost:$PORT/000000000000/queue-$ID + endpoint: http://localhost:$PORT + region: eu-west-1 + credentials: + id: xxxxx + secret: xxxxx + token: xxxxx +` + integration.StreamTests( + integration.StreamTestOpenClose(), + integration.StreamTestSendBatch(10), + integration.StreamTestStreamSequential(50), + integration.StreamTestStreamParallel(50), + integration.StreamTestStreamParallelLossy(50), + integration.StreamTestStreamParallelLossyThroughReconnect(50), + ).Run( + t, template, + integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) { + require.NoError(t, createBucketQueue(ctx, "", lsPort, vars.ID)) + }), + integration.StreamTestOptPort(lsPort), + ) + }) + } diff --git a/internal/impl/aws/output_sqs.go b/internal/impl/aws/output_sqs.go index 07166e42fe..6782c174c0 100644 --- a/internal/impl/aws/output_sqs.go +++ b/internal/impl/aws/output_sqs.go @@ -45,8 +45,7 @@ const ( sqsoFieldDelaySeconds = "delay_seconds" sqsoFieldMetadata = "metadata" sqsoFieldBatching = "batching" - - sqsMaxRecordsCount = 10 + sqsoFieldMaxRecordsCount = "max_records_per_request" ) type sqsoConfig struct { @@ -55,6 +54,8 @@ type sqsoConfig struct { MessageDeduplicationID *service.InterpolatedString DelaySeconds *service.InterpolatedString + MaxRecordsCount int + Metadata *service.MetadataExcludeFilter aconf aws.Config backoffCtor func() backoff.BackOff @@ -88,6 +89,13 @@ func sqsoConfigFromParsed(pConf *service.ParsedConfig) (conf sqsoConfig, err err if conf.backoffCtor, err = retries.CommonRetryBackOffCtorFromParsed(pConf); err != nil { return } + if conf.MaxRecordsCount, err = pConf.FieldInt(sqsoFieldMaxRecordsCount); err != nil { + return + } + if conf.MaxRecordsCount <= 0 || conf.MaxRecordsCount > 10 { + err = errors.New("field " + sqsoFieldMaxRecordsCount + " must be >0 and <= 10") + return + } return } @@ -121,6 +129,11 @@ By default Redpanda Connect will use a shared credentials file when connecting t service.NewMetadataExcludeFilterField(snsoFieldMetadata). Description("Specify criteria for which metadata values are sent as headers."), service.NewBatchPolicyField(koFieldBatching), + service.NewIntField(sqsoFieldMaxRecordsCount). + Description("Customize the maximum number of records delivered in a single SQS request. This value must be greater than 0 but no greater than 10."). + Default(10). + LintRule(`if this <= 0 || this > 10 { "this field must be >0 and <=10" } `). + Advanced(), ). Fields(config.SessionFields()...). Fields(retries.CommonRetryBackOffFields(0, "1s", "5s", "30s")...) @@ -321,8 +334,8 @@ func (a *sqsWriter) writeChunk( } // trim input length to max sqs batch size - if len(entries) > sqsMaxRecordsCount { - input.Entries, entries = entries[:sqsMaxRecordsCount], entries[sqsMaxRecordsCount:] + if len(entries) > a.conf.MaxRecordsCount { + input.Entries, entries = entries[:a.conf.MaxRecordsCount], entries[a.conf.MaxRecordsCount:] } else { entries = nil } @@ -385,8 +398,8 @@ func (a *sqsWriter) writeChunk( // add remaining records to batch l := len(input.Entries) - if n := len(entries); n > 0 && l < sqsMaxRecordsCount { - if remaining := sqsMaxRecordsCount - l; remaining < n { + if n := len(entries); n > 0 && l < a.conf.MaxRecordsCount { + if remaining := a.conf.MaxRecordsCount - l; remaining < n { input.Entries, entries = append(input.Entries, entries[:remaining]...), entries[remaining:] } else { input.Entries, entries = append(input.Entries, entries...), nil diff --git a/internal/impl/aws/output_sqs_test.go b/internal/impl/aws/output_sqs_test.go index 70ca8f8091..a6bca37770 100644 --- a/internal/impl/aws/output_sqs_test.go +++ b/internal/impl/aws/output_sqs_test.go @@ -153,7 +153,8 @@ func TestSQSRetries(t *testing.T) { backoffCtor: func() backoff.BackOff { return backoff.NewExponentialBackOff() }, - aconf: conf, + aconf: conf, + MaxRecordsCount: 10, }, service.MockResources()) require.NoError(t, err) @@ -226,7 +227,8 @@ func TestSQSSendLimit(t *testing.T) { backoffCtor: func() backoff.BackOff { return backoff.NewExponentialBackOff() }, - aconf: conf, + aconf: conf, + MaxRecordsCount: 10, }, service.MockResources()) require.NoError(t, err) @@ -300,7 +302,8 @@ func TestSQSMultipleQueues(t *testing.T) { backoffCtor: func() backoff.BackOff { return backoff.NewExponentialBackOff() }, - aconf: conf, + aconf: conf, + MaxRecordsCount: 10, }, service.MockResources()) require.NoError(t, err)