Mercurial > hg-stable
view mercurial/worker.py @ 28181:f8efc8a3a991
worker: change partition strategy to every Nth element
The only consumer of the worker pool code today is `hg update`.
Previously, the algorithm to partition work to each worker process
preserved input list ordering. We'd take the first N elements, then
the next N elements, etc. Measurements on mozilla-central demonstrate
this isn't an optimal partitioning strategy.
I added debug code to print when workers were exiting. When performing
a working copy update on a previously empty working copy of
mozilla-central, I noticed that process lifetimes were all over the
map. One worker would complete after 7s. Many would complete after
12s. And another worker would often take >16s. This behavior occurred
for many worker process counts and was more pronounced on some than
others.
What I suspect is happening is some workers end up with lots of
small files and others with large files. This is because the update
code passes in actions according to sorted filenames. And, directories
under tend to accumulate similar files. For example, test directories
often consist of many small test files and media directories contain
binary (often larger) media files.
This patch changes the partitioning algorithm to select every Nth
element from the input list. Each worker thus has a similar composition
of files to operate on.
The result of this change is that worker processes now all tend to exit
around the same time. The possibility of a long pole due to being
unlucky and receiving all the large files has been mitigated. Overall
execution time seems to drop, but not by a statistically significant
amount on mozilla-central. However, repositories with directories
containing many large files will likely show a drop.
There shouldn't be any regressions due to partial manifest decoding
because the update code already iterates the manifest to determine
what files to operate on, so the manifest should already be decoded.
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 20 Feb 2016 15:56:44 -0800 |
parents | 56b2bcea2529 |
children | 3eb7faf6d958 |
line wrap: on
line source
# worker.py - master-slave parallelism support # # Copyright 2013 Facebook, Inc. # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from __future__ import absolute_import import errno import os import signal import sys import threading from .i18n import _ from . import error def countcpus(): '''try to count the number of CPUs on the system''' # posix try: n = int(os.sysconf('SC_NPROCESSORS_ONLN')) if n > 0: return n except (AttributeError, ValueError): pass # windows try: n = int(os.environ['NUMBER_OF_PROCESSORS']) if n > 0: return n except (KeyError, ValueError): pass return 1 def _numworkers(ui): s = ui.config('worker', 'numcpus') if s: try: n = int(s) if n >= 1: return n except ValueError: raise error.Abort(_('number of cpus must be an integer')) return min(max(countcpus(), 4), 32) if os.name == 'posix': _startupcost = 0.01 else: _startupcost = 1e30 def worthwhile(ui, costperop, nops): '''try to determine whether the benefit of multiple processes can outweigh the cost of starting them''' linear = costperop * nops workers = _numworkers(ui) benefit = linear - (_startupcost * workers + linear / workers) return benefit >= 0.15 def worker(ui, costperarg, func, staticargs, args): '''run a function, possibly in parallel in multiple worker processes. returns a progress iterator costperarg - cost of a single task func - function to run staticargs - arguments to pass to every invocation of the function args - arguments to split into chunks, to pass to individual workers ''' if worthwhile(ui, costperarg, len(args)): return _platformworker(ui, func, staticargs, args) return func(*staticargs + (args,)) def _posixworker(ui, func, staticargs, args): rfd, wfd = os.pipe() workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) pids, problem = [], [0] for pargs in partition(args, workers): pid = os.fork() if pid == 0: signal.signal(signal.SIGINT, oldhandler) try: os.close(rfd) for i, item in func(*(staticargs + (pargs,))): os.write(wfd, '%d %s\n' % (i, item)) os._exit(0) except KeyboardInterrupt: os._exit(255) # other exceptions are allowed to propagate, we rely # on lock.py's pid checks to avoid release callbacks pids.append(pid) pids.reverse() os.close(wfd) fp = os.fdopen(rfd, 'rb', 0) def killworkers(): # if one worker bails, there's no good reason to wait for the rest for p in pids: try: os.kill(p, signal.SIGTERM) except OSError as err: if err.errno != errno.ESRCH: raise def waitforworkers(): for _pid in pids: st = _exitstatus(os.wait()[1]) if st and not problem[0]: problem[0] = st killworkers() t = threading.Thread(target=waitforworkers) t.start() def cleanup(): signal.signal(signal.SIGINT, oldhandler) t.join() status = problem[0] if status: if status < 0: os.kill(os.getpid(), -status) sys.exit(status) try: for line in fp: l = line.split(' ', 1) yield int(l[0]), l[1][:-1] except: # re-raises killworkers() cleanup() raise cleanup() def _posixexitstatus(code): '''convert a posix exit status into the same form returned by os.spawnv returns None if the process was stopped instead of exiting''' if os.WIFEXITED(code): return os.WEXITSTATUS(code) elif os.WIFSIGNALED(code): return -os.WTERMSIG(code) if os.name != 'nt': _platformworker = _posixworker _exitstatus = _posixexitstatus def partition(lst, nslices): '''partition a list into N slices of roughly equal size The current strategy takes every Nth element from the input. If we ever write workers that need to preserve grouping in input we should consider allowing callers to specify a partition strategy. ''' for i in range(nslices): yield lst[i::nslices]