From 5ef868bd29559dd1647230622bae3c8224f27738 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 24 Jun 2025 11:40:33 +0200 Subject: [PATCH 1/4] Fix flush implementation --- src/isal/igzip_threaded.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index 7f1c94fc..355b7f72 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -316,7 +316,7 @@ def write(self, b) -> int: self.input_queues[worker_index].put((data, zdict)) return len(data) - def _end_gzip_stream(self): + def flush(self): self._check_closed() # Wait for all data to be compressed for in_q in self.input_queues: @@ -324,22 +324,18 @@ def _end_gzip_stream(self): # Wait for all data to be written for out_q in self.output_queues: out_q.join() - # Write an empty deflate block with a lost block marker. + self.raw.flush() + + def close(self) -> None: + if self._closed: + return + self.flush() self.raw.write(isal_zlib.compress(b"", wbits=-15)) trailer = struct.pack(" None: - if self._closed: - return - self._end_gzip_stream() self.stop() if self.exception: self.raw.close() From 43983269f56a911e84fce09b56cfb95765f6334f Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 24 Jun 2025 14:22:54 +0200 Subject: [PATCH 2/4] Add a test for correctly flushing data to disk --- tests/test_igzip_threaded.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index 41c61bfe..6331182b 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -12,6 +12,7 @@ import subprocess import sys import tempfile +import zlib from pathlib import Path from isal import igzip_threaded @@ -243,15 +244,28 @@ def test_threaded_program_can_exit_on_error(tmp_path, mode, threads): @pytest.mark.parametrize("threads", [1, 2]) def test_flush(tmp_path, threads): + empty_block_end = b"\x00\x00\xff\xff" + deflate_last_block = zlib.compress(b"", wbits=-15) test_file = tmp_path / "output.gz" with igzip_threaded.open(test_file, "wb", threads=threads) as f: f.write(b"1") f.flush() - assert gzip.decompress(test_file.read_bytes()) == b"1" + data = test_file.read_bytes() + assert data[-4:] == empty_block_end + # Cut off gzip header and end data with an explicit last block to + # test if the data was compressed correctly. + deflate_block = data[10:] + deflate_last_block + assert zlib.decompress(deflate_block, wbits=-15) == b"1" f.write(b"2") f.flush() - assert gzip.decompress(test_file.read_bytes()) == b"12" + data = test_file.read_bytes() + assert data[-4:] == empty_block_end + deflate_block = data[10:] + deflate_last_block + assert zlib.decompress(deflate_block, wbits=-15) == b"12" f.write(b"3") f.flush() - assert gzip.decompress(test_file.read_bytes()) == b"123" + data = test_file.read_bytes() + assert data[-4:] == empty_block_end + deflate_block = data[10:] + deflate_last_block + assert zlib.decompress(deflate_block, wbits=-15) == b"123" assert gzip.decompress(test_file.read_bytes()) == b"123" From 85e9d4853ca9d67d7b206c71428fc597b31549c6 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 24 Jun 2025 14:26:24 +0200 Subject: [PATCH 3/4] Add multithreaded flushing fix to the CHANGELOG --- CHANGELOG.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c49acba7..508abc46 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -11,6 +11,10 @@ Changelog version 1.8.0-dev ----------------- + Python 3.8 is no longer supported. ++ Fix an issue where flushing using igzip_threaded caused a gzip end of stream + and started a new gzip stream. In essence creating a concatenated gzip + stream. Now it is in concordance with how single threaded gzip streams + are flushed using Z_SYNC_FLUSH. + Change build backend to setuptools-scm which is more commonly used and supported. + Include test packages in the source distribution, so source distribution From 0fef0a9bd44689a51d0e32fd51f07708e96e3012 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Tue, 24 Jun 2025 14:31:31 +0200 Subject: [PATCH 4/4] Fix test for older versions of python --- tests/test_igzip_threaded.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index 6331182b..d2aee567 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -245,7 +245,8 @@ def test_threaded_program_can_exit_on_error(tmp_path, mode, threads): @pytest.mark.parametrize("threads", [1, 2]) def test_flush(tmp_path, threads): empty_block_end = b"\x00\x00\xff\xff" - deflate_last_block = zlib.compress(b"", wbits=-15) + compressobj = zlib.compressobj(wbits=-15) + deflate_last_block = compressobj.compress(b"") + compressobj.flush() test_file = tmp_path / "output.gz" with igzip_threaded.open(test_file, "wb", threads=threads) as f: f.write(b"1")