diff --git a/CHANGELOG.rst b/CHANGELOG.rst index e38a722..1b7cf28 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,13 @@ Changelog .. This document is user facing. Please word the changes in such a way .. that users understand how the changes affect the new version. +version 0.5.1 +----------------- ++ Fix a bug where flushing in threaded mode did not write the data to the + output file. ++ Threaded reading and writing do no longer block exiting when an exception + occurs in the main thread. + version 0.5.0 ----------------- + Wheels are now build for MacOS arm64 architectures. diff --git a/docs/conf.py b/docs/conf.py index e2e0bf0..528d846 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -4,26 +4,19 @@ # list see the documentation: # https://www.sphinx-doc.org/en/master/usage/configuration.html -from distutils.dist import DistributionMetadata -from pathlib import Path - -import pkg_resources +import zlib_ng # -- Project information ----------------------------------------------------- -# Get package information from the installed package. -package = pkg_resources.get_distribution("zlib-ng") -metadata_file = Path(package.egg_info) / Path(package.PKG_INFO) -metadata = DistributionMetadata(path=str(metadata_file)) - project = 'python-zlib-ng' copyright = '2023, Leiden University Medical Center' author = 'Leiden University Medical Center' + # The short X.Y version -version = package.parsed_version.base_version +version = zlib_ng.__version__ # The full version, including alpha/beta/rc tags -release = package.version +release = zlib_ng.__version__ # -- General configuration --------------------------------------------------- diff --git a/requirements-docs.txt b/requirements-docs.txt index 7fea598..051c278 100644 --- a/requirements-docs.txt +++ b/requirements-docs.txt @@ -1,4 +1,4 @@ sphinx setuptools -sphinx-rtd-theme>=1.2.0rc3,<1.3 +sphinx-rtd-theme sphinx-argparse \ No newline at end of file diff --git a/setup.py b/setup.py index 95a6a44..8959b04 100644 --- a/setup.py +++ b/setup.py @@ -151,6 +151,7 @@ def build_zlib_ng(): "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: PyPy", "Programming Language :: C", diff --git a/src/zlib_ng/gzip_ng_threaded.py b/src/zlib_ng/gzip_ng_threaded.py index 5b8a9ff..b9ec269 100644 --- a/src/zlib_ng/gzip_ng_threaded.py +++ b/src/zlib_ng/gzip_ng_threaded.py @@ -60,7 +60,7 @@ def open(filename, mode="rb", compresslevel=gzip_ng._COMPRESS_LEVEL_TRADEOFF, gzip_file = io.BufferedReader( _ThreadedGzipReader(filename, block_size=block_size)) else: - gzip_file = io.BufferedWriter( + gzip_file = FlushableBufferedWriter( _ThreadedGzipWriter( filename, mode.replace("t", "b"), @@ -101,6 +101,7 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024): self.worker = threading.Thread(target=self._decompress) self._closed = False self.running = True + self._calling_thread = threading.current_thread() self.worker.start() def _check_closed(self, msg=None): @@ -110,7 +111,7 @@ def _check_closed(self, msg=None): def _decompress(self): block_size = self.block_size block_queue = self.queue - while self.running: + while self.running and self._calling_thread.is_alive(): try: data = self.fileobj.read(block_size) except Exception as e: @@ -118,7 +119,7 @@ def _decompress(self): return if not data: return - while self.running: + while self.running and self._calling_thread.is_alive(): try: block_queue.put(data, timeout=0.05) break @@ -166,6 +167,12 @@ def closed(self) -> bool: return self._closed +class FlushableBufferedWriter(io.BufferedWriter): + def flush(self): + super().flush() + self.raw.flush() + + class _ThreadedGzipWriter(io.RawIOBase): """ Write a gzip file using multiple threads. @@ -215,6 +222,7 @@ def __init__(self, if "b" not in mode: mode += "b" self.lock = threading.Lock() + self._calling_thread = threading.current_thread() self.exception: Optional[Exception] = None self.level = level self.previous_block = b"" @@ -313,7 +321,7 @@ def write(self, b) -> int: self.input_queues[worker_index].put((data, zdict)) return len(data) - def flush(self): + def _end_gzip_stream(self): self._check_closed() # Wait for all data to be compressed for in_q in self.input_queues: @@ -321,22 +329,27 @@ def flush(self): # Wait for all data to be written for out_q in self.output_queues: out_q.join() + # Write an empty deflate block with a lost block marker. + self.raw.write(zlib_ng.compress(b"", wbits=-15)) + trailer = struct.pack(" None: if self._closed: return - self.flush() + self._end_gzip_stream() self.stop() if self.exception: self.raw.close() self._closed = True raise self.exception - # Write an empty deflate block with a lost block marker. - self.raw.write(zlib_ng.compress(b"", wbits=-15)) - trailer = struct.pack("