mercurial/httppeer.py
changeset 48526 04688c51f81f
parent 46907 ffd3e823a7e5
child 48835 a0da5075bca3
--- a/mercurial/httppeer.py	Thu Dec 30 13:25:44 2021 +0100
+++ b/mercurial/httppeer.py	Tue Dec 07 16:44:22 2021 +0100
@@ -13,7 +13,6 @@
 import os
 import socket
 import struct
-import weakref
 
 from .i18n import _
 from .pycompat import getattr
@@ -25,21 +24,9 @@
     statichttprepo,
     url as urlmod,
     util,
-    wireprotoframing,
-    wireprototypes,
     wireprotov1peer,
-    wireprotov2peer,
-    wireprotov2server,
 )
-from .interfaces import (
-    repository,
-    util as interfaceutil,
-)
-from .utils import (
-    cborutil,
-    stringutil,
-    urlutil,
-)
+from .utils import urlutil
 
 httplib = util.httplib
 urlerr = util.urlerr
@@ -331,9 +318,7 @@
         self.respurl = respurl
 
 
-def parsev1commandresponse(
-    ui, baseurl, requrl, qs, resp, compressible, allowcbor=False
-):
+def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible):
     # record the url we got redirected to
     redirected = False
     respurl = pycompat.bytesurl(resp.geturl())
@@ -376,17 +361,6 @@
     try:
         subtype = proto.split(b'-', 1)[1]
 
