mercurial/wireprotoframing.py
changeset 37288 9bfcbe4f4745
parent 37285 3ed344546d9e
child 37289 5fadc63ac99f
--- a/mercurial/wireprotoframing.py	Mon Mar 26 13:57:22 2018 -0700
+++ b/mercurial/wireprotoframing.py	Mon Mar 26 11:00:16 2018 -0700
@@ -25,15 +25,26 @@
     stringutil,
 )
 
-FRAME_HEADER_SIZE = 6
+FRAME_HEADER_SIZE = 8
 DEFAULT_MAX_FRAME_SIZE = 32768
 
+STREAM_FLAG_BEGIN_STREAM = 0x01
+STREAM_FLAG_END_STREAM = 0x02
+STREAM_FLAG_ENCODING_APPLIED = 0x04
+
+STREAM_FLAGS = {
+    b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
+    b'stream-end': STREAM_FLAG_END_STREAM,
+    b'encoded': STREAM_FLAG_ENCODING_APPLIED,
+}
+
 FRAME_TYPE_COMMAND_NAME = 0x01
 FRAME_TYPE_COMMAND_ARGUMENT = 0x02
 FRAME_TYPE_COMMAND_DATA = 0x03
 FRAME_TYPE_BYTES_RESPONSE = 0x04
 FRAME_TYPE_ERROR_RESPONSE = 0x05
 FRAME_TYPE_TEXT_OUTPUT = 0x06
+FRAME_TYPE_STREAM_SETTINGS = 0x08
 
 FRAME_TYPES = {
     b'command-name': FRAME_TYPE_COMMAND_NAME,
@@ -42,6 +53,7 @@
     b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
     b'error-response': FRAME_TYPE_ERROR_RESPONSE,
     b'text-output': FRAME_TYPE_TEXT_OUTPUT,
+    b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
 }
 
 FLAG_COMMAND_NAME_EOS = 0x01
@@ -94,6 +106,7 @@
     FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
     FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
     FRAME_TYPE_TEXT_OUTPUT: {},
+    FRAME_TYPE_STREAM_SETTINGS: {},
 }
 
 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
@@ -104,6 +117,8 @@
 
     length = attr.ib()
     requestid = attr.ib()
+    streamid = attr.ib()
+    streamflags = attr.ib()
     typeid = attr.ib()
     flags = attr.ib()
 
@@ -112,25 +127,29 @@
     """Represents a parsed frame."""
 
     requestid = attr.ib()
+    streamid = attr.ib()
+    streamflags = attr.ib()
     typeid = attr.ib()
     flags = attr.ib()
     payload = attr.ib()
 
-def makeframe(requestid, typeid, flags, payload):
+def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
     """Assemble a frame into a byte array."""
     # TODO assert size of payload.
     frame = bytearray(FRAME_HEADER_SIZE + len(payload))
 
     # 24 bits length
     # 16 bits request id
+    # 8 bits stream id
+    # 8 bits stream flags
     # 4 bits type
     # 4 bits flags
 
     l = struct.pack(r'<I', len(payload))
     frame[0:3] = l[0:3]
-    struct.pack_into(r'<H', frame, 3, requestid)
-    frame[5] = (typeid << 4) | flags
-    frame[6:] = payload
+    struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
+    frame[7] = (typeid << 4) | flags
+    frame[8:] = payload
 
     return frame
 
@@ -139,20 +158,30 @@
 
     Strings have the form:
 
-        <request-id> <type> <flags> <payload>
+        <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
 
     This can be used by user-facing applications and tests for creating
     frames easily without having to type out a bunch of constants.
 
-    Request ID is an integer.
+    Request ID and stream IDs are integers.
 
-    Frame type and flags can be specified by integer or named constant.
+    Stream flags, frame type, and flags can be specified by integer or
+    named constant.
 
     Flags can be delimited by `|` to bitwise OR them together.
     """
-    requestid, frametype, frameflags, payload = s.split(b' ', 3)
+    fields = s.split(b' ', 5)
+    requestid, streamid, streamflags, frametype, frameflags, payload = fields
 
     requestid = int(requestid)
+    streamid = int(streamid)
+
+    finalstreamflags = 0
+    for flag in streamflags.split(b'|'):
+        if flag in STREAM_FLAGS:
+            finalstreamflags |= STREAM_FLAGS[flag]
+        else:
+            finalstreamflags |= int(flag)
 
     if frametype in FRAME_TYPES:
         frametype = FRAME_TYPES[frametype]
@@ -169,7 +198,8 @@
 
     payload = stringutil.unescapestr(payload)
 
-    return makeframe(requestid=requestid, typeid=frametype,
+    return makeframe(requestid=requestid, streamid=streamid,
+                     streamflags=finalstreamflags, typeid=frametype,
                      flags=finalflags, payload=payload)
 
 def parseheader(data):
@@ -179,17 +209,21 @@
     buffer is expected to be large enough to hold a full header.
     """
     # 24 bits payload length (little endian)
