changeset 40126:327d40b94bed

wireprotov2: handle sender protocol settings frames We teach the server reactor to handle the optional sender protocol settings frames, which can only be sent at the beginning of frame exchange. Right now, we simply decode the data and record the sender protocol settings on the server reactor instance: we don't yet do anything meaningful with the data. Differential Revision: https://phab.mercurial-scm.org/D4916
author Gregory Szorc <gregory.szorc@gmail.com>
date Thu, 04 Oct 2018 16:26:45 -0700
parents e2fe1074024c
children 080419fa4fe4
files mercurial/wireprotoframing.py tests/test-wireproto-serverreactor.py
diffstat 2 files changed, 195 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/wireprotoframing.py	Thu Oct 04 14:05:16 2018 -0700
+++ b/mercurial/wireprotoframing.py	Thu Oct 04 16:26:45 2018 -0700
@@ -674,6 +674,10 @@
                                      'numbered streams; %d is not even' %
                                      stream.streamid)
 
+DEFAULT_PROTOCOL_SETTINGS = {
+    'contentencodings': [b'identity'],
+}
+
 class serverreactor(object):
     """Holds state of a server handling frame-based protocol requests.
 
@@ -750,7 +754,7 @@
         sender cannot receive until all data has been transmitted.
         """
         self._deferoutput = deferoutput
-        self._state = 'idle'
+        self._state = 'initial'
         self._nextoutgoingstreamid = 2
         self._bufferedframegens = []
         # stream id -> stream instance for all active streams from the client.
@@ -763,6 +767,11 @@
         # set.
         self._activecommands = set()
 
