Mercurial > hg
diff contrib/python-zstandard/tests/test_compressor.py @ 31796:e0dc40530c5a
zstd: vendor python-zstandard 0.8.0
Commit 81e1f5bbf1fc54808649562d3ed829730765c540 from
https://github.com/indygreg/python-zstandard is imported without
modifications (other than removing unwanted files).
Updates relevant to Mercurial include:
* Support for multi-threaded compression (we can use this for
bundle and wire protocol compression).
* APIs for batch compression and decompression operations using
multiple threads and optimal memory allocation mechanism. (Can
be useful for revlog perf improvements.)
* A ``BufferWithSegments`` type that models a single memory buffer
containing N discrete items of known lengths. This type can be
used for very efficient 0-copy data operations.
# no-check-commit
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 01 Apr 2017 15:24:03 -0700 |
parents | c32454d69b85 |
children | b1fb341d8a61 |
line wrap: on
line diff
--- a/contrib/python-zstandard/tests/test_compressor.py Sat Apr 01 13:43:52 2017 -0700 +++ b/contrib/python-zstandard/tests/test_compressor.py Sat Apr 01 15:24:03 2017 -0700 @@ -22,6 +22,12 @@ next = lambda it: it.next() +def multithreaded_chunk_size(level, source_size=0): + params = zstd.get_compression_parameters(level, source_size) + + return 1 << (params.window_log + 2) + + @make_cffi class TestCompressor(unittest.TestCase): def test_level_bounds(self): @@ -34,6 +40,24 @@ @make_cffi class TestCompressor_compress(unittest.TestCase): + def test_multithreaded_unsupported(self): + samples = [] + for i in range(128): + samples.append(b'foo' * 64) + samples.append(b'bar' * 64) + + d = zstd.train_dictionary(8192, samples) + + cctx = zstd.ZstdCompressor(dict_data=d, threads=2) + + with self.assertRaisesRegexp(zstd.ZstdError, 'compress\(\) cannot be used with both dictionaries and multi-threaded compression'): + cctx.compress(b'foo') + + params = zstd.get_compression_parameters(3) + cctx = zstd.ZstdCompressor(compression_params=params, threads=2) + with self.assertRaisesRegexp(zstd.ZstdError, 'compress\(\) cannot be used with both compression parameters and multi-threaded compression'): + cctx.compress(b'foo') + def test_compress_empty(self): cctx = zstd.ZstdCompressor(level=1) result = cctx.compress(b'') @@ -132,6 +156,21 @@ for i in range(32): cctx.compress(b'foo bar foobar foo bar foobar') + def test_multithreaded(self): + chunk_size = multithreaded_chunk_size(1) + source = b''.join([b'x' * chunk_size, b'y' * chunk_size]) + + cctx = zstd.ZstdCompressor(level=1, threads=2) + compressed = cctx.compress(source) + + params = zstd.get_frame_parameters(compressed) + self.assertEqual(params.content_size, chunk_size * 2) + self.assertEqual(params.dict_id, 0) + self.assertFalse(params.has_checksum) + + dctx = zstd.ZstdDecompressor() + self.assertEqual(dctx.decompress(compressed), source) + @make_cffi class TestCompressor_compressobj(unittest.TestCase): @@ -237,6 +276,30 @@ header = trailing[0:3] self.assertEqual(header, b'\x01\x00\x00') + def test_multithreaded(self): + source = io.BytesIO() + source.write(b'a' * 1048576) + source.write(b'b' * 1048576) + source.write(b'c' * 1048576) + source.seek(0) + + cctx = zstd.ZstdCompressor(level=1, threads=2) + cobj = cctx.compressobj() + + chunks = [] + while True: + d = source.read(8192) + if not d: + break + + chunks.append(cobj.compress(d)) + + chunks.append(cobj.flush()) + + compressed = b''.join(chunks) + + self.assertEqual(len(compressed), 295) + @make_cffi class TestCompressor_copy_stream(unittest.TestCase): @@ -355,6 +418,36 @@ self.assertEqual(source._read_count, len(source.getvalue()) + 1) self.assertEqual(dest._write_count, len(dest.getvalue())) + def test_multithreaded(self): + source = io.BytesIO() + source.write(b'a' * 1048576) + source.write(b'b' * 1048576) + source.write(b'c' * 1048576) + source.seek(0) + + dest = io.BytesIO() + cctx = zstd.ZstdCompressor(threads=2) + r, w = cctx.copy_stream(source, dest) + self.assertEqual(r, 3145728) + self.assertEqual(w, 295) + + params = zstd.get_frame_parameters(dest.getvalue()) + self.assertEqual(params.content_size, 0) + self.assertEqual(params.dict_id, 0) + self.assertFalse(params.has_checksum) + + # Writing content size and checksum works. + cctx = zstd.ZstdCompressor(threads=2, write_content_size=True, + write_checksum=True) + dest = io.BytesIO() + source.seek(0) + cctx.copy_stream(source, dest, size=len(source.getvalue())) + + params = zstd.get_frame_parameters(dest.getvalue()) + self.assertEqual(params.content_size, 3145728) + self.assertEqual(params.dict_id, 0) + self.assertTrue(params.has_checksum) + def compress(data, level): buffer = io.BytesIO() @@ -584,6 +677,16 @@ header = trailing[0:3] self.assertEqual(header, b'\x01\x00\x00') + def test_multithreaded(self): + dest = io.BytesIO() + cctx = zstd.ZstdCompressor(threads=2) + with cctx.write_to(dest) as compressor: + compressor.write(b'a' * 1048576) + compressor.write(b'b' * 1048576) + compressor.write(b'c' * 1048576) + + self.assertEqual(len(dest.getvalue()), 295) + @make_cffi class TestCompressor_read_from(unittest.TestCase): @@ -673,3 +776,130 @@ self.assertEqual(len(chunk), 1) self.assertEqual(source._read_count, len(source.getvalue()) + 1) + + def test_multithreaded(self): + source = io.BytesIO() + source.write(b'a' * 1048576) + source.write(b'b' * 1048576) + source.write(b'c' * 1048576) + source.seek(0) + + cctx = zstd.ZstdCompressor(threads=2) + + compressed = b''.join(cctx.read_from(source)) + self.assertEqual(len(compressed), 295) + + +class TestCompressor_multi_compress_to_buffer(unittest.TestCase): + def test_multithreaded_unsupported(self): + cctx = zstd.ZstdCompressor(threads=2) + + with self.assertRaisesRegexp(zstd.ZstdError, 'function cannot be called on ZstdCompressor configured for multi-threaded compression'): + cctx.multi_compress_to_buffer([b'foo']) + + def test_invalid_inputs(self): + cctx = zstd.ZstdCompressor() + + with self.assertRaises(TypeError): + cctx.multi_compress_to_buffer(True) + + with self.assertRaises(TypeError): + cctx.multi_compress_to_buffer((1, 2)) + + with self.assertRaisesRegexp(TypeError, 'item 0 not a bytes like object'): + cctx.multi_compress_to_buffer([u'foo']) + + def test_empty_input(self): + cctx = zstd.ZstdCompressor() + + with self.assertRaisesRegexp(ValueError, 'no source elements found'): + cctx.multi_compress_to_buffer([]) + + with self.assertRaisesRegexp(ValueError, 'source elements are empty'): + cctx.multi_compress_to_buffer([b'', b'', b'']) + + def test_list_input(self): + cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True) + + original = [b'foo' * 12, b'bar' * 6] + frames = [cctx.compress(c) for c in original] + b = cctx.multi_compress_to_buffer(original) + + self.assertIsInstance(b, zstd.BufferWithSegmentsCollection) + + self.assertEqual(len(b), 2) + self.assertEqual(b.size(), 44) + + self.assertEqual(b[0].tobytes(), frames[0]) + self.assertEqual(b[1].tobytes(), frames[1]) + + def test_buffer_with_segments_input(self): + cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True) + + original = [b'foo' * 4, b'bar' * 6] + frames = [cctx.compress(c) for c in original] + + offsets = struct.pack('=QQQQ', 0, len(original[0]), + len(original[0]), len(original[1])) + segments = zstd.BufferWithSegments(b''.join(original), offsets) + + result = cctx.multi_compress_to_buffer(segments) + + self.assertEqual(len(result), 2) + self.assertEqual(result.size(), 47) + + self.assertEqual(result[0].tobytes(), frames[0]) + self.assertEqual(result[1].tobytes(), frames[1]) + + def test_buffer_with_segments_collection_input(self): + cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True) + + original = [ + b'foo1', + b'foo2' * 2, + b'foo3' * 3, + b'foo4' * 4, + b'foo5' * 5, + ] + + frames = [cctx.compress(c) for c in original] + + b = b''.join([original[0], original[1]]) + b1 = zstd.BufferWithSegments(b, struct.pack('=QQQQ', + 0, len(original[0]), + len(original[0]), len(original[1]))) + b = b''.join([original[2], original[3], original[4]]) + b2 = zstd.BufferWithSegments(b, struct.pack('=QQQQQQ', + 0, len(original[2]), + len(original[2]), len(original[3]), + len(original[2]) + len(original[3]), len(original[4]))) + + c = zstd.BufferWithSegmentsCollection(b1, b2) + + result = cctx.multi_compress_to_buffer(c) + + self.assertEqual(len(result), len(frames)) + + for i, frame in enumerate(frames): + self.assertEqual(result[i].tobytes(), frame) + + def test_multiple_threads(self): + # threads argument will cause multi-threaded ZSTD APIs to be used, which will + # make output different. + refcctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True) + reference = [refcctx.compress(b'x' * 64), refcctx.compress(b'y' * 64)] + + cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True) + + frames = [] + frames.extend(b'x' * 64 for i in range(256)) + frames.extend(b'y' * 64 for i in range(256)) + + result = cctx.multi_compress_to_buffer(frames, threads=-1) + + self.assertEqual(len(result), 512) + for i in range(512): + if i < 256: + self.assertEqual(result[i].tobytes(), reference[0]) + else: + self.assertEqual(result[i].tobytes(), reference[1])