view contrib/python-zstandard/tests/test_compressor_fuzzing.py @ 40121:73fef626dae3

zstandard: vendor python-zstandard 0.10.1. This was just released. The upstream source distribution from PyPI was extracted. Unwanted files were removed. The clang-format ignore list was updated to reflect the new source of files. setup.py was updated to pass a new argument to python-zstandard's function for returning an Extension instance. Upstream had to change to use relative paths because Python 3.7's packaging doesn't seem to like absolute paths when defining sources, includes, etc. The default relative path calculation is relative to setup_zstd.py, which is different from the directory of Mercurial's setup.py. The project contains a vendored copy of zstandard 1.3.6. The old version was 1.3.4. The API should be backwards compatible and nothing in core should need to be adjusted. However, there is a new "chunker" API that we may find useful in places where we want to emit compressed chunks of a fixed size. There is a pair of bug fixes in 0.10.0 with regard to compressobj() and decompressobj() when block flushing is used. I actually found these bugs when introducing these APIs in Mercurial! But existing Mercurial code is not affected because we don't perform block flushing. # no-check-commit because 3rd party code has different style guidelines Differential Revision: https://phab.mercurial-scm.org/D4911
author Gregory Szorc <gregory.szorc@gmail.com>
date Mon, 08 Oct 2018 16:27:40 -0700
parents b1fb341d8a61
children 675775c33ab6
line wrap: on
line source

import io
import os
import unittest

try:
    import hypothesis
    import hypothesis.strategies as strategies
except ImportError:
    raise unittest.SkipTest('hypothesis not available')

import zstandard as zstd

from . common import (
    make_cffi,
    random_input_data,
)


@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_stream_reader_fuzzing(unittest.TestCase):
    # Fuzzes ZstdCompressor.stream_reader(): the bytes gathered from read()
    # calls of randomly drawn sizes must equal the frame produced by a
    # one-shot compress() at the same level.

    # Variant whose source is a file-like object (io.BytesIO).
    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        source_read_size=strategies.integers(1, 16384),
        read_sizes=strategies.data())
    def test_stream_source_read_variance(self, original, level,
                                         source_read_size, read_sizes):
        expected = zstd.ZstdCompressor(level=level).compress(original)

        size_strategy = strategies.integers(1, 16384)
        pieces = []

        cctx = zstd.ZstdCompressor(level=level)
        stream = cctx.stream_reader(io.BytesIO(original),
                                    size=len(original),
                                    read_size=source_read_size)
        with stream as reader:
            # A fresh size is drawn for every read(), including the final
            # one that returns no data and ends the loop.
            piece = reader.read(read_sizes.draw(size_strategy))
            while piece:
                pieces.append(piece)
                piece = reader.read(read_sizes.draw(size_strategy))

        self.assertEqual(b''.join(pieces), expected)

    # Variant whose source is the input object itself rather than a stream.
    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        source_read_size=strategies.integers(1, 16384),
        read_sizes=strategies.data())
    def test_buffer_source_read_variance(self, original, level,
                                         source_read_size, read_sizes):
        expected = zstd.ZstdCompressor(level=level).compress(original)

        size_strategy = strategies.integers(1, 16384)
        pieces = []

        cctx = zstd.ZstdCompressor(level=level)
        stream = cctx.stream_reader(original,
                                    size=len(original),
                                    read_size=source_read_size)
        with stream as reader:
            # Same draw-then-read cadence as the stream source variant.
            piece = reader.read(read_sizes.draw(size_strategy))
            while piece:
                pieces.append(piece)
                piece = reader.read(read_sizes.draw(size_strategy))

        self.assertEqual(b''.join(pieces), expected)


@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_stream_writer_fuzzing(unittest.TestCase):
    # Fuzzes ZstdCompressor.stream_writer(): whatever write_size is drawn,
    # the bytes that land in the destination must equal the frame produced
    # by a one-shot compress() at the same level.

    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        write_size=strategies.integers(min_value=1, max_value=1048576))
    def test_write_size_variance(self, original, level, write_size):
        expected = zstd.ZstdCompressor(level=level).compress(original)

        sink = io.BytesIO()
        cctx = zstd.ZstdCompressor(level=level)
        writer = cctx.stream_writer(sink,
                                    size=len(original),
                                    write_size=write_size)
        # The whole input is handed over in a single write() call; the
        # destination is inspected only after the context manager exits.
        with writer as compressor:
            compressor.write(original)

        self.assertEqual(sink.getvalue(), expected)


