From c9fdb903f1f7d3e2d4b9b33a83a4369bb7766e9c Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 24 Sep 2024 08:05:00 +0200 Subject: [PATCH 1/3] Revert "Start threads only after calling `read` and `write` on ThreadedGzip" This reverts commit 1c3f210089c62fb567ee42b5de05d1c0fdd77a7c. --- src/zlib_ng/gzip_ng_threaded.py | 49 ++++++++++++++------------------- tests/test_gzip_ng_threaded.py | 1 - 2 files changed, 20 insertions(+), 30 deletions(-) diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index 8a7e54a..7fa7249 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -98,9 +98,11 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024): self.exception = None self.buffer = io.BytesIO() self.block_size = block_size - self.worker = threading.Thread(target=self._decompress) + # Using a daemon thread prevents programs freezing on error. + self.worker = threading.Thread(target=self._decompress, daemon=True) self._closed = False - self.running = False + self.running = True + self.worker.start() def _check_closed(self, msg=None): if self._closed: @@ -124,19 +126,8 @@ def _decompress(self): except queue.Full: pass - def _start(self): - if not self.running: - self.running = True - self.worker.start() - - def _stop(self): - if self.running: - self.running = False - self.worker.join() - def readinto(self, b): self._check_closed() - self._start() result = self.buffer.readinto(b) if result == 0: while True: @@ -164,7 +155,8 @@ def tell(self) -> int: def close(self) -> None: if self._closed: return - self._stop() + self.running = False + self.worker.join() self.fileobj.close() if self.closefd: self.raw.close() @@ -240,9 +232,10 @@ def __init__(self, queue.Queue(queue_size) for _ in range(threads)] self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [ queue.Queue(queue_size) for _ in range(threads)] - self.output_worker = threading.Thread(target=self._write) + # Using daemon threads prevents a program freezing on error. + self.output_worker = threading.Thread(target=self._write, daemon=True) self.compression_workers = [ - threading.Thread(target=self._compress, args=(i,)) + threading.Thread(target=self._compress, args=(i,), daemon=True) for i in range(threads) ] elif threads == 1: @@ -250,7 +243,7 @@ def __init__(self, self.output_queues = [] self.compression_workers = [] self.output_worker = threading.Thread( - target=self._compress_and_write) + target=self._compress_and_write, daemon=True) else: raise ValueError(f"threads should be at least 1, got {threads}") self.threads = threads @@ -261,6 +254,7 @@ def __init__(self, self.raw, self.closefd = open_as_binary_stream(filename, mode) self._closed = False self._write_gzip_header() + self.start() def _check_closed(self, msg=None): if self._closed: @@ -283,24 +277,21 @@ def _write_gzip_header(self): self.raw.write(struct.pack( "BBBBIBB", magic1, magic2, method, flags, mtime, os, xfl)) - def _start(self): - if not self.running: - self.running = True - self.output_worker.start() - for worker in self.compression_workers: - worker.start() + def start(self): + self.running = True + self.output_worker.start() + for worker in self.compression_workers: + worker.start() def stop(self): """Stop, but do not care for remaining work""" - if self.running: - self.running = False - for worker in self.compression_workers: - worker.join() - self.output_worker.join() + self.running = False + for worker in self.compression_workers: + worker.join() + self.output_worker.join() def write(self, b) -> int: self._check_closed() - self._start() with self.lock: if self.exception: raise self.exception diff --git a/tests/test_gzip_ng_threaded.py b/tests/test_gzip_ng_threaded.py index 1a0a5a8..7ed06de 100644 --- a/tests/test_gzip_ng_threaded.py +++ b/tests/test_gzip_ng_threaded.py @@ -105,7 +105,6 @@ def test_threaded_write_error(threads): threads=threads, block_size=8 * 1024) # Bypass the write method which should not allow blocks larger than # block_size. - f._start() f.input_queues[0].put((os.urandom(1024 * 64), b"")) with pytest.raises(OverflowError) as error: f.close() From 9f468570288b7b466db16abf2a15181f58be6af0 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 24 Sep 2024 08:05:21 +0200 Subject: [PATCH 2/3] Revert "ThreadedGzip classes cannot prevent a program from exiting anymore." This reverts commit 050fb001b39a99862abf35275f612a0061a973db. --- CHANGELOG.rst | 6 ------ src/zlib_ng/gzip_ng_threaded.py | 10 ++++------ 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index bdd0f54..e38a722 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,12 +7,6 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. -version 0.5.1 ------------------ -+ Fix a bug where ``gzip_ng_threaded.open`` could - cause a hang when the program exited and the program was not used with a - context manager. - version 0.5.0 ----------------- + Wheels are now build for MacOS arm64 architectures. diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index 7fa7249..5b8a9ff 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -98,8 +98,7 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024): self.exception = None self.buffer = io.BytesIO() self.block_size = block_size - # Using a daemon thread prevents programs freezing on error. - self.worker = threading.Thread(target=self._decompress, daemon=True) + self.worker = threading.Thread(target=self._decompress) self._closed = False self.running = True self.worker.start() @@ -232,10 +231,9 @@ def __init__(self, queue.Queue(queue_size) for _ in range(threads)] self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [ queue.Queue(queue_size) for _ in range(threads)] - # Using daemon threads prevents a program freezing on error. - self.output_worker = threading.Thread(target=self._write, daemon=True) + self.output_worker = threading.Thread(target=self._write) self.compression_workers = [ - threading.Thread(target=self._compress, args=(i,), daemon=True) + threading.Thread(target=self._compress, args=(i,)) for i in range(threads) ] elif threads == 1: @@ -243,7 +241,7 @@ def __init__(self, self.output_queues = [] self.compression_workers = [] self.output_worker = threading.Thread( - target=self._compress_and_write, daemon=True) + target=self._compress_and_write) else: raise ValueError(f"threads should be at least 1, got {threads}") self.threads = threads From 4a25e32156b9c0e0bf3e2fb46d99893e772de896 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 24 Sep 2024 08:30:47 +0200 Subject: [PATCH 3/3] Fix infinite while loops with a thread is alive check --- CHANGELOG.rst | 5 +++++ src/zlib_ng/gzip_ng_threaded.py | 12 +++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index e38a722..f59dea5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,11 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. +version 0.5.1-dev +----------------- ++ Threaded reading and writing do no longer block exiting when an exception + occurs in the main thread. + version 0.5.0 ----------------- + Wheels are now build for MacOS arm64 architectures. diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index 5b8a9ff..a1cd918 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -101,6 +101,7 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024): self.worker = threading.Thread(target=self._decompress) self._closed = False self.running = True + self._calling_thread = threading.current_thread() self.worker.start() def _check_closed(self, msg=None): @@ -110,7 +111,7 @@ def _check_closed(self, msg=None): def _decompress(self): block_size = self.block_size block_queue = self.queue - while self.running: + while self.running and self._calling_thread.is_alive(): try: data = self.fileobj.read(block_size) except Exception as e: @@ -118,7 +119,7 @@ def _decompress(self): return if not data: return - while self.running: + while self.running and self._calling_thread.is_alive(): try: block_queue.put(data, timeout=0.05) break @@ -215,6 +216,7 @@ def __init__(self, if "b" not in mode: mode += "b" self.lock = threading.Lock() + self._calling_thread = threading.current_thread() self.exception: Optional[Exception] = None self.level = level self.previous_block = b"" @@ -349,7 +351,7 @@ def _compress(self, index: int): in_queue = self.input_queues[index] out_queue = self.output_queues[index] compressor: zlib_ng._ParallelCompress = self.compressors[index] - while True: + while self._calling_thread.is_alive(): try: data, zdict = in_queue.get(timeout=0.05) except queue.Empty: @@ -372,7 +374,7 @@ def _write(self): fp = self.raw total_crc = 0 size = 0 - while True: + while self._calling_thread.is_alive(): out_index = index % self.threads output_queue = output_queues[out_index] try: @@ -397,7 +399,7 @@ def _compress_and_write(self): size = 0 in_queue = self.input_queues[0] compressor = self.compressors[0] - while True: + while self._calling_thread.is_alive(): try: data, zdict = in_queue.get(timeout=0.05) except queue.Empty: