util: compression APIs to support revlog compression
As part of "zstd all of the things," we need to teach revlogs to
use non-zlib compression formats. Because we're routing all compression
via the "compression manager" and "compression engine" APIs, we need to
introduce functionality there for performing revlog operations.
Ideally, revlog compression and decompression operations would be
implemented in terms of simple "compress" and "decompress" primitives.
However, there are a few considerations that make us want to have a
specialized primitive for handling revlogs:
1) Performance. Revlogs tend to do compression and especially
decompression operations in batches. Any overhead for e.g.
instantiating a "context" for performing an operation can be
noticed. For this reason, our "revlog compressor" primitive is
reusable. For zstd, we reuse the same compression "context" for
multiple operations. I've measured this to have a performance
impact versus constructing new contexts for each operation.
2) Specialization. By having a primitive dedicated to revlog use,
we can make revlog-specific choices and leave the door open for
more functionality in the future. For example, the zstd revlog
compressor may one day make use of dictionary compression.
A future patch will introduce a decompress() on the compressor
object.
The code for the zlib compressor is basically copied from
revlog.compress(), although it doesn't handle the empty input
case, the null first byte case, or the 'u' prefix case. These
cases will continue to be handled in revlog.py once that code is
ported to use this API.
--- a/mercurial/util.py Mon Jan 02 13:00:16 2017 -0800
+++ b/mercurial/util.py Mon Jan 02 12:39:03 2017 -0800
@@ -3207,6 +3207,19 @@
"""
raise NotImplementedError()
+ def revlogcompressor(self, opts=None):
+ """Obtain an object that can be used to compress revlog entries.
+
+ The object has a ``compress(data)`` method that compresses binary
+ data. This method returns compressed binary data or ``None`` if
+ the data could not be compressed (too small, not compressible, etc).
+ The returned data should have a header uniquely identifying this
+ compression format so decompression can be routed to this engine.
+
+ The object is reusable but is not thread safe.
+ """
+ raise NotImplementedError()
+
class _zlibengine(compressionengine):
def name(self):
return 'zlib'
@@ -3241,6 +3254,41 @@
return chunkbuffer(gen())
+ class zlibrevlogcompressor(object):
+ def compress(self, data):
+ insize = len(data)
+ # Caller handles empty input case.
+ assert insize > 0
+
+ if insize < 44:
+ return None
+
+ elif insize <= 1000000:
+ compressed = zlib.compress(data)
+ if len(compressed) < insize:
+ return compressed
+ return None
+
+ # zlib makes an internal copy of the input buffer, doubling
+ # memory usage for large inputs. So do streaming compression
+ # on large inputs.
+ else:
+ z = zlib.compressobj()
+ parts = []
+ pos = 0
+ while pos < insize:
+ pos2 = pos + 2**20
+ parts.append(z.compress(data[pos:pos2]))
+ pos = pos2
+ parts.append(z.flush())
+
+ if sum(map(len, parts)) < insize:
+ return ''.join(parts)
+ return None
+
+ def revlogcompressor(self, opts=None):
+ return self.zlibrevlogcompressor()
+
compengines.register(_zlibengine())
class _bz2engine(compressionengine):
@@ -3315,6 +3363,13 @@
def decompressorreader(self, fh):
return fh
+ class nooprevlogcompressor(object):
+ def compress(self, data):
+ return None
+
+ def revlogcompressor(self, opts=None):
+ return self.nooprevlogcompressor()
+
compengines.register(_noopengine())
class _zstdengine(compressionengine):
@@ -3363,6 +3418,49 @@
dctx = zstd.ZstdDecompressor()
return chunkbuffer(dctx.read_from(fh))
+ class zstdrevlogcompressor(object):
+ def __init__(self, zstd, level=3):
+ # Writing the content size adds a few bytes to the output. However,
+ # it allows decompression to be more optimal since we can
+ # pre-allocate a buffer to hold the result.
+ self._cctx = zstd.ZstdCompressor(level=level,
+ write_content_size=True)
+ self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
+
+ def compress(self, data):
+ insize = len(data)
+ # Caller handles empty input case.
+ assert insize > 0
+
+ if insize < 50:
+ return None
+
+ elif insize <= 1000000:
+ compressed = self._cctx.compress(data)
+ if len(compressed) < insize:
+ return compressed
+ return None
+ else:
+ z = self._cctx.compressobj()
+ chunks = []
+ pos = 0
+ while pos < insize:
+ pos2 = pos + self._compinsize
+ chunk = z.compress(data[pos:pos2])
+ if chunk:
+ chunks.append(chunk)
+ pos = pos2
+ chunks.append(z.flush())
+
+ if sum(map(len, chunks)) < insize:
+ return ''.join(chunks)
+ return None
+
+ def revlogcompressor(self, opts=None):
+ opts = opts or {}
+ return self.zstdrevlogcompressor(self._module,
+ level=opts.get('level', 3))
+
compengines.register(_zstdengine())
# convenient shortcut