Mercurial > hg-stable
changeset 32166:31763785094b
worker: rewrite error handling so os._exit covers all cases
Previously, the worker error handling looked like this:
    pid = os.fork()   --+
    if pid == 0:        |
        ....            | problematic
        ....          --+
        try:          --+
            ....        | worker error handling
                      --+
If a signal arrives while Python is executing the "problematic" lines, an
external error handler (dispatch.py) will take over the control flow and
it's no longer guaranteed that "os._exit" is called (see 86cd09bc13ba for why
it is necessary).
This patch rewrites the error handling so it covers all possible code paths
for a worker even during fork.
Note: "os.getpid() == parentpid" is intentionally used to test whether the
process is the parent or not, instead of checking "pid", because
"pid = os.fork()" may not be atomic - it's possible that a signal hits the
worker before the assignment completes [1]. The newly added test replaces
"os.fork" to exercise that extreme case.
[1]: CPython compiles "pid = os.fork()" to 2 byte codes: "CALL_FUNCTION" and
"STORE_FAST", so it's probably not atomic:
def f():
pid = os.fork()
dis.dis(f)
2 0 LOAD_GLOBAL 0 (os)
3 LOAD_ATTR 1 (fork)
6 CALL_FUNCTION 0
9 STORE_FAST 0 (pid)
12 LOAD_CONST 0 (None)
15 RETURN_VALUE
author | Jun Wu <quark@fb.com> |
---|---|
date | Sat, 22 Apr 2017 16:50:08 -0700 |
parents | 1208b74841ff |
children | 9f0c055eebae |
files | mercurial/worker.py tests/test-worker.t |
diffstat | 2 files changed, 64 insertions(+), 26 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/worker.py Sat Apr 22 15:00:17 2017 -0700 +++ b/mercurial/worker.py Sat Apr 22 16:50:08 2017 -0700 @@ -134,37 +134,43 @@ killworkers() oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) ui.flush() + parentpid = os.getpid() for pargs in partition(args, workers): - pid = os.fork() - if pid == 0: - signal.signal(signal.SIGINT, oldhandler) - signal.signal(signal.SIGCHLD, oldchldhandler) - - def workerfunc(): - os.close(rfd) - for i, item in func(*(staticargs + (pargs,))): - os.write(wfd, '%d %s\n' % (i, item)) - return 0 + # make sure we use os._exit in all worker code paths. otherwise the + # worker may do some clean-ups which could cause surprises like + # deadlock. see sshpeer.cleanup for example. + # override error handling *before* fork. this is necessary because + # exception (signal) may arrive after fork, before "pid =" assignment + # completes, and other exception handler (dispatch.py) can lead to + # unexpected code path without os._exit. + ret = -1 + try: + pid = os.fork() + if pid == 0: + signal.signal(signal.SIGINT, oldhandler) + signal.signal(signal.SIGCHLD, oldchldhandler) - # make sure we use os._exit in all code paths. otherwise the worker - # may do some clean-ups which could cause surprises like deadlock. - # see sshpeer.cleanup for example. 
- ret = 0 - try: + def workerfunc(): + os.close(rfd) + for i, item in func(*(staticargs + (pargs,))): + os.write(wfd, '%d %s\n' % (i, item)) + return 0 + + ret = scmutil.callcatch(ui, workerfunc) + except: # parent re-raises, child never returns + if os.getpid() == parentpid: + raise + exctype = sys.exc_info()[0] + force = not issubclass(exctype, KeyboardInterrupt) + ui.traceback(force=force) + finally: + if os.getpid() != parentpid: try: - ret = scmutil.callcatch(ui, workerfunc) - finally: ui.flush() - except KeyboardInterrupt: - os._exit(255) - except: # never return, therefore no re-raises - try: - ui.traceback(force=True) - ui.flush() + except: # never returns, no re-raises + pass finally: - os._exit(255) - else: - os._exit(ret & 255) + os._exit(ret & 255) pids.add(pid) os.close(wfd) fp = os.fdopen(rfd, pycompat.sysstr('rb'), 0)
--- a/tests/test-worker.t Sat Apr 22 15:00:17 2017 -0700 +++ b/tests/test-worker.t Sat Apr 22 16:50:08 2017 -0700 @@ -91,4 +91,36 @@ > test 100000.0 exc 2>&1 | grep '^Traceback' Traceback (most recent call last): +Workers should not do cleanups in all cases + + $ cat > $TESTTMP/detectcleanup.py <<EOF + > from __future__ import absolute_import + > import atexit + > import os + > import time + > oldfork = os.fork + > count = 0 + > parentpid = os.getpid() + > def delayedfork(): + > global count + > count += 1 + > pid = oldfork() + > # make it easier to test SIGTERM hitting other workers when they have + > # not set up error handling yet. + > if count > 1 and pid == 0: + > time.sleep(0.1) + > return pid + > os.fork = delayedfork + > def cleanup(): + > if os.getpid() != parentpid: + > os.write(1, 'should never happen\n') + > atexit.register(cleanup) + > EOF + + $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \ + > "extensions.d=$TESTTMP/detectcleanup.py" test 100000 abort + start + abort: known exception + [255] + #endif