From 971f04676d84a2a24838cef601b2519670583bca Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Miguel=20=C3=81ngel=20Ortu=C3=B1o?= <ortuman@gmail.com>
Date: Thu, 3 Oct 2024 15:53:45 +0200
Subject: [PATCH] use grafana/franz-go fork
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Miguel Ángel Ortuño <ortuman@gmail.com>
---
 go.mod                                        |   7 +-
 go.sum                                        |   4 +-
 .../twmb/franz-go/pkg/kgo/compression.go      |  57 ++++++-
 .../twmb/franz-go/pkg/kgo/config.go           |  22 +++
 .../pkg/kgo/internal/pool/bucketed_pool.go    |  94 ++++++++++++
 .../twmb/franz-go/pkg/kgo/record_and_fetch.go |  34 +++++
 .../twmb/franz-go/pkg/kgo/source.go           | 140 ++++++++++++++----
 vendor/modules.txt                            |   5 +-
 8 files changed, 322 insertions(+), 41 deletions(-)
 create mode 100644 vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go

diff --git a/go.mod b/go.mod
index f65d360ba4c..bff6dc9956f 100644
--- a/go.mod
+++ b/go.mod
@@ -315,8 +315,11 @@ replace github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc
 // Replacing prometheus/alertmanager with our fork.
 replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240924175849-b8b7c2c74eb6
 
-// Replacing with a fork commit based on v1.17.1 with https://github.com/twmb/franz-go/pull/803 cherry-picked.
-replace github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9
+// Replacing with a fork commit based on v1.17.1 having cherry-picked the following PRs:
+// - https://github.com/grafana/franz-go/pull/1
+// - https://github.com/grafana/franz-go/pull/3
+// - https://github.com/grafana/franz-go/pull/4
+replace github.com/twmb/franz-go => github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4
 
 // Pin Google GRPC to v1.65.0 as v1.66.0 has API changes and also potentially performance regressions.
 // Following https://github.com/grafana/dskit/pull/581
diff --git a/go.sum b/go.sum
index 1f01d5c5b30..1a8eaadaabc 100644
--- a/go.sum
+++ b/go.sum
@@ -947,8 +947,6 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 github.com/digitalocean/godo v1.121.0 h1:ilXiHuEnhbJs2fmFEPX0r/QQ6KfiOIMAhJN3f8NiCfI=
 github.com/digitalocean/godo v1.121.0/go.mod h1:WQVH83OHUy6gC4gXpEVQKtxTd4L5oCp+5OialidkPLY=
-github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 h1:jszPVGeTr25QTJ/jWiT7eXnabc4R4itChxUVFSCLjRQ=
-github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
 github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
 github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
 github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI=
@@ -1256,6 +1254,8 @@ github.com/grafana/dskit v0.0.0-20240925193654-7c41a4057319 h1:KACpOOTqA4WqyyKF2
 github.com/grafana/dskit v0.0.0-20240925193654-7c41a4057319/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM=
 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
+github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4 h1:7/CJa4ilczGHLjULGJFxRFAGsnaN33YIJEqpm45TUYs=
+github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
 github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce h1:WI1olbgS+sEl77qxEYbmt9TgRUz7iLqmjh8lYPpGlKQ=
 github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce/go.mod h1:GFAN9Jn9t1cX7sNfc6ZoFyc4f7i8jtm3SajrWdZM2EE=
 github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wpvYcKfBcc5T4QnhdQjUhtUtB/1CY89lE=
diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go b/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go
index 81d9d8a7e3b..1adbe69a074 100644
--- a/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go
+++ b/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go
@@ -12,6 +12,8 @@ import (
 	"github.com/klauspost/compress/s2"
 	"github.com/klauspost/compress/zstd"
 	"github.com/pierrec/lz4/v4"
+
+	"github.com/twmb/franz-go/pkg/kgo/internal/pool"
 )
 
 var byteBuffers = sync.Pool{New: func() any { return bytes.NewBuffer(make([]byte, 8<<10)) }}
@@ -266,15 +268,23 @@ type zstdDecoder struct {
 	inner *zstd.Decoder
 }
 
