contrib/python-zstandard/zstd_cffi.py
changeset 30924 c32454d69b85
parent 30444 b86a448a2965
child 31799 e0dc40530c5a
equal deleted inserted replaced
30923:5b60464efbde 30924:c32454d69b85
     6 
     6 
     7 """Python interface to the Zstandard (zstd) compression library."""
     7 """Python interface to the Zstandard (zstd) compression library."""
     8 
     8 
     9 from __future__ import absolute_import, unicode_literals
     9 from __future__ import absolute_import, unicode_literals
    10 
    10 
    11 import io
    11 import sys
    12 
    12 
    13 from _zstd_cffi import (
    13 from _zstd_cffi import (
    14     ffi,
    14     ffi,
    15     lib,
    15     lib,
    16 )
    16 )
    17 
    17 
    18 
    18 if sys.version_info[0] == 2:
    19 _CSTREAM_IN_SIZE = lib.ZSTD_CStreamInSize()
    19     bytes_type = str
    20 _CSTREAM_OUT_SIZE = lib.ZSTD_CStreamOutSize()
    20     int_type = long
    21 
    21 else:
    22 
    22     bytes_type = bytes
    23 class _ZstdCompressionWriter(object):
    23     int_type = int
    24     def __init__(self, cstream, writer):
    24 
    25         self._cstream = cstream
    25 
       
    26 COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize()
       
    27 COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize()
       
    28 DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize()
       
    29 DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize()
       
    30 
       
    31 new_nonzero = ffi.new_allocator(should_clear_after_alloc=False)
       
    32 
       
    33 
       
    34 MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel()
       
    35 MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER
       
    36 FRAME_HEADER = b'\x28\xb5\x2f\xfd'
       
    37 ZSTD_VERSION = (lib.ZSTD_VERSION_MAJOR, lib.ZSTD_VERSION_MINOR, lib.ZSTD_VERSION_RELEASE)
       
    38 
       
    39 WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN
       
    40 WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX
       
    41 CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN
       
    42 CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX
       
    43 HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN
       
    44 HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX
       
    45 HASHLOG3_MAX = lib.ZSTD_HASHLOG3_MAX
       
    46 SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN
       
    47 SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX
       
    48 SEARCHLENGTH_MIN = lib.ZSTD_SEARCHLENGTH_MIN
       
    49 SEARCHLENGTH_MAX = lib.ZSTD_SEARCHLENGTH_MAX
       
    50 TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN
       
    51 TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX
       
    52 
       
    53 STRATEGY_FAST = lib.ZSTD_fast
       
    54 STRATEGY_DFAST = lib.ZSTD_dfast
       
    55 STRATEGY_GREEDY = lib.ZSTD_greedy
       
    56 STRATEGY_LAZY = lib.ZSTD_lazy
       
    57 STRATEGY_LAZY2 = lib.ZSTD_lazy2
       
    58 STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2
       
    59 STRATEGY_BTOPT = lib.ZSTD_btopt
       
    60 
       
    61 COMPRESSOBJ_FLUSH_FINISH = 0
       
    62 COMPRESSOBJ_FLUSH_BLOCK = 1
       
    63 
       
    64 
       
    65 class ZstdError(Exception):
       
    66     pass
       
    67 
       
    68 
       
    69 class CompressionParameters(object):
       
    70     def __init__(self, window_log, chain_log, hash_log, search_log,
       
    71                  search_length, target_length, strategy):
       
    72         if window_log < WINDOWLOG_MIN or window_log > WINDOWLOG_MAX:
       
    73             raise ValueError('invalid window log value')
       
    74 
       
    75         if chain_log < CHAINLOG_MIN or chain_log > CHAINLOG_MAX:
       
    76             raise ValueError('invalid chain log value')
       
    77 
       
    78         if hash_log < HASHLOG_MIN or hash_log > HASHLOG_MAX:
       
    79             raise ValueError('invalid hash log value')
       
    80 
       
    81         if search_log < SEARCHLOG_MIN or search_log > SEARCHLOG_MAX:
       
    82             raise ValueError('invalid search log value')
       
    83 
       
    84         if search_length < SEARCHLENGTH_MIN or search_length > SEARCHLENGTH_MAX:
       
    85             raise ValueError('invalid search length value')
       
    86 
       
    87         if target_length < TARGETLENGTH_MIN or target_length > TARGETLENGTH_MAX:
       
    88             raise ValueError('invalid target length value')
       
    89 
       
    90         if strategy < STRATEGY_FAST or strategy > STRATEGY_BTOPT:
       
    91             raise ValueError('invalid strategy value')
       
    92 
       
    93         self.window_log = window_log
       
    94         self.chain_log = chain_log
       
    95         self.hash_log = hash_log
       
    96         self.search_log = search_log
       
    97         self.search_length = search_length
       
    98         self.target_length = target_length
       
    99         self.strategy = strategy
       
   100 
       
   101     def as_compression_parameters(self):
       
   102         p = ffi.new('ZSTD_compressionParameters *')[0]
       
   103         p.windowLog = self.window_log
       
   104         p.chainLog = self.chain_log
       
   105         p.hashLog = self.hash_log
       
   106         p.searchLog = self.search_log
       
   107         p.searchLength = self.search_length
       
   108         p.targetLength = self.target_length
       
   109         p.strategy = self.strategy
       
   110 
       
   111         return p
       
   112 
       
   113 def get_compression_parameters(level, source_size=0, dict_size=0):
       
   114     params = lib.ZSTD_getCParams(level, source_size, dict_size)
       
   115     return CompressionParameters(window_log=params.windowLog,
       
   116                                  chain_log=params.chainLog,
       
   117                                  hash_log=params.hashLog,
       
   118                                  search_log=params.searchLog,
       
   119                                  search_length=params.searchLength,
       
   120                                  target_length=params.targetLength,
       
   121                                  strategy=params.strategy)
       
   122 
       
   123 
       
   124 def estimate_compression_context_size(params):
       
   125     if not isinstance(params, CompressionParameters):
       
   126         raise ValueError('argument must be a CompressionParameters')
       
   127 
       
   128     cparams = params.as_compression_parameters()
       
   129     return lib.ZSTD_estimateCCtxSize(cparams)
       
   130 
       
   131 
       
   132 def estimate_decompression_context_size():
       
   133     return lib.ZSTD_estimateDCtxSize()
       
   134 
       
   135 
       
   136 class ZstdCompressionWriter(object):
       
   137     def __init__(self, compressor, writer, source_size, write_size):
       
   138         self._compressor = compressor
    26         self._writer = writer
   139         self._writer = writer
       
   140         self._source_size = source_size
       
   141         self._write_size = write_size
       
   142         self._entered = False
    27 
   143 
    28     def __enter__(self):
   144     def __enter__(self):
       
   145         if self._entered:
       
   146             raise ZstdError('cannot __enter__ multiple times')
       
   147 
       
   148         self._cstream = self._compressor._get_cstream(self._source_size)
       
   149         self._entered = True
    29         return self
   150         return self
    30 
   151 
    31     def __exit__(self, exc_type, exc_value, exc_tb):
   152     def __exit__(self, exc_type, exc_value, exc_tb):
       
   153         self._entered = False
       
   154 
    32         if not exc_type and not exc_value and not exc_tb:
   155         if not exc_type and not exc_value and not exc_tb:
    33             out_buffer = ffi.new('ZSTD_outBuffer *')
   156             out_buffer = ffi.new('ZSTD_outBuffer *')
    34             out_buffer.dst = ffi.new('char[]', _CSTREAM_OUT_SIZE)
   157             dst_buffer = ffi.new('char[]', self._write_size)
    35             out_buffer.size = _CSTREAM_OUT_SIZE
   158             out_buffer.dst = dst_buffer
       
   159             out_buffer.size = self._write_size
    36             out_buffer.pos = 0
   160             out_buffer.pos = 0
    37 
   161 
    38             while True:
   162             while True:
    39                 res = lib.ZSTD_endStream(self._cstream, out_buffer)
   163                 zresult = lib.ZSTD_endStream(self._cstream, out_buffer)
    40                 if lib.ZSTD_isError(res):
   164                 if lib.ZSTD_isError(zresult):
    41                     raise Exception('error ending compression stream: %s' % lib.ZSTD_getErrorName)
   165                     raise ZstdError('error ending compression stream: %s' %
       
   166                                     ffi.string(lib.ZSTD_getErrorName(zresult)))
    42 
   167 
    43                 if out_buffer.pos:
   168                 if out_buffer.pos:
    44                     self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
   169                     self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
    45                     out_buffer.pos = 0
   170                     out_buffer.pos = 0
    46 
   171 
    47                 if res == 0:
   172                 if zresult == 0:
    48                     break
   173                     break
    49 
   174 
       
   175         self._cstream = None
       
   176         self._compressor = None
       
   177 
    50         return False
   178         return False
    51 
   179 
       
   180     def memory_size(self):
       
   181         if not self._entered:
       
   182             raise ZstdError('cannot determine size of an inactive compressor; '
       
   183                             'call when a context manager is active')
       
   184 
       
   185         return lib.ZSTD_sizeof_CStream(self._cstream)
       
   186 
    52     def write(self, data):
   187     def write(self, data):
       
   188         if not self._entered:
       
   189             raise ZstdError('write() must be called from an active context '
       
   190                             'manager')
       
   191 
       
   192         total_write = 0
       
   193 
       
   194         data_buffer = ffi.from_buffer(data)
       
   195 
       
   196         in_buffer = ffi.new('ZSTD_inBuffer *')
       
   197         in_buffer.src = data_buffer
       
   198         in_buffer.size = len(data_buffer)
       
   199         in_buffer.pos = 0
       
   200 
    53         out_buffer = ffi.new('ZSTD_outBuffer *')
   201         out_buffer = ffi.new('ZSTD_outBuffer *')
    54         out_buffer.dst = ffi.new('char[]', _CSTREAM_OUT_SIZE)
   202         dst_buffer = ffi.new('char[]', self._write_size)
    55         out_buffer.size = _CSTREAM_OUT_SIZE
   203         out_buffer.dst = dst_buffer
       
   204         out_buffer.size = self._write_size
    56         out_buffer.pos = 0
   205         out_buffer.pos = 0
    57 
   206 
    58         # TODO can we reuse existing memory?
       
    59         in_buffer = ffi.new('ZSTD_inBuffer *')
       
    60         in_buffer.src = ffi.new('char[]', data)
       
    61         in_buffer.size = len(data)
       
    62         in_buffer.pos = 0
       
    63         while in_buffer.pos < in_buffer.size:
   207         while in_buffer.pos < in_buffer.size:
    64             res = lib.ZSTD_compressStream(self._cstream, out_buffer, in_buffer)
   208             zresult = lib.ZSTD_compressStream(self._cstream, out_buffer, in_buffer)
    65             if lib.ZSTD_isError(res):
   209             if lib.ZSTD_isError(zresult):
    66                 raise Exception('zstd compress error: %s' % lib.ZSTD_getErrorName(res))
   210                 raise ZstdError('zstd compress error: %s' %
       
   211                                 ffi.string(lib.ZSTD_getErrorName(zresult)))
    67 
   212 
    68             if out_buffer.pos:
   213             if out_buffer.pos:
    69                 self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
   214                 self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
       
   215                 total_write += out_buffer.pos
    70                 out_buffer.pos = 0
   216                 out_buffer.pos = 0
    71 
   217 
       
   218         return total_write
       
   219 
       
   220     def flush(self):
       
   221         if not self._entered:
       
   222             raise ZstdError('flush must be called from an active context manager')
       
   223 
       
   224         total_write = 0
       
   225 
       
   226         out_buffer = ffi.new('ZSTD_outBuffer *')
       
   227         dst_buffer = ffi.new('char[]', self._write_size)
       
   228         out_buffer.dst = dst_buffer
       
   229         out_buffer.size = self._write_size
       
   230         out_buffer.pos = 0
       
   231 
       
   232         while True:
       
   233             zresult = lib.ZSTD_flushStream(self._cstream, out_buffer)
       
   234             if lib.ZSTD_isError(zresult):
       
   235                 raise ZstdError('zstd compress error: %s' %
       
   236                                 ffi.string(lib.ZSTD_getErrorName(zresult)))
       
   237 
       
   238             if not out_buffer.pos:
       
   239                 break
       
   240 
       
   241             self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
       
   242             total_write += out_buffer.pos
       
   243             out_buffer.pos = 0
       
   244 
       
   245         return total_write
       
   246 
       
   247 
       
   248 class ZstdCompressionObj(object):
       
   249     def compress(self, data):
       
   250         if self._finished:
       
   251             raise ZstdError('cannot call compress() after compressor finished')
       
   252 
       
   253         data_buffer = ffi.from_buffer(data)
       
   254         source = ffi.new('ZSTD_inBuffer *')
       
   255         source.src = data_buffer
       
   256         source.size = len(data_buffer)
       
   257         source.pos = 0
       
   258 
       
   259         chunks = []
       
   260 
       
   261         while source.pos < len(data):
       
   262             zresult = lib.ZSTD_compressStream(self._cstream, self._out, source)
       
   263             if lib.ZSTD_isError(zresult):
       
   264                 raise ZstdError('zstd compress error: %s' %
       
   265                                 ffi.string(lib.ZSTD_getErrorName(zresult)))
       
   266 
       
   267             if self._out.pos:
       
   268                 chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
       
   269                 self._out.pos = 0
       
   270 
       
   271         return b''.join(chunks)
       
   272 
       
   273     def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH):
       
   274         if flush_mode not in (COMPRESSOBJ_FLUSH_FINISH, COMPRESSOBJ_FLUSH_BLOCK):
       
   275             raise ValueError('flush mode not recognized')
       
   276 
       
   277         if self._finished:
       
   278             raise ZstdError('compressor object already finished')
       
   279 
       
   280         assert self._out.pos == 0
       
   281 
       
   282         if flush_mode == COMPRESSOBJ_FLUSH_BLOCK:
       
   283             zresult = lib.ZSTD_flushStream(self._cstream, self._out)
       
   284             if lib.ZSTD_isError(zresult):
       
   285                 raise ZstdError('zstd compress error: %s' %
       
   286                                 ffi.string(lib.ZSTD_getErrorName(zresult)))
       
   287 
       
   288             # Output buffer is guaranteed to hold full block.
       
   289             assert zresult == 0
       
   290 
       
   291             if self._out.pos:
       
   292                 result = ffi.buffer(self._out.dst, self._out.pos)[:]
       
   293                 self._out.pos = 0
       
   294                 return result
       
   295             else:
       
   296                 return b''
       
   297 
       
   298         assert flush_mode == COMPRESSOBJ_FLUSH_FINISH
       
   299         self._finished = True
       
   300 
       
   301         chunks = []
       
   302 
       
   303         while True:
       
   304             zresult = lib.ZSTD_endStream(self._cstream, self._out)
       
   305             if lib.ZSTD_isError(zresult):
       
   306                 raise ZstdError('error ending compression stream: %s' %
       
   307                                 ffi.string(lib.ZSTD_getErroName(zresult)))
       
   308 
       
   309             if self._out.pos:
       
   310                 chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
       
   311                 self._out.pos = 0
       
   312 
       
   313             if not zresult:
       
   314                 break
       
   315 
       
   316         # GC compression stream immediately.
       
   317         self._cstream = None
       
   318 
       
   319         return b''.join(chunks)
       
   320 
    72 
   321 
    73 class ZstdCompressor(object):
   322 class ZstdCompressor(object):
    74     def __init__(self, level=3, dict_data=None, compression_params=None):
   323     def __init__(self, level=3, dict_data=None, compression_params=None,
    75         if dict_data:
   324                  write_checksum=False, write_content_size=False,
    76             raise Exception('dict_data not yet supported')
   325                  write_dict_id=True):
    77         if compression_params:
   326         if level < 1:
    78             raise Exception('compression_params not yet supported')
   327             raise ValueError('level must be greater than 0')
       
   328         elif level > lib.ZSTD_maxCLevel():
       
   329             raise ValueError('level must be less than %d' % lib.ZSTD_maxCLevel())
    79 
   330 
    80         self._compression_level = level
   331         self._compression_level = level
    81 
   332         self._dict_data = dict_data
    82     def compress(self, data):
   333         self._cparams = compression_params
    83         # Just use the stream API for now.
   334         self._fparams = ffi.new('ZSTD_frameParameters *')[0]
    84         output = io.BytesIO()
   335         self._fparams.checksumFlag = write_checksum
    85         with self.write_to(output) as compressor:
   336         self._fparams.contentSizeFlag = write_content_size
    86             compressor.write(data)
   337         self._fparams.noDictIDFlag = not write_dict_id
    87         return output.getvalue()
   338 
    88 
   339         cctx = lib.ZSTD_createCCtx()
    89     def copy_stream(self, ifh, ofh):
   340         if cctx == ffi.NULL:
    90         cstream = self._get_cstream()
   341             raise MemoryError()
       
   342 
       
   343         self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx)
       
   344 
       
   345     def compress(self, data, allow_empty=False):
       
   346         if len(data) == 0 and self._fparams.contentSizeFlag and not allow_empty:
       
   347             raise ValueError('cannot write empty inputs when writing content sizes')
       
   348 
       
   349         # TODO use a CDict for performance.
       
   350         dict_data = ffi.NULL
       
   351         dict_size = 0
       
   352 
       
   353         if self._dict_data:
       
   354             dict_data = self._dict_data.as_bytes()
       
   355             dict_size = len(self._dict_data)
       
   356 
       
   357         params = ffi.new('ZSTD_parameters *')[0]
       
   358         if self._cparams:
       
   359             params.cParams = self._cparams.as_compression_parameters()
       
   360         else:
       
   361             params.cParams = lib.ZSTD_getCParams(self._compression_level, len(data),
       
   362                                                  dict_size)
       
   363         params.fParams = self._fparams
       
   364 
       
   365         dest_size = lib.ZSTD_compressBound(len(data))
       
   366         out = new_nonzero('char[]', dest_size)
       
   367 
       
   368         zresult = lib.ZSTD_compress_advanced(self._cctx,
       
   369                                              ffi.addressof(out), dest_size,
       
   370                                              data, len(data),
       
   371                                              dict_data, dict_size,
       
   372                                              params)
       
   373 
       
   374         if lib.ZSTD_isError(zresult):
       
   375             raise ZstdError('cannot compress: %s' %
       
   376                             ffi.string(lib.ZSTD_getErrorName(zresult)))
       
   377 
       
   378         return ffi.buffer(out, zresult)[:]
       
   379 
       
   380     def compressobj(self, size=0):
       
   381         cstream = self._get_cstream(size)
       
   382         cobj = ZstdCompressionObj()
       
   383         cobj._cstream = cstream
       
   384         cobj._out = ffi.new('ZSTD_outBuffer *')
       
   385         cobj._dst_buffer = ffi.new('char[]', COMPRESSION_RECOMMENDED_OUTPUT_SIZE)
       
   386         cobj._out.dst = cobj._dst_buffer
       
   387         cobj._out.size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE
       
   388         cobj._out.pos = 0
       
   389         cobj._compressor = self
       
   390         cobj._finished = False
       
   391 
       
   392         return cobj
       
   393 
       
   394     def copy_stream(self, ifh, ofh, size=0,
       
   395                     read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
       
   396                     write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
       
   397 
       
   398         if not hasattr(ifh, 'read'):
       
   399             raise ValueError('first argument must have a read() method')
       
   400         if not hasattr(ofh, 'write'):
       
   401             raise ValueError('second argument must have a write() method')
       
   402 
       
   403         cstream = self._get_cstream(size)
    91 
   404 
    92         in_buffer = ffi.new('ZSTD_inBuffer *')
   405         in_buffer = ffi.new('ZSTD_inBuffer *')
    93         out_buffer = ffi.new('ZSTD_outBuffer *')
   406         out_buffer = ffi.new('ZSTD_outBuffer *')
    94 
   407 
    95         out_buffer.dst = ffi.new('char[]', _CSTREAM_OUT_SIZE)
   408         dst_buffer = ffi.new('char[]', write_size)
    96         out_buffer.size = _CSTREAM_OUT_SIZE
   409         out_buffer.dst = dst_buffer
       
   410         out_buffer.size = write_size
    97         out_buffer.pos = 0
   411         out_buffer.pos = 0
    98 
   412 
    99         total_read, total_write = 0, 0
   413         total_read, total_write = 0, 0
   100 
   414 
   101         while True:
   415         while True:
   102             data = ifh.read(_CSTREAM_IN_SIZE)
   416             data = ifh.read(read_size)
   103             if not data:
   417             if not data:
   104                 break
   418                 break
   105 
   419 
   106             total_read += len(data)
   420             data_buffer = ffi.from_buffer(data)
   107 
   421             total_read += len(data_buffer)
   108             in_buffer.src = ffi.new('char[]', data)
   422             in_buffer.src = data_buffer
   109             in_buffer.size = len(data)
   423             in_buffer.size = len(data_buffer)
   110             in_buffer.pos = 0
   424             in_buffer.pos = 0
   111 
   425 
   112             while in_buffer.pos < in_buffer.size:
   426             while in_buffer.pos < in_buffer.size:
   113                 res = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer)
   427                 zresult = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer)
   114                 if lib.ZSTD_isError(res):
   428                 if lib.ZSTD_isError(zresult):
   115                     raise Exception('zstd compress error: %s' %
   429                     raise ZstdError('zstd compress error: %s' %
   116                                     lib.ZSTD_getErrorName(res))
   430                                     ffi.string(lib.ZSTD_getErrorName(zresult)))
   117 
   431 
   118                 if out_buffer.pos:
   432                 if out_buffer.pos:
   119                     ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
   433                     ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
   120                     total_write = out_buffer.pos
   434                     total_write += out_buffer.pos
   121                     out_buffer.pos = 0
   435                     out_buffer.pos = 0
   122 
   436 
   123         # We've finished reading. Flush the compressor.
   437         # We've finished reading. Flush the compressor.
   124         while True:
   438         while True:
   125             res = lib.ZSTD_endStream(cstream, out_buffer)
   439             zresult = lib.ZSTD_endStream(cstream, out_buffer)
   126             if lib.ZSTD_isError(res):
   440             if lib.ZSTD_isError(zresult):
   127                 raise Exception('error ending compression stream: %s' %
   441                 raise ZstdError('error ending compression stream: %s' %
   128                                 lib.ZSTD_getErrorName(res))
   442                                 ffi.string(lib.ZSTD_getErrorName(zresult)))
   129 
   443 
   130             if out_buffer.pos:
   444             if out_buffer.pos:
   131                 ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
   445                 ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
   132                 total_write += out_buffer.pos
   446                 total_write += out_buffer.pos
   133                 out_buffer.pos = 0
   447                 out_buffer.pos = 0
   134 
   448 
   135             if res == 0:
   449             if zresult == 0:
   136                 break
   450                 break
   137 
   451 
   138         return total_read, total_write
   452         return total_read, total_write
   139 
   453 
   140     def write_to(self, writer):
   454     def write_to(self, writer, size=0,
   141         return _ZstdCompressionWriter(self._get_cstream(), writer)
   455                  write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
   142 
   456 
   143     def _get_cstream(self):
   457         if not hasattr(writer, 'write'):
       
   458             raise ValueError('must pass an object with a write() method')
       
   459 
       
   460         return ZstdCompressionWriter(self, writer, size, write_size)
       
   461 
       
   462     def read_from(self, reader, size=0,
       
   463                   read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
       
   464                   write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
       
   465         if hasattr(reader, 'read'):
       
   466             have_read = True
       
   467         elif hasattr(reader, '__getitem__'):
       
   468             have_read = False
       
   469             buffer_offset = 0
       
   470             size = len(reader)
       
   471         else:
       
   472             raise ValueError('must pass an object with a read() method or '
       
   473                              'conforms to buffer protocol')
       
   474 
       
   475         cstream = self._get_cstream(size)
       
   476 
       
   477         in_buffer = ffi.new('ZSTD_inBuffer *')
       
   478         out_buffer = ffi.new('ZSTD_outBuffer *')
       
   479 
       
   480         in_buffer.src = ffi.NULL
       
   481         in_buffer.size = 0
       
   482         in_buffer.pos = 0
       
   483 
       
   484         dst_buffer = ffi.new('char[]', write_size)
       
   485         out_buffer.dst = dst_buffer
       
   486         out_buffer.size = write_size
       
   487         out_buffer.pos = 0
       
   488 
       
   489         while True:
       
   490             # We should never have output data sitting around after a previous
       
   491             # iteration.
       
   492             assert out_buffer.pos == 0
       
   493 
       
   494             # Collect input data.
       
   495             if have_read:
       
   496                 read_result = reader.read(read_size)
       
   497             else:
       
   498                 remaining = len(reader) - buffer_offset
       
   499                 slice_size = min(remaining, read_size)
       
   500                 read_result = reader[buffer_offset:buffer_offset + slice_size]
       
   501                 buffer_offset += slice_size
       
   502 
       
   503             # No new input data. Break out of the read loop.
       
   504             if not read_result:
       
   505                 break
       
   506 
       
   507             # Feed all read data into the compressor and emit output until
       
   508             # exhausted.
       
   509             read_buffer = ffi.from_buffer(read_result)
       
   510             in_buffer.src = read_buffer
       
   511             in_buffer.size = len(read_buffer)
       
   512             in_buffer.pos = 0
       
   513 
       
   514             while in_buffer.pos < in_buffer.size:
       
   515                 zresult = lib.ZSTD_compressStream(cstream, out_buffer, in_buffer)
       
   516                 if lib.ZSTD_isError(zresult):
       
   517                     raise ZstdError('zstd compress error: %s' %
       
   518                                     ffi.string(lib.ZSTD_getErrorName(zresult)))
       
   519 
       
   520                 if out_buffer.pos:
       
   521                     data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
       
   522                     out_buffer.pos = 0
       
   523                     yield data
       
   524 
       
   525             assert out_buffer.pos == 0
       
   526 
       
   527             # And repeat the loop to collect more data.
       
   528             continue
       
   529 
       
   530         # If we get here, input is exhausted. End the stream and emit what
       
   531         # remains.
       
   532         while True:
       
   533             assert out_buffer.pos == 0
       
   534             zresult = lib.ZSTD_endStream(cstream, out_buffer)
       
   535             if lib.ZSTD_isError(zresult):
       
   536                 raise ZstdError('error ending compression stream: %s' %
       
   537                                 ffi.string(lib.ZSTD_getErrorName(zresult)))
       
   538 
       
   539             if out_buffer.pos:
       
   540                 data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
       
   541                 out_buffer.pos = 0
       
   542                 yield data
       
   543 
       
   544             if zresult == 0:
       
   545                 break
       
   546 
       
   547     def _get_cstream(self, size):
   144         cstream = lib.ZSTD_createCStream()
   548         cstream = lib.ZSTD_createCStream()
       
   549         if cstream == ffi.NULL:
       
   550             raise MemoryError()
       
   551 
   145         cstream = ffi.gc(cstream, lib.ZSTD_freeCStream)
   552         cstream = ffi.gc(cstream, lib.ZSTD_freeCStream)
   146 
   553 
   147         res = lib.ZSTD_initCStream(cstream, self._compression_level)
   554         dict_data = ffi.NULL
   148         if lib.ZSTD_isError(res):
   555         dict_size = 0
       
   556         if self._dict_data:
       
   557             dict_data = self._dict_data.as_bytes()
       
   558             dict_size = len(self._dict_data)
       
   559 
       
   560         zparams = ffi.new('ZSTD_parameters *')[0]
       
   561         if self._cparams:
       
   562             zparams.cParams = self._cparams.as_compression_parameters()
       
   563         else:
       
   564             zparams.cParams = lib.ZSTD_getCParams(self._compression_level,
       
   565                                                   size, dict_size)
       
   566         zparams.fParams = self._fparams
       
   567 
       
   568         zresult = lib.ZSTD_initCStream_advanced(cstream, dict_data, dict_size,
       
   569                                                 zparams, size)
       
   570         if lib.ZSTD_isError(zresult):
   149             raise Exception('cannot init CStream: %s' %
   571             raise Exception('cannot init CStream: %s' %
   150                             lib.ZSTD_getErrorName(res))
   572                             ffi.string(lib.ZSTD_getErrorName(zresult)))
   151 
   573 
   152         return cstream
   574         return cstream
       
   575 
       
   576 
       
   577 class FrameParameters(object):
       
   578     def __init__(self, fparams):
       
   579         self.content_size = fparams.frameContentSize
       
   580         self.window_size = fparams.windowSize
       
   581         self.dict_id = fparams.dictID
       
   582         self.has_checksum = bool(fparams.checksumFlag)
       
   583 
       
   584 
       
   585 def get_frame_parameters(data):
       
   586     if not isinstance(data, bytes_type):
       
   587         raise TypeError('argument must be bytes')
       
   588 
       
   589     params = ffi.new('ZSTD_frameParams *')
       
   590 
       
   591     zresult = lib.ZSTD_getFrameParams(params, data, len(data))
       
   592     if lib.ZSTD_isError(zresult):
       
   593         raise ZstdError('cannot get frame parameters: %s' %
       
   594                         ffi.string(lib.ZSTD_getErrorName(zresult)))
       
   595 
       
   596     if zresult:
       
   597         raise ZstdError('not enough data for frame parameters; need %d bytes' %
       
   598                         zresult)
       
   599 
       
   600     return FrameParameters(params[0])
       
   601 
       
   602 
       
   603 class ZstdCompressionDict(object):
       
   604     def __init__(self, data):
       
   605         assert isinstance(data, bytes_type)
       
   606         self._data = data
       
   607 
       
   608     def __len__(self):
       
   609         return len(self._data)
       
   610 
       
   611     def dict_id(self):
       
   612         return int_type(lib.ZDICT_getDictID(self._data, len(self._data)))
       
   613 
       
   614     def as_bytes(self):
       
   615         return self._data
       
   616 
       
   617 
       
   618 def train_dictionary(dict_size, samples, parameters=None):
       
   619     if not isinstance(samples, list):
       
   620         raise TypeError('samples must be a list')
       
   621 
       
   622     total_size = sum(map(len, samples))
       
   623 
       
   624     samples_buffer = new_nonzero('char[]', total_size)
       
   625     sample_sizes = new_nonzero('size_t[]', len(samples))
       
   626 
       
   627     offset = 0
       
   628     for i, sample in enumerate(samples):
       
   629         if not isinstance(sample, bytes_type):
       
   630             raise ValueError('samples must be bytes')
       
   631 
       
   632         l = len(sample)
       
   633         ffi.memmove(samples_buffer + offset, sample, l)
       
   634         offset += l
       
   635         sample_sizes[i] = l
       
   636 
       
   637     dict_data = new_nonzero('char[]', dict_size)
       
   638 
       
   639     zresult = lib.ZDICT_trainFromBuffer(ffi.addressof(dict_data), dict_size,
       
   640                                         ffi.addressof(samples_buffer),
       
   641                                         ffi.addressof(sample_sizes, 0),
       
   642                                         len(samples))
       
   643     if lib.ZDICT_isError(zresult):
       
   644         raise ZstdError('Cannot train dict: %s' %
       
   645                         ffi.string(lib.ZDICT_getErrorName(zresult)))
       
   646 
       
   647     return ZstdCompressionDict(ffi.buffer(dict_data, zresult)[:])
       
   648 
       
   649 
       
   650 class ZstdDecompressionObj(object):
       
   651     def __init__(self, decompressor):
       
   652         self._decompressor = decompressor
       
   653         self._dstream = self._decompressor._get_dstream()
       
   654         self._finished = False
       
   655 
       
   656     def decompress(self, data):
       
   657         if self._finished:
       
   658             raise ZstdError('cannot use a decompressobj multiple times')
       
   659 
       
   660         in_buffer = ffi.new('ZSTD_inBuffer *')
       
   661         out_buffer = ffi.new('ZSTD_outBuffer *')
       
   662 
       
   663         data_buffer = ffi.from_buffer(data)
       
   664         in_buffer.src = data_buffer
       
   665         in_buffer.size = len(data_buffer)
       
   666         in_buffer.pos = 0
       
   667 
       
   668         dst_buffer = ffi.new('char[]', DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)
       
   669         out_buffer.dst = dst_buffer
       
   670         out_buffer.size = len(dst_buffer)
       
   671         out_buffer.pos = 0
       
   672 
       
   673         chunks = []
       
   674 
       
   675         while in_buffer.pos < in_buffer.size:
       
   676             zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer)
       
   677             if lib.ZSTD_isError(zresult):
       
   678                 raise ZstdError('zstd decompressor error: %s' %
       
   679                                 ffi.string(lib.ZSTD_getErrorName(zresult)))
       
   680 
       
   681             if zresult == 0:
       
   682                 self._finished = True
       
   683                 self._dstream = None
       
   684                 self._decompressor = None
       
   685 
       
   686             if out_buffer.pos:
       
   687                 chunks.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
       
   688                 out_buffer.pos = 0
       
   689 
       
   690         return b''.join(chunks)
       
   691 
       
   692 
       
   693 class ZstdDecompressionWriter(object):
       
   694     def __init__(self, decompressor, writer, write_size):
       
   695         self._decompressor = decompressor
       
   696         self._writer = writer
       
   697         self._write_size = write_size
       
   698         self._dstream = None
       
   699         self._entered = False
       
   700 
       
   701     def __enter__(self):
       
   702         if self._entered:
       
   703             raise ZstdError('cannot __enter__ multiple times')
       
   704 
       
   705         self._dstream = self._decompressor._get_dstream()
       
   706         self._entered = True
       
   707 
       
   708         return self
       
   709 
       
   710     def __exit__(self, exc_type, exc_value, exc_tb):
       
   711         self._entered = False
       
   712         self._dstream = None
       
   713 
       
   714     def memory_size(self):
       
   715         if not self._dstream:
       
   716             raise ZstdError('cannot determine size of inactive decompressor '
       
   717                             'call when context manager is active')
       
   718 
       
   719         return lib.ZSTD_sizeof_DStream(self._dstream)
       
   720 
       
   721     def write(self, data):
       
   722         if not self._entered:
       
   723             raise ZstdError('write must be called from an active context manager')
       
   724 
       
   725         total_write = 0
       
   726 
       
   727         in_buffer = ffi.new('ZSTD_inBuffer *')
       
   728         out_buffer = ffi.new('ZSTD_outBuffer *')
       
   729 
       
   730         data_buffer = ffi.from_buffer(data)
       
   731         in_buffer.src = data_buffer
       
   732         in_buffer.size = len(data_buffer)
       
   733         in_buffer.pos = 0
       
   734 
       
   735         dst_buffer = ffi.new('char[]', self._write_size)
       
   736         out_buffer.dst = dst_buffer
       
   737         out_buffer.size = len(dst_buffer)
       
   738         out_buffer.pos = 0
       
   739 
       
   740         while in_buffer.pos < in_buffer.size:
       
   741             zresult = lib.ZSTD_decompressStream(self._dstream, out_buffer, in_buffer)
       
   742             if lib.ZSTD_isError(zresult):
       
   743                 raise ZstdError('zstd decompress error: %s' %
       
   744                                 ffi.string(lib.ZSTD_getErrorName(zresult)))
       
   745 
       
   746             if out_buffer.pos:
       
   747                 self._writer.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
       
   748                 total_write += out_buffer.pos
       
   749                 out_buffer.pos = 0
       
   750 
       
   751         return total_write
       
   752 
       
   753 
       
   754 class ZstdDecompressor(object):
       
   755     def __init__(self, dict_data=None):
       
   756         self._dict_data = dict_data
       
   757 
       
   758         dctx = lib.ZSTD_createDCtx()
       
   759         if dctx == ffi.NULL:
       
   760             raise MemoryError()
       
   761 
       
   762         self._refdctx = ffi.gc(dctx, lib.ZSTD_freeDCtx)
       
   763 
       
   764     @property
       
   765     def _ddict(self):
       
   766         if self._dict_data:
       
   767             dict_data = self._dict_data.as_bytes()
       
   768             dict_size = len(self._dict_data)
       
   769 
       
   770             ddict = lib.ZSTD_createDDict(dict_data, dict_size)
       
   771             if ddict == ffi.NULL:
       
   772                 raise ZstdError('could not create decompression dict')
       
   773         else:
       
   774             ddict = None
       
   775 
       
   776         self.__dict__['_ddict'] = ddict
       
   777         return ddict
       
   778 
       
   779     def decompress(self, data, max_output_size=0):
       
   780         data_buffer = ffi.from_buffer(data)
       
   781 
       
   782         orig_dctx = new_nonzero('char[]', lib.ZSTD_sizeof_DCtx(self._refdctx))
       
   783         dctx = ffi.cast('ZSTD_DCtx *', orig_dctx)
       
   784         lib.ZSTD_copyDCtx(dctx, self._refdctx)
       
   785 
       
   786         ddict = self._ddict
       
   787 
       
   788         output_size = lib.ZSTD_getDecompressedSize(data_buffer, len(data_buffer))
       
   789         if output_size:
       
   790             result_buffer = ffi.new('char[]', output_size)
       
   791             result_size = output_size
       
   792         else:
       
   793             if not max_output_size:
       
   794                 raise ZstdError('input data invalid or missing content size '
       
   795                                 'in frame header')
       
   796 
       
   797             result_buffer = ffi.new('char[]', max_output_size)
       
   798             result_size = max_output_size
       
   799 
       
   800         if ddict:
       
   801             zresult = lib.ZSTD_decompress_usingDDict(dctx,
       
   802                                                      result_buffer, result_size,
       
   803                                                      data_buffer, len(data_buffer),
       
   804                                                      ddict)
       
   805         else:
       
   806             zresult = lib.ZSTD_decompressDCtx(dctx,
       
   807                                               result_buffer, result_size,
       
   808                                               data_buffer, len(data_buffer))
       
   809         if lib.ZSTD_isError(zresult):
       
   810             raise ZstdError('decompression error: %s' %
       
   811                             ffi.string(lib.ZSTD_getErrorName(zresult)))
       
   812         elif output_size and zresult != output_size:
       
   813             raise ZstdError('decompression error: decompressed %d bytes; expected %d' %
       
   814                             (zresult, output_size))
       
   815 
       
   816         return ffi.buffer(result_buffer, zresult)[:]
       
   817 
       
   818     def decompressobj(self):
       
   819         return ZstdDecompressionObj(self)
       
   820 
       
   821     def read_from(self, reader, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
       
   822                   write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
       
   823                   skip_bytes=0):
       
   824         if skip_bytes >= read_size:
       
   825             raise ValueError('skip_bytes must be smaller than read_size')
       
   826 
       
   827         if hasattr(reader, 'read'):
       
   828             have_read = True
       
   829         elif hasattr(reader, '__getitem__'):
       
   830             have_read = False
       
   831             buffer_offset = 0
       
   832             size = len(reader)
       
   833         else:
       
   834             raise ValueError('must pass an object with a read() method or '
       
   835                              'conforms to buffer protocol')
       
   836 
       
   837         if skip_bytes:
       
   838             if have_read:
       
   839                 reader.read(skip_bytes)
       
   840             else:
       
   841                 if skip_bytes > size:
       
   842                     raise ValueError('skip_bytes larger than first input chunk')
       
   843 
       
   844                 buffer_offset = skip_bytes
       
   845 
       
   846         dstream = self._get_dstream()
       
   847 
       
   848         in_buffer = ffi.new('ZSTD_inBuffer *')
       
   849         out_buffer = ffi.new('ZSTD_outBuffer *')
       
   850 
       
   851         dst_buffer = ffi.new('char[]', write_size)
       
   852         out_buffer.dst = dst_buffer
       
   853         out_buffer.size = len(dst_buffer)
       
   854         out_buffer.pos = 0
       
   855 
       
   856         while True:
       
   857             assert out_buffer.pos == 0
       
   858 
       
   859             if have_read:
       
   860                 read_result = reader.read(read_size)
       
   861             else:
       
   862                 remaining = size - buffer_offset
       
   863                 slice_size = min(remaining, read_size)
       
   864                 read_result = reader[buffer_offset:buffer_offset + slice_size]
       
   865                 buffer_offset += slice_size
       
   866 
       
   867             # No new input. Break out of read loop.
       
   868             if not read_result:
       
   869                 break
       
   870 
       
   871             # Feed all read data into decompressor and emit output until
       
   872             # exhausted.
       
   873             read_buffer = ffi.from_buffer(read_result)
       
   874             in_buffer.src = read_buffer
       
   875             in_buffer.size = len(read_buffer)
       
   876             in_buffer.pos = 0
       
   877 
       
   878             while in_buffer.pos < in_buffer.size:
       
   879                 assert out_buffer.pos == 0
       
   880 
       
   881                 zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer)
       
   882                 if lib.ZSTD_isError(zresult):
       
   883                     raise ZstdError('zstd decompress error: %s' %
       
   884                                     ffi.string(lib.ZSTD_getErrorName(zresult)))
       
   885 
       
   886                 if out_buffer.pos:
       
   887                     data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
       
   888                     out_buffer.pos = 0
       
   889                     yield data
       
   890 
       
   891                 if zresult == 0:
       
   892                     return
       
   893 
       
   894             # Repeat loop to collect more input data.
       
   895             continue
       
   896 
       
   897         # If we get here, input is exhausted.
       
   898 
       
   899     def write_to(self, writer, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
       
   900         if not hasattr(writer, 'write'):
       
   901             raise ValueError('must pass an object with a write() method')
       
   902 
       
   903         return ZstdDecompressionWriter(self, writer, write_size)
       
   904 
       
   905     def copy_stream(self, ifh, ofh,
       
   906                     read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
       
   907                     write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
       
   908         if not hasattr(ifh, 'read'):
       
   909             raise ValueError('first argument must have a read() method')
       
   910         if not hasattr(ofh, 'write'):
       
   911             raise ValueError('second argument must have a write() method')
       
   912 
       
   913         dstream = self._get_dstream()
       
   914 
       
   915         in_buffer = ffi.new('ZSTD_inBuffer *')
       
   916         out_buffer = ffi.new('ZSTD_outBuffer *')
       
   917 
       
   918         dst_buffer = ffi.new('char[]', write_size)
       
   919         out_buffer.dst = dst_buffer
       
   920         out_buffer.size = write_size
       
   921         out_buffer.pos = 0
       
   922 
       
   923         total_read, total_write = 0, 0
       
   924 
       
   925         # Read all available input.
       
   926         while True:
       
   927             data = ifh.read(read_size)
       
   928             if not data:
       
   929                 break
       
   930 
       
   931             data_buffer = ffi.from_buffer(data)
       
   932             total_read += len(data_buffer)
       
   933             in_buffer.src = data_buffer
       
   934             in_buffer.size = len(data_buffer)
       
   935             in_buffer.pos = 0
       
   936 
       
   937             # Flush all read data to output.
       
   938             while in_buffer.pos < in_buffer.size:
       
   939                 zresult = lib.ZSTD_decompressStream(dstream, out_buffer, in_buffer)
       
   940                 if lib.ZSTD_isError(zresult):
       
   941                     raise ZstdError('zstd decompressor error: %s' %
       
   942                                     ffi.string(lib.ZSTD_getErrorName(zresult)))
       
   943 
       
   944                 if out_buffer.pos:
       
   945                     ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
       
   946                     total_write += out_buffer.pos
       
   947                     out_buffer.pos = 0
       
   948 
       
   949             # Continue loop to keep reading.
       
   950 
       
   951         return total_read, total_write
       
   952 
       
   953     def decompress_content_dict_chain(self, frames):
       
   954         if not isinstance(frames, list):
       
   955             raise TypeError('argument must be a list')
       
   956 
       
   957         if not frames:
       
   958             raise ValueError('empty input chain')
       
   959 
       
   960         # First chunk should not be using a dictionary. We handle it specially.
       
   961         chunk = frames[0]
       
   962         if not isinstance(chunk, bytes_type):
       
   963             raise ValueError('chunk 0 must be bytes')
       
   964 
       
   965         # All chunks should be zstd frames and should have content size set.
       
   966         chunk_buffer = ffi.from_buffer(chunk)
       
   967         params = ffi.new('ZSTD_frameParams *')
       
   968         zresult = lib.ZSTD_getFrameParams(params, chunk_buffer, len(chunk_buffer))
       
   969         if lib.ZSTD_isError(zresult):
       
   970             raise ValueError('chunk 0 is not a valid zstd frame')
       
   971         elif zresult:
       
   972             raise ValueError('chunk 0 is too small to contain a zstd frame')
       
   973 
       
   974         if not params.frameContentSize:
       
   975             raise ValueError('chunk 0 missing content size in frame')
       
   976 
       
   977         dctx = lib.ZSTD_createDCtx()
       
   978         if dctx == ffi.NULL:
       
   979             raise MemoryError()
       
   980 
       
   981         dctx = ffi.gc(dctx, lib.ZSTD_freeDCtx)
       
   982 
       
   983         last_buffer = ffi.new('char[]', params.frameContentSize)
       
   984 
       
   985         zresult = lib.ZSTD_decompressDCtx(dctx, last_buffer, len(last_buffer),
       
   986                                           chunk_buffer, len(chunk_buffer))
       
   987         if lib.ZSTD_isError(zresult):
       
   988             raise ZstdError('could not decompress chunk 0: %s' %
       
   989                             ffi.string(lib.ZSTD_getErrorName(zresult)))
       
   990 
       
   991         # Special case of chain length of 1
       
   992         if len(frames) == 1:
       
   993             return ffi.buffer(last_buffer, len(last_buffer))[:]
       
   994 
       
   995         i = 1
       
   996         while i < len(frames):
       
   997             chunk = frames[i]
       
   998             if not isinstance(chunk, bytes_type):
       
   999                 raise ValueError('chunk %d must be bytes' % i)
       
  1000 
       
  1001             chunk_buffer = ffi.from_buffer(chunk)
       
  1002             zresult = lib.ZSTD_getFrameParams(params, chunk_buffer, len(chunk_buffer))
       
  1003             if lib.ZSTD_isError(zresult):
       
  1004                 raise ValueError('chunk %d is not a valid zstd frame' % i)
       
  1005             elif zresult:
       
  1006                 raise ValueError('chunk %d is too small to contain a zstd frame' % i)
       
  1007 
       
  1008             if not params.frameContentSize:
       
  1009                 raise ValueError('chunk %d missing content size in frame' % i)
       
  1010 
       
  1011             dest_buffer = ffi.new('char[]', params.frameContentSize)
       
  1012 
       
  1013             zresult = lib.ZSTD_decompress_usingDict(dctx, dest_buffer, len(dest_buffer),
       
  1014                                                     chunk_buffer, len(chunk_buffer),
       
  1015                                                     last_buffer, len(last_buffer))
       
  1016             if lib.ZSTD_isError(zresult):
       
  1017                 raise ZstdError('could not decompress chunk %d' % i)
       
  1018 
       
  1019             last_buffer = dest_buffer
       
  1020             i += 1
       
  1021 
       
  1022         return ffi.buffer(last_buffer, len(last_buffer))[:]
       
  1023 
       
  1024     def _get_dstream(self):
       
  1025         dstream = lib.ZSTD_createDStream()
       
  1026         if dstream == ffi.NULL:
       
  1027             raise MemoryError()
       
  1028 
       
  1029         dstream = ffi.gc(dstream, lib.ZSTD_freeDStream)
       
  1030 
       
  1031         if self._dict_data:
       
  1032             zresult = lib.ZSTD_initDStream_usingDict(dstream,
       
  1033                                                      self._dict_data.as_bytes(),
       
  1034                                                      len(self._dict_data))
       
  1035         else:
       
  1036             zresult = lib.ZSTD_initDStream(dstream)
       
  1037 
       
  1038         if lib.ZSTD_isError(zresult):
       
  1039             raise ZstdError('could not initialize DStream: %s' %
       
  1040                             ffi.string(lib.ZSTD_getErrorName(zresult)))
       
  1041 
       
  1042         return dstream