worker: use one pipe per posix worker and select() in parent process
This allows us to pass results larger than PIPE_BUF through the pipes without
interleaving them. This is necessary now because "hg fix" sends file contents
as the result from workers.
Differential Revision: https://phab.mercurial-scm.org/D3960
--- a/mercurial/worker.py Sun Jan 28 13:20:52 2018 +0100
+++ b/mercurial/worker.py Mon Jul 16 17:52:17 2018 -0700
@@ -14,6 +14,12 @@
import threading
import time
+try:
+ import selectors
+ selectors.BaseSelector
+except ImportError:
+ from .thirdparty import selectors2 as selectors
+
from .i18n import _
from . import (
encoding,
@@ -89,7 +95,6 @@
return func(*staticargs + (args,))
def _posixworker(ui, func, staticargs, args):
- rfd, wfd = os.pipe()
workers = _numworkers(ui)
oldhandler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -138,7 +143,15 @@
oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
ui.flush()
parentpid = os.getpid()
+ pipes = []
for pargs in partition(args, workers):
+ # Every worker gets its own pipe to send results on, so we don't have to
+ # implement atomic writes larger than PIPE_BUF. Each forked process has
+ # its own pipe's descriptors in the local variables, and the parent
+ # process has the full list of pipe descriptors (and it doesn't really
+ # care what order they're in).
+ rfd, wfd = os.pipe()
+ pipes.append((rfd, wfd))
# make sure we use os._exit in all worker code paths. otherwise the
# worker may do some clean-ups which could cause surprises like
# deadlock. see sshpeer.cleanup for example.
@@ -154,6 +167,9 @@
signal.signal(signal.SIGCHLD, oldchldhandler)
def workerfunc():
+ for r, w in pipes[:-1]:
+ os.close(r)
+ os.close(w)
os.close(rfd)
for result in func(*(staticargs + (pargs,))):
os.write(wfd, util.pickle.dumps(result))
@@ -175,8 +191,10 @@
finally:
os._exit(ret & 255)
pids.add(pid)
- os.close(wfd)
- fp = os.fdopen(rfd, r'rb', 0)
+ selector = selectors.DefaultSelector()
+ for rfd, wfd in pipes:
+ os.close(wfd)
+ selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
def cleanup():
signal.signal(signal.SIGINT, oldhandler)
waitforworkers()
@@ -187,15 +205,19 @@
os.kill(os.getpid(), -status)
sys.exit(status)
try:
- while True:
- try:
- yield util.pickle.load(fp)
- except EOFError:
- break
- except IOError as e:
- if e.errno == errno.EINTR:
- continue
- raise
+ openpipes = len(pipes)
+ while openpipes > 0:
+ for key, events in selector.select():
+ try:
+ yield util.pickle.load(key.fileobj)
+ except EOFError:
+ selector.unregister(key.fileobj)
+ key.fileobj.close()
+ openpipes -= 1
+ except IOError as e:
+ if e.errno == errno.EINTR:
+ continue
+ raise
except: # re-raises
killworkers()
cleanup()