mercurial/worker.py
author Durham Goode <durham@fb.com>
Sat, 07 Nov 2015 16:11:49 -0800
changeset 26909 e36118815a39
parent 26587 56b2bcea2529
child 28181 f8efc8a3a991
permissions -rw-r--r--
phase: improve retractboundary perf The existing retractboundary implementation computed the new boundary by walking all descendants of all existing roots and computing the new roots. This is O(commits since first root), which on long repos can be hundreds of thousands of commits. The new algorithm only updates roots that are greater than the new root locations. For common operations like commit on a repo with the earliest root several hundred thousand commits ago, this makes retractboundary go from 1 second to 0.008 seconds. I tested it by running the test suite with both implementations and checking that the root results were always the identical. There was some discussion on IRC about the safety of this (i.e. what if the new nodes are already part of the phase, etc). I've looked into it and believe this patch is safe: 1) The old existing code already filters the input nodes to only contain nodes that require retracting (i.e. we only make node X a new root if the old phase is less than the target phase), so there's no chance of us adding a unnecessary root to the phase (unless the input root is made unnecessary by another root in the same input, but see point #3). 2) Another way of thinking about this is: the only way the new algorithm would be different from the old algorithm is if it added a root that is a descendant of an old root (since the old algorithm would've caught this in the big "roots(%ln::)". At the beginning of the function, when we filter out roots that already meet the phase criteria, the *definition* of meeting the phase criteria is "not being a descendant of an existing root". Therefore, by definition none of the new roots we are processing are descendants of an existing root. 3) If two nodes are passed in as input, and one node is an ancestor of the other (and therefore the later node should not be a root), this is still caught by the 'roots(%ln::)' revset. So there's no chance of an extra root being introduced that way either.

# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import errno
import os
import signal
import sys
import threading

from .i18n import _
from . import error

def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix
    try:
        n = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if n > 0:
            return n
    except (AttributeError, ValueError):
        pass

    # windows
    try:
        n = int(os.environ['NUMBER_OF_PROCESSORS'])
        if n > 0:
            return n
    except (KeyError, ValueError):
        pass

    return 1

def _numworkers(ui):
    s = ui.config('worker', 'numcpus')
    if s:
        try:
            n = int(s)
            if n >= 1:
                return n
        except ValueError:
            raise error.Abort(_('number of cpus must be an integer'))
    return min(max(countcpus(), 4), 32)

if os.name == 'posix':
    _startupcost = 0.01
else:
    _startupcost = 1e30

def worthwhile(ui, costperop, nops):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''
    linear = costperop * nops
    workers = _numworkers(ui)
    benefit = linear - (_startupcost * workers + linear / workers)
    return benefit >= 0.15

def worker(ui, costperarg, func, staticargs, args):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers
    '''
    if worthwhile(ui, costperarg, len(args)):
        return _platformworker(ui, func, staticargs, args)
    return func(*staticargs + (args,))

def _posixworker(ui, func, staticargs, args):
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = [], [0]
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            signal.signal(signal.SIGINT, oldhandler)
            try:
                os.close(rfd)
                for i, item in func(*(staticargs + (pargs,))):
                    os.write(wfd, '%d %s\n' % (i, item))
                os._exit(0)
            except KeyboardInterrupt:
                os._exit(255)
                # other exceptions are allowed to propagate, we rely
                # on lock.py's pid checks to avoid release callbacks
        pids.append(pid)
    pids.reverse()
    os.close(wfd)
    fp = os.fdopen(rfd, 'rb', 0)
    def killworkers():
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers():
        for _pid in pids:
            st = _exitstatus(os.wait()[1])
            if st and not problem[0]:
                problem[0] = st
                killworkers()
    t = threading.Thread(target=waitforworkers)
    t.start()
    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        t.join()
        status = problem[0]
        if status:
            if status < 0:
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in fp:
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        killworkers()
        cleanup()
        raise
    cleanup()

def _posixexitstatus(code):
    '''convert a posix exit status into the same form returned by
    os.spawnv

    returns None if the process was stopped instead of exiting'''
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    elif os.WIFSIGNALED(code):
        return -os.WTERMSIG(code)

if os.name != 'nt':
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus

def partition(lst, nslices):
    '''partition a list into N slices of equal size'''
    n = len(lst)
    chunk, slop = n / nslices, n % nslices
    end = 0
    for i in xrange(nslices):
        start = end
        end = start + chunk
        if slop:
            end += 1
            slop -= 1
        yield lst[start:end]