Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 43 additions & 21 deletions Sources/Runtime/Microsoft.Psi.Interop/Transport/TcpWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Microsoft.Psi.Interop.Transport
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
Expand All @@ -21,8 +22,8 @@ public class TcpWriter<T> : IConsumer<T>, IDisposable
private readonly string name;

private TcpListener listener;
private TcpClient client;
private NetworkStream networkStream;
private List<TcpClient> clients;
private Thread acceptingThread;

/// <summary>
/// Initializes a new instance of the <see cref="TcpWriter{T}"/> class.
Expand All @@ -38,6 +39,7 @@ public TcpWriter(Pipeline pipeline, int port, IFormatSerializer serializer, stri
this.Port = port;
this.In = pipeline.CreateReceiver<T>(this, this.Receive, nameof(this.In));
this.listener = new TcpListener(IPAddress.Any, port);
this.clients = new List<TcpClient>();
this.Start();
}

Expand All @@ -63,49 +65,69 @@ private void Receive(T message, Envelope envelope)
{
(var bytes, int offset, int count) = this.serializer.SerializeMessage(message, envelope.OriginatingTime);

try
if (this.clients.Count != 0)
{
if (this.networkStream != null)
List<TcpClient> clientsToRemove = new List<TcpClient>();
foreach (var client in this.clients)
{
this.networkStream.Write(BitConverter.GetBytes(count), 0, sizeof(int));
this.networkStream.Write(bytes, offset, count);
if (!client.Connected)
{
clientsToRemove.Add(client);
continue;
}

try
{
var stream = client.GetStream();
stream.Write(BitConverter.GetBytes(count), 0, sizeof(int));
stream.Write(bytes, offset, count);
}
catch (Exception ex)
{
Trace.WriteLine($"TcpWriter Exception: {ex.Message}");
clientsToRemove.Add(client);
}
}
}
catch (Exception ex)
{
Trace.WriteLine($"TcpWriter Exception: {ex.Message}");

// Restart the server
this.Stop();
this.Start();
clientsToRemove.ForEach(client =>
{
this.clients.Remove(client);
});
}
}

private void Start()
{
new Thread(new ThreadStart(this.Listen)) { IsBackground = true }.Start();
this.acceptingThread = new Thread(new ThreadStart(this.Listen)) { IsBackground = true };
this.acceptingThread.Start();
}

private void Stop()
{
this.acceptingThread.Abort();

// Dispose active client if any
this.networkStream?.Dispose();
this.networkStream = null;
this.client?.Dispose();
this.client = null;
if (this.clients.Count != 0)
{
foreach (var client in this.clients)
{
client.Dispose();
}
}

this.clients.Clear();
this.listener.Stop();
this.listener = null;
}

private void Listen()
{
if (this.listener != null)
while (this.listener != null)
{
try
{
this.listener.Start();
this.client = this.listener.AcceptTcpClient();
this.networkStream = this.client.GetStream();
this.clients.Add(this.listener.AcceptTcpClient());
}
catch (Exception ex)
{
Expand Down