metadata: move computation related to files touched in a dedicated module
authorPierre-Yves David <pierre-yves.david@octobus.net>
Wed, 27 May 2020 12:26:08 +0200
changeset 44940 4c1d39215034
parent 44939 818b4f19ef23
child 44941 edd08aa193fb
metadata: move computation related to files touched in a dedicated module This was suggested by Yuya Nishihara a while back. Since I am about to add more metadata related computation, lets create a new repositories. Differential Revision: https://phab.mercurial-scm.org/D8587
mercurial/changelog.py
mercurial/context.py
mercurial/copies.py
mercurial/metadata.py
mercurial/upgrade.py
--- a/mercurial/changelog.py	Fri Jun 05 11:10:33 2020 -0700
+++ b/mercurial/changelog.py	Wed May 27 12:26:08 2020 +0200
@@ -16,9 +16,9 @@
 from .thirdparty import attr
 
 from . import (
-    copies,
     encoding,
     error,
+    metadata,
     pycompat,
     revlog,
 )
@@ -318,7 +318,7 @@
             rawindices = self.extra.get(b'filesadded')
         if rawindices is None:
             return None
-        return copies.decodefileindices(self.files, rawindices)
+        return metadata.decodefileindices(self.files, rawindices)
 
     @property
     def filesremoved(self):
@@ -330,7 +330,7 @@
             rawindices = self.extra.get(b'filesremoved')
         if rawindices is None:
             return None
-        return copies.decodefileindices(self.files, rawindices)
+        return metadata.decodefileindices(self.files, rawindices)
 
     @property
     def p1copies(self):
@@ -342,7 +342,7 @@
             rawcopies = self.extra.get(b'p1copies')
         if rawcopies is None:
             return None
-        return copies.decodecopies(self.files, rawcopies)
+        return metadata.decodecopies(self.files, rawcopies)
 
     @property
     def p2copies(self):
@@ -354,7 +354,7 @@
             rawcopies = self.extra.get(b'p2copies')
         if rawcopies is None:
             return None
-        return copies.decodecopies(self.files, rawcopies)
+        return metadata.decodecopies(self.files, rawcopies)
 
     @property
     def description(self):
@@ -570,13 +570,13 @@
             ):
                 extra.pop(name, None)
         if p1copies is not None:
-            p1copies = copies.encodecopies(sortedfiles, p1copies)
+            p1copies = metadata.encodecopies(sortedfiles, p1copies)
         if p2copies is not None:
-            p2copies = copies.encodecopies(sortedfiles, p2copies)
+            p2copies = metadata.encodecopies(sortedfiles, p2copies)
         if filesadded is not None:
-            filesadded = copies.encodefileindices(sortedfiles, filesadded)
+            filesadded = metadata.encodefileindices(sortedfiles, filesadded)
         if filesremoved is not None:
-            filesremoved = copies.encodefileindices(sortedfiles, filesremoved)
+            filesremoved = metadata.encodefileindices(sortedfiles, filesremoved)
         if self._copiesstorage == b'extra':
             extrasentries = p1copies, p2copies, filesadded, filesremoved
             if extra is None and any(x is not None for x in extrasentries):
