comparison mercurial/util.py @ 30798:f50c0db50025

util: compression APIs to support revlog decompression. Previously, compression engines had APIs for performing revlog compression but no mechanism to perform revlog decompression. This patch changes that. Revlog decompression is slightly more complicated than compression because in the compression case there is (currently) only a single engine that can be used at a time. However, for decompression, a revlog could contain chunks from multiple compression engines. This means decompression needs to map to multiple engines and decompressors. This functionality is outside the scope of this patch, but it drives the decision for engines to declare a byte header sequence that identifies revlog data as belonging to an engine and an API for obtaining an engine from a revlog header.
author Gregory Szorc <gregory.szorc@gmail.com>
date Mon, 02 Jan 2017 13:27:20 -0800
parents 31e1f0d4ab44
children 7005c03f7387
comparison
equal deleted inserted replaced
30797:0bde7372e4c0 30798:f50c0db50025
2998 self._engines = {} 2998 self._engines = {}
2999 # Bundle spec human name to engine name. 2999 # Bundle spec human name to engine name.
3000 self._bundlenames = {} 3000 self._bundlenames = {}
3001 # Internal bundle identifier to engine name. 3001 # Internal bundle identifier to engine name.
3002 self._bundletypes = {} 3002 self._bundletypes = {}
3003 # Revlog header to engine name.
3004 self._revlogheaders = {}
3003 # Wire proto identifier to engine name. 3005 # Wire proto identifier to engine name.
3004 self._wiretypes = {} 3006 self._wiretypes = {}
3005 3007
3006 def __getitem__(self, key): 3008 def __getitem__(self, key):
3007 return self._engines[key] 3009 return self._engines[key]
3051 'registered by %s') % 3053 'registered by %s') %
3052 (wiretype, self._wiretypes[wiretype])) 3054 (wiretype, self._wiretypes[wiretype]))
3053 3055
3054 self._wiretypes[wiretype] = name 3056 self._wiretypes[wiretype] = name
3055 3057
3058 revlogheader = engine.revlogheader()
3059 if revlogheader and revlogheader in self._revlogheaders:
3060 raise error.Abort(_('revlog header %s already registered by %s') %
3061 (revlogheader, self._revlogheaders[revlogheader]))
3062
3063 if revlogheader:
3064 self._revlogheaders[revlogheader] = name
3065
3056 self._engines[name] = engine 3066 self._engines[name] = engine
3057 3067
3058 @property 3068 @property
3059 def supportedbundlenames(self): 3069 def supportedbundlenames(self):
3060 return set(self._bundlenames.keys()) 3070 return set(self._bundlenames.keys())
3119 if not engine.available(): 3129 if not engine.available():
3120 raise error.Abort(_('compression engine %s could not be loaded') % 3130 raise error.Abort(_('compression engine %s could not be loaded') %
3121 engine.name()) 3131 engine.name())
3122 return engine 3132 return engine
3123 3133
def forrevlogheader(self, header):
    """Return the compression engine that claimed a revlog header.

    ``header`` is first resolved to an engine name through the revlog
    header registry; that name is then resolved to the engine object.

    Raises ``KeyError`` when no engine is registered for ``header``.
    """
    enginename = self._revlogheaders[header]
    return self._engines[enginename]
3140
3124 compengines = compressormanager() 3141 compengines = compressormanager()
3125 3142
3126 class compressionengine(object): 3143 class compressionengine(object):
3127 """Base class for compression engines. 3144 """Base class for compression engines.
3128 3145
3184 If wire protocol compression is supported, the class must also implement 3201 If wire protocol compression is supported, the class must also implement
3185 ``compressstream`` and ``decompressorreader``. 3202 ``compressstream`` and ``decompressorreader``.
3186 """ 3203 """
3187 return None 3204 return None
3188 3205
def revlogheader(self):
    """Return the bytes that mark a revlog chunk as owned by this engine.

    Engines able to compress revlogs override this to return the header
    bytes that identify chunks they compressed. This base implementation
    returns ``None``, which signals that the engine does not take part
    in revlog compression.
    """
    return None
