Skip to content

Commit 198bdfb

Browse files
committed
修复接收和发送超时时没有触发连接中断的BUG
1 parent 59ec893 commit 198bdfb

File tree

2 files changed

+122
-129
lines changed

2 files changed

+122
-129
lines changed

Quick.Protocol/QpChannel_Recv.cs

+117-126
Original file line numberDiff line numberDiff line change
@@ -178,121 +178,122 @@ private async Task ReadRecvPipeAsync(PipeReader recvReader, CancellationToken to
178178
byte[] decryptBuffer2 = null;
179179
//解压相关变量
180180
Pipe decompressPipe = null;
181-
try
181+
182+
while (!token.IsCancellationRequested)
182183
{
183-
while (!token.IsCancellationRequested)
184+
var currentReader = recvReader;
185+
186+
var readTask = currentReader.ReadAtLeastAsync(PACKAGE_TOTAL_LENGTH_LENGTH, token);
187+
var ret = await readTask.AsTask()
188+
.WaitAsync(TimeSpan.FromMilliseconds(options.InternalTransportTimeout))
189+
.ConfigureAwait(false);
190+
if (ret.IsCanceled)
191+
return;
192+
if (ret.Buffer.Length < PACKAGE_TOTAL_LENGTH_LENGTH)
193+
throw new ProtocolException(ret.Buffer, $"包头读取错误!包头长度:{PACKAGE_TOTAL_LENGTH_LENGTH},读取数据长度:{ret.Buffer.Length}");
194+
195+
//解析包总长度
196+
packageTotalLength = parsePackageTotalLength(ret.Buffer, packageHeadBuffer);
197+
currentReader.AdvanceTo(ret.Buffer.Start);
198+
199+
//读取完整包
200+
ret = await recvReader.ReadAtLeastAsync(packageTotalLength, token).ConfigureAwait(false);
201+
if (ret.IsCanceled)
202+
return;
203+
if (ret.Buffer.Length < packageTotalLength)
204+
throw new ProtocolException(ret.Buffer, $"包读取错误!包总长度:{packageTotalLength},读取数据长度:{ret.Buffer.Length}");
205+
var packageBuffer = ret.Buffer.Slice(0, packageTotalLength);
206+
207+
//如果设置了压缩或者加密
208+
if (options.InternalCompress || options.InternalEncrypt)
184209
{
185-
var currentReader = recvReader;
186-
var ret = await currentReader.ReadAtLeastAsync(PACKAGE_HEAD_LENGTH, token).ConfigureAwait(false);
187-
if (ret.IsCanceled)
188-
return;
189-
//解析包总长度
190-
packageTotalLength = parsePackageTotalLength(ret.Buffer, packageHeadBuffer);
191-
currentReader.AdvanceTo(ret.Buffer.Start);
192-
193-
//读取完整包
194-
ret = await recvReader.ReadAtLeastAsync(packageTotalLength, token).ConfigureAwait(false);
195-
if (ret.IsCanceled)
196-
return;
197-
if (ret.Buffer.Length < packageTotalLength)
198-
throw new ProtocolException(ret.Buffer, $"包读取错误!包总长度:{packageTotalLength},读取数据长度:{ret.Buffer.Length}");
199-
var packageBuffer = ret.Buffer.Slice(0, packageTotalLength);
200-
201-
//如果设置了压缩或者加密
202-
if (options.InternalCompress || options.InternalEncrypt)
210+
//如果设置了加密
211+
if (options.InternalEncrypt)
203212
{
204-
//如果设置了加密
205-
if (options.InternalEncrypt)
213+
//准备管道
214+
if (decryptPipe == null)
206215
{
207-
//准备管道
208-
if (decryptPipe == null)
209-
{
210-
decryptPipe = new Pipe();
211-
decryptBuffer1 = new byte[dec.InputBlockSize];
212-
decryptBuffer2 = new byte[dec.OutputBlockSize];
213-
}
216+
decryptPipe = new Pipe();
217+
decryptBuffer1 = new byte[dec.InputBlockSize];
218+
decryptBuffer2 = new byte[dec.OutputBlockSize];
219+
}
214220

215-
//写入包头
216-
decryptPipe.Writer.GetMemory(PACKAGE_TOTAL_LENGTH_LENGTH);
217-
decryptPipe.Writer.Advance(PACKAGE_TOTAL_LENGTH_LENGTH);
218-
packageTotalLength = PACKAGE_TOTAL_LENGTH_LENGTH;
221+
//写入包头
222+
decryptPipe.Writer.GetMemory(PACKAGE_TOTAL_LENGTH_LENGTH);
223+
decryptPipe.Writer.Advance(PACKAGE_TOTAL_LENGTH_LENGTH);
224+
packageTotalLength = PACKAGE_TOTAL_LENGTH_LENGTH;
219225

220-
//开始解密
221-
var encryptedBuffer = packageBuffer.Slice(PACKAGE_TOTAL_LENGTH_LENGTH).ToArray();
222-
var decryptBuffer = dec.TransformFinalBlock(encryptedBuffer, 0, encryptedBuffer.Length);
223-
packageTotalLength += decryptBuffer.Length;
226+
//开始解密
227+
var encryptedBuffer = packageBuffer.Slice(PACKAGE_TOTAL_LENGTH_LENGTH).ToArray();
228+
var decryptBuffer = dec.TransformFinalBlock(encryptedBuffer, 0, encryptedBuffer.Length);
229+
packageTotalLength += decryptBuffer.Length;
224230

225-
decryptBuffer.CopyTo(decryptPipe.Writer.GetMemory(decryptBuffer.Length));
226-
decryptPipe.Writer.Advance(decryptBuffer.Length);
231+
decryptBuffer.CopyTo(decryptPipe.Writer.GetMemory(decryptBuffer.Length));
232+
decryptPipe.Writer.Advance(decryptBuffer.Length);
227233

228-
_ = decryptPipe.Writer.FlushAsync();
229-
ret = await decryptPipe.Reader.ReadAtLeastAsync(packageTotalLength, token).ConfigureAwait(false);
234+
_ = decryptPipe.Writer.FlushAsync();
235+
ret = await decryptPipe.Reader.ReadAtLeastAsync(packageTotalLength, token).ConfigureAwait(false);
230236

231-
//解密完成,释放缓存
232-
currentReader?.AdvanceTo(packageBuffer.End);
237+
//解密完成,释放缓存
238+
currentReader?.AdvanceTo(packageBuffer.End);
233239

234-
packageBuffer = ret.Buffer;
235-
currentReader = decryptPipe.Reader;
236-
}
240+
packageBuffer = ret.Buffer;
241+
currentReader = decryptPipe.Reader;
242+
}
237243

238-
//如果设置了压缩
239-
if (options.InternalCompress)
244+
//如果设置了压缩
245+
if (options.InternalCompress)
246+
{
247+
//准备管道
248+
if (decompressPipe == null)
249+
decompressPipe = new Pipe();
250+
251+
//写入包头
252+
decompressPipe.Writer.GetMemory(PACKAGE_TOTAL_LENGTH_LENGTH);
253+
decompressPipe.Writer.Advance(PACKAGE_TOTAL_LENGTH_LENGTH);
254+
packageTotalLength = PACKAGE_TOTAL_LENGTH_LENGTH;
255+
256+
//开始解压
257+
var compressedBuffer = packageBuffer.Slice(PACKAGE_TOTAL_LENGTH_LENGTH);
258+
using (var readMs = new ReadOnlySequenceByteStream(compressedBuffer))
259+
using (var gzStream = new GZipStream(readMs, CompressionMode.Decompress, true))
240260
{
241-
//准备管道
242-
if (decompressPipe == null)
243-
decompressPipe = new Pipe();
244-
245-
//写入包头
246-
decompressPipe.Writer.GetMemory(PACKAGE_TOTAL_LENGTH_LENGTH);
247-
decompressPipe.Writer.Advance(PACKAGE_TOTAL_LENGTH_LENGTH);
248-
packageTotalLength = PACKAGE_TOTAL_LENGTH_LENGTH;
249-
250-
//开始解压
251-
var compressedBuffer = packageBuffer.Slice(PACKAGE_TOTAL_LENGTH_LENGTH);
252-
using (var readMs = new ReadOnlySequenceByteStream(compressedBuffer))
253-
using (var gzStream = new GZipStream(readMs, CompressionMode.Decompress, true))
261+
while (true)
254262
{
255-
while (true)
256-
{
257-
var count = await gzStream.ReadAsync(decompressPipe.Writer.GetMemory(minimumBufferSize), token).ConfigureAwait(false);
258-
if (count <= 0)
259-
break;
260-
decompressPipe.Writer.Advance(count);
261-
packageTotalLength += count;
262-
}
263+
var count = await gzStream.ReadAsync(decompressPipe.Writer.GetMemory(minimumBufferSize), token).ConfigureAwait(false);
264+
if (count <= 0)
265+
break;
266+
decompressPipe.Writer.Advance(count);
267+
packageTotalLength += count;
263268
}
264-
_ = Task.Run(async () =>
265-
{
266-
await decompressPipe.Writer.FlushAsync().ConfigureAwait(false);
267-
});
268-
ret = await decompressPipe.Reader.ReadAtLeastAsync(packageTotalLength, token).ConfigureAwait(false);
269-
//解压完成,释放缓存
270-
currentReader?.AdvanceTo(packageBuffer.End);
271-
packageBuffer = ret.Buffer;
272-
currentReader = decompressPipe.Reader;
273269
}
270+
_ = Task.Run(async () =>
271+
{
272+
await decompressPipe.Writer.FlushAsync().ConfigureAwait(false);
273+
});
274+
ret = await decompressPipe.Reader.ReadAtLeastAsync(packageTotalLength, token).ConfigureAwait(false);
275+
//解压完成,释放缓存
276+
currentReader?.AdvanceTo(packageBuffer.End);
277+
packageBuffer = ret.Buffer;
278+
currentReader = decompressPipe.Reader;
274279
}
275-
276-
//包类型
277-
packageBuffer.Slice(0, PACKAGE_HEAD_LENGTH).CopyTo(packageHeadBuffer);
278-
var packageType = (QpPackageType)packageHeadBuffer[PACKAGE_TOTAL_LENGTH_LENGTH];
279-
280-
if (LogUtils.LogPackage)
281-
LogUtils.Log(
282-
"{0}: [Recv-Package]Length:{1},Type:{2},Content:{3}",
283-
DateTime.Now,
284-
packageTotalLength,
285-
packageType,
286-
LogUtils.LogContent ?
287-
BitConverter.ToString(packageBuffer.ToArray())
288-
: LogUtils.NOT_SHOW_CONTENT_MESSAGE);
289-
HandlePackage(packageType, packageBuffer);
290-
currentReader?.AdvanceTo(packageBuffer.End);
291280
}
292-
}
293-
catch (Exception ex)
294-
{
295-
OnReadError(ex);
281+
282+
//包类型
283+
packageBuffer.Slice(0, PACKAGE_HEAD_LENGTH).CopyTo(packageHeadBuffer);
284+
var packageType = (QpPackageType)packageHeadBuffer[PACKAGE_TOTAL_LENGTH_LENGTH];
285+
286+
if (LogUtils.LogPackage)
287+
LogUtils.Log(
288+
"{0}: [Recv-Package]Length:{1},Type:{2},Content:{3}",
289+
DateTime.Now,
290+
packageTotalLength,
291+
packageType,
292+
LogUtils.LogContent ?
293+
BitConverter.ToString(packageBuffer.ToArray())
294+
: LogUtils.NOT_SHOW_CONTENT_MESSAGE);
295+
HandlePackage(packageType, packageBuffer);
296+
currentReader?.AdvanceTo(packageBuffer.End);
296297
}
297298
}
298299

