view tests/test-wireproto-framing.py @ 46874:84a93fa7ecfd

revlog-compression: use zstd by default (if available) As seen in changeset bb271ec2fbfb, zstd is 20% to 50% faster for reading and writing. Take advantage of the new config behavior to try zstd by default, falling back to zlib if zstd is not available on that platform. Differential Revision: https://phab.mercurial-scm.org/D10326
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Tue, 06 Apr 2021 18:55:19 +0200
parents 89a2afe31e82
children 6000f5b25c9b
line wrap: on
line source

from __future__ import absolute_import, print_function

import unittest

from mercurial import (
    util,
    wireprotoframing as framing,
)

# Short alias for the helper that turns a human-readable frame description
# (bytes) into the frame it describes; the tests below use it to build both
# inputs and expected values.
ffs = framing.makeframefromhumanstring


class FrameHumanStringTests(unittest.TestCase):
    """Check the binary frames built from human-readable frame strings."""

    def _checkall(self, cases):
        # Each case pairs a human string with the exact frame bytes expected
        # from ``ffs``; cases are checked in order.
        for human, expected in cases:
            self.assertEqual(ffs(human), expected)

    def testbasic(self):
        # Empty payloads and a raw (non-CBOR) bytes payload.
        self._checkall(
            [
                (b'1 1 0 1 0 ', b'\x00\x00\x00\x01\x00\x01\x00\x10'),
                (b'2 4 0 1 0 ', b'\x00\x00\x00\x02\x00\x04\x00\x10'),
                (b'2 4 0 1 0 foo', b'\x03\x00\x00\x02\x00\x04\x00\x10foo'),
            ]
        )

    def testcborint(self):
        # Integer payloads written with the ``cbor:`` prefix, covering small,
        # multi-byte, zero and negative values.
        self._checkall(
            [
                (b'1 1 0 1 0 cbor:15', b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f'),
                (b'1 1 0 1 0 cbor:42', b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*'),
                (
                    b'1 1 0 1 0 cbor:1048576',
                    b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a\x00\x10\x00\x00',
                ),
                (b'1 1 0 1 0 cbor:0', b'\x01\x00\x00\x01\x00\x01\x00\x10\x00'),
                (b'1 1 0 1 0 cbor:-1', b'\x01\x00\x00\x01\x00\x01\x00\x10 '),
                (
                    b'1 1 0 1 0 cbor:-342542',
                    b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r',
                ),
            ]
        )

    def testcborstrings(self):
        # A bytes literal payload written with the ``cbor:`` prefix.
        human = b"1 1 0 1 0 cbor:b'foo'"
        expected = b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo'
        self.assertEqual(ffs(human), expected)

    def testcborlists(self):
        # A list payload mixing None, booleans, an integer and bytes.
        human = b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"
        header = b'\n\x00\x00\x01\x00\x01\x00\x10'
        payload = b'\x85\xf6\xf5\xf4' b'\x18*Cfoo'
        self.assertEqual(ffs(human), header + payload)

    def testcbordicts(self):
        # A dict payload; note the expected bytes list ``bar`` before ``foo``
        # even though the human string spells them the other way round.
        human = b"1 1 0 1 0 cbor:{b'foo': b'val1', b'bar': b'val2'}"
        header = b'\x13\x00\x00\x01\x00\x01\x00\x10'
        payload = b'\xa2' b'CbarDval2CfooDval1'
        self.assertEqual(ffs(human), header + payload)


class FrameTests(unittest.TestCase):
    """Exercise generation of command-request and text-output frames."""

    if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
        # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
        # the regex version.
        assertRaisesRegex = (  # camelcase-required
            unittest.TestCase.assertRaisesRegexp
        )

    def _commandframes(self, args, data):
        # Collect every frame emitted for request 1 of ``command`` on a
        # stream freshly created with ``framing.stream(1)``.
        outstream = framing.stream(1)
        return list(
            framing.createcommandframes(outstream, 1, b'command', args, data)
        )

    def _textframes(self, atoms):
        # Collect every text output frame emitted for request 1 on a stream
        # freshly created with ``framing.stream(1)``.
        outstream = framing.stream(1)
        return list(framing.createtextoutputframe(outstream, 1, atoms))

    def testdataexactframesize(self):
        # Data of exactly DEFAULT_MAX_FRAME_SIZE bytes is expected as one
        # full continuation frame followed by an empty eos frame.
        payload = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
        emitted = self._commandframes({}, payload)

        request = ffs(
            b'1 1 stream-begin command-request new|have-data '
            b"cbor:{b'name': b'command'}"
        )
        full = ffs(b'1 1 0 command-data continuation %s' % payload.getvalue())
        last = ffs(b'1 1 0 command-data eos ')
        self.assertEqual(emitted, [request, full, last])

    def testdatamultipleframes(self):
        # One byte more than DEFAULT_MAX_FRAME_SIZE is expected as a full
        # continuation frame plus an eos frame carrying the remaining byte.
        payload = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
        emitted = self._commandframes({}, payload)

        request = ffs(
            b'1 1 stream-begin command-request new|have-data '
            b"cbor:{b'name': b'command'}"
        )
        full = ffs(
            b'1 1 0 command-data continuation %s'
            % (b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
        )
        last = ffs(b'1 1 0 command-data eos x')
        self.assertEqual(emitted, [request, full, last])

    def testargsanddata(self):
        # Command arguments are expected inside the command-request frame,
        # with the 100 bytes of data in a single eos data frame.
        payload = util.bytesio(b'x' * 100)
        args = {
            b'key1': b'key1value',
            b'key2': b'key2value',
            b'key3': b'key3value',
        }
        emitted = self._commandframes(args, payload)

        request = ffs(
            b'1 1 stream-begin command-request new|have-data '
            b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
            b"b'key2': b'key2value', b'key3': b'key3value'}}"
        )
        last = ffs(b'1 1 0 command-data eos %s' % payload.getvalue())
        self.assertEqual(emitted, [request, last])

    def testtextoutputformattingstringtype(self):
        """Formatting string must be bytes."""
        # Decoding yields a non-bytes string on both Python 2 and 3.
        atoms = [(b'foo'.decode('ascii'), [], [])]
        with self.assertRaisesRegex(ValueError, 'must use bytes formatting '):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutputargumentbytes(self):
        # A non-bytes formatting argument must be rejected.
        atoms = [(b'foo', [b'foo'.decode('ascii')], [])]
        with self.assertRaisesRegex(ValueError, 'must use bytes for argument'):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutputlabelbytes(self):
        # A non-bytes label must be rejected.
        atoms = [(b'foo', [], [b'foo'.decode('ascii')])]
        with self.assertRaisesRegex(ValueError, 'must use bytes for labels'):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutput1simpleatom(self):
        # A single atom without arguments or labels.
        emitted = self._textframes([(b'foo', [], [])])

        expected = ffs(
            b'1 1 stream-begin text-output 0 ' b"cbor:[{b'msg': b'foo'}]"
        )
        self.assertEqual(emitted, [expected])

    def testtextoutput2simpleatoms(self):
        # Two plain atoms are expected together in one frame.
        atoms = [
            (b'foo', [], []),
            (b'bar', [], []),
        ]
        emitted = self._textframes(atoms)

        expected = ffs(
            b'1 1 stream-begin text-output 0 '
            b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]"
        )
        self.assertEqual(emitted, [expected])

    def testtextoutput1arg(self):
        # One formatting argument shows up under the ``args`` key.
        emitted = self._textframes([(b'foo %s', [b'val1'], [])])

        expected = ffs(
            b'1 1 stream-begin text-output 0 '
            b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]"
        )
        self.assertEqual(emitted, [expected])

    def testtextoutput2arg(self):
        # Two formatting arguments keep their order under ``args``.
        emitted = self._textframes([(b'foo %s %s', [b'val', b'value'], [])])

        expected = ffs(
            b'1 1 stream-begin text-output 0 '
            b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]"
        )
        self.assertEqual(emitted, [expected])

    def testtextoutput1label(self):
        # One label shows up under the ``labels`` key.
        emitted = self._textframes([(b'foo', [], [b'label'])])

        expected = ffs(
            b'1 1 stream-begin text-output 0 '
            b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]"
        )
        self.assertEqual(emitted, [expected])

    def testargandlabel(self):
        # An atom carrying both an argument and a label.
        emitted = self._textframes([(b'foo %s', [b'arg'], [b'label'])])

        expected = ffs(
            b'1 1 stream-begin text-output 0 '
            b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
            b"b'labels': [b'label']}]"
        )
        self.assertEqual(emitted, [expected])


# When executed as a script, run this module's tests through silenttestrunner.
if __name__ == '__main__':
    # Imported here so that merely importing this module does not require it.
    import silenttestrunner

    silenttestrunner.main(__name__)