Fix multithreaded flushing implementation. by rhpvorderman · Pull Request #234 · pycompression/python-isal
Merged · 4 commits · Jun 24, 2025
Changes from all commits
4 changes: 4 additions & 0 deletions CHANGELOG.rst
@@ -11,6 +11,10 @@ Changelog
 version 1.8.0-dev
 -----------------
 + Python 3.8 is no longer supported.
++ Fix an issue where flushing using igzip_threaded ended the gzip stream and
+  started a new gzip stream, in essence creating a concatenated gzip stream.
+  Flushing is now consistent with how single-threaded gzip streams are
+  flushed using Z_SYNC_FLUSH.
 + Change build backend to setuptools-scm which is more commonly used and
   supported.
 + Include test packages in the source distribution, so source distribution
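
The fixed flush behaviour described above matches what Z_SYNC_FLUSH does on a single-threaded gzip stream. As a point of reference, here is a minimal standard-library sketch (illustrative, not code from this PR or from python-isal): a sync flush makes all pending data readable without terminating the gzip member, so repeated flushes do not create a concatenated stream.

import gzip
import zlib

# wbits=31 selects gzip framing (header and trailer) for the compressor.
compressor = zlib.compressobj(wbits=31)
data = compressor.compress(b"first chunk ")
data += compressor.flush(zlib.Z_SYNC_FLUSH)  # flush; the gzip member stays open
# A sync flush ends on an empty stored block, so the last four bytes are the
# well-known marker that the test below also checks for.
assert data.endswith(b"\x00\x00\xff\xff")
data += compressor.compress(b"second chunk")
data += compressor.flush()  # Z_FINISH writes the final block and the trailer
# Still a single gzip member containing everything written.
assert gzip.decompress(data) == b"first chunk second chunk"
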
18 changes: 7 additions & 11 deletions src/isal/igzip_threaded.py
@@ -316,30 +316,26 @@ def write(self, b) -> int:
         self.input_queues[worker_index].put((data, zdict))
         return len(data)
 
-    def _end_gzip_stream(self):
+    def flush(self):
         self._check_closed()
         # Wait for all data to be compressed
         for in_q in self.input_queues:
             in_q.join()
         # Wait for all data to be written
         for out_q in self.output_queues:
             out_q.join()
-        # Write an empty deflate block with a lost block marker.
+        self.raw.flush()
+
+    def close(self) -> None:
+        if self._closed:
+            return
+        self.flush()
         self.raw.write(isal_zlib.compress(b"", wbits=-15))
         trailer = struct.pack("<II", self._crc, self._size & 0xFFFFFFFF)
         self.raw.write(trailer)
         self._crc = 0
         self._size = 0
         self.raw.flush()
-
-    def flush(self):
-        self._end_gzip_stream()
-        self._write_gzip_header()
-
-    def close(self) -> None:
-        if self._closed:
-            return
-        self._end_gzip_stream()
         self.stop()
         if self.exception:
             self.raw.close()
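
For context on the framing the new close() relies on: the worker threads produce raw deflate data that always ends on a sync-flush boundary, so close() only has to append an empty final deflate block and the CRC32/size trailer to turn everything written so far into one complete gzip member. The sketch below assembles such a member by hand with the standard zlib module instead of isal_zlib; the hard-coded header bytes and variable names are illustrative assumptions, not taken from the PR.

import gzip
import struct
import zlib

payload = b"hello world"

# Raw deflate data as the compression workers would produce it: no final
# block bit set, ending on a Z_SYNC_FLUSH boundary.
compressor = zlib.compressobj(wbits=-15)
body = compressor.compress(payload) + compressor.flush(zlib.Z_SYNC_FLUSH)

header = b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff"  # minimal 10-byte gzip header
# An empty raw deflate stream is just an empty block with the last-block bit
# set, which is what writing isal_zlib.compress(b"", wbits=-15) achieves.
last_block = zlib.compressobj(wbits=-15).flush()
trailer = struct.pack("<II", zlib.crc32(payload), len(payload) & 0xFFFFFFFF)

member = header + body + last_block + trailer
assert gzip.decompress(member) == payload
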
21 changes: 18 additions & 3 deletions tests/test_igzip_threaded.py
@@ -12,6 +12,7 @@
 import subprocess
 import sys
 import tempfile
+import zlib
 from pathlib import Path
 
 from isal import igzip_threaded
@@ -243,15 +244,29 @@ def test_threaded_program_can_exit_on_error(tmp_path, mode, threads):
 
 @pytest.mark.parametrize("threads", [1, 2])
 def test_flush(tmp_path, threads):
+    empty_block_end = b"\x00\x00\xff\xff"
+    compressobj = zlib.compressobj(wbits=-15)
+    deflate_last_block = compressobj.compress(b"") + compressobj.flush()
     test_file = tmp_path / "output.gz"
     with igzip_threaded.open(test_file, "wb", threads=threads) as f:
         f.write(b"1")
         f.flush()
-        assert gzip.decompress(test_file.read_bytes()) == b"1"
+        data = test_file.read_bytes()
+        assert data[-4:] == empty_block_end
+        # Cut off gzip header and end data with an explicit last block to
+        # test if the data was compressed correctly.
+        deflate_block = data[10:] + deflate_last_block
+        assert zlib.decompress(deflate_block, wbits=-15) == b"1"
         f.write(b"2")
         f.flush()
-        assert gzip.decompress(test_file.read_bytes()) == b"12"
+        data = test_file.read_bytes()
+        assert data[-4:] == empty_block_end
+        deflate_block = data[10:] + deflate_last_block
+        assert zlib.decompress(deflate_block, wbits=-15) == b"12"
         f.write(b"3")
         f.flush()
-        assert gzip.decompress(test_file.read_bytes()) == b"123"
+        data = test_file.read_bytes()
+        assert data[-4:] == empty_block_end
+        deflate_block = data[10:] + deflate_last_block
+        assert zlib.decompress(deflate_block, wbits=-15) == b"123"
     assert gzip.decompress(test_file.read_bytes()) == b"123"
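
The test depends on two properties of the flushed file: it ends on the four-byte sync-flush marker, and its gzip header is exactly 10 bytes, so the raw deflate data starts at offset 10. A reader does not actually need to append an explicit last block to recover the data, because a raw-deflate decompressobj already returns everything decoded so far. The helper below is a hypothetical, standard-library-only illustration of that, not part of python-isal.

import zlib

def read_flushed_gzip(path):
    # Hypothetical helper: read back the contents of a gzip file that has
    # been flushed but not yet closed by the threaded writer.
    with open(path, "rb") as f:
        data = f.read()
    # Skip the fixed 10-byte gzip header; a decompressobj does not require
    # the final block or the CRC32/size trailer to return decoded data.
    return zlib.decompressobj(wbits=-15).decompress(data[10:])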