view contrib/python-zstandard/zstd_cffi.py @ 41157:c4639fdae1b9

simplestorerepo: minimal changes required to get this mostly working again I was going to change this code's use of CBOR to use our in-house CBOR code, but discovered it's been broken for a while. This messy change gets it back to a point where it mostly works, I think roughly as well as it ever did. Should we keep this and fix it up the rest of the way, or dump it in favor of the sqlite store? Would this be a good jumping-off point for some sort of union store that could facilitate a cleanup in remotefilelog? Differential Revision: https://phab.mercurial-scm.org/D5519
author Augie Fackler <augie@google.com>
date Mon, 07 Jan 2019 18:22:20 -0500
parents 73fef626dae3
children
line wrap: on
line source

# Copyright (c) 2016-present, Gregory Szorc
# All rights reserved.
#
# This software may be modified and distributed under the terms
# of the BSD license. See the LICENSE file for details.

"""Python interface to the Zstandard (zstd) compression library."""

from __future__ import absolute_import, unicode_literals

# Public API of this module. This should match what the C extension exports.
__all__ = [
    # NOTE(review): the buffer types below are exported by the C extension but
    # are commented out here, presumably because this CFFI backend does not
    # implement them — confirm before re-enabling.
    #'BufferSegment',
    #'BufferSegments',
    #'BufferWithSegments',
    #'BufferWithSegmentsCollection',
    'CompressionParameters',
    'ZstdCompressionDict',
    'ZstdCompressionParameters',
    'ZstdCompressor',
    'ZstdError',
    'ZstdDecompressor',
    'FrameParameters',
    'estimate_decompression_context_size',
    'frame_content_size',
    'frame_header_size',
    'get_frame_parameters',
    'train_dictionary',

    # Constants.
    'COMPRESSOBJ_FLUSH_FINISH',
    'COMPRESSOBJ_FLUSH_BLOCK',
    'ZSTD_VERSION',
    'FRAME_HEADER',
    'CONTENTSIZE_UNKNOWN',
    'CONTENTSIZE_ERROR',
    'MAX_COMPRESSION_LEVEL',
    'COMPRESSION_RECOMMENDED_INPUT_SIZE',
    'COMPRESSION_RECOMMENDED_OUTPUT_SIZE',
    'DECOMPRESSION_RECOMMENDED_INPUT_SIZE',
    'DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE',
    'MAGIC_NUMBER',
    'BLOCKSIZELOG_MAX',
    'BLOCKSIZE_MAX',
    'WINDOWLOG_MIN',
    'WINDOWLOG_MAX',
    'CHAINLOG_MIN',
    'CHAINLOG_MAX',
    'HASHLOG_MIN',
    'HASHLOG_MAX',
    'HASHLOG3_MAX',
    'SEARCHLOG_MIN',
    'SEARCHLOG_MAX',
    'SEARCHLENGTH_MIN',
    'SEARCHLENGTH_MAX',
    'TARGETLENGTH_MIN',
    'TARGETLENGTH_MAX',
    'LDM_MINMATCH_MIN',
    'LDM_MINMATCH_MAX',
    'LDM_BUCKETSIZELOG_MAX',
    'STRATEGY_FAST',
    'STRATEGY_DFAST',
    'STRATEGY_GREEDY',
    'STRATEGY_LAZY',
    'STRATEGY_LAZY2',
    'STRATEGY_BTLAZY2',
    'STRATEGY_BTOPT',
    'STRATEGY_BTULTRA',
    'DICT_TYPE_AUTO',
    'DICT_TYPE_RAWCONTENT',
    'DICT_TYPE_FULLDICT',
    'FORMAT_ZSTD1',
    'FORMAT_ZSTD1_MAGICLESS',
]

import io
import os
import sys

from _zstd_cffi import (
    ffi,
    lib,
)

# Native byte-string and integer types for the running interpreter. The name
# ``long`` only exists on Python 2, so it is only evaluated on that branch.
if sys.version_info[0] != 2:
    bytes_type = bytes
    int_type = int
else:
    bytes_type = str
    int_type = long


# Streaming buffer sizes recommended by the zstd library.
COMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_CStreamInSize()
COMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_CStreamOutSize()
DECOMPRESSION_RECOMMENDED_INPUT_SIZE = lib.ZSTD_DStreamInSize()
DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE = lib.ZSTD_DStreamOutSize()

# CFFI allocator that does not zero-fill new memory. NOTE(review): only safe
# for buffers that are fully written before being read — confirm at call sites.
new_nonzero = ffi.new_allocator(should_clear_after_alloc=False)


MAX_COMPRESSION_LEVEL = lib.ZSTD_maxCLevel()
MAGIC_NUMBER = lib.ZSTD_MAGICNUMBER
# Leading bytes of a standard zstd frame; presumably MAGIC_NUMBER serialized
# little-endian (verify against lib.ZSTD_MAGICNUMBER).
FRAME_HEADER = b'\x28\xb5\x2f\xfd'
CONTENTSIZE_UNKNOWN = lib.ZSTD_CONTENTSIZE_UNKNOWN
CONTENTSIZE_ERROR = lib.ZSTD_CONTENTSIZE_ERROR
# (major, minor, release) taken from the ZSTD_VERSION_* constants in ``lib``.
ZSTD_VERSION = (lib.ZSTD_VERSION_MAJOR, lib.ZSTD_VERSION_MINOR, lib.ZSTD_VERSION_RELEASE)

# Bounds for the individual compression parameters, re-exported from zstd.
BLOCKSIZELOG_MAX = lib.ZSTD_BLOCKSIZELOG_MAX
BLOCKSIZE_MAX = lib.ZSTD_BLOCKSIZE_MAX
WINDOWLOG_MIN = lib.ZSTD_WINDOWLOG_MIN
WINDOWLOG_MAX = lib.ZSTD_WINDOWLOG_MAX
CHAINLOG_MIN = lib.ZSTD_CHAINLOG_MIN
CHAINLOG_MAX = lib.ZSTD_CHAINLOG_MAX
HASHLOG_MIN = lib.ZSTD_HASHLOG_MIN
HASHLOG_MAX = lib.ZSTD_HASHLOG_MAX
HASHLOG3_MAX = lib.ZSTD_HASHLOG3_MAX
SEARCHLOG_MIN = lib.ZSTD_SEARCHLOG_MIN
SEARCHLOG_MAX = lib.ZSTD_SEARCHLOG_MAX
SEARCHLENGTH_MIN = lib.ZSTD_SEARCHLENGTH_MIN
SEARCHLENGTH_MAX = lib.ZSTD_SEARCHLENGTH_MAX
TARGETLENGTH_MIN = lib.ZSTD_TARGETLENGTH_MIN
TARGETLENGTH_MAX = lib.ZSTD_TARGETLENGTH_MAX
LDM_MINMATCH_MIN = lib.ZSTD_LDM_MINMATCH_MIN
LDM_MINMATCH_MAX = lib.ZSTD_LDM_MINMATCH_MAX
LDM_BUCKETSIZELOG_MAX = lib.ZSTD_LDM_BUCKETSIZELOG_MAX

# Compression strategies (values of the ``compression_strategy`` parameter).
STRATEGY_FAST = lib.ZSTD_fast
STRATEGY_DFAST = lib.ZSTD_dfast
STRATEGY_GREEDY = lib.ZSTD_greedy
STRATEGY_LAZY = lib.ZSTD_lazy
STRATEGY_LAZY2 = lib.ZSTD_lazy2
STRATEGY_BTLAZY2 = lib.ZSTD_btlazy2
STRATEGY_BTOPT = lib.ZSTD_btopt
STRATEGY_BTULTRA = lib.ZSTD_btultra

# Dictionary content types, re-exported from zstd's ZSTD_dct_* values.
DICT_TYPE_AUTO = lib.ZSTD_dct_auto
DICT_TYPE_RAWCONTENT = lib.ZSTD_dct_rawContent
DICT_TYPE_FULLDICT = lib.ZSTD_dct_fullDict

# Frame formats (values of the ``format`` compression parameter).
FORMAT_ZSTD1 = lib.ZSTD_f_zstd1
FORMAT_ZSTD1_MAGICLESS = lib.ZSTD_f_zstd1_magicless

# Modes accepted by ZstdCompressionObj.flush(): FINISH ends the frame,
# BLOCK flushes buffered data but keeps the frame open.
COMPRESSOBJ_FLUSH_FINISH = 0
COMPRESSOBJ_FLUSH_BLOCK = 1


def _cpu_count():
    # os.cpu_count() was introducd in Python 3.4.
    try:
        return os.cpu_count() or 0
    except AttributeError:
        pass

    # Linux.
    try:
        if sys.version_info[0] == 2:
            return os.sysconf(b'SC_NPROCESSORS_ONLN')
        else:
            return os.sysconf(u'SC_NPROCESSORS_ONLN')
    except (AttributeError, ValueError):
        pass

    # TODO implement on other platforms.
    return 0


class ZstdError(Exception):
    """Raised when zstd reports an error or an object is used incorrectly."""


def _zstd_error(zresult):
    """Return zstd's name for the error code ``zresult`` as a text string.

    ``ZSTD_getErrorName`` yields a C string (bytes on Python 2 and 3); it is
    decoded so it can be interpolated into this module's unicode messages.
    """
    name_ptr = lib.ZSTD_getErrorName(zresult)
    raw_name = ffi.string(name_ptr)
    return raw_name.decode('utf-8')