+        self._protocolsettingsdecoder = None
+
+        # Sender protocol settings are optional. Set implied default values.
+        self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
+
     def onframerecv(self, frame):
         """Process a frame that has been received off the wire.
 
@@ -794,6 +803,8 @@
             del self._incomingstreams[frame.streamid]
 
         handlers = {
+            'initial': self._onframeinitial,
+            'protocol-settings-receiving': self._onframeprotocolsettings,
             'idle': self._onframeidle,
             'command-receiving': self._onframecommandreceiving,
             'errored': self._onframeerrored,
@@ -1062,6 +1073,85 @@
                 _('received command request frame with neither new nor '
                   'continuation flags set'))
 
+    def _onframeinitial(self, frame):
+        # Called when we receive a frame when in the "initial" state.
+        if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
+            self._state = 'protocol-settings-receiving'
+            self._protocolsettingsdecoder = cborutil.bufferingdecoder()
+            return self._onframeprotocolsettings(frame)
+
+        elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
+            self._state = 'idle'
+            return self._onframeidle(frame)
+
+        else:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('expected sender protocol settings or command request '
+                  'frame; got %d') % frame.typeid)
+
+    def _onframeprotocolsettings(self, frame):
+        assert self._state == 'protocol-settings-receiving'
+        assert self._protocolsettingsdecoder is not None
+
+        if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('expected sender protocol settings frame; got %d') %
+                frame.typeid)
+
+        more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
+        eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
+
+        if more and eos:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('sender protocol settings frame cannot have both '
+                  'continuation and end of stream flags set'))
+
+        if not more and not eos:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('sender protocol settings frame must have continuation or '
+                  'end of stream flag set'))
+
+        # TODO establish limits for maximum amount of data that can be
+        # buffered.
+        try:
+            self._protocolsettingsdecoder.decode(frame.payload)
+        except Exception as e:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('error decoding CBOR from sender protocol settings frame: %s')
+                % stringutil.forcebytestr(e))
+
+        if more:
+            return self._makewantframeresult()
+
+        assert eos
+
+        decoded = self._protocolsettingsdecoder.getavailable()
+        self._protocolsettingsdecoder = None
+
+        if not decoded:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('sender protocol settings frame did not contain CBOR data'))
+        elif len(decoded) > 1:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('sender protocol settings frame contained multiple CBOR '
+                  'values'))
+
+        d = decoded[0]
+
+        if b'contentencodings' in d:
+            self._sendersettings['contentencodings'] = d[b'contentencodings']
+
+        self._state = 'idle'
+
+        return self._makewantframeresult()
+
     def _onframeidle(self, frame):
         # The only frame type that should be received in this state is a
         # command request.
--- a/tests/test-wireproto-serverreactor.py	Thu Oct 04 14:05:16 2018 -0700
+++ b/tests/test-wireproto-serverreactor.py	Thu Oct 04 16:26:45 2018 -0700
@@ -9,6 +9,9 @@
     util,
     wireprotoframing as framing,
 )
+from mercurial.utils import (
+    cborutil,
+)
 
 ffs = framing.makeframefromhumanstring
 
@@ -193,7 +196,8 @@
             ffs(b'1 1 stream-begin command-data 0 ignored'))
         self.assertaction(result, b'error')
         self.assertEqual(result[1], {
-            b'message': b'expected command request frame; got 2',
+            b'message': b'expected sender protocol settings or command request '
+                        b'frame; got 2',
         })
 
     def testunexpectedcommanddatareceiving(self):
@@ -494,6 +498,105 @@
         results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
         self.assertaction(results[0], b'runcommand')
 
+    def testprotocolsettingsnoflags(self):
+        result = self._sendsingleframe(
+            makereactor(),
+            ffs(b'0 1 stream-begin sender-protocol-settings 0 '))
+        self.assertaction(result, b'error')
+        self.assertEqual(result[1], {
+            b'message': b'sender protocol settings frame must have '
+                        b'continuation or end of stream flag set',
+        })
+
+    def testprotocolsettingsconflictflags(self):
+        result = self._sendsingleframe(
+            makereactor(),
+            ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '))
+        self.assertaction(result, b'error')
+        self.assertEqual(result[1], {
+            b'message': b'sender protocol settings frame cannot have both '
+                        b'continuation and end of stream flags set',
+        })
+
+    def testprotocolsettingsemptypayload(self):
+        result = self._sendsingleframe(
+            makereactor(),
+            ffs(b'0 1 stream-begin sender-protocol-settings eos '))
+        self.assertaction(result, b'error')
+        self.assertEqual(result[1], {
+            b'message': b'sender protocol settings frame did not contain CBOR '
+                        b'data',
+        })
+
+    def testprotocolsettingsmultipleobjects(self):
+        result = self._sendsingleframe(
+            makereactor(),
+            ffs(b'0 1 stream-begin sender-protocol-settings eos '
+                b'\x46foobar\x43foo'))
+        self.assertaction(result, b'error')
+        self.assertEqual(result[1], {
+            b'message': b'sender protocol settings frame contained multiple '
+                        b'CBOR values',
+        })
+
+    def testprotocolsettingscontentencodings(self):
+        reactor = makereactor()
+
+        result = self._sendsingleframe(
+            reactor,
+            ffs(b'0 1 stream-begin sender-protocol-settings eos '
+                b'cbor:{b"contentencodings": [b"a", b"b"]}'))
+        self.assertaction(result, b'wantframe')
+
+        self.assertEqual(reactor._state, b'idle')
+        self.assertEqual(reactor._sendersettings[b'contentencodings'],
+                         [b'a', b'b'])
+
+    def testprotocolsettingsmultipleframes(self):
+        reactor = makereactor()
+
+        data = b''.join(cborutil.streamencode({
+            b'contentencodings': [b'value1', b'value2'],
+        }))
+
+        results = list(sendframes(reactor, [
+            ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' %
+                data[0:5]),
+            ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
+        ]))
+
+        self.assertEqual(len(results), 2)
+
+        self.assertaction(results[0], b'wantframe')
+        self.assertaction(results[1], b'wantframe')
+
+        self.assertEqual(reactor._state, b'idle')
+        self.assertEqual(reactor._sendersettings[b'contentencodings'],
+                         [b'value1', b'value2'])
+
+    def testprotocolsettingsbadcbor(self):
+        result = self._sendsingleframe(
+            makereactor(),
+            ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'))
+        self.assertaction(result, b'error')
+
+    def testprotocolsettingsnoninitial(self):
+        # Cannot have protocol settings frames as non-initial frames.
+        reactor = makereactor()
+
+        stream = framing.stream(1)
+        results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
+        self.assertEqual(len(results), 1)
+        self.assertaction(results[0], b'runcommand')
+
+        result = self._sendsingleframe(
+            reactor,
+            ffs(b'0 1 0 sender-protocol-settings eos '))
+        self.assertaction(result, b'error')
+        self.assertEqual(result[1], {
+            b'message': b'expected command request frame; got 8',
+        })
+
 if __name__ == '__main__':
     import silenttestrunner
     silenttestrunner.main(__name__)