mercurial/wireprotoframing.py
changeset 37055 8c3c47362934
parent 37054 40206e227412
child 37058 61393f888dfe
--- a/mercurial/wireprotoframing.py	Mon Mar 19 16:49:53 2018 -0700
+++ b/mercurial/wireprotoframing.py	Wed Mar 14 15:25:06 2018 -0700
@@ -13,7 +13,9 @@
 
 import struct
 
+from .i18n import _
 from . import (
+    error,
     util,
 )
 
@@ -105,6 +107,51 @@
 
     return makeframe(frametype, finalflags, payload)
 
+def parseheader(data):
+    """Parse a unified framing protocol frame header from a buffer.
+
+    The header is expected to be in the buffer at offset 0 and the
+    buffer is expected to be large enough to hold a full header.
+    """
+    # 24 bits payload length (little endian)
+    # 4 bits frame type
+    # 4 bits frame flags
+    # ... payload
+    framelength = data[0] + 256 * data[1] + 16384 * data[2]
+    typeflags = data[3]
+
+    frametype = (typeflags & 0xf0) >> 4
+    frameflags = typeflags & 0x0f
+
+    return frametype, frameflags, framelength
+
+def readframe(fh):
+    """Read a unified framing protocol frame from a file object.
+
+    Returns a 3-tuple of (type, flags, payload) for the decoded frame or
+    None if no frame is available. May raise if a malformed frame is
+    seen.
+    """
+    header = bytearray(FRAME_HEADER_SIZE)
+
+    readcount = fh.readinto(header)
+
+    if readcount == 0:
+        return None
+
+    if readcount != FRAME_HEADER_SIZE:
+        raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
+                          (readcount, header))
+
+    frametype, frameflags, framelength = parseheader(header)
+
+    payload = fh.read(framelength)
+    if len(payload) != framelength:
+        raise error.Abort(_('frame length error: expected %d; got %d') %
+                          (framelength, len(payload)))
+
+    return frametype, frameflags, payload
+
 def createcommandframes(cmd, args, datafh=None):
     """Create frames necessary to transmit a request to run a command.
 
@@ -154,3 +201,195 @@
 
             if done:
                 break
+
+class serverreactor(object):
+    """Holds state of a server handling frame-based protocol requests.
+
+    This class is the "brain" of the unified frame-based protocol server
+    component. While the protocol is stateless from the perspective of
+    requests/commands, something needs to track which frames have been
+    received, what frames to expect, etc. This class is that thing.
+
+    Instances are modeled as a state machine of sorts. Instances are also
+    reactionary to external events. The point of this class is to encapsulate
+    the state of the connection and the exchange of frames, not to perform
+    work. Instead, callers tell this class when something occurs, like a
+    frame arriving. If that activity is worthy of a follow-up action (say
+    *run a command*), the return value of that handler will say so.
+
+    I/O and CPU intensive operations are purposefully delegated outside of
+    this class.
+
+    Consumers are expected to tell instances when events occur. They do so by
+    calling the various ``on*`` methods. These methods return a 2-tuple
+    describing any follow-up action(s) to take. The first element is the
+    name of an action to perform. The second is a data structure (usually
+    a dict) specific to that action that contains more information. e.g.
+    if the server wants to send frames back to the client, the data structure
+    will contain a reference to those frames.
+
+    Valid actions that consumers can be instructed to take are:
+
+    error
+       Indicates that an error occurred. Consumer should probably abort.
+
+    runcommand
+       Indicates that the consumer should run a wire protocol command. Details
+       of the command to run are given in the data structure.
+
+    wantframe
+       Indicates that nothing of interest happened and the server is waiting on
+       more frames from the client before anything interesting can be done.
+    """
+
+    def __init__(self):
+        self._state = 'idle'
+        self._activecommand = None
+        self._activeargs = None
+        self._activedata = None
+        self._expectingargs = None
+        self._expectingdata = None
+        self._activeargname = None
+        self._activeargchunks = None
+
+    def onframerecv(self, frametype, frameflags, payload):
+        """Process a frame that has been received off the wire.
+
+        Returns a dict with an ``action`` key that details what action,
+        if any, the consumer should take next.
+        """
+        handlers = {
+            'idle': self._onframeidle,
+            'command-receiving-args': self._onframereceivingargs,
+            'command-receiving-data': self._onframereceivingdata,
+            'errored': self._onframeerrored,
+        }
+
+        meth = handlers.get(self._state)
+        if not meth:
+            raise error.ProgrammingError('unhandled state: %s' % self._state)
+
+        return meth(frametype, frameflags, payload)
+
+    def _makeerrorresult(self, msg):
+        return 'error', {
+            'message': msg,
+        }
+
+    def _makeruncommandresult(self):
+        return 'runcommand', {
+            'command': self._activecommand,
+            'args': self._activeargs,
+            'data': self._activedata.getvalue() if self._activedata else None,
+        }
+
+    def _makewantframeresult(self):
+        return 'wantframe', {
+            'state': self._state,
+        }
+
+    def _onframeidle(self, frametype, frameflags, payload):
+        # The only frame type that should be received in this state is a
+        # command request.
+        if frametype != FRAME_TYPE_COMMAND_NAME:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('expected command frame; got %d') % frametype)
+
+        self._activecommand = payload
+        self._activeargs = {}
+        self._activedata = None
+
+        if frameflags & FLAG_COMMAND_NAME_EOS:
+            return self._makeruncommandresult()
+
+        self._expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
+        self._expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
+
+        if self._expectingargs:
+            self._state = 'command-receiving-args'
+            return self._makewantframeresult()
+        elif self._expectingdata:
+            self._activedata = util.bytesio()
+            self._state = 'command-receiving-data'
+            return self._makewantframeresult()
+        else:
+            self._state = 'errored'
+            return self._makeerrorresult(_('missing frame flags on '
+                                           'command frame'))
+
+    def _onframereceivingargs(self, frametype, frameflags, payload):
+        if frametype != FRAME_TYPE_COMMAND_ARGUMENT:
+            self._state = 'errored'
+            return self._makeerrorresult(_('expected command argument '
+                                           'frame; got %d') % frametype)
+
+        offset = 0
+        namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
+        offset += ARGUMENT_FRAME_HEADER.size
+
+        # The argument name MUST fit inside the frame.
+        argname = bytes(payload[offset:offset + namesize])
+        offset += namesize
+
+        if len(argname) != namesize:
+            self._state = 'errored'
+            return self._makeerrorresult(_('malformed argument frame: '
+                                           'partial argument name'))
+
+        argvalue = bytes(payload[offset:])
+
+        # Argument value spans multiple frames. Record our active state
+        # and wait for the next frame.
+        if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
+            raise error.ProgrammingError('not yet implemented')
+            self._activeargname = argname
+            self._activeargchunks = [argvalue]
+            self._state = 'command-arg-continuation'
+            return self._makewantframeresult()
+
+        # Common case: the argument value is completely contained in this
+        # frame.
+
+        if len(argvalue) != valuesize:
+            self._state = 'errored'
+            return self._makeerrorresult(_('malformed argument frame: '
+                                           'partial argument value'))
+
+        self._activeargs[argname] = argvalue
+
+        if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
+            if self._expectingdata:
+                self._state = 'command-receiving-data'
+                self._activedata = util.bytesio()
+                # TODO signal request to run a command once we don't
+                # buffer data frames.
+                return self._makewantframeresult()
+            else:
+                self._state = 'waiting'
+                return self._makeruncommandresult()
+        else:
+            return self._makewantframeresult()
+
+    def _onframereceivingdata(self, frametype, frameflags, payload):
+        if frametype != FRAME_TYPE_COMMAND_DATA:
+            self._state = 'errored'
+            return self._makeerrorresult(_('expected command data frame; '
+                                           'got %d') % frametype)
+
+        # TODO support streaming data instead of buffering it.
+        self._activedata.write(payload)
+
+        if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
+            return self._makewantframeresult()
+        elif frameflags & FLAG_COMMAND_DATA_EOS:
+            self._activedata.seek(0)
+            self._state = 'idle'
+            return self._makeruncommandresult()
+        else:
+            self._state = 'errored'
+            return self._makeerrorresult(_('command data frame without '
+                                           'flags'))
+
+    def _onframeerrored(self, frametype, frameflags, payload):
+        return self._makeerrorresult(_('server already errored'))