--- a/mercurial/copies.py Fri Jun 05 11:10:33 2020 -0700
+++ b/mercurial/copies.py Wed May 27 12:26:08 2020 +0200
@@ -8,7 +8,6 @@
from __future__ import absolute_import
import collections
-import multiprocessing
import os
from .i18n import _
@@ -17,7 +16,6 @@
from .revlogutils.flagutil import REVIDX_SIDEDATA
from . import (
- error,
match as matchmod,
node,
pathutil,
@@ -25,7 +23,6 @@
util,
)
-from .revlogutils import sidedata as sidedatamod
from .utils import stringutil
@@ -992,250 +989,3 @@
_filter(wctx.p1(), wctx, new_copies)
for dst, src in pycompat.iteritems(new_copies):
wctx[dst].markcopied(src)
-
-
-def computechangesetfilesadded(ctx):
- """return the list of files added in a changeset
- """
- added = []
- for f in ctx.files():
- if not any(f in p for p in ctx.parents()):
- added.append(f)
- return added
-
-
-def computechangesetfilesremoved(ctx):
- """return the list of files removed in a changeset
- """
- removed = []
- for f in ctx.files():
- if f not in ctx:
- removed.append(f)
- return removed
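-
- # Illustrative example for the two helpers above: given a hypothetical
- # changeset ``ctx`` whose ``ctx.files()`` is [b'new', b'gone'], where
- # b'new' exists in no parent manifest and b'gone' is absent from the
- # changeset's own manifest, they would return:
- #
- #   computechangesetfilesadded(ctx)   == [b'new']
- #   computechangesetfilesremoved(ctx) == [b'gone']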
-
-
-def computechangesetcopies(ctx):
- """return the copies data for a changeset
-
- The copies data are returned as a pair of dictionaries (p1copies, p2copies).
-
- Each dictionary is of the form: `{newname: oldname}`
- """
- p1copies = {}
- p2copies = {}
- p1 = ctx.p1()
- p2 = ctx.p2()
- narrowmatch = ctx._repo.narrowmatch()
- for dst in ctx.files():
- if not narrowmatch(dst) or dst not in ctx:
- continue
- copied = ctx[dst].renamed()
- if not copied:
- continue
- src, srcnode = copied
- if src in p1 and p1[src].filenode() == srcnode:
- p1copies[dst] = src
- elif src in p2 and p2[src].filenode() == srcnode:
- p2copies[dst] = src
- return p1copies, p2copies
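-
- # Illustrative example: for a hypothetical changeset that renamed b'a' to
- # b'b' relative to its first parent (and assuming the narrow match accepts
- # both paths), the function above would return:
- #
- #   computechangesetcopies(ctx) == ({b'b': b'a'}, {})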
-
-
-def encodecopies(files, copies):
- items = []
- for i, dst in enumerate(files):
- if dst in copies:
- items.append(b'%d\0%s' % (i, copies[dst]))
- if len(items) != len(copies):
- raise error.ProgrammingError(
- b'some copy targets missing from file list'
- )
- return b"\n".join(items)
-
-
-def decodecopies(files, data):
- try:
- copies = {}
- if not data:
- return copies
- for l in data.split(b'\n'):
- strindex, src = l.split(b'\0')
- i = int(strindex)
- dst = files[i]
- copies[dst] = src
- return copies
- except (ValueError, IndexError):
- # Perhaps someone had chosen the same key name (e.g. "p1copies") and
- # used different syntax for the value.
- return None
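-
- # Illustrative round trip for the two helpers above; note that the index
- # into the (sorted) file list is what gets encoded, not the destination
- # file name:
- #
- #   >>> files = [b'a', b'b', b'c']
- #   >>> encodecopies(files, {b'b': b'a'})
- #   b'1\x00a'
- #   >>> decodecopies(files, b'1\x00a')
- #   {b'b': b'a'}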
-
-
-def encodefileindices(files, subset):
- subset = set(subset)
- indices = []
- for i, f in enumerate(files):
- if f in subset:
- indices.append(b'%d' % i)
- return b'\n'.join(indices)
-
-
-def decodefileindices(files, data):
- try:
- subset = []
- if not data:
- return subset
- for strindex in data.split(b'\n'):
- i = int(strindex)
- if i < 0 or i >= len(files):
- return None
- subset.append(files[i])
- return subset
- except (ValueError, IndexError):
- # Perhaps someone had chosen the same key name (e.g. "added") and
- # used different syntax for the value.
- return None
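-
- # Illustrative round trip for the file-index helpers above:
- #
- #   >>> files = [b'a', b'b', b'c']
- #   >>> encodefileindices(files, [b'a', b'c'])
- #   b'0\n2'
- #   >>> decodefileindices(files, b'0\n2')
- #   [b'a', b'c']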
-
-
-def _getsidedata(srcrepo, rev):
- ctx = srcrepo[rev]
- filescopies = computechangesetcopies(ctx)
- filesadded = computechangesetfilesadded(ctx)
- filesremoved = computechangesetfilesremoved(ctx)
- sidedata = {}
- p1copies, p2copies = filescopies
- # note: a bare `filescopies` is always a truthy 2-tuple, so test the
- # unpacked dicts instead
- if any([p1copies, p2copies, filesadded, filesremoved]):
- sortedfiles = sorted(ctx.files())
- p1copies = encodecopies(sortedfiles, p1copies)
- p2copies = encodecopies(sortedfiles, p2copies)
- filesadded = encodefileindices(sortedfiles, filesadded)
- filesremoved = encodefileindices(sortedfiles, filesremoved)
- if p1copies:
- sidedata[sidedatamod.SD_P1COPIES] = p1copies
- if p2copies:
- sidedata[sidedatamod.SD_P2COPIES] = p2copies
- if filesadded:
- sidedata[sidedatamod.SD_FILESADDED] = filesadded
- if filesremoved:
- sidedata[sidedatamod.SD_FILESREMOVED] = filesremoved
- return sidedata
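-
- # For a hypothetical changeset renaming b'old' to b'new' relative to its
- # first parent, sortedfiles is [b'new', b'old'] and the map returned above
- # would be:
- #
- #   {
- #       sidedatamod.SD_P1COPIES: b'0\x00old',
- #       sidedatamod.SD_FILESADDED: b'0',
- #       sidedatamod.SD_FILESREMOVED: b'1',
- #   }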
-
-
-def getsidedataadder(srcrepo, destrepo):
- use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade')
- if pycompat.iswindows or not use_w:
- return _get_simple_sidedata_adder(srcrepo, destrepo)
- else:
- return _get_worker_sidedata_adder(srcrepo, destrepo)
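-
- # The parallel path is opt-in through the config knob read above; enabling
- # it from an hgrc looks like this (it is never used on Windows):
- #
- #   [experimental]
- #   worker.repository-upgrade = yes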
-
-
-def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
- """The function used by worker precomputing sidedata
-
- It read an input queue containing revision numbers
- It write in an output queue containing (rev, <sidedata-map>)
-
- The `None` input value is used as a stop signal.
-
- The `tokens` semaphore is user to avoid having too many unprocessed
- entries. The workers needs to acquire one token before fetching a task.
- They will be released by the consumer of the produced data.
- """
- tokens.acquire()
- rev = revs_queue.get()
- while rev is not None:
- data = _getsidedata(srcrepo, rev)
- sidedata_queue.put((rev, data))
- tokens.acquire()
- rev = revs_queue.get()
- # processing of `None` is completed, release the token.
- tokens.release()
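-
- # Token accounting sketch: the semaphore is created with
- # nbworkers * BUFF_PER_WORKER tokens (see below), so at most that many
- # computed-but-unconsumed entries can exist at any time, whether they sit
- # in `sidedata_queue` or in the consumer's `staging` dictionary.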
-
-
-BUFF_PER_WORKER = 50
-
-
-def _get_worker_sidedata_adder(srcrepo, destrepo):
- """The parallel version of the sidedata computation
-
- This code spawns a pool of workers that precompute a buffer of sidedata
- before we actually need it"""
- # avoid circular import copies -> scmutil -> worker -> copies
- from . import worker
-
- nbworkers = worker._numworkers(srcrepo.ui)
-
- tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
- revsq = multiprocessing.Queue()
- sidedataq = multiprocessing.Queue()
-
- assert srcrepo.filtername is None
- # queue all tasks beforehand; revision numbers are small and it makes
- # synchronisation simpler
- #
- # Since the computation for each node can be quite expensive, the overhead
- # of using a single queue is not relevant. In practice, most computations
- # are fast, but some are very expensive and dominate all the other,
- # smaller costs.
- for r in srcrepo.changelog.revs():
- revsq.put(r)
- # queue the "no more tasks" markers
- for i in range(nbworkers):
- revsq.put(None)
-
- allworkers = []
- for i in range(nbworkers):
- args = (srcrepo, revsq, sidedataq, tokens)
- w = multiprocessing.Process(target=_sidedata_worker, args=args)
- allworkers.append(w)
- w.start()
-
- # dictionary to store results for revisions higher than the one we are
- # currently looking for. For example, if we need the sidedata for
- # revision 42 and revision 43 is received first, we shelve 43 for later use.
- staging = {}
-
- def sidedata_companion(revlog, rev):
- sidedata = {}
- if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog
- # Was the data previously shelved?
- sidedata = staging.pop(rev, None)
- if sidedata is None:
- # look at the queued results until we find the one we are looking
- # for (shelving the other ones)
- r, sidedata = sidedataq.get()
- while r != rev:
- staging[r] = sidedata
- r, sidedata = sidedataq.get()
- tokens.release()
- return False, (), sidedata
-
- return sidedata_companion
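-
- # Illustrative consumption order: if the companion needs revision 2 while
- # workers finish out of order and the queue yields (3, d3) then (2, d2),
- # then d3 is shelved in `staging` and d2 is returned; a later request for
- # revision 3 is served straight from `staging` without touching the queue.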
-
-
-def _get_simple_sidedata_adder(srcrepo, destrepo):
- """The simple version of the sidedata computation
-
- It just computes the sidedata in the same thread, on request"""
-
- def sidedatacompanion(revlog, rev):
- sidedata = {}
- if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog
- sidedata = _getsidedata(srcrepo, rev)
- return False, (), sidedata
-
- return sidedatacompanion
-
-
-def getsidedataremover(srcrepo, destrepo):
- def sidedatacompanion(revlog, rev):
- f = ()
- if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog
- if revlog.flags(rev) & REVIDX_SIDEDATA:
- f = (
- sidedatamod.SD_P1COPIES,
- sidedatamod.SD_P2COPIES,
- sidedatamod.SD_FILESADDED,
- sidedatamod.SD_FILESREMOVED,
- )
- return False, f, {}
-
- return sidedatacompanion
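-
-
- # For context, a sidedata companion is expected to return a
- # ``(dropall, filterout, update)`` triple: ``dropall`` requests dropping
- # every sidedata entry for the revision, ``filterout`` lists individual
- # sidedata keys to strip, and ``update`` maps keys to new values; the
- # remover above therefore strips the four copy-tracing keys while adding
- # nothing.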