bundlerepo: rename "other" to "peer"
Differential Revision: https://phab.mercurial-scm.org/D3293
from __future__ import absolute_import
import unittest
from mercurial import (
error,
wireprotoframing as framing,
)
ffs = framing.makeframefromhumanstring
def sendframe(reactor, frame):
"""Send a frame bytearray to a reactor."""
header = framing.parseheader(frame)
payload = frame[framing.FRAME_HEADER_SIZE:]
assert len(payload) == header.length
return reactor.onframerecv(framing.frame(header.requestid,
header.streamid,
header.streamflags,
header.typeid,
header.flags,
payload))
class SingleSendTests(unittest.TestCase):
"""A reactor that can only send once rejects subsequent sends."""
def testbasic(self):
reactor = framing.clientreactor(hasmultiplesend=False, buffersends=True)
request, action, meta = reactor.callcommand(b'foo', {})
self.assertEqual(request.state, 'pending')
self.assertEqual(action, 'noop')
action, meta = reactor.flushcommands()
self.assertEqual(action, 'sendframes')
for frame in meta['framegen']:
self.assertEqual(request.state, 'sending')
self.assertEqual(request.state, 'sent')
with self.assertRaisesRegexp(error.ProgrammingError,
'cannot issue new commands'):
reactor.callcommand(b'foo', {})
with self.assertRaisesRegexp(error.ProgrammingError,
'cannot issue new commands'):
reactor.callcommand(b'foo', {})
class NoBufferTests(unittest.TestCase):
"""A reactor without send buffering sends requests immediately."""
def testbasic(self):
reactor = framing.clientreactor(hasmultiplesend=True, buffersends=False)
request, action, meta = reactor.callcommand(b'command1', {})
self.assertEqual(request.requestid, 1)
self.assertEqual(action, 'sendframes')
self.assertEqual(request.state, 'pending')
for frame in meta['framegen']:
self.assertEqual(request.state, 'sending')
self.assertEqual(request.state, 'sent')
action, meta = reactor.flushcommands()
self.assertEqual(action, 'noop')
# And we can send another command.
request, action, meta = reactor.callcommand(b'command2', {})
self.assertEqual(request.requestid, 3)
self.assertEqual(action, 'sendframes')
for frame in meta['framegen']:
self.assertEqual(request.state, 'sending')
self.assertEqual(request.state, 'sent')
class BadFrameRecvTests(unittest.TestCase):
def testoddstream(self):
reactor = framing.clientreactor()
action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
self.assertEqual(action, 'error')
self.assertEqual(meta['message'],
'received frame with odd numbered stream ID: 1')
def testunknownstream(self):
reactor = framing.clientreactor()
action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
self.assertEqual(action, 'error')
self.assertEqual(meta['message'],
'received frame on unknown stream without beginning '
'of stream flag set')
def testunhandledframetype(self):
reactor = framing.clientreactor(buffersends=False)
request, action, meta = reactor.callcommand(b'foo', {})
for frame in meta['framegen']:
pass
with self.assertRaisesRegexp(error.ProgrammingError,
'unhandled frame type'):
sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
if __name__ == '__main__':
import silenttestrunner
silenttestrunner.main(__name__)