From 4034f7eec518176c5b135f7956bc1cb131ef133c Mon Sep 17 00:00:00 2001 From: cpegeric Date: Fri, 16 May 2025 10:17:46 +0100 Subject: [PATCH 01/33] update deleted --- pkg/fileservice/fifocache/bench2_test.go | 15 +++++--- pkg/fileservice/fifocache/fifo.go | 48 ++++++++++++++++++++---- pkg/fileservice/fifocache/shardmap.go | 15 ++++++++ 3 files changed, 64 insertions(+), 14 deletions(-) diff --git a/pkg/fileservice/fifocache/bench2_test.go b/pkg/fileservice/fifocache/bench2_test.go index 2e39dd1074760..133bf1bac8409 100644 --- a/pkg/fileservice/fifocache/bench2_test.go +++ b/pkg/fileservice/fifocache/bench2_test.go @@ -43,7 +43,7 @@ func get_rand(start int64, end int64, r *rand.Rand) int64 { return start + r.Int64N(end-start) } -func dataset_read(b *testing.B, ctx context.Context, cache *Cache[int64, int64], startkey int64, endkey int64, r *rand.Rand) { +func dataset_read(b *testing.B, ctx context.Context, cache *Cache[int64, int64], startkey int64, endkey int64, r *rand.Rand, mutex *sync.Mutex) { ncpu := runtime.NumCPU() var wg sync.WaitGroup @@ -60,8 +60,9 @@ func dataset_read(b *testing.B, ctx context.Context, cache *Cache[int64, int64], //fmt.Printf("start = %d, end = %d\n", startkey, endkey) for range endkey - startkey { - + mutex.Lock() key := get_rand(startkey, endkey, r) + mutex.Unlock() cache_read(ctx, cache, key) } } @@ -72,6 +73,7 @@ func dataset_read(b *testing.B, ctx context.Context, cache *Cache[int64, int64], } func data_shift(b *testing.B, time int64) { + var mutex sync.Mutex ctx := context.Background() cache_size := g_cache_size cache := New[int64, int64](fscache.ConstCapacity(int64(cache_size)), ShardInt[int64], nil, nil, nil, false) @@ -88,12 +90,13 @@ func data_shift(b *testing.B, time int64) { b.ResetTimer() - dataset_read(b, ctx, cache, d1[0], d1[1], r) - dataset_read(b, ctx, cache, d2[0], d2[1], r) - dataset_read(b, ctx, cache, d3[0], d3[1], r) + dataset_read(b, ctx, cache, d1[0], d1[1], r, &mutex) + dataset_read(b, ctx, cache, d2[0], d2[1], r, &mutex) + dataset_read(b, ctx, cache, d3[0], d3[1], r, &mutex) } func data_readNx(b *testing.B, time int64) { + var mutex sync.Mutex ctx := context.Background() cache_size := g_cache_size cache := New[int64, int64](fscache.ConstCapacity(int64(cache_size)), ShardInt[int64], nil, nil, nil, false) @@ -102,7 +105,7 @@ func data_readNx(b *testing.B, time int64) { r := rand.New(rand.NewPCG(1, 2)) b.ResetTimer() - dataset_read(b, ctx, cache, start, end, r) + dataset_read(b, ctx, cache, start, end, r, &mutex) } func BenchmarkSimCacheRead1x(b *testing.B) { diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index fbe946a3e13f8..9a32c8088921d 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -42,11 +42,12 @@ type Cache[K comparable, V any] struct { } type _CacheItem[K comparable, V any] struct { - mu sync.Mutex - key K - value V - size int64 - freq int8 + mu sync.Mutex + key K + value V + size int64 + freq int8 + deleted bool } /* @@ -75,6 +76,22 @@ func (c *_CacheItem[K, V]) getFreq() int8 { return c.freq } +func (c *_CacheItem[K, V]) isDeleted() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.deleted +} + +func (c *_CacheItem[K, V]) setDeleted() bool { + c.mu.Lock() + defer c.mu.Unlock() + if !c.deleted { + c.deleted = true + return true + } + return false +} + // thread safe to run post function such as postGet, postSet and postEvict func (c *_CacheItem[K, V]) postFunc(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) { if fn == 
nil { @@ -196,14 +213,17 @@ func (c *Cache[K, V]) Get(ctx context.Context, key K) (value V, ok bool) { } func (c *Cache[K, V]) Delete(ctx context.Context, key K) { - item, ok := c.htab.Get(key) + item, ok := c.htab.GetAndDelete(key) if !ok { return } - c.htab.Remove(key) + + needsPostEvict := item.setDeleted() // post evict - item.postFunc(ctx, c.postEvict) + if needsPostEvict { + item.postFunc(ctx, c.postEvict) + } // queues will be update in evict } @@ -261,6 +281,12 @@ func (c *Cache[K, V]) evictSmall(ctx context.Context) { return } + deleted := item.isDeleted() + if deleted { + c.usedSmall.Add(-item.size) + return + } + if item.getFreq() > 1 { // put main c.main.enqueue(item) @@ -290,6 +316,12 @@ func (c *Cache[K, V]) evictMain(ctx context.Context) { break } + deleted := item.isDeleted() + if deleted { + c.usedMain.Add(-item.size) + return + } + if item.getFreq() > 0 { // re-enqueue item.dec() diff --git a/pkg/fileservice/fifocache/shardmap.go b/pkg/fileservice/fifocache/shardmap.go index b224e1d56f53a..d2c5462455e77 100644 --- a/pkg/fileservice/fifocache/shardmap.go +++ b/pkg/fileservice/fifocache/shardmap.go @@ -93,3 +93,18 @@ func (m *ShardMap[K, V]) CompareAndDelete(key K, fn func(k1, k2 K) bool, postfn postfn(v) } } + +func (m *ShardMap[K, V]) GetAndDelete(key K) (V, bool) { + + s := &m.shards[m.hashfn(key)%numShards] + s.Lock() + defer s.Unlock() + + v, ok := s.values[key] + if !ok { + return v, ok + } + + delete(s.values, key) + return v, ok +} From dd044ffbea9b722eff7f32a33597a978367d1394 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Fri, 16 May 2025 10:36:32 +0100 Subject: [PATCH 02/33] cleanup --- pkg/fileservice/fifocache/bench2_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/fileservice/fifocache/bench2_test.go b/pkg/fileservice/fifocache/bench2_test.go index 133bf1bac8409..7370d79de4bdd 100644 --- a/pkg/fileservice/fifocache/bench2_test.go +++ b/pkg/fileservice/fifocache/bench2_test.go @@ -39,7 +39,9 @@ func cache_read(ctx context.Context, cache *Cache[int64, int64], key int64) { } } -func get_rand(start int64, end int64, r *rand.Rand) int64 { +func get_rand(start int64, end int64, r *rand.Rand, mutex *sync.Mutex) int64 { + mutex.Lock() + defer mutex.Unlock() return start + r.Int64N(end-start) } @@ -60,9 +62,7 @@ func dataset_read(b *testing.B, ctx context.Context, cache *Cache[int64, int64], //fmt.Printf("start = %d, end = %d\n", startkey, endkey) for range endkey - startkey { - mutex.Lock() - key := get_rand(startkey, endkey, r) - mutex.Unlock() + key := get_rand(startkey, endkey, r, mutex) cache_read(ctx, cache, key) } } From 9e97aee1d99d716cb847640a0d2b3d8723496dcb Mon Sep 17 00:00:00 2001 From: cpegeric Date: Fri, 16 May 2025 10:48:34 +0100 Subject: [PATCH 03/33] update --- pkg/fileservice/fifocache/fifo.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index 9a32c8088921d..8cc9aedda3669 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -313,7 +313,7 @@ func (c *Cache[K, V]) evictMain(ctx context.Context) { item, ok := c.main.dequeue() if !ok { // empty queue - break + return } deleted := item.isDeleted() From 15dbb3414427fc4320a618fea91a66974ad6d8d7 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Fri, 16 May 2025 11:33:54 +0100 Subject: [PATCH 04/33] update --- pkg/fileservice/fifocache/fifo.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/fileservice/fifocache/fifo.go 
b/pkg/fileservice/fifocache/fifo.go
index 8cc9aedda3669..6e1cfe8e6397a 100644
--- a/pkg/fileservice/fifocache/fifo.go
+++ b/pkg/fileservice/fifocache/fifo.go
@@ -296,7 +296,9 @@ func (c *Cache[K, V]) evictSmall(ctx context.Context) {
 		// evict
 		c.htab.Remove(item.key)
 		// post evict
-		item.postFunc(ctx, c.postEvict)
+		if item.setDeleted() {
+			item.postFunc(ctx, c.postEvict)
+		}
 		c.usedSmall.Add(-item.size)
 
 		if !c.disable_s3fifo {
@@ -330,7 +332,9 @@ func (c *Cache[K, V]) evictMain(ctx context.Context) {
 			// evict
 			c.htab.Remove(item.key)
 			// post evict
-			item.postFunc(ctx, c.postEvict)
+			if item.setDeleted() {
+				item.postFunc(ctx, c.postEvict)
+			}
 			c.usedMain.Add(-item.size)
 			return
 		}

From 12ced87711a6c782b283585dfd4736db4c605849 Mon Sep 17 00:00:00 2001
From: cpegeric
Date: Fri, 16 May 2025 15:50:55 +0100
Subject: [PATCH 05/33] add tests

---
 pkg/fileservice/fifocache/shardmap_test.go | 40 ++++++++++++++++++++
 1 file changed, 40 insertions(+)
 create mode 100644 pkg/fileservice/fifocache/shardmap_test.go

diff --git a/pkg/fileservice/fifocache/shardmap_test.go b/pkg/fileservice/fifocache/shardmap_test.go
new file mode 100644
index 0000000000000..02db229484911
--- /dev/null
+++ b/pkg/fileservice/fifocache/shardmap_test.go
@@ -0,0 +1,40 @@
+// Copyright 2024 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package fifocache + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestShardMap(t *testing.T) { + + m := NewShardMap[int, string](ShardInt[int]) + ok := m.Set(1, "1") + assert.Equal(t, ok, true) + ok = m.Set(1, "1") + assert.Equal(t, ok, false) + + v, ok := m.Get(1) + assert.Equal(t, ok, true) + assert.Equal(t, v, "1") + + _, ok = m.GetAndDelete(0) + assert.Equal(t, ok, false) + + v, ok = m.GetAndDelete(1) + assert.Equal(t, v, "1") +} From 0331f9c99e12094671c80afeed3dda0f2448d666 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Fri, 16 May 2025 16:32:53 +0100 Subject: [PATCH 06/33] fix sca --- pkg/fileservice/fifocache/shardmap_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/fileservice/fifocache/shardmap_test.go b/pkg/fileservice/fifocache/shardmap_test.go index 02db229484911..d62c869b4bc73 100644 --- a/pkg/fileservice/fifocache/shardmap_test.go +++ b/pkg/fileservice/fifocache/shardmap_test.go @@ -37,4 +37,5 @@ func TestShardMap(t *testing.T) { v, ok = m.GetAndDelete(1) assert.Equal(t, v, "1") + assert.Equal(t, ok, true) } From cbf8647ce1ec22fd0dc2d75ea5a0514c55c14dc5 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Fri, 16 May 2025 16:38:05 +0100 Subject: [PATCH 07/33] check double free --- pkg/fileservice/bytes.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index 57a3bbcd957b9..aa43b1b593403 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -50,17 +50,23 @@ func (b *Bytes) Retain() { } func (b *Bytes) Release() { + if b.bytes == nil { + panic("fileservice.Bytes.Release() double free") + } + if b.refs != nil { if n := b.refs.Add(-1); n == 0 { if b.deallocator != nil && atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) { b.deallocator.Deallocate(malloc.NoHints) + b.bytes = nil } } } else { if b.deallocator != nil && atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) { b.deallocator.Deallocate(malloc.NoHints) + b.bytes = nil } } } From 9ae40fe62b9ebaabf430b3e1c5d0a9f14d09a89c Mon Sep 17 00:00:00 2001 From: cpegeric Date: Mon, 19 May 2025 15:20:18 +0100 Subject: [PATCH 08/33] postfn protected by shardmap --- pkg/fileservice/bytes.go | 8 +- pkg/fileservice/fifocache/data_cache.go | 4 +- pkg/fileservice/fifocache/data_cache_test.go | 3 +- pkg/fileservice/fifocache/fifo.go | 118 ++++++++----------- pkg/fileservice/fifocache/shardmap.go | 33 +++++- pkg/fileservice/fifocache/shardmap_test.go | 15 ++- pkg/fileservice/fscache/data.go | 2 +- pkg/fileservice/io_vector.go | 4 +- pkg/fileservice/mem_cache.go | 6 +- 9 files changed, 109 insertions(+), 84 deletions(-) diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index aa43b1b593403..ca0abf512c3d2 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -49,17 +49,19 @@ func (b *Bytes) Retain() { } } -func (b *Bytes) Release() { +func (b *Bytes) Release() bool { if b.bytes == nil { panic("fileservice.Bytes.Release() double free") } if b.refs != nil { - if n := b.refs.Add(-1); n == 0 { + n := b.refs.Add(-1) + if n == 0 { if b.deallocator != nil && atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) { b.deallocator.Deallocate(malloc.NoHints) b.bytes = nil + return true } } } else { @@ -67,8 +69,10 @@ func (b *Bytes) Release() { atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) { b.deallocator.Deallocate(malloc.NoHints) b.bytes = nil + return true } } + return false } type bytesAllocator struct { diff --git a/pkg/fileservice/fifocache/data_cache.go b/pkg/fileservice/fifocache/data_cache.go index 
1c75476547c28..ab392ad59c0f2 100644 --- a/pkg/fileservice/fifocache/data_cache.go +++ b/pkg/fileservice/fifocache/data_cache.go @@ -72,7 +72,9 @@ func (d *DataCache) DeletePaths(ctx context.Context, paths []string) { d.fifo.htab.CompareAndDelete(key, func(key1, key2 fscache.CacheKey) bool { return key1.Path == key2.Path }, func(value *_CacheItem[fscache.CacheKey, fscache.Data]) { - value.postFunc(ctx, d.fifo.postEvict) + if d.fifo.postEvict != nil { + d.fifo.postEvict(ctx, value.key, value.value, value.size) + } }) } } diff --git a/pkg/fileservice/fifocache/data_cache_test.go b/pkg/fileservice/fifocache/data_cache_test.go index 569272b31c678..4ab400f184ba6 100644 --- a/pkg/fileservice/fifocache/data_cache_test.go +++ b/pkg/fileservice/fifocache/data_cache_test.go @@ -121,7 +121,8 @@ func (t testBytes) Bytes() []byte { return t } -func (t testBytes) Release() { +func (t testBytes) Release() bool { + return false } func (t testBytes) Retain() { diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index 6e1cfe8e6397a..7778411f070a0 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -42,15 +42,17 @@ type Cache[K comparable, V any] struct { } type _CacheItem[K comparable, V any] struct { - mu sync.Mutex - key K - value V - size int64 - freq int8 - deleted bool + key K + value V + size int64 + + // mutex only protect the freq + mu sync.Mutex + freq int8 + + deleted atomic.Bool } -/* func (c *_CacheItem[K, V]) inc() { c.mu.Lock() defer c.mu.Unlock() @@ -59,7 +61,6 @@ func (c *_CacheItem[K, V]) inc() { c.freq += 1 } } -*/ func (c *_CacheItem[K, V]) dec() { c.mu.Lock() @@ -77,44 +78,11 @@ func (c *_CacheItem[K, V]) getFreq() int8 { } func (c *_CacheItem[K, V]) isDeleted() bool { - c.mu.Lock() - defer c.mu.Unlock() - return c.deleted + return c.deleted.Load() } func (c *_CacheItem[K, V]) setDeleted() bool { - c.mu.Lock() - defer c.mu.Unlock() - if !c.deleted { - c.deleted = true - return true - } - return false -} - -// thread safe to run post function such as postGet, postSet and postEvict -func (c *_CacheItem[K, V]) postFunc(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) { - if fn == nil { - return - } - c.mu.Lock() - defer c.mu.Unlock() - - fn(ctx, c.key, c.value, c.size) -} - -// IncAndPost merge inc() and postFunc() into one to save one mutex Lock operation -func (c *_CacheItem[K, V]) IncAndPost(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) { - c.mu.Lock() - defer c.mu.Unlock() - if c.freq < 3 { - c.freq += 1 - } - - if fn == nil { - return - } - fn(ctx, c.key, c.value, c.size) + return c.deleted.CompareAndSwap(false, true) } // assume cache size is 128K @@ -169,15 +137,17 @@ func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) { size: size, } - ok := c.htab.Set(key, item) + ok := c.htab.Set(key, item, func(v *_CacheItem[K, V]) { + if c.postSet != nil { + c.postSet(ctx, v.key, v.value, v.size) + } + + }) if !ok { // existed return } - // post set - item.postFunc(ctx, c.postSet) - // evict c.evictAll(ctx, nil, 0) @@ -201,29 +171,36 @@ func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) { func (c *Cache[K, V]) Get(ctx context.Context, key K) (value V, ok bool) { var item *_CacheItem[K, V] - item, ok = c.htab.Get(key) + item, ok = c.htab.Get(key, func(v *_CacheItem[K, V]) { + if c.postGet != nil { + c.postGet(ctx, v.key, v.value, v.size) + } + }) if !ok { return } - // increment and postGet - item.IncAndPost(ctx, 
c.postGet) + // increment + item.inc() return item.value, true } func (c *Cache[K, V]) Delete(ctx context.Context, key K) { - item, ok := c.htab.GetAndDelete(key) + _, ok := c.htab.GetAndDelete(key, func(v *_CacheItem[K, V]) { + needsPostEvict := v.setDeleted() + + // post evict + if needsPostEvict { + if c.postEvict != nil { + c.postEvict(ctx, v.key, v.value, v.size) + } + } + }) if !ok { return } - needsPostEvict := item.setDeleted() - - // post evict - if needsPostEvict { - item.postFunc(ctx, c.postEvict) - } // queues will be update in evict } @@ -294,11 +271,14 @@ func (c *Cache[K, V]) evictSmall(ctx context.Context) { c.usedMain.Add(item.size) } else { // evict - c.htab.Remove(item.key) - // post evict - if item.setDeleted() { - item.postFunc(ctx, c.postEvict) - } + c.htab.Remove(item.key, item, func(v *_CacheItem[K, V]) { + // post evict + if v.setDeleted() { + if c.postEvict != nil { + c.postEvict(ctx, v.key, v.value, v.size) + } + } + }) c.usedSmall.Add(-item.size) if !c.disable_s3fifo { @@ -330,11 +310,15 @@ func (c *Cache[K, V]) evictMain(ctx context.Context) { c.main.enqueue(item) } else { // evict - c.htab.Remove(item.key) - // post evict - if item.setDeleted() { - item.postFunc(ctx, c.postEvict) - } + c.htab.Remove(item.key, item, func(v *_CacheItem[K, V]) { + // post evict + if v.setDeleted() { + if c.postEvict != nil { + c.postEvict(ctx, v.key, v.value, v.size) + } + } + + }) c.usedMain.Add(-item.size) return } diff --git a/pkg/fileservice/fifocache/shardmap.go b/pkg/fileservice/fifocache/shardmap.go index d2c5462455e77..139c53a3d2c38 100644 --- a/pkg/fileservice/fifocache/shardmap.go +++ b/pkg/fileservice/fifocache/shardmap.go @@ -40,7 +40,7 @@ func NewShardMap[K comparable, V any](hashfn func(K) uint64) *ShardMap[K, V] { return m } -func (m *ShardMap[K, V]) Set(key K, value V) bool { +func (m *ShardMap[K, V]) Set(key K, value V, postfn func(V)) bool { s := &m.shards[m.hashfn(key)%numShards] s.Lock() @@ -52,24 +52,40 @@ func (m *ShardMap[K, V]) Set(key K, value V) bool { } s.values[key] = value + + if postfn != nil { + postfn(value) + } return true } -func (m *ShardMap[K, V]) Get(key K) (V, bool) { +func (m *ShardMap[K, V]) Get(key K, postfn func(V)) (V, bool) { s := &m.shards[m.hashfn(key)%numShards] s.RLock() defer s.RUnlock() v, ok := s.values[key] + + if !ok { + return v, ok + } + + if postfn != nil { + postfn(v) + } return v, ok } -func (m *ShardMap[K, V]) Remove(key K) { +func (m *ShardMap[K, V]) Remove(key K, value V, postfn func(V)) { s := &m.shards[m.hashfn(key)%numShards] s.Lock() defer s.Unlock() delete(s.values, key) + + if postfn != nil { + postfn(value) + } } func (m *ShardMap[K, V]) CompareAndDelete(key K, fn func(k1, k2 K) bool, postfn func(V)) { @@ -90,11 +106,13 @@ func (m *ShardMap[K, V]) CompareAndDelete(key K, fn func(k1, k2 K) bool, postfn } for _, v := range deleted { - postfn(v) + if postfn != nil { + postfn(v) + } } } -func (m *ShardMap[K, V]) GetAndDelete(key K) (V, bool) { +func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) (V, bool) { s := &m.shards[m.hashfn(key)%numShards] s.Lock() @@ -106,5 +124,10 @@ func (m *ShardMap[K, V]) GetAndDelete(key K) (V, bool) { } delete(s.values, key) + + if postfn != nil { + postfn(v) + } + return v, ok } diff --git a/pkg/fileservice/fifocache/shardmap_test.go b/pkg/fileservice/fifocache/shardmap_test.go index d62c869b4bc73..7633ed2e8d05b 100644 --- a/pkg/fileservice/fifocache/shardmap_test.go +++ b/pkg/fileservice/fifocache/shardmap_test.go @@ -23,19 +23,24 @@ import ( func TestShardMap(t *testing.T) { 
m := NewShardMap[int, string](ShardInt[int]) - ok := m.Set(1, "1") + ok := m.Set(1, "1", func(v string) { + }) assert.Equal(t, ok, true) - ok = m.Set(1, "1") + ok = m.Set(1, "1", func(v string) { + }) assert.Equal(t, ok, false) - v, ok := m.Get(1) + v, ok := m.Get(1, func(v string) { + }) assert.Equal(t, ok, true) assert.Equal(t, v, "1") - _, ok = m.GetAndDelete(0) + _, ok = m.GetAndDelete(0, func(v string) { + }) assert.Equal(t, ok, false) - v, ok = m.GetAndDelete(1) + v, ok = m.GetAndDelete(1, func(v string) { + }) assert.Equal(t, v, "1") assert.Equal(t, ok, true) } diff --git a/pkg/fileservice/fscache/data.go b/pkg/fileservice/fscache/data.go index 0a11fc38d4208..5761b155100cc 100644 --- a/pkg/fileservice/fscache/data.go +++ b/pkg/fileservice/fscache/data.go @@ -18,5 +18,5 @@ type Data interface { Bytes() []byte Slice(length int) Data Retain() - Release() + Release() bool } diff --git a/pkg/fileservice/io_vector.go b/pkg/fileservice/io_vector.go index 933b8e996cb84..680c34a5f5cff 100644 --- a/pkg/fileservice/io_vector.go +++ b/pkg/fileservice/io_vector.go @@ -28,7 +28,9 @@ func (i *IOVector) allDone() bool { func (i *IOVector) Release() { for _, entry := range i.Entries { if entry.CachedData != nil { - entry.CachedData.Release() + if entry.CachedData.Release() { + entry.CachedData = nil + } } if entry.releaseData != nil { entry.releaseData() diff --git a/pkg/fileservice/mem_cache.go b/pkg/fileservice/mem_cache.go index 2b9d2b666d5b2..dae9e651f4f85 100644 --- a/pkg/fileservice/mem_cache.go +++ b/pkg/fileservice/mem_cache.go @@ -96,7 +96,10 @@ func NewMemCache( defer LogEvent(ctx, str_memory_cache_post_evict_end) // relaese - value.Release() + released := value.Release() + if released { + // value is deallocated + } // metrics LogEvent(ctx, str_update_metrics_begin) @@ -210,6 +213,7 @@ func (m *MemCache) Update( } LogEvent(ctx, str_set_memory_cache_entry_begin) + // NOTE: cache data may not be put to the cache if data already exists m.cache.Set(ctx, key, entry.CachedData) LogEvent(ctx, str_set_memory_cache_entry_end) } From 3f30dd3cf93a130f040a870450003fa2cd5f8d19 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Mon, 19 May 2025 15:29:26 +0100 Subject: [PATCH 09/33] update --- pkg/fileservice/fifocache/shardmap.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/pkg/fileservice/fifocache/shardmap.go b/pkg/fileservice/fifocache/shardmap.go index 139c53a3d2c38..f21d272942b48 100644 --- a/pkg/fileservice/fifocache/shardmap.go +++ b/pkg/fileservice/fifocache/shardmap.go @@ -90,7 +90,6 @@ func (m *ShardMap[K, V]) Remove(key K, value V, postfn func(V)) { func (m *ShardMap[K, V]) CompareAndDelete(key K, fn func(k1, k2 K) bool, postfn func(V)) { - deleted := make([]V, 0, 64) for i := range m.shards { s := &m.shards[i] func() { @@ -99,17 +98,13 @@ func (m *ShardMap[K, V]) CompareAndDelete(key K, fn func(k1, k2 K) bool, postfn for k, v := range s.values { if fn(k, key) { delete(s.values, k) - deleted = append(deleted, v) + if postfn != nil { + postfn(v) + } } } }() } - - for _, v := range deleted { - if postfn != nil { - postfn(v) - } - } } func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) (V, bool) { From 8ab612d48701a714336dbefb76411b48825fab76 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Mon, 19 May 2025 16:38:04 +0100 Subject: [PATCH 10/33] isDeleted protected by shardmap mutex --- pkg/fileservice/fifocache/fifo.go | 20 ++++++++++++++++---- pkg/fileservice/fifocache/shardmap.go | 7 +++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git 
a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index 7778411f070a0..6d1d592d3d0e0 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -23,6 +23,8 @@ import ( ) // Cache implements an in-memory cache with S3-FIFO-based eviction +// All postfn is very critical. They will increment and decrement the reference counter of the cache data and deallocate the memory when reference counter is 0. +// Make sure the postfn is protected by mutex from shardmap. type Cache[K comparable, V any] struct { capacity fscache.CapacityFunc capSmall fscache.CapacityFunc @@ -50,7 +52,8 @@ type _CacheItem[K comparable, V any] struct { mu sync.Mutex freq int8 - deleted atomic.Bool + // deleted is protected by shardmap + deleted bool } func (c *_CacheItem[K, V]) inc() { @@ -77,12 +80,18 @@ func (c *_CacheItem[K, V]) getFreq() int8 { return c.freq } +// protected by shardmap func (c *_CacheItem[K, V]) isDeleted() bool { - return c.deleted.Load() + return c.deleted } +// protected by shardmap func (c *_CacheItem[K, V]) setDeleted() bool { - return c.deleted.CompareAndSwap(false, true) + if !c.deleted { + c.deleted = true + return true + } + return false } // assume cache size is 128K @@ -258,7 +267,10 @@ func (c *Cache[K, V]) evictSmall(ctx context.Context) { return } - deleted := item.isDeleted() + deleted := c.htab.ValueIsDeleted(item.key, item, func(v *_CacheItem[K, V]) bool { + return v.deleted + }) + if deleted { c.usedSmall.Add(-item.size) return diff --git a/pkg/fileservice/fifocache/shardmap.go b/pkg/fileservice/fifocache/shardmap.go index f21d272942b48..d6bc825082432 100644 --- a/pkg/fileservice/fifocache/shardmap.go +++ b/pkg/fileservice/fifocache/shardmap.go @@ -126,3 +126,10 @@ func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) (V, bool) { return v, ok } + +func (m *ShardMap[K, V]) ValueIsDeleted(key K, value V, isDeleted func(V) bool) bool { + s := &m.shards[m.hashfn(key)%numShards] + s.Lock() + defer s.Unlock() + return isDeleted(value) +} From c6338e7ad5a656e4ca14ba36ae2f8f1c110c5246 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Mon, 19 May 2025 16:41:42 +0100 Subject: [PATCH 11/33] use mutex instead --- pkg/fileservice/bytes.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index ca0abf512c3d2..744b60e3e2761 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -16,6 +16,7 @@ package fileservice import ( "context" + "sync" "sync/atomic" "github.com/matrixorigin/matrixone/pkg/common/malloc" @@ -23,18 +24,23 @@ import ( ) type Bytes struct { + mu sync.Mutex bytes []byte deallocator malloc.Deallocator deallocated uint32 - _refs atomic.Int32 - refs *atomic.Int32 + _refs int32 + refs *int32 } func (b *Bytes) Size() int64 { + b.mu.Lock() + defer b.mu.Unlock() return int64(len(b.bytes)) } func (b *Bytes) Bytes() []byte { + b.mu.Lock() + defer b.mu.Unlock() return b.bytes } @@ -44,21 +50,25 @@ func (b *Bytes) Slice(length int) fscache.Data { } func (b *Bytes) Retain() { + b.mu.Lock() + defer b.mu.Unlock() if b.refs != nil { - b.refs.Add(1) + (*b.refs) += 1 } } func (b *Bytes) Release() bool { + b.mu.Lock() + defer b.mu.Unlock() + if b.bytes == nil { panic("fileservice.Bytes.Release() double free") } if b.refs != nil { - n := b.refs.Add(-1) - if n == 0 { - if b.deallocator != nil && - atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) { + (*b.refs) -= 1 + if *b.refs == 0 { + if b.deallocator != nil { 
b.deallocator.Deallocate(malloc.NoHints) b.bytes = nil return true @@ -90,7 +100,7 @@ func (b *bytesAllocator) allocateCacheData(size int, hints malloc.Hints) fscache bytes: slice, deallocator: dec, } - bytes._refs.Store(1) + bytes._refs = 1 bytes.refs = &bytes._refs return bytes } From d2fcc574045d74851c5d70ee924743ee247fa463 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Mon, 19 May 2025 17:05:31 +0100 Subject: [PATCH 12/33] update --- pkg/fileservice/fifocache/fifo.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index 6d1d592d3d0e0..219d1fe92208c 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -310,7 +310,9 @@ func (c *Cache[K, V]) evictMain(ctx context.Context) { return } - deleted := item.isDeleted() + deleted := c.htab.ValueIsDeleted(item.key, item, func(v *_CacheItem[K, V]) bool { + return v.deleted + }) if deleted { c.usedMain.Add(-item.size) return From ac7f43953d37cc3b17d0a0817f4e130edf554405 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Mon, 19 May 2025 17:24:24 +0100 Subject: [PATCH 13/33] use RLock --- pkg/fileservice/fifocache/shardmap.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/fileservice/fifocache/shardmap.go b/pkg/fileservice/fifocache/shardmap.go index d6bc825082432..c9a6e46859a2f 100644 --- a/pkg/fileservice/fifocache/shardmap.go +++ b/pkg/fileservice/fifocache/shardmap.go @@ -129,7 +129,7 @@ func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) (V, bool) { func (m *ShardMap[K, V]) ValueIsDeleted(key K, value V, isDeleted func(V) bool) bool { s := &m.shards[m.hashfn(key)%numShards] - s.Lock() - defer s.Unlock() + s.RLock() + defer s.RUnlock() return isDeleted(value) } From a7ebf4b6aa4297c68dc3875e05863dcb46e63ead Mon Sep 17 00:00:00 2001 From: cpegeric Date: Mon, 19 May 2025 17:29:43 +0100 Subject: [PATCH 14/33] protect Slice() --- pkg/fileservice/bytes.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index 744b60e3e2761..eb9b8a2ce164c 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -45,6 +45,8 @@ func (b *Bytes) Bytes() []byte { } func (b *Bytes) Slice(length int) fscache.Data { + b.mu.Lock() + defer b.mu.Unlock() b.bytes = b.bytes[:length] return b } From b7847b6b20ac90a7a507a15b54d8823ecfb8be21 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Tue, 20 May 2025 09:05:56 +0100 Subject: [PATCH 15/33] check buffer deallocated --- pkg/fileservice/bytes.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index eb9b8a2ce164c..4dbb41f396e49 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -35,18 +35,27 @@ type Bytes struct { func (b *Bytes) Size() int64 { b.mu.Lock() defer b.mu.Unlock() + if b.bytes == nil { + panic("fileservice.Bytes.Size() buffer already deallocated") + } return int64(len(b.bytes)) } func (b *Bytes) Bytes() []byte { b.mu.Lock() defer b.mu.Unlock() + if b.bytes == nil { + panic("fileservice.Bytes.Bytes() buffer already deallocated") + } return b.bytes } func (b *Bytes) Slice(length int) fscache.Data { b.mu.Lock() defer b.mu.Unlock() + if b.bytes == nil { + panic("fileservice.Bytes.Slice() buffer already deallocated") + } b.bytes = b.bytes[:length] return b } @@ -54,6 +63,11 @@ func (b *Bytes) Slice(length int) fscache.Data { func (b *Bytes) Retain() { b.mu.Lock() defer b.mu.Unlock() + + if b.bytes == nil { + 
panic("fileservice.Bytes.Retain() buffer already deallocated") + } + if b.refs != nil { (*b.refs) += 1 } From a420ff5951bd4f77918a4043c32bb5621e4e469d Mon Sep 17 00:00:00 2001 From: cpegeric Date: Tue, 20 May 2025 09:22:20 +0100 Subject: [PATCH 16/33] add tests and comments --- pkg/fileservice/fifocache/shardmap.go | 5 +++++ pkg/fileservice/fifocache/shardmap_test.go | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/pkg/fileservice/fifocache/shardmap.go b/pkg/fileservice/fifocache/shardmap.go index c9a6e46859a2f..8cf3e74002085 100644 --- a/pkg/fileservice/fifocache/shardmap.go +++ b/pkg/fileservice/fifocache/shardmap.go @@ -54,6 +54,7 @@ func (m *ShardMap[K, V]) Set(key K, value V, postfn func(V)) bool { s.values[key] = value if postfn != nil { + // call postSet to increment the cache reference counter postfn(value) } return true @@ -71,6 +72,7 @@ func (m *ShardMap[K, V]) Get(key K, postfn func(V)) (V, bool) { } if postfn != nil { + // call postGet to increment the cache reference counter postfn(v) } return v, ok @@ -84,6 +86,7 @@ func (m *ShardMap[K, V]) Remove(key K, value V, postfn func(V)) { delete(s.values, key) if postfn != nil { + // call postEvict to decrement the cache reference counter, deallocate the buffer when ref counter = 0 postfn(value) } } @@ -99,6 +102,7 @@ func (m *ShardMap[K, V]) CompareAndDelete(key K, fn func(k1, k2 K) bool, postfn if fn(k, key) { delete(s.values, k) if postfn != nil { + // call postEvict to decrement the cache reference counter, deallocate the buffer when ref counter = 0 postfn(v) } } @@ -121,6 +125,7 @@ func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) (V, bool) { delete(s.values, key) if postfn != nil { + // call postEvict to decrement the cache reference counter, deallocate the buffer when ref counter = 0 postfn(v) } diff --git a/pkg/fileservice/fifocache/shardmap_test.go b/pkg/fileservice/fifocache/shardmap_test.go index 7633ed2e8d05b..61fe14f72689d 100644 --- a/pkg/fileservice/fifocache/shardmap_test.go +++ b/pkg/fileservice/fifocache/shardmap_test.go @@ -43,4 +43,9 @@ func TestShardMap(t *testing.T) { }) assert.Equal(t, v, "1") assert.Equal(t, ok, true) + + ok = m.ValueIsDeleted(1, "1", func(v string) bool { + return false + }) + assert.Equal(t, ok, false) } From dbc16ffc8418396ee0a1aafd8c759d0853a67c1c Mon Sep 17 00:00:00 2001 From: cpegeric Date: Tue, 20 May 2025 09:34:08 +0100 Subject: [PATCH 17/33] comments --- pkg/fileservice/fifocache/fifo.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index 219d1fe92208c..e5087ff82e4d9 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -147,6 +147,7 @@ func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) { } ok := c.htab.Set(key, item, func(v *_CacheItem[K, V]) { + // call Bytes.Retain() to increment the ref counter and protected by shardmap mutex if c.postSet != nil { c.postSet(ctx, v.key, v.value, v.size) } @@ -181,6 +182,7 @@ func (c *Cache[K, V]) Get(ctx context.Context, key K) (value V, ok bool) { var item *_CacheItem[K, V] item, ok = c.htab.Get(key, func(v *_CacheItem[K, V]) { + // call Bytes.Retain() to increment the ref counter and protected by shardmap mutex if c.postGet != nil { c.postGet(ctx, v.key, v.value, v.size) } @@ -197,6 +199,8 @@ func (c *Cache[K, V]) Get(ctx context.Context, key K) (value V, ok bool) { func (c *Cache[K, V]) Delete(ctx context.Context, key K) { _, ok := c.htab.GetAndDelete(key, func(v *_CacheItem[K, V]) { 
+ // call Bytes.Release() to decrement the ref counter and protected by shardmap mutex. + // item.deleted makes sure postEvict only call once. needsPostEvict := v.setDeleted() // post evict @@ -268,6 +272,7 @@ func (c *Cache[K, V]) evictSmall(ctx context.Context) { } deleted := c.htab.ValueIsDeleted(item.key, item, func(v *_CacheItem[K, V]) bool { + // check item is deleted and protected by shardmap mutex. return v.deleted }) @@ -284,6 +289,8 @@ func (c *Cache[K, V]) evictSmall(ctx context.Context) { } else { // evict c.htab.Remove(item.key, item, func(v *_CacheItem[K, V]) { + // call Bytes.Release() to decrement the ref counter and protected by shardmap mutex. + // item.deleted makes sure postEvict only call once. // post evict if v.setDeleted() { if c.postEvict != nil { @@ -311,6 +318,7 @@ func (c *Cache[K, V]) evictMain(ctx context.Context) { } deleted := c.htab.ValueIsDeleted(item.key, item, func(v *_CacheItem[K, V]) bool { + // check item is deleted and protected by shardmap mutex. return v.deleted }) if deleted { @@ -325,6 +333,8 @@ func (c *Cache[K, V]) evictMain(ctx context.Context) { } else { // evict c.htab.Remove(item.key, item, func(v *_CacheItem[K, V]) { + // call Bytes.Release() to decrement the ref counter and protected by shardmap mutex. + // item.deleted makes sure postEvict only call once. // post evict if v.setDeleted() { if c.postEvict != nil { From 43ec47b4979d6512dbb3d2613b33f2649e9e1b8a Mon Sep 17 00:00:00 2001 From: cpegeric Date: Tue, 20 May 2025 10:09:36 +0100 Subject: [PATCH 18/33] fix sca --- pkg/fileservice/fifocache/fifo.go | 6 +++--- pkg/fileservice/fifocache/shardmap.go | 6 +++--- pkg/fileservice/fifocache/shardmap_test.go | 5 ++--- pkg/fileservice/mem_cache.go | 8 +++----- 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index e5087ff82e4d9..07faf5923d885 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -198,7 +198,7 @@ func (c *Cache[K, V]) Get(ctx context.Context, key K) (value V, ok bool) { } func (c *Cache[K, V]) Delete(ctx context.Context, key K) { - _, ok := c.htab.GetAndDelete(key, func(v *_CacheItem[K, V]) { + ok := c.htab.GetAndDelete(key, func(v *_CacheItem[K, V]) { // call Bytes.Release() to decrement the ref counter and protected by shardmap mutex. // item.deleted makes sure postEvict only call once. needsPostEvict := v.setDeleted() @@ -273,7 +273,7 @@ func (c *Cache[K, V]) evictSmall(ctx context.Context) { deleted := c.htab.ValueIsDeleted(item.key, item, func(v *_CacheItem[K, V]) bool { // check item is deleted and protected by shardmap mutex. - return v.deleted + return v.isDeleted() }) if deleted { @@ -319,7 +319,7 @@ func (c *Cache[K, V]) evictMain(ctx context.Context) { deleted := c.htab.ValueIsDeleted(item.key, item, func(v *_CacheItem[K, V]) bool { // check item is deleted and protected by shardmap mutex. 
- return v.deleted + return v.isDeleted() }) if deleted { c.usedMain.Add(-item.size) diff --git a/pkg/fileservice/fifocache/shardmap.go b/pkg/fileservice/fifocache/shardmap.go index 8cf3e74002085..a076f19511adb 100644 --- a/pkg/fileservice/fifocache/shardmap.go +++ b/pkg/fileservice/fifocache/shardmap.go @@ -111,7 +111,7 @@ func (m *ShardMap[K, V]) CompareAndDelete(key K, fn func(k1, k2 K) bool, postfn } } -func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) (V, bool) { +func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) bool { s := &m.shards[m.hashfn(key)%numShards] s.Lock() @@ -119,7 +119,7 @@ func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) (V, bool) { v, ok := s.values[key] if !ok { - return v, ok + return ok } delete(s.values, key) @@ -129,7 +129,7 @@ func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) (V, bool) { postfn(v) } - return v, ok + return ok } func (m *ShardMap[K, V]) ValueIsDeleted(key K, value V, isDeleted func(V) bool) bool { diff --git a/pkg/fileservice/fifocache/shardmap_test.go b/pkg/fileservice/fifocache/shardmap_test.go index 61fe14f72689d..2c1d43268fa70 100644 --- a/pkg/fileservice/fifocache/shardmap_test.go +++ b/pkg/fileservice/fifocache/shardmap_test.go @@ -35,13 +35,12 @@ func TestShardMap(t *testing.T) { assert.Equal(t, ok, true) assert.Equal(t, v, "1") - _, ok = m.GetAndDelete(0, func(v string) { + ok = m.GetAndDelete(0, func(v string) { }) assert.Equal(t, ok, false) - v, ok = m.GetAndDelete(1, func(v string) { + ok = m.GetAndDelete(1, func(v string) { }) - assert.Equal(t, v, "1") assert.Equal(t, ok, true) ok = m.ValueIsDeleted(1, "1", func(v string) bool { diff --git a/pkg/fileservice/mem_cache.go b/pkg/fileservice/mem_cache.go index dae9e651f4f85..19f67d95d2724 100644 --- a/pkg/fileservice/mem_cache.go +++ b/pkg/fileservice/mem_cache.go @@ -95,11 +95,8 @@ func NewMemCache( LogEvent(ctx, str_memory_cache_post_evict_begin) defer LogEvent(ctx, str_memory_cache_post_evict_end) - // relaese - released := value.Release() - if released { - // value is deallocated - } + // release after all callbacks executed + defer value.Release() // metrics LogEvent(ctx, str_update_metrics_begin) @@ -115,6 +112,7 @@ func NewMemCache( } LogEvent(ctx, str_memory_cache_callbacks_end) } + } dataCache := fifocache.NewDataCache(capacityFunc, postSetFn, postGetFn, postEvictFn, disable_s3fifo) From ee52cc7b25aa0e5fde668a0723e91ea43c4cb512 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Tue, 20 May 2025 10:20:13 +0100 Subject: [PATCH 19/33] panic --- pkg/fileservice/bytes_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pkg/fileservice/bytes_test.go b/pkg/fileservice/bytes_test.go index 0859bdbb0445b..ea08828583ec5 100644 --- a/pkg/fileservice/bytes_test.go +++ b/pkg/fileservice/bytes_test.go @@ -32,3 +32,22 @@ func TestBytes(t *testing.T) { bs.Release() }) } + +func TestBytesPanic(t *testing.T) { + bytes, deallocator, err := ioAllocator().Allocate(42, malloc.NoHints) + assert.Nil(t, err) + bs := &Bytes{ + bytes: bytes, + deallocator: deallocator, + } + + released := bs.Release() + assert.Equal(t, released, true) + + assert.Panics(t, func() { bs.Release() }, "Bytes.Release panic()") + assert.Panics(t, func() { bs.Size() }, "Bytes.Size panic()") + assert.Panics(t, func() { bs.Bytes() }, "Bytes.Bytes panic()") + assert.Panics(t, func() { bs.Slice(0) }, "Bytes.Slice panic()") + assert.Panics(t, func() { bs.Retain() }, "Bytes.Retain panic()") + +} From f1de9d72e269c3f7a532474f7860bf2e04f269c7 Mon Sep 17 00:00:00 2001 From: 
cpegeric Date: Tue, 20 May 2025 11:00:05 +0100 Subject: [PATCH 20/33] fix sca --- pkg/fileservice/fifocache/fifo.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index 07faf5923d885..baf06821d08aa 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -198,7 +198,7 @@ func (c *Cache[K, V]) Get(ctx context.Context, key K) (value V, ok bool) { } func (c *Cache[K, V]) Delete(ctx context.Context, key K) { - ok := c.htab.GetAndDelete(key, func(v *_CacheItem[K, V]) { + c.htab.GetAndDelete(key, func(v *_CacheItem[K, V]) { // call Bytes.Release() to decrement the ref counter and protected by shardmap mutex. // item.deleted makes sure postEvict only call once. needsPostEvict := v.setDeleted() @@ -210,11 +210,6 @@ func (c *Cache[K, V]) Delete(ctx context.Context, key K) { } } }) - if !ok { - return - } - - // queues will be update in evict } func (c *Cache[K, V]) Evict(ctx context.Context, done chan int64, capacityCut int64) { From 60f051317a07ac0a0d776e15b7deb5a1fe0ff69d Mon Sep 17 00:00:00 2001 From: cpegeric Date: Tue, 20 May 2025 17:57:14 +0100 Subject: [PATCH 21/33] DeletePaths add setDeleted to avoid multiple postEvict --- pkg/fileservice/fifocache/data_cache.go | 6 ++++-- pkg/fileservice/fifocache/fifo.go | 4 ++-- pkg/fileservice/io_vector.go | 5 ++++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/fileservice/fifocache/data_cache.go b/pkg/fileservice/fifocache/data_cache.go index ab392ad59c0f2..9e39f03ba9cdd 100644 --- a/pkg/fileservice/fifocache/data_cache.go +++ b/pkg/fileservice/fifocache/data_cache.go @@ -72,8 +72,10 @@ func (d *DataCache) DeletePaths(ctx context.Context, paths []string) { d.fifo.htab.CompareAndDelete(key, func(key1, key2 fscache.CacheKey) bool { return key1.Path == key2.Path }, func(value *_CacheItem[fscache.CacheKey, fscache.Data]) { - if d.fifo.postEvict != nil { - d.fifo.postEvict(ctx, value.key, value.value, value.size) + if value.setDeleted() { + if d.fifo.postEvict != nil { + d.fifo.postEvict(ctx, value.key, value.value, value.size) + } } }) } diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index baf06821d08aa..5c0c849491511 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -283,7 +283,7 @@ func (c *Cache[K, V]) evictSmall(ctx context.Context) { c.usedMain.Add(item.size) } else { // evict - c.htab.Remove(item.key, item, func(v *_CacheItem[K, V]) { + c.htab.GetAndDelete(item.key, func(v *_CacheItem[K, V]) { // call Bytes.Release() to decrement the ref counter and protected by shardmap mutex. // item.deleted makes sure postEvict only call once. // post evict @@ -327,7 +327,7 @@ func (c *Cache[K, V]) evictMain(ctx context.Context) { c.main.enqueue(item) } else { // evict - c.htab.Remove(item.key, item, func(v *_CacheItem[K, V]) { + c.htab.GetAndDelete(item.key, func(v *_CacheItem[K, V]) { // call Bytes.Release() to decrement the ref counter and protected by shardmap mutex. // item.deleted makes sure postEvict only call once. 
// post evict diff --git a/pkg/fileservice/io_vector.go b/pkg/fileservice/io_vector.go index 680c34a5f5cff..1427aeb72f5b4 100644 --- a/pkg/fileservice/io_vector.go +++ b/pkg/fileservice/io_vector.go @@ -14,7 +14,9 @@ package fileservice -import "math" +import ( + "math" +) func (i *IOVector) allDone() bool { for _, entry := range i.Entries { @@ -30,6 +32,7 @@ func (i *IOVector) Release() { if entry.CachedData != nil { if entry.CachedData.Release() { entry.CachedData = nil + entry.fromCache = nil } } if entry.releaseData != nil { From 80f132af90fed5ef9d7e31fe6e74a67f618d15fd Mon Sep 17 00:00:00 2001 From: cpegeric Date: Wed, 21 May 2025 10:00:37 +0100 Subject: [PATCH 22/33] merge fix --- cmd/mo-service/config.go | 2 +- pkg/embed/config.go | 2 +- pkg/fileservice/cache.go | 1 + pkg/fileservice/cache_test.go | 2 +- pkg/fileservice/disk_cache.go | 2 + pkg/fileservice/disk_cache_test.go | 24 ++-- pkg/fileservice/fifocache/bench_test.go | 12 +- pkg/fileservice/fifocache/data_cache_test.go | 5 +- pkg/fileservice/fifocache/fifo_test.go | 135 +++++++++++++++++-- pkg/fileservice/fifocache/ghost.go | 90 +++++++++++++ pkg/fileservice/fifocache/queue.go | 27 +++- pkg/fileservice/local_fs.go | 5 +- pkg/fileservice/local_fs_test.go | 2 + pkg/fileservice/mem_cache.go | 6 +- pkg/fileservice/mem_cache_test.go | 7 +- pkg/fileservice/memory_fs_test.go | 2 + pkg/fileservice/s3_fs.go | 2 + pkg/objectio/cache.go | 12 +- 18 files changed, 292 insertions(+), 46 deletions(-) create mode 100644 pkg/fileservice/fifocache/ghost.go diff --git a/cmd/mo-service/config.go b/cmd/mo-service/config.go index 17fb9caa94ddb..1ca7f29fb0b05 100644 --- a/cmd/mo-service/config.go +++ b/cmd/mo-service/config.go @@ -301,7 +301,7 @@ func (c *Config) setDefaultValue() error { func (c *Config) initMetaCache() { if c.MetaCache.MemoryCapacity > 0 { - objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity)) + objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity), c.MetaCache.DisableS3Fifo) } } diff --git a/pkg/embed/config.go b/pkg/embed/config.go index 0314f021d7ff6..b237e972a1a63 100644 --- a/pkg/embed/config.go +++ b/pkg/embed/config.go @@ -279,7 +279,7 @@ func (c *ServiceConfig) setDefaultValue() error { func (c *ServiceConfig) initMetaCache() { if c.MetaCache.MemoryCapacity > 0 { - objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity)) + objectio.InitMetaCache(int64(c.MetaCache.MemoryCapacity), c.MetaCache.DisableS3Fifo) } } diff --git a/pkg/fileservice/cache.go b/pkg/fileservice/cache.go index f48e7cc13d1e3..1a43c5955e7c9 100644 --- a/pkg/fileservice/cache.go +++ b/pkg/fileservice/cache.go @@ -43,6 +43,7 @@ type CacheConfig struct { RemoteCacheEnabled bool `toml:"remote-cache-enabled"` RPC morpc.Config `toml:"rpc"` CheckOverlaps bool `toml:"check-overlaps"` + DisableS3Fifo bool `toml:"disable-s3fifo"` QueryClient client.QueryClient `json:"-"` KeyRouterFactory KeyRouterFactory[pb.CacheKey] `json:"-"` diff --git a/pkg/fileservice/cache_test.go b/pkg/fileservice/cache_test.go index 4ec2bb77cfca4..4b05d3fe17e43 100644 --- a/pkg/fileservice/cache_test.go +++ b/pkg/fileservice/cache_test.go @@ -32,7 +32,7 @@ func Test_readCache(t *testing.T) { slowCacheReadThreshold = time.Second size := int64(128) - m := NewMemCache(fscache.ConstCapacity(size), nil, nil, "") + m := NewMemCache(fscache.ConstCapacity(size), nil, nil, "", false) defer m.Close(ctx) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*3) diff --git a/pkg/fileservice/disk_cache.go b/pkg/fileservice/disk_cache.go index cff7b09b760b7..1eca92c60b16a 
100644 --- a/pkg/fileservice/disk_cache.go +++ b/pkg/fileservice/disk_cache.go @@ -60,6 +60,7 @@ func NewDiskCache( asyncLoad bool, cacheDataAllocator CacheDataAllocator, name string, + disable_s3fifo bool, ) (ret *DiskCache, err error) { err = os.MkdirAll(path, 0755) @@ -119,6 +120,7 @@ func NewDiskCache( ) } }, + disable_s3fifo, ), } ret.updatingPaths.Cond = sync.NewCond(new(sync.Mutex)) diff --git a/pkg/fileservice/disk_cache_test.go b/pkg/fileservice/disk_cache_test.go index 9f4af56b566e7..a023440d7e24a 100644 --- a/pkg/fileservice/disk_cache_test.go +++ b/pkg/fileservice/disk_cache_test.go @@ -45,7 +45,7 @@ func TestDiskCache(t *testing.T) { }) // new - cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "") + cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false) assert.Nil(t, err) defer cache.Close(ctx) @@ -130,7 +130,7 @@ func TestDiskCache(t *testing.T) { testRead(cache) // new cache instance and read - cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "") + cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false) assert.Nil(t, err) defer cache.Close(ctx) @@ -139,7 +139,7 @@ func TestDiskCache(t *testing.T) { assert.Equal(t, 1, numWritten) // new cache instance and update - cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "") + cache, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false) assert.Nil(t, err) defer cache.Close(ctx) @@ -159,7 +159,7 @@ func TestDiskCacheWriteAgain(t *testing.T) { var counterSet perfcounter.CounterSet ctx = perfcounter.WithCounterSet(ctx, &counterSet) - cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(4096), nil, false, nil, "") + cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(4096), nil, false, nil, "", false) assert.Nil(t, err) defer cache.Close(ctx) @@ -224,7 +224,7 @@ func TestDiskCacheWriteAgain(t *testing.T) { func TestDiskCacheFileCache(t *testing.T) { dir := t.TempDir() ctx := context.Background() - cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "") + cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false) assert.Nil(t, err) defer cache.Close(ctx) @@ -284,7 +284,7 @@ func TestDiskCacheDirSize(t *testing.T) { dir := t.TempDir() capacity := 1 << 20 - cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(int64(capacity)), nil, false, nil, "") + cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(int64(capacity)), nil, false, nil, "", false) assert.Nil(t, err) defer cache.Close(ctx) @@ -347,6 +347,7 @@ func benchmarkDiskCacheWriteThenRead( false, nil, "", + false, ) if err != nil { b.Fatal(err) @@ -442,6 +443,7 @@ func benchmarkDiskCacheReadRandomOffsetAtLargeFile( false, nil, "", + false, ) if err != nil { b.Fatal(err) @@ -513,6 +515,7 @@ func BenchmarkDiskCacheMultipleIOEntries(b *testing.B) { false, nil, "", + false, ) if err != nil { b.Fatal(err) @@ -584,7 +587,7 @@ func TestDiskCacheClearFiles(t *testing.T) { assert.Nil(t, err) numFiles := len(files) - _, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "") + _, err = NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false) assert.Nil(t, err) files, err = filepath.Glob(filepath.Join(dir, "*")) @@ -598,7 +601,7 @@ func TestDiskCacheClearFiles(t *testing.T) { func TestDiskCacheBadWrite(t *testing.T) { dir := t.TempDir() ctx := 
context.Background() - cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "") + cache, err := NewDiskCache(ctx, dir, fscache.ConstCapacity(1<<20), nil, false, nil, "", false) assert.Nil(t, err) written, err := cache.writeFile( @@ -633,6 +636,7 @@ func TestDiskCacheGlobalSizeHint(t *testing.T) { false, nil, "test", + false, ) assert.Nil(t, err) defer cache.Close(ctx) @@ -665,7 +669,7 @@ func TestDiskCacheGlobalSizeHint(t *testing.T) { func TestDiskCacheSetFromFile(t *testing.T) { ctx := context.Background() - cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(1<<30), nil, false, nil, "") + cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(1<<30), nil, false, nil, "", false) require.Nil(t, err) defer cache.Close(ctx) @@ -690,7 +694,7 @@ func TestDiskCacheSetFromFile(t *testing.T) { func TestDiskCacheQuotaExceeded(t *testing.T) { ctx := context.Background() - cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(3), nil, false, nil, "") + cache, err := NewDiskCache(ctx, t.TempDir(), fscache.ConstCapacity(3), nil, false, nil, "", false) require.Nil(t, err) defer cache.Close(ctx) diff --git a/pkg/fileservice/fifocache/bench_test.go b/pkg/fileservice/fifocache/bench_test.go index d4bce36f709cd..5168f2ce54735 100644 --- a/pkg/fileservice/fifocache/bench_test.go +++ b/pkg/fileservice/fifocache/bench_test.go @@ -26,7 +26,7 @@ import ( func BenchmarkSequentialSet(b *testing.B) { ctx := context.Background() size := 65536 - cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false) nElements := size * 16 b.ResetTimer() for i := 0; i < b.N; i++ { @@ -37,7 +37,7 @@ func BenchmarkSequentialSet(b *testing.B) { func BenchmarkParallelSet(b *testing.B) { ctx := context.Background() size := 65536 - cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false) nElements := size * 16 b.ResetTimer() b.RunParallel(func(pb *testing.PB) { @@ -50,7 +50,7 @@ func BenchmarkParallelSet(b *testing.B) { func BenchmarkGet(b *testing.B) { ctx := context.Background() size := 65536 - cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false) nElements := size * 16 for i := 0; i < nElements; i++ { cache.Set(ctx, i, i, int64(1+i%3)) @@ -64,7 +64,7 @@ func BenchmarkGet(b *testing.B) { func BenchmarkParallelGet(b *testing.B) { ctx := context.Background() size := 65536 - cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false) nElements := size * 16 for i := 0; i < nElements; i++ { cache.Set(ctx, i, i, int64(1+i%3)) @@ -80,7 +80,7 @@ func BenchmarkParallelGet(b *testing.B) { func BenchmarkParallelGetOrSet(b *testing.B) { ctx := context.Background() size := 65536 - cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false) nElements := size * 16 b.ResetTimer() b.RunParallel(func(pb *testing.PB) { @@ -97,7 +97,7 @@ func BenchmarkParallelGetOrSet(b *testing.B) { func BenchmarkParallelEvict(b *testing.B) { ctx := 
context.Background() size := 65536 - cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, false) nElements := size * 16 b.ResetTimer() b.RunParallel(func(pb *testing.PB) { diff --git a/pkg/fileservice/fifocache/data_cache_test.go b/pkg/fileservice/fifocache/data_cache_test.go index ac1692b729c68..4ab400f184ba6 100644 --- a/pkg/fileservice/fifocache/data_cache_test.go +++ b/pkg/fileservice/fifocache/data_cache_test.go @@ -29,6 +29,7 @@ func BenchmarkEnsureNBytesAndSet(b *testing.B) { cache := NewDataCache( fscache.ConstCapacity(1024), nil, nil, nil, + false, ) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { @@ -63,7 +64,7 @@ func TestShardCacheKeyAllocs(t *testing.T) { Offset: 3, Path: strings.Repeat("abc", 42), } - if n := testing.AllocsPerRun(64, func() { + if n := testing.AllocsPerRun(64000, func() { shardCacheKey(key) }); n != 0 { t.Fatalf("should not allocate") @@ -74,6 +75,7 @@ func BenchmarkDataCacheSet(b *testing.B) { cache := NewDataCache( fscache.ConstCapacity(1024), nil, nil, nil, + false, ) b.ResetTimer() for i := range b.N { @@ -93,6 +95,7 @@ func BenchmarkDataCacheGet(b *testing.B) { cache := NewDataCache( fscache.ConstCapacity(1024), nil, nil, nil, + false, ) key := fscache.CacheKey{ Path: "foo", diff --git a/pkg/fileservice/fifocache/fifo_test.go b/pkg/fileservice/fifocache/fifo_test.go index 9e5ca3fcde918..1b38022edcf49 100644 --- a/pkg/fileservice/fifocache/fifo_test.go +++ b/pkg/fileservice/fifocache/fifo_test.go @@ -24,7 +24,7 @@ import ( func TestCacheSetGet(t *testing.T) { ctx := context.Background() - cache := New[int, int](fscache.ConstCapacity(8), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(8), ShardInt[int], nil, nil, nil, false) cache.Set(ctx, 1, 1, 1) n, ok := cache.Get(ctx, 1) @@ -42,18 +42,18 @@ func TestCacheSetGet(t *testing.T) { func TestCacheEvict(t *testing.T) { ctx := context.Background() - cache := New[int, int](fscache.ConstCapacity(8), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(8), ShardInt[int], nil, nil, nil, false) for i := 0; i < 64; i++ { cache.Set(ctx, i, i, 1) - if cache.used1+cache.used2 > cache.capacity() { - t.Fatalf("capacity %v, used1 %v used2 %v", cache.capacity(), cache.used1, cache.used2) + if cache.Used() > cache.capacity() { + t.Fatalf("capacity %v, usedSmall %v usedMain %v", cache.capacity(), cache.usedSmall.Load(), cache.usedMain.Load()) } } } func TestCacheEvict2(t *testing.T) { ctx := context.Background() - cache := New[int, int](fscache.ConstCapacity(2), ShardInt[int], nil, nil, nil) + cache := New[int, int](fscache.ConstCapacity(20), ShardInt[int], nil, nil, nil, false) cache.Set(ctx, 1, 1, 1) cache.Set(ctx, 2, 2, 1) @@ -76,8 +76,8 @@ func TestCacheEvict2(t *testing.T) { v, ok = cache.Get(ctx, 4) assert.True(t, ok) assert.Equal(t, 4, v) - assert.Equal(t, int64(1), cache.used1) - assert.Equal(t, int64(1), cache.used2) + assert.Equal(t, int64(4), cache.usedSmall.Load()) + assert.Equal(t, int64(0), cache.usedMain.Load()) } func TestCacheEvict3(t *testing.T) { @@ -94,12 +94,13 @@ func TestCacheEvict3(t *testing.T) { func(_ context.Context, _ int, _ bool, _ int64) { nEvict++ }, + false, ) for i := 0; i < 1024; i++ { cache.Set(ctx, i, true, 1) cache.Get(ctx, i) cache.Get(ctx, i) - assert.True(t, cache.used1+cache.used2 <= 1024) + assert.True(t, cache.Used() <= 1024) } assert.Equal(t, 0, nEvict) assert.Equal(t, 1024, nSet) @@ -107,11 +108,123 
@@ func TestCacheEvict3(t *testing.T) { for i := 0; i < 1024; i++ { cache.Set(ctx, 10000+i, true, 1) - assert.True(t, cache.used1+cache.used2 <= 1024) + assert.True(t, cache.Used() <= 1024) } - assert.Equal(t, int64(102), cache.used1) - assert.Equal(t, int64(922), cache.used2) + assert.Equal(t, int64(102), cache.usedSmall.Load()) + assert.Equal(t, int64(922), cache.usedMain.Load()) assert.Equal(t, 1024, nEvict) assert.Equal(t, 2048, nSet) assert.Equal(t, 2048, nGet) } + +func TestCacheOneHitWonder(t *testing.T) { + ctx := context.Background() + cache := New[int64, int64](fscache.ConstCapacity(1000), ShardInt[int64], nil, nil, nil, false) + + capsmall := int64(1000) + for i := range capsmall { + cache.Set(ctx, i, i, 1) + } + assert.Equal(t, int64(0), cache.usedMain.Load()) + //fmt.Printf("cache main %d, small %d\n", cache.usedMain.Load(), cache.usedSmall.Load()) +} + +func TestCacheMoveMain(t *testing.T) { + ctx := context.Background() + cache := New[int64, int64](fscache.ConstCapacity(100), ShardInt[int64], nil, nil, nil, false) + + // fill small fifo to 90 + for i := range int64(90) { + cache.Set(ctx, 10000+i, 10000+i, 1) + } + + results := [][]int64{{0, 100}, {0, 100}, {0, 100}, {0, 100}, {0, 100}, + {20, 80}, {40, 60}, {60, 40}, {80, 20}, + {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}, {90, 10}} + + for k := range int64(20) { + //fmt.Printf("cache set 10 items\n") + capsmall := int64(10) + for i := range capsmall { + cache.Set(ctx, k*10+i, k*10+i, 1) + } + + //fmt.Printf("increment freq 2\n") + // increment the freq + for j := range 2 { + for i := range capsmall { + _, ok := cache.Get(ctx, k*10+i) + assert.Equal(t, ok, true) + } + _ = j + } + //fmt.Printf("Add %d to %d and Try move to main\n", (k+1)*200, (k+1)*200+capsmall) + // move to main + for i := range capsmall { + cache.Set(ctx, (k+1)*200+i, (k+1)*200+i, 1) + } + //fmt.Printf("cache main %d, small %d\n", cache.usedMain.Load(), cache.usedSmall.Load()) + + assert.Equal(t, results[k][0], cache.usedMain.Load()) + assert.Equal(t, results[k][1], cache.usedSmall.Load()) + } + + assert.Equal(t, int64(90), cache.usedMain.Load()) + assert.Equal(t, int64(10), cache.usedSmall.Load()) + // remove all main 0 - 99 + //fmt.Printf("remove all main\n") +} + +func TestCacheMoveGhost(t *testing.T) { + ctx := context.Background() + cache := New[int64, int64](fscache.ConstCapacity(100), ShardInt[int64], nil, nil, nil, false) + + // fill small fifo to 90 + for i := range int64(90) { + cache.Set(ctx, 10000+i, 10000+i, 1) + } + + for k := range int64(2) { + //fmt.Printf("cache set 10 items\n") + capsmall := int64(10) + for i := range capsmall { + cache.Set(ctx, k*10+i, k*10+i, 1) + } + + //fmt.Printf("increment freq 2\n") + // increment the freq + for j := range 2 { + for i := range capsmall { + _, ok := cache.Get(ctx, k*10+i) + assert.Equal(t, ok, true) + } + _ = j + } + //fmt.Printf("Add %d to %d and Try move to main\n", (k+1)*200, (k+1)*200+capsmall) + // move to main + for i := range capsmall { + cache.Set(ctx, (k+1)*200+i, (k+1)*200+i, 1) + } + //fmt.Printf("cache main %d, small %d\n", cache.usedMain.Load(), cache.usedSmall.Load()) + } + + for i := 10000; i < 10020; i++ { + assert.Equal(t, cache.ghost.contains(int64(i)), true) + + } + + //fmt.Printf("cache main %d, small %d\n", cache.usedMain.Load(), cache.usedSmall.Load()) + for i := 10000; i < 10020; i++ { + cache.Set(ctx, int64(i), int64(i), 1) + assert.Equal(t, cache.ghost.contains(int64(i)), false) + } + + assert.Equal(t, 
cache.usedMain.Load(), int64(20)) + assert.Equal(t, cache.usedSmall.Load(), int64(80)) + //fmt.Printf("cache main %d, small %d\n", cache.usedMain.Load(), cache.usedSmall.Load()) + //assert.Equal(t, int64(10), cache.usedMain.Load()) + //assert.Equal(t, int64(10), cache.usedSmall.Load()) + + // remove all main 0 - 99 + //fmt.Printf("remove all main\n") +} diff --git a/pkg/fileservice/fifocache/ghost.go b/pkg/fileservice/fifocache/ghost.go new file mode 100644 index 0000000000000..2eac3fd699322 --- /dev/null +++ b/pkg/fileservice/fifocache/ghost.go @@ -0,0 +1,90 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fifocache + +import ( + "container/list" + "sync" +) + +// ghost implements a thread-safe ghost queue for S3-FIFO +type ghost[K comparable] struct { + mu sync.RWMutex // Use RWMutex for better read performance + capacity int + keys map[K]*list.Element + queue *list.List +} + +func newGhost[K comparable](capacity int) *ghost[K] { + return &ghost[K]{ + capacity: capacity, + keys: make(map[K]*list.Element), + queue: list.New(), + } +} + +func (g *ghost[K]) add(key K) { + g.mu.Lock() + defer g.mu.Unlock() + + if _, ok := g.keys[key]; ok { + // Key already exists, maybe move it to back if needed by specific ghost logic + // For simple ghost queue, we might just ignore or update frequency if tracked + return + } + + // Evict if capacity is reached + if g.queue.Len() >= g.capacity { + elem := g.queue.Front() + if elem != nil { + evictedKey := g.queue.Remove(elem).(K) + delete(g.keys, evictedKey) + } + } + + // Add new key + elem := g.queue.PushBack(key) + g.keys[key] = elem +} + +func (g *ghost[K]) remove(key K) { + g.mu.Lock() + defer g.mu.Unlock() + + if elem, ok := g.keys[key]; ok { + g.queue.Remove(elem) + delete(g.keys, key) + } +} + +func (g *ghost[K]) contains(key K) bool { + g.mu.RLock() + defer g.mu.RUnlock() + + _, ok := g.keys[key] + return ok +} + +/* +func (g *ghost[K]) clear() { + g.mu.Lock() + defer g.mu.Unlock() + + g.queue.Init() + for k := range g.keys { + delete(g.keys, k) + } +} +*/ diff --git a/pkg/fileservice/fifocache/queue.go b/pkg/fileservice/fifocache/queue.go index fd9431d63e977..38fe4b5ce59be 100644 --- a/pkg/fileservice/fifocache/queue.go +++ b/pkg/fileservice/fifocache/queue.go @@ -17,9 +17,11 @@ package fifocache import "sync" type Queue[T any] struct { + mu sync.Mutex // Mutex to protect queue operations head *queuePart[T] tail *queuePart[T] partPool sync.Pool + size int } type queuePart[T any] struct { @@ -46,9 +48,9 @@ func NewQueue[T any]() *Queue[T] { return queue } +// empty is an internal helper, assumes lock is held func (p *Queue[T]) empty() bool { - return p.head == p.tail && - p.head.begin == len(p.head.values) + return p.head == p.tail && len(p.head.values) == p.head.begin } func (p *queuePart[T]) reset() { @@ -58,6 +60,9 @@ func (p *queuePart[T]) reset() { } func (p *Queue[T]) enqueue(v T) { + p.mu.Lock() // Acquire lock + defer p.mu.Unlock() // Ensure lock is released + if len(p.head.values) >= 
maxQueuePartCapacity { // extend newPart := p.partPool.Get().(*queuePart[T]) @@ -66,9 +71,13 @@ func (p *Queue[T]) enqueue(v T) { p.head = newPart } p.head.values = append(p.head.values, v) + p.size++ } func (p *Queue[T]) dequeue() (ret T, ok bool) { + p.mu.Lock() // Acquire lock + defer p.mu.Unlock() // Ensure lock is released + if p.empty() { return } @@ -76,17 +85,27 @@ func (p *Queue[T]) dequeue() (ret T, ok bool) { if p.tail.begin >= len(p.tail.values) { // shrink if p.tail.next == nil { - panic("impossible") + // This should ideally not happen if empty() check passes, + // but adding a safeguard. + // Consider logging an error here if it does. + return } part := p.tail p.tail = p.tail.next - p.partPool.Put(part) + p.partPool.Put(part) // Return the old part to the pool } ret = p.tail.values[p.tail.begin] var zero T p.tail.values[p.tail.begin] = zero p.tail.begin++ + p.size-- ok = true return } + +func (p *Queue[T]) Len() int { + p.mu.Lock() // Acquire lock + defer p.mu.Unlock() // Ensure lock is released + return p.size +} diff --git a/pkg/fileservice/local_fs.go b/pkg/fileservice/local_fs.go index 9a146f963b08b..65713215a71f1 100644 --- a/pkg/fileservice/local_fs.go +++ b/pkg/fileservice/local_fs.go @@ -18,7 +18,6 @@ import ( "bytes" "context" "errors" - "github.com/matrixorigin/matrixone/pkg/common/malloc" "io" "io/fs" "iter" @@ -31,6 +30,8 @@ import ( "sync/atomic" "time" + "github.com/matrixorigin/matrixone/pkg/common/malloc" + "go.uber.org/zap" "github.com/matrixorigin/matrixone/pkg/common/moerr" @@ -156,6 +157,7 @@ func (l *LocalFS) initCaches(ctx context.Context, config CacheConfig) error { &config.CacheCallbacks, l.perfCounterSets, l.name, + config.DisableS3Fifo, ) logutil.Info("fileservice: memory cache initialized", zap.Any("fs-name", l.name), @@ -177,6 +179,7 @@ func (l *LocalFS) initCaches(ctx context.Context, config CacheConfig) error { true, l, l.name, + config.DisableS3Fifo, ) if err != nil { return err diff --git a/pkg/fileservice/local_fs_test.go b/pkg/fileservice/local_fs_test.go index 9381c681d9169..8fad00036d3d2 100644 --- a/pkg/fileservice/local_fs_test.go +++ b/pkg/fileservice/local_fs_test.go @@ -211,12 +211,14 @@ func TestLocalFSWithIOVectorCache(t *testing.T) { fscache.ConstCapacity(1<<20), nil, nil, "", + false, ) defer memCache1.Close(ctx) memCache2 := NewMemCache( fscache.ConstCapacity(1<<20), nil, nil, "", + false, ) defer memCache2.Close(ctx) caches := []IOVectorCache{memCache1, memCache2} diff --git a/pkg/fileservice/mem_cache.go b/pkg/fileservice/mem_cache.go index 424137251d319..52e99b0987bf4 100644 --- a/pkg/fileservice/mem_cache.go +++ b/pkg/fileservice/mem_cache.go @@ -33,6 +33,7 @@ func NewMemCache( callbacks *CacheCallbacks, counterSets []*perfcounter.CounterSet, name string, + disable_s3fifo bool, ) *MemCache { inuseBytes, capacityBytes := metric.GetFsCacheBytesGauge(name, "mem") @@ -111,10 +112,9 @@ func NewMemCache( } LogEvent(ctx, str_memory_cache_callbacks_end) } - } - dataCache := fifocache.NewDataCache(capacityFunc, postSetFn, postGetFn, postEvictFn) + dataCache := fifocache.NewDataCache(capacityFunc, postSetFn, postGetFn, postEvictFn, disable_s3fifo) ret := &MemCache{ cache: dataCache, @@ -210,7 +210,7 @@ func (m *MemCache) Update( } LogEvent(ctx, str_set_memory_cache_entry_begin) - // NOTE: cache data may not be put to the cache if data already exists + // NOTE: data existed in hashtable will skip setting this cache data. 
As a result, the reference counter is not incremented m.cache.Set(ctx, key, entry.CachedData) LogEvent(ctx, str_set_memory_cache_entry_end) } diff --git a/pkg/fileservice/mem_cache_test.go b/pkg/fileservice/mem_cache_test.go index e1dfb9b2a79b9..6591b6cf655be 100644 --- a/pkg/fileservice/mem_cache_test.go +++ b/pkg/fileservice/mem_cache_test.go @@ -45,7 +45,7 @@ func TestMemCacheLeak(t *testing.T) { assert.Nil(t, err) size := int64(128) - m := NewMemCache(fscache.ConstCapacity(size), nil, nil, "") + m := NewMemCache(fscache.ConstCapacity(size), nil, nil, "", false) defer m.Close(ctx) newReadVec := func() *IOVector { @@ -118,7 +118,7 @@ func TestMemCacheLeak(t *testing.T) { // and dataOverlap-checker. func TestHighConcurrency(t *testing.T) { ctx := context.Background() - m := NewMemCache(fscache.ConstCapacity(2), nil, nil, "") + m := NewMemCache(fscache.ConstCapacity(2), nil, nil, "", false) defer m.Close(ctx) n := 10 @@ -165,6 +165,7 @@ func BenchmarkMemoryCacheUpdate(b *testing.B) { nil, nil, "", + false, ) defer cache.Flush(ctx) @@ -199,6 +200,7 @@ func BenchmarkMemoryCacheRead(b *testing.B) { nil, nil, "", + false, ) defer cache.Flush(ctx) @@ -247,6 +249,7 @@ func TestMemoryCacheGlobalSizeHint(t *testing.T) { nil, nil, "test", + false, ) defer cache.Close(ctx) diff --git a/pkg/fileservice/memory_fs_test.go b/pkg/fileservice/memory_fs_test.go index ca1d89a57ff53..db2a338e860a3 100644 --- a/pkg/fileservice/memory_fs_test.go +++ b/pkg/fileservice/memory_fs_test.go @@ -62,6 +62,7 @@ func BenchmarkMemoryFSWithMemoryCache(b *testing.B) { nil, nil, "", + false, ) defer cache.Close(ctx) @@ -85,6 +86,7 @@ func BenchmarkMemoryFSWithMemoryCacheLowCapacity(b *testing.B) { cache := NewMemCache( fscache.ConstCapacity(2*1024*1024), nil, nil, "", + false, ) defer cache.Close(ctx) diff --git a/pkg/fileservice/s3_fs.go b/pkg/fileservice/s3_fs.go index 87c7ad8745768..5a08503959428 100644 --- a/pkg/fileservice/s3_fs.go +++ b/pkg/fileservice/s3_fs.go @@ -199,6 +199,7 @@ func (s *S3FS) initCaches(ctx context.Context, config CacheConfig) error { &config.CacheCallbacks, s.perfCounterSets, s.name, + config.DisableS3Fifo, ) logutil.Info("fileservice: memory cache initialized", zap.Any("fs-name", s.name), @@ -219,6 +220,7 @@ func (s *S3FS) initCaches(ctx context.Context, config CacheConfig) error { true, s, s.name, + config.DisableS3Fifo, ) if err != nil { return err diff --git a/pkg/objectio/cache.go b/pkg/objectio/cache.go index b92d1641785c6..9f482e3058d0a 100644 --- a/pkg/objectio/cache.go +++ b/pkg/objectio/cache.go @@ -48,6 +48,7 @@ const ( type CacheConfig struct { MemoryCapacity toml.ByteSize `toml:"memory-capacity"` + DisableS3Fifo bool `toml:"disable-s3fifo"` } // BlockReadStats collect blk read related cache statistics, @@ -111,16 +112,16 @@ func cacheCapacityFunc(size int64) fscache.CapacityFunc { } func init() { - metaCache = newMetaCache(cacheCapacityFunc(metaCacheSize())) + metaCache = newMetaCache(cacheCapacityFunc(metaCacheSize()), false) } -func InitMetaCache(size int64) { +func InitMetaCache(size int64, disable_s3fifo bool) { onceInit.Do(func() { - metaCache = newMetaCache(cacheCapacityFunc(size)) + metaCache = newMetaCache(cacheCapacityFunc(size), disable_s3fifo) }) } -func newMetaCache(capacity fscache.CapacityFunc) *fifocache.Cache[mataCacheKey, []byte] { +func newMetaCache(capacity fscache.CapacityFunc, disable_s3fifo bool) *fifocache.Cache[mataCacheKey, []byte] { inuseBytes, capacityBytes := metric.GetFsCacheBytesGauge("", "meta") capacityBytes.Set(float64(capacity())) return
fifocache.New[mataCacheKey, []byte]( @@ -134,7 +135,8 @@ func newMetaCache(capacity fscache.CapacityFunc) *fifocache.Cache[mataCacheKey, func(_ context.Context, _ mataCacheKey, _ []byte, size int64) { // postEvict inuseBytes.Add(float64(-size)) capacityBytes.Set(float64(capacity())) - }) + }, + disable_s3fifo) } func EvictCache(ctx context.Context) (target int64) { From 72f6421bf69396996849401917c7144d062a0323 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Wed, 21 May 2025 12:02:15 +0100 Subject: [PATCH 23/33] add test for double free and bug fix looping --- pkg/fileservice/fifocache/fifo.go | 15 +++++++-------- pkg/fileservice/mem_cache_test.go | 4 ++++ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index 5c0c849491511..431a517c77421 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -231,21 +231,20 @@ func (c *Cache[K, V]) Used() int64 { func (c *Cache[K, V]) evictAll(ctx context.Context, done chan int64, capacityCut int64) int64 { var target int64 - target = c.capacity() - capacityCut + target = c.capacity() - capacityCut - 1 if target <= 0 { - target = 1 + target = 0 } - targetSmall := c.capSmall() - capacityCut + targetSmall := c.capSmall() - capacityCut - 1 if targetSmall <= 0 { - targetSmall = 1 + targetSmall = 0 } - //targetMain := target - targetSmall usedsmall := c.usedSmall.Load() usedmain := c.usedMain.Load() - for usedmain+usedsmall >= target { - if usedsmall >= targetSmall { + for usedmain+usedsmall > target { + if usedsmall > targetSmall { c.evictSmall(ctx) } else { c.evictMain(ctx) @@ -254,7 +253,7 @@ func (c *Cache[K, V]) evictAll(ctx context.Context, done chan int64, capacityCut usedmain = c.usedMain.Load() } - return target + return target + 1 } func (c *Cache[K, V]) evictSmall(ctx context.Context) { diff --git a/pkg/fileservice/mem_cache_test.go b/pkg/fileservice/mem_cache_test.go index 6591b6cf655be..77df8b16dec76 100644 --- a/pkg/fileservice/mem_cache_test.go +++ b/pkg/fileservice/mem_cache_test.go @@ -112,6 +112,10 @@ func TestMemCacheLeak(t *testing.T) { assert.Equal(t, int64(size)-1, m.cache.Available()) assert.Equal(t, int64(1), m.cache.Used()) + // check double free + // deleting a path will remove items from the hashtable, but the items are still in the queues and have reference counter 0.
+ m.DeletePaths(ctx, []string{"foo"}) + } // TestHighConcurrency this test is to mainly test concurrency issue in objectCache From 663cf965b6c84e6027be8201e00b70ec1b8db52c Mon Sep 17 00:00:00 2001 From: cpegeric Date: Thu, 10 Jul 2025 12:06:46 +0100 Subject: [PATCH 24/33] retain/release inside cache --- pkg/fileservice/fifocache/fifo.go | 143 ++++++++++++++------- pkg/fileservice/fifocache/shardmap.go | 6 +- pkg/fileservice/fifocache/shardmap_test.go | 4 +- pkg/fileservice/mem_cache.go | 9 -- 4 files changed, 99 insertions(+), 63 deletions(-) diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index 431a517c77421..8d8ee72a6cfdf 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -48,12 +48,12 @@ type _CacheItem[K comparable, V any] struct { value V size int64 - // mutex only protect the freq + // mutex protect the freq and postFn mu sync.Mutex freq int8 // deleted is protected by shardmap - deleted bool + deleted atomic.Bool } func (c *_CacheItem[K, V]) inc() { @@ -82,16 +82,46 @@ func (c *_CacheItem[K, V]) getFreq() int8 { // protected by shardmap func (c *_CacheItem[K, V]) isDeleted() bool { - return c.deleted + return c.deleted.Load() } // protected by shardmap func (c *_CacheItem[K, V]) setDeleted() bool { - if !c.deleted { - c.deleted = true + return c.deleted.CompareAndSwap(false, true) +} + +func (c *_CacheItem[K, V]) postFn(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) { + if fn != nil { + c.mu.Lock() + defer c.mu.Unlock() + fn(ctx, c.key, c.value, c.size) + } +} + +// increment the reference counter +func (c *_CacheItem[K, V]) retainValue() bool { + cdata, ok := any(c.value).(fscache.Data) + if !ok { return true } - return false + c.mu.Lock() + defer c.mu.Unlock() + if c.isDeleted() { + return false + } + cdata.Retain() + return true +} + +// decrement the reference counter +func (c *_CacheItem[K, V]) releaseValue() { + cdata, ok := any(c.value).(fscache.Data) + if !ok { + return + } + c.mu.Lock() + defer c.mu.Unlock() + cdata.Release() } // assume cache size is 128K @@ -140,24 +170,29 @@ func New[K comparable, V any]( } func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) { + item := &_CacheItem[K, V]{ key: key, value: value, size: size, } - ok := c.htab.Set(key, item, func(v *_CacheItem[K, V]) { - // call Bytes.Retain() to increment the ref counter and protected by shardmap mutex - if c.postSet != nil { - c.postSet(ctx, v.key, v.value, v.size) - } + // TODO: FSCACHEDATA RETAIN + // increment the ref counter first no matter what otherwise there is a risk that value is deleted + item.retainValue() - }) + ok := c.htab.Set(key, item, nil) if !ok { // existed + // TODO: FSCACHEDATA RELEASE + // decrement the ref counter if not set to release the resource + item.releaseValue() return } + // postSet + item.postFn(ctx, c.postSet) + // evict c.evictAll(ctx, nil, 0) @@ -181,35 +216,45 @@ func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) { func (c *Cache[K, V]) Get(ctx context.Context, key K) (value V, ok bool) { var item *_CacheItem[K, V] - item, ok = c.htab.Get(key, func(v *_CacheItem[K, V]) { - // call Bytes.Retain() to increment the ref counter and protected by shardmap mutex - if c.postGet != nil { - c.postGet(ctx, v.key, v.value, v.size) - } - }) + item, ok = c.htab.Get(key, func(v *_CacheItem[K, V]) {}) if !ok { return } + // TODO: FSCACHEDATA RETAIN + ok = item.retainValue() + if !ok { + return item.value, false + } + // increment 
item.inc() + // postGet + item.postFn(ctx, c.postGet) + return item.value, true } func (c *Cache[K, V]) Delete(ctx context.Context, key K) { - c.htab.GetAndDelete(key, func(v *_CacheItem[K, V]) { + needsDelete := false + + item, ok := c.htab.GetAndDelete(key, func(v *_CacheItem[K, V]) { + needsDelete = v.setDeleted() + + }) + + if ok && needsDelete { // call Bytes.Release() to decrement the ref counter and protected by shardmap mutex. // item.deleted makes sure postEvict only call once. - needsPostEvict := v.setDeleted() - // post evict - if needsPostEvict { - if c.postEvict != nil { - c.postEvict(ctx, v.key, v.value, v.size) - } - } - }) + // deleted from hashtable + item.postFn(ctx, c.postEvict) + + // TODO: FSCACHEDATA RELEASE + item.releaseValue() + } + } func (c *Cache[K, V]) Evict(ctx context.Context, done chan int64, capacityCut int64) { @@ -265,11 +310,7 @@ func (c *Cache[K, V]) evictSmall(ctx context.Context) { return } - deleted := c.htab.ValueIsDeleted(item.key, item, func(v *_CacheItem[K, V]) bool { - // check item is deleted and protected by shardmap mutex. - return v.isDeleted() - }) - + deleted := item.isDeleted() if deleted { c.usedSmall.Add(-item.size) return @@ -282,17 +323,21 @@ func (c *Cache[K, V]) evictSmall(ctx context.Context) { c.usedMain.Add(item.size) } else { // evict + needsDelete := false c.htab.GetAndDelete(item.key, func(v *_CacheItem[K, V]) { - // call Bytes.Release() to decrement the ref counter and protected by shardmap mutex. // item.deleted makes sure postEvict only call once. // post evict - if v.setDeleted() { - if c.postEvict != nil { - c.postEvict(ctx, v.key, v.value, v.size) - } - } + needsDelete = v.setDeleted() }) + if needsDelete { + // call Bytes.Release() to decrement the ref counter + // postEvict + item.postFn(ctx, c.postEvict) + // TODO: FSCACHEDATA RELEASE + item.releaseValue() + } + c.usedSmall.Add(-item.size) if !c.disable_s3fifo { c.ghost.add(item.key) @@ -311,10 +356,7 @@ func (c *Cache[K, V]) evictMain(ctx context.Context) { return } - deleted := c.htab.ValueIsDeleted(item.key, item, func(v *_CacheItem[K, V]) bool { - // check item is deleted and protected by shardmap mutex. - return v.isDeleted() - }) + deleted := item.isDeleted() if deleted { c.usedMain.Add(-item.size) return @@ -326,17 +368,20 @@ func (c *Cache[K, V]) evictMain(ctx context.Context) { c.main.enqueue(item) } else { // evict + needsDelete := false c.htab.GetAndDelete(item.key, func(v *_CacheItem[K, V]) { - // call Bytes.Release() to decrement the ref counter and protected by shardmap mutex. // item.deleted makes sure postEvict only call once. 
// post evict - if v.setDeleted() { - if c.postEvict != nil { - c.postEvict(ctx, v.key, v.value, v.size) - } - } - + needsDelete = v.setDeleted() }) + if needsDelete { + // call Bytes.Release() to decrement the ref counter + // postEvict + item.postFn(ctx, c.postEvict) + // TODO: FSCACHEDATA RELEASE + item.releaseValue() + } + c.usedMain.Add(-item.size) return } diff --git a/pkg/fileservice/fifocache/shardmap.go b/pkg/fileservice/fifocache/shardmap.go index a076f19511adb..8cf3e74002085 100644 --- a/pkg/fileservice/fifocache/shardmap.go +++ b/pkg/fileservice/fifocache/shardmap.go @@ -111,7 +111,7 @@ func (m *ShardMap[K, V]) CompareAndDelete(key K, fn func(k1, k2 K) bool, postfn } } -func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) bool { +func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) (V, bool) { s := &m.shards[m.hashfn(key)%numShards] s.Lock() @@ -119,7 +119,7 @@ func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) bool { v, ok := s.values[key] if !ok { - return ok + return v, ok } delete(s.values, key) @@ -129,7 +129,7 @@ func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) bool { postfn(v) } - return ok + return v, ok } func (m *ShardMap[K, V]) ValueIsDeleted(key K, value V, isDeleted func(V) bool) bool { diff --git a/pkg/fileservice/fifocache/shardmap_test.go b/pkg/fileservice/fifocache/shardmap_test.go index 2c1d43268fa70..ee7689293380d 100644 --- a/pkg/fileservice/fifocache/shardmap_test.go +++ b/pkg/fileservice/fifocache/shardmap_test.go @@ -35,11 +35,11 @@ func TestShardMap(t *testing.T) { assert.Equal(t, ok, true) assert.Equal(t, v, "1") - ok = m.GetAndDelete(0, func(v string) { + _, ok = m.GetAndDelete(0, func(v string) { }) assert.Equal(t, ok, false) - ok = m.GetAndDelete(1, func(v string) { + _, ok = m.GetAndDelete(1, func(v string) { }) assert.Equal(t, ok, true) diff --git a/pkg/fileservice/mem_cache.go b/pkg/fileservice/mem_cache.go index 52e99b0987bf4..d3c1b10119ac9 100644 --- a/pkg/fileservice/mem_cache.go +++ b/pkg/fileservice/mem_cache.go @@ -53,9 +53,6 @@ func NewMemCache( LogEvent(ctx, str_memory_cache_post_set_begin) defer LogEvent(ctx, str_memory_cache_post_set_end) - // retain - value.Retain() - // metrics LogEvent(ctx, str_update_metrics_begin) inuseBytes.Add(float64(size)) @@ -77,9 +74,6 @@ func NewMemCache( LogEvent(ctx, str_memory_cache_post_get_begin) defer LogEvent(ctx, str_memory_cache_post_get_end) - // retain - value.Retain() - // callbacks if callbacks != nil { LogEvent(ctx, str_memory_cache_callbacks_begin) @@ -95,9 +89,6 @@ func NewMemCache( LogEvent(ctx, str_memory_cache_post_evict_begin) defer LogEvent(ctx, str_memory_cache_post_evict_end) - // release after all callbacks executed - defer value.Release() - // metrics LogEvent(ctx, str_update_metrics_begin) inuseBytes.Add(float64(-size)) From eda779eb3bbc3c4dbb7e5fe65bf9fa55918b5c35 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Thu, 10 Jul 2025 12:19:33 +0100 Subject: [PATCH 25/33] update --- pkg/fileservice/fifocache/fifo.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index 8d8ee72a6cfdf..6ff93a977667a 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -48,12 +48,12 @@ type _CacheItem[K comparable, V any] struct { value V size int64 + // check item is already deleted by either hashtable or evict + deleted atomic.Bool + // mutex protect the freq and postFn mu sync.Mutex freq int8 - - // deleted is protected by shardmap - 
deleted atomic.Bool } func (c *_CacheItem[K, V]) inc() { @@ -178,7 +178,7 @@ func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) { } // TODO: FSCACHEDATA RETAIN - // increment the ref counter first no matter what otherwise there is a risk that value is deleted + // increment the ref counter first no matter what to make sure the memory is occupied before hashtable.Set item.retainValue() ok := c.htab.Set(key, item, nil) From f9217dd55c465f453165405bdecebfdf4c08d3e3 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Thu, 10 Jul 2025 12:35:04 +0100 Subject: [PATCH 26/33] delete paths --- pkg/fileservice/fifocache/data_cache.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/fileservice/fifocache/data_cache.go b/pkg/fileservice/fifocache/data_cache.go index 9e39f03ba9cdd..96554e6c5c706 100644 --- a/pkg/fileservice/fifocache/data_cache.go +++ b/pkg/fileservice/fifocache/data_cache.go @@ -66,6 +66,7 @@ func (d *DataCache) Capacity() int64 { } func (d *DataCache) DeletePaths(ctx context.Context, paths []string) { + deletes := make([]*_CacheItem[fscache.CacheKey, fscache.Data], 0, 10) for _, path := range paths { key := fscache.CacheKey{Path: path} @@ -73,12 +74,16 @@ func (d *DataCache) DeletePaths(ctx context.Context, paths []string) { return key1.Path == key2.Path }, func(value *_CacheItem[fscache.CacheKey, fscache.Data]) { if value.setDeleted() { - if d.fifo.postEvict != nil { - d.fifo.postEvict(ctx, value.key, value.value, value.size) - } + deletes = append(deletes, value) } }) } + + // FSCACHEDATA RELEASE + for _, item := range deletes { + item.postFn(ctx, d.fifo.postEvict) + item.releaseValue() + } } func (d *DataCache) EnsureNBytes(ctx context.Context, want int) { From 66eb8098115719198567fcfde29fe998a1e33cdf Mon Sep 17 00:00:00 2001 From: cpegeric Date: Fri, 11 Jul 2025 10:24:33 +0100 Subject: [PATCH 27/33] item mutex protect everything --- pkg/fileservice/fifocache/data_cache.go | 7 +- pkg/fileservice/fifocache/fifo.go | 147 ++++++++++----------- pkg/fileservice/fifocache/shardmap.go | 14 +- pkg/fileservice/fifocache/shardmap_test.go | 5 - 4 files changed, 73 insertions(+), 100 deletions(-) diff --git a/pkg/fileservice/fifocache/data_cache.go b/pkg/fileservice/fifocache/data_cache.go index 96554e6c5c706..da1ca664204d8 100644 --- a/pkg/fileservice/fifocache/data_cache.go +++ b/pkg/fileservice/fifocache/data_cache.go @@ -73,16 +73,13 @@ func (d *DataCache) DeletePaths(ctx context.Context, paths []string) { d.fifo.htab.CompareAndDelete(key, func(key1, key2 fscache.CacheKey) bool { return key1.Path == key2.Path }, func(value *_CacheItem[fscache.CacheKey, fscache.Data]) { - if value.setDeleted() { - deletes = append(deletes, value) - } + deletes = append(deletes, value) }) } // FSCACHEDATA RELEASE for _, item := range deletes { - item.postFn(ctx, d.fifo.postEvict) - item.releaseValue() + item.MarkAsDeleted(ctx, d.fifo.postEvict) } } diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index 6ff93a977667a..a10496486339a 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -48,15 +48,14 @@ type _CacheItem[K comparable, V any] struct { value V size int64 - // check item is already deleted by either hashtable or evict - deleted atomic.Bool - - // mutex protect the freq and postFn - mu sync.Mutex - freq int8 + // mutex protect the deleted, freq and postFn + mu sync.Mutex + freq int8 + deleted bool // flag indicate item is already deleted by either hashtable or evict } -func (c 
*_CacheItem[K, V]) inc() { +// Thread-safe +func (c *_CacheItem[K, V]) Inc() { c.mu.Lock() defer c.mu.Unlock() @@ -65,7 +64,8 @@ func (c *_CacheItem[K, V]) inc() { } } -func (c *_CacheItem[K, V]) dec() { +// Thread-safe +func (c *_CacheItem[K, V]) Dec() { c.mu.Lock() defer c.mu.Unlock() @@ -74,23 +74,43 @@ func (c *_CacheItem[K, V]) dec() { } } -func (c *_CacheItem[K, V]) getFreq() int8 { +// Thread-safe +func (c *_CacheItem[K, V]) GetFreq() int8 { c.mu.Lock() defer c.mu.Unlock() return c.freq } -// protected by shardmap -func (c *_CacheItem[K, V]) isDeleted() bool { - return c.deleted.Load() +// Thread-safe +func (c *_CacheItem[K, V]) IsDeleted() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.deleted } -// protected by shardmap -func (c *_CacheItem[K, V]) setDeleted() bool { - return c.deleted.CompareAndSwap(false, true) +// Thread-safe +// first MarkAsDeleted will decrement the ref counter and call postfn and set deleted = true. +// After first call, MarkAsDeleted will do nothing. +func (c *_CacheItem[K, V]) MarkAsDeleted(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) bool { + c.mu.Lock() + defer c.mu.Unlock() + + if c.deleted { + return false + } + + c.deleted = true + + if fn != nil { + fn(ctx, c.key, c.value, c.size) + } + + c.releaseValue() + return true } -func (c *_CacheItem[K, V]) postFn(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) { +// Thread-safe +func (c *_CacheItem[K, V]) PostFn(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) { if fn != nil { c.mu.Lock() defer c.mu.Unlock() @@ -98,29 +118,35 @@ func (c *_CacheItem[K, V]) postFn(ctx context.Context, fn func(ctx context.Conte } } -// increment the reference counter +// Thread-safe +func (c *_CacheItem[K, V]) Retain() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.retainValue() +} + +// INTERNAL: non-thread safe. +// if deleted = true, item value is already released by this Cache and is NOT valid to use it inside the Cache. +// if deleted = false, increment the reference counter of the value and it is safe to use now. func (c *_CacheItem[K, V]) retainValue() bool { cdata, ok := any(c.value).(fscache.Data) if !ok { return true } - c.mu.Lock() - defer c.mu.Unlock() - if c.isDeleted() { + if c.deleted { return false } cdata.Retain() return true } +// INTERNAL: non-thread safe. 
// decrement the reference counter func (c *_CacheItem[K, V]) releaseValue() { cdata, ok := any(c.value).(fscache.Data) if !ok { return } - c.mu.Lock() - defer c.mu.Unlock() cdata.Release() } @@ -177,21 +203,21 @@ func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) { size: size, } - // TODO: FSCACHEDATA RETAIN + // FSCACHEDATA RETAIN // increment the ref counter first no matter what to make sure the memory is occupied before hashtable.Set - item.retainValue() + item.Retain() ok := c.htab.Set(key, item, nil) if !ok { // existed - // TODO: FSCACHEDATA RELEASE + // FSCACHEDATA RELEASE // decrement the ref counter if not set to release the resource - item.releaseValue() + item.MarkAsDeleted(ctx, nil) return } // postSet - item.postFn(ctx, c.postSet) + item.PostFn(ctx, c.postSet) // evict c.evictAll(ctx, nil, 0) @@ -216,43 +242,33 @@ func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) { func (c *Cache[K, V]) Get(ctx context.Context, key K) (value V, ok bool) { var item *_CacheItem[K, V] - item, ok = c.htab.Get(key, func(v *_CacheItem[K, V]) {}) + item, ok = c.htab.Get(key, nil) if !ok { return } - // TODO: FSCACHEDATA RETAIN - ok = item.retainValue() + // FSCACHEDATA RETAIN + ok = item.Retain() if !ok { return item.value, false } // increment - item.inc() + item.Inc() // postGet - item.postFn(ctx, c.postGet) + item.PostFn(ctx, c.postGet) return item.value, true } func (c *Cache[K, V]) Delete(ctx context.Context, key K) { - needsDelete := false - - item, ok := c.htab.GetAndDelete(key, func(v *_CacheItem[K, V]) { - needsDelete = v.setDeleted() + item, ok := c.htab.GetAndDelete(key, nil) - }) - - if ok && needsDelete { + if ok { // call Bytes.Release() to decrement the ref counter and protected by shardmap mutex. // item.deleted makes sure postEvict only call once. - - // deleted from hashtable - item.postFn(ctx, c.postEvict) - - // TODO: FSCACHEDATA RELEASE - item.releaseValue() + item.MarkAsDeleted(ctx, c.postEvict) } } @@ -310,33 +326,21 @@ func (c *Cache[K, V]) evictSmall(ctx context.Context) { return } - deleted := item.isDeleted() + deleted := item.IsDeleted() if deleted { c.usedSmall.Add(-item.size) return } - if item.getFreq() > 1 { + if item.GetFreq() > 1 { // put main c.main.enqueue(item) c.usedSmall.Add(-item.size) c.usedMain.Add(item.size) } else { // evict - needsDelete := false - c.htab.GetAndDelete(item.key, func(v *_CacheItem[K, V]) { - // item.deleted makes sure postEvict only call once. - // post evict - needsDelete = v.setDeleted() - }) - - if needsDelete { - // call Bytes.Release() to decrement the ref counter - // postEvict - item.postFn(ctx, c.postEvict) - // TODO: FSCACHEDATA RELEASE - item.releaseValue() - } + c.htab.Remove(item.key) + item.MarkAsDeleted(ctx, c.postEvict) c.usedSmall.Add(-item.size) if !c.disable_s3fifo { @@ -356,31 +360,20 @@ func (c *Cache[K, V]) evictMain(ctx context.Context) { return } - deleted := item.isDeleted() + deleted := item.IsDeleted() if deleted { c.usedMain.Add(-item.size) return } - if item.getFreq() > 0 { + if item.GetFreq() > 0 { // re-enqueue - item.dec() + item.Dec() c.main.enqueue(item) } else { // evict - needsDelete := false - c.htab.GetAndDelete(item.key, func(v *_CacheItem[K, V]) { - // item.deleted makes sure postEvict only call once. 
- // post evict - needsDelete = v.setDeleted() }) - if needsDelete { - // call Bytes.Release() to decrement the ref counter - // postEvict - item.postFn(ctx, c.postEvict) - // TODO: FSCACHEDATA RELEASE - item.releaseValue() - } + c.htab.Remove(item.key) + item.MarkAsDeleted(ctx, c.postEvict) c.usedMain.Add(-item.size) return } diff --git a/pkg/fileservice/fifocache/shardmap.go b/pkg/fileservice/fifocache/shardmap.go index 8cf3e74002085..1ac69ab97c020 100644 --- a/pkg/fileservice/fifocache/shardmap.go +++ b/pkg/fileservice/fifocache/shardmap.go @@ -78,17 +78,12 @@ func (m *ShardMap[K, V]) Get(key K, postfn func(V)) (V, bool) { return v, ok } -func (m *ShardMap[K, V]) Remove(key K, value V, postfn func(V)) { +func (m *ShardMap[K, V]) Remove(key K) { s := &m.shards[m.hashfn(key)%numShards] s.Lock() defer s.Unlock() delete(s.values, key) - - if postfn != nil { - // call postEvict to decrement the cache reference counter, deallocate the buffer when ref counter = 0 - postfn(value) - } } func (m *ShardMap[K, V]) CompareAndDelete(key K, fn func(k1, k2 K) bool, postfn func(V)) { @@ -131,10 +126,3 @@ func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) (V, bool) { return v, ok } - -func (m *ShardMap[K, V]) ValueIsDeleted(key K, value V, isDeleted func(V) bool) bool { - s := &m.shards[m.hashfn(key)%numShards] - s.RLock() - defer s.RUnlock() - return isDeleted(value) -} diff --git a/pkg/fileservice/fifocache/shardmap_test.go b/pkg/fileservice/fifocache/shardmap_test.go index ee7689293380d..da1695d58a88f 100644 --- a/pkg/fileservice/fifocache/shardmap_test.go +++ b/pkg/fileservice/fifocache/shardmap_test.go @@ -42,9 +42,4 @@ func TestShardMap(t *testing.T) { _, ok = m.GetAndDelete(1, func(v string) { }) assert.Equal(t, ok, true) - - ok = m.ValueIsDeleted(1, "1", func(v string) bool { - return false - }) - assert.Equal(t, ok, false) } From 1011890f02f3c1d85120deb1e1c4cd1395ccda55 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Fri, 11 Jul 2025 10:35:21 +0100 Subject: [PATCH 28/33] update comment --- pkg/fileservice/fifocache/shardmap.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/fileservice/fifocache/shardmap.go b/pkg/fileservice/fifocache/shardmap.go index 1ac69ab97c020..df9ee5f49ecbc 100644 --- a/pkg/fileservice/fifocache/shardmap.go +++ b/pkg/fileservice/fifocache/shardmap.go @@ -54,7 +54,7 @@ func (m *ShardMap[K, V]) Set(key K, value V, postfn func(V)) bool { s.values[key] = value if postfn != nil { - // call postSet to increment the cache reference counter + // call postSet protected by mutex.Lock postfn(value) } return true @@ -72,7 +72,7 @@ func (m *ShardMap[K, V]) Get(key K, postfn func(V)) (V, bool) { return v, ok } if postfn != nil { - // call postGet to increment the cache reference counter + // call postGet protected by the mutex RLock. postfn(v) } return v, ok @@ -97,7 +97,7 @@ func (m *ShardMap[K, V]) CompareAndDelete(key K, fn func(k1, k2 K) bool, postfn if fn(k, key) { delete(s.values, k) if postfn != nil { - // call postEvict to decrement the cache reference counter, deallocate the buffer when ref counter = 0 + // call postfn to let the parent know the item got deleted. (protected by mutex.Lock) postfn(v) } } @@ -120,7 +120,7 @@ func (m *ShardMap[K, V]) GetAndDelete(key K, postfn func(V)) (V, bool) { delete(s.values, key) if postfn != nil { - // call postEvict to decrement the cache reference counter, deallocate the buffer when ref counter = 0 + // call postfn to let the parent know the item got deleted.
(protected by mutex.Lock) postfn(v) } From 084c62045716fb5f681824ead10279e7550477f2 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Fri, 11 Jul 2025 14:38:21 +0100 Subject: [PATCH 29/33] update --- pkg/fileservice/bytes.go | 5 +---- pkg/fileservice/fifocache/fifo.go | 20 +++++++++++++------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index 4dbb41f396e49..1a57e2310cbfd 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -17,7 +17,6 @@ package fileservice import ( "context" "sync" - "sync/atomic" "github.com/matrixorigin/matrixone/pkg/common/malloc" "github.com/matrixorigin/matrixone/pkg/fileservice/fscache" @@ -27,7 +26,6 @@ type Bytes struct { mu sync.Mutex bytes []byte deallocator malloc.Deallocator - deallocated uint32 _refs int32 refs *int32 } @@ -91,8 +89,7 @@ func (b *Bytes) Release() bool { } } } else { - if b.deallocator != nil && - atomic.CompareAndSwapUint32(&b.deallocated, 0, 1) { + if b.deallocator != nil { b.deallocator.Deallocate(malloc.NoHints) b.bytes = nil return true diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index a10496486339a..7ad35b4fb7605 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -119,10 +119,19 @@ func (c *_CacheItem[K, V]) PostFn(ctx context.Context, fn func(ctx context.Conte } // Thread-safe -func (c *_CacheItem[K, V]) Retain() bool { +func (c *_CacheItem[K, V]) Retain(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) bool { c.mu.Lock() defer c.mu.Unlock() - return c.retainValue() + ok := c.retainValue() + if !ok { + return false + } + + if fn != nil { + fn(ctx, c.key, c.value, c.size) + } + + return true } // INTERNAL: non-thread safe. @@ -205,7 +214,7 @@ func (c *Cache[K, V]) Set(ctx context.Context, key K, value V, size int64) { // FSCACHEDATA RETAIN // increment the ref counter first no matter what to make sure the memory is occupied before hashtable.Set - item.Retain() + item.Retain(ctx, nil) ok := c.htab.Set(key, item, nil) if !ok { @@ -248,7 +257,7 @@ func (c *Cache[K, V]) Get(ctx context.Context, key K) (value V, ok bool) { } // FSCACHEDATA RETAIN - ok = item.Retain() + ok = item.Retain(ctx, c.postGet) if !ok { return item.value, false } @@ -256,9 +265,6 @@ func (c *Cache[K, V]) Get(ctx context.Context, key K) (value V, ok bool) { // increment item.Inc() - // postGet - item.PostFn(ctx, c.postGet) - return item.value, true } From 536e323dfb1e42bc94d268110ba7e196e20944b0 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Mon, 14 Jul 2025 13:20:36 +0100 Subject: [PATCH 30/33] bug fix --- pkg/fileservice/fifocache/fifo.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index 7ad35b4fb7605..ebd64c4b1186a 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -122,6 +122,9 @@ func (c *_CacheItem[K, V]) PostFn(ctx context.Context, fn func(ctx context.Conte func (c *_CacheItem[K, V]) Retain(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) bool { c.mu.Lock() defer c.mu.Unlock() + if c.deleted { + return false + } ok := c.retainValue() if !ok { return false @@ -137,26 +140,20 @@ func (c *_CacheItem[K, V]) Retain(ctx context.Context, fn func(ctx context.Conte // INTERNAL: non-thread safe. // if deleted = true, item value is already released by this Cache and is NOT valid to use it inside the Cache. 
// if deleted = false, increment the reference counter of the value and it is safe to use now. -func (c *_CacheItem[K, V]) retainValue() bool { +func (c *_CacheItem[K, V]) retainValue() { cdata, ok := any(c.value).(fscache.Data) - if !ok { - return true - } - if c.deleted { - return false + if ok { + cdata.Retain() } - cdata.Retain() - return true } // INTERNAL: non-thread safe. // decrement the reference counter func (c *_CacheItem[K, V]) releaseValue() { cdata, ok := any(c.value).(fscache.Data) - if !ok { - return + if ok { + cdata.Release() } - cdata.Release() } // assume cache size is 128K From 7c5850bb33e008c980a3ebd1ff835052cc9c34e6 Mon Sep 17 00:00:00 2001 From: cpegeric Date: Mon, 14 Jul 2025 13:21:33 +0100 Subject: [PATCH 31/33] bug fix --- pkg/fileservice/fifocache/fifo.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index ebd64c4b1186a..8e2204ad610e4 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -125,10 +125,7 @@ func (c *_CacheItem[K, V]) Retain(ctx context.Context, fn func(ctx context.Conte if c.deleted { return false } - ok := c.retainValue() - if !ok { - return false - } + c.retainValue() if fn != nil { fn(ctx, c.key, c.value, c.size) From 560175b9bd6dcebc7c69ff1b1557af075e28288c Mon Sep 17 00:00:00 2001 From: cpegeric Date: Mon, 14 Jul 2025 16:40:48 +0100 Subject: [PATCH 32/33] comments --- pkg/fileservice/fifocache/fifo.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/pkg/fileservice/fifocache/fifo.go b/pkg/fileservice/fifocache/fifo.go index 8e2204ad610e4..217dda7976591 100644 --- a/pkg/fileservice/fifocache/fifo.go +++ b/pkg/fileservice/fifocache/fifo.go @@ -95,16 +95,21 @@ func (c *_CacheItem[K, V]) MarkAsDeleted(ctx context.Context, fn func(ctx contex c.mu.Lock() defer c.mu.Unlock() + // check item is already deleted if c.deleted { + // exit and return false which means no need to deallocate the memory return false } + // set deleted = true c.deleted = true + // call postEvict before decrement the ref counter if fn != nil { fn(ctx, c.key, c.value, c.size) } + // decrement the ref counter c.releaseValue() return true } @@ -122,11 +127,16 @@ func (c *_CacheItem[K, V]) PostFn(ctx context.Context, fn func(ctx context.Conte func (c *_CacheItem[K, V]) Retain(ctx context.Context, fn func(ctx context.Context, key K, value V, size int64)) bool { c.mu.Lock() defer c.mu.Unlock() + + // first check item is already deleted if c.deleted { return false } + + // if not deleted, increment ref counter to occupy the memory c.retainValue() + // value is safe to be accessed now and call postfn if fn != nil { fn(ctx, c.key, c.value, c.size) } @@ -340,12 +350,12 @@ func (c *Cache[K, V]) evictSmall(ctx context.Context) { } else { // evict c.htab.Remove(item.key) - item.MarkAsDeleted(ctx, c.postEvict) - c.usedSmall.Add(-item.size) if !c.disable_s3fifo { c.ghost.add(item.key) } + // mark item as deleted and item should not be accessed again + item.MarkAsDeleted(ctx, c.postEvict) return } } @@ -373,9 +383,10 @@ func (c *Cache[K, V]) evictMain(ctx context.Context) { } else { // evict c.htab.Remove(item.key) + c.usedMain.Add(-item.size) + // mark item as deleted and item should not be accessed again item.MarkAsDeleted(ctx, c.postEvict) - c.usedMain.Add(-item.size) return } } From 235ab0bb38d173c4068fb653ead49d323abd27fe Mon Sep 17 00:00:00 2001 From: cpegeric Date: Tue, 15 Jul 2025 11:00:30 +0100 Subject: [PATCH 33/33] 
clean up bytes --- pkg/fileservice/bytes.go | 22 +++++----------------- pkg/fileservice/bytes_test.go | 2 ++ 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/pkg/fileservice/bytes.go b/pkg/fileservice/bytes.go index 1a57e2310cbfd..853db8481d67a 100644 --- a/pkg/fileservice/bytes.go +++ b/pkg/fileservice/bytes.go @@ -26,8 +26,7 @@ type Bytes struct { mu sync.Mutex bytes []byte deallocator malloc.Deallocator - _refs int32 - refs *int32 + refs int32 } func (b *Bytes) Size() int64 { @@ -66,9 +65,7 @@ func (b *Bytes) Retain() { panic("fileservice.Bytes.Retain() buffer already deallocated") } - if b.refs != nil { - (*b.refs) += 1 - } + b.refs += 1 } func (b *Bytes) Release() bool { @@ -79,16 +76,8 @@ func (b *Bytes) Release() bool { panic("fileservice.Bytes.Release() double free") } - if b.refs != nil { - (*b.refs) -= 1 - if *b.refs == 0 { - if b.deallocator != nil { - b.deallocator.Deallocate(malloc.NoHints) - b.bytes = nil - return true - } - } - } else { + b.refs -= 1 + if b.refs == 0 { if b.deallocator != nil { b.deallocator.Deallocate(malloc.NoHints) b.bytes = nil @@ -112,9 +101,8 @@ func (b *bytesAllocator) allocateCacheData(size int, hints malloc.Hints) fscache bytes := &Bytes{ bytes: slice, deallocator: dec, + refs: 1, } - bytes._refs = 1 - bytes.refs = &bytes._refs return bytes } diff --git a/pkg/fileservice/bytes_test.go b/pkg/fileservice/bytes_test.go index ea08828583ec5..115aabec7b06d 100644 --- a/pkg/fileservice/bytes_test.go +++ b/pkg/fileservice/bytes_test.go @@ -28,6 +28,7 @@ func TestBytes(t *testing.T) { bs := &Bytes{ bytes: bytes, deallocator: deallocator, + refs: 1, } bs.Release() }) @@ -39,6 +40,7 @@ func TestBytesPanic(t *testing.T) { bs := &Bytes{ bytes: bytes, deallocator: deallocator, + refs: 1, } released := bs.Release()