-func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) {
+func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPool[byte]) ([]byte, error) {
 	// Early return in case there is no compression
 	compCodec := codecType(codec)
 	if compCodec == codecNone {
 		return src, nil
 	}
-	out := byteBuffers.Get().(*bytes.Buffer)
-	out.Reset()
-	defer byteBuffers.Put(out)
+
+	out, buf, err := d.getDecodedBuffer(src, compCodec, pool)
+	if err != nil {
+		return nil, err
+	}
+	defer func() {
+		if compCodec == codecSnappy {
+			return
+		}
+		pool.Put(buf)
+	}()
 
 	switch compCodec {
 	case codecGzip:
@@ -286,7 +296,7 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) {
 		if _, err := io.Copy(out, ungz); err != nil {
 			return nil, err
 		}
-		return append([]byte(nil), out.Bytes()...), nil
+		return d.copyDecodedBuffer(out.Bytes(), compCodec, pool), nil
 	case codecSnappy:
 		if len(src) > 16 && bytes.HasPrefix(src, xerialPfx) {
 			return xerialDecode(src)
@@ -295,7 +305,7 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) {
 		if err != nil {
 			return nil, err
 		}
-		return append([]byte(nil), decoded...), nil
+		return d.copyDecodedBuffer(decoded, compCodec, pool), nil
 	case codecLZ4:
 		unlz4 := d.unlz4Pool.Get().(*lz4.Reader)
 		defer d.unlz4Pool.Put(unlz4)
@@ -303,7 +313,7 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) {
 		if _, err := io.Copy(out, unlz4); err != nil {
 			return nil, err
 		}
-		return append([]byte(nil), out.Bytes()...), nil
+		return d.copyDecodedBuffer(out.Bytes(), compCodec, pool), nil
 	case codecZstd:
 		unzstd := d.unzstdPool.Get().(*zstdDecoder)
 		defer d.unzstdPool.Put(unzstd)
@@ -311,12 +321,43 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) {
 		if err != nil {
 			return nil, err
 		}
-		return append([]byte(nil), decoded...), nil
+		return d.copyDecodedBuffer(decoded, compCodec, pool), nil
 	default:
 		return nil, errors.New("unknown compression codec")
 	}
 }
 
+func (d *decompressor) getDecodedBuffer(src []byte, compCodec codecType, pool *pool.BucketedPool[byte]) (*bytes.Buffer, []byte, error) {
+	var (
+		decodedBufSize int
+		err            error
+	)
+	switch compCodec {
+	case codecSnappy:
+		decodedBufSize, err = s2.DecodedLen(src)
+		if err != nil {
+			return nil, nil, err
+		}
+
+	default:
+		// Make a guess at the output size.
+		decodedBufSize = len(src) * 2
+	}
+	buf := pool.Get(decodedBufSize)[:0]
+
+	return bytes.NewBuffer(buf), buf, nil
+}
+
+func (d *decompressor) copyDecodedBuffer(decoded []byte, compCodec codecType, pool *pool.BucketedPool[byte]) []byte {
+	if compCodec == codecSnappy {
+		// We already know the actual size of the decoded buffer before decompression,
+		// so there's no need to copy the buffer.
+		return decoded
+	}
+	out := pool.Get(len(decoded))
+	return append(out[:0], decoded...)
+}
+
 var xerialPfx = []byte{130, 83, 78, 65, 80, 80, 89, 0}
 
 var errMalformedXerial = errors.New("malformed xerial framing")
diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/config.go b/vendor/github.com/twmb/franz-go/pkg/kgo/config.go
index 92ebeaa3905..dabaf03f618 100644
--- a/vendor/github.com/twmb/franz-go/pkg/kgo/config.go
+++ b/vendor/github.com/twmb/franz-go/pkg/kgo/config.go
@@ -16,6 +16,8 @@ import (
 	"github.com/twmb/franz-go/pkg/kmsg"
 	"github.com/twmb/franz-go/pkg/kversion"
 	"github.com/twmb/franz-go/pkg/sasl"
+
+	"github.com/twmb/franz-go/pkg/kgo/internal/pool"
 )
 
 // Opt is an option to configure a client.
@@ -151,6 +153,9 @@ type cfg struct {
 	partitions map[string]map[int32]Offset // partitions to directly consume from
 	regex      bool
 
+	recordsPool          *recordsPool
+	decompressBufferPool *pool.BucketedPool[byte]
+
 	////////////////////////////
 	// CONSUMER GROUP SECTION //
 	////////////////////////////
@@ -389,6 +394,11 @@ func (cfg *cfg) validate() error {
 	}
 	cfg.hooks = processedHooks
 
+	// Assume a 2x compression ratio.
+	maxDecompressedBatchSize := int(cfg.maxBytes.load()) * 2
+	cfg.decompressBufferPool = pool.NewBucketedPool[byte](4096, maxDecompressedBatchSize, 2, func(sz int) []byte {
+		return make([]byte, sz)
+	})
 	return nil
 }
 
@@ -1347,6 +1357,18 @@ func ConsumeRegex() ConsumerOpt {
 	return consumerOpt{func(cfg *cfg) { cfg.regex = true }}
 }
 
+// EnableRecordsPool sets the client to obtain the *kgo.Record objects from a pool,
+// in order to minimize the number of allocations.
+//
+// By enabling this option, the records returned by PollFetches/PollRecords
+// can be sent back to the pool via the Record.Reuse method in order to be recycled.
+//
+// This option is particularly useful for use cases where the volume of generated records is very high,
+// as it can negatively impact performance due to the extra GC overhead.
+func EnableRecordsPool() ConsumerOpt {
+	return consumerOpt{func(cfg *cfg) { cfg.recordsPool = newRecordsPool() }}
+}
+
 // DisableFetchSessions sets the client to not use fetch sessions (Kafka 1.0+).
 //
 // A "fetch session" is is a way to reduce bandwidth for fetch requests &
diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go b/vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go
new file mode 100644
index 00000000000..b5c435a55e2
--- /dev/null
+++ b/vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go
@@ -0,0 +1,94 @@
+// Copyright 2017 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pool
+
+import (
+	"sync"
+)
+
+// BucketedPool is a bucketed pool for variably sized slices.
+type BucketedPool[T any] struct {
+	buckets []sync.Pool
+	sizes   []int
+	// make is the function used to create an empty slice when none exist yet.
+	make func(int) []T
+}
+
+// NewBucketedPool returns a new BucketedPool with buckets of sizes from minSize to maxSize,
+// increasing by the given factor.
+func NewBucketedPool[T any](minSize, maxSize int, factor float64, makeFunc func(int) []T) *BucketedPool[T] {
+	if minSize < 1 {
+		panic("invalid minimum pool size")
+	}
+	if maxSize < 1 {
+		panic("invalid maximum pool size")
+	}
+	if factor < 1 {
+		panic("invalid factor")
+	}
+
+	var sizes []int
+
+	for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
+		sizes = append(sizes, s)
+	}
+
+	p := &BucketedPool[T]{
+		buckets: make([]sync.Pool, len(sizes)),
+		sizes:   sizes,
+		make:    makeFunc,
+	}
+	return p
+}
+
+// Get returns a new slice with capacity greater than or equal to size.
+func (p *BucketedPool[T]) Get(size int) []T {
+	for i, bktSize := range p.sizes {
+		if size > bktSize {
+			continue
+		}
+		buff := p.buckets[i].Get()
+		if buff == nil {
+			buff = p.make(bktSize)
+		}
+		return buff.([]T)
+	}
+	return p.make(size)
+}
+
+// Put adds a slice to the right bucket in the pool.
+// If the slice does not belong to any bucket in the pool, it is ignored.
+func (p *BucketedPool[T]) Put(s []T) {
+	sCap := cap(s)
+	if sCap < p.sizes[0] {
+		return
+	}
+
+	for i, size := range p.sizes {
+		if sCap > size {
+			continue
+		}
+
+		if sCap == size {
+			// Buffer is exactly the minimum size for this bucket. Add it to this bucket.
+			p.buckets[i].Put(s)
+		} else {
+			// Buffer belongs in previous bucket.
+			p.buckets[i-1].Put(s)
+		}
+		return
+	}
+}
+
+
diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go b/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go
index 4f1ebe6f524..dbfb94510a6 100644
--- a/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go
+++ b/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go
@@ -6,6 +6,8 @@ import (
 	"reflect"
 	"time"
 	"unsafe"
+
+	"github.com/twmb/franz-go/pkg/kmsg"
 )
 
 // RecordHeader contains extra information that can be sent with Records.
