Skip to content

Commit

Permalink
fix: fix the deadlock caused by metrics gauge callback (#597)
Browse files Browse the repository at this point in the history
### Motivation

Fixed the deadlock caused by the metrics register lock. 




FYI:
[block.log](https://github.com/user-attachments/files/18522124/block.log)
  • Loading branch information
mattisonchao authored Jan 23, 2025
1 parent 11e8833 commit 851f57d
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr_build_and_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ jobs:
--time-limit 60 --concurrency 2n --latency 10 --latency-dist uniform
- name: Upload test results
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
if: failure()
with:
name: maelstrom-result
Expand Down
33 changes: 33 additions & 0 deletions common/collection/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2025 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collection

type Map[K comparable, V any] interface {
Put(key K, value V)
Get(key K) (value V, found bool)
Remove(key K)
Keys() []K
Empty() bool
Size() int
Clear()
Values() []V
String() string
}

func NewVisibleMap[K comparable, V any]() Map[K, V] {
return &visibleMap[K, V]{
container: make(map[K]V),
}
}
94 changes: 94 additions & 0 deletions common/collection/visible_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2025 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collection

import (
"encoding/json"
"sync/atomic"
)

// visibleMap is a generic struct designed to provide a thread-safe mapping.
// It uses atomic operations to support safe concurrent reads of the size.
//
// K: The type of the key, which must be comparable.
// V: The type of the value, which can be any type (any).
//
// Fields:
// - container: A map storing key-value pairs of type map[K]V.
// - size: An atomic integer storing the number of elements in the map,
// using atomic.Int32 to support efficient concurrent reads.
//
// This struct is designed to allow multiple goroutines to safely read size
// without needing locks, thereby improving performance. However, write operations
// (such as adding or removing elements) must still ensure thread safety,
// potentially requiring additional synchronization mechanisms.
type visibleMap[K comparable, V any] struct {
container map[K]V
size atomic.Int32
}

func (v *visibleMap[K, V]) Put(key K, value V) {
if _, exist := v.Get(key); !exist {
v.size.Add(1)
}
v.container[key] = value
}

func (v *visibleMap[K, V]) Get(key K) (value V, found bool) {
value, found = v.container[key]
return value, found
}
func (v *visibleMap[K, V]) Remove(key K) {
if _, exist := v.Get(key); exist {
v.size.Add(-1)
}
delete(v.container, key)
}

func (v *visibleMap[K, V]) Keys() []K {
keys := make([]K, 0, len(v.container))
for key := range v.container {
keys = append(keys, key)
}
return keys
}

func (v *visibleMap[K, V]) Values() []V {
values := make([]V, 0, len(v.container))
for key := range v.container {
values = append(values, v.container[key])
}
return values
}

func (v *visibleMap[K, V]) Empty() bool {
return v.size.Load() == 0
}

func (v *visibleMap[K, V]) Size() int {
return int(v.size.Load())
}
func (v *visibleMap[K, V]) Clear() {
v.container = make(map[K]V)
v.size.Store(0)
}

func (v *visibleMap[K, V]) String() string {
data, err := json.Marshal(v.container)
if err != nil {
panic(err)
}
return string(data)
}
91 changes: 91 additions & 0 deletions common/collection/visible_map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2025 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collection

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestVisibleMap(t *testing.T) {
vm := NewVisibleMap[string, int]()

size := vm.Size()
assert.Equal(t, size, 0)
assert.True(t, vm.Empty())
vm.Put("one", 1)
val, found := vm.Get("one")
assert.Equal(t, val, 1)
assert.True(t, found)

// test repeat put
vm.Put("one", 10)
val, found = vm.Get("one")
assert.Equal(t, val, 10)
assert.True(t, found)
assert.Equal(t, vm.Size(), 1)

vm.Put("two", 2)
vm.Put("three", 3)
assert.Equal(t, vm.Size(), 3)

keys := vm.Keys()
values := vm.Values()
expectedKeys := []string{"one", "two", "three"}
expectedValues := []int{10, 2, 3}

for _, key := range expectedKeys {
found := false
for _, k := range keys {
if k == key {
found = true
break
}
}
if !found {
t.Errorf("Expected key %s not found", key)
}
}

for _, value := range expectedValues {
found := false
for _, v := range values {
if v == value {
found = true
break
}
}
if !found {
t.Errorf("Expected value %d not found", value)
}
}

vm.Remove("two")
_, found = vm.Get("two")
assert.False(t, found)
assert.Equal(t, vm.Size(), 2)

vm.Clear()
assert.Equal(t, vm.Size(), 0)
assert.True(t, vm.Empty())

vm.Put("four", 4)
output := vm.String()
assert.Equal(t, "{\"four\":4}", output)

vm.Clear()
output = vm.String()
assert.Equal(t, "{}", output)
}
4 changes: 2 additions & 2 deletions server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func startSession(sessionId SessionId, sessionMetadata *proto.SessionMetadata, s
slog.Int64("shard", sm.shardId),
),
}
sm.sessions[sessionId] = s
sm.sessions.Put(sessionId, s)

s.ctx, s.cancel = context.WithCancel(context.Background())

Expand Down Expand Up @@ -184,7 +184,7 @@ func (s *session) waitForHeartbeats() {
s.Unlock()

s.sm.Lock()
delete(s.sm.sessions, s.id)
s.sm.sessions.Remove(s.id)
s.sm.expiredSessions.Inc()
s.sm.Unlock()
}
Expand Down
17 changes: 8 additions & 9 deletions server/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"context"
"fmt"
"github.com/streamnative/oxia/common/collection"
"io"
"log/slog"
"net/url"
Expand Down Expand Up @@ -79,7 +80,7 @@ type sessionManager struct {
leaderController *leaderController
namespace string
shardId int64
sessions map[SessionId]*session
sessions collection.Map[SessionId, *session]
log *slog.Logger

ctx context.Context
Expand All @@ -94,7 +95,7 @@ type sessionManager struct {
func NewSessionManager(ctx context.Context, namespace string, shardId int64, controller *leaderController) SessionManager {
labels := metrics.LabelsForShard(namespace, shardId)
sm := &sessionManager{
sessions: make(map[SessionId]*session),
sessions: collection.NewVisibleMap[SessionId, *session](),
namespace: namespace,
shardId: shardId,
leaderController: controller,
Expand All @@ -117,9 +118,7 @@ func NewSessionManager(ctx context.Context, namespace string, shardId int64, con

sm.activeSessions = metrics.NewGauge("oxia_server_session_active",
"The number of sessions currently active", "count", labels, func() int64 {
sm.RLock()
defer sm.RUnlock()
return int64(len(sm.sessions))
return int64(sm.sessions.Size())
})

return sm
Expand Down Expand Up @@ -172,7 +171,7 @@ func (sm *sessionManager) createSession(request *proto.CreateSessionRequest, min
}

func (sm *sessionManager) getSession(sessionId int64) (*session, error) {
s, found := sm.sessions[SessionId(sessionId)]
s, found := sm.sessions.Get(SessionId(sessionId))
if !found {
sm.log.Warn(
"Session not found",
Expand Down Expand Up @@ -201,7 +200,7 @@ func (sm *sessionManager) CloseSession(request *proto.CloseSessionRequest) (*pro
sm.Unlock()
return nil, err
}
delete(sm.sessions, s.id)
sm.sessions.Remove(s.id)
sm.Unlock()
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -294,8 +293,8 @@ func (sm *sessionManager) Close() error {
sm.Lock()
defer sm.Unlock()
sm.cancel()
for _, s := range sm.sessions {
delete(sm.sessions, s.id)
for _, s := range sm.sessions.Values() {
sm.sessions.Remove(s.id)
s.Lock()
s.closeChannels()
s.Unlock()
Expand Down

0 comments on commit 851f57d

Please sign in to comment.