From 971f04676d84a2a24838cef601b2519670583bca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81ngel=20Ortu=C3=B1o?= Date: Thu, 3 Oct 2024 15:53:45 +0200 Subject: [PATCH] use grafana/franz-go fork MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miguel Ángel Ortuño --- go.mod | 7 +- go.sum | 4 +- .../twmb/franz-go/pkg/kgo/compression.go | 57 ++++++- .../twmb/franz-go/pkg/kgo/config.go | 22 +++ .../pkg/kgo/internal/pool/bucketed_pool.go | 94 ++++++++++++ .../twmb/franz-go/pkg/kgo/record_and_fetch.go | 34 +++++ .../twmb/franz-go/pkg/kgo/source.go | 140 ++++++++++++++---- vendor/modules.txt | 5 +- 8 files changed, 322 insertions(+), 41 deletions(-) create mode 100644 vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go diff --git a/go.mod b/go.mod index f65d360ba4c..bff6dc9956f 100644 --- a/go.mod +++ b/go.mod @@ -315,8 +315,11 @@ replace github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc // Replacing prometheus/alertmanager with our fork. replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240924175849-b8b7c2c74eb6 -// Replacing with a fork commit based on v1.17.1 with https://github.com/twmb/franz-go/pull/803 cherry-picked. -replace github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 +// Replacing with a fork commit based on v1.17.1 having cherry-picked the following PRs: +// - https://github.com/grafana/franz-go/pull/1 +// - https://github.com/grafana/franz-go/pull/3 +// - https://github.com/grafana/franz-go/pull/4 +replace github.com/twmb/franz-go => github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4 // Pin Google GRPC to v1.65.0 as v1.66.0 has API changes and also potentially performance regressions. // Following https://github.com/grafana/dskit/pull/581 diff --git a/go.sum b/go.sum index 1f01d5c5b30..1a8eaadaabc 100644 --- a/go.sum +++ b/go.sum @@ -947,8 +947,6 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/digitalocean/godo v1.121.0 h1:ilXiHuEnhbJs2fmFEPX0r/QQ6KfiOIMAhJN3f8NiCfI= github.com/digitalocean/godo v1.121.0/go.mod h1:WQVH83OHUy6gC4gXpEVQKtxTd4L5oCp+5OialidkPLY= -github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 h1:jszPVGeTr25QTJ/jWiT7eXnabc4R4itChxUVFSCLjRQ= -github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI= @@ -1256,6 +1254,8 @@ github.com/grafana/dskit v0.0.0-20240925193654-7c41a4057319 h1:KACpOOTqA4WqyyKF2 github.com/grafana/dskit v0.0.0-20240925193654-7c41a4057319/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= +github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4 h1:7/CJa4ilczGHLjULGJFxRFAGsnaN33YIJEqpm45TUYs= +github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce h1:WI1olbgS+sEl77qxEYbmt9TgRUz7iLqmjh8lYPpGlKQ= github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce/go.mod h1:GFAN9Jn9t1cX7sNfc6ZoFyc4f7i8jtm3SajrWdZM2EE= github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wpvYcKfBcc5T4QnhdQjUhtUtB/1CY89lE= diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go b/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go index 81d9d8a7e3b..1adbe69a074 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go @@ -12,6 +12,8 @@ import ( "github.com/klauspost/compress/s2" "github.com/klauspost/compress/zstd" "github.com/pierrec/lz4/v4" + + "github.com/twmb/franz-go/pkg/kgo/internal/pool" ) var byteBuffers = sync.Pool{New: func() any { return bytes.NewBuffer(make([]byte, 8<<10)) }} @@ -266,15 +268,23 @@ type zstdDecoder struct { inner *zstd.Decoder } -func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { +func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPool[byte]) ([]byte, error) { // Early return in case there is no compression compCodec := codecType(codec) if compCodec == codecNone { return src, nil } - out := byteBuffers.Get().(*bytes.Buffer) - out.Reset() - defer byteBuffers.Put(out) + + out, buf, err := d.getDecodedBuffer(src, compCodec, pool) + if err != nil { + return nil, err + } + defer func() { + if compCodec == codecSnappy { + return + } + pool.Put(buf) + }() switch compCodec { case codecGzip: @@ -286,7 +296,7 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { if _, err := io.Copy(out, ungz); err != nil { return nil, err } - return append([]byte(nil), out.Bytes()...), nil + return d.copyDecodedBuffer(out.Bytes(), compCodec, pool), nil case codecSnappy: if len(src) > 16 && bytes.HasPrefix(src, xerialPfx) { return xerialDecode(src) @@ -295,7 +305,7 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { if err != nil { return nil, err } - return append([]byte(nil), decoded...), nil + return d.copyDecodedBuffer(decoded, compCodec, pool), nil case codecLZ4: unlz4 := d.unlz4Pool.Get().(*lz4.Reader) defer d.unlz4Pool.Put(unlz4) @@ -303,7 +313,7 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { if _, err := io.Copy(out, unlz4); err != nil { return nil, err } - return append([]byte(nil), out.Bytes()...), nil + return d.copyDecodedBuffer(out.Bytes(), compCodec, pool), nil case codecZstd: unzstd := d.unzstdPool.Get().(*zstdDecoder) defer d.unzstdPool.Put(unzstd) @@ -311,12 +321,43 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { if err != nil { return nil, err } - return append([]byte(nil), decoded...), nil + return d.copyDecodedBuffer(decoded, compCodec, pool), nil default: return nil, errors.New("unknown compression codec") } } +func (d *decompressor) getDecodedBuffer(src []byte, compCodec codecType, pool *pool.BucketedPool[byte]) (*bytes.Buffer, []byte, error) { + var ( + decodedBufSize int + err error + ) + switch compCodec { + case codecSnappy: + decodedBufSize, err = s2.DecodedLen(src) + if err != nil { + return nil, nil, err + } + + default: + // Make a guess at the output size. + decodedBufSize = len(src) * 2 + } + buf := pool.Get(decodedBufSize)[:0] + + return bytes.NewBuffer(buf), buf, nil +} + +func (d *decompressor) copyDecodedBuffer(decoded []byte, compCodec codecType, pool *pool.BucketedPool[byte]) []byte { + if compCodec == codecSnappy { + // We already know the actual size of the decoded buffer before decompression, + // so there's no need to copy the buffer. + return decoded + } + out := pool.Get(len(decoded)) + return append(out[:0], decoded...) +} + var xerialPfx = []byte{130, 83, 78, 65, 80, 80, 89, 0} var errMalformedXerial = errors.New("malformed xerial framing") diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/config.go b/vendor/github.com/twmb/franz-go/pkg/kgo/config.go index 92ebeaa3905..dabaf03f618 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/config.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/config.go @@ -16,6 +16,8 @@ import ( "github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/pkg/kversion" "github.com/twmb/franz-go/pkg/sasl" + + "github.com/twmb/franz-go/pkg/kgo/internal/pool" ) // Opt is an option to configure a client. @@ -151,6 +153,9 @@ type cfg struct { partitions map[string]map[int32]Offset // partitions to directly consume from regex bool + recordsPool *recordsPool + decompressBufferPool *pool.BucketedPool[byte] + //////////////////////////// // CONSUMER GROUP SECTION // //////////////////////////// @@ -389,6 +394,11 @@ func (cfg *cfg) validate() error { } cfg.hooks = processedHooks + // Assume a 2x compression ratio. + maxDecompressedBatchSize := int(cfg.maxBytes.load()) * 2 + cfg.decompressBufferPool = pool.NewBucketedPool[byte](4096, maxDecompressedBatchSize, 2, func(sz int) []byte { + return make([]byte, sz) + }) return nil } @@ -1347,6 +1357,18 @@ func ConsumeRegex() ConsumerOpt { return consumerOpt{func(cfg *cfg) { cfg.regex = true }} } +// EnableRecordsPool sets the client to obtain the *kgo.Record objects from a pool, +// in order to minimize the number of allocations. +// +// By enabling this option, the records returned by PollFetches/PollRecords +// can be sent back to the pool via ReuseRecords method in order to be recycled. +// +// This option is particularly useful for use cases where the volume of generated records is very high, +// as it can negatively impact performance due to the extra GC overhead. +func EnableRecordsPool() ConsumerOpt { + return consumerOpt{func(cfg *cfg) { cfg.recordsPool = newRecordsPool() }} +} + // DisableFetchSessions sets the client to not use fetch sessions (Kafka 1.0+). // // A "fetch session" is is a way to reduce bandwidth for fetch requests & diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go b/vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go new file mode 100644 index 00000000000..b5c435a55e2 --- /dev/null +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go @@ -0,0 +1,94 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pool + +import ( + "sync" +) + +// BucketedPool is a bucketed pool for variably sized slices. +type BucketedPool[T any] struct { + buckets []sync.Pool + sizes []int + // make is the function used to create an empty slice when none exist yet. + make func(int) []T +} + +// NewBucketedPool returns a new BucketedPool with size buckets for minSize to maxSize +// increasing by the given factor. +func NewBucketedPool[T any](minSize, maxSize int, factor float64, makeFunc func(int) []T) *BucketedPool[T] { + if minSize < 1 { + panic("invalid minimum pool size") + } + if maxSize < 1 { + panic("invalid maximum pool size") + } + if factor < 1 { + panic("invalid factor") + } + + var sizes []int + + for s := minSize; s <= maxSize; s = int(float64(s) * factor) { + sizes = append(sizes, s) + } + + p := &BucketedPool[T]{ + buckets: make([]sync.Pool, len(sizes)), + sizes: sizes, + make: makeFunc, + } + return p +} + +// Get returns a new slice with capacity greater than or equal to size. +func (p *BucketedPool[T]) Get(size int) []T { + for i, bktSize := range p.sizes { + if size > bktSize { + continue + } + buff := p.buckets[i].Get() + if buff == nil { + buff = p.make(bktSize) + } + return buff.([]T) + } + return p.make(size) +} + +// Put adds a slice to the right bucket in the pool. +// If the slice does not belong to any bucket in the pool, it is ignored. +func (p *BucketedPool[T]) Put(s []T) { + sCap := cap(s) + if sCap < p.sizes[0] { + return + } + + for i, size := range p.sizes { + if sCap > size { + continue + } + + if sCap == size { + // Buffer is exactly the minimum size for this bucket. Add it to this bucket. + p.buckets[i].Put(s) + } else { + // Buffer belongs in previous bucket. + p.buckets[i-1].Put(s) + } + return + } +} + + diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go b/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go index 4f1ebe6f524..dbfb94510a6 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go @@ -6,6 +6,8 @@ import ( "reflect" "time" "unsafe" + + "github.com/twmb/franz-go/pkg/kmsg" ) // RecordHeader contains extra information that can be sent with Records. @@ -149,6 +151,38 @@ type Record struct { // producer hooks. It can also be set in a consumer hook to propagate // enrichment to consumer clients. Context context.Context + + // recordsPool is the pool that this record was fetched from, if any. + // + // When reused, record is returned to this pool. + recordsPool *recordsPool + + // rcBatchBuffer is used to keep track of the raw buffer that this record was + // derived from when consuming, after decompression. + // + // This is used to allow reusing these buffers when record pooling has been enabled + // via EnableRecordsPool option. + rcBatchBuffer *rcBuffer[byte] + + // rcRawRecordsBuffer is used to keep track of the raw record buffer that this record was + // derived from when consuming. + // + // This is used to allow reusing these buffers when record pooling has been enabled + // via EnableRecordsPool option. + rcRawRecordsBuffer *rcBuffer[kmsg.Record] +} + +// Reuse releases the record back to the pool. +// +// +// Once this method has been called, any reference to the passed record should be considered invalid by the caller, +// as it may be reused as a result of future calls to the PollFetches/PollRecords method. +func (r *Record) Reuse() { + if r.recordsPool != nil { + r.rcRawRecordsBuffer.release() + r.rcBatchBuffer.release() + r.recordsPool.put(r) + } } func (r *Record) userSize() int64 { diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go index 85586a0a412..69fafba7feb 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go @@ -9,13 +9,61 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/twmb/franz-go/pkg/kbin" "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kgo/internal/pool" "github.com/twmb/franz-go/pkg/kmsg" ) +type recordsPool struct{ p *sync.Pool } + +func newRecordsPool() *recordsPool { + return &recordsPool{ + p: &sync.Pool{New: func() any { return &Record{} }}, + } +} + +func (p *recordsPool) get() *Record { + return p.p.Get().(*Record) +} + +func (p *recordsPool) put(r *Record) { + *r = Record{} // zero out the record + p.p.Put(r) +} + +// rcBuffer is a reference counted buffer. +// +// The internal buffer will be sent back to the pool after calling release +// when the ref count reaches 0. +type rcBuffer[T any] struct { + refCount atomic.Int32 + buffer []T + pool *pool.BucketedPool[T] +} + +func newRCBuffer[T any](buffer []T, pool *pool.BucketedPool[T]) *rcBuffer[T] { + return &rcBuffer[T]{buffer: buffer, pool: pool} +} + +func (b *rcBuffer[T]) acquire() { + b.refCount.Add(1) +} + +func (b *rcBuffer[T]) release() { + if b.refCount.Add(-1) == 0 { + b.pool.Put(b.buffer) + b.buffer = nil + return + } + if b.refCount.Load() < 0 { + panic("rcBuffer released too many times") + } +} + type readerFrom interface { ReadFrom([]byte) error } @@ -110,6 +158,12 @@ type ProcessFetchPartitionOptions struct { // Topic is used to populate the Partition field of each Record. Partition int32 + + // DecompressBufferPool is a pool of buffers to use for decompressing batches. + DecompressBufferPool *pool.BucketedPool[byte] + + // recordsPool is for internal use only. + recordPool *recordsPool } // cursor is where we are consuming from for an individual partition. @@ -1088,7 +1142,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe continue } - fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks) + fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks, s.cl.cfg.recordsPool, s.cl.cfg.decompressBufferPool) if fp.Err != nil { if moving := kmove.maybeAddFetchPartition(resp, rp, partOffset.from); moving { strip(topic, partition, fp.Err) @@ -1265,16 +1319,18 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe // processRespPartition processes all records in all potentially compressed // batches (or message sets). -func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks) (fp FetchPartition) { +func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks, recordsPool *recordsPool, decompressBufferPool *pool.BucketedPool[byte]) (fp FetchPartition) { if rp.ErrorCode == 0 { o.hwm = rp.HighWatermark } opts := ProcessFetchPartitionOptions{ - KeepControlRecords: br.cl.cfg.keepControl, - Offset: o.offset, - IsolationLevel: IsolationLevel{br.cl.cfg.isolationLevel}, - Topic: o.from.topic, - Partition: o.from.partition, + KeepControlRecords: br.cl.cfg.keepControl, + Offset: o.offset, + IsolationLevel: IsolationLevel{br.cl.cfg.isolationLevel}, + Topic: o.from.topic, + Partition: o.from.partition, + DecompressBufferPool: decompressBufferPool, + recordPool: recordsPool, } observeMetrics := func(m FetchBatchMetrics) { hooks.each(func(h Hook) { @@ -1483,11 +1539,17 @@ func (a aborter) trackAbortedPID(producerID int64) { // processing records to fetch part // ////////////////////////////////////// +var rawRecordsPool = pool.NewBucketedPool[kmsg.Record](32, 16*1024, 2, func(len int) []kmsg.Record { + return make([]kmsg.Record, len) +}) + // readRawRecords reads n records from in and returns them, returning early if // there were partial records. func readRawRecords(n int, in []byte) []kmsg.Record { - rs := make([]kmsg.Record, n) + rs := rawRecordsPool.Get(n) + rs = rs[:n] for i := 0; i < n; i++ { + rs[i] = kmsg.Record{} length, used := kbin.Varint(in) total := used + int(length) if used == 0 || length < 0 || len(in) < total { @@ -1523,7 +1585,7 @@ func processRecordBatch( rawRecords := batch.Records if compression := byte(batch.Attributes & 0x0007); compression != 0 { var err error - if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil { + if rawRecords, err = decompressor.decompress(rawRecords, compression, o.DecompressBufferPool); err != nil { return 0, 0 // truncated batch } } @@ -1550,6 +1612,15 @@ func processRecordBatch( } }() + var ( + rcBatchBuff *rcBuffer[byte] + rcRawRecordsBuff *rcBuffer[kmsg.Record] + ) + if o.recordPool != nil { + rcBatchBuff = newRCBuffer(rawRecords, o.DecompressBufferPool) + rcRawRecordsBuff = newRCBuffer(krecords, rawRecordsPool) + } + abortBatch := aborter.shouldAbortBatch(batch) for i := range krecords { record := recordToRecord( @@ -1557,9 +1628,10 @@ func processRecordBatch( fp.Partition, batch, &krecords[i], + o.recordPool, ) - o.maybeKeepRecord(fp, record, abortBatch) + o.maybeKeepRecord(fp, record, rcBatchBuff, rcRawRecordsBuff, abortBatch) if abortBatch && record.Attrs.IsControl() { // A control record has a key and a value where the key // is int16 version and int16 type. Aborted records @@ -1584,7 +1656,7 @@ func processV1OuterMessage(o *ProcessFetchPartitionOptions, fp *FetchPartition, return 1, 0 } - rawInner, err := decompressor.decompress(message.Value, compression) + rawInner, err := decompressor.decompress(message.Value, compression, o.DecompressBufferPool) if err != nil { return 0, 0 // truncated batch } @@ -1679,7 +1751,7 @@ func processV1Message( return false } record := v1MessageToRecord(o.Topic, fp.Partition, message) - o.maybeKeepRecord(fp, record, false) + o.maybeKeepRecord(fp, record, nil, nil, false) return true } @@ -1697,7 +1769,7 @@ func processV0OuterMessage( return 1, 0 // uncompressed bytes is 0; set to compressed bytes on return } - rawInner, err := decompressor.decompress(message.Value, compression) + rawInner, err := decompressor.decompress(message.Value, compression, o.DecompressBufferPool) if err != nil { return 0, 0 // truncated batch } @@ -1757,7 +1829,7 @@ func processV0Message( return false } record := v0MessageToRecord(o.Topic, fp.Partition, message) - o.maybeKeepRecord(fp, record, false) + o.maybeKeepRecord(fp, record, nil, nil, false) return true } @@ -1765,7 +1837,7 @@ func processV0Message( // // If the record is being aborted or the record is a control record and the // client does not want to keep control records, this does not keep the record. -func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) { +func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, rcBatchBuff *rcBuffer[byte], rcRawRecordsBuff *rcBuffer[kmsg.Record], abort bool) { if record.Offset < o.Offset { // We asked for offset 5, but that was in the middle of a // batch; we got offsets 0 thru 4 that we need to skip. @@ -1777,6 +1849,13 @@ func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, recor abort = !o.KeepControlRecords } if !abort { + if rcBatchBuff != nil && rcRawRecordsBuff != nil { + rcBatchBuff.acquire() + record.rcBatchBuffer = rcBatchBuff + + rcRawRecordsBuff.acquire() + record.rcRawRecordsBuffer = rcRawRecordsBuff + } fp.Records = append(fp.Records, record) } @@ -1799,6 +1878,7 @@ func recordToRecord( partition int32, batch *kmsg.RecordBatch, record *kmsg.Record, + recordsPool *recordsPool, ) *Record { h := make([]RecordHeader, 0, len(record.Headers)) for _, kv := range record.Headers { @@ -1807,19 +1887,25 @@ func recordToRecord( Value: kv.Value, }) } - - r := &Record{ - Key: record.Key, - Value: record.Value, - Headers: h, - Topic: topic, - Partition: partition, - Attrs: RecordAttrs{uint8(batch.Attributes)}, - ProducerID: batch.ProducerID, - ProducerEpoch: batch.ProducerEpoch, - LeaderEpoch: batch.PartitionLeaderEpoch, - Offset: batch.FirstOffset + int64(record.OffsetDelta), + var r *Record + if recordsPool != nil { + r = recordsPool.get() + } else { + r = new(Record) } + + r.Key = record.Key + r.Value = record.Value + r.Headers = h + r.Topic = topic + r.Partition = partition + r.Attrs = RecordAttrs{uint8(batch.Attributes)} + r.ProducerID = batch.ProducerID + r.ProducerEpoch = batch.ProducerEpoch + r.LeaderEpoch = batch.PartitionLeaderEpoch + r.Offset = batch.FirstOffset + int64(record.OffsetDelta) + r.recordsPool = recordsPool + if r.Attrs.TimestampType() == 0 { r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64) } else { diff --git a/vendor/modules.txt b/vendor/modules.txt index 313b1d68673..ceb9e3fdc81 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1143,11 +1143,12 @@ github.com/tklauser/go-sysconf # github.com/tklauser/numcpus v0.6.1 ## explicit; go 1.13 github.com/tklauser/numcpus -# github.com/twmb/franz-go v1.17.1 => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 +# github.com/twmb/franz-go v1.17.1 => github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4 ## explicit; go 1.21 github.com/twmb/franz-go/pkg/kbin github.com/twmb/franz-go/pkg/kerr github.com/twmb/franz-go/pkg/kgo +github.com/twmb/franz-go/pkg/kgo/internal/pool github.com/twmb/franz-go/pkg/kgo/internal/sticky github.com/twmb/franz-go/pkg/kversion github.com/twmb/franz-go/pkg/sasl @@ -1672,5 +1673,5 @@ sigs.k8s.io/yaml/goyaml.v3 # github.com/opentracing-contrib/go-stdlib => github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 # github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc v0.0.0-20231024023642-e9298576254f # github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240924175849-b8b7c2c74eb6 -# github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 +# github.com/twmb/franz-go => github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4 # google.golang.org/grpc => google.golang.org/grpc v1.65.0