contrib/python-zstandard/tests/test_decompressor.py
changeset 42070:675775c33ab6
parent 40122:73fef626dae3
child 43999:de7838053207
--- a/contrib/python-zstandard/tests/test_decompressor.py	Thu Apr 04 15:24:03 2019 -0700
+++ b/contrib/python-zstandard/tests/test_decompressor.py	Thu Apr 04 17:34:43 2019 -0700
@@ -3,6 +3,7 @@
 import random
 import struct
 import sys
+import tempfile
 import unittest
 
 import zstandard as zstd
@@ -10,6 +11,7 @@
 from .common import (
     generate_samples,
     make_cffi,
+    NonClosingBytesIO,
     OpCountingBytesIO,
 )
 
@@ -219,7 +221,7 @@
         cctx = zstd.ZstdCompressor(write_content_size=False)
         frame = cctx.compress(source)
 
-        dctx = zstd.ZstdDecompressor(max_window_size=1)
+        dctx = zstd.ZstdDecompressor(max_window_size=2**zstd.WINDOWLOG_MIN)
 
         with self.assertRaisesRegexp(
             zstd.ZstdError, 'decompression error: Frame requires too much memory'):
@@ -302,19 +304,16 @@
         dctx = zstd.ZstdDecompressor()
 
         with dctx.stream_reader(b'foo') as reader:
-            with self.assertRaises(NotImplementedError):
+            with self.assertRaises(io.UnsupportedOperation):
                 reader.readline()
 
-            with self.assertRaises(NotImplementedError):
+            with self.assertRaises(io.UnsupportedOperation):
                 reader.readlines()
 
-            with self.assertRaises(NotImplementedError):
-                reader.readall()
-
-            with self.assertRaises(NotImplementedError):
+            with self.assertRaises(io.UnsupportedOperation):
                 iter(reader)
 
-            with self.assertRaises(NotImplementedError):
+            with self.assertRaises(io.UnsupportedOperation):
                 next(reader)
 
             with self.assertRaises(io.UnsupportedOperation):
@@ -347,15 +346,18 @@
             with self.assertRaisesRegexp(ValueError, 'stream is closed'):
                 reader.read(1)
 
-    def test_bad_read_size(self):
+    def test_read_sizes(self):
+        cctx = zstd.ZstdCompressor()
+        foo = cctx.compress(b'foo')
+
         dctx = zstd.ZstdDecompressor()
 
-        with dctx.stream_reader(b'foo') as reader:
-            with self.assertRaisesRegexp(ValueError, 'cannot read negative or size 0 amounts'):
-                reader.read(-1)
+        with dctx.stream_reader(foo) as reader:
+            with self.assertRaisesRegexp(ValueError, 'cannot read negative amounts less than -1'):
+                reader.read(-2)
 
-            with self.assertRaisesRegexp(ValueError, 'cannot read negative or size 0 amounts'):
-                reader.read(0)
+            self.assertEqual(reader.read(0), b'')
+            self.assertEqual(reader.read(), b'foo')
 
     def test_read_buffer(self):
         cctx = zstd.ZstdCompressor()
@@ -524,13 +526,243 @@
         reader = dctx.stream_reader(source)
 
         with reader:
-            with self.assertRaises(TypeError):
-                reader.read()
+            reader.read(0)
 
         with reader:
             with self.assertRaisesRegexp(ValueError, 'stream is closed'):
                 reader.read(100)
 
