changeset 42041:3e47d1ec9da5

util: extract compression code in `mercurial.utils.compression` The code seems large enough to be worth extracting. This is similar to what was done for various module in `mercurial/utils/`. Since None of the compression logic takes a `ui` objet, issuing deprecation warning is tricky. Luckly the logic does not seems to have many external users.
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Wed, 27 Mar 2019 16:45:14 +0100
parents 02fa567f8a3c
children aaececb4b066
files mercurial/debugcommands.py mercurial/help.py mercurial/util.py mercurial/utils/compression.py mercurial/wireprotoserver.py mercurial/wireprototypes.py
diffstat 6 files changed, 769 insertions(+), 718 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/debugcommands.py	Sat Mar 30 13:13:10 2019 -0700
+++ b/mercurial/debugcommands.py	Wed Mar 27 16:45:14 2019 +0100
@@ -82,6 +82,7 @@
 )
 from .utils import (
     cborutil,
+    compression,
     dateutil,
     procutil,
     stringutil,
@@ -1299,7 +1300,8 @@
              fm.formatlist(sorted(e.name() for e in compengines
                                   if e.available()),
                            name='compengine', fmt='%s', sep=', '))
-    wirecompengines = util.compengines.supportedwireengines(util.SERVERROLE)
+    wirecompengines = compression.compengines.supportedwireengines(
+        compression.SERVERROLE)
     fm.write('compenginesserver', _('checking available compression engines '
                                     'for wire protocol (%s)\n'),
              fm.formatlist([e.name() for e in wirecompengines
--- a/mercurial/help.py	Sat Mar 30 13:13:10 2019 -0700
+++ b/mercurial/help.py	Wed Mar 27 16:45:14 2019 +0100
@@ -37,6 +37,9 @@
 from .hgweb import (
     webcommands,
 )
+from .utils import (
+    compression,
+)
 
 _exclkeywords = {
     "(ADVANCED)",
@@ -428,7 +431,7 @@
     addtopichook(topic, add)
 
 addtopicsymbols('bundlespec', '.. bundlecompressionmarker',
-                util.bundlecompressiontopics())
+                compression.bundlecompressiontopics())
 addtopicsymbols('filesets', '.. predicatesmarker', fileset.symbols)
 addtopicsymbols('merge-tools', '.. internaltoolsmarker',
                 filemerge.internalsdoc)
--- a/mercurial/util.py	Sat Mar 30 13:13:10 2019 -0700
+++ b/mercurial/util.py	Wed Mar 27 16:45:14 2019 +0100
@@ -16,7 +16,6 @@
 from __future__ import absolute_import, print_function
 
 import abc
-import bz2
 import collections
 import contextlib
 import errno
@@ -34,7 +33,6 @@
 import time
 import traceback
 import warnings
-import zlib
 
 from .thirdparty import (
     attr,
@@ -50,6 +48,7 @@
     urllibcompat,
 )
 from .utils import (
+    compression,
     procutil,
     stringutil,
 )
@@ -127,6 +126,11 @@
 unlink = platform.unlink
 username = platform.username
 
+# small compat layer
+compengines = compression.compengines
+SERVERROLE = compression.SERVERROLE
+CLIENTROLE = compression.CLIENTROLE
+
 try:
     recvfds = osutil.recvfds
 except AttributeError:
@@ -3206,714 +3210,6 @@
         yield path[:pos]
         pos = path.rfind('/', 0, pos)
 
-# compression code
-
-SERVERROLE = 'server'
-CLIENTROLE = 'client'
-
-compewireprotosupport = collections.namedtuple(r'compenginewireprotosupport',
-                                               (r'name', r'serverpriority',
-                                                r'clientpriority'))
-
-class compressormanager(object):
-    """Holds registrations of various compression engines.
-
-    This class essentially abstracts the differences between compression
-    engines to allow new compression formats to be added easily, possibly from
-    extensions.
-
-    Compressors are registered against the global instance by calling its
-    ``register()`` method.
-    """
-    def __init__(self):
-        self._engines = {}
-        # Bundle spec human name to engine name.
-        self._bundlenames = {}
-        # Internal bundle identifier to engine name.
-        self._bundletypes = {}
-        # Revlog header to engine name.
-        self._revlogheaders = {}
-        # Wire proto identifier to engine name.
-        self._wiretypes = {}
-
-    def __getitem__(self, key):
-        return self._engines[key]
-
-    def __contains__(self, key):
-        return key in self._engines
-
-    def __iter__(self):
-        return iter(self._engines.keys())
-
-    def register(self, engine):
-        """Register a compression engine with the manager.
-
-        The argument must be a ``compressionengine`` instance.
-        """
-        if not isinstance(engine, compressionengine):
-            raise ValueError(_('argument must be a compressionengine'))
-
-        name = engine.name()
-
-        if name in self._engines:
-            raise error.Abort(_('compression engine %s already registered') %
-                              name)
-
-        bundleinfo = engine.bundletype()
-        if bundleinfo:
-            bundlename, bundletype = bundleinfo
-
-            if bundlename in self._bundlenames:
-                raise error.Abort(_('bundle name %s already registered') %
-                                  bundlename)
-            if bundletype in self._bundletypes:
-                raise error.Abort(_('bundle type %s already registered by %s') %
-                                  (bundletype, self._bundletypes[bundletype]))
-
-            # No external facing name declared.
-            if bundlename:
-                self._bundlenames[bundlename] = name
-
-            self._bundletypes[bundletype] = name
-
-        wiresupport = engine.wireprotosupport()
-        if wiresupport:
-            wiretype = wiresupport.name
-            if wiretype in self._wiretypes:
-                raise error.Abort(_('wire protocol compression %s already '
-                                    'registered by %s') %
-                                  (wiretype, self._wiretypes[wiretype]))
-
-            self._wiretypes[wiretype] = name
-
-        revlogheader = engine.revlogheader()
-        if revlogheader and revlogheader in self._revlogheaders:
-            raise error.Abort(_('revlog header %s already registered by %s') %
-                              (revlogheader, self._revlogheaders[revlogheader]))
-
-        if revlogheader:
-            self._revlogheaders[revlogheader] = name
-
-        self._engines[name] = engine
-
-    @property
-    def supportedbundlenames(self):
-        return set(self._bundlenames.keys())
-
-    @property
-    def supportedbundletypes(self):
-        return set(self._bundletypes.keys())
-
-    def forbundlename(self, bundlename):
-        """Obtain a compression engine registered to a bundle name.
-
-        Will raise KeyError if the bundle type isn't registered.
-
-        Will abort if the engine is known but not available.
-        """
-        engine = self._engines[self._bundlenames[bundlename]]
-        if not engine.available():
-            raise error.Abort(_('compression engine %s could not be loaded') %
-                              engine.name())
-        return engine
-
-    def forbundletype(self, bundletype):
-        """Obtain a compression engine registered to a bundle type.
-
-        Will raise KeyError if the bundle type isn't registered.
-
-        Will abort if the engine is known but not available.
-        """
-        engine = self._engines[self._bundletypes[bundletype]]
-        if not engine.available():
-            raise error.Abort(_('compression engine %s could not be loaded') %
-                              engine.name())
-        return engine
-
-    def supportedwireengines(self, role, onlyavailable=True):
-        """Obtain compression engines that support the wire protocol.
-
-        Returns a list of engines in prioritized order, most desired first.
-
-        If ``onlyavailable`` is set, filter out engines that can't be
-        loaded.
-        """
-        assert role in (SERVERROLE, CLIENTROLE)
-
-        attr = 'serverpriority' if role == SERVERROLE else 'clientpriority'
-
-        engines = [self._engines[e] for e in self._wiretypes.values()]
-        if onlyavailable:
-            engines = [e for e in engines if e.available()]
-
-        def getkey(e):
-            # Sort first by priority, highest first. In case of tie, sort
-            # alphabetically. This is arbitrary, but ensures output is
-            # stable.
-            w = e.wireprotosupport()
-            return -1 * getattr(w, attr), w.name
-
-        return list(sorted(engines, key=getkey))
-
-    def forwiretype(self, wiretype):
-        engine = self._engines[self._wiretypes[wiretype]]
-        if not engine.available():
-            raise error.Abort(_('compression engine %s could not be loaded') %
-                              engine.name())
-        return engine
-
-    def forrevlogheader(self, header):
-        """Obtain a compression engine registered to a revlog header.
-
-        Will raise KeyError if the revlog header value isn't registered.
-        """
-        return self._engines[self._revlogheaders[header]]
-
-compengines = compressormanager()
-
-class compressionengine(object):
-    """Base class for compression engines.
-
-    Compression engines must implement the interface defined by this class.
-    """
-    def name(self):
-        """Returns the name of the compression engine.
-
-        This is the key the engine is registered under.
-
-        This method must be implemented.
-        """
-        raise NotImplementedError()
-
-    def available(self):
-        """Whether the compression engine is available.
-
-        The intent of this method is to allow optional compression engines
-        that may not be available in all installations (such as engines relying
-        on C extensions that may not be present).
-        """
-        return True
-
-    def bundletype(self):
-        """Describes bundle identifiers for this engine.
-
-        If this compression engine isn't supported for bundles, returns None.
-
-        If this engine can be used for bundles, returns a 2-tuple of strings of
-        the user-facing "bundle spec" compression name and an internal
-        identifier used to denote the compression format within bundles. To
-        exclude the name from external usage, set the first element to ``None``.
-
-        If bundle compression is supported, the class must also implement
-        ``compressstream`` and `decompressorreader``.
-
-        The docstring of this method is used in the help system to tell users
-        about this engine.
-        """
-        return None
-
-    def wireprotosupport(self):
-        """Declare support for this compression format on the wire protocol.
-
-        If this compression engine isn't supported for compressing wire
-        protocol payloads, returns None.
-
-        Otherwise, returns ``compenginewireprotosupport`` with the following
-        fields:
-
-        * String format identifier
-        * Integer priority for the server
-        * Integer priority for the client
-
-        The integer priorities are used to order the advertisement of format
-        support by server and client. The highest integer is advertised
-        first. Integers with non-positive values aren't advertised.
-
-        The priority values are somewhat arbitrary and only used for default
-        ordering. The relative order can be changed via config options.
-
-        If wire protocol compression is supported, the class must also implement
-        ``compressstream`` and ``decompressorreader``.
-        """
-        return None
-
-    def revlogheader(self):
-        """Header added to revlog chunks that identifies this engine.
-
-        If this engine can be used to compress revlogs, this method should
-        return the bytes used to identify chunks compressed with this engine.
-        Else, the method should return ``None`` to indicate it does not
-        participate in revlog compression.
-        """
-        return None
-
-    def compressstream(self, it, opts=None):
-        """Compress an iterator of chunks.
-
-        The method receives an iterator (ideally a generator) of chunks of
-        bytes to be compressed. It returns an iterator (ideally a generator)
-        of bytes of chunks representing the compressed output.
-
-        Optionally accepts an argument defining how to perform compression.
-        Each engine treats this argument differently.
-        """
-        raise NotImplementedError()
-
-    def decompressorreader(self, fh):
-        """Perform decompression on a file object.
-
-        Argument is an object with a ``read(size)`` method that returns
-        compressed data. Return value is an object with a ``read(size)`` that
-        returns uncompressed data.
-        """
-        raise NotImplementedError()
-
-    def revlogcompressor(self, opts=None):
-        """Obtain an object that can be used to compress revlog entries.
-
-        The object has a ``compress(data)`` method that compresses binary
-        data. This method returns compressed binary data or ``None`` if
-        the data could not be compressed (too small, not compressible, etc).
-        The returned data should have a header uniquely identifying this
-        compression format so decompression can be routed to this engine.
-        This header should be identified by the ``revlogheader()`` return
-        value.
-
-        The object has a ``decompress(data)`` method that decompresses
-        data. The method will only be called if ``data`` begins with
-        ``revlogheader()``. The method should return the raw, uncompressed
-        data or raise a ``StorageError``.
-
-        The object is reusable but is not thread safe.
-        """
-        raise NotImplementedError()
-
-class _CompressedStreamReader(object):
-    def __init__(self, fh):
-        if safehasattr(fh, 'unbufferedread'):
-            self._reader = fh.unbufferedread
-        else:
-            self._reader = fh.read
-        self._pending = []
-        self._pos = 0
-        self._eof = False
-
-    def _decompress(self, chunk):
-        raise NotImplementedError()
-
-    def read(self, l):
-        buf = []
-        while True:
-            while self._pending:
-                if len(self._pending[0]) > l + self._pos:
-                    newbuf = self._pending[0]
-                    buf.append(newbuf[self._pos:self._pos + l])
-                    self._pos += l
-                    return ''.join(buf)
-
-                newbuf = self._pending.pop(0)
-                if self._pos:
-                    buf.append(newbuf[self._pos:])
-                    l -= len(newbuf) - self._pos
-                else:
-                    buf.append(newbuf)
-                    l -= len(newbuf)
-                self._pos = 0
-
-            if self._eof:
-                return ''.join(buf)
-            chunk = self._reader(65536)
-            self._decompress(chunk)
-            if not chunk and not self._pending and not self._eof:
-                # No progress and no new data, bail out
-                return ''.join(buf)
-
-class _GzipCompressedStreamReader(_CompressedStreamReader):
-    def __init__(self, fh):
-        super(_GzipCompressedStreamReader, self).__init__(fh)
-        self._decompobj = zlib.decompressobj()
-    def _decompress(self, chunk):
-        newbuf = self._decompobj.decompress(chunk)
-        if newbuf:
-            self._pending.append(newbuf)
-        d = self._decompobj.copy()
-        try:
-            d.decompress('x')
-            d.flush()
-            if d.unused_data == 'x':
-                self._eof = True
-        except zlib.error:
-            pass
-
-class _BZ2CompressedStreamReader(_CompressedStreamReader):
-    def __init__(self, fh):
-        super(_BZ2CompressedStreamReader, self).__init__(fh)
-        self._decompobj = bz2.BZ2Decompressor()
-    def _decompress(self, chunk):
-        newbuf = self._decompobj.decompress(chunk)
-        if newbuf:
-            self._pending.append(newbuf)
-        try:
-            while True:
-                newbuf = self._decompobj.decompress('')
-                if newbuf:
-                    self._pending.append(newbuf)
-                else:
-                    break
-        except EOFError:
-            self._eof = True
-
-class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
-    def __init__(self, fh):
-        super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
-        newbuf = self._decompobj.decompress('BZ')
-        if newbuf:
-            self._pending.append(newbuf)
-
-class _ZstdCompressedStreamReader(_CompressedStreamReader):
-    def __init__(self, fh, zstd):
-        super(_ZstdCompressedStreamReader, self).__init__(fh)
-        self._zstd = zstd
-        self._decompobj = zstd.ZstdDecompressor().decompressobj()
-    def _decompress(self, chunk):
-        newbuf = self._decompobj.decompress(chunk)
-        if newbuf:
-            self._pending.append(newbuf)
-        try:
-            while True:
-                newbuf = self._decompobj.decompress('')
-                if newbuf:
-                    self._pending.append(newbuf)
-                else:
-                    break
-        except self._zstd.ZstdError:
-            self._eof = True
-
-class _zlibengine(compressionengine):
-    def name(self):
-        return 'zlib'
-
-    def bundletype(self):
-        """zlib compression using the DEFLATE algorithm.
-
-        All Mercurial clients should support this format. The compression
-        algorithm strikes a reasonable balance between compression ratio
-        and size.
-        """
-        return 'gzip', 'GZ'
-
-    def wireprotosupport(self):
-        return compewireprotosupport('zlib', 20, 20)
-
-    def revlogheader(self):
-        return 'x'
-
-    def compressstream(self, it, opts=None):
-        opts = opts or {}
-
-        z = zlib.compressobj(opts.get('level', -1))
-        for chunk in it:
-            data = z.compress(chunk)
-            # Not all calls to compress emit data. It is cheaper to inspect
-            # here than to feed empty chunks through generator.
-            if data:
-                yield data
-
-        yield z.flush()
-
-    def decompressorreader(self, fh):
-        return _GzipCompressedStreamReader(fh)
-
-    class zlibrevlogcompressor(object):
-        def compress(self, data):
-            insize = len(data)
-            # Caller handles empty input case.
-            assert insize > 0
-
-            if insize < 44:
-                return None
-
-            elif insize <= 1000000:
-                compressed = zlib.compress(data)
-                if len(compressed) < insize:
-                    return compressed
-                return None
-
-            # zlib makes an internal copy of the input buffer, doubling
-            # memory usage for large inputs. So do streaming compression
-            # on large inputs.
-            else:
-                z = zlib.compressobj()
-                parts = []
-                pos = 0
-                while pos < insize:
-                    pos2 = pos + 2**20
-                    parts.append(z.compress(data[pos:pos2]))
-                    pos = pos2
-                parts.append(z.flush())
-
-                if sum(map(len, parts)) < insize:
-                    return ''.join(parts)
-                return None
-
-        def decompress(self, data):
-            try:
-                return zlib.decompress(data)
-            except zlib.error as e:
-                raise error.StorageError(_('revlog decompress error: %s') %
-                                         stringutil.forcebytestr(e))
-
-    def revlogcompressor(self, opts=None):
-        return self.zlibrevlogcompressor()
-
-compengines.register(_zlibengine())
-
-class _bz2engine(compressionengine):
-    def name(self):
-        return 'bz2'
-
-    def bundletype(self):
-        """An algorithm that produces smaller bundles than ``gzip``.
-
-        All Mercurial clients should support this format.
-
-        This engine will likely produce smaller bundles than ``gzip`` but
-        will be significantly slower, both during compression and
-        decompression.
-
-        If available, the ``zstd`` engine can yield similar or better
-        compression at much higher speeds.
-        """
-        return 'bzip2', 'BZ'
-
-    # We declare a protocol name but don't advertise by default because
-    # it is slow.
-    def wireprotosupport(self):
-        return compewireprotosupport('bzip2', 0, 0)
-
-    def compressstream(self, it, opts=None):
-        opts = opts or {}
-        z = bz2.BZ2Compressor(opts.get('level', 9))
-        for chunk in it:
-            data = z.compress(chunk)
-            if data:
-                yield data
-
-        yield z.flush()
-
-    def decompressorreader(self, fh):
-        return _BZ2CompressedStreamReader(fh)
-
-compengines.register(_bz2engine())
-
-class _truncatedbz2engine(compressionengine):
-    def name(self):
-        return 'bz2truncated'
-
-    def bundletype(self):
-        return None, '_truncatedBZ'
-
-    # We don't implement compressstream because it is hackily handled elsewhere.
-
-    def decompressorreader(self, fh):
-        return _TruncatedBZ2CompressedStreamReader(fh)
-
-compengines.register(_truncatedbz2engine())
-
-class _noopengine(compressionengine):
-    def name(self):
-        return 'none'
-
-    def bundletype(self):
-        """No compression is performed.
-
-        Use this compression engine to explicitly disable compression.
-        """
-        return 'none', 'UN'
-
-    # Clients always support uncompressed payloads. Servers don't because
-    # unless you are on a fast network, uncompressed payloads can easily
-    # saturate your network pipe.
-    def wireprotosupport(self):
-        return compewireprotosupport('none', 0, 10)
-
-    # We don't implement revlogheader because it is handled specially
-    # in the revlog class.
-
-    def compressstream(self, it, opts=None):
-        return it
-
-    def decompressorreader(self, fh):
-        return fh
-
-    class nooprevlogcompressor(object):
-        def compress(self, data):
-            return None
-
-    def revlogcompressor(self, opts=None):
-        return self.nooprevlogcompressor()
-
-compengines.register(_noopengine())
-
-class _zstdengine(compressionengine):
-    def name(self):
-        return 'zstd'
-
-    @propertycache
-    def _module(self):
-        # Not all installs have the zstd module available. So defer importing
-        # until first access.
-        try:
-            from . import zstd
-            # Force delayed import.
-            zstd.__version__
-            return zstd
-        except ImportError:
-            return None
-
-    def available(self):
-        return bool(self._module)
-
-    def bundletype(self):
-        """A modern compression algorithm that is fast and highly flexible.
-
-        Only supported by Mercurial 4.1 and newer clients.
-
-        With the default settings, zstd compression is both faster and yields
-        better compression than ``gzip``. It also frequently yields better
-        compression than ``bzip2`` while operating at much higher speeds.
-
-        If this engine is available and backwards compatibility is not a
-        concern, it is likely the best available engine.
-        """
-        return 'zstd', 'ZS'
-
-    def wireprotosupport(self):
-        return compewireprotosupport('zstd', 50, 50)
-
-    def revlogheader(self):
-        return '\x28'
-
-    def compressstream(self, it, opts=None):
-        opts = opts or {}
-        # zstd level 3 is almost always significantly faster than zlib
-        # while providing no worse compression. It strikes a good balance
-        # between speed and compression.
-        level = opts.get('level', 3)
-
-        zstd = self._module
-        z = zstd.ZstdCompressor(level=level).compressobj()
-        for chunk in it:
-            data = z.compress(chunk)
-            if data:
-                yield data
-
-        yield z.flush()
-
-    def decompressorreader(self, fh):
-        return _ZstdCompressedStreamReader(fh, self._module)
-
-    class zstdrevlogcompressor(object):
-        def __init__(self, zstd, level=3):
-            # TODO consider omitting frame magic to save 4 bytes.
-            # This writes content sizes into the frame header. That is
-            # extra storage. But it allows a correct size memory allocation
-            # to hold the result.
-            self._cctx = zstd.ZstdCompressor(level=level)
-            self._dctx = zstd.ZstdDecompressor()
-            self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
-            self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
-
-        def compress(self, data):
-            insize = len(data)
-            # Caller handles empty input case.
-            assert insize > 0
-
-            if insize < 50:
-                return None
-
-            elif insize <= 1000000:
-                compressed = self._cctx.compress(data)
-                if len(compressed) < insize:
-                    return compressed
-                return None
-            else:
-                z = self._cctx.compressobj()
-                chunks = []
-                pos = 0
-                while pos < insize:
-                    pos2 = pos + self._compinsize
-                    chunk = z.compress(data[pos:pos2])
-                    if chunk:
-                        chunks.append(chunk)
-                    pos = pos2
-                chunks.append(z.flush())
-
-                if sum(map(len, chunks)) < insize:
-                    return ''.join(chunks)
-                return None
-
-        def decompress(self, data):
-            insize = len(data)
-
-            try:
-                # This was measured to be faster than other streaming
-                # decompressors.
-                dobj = self._dctx.decompressobj()
-                chunks = []
-                pos = 0
-                while pos < insize:
-                    pos2 = pos + self._decompinsize
-                    chunk = dobj.decompress(data[pos:pos2])
-                    if chunk:
-                        chunks.append(chunk)
-                    pos = pos2
-                # Frame should be exhausted, so no finish() API.
-
-                return ''.join(chunks)
-            except Exception as e:
-                raise error.StorageError(_('revlog decompress error: %s') %
-                                         stringutil.forcebytestr(e))
-
-    def revlogcompressor(self, opts=None):
-        opts = opts or {}
-        return self.zstdrevlogcompressor(self._module,
-                                         level=opts.get('level', 3))
-
-compengines.register(_zstdengine())
-
-def bundlecompressiontopics():
-    """Obtains a list of available bundle compressions for use in help."""
-    # help.makeitemsdocs() expects a dict of names to items with a .__doc__.
-    items = {}
-
-    # We need to format the docstring. So use a dummy object/type to hold it
-    # rather than mutating the original.
-    class docobject(object):
-        pass
-
-    for name in compengines:
-        engine = compengines[name]
-
-        if not engine.available():
-            continue
-
-        bt = engine.bundletype()
-        if not bt or not bt[0]:
-            continue
-
-        doc = b'``%s``\n    %s' % (bt[0], pycompat.getdoc(engine.bundletype))
-
-        value = docobject()
-        value.__doc__ = pycompat.sysstr(doc)
-        value._origdoc = engine.bundletype.__doc__
-        value._origfunc = engine.bundletype
-
-        items[bt[0]] = value
-
-    return items
-
-i18nfunctions = bundlecompressiontopics().values()
 
 # convenient shortcut
 dst = debugstacktrace
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mercurial/utils/compression.py	Wed Mar 27 16:45:14 2019 +0100
@@ -0,0 +1,747 @@
+# compression.py - Mercurial utility functions for compression
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+
+from __future__ import absolute_import, print_function
+
+import bz2
+import collections
+import zlib
+
+from .. import (
+    error,
+    i18n,
+    pycompat,
+)
+from . import (
+    stringutil,
+)
+
+safehasattr = pycompat.safehasattr
+
+
+_ = i18n._
+
+# compression code
+
+SERVERROLE = 'server'
+CLIENTROLE = 'client'
+
+compewireprotosupport = collections.namedtuple(r'compenginewireprotosupport',
+                                               (r'name', r'serverpriority',
+                                                r'clientpriority'))
+
+class propertycache(object):
+    def __init__(self, func):
+        self.func = func
+        self.name = func.__name__
+    def __get__(self, obj, type=None):
+        result = self.func(obj)
+        self.cachevalue(obj, result)
+        return result
+
+    def cachevalue(self, obj, value):
+        # __dict__ assignment required to bypass __setattr__ (eg: repoview)
+        obj.__dict__[self.name] = value
+
+class compressormanager(object):
+    """Holds registrations of various compression engines.
+
+    This class essentially abstracts the differences between compression
+    engines to allow new compression formats to be added easily, possibly from
+    extensions.
+
+    Compressors are registered against the global instance by calling its
+    ``register()`` method.
+    """
+    def __init__(self):
+        self._engines = {}
+        # Bundle spec human name to engine name.
+        self._bundlenames = {}
+        # Internal bundle identifier to engine name.
+        self._bundletypes = {}
+        # Revlog header to engine name.
+        self._revlogheaders = {}
+        # Wire proto identifier to engine name.
+        self._wiretypes = {}
+
+    def __getitem__(self, key):
+        return self._engines[key]
+
+    def __contains__(self, key):
+        return key in self._engines
+
+    def __iter__(self):
+        return iter(self._engines.keys())
+
+    def register(self, engine):
+        """Register a compression engine with the manager.
+
+        The argument must be a ``compressionengine`` instance.
+        """
+        if not isinstance(engine, compressionengine):
+            raise ValueError(_('argument must be a compressionengine'))
+
+        name = engine.name()
+
+        if name in self._engines:
+            raise error.Abort(_('compression engine %s already registered') %
+                              name)
+
+        bundleinfo = engine.bundletype()
+        if bundleinfo:
+            bundlename, bundletype = bundleinfo
+
+            if bundlename in self._bundlenames:
+                raise error.Abort(_('bundle name %s already registered') %
+                                  bundlename)
+            if bundletype in self._bundletypes:
+                raise error.Abort(_('bundle type %s already registered by %s') %
+                                  (bundletype, self._bundletypes[bundletype]))
+
+            # No external facing name declared.
+            if bundlename:
+                self._bundlenames[bundlename] = name
+
+            self._bundletypes[bundletype] = name
+
+        wiresupport = engine.wireprotosupport()
+        if wiresupport:
+            wiretype = wiresupport.name
+            if wiretype in self._wiretypes:
+                raise error.Abort(_('wire protocol compression %s already '
+                                    'registered by %s') %
+                                  (wiretype, self._wiretypes[wiretype]))
+
+            self._wiretypes[wiretype] = name
+
+        revlogheader = engine.revlogheader()
+        if revlogheader and revlogheader in self._revlogheaders:
+            raise error.Abort(_('revlog header %s already registered by %s') %
+                              (revlogheader, self._revlogheaders[revlogheader]))
+
+        if revlogheader:
+            self._revlogheaders[revlogheader] = name
+
+        self._engines[name] = engine
+
+    @property
+    def supportedbundlenames(self):
+        return set(self._bundlenames.keys())
+
+    @property
+    def supportedbundletypes(self):
+        return set(self._bundletypes.keys())
+
+    def forbundlename(self, bundlename):
+        """Obtain a compression engine registered to a bundle name.
+
+        Will raise KeyError if the bundle type isn't registered.
+
+        Will abort if the engine is known but not available.
+        """
+        engine = self._engines[self._bundlenames[bundlename]]
+        if not engine.available():
+            raise error.Abort(_('compression engine %s could not be loaded') %
+                              engine.name())
+        return engine
+
+    def forbundletype(self, bundletype):
+        """Obtain a compression engine registered to a bundle type.
+
+        Will raise KeyError if the bundle type isn't registered.
+
+        Will abort if the engine is known but not available.
+        """
+        engine = self._engines[self._bundletypes[bundletype]]
+        if not engine.available():
+            raise error.Abort(_('compression engine %s could not be loaded') %
+                              engine.name())
+        return engine
+
+    def supportedwireengines(self, role, onlyavailable=True):
+        """Obtain compression engines that support the wire protocol.
+
+        Returns a list of engines in prioritized order, most desired first.
+
+        If ``onlyavailable`` is set, filter out engines that can't be
+        loaded.
+        """
+        assert role in (SERVERROLE, CLIENTROLE)
+
+        attr = 'serverpriority' if role == SERVERROLE else 'clientpriority'
+
+        engines = [self._engines[e] for e in self._wiretypes.values()]
+        if onlyavailable:
+            engines = [e for e in engines if e.available()]
+
+        def getkey(e):
+            # Sort first by priority, highest first. In case of tie, sort
+            # alphabetically. This is arbitrary, but ensures output is
+            # stable.
+            w = e.wireprotosupport()
+            return -1 * getattr(w, attr), w.name
+
+        return list(sorted(engines, key=getkey))
+
+    def forwiretype(self, wiretype):
+        engine = self._engines[self._wiretypes[wiretype]]
+        if not engine.available():
+            raise error.Abort(_('compression engine %s could not be loaded') %
+                              engine.name())
+        return engine
+
+    def forrevlogheader(self, header):
+        """Obtain a compression engine registered to a revlog header.
+
+        Will raise KeyError if the revlog header value isn't registered.
+        """
+        return self._engines[self._revlogheaders[header]]
+
+compengines = compressormanager()
+
+class compressionengine(object):
+    """Base class for compression engines.
+
+    Compression engines must implement the interface defined by this class.
+    """
+    def name(self):
+        """Returns the name of the compression engine.
+
+        This is the key the engine is registered under.
+
+        This method must be implemented.
+        """
+        raise NotImplementedError()
+
+    def available(self):
+        """Whether the compression engine is available.
+
+        The intent of this method is to allow optional compression engines
+        that may not be available in all installations (such as engines relying
+        on C extensions that may not be present).
+        """
+        return True
+
+    def bundletype(self):
+        """Describes bundle identifiers for this engine.
+
+        If this compression engine isn't supported for bundles, returns None.
+
+        If this engine can be used for bundles, returns a 2-tuple of strings of
+        the user-facing "bundle spec" compression name and an internal
+        identifier used to denote the compression format within bundles. To
+        exclude the name from external usage, set the first element to ``None``.
+
+        If bundle compression is supported, the class must also implement
+        ``compressstream`` and `decompressorreader``.
+
+        The docstring of this method is used in the help system to tell users
+        about this engine.
+        """
+        return None
+
+    def wireprotosupport(self):
+        """Declare support for this compression format on the wire protocol.
+
+        If this compression engine isn't supported for compressing wire
+        protocol payloads, returns None.
+
+        Otherwise, returns ``compenginewireprotosupport`` with the following
+        fields:
+
+        * String format identifier
+        * Integer priority for the server
+        * Integer priority for the client
+
+        The integer priorities are used to order the advertisement of format
+        support by server and client. The highest integer is advertised
+        first. Integers with non-positive values aren't advertised.
+
+        The priority values are somewhat arbitrary and only used for default
+        ordering. The relative order can be changed via config options.
+
+        If wire protocol compression is supported, the class must also implement
+        ``compressstream`` and ``decompressorreader``.
+        """
+        return None
+
+    def revlogheader(self):
+        """Header added to revlog chunks that identifies this engine.
+
+        If this engine can be used to compress revlogs, this method should
+        return the bytes used to identify chunks compressed with this engine.
+        Else, the method should return ``None`` to indicate it does not
+        participate in revlog compression.
+        """
+        return None
+
+    def compressstream(self, it, opts=None):
+        """Compress an iterator of chunks.
+
+        The method receives an iterator (ideally a generator) of chunks of
+        bytes to be compressed. It returns an iterator (ideally a generator)
+        of bytes of chunks representing the compressed output.
+
+        Optionally accepts an argument defining how to perform compression.
+        Each engine treats this argument differently.
+        """
+        raise NotImplementedError()
+
+    def decompressorreader(self, fh):
+        """Perform decompression on a file object.
+
+        Argument is an object with a ``read(size)`` method that returns
+        compressed data. Return value is an object with a ``read(size)`` that
+        returns uncompressed data.
+        """
+        raise NotImplementedError()
+
+    def revlogcompressor(self, opts=None):
+        """Obtain an object that can be used to compress revlog entries.
+
+        The object has a ``compress(data)`` method that compresses binary
+        data. This method returns compressed binary data or ``None`` if
+        the data could not be compressed (too small, not compressible, etc).
+        The returned data should have a header uniquely identifying this
+        compression format so decompression can be routed to this engine.
+        This header should be identified by the ``revlogheader()`` return
+        value.
+
+        The object has a ``decompress(data)`` method that decompresses
+        data. The method will only be called if ``data`` begins with
+        ``revlogheader()``. The method should return the raw, uncompressed
+        data or raise a ``StorageError``.
+
+        The object is reusable but is not thread safe.
+        """
+        raise NotImplementedError()
+
+class _CompressedStreamReader(object):
+    def __init__(self, fh):
+        if safehasattr(fh, 'unbufferedread'):
+            self._reader = fh.unbufferedread
+        else:
+            self._reader = fh.read
+        self._pending = []
+        self._pos = 0
+        self._eof = False
+
+    def _decompress(self, chunk):
+        raise NotImplementedError()
+
+    def read(self, l):
+        buf = []
+        while True:
+            while self._pending:
+                if len(self._pending[0]) > l + self._pos:
+                    newbuf = self._pending[0]
+                    buf.append(newbuf[self._pos:self._pos + l])
+                    self._pos += l
+                    return ''.join(buf)
+
+                newbuf = self._pending.pop(0)
+                if self._pos:
+                    buf.append(newbuf[self._pos:])
+                    l -= len(newbuf) - self._pos
+                else:
+                    buf.append(newbuf)
+                    l -= len(newbuf)
+                self._pos = 0
+
+            if self._eof:
+                return ''.join(buf)
+            chunk = self._reader(65536)
+            self._decompress(chunk)
+            if not chunk and not self._pending and not self._eof:
+                # No progress and no new data, bail out
+                return ''.join(buf)
+
+class _GzipCompressedStreamReader(_CompressedStreamReader):
+    def __init__(self, fh):
+        super(_GzipCompressedStreamReader, self).__init__(fh)
+        self._decompobj = zlib.decompressobj()
+    def _decompress(self, chunk):
+        newbuf = self._decompobj.decompress(chunk)
+        if newbuf:
+            self._pending.append(newbuf)
+        d = self._decompobj.copy()
+        try:
+            d.decompress('x')
+            d.flush()
+            if d.unused_data == 'x':
+                self._eof = True
+        except zlib.error:
+            pass
+
+class _BZ2CompressedStreamReader(_CompressedStreamReader):
+    def __init__(self, fh):
+        super(_BZ2CompressedStreamReader, self).__init__(fh)
+        self._decompobj = bz2.BZ2Decompressor()
+    def _decompress(self, chunk):
+        newbuf = self._decompobj.decompress(chunk)
+        if newbuf:
+            self._pending.append(newbuf)
+        try:
+            while True:
+                newbuf = self._decompobj.decompress('')
+                if newbuf:
+                    self._pending.append(newbuf)
+                else:
+                    break
+        except EOFError:
+            self._eof = True
+
+class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
+    def __init__(self, fh):
+        super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
+        newbuf = self._decompobj.decompress('BZ')
+        if newbuf:
+            self._pending.append(newbuf)
+
+class _ZstdCompressedStreamReader(_CompressedStreamReader):
+    def __init__(self, fh, zstd):
+        super(_ZstdCompressedStreamReader, self).__init__(fh)
+        self._zstd = zstd
+        self._decompobj = zstd.ZstdDecompressor().decompressobj()
+    def _decompress(self, chunk):
+        newbuf = self._decompobj.decompress(chunk)
+        if newbuf:
+            self._pending.append(newbuf)
+        try:
+            while True:
+                newbuf = self._decompobj.decompress('')
+                if newbuf:
+                    self._pending.append(newbuf)
+                else:
+                    break
+        except self._zstd.ZstdError:
+            self._eof = True
+
+class _zlibengine(compressionengine):
+    def name(self):
+        return 'zlib'
+
+    def bundletype(self):
+        """zlib compression using the DEFLATE algorithm.
+
+        All Mercurial clients should support this format. The compression
+        algorithm strikes a reasonable balance between compression ratio
+        and size.
+        """
+        return 'gzip', 'GZ'
+
+    def wireprotosupport(self):
+        return compewireprotosupport('zlib', 20, 20)
+
+    def revlogheader(self):
+        return 'x'
+
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+
+        z = zlib.compressobj(opts.get('level', -1))
+        for chunk in it:
+            data = z.compress(chunk)
+            # Not all calls to compress emit data. It is cheaper to inspect
+            # here than to feed empty chunks through generator.
+            if data:
+                yield data
+
+        yield z.flush()
+
+    def decompressorreader(self, fh):
+        return _GzipCompressedStreamReader(fh)
+
+    class zlibrevlogcompressor(object):
+        def compress(self, data):
+            insize = len(data)
+            # Caller handles empty input case.
+            assert insize > 0
+
+            if insize < 44:
+                return None
+
+            elif insize <= 1000000:
+                compressed = zlib.compress(data)
+                if len(compressed) < insize:
+                    return compressed
+                return None
+
+            # zlib makes an internal copy of the input buffer, doubling
+            # memory usage for large inputs. So do streaming compression
+            # on large inputs.
+            else:
+                z = zlib.compressobj()
+                parts = []
+                pos = 0
+                while pos < insize:
+                    pos2 = pos + 2**20
+                    parts.append(z.compress(data[pos:pos2]))
+                    pos = pos2
+                parts.append(z.flush())
+
+                if sum(map(len, parts)) < insize:
+                    return ''.join(parts)
+                return None
+
+        def decompress(self, data):
+            try:
+                return zlib.decompress(data)
+            except zlib.error as e:
+                raise error.StorageError(_('revlog decompress error: %s') %
+                                         stringutil.forcebytestr(e))
+
+    def revlogcompressor(self, opts=None):
+        return self.zlibrevlogcompressor()
+
+compengines.register(_zlibengine())
+
+class _bz2engine(compressionengine):
+    def name(self):
+        return 'bz2'
+
+    def bundletype(self):
+        """An algorithm that produces smaller bundles than ``gzip``.
+
+        All Mercurial clients should support this format.
+
+        This engine will likely produce smaller bundles than ``gzip`` but
+        will be significantly slower, both during compression and
+        decompression.
+
+        If available, the ``zstd`` engine can yield similar or better
+        compression at much higher speeds.
+        """
+        return 'bzip2', 'BZ'
+
+    # We declare a protocol name but don't advertise by default because
+    # it is slow.
+    def wireprotosupport(self):
+        return compewireprotosupport('bzip2', 0, 0)
+
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+        z = bz2.BZ2Compressor(opts.get('level', 9))
+        for chunk in it:
+            data = z.compress(chunk)
+            if data:
+                yield data
+
+        yield z.flush()
+
+    def decompressorreader(self, fh):
+        return _BZ2CompressedStreamReader(fh)
+
+compengines.register(_bz2engine())
+
+class _truncatedbz2engine(compressionengine):
+    def name(self):
+        return 'bz2truncated'
+
+    def bundletype(self):
+        return None, '_truncatedBZ'
+
+    # We don't implement compressstream because it is hackily handled elsewhere.
+
+    def decompressorreader(self, fh):
+        return _TruncatedBZ2CompressedStreamReader(fh)
+
+compengines.register(_truncatedbz2engine())
+
+class _noopengine(compressionengine):
+    def name(self):
+        return 'none'
+
+    def bundletype(self):
+        """No compression is performed.
+
+        Use this compression engine to explicitly disable compression.
+        """
+        return 'none', 'UN'
+
+    # Clients always support uncompressed payloads. Servers don't because
+    # unless you are on a fast network, uncompressed payloads can easily
+    # saturate your network pipe.
+    def wireprotosupport(self):
+        return compewireprotosupport('none', 0, 10)
+
+    # We don't implement revlogheader because it is handled specially
+    # in the revlog class.
+
+    def compressstream(self, it, opts=None):
+        return it
+
+    def decompressorreader(self, fh):
+        return fh
+
+    class nooprevlogcompressor(object):
+        def compress(self, data):
+            return None
+
+    def revlogcompressor(self, opts=None):
+        return self.nooprevlogcompressor()
+
+compengines.register(_noopengine())
+
+class _zstdengine(compressionengine):
+    def name(self):
+        return 'zstd'
+
+    @propertycache
+    def _module(self):
+        # Not all installs have the zstd module available. So defer importing
+        # until first access.
+        try:
+            from .. import zstd
+            # Force delayed import.
+            zstd.__version__
+            return zstd
+        except ImportError:
+            return None
+
+    def available(self):
+        return bool(self._module)
+
+    def bundletype(self):
+        """A modern compression algorithm that is fast and highly flexible.
+
+        Only supported by Mercurial 4.1 and newer clients.
+
+        With the default settings, zstd compression is both faster and yields
+        better compression than ``gzip``. It also frequently yields better
+        compression than ``bzip2`` while operating at much higher speeds.
+
+        If this engine is available and backwards compatibility is not a
+        concern, it is likely the best available engine.
+        """
+        return 'zstd', 'ZS'
+
+    def wireprotosupport(self):
+        return compewireprotosupport('zstd', 50, 50)
+
+    def revlogheader(self):
+        return '\x28'
+
+    def compressstream(self, it, opts=None):
+        opts = opts or {}
+        # zstd level 3 is almost always significantly faster than zlib
+        # while providing no worse compression. It strikes a good balance
+        # between speed and compression.
+        level = opts.get('level', 3)
+
+        zstd = self._module
+        z = zstd.ZstdCompressor(level=level).compressobj()
+        for chunk in it:
+            data = z.compress(chunk)
+            if data:
+                yield data
+
+        yield z.flush()
+
+    def decompressorreader(self, fh):
+        return _ZstdCompressedStreamReader(fh, self._module)
+
+    class zstdrevlogcompressor(object):
+        def __init__(self, zstd, level=3):
+            # TODO consider omitting frame magic to save 4 bytes.
+            # This writes content sizes into the frame header. That is
+            # extra storage. But it allows a correct size memory allocation
+            # to hold the result.
+            self._cctx = zstd.ZstdCompressor(level=level)
+            self._dctx = zstd.ZstdDecompressor()
+            self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
+            self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
+
+        def compress(self, data):
+            insize = len(data)
+            # Caller handles empty input case.
+            assert insize > 0
+
+            if insize < 50:
+                return None
+
+            elif insize <= 1000000:
+                compressed = self._cctx.compress(data)
+                if len(compressed) < insize:
+                    return compressed
+                return None
+            else:
+                z = self._cctx.compressobj()
+                chunks = []
+                pos = 0
+                while pos < insize:
+                    pos2 = pos + self._compinsize
+                    chunk = z.compress(data[pos:pos2])
+                    if chunk:
+                        chunks.append(chunk)
+                    pos = pos2
+                chunks.append(z.flush())
+
+                if sum(map(len, chunks)) < insize:
+                    return ''.join(chunks)
+                return None
+
+        def decompress(self, data):
+            insize = len(data)
+
+            try:
+                # This was measured to be faster than other streaming
+                # decompressors.
+                dobj = self._dctx.decompressobj()
+                chunks = []
+                pos = 0
+                while pos < insize:
+                    pos2 = pos + self._decompinsize
+                    chunk = dobj.decompress(data[pos:pos2])
+                    if chunk:
+                        chunks.append(chunk)
+                    pos = pos2
+                # Frame should be exhausted, so no finish() API.
+
+                return ''.join(chunks)
+            except Exception as e:
+                raise error.StorageError(_('revlog decompress error: %s') %
+                                         stringutil.forcebytestr(e))
+
+    def revlogcompressor(self, opts=None):
+        opts = opts or {}
+        return self.zstdrevlogcompressor(self._module,
+                                         level=opts.get('level', 3))
+
+compengines.register(_zstdengine())
+
+def bundlecompressiontopics():
+    """Obtains a list of available bundle compressions for use in help."""
+    # help.makeitemsdocs() expects a dict of names to items with a .__doc__.
+    items = {}
+
+    # We need to format the docstring. So use a dummy object/type to hold it
+    # rather than mutating the original.
+    class docobject(object):
+        pass
+
+    for name in compengines:
+        engine = compengines[name]
+
+        if not engine.available():
+            continue
+
+        bt = engine.bundletype()
+        if not bt or not bt[0]:
+            continue
+
+        doc = b'``%s``\n    %s' % (bt[0], pycompat.getdoc(engine.bundletype))
+
+        value = docobject()
+        value.__doc__ = pycompat.sysstr(doc)
+        value._origdoc = engine.bundletype.__doc__
+        value._origfunc = engine.bundletype
+
+        items[bt[0]] = value
+
+    return items
+
+i18nfunctions = bundlecompressiontopics().values()
--- a/mercurial/wireprotoserver.py	Sat Mar 30 13:13:10 2019 -0700
+++ b/mercurial/wireprotoserver.py	Wed Mar 27 16:45:14 2019 +0100
@@ -23,6 +23,7 @@
 )
 from .utils import (
     cborutil,
+    compression,
     interfaceutil,
 )
 
@@ -144,7 +145,7 @@
         caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
 
         compengines = wireprototypes.supportedcompengines(repo.ui,
-                                                          util.SERVERROLE)
+            compression.SERVERROLE)
         if compengines:
             comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
                                  for e in compengines)
