Skip to content

Commit 56343a0

Browse files
committed
减少一层NuGet包依赖。
1 parent cac4f71 commit 56343a0

File tree

5 files changed

+134
-16
lines changed

5 files changed

+134
-16
lines changed

Quick.Protocol/QpChannel.cs

+2-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
using System.Text.Json;
2-
using Quick.Protocol.Exceptions;
3-
using Quick.Protocol.Utils;
1+
using Quick.Protocol.Utils;
42
using System;
53
using System.Collections.Concurrent;
64
using System.Collections.Generic;
@@ -11,11 +9,8 @@
119
using System.Text;
1210
using System.Threading;
1311
using System.Threading.Tasks;
14-
using System.Text.Json.Serialization;
15-
using System.IO.Pipelines;
1612
using System.Buffers;
17-
using System.Collections.ObjectModel;
18-
using Nerdbank.Streams;
13+
1914

2015
namespace Quick.Protocol
2116
{

Quick.Protocol/QpChannel_Recv.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
using System.Text;
1111
using System.Threading;
1212
using System.Threading.Tasks;
13-
using Nerdbank.Streams;
13+
using Quick.Protocol.Streams;
1414

1515
namespace Quick.Protocol
1616
{
@@ -249,7 +249,7 @@ private async Task ReadRecvPipeAsync(PipeReader recvReader, CancellationToken to
249249

250250
//开始解压
251251
var compressedBuffer = packageBuffer.Slice(PACKAGE_TOTAL_LENGTH_LENGTH);
252-
using (var readMs = compressedBuffer.AsStream())
252+
using (var readMs = new ReadOnlySequenceByteStream(compressedBuffer))
253253
using (var gzStream = new GZipStream(readMs, CompressionMode.Decompress, true))
254254
{
255255
while (true)

Quick.Protocol/QpChannel_Send.cs

+6-5
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
using System.Text;
1010
using System.Threading;
1111
using System.Threading.Tasks;
12-
using Nerdbank.Streams;
1312
using Quick.Protocol.Streams;
1413

1514
namespace Quick.Protocol
@@ -53,7 +52,7 @@ private async Task writePackageBuffer(PipeReader currentReader, QpPackageType pa
5352
{
5453
if (writeCompressPipe == null)
5554
writeCompressPipe = new Pipe();
56-
using (var inStream = packageBodyBuffer.AsStream())
55+
using (var inStream = new ReadOnlySequenceByteStream(packageBodyBuffer))
5756
using (var outStream = new PipeWriterStream(writeCompressPipe.Writer, true))
5857
{
5958
using (var gzStream = new GZipStream(outStream, CompressionMode.Compress, true))
@@ -113,11 +112,13 @@ private async Task writePackageBuffer(PipeReader currentReader, QpPackageType pa
113112
}
114113
//写入包头
115114
await stream.WriteAsync(packageHeadMemory).ConfigureAwait(false);
116-
115+
117116
//如果有包内容,写入包内容
118117
if (packageBodyBuffer.Length > 0)
119118
{
120-
var writeTask = packageBodyBuffer.AsStream().CopyToAsync(stream);
119+
Task writeTask = null;
120+
using (var sequenceByteStream = new ReadOnlySequenceByteStream(packageBodyBuffer))
121+
writeTask = sequenceByteStream.CopyToAsync(stream);
121122
await writeTask
122123
.WaitAsync(TimeSpan.FromMilliseconds(options.InternalTransportTimeout))
123124
.ConfigureAwait(false);
@@ -141,7 +142,7 @@ await writeTask
141142
packageTotalLength,
142143
packageType,
143144
LogUtils.LogContent ?
144-
BitConverter.ToString(packageHeadMemory.ToArray().Concat(packageBodyBuffer.ToArray()).ToArray())
145+
BitConverter.ToString(packageHeadMemory.ToArray()) + "-" + BitConverter.ToString(packageBodyBuffer.ToArray())
145146
: LogUtils.NOT_SHOW_CONTENT_MESSAGE);
146147
currentReader?.AdvanceTo(packageBodyBuffer.End);
147148
await stream.FlushAsync().ConfigureAwait(false);

Quick.Protocol/Quick.Protocol.csproj

+1-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
</ItemGroup>
2424

2525
<ItemGroup>
26-
<PackageReference Include="Nerdbank.Streams" Version="2.11.79" />
26+
<PackageReference Include="System.IO.Pipelines" Version="8.0.0" />
2727
</ItemGroup>
28-
2928
</Project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
using System;
2+
using System.Buffers;
3+
using System.Collections.Generic;
4+
using System.IO;
5+
using System.IO.Pipelines;
6+
using System.Linq;
7+
using System.Text;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
11+
namespace Quick.Protocol.Streams
12+
{
13+
public class ReadOnlySequenceByteStream : Stream
14+
{
15+
private ReadOnlySequence<byte> _sequence;
16+
17+
public override bool CanRead => true;
18+
19+
public override bool CanSeek => false;
20+
21+
public override bool CanWrite => false;
22+
23+
public override long Length => _sequence.Length;
24+
25+
public override long Position
26+
{
27+
get
28+
{
29+
throw new NotSupportedException();
30+
}
31+
set
32+
{
33+
throw new NotSupportedException();
34+
}
35+
}
36+
37+
38+
public ReadOnlySequenceByteStream(ReadOnlySequence<byte> sequence)
39+
{
40+
_sequence = sequence;
41+
}
42+
43+
public override int Read(byte[] buffer, int offset, int count)
44+
{
45+
if (buffer == null)
46+
{
47+
throw new ArgumentNullException(nameof(buffer));
48+
}
49+
50+
return ReadInternal(new Span<byte>(buffer, offset, count));
51+
}
52+
53+
public override int ReadByte()
54+
{
55+
Span<byte> buffer = stackalloc byte[1];
56+
if (ReadInternal(buffer) != 0)
57+
{
58+
return buffer[0];
59+
}
60+
61+
return -1;
62+
}
63+
64+
private int ReadInternal(Span<byte> buffer)
65+
{
66+
var count = Math.Min((int)_sequence.Length, buffer.Length);
67+
_sequence.Slice(0, count).CopyTo(buffer);
68+
_sequence = _sequence.Slice(count);
69+
return count;
70+
}
71+
72+
public override long Seek(long offset, SeekOrigin origin)
73+
{
74+
throw new NotSupportedException();
75+
}
76+
77+
public override void SetLength(long value)
78+
{
79+
throw new NotSupportedException();
80+
}
81+
82+
public override void Write(byte[] buffer, int offset, int count)
83+
{
84+
throw new NotSupportedException();
85+
}
86+
87+
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
88+
{
89+
if (buffer == null)
90+
{
91+
throw new ArgumentNullException(nameof(buffer));
92+
}
93+
return Task.Run(() => ReadInternal(new Span<byte>(buffer, offset, count)));
94+
}
95+
96+
public override int Read(Span<byte> buffer)
97+
{
98+
return ReadInternal(buffer);
99+
}
100+
101+
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default(CancellationToken))
102+
{
103+
return new ValueTask<int>(Task.Run(() => ReadInternal(buffer.Span)));
104+
}
105+
106+
public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
107+
{
108+
var buffer = new byte[bufferSize];
109+
while (_sequence.Length > 0)
110+
{
111+
var count = Math.Min((int)_sequence.Length, bufferSize);
112+
_sequence.Slice(0, count).CopyTo(buffer);
113+
_sequence = _sequence.Slice(count);
114+
await destination.WriteAsync(buffer, 0, count).ConfigureAwait(false);
115+
}
116+
await destination.FlushAsync().ConfigureAwait(false);
117+
}
118+
119+
public override void Flush()
120+
{
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)