Mercurial > hg
changeset 18638:047110c0e2a8
worker: allow a function to be run in multiple worker processes
If we estimate that it will be worth the cost, we run the function in
multiple processes. Otherwise, we run it in-process.
Children report progress to the parent through a pipe.
Not yet implemented on Windows.
author | Bryan O'Sullivan <bryano@fb.com> |
---|---|
date | Sat, 09 Feb 2013 15:51:32 -0800 |
parents | ac4dbceeb14a |
children | 5774732bb5e5 |
files | mercurial/worker.py |
diffstat | 1 files changed, 57 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/worker.py Sat Feb 09 15:51:32 2013 -0800 +++ b/mercurial/worker.py Sat Feb 09 15:51:32 2013 -0800 @@ -6,7 +6,7 @@ # GNU General Public License version 2 or any later version. from i18n import _ -import os, util +import os, signal, sys, util def countcpus(): '''try to count the number of CPUs on the system''' @@ -53,6 +53,62 @@ benefit = linear - (_startupcost * workers + linear / workers) return benefit >= 0.15 +def worker(ui, costperarg, func, staticargs, args): + '''run a function, possibly in parallel in multiple worker + processes. + + returns a progress iterator + + costperarg - cost of a single task + + func - function to run + + staticargs - arguments to pass to every invocation of the function + + args - arguments to split into chunks, to pass to individual + workers + ''' + if worthwhile(ui, costperarg, len(args)): + return _platformworker(ui, func, staticargs, args) + return func(*staticargs + (args,)) + +def _posixworker(ui, func, staticargs, args): + rfd, wfd = os.pipe() + workers = _numworkers(ui) + for pargs in partition(args, workers): + pid = os.fork() + if pid == 0: + try: + os.close(rfd) + for i, item in func(*(staticargs + (pargs,))): + os.write(wfd, '%d %s\n' % (i, item)) + os._exit(0) + except KeyboardInterrupt: + os._exit(255) + os.close(wfd) + fp = os.fdopen(rfd, 'rb', 0) + oldhandler = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, signal.SIG_IGN) + def cleanup(): + # python 2.4 is too dumb for try/yield/finally + signal.signal(signal.SIGINT, oldhandler) + problems = 0 + for i in xrange(workers): + problems |= os.wait()[1] + if problems: + sys.exit(1) + try: + for line in fp: + l = line.split(' ', 1) + yield int(l[0]), l[1][:-1] + except: # re-raises + cleanup() + raise + cleanup() + +if os.name != 'nt': + _platformworker = _posixworker + def partition(lst, nslices): '''partition a list into N slices of equal size''' n = len(lst)