--- a/mercurial/context.py	Fri Jun 05 11:10:33 2020 -0700
+++ b/mercurial/context.py	Wed May 27 12:26:08 2020 +0200
@@ -28,13 +28,13 @@
     open,
 )
 from . import (
-    copies,
     dagop,
     encoding,
     error,
     fileset,
     match as matchmod,
     mergestate as mergestatemod,
+    metadata,
     obsolete as obsmod,
     patch,
     pathutil,
@@ -300,7 +300,7 @@
 
     @propertycache
     def _copies(self):
-        return copies.computechangesetcopies(self)
+        return metadata.computechangesetcopies(self)
 
     def p1copies(self):
         return self._copies[0]
@@ -589,7 +589,7 @@
                 filesadded = None
         if filesadded is None:
             if compute_on_none:
-                filesadded = copies.computechangesetfilesadded(self)
+                filesadded = metadata.computechangesetfilesadded(self)
             else:
                 filesadded = []
         return filesadded
@@ -608,7 +608,7 @@
                 filesremoved = None
         if filesremoved is None:
             if compute_on_none:
-                filesremoved = copies.computechangesetfilesremoved(self)
+                filesremoved = metadata.computechangesetfilesremoved(self)
             else:
                 filesremoved = []
         return filesremoved
--- a/mercurial/copies.py	Fri Jun 05 11:10:33 2020 -0700
+++ b/mercurial/copies.py	Wed May 27 12:26:08 2020 +0200
@@ -8,7 +8,6 @@
 from __future__ import absolute_import
 
 import collections
-import multiprocessing
 import os
 
 from .i18n import _
@@ -17,7 +16,6 @@
 from .revlogutils.flagutil import REVIDX_SIDEDATA
 
 from . import (
-    error,
     match as matchmod,
     node,
     pathutil,
@@ -25,7 +23,6 @@
     util,
 )
 
-from .revlogutils import sidedata as sidedatamod
 
 from .utils import stringutil
 
@@ -992,250 +989,3 @@
     _filter(wctx.p1(), wctx, new_copies)
     for dst, src in pycompat.iteritems(new_copies):
         wctx[dst].markcopied(src)
-
-
-def computechangesetfilesadded(ctx):
-    """return the list of files added in a changeset
-    """
-    added = []
-    for f in ctx.files():
-        if not any(f in p for p in ctx.parents()):
-            added.append(f)
-    return added
-
-
-def computechangesetfilesremoved(ctx):
-    """return the list of files removed in a changeset
-    """
-    removed = []
-    for f in ctx.files():
-        if f not in ctx:
-            removed.append(f)
-    return removed
-
-
-def computechangesetcopies(ctx):
-    """return the copies data for a changeset
-
-    The copies data are returned as a pair of dictionnary (p1copies, p2copies).
-
-    Each dictionnary are in the form: `{newname: oldname}`
-    """
-    p1copies = {}
-    p2copies = {}
-    p1 = ctx.p1()
-    p2 = ctx.p2()
-    narrowmatch = ctx._repo.narrowmatch()
-    for dst in ctx.files():
-        if not narrowmatch(dst) or dst not in ctx:
-            continue
-        copied = ctx[dst].renamed()
-        if not copied:
-            continue
-        src, srcnode = copied
-        if src in p1 and p1[src].filenode() == srcnode:
-            p1copies[dst] = src
-        elif src in p2 and p2[src].filenode() == srcnode:
-            p2copies[dst] = src
-    return p1copies, p2copies
-
-
-def encodecopies(files, copies):
-    items = []
-    for i, dst in enumerate(files):
-        if dst in copies:
-            items.append(b'%d\0%s' % (i, copies[dst]))
-    if len(items) != len(copies):
-        raise error.ProgrammingError(
-            b'some copy targets missing from file list'
-        )
-    return b"\n".join(items)
-
-
-def decodecopies(files, data):
-    try:
-        copies = {}
-        if not data:
-            return copies
-        for l in data.split(b'\n'):
-            strindex, src = l.split(b'\0')
-            i = int(strindex)
-            dst = files[i]
-            copies[dst] = src
-        return copies
-    except (ValueError, IndexError):
-        # Perhaps someone had chosen the same key name (e.g. "p1copies") and
-        # used different syntax for the value.
-        return None
-
-
-def encodefileindices(files, subset):
-    subset = set(subset)
-    indices = []
-    for i, f in enumerate(files):
-        if f in subset:
-            indices.append(b'%d' % i)
-    return b'\n'.join(indices)
-
-
-def decodefileindices(files, data):
-    try:
-        subset = []
-        if not data:
-            return subset
-        for strindex in data.split(b'\n'):
-            i = int(strindex)
-            if i < 0 or i >= len(files):
-                return None
-            subset.append(files[i])
-        return subset
-    except (ValueError, IndexError):
-        # Perhaps someone had chosen the same key name (e.g. "added") and
-        # used different syntax for the value.
-        return None
-
-
-def _getsidedata(srcrepo, rev):
-    ctx = srcrepo[rev]
-    filescopies = computechangesetcopies(ctx)
-    filesadded = computechangesetfilesadded(ctx)
-    filesremoved = computechangesetfilesremoved(ctx)
-    sidedata = {}
-    if any([filescopies, filesadded, filesremoved]):
-        sortedfiles = sorted(ctx.files())
-        p1copies, p2copies = filescopies
-        p1copies = encodecopies(sortedfiles, p1copies)
-        p2copies = encodecopies(sortedfiles, p2copies)
-        filesadded = encodefileindices(sortedfiles, filesadded)
-        filesremoved = encodefileindices(sortedfiles, filesremoved)
-        if p1copies:
-            sidedata[sidedatamod.SD_P1COPIES] = p1copies
-        if p2copies:
-            sidedata[sidedatamod.SD_P2COPIES] = p2copies
-        if filesadded:
-            sidedata[sidedatamod.SD_FILESADDED] = filesadded
-        if filesremoved:
-            sidedata[sidedatamod.SD_FILESREMOVED] = filesremoved
-    return sidedata
-
-
-def getsidedataadder(srcrepo, destrepo):
-    use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade')
-    if pycompat.iswindows or not use_w:
-        return _get_simple_sidedata_adder(srcrepo, destrepo)
-    else:
-        return _get_worker_sidedata_adder(srcrepo, destrepo)
-
-
-def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
-    """The function used by worker precomputing sidedata
-
-    It read an input queue containing revision numbers
-    It write in an output queue containing (rev, <sidedata-map>)
-
-    The `None` input value is used as a stop signal.
-
-    The `tokens` semaphore is user to avoid having too many unprocessed
-    entries. The workers needs to acquire one token before fetching a task.
-    They will be released by the consumer of the produced data.
-    """
-    tokens.acquire()
-    rev = revs_queue.get()
-    while rev is not None:
-        data = _getsidedata(srcrepo, rev)
-        sidedata_queue.put((rev, data))
-        tokens.acquire()
-        rev = revs_queue.get()
-    # processing of `None` is completed, release the token.
-    tokens.release()
-
-
-BUFF_PER_WORKER = 50
-
-
-def _get_worker_sidedata_adder(srcrepo, destrepo):
-    """The parallel version of the sidedata computation
-
-    This code spawn a pool of worker that precompute a buffer of sidedata
-    before we actually need them"""
-    # avoid circular import copies -> scmutil -> worker -> copies
-    from . import worker
-
-    nbworkers = worker._numworkers(srcrepo.ui)
-
-    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
-    revsq = multiprocessing.Queue()
-    sidedataq = multiprocessing.Queue()
-
-    assert srcrepo.filtername is None
-    # queue all tasks beforehand, revision numbers are small and it make
-    # synchronisation simpler
-    #
-    # Since the computation for each node can be quite expensive, the overhead
-    # of using a single queue is not revelant. In practice, most computation
-    # are fast but some are very expensive and dominate all the other smaller
-    # cost.
-    for r in srcrepo.changelog.revs():
-        revsq.put(r)
-    # queue the "no more tasks" markers
-    for i in range(nbworkers):
-        revsq.put(None)
-
-    allworkers = []
-    for i in range(nbworkers):
-        args = (srcrepo, revsq, sidedataq, tokens)
-        w = multiprocessing.Process(target=_sidedata_worker, args=args)
-        allworkers.append(w)
-        w.start()
-
-    # dictionnary to store results for revision higher than we one we are
-    # looking for. For example, if we need the sidedatamap for 42, and 43 is
-    # received, when shelve 43 for later use.
-    staging = {}
-
-    def sidedata_companion(revlog, rev):
-        sidedata = {}
-        if util.safehasattr(revlog, b'filteredrevs'):  # this is a changelog
-            # Is the data previously shelved ?
-            sidedata = staging.pop(rev, None)
-            if sidedata is None:
-                # look at the queued result until we find the one we are lookig
-                # for (shelve the other ones)
-                r, sidedata = sidedataq.get()
-                while r != rev:
-                    staging[r] = sidedata
-                    r, sidedata = sidedataq.get()
-            tokens.release()
-        return False, (), sidedata
-
-    return sidedata_companion
-
-
-def _get_simple_sidedata_adder(srcrepo, destrepo):
-    """The simple version of the sidedata computation
-
-    It just compute it in the same thread on request"""
-
-    def sidedatacompanion(revlog, rev):
-        sidedata = {}
-        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
-            sidedata = _getsidedata(srcrepo, rev)
-        return False, (), sidedata
-
-    return sidedatacompanion
-
-
-def getsidedataremover(srcrepo, destrepo):
-    def sidedatacompanion(revlog, rev):
-        f = ()
-        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
-            if revlog.flags(rev) & REVIDX_SIDEDATA:
-                f = (
-                    sidedatamod.SD_P1COPIES,
-                    sidedatamod.SD_P2COPIES,
-                    sidedatamod.SD_FILESADDED,
-                    sidedatamod.SD_FILESREMOVED,
-                )
-        return False, f, {}
-
-    return sidedatacompanion
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mercurial/metadata.py	Wed May 27 12:26:08 2020 +0200
@@ -0,0 +1,268 @@
+# metadata.py -- code related to various metadata computation and access.
+#
+# Copyright 2019 Google, Inc <martinvonz@google.com>
+# Copyright 2020 Pierre-Yves David <pierre-yves.david@octobus.net>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+from __future__ import absolute_import, print_function
+
+import multiprocessing
+
+from . import (
+    error,
+    pycompat,
+    util,
+)
+
+from .revlogutils import (
+    flagutil as sidedataflag,
+    sidedata as sidedatamod,
+)
+
+
+def computechangesetfilesadded(ctx):
+    """return the list of files added in a changeset
+    """
+    added = []
+    for f in ctx.files():
+        if not any(f in p for p in ctx.parents()):
+            added.append(f)
+    return added
+
+
+def computechangesetfilesremoved(ctx):
+    """return the list of files removed in a changeset
+    """
+    removed = []
+    for f in ctx.files():
+        if f not in ctx:
+            removed.append(f)
+    return removed
+
+
+def computechangesetcopies(ctx):
+    """return the copies data for a changeset
+
+    The copies data are returned as a pair of dictionnary (p1copies, p2copies).
+
+    Each dictionnary are in the form: `{newname: oldname}`
+    """
+    p1copies = {}
+    p2copies = {}
+    p1 = ctx.p1()
+    p2 = ctx.p2()
+    narrowmatch = ctx._repo.narrowmatch()
+    for dst in ctx.files():
+        if not narrowmatch(dst) or dst not in ctx:
+            continue
+        copied = ctx[dst].renamed()
+        if not copied:
+            continue
+        src, srcnode = copied
+        if src in p1 and p1[src].filenode() == srcnode:
+            p1copies[dst] = src
+        elif src in p2 and p2[src].filenode() == srcnode:
+            p2copies[dst] = src
+    return p1copies, p2copies
+
+
+def encodecopies(files, copies):
+    items = []
+    for i, dst in enumerate(files):
+        if dst in copies:
+            items.append(b'%d\0%s' % (i, copies[dst]))
+    if len(items) != len(copies):
+        raise error.ProgrammingError(
+            b'some copy targets missing from file list'
+        )
+    return b"\n".join(items)
+
+
+def decodecopies(files, data):
+    try:
+        copies = {}
+        if not data:
+            return copies
+        for l in data.split(b'\n'):
+            strindex, src = l.split(b'\0')
+            i = int(strindex)
+            dst = files[i]
+            copies[dst] = src
+        return copies
+    except (ValueError, IndexError):
+        # Perhaps someone had chosen the same key name (e.g. "p1copies") and
+        # used different syntax for the value.
+        return None
+
+
+def encodefileindices(files, subset):
+    subset = set(subset)
+    indices = []
+    for i, f in enumerate(files):
+        if f in subset:
+            indices.append(b'%d' % i)
+    return b'\n'.join(indices)
+
+
+def decodefileindices(files, data):
+    try:
+        subset = []
+        if not data:
+            return subset
+        for strindex in data.split(b'\n'):
+            i = int(strindex)
+            if i < 0 or i >= len(files):
+                return None
+            subset.append(files[i])
+        return subset
+    except (ValueError, IndexError):
+        # Perhaps someone had chosen the same key name (e.g. "added") and
+        # used different syntax for the value.
+        return None
+
+
+def _getsidedata(srcrepo, rev):
+    ctx = srcrepo[rev]
+    filescopies = computechangesetcopies(ctx)
+    filesadded = computechangesetfilesadded(ctx)
+    filesremoved = computechangesetfilesremoved(ctx)
+    sidedata = {}
+    if any([filescopies, filesadded, filesremoved]):
+        sortedfiles = sorted(ctx.files())
+        p1copies, p2copies = filescopies
+        p1copies = encodecopies(sortedfiles, p1copies)
+        p2copies = encodecopies(sortedfiles, p2copies)
+        filesadded = encodefileindices(sortedfiles, filesadded)
+        filesremoved = encodefileindices(sortedfiles, filesremoved)
+        if p1copies:
+            sidedata[sidedatamod.SD_P1COPIES] = p1copies
+        if p2copies:
+            sidedata[sidedatamod.SD_P2COPIES] = p2copies
+        if filesadded:
+            sidedata[sidedatamod.SD_FILESADDED] = filesadded
+        if filesremoved:
+            sidedata[sidedatamod.SD_FILESREMOVED] = filesremoved
+    return sidedata
+
+
+def getsidedataadder(srcrepo, destrepo):
+    use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade')
+    if pycompat.iswindows or not use_w:
+        return _get_simple_sidedata_adder(srcrepo, destrepo)
+    else:
+        return _get_worker_sidedata_adder(srcrepo, destrepo)
+
+
+def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
+    """The function used by worker precomputing sidedata
+
+    It read an input queue containing revision numbers
+    It write in an output queue containing (rev, <sidedata-map>)
+
+    The `None` input value is used as a stop signal.
+
+    The `tokens` semaphore is user to avoid having too many unprocessed
+    entries. The workers needs to acquire one token before fetching a task.
+    They will be released by the consumer of the produced data.
+    """
+    tokens.acquire()
+    rev = revs_queue.get()
+    while rev is not None:
+        data = _getsidedata(srcrepo, rev)
+        sidedata_queue.put((rev, data))
+        tokens.acquire()
+        rev = revs_queue.get()
+    # processing of `None` is completed, release the token.
+    tokens.release()
+
+
+BUFF_PER_WORKER = 50
+
+
+def _get_worker_sidedata_adder(srcrepo, destrepo):
+    """The parallel version of the sidedata computation
+
+    This code spawn a pool of worker that precompute a buffer of sidedata
+    before we actually need them"""
+    # avoid circular import copies -> scmutil -> worker -> copies
+    from . import worker
+
+    nbworkers = worker._numworkers(srcrepo.ui)
+
+    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
+    revsq = multiprocessing.Queue()
+    sidedataq = multiprocessing.Queue()
+
+    assert srcrepo.filtername is None
+    # queue all tasks beforehand, revision numbers are small and it make
+    # synchronisation simpler
+    #
+    # Since the computation for each node can be quite expensive, the overhead
+    # of using a single queue is not revelant. In practice, most computation
+    # are fast but some are very expensive and dominate all the other smaller
+    # cost.
+    for r in srcrepo.changelog.revs():
+        revsq.put(r)
+    # queue the "no more tasks" markers
+    for i in range(nbworkers):
+        revsq.put(None)
+
+    allworkers = []
+    for i in range(nbworkers):
+        args = (srcrepo, revsq, sidedataq, tokens)
+        w = multiprocessing.Process(target=_sidedata_worker, args=args)
+        allworkers.append(w)
+        w.start()
+
+    # dictionnary to store results for revision higher than we one we are
+    # looking for. For example, if we need the sidedatamap for 42, and 43 is
+    # received, when shelve 43 for later use.
+    staging = {}
+
+    def sidedata_companion(revlog, rev):
+        sidedata = {}
+        if util.safehasattr(revlog, b'filteredrevs'):  # this is a changelog
+            # Is the data previously shelved ?
+            sidedata = staging.pop(rev, None)
+            if sidedata is None:
+                # look at the queued result until we find the one we are lookig
+                # for (shelve the other ones)
+                r, sidedata = sidedataq.get()
+                while r != rev:
+                    staging[r] = sidedata
+                    r, sidedata = sidedataq.get()
+            tokens.release()
+        return False, (), sidedata
+
+    return sidedata_companion
+
+
+def _get_simple_sidedata_adder(srcrepo, destrepo):
+    """The simple version of the sidedata computation
+
+    It just compute it in the same thread on request"""
+
+    def sidedatacompanion(revlog, rev):
+        sidedata = {}
+        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
+            sidedata = _getsidedata(srcrepo, rev)
+        return False, (), sidedata
+
+    return sidedatacompanion
+
+
+def getsidedataremover(srcrepo, destrepo):
+    def sidedatacompanion(revlog, rev):
+        f = ()
+        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
+            if revlog.flags(rev) & sidedataflag.REVIDX_SIDEDATA:
+                f = (
+                    sidedatamod.SD_P1COPIES,
+                    sidedatamod.SD_P2COPIES,
+                    sidedatamod.SD_FILESADDED,
+                    sidedatamod.SD_FILESREMOVED,
+                )
+        return False, f, {}
+
+    return sidedatacompanion
--- a/mercurial/upgrade.py	Fri Jun 05 11:10:33 2020 -0700
+++ b/mercurial/upgrade.py	Wed May 27 12:26:08 2020 +0200
@@ -13,12 +13,12 @@
 from .pycompat import getattr
 from . import (
     changelog,
-    copies,
     error,
     filelog,
     hg,
     localrepo,
     manifest,
+    metadata,
     pycompat,
     revlog,
     scmutil,
@@ -734,9 +734,9 @@
             return False, (), {}
 
     elif localrepo.COPIESSDC_REQUIREMENT in addedreqs:
-        sidedatacompanion = copies.getsidedataadder(srcrepo, dstrepo)
+        sidedatacompanion = metadata.getsidedataadder(srcrepo, dstrepo)
     elif localrepo.COPIESSDC_REQUIREMENT in removedreqs:
-        sidedatacompanion = copies.getsidedataremover(srcrepo, dstrepo)
+        sidedatacompanion = metadata.getsidedataremover(srcrepo, dstrepo)
     return sidedatacompanion