Mercurial > hg
view tests/f @ 45095:8e04607023e5
procutil: ensure that procutil.std{out,err}.write() writes all bytes
Python 3 offers different kinds of streams and it’s not guaranteed for all of
them that calling write() writes all bytes.
When Python is started in unbuffered mode, sys.std{out,err}.buffer are
instances of io.FileIO, whose write() can write fewer bytes for
platform-specific reasons (e.g. Linux has a 0x7ffff000 bytes maximum and could
write less if interrupted by a signal; when writing to Windows consoles, it’s
limited to 32767 bytes to avoid the "not enough space" error). This can lead to
silent loss of data, both when using sys.std{out,err}.buffer (which may in fact
not be a buffered stream) and when using the text streams sys.std{out,err}
(I’ve created a CPython bug report for that:
https://bugs.python.org/issue41221).
Python may fix the problem at some point. For now, we implement our own wrapper
for procutil.std{out,err} that calls the raw stream’s write() method until all
bytes have been written. We don’t use sys.std{out,err} for larger writes, so I
think it’s not worth the effort to patch them.
author | Manuel Jacob <me@manueljacob.de> |
---|---|
date | Fri, 10 Jul 2020 12:27:58 +0200 |
parents | 47ef023d0165 |
children | c102b704edb5 |
line wrap: on
line source
#!/usr/bin/env python

"""
Utility for inspecting files in various ways.

This tool is like the collection of tools found in a unix environment but are
cross platform and stable and suitable for our needs in the test suite.

This can be used instead of tools like:
  [
  dd
  find
  head
  hexdump
  ls
  md5sum
  readlink
  sha1sum
  stat
  tail
  test
  readlink.py
  md5sum.py
"""

from __future__ import absolute_import

import binascii
import glob
import hashlib
import optparse
import os
import re
import sys

# Python 3 adapters
ispy3 = sys.version_info[0] >= 3
if ispy3:

    def iterbytes(s):
        """Yield every byte of the bytes object ``s`` as a length-1 bytes.

        Iterating bytes on Python 3 yields ints; slicing keeps the elements
        as bytes so that callers can treat them as on Python 2.
        """
        for i in range(len(s)):
            yield s[i : i + 1]


else:
    iterbytes = iter


def _tobytes(name, errors='surrogateescape'):
    """Return the file name ``name`` as bytes suitable for writing out.

    Native byte strings (Python 2) are returned unchanged. Text is encoded as
    UTF-8 with the given error handler. The default, ``surrogateescape``,
    turns the lone surrogates that Python 3 uses to represent undecodable
    file name bytes back into those bytes instead of raising
    UnicodeEncodeError; names that are plain valid text encode exactly as
    with a strict UTF-8 encode.
    """
    if isinstance(name, bytes):
        return name
    return name.encode('utf-8', errors)


def visit(opts, filenames, outfile):
    """Process filenames in the way specified in opts, writing output to
    outfile.

    ``opts`` is an object with the attributes set up by the option parser at
    the bottom of this file (type, mode, links, size, newer, recurse, sha1,
    sha256, md5, dump, hexdump, bytes, lines, quiet).

    ``filenames`` is an iterable of paths; they are processed in sorted
    order and the special name ``'-'`` means standard input.

    ``outfile`` is a binary stream; everything is written to it as bytes.

    For each name a line of comma separated "facts" is written, optionally
    followed by a hexdump and/or a raw dump of the (possibly truncated)
    content. With ``opts.recurse`` the entries of directories are visited
    too.
    """
    for f in sorted(filenames):
        isstdin = f == '-'
        # Encode once; surrogateescape keeps non-UTF-8 names from crashing.
        fname = _tobytes(f)
        if not isstdin and not os.path.lexists(f):
            outfile.write(b'%s: file not found\n' % fname)
            continue
        # When recursing, the "name:" header lines are always wanted so the
        # output stays attributable; stdin never gets the decorations.
        quiet = opts.quiet and not opts.recurse or isstdin
        isdir = os.path.isdir(f)
        islink = os.path.islink(f)
        isfile = os.path.isfile(f) and not islink
        dirfiles = None
        content = None
        facts = []
        if isfile:
            if opts.type:
                facts.append(b'file')
            # Only read the file when something needs its content.
            if any((opts.hexdump, opts.dump, opts.md5, opts.sha1, opts.sha256)):
                with open(f, 'rb') as fobj:
                    content = fobj.read()
        elif islink:
            if opts.type:
                facts.append(b'link')
            # The "content" of a symlink is its target.
            content = _tobytes(os.readlink(f))
        elif isstdin:
            # Use the underlying binary stream on Python 3.
            content = getattr(sys.stdin, 'buffer', sys.stdin).read()
            if opts.size:
                facts.append(b'size=%d' % len(content))
        elif isdir:
            if opts.recurse or opts.type:
                dirfiles = glob.glob(f + '/*')
                facts.append(b'directory with %d files' % len(dirfiles))
        elif opts.type:
            facts.append(b'type unknown')
        if not isstdin:
            # lstat so that symlinks are described, not their targets.
            stat = os.lstat(f)
            if opts.size and not isdir:
                facts.append(b'size=%d' % stat.st_size)
            if opts.mode and not islink:
                facts.append(b'mode=%o' % (stat.st_mode & 0o777))
            if opts.links:
                facts.append(b'links=%d' % stat.st_nlink)
            if opts.newer:
                # Undecodable characters in the reference name keep being
                # shown as '?', as before.
                newername = _tobytes(opts.newer, 'replace')
                # mtime might be in whole seconds so newer file might be same
                if stat.st_mtime >= os.stat(opts.newer).st_mtime:
                    facts.append(b'newer than %s' % newername)
                else:
                    facts.append(b'older than %s' % newername)
        # The hex digests below are sliced with opts.bytes as well, so -B
        # also shortens the printed hashes (no slicing when it is None).
        if opts.md5 and content is not None:
            h = hashlib.md5(content)
            facts.append(b'md5=%s' % binascii.hexlify(h.digest())[: opts.bytes])
        if opts.sha1 and content is not None:
            h = hashlib.sha1(content)
            facts.append(
                b'sha1=%s' % binascii.hexlify(h.digest())[: opts.bytes]
            )
        if opts.sha256 and content is not None:
            h = hashlib.sha256(content)
            facts.append(
                b'sha256=%s' % binascii.hexlify(h.digest())[: opts.bytes]
            )
        if isstdin:
            outfile.write(b', '.join(facts) + b'\n')
        elif facts:
            outfile.write(b'%s: %s\n' % (fname, b', '.join(facts)))
        elif not quiet:
            outfile.write(b'%s:\n' % fname)
        if content is not None:
            chunk = content
            # Link targets are always shown in full.
            if not islink:
                # Positive counts keep the head, negative counts the tail.
                if opts.lines:
                    if opts.lines >= 0:
                        chunk = b''.join(chunk.splitlines(True)[: opts.lines])
                    else:
                        chunk = b''.join(chunk.splitlines(True)[opts.lines :])
                if opts.bytes:
                    if opts.bytes >= 0:
                        chunk = chunk[: opts.bytes]
                    else:
                        chunk = chunk[opts.bytes :]
            if opts.hexdump:
                # 16 bytes per row: offset, hex column, printable column.
                for i in range(0, len(chunk), 16):
                    s = chunk[i : i + 16]
                    outfile.write(
                        b'%04x: %-47s |%s|\n'
                        % (
                            i,
                            b' '.join(b'%02x' % ord(c) for c in iterbytes(s)),
                            re.sub(b'[^ -~]', b'.', s),
                        )
                    )
            if opts.dump:
                if not quiet:
                    outfile.write(b'>>>\n')
                outfile.write(chunk)
                if not quiet:
                    if chunk.endswith(b'\n'):
                        outfile.write(b'<<<\n')
                    else:
                        outfile.write(b'\n<<< no trailing newline\n')
        if opts.recurse and dirfiles:
            assert not isstdin
            visit(opts, dirfiles, outfile)


if __name__ == "__main__":
    parser = optparse.OptionParser("%prog [options] [filenames]")
    parser.add_option(
        "-t",
        "--type",
        action="store_true",
        help="show file type (file or directory)",
    )
    parser.add_option(
        "-m", "--mode", action="store_true", help="show file mode"
    )
    parser.add_option(
        "-l", "--links", action="store_true", help="show number of links"
    )
    parser.add_option(
        "-s", "--size", action="store_true", help="show size of file"
    )
    parser.add_option(
        "-n", "--newer", action="store", help="check if file is newer (or same)"
    )
    parser.add_option(
        "-r", "--recurse", action="store_true", help="recurse into directories"
    )
    parser.add_option(
        "-S",
        "--sha1",
        action="store_true",
        help="show sha1 hash of the content",
    )
    parser.add_option(
        "",
        "--sha256",
        action="store_true",
        help="show sha256 hash of the content",
    )
    parser.add_option(
        "-M", "--md5", action="store_true", help="show md5 hash of the content"
    )
    parser.add_option(
        "-D", "--dump", action="store_true", help="dump file content"
    )
    parser.add_option(
        "-H", "--hexdump", action="store_true", help="hexdump file content"
    )
    parser.add_option(
        "-B", "--bytes", type="int", help="number of characters to dump"
    )
    parser.add_option(
        "-L", "--lines", type="int", help="number of lines to dump"
    )
    parser.add_option(
        "-q", "--quiet", action="store_true", help="no default output"
    )
    (opts, filenames) = parser.parse_args(sys.argv[1:])
    # Without arguments, inspect standard input.
    if not filenames:
        filenames = ['-']

    # Write bytes: use the binary layer of stdout on Python 3.
    visit(opts, filenames, getattr(sys.stdout, 'buffer', sys.stdout))