changeset 37061:884a0c1604ad

wireproto: define attr-based classes for representing frames When frames only had 3 attributes, it was reasonable to represent them as a tuple. With them growing more attributes, it will be easier to pass them around as a more formal type. So let's define attr-based classes to represent frame headers and full frames. Differential Revision: https://phab.mercurial-scm.org/D2899
author Gregory Szorc <gregory.szorc@gmail.com>
date Thu, 15 Mar 2018 16:03:14 -0700
parents 0a6c5cc09a88
children fe4c944f95bb
files mercurial/wireprotoframing.py mercurial/wireprotoserver.py tests/test-wireproto-serverreactor.py
diffstat 3 files changed, 78 insertions(+), 60 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/wireprotoframing.py	Wed Mar 14 22:19:00 2018 -0700
+++ b/mercurial/wireprotoframing.py	Thu Mar 15 16:03:14 2018 -0700
@@ -14,6 +14,9 @@
 import struct
 
 from .i18n import _
+from .thirdparty import (
+    attr,
+)
 from . import (
     error,
     util,
@@ -92,6 +95,24 @@
 
 ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
 
+@attr.s(slots=True)
+class frameheader(object):
+    """Represents the data in a frame header."""
+
+    length = attr.ib()
+    requestid = attr.ib()
+    typeid = attr.ib()
+    flags = attr.ib()
+
+@attr.s(slots=True)
+class frame(object):
+    """Represents a parsed frame."""
+
+    requestid = attr.ib()
+    typeid = attr.ib()
+    flags = attr.ib()
+    payload = attr.ib()
+
 def makeframe(requestid, frametype, frameflags, payload):
     """Assemble a frame into a byte array."""
     # TODO assert size of payload.
@@ -164,7 +185,7 @@
     frametype = (typeflags & 0xf0) >> 4
     frameflags = typeflags & 0x0f
 
-    return requestid, frametype, frameflags, framelength
+    return frameheader(framelength, requestid, frametype, frameflags)
 
 def readframe(fh):
     """Read a unified framing protocol frame from a file object.
@@ -184,14 +205,14 @@
         raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
                           (readcount, header))
 
-    requestid, frametype, frameflags, framelength = parseheader(header)
+    h = parseheader(header)
 
-    payload = fh.read(framelength)
-    if len(payload) != framelength:
+    payload = fh.read(h.length)
+    if len(payload) != h.length:
         raise error.Abort(_('frame length error: expected %d; got %d') %
-                          (framelength, len(payload)))
+                          (h.length, len(payload)))
 
-    return requestid, frametype, frameflags, payload
+    return frame(h.requestid, h.typeid, h.flags, payload)
 
 def createcommandframes(requestid, cmd, args, datafh=None):
     """Create frames necessary to transmit a request to run a command.
@@ -433,7 +454,7 @@
         # request id -> dict of commands that are actively being received.
         self._receivingcommands = {}
 
-    def onframerecv(self, requestid, frametype, frameflags, payload):
+    def onframerecv(self, frame):
         """Process a frame that has been received off the wire.
 
         Returns a dict with an ``action`` key that details what action,
@@ -449,7 +470,7 @@
         if not meth:
             raise error.ProgrammingError('unhandled state: %s' % self._state)
 
-        return meth(requestid, frametype, frameflags, payload)
+        return meth(frame)
 
     def onbytesresponseready(self, requestid, data):
         """Signal that a bytes response is ready to be sent to the client.
@@ -518,32 +539,32 @@
             'state': self._state,
         }
 
-    def _onframeidle(self, requestid, frametype, frameflags, payload):
+    def _onframeidle(self, frame):
         # The only frame type that should be received in this state is a
         # command request.
-        if frametype != FRAME_TYPE_COMMAND_NAME:
+        if frame.typeid != FRAME_TYPE_COMMAND_NAME:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('expected command frame; got %d') % frametype)
+                _('expected command frame; got %d') % frame.typeid)
 
-        if requestid in self._receivingcommands:
+        if frame.requestid in self._receivingcommands:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('request with ID %d already received') % requestid)
+                _('request with ID %d already received') % frame.requestid)
 
-        expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
-        expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
+        expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
+        expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
 
-        self._receivingcommands[requestid] = {
-            'command': payload,
+        self._receivingcommands[frame.requestid] = {
+            'command': frame.payload,
             'args': {},
             'data': None,
             'expectingargs': expectingargs,
             'expectingdata': expectingdata,
         }
 
-        if frameflags & FLAG_COMMAND_NAME_EOS:
-            return self._makeruncommandresult(requestid)
+        if frame.flags & FLAG_COMMAND_NAME_EOS:
+            return self._makeruncommandresult(frame.requestid)
 
         if expectingargs or expectingdata:
             self._state = 'command-receiving'
@@ -553,56 +574,52 @@
             return self._makeerrorresult(_('missing frame flags on '
                                            'command frame'))
 
-    def _onframecommandreceiving(self, requestid, frametype, frameflags,
-                                 payload):
+    def _onframecommandreceiving(self, frame):
         # It could be a new command request. Process it as such.
-        if frametype == FRAME_TYPE_COMMAND_NAME:
-            return self._onframeidle(requestid, frametype, frameflags, payload)
+        if frame.typeid == FRAME_TYPE_COMMAND_NAME:
+            return self._onframeidle(frame)
 
         # All other frames should be related to a command that is currently
         # receiving.
-        if requestid not in self._receivingcommands:
+        if frame.requestid not in self._receivingcommands:
             self._state = 'errored'
             return self._makeerrorresult(
                 _('received frame for request that is not receiving: %d') %
-                  requestid)
+                  frame.requestid)
 