+    def test_partial_read(self):
+        # Inspired by https://github.com/indygreg/python-zstandard/issues/71.
+        buffer = io.BytesIO()
+        cctx = zstd.ZstdCompressor()
+        writer = cctx.stream_writer(buffer)
+        writer.write(bytearray(os.urandom(1000000)))
+        writer.flush(zstd.FLUSH_FRAME)
+        buffer.seek(0)
+
+        dctx = zstd.ZstdDecompressor()
+        reader = dctx.stream_reader(buffer)
+
+        while True:
+            chunk = reader.read(8192)
+            if not chunk:
+                break
+
+    def test_read_multiple_frames(self):
+        cctx = zstd.ZstdCompressor()
+        source = io.BytesIO()
+        writer = cctx.stream_writer(source)
+        writer.write(b'foo')
+        writer.flush(zstd.FLUSH_FRAME)
+        writer.write(b'bar')
+        writer.flush(zstd.FLUSH_FRAME)
+
+        dctx = zstd.ZstdDecompressor()
+
+        reader = dctx.stream_reader(source.getvalue())
+        self.assertEqual(reader.read(2), b'fo')
+        self.assertEqual(reader.read(2), b'o')
+        self.assertEqual(reader.read(2), b'ba')
+        self.assertEqual(reader.read(2), b'r')
+
+        source.seek(0)
+        reader = dctx.stream_reader(source)
+        self.assertEqual(reader.read(2), b'fo')
+        self.assertEqual(reader.read(2), b'o')
+        self.assertEqual(reader.read(2), b'ba')
+        self.assertEqual(reader.read(2), b'r')
+
+        reader = dctx.stream_reader(source.getvalue())
+        self.assertEqual(reader.read(3), b'foo')
+        self.assertEqual(reader.read(3), b'bar')
+
+        source.seek(0)
+        reader = dctx.stream_reader(source)
+        self.assertEqual(reader.read(3), b'foo')
+        self.assertEqual(reader.read(3), b'bar')
+
+        reader = dctx.stream_reader(source.getvalue())
+        self.assertEqual(reader.read(4), b'foo')
+        self.assertEqual(reader.read(4), b'bar')
+
+        source.seek(0)
+        reader = dctx.stream_reader(source)
+        self.assertEqual(reader.read(4), b'foo')
+        self.assertEqual(reader.read(4), b'bar')
+
+        reader = dctx.stream_reader(source.getvalue())
+        self.assertEqual(reader.read(128), b'foo')
+        self.assertEqual(reader.read(128), b'bar')
+
+        source.seek(0)
+        reader = dctx.stream_reader(source)
+        self.assertEqual(reader.read(128), b'foo')
+        self.assertEqual(reader.read(128), b'bar')
+
+        # Now test reads that span frames.
+        reader = dctx.stream_reader(source.getvalue(), read_across_frames=True)
+        self.assertEqual(reader.read(3), b'foo')
+        self.assertEqual(reader.read(3), b'bar')
+
+        source.seek(0)
+        reader = dctx.stream_reader(source, read_across_frames=True)
+        self.assertEqual(reader.read(3), b'foo')
+        self.assertEqual(reader.read(3), b'bar')
+
+        reader = dctx.stream_reader(source.getvalue(), read_across_frames=True)
+        self.assertEqual(reader.read(6), b'foobar')
+
+        source.seek(0)
+        reader = dctx.stream_reader(source, read_across_frames=True)
+        self.assertEqual(reader.read(6), b'foobar')
+
+        reader = dctx.stream_reader(source.getvalue(), read_across_frames=True)
+        self.assertEqual(reader.read(7), b'foobar')
+
+        source.seek(0)
+        reader = dctx.stream_reader(source, read_across_frames=True)
+        self.assertEqual(reader.read(7), b'foobar')
+
+        reader = dctx.stream_reader(source.getvalue(), read_across_frames=True)
+        self.assertEqual(reader.read(128), b'foobar')
+
+        source.seek(0)
+        reader = dctx.stream_reader(source, read_across_frames=True)
+        self.assertEqual(reader.read(128), b'foobar')
+
+    def test_readinto(self):
+        cctx = zstd.ZstdCompressor()
+        foo = cctx.compress(b'foo')
+
+        dctx = zstd.ZstdDecompressor()
+
+        # Attempting to readinto() a non-writable buffer fails.
+        # The exact exception varies based on the backend.
+        reader = dctx.stream_reader(foo)
+        with self.assertRaises(Exception):
+            reader.readinto(b'foobar')
+
+        # readinto() with sufficiently large destination.
+        b = bytearray(1024)
+        reader = dctx.stream_reader(foo)
+        self.assertEqual(reader.readinto(b), 3)
+        self.assertEqual(b[0:3], b'foo')
+        self.assertEqual(reader.readinto(b), 0)
+        self.assertEqual(b[0:3], b'foo')
+
+        # readinto() with small reads.
+        b = bytearray(1024)
+        reader = dctx.stream_reader(foo, read_size=1)
+        self.assertEqual(reader.readinto(b), 3)
+        self.assertEqual(b[0:3], b'foo')
+
+        # Too small destination buffer.
+        b = bytearray(2)
+        reader = dctx.stream_reader(foo)
+        self.assertEqual(reader.readinto(b), 2)
+        self.assertEqual(b[:], b'fo')
+
+    def test_readinto1(self):
+        cctx = zstd.ZstdCompressor()
+        foo = cctx.compress(b'foo')
+
+        dctx = zstd.ZstdDecompressor()
+
+        reader = dctx.stream_reader(foo)
+        with self.assertRaises(Exception):
+            reader.readinto1(b'foobar')
+
+        # Sufficiently large destination.
+        b = bytearray(1024)
+        reader = dctx.stream_reader(foo)
+        self.assertEqual(reader.readinto1(b), 3)
+        self.assertEqual(b[0:3], b'foo')
+        self.assertEqual(reader.readinto1(b), 0)
+        self.assertEqual(b[0:3], b'foo')
+
+        # readinto1() with small reads.
+        b = bytearray(1024)
+        reader = dctx.stream_reader(foo, read_size=1)
+        self.assertEqual(reader.readinto1(b), 3)
+        self.assertEqual(b[0:3], b'foo')
+
+        # Too small destination buffer.
+        b = bytearray(2)
+        reader = dctx.stream_reader(foo)
+        self.assertEqual(reader.readinto1(b), 2)
+        self.assertEqual(b[:], b'fo')
+
+    def test_readall(self):
+        cctx = zstd.ZstdCompressor()
+        foo = cctx.compress(b'foo')
+
+        dctx = zstd.ZstdDecompressor()
+        reader = dctx.stream_reader(foo)
+
+        self.assertEqual(reader.readall(), b'foo')
+
+    def test_read1(self):
+        cctx = zstd.ZstdCompressor()
+        foo = cctx.compress(b'foo')
+
+        dctx = zstd.ZstdDecompressor()
+
+        b = OpCountingBytesIO(foo)
+        reader = dctx.stream_reader(b)
+
+        self.assertEqual(reader.read1(), b'foo')
+        self.assertEqual(b._read_count, 1)
+
+        b = OpCountingBytesIO(foo)
+        reader = dctx.stream_reader(b)
+
+        self.assertEqual(reader.read1(0), b'')
+        self.assertEqual(reader.read1(2), b'fo')
+        self.assertEqual(b._read_count, 1)
+        self.assertEqual(reader.read1(1), b'o')
+        self.assertEqual(b._read_count, 1)
+        self.assertEqual(reader.read1(1), b'')
+        self.assertEqual(b._read_count, 2)
+
+    def test_read_lines(self):
+        cctx = zstd.ZstdCompressor()
+        source = b'\n'.join(('line %d' % i).encode('ascii') for i in range(1024))
+
+        frame = cctx.compress(source)
+
+        dctx = zstd.ZstdDecompressor()
+        reader = dctx.stream_reader(frame)
+        tr = io.TextIOWrapper(reader, encoding='utf-8')
+
+        lines = []
+        for line in tr:
+            lines.append(line.encode('utf-8'))
+
+        self.assertEqual(len(lines), 1024)
+        self.assertEqual(b''.join(lines), source)
+
+        reader = dctx.stream_reader(frame)
+        tr = io.TextIOWrapper(reader, encoding='utf-8')
+
+        lines = tr.readlines()
+        self.assertEqual(len(lines), 1024)
+        self.assertEqual(''.join(lines).encode('utf-8'), source)
+
+        reader = dctx.stream_reader(frame)
+        tr = io.TextIOWrapper(reader, encoding='utf-8')
+
+        lines = []
+        while True:
+            line = tr.readline()
+            if not line:
+                break
+
+            lines.append(line.encode('utf-8'))
+
+        self.assertEqual(len(lines), 1024)
+        self.assertEqual(b''.join(lines), source)
+
 
 @make_cffi
 class TestDecompressor_decompressobj(unittest.TestCase):
