httppeer: implement command executor for version 2 peer
authorGregory Szorc <gregory.szorc@gmail.com>
Fri, 13 Apr 2018 12:30:04 -0700
changeset 37651 950294e28136
parent 37650 62ebfda864de
child 37652 fe8c6f9f2914
httppeer: implement command executor for version 2 peer Now that we have a new API for issuing commands which is compatible with wire protocol version 2, we can start using it with wire protocol version 2. This commit replaces our hacky implementation of _call() with something a bit more robust based on the new command executor interface. We now have proper support for issuing multiple commands per HTTP request. Each HTTP request maintains its own client reactor. The implementation is similar to the one in the legacy wire protocol. We use a ThreadPoolExecutor for spinning up a thread to read the HTTP response in the background. This allows responses to resolve in any order. While not implemented on the server yet, a client could use concurrent.futures.as_completed() with a collection of futures and handle responses as they arrive from the server. The return value from issued commands is still a simple list of raw or decoded CBOR data. This is still super hacky. We will want a rich data type for representing command responses. But at least this commit gets us one step closer to a proper peer implementation. Differential Revision: https://phab.mercurial-scm.org/D3297
mercurial/httppeer.py
--- a/mercurial/httppeer.py	Fri Apr 13 11:54:13 2018 -0700
+++ b/mercurial/httppeer.py	Fri Apr 13 12:30:04 2018 -0700
@@ -13,7 +13,9 @@
 import os
 import socket
 import struct
+import sys
 import tempfile
