--- a/mercurial/wireprotoframing.py Mon Mar 26 10:50:36 2018 -0700
+++ b/mercurial/wireprotoframing.py Mon Mar 26 14:34:32 2018 -0700
@@ -39,8 +39,7 @@
b'encoded': STREAM_FLAG_ENCODING_APPLIED,
}
-FRAME_TYPE_COMMAND_NAME = 0x01
-FRAME_TYPE_COMMAND_ARGUMENT = 0x02
+FRAME_TYPE_COMMAND_REQUEST = 0x01
FRAME_TYPE_COMMAND_DATA = 0x03
FRAME_TYPE_BYTES_RESPONSE = 0x04
FRAME_TYPE_ERROR_RESPONSE = 0x05
@@ -49,8 +48,7 @@
FRAME_TYPE_STREAM_SETTINGS = 0x08
FRAME_TYPES = {
- b'command-name': FRAME_TYPE_COMMAND_NAME,
- b'command-argument': FRAME_TYPE_COMMAND_ARGUMENT,
+ b'command-request': FRAME_TYPE_COMMAND_REQUEST,
b'command-data': FRAME_TYPE_COMMAND_DATA,
b'bytes-response': FRAME_TYPE_BYTES_RESPONSE,
b'error-response': FRAME_TYPE_ERROR_RESPONSE,
@@ -59,22 +57,16 @@
b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
}
-FLAG_COMMAND_NAME_EOS = 0x01
-FLAG_COMMAND_NAME_HAVE_ARGS = 0x02
-FLAG_COMMAND_NAME_HAVE_DATA = 0x04
+FLAG_COMMAND_REQUEST_NEW = 0x01
+FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
+FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
+FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
-FLAGS_COMMAND = {
- b'eos': FLAG_COMMAND_NAME_EOS,
- b'have-args': FLAG_COMMAND_NAME_HAVE_ARGS,
- b'have-data': FLAG_COMMAND_NAME_HAVE_DATA,
-}
-
-FLAG_COMMAND_ARGUMENT_CONTINUATION = 0x01
-FLAG_COMMAND_ARGUMENT_EOA = 0x02
-
-FLAGS_COMMAND_ARGUMENT = {
- b'continuation': FLAG_COMMAND_ARGUMENT_CONTINUATION,
- b'eoa': FLAG_COMMAND_ARGUMENT_EOA,
+FLAGS_COMMAND_REQUEST = {
+ b'new': FLAG_COMMAND_REQUEST_NEW,
+ b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
+ b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
+ b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
}
FLAG_COMMAND_DATA_CONTINUATION = 0x01
@@ -103,8 +95,7 @@
# Maps frame types to their available flags.
FRAME_TYPE_FLAGS = {
- FRAME_TYPE_COMMAND_NAME: FLAGS_COMMAND,
- FRAME_TYPE_COMMAND_ARGUMENT: FLAGS_COMMAND_ARGUMENT,
+ FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
FRAME_TYPE_BYTES_RESPONSE: FLAGS_BYTES_RESPONSE,
FRAME_TYPE_ERROR_RESPONSE: FLAGS_ERROR_RESPONSE,
@@ -113,7 +104,7 @@
FRAME_TYPE_STREAM_SETTINGS: {},
}
-ARGUMENT_FRAME_HEADER = struct.Struct(r'<HH')
+ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
@attr.s(slots=True)
class frameheader(object):
@@ -269,43 +260,48 @@
return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
payload)
-def createcommandframes(stream, requestid, cmd, args, datafh=None):
+def createcommandframes(stream, requestid, cmd, args, datafh=None,
+ maxframesize=DEFAULT_MAX_FRAME_SIZE):
"""Create frames necessary to transmit a request to run a command.
This is a generator of bytearrays. Each item represents a frame
ready to be sent over the wire to a peer.
"""
- flags = 0
+ data = {b'name': cmd}
if args:
- flags |= FLAG_COMMAND_NAME_HAVE_ARGS
- if datafh:
- flags |= FLAG_COMMAND_NAME_HAVE_DATA
+ data[b'args'] = args
- if not flags:
- flags |= FLAG_COMMAND_NAME_EOS
+ data = cbor.dumps(data, canonical=True)
- yield stream.makeframe(requestid=requestid, typeid=FRAME_TYPE_COMMAND_NAME,
- flags=flags, payload=cmd)
+ offset = 0
+
+ while True:
+ flags = 0
- for i, k in enumerate(sorted(args)):
- v = args[k]
- last = i == len(args) - 1
+ # Must set new or continuation flag.
+ if not offset:
+ flags |= FLAG_COMMAND_REQUEST_NEW
+ else:
+ flags |= FLAG_COMMAND_REQUEST_CONTINUATION
- # TODO handle splitting of argument values across frames.
- payload = bytearray(ARGUMENT_FRAME_HEADER.size + len(k) + len(v))
- offset = 0
- ARGUMENT_FRAME_HEADER.pack_into(payload, offset, len(k), len(v))
- offset += ARGUMENT_FRAME_HEADER.size
- payload[offset:offset + len(k)] = k
- offset += len(k)
- payload[offset:offset + len(v)] = v
+ # Data frames is set on all frames.
+ if datafh:
+ flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
- flags = FLAG_COMMAND_ARGUMENT_EOA if last else 0
+ payload = data[offset:offset + maxframesize]
+ offset += len(payload)
+
+ if len(payload) == maxframesize and offset < len(data):
+ flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
+
yield stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_ARGUMENT,
+ typeid=FRAME_TYPE_COMMAND_REQUEST,
flags=flags,
payload=payload)
+ if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
+ break
+
if datafh:
while True:
data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
@@ -673,6 +669,12 @@
def _makeruncommandresult(self, requestid):
entry = self._receivingcommands[requestid]
+
+ if not entry['requestdone']:
+ self._state = 'errored'
+ raise error.ProgrammingError('should not be called without '
+ 'requestdone set')
+
del self._receivingcommands[requestid]
if self._receivingcommands:
@@ -680,13 +682,25 @@
else:
self._state = 'idle'
+ # Decode the payloads as CBOR.
+ entry['payload'].seek(0)
+ request = cbor.load(entry['payload'])
+
+ if b'name' not in request:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('command request missing "name" field'))
+
+ if b'args' not in request:
+ request[b'args'] = {}
+
assert requestid not in self._activecommands
self._activecommands.add(requestid)
return 'runcommand', {
'requestid': requestid,
- 'command': entry['command'],
- 'args': entry['args'],
+ 'command': request[b'name'],
+ 'args': request[b'args'],
'data': entry['data'].getvalue() if entry['data'] else None,
}
@@ -695,13 +709,33 @@
'state': self._state,
}
+ def _validatecommandrequestframe(self, frame):
+ new = frame.flags & FLAG_COMMAND_REQUEST_NEW
+ continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
+
+ if new and continuation:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('received command request frame with both new and '
+ 'continuation flags set'))
+
+ if not new and not continuation:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('received command request frame with neither new nor '
+ 'continuation flags set'))
+
def _onframeidle(self, frame):
# The only frame type that should be received in this state is a
# command request.
- if frame.typeid != FRAME_TYPE_COMMAND_NAME:
+ if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
self._state = 'errored'
return self._makeerrorresult(
- _('expected command frame; got %d') % frame.typeid)
+ _('expected command request frame; got %d') % frame.typeid)
+
+ res = self._validatecommandrequestframe(frame)
+ if res:
+ return res
if frame.requestid in self._receivingcommands:
self._state = 'errored'
@@ -710,35 +744,45 @@
if frame.requestid in self._activecommands:
self._state = 'errored'
- return self._makeerrorresult((
- _('request with ID %d is already active') % frame.requestid))
+ return self._makeerrorresult(
+ _('request with ID %d is already active') % frame.requestid)
+
+ new = frame.flags & FLAG_COMMAND_REQUEST_NEW
+ moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
+ expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
- expectingargs = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_ARGS)
- expectingdata = bool(frame.flags & FLAG_COMMAND_NAME_HAVE_DATA)
+ if not new:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('received command request frame without new flag set'))
+
+ payload = util.bytesio()
+ payload.write(frame.payload)
self._receivingcommands[frame.requestid] = {
- 'command': frame.payload,
- 'args': {},
+ 'payload': payload,
'data': None,
- 'expectingargs': expectingargs,
- 'expectingdata': expectingdata,
+ 'requestdone': not moreframes,
+ 'expectingdata': bool(expectingdata),
}
- if frame.flags & FLAG_COMMAND_NAME_EOS:
+ # This is the final frame for this request. Dispatch it.
+ if not moreframes and not expectingdata:
return self._makeruncommandresult(frame.requestid)
- if expectingargs or expectingdata:
- self._state = 'command-receiving'
- return self._makewantframeresult()
- else:
- self._state = 'errored'
- return self._makeerrorresult(_('missing frame flags on '
- 'command frame'))
+ assert moreframes or expectingdata
+ self._state = 'command-receiving'
+ return self._makewantframeresult()
def _onframecommandreceiving(self, frame):
- # It could be a new command request. Process it as such.
- if frame.typeid == FRAME_TYPE_COMMAND_NAME:
- return self._onframeidle(frame)
+ if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
+ # Process new command requests as such.
+ if frame.flags & FLAG_COMMAND_REQUEST_NEW:
+ return self._onframeidle(frame)
+
+ res = self._validatecommandrequestframe(frame)
+ if res:
+ return res
# All other frames should be related to a command that is currently
# receiving but is not active.
@@ -756,14 +800,30 @@
entry = self._receivingcommands[frame.requestid]
- if frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT:
- if not entry['expectingargs']:
+ if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
+ moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
+ expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
+
+ if entry['requestdone']:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('received command request frame when request frames '
+ 'were supposedly done'))
+
+ if expectingdata != entry['expectingdata']:
self._state = 'errored'
- return self._makeerrorresult(_(
- 'received command argument frame for request that is not '
- 'expecting arguments: %d') % frame.requestid)
+ return self._makeerrorresult(
+ _('mismatch between expect data flag and previous frame'))
+
+ entry['payload'].write(frame.payload)
- return self._handlecommandargsframe(frame, entry)
+ if not moreframes:
+ entry['requestdone'] = True
+
+ if not moreframes and not expectingdata:
+ return self._makeruncommandresult(frame.requestid)
+
+ return self._makewantframeresult()
elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
if not entry['expectingdata']:
@@ -776,50 +836,10 @@
entry['data'] = util.bytesio()
return self._handlecommanddataframe(frame, entry)
-
- def _handlecommandargsframe(self, frame, entry):
- # The frame and state of command should have already been validated.
- assert frame.typeid == FRAME_TYPE_COMMAND_ARGUMENT
-
- offset = 0
- namesize, valuesize = ARGUMENT_FRAME_HEADER.unpack_from(frame.payload)
- offset += ARGUMENT_FRAME_HEADER.size
-
- # The argument name MUST fit inside the frame.
- argname = bytes(frame.payload[offset:offset + namesize])
- offset += namesize
-
- if len(argname) != namesize:
+ else:
self._state = 'errored'
- return self._makeerrorresult(_('malformed argument frame: '
- 'partial argument name'))
-
- argvalue = bytes(frame.payload[offset:])
-
- # Argument value spans multiple frames. Record our active state
- # and wait for the next frame.
- if frame.flags & FLAG_COMMAND_ARGUMENT_CONTINUATION:
- raise error.ProgrammingError('not yet implemented')
-
- # Common case: the argument value is completely contained in this
- # frame.
-
- if len(argvalue) != valuesize:
- self._state = 'errored'
- return self._makeerrorresult(_('malformed argument frame: '
- 'partial argument value'))
-
- entry['args'][argname] = argvalue
-
- if frame.flags & FLAG_COMMAND_ARGUMENT_EOA:
- if entry['expectingdata']:
- # TODO signal request to run a command once we don't
- # buffer data frames.
- return self._makewantframeresult()
- else:
- return self._makeruncommandresult(frame.requestid)
- else:
- return self._makewantframeresult()
+ return self._makeerrorresult(_(
+ 'received unexpected frame type: %d') % frame.typeid)
def _handlecommanddataframe(self, frame, entry):
assert frame.typeid == FRAME_TYPE_COMMAND_DATA