--- a/mercurial/wireprotoframing.py Wed Mar 14 22:19:00 2018 -0700
+++ b/mercurial/wireprotoframing.py Thu Mar 15 16:03:14 2018 -0700
@@ -14,6 +14,9 @@
import struct
from .i18n import _
+from .thirdparty import (
+ attr,
+)
from . import (
error,
util,
@@ -92,6 +95,24 @@
ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
+@attr.s(slots=True)
+class frameheader(object):
+ """Represents the data in a frame header."""
+
+ length = attr.ib()
+ requestid = attr.ib()
+ typeid = attr.ib()
+ flags = attr.ib()
+
+@attr.s(slots=True)
+class frame(object):
+ """Represents a parsed frame."""
+
+ requestid = attr.ib()
+ typeid = attr.ib()
+ flags = attr.ib()
+ payload = attr.ib()
+
def makeframe(requestid, frametype, frameflags, payload):
"""Assemble a frame into a byte array."""
# TODO assert size of payload.
@@ -164,7 +185,7 @@
frametype = (typeflags & 0xf0) >> 4
frameflags = typeflags & 0x0f
- return requestid, frametype, frameflags, framelength
+ return frameheader(framelength, requestid, frametype, frameflags)
def readframe(fh):
"""Read a unified framing protocol frame from a file object.
@@ -184,14 +205,14 @@
raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
(readcount, header))
- requestid, frametype, frameflags, framelength = parseheader(header)
+ h = parseheader(header)
- payload = fh.read(framelength)
- if len(payload) != framelength:
+ payload = fh.read(h.length)
+ if len(payload) != h.length:
raise error.Abort(_('frame length error: expected %d; got %d') %
- (framelength, len(payload)))
+ (h.length, len(payload)))
- return requestid, frametype, frameflags, payload
+ return frame(h.requestid, h.typeid, h.flags, payload)
def createcommandframes(requestid, cmd, args, datafh=None):
"""Create frames necessary to transmit a request to run a command.
@@ -433,7 +454,7 @@
# request id -> dict of commands that are actively being received.
self._receivingcommands = {}
- def onframerecv(self, requestid, frametype, frameflags, payload):
+ def onframerecv(self, frame):
"""Process a frame that has been received off the wire.
Returns a dict with an ``action`` key that details what action,
@@ -449,7 +470,7 @@
if not meth:
raise error.ProgrammingError('unhandled state: %s' % self._state)
- return meth(requestid, frametype, frameflags, payload)
+ return meth(frame)
def onbytesresponseready(self, requestid, data):
"""Signal that a bytes response is ready to be sent to the client.
@@ -518,32 +539,32 @@
'state': self._state,
}
- def _onframeidle(self, requestid, frametype, frameflags, payload):
+ def _onframeidle(self, frame):
# The only frame type that should be received in this state is a
# command request.
- if frametype != FRAME_TYPE_COMMAND_NAME:
+ if frame.typeid != FRAME_TYPE_COMMAND_NAME:
self._state = 'errored'
return self._makeerrorresult(
- _('expected command frame; got %d') % frametype)
+ _('expected command frame; got %d') % frame.typeid)
- if requestid in self._receivingcommands:
+ if frame.requestid in self._receivingcommands:
self._state = 'errored'
return self._makeerrorresult(
- _('request with ID %d already received') % requestid)
+ _('request with ID %d already received') % frame.requestid)
- expectingargs = bool(frameflags & FLAG_COMMAND_NAME_HAVE_ARGS)
- expectingdata = bool(frameflags & FLAG_COMMAND_NAME_HAVE_DATA)
+ expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
+ expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
- self._receivingcommands[requestid] = {
- 'command': payload,
+ self._receivingcommands[frame.requestid] = {
+ 'command': frame.payload,
'args': {},
'data': None,
'expectingargs': expectingargs,
'expectingdata': expectingdata,
}
- if frameflags & FLAG_COMMAND_NAME_EOS:
- return self._makeruncommandresult(requestid)
+ if frame.flags & FLAG_COMMAND_NAME_EOS:
+ return self._makeruncommandresult(frame.requestid)
if expectingargs or expectingdata:
self._state = 'command-receiving'
@@ -553,56 +574,52 @@
return self._makeerrorresult(_('missing frame flags on '
'command frame'))
- def _onframecommandreceiving(self, requestid, frametype, frameflags,
- payload):
+ def _onframecommandreceiving(self, frame):
# It could be a new command request. Process it as such.
- if frametype == FRAME_TYPE_COMMAND_NAME:
- return self._onframeidle(requestid, frametype, frameflags, payload)
+ if frame.typeid == FRAME_TYPE_COMMAND_NAME:
+ return self._onframeidle(frame)
# All other frames should be related to a command that is currently
# receiving.
- if requestid not in self._receivingcommands:
+ if frame.requestid not in self._receivingcommands:
self._state = 'errored'
return self._makeerrorresult(
_('received frame for request that is not receiving: %d') %
- requestid)
+ frame.requestid)
- entry = self._receivingcommands[requestid]
+ entry = self._receivingcommands[frame.requestid]
- if frametype == FRAME_TYPE_COMMAND_ARGUMENT:
+ if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
if not entry['expectingargs']:
self._state = 'errored'
return self._makeerrorresult(_(
'received command argument frame for request that is not '
- 'expecting arguments: %d') % requestid)
+ 'expecting arguments: %d') % frame.requestid)
- return self._handlecommandargsframe(requestid, entry, frametype,
- frameflags, payload)
+ return self._handlecommandargsframe(frame, entry)
- elif frametype == FRAME_TYPE_COMMAND_DATA:
+ elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
if not entry['expectingdata']:
self._state = 'errored'
return self._makeerrorresult(_(
'received command data frame for request that is not '
- 'expecting data: %d') % requestid)
+ 'expecting data: %d') % frame.requestid)
if entry['data'] is None:
entry['data'] = util.bytesio()
- return self._handlecommanddataframe(requestid, entry, frametype,
- frameflags, payload)
+ return self._handlecommanddataframe(frame, entry)
- def _handlecommandargsframe(self, requestid, entry, frametype, frameflags,
- payload):
+ def _handlecommandargsframe(self, frame, entry):
# The frame and state of command should have already been validated.
- assert frametype == FRAME_TYPE_COMMAND_ARGUMENT
+ assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT
offset = 0
- namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(payload)
+ namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
offset += ARGUMENT_FRAME_HEADER.size
# The argument name MUST fit inside the frame.
- argname = bytes(payload[offset:offset + namesize])
+ argname = bytes(frame.payload[offset:offset + namesize])
offset += namesize
if len(argname) != namesize:
@@ -610,11 +627,11 @@
return self._makeerrorresult(_('malformed argument frame: '
'partial argument name'))
- argvalue = bytes(payload[offset:])
+ argvalue = bytes(frame.payload[offset:])
# Argument value spans multiple frames. Record our active state
# and wait for the next frame.
- if frameflags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
+ if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
raise error.ProgrammingError('not yet implemented')
# Common case: the argument value is completely contained in this
@@ -627,32 +644,31 @@
entry['args'][argname] = argvalue
- if frameflags & FLAG_COMMAND_ARGUMENT_EOA:
+ if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
if entry['expectingdata']:
# TODO signal request to run a command once we don't
# buffer data frames.
return self._makewantframeresult()
else:
- return self._makeruncommandresult(requestid)
+ return self._makeruncommandresult(frame.requestid)
else:
return self._makewantframeresult()
- def _handlecommanddataframe(self, requestid, entry, frametype, frameflags,
- payload):
- assert frametype == FRAME_TYPE_COMMAND_DATA
+ def _handlecommanddataframe(self, frame, entry):
+ assert frame.typeid == FRAME_TYPE_COMMAND_DATA
# TODO support streaming data instead of buffering it.
- entry['data'].write(payload)
+ entry['data'].write(frame.payload)
- if frameflags & FLAG_COMMAND_DATA_CONTINUATION:
+ if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
return self._makewantframeresult()
- elif frameflags & FLAG_COMMAND_DATA_EOS:
+ elif frame.flags & FLAG_COMMAND_DATA_EOS:
entry['data'].seek(0)
- return self._makeruncommandresult(requestid)
+ return self._makeruncommandresult(frame.requestid)
else:
self._state = 'errored'
return self._makeerrorresult(_('command data frame without '
'flags'))
- def _onframeerrored(self, requestid, frametype, frameflags, payload):
+ def _onframeerrored(self, frame):
return self._makeerrorresult(_('server already errored'))
--- a/mercurial/wireprotoserver.py Wed Mar 14 22:19:00 2018 -0700
+++ b/mercurial/wireprotoserver.py Thu Mar 15 16:03:14 2018 -0700
@@ -400,12 +400,11 @@
states.append(b'received: <no frame>')
break
- requestid, frametype, frameflags, payload = frame
- states.append(b'received: %d %d %d %s' % (frametype, frameflags,
- requestid, payload))
+ states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
+ frame.requestid,
+ frame.payload))
- action, meta = reactor.onframerecv(requestid, frametype, frameflags,
- payload)
+ action, meta = reactor.onframerecv(frame)
states.append(json.dumps((action, meta), sort_keys=True,
separators=(', ', ': ')))
@@ -434,7 +433,7 @@
if not frame:
break
- action, meta = reactor.onframerecv(*frame)
+ action, meta = reactor.onframerecv(frame)
if action == 'wantframe':
# Need more data before we can do anything.
--- a/tests/test-wireproto-serverreactor.py Wed Mar 14 22:19:00 2018 -0700
+++ b/tests/test-wireproto-serverreactor.py Thu Mar 15 16:03:14 2018 -0700
@@ -18,11 +18,14 @@
Emits a generator of results from ``onframerecv()`` calls.
"""
for frame in gen:
- rid, frametype, frameflags, framelength = framing.parseheader(frame)
+ header = framing.parseheader(frame)
payload = frame[framing.FRAME_HEADER_SIZE:]
- assert len(payload) == framelength
+ assert len(payload) == header.length
- yield reactor.onframerecv(rid, frametype, frameflags, payload)
+ yield reactor.onframerecv(framing.frame(header.requestid,
+ header.typeid,
+ header.flags,
+ payload))
def sendcommandframes(reactor, rid, cmd, args, datafh=None):
"""Generate frames to run a command and send them to a reactor."""