mercurial/wireprotoframing.py
changeset 43076 2372284d9457
parent 40328 2c55716f8a1c
child 43077 687b865b95ad
--- a/mercurial/wireprotoframing.py	Sat Oct 05 10:29:34 2019 -0400
+++ b/mercurial/wireprotoframing.py	Sun Oct 06 09:45:02 2019 -0400
@@ -15,9 +15,7 @@
 import struct
 
 from .i18n import _
-from .thirdparty import (
-    attr,
-)
+from .thirdparty import attr
 from . import (
     encoding,
     error,
@@ -121,6 +119,7 @@
 
 ARGUMENT_RECORD_HEADER = struct.Struct(r'<HH')
 
+
 def humanflags(mapping, value):
     """Convert a numeric flags value to a human value, using a mapping table."""
     namemap = {v: k for k, v in mapping.iteritems()}
@@ -133,6 +132,7 @@
 
     return b'|'.join(flags)
 
+
 @attr.s(slots=True)
 class frameheader(object):
     """Represents the data in a frame header."""
@@ -144,6 +144,7 @@
     typeid = attr.ib()
     flags = attr.ib()
 
+
 @attr.s(slots=True, repr=False)
 class frame(object):
     """Represents a parsed frame."""
@@ -163,11 +164,19 @@
                 typename = name
                 break
 
-        return ('frame(size=%d; request=%d; stream=%d; streamflags=%s; '
-                'type=%s; flags=%s)' % (
-            len(self.payload), self.requestid, self.streamid,
-            humanflags(STREAM_FLAGS, self.streamflags), typename,
-            humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags)))
+        return (
+            'frame(size=%d; request=%d; stream=%d; streamflags=%s; '
+            'type=%s; flags=%s)'
+            % (
+                len(self.payload),
+                self.requestid,
+                self.streamid,
+                humanflags(STREAM_FLAGS, self.streamflags),
+                typename,
+                humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags),
+            )
+        )
+
 
 def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
     """Assemble a frame into a byte array."""
@@ -189,6 +198,7 @@
 
     return frame
 
+
 def makeframefromhumanstring(s):
     """Create a frame from a human readable string
 
@@ -238,15 +248,22 @@
             finalflags |= int(flag)
 
     if payload.startswith(b'cbor:'):
-        payload = b''.join(cborutil.streamencode(
-            stringutil.evalpythonliteral(payload[5:])))
+        payload = b''.join(
+            cborutil.streamencode(stringutil.evalpythonliteral(payload[5:]))
+        )
 
     else:
         payload = stringutil.unescapestr(payload)
 
-    return makeframe(requestid=requestid, streamid=streamid,
-                     streamflags=finalstreamflags, typeid=frametype,
-                     flags=finalflags, payload=payload)
+    return makeframe(
+        requestid=requestid,
+        streamid=streamid,
+        streamflags=finalstreamflags,
+        typeid=frametype,
+        flags=finalflags,
+        payload=payload,
+    )
+
 
 def parseheader(data):
     """Parse a unified framing protocol frame header from a buffer.
@@ -265,11 +282,13 @@
     requestid, streamid, streamflags = struct.unpack_from(r'<HBB', data, 3)
     typeflags = data[7]
 
-    frametype = (typeflags & 0xf0) >> 4
-    frameflags = typeflags & 0x0f
+    frametype = (typeflags & 0xF0) >> 4
+    frameflags = typeflags & 0x0F
 
-    return frameheader(framelength, requestid, streamid, streamflags,
-                       frametype, frameflags)
+    return frameheader(
+        framelength, requestid, streamid, streamflags, frametype, frameflags
+    )
+
 
 def readframe(fh):
     """Read a unified framing protocol frame from a file object.
@@ -286,22 +305,34 @@
         return None
 
     if readcount != FRAME_HEADER_SIZE:
-        raise error.Abort(_('received incomplete frame: got %d bytes: %s') %
-                          (readcount, header))
+        raise error.Abort(
+            _('received incomplete frame: got %d bytes: %s')
+            % (readcount, header)
+        )
 
     h = parseheader(header)
 
     payload = fh.read(h.length)
     if len(payload) != h.length:
-        raise error.Abort(_('frame length error: expected %d; got %d') %
-                          (h.length, len(payload)))
+        raise error.Abort(
+            _('frame length error: expected %d; got %d')
+            % (h.length, len(payload))
+        )
+
+    return frame(
+        h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, payload
+    )
+
 
-    return frame(h.requestid, h.streamid, h.streamflags, h.typeid, h.flags,
-                 payload)
-
-def createcommandframes(stream, requestid, cmd, args, datafh=None,
-                        maxframesize=DEFAULT_MAX_FRAME_SIZE,
-                        redirect=None):
+def createcommandframes(
+    stream,
+    requestid,
+    cmd,
+    args,
+    datafh=None,
+    maxframesize=DEFAULT_MAX_FRAME_SIZE,
+    redirect=None,
+):
     """Create frames necessary to transmit a request to run a command.
 
     This is a generator of bytearrays. Each item represents a frame
