From c1df891320bd667c5e8b5ba702dbdddd45442747 Mon Sep 17 00:00:00 2001 From: Denis Evdokimov Date: Fri, 17 Jan 2025 15:53:33 +0300 Subject: [PATCH] feat: add buffer encoder and refactor --- CHANGELOG.md | 2 +- .../topic/topicwriterinternal/encoders.go | 64 +++++++++++-------- .../topicwriterinternal/encoders_test.go | 54 +++++++++------- internal/topic/topicwriterinternal/message.go | 10 +-- .../topicwriterinternal/writer_reconnector.go | 8 +-- .../writer_reconnector_test.go | 4 +- 6 files changed, 77 insertions(+), 65 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a67517f6d..7569fa9f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -* Added the use of sync.Pool in EncoderMap and renamed it to MultiEncoder +* Supported pool of encoders, which implement ResetableWriter interface ## v3.97.0 * Added immutable range iterators from go1.23 into query stats to iterate over query phases and accessed tables without query stats object mutation diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index b7daafbe7..8bd5ea17b 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -1,6 +1,7 @@ package topicwriterinternal import ( + "bytes" "compress/gzip" "fmt" "io" @@ -45,30 +46,49 @@ func newEncoderPool() *encoderPool { type MultiEncoder struct { m map[rawtopiccommon.Codec]PublicCreateEncoderFunc - mx sync.RWMutex - p map[rawtopiccommon.Codec]*encoderPool + bp xsync.Pool[bytes.Buffer] + ep map[rawtopiccommon.Codec]*encoderPool } -func NewEncoderMap() *MultiEncoder { - return &MultiEncoder{ - m: map[rawtopiccommon.Codec]PublicCreateEncoderFunc{ - rawtopiccommon.CodecRaw: func(writer io.Writer) (io.WriteCloser, error) { - return nopWriteCloser{writer}, nil - }, - rawtopiccommon.CodecGzip: func(writer io.Writer) (io.WriteCloser, error) { - return gzip.NewWriter(writer), nil +func NewMultiEncoder() *MultiEncoder { + me := &MultiEncoder{ + m: make(map[rawtopiccommon.Codec]PublicCreateEncoderFunc), + bp: xsync.Pool[bytes.Buffer]{ + New: func() *bytes.Buffer { + return &bytes.Buffer{} }, }, - mx: sync.RWMutex{}, - p: make(map[rawtopiccommon.Codec]*encoderPool), + ep: make(map[rawtopiccommon.Codec]*encoderPool), } + + me.AddEncoder(rawtopiccommon.CodecRaw, func(writer io.Writer) (io.WriteCloser, error) { + return nopWriteCloser{writer}, nil + }) + me.AddEncoder(rawtopiccommon.CodecGzip, func(writer io.Writer) (io.WriteCloser, error) { + return gzip.NewWriter(writer), nil + }) + + return me } func (e *MultiEncoder) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc) { e.m[codec] = creator + e.ep[codec] = newEncoderPool() +} + +func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, source io.Reader) (int, error) { + buf := e.bp.GetOrNew() + defer e.bp.Put(buf) + + buf.Reset() + if _, err := buf.ReadFrom(source); err != nil { + return 0, err + } + + return e.EncodeBytes(codec, target, buf.Bytes()) } -func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) { +func (e *MultiEncoder) EncodeBytes(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) { enc, err := e.createEncodeWriter(codec, target) if err != nil { return 0, err @@ -82,27 +102,15 @@ func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, data return 0, err } - resetableEnc, ok := enc.(PublicResetableWriter) - if ok { - e.mx.Lock() - if _, ok := e.p[codec]; !ok { - e.p[codec] = newEncoderPool() - } - e.mx.Unlock() - - e.mx.RLock() - e.p[codec].Put(resetableEnc) - e.mx.RUnlock() + if resetableEnc, ok := enc.(PublicResetableWriter); ok { + e.ep[codec].Put(resetableEnc) } return n, nil } func (e *MultiEncoder) createEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) { - e.mx.RLock() - defer e.mx.RUnlock() - - if ePool, ok := e.p[codec]; ok { + if ePool, ok := e.ep[codec]; ok { wr := ePool.Get() if wr != nil { wr.Reset(target) diff --git a/internal/topic/topicwriterinternal/encoders_test.go b/internal/topic/topicwriterinternal/encoders_test.go index 9672661bb..b2bf7bd97 100644 --- a/internal/topic/topicwriterinternal/encoders_test.go +++ b/internal/topic/topicwriterinternal/encoders_test.go @@ -23,7 +23,7 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) { }) t.Run("One", func(t *testing.T) { s := NewEncoderSelector( - NewEncoderMap(), + NewMultiEncoder(), rawtopiccommon.SupportedCodecs{rawtopiccommon.CodecRaw}, 1, &trace.Topic{}, @@ -210,7 +210,7 @@ func TestCompressMessages(t *testing.T) { }) } -func TestEncodersPool(t *testing.T) { +func TestMultiEncoder(t *testing.T) { decompressGzip := func(rd io.Reader) string { gzreader, err := gzip.NewReader(rd) require.NoError(t, err) @@ -221,67 +221,77 @@ func TestEncodersPool(t *testing.T) { return string(decompressedMsg) } + t.Run("BuffersPool", func(t *testing.T) { + testMultiEncoder := NewMultiEncoder() + + buf := &bytes.Buffer{} + for i := 0; i < 50; i++ { + testMsg := []byte(fmt.Sprintf("test_data_%d", i)) + + buf.Reset() + _, err := testMultiEncoder.Encode(rawtopiccommon.CodecGzip, buf, bytes.NewReader(testMsg)) + require.NoError(t, err) + + require.Equal(t, string(testMsg), decompressGzip(buf)) + } + }) + t.Run("NotResetableWriter", func(t *testing.T) { - testEncoderMap := NewEncoderMap() + testMultiEncoder := NewMultiEncoder() + require.Len(t, testMultiEncoder.ep, 2) buf := &bytes.Buffer{} - _, err := testEncoderMap.Encode(rawtopiccommon.CodecRaw, buf, []byte("test_data")) + _, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecRaw, buf, []byte("test_data")) require.NoError(t, err) - require.Len(t, testEncoderMap.p, 0) require.Equal(t, "test_data", buf.String()) }) t.Run("ResetableWriterCustom", func(t *testing.T) { - testEncoderMap := NewEncoderMap() + testMultiEncoder := NewMultiEncoder() - customCodecCode := rawtopiccommon.Codec(5) - testEncoderMap.AddEncoder(customCodecCode, func(writer io.Writer) (io.WriteCloser, error) { + customCodecCode := rawtopiccommon.Codec(10001) + testMultiEncoder.AddEncoder(customCodecCode, func(writer io.Writer) (io.WriteCloser, error) { return gzip.NewWriter(writer), nil }) + require.Len(t, testMultiEncoder.ep, 3) buf := &bytes.Buffer{} - _, err := testEncoderMap.Encode(customCodecCode, buf, []byte("test_data_1")) + _, err := testMultiEncoder.EncodeBytes(customCodecCode, buf, []byte("test_data_1")) require.NoError(t, err) - require.Len(t, testEncoderMap.p, 1) require.Equal(t, "test_data_1", decompressGzip(buf)) buf.Reset() - _, err = testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_2")) + _, err = testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_2")) require.NoError(t, err) - require.Len(t, testEncoderMap.p, 2) require.Equal(t, "test_data_2", decompressGzip(buf)) }) t.Run("ResetableWriter", func(t *testing.T) { - testEncoderMap := NewEncoderMap() + testMultiEncoder := NewMultiEncoder() buf := &bytes.Buffer{} - _, err := testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_1")) + _, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_1")) require.NoError(t, err) - require.Len(t, testEncoderMap.p, 1) require.Equal(t, "test_data_1", decompressGzip(buf)) buf.Reset() - _, err = testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_2")) + _, err = testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_2")) require.NoError(t, err) - require.Len(t, testEncoderMap.p, 1) require.Equal(t, "test_data_2", decompressGzip(buf)) }) t.Run("ResetableWriterManyMessages", func(t *testing.T) { - testEncoderMap := NewEncoderMap() + testMultiEncoder := NewMultiEncoder() buf := &bytes.Buffer{} for i := 0; i < 50; i++ { - testMsg := []byte(fmt.Sprintf("data_%d", i)) + testMsg := []byte(fmt.Sprintf("test_data_%d", i)) buf.Reset() - _, err := testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, testMsg) + _, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, testMsg) require.NoError(t, err) require.Equal(t, string(testMsg), decompressGzip(buf)) } - - require.Len(t, testEncoderMap.p, 1) }) } diff --git a/internal/topic/topicwriterinternal/message.go b/internal/topic/topicwriterinternal/message.go index bc2cf70ef..0e82532fe 100644 --- a/internal/topic/topicwriterinternal/message.go +++ b/internal/topic/topicwriterinternal/message.go @@ -111,7 +111,7 @@ func (m *messageWithDataContent) encodeRawContent(codec rawtopiccommon.Codec) ([ m.bufEncoded.Reset() - _, err := m.encoders.Encode(codec, &m.bufEncoded, m.rawBuf.Bytes()) + _, err := m.encoders.EncodeBytes(codec, &m.bufEncoded, m.rawBuf.Bytes()) if err != nil { return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: failed to compress message, codec '%v': %w", @@ -151,13 +151,7 @@ func (m *messageWithDataContent) readDataToTargetCodec(codec rawtopiccommon.Code reader = &bytes.Reader{} } - buf := &bytes.Buffer{} - _, err := buf.ReadFrom(reader) - if err != nil { - return err - } - - bytesCount, err := m.encoders.Encode(codec, &m.bufEncoded, buf.Bytes()) + bytesCount, err := m.encoders.Encode(codec, &m.bufEncoded, reader) if err != nil { return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: failed compress message with codec '%v': %w", diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index e1684e4c7..a7632c4a4 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -156,7 +156,7 @@ func newWriterReconnectorStopped( queue: newMessageQueue(), lastSeqNo: -1, firstInitResponseProcessedChan: make(empty.Chan), - encodersMap: NewEncoderMap(), + encodersMap: NewMultiEncoder(), writerInstanceID: writerInstanceID.String(), retrySettings: cfg.RetrySettings, } @@ -760,11 +760,11 @@ func createRawMessageData( return res, err } -func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *MultiEncoder, +func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, multiEncoder *MultiEncoder, serverCodecs rawtopiccommon.SupportedCodecs, ) rawtopiccommon.SupportedCodecs { if forceCodec != rawtopiccommon.CodecUNSPECIFIED { - if serverCodecs.AllowedByCodecsList(forceCodec) && encoderMap.IsSupported(forceCodec) { + if serverCodecs.AllowedByCodecsList(forceCodec) && multiEncoder.IsSupported(forceCodec) { return rawtopiccommon.SupportedCodecs{forceCodec} } @@ -779,7 +779,7 @@ func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *MultiEn res := make(rawtopiccommon.SupportedCodecs, 0, len(serverCodecs)) for _, codec := range serverCodecs { - if encoderMap.IsSupported(codec) { + if multiEncoder.IsSupported(codec) { res = append(res, codec) } } diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index dc89dcafb..405265399 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -26,7 +26,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" ) -var testCommonEncoders = NewEncoderMap() +var testCommonEncoders = NewMultiEncoder() func TestWriterImpl_AutoSeq(t *testing.T) { t.Run("OK", func(t *testing.T) { @@ -829,7 +829,7 @@ func TestSplitMessagesByBufCodec(t *testing.T) { func TestCalculateAllowedCodecs(t *testing.T) { customCodecSupported := rawtopiccommon.Codec(rawtopiccommon.CodecCustomerFirst) customCodecUnsupported := rawtopiccommon.Codec(rawtopiccommon.CodecCustomerFirst + 1) - encoders := NewEncoderMap() + encoders := NewMultiEncoder() encoders.AddEncoder(customCodecSupported, func(writer io.Writer) (io.WriteCloser, error) { return nil, errors.New("test") })