view mercurial/worker.py @ 18639:5774732bb5e5

merge: apply non-interactive working dir updates in parallel

This has a big effect on the performance of working dir updates. Here are the results of update from null to the given rev in several repos, on a Linux 3.2 system with 32 cores running ext4, with the progress extension enabled.

repo             rev           plain  parallel  speedup
hg               7068089c95a2    0.9       0.3      3
mozilla-central  fe1600b22c77   42.8       7.7      5.5
linux-2.6        9ef4b770e069   31.4       4.9      6.4
author Bryan O'Sullivan <bryano@fb.com>
date Sat, 09 Feb 2013 15:51:32 -0800
parents 047110c0e2a8
children d1a2b086d058
line wrap: on
line source

# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from i18n import _
import os, signal, sys, util

def countcpus():
    '''try to count the number of CPUs on the system

    Two probes are tried in order: the sysconf value
    SC_NPROCESSORS_ONLN, then the NUMBER_OF_PROCESSORS environment
    variable.  The first one that yields a positive integer wins; if
    neither does, 1 is returned.  This function never raises because a
    probe is unavailable.'''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError, OSError):
        # AttributeError: this platform has no os.sysconf at all
        # ValueError: the name is not known to sysconf
        # OSError: the name is known but the host does not support it
        pass

    # windows
    try:
        n = int(os.environ['NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        # variable missing, or set to something that is not an integer
        pass

    return 1

def _numworkers(ui):
    '''return the number of worker processes to use

    A worker.numcpus setting of 1 or more is returned unchanged; one
    that is not an integer aborts.  If the setting is empty or below
    1, the detected CPU count is used, raised to at least 4 and capped
    at 32.'''
    requested = 0
    configured = ui.config('worker', 'numcpus')
    if configured:
        try:
            requested = int(configured)
        except ValueError:
            raise util.Abort(_('number of cpus must be an integer'))
    if requested >= 1:
        return requested
    return min(max(countcpus(), 4), 32)

# Estimated cost of starting one worker process, used by worthwhile().
# Everywhere except posix the value is so large that worthwhile() will
# effectively never favour running in parallel.
_startupcost = 1e30
if os.name == 'posix':
    _startupcost = 0.01

def worthwhile(ui, costperop, nops):
    '''decide whether spreading the work over several processes is
    likely to pay for the cost of starting them

    costperop - estimated cost of a single operation

    nops - number of operations to perform

    The work is considered worthwhile when the estimated saving (the
    serial cost minus the startup and per-worker cost of running in
    parallel) is at least 0.15.
    '''
    serial = costperop * nops
    nworkers = _numworkers(ui)
    parallel = _startupcost * nworkers + serial / nworkers
    return serial - parallel >= 0.15

def worker(ui, costperarg, func, staticargs, args):
    '''run func over args, in several worker processes when that is
    estimated to be worthwhile and in this process otherwise

    ui - passed to worthwhile() and to the platform worker

    costperarg - estimated cost of handling a single element of args

    func - function to run; it is called as
    func(*(staticargs + (someargs,)))

    staticargs - tuple of leading arguments given to every call of func

    args - the work items; passed whole when running in-process, split
    into chunks across the workers otherwise

    When running in-process the return value of func is returned
    directly; otherwise the result of the platform worker is returned.
    '''
    if not worthwhile(ui, costperarg, len(args)):
        # cheap enough to do serially: one call with every item
        return func(*(staticargs + (args,)))
    return _platformworker(ui, func, staticargs, args)

def _posixworker(ui, func, staticargs, args):
    '''fork-based implementation of worker()

    args is split with partition() into one slice per worker, and one
    child process is forked per slice.  Each child calls
    func(*(staticargs + (slice,))) and writes every (i, item) pair it
    produces to a shared pipe as a single "<i> <item>" line.  The
    parent reads the pipe and yields (int, str) pairs as they arrive.

    A child never returns from this function: it leaves through
    os._exit() with status 0 on success and 255 on any failure.

    When the pipe is drained, or when reading it fails, the parent
    restores its SIGINT handler, closes the pipe and waits for every
    child; a non-zero wait status from any of them terminates the
    process with sys.exit(1).
    '''
    import traceback
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            # Child.  Every path below must end in os._exit(); letting an
            # exception escape would make the child carry on executing
            # the parent's code (forking again, unwinding the caller).
            try:
                os.close(rfd)
                for i, item in func(*(staticargs + (pargs,))):
                    os.write(wfd, '%d %s\n' % (i, item))
                os._exit(0)
            except KeyboardInterrupt:
                os._exit(255)
            except: # never returns, so there is nothing to re-raise
                try:
                    # report the failure before dying; os._exit() skips
                    # the normal interpreter shutdown
                    traceback.print_exc()
                    sys.stderr.flush()
                finally:
                    os._exit(255)
    # only the children write; the parent needs just the read end
    os.close(wfd)
    fp = os.fdopen(rfd, 'rb', 0)
    # the parent ignores SIGINT while it is collecting results
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    def cleanup():
        # python 2.4 is too dumb for try/yield/finally
        signal.signal(signal.SIGINT, oldhandler)
        # close the read end before waiting, so that a child still
        # writing gets an error instead of blocking on a full pipe
        fp.close()
        problems = 0
        for i in xrange(workers):
            problems |= os.wait()[1]
        if problems:
            sys.exit(1)
    try:
        for line in fp:
            # "<i> <item>\n" -> (int, item without the newline)
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        cleanup()
        raise
    cleanup()

# Select the implementation that worker() dispatches to.  The fork-based
# one is the only implementation in the code visible here, so on 'nt'
# the name is left undefined.
if os.name != 'nt':
    _platformworker = _posixworker

def partition(lst, nslices):
    '''partition a list into N contiguous slices of nearly equal size

    Exactly nslices slices are yielded, in order.  The first
    len(lst) % nslices of them hold one extra element, so sizes differ
    by at most one; when lst has fewer than nslices elements the
    trailing slices are empty.

    Raises ZeroDivisionError if nslices is 0.
    '''
    # divmod keeps chunk an integer (it is used as a slice bound) no
    # matter which division semantics "/" has
    chunk, slop = divmod(len(lst), nslices)
    start = 0
    for i in range(nslices):
        end = start + chunk
        if i < slop:
            # hand the leftover elements out one per leading slice
            end += 1
        yield lst[start:end]
        start = end