tests/test-wireproto-serverreactor.py
changeset 37055 8c3c47362934
child 37058 61393f888dfe
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test-wireproto-serverreactor.py	Wed Mar 14 15:25:06 2018 -0700
@@ -0,0 +1,275 @@
+from __future__ import absolute_import, print_function
+
+import unittest
+
+from mercurial import (
+    util,
+    wireprotoframing as framing,
+)
+
+ffs = framing.makeframefromhumanstring
+
+def makereactor():
+    return framing.serverreactor()
+
+def sendframes(reactor, gen):
+    """Send a generator of frame bytearray to a reactor.
+
+    Emits a generator of results from ``onframerecv()`` calls.
+    """
+    for frame in gen:
+        frametype, frameflags, framelength = framing.parseheader(frame)
+        payload = frame[framing.FRAME_HEADER_SIZE:]
+        assert len(payload) == framelength
+
+        yield reactor.onframerecv(frametype, frameflags, payload)
+
+def sendcommandframes(reactor, cmd, args, datafh=None):
+    """Generate frames to run a command and send them to a reactor."""
+    return sendframes(reactor, framing.createcommandframes(cmd, args, datafh))
+
+class FrameTests(unittest.TestCase):
+    def testdataexactframesize(self):
+        data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
+
+        frames = list(framing.createcommandframes(b'command', {}, data))
+        self.assertEqual(frames, [
+            ffs(b'command-name have-data command'),
+            ffs(b'command-data continuation %s' % data.getvalue()),
+            ffs(b'command-data eos ')
+        ])
+
+    def testdatamultipleframes(self):
+        data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
+        frames = list(framing.createcommandframes(b'command', {}, data))
+        self.assertEqual(frames, [
+            ffs(b'command-name have-data command'),
+            ffs(b'command-data continuation %s' % (
+                b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
+            ffs(b'command-data eos x'),
+        ])
+
+    def testargsanddata(self):
+        data = util.bytesio(b'x' * 100)
+
+        frames = list(framing.createcommandframes(b'command', {
+            b'key1': b'key1value',
+            b'key2': b'key2value',
+            b'key3': b'key3value',
+        }, data))
+
+        self.assertEqual(frames, [
+            ffs(b'command-name have-args|have-data command'),
+            ffs(br'command-argument 0 \x04\x00\x09\x00key1key1value'),
+            ffs(br'command-argument 0 \x04\x00\x09\x00key2key2value'),
+            ffs(br'command-argument eoa \x04\x00\x09\x00key3key3value'),
+            ffs(b'command-data eos %s' % data.getvalue()),
+        ])
+
+class ServerReactorTests(unittest.TestCase):
+    def _sendsingleframe(self, reactor, s):
+        results = list(sendframes(reactor, [ffs(s)]))
+        self.assertEqual(len(results), 1)
+
+        return results[0]
+
+    def assertaction(self, res, expected):
+        self.assertIsInstance(res, tuple)
+        self.assertEqual(len(res), 2)
+        self.assertIsInstance(res[1], dict)
+        self.assertEqual(res[0], expected)
+
+    def test1framecommand(self):
+        """Receiving a command in a single frame yields request to run it."""
+        reactor = makereactor()
+        results = list(sendcommandframes(reactor, b'mycommand', {}))
+        self.assertEqual(len(results), 1)
+        self.assertaction(results[0], 'runcommand')
+        self.assertEqual(results[0][1], {
+            'command': b'mycommand',
+            'args': {},
+            'data': None,
+        })
+
+    def test1argument(self):
+        reactor = makereactor()
+        results = list(sendcommandframes(reactor, b'mycommand',
+                                         {b'foo': b'bar'}))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'runcommand')
+        self.assertEqual(results[1][1], {
+            'command': b'mycommand',
+            'args': {b'foo': b'bar'},
+            'data': None,
+        })
+
+    def testmultiarguments(self):
+        reactor = makereactor()
+        results = list(sendcommandframes(reactor, b'mycommand',
+                                         {b'foo': b'bar', b'biz': b'baz'}))
+        self.assertEqual(len(results), 3)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'wantframe')
+        self.assertaction(results[2], 'runcommand')
+        self.assertEqual(results[2][1], {
+            'command': b'mycommand',
+            'args': {b'foo': b'bar', b'biz': b'baz'},
+            'data': None,
+        })
+
+    def testsimplecommanddata(self):
+        reactor = makereactor()
+        results = list(sendcommandframes(reactor, b'mycommand', {},
+                                         util.bytesio(b'data!')))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'runcommand')
+        self.assertEqual(results[1][1], {
+            'command': b'mycommand',
+            'args': {},
+            'data': b'data!',
+        })
+
+    def testmultipledataframes(self):
+        frames = [
+            ffs(b'command-name have-data mycommand'),
+            ffs(b'command-data continuation data1'),
+            ffs(b'command-data continuation data2'),
+            ffs(b'command-data eos data3'),
+        ]
+
+        reactor = makereactor()
+        results = list(sendframes(reactor, frames))
+        self.assertEqual(len(results), 4)
+        for i in range(3):
+            self.assertaction(results[i], 'wantframe')
+        self.assertaction(results[3], 'runcommand')
+        self.assertEqual(results[3][1], {
+            'command': b'mycommand',
+            'args': {},
+            'data': b'data1data2data3',
+        })
+
+    def testargumentanddata(self):
+        frames = [
+            ffs(b'command-name have-args|have-data command'),
+            ffs(br'command-argument 0 \x03\x00\x03\x00keyval'),
+            ffs(br'command-argument eoa \x03\x00\x03\x00foobar'),
+            ffs(b'command-data continuation value1'),
+            ffs(b'command-data eos value2'),
+        ]
+
+        reactor = makereactor()
+        results = list(sendframes(reactor, frames))
+
+        self.assertaction(results[-1], 'runcommand')
+        self.assertEqual(results[-1][1], {
+            'command': b'command',
+            'args': {
+                b'key': b'val',
+                b'foo': b'bar',
+            },
+            'data': b'value1value2',
+        })
+
+    def testunexpectedcommandargument(self):
+        """Command argument frame when not running a command is an error."""
+        result = self._sendsingleframe(makereactor(),
+                                       b'command-argument 0 ignored')
+        self.assertaction(result, 'error')
+        self.assertEqual(result[1], {
+            'message': b'expected command frame; got 2',
+        })
+
+    def testunexpectedcommanddata(self):
+        """Command argument frame when not running a command is an error."""
+        result = self._sendsingleframe(makereactor(),
+                                       b'command-data 0 ignored')
+        self.assertaction(result, 'error')
+        self.assertEqual(result[1], {
+            'message': b'expected command frame; got 3',
+        })
+
+    def testmissingcommandframeflags(self):
+        """Command name frame must have flags set."""
+        result = self._sendsingleframe(makereactor(),
+                                       b'command-name 0 command')
+        self.assertaction(result, 'error')
+        self.assertEqual(result[1], {
+            'message': b'missing frame flags on command frame',
+        })
+
+    def testmissingargumentframe(self):
+        frames = [
+            ffs(b'command-name have-args command'),
+            ffs(b'command-name 0 ignored'),
+        ]
+
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'expected command argument frame; got 1',
+        })
+
+    def testincompleteargumentname(self):
+        """Argument frame with incomplete name."""
+        frames = [
+            ffs(b'command-name have-args command1'),
+            ffs(br'command-argument eoa \x04\x00\xde\xadfoo'),
+        ]
+
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'malformed argument frame: partial argument name',
+        })
+
+    def testincompleteargumentvalue(self):
+        """Argument frame with incomplete value."""
+        frames = [
+            ffs(b'command-name have-args command'),
+            ffs(br'command-argument eoa \x03\x00\xaa\xaafoopartialvalue'),
+        ]
+
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'malformed argument frame: partial argument value',
+        })
+
+    def testmissingcommanddataframe(self):
+        frames = [
+            ffs(b'command-name have-data command1'),
+            ffs(b'command-name eos command2'),
+        ]
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'expected command data frame; got 1',
+        })
+
+    def testmissingcommanddataframeflags(self):
+        frames = [
+            ffs(b'command-name have-data command1'),
+            ffs(b'command-data 0 data'),
+        ]
+        results = list(sendframes(makereactor(), frames))
+        self.assertEqual(len(results), 2)
+        self.assertaction(results[0], 'wantframe')
+        self.assertaction(results[1], 'error')
+        self.assertEqual(results[1][1], {
+            'message': b'command data frame without flags',
+        })
+
+if __name__ == '__main__':
+    import silenttestrunner
+    silenttestrunner.main(__name__)