view contrib/python-zstandard/tests/test_roundtrip.py @ 31174:842489d18118

tests: fix test-update-branches to remove non-conflicting file I was clearly very sloppy when I wrote the test case for experimental.updatecheck=noconflict. The test case that checks that one can move between commits with a removed file was deleting a file that was modified between the source and target commits, which resulted in a "change/delete" conflict. Since that is a conflict, the update correctly failed. Let's fix the test by removing a file that is not modified between the two commits.
author Martin von Zweigbergk <martinvonz@google.com>
date Mon, 06 Mar 2017 23:19:57 -0800
parents c32454d69b85
children
line wrap: on
line source

import io

try:
    import unittest2 as unittest
except ImportError:
    import unittest

try:
    import hypothesis
    import hypothesis.strategies as strategies
except ImportError:
    raise unittest.SkipTest('hypothesis not available')

import zstd

from .common import (
    make_cffi,
)

# Hypothesis strategy producing integer compression levels from 1 through 22
# (inclusive); the tests below pass each drawn value to zstd.ZstdCompressor.
# NOTE(review): 22 is presumably the highest level zstd accepts -- confirm.
compression_levels = strategies.integers(min_value=1, max_value=22)


@make_cffi
class TestRoundTrip(unittest.TestCase):
    """Property-based checks that compressed data decompresses to its input.

    Every test feeds hypothesis-generated bytes through a zstd compressor,
    pushes the result through a decompressor ``write_to()`` writer and
    asserts that the bytes collected on the far side equal the input.
    """

    @hypothesis.given(strategies.binary(), compression_levels)
    def test_compress_write_to(self, data, level):
        """One-shot compress() output is restored by decompressor write_to."""
        frame = zstd.ZstdCompressor(level=level).compress(data)

        restored = io.BytesIO()
        decompress_ctx = zstd.ZstdDecompressor()
        with decompress_ctx.write_to(restored) as writer:
            writer.write(frame)

        self.assertEqual(restored.getvalue(), data)

    @hypothesis.given(strategies.binary(), compression_levels)
    def test_compressor_write_to_decompressor_write_to(self, data, level):
        """Compressor write_to output is restored by decompressor write_to."""
        # Compression half: stream the input into an in-memory sink.
        compressed_sink = io.BytesIO()
        compress_ctx = zstd.ZstdCompressor(level=level)
        with compress_ctx.write_to(compressed_sink) as writer:
            writer.write(data)
        frame = compressed_sink.getvalue()

        # Decompression half: stream the compressed bytes back out.
        restored = io.BytesIO()
        decompress_ctx = zstd.ZstdDecompressor()
        with decompress_ctx.write_to(restored) as writer:
            writer.write(frame)

        self.assertEqual(restored.getvalue(), data)

    @hypothesis.given(strategies.binary(average_size=1048576))
    @hypothesis.settings(perform_health_check=False)
    def test_compressor_write_to_decompressor_write_to_larger(self, data):
        """Same write_to roundtrip, with a 1 MiB average size and level 5.

        Hypothesis health checks are switched off for this test.
        NOTE(review): presumably because generating inputs this large is
        slow enough to trip them -- confirm.
        """
        # Compression half: stream the input into an in-memory sink.
        compressed_sink = io.BytesIO()
        compress_ctx = zstd.ZstdCompressor(level=5)
        with compress_ctx.write_to(compressed_sink) as writer:
            writer.write(data)
        frame = compressed_sink.getvalue()

        # Decompression half: stream the compressed bytes back out.
        restored = io.BytesIO()
        decompress_ctx = zstd.ZstdDecompressor()
        with decompress_ctx.write_to(restored) as writer:
            writer.write(frame)

        self.assertEqual(restored.getvalue(), data)