mercurial/wireprotoframing.py
changeset 37292 3d0e2cd86e05
parent 37291 b0041036214e
child 37298 5ef2da00e935
--- a/mercurial/wireprotoframing.py	Mon Mar 26 10:50:36 2018 -0700
+++ b/mercurial/wireprotoframing.py	Mon Mar 26 14:34:32 2018 -0700
@@ -39,8 +39,7 @@
     b'encoded': STREAM_FLAG_ENCODING_APPLIED,
 }
 
-FRAME_TYPE_COMMAND_NAME = 0x01
-FRAME_TYPE_COMMAND_ARGUMENT = 0x02
+FRAME_TYPE_COMMAND_REQUEST = 0x01
 FRAME_TYPE_COMMAND_DATA = 0x03
 FRAME_TYPE_BYTES_RESPONSE = 0x04
 FRAME_TYPE_ERROR_RESPONSE = 0x05
@@ -49,8 +48,7 @@
 FRAME_TYPE_STREAM_SETTINGS = 0x08
 
 FRAME_TYPES = {
-    b'command-name': FRAME_TYPE_COMMAND_NAME,
-    b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT,
+    b'command-request': FRAME_TYPE_COMMAND_REQUEST,
     b'command-data': FRAME_TYPE_COMMAND_DATA,
     b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
     b'error-response': FRAME_TYPE_ERROR_RESPONSE,
@@ -59,22 +57,16 @@
     b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
 }
 
-FLAG_COMMAND_NAME_EOS = 0x01
-FLAG_COMMAND_NAME_HAVE_ARGS = 0x02
-FLAG_COMMAND_NAME_HAVE_DATA = 0x04
+FLAG_COMMAND_REQUEST_NEW = 0x01
+FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
+FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
+FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
 
-FLAGS_COMMAND = {
-    b'eos': FLAG_COMMAND_NAME_EOS,
-    b'have-args': FLAG_COMMAND_NAME_HAVE_ARGS,
-    b'have-data': FLAG_COMMAND_NAME_HAVE_DATA,
-}
-
-FLAG_COMMAND_ARGUMENT_CONTINUATION = 0x01
-FLAG_COMMAND_ARGUMENT_EOA = 0x02
-
-FLAGS_COMMAND_ARGUMENT = {
-    b'continuation': FLAG_COMMAND_ARGUMENT_CONTINUATION,
-    b'eoa': FLAG_COMMAND_ARGUMENT_EOA,
+FLAGS_COMMAND_REQUEST = {
+    b'new': FLAG_COMMAND_REQUEST_NEW,
+    b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
+    b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
+    b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
 }
 
 FLAG_COMMAND_DATA_CONTINUATION = 0x01
@@ -103,8 +95,7 @@
 
 # Maps frame types to their available flags.
 FRAME_TYPE_FLAGS = {
-    FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND,
-    FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT,
+    FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
     FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
     FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
     FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
@@ -113,7 +104,7 @@
     FRAME_TYPE_STREAM_SETTINGS: {},
 }
 
-ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
+ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
 
 @attr.s(slots=True)
 class frameheader(object):
@@ -269,43 +260,48 @@
     return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
                  payload)
 
-def createcommandframes(stream, requestid, cmd, args, datafh=None):
+def createcommandframes(stream, requestid, cmd, args, datafh=None,
+                        maxframesize=DEFAULT_MAX_FRAME_SIZE):
     """Create frames necessary to transmit a request to run a command.
 
     This is a generator of bytearrays. Each item represents a frame
     ready to be sent over the wire to a peer.
     """
-    flags = 0
+    data = {b'name': cmd}
     if args:
-        flags |= FLAG_COMMAND_NAME_HAVE_ARGS
-    if datafh:
-        flags |= FLAG_COMMAND_NAME_HAVE_DATA
+        data[b'args'] = args
 
-    if not flags:
-        flags |= FLAG_COMMAND_NAME_EOS
+    data = cbor.dumps(data, canonical=True)
 
-    yield stream.makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME,
-                           flags=flags, payload=cmd)
+    offset = 0
+
+    while True:
+        flags = 0
 
