From 11c429662339ce85f27b124449b26a113d70d4c8 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Thu, 24 Jul 2025 09:44:26 +0100 Subject: [PATCH 1/8] single atomic --- pkg/fileservice/bytes.go | 31 ++++++++-------- pkg/fileservice/bytes_test.go | 66 +++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 17 deletions(-) diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index 57a3bbcd957b9..ed6a0070729f6 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -25,9 +25,7 @@ import ( type Bytes struct { bytes []byte deallocator malloc.Deallocator - deallocated uint32 - _refs atomic.Int32 - refs *atomic.Int32 + refs atomic.Int32 } func (b *Bytes) Size() int64 { @@ -44,24 +42,24 @@ func (b *Bytes) Slice(length int) fscache.Data { } func (b *Bytes) Retain() { - if b.refs != nil { - b.refs.Add(1) - } + b.refs.Add(1) } func (b *Bytes) Release() { - if b.refs != nil { - if n := b.refs.Add(-1); n == 0 { - if b.deallocator != nil && - atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) { - b.deallocator.Deallocate(malloc.NoHints) - } + n := b.refs.Add(-1) + if n == 0 { + if b.bytes == nil { + panic("Bytes.Release: deallocate nil pointer error") } - } else { - if b.deallocator != nil && - atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) { + + if b.deallocator != nil { b.deallocator.Deallocate(malloc.NoHints) } + + // set bytes to nil + b.bytes = nil + } else if n < 0 { + panic("Bytes.Release: double free") } } @@ -80,8 +78,7 @@ func (b *bytesAllocator) allocateCacheData(size int, hints malloc.Hints) fscache bytes: slice, deallocator: dec, } - bytes._refs.Store(1) - bytes.refs = &bytes._refs + bytes.refs.Store(1) return bytes } diff --git a/pkg/fileservice/bytes_test.go b/pkg/fileservice/bytes_test.go index 0859bdbb0445b..27340c18cb390 100644 --- a/pkg/fileservice/bytes_test.go +++ b/pkg/fileservice/bytes_test.go @@ -29,6 +29,72 @@ func TestBytes(t *testing.T) { bytes: bytes, deallocator: deallocator, } + bs.refs.Store(1) bs.Release() }) } + +func TestBytesError(t *testing.T) { + t.Run("Bytes double free", func(t *testing.T) { + bytes, deallocator, err := ioAllocator().Allocate(42, malloc.NoHints) + assert.Nil(t, err) + bs := &Bytes{ + bytes: bytes, + deallocator: deallocator, + } + bs.refs.Store(1) + + // deallocate memory + bs.Release() + + // nil pointer + ptr := bs.Bytes() + assert.Nil(t, ptr) + + // double free + assert.Panics(t, func() { bs.Release() }, "double free") + }) + + t.Run("Bytes without refs", func(t *testing.T) { + bytes, deallocator, err := ioAllocator().Allocate(42, malloc.NoHints) + assert.Nil(t, err) + bs := &Bytes{ + bytes: bytes, + deallocator: deallocator, + } + bs.refs.Store(1) + + // deallocate memory + bs.Release() + + // nil pointer + ptr := bs.Bytes() + assert.Nil(t, ptr) + + // increment deallocated bytes and try to deallocate again + bs.Retain() // refs = 1 + assert.Panics(t, func() { bs.Release() }, "deallocate nil pointer") + }) + + t.Run("Bytes nil deallocator", func(t *testing.T) { + bs := &Bytes{ + bytes: []byte("123"), + deallocator: nil, + } + bs.refs.Store(1) + + // deallocate memory + bs.Release() + + // nil pointer + ptr := bs.Bytes() + assert.Nil(t, ptr) + + assert.Panics(t, func() { bs.Release() }, "double free") + + // increment deallocated bytes and try to deallocate again + bs.Retain() // refs = 0 + bs.Retain() // refs = 1 + assert.Panics(t, func() { bs.Release() }, "deallocate nil pointer") + }) +} From 984687f9f693152cf6840ab6f2742c9106921161 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Thu, 24 Jul 2025 11:11:18 +0100 Subject: [PATCH 2/8] concurrent test --- pkg/fileservice/bytes.go | 3 +++ pkg/fileservice/bytes_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index ed6a0070729f6..15ecc13d50118 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -33,6 +33,9 @@ func (b *Bytes) Size() int64 { } func (b *Bytes) Bytes() []byte { + if b.refs.Load() <= 0 { + return nil + } return b.bytes } diff --git a/pkg/fileservice/bytes_test.go b/pkg/fileservice/bytes_test.go index 27340c18cb390..8367e9a87a3fb 100644 --- a/pkg/fileservice/bytes_test.go +++ b/pkg/fileservice/bytes_test.go @@ -15,7 +15,9 @@ package fileservice import ( + "sync" "testing" + "time" "github.com/matrixorigin/matrixone/pkg/common/malloc" "github.com/stretchr/testify/assert" @@ -98,3 +100,30 @@ func TestBytesError(t *testing.T) { assert.Panics(t, func() { bs.Release() }, "deallocate nil pointer") }) } + +func TestBytesConcurrent(t *testing.T) { + bs := &Bytes{ + bytes: []byte("123"), + deallocator: nil, + } + bs.refs.Store(1) + nthread := 5 + var wg sync.WaitGroup + for i := 0; i < nthread; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + + bs.Retain() + + time.Sleep(1 * time.Millisecond) + + bs.Release() + }(i) + } + + wg.Wait() + + bs.Release() + +} From e0b84528176f053109ee4b413102cca05cfb3604 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Thu, 24 Jul 2025 11:24:47 +0100 Subject: [PATCH 3/8] update --- pkg/fileservice/bytes.go | 10 +++------- pkg/fileservice/bytes_test.go | 26 -------------------------- 2 files changed, 3 insertions(+), 33 deletions(-) diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index 15ecc13d50118..d7c940393945c 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -51,16 +51,12 @@ func (b *Bytes) Retain() { func (b *Bytes) Release() { n := b.refs.Add(-1) if n == 0 { - if b.bytes == nil { - panic("Bytes.Release: deallocate nil pointer error") - } - + // set bytes to nil + b.bytes = nil if b.deallocator != nil { b.deallocator.Deallocate(malloc.NoHints) + b.deallocator = nil } - - // set bytes to nil - b.bytes = nil } else if n < 0 { panic("Bytes.Release: double free") } diff --git a/pkg/fileservice/bytes_test.go b/pkg/fileservice/bytes_test.go index 8367e9a87a3fb..c273b4dd92361 100644 --- a/pkg/fileservice/bytes_test.go +++ b/pkg/fileservice/bytes_test.go @@ -57,27 +57,6 @@ func TestBytesError(t *testing.T) { assert.Panics(t, func() { bs.Release() }, "double free") }) - t.Run("Bytes without refs", func(t *testing.T) { - bytes, deallocator, err := ioAllocator().Allocate(42, malloc.NoHints) - assert.Nil(t, err) - bs := &Bytes{ - bytes: bytes, - deallocator: deallocator, - } - bs.refs.Store(1) - - // deallocate memory - bs.Release() - - // nil pointer - ptr := bs.Bytes() - assert.Nil(t, ptr) - - // increment deallocated bytes and try to deallocate again - bs.Retain() // refs = 1 - assert.Panics(t, func() { bs.Release() }, "deallocate nil pointer") - }) - t.Run("Bytes nil deallocator", func(t *testing.T) { bs := &Bytes{ bytes: []byte("123"), @@ -93,11 +72,6 @@ func TestBytesError(t *testing.T) { assert.Nil(t, ptr) assert.Panics(t, func() { bs.Release() }, "double free") - - // increment deallocated bytes and try to deallocate again - bs.Retain() // refs = 0 - bs.Retain() // refs = 1 - assert.Panics(t, func() { bs.Release() }, "deallocate nil pointer") }) } From b6bd15b440a68bba0179aa078c0e68ab64923238 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Thu, 24 Jul 2025 11:28:46 +0100 Subject: [PATCH 4/8] update --- pkg/fileservice/bytes_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/fileservice/bytes_test.go b/pkg/fileservice/bytes_test.go index c273b4dd92361..22de1a685a8b1 100644 --- a/pkg/fileservice/bytes_test.go +++ b/pkg/fileservice/bytes_test.go @@ -100,4 +100,6 @@ func TestBytesConcurrent(t *testing.T) { bs.Release() + // double free + assert.Panics(t, func() { bs.Release() }, "double free") } From 48cb9a133ea1ee3efa068e2f9544952d73e0e05b Mon Sep 17 00:00:00 2001 From: cpegeric Date: Fri, 25 Jul 2025 15:34:28 +0100 Subject: [PATCH 5/8] update --- pkg/fileservice/bytes.go | 10 ++++++++- pkg/fileservice/bytes_test.go | 38 +++++++++++++++++++---------------- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index d7c940393945c..9989b6fa8a353 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -34,7 +34,7 @@ func (b *Bytes) Size() int64 { func (b *Bytes) Bytes() []byte { if b.refs.Load() <= 0 { - return nil + panic("Bytes.Bytes: memory was already deallocated.") } return b.bytes } @@ -62,6 +62,14 @@ func (b *Bytes) Release() { } } +func NewBytes(size int) *Bytes { + bytes := &Bytes{ + bytes: make([]byte, size), + } + bytes.refs.Store(1) + return bytes +} + type bytesAllocator struct { allocator malloc.Allocator } diff --git a/pkg/fileservice/bytes_test.go b/pkg/fileservice/bytes_test.go index 22de1a685a8b1..f85e1412ecc60 100644 --- a/pkg/fileservice/bytes_test.go +++ b/pkg/fileservice/bytes_test.go @@ -37,7 +37,7 @@ func TestBytes(t *testing.T) { } func TestBytesError(t *testing.T) { - t.Run("Bytes double free", func(t *testing.T) { + t.Run("Bytes get invalid memory", func(t *testing.T) { bytes, deallocator, err := ioAllocator().Allocate(42, malloc.NoHints) assert.Nil(t, err) bs := &Bytes{ @@ -50,37 +50,41 @@ func TestBytesError(t *testing.T) { bs.Release() // nil pointer - ptr := bs.Bytes() - assert.Nil(t, ptr) - - // double free - assert.Panics(t, func() { bs.Release() }, "double free") + assert.Panics(t, func() { bs.Bytes() }, "get invalid memory") }) - t.Run("Bytes nil deallocator", func(t *testing.T) { + t.Run("Bytes double free", func(t *testing.T) { + bytes, deallocator, err := ioAllocator().Allocate(42, malloc.NoHints) + assert.Nil(t, err) bs := &Bytes{ - bytes: []byte("123"), - deallocator: nil, + bytes: bytes, + deallocator: deallocator, } bs.refs.Store(1) // deallocate memory bs.Release() - // nil pointer - ptr := bs.Bytes() - assert.Nil(t, ptr) + // double free + assert.Panics(t, func() { bs.Release() }, "double free") + }) + + t.Run("Bytes nil deallocator", func(t *testing.T) { + data := []byte("123") + bs := NewBytes(len(data)) + copy(bs.bytes, data) + + // deallocate memory + bs.Release() assert.Panics(t, func() { bs.Release() }, "double free") }) } func TestBytesConcurrent(t *testing.T) { - bs := &Bytes{ - bytes: []byte("123"), - deallocator: nil, - } - bs.refs.Store(1) + data := []byte("123") + bs := NewBytes(len(data)) + copy(bs.bytes, data) nthread := 5 var wg sync.WaitGroup for i := 0; i < nthread; i++ { From a0b6ec314ad07903832312c1dbd4551583682c09 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Fri, 25 Jul 2025 15:49:08 +0100 Subject: [PATCH 6/8] update --- pkg/fileservice/bytes.go | 4 ++-- pkg/fileservice/bytes_test.go | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index 9989b6fa8a353..871fb23550d54 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -62,9 +62,9 @@ func (b *Bytes) Release() { } } -func NewBytes(size int) *Bytes { +func NewBytes(data []byte) *Bytes { bytes := &Bytes{ - bytes: make([]byte, size), + bytes: data, } bytes.refs.Store(1) return bytes diff --git a/pkg/fileservice/bytes_test.go b/pkg/fileservice/bytes_test.go index f85e1412ecc60..0040003636a61 100644 --- a/pkg/fileservice/bytes_test.go +++ b/pkg/fileservice/bytes_test.go @@ -71,8 +71,7 @@ func TestBytesError(t *testing.T) { t.Run("Bytes nil deallocator", func(t *testing.T) { data := []byte("123") - bs := NewBytes(len(data)) - copy(bs.bytes, data) + bs := NewBytes(data) // deallocate memory bs.Release() @@ -83,8 +82,7 @@ func TestBytesError(t *testing.T) { func TestBytesConcurrent(t *testing.T) { data := []byte("123") - bs := NewBytes(len(data)) - copy(bs.bytes, data) + bs := NewBytes(data) nthread := 5 var wg sync.WaitGroup for i := 0; i < nthread; i++ { From a1cc5a241e26e83dfa211e96dc80aa3e6c6a824f Mon Sep 17 00:00:00 2001 From: cpegeric Date: Fri, 25 Jul 2025 15:55:13 +0100 Subject: [PATCH 7/8] remote cache with NewBytes --- pkg/fileservice/remote_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/fileservice/remote_cache.go b/pkg/fileservice/remote_cache.go index 2d32f45baba79..5be6c45f79fec 100644 --- a/pkg/fileservice/remote_cache.go +++ b/pkg/fileservice/remote_cache.go @@ -130,7 +130,7 @@ func (r *RemoteCache) Read(ctx context.Context, vector *IOVector) error { idx := int(cacheData.Index) if cacheData.Hit { vector.Entries[idx].done = true - vector.Entries[idx].CachedData = &Bytes{bytes: cacheData.Data} + vector.Entries[idx].CachedData = NewBytes(cacheData.Data) vector.Entries[idx].fromCache = r numHit++ } From f86c0ebb7ff036d81d9300b689453da6b4c82235 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Fri, 25 Jul 2025 18:41:21 +0100 Subject: [PATCH 8/8] fix ut --- pkg/fileservice/remote_cache_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/fileservice/remote_cache_test.go b/pkg/fileservice/remote_cache_test.go index e9819c9a77ec3..e1850b30e90db 100644 --- a/pkg/fileservice/remote_cache_test.go +++ b/pkg/fileservice/remote_cache_test.go @@ -89,7 +89,7 @@ func TestRemoteCache(t *testing.T) { err = sf2.rc.Read(ctx, ioVec2) assert.NoError(t, err) assert.Equal(t, 1, len(ioVec2.Entries)) - assert.Equal(t, &Bytes{bytes: []byte{1, 2}}, ioVec2.Entries[0].CachedData) + assert.Equal(t, NewBytes([]byte{1, 2}), ioVec2.Entries[0].CachedData) assert.Equal(t, true, ioVec2.Entries[0].done) assert.NotNil(t, ioVec2.Entries[0].fromCache)