Mercurial > hg
view contrib/testparseutil.py @ 45095:8e04607023e5
procutil: ensure that procutil.std{out,err}.write() writes all bytes
Python 3 offers different kind of streams and it’s not guaranteed for all of
them that calling write() writes all bytes.
When Python is started in unbuffered mode, sys.std{out,err}.buffer are
instances of io.FileIO, whose write() can write less bytes for
platform-specific reasons (e.g. Linux has a 0x7ffff000 bytes maximum and could
write less if interrupted by a signal; when writing to Windows consoles, it’s
limited to 32767 bytes to avoid the "not enough space" error). This can lead to
silent loss of data, both when using sys.std{out,err}.buffer (which may in fact
not be a buffered stream) and when using the text streams sys.std{out,err}
(I’ve created a CPython bug report for that:
https://bugs.python.org/issue41221).
Python may fix the problem at some point. For now, we implement our own wrapper
for procutil.std{out,err} that calls the raw stream’s write() method until all
bytes have been written. We don’t use sys.std{out,err} for larger writes, so I
think it’s not worth the effort to patch them.
author | Manuel Jacob <me@manueljacob.de> |
---|---|
date | Fri, 10 Jul 2020 12:27:58 +0200 |
parents | ac3cb5e05a38 |
children | 89a2afe31e82 |
line wrap: on
line source
# testparseutil.py - utilities to parse test script for check tools # # Copyright 2018 FUJIWARA Katsunori <foozy@lares.dti.ne.jp> and others # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from __future__ import absolute_import, print_function import abc import re import sys #################### # for Python3 compatibility (almost comes from mercurial/pycompat.py) ispy3 = sys.version_info[0] >= 3 def identity(a): return a def _rapply(f, xs): if xs is None: # assume None means non-value of optional data return xs if isinstance(xs, (list, set, tuple)): return type(xs)(_rapply(f, x) for x in xs) if isinstance(xs, dict): return type(xs)((_rapply(f, k), _rapply(f, v)) for k, v in xs.items()) return f(xs) def rapply(f, xs): if f is identity: # fast path mainly for py2 return xs return _rapply(f, xs) if ispy3: import builtins def bytestr(s): # tiny version of pycompat.bytestr return s.encode('latin1') def sysstr(s): if isinstance(s, builtins.str): return s return s.decode('latin-1') def opentext(f): return open(f, 'r') else: bytestr = str sysstr = identity opentext = open def b2s(x): # convert BYTES elements in "x" to SYSSTR recursively return rapply(sysstr, x) def writeout(data): # write "data" in BYTES into stdout sys.stdout.write(data) def writeerr(data): # write "data" in BYTES into stderr sys.stderr.write(data) #################### class embeddedmatcher(object): # pytype: disable=ignored-metaclass """Base class to detect embedded code fragments in *.t test script """ __metaclass__ = abc.ABCMeta def __init__(self, desc): self.desc = desc @abc.abstractmethod def startsat(self, line): """Examine whether embedded code starts at line This can return arbitrary object, and it is used as 'ctx' for subsequent method invocations. """ @abc.abstractmethod def endsat(self, ctx, line): """Examine whether embedded code ends at line""" @abc.abstractmethod def isinside(self, ctx, line): """Examine whether line is inside embedded code, if not yet endsat """ @abc.abstractmethod def ignores(self, ctx): """Examine whether detected embedded code should be ignored""" @abc.abstractmethod def filename(self, ctx): """Return filename of embedded code If filename isn't specified for embedded code explicitly, this returns None. """ @abc.abstractmethod def codeatstart(self, ctx, line): """Return actual code at the start line of embedded code This might return None, if the start line doesn't contain actual code. """ @abc.abstractmethod def codeatend(self, ctx, line): """Return actual code at the end line of embedded code This might return None, if the end line doesn't contain actual code. """ @abc.abstractmethod def codeinside(self, ctx, line): """Return actual code at line inside embedded code""" def embedded(basefile, lines, errors, matchers): """pick embedded code fragments up from given lines This is common parsing logic, which examines specified matchers on given lines. :basefile: a name of a file, from which lines to be parsed come. :lines: to be parsed (might be a value returned by "open(basefile)") :errors: an array, into which messages for detected error are stored :matchers: an array of embeddedmatcher objects This function yields '(filename, starts, ends, code)' tuple. :filename: a name of embedded code, if it is explicitly specified (e.g. "foobar" of "cat >> foobar <<EOF"). Otherwise, this is None :starts: line number (1-origin), at which embedded code starts (inclusive) :ends: line number (1-origin), at which embedded code ends (exclusive) :code: extracted embedded code, which is single-stringified >>> class ambigmatcher(object): ... # mock matcher class to examine implementation of ... # "ambiguous matching" corner case ... def __init__(self, desc, matchfunc): ... self.desc = desc ... self.matchfunc = matchfunc ... def startsat(self, line): ... return self.matchfunc(line) >>> ambig1 = ambigmatcher('ambiguous #1', ... lambda l: l.startswith(' $ cat ')) >>> ambig2 = ambigmatcher('ambiguous #2', ... lambda l: l.endswith('<< EOF\\n')) >>> lines = [' $ cat > foo.py << EOF\\n'] >>> errors = [] >>> matchers = [ambig1, ambig2] >>> list(t for t in embedded('<dummy>', lines, errors, matchers)) [] >>> b2s(errors) ['<dummy>:1: ambiguous line for "ambiguous #1", "ambiguous #2"'] """ matcher = None ctx = filename = code = startline = None # for pyflakes for lineno, line in enumerate(lines, 1): if not line.endswith('\n'): line += '\n' # to normalize EOF line if matcher: # now, inside embedded code if matcher.endsat(ctx, line): codeatend = matcher.codeatend(ctx, line) if codeatend is not None: code.append(codeatend) if not matcher.ignores(ctx): yield (filename, startline, lineno, ''.join(code)) matcher = None # DO NOT "continue", because line might start next fragment elif not matcher.isinside(ctx, line): # this is an error of basefile # (if matchers are implemented correctly) errors.append( '%s:%d: unexpected line for "%s"' % (basefile, lineno, matcher.desc) ) # stop extracting embedded code by current 'matcher', # because appearance of unexpected line might mean # that expected end-of-embedded-code line might never # appear matcher = None # DO NOT "continue", because line might start next fragment else: code.append(matcher.codeinside(ctx, line)) continue # examine whether current line starts embedded code or not assert not matcher matched = [] for m in matchers: ctx = m.startsat(line) if ctx: matched.append((m, ctx)) if matched: if len(matched) > 1: # this is an error of matchers, maybe errors.append( '%s:%d: ambiguous line for %s' % ( basefile, lineno, ', '.join(['"%s"' % m.desc for m, c in matched]), ) ) # omit extracting embedded code, because choosing # arbitrary matcher from matched ones might fail to # detect the end of embedded code as expected. continue matcher, ctx = matched[0] filename = matcher.filename(ctx) code = [] codeatstart = matcher.codeatstart(ctx, line) if codeatstart is not None: code.append(codeatstart) startline = lineno else: startline = lineno + 1 if matcher: # examine whether EOF ends embedded code, because embedded # code isn't yet ended explicitly if matcher.endsat(ctx, '\n'): codeatend = matcher.codeatend(ctx, '\n') if codeatend is not None: code.append(codeatend) if not matcher.ignores(ctx): yield (filename, startline, lineno + 1, ''.join(code)) else: # this is an error of basefile # (if matchers are implemented correctly) errors.append( '%s:%d: unexpected end of file for "%s"' % (basefile, lineno, matcher.desc) ) # heredoc limit mark to ignore embedded code at check-code.py or so heredocignorelimit = 'NO_CHECK_EOF' # the pattern to match against cases below, and to return a limit mark # string as 'lname' group # # - << LIMITMARK # - << "LIMITMARK" # - << 'LIMITMARK' heredoclimitpat = r'\s*<<\s*(?P<lquote>["\']?)(?P<limit>\w+)(?P=lquote)' class fileheredocmatcher(embeddedmatcher): """Detect "cat > FILE << LIMIT" style embedded code >>> matcher = fileheredocmatcher('heredoc .py file', r'[^<]+\\.py') >>> b2s(matcher.startsat(' $ cat > file.py << EOF\\n')) ('file.py', ' > EOF\\n') >>> b2s(matcher.startsat(' $ cat >>file.py <<EOF\\n')) ('file.py', ' > EOF\\n') >>> b2s(matcher.startsat(' $ cat> \\x27any file.py\\x27<< "EOF"\\n')) ('any file.py', ' > EOF\\n') >>> b2s(matcher.startsat(" $ cat > file.py << 'ANYLIMIT'\\n")) ('file.py', ' > ANYLIMIT\\n') >>> b2s(matcher.startsat(' $ cat<<ANYLIMIT>"file.py"\\n')) ('file.py', ' > ANYLIMIT\\n') >>> start = ' $ cat > file.py << EOF\\n' >>> ctx = matcher.startsat(start) >>> matcher.codeatstart(ctx, start) >>> b2s(matcher.filename(ctx)) 'file.py' >>> matcher.ignores(ctx) False >>> inside = ' > foo = 1\\n' >>> matcher.endsat(ctx, inside) False >>> matcher.isinside(ctx, inside) True >>> b2s(matcher.codeinside(ctx, inside)) 'foo = 1\\n' >>> end = ' > EOF\\n' >>> matcher.endsat(ctx, end) True >>> matcher.codeatend(ctx, end) >>> matcher.endsat(ctx, ' > EOFEOF\\n') False >>> ctx = matcher.startsat(' $ cat > file.py << NO_CHECK_EOF\\n') >>> matcher.ignores(ctx) True """ _prefix = ' > ' def __init__(self, desc, namepat): super(fileheredocmatcher, self).__init__(desc) # build the pattern to match against cases below (and ">>" # variants), and to return a target filename string as 'name' # group # # - > NAMEPAT # - > "NAMEPAT" # - > 'NAMEPAT' namepat = ( r'\s*>>?\s*(?P<nquote>["\']?)(?P<name>%s)(?P=nquote)' % namepat ) self._fileres = [ # "cat > NAME << LIMIT" case re.compile(r' {2}\$ \s*cat' + namepat + heredoclimitpat), # "cat << LIMIT > NAME" case re.compile(r' {2}\$ \s*cat' + heredoclimitpat + namepat), ] def startsat(self, line): # ctx is (filename, END-LINE-OF-EMBEDDED-CODE) tuple for filere in self._fileres: matched = filere.match(line) if matched: return ( matched.group('name'), ' > %s\n' % matched.group('limit'), ) def endsat(self, ctx, line): return ctx[1] == line def isinside(self, ctx, line): return line.startswith(self._prefix) def ignores(self, ctx): return ' > %s\n' % heredocignorelimit == ctx[1] def filename(self, ctx): return ctx[0] def codeatstart(self, ctx, line): return None # no embedded code at start line def codeatend(self, ctx, line): return None # no embedded code at end line def codeinside(self, ctx, line): return line[len(self._prefix) :] # strip prefix #### # for embedded python script class pydoctestmatcher(embeddedmatcher): """Detect ">>> code" style embedded python code >>> matcher = pydoctestmatcher() >>> startline = ' >>> foo = 1\\n' >>> matcher.startsat(startline) True >>> matcher.startsat(' ... foo = 1\\n') False >>> ctx = matcher.startsat(startline) >>> matcher.filename(ctx) >>> matcher.ignores(ctx) False >>> b2s(matcher.codeatstart(ctx, startline)) 'foo = 1\\n' >>> inside = ' >>> foo = 1\\n' >>> matcher.endsat(ctx, inside) False >>> matcher.isinside(ctx, inside) True >>> b2s(matcher.codeinside(ctx, inside)) 'foo = 1\\n' >>> inside = ' ... foo = 1\\n' >>> matcher.endsat(ctx, inside) False >>> matcher.isinside(ctx, inside) True >>> b2s(matcher.codeinside(ctx, inside)) 'foo = 1\\n' >>> inside = ' expected output\\n' >>> matcher.endsat(ctx, inside) False >>> matcher.isinside(ctx, inside) True >>> b2s(matcher.codeinside(ctx, inside)) '\\n' >>> inside = ' \\n' >>> matcher.endsat(ctx, inside) False >>> matcher.isinside(ctx, inside) True >>> b2s(matcher.codeinside(ctx, inside)) '\\n' >>> end = ' $ foo bar\\n' >>> matcher.endsat(ctx, end) True >>> matcher.codeatend(ctx, end) >>> end = '\\n' >>> matcher.endsat(ctx, end) True >>> matcher.codeatend(ctx, end) """ _prefix = ' >>> ' _prefixre = re.compile(r' {2}(>>>|\.\.\.) ') # If a line matches against not _prefixre but _outputre, that line # is "an expected output line" (= not a part of code fragment). # # Strictly speaking, a line matching against "(#if|#else|#endif)" # is also treated similarly in "inline python code" semantics by # run-tests.py. But "directive line inside inline python code" # should be rejected by Mercurial reviewers. Therefore, this # regexp does not matche against such directive lines. _outputre = re.compile(r' {2}$| {2}[^$]') def __init__(self): super(pydoctestmatcher, self).__init__("doctest style python code") def startsat(self, line): # ctx is "True" return line.startswith(self._prefix) def endsat(self, ctx, line): return not (self._prefixre.match(line) or self._outputre.match(line)) def isinside(self, ctx, line): return True # always true, if not yet ended def ignores(self, ctx): return False # should be checked always def filename(self, ctx): return None # no filename def codeatstart(self, ctx, line): return line[len(self._prefix) :] # strip prefix ' >>> '/' ... ' def codeatend(self, ctx, line): return None # no embedded code at end line def codeinside(self, ctx, line): if self._prefixre.match(line): return line[len(self._prefix) :] # strip prefix ' >>> '/' ... ' return '\n' # an expected output line is treated as an empty line class pyheredocmatcher(embeddedmatcher): """Detect "python << LIMIT" style embedded python code >>> matcher = pyheredocmatcher() >>> b2s(matcher.startsat(' $ python << EOF\\n')) ' > EOF\\n' >>> b2s(matcher.startsat(' $ $PYTHON <<EOF\\n')) ' > EOF\\n' >>> b2s(matcher.startsat(' $ "$PYTHON"<< "EOF"\\n')) ' > EOF\\n' >>> b2s(matcher.startsat(" $ $PYTHON << 'ANYLIMIT'\\n")) ' > ANYLIMIT\\n' >>> matcher.startsat(' $ "$PYTHON" < EOF\\n') >>> start = ' $ python << EOF\\n' >>> ctx = matcher.startsat(start) >>> matcher.codeatstart(ctx, start) >>> matcher.filename(ctx) >>> matcher.ignores(ctx) False >>> inside = ' > foo = 1\\n' >>> matcher.endsat(ctx, inside) False >>> matcher.isinside(ctx, inside) True >>> b2s(matcher.codeinside(ctx, inside)) 'foo = 1\\n' >>> end = ' > EOF\\n' >>> matcher.endsat(ctx, end) True >>> matcher.codeatend(ctx, end) >>> matcher.endsat(ctx, ' > EOFEOF\\n') False >>> ctx = matcher.startsat(' $ python << NO_CHECK_EOF\\n') >>> matcher.ignores(ctx) True """ _prefix = ' > ' _startre = re.compile( r' {2}\$ (\$PYTHON|"\$PYTHON"|python).*' + heredoclimitpat ) def __init__(self): super(pyheredocmatcher, self).__init__("heredoc python invocation") def startsat(self, line): # ctx is END-LINE-OF-EMBEDDED-CODE matched = self._startre.match(line) if matched: return ' > %s\n' % matched.group('limit') def endsat(self, ctx, line): return ctx == line def isinside(self, ctx, line): return line.startswith(self._prefix) def ignores(self, ctx): return ' > %s\n' % heredocignorelimit == ctx def filename(self, ctx): return None # no filename def codeatstart(self, ctx, line): return None # no embedded code at start line def codeatend(self, ctx, line): return None # no embedded code at end line def codeinside(self, ctx, line): return line[len(self._prefix) :] # strip prefix _pymatchers = [ pydoctestmatcher(), pyheredocmatcher(), # use '[^<]+' instead of '\S+', in order to match against # paths including whitespaces fileheredocmatcher('heredoc .py file', r'[^<]+\.py'), ] def pyembedded(basefile, lines, errors): return embedded(basefile, lines, errors, _pymatchers) #### # for embedded shell script _shmatchers = [ # use '[^<]+' instead of '\S+', in order to match against # paths including whitespaces fileheredocmatcher('heredoc .sh file', r'[^<]+\.sh'), ] def shembedded(basefile, lines, errors): return embedded(basefile, lines, errors, _shmatchers) #### # for embedded hgrc configuration _hgrcmatchers = [ # use '[^<]+' instead of '\S+', in order to match against # paths including whitespaces fileheredocmatcher( 'heredoc hgrc file', r'(([^/<]+/)+hgrc|\$HGRCPATH|\${HGRCPATH})' ), ] def hgrcembedded(basefile, lines, errors): return embedded(basefile, lines, errors, _hgrcmatchers) #### if __name__ == "__main__": import optparse import sys def showembedded(basefile, lines, embeddedfunc, opts): errors = [] for name, starts, ends, code in embeddedfunc(basefile, lines, errors): if not name: name = '<anonymous>' writeout("%s:%d: %s starts\n" % (basefile, starts, name)) if opts.verbose and code: writeout(" |%s\n" % "\n |".join(l for l in code.splitlines())) writeout("%s:%d: %s ends\n" % (basefile, ends, name)) for e in errors: writeerr("%s\n" % e) return len(errors) def applyembedded(args, embeddedfunc, opts): ret = 0 if args: for f in args: with opentext(f) as fp: if showembedded(f, fp, embeddedfunc, opts): ret = 1 else: lines = [l for l in sys.stdin.readlines()] if showembedded('<stdin>', lines, embeddedfunc, opts): ret = 1 return ret commands = {} def command(name, desc): def wrap(func): commands[name] = (desc, func) return wrap @command("pyembedded", "detect embedded python script") def pyembeddedcmd(args, opts): return applyembedded(args, pyembedded, opts) @command("shembedded", "detect embedded shell script") def shembeddedcmd(args, opts): return applyembedded(args, shembedded, opts) @command("hgrcembedded", "detect embedded hgrc configuration") def hgrcembeddedcmd(args, opts): return applyembedded(args, hgrcembedded, opts) availablecommands = "\n".join( [" - %s: %s" % (key, value[0]) for key, value in commands.items()] ) parser = optparse.OptionParser( """%prog COMMAND [file ...] Pick up embedded code fragments from given file(s) or stdin, and list up start/end lines of them in standard compiler format ("FILENAME:LINENO:"). Available commands are: """ + availablecommands + """ """ ) parser.add_option( "-v", "--verbose", help="enable additional output (e.g. actual code)", action="store_true", ) (opts, args) = parser.parse_args() if not args or args[0] not in commands: parser.print_help() sys.exit(255) sys.exit(commands[args[0]][1](args[1:], opts))