view tests/test-wireproto-framing.py @ 38486:4c0683655599

namespaces: let namespaces override singlenode() definition Some namespaces have multiple nodes per name (meaning that their namemap() returns multiple nodes). One such namespace is the "topics" namespace (from the evolve repo). We also have our own internal namespace at Google (for review units) that has multiple nodes per name. These namespaces may not want to use the default "pick highest revnum" resolution that we currently use when resolving a name to a single node. As an example, they may decide that `hg co <name>` should check out a commit that's last in some sense even if an earlier commit had just been amended and thus had a higher revnum [1]. This patch gives the namespace the option to continue to return multiple nodes and to override how the best node is picked. Allowing namespaces to override that may also be useful as an optimization (it may be cheaper for the namespace to find just that node). I have been arguing (in D3715) for using all the nodes returned from namemap() when resolving the symbol to a revset, so e.g. `hg log -r stable` would resolve to *all* nodes on stable, not just the one with the highest revnum (except that I don't actually think we should change it for the branch namespace because of BC). Most people seem opposed to that. If we decide not to do it, I think we can deprecate the namemap() function in favor of the new singlenode() (I find it weird to have namespaces, like the branch namespace, where namemap() isn't nodemap()'s inverse). I therefore think this patch makes sense regardless of what we decide on that issue. [1] Actually, even the branch namespace would have wanted to override singlenode() if it had supported multiple nodes. That's because closes branch heads are mostly ignored, so "hg co default" will not check out the highest-revnum node if that's a closed head. Differential Revision: https://phab.mercurial-scm.org/D3852
author Martin von Zweigbergk <martinvonz@google.com>
date Tue, 26 Jun 2018 10:02:01 -0700
parents 1859b9a7ddef
children fcc6bd11444b
line wrap: on
line source

from __future__ import absolute_import, print_function

import unittest

from mercurial import (
    util,
    wireprotoframing as framing,
)

ffs = framing.makeframefromhumanstring

