mercurial/wireprotov2peer.py
author Gregory Szorc <gregory.szorc@gmail.com>
Wed, 05 Sep 2018 09:04:40 -0700
changeset 39486 43d92d68ac88
parent 39485 42bc1c70a6b8
child 39559 07b58266bce3
permissions -rw-r--r--
wireprotov2peer: properly format errors formatrichmessage() expects an iterable containing dicts with well-defined keys. We were passing in something else. This caused an exception. Change the code to call formatrichmessage() with the proper argument. And add a TODO to potentially emit the proper data structure from the server in the first place. Differential Revision: https://phab.mercurial-scm.org/D4441

# wireprotov2peer.py - client side code for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

from .i18n import _
from . import (
    encoding,
    error,
    util,
    wireprotoframing,
)
from .utils import (
    cborutil,
)

def formatrichmessage(atoms):
    """Format an encoded message from the framing protocol."""

    chunks = []

    for atom in atoms:
        msg = _(atom[b'msg'])

        if b'args' in atom:
            msg = msg % tuple(atom[b'args'])

        chunks.append(msg)

    return b''.join(chunks)

class commandresponse(object):
    """Represents the response to a command request."""

    def __init__(self, requestid, command):
        self.requestid = requestid
        self.command = command

        self.b = util.bytesio()

    def cborobjects(self):
        """Obtain decoded CBOR objects from this response."""
        self.b.seek(0)

        for v in cborutil.decodeall(self.b.getvalue()):
            yield v

class clienthandler(object):
    """Object to handle higher-level client activities.

    The ``clientreactor`` is used to hold low-level state about the frame-based
    protocol, such as which requests and streams are active. This type is used
    for higher-level operations, such as reading frames from a socket, exposing
    and managing a higher-level primitive for representing command responses,
    etc. This class is what peers should probably use to bridge wire activity
    with the higher-level peer API.
    """

    def __init__(self, ui, clientreactor):
        self._ui = ui
        self._reactor = clientreactor
        self._requests = {}
        self._futures = {}
        self._responses = {}

    def callcommand(self, command, args, f):
        """Register a request to call a command.

        Returns an iterable of frames that should be sent over the wire.
        """
        request, action, meta = self._reactor.callcommand(command, args)

        if action != 'noop':
            raise error.ProgrammingError('%s not yet supported' % action)

        rid = request.requestid
        self._requests[rid] = request
        self._futures[rid] = f
        self._responses[rid] = commandresponse(rid, command)

        return iter(())

    def flushcommands(self):
        """Flush all queued commands.

        Returns an iterable of frames that should be sent over the wire.
        """
        action, meta = self._reactor.flushcommands()

        if action != 'sendframes':
            raise error.ProgrammingError('%s not yet supported' % action)

        return meta['framegen']

    def readframe(self, fh):
        """Attempt to read and process a frame.

        Returns None if no frame was read. Presumably this means EOF.
        """
        frame = wireprotoframing.readframe(fh)
        if frame is None:
            # TODO tell reactor?
            return

        self._ui.note(_('received %r\n') % frame)
        self._processframe(frame)

        return True

    def _processframe(self, frame):
        """Process a single read frame."""

        action, meta = self._reactor.onframerecv(frame)

        if action == 'error':
            e = error.RepoError(meta['message'])

            if frame.requestid in self._futures:
                self._futures[frame.requestid].set_exception(e)
            else:
                raise e

        if frame.requestid not in self._requests:
            raise error.ProgrammingError(
                'received frame for unknown request; this is either a bug in '
                'the clientreactor not screening for this or this instance was '
                'never told about this request: %r' % frame)

        response = self._responses[frame.requestid]

        if action == 'responsedata':
            # Any failures processing this frame should bubble up to the
            # future tracking the request.
            try:
                self._processresponsedata(frame, meta, response)
            except BaseException as e:
                self._futures[frame.requestid].set_exception(e)
        else:
            raise error.ProgrammingError(
                'unhandled action from clientreactor: %s' % action)

    def _processresponsedata(self, frame, meta, response):
        # This buffers all data until end of stream is received. This
        # is bad for performance.
        # TODO make response data streamable
        response.b.write(meta['data'])

        if meta['eos']:
            # If the command has a decoder, resolve the future to the
            # decoded value. Otherwise resolve to the rich response object.
            decoder = COMMAND_DECODERS.get(response.command)

            # TODO consider always resolving the overall status map.
            if decoder:
                objs = response.cborobjects()

                overall = next(objs)

                if overall['status'] == 'ok':
                    self._futures[frame.requestid].set_result(decoder(objs))
                else:
                    atoms = [{'msg': overall['error']['message']}]
                    if 'args' in overall['error']:
                        atoms[0]['args'] = overall['error']['args']
                    e = error.RepoError(formatrichmessage(atoms))
                    self._futures[frame.requestid].set_exception(e)
            else:
                self._futures[frame.requestid].set_result(response)

            del self._requests[frame.requestid]
            del self._futures[frame.requestid]

def decodebranchmap(objs):
    # Response should be a single CBOR map of branch name to array of nodes.
    bm = next(objs)

    return {encoding.tolocal(k): v for k, v in bm.items()}

def decodeheads(objs):
    # Array of node bytestrings.
    return next(objs)

def decodeknown(objs):
    # Bytestring where each byte is a 0 or 1.
    raw = next(objs)

    return [True if c == '1' else False for c in raw]

def decodelistkeys(objs):
    # Map with bytestring keys and values.
    return next(objs)

def decodelookup(objs):
    return next(objs)

def decodepushkey(objs):
    return next(objs)

COMMAND_DECODERS = {
    'branchmap': decodebranchmap,
    'heads': decodeheads,
    'known': decodeknown,
    'listkeys': decodelistkeys,
    'lookup': decodelookup,
    'pushkey': decodepushkey,
}