diff tests/testlib/sigpipe-remote.py @ 47557:ed81f2be5527

test: use a python script in `test-transaction-rollback-on-sigpipe.t` This still does not work on Windows, but at least this is a python script now. Differential Revision: https://phab.mercurial-scm.org/D10947
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Fri, 02 Jul 2021 20:20:37 +0200
parents
children df6148ca7120
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/testlib/sigpipe-remote.py	Fri Jul 02 20:20:37 2021 +0200
@@ -0,0 +1,176 @@
+#!/usr/bin/env python3
+from __future__ import print_function
+
+import os
+import subprocess
+import sys
+import threading
+import time
+
+# we cannot use mercurial.testing as long as python2 is not dropped as the test will only install the mercurial module for python2 in python2 run
+
+
+def _timeout_factor():
+    """return the current modification to timeout"""
+    default = int(os.environ.get('HGTEST_TIMEOUT_DEFAULT', 360))
+    current = int(os.environ.get('HGTEST_TIMEOUT', default))
+    if current == 0:
+        return 1
+    return current / float(default)
+
+
+def wait_file(path, timeout=10):
+    timeout *= _timeout_factor()
+    start = time.time()
+    while not os.path.exists(path):
+        if (time.time() - start) > timeout:
+            raise RuntimeError(b"timed out waiting for file: %s" % path)
+        time.sleep(0.01)
+
+
+def write_file(path, content=b''):
+    with open(path, 'wb') as f:
+        f.write(content)
+
+
+# end of mercurial.testing content
+
+if sys.version_info[0] < 3:
+    print('SIGPIPE-HELPER: script should run with Python 3', file=sys.stderr)
+    sys.exit(255)
+
+
+def sysbytes(s):
+    return s.encode('utf-8')
+
+
+def sysstr(s):
+    return s.decode('latin-1')
+
+
+piped_stdout = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
+piped_stderr = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
+
+stdout_writer = os.fdopen(piped_stdout[1], "rb")
+stdout_reader = os.fdopen(piped_stdout[0], "rb")
+stderr_writer = os.fdopen(piped_stderr[1], "rb")
+stderr_reader = os.fdopen(piped_stderr[0], "rb")
+
+DEBUG_FILE = os.environ.get('SIGPIPE_REMOTE_DEBUG_FILE')
+if DEBUG_FILE is None:
+    debug_stream = sys.stderr.buffer
+else:
+    debug_stream = open(DEBUG_FILE, 'bw', buffering=0)
+
+SYNCFILE1 = os.environ.get('SYNCFILE1')
+SYNCFILE2 = os.environ.get('SYNCFILE2')
+if SYNCFILE1 is None:
+    print('SIGPIPE-HELPER: missing variable $SYNCFILE1', file=sys.stderr)
+    sys.exit(255)
+if SYNCFILE2 is None:
+    print('SIGPIPE-HELPER: missing variable $SYNCFILE2', file=sys.stderr)
+    sys.exit(255)
+
+debug_stream.write(b'SIGPIPE-HELPER: Starting\n')
+
+TESTLIB_DIR = os.path.dirname(sys.argv[0])
+WAIT_SCRIPT = os.path.join(TESTLIB_DIR, 'wait-on-file')
+
+hooks_cmd = '%s 10 %s %s'
+hooks_cmd %= (
+    WAIT_SCRIPT,
+    SYNCFILE2,
+    SYNCFILE1,
+)
+
+cmd = ['hg']
+cmd += sys.argv[1:]
+sub = subprocess.Popen(
+    cmd,
+    bufsize=0,
+    close_fds=True,
+    stdin=sys.stdin,
+    stdout=stdout_writer,
+    stderr=stderr_writer,
+)
+
+debug_stream.write(b'SIGPIPE-HELPER: Mercurial started\n')
+
+
+shut_down = threading.Event()
+
+close_lock = threading.Lock()
+
+
+def _read(stream):
+    try:
+        return stream.read()
+    except ValueError:
+        # read on closed file
+        return None
+
+
+def forward_stdout():
+    while not shut_down.is_set():
+        c = _read(stdout_reader)
+        while c is not None:
+            sys.stdout.buffer.write(c)
+            c = _read(stdout_reader)
+        time.sleep(0.001)
+    with close_lock:
+        if not stdout_reader.closed:
+            stdout_reader.close()
+            debug_stream.write(b'SIGPIPE-HELPER: stdout closed\n')
+
+
+def forward_stderr():
+    while not shut_down.is_set():
+        c = _read(stderr_reader)
+        if c is not None:
+            sys.stderr.buffer.write(c)
+            c = _read(stderr_reader)
+        time.sleep(0.001)
+    with close_lock:
+        if not stderr_reader.closed:
+            stderr_reader.close()
+            debug_stream.write(b'SIGPIPE-HELPER: stderr closed\n')
+
+
+stdout_thread = threading.Thread(target=forward_stdout, daemon=True)
+stderr_thread = threading.Thread(target=forward_stderr, daemon=True)
+
+try:
+    stdout_thread.start()
+    stderr_thread.start()
+
+    debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n')
+
+    try:
+        wait_file(sysbytes(SYNCFILE1))
+    except RuntimeError as exc:
+        msg = sysbytes(str(exc))
+        debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg)
+    else:
+        debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n')
+    with close_lock:
+        if not stdout_reader.closed:
+            stdout_reader.close()
+        if not stderr_reader.closed:
+            stderr_reader.close()
+        sys.stdin.close()
+        debug_stream.write(b'SIGPIPE-HELPER: pipes closed\n')
+    debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n')
+    write_file(sysbytes(SYNCFILE2))
+finally:
+    debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n')
+    shut_down.set()
+    if not sys.stdin.closed:
+        sys.stdin.close()
+    try:
+        sub.wait(timeout=30)
+    except subprocess.TimeoutExpired:
+        msg = b'SIGPIPE-HELPER: Server process failed to terminate\n'
+        debug_stream.write(msg)
+    else:
+        debug_stream.write(b'SIGPIPE-HELPER: Server process terminated\n')
+    debug_stream.write(b'SIGPIPE-HELPER: Shut down\n')