tests/run-tests.py
author Thomas Arendsen Hein <thomas@intevation.de>
Tue, 19 Aug 2008 18:17:11 +0200
changeset 6908 b77c25c2d6c0
parent 6681 6a6ef726a2b3
child 6982 9fc5bf4adbcf
permissions -rwxr-xr-x
Do not sort hgwebdir repositories if python list or tuple is given. This was introduced by changeset f67d1468ac50 (util: add sort helper)

#!/usr/bin/env python
#
# run-tests.py - Run a set of tests on Mercurial
#
# Copyright 2006 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms
# of the GNU General Public License, incorporated herein by reference.

import difflib
import errno
import optparse
import os
import popen2
import shutil
import signal
import sys
import tempfile
import time

# Exit status a test returns to ask for being skipped (used by hghave),
# and the prefix of the output lines that give the reason.
SKIPPED_STATUS = 80
SKIPPED_PREFIX = 'skipped: '

# External programs looked up on PATH before the tests start.
required_tools = [
    "python",
    "diff",
    "grep",
    "unzip",
    "gunzip",
    "bunzip2",
    "sed",
]

# option name -> (environment variable consulted, value used otherwise)
defaults = dict(
    jobs=('HGTEST_JOBS', 1),
    timeout=('HGTEST_TIMEOUT', 180),
    port=('HGTEST_PORT', 20059)
)

# Command-line interface.  Options whose default can come from the
# environment mention the variable and the fallback value in their help.
parser = optparse.OptionParser("%prog [options] [tests]")
parser.add_option("-C", "--annotate", action="store_true",
    help="output files annotated with coverage")
parser.add_option("--child", type="int",
    help="run as child process, summary to given fd")
parser.add_option("-c", "--cover", action="store_true",
    help="print a test coverage report")
parser.add_option("-f", "--first", action="store_true",
    help="exit on the first test failure")
parser.add_option("-i", "--interactive", action="store_true",
    help="prompt to accept changed output")
parser.add_option("-j", "--jobs", type="int",
    help="number of jobs to run in parallel"
         " (default: $%s or %d)" % defaults['jobs'])
parser.add_option("--keep-tmpdir", action="store_true",
    help="keep temporary directory after running tests"
         " (best used with --tmpdir)")
parser.add_option("-R", "--restart", action="store_true",
    help="restart at last error")
parser.add_option("-p", "--port", type="int",
    help="port on which servers should listen"
         " (default: $%s or %d)" % defaults['port'])
parser.add_option("-r", "--retest", action="store_true",
    help="retest failed tests")
# "--cover-stdlib" is accepted as well: run_children() rebuilds the child
# command line from the option destinations with '_' turned into '-', so
# without the alias children started with -s would reject their options.
parser.add_option("-s", "--cover_stdlib", "--cover-stdlib",
    action="store_true",
    help="print a test coverage report inc. standard libraries")
parser.add_option("-t", "--timeout", type="int",
    help="kill errant tests after TIMEOUT seconds"
         " (default: $%s or %d)" % defaults['timeout'])
parser.add_option("--tmpdir", type="string",
    help="run tests in the given temporary directory")
parser.add_option("-v", "--verbose", action="store_true",
    help="output verbose messages")
parser.add_option("--with-hg", type="string",
    help="test existing install at given location")

# Replace each (environment variable, fallback) pair by the integer it
# resolves to, then install the results as the option defaults.
for option, default in defaults.items():
    defaults[option] = int(os.environ.get(*default))
parser.set_defaults(**defaults)
(options, args) = parser.parse_args()
verbose = options.verbose
# any of the three coverage-related switches enables coverage collection
coverage = options.cover or options.cover_stdlib or options.annotate
# command used to run Python tests; install_hg() replaces it when
# coverage is enabled
python = sys.executable

# Reject option values and combinations that are not supported.
if options.jobs < 1:
    print >> sys.stderr, 'ERROR: -j/--jobs must be positive'
    sys.exit(1)
if options.interactive and options.jobs > 1:
    print >> sys.stderr, 'ERROR: cannot mix --interactive and --jobs > 1'
    sys.exit(1)