@@ -540,6 +772,9 @@
         dctx = zstd.ZstdDecompressor()
         dobj = dctx.decompressobj()
         self.assertEqual(dobj.decompress(data), b'foobar')
+        self.assertIsNone(dobj.flush())
+        self.assertIsNone(dobj.flush(10))
+        self.assertIsNone(dobj.flush(length=100))
 
     def test_input_types(self):
         compressed = zstd.ZstdCompressor(level=1).compress(b'foo')
@@ -557,7 +792,11 @@
 
         for source in sources:
             dobj = dctx.decompressobj()
+            self.assertIsNone(dobj.flush())
+            self.assertIsNone(dobj.flush(10))
+            self.assertIsNone(dobj.flush(length=100))
             self.assertEqual(dobj.decompress(source), b'foo')
+            self.assertIsNone(dobj.flush())
 
     def test_reuse(self):
         data = zstd.ZstdCompressor(level=1).compress(b'foobar')
@@ -568,6 +807,7 @@
 
         with self.assertRaisesRegexp(zstd.ZstdError, 'cannot use a decompressobj'):
             dobj.decompress(data)
+            self.assertIsNone(dobj.flush())
 
     def test_bad_write_size(self):
         dctx = zstd.ZstdDecompressor()
@@ -585,16 +825,141 @@
             dobj = dctx.decompressobj(write_size=i + 1)
             self.assertEqual(dobj.decompress(data), source)
 