class FrameHumanStringTests(unittest.TestCase):
    def testbasic(self):
        self.assertEqual(ffs(b'1 1 0 1 0 '),
                         b'\x00\x00\x00\x01\x00\x01\x00\x10')

        self.assertEqual(ffs(b'2 4 0 1 0 '),
                         b'\x00\x00\x00\x02\x00\x04\x00\x10')

        self.assertEqual(ffs(b'2 4 0 1 0 foo'),
                         b'\x03\x00\x00\x02\x00\x04\x00\x10foo')

    def testcborint(self):
        self.assertEqual(ffs(b'1 1 0 1 0 cbor:15'),
                         b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f')

        self.assertEqual(ffs(b'1 1 0 1 0 cbor:42'),
                         b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*')

        self.assertEqual(ffs(b'1 1 0 1 0 cbor:1048576'),
                         b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a'
                         b'\x00\x10\x00\x00')

        self.assertEqual(ffs(b'1 1 0 1 0 cbor:0'),
                         b'\x01\x00\x00\x01\x00\x01\x00\x10\x00')

        self.assertEqual(ffs(b'1 1 0 1 0 cbor:-1'),
                         b'\x01\x00\x00\x01\x00\x01\x00\x10 ')

        self.assertEqual(ffs(b'1 1 0 1 0 cbor:-342542'),
                         b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r')

    def testcborstrings(self):
        self.assertEqual(ffs(b"1 1 0 1 0 cbor:b'foo'"),
                         b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo')

        self.assertEqual(ffs(b"1 1 0 1 0 cbor:u'foo'"),
                         b'\x04\x00\x00\x01\x00\x01\x00\x10cfoo')

    def testcborlists(self):
        self.assertEqual(ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"),
                         b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4'
                         b'\x18*Cfoo')

    def testcbordicts(self):
        self.assertEqual(ffs(b"1 1 0 1 0 "
                             b"cbor:{b'foo': b'val1', b'bar': b'val2'}"),
                         b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2'
                         b'CbarDval2CfooDval1')

class FrameTests(unittest.TestCase):
    def testdataexactframesize(self):
        data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)

        stream = framing.stream(1)
        frames = list(framing.createcommandframes(stream, 1, b'command',
                                                  {}, data))
        self.assertEqual(frames, [
            ffs(b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"),
            ffs(b'1 1 0 command-data continuation %s' % data.getvalue()),
            ffs(b'1 1 0 command-data eos ')
        ])

    def testdatamultipleframes(self):
        data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))

        stream = framing.stream(1)
        frames = list(framing.createcommandframes(stream, 1, b'command', {},
                                                  data))
        self.assertEqual(frames, [
            ffs(b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command'}"),
            ffs(b'1 1 0 command-data continuation %s' % (
                b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
            ffs(b'1 1 0 command-data eos x'),
        ])

    def testargsanddata(self):
        data = util.bytesio(b'x' * 100)

        stream = framing.stream(1)
        frames = list(framing.createcommandframes(stream, 1, b'command', {
            b'key1': b'key1value',
            b'key2': b'key2value',
            b'key3': b'key3value',
        }, data))

        self.assertEqual(frames, [
            ffs(b'1 1 stream-begin command-request new|have-data '
                b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
                b"b'key2': b'key2value', b'key3': b'key3value'}}"),
            ffs(b'1 1 0 command-data eos %s' % data.getvalue()),
        ])

    if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
        # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
        # the regex version.
        assertRaisesRegex = (# camelcase-required
            unittest.TestCase.assertRaisesRegexp)

    def testtextoutputformattingstringtype(self):
        """Formatting string must be bytes."""
        with self.assertRaisesRegex(ValueError, 'must use bytes formatting '):
            list(framing.createtextoutputframe(None, 1, [
                (b'foo'.decode('ascii'), [], [])]))

    def testtextoutputargumentbytes(self):
        with self.assertRaisesRegex(ValueError, 'must use bytes for argument'):
            list(framing.createtextoutputframe(None, 1, [
                (b'foo', [b'foo'.decode('ascii')], [])]))

    def testtextoutputlabelbytes(self):
        with self.assertRaisesRegex(ValueError, 'must use bytes for labels'):
            list(framing.createtextoutputframe(None, 1, [
                (b'foo', [], [b'foo'.decode('ascii')])]))

    def testtextoutput1simpleatom(self):
        stream = framing.stream(1)
        val = list(framing.createtextoutputframe(stream, 1, [
            (b'foo', [], [])]))

        self.assertEqual(val, [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}]"),
        ])

    def testtextoutput2simpleatoms(self):
        stream = framing.stream(1)
        val = list(framing.createtextoutputframe(stream, 1, [
            (b'foo', [], []),
            (b'bar', [], []),
        ]))

        self.assertEqual(val, [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]")
        ])

    def testtextoutput1arg(self):
        stream = framing.stream(1)
        val = list(framing.createtextoutputframe(stream, 1, [
            (b'foo %s', [b'val1'], []),
        ]))

        self.assertEqual(val, [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]")
        ])

    def testtextoutput2arg(self):
        stream = framing.stream(1)
        val = list(framing.createtextoutputframe(stream, 1, [
            (b'foo %s %s', [b'val', b'value'], []),
        ]))

        self.assertEqual(val, [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]")
        ])

    def testtextoutput1label(self):
        stream = framing.stream(1)
        val = list(framing.createtextoutputframe(stream, 1, [
            (b'foo', [], [b'label']),
        ]))

        self.assertEqual(val, [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]")
        ])

    def testargandlabel(self):
        stream = framing.stream(1)
        val = list(framing.createtextoutputframe(stream, 1, [
            (b'foo %s', [b'arg'], [b'label']),
        ]))

        self.assertEqual(val, [
            ffs(b'1 1 stream-begin text-output 0 '
                b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
                b"b'labels': [b'label']}]")
        ])

if __name__ == '__main__':
    import silenttestrunner
    silenttestrunner.main(__name__)