Mercurial > hg
diff contrib/python-zstandard/tests/test_decompressor_fuzzing.py @ 37495:b1fb341d8a61
zstandard: vendor python-zstandard 0.9.0
This was just released. It features a number of goodies. More info at
https://gregoryszorc.com/blog/2018/04/09/release-of-python-zstandard-0.9/.
The clang-format ignore list was updated to reflect the new source
of files.
The project contains a vendored copy of zstandard 1.3.4. The old
version was 1.1.3. One of the changes between those versions is that
zstandard is now dual licensed BSD + GPLv2 and the patent rights grant
has been removed. Good riddance.
The API should be backwards compatible. So no changes in core
should be needed. However, there were a number of changes in the
library that we'll want to adapt to. Those will be addressed in
subsequent commits.
Differential Revision: https://phab.mercurial-scm.org/D3198
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Mon, 09 Apr 2018 10:13:29 -0700 |
parents | e0dc40530c5a |
children | 675775c33ab6 |
line wrap: on
line diff
--- a/contrib/python-zstandard/tests/test_decompressor_fuzzing.py Sun Apr 08 01:08:43 2018 +0200 +++ b/contrib/python-zstandard/tests/test_decompressor_fuzzing.py Mon Apr 09 10:13:29 2018 -0700 @@ -1,10 +1,6 @@ import io import os - -try: - import unittest2 as unittest -except ImportError: - import unittest +import unittest try: import hypothesis @@ -12,7 +8,7 @@ except ImportError: raise unittest.SkipTest('hypothesis not available') -import zstd +import zstandard as zstd from . common import ( make_cffi, @@ -22,15 +18,96 @@ @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') @make_cffi -class TestDecompressor_write_to_fuzzing(unittest.TestCase): +class TestDecompressor_stream_reader_fuzzing(unittest.TestCase): + @hypothesis.settings( + suppress_health_check=[hypothesis.HealthCheck.large_base_example]) + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_sizes=strategies.data()) + def test_stream_source_read_variance(self, original, level, source_read_size, + read_sizes): + cctx = zstd.ZstdCompressor(level=level) + frame = cctx.compress(original) + + dctx = zstd.ZstdDecompressor() + source = io.BytesIO(frame) + + chunks = [] + with dctx.stream_reader(source, read_size=source_read_size) as reader: + while True: + read_size = read_sizes.draw(strategies.integers(1, 16384)) + chunk = reader.read(read_size) + if not chunk: + break + + chunks.append(chunk) + + self.assertEqual(b''.join(chunks), original) + + @hypothesis.settings( + suppress_health_check=[hypothesis.HealthCheck.large_base_example]) + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + read_sizes=strategies.data()) + def test_buffer_source_read_variance(self, original, level, source_read_size, + read_sizes): + cctx = zstd.ZstdCompressor(level=level) + frame = cctx.compress(original) + + dctx = zstd.ZstdDecompressor() + chunks = [] + + with dctx.stream_reader(frame, read_size=source_read_size) as reader: + while True: + read_size = read_sizes.draw(strategies.integers(1, 16384)) + chunk = reader.read(read_size) + if not chunk: + break + + chunks.append(chunk) + + self.assertEqual(b''.join(chunks), original) + + @hypothesis.settings( + suppress_health_check=[hypothesis.HealthCheck.large_base_example]) + @hypothesis.given( + original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + source_read_size=strategies.integers(1, 16384), + seek_amounts=strategies.data(), + read_sizes=strategies.data()) + def test_relative_seeks(self, original, level, source_read_size, seek_amounts, + read_sizes): + cctx = zstd.ZstdCompressor(level=level) + frame = cctx.compress(original) + + dctx = zstd.ZstdDecompressor() + + with dctx.stream_reader(frame, read_size=source_read_size) as reader: + while True: + amount = seek_amounts.draw(strategies.integers(0, 16384)) + reader.seek(amount, os.SEEK_CUR) + + offset = reader.tell() + read_amount = read_sizes.draw(strategies.integers(1, 16384)) + chunk = reader.read(read_amount) + + if not chunk: + break + + self.assertEqual(original[offset:offset + len(chunk)], chunk) + + +@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') +@make_cffi +class TestDecompressor_stream_writer_fuzzing(unittest.TestCase): @hypothesis.given(original=strategies.sampled_from(random_input_data()), level=strategies.integers(min_value=1, max_value=5), write_size=strategies.integers(min_value=1, max_value=8192), - input_sizes=strategies.streaming( - strategies.integers(min_value=1, max_value=4096))) + input_sizes=strategies.data()) def test_write_size_variance(self, original, level, write_size, input_sizes): - input_sizes = iter(input_sizes) - cctx = zstd.ZstdCompressor(level=level) frame = cctx.compress(original) @@ -38,9 +115,10 @@ source = io.BytesIO(frame) dest = io.BytesIO() - with dctx.write_to(dest, write_size=write_size) as decompressor: + with dctx.stream_writer(dest, write_size=write_size) as decompressor: while True: - chunk = source.read(next(input_sizes)) + input_size = input_sizes.draw(strategies.integers(1, 4096)) + chunk = source.read(input_size) if not chunk: break @@ -74,11 +152,8 @@ class TestDecompressor_decompressobj_fuzzing(unittest.TestCase): @hypothesis.given(original=strategies.sampled_from(random_input_data()), level=strategies.integers(min_value=1, max_value=5), - chunk_sizes=strategies.streaming( - strategies.integers(min_value=1, max_value=4096))) + chunk_sizes=strategies.data()) def test_random_input_sizes(self, original, level, chunk_sizes): - chunk_sizes = iter(chunk_sizes) - cctx = zstd.ZstdCompressor(level=level) frame = cctx.compress(original) @@ -89,7 +164,33 @@ chunks = [] while True: - chunk = source.read(next(chunk_sizes)) + chunk_size = chunk_sizes.draw(strategies.integers(1, 4096)) + chunk = source.read(chunk_size) + if not chunk: + break + + chunks.append(dobj.decompress(chunk)) + + self.assertEqual(b''.join(chunks), original) + + @hypothesis.given(original=strategies.sampled_from(random_input_data()), + level=strategies.integers(min_value=1, max_value=5), + write_size=strategies.integers(min_value=1, + max_value=4 * zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE), + chunk_sizes=strategies.data()) + def test_random_output_sizes(self, original, level, write_size, chunk_sizes): + cctx = zstd.ZstdCompressor(level=level) + frame = cctx.compress(original) + + source = io.BytesIO(frame) + + dctx = zstd.ZstdDecompressor() + dobj = dctx.decompressobj(write_size=write_size) + + chunks = [] + while True: + chunk_size = chunk_sizes.draw(strategies.integers(1, 4096)) + chunk = source.read(chunk_size) if not chunk: break @@ -100,7 +201,7 @@ @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') @make_cffi -class TestDecompressor_read_from_fuzzing(unittest.TestCase): +class TestDecompressor_read_to_iter_fuzzing(unittest.TestCase): @hypothesis.given(original=strategies.sampled_from(random_input_data()), level=strategies.integers(min_value=1, max_value=5), read_size=strategies.integers(min_value=1, max_value=4096), @@ -112,7 +213,7 @@ source = io.BytesIO(frame) dctx = zstd.ZstdDecompressor() - chunks = list(dctx.read_from(source, read_size=read_size, write_size=write_size)) + chunks = list(dctx.read_to_iter(source, read_size=read_size, write_size=write_size)) self.assertEqual(b''.join(chunks), original)