--- a/contrib/python-zstandard/tests/test_compressor_fuzzing.py Tue Sep 25 20:55:03 2018 +0900
+++ b/contrib/python-zstandard/tests/test_compressor_fuzzing.py Mon Oct 08 16:27:40 2018 -0700
@@ -135,6 +135,51 @@
self.assertEqual(b''.join(chunks), ref_frame)
+ @hypothesis.settings(
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+ @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ chunk_sizes=strategies.data(),
+ flushes=strategies.data())
+ def test_flush_block(self, original, level, chunk_sizes, flushes):
+ cctx = zstd.ZstdCompressor(level=level)
+ cobj = cctx.compressobj()
+
+ dctx = zstd.ZstdDecompressor()
+ dobj = dctx.decompressobj()
+
+ compressed_chunks = []
+ decompressed_chunks = []
+ i = 0
+ while True:
+ input_size = chunk_sizes.draw(strategies.integers(1, 4096))
+ source = original[i:i + input_size]
+ if not source:
+ break
+
+ i += input_size
+
+ chunk = cobj.compress(source)
+ compressed_chunks.append(chunk)
+ decompressed_chunks.append(dobj.decompress(chunk))
+
+ if not flushes.draw(strategies.booleans()):
+ continue
+
+ chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK)
+ compressed_chunks.append(chunk)
+ decompressed_chunks.append(dobj.decompress(chunk))
+
+ self.assertEqual(b''.join(decompressed_chunks), original[0:i])
+
+ chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_FINISH)
+ compressed_chunks.append(chunk)
+ decompressed_chunks.append(dobj.decompress(chunk))
+
+ self.assertEqual(dctx.decompress(b''.join(compressed_chunks),
+ max_output_size=len(original)),
+ original)
+ self.assertEqual(b''.join(decompressed_chunks), original)
@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
@@ -186,3 +231,90 @@
for i, frame in enumerate(result):
self.assertEqual(dctx.decompress(frame), original[i])
+
+
+@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
+@make_cffi
+class TestCompressor_chunker_fuzzing(unittest.TestCase):
+ @hypothesis.settings(
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+ @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ chunk_size=strategies.integers(
+ min_value=1,
+ max_value=32 * 1048576),
+ input_sizes=strategies.data())
+ def test_random_input_sizes(self, original, level, chunk_size, input_sizes):
+ cctx = zstd.ZstdCompressor(level=level)
+ chunker = cctx.chunker(chunk_size=chunk_size)
+
+ chunks = []
+ i = 0
+ while True:
+ input_size = input_sizes.draw(strategies.integers(1, 4096))
+ source = original[i:i + input_size]
+ if not source:
+ break
+
+ chunks.extend(chunker.compress(source))
+ i += input_size
+
+ chunks.extend(chunker.finish())
+
+ dctx = zstd.ZstdDecompressor()
+
+ self.assertEqual(dctx.decompress(b''.join(chunks),
+ max_output_size=len(original)),
+ original)
+
+ self.assertTrue(all(len(chunk) == chunk_size for chunk in chunks[:-1]))
+
+ @hypothesis.settings(
+ suppress_health_check=[hypothesis.HealthCheck.large_base_example])
+ @hypothesis.given(original=strategies.sampled_from(random_input_data()),
+ level=strategies.integers(min_value=1, max_value=5),
+ chunk_size=strategies.integers(
+ min_value=1,
+ max_value=32 * 1048576),
+ input_sizes=strategies.data(),
+ flushes=strategies.data())
+ def test_flush_block(self, original, level, chunk_size, input_sizes,
+ flushes):
+ cctx = zstd.ZstdCompressor(level=level)
+ chunker = cctx.chunker(chunk_size=chunk_size)
+
+ dctx = zstd.ZstdDecompressor()
+ dobj = dctx.decompressobj()
+
+ compressed_chunks = []
+ decompressed_chunks = []
+ i = 0
+ while True:
+ input_size = input_sizes.draw(strategies.integers(1, 4096))
+ source = original[i:i + input_size]
+ if not source:
+ break
+
+ i += input_size
+
+ chunks = list(chunker.compress(source))
+ compressed_chunks.extend(chunks)
+ decompressed_chunks.append(dobj.decompress(b''.join(chunks)))
+
+ if not flushes.draw(strategies.booleans()):
+ continue
+
+ chunks = list(chunker.flush())
+ compressed_chunks.extend(chunks)
+ decompressed_chunks.append(dobj.decompress(b''.join(chunks)))
+
+ self.assertEqual(b''.join(decompressed_chunks), original[0:i])
+
+ chunks = list(chunker.finish())
+ compressed_chunks.extend(chunks)
+ decompressed_chunks.append(dobj.decompress(b''.join(chunks)))
+
+ self.assertEqual(dctx.decompress(b''.join(compressed_chunks),
+ max_output_size=len(original)),
+ original)
+ self.assertEqual(b''.join(decompressed_chunks), original)
\ No newline at end of file