Skip to content

Commit b0728bb

Browse files
authored
Merge pull request #58 from pycompression/fixdeadlock
Fixdeadlock
2 parents 6b942fb + 4a25e32 commit b0728bb

File tree

3 files changed

+24
-35
lines changed

3 files changed

+24
-35
lines changed

Diff for: CHANGELOG.rst

+3-4
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,10 @@ Changelog
77
.. This document is user facing. Please word the changes in such a way
88
.. that users understand how the changes affect the new version.
99
10-
version 0.5.1
10+
version 0.5.1-dev
1111
-----------------
12-
+ Fix a bug where ``gzip_ng_threaded.open`` could
13-
cause a hang when the program exited and the program was not used with a
14-
context manager.
12+
+ Threaded reading and writing do no longer block exiting when an exception
13+
occurs in the main thread.
1514

1615
version 0.5.0
1716
-----------------

Diff for: src/zlib_ng/gzip_ng_threaded.py

+21-30
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024):
100100
self.block_size = block_size
101101
self.worker = threading.Thread(target=self._decompress)
102102
self._closed = False
103-
self.running = False
103+
self.running = True
104+
self._calling_thread = threading.current_thread()
105+
self.worker.start()
104106

105107
def _check_closed(self, msg=None):
106108
if self._closed:
@@ -109,34 +111,23 @@ def _check_closed(self, msg=None):
109111
def _decompress(self):
110112
block_size = self.block_size
111113
block_queue = self.queue
112-
while self.running:
114+
while self.running and self._calling_thread.is_alive():
113115
try:
114116
data = self.fileobj.read(block_size)
115117
except Exception as e:
116118
self.exception = e
117119
return
118120
if not data:
119121
return
120-
while self.running:
122+
while self.running and self._calling_thread.is_alive():
121123
try:
122124
block_queue.put(data, timeout=0.05)
123125
break
124126
except queue.Full:
125127
pass
126128

127-
def _start(self):
128-
if not self.running:
129-
self.running = True
130-
self.worker.start()
131-
132-
def _stop(self):
133-
if self.running:
134-
self.running = False
135-
self.worker.join()
136-
137129
def readinto(self, b):
138130
self._check_closed()
139-
self._start()
140131
result = self.buffer.readinto(b)
141132
if result == 0:
142133
while True:
@@ -164,7 +155,8 @@ def tell(self) -> int:
164155
def close(self) -> None:
165156
if self._closed:
166157
return
167-
self._stop()
158+
self.running = False
159+
self.worker.join()
168160
self.fileobj.close()
169161
if self.closefd:
170162
self.raw.close()
@@ -224,6 +216,7 @@ def __init__(self,
224216
if "b" not in mode:
225217
mode += "b"
226218
self.lock = threading.Lock()
219+
self._calling_thread = threading.current_thread()
227220
self.exception: Optional[Exception] = None
228221
self.level = level
229222
self.previous_block = b""
@@ -261,6 +254,7 @@ def __init__(self,
261254
self.raw, self.closefd = open_as_binary_stream(filename, mode)
262255
self._closed = False
263256
self._write_gzip_header()
257+
self.start()
264258

265259
def _check_closed(self, msg=None):
266260
if self._closed:
@@ -283,24 +277,21 @@ def _write_gzip_header(self):
283277
self.raw.write(struct.pack(
284278
"BBBBIBB", magic1, magic2, method, flags, mtime, os, xfl))
285279

286-
def _start(self):
287-
if not self.running:
288-
self.running = True
289-
self.output_worker.start()
290-
for worker in self.compression_workers:
291-
worker.start()
280+
def start(self):
281+
self.running = True
282+
self.output_worker.start()
283+
for worker in self.compression_workers:
284+
worker.start()
292285

293286
def stop(self):
294287
"""Stop, but do not care for remaining work"""
295-
if self.running:
296-
self.running = False
297-
for worker in self.compression_workers:
298-
worker.join()
299-
self.output_worker.join()
288+
self.running = False
289+
for worker in self.compression_workers:
290+
worker.join()
291+
self.output_worker.join()
300292

301293
def write(self, b) -> int:
302294
self._check_closed()
303-
self._start()
304295
with self.lock:
305296
if self.exception:
306297
raise self.exception
@@ -360,7 +351,7 @@ def _compress(self, index: int):
360351
in_queue = self.input_queues[index]
361352
out_queue = self.output_queues[index]
362353
compressor: zlib_ng._ParallelCompress = self.compressors[index]
363-
while True:
354+
while self._calling_thread.is_alive():
364355
try:
365356
data, zdict = in_queue.get(timeout=0.05)
366357
except queue.Empty:
@@ -383,7 +374,7 @@ def _write(self):
383374
fp = self.raw
384375
total_crc = 0
385376
size = 0
386-
while True:
377+
while self._calling_thread.is_alive():
387378
out_index = index % self.threads
388379
output_queue = output_queues[out_index]
389380
try:
@@ -408,7 +399,7 @@ def _compress_and_write(self):
408399
size = 0
409400
in_queue = self.input_queues[0]
410401
compressor = self.compressors[0]
411-
while True:
402+
while self._calling_thread.is_alive():
412403
try:
413404
data, zdict = in_queue.get(timeout=0.05)
414405
except queue.Empty:

Diff for: tests/test_gzip_ng_threaded.py

-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ def test_threaded_write_error(threads):
105105
threads=threads, block_size=8 * 1024)
106106
# Bypass the write method which should not allow blocks larger than
107107
# block_size.
108-
f._start()
109108
f.input_queues[0].put((os.urandom(1024 * 64), b""))
110109
with pytest.raises(OverflowError) as error:
111110
f.close()

0 commit comments

Comments
 (0)