changeset 40026:7e807b8a9e56

wireprotov2: client support for following content redirects And with the server actually sending content redirects, it is finally time to implement client support for following them! When a redirect response is seen, we wait until all data for that request has been received (it should be nearly immediate since no data is expected to follow the redirect message). Then we use a URL opener to make a request. We stuff that response into the client handler and construct a new response object to track it. When readdata() is called for servicing requests, we attempt to read data from the first redirected response. During data reading, data is processed similarly to as if it came from a frame payload. The existing test for the functionality demonstrates the client transparently following the redirect and obtaining the command response data from an alternate URL! There is still plenty of work to do here, including shoring up testing. I'm not convinced things will work in the presence of multiple redirect responses. And we don't yet implement support for integrity verification or configuring server certificates to validate the connection. But it's a start. And it should enable us to start experimenting with "real" caches. Differential Revision: https://phab.mercurial-scm.org/D4778
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 26 Sep 2018 18:08:08 -0700
parents b099e6032f38
children 83146d176c03
files mercurial/httppeer.py mercurial/wireprotov2peer.py tests/test-wireproto-content-redirects.t
diffstat 3 files changed, 161 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/httppeer.py	Wed Sep 26 18:07:55 2018 -0700
+++ b/mercurial/httppeer.py	Wed Sep 26 18:08:08 2018 -0700
@@ -513,7 +513,9 @@
     reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
                                              buffersends=True)
 
-    handler = wireprotov2peer.clienthandler(ui, reactor)
+    handler = wireprotov2peer.clienthandler(ui, reactor,
+                                            opener=opener,
+                                            requestbuilder=requestbuilder)
 
     url = '%s/%s' % (apiurl, permission)
 
