From 13e4b42deb573e96707e671928a9c4ca65a38133 Mon Sep 17 00:00:00 2001 From: Stefan Cuypers Date: Sun, 22 Jun 2025 13:18:08 +0200 Subject: [PATCH 01/13] (#377) Fix for possible concurrent query during pull --- .../Offline/Operations/PullOperationManager.cs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs index 8754969..2257831 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs @@ -141,10 +141,18 @@ public async Task ExecuteAsync(IEnumerable requests, Pu } }); + // Get requests we need to enqueue. Note : do not enqueue them yet. Context only supports one outstanding query at a time and we don't want a query from a background task being run concurrently with GetDeltaTokenAsync. + List requestsToEnqueue = []; foreach (PullRequest request in requests) { DateTimeOffset lastSynchronization = await context.DeltaTokenStore.GetDeltaTokenAsync(request.QueryId, cancellationToken).ConfigureAwait(false); request.QueryDescription = PrepareQueryDescription(request.QueryDescription, lastSynchronization); + requestsToEnqueue.Add(request); + } + + // Now enqueue the requests. + foreach (PullRequest request in requestsToEnqueue) + { serviceRequestQueue.Enqueue(request); } From 1d02110d15f521e6ae3f7c8a061d81f278967932 Mon Sep 17 00:00:00 2001 From: Stefan Cuypers Date: Thu, 26 Jun 2025 10:54:53 +0200 Subject: [PATCH 02/13] (#384) Added SynchronizationProgress event --- .../Offline/OfflineDbContext.cs | 15 ++++++ .../Operations/PullOperationManager.cs | 30 ++++++++++- .../Offline/SynchronizationEventArgs.cs | 54 +++++++++++++++++++ 3 files changed, 97 insertions(+), 2 deletions(-) create mode 100644 src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs diff --git a/src/CommunityToolkit.Datasync.Client/Offline/OfflineDbContext.cs b/src/CommunityToolkit.Datasync.Client/Offline/OfflineDbContext.cs index 6e9b7cc..a217ea0 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/OfflineDbContext.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/OfflineDbContext.cs @@ -99,6 +99,12 @@ public abstract partial class OfflineDbContext : DbContext /// internal OperationsQueueManager QueueManager { get; } + /// + /// An event delegate that allows the app to monitor synchronization events. + /// + /// This event can be called from background threads. + public event EventHandler? SynchronizationProgress; + /// /// Initializes a new instance of the class. The /// method will be called to @@ -561,6 +567,15 @@ public async Task SaveChangesAsync(bool acceptAllChangesOnSuccess, bool add return await base.SaveChangesAsync(acceptAllChangesOnSuccess, cancellationToken).ConfigureAwait(false); } + /// + /// Sends a synchronization event to the consumers. + /// + /// The event arguments. + internal void SendSynchronizationEvent(SynchronizationEventArgs eventArgs) + { + SynchronizationProgress?.Invoke(this, eventArgs); + } + #region IDisposable /// /// Ensure that the context has not been disposed. diff --git a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs index 2257831..9ee1894 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs @@ -105,6 +105,15 @@ public async Task ExecuteAsync(IEnumerable requests, Pu } } + context.SendSynchronizationEvent(new SynchronizationEventArgs() + { + EventType = SynchronizationEventType.ItemsCommitted, + EntityType = pullResponse.EntityType, + ItemsProcessed = pullResponse.TotalItemsProcessed, + TotalNrItems = pullResponse.TotalRequestItems, + QueryId = pullResponse.QueryId + }); + if (pullOptions.SaveAfterEveryServiceRequest) { _ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false); @@ -120,10 +129,22 @@ public async Task ExecuteAsync(IEnumerable requests, Pu try { bool completed = false; + long itemsProcessed = 0; do { Page page = await GetPageAsync(pullRequest.HttpClient, requestUri, pageType, cancellationToken).ConfigureAwait(false); - databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, page.Items)); + itemsProcessed += page.Items.Count(); + + context.SendSynchronizationEvent(new SynchronizationEventArgs() + { + EventType = SynchronizationEventType.ItemsFetched, + EntityType = pullRequest.EntityType, + ItemsProcessed = itemsProcessed, + TotalNrItems = page.Count ?? 0, + QueryId = pullRequest.QueryId + }); + + databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, page.Items, page.Count ?? 0, itemsProcessed)); if (!string.IsNullOrEmpty(page.NextLink)) { requestUri = new UriBuilder(endpoint) { Query = page.NextLink }.Uri; @@ -173,6 +194,8 @@ public async Task ExecuteAsync(IEnumerable requests, Pu /// Thrown on error internal async Task> GetPageAsync(HttpClient client, Uri requestUri, Type pageType, CancellationToken cancellationToken = default) { + PropertyInfo countPropInfo = pageType.GetProperty("Count") + ?? throw new DatasyncException($"Page type '{pageType.Name}' does not have a 'Count' property"); PropertyInfo itemsPropInfo = pageType.GetProperty("Items") ?? throw new DatasyncException($"Page type '{pageType.Name}' does not have an 'Items' property"); PropertyInfo nextLinkPropInfo = pageType.GetProperty("NextLink") @@ -193,6 +216,7 @@ internal async Task> GetPageAsync(HttpClient client, Uri requestUri return new Page() { + Count = (long?)countPropInfo.GetValue(result), Items = (IEnumerable)itemsPropInfo.GetValue(result)!, NextLink = (string?)nextLinkPropInfo.GetValue(result) }; @@ -237,6 +261,8 @@ internal static QueryDescription PrepareQueryDescription(QueryDescription source /// The type of entity contained within the items. /// The query ID for the request. /// The list of items to process. + /// The total number of items in the current pull request. + /// The total number of items processed, included. [ExcludeFromCodeCoverage] - internal record PullResponse(Type EntityType, string QueryId, IEnumerable Items); + internal record PullResponse(Type EntityType, string QueryId, IEnumerable Items, long TotalRequestItems, long TotalItemsProcessed); } diff --git a/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs new file mode 100644 index 0000000..490e643 --- /dev/null +++ b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs @@ -0,0 +1,54 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +namespace CommunityToolkit.Datasync.Client.Offline; + +/// +/// The list of synchronization events that we support. +/// +public enum SynchronizationEventType +{ + /// + /// Occurs when items have been successfully fetches from the server. + /// + /// This event is raised after a page of entities was succesfully fetched from the server, ready to be commited to the data store. + ItemsFetched, + + /// + /// Occurs when items have been successfully committed to the underlying data store. + /// + /// This event is raised after a page of entities was succesfully commited to the database + ItemsCommitted, +} + +/// +/// The event arguments sent when a synchronization event occurs. +/// +public class SynchronizationEventArgs +{ + /// + /// The type of event. + /// + public required SynchronizationEventType EventType { get; init; } + + /// + /// The EntityType that is being processed. + /// + public required Type EntityType { get; init; } + + /// + /// When pulling records, the number of items that have been processed in the current pull request. + /// + public long ItemsProcessed { get; init; } = -1; + + /// + /// The total number of items in the current pull request. + /// + public long TotalNrItems { get; init; } + + /// + /// The query ID that is being processed + /// + public required string QueryId { get; init; } +} From d4e996d0998e8ef4b221b3e34548a3c3e2ae746b Mon Sep 17 00:00:00 2001 From: Stefan Cuypers Date: Sun, 29 Jun 2025 10:34:05 +0200 Subject: [PATCH 03/13] (#384) Added test scenario for SynchronizationProgress event (OfflineDbContext_Tests.SynchronizationProgress_Event_Works) --- .../Offline/OfflineDbContext_Tests.cs | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs b/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs index 7e85ad6..c6864bb 100644 --- a/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs +++ b/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs @@ -1433,6 +1433,59 @@ public async Task DbSet_PushAsync_Throws_OnNonOfflineDbContext() } #endregion + #region SynchronizationProgress + [Fact] + public async Task SynchronizationProgress_Event_Works() + { + Page page1 = CreatePage(5, 20, "$skip=5"); + Page page2 = CreatePage(5, 20, "$skip=10"); + Page page3 = CreatePage(5, 20, "$skip=15"); + Page page4 = CreatePage(5, 20); + + this.context.Handler.AddResponse(HttpStatusCode.OK, page1); + this.context.Handler.AddResponse(HttpStatusCode.OK, page2); + this.context.Handler.AddResponse(HttpStatusCode.OK, page3); + this.context.Handler.AddResponse(HttpStatusCode.OK, page4); + + bool eventFiredForFetch = true; + bool eventFiredForCommit = true; + long currentItemsFetched = 0; + long currentItemsCommited = 0; + + this.context.SynchronizationProgress += (sender, args) => + { + sender.Should().Be(this.context); + args.EntityType.Should().Be(); + args.QueryId.Should().Be("CommunityToolkit.Datasync.TestCommon.Databases.ClientMovie"); + args.TotalNrItems.Should().Be(20); + switch(args.EventType) + { + case SynchronizationEventType.ItemsFetched: + currentItemsFetched += 5; + args.ItemsProcessed.Should().Be(currentItemsFetched); + eventFiredForFetch = true; + break; + case SynchronizationEventType.ItemsCommitted: + currentItemsCommited += 5; + args.ItemsProcessed.Should().Be(currentItemsCommited); + eventFiredForCommit = true; + break; + default: + Assert.Fail($"Invalid event type: {args.EventType}"); + break; + } + }; + + await this.context.Movies.PullAsync(); + + eventFiredForFetch.Should().BeTrue(); + eventFiredForCommit.Should().BeTrue(); + currentItemsFetched.Should().Be(20); + currentItemsCommited.Should().Be(20); + } + + #endregion + public class NotOfflineDbContext : DbContext { public NotOfflineDbContext() : base() From e232d8c354c48087ef71658959f99786bdfe4e7a Mon Sep 17 00:00:00 2001 From: Stefan Cuypers Date: Mon, 30 Jun 2025 08:52:34 +0200 Subject: [PATCH 04/13] (#384) Added SynchronizationEventType PullStarted and PullEnded. Added Exception and ServiceResponse to SynchronizationEventArgs. Fixed bug in SynchronizationProgress_Event_Works test. Initialization of eventFired was wrong. SynchronizationProgress_Event_Works added tests for start and end. --- .../Operations/PullOperationManager.cs | 135 +++++++++++------- .../Offline/SynchronizationEventArgs.cs | 20 +++ .../Offline/OfflineDbContext_Tests.cs | 25 +++- 3 files changed, 124 insertions(+), 56 deletions(-) diff --git a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs index 9ee1894..ebb623d 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs @@ -14,6 +14,7 @@ using System.Reflection; using System.Text.Json; using System.Text.Json.Serialization; +using static CommunityToolkit.Datasync.Client.Offline.Operations.PullOperationManager; namespace CommunityToolkit.Datasync.Client.Offline.Operations; @@ -53,70 +54,87 @@ public async Task ExecuteAsync(IEnumerable requests, Pu QueueHandler databaseUpdateQueue = new(1, async pullResponse => { - DateTimeOffset lastSynchronization = await DeltaTokenStore.GetDeltaTokenAsync(pullResponse.QueryId, cancellationToken).ConfigureAwait(false); - foreach (object item in pullResponse.Items) + if (pullResponse.Items.Any()) { - EntityMetadata metadata = EntityResolver.GetEntityMetadata(item, pullResponse.EntityType); - object? originalEntity = await context.FindAsync(pullResponse.EntityType, [metadata.Id], cancellationToken).ConfigureAwait(false); - - if (originalEntity is null && !metadata.Deleted) - { - _ = context.Add(item); - result.IncrementAdditions(); - } - else if (originalEntity is not null && metadata.Deleted) + DateTimeOffset lastSynchronization = await DeltaTokenStore.GetDeltaTokenAsync(pullResponse.QueryId, cancellationToken).ConfigureAwait(false); + foreach (object item in pullResponse.Items) { - _ = context.Remove(originalEntity); - result.IncrementDeletions(); - } - else if (originalEntity is not null && !metadata.Deleted) - { - // Gather properties marked with [JsonIgnore] - HashSet ignoredProps = pullResponse.EntityType - .GetProperties(BindingFlags.Public | BindingFlags.Instance) - .Where(p => p.IsDefined(typeof(JsonIgnoreAttribute), inherit: true)) - .Select(p => p.Name) - .ToHashSet(); - - EntityEntry originalEntry = context.Entry(originalEntity); - EntityEntry newEntry = context.Entry(item); + EntityMetadata metadata = EntityResolver.GetEntityMetadata(item, pullResponse.EntityType); + object? originalEntity = await context.FindAsync(pullResponse.EntityType, [metadata.Id], cancellationToken).ConfigureAwait(false); - // Only copy properties that are not marked with [JsonIgnore] - foreach (IProperty property in originalEntry.Metadata.GetProperties()) + if (originalEntity is null && !metadata.Deleted) { - if (!ignoredProps.Contains(property.Name)) + _ = context.Add(item); + result.IncrementAdditions(); + } + else if (originalEntity is not null && metadata.Deleted) + { + _ = context.Remove(originalEntity); + result.IncrementDeletions(); + } + else if (originalEntity is not null && !metadata.Deleted) + { + // Gather properties marked with [JsonIgnore] + HashSet ignoredProps = pullResponse.EntityType + .GetProperties(BindingFlags.Public | BindingFlags.Instance) + .Where(p => p.IsDefined(typeof(JsonIgnoreAttribute), inherit: true)) + .Select(p => p.Name) + .ToHashSet(); + + EntityEntry originalEntry = context.Entry(originalEntity); + EntityEntry newEntry = context.Entry(item); + + // Only copy properties that are not marked with [JsonIgnore] + foreach (IProperty property in originalEntry.Metadata.GetProperties()) { - originalEntry.Property(property.Name).CurrentValue = newEntry.Property(property.Name).CurrentValue; + if (!ignoredProps.Contains(property.Name)) + { + originalEntry.Property(property.Name).CurrentValue = newEntry.Property(property.Name).CurrentValue; + } } + + result.IncrementReplacements(); } - result.IncrementReplacements(); + if (metadata.UpdatedAt > lastSynchronization) + { + lastSynchronization = metadata.UpdatedAt.Value; + bool isAdded = await DeltaTokenStore.SetDeltaTokenAsync(pullResponse.QueryId, metadata.UpdatedAt.Value, cancellationToken).ConfigureAwait(false); + if (isAdded) + { + // Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails. + _ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false); + } + } } - if (metadata.UpdatedAt > lastSynchronization) + if (pullOptions.SaveAfterEveryServiceRequest) { - lastSynchronization = metadata.UpdatedAt.Value; - bool isAdded = await DeltaTokenStore.SetDeltaTokenAsync(pullResponse.QueryId, metadata.UpdatedAt.Value, cancellationToken).ConfigureAwait(false); - if (isAdded) - { - // Sqlite oddity - you can't add then update; it changes the change type to UPDATE, which then fails. - _ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false); - } + _ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false); } - } - context.SendSynchronizationEvent(new SynchronizationEventArgs() - { - EventType = SynchronizationEventType.ItemsCommitted, - EntityType = pullResponse.EntityType, - ItemsProcessed = pullResponse.TotalItemsProcessed, - TotalNrItems = pullResponse.TotalRequestItems, - QueryId = pullResponse.QueryId - }); + context.SendSynchronizationEvent(new SynchronizationEventArgs() + { + EventType = SynchronizationEventType.ItemsCommitted, + EntityType = pullResponse.EntityType, + ItemsProcessed = pullResponse.TotalItemsProcessed, + TotalNrItems = pullResponse.TotalRequestItems, + QueryId = pullResponse.QueryId + }); + } - if (pullOptions.SaveAfterEveryServiceRequest) + if (pullResponse.Completed) { - _ = await context.SaveChangesAsync(true, false, cancellationToken).ConfigureAwait(false); + context.SendSynchronizationEvent(new SynchronizationEventArgs() + { + EventType = SynchronizationEventType.PullEnded, + EntityType = pullResponse.EntityType, + ItemsProcessed = pullResponse.TotalItemsProcessed, + TotalNrItems = pullResponse.TotalRequestItems, + QueryId = pullResponse.QueryId, + Exception = pullResponse.Exception, + ServiceResponse = pullResponse.Exception is DatasyncPullException ex ? ex.ServiceResponse : null + }); } }); @@ -125,15 +143,24 @@ public async Task ExecuteAsync(IEnumerable requests, Pu Uri endpoint = ExecutableOperation.MakeAbsoluteUri(pullRequest.HttpClient.BaseAddress, pullRequest.Endpoint); Uri requestUri = new UriBuilder(endpoint) { Query = pullRequest.QueryDescription.ToODataQueryString() }.Uri; Type pageType = typeof(Page<>).MakeGenericType(pullRequest.EntityType); + long itemsProcessed = 0; + long totalCount = 0; try { bool completed = false; - long itemsProcessed = 0; + // Signal we started the pull operation. + context.SendSynchronizationEvent(new SynchronizationEventArgs() + { + EventType = SynchronizationEventType.PullStarted, + EntityType = pullRequest.EntityType, + QueryId = pullRequest.QueryId + }); do { Page page = await GetPageAsync(pullRequest.HttpClient, requestUri, pageType, cancellationToken).ConfigureAwait(false); itemsProcessed += page.Items.Count(); + totalCount = page.Count ?? totalCount; context.SendSynchronizationEvent(new SynchronizationEventArgs() { @@ -144,7 +171,6 @@ public async Task ExecuteAsync(IEnumerable requests, Pu QueryId = pullRequest.QueryId }); - databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, page.Items, page.Count ?? 0, itemsProcessed)); if (!string.IsNullOrEmpty(page.NextLink)) { requestUri = new UriBuilder(endpoint) { Query = page.NextLink }.Uri; @@ -153,12 +179,15 @@ public async Task ExecuteAsync(IEnumerable requests, Pu { completed = true; } + + databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, page.Items, totalCount, itemsProcessed, completed)); } while (!completed); } catch (DatasyncPullException ex) { result.AddFailedRequest(requestUri, ex.ServiceResponse); + databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, Enumerable.Empty(), totalCount, itemsProcessed, true, ex)); } }); @@ -263,6 +292,8 @@ internal static QueryDescription PrepareQueryDescription(QueryDescription source /// The list of items to process. /// The total number of items in the current pull request. /// The total number of items processed, included. + /// If true, indicates that the pull request is completed. + /// Indicates an exception occured during fetching of data [ExcludeFromCodeCoverage] - internal record PullResponse(Type EntityType, string QueryId, IEnumerable Items, long TotalRequestItems, long TotalItemsProcessed); + internal record PullResponse(Type EntityType, string QueryId, IEnumerable Items, long TotalRequestItems, long TotalItemsProcessed, bool Completed, Exception? Exception = null); } diff --git a/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs index 490e643..7d892da 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs @@ -9,6 +9,11 @@ namespace CommunityToolkit.Datasync.Client.Offline; /// public enum SynchronizationEventType { + /// + /// Pull for the given entity starts. + /// + /// is not yet known here + PullStarted, /// /// Occurs when items have been successfully fetches from the server. /// @@ -20,6 +25,11 @@ public enum SynchronizationEventType /// /// This event is raised after a page of entities was succesfully commited to the database ItemsCommitted, + + /// + /// Pull for the given entiry ended. + /// + PullEnded, } /// @@ -51,4 +61,14 @@ public class SynchronizationEventArgs /// The query ID that is being processed /// public required string QueryId { get; init; } + + /// + /// If not null on event type , indicates pull failed with this exception. + /// + public Exception? Exception { get; init; } + + /// + /// If a occured in during server call processing, this property has more detail on the server response. + /// + public ServiceResponse? ServiceResponse { get; init; } } diff --git a/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs b/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs index c6864bb..09beb38 100644 --- a/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs +++ b/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs @@ -1447,8 +1447,10 @@ public async Task SynchronizationProgress_Event_Works() this.context.Handler.AddResponse(HttpStatusCode.OK, page3); this.context.Handler.AddResponse(HttpStatusCode.OK, page4); - bool eventFiredForFetch = true; - bool eventFiredForCommit = true; + bool eventFiredForFetch = false; + bool eventFiredForCommit = false; + bool eventFiredForStart = false; + bool eventFiredForEnd = false; long currentItemsFetched = 0; long currentItemsCommited = 0; @@ -1457,19 +1459,32 @@ public async Task SynchronizationProgress_Event_Works() sender.Should().Be(this.context); args.EntityType.Should().Be(); args.QueryId.Should().Be("CommunityToolkit.Datasync.TestCommon.Databases.ClientMovie"); - args.TotalNrItems.Should().Be(20); - switch(args.EventType) + args.Exception.Should().BeNull(); // We don't test exceptions here, so should always be null. + args.ServiceResponse.Should().BeNull(); + switch (args.EventType) { case SynchronizationEventType.ItemsFetched: currentItemsFetched += 5; args.ItemsProcessed.Should().Be(currentItemsFetched); + args.TotalNrItems.Should().Be(20); eventFiredForFetch = true; break; case SynchronizationEventType.ItemsCommitted: currentItemsCommited += 5; args.ItemsProcessed.Should().Be(currentItemsCommited); + args.TotalNrItems.Should().Be(20); eventFiredForCommit = true; break; + case SynchronizationEventType.PullStarted: + eventFiredForStart.Should().BeFalse("PullStarted event should only fire once"); + eventFiredForStart = true; + break; + case SynchronizationEventType.PullEnded: + eventFiredForEnd.Should().BeFalse("PullEnded event should only fire once"); + eventFiredForEnd = true; + args.ItemsProcessed.Should().Be(20); + args.TotalNrItems.Should().Be(20); + break; default: Assert.Fail($"Invalid event type: {args.EventType}"); break; @@ -1478,8 +1493,10 @@ public async Task SynchronizationProgress_Event_Works() await this.context.Movies.PullAsync(); + eventFiredForStart.Should().BeTrue(); eventFiredForFetch.Should().BeTrue(); eventFiredForCommit.Should().BeTrue(); + eventFiredForEnd.Should().BeTrue(); currentItemsFetched.Should().Be(20); currentItemsCommited.Should().Be(20); } From d4fd2f737e23c8e353007df8a58fca219253e776 Mon Sep 17 00:00:00 2001 From: Stefan Cuypers Date: Mon, 30 Jun 2025 11:37:25 +0200 Subject: [PATCH 05/13] (#384) Added test PullAsync_List_FailedRequest_SynchronizationEventWorks Added Synchronization events for push operations and test for it. --- .../Operations/PullOperationManager.cs | 2 +- .../OperationsQueue/OperationsQueueManager.cs | 32 ++++++ .../Offline/SynchronizationEventArgs.cs | 39 +++++-- .../Offline/OfflineDbContext_Tests.cs | 107 ++++++++++++++++++ .../TestData/Movies.cs | 20 ++++ 5 files changed, 191 insertions(+), 9 deletions(-) diff --git a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs index ebb623d..9b2d278 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs @@ -187,7 +187,7 @@ public async Task ExecuteAsync(IEnumerable requests, Pu catch (DatasyncPullException ex) { result.AddFailedRequest(requestUri, ex.ServiceResponse); - databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, Enumerable.Empty(), totalCount, itemsProcessed, true, ex)); + databaseUpdateQueue.Enqueue(new PullResponse(pullRequest.EntityType, pullRequest.QueryId, [], totalCount, itemsProcessed, true, ex)); } }); diff --git a/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs b/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs index d00c8ac..1807e34 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs @@ -270,16 +270,40 @@ internal async Task PushAsync(IEnumerable entityTypes, PushOpt // Determine the list of queued operations in scope. List queuedOperations = await GetQueuedOperationsAsync(entityTypeNames, cancellationToken).ConfigureAwait(false); + + // Signal we started the push operation. + this._context.SendSynchronizationEvent(new SynchronizationEventArgs() + { + EventType = SynchronizationEventType.PushStarted, + TotalNrItems = queuedOperations.Count + }); + if (queuedOperations.Count == 0) { + // Signal we ended the push operation. + this._context.SendSynchronizationEvent(new SynchronizationEventArgs() + { + EventType = SynchronizationEventType.PushEnded + }); return pushResult; } + int nrItemsProcessed = 0; + // Push things in parallel, according to the PushOptions QueueHandler queueHandler = new(pushOptions.ParallelOperations, async operation => { ServiceResponse? response = await PushOperationAsync(operation, cancellationToken).ConfigureAwait(false); pushResult.AddOperationResult(operation, response); + // We can run on multiple threads, so use Interlocked to update the number of items processed. + int newItemsProcessed = Interlocked.Increment(ref nrItemsProcessed); + this._context.SendSynchronizationEvent(new SynchronizationEventArgs() + { + EventType = SynchronizationEventType.PushItem, + ItemsProcessed = newItemsProcessed, + TotalNrItems = queuedOperations.Count, + PushOperation = operation, + }); }); // Enqueue and process all the queued operations in scope @@ -288,6 +312,14 @@ internal async Task PushAsync(IEnumerable entityTypes, PushOpt // Save the changes, this time we don't update the queue. _ = await this._context.SaveChangesAsync(acceptAllChangesOnSuccess: true, addToQueue: false, cancellationToken).ConfigureAwait(false); + + this._context.SendSynchronizationEvent(new SynchronizationEventArgs() + { + EventType = SynchronizationEventType.PushEnded, + ItemsProcessed = nrItemsProcessed, + TotalNrItems = queuedOperations.Count, + }); + return pushResult; } diff --git a/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs index 7d892da..1740b06 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs @@ -30,6 +30,18 @@ public enum SynchronizationEventType /// Pull for the given entiry ended. /// PullEnded, + /// + /// Push operation started. + /// + PushStarted, + /// + /// An item was pushed to the server + /// + PushItem, + /// + /// Push operation ended. + /// + PushEnded, } /// @@ -40,35 +52,46 @@ public class SynchronizationEventArgs /// /// The type of event. /// + /// + /// On pull events, reporting occurs per entity type. With a start/stop per entity type. + /// On push events, reporting occurs per push request, which may contain multiple entity types. + /// public required SynchronizationEventType EventType { get; init; } /// - /// The EntityType that is being processed. + /// The EntityType that is being processed. Not used on push events. /// - public required Type EntityType { get; init; } + public Type? EntityType { get; init; } /// - /// When pulling records, the number of items that have been processed in the current pull request. + /// When pulling records, the number of items for the given entiry that have been processed in the current pull request. + /// When pushing records, the total number of items that have been processed in the current push request. /// public long ItemsProcessed { get; init; } = -1; /// - /// The total number of items in the current pull request. + /// When pulling records, the total number of items to pull for the given entity in the current pull request + /// When pushing records, the total number of items that are being pushed in the current push request. /// public long TotalNrItems { get; init; } /// - /// The query ID that is being processed + /// The query ID that is being processed on pull operations. Not used on push events. /// - public required string QueryId { get; init; } + public string? QueryId { get; init; } /// - /// If not null on event type , indicates pull failed with this exception. + /// If not null on event type , indicates pull failed with this exception. Currently not used on push. /// public Exception? Exception { get; init; } /// - /// If a occured in during server call processing, this property has more detail on the server response. + /// If a occured in during server call processing, this property has more detail on the server response. Currently not used on push, use the returned instead. /// public ServiceResponse? ServiceResponse { get; init; } + + /// + /// The operation that was executed. Not used on pull events. + /// + public DatasyncOperation? PushOperation { get; init; } } diff --git a/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs b/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs index 09beb38..4ca9db4 100644 --- a/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs +++ b/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs @@ -9,6 +9,7 @@ using CommunityToolkit.Datasync.Client.Test.Offline.Helpers; using CommunityToolkit.Datasync.TestCommon; using CommunityToolkit.Datasync.TestCommon.Databases; +using CommunityToolkit.Datasync.TestCommon.Models; using Microsoft.Data.Sqlite; using Microsoft.EntityFrameworkCore; using System.Net; @@ -1501,6 +1502,112 @@ public async Task SynchronizationProgress_Event_Works() currentItemsCommited.Should().Be(20); } + [Fact] + public async Task PullAsync_List_FailedRequest_SynchronizationEventWorks() + { + this.context.Handler.AddResponse(HttpStatusCode.BadRequest); + + bool eventFiredForStart = false; + bool eventFiredForEnd = false; + + this.context.SynchronizationProgress += (sender, args) => + { + sender.Should().Be(this.context); + args.EntityType.Should().Be(); + args.QueryId.Should().Be("CommunityToolkit.Datasync.TestCommon.Databases.ClientMovie"); + switch (args.EventType) + { + case SynchronizationEventType.PullStarted: + eventFiredForStart.Should().BeFalse("PullStarted event should only fire once"); + eventFiredForStart = true; + args.Exception.Should().BeNull(); + args.ServiceResponse.Should().BeNull(); + break; + case SynchronizationEventType.PullEnded: + eventFiredForEnd.Should().BeFalse("PullEnded event should only fire once"); + eventFiredForEnd = true; + args.Exception.Should().NotBeNull(); + args.Exception.Should().BeOfType(); + args.ServiceResponse.Should().NotBeNull(); + args.ServiceResponse.StatusCode.Should().Be(400); + break; + default: + Assert.Fail($"Unexpected event type: {args.EventType}"); + break; + } + }; + + PullResult pullResult = await this.context.PullAsync([typeof(ClientMovie)], new PullOptions()); + + eventFiredForStart.Should().BeTrue(); + eventFiredForEnd.Should().BeTrue(); + } + + [Fact] + public async Task SynchronizationProgress_Event_Works_For_Push() + { + // Add movies for testing + (MovieBase movie, string id)[] newMovies = + [ + (TestData.Movies.BlackPanther,Guid.NewGuid().ToString("N")), + (TestData.Movies.Dune,Guid.NewGuid().ToString("N")), + (TestData.Movies.DrNo ,Guid.NewGuid().ToString("N")), + ]; + + foreach ((MovieBase movie, string id) in newMovies) + { + this.context.Movies.Add(new(movie) { Id = id }); + ClientMovie responseMovie = new(movie) { Id = id, UpdatedAt = DateTimeOffset.UtcNow, Version = Guid.NewGuid().ToString() }; + this.context.Handler.AddResponseContent(DatasyncSerializer.Serialize(responseMovie), HttpStatusCode.Created); + this.context.SaveChanges(); + } + + bool eventFiredForItem = false; + bool eventFiredForStart = false; + bool eventFiredForEnd = false; + int[] itemsProcessedReported = new int[newMovies.Length]; // Due to multithreading, we can't guarantee the order of items processed. So register arrival of each separately. + + this.context.SynchronizationProgress += (sender, args) => + { + sender.Should().Be(this.context); + args.Exception.Should().BeNull(); + args.ServiceResponse.Should().BeNull(); + args.TotalNrItems.Should().Be(newMovies.Length); + switch (args.EventType) + { + case SynchronizationEventType.PushItem: + args.TotalNrItems.Should().Be(newMovies.Length); + args.ItemsProcessed.Should().BeInRange(1,newMovies.Length); + int prevProcessed = Interlocked.Exchange(ref itemsProcessedReported[args.ItemsProcessed-1], 1); + prevProcessed.Should().Be(0, "Each item should only be reported once"); + args.PushOperation.Should().NotBeNull(); + args.PushOperation.ItemId.Should().Be(newMovies[args.ItemsProcessed - 1].id); + eventFiredForItem = true; + break; + case SynchronizationEventType.PushStarted: + eventFiredForStart.Should().BeFalse("PushStarted event should only fire once"); + eventFiredForStart = true; + break; + case SynchronizationEventType.PushEnded: + eventFiredForEnd.Should().BeFalse("PushEnded event should only fire once"); + eventFiredForEnd = true; + args.ItemsProcessed.Should().Be(newMovies.Length); + itemsProcessedReported.Should().OnlyContain(x => x == 1, "All items should be reported as processed"); + args.PushOperation.Should().BeNull(); + break; + default: + Assert.Fail($"Invalid event type: {args.EventType}"); + break; + } + }; + + PushResult results = await this.context.Movies.PushAsync(); + + eventFiredForStart.Should().BeTrue(); + eventFiredForItem.Should().BeTrue(); + eventFiredForEnd.Should().BeTrue(); + } + #endregion public class NotOfflineDbContext : DbContext diff --git a/tests/CommunityToolkit.Datasync.TestCommon/TestData/Movies.cs b/tests/CommunityToolkit.Datasync.TestCommon/TestData/Movies.cs index c6b136c..f39e65d 100644 --- a/tests/CommunityToolkit.Datasync.TestCommon/TestData/Movies.cs +++ b/tests/CommunityToolkit.Datasync.TestCommon/TestData/Movies.cs @@ -20,6 +20,26 @@ public static class Movies Year = 2018 }; + public static readonly MovieBase Dune = new() + { + BestPictureWinner = false, + Duration = 155, + Rating = MovieRating.PG13, + ReleaseDate = new DateOnly(2021, 10, 22), + Title = "Dune", + Year = 2021 + }; + + public static readonly MovieBase DrNo = new() + { + BestPictureWinner = false, + Duration = 110, + Rating = MovieRating.PG, + ReleaseDate = new DateOnly(1962, 5, 8), + Title = "Dr. No", + Year = 1962 + }; + /// /// Counts the number of items in the list that match the predicate. /// From 5ad984c0ab5550782db0a32f677d3474af48ae26 Mon Sep 17 00:00:00 2001 From: Stefan Cuypers Date: Mon, 30 Jun 2025 16:04:12 +0200 Subject: [PATCH 06/13] (#384) Changed DateTimeConverter on client and server side to handle 'default' DateTime correctly. --- .../Serialization/DateTimeConverter.cs | 7 +++++++ .../Json/DateTimeConverter.cs | 19 ++++++++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/CommunityToolkit.Datasync.Client/Serialization/DateTimeConverter.cs b/src/CommunityToolkit.Datasync.Client/Serialization/DateTimeConverter.cs index da03fb5..208b4e0 100644 --- a/src/CommunityToolkit.Datasync.Client/Serialization/DateTimeConverter.cs +++ b/src/CommunityToolkit.Datasync.Client/Serialization/DateTimeConverter.cs @@ -26,6 +26,13 @@ public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, Jso } else { + // Check if datetime was 'default'. If so do not adjust to local time. + DateTime utc = DateTime.Parse(token, CultureInfo.InvariantCulture, DateTimeStyles.AdjustToUniversal | DateTimeStyles.AssumeUniversal); + if (utc == default) + { + return utc; + } + return DateTime.Parse(token); } } diff --git a/src/CommunityToolkit.Datasync.Server.Abstractions/Json/DateTimeConverter.cs b/src/CommunityToolkit.Datasync.Server.Abstractions/Json/DateTimeConverter.cs index a61d41e..8ec7e72 100644 --- a/src/CommunityToolkit.Datasync.Server.Abstractions/Json/DateTimeConverter.cs +++ b/src/CommunityToolkit.Datasync.Server.Abstractions/Json/DateTimeConverter.cs @@ -18,7 +18,24 @@ public class DateTimeConverter : JsonConverter /// public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) - => DateTime.Parse(reader.GetString() ?? string.Empty); + { + string? token = reader.GetString(); + if (string.IsNullOrEmpty(token)) + { + return DateTime.MinValue; + } + else + { + // Check if datetime was 'default'. If so do not adjust to local time. + DateTime utc = DateTime.Parse(token, CultureInfo.InvariantCulture, DateTimeStyles.AdjustToUniversal | DateTimeStyles.AssumeUniversal); + if (utc == default) + { + return utc; + } + + return DateTime.Parse(token); + } + } /// public override void Write(Utf8JsonWriter writer, DateTime value, JsonSerializerOptions options) From dbff3f75ad42988e0c230e6f21944f39e8cc51e1 Mon Sep 17 00:00:00 2001 From: Stefan Cuypers Date: Mon, 30 Jun 2025 16:29:13 +0200 Subject: [PATCH 07/13] (#384) Renamed TotalNrItems on SynchronizationEventArgs to ItemsTotal. Made DateTimeConverter on server Except again if input is empty. Optimised DataTimeConverter to avoid double parsing. --- .../Offline/Operations/PullOperationManager.cs | 6 +++--- .../OperationsQueue/OperationsQueueManager.cs | 6 +++--- .../Offline/SynchronizationEventArgs.cs | 2 +- .../Serialization/DateTimeConverter.cs | 2 +- .../Json/DateTimeConverter.cs | 18 +++++------------- .../Offline/OfflineDbContext_Tests.cs | 10 +++++----- 6 files changed, 18 insertions(+), 26 deletions(-) diff --git a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs index 9b2d278..8341ec6 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/Operations/PullOperationManager.cs @@ -118,7 +118,7 @@ public async Task ExecuteAsync(IEnumerable requests, Pu EventType = SynchronizationEventType.ItemsCommitted, EntityType = pullResponse.EntityType, ItemsProcessed = pullResponse.TotalItemsProcessed, - TotalNrItems = pullResponse.TotalRequestItems, + ItemsTotal = pullResponse.TotalRequestItems, QueryId = pullResponse.QueryId }); } @@ -130,7 +130,7 @@ public async Task ExecuteAsync(IEnumerable requests, Pu EventType = SynchronizationEventType.PullEnded, EntityType = pullResponse.EntityType, ItemsProcessed = pullResponse.TotalItemsProcessed, - TotalNrItems = pullResponse.TotalRequestItems, + ItemsTotal = pullResponse.TotalRequestItems, QueryId = pullResponse.QueryId, Exception = pullResponse.Exception, ServiceResponse = pullResponse.Exception is DatasyncPullException ex ? ex.ServiceResponse : null @@ -167,7 +167,7 @@ public async Task ExecuteAsync(IEnumerable requests, Pu EventType = SynchronizationEventType.ItemsFetched, EntityType = pullRequest.EntityType, ItemsProcessed = itemsProcessed, - TotalNrItems = page.Count ?? 0, + ItemsTotal = page.Count ?? 0, QueryId = pullRequest.QueryId }); diff --git a/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs b/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs index 1807e34..da44d44 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs @@ -275,7 +275,7 @@ internal async Task PushAsync(IEnumerable entityTypes, PushOpt this._context.SendSynchronizationEvent(new SynchronizationEventArgs() { EventType = SynchronizationEventType.PushStarted, - TotalNrItems = queuedOperations.Count + ItemsTotal = queuedOperations.Count }); if (queuedOperations.Count == 0) @@ -301,7 +301,7 @@ internal async Task PushAsync(IEnumerable entityTypes, PushOpt { EventType = SynchronizationEventType.PushItem, ItemsProcessed = newItemsProcessed, - TotalNrItems = queuedOperations.Count, + ItemsTotal = queuedOperations.Count, PushOperation = operation, }); }); @@ -317,7 +317,7 @@ internal async Task PushAsync(IEnumerable entityTypes, PushOpt { EventType = SynchronizationEventType.PushEnded, ItemsProcessed = nrItemsProcessed, - TotalNrItems = queuedOperations.Count, + ItemsTotal = queuedOperations.Count, }); return pushResult; diff --git a/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs index 1740b06..11f6959 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs @@ -73,7 +73,7 @@ public class SynchronizationEventArgs /// When pulling records, the total number of items to pull for the given entity in the current pull request /// When pushing records, the total number of items that are being pushed in the current push request. /// - public long TotalNrItems { get; init; } + public long ItemsTotal { get; init; } /// /// The query ID that is being processed on pull operations. Not used on push events. diff --git a/src/CommunityToolkit.Datasync.Client/Serialization/DateTimeConverter.cs b/src/CommunityToolkit.Datasync.Client/Serialization/DateTimeConverter.cs index 208b4e0..c5db71d 100644 --- a/src/CommunityToolkit.Datasync.Client/Serialization/DateTimeConverter.cs +++ b/src/CommunityToolkit.Datasync.Client/Serialization/DateTimeConverter.cs @@ -33,7 +33,7 @@ public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, Jso return utc; } - return DateTime.Parse(token); + return utc.ToLocalTime(); } } diff --git a/src/CommunityToolkit.Datasync.Server.Abstractions/Json/DateTimeConverter.cs b/src/CommunityToolkit.Datasync.Server.Abstractions/Json/DateTimeConverter.cs index 8ec7e72..d5d8918 100644 --- a/src/CommunityToolkit.Datasync.Server.Abstractions/Json/DateTimeConverter.cs +++ b/src/CommunityToolkit.Datasync.Server.Abstractions/Json/DateTimeConverter.cs @@ -19,22 +19,14 @@ public class DateTimeConverter : JsonConverter /// public override DateTime Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) { - string? token = reader.GetString(); - if (string.IsNullOrEmpty(token)) + // Check if datetime was 'default'. If so do not adjust to local time. + DateTime utc = DateTime.Parse(reader.GetString() ?? "", CultureInfo.InvariantCulture, DateTimeStyles.AdjustToUniversal | DateTimeStyles.AssumeUniversal); + if (utc == default) { - return DateTime.MinValue; + return utc; } - else - { - // Check if datetime was 'default'. If so do not adjust to local time. - DateTime utc = DateTime.Parse(token, CultureInfo.InvariantCulture, DateTimeStyles.AdjustToUniversal | DateTimeStyles.AssumeUniversal); - if (utc == default) - { - return utc; - } - return DateTime.Parse(token); - } + return utc.ToLocalTime(); } /// diff --git a/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs b/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs index 4ca9db4..33db3a4 100644 --- a/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs +++ b/tests/CommunityToolkit.Datasync.Client.Test/Offline/OfflineDbContext_Tests.cs @@ -1467,13 +1467,13 @@ public async Task SynchronizationProgress_Event_Works() case SynchronizationEventType.ItemsFetched: currentItemsFetched += 5; args.ItemsProcessed.Should().Be(currentItemsFetched); - args.TotalNrItems.Should().Be(20); + args.ItemsTotal.Should().Be(20); eventFiredForFetch = true; break; case SynchronizationEventType.ItemsCommitted: currentItemsCommited += 5; args.ItemsProcessed.Should().Be(currentItemsCommited); - args.TotalNrItems.Should().Be(20); + args.ItemsTotal.Should().Be(20); eventFiredForCommit = true; break; case SynchronizationEventType.PullStarted: @@ -1484,7 +1484,7 @@ public async Task SynchronizationProgress_Event_Works() eventFiredForEnd.Should().BeFalse("PullEnded event should only fire once"); eventFiredForEnd = true; args.ItemsProcessed.Should().Be(20); - args.TotalNrItems.Should().Be(20); + args.ItemsTotal.Should().Be(20); break; default: Assert.Fail($"Invalid event type: {args.EventType}"); @@ -1572,11 +1572,11 @@ public async Task SynchronizationProgress_Event_Works_For_Push() sender.Should().Be(this.context); args.Exception.Should().BeNull(); args.ServiceResponse.Should().BeNull(); - args.TotalNrItems.Should().Be(newMovies.Length); + args.ItemsTotal.Should().Be(newMovies.Length); switch (args.EventType) { case SynchronizationEventType.PushItem: - args.TotalNrItems.Should().Be(newMovies.Length); + args.ItemsTotal.Should().Be(newMovies.Length); args.ItemsProcessed.Should().BeInRange(1,newMovies.Length); int prevProcessed = Interlocked.Exchange(ref itemsProcessedReported[args.ItemsProcessed-1], 1); prevProcessed.Should().Be(0, "Each item should only be reported once"); From 76657a068bbb87482247795c1df6a114caf008eb Mon Sep 17 00:00:00 2001 From: Stefan Cuypers <32466116+StefanCuypers@users.noreply.github.com> Date: Wed, 2 Jul 2025 05:54:55 +0200 Subject: [PATCH 08/13] Update src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../Offline/SynchronizationEventArgs.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs index 11f6959..08cfb93 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs @@ -15,7 +15,7 @@ public enum SynchronizationEventType /// is not yet known here PullStarted, /// - /// Occurs when items have been successfully fetches from the server. + /// Occurs when items have been successfully fetched from the server. /// /// This event is raised after a page of entities was succesfully fetched from the server, ready to be commited to the data store. ItemsFetched, From cf967674ca566d8afd482ce99761d10524775254 Mon Sep 17 00:00:00 2001 From: Stefan Cuypers <32466116+StefanCuypers@users.noreply.github.com> Date: Wed, 2 Jul 2025 05:55:19 +0200 Subject: [PATCH 09/13] Update src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../Offline/SynchronizationEventArgs.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs index 08cfb93..3e2dd25 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs @@ -17,13 +17,13 @@ public enum SynchronizationEventType /// /// Occurs when items have been successfully fetched from the server. /// - /// This event is raised after a page of entities was succesfully fetched from the server, ready to be commited to the data store. + /// This event is raised after a page of entities was successfully fetched from the server, ready to be committed to the data store. ItemsFetched, /// /// Occurs when items have been successfully committed to the underlying data store. /// - /// This event is raised after a page of entities was succesfully commited to the database + /// This event is raised after a page of entities was successfully committed to the database ItemsCommitted, /// From a69e1561e52bdc8d852c5e95c491323b40291334 Mon Sep 17 00:00:00 2001 From: Stefan Cuypers <32466116+StefanCuypers@users.noreply.github.com> Date: Wed, 2 Jul 2025 05:55:31 +0200 Subject: [PATCH 10/13] Update src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../Offline/SynchronizationEventArgs.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs index 3e2dd25..0c8a763 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/SynchronizationEventArgs.cs @@ -27,7 +27,7 @@ public enum SynchronizationEventType ItemsCommitted, /// - /// Pull for the given entiry ended. + /// Pull for the given entity ended. /// PullEnded, /// From a590ff20061f852d953af20e7734951c83dd0606 Mon Sep 17 00:00:00 2001 From: Stefan Cuypers <32466116+StefanCuypers@users.noreply.github.com> Date: Wed, 2 Jul 2025 05:55:47 +0200 Subject: [PATCH 11/13] Update src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../Offline/OperationsQueue/OperationsQueueManager.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs b/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs index da44d44..ba2211c 100644 --- a/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs +++ b/src/CommunityToolkit.Datasync.Client/Offline/OperationsQueue/OperationsQueueManager.cs @@ -283,7 +283,9 @@ internal async Task PushAsync(IEnumerable entityTypes, PushOpt // Signal we ended the push operation. this._context.SendSynchronizationEvent(new SynchronizationEventArgs() { - EventType = SynchronizationEventType.PushEnded + EventType = SynchronizationEventType.PushEnded, + ItemsProcessed = 0, + ItemsTotal = 0 }); return pushResult; } From 053b2f05ad33170fd96f82290b4a30bf8bf31473 Mon Sep 17 00:00:00 2001 From: Stefan Cuypers Date: Wed, 2 Jul 2025 06:21:18 +0200 Subject: [PATCH 12/13] Added DateTimeConverte tests for verifying default time, local time and utc time conversion. --- .../Serialization/DateTimeConverter_Tests.cs | 46 +++++++++++++++++++ .../Json/DateTimeConverter_Tests.cs | 46 +++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/tests/CommunityToolkit.Datasync.Client.Test/Serialization/DateTimeConverter_Tests.cs b/tests/CommunityToolkit.Datasync.Client.Test/Serialization/DateTimeConverter_Tests.cs index a7f0533..c33737c 100644 --- a/tests/CommunityToolkit.Datasync.Client.Test/Serialization/DateTimeConverter_Tests.cs +++ b/tests/CommunityToolkit.Datasync.Client.Test/Serialization/DateTimeConverter_Tests.cs @@ -75,6 +75,52 @@ public void Converter_HandlesNullDateInInput(string culture) }); } + [Theory] + [MemberData(nameof(Locales))] + public void Converter_Roundtrip_Consistent_Default(string culture) + { + DateTime value = default; + + TestWithCulture(culture, () => + { + Entity entity = new() { UpdatedAt = value }; + string serialized = JsonSerializer.Serialize(entity, SerializerOptions); + Entity deserialized = JsonSerializer.Deserialize(serialized, SerializerOptions); + Assert.Equal(deserialized.UpdatedAt, value); + }); + } + + [Theory] + [MemberData(nameof(Locales))] + public void Converter_Roundtrip_Consistent_Local(string culture) + { + DateTime value = new(2021, 8, 21, 12, 30, 15, 123, DateTimeKind.Local); + + TestWithCulture(culture, () => + { + Entity entity = new() { UpdatedAt = value }; + string serialized = JsonSerializer.Serialize(entity, SerializerOptions); + Entity deserialized = JsonSerializer.Deserialize(serialized, SerializerOptions); + Assert.Equal(deserialized.UpdatedAt, value); + }); + } + + [Theory] + [MemberData(nameof(Locales))] + public void Converter_Roundtrip_Consistent_Utc(string culture) + { + DateTime value = new(2021, 8, 21, 14, 35, 20, 12, DateTimeKind.Utc); + + TestWithCulture(culture, () => + { + Entity entity = new() { UpdatedAt = value }; + string serialized = JsonSerializer.Serialize(entity, SerializerOptions); + Entity deserialized = JsonSerializer.Deserialize(serialized, SerializerOptions); + // Roundtrip will convert to local time, DateTimeKind is not preserved. + Assert.Equal(deserialized.UpdatedAt, value.ToLocalTime()); + }); + } + #region Models public class Entity { diff --git a/tests/CommunityToolkit.Datasync.Server.Abstractions.Test/Json/DateTimeConverter_Tests.cs b/tests/CommunityToolkit.Datasync.Server.Abstractions.Test/Json/DateTimeConverter_Tests.cs index e40b681..5bb6823 100644 --- a/tests/CommunityToolkit.Datasync.Server.Abstractions.Test/Json/DateTimeConverter_Tests.cs +++ b/tests/CommunityToolkit.Datasync.Server.Abstractions.Test/Json/DateTimeConverter_Tests.cs @@ -69,6 +69,52 @@ public void Converter_ThrowsOnNullDateInInput() act.Should().Throw(); } + [Theory] + [MemberData(nameof(Locales))] + public void Converter_Roundtrip_Consistent_Default(string culture) + { + DateTime value = default; + + TestWithCulture(culture, () => + { + Entity entity = new() { UpdatedAt = value }; + string serialized = JsonSerializer.Serialize(entity, SerializerOptions); + Entity deserialized = JsonSerializer.Deserialize(serialized, SerializerOptions); + Assert.Equal(deserialized.UpdatedAt, value); + }); + } + + [Theory] + [MemberData(nameof(Locales))] + public void Converter_Roundtrip_Consistent_Local(string culture) + { + DateTime value = new(2021, 8, 21, 12, 30, 15, 123, DateTimeKind.Local); + + TestWithCulture(culture, () => + { + Entity entity = new() { UpdatedAt = value }; + string serialized = JsonSerializer.Serialize(entity, SerializerOptions); + Entity deserialized = JsonSerializer.Deserialize(serialized, SerializerOptions); + Assert.Equal(deserialized.UpdatedAt, value); + }); + } + + [Theory] + [MemberData(nameof(Locales))] + public void Converter_Roundtrip_Consistent_Utc(string culture) + { + DateTime value = new(2021, 8, 21, 14, 35, 20, 12, DateTimeKind.Utc); + + TestWithCulture(culture, () => + { + Entity entity = new() { UpdatedAt = value }; + string serialized = JsonSerializer.Serialize(entity, SerializerOptions); + Entity deserialized = JsonSerializer.Deserialize(serialized, SerializerOptions); + // Roundtrip will convert to local time, DateTimeKind is not preserved. + Assert.Equal(deserialized.UpdatedAt, value.ToLocalTime()); + }); + } + #region Models public class Entity { From 4ffbc920e56450c289938773c88ab833f170a298 Mon Sep 17 00:00:00 2001 From: Stefan Cuypers Date: Wed, 2 Jul 2025 06:54:32 +0200 Subject: [PATCH 13/13] Changed QueueHandler_WithThreads_Enqueue test to sleep a thread for 2 seconds instead of 1. This reduces the risk of thread reusage which could cause the distinct threads count to fail. --- .../Threading/QueueHandler_Tests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/CommunityToolkit.Datasync.Client.Test/Threading/QueueHandler_Tests.cs b/tests/CommunityToolkit.Datasync.Client.Test/Threading/QueueHandler_Tests.cs index b38d04a..b408f97 100644 --- a/tests/CommunityToolkit.Datasync.Client.Test/Threading/QueueHandler_Tests.cs +++ b/tests/CommunityToolkit.Datasync.Client.Test/Threading/QueueHandler_Tests.cs @@ -23,7 +23,7 @@ public async Task QueueHandler_WithThreads_Enqueue(int nThreads) { accId.Enqueue(el); accTh.Enqueue(Environment.CurrentManagedThreadId); - Thread.Sleep(1000); + Thread.Sleep(2000); return Task.CompletedTask; }); DateTimeOffset startTime = DateTimeOffset.Now; @@ -49,7 +49,7 @@ public async Task QueueHandler_WithThreads_Enqueue(int nThreads) accTh.AsEnumerable().Distinct().Should().HaveCount(nThreads); // This just makes sure that the amount of time is "of the right order of magnitude" since CI systems // are notoriously bad at correct timings. We just don't want it to be 10x the expected time. - (endTime - startTime).TotalSeconds.Should().BeLessThanOrEqualTo((nElements / nThreads) + 5); + (endTime - startTime).TotalSeconds.Should().BeLessThanOrEqualTo(2 * (nElements / nThreads) + 5); } [Theory(Timeout = 30000)]