From 82814828e0dec48dd91e680fdfd24cfd1faa829a Mon Sep 17 00:00:00 2001 From: Gulin7 Date: Wed, 27 Nov 2024 14:50:34 +0200 Subject: [PATCH] Used async enumerators --- src/MemState.EventStore/EventStoreReader.cs | 4 ++-- src/Memstate.Core/EngineBuilder.cs | 4 ++-- src/Memstate.Core/IJournalReader.cs | 2 +- src/Memstate.Core/NullJournalReader.cs | 2 +- src/Memstate.Core/Storage/FileJournalReader.cs | 5 +++-- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/MemState.EventStore/EventStoreReader.cs b/src/MemState.EventStore/EventStoreReader.cs index e6a908e..9dfdd46 100644 --- a/src/MemState.EventStore/EventStoreReader.cs +++ b/src/MemState.EventStore/EventStoreReader.cs @@ -37,7 +37,7 @@ public Task DisposeAsync() return Task.CompletedTask; } - public IEnumerable GetRecords(long fromRecord = 0) + public async IAsyncEnumerable GetRecords(long fromRecord = 0) { var nextRecord = fromRecord; @@ -45,7 +45,7 @@ public IEnumerable GetRecords(long fromRecord = 0) while (true) { - var slice = _connection.ReadStreamEventsForwardAsync(_streamName, nextRecord, _eventsPerSlice, false).Result; + var slice = await _connection.ReadStreamEventsForwardAsync(_streamName, nextRecord, _eventsPerSlice, false); _logger.Debug("{0} events in slice from {0}", slice.Events.Length, slice.FromEventNumber); diff --git a/src/Memstate.Core/EngineBuilder.cs b/src/Memstate.Core/EngineBuilder.cs index 8bf6199..878a14a 100644 --- a/src/Memstate.Core/EngineBuilder.cs +++ b/src/Memstate.Core/EngineBuilder.cs @@ -53,10 +53,10 @@ public async Task> Build(T initialState) where T : class return new Engine(_settings, model, subscriptionSource, writer, nextRecordNumber); } - internal static TState Load(IJournalReader reader, TState initial, out long lastRecordNumber) + internal static async Task Load(IJournalReader reader, TState initial, out long lastRecordNumber) { lastRecordNumber = -1; - foreach (var journalRecord in reader.GetRecords()) + await foreach (var journalRecord in reader.GetRecords()) { try { diff --git a/src/Memstate.Core/IJournalReader.cs b/src/Memstate.Core/IJournalReader.cs index bfc440e..8f69164 100644 --- a/src/Memstate.Core/IJournalReader.cs +++ b/src/Memstate.Core/IJournalReader.cs @@ -4,6 +4,6 @@ namespace Memstate { public interface IJournalReader : IAsyncDisposable { - IEnumerable GetRecords(long fromRecord = 0); + IAsyncEnumerable GetRecords(long fromRecord = 0); } } \ No newline at end of file diff --git a/src/Memstate.Core/NullJournalReader.cs b/src/Memstate.Core/NullJournalReader.cs index c7b6f3f..15f27bb 100644 --- a/src/Memstate.Core/NullJournalReader.cs +++ b/src/Memstate.Core/NullJournalReader.cs @@ -10,7 +10,7 @@ public Task DisposeAsync() return Task.CompletedTask; } - public IEnumerable GetRecords(long fromRecord = 0) + public async IAsyncEnumerable GetRecords(long fromRecord = 0) { yield break; } diff --git a/src/Memstate.Core/Storage/FileJournalReader.cs b/src/Memstate.Core/Storage/FileJournalReader.cs index 8e6889b..d3e36dc 100644 --- a/src/Memstate.Core/Storage/FileJournalReader.cs +++ b/src/Memstate.Core/Storage/FileJournalReader.cs @@ -25,9 +25,10 @@ public Task DisposeAsync() return Task.Run((Action) _journalStream.Dispose); } - public IEnumerable GetRecords(long fromRecord = 0) + public async IAsyncEnumerable GetRecords(long fromRecord = 0) { - foreach (var record in _serializer.ReadObjects(_journalStream)) + var records = await Task.Run(() => _serializer.ReadObjects(_journalStream)); + await foreach (var record in records) { if (record.RecordNumber >= fromRecord) {