-        # Unless we end up supporting CBOR in the legacy wire protocol,
-        # this should ONLY be encountered for the initial capabilities
-        # request during handshake.
-        if subtype == b'cbor':
-            if allowcbor:
-                return respurl, proto, resp
-            else:
-                raise error.RepoError(
-                    _(b'unexpected CBOR response from server')
-                )
-
         version_info = tuple([int(n) for n in subtype.split(b'.')])
     except ValueError:
         raise error.RepoError(
@@ -564,85 +538,6 @@
         raise exception
 
 
-def sendv2request(
-    ui, opener, requestbuilder, apiurl, permission, requests, redirect
-):
-    wireprotoframing.populatestreamencoders()
-
-    uiencoders = ui.configlist(b'experimental', b'httppeer.v2-encoder-order')
-
-    if uiencoders:
-        encoders = []
-
-        for encoder in uiencoders:
-            if encoder not in wireprotoframing.STREAM_ENCODERS:
-                ui.warn(
-                    _(
-                        b'wire protocol version 2 encoder referenced in '
-                        b'config (%s) is not known; ignoring\n'
-                    )
-                    % encoder
-                )
-            else:
-                encoders.append(encoder)
-
-    else:
-        encoders = wireprotoframing.STREAM_ENCODERS_ORDER
-
-    reactor = wireprotoframing.clientreactor(
-        ui,
-        hasmultiplesend=False,
-        buffersends=True,
-        clientcontentencoders=encoders,
-    )
-
-    handler = wireprotov2peer.clienthandler(
-        ui, reactor, opener=opener, requestbuilder=requestbuilder
-    )
-
-    url = b'%s/%s' % (apiurl, permission)
-
-    if len(requests) > 1:
-        url += b'/multirequest'
-    else:
-        url += b'/%s' % requests[0][0]
-
-    ui.debug(b'sending %d commands\n' % len(requests))
-    for command, args, f in requests:
-        ui.debug(
-            b'sending command %s: %s\n'
-            % (command, stringutil.pprint(args, indent=2))
-        )
-        assert not list(
-            handler.callcommand(command, args, f, redirect=redirect)
-        )
-
-    # TODO stream this.
-    body = b''.join(map(bytes, handler.flushcommands()))
-
-    # TODO modify user-agent to reflect v2
-    headers = {
-        'Accept': wireprotov2server.FRAMINGTYPE,
-        'Content-Type': wireprotov2server.FRAMINGTYPE,
-    }
-
-    req = requestbuilder(pycompat.strurl(url), body, headers)
-    req.add_unredirected_header('Content-Length', '%d' % len(body))
-
-    try:
-        res = opener.open(req)
-    except urlerr.httperror as e:
-        if e.code == 401:
-            raise error.Abort(_(b'authorization failed'))
-
-        raise
-    except httplib.HTTPException as e:
-        ui.traceback()
-        raise IOError(None, e)
-
-    return handler, res
-
-
 class queuedcommandfuture(pycompat.futures.Future):
     """Wraps result() on command futures to trigger submission on call."""
 
@@ -657,302 +552,6 @@
         return self.result(timeout)
 
 
-@interfaceutil.implementer(repository.ipeercommandexecutor)
-class httpv2executor(object):
-    def __init__(
-        self, ui, opener, requestbuilder, apiurl, descriptor, redirect
-    ):
-        self._ui = ui
-        self._opener = opener
-        self._requestbuilder = requestbuilder
-        self._apiurl = apiurl
-        self._descriptor = descriptor
-        self._redirect = redirect
-        self._sent = False
-        self._closed = False
-        self._neededpermissions = set()
-        self._calls = []
-        self._futures = weakref.WeakSet()
-        self._responseexecutor = None
-        self._responsef = None
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exctype, excvalue, exctb):
-        self.close()
-
-    def callcommand(self, command, args):
-        if self._sent:
-            raise error.ProgrammingError(
-                b'callcommand() cannot be used after commands are sent'
-            )
-
-        if self._closed:
-            raise error.ProgrammingError(
-                b'callcommand() cannot be used after close()'
-            )
-
-        # The service advertises which commands are available. So if we attempt
-        # to call an unknown command or pass an unknown argument, we can screen
-        # for this.
-        if command not in self._descriptor[b'commands']:
-            raise error.ProgrammingError(
-                b'wire protocol command %s is not available' % command
-            )
-
-        cmdinfo = self._descriptor[b'commands'][command]
-        unknownargs = set(args.keys()) - set(cmdinfo.get(b'args', {}))
-
-        if unknownargs:
-            raise error.ProgrammingError(
-                b'wire protocol command %s does not accept argument: %s'
-                % (command, b', '.join(sorted(unknownargs)))
-            )
-
-        self._neededpermissions |= set(cmdinfo[b'permissions'])
-
-        # TODO we /could/ also validate types here, since the API descriptor
-        # includes types...
-
-        f = pycompat.futures.Future()
-
-        # Monkeypatch it so result() triggers sendcommands(), otherwise result()
-        # could deadlock.
-        f.__class__ = queuedcommandfuture
-        f._peerexecutor = self
-
-        self._futures.add(f)
-        self._calls.append((command, args, f))
-
-        return f
-
-    def sendcommands(self):
-        if self._sent:
-            return
-
-        if not self._calls:
-            return
-
-        self._sent = True
-
-        # Unhack any future types so caller sees a clean type and so we
-        # break reference cycle.
-        for f in self._futures:
-            if isinstance(f, queuedcommandfuture):
-                f.__class__ = pycompat.futures.Future
-                f._peerexecutor = None
-
-        # Mark the future as running and filter out cancelled futures.
-        calls = [
-            (command, args, f)
-            for command, args, f in self._calls
-            if f.set_running_or_notify_cancel()
-        ]
-
-        # Clear out references, prevent improper object usage.
-        self._calls = None
-
-        if not calls:
-            return
-
-        permissions = set(self._neededpermissions)
-
-        if b'push' in permissions and b'pull' in permissions:
-            permissions.remove(b'pull')
-
-        if len(permissions) > 1:
-            raise error.RepoError(
-                _(b'cannot make request requiring multiple permissions: %s')
-                % _(b', ').join(sorted(permissions))
-            )
-
-        permission = {
-            b'push': b'rw',
-            b'pull': b'ro',
-        }[permissions.pop()]
-
-        handler, resp = sendv2request(
-            self._ui,
-            self._opener,
-            self._requestbuilder,
-            self._apiurl,
-            permission,
-            calls,
-            self._redirect,
-        )
-
-        # TODO we probably want to validate the HTTP code, media type, etc.
-
-        self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
-        self._responsef = self._responseexecutor.submit(
-            self._handleresponse, handler, resp
-        )
-
-    def close(self):
-        if self._closed:
-            return
-
-        self.sendcommands()
-
-        self._closed = True
-
-        if not self._responsef:
-            return
-
-        # TODO ^C here may not result in immediate program termination.
-
-        try:
-            self._responsef.result()
-        finally:
-            self._responseexecutor.shutdown(wait=True)
-            self._responsef = None
-            self._responseexecutor = None
-
-            # If any of our futures are still in progress, mark them as
-            # errored, otherwise a result() could wait indefinitely.
-            for f in self._futures:
-                if not f.done():
-                    f.set_exception(
-                        error.ResponseError(_(b'unfulfilled command response'))
-                    )
-
-            self._futures = None
-
-    def _handleresponse(self, handler, resp):
-        # Called in a thread to read the response.
-
-        while handler.readdata(resp):
-            pass
-
-
-@interfaceutil.implementer(repository.ipeerv2)
-class httpv2peer(object):
-
-    limitedarguments = False
-
-    def __init__(
-        self, ui, repourl, apipath, opener, requestbuilder, apidescriptor
-    ):
-        self.ui = ui
-        self.apidescriptor = apidescriptor
-
-        if repourl.endswith(b'/'):
-            repourl = repourl[:-1]
-
-        self._url = repourl
-        self._apipath = apipath
-        self._apiurl = b'%s/%s' % (repourl, apipath)
-        self._opener = opener
-        self._requestbuilder = requestbuilder
-
-        self._redirect = wireprotov2peer.supportedredirects(ui, apidescriptor)
-
-    # Start of ipeerconnection.
-
-    def url(self):
-        return self._url
-
-    def local(self):
-        return None
-
-    def peer(self):
-        return self
-
-    def canpush(self):
-        # TODO change once implemented.
-        return False
-
-    def close(self):
-        self.ui.note(
-            _(
-                b'(sent %d HTTP requests and %d bytes; '
-                b'received %d bytes in responses)\n'
-            )
-            % (
-                self._opener.requestscount,
-                self._opener.sentbytescount,
-                self._opener.receivedbytescount,
-            )
-        )
-
-    # End of ipeerconnection.
-
-    # Start of ipeercapabilities.
-
-    def capable(self, name):
-        # The capabilities used internally historically map to capabilities
-        # advertised from the "capabilities" wire protocol command. However,
-        # version 2 of that command works differently.
-
-        # Maps to commands that are available.
-        if name in (
-            b'branchmap',
-            b'getbundle',
-            b'known',
-            b'lookup',
-            b'pushkey',
-        ):
-            return True
-
-        # Other concepts.
-        if name in (b'bundle2',):
-            return True
-
-        # Alias command-* to presence of command of that name.
-        if name.startswith(b'command-'):
-            return name[len(b'command-') :] in self.apidescriptor[b'commands']
-
-        return False
-
-    def requirecap(self, name, purpose):
-        if self.capable(name):
-            return
-
-        raise error.CapabilityError(
-            _(
-                b'cannot %s; client or remote repository does not support the '
-                b'\'%s\' capability'
-            )
-            % (purpose, name)
-        )
-
-    # End of ipeercapabilities.
-
-    def _call(self, name, **args):
-        with self.commandexecutor() as e:
-            return e.callcommand(name, args).result()
-
-    def commandexecutor(self):
-        return httpv2executor(
-            self.ui,
-            self._opener,
-            self._requestbuilder,
-            self._apiurl,
-            self.apidescriptor,
-            self._redirect,
-        )
-
-
-# Registry of API service names to metadata about peers that handle it.
-#
-# The following keys are meaningful:
-#
-# init
-#    Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
-#                        apidescriptor) to create a peer.
-#
-# priority
-#    Integer priority for the service. If we could choose from multiple
-#    services, we choose the one with the highest priority.
-API_PEERS = {
-    wireprototypes.HTTP_WIREPROTO_V2: {
-        b'init': httpv2peer,
-        b'priority': 50,
-    },
-}
-
-
 def performhandshake(ui, url, opener, requestbuilder):
     # The handshake is a request to the capabilities command.
 
