From df45eaf374bfbae48d54e628b9402404efbcb4b0 Mon Sep 17 00:00:00 2001
From: Ruben Vorderman
Date: Wed, 11 Sep 2024 09:29:14 +0200
Subject: [PATCH 1/3] Reproduce error where a program with an error does not exit. Fix linting

---
 tests/test_gzip_ng_threaded.py | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/tests/test_gzip_ng_threaded.py b/tests/test_gzip_ng_threaded.py
index 1032a67..7ed06de 100644
--- a/tests/test_gzip_ng_threaded.py
+++ b/tests/test_gzip_ng_threaded.py
@@ -9,6 +9,8 @@
 import io
 import itertools
 import os
+import subprocess
+import sys
 import tempfile
 from pathlib import Path
@@ -209,3 +211,22 @@ def test_threaded_writer_does_not_close_stream():
     assert not test_stream.closed
     test_stream.seek(0)
     assert gzip.decompress(test_stream.read()) == b"thisisatest"
+
+
+@pytest.mark.timeout(5)
+@pytest.mark.parametrize(
+    ["mode", "threads"], itertools.product(["rb", "wb"], [1, 2]))
+def test_threaded_program_can_exit_on_error(tmp_path, mode, threads):
+    program = tmp_path / "no_context_manager.py"
+    test_file = tmp_path / "output.gz"
+    # Write 40 MiB of input data to saturate the read buffer. Because of its
+    # repetitive nature the resulting gzip file is very small (~40 KiB).
+    test_file.write_bytes(gzip.compress(b"test" * (10 * 1024 * 1024)))
+    with open(program, "wt") as f:
+        f.write("from zlib_ng import gzip_ng_threaded\n")
+        f.write(
+            f"f = gzip_ng_threaded.open('{test_file}', "
+            f"mode='{mode}', threads={threads})\n"
+        )
+        f.write("raise Exception('Error')\n")
+    subprocess.run([sys.executable, str(program)])

From 050fb001b39a99862abf35275f612a0061a973db Mon Sep 17 00:00:00 2001
From: Ruben Vorderman
Date: Wed, 11 Sep 2024 09:40:32 +0200
Subject: [PATCH 2/3] ThreadedGzip classes can no longer prevent a program from exiting.

---
 CHANGELOG.rst                   |  6 ++++++
 src/zlib_ng/gzip_ng_threaded.py | 10 ++++++----
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index e38a722..bdd0f54 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -7,6 +7,12 @@ Changelog
 .. This document is user facing. Please word the changes in such a way
 .. that users understand how the changes affect the new version.
 
+version 0.5.1
+-----------------
++ Fix a bug where ``gzip_ng_threaded.open`` could cause a hang
+  when the program exited and the returned file object was not used
+  with a context manager.
+
 version 0.5.0
 -----------------
 + Wheels are now build for MacOS arm64 architectures.

diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py
index 5b8a9ff..7fa7249 100644
--- a/src/zlib_ng/gzip_ng_threaded.py
+++ b/src/zlib_ng/gzip_ng_threaded.py
@@ -98,7 +98,8 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024):
         self.exception = None
         self.buffer = io.BytesIO()
         self.block_size = block_size
-        self.worker = threading.Thread(target=self._decompress)
+        # Using a daemon thread prevents programs freezing on error.
+        self.worker = threading.Thread(target=self._decompress, daemon=True)
         self._closed = False
         self.running = True
         self.worker.start()
@@ -231,9 +232,10 @@ def __init__(self,
                 queue.Queue(queue_size) for _ in range(threads)]
             self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
                 queue.Queue(queue_size) for _ in range(threads)]
-            self.output_worker = threading.Thread(target=self._write)
+            # Using daemon threads prevents a program freezing on error.
+            self.output_worker = threading.Thread(target=self._write, daemon=True)
             self.compression_workers = [
-                threading.Thread(target=self._compress, args=(i,))
+                threading.Thread(target=self._compress, args=(i,), daemon=True)
                 for i in range(threads)
             ]
         elif threads == 1:
@@ -241,7 +243,7 @@ def __init__(self,
             self.output_queues = []
             self.compression_workers = []
             self.output_worker = threading.Thread(
-                target=self._compress_and_write)
+                target=self._compress_and_write, daemon=True)
         else:
             raise ValueError(f"threads should be at least 1, got {threads}")
         self.threads = threads

From 1c3f210089c62fb567ee42b5de05d1c0fdd77a7c Mon Sep 17 00:00:00 2001
From: y5c4l3
Date: Fri, 13 Sep 2024 18:24:49 +0800
Subject: [PATCH 3/3] Start threads only after calling `read` or `write` on ThreadedGzip

This patch reverts the daemon threads and postpones launching the worker
threads until `read` / `write` is called on `ThreadedGzipReader` /
`ThreadedGzipWriter`. This should fix #53.

Signed-off-by: y5c4l3
---
 src/zlib_ng/gzip_ng_threaded.py | 49 +++++++++++++++++++--------------
 tests/test_gzip_ng_threaded.py  |  1 +
 2 files changed, 30 insertions(+), 20 deletions(-)

diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py
index 7fa7249..8a7e54a 100644
--- a/src/zlib_ng/gzip_ng_threaded.py
+++ b/src/zlib_ng/gzip_ng_threaded.py
@@ -98,11 +98,9 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024):
         self.exception = None
         self.buffer = io.BytesIO()
         self.block_size = block_size
-        # Using a daemon thread prevents programs freezing on error.
-        self.worker = threading.Thread(target=self._decompress, daemon=True)
+        self.worker = threading.Thread(target=self._decompress)
         self._closed = False
-        self.running = True
-        self.worker.start()
+        self.running = False
 
     def _check_closed(self, msg=None):
         if self._closed:
@@ -126,8 +124,19 @@ def _decompress(self):
             except queue.Full:
                 pass
 
+    def _start(self):
+        if not self.running:
+            self.running = True
+            self.worker.start()
+
+    def _stop(self):
+        if self.running:
+            self.running = False
+            self.worker.join()
+
     def readinto(self, b):
         self._check_closed()
+        self._start()
         result = self.buffer.readinto(b)
         if result == 0:
             while True:
@@ -155,8 +164,7 @@ def tell(self) -> int:
     def close(self) -> None:
         if self._closed:
             return
-        self.running = False
-        self.worker.join()
+        self._stop()
         self.fileobj.close()
         if self.closefd:
             self.raw.close()
@@ -232,10 +240,9 @@ def __init__(self,
                 queue.Queue(queue_size) for _ in range(threads)]
             self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
                 queue.Queue(queue_size) for _ in range(threads)]
-            # Using daemon threads prevents a program freezing on error.
- self.output_worker = threading.Thread(target=self._write, daemon=True) + self.output_worker = threading.Thread(target=self._write) self.compression_workers = [ - threading.Thread(target=self._compress, args=(i,), daemon=True) + threading.Thread(target=self._compress, args=(i,)) for i in range(threads) ] elif threads == 1: @@ -243,7 +250,7 @@ def __init__(self, self.output_queues = [] self.compression_workers = [] self.output_worker = threading.Thread( - target=self._compress_and_write, daemon=True) + target=self._compress_and_write) else: raise ValueError(f"threads should be at least 1, got {threads}") self.threads = threads @@ -254,7 +261,6 @@ def __init__(self, self.raw, self.closefd = open_as_binary_stream(filename, mode) self._closed = False self._write_gzip_header() - self.start() def _check_closed(self, msg=None): if self._closed: @@ -277,21 +283,24 @@ def _write_gzip_header(self): self.raw.write(struct.pack( "BBBBIBB", magic1, magic2, method, flags, mtime, os, xfl)) - def start(self): - self.running = True - self.output_worker.start() - for worker in self.compression_workers: - worker.start() + def _start(self): + if not self.running: + self.running = True + self.output_worker.start() + for worker in self.compression_workers: + worker.start() def stop(self): """Stop, but do not care for remaining work""" - self.running = False - for worker in self.compression_workers: - worker.join() - self.output_worker.join() + if self.running: + self.running = False + for worker in self.compression_workers: + worker.join() + self.output_worker.join() def write(self, b) -> int: self._check_closed() + self._start() with self.lock: if self.exception: raise self.exception diff --git a/tests/test_gzip_ng_threaded.py b/tests/test_gzip_ng_threaded.py index 7ed06de..1a0a5a8 100644 --- a/tests/test_gzip_ng_threaded.py +++ b/tests/test_gzip_ng_threaded.py @@ -105,6 +105,7 @@ def test_threaded_write_error(threads): threads=threads, block_size=8 * 1024) # Bypass the write method which should not allow blocks larger than # block_size. + f._start() f.input_queues[0].put((os.urandom(1024 * 64), b"")) with pytest.raises(OverflowError) as error: f.close()
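
Note for reviewers: below is a minimal usage sketch of the behaviour these three
patches target. It only uses the public ``gzip_ng_threaded.open`` call already
exercised by the new test; the file name and payload are illustrative, and
context-manager support on the returned object is assumed from the existing test
suite rather than shown in this diff.

    from zlib_ng import gzip_ng_threaded

    test_file = "output.gz"  # illustrative path, not part of the patches

    # Recommended usage: the context manager always calls close(), which
    # joins the worker threads, so an exception raised inside the block
    # cannot leave non-daemon threads keeping the interpreter alive.
    with gzip_ng_threaded.open(test_file, mode="wb", threads=2) as f:
        f.write(b"test" * 1024)

    # The failure mode reproduced by test_threaded_program_can_exit_on_error:
    # open without a context manager, then fail before close(). Before these
    # patches the worker threads were already running at this point and could
    # keep the program alive on exit; after patch 3 they are only started by
    # the first read()/write(), so an early error no longer blocks shutdown.
    f = gzip_ng_threaded.open(test_file, mode="rb", threads=1)
    try:
        raise Exception("Error")  # simulate the early failure from the test
    except Exception:
        f.close()  # explicit close remains the safest cleanup either way

The lazy start keeps the threads non-daemonic, so data in flight is still flushed
on a normal close(), while an object that was never read from or written to no
longer owns any thread that could delay interpreter shutdown.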