Skip to content

Commit 7029f8b

Browse files
committed
Fix behavior for write operations.
1 parent d3eac1d commit 7029f8b

File tree

2 files changed

+65
-33
lines changed

2 files changed

+65
-33
lines changed

chronos.nimble

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
packageName = "chronos"
2-
version = "2.2.8"
2+
version = "2.2.9"
33
author = "Status Research & Development GmbH"
44
description = "Chronos"
55
license = "Apache License 2.0 or MIT"

chronos/transports/stream.nim

Lines changed: 64 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,13 @@ proc completePendingWriteQueue(queue: var Deque[StreamVector],
252252
if not(vector.writer.finished()):
253253
vector.writer.complete(v)
254254

255+
proc failPendingWriteQueue(queue: var Deque[StreamVector],
256+
error: ref Exception) {.inline.} =
257+
while len(queue) > 0:
258+
var vector = queue.popFirst()
259+
if not(vector.writer.finished()):
260+
vector.writer.fail(error)
261+
255262
when defined(windows):
256263

257264
template zeroOvelappedOffset(t: untyped) =
@@ -324,25 +331,29 @@ when defined(windows):
324331
vector.writer.complete(transp.wwsabuf.len)
325332
elif int(err) == ERROR_OPERATION_ABORTED:
326333
# CancelIO() interrupt
327-
transp.state.incl(WritePaused)
328-
let v = transp.queue.popFirst()
329-
if not(v.writer.finished()):
330-
v.writer.complete(0)
334+
transp.state.incl({WritePaused, WriteEof})
335+
let vector = transp.queue.popFirst()
336+
if not(vector.writer.finished()):
337+
vector.writer.complete(0)
338+
completePendingWriteQueue(transp.queue, 0)
331339
break
332340
else:
333-
let v = transp.queue.popFirst()
341+
let vector = transp.queue.popFirst()
334342
if isConnResetError(err):
335343
# Soft error happens which indicates that remote peer got
336344
# disconnected, complete all pending writes in queue with 0.
337-
transp.state.incl(WriteEof)
338-
if not(v.writer.finished()):
339-
v.writer.complete(0)
345+
transp.state.incl({WritePaused, WriteEof})
346+
if not(vector.writer.finished()):
347+
vector.writer.complete(0)
340348
completePendingWriteQueue(transp.queue, 0)
341349
break
342350
else:
343-
transp.state.incl(WriteError)
344-
if not(v.writer.finished()):
345-
v.writer.fail(getTransportOsError(err))
351+
transp.state.incl({WritePaused, WriteError})
352+
let error = getTransportOsError(err)
353+
if not(vector.writer.finished()):
354+
vector.writer.fail(error)
355+
failPendingWriteQueue(transp.queue, error)
356+
break
346357
else:
347358
## Initiation
348359
transp.state.incl(WritePending)
@@ -360,9 +371,11 @@ when defined(windows):
360371
if int(err) == ERROR_OPERATION_ABORTED:
361372
# CancelIO() interrupt
362373
transp.state.excl(WritePending)
363-
transp.state.incl(WritePaused)
374+
transp.state.incl({WritePaused, WriteEof})
364375
if not(vector.writer.finished()):
365376
vector.writer.complete(0)
377+
completePendingWriteQueue(transp.queue, 0)
378+
break
366379
elif int(err) == ERROR_IO_PENDING:
367380
transp.queue.addFirst(vector)
368381
else:
@@ -377,8 +390,11 @@ when defined(windows):
377390
break
378391
else:
379392
transp.state.incl({WritePaused, WriteError})
393+
let error = getTransportOsError(err)
380394
if not(vector.writer.finished()):
381-
vector.writer.fail(getTransportOsError(err))
395+
vector.writer.fail(error)
396+
failPendingWriteQueue(transp.queue, error)
397+
break
382398
else:
383399
transp.queue.addFirst(vector)
384400
else:
@@ -400,9 +416,11 @@ when defined(windows):
400416
if int(err) == ERROR_OPERATION_ABORTED:
401417
# CancelIO() interrupt
402418
transp.state.excl(WritePending)
403-
transp.state.incl(WritePaused)
419+
transp.state.incl({WritePaused, WriteEof})
404420
if not(vector.writer.finished()):
405421
vector.writer.complete(0)
422+
completePendingWriteQueue(transp.queue, 0)
423+
break
406424
elif int(err) == ERROR_IO_PENDING:
407425
transp.queue.addFirst(vector)
408426
else:
@@ -417,8 +435,11 @@ when defined(windows):
417435
break
418436
else:
419437
transp.state.incl({WritePaused, WriteError})
438+
let error = getTransportOsError(err)
420439
if not(vector.writer.finished()):
421-
vector.writer.fail(getTransportOsError(err))
440+
vector.writer.fail(error)
441+
failPendingWriteQueue(transp.queue, error)
442+
break
422443
else:
423444
transp.queue.addFirst(vector)
424445
elif transp.kind == TransportKind.Pipe:
@@ -432,20 +453,16 @@ when defined(windows):
432453
cast[POVERLAPPED](addr transp.wovl))
433454
if ret == 0:
434455
let err = osLastError()
435-
if int(err) == ERROR_OPERATION_ABORTED:
456+
if int(err) in {ERROR_OPERATION_ABORTED, ERROR_NO_DATA}:
436457
# CancelIO() interrupt
437458
transp.state.excl(WritePending)
438-
transp.state.incl(WritePaused)
459+
transp.state.incl({WritePaused, WriteEof})
439460
if not(vector.writer.finished()):
440461
vector.writer.complete(0)
462+
completePendingWriteQueue(transp.queue, 0)
463+
break
441464
elif int(err) == ERROR_IO_PENDING:
442465
transp.queue.addFirst(vector)
443-
elif int(err) == ERROR_NO_DATA:
444-
# The pipe is being closed.
445-
transp.state.excl(WritePending)
446-
transp.state.incl(WritePaused)
447-
if not(vector.writer.finished()):
448-
vector.writer.complete(0)
449466
else:
450467
transp.state.excl(WritePending)
451468
if isConnResetError(err):
@@ -458,8 +475,11 @@ when defined(windows):
458475
break
459476
else:
460477
transp.state.incl({WritePaused, WriteError})
478+
let error = getTransportOsError(err)
461479
if not(vector.writer.finished()):
462-
vector.writer.fail(getTransportOsError(err))
480+
vector.writer.fail(error)
481+
failPendingWriteQueue(transp.queue, error)
482+
break
463483
else:
464484
transp.queue.addFirst(vector)
465485
break
@@ -974,17 +994,20 @@ else:
974994
if int(err) == EINTR:
975995
continue
976996
else:
997+
transp.fd.removeWriter()
977998
if isConnResetError(err):
978999
# Soft error happens which indicates that remote peer got
9791000
# disconnected, complete all pending writes in queue with 0.
9801001
transp.state.incl({WriteEof, WritePaused})
9811002
if not(vector.writer.finished()):
9821003
vector.writer.complete(0)
9831004
completePendingWriteQueue(transp.queue, 0)
984-
transp.fd.removeWriter()
9851005
else:
1006+
transp.state.incl({WriteError, WritePaused})
1007+
let error = getTransportOsError(err)
9861008
if not(vector.writer.finished()):
987-
vector.writer.fail(getTransportOsError(err))
1009+
vector.writer.fail(error)
1010+
failPendingWriteQueue(transp.queue, error)
9881011
else:
9891012
var nbytes = cast[int](vector.buf)
9901013
let res = sendfile(int(fd), cast[int](vector.buflen),
@@ -1004,17 +1027,20 @@ else:
10041027
if int(err) == EINTR:
10051028
continue
10061029
else:
1030+
transp.fd.removeWriter()
10071031
if isConnResetError(err):
10081032
# Soft error happens which indicates that remote peer got
10091033
# disconnected, complete all pending writes in queue with 0.
10101034
transp.state.incl({WriteEof, WritePaused})
10111035
if not(vector.writer.finished()):
10121036
vector.writer.complete(0)
10131037
completePendingWriteQueue(transp.queue, 0)
1014-
transp.fd.removeWriter()
10151038
else:
1039+
transp.state.incl({WriteError, WritePaused})
1040+
let error = getTransportOsError(err)
10161041
if not(vector.writer.finished()):
1017-
vector.writer.fail(getTransportOsError(err))
1042+
vector.writer.fail(error)
1043+
failPendingWriteQueue(transp.queue, error)
10181044
break
10191045

10201046
elif transp.kind == TransportKind.Pipe:
@@ -1032,17 +1058,20 @@ else:
10321058
if int(err) == EINTR:
10331059
continue
10341060
else:
1061+
transp.fd.removeWriter()
10351062
if isConnResetError(err):
10361063
# Soft error happens which indicates that remote peer got
10371064
# disconnected, complete all pending writes in queue with 0.
10381065
transp.state.incl({WriteEof, WritePaused})
10391066
if not(vector.writer.finished()):
10401067
vector.writer.complete(0)
10411068
completePendingWriteQueue(transp.queue, 0)
1042-
transp.fd.removeWriter()
10431069
else:
1070+
transp.state.incl({WriteError, WritePaused})
1071+
let error = getTransportOsError(err)
10441072
if not(vector.writer.finished()):
1045-
vector.writer.fail(getTransportOsError(err))
1073+
vector.writer.fail(error)
1074+
failPendingWriteQueue(transp.queue, error)
10461075
else:
10471076
var nbytes = cast[int](vector.buf)
10481077
let res = sendfile(int(fd), cast[int](vector.buflen),
@@ -1062,17 +1091,20 @@ else:
10621091
if int(err) == EINTR:
10631092
continue
10641093
else:
1094+
transp.fd.removeWriter()
10651095
if isConnResetError(err):
10661096
# Soft error happens which indicates that remote peer got
10671097
# disconnected, complete all pending writes in queue with 0.
10681098
transp.state.incl({WriteEof, WritePaused})
10691099
if not(vector.writer.finished()):
10701100
vector.writer.complete(0)
10711101
completePendingWriteQueue(transp.queue, 0)
1072-
transp.fd.removeWriter()
10731102
else:
1103+
transp.state.incl({WriteError, WritePaused})
1104+
let error = getTransportOsError(err)
10741105
if not(vector.writer.finished()):
1075-
vector.writer.fail(getTransportOsError(err))
1106+
vector.writer.fail(error)
1107+
failPendingWriteQueue(transp.queue, error)
10761108
break
10771109
else:
10781110
transp.state.incl(WritePaused)

0 commit comments

Comments
 (0)