Mercurial > hg
view tests/testlib/sigpipe-remote.py @ 50706:0452af304808
stream-clone: add a v3 version of the protocol
This new version is less rigid regarding the extract number of files and number
of bytes to be actually transfered, it also lays the groundwork for other
improvements.
The format stays experimental, but this is an interesting base to build upon.
author | Arseniy Alekseyev <aalekseyev@janestreet.com> |
---|---|
date | Thu, 01 Jun 2023 17:39:22 +0100 |
parents | 70df51a2c2ce |
children |
line wrap: on
line source
#!/usr/bin/env python3 import io import os import subprocess import sys import time if isinstance(sys.stdout.buffer, io.BufferedWriter): print('SIGPIPE-HELPER: script need unbuffered output', file=sys.stderr) sys.exit(255) DEBUG_FILE = os.environ.get('SIGPIPE_REMOTE_DEBUG_FILE') if DEBUG_FILE is None: debug_stream = sys.stderr.buffer else: debug_stream = open(DEBUG_FILE, 'bw', buffering=0) SYNCFILE1 = os.environ.get('SYNCFILE1') SYNCFILE2 = os.environ.get('SYNCFILE2') if SYNCFILE1 is None: print('SIGPIPE-HELPER: missing variable $SYNCFILE1', file=sys.stderr) sys.exit(255) if SYNCFILE2 is None: print('SIGPIPE-HELPER: missing variable $SYNCFILE2', file=sys.stderr) sys.exit(255) def _timeout_factor(): """return the current modification to timeout""" default = int(os.environ.get('HGTEST_TIMEOUT_DEFAULT', 360)) current = int(os.environ.get('HGTEST_TIMEOUT', default)) if current == 0: return 1 return current / float(default) def wait_file(path, timeout=10): timeout *= _timeout_factor() start = time.time() while not os.path.exists(path): if (time.time() - start) > timeout: raise RuntimeError(b"timed out waiting for file: %s" % path) time.sleep(0.01) def write_file(path, content=b''): with open(path, 'wb') as f: f.write(content) # end of mercurial.testing content def sysbytes(s): return s.encode('utf-8') def sysstr(s): return s.decode('latin-1') debug_stream.write(b'SIGPIPE-HELPER: Starting\n') TESTLIB_DIR = os.path.dirname(sys.argv[0]) WAIT_SCRIPT = os.path.join(TESTLIB_DIR, 'wait-on-file') hooks_cmd = '%s 10 %s %s' hooks_cmd %= ( WAIT_SCRIPT, SYNCFILE2, SYNCFILE1, ) try: cmd = ['hg'] cmd += sys.argv[1:] sub = subprocess.Popen( cmd, bufsize=0, close_fds=True, stdin=sys.stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) basedir = os.path.dirname(sys.argv[0]) worker = os.path.join(basedir, 'sigpipe-worker.py') cmd = [sys.executable, worker] stdout_worker = subprocess.Popen( cmd, bufsize=0, close_fds=True, stdin=sub.stdout, stdout=sys.stdout, stderr=sys.stderr, ) stderr_worker = subprocess.Popen( cmd, bufsize=0, close_fds=True, stdin=sub.stderr, stdout=sys.stderr, stderr=sys.stderr, ) debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n') os.close(sub.stdout.fileno()) os.close(sub.stderr.fileno()) debug_stream.write(b'SIGPIPE-HELPER: pipes closed in main\n') try: wait_file(sysbytes(SYNCFILE1)) except RuntimeError as exc: msg = sysbytes(str(exc)) debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg) else: debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n') stdout_worker.kill() stderr_worker.kill() stdout_worker.wait(10) stderr_worker.wait(10) debug_stream.write(b'SIGPIPE-HELPER: worker killed\n') debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n') write_file(sysbytes(SYNCFILE2)) finally: debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n') if not sys.stdin.closed: sys.stdin.close() try: sub.wait(timeout=30) except subprocess.TimeoutExpired: msg = b'SIGPIPE-HELPER: Server process failed to terminate\n' debug_stream.write(msg) sub.kill() sub.wait() msg = b'SIGPIPE-HELPER: Server process killed\n' else: msg = b'SIGPIPE-HELPER: Server process terminated with status %d\n' msg %= sub.returncode debug_stream.write(msg) debug_stream.write(b'SIGPIPE-HELPER: Shut down\n')