-    for i, k in enumerate(sorted(args)):
-        v = args[k]
-        last = i == len(args) - 1
+        # Must set new or continuation flag.
+        if not offset:
+            flags |= FLAG_COMMAND_REQUEST_NEW
+        else:
+            flags |= FLAG_COMMAND_REQUEST_CONTINUATION
 
-        # TODO handle splitting of argument values across frames.
-        payload = bytearray(ARGUMENT_FRAME_HEADER.size + len(k) + len(v))
-        offset = 0
-        ARGUMENT_FRAME_HEADER.pack_into(payload, offset, len(k), len(v))
-        offset += ARGUMENT_FRAME_HEADER.size
-        payload[offset:offset + len(k)] = k
-        offset += len(k)
-        payload[offset:offset + len(v)] = v
+        # Data frames is set on all frames.
+        if datafh:
+            flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
 
-        flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
+        payload = data[offset:offset + maxframesize]
+        offset += len(payload)
+
+        if len(payload) == maxframesize and offset < len(data):
+            flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
+
         yield stream.makeframe(requestid=requestid,
-                               typeid=FRAME_TYPE_COMMAND_ARGUMENT,
+                               typeid=FRAME_TYPE_COMMAND_REQUEST,
                                flags=flags,
                                payload=payload)
 
+        if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
+            break
+
     if datafh:
         while True:
             data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
@@ -673,6 +669,12 @@
 
     def _makeruncommandresult(self, requestid):
         entry = self._receivingcommands[requestid]
+
+        if not entry['requestdone']:
+            self._state = 'errored'
+            raise error.ProgrammingError('should not be called without '
+                                         'requestdone set')
+
         del self._receivingcommands[requestid]
 
         if self._receivingcommands:
@@ -680,13 +682,25 @@
         else:
             self._state = 'idle'
 
+        # Decode the payloads as CBOR.
+        entry['payload'].seek(0)
+        request = cbor.load(entry['payload'])
+
+        if b'name' not in request:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('command request missing "name" field'))
+
+        if b'args' not in request:
+            request[b'args'] = {}
+
         assert requestid not in self._activecommands
         self._activecommands.add(requestid)
 
         return 'runcommand', {
             'requestid': requestid,
-            'command': entry['command'],
-            'args': entry['args'],
+            'command': request[b'name'],
+            'args': request[b'args'],
             'data': entry['data'].getvalue() if entry['data'] else None,
         }
 
@@ -695,13 +709,33 @@
             'state': self._state,
         }
 
+    def _validatecommandrequestframe(self, frame):
+        new = frame.flags & FLAG_COMMAND_REQUEST_NEW
+        continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
+
+        if new and continuation:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('received command request frame with both new and '
+                  'continuation flags set'))
+
+        if not new and not continuation:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('received command request frame with neither new nor '
+                  'continuation flags set'))
+
     def _onframeidle(self, frame):
         # The only frame type that should be received in this state is a
         # command request.
-        if frame.typeid != FRAME_TYPE_COMMAND_NAME:
+        if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('expected command frame; got %d') % frame.typeid)
+                _('expected command request frame; got %d') % frame.typeid)
+
+        res = self._validatecommandrequestframe(frame)
+        if res:
+            return res
 
         if frame.requestid in self._receivingcommands:
             self._state = 'errored'
@@ -710,35 +744,45 @@
 
         if frame.requestid in self._activecommands:
             self._state = 'errored'
-            return self._makeerrorresult((
-                _('request with ID %d is already active') % frame.requestid))
+            return self._makeerrorresult(
+                _('request with ID %d is already active') % frame.requestid)
+
+        new = frame.flags & FLAG_COMMAND_REQUEST_NEW
+        moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
+        expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
 
-        expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
-        expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
+        if not new:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('received command request frame without new flag set'))
+
+        payload = util.bytesio()
+        payload.write(frame.payload)
 
         self._receivingcommands[frame.requestid] = {
-            'command': frame.payload,
-            'args': {},
+            'payload': payload,
             'data': None,
-            'expectingargs': expectingargs,
-            'expectingdata': expectingdata,
+            'requestdone': not moreframes,
+            'expectingdata': bool(expectingdata),
         }
 
