forked from deterministic-arts/DartsPyLRU
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest.py
More file actions
142 lines (109 loc) · 4.64 KB
/
test.py
File metadata and controls
142 lines (109 loc) · 4.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from threading import Thread, RLock, Condition
from unittest import TestCase, main
from darts.lib.utils.lru import AutoLRUCache, CacheLoadError, CacheAbandonedError
from time import sleep
from random import shuffle
class AutoCacheThreadingTest(TestCase):

    """Tests the AutoLRUCache implementation.

    This class adds tests for the AutoLRUCache class which are hard to
    do in doctest (e.g., due to threading).

    Testing for race conditions and bugs stemming from concurrency is
    notoriously hard. This test case runs enough threads (and
    iterations per thread) to make sure that there is nothing glaringly
    obvious wrong with the implementation, but it is unlikely to be
    able to find non-trivial bugs.

    TODO:

    - Need tests for the abandoning stuff (clear with parameter
      discard_loads=True)

    - Need to figure out a way to force a key to be loaded
      concurrently, and exercise the error handling when the loading
      thread encounters an exception from the loader function. All
      concurrently running threads loading the same key have to abort
      properly with the error signalled by the loader, wrapped as
      CacheLoadError exception.
    """

    def test_concurrent_access(self):
        """Test concurrent access of AutoLRUCaches.

        This test function tests the nominal behaviour of a cache
        instance, not expecting any exceptions to be raised.

        This test makes sure, that:

        - the logic, which keeps track of keys currently being loaded
          in a cache, works as advertised, and collapses multiple
          concurrent read requests for the same key into a single
          `loader` call.

        - nothing is ever evicted if the cache is large enough to
          accommodate all objects ever being requested from it.
        """
        iterations_per_thread = 1000
        number_of_threads = 100
        key_range = list(range(4))

        # Per-key count of actual loader invocations; guarded by
        # loads_lock because many reader threads may trigger loads.
        loads_lock = RLock()
        loads = dict()

        # Handshake state: readers block on start_condition until the
        # main thread flips start_now, and each reader reports its
        # completion through ready_condition. Both conditions share
        # start_lock, which also guards start_now and ready_list.
        start_lock = RLock()
        start_condition = Condition(start_lock)
        ready_condition = Condition(start_lock)
        start_now = False
        ready_list = list()

        def loader(key):
            # Record that a real load happened for `key`.
            with loads_lock:
                loads[key] = loads.get(key, 0) + 1
            # Fake a complicated computation here.
            # This should not slow down the test too
            # much, as it is ever only called 4 times
            # at all (if the cache implementation has
            # no bug in the load-coalescing code), so
            # we should sleep for at most 4 seconds,
            # and expect to sleep less, since these
            # sleep calls are likely to happen in parallel
            # on different threads.
            sleep(1)
            return "R(%r)" % (key,)

        def shuffled(seq):
            # Return a shuffled copy, leaving `seq` untouched.
            copy = list(seq)
            shuffle(copy)
            return copy

        cache = AutoLRUCache(loader, capacity=len(key_range))

        def reader():
            # Wait for the common start signal so all threads hammer
            # the cache at (roughly) the same time.
            with start_lock:
                while not start_now:
                    start_condition.wait()
            for k in range(iterations_per_thread):
                for i in shuffled(key_range):
                    answer = cache.load(i)
                    self.assertEqual("R(%r)" % (i,), answer)
            # Signal the main thread that this reader is done.
            with start_lock:
                ready_list.append(1)
                ready_condition.notify_all()

        with start_lock:
            for k in range(number_of_threads):
                thr = Thread(target=reader)
                thr.start()
            # Ok. All threads have been started. Now, we can
            # send them the start signal
            start_now = True
            start_condition.notify_all()

        # Must happen in a different `with` block, so that the
        # mutex gets released, and the threads have a chance to
        # read the start signal. Now, wait for all threads to
        # terminate.
        with start_lock:
            while len(ready_list) < number_of_threads:
                ready_condition.wait()

        # Make sure, that all keys have actually been requested
        # at least once.
        self.assertEqual(set(key_range), set(loads.keys()))

        # The cache has a capacity such, that it can hold all
        # elements nominally ever requested by the readers. So,
        # we expect, that every requested key is loaded exactly
        # once (due to the cache keeping track of what it is
        # currently loading).
        for key, count in loads.items():
            self.assertEqual(1, count)
            self.assertTrue(key in key_range)
# Allow the test suite to be run directly from the command line.
if __name__ == "__main__":
    main()