@@ -149,6 +151,38 @@ type Record struct {
 	// producer hooks. It can also be set in a consumer hook to propagate
 	// enrichment to consumer clients.
 	Context context.Context
+
+	// recordsPool is the pool that this record was fetched from, if any.
+	//
+	// When reused, record is returned to this pool.
+	recordsPool *recordsPool
+
+	// rcBatchBuffer is used to keep track of the raw buffer that this record was
+	// derived from when consuming, after decompression.
+	//
+	// This is used to allow reusing these buffers when record pooling has been enabled
+	// via EnableRecordsPool option.
+	rcBatchBuffer *rcBuffer[byte]
+
+	// rcRawRecordsBuffer is used to keep track of the raw record buffer that this record was
+	// derived from when consuming.
+	//
+	// This is used to allow reusing these buffers when record pooling has been enabled
+	// via EnableRecordsPool option.
+	rcRawRecordsBuffer *rcBuffer[kmsg.Record]
+}
+
+// Reuse releases the record back to the pool.
+//
+// Reuse is a no-op if the record was not obtained from a records pool (see EnableRecordsPool).
+// Once this method has been called, any reference to the passed record should be considered invalid by the caller,
+// as it may be reused as a result of future calls to the PollFetches/PollRecords method.
+func (r *Record) Reuse() {
+	if r.recordsPool != nil {
+		r.rcRawRecordsBuffer.release()
+		r.rcBatchBuffer.release()
+		r.recordsPool.put(r)
+	}
 }
 
 func (r *Record) userSize() int64 {
diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go
index 85586a0a412..69fafba7feb 100644
--- a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go
+++ b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go
@@ -9,13 +9,61 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/twmb/franz-go/pkg/kbin"
 	"github.com/twmb/franz-go/pkg/kerr"
+	"github.com/twmb/franz-go/pkg/kgo/internal/pool"
 	"github.com/twmb/franz-go/pkg/kmsg"
 )
 