-        entry = self._receivingcommands[requestid]
+        entry = self._receivingcommands[frame.requestid]
 
-        if frametype == FRAME_TYPE_COMMAND_ARGUMENT:
+        if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
             if not entry['expectingargs']:
                 self._state = 'errored'
                 return self._makeerrorresult(_(
                     'received command argument frame for request that is not '
-                    'expecting arguments: %d') % requestid)
+                    'expecting arguments: %d') % frame.requestid)
 
-            return self._handlecommandargsframe(requestid, entry, frametype,
-                                                frameflags, payload)
+            return self._handlecommandargsframe(frame, entry)
 
-        elif frametype == FRAME_TYPE_COMMAND_DATA:
+        elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
             if not entry['expectingdata']:
                 self._state = 'errored'
                 return self._makeerrorresult(_(
                     'received command data frame for request that is not '
-                    'expecting data: %d') % requestid)
+                    'expecting data: %d') % frame.requestid)
 
             if entry['data'] is None:
                 entry['data'] = util.bytesio()
 
-            return self._handlecommanddataframe(requestid, entry, frametype,
-                                                frameflags, payload)
+            return self._handlecommanddataframe(frame, entry)
 
-    def _handlecommandargsframe(self, requestid, entry, frametype, frameflags,
-                                payload):
+    def _handlecommandargsframe(self, frame, entry):
         # The frame and state of command should have already been validated.
-        assert frametype == FRAME_TYPE_COMMAND_ARGUMENT
+        assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT
 
         offset = 0
-        namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
+        namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
         offset += ARGUMENT_FRAME_HEADER.size
 
         # The argument name MUST fit inside the frame.
-        argname = bytes(payload[offset:offset + namesize])
+        argname = bytes(frame.payload[offset:offset + namesize])
         offset += namesize
 
         if len(argname) != namesize:
@@ -610,11 +627,11 @@
             return self._makeerrorresult(_('malformed argument frame: '
                                            'partial argument name'))
 
-        argvalue = bytes(payload[offset:])
+        argvalue = bytes(frame.payload[offset:])
 
         # Argument value spans multiple frames. Record our active state
         # and wait for the next frame.
-        if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
+        if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
             raise error.ProgrammingError('not yet implemented')
 
         # Common case: the argument value is completely contained in this
@@ -627,32 +644,31 @@
 
         entry['args'][argname] = argvalue
 
-        if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
+        if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
             if entry['expectingdata']:
                 # TODO signal request to run a command once we don't
                 # buffer data frames.
                 return self._makewantframeresult()
             else:
-                return self._makeruncommandresult(requestid)
+                return self._makeruncommandresult(frame.requestid)
         else:
             return self._makewantframeresult()
 
-    def _handlecommanddataframe(self, requestid, entry, frametype, frameflags,
-                                payload):
-        assert frametype == FRAME_TYPE_COMMAND_DATA
+    def _handlecommanddataframe(self, frame, entry):
+        assert frame.typeid == FRAME_TYPE_COMMAND_DATA
 
         # TODO support streaming data instead of buffering it.
-        entry['data'].write(payload)
+        entry['data'].write(frame.payload)
 
-        if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
+        if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
             return self._makewantframeresult()
-        elif frameflags & FLAG_COMMAND_DATA_EOS:
+        elif frame.flags & FLAG_COMMAND_DATA_EOS:
             entry['data'].seek(0)
-            return self._makeruncommandresult(requestid)
+            return self._makeruncommandresult(frame.requestid)
         else:
             self._state = 'errored'
             return self._makeerrorresult(_('command data frame without '
                                            'flags'))
 
-    def _onframeerrored(self, requestid, frametype, frameflags, payload):
+    def _onframeerrored(self, frame):
         return self._makeerrorresult(_('server already errored'))
--- a/mercurial/wireprotoserver.py	Wed Mar 14 22:19:00 2018 -0700
+++ b/mercurial/wireprotoserver.py	Thu Mar 15 16:03:14 2018 -0700
@@ -400,12 +400,11 @@
             states.append(b'received: <no frame>')
             break
 
-        requestid, frametype, frameflags, payload = frame
-        states.append(b'received: %d %d %d %s' % (frametype, frameflags,
-                                                  requestid, payload))
+        states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
+                                                  frame.requestid,
+                                                  frame.payload))
 
-        action, meta = reactor.onframerecv(requestid, frametype, frameflags,
-                                           payload)
+        action, meta = reactor.onframerecv(frame)
         states.append(json.dumps((action, meta), sort_keys=True,
                                  separators=(', ', ': ')))
 
@@ -434,7 +433,7 @@
         if not frame:
             break
 
-        action, meta = reactor.onframerecv(*frame)
+        action, meta = reactor.onframerecv(frame)
 
         if action == 'wantframe':
             # Need more data before we can do anything.
--- a/tests/test-wireproto-serverreactor.py	Wed Mar 14 22:19:00 2018 -0700
+++ b/tests/test-wireproto-serverreactor.py	Thu Mar 15 16:03:14 2018 -0700
@@ -18,11 +18,14 @@
     Emits a generator of results from ``onframerecv()`` calls.
     """
     for frame in gen:
-        rid, frametype, frameflags, framelength = framing.parseheader(frame)
+        header = framing.parseheader(frame)
         payload = frame[framing.FRAME_HEADER_SIZE:]
-        assert len(payload) == framelength
+        assert len(payload) == header.length
 
-        yield reactor.onframerecv(rid, frametype, frameflags, payload)
+        yield reactor.onframerecv(framing.frame(header.requestid,
+                                                header.typeid,
+                                                header.flags,
+                                                payload))
 
 def sendcommandframes(reactor, rid, cmd, args, datafh=None):
     """Generate frames to run a command and send them to a reactor."""