Skip to content

Commit 8c3152a

Browse files
hossambarakatbrendandburns
authored andcommitted
Update WatcherDelegatingHandler to stop eating first line (#183) (#190)
* Update WatcherDelegatingHandler to stop eating first line (#183) * Override Read methods in PeekableStreamReader to avoid unpredicted behaviour (#183) Override Read methods in PeekableStreamReader to avoid unpredicted behaviour (#183)
1 parent d014467 commit 8c3152a

File tree

2 files changed

+80
-17
lines changed

2 files changed

+80
-17
lines changed

src/KubernetesClient/WatcherDelegatingHandler.cs

Lines changed: 76 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using System.IO;
1+
using System;
2+
using System.Collections.Generic;
3+
using System.IO;
24
using System.Linq;
35
using System.Net;
46
using System.Net.Http;
@@ -24,8 +26,8 @@ protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage
2426
var query = QueryHelpers.ParseQuery(request.RequestUri.Query);
2527

2628
if (query.TryGetValue("watch", out var values) && values.Any(v => v == "true"))
27-
{
28-
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content);
29+
{
30+
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content);
2931
}
3032
}
3133
return originResponse;
@@ -41,18 +43,19 @@ public LineSeparatedHttpContent(HttpContent originContent)
4143
_originContent = originContent;
4244
}
4345

44-
internal StreamReader StreamReader { get; private set; }
46+
internal PeekableStreamReader StreamReader { get; private set; }
4547

4648
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
4749
{
4850
_originStream = await _originContent.ReadAsStreamAsync();
4951

50-
StreamReader = new StreamReader(_originStream);
51-
52-
var firstLine = await StreamReader.ReadLineAsync();
53-
var writer = new StreamWriter(stream);
54-
55-
// using (writer) // leave open
52+
StreamReader = new PeekableStreamReader(_originStream);
53+
54+
var firstLine = await StreamReader.PeekLineAsync();
55+
56+
var writer = new StreamWriter(stream);
57+
58+
// using (writer) // leave open
5659
{
5760
await writer.WriteAsync(firstLine);
5861
await writer.FlushAsync();
@@ -64,6 +67,67 @@ protected override bool TryComputeLength(out long length)
6467
length = 0;
6568
return false;
6669
}
70+
}
71+
internal class PeekableStreamReader : StreamReader
72+
{
73+
private Queue<string> _buffer;
74+
public PeekableStreamReader(Stream stream) : base(stream)
75+
{
76+
_buffer = new Queue<string>();
77+
}
78+
79+
public override string ReadLine()
80+
{
81+
if (_buffer.Count > 0)
82+
{
83+
return _buffer.Dequeue();
84+
}
85+
return base.ReadLine();
86+
}
87+
public override Task<string> ReadLineAsync()
88+
{
89+
if (_buffer.Count > 0)
90+
{
91+
return Task.FromResult(_buffer.Dequeue());
92+
}
93+
return base.ReadLineAsync();
94+
}
95+
public async Task<string> PeekLineAsync()
96+
{
97+
var line = await ReadLineAsync();
98+
_buffer.Enqueue(line);
99+
return line;
100+
}
101+
102+
public override int Read()
103+
{
104+
throw new NotImplementedException();
105+
}
106+
107+
public override int Read(char[] buffer, int index, int count)
108+
{
109+
throw new NotImplementedException();
110+
}
111+
public override Task<int> ReadAsync(char[] buffer, int index, int count)
112+
{
113+
throw new NotImplementedException();
114+
}
115+
public override int ReadBlock(char[] buffer, int index, int count)
116+
{
117+
throw new NotImplementedException();
118+
}
119+
public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
120+
{
121+
throw new NotImplementedException();
122+
}
123+
public override string ReadToEnd()
124+
{
125+
throw new NotImplementedException();
126+
}
127+
public override Task<string> ReadToEndAsync()
128+
{
129+
throw new NotImplementedException();
130+
}
67131
}
68-
}
69-
}
132+
}
133+
}

tests/KubernetesClient.Tests/WatchTests.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ await Assert.ThrowsAnyAsync<Exception>(() =>
8686
[Fact]
8787
public async Task SuriveBadLine()
8888
{
89-
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */);
89+
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(5);
9090
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
9191
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();
9292

@@ -148,7 +148,7 @@ public async Task SuriveBadLine()
148148
Assert.Contains(WatchEventType.Added, events);
149149
Assert.Contains(WatchEventType.Modified, events);
150150

151-
Assert.Equal(2, errors);
151+
Assert.Equal(3, errors);
152152

153153
Assert.True(watcher.Watching);
154154

@@ -236,7 +236,6 @@ public async Task WatchAllEvents()
236236

237237
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
238238
{
239-
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
240239
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
241240
await WriteStreamLine(httpContext, MockDeletedStreamLine);
242241
await WriteStreamLine(httpContext, MockModifiedStreamLine);
@@ -303,7 +302,7 @@ public async Task WatchAllEvents()
303302
[Fact]
304303
public async Task WatchEventsWithTimeout()
305304
{
306-
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */);
305+
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(5);
307306
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
308307
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();
309308

@@ -362,7 +361,7 @@ public async Task WatchEventsWithTimeout()
362361
Assert.Contains(WatchEventType.Modified, events);
363362
Assert.Contains(WatchEventType.Error, events);
364363

365-
Assert.Equal(0, errors);
364+
Assert.Equal(1, errors);
366365

367366
Assert.True(watcher.Watching);
368367

0 commit comments

Comments
 (0)