@@ -320,11 +321,12 @@
     if '0.2' in proto.getprotocaps():
         # All clients are expected to support uncompressed data.
         if prefer_uncompressed:
-            return HGTYPE2, util._noopengine(), {}
+            return HGTYPE2, compression._noopengine(), {}
 
         # Now find an agreed upon compression format.
         compformats = wireprotov1server.clientcompressionsupport(proto)
-        for engine in wireprototypes.supportedcompengines(ui, util.SERVERROLE):
+        for engine in wireprototypes.supportedcompengines(ui,
+                compression.SERVERROLE):
             if engine.wireprotosupport().name in compformats:
                 opts = {}
                 level = ui.configint('server', '%slevel' % engine.name())
--- a/mercurial/wireprototypes.py	Sat Mar 30 13:13:10 2019 -0700
+++ b/mercurial/wireprototypes.py	Wed Mar 27 16:45:14 2019 +0100
@@ -18,6 +18,7 @@
     util,
 )
 from .utils import (
+    compression,
     interfaceutil,
 )
 
@@ -316,12 +317,12 @@
 
 def supportedcompengines(ui, role):
     """Obtain the list of supported compression engines for a request."""
-    assert role in (util.CLIENTROLE, util.SERVERROLE)
+    assert role in (compression.CLIENTROLE, compression.SERVERROLE)
 
-    compengines = util.compengines.supportedwireengines(role)
+    compengines = compression.compengines.supportedwireengines(role)
 
     # Allow config to override default list and ordering.
-    if role == util.SERVERROLE:
+    if role == compression.SERVERROLE:
         configengines = ui.configlist('server', 'compressionengines')
         config = 'server.compressionengines'
     else: