|
4 | 4 | using System.Runtime.InteropServices; |
5 | 5 | using System.Threading; |
6 | 6 | using System.Threading.Tasks; |
| 7 | +using StreamExtended.Helpers; |
7 | 8 | using Titanium.Web.Proxy.Extensions; |
8 | 9 | using Titanium.Web.Proxy.Network.Tcp; |
9 | 10 |
|
@@ -109,15 +110,115 @@ internal static TcpRow GetTcpRowByLocalPort(IpVersion ipVersion, int localPort) |
109 | 110 | /// <summary> |
110 | 111 | /// relays the input clientStream to the server at the specified host name and port with the given httpCmd and headers as prefix |
111 | 112 | /// Usefull for websocket requests |
| 113 | + /// Asynchronous Programming Model, which does not throw exceptions when the socket is closed |
112 | 114 | /// </summary> |
113 | 115 | /// <param name="clientStream"></param> |
114 | 116 | /// <param name="serverStream"></param> |
115 | 117 | /// <param name="bufferSize"></param> |
116 | 118 | /// <param name="onDataSend"></param> |
117 | 119 | /// <param name="onDataReceive"></param> |
| 120 | + /// <param name="exceptionFunc"></param> |
118 | 121 | /// <returns></returns> |
119 | | - internal static async Task SendRaw(Stream clientStream, Stream serverStream, int bufferSize, |
120 | | - Action<byte[], int, int> onDataSend, Action<byte[], int, int> onDataReceive) |
| 122 | + internal static async Task SendRawApm(Stream clientStream, Stream serverStream, int bufferSize, |
| 123 | + Action<byte[], int, int> onDataSend, Action<byte[], int, int> onDataReceive, Action<Exception> exceptionFunc) |
| 124 | + { |
| 125 | + var tcs = new TaskCompletionSource<bool>(); |
| 126 | + var cts = new CancellationTokenSource(); |
| 127 | + cts.Token.Register(() => tcs.TrySetResult(true)); |
| 128 | + |
| 129 | + //Now async relay all server=>client & client=>server data |
| 130 | + byte[] clientBuffer = BufferPool.GetBuffer(bufferSize); |
| 131 | + byte[] serverBuffer = BufferPool.GetBuffer(bufferSize); |
| 132 | + try |
| 133 | + { |
| 134 | + BeginRead(clientStream, serverStream, clientBuffer, cts, onDataSend, exceptionFunc); |
| 135 | + BeginRead(serverStream, clientStream, serverBuffer, cts, onDataReceive, exceptionFunc); |
| 136 | + await tcs.Task; |
| 137 | + } |
| 138 | + finally |
| 139 | + { |
| 140 | + BufferPool.ReturnBuffer(clientBuffer); |
| 141 | + BufferPool.ReturnBuffer(serverBuffer); |
| 142 | + } |
| 143 | + } |
| 144 | + |
| 145 | + private static void BeginRead(Stream inputStream, Stream outputStream, byte[] buffer, CancellationTokenSource cts, Action<byte[], int, int> onCopy, Action<Exception> exceptionFunc) |
| 146 | + { |
| 147 | + if (cts.IsCancellationRequested) |
| 148 | + { |
| 149 | + return; |
| 150 | + } |
| 151 | + |
| 152 | + bool readFlag = false; |
| 153 | + var readCallback = (AsyncCallback)(ar => |
| 154 | + { |
| 155 | + if (cts.IsCancellationRequested || readFlag) |
| 156 | + { |
| 157 | + return; |
| 158 | + } |
| 159 | + |
| 160 | + readFlag = true; |
| 161 | + |
| 162 | + try |
| 163 | + { |
| 164 | + int read = inputStream.EndRead(ar); |
| 165 | + if (read <= 0) |
| 166 | + { |
| 167 | + cts.Cancel(); |
| 168 | + return; |
| 169 | + } |
| 170 | + |
| 171 | + onCopy?.Invoke(buffer, 0, read); |
| 172 | + |
| 173 | + var writeCallback = (AsyncCallback)(ar2 => |
| 174 | + { |
| 175 | + if (cts.IsCancellationRequested) |
| 176 | + { |
| 177 | + return; |
| 178 | + } |
| 179 | + |
| 180 | + try |
| 181 | + { |
| 182 | + outputStream.EndWrite(ar2); |
| 183 | + BeginRead(inputStream, outputStream, buffer, cts, onCopy, exceptionFunc); |
| 184 | + } |
| 185 | + catch (IOException ex) |
| 186 | + { |
| 187 | + cts.Cancel(); |
| 188 | + exceptionFunc(ex); |
| 189 | + } |
| 190 | + }); |
| 191 | + |
| 192 | + outputStream.BeginWrite(buffer, 0, read, writeCallback, null); |
| 193 | + } |
| 194 | + catch (IOException ex) |
| 195 | + { |
| 196 | + cts.Cancel(); |
| 197 | + exceptionFunc(ex); |
| 198 | + } |
| 199 | + }); |
| 200 | + |
| 201 | + var readResult = inputStream.BeginRead(buffer, 0, buffer.Length, readCallback, null); |
| 202 | + if (readResult.CompletedSynchronously) |
| 203 | + { |
| 204 | + readCallback(readResult); |
| 205 | + } |
| 206 | + } |
| 207 | + |
| 208 | + /// <summary> |
| 209 | + /// relays the input clientStream to the server at the specified host name and port with the given httpCmd and headers as prefix |
| 210 | + /// Usefull for websocket requests |
| 211 | + /// Task-based Asynchronous Pattern |
| 212 | + /// </summary> |
| 213 | + /// <param name="clientStream"></param> |
| 214 | + /// <param name="serverStream"></param> |
| 215 | + /// <param name="bufferSize"></param> |
| 216 | + /// <param name="onDataSend"></param> |
| 217 | + /// <param name="onDataReceive"></param> |
| 218 | + /// <param name="exceptionFunc"></param> |
| 219 | + /// <returns></returns> |
| 220 | + internal static async Task SendRawTap(Stream clientStream, Stream serverStream, int bufferSize, |
| 221 | + Action<byte[], int, int> onDataSend, Action<byte[], int, int> onDataReceive, Action<Exception> exceptionFunc) |
121 | 222 | { |
122 | 223 | var cts = new CancellationTokenSource(); |
123 | 224 |
|
|
0 commit comments