@@ -331,16 +362,18 @@
         if datafh:
             flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
 
-        payload = data[offset:offset + maxframesize]
+        payload = data[offset : offset + maxframesize]
         offset += len(payload)
 
         if len(payload) == maxframesize and offset < len(data):
             flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
 
-        yield stream.makeframe(requestid=requestid,
-                               typeid=FRAME_TYPE_COMMAND_REQUEST,
-                               flags=flags,
-                               payload=payload)
+        yield stream.makeframe(
+            requestid=requestid,
+            typeid=FRAME_TYPE_COMMAND_REQUEST,
+            flags=flags,
+            payload=payload,
+        )
 
         if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
             break
@@ -357,14 +390,17 @@
                 assert datafh.read(1) == b''
                 done = True
 
-            yield stream.makeframe(requestid=requestid,
-                                   typeid=FRAME_TYPE_COMMAND_DATA,
-                                   flags=flags,
-                                   payload=data)
+            yield stream.makeframe(
+                requestid=requestid,
+                typeid=FRAME_TYPE_COMMAND_DATA,
+                flags=flags,
+                payload=data,
+            )
 
             if done:
                 break
 
+
 def createcommandresponseokframe(stream, requestid):
     overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
 
@@ -377,20 +413,24 @@
     else:
         encoded = False
 
-    return stream.makeframe(requestid=requestid,
-                            typeid=FRAME_TYPE_COMMAND_RESPONSE,
-                            flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
-                            payload=overall,
-                            encoded=encoded)
+    return stream.makeframe(
+        requestid=requestid,
+        typeid=FRAME_TYPE_COMMAND_RESPONSE,
+        flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+        payload=overall,
+        encoded=encoded,
+    )
 
-def createcommandresponseeosframes(stream, requestid,
-                                   maxframesize=DEFAULT_MAX_FRAME_SIZE):
+
+def createcommandresponseeosframes(
+    stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE
+):
     """Create an empty payload frame representing command end-of-stream."""
     payload = stream.flush()
 
     offset = 0
     while True:
-        chunk = payload[offset:offset + maxframesize]
+        chunk = payload[offset : offset + maxframesize]
         offset += len(chunk)
 
         done = offset == len(payload)
@@ -400,26 +440,31 @@
         else:
             flags = FLAG_COMMAND_RESPONSE_CONTINUATION
 
-        yield stream.makeframe(requestid=requestid,
-                               typeid=FRAME_TYPE_COMMAND_RESPONSE,
-                               flags=flags,
-                               payload=chunk,
-                               encoded=payload != b'')
+        yield stream.makeframe(
+            requestid=requestid,
+            typeid=FRAME_TYPE_COMMAND_RESPONSE,
+            flags=flags,
+            payload=chunk,
+            encoded=payload != b'',
+        )
 
         if done:
             break
 
+
 def createalternatelocationresponseframe(stream, requestid, location):
     data = {
         b'status': b'redirect',
-        b'location': {
-            b'url': location.url,
-            b'mediatype': location.mediatype,
-        }
+        b'location': {b'url': location.url, b'mediatype': location.mediatype,},
     }
 
-    for a in (r'size', r'fullhashes', r'fullhashseed', r'serverdercerts',
-              r'servercadercerts'):
+    for a in (
+        r'size',
+        r'fullhashes',
+        r'fullhashseed',
+        r'serverdercerts',
+        r'servercadercerts',
+    ):
         value = getattr(location, a)
         if value is not None:
             data[b'location'][pycompat.bytestr(a)] = value
@@ -432,48 +477,52 @@
     else:
         encoded = False
 
-    return stream.makeframe(requestid=requestid,
-                            typeid=FRAME_TYPE_COMMAND_RESPONSE,
-                            flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
-                            payload=payload,
-                            encoded=encoded)
+    return stream.makeframe(
+        requestid=requestid,
+        typeid=FRAME_TYPE_COMMAND_RESPONSE,
+        flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+        payload=payload,
+        encoded=encoded,
+    )
+
 
 def createcommanderrorresponse(stream, requestid, message, args=None):
     # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
     # formatting works consistently?
-    m = {
-        b'status': b'error',
-        b'error': {
-            b'message': message,
-        }
-    }
+    m = {b'status': b'error', b'error': {b'message': message,}}
 
     if args:
         m[b'error'][b'args'] = args
 
     overall = b''.join(cborutil.streamencode(m))
 
-    yield stream.makeframe(requestid=requestid,
-                           typeid=FRAME_TYPE_COMMAND_RESPONSE,
-                           flags=FLAG_COMMAND_RESPONSE_EOS,
-                           payload=overall)
+    yield stream.makeframe(
+        requestid=requestid,
+        typeid=FRAME_TYPE_COMMAND_RESPONSE,
+        flags=FLAG_COMMAND_RESPONSE_EOS,
+        payload=overall,
+    )
+
 
 def createerrorframe(stream, requestid, msg, errtype):
     # TODO properly handle frame size limits.
     assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
 