def rename(src, dst):
    """Move src to dst by copying it and then deleting the original.

    Unlike os.rename() this works when dst already exists; the price is
    that the move is not atomic and is less friendly to opened files.
    """
    shutil.copy(src, dst)
    os.remove(src)

def vlog(*msg):
    if verbose:
        for m in msg:
            print m,
        print

def splitnewlines(text):
    '''like str.splitlines, but only split on newlines.
    keep line endings.

    A trailing piece without a newline is returned as the last item;
    an empty text gives an empty list.'''
    pieces = text.split('\n')
    # every piece but the last was followed by a newline in the input
    lines = [piece + '\n' for piece in pieces[:-1]]
    tail = pieces[-1]
    if tail:
        lines.append(tail)
    return lines

def extract_missing_features(lines):
    '''Return the text following SKIPPED_PREFIX on each line that has it

    Lines not starting with the prefix are ignored.  Of the others only
    the part up to the first line break is kept, with the prefix removed.'''
    skip = len(SKIPPED_PREFIX)
    return [entry.splitlines()[0][skip:]
            for entry in lines
            if entry.startswith(SKIPPED_PREFIX)]

def show_diff(expected, output):
    """Write a unified diff between the two lists of lines to stdout.

    expected is labelled "Expected output", output "Test output".
    """
    diff = difflib.unified_diff(expected, output,
                                "Expected output", "Test output")
    sys.stdout.writelines(diff)

def find_program(program):
    """Search PATH for an executable program

    The directories listed in the PATH environment variable (os.defpath
    if it is unset) are tried in order.  Return the path of the first
    executable regular file named program, or None if there is none.
    """
    for p in os.environ.get('PATH', os.defpath).split(os.pathsep):
        name = os.path.join(p, program)
        # os.access(X_OK) alone is also true for searchable directories
        if os.path.isfile(name) and os.access(name, os.X_OK):
            return name
    return None

def check_required_tools():
    # Before we go any further, check for pre-requisite tools
    # stuff from coreutils (cat, rm, etc) are not tested
    for p in required_tools:
        if os.name == 'nt':
            p += '.exe'
        found = find_program(p)
        if found:
            vlog("# Found prerequisite", p, "at", found)
        else:
            print "WARNING: Did not find prerequisite tool: "+p

def cleanup_exit():
    """Delete the HGTMP tree, unless --keep-tmpdir was given.

    Errors while deleting are ignored.
    """
    if options.keep_tmpdir:
        return
    vlog("# Cleaning up HGTMP", HGTMP)
    shutil.rmtree(HGTMP, True)

def use_correct_python():
    """Make 'python' in the test PATH be the interpreter running us.

    Some tests run the python interpreter; they must use the same
    interpreter we use or bad things will happen.  Nothing is done when
    the 'python' found on PATH already lives in the directory of
    sys.executable.  Otherwise sys.executable is symlinked to
    BINDIR/python, or copied there where os.symlink does not exist.
    """
    exedir, exename = os.path.split(sys.executable)
    if exename == 'python':
        path = find_program('python')
        # find_program() returns None when no python is on PATH at all
        if path and os.path.dirname(path) == exedir:
            return
    vlog('# Making python executable in test path use correct Python')
    my_python = os.path.join(BINDIR, 'python')
    try:
        os.symlink(sys.executable, my_python)
    except AttributeError:
        # windows fallback
        shutil.copyfile(sys.executable, my_python)
        shutil.copymode(sys.executable, my_python)