+import weakref
 
 from .i18n import _
 from .thirdparty import (
@@ -31,7 +33,6 @@
     statichttprepo,
     url as urlmod,
     util,
-    wireproto,
     wireprotoframing,
     wireprototypes,
     wireprotov1peer,
@@ -517,8 +518,262 @@
     def _abort(self, exception):
         raise exception
 
+def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
+    reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
+                                             buffersends=True)
+
+    url = '%s/%s' % (apiurl, permission)
+
+    if len(requests) > 1:
+        url += '/multirequest'
+    else:
+        url += '/%s' % requests[0][0]
+
+    # Request ID to (request, future)
+    requestmap = {}
+
+    for command, args, f in requests:
+        request, action, meta = reactor.callcommand(command, args)
+        assert action == 'noop'
+
+        requestmap[request.requestid] = (request, f)
+
+    action, meta = reactor.flushcommands()
+    assert action == 'sendframes'
+
+    # TODO stream this.
+    body = b''.join(map(bytes, meta['framegen']))
+
+    # TODO modify user-agent to reflect v2
+    headers = {
+        r'Accept': wireprotov2server.FRAMINGTYPE,
+        r'Content-Type': wireprotov2server.FRAMINGTYPE,
+    }
+
+    req = requestbuilder(pycompat.strurl(url), body, headers)
+    req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
+
+    try:
+        res = opener.open(req)
+    except urlerr.httperror as e:
+        if e.code == 401:
+            raise error.Abort(_('authorization failed'))
+
+        raise
+    except httplib.HTTPException as e:
+        ui.traceback()
+        raise IOError(None, e)
+
+    return reactor, requestmap, res
+
+class queuedcommandfuture(pycompat.futures.Future):
+    """Wraps result() on command futures to trigger submission on call."""
+
+    def result(self, timeout=None):
+        if self.done():
+            return pycompat.futures.Future.result(self, timeout)
+
+        self._peerexecutor.sendcommands()
+
+        # sendcommands() will restore the original __class__ and self.result
+        # will resolve to Future.result.
+        return self.result(timeout)
+
+@zi.implementer(repository.ipeercommandexecutor)
+class httpv2executor(object):
+    def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
+        self._ui = ui
+        self._opener = opener
+        self._requestbuilder = requestbuilder
+        self._apiurl = apiurl
+        self._descriptor = descriptor
+        self._sent = False
+        self._closed = False
+        self._neededpermissions = set()
+        self._calls = []
+        self._futures = weakref.WeakSet()
+        self._responseexecutor = None
+        self._responsef = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exctype, excvalue, exctb):
+        self.close()
+
+    def callcommand(self, command, args):
+        if self._sent:
+            raise error.ProgrammingError('callcommand() cannot be used after '
+                                         'commands are sent')
+
+        if self._closed:
+            raise error.ProgrammingError('callcommand() cannot be used after '
+                                         'close()')
+
+        # The service advertises which commands are available. So if we attempt
+        # to call an unknown command or pass an unknown argument, we can screen
+        # for this.
+        if command not in self._descriptor['commands']:
+            raise error.ProgrammingError(
+                'wire protocol command %s is not available' % command)
+
+        cmdinfo = self._descriptor['commands'][command]
+        unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
+
+        if unknownargs:
+            raise error.ProgrammingError(
+                'wire protocol command %s does not accept argument: %s' % (
+                    command, ', '.join(sorted(unknownargs))))
+
+        self._neededpermissions |= set(cmdinfo['permissions'])
+
+        # TODO we /could/ also validate types here, since the API descriptor
+        # includes types...
+
+        f = pycompat.futures.Future()
+
+        # Monkeypatch it so result() triggers sendcommands(), otherwise result()
+        # could deadlock.
+        f.__class__ = queuedcommandfuture
+        f._peerexecutor = self
+
+        self._futures.add(f)
+        self._calls.append((command, args, f))
+
+        return f
+
+    def sendcommands(self):
+        if self._sent:
+            return
+
+        if not self._calls:
+            return
+
+        self._sent = True
+
+        # Unhack any future types so caller sees a clean type and so we
+        # break reference cycle.
+        for f in self._futures:
+            if isinstance(f, queuedcommandfuture):
+                f.__class__ = pycompat.futures.Future
+                f._peerexecutor = None
+
+        # Mark the future as running and filter out cancelled futures.
+        calls = [(command, args, f)
+                 for command, args, f in self._calls
+                 if f.set_running_or_notify_cancel()]
+
+        # Clear out references, prevent improper object usage.
+        self._calls = None
+
+        if not calls:
+            return
+
+        permissions = set(self._neededpermissions)
+
+        if 'push' in permissions and 'pull' in permissions:
+            permissions.remove('pull')
+
+        if len(permissions) > 1:
+            raise error.RepoError(_('cannot make request requiring multiple '
+                                    'permissions: %s') %
+                                  _(', ').join(sorted(permissions)))
+
+        permission = {
+            'push': 'rw',
+            'pull': 'ro',
+        }[permissions.pop()]
+
+        reactor, requests, resp = sendv2request(
+            self._ui, self._opener, self._requestbuilder, self._apiurl,
+            permission, calls)
+
+        # TODO we probably want to validate the HTTP code, media type, etc.
+
+        self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
+        self._responsef = self._responseexecutor.submit(self._handleresponse,
+                                                        reactor,
+                                                        requests,
+                                                        resp)
+
+    def close(self):
+        if self._closed:
+            return
+
+        self.sendcommands()
+
+        self._closed = True
+
+        if not self._responsef:
+            return
+
+        try:
+            self._responsef.result()
+        finally:
+            self._responseexecutor.shutdown(wait=True)
+            self._responsef = None
+            self._responseexecutor = None
+
+            # If any of our futures are still in progress, mark them as
+            # errored, otherwise a result() could wait indefinitely.
+            for f in self._futures:
+                if not f.done():
+                    f.set_exception(error.ResponseError(
+                        _('unfulfilled command response')))
+
+            self._futures = None
+
+    def _handleresponse(self, reactor, requests, resp):
+        # Called in a thread to read the response.
+
+        results = {k: [] for k in requests}
+
+        while True:
+            frame = wireprotoframing.readframe(resp)
+            if frame is None:
+                break
+
+            self._ui.note(_('received %r\n') % frame)
+
+            # Guard against receiving a frame with a request ID that we
+            # didn't issue. This should never happen.
+            request, f = requests.get(frame.requestid, [None, None])
+
+            action, meta = reactor.onframerecv(frame)
+
+            if action == 'responsedata':
+                assert request.requestid == meta['request'].requestid
+
+                result = results[request.requestid]
+
+                if meta['cbor']:
+                    payload = util.bytesio(meta['data'])
+
+                    decoder = cbor.CBORDecoder(payload)
+                    while payload.tell() + 1 < len(meta['data']):
+                        try:
+                            result.append(decoder.decode())
+                        except Exception:
+                            f.set_exception_info(*sys.exc_info()[1:])
+                            continue
+                else:
+                    result.append(meta['data'])
+
+                if meta['eos']:
+                    f.set_result(result)
+                    del results[request.requestid]
+
+            else:
+                e = error.ProgrammingError('unhandled action: %s' % action)
+
+                if f:
+                    f.set_exception(e)
+                else:
+                    raise e
+
 # TODO implement interface for version 2 peers
