view tests/test-wireproto-framing.py @ 43815:19da643dc10c

tests: finally fix up test-fuzz-targets.t It's been failing on my workstation for a while, since I have a new enough LLVM that I had the fuzzer goo, but not so new that I actually had FuzzedDataProvider. This is a better solution all around in my opinion. I _believe_ this should let us run these tests on most systems, even those using GCC instead of clang. That said, my one attempt to test this on my macOS laptop failed miserably, and I don't feel like doing more work on this right now. Differential Revision: https://phab.mercurial-scm.org/D7566
author Augie Fackler <augie@google.com>
date Fri, 06 Dec 2019 15:08:37 -0500
parents 2372284d9457
children 89a2afe31e82
line wrap: on
line source

from __future__ import absolute_import, print_function

import unittest

from mercurial import (
    util,
    wireprotoframing as framing,
)

ffs = framing.makeframefromhumanstring


class FrameHumanStringTests(unittest.TestCase):
    def testbasic(self):
        self.assertEqual(
            ffs(b'1 1 0 1 0 '), b'\x00\x00\x00\x01\x00\x01\x00\x10'
        )

        self.assertEqual(
            ffs(b'2 4 0 1 0 '), b'\x00\x00\x00\x02\x00\x04\x00\x10'
        )

        self.assertEqual(
            ffs(b'2 4 0 1 0 foo'), b'\x03\x00\x00\x02\x00\x04\x00\x10foo'
        )

    def testcborint(self):
        self.assertEqual(
            ffs(b'1 1 0 1 0 cbor:15'), b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f'
        )

        self.assertEqual(
            ffs(b'1 1 0 1 0 cbor:42'), b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*'
        )

        self.assertEqual(
            ffs(b'1 1 0 1 0 cbor:1048576'),
            b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a' b'\x00\x10\x00\x00',
        )

        self.assertEqual(
            ffs(b'1 1 0 1 0 cbor:0'), b'\x01\x00\x00\x01\x00\x01\x00\x10\x00'
        )

        self.assertEqual(
            ffs(b'1 1 0 1 0 cbor:-1'), b'\x01\x00\x00\x01\x00\x01\x00\x10 '
        )

        self.assertEqual(
            ffs(b'1 1 0 1 0 cbor:-342542'),
            b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r',
        )

    def testcborstrings(self):
        self.assertEqual(
            ffs(b"1 1 0 1 0 cbor:b'foo'"),
            b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo',
        )

    def testcborlists(self):
        self.assertEqual(
            ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"),
            b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4' b'\x18*Cfoo',
        )

    def testcbordicts(self):
        self.assertEqual(
            ffs(b"1 1 0 1 0 " b"cbor:{b'foo': b'val1', b'bar': b'val2'}"),
            b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2' b'CbarDval2CfooDval1',
        )


