# HG changeset patch # User Danny Hooper # Date 1531788737 25200 # Node ID 9e6afe7fca3181a3e48aa701e7607c915e988a63 # Parent 88be288e8ac1c1125d896d2c25a515a664b87228 worker: use one pipe per posix worker and select() in parent process This allows us to pass results larger than PIPE_BUF through the pipes without interleaving them. This is necessary now because "hg fix" sends file contents as the result from workers. Differential Revision: https://phab.mercurial-scm.org/D3960 diff -r 88be288e8ac1 -r 9e6afe7fca31 mercurial/worker.py --- a/mercurial/worker.py Sun Jan 28 13:20:52 2018 +0100 +++ b/mercurial/worker.py Mon Jul 16 17:52:17 2018 -0700 @@ -14,6 +14,12 @@ import threading import time +try: + import selectors + selectors.BaseSelector +except ImportError: + from .thirdparty import selectors2 as selectors + from .i18n import _ from . import ( encoding, @@ -89,7 +95,6 @@ return func(*staticargs + (args,)) def _posixworker(ui, func, staticargs, args): - rfd, wfd = os.pipe() workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -138,7 +143,15 @@ oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) ui.flush() parentpid = os.getpid() + pipes = [] for pargs in partition(args, workers): + # Every worker gets its own pipe to send results on, so we don't have to + # implement atomic writes larger than PIPE_BUF. Each forked process has + # its own pipe's descriptors in the local variables, and the parent + # process has the full list of pipe descriptors (and it doesn't really + # care what order they're in). + rfd, wfd = os.pipe() + pipes.append((rfd, wfd)) # make sure we use os._exit in all worker code paths. otherwise the # worker may do some clean-ups which could cause surprises like # deadlock. see sshpeer.cleanup for example. @@ -154,6 +167,9 @@ signal.signal(signal.SIGCHLD, oldchldhandler) def workerfunc(): + for r, w in pipes[:-1]: + os.close(r) + os.close(w) os.close(rfd) for result in func(*(staticargs + (pargs,))): os.write(wfd, util.pickle.dumps(result)) @@ -175,8 +191,10 @@ finally: os._exit(ret & 255) pids.add(pid) - os.close(wfd) - fp = os.fdopen(rfd, r'rb', 0) + selector = selectors.DefaultSelector() + for rfd, wfd in pipes: + os.close(wfd) + selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ) def cleanup(): signal.signal(signal.SIGINT, oldhandler) waitforworkers() @@ -187,15 +205,19 @@ os.kill(os.getpid(), -status) sys.exit(status) try: - while True: - try: - yield util.pickle.load(fp) - except EOFError: - break - except IOError as e: - if e.errno == errno.EINTR: - continue - raise + openpipes = len(pipes) + while openpipes > 0: + for key, events in selector.select(): + try: + yield util.pickle.load(key.fileobj) + except EOFError: + selector.unregister(key.fileobj) + key.fileobj.close() + openpipes -= 1 + except IOError as e: + if e.errno == errno.EINTR: + continue + raise except: # re-raises killworkers() cleanup()