+
 def decompress_via_writer(data):
     buffer = io.BytesIO()
     dctx = zstd.ZstdDecompressor()
-    with dctx.stream_writer(buffer) as decompressor:
-        decompressor.write(data)
+    decompressor = dctx.stream_writer(buffer)
+    decompressor.write(data)
+
     return buffer.getvalue()
 
 
 @make_cffi
 class TestDecompressor_stream_writer(unittest.TestCase):
+    def test_io_api(self):
+        buffer = io.BytesIO()
+        dctx = zstd.ZstdDecompressor()
+        writer = dctx.stream_writer(buffer)
+
+        self.assertFalse(writer.closed)
+        self.assertFalse(writer.isatty())
+        self.assertFalse(writer.readable())
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.readline()
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.readline(42)
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.readline(size=42)
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.readlines()
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.readlines(42)
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.readlines(hint=42)
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.seek(0)
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.seek(10, os.SEEK_SET)
+
+        self.assertFalse(writer.seekable())
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.tell()
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.truncate()
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.truncate(42)
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.truncate(size=42)
+
+        self.assertTrue(writer.writable())
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.writelines([])
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.read()
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.read(42)
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.read(size=42)
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.readall()
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.readinto(None)
+
+        with self.assertRaises(io.UnsupportedOperation):
+            writer.fileno()
+
+    def test_fileno_file(self):
+        with tempfile.TemporaryFile('wb') as tf:
+            dctx = zstd.ZstdDecompressor()
+            writer = dctx.stream_writer(tf)
+
+            self.assertEqual(writer.fileno(), tf.fileno())
+
+    def test_close(self):
+        foo = zstd.ZstdCompressor().compress(b'foo')
+
+        buffer = NonClosingBytesIO()
+        dctx = zstd.ZstdDecompressor()
+        writer = dctx.stream_writer(buffer)
+
+        writer.write(foo)
+        self.assertFalse(writer.closed)
+        self.assertFalse(buffer.closed)
+        writer.close()
+        self.assertTrue(writer.closed)
+        self.assertTrue(buffer.closed)
+
+        with self.assertRaisesRegexp(ValueError, 'stream is closed'):
+            writer.write(b'')
+
+        with self.assertRaisesRegexp(ValueError, 'stream is closed'):
+            writer.flush()
+
+        with self.assertRaisesRegexp(ValueError, 'stream is closed'):
+            with writer:
+                pass
+
+        self.assertEqual(buffer.getvalue(), b'foo')
+
+        # Context manager exit should close stream.
+        buffer = NonClosingBytesIO()
+        writer = dctx.stream_writer(buffer)
+
+        with writer:
+            writer.write(foo)
+
+        self.assertTrue(writer.closed)
+        self.assertEqual(buffer.getvalue(), b'foo')
+
+    def test_flush(self):
+        buffer = OpCountingBytesIO()
+        dctx = zstd.ZstdDecompressor()
+        writer = dctx.stream_writer(buffer)
+
+        writer.flush()
+        self.assertEqual(buffer._flush_count, 1)
+        writer.flush()
+        self.assertEqual(buffer._flush_count, 2)
+
     def test_empty_roundtrip(self):
         cctx = zstd.ZstdCompressor()
         empty = cctx.compress(b'')
@@ -616,9 +981,21 @@
         dctx = zstd.ZstdDecompressor()
         for source in sources:
             buffer = io.BytesIO()
