changeset 30466:2add671bf55b

wireproto: perform chunking and compression at protocol layer (API) Currently, the "streamres" response type is populated with a generator of chunks with compression possibly already applied. This puts the onus on commands to perform chunking and compression. Architecturally, I think this is the wrong place to perform this work. I think commands should say "here is the data" and the protocol layer should take care of encoding the final bytes to put on the wire. Additionally, upcoming commits will improve wire protocol support for compression. Having a central place for performing compression in the protocol transport layer will be easier than having to deal with compression at the commands layer. This commit refactors the "streamres" response type to accept either a generator or an object with "read." Additionally, the type now accepts a flag indicating whether the response is a "version 1 compressible" response. This basically identifies all commands currently performing compression. I could have used a special type for this, but a flag works just as well. The argument name foreshadows the introduction of wire protocol changes, hence the "v1." The code for chunking and compressing has been moved to the output generation function for each protocol transport. Some code has been inlined, resulting in the deletion of now unused methods.
author Gregory Szorc <gregory.szorc@gmail.com>
date Sun, 20 Nov 2016 13:50:45 -0800
parents 40a1871eea5e
children 5b0baa9f3362
files hgext/largefiles/proto.py mercurial/hgweb/protocol.py mercurial/sshserver.py mercurial/wireproto.py
diffstat 4 files changed, 34 insertions(+), 43 deletions(-) [+]
line wrap: on
line diff
--- a/hgext/largefiles/proto.py	Sun Nov 20 13:55:53 2016 -0800
+++ b/hgext/largefiles/proto.py	Sun Nov 20 13:50:45 2016 -0800
@@ -76,7 +76,7 @@
         yield '%d\n' % length
         for chunk in util.filechunkiter(f):
             yield chunk
