changeset 18638:047110c0e2a8

worker: allow a function to be run in multiple worker processes If we estimate that it will be worth the cost, we run the function in multiple processes. Otherwise, we run it in-process. Children report progress to the parent through a pipe. Not yet implemented on Windows.
author Bryan O'Sullivan <bryano@fb.com>
date Sat, 09 Feb 2013 15:51:32 -0800
parents ac4dbceeb14a
children 5774732bb5e5
files mercurial/worker.py
diffstat 1 files changed, 57 insertions(+), 1 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/worker.py	Sat Feb 09 15:51:32 2013 -0800
+++ b/mercurial/worker.py	Sat Feb 09 15:51:32 2013 -0800
@@ -6,7 +6,7 @@
 # GNU General Public License version 2 or any later version.
 
 from i18n import _
-import os, util
+import os, signal, sys, util
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
@@ -53,6 +53,62 @@
     benefit = linear - (_startupcost * workers + linear / workers)
     return benefit >= 0.15
 
+def worker(ui, costperarg, func, staticargs, args):
+    '''run a function, possibly in parallel in multiple worker
+    processes.
+
+    returns a progress iterator
+
+    costperarg - cost of a single task
+
+    func - function to run
+
+    staticargs - arguments to pass to every invocation of the function
+
+    args - arguments to split into chunks, to pass to individual
+    workers
+    '''
+    if worthwhile(ui, costperarg, len(args)):
+        return _platformworker(ui, func, staticargs, args)
+    return func(*staticargs + (args,))
+
+def _posixworker(ui, func, staticargs, args):
+    rfd, wfd = os.pipe()
+    workers = _numworkers(ui)
+    for pargs in partition(args, workers):
+        pid = os.fork()
+        if pid == 0:
+            try:
+                os.close(rfd)
+                for i, item in func(*(staticargs + (pargs,))):
+                    os.write(wfd, '%d %s\n' % (i, item))
+                os._exit(0)
+            except KeyboardInterrupt:
+                os._exit(255)
+    os.close(wfd)
+    fp = os.fdopen(rfd, 'rb', 0)
+    oldhandler = signal.getsignal(signal.SIGINT)
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
+    def cleanup():
+        # python 2.4 is too dumb for try/yield/finally
+        signal.signal(signal.SIGINT, oldhandler)
+        problems = 0
+        for i in xrange(workers):
+            problems |= os.wait()[1]
+        if problems:
+            sys.exit(1)
+    try:
+        for line in fp:
+            l = line.split(' ', 1)
+            yield int(l[0]), l[1][:-1]
+    except: # re-raises
+        cleanup()
+        raise
+    cleanup()
+
+if os.name != 'nt':
+    _platformworker = _posixworker
+
 def partition(lst, nslices):
     '''partition a list into N slices of equal size'''
     n = len(lst)