def install_hg():
    """Install Mercurial into INST and point the environment at it.

    setup.py is run from the parent directory of this script, with its
    output sent to tests/install.err.  If it fails, that output is printed
    and the process exits with status 1.  Afterwards BINDIR is prepended
    to PATH, PYTHONDIR and TESTDIR are prepended to PYTHONPATH, and
    use_correct_python() is called.  When coverage is enabled, the
    installed hg script is replaced by a wrapper that runs it through
    coverage.py, and the global 'python' command is changed likewise.
    """
    global python
    vlog("# Performing temporary installation of HG")
    # relative to the hg root we chdir to below
    installerrs = os.path.join("tests", "install.err")

    # Run installer in hg root
    os.chdir(os.path.join(os.path.dirname(sys.argv[0]), '..'))
    cmd = ('%s setup.py clean --all'
           ' install --force --home="%s" --install-lib="%s"'
           ' --install-scripts="%s" >%s 2>&1'
           % (sys.executable, INST, PYTHONDIR, BINDIR, installerrs))
    vlog("# Running", cmd)
    if os.system(cmd) == 0:
        # keep the installer log only in verbose mode
        if not verbose:
            os.remove(installerrs)
    else:
        # installation failed: show the log and give up
        f = open(installerrs)
        for line in f:
            print line,
        f.close()
        sys.exit(1)
    os.chdir(TESTDIR)

    # the freshly installed scripts take precedence over anything on PATH
    os.environ["PATH"] = "%s%s%s" % (BINDIR, os.pathsep, os.environ["PATH"])

    # prepend the installed library and the test directory to PYTHONPATH,
    # keeping whatever was set before
    pydir = os.pathsep.join([PYTHONDIR, TESTDIR])
    pythonpath = os.environ.get("PYTHONPATH")
    if pythonpath:
        pythonpath = pydir + os.pathsep + pythonpath
    else:
        pythonpath = pydir
    os.environ["PYTHONPATH"] = pythonpath

    use_correct_python()

    if coverage:
        vlog("# Installing coverage wrapper")
        os.environ['COVERAGE_FILE'] = COVERAGE_FILE
        # start from scratch: drop the data file of a previous run
        if os.path.exists(COVERAGE_FILE):
            os.unlink(COVERAGE_FILE)
        # Create a wrapper script to invoke hg via coverage.py
        os.rename(os.path.join(BINDIR, "hg"), os.path.join(BINDIR, "_hg.py"))
        f = open(os.path.join(BINDIR, 'hg'), 'w')
        f.write('#!' + sys.executable + '\n')
        f.write('import sys, os; os.execv(sys.executable, [sys.executable, '
                '"%s", "-x", "%s"] + sys.argv[1:])\n' %
                (os.path.join(TESTDIR, 'coverage.py'),
                 os.path.join(BINDIR, '_hg.py')))
        f.close()
        os.chmod(os.path.join(BINDIR, 'hg'), 0700)
        # Python tests are run through coverage.py as well
        python = '"%s" "%s" -x' % (sys.executable,
                                   os.path.join(TESTDIR,'coverage.py'))

def output_coverage():
    """Run coverage.py from PYTHONDIR to print the coverage report.

    BINDIR, TESTDIR and PYTHONDIR are always omitted from the report, and
    so are the sys.path entries unless --cover_stdlib was given.  With
    --annotate, annotated files are also written to TESTDIR/annotated.
    """
    vlog("# Producing coverage report")
    covscript = os.path.join(TESTDIR, 'coverage.py')
    omit = [BINDIR, TESTDIR, PYTHONDIR]
    if not options.cover_stdlib:
        # Exclude as system paths (ignoring empty strings seen on win)
        omit.extend([x for x in sys.path if x != ''])
    omit = ','.join(omit)
    os.chdir(PYTHONDIR)
    cmd = '"%s" "%s" -i -r "--omit=%s"' % (sys.executable, covscript, omit)
    vlog("# Running: "+cmd)
    os.system(cmd)
    if not options.annotate:
        return
    adir = os.path.join(TESTDIR, 'annotated')
    if not os.path.isdir(adir):
        os.mkdir(adir)
    cmd = '"%s" "%s" -i -a "--directory=%s" "--omit=%s"' % (
        sys.executable, covscript, adir, omit)
    vlog("# Running: "+cmd)
    os.system(cmd)

class Timeout(Exception):
    """Raised by the alarm signal handler when a test runs too long."""

def alarmed(signum, frame):
    """Signal handler: turn the signal into a Timeout exception."""
    raise Timeout()