--- a/mercurial/wireprotov2peer.py	Wed Sep 26 18:07:55 2018 -0700
+++ b/mercurial/wireprotov2peer.py	Wed Sep 26 18:08:08 2018 -0700
@@ -13,8 +13,12 @@
 from . import (
     encoding,
     error,
+    pycompat,
     sslutil,
+    url as urlmod,
+    util,
     wireprotoframing,
+    wireprototypes,
 )
 from .utils import (
     cborutil,
@@ -112,9 +116,10 @@
     events occur.
     """
 
-    def __init__(self, requestid, command):
+    def __init__(self, requestid, command, fromredirect=False):
         self.requestid = requestid
         self.command = command
+        self.fromredirect = fromredirect
 
         # Whether all remote input related to this command has been
         # received.
@@ -132,6 +137,7 @@
         self._pendingevents = []
         self._decoder = cborutil.bufferingdecoder()
         self._seeninitial = False
+        self._redirect = None
 
     def _oninputcomplete(self):
         with self._lock:
@@ -146,10 +152,19 @@
 
         with self._lock:
             for o in self._decoder.getavailable():
-                if not self._seeninitial:
+                if not self._seeninitial and not self.fromredirect:
                     self._handleinitial(o)
                     continue
 
+                # We should never see an object after a content redirect,
+                # as the spec says the main status object containing the
+                # content redirect is the only object in the stream. Fail
+                # if we see a misbehaving server.
+                if self._redirect:
+                    raise error.Abort(_('received unexpected response data '
+                                        'after content redirect; the remote is '
+                                        'buggy'))
+
                 self._pendingevents.append(o)
 
             self._serviceable.set()
@@ -160,7 +175,16 @@
             return
 
         elif o[b'status'] == b'redirect':
-            raise error.Abort(_('redirect responses not yet supported'))
+            l = o[b'location']
+            self._redirect = wireprototypes.alternatelocationresponse(
+                url=l[b'url'],
+                mediatype=l[b'mediatype'],
+                size=l.get(b'size'),
+                fullhashes=l.get(b'fullhashes'),
+                fullhashseed=l.get(b'fullhashseed'),
+                serverdercerts=l.get(b'serverdercerts'),
+                servercadercerts=l.get(b'servercadercerts'))
+            return
 
         atoms = [{'msg': o[b'error'][b'message']}]
         if b'args' in o[b'error']:
@@ -214,13 +238,17 @@
     with the higher-level peer API.
     """
 
-    def __init__(self, ui, clientreactor):
+    def __init__(self, ui, clientreactor, opener=None,
+                 requestbuilder=util.urlreq.request):
         self._ui = ui
         self._reactor = clientreactor
         self._requests = {}
         self._futures = {}
         self._responses = {}
+        self._redirects = []
         self._frameseof = False
+        self._opener = opener or urlmod.opener(ui)
+        self._requestbuilder = requestbuilder
 
     def callcommand(self, command, args, f, redirect=None):
         """Register a request to call a command.
@@ -269,7 +297,12 @@
                 self._ui.note(_('received %r\n') % frame)
                 self._processframe(frame)
 
-        if self._frameseof:
+        # Also try to read the first redirect.
+        if self._redirects:
+            if not self._processredirect(*self._redirects[0]):
+                self._redirects.pop(0)
+
+        if self._frameseof and not self._redirects:
             return None
 
         return True
@@ -318,10 +351,27 @@
         # This can raise. The caller can handle it.
         response._onresponsedata(meta['data'])
 
+        # If we got a content redirect response, we want to fetch it and
+        # expose the data as if we received it inline. But we also want to
+        # keep our internal request accounting in order. Our strategy is to
+        # basically put meaningful response handling on pause until EOS occurs
+        # and the stream accounting is in a good state. At that point, we follow
+        # the redirect and replace the response object with its data.
+
+        redirect = response._redirect
+        handlefuture = False if redirect else True
+
         if meta['eos']:
             response._oninputcomplete()
             del self._requests[frame.requestid]
 
+            if redirect:
+                self._followredirect(frame.requestid, redirect)
+                return
+
+        if not handlefuture:
+            return
+
         # If the command has a decoder, we wait until all input has been
         # received before resolving the future. Otherwise we resolve the
         # future immediately.
@@ -336,6 +386,82 @@
             self._futures[frame.requestid].set_result(decoded)
             del self._futures[frame.requestid]
 
+    def _followredirect(self, requestid, redirect):
+        """Called to initiate redirect following for a request."""
+        self._ui.note(_('(following redirect to %s)\n') % redirect.url)
+
+        # TODO handle framed responses.
+        if redirect.mediatype != b'application/mercurial-cbor':
+            raise error.Abort(_('cannot handle redirects for the %s media type')
+                              % redirect.mediatype)
+
+        if redirect.fullhashes:
+            self._ui.warn(_('(support for validating hashes on content '
+                            'redirects not supported)\n'))
+
+        if redirect.serverdercerts or redirect.servercadercerts:
+            self._ui.warn(_('(support for pinning server certificates on '
+                            'content redirects not supported)\n'))
+
+        headers = {
+            r'Accept': redirect.mediatype,
+        }
+
+        req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
+
+        try:
+            res = self._opener.open(req)
+        except util.urlerr.httperror as e:
+            if e.code == 401:
+                raise error.Abort(_('authorization failed'))
+            raise
+        except util.httplib.HTTPException as e:
+            self._ui.debug('http error requesting %s\n' % req.get_full_url())
+            self._ui.traceback()
+            raise IOError(None, e)
+
+        urlmod.wrapresponse(res)
+
+        # The existing response object is associated with frame data. Rather
+        # than try to normalize its state, just create a new object.
+        oldresponse = self._responses[requestid]
+        self._responses[requestid] = commandresponse(requestid,
+                                                     oldresponse.command,
+                                                     fromredirect=True)
+
+        self._redirects.append((requestid, res))
+
+    def _processredirect(self, rid, res):
+        """Called to continue processing a response from a redirect."""
+        response = self._responses[rid]
+
+        try:
+            data = res.read(32768)
+            response._onresponsedata(data)
+
+            # We're at end of stream.
+            if not data:
+                response._oninputcomplete()
+
+            if rid not in self._futures:
+                return
+
+            if response.command not in COMMAND_DECODERS:
+                self._futures[rid].set_result(response.objects())
+                del self._futures[rid]
+            elif response._inputcomplete:
+                decoded = COMMAND_DECODERS[response.command](response.objects())
+                self._futures[rid].set_result(decoded)
+                del self._futures[rid]
+
+            return bool(data)
+
+        except BaseException as e:
+            self._futures[rid].set_exception(e)
+            del self._futures[rid]
+            response._oninputcomplete()
+            return False
+
 def decodebranchmap(objs):
     # Response should be a single CBOR map of branch name to array of nodes.
     bm = next(objs)
--- a/tests/test-wireproto-content-redirects.t	Wed Sep 26 18:07:55 2018 -0700
+++ b/tests/test-wireproto-content-redirects.t	Wed Sep 26 18:08:08 2018 -0700
@@ -1354,8 +1354,33 @@
   s>     0\r\n
   s>     \r\n
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  abort: redirect responses not yet supported
-  [255]
+  (following redirect to http://*:$HGPORT/api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0) (glob)
+  s>     GET /api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0 HTTP/1.1\r\n
+  s>     Accept-Encoding: identity\r\n
+  s>     accept: application/mercurial-cbor\r\n
+  s>     host: *:$HGPORT\r\n (glob)
+  s>     user-agent: Mercurial debugwireproto\r\n
+  s>     \r\n
+  s> makefile('rb', None)
+  s>     HTTP/1.1 200 OK\r\n
+  s>     Server: testing stub value\r\n
+  s>     Date: $HTTP_DATE$\r\n
+  s>     Content-Type: application/mercurial-cbor\r\n
+  s>     Content-Length: 91\r\n
+  s>     \r\n
+  s>     \xa1Jtotalitems\x01\xa2DnodeT\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&AGparents\x82T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00
+  response: gen[
+    {
+      b'totalitems': 1
+    },
+    {
+      b'node': b'\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&A',
+      b'parents': [
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
+        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+      ]
+    }
+  ]
 
   $ cat error.log
   $ killdaemons.py