Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache: readShard: use a separate mutex #4274

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions internal/cache/clockpro.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,25 +147,11 @@ func (c *shard) getWithMaybeReadEntry(k key, desireReadEntry bool) (*Value, *rea
e.referenced.Store(true)
}
}
c.mu.RUnlock()
var re *readEntry
if value == nil && desireReadEntry {
c.mu.Lock()
// After the c.mu.RUnlock(), someone could have inserted the value in the
// cache. We could tolerate the race and do a file read, or do another map
// lookup. We choose to do the latter, since the cost of a map lookup is
// insignificant compared to the cost of reading a block from a file.
if e, _ := c.blocks.Get(k); e != nil {
value = e.acquireValue()
if value != nil {
e.referenced.Store(true)
}
}
if value == nil {
re = c.readShard.acquireReadEntryLocked(k)
}
c.mu.Unlock()
re = c.readShard.acquireReadEntry(k)
}
c.mu.RUnlock()
if value == nil {
c.misses.Add(1)
} else {
Expand Down
110 changes: 50 additions & 60 deletions internal/cache/read_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/swiss"
)

Expand Down Expand Up @@ -39,16 +40,15 @@ import (
//
// Design choices and motivation:
//
// - readShard is tightly integrated with a cache shard: At its core,
// readShard is a map with synchronization. For the same reason the cache is
// sharded (for higher concurrency by sharding the mutex), it is beneficial
// to shard synchronization on readShard. By making readShard a member of
// shard, this sharding is trivially accomplished. Additionally, the code
// feels cleaner when there isn't a race between a cache miss, followed by
// creating a readEntry that is no longer needed because someone else has
// done the read since the miss and inserted into the cache. By making the
// readShard use shard.mu, such a race is avoided. A side benefit is that
// the cache interaction can be hidden behind readEntry.SetReadValue. One
// - At its core, readShard is a map with synchronization. For the same reason
// the cache is sharded (for higher concurrency by sharding the mutex), it
// is beneficial to shard synchronization on readShard. By making readShard
// a member of shard, this sharding is trivially accomplished. readShard has
// its own mutex (separate from shard.mu), in order to avoid write-locking
// shard.mu when we start a read.
//
// - readShard is integrated with the corresponding cache shard; this allows
// the cache interaction to be hidden behind readEntry.SetReadValue. One
// disadvantage of this tightly integrated design is that it does not
// encompass readers that will put the read value into a block.BufferPool --
// we don't worry about those since block.BufferPool is only used for
Expand All @@ -69,10 +69,8 @@ type readShard struct {
// shard is only used for locking, and calling shard.Set.
shard *shard
// Protected by the mutex below (readShard has its own lock, separate from shard.mu).
//
// shard.mu is never held when acquiring readEntry.mu. shard.mu is a shared
// resource and must be released quickly.
shardMu struct {
mu struct {
sync.Mutex
readMap swiss.Map[key, *readEntry]
}
}
Expand All @@ -82,27 +80,35 @@ func (rs *readShard) Init(shard *shard) *readShard {
shard: shard,
}
// Choice of 16 is arbitrary.
rs.shardMu.readMap.Init(16)
rs.mu.readMap.Init(16)
return rs
}

// acquireReadEntryLocked gets a *readEntry for (id, fileNum, offset). shard.mu is
// already write locked.
func (rs *readShard) acquireReadEntryLocked(k key) *readEntry {
e, ok := rs.shardMu.readMap.Get(k)
if !ok {
e = newReadEntry(rs, k)
rs.shardMu.readMap.Put(k, e)
} else {
e.refCount.acquireAllowZero()
// acquireReadEntry acquires a *readEntry for (id, fileNum, offset), creating
// one if necessary.
func (rs *readShard) acquireReadEntry(k key) *readEntry {
rs.mu.Lock()
defer rs.mu.Unlock()

if e, ok := rs.mu.readMap.Get(k); ok {
// An entry we found in the map while holding the mutex must have a non-zero
// reference count.
if e.refCount < 1 {
panic("invalid reference count")
}
e.refCount++
return e
}

e := newReadEntry(rs, k)
rs.mu.readMap.Put(k, e)
return e
}

func (rs *readShard) lenForTesting() int {
rs.shard.mu.Lock()
defer rs.shard.mu.Unlock()
return rs.shardMu.readMap.Len()
rs.mu.Lock()
defer rs.mu.Unlock()
return rs.mu.readMap.Len()
}

// readEntry is used to coordinate between concurrent attempted readers of the
Expand Down Expand Up @@ -146,10 +152,8 @@ type readEntry struct {
errorDuration time.Duration
readStart time.Time
}
// Count of ReadHandles that refer to this readEntry. Increments always hold
// shard.mu. So if this is found to be 0 while holding shard.mu, it is safe
// to delete readEntry from readShard.shardMu.readMap.
refCount refcnt
// Count of ReadHandles that refer to this readEntry. Protected by readShard.mu.
refCount int32
}

var readEntryPool = sync.Pool{
Expand All @@ -163,8 +167,8 @@ func newReadEntry(rs *readShard, k key) *readEntry {
*e = readEntry{
readShard: rs,
key: k,
refCount: 1,
}
e.refCount.init(1)
return e
}

Expand Down Expand Up @@ -261,40 +265,26 @@ func (e *readEntry) waitForReadPermissionOrHandle(

// unrefAndTryRemoveFromMap reduces the reference count of e and removes e.key
// => e from the readMap if necessary.
//
// It is possible that after unreffing that s.e has already been removed, and
// is now back in the sync.Pool, or being reused (for the same or different
// key). This is because after unreffing, which caused the s.e.refCount to
// become zero, but before acquiring shard.mu, it could have been incremented
// and decremented concurrently, and some other goroutine could have observed
// a different decrement to 0, and raced ahead and deleted s.e from the
// readMap.
func (e *readEntry) unrefAndTryRemoveFromMap() {
// Save the fields we need from the entry; once we release the last refcount,
// it is possible that the entry is found and reused and then freed.
rs := e.readShard
k := e.key
if !e.refCount.release() {
rs.mu.Lock()
e.refCount--
if e.refCount > 0 {
// Entry still in use.
rs.mu.Unlock()
return
}
// Once we release the refcount, it is possible that the entry is reused
// again and freed before we get the lock.
rs.shard.mu.Lock()
e2, ok := rs.shardMu.readMap.Get(k)
if !ok || e2 != e {
// Already removed.
rs.shard.mu.Unlock()
return
if e.refCount < 0 {
panic("invalid reference count")
}
if e.refCount.value() != 0 {
// The entry was reused.
rs.shard.mu.Unlock()
return
// The refcount is now 0; remove from the map.
if invariants.Enabled {
if e2, ok := rs.mu.readMap.Get(e.key); !ok || e2 != e {
panic("entry not in readMap")
}
}
// e.refCount == 0. And it cannot be incremented since
// shard.mu.Lock() is held. So remove from map.
rs.shardMu.readMap.Delete(k)
rs.shard.mu.Unlock()
rs.mu.readMap.Delete(e.key)
rs.mu.Unlock()

// Free s.e.
e.mu.v.Release()
Expand Down
15 changes: 2 additions & 13 deletions internal/cache/refcnt_normal.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,8 @@ func (v *refcnt) acquire() {
}
}

// acquireAllowZero is the same as acquire, but allows acquireAllowZero to be
// called with a zero refcnt. This is useful for cases where the entry which
// is being reference counted is inside a container and the container does not
// hold a reference. The container uses release() returning true to attempt to
// do a cleanup from the container.
func (v *refcnt) acquireAllowZero() {
v.val.Add(1)
}

// release decrements the reference count and returns true when the reference
// count becomes 0.
func (v *refcnt) release() bool {
switch v := v.val.Add(-1); {
case v < 0:
Expand All @@ -57,10 +50,6 @@ func (v *refcnt) release() bool {
}
}

func (v *refcnt) value() int32 {
return v.val.Load()
}

func (v *refcnt) trace(msg string) {
}

Expand Down
14 changes: 0 additions & 14 deletions internal/cache/refcnt_tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,6 @@ func (v *refcnt) acquire() {
v.trace("acquire")
}

// acquireAllowZero is the same as acquire, but allows acquireAllowZero to be
// called with a zero refcnt. This is useful for cases where the entry which
// is being reference counted is inside a container and the container does not
// hold a reference. The container uses release() returning true to attempt to
// do a cleanup from the container.
func (v *refcnt) acquireAllowZero() {
v.val.Add(1)
v.trace("acquire")
}

func (v *refcnt) release() bool {
n := v.val.Add(-1)
switch {
Expand All @@ -61,10 +51,6 @@ func (v *refcnt) release() bool {
return n == 0
}

func (v *refcnt) value() int32 {
return v.val.Load()
}

func (v *refcnt) trace(msg string) {
s := fmt.Sprintf("%s: refs=%d\n%s", msg, v.refs(), debug.Stack())
v.Lock()
Expand Down
Loading