class FrameTests(unittest.TestCase):
    def testdataexactframesize(self):
        data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)

        stream = framing.stream(1)
        frames = list(
            framing.createcommandframes(stream, 1, b'command', {}, data)
        )
        self.assertEqual(
            frames,
            [
                ffs(
                    b'1 1 stream-begin command-request new|have-data '
                    b"cbor:{b'name': b'command'}"
                ),
                ffs(b'1 1 0 command-data continuation %s' % data.getvalue()),
                ffs(b'1 1 0 command-data eos '),
            ],
        )

    def testdatamultipleframes(self):
        data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))

        stream = framing.stream(1)
        frames = list(
            framing.createcommandframes(stream, 1, b'command', {}, data)
        )
        self.assertEqual(
            frames,
            [
                ffs(
                    b'1 1 stream-begin command-request new|have-data '
                    b"cbor:{b'name': b'command'}"
                ),
                ffs(
                    b'1 1 0 command-data continuation %s'
                    % (b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
                ),
                ffs(b'1 1 0 command-data eos x'),
            ],
        )

    def testargsanddata(self):
        data = util.bytesio(b'x' * 100)

        stream = framing.stream(1)
        frames = list(
            framing.createcommandframes(
                stream,
                1,
                b'command',
                {
                    b'key1': b'key1value',
                    b'key2': b'key2value',
                    b'key3': b'key3value',
                },
                data,
            )
        )

        self.assertEqual(
            frames,
            [
                ffs(
                    b'1 1 stream-begin command-request new|have-data '
                    b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
                    b"b'key2': b'key2value', b'key3': b'key3value'}}"
                ),
                ffs(b'1 1 0 command-data eos %s' % data.getvalue()),
            ],
        )

    if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
        # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
        # the regex version.
        assertRaisesRegex = (  # camelcase-required
            unittest.TestCase.assertRaisesRegexp
        )

    def testtextoutputformattingstringtype(self):
        """Formatting string must be bytes."""
        with self.assertRaisesRegex(ValueError, 'must use bytes formatting '):
            list(
                framing.createtextoutputframe(
                    None, 1, [(b'foo'.decode('ascii'), [], [])]
                )
            )

    def testtextoutputargumentbytes(self):
        with self.assertRaisesRegex(ValueError, 'must use bytes for argument'):
            list(
                framing.createtextoutputframe(
                    None, 1, [(b'foo', [b'foo'.decode('ascii')], [])]
                )
            )

    def testtextoutputlabelbytes(self):
        with self.assertRaisesRegex(ValueError, 'must use bytes for labels'):
            list(
                framing.createtextoutputframe(
                    None, 1, [(b'foo', [], [b'foo'.decode('ascii')])]
                )
            )

    def testtextoutput1simpleatom(self):
        stream = framing.stream(1)
        val = list(framing.createtextoutputframe(stream, 1, [(b'foo', [], [])]))

        self.assertEqual(
            val,
            [
                ffs(
                    b'1 1 stream-begin text-output 0 '
                    b"cbor:[{b'msg': b'foo'}]"
                ),
            ],
        )

    def testtextoutput2simpleatoms(self):
        stream = framing.stream(1)
        val = list(
            framing.createtextoutputframe(
                stream, 1, [(b'foo', [], []), (b'bar', [], []),]
            )
        )

        self.assertEqual(
            val,
            [
                ffs(
                    b'1 1 stream-begin text-output 0 '
                    b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]"
                )
            ],
        )

    def testtextoutput1arg(self):
        stream = framing.stream(1)
        val = list(
            framing.createtextoutputframe(
                stream, 1, [(b'foo %s', [b'val1'], []),]
            )
        )

        self.assertEqual(
            val,
            [
                ffs(
                    b'1 1 stream-begin text-output 0 '
                    b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]"
                )
            ],
        )

    def testtextoutput2arg(self):
        stream = framing.stream(1)
        val = list(
            framing.createtextoutputframe(
                stream, 1, [(b'foo %s %s', [b'val', b'value'], []),]
            )
        )

        self.assertEqual(
            val,
            [
                ffs(
                    b'1 1 stream-begin text-output 0 '
                    b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]"
                )
            ],
        )

    def testtextoutput1label(self):
        stream = framing.stream(1)
        val = list(
            framing.createtextoutputframe(
                stream, 1, [(b'foo', [], [b'label']),]
            )
        )

        self.assertEqual(
            val,
            [
                ffs(
                    b'1 1 stream-begin text-output 0 '
                    b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]"
                )
            ],
        )

    def testargandlabel(self):
        stream = framing.stream(1)
        val = list(
            framing.createtextoutputframe(
                stream, 1, [(b'foo %s', [b'arg'], [b'label']),]
            )
        )

        self.assertEqual(
            val,
            [
                ffs(
                    b'1 1 stream-begin text-output 0 '
                    b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
                    b"b'labels': [b'label']}]"
                )
            ],
        )


if __name__ == '__main__':
    import silenttestrunner

    silenttestrunner.main(__name__)