Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pool for encoders #1584

Merged
merged 12 commits into from
Jan 20, 2025
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added the use of sync.Pool in EncoderMap and renamed it to MultiEncoder
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renaming in internal packages no need to be logger into public changelog. Most important: supported pool of encoders, which implement ResetableWriter interface


## v3.97.0
* Added immutable range iterators from go1.23 into query stats to iterate over query phases and accessed tables without query stats object mutation

Expand Down
90 changes: 81 additions & 9 deletions internal/topic/topicwriterinternal/encoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,40 @@ const (
codecUnknown = rawtopiccommon.CodecUNSPECIFIED
)

type EncoderMap struct {
type PublicResetableWriter interface {
io.WriteCloser
Reset(wr io.Writer)
}

type encoderPool struct {
pool sync.Pool
}

func (p *encoderPool) Get() PublicResetableWriter {
enc, _ := p.pool.Get().(PublicResetableWriter)

return enc
}

func (p *encoderPool) Put(wr PublicResetableWriter) {
p.pool.Put(wr)
}

func newEncoderPool() *encoderPool {
return &encoderPool{
pool: sync.Pool{},
}
}

type MultiEncoder struct {
m map[rawtopiccommon.Codec]PublicCreateEncoderFunc

mx sync.RWMutex
p map[rawtopiccommon.Codec]*encoderPool
}

func NewEncoderMap() *EncoderMap {
return &EncoderMap{
func NewEncoderMap() *MultiEncoder {
return &MultiEncoder{
m: map[rawtopiccommon.Codec]PublicCreateEncoderFunc{
rawtopiccommon.CodecRaw: func(writer io.Writer) (io.WriteCloser, error) {
return nopWriteCloser{writer}, nil
Expand All @@ -31,22 +59,66 @@ func NewEncoderMap() *EncoderMap {
return gzip.NewWriter(writer), nil
},
},
mx: sync.RWMutex{},
p: make(map[rawtopiccommon.Codec]*encoderPool),
}
}

func (e *EncoderMap) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc) {
func (e *MultiEncoder) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc) {
e.m[codec] = creator
}

func (e *EncoderMap) CreateLazyEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) {
func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) {
enc, err := e.createEncodeWriter(codec, target)
if err != nil {
return 0, err
}

n, err := enc.Write(data)
if err == nil {
err = enc.Close()
}
if err != nil {
return 0, err
}

resetableEnc, ok := enc.(PublicResetableWriter)
if ok {
e.mx.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you need two locks every time: for check pool and for put codec. It is difficult to read and to compute

What about create pool for every codec within AddEncoder func?
It allows to remove sync for the maps completely, because them will be changed within single thead on init only.

if _, ok := e.p[codec]; !ok {
e.p[codec] = newEncoderPool()
}
e.mx.Unlock()

e.mx.RLock()
e.p[codec].Put(resetableEnc)
e.mx.RUnlock()
}

return n, nil
}

func (e *MultiEncoder) createEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) {
e.mx.RLock()
defer e.mx.RUnlock()

if ePool, ok := e.p[codec]; ok {
wr := ePool.Get()
if wr != nil {
wr.Reset(target)

return wr, nil
}
}

if encoderCreator, ok := e.m[codec]; ok {
return encoderCreator(target)
}

return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected codec '%v' for encode message", codec)))
}

func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs {
func (e *MultiEncoder) GetSupportedCodecs() rawtopiccommon.SupportedCodecs {
res := make(rawtopiccommon.SupportedCodecs, 0, len(e.m))
for codec := range e.m {
res = append(res, codec)
Expand All @@ -55,7 +127,7 @@ func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs {
return res
}

func (e *EncoderMap) IsSupported(codec rawtopiccommon.Codec) bool {
func (e *MultiEncoder) IsSupported(codec rawtopiccommon.Codec) bool {
_, ok := e.m[codec]

return ok
Expand All @@ -73,7 +145,7 @@ func (nopWriteCloser) Close() error {

// EncoderSelector not thread safe
type EncoderSelector struct {
m *EncoderMap
m *MultiEncoder

tracer *trace.Topic
writerReconnectorID string
Expand All @@ -87,7 +159,7 @@ type EncoderSelector struct {
}

func NewEncoderSelector(
m *EncoderMap,
m *MultiEncoder,
allowedCodecs rawtopiccommon.SupportedCodecs,
parallelCompressors int,
tracer *trace.Topic,
Expand Down
79 changes: 79 additions & 0 deletions internal/topic/topicwriterinternal/encoders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package topicwriterinternal

import (
"bytes"
"compress/gzip"
"fmt"
"io"
"strings"
"testing"

Expand Down Expand Up @@ -206,3 +209,79 @@ func TestCompressMessages(t *testing.T) {
require.Error(t, cacheMessages(messages, rawtopiccommon.CodecGzip, parallelCount))
})
}

func TestEncodersPool(t *testing.T) {
decompressGzip := func(rd io.Reader) string {
gzreader, err := gzip.NewReader(rd)
require.NoError(t, err)

decompressedMsg, err := io.ReadAll(gzreader)
require.NoError(t, err)

return string(decompressedMsg)
}

t.Run("NotResetableWriter", func(t *testing.T) {
testEncoderMap := NewEncoderMap()

buf := &bytes.Buffer{}
_, err := testEncoderMap.Encode(rawtopiccommon.CodecRaw, buf, []byte("test_data"))
require.NoError(t, err)
require.Len(t, testEncoderMap.p, 0)
require.Equal(t, "test_data", buf.String())
})

t.Run("ResetableWriterCustom", func(t *testing.T) {
testEncoderMap := NewEncoderMap()

customCodecCode := rawtopiccommon.Codec(5)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about use code of gzip codec? Or use code from custom codec space (10000-20000)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if it's worth checking what happens with gzip codec code, since in the previous implementation, such use of AddEncoder overwritten the value

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overwrite codec must work - it is one of expected scenarion: customer overwrite internal codec to own implementation (for example special settings of gzip of more efficient implementation, than stdlib).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I add a test for codec overwriting case?

testEncoderMap.AddEncoder(customCodecCode, func(writer io.Writer) (io.WriteCloser, error) {
return gzip.NewWriter(writer), nil
})

buf := &bytes.Buffer{}
_, err := testEncoderMap.Encode(customCodecCode, buf, []byte("test_data_1"))
require.NoError(t, err)
require.Len(t, testEncoderMap.p, 1)
require.Equal(t, "test_data_1", decompressGzip(buf))

buf.Reset()
_, err = testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_2"))
require.NoError(t, err)
require.Len(t, testEncoderMap.p, 2)
require.Equal(t, "test_data_2", decompressGzip(buf))
})

t.Run("ResetableWriter", func(t *testing.T) {
testEncoderMap := NewEncoderMap()

buf := &bytes.Buffer{}
_, err := testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_1"))
require.NoError(t, err)
require.Len(t, testEncoderMap.p, 1)
require.Equal(t, "test_data_1", decompressGzip(buf))

buf.Reset()
_, err = testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_2"))
require.NoError(t, err)
require.Len(t, testEncoderMap.p, 1)
require.Equal(t, "test_data_2", decompressGzip(buf))
})

t.Run("ResetableWriterManyMessages", func(t *testing.T) {
testEncoderMap := NewEncoderMap()

buf := &bytes.Buffer{}
for i := 0; i < 50; i++ {
testMsg := []byte(fmt.Sprintf("data_%d", i))

buf.Reset()
_, err := testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, testMsg)
require.NoError(t, err)

require.Equal(t, string(testMsg), decompressGzip(buf))
}

require.Len(t, testEncoderMap.p, 1)
})
}
34 changes: 11 additions & 23 deletions internal/topic/topicwriterinternal/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type messageWithDataContent struct {
bufCodec rawtopiccommon.Codec
bufEncoded bytes.Buffer
rawBuf bytes.Buffer
encoders *EncoderMap
encoders *MultiEncoder
BufUncompressedSize int
}

Expand Down Expand Up @@ -111,18 +111,7 @@ func (m *messageWithDataContent) encodeRawContent(codec rawtopiccommon.Codec) ([

m.bufEncoded.Reset()

writer, err := m.encoders.CreateLazyEncodeWriter(codec, &m.bufEncoded)
if err != nil {
return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: failed create encoder for message, codec '%v': %w",
codec,
err,
)))
}
_, err = writer.Write(m.rawBuf.Bytes())
if err == nil {
err = writer.Close()
}
_, err := m.encoders.Encode(codec, &m.bufEncoded, m.rawBuf.Bytes())
if err != nil {
return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: failed to compress message, codec '%v': %w",
Expand Down Expand Up @@ -157,27 +146,26 @@ func (m *messageWithDataContent) readDataToTargetCodec(codec rawtopiccommon.Code
m.bufCodec = codec
m.bufEncoded.Reset()

encoder, err := m.encoders.CreateLazyEncodeWriter(codec, &m.bufEncoded)
if err != nil {
return err
}

reader := m.Data
if reader == nil {
reader = &bytes.Reader{}
}
bytesCount, err := io.Copy(encoder, reader)
if err == nil {
err = encoder.Close()

buf := &bytes.Buffer{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about use xsync.Pool - internal type wrapper over sync pool for get bytes buffer?

It is often operation - it will need for every message.

_, err := buf.ReadFrom(reader)
if err != nil {
return err
}

bytesCount, err := m.encoders.Encode(codec, &m.bufEncoded, buf.Bytes())
if err != nil {
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: failed compress message with codec '%v': %w",
codec,
err,
)))
}
m.BufUncompressedSize = int(bytesCount)
m.BufUncompressedSize = bytesCount
m.Data = nil

return nil
Expand Down Expand Up @@ -218,7 +206,7 @@ func (m *messageWithDataContent) getEncodedBytes(codec rawtopiccommon.Codec) ([]

func newMessageDataWithContent(
message PublicMessage, //nolint:gocritic
encoders *EncoderMap,
encoders *MultiEncoder,
) messageWithDataContent {
return messageWithDataContent{
PublicMessage: message,
Expand Down
4 changes: 2 additions & 2 deletions internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type WriterReconnector struct {
semaphore *semaphore.Weighted
firstInitResponseProcessedChan empty.Chan
lastSeqNo int64
encodersMap *EncoderMap
encodersMap *MultiEncoder
initDoneCh empty.Chan
initInfo InitialInfo
m xsync.RWMutex
Expand Down Expand Up @@ -760,7 +760,7 @@ func createRawMessageData(
return res, err
}

func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *EncoderMap,
func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *MultiEncoder,
serverCodecs rawtopiccommon.SupportedCodecs,
) rawtopiccommon.SupportedCodecs {
if forceCodec != rawtopiccommon.CodecUNSPECIFIED {
Expand Down
4 changes: 2 additions & 2 deletions internal/topic/topicwriterinternal/writer_single_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type SingleStreamWriterConfig struct {

stream RawTopicWriterStream
queue *messageQueue
encodersMap *EncoderMap
encodersMap *MultiEncoder
getLastSeqNum bool
reconnectorInstanceID string
}
Expand All @@ -32,7 +32,7 @@ func newSingleStreamWriterConfig(
common WritersCommonConfig, //nolint:gocritic
stream RawTopicWriterStream,
queue *messageQueue,
encodersMap *EncoderMap,
encodersMap *MultiEncoder,
getLastSeqNum bool,
reconnectorID string,
) SingleStreamWriterConfig {
Expand Down
6 changes: 6 additions & 0 deletions topic/topicoptions/topicoptions_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,14 @@ type WriteSessionMetadata map[string]string
// CreateEncoderFunc for create message decoders
type CreateEncoderFunc = topicwriterinternal.PublicCreateEncoderFunc

// ResettableWriter is able to reset a nested writer between uses.
type ResetableWriter = topicwriterinternal.PublicResetableWriter

// WithWriterAddEncoder add custom codec implementation to writer.
// It allows to set custom codecs implementations for custom and internal codecs.
//
// If CreateEncoderFunc returns a writer implementing ResetableWriter, then the compression objects
// will be reused for this codec. This will reduce the load on the GC.
func WithWriterAddEncoder(codec topictypes.Codec, f CreateEncoderFunc) WriterOption {
return topicwriterinternal.WithAddEncoder(rawtopiccommon.Codec(codec), f)
}
Expand Down
Loading