+    # 16 bits request ID
+    # 8 bits stream ID
+    # 8 bits stream flags
     # 4 bits frame type
     # 4 bits frame flags
     # ... payload
     framelength = data[0] + 256 * data[1] + 16384 * data[2]
-    requestid = struct.unpack_from(r'<H', data, 3)[0]
-    typeflags = data[5]
+    requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
+    typeflags = data[7]
 
     frametype = (typeflags & 0xf0) >> 4
     frameflags = typeflags & 0x0f
 
-    return frameheader(framelength, requestid, frametype, frameflags)
+    return frameheader(framelength, requestid, streamid, streamflags,
+                       frametype, frameflags)
 
 def readframe(fh):
     """Read a unified framing protocol frame from a file object.
@@ -216,7 +250,8 @@
         raise error.Abort(_('frame length error: expected %d; got %d') %
                           (h.length, len(payload)))
 
-    return frame(h.requestid, h.typeid, h.flags, payload)
+    return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
+                 payload)
 
 def createcommandframes(stream, requestid, cmd, args, datafh=None):
     """Create frames necessary to transmit a request to run a command.
@@ -398,12 +433,28 @@
 class stream(object):
     """Represents a logical unidirectional series of frames."""
 
+    def __init__(self, streamid, active=False):
+        self.streamid = streamid
+        self._active = False
+
     def makeframe(self, requestid, typeid, flags, payload):
         """Create a frame to be sent out over this stream.
 
         Only returns the frame instance. Does not actually send it.
         """
-        return makeframe(requestid, typeid, flags, payload)
+        streamflags = 0
+        if not self._active:
+            streamflags |= STREAM_FLAG_BEGIN_STREAM
+            self._active = True
+
+        return makeframe(requestid, self.streamid, streamflags, typeid, flags,
+                         payload)
+
+def ensureserverstream(stream):
+    if stream.streamid % 2:
+        raise error.ProgrammingError('server should only write to even '
+                                     'numbered streams; %d is not even' %
+                                     stream.streamid)
 
 class serverreactor(object):
     """Holds state of a server handling frame-based protocol requests.
@@ -483,6 +534,8 @@
         self._deferoutput = deferoutput
         self._state = 'idle'
         self._bufferedframegens = []
+        # stream id -> stream instance for all active streams from the client.
+        self._incomingstreams = {}
         # request id -> dict of commands that are actively being received.
         self._receivingcommands = {}
         # Request IDs that have been received and are actively being processed.
@@ -496,6 +549,30 @@
         Returns a dict with an ``action`` key that details what action,
         if any, the consumer should take next.
         """
+        if not frame.streamid % 2:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('received frame with even numbered stream ID: %d') %
+                  frame.streamid)
+
+        if frame.streamid not in self._incomingstreams:
+            if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
+                self._state = 'errored'
+                return self._makeerrorresult(
+                    _('received frame on unknown inactive stream without '
+                      'beginning of stream flag set'))
+
+            self._incomingstreams[frame.streamid] = stream(frame.streamid)
+
+        if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
+            # TODO handle decoding frames
+            self._state = 'errored'
+            raise error.ProgrammingError('support for decoding stream payloads '
+                                         'not yet implemented')
+
+        if frame.streamflags & STREAM_FLAG_END_STREAM:
+            del self._incomingstreams[frame.streamid]
+
         handlers = {
             'idle': self._onframeidle,
             'command-receiving': self._onframecommandreceiving,
@@ -513,6 +590,8 @@
 
         The raw bytes response is passed as an argument.
         """
+        ensureserverstream(stream)
+
         def sendframes():
             for frame in createbytesresponseframesfrombytes(stream, requestid,
                                                             data):
@@ -552,6 +631,8 @@
         }
 
     def onapplicationerror(self, stream, requestid, msg):
+        ensureserverstream(stream)
+
         return 'sendframes', {
             'framegen': createerrorframe(stream, requestid, msg,
                                          application=True),