mercurial/wireprotoframing.py
changeset 37057 2ec1fb9de638
parent 37056 861e9d37e56e
child 37058 c5e9c3b47366
--- a/mercurial/wireprotoframing.py	Wed Mar 14 14:01:16 2018 -0700
+++ b/mercurial/wireprotoframing.py	Wed Mar 14 16:51:34 2018 -0700
@@ -19,7 +19,7 @@
     util,
 )
 
-FRAME_HEADER_SIZE = 4
+FRAME_HEADER_SIZE = 6
 DEFAULT_MAX_FRAME_SIZE = 32768
 
 FRAME_TYPE_COMMAND_NAME = 0x01
@@ -89,28 +89,43 @@
 
 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
 
-def makeframe(frametype, frameflags, payload):
+def makeframe(requestid, frametype, frameflags, payload):
     """Assemble a frame into a byte array."""
     # TODO assert size of payload.
     frame = bytearray(FRAME_HEADER_SIZE + len(payload))
 
+    # 24 bits length
+    # 16 bits request id
+    # 4 bits type
+    # 4 bits flags
+
     l = struct.pack(r'<I', len(payload))
     frame[0:3] = l[0:3]
-    frame[3] = (frametype << 4) | frameflags
-    frame[4:] = payload
+    struct.pack_into(r'<H', frame, 3, requestid)
+    frame[5] = (frametype << 4) | frameflags
+    frame[6:] = payload
 
     return frame
 
 def makeframefromhumanstring(s):
-    """Given a string of the form: <type> <flags> <payload>, creates a frame.
+    """Create a frame from a human readable string
+
+    Strings have the form:
+
+        <request-id> <type> <flags> <payload>
 
     This can be used by user-facing applications and tests for creating
     frames easily without having to type out a bunch of constants.
 
+    Request ID is an integer.
+
     Frame type and flags can be specified by integer or named constant.
+
     Flags can be delimited by `|` to bitwise OR them together.
     """
-    frametype, frameflags, payload = s.split(b' ', 2)
+    requestid, frametype, frameflags, payload = s.split(b' ', 3)
+
+    requestid = int(requestid)
 
     if frametype in FRAME_TYPES:
         frametype = FRAME_TYPES[frametype]
@@ -127,7 +142,7 @@
 
     payload = util.unescapestr(payload)
 
-    return makeframe(frametype, finalflags, payload)
+    return makeframe(requestid, frametype, finalflags, payload)
 
 def parseheader(data):
     """Parse a unified framing protocol frame header from a buffer.
@@ -140,12 +155,13 @@
     # 4 bits frame flags
     # ... payload
     framelength = data[0] + 256 * data[1] + 16384 * data[2]
-    typeflags = data[3]
+    requestid = struct.unpack_from(r'<H', data, 3)[0]
+    typeflags = data[5]
 
     frametype = (typeflags & 0xf0) >> 4
     frameflags = typeflags & 0x0f
 
-    return frametype, frameflags, framelength
+    return requestid, frametype, frameflags, framelength
 
 def readframe(fh):
     """Read a unified framing protocol frame from a file object.
@@ -165,16 +181,16 @@
         raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
                           (readcount, header))
 
-    frametype, frameflags, framelength = parseheader(header)
+    requestid, frametype, frameflags, framelength = parseheader(header)
 
     payload = fh.read(framelength)
     if len(payload) != framelength:
         raise error.Abort(_('frame length error: expected %d; got %d') %
                           (framelength, len(payload)))
 
-    return frametype, frameflags, payload
+    return requestid, frametype, frameflags, payload
 
-def createcommandframes(cmd, args, datafh=None):
+def createcommandframes(requestid, cmd, args, datafh=None):
     """Create frames necessary to transmit a request to run a command.
 
     This is a generator of bytearrays. Each item represents a frame
@@ -189,7 +205,7 @@
     if not flags:
         flags |= FLAG_COMMAND_NAME_EOS
 
-    yield makeframe(FRAME_TYPE_COMMAND_NAME, flags, cmd)
+    yield makeframe(requestid, FRAME_TYPE_COMMAND_NAME, flags, cmd)
 
     for i, k in enumerate(sorted(args)):
         v = args[k]
@@ -205,7 +221,7 @@
         payload[offset:offset + len(v)] = v
 
         flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
-        yield makeframe(FRAME_TYPE_COMMAND_ARGUMENT, flags, payload)
+        yield makeframe(requestid, FRAME_TYPE_COMMAND_ARGUMENT, flags, payload)
 
     if datafh:
         while True:
@@ -219,12 +235,12 @@
                 assert datafh.read(1) == b''
                 done = True
 
-            yield makeframe(FRAME_TYPE_COMMAND_DATA, flags, data)
+            yield makeframe(requestid, FRAME_TYPE_COMMAND_DATA, flags, data)
 
             if done:
                 break
 
