--- a/contrib/python-zstandard/tests/test_compressor.py Thu Feb 09 21:44:32 2017 -0500
+++ b/contrib/python-zstandard/tests/test_compressor.py Tue Feb 07 23:24:47 2017 -0800
@@ -10,7 +10,10 @@
import zstd
-from .common import OpCountingBytesIO
+from .common import (
+ make_cffi,
+ OpCountingBytesIO,
+)
if sys.version_info[0] >= 3:
@@ -19,6 +22,7 @@
next = lambda it: it.next()
+@make_cffi
class TestCompressor(unittest.TestCase):
def test_level_bounds(self):
with self.assertRaises(ValueError):
@@ -28,18 +32,17 @@
zstd.ZstdCompressor(level=23)
+@make_cffi
class TestCompressor_compress(unittest.TestCase):
def test_compress_empty(self):
cctx = zstd.ZstdCompressor(level=1)
- cctx.compress(b'')
-
- cctx = zstd.ZstdCompressor(level=22)
- cctx.compress(b'')
-
- def test_compress_empty(self):
- cctx = zstd.ZstdCompressor(level=1)
- self.assertEqual(cctx.compress(b''),
- b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
+ result = cctx.compress(b'')
+ self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
+ params = zstd.get_frame_parameters(result)
+ self.assertEqual(params.content_size, 0)
+ self.assertEqual(params.window_size, 524288)
+ self.assertEqual(params.dict_id, 0)
+ self.assertFalse(params.has_checksum, 0)
# TODO should be temporary until https://github.com/facebook/zstd/issues/506
# is fixed.
@@ -59,6 +62,13 @@
self.assertEqual(len(result), 999)
self.assertEqual(result[0:4], b'\x28\xb5\x2f\xfd')
+ # This matches the test for read_from() below.
+ cctx = zstd.ZstdCompressor(level=1)
+ result = cctx.compress(b'f' * zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + b'o')
+ self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x40\x54\x00\x00'
+ b'\x10\x66\x66\x01\x00\xfb\xff\x39\xc0'
+ b'\x02\x09\x00\x00\x6f')
+
def test_write_checksum(self):
cctx = zstd.ZstdCompressor(level=1)
no_checksum = cctx.compress(b'foobar')
@@ -67,6 +77,12 @@
self.assertEqual(len(with_checksum), len(no_checksum) + 4)
+ no_params = zstd.get_frame_parameters(no_checksum)
+ with_params = zstd.get_frame_parameters(with_checksum)
+
+ self.assertFalse(no_params.has_checksum)
+ self.assertTrue(with_params.has_checksum)
+
def test_write_content_size(self):
cctx = zstd.ZstdCompressor(level=1)
no_size = cctx.compress(b'foobar' * 256)
@@ -75,6 +91,11 @@
self.assertEqual(len(with_size), len(no_size) + 1)
+ no_params = zstd.get_frame_parameters(no_size)
+ with_params = zstd.get_frame_parameters(with_size)
+ self.assertEqual(no_params.content_size, 0)
+ self.assertEqual(with_params.content_size, 1536)
+
def test_no_dict_id(self):
samples = []
for i in range(128):
@@ -92,6 +113,11 @@
self.assertEqual(len(with_dict_id), len(no_dict_id) + 4)
+ no_params = zstd.get_frame_parameters(no_dict_id)
+ with_params = zstd.get_frame_parameters(with_dict_id)
+ self.assertEqual(no_params.dict_id, 0)
+ self.assertEqual(with_params.dict_id, 1584102229)
+
def test_compress_dict_multiple(self):
samples = []
for i in range(128):
@@ -107,6 +133,7 @@
cctx.compress(b'foo bar foobar foo bar foobar')
+@make_cffi
class TestCompressor_compressobj(unittest.TestCase):
def test_compressobj_empty(self):
cctx = zstd.ZstdCompressor(level=1)
@@ -127,6 +154,12 @@
self.assertEqual(len(result), 999)
self.assertEqual(result[0:4], b'\x28\xb5\x2f\xfd')
+ params = zstd.get_frame_parameters(result)
+ self.assertEqual(params.content_size, 0)
+ self.assertEqual(params.window_size, 1048576)
+ self.assertEqual(params.dict_id, 0)
+ self.assertFalse(params.has_checksum)
+
def test_write_checksum(self):
cctx = zstd.ZstdCompressor(level=1)
cobj = cctx.compressobj()
@@ -135,6 +168,15 @@
cobj = cctx.compressobj()
with_checksum = cobj.compress(b'foobar') + cobj.flush()
+ no_params = zstd.get_frame_parameters(no_checksum)
+ with_params = zstd.get_frame_parameters(with_checksum)
+ self.assertEqual(no_params.content_size, 0)
+ self.assertEqual(with_params.content_size, 0)
+ self.assertEqual(no_params.dict_id, 0)
+ self.assertEqual(with_params.dict_id, 0)
+ self.assertFalse(no_params.has_checksum)
+ self.assertTrue(with_params.has_checksum)
+
self.assertEqual(len(with_checksum), len(no_checksum) + 4)
def test_write_content_size(self):
@@ -145,6 +187,15 @@
cobj = cctx.compressobj(size=len(b'foobar' * 256))
with_size = cobj.compress(b'foobar' * 256) + cobj.flush()
+ no_params = zstd.get_frame_parameters(no_size)
+ with_params = zstd.get_frame_parameters(with_size)
+ self.assertEqual(no_params.content_size, 0)
+ self.assertEqual(with_params.content_size, 1536)
+ self.assertEqual(no_params.dict_id, 0)
+ self.assertEqual(with_params.dict_id, 0)
+ self.assertFalse(no_params.has_checksum)
+ self.assertFalse(with_params.has_checksum)
+
self.assertEqual(len(with_size), len(no_size) + 1)
def test_compress_after_finished(self):
@@ -187,6 +238,7 @@
self.assertEqual(header, b'\x01\x00\x00')
+@make_cffi
class TestCompressor_copy_stream(unittest.TestCase):
def test_no_read(self):
source = object()
@@ -229,6 +281,12 @@
self.assertEqual(r, 255 * 16384)
self.assertEqual(w, 999)
+ params = zstd.get_frame_parameters(dest.getvalue())
+ self.assertEqual(params.content_size, 0)
+ self.assertEqual(params.window_size, 1048576)
+ self.assertEqual(params.dict_id, 0)
+ self.assertFalse(params.has_checksum)
+
def test_write_checksum(self):
source = io.BytesIO(b'foobar')
no_checksum = io.BytesIO()
@@ -244,6 +302,15 @@
self.assertEqual(len(with_checksum.getvalue()),
len(no_checksum.getvalue()) + 4)
+ no_params = zstd.get_frame_parameters(no_checksum.getvalue())
+ with_params = zstd.get_frame_parameters(with_checksum.getvalue())
+ self.assertEqual(no_params.content_size, 0)
+ self.assertEqual(with_params.content_size, 0)
+ self.assertEqual(no_params.dict_id, 0)
+ self.assertEqual(with_params.dict_id, 0)
+ self.assertFalse(no_params.has_checksum)
+ self.assertTrue(with_params.has_checksum)
+
def test_write_content_size(self):
source = io.BytesIO(b'foobar' * 256)
no_size = io.BytesIO()
@@ -268,6 +335,15 @@
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()) + 1)
+ no_params = zstd.get_frame_parameters(no_size.getvalue())
+ with_params = zstd.get_frame_parameters(with_size.getvalue())
+ self.assertEqual(no_params.content_size, 0)
+ self.assertEqual(with_params.content_size, 1536)
+ self.assertEqual(no_params.dict_id, 0)
+ self.assertEqual(with_params.dict_id, 0)
+ self.assertFalse(no_params.has_checksum)
+ self.assertFalse(with_params.has_checksum)
+
def test_read_write_size(self):
source = OpCountingBytesIO(b'foobarfoobar')
dest = OpCountingBytesIO()
@@ -288,18 +364,25 @@
return buffer.getvalue()
+@make_cffi
class TestCompressor_write_to(unittest.TestCase):
def test_empty(self):
- self.assertEqual(compress(b'', 1),
- b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
+ result = compress(b'', 1)
+ self.assertEqual(result, b'\x28\xb5\x2f\xfd\x00\x48\x01\x00\x00')
+
+ params = zstd.get_frame_parameters(result)
+ self.assertEqual(params.content_size, 0)
+ self.assertEqual(params.window_size, 524288)
+ self.assertEqual(params.dict_id, 0)
+ self.assertFalse(params.has_checksum)
def test_multiple_compress(self):
buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=5)
with cctx.write_to(buffer) as compressor:
- compressor.write(b'foo')
- compressor.write(b'bar')
- compressor.write(b'x' * 8192)
+ self.assertEqual(compressor.write(b'foo'), 0)
+ self.assertEqual(compressor.write(b'bar'), 0)
+ self.assertEqual(compressor.write(b'x' * 8192), 0)
result = buffer.getvalue()
self.assertEqual(result,
@@ -318,11 +401,23 @@
buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(level=9, dict_data=d)
with cctx.write_to(buffer) as compressor:
- compressor.write(b'foo')
- compressor.write(b'bar')
- compressor.write(b'foo' * 16384)
+ self.assertEqual(compressor.write(b'foo'), 0)
+ self.assertEqual(compressor.write(b'bar'), 0)
+ self.assertEqual(compressor.write(b'foo' * 16384), 634)
compressed = buffer.getvalue()
+
+ params = zstd.get_frame_parameters(compressed)
+ self.assertEqual(params.content_size, 0)
+ self.assertEqual(params.window_size, 1024)
+ self.assertEqual(params.dict_id, d.dict_id())
+ self.assertFalse(params.has_checksum)
+
+ self.assertEqual(compressed[0:32],
+ b'\x28\xb5\x2f\xfd\x03\x00\x55\x7b\x6b\x5e\x54\x00'
+ b'\x00\x00\x02\xfc\xf4\xa5\xba\x23\x3f\x85\xb3\x54'
+ b'\x00\x00\x18\x6f\x6f\x66\x01\x00')
+
h = hashlib.sha1(compressed).hexdigest()
self.assertEqual(h, '1c5bcd25181bcd8c1a73ea8773323e0056129f92')
@@ -332,11 +427,18 @@
buffer = io.BytesIO()
cctx = zstd.ZstdCompressor(compression_params=params)
with cctx.write_to(buffer) as compressor:
- compressor.write(b'foo')
- compressor.write(b'bar')
- compressor.write(b'foobar' * 16384)
+ self.assertEqual(compressor.write(b'foo'), 0)
+ self.assertEqual(compressor.write(b'bar'), 0)
+ self.assertEqual(compressor.write(b'foobar' * 16384), 0)
compressed = buffer.getvalue()
+
+ params = zstd.get_frame_parameters(compressed)
+ self.assertEqual(params.content_size, 0)
+ self.assertEqual(params.window_size, 1048576)
+ self.assertEqual(params.dict_id, 0)
+ self.assertFalse(params.has_checksum)
+
h = hashlib.sha1(compressed).hexdigest()
self.assertEqual(h, '1ae31f270ed7de14235221a604b31ecd517ebd99')
@@ -344,12 +446,21 @@
no_checksum = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1)
with cctx.write_to(no_checksum) as compressor:
- compressor.write(b'foobar')
+ self.assertEqual(compressor.write(b'foobar'), 0)
with_checksum = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, write_checksum=True)
with cctx.write_to(with_checksum) as compressor:
- compressor.write(b'foobar')
+ self.assertEqual(compressor.write(b'foobar'), 0)
+
+ no_params = zstd.get_frame_parameters(no_checksum.getvalue())
+ with_params = zstd.get_frame_parameters(with_checksum.getvalue())
+ self.assertEqual(no_params.content_size, 0)
+ self.assertEqual(with_params.content_size, 0)
+ self.assertEqual(no_params.dict_id, 0)
+ self.assertEqual(with_params.dict_id, 0)
+ self.assertFalse(no_params.has_checksum)
+ self.assertTrue(with_params.has_checksum)
self.assertEqual(len(with_checksum.getvalue()),
len(no_checksum.getvalue()) + 4)
@@ -358,12 +469,12 @@
no_size = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1)
with cctx.write_to(no_size) as compressor:
- compressor.write(b'foobar' * 256)
+ self.assertEqual(compressor.write(b'foobar' * 256), 0)
with_size = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, write_content_size=True)
with cctx.write_to(with_size) as compressor:
- compressor.write(b'foobar' * 256)
+ self.assertEqual(compressor.write(b'foobar' * 256), 0)
# Source size is not known in streaming mode, so header not
# written.
@@ -373,7 +484,16 @@
# Declaring size will write the header.
with_size = io.BytesIO()
with cctx.write_to(with_size, size=len(b'foobar' * 256)) as compressor:
- compressor.write(b'foobar' * 256)
+ self.assertEqual(compressor.write(b'foobar' * 256), 0)
+
+ no_params = zstd.get_frame_parameters(no_size.getvalue())
+ with_params = zstd.get_frame_parameters(with_size.getvalue())
+ self.assertEqual(no_params.content_size, 0)
+ self.assertEqual(with_params.content_size, 1536)
+ self.assertEqual(no_params.dict_id, 0)
+ self.assertEqual(with_params.dict_id, 0)
+ self.assertFalse(no_params.has_checksum)
+ self.assertFalse(with_params.has_checksum)
self.assertEqual(len(with_size.getvalue()),
len(no_size.getvalue()) + 1)
@@ -390,12 +510,21 @@
with_dict_id = io.BytesIO()
cctx = zstd.ZstdCompressor(level=1, dict_data=d)
with cctx.write_to(with_dict_id) as compressor:
- compressor.write(b'foobarfoobar')
+ self.assertEqual(compressor.write(b'foobarfoobar'), 0)
cctx = zstd.ZstdCompressor(level=1, dict_data=d, write_dict_id=False)
no_dict_id = io.BytesIO()
with cctx.write_to(no_dict_id) as compressor:
- compressor.write(b'foobarfoobar')
+ self.assertEqual(compressor.write(b'foobarfoobar'), 0)
+
+ no_params = zstd.get_frame_parameters(no_dict_id.getvalue())
+ with_params = zstd.get_frame_parameters(with_dict_id.getvalue())
+ self.assertEqual(no_params.content_size, 0)
+ self.assertEqual(with_params.content_size, 0)
+ self.assertEqual(no_params.dict_id, 0)
+ self.assertEqual(with_params.dict_id, d.dict_id())
+ self.assertFalse(no_params.has_checksum)
+ self.assertFalse(with_params.has_checksum)
self.assertEqual(len(with_dict_id.getvalue()),
len(no_dict_id.getvalue()) + 4)
@@ -412,9 +541,9 @@
cctx = zstd.ZstdCompressor(level=3)
dest = OpCountingBytesIO()
with cctx.write_to(dest, write_size=1) as compressor:
- compressor.write(b'foo')
- compressor.write(b'bar')
- compressor.write(b'foobar')
+ self.assertEqual(compressor.write(b'foo'), 0)
+ self.assertEqual(compressor.write(b'bar'), 0)
+ self.assertEqual(compressor.write(b'foobar'), 0)
self.assertEqual(len(dest.getvalue()), dest._write_count)
@@ -422,15 +551,15 @@
cctx = zstd.ZstdCompressor(level=3)
dest = OpCountingBytesIO()
with cctx.write_to(dest) as compressor:
- compressor.write(b'foo')
+ self.assertEqual(compressor.write(b'foo'), 0)
self.assertEqual(dest._write_count, 0)
- compressor.flush()
+ self.assertEqual(compressor.flush(), 12)
self.assertEqual(dest._write_count, 1)
- compressor.write(b'bar')
+ self.assertEqual(compressor.write(b'bar'), 0)
self.assertEqual(dest._write_count, 1)
- compressor.flush()
+ self.assertEqual(compressor.flush(), 6)
self.assertEqual(dest._write_count, 2)
- compressor.write(b'baz')
+ self.assertEqual(compressor.write(b'baz'), 0)
self.assertEqual(dest._write_count, 3)
@@ -438,10 +567,10 @@
cctx = zstd.ZstdCompressor(level=3, write_checksum=True)
dest = OpCountingBytesIO()
with cctx.write_to(dest) as compressor:
- compressor.write(b'foobar' * 8192)
+ self.assertEqual(compressor.write(b'foobar' * 8192), 0)
count = dest._write_count
offset = dest.tell()
- compressor.flush()
+ self.assertEqual(compressor.flush(), 23)
self.assertGreater(dest._write_count, count)
self.assertGreater(dest.tell(), offset)
offset = dest.tell()
@@ -456,18 +585,22 @@
self.assertEqual(header, b'\x01\x00\x00')
+@make_cffi
class TestCompressor_read_from(unittest.TestCase):
def test_type_validation(self):
cctx = zstd.ZstdCompressor()
# Object with read() works.
- cctx.read_from(io.BytesIO())
+ for chunk in cctx.read_from(io.BytesIO()):
+ pass
# Buffer protocol works.
- cctx.read_from(b'foobar')
+ for chunk in cctx.read_from(b'foobar'):
+ pass
with self.assertRaisesRegexp(ValueError, 'must pass an object with a read'):
- cctx.read_from(True)
+ for chunk in cctx.read_from(True):
+ pass
def test_read_empty(self):
cctx = zstd.ZstdCompressor(level=1)
@@ -521,6 +654,12 @@
# We should get the same output as the one-shot compression mechanism.
self.assertEqual(b''.join(chunks), cctx.compress(source.getvalue()))
+ params = zstd.get_frame_parameters(b''.join(chunks))
+ self.assertEqual(params.content_size, 0)
+ self.assertEqual(params.window_size, 262144)
+ self.assertEqual(params.dict_id, 0)
+ self.assertFalse(params.has_checksum)
+
# Now check the buffer protocol.
it = cctx.read_from(source.getvalue())
chunks = list(it)