-    payload = b''.join(cborutil.streamencode({
-        b'type': errtype,
-        b'message': [{b'msg': msg}],
-    }))
+    payload = b''.join(
+        cborutil.streamencode({b'type': errtype, b'message': [{b'msg': msg}],})
+    )
 
-    yield stream.makeframe(requestid=requestid,
-                           typeid=FRAME_TYPE_ERROR_RESPONSE,
-                           flags=0,
-                           payload=payload)
+    yield stream.makeframe(
+        requestid=requestid,
+        typeid=FRAME_TYPE_ERROR_RESPONSE,
+        flags=0,
+        payload=payload,
+    )
 
-def createtextoutputframe(stream, requestid, atoms,
-                          maxframesize=DEFAULT_MAX_FRAME_SIZE):
+
+def createtextoutputframe(
+    stream, requestid, atoms, maxframesize=DEFAULT_MAX_FRAME_SIZE
+):
     """Create a text output frame to render text to people.
 
     ``atoms`` is a 3-tuple of (formatting string, args, labels).
@@ -504,8 +553,9 @@
         args = [a.decode(r'utf-8', r'replace').encode(r'utf-8') for a in args]
 
         # Labels must be ASCII.
-        labels = [l.decode(r'ascii', r'strict').encode(r'ascii')
-                  for l in labels]
+        labels = [
+            l.decode(r'ascii', r'strict').encode(r'ascii') for l in labels
+        ]
 
         atom = {b'msg': formatting}
         if args:
@@ -520,10 +570,13 @@
     if len(payload) > maxframesize:
         raise ValueError('cannot encode data in a single frame')
 
-    yield stream.makeframe(requestid=requestid,
-                           typeid=FRAME_TYPE_TEXT_OUTPUT,
-                           flags=0,
-                           payload=payload)
+    yield stream.makeframe(
+        requestid=requestid,
+        typeid=FRAME_TYPE_TEXT_OUTPUT,
+        flags=0,
+        payload=payload,
+    )
+
 
 class bufferingcommandresponseemitter(object):
     """Helper object to emit command response frames intelligently.
@@ -536,6 +589,7 @@
     So it might make sense to implement this functionality at the stream
     level.
     """
+
     def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
         self._stream = stream
         self._requestid = requestid
@@ -581,7 +635,7 @@
             # Now emit frames for the big chunk.
             offset = 0
             while True:
-                chunk = data[offset:offset + self._maxsize]
+                chunk = data[offset : offset + self._maxsize]
                 offset += len(chunk)
 
                 yield self._stream.makeframe(
@@ -589,7 +643,8 @@
                     typeid=FRAME_TYPE_COMMAND_RESPONSE,
                     flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
                     payload=chunk,
-                    encoded=True)
+                    encoded=True,
+                )
 
                 if offset == len(data):
                     return
@@ -625,13 +680,17 @@
             typeid=FRAME_TYPE_COMMAND_RESPONSE,
             flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
             payload=payload,
-            encoded=True)
+            encoded=True,
+        )
+
 
 # TODO consider defining encoders/decoders using the util.compressionengine
 # mechanism.
 
+
 class identityencoder(object):
     """Encoder for the "identity" stream encoding profile."""
+
     def __init__(self, ui):
         pass
 
@@ -644,20 +703,24 @@
     def finish(self):
         return b''
 
+
 class identitydecoder(object):
     """Decoder for the "identity" stream encoding profile."""
 
     def __init__(self, ui, extraobjs):
         if extraobjs:
-            raise error.Abort(_('identity decoder received unexpected '
-                                'additional values'))
+            raise error.Abort(
+                _('identity decoder received unexpected ' 'additional values')
+            )
 
     def decode(self, data):
         return data
 
+
 class zlibencoder(object):
     def __init__(self, ui):
         import zlib
+
         self._zlib = zlib
         self._compressor = zlib.compressobj()
 
@@ -674,13 +737,15 @@
         self._compressor = None
         return res
 
+
 class zlibdecoder(object):
     def __init__(self, ui, extraobjs):
         import zlib
 
         if extraobjs:
-            raise error.Abort(_('zlib decoder received unexpected '
-                                'additional values'))
+            raise error.Abort(
+                _('zlib decoder received unexpected ' 'additional values')
+            )
 
         self._decompressor = zlib.decompressobj()
 
@@ -692,6 +757,7 @@
 
         return self._decompressor.decompress(data)
 
+
 class zstdbaseencoder(object):
     def __init__(self, level):
         from . import zstd
@@ -714,38 +780,46 @@
         self._compressor = None
         return res
 
+
 class zstd8mbencoder(zstdbaseencoder):
     def __init__(self, ui):
         super(zstd8mbencoder, self).__init__(3)
 
+
 class zstdbasedecoder(object):
     def __init__(self, maxwindowsize):
         from . import zstd
+
         dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
         self._decompressor = dctx.decompressobj()
 
     def decode(self, data):
         return self._decompressor.decompress(data)
 
+
 class zstd8mbdecoder(zstdbasedecoder):
     def __init__(self, ui, extraobjs):
         if extraobjs:
-            raise error.Abort(_('zstd8mb decoder received unexpected '
-                                'additional values'))
+            raise error.Abort(
+                _('zstd8mb decoder received unexpected ' 'additional values')
+            )
 
         super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
 
+
 # We lazily populate this to avoid excessive module imports when importing
 # this module.
 STREAM_ENCODERS = {}
 STREAM_ENCODERS_ORDER = []
 
+
 def populatestreamencoders():
     if STREAM_ENCODERS:
         return
 
     try:
         from . import zstd
+
         zstd.__version__
     except ImportError:
         zstd = None
@@ -761,6 +835,7 @@
     STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
     STREAM_ENCODERS_ORDER.append(b'identity')
 
+
 class stream(object):
     """Represents a logical unidirectional series of frames."""
 
@@ -778,8 +853,10 @@
             streamflags |= STREAM_FLAG_BEGIN_STREAM
             self._active = True
 
-        return makeframe(requestid, self.streamid, streamflags, typeid, flags,
-                         payload)
+        return makeframe(
+            requestid, self.streamid, streamflags, typeid, flags, payload
+        )
+
 
 class inputstream(stream):
     """Represents a stream used for receiving data."""
@@ -813,6 +890,7 @@
 
         return self._decoder.flush()
 
+
 class outputstream(stream):
     """Represents a stream used for sending data."""
 
@@ -851,8 +929,7 @@
 
         self._encoder.finish()
 
-    def makeframe(self, requestid, typeid, flags, payload,
-                  encoded=False):
+    def makeframe(self, requestid, typeid, flags, payload, encoded=False):
         """Create a frame to be sent out over this stream.
 
         Only returns the frame instance. Does not actually send it.
