changeset 38729:9e6afe7fca31

worker: use one pipe per posix worker and select() in parent process This allows us to pass results larger than PIPE_BUF through the pipes without interleaving them. This is necessary now because "hg fix" sends file contents as the result from workers. Differential Revision: https://phab.mercurial-scm.org/D3960
author Danny Hooper <hooper@google.com>
date Mon, 16 Jul 2018 17:52:17 -0700
parents 88be288e8ac1
children 69ed2cff4277
files mercurial/worker.py
diffstat 1 files changed, 34 insertions(+), 12 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/worker.py	Sun Jan 28 13:20:52 2018 +0100
+++ b/mercurial/worker.py	Mon Jul 16 17:52:17 2018 -0700
@@ -14,6 +14,12 @@
 import threading
 import time
 
+try:
+    import selectors
+    selectors.BaseSelector
+except ImportError:
+    from .thirdparty import selectors2 as selectors
+
 from .i18n import _
 from . import (
     encoding,
@@ -89,7 +95,6 @@
     return func(*staticargs + (args,))
 
 def _posixworker(ui, func, staticargs, args):
-    rfd, wfd = os.pipe()
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -138,7 +143,15 @@
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     ui.flush()
     parentpid = os.getpid()
+    pipes = []
     for pargs in partition(args, workers):
+        # Every worker gets its own pipe to send results on, so we don't have to
+        # implement atomic writes larger than PIPE_BUF. Each forked process has
+        # its own pipe's descriptors in the local variables, and the parent
+        # process has the full list of pipe descriptors (and it doesn't really
+        # care what order they're in).
+        rfd, wfd = os.pipe()
+        pipes.append((rfd, wfd))
         # make sure we use os._exit in all worker code paths. otherwise the
         # worker may do some clean-ups which could cause surprises like
         # deadlock. see sshpeer.cleanup for example.
@@ -154,6 +167,9 @@
                 signal.signal(signal.SIGCHLD, oldchldhandler)
 
                 def workerfunc():
+                    for r, w in pipes[:-1]:
+                        os.close(r)
+                        os.close(w)
                     os.close(rfd)
                     for result in func(*(staticargs + (pargs,))):
                         os.write(wfd, util.pickle.dumps(result))
@@ -175,8 +191,10 @@
                 finally:
                     os._exit(ret & 255)
         pids.add(pid)
-    os.close(wfd)
-    fp = os.fdopen(rfd, r'rb', 0)
+    selector = selectors.DefaultSelector()
+    for rfd, wfd in pipes:
+        os.close(wfd)
+        selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
         waitforworkers()
@@ -187,15 +205,19 @@
                 os.kill(os.getpid(), -status)
             sys.exit(status)
     try:
-        while True:
-            try:
-                yield util.pickle.load(fp)
-            except EOFError:
-                break
-            except IOError as e:
-                if e.errno == errno.EINTR:
-                    continue
-                raise
+        openpipes = len(pipes)
+        while openpipes > 0:
+            for key, events in selector.select():
+                try:
+                    yield util.pickle.load(key.fileobj)
+                except EOFError:
+                    selector.unregister(key.fileobj)
+                    key.fileobj.close()
+                    openpipes -= 1
+                except IOError as e:
+                    if e.errno == errno.EINTR:
+                        continue
+                    raise
     except: # re-raises
         killworkers()
         cleanup()