# HG changeset patch # User Pierre-Yves David # Date 1590575168 -7200 # Node ID 4c1d39215034af68b4b7d3a9221bbc8eca83f34f # Parent 818b4f19ef23258bf2b4373961f55a769ab4b3da metadata: move computation related to files touched in a dedicated module This was suggested by Yuya Nishihara a while back. Since I am about to add more metadata related computation, lets create a new repositories. Differential Revision: https://phab.mercurial-scm.org/D8587 diff -r 818b4f19ef23 -r 4c1d39215034 mercurial/changelog.py --- a/mercurial/changelog.py Fri Jun 05 11:10:33 2020 -0700 +++ b/mercurial/changelog.py Wed May 27 12:26:08 2020 +0200 @@ -16,9 +16,9 @@ from .thirdparty import attr from . import ( - copies, encoding, error, + metadata, pycompat, revlog, ) @@ -318,7 +318,7 @@ rawindices = self.extra.get(b'filesadded') if rawindices is None: return None - return copies.decodefileindices(self.files, rawindices) + return metadata.decodefileindices(self.files, rawindices) @property def filesremoved(self): @@ -330,7 +330,7 @@ rawindices = self.extra.get(b'filesremoved') if rawindices is None: return None - return copies.decodefileindices(self.files, rawindices) + return metadata.decodefileindices(self.files, rawindices) @property def p1copies(self): @@ -342,7 +342,7 @@ rawcopies = self.extra.get(b'p1copies') if rawcopies is None: return None - return copies.decodecopies(self.files, rawcopies) + return metadata.decodecopies(self.files, rawcopies) @property def p2copies(self): @@ -354,7 +354,7 @@ rawcopies = self.extra.get(b'p2copies') if rawcopies is None: return None - return copies.decodecopies(self.files, rawcopies) + return metadata.decodecopies(self.files, rawcopies) @property def description(self): @@ -570,13 +570,13 @@ ): extra.pop(name, None) if p1copies is not None: - p1copies = copies.encodecopies(sortedfiles, p1copies) + p1copies = metadata.encodecopies(sortedfiles, p1copies) if p2copies is not None: - p2copies = copies.encodecopies(sortedfiles, p2copies) + p2copies = metadata.encodecopies(sortedfiles, p2copies) if filesadded is not None: - filesadded = copies.encodefileindices(sortedfiles, filesadded) + filesadded = metadata.encodefileindices(sortedfiles, filesadded) if filesremoved is not None: - filesremoved = copies.encodefileindices(sortedfiles, filesremoved) + filesremoved = metadata.encodefileindices(sortedfiles, filesremoved) if self._copiesstorage == b'extra': extrasentries = p1copies, p2copies, filesadded, filesremoved if extra is None and any(x is not None for x in extrasentries): diff -r 818b4f19ef23 -r 4c1d39215034 mercurial/context.py --- a/mercurial/context.py Fri Jun 05 11:10:33 2020 -0700 +++ b/mercurial/context.py Wed May 27 12:26:08 2020 +0200 @@ -28,13 +28,13 @@ open, ) from . import ( - copies, dagop, encoding, error, fileset, match as matchmod, mergestate as mergestatemod, + metadata, obsolete as obsmod, patch, pathutil, @@ -300,7 +300,7 @@ @propertycache def _copies(self): - return copies.computechangesetcopies(self) + return metadata.computechangesetcopies(self) def p1copies(self): return self._copies[0] @@ -589,7 +589,7 @@ filesadded = None if filesadded is None: if compute_on_none: - filesadded = copies.computechangesetfilesadded(self) + filesadded = metadata.computechangesetfilesadded(self) else: filesadded = [] return filesadded @@ -608,7 +608,7 @@ filesremoved = None if filesremoved is None: if compute_on_none: - filesremoved = copies.computechangesetfilesremoved(self) + filesremoved = metadata.computechangesetfilesremoved(self) else: filesremoved = [] return filesremoved diff -r 818b4f19ef23 -r 4c1d39215034 mercurial/copies.py --- a/mercurial/copies.py Fri Jun 05 11:10:33 2020 -0700 +++ b/mercurial/copies.py Wed May 27 12:26:08 2020 +0200 @@ -8,7 +8,6 @@ from __future__ import absolute_import import collections -import multiprocessing import os from .i18n import _ @@ -17,7 +16,6 @@ from .revlogutils.flagutil import REVIDX_SIDEDATA from . import ( - error, match as matchmod, node, pathutil, @@ -25,7 +23,6 @@ util, ) -from .revlogutils import sidedata as sidedatamod from .utils import stringutil @@ -992,250 +989,3 @@ _filter(wctx.p1(), wctx, new_copies) for dst, src in pycompat.iteritems(new_copies): wctx[dst].markcopied(src) - - -def computechangesetfilesadded(ctx): - """return the list of files added in a changeset - """ - added = [] - for f in ctx.files(): - if not any(f in p for p in ctx.parents()): - added.append(f) - return added - - -def computechangesetfilesremoved(ctx): - """return the list of files removed in a changeset - """ - removed = [] - for f in ctx.files(): - if f not in ctx: - removed.append(f) - return removed - - -def computechangesetcopies(ctx): - """return the copies data for a changeset - - The copies data are returned as a pair of dictionnary (p1copies, p2copies). - - Each dictionnary are in the form: `{newname: oldname}` - """ - p1copies = {} - p2copies = {} - p1 = ctx.p1() - p2 = ctx.p2() - narrowmatch = ctx._repo.narrowmatch() - for dst in ctx.files(): - if not narrowmatch(dst) or dst not in ctx: - continue - copied = ctx[dst].renamed() - if not copied: - continue - src, srcnode = copied - if src in p1 and p1[src].filenode() == srcnode: - p1copies[dst] = src - elif src in p2 and p2[src].filenode() == srcnode: - p2copies[dst] = src - return p1copies, p2copies - - -def encodecopies(files, copies): - items = [] - for i, dst in enumerate(files): - if dst in copies: - items.append(b'%d\0%s' % (i, copies[dst])) - if len(items) != len(copies): - raise error.ProgrammingError( - b'some copy targets missing from file list' - ) - return b"\n".join(items) - - -def decodecopies(files, data): - try: - copies = {} - if not data: - return copies - for l in data.split(b'\n'): - strindex, src = l.split(b'\0') - i = int(strindex) - dst = files[i] - copies[dst] = src - return copies - except (ValueError, IndexError): - # Perhaps someone had chosen the same key name (e.g. "p1copies") and - # used different syntax for the value. - return None - - -def encodefileindices(files, subset): - subset = set(subset) - indices = [] - for i, f in enumerate(files): - if f in subset: - indices.append(b'%d' % i) - return b'\n'.join(indices) - - -def decodefileindices(files, data): - try: - subset = [] - if not data: - return subset - for strindex in data.split(b'\n'): - i = int(strindex) - if i < 0 or i >= len(files): - return None - subset.append(files[i]) - return subset - except (ValueError, IndexError): - # Perhaps someone had chosen the same key name (e.g. "added") and - # used different syntax for the value. - return None - - -def _getsidedata(srcrepo, rev): - ctx = srcrepo[rev] - filescopies = computechangesetcopies(ctx) - filesadded = computechangesetfilesadded(ctx) - filesremoved = computechangesetfilesremoved(ctx) - sidedata = {} - if any([filescopies, filesadded, filesremoved]): - sortedfiles = sorted(ctx.files()) - p1copies, p2copies = filescopies - p1copies = encodecopies(sortedfiles, p1copies) - p2copies = encodecopies(sortedfiles, p2copies) - filesadded = encodefileindices(sortedfiles, filesadded) - filesremoved = encodefileindices(sortedfiles, filesremoved) - if p1copies: - sidedata[sidedatamod.SD_P1COPIES] = p1copies - if p2copies: - sidedata[sidedatamod.SD_P2COPIES] = p2copies - if filesadded: - sidedata[sidedatamod.SD_FILESADDED] = filesadded - if filesremoved: - sidedata[sidedatamod.SD_FILESREMOVED] = filesremoved - return sidedata - - -def getsidedataadder(srcrepo, destrepo): - use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade') - if pycompat.iswindows or not use_w: - return _get_simple_sidedata_adder(srcrepo, destrepo) - else: - return _get_worker_sidedata_adder(srcrepo, destrepo) - - -def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens): - """The function used by worker precomputing sidedata - - It read an input queue containing revision numbers - It write in an output queue containing (rev, ) - - The `None` input value is used as a stop signal. - - The `tokens` semaphore is user to avoid having too many unprocessed - entries. The workers needs to acquire one token before fetching a task. - They will be released by the consumer of the produced data. - """ - tokens.acquire() - rev = revs_queue.get() - while rev is not None: - data = _getsidedata(srcrepo, rev) - sidedata_queue.put((rev, data)) - tokens.acquire() - rev = revs_queue.get() - # processing of `None` is completed, release the token. - tokens.release() - - -BUFF_PER_WORKER = 50 - - -def _get_worker_sidedata_adder(srcrepo, destrepo): - """The parallel version of the sidedata computation - - This code spawn a pool of worker that precompute a buffer of sidedata - before we actually need them""" - # avoid circular import copies -> scmutil -> worker -> copies - from . import worker - - nbworkers = worker._numworkers(srcrepo.ui) - - tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER) - revsq = multiprocessing.Queue() - sidedataq = multiprocessing.Queue() - - assert srcrepo.filtername is None - # queue all tasks beforehand, revision numbers are small and it make - # synchronisation simpler - # - # Since the computation for each node can be quite expensive, the overhead - # of using a single queue is not revelant. In practice, most computation - # are fast but some are very expensive and dominate all the other smaller - # cost. - for r in srcrepo.changelog.revs(): - revsq.put(r) - # queue the "no more tasks" markers - for i in range(nbworkers): - revsq.put(None) - - allworkers = [] - for i in range(nbworkers): - args = (srcrepo, revsq, sidedataq, tokens) - w = multiprocessing.Process(target=_sidedata_worker, args=args) - allworkers.append(w) - w.start() - - # dictionnary to store results for revision higher than we one we are - # looking for. For example, if we need the sidedatamap for 42, and 43 is - # received, when shelve 43 for later use. - staging = {} - - def sidedata_companion(revlog, rev): - sidedata = {} - if util.safehasattr(revlog, b'filteredrevs'): # this is a changelog - # Is the data previously shelved ? - sidedata = staging.pop(rev, None) - if sidedata is None: - # look at the queued result until we find the one we are lookig - # for (shelve the other ones) - r, sidedata = sidedataq.get() - while r != rev: - staging[r] = sidedata - r, sidedata = sidedataq.get() - tokens.release() - return False, (), sidedata - - return sidedata_companion - - -def _get_simple_sidedata_adder(srcrepo, destrepo): - """The simple version of the sidedata computation - - It just compute it in the same thread on request""" - - def sidedatacompanion(revlog, rev): - sidedata = {} - if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog - sidedata = _getsidedata(srcrepo, rev) - return False, (), sidedata - - return sidedatacompanion - - -def getsidedataremover(srcrepo, destrepo): - def sidedatacompanion(revlog, rev): - f = () - if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog - if revlog.flags(rev) & REVIDX_SIDEDATA: - f = ( - sidedatamod.SD_P1COPIES, - sidedatamod.SD_P2COPIES, - sidedatamod.SD_FILESADDED, - sidedatamod.SD_FILESREMOVED, - ) - return False, f, {} - - return sidedatacompanion diff -r 818b4f19ef23 -r 4c1d39215034 mercurial/metadata.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/mercurial/metadata.py Wed May 27 12:26:08 2020 +0200 @@ -0,0 +1,268 @@ +# metadata.py -- code related to various metadata computation and access. +# +# Copyright 2019 Google, Inc +# Copyright 2020 Pierre-Yves David +# +# This software may be used and distributed according to the terms of the +# GNU General Public License version 2 or any later version. +from __future__ import absolute_import, print_function + +import multiprocessing + +from . import ( + error, + pycompat, + util, +) + +from .revlogutils import ( + flagutil as sidedataflag, + sidedata as sidedatamod, +) + + +def computechangesetfilesadded(ctx): + """return the list of files added in a changeset + """ + added = [] + for f in ctx.files(): + if not any(f in p for p in ctx.parents()): + added.append(f) + return added + + +def computechangesetfilesremoved(ctx): + """return the list of files removed in a changeset + """ + removed = [] + for f in ctx.files(): + if f not in ctx: + removed.append(f) + return removed + + +def computechangesetcopies(ctx): + """return the copies data for a changeset + + The copies data are returned as a pair of dictionnary (p1copies, p2copies). + + Each dictionnary are in the form: `{newname: oldname}` + """ + p1copies = {} + p2copies = {} + p1 = ctx.p1() + p2 = ctx.p2() + narrowmatch = ctx._repo.narrowmatch() + for dst in ctx.files(): + if not narrowmatch(dst) or dst not in ctx: + continue + copied = ctx[dst].renamed() + if not copied: + continue + src, srcnode = copied + if src in p1 and p1[src].filenode() == srcnode: + p1copies[dst] = src + elif src in p2 and p2[src].filenode() == srcnode: + p2copies[dst] = src + return p1copies, p2copies + + +def encodecopies(files, copies): + items = [] + for i, dst in enumerate(files): + if dst in copies: + items.append(b'%d\0%s' % (i, copies[dst])) + if len(items) != len(copies): + raise error.ProgrammingError( + b'some copy targets missing from file list' + ) + return b"\n".join(items) + + +def decodecopies(files, data): + try: + copies = {} + if not data: + return copies + for l in data.split(b'\n'): + strindex, src = l.split(b'\0') + i = int(strindex) + dst = files[i] + copies[dst] = src + return copies + except (ValueError, IndexError): + # Perhaps someone had chosen the same key name (e.g. "p1copies") and + # used different syntax for the value. + return None + + +def encodefileindices(files, subset): + subset = set(subset) + indices = [] + for i, f in enumerate(files): + if f in subset: + indices.append(b'%d' % i) + return b'\n'.join(indices) + + +def decodefileindices(files, data): + try: + subset = [] + if not data: + return subset + for strindex in data.split(b'\n'): + i = int(strindex) + if i < 0 or i >= len(files): + return None + subset.append(files[i]) + return subset + except (ValueError, IndexError): + # Perhaps someone had chosen the same key name (e.g. "added") and + # used different syntax for the value. + return None + + +def _getsidedata(srcrepo, rev): + ctx = srcrepo[rev] + filescopies = computechangesetcopies(ctx) + filesadded = computechangesetfilesadded(ctx) + filesremoved = computechangesetfilesremoved(ctx) + sidedata = {} + if any([filescopies, filesadded, filesremoved]): + sortedfiles = sorted(ctx.files()) + p1copies, p2copies = filescopies + p1copies = encodecopies(sortedfiles, p1copies) + p2copies = encodecopies(sortedfiles, p2copies) + filesadded = encodefileindices(sortedfiles, filesadded) + filesremoved = encodefileindices(sortedfiles, filesremoved) + if p1copies: + sidedata[sidedatamod.SD_P1COPIES] = p1copies + if p2copies: + sidedata[sidedatamod.SD_P2COPIES] = p2copies + if filesadded: + sidedata[sidedatamod.SD_FILESADDED] = filesadded + if filesremoved: + sidedata[sidedatamod.SD_FILESREMOVED] = filesremoved + return sidedata + + +def getsidedataadder(srcrepo, destrepo): + use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade') + if pycompat.iswindows or not use_w: + return _get_simple_sidedata_adder(srcrepo, destrepo) + else: + return _get_worker_sidedata_adder(srcrepo, destrepo) + + +def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens): + """The function used by worker precomputing sidedata + + It read an input queue containing revision numbers + It write in an output queue containing (rev, ) + + The `None` input value is used as a stop signal. + + The `tokens` semaphore is user to avoid having too many unprocessed + entries. The workers needs to acquire one token before fetching a task. + They will be released by the consumer of the produced data. + """ + tokens.acquire() + rev = revs_queue.get() + while rev is not None: + data = _getsidedata(srcrepo, rev) + sidedata_queue.put((rev, data)) + tokens.acquire() + rev = revs_queue.get() + # processing of `None` is completed, release the token. + tokens.release() + + +BUFF_PER_WORKER = 50 + + +def _get_worker_sidedata_adder(srcrepo, destrepo): + """The parallel version of the sidedata computation + + This code spawn a pool of worker that precompute a buffer of sidedata + before we actually need them""" + # avoid circular import copies -> scmutil -> worker -> copies + from . import worker + + nbworkers = worker._numworkers(srcrepo.ui) + + tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER) + revsq = multiprocessing.Queue() + sidedataq = multiprocessing.Queue() + + assert srcrepo.filtername is None + # queue all tasks beforehand, revision numbers are small and it make + # synchronisation simpler + # + # Since the computation for each node can be quite expensive, the overhead + # of using a single queue is not revelant. In practice, most computation + # are fast but some are very expensive and dominate all the other smaller + # cost. + for r in srcrepo.changelog.revs(): + revsq.put(r) + # queue the "no more tasks" markers + for i in range(nbworkers): + revsq.put(None) + + allworkers = [] + for i in range(nbworkers): + args = (srcrepo, revsq, sidedataq, tokens) + w = multiprocessing.Process(target=_sidedata_worker, args=args) + allworkers.append(w) + w.start() + + # dictionnary to store results for revision higher than we one we are + # looking for. For example, if we need the sidedatamap for 42, and 43 is + # received, when shelve 43 for later use. + staging = {} + + def sidedata_companion(revlog, rev): + sidedata = {} + if util.safehasattr(revlog, b'filteredrevs'): # this is a changelog + # Is the data previously shelved ? + sidedata = staging.pop(rev, None) + if sidedata is None: + # look at the queued result until we find the one we are lookig + # for (shelve the other ones) + r, sidedata = sidedataq.get() + while r != rev: + staging[r] = sidedata + r, sidedata = sidedataq.get() + tokens.release() + return False, (), sidedata + + return sidedata_companion + + +def _get_simple_sidedata_adder(srcrepo, destrepo): + """The simple version of the sidedata computation + + It just compute it in the same thread on request""" + + def sidedatacompanion(revlog, rev): + sidedata = {} + if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog + sidedata = _getsidedata(srcrepo, rev) + return False, (), sidedata + + return sidedatacompanion + + +def getsidedataremover(srcrepo, destrepo): + def sidedatacompanion(revlog, rev): + f = () + if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog + if revlog.flags(rev) & sidedataflag.REVIDX_SIDEDATA: + f = ( + sidedatamod.SD_P1COPIES, + sidedatamod.SD_P2COPIES, + sidedatamod.SD_FILESADDED, + sidedatamod.SD_FILESREMOVED, + ) + return False, f, {} + + return sidedatacompanion diff -r 818b4f19ef23 -r 4c1d39215034 mercurial/upgrade.py --- a/mercurial/upgrade.py Fri Jun 05 11:10:33 2020 -0700 +++ b/mercurial/upgrade.py Wed May 27 12:26:08 2020 +0200 @@ -13,12 +13,12 @@ from .pycompat import getattr from . import ( changelog, - copies, error, filelog, hg, localrepo, manifest, + metadata, pycompat, revlog, scmutil, @@ -734,9 +734,9 @@ return False, (), {} elif localrepo.COPIESSDC_REQUIREMENT in addedreqs: - sidedatacompanion = copies.getsidedataadder(srcrepo, dstrepo) + sidedatacompanion = metadata.getsidedataadder(srcrepo, dstrepo) elif localrepo.COPIESSDC_REQUIREMENT in removedreqs: - sidedatacompanion = copies.getsidedataremover(srcrepo, dstrepo) + sidedatacompanion = metadata.getsidedataremover(srcrepo, dstrepo) return sidedatacompanion