@@ -387,43 +388,33 @@ protected void HandlePackage(QpPackageType packageType, ReadOnlySequence<byte> p
387388
}
388389
}
389390

390-
391391
protected void BeginReadPackage(CancellationToken token)
392392
{
393-
_ = Task.Run(async () =>
393+
var pipe = new Pipe();
394+
FillRecvPipeAsync(QpPackageHandler_Stream, pipe.Writer, token).ContinueWith(task =>
394395
{
395-
var pipe = new Pipe();
396-
var fillTask = FillRecvPipeAsync(QpPackageHandler_Stream, pipe.Writer, token);
397-
var readTask = ReadRecvPipeAsync(pipe.Reader, token);
398-
try
399-
{
400-
await Task.WhenAll(fillTask, readTask).ConfigureAwait(false);
401-
}
402-
catch
403-
{
404-
await pipe.Writer.CompleteAsync().ConfigureAwait(false);
405-
await pipe.Reader.CompleteAsync().ConfigureAwait(false);
406-
}
396+
if (task.IsFaulted)
397+
OnReadError(task.Exception);
398+
pipe.Writer.CompleteAsync(task.Exception);
399+
});
400+
ReadRecvPipeAsync(pipe.Reader, token).ContinueWith(task =>
401+
{
402+
if (task.IsFaulted)
403+
OnReadError(task.Exception);
404+
pipe.Reader.CompleteAsync(task.Exception);
407405
});
408406
}
409407

410408
private async Task FillRecvPipeAsync(Stream stream, PipeWriter writer, CancellationToken token)
411409
{
412-
try
413-
{
414-
while (!token.IsCancellationRequested)
415-
{
416-
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
417-
int bytesRead = await stream.ReadAsync(memory, token);
418-
if (bytesRead == 0)
419-
break;
420-
writer.Advance(bytesRead);
421-
await writer.FlushAsync(token);
422-
}
423-
}
424-
catch (Exception ex)
410+
while (!token.IsCancellationRequested)
425411
{
426-
OnReadError(ex);
412+
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
413+
int bytesRead = await stream.ReadAsync(memory, token);
414+
if (bytesRead == 0)
415+
break;
416+
writer.Advance(bytesRead);
417+
await writer.FlushAsync(token);
427418
}
428419
}
429420

Quick.Protocol/QpChannel_Send.cs

+5-3
Original file line numberDiff line numberDiff line change
@@ -111,12 +111,14 @@ private async Task writePackageBuffer(PipeReader currentReader, QpPackageType pa
111111
packageHeadMemory = new Memory<byte>(sendHeadBuffer, 0, PACKAGE_TOTAL_LENGTH_LENGTH);
112112
}
113113
//写入包头
114-
await stream.WriteAsync(packageHeadMemory).ConfigureAwait(false);
115-
114+
var writeTask = stream.WriteAsync(packageHeadMemory).AsTask();
115+
await writeTask
116+
.WaitAsync(TimeSpan.FromMilliseconds(options.InternalTransportTimeout))
117+
.ConfigureAwait(false);
118+
116119
//如果有包内容,写入包内容
117120
if (packageBodyBuffer.Length > 0)
118121
{
119-
Task writeTask = null;
120122
using (var sequenceByteStream = new ReadOnlySequenceByteStream(packageBodyBuffer))
121123
writeTask = sequenceByteStream.CopyToAsync(stream);
122124
await writeTask

0 commit comments

Comments
 (0)