@@ -866,16 +943,20 @@
             if not self.streamsettingssent:
                 raise error.ProgrammingError(
                     b'attempting to send encoded frame without sending stream '
-                    b'settings')
+                    b'settings'
+                )
 
             streamflags |= STREAM_FLAG_ENCODING_APPLIED
 
-        if (typeid == FRAME_TYPE_STREAM_SETTINGS
-            and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS):
+        if (
+            typeid == FRAME_TYPE_STREAM_SETTINGS
+            and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
+        ):
             self.streamsettingssent = True
 
-        return makeframe(requestid, self.streamid, streamflags, typeid, flags,
-                         payload)
+        return makeframe(
+            requestid, self.streamid, streamflags, typeid, flags, payload
+        )
 
     def makestreamsettingsframe(self, requestid):
         """Create a stream settings frame for this stream.
@@ -887,19 +968,27 @@
             return None
 
         payload = b''.join(cborutil.streamencode(self._encodername))
-        return self.makeframe(requestid, FRAME_TYPE_STREAM_SETTINGS,
-                              FLAG_STREAM_ENCODING_SETTINGS_EOS, payload)
+        return self.makeframe(
+            requestid,
+            FRAME_TYPE_STREAM_SETTINGS,
+            FLAG_STREAM_ENCODING_SETTINGS_EOS,
+            payload,
+        )
+
 
 def ensureserverstream(stream):
     if stream.streamid % 2:
-        raise error.ProgrammingError('server should only write to even '
-                                     'numbered streams; %d is not even' %
-                                     stream.streamid)
+        raise error.ProgrammingError(
+            'server should only write to even '
+            'numbered streams; %d is not even' % stream.streamid
+        )
+
 
 DEFAULT_PROTOCOL_SETTINGS = {
     'contentencodings': [b'identity'],
 }
 
+
 class serverreactor(object):
     """Holds state of a server handling frame-based protocol requests.
 
@@ -1006,23 +1095,28 @@
         if not frame.streamid % 2:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('received frame with even numbered stream ID: %d') %
-                  frame.streamid)
+                _('received frame with even numbered stream ID: %d')
+                % frame.streamid
+            )
 
         if frame.streamid not in self._incomingstreams:
             if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
                 self._state = 'errored'
                 return self._makeerrorresult(
-                    _('received frame on unknown inactive stream without '
-                      'beginning of stream flag set'))
+                    _(
+                        'received frame on unknown inactive stream without '
+                        'beginning of stream flag set'
+                    )
+                )
 
             self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
 
         if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
             # TODO handle decoding frames
             self._state = 'errored'
-            raise error.ProgrammingError('support for decoding stream payloads '
-                                         'not yet implemented')
+            raise error.ProgrammingError(
+                'support for decoding stream payloads ' 'not yet implemented'
+            )
 
         if frame.streamflags & STREAM_FLAG_END_STREAM:
             del self._incomingstreams[frame.streamid]
