Skip to content

Commit 4758944

Browse files
authored
Key Locks (#41)
1 parent b2f1b72 commit 4758944

File tree

2 files changed

+187
-0
lines changed

2 files changed

+187
-0
lines changed

pkg/store/locks/locks.go

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
The locks package implements a key-based lock mechanism that uses a crc32 hash to
3+
distribute keys across a fixed number of locks. This allows for concurrent access
4+
to different keys without contention, but fixes the number of locks (and therefore the
5+
amount of available concurrency) to ensure that memory usage is bounded.
6+
*/
7+
package locks
8+
9+
import (
10+
"hash/crc32"
11+
"sync"
12+
)
13+
14+
const DefaultCount = 1024
15+
16+
// KeyLocks are used to prevent concurrent writes to the same key and to allow multiple
17+
// concurrent reads using a sync.RWMutex. The keys are distributed across a fixed number
18+
// to preven unbounded memory growth; so it is possible that two different keys will
19+
// share the same lock. Collection keys (keys without object ids or versions), object
20+
// prefix keys, and specific version keys are all locked with the same data structure.
21+
type KeyLock struct {
22+
count uint32
23+
locks []sync.RWMutex
24+
table *crc32.Table
25+
}
26+
27+
// Create a new KeyLock with the given number of locks. The greater nlocks is, the
28+
// greater concurrency there is across the entire key space at the cost of more memory.
29+
// We recommend allocating at least 1024 locks to ensure suitable performance.
30+
func New(nlocks uint32) *KeyLock {
31+
if nlocks == 0 {
32+
nlocks = DefaultCount
33+
}
34+
35+
return &KeyLock{
36+
count: nlocks,
37+
locks: make([]sync.RWMutex, nlocks),
38+
table: crc32.MakeTable(crc32.Koopman),
39+
}
40+
}
41+
42+
// Acquire a write lock for the given key.
43+
func (k *KeyLock) Lock(key []byte) {
44+
k.locks[crc32.Checksum(key, k.table)%k.count].Lock()
45+
}
46+
47+
// Unlock the write lock for the specified key.
48+
func (k *KeyLock) Unlock(key []byte) {
49+
k.locks[crc32.Checksum(key, k.table)%k.count].Unlock()
50+
}
51+
52+
// Acquire a read lock for the given key. Multiple read locks can be acquired
53+
// concurrently, but a write lock cannot be acquired while any read locks are held.
54+
func (k *KeyLock) RLock(key []byte) {
55+
k.locks[crc32.Checksum(key, k.table)%k.count].RLock()
56+
}
57+
58+
// Release a read lock for the specified key.
59+
func (k *KeyLock) RUnlock(key []byte) {
60+
k.locks[crc32.Checksum(key, k.table)%k.count].RUnlock()
61+
}
62+
63+
// Return the index of the lock for the given key (useful for debugging).
64+
func (k *KeyLock) Index(key []byte) int {
65+
return int(crc32.Checksum(key, k.table) % k.count)
66+
}

pkg/store/locks/locks_test.go

+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package locks_test
2+
3+
import (
4+
"crypto/rand"
5+
random "math/rand/v2"
6+
"sync"
7+
"testing"
8+
9+
"github.com/rotationalio/honu/pkg/store/locks"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestLocking(t *testing.T) {
14+
mu := locks.New(0)
15+
k1 := []byte{28, 38, 142, 19, 36, 13, 138, 224, 208, 120, 174, 202, 245, 249, 3, 170, 219, 30, 199, 222, 214, 181, 20, 190, 160, 228, 153, 169, 50, 227, 233, 158, 213, 202, 71, 43, 228, 172, 50, 245, 129, 145, 45, 241, 52}
16+
k2 := []byte{130, 255, 26, 113, 234, 106, 222, 190, 243, 205, 105, 236, 252, 191, 170, 22, 103, 99, 205, 208, 252, 137, 32, 197, 180, 40, 46, 90, 120, 224, 109, 215, 124, 190, 67, 208, 114, 208, 64, 13, 144, 88, 188, 112, 144}
17+
k3 := []byte{43, 190, 204, 198, 110, 31, 120, 141, 165, 0, 172, 63, 39, 97, 37, 6, 189, 186, 23, 208, 124, 212, 163, 232, 153, 81, 105, 133, 147, 247, 192, 66, 253, 243, 163, 225, 35, 22, 230, 42, 55, 4, 83, 74, 240}
18+
19+
// Make sure the three keys have different indexes or this test will fail.
20+
require.NotEqual(t, mu.Index(k1), mu.Index(k2))
21+
require.NotEqual(t, mu.Index(k1), mu.Index(k3))
22+
require.NotEqual(t, mu.Index(k2), mu.Index(k3))
23+
24+
mu.Lock(k1)
25+
mu.Lock(k2)
26+
mu.Lock(k3)
27+
28+
mu.Unlock(k2)
29+
mu.RLock(k2)
30+
mu.RLock(k2)
31+
mu.RLock(k2)
32+
33+
mu.RUnlock(k2)
34+
mu.RUnlock(k2)
35+
mu.RUnlock(k2)
36+
37+
mu.Unlock(k1)
38+
mu.Unlock(k3)
39+
}
40+
41+
func TestContention(t *testing.T) {
42+
mu := locks.New(128)
43+
wg := sync.WaitGroup{}
44+
45+
wg.Add(1024)
46+
for i := 0; i < 1024; i++ {
47+
go func() {
48+
defer wg.Done()
49+
for j := 0; j < 512; j++ {
50+
k := RandomKey()
51+
mu.Lock(k)
52+
mu.Unlock(k)
53+
}
54+
}()
55+
}
56+
57+
wg.Add(2048)
58+
for i := 0; i < 2048; i++ {
59+
go func() {
60+
defer wg.Done()
61+
for j := 0; j < 1024; j++ {
62+
k := RandomKey()
63+
mu.RLock(k)
64+
mu.RUnlock(k)
65+
}
66+
}()
67+
}
68+
69+
wg.Wait()
70+
}
71+
72+
func BenchmarkLocks(b *testing.B) {
73+
keys := make([][]byte, 128)
74+
for i := 0; i < 128; i++ {
75+
keys[i] = RandomKey()
76+
}
77+
78+
runContention := func(f func([]byte)) {
79+
wg := sync.WaitGroup{}
80+
wg.Add(1024)
81+
for i := 0; i < 1024; i++ {
82+
go func() {
83+
defer wg.Done()
84+
for j := 0; j < 512; j++ {
85+
k := keys[random.IntN(128)]
86+
f(k)
87+
}
88+
}()
89+
}
90+
wg.Wait()
91+
}
92+
93+
b.Run("Mutex", func(b *testing.B) {
94+
var mu sync.Mutex
95+
for i := 0; i < b.N; i++ {
96+
runContention(func(k []byte) {
97+
mu.Lock()
98+
_ = len(k)
99+
mu.Unlock()
100+
})
101+
}
102+
})
103+
104+
b.Run("KeyLock", func(b *testing.B) {
105+
mu := locks.New(128)
106+
for i := 0; i < b.N; i++ {
107+
runContention(func(k []byte) {
108+
mu.Lock(k)
109+
_ = len(k)
110+
mu.Unlock(k)
111+
})
112+
}
113+
})
114+
115+
}
116+
117+
func RandomKey() []byte {
118+
k := make([]byte, 45)
119+
rand.Read(k)
120+
return k
121+
}

0 commit comments

Comments
 (0)