worker: rewrite error handling so os._exit covers all cases
author Jun Wu <quark@fb.com>
Sat, 22 Apr 2017 16:50:08 -0700
changeset 32112 31763785094b
parent 32111 1208b74841ff
child 32113 9f0c055eebae
worker: rewrite error handling so os._exit covers all cases

Previously the worker error handling was like:

    pid = os.fork()   --+
    if pid == 0:        |
        ....            | problematic
    ....              --+
    try:              --+
        ....            | worker error handling
                      --+

If a signal arrives when Python is executing the "problematic" lines, an
external error handler (dispatch.py) will take over the control flow and
it's no longer guaranteed that "os._exit" is called (see 86cd09bc13ba for
why it is necessary).

This patch rewrites the error handling so it covers all possible code paths
for a worker, even during fork.

Note: "os.getpid() == parentpid" is intentionally used to test whether the
process is the parent, instead of checking "pid", because "pid = os.fork()"
may not be atomic - it's possible that a signal hits the worker before the
assignment completes [1]. The newly added test replaces "os.fork" to
exercise that extreme case.

[1]: CPython compiles "pid = os.fork()" to 2 byte codes: "CALL_FUNCTION" and
     "STORE_FAST", so it's probably not atomic:

    def f():
        pid = os.fork()

    dis.dis(f)
      2           0 LOAD_GLOBAL              0 (os)
                  3 LOAD_ATTR                1 (fork)
                  6 CALL_FUNCTION            0
                  9 STORE_FAST               0 (pid)
                 12 LOAD_CONST               0 (None)
                 15 RETURN_VALUE
mercurial/worker.py
tests/test-worker.t
--- a/mercurial/worker.py	Sat Apr 22 15:00:17 2017 -0700
+++ b/mercurial/worker.py	Sat Apr 22 16:50:08 2017 -0700
@@ -134,37 +134,43 @@
             killworkers()
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     ui.flush()
+    parentpid = os.getpid()
     for pargs in partition(args, workers):
-        pid = os.fork()
-        if pid == 0:
-            signal.signal(signal.SIGINT, oldhandler)
-            signal.signal(signal.SIGCHLD, oldchldhandler)
-
-            def workerfunc():
-                os.close(rfd)
-                for i, item in func(*(staticargs + (pargs,))):
-                    os.write(wfd, '%d %s\n' % (i, item))
-                return 0
+        # make sure we use os._exit in all worker code paths. otherwise the
+        # worker may do some clean-ups which could cause surprises like
+        # deadlock. see sshpeer.cleanup for example.
+        # override error handling *before* fork. this is necessary because
+        # exception (signal) may arrive after fork, before "pid =" assignment
+        # completes, and other exception handler (dispatch.py) can lead to
+        # unexpected code path without os._exit.
+        ret = -1
+        try:
+            pid = os.fork()
+            if pid == 0:
+                signal.signal(signal.SIGINT, oldhandler)
+                signal.signal(signal.SIGCHLD, oldchldhandler)
 
-            # make sure we use os._exit in all code paths. otherwise the worker
-            # may do some clean-ups which could cause surprises like deadlock.
-            # see sshpeer.cleanup for example.
-            ret = 0
-            try:
+                def workerfunc():
+                    os.close(rfd)
+                    for i, item in func(*(staticargs + (pargs,))):
+                        os.write(wfd, '%d %s\n' % (i, item))
+                    return 0
+
+                ret = scmutil.callcatch(ui, workerfunc)
+        except: # parent re-raises, child never returns
+            if os.getpid() == parentpid:
+                raise
+            exctype = sys.exc_info()[0]
+            force = not issubclass(exctype, KeyboardInterrupt)
+            ui.traceback(force=force)
+        finally:
+            if os.getpid() != parentpid:
                 try:
-                    ret = scmutil.callcatch(ui, workerfunc)
-                finally:
                     ui.flush()
-            except KeyboardInterrupt:
-                os._exit(255)
-            except: # never return, therefore no re-raises
-                try:
-                    ui.traceback(force=True)
-                    ui.flush()
+                except: # never returns, no re-raises
+                    pass
                 finally:
-                    os._exit(255)
-            else:
-                os._exit(ret & 255)
+                    os._exit(ret & 255)
         pids.add(pid)
     os.close(wfd)
     fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
--- a/tests/test-worker.t	Sat Apr 22 15:00:17 2017 -0700
+++ b/tests/test-worker.t	Sat Apr 22 16:50:08 2017 -0700
@@ -91,4 +91,36 @@
   > test 100000.0 exc 2>&1 | grep '^Traceback'
   Traceback (most recent call last):
 
+Workers should not do cleanups in all cases
+
+  $ cat > $TESTTMP/detectcleanup.py <<EOF
+  > from __future__ import absolute_import
+  > import atexit
+  > import os
+  > import time
+  > oldfork = os.fork
+  > count = 0
+  > parentpid = os.getpid()
+  > def delayedfork():
+  >     global count
+  >     count += 1
+  >     pid = oldfork()
+  >     # make it easier to test SIGTERM hitting other workers when they have
+  >     # not set up error handling yet.
+  >     if count > 1 and pid == 0:
+  >         time.sleep(0.1)
+  >     return pid
+  > os.fork = delayedfork
+  > def cleanup():
+  >     if os.getpid() != parentpid:
+  >         os.write(1, 'should never happen\n')
+  > atexit.register(cleanup)
+  > EOF
+
+  $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
+  > "extensions.d=$TESTTMP/detectcleanup.py" test 100000 abort
+  start
+  abort: known exception
+  [255]
+
 #endif