+type recordsPool struct{ p *sync.Pool }
+
+func newRecordsPool() *recordsPool {
+	return &recordsPool{
+		p: &sync.Pool{New: func() any { return &Record{} }},
+	}
+}
+
+func (p *recordsPool) get() *Record {
+	return p.p.Get().(*Record)
+}
+
+func (p *recordsPool) put(r *Record) {
+	*r = Record{} // zero out the record
+	p.p.Put(r)
+}
+
+// rcBuffer is a reference counted buffer.
+//
+// The internal buffer will be sent back to the pool after calling release
+// when the ref count reaches 0.
+type rcBuffer[T any] struct {
+	refCount atomic.Int32
+	buffer   []T
+	pool     *pool.BucketedPool[T]
+}
+
+func newRCBuffer[T any](buffer []T, pool *pool.BucketedPool[T]) *rcBuffer[T] {
+	return &rcBuffer[T]{buffer: buffer, pool: pool}
+}
+
+func (b *rcBuffer[T]) acquire() {
+	b.refCount.Add(1)
+}
+
+func (b *rcBuffer[T]) release() {
+	if b.refCount.Add(-1) == 0 {
+		b.pool.Put(b.buffer)
+		b.buffer = nil
+		return
+	}
+	if b.refCount.Load() < 0 {
+		panic("rcBuffer released too many times")
+	}
+}
+
 type readerFrom interface {
 	ReadFrom([]byte) error
 }
@@ -110,6 +158,12 @@ type ProcessFetchPartitionOptions struct {
 
 	// Topic is used to populate the Partition field of each Record.
 	Partition int32
+
+	// DecompressBufferPool is a pool of buffers to use for decompressing batches.
+	DecompressBufferPool *pool.BucketedPool[byte]
+
+	// recordPool is for internal use only.
+	recordPool *recordsPool
 }
 
 // cursor is where we are consuming from for an individual partition.
@@ -1088,7 +1142,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 				continue
 			}
 