-        if frame.flags & FLAG_COMMAND_NAME_EOS:
+        # This is the final frame for this request. Dispatch it.
+        if not moreframes and not expectingdata:
             return self._makeruncommandresult(frame.requestid)
 
-        if expectingargs or expectingdata:
-            self._state = 'command-receiving'
-            return self._makewantframeresult()
-        else:
-            self._state = 'errored'
-            return self._makeerrorresult(_('missing frame flags on '
-                                           'command frame'))
+        assert moreframes or expectingdata
+        self._state = 'command-receiving'
+        return self._makewantframeresult()
 
     def _onframecommandreceiving(self, frame):
-        # It could be a new command request. Process it as such.
-        if frame.typeid == FRAME_TYPE_COMMAND_NAME:
-            return self._onframeidle(frame)
+        if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
+            # Process new command requests as such.
+            if frame.flags & FLAG_COMMAND_REQUEST_NEW:
+                return self._onframeidle(frame)
+
+            res = self._validatecommandrequestframe(frame)
+            if res:
+                return res
 
         # All other frames should be related to a command that is currently
         # receiving but is not active.
@@ -756,14 +800,30 @@
 
         entry = self._receivingcommands[frame.requestid]
 
-        if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
-            if not entry['expectingargs']:
+        if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
+            moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
+            expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
+
+            if entry['requestdone']:
+                self._state = 'errored'
+                return self._makeerrorresult(
+                    _('received command request frame when request frames '
+                      'were supposedly done'))
+
+            if expectingdata != entry['expectingdata']:
                 self._state = 'errored'
-                return self._makeerrorresult(_(
-                    'received command argument frame for request that is not '
-                    'expecting arguments: %d') % frame.requestid)
+                return self._makeerrorresult(
+                    _('mismatch between expect data flag and previous frame'))
+
+            entry['payload'].write(frame.payload)
 
-            return self._handlecommandargsframe(frame, entry)
+            if not moreframes:
+                entry['requestdone'] = True
+
+            if not moreframes and not expectingdata:
+                return self._makeruncommandresult(frame.requestid)
+
+            return self._makewantframeresult()
 
         elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
             if not entry['expectingdata']:
@@ -776,50 +836,10 @@
                 entry['data'] = util.bytesio()
 
             return self._handlecommanddataframe(frame, entry)
-
-    def _handlecommandargsframe(self, frame, entry):
-        # The frame and state of command should have already been validated.
-        assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT
-
-        offset = 0
-        namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
-        offset += ARGUMENT_FRAME_HEADER.size
-
-        # The argument name MUST fit inside the frame.
-        argname = bytes(frame.payload[offset:offset + namesize])
-        offset += namesize
-
-        if len(argname) != namesize:
+        else:
             self._state = 'errored'
-            return self._makeerrorresult(_('malformed argument frame: '
-                                           'partial argument name'))
-
-        argvalue = bytes(frame.payload[offset:])
-
-        # Argument value spans multiple frames. Record our active state
-        # and wait for the next frame.
-        if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
-            raise error.ProgrammingError('not yet implemented')
-
-        # Common case: the argument value is completely contained in this
-        # frame.
-
-        if len(argvalue) != valuesize:
-            self._state = 'errored'
-            return self._makeerrorresult(_('malformed argument frame: '
-                                           'partial argument value'))
-
-        entry['args'][argname] = argvalue
-
-        if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
-            if entry['expectingdata']:
-                # TODO signal request to run a command once we don't
-                # buffer data frames.
-                return self._makewantframeresult()
-            else:
-                return self._makeruncommandresult(frame.requestid)
-        else:
-            return self._makewantframeresult()
+            return self._makeerrorresult(_(
+                'received unexpected frame type: %d') % frame.typeid)
 
     def _handlecommanddataframe(self, frame, entry):
         assert frame.typeid == FRAME_TYPE_COMMAND_DATA