diff -r 89742f1fa6cb -r 73fef626dae3 contrib/python-zstandard/tests/test_compressor_fuzzing.py --- a/contrib/python-zstandard/tests/test_compressor_fuzzing.py Tue Sep 25 20:55:03 2018 +0900 +++ b/contrib/python-zstandard/tests/test_compressor_fuzzing.py Mon Oct 08 16:27:40 2018 -0700 @@ -135,6 +135,51 @@ self.assertEqual(b''.join(chunks), ref_frame) + @hypothesis.settings( + suppress_health_check=[hypothesis.HealthCheck.large_base_example]) + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + chunk_sizes=strategies.data(), + flushes=strategies.data()) + def test_flush_block(self, original, level, chunk_sizes, flushes): + cctx = zstd.ZstdCompressor(level=level) + cobj = cctx.compressobj() + + dctx = zstd.ZstdDecompressor() + dobj = dctx.decompressobj() + + compressed_chunks = [] + decompressed_chunks = [] + i = 0 + while True: + input_size = chunk_sizes.draw(strategies.integers(1, 4096)) + source = original[i:i + input_size] + if not source: + break + + i += input_size + + chunk = cobj.compress(source) + compressed_chunks.append(chunk) + decompressed_chunks.append(dobj.decompress(chunk)) + + if not flushes.draw(strategies.booleans()): + continue + + chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK) + compressed_chunks.append(chunk) + decompressed_chunks.append(dobj.decompress(chunk)) + + self.assertEqual(b''.join(decompressed_chunks), original[0:i]) + + chunk = cobj.flush(zstd.COMPRESSOBJ_FLUSH_FINISH) + compressed_chunks.append(chunk) + decompressed_chunks.append(dobj.decompress(chunk)) + + self.assertEqual(dctx.decompress(b''.join(compressed_chunks), + max_output_size=len(original)), + original) + self.assertEqual(b''.join(decompressed_chunks), original) @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') @make_cffi @@ -186,3 +231,90 @@ for i, frame in enumerate(result): self.assertEqual(dctx.decompress(frame), original[i]) + + +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@make_cffi +class TestCompressor_chunker_fuzzing(unittest.TestCase): + @hypothesis.settings( + suppress_health_check=[hypothesis.HealthCheck.large_base_example]) + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + chunk_size=strategies.integers( + min_value=1, + max_value=32 * 1048576), + input_sizes=strategies.data()) + def test_random_input_sizes(self, original, level, chunk_size, input_sizes): + cctx = zstd.ZstdCompressor(level=level) + chunker = cctx.chunker(chunk_size=chunk_size) + + chunks = [] + i = 0 + while True: + input_size = input_sizes.draw(strategies.integers(1, 4096)) + source = original[i:i + input_size] + if not source: + break + + chunks.extend(chunker.compress(source)) + i += input_size + + chunks.extend(chunker.finish()) + + dctx = zstd.ZstdDecompressor() + + self.assertEqual(dctx.decompress(b''.join(chunks), + max_output_size=len(original)), + original) + + self.assertTrue(all(len(chunk) == chunk_size for chunk in chunks[:-1])) + + @hypothesis.settings( + suppress_health_check=[hypothesis.HealthCheck.large_base_example]) + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + chunk_size=strategies.integers( + min_value=1, + max_value=32 * 1048576), + input_sizes=strategies.data(), + flushes=strategies.data()) + def test_flush_block(self, original, level, chunk_size, input_sizes, + flushes): + cctx = zstd.ZstdCompressor(level=level) + chunker = cctx.chunker(chunk_size=chunk_size) + + dctx = zstd.ZstdDecompressor() + dobj = dctx.decompressobj() + + compressed_chunks = [] + decompressed_chunks = [] + i = 0 + while True: + input_size = input_sizes.draw(strategies.integers(1, 4096)) + source = original[i:i + input_size] + if not source: + break + + i += input_size + + chunks = list(chunker.compress(source)) + compressed_chunks.extend(chunks) + decompressed_chunks.append(dobj.decompress(b''.join(chunks))) + + if not flushes.draw(strategies.booleans()): + continue + + chunks = list(chunker.flush()) + compressed_chunks.extend(chunks) + decompressed_chunks.append(dobj.decompress(b''.join(chunks))) + + self.assertEqual(b''.join(decompressed_chunks), original[0:i]) + + chunks = list(chunker.finish()) + compressed_chunks.extend(chunks) + decompressed_chunks.append(dobj.decompress(b''.join(chunks))) + + self.assertEqual(dctx.decompress(b''.join(compressed_chunks), + max_output_size=len(original)), + original) + self.assertEqual(b''.join(decompressed_chunks), original) \ No newline at end of file