contrib/python-zstandard/tests/test_compressor_fuzzing.py
changeset 40121 73fef626dae3
parent 37495 b1fb341d8a61
child 42070 675775c33ab6
--- a/contrib/python-zstandard/tests/test_compressor_fuzzing.py	Tue Sep 25 20:55:03 2018 +0900
+++ b/contrib/python-zstandard/tests/test_compressor_fuzzing.py	Mon Oct 08 16:27:40 2018 -0700
@@ -135,6 +135,51 @@
 
         self.assertEqual(b''.join(chunks), ref_frame)
 
+    @hypothesis.settings(
+        suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+                      level=strategies.integers(min_value=1, max_value=5),
+                      chunk_sizes=strategies.data(),
+                      flushes=strategies.data())
+    def test_flush_block(self, original, level, chunk_sizes, flushes):
+        cctx = zstd.ZstdCompressor(level=level)
+        cobj = cctx.compressobj()
+
+        dctx = zstd.ZstdDecompressor()
+        dobj = dctx.decompressobj()
+
+        compressed_chunks = []
+        decompressed_chunks = []
+        i = 0
+        while True:
+            input_size = chunk_sizes.draw(strategies.integers(1, 4096))
+            source = original[i:i + input_size]
+            if not source:
+                break
+
+            i += input_size
+
+            chunk = cobj.compress(source)
+            compressed_chunks.append(chunk)
+            decompressed_chunks.append(dobj.decompress(chunk))
+
+            if not flushes.draw(strategies.booleans()):
+                continue
+
+            chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK)
+            compressed_chunks.append(chunk)
+            decompressed_chunks.append(dobj.decompress(chunk))
+
+            self.assertEqual(b''.join(decompressed_chunks), original[0:i])
+
+        chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_FINISH)
+        compressed_chunks.append(chunk)
+        decompressed_chunks.append(dobj.decompress(chunk))
+
+        self.assertEqual(dctx.decompress(b''.join(compressed_chunks),
+                                         max_output_size=len(original)),
+                         original)
+        self.assertEqual(b''.join(decompressed_chunks), original)
 
 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
 @make_cffi
@@ -186,3 +231,90 @@
 
         for i, frame in enumerate(result):
             self.assertEqual(dctx.decompress(frame), original[i])
+
+
+@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@make_cffi
+class TestCompressor_chunker_fuzzing(unittest.TestCase):
+    @hypothesis.settings(
+        suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+                      level=strategies.integers(min_value=1, max_value=5),
+                      chunk_size=strategies.integers(
+                          min_value=1,
+                          max_value=32 * 1048576),
+                      input_sizes=strategies.data())
+    def test_random_input_sizes(self, original, level, chunk_size, input_sizes):
+        cctx = zstd.ZstdCompressor(level=level)
+        chunker = cctx.chunker(chunk_size=chunk_size)
+
+        chunks = []
+        i = 0
+        while True:
+            input_size = input_sizes.draw(strategies.integers(1, 4096))
+            source = original[i:i + input_size]
+            if not source:
+                break
+
+            chunks.extend(chunker.compress(source))
+            i += input_size
+
+        chunks.extend(chunker.finish())
+
+        dctx = zstd.ZstdDecompressor()
+
+        self.assertEqual(dctx.decompress(b''.join(chunks),
+                                         max_output_size=len(original)),
+                         original)
+
+        self.assertTrue(all(len(chunk) == chunk_size for chunk in chunks[:-1]))
+
+    @hypothesis.settings(
+        suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+    @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+                      level=strategies.integers(min_value=1, max_value=5),
+                      chunk_size=strategies.integers(
+                          min_value=1,
+                          max_value=32 * 1048576),
+                      input_sizes=strategies.data(),
+                      flushes=strategies.data())
+    def test_flush_block(self, original, level, chunk_size, input_sizes,
+                         flushes):
+        cctx = zstd.ZstdCompressor(level=level)
+        chunker = cctx.chunker(chunk_size=chunk_size)
+
+        dctx = zstd.ZstdDecompressor()
+        dobj = dctx.decompressobj()
+
+        compressed_chunks = []
+        decompressed_chunks = []
+        i = 0
+        while True:
+            input_size = input_sizes.draw(strategies.integers(1, 4096))
+            source = original[i:i + input_size]
+            if not source:
+                break
+
+            i += input_size
+
+            chunks = list(chunker.compress(source))
+            compressed_chunks.extend(chunks)
+            decompressed_chunks.append(dobj.decompress(b''.join(chunks)))
+
+            if not flushes.draw(strategies.booleans()):
+                continue
+
+            chunks = list(chunker.flush())
+            compressed_chunks.extend(chunks)
+            decompressed_chunks.append(dobj.decompress(b''.join(chunks)))
+
+            self.assertEqual(b''.join(decompressed_chunks), original[0:i])
+
+        chunks = list(chunker.finish())
+        compressed_chunks.extend(chunks)
+        decompressed_chunks.append(dobj.decompress(b''.join(chunks)))
+
+        self.assertEqual(dctx.decompress(b''.join(compressed_chunks),
+                                         max_output_size=len(original)),
+                         original)
+        self.assertEqual(b''.join(decompressed_chunks), original)
\ No newline at end of file