diff contrib/python-zstandard/tests/test_compressor.py @ 31796:e0dc40530c5a

zstd: vendor python-zstandard 0.8.0 Commit 81e1f5bbf1fc54808649562d3ed829730765c540 from https://github.com/indygreg/python-zstandard is imported without modifications (other than removing unwanted files). Updates relevant to Mercurial include: * Support for multi-threaded compression (we can use this for bundle and wire protocol compression). * APIs for batch compression and decompression operations using multiple threads and optimal memory allocation mechanism. (Can be useful for revlog perf improvements.) * A ``BufferWithSegments`` type that models a single memory buffer containing N discrete items of known lengths. This type can be used for very efficient 0-copy data operations. # no-check-commit
author Gregory Szorc <gregory.szorc@gmail.com>
date Sat, 01 Apr 2017 15:24:03 -0700
parents c32454d69b85
children b1fb341d8a61
line wrap: on
line diff
--- a/contrib/python-zstandard/tests/test_compressor.py	Sat Apr 01 13:43:52 2017 -0700
+++ b/contrib/python-zstandard/tests/test_compressor.py	Sat Apr 01 15:24:03 2017 -0700
@@ -22,6 +22,12 @@
     next = lambda it: it.next()
 
 
+def multithreaded_chunk_size(level, source_size=0):
+    params = zstd.get_compression_parameters(level, source_size)
+
+    return 1 << (params.window_log + 2)
+
+
 @make_cffi
 class TestCompressor(unittest.TestCase):
     def test_level_bounds(self):
@@ -34,6 +40,24 @@
 
 @make_cffi
 class TestCompressor_compress(unittest.TestCase):
+    def test_multithreaded_unsupported(self):
+        samples = []
+        for i in range(128):
+            samples.append(b'foo' * 64)
+            samples.append(b'bar' * 64)
+
+        d = zstd.train_dictionary(8192, samples)
+
+        cctx = zstd.ZstdCompressor(dict_data=d, threads=2)
+
+        with self.assertRaisesRegexp(zstd.ZstdError, 'compress\(\) cannot be used with both dictionaries and multi-threaded compression'):
+            cctx.compress(b'foo')
+
+        params = zstd.get_compression_parameters(3)
+        cctx = zstd.ZstdCompressor(compression_params=params, threads=2)
+        with self.assertRaisesRegexp(zstd.ZstdError, 'compress\(\) cannot be used with both compression parameters and multi-threaded compression'):
+            cctx.compress(b'foo')
+
     def test_compress_empty(self):
         cctx = zstd.ZstdCompressor(level=1)
         result = cctx.compress(b'')
@@ -132,6 +156,21 @@
         for i in range(32):
             cctx.compress(b'foo bar foobar foo bar foobar')
 
+    def test_multithreaded(self):
+        chunk_size = multithreaded_chunk_size(1)
+        source = b''.join([b'x' * chunk_size, b'y' * chunk_size])
+
+        cctx = zstd.ZstdCompressor(level=1, threads=2)
+        compressed = cctx.compress(source)
+
+        params = zstd.get_frame_parameters(compressed)
+        self.assertEqual(params.content_size, chunk_size * 2)
+        self.assertEqual(params.dict_id, 0)
+        self.assertFalse(params.has_checksum)
+
+        dctx = zstd.ZstdDecompressor()
+        self.assertEqual(dctx.decompress(compressed), source)
+
 
 @make_cffi
 class TestCompressor_compressobj(unittest.TestCase):
@@ -237,6 +276,30 @@
         header = trailing[0:3]
         self.assertEqual(header, b'\x01\x00\x00')
 
+    def test_multithreaded(self):
+        source = io.BytesIO()
+        source.write(b'a' * 1048576)
+        source.write(b'b' * 1048576)
+        source.write(b'c' * 1048576)
+        source.seek(0)
+
+        cctx = zstd.ZstdCompressor(level=1, threads=2)
+        cobj = cctx.compressobj()
+
+        chunks = []
+        while True:
+            d = source.read(8192)
+            if not d:
+                break
+
+            chunks.append(cobj.compress(d))
+
+        chunks.append(cobj.flush())
+
+        compressed = b''.join(chunks)
+
+        self.assertEqual(len(compressed), 295)
+
 
 @make_cffi
 class TestCompressor_copy_stream(unittest.TestCase):
@@ -355,6 +418,36 @@
         self.assertEqual(source._read_count, len(source.getvalue()) + 1)
         self.assertEqual(dest._write_count, len(dest.getvalue()))
 
+    def test_multithreaded(self):
+        source = io.BytesIO()
+        source.write(b'a' * 1048576)
+        source.write(b'b' * 1048576)
+        source.write(b'c' * 1048576)
+        source.seek(0)
+
+        dest = io.BytesIO()
+        cctx = zstd.ZstdCompressor(threads=2)
+        r, w = cctx.copy_stream(source, dest)
+        self.assertEqual(r, 3145728)
+        self.assertEqual(w, 295)
+
+        params = zstd.get_frame_parameters(dest.getvalue())
+        self.assertEqual(params.content_size, 0)
+        self.assertEqual(params.dict_id, 0)
+        self.assertFalse(params.has_checksum)
+
+        # Writing content size and checksum works.
+        cctx = zstd.ZstdCompressor(threads=2, write_content_size=True,
+                                   write_checksum=True)
+        dest = io.BytesIO()
+        source.seek(0)
+        cctx.copy_stream(source, dest, size=len(source.getvalue()))
+
+        params = zstd.get_frame_parameters(dest.getvalue())
+        self.assertEqual(params.content_size, 3145728)
+        self.assertEqual(params.dict_id, 0)
+        self.assertTrue(params.has_checksum)
+
 
 def compress(data, level):
     buffer = io.BytesIO()
