view mercurial/worker.py @ 21932:21a2f31f054d stable

largefiles: use "normallookup", if "mtime" of standin is unset. Before this patch, largefiles gotten from "other" revision (without conflict) at "hg merge" become "clean" unexpectedly in steps below: 1. "merge.update()" is invoked 1-1 standinfile SF is updated in the working directory 1-2 "dirstate" entry for SF is "normallookup"-ed 2. "lfcommands.updatelfiles()" is invoked (by "overrides.hgmerge()") 2-1 largefile LF (for SF) is updated in the working directory 2-2 "dirstate" returns "n" for SF (by 1-2) 2-3 "lfdirstate" entry for LF is "normal"-ed 2-4 "lfdirstate" is written into ".hg/largefiles/dirstate", and timestamp of LF is stored into "lfdirstate" file (ASSUMPTION: timestamp of LF differs from one of "lfdirstate" file) Then, "hg status" treats LF as "clean", even though LF is updated by "other" revision (by 2-1), because "lfilesrepo.status()" always treats "normal"-ed files (by 2-3 and 2-4) as "clean". When timestamp is not set (= negative value) for standinfile in "dirstate", largefile should be "normallookup"-ed regardless of rebasing or not, because "n" state in "dirstate" doesn't ensure "clean"-ness of a standinfile at that time. This patch uses "normallookup" instead of "normal", if "mtime" of standin is unset. This is a temporary way to fix with fewer changes. For fundamental resolution of this kind of problems in the future, "lfdirstate" should be updated with "dirstate" simultaneously while "merge.update" execution: maybe by hooking "recordupdates". It is also why this patch (temporarily) uses internal field "_map" of "dirstate" directly. This patch uses "[debug] dirstate.delaywrite" feature in the test, to ensure that timestamp of the largefile gotten from "other" revision is stored into ".hg/largefiles/dirstate". (for ASSUMPTION at 2-4) This patch newly adds "test-largefiles-update.t", to avoid increasing cost to run other tests for largefiles by subsequent patches (especially, "[debug] dirstate.delaywrite" causes so).
author FUJIWARA Katsunori <foozy@lares.dti.ne.jp>
date Tue, 22 Jul 2014 23:59:34 +0900
parents 1e5b38a919dd
children b3e51675f98e
line wrap: on
line source

# worker.py - master-slave parallelism support
#
# Copyright 2013 Facebook, Inc.
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from i18n import _
import errno, os, signal, sys, threading
import util

def countcpus():
    '''return a best-effort count of the CPUs on this system

    The posix sysconf value is consulted first, then the windows
    NUMBER_OF_PROCESSORS environment variable.  The first source that
    yields a positive integer wins; if none does, 1 is returned.
    '''
    def posixcount():
        return int(os.sysconf('SC_NPROCESSORS_ONLN'))

    def windowscount():
        return int(os.environ['NUMBER_OF_PROCESSORS'])

    # each probe is paired with the exceptions that merely mean "this
    # source is not available here"; anything else still propagates
    probes = (
        (posixcount, (AttributeError, ValueError)),
        (windowscount, (KeyError, ValueError)),
    )
    for probe, unavailable in probes:
        try:
            count = probe()
        except unavailable:
            continue
        if count > 0:
            return count

    return 1

def _numworkers(ui):
    '''return the number of worker processes to use

    A value of 1 or more in the worker.numcpus config setting is used
    as is; a value that is not an integer aborts.  When the setting is
    empty or below 1, the detected CPU count is used instead, raised to
    at least 4 and capped at 32.
    '''
    configured = ui.config('worker', 'numcpus')
    if configured:
        try:
            requested = int(configured)
        except ValueError:
            raise util.Abort(_('number of cpus must be an integer'))
        if requested >= 1:
            return requested
    detected = countcpus()
    return min(max(detected, 4), 32)

# Estimated cost of starting one worker process, expressed in the same
# units as the per-operation cost handed to worthwhile().  The default
# is deliberately enormous so that the estimate made there effectively
# never favours parallelism; only posix gets a realistic figure.
_startupcost = 1e30
if os.name == 'posix':
    _startupcost = 0.01

def worthwhile(ui, costperop, nops):
    '''estimate whether spreading the work over several processes
    saves enough to justify the cost of starting them

    costperop - estimated cost of a single operation

    nops - number of operations to perform

    returns True when the estimated saving is at least 0.15
    '''
    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    # every worker pays the startup cost, then handles an even share
    parallelcost = _startupcost * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15

def worker(ui, costperarg, func, staticargs, args):
    '''run func over args, using several worker processes when that
    is estimated to pay off

    ui - passed to worthwhile() and to the platform worker

    costperarg - cost of a single task

    func - function to run

    staticargs - tuple of arguments passed to every invocation of
    func, ahead of the work items

    args - work items; handed to func in one piece when running
    serially, or split into chunks for the individual workers

    returns the result of func directly in the serial case, otherwise
    the progress iterator produced by the platform worker
    '''
    if not worthwhile(ui, costperarg, len(args)):
        # not worth forking: call func once, in this process
        callargs = staticargs + (args,)
        return func(*callargs)
    return _platformworker(ui, func, staticargs, args)