@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_copy_stream_fuzzing(unittest.TestCase):
    # Fuzzes ZstdCompressor.copy_stream(): for any drawn pair of read and
    # write sizes, the destination must hold the same frame as a one-shot
    # compress() at the same level.

    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        read_size=strategies.integers(min_value=1, max_value=1048576),
        write_size=strategies.integers(min_value=1, max_value=1048576))
    def test_read_write_size_variance(self, original, level, read_size,
                                      write_size):
        expected = zstd.ZstdCompressor(level=level).compress(original)

        cctx = zstd.ZstdCompressor(level=level)
        src = io.BytesIO(original)
        dst = io.BytesIO()
        cctx.copy_stream(src,
                         dst,
                         size=len(original),
                         read_size=read_size,
                         write_size=write_size)

        self.assertEqual(dst.getvalue(), expected)


@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_compressobj_fuzzing(unittest.TestCase):
    # Fuzzes ZstdCompressor.compressobj() by feeding it the input in
    # randomly sized slices, with and without intermediate block flushes.

    # Without block flushes, the concatenated output must equal the frame
    # produced by a one-shot compress() at the same level.
    @hypothesis.settings(
        suppress_health_check=[hypothesis.HealthCheck.large_base_example])
    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        chunk_sizes=strategies.data())
    def test_random_input_sizes(self, original, level, chunk_sizes):
        expected = zstd.ZstdCompressor(level=level).compress(original)

        cctx = zstd.ZstdCompressor(level=level)
        cobj = cctx.compressobj(size=len(original))

        size_strategy = strategies.integers(1, 4096)
        total = len(original)
        pieces = []
        offset = 0
        while True:
            # The size is drawn before the exhaustion check, so one extra
            # draw happens once the input has been fully consumed.
            step = chunk_sizes.draw(size_strategy)
            if offset >= total:
                break

            pieces.append(cobj.compress(original[offset:offset + step]))
            offset += step

        pieces.append(cobj.flush())

        self.assertEqual(b''.join(pieces), expected)

    # With randomly interleaved block flushes, a streaming decompressor fed
    # the output chunk by chunk must have reproduced every input byte
    # consumed so far after each flush.
    @hypothesis.settings(
        suppress_health_check=[hypothesis.HealthCheck.large_base_example])
    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        chunk_sizes=strategies.data(),
        flushes=strategies.data())
    def test_flush_block(self, original, level, chunk_sizes, flushes):
        cctx = zstd.ZstdCompressor(level=level)
        cobj = cctx.compressobj()

        dctx = zstd.ZstdDecompressor()
        dobj = dctx.decompressobj()

        compressed_chunks = []
        decompressed_chunks = []

        def relay(chunk):
            # Record a compressed chunk and immediately push it through
            # the streaming decompressor.
            compressed_chunks.append(chunk)
            decompressed_chunks.append(dobj.decompress(chunk))

        size_strategy = strategies.integers(1, 4096)
        offset = 0
        while True:
            step = chunk_sizes.draw(size_strategy)
            source = original[offset:offset + step]
            if not source:
                break

            offset += step
            relay(cobj.compress(source))

            if flushes.draw(strategies.booleans()):
                relay(cobj.flush(zstd.COMPRESSOBJ_FLUSH_BLOCK))

                # offset may run past the end of the input on the last
                # slice; slicing clamps it.
                self.assertEqual(b''.join(decompressed_chunks),
                                 original[0:offset])

        relay(cobj.flush(zstd.COMPRESSOBJ_FLUSH_FINISH))

        # The joined output must also decompress in one shot.
        roundtripped = dctx.decompress(b''.join(compressed_chunks),
                                       max_output_size=len(original))
        self.assertEqual(roundtripped, original)
        self.assertEqual(b''.join(decompressed_chunks), original)

@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_read_to_iter_fuzzing(unittest.TestCase):
    # Fuzzes ZstdCompressor.read_to_iter(): for any drawn pair of read and
    # write sizes, the joined chunks must equal the frame produced by a
    # one-shot compress() at the same level.

    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        read_size=strategies.integers(min_value=1, max_value=4096),
        write_size=strategies.integers(min_value=1, max_value=4096))
    def test_read_write_size_variance(self, original, level, read_size,
                                      write_size):
        expected = zstd.ZstdCompressor(level=level).compress(original)

        src = io.BytesIO(original)

        cctx = zstd.ZstdCompressor(level=level)
        chunk_iter = cctx.read_to_iter(src,
                                       size=len(original),
                                       read_size=read_size,
                                       write_size=write_size)
        # join() drains the iterator completely before concatenating.
        produced = b''.join(chunk_iter)

        self.assertEqual(produced, expected)