def _make_cctx_params(params):
    """Build a garbage-collected ``ZSTD_CCtx_params`` from ``params``.

    ``params`` is a ZstdCompressionParameters-like object; each of its
    settings is applied through _set_compression_parameter().

    Raises MemoryError if the native allocation fails and ZstdError if zstd
    rejects any individual setting.
    """
    raw_params = lib.ZSTD_createCCtxParams()
    if raw_params == ffi.NULL:
        raise MemoryError()

    # Attach the destructor before setting anything so the native object is
    # released even if one of the settings below raises.
    cctx_params = ffi.gc(raw_params, lib.ZSTD_freeCCtxParams)

    # Settings are applied in this exact sequence. NOTE(review): some zstd
    # parameters may interact (e.g. compression level vs. explicit logs), so
    # keep the order stable unless that has been checked.
    for zstd_param, value in (
            (lib.ZSTD_p_format, params.format),
            (lib.ZSTD_p_compressionLevel, params.compression_level),
            (lib.ZSTD_p_windowLog, params.window_log),
            (lib.ZSTD_p_hashLog, params.hash_log),
            (lib.ZSTD_p_chainLog, params.chain_log),
            (lib.ZSTD_p_searchLog, params.search_log),
            (lib.ZSTD_p_minMatch, params.min_match),
            (lib.ZSTD_p_targetLength, params.target_length),
            (lib.ZSTD_p_compressionStrategy, params.compression_strategy),
            (lib.ZSTD_p_contentSizeFlag, params.write_content_size),
            (lib.ZSTD_p_checksumFlag, params.write_checksum),
            (lib.ZSTD_p_dictIDFlag, params.write_dict_id),
            (lib.ZSTD_p_nbWorkers, params.threads),
            (lib.ZSTD_p_jobSize, params.job_size),
            (lib.ZSTD_p_overlapSizeLog, params.overlap_size_log),
            (lib.ZSTD_p_forceMaxWindow, params.force_max_window),
            (lib.ZSTD_p_enableLongDistanceMatching, params.enable_ldm),
            (lib.ZSTD_p_ldmHashLog, params.ldm_hash_log),
            (lib.ZSTD_p_ldmMinMatch, params.ldm_min_match),
            (lib.ZSTD_p_ldmBucketSizeLog, params.ldm_bucket_size_log),
            (lib.ZSTD_p_ldmHashEveryLog, params.ldm_hash_every_log),
    ):
        _set_compression_parameter(cctx_params, zstd_param, value)

    return cctx_params

class ZstdCompressionParameters(object):
    """Explicit zstd compression settings.

    Every constructor argument is stored on the instance under the same
    name, and ``self.params`` holds the native ``ZSTD_CCtx_params`` built
    from those attributes by _make_cctx_params().
    """

    @staticmethod
    def from_level(level, source_size=0, dict_size=0, **kwargs):
        """Build parameters seeded from zstd's defaults for ``level``.

        Window/chain/hash/search logs, min match, target length and strategy
        come from ``ZSTD_getCParams(level, source_size, dict_size)`` unless
        supplied explicitly in ``kwargs``. All ``kwargs`` are forwarded to
        the constructor.
        """
        zstd_defaults = lib.ZSTD_getCParams(level, source_size, dict_size)

        # (constructor argument, ZSTD_compressionParameters field) pairs.
        struct_fields = (
            ('window_log', 'windowLog'),
            ('chain_log', 'chainLog'),
            ('hash_log', 'hashLog'),
            ('search_log', 'searchLog'),
            ('min_match', 'searchLength'),
            ('target_length', 'targetLength'),
            ('compression_strategy', 'strategy'),
        )

        for arg_name, field_name in struct_fields:
            if arg_name not in kwargs:
                kwargs[arg_name] = getattr(zstd_defaults, field_name)

        return ZstdCompressionParameters(**kwargs)

    def __init__(self, format=0, compression_level=0, window_log=0, hash_log=0,
                 chain_log=0, search_log=0, min_match=0, target_length=0,
                 compression_strategy=0, write_content_size=1, write_checksum=0,
                 write_dict_id=0, job_size=0, overlap_size_log=0,
                 force_max_window=0, enable_ldm=0, ldm_hash_log=0,
                 ldm_min_match=0, ldm_bucket_size_log=0, ldm_hash_every_log=0,
                 threads=0):
        # A negative thread count asks for one worker per detected CPU.
        self.threads = _cpu_count() if threads < 0 else threads

        # Core compression settings.
        self.format = format
        self.compression_level = compression_level
        self.window_log = window_log
        self.hash_log = hash_log
        self.chain_log = chain_log
        self.search_log = search_log
        self.min_match = min_match
        self.target_length = target_length
        self.compression_strategy = compression_strategy

        # Frame header / trailer flags.
        self.write_content_size = write_content_size
        self.write_checksum = write_checksum
        self.write_dict_id = write_dict_id

        # Multi-threaded job sizing.
        self.job_size = job_size
        self.overlap_size_log = overlap_size_log
        self.force_max_window = force_max_window

        # Long distance matching.
        self.enable_ldm = enable_ldm
        self.ldm_hash_log = ldm_hash_log
        self.ldm_min_match = ldm_min_match
        self.ldm_bucket_size_log = ldm_bucket_size_log
        self.ldm_hash_every_log = ldm_hash_every_log

        # Must come last: reads every attribute assigned above.
        self.params = _make_cctx_params(self)

    def estimated_compression_context_size(self):
        """Return zstd's estimated size of a compression context for these params."""
        return lib.ZSTD_estimateCCtxSize_usingCCtxParams(self.params)

# Alternate name for ZstdCompressionParameters; both names are in __all__.
CompressionParameters = ZstdCompressionParameters

def estimate_decompression_context_size():
    """Return zstd's estimated size of a decompression context."""
    estimate = lib.ZSTD_estimateDCtxSize()
    return estimate


def _set_compression_parameter(params, param, value):
    """Set one ``ZSTD_p_*`` setting on a ``ZSTD_CCtx_params`` object.

    ``value`` is cast to a C ``unsigned`` before being handed to zstd.
    Raises ZstdError if zstd rejects the setting or its value.
    """
    unsigned_value = ffi.cast('unsigned', value)
    zresult = lib.ZSTD_CCtxParam_setParameter(params, param, unsigned_value)
    if not lib.ZSTD_isError(zresult):
        return

    raise ZstdError('unable to set compression context parameter: %s' %
                    _zstd_error(zresult))

class ZstdCompressionWriter(object):
    """Streaming compressor that sends compressed output to ``writer``.

    Must be used as a context manager: ``write()``, ``flush()`` and
    ``memory_size()`` raise ZstdError unless the context is active. Leaving
    the context without an exception ends the zstd frame and writes the
    remaining output. ``tell()`` reports the total number of compressed bytes
    handed to ``writer`` so far, including the frame epilogue written on exit.
    """

    def __init__(self, compressor, writer, source_size, write_size):
        self._compressor = compressor
        self._writer = writer
        self._source_size = source_size
        self._write_size = write_size
        self._entered = False
        self._bytes_compressed = 0

    def __enter__(self):
        if self._entered:
            raise ZstdError('cannot __enter__ multiple times')

        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._compressor._cctx,
                                                  self._source_size)
        if lib.ZSTD_isError(zresult):
            raise ZstdError('error setting source size: %s' %
                            _zstd_error(zresult))

        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._entered = False

        # Only end the frame on a clean exit; on error the partial frame is
        # left as-is.
        if not exc_type and not exc_value and not exc_tb:
            dst_buffer = ffi.new('char[]', self._write_size)

            out_buffer = ffi.new('ZSTD_outBuffer *')
            in_buffer = ffi.new('ZSTD_inBuffer *')

            out_buffer.dst = dst_buffer
            out_buffer.size = len(dst_buffer)
            out_buffer.pos = 0

            in_buffer.src = ffi.NULL
            in_buffer.size = 0
            in_buffer.pos = 0

            while True:
                zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                    out_buffer, in_buffer,
                                                    lib.ZSTD_e_end)

                if lib.ZSTD_isError(zresult):
                    raise ZstdError('error ending compression stream: %s' %
                                    _zstd_error(zresult))

                # Count these bytes too so tell() stays accurate after exit.
                self._write_output(out_buffer)

                # 0 means the frame has been completely flushed.
                if zresult == 0:
                    break

        self._compressor = None

        return False

    def _write_output(self, out_buffer):
        """Send pending bytes in ``out_buffer`` to the writer.

        Updates the ``tell()`` counter, rewinds ``out_buffer.pos`` and
        returns the number of bytes written (0 if nothing was pending).
        """
        written = out_buffer.pos
        if written:
            self._writer.write(ffi.buffer(out_buffer.dst, written)[:])
            self._bytes_compressed += written
            out_buffer.pos = 0
        return written

    def memory_size(self):
        """Return the size of the compressor's CCtx via ``ZSTD_sizeof_CCtx``."""
        if not self._entered:
            raise ZstdError('cannot determine size of an inactive compressor; '
                            'call when a context manager is active')

        return lib.ZSTD_sizeof_CCtx(self._compressor._cctx)

    def write(self, data):
        """Compress ``data`` and write any produced output to the writer.

        Returns the number of compressed bytes written by this call, which may
        be 0 if zstd produced no output yet. Raises ZstdError outside an
        active context or if zstd reports an error.
        """
        if not self._entered:
            raise ZstdError('write() must be called from an active context '
                            'manager')

        total_write = 0

        data_buffer = ffi.from_buffer(data)

        in_buffer = ffi.new('ZSTD_inBuffer *')
        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        out_buffer = ffi.new('ZSTD_outBuffer *')
        dst_buffer = ffi.new('char[]', self._write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = self._write_size
        out_buffer.pos = 0

        while in_buffer.pos < in_buffer.size:
            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                out_buffer, in_buffer,
                                                lib.ZSTD_e_continue)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                _zstd_error(zresult))

            total_write += self._write_output(out_buffer)

        return total_write

    def flush(self):
        """Flush buffered data to the writer without ending the frame.

        Returns the number of compressed bytes written by this call. Raises
        ZstdError outside an active context or if zstd reports an error.
        """
        if not self._entered:
            raise ZstdError('flush must be called from an active context manager')

        total_write = 0

        out_buffer = ffi.new('ZSTD_outBuffer *')
        dst_buffer = ffi.new('char[]', self._write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = self._write_size
        out_buffer.pos = 0

        in_buffer = ffi.new('ZSTD_inBuffer *')
        in_buffer.src = ffi.NULL
        in_buffer.size = 0
        in_buffer.pos = 0

        while True:
            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                out_buffer, in_buffer,
                                                lib.ZSTD_e_flush)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                _zstd_error(zresult))

            total_write += self._write_output(out_buffer)

            # 0 means everything buffered so far has been flushed.
            if not zresult:
                break

        return total_write

    def tell(self):
        """Return the total number of compressed bytes written so far."""
        return self._bytes_compressed


