1 import io |
1 import io |
2 import os |
2 import os |
3 |
3 import unittest |
4 try: |
|
5 import unittest2 as unittest |
|
6 except ImportError: |
|
7 import unittest |
|
8 |
4 |
9 try: |
5 try: |
10 import hypothesis |
6 import hypothesis |
11 import hypothesis.strategies as strategies |
7 import hypothesis.strategies as strategies |
12 except ImportError: |
8 except ImportError: |
13 raise unittest.SkipTest('hypothesis not available') |
9 raise unittest.SkipTest('hypothesis not available') |
14 |
10 |
15 import zstd |
11 import zstandard as zstd |
16 |
12 |
17 from . common import ( |
13 from . common import ( |
18 make_cffi, |
14 make_cffi, |
19 random_input_data, |
15 random_input_data, |
20 ) |
16 ) |
21 |
17 |
22 |
18 |
23 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
19 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
24 @make_cffi |
20 @make_cffi |
25 class TestCompressor_write_to_fuzzing(unittest.TestCase): |
21 class TestCompressor_stream_reader_fuzzing(unittest.TestCase): |
|
22 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
|
23 level=strategies.integers(min_value=1, max_value=5), |
|
24 source_read_size=strategies.integers(1, 16384), |
|
25 read_sizes=strategies.data()) |
|
26 def test_stream_source_read_variance(self, original, level, source_read_size, |
|
27 read_sizes): |
|
28 refctx = zstd.ZstdCompressor(level=level) |
|
29 ref_frame = refctx.compress(original) |
|
30 |
|
31 cctx = zstd.ZstdCompressor(level=level) |
|
32 with cctx.stream_reader(io.BytesIO(original), size=len(original), |
|
33 read_size=source_read_size) as reader: |
|
34 chunks = [] |
|
35 while True: |
|
36 read_size = read_sizes.draw(strategies.integers(1, 16384)) |
|
37 chunk = reader.read(read_size) |
|
38 |
|
39 if not chunk: |
|
40 break |
|
41 chunks.append(chunk) |
|
42 |
|
43 self.assertEqual(b''.join(chunks), ref_frame) |
|
44 |
|
45 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
|
46 level=strategies.integers(min_value=1, max_value=5), |
|
47 source_read_size=strategies.integers(1, 16384), |
|
48 read_sizes=strategies.data()) |
|
49 def test_buffer_source_read_variance(self, original, level, source_read_size, |
|
50 read_sizes): |
|
51 |
|
52 refctx = zstd.ZstdCompressor(level=level) |
|
53 ref_frame = refctx.compress(original) |
|
54 |
|
55 cctx = zstd.ZstdCompressor(level=level) |
|
56 with cctx.stream_reader(original, size=len(original), |
|
57 read_size=source_read_size) as reader: |
|
58 chunks = [] |
|
59 while True: |
|
60 read_size = read_sizes.draw(strategies.integers(1, 16384)) |
|
61 chunk = reader.read(read_size) |
|
62 if not chunk: |
|
63 break |
|
64 chunks.append(chunk) |
|
65 |
|
66 self.assertEqual(b''.join(chunks), ref_frame) |
|
67 |
|
68 |
|
69 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
|
70 @make_cffi |
|
71 class TestCompressor_stream_writer_fuzzing(unittest.TestCase): |
26 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
72 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
27 level=strategies.integers(min_value=1, max_value=5), |
73 level=strategies.integers(min_value=1, max_value=5), |
28 write_size=strategies.integers(min_value=1, max_value=1048576)) |
74 write_size=strategies.integers(min_value=1, max_value=1048576)) |
29 def test_write_size_variance(self, original, level, write_size): |
75 def test_write_size_variance(self, original, level, write_size): |
30 refctx = zstd.ZstdCompressor(level=level) |
76 refctx = zstd.ZstdCompressor(level=level) |
31 ref_frame = refctx.compress(original) |
77 ref_frame = refctx.compress(original) |
32 |
78 |
33 cctx = zstd.ZstdCompressor(level=level) |
79 cctx = zstd.ZstdCompressor(level=level) |
34 b = io.BytesIO() |
80 b = io.BytesIO() |
35 with cctx.write_to(b, size=len(original), write_size=write_size) as compressor: |
81 with cctx.stream_writer(b, size=len(original), write_size=write_size) as compressor: |
36 compressor.write(original) |
82 compressor.write(original) |
37 |
83 |
38 self.assertEqual(b.getvalue(), ref_frame) |
84 self.assertEqual(b.getvalue(), ref_frame) |
39 |
85 |
40 |
86 |
60 |
106 |
61 |
107 |
62 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
108 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
63 @make_cffi |
109 @make_cffi |
64 class TestCompressor_compressobj_fuzzing(unittest.TestCase): |
110 class TestCompressor_compressobj_fuzzing(unittest.TestCase): |
|
111 @hypothesis.settings( |
|
112 suppress_health_check=[hypothesis.HealthCheck.large_base_example]) |
65 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
113 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
66 level=strategies.integers(min_value=1, max_value=5), |
114 level=strategies.integers(min_value=1, max_value=5), |
67 chunk_sizes=strategies.streaming( |
115 chunk_sizes=strategies.data()) |
68 strategies.integers(min_value=1, max_value=4096))) |
|
69 def test_random_input_sizes(self, original, level, chunk_sizes): |
116 def test_random_input_sizes(self, original, level, chunk_sizes): |
70 chunk_sizes = iter(chunk_sizes) |
|
71 |
|
72 refctx = zstd.ZstdCompressor(level=level) |
117 refctx = zstd.ZstdCompressor(level=level) |
73 ref_frame = refctx.compress(original) |
118 ref_frame = refctx.compress(original) |
74 |
119 |
75 cctx = zstd.ZstdCompressor(level=level) |
120 cctx = zstd.ZstdCompressor(level=level) |
76 cobj = cctx.compressobj(size=len(original)) |
121 cobj = cctx.compressobj(size=len(original)) |
77 |
122 |
78 chunks = [] |
123 chunks = [] |
79 i = 0 |
124 i = 0 |
80 while True: |
125 while True: |
81 chunk_size = next(chunk_sizes) |
126 chunk_size = chunk_sizes.draw(strategies.integers(1, 4096)) |
82 source = original[i:i + chunk_size] |
127 source = original[i:i + chunk_size] |
83 if not source: |
128 if not source: |
84 break |
129 break |
85 |
130 |
86 chunks.append(cobj.compress(source)) |
131 chunks.append(cobj.compress(source)) |
91 self.assertEqual(b''.join(chunks), ref_frame) |
136 self.assertEqual(b''.join(chunks), ref_frame) |
92 |
137 |
93 |
138 |
94 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
139 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
95 @make_cffi |
140 @make_cffi |
96 class TestCompressor_read_from_fuzzing(unittest.TestCase): |
141 class TestCompressor_read_to_iter_fuzzing(unittest.TestCase): |
97 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
142 @hypothesis.given(original=strategies.sampled_from(random_input_data()), |
98 level=strategies.integers(min_value=1, max_value=5), |
143 level=strategies.integers(min_value=1, max_value=5), |
99 read_size=strategies.integers(min_value=1, max_value=4096), |
144 read_size=strategies.integers(min_value=1, max_value=4096), |
100 write_size=strategies.integers(min_value=1, max_value=4096)) |
145 write_size=strategies.integers(min_value=1, max_value=4096)) |
101 def test_read_write_size_variance(self, original, level, read_size, write_size): |
146 def test_read_write_size_variance(self, original, level, read_size, write_size): |
103 ref_frame = refcctx.compress(original) |
148 ref_frame = refcctx.compress(original) |
104 |
149 |
105 source = io.BytesIO(original) |
150 source = io.BytesIO(original) |
106 |
151 |
107 cctx = zstd.ZstdCompressor(level=level) |
152 cctx = zstd.ZstdCompressor(level=level) |
108 chunks = list(cctx.read_from(source, size=len(original), read_size=read_size, |
153 chunks = list(cctx.read_to_iter(source, size=len(original), |
109 write_size=write_size)) |
154 read_size=read_size, |
|
155 write_size=write_size)) |
110 |
156 |
111 self.assertEqual(b''.join(chunks), ref_frame) |
157 self.assertEqual(b''.join(chunks), ref_frame) |
112 |
158 |
113 |
159 |
114 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |
160 @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set') |