Skip to content

Commit

Permalink
Merge pull request #1584 Pool for encoders from DenisEvd/pool-for-enc…
Browse files Browse the repository at this point in the history
…oders
  • Loading branch information
rekby authored Jan 20, 2025
2 parents 0dc77f5 + c1df891 commit 84893f0
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 49 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Supported pool of encoders, which implement ResetableWriter interface

## v3.97.0
* Added immutable range iterators from go1.23 into query stats to iterate over query phases and accessed tables without query stats object mutation

Expand Down
110 changes: 95 additions & 15 deletions internal/topic/topicwriterinternal/encoders.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package topicwriterinternal

import (
"bytes"
"compress/gzip"
"fmt"
"io"
Expand All @@ -17,36 +18,115 @@ const (
codecUnknown = rawtopiccommon.CodecUNSPECIFIED
)

type EncoderMap struct {
type PublicResetableWriter interface {
io.WriteCloser
Reset(wr io.Writer)
}

type encoderPool struct {
pool sync.Pool
}

func (p *encoderPool) Get() PublicResetableWriter {
enc, _ := p.pool.Get().(PublicResetableWriter)

return enc
}

func (p *encoderPool) Put(wr PublicResetableWriter) {
p.pool.Put(wr)
}

func newEncoderPool() *encoderPool {
return &encoderPool{
pool: sync.Pool{},
}
}

type MultiEncoder struct {
m map[rawtopiccommon.Codec]PublicCreateEncoderFunc

bp xsync.Pool[bytes.Buffer]
ep map[rawtopiccommon.Codec]*encoderPool
}

func NewEncoderMap() *EncoderMap {
return &EncoderMap{
m: map[rawtopiccommon.Codec]PublicCreateEncoderFunc{
rawtopiccommon.CodecRaw: func(writer io.Writer) (io.WriteCloser, error) {
return nopWriteCloser{writer}, nil
},
rawtopiccommon.CodecGzip: func(writer io.Writer) (io.WriteCloser, error) {
return gzip.NewWriter(writer), nil
func NewMultiEncoder() *MultiEncoder {
me := &MultiEncoder{
m: make(map[rawtopiccommon.Codec]PublicCreateEncoderFunc),
bp: xsync.Pool[bytes.Buffer]{
New: func() *bytes.Buffer {
return &bytes.Buffer{}
},
},
ep: make(map[rawtopiccommon.Codec]*encoderPool),
}

me.AddEncoder(rawtopiccommon.CodecRaw, func(writer io.Writer) (io.WriteCloser, error) {
return nopWriteCloser{writer}, nil
})
me.AddEncoder(rawtopiccommon.CodecGzip, func(writer io.Writer) (io.WriteCloser, error) {
return gzip.NewWriter(writer), nil
})

return me
}

func (e *EncoderMap) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc) {
func (e *MultiEncoder) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc) {
e.m[codec] = creator
e.ep[codec] = newEncoderPool()
}

func (e *EncoderMap) CreateLazyEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) {
func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, source io.Reader) (int, error) {
buf := e.bp.GetOrNew()
defer e.bp.Put(buf)

buf.Reset()
if _, err := buf.ReadFrom(source); err != nil {
return 0, err
}

return e.EncodeBytes(codec, target, buf.Bytes())
}

func (e *MultiEncoder) EncodeBytes(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) {
enc, err := e.createEncodeWriter(codec, target)
if err != nil {
return 0, err
}

n, err := enc.Write(data)
if err == nil {
err = enc.Close()
}
if err != nil {
return 0, err
}

if resetableEnc, ok := enc.(PublicResetableWriter); ok {
e.ep[codec].Put(resetableEnc)
}

return n, nil
}

func (e *MultiEncoder) createEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) {
if ePool, ok := e.ep[codec]; ok {
wr := ePool.Get()
if wr != nil {
wr.Reset(target)

return wr, nil
}
}

if encoderCreator, ok := e.m[codec]; ok {
return encoderCreator(target)
}

return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected codec '%v' for encode message", codec)))
}

func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs {
func (e *MultiEncoder) GetSupportedCodecs() rawtopiccommon.SupportedCodecs {
res := make(rawtopiccommon.SupportedCodecs, 0, len(e.m))
for codec := range e.m {
res = append(res, codec)
Expand All @@ -55,7 +135,7 @@ func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs {
return res
}

func (e *EncoderMap) IsSupported(codec rawtopiccommon.Codec) bool {
func (e *MultiEncoder) IsSupported(codec rawtopiccommon.Codec) bool {
_, ok := e.m[codec]

return ok
Expand All @@ -73,7 +153,7 @@ func (nopWriteCloser) Close() error {

// EncoderSelector not thread safe
type EncoderSelector struct {
m *EncoderMap
m *MultiEncoder

tracer *trace.Topic
writerReconnectorID string
Expand All @@ -87,7 +167,7 @@ type EncoderSelector struct {
}

func NewEncoderSelector(
m *EncoderMap,
m *MultiEncoder,
allowedCodecs rawtopiccommon.SupportedCodecs,
parallelCompressors int,
tracer *trace.Topic,
Expand Down
91 changes: 90 additions & 1 deletion internal/topic/topicwriterinternal/encoders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package topicwriterinternal

import (
"bytes"
"compress/gzip"
"fmt"
"io"
"strings"
"testing"

Expand All @@ -20,7 +23,7 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) {
})
t.Run("One", func(t *testing.T) {
s := NewEncoderSelector(
NewEncoderMap(),
NewMultiEncoder(),
rawtopiccommon.SupportedCodecs{rawtopiccommon.CodecRaw},
1,
&trace.Topic{},
Expand Down Expand Up @@ -206,3 +209,89 @@ func TestCompressMessages(t *testing.T) {
require.Error(t, cacheMessages(messages, rawtopiccommon.CodecGzip, parallelCount))
})
}

func TestMultiEncoder(t *testing.T) {
decompressGzip := func(rd io.Reader) string {
gzreader, err := gzip.NewReader(rd)
require.NoError(t, err)

decompressedMsg, err := io.ReadAll(gzreader)
require.NoError(t, err)

return string(decompressedMsg)
}

t.Run("BuffersPool", func(t *testing.T) {
testMultiEncoder := NewMultiEncoder()

buf := &bytes.Buffer{}
for i := 0; i < 50; i++ {
testMsg := []byte(fmt.Sprintf("test_data_%d", i))

buf.Reset()
_, err := testMultiEncoder.Encode(rawtopiccommon.CodecGzip, buf, bytes.NewReader(testMsg))
require.NoError(t, err)

require.Equal(t, string(testMsg), decompressGzip(buf))
}
})

t.Run("NotResetableWriter", func(t *testing.T) {
testMultiEncoder := NewMultiEncoder()
require.Len(t, testMultiEncoder.ep, 2)

buf := &bytes.Buffer{}
_, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecRaw, buf, []byte("test_data"))
require.NoError(t, err)
require.Equal(t, "test_data", buf.String())
})

t.Run("ResetableWriterCustom", func(t *testing.T) {
testMultiEncoder := NewMultiEncoder()

customCodecCode := rawtopiccommon.Codec(10001)
testMultiEncoder.AddEncoder(customCodecCode, func(writer io.Writer) (io.WriteCloser, error) {
return gzip.NewWriter(writer), nil
})
require.Len(t, testMultiEncoder.ep, 3)

buf := &bytes.Buffer{}
_, err := testMultiEncoder.EncodeBytes(customCodecCode, buf, []byte("test_data_1"))
require.NoError(t, err)
require.Equal(t, "test_data_1", decompressGzip(buf))

buf.Reset()
_, err = testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_2"))
require.NoError(t, err)
require.Equal(t, "test_data_2", decompressGzip(buf))
})

t.Run("ResetableWriter", func(t *testing.T) {
testMultiEncoder := NewMultiEncoder()

buf := &bytes.Buffer{}
_, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_1"))
require.NoError(t, err)
require.Equal(t, "test_data_1", decompressGzip(buf))

buf.Reset()
_, err = testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_2"))
require.NoError(t, err)
require.Equal(t, "test_data_2", decompressGzip(buf))
})

t.Run("ResetableWriterManyMessages", func(t *testing.T) {
testMultiEncoder := NewMultiEncoder()

buf := &bytes.Buffer{}
for i := 0; i < 50; i++ {
testMsg := []byte(fmt.Sprintf("test_data_%d", i))

buf.Reset()
_, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, testMsg)
require.NoError(t, err)

require.Equal(t, string(testMsg), decompressGzip(buf))
}
})
}
30 changes: 6 additions & 24 deletions internal/topic/topicwriterinternal/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type messageWithDataContent struct {
bufCodec rawtopiccommon.Codec
bufEncoded bytes.Buffer
rawBuf bytes.Buffer
encoders *EncoderMap
encoders *MultiEncoder
BufUncompressedSize int
}

Expand Down Expand Up @@ -111,18 +111,7 @@ func (m *messageWithDataContent) encodeRawContent(codec rawtopiccommon.Codec) ([

m.bufEncoded.Reset()

writer, err := m.encoders.CreateLazyEncodeWriter(codec, &m.bufEncoded)
if err != nil {
return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: failed create encoder for message, codec '%v': %w",
codec,
err,
)))
}
_, err = writer.Write(m.rawBuf.Bytes())
if err == nil {
err = writer.Close()
}
_, err := m.encoders.EncodeBytes(codec, &m.bufEncoded, m.rawBuf.Bytes())
if err != nil {
return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: failed to compress message, codec '%v': %w",
Expand Down Expand Up @@ -157,27 +146,20 @@ func (m *messageWithDataContent) readDataToTargetCodec(codec rawtopiccommon.Code
m.bufCodec = codec
m.bufEncoded.Reset()

encoder, err := m.encoders.CreateLazyEncodeWriter(codec, &m.bufEncoded)
if err != nil {
return err
}

reader := m.Data
if reader == nil {
reader = &bytes.Reader{}
}
bytesCount, err := io.Copy(encoder, reader)
if err == nil {
err = encoder.Close()
}

bytesCount, err := m.encoders.Encode(codec, &m.bufEncoded, reader)
if err != nil {
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: failed compress message with codec '%v': %w",
codec,
err,
)))
}
m.BufUncompressedSize = int(bytesCount)
m.BufUncompressedSize = bytesCount
m.Data = nil

return nil
Expand Down Expand Up @@ -218,7 +200,7 @@ func (m *messageWithDataContent) getEncodedBytes(codec rawtopiccommon.Codec) ([]

func newMessageDataWithContent(
message PublicMessage, //nolint:gocritic
encoders *EncoderMap,
encoders *MultiEncoder,
) messageWithDataContent {
return messageWithDataContent{
PublicMessage: message,
Expand Down
10 changes: 5 additions & 5 deletions internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ type WriterReconnector struct {
semaphore *semaphore.Weighted
firstInitResponseProcessedChan empty.Chan
lastSeqNo int64
encodersMap *EncoderMap
encodersMap *MultiEncoder
initDoneCh empty.Chan
initInfo InitialInfo
m xsync.RWMutex
Expand Down Expand Up @@ -156,7 +156,7 @@ func newWriterReconnectorStopped(
queue: newMessageQueue(),
lastSeqNo: -1,
firstInitResponseProcessedChan: make(empty.Chan),
encodersMap: NewEncoderMap(),
encodersMap: NewMultiEncoder(),
writerInstanceID: writerInstanceID.String(),
retrySettings: cfg.RetrySettings,
}
Expand Down Expand Up @@ -760,11 +760,11 @@ func createRawMessageData(
return res, err
}

func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *EncoderMap,
func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, multiEncoder *MultiEncoder,
serverCodecs rawtopiccommon.SupportedCodecs,
) rawtopiccommon.SupportedCodecs {
if forceCodec != rawtopiccommon.CodecUNSPECIFIED {
if serverCodecs.AllowedByCodecsList(forceCodec) && encoderMap.IsSupported(forceCodec) {
if serverCodecs.AllowedByCodecsList(forceCodec) && multiEncoder.IsSupported(forceCodec) {
return rawtopiccommon.SupportedCodecs{forceCodec}
}

Expand All @@ -779,7 +779,7 @@ func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *Encoder

res := make(rawtopiccommon.SupportedCodecs, 0, len(serverCodecs))
for _, codec := range serverCodecs {
if encoderMap.IsSupported(codec) {
if multiEncoder.IsSupported(codec) {
res = append(res, codec)
}
}
Expand Down
Loading

0 comments on commit 84893f0

Please sign in to comment.