@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
class TestCompressor_multi_compress_to_buffer_fuzzing(unittest.TestCase):
    # Fuzzes ZstdCompressor.multi_compress_to_buffer() with a random list of
    # inputs, a random thread count and an optional content dictionary, and
    # checks that every produced frame decompresses back to its input.

    @hypothesis.given(
        original=strategies.lists(
            strategies.sampled_from(random_input_data()),
            min_size=1, max_size=1024),
        threads=strategies.integers(min_value=1, max_value=8),
        use_dict=strategies.booleans())
    def test_data_equivalence(self, original, threads, use_dict):
        kwargs = {}

        # Use a content dictionary because it is cheap to create.
        if use_dict:
            kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])

        cctx = zstd.ZstdCompressor(level=1,
                                   write_checksum=True,
                                   **kwargs)

        # Forward the drawn thread count. This used to be hard-coded to -1,
        # which left the ``threads`` strategy unused and meant the varying
        # thread counts were never actually exercised.
        result = cctx.multi_compress_to_buffer(original, threads=threads)

        self.assertEqual(len(result), len(original))

        # The frame produced via the batch APIs may not be bit identical to that
        # produced by compress() because compression parameters are adjusted
        # from the first input in batch mode. So the only thing we can do is
        # verify the decompressed data matches the input.
        dctx = zstd.ZstdDecompressor(**kwargs)

        for i, frame in enumerate(result):
            self.assertEqual(dctx.decompress(frame), original[i])


@unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
@make_cffi
class TestCompressor_chunker_fuzzing(unittest.TestCase):
    # Fuzzes ZstdCompressor.chunker() by feeding it the input in randomly
    # sized slices, with and without intermediate flushes.

    # Every emitted chunk except the last must be exactly chunk_size bytes
    # long, and the joined chunks must decompress back to the input.
    @hypothesis.settings(
        suppress_health_check=[hypothesis.HealthCheck.large_base_example])
    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        chunk_size=strategies.integers(min_value=1,
                                       max_value=32 * 1048576),
        input_sizes=strategies.data())
    def test_random_input_sizes(self, original, level, chunk_size, input_sizes):
        cctx = zstd.ZstdCompressor(level=level)
        chunker = cctx.chunker(chunk_size=chunk_size)

        size_strategy = strategies.integers(1, 4096)
        emitted = []
        offset = 0
        while True:
            step = input_sizes.draw(size_strategy)
            source = original[offset:offset + step]
            if not source:
                break

            emitted.extend(chunker.compress(source))
            offset += step

        emitted.extend(chunker.finish())

        dctx = zstd.ZstdDecompressor()
        roundtripped = dctx.decompress(b''.join(emitted),
                                       max_output_size=len(original))
        self.assertEqual(roundtripped, original)

        # Only the final chunk is exempt from the fixed-size requirement.
        full_chunks = emitted[:-1]
        self.assertTrue(all(len(chunk) == chunk_size for chunk in full_chunks))

    # With randomly interleaved flush() calls, a streaming decompressor fed
    # the emitted chunks must have reproduced every input byte consumed so
    # far after each flush.
    @hypothesis.settings(
        suppress_health_check=[hypothesis.HealthCheck.large_base_example])
    @hypothesis.given(
        original=strategies.sampled_from(random_input_data()),
        level=strategies.integers(min_value=1, max_value=5),
        chunk_size=strategies.integers(min_value=1,
                                       max_value=32 * 1048576),
        input_sizes=strategies.data(),
        flushes=strategies.data())
    def test_flush_block(self, original, level, chunk_size, input_sizes,
                         flushes):
        cctx = zstd.ZstdCompressor(level=level)
        chunker = cctx.chunker(chunk_size=chunk_size)

        dctx = zstd.ZstdDecompressor()
        dobj = dctx.decompressobj()

        compressed_chunks = []
        decompressed_chunks = []

        def relay(produced):
            # Drain the chunks yielded by one chunker call, record them, and
            # feed them (joined) to the streaming decompressor.
            batch = list(produced)
            compressed_chunks.extend(batch)
            decompressed_chunks.append(dobj.decompress(b''.join(batch)))

        size_strategy = strategies.integers(1, 4096)
        offset = 0
        while True:
            step = input_sizes.draw(size_strategy)
            source = original[offset:offset + step]
            if not source:
                break

            offset += step
            relay(chunker.compress(source))

            if flushes.draw(strategies.booleans()):
                relay(chunker.flush())

                # offset may run past the end of the input on the last
                # slice; slicing clamps it.
                self.assertEqual(b''.join(decompressed_chunks),
                                 original[0:offset])

        relay(chunker.finish())

        # The joined output must also decompress in one shot.
        roundtripped = dctx.decompress(b''.join(compressed_chunks),
                                       max_output_size=len(original))
        self.assertEqual(roundtripped, original)
        self.assertEqual(b''.join(decompressed_chunks), original)