tests: extract wire protocol framing tests to own file
I was lazy when I put these in test-wireproto-serverreactor.py. Let's
do it properly.
Differential Revision: https://phab.mercurial-scm.org/D3222
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test-wireproto-framing.py Mon Apr 09 14:17:57 2018 -0700
@@ -0,0 +1,191 @@
+from __future__ import absolute_import, print_function
+
+import unittest
+
+from mercurial import (
+ util,
+ wireprotoframing as framing,
+)
+
+ffs = framing.makeframefromhumanstring
+
+class FrameHumanStringTests(unittest.TestCase):
+ def testbasic(self):
+ self.assertEqual(ffs(b'1 1 0 1 0 '),
+ b'\x00\x00\x00\x01\x00\x01\x00\x10')
+
+ self.assertEqual(ffs(b'2 4 0 1 0 '),
+ b'\x00\x00\x00\x02\x00\x04\x00\x10')
+
+ self.assertEqual(ffs(b'2 4 0 1 0 foo'),
+ b'\x03\x00\x00\x02\x00\x04\x00\x10foo')
+
+ def testcborint(self):
+ self.assertEqual(ffs(b'1 1 0 1 0 cbor:15'),
+ b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f')
+
+ self.assertEqual(ffs(b'1 1 0 1 0 cbor:42'),
+ b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*')
+
+ self.assertEqual(ffs(b'1 1 0 1 0 cbor:1048576'),
+ b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a'
+ b'\x00\x10\x00\x00')
+
+ self.assertEqual(ffs(b'1 1 0 1 0 cbor:0'),
+ b'\x01\x00\x00\x01\x00\x01\x00\x10\x00')
+
+ self.assertEqual(ffs(b'1 1 0 1 0 cbor:-1'),
+ b'\x01\x00\x00\x01\x00\x01\x00\x10 ')
+
+ self.assertEqual(ffs(b'1 1 0 1 0 cbor:-342542'),
+ b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r')
+
+ def testcborstrings(self):
+ self.assertEqual(ffs(b"1 1 0 1 0 cbor:b'foo'"),
+ b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo')
+
+ self.assertEqual(ffs(b"1 1 0 1 0 cbor:u'foo'"),
+ b'\x04\x00\x00\x01\x00\x01\x00\x10cfoo')
+
+ def testcborlists(self):
+ self.assertEqual(ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"),
+ b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4'
+ b'\x18*Cfoo')
+
+ def testcbordicts(self):
+ self.assertEqual(ffs(b"1 1 0 1 0 "
+ b"cbor:{b'foo': b'val1', b'bar': b'val2'}"),
+ b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2'
+ b'CbarDval2CfooDval1')
+
+class FrameTests(unittest.TestCase):
+ def testdataexactframesize(self):
+ data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
+
+ stream = framing.stream(1)
+ frames = list(framing.createcommandframes(stream, 1, b'command',
+ {}, data))
+ self.assertEqual(frames, [
+ ffs(b'1 1 stream-begin command-request new|have-data '
+ b"cbor:{b'name': b'command'}"),
+ ffs(b'1 1 0 command-data continuation %s' % data.getvalue()),
+ ffs(b'1 1 0 command-data eos ')
+ ])
+
+ def testdatamultipleframes(self):
+ data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
+
+ stream = framing.stream(1)
+ frames = list(framing.createcommandframes(stream, 1, b'command', {},
+ data))
+ self.assertEqual(frames, [
+ ffs(b'1 1 stream-begin command-request new|have-data '
+ b"cbor:{b'name': b'command'}"),
+ ffs(b'1 1 0 command-data continuation %s' % (
+ b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
+ ffs(b'1 1 0 command-data eos x'),
+ ])
+
+ def testargsanddata(self):
+ data = util.bytesio(b'x' * 100)
+
+ stream = framing.stream(1)
+ frames = list(framing.createcommandframes(stream, 1, b'command', {
+ b'key1': b'key1value',
+ b'key2': b'key2value',
+ b'key3': b'key3value',
+ }, data))
+
+ self.assertEqual(frames, [
+ ffs(b'1 1 stream-begin command-request new|have-data '
+ b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
+ b"b'key2': b'key2value', b'key3': b'key3value'}}"),
+ ffs(b'1 1 0 command-data eos %s' % data.getvalue()),
+ ])
+
+ def testtextoutputformattingstringtype(self):
+ """Formatting string must be bytes."""
+ with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
+ list(framing.createtextoutputframe(None, 1, [
+ (b'foo'.decode('ascii'), [], [])]))
+
+ def testtextoutputargumentbytes(self):
+ with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
+ list(framing.createtextoutputframe(None, 1, [
+ (b'foo', [b'foo'.decode('ascii')], [])]))
+
+ def testtextoutputlabelbytes(self):
+ with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
+ list(framing.createtextoutputframe(None, 1, [
+ (b'foo', [], [b'foo'.decode('ascii')])]))
+
+ def testtextoutput1simpleatom(self):
+ stream = framing.stream(1)
+ val = list(framing.createtextoutputframe(stream, 1, [
+ (b'foo', [], [])]))
+
+ self.assertEqual(val, [
+ ffs(b'1 1 stream-begin text-output 0 '
+ b"cbor:[{b'msg': b'foo'}]"),
+ ])
+
+ def testtextoutput2simpleatoms(self):
+ stream = framing.stream(1)
+ val = list(framing.createtextoutputframe(stream, 1, [
+ (b'foo', [], []),
+ (b'bar', [], []),
+ ]))
+
+ self.assertEqual(val, [
+ ffs(b'1 1 stream-begin text-output 0 '
+ b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]")
+ ])
+
+ def testtextoutput1arg(self):
+ stream = framing.stream(1)
+ val = list(framing.createtextoutputframe(stream, 1, [
+ (b'foo %s', [b'val1'], []),
+ ]))
+
+ self.assertEqual(val, [
+ ffs(b'1 1 stream-begin text-output 0 '
+ b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]")
+ ])
+
+ def testtextoutput2arg(self):
+ stream = framing.stream(1)
+ val = list(framing.createtextoutputframe(stream, 1, [
+ (b'foo %s %s', [b'val', b'value'], []),
+ ]))
+
+ self.assertEqual(val, [
+ ffs(b'1 1 stream-begin text-output 0 '
+ b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]")
+ ])
+
+ def testtextoutput1label(self):
+ stream = framing.stream(1)
+ val = list(framing.createtextoutputframe(stream, 1, [
+ (b'foo', [], [b'label']),
+ ]))
+
+ self.assertEqual(val, [
+ ffs(b'1 1 stream-begin text-output 0 '
+ b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]")
+ ])
+
+ def testargandlabel(self):
+ stream = framing.stream(1)
+ val = list(framing.createtextoutputframe(stream, 1, [
+ (b'foo %s', [b'arg'], [b'label']),
+ ]))
+
+ self.assertEqual(val, [
+ ffs(b'1 1 stream-begin text-output 0 '
+ b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
+ b"b'labels': [b'label']}]")
+ ])
+
+if __name__ == '__main__':
+ import silenttestrunner
+ silenttestrunner.main(__name__)
--- a/tests/test-wireproto-serverreactor.py Mon Apr 09 11:33:38 2018 -0700
+++ b/tests/test-wireproto-serverreactor.py Mon Apr 09 14:17:57 2018 -0700
@@ -38,182 +38,6 @@
framing.createcommandframes(stream, rid, cmd, args,
datafh))
-class FrameHumanStringTests(unittest.TestCase):
- def testbasic(self):
- self.assertEqual(ffs(b'1 1 0 1 0 '),
- b'\x00\x00\x00\x01\x00\x01\x00\x10')
-
- self.assertEqual(ffs(b'2 4 0 1 0 '),
- b'\x00\x00\x00\x02\x00\x04\x00\x10')
-
- self.assertEqual(ffs(b'2 4 0 1 0 foo'),
- b'\x03\x00\x00\x02\x00\x04\x00\x10foo')
-
- def testcborint(self):
- self.assertEqual(ffs(b'1 1 0 1 0 cbor:15'),
- b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f')
-
- self.assertEqual(ffs(b'1 1 0 1 0 cbor:42'),
- b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*')
-
- self.assertEqual(ffs(b'1 1 0 1 0 cbor:1048576'),
- b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a'
- b'\x00\x10\x00\x00')
-
- self.assertEqual(ffs(b'1 1 0 1 0 cbor:0'),
- b'\x01\x00\x00\x01\x00\x01\x00\x10\x00')
-
- self.assertEqual(ffs(b'1 1 0 1 0 cbor:-1'),
- b'\x01\x00\x00\x01\x00\x01\x00\x10 ')
-
- self.assertEqual(ffs(b'1 1 0 1 0 cbor:-342542'),
- b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r')
-
- def testcborstrings(self):
- self.assertEqual(ffs(b"1 1 0 1 0 cbor:b'foo'"),
- b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo')
-
- self.assertEqual(ffs(b"1 1 0 1 0 cbor:u'foo'"),
- b'\x04\x00\x00\x01\x00\x01\x00\x10cfoo')
-
- def testcborlists(self):
- self.assertEqual(ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"),
- b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4'
- b'\x18*Cfoo')
-
- def testcbordicts(self):
- self.assertEqual(ffs(b"1 1 0 1 0 "
- b"cbor:{b'foo': b'val1', b'bar': b'val2'}"),
- b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2'
- b'CbarDval2CfooDval1')
-
-class FrameTests(unittest.TestCase):
- def testdataexactframesize(self):
- data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
-
- stream = framing.stream(1)
- frames = list(framing.createcommandframes(stream, 1, b'command',
- {}, data))
- self.assertEqual(frames, [
- ffs(b'1 1 stream-begin command-request new|have-data '
- b"cbor:{b'name': b'command'}"),
- ffs(b'1 1 0 command-data continuation %s' % data.getvalue()),
- ffs(b'1 1 0 command-data eos ')
- ])
-
- def testdatamultipleframes(self):
- data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
-
- stream = framing.stream(1)
- frames = list(framing.createcommandframes(stream, 1, b'command', {},
- data))
- self.assertEqual(frames, [
- ffs(b'1 1 stream-begin command-request new|have-data '
- b"cbor:{b'name': b'command'}"),
- ffs(b'1 1 0 command-data continuation %s' % (
- b'x' * framing.DEFAULT_MAX_FRAME_SIZE)),
- ffs(b'1 1 0 command-data eos x'),
- ])
-
- def testargsanddata(self):
- data = util.bytesio(b'x' * 100)
-
- stream = framing.stream(1)
- frames = list(framing.createcommandframes(stream, 1, b'command', {
- b'key1': b'key1value',
- b'key2': b'key2value',
- b'key3': b'key3value',
- }, data))
-
- self.assertEqual(frames, [
- ffs(b'1 1 stream-begin command-request new|have-data '
- b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
- b"b'key2': b'key2value', b'key3': b'key3value'}}"),
- ffs(b'1 1 0 command-data eos %s' % data.getvalue()),
- ])
-
- def testtextoutputformattingstringtype(self):
- """Formatting string must be bytes."""
- with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '):
- list(framing.createtextoutputframe(None, 1, [
- (b'foo'.decode('ascii'), [], [])]))
-
- def testtextoutputargumentbytes(self):
- with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'):
- list(framing.createtextoutputframe(None, 1, [
- (b'foo', [b'foo'.decode('ascii')], [])]))
-
- def testtextoutputlabelbytes(self):
- with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'):
- list(framing.createtextoutputframe(None, 1, [
- (b'foo', [], [b'foo'.decode('ascii')])]))
-
- def testtextoutput1simpleatom(self):
- stream = framing.stream(1)
- val = list(framing.createtextoutputframe(stream, 1, [
- (b'foo', [], [])]))
-
- self.assertEqual(val, [
- ffs(b'1 1 stream-begin text-output 0 '
- b"cbor:[{b'msg': b'foo'}]"),
- ])
-
- def testtextoutput2simpleatoms(self):
- stream = framing.stream(1)
- val = list(framing.createtextoutputframe(stream, 1, [
- (b'foo', [], []),
- (b'bar', [], []),
- ]))
-
- self.assertEqual(val, [
- ffs(b'1 1 stream-begin text-output 0 '
- b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]")
- ])
-
- def testtextoutput1arg(self):
- stream = framing.stream(1)
- val = list(framing.createtextoutputframe(stream, 1, [
- (b'foo %s', [b'val1'], []),
- ]))
-
- self.assertEqual(val, [
- ffs(b'1 1 stream-begin text-output 0 '
- b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]")
- ])
-
- def testtextoutput2arg(self):
- stream = framing.stream(1)
- val = list(framing.createtextoutputframe(stream, 1, [
- (b'foo %s %s', [b'val', b'value'], []),
- ]))
-
- self.assertEqual(val, [
- ffs(b'1 1 stream-begin text-output 0 '
- b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]")
- ])
-
- def testtextoutput1label(self):
- stream = framing.stream(1)
- val = list(framing.createtextoutputframe(stream, 1, [
- (b'foo', [], [b'label']),
- ]))
-
- self.assertEqual(val, [
- ffs(b'1 1 stream-begin text-output 0 '
- b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]")
- ])
-
- def testargandlabel(self):
- stream = framing.stream(1)
- val = list(framing.createtextoutputframe(stream, 1, [
- (b'foo %s', [b'arg'], [b'label']),
- ]))
-
- self.assertEqual(val, [
- ffs(b'1 1 stream-begin text-output 0 '
- b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
- b"b'labels': [b'label']}]")
- ])
class ServerReactorTests(unittest.TestCase):
def _sendsingleframe(self, reactor, f):