3215
3189 def compressstream(self, it, opts=None): 3216 def compressstream(self, it, opts=None):
3190 """Compress an iterator of chunks. 3217 """Compress an iterator of chunks.
3191 3218
3192 The method receives an iterator (ideally a generator) of chunks of 3219 The method receives an iterator (ideally a generator) of chunks of
3193 bytes to be compressed. It returns an iterator (ideally a generator) 3220 bytes to be compressed. It returns an iterator (ideally a generator)
3213 The object has a ``compress(data)`` method that compresses binary 3240 The object has a ``compress(data)`` method that compresses binary
3214 data. This method returns compressed binary data or ``None`` if 3241 data. This method returns compressed binary data or ``None`` if
3215 the data could not be compressed (too small, not compressible, etc). 3242 the data could not be compressed (too small, not compressible, etc).
3216 The returned data should have a header uniquely identifying this 3243 The returned data should have a header uniquely identifying this
3217 compression format so decompression can be routed to this engine. 3244 compression format so decompression can be routed to this engine.
3245 This header should be identified by the ``revlogheader()`` return
3246 value.
3247
3248 The object has a ``decompress(data)`` method that decompresses
3249 data. The method will only be called if ``data`` begins with
3250 ``revlogheader()``. The method should return the raw, uncompressed
3251 data or raise a ``RevlogError``.
3218 3252
3219 The object is reusable but is not thread safe. 3253 The object is reusable but is not thread safe.
3220 """ 3254 """
3221 raise NotImplementedError() 3255 raise NotImplementedError()
3222 3256
def bundletype(self):
    """Return the two bundle identifiers declared by this engine.

    NOTE(review): presumably (bundle spec name, internal bundle
    identifier) -- confirm against the base class documentation.
    """
    return ('gzip', 'GZ')
3229 3263
def wireprotosupport(self):
    """Return this engine's wire protocol support declaration.

    NOTE(review): the two numeric arguments look like priorities; their
    exact meaning is defined by ``compewireprotosupport`` -- confirm.
    """
    support = compewireprotosupport('zlib', 20, 20)
    return support
3266
def revlogheader(self):
    """Return the header bytes identifying zlib-compressed revlog chunks."""
    # NOTE(review): presumably 'x' (0x78) because that is the first byte
    # of a typical zlib stream -- confirm.
    header = 'x'
    return header
3232 3269
3233 def compressstream(self, it, opts=None): 3270 def compressstream(self, it, opts=None):
3234 opts = opts or {} 3271 opts = opts or {}
3235 3272
3236 z = zlib.compressobj(opts.get('level', -1)) 3273 z = zlib.compressobj(opts.get('level', -1))
3284 3321
3285 if sum(map(len, parts)) < insize: 3322 if sum(map(len, parts)) < insize:
3286 return ''.join(parts) 3323 return ''.join(parts)
3287 return None 3324 return None
3288 3325
def decompress(self, data):
    """Decompress a zlib-compressed revlog chunk.

    ``data`` is handed to ``zlib.decompress`` unchanged. Returns the
    uncompressed bytes.

    Raises ``error.RevlogError`` if zlib rejects the data.
    """
    try:
        raw = zlib.decompress(data)
    except zlib.error as exc:
        # Surface zlib failures as the revlog error type.
        raise error.RevlogError(_('revlog decompress error: %s') %
                                str(exc))
    else:
        return raw
3332
def revlogcompressor(self, opts=None):
    """Return a new zlib revlog compressor object.

    ``opts`` is accepted for interface compatibility and is not used.
    """
    compressor = self.zlibrevlogcompressor()
    return compressor
3291 3335
3292 compengines.register(_zlibengine()) 3336 compengines.register(_zlibengine())
3293 3337
3354 # Clients always support uncompressed payloads. Servers don't because 3398 # Clients always support uncompressed payloads. Servers don't because
3355 # unless you are on a fast network, uncompressed payloads can easily 3399 # unless you are on a fast network, uncompressed payloads can easily
3356 # saturate your network pipe. 3400 # saturate your network pipe.
def wireprotosupport(self):
    """Return the wire protocol support declaration for uncompressed data.

    NOTE(review): the differing numeric arguments (0 and 10) presumably
    encode server versus client priority -- confirm against
    ``compewireprotosupport``.
    """
    support = compewireprotosupport('none', 0, 10)
    return support