class ZstdCompressionObj(object):
    """Incremental compressor: feed data with compress(), then flush().

    NOTE(review): this class defines no ``__init__``; whoever creates it
    (presumably ZstdCompressor.compressobj(), not visible here) must set
    ``_compressor``, ``_out`` (a ``ZSTD_outBuffer``) and ``_finished``.
    """

    def compress(self, data):
        """Feed ``data`` to the compressor and return any output produced.

        The result may be empty if zstd has not emitted output yet. Raises
        ZstdError after the object has been finished or if zstd reports an
        error.
        """
        if self._finished:
            raise ZstdError('cannot call compress() after compressor finished')

        data_buffer = ffi.from_buffer(data)
        source = ffi.new('ZSTD_inBuffer *')
        source.src = data_buffer
        source.size = len(data_buffer)
        source.pos = 0

        chunks = []

        # Compare against the input's *byte* length. ``len(data)`` counts
        # items, which for multi-byte buffers (e.g. array('I')) is smaller
        # than the byte size and would end the loop with input unconsumed.
        while source.pos < source.size:
            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                self._out,
                                                source,
                                                lib.ZSTD_e_continue)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                _zstd_error(zresult))

            if self._out.pos:
                chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
                self._out.pos = 0

        return b''.join(chunks)

    def flush(self, flush_mode=COMPRESSOBJ_FLUSH_FINISH):
        """Emit buffered data and return it.

        ``COMPRESSOBJ_FLUSH_BLOCK`` flushes without ending the frame;
        ``COMPRESSOBJ_FLUSH_FINISH`` (the default) ends the frame and marks
        the object finished, after which compress() and flush() raise.
        Raises ValueError for an unknown mode and ZstdError on zstd errors.
        """
        if flush_mode not in (COMPRESSOBJ_FLUSH_FINISH, COMPRESSOBJ_FLUSH_BLOCK):
            raise ValueError('flush mode not recognized')

        if self._finished:
            raise ZstdError('compressor object already finished')

        # flush_mode was validated above, so only two cases remain.
        if flush_mode == COMPRESSOBJ_FLUSH_BLOCK:
            z_flush_mode = lib.ZSTD_e_flush
        else:
            z_flush_mode = lib.ZSTD_e_end
            self._finished = True

        # compress() always drains the output buffer before returning.
        assert self._out.pos == 0

        in_buffer = ffi.new('ZSTD_inBuffer *')
        in_buffer.src = ffi.NULL
        in_buffer.size = 0
        in_buffer.pos = 0

        chunks = []

        while True:
            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                self._out,
                                                in_buffer,
                                                z_flush_mode)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('error ending compression stream: %s' %
                                _zstd_error(zresult))

            if self._out.pos:
                chunks.append(ffi.buffer(self._out.dst, self._out.pos)[:])
                self._out.pos = 0

            # 0 means nothing is left to flush for this mode.
            if not zresult:
                break

        return b''.join(chunks)


class ZstdCompressionChunker(object):
    """Compress data into chunks of ``chunk_size`` bytes.

    compress(), flush() and finish() are generators. compress() only yields
    completely filled chunks; flush() and finish() also yield any partially
    filled chunk. Because they are generators, their precondition checks run
    when iteration starts, not when the method is called.
    """

    def __init__(self, compressor, chunk_size):
        self._compressor = compressor
        # Output struct plus a separate reference to its backing array, which
        # keeps that memory alive while the struct points at it.
        self._out = ffi.new('ZSTD_outBuffer *')
        self._dst_buffer = ffi.new('char[]', chunk_size)
        self._out.dst = self._dst_buffer
        self._out.size = chunk_size
        self._out.pos = 0

        # A non-NULL ``_in.src`` means a previous compress() generator has not
        # consumed all of its input yet; other operations refuse to run then.
        self._in = ffi.new('ZSTD_inBuffer *')
        self._in.src = ffi.NULL
        self._in.size = 0
        self._in.pos = 0
        self._finished = False

    def compress(self, data):
        """Feed ``data`` and yield each output chunk that becomes full.

        Output that does not fill a chunk stays buffered for later calls.
        Raises ZstdError if already finished, if a previous compress()
        generator was not exhausted, or if zstd reports an error.
        """
        if self._finished:
            raise ZstdError('cannot call compress() after compression finished')

        if self._in.src != ffi.NULL:
            raise ZstdError('cannot perform operation before consuming output '
                            'from previous operation')

        data_buffer = ffi.from_buffer(data)

        # Empty input: nothing to yield.
        if not len(data_buffer):
            return

        self._in.src = data_buffer
        self._in.size = len(data_buffer)
        self._in.pos = 0

        while self._in.pos < self._in.size:
            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                self._out,
                                                self._in,
                                                lib.ZSTD_e_continue)

            # Clear the input as soon as it is fully consumed, so later
            # operations are allowed even if the error check below raises.
            if self._in.pos == self._in.size:
                self._in.src = ffi.NULL
                self._in.size = 0
                self._in.pos = 0

            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                _zstd_error(zresult))

            # Only completely filled chunks are emitted here.
            if self._out.pos == self._out.size:
                yield ffi.buffer(self._out.dst, self._out.pos)[:]
                self._out.pos = 0

    def flush(self):
        """Flush buffered data without ending the frame, yielding chunks.

        The last yielded chunk may be shorter than ``chunk_size``.
        """
        if self._finished:
            raise ZstdError('cannot call flush() after compression finished')

        if self._in.src != ffi.NULL:
            raise ZstdError('cannot call flush() before consuming output from '
                            'previous operation')

        while True:
            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                self._out, self._in,
                                                lib.ZSTD_e_flush)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' % _zstd_error(zresult))

            if self._out.pos:
                yield ffi.buffer(self._out.dst, self._out.pos)[:]
                self._out.pos = 0

            # 0 means everything buffered so far has been flushed.
            if not zresult:
                return

    def finish(self):
        """End the frame, yielding the remaining output as chunks.

        The last yielded chunk may be shorter than ``chunk_size``. The
        chunker is marked finished only once the frame is fully flushed.
        """
        if self._finished:
            raise ZstdError('cannot call finish() after compression finished')

        if self._in.src != ffi.NULL:
            raise ZstdError('cannot call finish() before consuming output from '
                            'previous operation')

        while True:
            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                self._out, self._in,
                                                lib.ZSTD_e_end)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' % _zstd_error(zresult))

            if self._out.pos:
                yield ffi.buffer(self._out.dst, self._out.pos)[:]
                self._out.pos = 0

            # 0 means the frame has been completely written.
            if not zresult:
                self._finished = True
                return


class CompressionReader(object):
    """Read-only, non-seekable stream of data compressed from ``source``.

    ``source`` is either an object with a ``read(size)`` method, polled for
    ``read_size`` bytes at a time, or an object supporting the buffer
    protocol, which is fed to the compressor as a single chunk.
    """

    def __init__(self, compressor, source, read_size):
        self._compressor = compressor
        self._source = source
        self._read_size = read_size
        self._entered = False
        self._closed = False
        self._bytes_compressed = 0
        self._finished_input = False
        self._finished_output = False

        self._in_buffer = ffi.new('ZSTD_inBuffer *')
        # Holds a ref so backing bytes in self._in_buffer stay alive.
        self._source_buffer = None

    def __enter__(self):
        if self._entered:
            raise ValueError('cannot __enter__ multiple times')

        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        # Exiting closes the stream and drops the source/compressor refs.
        self._entered = False
        self._closed = True
        self._source = None
        self._compressor = None

        return False

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return False

    def readline(self):
        raise io.UnsupportedOperation()

    def readlines(self):
        raise io.UnsupportedOperation()

    def write(self, data):
        raise OSError('stream is not writable')

    def writelines(self, ignored):
        raise OSError('stream is not writable')

    def isatty(self):
        return False

    def flush(self):
        return None

    def close(self):
        self._closed = True
        return None

    @property
    def closed(self):
        return self._closed

    def tell(self):
        """Return the number of compressed bytes produced so far."""
        return self._bytes_compressed

    def readall(self):
        raise NotImplementedError()

    def __iter__(self):
        raise io.UnsupportedOperation()

    def __next__(self):
        raise io.UnsupportedOperation()

    next = __next__

    def read(self, size=-1):
        """Return up to ``size`` bytes of compressed data.

        Returns ``b''`` once the frame has been fully emitted. Otherwise
        ``size`` must be positive (the default of -1 raises ValueError).
        Raises ValueError if the stream is closed and ZstdError if zstd
        reports an error.
        """
        if self._closed:
            raise ValueError('stream is closed')

        if self._finished_output:
            return b''

        if size < 1:
            raise ValueError('cannot read negative or size 0 amounts')

        # Need a dedicated ref to dest buffer otherwise it gets collected.
        dst_buffer = ffi.new('char[]', size)
        out_buffer = ffi.new('ZSTD_outBuffer *')
        out_buffer.dst = dst_buffer
        out_buffer.size = size
        out_buffer.pos = 0

        def compress_input():
            # Compress pending input; return the output if it filled up.
            if self._in_buffer.pos >= self._in_buffer.size:
                return

            old_pos = out_buffer.pos

            zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                                out_buffer, self._in_buffer,
                                                lib.ZSTD_e_continue)

            self._bytes_compressed += out_buffer.pos - old_pos

            if self._in_buffer.pos == self._in_buffer.size:
                self._in_buffer.src = ffi.NULL
                self._in_buffer.pos = 0
                self._in_buffer.size = 0
                self._source_buffer = None

                # A buffer source is a single chunk: once consumed, done.
                if not hasattr(self._source, 'read'):
                    self._finished_input = True

            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd compress error: %s' %
                                _zstd_error(zresult))

            if out_buffer.pos and out_buffer.pos == out_buffer.size:
                return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

        def get_input():
            # Load the next chunk of input into self._in_buffer.
            if self._finished_input:
                return

            if hasattr(self._source, 'read'):
                data = self._source.read(self._read_size)

                if not data:
                    self._finished_input = True
                    return

                source_buffer = ffi.from_buffer(data)
            else:
                source_buffer = ffi.from_buffer(self._source)

                # compress_input() never touches an empty buffer, so it would
                # never mark the input finished; do it here to avoid looping
                # forever on an empty source.
                if not len(source_buffer):
                    self._finished_input = True
                    return

            self._source_buffer = source_buffer
            self._in_buffer.src = source_buffer
            self._in_buffer.size = len(source_buffer)
            self._in_buffer.pos = 0

        # Finish any input left over from a previous read() call.
        result = compress_input()
        if result:
            return result

        while not self._finished_input:
            # Only load new input once the current chunk is fully consumed.
            # zstd may return without consuming all input (NOTE(review):
            # notably with worker threads); reloading then would repeat a
            # buffer source or drop unread stream data.
            if self._in_buffer.pos >= self._in_buffer.size:
                get_input()
            result = compress_input()
            if result:
                return result

        # EOF: end the frame, emitting as much as fits in out_buffer.
        old_pos = out_buffer.pos

        zresult = lib.ZSTD_compress_generic(self._compressor._cctx,
                                            out_buffer, self._in_buffer,
                                            lib.ZSTD_e_end)

        self._bytes_compressed += out_buffer.pos - old_pos

        if lib.ZSTD_isError(zresult):
            raise ZstdError('error ending compression stream: %s' %
                            _zstd_error(zresult))

        # Non-zero means more of the frame remains; the next read() call
        # continues flushing it.
        if zresult == 0:
            self._finished_output = True

        return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

