Mercurial > hg
view contrib/python-zstandard/zstandard/cffi.py @ 42814:2c4f656c8e9f
interfaceutil: move to interfaces/
Now that we have a dedicated folder for interfaces, let's move interfaceutil
there.
Differential Revision: https://phab.mercurial-scm.org/D6742
author | Pulkit Goyal <pulkit@yandex-team.ru> |
---|---|
date | Sun, 18 Aug 2019 02:28:42 +0300 |
parents | 675775c33ab6 |
children | 69de49c4e39c |
line wrap: on
line source
# Copyright (c) 2016-present, Gregory Szorc # All rights reserved. # # This software may be modified and distributed under the terms # of the BSD license. See the LICENSE file for details. """Python interface to the Zstandard (zstd) compression library.""" from __future__ import absolute_import, unicode_literals # This should match what the C extension exports. __all__ = [ #'BufferSegment', #'BufferSegments', #'BufferWithSegments', #'BufferWithSegmentsCollection', 'CompressionParameters', 'ZstdCompressionDict', 'ZstdCompressionParameters', 'ZstdCompressor', 'ZstdError', 'ZstdDecompressor', 'FrameParameters', 'estimate_decompression_context_size', 'frame_content_size', 'frame_header_size', 'get_frame_parameters', 'train_dictionary', # Constants. 'FLUSH_BLOCK', 'FLUSH_FRAME', 'COMPRESSOBJ_FLUSH_FINISH', 'COMPRESSOBJ_FLUSH_BLOCK', 'ZSTD_VERSION', 'FRAME_HEADER', 'CONTENTSIZE_UNKNOWN', 'CONTENTSIZE_ERROR', 'MAX_COMPRESSION_LEVEL', 'COMPRESSION_RECOMMENDED_INPUT_SIZE', 'COMPRESSION_RECOMMENDED_OUTPUT_SIZE', 'DECOMPRESSION_RECOMMENDED_INPUT_SIZE', 'DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE', 'MAGIC_NUMBER', 'BLOCKSIZELOG_MAX', 'BLOCKSIZE_MAX', 'WINDOWLOG_MIN', 'WINDOWLOG_MAX', 'CHAINLOG_MIN', 'CHAINLOG_MAX', 'HASHLOG_MIN', 'HASHLOG_MAX', 'HASHLOG3_MAX', 'MINMATCH_MIN', 'MINMATCH_MAX', 'SEARCHLOG_MIN', 'SEARCHLOG_MAX', 'SEARCHLENGTH_MIN', 'SEARCHLENGTH_MAX', 'TARGETLENGTH_MIN', 'TARGETLENGTH_MAX', 'LDM_MINMATCH_MIN', 'LDM_MINMATCH_MAX', 'LDM_BUCKETSIZELOG_MAX', 'STRATEGY_FAST', 'STRATEGY_DFAST', 'STRATEGY_GREEDY', 'STRATEGY_LAZY', 'STRATEGY_LAZY2', 'STRATEGY_BTLAZY2', 'STRATEGY_BTOPT', 'STRATEGY_BTULTRA', 'STRATEGY_BTULTRA2', 'DICT_TYPE_AUTO', 'DICT_TYPE_RAWCONTENT', 'DICT_TYPE_FULLDICT', 'FORMAT_ZSTD1', 'FORMAT_ZSTD1_MAGICLESS', ] import io import os import sys from _zstd_cffi import ( ffi, lib, ) if sys.version_info[0] == 2: bytes_type = str int_type = long else: bytes_type = bytes int_type = int COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize() COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize() DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize() DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize() new_nonzero = ffi.new_allocator(should_clear_after_alloc=False) MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel() MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER FRAME_HEADER = b'\x28\xb5\x2f\xfd' CONTENTSIZE_UNKNOWN = lib.ZSTD_CONTENTSIZE_UNKNOWN CONTENTSIZE_ERROR = lib.ZSTD_CONTENTSIZE_ERROR ZSTD_VERSION = (lib.ZSTD_VERSION_MAJOR, lib.ZSTD_VERSION_MINOR, lib.ZSTD_VERSION_RELEASE) BLOCKSIZELOG_MAX = lib.ZSTD_BLOCKSIZELOG_MAX BLOCKSIZE_MAX = lib.ZSTD_BLOCKSIZE_MAX WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX HASHLOG3_MAX = lib.ZSTD_HASHLOG3_MAX MINMATCH_MIN = lib.ZSTD_MINMATCH_MIN MINMATCH_MAX = lib.ZSTD_MINMATCH_MAX SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX SEARCHLENGTH_MIN = lib.ZSTD_MINMATCH_MIN SEARCHLENGTH_MAX = lib.ZSTD_MINMATCH_MAX TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX LDM_MINMATCH_MIN = lib.ZSTD_LDM_MINMATCH_MIN LDM_MINMATCH_MAX = lib.ZSTD_LDM_MINMATCH_MAX LDM_BUCKETSIZELOG_MAX = lib.ZSTD_LDM_BUCKETSIZELOG_MAX STRATEGY_FAST = lib.ZSTD_fast STRATEGY_DFAST = lib.ZSTD_dfast STRATEGY_GREEDY = lib.ZSTD_greedy STRATEGY_LAZY = lib.ZSTD_lazy STRATEGY_LAZY2 = lib.ZSTD_lazy2 STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2 STRATEGY_BTOPT = lib.ZSTD_btopt STRATEGY_BTULTRA = lib.ZSTD_btultra STRATEGY_BTULTRA2 = lib.ZSTD_btultra2 DICT_TYPE_AUTO = lib.ZSTD_dct_auto DICT_TYPE_RAWCONTENT = lib.ZSTD_dct_rawContent DICT_TYPE_FULLDICT = lib.ZSTD_dct_fullDict FORMAT_ZSTD1 = lib.ZSTD_f_zstd1 FORMAT_ZSTD1_MAGICLESS = lib.ZSTD_f_zstd1_magicless FLUSH_BLOCK = 0 FLUSH_FRAME = 1 COMPRESSOBJ_FLUSH_FINISH = 0 COMPRESSOBJ_FLUSH_BLOCK = 1 def _cpu_count(): # os.cpu_count() was introducd in Python 3.4. try: return os.cpu_count() or 0 except AttributeError: pass # Linux. try: if sys.version_info[0] == 2: return os.sysconf(b'SC_NPROCESSORS_ONLN') else: return os.sysconf(u'SC_NPROCESSORS_ONLN') except (AttributeError, ValueError): pass # TODO implement on other platforms. return 0 class ZstdError(Exception): pass def _zstd_error(zresult): # Resolves to bytes on Python 2 and 3. We use the string for formatting # into error messages, which will be literal unicode. So convert it to # unicode. return ffi.string(lib.ZSTD_getErrorName(zresult)).decode('utf-8') def _make_cctx_params(params): res = lib.ZSTD_createCCtxParams() if res == ffi.NULL: raise MemoryError() res = ffi.gc(res, lib.ZSTD_freeCCtxParams) attrs = [ (lib.ZSTD_c_format, params.format), (lib.ZSTD_c_compressionLevel, params.compression_level), (lib.ZSTD_c_windowLog, params.window_log), (lib.ZSTD_c_hashLog, params.hash_log), (lib.ZSTD_c_chainLog, params.chain_log), (lib.ZSTD_c_searchLog, params.search_log), (lib.ZSTD_c_minMatch, params.min_match), (lib.ZSTD_c_targetLength, params.target_length), (lib.ZSTD_c_strategy, params.compression_strategy), (lib.ZSTD_c_contentSizeFlag, params.write_content_size), (lib.ZSTD_c_checksumFlag, params.write_checksum), (lib.ZSTD_c_dictIDFlag, params.write_dict_id), (lib.ZSTD_c_nbWorkers, params.threads), (lib.ZSTD_c_jobSize, params.job_size), (lib.ZSTD_c_overlapLog, params.overlap_log), (lib.ZSTD_c_forceMaxWindow, params.force_max_window), (lib.ZSTD_c_enableLongDistanceMatching, params.enable_ldm), (lib.ZSTD_c_ldmHashLog, params.ldm_hash_log), (lib.ZSTD_c_ldmMinMatch, params.ldm_min_match), (lib.ZSTD_c_ldmBucketSizeLog, params.ldm_bucket_size_log), (lib.ZSTD_c_ldmHashRateLog, params.ldm_hash_rate_log), ] for param, value in attrs: _set_compression_parameter(res, param, value) return res class ZstdCompressionParameters(object): @staticmethod def from_level(level, source_size=0, dict_size=0, **kwargs): params = lib.ZSTD_getCParams(level, source_size, dict_size) args = { 'window_log': 'windowLog', 'chain_log': 'chainLog', 'hash_log': 'hashLog', 'search_log': 'searchLog', 'min_match': 'minMatch', 'target_length': 'targetLength', 'compression_strategy': 'strategy', } for arg, attr in args.items(): if arg not in kwargs: kwargs[arg] = getattr(params, attr) return ZstdCompressionParameters(**kwargs) def __init__(self, format=0, compression_level=0, window_log=0, hash_log=0, chain_log=0, search_log=0, min_match=0, target_length=0, strategy=-1, compression_strategy=-1, write_content_size=1, write_checksum=0, write_dict_id=0, job_size=0, overlap_log=-1, overlap_size_log=-1, force_max_window=0, enable_ldm=0, ldm_hash_log=0, ldm_min_match=0, ldm_bucket_size_log=0, ldm_hash_rate_log=-1, ldm_hash_every_log=-1, threads=0): params = lib.ZSTD_createCCtxParams() if params == ffi.NULL: raise MemoryError() params = ffi.gc(params, lib.ZSTD_freeCCtxParams) self._params = params if threads < 0: threads = _cpu_count() # We need to set ZSTD_c_nbWorkers before ZSTD_c_jobSize and ZSTD_c_overlapLog # because setting ZSTD_c_nbWorkers resets the other parameters. _set_compression_parameter(params, lib.ZSTD_c_nbWorkers, threads) _set_compression_parameter(params, lib.ZSTD_c_format, format) _set_compression_parameter(params, lib.ZSTD_c_compressionLevel, compression_level) _set_compression_parameter(params, lib.ZSTD_c_windowLog, window_log) _set_compression_parameter(params, lib.ZSTD_c_hashLog, hash_log) _set_compression_parameter(params, lib.ZSTD_c_chainLog, chain_log) _set_compression_parameter(params, lib.ZSTD_c_searchLog, search_log) _set_compression_parameter(params, lib.ZSTD_c_minMatch, min_match) _set_compression_parameter(params, lib.ZSTD_c_targetLength, target_length) if strategy != -1 and compression_strategy != -1: raise ValueError('cannot specify both compression_strategy and strategy') if compression_strategy != -1: strategy = compression_strategy elif strategy == -1: strategy = 0 _set_compression_parameter(params, lib.ZSTD_c_strategy, strategy) _set_compression_parameter(params, lib.ZSTD_c_contentSizeFlag, write_content_size) _set_compression_parameter(params, lib.ZSTD_c_checksumFlag, write_checksum) _set_compression_parameter(params, lib.ZSTD_c_dictIDFlag, write_dict_id) _set_compression_parameter(params, lib.ZSTD_c_jobSize, job_size) if overlap_log != -1 and overlap_size_log != -1: raise ValueError('cannot specify both overlap_log and overlap_size_log') if overlap_size_log != -1: overlap_log = overlap_size_log elif overlap_log == -1: overlap_log = 0 _set_compression_parameter(params, lib.ZSTD_c_overlapLog, overlap_log) _set_compression_parameter(params, lib.ZSTD_c_forceMaxWindow, force_max_window) _set_compression_parameter(params, lib.ZSTD_c_enableLongDistanceMatching, enable_ldm) _set_compression_parameter(params, lib.ZSTD_c_ldmHashLog, ldm_hash_log) _set_compression_parameter(params, lib.ZSTD_c_ldmMinMatch, ldm_min_match) _set_compression_parameter(params, lib.ZSTD_c_ldmBucketSizeLog, ldm_bucket_size_log) if ldm_hash_rate_log != -1 and ldm_hash_every_log != -1: raise ValueError('cannot specify both ldm_hash_rate_log and ldm_hash_every_log') if ldm_hash_every_log != -1: ldm_hash_rate_log = ldm_hash_every_log elif ldm_hash_rate_log == -1: ldm_hash_rate_log = 0 _set_compression_parameter(params, lib.ZSTD_c_ldmHashRateLog, ldm_hash_rate_log) @property def format(self): return _get_compression_parameter(self._params, lib.ZSTD_c_format) @property def compression_level(self): return _get_compression_parameter(self._params, lib.ZSTD_c_compressionLevel) @property def window_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_windowLog) @property def hash_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_hashLog) @property def chain_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_chainLog) @property def search_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_searchLog) @property def min_match(self): return _get_compression_parameter(self._params, lib.ZSTD_c_minMatch) @property def target_length(self): return _get_compression_parameter(self._params, lib.ZSTD_c_targetLength) @property def compression_strategy(self): return _get_compression_parameter(self._params, lib.ZSTD_c_strategy) @property def write_content_size(self): return _get_compression_parameter(self._params, lib.ZSTD_c_contentSizeFlag) @property def write_checksum(self): return _get_compression_parameter(self._params, lib.ZSTD_c_checksumFlag) @property def write_dict_id(self): return _get_compression_parameter(self._params, lib.ZSTD_c_dictIDFlag) @property def job_size(self): return _get_compression_parameter(self._params, lib.ZSTD_c_jobSize) @property def overlap_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_overlapLog) @property def overlap_size_log(self): return self.overlap_log @property def force_max_window(self): return _get_compression_parameter(self._params, lib.ZSTD_c_forceMaxWindow) @property def enable_ldm(self): return _get_compression_parameter(self._params, lib.ZSTD_c_enableLongDistanceMatching) @property def ldm_hash_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_ldmHashLog) @property def ldm_min_match(self): return _get_compression_parameter(self._params, lib.ZSTD_c_ldmMinMatch) @property def ldm_bucket_size_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_ldmBucketSizeLog) @property def ldm_hash_rate_log(self): return _get_compression_parameter(self._params, lib.ZSTD_c_ldmHashRateLog) @property def ldm_hash_every_log(self): return self.ldm_hash_rate_log @property def threads(self): return _get_compression_parameter(self._params, lib.ZSTD_c_nbWorkers) def estimated_compression_context_size(self): return lib.ZSTD_estimateCCtxSize_usingCCtxParams(self._params) CompressionParameters = ZstdCompressionParameters def estimate_decompression_context_size(): return lib.ZSTD_estimateDCtxSize() def _set_compression_parameter(params, param, value): zresult = lib.ZSTD_CCtxParam_setParameter(params, param, value) if lib.ZSTD_isError(zresult): raise ZstdError('unable to set compression context parameter: %s' % _zstd_error(zresult)) def _get_compression_parameter(params, param): result = ffi.new('int *') zresult = lib.ZSTD_CCtxParam_getParameter(params, param, result) if lib.ZSTD_isError(zresult): raise ZstdError('unable to get compression context parameter: %s' % _zstd_error(zresult)) return result[0] class ZstdCompressionWriter(object): def __init__(self, compressor, writer, source_size, write_size, write_return_read): self._compressor = compressor self._writer = writer self._write_size = write_size self._write_return_read = bool(write_return_read) self._entered = False self._closed = False self._bytes_compressed = 0 self._dst_buffer = ffi.new('char[]', write_size) self._out_buffer = ffi.new('ZSTD_outBuffer *') self._out_buffer.dst = self._dst_buffer self._out_buffer.size = len(self._dst_buffer) self._out_buffer.pos = 0 zresult = lib.ZSTD_CCtx_setPledgedSrcSize(compressor._cctx, source_size) if lib.ZSTD_isError(zresult): raise ZstdError('error setting source size: %s' % _zstd_error(zresult)) def __enter__(self): if self._closed: raise ValueError('stream is closed') if self._entered: raise ZstdError('cannot __enter__ multiple times') self._entered = True return self def __exit__(self, exc_type, exc_value, exc_tb): self._entered = False if not exc_type and not exc_value and not exc_tb: self.close() self._compressor = None return False def memory_size(self): return lib.ZSTD_sizeof_CCtx(self._compressor._cctx) def fileno(self): f = getattr(self._writer, 'fileno', None) if f: return f() else: raise OSError('fileno not available on underlying writer') def close(self): if self._closed: return try: self.flush(FLUSH_FRAME) finally: self._closed = True # Call close() on underlying stream as well. f = getattr(self._writer, 'close', None) if f: f() @property def closed(self): return self._closed def isatty(self): return False def readable(self): return False def readline(self, size=-1): raise io.UnsupportedOperation() def readlines(self, hint=-1): raise io.UnsupportedOperation() def seek(self, offset, whence=None): raise io.UnsupportedOperation() def seekable(self): return False def truncate(self, size=None): raise io.UnsupportedOperation() def writable(self): return True def writelines(self, lines): raise NotImplementedError('writelines() is not yet implemented') def read(self, size=-1): raise io.UnsupportedOperation() def readall(self): raise io.UnsupportedOperation() def readinto(self, b): raise io.UnsupportedOperation() def write(self, data): if self._closed: raise ValueError('stream is closed') total_write = 0 data_buffer = ffi.from_buffer(data) in_buffer = ffi.new('ZSTD_inBuffer *') in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 out_buffer = self._out_buffer out_buffer.pos = 0 while in_buffer.pos < in_buffer.size: zresult = lib.ZSTD_compressStream2(self._compressor._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue) if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % _zstd_error(zresult)) if out_buffer.pos: self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) total_write += out_buffer.pos self._bytes_compressed += out_buffer.pos out_buffer.pos = 0 if self._write_return_read: return in_buffer.pos else: return total_write def flush(self, flush_mode=FLUSH_BLOCK): if flush_mode == FLUSH_BLOCK: flush = lib.ZSTD_e_flush elif flush_mode == FLUSH_FRAME: flush = lib.ZSTD_e_end else: raise ValueError('unknown flush_mode: %r' % flush_mode) if self._closed: raise ValueError('stream is closed') total_write = 0 out_buffer = self._out_buffer out_buffer.pos = 0 in_buffer = ffi.new('ZSTD_inBuffer *') in_buffer.src = ffi.NULL in_buffer.size = 0 in_buffer.pos = 0 while True: zresult = lib.ZSTD_compressStream2(self._compressor._cctx, out_buffer, in_buffer, flush) if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % _zstd_error(zresult)) if out_buffer.pos: self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) total_write += out_buffer.pos self._bytes_compressed += out_buffer.pos out_buffer.pos = 0 if not zresult: break return total_write def tell(self): return self._bytes_compressed class ZstdCompressionObj(object): def compress(self, data): if self._finished: raise ZstdError('cannot call compress() after compressor finished') data_buffer = ffi.from_buffer(data) source = ffi.new('ZSTD_inBuffer *') source.src = data_buffer source.size = len(data_buffer) source.pos = 0 chunks = [] while source.pos < len(data): zresult = lib.ZSTD_compressStream2(self._compressor._cctx, self._out, source, lib.ZSTD_e_continue) if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % _zstd_error(zresult)) if self._out.pos: chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) self._out.pos = 0 return b''.join(chunks) def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH): if flush_mode not in (COMPRESSOBJ_FLUSH_FINISH, COMPRESSOBJ_FLUSH_BLOCK): raise ValueError('flush mode not recognized') if self._finished: raise ZstdError('compressor object already finished') if flush_mode == COMPRESSOBJ_FLUSH_BLOCK: z_flush_mode = lib.ZSTD_e_flush elif flush_mode == COMPRESSOBJ_FLUSH_FINISH: z_flush_mode = lib.ZSTD_e_end self._finished = True else: raise ZstdError('unhandled flush mode') assert self._out.pos == 0 in_buffer = ffi.new('ZSTD_inBuffer *') in_buffer.src = ffi.NULL in_buffer.size = 0 in_buffer.pos = 0 chunks = [] while True: zresult = lib.ZSTD_compressStream2(self._compressor._cctx, self._out, in_buffer, z_flush_mode) if lib.ZSTD_isError(zresult): raise ZstdError('error ending compression stream: %s' % _zstd_error(zresult)) if self._out.pos: chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:]) self._out.pos = 0 if not zresult: break return b''.join(chunks) class ZstdCompressionChunker(object): def __init__(self, compressor, chunk_size): self._compressor = compressor self._out = ffi.new('ZSTD_outBuffer *') self._dst_buffer = ffi.new('char[]', chunk_size) self._out.dst = self._dst_buffer self._out.size = chunk_size self._out.pos = 0 self._in = ffi.new('ZSTD_inBuffer *') self._in.src = ffi.NULL self._in.size = 0 self._in.pos = 0 self._finished = False def compress(self, data): if self._finished: raise ZstdError('cannot call compress() after compression finished') if self._in.src != ffi.NULL: raise ZstdError('cannot perform operation before consuming output ' 'from previous operation') data_buffer = ffi.from_buffer(data) if not len(data_buffer): return self._in.src = data_buffer self._in.size = len(data_buffer) self._in.pos = 0 while self._in.pos < self._in.size: zresult = lib.ZSTD_compressStream2(self._compressor._cctx, self._out, self._in, lib.ZSTD_e_continue) if self._in.pos == self._in.size: self._in.src = ffi.NULL self._in.size = 0 self._in.pos = 0 if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % _zstd_error(zresult)) if self._out.pos == self._out.size: yield ffi.buffer(self._out.dst, self._out.pos)[:] self._out.pos = 0 def flush(self): if self._finished: raise ZstdError('cannot call flush() after compression finished') if self._in.src != ffi.NULL: raise ZstdError('cannot call flush() before consuming output from ' 'previous operation') while True: zresult = lib.ZSTD_compressStream2(self._compressor._cctx, self._out, self._in, lib.ZSTD_e_flush) if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % _zstd_error(zresult)) if self._out.pos: yield ffi.buffer(self._out.dst, self._out.pos)[:] self._out.pos = 0 if not zresult: return def finish(self): if self._finished: raise ZstdError('cannot call finish() after compression finished') if self._in.src != ffi.NULL: raise ZstdError('cannot call finish() before consuming output from ' 'previous operation') while True: zresult = lib.ZSTD_compressStream2(self._compressor._cctx, self._out, self._in, lib.ZSTD_e_end) if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % _zstd_error(zresult)) if self._out.pos: yield ffi.buffer(self._out.dst, self._out.pos)[:] self._out.pos = 0 if not zresult: self._finished = True return class ZstdCompressionReader(object): def __init__(self, compressor, source, read_size): self._compressor = compressor self._source = source self._read_size = read_size self._entered = False self._closed = False self._bytes_compressed = 0 self._finished_input = False self._finished_output = False self._in_buffer = ffi.new('ZSTD_inBuffer *') # Holds a ref so backing bytes in self._in_buffer stay alive. self._source_buffer = None def __enter__(self): if self._entered: raise ValueError('cannot __enter__ multiple times') self._entered = True return self def __exit__(self, exc_type, exc_value, exc_tb): self._entered = False self._closed = True self._source = None self._compressor = None return False def readable(self): return True def writable(self): return False def seekable(self): return False def readline(self): raise io.UnsupportedOperation() def readlines(self): raise io.UnsupportedOperation() def write(self, data): raise OSError('stream is not writable') def writelines(self, ignored): raise OSError('stream is not writable') def isatty(self): return False def flush(self): return None def close(self): self._closed = True return None @property def closed(self): return self._closed def tell(self): return self._bytes_compressed def readall(self): chunks = [] while True: chunk = self.read(1048576) if not chunk: break chunks.append(chunk) return b''.join(chunks) def __iter__(self): raise io.UnsupportedOperation() def __next__(self): raise io.UnsupportedOperation() next = __next__ def _read_input(self): if self._finished_input: return if hasattr(self._source, 'read'): data = self._source.read(self._read_size) if not data: self._finished_input = True return self._source_buffer = ffi.from_buffer(data) self._in_buffer.src = self._source_buffer self._in_buffer.size = len(self._source_buffer) self._in_buffer.pos = 0 else: self._source_buffer = ffi.from_buffer(self._source) self._in_buffer.src = self._source_buffer self._in_buffer.size = len(self._source_buffer) self._in_buffer.pos = 0 def _compress_into_buffer(self, out_buffer): if self._in_buffer.pos >= self._in_buffer.size: return old_pos = out_buffer.pos zresult = lib.ZSTD_compressStream2(self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_continue) self._bytes_compressed += out_buffer.pos - old_pos if self._in_buffer.pos == self._in_buffer.size: self._in_buffer.src = ffi.NULL self._in_buffer.pos = 0 self._in_buffer.size = 0 self._source_buffer = None if not hasattr(self._source, 'read'): self._finished_input = True if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s', _zstd_error(zresult)) return out_buffer.pos and out_buffer.pos == out_buffer.size def read(self, size=-1): if self._closed: raise ValueError('stream is closed') if size < -1: raise ValueError('cannot read negative amounts less than -1') if size == -1: return self.readall() if self._finished_output or size == 0: return b'' # Need a dedicated ref to dest buffer otherwise it gets collected. dst_buffer = ffi.new('char[]', size) out_buffer = ffi.new('ZSTD_outBuffer *') out_buffer.dst = dst_buffer out_buffer.size = size out_buffer.pos = 0 if self._compress_into_buffer(out_buffer): return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] while not self._finished_input: self._read_input() if self._compress_into_buffer(out_buffer): return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] # EOF old_pos = out_buffer.pos zresult = lib.ZSTD_compressStream2(self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end) self._bytes_compressed += out_buffer.pos - old_pos if lib.ZSTD_isError(zresult): raise ZstdError('error ending compression stream: %s', _zstd_error(zresult)) if zresult == 0: self._finished_output = True return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] def read1(self, size=-1): if self._closed: raise ValueError('stream is closed') if size < -1: raise ValueError('cannot read negative amounts less than -1') if self._finished_output or size == 0: return b'' # -1 returns arbitrary number of bytes. if size == -1: size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE dst_buffer = ffi.new('char[]', size) out_buffer = ffi.new('ZSTD_outBuffer *') out_buffer.dst = dst_buffer out_buffer.size = size out_buffer.pos = 0 # read1() dictates that we can perform at most 1 call to the # underlying stream to get input. However, we can't satisfy this # restriction with compression because not all input generates output. # It is possible to perform a block flush in order to ensure output. # But this may not be desirable behavior. So we allow multiple read() # to the underlying stream. But unlike read(), we stop once we have # any output. self._compress_into_buffer(out_buffer) if out_buffer.pos: return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] while not self._finished_input: self._read_input() # If we've filled the output buffer, return immediately. if self._compress_into_buffer(out_buffer): return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] # If we've populated the output buffer and we're not at EOF, # also return, as we've satisfied the read1() limits. if out_buffer.pos and not self._finished_input: return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] # Else if we're at EOS and we have room left in the buffer, # fall through to below and try to add more data to the output. # EOF. old_pos = out_buffer.pos zresult = lib.ZSTD_compressStream2(self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end) self._bytes_compressed += out_buffer.pos - old_pos if lib.ZSTD_isError(zresult): raise ZstdError('error ending compression stream: %s' % _zstd_error(zresult)) if zresult == 0: self._finished_output = True return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] def readinto(self, b): if self._closed: raise ValueError('stream is closed') if self._finished_output: return 0 # TODO use writable=True once we require CFFI >= 1.12. dest_buffer = ffi.from_buffer(b) ffi.memmove(b, b'', 0) out_buffer = ffi.new('ZSTD_outBuffer *') out_buffer.dst = dest_buffer out_buffer.size = len(dest_buffer) out_buffer.pos = 0 if self._compress_into_buffer(out_buffer): return out_buffer.pos while not self._finished_input: self._read_input() if self._compress_into_buffer(out_buffer): return out_buffer.pos # EOF. old_pos = out_buffer.pos zresult = lib.ZSTD_compressStream2(self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end) self._bytes_compressed += out_buffer.pos - old_pos if lib.ZSTD_isError(zresult): raise ZstdError('error ending compression stream: %s', _zstd_error(zresult)) if zresult == 0: self._finished_output = True return out_buffer.pos def readinto1(self, b): if self._closed: raise ValueError('stream is closed') if self._finished_output: return 0 # TODO use writable=True once we require CFFI >= 1.12. dest_buffer = ffi.from_buffer(b) ffi.memmove(b, b'', 0) out_buffer = ffi.new('ZSTD_outBuffer *') out_buffer.dst = dest_buffer out_buffer.size = len(dest_buffer) out_buffer.pos = 0 self._compress_into_buffer(out_buffer) if out_buffer.pos: return out_buffer.pos while not self._finished_input: self._read_input() if self._compress_into_buffer(out_buffer): return out_buffer.pos if out_buffer.pos and not self._finished_input: return out_buffer.pos # EOF. old_pos = out_buffer.pos zresult = lib.ZSTD_compressStream2(self._compressor._cctx, out_buffer, self._in_buffer, lib.ZSTD_e_end) self._bytes_compressed += out_buffer.pos - old_pos if lib.ZSTD_isError(zresult): raise ZstdError('error ending compression stream: %s' % _zstd_error(zresult)) if zresult == 0: self._finished_output = True return out_buffer.pos class ZstdCompressor(object): def __init__(self, level=3, dict_data=None, compression_params=None, write_checksum=None, write_content_size=None, write_dict_id=None, threads=0): if level > lib.ZSTD_maxCLevel(): raise ValueError('level must be less than %d' % lib.ZSTD_maxCLevel()) if threads < 0: threads = _cpu_count() if compression_params and write_checksum is not None: raise ValueError('cannot define compression_params and ' 'write_checksum') if compression_params and write_content_size is not None: raise ValueError('cannot define compression_params and ' 'write_content_size') if compression_params and write_dict_id is not None: raise ValueError('cannot define compression_params and ' 'write_dict_id') if compression_params and threads: raise ValueError('cannot define compression_params and threads') if compression_params: self._params = _make_cctx_params(compression_params) else: if write_dict_id is None: write_dict_id = True params = lib.ZSTD_createCCtxParams() if params == ffi.NULL: raise MemoryError() self._params = ffi.gc(params, lib.ZSTD_freeCCtxParams) _set_compression_parameter(self._params, lib.ZSTD_c_compressionLevel, level) _set_compression_parameter( self._params, lib.ZSTD_c_contentSizeFlag, write_content_size if write_content_size is not None else 1) _set_compression_parameter(self._params, lib.ZSTD_c_checksumFlag, 1 if write_checksum else 0) _set_compression_parameter(self._params, lib.ZSTD_c_dictIDFlag, 1 if write_dict_id else 0) if threads: _set_compression_parameter(self._params, lib.ZSTD_c_nbWorkers, threads) cctx = lib.ZSTD_createCCtx() if cctx == ffi.NULL: raise MemoryError() self._cctx = cctx self._dict_data = dict_data # We defer setting up garbage collection until after calling # _setup_cctx() to ensure the memory size estimate is more accurate. try: self._setup_cctx() finally: self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx, size=lib.ZSTD_sizeof_CCtx(cctx)) def _setup_cctx(self): zresult = lib.ZSTD_CCtx_setParametersUsingCCtxParams(self._cctx, self._params) if lib.ZSTD_isError(zresult): raise ZstdError('could not set compression parameters: %s' % _zstd_error(zresult)) dict_data = self._dict_data if dict_data: if dict_data._cdict: zresult = lib.ZSTD_CCtx_refCDict(self._cctx, dict_data._cdict) else: zresult = lib.ZSTD_CCtx_loadDictionary_advanced( self._cctx, dict_data.as_bytes(), len(dict_data), lib.ZSTD_dlm_byRef, dict_data._dict_type) if lib.ZSTD_isError(zresult): raise ZstdError('could not load compression dictionary: %s' % _zstd_error(zresult)) def memory_size(self): return lib.ZSTD_sizeof_CCtx(self._cctx) def compress(self, data): lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) data_buffer = ffi.from_buffer(data) dest_size = lib.ZSTD_compressBound(len(data_buffer)) out = new_nonzero('char[]', dest_size) zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, len(data_buffer)) if lib.ZSTD_isError(zresult): raise ZstdError('error setting source size: %s' % _zstd_error(zresult)) out_buffer = ffi.new('ZSTD_outBuffer *') in_buffer = ffi.new('ZSTD_inBuffer *') out_buffer.dst = out out_buffer.size = dest_size out_buffer.pos = 0 in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 zresult = lib.ZSTD_compressStream2(self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end) if lib.ZSTD_isError(zresult): raise ZstdError('cannot compress: %s' % _zstd_error(zresult)) elif zresult: raise ZstdError('unexpected partial frame flush') return ffi.buffer(out, out_buffer.pos)[:] def compressobj(self, size=-1): lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) if size < 0: size = lib.ZSTD_CONTENTSIZE_UNKNOWN zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) if lib.ZSTD_isError(zresult): raise ZstdError('error setting source size: %s' % _zstd_error(zresult)) cobj = ZstdCompressionObj() cobj._out = ffi.new('ZSTD_outBuffer *') cobj._dst_buffer = ffi.new('char[]', COMPRESSION_RECOMMENDED_OUTPUT_SIZE) cobj._out.dst = cobj._dst_buffer cobj._out.size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE cobj._out.pos = 0 cobj._compressor = self cobj._finished = False return cobj def chunker(self, size=-1, chunk_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) if size < 0: size = lib.ZSTD_CONTENTSIZE_UNKNOWN zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) if lib.ZSTD_isError(zresult): raise ZstdError('error setting source size: %s' % _zstd_error(zresult)) return ZstdCompressionChunker(self, chunk_size=chunk_size) def copy_stream(self, ifh, ofh, size=-1, read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): if not hasattr(ifh, 'read'): raise ValueError('first argument must have a read() method') if not hasattr(ofh, 'write'): raise ValueError('second argument must have a write() method') lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) if size < 0: size = lib.ZSTD_CONTENTSIZE_UNKNOWN zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) if lib.ZSTD_isError(zresult): raise ZstdError('error setting source size: %s' % _zstd_error(zresult)) in_buffer = ffi.new('ZSTD_inBuffer *') out_buffer = ffi.new('ZSTD_outBuffer *') dst_buffer = ffi.new('char[]', write_size) out_buffer.dst = dst_buffer out_buffer.size = write_size out_buffer.pos = 0 total_read, total_write = 0, 0 while True: data = ifh.read(read_size) if not data: break data_buffer = ffi.from_buffer(data) total_read += len(data_buffer) in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 while in_buffer.pos < in_buffer.size: zresult = lib.ZSTD_compressStream2(self._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue) if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % _zstd_error(zresult)) if out_buffer.pos: ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) total_write += out_buffer.pos out_buffer.pos = 0 # We've finished reading. Flush the compressor. while True: zresult = lib.ZSTD_compressStream2(self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end) if lib.ZSTD_isError(zresult): raise ZstdError('error ending compression stream: %s' % _zstd_error(zresult)) if out_buffer.pos: ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) total_write += out_buffer.pos out_buffer.pos = 0 if zresult == 0: break return total_read, total_write def stream_reader(self, source, size=-1, read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE): lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) try: size = len(source) except Exception: pass if size < 0: size = lib.ZSTD_CONTENTSIZE_UNKNOWN zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) if lib.ZSTD_isError(zresult): raise ZstdError('error setting source size: %s' % _zstd_error(zresult)) return ZstdCompressionReader(self, source, read_size) def stream_writer(self, writer, size=-1, write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE, write_return_read=False): if not hasattr(writer, 'write'): raise ValueError('must pass an object with a write() method') lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) if size < 0: size = lib.ZSTD_CONTENTSIZE_UNKNOWN return ZstdCompressionWriter(self, writer, size, write_size, write_return_read) write_to = stream_writer def read_to_iter(self, reader, size=-1, read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE, write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE): if hasattr(reader, 'read'): have_read = True elif hasattr(reader, '__getitem__'): have_read = False buffer_offset = 0 size = len(reader) else: raise ValueError('must pass an object with a read() method or ' 'conforms to buffer protocol') lib.ZSTD_CCtx_reset(self._cctx, lib.ZSTD_reset_session_only) if size < 0: size = lib.ZSTD_CONTENTSIZE_UNKNOWN zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size) if lib.ZSTD_isError(zresult): raise ZstdError('error setting source size: %s' % _zstd_error(zresult)) in_buffer = ffi.new('ZSTD_inBuffer *') out_buffer = ffi.new('ZSTD_outBuffer *') in_buffer.src = ffi.NULL in_buffer.size = 0 in_buffer.pos = 0 dst_buffer = ffi.new('char[]', write_size) out_buffer.dst = dst_buffer out_buffer.size = write_size out_buffer.pos = 0 while True: # We should never have output data sitting around after a previous # iteration. assert out_buffer.pos == 0 # Collect input data. if have_read: read_result = reader.read(read_size) else: remaining = len(reader) - buffer_offset slice_size = min(remaining, read_size) read_result = reader[buffer_offset:buffer_offset + slice_size] buffer_offset += slice_size # No new input data. Break out of the read loop. if not read_result: break # Feed all read data into the compressor and emit output until # exhausted. read_buffer = ffi.from_buffer(read_result) in_buffer.src = read_buffer in_buffer.size = len(read_buffer) in_buffer.pos = 0 while in_buffer.pos < in_buffer.size: zresult = lib.ZSTD_compressStream2(self._cctx, out_buffer, in_buffer, lib.ZSTD_e_continue) if lib.ZSTD_isError(zresult): raise ZstdError('zstd compress error: %s' % _zstd_error(zresult)) if out_buffer.pos: data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] out_buffer.pos = 0 yield data assert out_buffer.pos == 0 # And repeat the loop to collect more data. continue # If we get here, input is exhausted. End the stream and emit what # remains. while True: assert out_buffer.pos == 0 zresult = lib.ZSTD_compressStream2(self._cctx, out_buffer, in_buffer, lib.ZSTD_e_end) if lib.ZSTD_isError(zresult): raise ZstdError('error ending compression stream: %s' % _zstd_error(zresult)) if out_buffer.pos: data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] out_buffer.pos = 0 yield data if zresult == 0: break read_from = read_to_iter def frame_progression(self): progression = lib.ZSTD_getFrameProgression(self._cctx) return progression.ingested, progression.consumed, progression.produced class FrameParameters(object): def __init__(self, fparams): self.content_size = fparams.frameContentSize self.window_size = fparams.windowSize self.dict_id = fparams.dictID self.has_checksum = bool(fparams.checksumFlag) def frame_content_size(data): data_buffer = ffi.from_buffer(data) size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer)) if size == lib.ZSTD_CONTENTSIZE_ERROR: raise ZstdError('error when determining content size') elif size == lib.ZSTD_CONTENTSIZE_UNKNOWN: return -1 else: return size def frame_header_size(data): data_buffer = ffi.from_buffer(data) zresult = lib.ZSTD_frameHeaderSize(data_buffer, len(data_buffer)) if lib.ZSTD_isError(zresult): raise ZstdError('could not determine frame header size: %s' % _zstd_error(zresult)) return zresult def get_frame_parameters(data): params = ffi.new('ZSTD_frameHeader *') data_buffer = ffi.from_buffer(data) zresult = lib.ZSTD_getFrameHeader(params, data_buffer, len(data_buffer)) if lib.ZSTD_isError(zresult): raise ZstdError('cannot get frame parameters: %s' % _zstd_error(zresult)) if zresult: raise ZstdError('not enough data for frame parameters; need %d bytes' % zresult) return FrameParameters(params[0]) class ZstdCompressionDict(object): def __init__(self, data, dict_type=DICT_TYPE_AUTO, k=0, d=0): assert isinstance(data, bytes_type) self._data = data self.k = k self.d = d if dict_type not in (DICT_TYPE_AUTO, DICT_TYPE_RAWCONTENT, DICT_TYPE_FULLDICT): raise ValueError('invalid dictionary load mode: %d; must use ' 'DICT_TYPE_* constants') self._dict_type = dict_type self._cdict = None def __len__(self): return len(self._data) def dict_id(self): return int_type(lib.ZDICT_getDictID(self._data, len(self._data))) def as_bytes(self): return self._data def precompute_compress(self, level=0, compression_params=None): if level and compression_params: raise ValueError('must only specify one of level or ' 'compression_params') if not level and not compression_params: raise ValueError('must specify one of level or compression_params') if level: cparams = lib.ZSTD_getCParams(level, 0, len(self._data)) else: cparams = ffi.new('ZSTD_compressionParameters') cparams.chainLog = compression_params.chain_log cparams.hashLog = compression_params.hash_log cparams.minMatch = compression_params.min_match cparams.searchLog = compression_params.search_log cparams.strategy = compression_params.compression_strategy cparams.targetLength = compression_params.target_length cparams.windowLog = compression_params.window_log cdict = lib.ZSTD_createCDict_advanced(self._data, len(self._data), lib.ZSTD_dlm_byRef, self._dict_type, cparams, lib.ZSTD_defaultCMem) if cdict == ffi.NULL: raise ZstdError('unable to precompute dictionary') self._cdict = ffi.gc(cdict, lib.ZSTD_freeCDict, size=lib.ZSTD_sizeof_CDict(cdict)) @property def _ddict(self): ddict = lib.ZSTD_createDDict_advanced(self._data, len(self._data), lib.ZSTD_dlm_byRef, self._dict_type, lib.ZSTD_defaultCMem) if ddict == ffi.NULL: raise ZstdError('could not create decompression dict') ddict = ffi.gc(ddict, lib.ZSTD_freeDDict, size=lib.ZSTD_sizeof_DDict(ddict)) self.__dict__['_ddict'] = ddict return ddict def train_dictionary(dict_size, samples, k=0, d=0, notifications=0, dict_id=0, level=0, steps=0, threads=0): if not isinstance(samples, list): raise TypeError('samples must be a list') if threads < 0: threads = _cpu_count() total_size = sum(map(len, samples)) samples_buffer = new_nonzero('char[]', total_size) sample_sizes = new_nonzero('size_t[]', len(samples)) offset = 0 for i, sample in enumerate(samples): if not isinstance(sample, bytes_type): raise ValueError('samples must be bytes') l = len(sample) ffi.memmove(samples_buffer + offset, sample, l) offset += l sample_sizes[i] = l dict_data = new_nonzero('char[]', dict_size) dparams = ffi.new('ZDICT_cover_params_t *')[0] dparams.k = k dparams.d = d dparams.steps = steps dparams.nbThreads = threads dparams.zParams.notificationLevel = notifications dparams.zParams.dictID = dict_id dparams.zParams.compressionLevel = level if (not dparams.k and not dparams.d and not dparams.steps and not dparams.nbThreads and not dparams.zParams.notificationLevel and not dparams.zParams.dictID and not dparams.zParams.compressionLevel): zresult = lib.ZDICT_trainFromBuffer( ffi.addressof(dict_data), dict_size, ffi.addressof(samples_buffer), ffi.addressof(sample_sizes, 0), len(samples)) elif dparams.steps or dparams.nbThreads: zresult = lib.ZDICT_optimizeTrainFromBuffer_cover( ffi.addressof(dict_data), dict_size, ffi.addressof(samples_buffer), ffi.addressof(sample_sizes, 0), len(samples), ffi.addressof(dparams)) else: zresult = lib.ZDICT_trainFromBuffer_cover( ffi.addressof(dict_data), dict_size, ffi.addressof(samples_buffer), ffi.addressof(sample_sizes, 0), len(samples), dparams) if lib.ZDICT_isError(zresult): msg = ffi.string(lib.ZDICT_getErrorName(zresult)).decode('utf-8') raise ZstdError('cannot train dict: %s' % msg) return ZstdCompressionDict(ffi.buffer(dict_data, zresult)[:], dict_type=DICT_TYPE_FULLDICT, k=dparams.k, d=dparams.d) class ZstdDecompressionObj(object): def __init__(self, decompressor, write_size): self._decompressor = decompressor self._write_size = write_size self._finished = False def decompress(self, data): if self._finished: raise ZstdError('cannot use a decompressobj multiple times') in_buffer = ffi.new('ZSTD_inBuffer *') out_buffer = ffi.new('ZSTD_outBuffer *') data_buffer = ffi.from_buffer(data) if len(data_buffer) == 0: return b'' in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 dst_buffer = ffi.new('char[]', self._write_size) out_buffer.dst = dst_buffer out_buffer.size = len(dst_buffer) out_buffer.pos = 0 chunks = [] while True: zresult = lib.ZSTD_decompressStream(self._decompressor._dctx, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('zstd decompressor error: %s' % _zstd_error(zresult)) if zresult == 0: self._finished = True self._decompressor = None if out_buffer.pos: chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) if (zresult == 0 or (in_buffer.pos == in_buffer.size and out_buffer.pos == 0)): break out_buffer.pos = 0 return b''.join(chunks) def flush(self, length=0): pass class ZstdDecompressionReader(object): def __init__(self, decompressor, source, read_size, read_across_frames): self._decompressor = decompressor self._source = source self._read_size = read_size self._read_across_frames = bool(read_across_frames) self._entered = False self._closed = False self._bytes_decompressed = 0 self._finished_input = False self._finished_output = False self._in_buffer = ffi.new('ZSTD_inBuffer *') # Holds a ref to self._in_buffer.src. self._source_buffer = None def __enter__(self): if self._entered: raise ValueError('cannot __enter__ multiple times') self._entered = True return self def __exit__(self, exc_type, exc_value, exc_tb): self._entered = False self._closed = True self._source = None self._decompressor = None return False def readable(self): return True def writable(self): return False def seekable(self): return True def readline(self): raise io.UnsupportedOperation() def readlines(self): raise io.UnsupportedOperation() def write(self, data): raise io.UnsupportedOperation() def writelines(self, lines): raise io.UnsupportedOperation() def isatty(self): return False def flush(self): return None def close(self): self._closed = True return None @property def closed(self): return self._closed def tell(self): return self._bytes_decompressed def readall(self): chunks = [] while True: chunk = self.read(1048576) if not chunk: break chunks.append(chunk) return b''.join(chunks) def __iter__(self): raise io.UnsupportedOperation() def __next__(self): raise io.UnsupportedOperation() next = __next__ def _read_input(self): # We have data left over in the input buffer. Use it. if self._in_buffer.pos < self._in_buffer.size: return # All input data exhausted. Nothing to do. if self._finished_input: return # Else populate the input buffer from our source. if hasattr(self._source, 'read'): data = self._source.read(self._read_size) if not data: self._finished_input = True return self._source_buffer = ffi.from_buffer(data) self._in_buffer.src = self._source_buffer self._in_buffer.size = len(self._source_buffer) self._in_buffer.pos = 0 else: self._source_buffer = ffi.from_buffer(self._source) self._in_buffer.src = self._source_buffer self._in_buffer.size = len(self._source_buffer) self._in_buffer.pos = 0 def _decompress_into_buffer(self, out_buffer): """Decompress available input into an output buffer. Returns True if data in output buffer should be emitted. """ zresult = lib.ZSTD_decompressStream(self._decompressor._dctx, out_buffer, self._in_buffer) if self._in_buffer.pos == self._in_buffer.size: self._in_buffer.src = ffi.NULL self._in_buffer.pos = 0 self._in_buffer.size = 0 self._source_buffer = None if not hasattr(self._source, 'read'): self._finished_input = True if lib.ZSTD_isError(zresult): raise ZstdError('zstd decompress error: %s' % _zstd_error(zresult)) # Emit data if there is data AND either: # a) output buffer is full (read amount is satisfied) # b) we're at end of a frame and not in frame spanning mode return (out_buffer.pos and (out_buffer.pos == out_buffer.size or zresult == 0 and not self._read_across_frames)) def read(self, size=-1): if self._closed: raise ValueError('stream is closed') if size < -1: raise ValueError('cannot read negative amounts less than -1') if size == -1: # This is recursive. But it gets the job done. return self.readall() if self._finished_output or size == 0: return b'' # We /could/ call into readinto() here. But that introduces more # overhead. dst_buffer = ffi.new('char[]', size) out_buffer = ffi.new('ZSTD_outBuffer *') out_buffer.dst = dst_buffer out_buffer.size = size out_buffer.pos = 0 self._read_input() if self._decompress_into_buffer(out_buffer): self._bytes_decompressed += out_buffer.pos return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] while not self._finished_input: self._read_input() if self._decompress_into_buffer(out_buffer): self._bytes_decompressed += out_buffer.pos return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] self._bytes_decompressed += out_buffer.pos return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] def readinto(self, b): if self._closed: raise ValueError('stream is closed') if self._finished_output: return 0 # TODO use writable=True once we require CFFI >= 1.12. dest_buffer = ffi.from_buffer(b) ffi.memmove(b, b'', 0) out_buffer = ffi.new('ZSTD_outBuffer *') out_buffer.dst = dest_buffer out_buffer.size = len(dest_buffer) out_buffer.pos = 0 self._read_input() if self._decompress_into_buffer(out_buffer): self._bytes_decompressed += out_buffer.pos return out_buffer.pos while not self._finished_input: self._read_input() if self._decompress_into_buffer(out_buffer): self._bytes_decompressed += out_buffer.pos return out_buffer.pos self._bytes_decompressed += out_buffer.pos return out_buffer.pos def read1(self, size=-1): if self._closed: raise ValueError('stream is closed') if size < -1: raise ValueError('cannot read negative amounts less than -1') if self._finished_output or size == 0: return b'' # -1 returns arbitrary number of bytes. if size == -1: size = DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE dst_buffer = ffi.new('char[]', size) out_buffer = ffi.new('ZSTD_outBuffer *') out_buffer.dst = dst_buffer out_buffer.size = size out_buffer.pos = 0 # read1() dictates that we can perform at most 1 call to underlying # stream to get input. However, we can't satisfy this restriction with # decompression because not all input generates output. So we allow # multiple read(). But unlike read(), we stop once we have any output. while not self._finished_input: self._read_input() self._decompress_into_buffer(out_buffer) if out_buffer.pos: break self._bytes_decompressed += out_buffer.pos return ffi.buffer(out_buffer.dst, out_buffer.pos)[:] def readinto1(self, b): if self._closed: raise ValueError('stream is closed') if self._finished_output: return 0 # TODO use writable=True once we require CFFI >= 1.12. dest_buffer = ffi.from_buffer(b) ffi.memmove(b, b'', 0) out_buffer = ffi.new('ZSTD_outBuffer *') out_buffer.dst = dest_buffer out_buffer.size = len(dest_buffer) out_buffer.pos = 0 while not self._finished_input and not self._finished_output: self._read_input() self._decompress_into_buffer(out_buffer) if out_buffer.pos: break self._bytes_decompressed += out_buffer.pos return out_buffer.pos def seek(self, pos, whence=os.SEEK_SET): if self._closed: raise ValueError('stream is closed') read_amount = 0 if whence == os.SEEK_SET: if pos < 0: raise ValueError('cannot seek to negative position with SEEK_SET') if pos < self._bytes_decompressed: raise ValueError('cannot seek zstd decompression stream ' 'backwards') read_amount = pos - self._bytes_decompressed elif whence == os.SEEK_CUR: if pos < 0: raise ValueError('cannot seek zstd decompression stream ' 'backwards') read_amount = pos elif whence == os.SEEK_END: raise ValueError('zstd decompression streams cannot be seeked ' 'with SEEK_END') while read_amount: result = self.read(min(read_amount, DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)) if not result: break read_amount -= len(result) return self._bytes_decompressed class ZstdDecompressionWriter(object): def __init__(self, decompressor, writer, write_size, write_return_read): decompressor._ensure_dctx() self._decompressor = decompressor self._writer = writer self._write_size = write_size self._write_return_read = bool(write_return_read) self._entered = False self._closed = False def __enter__(self): if self._closed: raise ValueError('stream is closed') if self._entered: raise ZstdError('cannot __enter__ multiple times') self._entered = True return self def __exit__(self, exc_type, exc_value, exc_tb): self._entered = False self.close() def memory_size(self): return lib.ZSTD_sizeof_DCtx(self._decompressor._dctx) def close(self): if self._closed: return try: self.flush() finally: self._closed = True f = getattr(self._writer, 'close', None) if f: f() @property def closed(self): return self._closed def fileno(self): f = getattr(self._writer, 'fileno', None) if f: return f() else: raise OSError('fileno not available on underlying writer') def flush(self): if self._closed: raise ValueError('stream is closed') f = getattr(self._writer, 'flush', None) if f: return f() def isatty(self): return False def readable(self): return False def readline(self, size=-1): raise io.UnsupportedOperation() def readlines(self, hint=-1): raise io.UnsupportedOperation() def seek(self, offset, whence=None): raise io.UnsupportedOperation() def seekable(self): return False def tell(self): raise io.UnsupportedOperation() def truncate(self, size=None): raise io.UnsupportedOperation() def writable(self): return True def writelines(self, lines): raise io.UnsupportedOperation() def read(self, size=-1): raise io.UnsupportedOperation() def readall(self): raise io.UnsupportedOperation() def readinto(self, b): raise io.UnsupportedOperation() def write(self, data): if self._closed: raise ValueError('stream is closed') total_write = 0 in_buffer = ffi.new('ZSTD_inBuffer *') out_buffer = ffi.new('ZSTD_outBuffer *') data_buffer = ffi.from_buffer(data) in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 dst_buffer = ffi.new('char[]', self._write_size) out_buffer.dst = dst_buffer out_buffer.size = len(dst_buffer) out_buffer.pos = 0 dctx = self._decompressor._dctx while in_buffer.pos < in_buffer.size: zresult = lib.ZSTD_decompressStream(dctx, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('zstd decompress error: %s' % _zstd_error(zresult)) if out_buffer.pos: self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:]) total_write += out_buffer.pos out_buffer.pos = 0 if self._write_return_read: return in_buffer.pos else: return total_write class ZstdDecompressor(object): def __init__(self, dict_data=None, max_window_size=0, format=FORMAT_ZSTD1): self._dict_data = dict_data self._max_window_size = max_window_size self._format = format dctx = lib.ZSTD_createDCtx() if dctx == ffi.NULL: raise MemoryError() self._dctx = dctx # Defer setting up garbage collection until full state is loaded so # the memory size is more accurate. try: self._ensure_dctx() finally: self._dctx = ffi.gc(dctx, lib.ZSTD_freeDCtx, size=lib.ZSTD_sizeof_DCtx(dctx)) def memory_size(self): return lib.ZSTD_sizeof_DCtx(self._dctx) def decompress(self, data, max_output_size=0): self._ensure_dctx() data_buffer = ffi.from_buffer(data) output_size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer)) if output_size == lib.ZSTD_CONTENTSIZE_ERROR: raise ZstdError('error determining content size from frame header') elif output_size == 0: return b'' elif output_size == lib.ZSTD_CONTENTSIZE_UNKNOWN: if not max_output_size: raise ZstdError('could not determine content size in frame header') result_buffer = ffi.new('char[]', max_output_size) result_size = max_output_size output_size = 0 else: result_buffer = ffi.new('char[]', output_size) result_size = output_size out_buffer = ffi.new('ZSTD_outBuffer *') out_buffer.dst = result_buffer out_buffer.size = result_size out_buffer.pos = 0 in_buffer = ffi.new('ZSTD_inBuffer *') in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('decompression error: %s' % _zstd_error(zresult)) elif zresult: raise ZstdError('decompression error: did not decompress full frame') elif output_size and out_buffer.pos != output_size: raise ZstdError('decompression error: decompressed %d bytes; expected %d' % (zresult, output_size)) return ffi.buffer(result_buffer, out_buffer.pos)[:] def stream_reader(self, source, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, read_across_frames=False): self._ensure_dctx() return ZstdDecompressionReader(self, source, read_size, read_across_frames) def decompressobj(self, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE): if write_size < 1: raise ValueError('write_size must be positive') self._ensure_dctx() return ZstdDecompressionObj(self, write_size=write_size) def read_to_iter(self, reader, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, skip_bytes=0): if skip_bytes >= read_size: raise ValueError('skip_bytes must be smaller than read_size') if hasattr(reader, 'read'): have_read = True elif hasattr(reader, '__getitem__'): have_read = False buffer_offset = 0 size = len(reader) else: raise ValueError('must pass an object with a read() method or ' 'conforms to buffer protocol') if skip_bytes: if have_read: reader.read(skip_bytes) else: if skip_bytes > size: raise ValueError('skip_bytes larger than first input chunk') buffer_offset = skip_bytes self._ensure_dctx() in_buffer = ffi.new('ZSTD_inBuffer *') out_buffer = ffi.new('ZSTD_outBuffer *') dst_buffer = ffi.new('char[]', write_size) out_buffer.dst = dst_buffer out_buffer.size = len(dst_buffer) out_buffer.pos = 0 while True: assert out_buffer.pos == 0 if have_read: read_result = reader.read(read_size) else: remaining = size - buffer_offset slice_size = min(remaining, read_size) read_result = reader[buffer_offset:buffer_offset + slice_size] buffer_offset += slice_size # No new input. Break out of read loop. if not read_result: break # Feed all read data into decompressor and emit output until # exhausted. read_buffer = ffi.from_buffer(read_result) in_buffer.src = read_buffer in_buffer.size = len(read_buffer) in_buffer.pos = 0 while in_buffer.pos < in_buffer.size: assert out_buffer.pos == 0 zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('zstd decompress error: %s' % _zstd_error(zresult)) if out_buffer.pos: data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:] out_buffer.pos = 0 yield data if zresult == 0: return # Repeat loop to collect more input data. continue # If we get here, input is exhausted. read_from = read_to_iter def stream_writer(self, writer, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE, write_return_read=False): if not hasattr(writer, 'write'): raise ValueError('must pass an object with a write() method') return ZstdDecompressionWriter(self, writer, write_size, write_return_read) write_to = stream_writer def copy_stream(self, ifh, ofh, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE): if not hasattr(ifh, 'read'): raise ValueError('first argument must have a read() method') if not hasattr(ofh, 'write'): raise ValueError('second argument must have a write() method') self._ensure_dctx() in_buffer = ffi.new('ZSTD_inBuffer *') out_buffer = ffi.new('ZSTD_outBuffer *') dst_buffer = ffi.new('char[]', write_size) out_buffer.dst = dst_buffer out_buffer.size = write_size out_buffer.pos = 0 total_read, total_write = 0, 0 # Read all available input. while True: data = ifh.read(read_size) if not data: break data_buffer = ffi.from_buffer(data) total_read += len(data_buffer) in_buffer.src = data_buffer in_buffer.size = len(data_buffer) in_buffer.pos = 0 # Flush all read data to output. while in_buffer.pos < in_buffer.size: zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('zstd decompressor error: %s' % _zstd_error(zresult)) if out_buffer.pos: ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)) total_write += out_buffer.pos out_buffer.pos = 0 # Continue loop to keep reading. return total_read, total_write def decompress_content_dict_chain(self, frames): if not isinstance(frames, list): raise TypeError('argument must be a list') if not frames: raise ValueError('empty input chain') # First chunk should not be using a dictionary. We handle it specially. chunk = frames[0] if not isinstance(chunk, bytes_type): raise ValueError('chunk 0 must be bytes') # All chunks should be zstd frames and should have content size set. chunk_buffer = ffi.from_buffer(chunk) params = ffi.new('ZSTD_frameHeader *') zresult = lib.ZSTD_getFrameHeader(params, chunk_buffer, len(chunk_buffer)) if lib.ZSTD_isError(zresult): raise ValueError('chunk 0 is not a valid zstd frame') elif zresult: raise ValueError('chunk 0 is too small to contain a zstd frame') if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN: raise ValueError('chunk 0 missing content size in frame') self._ensure_dctx(load_dict=False) last_buffer = ffi.new('char[]', params.frameContentSize) out_buffer = ffi.new('ZSTD_outBuffer *') out_buffer.dst = last_buffer out_buffer.size = len(last_buffer) out_buffer.pos = 0 in_buffer = ffi.new('ZSTD_inBuffer *') in_buffer.src = chunk_buffer in_buffer.size = len(chunk_buffer) in_buffer.pos = 0 zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('could not decompress chunk 0: %s' % _zstd_error(zresult)) elif zresult: raise ZstdError('chunk 0 did not decompress full frame') # Special case of chain length of 1 if len(frames) == 1: return ffi.buffer(last_buffer, len(last_buffer))[:] i = 1 while i < len(frames): chunk = frames[i] if not isinstance(chunk, bytes_type): raise ValueError('chunk %d must be bytes' % i) chunk_buffer = ffi.from_buffer(chunk) zresult = lib.ZSTD_getFrameHeader(params, chunk_buffer, len(chunk_buffer)) if lib.ZSTD_isError(zresult): raise ValueError('chunk %d is not a valid zstd frame' % i) elif zresult: raise ValueError('chunk %d is too small to contain a zstd frame' % i) if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN: raise ValueError('chunk %d missing content size in frame' % i) dest_buffer = ffi.new('char[]', params.frameContentSize) out_buffer.dst = dest_buffer out_buffer.size = len(dest_buffer) out_buffer.pos = 0 in_buffer.src = chunk_buffer in_buffer.size = len(chunk_buffer) in_buffer.pos = 0 zresult = lib.ZSTD_decompressStream(self._dctx, out_buffer, in_buffer) if lib.ZSTD_isError(zresult): raise ZstdError('could not decompress chunk %d: %s' % _zstd_error(zresult)) elif zresult: raise ZstdError('chunk %d did not decompress full frame' % i) last_buffer = dest_buffer i += 1 return ffi.buffer(last_buffer, len(last_buffer))[:] def _ensure_dctx(self, load_dict=True): lib.ZSTD_DCtx_reset(self._dctx, lib.ZSTD_reset_session_only) if self._max_window_size: zresult = lib.ZSTD_DCtx_setMaxWindowSize(self._dctx, self._max_window_size) if lib.ZSTD_isError(zresult): raise ZstdError('unable to set max window size: %s' % _zstd_error(zresult)) zresult = lib.ZSTD_DCtx_setFormat(self._dctx, self._format) if lib.ZSTD_isError(zresult): raise ZstdError('unable to set decoding format: %s' % _zstd_error(zresult)) if self._dict_data and load_dict: zresult = lib.ZSTD_DCtx_refDDict(self._dctx, self._dict_data._ddict) if lib.ZSTD_isError(zresult): raise ZstdError('unable to reference prepared dictionary: %s' % _zstd_error(zresult))