@@ -963,28 +562,6 @@
 
     args = {}
 
-    # The client advertises support for newer protocols by adding an
-    # X-HgUpgrade-* header with a list of supported APIs and an
-    # X-HgProto-* header advertising which serializing formats it supports.
-    # We only support the HTTP version 2 transport and CBOR responses for
-    # now.
-    advertisev2 = ui.configbool(b'experimental', b'httppeer.advertise-v2')
-
-    if advertisev2:
-        args[b'headers'] = {
-            'X-HgProto-1': 'cbor',
-        }
-
-        args[b'headers'].update(
-            encodevalueinheaders(
-                b' '.join(sorted(API_PEERS)),
-                b'X-HgUpgrade',
-                # We don't know the header limit this early.
-                # So make it small.
-                1024,
-            )
-        )
-
     req, requrl, qs = makev1commandrequest(
         ui, requestbuilder, caps, capable, url, b'capabilities', args
     )
@@ -1004,7 +581,7 @@
     # redirect that drops the query string to "just work."
     try:
         respurl, ct, resp = parsev1commandresponse(
-            ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2
+            ui, url, requrl, qs, resp, compressible=False
         )
     except RedirectedRepoError as e:
         req, requrl, qs = makev1commandrequest(
@@ -1012,7 +589,7 @@
         )
         resp = sendrequest(ui, opener, req)
         respurl, ct, resp = parsev1commandresponse(
-            ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2
+            ui, url, requrl, qs, resp, compressible=False
         )
 
     try:
