diff --git a/src/Arma.Demo.Core/Sync/ISyncService.cs b/src/Arma.Demo.Core/Sync/ISyncService.cs index 65dcfd0..2817d1e 100644 --- a/src/Arma.Demo.Core/Sync/ISyncService.cs +++ b/src/Arma.Demo.Core/Sync/ISyncService.cs @@ -1,7 +1,6 @@ namespace Arma.Demo.Core.Sync; public interface ISyncService : IAsyncDisposable { - Task EnsureConnection(); Task Connect(); Task Join(Guid key); Task Leave(Guid key); diff --git a/src/Arma.Demo.Core/Sync/SyncAction.cs b/src/Arma.Demo.Core/Sync/SyncAction.cs index 7e65716..1f0b8b5 100644 --- a/src/Arma.Demo.Core/Sync/SyncAction.cs +++ b/src/Arma.Demo.Core/Sync/SyncAction.cs @@ -1,9 +1,33 @@ +using Microsoft.AspNetCore.SignalR.Client; + namespace Arma.Demo.Core.Sync; -public enum SyncAction +public class SyncEvent : IDisposable { - Push, - Notify, - Complete, - Return, - Reject + private readonly string method; + private readonly HubConnection client; + private IDisposable subscription; + + public SyncEvent(string method, HubConnection client) + { + this.method = method; + this.client = client; + } + + public void Set(Func action) + { + subscription?.Dispose(); + subscription = client.On(method, action); + } + + public void Set(Action action) + { + subscription?.Dispose(); + subscription = client.On(method, action); + } + + public void Dispose() + { + subscription?.Dispose(); + GC.SuppressFinalize(this); + } } \ No newline at end of file diff --git a/src/Arma.Demo.Core/Sync/SyncActionType.cs b/src/Arma.Demo.Core/Sync/SyncActionType.cs new file mode 100644 index 0000000..64beb15 --- /dev/null +++ b/src/Arma.Demo.Core/Sync/SyncActionType.cs @@ -0,0 +1,9 @@ +namespace Arma.Demo.Core.Sync; +public enum SyncActionType +{ + Push, + Notify, + Complete, + Return, + Reject +} \ No newline at end of file diff --git a/src/Arma.Demo.Core/Sync/SyncMessage.cs b/src/Arma.Demo.Core/Sync/SyncMessage.cs index b3e729d..996cebb 100644 --- a/src/Arma.Demo.Core/Sync/SyncMessage.cs +++ b/src/Arma.Demo.Core/Sync/SyncMessage.cs @@ -7,5 +7,5 @@ public class SyncMessage public Guid Key { get; set; } public string Message { get; set; } public T Data { get; set; } - public SyncAction Action { get; set; } + public SyncActionType Action { get; set; } } \ No newline at end of file diff --git a/src/Arma.Demo.Core/Sync/SyncService.cs b/src/Arma.Demo.Core/Sync/SyncService.cs index 5c9c7e5..28717f6 100644 --- a/src/Arma.Demo.Core/Sync/SyncService.cs +++ b/src/Arma.Demo.Core/Sync/SyncService.cs @@ -5,44 +5,58 @@ public abstract class SyncService : ISyncService { protected readonly HubConnection connection; protected readonly string endpoint; + protected CancellationToken token; protected List Groups { get; set; } - public virtual Action OnRegistered { get; set; } - public virtual Func, Task> OnPush { get; set; } - public virtual Func, Task> OnNotify { get; set; } - public virtual Func, Task> OnComplete { get; set; } - public virtual Func, Task> OnReturn { get; set; } - public virtual Func, Task> OnReject { get; set; } + + public SyncEvent OnRegistered { get; private set; } + public SyncEvent> OnPush { get; private set; } + public SyncEvent> OnNotify { get; private set; } + public SyncEvent> OnComplete { get; private set; } + public SyncEvent> OnReturn { get; private set; } + public SyncEvent> OnReject { get; private set; } public SyncService(string endpoint) { Console.WriteLine($"Building Sync connection at {endpoint}"); connection = BuildHubConnection(endpoint); - this.endpoint = endpoint; - Groups = new(); - } + InitializeEvents(); - protected virtual HubConnection BuildHubConnection(string endpoint) => - new HubConnectionBuilder() - .WithUrl(endpoint) - .WithAutomaticReconnect() - .Build(); - - public async Task EnsureConnection() - { - if (connection.State != HubConnectionState.Connected) + connection.Closed += async (error) => + { + await Task.Delay(5000); await Connect(); + }; + + this.endpoint = endpoint; + Groups = new(); + token = new(); } public async Task Connect() { - if (connection is not null) + if (connection.State != HubConnectionState.Connected) { - Console.WriteLine("Registering Sync events"); - RegisterEvents(); - Console.WriteLine($"Connecting to {endpoint}"); - await connection.StartAsync(); + while (true) + { + try + { + Console.WriteLine($"Connecting to {endpoint}"); + await connection.StartAsync(token); + return; + } + catch when (token.IsCancellationRequested) + { + return; + } + catch (Exception ex) + { + Console.WriteLine($"Failed to connect to {endpoint}"); + Console.WriteLine(ex.Message); + await Task.Delay(5000); + } + } } } @@ -62,63 +76,70 @@ public async Task Leave(Guid key) public async Task Push(SyncMessage message) { - message.Action = SyncAction.Push; + message.Action = SyncActionType.Push; await connection.InvokeAsync("SendPush", message); } public async Task Notify(SyncMessage message) { - message.Action = SyncAction.Notify; + message.Action = SyncActionType.Notify; await connection.InvokeAsync("SendNotify", message); } public async Task Complete(SyncMessage message) { - message.Action = SyncAction.Complete; + message.Action = SyncActionType.Complete; await connection.InvokeAsync("SendComplete", message); } public async Task Return(SyncMessage message) { - message.Action = SyncAction.Return; + message.Action = SyncActionType.Return; await connection.InvokeAsync("SendReturn", message); } public async Task Reject(SyncMessage message) { - message.Action = SyncAction.Reject; + message.Action = SyncActionType.Reject; await connection.InvokeAsync("SendReject", message); } - void RegisterEvents() - { - if (OnRegistered is not null) - connection.On("Registered", OnRegistered); - - if (OnPush is not null) - connection.On("Push", OnPush); - - if (OnNotify is not null) - connection.On("Notify", OnNotify); - - if (OnComplete is not null) - connection.On("Complete", OnComplete); - - if (OnReturn is not null) - connection.On("Return", OnReturn); + protected virtual HubConnection BuildHubConnection(string endpoint) => + new HubConnectionBuilder() + .WithUrl(endpoint) + .WithAutomaticReconnect() + .Build(); - if (OnReject is not null) - connection.On("Reject", OnReject); + protected void InitializeEvents() + { + OnRegistered = new("Registered", connection); + OnPush = new("Push", connection); + OnNotify = new("Notify", connection); + OnComplete = new("Complete", connection); + OnReturn = new("Return", connection); + OnReject = new("Reject", connection); } public async ValueTask DisposeAsync() { + DisposeEvents(); + await DisposeConnection() .ConfigureAwait(false); GC.SuppressFinalize(this); } + protected void DisposeEvents() + { + OnRegistered?.Dispose(); + OnPush?.Dispose(); + OnNotify?.Dispose(); + OnComplete?.Dispose(); + OnReturn?.Dispose(); + OnReject?.Dispose(); + } + protected async ValueTask DisposeConnection() { if (connection is not null) diff --git a/src/cli/Clients/ProcessorClient.cs b/src/cli/Clients/ProcessorClient.cs index 306edef..bed45c2 100644 --- a/src/cli/Clients/ProcessorClient.cs +++ b/src/cli/Clients/ProcessorClient.cs @@ -6,7 +6,8 @@ public class ProcessorClient : SyncService { public ProcessorClient(string endpoint) : base(endpoint) { - OnPush = OnNotify = Output; + OnPush.Set(Output); + OnNotify.Set(Output); } static Task Output(SyncMessage message) diff --git a/src/cli/Commands/ProcessCommand.cs b/src/cli/Commands/ProcessCommand.cs index 2644516..fec3647 100644 --- a/src/cli/Commands/ProcessCommand.cs +++ b/src/cli/Commands/ProcessCommand.cs @@ -36,12 +36,12 @@ static async Task Call(string server, string api, string sync, Intent intent) bool exit = false; await using ProcessorClient processor = new(sync); - processor.OnComplete = async (SyncMessage message) => + processor.OnComplete.Set(async (SyncMessage message) => { Console.WriteLine(message.Message); await processor.Leave(message.Key); exit = true; - }; + }); Console.WriteLine($"Generating {intent.ToActionString()} Package"); Package package = GeneratePackage(intent); diff --git a/src/jps-core-api/Services/ProcessorService.cs b/src/jps-core-api/Services/ProcessorService.cs index c1b54c4..b2b46e9 100644 --- a/src/jps-core-api/Services/ProcessorService.cs +++ b/src/jps-core-api/Services/ProcessorService.cs @@ -10,7 +10,7 @@ public ProcessorService(IConfiguration config) : base( public async Task SendPackage(Package package) { - await EnsureConnection(); + await Connect(); await Join(package.Key); @@ -19,7 +19,6 @@ public async Task SendPackage(Package package) Id = Guid.NewGuid(), Key = package.Key, Data = package, - Action = SyncAction.Push, Message = $"Initializing Package {package.Name} for processing" }; diff --git a/src/jps-processor/Services/ProcessorService.cs b/src/jps-processor/Services/ProcessorService.cs index 80575f4..02687eb 100644 --- a/src/jps-processor/Services/ProcessorService.cs +++ b/src/jps-processor/Services/ProcessorService.cs @@ -11,13 +11,13 @@ public ProcessorService(IConfiguration config) : base( config.GetValue("SyncServer:ProcessorUrl") ?? "https://jps-sync.azurewebsites.net/processor" ) { - OnRegistered = (Guid key) => + OnRegistered.Set((Guid key) => { Console.WriteLine($"Service successfully registered at {key}"); Key = key; - }; + }); - OnPush = ProcessPackage; + OnPush.Set(ProcessPackage); } public static async Task Initialize(IServiceProvider services) @@ -33,7 +33,7 @@ public async Task Register() if (!Key.HasValue) { Key = Guid.NewGuid(); - await EnsureConnection(); + await Connect(); Console.WriteLine($"Registering service with key {Key}"); await connection.InvokeAsync("RegisterService", Key); } @@ -49,31 +49,34 @@ public async Task ProcessPackage(SyncMessage message) message.Message = $"Submitting package {message.Data.Name} for {message.Data.Intent.ToActionString()}"; + Console.WriteLine(message.Message); await Notify(message); await Task.Delay(1200); message.Message = $"Package {message.Data.Name} assigned process {process.Name}"; + Console.WriteLine(message.Message); await Notify(message); foreach (ProcessTask task in process.Tasks) { message.Message = $"Current step: {task.Name}"; + Console.WriteLine(message.Message); await Notify(message); await Task.Delay(task.Duration); message.Message = $"Package {message.Data.Name} was successfully appoved by {task.Section}"; + Console.WriteLine(message.Message); await Notify(message); } await Task.Delay(300); message.Message = $"Package {message.Data.Name} was successfully approved"; + Console.WriteLine(message.Message); await Complete(message); - - Console.WriteLine($"Processing package {message.Data.Name} with process {process.Name} was successful"); } static Process GenerateProcess(Package package) => package.Intent switch diff --git a/tests/cross-service-signalr/apis/core-api/Services/ProcessorService.cs b/tests/cross-service-signalr/apis/core-api/Services/ProcessorService.cs index c024845..7f1b68d 100644 --- a/tests/cross-service-signalr/apis/core-api/Services/ProcessorService.cs +++ b/tests/cross-service-signalr/apis/core-api/Services/ProcessorService.cs @@ -10,7 +10,7 @@ public ProcessorService(IConfiguration config) : base( public async Task SendPackage(Package package) { - await EnsureConnection(); + await Connect(); await Join(package.Key); @@ -19,7 +19,6 @@ public async Task SendPackage(Package package) Id = Guid.NewGuid(), Key = package.Key, Data = package, - Action = SyncAction.Push, Message = $"Initializing Package {package.Name} for processing" }; diff --git a/tests/cross-service-signalr/apis/processor/Services/ProcessorService.cs b/tests/cross-service-signalr/apis/processor/Services/ProcessorService.cs index eff1308..d1e4f04 100644 --- a/tests/cross-service-signalr/apis/processor/Services/ProcessorService.cs +++ b/tests/cross-service-signalr/apis/processor/Services/ProcessorService.cs @@ -11,13 +11,13 @@ public ProcessorService(IConfiguration config) : base( config.GetValue("SyncServer:ProcessorUrl") ?? "http://localhost:5100/processor" ) { - OnRegistered = (Guid key) => + OnRegistered.Set((Guid key) => { Console.WriteLine($"Service successfully registered at {key}"); Key = key; - }; + }); - OnPush = ProcessPackage; + OnPush.Set(ProcessPackage); } public static async Task Initialize(IServiceProvider services) @@ -33,7 +33,7 @@ public async Task Register() if (!Key.HasValue) { Key = Guid.NewGuid(); - await EnsureConnection(); + await Connect(); Console.WriteLine($"Registering service with key {Key}"); await connection.InvokeAsync("RegisterService", Key); } @@ -49,31 +49,34 @@ public async Task ProcessPackage(SyncMessage message) message.Message = $"Submitting package {message.Data.Name} for {message.Data.Intent.ToActionString()}"; + Console.WriteLine(message.Message); await Notify(message); await Task.Delay(1200); message.Message = $"Package {message.Data.Name} assigned process {process.Name}"; + Console.WriteLine(message.Message); await Notify(message); foreach (ProcessTask task in process.Tasks) { message.Message = $"Current step: {task.Name}"; + Console.WriteLine(message.Message); await Notify(message); await Task.Delay(task.Duration); message.Message = $"Package {message.Data.Name} was successfully appoved by {task.Section}"; + Console.WriteLine(message.Message); await Notify(message); } await Task.Delay(300); message.Message = $"Package {message.Data.Name} was successfully approved"; + Console.WriteLine(message.Message); await Complete(message); - - Console.WriteLine($"Processing package {message.Data.Name} with process {process.Name} was successful"); } static Process GenerateProcess(Package package) => package.Intent switch diff --git a/tests/cross-service-signalr/app/core-cli/Clients/ProcessorClient.cs b/tests/cross-service-signalr/app/core-cli/Clients/ProcessorClient.cs index 17f5476..b3caad2 100644 --- a/tests/cross-service-signalr/app/core-cli/Clients/ProcessorClient.cs +++ b/tests/cross-service-signalr/app/core-cli/Clients/ProcessorClient.cs @@ -6,7 +6,8 @@ public class ProcessorClient : SyncService { public ProcessorClient(string endpoint) : base(endpoint) { - OnPush = OnNotify = Output; + OnPush.Set(Output); + OnNotify.Set(Output); } static Task Output(SyncMessage message) diff --git a/tests/cross-service-signalr/app/core-cli/Commands/ProcessCommand.cs b/tests/cross-service-signalr/app/core-cli/Commands/ProcessCommand.cs index ba781da..c898171 100644 --- a/tests/cross-service-signalr/app/core-cli/Commands/ProcessCommand.cs +++ b/tests/cross-service-signalr/app/core-cli/Commands/ProcessCommand.cs @@ -36,12 +36,12 @@ static async Task Call(string api, string sync, Intent intent) bool exit = false; await using ProcessorClient processor = new(sync); - processor.OnComplete = async (SyncMessage message) => + processor.OnComplete.Set(async (SyncMessage message) => { Console.WriteLine(message.Message); await processor.Leave(message.Key); exit = true; - }; + }); Console.WriteLine($"Generating {intent.ToActionString()} Package"); Package package = GeneratePackage(intent);