Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions chronos/asyncsync.nim
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ type
counter: uint64
limit: int
offset: int

AsyncSemaphore* = ref object of RootObj
## A semaphore manages an internal number of available slots which is decremented
## by each ``acquire()`` call and incremented by each ``release()`` call.
## The available slots can never go below zero; when ``acquire()`` finds that it is
## zero, it blocks, waiting until some other task calls ``release()``.
##
## The ``size`` argument gives the initial value for the available slots
## counter; it defaults to ``1``. If the value given is less than 1,
## ``AssertionDefect`` is raised.
size: int
availableSlots: int
waiters: Deque[Future[void].Raising([CancelledError])]

AsyncSemaphoreError* = object of AsyncError

proc newAsyncLock*(): AsyncLock =
## Creates new asynchronous lock ``AsyncLock``.
Expand Down Expand Up @@ -638,3 +653,58 @@ proc waitEvents*[T](ab: AsyncEventQueue[T],
break

events

proc newAsyncSemaphore*(size: int = 1): AsyncSemaphore =
## Creates a new asynchronous bounded semaphore ``AsyncSemaphore`` with
## internal available slots set to ``size``.
doAssert(size > 0, "AsyncSemaphore initial size must be bigger then 0")
AsyncSemaphore(
size: size,
availableSlots: size,
waiters: initDeque[Future[void].Raising([CancelledError])](),
)

proc availableSlots*(s: AsyncSemaphore): int =
return s.availableSlots

proc tryAcquire*(s: AsyncSemaphore): bool =
## Attempts to acquire a resource, if successful returns true, otherwise false.

if s.availableSlots > 0:
s.availableSlots.dec
true
else:
false

proc acquire*(
s: AsyncSemaphore
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
## Acquire a resource and decrement the resource counter.
## If no more resources are available, the returned future
## will not complete until the resource count goes above 0.

let fut = newFuture[void]("AsyncSemaphore.acquire")
if s.tryAcquire():
fut.complete()
return fut

s.waiters.addLast(fut)

return fut

proc release*(s: AsyncSemaphore) {.raises: [AsyncSemaphoreError].} =
## Release a resource from the semaphore,
## by picking the first future from waiters queue
## and completing it and incrementing the
## internal resource count.

if s.availableSlots == s.size:
raise newException(AsyncSemaphoreError, "release called without acquire")

s.availableSlots.inc
while s.waiters.len > 0:
var fut = s.waiters.popFirst()
if not fut.finished():
s.availableSlots.dec
fut.complete()
break
6 changes: 3 additions & 3 deletions tests/testall.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ when (chronosEventEngine in ["epoll", "kqueue"]) or defined(windows):
import testmacro, testsync, testsoon, testtime, testfut, testsignal,
testaddress, testdatagram, teststream, testserver, testbugs, testnet,
testasyncstream, testhttpserver, testshttpserver, testhttpclient,
testproc, testratelimit, testfutures, testthreadsync
testproc, testratelimit, testfutures, testthreadsync, testasyncsemaphore

# Must be imported last to check for Pending futures
import testutils
Expand All @@ -20,7 +20,7 @@ elif chronosEventEngine == "poll":
import testmacro, testsync, testsoon, testtime, testfut, testaddress,
testdatagram, teststream, testserver, testbugs, testnet,
testasyncstream, testhttpserver, testshttpserver, testhttpclient,
testratelimit, testfutures, testthreadsync
testratelimit, testfutures, testthreadsync, testasyncsemaphore

# Must be imported last to check for Pending futures
import testutils
import testutils
161 changes: 161 additions & 0 deletions tests/testasyncsemaphore.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# Chronos Test Suite
# (c) Copyright 2018-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import unittest2
import ../chronos, ../chronos/unittest2/asynctests

{.used.}

suite "AsyncSemaphore":
teardown:
checkLeaks()

asyncTest "default size":
let sema = newAsyncSemaphore()
check sema.availableSlots == 1

asyncTest "custom size":
let sema = newAsyncSemaphore(3)
check sema.availableSlots == 3

asyncTest "invalid size":
expect AssertionDefect:
discard newAsyncSemaphore(0)

asyncTest "should acquire":
let sema = newAsyncSemaphore(3)

await sema.acquire()
check sema.availableSlots == 2
await sema.acquire()
check sema.availableSlots == 1
await sema.acquire()
check sema.availableSlots == 0

asyncTest "should release":
let sema = newAsyncSemaphore(3)

await sema.acquire()
await sema.acquire()
await sema.acquire()

sema.release()
check sema.availableSlots == 1
sema.release()
check sema.availableSlots == 2
sema.release()
check sema.availableSlots == 3

asyncTest "initial release":
let sema = newAsyncSemaphore(3)

expect AsyncSemaphoreError: # should not release
sema.release()

asyncTest "double release":
let sema = newAsyncSemaphore(3)

await sema.acquire()
sema.release()
expect AsyncSemaphoreError: # should not release
sema.release()

asyncTest "should queue acquire":
let sema = newAsyncSemaphore(1)

await sema.acquire()
let fut = sema.acquire()

check sema.availableSlots == 0
sema.release()
sema.release()

await fut
check fut.finished()

asyncTest "should tryAcquire":
let sema = newAsyncSemaphore(1)
await sema.acquire()
check sema.tryAcquire() == false

asyncTest "should tryAcquire and acquire":
let sema = newAsyncSemaphore(4)
check sema.tryAcquire() == true
check sema.tryAcquire() == true
check sema.tryAcquire() == true
check sema.tryAcquire() == true
check sema.availableSlots == 0

let fut = sema.acquire()
check fut.finished == false
check sema.availableSlots == 0

sema.release()
sema.release()
sema.release()
sema.release()
sema.release()

check fut.finished == true
check sema.availableSlots == 4

asyncTest "should cancel sequential semaphore slot":
let sema = newAsyncSemaphore(1)

await sema.acquire()

let
tmp = sema.acquire()
tmp2 = sema.acquire()
check:
not tmp.finished()
not tmp2.finished()

tmp.cancel()
sema.release()

check tmp2.finished()

sema.release()

check await sema.acquire().withTimeout(10.millis)

asyncTest "should handle out of order cancellations":
let sema = newAsyncSemaphore(1)

await sema.acquire() # 1st acquire
let tmp1 = sema.acquire() # 2nd acquire
check not tmp1.finished()

let tmp2 = sema.acquire() # 3rd acquire
check not tmp2.finished()

let tmp3 = sema.acquire() # 4th acquire
check not tmp3.finished()

# up to this point, we've called acquire 4 times
tmp1.cancel() # 1st release (implicit)
tmp2.cancel() # 2nd release (implicit)

check not tmp3.finished() # check that we didn't release the wrong slot

sema.release() # 3rd release (explicit)
check tmp3.finished()

sema.release() # 4th release
check await sema.acquire().withTimeout(10.millis)

asyncTest "should properly handle timeouts and cancellations":
let sema = newAsyncSemaphore(1)

await sema.acquire()
check not (await sema.acquire().withTimeout(1.millis))
# should not acquire but cancel
sema.release()

check await sema.acquire().withTimeout(10.millis)

Loading