tests/test-wireproto-framing.py
author Valentin Gatien-Baron <valentin.gatienbaron@gmail.com>
Tue, 26 May 2020 08:15:09 -0400
changeset 44861 065421e12248
parent 43076 2372284d9457
child 45942 89a2afe31e82
permissions -rw-r--r--
files: speed up `hg files` when no flags change display. This is not the first time I have seen slowness in this command slow down tools built on top of hg. Before this change, the majority of the time was spent merely printing the result, which is clearly not how it should be (especially since the computation of the result also looks slow). Running `hg files` in mozilla-central: parent revision: 1,260s; this commit: 0,683s; this commit without batching ui.write: 0,931s; this commit replacing the body of the loop with `pass`: 0,566s. This looks like a prime candidate for a Rust fast path, but until then, it seems reasonable to optimize the Python. Differential Revision: https://phab.mercurial-scm.org/D8586

from __future__ import absolute_import, print_function

import unittest

from mercurial import (
    util,
    wireprotoframing as framing,
)

# Short alias: turns a human-readable frame description (bytes) into the
# frame's encoded bytes. The tests in this file use it to spell out the
# frames they expect.
ffs = framing.makeframefromhumanstring


class FrameHumanStringTests(unittest.TestCase):
    """Check the bytes ``ffs`` produces for human-readable frame strings.

    Every case pairs a textual frame description with the exact byte
    string the helper is expected to return for it.
    """

    def _check(self, cases):
        # Cases are verified in the order given; the first mismatch
        # fails the calling test.
        for human, wire in cases:
            self.assertEqual(ffs(human), wire)

    def testbasic(self):
        # Frames whose payload is either empty or a plain literal.
        self._check(
            [
                (b'1 1 0 1 0 ', b'\x00\x00\x00\x01\x00\x01\x00\x10'),
                (b'2 4 0 1 0 ', b'\x00\x00\x00\x02\x00\x04\x00\x10'),
                (
                    b'2 4 0 1 0 foo',
                    b'\x03\x00\x00\x02\x00\x04\x00\x10foo',
                ),
            ]
        )

    def testcborint(self):
        # ``cbor:<int>`` payloads: small, one-byte, four-byte, zero and
        # negative integers.
        self._check(
            [
                (
                    b'1 1 0 1 0 cbor:15',
                    b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f',
                ),
                (
                    b'1 1 0 1 0 cbor:42',
                    b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*',
                ),
                (
                    b'1 1 0 1 0 cbor:1048576',
                    b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a\x00\x10\x00\x00',
                ),
                (
                    b'1 1 0 1 0 cbor:0',
                    b'\x01\x00\x00\x01\x00\x01\x00\x10\x00',
                ),
                (
                    b'1 1 0 1 0 cbor:-1',
                    b'\x01\x00\x00\x01\x00\x01\x00\x10 ',
                ),
                (
                    b'1 1 0 1 0 cbor:-342542',
                    b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r',
                ),
            ]
        )

    def testcborstrings(self):
        # A bytes literal inside a ``cbor:`` payload.
        human = b"1 1 0 1 0 cbor:b'foo'"
        wire = b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo'
        self._check([(human, wire)])

    def testcborlists(self):
        # A list mixing None, booleans, an integer and a bytes value.
        human = b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"
        wire = b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4\x18*Cfoo'
        self._check([(human, wire)])

    def testcbordicts(self):
        # The expected bytes list b'bar' before b'foo' although the
        # human string names b'foo' first.
        human = b"1 1 0 1 0 cbor:{b'foo': b'val1', b'bar': b'val2'}"
        header = b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2'
        wire = header + b'CbarDval2CfooDval1'
        self._check([(human, wire)])


