comparison mercurial/util.py @ 38713:27391d74aaa2

ssh: avoid reading beyond the end of stream when using compression Compressed streams can be used as part of getbundle. The normal read() operation of bufferedinputpipe will try to fulfill the request exactly and can deadlock if the server sends less because it is done. At the same time, the bundle2 logic will stop reading when it believes it has gotten all parts of the bundle, which can leave behind end-of-stream markers as used by bzip2 and zstd. To solve this, introduce a new optional unbufferedread interface and provide it in bufferedinputpipe and doublepipe. If there is buffered data left, it will be returned, otherwise it will issue a single read request and return whatever it obtains. Reorganize the decompression handlers to try harder to read until the end of stream, especially if the requested read can already be fulfilled. Checking for end of stream is messy with Python 2; none of the standard compression modules properly exposes it. At least with zstd and bzip2, decompressing will remember EOS and fail for empty input after the EOS has been seen. For zlib, the only way to detect it with Python 2 is to duplicate the decompressobj and force some additional data into it. The common handler can be further optimized, but works as PoC. Differential Revision: https://phab.mercurial-scm.org/D3937
author Joerg Sonnenberger <joerg@bec.de>
date Thu, 12 Jul 2018 18:46:10 +0200
parents 152f4822d210
children 8751d1e2a7ff
comparison
equal deleted inserted replaced
38712:70a4289896b0 38713:27391d74aaa2
def read(self, size):
    """Read exactly ``size`` bytes, returning less only at end of stream."""
    # Keep pulling data from the pipe until the buffer can satisfy the
    # request, or the underlying stream reports end of stream.
    while self._lenbuf < size and not self._eof:
        self._fillbuffer()
    return self._frombuffer(size)
324 324
def unbufferedread(self, size):
    """Return pending buffered data, or the result of one read.

    Unlike ``read``, this never loops waiting for exactly ``size``
    bytes: if the buffer is empty, a single read is issued and
    whatever arrives (possibly less than ``size``) is returned. This
    avoids blocking on data a finished peer will never send.
    """
    if self._lenbuf == 0 and not self._eof:
        self._fillbuffer(max(size, _chunksize))
    return self._frombuffer(min(size, self._lenbuf))
329
325 def readline(self, *args, **kwargs): 330 def readline(self, *args, **kwargs):
326 if 1 < len(self._buffer): 331 if 1 < len(self._buffer):
327 # this should not happen because both read and readline end with a 332 # this should not happen because both read and readline end with a
328 # _frombuffer call that collapse it. 333 # _frombuffer call that collapse it.
329 self._buffer = [''.join(self._buffer)] 334 self._buffer = [''.join(self._buffer)]
361 else: 366 else:
362 self._buffer = [] 367 self._buffer = []
363 self._lenbuf = 0 368 self._lenbuf = 0
364 return data 369 return data
365 370
def _fillbuffer(self, size=_chunksize):
    """Issue one OS read of up to ``size`` bytes into the buffer."""
    data = os.read(self._input.fileno(), size)
    if data:
        self._buffer.append(data)
        self._lenbuf += len(data)
    else:
        # os.read returning an empty string signals end of stream.
        self._eof = True
3300 3305
3301 The object is reusable but is not thread safe. 3306 The object is reusable but is not thread safe.
3302 """ 3307 """
3303 raise NotImplementedError() 3308 raise NotImplementedError()
3304 3309
3310 class _CompressedStreamReader(object):
3311 def __init__(self, fh):
3312 if safehasattr(fh, 'unbufferedread'):
3313 self._reader = fh.unbufferedread
3314 else:
3315 self._reader = fh.read
3316 self._pending = []
3317 self._pos = 0
3318 self._eof = False
3319
3320 def _decompress(self, chunk):
3321 raise NotImplementedError()
3322
3323 def read(self, l):
3324 buf = []
3325 while True:
3326 while self._pending:
3327 if len(self._pending[0]) > l + self._pos:
3328 newbuf = self._pending[0]
3329 buf.append(newbuf[self._pos:self._pos + l])
3330 self._pos += l
3331 return ''.join(buf)
3332
3333 newbuf = self._pending.pop(0)
3334 if self._pos:
3335 buf.append(newbuf[self._pos:])
3336 l -= len(newbuf) - self._pos
3337 else:
3338 buf.append(newbuf)
3339 l -= len(newbuf)
3340 self._pos = 0
3341
3342 if self._eof:
3343 return ''.join(buf)
3344 chunk = self._reader(65536)
3345 self._decompress(chunk)
3346
class _GzipCompressedStreamReader(_CompressedStreamReader):
    """Streaming zlib decompression with end-of-stream detection."""
    def __init__(self, fh):
        super(_GzipCompressedStreamReader, self).__init__(fh)
        self._decompobj = zlib.decompressobj()
    def _decompress(self, chunk):
        newbuf = self._decompobj.decompress(chunk)
        if newbuf:
            self._pending.append(newbuf)
        # zlib exposes no direct end-of-stream indicator on Python 2,
        # so probe a copy of the decompressor with a sentinel byte:
        # once the stream has ended, extra input is reported back
        # verbatim in unused_data. b'x' is byte-identical to 'x' on
        # Python 2 and keeps this working under Python 3.
        d = self._decompobj.copy()
        try:
            d.decompress(b'x')
            d.flush()
            if d.unused_data == b'x':
                self._eof = True
        except zlib.error:
            pass
