Skip to content

Commit

Permalink
feat: add buffer encoder and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Denis Evdokimov committed Jan 17, 2025
1 parent 34ce284 commit c1df891
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 65 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
* Added the use of sync.Pool in EncoderMap and renamed it to MultiEncoder
* Supported pool of encoders, which implement ResetableWriter interface

## v3.97.0
* Added immutable range iterators from go1.23 into query stats to iterate over query phases and accessed tables without query stats object mutation
Expand Down
64 changes: 36 additions & 28 deletions internal/topic/topicwriterinternal/encoders.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package topicwriterinternal

import (
"bytes"
"compress/gzip"
"fmt"
"io"
Expand Down Expand Up @@ -45,30 +46,49 @@ func newEncoderPool() *encoderPool {
type MultiEncoder struct {
m map[rawtopiccommon.Codec]PublicCreateEncoderFunc

mx sync.RWMutex
p map[rawtopiccommon.Codec]*encoderPool
bp xsync.Pool[bytes.Buffer]
ep map[rawtopiccommon.Codec]*encoderPool
}

func NewEncoderMap() *MultiEncoder {
return &MultiEncoder{
m: map[rawtopiccommon.Codec]PublicCreateEncoderFunc{
rawtopiccommon.CodecRaw: func(writer io.Writer) (io.WriteCloser, error) {
return nopWriteCloser{writer}, nil
},
rawtopiccommon.CodecGzip: func(writer io.Writer) (io.WriteCloser, error) {
return gzip.NewWriter(writer), nil
func NewMultiEncoder() *MultiEncoder {
me := &MultiEncoder{
m: make(map[rawtopiccommon.Codec]PublicCreateEncoderFunc),
bp: xsync.Pool[bytes.Buffer]{
New: func() *bytes.Buffer {
return &bytes.Buffer{}
},
},
mx: sync.RWMutex{},
p: make(map[rawtopiccommon.Codec]*encoderPool),
ep: make(map[rawtopiccommon.Codec]*encoderPool),
}

me.AddEncoder(rawtopiccommon.CodecRaw, func(writer io.Writer) (io.WriteCloser, error) {
return nopWriteCloser{writer}, nil
})
me.AddEncoder(rawtopiccommon.CodecGzip, func(writer io.Writer) (io.WriteCloser, error) {
return gzip.NewWriter(writer), nil
})

return me
}

func (e *MultiEncoder) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc) {
e.m[codec] = creator
e.ep[codec] = newEncoderPool()
}

func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, source io.Reader) (int, error) {
buf := e.bp.GetOrNew()
defer e.bp.Put(buf)

buf.Reset()
if _, err := buf.ReadFrom(source); err != nil {
return 0, err
}

return e.EncodeBytes(codec, target, buf.Bytes())
}

func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) {
func (e *MultiEncoder) EncodeBytes(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) {
enc, err := e.createEncodeWriter(codec, target)
if err != nil {
return 0, err
Expand All @@ -82,27 +102,15 @@ func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, data
return 0, err
}

resetableEnc, ok := enc.(PublicResetableWriter)
if ok {
e.mx.Lock()
if _, ok := e.p[codec]; !ok {
e.p[codec] = newEncoderPool()
}
e.mx.Unlock()

e.mx.RLock()
e.p[codec].Put(resetableEnc)
e.mx.RUnlock()
if resetableEnc, ok := enc.(PublicResetableWriter); ok {
e.ep[codec].Put(resetableEnc)
}

return n, nil
}

func (e *MultiEncoder) createEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) {
e.mx.RLock()
defer e.mx.RUnlock()

if ePool, ok := e.p[codec]; ok {
if ePool, ok := e.ep[codec]; ok {
wr := ePool.Get()
if wr != nil {
wr.Reset(target)
Expand Down
54 changes: 32 additions & 22 deletions internal/topic/topicwriterinternal/encoders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) {
})
t.Run("One", func(t *testing.T) {
s := NewEncoderSelector(
NewEncoderMap(),
NewMultiEncoder(),
rawtopiccommon.SupportedCodecs{rawtopiccommon.CodecRaw},
1,
&trace.Topic{},
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestCompressMessages(t *testing.T) {
})
}

func TestEncodersPool(t *testing.T) {
func TestMultiEncoder(t *testing.T) {
decompressGzip := func(rd io.Reader) string {
gzreader, err := gzip.NewReader(rd)
require.NoError(t, err)
Expand All @@ -221,67 +221,77 @@ func TestEncodersPool(t *testing.T) {
return string(decompressedMsg)
}

t.Run("BuffersPool", func(t *testing.T) {
testMultiEncoder := NewMultiEncoder()

buf := &bytes.Buffer{}
for i := 0; i < 50; i++ {
testMsg := []byte(fmt.Sprintf("test_data_%d", i))

buf.Reset()
_, err := testMultiEncoder.Encode(rawtopiccommon.CodecGzip, buf, bytes.NewReader(testMsg))
require.NoError(t, err)

require.Equal(t, string(testMsg), decompressGzip(buf))
}
})

t.Run("NotResetableWriter", func(t *testing.T) {
testEncoderMap := NewEncoderMap()
testMultiEncoder := NewMultiEncoder()
require.Len(t, testMultiEncoder.ep, 2)

buf := &bytes.Buffer{}
_, err := testEncoderMap.Encode(rawtopiccommon.CodecRaw, buf, []byte("test_data"))
_, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecRaw, buf, []byte("test_data"))
require.NoError(t, err)
require.Len(t, testEncoderMap.p, 0)
require.Equal(t, "test_data", buf.String())
})

t.Run("ResetableWriterCustom", func(t *testing.T) {
testEncoderMap := NewEncoderMap()
testMultiEncoder := NewMultiEncoder()

customCodecCode := rawtopiccommon.Codec(5)
testEncoderMap.AddEncoder(customCodecCode, func(writer io.Writer) (io.WriteCloser, error) {
customCodecCode := rawtopiccommon.Codec(10001)
testMultiEncoder.AddEncoder(customCodecCode, func(writer io.Writer) (io.WriteCloser, error) {
return gzip.NewWriter(writer), nil
})
require.Len(t, testMultiEncoder.ep, 3)

buf := &bytes.Buffer{}
_, err := testEncoderMap.Encode(customCodecCode, buf, []byte("test_data_1"))
_, err := testMultiEncoder.EncodeBytes(customCodecCode, buf, []byte("test_data_1"))
require.NoError(t, err)
require.Len(t, testEncoderMap.p, 1)
require.Equal(t, "test_data_1", decompressGzip(buf))

buf.Reset()
_, err = testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_2"))
_, err = testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_2"))
require.NoError(t, err)
require.Len(t, testEncoderMap.p, 2)
require.Equal(t, "test_data_2", decompressGzip(buf))
})

t.Run("ResetableWriter", func(t *testing.T) {
testEncoderMap := NewEncoderMap()
testMultiEncoder := NewMultiEncoder()

buf := &bytes.Buffer{}
_, err := testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_1"))
_, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_1"))
require.NoError(t, err)
require.Len(t, testEncoderMap.p, 1)
require.Equal(t, "test_data_1", decompressGzip(buf))

buf.Reset()
_, err = testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_2"))
_, err = testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_2"))
require.NoError(t, err)
require.Len(t, testEncoderMap.p, 1)
require.Equal(t, "test_data_2", decompressGzip(buf))
})

t.Run("ResetableWriterManyMessages", func(t *testing.T) {
testEncoderMap := NewEncoderMap()
testMultiEncoder := NewMultiEncoder()

buf := &bytes.Buffer{}
for i := 0; i < 50; i++ {
testMsg := []byte(fmt.Sprintf("data_%d", i))
testMsg := []byte(fmt.Sprintf("test_data_%d", i))

buf.Reset()
_, err := testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, testMsg)
_, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, testMsg)
require.NoError(t, err)

require.Equal(t, string(testMsg), decompressGzip(buf))
}

require.Len(t, testEncoderMap.p, 1)
})
}
10 changes: 2 additions & 8 deletions internal/topic/topicwriterinternal/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (m *messageWithDataContent) encodeRawContent(codec rawtopiccommon.Codec) ([

m.bufEncoded.Reset()

_, err := m.encoders.Encode(codec, &m.bufEncoded, m.rawBuf.Bytes())
_, err := m.encoders.EncodeBytes(codec, &m.bufEncoded, m.rawBuf.Bytes())
if err != nil {
return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: failed to compress message, codec '%v': %w",
Expand Down Expand Up @@ -151,13 +151,7 @@ func (m *messageWithDataContent) readDataToTargetCodec(codec rawtopiccommon.Code
reader = &bytes.Reader{}
}

buf := &bytes.Buffer{}
_, err := buf.ReadFrom(reader)
if err != nil {
return err
}

bytesCount, err := m.encoders.Encode(codec, &m.bufEncoded, buf.Bytes())
bytesCount, err := m.encoders.Encode(codec, &m.bufEncoded, reader)
if err != nil {
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: failed compress message with codec '%v': %w",
Expand Down
8 changes: 4 additions & 4 deletions internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func newWriterReconnectorStopped(
queue: newMessageQueue(),
lastSeqNo: -1,
firstInitResponseProcessedChan: make(empty.Chan),
encodersMap: NewEncoderMap(),
encodersMap: NewMultiEncoder(),
writerInstanceID: writerInstanceID.String(),
retrySettings: cfg.RetrySettings,
}
Expand Down Expand Up @@ -760,11 +760,11 @@ func createRawMessageData(
return res, err
}

func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *MultiEncoder,
func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, multiEncoder *MultiEncoder,
serverCodecs rawtopiccommon.SupportedCodecs,
) rawtopiccommon.SupportedCodecs {
if forceCodec != rawtopiccommon.CodecUNSPECIFIED {
if serverCodecs.AllowedByCodecsList(forceCodec) && encoderMap.IsSupported(forceCodec) {
if serverCodecs.AllowedByCodecsList(forceCodec) && multiEncoder.IsSupported(forceCodec) {
return rawtopiccommon.SupportedCodecs{forceCodec}
}

Expand All @@ -779,7 +779,7 @@ func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *MultiEn

res := make(rawtopiccommon.SupportedCodecs, 0, len(serverCodecs))
for _, codec := range serverCodecs {
if encoderMap.IsSupported(codec) {
if multiEncoder.IsSupported(codec) {
res = append(res, codec)
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/topic/topicwriterinternal/writer_reconnector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
)

var testCommonEncoders = NewEncoderMap()
var testCommonEncoders = NewMultiEncoder()

func TestWriterImpl_AutoSeq(t *testing.T) {
t.Run("OK", func(t *testing.T) {
Expand Down Expand Up @@ -829,7 +829,7 @@ func TestSplitMessagesByBufCodec(t *testing.T) {
func TestCalculateAllowedCodecs(t *testing.T) {
customCodecSupported := rawtopiccommon.Codec(rawtopiccommon.CodecCustomerFirst)
customCodecUnsupported := rawtopiccommon.Codec(rawtopiccommon.CodecCustomerFirst + 1)
encoders := NewEncoderMap()
encoders := NewMultiEncoder()
encoders.AddEncoder(customCodecSupported, func(writer io.Writer) (io.WriteCloser, error) {
return nil, errors.New("test")
})
Expand Down

0 comments on commit c1df891

Please sign in to comment.