+
+            decompressor = dctx.stream_writer(buffer)
+            decompressor.write(source)
+            self.assertEqual(buffer.getvalue(), b'foo')
+
+            buffer = NonClosingBytesIO()
+
             with dctx.stream_writer(buffer) as decompressor:
-                decompressor.write(source)
+                self.assertEqual(decompressor.write(source), 3)
+
+            self.assertEqual(buffer.getvalue(), b'foo')
 
+            buffer = io.BytesIO()
+            writer = dctx.stream_writer(buffer, write_return_read=True)
+            self.assertEqual(writer.write(source), len(source))
             self.assertEqual(buffer.getvalue(), b'foo')
 
     def test_large_roundtrip(self):
@@ -641,7 +1018,7 @@
         cctx = zstd.ZstdCompressor()
         compressed = cctx.compress(orig)
 
-        buffer = io.BytesIO()
+        buffer = NonClosingBytesIO()
         dctx = zstd.ZstdDecompressor()
         with dctx.stream_writer(buffer) as decompressor:
             pos = 0
@@ -651,6 +1028,17 @@
                 pos += 8192
         self.assertEqual(buffer.getvalue(), orig)
 
+        # Again with write_return_read=True
+        buffer = io.BytesIO()
+        writer = dctx.stream_writer(buffer, write_return_read=True)
+        pos = 0
+        while pos < len(compressed):
+            pos2 = pos + 8192
+            chunk = compressed[pos:pos2]
+            self.assertEqual(writer.write(chunk), len(chunk))
+            pos += 8192
+        self.assertEqual(buffer.getvalue(), orig)
+
     def test_dictionary(self):
         samples = []
         for i in range(128):
@@ -661,7 +1049,7 @@
         d = zstd.train_dictionary(8192, samples)
 
         orig = b'foobar' * 16384
-        buffer = io.BytesIO()
+        buffer = NonClosingBytesIO()
         cctx = zstd.ZstdCompressor(dict_data=d)
         with cctx.stream_writer(buffer) as compressor:
             self.assertEqual(compressor.write(orig), 0)
@@ -670,6 +1058,12 @@
         buffer = io.BytesIO()
 
         dctx = zstd.ZstdDecompressor(dict_data=d)
+        decompressor = dctx.stream_writer(buffer)
+        self.assertEqual(decompressor.write(compressed), len(orig))
+        self.assertEqual(buffer.getvalue(), orig)
+
+        buffer = NonClosingBytesIO()
+
         with dctx.stream_writer(buffer) as decompressor:
             self.assertEqual(decompressor.write(compressed), len(orig))
 
@@ -678,6 +1072,11 @@
     def test_memory_size(self):
         dctx = zstd.ZstdDecompressor()
         buffer = io.BytesIO()
+
+        decompressor = dctx.stream_writer(buffer)
+        size = decompressor.memory_size()
+        self.assertGreater(size, 100000)
+
         with dctx.stream_writer(buffer) as decompressor:
             size = decompressor.memory_size()
 
@@ -810,7 +1209,7 @@
     @unittest.skipUnless('ZSTD_SLOW_TESTS' in os.environ, 'ZSTD_SLOW_TESTS not set')
     def test_large_input(self):
         bytes = list(struct.Struct('>B').pack(i) for i in range(256))
-        compressed = io.BytesIO()
+        compressed = NonClosingBytesIO()
         input_size = 0
         cctx = zstd.ZstdCompressor(level=1)
         with cctx.stream_writer(compressed) as compressor:
@@ -823,7 +1222,7 @@
                 if have_compressed and have_raw:
                     break
 
-        compressed.seek(0)
+        compressed = io.BytesIO(compressed.getvalue())
         self.assertGreater(len(compressed.getvalue()),
                            zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE)
 
@@ -861,7 +1260,7 @@
 
         source = io.BytesIO()
 
-        compressed = io.BytesIO()
+        compressed = NonClosingBytesIO()
         with cctx.stream_writer(compressed) as compressor:
             for i in range(256):
                 chunk = b'\0' * 1024
@@ -874,7 +1273,7 @@
                                  max_output_size=len(source.getvalue()))
         self.assertEqual(simple, source.getvalue())
 
-        compressed.seek(0)
+        compressed = io.BytesIO(compressed.getvalue())
         streamed = b''.join(dctx.read_to_iter(compressed))
         self.assertEqual(streamed, source.getvalue())
 
