diff mercurial/bundle2.py @ 26404:795f02a24b87

bundle2: allow compressed bundle This changeset adds support for a 'compression' parameter in bundle2 streams. When set, it controls the compression algorithm used for the payload part of the bundle2. There is currently no usage of this except in tests.
author Pierre-Yves David <pierre-yves.david@fb.com>
date Wed, 23 Sep 2015 12:56:12 -0700
parents d90c30801cdf
children d40029b4296e
line wrap: on
line diff
--- a/mercurial/bundle2.py	Mon Sep 28 15:01:20 2015 -0700
+++ b/mercurial/bundle2.py	Wed Sep 23 12:56:12 2015 -0700
@@ -479,6 +479,15 @@
         self._params = []
         self._parts = []
         self.capabilities = dict(capabilities)
+        self._compressor = util.compressors[None]()
+
+    def setcompression(self, alg):
+        """setup core part compression to <alg>"""
+        if alg is None:
+            return
+        assert not any(n.lower() == 'Compression' for n, v in self._params)
+        self.addparam('Compression', alg)
+        self._compressor = util.compressors[alg]()
 
     @property
     def nbparts(self):
@@ -530,8 +539,10 @@
         yield _pack(_fstreamparamsize, len(param))
         if param:
             yield param
+        # starting compression
         for chunk in self._getcorechunk():
-            yield chunk
+            yield self._compressor.compress(chunk)
+        yield self._compressor.flush()
 
     def _paramchunk(self):
         """return a encoded version of all stream parameters"""
@@ -633,6 +644,7 @@
     def __init__(self, ui, fp):
         """If header is specified, we do not read it out of the stream."""
         self.ui = ui
+        self._decompressor = util.decompressors[None]
         super(unbundle20, self).__init__(fp)
 
     @util.propertycache
@@ -682,6 +694,8 @@
         """yield all parts contained in the stream"""
         # make sure param have been loaded
         self.params
+        # From there, payload need to be decompressed
+        self._fp = self._decompressor(self._fp)
         indebug(self.ui, 'start extraction of bundle2 parts')
         headerblock = self._readpartheader()
         while headerblock is not None:
@@ -719,6 +733,14 @@
         return func
     return decorator
 
+@b2streamparamhandler('compression')
+def processcompression(unbundler, param, value):
+    """read compression parameter and install payload decompression"""
+    if value not in util.decompressors:
+        raise error.BundleUnknownFeatureError(params=(param,),
+                                              values=(value,))
+    unbundler._decompressor = util.decompressors[value]
+
 class bundlepart(object):
     """A bundle2 part contains application level payload