--- a/mercurial/wireprotoframing.py Wed Mar 14 14:01:16 2018 -0700
+++ b/mercurial/wireprotoframing.py Wed Mar 14 16:51:34 2018 -0700
@@ -19,7 +19,7 @@
util,
)
-FRAME_HEADER_SIZE = 4
+FRAME_HEADER_SIZE = 6
DEFAULT_MAX_FRAME_SIZE = 32768
FRAME_TYPE_COMMAND_NAME = 0x01
@@ -89,28 +89,43 @@
ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
-def makeframe(frametype, frameflags, payload):
+def makeframe(requestid, frametype, frameflags, payload):
"""Assemble a frame into a byte array."""
# TODO assert size of payload.
frame = bytearray(FRAME_HEADER_SIZE + len(payload))
+ # 24 bits length
+ # 16 bits request id
+ # 4 bits type
+ # 4 bits flags
+
l = struct.pack(r'<I', len(payload))
frame[0:3] = l[0:3]
- frame[3] = (frametype << 4) | frameflags
- frame[4:] = payload
+ struct.pack_into(r'<H', frame, 3, requestid)
+ frame[5] = (frametype << 4) | frameflags
+ frame[6:] = payload
return frame
def makeframefromhumanstring(s):
- """Given a string of the form: <type> <flags> <payload>, creates a frame.
+ """Create a frame from a human readable string
+
+ Strings have the form:
+
+ <request-id> <type> <flags> <payload>
This can be used by user-facing applications and tests for creating
frames easily without having to type out a bunch of constants.
+ Request ID is an integer.
+
Frame type and flags can be specified by integer or named constant.
+
Flags can be delimited by `|` to bitwise OR them together.
"""
- frametype, frameflags, payload = s.split(b' ', 2)
+ requestid, frametype, frameflags, payload = s.split(b' ', 3)
+
+ requestid = int(requestid)
if frametype in FRAME_TYPES:
frametype = FRAME_TYPES[frametype]
@@ -127,7 +142,7 @@
payload = util.unescapestr(payload)
- return makeframe(frametype, finalflags, payload)
+ return makeframe(requestid, frametype, finalflags, payload)
def parseheader(data):
"""Parse a unified framing protocol frame header from a buffer.
@@ -140,12 +155,13 @@
# 4 bits frame flags
# ... payload
framelength = data[0] + 256 * data[1] + 16384 * data[2]
- typeflags = data[3]
+ requestid = struct.unpack_from(r'<H', data, 3)[0]
+ typeflags = data[5]
frametype = (typeflags & 0xf0) >> 4
frameflags = typeflags & 0x0f
- return frametype, frameflags, framelength
+ return requestid, frametype, frameflags, framelength
def readframe(fh):
"""Read a unified framing protocol frame from a file object.
@@ -165,16 +181,16 @@
raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
(readcount, header))
- frametype, frameflags, framelength = parseheader(header)
+ requestid, frametype, frameflags, framelength = parseheader(header)
payload = fh.read(framelength)
if len(payload) != framelength:
raise error.Abort(_('frame length error: expected %d; got %d') %
(framelength, len(payload)))
- return frametype, frameflags, payload
+ return requestid, frametype, frameflags, payload
-def createcommandframes(cmd, args, datafh=None):
+def createcommandframes(requestid, cmd, args, datafh=None):
"""Create frames necessary to transmit a request to run a command.
This is a generator of bytearrays. Each item represents a frame
@@ -189,7 +205,7 @@
if not flags:
flags |= FLAG_COMMAND_NAME_EOS
- yield makeframe(FRAME_TYPE_COMMAND_NAME, flags, cmd)
+ yield makeframe(requestid, FRAME_TYPE_COMMAND_NAME, flags, cmd)
for i, k in enumerate(sorted(args)):
v = args[k]
@@ -205,7 +221,7 @@
payload[offset:offset + len(v)] = v
flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
- yield makeframe(FRAME_TYPE_COMMAND_ARGUMENT, flags, payload)
+ yield makeframe(requestid, FRAME_TYPE_COMMAND_ARGUMENT, flags, payload)
if datafh:
while True:
@@ -219,12 +235,12 @@
assert datafh.read(1) == b''
done = True
- yield makeframe(FRAME_TYPE_COMMAND_DATA, flags, data)
+ yield makeframe(requestid, FRAME_TYPE_COMMAND_DATA, flags, data)
if done:
break
-def createbytesresponseframesfrombytes(data,
+def createbytesresponseframesfrombytes(requestid, data,
maxframesize=DEFAULT_MAX_FRAME_SIZE):
"""Create a raw frame to send a bytes response from static bytes input.
@@ -233,7 +249,7 @@
# Simple case of a single frame.
if len(data) <= maxframesize:
- yield makeframe(FRAME_TYPE_BYTES_RESPONSE,
+ yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE,
FLAG_BYTES_RESPONSE_EOS, data)
return
@@ -248,12 +264,12 @@
else:
flags = FLAG_BYTES_RESPONSE_CONTINUATION
- yield makeframe(FRAME_TYPE_BYTES_RESPONSE, flags, chunk)
+ yield makeframe(requestid, FRAME_TYPE_BYTES_RESPONSE, flags, chunk)
if done:
break
-def createerrorframe(msg, protocol=False, application=False):
+def createerrorframe(requestid, msg, protocol=False, application=False):
# TODO properly handle frame size limits.
assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
@@ -263,7 +279,7 @@
if application:
flags |= FLAG_ERROR_RESPONSE_APPLICATION
- yield makeframe(FRAME_TYPE_ERROR_RESPONSE, flags, msg)
+ yield makeframe(requestid, FRAME_TYPE_ERROR_RESPONSE, flags, msg)
class serverreactor(object):
"""Holds state of a server handling frame-based protocol requests.
@@ -326,6 +342,7 @@
self._deferoutput = deferoutput
self._state = 'idle'
self._bufferedframegens = []
+ self._activerequestid = None
self._activecommand = None
self._activeargs = None
self._activedata = None
@@ -334,7 +351,7 @@
self._activeargname = None
self._activeargchunks = None
- def onframerecv(self, frametype, frameflags, payload):
+ def onframerecv(self, requestid, frametype, frameflags, payload):
"""Process a frame that has been received off the wire.
Returns a dict with an ``action`` key that details what action,
@@ -351,14 +368,14 @@
if not meth:
raise error.ProgrammingError('unhandled state: %s' % self._state)
- return meth(frametype, frameflags, payload)
+ return meth(requestid, frametype, frameflags, payload)
- def onbytesresponseready(self, data):
+ def onbytesresponseready(self, requestid, data):
"""Signal that a bytes response is ready to be sent to the client.
The raw bytes response is passed as an argument.
"""
- framegen = createbytesresponseframesfrombytes(data)
+ framegen = createbytesresponseframesfrombytes(requestid, data)
if self._deferoutput:
self._bufferedframegens.append(framegen)
@@ -387,9 +404,9 @@
'framegen': makegen(),
}
- def onapplicationerror(self, msg):
+ def onapplicationerror(self, requestid, msg):
return 'sendframes', {
- 'framegen': createerrorframe(msg, application=True),
+ 'framegen': createerrorframe(requestid, msg, application=True),
}
def _makeerrorresult(self, msg):
@@ -399,6 +416,7 @@
def _makeruncommandresult(self):
return 'runcommand', {
+ 'requestid': self._activerequestid,
'command': self._activecommand,
'args': self._activeargs,
'data': self._activedata.getvalue() if self._activedata else None,
@@ -409,7 +427,7 @@
'state': self._state,
}
- def _onframeidle(self, frametype, frameflags, payload):
+ def _onframeidle(self, requestid, frametype, frameflags, payload):
# The only frame type that should be received in this state is a
# command request.
if frametype != FRAME_TYPE_COMMAND_NAME:
@@ -417,6 +435,7 @@
return self._makeerrorresult(
_('expected command frame; got %d') % frametype)
+ self._activerequestid = requestid
self._activecommand = payload
self._activeargs = {}
self._activedata = None
@@ -439,7 +458,7 @@
return self._makeerrorresult(_('missing frame flags on '
'command frame'))
- def _onframereceivingargs(self, frametype, frameflags, payload):
+ def _onframereceivingargs(self, requestid, frametype, frameflags, payload):
if frametype != FRAME_TYPE_COMMAND_ARGUMENT:
self._state = 'errored'
return self._makeerrorresult(_('expected command argument '
@@ -492,7 +511,7 @@
else:
return self._makewantframeresult()
- def _onframereceivingdata(self, frametype, frameflags, payload):
+ def _onframereceivingdata(self, requestid, frametype, frameflags, payload):
if frametype != FRAME_TYPE_COMMAND_DATA:
self._state = 'errored'
return self._makeerrorresult(_('expected command data frame; '
@@ -512,5 +531,5 @@
return self._makeerrorresult(_('command data frame without '
'flags'))
- def _onframeerrored(self, frametype, frameflags, payload):
+ def _onframeerrored(self, requestid, frametype, frameflags, payload):
return self._makeerrorresult(_('server already errored'))