@@ -1001,6 +1400,9 @@
     def test_invalid_inputs(self):
         dctx = zstd.ZstdDecompressor()
 
+        if not hasattr(dctx, 'multi_decompress_to_buffer'):
+            self.skipTest('multi_decompress_to_buffer not available')
+
         with self.assertRaises(TypeError):
             dctx.multi_decompress_to_buffer(True)
 
@@ -1020,6 +1422,10 @@
         frames = [cctx.compress(d) for d in original]
 
         dctx = zstd.ZstdDecompressor()
+
+        if not hasattr(dctx, 'multi_decompress_to_buffer'):
+            self.skipTest('multi_decompress_to_buffer not available')
+
         result = dctx.multi_decompress_to_buffer(frames)
 
         self.assertEqual(len(result), len(frames))
@@ -1041,6 +1447,10 @@
         sizes = struct.pack('=' + 'Q' * len(original), *map(len, original))
 
         dctx = zstd.ZstdDecompressor()
+
+        if not hasattr(dctx, 'multi_decompress_to_buffer'):
+            self.skipTest('multi_decompress_to_buffer not available')
+
         result = dctx.multi_decompress_to_buffer(frames, decompressed_sizes=sizes)
 
         self.assertEqual(len(result), len(frames))
@@ -1057,6 +1467,9 @@
 
         dctx = zstd.ZstdDecompressor()
 
+        if not hasattr(dctx, 'multi_decompress_to_buffer'):
+            self.skipTest('multi_decompress_to_buffer not available')
+
         segments = struct.pack('=QQQQ', 0, len(frames[0]), len(frames[0]), len(frames[1]))
         b = zstd.BufferWithSegments(b''.join(frames), segments)
 
@@ -1074,12 +1487,16 @@
         frames = [cctx.compress(d) for d in original]
         sizes = struct.pack('=' + 'Q' * len(original), *map(len, original))
 
+        dctx = zstd.ZstdDecompressor()
+
+        if not hasattr(dctx, 'multi_decompress_to_buffer'):
+            self.skipTest('multi_decompress_to_buffer not available')
+
         segments = struct.pack('=QQQQQQ', 0, len(frames[0]),
                                len(frames[0]), len(frames[1]),
                                len(frames[0]) + len(frames[1]), len(frames[2]))
         b = zstd.BufferWithSegments(b''.join(frames), segments)
 
-        dctx = zstd.ZstdDecompressor()
         result = dctx.multi_decompress_to_buffer(b, decompressed_sizes=sizes)
 
         self.assertEqual(len(result), len(frames))
@@ -1099,10 +1516,14 @@
             b'foo4' * 6,
         ]
 
+        if not hasattr(cctx, 'multi_compress_to_buffer'):
+            self.skipTest('multi_compress_to_buffer not available')
+
         frames = cctx.multi_compress_to_buffer(original)
 
         # Check round trip.
         dctx = zstd.ZstdDecompressor()
+
         decompressed = dctx.multi_decompress_to_buffer(frames, threads=3)
 
         self.assertEqual(len(decompressed), len(original))
@@ -1138,7 +1559,12 @@
         frames = [cctx.compress(s) for s in generate_samples()]
 
         dctx = zstd.ZstdDecompressor(dict_data=d)
+
+        if not hasattr(dctx, 'multi_decompress_to_buffer'):
+            self.skipTest('multi_decompress_to_buffer not available')
+
         result = dctx.multi_decompress_to_buffer(frames)
+
         self.assertEqual([o.tobytes() for o in result], generate_samples())
 
     def test_multiple_threads(self):
@@ -1149,6 +1575,10 @@
         frames.extend(cctx.compress(b'y' * 64) for i in range(256))
 
         dctx = zstd.ZstdDecompressor()
+
+        if not hasattr(dctx, 'multi_decompress_to_buffer'):
+            self.skipTest('multi_decompress_to_buffer not available')
+
         result = dctx.multi_decompress_to_buffer(frames, threads=-1)
 
         self.assertEqual(len(result), len(frames))
@@ -1164,6 +1594,9 @@
 
         dctx = zstd.ZstdDecompressor()
 
+        if not hasattr(dctx, 'multi_decompress_to_buffer'):
+            self.skipTest('multi_decompress_to_buffer not available')
+
         with self.assertRaisesRegexp(zstd.ZstdError,
                                      'error decompressing item 1: ('
                                      'Corrupted block|'