| 1 | +# concurrent Module Documentation |
| 2 | + |
| 3 | +The concurrent module provides a set of concurrently scalable data structures and patterns for Python. This module is designed to support high-performance, scalable programming with Free Threaded Python. |
| 4 | + |
| 5 | +## ConcurrentDict |
| 6 | + |
| 7 | +A concurrently accessible dictionary. |
| 8 | + |
| 9 | +### Methods |
| 10 | + |
| 11 | +* `__init__(scaling=17)`: Initializes a new ConcurrentDict with the specified number of concurrent structures. This relates to the number of threads it supports with good scaling. For optimal performance, this value should be close to the number of cores on the machine. However, underestimating or overestimating this value by a factor of 2 or even more does not have a large impact on performance. |
| 12 | +* `get(key)`: Returns the value associated with the specified key. |
| 13 | +* `set(key, value)`: Sets the value associated with the specified key. |
| 14 | +* `has(key)`: Returns True if the key is present in the dictionary, False otherwise. |
| 15 | + |
| 16 | +### Operators |
| 17 | + |
| 18 | +ConcurrentDict also supports the following access methods: |
| 19 | + |
| 20 | +* `d[key]`: Returns the value associated with the specified key. |
| 21 | +* `d[key] = value`: Sets the value associated with the specified key. |
| 22 | +* `del d[key]`: Deletes the key-value pair associated with the specified key. |
| 23 | + |
| 24 | +Note that the `in` operator is not supported because of implementation details in the CPython runtime. Please use `has` instead. |
| 25 | + |
| 26 | +### Notes |
| 27 | + |
| 28 | +ConcurrentDict does not support all the API methods of a built-in dict. It is designed for basic key-value store operations in a concurrent environment. |
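The `scaling` argument described under Methods is said to set the number of concurrent structures. One common way to obtain that kind of scaling is lock striping, sketched below in pure Python. This is only an illustration of the general idea, assuming each structure is an independently locked dict; `StripedDict` is a hypothetical name and this is not the ft_utils implementation.

```python
import threading


class StripedDict:
    """Hypothetical lock-striped dictionary; NOT the ft_utils implementation."""

    def __init__(self, scaling=17):
        # One plain dict and one lock per stripe.
        self._stripes = [({}, threading.Lock()) for _ in range(scaling)]

    def _stripe(self, key):
        # The hash picks the stripe, so a given key always maps to the same one.
        return self._stripes[hash(key) % len(self._stripes)]

    def set(self, key, value):
        data, lock = self._stripe(key)
        with lock:
            data[key] = value

    def get(self, key):
        data, lock = self._stripe(key)
        with lock:
            return data[key]

    def has(self, key):
        data, lock = self._stripe(key)
        with lock:
            return key in data


def fill(d, start, stop):
    # Each worker writes its own key range.
    for n in range(start, stop):
        d.set(n, n * n)


d = StripedDict(scaling=4)
threads = [
    threading.Thread(target=fill, args=(d, i * 100, (i + 1) * 100))
    for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Threads that touch keys in different stripes never wait on the same lock, which is why a stripe count near the number of cores is a plausible sweet spot in a design like this.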
| 29 | + |
| 30 | +### Example |
| 31 | +```python |
| 32 | +from ft_utils.concurrent import ConcurrentDict |
| 33 | + |
| 34 | +d = ConcurrentDict() |
| 35 | +d['key'] = 'value' |
| 36 | +print(d['key']) # prints 'value' |
| 37 | +print(d.has('key')) # prints True |
| 38 | +del d['key'] |
| 39 | +print(d.has('key')) # prints False |
| 40 | +``` |
| 41 | + |
| 42 | +## AtomicInt64 |
| 43 | + |
| 44 | +A 64-bit integer that can be updated atomically. |
| 45 | + |
| 46 | +### Methods |
| 47 | + |
| 48 | +* `__init__(value=0)`: Initializes a new AtomicInt64 with the specified value. |
| 49 | +* `get()`: Returns the current value. |
| 50 | +* `set(value)`: Sets the value. |
| 51 | +* `incr()`: Increments the value and returns the new value. |
| 52 | +* `decr()`: Decrements the value and returns the new value. |
| 53 | + |
| 54 | +In addition, the following numeric methods are implemented: |
| 55 | + |
| 56 | +* `__sub__(self, other)`: Returns the result of subtracting `other` from `self`. |
| 57 | +* `__mul__(self, other)`: Returns the result of multiplying `self` by `other`. |
| 58 | +* `__floordiv__(self, other)`: Returns the largest whole number less than or equal to the result of dividing `self` by `other`. |
| 59 | +* `__or__(self, other)`: Returns the bitwise OR of `self` and `other`. |
| 60 | +* `__xor__(self, other)`: Returns the bitwise XOR of `self` and `other`. |
| 61 | +* `__and__(self, other)`: Returns the bitwise AND of `self` and `other`. |
| 62 | +* `__neg__(self)`: Returns the negative of `self`. |
| 63 | +* `__pos__(self)`: Returns `self` unchanged. |
| 64 | +* `__abs__(self)`: Returns the absolute value of `self`. |
| 65 | +* `__invert__(self)`: Returns the bitwise NOT of `self`. |
| 66 | +* `__bool__(self)`: Returns `True` if `self` is non-zero, `False` otherwise. |
| 67 | +* `__iadd__(self, other)`: Adds `other` to `self` in-place and returns `self`. |
| 68 | +* `__isub__(self, other)`: Subtracts `other` from `self` in-place and returns `self`. |
| 69 | +* `__imul__(self, other)`: Multiplies `self` by `other` in-place and returns `self`. |
| 70 | +* `__ifloordiv__(self, other)`: Divides `self` by `other` in-place using floor division and returns `self`. |
| 71 | +* `__ior__(self, other)`: Performs a bitwise OR of `self` and `other` in-place and returns `self`. |
| 72 | +* `__ixor__(self, other)`: Performs a bitwise XOR of `self` and `other` in-place and returns `self`. |
| 73 | +* `__iand__(self, other)`: Performs a bitwise AND of `self` and `other` in-place and returns `self`. |
| 74 | +* `__int__(self)`: Returns an integer representation of `self`. |
| 75 | + |
| 76 | +### Example |
| 77 | + |
| 78 | +```python |
| 79 | +from ft_utils.concurrent import AtomicInt64 |
| 80 | + |
| 81 | +i = AtomicInt64(10) |
| 82 | +print(i.get()) # prints 10 |
| 83 | +i.incr() |
| 84 | +print(i.get()) # prints 11 |
| 85 | +i += 5 |
| 86 | +print(i.get()) # prints 16 |
| 87 | +``` |
| 88 | + |
| 89 | +## AtomicReference |
| 90 | + |
| 91 | +A reference that can be updated atomically. |
| 92 | + |
| 93 | +### Methods |
| 94 | + |
| 95 | +* `__init__(obj=None)`: Initializes a new AtomicReference with the specified object. |
| 96 | +* `get()`: Returns the current object. |
| 97 | +* `set(obj)`: Sets the object. |
| 98 | +* `exchange(obj)`: Exchanges the current object with the specified object and returns the previous object. |
| 99 | +* `compare_exchange(expected, obj)`: Compares the current object with the expected object and exchanges it with the specified object if they match. Returns `True` if the exchange happened, `False` otherwise. |
| 100 | + |
| 101 | +### Using compare_exchange |
| 102 | + |
| 103 | +The `compare_exchange` method can be used in a loop to atomically update the reference, similar to using the CAS instruction in native programming. |
| 104 | + |
| 105 | +### Example |
| 106 | + |
| 107 | +```python |
| 108 | +from ft_utils.concurrent import AtomicReference |
| 109 | + |
| 110 | +r = AtomicReference(0) |
| 111 | + |
| 112 | +def increment(r): |
| 113 | +    while True: |
| 114 | +        current = r.get() |
| 115 | +        new_value = current + 1 |
| 116 | +        if r.compare_exchange(current, new_value): |
| 117 | +            break |
| 118 | + |
| 119 | +increment(r) |
| 120 | +print(r.get()) # prints 1 |
| 121 | +``` |
| 122 | + |
| 123 | +In this example, the `increment` function uses a loop to atomically increment the value of the AtomicReference. The `compare_exchange` method is used to check if the current value is still the same as the expected value, and if so, updates the value to the new value. If another thread has updated the value in the meantime, the `compare_exchange` method will return `False` and the loop will retry. |
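The retry behaviour described above only becomes visible under contention. The sketch below runs the same loop from several threads against a lock-based stand-in, so that it is runnable without ft_utils. `SketchAtomicReference` is a hypothetical class, and comparing by identity (`is`) is an assumption, not a documented property of `AtomicReference`.

```python
import threading


class SketchAtomicReference:
    """Hypothetical lock-based stand-in; NOT the ft_utils implementation."""

    def __init__(self, obj=None):
        self._obj = obj
        self._lock = threading.Lock()

    def get(self):
        with self._lock:
            return self._obj

    def compare_exchange(self, expected, obj):
        # Assumption: the comparison is by identity.
        with self._lock:
            if self._obj is expected:
                self._obj = obj
                return True
            return False


def increment(ref, times):
    for _ in range(times):
        while True:
            current = ref.get()
            # Retry whenever another thread changed the value in between.
            if ref.compare_exchange(current, current + 1):
                break


ref = SketchAtomicReference(0)
workers = [threading.Thread(target=increment, args=(ref, 1000)) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
total = ref.get()
```

No increment is lost, because a failed `compare_exchange` makes the thread re-read the value before trying again.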
| 124 | + |
| 127 | +## AtomicFlag |
| 128 | + |
| 129 | +A boolean flag that can be updated atomically. |
| 130 | + |
| 131 | +### Methods |
| 132 | + |
| 133 | +* `__init__(value)`: Initializes a new AtomicFlag with the specified value. |
| 134 | +* `set(value)`: Sets the value of the flag. |
| 135 | +* `__bool__()`: Returns the current value of the flag. |
| 136 | + |
| 137 | +### Example |
| 138 | +```python |
| 139 | +from ft_utils.concurrent import AtomicFlag |
| 140 | + |
| 141 | +flag = AtomicFlag(True) |
| 142 | +print(bool(flag)) # prints True |
| 143 | +flag.set(False) |
| 144 | +print(bool(flag)) # prints False |
| 145 | +``` |
| 146 | + |
| 147 | +## ConcurrentGatheringIterator |
| 148 | + |
| 149 | +A concurrent iterator that gathers values from multiple threads and yields them in order. |
| 150 | + |
| 151 | +### Methods |
| 152 | + |
| 153 | +* `__init__(scaling)`: Initializes a new ConcurrentGatheringIterator with the specified scaling factor. |
| 154 | +* `insert(key, value)`: Inserts a key-value pair into the iterator. |
| 155 | +* `iterator(max_key, clear)`: Returns an iterator that reads and deletes key-value pairs from the iterator in order. |
| 156 | + |
| 157 | +### Notes |
| 158 | + |
| 159 | +* The iterator uses a ConcurrentDict to store the key-value pairs. |
| 160 | +* The `insert` method is thread-safe and can be called from multiple threads. |
| 161 | +* The `iterator` method returns an iterator that yields the values in order, blocking if the next value is not available. |
| 162 | +* The `max_key` argument passed to `iterator` tells the iterator at what point to stop iteration, i.e. when all expected values have been gathered. |
| 163 | +* If an exception occurs during insertion, the iterator will fail with a RuntimeError. |
| 164 | +* The `scaling` argument passed to `__init__` governs the number of threads the iterator supports with good scaling. |
| 165 | + |
| 166 | +### Example |
| 167 | + |
| 168 | +```python |
| 169 | +from ft_utils.concurrent import ConcurrentGatheringIterator |
| 170 | + |
| 171 | +iterator = ConcurrentGatheringIterator() |
| 172 | +iterator.insert(0, 'value0') |
| 173 | +iterator.insert(1, 'value1') |
| 174 | +iterator.insert(2, 'value2') |
| 175 | + |
| 176 | +for value in iterator.iterator(2): |
| 177 | +    print(value) # prints 'value0', 'value1', 'value2' |
| 178 | +``` |
| 179 | + |
| 180 | +A more complex example: |
| 181 | + |
| 182 | +```python |
| 183 | +from ft_utils.concurrent import ConcurrentGatheringIterator, AtomicInt64 |
| 184 | +from concurrent.futures import ThreadPoolExecutor |
| 185 | + |
| 186 | +def insert_value(iterator, atomic_index, value): |
| 187 | +    index = atomic_index.incr() |
| 188 | +    iterator.insert(index, value) |
| 189 | + |
| 190 | +def test_concurrent_gathering_iterator(): |
| 191 | +    iterator = ConcurrentGatheringIterator() |
| 192 | +    atomic_index = AtomicInt64(-1) |
| 193 | + |
| 194 | +    with ThreadPoolExecutor(max_workers=10) as executor: |
| 195 | +        futures = [] |
| 196 | +        for i in range(100): |
| 197 | +            futures.append(executor.submit(insert_value, iterator, atomic_index, i)) |
| 198 | + |
| 199 | +        for future in futures: |
| 200 | +            future.result() |
| 201 | + |
| 202 | +    results = list(iterator.iterator(99)) |
| 203 | +    assert sorted(results) == list(range(100)) |
| 204 | + |
| 205 | +test_concurrent_gathering_iterator() |
| 206 | +``` |
| 207 | + |
| 208 | +In this example, we use a ThreadPoolExecutor to insert values into the ConcurrentGatheringIterator from multiple threads. We use an AtomicInt64 to hand out unique, consecutive indices. Because the order in which the worker threads call `incr()` is not deterministic, a value is not guaranteed to land at the index matching its submission order, so after inserting all the values we retrieve the results from the iterator and check that every value arrived exactly once. |
| 209 | + |
| 210 | +Note that the `insert_value` function is a helper function that inserts a value into the iterator at the next available index. The `test_concurrent_gathering_iterator` function is the main test function that creates the iterator, inserts values from multiple threads, and checks the results. |
| 211 | + |
| 212 | +This example demonstrates that the ConcurrentGatheringIterator can handle concurrent inserts from multiple threads and still produce the correct results in order. |
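The blocking, in-order behaviour described in the Notes can be illustrated with a small pure-Python stand-in. The sketch assumes keys are consecutive integers starting at 0 and that `max_key` is inclusive, which matches the examples above. `SketchGatherer` is a hypothetical class, not the ft_utils implementation.

```python
import threading


class SketchGatherer:
    """Hypothetical stand-in; NOT the ft_utils implementation."""

    def __init__(self):
        self._items = {}
        self._cond = threading.Condition()

    def insert(self, key, value):
        with self._cond:
            self._items[key] = value
            # Wake any reader waiting for this key.
            self._cond.notify_all()

    def iterator(self, max_key):
        for key in range(max_key + 1):
            with self._cond:
                # Block until the value for this key has been inserted.
                self._cond.wait_for(lambda: key in self._items)
                value = self._items.pop(key)
            yield value


gatherer = SketchGatherer()
# Insert the keys in reverse order from separate threads.
threads = [
    threading.Thread(target=gatherer.insert, args=(k, f"value{k}"))
    for k in reversed(range(5))
]
for t in threads:
    t.start()
gathered = list(gatherer.iterator(4))
for t in threads:
    t.join()
```

Even though the keys arrive out of order, the reader yields them in key order because it waits for each key in turn.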
| 213 | + |
| 214 | +## ConcurrentQueue |
| 215 | + |
| 216 | +A concurrent queue that allows multiple threads to push and pop values. |
| 217 | + |
| 218 | +### Methods |
| 219 | + |
| 220 | +* `__init__(scaling)`: Initializes a new ConcurrentQueue with the specified scaling factor. |
| 221 | +* `push(value)`: Pushes a value onto the queue. |
| 222 | +* `pop()`: Pops a value from the queue. |
| 223 | + |
| 224 | +### Notes |
| 225 | + |
| 226 | +* The queue uses a ConcurrentDict to store the values. |
| 227 | +* The `push` method is thread-safe and can be called from multiple threads. |
| 228 | +* The `pop` method returns the next value in the queue, blocking if the queue is empty. |
| 229 | +* If an exception occurs during push, the queue will fail with a RuntimeError. |
| 230 | +* The `scaling` argument passed to `__init__` governs the number of threads the queue supports with good scaling. |
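The blocking `pop` noted above matters most when a consumer starts before any producer has pushed. The following sketch shows that pattern with a hypothetical stand-in (`SketchQueue`, built on `collections.deque` and `threading.Condition`); it is not the ft_utils implementation and makes no claim about how `ConcurrentQueue` works internally.

```python
import threading
from collections import deque


class SketchQueue:
    """Hypothetical stand-in; NOT the ft_utils implementation."""

    def __init__(self):
        self._values = deque()
        self._cond = threading.Condition()

    def push(self, value):
        with self._cond:
            self._values.append(value)
            self._cond.notify()

    def pop(self):
        with self._cond:
            # Block while the queue is empty.
            self._cond.wait_for(lambda: len(self._values) > 0)
            return self._values.popleft()


queue = SketchQueue()
consumed = []


def consumer(count):
    for _ in range(count):
        consumed.append(queue.pop())


worker = threading.Thread(target=consumer, args=(3,))
worker.start()  # the consumer may begin waiting before anything is pushed
for n in range(3):
    queue.push(n)
worker.join()
```

With a single consumer the values come out in the order they were pushed.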
| 231 | + |
| 232 | +### Example |
| 233 | + |
| 234 | +```python |
| 235 | +from ft_utils.concurrent import ConcurrentQueue |
| 236 | + |
| 237 | +queue = ConcurrentQueue() |
| 238 | +queue.push('value0') |
| 239 | +queue.push('value1') |
| 240 | +queue.push('value2') |
| 241 | + |
| 242 | +print(queue.pop()) # prints 'value0' |
| 243 | +print(queue.pop()) # prints 'value1' |
| 244 | +print(queue.pop()) # prints 'value2' |
| 245 | +``` |