comparison contrib/python-zstandard/tests/test_compressor.py @ 31796:e0dc40530c5a

zstd: vendor python-zstandard 0.8.0 Commit 81e1f5bbf1fc54808649562d3ed829730765c540 from https://github.com/indygreg/python-zstandard is imported without modifications (other than removing unwanted files). Updates relevant to Mercurial include: * Support for multi-threaded compression (we can use this for bundle and wire protocol compression). * APIs for batch compression and decompression operations using multiple threads and optimal memory allocation mechanism. (Can be useful for revlog perf improvements.) * A ``BufferWithSegments`` type that models a single memory buffer containing N discrete items of known lengths. This type can be used for very efficient 0-copy data operations. # no-check-commit
author Gregory Szorc <gregory.szorc@gmail.com>
date Sat, 01 Apr 2017 15:24:03 -0700
parents c32454d69b85
children b1fb341d8a61
comparison
equal deleted inserted replaced
31795:2b130e26c3a4 31796:e0dc40530c5a
18 18
19 if sys.version_info[0] >= 3: 19 if sys.version_info[0] >= 3:
20 next = lambda it: it.__next__() 20 next = lambda it: it.__next__()
21 else: 21 else:
22 next = lambda it: it.next() 22 next = lambda it: it.next()
23
24
25 def multithreaded_chunk_size(level, source_size=0):
26 params = zstd.get_compression_parameters(level, source_size)
27
28 return 1 << (params.window_log + 2)
23 29
24 30
25 @make_cffi 31 @make_cffi
26 class TestCompressor(unittest.TestCase): 32 class TestCompressor(unittest.TestCase):
27 def test_level_bounds(self): 33 def test_level_bounds(self):
32 zstd.ZstdCompressor(level=23) 38 zstd.ZstdCompressor(level=23)
33 39
34 40
35 @make_cffi 41 @make_cffi
36 class TestCompressor_compress(unittest.TestCase): 42 class TestCompressor_compress(unittest.TestCase):
43 def test_multithreaded_unsupported(self):
44 samples = []
45 for i in range(128):
46 samples.append(b'foo' * 64)
47 samples.append(b'bar' * 64)
48
49 d = zstd.train_dictionary(8192, samples)
50
51 cctx = zstd.ZstdCompressor(dict_data=d, threads=2)
52
53 with self.assertRaisesRegexp(zstd.ZstdError, 'compress\(\) cannot be used with both dictionaries and multi-threaded compression'):
54 cctx.compress(b'foo')
55
56 params = zstd.get_compression_parameters(3)
57 cctx = zstd.ZstdCompressor(compression_params=params, threads=2)
58 with self.assertRaisesRegexp(zstd.ZstdError, 'compress\(\) cannot be used with both compression parameters and multi-threaded compression'):
59 cctx.compress(b'foo')
60
37 def test_compress_empty(self): 61 def test_compress_empty(self):
38 cctx = zstd.ZstdCompressor(level=1) 62 cctx = zstd.ZstdCompressor(level=1)
39 result = cctx.compress(b'') 63 result = cctx.compress(b'')
40 self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00') 64 self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
41 params = zstd.get_frame_parameters(result) 65 params = zstd.get_frame_parameters(result)
129 153
130 cctx = zstd.ZstdCompressor(level=1, dict_data=d) 154 cctx = zstd.ZstdCompressor(level=1, dict_data=d)
131 155
132 for i in range(32): 156 for i in range(32):
133 cctx.compress(b'foo bar foobar foo bar foobar') 157 cctx.compress(b'foo bar foobar foo bar foobar')
158
159 def test_multithreaded(self):
160 chunk_size = multithreaded_chunk_size(1)
161 source = b''.join([b'x' * chunk_size, b'y' * chunk_size])
162
163 cctx = zstd.ZstdCompressor(level=1, threads=2)
164 compressed = cctx.compress(source)
165
166 params = zstd.get_frame_parameters(compressed)
167 self.assertEqual(params.content_size, chunk_size * 2)
168 self.assertEqual(params.dict_id, 0)
169 self.assertFalse(params.has_checksum)
170
171 dctx = zstd.ZstdDecompressor()
172 self.assertEqual(dctx.decompress(compressed), source)
134 173
135 174
136 @make_cffi 175 @make_cffi
137 class TestCompressor_compressobj(unittest.TestCase): 176 class TestCompressor_compressobj(unittest.TestCase):
138 def test_compressobj_empty(self): 177 def test_compressobj_empty(self):
235 # 3 bytes block header + 4 bytes frame checksum 274 # 3 bytes block header + 4 bytes frame checksum
236 self.assertEqual(len(trailing), 7) 275 self.assertEqual(len(trailing), 7)
237 header = trailing[0:3] 276 header = trailing[0:3]
238 self.assertEqual(header, b'\x01\x00\x00') 277 self.assertEqual(header, b'\x01\x00\x00')
239 278
279 def test_multithreaded(self):
280 source = io.BytesIO()
281 source.write(b'a' * 1048576)
282 source.write(b'b' * 1048576)
283 source.write(b'c' * 1048576)
284 source.seek(0)
285
286 cctx = zstd.ZstdCompressor(level=1, threads=2)
287 cobj = cctx.compressobj()
288
289 chunks = []
290 while True:
291 d = source.read(8192)
292 if not d:
293 break
294
295 chunks.append(cobj.compress(d))
296
297 chunks.append(cobj.flush())
298
299 compressed = b''.join(chunks)
300
301 self.assertEqual(len(compressed), 295)
302
240 303
241 @make_cffi 304 @make_cffi
242 class TestCompressor_copy_stream(unittest.TestCase): 305 class TestCompressor_copy_stream(unittest.TestCase):
243 def test_no_read(self): 306 def test_no_read(self):
244 source = object() 307 source = object()
352 415
353 self.assertEqual(r, len(source.getvalue())) 416 self.assertEqual(r, len(source.getvalue()))
354 self.assertEqual(w, 21) 417 self.assertEqual(w, 21)
355 self.assertEqual(source._read_count, len(source.getvalue()) + 1) 418 self.assertEqual(source._read_count, len(source.getvalue()) + 1)
356 self.assertEqual(dest._write_count, len(dest.getvalue())) 419 self.assertEqual(dest._write_count, len(dest.getvalue()))
420
421 def test_multithreaded(self):
422 source = io.BytesIO()
423 source.write(b'a' * 1048576)
424 source.write(b'b' * 1048576)
425 source.write(b'c' * 1048576)
426 source.seek(0)
427
428 dest = io.BytesIO()
429 cctx = zstd.ZstdCompressor(threads=2)
430 r, w = cctx.copy_stream(source, dest)
431 self.assertEqual(r, 3145728)
432 self.assertEqual(w, 295)
433
434 params = zstd.get_frame_parameters(dest.getvalue())
435 self.assertEqual(params.content_size, 0)
436 self.assertEqual(params.dict_id, 0)
437 self.assertFalse(params.has_checksum)
438
439 # Writing content size and checksum works.
440 cctx = zstd.ZstdCompressor(threads=2, write_content_size=True,
441 write_checksum=True)
442 dest = io.BytesIO()
443 source.seek(0)
444 cctx.copy_stream(source, dest, size=len(source.getvalue()))
445
446 params = zstd.get_frame_parameters(dest.getvalue())
447 self.assertEqual(params.content_size, 3145728)
448 self.assertEqual(params.dict_id, 0)
449 self.assertTrue(params.has_checksum)
357 450
358 451
359 def compress(data, level): 452 def compress(data, level):
360 buffer = io.BytesIO() 453 buffer = io.BytesIO()
361 cctx = zstd.ZstdCompressor(level=level) 454 cctx = zstd.ZstdCompressor(level=level)
582 self.assertEqual(len(trailing), 7) 675 self.assertEqual(len(trailing), 7)
583 676
584 header = trailing[0:3] 677 header = trailing[0:3]
585 self.assertEqual(header, b'\x01\x00\x00') 678 self.assertEqual(header, b'\x01\x00\x00')
586 679
680 def test_multithreaded(self):
681 dest = io.BytesIO()
682 cctx = zstd.ZstdCompressor(threads=2)
683 with cctx.write_to(dest) as compressor:
684 compressor.write(b'a' * 1048576)
685 compressor.write(b'b' * 1048576)
686 compressor.write(b'c' * 1048576)
687
688 self.assertEqual(len(dest.getvalue()), 295)
689
587 690
588 @make_cffi 691 @make_cffi
589 class TestCompressor_read_from(unittest.TestCase): 692 class TestCompressor_read_from(unittest.TestCase):
590 def test_type_validation(self): 693 def test_type_validation(self):
591 cctx = zstd.ZstdCompressor() 694 cctx = zstd.ZstdCompressor()
671 cctx = zstd.ZstdCompressor(level=3) 774 cctx = zstd.ZstdCompressor(level=3)
672 for chunk in cctx.read_from(source, read_size=1, write_size=1): 775 for chunk in cctx.read_from(source, read_size=1, write_size=1):
673 self.assertEqual(len(chunk), 1) 776 self.assertEqual(len(chunk), 1)
674 777
675 self.assertEqual(source._read_count, len(source.getvalue()) + 1) 778 self.assertEqual(source._read_count, len(source.getvalue()) + 1)
779
780 def test_multithreaded(self):
781 source = io.BytesIO()
782 source.write(b'a' * 1048576)
783 source.write(b'b' * 1048576)
784 source.write(b'c' * 1048576)
785 source.seek(0)
786
787 cctx = zstd.ZstdCompressor(threads=2)
788
789 compressed = b''.join(cctx.read_from(source))
790 self.assertEqual(len(compressed), 295)
791
792
793 class TestCompressor_multi_compress_to_buffer(unittest.TestCase):
794 def test_multithreaded_unsupported(self):
795 cctx = zstd.ZstdCompressor(threads=2)
796
797 with self.assertRaisesRegexp(zstd.ZstdError, 'function cannot be called on ZstdCompressor configured for multi-threaded compression'):
798 cctx.multi_compress_to_buffer([b'foo'])
799
800 def test_invalid_inputs(self):
801 cctx = zstd.ZstdCompressor()
802
803 with self.assertRaises(TypeError):
804 cctx.multi_compress_to_buffer(True)
805
806 with self.assertRaises(TypeError):
807 cctx.multi_compress_to_buffer((1, 2))
808
809 with self.assertRaisesRegexp(TypeError, 'item 0 not a bytes like object'):
810 cctx.multi_compress_to_buffer([u'foo'])
811
812 def test_empty_input(self):
813 cctx = zstd.ZstdCompressor()
814
815 with self.assertRaisesRegexp(ValueError, 'no source elements found'):
816 cctx.multi_compress_to_buffer([])
817
818 with self.assertRaisesRegexp(ValueError, 'source elements are empty'):
819 cctx.multi_compress_to_buffer([b'', b'', b''])
820
821 def test_list_input(self):
822 cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
823
824 original = [b'foo' * 12, b'bar' * 6]
825 frames = [cctx.compress(c) for c in original]
826 b = cctx.multi_compress_to_buffer(original)
827
828 self.assertIsInstance(b, zstd.BufferWithSegmentsCollection)
829
830 self.assertEqual(len(b), 2)
831 self.assertEqual(b.size(), 44)
832
833 self.assertEqual(b[0].tobytes(), frames[0])
834 self.assertEqual(b[1].tobytes(), frames[1])
835
836 def test_buffer_with_segments_input(self):
837 cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
838
839 original = [b'foo' * 4, b'bar' * 6]
840 frames = [cctx.compress(c) for c in original]
841
842 offsets = struct.pack('=QQQQ', 0, len(original[0]),
843 len(original[0]), len(original[1]))
844 segments = zstd.BufferWithSegments(b''.join(original), offsets)
845
846 result = cctx.multi_compress_to_buffer(segments)
847
848 self.assertEqual(len(result), 2)
849 self.assertEqual(result.size(), 47)
850
851 self.assertEqual(result[0].tobytes(), frames[0])
852 self.assertEqual(result[1].tobytes(), frames[1])
853
854 def test_buffer_with_segments_collection_input(self):
855 cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
856
857 original = [
858 b'foo1',
859 b'foo2' * 2,
860 b'foo3' * 3,
861 b'foo4' * 4,
862 b'foo5' * 5,
863 ]
864
865 frames = [cctx.compress(c) for c in original]
866
867 b = b''.join([original[0], original[1]])
868 b1 = zstd.BufferWithSegments(b, struct.pack('=QQQQ',
869 0, len(original[0]),
870 len(original[0]), len(original[1])))
871 b = b''.join([original[2], original[3], original[4]])
872 b2 = zstd.BufferWithSegments(b, struct.pack('=QQQQQQ',
873 0, len(original[2]),
874 len(original[2]), len(original[3]),
875 len(original[2]) + len(original[3]), len(original[4])))
876
877 c = zstd.BufferWithSegmentsCollection(b1, b2)
878
879 result = cctx.multi_compress_to_buffer(c)
880
881 self.assertEqual(len(result), len(frames))
882
883 for i, frame in enumerate(frames):
884 self.assertEqual(result[i].tobytes(), frame)
885
886 def test_multiple_threads(self):
887 # threads argument will cause multi-threaded ZSTD APIs to be used, which will
888 # make output different.
889 refcctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
890 reference = [refcctx.compress(b'x' * 64), refcctx.compress(b'y' * 64)]
891
892 cctx = zstd.ZstdCompressor(write_content_size=True, write_checksum=True)
893
894 frames = []
895 frames.extend(b'x' * 64 for i in range(256))
896 frames.extend(b'y' * 64 for i in range(256))
897
898 result = cctx.multi_compress_to_buffer(frames, threads=-1)
899
900 self.assertEqual(len(result), 512)
901 for i in range(512):
902 if i < 256:
903 self.assertEqual(result[i].tobytes(), reference[0])
904 else:
905 self.assertEqual(result[i].tobytes(), reference[1])