@@ -1023,29 +600,7 @@
     if not ct.startswith(b'application/mercurial-'):
         raise error.ProgrammingError(b'unexpected content-type: %s' % ct)
 
-    if advertisev2:
-        if ct == b'application/mercurial-cbor':
-            try:
-                info = cborutil.decodeall(rawdata)[0]
-            except cborutil.CBORDecodeError:
-                raise error.Abort(
-                    _(b'error decoding CBOR from remote server'),
-                    hint=_(
-                        b'try again and consider contacting '
-                        b'the server operator'
-                    ),
-                )
-
-        # We got a legacy response. That's fine.
-        elif ct in (b'application/mercurial-0.1', b'application/mercurial-0.2'):
-            info = {b'v1capabilities': set(rawdata.split())}
-
-        else:
-            raise error.RepoError(
-                _(b'unexpected response type from server: %s') % ct
-            )
-    else:
-        info = {b'v1capabilities': set(rawdata.split())}
+    info = {b'v1capabilities': set(rawdata.split())}
 
     return respurl, info
 
@@ -1073,29 +628,6 @@
 
     respurl, info = performhandshake(ui, url, opener, requestbuilder)
 
-    # Given the intersection of APIs that both we and the server support,
-    # sort by their advertised priority and pick the first one.
-    #
-    # TODO consider making this request-based and interface driven. For
-    # example, the caller could say "I want a peer that does X." It's quite
-    # possible that not all peers would do that. Since we know the service
-    # capabilities, we could filter out services not meeting the
-    # requirements. Possibly by consulting the interfaces defined by the
-    # peer type.
-    apipeerchoices = set(info.get(b'apis', {}).keys()) & set(API_PEERS.keys())
-
-    preferredchoices = sorted(
-        apipeerchoices, key=lambda x: API_PEERS[x][b'priority'], reverse=True
-    )
-
-    for service in preferredchoices:
-        apipath = b'%s/%s' % (info[b'apibase'].rstrip(b'/'), service)
-
-        return API_PEERS[service][b'init'](
-            ui, respurl, apipath, opener, requestbuilder, info[b'apis'][service]
-        )
-
-    # Failed to construct an API peer. Fall back to legacy.
     return httppeer(
         ui, path, respurl, opener, requestbuilder, info[b'v1capabilities']
     )