Mercurial > hg
view contrib/python-zstandard/tests/common.py @ 43976:943e34522b37
perf: drop an unused variable assignment
Caught by PyCharm. A different formatter is created below in `_displaystats()`.
Differential Revision: https://phab.mercurial-scm.org/D7744
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Fri, 27 Dec 2019 13:47:17 -0500 |
parents | 675775c33ab6 |
children | de7838053207 |
line wrap: on
line source
import imp import inspect import io import os import types try: import hypothesis except ImportError: hypothesis = None def make_cffi(cls): """Decorator to add CFFI versions of each test method.""" # The module containing this class definition should # `import zstandard as zstd`. Otherwise things may blow up. mod = inspect.getmodule(cls) if not hasattr(mod, 'zstd'): raise Exception('test module does not contain "zstd" symbol') if not hasattr(mod.zstd, 'backend'): raise Exception('zstd symbol does not have "backend" attribute; did ' 'you `import zstandard as zstd`?') # If `import zstandard` already chose the cffi backend, there is nothing # for us to do: we only add the cffi variation if the default backend # is the C extension. if mod.zstd.backend == 'cffi': return cls old_env = dict(os.environ) os.environ['PYTHON_ZSTANDARD_IMPORT_POLICY'] = 'cffi' try: try: mod_info = imp.find_module('zstandard') mod = imp.load_module('zstandard_cffi', *mod_info) except ImportError: return cls finally: os.environ.clear() os.environ.update(old_env) if mod.backend != 'cffi': raise Exception('got the zstandard %s backend instead of cffi' % mod.backend) # If CFFI version is available, dynamically construct test methods # that use it. for attr in dir(cls): fn = getattr(cls, attr) if not inspect.ismethod(fn) and not inspect.isfunction(fn): continue if not fn.__name__.startswith('test_'): continue name = '%s_cffi' % fn.__name__ # Replace the "zstd" symbol with the CFFI module instance. Then copy # the function object and install it in a new attribute. if isinstance(fn, types.FunctionType): globs = dict(fn.__globals__) globs['zstd'] = mod new_fn = types.FunctionType(fn.__code__, globs, name, fn.__defaults__, fn.__closure__) new_method = new_fn else: globs = dict(fn.__func__.func_globals) globs['zstd'] = mod new_fn = types.FunctionType(fn.__func__.func_code, globs, name, fn.__func__.func_defaults, fn.__func__.func_closure) new_method = types.UnboundMethodType(new_fn, fn.im_self, fn.im_class) setattr(cls, name, new_method) return cls class NonClosingBytesIO(io.BytesIO): """BytesIO that saves the underlying buffer on close(). This allows us to access written data after close(). """ def __init__(self, *args, **kwargs): super(NonClosingBytesIO, self).__init__(*args, **kwargs) self._saved_buffer = None def close(self): self._saved_buffer = self.getvalue() return super(NonClosingBytesIO, self).close() def getvalue(self): if self.closed: return self._saved_buffer else: return super(NonClosingBytesIO, self).getvalue() class OpCountingBytesIO(NonClosingBytesIO): def __init__(self, *args, **kwargs): self._flush_count = 0 self._read_count = 0 self._write_count = 0 return super(OpCountingBytesIO, self).__init__(*args, **kwargs) def flush(self): self._flush_count += 1 return super(OpCountingBytesIO, self).flush() def read(self, *args): self._read_count += 1 return super(OpCountingBytesIO, self).read(*args) def write(self, data): self._write_count += 1 return super(OpCountingBytesIO, self).write(data) _source_files = [] def random_input_data(): """Obtain the raw content of source files. This is used for generating "random" data to feed into fuzzing, since it is faster than random content generation. """ if _source_files: return _source_files for root, dirs, files in os.walk(os.path.dirname(__file__)): dirs[:] = list(sorted(dirs)) for f in sorted(files): try: with open(os.path.join(root, f), 'rb') as fh: data = fh.read() if data: _source_files.append(data) except OSError: pass # Also add some actual random data. _source_files.append(os.urandom(100)) _source_files.append(os.urandom(1000)) _source_files.append(os.urandom(10000)) _source_files.append(os.urandom(100000)) _source_files.append(os.urandom(1000000)) return _source_files def generate_samples(): inputs = [ b'foo', b'bar', b'abcdef', b'sometext', b'baz', ] samples = [] for i in range(128): samples.append(inputs[i % 5]) samples.append(inputs[i % 5] * (i + 3)) samples.append(inputs[-(i % 5)] * (i + 2)) return samples if hypothesis: default_settings = hypothesis.settings(deadline=10000) hypothesis.settings.register_profile('default', default_settings) ci_settings = hypothesis.settings(deadline=20000, max_examples=1000) hypothesis.settings.register_profile('ci', ci_settings) expensive_settings = hypothesis.settings(deadline=None, max_examples=10000) hypothesis.settings.register_profile('expensive', expensive_settings) hypothesis.settings.load_profile( os.environ.get('HYPOTHESIS_PROFILE', 'default'))