diff --git a/Sources/Runtime/Microsoft.Psi.Interop/Transport/TcpWriter.cs b/Sources/Runtime/Microsoft.Psi.Interop/Transport/TcpWriter.cs index cdc106513..0ddfff10a 100644 --- a/Sources/Runtime/Microsoft.Psi.Interop/Transport/TcpWriter.cs +++ b/Sources/Runtime/Microsoft.Psi.Interop/Transport/TcpWriter.cs @@ -4,6 +4,7 @@ namespace Microsoft.Psi.Interop.Transport { using System; + using System.Collections.Generic; using System.Diagnostics; using System.Net; using System.Net.Sockets; @@ -21,8 +22,8 @@ public class TcpWriter : IConsumer, IDisposable private readonly string name; private TcpListener listener; - private TcpClient client; - private NetworkStream networkStream; + private List clients; + private Thread acceptingThread; /// /// Initializes a new instance of the class. @@ -38,6 +39,7 @@ public TcpWriter(Pipeline pipeline, int port, IFormatSerializer serializer, stri this.Port = port; this.In = pipeline.CreateReceiver(this, this.Receive, nameof(this.In)); this.listener = new TcpListener(IPAddress.Any, port); + this.clients = new List(); this.Start(); } @@ -63,49 +65,69 @@ private void Receive(T message, Envelope envelope) { (var bytes, int offset, int count) = this.serializer.SerializeMessage(message, envelope.OriginatingTime); - try + if (this.clients.Count != 0) { - if (this.networkStream != null) + List clientsToRemove = new List(); + foreach (var client in this.clients) { - this.networkStream.Write(BitConverter.GetBytes(count), 0, sizeof(int)); - this.networkStream.Write(bytes, offset, count); + if (!client.Connected) + { + clientsToRemove.Add(client); + continue; + } + + try + { + var stream = client.GetStream(); + stream.Write(BitConverter.GetBytes(count), 0, sizeof(int)); + stream.Write(bytes, offset, count); + } + catch (Exception ex) + { + Trace.WriteLine($"TcpWriter Exception: {ex.Message}"); + clientsToRemove.Add(client); + } } - } - catch (Exception ex) - { - Trace.WriteLine($"TcpWriter Exception: {ex.Message}"); - // Restart the server - this.Stop(); - this.Start(); + clientsToRemove.ForEach(client => + { + this.clients.Remove(client); + }); } } private void Start() { - new Thread(new ThreadStart(this.Listen)) { IsBackground = true }.Start(); + this.acceptingThread = new Thread(new ThreadStart(this.Listen)) { IsBackground = true }; + this.acceptingThread.Start(); } private void Stop() { + this.acceptingThread.Abort(); + // Dispose active client if any - this.networkStream?.Dispose(); - this.networkStream = null; - this.client?.Dispose(); - this.client = null; + if (this.clients.Count != 0) + { + foreach (var client in this.clients) + { + client.Dispose(); + } + } + this.clients.Clear(); this.listener.Stop(); + this.listener = null; } private void Listen() { - if (this.listener != null) + while (this.listener != null) { try { this.listener.Start(); - this.client = this.listener.AcceptTcpClient(); - this.networkStream = this.client.GetStream(); + this.clients.Add(this.listener.AcceptTcpClient()); } catch (Exception ex) {