@@ -1080,20 +1174,25 @@
 
                     if emitted:
                         for frame in createcommandresponseeosframes(
-                            stream, requestid):
+                            stream, requestid
+                        ):
                             yield frame
                     break
 
                 except error.WireprotoCommandError as e:
                     for frame in createcommanderrorresponse(
-                        stream, requestid, e.message, e.messageargs):
+                        stream, requestid, e.message, e.messageargs
+                    ):
                         yield frame
                     break
 
                 except Exception as e:
                     for frame in createerrorframe(
-                        stream, requestid, '%s' % stringutil.forcebytestr(e),
-                        errtype='server'):
+                        stream,
+                        requestid,
+                        '%s' % stringutil.forcebytestr(e),
+                        errtype='server',
+                    ):
 
                         yield frame
 
@@ -1106,14 +1205,16 @@
                         if emitted:
                             raise error.ProgrammingError(
                                 'alternatelocationresponse seen after initial '
-                                'output object')
+                                'output object'
+                            )
 
                         frame = stream.makestreamsettingsframe(requestid)
                         if frame:
                             yield frame
 
                         yield createalternatelocationresponseframe(
-                            stream, requestid, o)
+                            stream, requestid, o
+                        )
 
                         alternatelocationsent = True
                         emitted = True
@@ -1121,7 +1222,8 @@
 
                     if alternatelocationsent:
                         raise error.ProgrammingError(
-                            'object follows alternatelocationresponse')
+                            'object follows alternatelocationresponse'
+                        )
 
                     if not emitted:
                         # Frame is optional.
@@ -1147,9 +1249,11 @@
                             yield frame
 
                     elif isinstance(
-                        o, wireprototypes.indefinitebytestringresponse):
+                        o, wireprototypes.indefinitebytestringresponse
+                    ):
                         for chunk in cborutil.streamencodebytestringfromiter(
-                            o.chunks):
+                            o.chunks
+                        ):
 
                             for frame in emitter.send(chunk):
                                 yield frame
@@ -1161,9 +1265,9 @@
                                 yield frame
 
                 except Exception as e:
-                    for frame in createerrorframe(stream, requestid,
-                                                  '%s' % e,
-                                                  errtype='server'):
+                    for frame in createerrorframe(
+                        stream, requestid, '%s' % e, errtype='server'
+                    ):
                         yield frame
 
                     break
@@ -1189,25 +1293,22 @@
                 for frame in gen:
                     yield frame
 
-        return 'sendframes', {
-            'framegen': makegen(),
-        }
+        return 'sendframes', {'framegen': makegen(),}
 
     def _handlesendframes(self, framegen):
         if self._deferoutput:
             self._bufferedframegens.append(framegen)
             return 'noop', {}
         else:
-            return 'sendframes', {
-                'framegen': framegen,
-            }
+            return 'sendframes', {'framegen': framegen,}
 
     def onservererror(self, stream, requestid, msg):
         ensureserverstream(stream)
 
         def sendframes():
-            for frame in createerrorframe(stream, requestid, msg,
-                                          errtype='server'):
+            for frame in createerrorframe(
+                stream, requestid, msg, errtype='server'
+            ):
                 yield frame
 
             self._activecommands.remove(requestid)
@@ -1219,8 +1320,9 @@
         ensureserverstream(stream)
 
         def sendframes():
-            for frame in createcommanderrorresponse(stream, requestid, message,
-                                                    args):
+            for frame in createcommanderrorresponse(
+                stream, requestid, message, args
+            ):
                 yield frame
 
             self._activecommands.remove(requestid)
@@ -1250,17 +1352,16 @@
         return s
 
     def _makeerrorresult(self, msg):
-        return 'error', {
-            'message': msg,
-        }
+        return 'error', {'message': msg,}
 
     def _makeruncommandresult(self, requestid):
         entry = self._receivingcommands[requestid]
 
         if not entry['requestdone']:
             self._state = 'errored'
-            raise error.ProgrammingError('should not be called without '
-                                         'requestdone set')
+            raise error.ProgrammingError(
+                'should not be called without ' 'requestdone set'
+            )
 
         del self._receivingcommands[requestid]
 
@@ -1276,7 +1377,8 @@
         if b'name' not in request:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('command request missing "name" field'))
+                _('command request missing "name" field')
+            )
 
         if b'args' not in request:
             request[b'args'] = {}
@@ -1284,18 +1386,19 @@
         assert requestid not in self._activecommands
         self._activecommands.add(requestid)
 
-        return 'runcommand', {
-            'requestid': requestid,
-            'command': request[b'name'],
-            'args': request[b'args'],
-            'redirect': request.get(b'redirect'),
-            'data': entry['data'].getvalue() if entry['data'] else None,
-        }
+        return (
+            'runcommand',
+            {
+                'requestid': requestid,
+                'command': request[b'name'],
+                'args': request[b'args'],
+                'redirect': request.get(b'redirect'),
+                'data': entry['data'].getvalue() if entry['data'] else None,
+            },
+        )
 
     def _makewantframeresult(self):
