From 5da6149827ff48be496b195429f40164df543c3c Mon Sep 17 00:00:00 2001 From: Alex Peck Date: Sun, 4 Sep 2022 15:30:00 -0700 Subject: [PATCH 1/5] multirem --- BitFaster.Caching/Lfu/ConcurrentLfu.cs | 102 +++++++++++++++++++++++-- 1 file changed, 97 insertions(+), 5 deletions(-) diff --git a/BitFaster.Caching/Lfu/ConcurrentLfu.cs b/BitFaster.Caching/Lfu/ConcurrentLfu.cs index 81f7ec8f..d68e25e2 100644 --- a/BitFaster.Caching/Lfu/ConcurrentLfu.cs +++ b/BitFaster.Caching/Lfu/ConcurrentLfu.cs @@ -56,6 +56,8 @@ public sealed class ConcurrentLfu : ICache, IAsyncCache, IBoun private readonly IScheduler scheduler; + private readonly LfuNode[] removeList = new LfuNode[Environment.ProcessorCount]; + #if NETSTANDARD2_0 private readonly LfuNode[] drainBuffer; #endif @@ -276,14 +278,50 @@ private void AfterWrite(LfuNode node) TryScheduleDrain(); + var rem = TryDequeue(); + if (rem != null) + { + this.dictionary.TryRemove(rem.Key, out var _); + Disposer.Dispose(rem.Value); + } + spinner.SpinOnce(); } - lock (this.maintenanceLock) + while (true) { - // if the write was dropped from the buffer, explicitly pass it to maintenance - Maintenance(node); + + var rem = TryDequeue(); + if (rem != null) + { + this.dictionary.TryRemove(rem.Key, out var _); + Disposer.Dispose(rem.Value); + } + + bool wasTaken = false; + Monitor.TryEnter(this.maintenanceLock, ref wasTaken); + try + { + if (wasTaken) + { + Maintenance(node); + return; + } + } + finally + { + if (wasTaken) + { + Monitor.Exit(this.maintenanceLock); + } + } } + + //lock (this.maintenanceLock) + //{ + // // if the write was dropped from the buffer, explicitly pass it to maintenance + // Maintenance(node); + //} } private void ScheduleAfterWrite() @@ -411,6 +449,8 @@ private bool Maintenance(LfuNode droppedWrite = null) this.capacity.OptimizePartitioning(this.metrics, this.cmSketch.ResetSampleSize); ReFitProtected(); + RemoveAll(); + // Reset to idle if either // 1. We drained both input buffers (all work done) // 2. or scheduler is foreground (since don't run continuously on the foreground) @@ -424,6 +464,57 @@ private bool Maintenance(LfuNode droppedWrite = null) return wasDrained; } + private void TryAddToRemoveList(LfuNode node) + { + for (int i = 0; i < removeList.Length; i++) + { + if (Volatile.Read(ref removeList[i]) == null) + { + if (Interlocked.CompareExchange(ref removeList[i], node, null) == null) + { + return; + } + } + } + + // direct remove if not added to list + this.dictionary.TryRemove(node.Key, out var _); + Disposer.Dispose(node.Value); + } + + private LfuNode TryDequeue() + { + for (int i = 0; i < removeList.Length; i++) + { + var node = Volatile.Read(ref removeList[i]); + if (node != null) + { + if (Interlocked.CompareExchange(ref removeList[i], null, node) == node) + { + return node; + } + } + } + + return null; + } + + private void RemoveAll() + { + for (int i = 0; i < removeList.Length; i++) + { + var node = Volatile.Read(ref removeList[i]); + if (node != null) + { + if (Interlocked.CompareExchange(ref removeList[i], null, node) == node) + { + this.dictionary.TryRemove(node.Key, out var _); + Disposer.Dispose(node.Value); + } + } + } + } + private void OnAccess(LfuNode node) { // there was a cache hit even if the item was removed or is not yet added. @@ -606,9 +697,10 @@ private bool AdmitCandidate(K candidateKey, K victimKey) private void Evict(LfuNode evictee) { - this.dictionary.TryRemove(evictee.Key, out var _); + //this.dictionary.TryRemove(evictee.Key, out var _); + TryAddToRemoveList(evictee); evictee.list.Remove(evictee); - Disposer.Dispose(evictee.Value); +// Disposer.Dispose(evictee.Value); this.metrics.evictedCount++; } From 0269a53272402fe96e9d2fb300f44109e258ac87 Mon Sep 17 00:00:00 2001 From: Alex Peck Date: Sun, 4 Sep 2022 15:49:55 -0700 Subject: [PATCH 2/5] pad --- BitFaster.Caching/Lfu/ConcurrentLfu.cs | 50 ++++++++++++-------------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/BitFaster.Caching/Lfu/ConcurrentLfu.cs b/BitFaster.Caching/Lfu/ConcurrentLfu.cs index d68e25e2..254d040d 100644 --- a/BitFaster.Caching/Lfu/ConcurrentLfu.cs +++ b/BitFaster.Caching/Lfu/ConcurrentLfu.cs @@ -56,7 +56,10 @@ public sealed class ConcurrentLfu : ICache, IAsyncCache, IBoun private readonly IScheduler scheduler; - private readonly LfuNode[] removeList = new LfuNode[Environment.ProcessorCount]; + const int padding = 64; + const int spacing = 64; + + private readonly LfuNode[] removeList = new LfuNode[padding + (Environment.ProcessorCount * spacing)]; #if NETSTANDARD2_0 private readonly LfuNode[] drainBuffer; @@ -278,26 +281,13 @@ private void AfterWrite(LfuNode node) TryScheduleDrain(); - var rem = TryDequeue(); - if (rem != null) - { - this.dictionary.TryRemove(rem.Key, out var _); - Disposer.Dispose(rem.Value); - } + TryHelpRemove(); spinner.SpinOnce(); } while (true) { - - var rem = TryDequeue(); - if (rem != null) - { - this.dictionary.TryRemove(rem.Key, out var _); - Disposer.Dispose(rem.Value); - } - bool wasTaken = false; Monitor.TryEnter(this.maintenanceLock, ref wasTaken); try @@ -315,6 +305,8 @@ private void AfterWrite(LfuNode node) Monitor.Exit(this.maintenanceLock); } } + + TryHelpRemove(); } //lock (this.maintenanceLock) @@ -466,11 +458,12 @@ private bool Maintenance(LfuNode droppedWrite = null) private void TryAddToRemoveList(LfuNode node) { - for (int i = 0; i < removeList.Length; i++) + for (int i = 0; i < Environment.ProcessorCount; i++) { - if (Volatile.Read(ref removeList[i]) == null) + int index = padding + (i * spacing); + if (Volatile.Read(ref removeList[index]) == null) { - if (Interlocked.CompareExchange(ref removeList[i], node, null) == null) + if (Interlocked.CompareExchange(ref removeList[index], node, null) == null) { return; } @@ -482,31 +475,32 @@ private void TryAddToRemoveList(LfuNode node) Disposer.Dispose(node.Value); } - private LfuNode TryDequeue() + private void TryHelpRemove() { - for (int i = 0; i < removeList.Length; i++) + for (int i = 0; i < Environment.ProcessorCount; i++) { - var node = Volatile.Read(ref removeList[i]); + int index = padding + (i * spacing); + var node = Volatile.Read(ref removeList[index]); if (node != null) { - if (Interlocked.CompareExchange(ref removeList[i], null, node) == node) + if (Interlocked.CompareExchange(ref removeList[index], null, node) == node) { - return node; + this.dictionary.TryRemove(node.Key, out var _); + Disposer.Dispose(node.Value); } } } - - return null; } private void RemoveAll() { - for (int i = 0; i < removeList.Length; i++) + for (int i = 0; i < Environment.ProcessorCount; i++) { - var node = Volatile.Read(ref removeList[i]); + int index = padding + (i * spacing); + var node = Volatile.Read(ref removeList[index]); if (node != null) { - if (Interlocked.CompareExchange(ref removeList[i], null, node) == node) + if (Interlocked.CompareExchange(ref removeList[index], null, node) == node) { this.dictionary.TryRemove(node.Key, out var _); Disposer.Dispose(node.Value); From 7074a7d4f9aad916c4b1be17ef5a32f17c061bb6 Mon Sep 17 00:00:00 2001 From: Alex Peck Date: Sun, 4 Sep 2022 17:22:27 -0700 Subject: [PATCH 3/5] buff --- BitFaster.Caching/Lfu/ConcurrentLfu.cs | 96 ++++++++++++++++---------- 1 file changed, 59 insertions(+), 37 deletions(-) diff --git a/BitFaster.Caching/Lfu/ConcurrentLfu.cs b/BitFaster.Caching/Lfu/ConcurrentLfu.cs index 254d040d..b6132fea 100644 --- a/BitFaster.Caching/Lfu/ConcurrentLfu.cs +++ b/BitFaster.Caching/Lfu/ConcurrentLfu.cs @@ -7,6 +7,7 @@ using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; +using System.Xml.Linq; using BitFaster.Caching.Buffers; using BitFaster.Caching.Concurrent; using BitFaster.Caching.Lru; @@ -59,7 +60,9 @@ public sealed class ConcurrentLfu : ICache, IAsyncCache, IBoun const int padding = 64; const int spacing = 64; - private readonly LfuNode[] removeList = new LfuNode[padding + (Environment.ProcessorCount * spacing)]; + // private readonly LfuNode[] removeList = new LfuNode[padding + (Environment.ProcessorCount * spacing)]; + + private readonly MpmcBoundedBuffer> removeList2 = new MpmcBoundedBuffer>(Environment.ProcessorCount); #if NETSTANDARD2_0 private readonly LfuNode[] drainBuffer; @@ -281,7 +284,7 @@ private void AfterWrite(LfuNode node) TryScheduleDrain(); - TryHelpRemove(); + //TryHelpRemove(); spinner.SpinOnce(); } @@ -306,7 +309,7 @@ private void AfterWrite(LfuNode node) } } - TryHelpRemove(); + //TryHelpRemove(); } //lock (this.maintenanceLock) @@ -458,54 +461,73 @@ private bool Maintenance(LfuNode droppedWrite = null) private void TryAddToRemoveList(LfuNode node) { - for (int i = 0; i < Environment.ProcessorCount; i++) + //for (int i = 0; i < Environment.ProcessorCount; i++) + //{ + // int index = padding + (i * spacing); + // if (Volatile.Read(ref removeList[index]) == null) + // { + // if (Interlocked.CompareExchange(ref removeList[index], node, null) == null) + // { + // return; + // } + // } + //} + + if (removeList2.TryAdd(node) != BufferStatus.Success) { - int index = padding + (i * spacing); - if (Volatile.Read(ref removeList[index]) == null) - { - if (Interlocked.CompareExchange(ref removeList[index], node, null) == null) - { - return; - } - } + // direct remove if not added to list + this.dictionary.TryRemove(node.Key, out var _); + Disposer.Dispose(node.Value); + } - // direct remove if not added to list - this.dictionary.TryRemove(node.Key, out var _); - Disposer.Dispose(node.Value); } private void TryHelpRemove() { - for (int i = 0; i < Environment.ProcessorCount; i++) + //for (int i = 0; i < Environment.ProcessorCount; i++) + //{ + // int index = padding + (i * spacing); + // var node = Volatile.Read(ref removeList[index]); + // if (node != null) + // { + // if (Interlocked.CompareExchange(ref removeList[index], null, node) == node) + // { + // this.dictionary.TryRemove(node.Key, out var _); + // Disposer.Dispose(node.Value); + // } + // } + //} + + if (removeList2.TryTake(out var node) != BufferStatus.Success) { - int index = padding + (i * spacing); - var node = Volatile.Read(ref removeList[index]); - if (node != null) - { - if (Interlocked.CompareExchange(ref removeList[index], null, node) == node) - { - this.dictionary.TryRemove(node.Key, out var _); - Disposer.Dispose(node.Value); - } - } + // direct remove if not added to list + this.dictionary.TryRemove(node.Key, out var _); + Disposer.Dispose(node.Value); + } } private void RemoveAll() { - for (int i = 0; i < Environment.ProcessorCount; i++) + //for (int i = 0; i < Environment.ProcessorCount; i++) + //{ + // int index = padding + (i * spacing); + // var node = Volatile.Read(ref removeList[index]); + // if (node != null) + // { + // if (Interlocked.CompareExchange(ref removeList[index], null, node) == node) + // { + // this.dictionary.TryRemove(node.Key, out var _); + // Disposer.Dispose(node.Value); + // } + // } + //} + + while (removeList2.TryTake(out var node) != BufferStatus.Empty) { - int index = padding + (i * spacing); - var node = Volatile.Read(ref removeList[index]); - if (node != null) - { - if (Interlocked.CompareExchange(ref removeList[index], null, node) == node) - { - this.dictionary.TryRemove(node.Key, out var _); - Disposer.Dispose(node.Value); - } - } + this.dictionary.TryRemove(node.Key, out var _); + Disposer.Dispose(node.Value); } } From 7121e47c14bdca93d8e52c2df816b690043a85aa Mon Sep 17 00:00:00 2001 From: Alex Peck Date: Thu, 8 Sep 2022 14:20:07 -0700 Subject: [PATCH 4/5] buffer --- BitFaster.Caching/Lfu/ConcurrentLfu.cs | 240 +++++++++++++++---------- 1 file changed, 149 insertions(+), 91 deletions(-) diff --git a/BitFaster.Caching/Lfu/ConcurrentLfu.cs b/BitFaster.Caching/Lfu/ConcurrentLfu.cs index 60b9af8d..142b230f 100644 --- a/BitFaster.Caching/Lfu/ConcurrentLfu.cs +++ b/BitFaster.Caching/Lfu/ConcurrentLfu.cs @@ -7,7 +7,6 @@ using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; -using System.Xml.Linq; using BitFaster.Caching.Buffers; using BitFaster.Caching.Concurrent; using BitFaster.Caching.Lru; @@ -57,12 +56,8 @@ public sealed class ConcurrentLfu : ICache, IAsyncCache, IBoun private readonly IScheduler scheduler; - const int padding = 64; - const int spacing = 64; - - // private readonly LfuNode[] removeList = new LfuNode[padding + (Environment.ProcessorCount * spacing)]; - - private readonly MpmcBoundedBuffer> removeList2 = new MpmcBoundedBuffer>(Environment.ProcessorCount); + private readonly MpmcBoundedBuffer>> cleanBufferQueue = new MpmcBoundedBuffer>>(Environment.ProcessorCount); + private readonly MpmcBoundedBuffer>> deleteBufferQueue = new MpmcBoundedBuffer>>(Environment.ProcessorCount); #if NETSTANDARD2_0 private readonly LfuNode[] drainBuffer; @@ -176,6 +171,7 @@ public void Trim(int itemCount) { // flush all buffers Maintenance(); + DeleteAllBufferedItems(); // walk in lru order, get itemCount keys to evict TakeCandidatesInLruOrder(this.probationLru, candidates, itemCount); @@ -325,51 +321,70 @@ private void AfterWrite(LfuNode node) } TryScheduleDrain(); - //TryHelpRemove(); - + TryDeleteBufferedItems(); } - while (true) + //var spinner = new SpinWait(); + //while (true) + //{ + // bool wasTaken = false; + // Monitor.TryEnter(this.maintenanceLock, ref wasTaken); + // try + // { + // if (wasTaken) + // { + // // aggressively try to exit the lock early before doing full maintenance + // var status = BufferStatus.Contended; + // while (status != BufferStatus.Full) + // { + // status = writeBuffer.TryAdd(node); + + // if (status == BufferStatus.Success) + // { + // ScheduleAfterWrite(); + // return; + // } + // } + + // Maintenance(node); + // DeleteAllBufferedItems(); + // return; + // } + // } + // finally + // { + // if (wasTaken) + // { + // Monitor.Exit(this.maintenanceLock); + // } + // } + + // TryDeleteBufferedItems(); + + // //spinner.SpinOnce(); + //} + + lock (this.maintenanceLock) { - bool wasTaken = false; - Monitor.TryEnter(this.maintenanceLock, ref wasTaken); - try + // aggressively try to exit the lock early before doing full maintenance + var status = BufferStatus.Contended; + while (status != BufferStatus.Full) { - // aggressively try to exit the lock early before doing full maintenance - var status = BufferStatus.Contended; - while (status != BufferStatus.Full) - { - status = writeBuffer.TryAdd(node); - - if (status == BufferStatus.Success) - { - ScheduleAfterWrite(); - return; - } - } - - if (wasTaken) + status = writeBuffer.TryAdd(node); + + if (status == BufferStatus.Success) { - Maintenance(node); + ScheduleAfterWrite(); return; } } - finally - { - if (wasTaken) - { - Monitor.Exit(this.maintenanceLock); - } - } - //TryHelpRemove(); + // if the write was dropped from the buffer, explicitly pass it to maintenance + Maintenance(node); } - //lock (this.maintenanceLock) - //{ - // // if the write was dropped from the buffer, explicitly pass it to maintenance - // Maintenance(node); - //} + // remove outside the lock + DeleteAllBufferedItems(); } private void ScheduleAfterWrite() @@ -449,6 +464,9 @@ private void DrainBuffers() done = Maintenance(); } + // remove outside the lock + DeleteAllBufferedItems(); + // don't run continuous foreground maintenance if (!scheduler.IsBackground) { @@ -499,11 +517,10 @@ private bool Maintenance(LfuNode droppedWrite = null) ReturnDrainBuffer(localDrainBuffer); EvictEntries(); + QueueDeleteBuffer(); this.capacity.OptimizePartitioning(this.metrics, this.cmSketch.ResetSampleSize); ReFitProtected(); - RemoveAll(); - // Reset to idle if either // 1. We drained both input buffers (all work done) // 2. or scheduler is foreground (since don't run continuously on the foreground) @@ -519,73 +536,114 @@ private bool Maintenance(LfuNode droppedWrite = null) private void TryAddToRemoveList(LfuNode node) { - //for (int i = 0; i < Environment.ProcessorCount; i++) - //{ - // int index = padding + (i * spacing); - // if (Volatile.Read(ref removeList[index]) == null) - // { - // if (Interlocked.CompareExchange(ref removeList[index], node, null) == null) - // { - // return; - // } - // } - //} + deleteBuffer.Add(node); - if (removeList2.TryAdd(node) != BufferStatus.Success) + if (deleteBuffer.Count >= deleteBufferSize) { - // direct remove if not added to list - this.dictionary.TryRemove(node.Key, out var _); - Disposer.Dispose(node.Value); + // int attempts = 0; - } + while (true) + { + if (this.deleteBufferQueue.TryAdd(deleteBuffer) == BufferStatus.Success) + { + if (this.cleanBufferQueue.TryTake(out deleteBuffer) == BufferStatus.Success) + { + return; + } + + deleteBuffer = new List>(deleteBufferSize); + return; + } + } + + //foreach (var n in deleteBuffer) + //{ + // this.dictionary.TryRemove(n.Key, out var _); + // Disposer.Dispose(n.Value); + //} + //deleteBuffer.Clear(); + } } - private void TryHelpRemove() + private void QueueDeleteBuffer() { - //for (int i = 0; i < Environment.ProcessorCount; i++) + if (deleteBuffer.Count == 0) + { + return; + } + + while (true) + { + if (this.deleteBufferQueue.TryAdd(deleteBuffer) == BufferStatus.Success) + { + if (this.cleanBufferQueue.TryTake(out deleteBuffer) == BufferStatus.Success) + { + return; + } + + deleteBuffer = new List>(deleteBufferSize); + return; + } + } + + //if (this.deleteBufferQueue.TryAdd(deleteBuffer) != BufferStatus.Success) //{ - // int index = padding + (i * spacing); - // var node = Volatile.Read(ref removeList[index]); - // if (node != null) + // foreach (var n in deleteBuffer) // { - // if (Interlocked.CompareExchange(ref removeList[index], null, node) == node) - // { - // this.dictionary.TryRemove(node.Key, out var _); - // Disposer.Dispose(node.Value); - // } + // this.dictionary.TryRemove(n.Key, out var _); + // Disposer.Dispose(n.Value); + // } + + // deleteBuffer.Clear(); + //} + //else + //{ + // if (this.cleanBufferQueue.TryTake(out deleteBuffer) == BufferStatus.Success) + // { + // return; // } + + // deleteBuffer = new List>(deleteBufferSize); //} + } - if (removeList2.TryTake(out var node) != BufferStatus.Success) + const int deleteBufferSize = 128; + private List> deleteBuffer = new List>(deleteBufferSize); + + private void TryDeleteBufferedItems() + { + if (deleteBufferQueue.TryTake(out var buffer) == BufferStatus.Success) { - // direct remove if not added to list - this.dictionary.TryRemove(node.Key, out var _); - Disposer.Dispose(node.Value); + foreach (var n in buffer) + { + this.dictionary.TryRemove(n.Key, out var _); + Disposer.Dispose(n.Value); + } + buffer.Clear(); + this.cleanBufferQueue.TryAdd(buffer); } } - private void RemoveAll() + private void DeleteAllBufferedItems() { - //for (int i = 0; i < Environment.ProcessorCount; i++) - //{ - // int index = padding + (i * spacing); - // var node = Volatile.Read(ref removeList[index]); - // if (node != null) - // { - // if (Interlocked.CompareExchange(ref removeList[index], null, node) == node) - // { - // this.dictionary.TryRemove(node.Key, out var _); - // Disposer.Dispose(node.Value); - // } - // } - //} - - while (removeList2.TryTake(out var node) != BufferStatus.Empty) + var spinner = new SpinWait(); + while (deleteBufferQueue.TryTake(out var buffer) != BufferStatus.Empty) { - this.dictionary.TryRemove(node.Key, out var _); - Disposer.Dispose(node.Value); + if (buffer != null) + { + foreach (var n in buffer) + { + this.dictionary.TryRemove(n.Key, out var _); + Disposer.Dispose(n.Value); + } + + buffer.Clear(); + this.cleanBufferQueue.TryAdd(buffer); + } + + spinner.SpinOnce(); } } From 3da2e1f9d4cf41d7e46ce81237745260b3725fa1 Mon Sep 17 00:00:00 2001 From: Alex Peck Date: Thu, 8 Sep 2022 16:20:59 -0700 Subject: [PATCH 5/5] deletebuffer --- BitFaster.Caching/Lfu/ConcurrentLfu.cs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/BitFaster.Caching/Lfu/ConcurrentLfu.cs b/BitFaster.Caching/Lfu/ConcurrentLfu.cs index 142b230f..430967bf 100644 --- a/BitFaster.Caching/Lfu/ConcurrentLfu.cs +++ b/BitFaster.Caching/Lfu/ConcurrentLfu.cs @@ -324,7 +324,8 @@ private void AfterWrite(LfuNode node) TryDeleteBufferedItems(); } - //var spinner = new SpinWait(); + ////////////////////////////////////////////////////////////////////////////////////////////////// + //while (true) //{ // bool wasTaken = false; @@ -347,8 +348,7 @@ private void AfterWrite(LfuNode node) // } // Maintenance(node); - // DeleteAllBufferedItems(); - // return; + // break; // } // } // finally @@ -358,11 +358,11 @@ private void AfterWrite(LfuNode node) // Monitor.Exit(this.maintenanceLock); // } // } + //} - // TryDeleteBufferedItems(); + //DeleteAllBufferedItems(); - // //spinner.SpinOnce(); - //} + ////////////////////////////////////////////////////////////////////////////////////////////////// lock (this.maintenanceLock) { @@ -483,6 +483,7 @@ private void DrainBuffers() private bool Maintenance(LfuNode droppedWrite = null) { this.drainStatus.Set(DrainStatus.ProcessingToIdle); + var localDrainBuffer = RentDrainBuffer(); // extract to a buffer before doing book keeping work, ~2x faster @@ -518,6 +519,7 @@ private bool Maintenance(LfuNode droppedWrite = null) EvictEntries(); QueueDeleteBuffer(); + this.capacity.OptimizePartitioning(this.metrics, this.cmSketch.ResetSampleSize); ReFitProtected();