--- a/mercurial/httppeer.py Thu Dec 30 13:25:44 2021 +0100
+++ b/mercurial/httppeer.py Tue Dec 07 16:44:22 2021 +0100
@@ -13,7 +13,6 @@
import os
import socket
import struct
-import weakref
from .i18n import _
from .pycompat import getattr
@@ -25,21 +24,9 @@
statichttprepo,
url as urlmod,
util,
- wireprotoframing,
- wireprototypes,
wireprotov1peer,
- wireprotov2peer,
- wireprotov2server,
)
-from .interfaces import (
- repository,
- util as interfaceutil,
-)
-from .utils import (
- cborutil,
- stringutil,
- urlutil,
-)
+from .utils import urlutil
httplib = util.httplib
urlerr = util.urlerr
@@ -331,9 +318,7 @@
self.respurl = respurl
-def parsev1commandresponse(
- ui, baseurl, requrl, qs, resp, compressible, allowcbor=False
-):
+def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible):
# record the url we got redirected to
redirected = False
respurl = pycompat.bytesurl(resp.geturl())
@@ -376,17 +361,6 @@
try:
subtype = proto.split(b'-', 1)[1]
- # Unless we end up supporting CBOR in the legacy wire protocol,
- # this should ONLY be encountered for the initial capabilities
- # request during handshake.
- if subtype == b'cbor':
- if allowcbor:
- return respurl, proto, resp
- else:
- raise error.RepoError(
- _(b'unexpected CBOR response from server')
- )
-
version_info = tuple([int(n) for n in subtype.split(b'.')])
except ValueError:
raise error.RepoError(
@@ -564,85 +538,6 @@
raise exception
-def sendv2request(
- ui, opener, requestbuilder, apiurl, permission, requests, redirect
-):
- wireprotoframing.populatestreamencoders()
-
- uiencoders = ui.configlist(b'experimental', b'httppeer.v2-encoder-order')
-
- if uiencoders:
- encoders = []
-
- for encoder in uiencoders:
- if encoder not in wireprotoframing.STREAM_ENCODERS:
- ui.warn(
- _(
- b'wire protocol version 2 encoder referenced in '
- b'config (%s) is not known; ignoring\n'
- )
- % encoder
- )
- else:
- encoders.append(encoder)
-
- else:
- encoders = wireprotoframing.STREAM_ENCODERS_ORDER
-
- reactor = wireprotoframing.clientreactor(
- ui,
- hasmultiplesend=False,
- buffersends=True,
- clientcontentencoders=encoders,
- )
-
- handler = wireprotov2peer.clienthandler(
- ui, reactor, opener=opener, requestbuilder=requestbuilder
- )
-
- url = b'%s/%s' % (apiurl, permission)
-
- if len(requests) > 1:
- url += b'/multirequest'
- else:
- url += b'/%s' % requests[0][0]
-
- ui.debug(b'sending %d commands\n' % len(requests))
- for command, args, f in requests:
- ui.debug(
- b'sending command %s: %s\n'
- % (command, stringutil.pprint(args, indent=2))
- )
- assert not list(
- handler.callcommand(command, args, f, redirect=redirect)
- )
-
- # TODO stream this.
- body = b''.join(map(bytes, handler.flushcommands()))
-
- # TODO modify user-agent to reflect v2
- headers = {
- 'Accept': wireprotov2server.FRAMINGTYPE,
- 'Content-Type': wireprotov2server.FRAMINGTYPE,
- }
-
- req = requestbuilder(pycompat.strurl(url), body, headers)
- req.add_unredirected_header('Content-Length', '%d' % len(body))
-
- try:
- res = opener.open(req)
- except urlerr.httperror as e:
- if e.code == 401:
- raise error.Abort(_(b'authorization failed'))
-
- raise
- except httplib.HTTPException as e:
- ui.traceback()
- raise IOError(None, e)
-
- return handler, res
-
-
class queuedcommandfuture(pycompat.futures.Future):
"""Wraps result() on command futures to trigger submission on call."""
@@ -657,302 +552,6 @@
return self.result(timeout)
-@interfaceutil.implementer(repository.ipeercommandexecutor)
-class httpv2executor(object):
- def __init__(
- self, ui, opener, requestbuilder, apiurl, descriptor, redirect
- ):
- self._ui = ui
- self._opener = opener
- self._requestbuilder = requestbuilder
- self._apiurl = apiurl
- self._descriptor = descriptor
- self._redirect = redirect
- self._sent = False
- self._closed = False
- self._neededpermissions = set()
- self._calls = []
- self._futures = weakref.WeakSet()
- self._responseexecutor = None
- self._responsef = None
-
- def __enter__(self):
- return self
-
- def __exit__(self, exctype, excvalue, exctb):
- self.close()
-
- def callcommand(self, command, args):
- if self._sent:
- raise error.ProgrammingError(
- b'callcommand() cannot be used after commands are sent'
- )
-
- if self._closed:
- raise error.ProgrammingError(
- b'callcommand() cannot be used after close()'
- )
-
- # The service advertises which commands are available. So if we attempt
- # to call an unknown command or pass an unknown argument, we can screen
- # for this.
- if command not in self._descriptor[b'commands']:
- raise error.ProgrammingError(
- b'wire protocol command %s is not available' % command
- )
-
- cmdinfo = self._descriptor[b'commands'][command]
- unknownargs = set(args.keys()) - set(cmdinfo.get(b'args', {}))
-
- if unknownargs:
- raise error.ProgrammingError(
- b'wire protocol command %s does not accept argument: %s'
- % (command, b', '.join(sorted(unknownargs)))
- )
-
- self._neededpermissions |= set(cmdinfo[b'permissions'])
-
- # TODO we /could/ also validate types here, since the API descriptor
- # includes types...
-
- f = pycompat.futures.Future()
-
- # Monkeypatch it so result() triggers sendcommands(), otherwise result()
- # could deadlock.
- f.__class__ = queuedcommandfuture
- f._peerexecutor = self
-
- self._futures.add(f)
- self._calls.append((command, args, f))
-
- return f
-
- def sendcommands(self):
- if self._sent:
- return
-
- if not self._calls:
- return
-
- self._sent = True
-
- # Unhack any future types so caller sees a clean type and so we
- # break reference cycle.
- for f in self._futures:
- if isinstance(f, queuedcommandfuture):
- f.__class__ = pycompat.futures.Future
- f._peerexecutor = None
-
- # Mark the future as running and filter out cancelled futures.
- calls = [
- (command, args, f)
- for command, args, f in self._calls
- if f.set_running_or_notify_cancel()
- ]
-
- # Clear out references, prevent improper object usage.
- self._calls = None
-
- if not calls:
- return
-
- permissions = set(self._neededpermissions)
-
- if b'push' in permissions and b'pull' in permissions:
- permissions.remove(b'pull')
-
- if len(permissions) > 1:
- raise error.RepoError(
- _(b'cannot make request requiring multiple permissions: %s')
- % _(b', ').join(sorted(permissions))
- )
-
- permission = {
- b'push': b'rw',
- b'pull': b'ro',
- }[permissions.pop()]
-
- handler, resp = sendv2request(
- self._ui,
- self._opener,
- self._requestbuilder,
- self._apiurl,
- permission,
- calls,
- self._redirect,
- )
-
- # TODO we probably want to validate the HTTP code, media type, etc.
-
- self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
- self._responsef = self._responseexecutor.submit(
- self._handleresponse, handler, resp
- )
-
- def close(self):
- if self._closed:
- return
-
- self.sendcommands()
-
- self._closed = True
-
- if not self._responsef:
- return
-
- # TODO ^C here may not result in immediate program termination.
-
- try:
- self._responsef.result()
- finally:
- self._responseexecutor.shutdown(wait=True)
- self._responsef = None
- self._responseexecutor = None
-
- # If any of our futures are still in progress, mark them as
- # errored, otherwise a result() could wait indefinitely.
- for f in self._futures:
- if not f.done():
- f.set_exception(
- error.ResponseError(_(b'unfulfilled command response'))
- )
-
- self._futures = None
-
- def _handleresponse(self, handler, resp):
- # Called in a thread to read the response.
-
- while handler.readdata(resp):
- pass
-
-
-@interfaceutil.implementer(repository.ipeerv2)
-class httpv2peer(object):
-
- limitedarguments = False
-
- def __init__(
- self, ui, repourl, apipath, opener, requestbuilder, apidescriptor
- ):
- self.ui = ui
- self.apidescriptor = apidescriptor
-
- if repourl.endswith(b'/'):
- repourl = repourl[:-1]
-
- self._url = repourl
- self._apipath = apipath
- self._apiurl = b'%s/%s' % (repourl, apipath)
- self._opener = opener
- self._requestbuilder = requestbuilder
-
- self._redirect = wireprotov2peer.supportedredirects(ui, apidescriptor)
-
- # Start of ipeerconnection.
-
- def url(self):
- return self._url
-
- def local(self):
- return None
-
- def peer(self):
- return self
-
- def canpush(self):
- # TODO change once implemented.
- return False
-
- def close(self):
- self.ui.note(
- _(
- b'(sent %d HTTP requests and %d bytes; '
- b'received %d bytes in responses)\n'
- )
- % (
- self._opener.requestscount,
- self._opener.sentbytescount,
- self._opener.receivedbytescount,
- )
- )
-
- # End of ipeerconnection.
-
- # Start of ipeercapabilities.
-
- def capable(self, name):
- # The capabilities used internally historically map to capabilities
- # advertised from the "capabilities" wire protocol command. However,
- # version 2 of that command works differently.
-
- # Maps to commands that are available.
- if name in (
- b'branchmap',
- b'getbundle',
- b'known',
- b'lookup',
- b'pushkey',
- ):
- return True
-
- # Other concepts.
- if name in (b'bundle2',):
- return True
-
- # Alias command-* to presence of command of that name.
- if name.startswith(b'command-'):
- return name[len(b'command-') :] in self.apidescriptor[b'commands']
-
- return False
-
- def requirecap(self, name, purpose):
- if self.capable(name):
- return
-
- raise error.CapabilityError(
- _(
- b'cannot %s; client or remote repository does not support the '
- b'\'%s\' capability'
- )
- % (purpose, name)
- )
-
- # End of ipeercapabilities.
-
- def _call(self, name, **args):
- with self.commandexecutor() as e:
- return e.callcommand(name, args).result()
-
- def commandexecutor(self):
- return httpv2executor(
- self.ui,
- self._opener,
- self._requestbuilder,
- self._apiurl,
- self.apidescriptor,
- self._redirect,
- )
-
-
-# Registry of API service names to metadata about peers that handle it.
-#
-# The following keys are meaningful:
-#
-# init
-# Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
-# apidescriptor) to create a peer.
-#
-# priority
-# Integer priority for the service. If we could choose from multiple
-# services, we choose the one with the highest priority.
-API_PEERS = {
- wireprototypes.HTTP_WIREPROTO_V2: {
- b'init': httpv2peer,
- b'priority': 50,
- },
-}
-
-
def performhandshake(ui, url, opener, requestbuilder):
# The handshake is a request to the capabilities command.
@@ -963,28 +562,6 @@
args = {}
- # The client advertises support for newer protocols by adding an
- # X-HgUpgrade-* header with a list of supported APIs and an
- # X-HgProto-* header advertising which serializing formats it supports.
- # We only support the HTTP version 2 transport and CBOR responses for
- # now.
- advertisev2 = ui.configbool(b'experimental', b'httppeer.advertise-v2')
-
- if advertisev2:
- args[b'headers'] = {
- 'X-HgProto-1': 'cbor',
- }
-
- args[b'headers'].update(
- encodevalueinheaders(
- b' '.join(sorted(API_PEERS)),
- b'X-HgUpgrade',
- # We don't know the header limit this early.
- # So make it small.
- 1024,
- )
- )
-
req, requrl, qs = makev1commandrequest(
ui, requestbuilder, caps, capable, url, b'capabilities', args
)
@@ -1004,7 +581,7 @@
# redirect that drops the query string to "just work."
try:
respurl, ct, resp = parsev1commandresponse(
- ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2
+ ui, url, requrl, qs, resp, compressible=False
)
except RedirectedRepoError as e:
req, requrl, qs = makev1commandrequest(
@@ -1012,7 +589,7 @@
)
resp = sendrequest(ui, opener, req)
respurl, ct, resp = parsev1commandresponse(
- ui, url, requrl, qs, resp, compressible=False, allowcbor=advertisev2
+ ui, url, requrl, qs, resp, compressible=False
)
try:
@@ -1023,29 +600,7 @@
if not ct.startswith(b'application/mercurial-'):
raise error.ProgrammingError(b'unexpected content-type: %s' % ct)
- if advertisev2:
- if ct == b'application/mercurial-cbor':
- try:
- info = cborutil.decodeall(rawdata)[0]
- except cborutil.CBORDecodeError:
- raise error.Abort(
- _(b'error decoding CBOR from remote server'),
- hint=_(
- b'try again and consider contacting '
- b'the server operator'
- ),
- )
-
- # We got a legacy response. That's fine.
- elif ct in (b'application/mercurial-0.1', b'application/mercurial-0.2'):
- info = {b'v1capabilities': set(rawdata.split())}
-
- else:
- raise error.RepoError(
- _(b'unexpected response type from server: %s') % ct
- )
- else:
- info = {b'v1capabilities': set(rawdata.split())}
+ info = {b'v1capabilities': set(rawdata.split())}
return respurl, info
@@ -1073,29 +628,6 @@
respurl, info = performhandshake(ui, url, opener, requestbuilder)
- # Given the intersection of APIs that both we and the server support,
- # sort by their advertised priority and pick the first one.
- #
- # TODO consider making this request-based and interface driven. For
- # example, the caller could say "I want a peer that does X." It's quite
- # possible that not all peers would do that. Since we know the service
- # capabilities, we could filter out services not meeting the
- # requirements. Possibly by consulting the interfaces defined by the
- # peer type.
- apipeerchoices = set(info.get(b'apis', {}).keys()) & set(API_PEERS.keys())
-
- preferredchoices = sorted(
- apipeerchoices, key=lambda x: API_PEERS[x][b'priority'], reverse=True
- )
-
- for service in preferredchoices:
- apipath = b'%s/%s' % (info[b'apibase'].rstrip(b'/'), service)
-
- return API_PEERS[service][b'init'](
- ui, respurl, apipath, opener, requestbuilder, info[b'apis'][service]
- )
-
- # Failed to construct an API peer. Fall back to legacy.
return httppeer(
ui, path, respurl, opener, requestbuilder, info[b'v1capabilities']
)