From 7d51b06c1a04522df7878d2f32cffdd8f9fb8f22 Mon Sep 17 00:00:00 2001 From: Denis Evdokimov Date: Thu, 12 Dec 2024 22:22:14 +0300 Subject: [PATCH 01/11] feat: sync.Pool for encoders --- .../topic/topicwriterinternal/encoders.go | 74 ++++++++++++++++++- .../topic/topicwriterinternal/gzip_encoder.go | 33 +++++++++ internal/topic/topicwriterinternal/message.go | 28 ++----- 3 files changed, 111 insertions(+), 24 deletions(-) create mode 100644 internal/topic/topicwriterinternal/gzip_encoder.go diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index a1ca73ad9..a21e37891 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -1,7 +1,6 @@ package topicwriterinternal import ( - "compress/gzip" "fmt" "io" "sync" @@ -17,8 +16,31 @@ const ( codecUnknown = rawtopiccommon.CodecUNSPECIFIED ) +type encoder interface { + Encode(target io.Writer, data []byte) (int, error) +} + +type encoderPool struct { + pool sync.Pool +} + +func (p *encoderPool) Get() encoder { + return p.pool.Get().(encoder) +} + +func (p *encoderPool) Put(encoder encoder) { + p.pool.Put(encoder) +} + +func newEncoderPool(newEncoderFunc func() any) *encoderPool { + return &encoderPool{ + pool: sync.Pool{New: newEncoderFunc}, + } +} + type EncoderMap struct { m map[rawtopiccommon.Codec]PublicCreateEncoderFunc + p map[rawtopiccommon.Codec]*encoderPool } func NewEncoderMap() *EncoderMap { @@ -27,9 +49,9 @@ func NewEncoderMap() *EncoderMap { rawtopiccommon.CodecRaw: func(writer io.Writer) (io.WriteCloser, error) { return nopWriteCloser{writer}, nil }, - rawtopiccommon.CodecGzip: func(writer io.Writer) (io.WriteCloser, error) { - return gzip.NewWriter(writer), nil - }, + }, + p: map[rawtopiccommon.Codec]*encoderPool{ + rawtopiccommon.CodecGzip: newEncoderPool(newGzipEncoder), }, } } @@ -46,6 +68,45 @@ func (e *EncoderMap) CreateLazyEncodeWriter(codec rawtopiccommon.Codec, target i return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected codec '%v' for encode message", codec))) } +func (e *EncoderMap) Encode(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) { + ePool, ok := e.p[codec] + if !ok { + return e.encodeWithLazyWriter(codec, target, data) + } + + enc := ePool.Get() + n, err := enc.Encode(target, data) + if err != nil { + return 0, err + } + + ePool.Put(enc) + + return n, nil +} + +func (e *EncoderMap) encodeWithLazyWriter(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) { + encoderCreator, ok := e.m[codec] + if !ok { + return 0, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected codec '%v' for encode message", codec))) + } + + enc, err := encoderCreator(target) + if err != nil { + return 0, err + } + + n, err := enc.Write(data) + if err == nil { + err = enc.Close() + } + if err != nil { + return 0, err + } + + return n, nil +} + func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs { res := make(rawtopiccommon.SupportedCodecs, 0, len(e.m)) for codec := range e.m { @@ -57,6 +118,11 @@ func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs { func (e *EncoderMap) IsSupported(codec rawtopiccommon.Codec) bool { _, ok := e.m[codec] + if ok { + return true + } + + _, ok = e.p[codec] return ok } diff --git a/internal/topic/topicwriterinternal/gzip_encoder.go b/internal/topic/topicwriterinternal/gzip_encoder.go new file mode 100644 index 000000000..8dc292459 --- /dev/null +++ b/internal/topic/topicwriterinternal/gzip_encoder.go @@ -0,0 +1,33 @@ +package topicwriterinternal + +import ( + "compress/gzip" + "io" +) + +type gzipEncoder struct { + *gzip.Writer +} + +func (ge *gzipEncoder) Encode(target io.Writer, data []byte) (int, error) { + ge.Reset(target) + + n, err := ge.Write(data) + if err != nil { + return 0, err + } + + if err := ge.Flush(); err != nil { + return 0, err + } + + if err := ge.Close(); err != nil { + return 0, err + } + + return n, nil +} + +func newGzipEncoder() any { + return &gzipEncoder{gzip.NewWriter(nil)} +} diff --git a/internal/topic/topicwriterinternal/message.go b/internal/topic/topicwriterinternal/message.go index 00c17be3f..08c26af11 100644 --- a/internal/topic/topicwriterinternal/message.go +++ b/internal/topic/topicwriterinternal/message.go @@ -111,18 +111,7 @@ func (m *messageWithDataContent) encodeRawContent(codec rawtopiccommon.Codec) ([ m.bufEncoded.Reset() - writer, err := m.encoders.CreateLazyEncodeWriter(codec, &m.bufEncoded) - if err != nil { - return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( - "ydb: failed create encoder for message, codec '%v': %w", - codec, - err, - ))) - } - _, err = writer.Write(m.rawBuf.Bytes()) - if err == nil { - err = writer.Close() - } + _, err := m.encoders.Encode(codec, &m.bufEncoded, m.rawBuf.Bytes()) if err != nil { return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: failed to compress message, codec '%v': %w", @@ -157,19 +146,18 @@ func (m *messageWithDataContent) readDataToTargetCodec(codec rawtopiccommon.Code m.bufCodec = codec m.bufEncoded.Reset() - encoder, err := m.encoders.CreateLazyEncodeWriter(codec, &m.bufEncoded) - if err != nil { - return err - } - reader := m.Data if reader == nil { reader = &bytes.Reader{} } - bytesCount, err := io.Copy(encoder, reader) - if err == nil { - err = encoder.Close() + + buf := bytes.NewBuffer([]byte("")) + _, err := buf.ReadFrom(reader) + if err != nil { + return err } + + bytesCount, err := m.encoders.Encode(codec, &m.bufEncoded, buf.Bytes()) if err != nil { return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: failed compress message with codec '%v': %w", From 26e43b7df7f336d9908642d81b3de966ab4ac496 Mon Sep 17 00:00:00 2001 From: Denis Evdokimov Date: Thu, 12 Dec 2024 22:45:05 +0300 Subject: [PATCH 02/11] ref: fix linter warns --- internal/topic/topicwriterinternal/encoders.go | 3 ++- internal/topic/topicwriterinternal/message.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index a21e37891..60181d117 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -25,7 +25,8 @@ type encoderPool struct { } func (p *encoderPool) Get() encoder { - return p.pool.Get().(encoder) + enc, _ := p.pool.Get().(encoder) + return enc } func (p *encoderPool) Put(encoder encoder) { diff --git a/internal/topic/topicwriterinternal/message.go b/internal/topic/topicwriterinternal/message.go index 08c26af11..1a9ec9e86 100644 --- a/internal/topic/topicwriterinternal/message.go +++ b/internal/topic/topicwriterinternal/message.go @@ -151,7 +151,7 @@ func (m *messageWithDataContent) readDataToTargetCodec(codec rawtopiccommon.Code reader = &bytes.Reader{} } - buf := bytes.NewBuffer([]byte("")) + buf := &bytes.Buffer{} _, err := buf.ReadFrom(reader) if err != nil { return err From 352663d1b264ede37f16e2478545f635a2c0a7c1 Mon Sep 17 00:00:00 2001 From: Denis Evdokimov Date: Fri, 13 Dec 2024 10:37:06 +0300 Subject: [PATCH 03/11] feat: remove unused creat func --- internal/topic/topicwriterinternal/encoders.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index 60181d117..415f75e63 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -61,14 +61,6 @@ func (e *EncoderMap) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreate e.m[codec] = creator } -func (e *EncoderMap) CreateLazyEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) { - if encoderCreator, ok := e.m[codec]; ok { - return encoderCreator(target) - } - - return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected codec '%v' for encode message", codec))) -} - func (e *EncoderMap) Encode(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) { ePool, ok := e.p[codec] if !ok { From 701bb3b0469cd8279377dd67e1db164712b8a239 Mon Sep 17 00:00:00 2001 From: Denis Evdokimov Date: Mon, 16 Dec 2024 20:28:53 +0300 Subject: [PATCH 04/11] feat: make pools for every enc with reset --- .../topic/topicwriterinternal/encoders.go | 77 ++++++++++--------- .../topicwriterinternal/encoders_test.go | 31 ++++++++ .../topic/topicwriterinternal/gzip_encoder.go | 33 -------- internal/topic/topicwriterinternal/message.go | 2 +- 4 files changed, 72 insertions(+), 71 deletions(-) delete mode 100644 internal/topic/topicwriterinternal/gzip_encoder.go diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index 415f75e63..0d5042ce9 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -1,6 +1,7 @@ package topicwriterinternal import ( + "compress/gzip" "fmt" "io" "sync" @@ -16,26 +17,28 @@ const ( codecUnknown = rawtopiccommon.CodecUNSPECIFIED ) -type encoder interface { - Encode(target io.Writer, data []byte) (int, error) +type resetableWriter interface { + io.WriteCloser + Reset(wr io.Writer) } type encoderPool struct { pool sync.Pool } -func (p *encoderPool) Get() encoder { - enc, _ := p.pool.Get().(encoder) +func (p *encoderPool) Get() resetableWriter { + enc, _ := p.pool.Get().(resetableWriter) + return enc } -func (p *encoderPool) Put(encoder encoder) { - p.pool.Put(encoder) +func (p *encoderPool) Put(wr resetableWriter) { + p.pool.Put(wr) } -func newEncoderPool(newEncoderFunc func() any) *encoderPool { +func newEncoderPool() *encoderPool { return &encoderPool{ - pool: sync.Pool{New: newEncoderFunc}, + pool: sync.Pool{}, } } @@ -50,10 +53,11 @@ func NewEncoderMap() *EncoderMap { rawtopiccommon.CodecRaw: func(writer io.Writer) (io.WriteCloser, error) { return nopWriteCloser{writer}, nil }, + rawtopiccommon.CodecGzip: func(writer io.Writer) (io.WriteCloser, error) { + return gzip.NewWriter(writer), nil + }, }, - p: map[rawtopiccommon.Codec]*encoderPool{ - rawtopiccommon.CodecGzip: newEncoderPool(newGzipEncoder), - }, + p: make(map[rawtopiccommon.Codec]*encoderPool), } } @@ -62,42 +66,46 @@ func (e *EncoderMap) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreate } func (e *EncoderMap) Encode(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) { - ePool, ok := e.p[codec] - if !ok { - return e.encodeWithLazyWriter(codec, target, data) + enc, err := e.createEncodeWriter(codec, target) + if err != nil { + return 0, err } - enc := ePool.Get() - n, err := enc.Encode(target, data) + n, err := enc.Write(data) + if err == nil { + err = enc.Close() + } if err != nil { return 0, err } - ePool.Put(enc) + resetableEnc, ok := enc.(resetableWriter) + if ok { + if _, ok := e.p[codec]; !ok { + e.p[codec] = newEncoderPool() + } + + e.p[codec].Put(resetableEnc) + } return n, nil } -func (e *EncoderMap) encodeWithLazyWriter(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) { - encoderCreator, ok := e.m[codec] - if !ok { - return 0, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected codec '%v' for encode message", codec))) - } +func (e *EncoderMap) createEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) { + if ePool, ok := e.p[codec]; ok { + wr := ePool.Get() + if wr != nil { + wr.Reset(target) - enc, err := encoderCreator(target) - if err != nil { - return 0, err + return wr, nil + } } - n, err := enc.Write(data) - if err == nil { - err = enc.Close() - } - if err != nil { - return 0, err + if encoderCreator, ok := e.m[codec]; ok { + return encoderCreator(target) } - return n, nil + return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected codec '%v' for encode message", codec))) } func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs { @@ -111,11 +119,6 @@ func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs { func (e *EncoderMap) IsSupported(codec rawtopiccommon.Codec) bool { _, ok := e.m[codec] - if ok { - return true - } - - _, ok = e.p[codec] return ok } diff --git a/internal/topic/topicwriterinternal/encoders_test.go b/internal/topic/topicwriterinternal/encoders_test.go index 63b929068..a8bcc465a 100644 --- a/internal/topic/topicwriterinternal/encoders_test.go +++ b/internal/topic/topicwriterinternal/encoders_test.go @@ -206,3 +206,34 @@ func TestCompressMessages(t *testing.T) { require.Error(t, cacheMessages(messages, rawtopiccommon.CodecGzip, parallelCount)) }) } + +func TestEncodersPool(t *testing.T) { + t.Run("Not resetable writer", func(t *testing.T) { + testEncoderMap := NewEncoderMap() + + buf := &bytes.Buffer{} + _, err := testEncoderMap.Encode(rawtopiccommon.CodecRaw, buf, []byte("test_data")) + require.NoError(t, err) + + require.Len(t, testEncoderMap.p, 0) + }) + + t.Run("Resetable writer", func(t *testing.T) { + testEncoderMap := NewEncoderMap() + + buf := &bytes.Buffer{} + _, err := testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data")) + require.NoError(t, err) + + require.Len(t, testEncoderMap.p, 1) + + wr := testEncoderMap.p[rawtopiccommon.CodecGzip].Get() + require.NotNil(t, wr) + + buf.Reset() + _, err = testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data")) + require.NoError(t, err) + + require.Len(t, testEncoderMap.p, 1) + }) +} diff --git a/internal/topic/topicwriterinternal/gzip_encoder.go b/internal/topic/topicwriterinternal/gzip_encoder.go deleted file mode 100644 index 8dc292459..000000000 --- a/internal/topic/topicwriterinternal/gzip_encoder.go +++ /dev/null @@ -1,33 +0,0 @@ -package topicwriterinternal - -import ( - "compress/gzip" - "io" -) - -type gzipEncoder struct { - *gzip.Writer -} - -func (ge *gzipEncoder) Encode(target io.Writer, data []byte) (int, error) { - ge.Reset(target) - - n, err := ge.Write(data) - if err != nil { - return 0, err - } - - if err := ge.Flush(); err != nil { - return 0, err - } - - if err := ge.Close(); err != nil { - return 0, err - } - - return n, nil -} - -func newGzipEncoder() any { - return &gzipEncoder{gzip.NewWriter(nil)} -} diff --git a/internal/topic/topicwriterinternal/message.go b/internal/topic/topicwriterinternal/message.go index 1a9ec9e86..36774671b 100644 --- a/internal/topic/topicwriterinternal/message.go +++ b/internal/topic/topicwriterinternal/message.go @@ -165,7 +165,7 @@ func (m *messageWithDataContent) readDataToTargetCodec(codec rawtopiccommon.Code err, ))) } - m.BufUncompressedSize = int(bytesCount) + m.BufUncompressedSize = bytesCount m.Data = nil return nil From 37e83c61a390801b35cef6f387921d301e6ba602 Mon Sep 17 00:00:00 2001 From: Denis Evdokimov Date: Thu, 16 Jan 2025 11:41:25 +0300 Subject: [PATCH 05/11] test: add compress/decompress tests --- .../topicwriterinternal/encoders_test.go | 64 ++++++++++++++++--- 1 file changed, 56 insertions(+), 8 deletions(-) diff --git a/internal/topic/topicwriterinternal/encoders_test.go b/internal/topic/topicwriterinternal/encoders_test.go index a8bcc465a..9672661bb 100644 --- a/internal/topic/topicwriterinternal/encoders_test.go +++ b/internal/topic/topicwriterinternal/encoders_test.go @@ -2,6 +2,9 @@ package topicwriterinternal import ( "bytes" + "compress/gzip" + "fmt" + "io" "strings" "testing" @@ -208,31 +211,76 @@ func TestCompressMessages(t *testing.T) { } func TestEncodersPool(t *testing.T) { - t.Run("Not resetable writer", func(t *testing.T) { + decompressGzip := func(rd io.Reader) string { + gzreader, err := gzip.NewReader(rd) + require.NoError(t, err) + + decompressedMsg, err := io.ReadAll(gzreader) + require.NoError(t, err) + + return string(decompressedMsg) + } + + t.Run("NotResetableWriter", func(t *testing.T) { testEncoderMap := NewEncoderMap() buf := &bytes.Buffer{} _, err := testEncoderMap.Encode(rawtopiccommon.CodecRaw, buf, []byte("test_data")) require.NoError(t, err) - require.Len(t, testEncoderMap.p, 0) + require.Equal(t, "test_data", buf.String()) }) - t.Run("Resetable writer", func(t *testing.T) { + t.Run("ResetableWriterCustom", func(t *testing.T) { testEncoderMap := NewEncoderMap() + customCodecCode := rawtopiccommon.Codec(5) + testEncoderMap.AddEncoder(customCodecCode, func(writer io.Writer) (io.WriteCloser, error) { + return gzip.NewWriter(writer), nil + }) + buf := &bytes.Buffer{} - _, err := testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data")) + _, err := testEncoderMap.Encode(customCodecCode, buf, []byte("test_data_1")) require.NoError(t, err) - require.Len(t, testEncoderMap.p, 1) + require.Equal(t, "test_data_1", decompressGzip(buf)) - wr := testEncoderMap.p[rawtopiccommon.CodecGzip].Get() - require.NotNil(t, wr) + buf.Reset() + _, err = testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_2")) + require.NoError(t, err) + require.Len(t, testEncoderMap.p, 2) + require.Equal(t, "test_data_2", decompressGzip(buf)) + }) + + t.Run("ResetableWriter", func(t *testing.T) { + testEncoderMap := NewEncoderMap() + + buf := &bytes.Buffer{} + _, err := testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_1")) + require.NoError(t, err) + require.Len(t, testEncoderMap.p, 1) + require.Equal(t, "test_data_1", decompressGzip(buf)) buf.Reset() - _, err = testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data")) + _, err = testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_2")) require.NoError(t, err) + require.Len(t, testEncoderMap.p, 1) + require.Equal(t, "test_data_2", decompressGzip(buf)) + }) + + t.Run("ResetableWriterManyMessages", func(t *testing.T) { + testEncoderMap := NewEncoderMap() + + buf := &bytes.Buffer{} + for i := 0; i < 50; i++ { + testMsg := []byte(fmt.Sprintf("data_%d", i)) + + buf.Reset() + _, err := testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, testMsg) + require.NoError(t, err) + + require.Equal(t, string(testMsg), decompressGzip(buf)) + } require.Len(t, testEncoderMap.p, 1) }) From f267cc34682dbde52e7a44daeca8529e2ba82d1b Mon Sep 17 00:00:00 2001 From: Denis Evdokimov Date: Thu, 16 Jan 2025 12:02:15 +0300 Subject: [PATCH 06/11] feat: make ResetableWriter public --- internal/topic/topicwriterinternal/encoders.go | 10 +++++----- topic/topicoptions/topicoptions_writer.go | 6 ++++++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index 0d5042ce9..c4db5d74e 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -17,7 +17,7 @@ const ( codecUnknown = rawtopiccommon.CodecUNSPECIFIED ) -type resetableWriter interface { +type PublicResetableWriter interface { io.WriteCloser Reset(wr io.Writer) } @@ -26,13 +26,13 @@ type encoderPool struct { pool sync.Pool } -func (p *encoderPool) Get() resetableWriter { - enc, _ := p.pool.Get().(resetableWriter) +func (p *encoderPool) Get() PublicResetableWriter { + enc, _ := p.pool.Get().(PublicResetableWriter) return enc } -func (p *encoderPool) Put(wr resetableWriter) { +func (p *encoderPool) Put(wr PublicResetableWriter) { p.pool.Put(wr) } @@ -79,7 +79,7 @@ func (e *EncoderMap) Encode(codec rawtopiccommon.Codec, target io.Writer, data [ return 0, err } - resetableEnc, ok := enc.(resetableWriter) + resetableEnc, ok := enc.(PublicResetableWriter) if ok { if _, ok := e.p[codec]; !ok { e.p[codec] = newEncoderPool() diff --git a/topic/topicoptions/topicoptions_writer.go b/topic/topicoptions/topicoptions_writer.go index 86d4b9c57..d74befa4e 100644 --- a/topic/topicoptions/topicoptions_writer.go +++ b/topic/topicoptions/topicoptions_writer.go @@ -19,8 +19,14 @@ type WriteSessionMetadata map[string]string // CreateEncoderFunc for create message decoders type CreateEncoderFunc = topicwriterinternal.PublicCreateEncoderFunc +// ResettableWriter is able to reset a nested writer between uses. +type ResetableWriter = topicwriterinternal.PublicResetableWriter + // WithWriterAddEncoder add custom codec implementation to writer. // It allows to set custom codecs implementations for custom and internal codecs. +// +// If CreateEncoderFunc returns a writer implementing ResetableWriter, then the compression objects +// will be reused for this codec. This will reduce the load on the GC. func WithWriterAddEncoder(codec topictypes.Codec, f CreateEncoderFunc) WriterOption { return topicwriterinternal.WithAddEncoder(rawtopiccommon.Codec(codec), f) } From 1cc0d6bd09abc5f1090abf3b05565827dd06a5e3 Mon Sep 17 00:00:00 2001 From: Denis Evdokimov Date: Thu, 16 Jan 2025 12:12:26 +0300 Subject: [PATCH 07/11] feat: add mx on writing in p --- internal/topic/topicwriterinternal/encoders.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index c4db5d74e..e4f765432 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -44,7 +44,9 @@ func newEncoderPool() *encoderPool { type EncoderMap struct { m map[rawtopiccommon.Codec]PublicCreateEncoderFunc - p map[rawtopiccommon.Codec]*encoderPool + + mx sync.Mutex + p map[rawtopiccommon.Codec]*encoderPool } func NewEncoderMap() *EncoderMap { @@ -57,7 +59,8 @@ func NewEncoderMap() *EncoderMap { return gzip.NewWriter(writer), nil }, }, - p: make(map[rawtopiccommon.Codec]*encoderPool), + mx: sync.Mutex{}, + p: make(map[rawtopiccommon.Codec]*encoderPool), } } @@ -82,7 +85,9 @@ func (e *EncoderMap) Encode(codec rawtopiccommon.Codec, target io.Writer, data [ resetableEnc, ok := enc.(PublicResetableWriter) if ok { if _, ok := e.p[codec]; !ok { + e.mx.Lock() e.p[codec] = newEncoderPool() + e.mx.Unlock() } e.p[codec].Put(resetableEnc) From b63e8b6451f22380f1f9f40dc01fcd51ef589a30 Mon Sep 17 00:00:00 2001 From: Denis Evdokimov Date: Thu, 16 Jan 2025 12:55:02 +0300 Subject: [PATCH 08/11] fix: add rwmx --- internal/topic/topicwriterinternal/encoders.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index e4f765432..806cde185 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -45,7 +45,7 @@ func newEncoderPool() *encoderPool { type EncoderMap struct { m map[rawtopiccommon.Codec]PublicCreateEncoderFunc - mx sync.Mutex + mx sync.RWMutex p map[rawtopiccommon.Codec]*encoderPool } @@ -59,7 +59,7 @@ func NewEncoderMap() *EncoderMap { return gzip.NewWriter(writer), nil }, }, - mx: sync.Mutex{}, + mx: sync.RWMutex{}, p: make(map[rawtopiccommon.Codec]*encoderPool), } } @@ -84,19 +84,24 @@ func (e *EncoderMap) Encode(codec rawtopiccommon.Codec, target io.Writer, data [ resetableEnc, ok := enc.(PublicResetableWriter) if ok { + e.mx.Lock() if _, ok := e.p[codec]; !ok { - e.mx.Lock() e.p[codec] = newEncoderPool() - e.mx.Unlock() } + e.mx.Unlock() + e.mx.RLock() e.p[codec].Put(resetableEnc) + e.mx.RUnlock() } return n, nil } func (e *EncoderMap) createEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) { + e.mx.RLock() + defer e.mx.RUnlock() + if ePool, ok := e.p[codec]; ok { wr := ePool.Get() if wr != nil { From 5d4a556af76e483c3eae0580e857bc2b3fc89bf4 Mon Sep 17 00:00:00 2001 From: Denis Evdokimov Date: Thu, 16 Jan 2025 13:24:05 +0300 Subject: [PATCH 09/11] ref: changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b52f2f157..4fadbb088 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added the use of sync.Pool in EncoderMap + ## v3.96.2 * Fixed broken metric `ydb_go_sdk_ydb_database_sql_conns` From 7864cdf07d672b56f6cebf3d0f403e060522311c Mon Sep 17 00:00:00 2001 From: Denis Evdokimov Date: Thu, 16 Jan 2025 16:01:24 +0300 Subject: [PATCH 10/11] ref: rename encoder map --- .../topic/topicwriterinternal/encoders.go | 20 +++++++++---------- internal/topic/topicwriterinternal/message.go | 4 ++-- .../topicwriterinternal/writer_reconnector.go | 4 ++-- .../writer_single_stream.go | 4 ++-- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index 806cde185..b7daafbe7 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -42,15 +42,15 @@ func newEncoderPool() *encoderPool { } } -type EncoderMap struct { +type MultiEncoder struct { m map[rawtopiccommon.Codec]PublicCreateEncoderFunc mx sync.RWMutex p map[rawtopiccommon.Codec]*encoderPool } -func NewEncoderMap() *EncoderMap { - return &EncoderMap{ +func NewEncoderMap() *MultiEncoder { + return &MultiEncoder{ m: map[rawtopiccommon.Codec]PublicCreateEncoderFunc{ rawtopiccommon.CodecRaw: func(writer io.Writer) (io.WriteCloser, error) { return nopWriteCloser{writer}, nil @@ -64,11 +64,11 @@ func NewEncoderMap() *EncoderMap { } } -func (e *EncoderMap) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc) { +func (e *MultiEncoder) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc) { e.m[codec] = creator } -func (e *EncoderMap) Encode(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) { +func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) { enc, err := e.createEncodeWriter(codec, target) if err != nil { return 0, err @@ -98,7 +98,7 @@ func (e *EncoderMap) Encode(codec rawtopiccommon.Codec, target io.Writer, data [ return n, nil } -func (e *EncoderMap) createEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) { +func (e *MultiEncoder) createEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) { e.mx.RLock() defer e.mx.RUnlock() @@ -118,7 +118,7 @@ func (e *EncoderMap) createEncodeWriter(codec rawtopiccommon.Codec, target io.Wr return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: unexpected codec '%v' for encode message", codec))) } -func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs { +func (e *MultiEncoder) GetSupportedCodecs() rawtopiccommon.SupportedCodecs { res := make(rawtopiccommon.SupportedCodecs, 0, len(e.m)) for codec := range e.m { res = append(res, codec) @@ -127,7 +127,7 @@ func (e *EncoderMap) GetSupportedCodecs() rawtopiccommon.SupportedCodecs { return res } -func (e *EncoderMap) IsSupported(codec rawtopiccommon.Codec) bool { +func (e *MultiEncoder) IsSupported(codec rawtopiccommon.Codec) bool { _, ok := e.m[codec] return ok @@ -145,7 +145,7 @@ func (nopWriteCloser) Close() error { // EncoderSelector not thread safe type EncoderSelector struct { - m *EncoderMap + m *MultiEncoder tracer *trace.Topic writerReconnectorID string @@ -159,7 +159,7 @@ type EncoderSelector struct { } func NewEncoderSelector( - m *EncoderMap, + m *MultiEncoder, allowedCodecs rawtopiccommon.SupportedCodecs, parallelCompressors int, tracer *trace.Topic, diff --git a/internal/topic/topicwriterinternal/message.go b/internal/topic/topicwriterinternal/message.go index 36774671b..bc2cf70ef 100644 --- a/internal/topic/topicwriterinternal/message.go +++ b/internal/topic/topicwriterinternal/message.go @@ -67,7 +67,7 @@ type messageWithDataContent struct { bufCodec rawtopiccommon.Codec bufEncoded bytes.Buffer rawBuf bytes.Buffer - encoders *EncoderMap + encoders *MultiEncoder BufUncompressedSize int } @@ -206,7 +206,7 @@ func (m *messageWithDataContent) getEncodedBytes(codec rawtopiccommon.Codec) ([] func newMessageDataWithContent( message PublicMessage, //nolint:gocritic - encoders *EncoderMap, + encoders *MultiEncoder, ) messageWithDataContent { return messageWithDataContent{ PublicMessage: message, diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index d0e182a05..e1684e4c7 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -124,7 +124,7 @@ type WriterReconnector struct { semaphore *semaphore.Weighted firstInitResponseProcessedChan empty.Chan lastSeqNo int64 - encodersMap *EncoderMap + encodersMap *MultiEncoder initDoneCh empty.Chan initInfo InitialInfo m xsync.RWMutex @@ -760,7 +760,7 @@ func createRawMessageData( return res, err } -func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *EncoderMap, +func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *MultiEncoder, serverCodecs rawtopiccommon.SupportedCodecs, ) rawtopiccommon.SupportedCodecs { if forceCodec != rawtopiccommon.CodecUNSPECIFIED { diff --git a/internal/topic/topicwriterinternal/writer_single_stream.go b/internal/topic/topicwriterinternal/writer_single_stream.go index d254aa075..6dbb3803c 100644 --- a/internal/topic/topicwriterinternal/writer_single_stream.go +++ b/internal/topic/topicwriterinternal/writer_single_stream.go @@ -23,7 +23,7 @@ type SingleStreamWriterConfig struct { stream RawTopicWriterStream queue *messageQueue - encodersMap *EncoderMap + encodersMap *MultiEncoder getLastSeqNum bool reconnectorInstanceID string } @@ -32,7 +32,7 @@ func newSingleStreamWriterConfig( common WritersCommonConfig, //nolint:gocritic stream RawTopicWriterStream, queue *messageQueue, - encodersMap *EncoderMap, + encodersMap *MultiEncoder, getLastSeqNum bool, reconnectorID string, ) SingleStreamWriterConfig { From c1df891320bd667c5e8b5ba702dbdddd45442747 Mon Sep 17 00:00:00 2001 From: Denis Evdokimov Date: Fri, 17 Jan 2025 15:53:33 +0300 Subject: [PATCH 11/11] feat: add buffer encoder and refactor --- CHANGELOG.md | 2 +- .../topic/topicwriterinternal/encoders.go | 64 +++++++++++-------- .../topicwriterinternal/encoders_test.go | 54 +++++++++------- internal/topic/topicwriterinternal/message.go | 10 +-- .../topicwriterinternal/writer_reconnector.go | 8 +-- .../writer_reconnector_test.go | 4 +- 6 files changed, 77 insertions(+), 65 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a67517f6d..7569fa9f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -* Added the use of sync.Pool in EncoderMap and renamed it to MultiEncoder +* Supported pool of encoders, which implement ResetableWriter interface ## v3.97.0 * Added immutable range iterators from go1.23 into query stats to iterate over query phases and accessed tables without query stats object mutation diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index b7daafbe7..8bd5ea17b 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -1,6 +1,7 @@ package topicwriterinternal import ( + "bytes" "compress/gzip" "fmt" "io" @@ -45,30 +46,49 @@ func newEncoderPool() *encoderPool { type MultiEncoder struct { m map[rawtopiccommon.Codec]PublicCreateEncoderFunc - mx sync.RWMutex - p map[rawtopiccommon.Codec]*encoderPool + bp xsync.Pool[bytes.Buffer] + ep map[rawtopiccommon.Codec]*encoderPool } -func NewEncoderMap() *MultiEncoder { - return &MultiEncoder{ - m: map[rawtopiccommon.Codec]PublicCreateEncoderFunc{ - rawtopiccommon.CodecRaw: func(writer io.Writer) (io.WriteCloser, error) { - return nopWriteCloser{writer}, nil - }, - rawtopiccommon.CodecGzip: func(writer io.Writer) (io.WriteCloser, error) { - return gzip.NewWriter(writer), nil +func NewMultiEncoder() *MultiEncoder { + me := &MultiEncoder{ + m: make(map[rawtopiccommon.Codec]PublicCreateEncoderFunc), + bp: xsync.Pool[bytes.Buffer]{ + New: func() *bytes.Buffer { + return &bytes.Buffer{} }, }, - mx: sync.RWMutex{}, - p: make(map[rawtopiccommon.Codec]*encoderPool), + ep: make(map[rawtopiccommon.Codec]*encoderPool), } + + me.AddEncoder(rawtopiccommon.CodecRaw, func(writer io.Writer) (io.WriteCloser, error) { + return nopWriteCloser{writer}, nil + }) + me.AddEncoder(rawtopiccommon.CodecGzip, func(writer io.Writer) (io.WriteCloser, error) { + return gzip.NewWriter(writer), nil + }) + + return me } func (e *MultiEncoder) AddEncoder(codec rawtopiccommon.Codec, creator PublicCreateEncoderFunc) { e.m[codec] = creator + e.ep[codec] = newEncoderPool() +} + +func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, source io.Reader) (int, error) { + buf := e.bp.GetOrNew() + defer e.bp.Put(buf) + + buf.Reset() + if _, err := buf.ReadFrom(source); err != nil { + return 0, err + } + + return e.EncodeBytes(codec, target, buf.Bytes()) } -func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) { +func (e *MultiEncoder) EncodeBytes(codec rawtopiccommon.Codec, target io.Writer, data []byte) (int, error) { enc, err := e.createEncodeWriter(codec, target) if err != nil { return 0, err @@ -82,27 +102,15 @@ func (e *MultiEncoder) Encode(codec rawtopiccommon.Codec, target io.Writer, data return 0, err } - resetableEnc, ok := enc.(PublicResetableWriter) - if ok { - e.mx.Lock() - if _, ok := e.p[codec]; !ok { - e.p[codec] = newEncoderPool() - } - e.mx.Unlock() - - e.mx.RLock() - e.p[codec].Put(resetableEnc) - e.mx.RUnlock() + if resetableEnc, ok := enc.(PublicResetableWriter); ok { + e.ep[codec].Put(resetableEnc) } return n, nil } func (e *MultiEncoder) createEncodeWriter(codec rawtopiccommon.Codec, target io.Writer) (io.WriteCloser, error) { - e.mx.RLock() - defer e.mx.RUnlock() - - if ePool, ok := e.p[codec]; ok { + if ePool, ok := e.ep[codec]; ok { wr := ePool.Get() if wr != nil { wr.Reset(target) diff --git a/internal/topic/topicwriterinternal/encoders_test.go b/internal/topic/topicwriterinternal/encoders_test.go index 9672661bb..b2bf7bd97 100644 --- a/internal/topic/topicwriterinternal/encoders_test.go +++ b/internal/topic/topicwriterinternal/encoders_test.go @@ -23,7 +23,7 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) { }) t.Run("One", func(t *testing.T) { s := NewEncoderSelector( - NewEncoderMap(), + NewMultiEncoder(), rawtopiccommon.SupportedCodecs{rawtopiccommon.CodecRaw}, 1, &trace.Topic{}, @@ -210,7 +210,7 @@ func TestCompressMessages(t *testing.T) { }) } -func TestEncodersPool(t *testing.T) { +func TestMultiEncoder(t *testing.T) { decompressGzip := func(rd io.Reader) string { gzreader, err := gzip.NewReader(rd) require.NoError(t, err) @@ -221,67 +221,77 @@ func TestEncodersPool(t *testing.T) { return string(decompressedMsg) } + t.Run("BuffersPool", func(t *testing.T) { + testMultiEncoder := NewMultiEncoder() + + buf := &bytes.Buffer{} + for i := 0; i < 50; i++ { + testMsg := []byte(fmt.Sprintf("test_data_%d", i)) + + buf.Reset() + _, err := testMultiEncoder.Encode(rawtopiccommon.CodecGzip, buf, bytes.NewReader(testMsg)) + require.NoError(t, err) + + require.Equal(t, string(testMsg), decompressGzip(buf)) + } + }) + t.Run("NotResetableWriter", func(t *testing.T) { - testEncoderMap := NewEncoderMap() + testMultiEncoder := NewMultiEncoder() + require.Len(t, testMultiEncoder.ep, 2) buf := &bytes.Buffer{} - _, err := testEncoderMap.Encode(rawtopiccommon.CodecRaw, buf, []byte("test_data")) + _, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecRaw, buf, []byte("test_data")) require.NoError(t, err) - require.Len(t, testEncoderMap.p, 0) require.Equal(t, "test_data", buf.String()) }) t.Run("ResetableWriterCustom", func(t *testing.T) { - testEncoderMap := NewEncoderMap() + testMultiEncoder := NewMultiEncoder() - customCodecCode := rawtopiccommon.Codec(5) - testEncoderMap.AddEncoder(customCodecCode, func(writer io.Writer) (io.WriteCloser, error) { + customCodecCode := rawtopiccommon.Codec(10001) + testMultiEncoder.AddEncoder(customCodecCode, func(writer io.Writer) (io.WriteCloser, error) { return gzip.NewWriter(writer), nil }) + require.Len(t, testMultiEncoder.ep, 3) buf := &bytes.Buffer{} - _, err := testEncoderMap.Encode(customCodecCode, buf, []byte("test_data_1")) + _, err := testMultiEncoder.EncodeBytes(customCodecCode, buf, []byte("test_data_1")) require.NoError(t, err) - require.Len(t, testEncoderMap.p, 1) require.Equal(t, "test_data_1", decompressGzip(buf)) buf.Reset() - _, err = testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_2")) + _, err = testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_2")) require.NoError(t, err) - require.Len(t, testEncoderMap.p, 2) require.Equal(t, "test_data_2", decompressGzip(buf)) }) t.Run("ResetableWriter", func(t *testing.T) { - testEncoderMap := NewEncoderMap() + testMultiEncoder := NewMultiEncoder() buf := &bytes.Buffer{} - _, err := testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_1")) + _, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_1")) require.NoError(t, err) - require.Len(t, testEncoderMap.p, 1) require.Equal(t, "test_data_1", decompressGzip(buf)) buf.Reset() - _, err = testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, []byte("test_data_2")) + _, err = testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, []byte("test_data_2")) require.NoError(t, err) - require.Len(t, testEncoderMap.p, 1) require.Equal(t, "test_data_2", decompressGzip(buf)) }) t.Run("ResetableWriterManyMessages", func(t *testing.T) { - testEncoderMap := NewEncoderMap() + testMultiEncoder := NewMultiEncoder() buf := &bytes.Buffer{} for i := 0; i < 50; i++ { - testMsg := []byte(fmt.Sprintf("data_%d", i)) + testMsg := []byte(fmt.Sprintf("test_data_%d", i)) buf.Reset() - _, err := testEncoderMap.Encode(rawtopiccommon.CodecGzip, buf, testMsg) + _, err := testMultiEncoder.EncodeBytes(rawtopiccommon.CodecGzip, buf, testMsg) require.NoError(t, err) require.Equal(t, string(testMsg), decompressGzip(buf)) } - - require.Len(t, testEncoderMap.p, 1) }) } diff --git a/internal/topic/topicwriterinternal/message.go b/internal/topic/topicwriterinternal/message.go index bc2cf70ef..0e82532fe 100644 --- a/internal/topic/topicwriterinternal/message.go +++ b/internal/topic/topicwriterinternal/message.go @@ -111,7 +111,7 @@ func (m *messageWithDataContent) encodeRawContent(codec rawtopiccommon.Codec) ([ m.bufEncoded.Reset() - _, err := m.encoders.Encode(codec, &m.bufEncoded, m.rawBuf.Bytes()) + _, err := m.encoders.EncodeBytes(codec, &m.bufEncoded, m.rawBuf.Bytes()) if err != nil { return nil, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: failed to compress message, codec '%v': %w", @@ -151,13 +151,7 @@ func (m *messageWithDataContent) readDataToTargetCodec(codec rawtopiccommon.Code reader = &bytes.Reader{} } - buf := &bytes.Buffer{} - _, err := buf.ReadFrom(reader) - if err != nil { - return err - } - - bytesCount, err := m.encoders.Encode(codec, &m.bufEncoded, buf.Bytes()) + bytesCount, err := m.encoders.Encode(codec, &m.bufEncoded, reader) if err != nil { return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: failed compress message with codec '%v': %w", diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index e1684e4c7..a7632c4a4 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -156,7 +156,7 @@ func newWriterReconnectorStopped( queue: newMessageQueue(), lastSeqNo: -1, firstInitResponseProcessedChan: make(empty.Chan), - encodersMap: NewEncoderMap(), + encodersMap: NewMultiEncoder(), writerInstanceID: writerInstanceID.String(), retrySettings: cfg.RetrySettings, } @@ -760,11 +760,11 @@ func createRawMessageData( return res, err } -func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *MultiEncoder, +func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, multiEncoder *MultiEncoder, serverCodecs rawtopiccommon.SupportedCodecs, ) rawtopiccommon.SupportedCodecs { if forceCodec != rawtopiccommon.CodecUNSPECIFIED { - if serverCodecs.AllowedByCodecsList(forceCodec) && encoderMap.IsSupported(forceCodec) { + if serverCodecs.AllowedByCodecsList(forceCodec) && multiEncoder.IsSupported(forceCodec) { return rawtopiccommon.SupportedCodecs{forceCodec} } @@ -779,7 +779,7 @@ func calculateAllowedCodecs(forceCodec rawtopiccommon.Codec, encoderMap *MultiEn res := make(rawtopiccommon.SupportedCodecs, 0, len(serverCodecs)) for _, codec := range serverCodecs { - if encoderMap.IsSupported(codec) { + if multiEncoder.IsSupported(codec) { res = append(res, codec) } } diff --git a/internal/topic/topicwriterinternal/writer_reconnector_test.go b/internal/topic/topicwriterinternal/writer_reconnector_test.go index dc89dcafb..405265399 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector_test.go +++ b/internal/topic/topicwriterinternal/writer_reconnector_test.go @@ -26,7 +26,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" ) -var testCommonEncoders = NewEncoderMap() +var testCommonEncoders = NewMultiEncoder() func TestWriterImpl_AutoSeq(t *testing.T) { t.Run("OK", func(t *testing.T) { @@ -829,7 +829,7 @@ func TestSplitMessagesByBufCodec(t *testing.T) { func TestCalculateAllowedCodecs(t *testing.T) { customCodecSupported := rawtopiccommon.Codec(rawtopiccommon.CodecCustomerFirst) customCodecUnsupported := rawtopiccommon.Codec(rawtopiccommon.CodecCustomerFirst + 1) - encoders := NewEncoderMap() + encoders := NewMultiEncoder() encoders.AddEncoder(customCodecSupported, func(writer io.Writer) (io.WriteCloser, error) { return nil, errors.New("test") })