Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion chronos/asyncloop.nim
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,10 @@ when defined(windows):
## (Unix) for the specified dispatcher.
return disp.ioPort

proc closeIoHandler(disp: PDispatcher) {.raises: [Defect, OSError].} =
if disp.ioPort.closeHandle() == 0:
raiseOSError(osLastError())

proc register*(fd: AsyncFD) {.raises: [Defect, CatchableError].} =
## Register file descriptor ``fd`` in thread's dispatcher.
let loop = getThreadDispatcher()
Expand Down Expand Up @@ -618,6 +622,9 @@ elif unixPlatform:
## Returns system specific OS queue.
return disp.selector

proc closeIoHandler(disp: PDispatcher) {.raises: [Defect, OSError].} =
disp.selector.close()

proc register*(fd: AsyncFD) {.raises: [Defect, CatchableError].} =
## Register file descriptor ``fd`` in thread's dispatcher.
let loop = getThreadDispatcher()
Expand Down Expand Up @@ -828,7 +835,8 @@ else:
proc setThreadDispatcher*(disp: PDispatcher) =
## Set current thread's dispatcher instance to ``disp``.
if not gDisp.isNil:
doAssert gDisp.callbacks.len == 0
# Sentinel can be present
doAssert gDisp.callbacks.len <= 1
gDisp = disp

proc getThreadDispatcher*(): PDispatcher =
Expand All @@ -840,6 +848,14 @@ proc getThreadDispatcher*(): PDispatcher =
raiseAsDefect exc, "Cannot create dispatcher"
gDisp

proc closeThreadDispatcher*() {.raises: [Defect, CatchableError].} =
if gDisp.isNil: return
let prevDisp = gDisp
while gDisp.callbacks.len > 1:
Copy link
Collaborator

@cheatfate cheatfate Oct 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely not enough, number of callbacks could zero right after poll() call but it doesn't mean that there is no pending socket events, or pending timers. And even in such case you should check if this callback is actually SentinelCallback. Call to closeThreadDispatcher could be made while main poll() loop processing callbacks, so its possible to get into situation when callbacks <= 1, but even in such case its possible to get into situation when there no SentinelCallback.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I'll check that this is the sentinel, and that it's not called from inside poll

poll()
setThreadDispatcher(nil)
prevDisp.closeIoHandler()

proc setGlobalDispatcher*(disp: PDispatcher) {.
gcsafe, deprecated: "Use setThreadDispatcher() instead".} =
setThreadDispatcher(disp)
Expand All @@ -848,6 +864,7 @@ proc getGlobalDispatcher*(): PDispatcher {.
gcsafe, deprecated: "Use getThreadDispatcher() instead".} =
getThreadDispatcher()


proc setTimer*(at: Moment, cb: CallbackFunc,
udata: pointer = nil): TimerCallback =
## Arrange for the callback ``cb`` to be called at the given absolute
Expand Down
4 changes: 2 additions & 2 deletions chronos/ioselects/ioselectors_epoll.nim
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,13 @@ proc newSelector*[T](): Selector[T] {.raises: [Defect, OSError].} =
for i in 0 ..< numFD:
result.fds[i].ident = InvalidIdent

proc close*[T](s: Selector[T]) =
proc close*[T](s: Selector[T]) {.raises: [Defect, OSError].} =
let res = posix.close(s.epollFD)
when hasThreadSupport:
deallocSharedArray(s.fds)
deallocShared(cast[pointer](s))
if res != 0:
raiseIOSelectorsError(osLastError())
raiseOSError(osLastError())

proc newSelectEvent*(): SelectEvent {.raises: [Defect, OSError, IOSelectorsException].} =
let fdci = eventfd(0, 0)
Expand Down
2 changes: 1 addition & 1 deletion chronos/ioselects/ioselectors_kqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ proc close*[T](s: Selector[T]) =
deallocSharedArray(s.fds)
deallocShared(cast[pointer](s))
if res1 != 0 or res2 != 0:
raiseIOSelectorsError(osLastError())
raiseOSError(osLastError())

proc newSelectEvent*(): SelectEvent =
var fds: array[2, cint]
Expand Down
2 changes: 1 addition & 1 deletion tests/testall.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import testmacro, testsync, testsoon, testtime, testfut, testsignal,
import testclosedispatcher, testmacro, testsync, testsoon, testtime, testfut, testsignal,
testaddress, testdatagram, teststream, testserver, testbugs, testnet,
testasyncstream, testhttpserver, testshttpserver, testhttpclient
import testutils
22 changes: 22 additions & 0 deletions tests/testclosedispatcher.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Chronos Test Suite
# (c) Copyright 2022-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import unittest2
import ../chronos

when defined(nimHasUsed): {.used.}

suite "Dispatcher closing":
test "Can close the current dispatcher":
waitFor(sleepAsync(1.milliseconds))
check isNil(getThreadDispatcher()) == false
let beforeClose = getThreadDispatcher()
closeThreadDispatcher()
waitFor(sleepAsync(1.milliseconds))
check:
isNil(getThreadDispatcher()) == false
getThreadDispatcher() != beforeClose