Skip to content

Commit 969e20b

Browse files
committed
fix: expiration cache hanging causing delay in driver
1 parent 74ae5de commit 969e20b

File tree

2 files changed

+47
-22
lines changed

2 files changed

+47
-22
lines changed

aws_advanced_python_wrapper/utils/sliding_expiration_cache.py

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from __future__ import annotations
1616

17-
from concurrent.futures import Executor, ThreadPoolExecutor
17+
from threading import Thread
1818
from time import perf_counter_ns, sleep
1919
from typing import Callable, Generic, ItemsView, KeysView, Optional, TypeVar
2020

@@ -119,28 +119,26 @@ def __init__(
119119
should_dispose_func: Optional[Callable] = None,
120120
item_disposal_func: Optional[Callable] = None):
121121
super().__init__(cleanup_interval_ns, should_dispose_func, item_disposal_func)
122-
self._executor: Executor = ThreadPoolExecutor(thread_name_prefix="SlidingExpirationCacheWithCleanupThreadExecutor")
123-
self.init_cleanup_thread()
124-
125-
def init_cleanup_thread(self) -> None:
126-
self._executor.submit(self._cleanup_thread_internal)
122+
self._cleanup_thread = Thread(target=self._cleanup_thread_internal, daemon=True)
123+
self._cleanup_thread.start()
127124

128125
def _cleanup_thread_internal(self):
129-
logger.debug("SlidingExpirationCache.CleaningUp")
130-
current_time = perf_counter_ns()
131-
sleep(self._cleanup_interval_ns / 1_000_000_000)
132-
self._cleanup_time_ns.set(current_time + self._cleanup_interval_ns)
133-
keys = [key for key, _ in self._cdict.items()]
134-
for key in keys:
126+
while True:
135127
try:
136-
self._remove_if_expired(key)
128+
sleep(self._cleanup_interval_ns / 1_000_000_000)
129+
logger.debug("SlidingExpirationCache.CleaningUp")
130+
self._cleanup_time_ns.set(perf_counter_ns() + self._cleanup_interval_ns)
131+
keys = [key for key, _ in self._cdict.items()]
132+
for key in keys:
133+
try:
134+
self._remove_if_expired(key)
135+
except Exception:
136+
pass # ignore
137137
except Exception:
138-
pass # ignore
139-
140-
self._executor.shutdown()
138+
break
141139

142140
def _cleanup(self):
143-
pass # do nothing, cleanup thread does the job
141+
pass # cleanup thread handles this
144142

145143

146144
class CacheItem(Generic[V]):

tests/unit/test_sliding_expiration_cache.py

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import time
16-
1715
from aws_advanced_python_wrapper.utils.sliding_expiration_cache import \
18-
SlidingExpirationCache
16+
SlidingExpirationCache, SlidingExpirationCacheWithCleanupThread
17+
from time import sleep
1918

2019

2120
def test_compute_if_absent():
@@ -30,7 +29,7 @@ def test_compute_if_absent():
3029
assert "a" == result2
3130
assert "a" == cache.get(1)
3231

33-
time.sleep(0.07) # Python may sleep slightly less than the given value
32+
sleep(0.07) # Python may sleep slightly less than the given value
3433
result3 = cache.compute_if_absent(1, lambda _: "b", 5)
3534
assert "b" == result3
3635
assert "b" == cache.get(1)
@@ -54,7 +53,7 @@ def test_remove():
5453
result = cache.compute_if_absent("non_expired_item", lambda _: non_expired_item, 15_000_000_000)
5554
assert non_expired_item == result
5655

57-
time.sleep(0.07) # Python may sleep slightly less than the given value
56+
sleep(0.07) # Python may sleep slightly less than the given value
5857
cache.remove("item_to_remove")
5958

6059
assert cache.get("item_to_remove") is None
@@ -89,6 +88,34 @@ def test_clear():
8988
assert item2.disposed is True
9089

9190

91+
def test_cleanup_thread_continuous_removal():
92+
# Use very short cleanup interval for testing (100ms)
93+
cache = SlidingExpirationCacheWithCleanupThread(
94+
cleanup_interval_ns=100_000_000, # 100ms
95+
item_disposal_func=lambda item: item.dispose()
96+
)
97+
98+
# First cycle: insert item that expires quickly
99+
item1 = DisposableItem(True)
100+
cache.compute_if_absent("key1", lambda _: item1, 50_000_000) # 50ms expiration
101+
assert cache.get("key1") == item1
102+
103+
# Wait for cleanup thread to remove expired item
104+
sleep(0.2) # Wait 200ms for cleanup
105+
assert cache.get("key1") is None
106+
assert item1.disposed is True
107+
108+
# Second cycle: insert another item that expires quickly
109+
item2 = DisposableItem(True)
110+
cache.compute_if_absent("key2", lambda _: item2, 50_000_000) # 50ms expiration
111+
assert cache.get("key2") == item2
112+
113+
# Wait for cleanup thread to remove second expired item
114+
sleep(0.2) # Wait 200ms for cleanup
115+
assert cache.get("key2") is None
116+
assert item2.disposed is True
117+
118+
92119
class DisposableItem:
93120
def __init__(self, should_dispose):
94121
self.should_dispose = should_dispose

0 commit comments

Comments
 (0)