class ZstdCompressor(object):
    """Compress data with zstd using one reusable ``ZSTD_CCtx``.

    The compression context and its parameters are created once in
    ``__init__``. Every compression entry point (``compress``,
    ``compressobj``, ``chunker``, ``copy_stream``, ``stream_reader``,
    ``stream_writer``, ``read_to_iter``) calls ``ZSTD_CCtx_reset()`` before
    starting. An instance can therefore be reused for successive operations.
    NOTE(review): they all share ``self._cctx``, so overlapping operations on
    one instance presumably interfere -- confirm.
    """
    def __init__(self, level=3, dict_data=None, compression_params=None,
                 write_checksum=None, write_content_size=None,
                 write_dict_id=None, threads=0):
        """Validate arguments, build the parameter set and create the context.

        :param level: compression level. Raises ``ValueError`` when greater
            than ``ZSTD_maxCLevel()``. Only applied when
            ``compression_params`` is not given.
        :param dict_data: optional compression dictionary. It is loaded into
            the context by ``_setup_cctx()``.
        :param compression_params: explicit parameters, converted with
            ``_make_cctx_params()``. Cannot be combined with
            ``write_checksum``, ``write_content_size``, ``write_dict_id`` or a
            non-zero ``threads`` (raises ``ValueError``).
        :param write_checksum: write a content checksum; ``None`` means no.
        :param write_content_size: record the content size in the frame
            header; ``None`` means yes.
        :param write_dict_id: record the dictionary ID; ``None`` means yes.
        :param threads: value for ``ZSTD_p_nbWorkers``. Negative means
            ``_cpu_count()``; 0 leaves the parameter unset.
        :raises MemoryError: if the parameter object or context cannot be
            allocated.
        :raises ZstdError: if applying parameters or the dictionary fails.
        """
        if level > lib.ZSTD_maxCLevel():
            raise ValueError('level must be less than %d' % lib.ZSTD_maxCLevel())

        if threads < 0:
            threads = _cpu_count()

        if compression_params and write_checksum is not None:
            raise ValueError('cannot define compression_params and '
                             'write_checksum')

        if compression_params and write_content_size is not None:
            raise ValueError('cannot define compression_params and '
                             'write_content_size')

        if compression_params and write_dict_id is not None:
            raise ValueError('cannot define compression_params and '
                             'write_dict_id')

        if compression_params and threads:
            raise ValueError('cannot define compression_params and threads')

        if compression_params:
            self._params = _make_cctx_params(compression_params)
        else:
            if write_dict_id is None:
                write_dict_id = True

            params = lib.ZSTD_createCCtxParams()
            if params == ffi.NULL:
                raise MemoryError()

            # Tie the parameter object's lifetime to the Python wrapper.
            self._params = ffi.gc(params, lib.ZSTD_freeCCtxParams)

            _set_compression_parameter(self._params,
                                       lib.ZSTD_p_compressionLevel,
                                       level)

            _set_compression_parameter(
                self._params,
                lib.ZSTD_p_contentSizeFlag,
                write_content_size if write_content_size is not None else 1)

            _set_compression_parameter(self._params,
                                       lib.ZSTD_p_checksumFlag,
                                       1 if write_checksum else 0)

            _set_compression_parameter(self._params,
                                       lib.ZSTD_p_dictIDFlag,
                                       1 if write_dict_id else 0)

            if threads:
                _set_compression_parameter(self._params,
                                           lib.ZSTD_p_nbWorkers,
                                           threads)

        cctx = lib.ZSTD_createCCtx()
        if cctx == ffi.NULL:
            raise MemoryError()

        self._cctx = cctx
        self._dict_data = dict_data

        # We defer setting up garbage collection until after calling
        # _setup_cctx() to ensure the memory size estimate is more accurate.
        # The ``finally`` also ensures the context is registered for freeing
        # even when _setup_cctx() raises.
        try:
            self._setup_cctx()
        finally:
            self._cctx = ffi.gc(cctx, lib.ZSTD_freeCCtx,
                                size=lib.ZSTD_sizeof_CCtx(cctx))

    def _setup_cctx(self):
        """Apply ``self._params`` and the optional dictionary to the context.

        A precomputed ``_cdict`` is referenced directly. Otherwise the raw
        dictionary bytes are loaded by reference (``ZSTD_dlm_byRef``), so
        ``self._dict_data`` must stay alive while the context uses them.

        :raises ZstdError: if zstd rejects the parameters or the dictionary.
        """
        zresult = lib.ZSTD_CCtx_setParametersUsingCCtxParams(self._cctx,
                                                             self._params)
        if lib.ZSTD_isError(zresult):
            raise ZstdError('could not set compression parameters: %s' %
                            _zstd_error(zresult))

        dict_data = self._dict_data

        if dict_data:
            if dict_data._cdict:
                zresult = lib.ZSTD_CCtx_refCDict(self._cctx, dict_data._cdict)
            else:
                zresult = lib.ZSTD_CCtx_loadDictionary_advanced(
                    self._cctx, dict_data.as_bytes(), len(dict_data),
                    lib.ZSTD_dlm_byRef, dict_data._dict_type)

            if lib.ZSTD_isError(zresult):
                raise ZstdError('could not load compression dictionary: %s' %
                                _zstd_error(zresult))

    def memory_size(self):
        """Return the size of the compression context as reported by zstd."""
        return lib.ZSTD_sizeof_CCtx(self._cctx)

    def compress(self, data):
        """Compress ``data`` in a single call and return one complete frame.

        ``data`` may be anything accepted by ``ffi.from_buffer()``. Its length
        is pledged as the source size. The output buffer is sized with
        ``ZSTD_compressBound()``, so a single ``ZSTD_e_end`` call is expected
        to flush the whole frame.

        :raises ZstdError: on a zstd error, or if the frame was not fully
            flushed.
        """
        lib.ZSTD_CCtx_reset(self._cctx)

        data_buffer = ffi.from_buffer(data)

        dest_size = lib.ZSTD_compressBound(len(data_buffer))
        # Only the first out_buffer.pos bytes of ``out`` are returned below.
        out = new_nonzero('char[]', dest_size)

        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, len(data_buffer))
        if lib.ZSTD_isError(zresult):
            raise ZstdError('error setting source size: %s' %
                            _zstd_error(zresult))

        out_buffer = ffi.new('ZSTD_outBuffer *')
        in_buffer = ffi.new('ZSTD_inBuffer *')

        out_buffer.dst = out
        out_buffer.size = dest_size
        out_buffer.pos = 0

        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        zresult = lib.ZSTD_compress_generic(self._cctx,
                                            out_buffer,
                                            in_buffer,
                                            lib.ZSTD_e_end)

        if lib.ZSTD_isError(zresult):
            raise ZstdError('cannot compress: %s' %
                            _zstd_error(zresult))
        elif zresult:
            # A non-zero return means zstd still has data to flush.
            raise ZstdError('unexpected partial frame flush')

        return ffi.buffer(out, out_buffer.pos)[:]

    def compressobj(self, size=-1):
        """Return a ``ZstdCompressionObj`` for incremental compression.

        :param size: total input size if known. A negative value pledges
            ``ZSTD_CONTENTSIZE_UNKNOWN``.
        :raises ZstdError: if the pledged size cannot be set.
        """
        lib.ZSTD_CCtx_reset(self._cctx)

        if size < 0:
            size = lib.ZSTD_CONTENTSIZE_UNKNOWN

        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
        if lib.ZSTD_isError(zresult):
            raise ZstdError('error setting source size: %s' %
                            _zstd_error(zresult))

        cobj = ZstdCompressionObj()
        cobj._out = ffi.new('ZSTD_outBuffer *')
        # Keep a reference to the output memory on the object itself so it
        # lives as long as ``cobj._out`` points at it.
        cobj._dst_buffer = ffi.new('char[]', COMPRESSION_RECOMMENDED_OUTPUT_SIZE)
        cobj._out.dst = cobj._dst_buffer
        cobj._out.size = COMPRESSION_RECOMMENDED_OUTPUT_SIZE
        cobj._out.pos = 0
        cobj._compressor = self
        cobj._finished = False

        return cobj

    def chunker(self, size=-1, chunk_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        """Return a ``ZstdCompressionChunker`` bound to this compressor.

        :param size: total input size if known. A negative value pledges
            ``ZSTD_CONTENTSIZE_UNKNOWN``.
        :param chunk_size: passed through to the chunker.
        :raises ZstdError: if the pledged size cannot be set.
        """
        lib.ZSTD_CCtx_reset(self._cctx)

        if size < 0:
            size = lib.ZSTD_CONTENTSIZE_UNKNOWN

        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
        if lib.ZSTD_isError(zresult):
            raise ZstdError('error setting source size: %s' %
                            _zstd_error(zresult))

        return ZstdCompressionChunker(self, chunk_size=chunk_size)

    def copy_stream(self, ifh, ofh, size=-1,
                    read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
                    write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        """Compress everything read from ``ifh`` and write it to ``ofh``.

        Input is read ``read_size`` bytes at a time until ``read()`` returns
        an empty value. The frame is then ended with ``ZSTD_e_end``.

        :param size: total input size if known. A negative value pledges
            ``ZSTD_CONTENTSIZE_UNKNOWN``.
        :returns: ``(bytes_read, bytes_written)``.
        :raises ValueError: if ``ifh`` has no ``read()`` or ``ofh`` has no
            ``write()``.
        :raises ZstdError: on any zstd error.
        """

        if not hasattr(ifh, 'read'):
            raise ValueError('first argument must have a read() method')
        if not hasattr(ofh, 'write'):
            raise ValueError('second argument must have a write() method')

        lib.ZSTD_CCtx_reset(self._cctx)

        if size < 0:
            size = lib.ZSTD_CONTENTSIZE_UNKNOWN

        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
        if lib.ZSTD_isError(zresult):
            raise ZstdError('error setting source size: %s' %
                            _zstd_error(zresult))

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        dst_buffer = ffi.new('char[]', write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = write_size
        out_buffer.pos = 0

        total_read, total_write = 0, 0

        while True:
            data = ifh.read(read_size)
            if not data:
                break

            data_buffer = ffi.from_buffer(data)
            total_read += len(data_buffer)
            in_buffer.src = data_buffer
            in_buffer.size = len(data_buffer)
            in_buffer.pos = 0

            # Drive the compressor until it has consumed this whole chunk.
            while in_buffer.pos < in_buffer.size:
                zresult = lib.ZSTD_compress_generic(self._cctx,
                                                    out_buffer,
                                                    in_buffer,
                                                    lib.ZSTD_e_continue)
                if lib.ZSTD_isError(zresult):
                    raise ZstdError('zstd compress error: %s' %
                                    _zstd_error(zresult))

                if out_buffer.pos:
                    # NOTE(review): this buffer aliases dst_buffer, which is
                    # reused on the next iteration; assumes ofh.write()
                    # consumes or copies it immediately.
                    ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
                    total_write += out_buffer.pos
                    out_buffer.pos = 0

        # We've finished reading. Flush the compressor.
        # in_buffer is either fully consumed here or (if nothing was ever
        # read) still in the state ffi.new() left it, which is presumably
        # empty/zeroed.
        while True:
            zresult = lib.ZSTD_compress_generic(self._cctx,
                                                out_buffer,
                                                in_buffer,
                                                lib.ZSTD_e_end)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('error ending compression stream: %s' %
                                _zstd_error(zresult))

            if out_buffer.pos:
                ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos))
                total_write += out_buffer.pos
                out_buffer.pos = 0

            # zresult == 0 means the frame has been completely flushed.
            if zresult == 0:
                break

        return total_read, total_write

    def stream_reader(self, source, size=-1,
                      read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE):
        """Return a ``CompressionReader`` that compresses ``source`` on read.

        If ``len(source)`` succeeds, it replaces ``size`` as the pledged
        source size. Any exception from ``len()`` is ignored.

        :param size: total input size if known. A negative value pledges
            ``ZSTD_CONTENTSIZE_UNKNOWN``.
        :raises ZstdError: if the pledged size cannot be set.
        """
        lib.ZSTD_CCtx_reset(self._cctx)

        try:
            size = len(source)
        except Exception:
            pass

        if size < 0:
            size = lib.ZSTD_CONTENTSIZE_UNKNOWN

        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
        if lib.ZSTD_isError(zresult):
            raise ZstdError('error setting source size: %s' %
                            _zstd_error(zresult))

        return CompressionReader(self, source, read_size)

    def stream_writer(self, writer, size=-1,
                 write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        """Return a ``ZstdCompressionWriter`` that writes to ``writer``.

        Unlike the other entry points, the pledged source size is not set
        here. ``size`` (negative mapped to ``ZSTD_CONTENTSIZE_UNKNOWN``) is
        handed to the writer. NOTE(review): presumably applied there --
        confirm.

        :raises ValueError: if ``writer`` has no ``write()`` method.
        """

        if not hasattr(writer, 'write'):
            raise ValueError('must pass an object with a write() method')

        lib.ZSTD_CCtx_reset(self._cctx)

        if size < 0:
            size = lib.ZSTD_CONTENTSIZE_UNKNOWN

        return ZstdCompressionWriter(self, writer, size, write_size)

    # Alternate (older) name for stream_writer().
    write_to = stream_writer

    def read_to_iter(self, reader, size=-1,
                     read_size=COMPRESSION_RECOMMENDED_INPUT_SIZE,
                     write_size=COMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        """Generate compressed chunks from ``reader``.

        ``reader`` is either an object with ``read()`` or a sliceable object
        with ``__getitem__``/``len()``. In the latter case ``size`` is
        replaced by ``len(reader)``. Each yielded value is ``bytes`` of at most
        ``write_size`` bytes.

        Because this method contains ``yield`` it is a generator. Argument
        validation, the context reset and the size pledge happen on the first
        ``next()``, not when the method is called.

        :raises ValueError: if ``reader`` supports neither access style.
        :raises ZstdError: on any zstd error.
        """
        if hasattr(reader, 'read'):
            have_read = True
        elif hasattr(reader, '__getitem__'):
            have_read = False
            buffer_offset = 0
            size = len(reader)
        else:
            raise ValueError('must pass an object with a read() method or '
                             'conforms to buffer protocol')

        lib.ZSTD_CCtx_reset(self._cctx)

        if size < 0:
            size = lib.ZSTD_CONTENTSIZE_UNKNOWN

        zresult = lib.ZSTD_CCtx_setPledgedSrcSize(self._cctx, size)
        if lib.ZSTD_isError(zresult):
            raise ZstdError('error setting source size: %s' %
                            _zstd_error(zresult))

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        in_buffer.src = ffi.NULL
        in_buffer.size = 0
        in_buffer.pos = 0

        dst_buffer = ffi.new('char[]', write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = write_size
        out_buffer.pos = 0

        while True:
            # We should never have output data sitting around after a previous
            # iteration.
            assert out_buffer.pos == 0

            # Collect input data.
            if have_read:
                read_result = reader.read(read_size)
            else:
                remaining = len(reader) - buffer_offset
                slice_size = min(remaining, read_size)
                read_result = reader[buffer_offset:buffer_offset + slice_size]
                buffer_offset += slice_size

            # No new input data. Break out of the read loop.
            if not read_result:
                break

            # Feed all read data into the compressor and emit output until
            # exhausted.
            read_buffer = ffi.from_buffer(read_result)
            in_buffer.src = read_buffer
            in_buffer.size = len(read_buffer)
            in_buffer.pos = 0

            while in_buffer.pos < in_buffer.size:
                zresult = lib.ZSTD_compress_generic(self._cctx, out_buffer, in_buffer,
                                                    lib.ZSTD_e_continue)
                if lib.ZSTD_isError(zresult):
                    raise ZstdError('zstd compress error: %s' %
                                    _zstd_error(zresult))

                if out_buffer.pos:
                    # Copy out before resetting pos: dst_buffer is reused.
                    data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                    out_buffer.pos = 0
                    yield data

            assert out_buffer.pos == 0

            # And repeat the loop to collect more data.
            continue

        # If we get here, input is exhausted. End the stream and emit what
        # remains.
        while True:
            assert out_buffer.pos == 0
            zresult = lib.ZSTD_compress_generic(self._cctx,
                                                out_buffer,
                                                in_buffer,
                                                lib.ZSTD_e_end)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('error ending compression stream: %s' %
                                _zstd_error(zresult))

            if out_buffer.pos:
                data = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                out_buffer.pos = 0
                yield data

            # zresult == 0 means the frame has been completely flushed.
            if zresult == 0:
                break

    # Alternate (older) name for read_to_iter().
    read_from = read_to_iter

    def frame_progression(self):
        """Return ``(ingested, consumed, produced)`` for the current frame,
        as reported by ``ZSTD_getFrameProgression()``."""
        progression = lib.ZSTD_getFrameProgression(self._cctx)

        return progression.ingested, progression.consumed, progression.produced


class FrameParameters(object):
    """Python-level copy of the fields of a parsed zstd frame header.

    ``fparams`` must expose ``frameContentSize``, ``windowSize``, ``dictID``
    and ``checksumFlag``, as a ``ZSTD_frameHeader`` does.
    """

    def __init__(self, fparams):
        self.content_size, self.window_size, self.dict_id = (
            fparams.frameContentSize,
            fparams.windowSize,
            fparams.dictID,
        )
        # Expose the C flag as a real Python bool.
        self.has_checksum = bool(fparams.checksumFlag)


def frame_content_size(data):
    """Return the decompressed size recorded in the frame header of ``data``.

    Returns -1 when the header does not record a size
    (``ZSTD_CONTENTSIZE_UNKNOWN``).

    :raises ZstdError: when zstd reports ``ZSTD_CONTENTSIZE_ERROR``.
    """
    buf = ffi.from_buffer(data)
    content_size = lib.ZSTD_getFrameContentSize(buf, len(buf))

    if content_size == lib.ZSTD_CONTENTSIZE_ERROR:
        raise ZstdError('error when determining content size')
    if content_size == lib.ZSTD_CONTENTSIZE_UNKNOWN:
        return -1
    return content_size


def frame_header_size(data):
    """Return the size in bytes of the frame header at the start of ``data``.

    :raises ZstdError: when zstd cannot determine the header size.
    """
    buf = ffi.from_buffer(data)
    header_size = lib.ZSTD_frameHeaderSize(buf, len(buf))

    if not lib.ZSTD_isError(header_size):
        return header_size

    raise ZstdError('could not determine frame header size: %s' %
                    _zstd_error(header_size))


def get_frame_parameters(data):
    """Parse the zstd frame header at the start of ``data``.

    :returns: a ``FrameParameters`` built from the parsed header.
    :raises ZstdError: if the header is invalid, or if ``data`` is too short
        to contain it.
    """
    header = ffi.new('ZSTD_frameHeader *')
    buf = ffi.from_buffer(data)

    zresult = lib.ZSTD_getFrameHeader(header, buf, len(buf))
    if lib.ZSTD_isError(zresult):
        raise ZstdError('cannot get frame parameters: %s' %
                        _zstd_error(zresult))

    if zresult:
        # A non-error, non-zero result is the number of bytes still needed.
        raise ZstdError('not enough data for frame parameters; need %d bytes' %
                        zresult)

    return FrameParameters(header[0])


class ZstdCompressionDict(object):
    """Zstandard dictionary data usable for compression and decompression.

    ``data`` must be ``bytes``. ``dict_type`` must be one of the
    ``DICT_TYPE_*`` constants and is passed to zstd when the dictionary is
    loaded. ``k`` and ``d`` are stored as public attributes
    (``train_dictionary`` records the cover parameters it used there).
    """

    def __init__(self, data, dict_type=DICT_TYPE_AUTO, k=0, d=0):
        assert isinstance(data, bytes_type)
        self._data = data
        self.k = k
        self.d = d

        if dict_type not in (DICT_TYPE_AUTO, DICT_TYPE_RAWCONTENT,
                             DICT_TYPE_FULLDICT):
            # %r renders ints exactly like %d, and cannot itself raise a
            # TypeError when an unexpected non-int value is passed.
            raise ValueError('invalid dictionary load mode: %r; must use '
                             'DICT_TYPE_* constants' % (dict_type,))

        self._dict_type = dict_type
        # Set by precompute_compress(); referenced by compressors when present.
        self._cdict = None
        # Lazily created decompression dictionary; see the ``_ddict`` property.
        self._ddict_cache = None

    def __len__(self):
        return len(self._data)

    def dict_id(self):
        """Return the dictionary ID reported by ``ZDICT_getDictID()``."""
        return int_type(lib.ZDICT_getDictID(self._data, len(self._data)))

    def as_bytes(self):
        """Return the raw dictionary bytes."""
        return self._data

    def precompute_compress(self, level=0, compression_params=None):
        """Digest the dictionary into a ``ZSTD_CDict`` stored on ``_cdict``.

        Exactly one of ``level`` or ``compression_params`` must be given.
        The CDict references ``self._data`` (``ZSTD_dlm_byRef``) rather than
        copying it.

        :raises ValueError: if both or neither of the arguments are given.
        :raises ZstdError: if zstd fails to create the CDict.
        """
        if level and compression_params:
            raise ValueError('must only specify one of level or '
                             'compression_params')

        if not level and not compression_params:
            raise ValueError('must specify one of level or compression_params')

        if level:
            cparams = lib.ZSTD_getCParams(level, 0, len(self._data))
        else:
            # ffi.new() only allocates pointer or array types; asking for the
            # bare struct type raises TypeError. Allocate a pointer and fill
            # in the struct it points to. ``cparams_ptr`` is kept as a local
            # so the memory stays alive through ZSTD_createCDict_advanced().
            cparams_ptr = ffi.new('ZSTD_compressionParameters *')
            cparams = cparams_ptr[0]
            cparams.chainLog = compression_params.chain_log
            cparams.hashLog = compression_params.hash_log
            cparams.searchLength = compression_params.min_match
            cparams.searchLog = compression_params.search_log
            cparams.strategy = compression_params.compression_strategy
            cparams.targetLength = compression_params.target_length
            cparams.windowLog = compression_params.window_log

        cdict = lib.ZSTD_createCDict_advanced(self._data, len(self._data),
                                              lib.ZSTD_dlm_byRef,
                                              self._dict_type,
                                              cparams,
                                              lib.ZSTD_defaultCMem)
        if cdict == ffi.NULL:
            raise ZstdError('unable to precompute dictionary')

        self._cdict = ffi.gc(cdict, lib.ZSTD_freeCDict,
                             size=lib.ZSTD_sizeof_CDict(cdict))

    @property
    def _ddict(self):
        """Return a ``ZSTD_DDict`` for this dictionary, creating it once.

        ``property`` is a data descriptor, so a value stored in
        ``self.__dict__['_ddict']`` would never be consulted and a new DDict
        would be built on every access. The digested dictionary is therefore
        cached under a separate attribute.

        :raises ZstdError: if zstd fails to create the DDict.
        """
        ddict = getattr(self, '_ddict_cache', None)
        if ddict is not None:
            return ddict

        ddict = lib.ZSTD_createDDict_advanced(self._data, len(self._data),
                                              lib.ZSTD_dlm_byRef,
                                              self._dict_type,
                                              lib.ZSTD_defaultCMem)

        if ddict == ffi.NULL:
            raise ZstdError('could not create decompression dict')

        ddict = ffi.gc(ddict, lib.ZSTD_freeDDict,
                       size=lib.ZSTD_sizeof_DDict(ddict))
        self._ddict_cache = ddict

        return ddict

def train_dictionary(dict_size, samples, k=0, d=0, notifications=0, dict_id=0,
                     level=0, steps=0, threads=0):
    """Train a zstd dictionary of at most ``dict_size`` bytes from ``samples``.

    ``samples`` must be a ``list`` of ``bytes``. When every tuning argument
    is zero, the plain ``ZDICT_trainFromBuffer()`` trainer is used. When
    ``steps`` or ``threads`` is non-zero, ``ZDICT_optimizeTrainFromBuffer_cover()``
    is used. Otherwise ``ZDICT_trainFromBuffer_cover()`` is used. A negative
    ``threads`` means ``_cpu_count()``.

    :returns: a ``ZstdCompressionDict`` of type ``DICT_TYPE_FULLDICT``. Its
        ``k``/``d`` are read back from the cover parameter struct, which the
        optimizing trainer receives by pointer.
    :raises TypeError: if ``samples`` is not a list.
    :raises ValueError: if any sample is not bytes.
    :raises ZstdError: if training fails.
    """
    if not isinstance(samples, list):
        raise TypeError('samples must be a list')

    if threads < 0:
        threads = _cpu_count()

    sample_count = len(samples)
    samples_buffer = new_nonzero('char[]', sum(len(s) for s in samples))
    sample_sizes = new_nonzero('size_t[]', sample_count)

    # Concatenate every sample into one contiguous buffer and record each
    # sample's length so zstd can find the boundaries.
    write_pos = 0
    for index, sample in enumerate(samples):
        if not isinstance(sample, bytes_type):
            raise ValueError('samples must be bytes')

        sample_len = len(sample)
        ffi.memmove(samples_buffer + write_pos, sample, sample_len)
        sample_sizes[index] = sample_len
        write_pos += sample_len

    dict_data = new_nonzero('char[]', dict_size)

    dparams = ffi.new('ZDICT_cover_params_t *')[0]
    dparams.k = k
    dparams.d = d
    dparams.steps = steps
    dparams.nbThreads = threads
    dparams.zParams.notificationLevel = notifications
    dparams.zParams.dictID = dict_id
    dparams.zParams.compressionLevel = level

    all_defaults = not any((
        dparams.k,
        dparams.d,
        dparams.steps,
        dparams.nbThreads,
        dparams.zParams.notificationLevel,
        dparams.zParams.dictID,
        dparams.zParams.compressionLevel,
    ))

    dict_ptr = ffi.addressof(dict_data)
    samples_ptr = ffi.addressof(samples_buffer)
    sizes_ptr = ffi.addressof(sample_sizes, 0)

    if all_defaults:
        zresult = lib.ZDICT_trainFromBuffer(
            dict_ptr, dict_size, samples_ptr, sizes_ptr, len(samples))
    elif dparams.steps or dparams.nbThreads:
        zresult = lib.ZDICT_optimizeTrainFromBuffer_cover(
            dict_ptr, dict_size, samples_ptr, sizes_ptr, len(samples),
            ffi.addressof(dparams))
    else:
        zresult = lib.ZDICT_trainFromBuffer_cover(
            dict_ptr, dict_size, samples_ptr, sizes_ptr, len(samples),
            dparams)

    if lib.ZDICT_isError(zresult):
        error_name = ffi.string(lib.ZDICT_getErrorName(zresult)).decode('utf-8')
        raise ZstdError('cannot train dict: %s' % error_name)

    # On success zresult is the number of dictionary bytes written.
    return ZstdCompressionDict(ffi.buffer(dict_data, zresult)[:],
                               dict_type=DICT_TYPE_FULLDICT,
                               k=dparams.k, d=dparams.d)


class ZstdDecompressionObj(object):
    """Incremental decompressor with a zlib-style ``decompress()`` method.

    Once a call reaches the end of the zstd frame, the object is marked
    finished. Any later ``decompress()`` call raises ``ZstdError``.
    """

    def __init__(self, decompressor, write_size):
        self._decompressor = decompressor
        self._write_size = write_size
        self._finished = False

    def decompress(self, data):
        """Feed compressed ``data`` and return the bytes it decompresses to.

        Output is collected in ``write_size`` pieces and joined.

        :raises ZstdError: if the frame already finished, or on any zstd error.
        """
        if self._finished:
            raise ZstdError('cannot use a decompressobj multiple times')

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        source = ffi.from_buffer(data)
        in_buffer.src = source
        in_buffer.size = len(source)
        in_buffer.pos = 0

        dest = ffi.new('char[]', self._write_size)
        out_buffer.dst = dest
        out_buffer.size = len(dest)
        out_buffer.pos = 0

        pieces = []

        while True:
            zresult = lib.ZSTD_decompress_generic(self._decompressor._dctx,
                                                  out_buffer, in_buffer)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd decompressor error: %s' %
                                _zstd_error(zresult))

            frame_done = zresult == 0
            if frame_done:
                self._finished = True
                self._decompressor = None

            if out_buffer.pos:
                pieces.append(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])

            # Stop at end of frame, or when all input is consumed and this
            # pass produced nothing more.
            input_drained = in_buffer.pos == in_buffer.size
            if frame_done or (input_drained and not out_buffer.pos):
                return b''.join(pieces)

            out_buffer.pos = 0


class DecompressionReader(object):
    """Read-only, forward-seekable file-like object of decompressed data.

    ``source`` is either an object with a ``read(size)`` method, which is
    consumed ``read_size`` bytes at a time, or an object supporting the buffer
    protocol, which is handed to the decompressor in one piece.
    Decompression uses ``decompressor._dctx``.
    """

    def __init__(self, decompressor, source, read_size):
        self._decompressor = decompressor
        self._source = source
        self._read_size = read_size
        self._entered = False
        self._closed = False
        # Decompressed bytes returned so far; reported by tell().
        self._bytes_decompressed = 0
        self._finished_input = False
        self._finished_output = False
        # Compressed input handed to zstd. It persists across read() calls
        # because zstd may stop before consuming all of it.
        self._in_buffer = ffi.new('ZSTD_inBuffer *')
        # Holds a ref to self._in_buffer.src.
        self._source_buffer = None

    def __enter__(self):
        if self._entered:
            raise ValueError('cannot __enter__ multiple times')

        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._entered = False
        self._closed = True
        self._source = None
        self._decompressor = None

        return False

    def readable(self):
        return True

    def writable(self):
        return False

    def seekable(self):
        return True

    def readline(self):
        raise NotImplementedError()

    def readlines(self):
        raise NotImplementedError()

    def write(self, data):
        raise io.UnsupportedOperation()

    def writelines(self, lines):
        raise io.UnsupportedOperation()

    def isatty(self):
        return False

    def flush(self):
        return None

    def close(self):
        self._closed = True
        return None

    @property
    def closed(self):
        return self._closed

    def tell(self):
        return self._bytes_decompressed

    def readall(self):
        raise NotImplementedError()

    def __iter__(self):
        raise NotImplementedError()

    def __next__(self):
        raise NotImplementedError()

    next = __next__

    def read(self, size):
        """Return up to ``size`` bytes of decompressed data.

        Returns ``b''`` once the end of the zstd frame has been reached.

        :raises ValueError: if the stream is closed or ``size`` < 1.
        :raises ZstdError: if zstd reports a decompression error.
        """
        if self._closed:
            raise ValueError('stream is closed')

        if self._finished_output:
            return b''

        if size < 1:
            raise ValueError('cannot read negative or size 0 amounts')

        dst_buffer = ffi.new('char[]', size)
        out_buffer = ffi.new('ZSTD_outBuffer *')
        out_buffer.dst = dst_buffer
        out_buffer.size = size
        out_buffer.pos = 0

        def decompress():
            # Run one decompression step. Returns the data if the output
            # buffer filled up, otherwise None.
            zresult = lib.ZSTD_decompress_generic(self._decompressor._dctx,
                                                  out_buffer, self._in_buffer)

            if self._in_buffer.pos == self._in_buffer.size:
                self._in_buffer.src = ffi.NULL
                self._in_buffer.pos = 0
                self._in_buffer.size = 0
                self._source_buffer = None

                if not hasattr(self._source, 'read'):
                    self._finished_input = True

            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd decompress error: %s' %
                                _zstd_error(zresult))
            elif zresult == 0:
                self._finished_output = True

            if out_buffer.pos and out_buffer.pos == out_buffer.size:
                self._bytes_decompressed += out_buffer.size
                return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

        def get_input():
            # Input left over from an earlier step (zstd stopped because the
            # output buffer was full) must be consumed first. Replacing it
            # would drop compressed bytes, or rewind a buffer source to 0.
            if self._in_buffer.pos < self._in_buffer.size:
                return

            if self._finished_input:
                return

            if hasattr(self._source, 'read'):
                data = self._source.read(self._read_size)

                if not data:
                    self._finished_input = True
                    return

                self._source_buffer = ffi.from_buffer(data)
                self._in_buffer.src = self._source_buffer
                self._in_buffer.size = len(self._source_buffer)
                self._in_buffer.pos = 0
            else:
                self._source_buffer = ffi.from_buffer(self._source)
                self._in_buffer.src = self._source_buffer
                self._in_buffer.size = len(self._source_buffer)
                self._in_buffer.pos = 0

        get_input()
        result = decompress()
        if result:
            return result

        # Stop once the frame is complete: later read() calls return b''
        # anyway, so feeding zstd past the frame end would only start a new
        # frame or fail on trailing data.
        while not self._finished_input and not self._finished_output:
            get_input()
            result = decompress()
            if result:
                return result

        self._bytes_decompressed += out_buffer.pos
        return ffi.buffer(out_buffer.dst, out_buffer.pos)[:]

    def seek(self, pos, whence=os.SEEK_SET):
        """Seek forward by reading and discarding decompressed data.

        Supports ``SEEK_SET`` to a position >= ``tell()``, and ``SEEK_CUR``
        with a non-negative offset. Returns the resulting position.

        :raises ValueError: if the stream is closed, on backwards seeks, or
            for ``SEEK_END``.
        """
        if self._closed:
            raise ValueError('stream is closed')

        read_amount = 0

        if whence == os.SEEK_SET:
            if pos < 0:
                raise ValueError('cannot seek to negative position with SEEK_SET')

            if pos < self._bytes_decompressed:
                raise ValueError('cannot seek zstd decompression stream '
                                 'backwards')

            read_amount = pos - self._bytes_decompressed

        elif whence == os.SEEK_CUR:
            if pos < 0:
                raise ValueError('cannot seek zstd decompression stream '
                                 'backwards')

            read_amount = pos
        elif whence == os.SEEK_END:
            raise ValueError('zstd decompression streams cannot be seeked '
                             'with SEEK_END')

        while read_amount:
            result = self.read(min(read_amount,
                                   DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE))

            # End of frame before reaching the target position.
            if not result:
                break

            read_amount -= len(result)

        return self._bytes_decompressed

class ZstdDecompressionWriter(object):
    """Context manager that decompresses data given to ``write()``.

    Decompressed output is forwarded to ``writer.write()`` in pieces of at
    most ``write_size`` bytes. ``write()`` is only allowed while the context
    manager is active.
    """

    def __init__(self, decompressor, writer, write_size):
        self._decompressor = decompressor
        self._writer = writer
        self._write_size = write_size
        self._entered = False

    def __enter__(self):
        if self._entered:
            raise ZstdError('cannot __enter__ multiple times')

        self._decompressor._ensure_dctx()
        self._entered = True
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self._entered = False

    def memory_size(self):
        """Return the size of the decompressor's DCtx as reported by zstd."""
        dctx = self._decompressor._dctx
        if not dctx:
            raise ZstdError('cannot determine size of inactive decompressor '
                            'call when context manager is active')

        return lib.ZSTD_sizeof_DCtx(dctx)

    def write(self, data):
        """Decompress ``data`` and forward the output to the wrapped writer.

        :returns: the number of decompressed bytes written.
        :raises ZstdError: outside the context manager or on a zstd error.
        """
        if not self._entered:
            raise ZstdError('write must be called from an active context manager')

        written = 0

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        source = ffi.from_buffer(data)
        in_buffer.src = source
        in_buffer.size = len(source)
        in_buffer.pos = 0

        dest = ffi.new('char[]', self._write_size)
        out_buffer.dst = dest
        out_buffer.size = len(dest)
        out_buffer.pos = 0

        dctx = self._decompressor._dctx

        # Keep decompressing until every input byte has been consumed,
        # flushing the output buffer whenever zstd put something in it.
        while in_buffer.pos < in_buffer.size:
            zresult = lib.ZSTD_decompress_generic(dctx, out_buffer, in_buffer)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('zstd decompress error: %s' %
                                _zstd_error(zresult))

            produced = out_buffer.pos
            if produced:
                self._writer.write(ffi.buffer(out_buffer.dst, produced)[:])
                written += produced
                out_buffer.pos = 0

        return written


class ZstdDecompressor(object):
    def __init__(self, dict_data=None, max_window_size=0, format=FORMAT_ZSTD1):
        """Create a decompressor with its own zstd decompression context.

        dict_data: optional prepared dictionary whose ``_ddict`` is referenced
            by the context when one is loaded.
        max_window_size: when non-zero, passed to ``ZSTD_DCtx_setMaxWindowSize``.
        format: frame format passed to ``ZSTD_DCtx_setFormat``.

        Raises ``MemoryError`` if the context cannot be allocated.
        """
        self._dict_data = dict_data
        self._max_window_size = max_window_size
        self._format = format

        raw_dctx = lib.ZSTD_createDCtx()
        if raw_dctx == ffi.NULL:
            raise MemoryError()

        # Configure the context through the unmanaged pointer first, so the
        # size reported to ffi.gc reflects the fully loaded state. The
        # ``finally`` clause registers the pointer with ffi.gc even if
        # configuration raises.
        self._dctx = raw_dctx
        try:
            self._ensure_dctx()
        finally:
            self._dctx = ffi.gc(raw_dctx, lib.ZSTD_freeDCtx,
                                size=lib.ZSTD_sizeof_DCtx(raw_dctx))

    def memory_size(self):
        """Return the decompression context's size in bytes (``ZSTD_sizeof_DCtx``)."""
        dctx = self._dctx
        return lib.ZSTD_sizeof_DCtx(dctx)

    def decompress(self, data, max_output_size=0):
        """Decompress a single zstd frame held entirely in memory.

        ``data`` is any object supporting the buffer protocol. When the frame
        header records the decompressed content size, an output buffer of
        exactly that size is allocated. Otherwise ``max_output_size`` must be
        non-zero and is used as the output buffer size.

        Returns the decompressed data as ``bytes`` (``b''`` when the header
        declares a content size of 0).

        Raises ``ZstdError`` if the content size cannot be determined, zstd
        reports an error, the frame is not fully decompressed into the output
        buffer, or the amount of decompressed data differs from the size
        declared in the frame header.
        """
        self._ensure_dctx()

        data_buffer = ffi.from_buffer(data)

        output_size = lib.ZSTD_getFrameContentSize(data_buffer, len(data_buffer))

        if output_size == lib.ZSTD_CONTENTSIZE_ERROR:
            raise ZstdError('error determining content size from frame header')
        elif output_size == 0:
            return b''
        elif output_size == lib.ZSTD_CONTENTSIZE_UNKNOWN:
            if not max_output_size:
                raise ZstdError('could not determine content size in frame header')

            result_buffer = ffi.new('char[]', max_output_size)
            result_size = max_output_size
            # No declared size to verify against; 0 disables the exact-size
            # check after decompression.
            output_size = 0
        else:
            result_buffer = ffi.new('char[]', output_size)
            result_size = output_size

        out_buffer = ffi.new('ZSTD_outBuffer *')
        out_buffer.dst = result_buffer
        out_buffer.size = result_size
        out_buffer.pos = 0

        in_buffer = ffi.new('ZSTD_inBuffer *')
        in_buffer.src = data_buffer
        in_buffer.size = len(data_buffer)
        in_buffer.pos = 0

        zresult = lib.ZSTD_decompress_generic(self._dctx, out_buffer, in_buffer)
        if lib.ZSTD_isError(zresult):
            raise ZstdError('decompression error: %s' %
                            _zstd_error(zresult))
        elif zresult:
            # A non-zero hint means the frame is not finished.
            raise ZstdError('decompression error: did not decompress full frame')
        elif output_size and out_buffer.pos != output_size:
            # Report the number of bytes actually produced (zresult is always
            # 0 here, so it must not be used for the count).
            raise ZstdError('decompression error: decompressed %d bytes; expected %d' %
                            (out_buffer.pos, output_size))

        return ffi.buffer(result_buffer, out_buffer.pos)[:]

    def stream_reader(self, source, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE):
        """Return a ``DecompressionReader`` that decompresses data from ``source``.

        The decompression context is reset before the reader is created.
        """
        self._ensure_dctx()
        reader = DecompressionReader(self, source, read_size)
        return reader

    def decompressobj(self, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        """Return a ``ZstdDecompressionObj`` bound to this decompressor.

        The decompression context is reset before the object is created.
        Raises ``ValueError`` if ``write_size`` is less than 1.
        """
        if write_size < 1:
            raise ValueError('write_size must be positive')

        self._ensure_dctx()
        dobj = ZstdDecompressionObj(self, write_size=write_size)
        return dobj

    def read_to_iter(self, reader, read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
                     write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE,
                     skip_bytes=0):
        """Generator yielding decompressed chunks of data taken from ``reader``.

        ``reader`` is either an object with a ``read(size)`` method or an
        object supporting ``__getitem__`` slicing and ``len()``. The first
        ``skip_bytes`` bytes of input are discarded, then input is consumed
        ``read_size`` bytes at a time. Each yielded item is ``bytes`` of at
        most ``write_size`` bytes. Iteration ends when the input is exhausted
        or when zstd reports that the current frame is completely decoded.

        Because this is a generator, argument validation only runs on the
        first ``next()``. Raises ``ValueError`` for invalid arguments and
        ``ZstdError`` on decompression errors.
        """
        if skip_bytes >= read_size:
            raise ValueError('skip_bytes must be smaller than read_size')

        streaming = hasattr(reader, 'read')
        if not streaming and not hasattr(reader, '__getitem__'):
            raise ValueError('must pass an object with a read() method or '
                             'conforms to buffer protocol')

        # Only meaningful for sliceable input: current position and length.
        offset = 0
        total = None if streaming else len(reader)

        if skip_bytes:
            if streaming:
                reader.read(skip_bytes)
            elif skip_bytes > total:
                raise ValueError('skip_bytes larger than first input chunk')
            else:
                offset = skip_bytes

        self._ensure_dctx()

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        scratch = ffi.new('char[]', write_size)
        out_buffer.dst = scratch
        out_buffer.size = len(scratch)
        out_buffer.pos = 0

        while True:
            assert out_buffer.pos == 0

            if streaming:
                chunk = reader.read(read_size)
            else:
                end = min(total, offset + read_size)
                chunk = reader[offset:end]
                offset = end

            # An empty chunk means there is no more input.
            if not chunk:
                break

            chunk_buffer = ffi.from_buffer(chunk)
            in_buffer.src = chunk_buffer
            in_buffer.size = len(chunk_buffer)
            in_buffer.pos = 0

            # Drain this chunk completely, yielding output as it appears.
            while in_buffer.pos < in_buffer.size:
                assert out_buffer.pos == 0

                zresult = lib.ZSTD_decompress_generic(self._dctx, out_buffer, in_buffer)
                if lib.ZSTD_isError(zresult):
                    raise ZstdError('zstd decompress error: %s' %
                                    _zstd_error(zresult))

                if out_buffer.pos:
                    piece = ffi.buffer(out_buffer.dst, out_buffer.pos)[:]
                    out_buffer.pos = 0
                    yield piece

                # 0 means the frame has been fully decoded and flushed.
                if zresult == 0:
                    return

    # Alternate name for read_to_iter().
    read_from = read_to_iter

    def stream_writer(self, writer, write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        """Return a ``ZstdDecompressionWriter`` that forwards output to ``writer``.

        ``writer`` must have a ``write()`` method, otherwise ``ValueError`` is
        raised. The decompression context is not reset here; the returned
        object's ``__enter__`` does that.
        """
        if hasattr(writer, 'write'):
            return ZstdDecompressionWriter(self, writer, write_size)

        raise ValueError('must pass an object with a write() method')

    # Alternate name for stream_writer().
    write_to = stream_writer

    def copy_stream(self, ifh, ofh,
                    read_size=DECOMPRESSION_RECOMMENDED_INPUT_SIZE,
                    write_size=DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE):
        """Decompress all data read from ``ifh`` and write it to ``ofh``.

        ``ifh`` must have a ``read()`` method; it is read ``read_size`` bytes
        at a time until it returns an empty result. Decompressed output is
        passed to ``ofh.write()`` as ``bytes`` in pieces of at most
        ``write_size`` bytes.

        Returns a ``(total_read, total_write)`` tuple: the number of
        compressed bytes read and decompressed bytes written.

        Raises ``ValueError`` if either object lacks the required method and
        ``ZstdError`` on a decompression error.
        """
        if not hasattr(ifh, 'read'):
            raise ValueError('first argument must have a read() method')
        if not hasattr(ofh, 'write'):
            raise ValueError('second argument must have a write() method')

        self._ensure_dctx()

        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        dst_buffer = ffi.new('char[]', write_size)
        out_buffer.dst = dst_buffer
        out_buffer.size = write_size
        out_buffer.pos = 0

        total_read, total_write = 0, 0

        # Read all available input.
        while True:
            data = ifh.read(read_size)
            if not data:
                break

            data_buffer = ffi.from_buffer(data)
            total_read += len(data_buffer)
            in_buffer.src = data_buffer
            in_buffer.size = len(data_buffer)
            in_buffer.pos = 0

            # Flush all read data to output.
            while in_buffer.pos < in_buffer.size:
                zresult = lib.ZSTD_decompress_generic(self._dctx, out_buffer, in_buffer)
                if lib.ZSTD_isError(zresult):
                    raise ZstdError('zstd decompressor error: %s' %
                                    _zstd_error(zresult))

                if out_buffer.pos:
                    # dst_buffer is reused for every call, so give the writer
                    # an independent bytes copy. A bare ffi.buffer view would
                    # be overwritten by the next iteration if the writer kept
                    # a reference to it.
                    ofh.write(ffi.buffer(out_buffer.dst, out_buffer.pos)[:])
                    total_write += out_buffer.pos
                    out_buffer.pos = 0

            # Continue loop to keep reading.

        return total_read, total_write

    def decompress_content_dict_chain(self, frames):
        """Decompress a chain of frames, each using its predecessor as a dictionary.

        ``frames`` must be a non-empty list of ``bytes``, each a complete zstd
        frame whose header records its content size. Frame 0 is decompressed
        without any dictionary. Every later frame is decompressed with the
        decompressed content of the previous frame referenced as a
        raw-content prefix. Only the output of the last frame is returned, as
        ``bytes``.

        Raises ``TypeError`` if ``frames`` is not a list. Raises
        ``ValueError`` for an empty list, a non-bytes chunk, an invalid or
        truncated frame header, or a missing content size. Raises
        ``ZstdError`` if a frame cannot be fully decompressed.
        """
        if not isinstance(frames, list):
            raise TypeError('argument must be a list')

        if not frames:
            raise ValueError('empty input chain')

        params = ffi.new('ZSTD_frameHeader *')
        in_buffer = ffi.new('ZSTD_inBuffer *')
        out_buffer = ffi.new('ZSTD_outBuffer *')

        # Decompressed content of the previous frame; None before frame 0.
        last_buffer = None

        for i, chunk in enumerate(frames):
            if not isinstance(chunk, bytes_type):
                raise ValueError('chunk %d must be bytes' % i)

            # All chunks should be zstd frames and should have content size set.
            chunk_buffer = ffi.from_buffer(chunk)
            zresult = lib.ZSTD_getFrameHeader(params, chunk_buffer, len(chunk_buffer))
            if lib.ZSTD_isError(zresult):
                raise ValueError('chunk %d is not a valid zstd frame' % i)
            elif zresult:
                raise ValueError('chunk %d is too small to contain a zstd frame' % i)

            if params.frameContentSize == lib.ZSTD_CONTENTSIZE_UNKNOWN:
                raise ValueError('chunk %d missing content size in frame' % i)

            if last_buffer is None:
                # First chunk does not use a dictionary: reset the context
                # without loading this decompressor's own dictionary.
                self._ensure_dctx(load_dict=False)
            else:
                # Later chunks were compressed against the previous chunk's
                # content, so reference it as a prefix before decoding.
                zresult = lib.ZSTD_DCtx_refPrefix(self._dctx, last_buffer,
                                                  len(last_buffer))
                if lib.ZSTD_isError(zresult):
                    raise ZstdError('could not reference prefix for chunk %d: %s' %
                                    (i, _zstd_error(zresult)))

            dest_buffer = ffi.new('char[]', params.frameContentSize)

            out_buffer.dst = dest_buffer
            out_buffer.size = len(dest_buffer)
            out_buffer.pos = 0

            in_buffer.src = chunk_buffer
            in_buffer.size = len(chunk_buffer)
            in_buffer.pos = 0

            zresult = lib.ZSTD_decompress_generic(self._dctx, out_buffer, in_buffer)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('could not decompress chunk %d: %s' %
                                (i, _zstd_error(zresult)))
            elif zresult:
                raise ZstdError('chunk %d did not decompress full frame' % i)

            last_buffer = dest_buffer

        return ffi.buffer(last_buffer, len(last_buffer))[:]

    def _ensure_dctx(self, load_dict=True):
        """Reset the decompression context and re-apply this decompressor's settings.

        After ``ZSTD_DCtx_reset`` it applies, in order: the max window size
        (only when non-zero), the frame format, and, when ``load_dict`` is
        true and dictionary data was supplied, a reference to the prepared
        dictionary.

        Raises ``ZstdError`` if zstd rejects any of these settings.
        """
        dctx = self._dctx
        lib.ZSTD_DCtx_reset(dctx)

        window_limit = self._max_window_size
        if window_limit:
            zresult = lib.ZSTD_DCtx_setMaxWindowSize(dctx, window_limit)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('unable to set max window size: %s' %
                                _zstd_error(zresult))

        zresult = lib.ZSTD_DCtx_setFormat(dctx, self._format)
        if lib.ZSTD_isError(zresult):
            raise ZstdError('unable to set decoding format: %s' %
                            _zstd_error(zresult))

        if self._dict_data and load_dict:
            prepared = self._dict_data._ddict
            zresult = lib.ZSTD_DCtx_refDDict(dctx, prepared)
            if lib.ZSTD_isError(zresult):
                raise ZstdError('unable to reference prepared dictionary: %s' %
                                _zstd_error(zresult))