view contrib/python-zstandard/tests/test_data_structures.py @ 45095:8e04607023e5

procutil: ensure that procutil.std{out,err}.write() writes all bytes. Python 3 offers different kinds of streams, and it’s not guaranteed for all of them that calling write() writes all bytes. When Python is started in unbuffered mode, sys.std{out,err}.buffer are instances of io.FileIO, whose write() can write fewer bytes for platform-specific reasons (e.g. Linux has a 0x7ffff000-byte maximum and could write less if interrupted by a signal; when writing to Windows consoles, it’s limited to 32767 bytes to avoid the "not enough space" error). This can lead to silent loss of data, both when using sys.std{out,err}.buffer (which may in fact not be a buffered stream) and when using the text streams sys.std{out,err} (I’ve created a CPython bug report for that: https://bugs.python.org/issue41221). Python may fix the problem at some point. For now, we implement our own wrapper for procutil.std{out,err} that calls the raw stream’s write() method until all bytes have been written. We don’t use sys.std{out,err} for larger writes, so I think it’s not worth the effort to patch them.
author Manuel Jacob <me@manueljacob.de>
date Fri, 10 Jul 2020 12:27:58 +0200
parents 5e84a96d865b
children
line wrap: on
line source

import sys
import unittest

import zstandard as zstd

from .common import (
    make_cffi,
    TestCase,
)


@make_cffi
class TestCompressionParameters(TestCase):
    # Covers construction of zstd.ZstdCompressionParameters and the
    # attributes exposed by the resulting objects. Notes are written as
    # comments rather than docstrings so that the descriptions unittest
    # reports for each test stay exactly as they were.

    def test_bounds(self):
        # Constructing from the *_MIN constants must not raise; min_match is
        # passed one above its minimum.
        lower = {
            "window_log": zstd.WINDOWLOG_MIN,
            "chain_log": zstd.CHAINLOG_MIN,
            "hash_log": zstd.HASHLOG_MIN,
            "search_log": zstd.SEARCHLOG_MIN,
            "min_match": zstd.MINMATCH_MIN + 1,
            "target_length": zstd.TARGETLENGTH_MIN,
            "strategy": zstd.STRATEGY_FAST,
        }
        zstd.ZstdCompressionParameters(**lower)

        # Same for the *_MAX constants; min_match is passed one below its
        # maximum.
        upper = {
            "window_log": zstd.WINDOWLOG_MAX,
            "chain_log": zstd.CHAINLOG_MAX,
            "hash_log": zstd.HASHLOG_MAX,
            "search_log": zstd.SEARCHLOG_MAX,
            "min_match": zstd.MINMATCH_MAX - 1,
            "target_length": zstd.TARGETLENGTH_MAX,
            "strategy": zstd.STRATEGY_BTULTRA2,
        }
        zstd.ZstdCompressionParameters(**upper)

    def test_from_level(self):
        # from_level() yields a CompressionParameters instance, and both a
        # positive and a negative level report a window_log of 19.
        level_one = zstd.ZstdCompressionParameters.from_level(1)
        self.assertIsInstance(level_one, zstd.CompressionParameters)

        self.assertEqual(level_one.window_log, 19)

        negative_level = zstd.ZstdCompressionParameters.from_level(-4)
        self.assertEqual(negative_level.window_log, 19)

    def test_members(self):
        # Every value handed to the constructor must be readable back from
        # the matching attribute.
        explicit = zstd.ZstdCompressionParameters(
            window_log=10,
            chain_log=6,
            hash_log=7,
            search_log=4,
            min_match=5,
            target_length=8,
            strategy=1,
        )
        # The "strategy" keyword is read back as "compression_strategy".
        readback = (
            ("window_log", 10),
            ("chain_log", 6),
            ("hash_log", 7),
            ("search_log", 4),
            ("min_match", 5),
            ("target_length", 8),
            ("compression_strategy", 1),
        )
        for attribute, value in readback:
            self.assertEqual(getattr(explicit, attribute), value)

        # Each entry: (constructor keywords, ((attribute, expected), ...)).
        # Entries are constructed and checked strictly in this order.
        cases = (
            ({"compression_level": 2}, (("compression_level", 2),)),
            ({"threads": 4}, (("threads", 4),)),
            (
                {"threads": 2, "job_size": 1048576, "overlap_log": 6},
                (
                    ("threads", 2),
                    ("job_size", 1048576),
                    ("overlap_log", 6),
                    ("overlap_size_log", 6),
                ),
            ),
            ({"compression_level": -1}, (("compression_level", -1),)),
            ({"compression_level": -2}, (("compression_level", -2),)),
            ({"force_max_window": True}, (("force_max_window", 1),)),
            ({"enable_ldm": True}, (("enable_ldm", 1),)),
            ({"ldm_hash_log": 7}, (("ldm_hash_log", 7),)),
            ({"ldm_min_match": 6}, (("ldm_min_match", 6),)),
            ({"ldm_bucket_size_log": 7}, (("ldm_bucket_size_log", 7),)),
            (
                {"ldm_hash_rate_log": 8},
                (("ldm_hash_every_log", 8), ("ldm_hash_rate_log", 8)),
            ),
        )
        for keywords, expectations in cases:
            built = zstd.ZstdCompressionParameters(**keywords)
            for attribute, value in expectations:
                self.assertEqual(getattr(built, attribute), value)

    def test_estimated_compression_context_size(self):
        params = zstd.ZstdCompressionParameters(
            window_log=20,
            chain_log=16,
            hash_log=17,
            search_log=1,
            min_match=5,
            target_length=16,
            strategy=zstd.STRATEGY_DFAST,
        )
        estimate = params.estimated_compression_context_size()

        # 32-bit has slightly different values from 64-bit.
        self.assertAlmostEqual(estimate, 1294464, delta=400)

    def test_strategy(self):
        # Passing both spellings of the strategy argument is rejected.
        conflict = "cannot specify both compression_strategy"
        with self.assertRaisesRegex(ValueError, conflict):
            zstd.ZstdCompressionParameters(strategy=0, compression_strategy=0)

        # A lone "strategy" value is exposed as compression_strategy.
        for strategy in (2, 3):
            params = zstd.ZstdCompressionParameters(strategy=strategy)
            self.assertEqual(params.compression_strategy, strategy)

    def test_ldm_hash_rate_log(self):
        # Passing both ldm_hash_rate_log and ldm_hash_every_log is rejected.
        conflict = "cannot specify both ldm_hash_rate_log"
        with self.assertRaisesRegex(ValueError, conflict):
            zstd.ZstdCompressionParameters(
                ldm_hash_rate_log=8, ldm_hash_every_log=4
            )

        # Either keyword alone is visible through ldm_hash_every_log.
        via_rate_log = zstd.ZstdCompressionParameters(ldm_hash_rate_log=8)
        self.assertEqual(via_rate_log.ldm_hash_every_log, 8)

        via_every_log = zstd.ZstdCompressionParameters(ldm_hash_every_log=16)
        self.assertEqual(via_every_log.ldm_hash_every_log, 16)

    def test_overlap_log(self):
        # Passing both overlap_log and overlap_size_log is rejected.
        conflict = "cannot specify both overlap_log"
        with self.assertRaisesRegex(ValueError, conflict):
            zstd.ZstdCompressionParameters(overlap_log=1, overlap_size_log=9)

        # Whichever keyword is used, both attributes report the same value.
        for keyword, value in (("overlap_log", 2), ("overlap_size_log", 4)):
            params = zstd.ZstdCompressionParameters(**{keyword: value})
            self.assertEqual(params.overlap_log, value)
            self.assertEqual(params.overlap_size_log, value)

@make_cffi
class TestFrameParameters(TestCase):
    # Covers zstd.get_frame_parameters(): rejected inputs, the attributes
    # decoded from hand-built frame headers, and the accepted buffer types.
    # Notes are written as comments rather than docstrings so that the
    # descriptions unittest reports for each test stay exactly as they were.

    def test_invalid_type(self):
        with self.assertRaises(TypeError):
            zstd.get_frame_parameters(None)

        # Python 3 doesn't appear to convert unicode to Py_buffer. On
        # Python 2, CPython will convert unicode to Py_buffer, but CFFI
        # won't. The short-circuit keeps zstd.backend from being consulted
        # at all on Python 3.
        if sys.version_info[0] >= 3 or zstd.backend == "cffi":
            expected_error = TypeError
        else:
            expected_error = zstd.ZstdError

        with self.assertRaises(expected_error):
            zstd.get_frame_parameters(u"foobarbaz")

    def test_invalid_input_sizes(self):
        # Both an empty buffer and a bare frame header are reported as
        # holding too little data.
        too_short = "not enough data for frame"

        with self.assertRaisesRegex(zstd.ZstdError, too_short):
            zstd.get_frame_parameters(b"")

        with self.assertRaisesRegex(zstd.ZstdError, too_short):
            zstd.get_frame_parameters(zstd.FRAME_HEADER)

    def test_invalid_frame(self):
        unknown_descriptor = "Unknown frame descriptor"
        with self.assertRaisesRegex(zstd.ZstdError, unknown_descriptor):
            zstd.get_frame_parameters(b"foobarbaz")

    def test_attributes(self):
        size_unknown = zstd.CONTENTSIZE_UNKNOWN

        # Each row: (bytes appended to FRAME_HEADER, expected content_size,
        # expected window_size, expected dict_id, checksum expected?).
        cases = (
            (b"\x00\x00", size_unknown, 1024, 0, False),
            # Lowest 2 bits indicate a dictionary and length. Here, the dict
            # id is 1 byte.
            (b"\x01\x00\xff", size_unknown, 1024, 255, False),
            # Lowest 3rd bit indicates if checksum is present.
            (b"\x04\x00", size_unknown, 1024, 0, True),
            # Upper 2 bits indicate content size.
            (b"\x40\x00\xff\x00", 511, 1024, 0, False),
            # Window descriptor is 2nd byte after frame header.
            (b"\x00\x40", size_unknown, 262144, 0, False),
            # Set multiple things.
            (b"\x45\x40\x0f\x10\x00", 272, 262144, 15, True),
        )

        for tail, content_size, window_size, dict_id, checksum in cases:
            params = zstd.get_frame_parameters(zstd.FRAME_HEADER + tail)
            self.assertEqual(params.content_size, content_size)
            self.assertEqual(params.window_size, window_size)
            self.assertEqual(params.dict_id, dict_id)
            if checksum:
                self.assertTrue(params.has_checksum)
            else:
                self.assertFalse(params.has_checksum)

    def test_input_types(self):
        # The same header must decode identically from a memoryview, a
        # bytearray built from the bytes, and a bytearray filled in place.
        header = zstd.FRAME_HEADER + b"\x00\x00"

        filled_in_place = bytearray(len(header))
        filled_in_place[:] = header

        for buf in (memoryview(header), bytearray(header), filled_in_place):
            params = zstd.get_frame_parameters(buf)
            self.assertEqual(params.content_size, zstd.CONTENTSIZE_UNKNOWN)
            self.assertEqual(params.window_size, 1024)
            self.assertEqual(params.dict_id, 0)
            self.assertFalse(params.has_checksum)