diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index dbfb9451..c82d7539 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -178,9 +178,13 @@ type Record struct { // Once this method has been called, any reference to the passed record should be considered invalid by the caller, // as it may be reused as a result of future calls to the PollFetches/PollRecords method. func (r *Record) Reuse() { - if r.recordsPool != nil { + if r.rcRawRecordsBuffer != nil { r.rcRawRecordsBuffer.release() + } + if r.rcBatchBuffer != nil { r.rcBatchBuffer.release() + } + if r.recordsPool != nil { r.recordsPool.put(r) } } diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 0f2d8962..51f87374 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1583,7 +1583,9 @@ func processRecordBatch( } rawRecords := batch.Records - if compression := byte(batch.Attributes & 0x0007); compression != 0 { + + compression := byte(batch.Attributes & 0x0007) + if compression != 0 { var err error if rawRecords, err = decompressor.decompress(rawRecords, compression, o.DecompressBufferPool); err != nil { return 0, 0 // truncated batch @@ -1616,7 +1618,7 @@ func processRecordBatch( rcBatchBuff *rcBuffer[byte] rcRawRecordsBuff *rcBuffer[kmsg.Record] ) - if o.recordPool != nil { + if o.recordPool != nil && codecType(compression) != codecNone { rcBatchBuff = newRCBuffer(rawRecords, o.DecompressBufferPool) rcRawRecordsBuff = newRCBuffer(krecords, rawRecordsPool) }