Mercurial > hg
changeset 37542:1ec5ce21cb46
tests: extract wire protocol framing tests to own file
I was lazy when I put these in test-wireproto-serverreactor.py. Let's
do it properly.
Differential Revision: https://phab.mercurial-scm.org/D3222
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Mon, 09 Apr 2018 14:17:57 -0700 |
parents | 3e5e37204b32 |
children | 01361be9e2dc |
files | tests/test-wireproto-framing.py tests/test-wireproto-serverreactor.py |
diffstat | 2 files changed, 191 insertions(+), 176 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test-wireproto-framing.py Mon Apr 09 14:17:57 2018 -0700 @@ -0,0 +1,191 @@ +from __future__ import absolute_import, print_function + +import unittest + +from mercurial import ( + util, + wireprotoframing as framing, +) + +ffs = framing.makeframefromhumanstring + +class FrameHumanStringTests(unittest.TestCase): + def testbasic(self): + self.assertEqual(ffs(b'1 1 0 1 0 '), + b'\x00\x00\x00\x01\x00\x01\x00\x10') + + self.assertEqual(ffs(b'2 4 0 1 0 '), + b'\x00\x00\x00\x02\x00\x04\x00\x10') + + self.assertEqual(ffs(b'2 4 0 1 0 foo'), + b'\x03\x00\x00\x02\x00\x04\x00\x10foo') + + def testcborint(self): + self.assertEqual(ffs(b'1 1 0 1 0 cbor:15'), + b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f') + + self.assertEqual(ffs(b'1 1 0 1 0 cbor:42'), + b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*') + + self.assertEqual(ffs(b'1 1 0 1 0 cbor:1048576'), + b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a' + b'\x00\x10\x00\x00') + + self.assertEqual(ffs(b'1 1 0 1 0 cbor:0'), + b'\x01\x00\x00\x01\x00\x01\x00\x10\x00') + + self.assertEqual(ffs(b'1 1 0 1 0 cbor:-1'), + b'\x01\x00\x00\x01\x00\x01\x00\x10 ') + + self.assertEqual(ffs(b'1 1 0 1 0 cbor:-342542'), + b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r') + + def testcborstrings(self): + self.assertEqual(ffs(b"1 1 0 1 0 cbor:b'foo'"), + b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo') + + self.assertEqual(ffs(b"1 1 0 1 0 cbor:u'foo'"), + b'\x04\x00\x00\x01\x00\x01\x00\x10cfoo') + + def testcborlists(self): + self.assertEqual(ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"), + b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4' + b'\x18*Cfoo') + + def testcbordicts(self): + self.assertEqual(ffs(b"1 1 0 1 0 " + b"cbor:{b'foo': b'val1', b'bar': b'val2'}"), + b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2' + b'CbarDval2CfooDval1') + +class FrameTests(unittest.TestCase): + def testdataexactframesize(self): + data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE) + + stream = framing.stream(1) + frames = list(framing.createcommandframes(stream, 1, b'command', + {}, data)) + self.assertEqual(frames, [ + ffs(b'1 1 stream-begin command-request new|have-data ' + b"cbor:{b'name': b'command'}"), + ffs(b'1 1 0 command-data continuation %s' % data.getvalue()), + ffs(b'1 1 0 command-data eos ') + ]) + + def testdatamultipleframes(self): + data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1)) + + stream = framing.stream(1) + frames = list(framing.createcommandframes(stream, 1, b'command', {}, + data)) + self.assertEqual(frames, [ + ffs(b'1 1 stream-begin command-request new|have-data ' + b"cbor:{b'name': b'command'}"), + ffs(b'1 1 0 command-data continuation %s' % ( + b'x' * framing.DEFAULT_MAX_FRAME_SIZE)), + ffs(b'1 1 0 command-data eos x'), + ]) + + def testargsanddata(self): + data = util.bytesio(b'x' * 100) + + stream = framing.stream(1) + frames = list(framing.createcommandframes(stream, 1, b'command', { + b'key1': b'key1value', + b'key2': b'key2value', + b'key3': b'key3value', + }, data)) + + self.assertEqual(frames, [ + ffs(b'1 1 stream-begin command-request new|have-data ' + b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', " + b"b'key2': b'key2value', b'key3': b'key3value'}}"), + ffs(b'1 1 0 command-data eos %s' % data.getvalue()), + ]) + + def testtextoutputformattingstringtype(self): + """Formatting string must be bytes.""" + with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '): + list(framing.createtextoutputframe(None, 1, [ + (b'foo'.decode('ascii'), [], [])])) + + def testtextoutputargumentbytes(self): + with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'): + list(framing.createtextoutputframe(None, 1, [ + (b'foo', [b'foo'.decode('ascii')], [])])) + + def testtextoutputlabelbytes(self): + with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'): + list(framing.createtextoutputframe(None, 1, [ + (b'foo', [], [b'foo'.decode('ascii')])])) + + def testtextoutput1simpleatom(self): + stream = framing.stream(1) + val = list(framing.createtextoutputframe(stream, 1, [ + (b'foo', [], [])])) + + self.assertEqual(val, [ + ffs(b'1 1 stream-begin text-output 0 ' + b"cbor:[{b'msg': b'foo'}]"), + ]) + + def testtextoutput2simpleatoms(self): + stream = framing.stream(1) + val = list(framing.createtextoutputframe(stream, 1, [ + (b'foo', [], []), + (b'bar', [], []), + ])) + + self.assertEqual(val, [ + ffs(b'1 1 stream-begin text-output 0 ' + b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]") + ]) + + def testtextoutput1arg(self): + stream = framing.stream(1) + val = list(framing.createtextoutputframe(stream, 1, [ + (b'foo %s', [b'val1'], []), + ])) + + self.assertEqual(val, [ + ffs(b'1 1 stream-begin text-output 0 ' + b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]") + ]) + + def testtextoutput2arg(self): + stream = framing.stream(1) + val = list(framing.createtextoutputframe(stream, 1, [ + (b'foo %s %s', [b'val', b'value'], []), + ])) + + self.assertEqual(val, [ + ffs(b'1 1 stream-begin text-output 0 ' + b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]") + ]) + + def testtextoutput1label(self): + stream = framing.stream(1) + val = list(framing.createtextoutputframe(stream, 1, [ + (b'foo', [], [b'label']), + ])) + + self.assertEqual(val, [ + ffs(b'1 1 stream-begin text-output 0 ' + b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]") + ]) + + def testargandlabel(self): + stream = framing.stream(1) + val = list(framing.createtextoutputframe(stream, 1, [ + (b'foo %s', [b'arg'], [b'label']), + ])) + + self.assertEqual(val, [ + ffs(b'1 1 stream-begin text-output 0 ' + b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], " + b"b'labels': [b'label']}]") + ]) + +if __name__ == '__main__': + import silenttestrunner + silenttestrunner.main(__name__)
--- a/tests/test-wireproto-serverreactor.py Mon Apr 09 11:33:38 2018 -0700 +++ b/tests/test-wireproto-serverreactor.py Mon Apr 09 14:17:57 2018 -0700 @@ -38,182 +38,6 @@ framing.createcommandframes(stream, rid, cmd, args, datafh)) -class FrameHumanStringTests(unittest.TestCase): - def testbasic(self): - self.assertEqual(ffs(b'1 1 0 1 0 '), - b'\x00\x00\x00\x01\x00\x01\x00\x10') - - self.assertEqual(ffs(b'2 4 0 1 0 '), - b'\x00\x00\x00\x02\x00\x04\x00\x10') - - self.assertEqual(ffs(b'2 4 0 1 0 foo'), - b'\x03\x00\x00\x02\x00\x04\x00\x10foo') - - def testcborint(self): - self.assertEqual(ffs(b'1 1 0 1 0 cbor:15'), - b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f') - - self.assertEqual(ffs(b'1 1 0 1 0 cbor:42'), - b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*') - - self.assertEqual(ffs(b'1 1 0 1 0 cbor:1048576'), - b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a' - b'\x00\x10\x00\x00') - - self.assertEqual(ffs(b'1 1 0 1 0 cbor:0'), - b'\x01\x00\x00\x01\x00\x01\x00\x10\x00') - - self.assertEqual(ffs(b'1 1 0 1 0 cbor:-1'), - b'\x01\x00\x00\x01\x00\x01\x00\x10 ') - - self.assertEqual(ffs(b'1 1 0 1 0 cbor:-342542'), - b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r') - - def testcborstrings(self): - self.assertEqual(ffs(b"1 1 0 1 0 cbor:b'foo'"), - b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo') - - self.assertEqual(ffs(b"1 1 0 1 0 cbor:u'foo'"), - b'\x04\x00\x00\x01\x00\x01\x00\x10cfoo') - - def testcborlists(self): - self.assertEqual(ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"), - b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4' - b'\x18*Cfoo') - - def testcbordicts(self): - self.assertEqual(ffs(b"1 1 0 1 0 " - b"cbor:{b'foo': b'val1', b'bar': b'val2'}"), - b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2' - b'CbarDval2CfooDval1') - -class FrameTests(unittest.TestCase): - def testdataexactframesize(self): - data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE) - - stream = framing.stream(1) - frames = list(framing.createcommandframes(stream, 1, b'command', - {}, data)) - self.assertEqual(frames, [ - ffs(b'1 1 stream-begin command-request new|have-data ' - b"cbor:{b'name': b'command'}"), - ffs(b'1 1 0 command-data continuation %s' % data.getvalue()), - ffs(b'1 1 0 command-data eos ') - ]) - - def testdatamultipleframes(self): - data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1)) - - stream = framing.stream(1) - frames = list(framing.createcommandframes(stream, 1, b'command', {}, - data)) - self.assertEqual(frames, [ - ffs(b'1 1 stream-begin command-request new|have-data ' - b"cbor:{b'name': b'command'}"), - ffs(b'1 1 0 command-data continuation %s' % ( - b'x' * framing.DEFAULT_MAX_FRAME_SIZE)), - ffs(b'1 1 0 command-data eos x'), - ]) - - def testargsanddata(self): - data = util.bytesio(b'x' * 100) - - stream = framing.stream(1) - frames = list(framing.createcommandframes(stream, 1, b'command', { - b'key1': b'key1value', - b'key2': b'key2value', - b'key3': b'key3value', - }, data)) - - self.assertEqual(frames, [ - ffs(b'1 1 stream-begin command-request new|have-data ' - b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', " - b"b'key2': b'key2value', b'key3': b'key3value'}}"), - ffs(b'1 1 0 command-data eos %s' % data.getvalue()), - ]) - - def testtextoutputformattingstringtype(self): - """Formatting string must be bytes.""" - with self.assertRaisesRegexp(ValueError, 'must use bytes formatting '): - list(framing.createtextoutputframe(None, 1, [ - (b'foo'.decode('ascii'), [], [])])) - - def testtextoutputargumentbytes(self): - with self.assertRaisesRegexp(ValueError, 'must use bytes for argument'): - list(framing.createtextoutputframe(None, 1, [ - (b'foo', [b'foo'.decode('ascii')], [])])) - - def testtextoutputlabelbytes(self): - with self.assertRaisesRegexp(ValueError, 'must use bytes for labels'): - list(framing.createtextoutputframe(None, 1, [ - (b'foo', [], [b'foo'.decode('ascii')])])) - - def testtextoutput1simpleatom(self): - stream = framing.stream(1) - val = list(framing.createtextoutputframe(stream, 1, [ - (b'foo', [], [])])) - - self.assertEqual(val, [ - ffs(b'1 1 stream-begin text-output 0 ' - b"cbor:[{b'msg': b'foo'}]"), - ]) - - def testtextoutput2simpleatoms(self): - stream = framing.stream(1) - val = list(framing.createtextoutputframe(stream, 1, [ - (b'foo', [], []), - (b'bar', [], []), - ])) - - self.assertEqual(val, [ - ffs(b'1 1 stream-begin text-output 0 ' - b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]") - ]) - - def testtextoutput1arg(self): - stream = framing.stream(1) - val = list(framing.createtextoutputframe(stream, 1, [ - (b'foo %s', [b'val1'], []), - ])) - - self.assertEqual(val, [ - ffs(b'1 1 stream-begin text-output 0 ' - b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]") - ]) - - def testtextoutput2arg(self): - stream = framing.stream(1) - val = list(framing.createtextoutputframe(stream, 1, [ - (b'foo %s %s', [b'val', b'value'], []), - ])) - - self.assertEqual(val, [ - ffs(b'1 1 stream-begin text-output 0 ' - b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]") - ]) - - def testtextoutput1label(self): - stream = framing.stream(1) - val = list(framing.createtextoutputframe(stream, 1, [ - (b'foo', [], [b'label']), - ])) - - self.assertEqual(val, [ - ffs(b'1 1 stream-begin text-output 0 ' - b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]") - ]) - - def testargandlabel(self): - stream = framing.stream(1) - val = list(framing.createtextoutputframe(stream, 1, [ - (b'foo %s', [b'arg'], [b'label']), - ])) - - self.assertEqual(val, [ - ffs(b'1 1 stream-begin text-output 0 ' - b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], " - b"b'labels': [b'label']}]") - ]) class ServerReactorTests(unittest.TestCase): def _sendsingleframe(self, reactor, f):