diff --git a/chronos/asyncsync.nim b/chronos/asyncsync.nim index 5fab9b2a..7bf0c0b5 100644 --- a/chronos/asyncsync.nim +++ b/chronos/asyncsync.nim @@ -78,6 +78,21 @@ type counter: uint64 limit: int offset: int + + AsyncSemaphore* = ref object of RootObj + ## A semaphore manages an internal number of available slots which is decremented + ## by each ``acquire()`` call and incremented by each ``release()`` call. + ## The available slots can never go below zero; when ``acquire()`` finds that it is + ## zero, it blocks, waiting until some other task calls ``release()``. + ## + ## The ``size`` argument gives the initial value for the available slots + ## counter; it defaults to ``1``. If the value given is less than 1, + ## ``AssertionDefect`` is raised. + size: int + availableSlots: int + waiters: Deque[Future[void].Raising([CancelledError])] + + AsyncSemaphoreError* = object of AsyncError proc newAsyncLock*(): AsyncLock = ## Creates new asynchronous lock ``AsyncLock``. @@ -638,3 +653,58 @@ proc waitEvents*[T](ab: AsyncEventQueue[T], break events + +proc newAsyncSemaphore*(size: int = 1): AsyncSemaphore = + ## Creates a new asynchronous bounded semaphore ``AsyncSemaphore`` with + ## internal available slots set to ``size``. + doAssert(size > 0, "AsyncSemaphore initial size must be bigger then 0") + AsyncSemaphore( + size: size, + availableSlots: size, + waiters: initDeque[Future[void].Raising([CancelledError])](), + ) + +proc availableSlots*(s: AsyncSemaphore): int = + return s.availableSlots + +proc tryAcquire*(s: AsyncSemaphore): bool = + ## Attempts to acquire a resource, if successful returns true, otherwise false. + + if s.availableSlots > 0: + s.availableSlots.dec + true + else: + false + +proc acquire*( + s: AsyncSemaphore +): Future[void] {.async: (raises: [CancelledError], raw: true).} = + ## Acquire a resource and decrement the resource counter. + ## If no more resources are available, the returned future + ## will not complete until the resource count goes above 0. + + let fut = newFuture[void]("AsyncSemaphore.acquire") + if s.tryAcquire(): + fut.complete() + return fut + + s.waiters.addLast(fut) + + return fut + +proc release*(s: AsyncSemaphore) {.raises: [AsyncSemaphoreError].} = + ## Release a resource from the semaphore, + ## by picking the first future from waiters queue + ## and completing it and incrementing the + ## internal resource count. + + if s.availableSlots == s.size: + raise newException(AsyncSemaphoreError, "release called without acquire") + + s.availableSlots.inc + while s.waiters.len > 0: + var fut = s.waiters.popFirst() + if not fut.finished(): + s.availableSlots.dec + fut.complete() + break diff --git a/tests/testall.nim b/tests/testall.nim index 6419f983..5013a2bc 100644 --- a/tests/testall.nim +++ b/tests/testall.nim @@ -11,7 +11,7 @@ when (chronosEventEngine in ["epoll", "kqueue"]) or defined(windows): import testmacro, testsync, testsoon, testtime, testfut, testsignal, testaddress, testdatagram, teststream, testserver, testbugs, testnet, testasyncstream, testhttpserver, testshttpserver, testhttpclient, - testproc, testratelimit, testfutures, testthreadsync + testproc, testratelimit, testfutures, testthreadsync, testasyncsemaphore # Must be imported last to check for Pending futures import testutils @@ -20,7 +20,7 @@ elif chronosEventEngine == "poll": import testmacro, testsync, testsoon, testtime, testfut, testaddress, testdatagram, teststream, testserver, testbugs, testnet, testasyncstream, testhttpserver, testshttpserver, testhttpclient, - testratelimit, testfutures, testthreadsync + testratelimit, testfutures, testthreadsync, testasyncsemaphore # Must be imported last to check for Pending futures - import testutils + import testutils \ No newline at end of file diff --git a/tests/testasyncsemaphore.nim b/tests/testasyncsemaphore.nim new file mode 100644 index 00000000..6f3c4ad5 --- /dev/null +++ b/tests/testasyncsemaphore.nim @@ -0,0 +1,161 @@ +# Chronos Test Suite +# (c) Copyright 2018-Present +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) +import unittest2 +import ../chronos, ../chronos/unittest2/asynctests + +{.used.} + +suite "AsyncSemaphore": + teardown: + checkLeaks() + + asyncTest "default size": + let sema = newAsyncSemaphore() + check sema.availableSlots == 1 + + asyncTest "custom size": + let sema = newAsyncSemaphore(3) + check sema.availableSlots == 3 + + asyncTest "invalid size": + expect AssertionDefect: + discard newAsyncSemaphore(0) + + asyncTest "should acquire": + let sema = newAsyncSemaphore(3) + + await sema.acquire() + check sema.availableSlots == 2 + await sema.acquire() + check sema.availableSlots == 1 + await sema.acquire() + check sema.availableSlots == 0 + + asyncTest "should release": + let sema = newAsyncSemaphore(3) + + await sema.acquire() + await sema.acquire() + await sema.acquire() + + sema.release() + check sema.availableSlots == 1 + sema.release() + check sema.availableSlots == 2 + sema.release() + check sema.availableSlots == 3 + + asyncTest "initial release": + let sema = newAsyncSemaphore(3) + + expect AsyncSemaphoreError: # should not release + sema.release() + + asyncTest "double release": + let sema = newAsyncSemaphore(3) + + await sema.acquire() + sema.release() + expect AsyncSemaphoreError: # should not release + sema.release() + + asyncTest "should queue acquire": + let sema = newAsyncSemaphore(1) + + await sema.acquire() + let fut = sema.acquire() + + check sema.availableSlots == 0 + sema.release() + sema.release() + + await fut + check fut.finished() + + asyncTest "should tryAcquire": + let sema = newAsyncSemaphore(1) + await sema.acquire() + check sema.tryAcquire() == false + + asyncTest "should tryAcquire and acquire": + let sema = newAsyncSemaphore(4) + check sema.tryAcquire() == true + check sema.tryAcquire() == true + check sema.tryAcquire() == true + check sema.tryAcquire() == true + check sema.availableSlots == 0 + + let fut = sema.acquire() + check fut.finished == false + check sema.availableSlots == 0 + + sema.release() + sema.release() + sema.release() + sema.release() + sema.release() + + check fut.finished == true + check sema.availableSlots == 4 + + asyncTest "should cancel sequential semaphore slot": + let sema = newAsyncSemaphore(1) + + await sema.acquire() + + let + tmp = sema.acquire() + tmp2 = sema.acquire() + check: + not tmp.finished() + not tmp2.finished() + + tmp.cancel() + sema.release() + + check tmp2.finished() + + sema.release() + + check await sema.acquire().withTimeout(10.millis) + + asyncTest "should handle out of order cancellations": + let sema = newAsyncSemaphore(1) + + await sema.acquire() # 1st acquire + let tmp1 = sema.acquire() # 2nd acquire + check not tmp1.finished() + + let tmp2 = sema.acquire() # 3rd acquire + check not tmp2.finished() + + let tmp3 = sema.acquire() # 4th acquire + check not tmp3.finished() + + # up to this point, we've called acquire 4 times + tmp1.cancel() # 1st release (implicit) + tmp2.cancel() # 2nd release (implicit) + + check not tmp3.finished() # check that we didn't release the wrong slot + + sema.release() # 3rd release (explicit) + check tmp3.finished() + + sema.release() # 4th release + check await sema.acquire().withTimeout(10.millis) + + asyncTest "should properly handle timeouts and cancellations": + let sema = newAsyncSemaphore(1) + + await sema.acquire() + check not (await sema.acquire().withTimeout(1.millis)) + # should not acquire but cancel + sema.release() + + check await sema.acquire().withTimeout(10.millis) +