Mercurial > hg
changeset 30798:f50c0db50025
util: compression APIs to support revlog decompression
Previously, compression engines had APIs for performing revlog
compression but no mechanism to perform revlog decompression. This
patch changes that.
Revlog decompression is slightly more complicated than compression
because in the compression case there is (currently) only a single
engine that can be used at a time. However, for decompression, a
revlog could contain chunks from multiple compression engines. This
means decompression needs to map to multiple engines and
decompressors. This functionality is outside the scope of this patch.
But it drives the decision for engines to declare a byte header
sequence that identifies revlog data as belonging to an engine and
an API for obtaining an engine from a revlog header.
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Mon, 02 Jan 2017 13:27:20 -0800 |
parents | 0bde7372e4c0 |
children | 0b49449a01f4 |
files | mercurial/util.py |
diffstat | 1 files changed, 74 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/util.py Sun Jan 08 10:08:29 2017 +0800 +++ b/mercurial/util.py Mon Jan 02 13:27:20 2017 -0800 @@ -3000,6 +3000,8 @@ self._bundlenames = {} # Internal bundle identifier to engine name. self._bundletypes = {} + # Revlog header to engine name. + self._revlogheaders = {} # Wire proto identifier to engine name. self._wiretypes = {} @@ -3053,6 +3055,14 @@ self._wiretypes[wiretype] = name + revlogheader = engine.revlogheader() + if revlogheader and revlogheader in self._revlogheaders: + raise error.Abort(_('revlog header %s already registered by %s') % + (revlogheader, self._revlogheaders[revlogheader])) + + if revlogheader: + self._revlogheaders[revlogheader] = name + self._engines[name] = engine @property @@ -3121,6 +3131,13 @@ engine.name()) return engine + def forrevlogheader(self, header): + """Obtain a compression engine registered to a revlog header. + + Will raise KeyError if the revlog header value isn't registered. + """ + return self._engines[self._revlogheaders[header]] + compengines = compressormanager() class compressionengine(object): @@ -3186,6 +3203,16 @@ """ return None + def revlogheader(self): + """Header added to revlog chunks that identifies this engine. + + If this engine can be used to compress revlogs, this method should + return the bytes used to identify chunks compressed with this engine. + Else, the method should return ``None`` to indicate it does not + participate in revlog compression. + """ + return None + def compressstream(self, it, opts=None): """Compress an iterator of chunks. @@ -3215,6 +3242,13 @@ the data could not be compressed (too small, not compressible, etc). The returned data should have a header uniquely identifying this compression format so decompression can be routed to this engine. + This header should be identified by the ``revlogheader()`` return + value. + + The object has a ``decompress(data)`` method that decompresses + data. The method will only be called if ``data`` begins with + ``revlogheader()``. 
The method should return the raw, uncompressed + data or raise a ``RevlogError``. The object is reusable but is not thread safe. """ @@ -3230,6 +3264,9 @@ def wireprotosupport(self): return compewireprotosupport('zlib', 20, 20) + def revlogheader(self): + return 'x' + def compressstream(self, it, opts=None): opts = opts or {} @@ -3286,6 +3323,13 @@ return ''.join(parts) return None + def decompress(self, data): + try: + return zlib.decompress(data) + except zlib.error as e: + raise error.RevlogError(_('revlog decompress error: %s') % + str(e)) + def revlogcompressor(self, opts=None): return self.zlibrevlogcompressor() @@ -3357,6 +3401,9 @@ def wireprotosupport(self): return compewireprotosupport('none', 0, 10) + # We don't implement revlogheader because it is handled specially + # in the revlog class. + def compressstream(self, it, opts=None): return it @@ -3397,6 +3444,9 @@ def wireprotosupport(self): return compewireprotosupport('zstd', 50, 50) + def revlogheader(self): + return '\x28' + def compressstream(self, it, opts=None): opts = opts or {} # zstd level 3 is almost always significantly faster than zlib @@ -3425,7 +3475,9 @@ # pre-allocate a buffer to hold the result. self._cctx = zstd.ZstdCompressor(level=level, write_content_size=True) + self._dctx = zstd.ZstdDecompressor() self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE + self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE def compress(self, data): insize = len(data) @@ -3456,6 +3508,28 @@ return ''.join(chunks) return None + def decompress(self, data): + insize = len(data) + + try: + # This was measured to be faster than other streaming + # decompressors. + dobj = self._dctx.decompressobj() + chunks = [] + pos = 0 + while pos < insize: + pos2 = pos + self._decompinsize + chunk = dobj.decompress(data[pos:pos2]) + if chunk: + chunks.append(chunk) + pos = pos2 + # Frame should be exhausted, so no finish() API. 
+ + return ''.join(chunks) + except Exception as e: + raise error.RevlogError(_('revlog decompress error: %s') % + str(e)) + def revlogcompressor(self, opts=None): opts = opts or {} return self.zstdrevlogcompressor(self._module,