-    return wireproto.streamres(generator())
+    return wireproto.streamres(gen=generator())
 
 def statlfile(repo, proto, sha):
     '''Server command for checking if a largefile is present - returns '2\n' if
--- a/mercurial/hgweb/protocol.py	Sun Nov 20 13:55:53 2016 -0800
+++ b/mercurial/hgweb/protocol.py	Sun Nov 20 13:50:45 2016 -0800
@@ -73,16 +73,6 @@
         self.ui.ferr, self.ui.fout = self.oldio
         return val
 
-    def groupchunks(self, fh):
-        def getchunks():
-            while True:
-                chunk = fh.read(32768)
-                if not chunk:
-                    break
-                yield chunk
-
-        return self.compresschunks(getchunks())
-
     def compresschunks(self, chunks):
         # Don't allow untrusted settings because disabling compression or
         # setting a very high compression level could lead to flooding
@@ -106,8 +96,16 @@
         req.respond(HTTP_OK, HGTYPE, body=rsp)
         return []
     elif isinstance(rsp, wireproto.streamres):
+        if rsp.reader:
+            gen = iter(lambda: rsp.reader.read(32768), '')
+        else:
+            gen = rsp.gen
+
+        if rsp.v1compressible:
+            gen = p.compresschunks(gen)
+
         req.respond(HTTP_OK, HGTYPE)
-        return rsp.gen
+        return gen
     elif isinstance(rsp, wireproto.pushres):
         val = p.restore()
         rsp = '%d\n%s' % (rsp.res, val)
--- a/mercurial/sshserver.py	Sun Nov 20 13:55:53 2016 -0800
+++ b/mercurial/sshserver.py	Sun Nov 20 13:50:45 2016 -0800
@@ -68,13 +68,6 @@
     def redirect(self):
         pass
 
-    def groupchunks(self, fh):
-        return iter(lambda: fh.read(4096), '')
-
-    def compresschunks(self, chunks):
-        for chunk in chunks:
-            yield chunk
-
     def sendresponse(self, v):
         self.fout.write("%d\n" % len(v))
         self.fout.write(v)
@@ -82,7 +75,13 @@
 
     def sendstream(self, source):
         write = self.fout.write
-        for chunk in source.gen:
+
+        if source.reader:
+            gen = iter(lambda: source.reader.read(4096), '')
+        else:
+            gen = source.gen
+
+        for chunk in gen:
             write(chunk)
         self.fout.flush()
 
--- a/mercurial/wireproto.py	Sun Nov 20 13:55:53 2016 -0800
+++ b/mercurial/wireproto.py	Sun Nov 20 13:50:45 2016 -0800
@@ -78,21 +78,6 @@
     #    """
     #    raise NotImplementedError()
 
-    def groupchunks(self, fh):
-        """Generator of chunks to send to the client.
-
-        Some protocols may have compressed the contents.
-        """
-        raise NotImplementedError()
-
-    def compresschunks(self, chunks):
-        """Generator of possible compressed chunks to send to the client.
-
-        This is like ``groupchunks()`` except it accepts a generator as
-        its argument.
-        """
-        raise NotImplementedError()
-
 class remotebatch(peer.batcher):
     '''batches the queued calls; uses as few roundtrips as possible'''
     def __init__(self, remote):
@@ -529,10 +514,19 @@
     """wireproto reply: binary stream
 
     The call was successful and the result is a stream.
-    Iterate on the `self.gen` attribute to retrieve chunks.
+
+    Accepts either a generator or an object with a ``read(size)`` method.
+
+    ``v1compressible`` indicates whether this data can be compressed to
+    "version 1" clients (technically: HTTP peers using
+    application/mercurial-0.1 media type). This flag should NOT be used on
+    new commands because new clients should support a more modern compression
+    mechanism.
     """
-    def __init__(self, gen):
+    def __init__(self, gen=None, reader=None, v1compressible=False):
         self.gen = gen
+        self.reader = reader
+        self.v1compressible = v1compressible
 
 class pushres(object):
     """wireproto reply: success with simple integer return
@@ -739,14 +733,14 @@
 def changegroup(repo, proto, roots):
     nodes = decodelist(roots)
     cg = changegroupmod.changegroup(repo, nodes, 'serve')
-    return streamres(proto.groupchunks(cg))
+    return streamres(reader=cg, v1compressible=True)
 
 @wireprotocommand('changegroupsubset', 'bases heads')
 def changegroupsubset(repo, proto, bases, heads):
     bases = decodelist(bases)
     heads = decodelist(heads)
     cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
-    return streamres(proto.groupchunks(cg))
+    return streamres(reader=cg, v1compressible=True)
 
 @wireprotocommand('debugwireargs', 'one two *')
 def debugwireargs(repo, proto, one, two, others):
@@ -781,7 +775,7 @@
             return ooberror(bundle2required)
 
     chunks = exchange.getbundlechunks(repo, 'serve', **opts)
-    return streamres(proto.compresschunks(chunks))
+    return streamres(gen=chunks, v1compressible=True)
 
 @wireprotocommand('heads')
 def heads(repo, proto):
@@ -870,7 +864,7 @@
         # LockError may be raised before the first result is yielded. Don't
         # emit output until we're sure we got the lock successfully.
         it = streamclone.generatev1wireproto(repo)
-        return streamres(getstream(it))
+        return streamres(gen=getstream(it))
     except error.LockError:
         return '2\n'
 
@@ -900,7 +894,7 @@
             if util.safehasattr(r, 'addpart'):
                 # The return looks streamable, we are in the bundle2 case and
                 # should return a stream.
-                return streamres(r.getchunks())
+                return streamres(gen=r.getchunks())
             return pushres(r)
 
         finally:
@@ -962,4 +956,4 @@
                                                manargs, advargs))
         except error.PushRaced as exc:
             bundler.newpart('error:pushraced', [('message', str(exc))])
-        return streamres(bundler.getchunks())
+        return streamres(gen=bundler.getchunks())