Skip to content

Commit

Permalink
受信処理の最適化
Browse files Browse the repository at this point in the history
  • Loading branch information
xtne6f committed Aug 26, 2024
1 parent b975976 commit 4052775
Showing 1 changed file with 23 additions and 18 deletions.
41 changes: 23 additions & 18 deletions Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,9 @@ await DoWebSocketAction(async ct => await watchSession.SendAsync(new ArraySegmen
string hashedUserId = null;
int keepSeatIntervalSec = 0;
int keepSeatTick = 0;
var readEntryBuf = new byte[512];
var readSegmentBuf = new byte[512];
var readPrefetchBuf = new byte[512];
var watchBuf = new byte[MaxAcceptableWebSocketPayloadSize];
int watchCount = 0;
var serverUnixTime = TimeSpan.Zero;
Expand Down Expand Up @@ -629,7 +632,7 @@ await DoWebSocketAction(async ct => await watchSession.SendAsync(new ArraySegmen
{
// プレイリスト接続完了
entryStream = await (Task<Stream>)entryTask;
entryTask = ReadProtoBufChunkAsync(entryStream, msEntry, linkedCts.Token);
entryTask = ReadProtoBufChunkAsync(entryStream, msEntry, readEntryBuf, linkedCts.Token);
}
else
{
Expand Down Expand Up @@ -677,7 +680,7 @@ await DoWebSocketAction(async ct => await watchSession.SendAsync(new ArraySegmen
Trace.WriteLine("Prefetch started");
}
}
entryTask = ReadProtoBufChunkAsync(entryStream, msEntry, linkedCts.Token);
entryTask = ReadProtoBufChunkAsync(entryStream, msEntry, readEntryBuf, linkedCts.Token);
}
}
}
Expand All @@ -687,7 +690,7 @@ await DoWebSocketAction(async ct => await watchSession.SendAsync(new ArraySegmen
{
// セグメント接続完了
segmentStream = await (Task<Stream>)segmentTask;
segmentTask = ReadProtoBufChunkAsync(segmentStream, msSegment, linkedCts.Token);
segmentTask = ReadProtoBufChunkAsync(segmentStream, msSegment, readSegmentBuf, linkedCts.Token);
}
else
{
Expand All @@ -707,13 +710,13 @@ await DoWebSocketAction(async ct => await watchSession.SendAsync(new ArraySegmen
{
// プリフェッチ済み
chunkedMessage = ProtoBuf.Serializer.Deserialize<ChunkedMessage>(msPrefetch);
segmentTask = ReadProtoBufChunkAsync(segmentStream, msSegment, linkedCts.Token);
segmentTask = ReadProtoBufChunkAsync(segmentStream, msSegment, readSegmentBuf, linkedCts.Token);
}
}
else
{
chunkedMessage = ProtoBuf.Serializer.Deserialize<ChunkedMessage>(ms);
segmentTask = ReadProtoBufChunkAsync(segmentStream, msSegment, linkedCts.Token);
segmentTask = ReadProtoBufChunkAsync(segmentStream, msSegment, readSegmentBuf, linkedCts.Token);
}
}
}
Expand All @@ -723,7 +726,7 @@ await DoWebSocketAction(async ct => await watchSession.SendAsync(new ArraySegmen
{
// プリフェッチセグメント接続完了
prefetchStream = await (Task<Stream>)prefetchTask;
prefetchTask = ReadProtoBufChunkAsync(prefetchStream, msPrefetch, linkedCts.Token);
prefetchTask = ReadProtoBufChunkAsync(prefetchStream, msPrefetch, readPrefetchBuf, linkedCts.Token);
}
else if (await (Task<MemoryStream>)prefetchTask == null)
{
Expand Down Expand Up @@ -1515,16 +1518,16 @@ static string HtmlEncodeAmpLtGt(string s, bool encodeQuot = false)
return encodeQuot ? s.Replace("\"", "&quot;") : s;
}

static async Task<MemoryStream> ReadProtoBufChunkAsync(Stream s, MemoryStream ms, CancellationToken ct)
static async Task<MemoryStream> ReadProtoBufChunkAsync(Stream s, MemoryStream ms, byte[] buf, CancellationToken ct)
{
ms.SetLength(0);
var b = new byte[1] { 255 };
buf[0] = 255;
// 可変長整数の終わりまで読む
for (int i = 0; i < 5 && b[0] >= 128 && await s.ReadAsync(b, ct) == 1; i++)
for (int i = 0; i < 5 && buf[0] >= 128 && await s.ReadAsync(buf, 0, 1, ct) > 0; i++)
{
ms.WriteByte(b[0]);
ms.WriteByte(buf[0]);
}
if (b[0] >= 128)
if (buf[0] >= 128)
{
// 終端または値が大きすぎる
return null;
Expand All @@ -1537,14 +1540,16 @@ static async Task<MemoryStream> ReadProtoBufChunkAsync(Stream s, MemoryStream ms
return null;
}
ms.SetLength(0);
for (; len > 0 && await s.ReadAsync(b, ct) == 1; len--)
while (len > 0)
{
ms.WriteByte(b[0]);
}
if (len > 0)
{
// 終端
return null;
int readLen = await s.ReadAsync(buf, 0, Math.Min(len, buf.Length), ct);
if (readLen <= 0)
{
// 終端
return null;
}
ms.Write(buf, 0, readLen);
len -= readLen;
}
ms.Position = 0;
return ms;
Expand Down

0 comments on commit 4052775

Please sign in to comment.