tests/testlib/sigpipe-remote.py
changeset 47618 27ff81547d35
parent 47617 d5fc1b59a2df
child 47636 b2ed9480b34a
equal deleted inserted replaced
47617:d5fc1b59a2df 47618:27ff81547d35
     3 
     3 
     4 import io
     4 import io
     5 import os
     5 import os
     6 import subprocess
     6 import subprocess
     7 import sys
     7 import sys
     8 import threading
       
     9 import time
     8 import time
    10 
     9 
    11 # we cannot use mercurial.testing as long as python2 is not dropped as the test will only install the mercurial module for python2 in python2 run
    10 # we cannot use mercurial.testing as long as python2 is not dropped as the test will only install the mercurial module for python2 in python2 run
    12 
    11 
    13 if isinstance(sys.stdout.buffer, io.BufferedWriter):
    12 if isinstance(sys.stdout.buffer, io.BufferedWriter):
    66 
    65 
    67 def sysstr(s):
    66 def sysstr(s):
    68     return s.decode('latin-1')
    67     return s.decode('latin-1')
    69 
    68 
    70 
    69 
    71 piped_stdout = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
       
    72 piped_stderr = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC)
       
    73 
       
    74 stdout_writer = os.fdopen(piped_stdout[1], "rb")
       
    75 stdout_reader = os.fdopen(piped_stdout[0], "rb")
       
    76 stderr_writer = os.fdopen(piped_stderr[1], "rb")
       
    77 stderr_reader = os.fdopen(piped_stderr[0], "rb")
       
    78 
       
    79 debug_stream.write(b'SIGPIPE-HELPER: Starting\n')
    70 debug_stream.write(b'SIGPIPE-HELPER: Starting\n')
    80 
    71 
    81 TESTLIB_DIR = os.path.dirname(sys.argv[0])
    72 TESTLIB_DIR = os.path.dirname(sys.argv[0])
    82 WAIT_SCRIPT = os.path.join(TESTLIB_DIR, 'wait-on-file')
    73 WAIT_SCRIPT = os.path.join(TESTLIB_DIR, 'wait-on-file')
    83 
    74 
    86     WAIT_SCRIPT,
    77     WAIT_SCRIPT,
    87     SYNCFILE2,
    78     SYNCFILE2,
    88     SYNCFILE1,
    79     SYNCFILE1,
    89 )
    80 )
    90 
    81 
    91 cmd = ['hg']
    82 try:
    92 cmd += sys.argv[1:]
    83     cmd = ['hg']
    93 sub = subprocess.Popen(
    84     cmd += sys.argv[1:]
    94     cmd,
    85     sub = subprocess.Popen(
    95     bufsize=0,
    86         cmd,
    96     close_fds=True,
    87         bufsize=0,
    97     stdin=sys.stdin,
    88         close_fds=True,
    98     stdout=stdout_writer,
    89         stdin=sys.stdin,
    99     stderr=stderr_writer,
    90         stdout=subprocess.PIPE,
   100 )
    91         stderr=subprocess.PIPE,
       
    92     )
   101 
    93 
   102 debug_stream.write(b'SIGPIPE-HELPER: Mercurial started\n')
    94     basedir = os.path.dirname(sys.argv[0])
       
    95     worker = os.path.join(basedir, 'sigpipe-worker.py')
   103 
    96 
       
    97     cmd = [sys.executable, worker]
   104 
    98 
   105 shut_down = threading.Event()
    99     stdout_worker = subprocess.Popen(
       
   100         cmd,
       
   101         bufsize=0,
       
   102         close_fds=True,
       
   103         stdin=sub.stdout,
       
   104         stdout=sys.stdout,
       
   105         stderr=sys.stderr,
       
   106     )
   106 
   107 
   107 close_lock = threading.Lock()
   108     stderr_worker = subprocess.Popen(
   108 
   109         cmd,
   109 
   110         bufsize=0,
   110 def _read(stream):
   111         close_fds=True,
   111     try:
   112         stdin=sub.stderr,
   112         return stream.read()
   113         stdout=sys.stderr,
   113     except ValueError:
   114         stderr=sys.stderr,
   114         # read on closed file
   115     )
   115         return None
       
   116 
       
   117 
       
   118 def forward_stdout():
       
   119     while not shut_down.is_set():
       
   120         c = _read(stdout_reader)
       
   121         while c is not None:
       
   122             sys.stdout.buffer.write(c)
       
   123             c = _read(stdout_reader)
       
   124         time.sleep(0.001)
       
   125     with close_lock:
       
   126         if not stdout_reader.closed:
       
   127             stdout_reader.close()
       
   128             debug_stream.write(b'SIGPIPE-HELPER: stdout closed\n')
       
   129 
       
   130 
       
   131 def forward_stderr():
       
   132     while not shut_down.is_set():
       
   133         c = _read(stderr_reader)
       
   134         if c is not None:
       
   135             sys.stderr.buffer.write(c)
       
   136             c = _read(stderr_reader)
       
   137         time.sleep(0.001)
       
   138     with close_lock:
       
   139         if not stderr_reader.closed:
       
   140             stderr_reader.close()
       
   141             debug_stream.write(b'SIGPIPE-HELPER: stderr closed\n')
       
   142 
       
   143 
       
   144 stdout_thread = threading.Thread(target=forward_stdout, daemon=True)
       
   145 stderr_thread = threading.Thread(target=forward_stderr, daemon=True)
       
   146 
       
   147 try:
       
   148     stdout_thread.start()
       
   149     stderr_thread.start()
       
   150 
       
   151     debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n')
   116     debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n')
       
   117     os.close(sub.stdout.fileno())
       
   118     os.close(sub.stderr.fileno())
       
   119     debug_stream.write(b'SIGPIPE-HELPER: pipes closed in main\n')
   152 
   120 
   153     try:
   121     try:
   154         wait_file(sysbytes(SYNCFILE1))
   122         wait_file(sysbytes(SYNCFILE1))
   155     except RuntimeError as exc:
   123     except RuntimeError as exc:
   156         msg = sysbytes(str(exc))
   124         msg = sysbytes(str(exc))
   157         debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg)
   125         debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg)
   158     else:
   126     else:
   159         debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n')
   127         debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n')
   160     with close_lock:
   128     stdout_worker.kill()
   161         if not stdout_reader.closed:
   129     stderr_worker.kill()
   162             stdout_reader.close()
   130     stdout_worker.wait(10)
   163         if not stderr_reader.closed:
   131     stderr_worker.wait(10)
   164             stderr_reader.close()
   132     debug_stream.write(b'SIGPIPE-HELPER: worker killed\n')
   165         sys.stdin.close()
   133 
   166         debug_stream.write(b'SIGPIPE-HELPER: pipes closed\n')
       
   167     debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n')
   134     debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n')
   168     write_file(sysbytes(SYNCFILE2))
   135     write_file(sysbytes(SYNCFILE2))
   169 finally:
   136 finally:
   170     debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n')
   137     debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n')
   171     shut_down.set()
       
   172     if not sys.stdin.closed:
   138     if not sys.stdin.closed:
   173         sys.stdin.close()
   139         sys.stdin.close()
   174     try:
   140     try:
   175         sub.wait(timeout=30)
   141         sub.wait(timeout=30)
   176     except subprocess.TimeoutExpired:
   142     except subprocess.TimeoutExpired:
   177         msg = b'SIGPIPE-HELPER: Server process failed to terminate\n'
   143         msg = b'SIGPIPE-HELPER: Server process failed to terminate\n'
   178         debug_stream.write(msg)
   144         debug_stream.write(msg)
       
   145         sub.kill()
       
   146         sub.wait()
       
   147         msg = b'SIGPIPE-HELPER: Server process killed\n'
   179     else:
   148     else:
   180         debug_stream.write(b'SIGPIPE-HELPER: Server process terminated\n')
   149         msg = b'SIGPIPE-HELPER: Server process terminated with status %d\n'
       
   150         msg %= sub.returncode
       
   151         debug_stream.write(msg)
   181     debug_stream.write(b'SIGPIPE-HELPER: Shut down\n')
   152     debug_stream.write(b'SIGPIPE-HELPER: Shut down\n')