-@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities)
+@zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,
+                repository.ipeerrequests)
 class httpv2peer(object):
     def __init__(self, ui, repourl, apipath, opener, requestbuilder,
                  apidescriptor):
@@ -529,6 +784,7 @@
 
         self._url = repourl
         self._apipath = apipath
+        self._apiurl = '%s/%s' % (repourl, apipath)
         self._opener = opener
         self._requestbuilder = requestbuilder
         self._descriptor = apidescriptor
@@ -580,85 +836,13 @@
 
     # End of ipeercapabilities.
 
-    # TODO require to be part of a batched primitive, use futures.
     def _call(self, name, **args):
-        """Call a wire protocol command with arguments."""
-
-        # Having this early has a side-effect of importing wireprotov2server,
-        # which has the side-effect of ensuring commands are registered.
-
-        # TODO modify user-agent to reflect v2.
-        headers = {
-            r'Accept': wireprotov2server.FRAMINGTYPE,
-            r'Content-Type': wireprotov2server.FRAMINGTYPE,
-        }
-
-        # TODO permissions should come from capabilities results.
-        permission = wireproto.commandsv2[name].permission
-        if permission not in ('push', 'pull'):
-            raise error.ProgrammingError('unknown permission type: %s' %
-                                         permission)
-
-        permission = {
-            'push': 'rw',
-            'pull': 'ro',
-        }[permission]
-
-        url = '%s/%s/%s/%s' % (self._url, self._apipath, permission, name)
-
-        # TODO this should be part of a generic peer for the frame-based
-        # protocol.
-        reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
-                                                 buffersends=True)
-
-        request, action, meta = reactor.callcommand(name, args)
-        assert action == 'noop'
-
-        action, meta = reactor.flushcommands()
-        assert action == 'sendframes'
+        with self.commandexecutor() as e:
+            return e.callcommand(name, args).result()
 
-        body = b''.join(map(bytes, meta['framegen']))
-        req = self._requestbuilder(pycompat.strurl(url), body, headers)
-        req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
-
-        # TODO unify this code with httppeer.
-        try:
-            res = self._opener.open(req)
-        except urlerr.httperror as e:
-            if e.code == 401:
-                raise error.Abort(_('authorization failed'))
-
-            raise
-        except httplib.HTTPException as e:
-            self.ui.traceback()
-            raise IOError(None, e)
-
-        # TODO validate response type, wrap response to handle I/O errors.
-        # TODO more robust frame receiver.
-        results = []
-
-        while True:
-            frame = wireprotoframing.readframe(res)
-            if frame is None:
-                break
-
-            self.ui.note(_('received %r\n') % frame)
-
-            action, meta = reactor.onframerecv(frame)
-
-            if action == 'responsedata':
-                if meta['cbor']:
-                    payload = util.bytesio(meta['data'])
-
-                    decoder = cbor.CBORDecoder(payload)
-                    while payload.tell() + 1 < len(meta['data']):
-                        results.append(decoder.decode())
-                else:
-                    results.append(meta['data'])
-            else:
-                error.ProgrammingError('unhandled action: %s' % action)
-
-        return results
+    def commandexecutor(self):
+        return httpv2executor(self.ui, self._opener, self._requestbuilder,
+                              self._apiurl, self._descriptor)
 
 # Registry of API service names to metadata about peers that handle it.
 #