Mercurial > hg
view tests/test-wireproto-clientreactor.py @ 37560:41ba336d9f1e
fix: use a portable python script instead of sed in test
Differential Revision: https://phab.mercurial-scm.org/D2988
author | Danny Hooper <hooper@google.com> |
---|---|
date | Fri, 30 Mar 2018 17:01:12 -0700 |
parents | 55b5ba8d4e68 |
children | e6870bca1f47 |
line wrap: on
line source
from __future__ import absolute_import import unittest from mercurial import ( error, wireprotoframing as framing, ) ffs = framing.makeframefromhumanstring def sendframe(reactor, frame): """Send a frame bytearray to a reactor.""" header = framing.parseheader(frame) payload = frame[framing.FRAME_HEADER_SIZE:] assert len(payload) == header.length return reactor.onframerecv(framing.frame(header.requestid, header.streamid, header.streamflags, header.typeid, header.flags, payload)) class SingleSendTests(unittest.TestCase): """A reactor that can only send once rejects subsequent sends.""" def testbasic(self): reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True) request, action, meta = reactor.callcommand(b'foo', {}) self.assertEqual(request.state, 'pending') self.assertEqual(action, 'noop') action, meta = reactor.flushcommands() self.assertEqual(action, 'sendframes') for frame in meta['framegen']: self.assertEqual(request.state, 'sending') self.assertEqual(request.state, 'sent') with self.assertRaisesRegexp(error.ProgrammingError, 'cannot issue new commands'): reactor.callcommand(b'foo', {}) with self.assertRaisesRegexp(error.ProgrammingError, 'cannot issue new commands'): reactor.callcommand(b'foo', {}) class NoBufferTests(unittest.TestCase): """A reactor without send buffering sends requests immediately.""" def testbasic(self): reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False) request, action, meta = reactor.callcommand(b'command1', {}) self.assertEqual(request.requestid, 1) self.assertEqual(action, 'sendframes') self.assertEqual(request.state, 'pending') for frame in meta['framegen']: self.assertEqual(request.state, 'sending') self.assertEqual(request.state, 'sent') action, meta = reactor.flushcommands() self.assertEqual(action, 'noop') # And we can send another command. request, action, meta = reactor.callcommand(b'command2', {}) self.assertEqual(request.requestid, 3) self.assertEqual(action, 'sendframes') for frame in meta['framegen']: self.assertEqual(request.state, 'sending') self.assertEqual(request.state, 'sent') class BadFrameRecvTests(unittest.TestCase): def testoddstream(self): reactor = framing.clientreactor() action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo')) self.assertEqual(action, 'error') self.assertEqual(meta['message'], 'received frame with odd numbered stream ID: 1') def testunknownstream(self): reactor = framing.clientreactor() action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo')) self.assertEqual(action, 'error') self.assertEqual(meta['message'], 'received frame on unknown stream without beginning ' 'of stream flag set') def testunhandledframetype(self): reactor = framing.clientreactor(buffersends=False) request, action, meta = reactor.callcommand(b'foo', {}) for frame in meta['framegen']: pass with self.assertRaisesRegexp(error.ProgrammingError, 'unhandled frame type'): sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo')) if __name__ == '__main__': import silenttestrunner silenttestrunner.main(__name__)