Skip to content

Commit

Permalink
Add max records per request field to sqs output (#3031)
Browse files Browse the repository at this point in the history
* Add max records per request field to sqs output

* Fix test

* Add lint rule
  • Loading branch information
Jeffail authored Nov 22, 2024
1 parent 42953e5 commit 0308fa8
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 9 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ Changelog

All notable changes to this project will be documented in this file.

## 4.41.0 - TBD

### Added

- Field `max_records_per_request` added to the `aws_sqs` output. (@Jeffail)

## 4.40.0 - 2024-11-21

### Added
Expand Down
10 changes: 10 additions & 0 deletions docs/modules/components/pages/outputs/aws_sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ output:
period: ""
check: ""
processors: [] # No default (optional)
max_records_per_request: 10
region: ""
endpoint: ""
credentials:
Expand Down Expand Up @@ -276,6 +277,15 @@ processors:
format: json_array
```
=== `max_records_per_request`
Customize the maximum number of records delivered in a single SQS request. This value must be greater than 0 but no greater than 10.
*Type*: `int`
*Default*: `10`
=== `region`
The AWS region to target.
Expand Down
43 changes: 43 additions & 0 deletions internal/impl/aws/integration_sqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,47 @@ input:
}),
integration.StreamTestOptPort(lsPort),
)

t.Run("batch_limited", func(t *testing.T) {
template := `
output:
aws_sqs:
url: http://localhost:$PORT/000000000000/queue-$ID
endpoint: http://localhost:$PORT
region: eu-west-1
credentials:
id: xxxxx
secret: xxxxx
token: xxxxx
max_in_flight: $MAX_IN_FLIGHT
batching:
count: $OUTPUT_BATCH_COUNT
max_records_per_request: 1
input:
aws_sqs:
url: http://localhost:$PORT/000000000000/queue-$ID
endpoint: http://localhost:$PORT
region: eu-west-1
credentials:
id: xxxxx
secret: xxxxx
token: xxxxx
`
integration.StreamTests(
integration.StreamTestOpenClose(),
integration.StreamTestSendBatch(10),
integration.StreamTestStreamSequential(50),
integration.StreamTestStreamParallel(50),
integration.StreamTestStreamParallelLossy(50),
integration.StreamTestStreamParallelLossyThroughReconnect(50),
).Run(
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) {
require.NoError(t, createBucketQueue(ctx, "", lsPort, vars.ID))
}),
integration.StreamTestOptPort(lsPort),
)
})

}
25 changes: 19 additions & 6 deletions internal/impl/aws/output_sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ const (
sqsoFieldDelaySeconds = "delay_seconds"
sqsoFieldMetadata = "metadata"
sqsoFieldBatching = "batching"

sqsMaxRecordsCount = 10
sqsoFieldMaxRecordsCount = "max_records_per_request"
)

type sqsoConfig struct {
Expand All @@ -55,6 +54,8 @@ type sqsoConfig struct {
MessageDeduplicationID *service.InterpolatedString
DelaySeconds *service.InterpolatedString

MaxRecordsCount int

Metadata *service.MetadataExcludeFilter
aconf aws.Config
backoffCtor func() backoff.BackOff
Expand Down Expand Up @@ -88,6 +89,13 @@ func sqsoConfigFromParsed(pConf *service.ParsedConfig) (conf sqsoConfig, err err
if conf.backoffCtor, err = retries.CommonRetryBackOffCtorFromParsed(pConf); err != nil {
return
}
if conf.MaxRecordsCount, err = pConf.FieldInt(sqsoFieldMaxRecordsCount); err != nil {
return
}
if conf.MaxRecordsCount <= 0 || conf.MaxRecordsCount > 10 {
err = errors.New("field " + sqsoFieldMaxRecordsCount + " must be >0 and <= 10")
return
}
return
}

Expand Down Expand Up @@ -121,6 +129,11 @@ By default Redpanda Connect will use a shared credentials file when connecting t
service.NewMetadataExcludeFilterField(snsoFieldMetadata).
Description("Specify criteria for which metadata values are sent as headers."),
service.NewBatchPolicyField(koFieldBatching),
service.NewIntField(sqsoFieldMaxRecordsCount).
Description("Customize the maximum number of records delivered in a single SQS request. This value must be greater than 0 but no greater than 10.").
Default(10).
LintRule(`if this <= 0 || this > 10 { "this field must be >0 and <=10" } `).
Advanced(),
).
Fields(config.SessionFields()...).
Fields(retries.CommonRetryBackOffFields(0, "1s", "5s", "30s")...)
Expand Down Expand Up @@ -321,8 +334,8 @@ func (a *sqsWriter) writeChunk(
}

// trim input length to max sqs batch size
if len(entries) > sqsMaxRecordsCount {
input.Entries, entries = entries[:sqsMaxRecordsCount], entries[sqsMaxRecordsCount:]
if len(entries) > a.conf.MaxRecordsCount {
input.Entries, entries = entries[:a.conf.MaxRecordsCount], entries[a.conf.MaxRecordsCount:]
} else {
entries = nil
}
Expand Down Expand Up @@ -385,8 +398,8 @@ func (a *sqsWriter) writeChunk(

// add remaining records to batch
l := len(input.Entries)
if n := len(entries); n > 0 && l < sqsMaxRecordsCount {
if remaining := sqsMaxRecordsCount - l; remaining < n {
if n := len(entries); n > 0 && l < a.conf.MaxRecordsCount {
if remaining := a.conf.MaxRecordsCount - l; remaining < n {
input.Entries, entries = append(input.Entries, entries[:remaining]...), entries[remaining:]
} else {
input.Entries, entries = append(input.Entries, entries...), nil
Expand Down
9 changes: 6 additions & 3 deletions internal/impl/aws/output_sqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ func TestSQSRetries(t *testing.T) {
backoffCtor: func() backoff.BackOff {
return backoff.NewExponentialBackOff()
},
aconf: conf,
aconf: conf,
MaxRecordsCount: 10,
}, service.MockResources())
require.NoError(t, err)

Expand Down Expand Up @@ -226,7 +227,8 @@ func TestSQSSendLimit(t *testing.T) {
backoffCtor: func() backoff.BackOff {
return backoff.NewExponentialBackOff()
},
aconf: conf,
aconf: conf,
MaxRecordsCount: 10,
}, service.MockResources())
require.NoError(t, err)

Expand Down Expand Up @@ -300,7 +302,8 @@ func TestSQSMultipleQueues(t *testing.T) {
backoffCtor: func() backoff.BackOff {
return backoff.NewExponentialBackOff()
},
aconf: conf,
aconf: conf,
MaxRecordsCount: 10,
}, service.MockResources())
require.NoError(t, err)

Expand Down

0 comments on commit 0308fa8

Please sign in to comment.