tests/test-wireproto-clientreactor.py
author Gregory Szorc <gregory.szorc@gmail.com>
Wed, 11 Apr 2018 17:51:40 -0700
changeset 37642 d959277ff1b5
parent 37544 55b5ba8d4e68
child 37656 e6870bca1f47
permissions -rw-r--r--
bundlerepo: rename "other" to "peer"

Differential Revision: https://phab.mercurial-scm.org/D3293

from __future__ import absolute_import

import unittest

from mercurial import (
    error,
    wireprotoframing as framing,
)

# Short alias for framing.makeframefromhumanstring; the tests below use it to
# build frames from strings such as b'1 0 0 1 0 foo' before passing them to
# sendframe().
ffs = framing.makeframefromhumanstring

def sendframe(reactor, frame):
    """Feed a raw frame bytearray into ``reactor``.

    The header is parsed from ``frame`` and everything after the first
    ``framing.FRAME_HEADER_SIZE`` bytes is treated as the payload. The
    pieces are repackaged into a ``framing.frame`` instance which is
    passed to ``reactor.onframerecv()``; that call's result is returned.
    """
    parsed = framing.parseheader(frame)
    body = frame[framing.FRAME_HEADER_SIZE:]
    # The length recorded in the header must match the bytes that follow it.
    assert len(body) == parsed.length

    received = framing.frame(parsed.requestid,
                             parsed.streamid,
                             parsed.streamflags,
                             parsed.typeid,
                             parsed.flags,
                             body)
    return reactor.onframerecv(received)

class SingleSendTests(unittest.TestCase):
    """Check that a single-send reactor refuses commands after its send."""
    def testbasic(self):
        """Issue one buffered command, flush it, then expect new commands
        to raise ProgrammingError."""
        client = framing.clientreactor(hasmultiplesend=False,
                                       buffersends=True)

        # With buffersends=True the command is only queued: action is 'noop'.
        req, act, info = client.callcommand(b'foo', {})
        self.assertEqual(req.state, 'pending')
        self.assertEqual(act, 'noop')

        # Flushing returns the frames to send.
        act, info = client.flushcommands()
        self.assertEqual(act, 'sendframes')

        # The request reports 'sending' while its frames are consumed...
        for _frame in info['framegen']:
            self.assertEqual(req.state, 'sending')

        # ...and 'sent' once the frame generator is exhausted.
        self.assertEqual(req.state, 'sent')

        # Attempt twice: each further command must be rejected.
        for _attempt in range(2):
            with self.assertRaisesRegexp(error.ProgrammingError,
                                         'cannot issue new commands'):
                client.callcommand(b'foo', {})

class NoBufferTests(unittest.TestCase):
    """Check that an unbuffered reactor returns frames as soon as a command
    is issued."""
    def testbasic(self):
        """Issue two commands back to back and follow each request's state
        through pending/sending/sent."""
        client = framing.clientreactor(hasmultiplesend=True,
                                       buffersends=False)

        # With buffersends=False, callcommand() itself returns 'sendframes'.
        first, act, info = client.callcommand(b'command1', {})
        self.assertEqual(first.requestid, 1)
        self.assertEqual(act, 'sendframes')

        # The state stays 'pending' until the frame generator is consumed.
        self.assertEqual(first.state, 'pending')

        for _frame in info['framegen']:
            self.assertEqual(first.state, 'sending')

        self.assertEqual(first.state, 'sent')

        # Nothing was buffered, so flushing has no work to do.
        act, info = client.flushcommands()
        self.assertEqual(act, 'noop')

        # A second command is accepted and is assigned request ID 3.
        second, act, info = client.callcommand(b'command2', {})
        self.assertEqual(second.requestid, 3)
        self.assertEqual(act, 'sendframes')

        for _frame in info['framegen']:
            self.assertEqual(second.state, 'sending')

        self.assertEqual(second.state, 'sent')

class BadFrameRecvTests(unittest.TestCase):
    """Check how the client reactor reacts to frames it cannot accept."""
    def _asserterrorframe(self, framestr, message):
        """Send ``framestr`` to a fresh default reactor and expect an
        'error' action carrying ``message``."""
        client = framing.clientreactor()

        act, info = sendframe(client, ffs(framestr))
        self.assertEqual(act, 'error')
        self.assertEqual(info['message'], message)

    def testoddstream(self):
        """A frame on stream ID 1 is reported as an error."""
        self._asserterrorframe(
            b'1 1 0 1 0 foo',
            'received frame with odd numbered stream ID: 1')

    def testunknownstream(self):
        """A frame on a not-yet-seen stream with stream flags 0 is
        reported as an error."""
        self._asserterrorframe(
            b'1 0 0 1 0 foo',
            'received frame on unknown stream without beginning '
            'of stream flag set')

    def testunhandledframetype(self):
        """A text-output frame received for a sent request raises
        ProgrammingError."""
        client = framing.clientreactor(buffersends=False)

        _request, _action, info = client.callcommand(b'foo', {})
        # Exhaust the frame generator before feeding a frame back in.
        for _frame in info['framegen']:
            pass

        with self.assertRaisesRegexp(error.ProgrammingError,
                                     'unhandled frame type'):
            sendframe(client, ffs(b'1 0 stream-begin text-output 0 foo'))

# When executed as a script, hand this module's name to silenttestrunner,
# which is imported here rather than at the top of the file.
if __name__ == '__main__':
    import silenttestrunner
    silenttestrunner.main(__name__)