Skip to content

Commit

Permalink
add reconnect resilience to SyncService
Browse files Browse the repository at this point in the history
Refactor SyncAction to allow events to be modified at any time
  • Loading branch information
JaimeStill committed Mar 6, 2023
1 parent 8657e26 commit 77a5a56
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 76 deletions.
1 change: 0 additions & 1 deletion src/Arma.Demo.Core/Sync/ISyncService.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
namespace Arma.Demo.Core.Sync;
public interface ISyncService<T> : IAsyncDisposable
{
Task EnsureConnection();
Task Connect();
Task Join(Guid key);
Task Leave(Guid key);
Expand Down
36 changes: 30 additions & 6 deletions src/Arma.Demo.Core/Sync/SyncAction.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,33 @@
using Microsoft.AspNetCore.SignalR.Client;

namespace Arma.Demo.Core.Sync;
public enum SyncAction
public class SyncEvent<T> : IDisposable
{
Push,
Notify,
Complete,
Return,
Reject
private readonly string method;
private readonly HubConnection client;
private IDisposable subscription;

public SyncEvent(string method, HubConnection client)
{
this.method = method;
this.client = client;
}

public void Set(Func<T, Task> action)
{
subscription?.Dispose();
subscription = client.On(method, action);
}

public void Set(Action<T> action)
{
subscription?.Dispose();
subscription = client.On(method, action);
}

public void Dispose()
{
subscription?.Dispose();
GC.SuppressFinalize(this);
}
}
9 changes: 9 additions & 0 deletions src/Arma.Demo.Core/Sync/SyncActionType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Arma.Demo.Core.Sync;
public enum SyncActionType
{
Push,
Notify,
Complete,
Return,
Reject
}
2 changes: 1 addition & 1 deletion src/Arma.Demo.Core/Sync/SyncMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ public class SyncMessage<T>
public Guid Key { get; set; }
public string Message { get; set; }
public T Data { get; set; }
public SyncAction Action { get; set; }
public SyncActionType Action { get; set; }
}
113 changes: 67 additions & 46 deletions src/Arma.Demo.Core/Sync/SyncService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,58 @@ public abstract class SyncService<T> : ISyncService<T>
{
protected readonly HubConnection connection;
protected readonly string endpoint;
protected CancellationToken token;

protected List<Guid> Groups { get; set; }
public virtual Action<Guid> OnRegistered { get; set; }
public virtual Func<SyncMessage<T>, Task> OnPush { get; set; }
public virtual Func<SyncMessage<T>, Task> OnNotify { get; set; }
public virtual Func<SyncMessage<T>, Task> OnComplete { get; set; }
public virtual Func<SyncMessage<T>, Task> OnReturn { get; set; }
public virtual Func<SyncMessage<T>, Task> OnReject { get; set; }

public SyncEvent<Guid> OnRegistered { get; private set; }
public SyncEvent<SyncMessage<T>> OnPush { get; private set; }
public SyncEvent<SyncMessage<T>> OnNotify { get; private set; }
public SyncEvent<SyncMessage<T>> OnComplete { get; private set; }
public SyncEvent<SyncMessage<T>> OnReturn { get; private set; }
public SyncEvent<SyncMessage<T>> OnReject { get; private set; }

public SyncService(string endpoint)
{
Console.WriteLine($"Building Sync connection at {endpoint}");
connection = BuildHubConnection(endpoint);

this.endpoint = endpoint;
Groups = new();
}
InitializeEvents();

protected virtual HubConnection BuildHubConnection(string endpoint) =>
new HubConnectionBuilder()
.WithUrl(endpoint)
.WithAutomaticReconnect()
.Build();

public async Task EnsureConnection()
{
if (connection.State != HubConnectionState.Connected)
connection.Closed += async (error) =>
{
await Task.Delay(5000);
await Connect();
};

this.endpoint = endpoint;
Groups = new();
token = new();
}

public async Task Connect()
{
if (connection is not null)
if (connection.State != HubConnectionState.Connected)
{
Console.WriteLine("Registering Sync events");
RegisterEvents();
Console.WriteLine($"Connecting to {endpoint}");
await connection.StartAsync();
while (true)
{
try
{
Console.WriteLine($"Connecting to {endpoint}");
await connection.StartAsync(token);
return;
}
catch when (token.IsCancellationRequested)
{
return;
}
catch (Exception ex)
{
Console.WriteLine($"Failed to connect to {endpoint}");
Console.WriteLine(ex.Message);
await Task.Delay(5000);
}
}
}
}

Expand All @@ -62,63 +76,70 @@ public async Task Leave(Guid key)

public async Task Push(SyncMessage<T> message)
{
message.Action = SyncAction.Push;
message.Action = SyncActionType.Push;
await connection.InvokeAsync("SendPush", message);
}

public async Task Notify(SyncMessage<T> message)
{
message.Action = SyncAction.Notify;
message.Action = SyncActionType.Notify;
await connection.InvokeAsync("SendNotify", message);
}

public async Task Complete(SyncMessage<T> message)
{
message.Action = SyncAction.Complete;
message.Action = SyncActionType.Complete;
await connection.InvokeAsync("SendComplete", message);
}

public async Task Return(SyncMessage<T> message)
{
message.Action = SyncAction.Return;
message.Action = SyncActionType.Return;
await connection.InvokeAsync("SendReturn", message);
}

public async Task Reject(SyncMessage<T> message)
{
message.Action = SyncAction.Reject;
message.Action = SyncActionType.Reject;
await connection.InvokeAsync("SendReject", message);
}

void RegisterEvents()
{
if (OnRegistered is not null)
connection.On("Registered", OnRegistered);

if (OnPush is not null)
connection.On("Push", OnPush);

if (OnNotify is not null)
connection.On("Notify", OnNotify);

if (OnComplete is not null)
connection.On("Complete", OnComplete);

if (OnReturn is not null)
connection.On("Return", OnReturn);
protected virtual HubConnection BuildHubConnection(string endpoint) =>
new HubConnectionBuilder()
.WithUrl(endpoint)
.WithAutomaticReconnect()
.Build();

if (OnReject is not null)
connection.On("Reject", OnReject);
protected void InitializeEvents()
{
OnRegistered = new("Registered", connection);
OnPush = new("Push", connection);
OnNotify = new("Notify", connection);
OnComplete = new("Complete", connection);
OnReturn = new("Return", connection);
OnReject = new("Reject", connection);
}

public async ValueTask DisposeAsync()
{
DisposeEvents();

await DisposeConnection()
.ConfigureAwait(false);

GC.SuppressFinalize(this);
}

protected void DisposeEvents()
{
OnRegistered?.Dispose();
OnPush?.Dispose();
OnNotify?.Dispose();
OnComplete?.Dispose();
OnReturn?.Dispose();
OnReject?.Dispose();
}

protected async ValueTask DisposeConnection()
{
if (connection is not null)
Expand Down
3 changes: 2 additions & 1 deletion src/cli/Clients/ProcessorClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ public class ProcessorClient : SyncService<Package>
{
public ProcessorClient(string endpoint) : base(endpoint)
{
OnPush = OnNotify = Output;
OnPush.Set(Output);
OnNotify.Set(Output);
}

static Task Output(SyncMessage<Package> message)
Expand Down
4 changes: 2 additions & 2 deletions src/cli/Commands/ProcessCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ static async Task Call(string server, string api, string sync, Intent intent)
bool exit = false;
await using ProcessorClient processor = new(sync);

processor.OnComplete = async (SyncMessage<Package> message) =>
processor.OnComplete.Set(async (SyncMessage<Package> message) =>
{
Console.WriteLine(message.Message);
await processor.Leave(message.Key);
exit = true;
};
});

Console.WriteLine($"Generating {intent.ToActionString()} Package");
Package package = GeneratePackage(intent);
Expand Down
3 changes: 1 addition & 2 deletions src/jps-core-api/Services/ProcessorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public ProcessorService(IConfiguration config) : base(

public async Task<bool> SendPackage(Package package)
{
await EnsureConnection();
await Connect();

await Join(package.Key);

Expand All @@ -19,7 +19,6 @@ public async Task<bool> SendPackage(Package package)
Id = Guid.NewGuid(),
Key = package.Key,
Data = package,
Action = SyncAction.Push,
Message = $"Initializing Package {package.Name} for processing"
};

Expand Down
15 changes: 9 additions & 6 deletions src/jps-processor/Services/ProcessorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ public ProcessorService(IConfiguration config) : base(
config.GetValue<string>("SyncServer:ProcessorUrl") ?? "https://jps-sync.azurewebsites.net/processor"
)
{
OnRegistered = (Guid key) =>
OnRegistered.Set((Guid key) =>
{
Console.WriteLine($"Service successfully registered at {key}");
Key = key;
};
});

OnPush = ProcessPackage;
OnPush.Set(ProcessPackage);
}

public static async Task Initialize(IServiceProvider services)
Expand All @@ -33,7 +33,7 @@ public async Task Register()
if (!Key.HasValue)
{
Key = Guid.NewGuid();
await EnsureConnection();
await Connect();
Console.WriteLine($"Registering service with key {Key}");
await connection.InvokeAsync("RegisterService", Key);
}
Expand All @@ -49,31 +49,34 @@ public async Task ProcessPackage(SyncMessage<Package> message)

message.Message = $"Submitting package {message.Data.Name} for {message.Data.Intent.ToActionString()}";

Console.WriteLine(message.Message);
await Notify(message);

await Task.Delay(1200);

message.Message = $"Package {message.Data.Name} assigned process {process.Name}";

Console.WriteLine(message.Message);
await Notify(message);

foreach (ProcessTask task in process.Tasks)
{
message.Message = $"Current step: {task.Name}";
Console.WriteLine(message.Message);
await Notify(message);

await Task.Delay(task.Duration);

message.Message = $"Package {message.Data.Name} was successfully appoved by {task.Section}";
Console.WriteLine(message.Message);
await Notify(message);
}

await Task.Delay(300);

message.Message = $"Package {message.Data.Name} was successfully approved";
Console.WriteLine(message.Message);
await Complete(message);

Console.WriteLine($"Processing package {message.Data.Name} with process {process.Name} was successful");
}

static Process GenerateProcess(Package package) => package.Intent switch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public ProcessorService(IConfiguration config) : base(

public async Task<bool> SendPackage(Package package)
{
await EnsureConnection();
await Connect();

await Join(package.Key);

Expand All @@ -19,7 +19,6 @@ public async Task<bool> SendPackage(Package package)
Id = Guid.NewGuid(),
Key = package.Key,
Data = package,
Action = SyncAction.Push,
Message = $"Initializing Package {package.Name} for processing"
};

Expand Down
Loading

0 comments on commit 77a5a56

Please sign in to comment.