def _posixworker(ui, func, staticargs, args):
    '''fork-based worker implementation (a generator)

    args is split with partition() into one slice per worker (the
    worker count comes from _numworkers(ui)) and one child is forked
    per slice.  Each child calls func with staticargs plus its slice
    and writes every (i, item) pair that func yields to a shared pipe
    as a "%d %s\\n" line.  The parent reads those lines back and yields
    them as (int, str) pairs.

    A helper thread reaps the children.  The first non-zero status it
    sees is recorded and the remaining children are sent SIGTERM.  Once
    the pipe is drained (or reading fails), the parent exits with that
    status; a negative status (child killed by a signal) is first
    re-sent to the parent itself as that signal.

    NOTE(review): the line-based protocol looks like it assumes items
    never contain a newline - confirm against callers.
    '''
    rfd, wfd = os.pipe()
    workers = _numworkers(ui)
    # ignore SIGINT in the parent from here on; the previous handler is
    # reinstated in each child right after the fork and in the parent
    # by cleanup()
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem is a one-element list so that the nested functions below
    # can update the recorded status in place
    pids, problem = [], [0]
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            # child: only writes to the pipe, so drop the read end
            signal.signal(signal.SIGINT, oldhandler)
            try:
                os.close(rfd)
                for i, item in func(*(staticargs + (pargs,))):
                    os.write(wfd, '%d %s\n' % (i, item))
                os._exit(0)
            except KeyboardInterrupt:
                os._exit(255)
                # other exceptions are allowed to propagate, we rely
                # on lock.py's pid checks to avoid release callbacks
        pids.append(pid)
    # after this, killworkers() signals the most recently forked child
    # first
    pids.reverse()
    # parent: only reads from the pipe, so drop the write end
    os.close(wfd)
    # bufsize 0: unbuffered file object on the read end
    fp = os.fdopen(rfd, 'rb', 0)
    def killworkers():
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError, err:
                # ESRCH: that child is already gone, nothing to kill
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers():
        # one os.wait() per forked child; "_" is just a throwaway loop
        # variable local to this function
        for _ in pids:
            st = _exitstatus(os.wait()[1])
            # only the first failure is recorded and triggers the kill
            if st and not problem[0]:
                problem[0] = st
                killworkers()
    # reap children in the background while the code below reads the pipe
    t = threading.Thread(target=waitforworkers)
    t.start()
    def cleanup():
        # restore SIGINT handling, wait until every child is reaped,
        # then propagate the first recorded failure, if any
        signal.signal(signal.SIGINT, oldhandler)
        t.join()
        status = problem[0]
        if status:
            if status < 0:
                # negative status is a negated signal number (see
                # _posixexitstatus): send that signal to ourselves
                os.kill(os.getpid(), -status)
            sys.exit(status)
    try:
        for line in fp:
            # "<int> <item>\n": split on the first space only and strip
            # the trailing newline from the item
            l = line.split(' ', 1)
            yield int(l[0]), l[1][:-1]
    except: # re-raises
        # also reached when an exception is raised into the generator
        # at the yield above
        killworkers()
        cleanup()
        raise
    cleanup()

def _posixexitstatus(code):
    '''translate a raw posix wait status into the form returned by
    os.spawnv: the exit code for a normal exit, or the negated signal
    number for a process terminated by a signal

    returns None for any other status, i.e. when the process was
    stopped instead of exiting'''
    if os.WIFEXITED(code):
        return os.WEXITSTATUS(code)
    if os.WIFSIGNALED(code):
        return -os.WTERMSIG(code)
    return None

# The fork-based worker and the posix wait-status decoder are only
# bound where os.name is not 'nt'; on 'nt' neither name is defined by
# this statement.
if os.name != 'nt':
    _platformworker, _exitstatus = _posixworker, _posixexitstatus

def partition(lst, nslices):
    '''split a sequence into nslices consecutive slices of near-equal
    size (a generator)

    lst - sequence supporting len() and slicing

    nslices - number of slices to produce

    Exactly nslices slices are yielded, in order.  When len(lst) is not
    a multiple of nslices, the leading slices each get one extra
    element; when nslices exceeds len(lst), the trailing slices are
    empty.

    raises ZeroDivisionError on first iteration if nslices is 0
    '''
    # divmod always performs floor division, so this does not depend on
    # "/" truncating integers (which it stops doing under true division)
    chunk, slop = divmod(len(lst), nslices)
    start = 0
    for i in range(nslices):
        size = chunk
        if i < slop:
            # the first "slop" slices absorb the remainder, one each
            size += 1
        yield lst[start:start + size]
        start += size