def run(cmd):
    """Run command in a sub-process, capturing the output (stdout and stderr).

    Return the exit code and the output as a list of lines with their
    line endings kept.  Outside Windows, a Timeout raised while waiting
    for the command gets the child a SIGTERM, and an abort notice is
    appended to whatever output had been collected."""
    # TODO: Use subprocess.Popen if we're running on Python 2.4
    if os.name == 'nt':
        tochild, fromchild = os.popen4(cmd)
        tochild.close()
        output = fromchild.read()
        ret = fromchild.close()
        # close() gives None rather than 0 for a successful command
        if ret is None:
            ret = 0
    else:
        proc = popen2.Popen4(cmd)
        try:
            # defined up front so the timeout handler can append to it
            output = ''
            proc.tochild.close()
            output = proc.fromchild.read()
            ret = proc.wait()
            if os.WIFEXITED(ret):
                ret = os.WEXITSTATUS(ret)
        except Timeout:
            vlog('# Process %d timed out - killing it' % proc.pid)
            os.kill(proc.pid, signal.SIGTERM)
            ret = proc.wait()
            # never report success for a test that had to be killed
            if ret == 0:
                ret = signal.SIGTERM << 8
            output += ("\n### Abort: timeout after %d seconds.\n"
                       % options.timeout)
    return ret, splitnewlines(output)

def run_one(test, skips, fails):
    '''tristate output:
    None -> skipped
    True -> passed
    False -> failed'''

    def skip(msg):
        if not verbose:
            skips.append((test, msg))
        else:
            print "\nSkipping %s: %s" % (test, msg)
        return None

    def fail(msg):
        fails.append((test, msg))
        print "\nERROR: %s %s" % (test, msg)
        return None

    vlog("# Test", test)

    # create a fresh hgrc
    hgrc = file(HGRCPATH, 'w+')
    hgrc.write('[ui]\n')
    hgrc.write('slash = True\n')
    hgrc.write('[defaults]\n')
    hgrc.write('backout = -d "0 0"\n')
    hgrc.write('commit = -d "0 0"\n')
    hgrc.write('debugrawcommit = -d "0 0"\n')
    hgrc.write('tag = -d "0 0"\n')
    hgrc.close()

    err = os.path.join(TESTDIR, test+".err")
    ref = os.path.join(TESTDIR, test+".out")
    testpath = os.path.join(TESTDIR, test)

    if os.path.exists(err):
        os.remove(err)       # Remove any previous output files

    # Make a tmp subdirectory to work in
    tmpd = os.path.join(HGTMP, test)
    os.mkdir(tmpd)
    os.chdir(tmpd)

    try:
        tf = open(testpath)
        firstline = tf.readline().rstrip()
        tf.close()
    except:
        firstline = ''
    lctest = test.lower()

    if lctest.endswith('.py') or firstline == '#!/usr/bin/env python':
        cmd = '%s "%s"' % (python, testpath)
    elif lctest.endswith('.bat'):
        # do not run batch scripts on non-windows
        if os.name != 'nt':
            return skip("batch script")
        # To reliably get the error code from batch files on WinXP,
        # the "cmd /c call" prefix is needed. Grrr
        cmd = 'cmd /c call "%s"' % testpath
    else:
        # do not run shell scripts on windows
        if os.name == 'nt':
            return skip("shell script")
        # do not try to run non-executable programs
        if not os.access(testpath, os.X_OK):
            return skip("not executable")
        cmd = '"%s"' % testpath

    if options.timeout > 0:
        signal.alarm(options.timeout)

    vlog("# Running", cmd)
    ret, out = run(cmd)
    vlog("# Ret was:", ret)

    if options.timeout > 0:
        signal.alarm(0)

    skipped = (ret == SKIPPED_STATUS)
    # If reference output file exists, check test output against it
    if os.path.exists(ref):
        f = open(ref, "r")
        ref_out = splitnewlines(f.read())
        f.close()
    else:
        ref_out = []
    if skipped:
        missing = extract_missing_features(out)
        if not missing:
            missing = ['irrelevant']
        skip(missing[-1])
    elif out != ref_out:
        if ret:
            fail("output changed and returned error code %d" % ret)
        else:
            fail("output changed")
        show_diff(ref_out, out)
        ret = 1
    elif ret:
        fail("returned error code %d" % ret)

    if not verbose:
        sys.stdout.write(skipped and 's' or '.')
        sys.stdout.flush()

    if ret != 0 and not skipped:
        # Save errors to a file for diagnosis
        f = open(err, "wb")
        for line in out:
            f.write(line)
        f.close()

    # Kill off any leftover daemon processes
    try:
        fp = file(DAEMON_PIDS)
        for line in fp:
            try:
                pid = int(line)
            except ValueError:
                continue
            try:
                os.kill(pid, 0)
                vlog('# Killing daemon process %d' % pid)
                os.kill(pid, signal.SIGTERM)
                time.sleep(0.25)
                os.kill(pid, 0)
                vlog('# Daemon process %d is stuck - really killing it' % pid)
                os.kill(pid, signal.SIGKILL)
            except OSError, err:
                if err.errno != errno.ESRCH:
                    raise
        fp.close()
        os.unlink(DAEMON_PIDS)
    except IOError:
        pass

    os.chdir(TESTDIR)
    if not options.keep_tmpdir:
        shutil.rmtree(tmpd, True)
    if skipped:
        return None
    return ret == 0

