Mercurial > hg
changeset 40128:57782791b7e9
wireprotov2: handle stream encoding settings frames
Like what we just did for the server reactor, we teach the client
reactor to handle stream encoding settings frames. The code is
very similar.
We define a method on the stream class to handle processing the data
within the decoded frames. However, it doesn't yet do anything useful.
Differential Revision: https://phab.mercurial-scm.org/D4918
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Thu, 04 Oct 2018 16:44:21 -0700 |
parents | 080419fa4fe4 |
children | 293835e0fff7 |
files | mercurial/wireprotoframing.py tests/test-wireproto-clientreactor.py |
diffstat | 2 files changed, 186 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/wireprotoframing.py Thu Oct 04 15:43:21 2018 -0700 +++ b/mercurial/wireprotoframing.py Thu Oct 04 16:44:21 2018 -0700 @@ -668,6 +668,13 @@ return makeframe(requestid, self.streamid, streamflags, typeid, flags, payload) + def setdecoder(self, name, extraobjs): + """Set the decoder for this stream. + + Receives the stream profile name and any additional CBOR objects + decoded from the stream encoding settings frame payloads. + """ + def ensureserverstream(stream): if stream.streamid % 2: raise error.ProgrammingError('server should only write to even ' @@ -1367,6 +1374,7 @@ self._pendingrequests = collections.deque() self._activerequests = {} self._incomingstreams = {} + self._streamsettingsdecoders = {} def callcommand(self, name, args, datafh=None, redirect=None): """Request that a command be executed. @@ -1484,6 +1492,9 @@ if frame.streamflags & STREAM_FLAG_END_STREAM: del self._incomingstreams[frame.streamid] + if frame.typeid == FRAME_TYPE_STREAM_SETTINGS: + return self._onstreamsettingsframe(frame) + if frame.requestid not in self._activerequests: return 'error', { 'message': (_('received frame for inactive request ID: %d') % @@ -1505,6 +1516,64 @@ return meth(request, frame) + def _onstreamsettingsframe(self, frame): + assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS + + more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION + eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS + + if more and eos: + return 'error', { + 'message': (_('stream encoding settings frame cannot have both ' + 'continuation and end of stream flags set')), + } + + if not more and not eos: + return 'error', { + 'message': _('stream encoding settings frame must have ' + 'continuation or end of stream flag set'), + } + + if frame.streamid not in self._streamsettingsdecoders: + decoder = cborutil.bufferingdecoder() + self._streamsettingsdecoders[frame.streamid] = decoder + + decoder = self._streamsettingsdecoders[frame.streamid] + + try: + decoder.decode(frame.payload) + except Exception as e: + return 'error', { + 'message': (_('error decoding CBOR from stream encoding ' + 'settings frame: %s') % + stringutil.forcebytestr(e)), + } + + if more: + return 'noop', {} + + assert eos + + decoded = decoder.getavailable() + del self._streamsettingsdecoders[frame.streamid] + + if not decoded: + return 'error', { + 'message': _('stream encoding settings frame did not contain ' + 'CBOR data'), + } + + try: + self._incomingstreams[frame.streamid].setdecoder(decoded[0], + decoded[1:]) + except Exception as e: + return 'error', { + 'message': (_('error setting stream decoder: %s') % + stringutil.forcebytestr(e)), + } + + return 'noop', {} + def _oncommandresponseframe(self, request, frame): if frame.flags & FLAG_COMMAND_RESPONSE_EOS: request.state = 'received'
--- a/tests/test-wireproto-clientreactor.py Thu Oct 04 15:43:21 2018 -0700 +++ b/tests/test-wireproto-clientreactor.py Thu Oct 04 16:44:21 2018 -0700 @@ -6,6 +6,9 @@ error, wireprotoframing as framing, ) +from mercurial.utils import ( + cborutil, +) ffs = framing.makeframefromhumanstring @@ -162,6 +165,120 @@ b"b'redirect': {b'targets': [b'a', b'b'], " b"b'hashes': [b'sha256']}}")) +class StreamSettingsTests(unittest.TestCase): + def testnoflags(self): + reactor = framing.clientreactor(buffersends=False) + + request, action, meta = reactor.callcommand(b'foo', {}) + for f in meta[b'framegen']: + pass + + action, meta = sendframe(reactor, + ffs(b'1 2 stream-begin stream-settings 0 ')) + + self.assertEqual(action, b'error') + self.assertEqual(meta, { + b'message': b'stream encoding settings frame must have ' + b'continuation or end of stream flag set', + }) + + def testconflictflags(self): + reactor = framing.clientreactor(buffersends=False) + + request, action, meta = reactor.callcommand(b'foo', {}) + for f in meta[b'framegen']: + pass + + action, meta = sendframe(reactor, + ffs(b'1 2 stream-begin stream-settings continuation|eos ')) + + self.assertEqual(action, b'error') + self.assertEqual(meta, { + b'message': b'stream encoding settings frame cannot have both ' + b'continuation and end of stream flags set', + }) + + def testemptypayload(self): + reactor = framing.clientreactor(buffersends=False) + + request, action, meta = reactor.callcommand(b'foo', {}) + for f in meta[b'framegen']: + pass + + action, meta = sendframe(reactor, + ffs(b'1 2 stream-begin stream-settings eos ')) + + self.assertEqual(action, b'error') + self.assertEqual(meta, { + b'message': b'stream encoding settings frame did not contain ' + b'CBOR data' + }) + + def testbadcbor(self): + reactor = framing.clientreactor(buffersends=False) + + request, action, meta = reactor.callcommand(b'foo', {}) + for f in meta[b'framegen']: + pass + + action, meta = sendframe(reactor, + ffs(b'1 2 stream-begin stream-settings eos badvalue')) + + self.assertEqual(action, b'error') + + def testsingleobject(self): + reactor = framing.clientreactor(buffersends=False) + + request, action, meta = reactor.callcommand(b'foo', {}) + for f in meta[b'framegen']: + pass + + action, meta = sendframe(reactor, + ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"')) + + self.assertEqual(action, b'noop') + self.assertEqual(meta, {}) + + def testmultipleobjects(self): + reactor = framing.clientreactor(buffersends=False) + + request, action, meta = reactor.callcommand(b'foo', {}) + for f in meta[b'framegen']: + pass + + data = b''.join([ + b''.join(cborutil.streamencode(b'identity')), + b''.join(cborutil.streamencode({b'foo', b'bar'})), + ]) + + action, meta = sendframe(reactor, + ffs(b'1 2 stream-begin stream-settings eos %s' % data)) + + self.assertEqual(action, b'noop') + self.assertEqual(meta, {}) + + def testmultipleframes(self): + reactor = framing.clientreactor(buffersends=False) + + request, action, meta = reactor.callcommand(b'foo', {}) + for f in meta[b'framegen']: + pass + + data = b''.join(cborutil.streamencode(b'identity')) + + action, meta = sendframe(reactor, + ffs(b'1 2 stream-begin stream-settings continuation %s' % + data[0:3])) + + self.assertEqual(action, b'noop') + self.assertEqual(meta, {}) + + action, meta = sendframe(reactor, + ffs(b'1 2 0 stream-settings eos %s' % data[3:])) + + self.assertEqual(action, b'noop') + self.assertEqual(meta, {}) + if __name__ == '__main__': import silenttestrunner silenttestrunner.main(__name__)