Mercurial > hg
view contrib/python-zstandard/tests/common.py @ 33756:5866ba5e9c48
cext: move _dict_new_presized() to header
Prepares for splitting encoding functions from parsers.c.
author | Yuya Nishihara <yuya@tcha.org> |
---|---|
date | Mon, 31 Jul 2017 22:12:24 +0900 |
parents | e0dc40530c5a |
children | b1fb341d8a61 |
line wrap: on
line source
import inspect import io import os import types def make_cffi(cls): """Decorator to add CFFI versions of each test method.""" try: import zstd_cffi except ImportError: return cls # If CFFI version is available, dynamically construct test methods # that use it. for attr in dir(cls): fn = getattr(cls, attr) if not inspect.ismethod(fn) and not inspect.isfunction(fn): continue if not fn.__name__.startswith('test_'): continue name = '%s_cffi' % fn.__name__ # Replace the "zstd" symbol with the CFFI module instance. Then copy # the function object and install it in a new attribute. if isinstance(fn, types.FunctionType): globs = dict(fn.__globals__) globs['zstd'] = zstd_cffi new_fn = types.FunctionType(fn.__code__, globs, name, fn.__defaults__, fn.__closure__) new_method = new_fn else: globs = dict(fn.__func__.func_globals) globs['zstd'] = zstd_cffi new_fn = types.FunctionType(fn.__func__.func_code, globs, name, fn.__func__.func_defaults, fn.__func__.func_closure) new_method = types.UnboundMethodType(new_fn, fn.im_self, fn.im_class) setattr(cls, name, new_method) return cls class OpCountingBytesIO(io.BytesIO): def __init__(self, *args, **kwargs): self._read_count = 0 self._write_count = 0 return super(OpCountingBytesIO, self).__init__(*args, **kwargs) def read(self, *args): self._read_count += 1 return super(OpCountingBytesIO, self).read(*args) def write(self, data): self._write_count += 1 return super(OpCountingBytesIO, self).write(data) _source_files = [] def random_input_data(): """Obtain the raw content of source files. This is used for generating "random" data to feed into fuzzing, since it is faster than random content generation. """ if _source_files: return _source_files for root, dirs, files in os.walk(os.path.dirname(__file__)): dirs[:] = list(sorted(dirs)) for f in sorted(files): try: with open(os.path.join(root, f), 'rb') as fh: data = fh.read() if data: _source_files.append(data) except OSError: pass return _source_files