From 11720c4ca625021aa2bd5ab5b775941b4c07e795 Mon Sep 17 00:00:00 2001 From: Mark Cilia Vincenti Date: Sat, 3 May 2025 10:36:06 +0200 Subject: [PATCH] Locking perf improvements --- .../Internal/Caching/AsyncDuplicateLock.cs | 65 ------------------- TildeSql/Internal/Common/AsyncLock.cs | 42 ------------ TildeSql/Internal/QueryEngine.cs | 40 +++++------- TildeSql/TildeSql.csproj | 1 + 4 files changed, 19 insertions(+), 129 deletions(-) delete mode 100644 TildeSql/Internal/Caching/AsyncDuplicateLock.cs delete mode 100644 TildeSql/Internal/Common/AsyncLock.cs diff --git a/TildeSql/Internal/Caching/AsyncDuplicateLock.cs b/TildeSql/Internal/Caching/AsyncDuplicateLock.cs deleted file mode 100644 index 227d095..0000000 --- a/TildeSql/Internal/Caching/AsyncDuplicateLock.cs +++ /dev/null @@ -1,65 +0,0 @@ -namespace TildeSql.Internal.Caching; - -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -/// -/// https://stackoverflow.com/a/31194647/616522 -/// -public sealed class AsyncDuplicateLock { - private sealed class RefCounted { - public RefCounted(T value) { - RefCount = 1; - Value = value; - } - - public int RefCount { get; set; } - - public T Value { get; private set; } - } - - private static readonly Dictionary> SemaphoreSlims = new Dictionary>(); - - private SemaphoreSlim GetOrCreate(object key) { - RefCounted item; - lock (SemaphoreSlims) { - if (SemaphoreSlims.TryGetValue(key, out item)) { - ++item.RefCount; - } - else { - item = new RefCounted(new SemaphoreSlim(1, 1)); - SemaphoreSlims[key] = item; - } - } - - return item.Value; - } - - public IDisposable Lock(object key) { - GetOrCreate(key).Wait(); - return new Releaser { Key = key }; - } - - public async Task LockAsync(object key) { - await GetOrCreate(key).WaitAsync().ConfigureAwait(false); - return new Releaser { Key = key }; - } - - private sealed class Releaser : IDisposable { - public object Key { get; set; } - - public void Dispose() { - RefCounted item; - lock (SemaphoreSlims) { - item = SemaphoreSlims[Key]; - --item.RefCount; - if (item.RefCount == 0) - SemaphoreSlims.Remove(Key); - } - - item.Value.Release(); - } - } -} \ No newline at end of file diff --git a/TildeSql/Internal/Common/AsyncLock.cs b/TildeSql/Internal/Common/AsyncLock.cs deleted file mode 100644 index 689803e..0000000 --- a/TildeSql/Internal/Common/AsyncLock.cs +++ /dev/null @@ -1,42 +0,0 @@ -namespace TildeSql.Internal.Common { - using System; - using System.Threading; - using System.Threading.Tasks; - - /// - /// https://www.hanselman.com/blog/comparing-two-techniques-in-net-asynchronous-coordination-primitives - /// - internal sealed class AsyncLock { - private readonly SemaphoreSlim semaphore = new(1, 1); - - private readonly Task releaser; - - public AsyncLock() { - this.releaser = Task.FromResult((IDisposable)new Releaser(this)); - } - - public Task LockAsync(CancellationToken cancellationToken = default) { - var wait = this.semaphore.WaitAsync(cancellationToken); - return wait.IsCompleted - ? this.releaser - : wait.ContinueWith( - (_, state) => (IDisposable)state, - this.releaser.Result, - cancellationToken, - TaskContinuationOptions.ExecuteSynchronously, - TaskScheduler.Default); - } - - private sealed class Releaser : IDisposable { - private readonly AsyncLock toRelease; - - internal Releaser(AsyncLock toRelease) { - this.toRelease = toRelease; - } - - public void Dispose() { - this.toRelease.semaphore.Release(); - } - } - } -} \ No newline at end of file diff --git a/TildeSql/Internal/QueryEngine.cs b/TildeSql/Internal/QueryEngine.cs index 5225ecf..388d4b4 100644 --- a/TildeSql/Internal/QueryEngine.cs +++ b/TildeSql/Internal/QueryEngine.cs @@ -5,6 +5,8 @@ using System.Threading; using System.Threading.Tasks; + using AsyncKeyedLock; + using Microsoft.Extensions.Caching.Distributed; using Microsoft.Extensions.Caching.Memory; @@ -17,7 +19,7 @@ using TildeSql.UnitOfWork; class QueryEngine : IAsyncDisposable, IDisposable { - private readonly AsyncLock mutex = new(); + private readonly AsyncNonKeyedLocker mutex = new(); private readonly ISchema schema; @@ -26,7 +28,7 @@ class QueryEngine : IAsyncDisposable, IDisposable { private readonly UnitOfWork unitOfWork; private readonly IPersistenceQueryExecutor persistenceQueryExecutor; - + private readonly CacheExecutor cacheExecutor; private readonly CacheSetter cacheSetter; @@ -58,12 +60,12 @@ public QueryEngine( IDistributedCache distributedCache, ICacheSerializer cacheSerializer, CacheOptions cacheOptions) { - this.schema = schema; - this.identityMap = identityMap; - this.unitOfWork = unitOfWork; + this.schema = schema; + this.identityMap = identityMap; + this.unitOfWork = unitOfWork; this.persistenceQueryExecutor = persistenceQueryExecutor; - this.serializer = serializer; - this.identityMapExecutor = new IdentityMapExecutor(this.identityMap, unitOfWork); + this.serializer = serializer; + this.identityMapExecutor = new IdentityMapExecutor(this.identityMap, unitOfWork); if (memoryCache != null || distributedCache != null) { this.cacheExecutor = new CacheExecutor(memoryCache, distributedCache, cacheSerializer, cacheOptions); this.cacheSetter = new CacheSetter(memoryCache, distributedCache, cacheSerializer, cacheOptions); @@ -81,20 +83,14 @@ public async IAsyncEnumerable GetResult(IQuery query) && !this.identityMapQueries.Contains(query) && !(this.cacheExecutorQueries?.Contains(query) ?? false) && !this.persistenceQueryExecutorQueries.Contains(query)) { - var @lock = await this.mutex.LockAsync(); - - try { - // query has not been executed, so let's flush existing queries and then add - await this.FlushPersistenceAsync(); - if (!this.queriesToExecute.Contains(query)) { - this.Add(query); - } - - await this.ExecuteAsync(); - } - finally { - @lock?.Dispose(); + using var _ = await this.mutex.LockAsync(); + // query has not been executed, so let's flush existing queries and then add + await this.FlushPersistenceAsync(); + if (!this.queriesToExecute.Contains(query)) { + this.Add(query); } + + await this.ExecuteAsync(); } if (this.queryForwardMap.TryGetValue(query, out var queries)) { @@ -165,7 +161,7 @@ T HydrateDocument(object[] row) { } public async ValueTask EnsureCleanAsync(CancellationToken cancellationToken = default) { - using var @lock = await this.mutex.LockAsync(cancellationToken); + using var _ = await this.mutex.LockAsync(cancellationToken); await this.FlushPersistenceAsync(); // clear out any non-read queries await this.ExecuteAsync(cancellationToken); // execute any non-executed queries await this.FlushPersistenceAsync(); // ensure that they're also read @@ -231,7 +227,7 @@ async IAsyncEnumerable ExecuteAgainstCacheAsync() { // stampede protection, we take a lock on all cacheable queries var cacheableQueries = queriesStillToExecute.Where(q => q.CacheKey != null).OrderBy(q => q.CacheKey).ToArray(); var locks = new Dictionary(); - var locker = new AsyncDuplicateLock(); + var locker = new AsyncKeyedLocker(o => o.PoolSize = 0); try { foreach (var cacheableQuery in cacheableQueries) { if (!locks.ContainsKey(cacheableQuery.CacheKey)) { diff --git a/TildeSql/TildeSql.csproj b/TildeSql/TildeSql.csproj index d972e54..3020505 100644 --- a/TildeSql/TildeSql.csproj +++ b/TildeSql/TildeSql.csproj @@ -13,6 +13,7 @@ +