changeset 35750:a39a9df7ecca

wireproto: split streamres into legacy and modern case A couple of commands currently require transmission of uncompressed frames with the old MIME type. Split this case from streamres into a new streamres_legacy class. Streamline the remaining code accordingly. Add a new flag to streamres to request uncompressed streams. This is useful for sending data that is already compressed like a pre-built bundle. Expect clients to support uncompressed data. For older clients, zlib will still be used. Differential Revision: https://phab.mercurial-scm.org/D1862
author Joerg Sonnenberger <joerg@bec.de>
date Fri, 12 Jan 2018 10:59:58 +0100
parents 3a3b59bbe7ce
children 6d65cef5b038
files hgext/largefiles/proto.py mercurial/hgweb/protocol.py mercurial/sshserver.py mercurial/wireproto.py
diffstat 4 files changed, 43 insertions(+), 37 deletions(-) [+]
line wrap: on
line diff
--- a/hgext/largefiles/proto.py	Fri Jan 19 12:33:03 2018 -0800
+++ b/hgext/largefiles/proto.py	Fri Jan 12 10:59:58 2018 +0100
@@ -75,7 +75,7 @@
         yield '%d\n' % length
         for chunk in util.filechunkiter(f):
             yield chunk
-    return wireproto.streamres(gen=generator())
+    return wireproto.streamres_legacy(gen=generator())
 
 def statlfile(repo, proto, sha):
     '''Server command for checking if a largefile is present - returns '2\n' if
--- a/mercurial/hgweb/protocol.py	Fri Jan 19 12:33:03 2018 -0800
+++ b/mercurial/hgweb/protocol.py	Fri Jan 12 10:59:58 2018 +0100
@@ -102,25 +102,20 @@
             urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
             urlreq.quote(self.req.env.get('REMOTE_USER', '')))
 
-    def responsetype(self, v1compressible=False):
+    def responsetype(self, prefer_uncompressed):
         """Determine the appropriate response type and compression settings.
 
-        The ``v1compressible`` argument states whether the response with
-        application/mercurial-0.1 media types should be zlib compressed.
-
         Returns a tuple of (mediatype, compengine, engineopts).
         """
-        # For now, if it isn't compressible in the old world, it's never
-        # compressible. We can change this to send uncompressed 0.2 payloads
-        # later.
-        if not v1compressible:
-            return HGTYPE, None, None
-
         # Determine the response media type and compression engine based
         # on the request parameters.
         protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
 
         if '0.2' in protocaps:
+            # All clients are expected to support uncompressed data.
+            if prefer_uncompressed:
+                return HGTYPE2, util._noopengine(), {}
+
             # Default as defined by wire protocol spec.
             compformats = ['zlib', 'none']
             for cap in protocaps:
@@ -155,7 +150,7 @@
 def call(repo, req, cmd):
     p = webproto(req, repo.ui)
 
-    def genversion2(gen, compress, engine, engineopts):
+    def genversion2(gen, engine, engineopts):
         # application/mercurial-0.2 always sends a payload header
         # identifying the compression engine.
         name = engine.wireprotosupport().name
@@ -163,28 +158,27 @@
         yield struct.pack('B', len(name))
         yield name
 
-        if compress:
-            for chunk in engine.compressstream(gen, opts=engineopts):
-                yield chunk
-        else:
-            for chunk in gen:
-                yield chunk
+        for chunk in gen:
+            yield chunk
 
     rsp = wireproto.dispatch(repo, p, cmd)
     if isinstance(rsp, bytes):
         req.respond(HTTP_OK, HGTYPE, body=rsp)
         return []
+    elif isinstance(rsp, wireproto.streamres_legacy):
+        gen = rsp.gen
+        req.respond(HTTP_OK, HGTYPE)
+        return gen
     elif isinstance(rsp, wireproto.streamres):
         gen = rsp.gen
 
         # This code for compression should not be streamres specific. It
         # is here because we only compress streamres at the moment.
-        mediatype, engine, engineopts = p.responsetype(rsp.v1compressible)
+        mediatype, engine, engineopts = p.responsetype(rsp.prefer_uncompressed)
+        gen = engine.compressstream(gen, engineopts)
 
-        if mediatype == HGTYPE and rsp.v1compressible:
-            gen = engine.compressstream(gen, engineopts)
-        elif mediatype == HGTYPE2:
-            gen = genversion2(gen, rsp.v1compressible, engine, engineopts)
+        if mediatype == HGTYPE2:
+            gen = genversion2(gen, engine, engineopts)
 
         req.respond(HTTP_OK, mediatype)
         return gen
--- a/mercurial/sshserver.py	Fri Jan 19 12:33:03 2018 -0800
+++ b/mercurial/sshserver.py	Fri Jan 12 10:59:58 2018 +0100
@@ -105,6 +105,7 @@
     handlers = {
         str: sendresponse,
         wireproto.streamres: sendstream,
+        wireproto.streamres_legacy: sendstream,
         wireproto.pushres: sendpushresponse,
         wireproto.pusherr: sendpusherror,
         wireproto.ooberror: sendooberror,
--- a/mercurial/wireproto.py	Fri Jan 19 12:33:03 2018 -0800
+++ b/mercurial/wireproto.py	Fri Jan 12 10:59:58 2018 +0100
@@ -522,15 +522,26 @@
 
     Accepts a generator containing chunks of data to be sent to the client.
 
-    ``v1compressible`` indicates whether this data can be compressed to
-    "version 1" clients (technically: HTTP peers using
-    application/mercurial-0.1 media type). This flag should NOT be used on
-    new commands because new clients should support a more modern compression
-    mechanism.
+    ``prefer_uncompressed`` indicates that the data is expected to be
+    uncompressable and that the stream should therefore use the ``none``
+    engine.
     """
-    def __init__(self, gen=None, v1compressible=False):
+    def __init__(self, gen=None, prefer_uncompressed=False):
         self.gen = gen
-        self.v1compressible = v1compressible
+        self.prefer_uncompressed = prefer_uncompressed
+
+class streamres_legacy(object):
+    """wireproto reply: uncompressed binary stream
+
+    The call was successful and the result is a stream.
+
+    Accepts a generator containing chunks of data to be sent to the client.
+
+    Like ``streamres``, but sends an uncompressed data for "version 1" clients
+    using the application/mercurial-0.1 media type.
+    """
+    def __init__(self, gen=None):
+        self.gen = gen
 
 class pushres(object):
     """wireproto reply: success with simple integer return
@@ -802,7 +813,7 @@
                                   missingheads=repo.heads())
     cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
     gen = iter(lambda: cg.read(32768), '')
-    return streamres(gen=gen, v1compressible=True)
+    return streamres(gen=gen)
 
 @wireprotocommand('changegroupsubset', 'bases heads')
 def changegroupsubset(repo, proto, bases, heads):
@@ -812,7 +823,7 @@
                                   missingheads=heads)
     cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
     gen = iter(lambda: cg.read(32768), '')
-    return streamres(gen=gen, v1compressible=True)
+    return streamres(gen=gen)
 
 @wireprotocommand('debugwireargs', 'one two *')
 def debugwireargs(repo, proto, one, two, others):
@@ -877,8 +888,8 @@
             advargs.append(('hint', exc.hint))
         bundler.addpart(bundle2.bundlepart('error:abort',
                                            manargs, advargs))
-        return streamres(gen=bundler.getchunks(), v1compressible=True)
-    return streamres(gen=chunks, v1compressible=True)
+        return streamres(gen=bundler.getchunks())
+    return streamres(gen=chunks)
 
 @wireprotocommand('heads')
 def heads(repo, proto):
@@ -955,7 +966,7 @@
     capability with a value representing the version and flags of the repo
     it is serving. Client checks to see if it understands the format.
     '''
-    return streamres(streamclone.generatev1wireproto(repo))
+    return streamres_legacy(streamclone.generatev1wireproto(repo))
 
 @wireprotocommand('unbundle', 'heads')
 def unbundle(repo, proto, heads):
@@ -990,7 +1001,7 @@
             if util.safehasattr(r, 'addpart'):
                 # The return looks streamable, we are in the bundle2 case and
                 # should return a stream.
-                return streamres(gen=r.getchunks())
+                return streamres_legacy(gen=r.getchunks())
             return pushres(r)
 
         finally:
@@ -1054,4 +1065,4 @@
                                                manargs, advargs))
         except error.PushRaced as exc:
             bundler.newpart('error:pushraced', [('message', str(exc))])
-        return streamres(gen=bundler.getchunks())
+        return streamres_legacy(gen=bundler.getchunks())