-def createbytesresponseframesfrombytes(data,
+def createbytesresponseframesfrombytes(requestid, data,
                                        maxframesize=DEFAULT_MAX_FRAME_SIZE):
     """Create a raw frame to send a bytes response from static bytes input.
 
@@ -233,7 +249,7 @@
 
     # Simple case of a single frame.
     if len(data) <= maxframesize:
-        yield makeframe(FRAME_TYPE_BYTES_RESPONSE,
+        yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE,
                         FLAG_BYTES_RESPONSE_EOS, data)
         return
 
@@ -248,12 +264,12 @@
         else:
             flags = FLAG_BYTES_RESPONSE_CONTINUATION
 
-        yield makeframe(FRAME_TYPE_BYTES_RESPONSE, flags, chunk)
+        yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE, flags, chunk)
 
         if done:
             break
 
-def createerrorframe(msg, protocol=False, application=False):
+def createerrorframe(requestid, msg, protocol=False, application=False):
     # TODO properly handle frame size limits.
     assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
 
@@ -263,7 +279,7 @@
     if application:
         flags |= FLAG_ERROR_RESPONSE_APPLICATION
 
-    yield makeframe(FRAME_TYPE_ERROR_RESPONSE, flags, msg)
+    yield makeframe(requestid, FRAME_TYPE_ERROR_RESPONSE, flags, msg)
 
 class serverreactor(object):
     """Holds state of a server handling frame-based protocol requests.
@@ -326,6 +342,7 @@
         self._deferoutput = deferoutput
         self._state = 'idle'
         self._bufferedframegens = []
+        self._activerequestid = None
         self._activecommand = None
         self._activeargs = None
         self._activedata = None
@@ -334,7 +351,7 @@
         self._activeargname = None
         self._activeargchunks = None
 
-    def onframerecv(self, frametype, frameflags, payload):
+    def onframerecv(self, requestid, frametype, frameflags, payload):
         """Process a frame that has been received off the wire.
 
         Returns a dict with an ``action`` key that details what action,
@@ -351,14 +368,14 @@
         if not meth:
             raise error.ProgrammingError('unhandled state: %s' % self._state)
 
-        return meth(frametype, frameflags, payload)
+        return meth(requestid, frametype, frameflags, payload)
 
-    def onbytesresponseready(self, data):
+    def onbytesresponseready(self, requestid, data):
         """Signal that a bytes response is ready to be sent to the client.
 
         The raw bytes response is passed as an argument.
         """
-        framegen = createbytesresponseframesfrombytes(data)
+        framegen = createbytesresponseframesfrombytes(requestid, data)
 
         if self._deferoutput:
             self._bufferedframegens.append(framegen)
@@ -387,9 +404,9 @@
             'framegen': makegen(),
         }
 
-    def onapplicationerror(self, msg):
+    def onapplicationerror(self, requestid, msg):
         return 'sendframes', {
-            'framegen': createerrorframe(msg, application=True),
+            'framegen': createerrorframe(requestid, msg, application=True),
         }
 
     def _makeerrorresult(self, msg):
@@ -399,6 +416,7 @@
 
     def _makeruncommandresult(self):
         return 'runcommand', {
+            'requestid': self._activerequestid,
             'command': self._activecommand,
             'args': self._activeargs,
             'data': self._activedata.getvalue() if self._activedata else None,
@@ -409,7 +427,7 @@
             'state': self._state,
         }
 
-    def _onframeidle(self, frametype, frameflags, payload):
+    def _onframeidle(self, requestid, frametype, frameflags, payload):
         # The only frame type that should be received in this state is a
         # command request.
         if frametype != FRAME_TYPE_COMMAND_NAME:
@@ -417,6 +435,7 @@
             return self._makeerrorresult(
                 _('expected command frame; got %d') % frametype)
 
+        self._activerequestid = requestid
         self._activecommand = payload
         self._activeargs = {}
         self._activedata = None
@@ -439,7 +458,7 @@
             return self._makeerrorresult(_('missing frame flags on '
                                            'command frame'))
 
-    def _onframereceivingargs(self, frametype, frameflags, payload):
+    def _onframereceivingargs(self, requestid, frametype, frameflags, payload):
         if frametype != FRAME_TYPE_COMMAND_ARGUMENT:
             self._state = 'errored'
             return self._makeerrorresult(_('expected command argument '
@@ -492,7 +511,7 @@
         else:
             return self._makewantframeresult()
 
-    def _onframereceivingdata(self, frametype, frameflags, payload):
+    def _onframereceivingdata(self, requestid, frametype, frameflags, payload):
         if frametype != FRAME_TYPE_COMMAND_DATA:
             self._state = 'errored'
             return self._makeerrorresult(_('expected command data frame; '
@@ -512,5 +531,5 @@
             return self._makeerrorresult(_('command data frame without '
                                            'flags'))
 
-    def _onframeerrored(self, frametype, frameflags, payload):
+    def _onframeerrored(self, requestid, frametype, frameflags, payload):
         return self._makeerrorresult(_('server already errored'))