view mercurial/wireprotov2peer.py @ 39438:c734a5c82f38

wireprotov2peer: split responsedata handling into separate function So we don't have so much logic inside an if/elif block. Differential Revision: https://phab.mercurial-scm.org/D4439
author Gregory Szorc <gregory.szorc@gmail.com>
date Thu, 23 Aug 2018 13:46:39 -0700
parents 1467b6c27ff9
children 9f51fd22ed50
line wrap: on
line source

# wireprotov2peer.py - client side code for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

from .i18n import _
from .thirdparty import (
    cbor,
)
from . import (
    encoding,
    error,
    util,
    wireprotoframing,
)

def formatrichmessage(atoms):
    """Format an encoded message from the framing protocol."""

    chunks = []

    for atom in atoms:
        msg = _(atom[b'msg'])

        if b'args' in atom:
            msg = msg % atom[b'args']

        chunks.append(msg)

    return b''.join(chunks)

class commandresponse(object):
    """Represents the response to a command request."""

    def __init__(self, requestid, command):
        self.requestid = requestid
        self.command = command

        self.b = util.bytesio()

    def cborobjects(self):
        """Obtain decoded CBOR objects from this response."""
        size = self.b.tell()
        self.b.seek(0)

        decoder = cbor.CBORDecoder(self.b)

        while self.b.tell() < size:
            yield decoder.decode()

class clienthandler(object):
    """Object to handle higher-level client activities.

    The ``clientreactor`` is used to hold low-level state about the frame-based
    protocol, such as which requests and streams are active. This type is used
    for higher-level operations, such as reading frames from a socket, exposing
    and managing a higher-level primitive for representing command responses,
    etc. This class is what peers should probably use to bridge wire activity
    with the higher-level peer API.
    """

    def __init__(self, ui, clientreactor):
        self._ui = ui
        self._reactor = clientreactor
        self._requests = {}
        self._futures = {}
        self._responses = {}

    def callcommand(self, command, args, f):
        """Register a request to call a command.

        Returns an iterable of frames that should be sent over the wire.
        """
        request, action, meta = self._reactor.callcommand(command, args)

        if action != 'noop':
            raise error.ProgrammingError('%s not yet supported' % action)

        rid = request.requestid
        self._requests[rid] = request
        self._futures[rid] = f
        self._responses[rid] = commandresponse(rid, command)

        return iter(())

    def flushcommands(self):
        """Flush all queued commands.

        Returns an iterable of frames that should be sent over the wire.
        """
        action, meta = self._reactor.flushcommands()

        if action != 'sendframes':
            raise error.ProgrammingError('%s not yet supported' % action)

        return meta['framegen']

    def readframe(self, fh):
        """Attempt to read and process a frame.

        Returns None if no frame was read. Presumably this means EOF.
        """
        frame = wireprotoframing.readframe(fh)
        if frame is None:
            # TODO tell reactor?
            return

        self._ui.note(_('received %r\n') % frame)
        self._processframe(frame)

        return True

    def _processframe(self, frame):
        """Process a single read frame."""

        action, meta = self._reactor.onframerecv(frame)

        if action == 'error':
            e = error.RepoError(meta['message'])

            if frame.requestid in self._futures:
                self._futures[frame.requestid].set_exception(e)
            else:
                raise e

        if frame.requestid not in self._requests:
            raise error.ProgrammingError(
                'received frame for unknown request; this is either a bug in '
                'the clientreactor not screening for this or this instance was '
                'never told about this request: %r' % frame)

        response = self._responses[frame.requestid]

        if action == 'responsedata':
            self._processresponsedata(frame, meta, response)
        else:
            raise error.ProgrammingError(
                'unhandled action from clientreactor: %s' % action)

    def _processresponsedata(self, frame, meta, response):
        # This buffers all data until end of stream is received. This
        # is bad for performance.
        # TODO make response data streamable
        response.b.write(meta['data'])

        if meta['eos']:
            # If the command has a decoder, resolve the future to the
            # decoded value. Otherwise resolve to the rich response object.
            decoder = COMMAND_DECODERS.get(response.command)

            # TODO consider always resolving the overall status map.
            if decoder:
                objs = response.cborobjects()

                overall = next(objs)

                if overall['status'] == 'ok':
                    self._futures[frame.requestid].set_result(decoder(objs))
                else:
                    e = error.RepoError(
                        formatrichmessage(overall['error']['message']))
                    self._futures[frame.requestid].set_exception(e)
            else:
                self._futures[frame.requestid].set_result(response)

            del self._requests[frame.requestid]
            del self._futures[frame.requestid]

def decodebranchmap(objs):
    # Response should be a single CBOR map of branch name to array of nodes.
    bm = next(objs)

    return {encoding.tolocal(k): v for k, v in bm.items()}

def decodeheads(objs):
    # Array of node bytestrings.
    return next(objs)

def decodeknown(objs):
    # Bytestring where each byte is a 0 or 1.
    raw = next(objs)

    return [True if c == '1' else False for c in raw]

def decodelistkeys(objs):
    # Map with bytestring keys and values.
    return next(objs)

def decodelookup(objs):
    return next(objs)

def decodepushkey(objs):
    return next(objs)

COMMAND_DECODERS = {
    'branchmap': decodebranchmap,
    'heads': decodeheads,
    'known': decodeknown,
    'listkeys': decodelistkeys,
    'lookup': decodelookup,
    'pushkey': decodepushkey,
}