wireprotov2: client support for following content redirects
And with the server actually sending content redirects, it is finally
time to implement client support for following them!
When a redirect response is seen, we wait until all data for that
request has been received (it should be nearly immediate since no
data is expected to follow the redirect message). Then we use
a URL opener to make a request. We stuff that response into the
client handler and construct a new response object to track it.
When readdata() is called for servicing requests, we attempt to
read data from the first redirected response. During data reading,
data is processed similarly to as if it came from a frame payload.
The existing test for the functionality demonstrates the client
transparently following the redirect and obtaining the command
response data from an alternate URL!
There is still plenty of work to do here, including shoring up
testing. I'm not convinced things will work in the presence of
multiple redirect responses. And we don't yet implement support
for integrity verification or configuring server certificates
to validate the connection. But it's a start. And it should enable
us to start experimenting with "real" caches.
Differential Revision: https://phab.mercurial-scm.org/D4778
--- a/mercurial/httppeer.py Wed Sep 26 18:07:55 2018 -0700
+++ b/mercurial/httppeer.py Wed Sep 26 18:08:08 2018 -0700
@@ -513,7 +513,9 @@
reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
buffersends=True)
- handler = wireprotov2peer.clienthandler(ui, reactor)
+ handler = wireprotov2peer.clienthandler(ui, reactor,
+ opener=opener,
+ requestbuilder=requestbuilder)
url = '%s/%s' % (apiurl, permission)
--- a/mercurial/wireprotov2peer.py Wed Sep 26 18:07:55 2018 -0700
+++ b/mercurial/wireprotov2peer.py Wed Sep 26 18:08:08 2018 -0700
@@ -13,8 +13,12 @@
from . import (
encoding,
error,
+ pycompat,
sslutil,
+ url as urlmod,
+ util,
wireprotoframing,
+ wireprototypes,
)
from .utils import (
cborutil,
@@ -112,9 +116,10 @@
events occur.
"""
- def __init__(self, requestid, command):
+ def __init__(self, requestid, command, fromredirect=False):
self.requestid = requestid
self.command = command
+ self.fromredirect = fromredirect
# Whether all remote input related to this command has been
# received.
@@ -132,6 +137,7 @@
self._pendingevents = []
self._decoder = cborutil.bufferingdecoder()
self._seeninitial = False
+ self._redirect = None
def _oninputcomplete(self):
with self._lock:
@@ -146,10 +152,19 @@
with self._lock:
for o in self._decoder.getavailable():
- if not self._seeninitial:
+ if not self._seeninitial and not self.fromredirect:
self._handleinitial(o)
continue
+ # We should never see an object after a content redirect,
+ # as the spec says the main status object containing the
+ # content redirect is the only object in the stream. Fail
+ # if we see a misbehaving server.
+ if self._redirect:
+ raise error.Abort(_('received unexpected response data '
+ 'after content redirect; the remote is '
+ 'buggy'))
+
self._pendingevents.append(o)
self._serviceable.set()
@@ -160,7 +175,16 @@
return
elif o[b'status'] == b'redirect':
- raise error.Abort(_('redirect responses not yet supported'))
+ l = o[b'location']
+ self._redirect = wireprototypes.alternatelocationresponse(
+ url=l[b'url'],
+ mediatype=l[b'mediatype'],
+ size=l.get(b'size'),
+ fullhashes=l.get(b'fullhashes'),
+ fullhashseed=l.get(b'fullhashseed'),
+ serverdercerts=l.get(b'serverdercerts'),
+ servercadercerts=l.get(b'servercadercerts'))
+ return
atoms = [{'msg': o[b'error'][b'message']}]
if b'args' in o[b'error']:
@@ -214,13 +238,17 @@
with the higher-level peer API.
"""
- def __init__(self, ui, clientreactor):
+ def __init__(self, ui, clientreactor, opener=None,
+ requestbuilder=util.urlreq.request):
self._ui = ui
self._reactor = clientreactor
self._requests = {}
self._futures = {}
self._responses = {}
+ self._redirects = []
self._frameseof = False
+ self._opener = opener or urlmod.opener(ui)
+ self._requestbuilder = requestbuilder
def callcommand(self, command, args, f, redirect=None):
"""Register a request to call a command.
@@ -269,7 +297,12 @@
self._ui.note(_('received %r\n') % frame)
self._processframe(frame)
- if self._frameseof:
+ # Also try to read the first redirect.
+ if self._redirects:
+ if not self._processredirect(*self._redirects[0]):
+ self._redirects.pop(0)
+
+ if self._frameseof and not self._redirects:
return None
return True
@@ -318,10 +351,27 @@
# This can raise. The caller can handle it.
response._onresponsedata(meta['data'])
+ # If we got a content redirect response, we want to fetch it and
+ # expose the data as if we received it inline. But we also want to
+ # keep our internal request accounting in order. Our strategy is to
+ # basically put meaningful response handling on pause until EOS occurs
+ # and the stream accounting is in a good state. At that point, we follow
+ # the redirect and replace the response object with its data.
+
+ redirect = response._redirect
+ handlefuture = False if redirect else True
+
if meta['eos']:
response._oninputcomplete()
del self._requests[frame.requestid]
+ if redirect:
+ self._followredirect(frame.requestid, redirect)
+ return
+
+ if not handlefuture:
+ return
+
# If the command has a decoder, we wait until all input has been
# received before resolving the future. Otherwise we resolve the
# future immediately.
@@ -336,6 +386,82 @@
self._futures[frame.requestid].set_result(decoded)
del self._futures[frame.requestid]
+ def _followredirect(self, requestid, redirect):
+ """Called to initiate redirect following for a request."""
+ self._ui.note(_('(following redirect to %s)\n') % redirect.url)
+
+ # TODO handle framed responses.
+ if redirect.mediatype != b'application/mercurial-cbor':
+ raise error.Abort(_('cannot handle redirects for the %s media type')
+ % redirect.mediatype)
+
+ if redirect.fullhashes:
+ self._ui.warn(_('(support for validating hashes on content '
+ 'redirects not supported)\n'))
+
+ if redirect.serverdercerts or redirect.servercadercerts:
+ self._ui.warn(_('(support for pinning server certificates on '
+ 'content redirects not supported)\n'))
+
+ headers = {
+ r'Accept': redirect.mediatype,
+ }
+
+ req = self._requestbuilder(pycompat.strurl(redirect.url), None, headers)
+
+ try:
+ res = self._opener.open(req)
+ except util.urlerr.httperror as e:
+ if e.code == 401:
+ raise error.Abort(_('authorization failed'))
+ raise
+ except util.httplib.HTTPException as e:
+ self._ui.debug('http error requesting %s\n' % req.get_full_url())
+ self._ui.traceback()
+ raise IOError(None, e)
+
+ urlmod.wrapresponse(res)
+
+ # The existing response object is associated with frame data. Rather
+ # than try to normalize its state, just create a new object.
+ oldresponse = self._responses[requestid]
+ self._responses[requestid] = commandresponse(requestid,
+ oldresponse.command,
+ fromredirect=True)
+
+ self._redirects.append((requestid, res))
+
+ def _processredirect(self, rid, res):
+ """Called to continue processing a response from a redirect."""
+ response = self._responses[rid]
+
+ try:
+ data = res.read(32768)
+ response._onresponsedata(data)
+
+ # We're at end of stream.
+ if not data:
+ response._oninputcomplete()
+
+ if rid not in self._futures:
+ return
+
+ if response.command not in COMMAND_DECODERS:
+ self._futures[rid].set_result(response.objects())
+ del self._futures[rid]
+ elif response._inputcomplete:
+ decoded = COMMAND_DECODERS[response.command](response.objects())
+ self._futures[rid].set_result(decoded)
+ del self._futures[rid]
+
+ return bool(data)
+
+ except BaseException as e:
+ self._futures[rid].set_exception(e)
+ del self._futures[rid]
+ response._oninputcomplete()
+ return False
+
def decodebranchmap(objs):
# Response should be a single CBOR map of branch name to array of nodes.
bm = next(objs)
--- a/tests/test-wireproto-content-redirects.t Wed Sep 26 18:07:55 2018 -0700
+++ b/tests/test-wireproto-content-redirects.t Wed Sep 26 18:08:08 2018 -0700
@@ -1354,8 +1354,33 @@
s> 0\r\n
s> \r\n
received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
- abort: redirect responses not yet supported
- [255]
+ (following redirect to http://*:$HGPORT/api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0) (glob)
+ s> GET /api/simplecache/c045a581599d58608efd3d93d8129841f2af04a0 HTTP/1.1\r\n
+ s> Accept-Encoding: identity\r\n
+ s> accept: application/mercurial-cbor\r\n
+ s> host: *:$HGPORT\r\n (glob)
+ s> user-agent: Mercurial debugwireproto\r\n
+ s> \r\n
+ s> makefile('rb', None)
+ s> HTTP/1.1 200 OK\r\n
+ s> Server: testing stub value\r\n
+ s> Date: $HTTP_DATE$\r\n
+ s> Content-Type: application/mercurial-cbor\r\n
+ s> Content-Length: 91\r\n
+ s> \r\n
+ s> \xa1Jtotalitems\x01\xa2DnodeT\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&AGparents\x82T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00T\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00
+ response: gen[
+ {
+ b'totalitems': 1
+ },
+ {
+ b'node': b'\x99/Gy\x02\x9a=\xf8\xd0fm\x00\xbb\x92OicN&A',
+ b'parents': [
+ b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
+ b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+ ]
+ }
+ ]
$ cat error.log
$ killdaemons.py