Mercurial > hg
view tests/test-wireproto-framing.py @ 40402:106adc261492
logtoprocess: sends the canonical command name to the subprocess
One of the use-case of logtoprocess is to monitor command duration. With the
current code, we only get whatever command name the user typed (either
abbreviated or aliased).
This makes analytics on the collected data more difficult. Stores the
canonical command name in the request object. Pass the stored canonical name
in the `req.ui.log("commandfinish", ...)` call as keyword argument to not
break potential string formatting.
Pass the value as the environment variable named `LTP_COMMAND` to the called
script.
Differential Revision: https://phab.mercurial-scm.org/D4820
author | Boris Feld <boris.feld@octobus.net> |
---|---|
date | Mon, 22 Oct 2018 15:51:01 +0200 |
parents | fcc6bd11444b |
children | 2372284d9457 |
line wrap: on
line source
from __future__ import absolute_import, print_function import unittest from mercurial import ( util, wireprotoframing as framing, ) ffs = framing.makeframefromhumanstring class FrameHumanStringTests(unittest.TestCase): def testbasic(self): self.assertEqual(ffs(b'1 1 0 1 0 '), b'\x00\x00\x00\x01\x00\x01\x00\x10') self.assertEqual(ffs(b'2 4 0 1 0 '), b'\x00\x00\x00\x02\x00\x04\x00\x10') self.assertEqual(ffs(b'2 4 0 1 0 foo'), b'\x03\x00\x00\x02\x00\x04\x00\x10foo') def testcborint(self): self.assertEqual(ffs(b'1 1 0 1 0 cbor:15'), b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f') self.assertEqual(ffs(b'1 1 0 1 0 cbor:42'), b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*') self.assertEqual(ffs(b'1 1 0 1 0 cbor:1048576'), b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a' b'\x00\x10\x00\x00') self.assertEqual(ffs(b'1 1 0 1 0 cbor:0'), b'\x01\x00\x00\x01\x00\x01\x00\x10\x00') self.assertEqual(ffs(b'1 1 0 1 0 cbor:-1'), b'\x01\x00\x00\x01\x00\x01\x00\x10 ') self.assertEqual(ffs(b'1 1 0 1 0 cbor:-342542'), b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r') def testcborstrings(self): self.assertEqual(ffs(b"1 1 0 1 0 cbor:b'foo'"), b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo') def testcborlists(self): self.assertEqual(ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"), b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4' b'\x18*Cfoo') def testcbordicts(self): self.assertEqual(ffs(b"1 1 0 1 0 " b"cbor:{b'foo': b'val1', b'bar': b'val2'}"), b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2' b'CbarDval2CfooDval1') class FrameTests(unittest.TestCase): def testdataexactframesize(self): data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE) stream = framing.stream(1) frames = list(framing.createcommandframes(stream, 1, b'command', {}, data)) self.assertEqual(frames, [ ffs(b'1 1 stream-begin command-request new|have-data ' b"cbor:{b'name': b'command'}"), ffs(b'1 1 0 command-data continuation %s' % data.getvalue()), ffs(b'1 1 0 command-data eos ') ]) def testdatamultipleframes(self): data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1)) stream = framing.stream(1) frames = list(framing.createcommandframes(stream, 1, b'command', {}, data)) self.assertEqual(frames, [ ffs(b'1 1 stream-begin command-request new|have-data ' b"cbor:{b'name': b'command'}"), ffs(b'1 1 0 command-data continuation %s' % ( b'x' * framing.DEFAULT_MAX_FRAME_SIZE)), ffs(b'1 1 0 command-data eos x'), ]) def testargsanddata(self): data = util.bytesio(b'x' * 100) stream = framing.stream(1) frames = list(framing.createcommandframes(stream, 1, b'command', { b'key1': b'key1value', b'key2': b'key2value', b'key3': b'key3value', }, data)) self.assertEqual(frames, [ ffs(b'1 1 stream-begin command-request new|have-data ' b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', " b"b'key2': b'key2value', b'key3': b'key3value'}}"), ffs(b'1 1 0 command-data eos %s' % data.getvalue()), ]) if not getattr(unittest.TestCase, 'assertRaisesRegex', False): # Python 3.7 deprecates the regex*p* version, but 2.7 lacks # the regex version. assertRaisesRegex = (# camelcase-required unittest.TestCase.assertRaisesRegexp) def testtextoutputformattingstringtype(self): """Formatting string must be bytes.""" with self.assertRaisesRegex(ValueError, 'must use bytes formatting '): list(framing.createtextoutputframe(None, 1, [ (b'foo'.decode('ascii'), [], [])])) def testtextoutputargumentbytes(self): with self.assertRaisesRegex(ValueError, 'must use bytes for argument'): list(framing.createtextoutputframe(None, 1, [ (b'foo', [b'foo'.decode('ascii')], [])])) def testtextoutputlabelbytes(self): with self.assertRaisesRegex(ValueError, 'must use bytes for labels'): list(framing.createtextoutputframe(None, 1, [ (b'foo', [], [b'foo'.decode('ascii')])])) def testtextoutput1simpleatom(self): stream = framing.stream(1) val = list(framing.createtextoutputframe(stream, 1, [ (b'foo', [], [])])) self.assertEqual(val, [ ffs(b'1 1 stream-begin text-output 0 ' b"cbor:[{b'msg': b'foo'}]"), ]) def testtextoutput2simpleatoms(self): stream = framing.stream(1) val = list(framing.createtextoutputframe(stream, 1, [ (b'foo', [], []), (b'bar', [], []), ])) self.assertEqual(val, [ ffs(b'1 1 stream-begin text-output 0 ' b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]") ]) def testtextoutput1arg(self): stream = framing.stream(1) val = list(framing.createtextoutputframe(stream, 1, [ (b'foo %s', [b'val1'], []), ])) self.assertEqual(val, [ ffs(b'1 1 stream-begin text-output 0 ' b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]") ]) def testtextoutput2arg(self): stream = framing.stream(1) val = list(framing.createtextoutputframe(stream, 1, [ (b'foo %s %s', [b'val', b'value'], []), ])) self.assertEqual(val, [ ffs(b'1 1 stream-begin text-output 0 ' b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]") ]) def testtextoutput1label(self): stream = framing.stream(1) val = list(framing.createtextoutputframe(stream, 1, [ (b'foo', [], [b'label']), ])) self.assertEqual(val, [ ffs(b'1 1 stream-begin text-output 0 ' b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]") ]) def testargandlabel(self): stream = framing.stream(1) val = list(framing.createtextoutputframe(stream, 1, [ (b'foo %s', [b'arg'], [b'label']), ])) self.assertEqual(val, [ ffs(b'1 1 stream-begin text-output 0 ' b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], " b"b'labels': [b'label']}]") ]) if __name__ == '__main__': import silenttestrunner silenttestrunner.main(__name__)