contrib/python-zstandard/tests/test_compressor_fuzzing.py
changeset 37495 b1fb341d8a61
parent 31799 e0dc40530c5a
child 40122 73fef626dae3
equal deleted inserted replaced
37494:1ce7a55b09d1 37495:b1fb341d8a61
     1 import io
     1 import io
     2 import os
     2 import os
     3 
     3 import unittest
     4 try:
       
     5     import unittest2 as unittest
       
     6 except ImportError:
       
     7     import unittest
       
     8 
     4 
     9 try:
     5 try:
    10     import hypothesis
     6     import hypothesis
    11     import hypothesis.strategies as strategies
     7     import hypothesis.strategies as strategies
    12 except ImportError:
     8 except ImportError:
    13     raise unittest.SkipTest('hypothesis not available')
     9     raise unittest.SkipTest('hypothesis not available')
    14 
    10 
    15 import zstd
    11 import zstandard as zstd
    16 
    12 
    17 from . common import (
    13 from . common import (
    18     make_cffi,
    14     make_cffi,
    19     random_input_data,
    15     random_input_data,
    20 )
    16 )
    21 
    17 
    22 
    18 
    23 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
    19 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
    24 @make_cffi
    20 @make_cffi
    25 class TestCompressor_write_to_fuzzing(unittest.TestCase):
    21 class TestCompressor_stream_reader_fuzzing(unittest.TestCase):
       
    22     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
       
    23                       level=strategies.integers(min_value=1, max_value=5),
       
    24                       source_read_size=strategies.integers(1, 16384),
       
    25                       read_sizes=strategies.data())
       
    26     def test_stream_source_read_variance(self, original, level, source_read_size,
       
    27                                          read_sizes):
       
    28         refctx = zstd.ZstdCompressor(level=level)
       
    29         ref_frame = refctx.compress(original)
       
    30 
       
    31         cctx = zstd.ZstdCompressor(level=level)
       
    32         with cctx.stream_reader(io.BytesIO(original), size=len(original),
       
    33                                 read_size=source_read_size) as reader:
       
    34             chunks = []
       
    35             while True:
       
    36                 read_size = read_sizes.draw(strategies.integers(1, 16384))
       
    37                 chunk = reader.read(read_size)
       
    38 
       
    39                 if not chunk:
       
    40                     break
       
    41                 chunks.append(chunk)
       
    42 
       
    43         self.assertEqual(b''.join(chunks), ref_frame)
       
    44 
       
    45     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
       
    46                       level=strategies.integers(min_value=1, max_value=5),
       
    47                       source_read_size=strategies.integers(1, 16384),
       
    48                       read_sizes=strategies.data())
       
    49     def test_buffer_source_read_variance(self, original, level, source_read_size,
       
    50                                          read_sizes):
       
    51 
       
    52         refctx = zstd.ZstdCompressor(level=level)
       
    53         ref_frame = refctx.compress(original)
       
    54 
       
    55         cctx = zstd.ZstdCompressor(level=level)
       
    56         with cctx.stream_reader(original, size=len(original),
       
    57                                 read_size=source_read_size) as reader:
       
    58             chunks = []
       
    59             while True:
       
    60                 read_size = read_sizes.draw(strategies.integers(1, 16384))
       
    61                 chunk = reader.read(read_size)
       
    62                 if not chunk:
       
    63                     break
       
    64                 chunks.append(chunk)
       
    65 
       
    66         self.assertEqual(b''.join(chunks), ref_frame)
       
    67 
       
    68 
       
    69 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
       
    70 @make_cffi
       
    71 class TestCompressor_stream_writer_fuzzing(unittest.TestCase):
    26     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
    72     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
    27                         level=strategies.integers(min_value=1, max_value=5),
    73                         level=strategies.integers(min_value=1, max_value=5),
    28                         write_size=strategies.integers(min_value=1, max_value=1048576))
    74                         write_size=strategies.integers(min_value=1, max_value=1048576))
    29     def test_write_size_variance(self, original, level, write_size):
    75     def test_write_size_variance(self, original, level, write_size):
    30         refctx = zstd.ZstdCompressor(level=level)
    76         refctx = zstd.ZstdCompressor(level=level)
    31         ref_frame = refctx.compress(original)
    77         ref_frame = refctx.compress(original)
    32 
    78 
    33         cctx = zstd.ZstdCompressor(level=level)
    79         cctx = zstd.ZstdCompressor(level=level)
    34         b = io.BytesIO()
    80         b = io.BytesIO()
    35         with cctx.write_to(b, size=len(original), write_size=write_size) as compressor:
    81         with cctx.stream_writer(b, size=len(original), write_size=write_size) as compressor:
    36             compressor.write(original)
    82             compressor.write(original)
    37 
    83 
    38         self.assertEqual(b.getvalue(), ref_frame)
    84         self.assertEqual(b.getvalue(), ref_frame)
    39 
    85 
    40 
    86 
    60 
   106 
    61 
   107 
    62 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
   108 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
    63 @make_cffi
   109 @make_cffi
    64 class TestCompressor_compressobj_fuzzing(unittest.TestCase):
   110 class TestCompressor_compressobj_fuzzing(unittest.TestCase):
       
   111     @hypothesis.settings(
       
   112         suppress_health_check=[hypothesis.HealthCheck.large_base_example])
    65     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
   113     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
    66                       level=strategies.integers(min_value=1, max_value=5),
   114                       level=strategies.integers(min_value=1, max_value=5),
    67                       chunk_sizes=strategies.streaming(
   115                       chunk_sizes=strategies.data())
    68                           strategies.integers(min_value=1, max_value=4096)))
       
    69     def test_random_input_sizes(self, original, level, chunk_sizes):
   116     def test_random_input_sizes(self, original, level, chunk_sizes):
    70         chunk_sizes = iter(chunk_sizes)
       
    71 
       
    72         refctx = zstd.ZstdCompressor(level=level)
   117         refctx = zstd.ZstdCompressor(level=level)
    73         ref_frame = refctx.compress(original)
   118         ref_frame = refctx.compress(original)
    74 
   119 
    75         cctx = zstd.ZstdCompressor(level=level)
   120         cctx = zstd.ZstdCompressor(level=level)
    76         cobj = cctx.compressobj(size=len(original))
   121         cobj = cctx.compressobj(size=len(original))
    77 
   122 
    78         chunks = []
   123         chunks = []
    79         i = 0
   124         i = 0
    80         while True:
   125         while True:
    81             chunk_size = next(chunk_sizes)
   126             chunk_size = chunk_sizes.draw(strategies.integers(1, 4096))
    82             source = original[i:i + chunk_size]
   127             source = original[i:i + chunk_size]
    83             if not source:
   128             if not source:
    84                 break
   129                 break
    85 
   130 
    86             chunks.append(cobj.compress(source))
   131             chunks.append(cobj.compress(source))
    91         self.assertEqual(b''.join(chunks), ref_frame)
   136         self.assertEqual(b''.join(chunks), ref_frame)
    92 
   137 
    93 
   138 
    94 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
   139 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
    95 @make_cffi
   140 @make_cffi
    96 class TestCompressor_read_from_fuzzing(unittest.TestCase):
   141 class TestCompressor_read_to_iter_fuzzing(unittest.TestCase):
    97     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
   142     @hypothesis.given(original=strategies.sampled_from(random_input_data()),
    98                       level=strategies.integers(min_value=1, max_value=5),
   143                       level=strategies.integers(min_value=1, max_value=5),
    99                       read_size=strategies.integers(min_value=1, max_value=4096),
   144                       read_size=strategies.integers(min_value=1, max_value=4096),
   100                       write_size=strategies.integers(min_value=1, max_value=4096))
   145                       write_size=strategies.integers(min_value=1, max_value=4096))
   101     def test_read_write_size_variance(self, original, level, read_size, write_size):
   146     def test_read_write_size_variance(self, original, level, read_size, write_size):
   103         ref_frame = refcctx.compress(original)
   148         ref_frame = refcctx.compress(original)
   104 
   149 
   105         source = io.BytesIO(original)
   150         source = io.BytesIO(original)
   106 
   151 
   107         cctx = zstd.ZstdCompressor(level=level)
   152         cctx = zstd.ZstdCompressor(level=level)
   108         chunks = list(cctx.read_from(source, size=len(original), read_size=read_size,
   153         chunks = list(cctx.read_to_iter(source, size=len(original),
   109                                      write_size=write_size))
   154                                         read_size=read_size,
       
   155                                         write_size=write_size))
   110 
   156 
   111         self.assertEqual(b''.join(chunks), ref_frame)
   157         self.assertEqual(b''.join(chunks), ref_frame)
   112 
   158 
   113 
   159 
   114 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
   160 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
   123         # Use a content dictionary because it is cheap to create.
   169         # Use a content dictionary because it is cheap to create.
   124         if use_dict:
   170         if use_dict:
   125             kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])
   171             kwargs['dict_data'] = zstd.ZstdCompressionDict(original[0])
   126 
   172 
   127         cctx = zstd.ZstdCompressor(level=1,
   173         cctx = zstd.ZstdCompressor(level=1,
   128                                    write_content_size=True,
       
   129                                    write_checksum=True,
   174                                    write_checksum=True,
   130                                    **kwargs)
   175                                    **kwargs)
   131 
   176 
   132         result = cctx.multi_compress_to_buffer(original, threads=-1)
   177         result = cctx.multi_compress_to_buffer(original, threads=-1)
   133 
   178