-        return 'wantframe', {
-            'state': self._state,
-        }
+        return 'wantframe', {'state': self._state,}
 
     def _validatecommandrequestframe(self, frame):
         new = frame.flags & FLAG_COMMAND_REQUEST_NEW
@@ -1304,14 +1407,20 @@
         if new and continuation:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('received command request frame with both new and '
-                  'continuation flags set'))
+                _(
+                    'received command request frame with both new and '
+                    'continuation flags set'
+                )
+            )
 
         if not new and not continuation:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('received command request frame with neither new nor '
-                  'continuation flags set'))
+                _(
+                    'received command request frame with neither new nor '
+                    'continuation flags set'
+                )
+            )
 
     def _onframeinitial(self, frame):
         # Called when we receive a frame when in the "initial" state.
@@ -1327,8 +1436,12 @@
         else:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('expected sender protocol settings or command request '
-                  'frame; got %d') % frame.typeid)
+                _(
+                    'expected sender protocol settings or command request '
+                    'frame; got %d'
+                )
+                % frame.typeid
+            )
 
     def _onframeprotocolsettings(self, frame):
         assert self._state == 'protocol-settings-receiving'
@@ -1337,8 +1450,9 @@
         if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('expected sender protocol settings frame; got %d') %
-                frame.typeid)
+                _('expected sender protocol settings frame; got %d')
+                % frame.typeid
+            )
 
         more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
         eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
@@ -1346,14 +1460,20 @@
         if more and eos:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('sender protocol settings frame cannot have both '
-                  'continuation and end of stream flags set'))
+                _(
+                    'sender protocol settings frame cannot have both '
+                    'continuation and end of stream flags set'
+                )
+            )
 
         if not more and not eos:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('sender protocol settings frame must have continuation or '
-                  'end of stream flag set'))
+                _(
+                    'sender protocol settings frame must have continuation or '
+                    'end of stream flag set'
+                )
+            )
 
         # TODO establish limits for maximum amount of data that can be
         # buffered.
@@ -1363,7 +1483,8 @@
             self._state = 'errored'
             return self._makeerrorresult(
                 _('error decoding CBOR from sender protocol settings frame: %s')
-                % stringutil.forcebytestr(e))
+                % stringutil.forcebytestr(e)
+            )
 
         if more:
             return self._makewantframeresult()
@@ -1376,12 +1497,16 @@
         if not decoded:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('sender protocol settings frame did not contain CBOR data'))
+                _('sender protocol settings frame did not contain CBOR data')
+            )
         elif len(decoded) > 1:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('sender protocol settings frame contained multiple CBOR '
-                  'values'))
+                _(
+                    'sender protocol settings frame contained multiple CBOR '
+                    'values'
+                )
+            )
 
         d = decoded[0]
 
@@ -1398,7 +1523,8 @@
         if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('expected command request frame; got %d') % frame.typeid)
+                _('expected command request frame; got %d') % frame.typeid
+            )
 
         res = self._validatecommandrequestframe(frame)
         if res:
@@ -1407,12 +1533,14 @@
         if frame.requestid in self._receivingcommands:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('request with ID %d already received') % frame.requestid)
+                _('request with ID %d already received') % frame.requestid
+            )
 
         if frame.requestid in self._activecommands:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('request with ID %d is already active') % frame.requestid)
+                _('request with ID %d is already active') % frame.requestid
+            )
 
         new = frame.flags & FLAG_COMMAND_REQUEST_NEW
         moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
@@ -1421,7 +1549,8 @@
         if not new:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('received command request frame without new flag set'))
+                _('received command request frame without new flag set')
+            )
 
         payload = util.bytesio()
         payload.write(frame.payload)
@@ -1456,14 +1585,16 @@
         if frame.requestid in self._activecommands:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('received frame for request that is still active: %d') %
-                frame.requestid)
+                _('received frame for request that is still active: %d')
+                % frame.requestid
+            )
 
         if frame.requestid not in self._receivingcommands:
             self._state = 'errored'
             return self._makeerrorresult(
-                _('received frame for request that is not receiving: %d') %
-                  frame.requestid)
+                _('received frame for request that is not receiving: %d')
+                % frame.requestid
+            )
 
         entry = self._receivingcommands[frame.requestid]
 
@@ -1474,13 +1605,17 @@
             if entry['requestdone']:
                 self._state = 'errored'
                 return self._makeerrorresult(
-                    _('received command request frame when request frames '
-                      'were supposedly done'))
+                    _(
+                        'received command request frame when request frames '
+                        'were supposedly done'
+                    )
+                )
 
             if expectingdata != entry['expectingdata']:
                 self._state = 'errored'
                 return self._makeerrorresult(
-                    _('mismatch between expect data flag and previous frame'))
+                    _('mismatch between expect data flag and previous frame')
+                )
 
             entry['payload'].write(frame.payload)
 
@@ -1495,9 +1630,13 @@
         elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
             if not entry['expectingdata']:
                 self._state = 'errored'
