diff --git a/src/EventStore.Core.Tests/ClusterVNodeOptionsExtensions.cs b/src/EventStore.Core.Tests/ClusterVNodeOptionsExtensions.cs index 7d198e16342..c5003cdc945 100644 --- a/src/EventStore.Core.Tests/ClusterVNodeOptionsExtensions.cs +++ b/src/EventStore.Core.Tests/ClusterVNodeOptionsExtensions.cs @@ -12,6 +12,7 @@ public static ClusterVNodeOptions ReduceMemoryUsageForTests(this ClusterVNodeOpt Database = options.Database with { ChunkSize = MiniNode.ChunkSize, ChunksCacheSize = MiniNode.CachedChunkSize, + ChunkCachePooling = false, StreamExistenceFilterSize = 10_000, ScavengeBackendCacheSize = 64 * 1024, } diff --git a/src/EventStore.Core.Tests/Helpers/MiniClusterNode.cs b/src/EventStore.Core.Tests/Helpers/MiniClusterNode.cs index 7eeb107cd35..6049822c44f 100644 --- a/src/EventStore.Core.Tests/Helpers/MiniClusterNode.cs +++ b/src/EventStore.Core.Tests/Helpers/MiniClusterNode.cs @@ -136,6 +136,7 @@ public MiniClusterNode(string pathname, int debugIndex, IPEndPoint internalTcp, MaxMemTableSize = memTableSize, MemDb = inMemDb, Db = _dbPath, + ChunkCachePooling = false, ChunkSize = MiniNode.ChunkSize, ChunksCacheSize = MiniNode.CachedChunkSize, StreamExistenceFilterSize = 10_000 diff --git a/src/EventStore.Core.Tests/Helpers/MiniNode.cs b/src/EventStore.Core.Tests/Helpers/MiniNode.cs index c626b41be07..06d1144fd89 100644 --- a/src/EventStore.Core.Tests/Helpers/MiniNode.cs +++ b/src/EventStore.Core.Tests/Helpers/MiniNode.cs @@ -115,6 +115,7 @@ public MiniNode(string pathname, Database = new() { ChunkSize = chunkSize, ChunksCacheSize = cachedChunkSize, + ChunkCachePooling = false, SkipDbVerify = true, StatsStorage = StatsStorage.None, MaxMemTableSize = memTableSize, diff --git a/src/EventStore.Core.Tests/Services/RedactionService/SwitchChunkFailureTests.cs b/src/EventStore.Core.Tests/Services/RedactionService/SwitchChunkFailureTests.cs index 3b8e2b06e3d..451b0be0858 100644 --- a/src/EventStore.Core.Tests/Services/RedactionService/SwitchChunkFailureTests.cs +++ b/src/EventStore.Core.Tests/Services/RedactionService/SwitchChunkFailureTests.cs @@ -131,7 +131,9 @@ public async Task cannot_switch_with_chunk_having_mismatched_range() { newChunk = $"{nameof(cannot_switch_with_chunk_having_mismatched_range)}-chunk-0-2.tmp"; var chunkHeader = new ChunkHeader(1, 1024, 0, 2, true, Guid.NewGuid()); - var chunk = TFChunk.CreateWithHeader(Path.Combine(PathName, newChunk), chunkHeader, 1024, false, false, false, 1, 1, false, + var chunk = TFChunk.CreateWithHeader( + new UnmanagedChunkCacheManager(), + Path.Combine(PathName, newChunk), chunkHeader, 1024, false, false, false, 1, 1, false, new TFChunkTracker.NoOp()); chunk.Dispose(); msg = await SwitchChunk(GetChunk(0, 0), newChunk); diff --git a/src/EventStore.Core.Tests/Services/Replication/LogReplication/LogReplicationWithExistingDbFixture.cs b/src/EventStore.Core.Tests/Services/Replication/LogReplication/LogReplicationWithExistingDbFixture.cs index 10fe3d58eee..17f724fca2c 100644 --- a/src/EventStore.Core.Tests/Services/Replication/LogReplication/LogReplicationWithExistingDbFixture.cs +++ b/src/EventStore.Core.Tests/Services/Replication/LogReplication/LogReplicationWithExistingDbFixture.cs @@ -38,6 +38,7 @@ protected static Task CreateChunk(TFChunkDb db, bool raw, bool complete, int chu chunkId: Guid.NewGuid()); var chunk = TFChunk.CreateWithHeader( + new UnmanagedChunkCacheManager(), filename: filename, header: header, fileSize: TFChunk.GetAlignedSize(db.Config.ChunkSize + ChunkHeader.Size + ChunkFooter.Size), diff --git a/src/EventStore.Core.Tests/TransactionLog/Optimization/tfchunkreader_existsat_optimizer_should.cs b/src/EventStore.Core.Tests/TransactionLog/Optimization/tfchunkreader_existsat_optimizer_should.cs index a341e145ad3..7c4eb243ce5 100644 --- a/src/EventStore.Core.Tests/TransactionLog/Optimization/tfchunkreader_existsat_optimizer_should.cs +++ b/src/EventStore.Core.Tests/TransactionLog/Optimization/tfchunkreader_existsat_optimizer_should.cs @@ -87,7 +87,9 @@ private TFChunk CreateChunk(int chunkNumber, bool scavenged) { private TFChunk CreateChunk(int chunkNumber, bool scavenged, out List posmap) { var map = new List(); - var chunk = TFChunk.CreateNew(GetFilePathFor("chunk-" + chunkNumber + "-" + Guid.NewGuid()), 1024 * 1024, + var chunk = TFChunk.CreateNew( + new UnmanagedChunkCacheManager(), + GetFilePathFor("chunk-" + chunkNumber + "-" + Guid.NewGuid()), 1024 * 1024, chunkNumber, chunkNumber, scavenged, false, false, false, Constants.TFChunkInitialReaderCountDefault, Constants.TFChunkMaxReaderCountDefault, diff --git a/src/EventStore.Core.Tests/TransactionLog/Scavenging/scavenged_chunk.cs b/src/EventStore.Core.Tests/TransactionLog/Scavenging/scavenged_chunk.cs index d08eb03519b..5e22fddfb55 100644 --- a/src/EventStore.Core.Tests/TransactionLog/Scavenging/scavenged_chunk.cs +++ b/src/EventStore.Core.Tests/TransactionLog/Scavenging/scavenged_chunk.cs @@ -11,7 +11,9 @@ public class scavenged_chunk : SpecificationWithFile { [Test] public void is_fully_resident_in_memory_when_cached() { var map = new List(); - var chunk = TFChunk.CreateNew(Filename, 1024 * 1024, 0, 0, true, false, false, false, + var chunk = TFChunk.CreateNew( + new UnmanagedChunkCacheManager(), + Filename, 1024 * 1024, 0, 0, true, false, false, false, Constants.TFChunkInitialReaderCountDefault, Constants.TFChunkMaxReaderCountDefault, false, diff --git a/src/EventStore.Core.Tests/TransactionLog/TFChunkHelper.cs b/src/EventStore.Core.Tests/TransactionLog/TFChunkHelper.cs index a283a8e2dca..72198e041aa 100644 --- a/src/EventStore.Core.Tests/TransactionLog/TFChunkHelper.cs +++ b/src/EventStore.Core.Tests/TransactionLog/TFChunkHelper.cs @@ -70,7 +70,9 @@ public static TFChunkDbConfig CreateDbConfig( } public static TFChunk CreateNewChunk(string fileName, int chunkSize = 4096, bool isScavenged = false) { - return TFChunk.CreateNew(fileName, chunkSize, 0, 0, + return TFChunk.CreateNew( + new UnmanagedChunkCacheManager(), + fileName, chunkSize, 0, 0, isScavenged: isScavenged, inMem: false, unbuffered: false, writethrough: false, initialReaderCount: Constants.TFChunkInitialReaderCountDefault, maxReaderCount: Constants.TFChunkMaxReaderCountDefault, reduceFileCachePressure: false, tracker: new TFChunkTracker.NoOp()); } diff --git a/src/EventStore.Core.Tests/TransactionLog/Validation/when_validating_tfchunk_db.cs b/src/EventStore.Core.Tests/TransactionLog/Validation/when_validating_tfchunk_db.cs index 7d953814feb..6b0424d29c7 100644 --- a/src/EventStore.Core.Tests/TransactionLog/Validation/when_validating_tfchunk_db.cs +++ b/src/EventStore.Core.Tests/TransactionLog/Validation/when_validating_tfchunk_db.cs @@ -446,7 +446,7 @@ public void when_prelast_chunk_corrupted_throw_hash_validation_exception() { .WriteTo.Sink(sink) .MinimumLevel.Verbose() .CreateLogger()) - using (var db = new TFChunkDb(config, new TFChunkTracker.NoOp(), log)) { + using (var db = new TFChunkDb(config, new UnmanagedChunkCacheManager(), new TFChunkTracker.NoOp(), log)) { byte[] contents = new byte[config.ChunkSize]; for (var i = 0; i < config.ChunkSize; i++) { contents[i] = 0; diff --git a/src/EventStore.Core.Tests/TransactionLog/when_opening_existing_tfchunk.cs b/src/EventStore.Core.Tests/TransactionLog/when_opening_existing_tfchunk.cs index feaa6c0c0f3..0184917d0e8 100644 --- a/src/EventStore.Core.Tests/TransactionLog/when_opening_existing_tfchunk.cs +++ b/src/EventStore.Core.Tests/TransactionLog/when_opening_existing_tfchunk.cs @@ -15,7 +15,9 @@ public override void TestFixtureSetUp() { base.TestFixtureSetUp(); _chunk = TFChunkHelper.CreateNewChunk(Filename); _chunk.Complete(); - _testChunk = TFChunk.FromCompletedFile(Filename, true, false, + _testChunk = TFChunk.FromCompletedFile( + new UnmanagedChunkCacheManager(), + Filename, true, false, Constants.TFChunkInitialReaderCountDefault, Constants.TFChunkMaxReaderCountDefault, reduceFileCachePressure: false, tracker: new TFChunkTracker.NoOp()); diff --git a/src/EventStore.Core.Tests/TransactionLog/when_opening_tfchunk_from_non_existing_file.cs b/src/EventStore.Core.Tests/TransactionLog/when_opening_tfchunk_from_non_existing_file.cs index 4604e10e2bb..73f22e007f6 100644 --- a/src/EventStore.Core.Tests/TransactionLog/when_opening_tfchunk_from_non_existing_file.cs +++ b/src/EventStore.Core.Tests/TransactionLog/when_opening_tfchunk_from_non_existing_file.cs @@ -8,7 +8,9 @@ namespace EventStore.Core.Tests.TransactionLog { public class when_opening_tfchunk_from_non_existing_file : SpecificationWithFile { [Test] public void it_should_throw_a_file_not_found_exception() { - Assert.Throws(() => TFChunk.FromCompletedFile(Filename, verifyHash: true, + Assert.Throws(() => TFChunk.FromCompletedFile( + new UnmanagedChunkCacheManager(), + Filename, verifyHash: true, unbufferedRead: false, initialReaderCount: Constants.TFChunkInitialReaderCountDefault, maxReaderCount: Constants.TFChunkMaxReaderCountDefault, reduceFileCachePressure: false, tracker: new TFChunkTracker.NoOp())); } } diff --git a/src/EventStore.Core.Tests/TransactionLog/when_reading_from_a_cached_tfchunk.cs b/src/EventStore.Core.Tests/TransactionLog/when_reading_from_a_cached_tfchunk.cs index a7164447673..517188a475b 100644 --- a/src/EventStore.Core.Tests/TransactionLog/when_reading_from_a_cached_tfchunk.cs +++ b/src/EventStore.Core.Tests/TransactionLog/when_reading_from_a_cached_tfchunk.cs @@ -29,7 +29,9 @@ public override void TestFixtureSetUp() { _result = _chunk.TryAppend(_record); _chunk.Flush(); _chunk.Complete(); - _cachedChunk = TFChunk.FromCompletedFile(Filename, verifyHash: true, unbufferedRead: false, + _cachedChunk = TFChunk.FromCompletedFile( + new UnmanagedChunkCacheManager(), + Filename, verifyHash: true, unbufferedRead: false, initialReaderCount: Constants.TFChunkInitialReaderCountDefault, maxReaderCount: Constants.TFChunkMaxReaderCountDefault, reduceFileCachePressure: false, tracker: new TFChunkTracker.NoOp()); _cachedChunk.CacheInMemory(); } diff --git a/src/EventStore.Core.Tests/TransactionLog/when_uncaching_a_tfchunk.cs b/src/EventStore.Core.Tests/TransactionLog/when_uncaching_a_tfchunk.cs index 24d8ed146bd..b7d03d415cb 100644 --- a/src/EventStore.Core.Tests/TransactionLog/when_uncaching_a_tfchunk.cs +++ b/src/EventStore.Core.Tests/TransactionLog/when_uncaching_a_tfchunk.cs @@ -29,7 +29,9 @@ public override void TestFixtureSetUp() { _result = _chunk.TryAppend(_record); _chunk.Flush(); _chunk.Complete(); - _uncachedChunk = TFChunk.FromCompletedFile(Filename, verifyHash: true, unbufferedRead: false, + _uncachedChunk = TFChunk.FromCompletedFile( + new UnmanagedChunkCacheManager(), + Filename, verifyHash: true, unbufferedRead: false, initialReaderCount: Constants.TFChunkInitialReaderCountDefault, maxReaderCount: Constants.TFChunkMaxReaderCountDefault, reduceFileCachePressure: false, tracker: new TFChunkTracker.NoOp()); _uncachedChunk.CacheInMemory(); _uncachedChunk.UnCacheFromMemory(); diff --git a/src/EventStore.Core.Tests/options.cs b/src/EventStore.Core.Tests/options.cs index 794ef680e6d..3d458545d28 100644 --- a/src/EventStore.Core.Tests/options.cs +++ b/src/EventStore.Core.Tests/options.cs @@ -62,7 +62,7 @@ public void all_keys_are_read_from_configuration() { nameof(ClusterVNodeOptions.TrustedRootCertificates), nameof(ClusterVNodeOptions.IndexBitnessVersion), nameof(ClusterVNodeOptions.Cluster.QuorumSize), - nameof(ClusterVNodeOptions.Database.ChunkSize), + nameof(ClusterVNodeOptions.Database.ChunkSize), //qq do we want to exclude this nameof(ClusterVNodeOptions.Database.StatsStorage), nameof(ClusterVNodeOptions.Unknown.Options), }; diff --git a/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs b/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs index a82db9413d6..d867c95bb21 100644 --- a/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs +++ b/src/EventStore.Core.XUnit.Tests/Scavenge/Infrastructure/Scenario.cs @@ -705,6 +705,7 @@ private void EmptyRequestedChunks(TFChunkDb db) { chunkId: Guid.NewGuid()); var newChunk = TFChunk.CreateWithHeader( + new UnmanagedChunkCacheManager(), filename: $"{chunk.FileName}.tmp", header: newChunkHeader, fileSize: ChunkHeader.Size, diff --git a/src/EventStore.Core.XUnit.Tests/TransactionLog/Chunks/PoolingChunkCacheManagerTests.cs b/src/EventStore.Core.XUnit.Tests/TransactionLog/Chunks/PoolingChunkCacheManagerTests.cs new file mode 100644 index 00000000000..c866f331e66 --- /dev/null +++ b/src/EventStore.Core.XUnit.Tests/TransactionLog/Chunks/PoolingChunkCacheManagerTests.cs @@ -0,0 +1,97 @@ +using System.Collections.Generic; +using EventStore.Core.TransactionLog.Chunks; +using Xunit; + +namespace EventStore.Core.XUnit.Tests.TransactionLog.Chunks; + +public class PoolingChunkCacheManagerTests { + readonly FakeInnerManager _recorder = new(); + + private PoolingChunkCacheManager CreateSut(int initialBuffers) => + new(_recorder, minBufferSize: 100, cleanBuffers: false, initialBuffers: initialBuffers); + + [Fact] + public void can_allocate_multiple_buffers() { + var sut = CreateSut(initialBuffers: 0); + sut.AllocateAtLeast(99); + sut.AllocateAtLeast(100); + sut.AllocateAtLeast(101); + Assert.Equal( + [ + "Allocated buffer 0 with 100 bytes", // respects minimum + "Allocated buffer 1 with 100 bytes", + "Allocated buffer 2 with 101 bytes", // can allocate larger + ], + _recorder.GetNewMessages()); + } + + [Fact] + public void can_reuse_buffers() { + var sut = CreateSut(initialBuffers: 0); + var p0 = sut.AllocateAtLeast(100); + sut.Free(p0); + + sut.AllocateAtLeast(101); + sut.AllocateAtLeast(100); // reuse p1 + sut.AllocateAtLeast(101); + + Assert.Equal( + [ + "Allocated buffer 0 with 100 bytes", + "Allocated buffer 1 with 101 bytes", + "Allocated buffer 2 with 101 bytes", + ], + _recorder.GetNewMessages()); + } + + [Fact] + public void can_reuse_smaller_buffers() { + var sut = CreateSut(initialBuffers: 0); + var p0 = sut.AllocateAtLeast(90); + sut.Free(p0); + + sut.AllocateAtLeast(95); + + Assert.Equal(["Allocated buffer 0 with 100 bytes"], _recorder.GetNewMessages()); + } + + [Fact] + public void can_create_initial_buffers() { + var sut = CreateSut(initialBuffers: 3); + + Assert.Equal( + [ + "Allocated buffer 0 with 100 bytes", + "Allocated buffer 1 with 100 bytes", + "Allocated buffer 2 with 100 bytes", + ], + _recorder.GetNewMessages()); + + sut.AllocateAtLeast(100); + sut.AllocateAtLeast(100); + sut.AllocateAtLeast(100); + + Assert.Equal([], _recorder.GetNewMessages()); + } + + class FakeInnerManager : IChunkCacheManager { + readonly List _messages = []; + nint _nextBuffer; + + public string[] GetNewMessages() { + var newMessages = _messages.ToArray(); + _messages.Clear(); + return newMessages; + } + + public nint AllocateAtLeast(int numBytes) { + var p = _nextBuffer++; + _messages.Add($"Allocated buffer {p} with {numBytes} bytes"); + return p; + } + + public void Free(nint pointer) { + _messages.Add($"Freed buffer {pointer}"); + } + } +} diff --git a/src/EventStore.Core/ClusterVNode.cs b/src/EventStore.Core/ClusterVNode.cs index 5e07bc7bf77..f048e88a215 100644 --- a/src/EventStore.Core/ClusterVNode.cs +++ b/src/EventStore.Core/ClusterVNode.cs @@ -306,7 +306,36 @@ public ClusterVNode(ClusterVNodeOptions options, metricsConfiguration ??= new(); MetricsBootstrapper.Bootstrap(metricsConfiguration, dbConfig, trackers); - Db = new TFChunkDb(dbConfig, tracker: trackers.TransactionFileTracker); + + + IChunkCacheManager cacheManager = new UnmanagedChunkCacheManager(); + + if (options.Database.ChunkCachePooling) { + var chunkTotalSize = dbConfig.ChunkSize + ChunkHeader.Size + ChunkFooter.Size; + var cacheSizeBytes = dbConfig.MaxChunksCacheSize.RoundUpToMultipleOf(chunkTotalSize); + var cacheSizePhysicalChunks = cacheSizeBytes / chunkTotalSize; + var initialBuffers = cacheSizePhysicalChunks + 2; + + var allowanceForPosMap = 16 * 1024 * 1024; //qq hand wave. 6% increase in chunk cache memory usage acceptable + + Log.Information($"#### Chunk cache pooling is enabled"); + Log.Information($"#### Pre-allocating {initialBuffers} initialBuffers based on chunk cache size of {cacheSizePhysicalChunks} chunks."); + + cacheManager = new PoolingChunkCacheManager( + inner: cacheManager, + minBufferSize: chunkTotalSize + allowanceForPosMap, + cleanBuffers: true, + initialBuffers: (int)initialBuffers); + } else { + Log.Information($"#### Chunk cache pooling is disabled"); + } + + + + Db = new TFChunkDb( + dbConfig, + cacheManager: cacheManager, + tracker: trackers.TransactionFileTracker); TFChunkDbConfig CreateDbConfig( out SystemStatsHelper statsHelper, diff --git a/src/EventStore.Core/ClusterVNodeOptions.cs b/src/EventStore.Core/ClusterVNodeOptions.cs index 6220cbe3bf5..909e5ca3683 100644 --- a/src/EventStore.Core/ClusterVNodeOptions.cs +++ b/src/EventStore.Core/ClusterVNodeOptions.cs @@ -415,6 +415,10 @@ public record ClusterOptions { [Description("Database Options")] public record DatabaseOptions { + //qq + [Description("Enabled Chunk Cache Buffer Pooling (experimental)")] + public bool ChunkCachePooling { get; init; } = true; + [Description("The minimum flush delay in milliseconds.")] public double MinFlushDelayMs { get; init; } = TFConsts.MinFlushDelayMs.TotalMilliseconds; @@ -579,6 +583,7 @@ public static int GetTFChunkMaxReaderCount(int readerThreadsCount, int chunkInit internal static DatabaseOptions FromConfiguration(IConfigurationRoot configurationRoot) => new() { + ChunkCachePooling = configurationRoot.GetValue(nameof(ChunkCachePooling)), MinFlushDelayMs = configurationRoot.GetValue(nameof(MinFlushDelayMs)), DisableScavengeMerging = configurationRoot.GetValue(nameof(DisableScavengeMerging)), MemDb = configurationRoot.GetValue(nameof(MemDb)), diff --git a/src/EventStore.Core/Services/RedactionService.cs b/src/EventStore.Core/Services/RedactionService.cs index 67863964107..1fbd8e0ce8f 100644 --- a/src/EventStore.Core/Services/RedactionService.cs +++ b/src/EventStore.Core/Services/RedactionService.cs @@ -244,6 +244,7 @@ private bool IsValidSwitchChunkRequest(string targetChunkFile, string newChunkFi try { // temporarily open the chunk to verify its integrity newChunk = TFChunk.FromCompletedFile( + _db.Manager.ChunkCacheManager, filename: newChunkPath, verifyHash: true, unbufferedRead: _db.Config.Unbuffered, diff --git a/src/EventStore.Core/TransactionLog/Chunks/ChunkCacheManager.cs b/src/EventStore.Core/TransactionLog/Chunks/ChunkCacheManager.cs new file mode 100644 index 00000000000..c7fb4570aef --- /dev/null +++ b/src/EventStore.Core/TransactionLog/Chunks/ChunkCacheManager.cs @@ -0,0 +1,117 @@ +using System; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using Serilog; + +namespace EventStore.Core.TransactionLog.Chunks; + +public interface IChunkCacheManager { + public nint AllocateAtLeast(int numBytes); + public void Free(nint pointer); +} + +public class UnmanagedChunkCacheManager : IChunkCacheManager { + private readonly object _lock = new(); + + // the actual lengths of all the buffers + private readonly Dictionary _bufferLengths = []; + +//qq public static UnmanagedChunkCacheManager Instance { get; } = new UnmanagedChunkCacheManager(); + + public nint AllocateAtLeast(int numBytes) { + lock (_lock) { + var pointer = Marshal.AllocHGlobal(numBytes); + GC.AddMemoryPressure(numBytes); + _bufferLengths[pointer] = numBytes; + return pointer; + } + } + + public void Free(nint pointer) { + lock (_lock) { + if (!_bufferLengths.Remove(pointer, out var bufferLength)) + throw new InvalidOperationException($"Attempted to free unknown buffer {pointer}"); + + Marshal.FreeHGlobal(pointer); + GC.RemoveMemoryPressure(bufferLength); + } + } +} + +// https://github.com/dotnet/runtime/blob/5535e31a712343a63f5d7d796cd874e563e5ac14/src/libraries/System.Private.CoreLib/src/System/Runtime/InteropServices/Marshal.Unix.cs +// https://github.com/dotnet/runtime/blob/5535e31a712343a63f5d7d796cd874e563e5ac14/src/libraries/System.Private.CoreLib/src/System/Runtime/InteropServices/Marshal.Windows.cs + +// This naive POC allocates whatever it needs and doesn't free anything. +public class PoolingChunkCacheManager : IChunkCacheManager { + private static readonly ILogger Log = Serilog.Log.ForContext(); + + private readonly object _lock = new(); + private readonly IChunkCacheManager _inner; + private readonly int _minBufferSize; + private readonly bool _cleanBuffers; + private readonly Dictionary _bufferLengths = []; // the actual lengths of all the buffers + private readonly Stack _freeBuffers = []; + + public PoolingChunkCacheManager( + IChunkCacheManager inner, + int minBufferSize, + bool cleanBuffers, + int initialBuffers) { + + _inner = inner; + _minBufferSize = minBufferSize; + _cleanBuffers = cleanBuffers; + + var buffers = new nint[initialBuffers]; + + for (var i = 0; i < initialBuffers; i++) + buffers[i] = AllocateAtLeast(minBufferSize); + + for (var i = 0; i < initialBuffers; i++) + Free(buffers[i]); + } + + // This reuses a free buffer if it can, otherwise allocates a new one. + public nint AllocateAtLeast(int requestedBytes) { + lock (_lock) { + var reason = ""; + + if (_freeBuffers.TryPeek(out var peeked)) { + if (_bufferLengths[peeked] >= requestedBytes) { + // found a buffer we can use. + _freeBuffers.Pop(); + Log.Information($"#### Saved allocating a buffer for the chunk cache by reusing"); + return peeked; + + } else { + reason = $"available buffer was too small ({_bufferLengths[peeked]:N0})"; + } + + } else { + reason = $"no buffers are available"; + } + + var actualBytesToAllocate = Math.Max(_minBufferSize, requestedBytes); + Log.Warning($"#### Allocating a new buffer of size {actualBytesToAllocate:N0} bytes for the chunk cache because {reason}"); + var p = _inner.AllocateAtLeast(actualBytesToAllocate); + _bufferLengths[p] = actualBytesToAllocate; + return p; + } + } + + unsafe public void Free(nint pointer) { + lock (_lock) { + if (!_bufferLengths.TryGetValue(pointer, out var bufferLength)) + throw new InvalidOperationException("Attempted to free unknown buffer"); + + Log.Information($"#### Returned a chunk cache buffer to the pool"); + + if (_cleanBuffers) { + var span = new Span((void*)pointer, bufferLength); + span.Clear(); + } + + _freeBuffers.Push(pointer); + } + } +} diff --git a/src/EventStore.Core/TransactionLog/Chunks/TFChunk/TFChunk.cs b/src/EventStore.Core/TransactionLog/Chunks/TFChunk/TFChunk.cs index 60bbe9e74c1..bb1b5f61701 100644 --- a/src/EventStore.Core/TransactionLog/Chunks/TFChunk/TFChunk.cs +++ b/src/EventStore.Core/TransactionLog/Chunks/TFChunk/TFChunk.cs @@ -131,10 +131,13 @@ private enum CacheStatus { private bool _unbuffered; private bool _writeThrough; private readonly bool _reduceFileCachePressure; + private readonly IChunkCacheManager _cacheManager; private IChunkReadSide _readSide; - private TFChunk(string filename, + private TFChunk( + IChunkCacheManager cacheManager, + string filename, int initialReaderCount, int maxReaderCount, int midpointsDepth, @@ -150,6 +153,7 @@ private TFChunk(string filename, "initialReaderCount is greater than maxReaderCount."); Ensure.Nonnegative(midpointsDepth, "midpointsDepth"); + _cacheManager = cacheManager; _filename = filename; _internalStreamsCount = initialReaderCount; _maxReaderCount = maxReaderCount; @@ -164,9 +168,9 @@ private TFChunk(string filename, FreeCachedData(); } - public static TFChunk FromCompletedFile(string filename, bool verifyHash, bool unbufferedRead, + public static TFChunk FromCompletedFile(IChunkCacheManager cacheManager, string filename, bool verifyHash, bool unbufferedRead, int initialReaderCount, int maxReaderCount, ITransactionFileTracker tracker, bool optimizeReadSideCache = false, bool reduceFileCachePressure = false) { - var chunk = new TFChunk(filename, initialReaderCount, maxReaderCount, + var chunk = new TFChunk(cacheManager, filename, initialReaderCount, maxReaderCount, TFConsts.MidpointsDepth, false, unbufferedRead, false, reduceFileCachePressure); try { chunk.InitCompleted(verifyHash, optimizeReadSideCache, tracker); @@ -178,9 +182,9 @@ public static TFChunk FromCompletedFile(string filename, bool verifyHash, bool u return chunk; } - public static TFChunk FromOngoingFile(string filename, int writePosition, bool checkSize, bool unbuffered, + public static TFChunk FromOngoingFile(IChunkCacheManager cacheManager, string filename, int writePosition, bool checkSize, bool unbuffered, bool writethrough, int initialReaderCount, int maxReaderCount, bool reduceFileCachePressure, ITransactionFileTracker tracker) { - var chunk = new TFChunk(filename, + var chunk = new TFChunk(cacheManager, filename, initialReaderCount, maxReaderCount, TFConsts.MidpointsDepth, @@ -197,7 +201,9 @@ public static TFChunk FromOngoingFile(string filename, int writePosition, bool c return chunk; } - public static TFChunk CreateNew(string filename, + public static TFChunk CreateNew( + IChunkCacheManager cacheManager, + string filename, int chunkSize, int chunkStartNumber, int chunkEndNumber, @@ -212,11 +218,13 @@ public static TFChunk CreateNew(string filename, var size = GetAlignedSize(chunkSize + ChunkHeader.Size + ChunkFooter.Size); var chunkHeader = new ChunkHeader(CurrentChunkVersion, chunkSize, chunkStartNumber, chunkEndNumber, isScavenged, Guid.NewGuid()); - return CreateWithHeader(filename, chunkHeader, size, inMem, unbuffered, writethrough, initialReaderCount, maxReaderCount, + return CreateWithHeader(cacheManager, filename, chunkHeader, size, inMem, unbuffered, writethrough, initialReaderCount, maxReaderCount, reduceFileCachePressure, tracker); } - public static TFChunk CreateWithHeader(string filename, + public static TFChunk CreateWithHeader( + IChunkCacheManager cacheManager, + string filename, ChunkHeader header, int fileSize, bool inMem, @@ -226,7 +234,7 @@ public static TFChunk CreateWithHeader(string filename, int maxReaderCount, bool reduceFileCachePressure, ITransactionFileTracker tracker) { - var chunk = new TFChunk(filename, + var chunk = new TFChunk(cacheManager, filename, initialReaderCount, maxReaderCount, TFConsts.MidpointsDepth, @@ -415,8 +423,7 @@ private void CreateInMemChunk(ChunkHeader chunkHeader, int fileSize) { // ALLOCATE MEM _cacheStatus = CacheStatus.Cached; _cachedLength = fileSize; - _cachedData = Marshal.AllocHGlobal(_cachedLength); - GC.AddMemoryPressure(_cachedLength); + _cachedData = _cacheManager.AllocateAtLeast(_cachedLength); // WRITER STREAM var memStream = @@ -696,8 +703,7 @@ private void BuildCacheArray() { var dataSize = IsReadOnly ? _physicalDataSize + ChunkFooter.MapSize : _chunkHeader.ChunkSize; _cachedLength = GetAlignedSize(ChunkHeader.Size + dataSize + ChunkFooter.Size); - var cachedData = Marshal.AllocHGlobal(_cachedLength); - GC.AddMemoryPressure(_cachedLength); + var cachedData = _cacheManager.AllocateAtLeast(_cachedLength); try { using (var unmanagedStream = new UnmanagedMemoryStream((byte*)cachedData, _cachedLength, @@ -715,8 +721,7 @@ private void BuildCacheArray() { } } } catch { - Marshal.FreeHGlobal(cachedData); - GC.RemoveMemoryPressure(_cachedLength); + _cacheManager.Free(cachedData); throw; } @@ -1068,8 +1073,7 @@ private void FreeCachedData() { lock (_cachedDataLock) { var cachedData = _cachedData; if (cachedData != IntPtr.Zero) { - Marshal.FreeHGlobal(cachedData); - GC.RemoveMemoryPressure(_cachedLength); + _cacheManager.Free(cachedData); _cachedData = IntPtr.Zero; _cacheStatus = CacheStatus.Uncached; Log.Debug("UNCACHED TFChunk {chunk}.", this); diff --git a/src/EventStore.Core/TransactionLog/Chunks/TFChunkDb.cs b/src/EventStore.Core/TransactionLog/Chunks/TFChunkDb.cs index 8333c1f515f..099ee0e65a1 100644 --- a/src/EventStore.Core/TransactionLog/Chunks/TFChunkDb.cs +++ b/src/EventStore.Core/TransactionLog/Chunks/TFChunkDb.cs @@ -16,12 +16,13 @@ public class TFChunkDb : IDisposable { private readonly ITransactionFileTracker _tracker; private int _closed; - public TFChunkDb(TFChunkDbConfig config, ITransactionFileTracker tracker = null, ILogger log = null) { + public TFChunkDb(TFChunkDbConfig config, IChunkCacheManager cacheManager = null, ITransactionFileTracker tracker = null, ILogger log = null) { Ensure.NotNull(config, "config"); Config = config; _tracker = tracker ?? new TFChunkTracker.NoOp(); - Manager = new TFChunkManager(Config, _tracker); + cacheManager ??= new UnmanagedChunkCacheManager(); + Manager = new TFChunkManager(Config, cacheManager, _tracker); _log = log ?? Serilog.Log.ForContext(); } @@ -72,7 +73,9 @@ public void Open(bool verifyHash = true, bool readOnly = false, int threads = 1) // but the actual last chunk is (lastChunkNum-1) one and it could be not completed yet -- perfectly valid situation. var footer = ReadChunkFooter(chunkInfo.ChunkFileName); if (footer.IsCompleted) - chunk = TFChunk.TFChunk.FromCompletedFile(chunkInfo.ChunkFileName, verifyHash: false, + chunk = TFChunk.TFChunk.FromCompletedFile( + Manager.ChunkCacheManager, + chunkInfo.ChunkFileName, verifyHash: false, unbufferedRead: Config.Unbuffered, initialReaderCount: Config.InitialReaderCount, maxReaderCount: Config.MaxReaderCount, @@ -80,7 +83,9 @@ public void Open(bool verifyHash = true, bool readOnly = false, int threads = 1) optimizeReadSideCache: Config.OptimizeReadSideCache, reduceFileCachePressure: Config.ReduceFileCachePressure); else { - chunk = TFChunk.TFChunk.FromOngoingFile(chunkInfo.ChunkFileName, Config.ChunkSize, + chunk = TFChunk.TFChunk.FromOngoingFile( + Manager.ChunkCacheManager, + chunkInfo.ChunkFileName, Config.ChunkSize, checkSize: false, unbuffered: Config.Unbuffered, writethrough: Config.WriteThrough, initialReaderCount: Config.InitialReaderCount, @@ -92,7 +97,9 @@ public void Open(bool verifyHash = true, bool readOnly = false, int threads = 1) chunk.Complete(); } } else { - chunk = TFChunk.TFChunk.FromCompletedFile(chunkInfo.ChunkFileName, verifyHash: false, + chunk = TFChunk.TFChunk.FromCompletedFile( + Manager.ChunkCacheManager, + chunkInfo.ChunkFileName, verifyHash: false, unbufferedRead: Config.Unbuffered, initialReaderCount: Config.InitialReaderCount, maxReaderCount: Config.MaxReaderCount, @@ -121,7 +128,9 @@ public void Open(bool verifyHash = true, bool readOnly = false, int threads = 1) var chunkHeader = ReadChunkHeader(chunkFileName); var chunkLocalPos = chunkHeader.GetLocalLogPosition(checkpoint); if (chunkHeader.IsScavenged) { - var lastChunk = TFChunk.TFChunk.FromCompletedFile(chunkFileName, verifyHash: false, + var lastChunk = TFChunk.TFChunk.FromCompletedFile( + Manager.ChunkCacheManager, + chunkFileName, verifyHash: false, unbufferedRead: Config.Unbuffered, initialReaderCount: Config.InitialReaderCount, maxReaderCount: Config.MaxReaderCount, @@ -148,7 +157,9 @@ public void Open(bool verifyHash = true, bool readOnly = false, int threads = 1) Manager.AddNewChunk(); } } else { - var lastChunk = TFChunk.TFChunk.FromOngoingFile(chunkFileName, (int)chunkLocalPos, checkSize: false, + var lastChunk = TFChunk.TFChunk.FromOngoingFile( + Manager.ChunkCacheManager, + chunkFileName, (int)chunkLocalPos, checkSize: false, unbuffered: Config.Unbuffered, writethrough: Config.WriteThrough, initialReaderCount: Config.InitialReaderCount, maxReaderCount: Config.MaxReaderCount, diff --git a/src/EventStore.Core/TransactionLog/Chunks/TFChunkManager.cs b/src/EventStore.Core/TransactionLog/Chunks/TFChunkManager.cs index 21a1018e587..671ae742cb2 100644 --- a/src/EventStore.Core/TransactionLog/Chunks/TFChunkManager.cs +++ b/src/EventStore.Core/TransactionLog/Chunks/TFChunkManager.cs @@ -28,12 +28,15 @@ public int ChunksCount { private int _backgroundPassesRemaining; private int _backgroundRunning; - public TFChunkManager(TFChunkDbConfig config, ITransactionFileTracker tracker) { + public TFChunkManager(TFChunkDbConfig config, IChunkCacheManager cacheManager, ITransactionFileTracker tracker) { Ensure.NotNull(config, "config"); _config = config; _tracker = tracker; + ChunkCacheManager = cacheManager; } + public IChunkCacheManager ChunkCacheManager { get; } + public void EnableCaching() { if (_chunksCount == 0) throw new Exception("No chunks in DB."); @@ -58,15 +61,19 @@ private void BackgroundCachingProcess(object state) { private void CacheUncacheReadOnlyChunks() { int lastChunkToCache; lock (_chunksLocker) { + //qq this used cache physical chunks of different sizes until the chunk cache is full + // now we are using fixed size buffers so we treat every physical chunk as the same size. + // if we had a large chunk cache combined with lots of data in the latest chunks being removed by scavenge + // and also no chunk merging, then quite a lot of space could be wasted. in later versions with dotnext + // it may be sensible to allocate smaller segments and assemble them into a stream via readonlysequence + // probably better if this behaviour was determined by the IChunkCacheManager long totalSize = 0; lastChunkToCache = _chunksCount; for (int chunkNum = _chunksCount - 1; chunkNum >= 0;) { var chunk = _chunks[chunkNum]; - var chunkSize = chunk.IsReadOnly - ? chunk.ChunkFooter.PhysicalDataSize + chunk.ChunkFooter.MapSize + ChunkHeader.Size + - ChunkFooter.Size - : chunk.ChunkHeader.ChunkSize + ChunkHeader.Size + ChunkFooter.Size; + var chunkSize = + chunk.ChunkHeader.ChunkSize + ChunkHeader.Size + ChunkFooter.Size; if (totalSize + chunkSize > _config.MaxChunksCacheSize) break; @@ -95,7 +102,9 @@ private void CacheUncacheReadOnlyChunks() { public TFChunk.TFChunk CreateTempChunk(ChunkHeader chunkHeader, int fileSize) { var chunkFileName = _config.FileNamingStrategy.GetTempFilename(); - return TFChunk.TFChunk.CreateWithHeader(chunkFileName, + return TFChunk.TFChunk.CreateWithHeader( + ChunkCacheManager, + chunkFileName, chunkHeader, fileSize, _config.InMemDb, @@ -111,7 +120,9 @@ public TFChunk.TFChunk AddNewChunk() { lock (_chunksLocker) { var chunkNumber = _chunksCount; var chunkName = _config.FileNamingStrategy.GetFilenameFor(chunkNumber, 0); - var chunk = TFChunk.TFChunk.CreateNew(chunkName, + var chunk = TFChunk.TFChunk.CreateNew( + ChunkCacheManager, + chunkName, _config.ChunkSize, chunkNumber, chunkNumber, @@ -139,7 +150,9 @@ public TFChunk.TFChunk AddNewChunk(ChunkHeader chunkHeader, int fileSize) { chunkHeader.ChunkStartNumber, chunkHeader.ChunkEndNumber, _chunksCount)); var chunkName = _config.FileNamingStrategy.GetFilenameFor(chunkHeader.ChunkStartNumber, 0); - var chunk = TFChunk.TFChunk.CreateWithHeader(chunkName, + var chunk = TFChunk.TFChunk.CreateWithHeader( + ChunkCacheManager, + chunkName, chunkHeader, fileSize, _config.InMemDb, @@ -204,7 +217,9 @@ public TFChunk.TFChunk SwitchChunk(TFChunk.TFChunk chunk, bool verifyHash, throw; } - newChunk = TFChunk.TFChunk.FromCompletedFile(newFileName, verifyHash, _config.Unbuffered, + newChunk = TFChunk.TFChunk.FromCompletedFile( + ChunkCacheManager, + newFileName, verifyHash, _config.Unbuffered, _config.InitialReaderCount, _config.MaxReaderCount, _tracker, _config.OptimizeReadSideCache, _config.ReduceFileCachePressure ); } diff --git a/src/EventStore.Core/TransactionLog/Chunks/TFChunkScavenger.cs b/src/EventStore.Core/TransactionLog/Chunks/TFChunkScavenger.cs index 2430155e306..35ffd20a877 100644 --- a/src/EventStore.Core/TransactionLog/Chunks/TFChunkScavenger.cs +++ b/src/EventStore.Core/TransactionLog/Chunks/TFChunkScavenger.cs @@ -185,7 +185,9 @@ private void ScavengeChunk(bool alwaysKeepScavenged, TFChunk.TFChunk oldChunk, TFChunk.TFChunk newChunk; try { - newChunk = TFChunk.TFChunk.CreateNew(tmpChunkPath, + newChunk = TFChunk.TFChunk.CreateNew( + _db.Manager.ChunkCacheManager, + tmpChunkPath, _db.Config.ChunkSize, chunkStartNumber, chunkEndNumber, @@ -421,7 +423,9 @@ private static bool MergeChunks( TFChunk.TFChunk newChunk; try { - newChunk = TFChunk.TFChunk.CreateNew(tmpChunkPath, + newChunk = TFChunk.TFChunk.CreateNew( + db.Manager.ChunkCacheManager, + tmpChunkPath, db.Config.ChunkSize, chunkStartNumber, chunkEndNumber, diff --git a/src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkManagerForExecutor.cs b/src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkManagerForExecutor.cs index 6d4120f3110..da32c02cbf2 100644 --- a/src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkManagerForExecutor.cs +++ b/src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkManagerForExecutor.cs @@ -16,6 +16,7 @@ public ChunkManagerForExecutor(ILogger logger, TFChunkManager manager, TFChunkDb _dbConfig = dbConfig; } + public IChunkCacheManager ChunkCacheManager => _manager.ChunkCacheManager; public IChunkWriterForExecutor CreateChunkWriter( IChunkReaderForExecutor sourceChunk) { diff --git a/src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkWriterForExecutor.cs b/src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkWriterForExecutor.cs index 222d6279149..3d61f1d07f3 100644 --- a/src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkWriterForExecutor.cs +++ b/src/EventStore.Core/TransactionLog/Scavenging/DbAccess/ChunkWriterForExecutor.cs @@ -34,6 +34,7 @@ public ChunkWriterForExecutor( // from TFChunkScavenger.ScavengeChunk FileName = Path.Combine(dbConfig.Path, Guid.NewGuid() + ".scavenge.tmp"); _outputChunk = TFChunk.CreateNew( + cacheManager: manager.ChunkCacheManager, filename: FileName, chunkSize: dbConfig.ChunkSize, chunkStartNumber: sourceChunk.ChunkStartNumber,