util: extract compression code in `mercurial.utils.compression`
The code seems large enough to be worth extracting. This is similar to what was
done for various modules in `mercurial/utils/`.
Since none of the compression logic takes a `ui` object, issuing deprecation
warnings is tricky. Luckily the logic does not seem to have many external users.
--- a/mercurial/debugcommands.py Sat Mar 30 13:13:10 2019 -0700
+++ b/mercurial/debugcommands.py Wed Mar 27 16:45:14 2019 +0100
@@ -82,6 +82,7 @@
)
from .utils import (
cborutil,
+ compression,
dateutil,
procutil,
stringutil,
@@ -1299,7 +1300,8 @@
fm.formatlist(sorted(e.name() for e in compengines
if e.available()),
name='compengine', fmt='%s', sep=', '))
- wirecompengines = util.compengines.supportedwireengines(util.SERVERROLE)
+ wirecompengines = compression.compengines.supportedwireengines(
+ compression.SERVERROLE)
fm.write('compenginesserver', _('checking available compression engines '
'for wire protocol (%s)\n'),
fm.formatlist([e.name() for e in wirecompengines
--- a/mercurial/help.py Sat Mar 30 13:13:10 2019 -0700
+++ b/mercurial/help.py Wed Mar 27 16:45:14 2019 +0100
@@ -37,6 +37,9 @@
from .hgweb import (
webcommands,
)
+from .utils import (
+ compression,
+)
_exclkeywords = {
"(ADVANCED)",
@@ -428,7 +431,7 @@
addtopichook(topic, add)
addtopicsymbols('bundlespec', '.. bundlecompressionmarker',
- util.bundlecompressiontopics())
+ compression.bundlecompressiontopics())
addtopicsymbols('filesets', '.. predicatesmarker', fileset.symbols)
addtopicsymbols('merge-tools', '.. internaltoolsmarker',
filemerge.internalsdoc)
--- a/mercurial/util.py Sat Mar 30 13:13:10 2019 -0700
+++ b/mercurial/util.py Wed Mar 27 16:45:14 2019 +0100
@@ -16,7 +16,6 @@
from __future__ import absolute_import, print_function
import abc
-import bz2
import collections
import contextlib
import errno
@@ -34,7 +33,6 @@
import time
import traceback
import warnings
-import zlib
from .thirdparty import (
attr,
@@ -50,6 +48,7 @@
urllibcompat,
)
from .utils import (
+ compression,
procutil,
stringutil,
)
@@ -127,6 +126,11 @@
unlink = platform.unlink
username = platform.username
+# small compat layer
+compengines = compression.compengines
+SERVERROLE = compression.SERVERROLE
+CLIENTROLE = compression.CLIENTROLE
+
try:
recvfds = osutil.recvfds
except AttributeError:
@@ -3206,714 +3210,6 @@
yield path[:pos]
pos = path.rfind('/', 0, pos)
-# compression code
-
-SERVERROLE = 'server'
-CLIENTROLE = 'client'
-
-compewireprotosupport = collections.namedtuple(r'compenginewireprotosupport',
- (r'name', r'serverpriority',
- r'clientpriority'))
-
-class compressormanager(object):
- """Holds registrations of various compression engines.
-
- This class essentially abstracts the differences between compression
- engines to allow new compression formats to be added easily, possibly from
- extensions.
-
- Compressors are registered against the global instance by calling its
- ``register()`` method.
- """
- def __init__(self):
- self._engines = {}
- # Bundle spec human name to engine name.
- self._bundlenames = {}
- # Internal bundle identifier to engine name.
- self._bundletypes = {}
- # Revlog header to engine name.
- self._revlogheaders = {}
- # Wire proto identifier to engine name.
- self._wiretypes = {}
-
- def __getitem__(self, key):
- return self._engines[key]
-
- def __contains__(self, key):
- return key in self._engines
-
- def __iter__(self):
- return iter(self._engines.keys())
-
- def register(self, engine):
- """Register a compression engine with the manager.
-
- The argument must be a ``compressionengine`` instance.
- """
- if not isinstance(engine, compressionengine):
- raise ValueError(_('argument must be a compressionengine'))
-
- name = engine.name()
-
- if name in self._engines:
- raise error.Abort(_('compression engine %s already registered') %
- name)
-
- bundleinfo = engine.bundletype()
- if bundleinfo:
- bundlename, bundletype = bundleinfo
-
- if bundlename in self._bundlenames:
- raise error.Abort(_('bundle name %s already registered') %
- bundlename)
- if bundletype in self._bundletypes:
- raise error.Abort(_('bundle type %s already registered by %s') %
- (bundletype, self._bundletypes[bundletype]))
-
- # No external facing name declared.
- if bundlename:
- self._bundlenames[bundlename] = name
-
- self._bundletypes[bundletype] = name
-
- wiresupport = engine.wireprotosupport()
- if wiresupport:
- wiretype = wiresupport.name
- if wiretype in self._wiretypes:
- raise error.Abort(_('wire protocol compression %s already '
- 'registered by %s') %
- (wiretype, self._wiretypes[wiretype]))
-
- self._wiretypes[wiretype] = name
-
- revlogheader = engine.revlogheader()
- if revlogheader and revlogheader in self._revlogheaders:
- raise error.Abort(_('revlog header %s already registered by %s') %
- (revlogheader, self._revlogheaders[revlogheader]))
-
- if revlogheader:
- self._revlogheaders[revlogheader] = name
-
- self._engines[name] = engine
-
- @property
- def supportedbundlenames(self):
- return set(self._bundlenames.keys())
-
- @property
- def supportedbundletypes(self):
- return set(self._bundletypes.keys())
-
- def forbundlename(self, bundlename):
- """Obtain a compression engine registered to a bundle name.
-
- Will raise KeyError if the bundle type isn't registered.
-
- Will abort if the engine is known but not available.
- """
- engine = self._engines[self._bundlenames[bundlename]]
- if not engine.available():
- raise error.Abort(_('compression engine %s could not be loaded') %
- engine.name())
- return engine
-
- def forbundletype(self, bundletype):
- """Obtain a compression engine registered to a bundle type.
-
- Will raise KeyError if the bundle type isn't registered.
-
- Will abort if the engine is known but not available.
- """
- engine = self._engines[self._bundletypes[bundletype]]
- if not engine.available():
- raise error.Abort(_('compression engine %s could not be loaded') %
- engine.name())
- return engine
-
- def supportedwireengines(self, role, onlyavailable=True):
- """Obtain compression engines that support the wire protocol.
-
- Returns a list of engines in prioritized order, most desired first.
-
- If ``onlyavailable`` is set, filter out engines that can't be
- loaded.
- """
- assert role in (SERVERROLE, CLIENTROLE)
-
- attr = 'serverpriority' if role == SERVERROLE else 'clientpriority'
-
- engines = [self._engines[e] for e in self._wiretypes.values()]
- if onlyavailable:
- engines = [e for e in engines if e.available()]
-
- def getkey(e):
- # Sort first by priority, highest first. In case of tie, sort
- # alphabetically. This is arbitrary, but ensures output is
- # stable.
- w = e.wireprotosupport()
- return -1 * getattr(w, attr), w.name
-
- return list(sorted(engines, key=getkey))
-
- def forwiretype(self, wiretype):
- engine = self._engines[self._wiretypes[wiretype]]
- if not engine.available():
- raise error.Abort(_('compression engine %s could not be loaded') %
- engine.name())
- return engine
-
- def forrevlogheader(self, header):
- """Obtain a compression engine registered to a revlog header.
-
- Will raise KeyError if the revlog header value isn't registered.
- """
- return self._engines[self._revlogheaders[header]]
-
-compengines = compressormanager()
-
-class compressionengine(object):
- """Base class for compression engines.
-
- Compression engines must implement the interface defined by this class.
- """
- def name(self):
- """Returns the name of the compression engine.
-
- This is the key the engine is registered under.
-
- This method must be implemented.
- """
- raise NotImplementedError()
-
- def available(self):
- """Whether the compression engine is available.
-
- The intent of this method is to allow optional compression engines
- that may not be available in all installations (such as engines relying
- on C extensions that may not be present).
- """
- return True
-
- def bundletype(self):
- """Describes bundle identifiers for this engine.
-
- If this compression engine isn't supported for bundles, returns None.
-
- If this engine can be used for bundles, returns a 2-tuple of strings of
- the user-facing "bundle spec" compression name and an internal
- identifier used to denote the compression format within bundles. To
- exclude the name from external usage, set the first element to ``None``.
-
- If bundle compression is supported, the class must also implement
- ``compressstream`` and `decompressorreader``.
-
- The docstring of this method is used in the help system to tell users
- about this engine.
- """
- return None
-
- def wireprotosupport(self):
- """Declare support for this compression format on the wire protocol.
-
- If this compression engine isn't supported for compressing wire
- protocol payloads, returns None.
-
- Otherwise, returns ``compenginewireprotosupport`` with the following
- fields:
-
- * String format identifier
- * Integer priority for the server
- * Integer priority for the client
-
- The integer priorities are used to order the advertisement of format
- support by server and client. The highest integer is advertised
- first. Integers with non-positive values aren't advertised.
-
- The priority values are somewhat arbitrary and only used for default
- ordering. The relative order can be changed via config options.
-
- If wire protocol compression is supported, the class must also implement
- ``compressstream`` and ``decompressorreader``.
- """
- return None
-
- def revlogheader(self):
- """Header added to revlog chunks that identifies this engine.
-
- If this engine can be used to compress revlogs, this method should
- return the bytes used to identify chunks compressed with this engine.
- Else, the method should return ``None`` to indicate it does not
- participate in revlog compression.
- """
- return None
-
- def compressstream(self, it, opts=None):
- """Compress an iterator of chunks.
-
- The method receives an iterator (ideally a generator) of chunks of
- bytes to be compressed. It returns an iterator (ideally a generator)
- of bytes of chunks representing the compressed output.
-
- Optionally accepts an argument defining how to perform compression.
- Each engine treats this argument differently.
- """
- raise NotImplementedError()
-
- def decompressorreader(self, fh):
- """Perform decompression on a file object.
-
- Argument is an object with a ``read(size)`` method that returns
- compressed data. Return value is an object with a ``read(size)`` that
- returns uncompressed data.
- """
- raise NotImplementedError()
-
- def revlogcompressor(self, opts=None):
- """Obtain an object that can be used to compress revlog entries.
-
- The object has a ``compress(data)`` method that compresses binary
- data. This method returns compressed binary data or ``None`` if
- the data could not be compressed (too small, not compressible, etc).
- The returned data should have a header uniquely identifying this
- compression format so decompression can be routed to this engine.
- This header should be identified by the ``revlogheader()`` return
- value.
-
- The object has a ``decompress(data)`` method that decompresses
- data. The method will only be called if ``data`` begins with
- ``revlogheader()``. The method should return the raw, uncompressed
- data or raise a ``StorageError``.
-
- The object is reusable but is not thread safe.
- """
- raise NotImplementedError()
-
-class _CompressedStreamReader(object):
- def __init__(self, fh):
- if safehasattr(fh, 'unbufferedread'):
- self._reader = fh.unbufferedread
- else:
- self._reader = fh.read
- self._pending = []
- self._pos = 0
- self._eof = False
-
- def _decompress(self, chunk):
- raise NotImplementedError()
-
- def read(self, l):
- buf = []
- while True:
- while self._pending:
- if len(self._pending[0]) > l + self._pos:
- newbuf = self._pending[0]
- buf.append(newbuf[self._pos:self._pos + l])
- self._pos += l
- return ''.join(buf)
-
- newbuf = self._pending.pop(0)
- if self._pos:
- buf.append(newbuf[self._pos:])
- l -= len(newbuf) - self._pos
- else:
- buf.append(newbuf)
- l -= len(newbuf)
- self._pos = 0
-
- if self._eof:
- return ''.join(buf)
- chunk = self._reader(65536)
- self._decompress(chunk)
- if not chunk and not self._pending and not self._eof:
- # No progress and no new data, bail out
- return ''.join(buf)
-
-class _GzipCompressedStreamReader(_CompressedStreamReader):
- def __init__(self, fh):
- super(_GzipCompressedStreamReader, self).__init__(fh)
- self._decompobj = zlib.decompressobj()
- def _decompress(self, chunk):
- newbuf = self._decompobj.decompress(chunk)
- if newbuf:
- self._pending.append(newbuf)
- d = self._decompobj.copy()
- try:
- d.decompress('x')
- d.flush()
- if d.unused_data == 'x':
- self._eof = True
- except zlib.error:
- pass
-
-class _BZ2CompressedStreamReader(_CompressedStreamReader):
- def __init__(self, fh):
- super(_BZ2CompressedStreamReader, self).__init__(fh)
- self._decompobj = bz2.BZ2Decompressor()
- def _decompress(self, chunk):
- newbuf = self._decompobj.decompress(chunk)
- if newbuf:
- self._pending.append(newbuf)
- try:
- while True:
- newbuf = self._decompobj.decompress('')
- if newbuf:
- self._pending.append(newbuf)
- else:
- break
- except EOFError:
- self._eof = True
-
-class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
- def __init__(self, fh):
- super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
- newbuf = self._decompobj.decompress('BZ')
- if newbuf:
- self._pending.append(newbuf)
-
-class _ZstdCompressedStreamReader(_CompressedStreamReader):
- def __init__(self, fh, zstd):
- super(_ZstdCompressedStreamReader, self).__init__(fh)
- self._zstd = zstd
- self._decompobj = zstd.ZstdDecompressor().decompressobj()
- def _decompress(self, chunk):
- newbuf = self._decompobj.decompress(chunk)
- if newbuf:
- self._pending.append(newbuf)
- try:
- while True:
- newbuf = self._decompobj.decompress('')
- if newbuf:
- self._pending.append(newbuf)
- else:
- break
- except self._zstd.ZstdError:
- self._eof = True
-
-class _zlibengine(compressionengine):
- def name(self):
- return 'zlib'
-
- def bundletype(self):
- """zlib compression using the DEFLATE algorithm.
-
- All Mercurial clients should support this format. The compression
- algorithm strikes a reasonable balance between compression ratio
- and size.
- """
- return 'gzip', 'GZ'
-
- def wireprotosupport(self):
- return compewireprotosupport('zlib', 20, 20)
-
- def revlogheader(self):
- return 'x'
-
- def compressstream(self, it, opts=None):
- opts = opts or {}
-
- z = zlib.compressobj(opts.get('level', -1))
- for chunk in it:
- data = z.compress(chunk)
- # Not all calls to compress emit data. It is cheaper to inspect
- # here than to feed empty chunks through generator.
- if data:
- yield data
-
- yield z.flush()
-
- def decompressorreader(self, fh):
- return _GzipCompressedStreamReader(fh)
-
- class zlibrevlogcompressor(object):
- def compress(self, data):
- insize = len(data)
- # Caller handles empty input case.
- assert insize > 0
-
- if insize < 44:
- return None
-
- elif insize <= 1000000:
- compressed = zlib.compress(data)
- if len(compressed) < insize:
- return compressed
- return None
-
- # zlib makes an internal copy of the input buffer, doubling
- # memory usage for large inputs. So do streaming compression
- # on large inputs.
- else:
- z = zlib.compressobj()
- parts = []
- pos = 0
- while pos < insize:
- pos2 = pos + 2**20
- parts.append(z.compress(data[pos:pos2]))
- pos = pos2
- parts.append(z.flush())
-
- if sum(map(len, parts)) < insize:
- return ''.join(parts)
- return None
-
- def decompress(self, data):
- try:
- return zlib.decompress(data)
- except zlib.error as e:
- raise error.StorageError(_('revlog decompress error: %s') %
- stringutil.forcebytestr(e))
-
- def revlogcompressor(self, opts=None):
- return self.zlibrevlogcompressor()
-
-compengines.register(_zlibengine())
-
-class _bz2engine(compressionengine):
- def name(self):
- return 'bz2'
-
- def bundletype(self):
- """An algorithm that produces smaller bundles than ``gzip``.
-
- All Mercurial clients should support this format.
-
- This engine will likely produce smaller bundles than ``gzip`` but
- will be significantly slower, both during compression and
- decompression.
-
- If available, the ``zstd`` engine can yield similar or better
- compression at much higher speeds.
- """
- return 'bzip2', 'BZ'
-
- # We declare a protocol name but don't advertise by default because
- # it is slow.
- def wireprotosupport(self):
- return compewireprotosupport('bzip2', 0, 0)
-
- def compressstream(self, it, opts=None):
- opts = opts or {}
- z = bz2.BZ2Compressor(opts.get('level', 9))
- for chunk in it:
- data = z.compress(chunk)
- if data:
- yield data
-
- yield z.flush()
-
- def decompressorreader(self, fh):
- return _BZ2CompressedStreamReader(fh)
-
-compengines.register(_bz2engine())
-
-class _truncatedbz2engine(compressionengine):
- def name(self):
- return 'bz2truncated'
-
- def bundletype(self):
- return None, '_truncatedBZ'
-
- # We don't implement compressstream because it is hackily handled elsewhere.
-
- def decompressorreader(self, fh):
- return _TruncatedBZ2CompressedStreamReader(fh)
-
-compengines.register(_truncatedbz2engine())
-
-class _noopengine(compressionengine):
- def name(self):
- return 'none'
-
- def bundletype(self):
- """No compression is performed.
-
- Use this compression engine to explicitly disable compression.
- """
- return 'none', 'UN'
-
- # Clients always support uncompressed payloads. Servers don't because
- # unless you are on a fast network, uncompressed payloads can easily
- # saturate your network pipe.
- def wireprotosupport(self):
- return compewireprotosupport('none', 0, 10)
-
- # We don't implement revlogheader because it is handled specially
- # in the revlog class.
-
- def compressstream(self, it, opts=None):
- return it
-
- def decompressorreader(self, fh):
- return fh
-
- class nooprevlogcompressor(object):
- def compress(self, data):
- return None
-
- def revlogcompressor(self, opts=None):
- return self.nooprevlogcompressor()
-
-compengines.register(_noopengine())
-
-class _zstdengine(compressionengine):
- def name(self):
- return 'zstd'
-
- @propertycache
- def _module(self):
- # Not all installs have the zstd module available. So defer importing
- # until first access.
- try:
- from . import zstd
- # Force delayed import.
- zstd.__version__
- return zstd
- except ImportError:
- return None
-
- def available(self):
- return bool(self._module)
-
- def bundletype(self):
- """A modern compression algorithm that is fast and highly flexible.
-
- Only supported by Mercurial 4.1 and newer clients.
-
- With the default settings, zstd compression is both faster and yields
- better compression than ``gzip``. It also frequently yields better
- compression than ``bzip2`` while operating at much higher speeds.
-
- If this engine is available and backwards compatibility is not a
- concern, it is likely the best available engine.
- """
- return 'zstd', 'ZS'
-
- def wireprotosupport(self):
- return compewireprotosupport('zstd', 50, 50)
-
- def revlogheader(self):
- return '\x28'
-
- def compressstream(self, it, opts=None):
- opts = opts or {}
- # zstd level 3 is almost always significantly faster than zlib
- # while providing no worse compression. It strikes a good balance
- # between speed and compression.
- level = opts.get('level', 3)
-
- zstd = self._module
- z = zstd.ZstdCompressor(level=level).compressobj()
- for chunk in it:
- data = z.compress(chunk)
- if data:
- yield data
-
- yield z.flush()
-
- def decompressorreader(self, fh):
- return _ZstdCompressedStreamReader(fh, self._module)
-
- class zstdrevlogcompressor(object):
- def __init__(self, zstd, level=3):
- # TODO consider omitting frame magic to save 4 bytes.
- # This writes content sizes into the frame header. That is
- # extra storage. But it allows a correct size memory allocation
- # to hold the result.
- self._cctx = zstd.ZstdCompressor(level=level)
- self._dctx = zstd.ZstdDecompressor()
- self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
- self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
-
- def compress(self, data):
- insize = len(data)
- # Caller handles empty input case.
- assert insize > 0
-
- if insize < 50:
- return None
-
- elif insize <= 1000000:
- compressed = self._cctx.compress(data)
- if len(compressed) < insize:
- return compressed
- return None
- else:
- z = self._cctx.compressobj()
- chunks = []
- pos = 0
- while pos < insize:
- pos2 = pos + self._compinsize
- chunk = z.compress(data[pos:pos2])
- if chunk:
- chunks.append(chunk)
- pos = pos2
- chunks.append(z.flush())
-
- if sum(map(len, chunks)) < insize:
- return ''.join(chunks)
- return None
-
- def decompress(self, data):
- insize = len(data)
-
- try:
- # This was measured to be faster than other streaming
- # decompressors.
- dobj = self._dctx.decompressobj()
- chunks = []
- pos = 0
- while pos < insize:
- pos2 = pos + self._decompinsize
- chunk = dobj.decompress(data[pos:pos2])
- if chunk:
- chunks.append(chunk)
- pos = pos2
- # Frame should be exhausted, so no finish() API.
-
- return ''.join(chunks)
- except Exception as e:
- raise error.StorageError(_('revlog decompress error: %s') %
- stringutil.forcebytestr(e))
-
- def revlogcompressor(self, opts=None):
- opts = opts or {}
- return self.zstdrevlogcompressor(self._module,
- level=opts.get('level', 3))
-
-compengines.register(_zstdengine())
-
-def bundlecompressiontopics():
- """Obtains a list of available bundle compressions for use in help."""
- # help.makeitemsdocs() expects a dict of names to items with a .__doc__.
- items = {}
-
- # We need to format the docstring. So use a dummy object/type to hold it
- # rather than mutating the original.
- class docobject(object):
- pass
-
- for name in compengines:
- engine = compengines[name]
-
- if not engine.available():
- continue
-
- bt = engine.bundletype()
- if not bt or not bt[0]:
- continue
-
- doc = b'``%s``\n %s' % (bt[0], pycompat.getdoc(engine.bundletype))
-
- value = docobject()
- value.__doc__ = pycompat.sysstr(doc)
- value._origdoc = engine.bundletype.__doc__
- value._origfunc = engine.bundletype
-
- items[bt[0]] = value
-
- return items
-
-i18nfunctions = bundlecompressiontopics().values()
# convenient shortcut
dst = debugstacktrace
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/mercurial/utils/compression.py Wed Mar 27 16:45:14 2019 +0100
@@ -0,0 +1,747 @@
+# compression.py - Mercurial utility functions for compression
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+
+from __future__ import absolute_import, print_function
+
+import bz2
+import collections
+import zlib
+
+from .. import (
+ error,
+ i18n,
+ pycompat,
+)
+from . import (
+ stringutil,
+)
+
+safehasattr = pycompat.safehasattr
+
+
+_ = i18n._
+
+# compression code
+
+SERVERROLE = 'server'
+CLIENTROLE = 'client'
+
+compewireprotosupport = collections.namedtuple(r'compenginewireprotosupport',
+ (r'name', r'serverpriority',
+ r'clientpriority'))
+
+class propertycache(object):
+ def __init__(self, func):
+ self.func = func
+ self.name = func.__name__
+ def __get__(self, obj, type=None):
+ result = self.func(obj)
+ self.cachevalue(obj, result)
+ return result
+
+ def cachevalue(self, obj, value):
+ # __dict__ assignment required to bypass __setattr__ (eg: repoview)
+ obj.__dict__[self.name] = value
+
+class compressormanager(object):
+ """Holds registrations of various compression engines.
+
+ This class essentially abstracts the differences between compression
+ engines to allow new compression formats to be added easily, possibly from
+ extensions.
+
+ Compressors are registered against the global instance by calling its
+ ``register()`` method.
+ """
+ def __init__(self):
+ self._engines = {}
+ # Bundle spec human name to engine name.
+ self._bundlenames = {}
+ # Internal bundle identifier to engine name.
+ self._bundletypes = {}
+ # Revlog header to engine name.
+ self._revlogheaders = {}
+ # Wire proto identifier to engine name.
+ self._wiretypes = {}
+
+ def __getitem__(self, key):
+ return self._engines[key]
+
+ def __contains__(self, key):
+ return key in self._engines
+
+ def __iter__(self):
+ return iter(self._engines.keys())
+
+ def register(self, engine):
+ """Register a compression engine with the manager.
+
+ The argument must be a ``compressionengine`` instance.
+ """
+ if not isinstance(engine, compressionengine):
+ raise ValueError(_('argument must be a compressionengine'))
+
+ name = engine.name()
+
+ if name in self._engines:
+ raise error.Abort(_('compression engine %s already registered') %
+ name)
+
+ bundleinfo = engine.bundletype()
+ if bundleinfo:
+ bundlename, bundletype = bundleinfo
+
+ if bundlename in self._bundlenames:
+ raise error.Abort(_('bundle name %s already registered') %
+ bundlename)
+ if bundletype in self._bundletypes:
+ raise error.Abort(_('bundle type %s already registered by %s') %
+ (bundletype, self._bundletypes[bundletype]))
+
+ # No external facing name declared.
+ if bundlename:
+ self._bundlenames[bundlename] = name
+
+ self._bundletypes[bundletype] = name
+
+ wiresupport = engine.wireprotosupport()
+ if wiresupport:
+ wiretype = wiresupport.name
+ if wiretype in self._wiretypes:
+ raise error.Abort(_('wire protocol compression %s already '
+ 'registered by %s') %
+ (wiretype, self._wiretypes[wiretype]))
+
+ self._wiretypes[wiretype] = name
+
+ revlogheader = engine.revlogheader()
+ if revlogheader and revlogheader in self._revlogheaders:
+ raise error.Abort(_('revlog header %s already registered by %s') %
+ (revlogheader, self._revlogheaders[revlogheader]))
+
+ if revlogheader:
+ self._revlogheaders[revlogheader] = name
+
+ self._engines[name] = engine
+
+ @property
+ def supportedbundlenames(self):
+ return set(self._bundlenames.keys())
+
+ @property
+ def supportedbundletypes(self):
+ return set(self._bundletypes.keys())
+
+ def forbundlename(self, bundlename):
+ """Obtain a compression engine registered to a bundle name.
+
+ Will raise KeyError if the bundle type isn't registered.
+
+ Will abort if the engine is known but not available.
+ """
+ engine = self._engines[self._bundlenames[bundlename]]
+ if not engine.available():
+ raise error.Abort(_('compression engine %s could not be loaded') %
+ engine.name())
+ return engine
+
+ def forbundletype(self, bundletype):
+ """Obtain a compression engine registered to a bundle type.
+
+ Will raise KeyError if the bundle type isn't registered.
+
+ Will abort if the engine is known but not available.
+ """
+ engine = self._engines[self._bundletypes[bundletype]]
+ if not engine.available():
+ raise error.Abort(_('compression engine %s could not be loaded') %
+ engine.name())
+ return engine
+
+ def supportedwireengines(self, role, onlyavailable=True):
+ """Obtain compression engines that support the wire protocol.
+
+ Returns a list of engines in prioritized order, most desired first.
+
+ If ``onlyavailable`` is set, filter out engines that can't be
+ loaded.
+ """
+ assert role in (SERVERROLE, CLIENTROLE)
+
+ attr = 'serverpriority' if role == SERVERROLE else 'clientpriority'
+
+ engines = [self._engines[e] for e in self._wiretypes.values()]
+ if onlyavailable:
+ engines = [e for e in engines if e.available()]
+
+ def getkey(e):
+ # Sort first by priority, highest first. In case of tie, sort
+ # alphabetically. This is arbitrary, but ensures output is
+ # stable.
+ w = e.wireprotosupport()
+ return -1 * getattr(w, attr), w.name
+
+ return list(sorted(engines, key=getkey))
+
+ def forwiretype(self, wiretype):
+ engine = self._engines[self._wiretypes[wiretype]]
+ if not engine.available():
+ raise error.Abort(_('compression engine %s could not be loaded') %
+ engine.name())
+ return engine
+
+ def forrevlogheader(self, header):
+ """Obtain a compression engine registered to a revlog header.
+
+ Will raise KeyError if the revlog header value isn't registered.
+ """
+ return self._engines[self._revlogheaders[header]]
+
+compengines = compressormanager()
+
+class compressionengine(object):
+ """Base class for compression engines.
+
+ Compression engines must implement the interface defined by this class.
+ """
+ def name(self):
+ """Returns the name of the compression engine.
+
+ This is the key the engine is registered under.
+
+ This method must be implemented.
+ """
+ raise NotImplementedError()
+
+ def available(self):
+ """Whether the compression engine is available.
+
+ The intent of this method is to allow optional compression engines
+ that may not be available in all installations (such as engines relying
+ on C extensions that may not be present).
+ """
+ return True
+
+ def bundletype(self):
+ """Describes bundle identifiers for this engine.
+
+ If this compression engine isn't supported for bundles, returns None.
+
+ If this engine can be used for bundles, returns a 2-tuple of strings of
+ the user-facing "bundle spec" compression name and an internal
+ identifier used to denote the compression format within bundles. To
+ exclude the name from external usage, set the first element to ``None``.
+
+ If bundle compression is supported, the class must also implement
+ ``compressstream`` and `decompressorreader``.
+
+ The docstring of this method is used in the help system to tell users
+ about this engine.
+ """
+ return None
+
+ def wireprotosupport(self):
+ """Declare support for this compression format on the wire protocol.
+
+ If this compression engine isn't supported for compressing wire
+ protocol payloads, returns None.
+
+ Otherwise, returns ``compenginewireprotosupport`` with the following
+ fields:
+
+ * String format identifier
+ * Integer priority for the server
+ * Integer priority for the client
+
+ The integer priorities are used to order the advertisement of format
+ support by server and client. The highest integer is advertised
+ first. Integers with non-positive values aren't advertised.
+
+ The priority values are somewhat arbitrary and only used for default
+ ordering. The relative order can be changed via config options.
+
+ If wire protocol compression is supported, the class must also implement
+ ``compressstream`` and ``decompressorreader``.
+ """
+ return None
+
+ def revlogheader(self):
+ """Header added to revlog chunks that identifies this engine.
+
+ If this engine can be used to compress revlogs, this method should
+ return the bytes used to identify chunks compressed with this engine.
+ Else, the method should return ``None`` to indicate it does not
+ participate in revlog compression.
+ """
+ return None
+
+ def compressstream(self, it, opts=None):
+ """Compress an iterator of chunks.
+
+ The method receives an iterator (ideally a generator) of chunks of
+ bytes to be compressed. It returns an iterator (ideally a generator)
+ of bytes of chunks representing the compressed output.
+
+ Optionally accepts an argument defining how to perform compression.
+ Each engine treats this argument differently.
+ """
+ raise NotImplementedError()
+
+ def decompressorreader(self, fh):
+ """Perform decompression on a file object.
+
+ Argument is an object with a ``read(size)`` method that returns
+ compressed data. Return value is an object with a ``read(size)`` that
+ returns uncompressed data.
+ """
+ raise NotImplementedError()
+
+ def revlogcompressor(self, opts=None):
+ """Obtain an object that can be used to compress revlog entries.
+
+ The object has a ``compress(data)`` method that compresses binary
+ data. This method returns compressed binary data or ``None`` if
+ the data could not be compressed (too small, not compressible, etc).
+ The returned data should have a header uniquely identifying this
+ compression format so decompression can be routed to this engine.
+ This header should be identified by the ``revlogheader()`` return
+ value.
+
+ The object has a ``decompress(data)`` method that decompresses
+ data. The method will only be called if ``data`` begins with
+ ``revlogheader()``. The method should return the raw, uncompressed
+ data or raise a ``StorageError``.
+
+ The object is reusable but is not thread safe.
+ """
+ raise NotImplementedError()
+
class _CompressedStreamReader(object):
    """Adapt a compressed file object into a plain ``read(size)`` reader.

    Subclasses implement ``_decompress(chunk)``, which appends decompressed
    output to ``self._pending`` and sets ``self._eof`` once the end of the
    compressed stream has been detected.
    """
    def __init__(self, fh):
        # Prefer an unbuffered read when the source offers one.
        if safehasattr(fh, 'unbufferedread'):
            self._reader = fh.unbufferedread
        else:
            self._reader = fh.read
        self._pending = []  # decompressed chunks not yet handed out
        self._pos = 0       # read offset inside self._pending[0]
        self._eof = False   # True once the compressed stream has ended

    def _decompress(self, chunk):
        """Decompress ``chunk`` into ``self._pending``; subclass hook."""
        raise NotImplementedError()

    def read(self, l):
        """Return up to ``l`` bytes of uncompressed data.

        May return fewer than ``l`` bytes only at end of stream or when the
        underlying reader makes no progress.
        """
        buf = []
        while True:
            while self._pending:
                if len(self._pending[0]) > l + self._pos:
                    # First pending chunk satisfies the request by itself:
                    # slice it and remember the offset for the next call.
                    newbuf = self._pending[0]
                    buf.append(newbuf[self._pos:self._pos + l])
                    self._pos += l
                    return ''.join(buf)

                # Consume the whole first chunk (minus any already-read
                # prefix) and keep looping for the remainder of ``l``.
                newbuf = self._pending.pop(0)
                if self._pos:
                    buf.append(newbuf[self._pos:])
                    l -= len(newbuf) - self._pos
                else:
                    buf.append(newbuf)
                    l -= len(newbuf)
                self._pos = 0

            if self._eof:
                return ''.join(buf)
            chunk = self._reader(65536)
            self._decompress(chunk)
            if not chunk and not self._pending and not self._eof:
                # No progress and no new data, bail out
                return ''.join(buf)
+
class _GzipCompressedStreamReader(_CompressedStreamReader):
    """Stream reader decompressing zlib/DEFLATE data."""
    def __init__(self, fh):
        super(_GzipCompressedStreamReader, self).__init__(fh)
        self._decompobj = zlib.decompressobj()
    def _decompress(self, chunk):
        decoded = self._decompobj.decompress(chunk)
        if decoded:
            self._pending.append(decoded)
        # Probe a *copy* of the decompressor with a dummy byte: if that byte
        # comes back in ``unused_data``, the underlying stream has already
        # ended, so flag EOF without disturbing the real decompressor.
        probe = self._decompobj.copy()
        try:
            probe.decompress('x')
            probe.flush()
            if probe.unused_data == 'x':
                self._eof = True
        except zlib.error:
            pass
+
class _BZ2CompressedStreamReader(_CompressedStreamReader):
    """Stream reader decompressing bzip2 data."""
    def __init__(self, fh):
        super(_BZ2CompressedStreamReader, self).__init__(fh)
        self._decompobj = bz2.BZ2Decompressor()
    def _decompress(self, chunk):
        decoded = self._decompobj.decompress(chunk)
        if decoded:
            self._pending.append(decoded)
        # Drain any output still buffered inside the decompressor; an
        # EOFError here is treated as the end of the compressed stream.
        try:
            while True:
                decoded = self._decompobj.decompress('')
                if not decoded:
                    break
                self._pending.append(decoded)
        except EOFError:
            self._eof = True
+
class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
    """bzip2 stream reader for payloads missing the leading 'BZ' magic."""
    def __init__(self, fh):
        super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
        # Prime the decompressor with the stripped two-byte header so the
        # remaining stream decodes as a regular bzip2 payload.
        decoded = self._decompobj.decompress('BZ')
        if decoded:
            self._pending.append(decoded)
+
class _ZstdCompressedStreamReader(_CompressedStreamReader):
    """Stream reader decompressing zstd data.

    The ``zstd`` module is injected by the caller because it is not
    available on every install.
    """
    def __init__(self, fh, zstd):
        super(_ZstdCompressedStreamReader, self).__init__(fh)
        self._zstd = zstd
        self._decompobj = zstd.ZstdDecompressor().decompressobj()
    def _decompress(self, chunk):
        decoded = self._decompobj.decompress(chunk)
        if decoded:
            self._pending.append(decoded)
        # Drain output still buffered in the decompressor; a ZstdError while
        # draining is treated as the end of the compressed stream.
        try:
            while True:
                decoded = self._decompobj.decompress('')
                if not decoded:
                    break
                self._pending.append(decoded)
        except self._zstd.ZstdError:
            self._eof = True
+
class _zlibengine(compressionengine):
    # Compression engine backed by the stdlib ``zlib`` module (DEFLATE).

    def name(self):
        return 'zlib'

    def bundletype(self):
        """zlib compression using the DEFLATE algorithm.

        All Mercurial clients should support this format. The compression
        algorithm strikes a reasonable balance between compression ratio
        and size.
        """
        return 'gzip', 'GZ'

    def wireprotosupport(self):
        # Medium priority (20) for both client and server roles.
        return compewireprotosupport('zlib', 20, 20)

    def revlogheader(self):
        return 'x'

    def compressstream(self, it, opts=None):
        """Yield zlib-compressed chunks for the chunks in ``it``.

        ``opts['level']`` selects the zlib compression level; -1 means the
        zlib default.
        """
        opts = opts or {}

        z = zlib.compressobj(opts.get('level', -1))
        for chunk in it:
            data = z.compress(chunk)
            # Not all calls to compress emit data. It is cheaper to inspect
            # here than to feed empty chunks through generator.
            if data:
                yield data

        yield z.flush()

    def decompressorreader(self, fh):
        return _GzipCompressedStreamReader(fh)

    class zlibrevlogcompressor(object):
        def compress(self, data):
            insize = len(data)
            # Caller handles empty input case.
            assert insize > 0

            if insize < 44:
                # presumably below the zlib overhead break-even point —
                # TODO confirm where 44 comes from
                return None

            elif insize <= 1000000:
                # One-shot compression for small-to-medium inputs; only
                # keep the result if it actually saves space.
                compressed = zlib.compress(data)
                if len(compressed) < insize:
                    return compressed
                return None

            # zlib makes an internal copy of the input buffer, doubling
            # memory usage for large inputs. So do streaming compression
            # on large inputs.
            else:
                z = zlib.compressobj()
                parts = []
                pos = 0
                while pos < insize:
                    pos2 = pos + 2**20
                    parts.append(z.compress(data[pos:pos2]))
                    pos = pos2
                parts.append(z.flush())

                if sum(map(len, parts)) < insize:
                    return ''.join(parts)
                return None

        def decompress(self, data):
            # Raise StorageError (the revlog contract) instead of leaking
            # zlib.error to callers.
            try:
                return zlib.decompress(data)
            except zlib.error as e:
                raise error.StorageError(_('revlog decompress error: %s') %
                                         stringutil.forcebytestr(e))

    def revlogcompressor(self, opts=None):
        # ``opts`` is accepted for interface parity but currently unused.
        return self.zlibrevlogcompressor()

compengines.register(_zlibengine())
+
class _bz2engine(compressionengine):
    """Compression engine backed by the stdlib ``bz2`` module."""

    def name(self):
        return 'bz2'

    def bundletype(self):
        """An algorithm that produces smaller bundles than ``gzip``.

        All Mercurial clients should support this format.

        This engine will likely produce smaller bundles than ``gzip`` but
        will be significantly slower, both during compression and
        decompression.

        If available, the ``zstd`` engine can yield similar or better
        compression at much higher speeds.
        """
        return 'bzip2', 'BZ'

    # We declare a protocol name but don't advertise by default because
    # it is slow.
    def wireprotosupport(self):
        return compewireprotosupport('bzip2', 0, 0)

    def compressstream(self, it, opts=None):
        # Default to maximum compression (level 9) when not configured.
        options = opts or {}
        compressor = bz2.BZ2Compressor(options.get('level', 9))
        for chunk in it:
            out = compressor.compress(chunk)
            # compress() may buffer internally and emit nothing; skip
            # empty results rather than yielding empty chunks.
            if out:
                yield out

        yield compressor.flush()

    def decompressorreader(self, fh):
        return _BZ2CompressedStreamReader(fh)

compengines.register(_bz2engine())
+
class _truncatedbz2engine(compressionengine):
    """Decompression-only engine for bzip2 streams lacking the 'BZ' magic."""

    def name(self):
        return 'bz2truncated'

    def bundletype(self):
        # No user-facing bundle name: this variant is not selectable.
        return None, '_truncatedBZ'

    # We don't implement compressstream because it is hackily handled elsewhere.

    def decompressorreader(self, fh):
        return _TruncatedBZ2CompressedStreamReader(fh)

compengines.register(_truncatedbz2engine())
+
class _noopengine(compressionengine):
    """Pass-through engine performing no compression at all."""

    def name(self):
        return 'none'

    def bundletype(self):
        """No compression is performed.

        Use this compression engine to explicitly disable compression.
        """
        return 'none', 'UN'

    # Clients always support uncompressed payloads. Servers don't because
    # unless you are on a fast network, uncompressed payloads can easily
    # saturate your network pipe.
    def wireprotosupport(self):
        return compewireprotosupport('none', 0, 10)

    # We don't implement revlogheader because it is handled specially
    # in the revlog class.

    def compressstream(self, it, opts=None):
        # Nothing to do: hand the chunk iterator straight back.
        return it

    def decompressorreader(self, fh):
        # The raw file object already yields "uncompressed" data.
        return fh

    class nooprevlogcompressor(object):
        def compress(self, data):
            # Never claims to have compressed anything.
            return None

    def revlogcompressor(self, opts=None):
        return self.nooprevlogcompressor()

compengines.register(_noopengine())
+
class _zstdengine(compressionengine):
    # Compression engine backed by the (optional) python-zstandard module.

    def name(self):
        return 'zstd'

    @propertycache
    def _module(self):
        # Not all installs have the zstd module available. So defer importing
        # until first access.
        try:
            from .. import zstd
            # Force delayed import.
            zstd.__version__
            return zstd
        except ImportError:
            return None

    def available(self):
        # Only available when the zstd module could be imported.
        return bool(self._module)

    def bundletype(self):
        """A modern compression algorithm that is fast and highly flexible.

        Only supported by Mercurial 4.1 and newer clients.

        With the default settings, zstd compression is both faster and yields
        better compression than ``gzip``. It also frequently yields better
        compression than ``bzip2`` while operating at much higher speeds.

        If this engine is available and backwards compatibility is not a
        concern, it is likely the best available engine.
        """
        return 'zstd', 'ZS'

    def wireprotosupport(self):
        # High priority (50) for both client and server roles.
        return compewireprotosupport('zstd', 50, 50)

    def revlogheader(self):
        return '\x28'

    def compressstream(self, it, opts=None):
        """Yield zstd-compressed chunks for the chunks in ``it``."""
        opts = opts or {}
        # zstd level 3 is almost always significantly faster than zlib
        # while providing no worse compression. It strikes a good balance
        # between speed and compression.
        level = opts.get('level', 3)

        zstd = self._module
        z = zstd.ZstdCompressor(level=level).compressobj()
        for chunk in it:
            data = z.compress(chunk)
            # compress() may buffer and emit nothing for a given chunk.
            if data:
                yield data

        yield z.flush()

    def decompressorreader(self, fh):
        return _ZstdCompressedStreamReader(fh, self._module)

    class zstdrevlogcompressor(object):
        def __init__(self, zstd, level=3):
            # TODO consider omitting frame magic to save 4 bytes.
            # This writes content sizes into the frame header. That is
            # extra storage. But it allows a correct size memory allocation
            # to hold the result.
            self._cctx = zstd.ZstdCompressor(level=level)
            self._dctx = zstd.ZstdDecompressor()
            self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
            self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE

        def compress(self, data):
            insize = len(data)
            # Caller handles empty input case.
            assert insize > 0

            if insize < 50:
                # presumably below the zstd frame overhead break-even —
                # TODO confirm where 50 comes from
                return None

            elif insize <= 1000000:
                # One-shot compression for small-to-medium inputs; only
                # keep the result if it actually saves space.
                compressed = self._cctx.compress(data)
                if len(compressed) < insize:
                    return compressed
                return None
            else:
                # Streaming compression in recommended-size pieces to
                # bound memory usage on large inputs.
                z = self._cctx.compressobj()
                chunks = []
                pos = 0
                while pos < insize:
                    pos2 = pos + self._compinsize
                    chunk = z.compress(data[pos:pos2])
                    if chunk:
                        chunks.append(chunk)
                    pos = pos2
                chunks.append(z.flush())

                if sum(map(len, chunks)) < insize:
                    return ''.join(chunks)
                return None

        def decompress(self, data):
            insize = len(data)

            # Raise StorageError (the revlog contract) on any failure
            # instead of leaking zstd-specific exceptions.
            try:
                # This was measured to be faster than other streaming
                # decompressors.
                dobj = self._dctx.decompressobj()
                chunks = []
                pos = 0
                while pos < insize:
                    pos2 = pos + self._decompinsize
                    chunk = dobj.decompress(data[pos:pos2])
                    if chunk:
                        chunks.append(chunk)
                    pos = pos2
                # Frame should be exhausted, so no finish() API.

                return ''.join(chunks)
            except Exception as e:
                raise error.StorageError(_('revlog decompress error: %s') %
                                         stringutil.forcebytestr(e))

    def revlogcompressor(self, opts=None):
        opts = opts or {}
        return self.zstdrevlogcompressor(self._module,
                                         level=opts.get('level', 3))

compengines.register(_zstdengine())
+
def bundlecompressiontopics():
    """Obtains a list of available bundle compressions for use in help."""
    # help.makeitemsdocs() expects a dict of names to items with a .__doc__,
    # so wrap each formatted docstring in a throwaway instance instead of
    # mutating the engine's own docstring.
    class docobject(object):
        pass

    items = {}
    for name in compengines:
        engine = compengines[name]
        if not engine.available():
            continue

        bt = engine.bundletype()
        # Engines without a user-selectable bundle name are internal-only.
        if not bt or not bt[0]:
            continue

        doc = b'``%s``\n    %s' % (bt[0], pycompat.getdoc(engine.bundletype))

        entry = docobject()
        entry.__doc__ = pycompat.sysstr(doc)
        entry._origdoc = engine.bundletype.__doc__
        entry._origfunc = engine.bundletype
        items[bt[0]] = entry

    return items
+
+i18nfunctions = bundlecompressiontopics().values()
--- a/mercurial/wireprotoserver.py Sat Mar 30 13:13:10 2019 -0700
+++ b/mercurial/wireprotoserver.py Wed Mar 27 16:45:14 2019 +0100
@@ -23,6 +23,7 @@
)
from .utils import (
cborutil,
+ compression,
interfaceutil,
)
@@ -144,7 +145,7 @@
caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
compengines = wireprototypes.supportedcompengines(repo.ui,
- util.SERVERROLE)
+ compression.SERVERROLE)
if compengines:
comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
for e in compengines)
@@ -320,11 +321,12 @@
if '0.2' in proto.getprotocaps():
# All clients are expected to support uncompressed data.
if prefer_uncompressed:
- return HGTYPE2, util._noopengine(), {}
+ return HGTYPE2, compression._noopengine(), {}
# Now find an agreed upon compression format.
compformats = wireprotov1server.clientcompressionsupport(proto)
- for engine in wireprototypes.supportedcompengines(ui, util.SERVERROLE):
+ for engine in wireprototypes.supportedcompengines(ui,
+ compression.SERVERROLE):
if engine.wireprotosupport().name in compformats:
opts = {}
level = ui.configint('server', '%slevel' % engine.name())
--- a/mercurial/wireprototypes.py Sat Mar 30 13:13:10 2019 -0700
+++ b/mercurial/wireprototypes.py Wed Mar 27 16:45:14 2019 +0100
@@ -18,6 +18,7 @@
util,
)
from .utils import (
+ compression,
interfaceutil,
)
@@ -316,12 +317,12 @@
def supportedcompengines(ui, role):
"""Obtain the list of supported compression engines for a request."""
- assert role in (util.CLIENTROLE, util.SERVERROLE)
+ assert role in (compression.CLIENTROLE, compression.SERVERROLE)
- compengines = util.compengines.supportedwireengines(role)
+ compengines = compression.compengines.supportedwireengines(role)
# Allow config to override default list and ordering.
- if role == util.SERVERROLE:
+ if role == compression.SERVERROLE:
configengines = ui.configlist('server', 'compressionengines')
config = 'server.compressionengines'
else: