66 |
65 |
67 def sysstr(s): |
66 def sysstr(s): |
68 return s.decode('latin-1') |
67 return s.decode('latin-1') |
69 |
68 |
70 |
69 |
71 piped_stdout = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC) |
|
72 piped_stderr = os.pipe2(os.O_NONBLOCK | os.O_CLOEXEC) |
|
73 |
|
74 stdout_writer = os.fdopen(piped_stdout[1], "rb") |
|
75 stdout_reader = os.fdopen(piped_stdout[0], "rb") |
|
76 stderr_writer = os.fdopen(piped_stderr[1], "rb") |
|
77 stderr_reader = os.fdopen(piped_stderr[0], "rb") |
|
78 |
|
79 debug_stream.write(b'SIGPIPE-HELPER: Starting\n') |
70 debug_stream.write(b'SIGPIPE-HELPER: Starting\n') |
80 |
71 |
81 TESTLIB_DIR = os.path.dirname(sys.argv[0]) |
72 TESTLIB_DIR = os.path.dirname(sys.argv[0]) |
82 WAIT_SCRIPT = os.path.join(TESTLIB_DIR, 'wait-on-file') |
73 WAIT_SCRIPT = os.path.join(TESTLIB_DIR, 'wait-on-file') |
83 |
74 |
86 WAIT_SCRIPT, |
77 WAIT_SCRIPT, |
87 SYNCFILE2, |
78 SYNCFILE2, |
88 SYNCFILE1, |
79 SYNCFILE1, |
89 ) |
80 ) |
90 |
81 |
91 cmd = ['hg'] |
82 try: |
92 cmd += sys.argv[1:] |
83 cmd = ['hg'] |
93 sub = subprocess.Popen( |
84 cmd += sys.argv[1:] |
94 cmd, |
85 sub = subprocess.Popen( |
95 bufsize=0, |
86 cmd, |
96 close_fds=True, |
87 bufsize=0, |
97 stdin=sys.stdin, |
88 close_fds=True, |
98 stdout=stdout_writer, |
89 stdin=sys.stdin, |
99 stderr=stderr_writer, |
90 stdout=subprocess.PIPE, |
100 ) |
91 stderr=subprocess.PIPE, |
|
92 ) |
101 |
93 |
102 debug_stream.write(b'SIGPIPE-HELPER: Mercurial started\n') |
94 basedir = os.path.dirname(sys.argv[0]) |
|
95 worker = os.path.join(basedir, 'sigpipe-worker.py') |
103 |
96 |
|
97 cmd = [sys.executable, worker] |
104 |
98 |
105 shut_down = threading.Event() |
99 stdout_worker = subprocess.Popen( |
|
100 cmd, |
|
101 bufsize=0, |
|
102 close_fds=True, |
|
103 stdin=sub.stdout, |
|
104 stdout=sys.stdout, |
|
105 stderr=sys.stderr, |
|
106 ) |
106 |
107 |
107 close_lock = threading.Lock() |
108 stderr_worker = subprocess.Popen( |
108 |
109 cmd, |
109 |
110 bufsize=0, |
110 def _read(stream): |
111 close_fds=True, |
111 try: |
112 stdin=sub.stderr, |
112 return stream.read() |
113 stdout=sys.stderr, |
113 except ValueError: |
114 stderr=sys.stderr, |
114 # read on closed file |
115 ) |
115 return None |
|
116 |
|
117 |
|
118 def forward_stdout(): |
|
119 while not shut_down.is_set(): |
|
120 c = _read(stdout_reader) |
|
121 while c is not None: |
|
122 sys.stdout.buffer.write(c) |
|
123 c = _read(stdout_reader) |
|
124 time.sleep(0.001) |
|
125 with close_lock: |
|
126 if not stdout_reader.closed: |
|
127 stdout_reader.close() |
|
128 debug_stream.write(b'SIGPIPE-HELPER: stdout closed\n') |
|
129 |
|
130 |
|
131 def forward_stderr(): |
|
132 while not shut_down.is_set(): |
|
133 c = _read(stderr_reader) |
|
134 if c is not None: |
|
135 sys.stderr.buffer.write(c) |
|
136 c = _read(stderr_reader) |
|
137 time.sleep(0.001) |
|
138 with close_lock: |
|
139 if not stderr_reader.closed: |
|
140 stderr_reader.close() |
|
141 debug_stream.write(b'SIGPIPE-HELPER: stderr closed\n') |
|
142 |
|
143 |
|
144 stdout_thread = threading.Thread(target=forward_stdout, daemon=True) |
|
145 stderr_thread = threading.Thread(target=forward_stderr, daemon=True) |
|
146 |
|
147 try: |
|
148 stdout_thread.start() |
|
149 stderr_thread.start() |
|
150 |
|
151 debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n') |
116 debug_stream.write(b'SIGPIPE-HELPER: Redirection in place\n') |
|
117 os.close(sub.stdout.fileno()) |
|
118 os.close(sub.stderr.fileno()) |
|
119 debug_stream.write(b'SIGPIPE-HELPER: pipes closed in main\n') |
152 |
120 |
153 try: |
121 try: |
154 wait_file(sysbytes(SYNCFILE1)) |
122 wait_file(sysbytes(SYNCFILE1)) |
155 except RuntimeError as exc: |
123 except RuntimeError as exc: |
156 msg = sysbytes(str(exc)) |
124 msg = sysbytes(str(exc)) |
157 debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg) |
125 debug_stream.write(b'SIGPIPE-HELPER: wait failed: %s\n' % msg) |
158 else: |
126 else: |
159 debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n') |
127 debug_stream.write(b'SIGPIPE-HELPER: SYNCFILE1 detected\n') |
160 with close_lock: |
128 stdout_worker.kill() |
161 if not stdout_reader.closed: |
129 stderr_worker.kill() |
162 stdout_reader.close() |
130 stdout_worker.wait(10) |
163 if not stderr_reader.closed: |
131 stderr_worker.wait(10) |
164 stderr_reader.close() |
132 debug_stream.write(b'SIGPIPE-HELPER: worker killed\n') |
165 sys.stdin.close() |
133 |
166 debug_stream.write(b'SIGPIPE-HELPER: pipes closed\n') |
|
167 debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n') |
134 debug_stream.write(b'SIGPIPE-HELPER: creating SYNCFILE2\n') |
168 write_file(sysbytes(SYNCFILE2)) |
135 write_file(sysbytes(SYNCFILE2)) |
169 finally: |
136 finally: |
170 debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n') |
137 debug_stream.write(b'SIGPIPE-HELPER: Shutting down\n') |
171 shut_down.set() |
|
172 if not sys.stdin.closed: |
138 if not sys.stdin.closed: |
173 sys.stdin.close() |
139 sys.stdin.close() |
174 try: |
140 try: |
175 sub.wait(timeout=30) |
141 sub.wait(timeout=30) |
176 except subprocess.TimeoutExpired: |
142 except subprocess.TimeoutExpired: |
177 msg = b'SIGPIPE-HELPER: Server process failed to terminate\n' |
143 msg = b'SIGPIPE-HELPER: Server process failed to terminate\n' |
178 debug_stream.write(msg) |
144 debug_stream.write(msg) |
|
145 sub.kill() |
|
146 sub.wait() |
|
147 msg = b'SIGPIPE-HELPER: Server process killed\n' |
179 else: |
148 else: |
180 debug_stream.write(b'SIGPIPE-HELPER: Server process terminated\n') |
149 msg = b'SIGPIPE-HELPER: Server process terminated with status %d\n' |
|
150 msg %= sub.returncode |
|
151 debug_stream.write(msg) |
181 debug_stream.write(b'SIGPIPE-HELPER: Shut down\n') |
152 debug_stream.write(b'SIGPIPE-HELPER: Shut down\n') |