-                return self._makeerrorresult(_(
-                    'received command data frame for request that is not '
-                    'expecting data: %d') % frame.requestid)
+                return self._makeerrorresult(
+                    _(
+                        'received command data frame for request that is not '
+                        'expecting data: %d'
+                    )
+                    % frame.requestid
+                )
 
             if entry['data'] is None:
                 entry['data'] = util.bytesio()
@@ -1505,8 +1644,9 @@
             return self._handlecommanddataframe(frame, entry)
         else:
             self._state = 'errored'
-            return self._makeerrorresult(_(
-                'received unexpected frame type: %d') % frame.typeid)
+            return self._makeerrorresult(
+                _('received unexpected frame type: %d') % frame.typeid
+            )
 
     def _handlecommanddataframe(self, frame, entry):
         assert frame.typeid == FRAME_TYPE_COMMAND_DATA
@@ -1521,12 +1661,14 @@
             return self._makeruncommandresult(frame.requestid)
         else:
             self._state = 'errored'
-            return self._makeerrorresult(_('command data frame without '
-                                           'flags'))
+            return self._makeerrorresult(
+                _('command data frame without ' 'flags')
+            )
 
     def _onframeerrored(self, frame):
         return self._makeerrorresult(_('server already errored'))
 
+
 class commandrequest(object):
     """Represents a request to run a command."""
 
@@ -1538,6 +1680,7 @@
         self.redirect = redirect
         self.state = 'pending'
 
+
 class clientreactor(object):
     """Holds state of a client issuing frame-based protocol requests.
 
@@ -1584,8 +1727,14 @@
        is expected to follow or we're at the end of the response stream,
        respectively.
     """
-    def __init__(self, ui, hasmultiplesend=False, buffersends=True,
-                 clientcontentencoders=None):
+
+    def __init__(
+        self,
+        ui,
+        hasmultiplesend=False,
+        buffersends=True,
+        clientcontentencoders=None,
+    ):
         """Create a new instance.
 
         ``hasmultiplesend`` indicates whether multiple sends are supported
@@ -1634,24 +1783,28 @@
         requestid = self._nextrequestid
         self._nextrequestid += 2
 
-        request = commandrequest(requestid, name, args, datafh=datafh,
-                                 redirect=redirect)
+        request = commandrequest(
+            requestid, name, args, datafh=datafh, redirect=redirect
+        )
 
         if self._buffersends:
             self._pendingrequests.append(request)
             return request, 'noop', {}
         else:
             if not self._cansend:
-                raise error.ProgrammingError('sends cannot be performed on '
-                                             'this instance')
+                raise error.ProgrammingError(
+                    'sends cannot be performed on ' 'this instance'
+                )
 
             if not self._hasmultiplesend:
                 self._cansend = False
                 self._canissuecommands = False
 
-            return request, 'sendframes', {
-                'framegen': self._makecommandframes(request),
-            }
+            return (
+                request,
+                'sendframes',
+                {'framegen': self._makecommandframes(request),},
+            )
 
     def flushcommands(self):
         """Request that all queued commands be sent.
@@ -1667,8 +1820,9 @@
             return 'noop', {}
 
         if not self._cansend:
-            raise error.ProgrammingError('sends cannot be performed on this '
-                                         'instance')
+            raise error.ProgrammingError(
+                'sends cannot be performed on this ' 'instance'
+            )
 
         # If the instance only allows sending once, mark that we have fired
         # our one shot.
@@ -1682,9 +1836,7 @@
                 for frame in self._makecommandframes(request):
                     yield frame
 
-        return 'sendframes', {
-            'framegen': makeframes(),
-        }
+        return 'sendframes', {'framegen': makeframes(),}
 
     def _makecommandframes(self, request):
         """Emit frames to issue a command request.
@@ -1698,22 +1850,27 @@
         if not self._protocolsettingssent and self._clientcontentencoders:
             self._protocolsettingssent = True
 
-            payload = b''.join(cborutil.streamencode({
-                b'contentencodings': self._clientcontentencoders,
-            }))
+            payload = b''.join(
+                cborutil.streamencode(
+                    {b'contentencodings': self._clientcontentencoders,}
+                )
+            )
 
             yield self._outgoingstream.makeframe(
                 requestid=request.requestid,
                 typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
                 flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
-                payload=payload)
+                payload=payload,
+            )
 
-        res = createcommandframes(self._outgoingstream,
-                                  request.requestid,
-                                  request.name,
-                                  request.args,
-                                  datafh=request.datafh,
-                                  redirect=request.redirect)
+        res = createcommandframes(
+            self._outgoingstream,
+            request.requestid,
+            request.name,
+            request.args,
+            datafh=request.datafh,
+            redirect=request.redirect,
+        )
 
         for frame in res:
             yield frame
@@ -1727,21 +1884,29 @@
         caller needs to take as a result of receiving this frame.
         """
         if frame.streamid % 2:
