--- a/mercurial/wireprotoframing.py Mon Mar 19 16:49:53 2018 -0700
+++ b/mercurial/wireprotoframing.py Wed Mar 14 15:25:06 2018 -0700
@@ -13,7 +13,9 @@
import struct
+from .i18n import _
from . import (
+ error,
util,
)
@@ -105,6 +107,51 @@
return makeframe(frametype, finalflags, payload)
+def parseheader(data):
+ """Parse a unified framing protocol frame header from a buffer.
+
+ The header is expected to be in the buffer at offset 0 and the
+ buffer is expected to be large enough to hold a full header.
+ """
+ # 24 bits payload length (little endian)
+ # 4 bits frame type
+ # 4 bits frame flags
+ # ... payload
+ framelength = data[0] + 256 * data[1] + 16384 * data[2]
+ typeflags = data[3]
+
+ frametype = (typeflags & 0xf0) >> 4
+ frameflags = typeflags & 0x0f
+
+ return frametype, frameflags, framelength
+
+def readframe(fh):
+ """Read a unified framing protocol frame from a file object.
+
+ Returns a 3-tuple of (type, flags, payload) for the decoded frame or
+ None if no frame is available. May raise if a malformed frame is
+ seen.
+ """
+ header = bytearray(FRAME_HEADER_SIZE)
+
+ readcount = fh.readinto(header)
+
+ if readcount == 0:
+ return None
+
+ if readcount != FRAME_HEADER_SIZE:
+ raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
+ (readcount, header))
+
+ frametype, frameflags, framelength = parseheader(header)
+
+ payload = fh.read(framelength)
+ if len(payload) != framelength:
+ raise error.Abort(_('frame length error: expected %d; got %d') %
+ (framelength, len(payload)))
+
+ return frametype, frameflags, payload
+
def createcommandframes(cmd, args, datafh=None):
"""Create frames necessary to transmit a request to run a command.
@@ -154,3 +201,195 @@
if done:
break
+
+class serverreactor(object):
+ """Holds state of a server handling frame-based protocol requests.
+
+ This class is the "brain" of the unified frame-based protocol server
+ component. While the protocol is stateless from the perspective of
+ requests/commands, something needs to track which frames have been
+ received, what frames to expect, etc. This class is that thing.
+
+ Instances are modeled as a state machine of sorts. Instances are also
+ reactionary to external events. The point of this class is to encapsulate
+ the state of the connection and the exchange of frames, not to perform
+ work. Instead, callers tell this class when something occurs, like a
+ frame arriving. If that activity is worthy of a follow-up action (say
+ *run a command*), the return value of that handler will say so.
+
+ I/O and CPU intensive operations are purposefully delegated outside of
+ this class.
+
+ Consumers are expected to tell instances when events occur. They do so by
+ calling the various ``on*`` methods. These methods return a 2-tuple
+ describing any follow-up action(s) to take. The first element is the
+ name of an action to perform. The second is a data structure (usually
+ a dict) specific to that action that contains more information. e.g.
+ if the server wants to send frames back to the client, the data structure
+ will contain a reference to those frames.
+
+ Valid actions that consumers can be instructed to take are:
+
+ error
+ Indicates that an error occurred. Consumer should probably abort.
+
+ runcommand
+ Indicates that the consumer should run a wire protocol command. Details
+ of the command to run are given in the data structure.
+
+ wantframe
+ Indicates that nothing of interest happened and the server is waiting on
+ more frames from the client before anything interesting can be done.
+ """
+
+ def __init__(self):
+ self._state = 'idle'
+ self._activecommand = None
+ self._activeargs = None
+ self._activedata = None
+ self._expectingargs = None
+ self._expectingdata = None
+ self._activeargname = None
+ self._activeargchunks = None
+
+ def onframerecv(self, frametype, frameflags, payload):
+ """Process a frame that has been received off the wire.
+
+ Returns a dict with an ``action`` key that details what action,
+ if any, the consumer should take next.
+ """
+ handlers = {
+ 'idle': self._onframeidle,
+ 'command-receiving-args': self._onframereceivingargs,
+ 'command-receiving-data': self._onframereceivingdata,
+ 'errored': self._onframeerrored,
+ }
+
+ meth = handlers.get(self._state)
+ if not meth:
+ raise error.ProgrammingError('unhandled state: %s' % self._state)
+
+ return meth(frametype, frameflags, payload)
+
+ def _makeerrorresult(self, msg):
+ return 'error', {
+ 'message': msg,
+ }
+
+ def _makeruncommandresult(self):
+ return 'runcommand', {
+ 'command': self._activecommand,
+ 'args': self._activeargs,
+ 'data': self._activedata.getvalue() if self._activedata else None,
+ }
+
+ def _makewantframeresult(self):
+ return 'wantframe', {
+ 'state': self._state,
+ }
+
+ def _onframeidle(self, frametype, frameflags, payload):
+ # The only frame type that should be received in this state is a
+ # command request.
+ if frametype != FRAME_TYPE_COMMAND_NAME:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('expected command frame; got %d') % frametype)
+
+ self._activecommand = payload
+ self._activeargs = {}
+ self._activedata = None
+
+ if frameflags & FLAG_COMMAND_NAME_EOS:
+ return self._makeruncommandresult()
+
+ self._expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
+ self._expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
+
+ if self._expectingargs:
+ self._state = 'command-receiving-args'
+ return self._makewantframeresult()
+ elif self._expectingdata:
+ self._activedata = util.bytesio()
+ self._state = 'command-receiving-data'
+ return self._makewantframeresult()
+ else:
+ self._state = 'errored'
+ return self._makeerrorresult(_('missing frame flags on '
+ 'command frame'))
+
+ def _onframereceivingargs(self, frametype, frameflags, payload):
+ if frametype != FRAME_TYPE_COMMAND_ARGUMENT:
+ self._state = 'errored'
+ return self._makeerrorresult(_('expected command argument '
+ 'frame; got %d') % frametype)
+
+ offset = 0
+ namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
+ offset += ARGUMENT_FRAME_HEADER.size
+
+ # The argument name MUST fit inside the frame.
+ argname = bytes(payload[offset:offset + namesize])
+ offset += namesize
+
+ if len(argname) != namesize:
+ self._state = 'errored'
+ return self._makeerrorresult(_('malformed argument frame: '
+ 'partial argument name'))
+
+ argvalue = bytes(payload[offset:])
+
+ # Argument value spans multiple frames. Record our active state
+ # and wait for the next frame.
+ if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
+ raise error.ProgrammingError('not yet implemented')
+ self._activeargname = argname
+ self._activeargchunks = [argvalue]
+ self._state = 'command-arg-continuation'
+ return self._makewantframeresult()
+
+ # Common case: the argument value is completely contained in this
+ # frame.
+
+ if len(argvalue) != valuesize:
+ self._state = 'errored'
+ return self._makeerrorresult(_('malformed argument frame: '
+ 'partial argument value'))
+
+ self._activeargs[argname] = argvalue
+
+ if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
+ if self._expectingdata:
+ self._state = 'command-receiving-data'
+ self._activedata = util.bytesio()
+ # TODO signal request to run a command once we don't
+ # buffer data frames.
+ return self._makewantframeresult()
+ else:
+ self._state = 'waiting'
+ return self._makeruncommandresult()
+ else:
+ return self._makewantframeresult()
+
+ def _onframereceivingdata(self, frametype, frameflags, payload):
+ if frametype != FRAME_TYPE_COMMAND_DATA:
+ self._state = 'errored'
+ return self._makeerrorresult(_('expected command data frame; '
+ 'got %d') % frametype)
+
+ # TODO support streaming data instead of buffering it.
+ self._activedata.write(payload)
+
+ if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
+ return self._makewantframeresult()
+ elif frameflags & FLAG_COMMAND_DATA_EOS:
+ self._activedata.seek(0)
+ self._state = 'idle'
+ return self._makeruncommandresult()
+ else:
+ self._state = 'errored'
+ return self._makeerrorresult(_('command data frame without '
+ 'flags'))
+
+ def _onframeerrored(self, frametype, frameflags, payload):
+ return self._makeerrorresult(_('server already errored'))