From 7568e88361a91f963a65bfe37d2d280764e94303 Mon Sep 17 00:00:00 2001
From: Matthias Bertschy
Date: Fri, 26 Jan 2024 10:22:05 +0100
Subject: [PATCH] add mutexes per key for all methods

Signed-off-by: Matthias Bertschy
---
 pkg/registry/file/mutex.go        |  91 ++++++++++
 pkg/registry/file/mutex_test.go   | 290 ++++++++++++++++++++++++++++++
 pkg/registry/file/storage.go      |  93 +++++-----
 pkg/registry/file/storage_test.go |   3 +-
 4 files changed, 431 insertions(+), 46 deletions(-)
 create mode 100644 pkg/registry/file/mutex.go
 create mode 100644 pkg/registry/file/mutex_test.go

diff --git a/pkg/registry/file/mutex.go b/pkg/registry/file/mutex.go
new file mode 100644
index 000000000..325064568
--- /dev/null
+++ b/pkg/registry/file/mutex.go
@@ -0,0 +1,91 @@
+package file
+
+import (
+	"math/rand"
+	"sync"
+	"time"
+)
+
+// Based on https://github.com/EagleChen/mapmutex/blob/master/mapmutex.go
+
+// Mutex is a mutex backed by a synchronized map of per-key locks;
+// it avoids unnecessary contention between different keys.
+type Mutex[T comparable] struct {
+	locks     map[T]any
+	m         *sync.Mutex
+	maxRetry  int
+	maxDelay  float64 // in nanoseconds
+	baseDelay float64 // in nanoseconds
+	factor    float64
+	jitter    float64
+}
+
+// TryLock tries to acquire the lock for the given key.
+func (m *Mutex[T]) TryLock(key T) bool {
+	for i := 0; i < m.maxRetry; i++ {
+		m.m.Lock()
+		if _, ok := m.locks[key]; ok { // if locked
+			m.m.Unlock()
+			time.Sleep(m.backoff(i))
+		} else { // if unlocked, lock it
+			m.locks[key] = struct{}{}
+			m.m.Unlock()
+			return true
+		}
+	}
+	return false
+}
+
+// Unlock unlocks the given key;
+// call Unlock only after having acquired the lock.
+func (m *Mutex[T]) Unlock(key T) {
+	m.m.Lock()
+	delete(m.locks, key)
+	m.m.Unlock()
+}
+
+// backoff computes an exponential delay with jitter, borrowed from grpc.
+func (m *Mutex[T]) backoff(retries int) time.Duration {
+	if retries == 0 {
+		return time.Duration(m.baseDelay) * time.Nanosecond
+	}
+	backoff, max := m.baseDelay, m.maxDelay
+	for backoff < max && retries > 0 {
+		backoff *= m.factor
+		retries--
+	}
+	if backoff > max {
+		backoff = max
+	}
+	backoff *= 1 + m.jitter*(rand.Float64()*2-1)
+	if backoff < 0 {
+		return 0
+	}
+	return time.Duration(backoff) * time.Nanosecond
+}
+
+// NewMapMutex returns a map mutex with the default configuration.
+func NewMapMutex[T comparable]() *Mutex[T] {
+	return &Mutex[T]{
+		locks:     make(map[T]any),
+		m:         &sync.Mutex{},
+		maxRetry:  200,
+		maxDelay:  100000000, // 0.1 second
+		baseDelay: 10,        // 10 nanoseconds
+		factor:    1.1,
+		jitter:    0.2,
+	}
+}
+
+// NewCustomizedMapMutex returns a map mutex with a custom configuration.
+func NewCustomizedMapMutex[T comparable](mRetry int, mDelay, bDelay, factor, jitter float64) *Mutex[T] {
+	return &Mutex[T]{
+		locks:     make(map[T]any),
+		m:         &sync.Mutex{},
+		maxRetry:  mRetry,
+		maxDelay:  mDelay,
+		baseDelay: bDelay,
+		factor:    factor,
+		jitter:    jitter,
+	}
+}
diff --git a/pkg/registry/file/mutex_test.go b/pkg/registry/file/mutex_test.go
new file mode 100644
index 000000000..5c5a69153
--- /dev/null
+++ b/pkg/registry/file/mutex_test.go
@@ -0,0 +1,290 @@
+package file
+
+import (
+	"fmt"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+)
+
+const MaxRetry = 100000
+
+func TestLockSuccess(t *testing.T) {
+	m := NewMapMutex[string]()
+
+	if !m.TryLock("123") {
+		t.Error("fail to get lock")
+	}
+	m.Unlock("123")
+}
+
+func TestLockFail(t *testing.T) {
+	// fail fast
+	m := NewCustomizedMapMutex[string](1, 1, 1, 2, 0.1)
+
+	c := make(chan bool)
+	finish := make(chan bool)
+
+	num := 5
+	success := make([]int, num)
+
+	for i := 0; i < num; i++ {
+		go func(i int) {
+			if m.TryLock("123") {
+				<-c // block here
+				success[i] = 1
+				m.Unlock("123")
+			}
+			finish <- true
+		}(i)
+	}
+
+	// most goroutines fail to get the lock
+	for i := 0; i < num-1; i++ {
+		<-finish
+	}
+
+	sum := 0
+	for _, s := range success {
+		sum += s
+	}
+
+	if sum != 0 {
+		t.Error("some other goroutine got the lock")
+	}
+
+	// let the successful one finish
+	c <- true
+	// wait
+	<-finish
+	for _, s := range success {
+		sum += s
+	}
+	if sum != 1 {
+		t.Error("no goroutine got the lock")
+	}
+}
+
+func TestLockIndividually(t *testing.T) {
+	m := NewMapMutex[int]()
+
+	if !m.TryLock(123) || !m.TryLock(456) {
+		t.Error("different locks affect each other")
+	}
+}
+
+func BenchmarkMutex1000_100_20_20(b *testing.B)        { lockByOneMutex(1000, 100, 20, 20) }
+func BenchmarkMapWithMutex1000_100_20_20(b *testing.B) { lockByMapWithMutex(1000, 100, 20, 20) }
+func BenchmarkMapMutex1000_100_20_20(b *testing.B)     { lockByMapMutex(1000, 100, 20, 20) }
+
+// fewer keys, more conflicts per map key
+func BenchmarkMutex1000_20_20_20(b *testing.B)        { lockByOneMutex(1000, 20, 20, 20) }
+func BenchmarkMapWithMutex1000_20_20_20(b *testing.B) { lockByMapWithMutex(1000, 20, 20, 20) }
+func BenchmarkMapMutex1000_20_20_20(b *testing.B)     { lockByMapMutex(1000, 20, 20, 20) }
+
+// fewer keys, more goroutines, more conflicts per map key
+func BenchmarkMutex1000_20_40_20(b *testing.B)        { lockByOneMutex(1000, 20, 40, 20) }
+func BenchmarkMapWithMutex1000_20_40_20(b *testing.B) { lockByMapWithMutex(1000, 20, 40, 20) }
+func BenchmarkMapMutex1000_20_40_20(b *testing.B)     { lockByMapMutex(1000, 20, 40, 20) }
+
+// even if we use a map to avoid unnecessary locking,
+// in case of only 2 entries a lot of contention occurs
+func BenchmarkMutex1000_2_40_20(b *testing.B)        { lockByOneMutex(1000, 2, 40, 20) }
+func BenchmarkMapWithMutex1000_2_40_20(b *testing.B) { lockByMapWithMutex(1000, 2, 40, 20) }
+func BenchmarkMapMutex1000_2_40_20(b *testing.B)     { lockByMapMutex(1000, 2, 40, 20) }
+
+// longer time per job, more conflicts per map key
+func BenchmarkMutex1000_20_40_60(b *testing.B)        { lockByOneMutex(1000, 20, 40, 60) }
+func BenchmarkMapWithMutex1000_20_40_60(b *testing.B) { lockByMapWithMutex(1000, 20, 40, 60) }
+func BenchmarkMapMutex1000_20_40_60(b *testing.B)     { lockByMapMutex(1000, 20, 40, 60) }
+
+// many more actions
+func BenchmarkMutex10000_20_40_20(b *testing.B)        { lockByOneMutex(10000, 20, 40, 20) }
+func BenchmarkMapWithMutex10000_20_40_20(b *testing.B) { lockByMapWithMutex(10000, 20, 40, 20) }
+func BenchmarkMapMutex10000_20_40_20(b *testing.B)     { lockByMapMutex(10000, 20, 40, 20) }
+
+func min(a, b int) int {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+// load should be larger than 0
+func splitLoad(load, buckets int) []int {
+	result := make([]int, buckets)
+	avg := load / buckets
+	remain := load % buckets
+
+	// split
+	for i := range result {
+		result[i] = avg
+		if remain > 0 {
+			result[i]++
+			remain--
+		}
+	}
+
+	// randomize
+	for i := 0; i < buckets; i += 2 {
+		if i+1 < buckets {
+			r := rand.Intn(min(result[i], result[i+1]))
+			if rand.Intn(r+1)%2 == 0 {
+				result[i] -= r
+				result[i+1] += r
+			} else {
+				result[i] += r
+				result[i+1] -= r
+			}
+		}
+	}
+
+	return result
+}
+
+func lockByOneMutex(actionCount, keyCount, goroutineNum, averageTime int) {
+	sharedSlice := make([]int, keyCount)
+	var m sync.Mutex
+
+	loads := splitLoad(actionCount, goroutineNum)
+	var wg sync.WaitGroup
+	wg.Add(goroutineNum)
+	success := make([]int, goroutineNum)
+	for i, load := range loads {
+		go func(i, load int) {
+			success[i] = runWithOneMutex(load, keyCount, averageTime,
+				sharedSlice, &m)
+			wg.Done()
+		}(i, load)
+	}
+
+	wg.Wait()
+	sum := 0
+	for _, s := range success {
+		sum += s
+	}
+	fmt.Println("one mutex: ", actionCount, keyCount, goroutineNum, averageTime, "sum is: ", sum)
+}
+
+func lockByMapWithMutex(actionCount, keyCount, goroutineNum, averageTime int) {
+	sharedSlice := make([]int, keyCount)
+	locks := make(map[int]bool)
+	var m sync.Mutex
+
+	loads := splitLoad(actionCount, goroutineNum)
+	var wg sync.WaitGroup
+	wg.Add(goroutineNum)
+	success := make([]int, goroutineNum)
+	for i, load := range loads {
+		go func(i, load int) {
+			success[i] = runWithMapWithMutex(load, keyCount, averageTime,
+				sharedSlice, &m, locks)
+			wg.Done()
+		}(i, load)
+	}
+
+	wg.Wait()
+	sum := 0
+	for _, s := range success {
+		sum += s
+	}
+	fmt.Println("map with mutex: ", actionCount, keyCount, goroutineNum, averageTime, "sum is: ", sum)
+}
+
+func lockByMapMutex(actionCount, keyCount, goroutineNum, averageTime int) {
+	sharedSlice := make([]int, keyCount)
+	m := NewMapMutex[int]()
+
+	loads := splitLoad(actionCount, goroutineNum)
+	var wg sync.WaitGroup
+	wg.Add(goroutineNum)
+	success := make([]int, goroutineNum)
+	for i, load := range loads {
+		go func(i, load int) {
+			success[i] = runWithMapMutex(load, keyCount, averageTime,
+				sharedSlice, m)
+			wg.Done()
+		}(i, load)
+	}
+
+	wg.Wait()
+	sum := 0
+	for _, s := range success {
+		sum += s
+	}
+	fmt.Println("map mutex: ", actionCount, keyCount, goroutineNum, averageTime, "sum is: ", sum)
+}
+
+func runWithOneMutex(iterateNum, keyCount, averageTime int, sharedSlice []int,
+	m *sync.Mutex) int {
+	success := 0
+	for ; iterateNum > 0; iterateNum-- {
+		m.Lock()
+
+		idx := rand.Intn(keyCount)
+		doTheJob(averageTime, idx, sharedSlice)
+		success++
+
+		m.Unlock()
+	}
+
+	return success
+}
+
+func runWithMapWithMutex(iterateNum, keyCount, averageTime int,
+	sharedSlice []int, m *sync.Mutex, locks map[int]bool) int {
+	success := 0
+	for ; iterateNum > 0; iterateNum-- {
+		idx := rand.Intn(keyCount)
+		goon := false
+		for i := 0; i < MaxRetry; i++ {
+			m.Lock()
+			if locks[idx] { // if locked
+				m.Unlock()
+				time.Sleep(time.Duration(rand.Intn(100)*(i/100+1)) * time.Nanosecond)
+			} else { // if unlocked, lock it
+				locks[idx] = true
+				m.Unlock()
+				goon = true
+				break
+			}
+		}
+
+		if !goon {
+			continue // failed to get the lock, go on to the next iteration
+		}
+		doTheJob(averageTime, idx, sharedSlice)
+		success++
+
+		m.Lock()
+		delete(locks, idx)
+		m.Unlock()
+	}
+	return success
+}
+
+func runWithMapMutex(iterateNum, keyCount, averageTime int,
+	sharedSlice []int, m *Mutex[int]) int {
+	success := 0
+	for ; iterateNum > 0; iterateNum-- {
+		idx := rand.Intn(keyCount)
+		// failed to get the lock, skip this key
+		if !m.TryLock(idx) {
+			continue
+		}
+
+		doTheJob(averageTime, idx, sharedSlice)
+		success++
+
+		m.Unlock(idx)
+	}
+	return success
+}
+
+func doTheJob(averageTime, idx int, sharedSlice []int) {
+	// do the real job: just sleep for some time and set a value
+	milliSec := rand.Intn(averageTime * 2)
+	time.Sleep(time.Duration(milliSec) * time.Millisecond)
+	sharedSlice[idx] = milliSec
+}
diff --git a/pkg/registry/file/storage.go b/pkg/registry/file/storage.go
index a519f7ec9..8c71c406e 100644
--- a/pkg/registry/file/storage.go
+++ b/pkg/registry/file/storage.go
@@ -5,25 +5,25 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"os"
-	"path/filepath"
-	"reflect"
-	"strings"
-	"sync"
-
 	"github.com/kubescape/go-logger"
 	"github.com/kubescape/go-logger/helpers"
 	"github.com/spf13/afero"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" + "os" + "path/filepath" + "reflect" + "strings" ) const ( + clusterKey = "cluster" JsonExt = ".j" MetadataExt = ".m" DefaultStorageRoot = "/data" @@ -42,7 +42,7 @@ type objState struct { type StorageImpl struct { appFs afero.Fs watchDispatcher watchDispatcher - lock *sync.RWMutex + locks *Mutex[string] root string versioner storage.Versioner } @@ -63,7 +63,7 @@ func NewStorageImpl(appFs afero.Fs, root string) StorageQuerier { return &StorageImpl{ appFs: appFs, watchDispatcher: newWatchDispatcher(), - lock: &sync.RWMutex{}, + locks: NewMapMutex[string](), root: root, versioner: storage.APIObjectVersioner{}, } @@ -105,7 +105,7 @@ func isPayloadFile(path string) bool { return !IsMetadataFile(path) } -func (s *StorageImpl) writeFiles(ctx context.Context, key string, obj runtime.Object, metaOut runtime.Object) error { +func (s *StorageImpl) writeFiles(key string, obj runtime.Object, metaOut runtime.Object) error { // set resourceversion if version, _ := s.versioner.ObjectResourceVersion(obj); version == 0 { if err := s.versioner.UpdateObject(obj, 1); err != nil { @@ -119,10 +119,6 @@ func (s *StorageImpl) writeFiles(ctx context.Context, key string, obj runtime.Ob } // prepare path p := filepath.Join(s.root, key) - _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") - s.lock.Lock() - spanLock.End() - defer s.lock.Unlock() if err := s.appFs.MkdirAll(filepath.Dir(p), 0755); err != nil { return fmt.Errorf("mkdir: %w", err) } @@ -168,6 +164,10 @@ func (s *StorageImpl) Create(ctx context.Context, key string, obj, metaOut runti ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.Create") span.SetAttributes(attribute.String("key", key)) defer span.End() + _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") + s.locks.TryLock(key) + spanLock.End() + defer s.locks.Unlock(key) // resourceversion should not be set on create if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 { msg := "resourceVersion should not be set on objects to be created" @@ -175,7 +175,7 @@ func (s *StorageImpl) Create(ctx context.Context, key string, obj, metaOut runti return errors.New(msg) } // write files - if err := s.writeFiles(ctx, key, obj, metaOut); err != nil { + if err := s.writeFiles(key, obj, metaOut); err != nil { logger.L().Ctx(ctx).Error("write files failed", helpers.Error(err), helpers.String("key", key)) return err } @@ -194,11 +194,11 @@ func (s *StorageImpl) Delete(ctx context.Context, key string, metaOut runtime.Ob ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.Delete") span.SetAttributes(attribute.String("key", key)) defer span.End() - p := filepath.Join(s.root, key) _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") - s.lock.Lock() + s.locks.TryLock(key) spanLock.End() - defer s.lock.Unlock() + defer s.locks.Unlock(key) + p := filepath.Join(s.root, key) // read metadata file file, err := s.appFs.Open(makeMetadataPath(p)) if err != nil { @@ -255,11 +255,11 @@ func (s *StorageImpl) Get(ctx context.Context, key string, opts storage.GetOptio ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.Get") span.SetAttributes(attribute.String("key", key)) defer span.End() - p := filepath.Join(s.root, key) _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") - s.lock.RLock() + s.locks.TryLock(key) spanLock.End() - defer s.lock.RUnlock() + defer s.locks.Unlock(key) + p := filepath.Join(s.root, key) file, err := 
s.appFs.Open(makePayloadPath(p)) if err != nil { if errors.Is(err, afero.ErrFileNotFound) { @@ -291,6 +291,10 @@ func (s *StorageImpl) GetList(ctx context.Context, key string, _ storage.ListOpt ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GetList") span.SetAttributes(attribute.String("key", key)) defer span.End() + _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") + s.locks.TryLock(key) + spanLock.End() + defer s.locks.Unlock(key) listPtr, err := meta.GetItemsPtr(listObj) if err != nil { logger.L().Ctx(ctx).Error("get items ptr failed", helpers.Error(err), helpers.String("key", key)) @@ -304,9 +308,6 @@ func (s *StorageImpl) GetList(ctx context.Context, key string, _ storage.ListOpt p := filepath.Join(s.root, key) var files []string - _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") - s.lock.RLock() - spanLock.End() metadataPath := makeMetadataPath(p) if exists, _ := afero.Exists(s.appFs, metadataPath); exists { @@ -322,7 +323,6 @@ func (s *StorageImpl) GetList(ctx context.Context, key string, _ storage.ListOpt return nil }) } - s.lock.RUnlock() for _, path := range files { // we need to read the whole file file, err := s.appFs.Open(path) @@ -407,6 +407,10 @@ func (s *StorageImpl) GuaranteedUpdate( ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GuaranteedUpdate") span.SetAttributes(attribute.String("key", key)) defer span.End() + _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") + s.locks.TryLock(key) + spanLock.End() + defer s.locks.Unlock(key) // key preparation is skipped // otel span tracking is skipped @@ -466,7 +470,9 @@ func (s *StorageImpl) GuaranteedUpdate( if err != nil { // If our data is already up-to-date, return the error if origStateIsCurrent { - logger.L().Ctx(ctx).Error("tryUpdate func failed", helpers.Error(err), helpers.String("key", key)) + if !apierrors.IsNotFound(err) { + logger.L().Ctx(ctx).Error("tryUpdate func failed", helpers.Error(err), helpers.String("key", key)) + } return err } @@ -485,7 +491,9 @@ func (s *StorageImpl) GuaranteedUpdate( // it turns out our cached data was not stale, return the error if cachedRev == origState.rev { - logger.L().Ctx(ctx).Error("tryUpdate func failed", helpers.Error(err), helpers.String("key", key)) + if !apierrors.IsNotFound(err) { + logger.L().Ctx(ctx).Error("tryUpdate func failed", helpers.Error(err), helpers.String("key", key)) + } return cachedUpdateErr } @@ -494,7 +502,7 @@ func (s *StorageImpl) GuaranteedUpdate( } // save to disk and fill into metaOut - err = s.writeFiles(ctx, key, ret, metaOut) + err = s.writeFiles(key, ret, metaOut) if err == nil { // Only successful updates should produce modification events s.watchDispatcher.Modified(key, metaOut) @@ -511,8 +519,8 @@ func (s *StorageImpl) Count(key string) (int64, error) { p := filepath.Join(s.root, key) metadataPath := makeMetadataPath(p) - s.lock.RLock() - defer s.lock.RUnlock() + s.locks.TryLock(key) + defer s.locks.Unlock(key) pathExists, _ := afero.Exists(s.appFs, metadataPath) pathIsDir, _ := afero.IsDir(s.appFs, metadataPath) @@ -546,6 +554,10 @@ func (s *StorageImpl) GetByNamespace(ctx context.Context, apiVersion, kind, name ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GetByNamespace") span.SetAttributes(attribute.String("apiVersion", apiVersion), attribute.String("kind", kind), attribute.String("namespace", namespace)) defer span.End() + _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") + s.locks.TryLock(namespace) + spanLock.End() + defer s.locks.Unlock(namespace) listPtr, err := 
meta.GetItemsPtr(listObj) if err != nil { @@ -561,11 +573,6 @@ func (s *StorageImpl) GetByNamespace(ctx context.Context, apiVersion, kind, name p := filepath.Join(s.root, apiVersion, kind, namespace) - _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") - s.lock.RLock() - defer s.lock.RUnlock() - spanLock.End() - // read all json files under the namespace and append to list _ = afero.Walk(s.appFs, p, func(path string, info os.FileInfo, err error) error { if !isPayloadFile(path) { @@ -586,6 +593,10 @@ func (s *StorageImpl) GetByNamespace(ctx context.Context, apiVersion, kind, name func (s *StorageImpl) GetClusterScopedResource(ctx context.Context, apiVersion, kind string, listObj runtime.Object) error { ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GetClusterScopedResource") defer span.End() + _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") + s.locks.TryLock(clusterKey) + spanLock.End() + defer s.locks.Unlock(clusterKey) listPtr, err := meta.GetItemsPtr(listObj) if err != nil { @@ -601,11 +612,6 @@ func (s *StorageImpl) GetClusterScopedResource(ctx context.Context, apiVersion, p := filepath.Join(s.root, apiVersion, kind) - _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") - s.lock.RLock() - defer s.lock.RUnlock() - spanLock.End() - _ = afero.Walk(s.appFs, p, func(path string, info os.FileInfo, err error) error { // the first path is the root path if path == p { @@ -633,6 +639,10 @@ func (s *StorageImpl) GetClusterScopedResource(ctx context.Context, apiVersion, func (s *StorageImpl) GetByCluster(ctx context.Context, apiVersion, kind string, listObj runtime.Object) error { ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GetByCluster") defer span.End() + _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") + s.locks.TryLock(clusterKey) + spanLock.End() + defer s.locks.Unlock(clusterKey) listPtr, err := meta.GetItemsPtr(listObj) if err != nil { @@ -648,11 +658,6 @@ func (s *StorageImpl) GetByCluster(ctx context.Context, apiVersion, kind string, p := filepath.Join(s.root, apiVersion, kind) - _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") - s.lock.RLock() - defer s.lock.RUnlock() - spanLock.End() - // for each namespace, read all json files and append it to list obj _ = afero.Walk(s.appFs, p, func(path string, info os.FileInfo, err error) error { // the first path is the root path diff --git a/pkg/registry/file/storage_test.go b/pkg/registry/file/storage_test.go index 646a88504..163701a2b 100644 --- a/pkg/registry/file/storage_test.go +++ b/pkg/registry/file/storage_test.go @@ -549,7 +549,6 @@ func TestStorageImpl_Versioner(t *testing.T) { func BenchmarkWriteFiles(b *testing.B) { s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot).(*StorageImpl) - ctx := context.TODO() key := "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/kubescape/toto" obj := &v1beta1.SBOMSPDXv2p3{ ObjectMeta: v1.ObjectMeta{ @@ -563,7 +562,7 @@ func BenchmarkWriteFiles(b *testing.B) { } metaOut := &v1beta1.SBOMSPDXv2p3{} for i := 0; i < b.N; i++ { - _ = s.writeFiles(ctx, key, obj, metaOut) + _ = s.writeFiles(key, obj, metaOut) } b.ReportAllocs() }
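
For reference, a minimal usage sketch of the per-key mutex introduced in mutex.go, written as if it lived alongside it in package file. The guardedWrite helper and its caller are illustrative only and not part of the patch; note that TryLock returns false once it exhausts maxRetry attempts, so the sketch checks the result before entering the critical section.

package file

import "fmt"

// guardedWrite runs fn while holding the per-key lock for key.
// It returns false if the lock could not be acquired within maxRetry attempts.
func guardedWrite(locks *Mutex[string], key string, fn func()) bool {
	if !locks.TryLock(key) {
		return false
	}
	defer locks.Unlock(key)
	fn()
	return true
}

func exampleGuardedWrite() {
	locks := NewMapMutex[string]()
	ok := guardedWrite(locks, "some/key", func() {
		// critical section: file operations for this key only
	})
	fmt.Println("lock acquired:", ok) // other keys remain lockable concurrently
}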