contrib/python-zstandard/tests/test_train_dictionary.py
author Jun Wu <quark@fb.com>
Tue, 11 Apr 2017 18:31:40 -0700
changeset 31890 ca7af5d15b21
parent 31799 e0dc40530c5a
child 37495 b1fb341d8a61
permissions -rw-r--r--
chg: always wait for pager Previously, when runcommand raises, chg aborts with, and does not wait for pager. The call stack is like: hgc_runcommand -> handleresponse -> readchannel -> debugmsg("failed to read channel") -> exit(255) That means, chg returns to the shell, then both the pager and the shell will read from the terminal at the same time, causing problems. This patch fixes that by using "atexit" to register the pager cleanup function so chg will always wait for pager even if runcommand raises.

import sys

try:
    import unittest2 as unittest
except ImportError:
    import unittest

import zstd

from . common import (
    make_cffi,
)

if sys.version_info[0] >= 3:
    int_type = int
else:
    int_type = long


@make_cffi
class TestTrainDictionary(unittest.TestCase):
    def test_no_args(self):
        with self.assertRaises(TypeError):
            zstd.train_dictionary()

    def test_bad_args(self):
        with self.assertRaises(TypeError):
            zstd.train_dictionary(8192, u'foo')

        with self.assertRaises(ValueError):
            zstd.train_dictionary(8192, [u'foo'])

    def test_basic(self):
        samples = []
        for i in range(128):
            samples.append(b'foo' * 64)
            samples.append(b'bar' * 64)
            samples.append(b'foobar' * 64)
            samples.append(b'baz' * 64)
            samples.append(b'foobaz' * 64)
            samples.append(b'bazfoo' * 64)

        d = zstd.train_dictionary(8192, samples)
        self.assertLessEqual(len(d), 8192)

        dict_id = d.dict_id()
        self.assertIsInstance(dict_id, int_type)

        data = d.as_bytes()
        self.assertEqual(data[0:4], b'\x37\xa4\x30\xec')

    def test_set_dict_id(self):
        samples = []
        for i in range(128):
            samples.append(b'foo' * 64)
            samples.append(b'foobar' * 64)

        d = zstd.train_dictionary(8192, samples, dict_id=42)
        self.assertEqual(d.dict_id(), 42)


@make_cffi
class TestTrainCoverDictionary(unittest.TestCase):
    def test_no_args(self):
        with self.assertRaises(TypeError):
            zstd.train_cover_dictionary()

    def test_bad_args(self):
        with self.assertRaises(TypeError):
            zstd.train_cover_dictionary(8192, u'foo')

        with self.assertRaises(ValueError):
            zstd.train_cover_dictionary(8192, [u'foo'])

    def test_basic(self):
        samples = []
        for i in range(128):
            samples.append(b'foo' * 64)
            samples.append(b'foobar' * 64)

        d = zstd.train_cover_dictionary(8192, samples, k=64, d=16)
        self.assertIsInstance(d.dict_id(), int_type)

        data = d.as_bytes()
        self.assertEqual(data[0:4], b'\x37\xa4\x30\xec')

        self.assertEqual(d.k, 64)
        self.assertEqual(d.d, 16)

    def test_set_dict_id(self):
        samples = []
        for i in range(128):
            samples.append(b'foo' * 64)
            samples.append(b'foobar' * 64)

        d = zstd.train_cover_dictionary(8192, samples, k=64, d=16,
                                        dict_id=42)
        self.assertEqual(d.dict_id(), 42)

    def test_optimize(self):
        samples = []
        for i in range(128):
            samples.append(b'foo' * 64)
            samples.append(b'foobar' * 64)

        d = zstd.train_cover_dictionary(8192, samples, optimize=True,
                                        threads=-1, steps=1, d=16)

        self.assertEqual(d.k, 16)
        self.assertEqual(d.d, 16)