Mercurial > hg
diff contrib/python-zstandard/tests/test_decompressor.py @ 44147:5e84a96d865b
python-zstandard: blacken at 80 characters
I made this change upstream and it will make it into the next
release of python-zstandard. I figured I'd send it Mercurial's
way because it will allow us to drop this directory from the black
exclusion list.
# skip-blame blackening
Differential Revision: https://phab.mercurial-scm.org/D7937
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 22 Jan 2020 22:23:04 -0800 |
parents | de7838053207 |
children | 493034cc3265 |
line wrap: on
line diff
--- a/contrib/python-zstandard/tests/test_decompressor.py Tue Jan 21 15:45:06 2020 -0800 +++ b/contrib/python-zstandard/tests/test_decompressor.py Wed Jan 22 22:23:04 2020 -0800 @@ -170,11 +170,15 @@ dctx.decompress(compressed, max_output_size=len(source) - 1) # Input size + 1 works - decompressed = dctx.decompress(compressed, max_output_size=len(source) + 1) + decompressed = dctx.decompress( + compressed, max_output_size=len(source) + 1 + ) self.assertEqual(decompressed, source) # A much larger buffer works. - decompressed = dctx.decompress(compressed, max_output_size=len(source) * 64) + decompressed = dctx.decompress( + compressed, max_output_size=len(source) * 64 + ) self.assertEqual(decompressed, source) def test_stupidly_large_output_buffer(self): @@ -237,7 +241,8 @@ dctx = zstd.ZstdDecompressor(max_window_size=2 ** zstd.WINDOWLOG_MIN) with self.assertRaisesRegex( - zstd.ZstdError, "decompression error: Frame requires too much memory" + zstd.ZstdError, + "decompression error: Frame requires too much memory", ): dctx.decompress(frame, max_output_size=len(source)) @@ -291,7 +296,9 @@ self.assertEqual(w, len(source.getvalue())) def test_read_write_size(self): - source = OpCountingBytesIO(zstd.ZstdCompressor().compress(b"foobarfoobar")) + source = OpCountingBytesIO( + zstd.ZstdCompressor().compress(b"foobarfoobar") + ) dest = OpCountingBytesIO() dctx = zstd.ZstdDecompressor() @@ -309,7 +316,9 @@ dctx = zstd.ZstdDecompressor() with dctx.stream_reader(b"foo") as reader: - with self.assertRaisesRegex(ValueError, "cannot __enter__ multiple times"): + with self.assertRaisesRegex( + ValueError, "cannot __enter__ multiple times" + ): with reader as reader2: pass @@ -474,7 +483,9 @@ dctx = zstd.ZstdDecompressor() with dctx.stream_reader(frame) as reader: - with self.assertRaisesRegex(ValueError, "cannot seek to negative position"): + with self.assertRaisesRegex( + ValueError, "cannot seek to negative position" + ): reader.seek(-1, os.SEEK_SET) reader.read(1) @@ -490,7 +501,8 @@ reader.seek(-1, os.SEEK_CUR) with self.assertRaisesRegex( - ValueError, "zstd decompression streams cannot be seeked with SEEK_END" + ValueError, + "zstd decompression streams cannot be seeked with SEEK_END", ): reader.seek(0, os.SEEK_END) @@ -743,7 +755,9 @@ def test_read_lines(self): cctx = zstd.ZstdCompressor() - source = b"\n".join(("line %d" % i).encode("ascii") for i in range(1024)) + source = b"\n".join( + ("line %d" % i).encode("ascii") for i in range(1024) + ) frame = cctx.compress(source) @@ -821,7 +835,9 @@ dobj = dctx.decompressobj() dobj.decompress(data) - with self.assertRaisesRegex(zstd.ZstdError, "cannot use a decompressobj"): + with self.assertRaisesRegex( + zstd.ZstdError, "cannot use a decompressobj" + ): dobj.decompress(data) self.assertIsNone(dobj.flush()) @@ -1124,7 +1140,9 @@ # Buffer protocol works. dctx.read_to_iter(b"foobar") - with self.assertRaisesRegex(ValueError, "must pass an object with a read"): + with self.assertRaisesRegex( + ValueError, "must pass an object with a read" + ): b"".join(dctx.read_to_iter(True)) def test_empty_input(self): @@ -1226,7 +1244,9 @@ decompressed = b"".join(chunks) self.assertEqual(decompressed, source.getvalue()) - @unittest.skipUnless("ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set") + @unittest.skipUnless( + "ZSTD_SLOW_TESTS" in os.environ, "ZSTD_SLOW_TESTS not set" + ) def test_large_input(self): bytes = list(struct.Struct(">B").pack(i) for i in range(256)) compressed = NonClosingBytesIO() @@ -1241,13 +1261,16 @@ len(compressed.getvalue()) > zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE ) - have_raw = input_size > zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE * 2 + have_raw = ( + input_size > zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE * 2 + ) if have_compressed and have_raw: break compressed = io.BytesIO(compressed.getvalue()) self.assertGreater( - len(compressed.getvalue()), zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE + len(compressed.getvalue()), + zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE, ) dctx = zstd.ZstdDecompressor() @@ -1303,7 +1326,9 @@ self.assertEqual(streamed, source.getvalue()) def test_read_write_size(self): - source = OpCountingBytesIO(zstd.ZstdCompressor().compress(b"foobarfoobar")) + source = OpCountingBytesIO( + zstd.ZstdCompressor().compress(b"foobarfoobar") + ) dctx = zstd.ZstdDecompressor() for chunk in dctx.read_to_iter(source, read_size=1, write_size=1): self.assertEqual(len(chunk), 1) @@ -1355,10 +1380,14 @@ ): dctx.decompress_content_dict_chain([zstd.FRAME_HEADER]) - with self.assertRaisesRegex(ValueError, "chunk 0 is not a valid zstd frame"): + with self.assertRaisesRegex( + ValueError, "chunk 0 is not a valid zstd frame" + ): dctx.decompress_content_dict_chain([b"foo" * 8]) - no_size = zstd.ZstdCompressor(write_content_size=False).compress(b"foo" * 64) + no_size = zstd.ZstdCompressor(write_content_size=False).compress( + b"foo" * 64 + ) with self.assertRaisesRegex( ValueError, "chunk 0 missing content size in frame" @@ -1389,10 +1418,14 @@ ): dctx.decompress_content_dict_chain([initial, zstd.FRAME_HEADER]) - with self.assertRaisesRegex(ValueError, "chunk 1 is not a valid zstd frame"): + with self.assertRaisesRegex( + ValueError, "chunk 1 is not a valid zstd frame" + ): dctx.decompress_content_dict_chain([initial, b"foo" * 8]) - no_size = zstd.ZstdCompressor(write_content_size=False).compress(b"foo" * 64) + no_size = zstd.ZstdCompressor(write_content_size=False).compress( + b"foo" * 64 + ) with self.assertRaisesRegex( ValueError, "chunk 1 missing content size in frame" @@ -1400,7 +1433,9 @@ dctx.decompress_content_dict_chain([initial, no_size]) # Corrupt second frame. - cctx = zstd.ZstdCompressor(dict_data=zstd.ZstdCompressionDict(b"foo" * 64)) + cctx = zstd.ZstdCompressor( + dict_data=zstd.ZstdCompressionDict(b"foo" * 64) + ) frame = cctx.compress(b"bar" * 64) frame = frame[0:12] + frame[15:] @@ -1447,7 +1482,9 @@ with self.assertRaises(TypeError): dctx.multi_decompress_to_buffer((1, 2)) - with self.assertRaisesRegex(TypeError, "item 0 not a bytes like object"): + with self.assertRaisesRegex( + TypeError, "item 0 not a bytes like object" + ): dctx.multi_decompress_to_buffer([u"foo"]) with self.assertRaisesRegex( @@ -1491,7 +1528,9 @@ if not hasattr(dctx, "multi_decompress_to_buffer"): self.skipTest("multi_decompress_to_buffer not available") - result = dctx.multi_decompress_to_buffer(frames, decompressed_sizes=sizes) + result = dctx.multi_decompress_to_buffer( + frames, decompressed_sizes=sizes + ) self.assertEqual(len(result), len(frames)) self.assertEqual(result.size(), sum(map(len, original))) @@ -1582,10 +1621,15 @@ # And a manual mode. b = b"".join([frames[0].tobytes(), frames[1].tobytes()]) b1 = zstd.BufferWithSegments( - b, struct.pack("=QQQQ", 0, len(frames[0]), len(frames[0]), len(frames[1])) + b, + struct.pack( + "=QQQQ", 0, len(frames[0]), len(frames[0]), len(frames[1]) + ), ) - b = b"".join([frames[2].tobytes(), frames[3].tobytes(), frames[4].tobytes()]) + b = b"".join( + [frames[2].tobytes(), frames[3].tobytes(), frames[4].tobytes()] + ) b2 = zstd.BufferWithSegments( b, struct.pack(