Mercurial > hg
view mercurial/wireprotov2peer.py @ 39489:f1186c292d03
verify: make output less confusing (issue5924)
output before: "500 files, 2035 changesets, 2622 total revisions"
output after: "checked 2035 changesets with 2622 changes to 500 files"
new one was suggested in the comments inside the issue.
Differential Revision: https://phab.mercurial-scm.org/D4476
author | Meirambek Omyrzak <meirambek77@gmail.com> |
---|---|
date | Wed, 05 Sep 2018 01:19:48 +0300 |
parents | 43d92d68ac88 |
children | 07b58266bce3 |
line wrap: on
line source
# wireprotov2peer.py - client side code for wire protocol version 2 # # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from __future__ import absolute_import from .i18n import _ from . import ( encoding, error, util, wireprotoframing, ) from .utils import ( cborutil, ) def formatrichmessage(atoms): """Format an encoded message from the framing protocol.""" chunks = [] for atom in atoms: msg = _(atom[b'msg']) if b'args' in atom: msg = msg % tuple(atom[b'args']) chunks.append(msg) return b''.join(chunks) class commandresponse(object): """Represents the response to a command request.""" def __init__(self, requestid, command): self.requestid = requestid self.command = command self.b = util.bytesio() def cborobjects(self): """Obtain decoded CBOR objects from this response.""" self.b.seek(0) for v in cborutil.decodeall(self.b.getvalue()): yield v class clienthandler(object): """Object to handle higher-level client activities. The ``clientreactor`` is used to hold low-level state about the frame-based protocol, such as which requests and streams are active. This type is used for higher-level operations, such as reading frames from a socket, exposing and managing a higher-level primitive for representing command responses, etc. This class is what peers should probably use to bridge wire activity with the higher-level peer API. """ def __init__(self, ui, clientreactor): self._ui = ui self._reactor = clientreactor self._requests = {} self._futures = {} self._responses = {} def callcommand(self, command, args, f): """Register a request to call a command. Returns an iterable of frames that should be sent over the wire. """ request, action, meta = self._reactor.callcommand(command, args) if action != 'noop': raise error.ProgrammingError('%s not yet supported' % action) rid = request.requestid self._requests[rid] = request self._futures[rid] = f self._responses[rid] = commandresponse(rid, command) return iter(()) def flushcommands(self): """Flush all queued commands. Returns an iterable of frames that should be sent over the wire. """ action, meta = self._reactor.flushcommands() if action != 'sendframes': raise error.ProgrammingError('%s not yet supported' % action) return meta['framegen'] def readframe(self, fh): """Attempt to read and process a frame. Returns None if no frame was read. Presumably this means EOF. """ frame = wireprotoframing.readframe(fh) if frame is None: # TODO tell reactor? return self._ui.note(_('received %r\n') % frame) self._processframe(frame) return True def _processframe(self, frame): """Process a single read frame.""" action, meta = self._reactor.onframerecv(frame) if action == 'error': e = error.RepoError(meta['message']) if frame.requestid in self._futures: self._futures[frame.requestid].set_exception(e) else: raise e if frame.requestid not in self._requests: raise error.ProgrammingError( 'received frame for unknown request; this is either a bug in ' 'the clientreactor not screening for this or this instance was ' 'never told about this request: %r' % frame) response = self._responses[frame.requestid] if action == 'responsedata': # Any failures processing this frame should bubble up to the # future tracking the request. try: self._processresponsedata(frame, meta, response) except BaseException as e: self._futures[frame.requestid].set_exception(e) else: raise error.ProgrammingError( 'unhandled action from clientreactor: %s' % action) def _processresponsedata(self, frame, meta, response): # This buffers all data until end of stream is received. This # is bad for performance. # TODO make response data streamable response.b.write(meta['data']) if meta['eos']: # If the command has a decoder, resolve the future to the # decoded value. Otherwise resolve to the rich response object. decoder = COMMAND_DECODERS.get(response.command) # TODO consider always resolving the overall status map. if decoder: objs = response.cborobjects() overall = next(objs) if overall['status'] == 'ok': self._futures[frame.requestid].set_result(decoder(objs)) else: atoms = [{'msg': overall['error']['message']}] if 'args' in overall['error']: atoms[0]['args'] = overall['error']['args'] e = error.RepoError(formatrichmessage(atoms)) self._futures[frame.requestid].set_exception(e) else: self._futures[frame.requestid].set_result(response) del self._requests[frame.requestid] del self._futures[frame.requestid] def decodebranchmap(objs): # Response should be a single CBOR map of branch name to array of nodes. bm = next(objs) return {encoding.tolocal(k): v for k, v in bm.items()} def decodeheads(objs): # Array of node bytestrings. return next(objs) def decodeknown(objs): # Bytestring where each byte is a 0 or 1. raw = next(objs) return [True if c == '1' else False for c in raw] def decodelistkeys(objs): # Map with bytestring keys and values. return next(objs) def decodelookup(objs): return next(objs) def decodepushkey(objs): return next(objs) COMMAND_DECODERS = { 'branchmap': decodebranchmap, 'heads': decodeheads, 'known': decodeknown, 'listkeys': decodelistkeys, 'lookup': decodelookup, 'pushkey': decodepushkey, }