Skip to content

Commit e66b830

Browse files
committed
inputs/redpanda: add fetch_max_wait option
kgo.FetchMaxWait is a config option supported by franz-go. It makes it possible to use kgo.FetchMinBytes, but have a rather low max wait time to fill the batch. This makes it possible to force the broker to send big batches if possible, but still wait only for a short time if there's not enough data. This is especially important with the redpanda input, as it's using ordered franz-go. It will only send batches, if the previous batch with the partition has been consumed. If the broker keeps sending very small batches, e.g. size 1, it's likely to stall batched outputs. I could reproduce locally by using a producer that sends lots of batches of size 1. Using kgo.FetchMinBytes in combination with kgo.FetchMaxWait can solve this problem. But in any case, it is a useful tuning knob offered by franz-go, but also the standard Java client.
1 parent 21a9d97 commit e66b830

File tree

1 file changed

+12
-0
lines changed

1 file changed

+12
-0
lines changed

internal/impl/kafka/franz_reader.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package kafka
1616

1717
import (
1818
"fmt"
19+
"time"
1920

2021
"github.com/dustin/go-humanize"
2122
"github.com/twmb/franz-go/pkg/kgo"
@@ -53,6 +54,7 @@ const (
5354
kfrFieldFetchMaxBytes = "fetch_max_bytes"
5455
kfrFieldFetchMinBytes = "fetch_min_bytes"
5556
kfrFieldFetchMaxPartitionBytes = "fetch_max_partition_bytes"
57+
kfrFieldFetchMaxWait = "fetch_max_wait"
5658
)
5759

5860
// FranzConsumerFields returns a slice of fields specifically for customising
@@ -87,6 +89,10 @@ Finally, it's also possible to specify an explicit offset to consume from by add
8789
Description("Sets the maximum amount of bytes a broker will try to send during a fetch. Note that brokers may not obey this limit if it has records larger than this limit. This is the equivalent to the Java fetch.max.bytes setting.").
8890
Advanced().
8991
Default("50MiB"),
92+
service.NewDurationField(kfrFieldFetchMaxWait).
93+
Description("Sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes. This is the equivalent to the Java fetch.max.wait.ms setting.").
94+
Advanced().
95+
Default("5s"),
9096
service.NewStringField(kfrFieldFetchMinBytes).
9197
Description("Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.").
9298
Advanced().
@@ -109,6 +115,7 @@ type FranzConsumerDetails struct {
109115
FetchMinBytes int32
110116
FetchMaxBytes int32
111117
FetchMaxPartitionBytes int32
118+
FetchMaxWait time.Duration
112119
}
113120

114121
// FranzConsumerDetailsFromConfig returns a summary of kafka consumer
@@ -171,6 +178,10 @@ func FranzConsumerDetailsFromConfig(conf *service.ParsedConfig) (*FranzConsumerD
171178
return nil, err
172179
}
173180

181+
if d.FetchMaxWait, err = conf.FieldDuration(kfrFieldFetchMaxWait); err != nil {
182+
return nil, err
183+
}
184+
174185
return &d, nil
175186
}
176187

@@ -185,6 +196,7 @@ func (d *FranzConsumerDetails) FranzOpts() []kgo.Opt {
185196
kgo.FetchMaxBytes(d.FetchMaxBytes),
186197
kgo.FetchMinBytes(d.FetchMinBytes),
187198
kgo.FetchMaxPartitionBytes(d.FetchMaxPartitionBytes),
199+
kgo.FetchMaxWait(d.FetchMaxWait),
188200
}
189201

190202
if d.RegexPattern {

0 commit comments

Comments
 (0)