--- a/mercurial/wireprotoframing.py Sat Oct 05 10:29:34 2019 -0400
+++ b/mercurial/wireprotoframing.py Sun Oct 06 09:45:02 2019 -0400
@@ -15,9 +15,7 @@
import struct
from .i18n import _
-from .thirdparty import (
- attr,
-)
+from .thirdparty import attr
from . import (
encoding,
error,
@@ -121,6 +119,7 @@
ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
+
def humanflags(mapping, value):
"""Convert a numeric flags value to a human value, using a mapping table."""
namemap = {v: k for k, v in mapping.iteritems()}
@@ -133,6 +132,7 @@
return b'|'.join(flags)
+
@attr.s(slots=True)
class frameheader(object):
"""Represents the data in a frame header."""
@@ -144,6 +144,7 @@
typeid = attr.ib()
flags = attr.ib()
+
@attr.s(slots=True, repr=False)
class frame(object):
"""Represents a parsed frame."""
@@ -163,11 +164,19 @@
typename = name
break
- return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
- 'type=%s; flags=%s)' % (
- len(self.payload), self.requestid, self.streamid,
- humanflags(STREAM_FLAGS, self.streamflags), typename,
- humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
+ return (
+ 'frame(size=%d; request=%d; stream=%d; streamflags=%s; '
+ 'type=%s; flags=%s)'
+ % (
+ len(self.payload),
+ self.requestid,
+ self.streamid,
+ humanflags(STREAM_FLAGS, self.streamflags),
+ typename,
+ humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags),
+ )
+ )
+
def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
"""Assemble a frame into a byte array."""
@@ -189,6 +198,7 @@
return frame
+
def makeframefromhumanstring(s):
"""Create a frame from a human readable string
@@ -238,15 +248,22 @@
finalflags |= int(flag)
if payload.startswith(b'cbor:'):
- payload = b''.join(cborutil.streamencode(
- stringutil.evalpythonliteral(payload[5:])))
+ payload = b''.join(
+ cborutil.streamencode(stringutil.evalpythonliteral(payload[5:]))
+ )
else:
payload = stringutil.unescapestr(payload)
- return makeframe(requestid=requestid, streamid=streamid,
- streamflags=finalstreamflags, typeid=frametype,
- flags=finalflags, payload=payload)
+ return makeframe(
+ requestid=requestid,
+ streamid=streamid,
+ streamflags=finalstreamflags,
+ typeid=frametype,
+ flags=finalflags,
+ payload=payload,
+ )
+
def parseheader(data):
"""Parse a unified framing protocol frame header from a buffer.
@@ -265,11 +282,13 @@
requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
typeflags = data[7]
- frametype = (typeflags & 0xf0) >> 4
- frameflags = typeflags & 0x0f
+ frametype = (typeflags & 0xF0) >> 4
+ frameflags = typeflags & 0x0F
- return frameheader(framelength, requestid, streamid, streamflags,
- frametype, frameflags)
+ return frameheader(
+ framelength, requestid, streamid, streamflags, frametype, frameflags
+ )
+
def readframe(fh):
"""Read a unified framing protocol frame from a file object.
@@ -286,22 +305,34 @@
return None
if readcount != FRAME_HEADER_SIZE:
- raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
- (readcount, header))
+ raise error.Abort(
+ _('received incomplete frame: got %d bytes: %s')
+ % (readcount, header)
+ )
h = parseheader(header)
payload = fh.read(h.length)
if len(payload) != h.length:
- raise error.Abort(_('frame length error: expected %d; got %d') %
- (h.length, len(payload)))
+ raise error.Abort(
+ _('frame length error: expected %d; got %d')
+ % (h.length, len(payload))
+ )
+
+ return frame(
+ h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, payload
+ )
+
- return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
- payload)
-
-def createcommandframes(stream, requestid, cmd, args, datafh=None,
- maxframesize=DEFAULT_MAX_FRAME_SIZE,
- redirect=None):
+def createcommandframes(
+ stream,
+ requestid,
+ cmd,
+ args,
+ datafh=None,
+ maxframesize=DEFAULT_MAX_FRAME_SIZE,
+ redirect=None,
+):
"""Create frames necessary to transmit a request to run a command.
This is a generator of bytearrays. Each item represents a frame
@@ -331,16 +362,18 @@
if datafh:
flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
- payload = data[offset:offset + maxframesize]
+ payload = data[offset : offset + maxframesize]
offset += len(payload)
if len(payload) == maxframesize and offset < len(data):
flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
- yield stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_REQUEST,
- flags=flags,
- payload=payload)
+ yield stream.makeframe(
+ requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_REQUEST,
+ flags=flags,
+ payload=payload,
+ )
if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
break
@@ -357,14 +390,17 @@
assert datafh.read(1) == b''
done = True
- yield stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_DATA,
- flags=flags,
- payload=data)
+ yield stream.makeframe(
+ requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_DATA,
+ flags=flags,
+ payload=data,
+ )
if done:
break
+
def createcommandresponseokframe(stream, requestid):
overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
@@ -377,20 +413,24 @@
else:
encoded = False
- return stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
- payload=overall,
- encoded=encoded)
+ return stream.makeframe(
+ requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+ payload=overall,
+ encoded=encoded,
+ )
-def createcommandresponseeosframes(stream, requestid,
- maxframesize=DEFAULT_MAX_FRAME_SIZE):
+
+def createcommandresponseeosframes(
+ stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE
+):
"""Create an empty payload frame representing command end-of-stream."""
payload = stream.flush()
offset = 0
while True:
- chunk = payload[offset:offset + maxframesize]
+ chunk = payload[offset : offset + maxframesize]
offset += len(chunk)
done = offset == len(payload)
@@ -400,26 +440,31 @@
else:
flags = FLAG_COMMAND_RESPONSE_CONTINUATION
- yield stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=flags,
- payload=chunk,
- encoded=payload != b'')
+ yield stream.makeframe(
+ requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=flags,
+ payload=chunk,
+ encoded=payload != b'',
+ )
if done:
break
+
def createalternatelocationresponseframe(stream, requestid, location):
data = {
b'status': b'redirect',
- b'location': {
- b'url': location.url,
- b'mediatype': location.mediatype,
- }
+ b'location': {b'url': location.url, b'mediatype': location.mediatype,},
}
- for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
- r'servercadercerts'):
+ for a in (
+ r'size',
+ r'fullhashes',
+ r'fullhashseed',
+ r'serverdercerts',
+ r'servercadercerts',
+ ):
value = getattr(location, a)
if value is not None:
data[b'location'][pycompat.bytestr(a)] = value
@@ -432,48 +477,52 @@
else:
encoded = False
- return stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
- payload=payload,
- encoded=encoded)
+ return stream.makeframe(
+ requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+ payload=payload,
+ encoded=encoded,
+ )
+
def createcommanderrorresponse(stream, requestid, message, args=None):
# TODO should this be using a list of {'msg': ..., 'args': {}} so atom
# formatting works consistently?
- m = {
- b'status': b'error',
- b'error': {
- b'message': message,
- }
- }
+ m = {b'status': b'error', b'error': {b'message': message,}}
if args:
m[b'error'][b'args'] = args
overall = b''.join(cborutil.streamencode(m))
- yield stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=FLAG_COMMAND_RESPONSE_EOS,
- payload=overall)
+ yield stream.makeframe(
+ requestid=requestid,
+ typeid=FRAME_TYPE_COMMAND_RESPONSE,
+ flags=FLAG_COMMAND_RESPONSE_EOS,
+ payload=overall,
+ )
+
def createerrorframe(stream, requestid, msg, errtype):
# TODO properly handle frame size limits.
assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
- payload = b''.join(cborutil.streamencode({
- b'type': errtype,
- b'message': [{b'msg': msg}],
- }))
+ payload = b''.join(
+ cborutil.streamencode({b'type': errtype, b'message': [{b'msg': msg}],})
+ )
- yield stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_ERROR_RESPONSE,
- flags=0,
- payload=payload)
+ yield stream.makeframe(
+ requestid=requestid,
+ typeid=FRAME_TYPE_ERROR_RESPONSE,
+ flags=0,
+ payload=payload,
+ )
-def createtextoutputframe(stream, requestid, atoms,
- maxframesize=DEFAULT_MAX_FRAME_SIZE):
+
+def createtextoutputframe(
+ stream, requestid, atoms, maxframesize=DEFAULT_MAX_FRAME_SIZE
+):
"""Create a text output frame to render text to people.
``atoms`` is a 3-tuple of (formatting string, args, labels).
@@ -504,8 +553,9 @@
args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
# Labels must be ASCII.
- labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
- for l in labels]
+ labels = [
+ l.decode(r'ascii', r'strict').encode(r'ascii') for l in labels
+ ]
atom = {b'msg': formatting}
if args:
@@ -520,10 +570,13 @@
if len(payload) > maxframesize:
raise ValueError('cannot encode data in a single frame')
- yield stream.makeframe(requestid=requestid,
- typeid=FRAME_TYPE_TEXT_OUTPUT,
- flags=0,
- payload=payload)
+ yield stream.makeframe(
+ requestid=requestid,
+ typeid=FRAME_TYPE_TEXT_OUTPUT,
+ flags=0,
+ payload=payload,
+ )
+
class bufferingcommandresponseemitter(object):
"""Helper object to emit command response frames intelligently.
@@ -536,6 +589,7 @@
So it might make sense to implement this functionality at the stream
level.
"""
+
def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
self._stream = stream
self._requestid = requestid
@@ -581,7 +635,7 @@
# Now emit frames for the big chunk.
offset = 0
while True:
- chunk = data[offset:offset + self._maxsize]
+ chunk = data[offset : offset + self._maxsize]
offset += len(chunk)
yield self._stream.makeframe(
@@ -589,7 +643,8 @@
typeid=FRAME_TYPE_COMMAND_RESPONSE,
flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
payload=chunk,
- encoded=True)
+ encoded=True,
+ )
if offset == len(data):
return
@@ -625,13 +680,17 @@
typeid=FRAME_TYPE_COMMAND_RESPONSE,
flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
payload=payload,
- encoded=True)
+ encoded=True,
+ )
+
# TODO consider defining encoders/decoders using the util.compressionengine
# mechanism.
+
class identityencoder(object):
"""Encoder for the "identity" stream encoding profile."""
+
def __init__(self, ui):
pass
@@ -644,20 +703,24 @@
def finish(self):
return b''
+
class identitydecoder(object):
"""Decoder for the "identity" stream encoding profile."""
def __init__(self, ui, extraobjs):
if extraobjs:
- raise error.Abort(_('identity decoder received unexpected '
- 'additional values'))
+ raise error.Abort(
+ _('identity decoder received unexpected ' 'additional values')
+ )
def decode(self, data):
return data
+
class zlibencoder(object):
def __init__(self, ui):
import zlib
+
self._zlib = zlib
self._compressor = zlib.compressobj()
@@ -674,13 +737,15 @@
self._compressor = None
return res
+
class zlibdecoder(object):
def __init__(self, ui, extraobjs):
import zlib
if extraobjs:
- raise error.Abort(_('zlib decoder received unexpected '
- 'additional values'))
+ raise error.Abort(
+ _('zlib decoder received unexpected ' 'additional values')
+ )
self._decompressor = zlib.decompressobj()
@@ -692,6 +757,7 @@
return self._decompressor.decompress(data)
+
class zstdbaseencoder(object):
def __init__(self, level):
from . import zstd
@@ -714,38 +780,46 @@
self._compressor = None
return res
+
class zstd8mbencoder(zstdbaseencoder):
def __init__(self, ui):
super(zstd8mbencoder, self).__init__(3)
+
class zstdbasedecoder(object):
def __init__(self, maxwindowsize):
from . import zstd
+
dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
self._decompressor = dctx.decompressobj()
def decode(self, data):
return self._decompressor.decompress(data)
+
class zstd8mbdecoder(zstdbasedecoder):
def __init__(self, ui, extraobjs):
if extraobjs:
- raise error.Abort(_('zstd8mb decoder received unexpected '
- 'additional values'))
+ raise error.Abort(
+ _('zstd8mb decoder received unexpected ' 'additional values')
+ )
super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
+
# We lazily populate this to avoid excessive module imports when importing
# this module.
STREAM_ENCODERS = {}
STREAM_ENCODERS_ORDER = []
+
def populatestreamencoders():
if STREAM_ENCODERS:
return
try:
from . import zstd
+
zstd.__version__
except ImportError:
zstd = None
@@ -761,6 +835,7 @@
STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
STREAM_ENCODERS_ORDER.append(b'identity')
+
class stream(object):
"""Represents a logical unidirectional series of frames."""
@@ -778,8 +853,10 @@
streamflags |= STREAM_FLAG_BEGIN_STREAM
self._active = True
- return makeframe(requestid, self.streamid, streamflags, typeid, flags,
- payload)
+ return makeframe(
+ requestid, self.streamid, streamflags, typeid, flags, payload
+ )
+
class inputstream(stream):
"""Represents a stream used for receiving data."""
@@ -813,6 +890,7 @@
return self._decoder.flush()
+
class outputstream(stream):
"""Represents a stream used for sending data."""
@@ -851,8 +929,7 @@
self._encoder.finish()
- def makeframe(self, requestid, typeid, flags, payload,
- encoded=False):
+ def makeframe(self, requestid, typeid, flags, payload, encoded=False):
"""Create a frame to be sent out over this stream.
Only returns the frame instance. Does not actually send it.
@@ -866,16 +943,20 @@
if not self.streamsettingssent:
raise error.ProgrammingError(
b'attempting to send encoded frame without sending stream '
- b'settings')
+ b'settings'
+ )
streamflags |= STREAM_FLAG_ENCODING_APPLIED
- if (typeid == FRAME_TYPE_STREAM_SETTINGS
- and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS):
+ if (
+ typeid == FRAME_TYPE_STREAM_SETTINGS
+ and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
+ ):
self.streamsettingssent = True
- return makeframe(requestid, self.streamid, streamflags, typeid, flags,
- payload)
+ return makeframe(
+ requestid, self.streamid, streamflags, typeid, flags, payload
+ )
def makestreamsettingsframe(self, requestid):
"""Create a stream settings frame for this stream.
@@ -887,19 +968,27 @@
return None
payload = b''.join(cborutil.streamencode(self._encodername))
- return self.makeframe(requestid, FRAME_TYPE_STREAM_SETTINGS,
- FLAG_STREAM_ENCODING_SETTINGS_EOS, payload)
+ return self.makeframe(
+ requestid,
+ FRAME_TYPE_STREAM_SETTINGS,
+ FLAG_STREAM_ENCODING_SETTINGS_EOS,
+ payload,
+ )
+
def ensureserverstream(stream):
if stream.streamid % 2:
- raise error.ProgrammingError('server should only write to even '
- 'numbered streams; %d is not even' %
- stream.streamid)
+ raise error.ProgrammingError(
+ 'server should only write to even '
+ 'numbered streams; %d is not even' % stream.streamid
+ )
+
DEFAULT_PROTOCOL_SETTINGS = {
'contentencodings': [b'identity'],
}
+
class serverreactor(object):
"""Holds state of a server handling frame-based protocol requests.
@@ -1006,23 +1095,28 @@
if not frame.streamid % 2:
self._state = 'errored'
return self._makeerrorresult(
- _('received frame with even numbered stream ID: %d') %
- frame.streamid)
+ _('received frame with even numbered stream ID: %d')
+ % frame.streamid
+ )
if frame.streamid not in self._incomingstreams:
if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
self._state = 'errored'
return self._makeerrorresult(
- _('received frame on unknown inactive stream without '
- 'beginning of stream flag set'))
+ _(
+ 'received frame on unknown inactive stream without '
+ 'beginning of stream flag set'
+ )
+ )
self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
# TODO handle decoding frames
self._state = 'errored'
- raise error.ProgrammingError('support for decoding stream payloads '
- 'not yet implemented')
+ raise error.ProgrammingError(
+ 'support for decoding stream payloads ' 'not yet implemented'
+ )
if frame.streamflags & STREAM_FLAG_END_STREAM:
del self._incomingstreams[frame.streamid]
@@ -1080,20 +1174,25 @@
if emitted:
for frame in createcommandresponseeosframes(
- stream, requestid):
+ stream, requestid
+ ):
yield frame
break
except error.WireprotoCommandError as e:
for frame in createcommanderrorresponse(
- stream, requestid, e.message, e.messageargs):
+ stream, requestid, e.message, e.messageargs
+ ):
yield frame
break
except Exception as e:
for frame in createerrorframe(
- stream, requestid, '%s' % stringutil.forcebytestr(e),
- errtype='server'):
+ stream,
+ requestid,
+ '%s' % stringutil.forcebytestr(e),
+ errtype='server',
+ ):
yield frame
@@ -1106,14 +1205,16 @@
if emitted:
raise error.ProgrammingError(
'alternatelocationresponse seen after initial '
- 'output object')
+ 'output object'
+ )
frame = stream.makestreamsettingsframe(requestid)
if frame:
yield frame
yield createalternatelocationresponseframe(
- stream, requestid, o)
+ stream, requestid, o
+ )
alternatelocationsent = True
emitted = True
@@ -1121,7 +1222,8 @@
if alternatelocationsent:
raise error.ProgrammingError(
- 'object follows alternatelocationresponse')
+ 'object follows alternatelocationresponse'
+ )
if not emitted:
# Frame is optional.
@@ -1147,9 +1249,11 @@
yield frame
elif isinstance(
- o, wireprototypes.indefinitebytestringresponse):
+ o, wireprototypes.indefinitebytestringresponse
+ ):
for chunk in cborutil.streamencodebytestringfromiter(
- o.chunks):
+ o.chunks
+ ):
for frame in emitter.send(chunk):
yield frame
@@ -1161,9 +1265,9 @@
yield frame
except Exception as e:
- for frame in createerrorframe(stream, requestid,
- '%s' % e,
- errtype='server'):
+ for frame in createerrorframe(
+ stream, requestid, '%s' % e, errtype='server'
+ ):
yield frame
break
@@ -1189,25 +1293,22 @@
for frame in gen:
yield frame
- return 'sendframes', {
- 'framegen': makegen(),
- }
+ return 'sendframes', {'framegen': makegen(),}
def _handlesendframes(self, framegen):
if self._deferoutput:
self._bufferedframegens.append(framegen)
return 'noop', {}
else:
- return 'sendframes', {
- 'framegen': framegen,
- }
+ return 'sendframes', {'framegen': framegen,}
def onservererror(self, stream, requestid, msg):
ensureserverstream(stream)
def sendframes():
- for frame in createerrorframe(stream, requestid, msg,
- errtype='server'):
+ for frame in createerrorframe(
+ stream, requestid, msg, errtype='server'
+ ):
yield frame
self._activecommands.remove(requestid)
@@ -1219,8 +1320,9 @@
ensureserverstream(stream)
def sendframes():
- for frame in createcommanderrorresponse(stream, requestid, message,
- args):
+ for frame in createcommanderrorresponse(
+ stream, requestid, message, args
+ ):
yield frame
self._activecommands.remove(requestid)
@@ -1250,17 +1352,16 @@
return s
def _makeerrorresult(self, msg):
- return 'error', {
- 'message': msg,
- }
+ return 'error', {'message': msg,}
def _makeruncommandresult(self, requestid):
entry = self._receivingcommands[requestid]
if not entry['requestdone']:
self._state = 'errored'
- raise error.ProgrammingError('should not be called without '
- 'requestdone set')
+ raise error.ProgrammingError(
+ 'should not be called without ' 'requestdone set'
+ )
del self._receivingcommands[requestid]
@@ -1276,7 +1377,8 @@
if b'name' not in request:
self._state = 'errored'
return self._makeerrorresult(
- _('command request missing "name" field'))
+ _('command request missing "name" field')
+ )
if b'args' not in request:
request[b'args'] = {}
@@ -1284,18 +1386,19 @@
assert requestid not in self._activecommands
self._activecommands.add(requestid)
- return 'runcommand', {
- 'requestid': requestid,
- 'command': request[b'name'],
- 'args': request[b'args'],
- 'redirect': request.get(b'redirect'),
- 'data': entry['data'].getvalue() if entry['data'] else None,
- }
+ return (
+ 'runcommand',
+ {
+ 'requestid': requestid,
+ 'command': request[b'name'],
+ 'args': request[b'args'],
+ 'redirect': request.get(b'redirect'),
+ 'data': entry['data'].getvalue() if entry['data'] else None,
+ },
+ )
def _makewantframeresult(self):
- return 'wantframe', {
- 'state': self._state,
- }
+ return 'wantframe', {'state': self._state,}
def _validatecommandrequestframe(self, frame):
new = frame.flags & FLAG_COMMAND_REQUEST_NEW
@@ -1304,14 +1407,20 @@
if new and continuation:
self._state = 'errored'
return self._makeerrorresult(
- _('received command request frame with both new and '
- 'continuation flags set'))
+ _(
+ 'received command request frame with both new and '
+ 'continuation flags set'
+ )
+ )
if not new and not continuation:
self._state = 'errored'
return self._makeerrorresult(
- _('received command request frame with neither new nor '
- 'continuation flags set'))
+ _(
+ 'received command request frame with neither new nor '
+ 'continuation flags set'
+ )
+ )
def _onframeinitial(self, frame):
# Called when we receive a frame when in the "initial" state.
@@ -1327,8 +1436,12 @@
else:
self._state = 'errored'
return self._makeerrorresult(
- _('expected sender protocol settings or command request '
- 'frame; got %d') % frame.typeid)
+ _(
+ 'expected sender protocol settings or command request '
+ 'frame; got %d'
+ )
+ % frame.typeid
+ )
def _onframeprotocolsettings(self, frame):
assert self._state == 'protocol-settings-receiving'
@@ -1337,8 +1450,9 @@
if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
self._state = 'errored'
return self._makeerrorresult(
- _('expected sender protocol settings frame; got %d') %
- frame.typeid)
+ _('expected sender protocol settings frame; got %d')
+ % frame.typeid
+ )
more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
@@ -1346,14 +1460,20 @@
if more and eos:
self._state = 'errored'
return self._makeerrorresult(
- _('sender protocol settings frame cannot have both '
- 'continuation and end of stream flags set'))
+ _(
+ 'sender protocol settings frame cannot have both '
+ 'continuation and end of stream flags set'
+ )
+ )
if not more and not eos:
self._state = 'errored'
return self._makeerrorresult(
- _('sender protocol settings frame must have continuation or '
- 'end of stream flag set'))
+ _(
+ 'sender protocol settings frame must have continuation or '
+ 'end of stream flag set'
+ )
+ )
# TODO establish limits for maximum amount of data that can be
# buffered.
@@ -1363,7 +1483,8 @@
self._state = 'errored'
return self._makeerrorresult(
_('error decoding CBOR from sender protocol settings frame: %s')
- % stringutil.forcebytestr(e))
+ % stringutil.forcebytestr(e)
+ )
if more:
return self._makewantframeresult()
@@ -1376,12 +1497,16 @@
if not decoded:
self._state = 'errored'
return self._makeerrorresult(
- _('sender protocol settings frame did not contain CBOR data'))
+ _('sender protocol settings frame did not contain CBOR data')
+ )
elif len(decoded) > 1:
self._state = 'errored'
return self._makeerrorresult(
- _('sender protocol settings frame contained multiple CBOR '
- 'values'))
+ _(
+ 'sender protocol settings frame contained multiple CBOR '
+ 'values'
+ )
+ )
d = decoded[0]
@@ -1398,7 +1523,8 @@
if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
self._state = 'errored'
return self._makeerrorresult(
- _('expected command request frame; got %d') % frame.typeid)
+ _('expected command request frame; got %d') % frame.typeid
+ )
res = self._validatecommandrequestframe(frame)
if res:
@@ -1407,12 +1533,14 @@
if frame.requestid in self._receivingcommands:
self._state = 'errored'
return self._makeerrorresult(
- _('request with ID %d already received') % frame.requestid)
+ _('request with ID %d already received') % frame.requestid
+ )
if frame.requestid in self._activecommands:
self._state = 'errored'
return self._makeerrorresult(
- _('request with ID %d is already active') % frame.requestid)
+ _('request with ID %d is already active') % frame.requestid
+ )
new = frame.flags & FLAG_COMMAND_REQUEST_NEW
moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
@@ -1421,7 +1549,8 @@
if not new:
self._state = 'errored'
return self._makeerrorresult(
- _('received command request frame without new flag set'))
+ _('received command request frame without new flag set')
+ )
payload = util.bytesio()
payload.write(frame.payload)
@@ -1456,14 +1585,16 @@
if frame.requestid in self._activecommands:
self._state = 'errored'
return self._makeerrorresult(
- _('received frame for request that is still active: %d') %
- frame.requestid)
+ _('received frame for request that is still active: %d')
+ % frame.requestid
+ )
if frame.requestid not in self._receivingcommands:
self._state = 'errored'
return self._makeerrorresult(
- _('received frame for request that is not receiving: %d') %
- frame.requestid)
+ _('received frame for request that is not receiving: %d')
+ % frame.requestid
+ )
entry = self._receivingcommands[frame.requestid]
@@ -1474,13 +1605,17 @@
if entry['requestdone']:
self._state = 'errored'
return self._makeerrorresult(
- _('received command request frame when request frames '
- 'were supposedly done'))
+ _(
+ 'received command request frame when request frames '
+ 'were supposedly done'
+ )
+ )
if expectingdata != entry['expectingdata']:
self._state = 'errored'
return self._makeerrorresult(
- _('mismatch between expect data flag and previous frame'))
+ _('mismatch between expect data flag and previous frame')
+ )
entry['payload'].write(frame.payload)
@@ -1495,9 +1630,13 @@
elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
if not entry['expectingdata']:
self._state = 'errored'
- return self._makeerrorresult(_(
- 'received command data frame for request that is not '
- 'expecting data: %d') % frame.requestid)
+ return self._makeerrorresult(
+ _(
+ 'received command data frame for request that is not '
+ 'expecting data: %d'
+ )
+ % frame.requestid
+ )
if entry['data'] is None:
entry['data'] = util.bytesio()
@@ -1505,8 +1644,9 @@
return self._handlecommanddataframe(frame, entry)
else:
self._state = 'errored'
- return self._makeerrorresult(_(
- 'received unexpected frame type: %d') % frame.typeid)
+ return self._makeerrorresult(
+ _('received unexpected frame type: %d') % frame.typeid
+ )
def _handlecommanddataframe(self, frame, entry):
assert frame.typeid == FRAME_TYPE_COMMAND_DATA
@@ -1521,12 +1661,14 @@
return self._makeruncommandresult(frame.requestid)
else:
self._state = 'errored'
- return self._makeerrorresult(_('command data frame without '
- 'flags'))
+ return self._makeerrorresult(
+ _('command data frame without ' 'flags')
+ )
def _onframeerrored(self, frame):
return self._makeerrorresult(_('server already errored'))
+
class commandrequest(object):
"""Represents a request to run a command."""
@@ -1538,6 +1680,7 @@
self.redirect = redirect
self.state = 'pending'
+
class clientreactor(object):
"""Holds state of a client issuing frame-based protocol requests.
@@ -1584,8 +1727,14 @@
is expected to follow or we're at the end of the response stream,
respectively.
"""
- def __init__(self, ui, hasmultiplesend=False, buffersends=True,
- clientcontentencoders=None):
+
+ def __init__(
+ self,
+ ui,
+ hasmultiplesend=False,
+ buffersends=True,
+ clientcontentencoders=None,
+ ):
"""Create a new instance.
``hasmultiplesend`` indicates whether multiple sends are supported
@@ -1634,24 +1783,28 @@
requestid = self._nextrequestid
self._nextrequestid += 2
- request = commandrequest(requestid, name, args, datafh=datafh,
- redirect=redirect)
+ request = commandrequest(
+ requestid, name, args, datafh=datafh, redirect=redirect
+ )
if self._buffersends:
self._pendingrequests.append(request)
return request, 'noop', {}
else:
if not self._cansend:
- raise error.ProgrammingError('sends cannot be performed on '
- 'this instance')
+ raise error.ProgrammingError(
+ 'sends cannot be performed on ' 'this instance'
+ )
if not self._hasmultiplesend:
self._cansend = False
self._canissuecommands = False
- return request, 'sendframes', {
- 'framegen': self._makecommandframes(request),
- }
+ return (
+ request,
+ 'sendframes',
+ {'framegen': self._makecommandframes(request),},
+ )
def flushcommands(self):
"""Request that all queued commands be sent.
@@ -1667,8 +1820,9 @@
return 'noop', {}
if not self._cansend:
- raise error.ProgrammingError('sends cannot be performed on this '
- 'instance')
+ raise error.ProgrammingError(
+ 'sends cannot be performed on this ' 'instance'
+ )
# If the instance only allows sending once, mark that we have fired
# our one shot.
@@ -1682,9 +1836,7 @@
for frame in self._makecommandframes(request):
yield frame
- return 'sendframes', {
- 'framegen': makeframes(),
- }
+ return 'sendframes', {'framegen': makeframes(),}
def _makecommandframes(self, request):
"""Emit frames to issue a command request.
@@ -1698,22 +1850,27 @@
if not self._protocolsettingssent and self._clientcontentencoders:
self._protocolsettingssent = True
- payload = b''.join(cborutil.streamencode({
- b'contentencodings': self._clientcontentencoders,
- }))
+ payload = b''.join(
+ cborutil.streamencode(
+ {b'contentencodings': self._clientcontentencoders,}
+ )
+ )
yield self._outgoingstream.makeframe(
requestid=request.requestid,
typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
- payload=payload)
+ payload=payload,
+ )
- res = createcommandframes(self._outgoingstream,
- request.requestid,
- request.name,
- request.args,
- datafh=request.datafh,
- redirect=request.redirect)
+ res = createcommandframes(
+ self._outgoingstream,
+ request.requestid,
+ request.name,
+ request.args,
+ datafh=request.datafh,
+ redirect=request.redirect,
+ )
for frame in res:
yield frame
@@ -1727,21 +1884,29 @@
caller needs to take as a result of receiving this frame.
"""
if frame.streamid % 2:
- return 'error', {
- 'message': (
- _('received frame with odd numbered stream ID: %d') %
- frame.streamid),
- }
+ return (
+ 'error',
+ {
+ 'message': (
+ _('received frame with odd numbered stream ID: %d')
+ % frame.streamid
+ ),
+ },
+ )
if frame.streamid not in self._incomingstreams:
if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
- return 'error', {
- 'message': _('received frame on unknown stream '
- 'without beginning of stream flag set'),
- }
+ return (
+ 'error',
+ {
+ 'message': _(
+ 'received frame on unknown stream '
+ 'without beginning of stream flag set'
+ ),
+ },
+ )
- self._incomingstreams[frame.streamid] = inputstream(
- frame.streamid)
+ self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
stream = self._incomingstreams[frame.streamid]
@@ -1758,10 +1923,15 @@
return self._onstreamsettingsframe(frame)
if frame.requestid not in self._activerequests:
- return 'error', {
- 'message': (_('received frame for inactive request ID: %d') %
- frame.requestid),
- }
+ return (
+ 'error',
+ {
+ 'message': (
+ _('received frame for inactive request ID: %d')
+ % frame.requestid
+ ),
+ },
+ )
request = self._activerequests[frame.requestid]
request.state = 'receiving'
@@ -1773,8 +1943,9 @@
meth = handlers.get(frame.typeid)
if not meth:
- raise error.ProgrammingError('unhandled frame type: %d' %
- frame.typeid)
+ raise error.ProgrammingError(
+ 'unhandled frame type: %d' % frame.typeid
+ )
return meth(request, frame)
@@ -1785,16 +1956,28 @@
eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
if more and eos:
- return 'error', {
- 'message': (_('stream encoding settings frame cannot have both '
- 'continuation and end of stream flags set')),
- }
+ return (
+ 'error',
+ {
+ 'message': (
+ _(
+ 'stream encoding settings frame cannot have both '
+ 'continuation and end of stream flags set'
+ )
+ ),
+ },
+ )
if not more and not eos:
- return 'error', {
- 'message': _('stream encoding settings frame must have '
- 'continuation or end of stream flag set'),
- }
+ return (
+ 'error',
+ {
+ 'message': _(
+ 'stream encoding settings frame must have '
+ 'continuation or end of stream flag set'
+ ),
+ },
+ )
if frame.streamid not in self._streamsettingsdecoders:
decoder = cborutil.bufferingdecoder()
@@ -1805,11 +1988,18 @@
try:
decoder.decode(frame.payload)
except Exception as e:
- return 'error', {
- 'message': (_('error decoding CBOR from stream encoding '
- 'settings frame: %s') %
- stringutil.forcebytestr(e)),
- }
+ return (
+ 'error',
+ {
+ 'message': (
+ _(
+ 'error decoding CBOR from stream encoding '
+ 'settings frame: %s'
+ )
+ % stringutil.forcebytestr(e)
+ ),
+ },
+ )
if more:
return 'noop', {}
@@ -1820,20 +2010,30 @@
del self._streamsettingsdecoders[frame.streamid]
if not decoded:
- return 'error', {
- 'message': _('stream encoding settings frame did not contain '
- 'CBOR data'),
- }
+ return (
+ 'error',
+ {
+ 'message': _(
+ 'stream encoding settings frame did not contain '
+ 'CBOR data'
+ ),
+ },
+ )
try:
- self._incomingstreams[frame.streamid].setdecoder(self._ui,
- decoded[0],
- decoded[1:])
+ self._incomingstreams[frame.streamid].setdecoder(
+ self._ui, decoded[0], decoded[1:]
+ )
except Exception as e:
- return 'error', {
- 'message': (_('error setting stream decoder: %s') %
- stringutil.forcebytestr(e)),
- }
+ return (
+ 'error',
+ {
+ 'message': (
+ _('error setting stream decoder: %s')
+ % stringutil.forcebytestr(e)
+ ),
+ },
+ )
return 'noop', {}
@@ -1842,12 +2042,15 @@
request.state = 'received'
del self._activerequests[request.requestid]
- return 'responsedata', {
- 'request': request,
- 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
- 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
- 'data': frame.payload,
- }
+ return (
+ 'responsedata',
+ {
+ 'request': request,
+ 'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
+ 'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
+ 'data': frame.payload,
+ },
+ )
def _onerrorresponseframe(self, request, frame):
request.state = 'errored'
@@ -1856,8 +2059,7 @@
# The payload should be a CBOR map.
m = cborutil.decodeall(frame.payload)[0]
- return 'error', {
- 'request': request,
- 'type': m['type'],
- 'message': m['message'],
- }
+ return (
+ 'error',
+ {'request': request, 'type': m['type'], 'message': m['message'],},
+ )