Mercurial > hg
view mercurial/worker.py @ 26402:05871262acd5
treemanifest: rework lazy-copying code (issue4840)
The old lazy-copy code formed a chain of copied manifests with each
copy. Under typical operation, the stack never got more than a couple
of manifests deep and was fine. Under conditions like hgsubversion or
convert, the stack could get hundreds of manifests deep, and
eventually overflow the recursion limit for Python. I was able to
consistently reproduce this by converting an hgsubversion clone of
svn's history to treemanifests.
This may result in fewer manifests staying in memory during operations
like convert when treemanifests are in use, and should make those
operations faster since there will be significantly fewer noop
function calls going on.
A previous attempt (never mailed) of mine to fix this problem tried to
simply have all treemanifests only have a loadfunc - that caused
somewhat weird problems because the gettext() callable passed into
read() wasn't idempotent, so the easy solution is to have a loadfunc
and a copyfunc.
author | Augie Fackler <augie@google.com> |
---|---|
date | Fri, 25 Sep 2015 22:54:46 -0400 |
parents | d29859cfcfc2 |
children | c0501c26b05c |
line wrap: on
line source
# worker.py - master-slave parallelism support # # Copyright 2013 Facebook, Inc. # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from __future__ import absolute_import import errno import multiprocessing import os import signal import sys import threading from .i18n import _ from . import util def countcpus(): '''try to count the number of CPUs on the system''' try: return multiprocessing.cpu_count() except NotImplementedError: return 1 def _numworkers(ui): s = ui.config('worker', 'numcpus') if s: try: n = int(s) if n >= 1: return n except ValueError: raise util.Abort(_('number of cpus must be an integer')) return min(max(countcpus(), 4), 32) if os.name == 'posix': _startupcost = 0.01 else: _startupcost = 1e30 def worthwhile(ui, costperop, nops): '''try to determine whether the benefit of multiple processes can outweigh the cost of starting them''' linear = costperop * nops workers = _numworkers(ui) benefit = linear - (_startupcost * workers + linear / workers) return benefit >= 0.15 def worker(ui, costperarg, func, staticargs, args): '''run a function, possibly in parallel in multiple worker processes. returns a progress iterator costperarg - cost of a single task func - function to run staticargs - arguments to pass to every invocation of the function args - arguments to split into chunks, to pass to individual workers ''' if worthwhile(ui, costperarg, len(args)): return _platformworker(ui, func, staticargs, args) return func(*staticargs + (args,)) def _posixworker(ui, func, staticargs, args): rfd, wfd = os.pipe() workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) pids, problem = [], [0] for pargs in partition(args, workers): pid = os.fork() if pid == 0: signal.signal(signal.SIGINT, oldhandler) try: os.close(rfd) for i, item in func(*(staticargs + (pargs,))): os.write(wfd, '%d %s\n' % (i, item)) os._exit(0) except KeyboardInterrupt: os._exit(255) # other exceptions are allowed to propagate, we rely # on lock.py's pid checks to avoid release callbacks pids.append(pid) pids.reverse() os.close(wfd) fp = os.fdopen(rfd, 'rb', 0) def killworkers(): # if one worker bails, there's no good reason to wait for the rest for p in pids: try: os.kill(p, signal.SIGTERM) except OSError as err: if err.errno != errno.ESRCH: raise def waitforworkers(): for _pid in pids: st = _exitstatus(os.wait()[1]) if st and not problem[0]: problem[0] = st killworkers() t = threading.Thread(target=waitforworkers) t.start() def cleanup(): signal.signal(signal.SIGINT, oldhandler) t.join() status = problem[0] if status: if status < 0: os.kill(os.getpid(), -status) sys.exit(status) try: for line in fp: l = line.split(' ', 1) yield int(l[0]), l[1][:-1] except: # re-raises killworkers() cleanup() raise cleanup() def _posixexitstatus(code): '''convert a posix exit status into the same form returned by os.spawnv returns None if the process was stopped instead of exiting''' if os.WIFEXITED(code): return os.WEXITSTATUS(code) elif os.WIFSIGNALED(code): return -os.WTERMSIG(code) if os.name != 'nt': _platformworker = _posixworker _exitstatus = _posixexitstatus def partition(lst, nslices): '''partition a list into N slices of equal size''' n = len(lst) chunk, slop = n / nslices, n % nslices end = 0 for i in xrange(nslices): start = end end = start + chunk if slop: end += 1 slop -= 1 yield lst[start:end]