class FrameTests(unittest.TestCase):
    """Check the frames emitted for command requests and text output.

    Expected frames are written as human-readable strings and converted
    with ``ffs`` so they can be compared with the generated frames.
    """

    if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
        # Python 2.7 only ships the regex*p* spelling, which Python 3.7
        # deprecates; alias it so the tests can use the newer name.
        assertRaisesRegex = (  # camelcase-required
            unittest.TestCase.assertRaisesRegexp
        )

    def _commandframes(self, args, data):
        # Collect every frame generated for b'command' on a fresh
        # framing.stream(1).
        stream = framing.stream(1)
        return list(
            framing.createcommandframes(stream, 1, b'command', args, data)
        )

    def _textframes(self, atoms):
        # Collect every text output frame generated for ``atoms`` on a
        # fresh framing.stream(1).
        stream = framing.stream(1)
        return list(framing.createtextoutputframe(stream, 1, atoms))

    def testdataexactframesize(self):
        # Data of exactly DEFAULT_MAX_FRAME_SIZE bytes: one full
        # continuation frame is expected, then an empty eos frame.
        payload = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
        frames = self._commandframes({}, util.bytesio(payload))

        expected = [
            ffs(
                b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"
            ),
            ffs(b'1 1 0 command-data continuation %s' % payload),
            ffs(b'1 1 0 command-data eos '),
        ]
        self.assertEqual(frames, expected)

    def testdatamultipleframes(self):
        # One byte more than DEFAULT_MAX_FRAME_SIZE: the extra byte is
        # expected in the eos frame.
        fullframe = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
        frames = self._commandframes({}, util.bytesio(fullframe + b'x'))

        expected = [
            ffs(
                b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"
            ),
            ffs(b'1 1 0 command-data continuation %s' % fullframe),
            ffs(b'1 1 0 command-data eos x'),
        ]
        self.assertEqual(frames, expected)

    def testargsanddata(self):
        # Arguments are expected inside the request frame; the 100 bytes
        # of data are expected in a single eos frame.
        payload = b'x' * 100
        args = {
            b'key1': b'key1value',
            b'key2': b'key2value',
            b'key3': b'key3value',
        }
        frames = self._commandframes(args, util.bytesio(payload))

        expected = [
            ffs(
                b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
                b"b'key2': b'key2value', b'key3': b'key3value'}}"
            ),
            ffs(b'1 1 0 command-data eos %s' % payload),
        ]
        self.assertEqual(frames, expected)

    def testtextoutputformattingstringtype(self):
        """Formatting string must be bytes."""
        atoms = [(b'foo'.decode('ascii'), [], [])]
        with self.assertRaisesRegex(ValueError, 'must use bytes formatting '):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutputargumentbytes(self):
        # A non-bytes formatting argument must be rejected.
        atoms = [(b'foo', [b'foo'.decode('ascii')], [])]
        with self.assertRaisesRegex(ValueError, 'must use bytes for argument'):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutputlabelbytes(self):
        # A non-bytes label must be rejected.
        atoms = [(b'foo', [], [b'foo'.decode('ascii')])]
        with self.assertRaisesRegex(ValueError, 'must use bytes for labels'):
            list(framing.createtextoutputframe(None, 1, atoms))

    def testtextoutput1simpleatom(self):
        # One atom without arguments or labels.
        val = self._textframes([(b'foo', [], [])])

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}]"
            ),
        ]
        self.assertEqual(val, expected)

    def testtextoutput2simpleatoms(self):
        # Two plain atoms are expected in the same frame.
        atoms = [
            (b'foo', [], []),
            (b'bar', [], []),
        ]
        val = self._textframes(atoms)

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]"
            ),
        ]
        self.assertEqual(val, expected)

    def testtextoutput1arg(self):
        # One atom with a single formatting argument.
        val = self._textframes([(b'foo %s', [b'val1'], [])])

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]"
            ),
        ]
        self.assertEqual(val, expected)

    def testtextoutput2arg(self):
        # One atom with two formatting arguments.
        val = self._textframes([(b'foo %s %s', [b'val', b'value'], [])])

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]"
            ),
        ]
        self.assertEqual(val, expected)

    def testtextoutput1label(self):
        # One atom carrying a label and no arguments.
        val = self._textframes([(b'foo', [], [b'label'])])

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]"
            ),
        ]
        self.assertEqual(val, expected)

    def testargandlabel(self):
        # One atom carrying both an argument and a label.
        val = self._textframes([(b'foo %s', [b'arg'], [b'label'])])

        expected = [
            ffs(
                b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
                b"b'labels': [b'label']}]"
            ),
        ]
        self.assertEqual(val, expected)


if __name__ == '__main__':
    # Imported here so the runner module is only needed when this file
    # is executed directly as a script.
    import silenttestrunner

    # Hand this module's name to the runner's entry point.
    silenttestrunner.main(__name__)