# Only done when not running as a child process (no --child given).
if not options.child:
    os.umask(022)

    check_required_tools()

# Reset some environment variables to well-known values so that
# the tests produce repeatable output.
os.environ['LANG'] = os.environ['LC_ALL'] = 'C'
os.environ['TZ'] = 'GMT'
os.environ["EMAIL"] = "Foo Bar <foo.bar@example.com>"

# TESTDIR is the directory the script was started in; HGTMP is a fresh
# temporary directory (created below --tmpdir if that was given).
TESTDIR = os.environ["TESTDIR"] = os.getcwd()
HGTMP = os.environ['HGTMP'] = tempfile.mkdtemp('', 'hgtests.', options.tmpdir)
# both are set by run_tests()
DAEMON_PIDS = None
HGRCPATH = None

# The editor is a command that exits successfully right away.
os.environ["HGEDITOR"] = sys.executable + ' -c "import sys; sys.exit(0)"'
os.environ["HGMERGE"] = "internal:merge"
os.environ["HGUSER"]   = "test"
os.environ["HGENCODING"] = "ascii"
os.environ["HGENCODINGMODE"] = "strict"
# Three consecutive ports, starting at --port, are exported to the tests.
os.environ["HGPORT"] = str(options.port)
os.environ["HGPORT1"] = str(options.port + 1)
os.environ["HGPORT2"] = str(options.port + 2)

# Installation prefix: the location given with --with-hg, or else a
# directory inside HGTMP.
if options.with_hg:
    INST = options.with_hg
else:
    INST = os.path.join(HGTMP, "install")
BINDIR = os.path.join(INST, "bin")
PYTHONDIR = os.path.join(INST, "lib", "python")
COVERAGE_FILE = os.path.join(TESTDIR, ".coverage")

def run_children(tests):
    if not options.with_hg:
        install_hg()

    optcopy = dict(options.__dict__)
    optcopy['jobs'] = 1
    optcopy['with_hg'] = INST
    opts = []
    for opt, value in optcopy.iteritems():
        name = '--' + opt.replace('_', '-')
        if value is True:
            opts.append(name)
        elif value is not None:
            opts.append(name + '=' + str(value))

    tests.reverse()
    jobs = [[] for j in xrange(options.jobs)]
    while tests:
        for j in xrange(options.jobs):
            if not tests: break
            jobs[j].append(tests.pop())
    fps = {}
    for j in xrange(len(jobs)):
        job = jobs[j]
        if not job:
            continue
        rfd, wfd = os.pipe()
        childopts = ['--child=%d' % wfd, '--port=%d' % (options.port + j * 3)]
        cmdline = [python, sys.argv[0]] + opts + childopts + job
        vlog(' '.join(cmdline))
        fps[os.spawnvp(os.P_NOWAIT, cmdline[0], cmdline)] = os.fdopen(rfd, 'r')
        os.close(wfd)
    failures = 0
    tested, skipped, failed = 0, 0, 0
    skips = []
    fails = []
    while fps:
        pid, status = os.wait()
        fp = fps.pop(pid)
        l = fp.read().splitlines()
        test, skip, fail = map(int, l[:3])
        split = -fail or len(l)
        for s in l[3:split]:
            skips.append(s.split(" ", 1))
        for s in l[split:]:
            fails.append(s.split(" ", 1))
        tested += test
        skipped += skip
        failed += fail
        vlog('pid %d exited, status %d' % (pid, status))
        failures |= status
    print
    for s in skips:
        print "Skipped %s: %s" % (s[0], s[1])
    for s in fails:
        print "Failed %s: %s" % (s[0], s[1])
    print "# Ran %d tests, %d skipped, %d failed." % (
        tested, skipped, failed)
    sys.exit(failures != 0)