3363
class _BZ2CompressedStreamReader(_CompressedStreamReader):
    """Streaming bzip2 decompression with end-of-stream detection."""
    def __init__(self, fh):
        super(_BZ2CompressedStreamReader, self).__init__(fh)
        self._decompobj = bz2.BZ2Decompressor()
    def _decompress(self, chunk):
        newbuf = self._decompobj.decompress(chunk)
        if newbuf:
            self._pending.append(newbuf)
        # Drain any internally buffered output. Once the end of the
        # bzip2 stream has been seen, feeding the decompressor raises
        # EOFError, which is how we learn EOS was reached. b'' is
        # byte-identical to '' on Python 2 and correct on Python 3.
        try:
            while True:
                newbuf = self._decompobj.decompress(b'')
                if newbuf:
                    self._pending.append(newbuf)
                else:
                    break
        except EOFError:
            self._eof = True
3381
class _TruncatedBZ2CompressedStreamReader(_BZ2CompressedStreamReader):
    """bzip2 decompression for streams whose 'BZ' magic was stripped."""
    def __init__(self, fh):
        super(_TruncatedBZ2CompressedStreamReader, self).__init__(fh)
        # The input stream lacks the two-byte 'BZ' header, so re-inject
        # it before any real data arrives. b'BZ' is byte-identical to
        # 'BZ' on Python 2 and correct on Python 3.
        newbuf = self._decompobj.decompress(b'BZ')
        if newbuf:
            self._pending.append(newbuf)
3388
class _ZstdCompressedStreamReader(_CompressedStreamReader):
    """Streaming zstd decompression with end-of-stream detection.

    ``zstd`` is the (optionally vendored) python-zstandard module,
    passed in by the caller rather than imported here.
    """
    def __init__(self, fh, zstd):
        super(_ZstdCompressedStreamReader, self).__init__(fh)
        self._zstd = zstd
        self._decompobj = zstd.ZstdDecompressor().decompressobj()
    def _decompress(self, chunk):
        newbuf = self._decompobj.decompress(chunk)
        if newbuf:
            self._pending.append(newbuf)
        # Drain buffered output. After the final frame, feeding the
        # decompressor again raises ZstdError, signalling EOS. b'' is
        # byte-identical to '' on Python 2 and correct on Python 3.
        try:
            while True:
                newbuf = self._decompobj.decompress(b'')
                if newbuf:
                    self._pending.append(newbuf)
                else:
                    break
        except self._zstd.ZstdError:
            self._eof = True
3407
3305 class _zlibengine(compressionengine): 3408 class _zlibengine(compressionengine):
3306 def name(self): 3409 def name(self):
3307 return 'zlib' 3410 return 'zlib'
3308 3411
3309 def bundletype(self): 3412 def bundletype(self):
3333 yield data 3436 yield data
3334 3437
3335 yield z.flush() 3438 yield z.flush()
3336 3439
def decompressorreader(self, fh):
    """Return a file-like object yielding zlib-decompressed data from ``fh``."""
    reader = _GzipCompressedStreamReader(fh)
    return reader
3347 3442
3348 class zlibrevlogcompressor(object): 3443 class zlibrevlogcompressor(object):
3349 def compress(self, data): 3444 def compress(self, data):
3350 insize = len(data) 3445 insize = len(data)
3351 # Caller handles empty input case. 3446 # Caller handles empty input case.
3421 yield data 3516 yield data
3422 3517
3423 yield z.flush() 3518 yield z.flush()
3424 3519
def decompressorreader(self, fh):
    """Return a file-like object yielding bzip2-decompressed data from ``fh``."""
    reader = _BZ2CompressedStreamReader(fh)
    return reader
3432 3522
3433 compengines.register(_bz2engine()) 3523 compengines.register(_bz2engine())
3434 3524
3435 class _truncatedbz2engine(compressionengine): 3525 class _truncatedbz2engine(compressionengine):
3436 def name(self): 3526 def name(self):
3440 return None, '_truncatedBZ' 3530 return None, '_truncatedBZ'
3441 3531
3442 # We don't implement compressstream because it is hackily handled elsewhere. 3532 # We don't implement compressstream because it is hackily handled elsewhere.
3443 3533
def decompressorreader(self, fh):
    """Return a reader for bzip2 data whose 'BZ' header was stripped."""
    reader = _TruncatedBZ2CompressedStreamReader(fh)
    return reader
3453 3536
3454 compengines.register(_truncatedbz2engine()) 3537 compengines.register(_truncatedbz2engine())
3455 3538
3456 class _noopengine(compressionengine): 3539 class _noopengine(compressionengine):
3457 def name(self): 3540 def name(self):
3542 yield data 3625 yield data
3543 3626
3544 yield z.flush() 3627 yield z.flush()
3545 3628
def decompressorreader(self, fh):
    """Return a file-like object yielding zstd-decompressed data from ``fh``."""
    reader = _ZstdCompressedStreamReader(fh, self._module)
    return reader
3550 3631
3551 class zstdrevlogcompressor(object): 3632 class zstdrevlogcompressor(object):
3552 def __init__(self, zstd, level=3): 3633 def __init__(self, zstd, level=3):
3553 # TODO consider omitting frame magic to save 4 bytes. 3634 # TODO consider omitting frame magic to save 4 bytes.
3554 # This writes content sizes into the frame header. That is 3635 # This writes content sizes into the frame header. That is