util: compression APIs to support revlog compression
authorGregory Szorc <gregory.szorc@gmail.com>
Mon, 02 Jan 2017 12:39:03 -0800
changeset 30794 31e1f0d4ab44
parent 30793 b6f455a6e4d6
child 30795 78ac56aebab6
util: compression APIs to support revlog compression As part of "zstd all of the things," we need to teach revlogs to use non-zlib compression formats. Because we're routing all compression via the "compression manager" and "compression engine" APIs, we need to introduction functionality there for performing revlog operations. Ideally, revlog compression and decompression operations would be implemented in terms of simple "compress" and "decompress" primitives. However, there are a few considerations that make us want to have a specialized primitive for handling revlogs: 1) Performance. Revlogs tend to do compression and especially decompression operations in batches. Any overhead for e.g. instantiating a "context" for performing an operation can be noticed. For this reason, our "revlog compressor" primitive is reusable. For zstd, we reuse the same compression "context" for multiple operations. I've measured this to have a performance impact versus constructing new contexts for each operation. 2) Specialization. By having a primitive dedicated to revlog use, we can make revlog-specific choices and leave the door open for more functionality in the future. For example, the zstd revlog compressor may one day make use of dictionary compression. A future patch will introduce a decompress() on the compressor object. The code for the zlib compressor is basically copied from revlog.compress(). Although it doesn't handle the empty input case, the null first byte case, and the 'u' prefix case. These cases will continue to be handled in revlog.py once that code is ported to use this API.
mercurial/util.py
--- a/mercurial/util.py	Mon Jan 02 13:00:16 2017 -0800
+++ b/mercurial/util.py	Mon Jan 02 12:39:03 2017 -0800
@@ -3207,6 +3207,19 @@
         """
         raise NotImplementedError()
 
+    def revlogcompressor(self, opts=None):
+        """Obtain an object that can be used to compress revlog entries.
+
+        The object has a ``compress(data)`` method that compresses binary
+        data. This method returns compressed binary data or ``None`` if
+        the data could not be compressed (too small, not compressible, etc).
+        The returned data should have a header uniquely identifying this
+        compression format so decompression can be routed to this engine.
+
+        The object is reusable but is not thread safe.
+        """
+        raise NotImplementedError()
+
 class _zlibengine(compressionengine):
     def name(self):
         return 'zlib'
@@ -3241,6 +3254,41 @@
 
         return chunkbuffer(gen())
 
+    class zlibrevlogcompressor(object):
+        def compress(self, data):
+            insize = len(data)
+            # Caller handles empty input case.
+            assert insize > 0
+
+            if insize < 44:
+                return None
+
+            elif insize <= 1000000:
+                compressed = zlib.compress(data)
+                if len(compressed) < insize:
+                    return compressed
+                return None
+
+            # zlib makes an internal copy of the input buffer, doubling
+            # memory usage for large inputs. So do streaming compression
+            # on large inputs.
+            else:
+                z = zlib.compressobj()
+                parts = []
+                pos = 0
+                while pos < insize:
+                    pos2 = pos + 2**20
+                    parts.append(z.compress(data[pos:pos2]))
+                    pos = pos2
+                parts.append(z.flush())
+
+                if sum(map(len, parts)) < insize:
+                    return ''.join(parts)
+                return None
+
+    def revlogcompressor(self, opts=None):
+        return self.zlibrevlogcompressor()
+
 compengines.register(_zlibengine())
 
 class _bz2engine(compressionengine):
@@ -3315,6 +3363,13 @@
     def decompressorreader(self, fh):
         return fh
 
+    class nooprevlogcompressor(object):
+        def compress(self, data):
+            return None
+
+    def revlogcompressor(self, opts=None):
+        return self.nooprevlogcompressor()
+
 compengines.register(_noopengine())
 
 class _zstdengine(compressionengine):
@@ -3363,6 +3418,49 @@
         dctx = zstd.ZstdDecompressor()
         return chunkbuffer(dctx.read_from(fh))
 
+    class zstdrevlogcompressor(object):
+        def __init__(self, zstd, level=3):
+            # Writing the content size adds a few bytes to the output. However,
+            # it allows decompression to be more optimal since we can
+            # pre-allocate a buffer to hold the result.
+            self._cctx = zstd.ZstdCompressor(level=level,
+                                             write_content_size=True)
+            self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
+
+        def compress(self, data):
+            insize = len(data)
+            # Caller handles empty input case.
+            assert insize > 0
+
+            if insize < 50:
+                return None
+
+            elif insize <= 1000000:
+                compressed = self._cctx.compress(data)
+                if len(compressed) < insize:
+                    return compressed
+                return None
+            else:
+                z = self._cctx.compressobj()
+                chunks = []
+                pos = 0
+                while pos < insize:
+                    pos2 = pos + self._compinsize
+                    chunk = z.compress(data[pos:pos2])
+                    if chunk:
+                        chunks.append(chunk)
+                    pos = pos2
+                chunks.append(z.flush())
+
+                if sum(map(len, chunks)) < insize:
+                    return ''.join(chunks)
+                return None
+
+    def revlogcompressor(self, opts=None):
+        opts = opts or {}
+        return self.zstdrevlogcompressor(self._module,
+                                         level=opts.get('level', 3))
+
 compengines.register(_zstdengine())
 
 # convenient shortcut