Skip to content

Commit dd21dd4

Browse files
authored
feat: add AsyncSemaphore (#586)
1 parent a737147 commit dd21dd4

File tree

3 files changed

+234
-3
lines changed

3 files changed

+234
-3
lines changed

chronos/asyncsync.nim

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,21 @@ type
7878
counter: uint64
7979
limit: int
8080
offset: int
81+
82+
AsyncSemaphore* = ref object of RootObj
83+
## A semaphore manages an internal number of available slots which is decremented
84+
## by each ``acquire()`` call and incremented by each ``release()`` call.
85+
## The available slots can never go below zero; when ``acquire()`` finds that it is
86+
## zero, it blocks, waiting until some other task calls ``release()``.
87+
##
88+
## The ``size`` argument gives the initial value for the available slots
89+
## counter; it defaults to ``1``. If the value given is less than 1,
90+
## ``AssertionDefect`` is raised.
91+
size: int
92+
availableSlots: int
93+
waiters: Deque[Future[void].Raising([CancelledError])]
94+
95+
AsyncSemaphoreError* = object of AsyncError
8196

8297
proc newAsyncLock*(): AsyncLock =
8398
## Creates new asynchronous lock ``AsyncLock``.
@@ -638,3 +653,58 @@ proc waitEvents*[T](ab: AsyncEventQueue[T],
638653
break
639654

640655
events
656+
657+
proc newAsyncSemaphore*(size: int = 1): AsyncSemaphore =
658+
## Creates a new asynchronous bounded semaphore ``AsyncSemaphore`` with
659+
## internal available slots set to ``size``.
660+
doAssert(size > 0, "AsyncSemaphore initial size must be bigger then 0")
661+
AsyncSemaphore(
662+
size: size,
663+
availableSlots: size,
664+
waiters: initDeque[Future[void].Raising([CancelledError])](),
665+
)
666+
667+
proc availableSlots*(s: AsyncSemaphore): int =
668+
return s.availableSlots
669+
670+
proc tryAcquire*(s: AsyncSemaphore): bool =
671+
## Attempts to acquire a resource, if successful returns true, otherwise false.
672+
673+
if s.availableSlots > 0:
674+
s.availableSlots.dec
675+
true
676+
else:
677+
false
678+
679+
proc acquire*(
680+
s: AsyncSemaphore
681+
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
682+
## Acquire a resource and decrement the resource counter.
683+
## If no more resources are available, the returned future
684+
## will not complete until the resource count goes above 0.
685+
686+
let fut = newFuture[void]("AsyncSemaphore.acquire")
687+
if s.tryAcquire():
688+
fut.complete()
689+
return fut
690+
691+
s.waiters.addLast(fut)
692+
693+
return fut
694+
695+
proc release*(s: AsyncSemaphore) {.raises: [AsyncSemaphoreError].} =
696+
## Release a resource from the semaphore,
697+
## by picking the first future from waiters queue
698+
## and completing it and incrementing the
699+
## internal resource count.
700+
701+
if s.availableSlots == s.size:
702+
raise newException(AsyncSemaphoreError, "release called without acquire")
703+
704+
s.availableSlots.inc
705+
while s.waiters.len > 0:
706+
var fut = s.waiters.popFirst()
707+
if not fut.finished():
708+
s.availableSlots.dec
709+
fut.complete()
710+
break

tests/testall.nim

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ when (chronosEventEngine in ["epoll", "kqueue"]) or defined(windows):
1111
import testmacro, testsync, testsoon, testtime, testfut, testsignal,
1212
testaddress, testdatagram, teststream, testserver, testbugs, testnet,
1313
testasyncstream, testhttpserver, testshttpserver, testhttpclient,
14-
testproc, testratelimit, testfutures, testthreadsync
14+
testproc, testratelimit, testfutures, testthreadsync, testasyncsemaphore
1515

1616
# Must be imported last to check for Pending futures
1717
import testutils
@@ -20,7 +20,7 @@ elif chronosEventEngine == "poll":
2020
import testmacro, testsync, testsoon, testtime, testfut, testaddress,
2121
testdatagram, teststream, testserver, testbugs, testnet,
2222
testasyncstream, testhttpserver, testshttpserver, testhttpclient,
23-
testratelimit, testfutures, testthreadsync
23+
testratelimit, testfutures, testthreadsync, testasyncsemaphore
2424

2525
# Must be imported last to check for Pending futures
26-
import testutils
26+
import testutils

tests/testasyncsemaphore.nim

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
# Chronos Test Suite
2+
# (c) Copyright 2018-Present
3+
# Status Research & Development GmbH
4+
#
5+
# Licensed under either of
6+
# Apache License, version 2.0, (LICENSE-APACHEv2)
7+
# MIT license (LICENSE-MIT)
8+
import unittest2
9+
import ../chronos, ../chronos/unittest2/asynctests
10+
11+
{.used.}
12+
13+
suite "AsyncSemaphore":
14+
teardown:
15+
checkLeaks()
16+
17+
asyncTest "default size":
18+
let sema = newAsyncSemaphore()
19+
check sema.availableSlots == 1
20+
21+
asyncTest "custom size":
22+
let sema = newAsyncSemaphore(3)
23+
check sema.availableSlots == 3
24+
25+
asyncTest "invalid size":
26+
expect AssertionDefect:
27+
discard newAsyncSemaphore(0)
28+
29+
asyncTest "should acquire":
30+
let sema = newAsyncSemaphore(3)
31+
32+
await sema.acquire()
33+
check sema.availableSlots == 2
34+
await sema.acquire()
35+
check sema.availableSlots == 1
36+
await sema.acquire()
37+
check sema.availableSlots == 0
38+
39+
asyncTest "should release":
40+
let sema = newAsyncSemaphore(3)
41+
42+
await sema.acquire()
43+
await sema.acquire()
44+
await sema.acquire()
45+
46+
sema.release()
47+
check sema.availableSlots == 1
48+
sema.release()
49+
check sema.availableSlots == 2
50+
sema.release()
51+
check sema.availableSlots == 3
52+
53+
asyncTest "initial release":
54+
let sema = newAsyncSemaphore(3)
55+
56+
expect AsyncSemaphoreError: # should not release
57+
sema.release()
58+
59+
asyncTest "double release":
60+
let sema = newAsyncSemaphore(3)
61+
62+
await sema.acquire()
63+
sema.release()
64+
expect AsyncSemaphoreError: # should not release
65+
sema.release()
66+
67+
asyncTest "should queue acquire":
68+
let sema = newAsyncSemaphore(1)
69+
70+
await sema.acquire()
71+
let fut = sema.acquire()
72+
73+
check sema.availableSlots == 0
74+
sema.release()
75+
sema.release()
76+
77+
await fut
78+
check fut.finished()
79+
80+
asyncTest "should tryAcquire":
81+
let sema = newAsyncSemaphore(1)
82+
await sema.acquire()
83+
check sema.tryAcquire() == false
84+
85+
asyncTest "should tryAcquire and acquire":
86+
let sema = newAsyncSemaphore(4)
87+
check sema.tryAcquire() == true
88+
check sema.tryAcquire() == true
89+
check sema.tryAcquire() == true
90+
check sema.tryAcquire() == true
91+
check sema.availableSlots == 0
92+
93+
let fut = sema.acquire()
94+
check fut.finished == false
95+
check sema.availableSlots == 0
96+
97+
sema.release()
98+
sema.release()
99+
sema.release()
100+
sema.release()
101+
sema.release()
102+
103+
check fut.finished == true
104+
check sema.availableSlots == 4
105+
106+
asyncTest "should cancel sequential semaphore slot":
107+
let sema = newAsyncSemaphore(1)
108+
109+
await sema.acquire()
110+
111+
let
112+
tmp = sema.acquire()
113+
tmp2 = sema.acquire()
114+
check:
115+
not tmp.finished()
116+
not tmp2.finished()
117+
118+
tmp.cancel()
119+
sema.release()
120+
121+
check tmp2.finished()
122+
123+
sema.release()
124+
125+
check await sema.acquire().withTimeout(10.millis)
126+
127+
asyncTest "should handle out of order cancellations":
128+
let sema = newAsyncSemaphore(1)
129+
130+
await sema.acquire() # 1st acquire
131+
let tmp1 = sema.acquire() # 2nd acquire
132+
check not tmp1.finished()
133+
134+
let tmp2 = sema.acquire() # 3rd acquire
135+
check not tmp2.finished()
136+
137+
let tmp3 = sema.acquire() # 4th acquire
138+
check not tmp3.finished()
139+
140+
# up to this point, we've called acquire 4 times
141+
tmp1.cancel() # 1st release (implicit)
142+
tmp2.cancel() # 2nd release (implicit)
143+
144+
check not tmp3.finished() # check that we didn't release the wrong slot
145+
146+
sema.release() # 3rd release (explicit)
147+
check tmp3.finished()
148+
149+
sema.release() # 4th release
150+
check await sema.acquire().withTimeout(10.millis)
151+
152+
asyncTest "should properly handle timeouts and cancellations":
153+
let sema = newAsyncSemaphore(1)
154+
155+
await sema.acquire()
156+
check not (await sema.acquire().withTimeout(1.millis))
157+
# should not acquire but cancel
158+
sema.release()
159+
160+
check await sema.acquire().withTimeout(10.millis)
161+

0 commit comments

Comments
 (0)