@@ -584,6 +677,16 @@
         header = trailing[0:3]
         self.assertEqual(header, b'\x01\x00\x00')
 
+    def test_multithreaded(self):
+        dest = io.BytesIO()
+        cctx = zstd.ZstdCompressor(threads=2)
+        with cctx.write_to(dest) as compressor:
+            compressor.write(b'a' * 1048576)
+            compressor.write(b'b' * 1048576)
+            compressor.write(b'c' * 1048576)
+
+        self.assertEqual(len(dest.getvalue()), 295)
+
 
 @make_cffi
 class TestCompressor_read_from(unittest.TestCase):
@@ -673,3 +776,130 @@
             self.assertEqual(len(chunk), 1)
 
         self.assertEqual(source._read_count, len(source.getvalue()) + 1)
+
+    def test_multithreaded(self):
+        source = io.BytesIO()
+        source.write(b'a' * 1048576)
+        source.write(b'b' * 1048576)
+        source.write(b'c' * 1048576)
+        source.seek(0)
+
+        cctx = zstd.ZstdCompressor(threads=2)
+
+        compressed = b''.join(cctx.read_from(source))
+        self.assertEqual(len(compressed), 295)
+
+
+class TestCompressor_multi_compress_to_buffer(unittest.TestCase):
+    def test_multithreaded_unsupported(self):
+        cctx = zstd.ZstdCompressor(threads=2)
+
+        with self.assertRaisesRegexp(zstd.ZstdError, 'function cannot be called on ZstdCompressor configured for multi-threaded compression'):
+            cctx.multi_compress_to_buffer([b'foo'])
+
+    def test_invalid_inputs(self):
+        cctx = zstd.ZstdCompressor()
+
+        with self.assertRaises(TypeError):
+            cctx.multi_compress_to_buffer(True)
+
+        with self.assertRaises(TypeError):
+            cctx.multi_compress_to_buffer((1, 2))
+
+        with self.assertRaisesRegexp(TypeError, 'item 0 not a bytes like object'):
+            cctx.multi_compress_to_buffer([u'foo'])
+
+    def test_empty_input(self):
+        cctx = zstd.ZstdCompressor()
+
+        with self.assertRaisesRegexp(ValueError, 'no source elements found'):
+            cctx.multi_compress_to_buffer([])
+
+        with self.assertRaisesRegexp(ValueError, 'source elements are empty'):
+            cctx.multi_compress_to_buffer([b'', b'', b''])
+
+    def test_list_input(self):
+        cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
+
+        original = [b'foo' * 12, b'bar' * 6]
+        frames = [cctx.compress(c) for c in original]
+        b = cctx.multi_compress_to_buffer(original)
+
+        self.assertIsInstance(b, zstd.BufferWithSegmentsCollection)
+
+        self.assertEqual(len(b), 2)
+        self.assertEqual(b.size(), 44)
+
+        self.assertEqual(b[0].tobytes(), frames[0])
+        self.assertEqual(b[1].tobytes(), frames[1])
+
+    def test_buffer_with_segments_input(self):
+        cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
+
+        original = [b'foo' * 4, b'bar' * 6]
+        frames = [cctx.compress(c) for c in original]
+
+        offsets = struct.pack('=QQQQ', 0, len(original[0]),
+                                       len(original[0]), len(original[1]))
+        segments = zstd.BufferWithSegments(b''.join(original), offsets)
+
+        result = cctx.multi_compress_to_buffer(segments)
+
+        self.assertEqual(len(result), 2)
+        self.assertEqual(result.size(), 47)
+
+        self.assertEqual(result[0].tobytes(), frames[0])
+        self.assertEqual(result[1].tobytes(), frames[1])
+
+    def test_buffer_with_segments_collection_input(self):
+        cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
+
+        original = [
+            b'foo1',
+            b'foo2' * 2,
+            b'foo3' * 3,
+            b'foo4' * 4,
+            b'foo5' * 5,
+        ]
+
+        frames = [cctx.compress(c) for c in original]
+
+        b = b''.join([original[0], original[1]])
+        b1 = zstd.BufferWithSegments(b, struct.pack('=QQQQ',
+                                                    0, len(original[0]),
+                                                    len(original[0]), len(original[1])))
+        b = b''.join([original[2], original[3], original[4]])
+        b2 = zstd.BufferWithSegments(b, struct.pack('=QQQQQQ',
+                                                    0, len(original[2]),
+                                                    len(original[2]), len(original[3]),
+                                                    len(original[2]) + len(original[3]), len(original[4])))
+
+        c = zstd.BufferWithSegmentsCollection(b1, b2)
+
+        result = cctx.multi_compress_to_buffer(c)
+
+        self.assertEqual(len(result), len(frames))
+
+        for i, frame in enumerate(frames):
+            self.assertEqual(result[i].tobytes(), frame)
+
+    def test_multiple_threads(self):
+        # threads argument will cause multi-threaded ZSTD APIs to be used, which will
+        # make output different.
+        refcctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
+        reference = [refcctx.compress(b'x' * 64), refcctx.compress(b'y' * 64)]
+
+        cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
+
+        frames = []
+        frames.extend(b'x' * 64 for i in range(256))
+        frames.extend(b'y' * 64 for i in range(256))
+
+        result = cctx.multi_compress_to_buffer(frames, threads=-1)
+
+        self.assertEqual(len(result), 512)
+        for i in range(512):
+            if i < 256:
+                self.assertEqual(result[i].tobytes(), reference[0])
+            else:
+                self.assertEqual(result[i].tobytes(), reference[1])