changeset 37542:1ec5ce21cb46

tests: extract wire protocol framing tests to own file I was lazy when I put these in test-wireproto-serverreactor.py. Let's do it properly. Differential Revision: https://phab.mercurial-scm.org/D3222
author Gregory Szorc <gregory.szorc@gmail.com>
date Mon, 09 Apr 2018 14:17:57 -0700
parents 3e5e37204b32
children 01361be9e2dc
files tests/test-wireproto-framing.py tests/test-wireproto-serverreactor.py
diffstat 2 files changed, 191 insertions(+), 176 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test-wireproto-framing.py	Mon Apr 09 14:17:57 2018 -0700
@@ -0,0 +1,191 @@
+from __future__ import absolute_import, print_function
+
+import unittest
+
+from mercurial import (
+    util,
+    wireprotoframing as framing,
+)
+
+ffs = framing.makeframefromhumanstring
+
+class FrameHumanStringTests(unittest.TestCase):
+    def testbasic(self):
+        self.assertEqual(ffs(b'1 1 0 1 0 '),
+                         b'\x00\x00\x00\x01\x00\x01\x00\x10')
+
+        self.assertEqual(ffs(b'2 4 0 1 0 '),
+                         b'\x00\x00\x00\x02\x00\x04\x00\x10')
+
+        self.assertEqual(ffs(b'2 4 0 1 0 foo'),
+                         b'\x03\x00\x00\x02\x00\x04\x00\x10foo')
+
+    def testcborint(self):
+        self.assertEqual(ffs(b'1 1 0 1 0 cbor:15'),
+                         b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f')
+
+        self.assertEqual(ffs(b'1 1 0 1 0 cbor:42'),
+                         b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*')
+
+        self.assertEqual(ffs(b'1 1 0 1 0 cbor:1048576'),
+                         b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a'
+                         b'\x00\x10\x00\x00')
+
+        self.assertEqual(ffs(b'1 1 0 1 0 cbor:0'),
+                         b'\x01\x00\x00\x01\x00\x01\x00\x10\x00')
+
+        self.assertEqual(ffs(b'1 1 0 1 0 cbor:-1'),
+                         b'\x01\x00\x00\x01\x00\x01\x00\x10 ')
+
+        self.assertEqual(ffs(b'1 1 0 1 0 cbor:-342542'),
+                         b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r')
+
+    def testcborstrings(self):
+        self.assertEqual(ffs(b"1 1 0 1 0 cbor:b'foo'"),
+                         b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo')
+
+        self.assertEqual(ffs(b"1 1 0 1 0 cbor:u'foo'"),
+                         b'\x04\x00\x00\x01\x00\x01\x00\x10cfoo')
+
+    def testcborlists(self):
+        self.assertEqual(ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"),
+                         b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4'
+                         b'\x18*Cfoo')
+
+    def testcbordicts(self):
+        self.assertEqual(ffs(b"1 1 0 1 0 "
+                             b"cbor:{b'foo': b'val1', b'bar': b'val2'}"),
+                         b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2'
+                         b'CbarDval2CfooDval1')
+
+class FrameTests(unittest.TestCase):
+    def testdataexactframesize(self):
+        data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
+
+        stream = framing.stream(1)
+        frames = list(framing.createcommandframes(stream, 1, b'command',
+                                                  {}, data))
+        self.assertEqual(frames, [
+            ffs(b'1 1 stream-begin command-request new|have-data '
+                b"cbor:{b'name': b'command'}"),
+            ffs(b'1 1 0 command-data continuation %s' % data.getvalue()),
+            ffs(b'1 1 0 command-data eos ')
+        ])
+
+    def testdatamultipleframes(self):
+        data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
+
+        stream = framing.stream(1)
+        frames = list(framing.createcommandframes(stream, 1, b'command', {},
+                                                  data))
+        self.assertEqual(frames, [
+            ffs(b'1 1 stream-begin command-request new|have-data '
+                b"cbor:{b'name': b'command'}"),
+            ffs(b'1 1 0 command-data continuation %s' % (
+                b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
+            ffs(b'1 1 0 command-data eos x'),
+        ])
+
+    def testargsanddata(self):
+        data = util.bytesio(b'x' * 100)
+
+        stream = framing.stream(1)
+        frames = list(framing.createcommandframes(stream, 1, b'command', {
+            b'key1': b'key1value',
+            b'key2': b'key2value',
+            b'key3': b'key3value',
+        }, data))
+
+        self.assertEqual(frames, [
+            ffs(b'1 1 stream-begin command-request new|have-data '
+                b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
+                b"b'key2': b'key2value', b'key3': b'key3value'}}"),
+            ffs(b'1 1 0 command-data eos %s' % data.getvalue()),
+        ])
+
+    def testtextoutputformattingstringtype(self):
+        """Formatting string must be bytes."""
+        with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
+            list(framing.createtextoutputframe(None, 1, [
+                (b'foo'.decode('ascii'), [], [])]))
+
+    def testtextoutputargumentbytes(self):
+        with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
+            list(framing.createtextoutputframe(None, 1, [
+                (b'foo', [b'foo'.decode('ascii')], [])]))
+
+    def testtextoutputlabelbytes(self):
+        with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
+            list(framing.createtextoutputframe(None, 1, [
+                (b'foo', [], [b'foo'.decode('ascii')])]))
+
+    def testtextoutput1simpleatom(self):
+        stream = framing.stream(1)
+        val = list(framing.createtextoutputframe(stream, 1, [
+            (b'foo', [], [])]))
+
+        self.assertEqual(val, [
+            ffs(b'1 1 stream-begin text-output 0 '
+                b"cbor:[{b'msg': b'foo'}]"),
+        ])
+
+    def testtextoutput2simpleatoms(self):
+        stream = framing.stream(1)
+        val = list(framing.createtextoutputframe(stream, 1, [
+            (b'foo', [], []),
+            (b'bar', [], []),
+        ]))
+
+        self.assertEqual(val, [
+            ffs(b'1 1 stream-begin text-output 0 '
+                b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]")
+        ])
+
+    def testtextoutput1arg(self):
+        stream = framing.stream(1)
+        val = list(framing.createtextoutputframe(stream, 1, [
+            (b'foo %s', [b'val1'], []),
+        ]))
+
+        self.assertEqual(val, [
+            ffs(b'1 1 stream-begin text-output 0 '
+                b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]")
+        ])
+
+    def testtextoutput2arg(self):
+        stream = framing.stream(1)
+        val = list(framing.createtextoutputframe(stream, 1, [
+            (b'foo %s %s', [b'val', b'value'], []),
+        ]))
+
+        self.assertEqual(val, [
+            ffs(b'1 1 stream-begin text-output 0 '
+                b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]")
+        ])
+
+    def testtextoutput1label(self):
+        stream = framing.stream(1)
+        val = list(framing.createtextoutputframe(stream, 1, [
+            (b'foo', [], [b'label']),
+        ]))
+
+        self.assertEqual(val, [
+            ffs(b'1 1 stream-begin text-output 0 '
+                b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]")
+        ])
+
+    def testargandlabel(self):
+        stream = framing.stream(1)
+        val = list(framing.createtextoutputframe(stream, 1, [
+            (b'foo %s', [b'arg'], [b'label']),
+        ]))
+
+        self.assertEqual(val, [
+            ffs(b'1 1 stream-begin text-output 0 '
+                b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
+                b"b'labels': [b'label']}]")
+        ])
+
+if __name__ == '__main__':
+    import silenttestrunner
+    silenttestrunner.main(__name__)
--- a/tests/test-wireproto-serverreactor.py	Mon Apr 09 11:33:38 2018 -0700
+++ b/tests/test-wireproto-serverreactor.py	Mon Apr 09 14:17:57 2018 -0700
@@ -38,182 +38,6 @@
                       framing.createcommandframes(stream, rid, cmd, args,
                                                   datafh))
 
-class FrameHumanStringTests(unittest.TestCase):
-    def testbasic(self):
-        self.assertEqual(ffs(b'1 1 0 1 0 '),
-                         b'\x00\x00\x00\x01\x00\x01\x00\x10')
-
-        self.assertEqual(ffs(b'2 4 0 1 0 '),
-                         b'\x00\x00\x00\x02\x00\x04\x00\x10')
-
-        self.assertEqual(ffs(b'2 4 0 1 0 foo'),
-                         b'\x03\x00\x00\x02\x00\x04\x00\x10foo')
-
-    def testcborint(self):
-        self.assertEqual(ffs(b'1 1 0 1 0 cbor:15'),
-                         b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f')
-
-        self.assertEqual(ffs(b'1 1 0 1 0 cbor:42'),
-                         b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*')
-
-        self.assertEqual(ffs(b'1 1 0 1 0 cbor:1048576'),
-                         b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a'
-                         b'\x00\x10\x00\x00')
-
-        self.assertEqual(ffs(b'1 1 0 1 0 cbor:0'),
-                         b'\x01\x00\x00\x01\x00\x01\x00\x10\x00')
-
-        self.assertEqual(ffs(b'1 1 0 1 0 cbor:-1'),
-                         b'\x01\x00\x00\x01\x00\x01\x00\x10 ')
-
-        self.assertEqual(ffs(b'1 1 0 1 0 cbor:-342542'),
-                         b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r')
-
-    def testcborstrings(self):
-        self.assertEqual(ffs(b"1 1 0 1 0 cbor:b'foo'"),
-                         b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo')
-
-        self.assertEqual(ffs(b"1 1 0 1 0 cbor:u'foo'"),
-                         b'\x04\x00\x00\x01\x00\x01\x00\x10cfoo')
-
-    def testcborlists(self):
-        self.assertEqual(ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"),
-                         b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4'
-                         b'\x18*Cfoo')
-
-    def testcbordicts(self):
-        self.assertEqual(ffs(b"1 1 0 1 0 "
-                             b"cbor:{b'foo': b'val1', b'bar': b'val2'}"),
-                         b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2'
-                         b'CbarDval2CfooDval1')
-
-class FrameTests(unittest.TestCase):
-    def testdataexactframesize(self):
-        data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
-
-        stream = framing.stream(1)
-        frames = list(framing.createcommandframes(stream, 1, b'command',
-                                                  {}, data))
-        self.assertEqual(frames, [
-            ffs(b'1 1 stream-begin command-request new|have-data '
-                b"cbor:{b'name': b'command'}"),
-            ffs(b'1 1 0 command-data continuation %s' % data.getvalue()),
-            ffs(b'1 1 0 command-data eos ')
-        ])
-
-    def testdatamultipleframes(self):
-        data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
-
-        stream = framing.stream(1)
-        frames = list(framing.createcommandframes(stream, 1, b'command', {},
-                                                  data))
-        self.assertEqual(frames, [
-            ffs(b'1 1 stream-begin command-request new|have-data '
-                b"cbor:{b'name': b'command'}"),
-            ffs(b'1 1 0 command-data continuation %s' % (
-                b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
-            ffs(b'1 1 0 command-data eos x'),
-        ])
-
-    def testargsanddata(self):
-        data = util.bytesio(b'x' * 100)
-
-        stream = framing.stream(1)
-        frames = list(framing.createcommandframes(stream, 1, b'command', {
-            b'key1': b'key1value',
-            b'key2': b'key2value',
-            b'key3': b'key3value',
-        }, data))
-
-        self.assertEqual(frames, [
-            ffs(b'1 1 stream-begin command-request new|have-data '
-                b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
-                b"b'key2': b'key2value', b'key3': b'key3value'}}"),
-            ffs(b'1 1 0 command-data eos %s' % data.getvalue()),
-        ])
-
-    def testtextoutputformattingstringtype(self):
-        """Formatting string must be bytes."""
-        with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
-            list(framing.createtextoutputframe(None, 1, [
-                (b'foo'.decode('ascii'), [], [])]))
-
-    def testtextoutputargumentbytes(self):
-        with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
-            list(framing.createtextoutputframe(None, 1, [
-                (b'foo', [b'foo'.decode('ascii')], [])]))
-
-    def testtextoutputlabelbytes(self):
-        with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
-            list(framing.createtextoutputframe(None, 1, [
-                (b'foo', [], [b'foo'.decode('ascii')])]))
-
-    def testtextoutput1simpleatom(self):
-        stream = framing.stream(1)
-        val = list(framing.createtextoutputframe(stream, 1, [
-            (b'foo', [], [])]))
-
-        self.assertEqual(val, [
-            ffs(b'1 1 stream-begin text-output 0 '
-                b"cbor:[{b'msg': b'foo'}]"),
-        ])
-
-    def testtextoutput2simpleatoms(self):
-        stream = framing.stream(1)
-        val = list(framing.createtextoutputframe(stream, 1, [
-            (b'foo', [], []),
-            (b'bar', [], []),
-        ]))
-
-        self.assertEqual(val, [
-            ffs(b'1 1 stream-begin text-output 0 '
-                b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]")
-        ])
-
-    def testtextoutput1arg(self):
-        stream = framing.stream(1)
-        val = list(framing.createtextoutputframe(stream, 1, [
-            (b'foo %s', [b'val1'], []),
-        ]))
-
-        self.assertEqual(val, [
-            ffs(b'1 1 stream-begin text-output 0 '
-                b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]")
-        ])
-
-    def testtextoutput2arg(self):
-        stream = framing.stream(1)
-        val = list(framing.createtextoutputframe(stream, 1, [
-            (b'foo %s %s', [b'val', b'value'], []),
-        ]))
-
-        self.assertEqual(val, [
-            ffs(b'1 1 stream-begin text-output 0 '
-                b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]")
-        ])
-
-    def testtextoutput1label(self):
-        stream = framing.stream(1)
-        val = list(framing.createtextoutputframe(stream, 1, [
-            (b'foo', [], [b'label']),
-        ]))
-
-        self.assertEqual(val, [
-            ffs(b'1 1 stream-begin text-output 0 '
-                b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]")
-        ])
-
-    def testargandlabel(self):
-        stream = framing.stream(1)
-        val = list(framing.createtextoutputframe(stream, 1, [
-            (b'foo %s', [b'arg'], [b'label']),
-        ]))
-
-        self.assertEqual(val, [
-            ffs(b'1 1 stream-begin text-output 0 '
-                b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
-                b"b'labels': [b'label']}]")
-        ])
 
 class ServerReactorTests(unittest.TestCase):
     def _sendsingleframe(self, reactor, f):