mercurial/wireprotov2peer.py
changeset 48561 04688c51f81f
parent 48560 d6c53b40b078
child 48562 bf5dc156bb4c
--- a/mercurial/wireprotov2peer.py	Thu Dec 30 13:25:44 2021 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,576 +0,0 @@
-# wireprotov2peer.py - client side code for wire protocol version 2
-#
-# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
-#
-# This software may be used and distributed according to the terms of the
-# GNU General Public License version 2 or any later version.
-
-from __future__ import absolute_import
-
-import threading
-
-from .i18n import _
-from . import (
-    encoding,
-    error,
-    pycompat,
-    sslutil,
-    url as urlmod,
-    util,
-    wireprotoframing,
-    wireprototypes,
-)
-from .utils import cborutil
-
-
-def formatrichmessage(atoms):
-    """Format an encoded message from the framing protocol."""
-
-    chunks = []
-
-    for atom in atoms:
-        msg = _(atom[b'msg'])
-
-        if b'args' in atom:
-            msg = msg % tuple(atom[b'args'])
-
-        chunks.append(msg)
-
-    return b''.join(chunks)
-
-
-SUPPORTED_REDIRECT_PROTOCOLS = {
-    b'http',
-    b'https',
-}
-
-SUPPORTED_CONTENT_HASHES = {
-    b'sha1',
-    b'sha256',
-}
-
-
-def redirecttargetsupported(ui, target):
-    """Determine whether a redirect target entry is supported.
-
-    ``target`` should come from the capabilities data structure emitted by
-    the server.
-    """
-    if target.get(b'protocol') not in SUPPORTED_REDIRECT_PROTOCOLS:
-        ui.note(
-            _(b'(remote redirect target %s uses unsupported protocol: %s)\n')
-            % (target[b'name'], target.get(b'protocol', b''))
-        )
-        return False
-
-    if target.get(b'snirequired') and not sslutil.hassni:
-        ui.note(
-            _(b'(redirect target %s requires SNI, which is unsupported)\n')
-            % target[b'name']
-        )
-        return False
-
-    if b'tlsversions' in target:
-        tlsversions = set(target[b'tlsversions'])
-        supported = set()
-
-        for v in sslutil.supportedprotocols:
-            assert v.startswith(b'tls')
-            supported.add(v[3:])
-
-        if not tlsversions & supported:
-            ui.note(
-                _(
-                    b'(remote redirect target %s requires unsupported TLS '
-                    b'versions: %s)\n'
-                )
-                % (target[b'name'], b', '.join(sorted(tlsversions)))
-            )
-            return False
-
-    ui.note(_(b'(remote redirect target %s is compatible)\n') % target[b'name'])
-
-    return True
-
-
-def supportedredirects(ui, apidescriptor):
-    """Resolve the "redirect" command request key given an API descriptor.
-
-    Given an API descriptor returned by the server, returns a data structure
-    that can be used in hte "redirect" field of command requests to advertise
-    support for compatible redirect targets.
-
-    Returns None if no redirect targets are remotely advertised or if none are
-    supported.
-    """
-    if not apidescriptor or b'redirect' not in apidescriptor:
-        return None
-
-    targets = [
-        t[b'name']
-        for t in apidescriptor[b'redirect'][b'targets']
-        if redirecttargetsupported(ui, t)
-    ]
-
-    hashes = [
-        h
-        for h in apidescriptor[b'redirect'][b'hashes']
-        if h in SUPPORTED_CONTENT_HASHES
-    ]
-
-    return {
-        b'targets': targets,
-        b'hashes': hashes,
-    }
-
-
-class commandresponse(object):
-    """Represents the response to a command request.
-
-    Instances track the state of the command and hold its results.
-
-    An external entity is required to update the state of the object when
-    events occur.
-    """
-
-    def __init__(self, requestid, command, fromredirect=False):
-        self.requestid = requestid
-        self.command = command
-        self.fromredirect = fromredirect
-
-        # Whether all remote input related to this command has been
-        # received.
-        self._inputcomplete = False
-
-        # We have a lock that is acquired when important object state is
-        # mutated. This is to prevent race conditions between 1 thread
-        # sending us new data and another consuming it.
-        self._lock = threading.RLock()
-
-        # An event is set when state of the object changes. This event
-        # is waited on by the generator emitting objects.
-        self._serviceable = threading.Event()
-
-        self._pendingevents = []
-        self._pendingerror = None
-        self._decoder = cborutil.bufferingdecoder()
-        self._seeninitial = False
-        self._redirect = None
-
-    def _oninputcomplete(self):
-        with self._lock:
-            self._inputcomplete = True
-            self._serviceable.set()
-
-    def _onresponsedata(self, data):
-        available, readcount, wanted = self._decoder.decode(data)
-
-        if not available:
-            return
-
-        with self._lock:
-            for o in self._decoder.getavailable():
-                if not self._seeninitial and not self.fromredirect:
-                    self._handleinitial(o)
-                    continue
-
-                # We should never see an object after a content redirect,
-                # as the spec says the main status object containing the
-                # content redirect is the only object in the stream. Fail
-                # if we see a misbehaving server.
-                if self._redirect:
-                    raise error.Abort(
-                        _(
-                            b'received unexpected response data '
-                            b'after content redirect; the remote is '
-                            b'buggy'
-                        )
-                    )
-
-                self._pendingevents.append(o)
-
-            self._serviceable.set()
-
-    def _onerror(self, e):
-        self._pendingerror = e
-
-        with self._lock:
-            self._serviceable.set()
-
-    def _handleinitial(self, o):
-        self._seeninitial = True
-        if o[b'status'] == b'ok':
-            return
-
-        elif o[b'status'] == b'redirect':
-            l = o[b'location']
-            self._redirect = wireprototypes.alternatelocationresponse(
-                url=l[b'url'],
-                mediatype=l[b'mediatype'],
-                size=l.get(b'size'),
-                fullhashes=l.get(b'fullhashes'),
-                fullhashseed=l.get(b'fullhashseed'),
-                serverdercerts=l.get(b'serverdercerts'),
-                servercadercerts=l.get(b'servercadercerts'),
-            )
-            return
-
-        atoms = [{b'msg': o[b'error'][b'message']}]
-        if b'args' in o[b'error']:
-            atoms[0][b'args'] = o[b'error'][b'args']
-
-        raise error.RepoError(formatrichmessage(atoms))
-
-    def objects(self):
-        """Obtained decoded objects from this response.
-
-        This is a generator of data structures that were decoded from the
-        command response.
-
-        Obtaining the next member of the generator may block due to waiting
-        on external data to become available.
-
-        If the server encountered an error in the middle of serving the data
-        or if another error occurred, an exception may be raised when
-        advancing the generator.
-        """
-        while True:
-            # TODO this can infinite loop if self._inputcomplete is never
-            # set. We likely want to tie the lifetime of this object/state
-            # to that of the background thread receiving frames and updating
-            # our state.
-            self._serviceable.wait(1.0)
-
-            if self._pendingerror:
-                raise self._pendingerror
-
-            with self._lock:
-                self._serviceable.clear()
-
-                # Make copies because objects could be mutated during
-                # iteration.
-                stop = self._inputcomplete
-                pending = list(self._pendingevents)
-                self._pendingevents[:] = []
-
-            for o in pending:
-                yield o
-
-            if stop:
-                break
-
-
-class clienthandler(object):
-    """Object to handle higher-level client activities.
-
-    The ``clientreactor`` is used to hold low-level state about the frame-based
-    protocol, such as which requests and streams are active. This type is used
-    for higher-level operations, such as reading frames from a socket, exposing
-    and managing a higher-level primitive for representing command responses,
-    etc. This class is what peers should probably use to bridge wire activity
-    with the higher-level peer API.
-    """
-
-    def __init__(
-        self, ui, clientreactor, opener=None, requestbuilder=util.urlreq.request
-    ):
-        self._ui = ui
-        self._reactor = clientreactor
-        self._requests = {}
-        self._futures = {}
-        self._responses = {}
-        self._redirects = []
-        self._frameseof = False
-        self._opener = opener or urlmod.opener(ui)
-        self._requestbuilder = requestbuilder
-
-    def callcommand(self, command, args, f, redirect=None):
-        """Register a request to call a command.
-
-        Returns an iterable of frames that should be sent over the wire.
-        """
-        request, action, meta = self._reactor.callcommand(
-            command, args, redirect=redirect
-        )
-
-        if action != b'noop':
-            raise error.ProgrammingError(b'%s not yet supported' % action)
-
-        rid = request.requestid
-        self._requests[rid] = request
-        self._futures[rid] = f
-        # TODO we need some kind of lifetime on response instances otherwise
-        # objects() may deadlock.
-        self._responses[rid] = commandresponse(rid, command)
-
-        return iter(())
-
-    def flushcommands(self):
-        """Flush all queued commands.
-
-        Returns an iterable of frames that should be sent over the wire.
-        """
-        action, meta = self._reactor.flushcommands()
-
-        if action != b'sendframes':
-            raise error.ProgrammingError(b'%s not yet supported' % action)
-
-        return meta[b'framegen']
-
-    def readdata(self, framefh):
-        """Attempt to read data and do work.
-
-        Returns None if no data was read. Presumably this means we're
-        done with all read I/O.
-        """
-        if not self._frameseof:
-            frame = wireprotoframing.readframe(framefh)
-            if frame is None:
-                # TODO tell reactor?
-                self._frameseof = True
-            else:
-                self._ui.debug(b'received %r\n' % frame)
-                self._processframe(frame)
-
-        # Also try to read the first redirect.
-        if self._redirects:
-            if not self._processredirect(*self._redirects[0]):
-                self._redirects.pop(0)
-
-        if self._frameseof and not self._redirects:
-            return None
-
-        return True
-
-    def _processframe(self, frame):
-        """Process a single read frame."""
-
-        action, meta = self._reactor.onframerecv(frame)
-
-        if action == b'error':
-            e = error.RepoError(meta[b'message'])
-
-            if frame.requestid in self._responses:
-                self._responses[frame.requestid]._oninputcomplete()
-
-            if frame.requestid in self._futures:
-                self._futures[frame.requestid].set_exception(e)
-                del self._futures[frame.requestid]
-            else:
-                raise e
-
-            return
-        elif action == b'noop':
-            return
-        elif action == b'responsedata':
-            # Handled below.
-            pass
-        else:
-            raise error.ProgrammingError(b'action not handled: %s' % action)
-
-        if frame.requestid not in self._requests:
-            raise error.ProgrammingError(
-                b'received frame for unknown request; this is either a bug in '
-                b'the clientreactor not screening for this or this instance was '
-                b'never told about this request: %r' % frame
-            )
-
-        response = self._responses[frame.requestid]
-
-        if action == b'responsedata':
-            # Any failures processing this frame should bubble up to the
-            # future tracking the request.
-            try:
-                self._processresponsedata(frame, meta, response)
-            except BaseException as e:
-                # If an exception occurs before the future is resolved,
-                # fail the future. Otherwise, we stuff the exception on
-                # the response object so it can be raised during objects()
-                # iteration. If nothing is consuming objects(), we could
-                # silently swallow this exception. That's a risk we'll have to
-                # take.
-                if frame.requestid in self._futures:
-                    self._futures[frame.requestid].set_exception(e)
-                    del self._futures[frame.requestid]
-                    response._oninputcomplete()
-                else:
-                    response._onerror(e)
-        else:
-            raise error.ProgrammingError(
-                b'unhandled action from clientreactor: %s' % action
-            )
-
-    def _processresponsedata(self, frame, meta, response):
-        # This can raise. The caller can handle it.
-        response._onresponsedata(meta[b'data'])
-
-        # We need to be careful about resolving futures prematurely. If a
-        # response is a redirect response, resolving the future before the
-        # redirect is processed would result in the consumer seeing an
-        # empty stream of objects, since they'd be consuming our
-        # response.objects() instead of the redirect's response.objects().
-        #
-        # Our strategy is to not resolve/finish the request until either
-        # EOS occurs or until the initial response object is fully received.
-
-        # Always react to eos.
-        if meta[b'eos']:
-            response._oninputcomplete()
-            del self._requests[frame.requestid]
-
-        # Not EOS but we haven't decoded the initial response object yet.
-        # Return and wait for more data.
-        elif not response._seeninitial:
-            return
-
-        # The specification says no objects should follow the initial/redirect
-        # object. So it should be safe to handle the redirect object if one is
-        # decoded, without having to wait for EOS.
-        if response._redirect:
-            self._followredirect(frame.requestid, response._redirect)
-            return
-
-        # If the command has a decoder, we wait until all input has been
-        # received before resolving the future. Otherwise we resolve the
-        # future immediately.
-        if frame.requestid not in self._futures:
-            return
-
-        if response.command not in COMMAND_DECODERS:
-            self._futures[frame.requestid].set_result(response.objects())
-            del self._futures[frame.requestid]
-        elif response._inputcomplete:
-            decoded = COMMAND_DECODERS[response.command](response.objects())
-            self._futures[frame.requestid].set_result(decoded)
-            del self._futures[frame.requestid]
-
-    def _followredirect(self, requestid, redirect):
-        """Called to initiate redirect following for a request."""
-        self._ui.note(_(b'(following redirect to %s)\n') % redirect.url)
-
-        # TODO handle framed responses.
-        if redirect.mediatype != b'application/mercurial-cbor':
-            raise error.Abort(
-                _(b'cannot handle redirects for the %s media type')
-                % redirect.mediatype
-            )
-
-        if redirect.fullhashes:
-            self._ui.warn(
-                _(
-                    b'(support for validating hashes on content '
-                    b'redirects not supported)\n'
-                )
-            )
-
-        if redirect.serverdercerts or redirect.servercadercerts:
-            self._ui.warn(
-                _(
-                    b'(support for pinning server certificates on '
-                    b'content redirects not supported)\n'
-                )
-            )
-
-        headers = {
-            'Accept': redirect.mediatype,
-        }
-
-        req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
-
-        try:
-            res = self._opener.open(req)
-        except util.urlerr.httperror as e:
-            if e.code == 401:
-                raise error.Abort(_(b'authorization failed'))
-            raise
-        except util.httplib.HTTPException as e:
-            self._ui.debug(b'http error requesting %s\n' % req.get_full_url())
-            self._ui.traceback()
-            raise IOError(None, e)
-
-        urlmod.wrapresponse(res)
-
-        # The existing response object is associated with frame data. Rather
-        # than try to normalize its state, just create a new object.
-        oldresponse = self._responses[requestid]
-        self._responses[requestid] = commandresponse(
-            requestid, oldresponse.command, fromredirect=True
-        )
-
-        self._redirects.append((requestid, res))
-
-    def _processredirect(self, rid, res):
-        """Called to continue processing a response from a redirect.
-
-        Returns a bool indicating if the redirect is still serviceable.
-        """
-        response = self._responses[rid]
-
-        try:
-            data = res.read(32768)
-            response._onresponsedata(data)
-
-            # We're at end of stream.
-            if not data:
-                response._oninputcomplete()
-
-            if rid not in self._futures:
-                return bool(data)
-
-            if response.command not in COMMAND_DECODERS:
-                self._futures[rid].set_result(response.objects())
-                del self._futures[rid]
-            elif response._inputcomplete:
-                decoded = COMMAND_DECODERS[response.command](response.objects())
-                self._futures[rid].set_result(decoded)
-                del self._futures[rid]
-
-            return bool(data)
-
-        except BaseException as e:
-            self._futures[rid].set_exception(e)
-            del self._futures[rid]
-            response._oninputcomplete()
-            return False
-
-
-def decodebranchmap(objs):
-    # Response should be a single CBOR map of branch name to array of nodes.
-    bm = next(objs)
-
-    return {encoding.tolocal(k): v for k, v in bm.items()}
-
-
-def decodeheads(objs):
-    # Array of node bytestrings.
-    return next(objs)
-
-
-def decodeknown(objs):
-    # Bytestring where each byte is a 0 or 1.
-    raw = next(objs)
-
-    return [True if raw[i : i + 1] == b'1' else False for i in range(len(raw))]
-
-
-def decodelistkeys(objs):
-    # Map with bytestring keys and values.
-    return next(objs)
-
-
-def decodelookup(objs):
-    return next(objs)
-
-
-def decodepushkey(objs):
-    return next(objs)
-
-
-COMMAND_DECODERS = {
-    b'branchmap': decodebranchmap,
-    b'heads': decodeheads,
-    b'known': decodeknown,
-    b'listkeys': decodelistkeys,
-    b'lookup': decodelookup,
-    b'pushkey': decodepushkey,
-}