From 12b07034d28206c784bdfd83f47f1d7caa9666a5 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Fri, 9 Aug 2024 16:10:42 +0200 Subject: [PATCH 01/12] Add 3.13 to the supported python versions --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index 95a6a44..8959b04 100644 --- a/setup.py +++ b/setup.py @@ -151,6 +151,7 @@ def build_zlib_ng(): "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", "Programming Language :: C", From df45eaf374bfbae48d54e628b9402404efbcb4b0 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Sep 2024 09:29:14 +0200 Subject: [PATCH 02/12] Reproduce error where program with error does not close. Fix linting --- tests/test_gzip_ng_threaded.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/test_gzip_ng_threaded.py b/tests/test_gzip_ng_threaded.py index 1032a67..7ed06de 100644 --- a/tests/test_gzip_ng_threaded.py +++ b/tests/test_gzip_ng_threaded.py @@ -9,6 +9,8 @@ import io import itertools import os +import subprocess +import sys import tempfile from pathlib import Path @@ -209,3 +211,22 @@ def test_threaded_writer_does_not_close_stream(): assert not test_stream.closed test_stream.seek(0) assert gzip.decompress(test_stream.read()) == b"thisisatest" + + +@pytest.mark.timeout(5) +@pytest.mark.parametrize( + ["mode", "threads"], itertools.product(["rb", "wb"], [1, 2])) +def test_threaded_program_can_exit_on_error(tmp_path, mode, threads): + program = tmp_path / "no_context_manager.py" + test_file = tmp_path / "output.gz" + # Write 40 mb input data to saturate read buffer. Because of the repetitive + # nature the resulting gzip file is very small (~40 KiB). + test_file.write_bytes(gzip.compress(b"test" * (10 * 1024 * 1024))) + with open(program, "wt") as f: + f.write("from zlib_ng import gzip_ng_threaded\n") + f.write( + f"f = gzip_ng_threaded.open('{test_file}', " + f"mode='{mode}', threads={threads})\n" + ) + f.write("raise Exception('Error')\n") + subprocess.run([sys.executable, str(program)]) From 050fb001b39a99862abf35275f612a0061a973db Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 11 Sep 2024 09:40:32 +0200 Subject: [PATCH 03/12] ThreadedGzip classes cannot prevent a program from exiting anymore. --- CHANGELOG.rst | 6 ++++++ src/zlib_ng/gzip_ng_threaded.py | 10 ++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index e38a722..bdd0f54 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,12 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. +version 0.5.1 +----------------- ++ Fix a bug where ``gzip_ng_threaded.open`` could + cause a hang when the program exited and the program was not used with a + context manager. + version 0.5.0 ----------------- + Wheels are now build for MacOS arm64 architectures. diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index 5b8a9ff..7fa7249 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -98,7 +98,8 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024): self.exception = None self.buffer = io.BytesIO() self.block_size = block_size - self.worker = threading.Thread(target=self._decompress) + # Using a daemon thread prevents programs freezing on error. + self.worker = threading.Thread(target=self._decompress, daemon=True) self._closed = False self.running = True self.worker.start() @@ -231,9 +232,10 @@ def __init__(self, queue.Queue(queue_size) for _ in range(threads)] self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [ queue.Queue(queue_size) for _ in range(threads)] - self.output_worker = threading.Thread(target=self._write) + # Using daemon threads prevents a program freezing on error. + self.output_worker = threading.Thread(target=self._write, daemon=True) self.compression_workers = [ - threading.Thread(target=self._compress, args=(i,)) + threading.Thread(target=self._compress, args=(i,), daemon=True) for i in range(threads) ] elif threads == 1: @@ -241,7 +243,7 @@ def __init__(self, self.output_queues = [] self.compression_workers = [] self.output_worker = threading.Thread( - target=self._compress_and_write) + target=self._compress_and_write, daemon=True) else: raise ValueError(f"threads should be at least 1, got {threads}") self.threads = threads From 1c3f210089c62fb567ee42b5de05d1c0fdd77a7c Mon Sep 17 00:00:00 2001 From: y5c4l3 Date: Fri, 13 Sep 2024 18:24:49 +0800 Subject: [PATCH 04/12] Start threads only after calling `read` and `write` on ThreadedGzip This patch reverts daemon threads, and postpones the launch of threads until calling `read` / `write` on `ThreadedGzipReader` / `ThreadedGzipWriter`. This is supposed to fix #53. Signed-off-by: y5c4l3 --- src/zlib_ng/gzip_ng_threaded.py | 49 +++++++++++++++++++-------------- tests/test_gzip_ng_threaded.py | 1 + 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index 7fa7249..8a7e54a 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -98,11 +98,9 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024): self.exception = None self.buffer = io.BytesIO() self.block_size = block_size - # Using a daemon thread prevents programs freezing on error. - self.worker = threading.Thread(target=self._decompress, daemon=True) + self.worker = threading.Thread(target=self._decompress) self._closed = False - self.running = True - self.worker.start() + self.running = False def _check_closed(self, msg=None): if self._closed: @@ -126,8 +124,19 @@ def _decompress(self): except queue.Full: pass + def _start(self): + if not self.running: + self.running = True + self.worker.start() + + def _stop(self): + if self.running: + self.running = False + self.worker.join() + def readinto(self, b): self._check_closed() + self._start() result = self.buffer.readinto(b) if result == 0: while True: @@ -155,8 +164,7 @@ def tell(self) -> int: def close(self) -> None: if self._closed: return - self.running = False - self.worker.join() + self._stop() self.fileobj.close() if self.closefd: self.raw.close() @@ -232,10 +240,9 @@ def __init__(self, queue.Queue(queue_size) for _ in range(threads)] self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [ queue.Queue(queue_size) for _ in range(threads)] - # Using daemon threads prevents a program freezing on error. - self.output_worker = threading.Thread(target=self._write, daemon=True) + self.output_worker = threading.Thread(target=self._write) self.compression_workers = [ - threading.Thread(target=self._compress, args=(i,), daemon=True) + threading.Thread(target=self._compress, args=(i,)) for i in range(threads) ] elif threads == 1: @@ -243,7 +250,7 @@ def __init__(self, self.output_queues = [] self.compression_workers = [] self.output_worker = threading.Thread( - target=self._compress_and_write, daemon=True) + target=self._compress_and_write) else: raise ValueError(f"threads should be at least 1, got {threads}") self.threads = threads @@ -254,7 +261,6 @@ def __init__(self, self.raw, self.closefd = open_as_binary_stream(filename, mode) self._closed = False self._write_gzip_header() - self.start() def _check_closed(self, msg=None): if self._closed: @@ -277,21 +283,24 @@ def _write_gzip_header(self): self.raw.write(struct.pack( "BBBBIBB", magic1, magic2, method, flags, mtime, os, xfl)) - def start(self): - self.running = True - self.output_worker.start() - for worker in self.compression_workers: - worker.start() + def _start(self): + if not self.running: + self.running = True + self.output_worker.start() + for worker in self.compression_workers: + worker.start() def stop(self): """Stop, but do not care for remaining work""" - self.running = False - for worker in self.compression_workers: - worker.join() - self.output_worker.join() + if self.running: + self.running = False + for worker in self.compression_workers: + worker.join() + self.output_worker.join() def write(self, b) -> int: self._check_closed() + self._start() with self.lock: if self.exception: raise self.exception diff --git a/tests/test_gzip_ng_threaded.py b/tests/test_gzip_ng_threaded.py index 7ed06de..1a0a5a8 100644 --- a/tests/test_gzip_ng_threaded.py +++ b/tests/test_gzip_ng_threaded.py @@ -105,6 +105,7 @@ def test_threaded_write_error(threads): threads=threads, block_size=8 * 1024) # Bypass the write method which should not allow blocks larger than # block_size. + f._start() f.input_queues[0].put((os.urandom(1024 * 64), b"")) with pytest.raises(OverflowError) as error: f.close() From c9fdb903f1f7d3e2d4b9b33a83a4369bb7766e9c Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 24 Sep 2024 08:05:00 +0200 Subject: [PATCH 05/12] Revert "Start threads only after calling `read` and `write` on ThreadedGzip" This reverts commit 1c3f210089c62fb567ee42b5de05d1c0fdd77a7c. --- src/zlib_ng/gzip_ng_threaded.py | 49 ++++++++++++++------------------- tests/test_gzip_ng_threaded.py | 1 - 2 files changed, 20 insertions(+), 30 deletions(-) diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index 8a7e54a..7fa7249 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -98,9 +98,11 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024): self.exception = None self.buffer = io.BytesIO() self.block_size = block_size - self.worker = threading.Thread(target=self._decompress) + # Using a daemon thread prevents programs freezing on error. + self.worker = threading.Thread(target=self._decompress, daemon=True) self._closed = False - self.running = False + self.running = True + self.worker.start() def _check_closed(self, msg=None): if self._closed: @@ -124,19 +126,8 @@ def _decompress(self): except queue.Full: pass - def _start(self): - if not self.running: - self.running = True - self.worker.start() - - def _stop(self): - if self.running: - self.running = False - self.worker.join() - def readinto(self, b): self._check_closed() - self._start() result = self.buffer.readinto(b) if result == 0: while True: @@ -164,7 +155,8 @@ def tell(self) -> int: def close(self) -> None: if self._closed: return - self._stop() + self.running = False + self.worker.join() self.fileobj.close() if self.closefd: self.raw.close() @@ -240,9 +232,10 @@ def __init__(self, queue.Queue(queue_size) for _ in range(threads)] self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [ queue.Queue(queue_size) for _ in range(threads)] - self.output_worker = threading.Thread(target=self._write) + # Using daemon threads prevents a program freezing on error. + self.output_worker = threading.Thread(target=self._write, daemon=True) self.compression_workers = [ - threading.Thread(target=self._compress, args=(i,)) + threading.Thread(target=self._compress, args=(i,), daemon=True) for i in range(threads) ] elif threads == 1: @@ -250,7 +243,7 @@ def __init__(self, self.output_queues = [] self.compression_workers = [] self.output_worker = threading.Thread( - target=self._compress_and_write) + target=self._compress_and_write, daemon=True) else: raise ValueError(f"threads should be at least 1, got {threads}") self.threads = threads @@ -261,6 +254,7 @@ def __init__(self, self.raw, self.closefd = open_as_binary_stream(filename, mode) self._closed = False self._write_gzip_header() + self.start() def _check_closed(self, msg=None): if self._closed: @@ -283,24 +277,21 @@ def _write_gzip_header(self): self.raw.write(struct.pack( "BBBBIBB", magic1, magic2, method, flags, mtime, os, xfl)) - def _start(self): - if not self.running: - self.running = True - self.output_worker.start() - for worker in self.compression_workers: - worker.start() + def start(self): + self.running = True + self.output_worker.start() + for worker in self.compression_workers: + worker.start() def stop(self): """Stop, but do not care for remaining work""" - if self.running: - self.running = False - for worker in self.compression_workers: - worker.join() - self.output_worker.join() + self.running = False + for worker in self.compression_workers: + worker.join() + self.output_worker.join() def write(self, b) -> int: self._check_closed() - self._start() with self.lock: if self.exception: raise self.exception diff --git a/tests/test_gzip_ng_threaded.py b/tests/test_gzip_ng_threaded.py index 1a0a5a8..7ed06de 100644 --- a/tests/test_gzip_ng_threaded.py +++ b/tests/test_gzip_ng_threaded.py @@ -105,7 +105,6 @@ def test_threaded_write_error(threads): threads=threads, block_size=8 * 1024) # Bypass the write method which should not allow blocks larger than # block_size. - f._start() f.input_queues[0].put((os.urandom(1024 * 64), b"")) with pytest.raises(OverflowError) as error: f.close() From 9f468570288b7b466db16abf2a15181f58be6af0 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 24 Sep 2024 08:05:21 +0200 Subject: [PATCH 06/12] Revert "ThreadedGzip classes cannot prevent a program from exiting anymore." This reverts commit 050fb001b39a99862abf35275f612a0061a973db. --- CHANGELOG.rst | 6 ------ src/zlib_ng/gzip_ng_threaded.py | 10 ++++------ 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index bdd0f54..e38a722 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,12 +7,6 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. -version 0.5.1 ------------------ -+ Fix a bug where ``gzip_ng_threaded.open`` could - cause a hang when the program exited and the program was not used with a - context manager. - version 0.5.0 ----------------- + Wheels are now build for MacOS arm64 architectures. diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index 7fa7249..5b8a9ff 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -98,8 +98,7 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024): self.exception = None self.buffer = io.BytesIO() self.block_size = block_size - # Using a daemon thread prevents programs freezing on error. - self.worker = threading.Thread(target=self._decompress, daemon=True) + self.worker = threading.Thread(target=self._decompress) self._closed = False self.running = True self.worker.start() @@ -232,10 +231,9 @@ def __init__(self, queue.Queue(queue_size) for _ in range(threads)] self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [ queue.Queue(queue_size) for _ in range(threads)] - # Using daemon threads prevents a program freezing on error. - self.output_worker = threading.Thread(target=self._write, daemon=True) + self.output_worker = threading.Thread(target=self._write) self.compression_workers = [ - threading.Thread(target=self._compress, args=(i,), daemon=True) + threading.Thread(target=self._compress, args=(i,)) for i in range(threads) ] elif threads == 1: @@ -243,7 +241,7 @@ def __init__(self, self.output_queues = [] self.compression_workers = [] self.output_worker = threading.Thread( - target=self._compress_and_write, daemon=True) + target=self._compress_and_write) else: raise ValueError(f"threads should be at least 1, got {threads}") self.threads = threads From 4a25e32156b9c0e0bf3e2fb46d99893e772de896 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 24 Sep 2024 08:30:47 +0200 Subject: [PATCH 07/12] Fix infinite while loops with a thread is alive check --- CHANGELOG.rst | 5 +++++ src/zlib_ng/gzip_ng_threaded.py | 12 +++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index e38a722..f59dea5 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,11 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. +version 0.5.1-dev +----------------- ++ Threaded reading and writing do no longer block exiting when an exception + occurs in the main thread. + version 0.5.0 ----------------- + Wheels are now build for MacOS arm64 architectures. diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index 5b8a9ff..a1cd918 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -101,6 +101,7 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024): self.worker = threading.Thread(target=self._decompress) self._closed = False self.running = True + self._calling_thread = threading.current_thread() self.worker.start() def _check_closed(self, msg=None): @@ -110,7 +111,7 @@ def _check_closed(self, msg=None): def _decompress(self): block_size = self.block_size block_queue = self.queue - while self.running: + while self.running and self._calling_thread.is_alive(): try: data = self.fileobj.read(block_size) except Exception as e: @@ -118,7 +119,7 @@ def _decompress(self): return if not data: return - while self.running: + while self.running and self._calling_thread.is_alive(): try: block_queue.put(data, timeout=0.05) break @@ -215,6 +216,7 @@ def __init__(self, if "b" not in mode: mode += "b" self.lock = threading.Lock() + self._calling_thread = threading.current_thread() self.exception: Optional[Exception] = None self.level = level self.previous_block = b"" @@ -349,7 +351,7 @@ def _compress(self, index: int): in_queue = self.input_queues[index] out_queue = self.output_queues[index] compressor: zlib_ng._ParallelCompress = self.compressors[index] - while True: + while self._calling_thread.is_alive(): try: data, zdict = in_queue.get(timeout=0.05) except queue.Empty: @@ -372,7 +374,7 @@ def _write(self): fp = self.raw total_crc = 0 size = 0 - while True: + while self._calling_thread.is_alive(): out_index = index % self.threads output_queue = output_queues[out_index] try: @@ -397,7 +399,7 @@ def _compress_and_write(self): size = 0 in_queue = self.input_queues[0] compressor = self.compressors[0] - while True: + while self._calling_thread.is_alive(): try: data, zdict = in_queue.get(timeout=0.05) except queue.Empty: From 1bddda6a203c345a068420b84a51fab24a5707c2 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 24 Sep 2024 15:36:29 +0200 Subject: [PATCH 08/12] Write a test for proper flushing of the threaded writer --- tests/test_gzip_ng_threaded.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/test_gzip_ng_threaded.py b/tests/test_gzip_ng_threaded.py index 7ed06de..b11ddb9 100644 --- a/tests/test_gzip_ng_threaded.py +++ b/tests/test_gzip_ng_threaded.py @@ -230,3 +230,19 @@ def test_threaded_program_can_exit_on_error(tmp_path, mode, threads): ) f.write("raise Exception('Error')\n") subprocess.run([sys.executable, str(program)]) + + +@pytest.mark.parametrize("threads", [1, 2]) +def test_flush(tmp_path, threads): + test_file = tmp_path / "output.gz" + with gzip_ng_threaded.open(test_file, "wb", threads=threads) as f: + f.write(b"1") + f.flush() + assert gzip.decompress(test_file.read_bytes()) == b"1" + f.write(b"2") + f.flush() + assert gzip.decompress(test_file.read_bytes()) == b"12" + f.write(b"3") + f.flush() + assert gzip.decompress(test_file.read_bytes()) == b"123" + assert gzip.decompress(test_file.read_bytes()) == b"123" From 31125b2e8376846ef5011423dc5ee8bc4803d575 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 24 Sep 2024 16:25:07 +0200 Subject: [PATCH 09/12] Implement proper flushing behaviour --- CHANGELOG.rst | 2 ++ src/zlib_ng/gzip_ng_threaded.py | 49 +++++++++++++++++---------------- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f59dea5..d2a9fa4 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -9,6 +9,8 @@ Changelog version 0.5.1-dev ----------------- ++ Fix a bug where flushing in threaded mode did not write the data to the + output file. + Threaded reading and writing do no longer block exiting when an exception occurs in the main thread. diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index a1cd918..251c0df 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -60,7 +60,7 @@ def open(filename, mode="rb", compresslevel=gzip_ng._COMPRESS_LEVEL_TRADEOFF, gzip_file = io.BufferedReader( _ThreadedGzipReader(filename, block_size=block_size)) else: - gzip_file = io.BufferedWriter( + gzip_file = FlushableBufferedWriter( _ThreadedGzipWriter( filename, mode.replace("t", "b"), @@ -167,6 +167,12 @@ def closed(self) -> bool: return self._closed +class FlushableBufferedWriter(io.BufferedWriter): + def flush(self): + super().flush() + self.raw.flush() + + class _ThreadedGzipWriter(io.RawIOBase): """ Write a gzip file using multiple threads. @@ -315,7 +321,7 @@ def write(self, b) -> int: self.input_queues[worker_index].put((data, zdict)) return len(data) - def flush(self): + def _end_gzip_stream(self): self._check_closed() # Wait for all data to be compressed for in_q in self.input_queues: @@ -323,22 +329,27 @@ def flush(self): # Wait for all data to be written for out_q in self.output_queues: out_q.join() + # Write an empty deflate block with a lost block marker. + self.raw.write(zlib_ng.compress(b"", wbits=-15)) + trailer = struct.pack(" None: if self._closed: return - self.flush() + self._end_gzip_stream() self.stop() if self.exception: self.raw.close() self._closed = True raise self.exception - # Write an empty deflate block with a lost block marker. - self.raw.write(zlib_ng.compress(b"", wbits=-15)) - trailer = struct.pack(" Date: Wed, 25 Sep 2024 08:45:28 +0200 Subject: [PATCH 10/12] Fix readthedocs build --- docs/conf.py | 15 ++++----------- requirements-docs.txt | 2 +- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index e2e0bf0..528d846 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -4,26 +4,19 @@ # list see the documentation: # https://www.sphinx-doc.org/en/master/usage/configuration.html -from distutils.dist import DistributionMetadata -from pathlib import Path - -import pkg_resources +import zlib_ng # -- Project information ----------------------------------------------------- -# Get package information from the installed package. -package = pkg_resources.get_distribution("zlib-ng") -metadata_file = Path(package.egg_info) / Path(package.PKG_INFO) -metadata = DistributionMetadata(path=str(metadata_file)) - project = 'python-zlib-ng' copyright = '2023, Leiden University Medical Center' author = 'Leiden University Medical Center' + # The short X.Y version -version = package.parsed_version.base_version +version = zlib_ng.__version__ # The full version, including alpha/beta/rc tags -release = package.version +release = zlib_ng.__version__ # -- General configuration --------------------------------------------------- diff --git a/requirements-docs.txt b/requirements-docs.txt index 7fea598..051c278 100644 --- a/requirements-docs.txt +++ b/requirements-docs.txt @@ -1,4 +1,4 @@ sphinx setuptools -sphinx-rtd-theme>=1.2.0rc3,<1.3 +sphinx-rtd-theme sphinx-argparse \ No newline at end of file From 73a0f1c26fb2000b7990609d171529e4604d89c7 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 25 Sep 2024 08:49:17 +0200 Subject: [PATCH 11/12] Prepare release_0.5.1 --- CHANGELOG.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index d2a9fa4..1b7cf28 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,7 +7,7 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. -version 0.5.1-dev +version 0.5.1 ----------------- + Fix a bug where flushing in threaded mode did not write the data to the output file. From 1449b9992d5d2e88d312343b53fb3f31d065b930 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 25 Sep 2024 08:58:31 +0200 Subject: [PATCH 12/12] Slightly rephrase to avoid too many is_alive checks --- src/zlib_ng/gzip_ng_threaded.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index 251c0df..b9ec269 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -362,11 +362,11 @@ def _compress(self, index: int): in_queue = self.input_queues[index] out_queue = self.output_queues[index] compressor: zlib_ng._ParallelCompress = self.compressors[index] - while self._calling_thread.is_alive(): + while True: try: data, zdict = in_queue.get(timeout=0.05) except queue.Empty: - if not self.running: + if not (self.running and self._calling_thread.is_alive()): return continue try: @@ -382,13 +382,13 @@ def _compress(self, index: int): def _write(self): index = 0 output_queues = self.output_queues - while self._calling_thread.is_alive(): + while True: out_index = index % self.threads output_queue = output_queues[out_index] try: compressed, crc, data_length = output_queue.get(timeout=0.05) except queue.Empty: - if not self.running: + if not (self.running and self._calling_thread.is_alive()): return continue self._crc = zlib_ng.crc32_combine(self._crc, crc, data_length) @@ -402,11 +402,11 @@ def _compress_and_write(self): raise SystemError("Compress_and_write is for one thread only") in_queue = self.input_queues[0] compressor = self.compressors[0] - while self._calling_thread.is_alive(): + while True: try: data, zdict = in_queue.get(timeout=0.05) except queue.Empty: - if not self.running: + if not (self.running and self._calling_thread.is_alive()): return continue try: