view tests/testlib/sigpipe-remote.py @ 47571:e9c5c368be17

run-tests: also catch double-escapes for $TESTTMP. On Windows, the `$TESTTMP` string "repr" is different from its value, because `\` becomes `\\`. We introduce a new `$STR_REPR_TESTTMP` substitution to match that too. Differential Revision: https://phab.mercurial-scm.org/D11001
author Raphaël Gomès <rgomes@octobus.net>
date Wed, 23 Jun 2021 14:34:55 +0200
parents ed81f2be5527
children df6148ca7120
line wrap: on
line source

#!/usr/bin/env python3
from __future__ import print_function

import os
import subprocess
import sys
import threading
import time

# We cannot import mercurial.testing as long as Python 2 support is not
# dropped: a Python 2 test run only installs the mercurial module for
# Python 2, so the helpers needed from it are duplicated below.


def _timeout_factor():
    """Return the multiplier to apply to timeouts for this test run.

    The factor is ``$HGTEST_TIMEOUT / $HGTEST_TIMEOUT_DEFAULT``.  The default
    falls back to 360 when unset, and the current timeout falls back to the
    default.  A current timeout of ``0`` yields a factor of ``1``.
    """
    env = os.environ
    baseline = int(env.get('HGTEST_TIMEOUT_DEFAULT', 360))
    active = int(env.get('HGTEST_TIMEOUT', baseline))
    # A zero timeout leaves the caller's timeout unscaled.
    return 1 if active == 0 else active / float(baseline)


def wait_file(path, timeout=10):
    """Block until ``path`` exists, polling every 10 milliseconds.

    ``timeout`` (seconds) is scaled by ``_timeout_factor()`` before use.
    Raises ``RuntimeError`` once the scaled timeout has elapsed without the
    file appearing.  The error message is built with bytes formatting.
    """
    allowed = timeout * _timeout_factor()
    began = time.time()
    while True:
        if os.path.exists(path):
            return
        elapsed = time.time() - began
        if elapsed > allowed:
            raise RuntimeError(b"timed out waiting for file: %s" % path)
        time.sleep(0.01)


def write_file(path, content=b''):
    """Create (or truncate) ``path`` and write the bytes ``content`` to it."""
    with open(path, mode='wb') as handle:
        handle.write(content)


# end of mercurial.testing content

# Refuse to go any further under Python 2: report on stderr and exit with
# status 255.
if sys.version_info < (3,):
    print('SIGPIPE-HELPER: script should run with Python 3', file=sys.stderr)
    sys.exit(255)


def sysbytes(s):
    """Return the native string ``s`` encoded to bytes as UTF-8."""
    encoded = s.encode(encoding='utf-8')
    return encoded


def sysstr(s):
    """Return the bytes ``s`` decoded to a native string as latin-1."""
    decoded = s.decode(encoding='latin-1')
    return decoded


# Create the two pipes placed between the `hg` child process and this
# script's own stdout/stderr.  os.pipe2() returns a (read_fd, write_fd) pair;
# both ends are created non-blocking and close-on-exec.
piped_stdout = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
piped_stderr = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)

# Wrap the raw descriptors in file objects: index 1 is the write end handed
# to the child below, index 0 is the read end polled by the forwarding
# threads.
# NOTE(review): the write ends are opened with mode "rb".  In the code
# visible here they are only passed to subprocess.Popen and never written to
# through these objects, so the mode looks harmless -- confirm before
# changing it.
stdout_writer = os.fdopen(piped_stdout[1], "rb")
stdout_reader = os.fdopen(piped_stdout[0], "rb")
stderr_writer = os.fdopen(piped_stderr[1], "rb")
stderr_reader = os.fdopen(piped_stderr[0], "rb")

# Progress messages go to the unbuffered binary file named by
# $SIGPIPE_REMOTE_DEBUG_FILE when it is set, and to our own stderr otherwise.
DEBUG_FILE = os.environ.get('SIGPIPE_REMOTE_DEBUG_FILE')
if DEBUG_FILE is None:
    debug_stream = sys.stderr.buffer
else:
    debug_stream = open(DEBUG_FILE, 'bw', buffering=0)

# Both synchronisation file paths are mandatory; exit with status 255 if
# either environment variable is missing.
SYNCFILE1 = os.environ.get('SYNCFILE1')
SYNCFILE2 = os.environ.get('SYNCFILE2')
if SYNCFILE1 is None:
    print('SIGPIPE-HELPER: missing variable $SYNCFILE1', file=sys.stderr)
    sys.exit(255)
if SYNCFILE2 is None:
    print('SIGPIPE-HELPER: missing variable $SYNCFILE2', file=sys.stderr)
    sys.exit(255)

debug_stream.write(b'SIGPIPE-HELPER: Starting\n')

# The `wait-on-file` script is looked up next to this script.
TESTLIB_DIR = os.path.dirname(sys.argv[0])
WAIT_SCRIPT = os.path.join(TESTLIB_DIR, 'wait-on-file')

# NOTE(review): hooks_cmd is built here but not referenced anywhere else in
# the visible code -- confirm whether it is still needed.
hooks_cmd = '%s 10 %s %s'
hooks_cmd %= (
    WAIT_SCRIPT,
    SYNCFILE2,
    SYNCFILE1,
)

# Run `hg` with the arguments given to this script.  The child inherits our
# stdin and writes its stdout/stderr into the pipes created above.
cmd = ['hg']
cmd += sys.argv[1:]
sub = subprocess.Popen(
    cmd,
    bufsize=0,
    close_fds=True,
    stdin=sys.stdin,
    stdout=stdout_writer,
    stderr=stderr_writer,
)

debug_stream.write(b'SIGPIPE-HELPER: Mercurial started\n')


# Set by the main sequence to tell the forwarding threads to stop.
shut_down = threading.Event()

# Held while closing the pipe read ends, which both the main sequence and the
# forwarding threads may attempt.
close_lock = threading.Lock()


def _read(stream):
    """Return the result of ``stream.read()``, or ``None`` if it is closed.

    Reading from a closed file object raises ``ValueError``; that case is
    mapped to ``None`` so callers can keep polling without special handling.
    """
    try:
        data = stream.read()
    except ValueError:
        # The stream was closed under us.
        data = None
    return data


def forward_stdout():
    """Relay the child's stdout pipe to our own stdout until shutdown.

    Thread target.  Until ``shut_down`` is set, repeatedly drains
    ``stdout_reader`` (stopping each drain when ``_read`` returns ``None``)
    and pauses one millisecond between drains.  On exit it closes the reader
    under ``close_lock`` unless it has already been closed.
    """
    while True:
        if shut_down.is_set():
            break
        # iter() with a sentinel keeps calling _read() until it yields None.
        for chunk in iter(lambda: _read(stdout_reader), None):
            sys.stdout.buffer.write(chunk)
        time.sleep(0.001)
    with close_lock:
        if stdout_reader.closed:
            return
        stdout_reader.close()
        debug_stream.write(b'SIGPIPE-HELPER: stdout closed\n')


def forward_stderr():
    """Relay the child's stderr pipe to our own stderr until shutdown.

    Thread target.  Until ``shut_down`` is set, repeatedly drains
    ``stderr_reader`` and pauses one millisecond between drains.  On exit it
    closes the reader under ``close_lock`` unless it has already been closed.
    """
    while not shut_down.is_set():
        c = _read(stderr_reader)
        # Drain everything currently available, exactly like
        # forward_stdout().  A plain `if` here would read a second chunk and
        # then drop it when the outer loop overwrites `c`.
        while c is not None:
            sys.stderr.buffer.write(c)
            c = _read(stderr_reader)
        time.sleep(0.001)
    with close_lock:
        if not stderr_reader.closed:
            stderr_reader.close()
            debug_stream.write(b'SIGPIPE-HELPER: stderr closed\n')


# Main sequence: forward the child's output until SYNCFILE1 shows up, then
# close our pipe read ends and stdin, signal that through SYNCFILE2, and
# finally wait for the child to exit.
#
# The forwarding threads are daemon threads, so they do not keep the
# interpreter alive once the main thread is done.
stdout_thread = threading.Thread(target=forward_stdout, daemon=True)
stderr_thread = threading.Thread(target=forward_stderr, daemon=True)

try:
    stdout_thread.start()
    stderr_thread.start()

    debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n')

    # Wait for SYNCFILE1 to appear.  A timeout is only logged; the sequence
    # carries on either way.
    # NOTE(review): presumably SYNCFILE1 is created from the hg side once it
    # is ready to have its pipes cut -- confirm against the calling test.
    try:
        wait_file(sysbytes(SYNCFILE1))
    except RuntimeError as exc:
        msg = sysbytes(str(exc))
        debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg)
    else:
        debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n')
    # Close both pipe read ends and our stdin.  The lock is held because the
    # forwarding threads may close the readers themselves on shutdown.
    # NOTE(review): presumably this is what makes the child's next write fail
    # with a broken pipe -- confirm.
    with close_lock:
        if not stdout_reader.closed:
            stdout_reader.close()
        if not stderr_reader.closed:
            stderr_reader.close()
        sys.stdin.close()
        debug_stream.write(b'SIGPIPE-HELPER: pipes closed\n')
    # Create SYNCFILE2 (empty) now that the pipes are closed.
    debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n')
    write_file(sysbytes(SYNCFILE2))
finally:
    debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n')
    # Ask the forwarding threads to stop; they are not joined.
    shut_down.set()
    if not sys.stdin.closed:
        sys.stdin.close()
    # Give the child up to 30 seconds to exit; a timeout is only logged.
    try:
        sub.wait(timeout=30)
    except subprocess.TimeoutExpired:
        msg = b'SIGPIPE-HELPER: Server process failed to terminate\n'
        debug_stream.write(msg)
    else:
        debug_stream.write(b'SIGPIPE-HELPER: Server process terminated\n')
    debug_stream.write(b'SIGPIPE-HELPER: Shut down\n')