-			fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks)
+			fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks, s.cl.cfg.recordsPool, s.cl.cfg.decompressBufferPool)
 			if fp.Err != nil {
 				if moving := kmove.maybeAddFetchPartition(resp, rp, partOffset.from); moving {
 					strip(topic, partition, fp.Err)
@@ -1265,16 +1319,18 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 
 // processRespPartition processes all records in all potentially compressed
 // batches (or message sets).
-func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks) (fp FetchPartition) {
+func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks, recordsPool *recordsPool, decompressBufferPool *pool.BucketedPool[byte]) (fp FetchPartition) {
 	if rp.ErrorCode == 0 {
 		o.hwm = rp.HighWatermark
 	}
 	opts := ProcessFetchPartitionOptions{
-		KeepControlRecords: br.cl.cfg.keepControl,
-		Offset:             o.offset,
-		IsolationLevel:     IsolationLevel{br.cl.cfg.isolationLevel},
-		Topic:              o.from.topic,
-		Partition:          o.from.partition,
+		KeepControlRecords:   br.cl.cfg.keepControl,
+		Offset:               o.offset,
+		IsolationLevel:       IsolationLevel{br.cl.cfg.isolationLevel},
+		Topic:                o.from.topic,
+		Partition:            o.from.partition,
+		DecompressBufferPool: decompressBufferPool,
+		recordPool:           recordsPool,
 	}
 	observeMetrics := func(m FetchBatchMetrics) {
 		hooks.each(func(h Hook) {
@@ -1483,11 +1539,17 @@ func (a aborter) trackAbortedPID(producerID int64) {
 // processing records to fetch part //
 //////////////////////////////////////
 
+var rawRecordsPool = pool.NewBucketedPool[kmsg.Record](32, 16*1024, 2, func(len int) []kmsg.Record {
+	return make([]kmsg.Record, len)
+})
+
 // readRawRecords reads n records from in and returns them, returning early if
 // there were partial records.
 func readRawRecords(n int, in []byte) []kmsg.Record {
-	rs := make([]kmsg.Record, n)
+	rs := rawRecordsPool.Get(n)
+	rs = rs[:n]
 	for i := 0; i < n; i++ {
+		rs[i] = kmsg.Record{}
 		length, used := kbin.Varint(in)
 		total := used + int(length)
 		if used == 0 || length < 0 || len(in) < total {
@@ -1523,7 +1585,7 @@ func processRecordBatch(
 	rawRecords := batch.Records
 	if compression := byte(batch.Attributes & 0x0007); compression != 0 {
 		var err error
-		if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil {
+		if rawRecords, err = decompressor.decompress(rawRecords, compression, o.DecompressBufferPool); err != nil {
 			return 0, 0 // truncated batch
 		}
 	}
@@ -1550,6 +1612,15 @@ func processRecordBatch(
 		}
 	}()
 
+	var (
+		rcBatchBuff      *rcBuffer[byte]
+		rcRawRecordsBuff *rcBuffer[kmsg.Record]
+	)
+	if o.recordPool != nil {
+		rcBatchBuff = newRCBuffer(rawRecords, o.DecompressBufferPool)
+		rcRawRecordsBuff = newRCBuffer(krecords, rawRecordsPool)
+	}
+
 	abortBatch := aborter.shouldAbortBatch(batch)
 	for i := range krecords {
 		record := recordToRecord(
@@ -1557,9 +1628,10 @@ func processRecordBatch(
 			fp.Partition,
 			batch,
 			&krecords[i],
+			o.recordPool,
 		)
-		o.maybeKeepRecord(fp, record, abortBatch)
 
+		o.maybeKeepRecord(fp, record, rcBatchBuff, rcRawRecordsBuff, abortBatch)
 		if abortBatch && record.Attrs.IsControl() {
 			// A control record has a key and a value where the key
 			// is int16 version and int16 type. Aborted records
@@ -1584,7 +1656,7 @@ func processV1OuterMessage(o *ProcessFetchPartitionOptions, fp *FetchPartition,
 		return 1, 0
 	}
 
-	rawInner, err := decompressor.decompress(message.Value, compression)
+	rawInner, err := decompressor.decompress(message.Value, compression, o.DecompressBufferPool)
 	if err != nil {
 		return 0, 0 // truncated batch
 	}
@@ -1679,7 +1751,7 @@ func processV1Message(
 		return false
 	}
 	record := v1MessageToRecord(o.Topic, fp.Partition, message)
-	o.maybeKeepRecord(fp, record, false)
+	o.maybeKeepRecord(fp, record, nil, nil, false)
 	return true
 }
 
@@ -1697,7 +1769,7 @@ func processV0OuterMessage(
 		return 1, 0 // uncompressed bytes is 0; set to compressed bytes on return
 	}
 
-	rawInner, err := decompressor.decompress(message.Value, compression)
+	rawInner, err := decompressor.decompress(message.Value, compression, o.DecompressBufferPool)
 	if err != nil {
 		return 0, 0 // truncated batch
 	}
@@ -1757,7 +1829,7 @@ func processV0Message(
 		return false
 	}
 	record := v0MessageToRecord(o.Topic, fp.Partition, message)
-	o.maybeKeepRecord(fp, record, false)
+	o.maybeKeepRecord(fp, record, nil, nil, false)
 	return true
 }
 
@@ -1765,7 +1837,7 @@ func processV0Message(
 //
 // If the record is being aborted or the record is a control record and the
 // client does not want to keep control records, this does not keep the record.
-func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) {
+func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, rcBatchBuff *rcBuffer[byte], rcRawRecordsBuff *rcBuffer[kmsg.Record], abort bool) {
 	if record.Offset < o.Offset {
 		// We asked for offset 5, but that was in the middle of a
 		// batch; we got offsets 0 thru 4 that we need to skip.
@@ -1777,6 +1849,13 @@ func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, recor
 		abort = !o.KeepControlRecords
 	}
 	if !abort {
+		if rcBatchBuff != nil && rcRawRecordsBuff != nil {
+			rcBatchBuff.acquire()
+			record.rcBatchBuffer = rcBatchBuff
+
+			rcRawRecordsBuff.acquire()
+			record.rcRawRecordsBuffer = rcRawRecordsBuff
+		}
 		fp.Records = append(fp.Records, record)
 	}
 
@@ -1799,6 +1878,7 @@ func recordToRecord(
 	partition int32,
 	batch *kmsg.RecordBatch,
 	record *kmsg.Record,
+	recordsPool *recordsPool,
 ) *Record {
 	h := make([]RecordHeader, 0, len(record.Headers))
 	for _, kv := range record.Headers {
@@ -1807,19 +1887,25 @@ func recordToRecord(
 			Value: kv.Value,
 		})
 	}
-
-	r := &Record{
-		Key:           record.Key,
-		Value:         record.Value,
-		Headers:       h,
-		Topic:         topic,
-		Partition:     partition,
-		Attrs:         RecordAttrs{uint8(batch.Attributes)},
-		ProducerID:    batch.ProducerID,
-		ProducerEpoch: batch.ProducerEpoch,
-		LeaderEpoch:   batch.PartitionLeaderEpoch,
-		Offset:        batch.FirstOffset + int64(record.OffsetDelta),
+	var r *Record
+	if recordsPool != nil {
+		r = recordsPool.get()
+	} else {
+		r = new(Record)
 	}
+
+	r.Key = record.Key
+	r.Value = record.Value
+	r.Headers = h
+	r.Topic = topic
+	r.Partition = partition
+	r.Attrs = RecordAttrs{uint8(batch.Attributes)}
+	r.ProducerID = batch.ProducerID
+	r.ProducerEpoch = batch.ProducerEpoch
+	r.LeaderEpoch = batch.PartitionLeaderEpoch
+	r.Offset = batch.FirstOffset + int64(record.OffsetDelta)
+	r.recordsPool = recordsPool
+
 	if r.Attrs.TimestampType() == 0 {
 		r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64)
 	} else {
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 313b1d68673..ceb9e3fdc81 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1143,11 +1143,12 @@ github.com/tklauser/go-sysconf
 # github.com/tklauser/numcpus v0.6.1
 ## explicit; go 1.13
 github.com/tklauser/numcpus
-# github.com/twmb/franz-go v1.17.1 => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9
+# github.com/twmb/franz-go v1.17.1 => github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4
 ## explicit; go 1.21
 github.com/twmb/franz-go/pkg/kbin
 github.com/twmb/franz-go/pkg/kerr
 github.com/twmb/franz-go/pkg/kgo
+github.com/twmb/franz-go/pkg/kgo/internal/pool
 github.com/twmb/franz-go/pkg/kgo/internal/sticky
 github.com/twmb/franz-go/pkg/kversion
 github.com/twmb/franz-go/pkg/sasl
@@ -1672,5 +1673,5 @@ sigs.k8s.io/yaml/goyaml.v3
 # github.com/opentracing-contrib/go-stdlib => github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956
 # github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc v0.0.0-20231024023642-e9298576254f
 # github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240924175849-b8b7c2c74eb6
-# github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9
+# github.com/twmb/franz-go => github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4
 # google.golang.org/grpc => google.golang.org/grpc v1.65.0