--- a/mercurial/wireprotoframing.py Mon Mar 26 13:57:22 2018 -0700
+++ b/mercurial/wireprotoframing.py Mon Mar 26 11:00:16 2018 -0700
@@ -25,15 +25,26 @@
stringutil,
)
-FRAME_HEADER_SIZE = 6
+FRAME_HEADER_SIZE = 8
DEFAULT_MAX_FRAME_SIZE = 32768
+STREAM_FLAG_BEGIN_STREAM = 0x01
+STREAM_FLAG_END_STREAM = 0x02
+STREAM_FLAG_ENCODING_APPLIED = 0x04
+
+STREAM_FLAGS = {
+ b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
+ b'stream-end': STREAM_FLAG_END_STREAM,
+ b'encoded': STREAM_FLAG_ENCODING_APPLIED,
+}
+
FRAME_TYPE_COMMAND_NAME = 0x01
FRAME_TYPE_COMMAND_ARGUMENT = 0x02
FRAME_TYPE_COMMAND_DATA = 0x03
FRAME_TYPE_BYTES_RESPONSE = 0x04
FRAME_TYPE_ERROR_RESPONSE = 0x05
FRAME_TYPE_TEXT_OUTPUT = 0x06
+FRAME_TYPE_STREAM_SETTINGS = 0x08
FRAME_TYPES = {
b'command-name': FRAME_TYPE_COMMAND_NAME,
@@ -42,6 +53,7 @@
b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
b'error-response': FRAME_TYPE_ERROR_RESPONSE,
b'text-output': FRAME_TYPE_TEXT_OUTPUT,
+ b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
}
FLAG_COMMAND_NAME_EOS = 0x01
@@ -94,6 +106,7 @@
FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
FRAME_TYPE_TEXT_OUTPUT: {},
+ FRAME_TYPE_STREAM_SETTINGS: {},
}
ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
@@ -104,6 +117,8 @@
length = attr.ib()
requestid = attr.ib()
+ streamid = attr.ib()
+ streamflags = attr.ib()
typeid = attr.ib()
flags = attr.ib()
@@ -112,25 +127,29 @@
"""Represents a parsed frame."""
requestid = attr.ib()
+ streamid = attr.ib()
+ streamflags = attr.ib()
typeid = attr.ib()
flags = attr.ib()
payload = attr.ib()
-def makeframe(requestid, typeid, flags, payload):
+def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
"""Assemble a frame into a byte array."""
# TODO assert size of payload.
frame = bytearray(FRAME_HEADER_SIZE + len(payload))
# 24 bits length
# 16 bits request id
+ # 8 bits stream id
+ # 8 bits stream flags
# 4 bits type
# 4 bits flags
l = struct.pack(r'<I', len(payload))
frame[0:3] = l[0:3]
- struct.pack_into(r'<H', frame, 3, requestid)
- frame[5] = (typeid << 4) | flags
- frame[6:] = payload
+ struct.pack_into(r'<HBB', frame, 3, requestid, streamid, streamflags)
+ frame[7] = (typeid << 4) | flags
+ frame[8:] = payload
return frame
@@ -139,20 +158,30 @@
Strings have the form:
- <request-id> <type> <flags> <payload>
+ <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
This can be used by user-facing applications and tests for creating
frames easily without having to type out a bunch of constants.
- Request ID is an integer.
+ Request ID and stream IDs are integers.
- Frame type and flags can be specified by integer or named constant.
+ Stream flags, frame type, and flags can be specified by integer or
+ named constant.
Flags can be delimited by `|` to bitwise OR them together.
"""
- requestid, frametype, frameflags, payload = s.split(b' ', 3)
+ fields = s.split(b' ', 5)
+ requestid, streamid, streamflags, frametype, frameflags, payload = fields
requestid = int(requestid)
+ streamid = int(streamid)
+
+ finalstreamflags = 0
+ for flag in streamflags.split(b'|'):
+ if flag in STREAM_FLAGS:
+ finalstreamflags |= STREAM_FLAGS[flag]
+ else:
+ finalstreamflags |= int(flag)
if frametype in FRAME_TYPES:
frametype = FRAME_TYPES[frametype]
@@ -169,7 +198,8 @@
payload = stringutil.unescapestr(payload)
- return makeframe(requestid=requestid, typeid=frametype,
+ return makeframe(requestid=requestid, streamid=streamid,
+ streamflags=finalstreamflags, typeid=frametype,
flags=finalflags, payload=payload)
def parseheader(data):
@@ -179,17 +209,21 @@
buffer is expected to be large enough to hold a full header.
"""
# 24 bits payload length (little endian)
+ # 16 bits request ID
+ # 8 bits stream ID
+ # 8 bits stream flags
# 4 bits frame type
# 4 bits frame flags
# ... payload
framelength = data[0] + 256 * data[1] + 16384 * data[2]
- requestid = struct.unpack_from(r'<H', data, 3)[0]
- typeflags = data[5]
+ requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
+ typeflags = data[7]
frametype = (typeflags & 0xf0) >> 4
frameflags = typeflags & 0x0f
- return frameheader(framelength, requestid, frametype, frameflags)
+ return frameheader(framelength, requestid, streamid, streamflags,
+ frametype, frameflags)
def readframe(fh):
"""Read a unified framing protocol frame from a file object.
@@ -216,7 +250,8 @@
raise error.Abort(_('frame length error: expected %d; got %d') %
(h.length, len(payload)))
- return frame(h.requestid, h.typeid, h.flags, payload)
+ return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
+ payload)
def createcommandframes(stream, requestid, cmd, args, datafh=None):
"""Create frames necessary to transmit a request to run a command.
@@ -398,12 +433,28 @@
class stream(object):
"""Represents a logical unidirectional series of frames."""
+ def __init__(self, streamid, active=False):
+ self.streamid = streamid
+ self._active = False
+
def makeframe(self, requestid, typeid, flags, payload):
"""Create a frame to be sent out over this stream.
Only returns the frame instance. Does not actually send it.
"""
- return makeframe(requestid, typeid, flags, payload)
+ streamflags = 0
+ if not self._active:
+ streamflags |= STREAM_FLAG_BEGIN_STREAM
+ self._active = True
+
+ return makeframe(requestid, self.streamid, streamflags, typeid, flags,
+ payload)
+
+def ensureserverstream(stream):
+ if stream.streamid % 2:
+ raise error.ProgrammingError('server should only write to even '
+ 'numbered streams; %d is not even' %
+ stream.streamid)
class serverreactor(object):
"""Holds state of a server handling frame-based protocol requests.
@@ -483,6 +534,8 @@
self._deferoutput = deferoutput
self._state = 'idle'
self._bufferedframegens = []
+ # stream id -> stream instance for all active streams from the client.
+ self._incomingstreams = {}
# request id -> dict of commands that are actively being received.
self._receivingcommands = {}
# Request IDs that have been received and are actively being processed.
@@ -496,6 +549,30 @@
Returns a dict with an ``action`` key that details what action,
if any, the consumer should take next.
"""
+ if not frame.streamid % 2:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('received frame with even numbered stream ID: %d') %
+ frame.streamid)
+
+ if frame.streamid not in self._incomingstreams:
+ if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('received frame on unknown inactive stream without '
+ 'beginning of stream flag set'))
+
+ self._incomingstreams[frame.streamid] = stream(frame.streamid)
+
+ if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
+ # TODO handle decoding frames
+ self._state = 'errored'
+ raise error.ProgrammingError('support for decoding stream payloads '
+ 'not yet implemented')
+
+ if frame.streamflags & STREAM_FLAG_END_STREAM:
+ del self._incomingstreams[frame.streamid]
+
handlers = {
'idle': self._onframeidle,
'command-receiving': self._onframecommandreceiving,
@@ -513,6 +590,8 @@
The raw bytes response is passed as an argument.
"""
+ ensureserverstream(stream)
+
def sendframes():
for frame in createbytesresponseframesfrombytes(stream, requestid,
data):
@@ -552,6 +631,8 @@
}
def onapplicationerror(self, stream, requestid, msg):
+ ensureserverstream(stream)
+
return 'sendframes', {
'framegen': createerrorframe(stream, requestid, msg,
application=True),