view tests/test-wireproto-framing.py @ 40172:30f70d11c224

wireprotov2: advertise recommended batch size for requests. Currently, exchangev2 hardcodes the batch size for how many revisions to fetch per command request. A single value is not appropriate for every repository because some repositories may have a drastically different "shape" from other repositories. For example, a repo with lots of small files may benefit from larger batch sizes than a repo with lots of large files. And depending on caching used by the server, the server may wish to control the number of commands (to e.g. mitigate overhead of following content redirects). This commit teaches wireprotov2 commands to declare extra metadata which is advertised as part of the command descriptor. The manifestdata command has been taught to advertise a recommended batch size for requests. Differential Revision: https://phab.mercurial-scm.org/D4975
author Gregory Szorc <gregory.szorc@gmail.com>
date Mon, 08 Oct 2018 17:45:51 -0700
parents fcc6bd11444b
children 2372284d9457
line wrap: on
line source

from __future__ import absolute_import, print_function

import unittest

from mercurial import (
    util,
    wireprotoframing as framing,
)

# Short alias for framing.makeframefromhumanstring; the tests below use it
# to build expected frames from a compact human-readable description.
ffs = framing.makeframefromhumanstring

class FrameHumanStringTests(unittest.TestCase):
    """Exercise ``ffs`` against hand-written frame encodings.

    Every case passes a human-readable frame description to ``ffs`` and
    compares the outcome with a literal byte string.
    """

    def _checkframes(self, cases):
        """Assert ``ffs(human) == expected`` for each pair, in order."""
        for human, expected in cases:
            self.assertEqual(ffs(human), expected)

    def testbasic(self):
        """Frames with an empty payload and with a plain ``foo`` payload."""
        self._checkframes([
            (b'1 1 0 1 0 ',
             b'\x00\x00\x00\x01\x00\x01\x00\x10'),
            (b'2 4 0 1 0 ',
             b'\x00\x00\x00\x02\x00\x04\x00\x10'),
            (b'2 4 0 1 0 foo',
             b'\x03\x00\x00\x02\x00\x04\x00\x10foo'),
        ])

    def testcborint(self):
        """``cbor:`` payloads holding positive, zero and negative integers."""
        self._checkframes([
            (b'1 1 0 1 0 cbor:15',
             b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f'),
            (b'1 1 0 1 0 cbor:42',
             b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*'),
            (b'1 1 0 1 0 cbor:1048576',
             b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a\x00\x10\x00\x00'),
            (b'1 1 0 1 0 cbor:0',
             b'\x01\x00\x00\x01\x00\x01\x00\x10\x00'),
            (b'1 1 0 1 0 cbor:-1',
             b'\x01\x00\x00\x01\x00\x01\x00\x10 '),
            (b'1 1 0 1 0 cbor:-342542',
             b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r'),
        ])

    def testcborstrings(self):
        """``cbor:`` payload holding a bytes literal."""
        self._checkframes([
            (b"1 1 0 1 0 cbor:b'foo'",
             b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo'),
        ])

    def testcborlists(self):
        """``cbor:`` payload holding a list of mixed values."""
        self._checkframes([
            (b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']",
             b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4\x18*Cfoo'),
        ])

    def testcbordicts(self):
        """``cbor:`` payload holding a dict.

        The expected bytes carry ``bar`` ahead of ``foo`` even though the
        input names ``foo`` first.
        """
        self._checkframes([
            (b"1 1 0 1 0 cbor:{b'foo': b'val1', b'bar': b'val2'}",
             b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2CbarDval2CfooDval1'),
        ])

class FrameTests(unittest.TestCase):
    """Compare frames emitted by ``framing`` generators with ``ffs``.

    Covers ``framing.createcommandframes`` and
    ``framing.createtextoutputframe``.
    """

    if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
        # Python 2.7 only offers the regex*p* spelling, which Python 3.7
        # deprecates, so alias it under the newer name when that is missing.
        assertRaisesRegex = (# camelcase-required
            unittest.TestCase.assertRaisesRegexp)

    def _commandframes(self, args, body):
        """List the frames for request 1 of ``b'command'`` on a new stream."""
        outstream = framing.stream(1)
        return list(framing.createcommandframes(outstream, 1, b'command',
                                                args, body))

    def _textframes(self, atoms):
        """List the text output frames for request 1 on a new stream."""
        outstream = framing.stream(1)
        return list(framing.createtextoutputframe(outstream, 1, atoms))

    def _assertatomrejected(self, atom, pattern):
        """Expect ``ValueError`` matching ``pattern`` when emitting ``atom``.

        ``None`` stands in for the stream argument.
        """
        with self.assertRaisesRegex(ValueError, pattern):
            list(framing.createtextoutputframe(None, 1, [atom]))

    def testdataexactframesize(self):
        """Command data of exactly ``DEFAULT_MAX_FRAME_SIZE`` bytes.

        Expects a request frame, one continuation frame holding all of the
        data and an ``eos`` frame with an empty payload.
        """
        body = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
        emitted = self._commandframes({}, body)

        expected = [
            ffs(b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"),
            ffs(b'1 1 0 command-data continuation %s' % body.getvalue()),
            ffs(b'1 1 0 command-data eos '),
        ]
        self.assertEqual(emitted, expected)

    def testdatamultipleframes(self):
        """Command data one byte longer than ``DEFAULT_MAX_FRAME_SIZE``.

        Expects the extra byte to land in the ``eos`` frame.
        """
        body = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
        emitted = self._commandframes({}, body)

        fullchunk = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
        expected = [
            ffs(b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"),
            ffs(b'1 1 0 command-data continuation %s' % fullchunk),
            ffs(b'1 1 0 command-data eos x'),
        ]
        self.assertEqual(emitted, expected)

    def testargsanddata(self):
        """Command with three arguments plus 100 bytes of data."""
        body = util.bytesio(b'x' * 100)
        requestargs = {
            b'key1': b'key1value',
            b'key2': b'key2value',
            b'key3': b'key3value',
        }
        emitted = self._commandframes(requestargs, body)

        expected = [
            ffs(b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
                b"b'key2': b'key2value', b'key3': b'key3value'}}"),
            ffs(b'1 1 0 command-data eos %s' % body.getvalue()),
        ]
        self.assertEqual(emitted, expected)

    def testtextoutputformattingstringtype(self):
        """Formatting string must be bytes."""
        notbytes = b'foo'.decode('ascii')
        self._assertatomrejected((notbytes, [], []),
                                 'must use bytes formatting ')

    def testtextoutputargumentbytes(self):
        """Formatting arguments must be bytes."""
        notbytes = b'foo'.decode('ascii')
        self._assertatomrejected((b'foo', [notbytes], []),
                                 'must use bytes for argument')

    def testtextoutputlabelbytes(self):
        """Labels must be bytes."""
        notbytes = b'foo'.decode('ascii')
        self._assertatomrejected((b'foo', [], [notbytes]),
                                 'must use bytes for labels')

    def testtextoutput1simpleatom(self):
        """One atom without arguments or labels."""
        emitted = self._textframes([(b'foo', [], [])])

        expected = [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}]"),
        ]
        self.assertEqual(emitted, expected)

    def testtextoutput2simpleatoms(self):
        """Two atoms without arguments or labels share a single frame."""
        atoms = [
            (b'foo', [], []),
            (b'bar', [], []),
        ]
        emitted = self._textframes(atoms)

        expected = [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]"),
        ]
        self.assertEqual(emitted, expected)

    def testtextoutput1arg(self):
        """One atom carrying a single formatting argument."""
        emitted = self._textframes([(b'foo %s', [b'val1'], [])])

        expected = [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]"),
        ]
        self.assertEqual(emitted, expected)

    def testtextoutput2arg(self):
        """One atom carrying two formatting arguments."""
        emitted = self._textframes([(b'foo %s %s', [b'val', b'value'], [])])

        expected = [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]"),
        ]
        self.assertEqual(emitted, expected)

    def testtextoutput1label(self):
        """One atom carrying a single label."""
        emitted = self._textframes([(b'foo', [], [b'label'])])

        expected = [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]"),
        ]
        self.assertEqual(emitted, expected)

    def testargandlabel(self):
        """One atom carrying both a formatting argument and a label."""
        emitted = self._textframes([(b'foo %s', [b'arg'], [b'label'])])

        expected = [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
                b"b'labels': [b'label']}]"),
        ]
        self.assertEqual(emitted, expected)

if __name__ == '__main__':
    # Imported lazily: the runner is only needed when this file is executed
    # as a script. It is handed this module's name.
    import silenttestrunner as testrunner
    testrunner.main(__name__)