--- a/mercurial/httppeer.py Fri Apr 13 11:54:13 2018 -0700
+++ b/mercurial/httppeer.py Fri Apr 13 12:30:04 2018 -0700
@@ -13,7 +13,9 @@
import os
import socket
import struct
+import sys
import tempfile
+import weakref
from .i18n import _
from .thirdparty import (
@@ -31,7 +33,6 @@
statichttprepo,
url as urlmod,
util,
- wireproto,
wireprotoframing,
wireprototypes,
wireprotov1peer,
@@ -517,8 +518,262 @@
def _abort(self, exception):
raise exception
+def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
+ reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
+ buffersends=True)
+
+ url = '%s/%s' % (apiurl, permission)
+
+ if len(requests) > 1:
+ url += '/multirequest'
+ else:
+ url += '/%s' % requests[0][0]
+
+ # Request ID to (request, future)
+ requestmap = {}
+
+ for command, args, f in requests:
+ request, action, meta = reactor.callcommand(command, args)
+ assert action == 'noop'
+
+ requestmap[request.requestid] = (request, f)
+
+ action, meta = reactor.flushcommands()
+ assert action == 'sendframes'
+
+ # TODO stream this.
+ body = b''.join(map(bytes, meta['framegen']))
+
+ # TODO modify user-agent to reflect v2
+ headers = {
+ r'Accept': wireprotov2server.FRAMINGTYPE,
+ r'Content-Type': wireprotov2server.FRAMINGTYPE,
+ }
+
+ req = requestbuilder(pycompat.strurl(url), body, headers)
+ req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
+
+ try:
+ res = opener.open(req)
+ except urlerr.httperror as e:
+ if e.code == 401:
+ raise error.Abort(_('authorization failed'))
+
+ raise
+ except httplib.HTTPException as e:
+ ui.traceback()
+ raise IOError(None, e)
+
+ return reactor, requestmap, res
+
+class queuedcommandfuture(pycompat.futures.Future):
+ """Wraps result() on command futures to trigger submission on call."""
+
+ def result(self, timeout=None):
+ if self.done():
+ return pycompat.futures.Future.result(self, timeout)
+
+ self._peerexecutor.sendcommands()
+
+ # sendcommands() will restore the original __class__ and self.result
+ # will resolve to Future.result.
+ return self.result(timeout)
+
+@zi.implementer(repository.ipeercommandexecutor)
+class httpv2executor(object):
+ def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
+ self._ui = ui
+ self._opener = opener
+ self._requestbuilder = requestbuilder
+ self._apiurl = apiurl
+ self._descriptor = descriptor
+ self._sent = False
+ self._closed = False
+ self._neededpermissions = set()
+ self._calls = []
+ self._futures = weakref.WeakSet()
+ self._responseexecutor = None
+ self._responsef = None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exctype, excvalue, exctb):
+ self.close()
+
+ def callcommand(self, command, args):
+ if self._sent:
+ raise error.ProgrammingError('callcommand() cannot be used after '
+ 'commands are sent')
+
+ if self._closed:
+ raise error.ProgrammingError('callcommand() cannot be used after '
+ 'close()')
+
+ # The service advertises which commands are available. So if we attempt
+ # to call an unknown command or pass an unknown argument, we can screen
+ # for this.
+ if command not in self._descriptor['commands']:
+ raise error.ProgrammingError(
+ 'wire protocol command %s is not available' % command)
+
+ cmdinfo = self._descriptor['commands'][command]
+ unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
+
+ if unknownargs:
+ raise error.ProgrammingError(
+ 'wire protocol command %s does not accept argument: %s' % (
+ command, ', '.join(sorted(unknownargs))))
+
+ self._neededpermissions |= set(cmdinfo['permissions'])
+
+ # TODO we /could/ also validate types here, since the API descriptor
+ # includes types...
+
+ f = pycompat.futures.Future()
+
+ # Monkeypatch it so result() triggers sendcommands(), otherwise result()
+ # could deadlock.
+ f.__class__ = queuedcommandfuture
+ f._peerexecutor = self
+
+ self._futures.add(f)
+ self._calls.append((command, args, f))
+
+ return f
+
+ def sendcommands(self):
+ if self._sent:
+ return
+
+ if not self._calls:
+ return
+
+ self._sent = True
+
+ # Unhack any future types so caller sees a clean type and so we
+ # break reference cycle.
+ for f in self._futures:
+ if isinstance(f, queuedcommandfuture):
+ f.__class__ = pycompat.futures.Future
+ f._peerexecutor = None
+
+ # Mark the future as running and filter out cancelled futures.
+ calls = [(command, args, f)
+ for command, args, f in self._calls
+ if f.set_running_or_notify_cancel()]
+
+ # Clear out references, prevent improper object usage.
+ self._calls = None
+
+ if not calls:
+ return
+
+ permissions = set(self._neededpermissions)
+
+ if 'push' in permissions and 'pull' in permissions:
+ permissions.remove('pull')
+
+ if len(permissions) > 1:
+ raise error.RepoError(_('cannot make request requiring multiple '
+ 'permissions: %s') %
+ _(', ').join(sorted(permissions)))
+
+ permission = {
+ 'push': 'rw',
+ 'pull': 'ro',
+ }[permissions.pop()]
+
+ reactor, requests, resp = sendv2request(
+ self._ui, self._opener, self._requestbuilder, self._apiurl,
+ permission, calls)
+
+ # TODO we probably want to validate the HTTP code, media type, etc.
+
+ self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
+ self._responsef = self._responseexecutor.submit(self._handleresponse,
+ reactor,
+ requests,
+ resp)
+
+ def close(self):
+ if self._closed:
+ return
+
+ self.sendcommands()
+
+ self._closed = True
+
+ if not self._responsef:
+ return
+
+ try:
+ self._responsef.result()
+ finally:
+ self._responseexecutor.shutdown(wait=True)
+ self._responsef = None
+ self._responseexecutor = None
+
+ # If any of our futures are still in progress, mark them as
+ # errored, otherwise a result() could wait indefinitely.
+ for f in self._futures:
+ if not f.done():
+ f.set_exception(error.ResponseError(
+ _('unfulfilled command response')))
+
+ self._futures = None
+
+ def _handleresponse(self, reactor, requests, resp):
+ # Called in a thread to read the response.
+
+ results = {k: [] for k in requests}
+
+ while True:
+ frame = wireprotoframing.readframe(resp)
+ if frame is None:
+ break
+
+ self._ui.note(_('received %r\n') % frame)
+
+ # Guard against receiving a frame with a request ID that we
+ # didn't issue. This should never happen.
+ request, f = requests.get(frame.requestid, [None, None])
+
+ action, meta = reactor.onframerecv(frame)
+
+ if action == 'responsedata':
+ assert request.requestid == meta['request'].requestid
+
+ result = results[request.requestid]
+
+ if meta['cbor']:
+ payload = util.bytesio(meta['data'])
+
+ decoder = cbor.CBORDecoder(payload)
+ while payload.tell() + 1 < len(meta['data']):
+ try:
+ result.append(decoder.decode())
+ except Exception:
+ f.set_exception_info(*sys.exc_info()[1:])
+ continue
+ else:
+ result.append(meta['data'])
+
+ if meta['eos']:
+ f.set_result(result)
+ del results[request.requestid]
+
+ else:
+ e = error.ProgrammingError('unhandled action: %s' % action)
+
+ if f:
+ f.set_exception(e)
+ else:
+ raise e
+
# TODO implement interface for version 2 peers
-@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities)
+@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
+ repository.ipeerrequests)
class httpv2peer(object):
def __init__(self, ui, repourl, apipath, opener, requestbuilder,
apidescriptor):
@@ -529,6 +784,7 @@
self._url = repourl
self._apipath = apipath
+ self._apiurl = '%s/%s' % (repourl, apipath)
self._opener = opener
self._requestbuilder = requestbuilder
self._descriptor = apidescriptor
@@ -580,85 +836,13 @@
# End of ipeercapabilities.
- # TODO require to be part of a batched primitive, use futures.
def _call(self, name, **args):
- """Call a wire protocol command with arguments."""
-
- # Having this early has a side-effect of importing wireprotov2server,
- # which has the side-effect of ensuring commands are registered.
-
- # TODO modify user-agent to reflect v2.
- headers = {
- r'Accept': wireprotov2server.FRAMINGTYPE,
- r'Content-Type': wireprotov2server.FRAMINGTYPE,
- }
-
- # TODO permissions should come from capabilities results.
- permission = wireproto.commandsv2[name].permission
- if permission not in ('push', 'pull'):
- raise error.ProgrammingError('unknown permission type: %s' %
- permission)
-
- permission = {
- 'push': 'rw',
- 'pull': 'ro',
- }[permission]
-
- url = '%s/%s/%s/%s' % (self._url, self._apipath, permission, name)
-
- # TODO this should be part of a generic peer for the frame-based
- # protocol.
- reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
- buffersends=True)
-
- request, action, meta = reactor.callcommand(name, args)
- assert action == 'noop'
-
- action, meta = reactor.flushcommands()
- assert action == 'sendframes'
+ with self.commandexecutor() as e:
+ return e.callcommand(name, args).result()
- body = b''.join(map(bytes, meta['framegen']))
- req = self._requestbuilder(pycompat.strurl(url), body, headers)
- req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
-
- # TODO unify this code with httppeer.
- try:
- res = self._opener.open(req)
- except urlerr.httperror as e:
- if e.code == 401:
- raise error.Abort(_('authorization failed'))
-
- raise
- except httplib.HTTPException as e:
- self.ui.traceback()
- raise IOError(None, e)
-
- # TODO validate response type, wrap response to handle I/O errors.
- # TODO more robust frame receiver.
- results = []
-
- while True:
- frame = wireprotoframing.readframe(res)
- if frame is None:
- break
-
- self.ui.note(_('received %r\n') % frame)
-
- action, meta = reactor.onframerecv(frame)
-
- if action == 'responsedata':
- if meta['cbor']:
- payload = util.bytesio(meta['data'])
-
- decoder = cbor.CBORDecoder(payload)
- while payload.tell() + 1 < len(meta['data']):
- results.append(decoder.decode())
- else:
- results.append(meta['data'])
- else:
- error.ProgrammingError('unhandled action: %s' % action)
-
- return results
+ def commandexecutor(self):
+ return httpv2executor(self.ui, self._opener, self._requestbuilder,
+ self._apiurl, self._descriptor)
# Registry of API service names to metadata about peers that handle it.
#