diff contrib/python-zstandard/tests/test_decompressor_fuzzing.py @ 31796:e0dc40530c5a

zstd: vendor python-zstandard 0.8.0 Commit 81e1f5bbf1fc54808649562d3ed829730765c540 from https://github.com/indygreg/python-zstandard is imported without modifications (other than removing unwanted files). Updates relevant to Mercurial include: * Support for multi-threaded compression (we can use this for bundle and wire protocol compression). * APIs for batch compression and decompression operations using multiple threads and optimal memory allocation mechanism. (Can be useful for revlog perf improvements.) * A ``BufferWithSegments`` type that models a single memory buffer containing N discrete items of known lengths. This type can be used for very efficient 0-copy data operations. # no-check-commit
author Gregory Szorc <gregory.szorc@gmail.com>
date Sat, 01 Apr 2017 15:24:03 -0700
parents
children b1fb341d8a61
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/contrib/python-zstandard/tests/test_decompressor_fuzzing.py	Sat Apr 01 15:24:03 2017 -0700
@@ -0,0 +1,151 @@
+import io
+import os
+
+try:
+    import unittest2 as unittest
+except ImportError:
+    import unittest
+
+try:
+    import hypothesis
+    import hypothesis.strategies as strategies
+except ImportError:
+    raise unittest.SkipTest('hypothesis not available')
+
+import zstd
+
+from . common import (
+    make_cffi,
+    random_input_data,
+)
+
+
+@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@make_cffi
+class TestDecompressor_write_to_fuzzing(unittest.TestCase):
+    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+                      level=strategies.integers(min_value=1, max_value=5),
+                      write_size=strategies.integers(min_value=1, max_value=8192),
+                      input_sizes=strategies.streaming(
+                          strategies.integers(min_value=1, max_value=4096)))
+    def test_write_size_variance(self, original, level, write_size, input_sizes):
+        input_sizes = iter(input_sizes)
+
+        cctx = zstd.ZstdCompressor(level=level)
+        frame = cctx.compress(original)
+
+        dctx = zstd.ZstdDecompressor()
+        source = io.BytesIO(frame)
+        dest = io.BytesIO()
+
+        with dctx.write_to(dest, write_size=write_size) as decompressor:
+            while True:
+                chunk = source.read(next(input_sizes))
+                if not chunk:
+                    break
+
+                decompressor.write(chunk)
+
+        self.assertEqual(dest.getvalue(), original)
+
+
+@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@make_cffi
+class TestDecompressor_copy_stream_fuzzing(unittest.TestCase):
+    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+                      level=strategies.integers(min_value=1, max_value=5),
+                      read_size=strategies.integers(min_value=1, max_value=8192),
+                      write_size=strategies.integers(min_value=1, max_value=8192))
+    def test_read_write_size_variance(self, original, level, read_size, write_size):
+        cctx = zstd.ZstdCompressor(level=level)
+        frame = cctx.compress(original)
+
+        source = io.BytesIO(frame)
+        dest = io.BytesIO()
+
+        dctx = zstd.ZstdDecompressor()
+        dctx.copy_stream(source, dest, read_size=read_size, write_size=write_size)
+
+        self.assertEqual(dest.getvalue(), original)
+
+
+@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@make_cffi
+class TestDecompressor_decompressobj_fuzzing(unittest.TestCase):
+    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+                      level=strategies.integers(min_value=1, max_value=5),
+                      chunk_sizes=strategies.streaming(
+                          strategies.integers(min_value=1, max_value=4096)))
+    def test_random_input_sizes(self, original, level, chunk_sizes):
+        chunk_sizes = iter(chunk_sizes)
+
+        cctx = zstd.ZstdCompressor(level=level)
+        frame = cctx.compress(original)
+
+        source = io.BytesIO(frame)
+
+        dctx = zstd.ZstdDecompressor()
+        dobj = dctx.decompressobj()
+
+        chunks = []
+        while True:
+            chunk = source.read(next(chunk_sizes))
+            if not chunk:
+                break
+
+            chunks.append(dobj.decompress(chunk))
+
+        self.assertEqual(b''.join(chunks), original)
+
+
+@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@make_cffi
+class TestDecompressor_read_from_fuzzing(unittest.TestCase):
+    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+                      level=strategies.integers(min_value=1, max_value=5),
+                      read_size=strategies.integers(min_value=1, max_value=4096),
+                      write_size=strategies.integers(min_value=1, max_value=4096))
+    def test_read_write_size_variance(self, original, level, read_size, write_size):
+        cctx = zstd.ZstdCompressor(level=level)
+        frame = cctx.compress(original)
+
+        source = io.BytesIO(frame)
+
+        dctx = zstd.ZstdDecompressor()
+        chunks = list(dctx.read_from(source, read_size=read_size, write_size=write_size))
+
+        self.assertEqual(b''.join(chunks), original)
+
+
+@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+class TestDecompressor_multi_decompress_to_buffer_fuzzing(unittest.TestCase):
+    @hypothesis.given(original=strategies.lists(strategies.sampled_from(random_input_data()),
+                                        min_size=1, max_size=1024),
+                threads=strategies.integers(min_value=1, max_value=8),
+                use_dict=strategies.booleans())
+    def test_data_equivalence(self, original, threads, use_dict):
+        kwargs = {}
+        if use_dict:
+            kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])
+
+        cctx = zstd.ZstdCompressor(level=1,
+                                   write_content_size=True,
+                                   write_checksum=True,
+                                   **kwargs)
+
+        frames_buffer = cctx.multi_compress_to_buffer(original, threads=-1)
+
+        dctx = zstd.ZstdDecompressor(**kwargs)
+
+        result = dctx.multi_decompress_to_buffer(frames_buffer)
+
+        self.assertEqual(len(result), len(original))
+        for i, frame in enumerate(result):
+            self.assertEqual(frame.tobytes(), original[i])
+
+        frames_list = [f.tobytes() for f in frames_buffer]
+        result = dctx.multi_decompress_to_buffer(frames_list)
+
+        self.assertEqual(len(result), len(original))
+        for i, frame in enumerate(result):
+            self.assertEqual(frame.tobytes(), original[i])