Mercurial > hg
view mercurial/worker.py @ 21932:21a2f31f054d stable
largefiles: use "normallookup", if "mtime" of standin is unset
Before this patch, largefiles gotten from "other" revision (without
conflict) at "hg merge" become "clean" unexpectedly in steps below:
1. "merge.update()" is invoked
1-1 standinfile SF is updated in the working directory
1-2 "dirstate" entry for SF is "normallookup"-ed
2. "lfcommands.updatelfiles()" is invoked (by "overrides.hgmerge()")
2-1 largefile LF (for SF) is updated in the working directory
2-2 "dirstate" returns "n" for SF (by 1-2)
2-3 "lfdirstate" entry for LF is "normal"-ed
2-4 "lfdirstate" is written into ".hg/largefiles/dirstate", and
timestamp of LF is stored into "lfdirstate" file
(ASSUMPTION: the timestamp of LF differs from that of the "lfdirstate" file)
Then, "hg status" treats LF as "clean", even though LF is updated by
"other" revision (by 2-1), because "lfilesrepo.status()" always treats
"normal"-ed files (by 2-3 and 2-4) as "clean".
When timestamp is not set (= negative value) for standinfile in
"dirstate", largefile should be "normallookup"-ed regardless of
rebasing or not, because "n" state in "dirstate" doesn't ensure
"clean"-ness of a standinfile at that time.
This patch uses "normallookup" instead of "normal", if "mtime" of
standin is unset
This is a temporary fix that requires fewer changes. For fundamental
resolution of this kind of problems in the future, "lfdirstate" should
be updated with "dirstate" simultaneously while "merge.update"
execution: maybe by hooking "recordupdates"
It is also why this patch (temporarily) uses internal field "_map" of
"dirstate" directly.
This patch uses "[debug] dirstate.delaywrite" feature in the test, to
ensure that timestamp of the largefile gotten from "other" revision is
stored into ".hg/largefiles/dirstate". (for ASSUMPTION at 2-4)
This patch newly adds "test-largefiles-update.t", to avoid increasing
cost to run other tests for largefiles by subsequent patches
(especially, "[debug] dirstate.delaywrite" causes so).
author | FUJIWARA Katsunori <foozy@lares.dti.ne.jp> |
---|---|
date | Tue, 22 Jul 2014 23:59:34 +0900 |
parents | 1e5b38a919dd |
children | b3e51675f98e |
line wrap: on
line source
# worker.py - master-slave parallelism support # # Copyright 2013 Facebook, Inc. # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from i18n import _ import errno, os, signal, sys, threading import util def countcpus(): '''try to count the number of CPUs on the system''' # posix try: n = int(os.sysconf('SC_NPROCESSORS_ONLN')) if n > 0: return n except (AttributeError, ValueError): pass # windows try: n = int(os.environ['NUMBER_OF_PROCESSORS']) if n > 0: return n except (KeyError, ValueError): pass return 1 def _numworkers(ui): s = ui.config('worker', 'numcpus') if s: try: n = int(s) if n >= 1: return n except ValueError: raise util.Abort(_('number of cpus must be an integer')) return min(max(countcpus(), 4), 32) if os.name == 'posix': _startupcost = 0.01 else: _startupcost = 1e30 def worthwhile(ui, costperop, nops): '''try to determine whether the benefit of multiple processes can outweigh the cost of starting them''' linear = costperop * nops workers = _numworkers(ui) benefit = linear - (_startupcost * workers + linear / workers) return benefit >= 0.15 def worker(ui, costperarg, func, staticargs, args): '''run a function, possibly in parallel in multiple worker processes. 
returns a progress iterator costperarg - cost of a single task func - function to run staticargs - arguments to pass to every invocation of the function args - arguments to split into chunks, to pass to individual workers ''' if worthwhile(ui, costperarg, len(args)): return _platformworker(ui, func, staticargs, args) return func(*staticargs + (args,)) def _posixworker(ui, func, staticargs, args): rfd, wfd = os.pipe() workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) pids, problem = [], [0] for pargs in partition(args, workers): pid = os.fork() if pid == 0: signal.signal(signal.SIGINT, oldhandler) try: os.close(rfd) for i, item in func(*(staticargs + (pargs,))): os.write(wfd, '%d %s\n' % (i, item)) os._exit(0) except KeyboardInterrupt: os._exit(255) # other exceptions are allowed to propagate, we rely # on lock.py's pid checks to avoid release callbacks pids.append(pid) pids.reverse() os.close(wfd) fp = os.fdopen(rfd, 'rb', 0) def killworkers(): # if one worker bails, there's no good reason to wait for the rest for p in pids: try: os.kill(p, signal.SIGTERM) except OSError, err: if err.errno != errno.ESRCH: raise def waitforworkers(): for _ in pids: st = _exitstatus(os.wait()[1]) if st and not problem[0]: problem[0] = st killworkers() t = threading.Thread(target=waitforworkers) t.start() def cleanup(): signal.signal(signal.SIGINT, oldhandler) t.join() status = problem[0] if status: if status < 0: os.kill(os.getpid(), -status) sys.exit(status) try: for line in fp: l = line.split(' ', 1) yield int(l[0]), l[1][:-1] except: # re-raises killworkers() cleanup() raise cleanup() def _posixexitstatus(code): '''convert a posix exit status into the same form returned by os.spawnv returns None if the process was stopped instead of exiting''' if os.WIFEXITED(code): return os.WEXITSTATUS(code) elif os.WIFSIGNALED(code): return -os.WTERMSIG(code) if os.name != 'nt': _platformworker = _posixworker _exitstatus = 
_posixexitstatus def partition(lst, nslices): '''partition a list into N slices of equal size''' n = len(lst) chunk, slop = n / nslices, n % nslices end = 0 for i in xrange(nslices): start = end end = start + chunk if slop: end += 1 slop -= 1 yield lst[start:end]