Skip to content

Commit

Permalink
Tests: Debug deadlocks in tqdm._lock while running tests
Browse files Browse the repository at this point in the history
  • Loading branch information
davidfstr committed Feb 23, 2024
1 parent baaf506 commit 250f840
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/crystal/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,9 @@ def _finish_launch(self, filepath: Optional[str]=None) -> None:
# Immediately enter testing mode
os.environ['CRYSTAL_RUNNING_TESTS'] = 'True'

from crystal.util.tqdm_debug import patch_tqdm_to_debug_deadlocks
patch_tqdm_to_debug_deadlocks(on_deadlock='keep_trying')

# Block until test-related modules are done loading,
# before starting bg_task() on background thread
from crystal.tests.index import run_tests
Expand Down
55 changes: 55 additions & 0 deletions src/crystal/util/tqdm_debug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import threading
from tqdm.std import TqdmDefaultWriteLock
import traceback
from typing import List, Tuple
from typing_extensions import assert_never, Literal


def patch_tqdm_to_debug_deadlocks(
        on_deadlock: Literal['raise', 'keep_trying']='raise',
        *, timeout: float=5.0,
        ) -> None:
    """
    Patch tqdm to detect potential deadlocks when accessing its internal tqdm._lock.

    Replaces TqdmDefaultWriteLock.acquire and TqdmDefaultWriteLock.release
    so that every blocking acquire that does not specify its own timeout
    waits at most `timeout` seconds for each underlying lock.

    When a deadlock is detected a warning is printed (including the stack of
    the most recent recorded lock holder and the stack of the new acquirer)
    and one of the following actions is taken, depending on the value of
    `on_deadlock`:
    * 'raise' - Release any underlying locks already taken by this call,
                then raise an AssertionError.
    * 'keep_trying' - Keep trying to acquire the lock anyway,
                      using the caller's original arguments.

    Calling this function more than once is safe: the original release()
    is remembered, so the bookkeeping wrapper is never stacked on itself.

    Arguments:
    * on_deadlock -- action to take when a deadlock is suspected.
    * timeout -- seconds to wait for each underlying lock before
                 reporting a suspected deadlock.
    """
    # Stack of (thread, formatted stack) pairs, one per successful acquire().
    # Entries are pushed right after acquiring and popped right before releasing.
    TqdmDefaultWriteLock.th_lock_holders = []  # List[Tuple[threading.Thread, List[str]]]

    deadlock_timeout = timeout

    def acquire(self, *a, **k):
        # Arguments exactly as the caller gave them, for the 'keep_trying' retry
        k_no_timeout = dict(k)

        # Positional form is acquire(blocking, timeout).
        # threading locks call the flag "blocking"; multiprocessing locks call it "block".
        if a:
            is_blocking = bool(a[0])
        else:
            is_blocking = bool(k.get('blocking', k.get('block', True)))
        # Only inject the debug timeout where that is legal:
        # a timeout combined with a non-blocking call raises ValueError and
        # a keyword timeout on top of a positional one raises TypeError.
        if is_blocking and 'timeout' not in k and len(a) < 2:
            k['timeout'] = deadlock_timeout
        # Without a keyword timeout a failed acquire is just a failed
        # non-blocking/caller-timed attempt, not evidence of a deadlock.
        detect_deadlock = 'timeout' in k

        acquired = []
        for lock in self.locks:
            ok = lock.acquire(*a, **k)
            if not ok and detect_deadlock:
                try:
                    lock_holder = TqdmDefaultWriteLock.th_lock_holders[-1]
                except IndexError:
                    lock_holder = ('<none>', ['<none>'])

                print(
                    f'*** TqdmDefaultWriteLock: Failed to acquire lock in '
                    f'{k["timeout"]} seconds. '
                    f'Lock holder is {lock_holder[0]} at:\n'
                    f'{"".join(lock_holder[1])}')
                print(f'*** New acquirer is:\n{"".join(traceback.format_stack())}')
                if on_deadlock == 'raise':
                    # Don't leak the locks taken earlier in this loop
                    for held_lock in reversed(acquired):
                        held_lock.release()
                    raise AssertionError('Failed to acquire TqdmDefaultWriteLock')
                elif on_deadlock == 'keep_trying':
                    print('*** Continuing to try acquiring lock anyway:')
                    ok = lock.acquire(*a, **k_no_timeout)
                else:
                    assert_never(on_deadlock)
            if ok:
                acquired.append(lock)

        # Record this thread as the holder only if every lock was really taken
        if len(acquired) == len(self.locks):
            TqdmDefaultWriteLock.th_lock_holders.append((
                threading.current_thread(),
                traceback.format_stack()
            ))
    TqdmDefaultWriteLock.acquire = acquire

    # If release() was already patched by a previous call, wrap the original
    # implementation again rather than the previous wrapper, so that each
    # release pops exactly one holder entry.
    current_release = TqdmDefaultWriteLock.release
    super_release = getattr(current_release, '_tqdm_debug_unpatched', current_release)
    def release(self, *args, **kwargs):
        # Pop before releasing, while this thread still holds the lock.
        # The list may be empty if the lock was acquired before patching.
        holders = TqdmDefaultWriteLock.th_lock_holders
        if holders:
            holders.pop()
        return super_release(self, *args, **kwargs)
    release._tqdm_debug_unpatched = super_release  # type: ignore[attr-defined]
    TqdmDefaultWriteLock.release = release

0 comments on commit 250f840

Please sign in to comment.