-            return 'error', {
-                'message': (
-                    _('received frame with odd numbered stream ID: %d') %
-                    frame.streamid),
-            }
+            return (
+                'error',
+                {
+                    'message': (
+                        _('received frame with odd numbered stream ID: %d')
+                        % frame.streamid
+                    ),
+                },
+            )
 
         if frame.streamid not in self._incomingstreams:
             if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
-                return 'error', {
-                    'message': _('received frame on unknown stream '
-                                 'without beginning of stream flag set'),
-                }
+                return (
+                    'error',
+                    {
+                        'message': _(
+                            'received frame on unknown stream '
+                            'without beginning of stream flag set'
+                        ),
+                    },
+                )
 
-            self._incomingstreams[frame.streamid] = inputstream(
-                frame.streamid)
+            self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
 
         stream = self._incomingstreams[frame.streamid]
 
@@ -1758,10 +1923,15 @@
             return self._onstreamsettingsframe(frame)
 
         if frame.requestid not in self._activerequests:
-            return 'error', {
-                'message': (_('received frame for inactive request ID: %d') %
-                            frame.requestid),
-            }
+            return (
+                'error',
+                {
+                    'message': (
+                        _('received frame for inactive request ID: %d')
+                        % frame.requestid
+                    ),
+                },
+            )
 
         request = self._activerequests[frame.requestid]
         request.state = 'receiving'
@@ -1773,8 +1943,9 @@
 
         meth = handlers.get(frame.typeid)
         if not meth:
-            raise error.ProgrammingError('unhandled frame type: %d' %
-                                         frame.typeid)
+            raise error.ProgrammingError(
+                'unhandled frame type: %d' % frame.typeid
+            )
 
         return meth(request, frame)
 
@@ -1785,16 +1956,28 @@
         eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
 
         if more and eos:
-            return 'error', {
-                'message': (_('stream encoding settings frame cannot have both '
-                              'continuation and end of stream flags set')),
-            }
+            return (
+                'error',
+                {
+                    'message': (
+                        _(
+                            'stream encoding settings frame cannot have both '
+                            'continuation and end of stream flags set'
+                        )
+                    ),
+                },
+            )
 
         if not more and not eos:
-            return 'error', {
-                'message': _('stream encoding settings frame must have '
-                             'continuation or end of stream flag set'),
-            }
+            return (
+                'error',
+                {
+                    'message': _(
+                        'stream encoding settings frame must have '
+                        'continuation or end of stream flag set'
+                    ),
+                },
+            )
 
         if frame.streamid not in self._streamsettingsdecoders:
             decoder = cborutil.bufferingdecoder()
@@ -1805,11 +1988,18 @@
         try:
             decoder.decode(frame.payload)
         except Exception as e:
-            return 'error', {
-                'message': (_('error decoding CBOR from stream encoding '
-                             'settings frame: %s') %
-                           stringutil.forcebytestr(e)),
-            }
+            return (
+                'error',
+                {
+                    'message': (
+                        _(
+                            'error decoding CBOR from stream encoding '
+                            'settings frame: %s'
+                        )
+                        % stringutil.forcebytestr(e)
+                    ),
+                },
+            )
 
         if more:
             return 'noop', {}
@@ -1820,20 +2010,30 @@
         del self._streamsettingsdecoders[frame.streamid]
 
         if not decoded:
-            return 'error', {
-                'message': _('stream encoding settings frame did not contain '
-                             'CBOR data'),
-            }
+            return (
+                'error',
+                {
+                    'message': _(
+                        'stream encoding settings frame did not contain '
+                        'CBOR data'
+                    ),
+                },
+            )
 
         try:
-            self._incomingstreams[frame.streamid].setdecoder(self._ui,
-                                                             decoded[0],
-                                                             decoded[1:])
+            self._incomingstreams[frame.streamid].setdecoder(
+                self._ui, decoded[0], decoded[1:]
+            )
         except Exception as e:
-            return 'error', {
-                'message': (_('error setting stream decoder: %s') %
-                            stringutil.forcebytestr(e)),
-            }
+            return (
+                'error',
+                {
+                    'message': (
+                        _('error setting stream decoder: %s')
+                        % stringutil.forcebytestr(e)
+                    ),
+                },
+            )
 
         return 'noop', {}
 
@@ -1842,12 +2042,15 @@
             request.state = 'received'
             del self._activerequests[request.requestid]
 
-        return 'responsedata', {
-            'request': request,
-            'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
-            'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
-            'data': frame.payload,
-        }
+        return (
+            'responsedata',
+            {
+                'request': request,
+                'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
+                'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
+                'data': frame.payload,
+            },
+        )
 
     def _onerrorresponseframe(self, request, frame):
         request.state = 'errored'
@@ -1856,8 +2059,7 @@
         # The payload should be a CBOR map.
         m = cborutil.decodeall(frame.payload)[0]
 
-        return 'error', {
-            'request': request,
-            'type': m['type'],
-            'message': m['message'],
-        }
+        return (
+            'error',
+            {'request': request, 'type': m['type'], 'message': m['message'],},
+        )