Skip to content

wip: defer initialization of completed chunks #4731

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 119 additions & 33 deletions src/EventStore.Core/TransactionLog/Chunks/TFChunk/TFChunk.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public enum ChunkVersions : byte {
private static readonly ILogger Log = Serilog.Log.ForContext<TFChunk>();

public bool IsReadOnly {
	//qq EnsureInitializedAsync
	// Backed by an int so the flag can be read and written atomically with a full fence.
	get => Interlocked.CompareExchange(ref _isReadOnly, 0, 0) is 1;
	set => Interlocked.Exchange(ref _isReadOnly, value ? 1 : 0);
}
Expand All @@ -52,15 +53,18 @@ public bool IsCached {
}

// if inMem, _handle is null but not remote
//qq EnsureInitializedAsync
// Remote when we hold a handle that is not the local-file implementation.
public bool IsRemote => _handle is not (null or ChunkFileHandle);

// the logical size of (untransformed) data (could be > PhysicalDataSize if scavenged chunk)
public long LogicalDataSize {
	//qq EnsureInitializedAsync
	// Interlocked.Read gives an atomic 64-bit read even on 32-bit platforms.
	get => Interlocked.Read(ref _logicalDataSize);
}

// the physical size of (untransformed) data
public int PhysicalDataSize {
	//qq EnsureInitializedAsync
	get => _physicalDataSize;
}

Expand All @@ -69,18 +73,22 @@ public string FileName {
}

public int FileSize {
	//qq EnsureInitializedAsync
	get => _fileSize;
}

public ChunkHeader ChunkHeader {
	//qq EnsureInitializedAsync
	get => _chunkHeader;
}

public ChunkFooter ChunkFooter {
	//qq EnsureInitializedAsync
	get => _chunkFooter;
}

public ChunkInfo ChunkInfo {
//qq EnsureInitializedAsync
get => new() {
ChunkFileName = _filename,
ChunkStartNumber = _chunkHeader.ChunkStartNumber,
Expand All @@ -92,13 +100,15 @@ public ChunkInfo ChunkInfo {
}

public ReadOnlyMemory<byte> TransformHeader {
	//qq EnsureInitializedAsync
	get => _transformHeader;
}

private readonly int _midpointsDepth;

public int RawWriterPosition {
get {
// no need for EnsureInitializedAsync because only completed chunks are initialized asynchronously
return (int)(_writerWorkItem?.WorkingStream.Position
?? throw new InvalidOperationException(string.Format("TFChunk {0} is not in write mode.", _filename)));
}
Expand Down Expand Up @@ -173,12 +183,17 @@ private enum CacheStatus {
private IChunkTransform _transform;
private ReadOnlyMemory<byte> _transformHeader;

private int _initialized;
private Func<TFChunk, CancellationToken, ValueTask> _initialize;

private TFChunk(string filename,
int midpointsDepth,
bool inMem,
bool unbuffered,
bool writethrough,
bool reduceFileCachePressure) {
bool reduceFileCachePressure,
Func<TFChunk, CancellationToken, ValueTask> initialize) {

Ensure.NotNullOrEmpty(filename, "filename");
Ensure.Nonnegative(midpointsDepth, "midpointsDepth");

Expand All @@ -188,6 +203,7 @@ private TFChunk(string filename,
_unbuffered = unbuffered;
_writeThrough = writethrough;
_reduceFileCachePressure = reduceFileCachePressure;
_initialize = initialize;
_memStreams = new();
_fileStreams = new();

Expand All @@ -202,20 +218,23 @@ private TFChunk(string filename,
}

// local or remote
/// <summary>
/// Opens a completed (immutable) chunk. Construction is cheap and synchronous: the
/// expensive work (opening the handle, reading header/footer, optionally verifying
/// the hash) is captured in the <c>initialize</c> delegate and deferred until the
/// chunk is first used (see EnsureInitializedAsync).
/// </summary>
/// <remarks>
/// The <paramref name="token"/> parameter is currently unused because nothing awaits
/// here; it is kept for signature compatibility with callers.
/// </remarks>
public static ValueTask<TFChunk> FromCompletedFile(IBlobFileSystem fileSystem, string filename, bool verifyHash, bool unbufferedRead,
	ITransactionFileTracker tracker, Func<TransformType, IChunkTransformFactory> getTransformFactory,
	bool reduceFileCachePressure = false, CancellationToken token = default) {

	try {
		//qq do we want any different behaviour here between local and remote completed chunks?
		// Lambda parameters are deliberately NOT named "chunk"/"token": those names would
		// shadow the enclosing local and parameter (CS0136) and invite confusion about
		// which token the deferred initialization actually observes.
		var chunk = new TFChunk(filename,
			TFConsts.MidpointsDepth, false, unbufferedRead, false, reduceFileCachePressure,
			initialize: (self, ct) =>
				self.InitCompleted(fileSystem, verifyHash, tracker, getTransformFactory, ct)) {
			// completed chunks are read-only from the start; InitCompleted no longer sets this
			IsReadOnly = true
		};

		return new(chunk);
	} catch (Exception ex) {
		// surface constructor failures (e.g. argument validation) as a faulted ValueTask
		return ValueTask.FromException<TFChunk>(ex);
	}
}

// always local
Expand All @@ -228,7 +247,8 @@ public static async ValueTask<TFChunk> FromOngoingFile(string filename, int writ
false,
unbuffered,
writethrough,
reduceFileCachePressure);
reduceFileCachePressure,
initialize: static (_, _) => ValueTask.CompletedTask);
try {
await chunk.InitOngoing(writePosition, tracker, getTransformFactory, token);
} catch {
Expand Down Expand Up @@ -285,7 +305,8 @@ public static async ValueTask<TFChunk> CreateWithHeader(string filename,
inMem,
unbuffered,
writethrough,
reduceFileCachePressure);
reduceFileCachePressure,
initialize: static (_, _) => ValueTask.CompletedTask);
try {
await chunk.InitNew(header, fileSize, tracker, transformFactory, transformHeader, token);
} catch {
Expand All @@ -301,8 +322,6 @@ private async ValueTask InitCompleted(IBlobFileSystem fileSystem, bool verifyHas
_handle = await fileSystem.OpenForReadAsync(_filename, _reduceFileCachePressure, token);
_fileSize = (int)_handle.Length;

IsReadOnly = true;

await using (var stream = _handle.CreateStream()) {
_chunkHeader = await ReadHeader(stream, token);
Log.Debug("Opened completed {chunk} as version {version} (min. compatible version: {minCompatibleVersion})", _filename, _chunkHeader.Version, _chunkHeader.MinCompatibleVersion);
Expand Down Expand Up @@ -340,7 +359,42 @@ private async ValueTask InitCompleted(IBlobFileSystem fileSystem, bool verifyHas
_readSide.RequestCaching();

if (verifyHash)
await VerifyFileHash(token);
await VerifyFileHash(token); //qq do not do this for remote
}

//qq important properties that we may want specific tests for
// initialization is not triggered on startup, only on demand for this chunk (or we'd have loads of them)
// initialization can fail, but it will be retried next time the chunk is used.
// if one thread is running initialization, another triggering thread will wait for it
// and not run its own initialization
// if initialization has succeeded we don't run it any more
/// <summary>
/// Runs the deferred initialization delegate exactly once per chunk, on demand.
/// Subsequent calls are a cheap volatile read. Failures are logged and rethrown,
/// leaving <c>_initialized</c> at 0 so the next caller retries.
/// </summary>
private async ValueTask EnsureInitializedAsync(CancellationToken token) {
	// Fast common case: already initialized. Volatile.Read pairs with the
	// Volatile.Write in Initialize below so the flag is published safely.
	if (Volatile.Read(ref _initialized) is not 0)
		return;

	await _cachedDataLock.AcquireAsync(token);
	try {
		// Double-check under the lock: another thread may have initialized while we waited.
		if (_initialized is 0) {
			// Initialization itself is never cancelled (other callers may still want its
			// result); the triggering caller merely stops *waiting* for it when its token
			// fires. AsTask is required because ValueTask has no WaitAsync.
			await Initialize().AsTask().WaitAsync(token);
		}
	} catch (Exception ex) {
		Log.Warning(ex, "Failed to initialize chunk {Chunk}", _filename);
		throw;
	} finally {
		_cachedDataLock.Release();
	}

	// Publishes the flag only on success, so a failed attempt is retried next time.
	// NOTE(review): if the waiting caller is cancelled, this keeps running after the
	// lock is released, so a later caller could start a second concurrent initialization
	// and race on _initialize — confirm that is acceptable or make the wait non-cancellable.
	async ValueTask Initialize() {
		await _initialize(this, CancellationToken.None);
		Volatile.Write(ref _initialized, 1);
		_initialize = null;
	}
}

private async ValueTask InitNew(ChunkHeader chunkHeader, int fileSize, ITransactionFileTracker tracker,
Expand Down Expand Up @@ -589,6 +643,8 @@ public async ValueTask VerifyFileHash(CancellationToken token) {
if (!IsReadOnly)
throw new InvalidOperationException("You can't verify hash of not-completed TFChunk.");

await EnsureInitializedAsync(token);

Log.Debug("Verifying hash for TFChunk '{chunk}'...", _filename);
using var reader = AcquireRawReader();
reader.Stream.Seek(0, SeekOrigin.Begin);
Expand Down Expand Up @@ -649,13 +705,16 @@ private static long GetDataPosition(WriterWorkItem workItem) {
// this method takes (b) and returns (d)
public async ValueTask<long> GetActualRawPosition(long logicalPosition, CancellationToken token) {
	ArgumentOutOfRangeException.ThrowIfNegative(logicalPosition);
	await EnsureInitializedAsync(token);

	// -1 signals "no record at this logical position"; otherwise translate to raw.
	var actual = await _readSide.GetActualPosition(logicalPosition, token);
	if (actual < 0)
		return -1;

	return GetRawPosition(actual);
}

public async ValueTask CacheInMemory(CancellationToken token) {
await EnsureInitializedAsync(token);

if (_inMem)
return;

Expand Down Expand Up @@ -797,29 +856,44 @@ public void UnCacheFromMemory() {
}
}

public ValueTask<bool> ExistsAt(long logicalPosition, CancellationToken token)
=> _readSide.ExistsAt(logicalPosition, token);
public async ValueTask<bool> ExistsAt(long logicalPosition, CancellationToken token) {
	// completed chunks are initialized lazily; make sure that has happened first
	await EnsureInitializedAsync(token);
	var exists = await _readSide.ExistsAt(logicalPosition, token);
	return exists;
}

public ValueTask<RecordReadResult> TryReadAt(long logicalPosition, bool couldBeScavenged, CancellationToken token)
=> _readSide.TryReadAt(logicalPosition, couldBeScavenged, token);
public async ValueTask<RecordReadResult> TryReadAt(long logicalPosition, bool couldBeScavenged, CancellationToken token) {
	// completed chunks are initialized lazily; make sure that has happened first
	await EnsureInitializedAsync(token);
	var result = await _readSide.TryReadAt(logicalPosition, couldBeScavenged, token);
	return result;
}

public ValueTask<RecordReadResult> TryReadFirst(CancellationToken token)
=> _readSide.TryReadFirst(token);
public async ValueTask<RecordReadResult> TryReadFirst(CancellationToken token) {
	// completed chunks are initialized lazily; make sure that has happened first
	await EnsureInitializedAsync(token);
	var result = await _readSide.TryReadFirst(token);
	return result;
}

public ValueTask<RecordReadResult> TryReadClosestForward(long logicalPosition, CancellationToken token)
=> _readSide.TryReadClosestForward(logicalPosition, token);
public async ValueTask<RecordReadResult> TryReadClosestForward(long logicalPosition, CancellationToken token) {
	// completed chunks are initialized lazily; make sure that has happened first
	await EnsureInitializedAsync(token);
	var result = await _readSide.TryReadClosestForward(logicalPosition, token);
	return result;
}

public ValueTask<RawReadResult> TryReadClosestForwardRaw(long logicalPosition, Func<int, byte[]> getBuffer,
CancellationToken token)
=> _readSide.TryReadClosestForwardRaw(logicalPosition, getBuffer, token);
public async ValueTask<RawReadResult> TryReadClosestForwardRaw(long logicalPosition, Func<int, byte[]> getBuffer,
	CancellationToken token) {
	// completed chunks are initialized lazily; make sure that has happened first
	await EnsureInitializedAsync(token);
	var result = await _readSide.TryReadClosestForwardRaw(logicalPosition, getBuffer, token);
	return result;
}

public ValueTask<RecordReadResult> TryReadLast(CancellationToken token)
=> _readSide.TryReadLast(token);
public async ValueTask<RecordReadResult> TryReadLast(CancellationToken token) {
	// completed chunks are initialized lazily; make sure that has happened first
	await EnsureInitializedAsync(token);
	var result = await _readSide.TryReadLast(token);
	return result;
}

public ValueTask<RecordReadResult> TryReadClosestBackward(long logicalPosition, CancellationToken token)
=> _readSide.TryReadClosestBackward(logicalPosition, token);
public async ValueTask<RecordReadResult> TryReadClosestBackward(long logicalPosition, CancellationToken token) {
	// completed chunks are initialized lazily; make sure that has happened first
	await EnsureInitializedAsync(token);
	var result = await _readSide.TryReadClosestBackward(logicalPosition, token);
	return result;
}

public async ValueTask<RecordWriteResult> TryAppend(ILogRecord record, CancellationToken token) {
// no need for EnsureInitializedAsync because only completed chunks are initialized asynchronously
if (IsReadOnly)
throw new InvalidOperationException("Cannot write to a read-only block.");

Expand Down Expand Up @@ -865,15 +939,18 @@ static MemoryOwner<byte> SerializeLogRecord(ILogRecord record, out int recordLen
}

/// <summary>
/// Appends raw (pre-transformed) bytes to an ongoing chunk. Returns false if the
/// data would not fit within the chunk's preallocated length.
/// </summary>
/// <exception cref="InvalidOperationException">The chunk is not in write mode.</exception>
public async ValueTask<bool> TryAppendRawData(ReadOnlyMemory<byte> buffer, CancellationToken token) {
	// no need for EnsureInitializedAsync because only completed chunks are initialized asynchronously
	// Guard against a null writer (read-only chunk) with the same exception that
	// RawWriterPosition throws, instead of an uninformative NullReferenceException.
	var workItem = _writerWorkItem
		?? throw new InvalidOperationException(string.Format("TFChunk {0} is not in write mode.", _filename));

	if (workItem.WorkingStream.Position + buffer.Length > workItem.WorkingStream.Length)
		return false;

	await workItem.AppendData(buffer, token);
	return true;
}

public ValueTask Flush(CancellationToken token)
=> IsReadOnly ? ValueTask.CompletedTask : _writerWorkItem.FlushToDisk(token);
public ValueTask Flush(CancellationToken token) {
	// no need for EnsureInitializedAsync because only completed chunks are initialized asynchronously
	if (IsReadOnly)
		return ValueTask.CompletedTask; // nothing buffered; completed chunks have no writer

	return _writerWorkItem.FlushToDisk(token);
}

public ValueTask Complete(CancellationToken token) {
return ChunkHeader.IsScavenged
Expand All @@ -890,6 +967,8 @@ public ValueTask CompleteScavenge(IReadOnlyCollection<PosMap> mapping, Cancellat
}

private async ValueTask CompleteNonRaw(IReadOnlyCollection<PosMap> mapping, CancellationToken token) {
await EnsureInitializedAsync(token);

if (IsReadOnly)
throw new InvalidOperationException("Cannot complete a read-only TFChunk.");

Expand All @@ -907,6 +986,8 @@ private async ValueTask CompleteNonRaw(IReadOnlyCollection<PosMap> mapping, Canc
}

public async ValueTask CompleteRaw(CancellationToken token) {
await EnsureInitializedAsync(token);

if (IsReadOnly)
throw new InvalidOperationException("Cannot complete a read-only TFChunk.");
if (_writerWorkItem.WorkingStream.Position != _writerWorkItem.WorkingStream.Length)
Expand Down Expand Up @@ -1002,6 +1083,7 @@ static int WriteMapping(Span<byte> buffer, IReadOnlyCollection<PosMap> mapping)
public void Dispose() => TryClose();

public bool TryClose() {
//qq EnsureInitializedAsync
_selfdestructin54321 = true;

Thread.MemoryBarrier();
Expand All @@ -1014,6 +1096,7 @@ public bool TryClose() {
}

public void MarkForDeletion() {
//qq EnsureInitializedAsync
_selfdestructin54321 = true;
_deleteFile = true;

Expand Down Expand Up @@ -1248,13 +1331,15 @@ private void ReturnReaderWorkItem(ReaderWorkItem item) {
}

public TFChunkBulkReader AcquireDataReader() {
	//qq EnsureInitializedAsync
	// Prefer reading from the in-memory copy when one is cached; otherwise fall
	// back to a file-backed reader.
	return TryAcquireBulkMemReader(raw: false, out var reader)
		? reader
		: AcquireFileReader(raw: false);
}

public TFChunkBulkReader AcquireRawReader() {
//qq EnsureInitializedAsync
if (TryAcquireBulkMemReader(raw: true, out var reader))
return reader;

Expand Down Expand Up @@ -1387,11 +1472,12 @@ public void ReleaseReader(TFChunkBulkReader reader) {
}

public override string ToString() {
	// _chunkHeader is populated lazily for completed chunks, so it can still be null
	// here; ToString must never throw, so fall back to the bare file name.
	var header = _chunkHeader;
	return header is null
		? Path.GetFileName(_filename)
		: $"#{header.ChunkStartNumber}-{header.ChunkEndNumber} ({Path.GetFileName(_filename)})";
}

private struct Midpoint {
private readonly struct Midpoint {
public readonly int ItemIndex;
public readonly long LogPos;

Expand Down
Loading