Skip to content

Commit

Permalink
Merge pull request #28 from JaimeStill/demo
Browse files Browse the repository at this point in the history
add reconnect resilience to SyncService
  • Loading branch information
JaimeStill authored Mar 6, 2023
2 parents 8657e26 + 77a5a56 commit 1ec5277
Show file tree
Hide file tree
Showing 13 changed files with 135 additions and 76 deletions.
1 change: 0 additions & 1 deletion src/Arma.Demo.Core/Sync/ISyncService.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
namespace Arma.Demo.Core.Sync;
public interface ISyncService<T> : IAsyncDisposable
{
Task EnsureConnection();
Task Connect();
Task Join(Guid key);
Task Leave(Guid key);
Expand Down
36 changes: 30 additions & 6 deletions src/Arma.Demo.Core/Sync/SyncAction.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,33 @@
using Microsoft.AspNetCore.SignalR.Client;

namespace Arma.Demo.Core.Sync;
public enum SyncAction
public class SyncEvent<T> : IDisposable
{
Push,
Notify,
Complete,
Return,
Reject
private readonly string method;
private readonly HubConnection client;
private IDisposable subscription;

public SyncEvent(string method, HubConnection client)
{
this.method = method;
this.client = client;
}

public void Set(Func<T, Task> action)
{
subscription?.Dispose();
subscription = client.On(method, action);
}

public void Set(Action<T> action)
{
subscription?.Dispose();
subscription = client.On(method, action);
}

public void Dispose()
{
subscription?.Dispose();
GC.SuppressFinalize(this);
}
}
9 changes: 9 additions & 0 deletions src/Arma.Demo.Core/Sync/SyncActionType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Arma.Demo.Core.Sync;
public enum SyncActionType
{
Push,
Notify,
Complete,
Return,
Reject
}
2 changes: 1 addition & 1 deletion src/Arma.Demo.Core/Sync/SyncMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ public class SyncMessage<T>
public Guid Key { get; set; }
public string Message { get; set; }
public T Data { get; set; }
public SyncAction Action { get; set; }
public SyncActionType Action { get; set; }
}
113 changes: 67 additions & 46 deletions src/Arma.Demo.Core/Sync/SyncService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,58 @@ public abstract class SyncService<T> : ISyncService<T>
{
protected readonly HubConnection connection;
protected readonly string endpoint;
protected CancellationToken token;

protected List<Guid> Groups { get; set; }
public virtual Action<Guid> OnRegistered { get; set; }
public virtual Func<SyncMessage<T>, Task> OnPush { get; set; }
public virtual Func<SyncMessage<T>, Task> OnNotify { get; set; }
public virtual Func<SyncMessage<T>, Task> OnComplete { get; set; }
public virtual Func<SyncMessage<T>, Task> OnReturn { get; set; }
public virtual Func<SyncMessage<T>, Task> OnReject { get; set; }

public SyncEvent<Guid> OnRegistered { get; private set; }
public SyncEvent<SyncMessage<T>> OnPush { get; private set; }
public SyncEvent<SyncMessage<T>> OnNotify { get; private set; }
public SyncEvent<SyncMessage<T>> OnComplete { get; private set; }
public SyncEvent<SyncMessage<T>> OnReturn { get; private set; }
public SyncEvent<SyncMessage<T>> OnReject { get; private set; }

public SyncService(string endpoint)
{
Console.WriteLine($"Building Sync connection at {endpoint}");
connection = BuildHubConnection(endpoint);

this.endpoint = endpoint;
Groups = new();
}
InitializeEvents();

protected virtual HubConnection BuildHubConnection(string endpoint) =>
new HubConnectionBuilder()
.WithUrl(endpoint)
.WithAutomaticReconnect()
.Build();

public async Task EnsureConnection()
{
if (connection.State != HubConnectionState.Connected)
connection.Closed += async (error) =>
{
await Task.Delay(5000);
await Connect();
};

this.endpoint = endpoint;
Groups = new();
token = new();
}

public async Task Connect()
{
if (connection is not null)
if (connection.State != HubConnectionState.Connected)
{
Console.WriteLine("Registering Sync events");
RegisterEvents();
Console.WriteLine($"Connecting to {endpoint}");
await connection.StartAsync();
while (true)
{
try
{
Console.WriteLine($"Connecting to {endpoint}");
await connection.StartAsync(token);
return;
}
catch when (token.IsCancellationRequested)
{
return;
}
catch (Exception ex)
{
Console.WriteLine($"Failed to connect to {endpoint}");
Console.WriteLine(ex.Message);
await Task.Delay(5000);
}
}
}
}

Expand All @@ -62,63 +76,70 @@ public async Task Leave(Guid key)

public async Task Push(SyncMessage<T> message)
{
message.Action = SyncAction.Push;
message.Action = SyncActionType.Push;
await connection.InvokeAsync("SendPush", message);
}

public async Task Notify(SyncMessage<T> message)
{
message.Action = SyncAction.Notify;
message.Action = SyncActionType.Notify;
await connection.InvokeAsync("SendNotify", message);
}

public async Task Complete(SyncMessage<T> message)
{
message.Action = SyncAction.Complete;
message.Action = SyncActionType.Complete;
await connection.InvokeAsync("SendComplete", message);
}

public async Task Return(SyncMessage<T> message)
{
message.Action = SyncAction.Return;
message.Action = SyncActionType.Return;
await connection.InvokeAsync("SendReturn", message);
}

public async Task Reject(SyncMessage<T> message)
{
message.Action = SyncAction.Reject;
message.Action = SyncActionType.Reject;
await connection.InvokeAsync("SendReject", message);
}

void RegisterEvents()
{
if (OnRegistered is not null)
connection.On("Registered", OnRegistered);

if (OnPush is not null)
connection.On("Push", OnPush);

if (OnNotify is not null)
connection.On("Notify", OnNotify);

if (OnComplete is not null)
connection.On("Complete", OnComplete);

if (OnReturn is not null)
connection.On("Return", OnReturn);
protected virtual HubConnection BuildHubConnection(string endpoint) =>
new HubConnectionBuilder()
.WithUrl(endpoint)
.WithAutomaticReconnect()
.Build();

if (OnReject is not null)
connection.On("Reject", OnReject);
protected void InitializeEvents()
{
OnRegistered = new("Registered", connection);
OnPush = new("Push", connection);
OnNotify = new("Notify", connection);
OnComplete = new("Complete", connection);
OnReturn = new("Return", connection);
OnReject = new("Reject", connection);
}

public async ValueTask DisposeAsync()
{
DisposeEvents();

await DisposeConnection()
.ConfigureAwait(false);

GC.SuppressFinalize(this);
}

protected void DisposeEvents()
{
OnRegistered?.Dispose();
OnPush?.Dispose();
OnNotify?.Dispose();
OnComplete?.Dispose();
OnReturn?.Dispose();
OnReject?.Dispose();
}

protected async ValueTask DisposeConnection()
{
if (connection is not null)
Expand Down
3 changes: 2 additions & 1 deletion src/cli/Clients/ProcessorClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ public class ProcessorClient : SyncService<Package>
{
public ProcessorClient(string endpoint) : base(endpoint)
{
OnPush = OnNotify = Output;
OnPush.Set(Output);
OnNotify.Set(Output);
}

static Task Output(SyncMessage<Package> message)
Expand Down
4 changes: 2 additions & 2 deletions src/cli/Commands/ProcessCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ static async Task Call(string server, string api, string sync, Intent intent)
bool exit = false;
await using ProcessorClient processor = new(sync);

processor.OnComplete = async (SyncMessage<Package> message) =>
processor.OnComplete.Set(async (SyncMessage<Package> message) =>
{
Console.WriteLine(message.Message);
await processor.Leave(message.Key);
exit = true;
};
});

Console.WriteLine($"Generating {intent.ToActionString()} Package");
Package package = GeneratePackage(intent);
Expand Down
3 changes: 1 addition & 2 deletions src/jps-core-api/Services/ProcessorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public ProcessorService(IConfiguration config) : base(

public async Task<bool> SendPackage(Package package)
{
await EnsureConnection();
await Connect();

await Join(package.Key);

Expand All @@ -19,7 +19,6 @@ public async Task<bool> SendPackage(Package package)
Id = Guid.NewGuid(),
Key = package.Key,
Data = package,
Action = SyncAction.Push,
Message = $"Initializing Package {package.Name} for processing"
};

Expand Down
15 changes: 9 additions & 6 deletions src/jps-processor/Services/ProcessorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ public ProcessorService(IConfiguration config) : base(
config.GetValue<string>("SyncServer:ProcessorUrl") ?? "https://jps-sync.azurewebsites.net/processor"
)
{
OnRegistered = (Guid key) =>
OnRegistered.Set((Guid key) =>
{
Console.WriteLine($"Service successfully registered at {key}");
Key = key;
};
});

OnPush = ProcessPackage;
OnPush.Set(ProcessPackage);
}

public static async Task Initialize(IServiceProvider services)
Expand All @@ -33,7 +33,7 @@ public async Task Register()
if (!Key.HasValue)
{
Key = Guid.NewGuid();
await EnsureConnection();
await Connect();
Console.WriteLine($"Registering service with key {Key}");
await connection.InvokeAsync("RegisterService", Key);
}
Expand All @@ -49,31 +49,34 @@ public async Task ProcessPackage(SyncMessage<Package> message)

message.Message = $"Submitting package {message.Data.Name} for {message.Data.Intent.ToActionString()}";

Console.WriteLine(message.Message);
await Notify(message);

await Task.Delay(1200);

message.Message = $"Package {message.Data.Name} assigned process {process.Name}";

Console.WriteLine(message.Message);
await Notify(message);

foreach (ProcessTask task in process.Tasks)
{
message.Message = $"Current step: {task.Name}";
Console.WriteLine(message.Message);
await Notify(message);

await Task.Delay(task.Duration);

message.Message = $"Package {message.Data.Name} was successfully appoved by {task.Section}";
Console.WriteLine(message.Message);
await Notify(message);
}

await Task.Delay(300);

message.Message = $"Package {message.Data.Name} was successfully approved";
Console.WriteLine(message.Message);
await Complete(message);

Console.WriteLine($"Processing package {message.Data.Name} with process {process.Name} was successful");
}

static Process GenerateProcess(Package package) => package.Intent switch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public ProcessorService(IConfiguration config) : base(

public async Task<bool> SendPackage(Package package)
{
await EnsureConnection();
await Connect();

await Join(package.Key);

Expand All @@ -19,7 +19,6 @@ public async Task<bool> SendPackage(Package package)
Id = Guid.NewGuid(),
Key = package.Key,
Data = package,
Action = SyncAction.Push,
Message = $"Initializing Package {package.Name} for processing"
};

Expand Down
Loading

0 comments on commit 1ec5277

Please sign in to comment.