def run_tests(tests):
    """Run the given tests one after another in this process.

    Sets the DAEMON_PIDS and HGRCPATH globals and environment variables,
    installs Mercurial unless --with-hg was given, and arms the SIGALRM
    handler when a timeout is requested.  The summary is printed, or
    written to the fd given with --child.  Exits with status 1 if a test
    failed or the run was interrupted.
    """
    global DAEMON_PIDS, HGRCPATH
    DAEMON_PIDS = os.environ["DAEMON_PIDS"] = os.path.join(HGTMP, 'daemon.pids')
    HGRCPATH = os.environ["HGRCPATH"] = os.path.join(HGTMP, '.hgrc')

    try:
        if not options.with_hg:
            install_hg()

        if options.timeout > 0:
            try:
                signal.signal(signal.SIGALRM, alarmed)
                vlog('# Running tests with %d-second timeout' %
                     options.timeout)
            except AttributeError:
                # the signal module has no SIGALRM here
                print 'WARNING: cannot run tests with timeouts'
                options.timeout = 0

        tested = 0
        failed = 0
        skipped = 0

        if options.restart:
            # drop the leading tests up to the first one that has a .err
            # file; if no test has one, run them all
            orig = list(tests)
            while tests:
                if os.path.exists(tests[0] + ".err"):
                    break
                tests.pop(0)
            if not tests:
                print "running all tests"
                tests = orig

        skips = []
        fails = []
        for test in tests:
            # with --retest, tests without a .err file count as skipped
            if options.retest and not os.path.exists(test + ".err"):
                skipped += 1
                continue
            ret = run_one(test, skips, fails)
            if ret is None:
                skipped += 1
            elif not ret:
                if options.interactive:
                    print "Accept this change? [n] ",
                    answer = sys.stdin.readline().strip()
                    if answer.lower() in "y yes".split():
                        # the new output becomes the reference output and
                        # the failure just recorded is forgotten
                        rename(test + ".err", test + ".out")
                        tested += 1
                        fails.pop()
                        continue
                failed += 1
                if options.first:
                    break
            tested += 1

        if options.child:
            # report to the parent: three counter lines, then the skip
            # lines, then the failure lines
            fp = os.fdopen(options.child, 'w')
            fp.write('%d\n%d\n%d\n' % (tested, skipped, failed))
            for s in skips:
                fp.write("%s %s\n" % s)
            for s in fails:
                fp.write("%s %s\n" % s)
            fp.close()
        else:
            print
            for s in skips:
                print "Skipped %s: %s" % s
            for s in fails:
                print "Failed %s: %s" % s
            print "# Ran %d tests, %d skipped, %d failed." % (
                tested, skipped, failed)

        if coverage:
            output_coverage()
    except KeyboardInterrupt:
        # make sure an interrupted run exits with a failure status
        failed = True
        print "\ninterrupted!"

    if failed:
        sys.exit(1)

# Without test names on the command line, consider every entry of the
# current directory, in sorted order.
if not args:
    args = os.listdir(".")
    args.sort()

# Keep the names that start with "test-", contain no '~', and either have
# no '.' at all or end in .py or .bat.
tests = [test for test in args
         if test.startswith("test-") and '~' not in test and
            ('.' not in test or test.endswith('.py') or
             test.endswith('.bat'))]

vlog("# Using TESTDIR", TESTDIR)
vlog("# Using HGTMP", HGTMP)

# Run in parallel only if there is more than one test and more than one
# job; the temporary directory is cleaned up whatever happens.
try:
    parallel = options.jobs > 1 and len(tests) > 1
    if parallel:
        run_children(tests)
    else:
        run_tests(tests)
finally:
    cleanup_exit()