3403
3404 # We don't implement revlogheader because it is handled specially
3405 # in the revlog class.
3359 3406
def compressstream(self, it, opts=None):
    """Return ``it`` untouched; this engine performs no compression.

    ``opts`` is accepted for interface compatibility and is not used.
    """
    passthrough = it
    return passthrough
3362 3409
3363 def decompressorreader(self, fh): 3410 def decompressorreader(self, fh):
3395 return 'zstd', 'ZS' 3442 return 'zstd', 'ZS'
3396 3443
def wireprotosupport(self):
    """Return this engine's wire protocol support declaration.

    NOTE(review): the two numeric arguments look like priorities; their
    exact meaning is defined by ``compewireprotosupport`` -- confirm.
    """
    support = compewireprotosupport('zstd', 50, 50)
    return support
3399 3446
def revlogheader(self):
    """Return the header byte identifying zstd-compressed revlog chunks."""
    # NOTE(review): presumably 0x28 because it is the first byte of the
    # zstd frame magic number -- confirm.
    header = '\x28'
    return header
3449
3400 def compressstream(self, it, opts=None): 3450 def compressstream(self, it, opts=None):
3401 opts = opts or {} 3451 opts = opts or {}
3402 # zstd level 3 is almost always significantly faster than zlib 3452 # zstd level 3 is almost always significantly faster than zlib
3403 # while providing no worse compression. It strikes a good balance 3453 # while providing no worse compression. It strikes a good balance
3404 # between speed and compression. 3454 # between speed and compression.
3423 # Writing the content size adds a few bytes to the output. However, 3473 # Writing the content size adds a few bytes to the output. However,
3424 # it allows decompression to be more optimal since we can 3474 # it allows decompression to be more optimal since we can
3425 # pre-allocate a buffer to hold the result. 3475 # pre-allocate a buffer to hold the result.
3426 self._cctx = zstd.ZstdCompressor(level=level, 3476 self._cctx = zstd.ZstdCompressor(level=level,
3427 write_content_size=True) 3477 write_content_size=True)
3478 self._dctx = zstd.ZstdDecompressor()
3428 self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE 3479 self._compinsize = zstd.COMPRESSION_RECOMMENDED_INPUT_SIZE
3480 self._decompinsize = zstd.DECOMPRESSION_RECOMMENDED_INPUT_SIZE
3429 3481
3430 def compress(self, data): 3482 def compress(self, data):
3431 insize = len(data) 3483 insize = len(data)
3432 # Caller handles empty input case. 3484 # Caller handles empty input case.
3433 assert insize > 0 3485 assert insize > 0
3454 3506
3455 if sum(map(len, chunks)) < insize: 3507 if sum(map(len, chunks)) < insize:
3456 return ''.join(chunks) 3508 return ''.join(chunks)
3457 return None 3509 return None
3458 3510
def decompress(self, data):
    """Decompress a revlog chunk with a streaming decompressor.

    ``data`` is fed to a fresh decompression object in slices of
    ``self._decompinsize`` bytes. Non-empty outputs are collected and
    joined into the returned value.

    Raises ``error.RevlogError`` if anything fails while decompressing.
    """
    total = len(data)

    try:
        # This was measured to be faster than other streaming
        # decompressors.
        stream = self._dctx.decompressobj()
        step = self._decompinsize
        pieces = []
        start = 0
        while start < total:
            stop = start + step
            piece = stream.decompress(data[start:stop])
            # Skip empty outputs so only real data is joined.
            if piece:
                pieces.append(piece)
            start = stop
        # Frame should be exhausted, so no finish() API.

        return ''.join(pieces)
    except Exception as exc:
        raise error.RevlogError(_('revlog decompress error: %s') %
                                str(exc))
3532
def revlogcompressor(self, opts=None):
    """Return a new zstd revlog compressor object.

    The compression level is read from ``opts['level']`` and defaults
    to 3 when ``opts`` is empty, ``None``, or has no ``level`` key.
    """
    level = opts.get('level', 3) if opts else 3
    return self.zstdrevlogcompressor(self._module, level=level)
3463 3537