--- a/mercurial/copies.py Fri Nov 29 15:36:45 2019 +0100
+++ b/mercurial/copies.py Sun Sep 29 16:00:32 2019 +0200
@@ -8,6 +8,7 @@
from __future__ import absolute_import
import collections
+import multiprocessing
import os
from .i18n import _
@@ -1007,6 +1008,102 @@
def getsidedataadder(srcrepo, destrepo):
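+    # On Windows, multiprocessing workers cannot inherit the repository
+    # object (no fork), so we fall back to the serial implementation there.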
+ use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade')
+ if pycompat.iswindows or not use_w:
+ return _get_simple_sidedata_adder(srcrepo, destrepo)
+ else:
+ return _get_worker_sidedata_adder(srcrepo, destrepo)
+
+
+def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
+ """The function used by worker precomputing sidedata
+
+ It read an input queue containing revision numbers
+ It write in an output queue containing (rev, <sidedata-map>)
+
+ The `None` input value is used as a stop signal.
+
+ The `tokens` semaphore is user to avoid having too many unprocessed
+ entries. The workers needs to acquire one token before fetching a task.
+ They will be released by the consumer of the produced data.
+ """
+ tokens.acquire()
+ rev = revs_queue.get()
+ while rev is not None:
+ data = _getsidedata(srcrepo, rev)
+ sidedata_queue.put((rev, data))
+ tokens.acquire()
+ rev = revs_queue.get()
+    # processing of `None` is completed; release the token.
+ tokens.release()
+
+
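+# maximum number of precomputed results each worker may have pending; this
+# bounds the memory used by the output queue and the staging area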
+BUFF_PER_WORKER = 50
+
+
+def _get_worker_sidedata_adder(srcrepo, destrepo):
+ """The parallel version of the sidedata computation
+
+ This code spawn a pool of worker that precompute a buffer of sidedata
+ before we actually need them"""
+ # avoid circular import copies -> scmutil -> worker -> copies
+ from . import worker
+
+ nbworkers = worker._numworkers(srcrepo.ui)
+
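+    # each token stands for one in-flight task or unread result; bounding
+    # them caps how much precomputed sidedata sits in memory at once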
+ tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
+ revsq = multiprocessing.Queue()
+ sidedataq = multiprocessing.Queue()
+
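+    # we iterate over every revision, which only makes sense on an
+    # unfiltered repository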
+ assert srcrepo.filtername is None
+    # queue all tasks beforehand; revision numbers are small and it makes
+    # synchronisation simpler
+    #
+    # Since the computation for each node can be quite expensive, the overhead
+    # of using a single queue is not relevant. In practice, most computations
+    # are fast, but some are very expensive and dominate all the other,
+    # smaller costs.
+ for r in srcrepo.changelog.revs():
+ revsq.put(r)
+ # queue the "no more tasks" markers
+ for i in range(nbworkers):
+ revsq.put(None)
+
+ allworkers = []
+ for i in range(nbworkers):
+ args = (srcrepo, revsq, sidedataq, tokens)
+ w = multiprocessing.Process(target=_sidedata_worker, args=args)
+ allworkers.append(w)
+ w.start()
+
+    # dictionary to store results for revisions higher than the one we are
+    # currently looking for. For example, if we need the sidedata map for
+    # revision 42 and revision 43 arrives first, we shelve 43 for later use.
+ staging = {}
+
+ def sidedata_companion(revlog, rev):
+ sidedata = {}
+        if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog
+            # Was the data previously shelved?
+            sidedata = staging.pop(rev, None)
+ if sidedata is None:
+                # look at the queued results until we find the one we are
+                # looking for (shelve the other ones)
+ r, sidedata = sidedataq.get()
+ while r != rev:
+ staging[r] = sidedata
+ r, sidedata = sidedataq.get()
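+            # a result has been consumed: release a token so a worker can
+            # fetch and process another revision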
+ tokens.release()
+ return False, (), sidedata
+
+ return sidedata_companion
+
+
+def _get_simple_sidedata_adder(srcrepo, destrepo):
+ """The simple version of the sidedata computation
+
+ It just compute it in the same thread on request"""
+
def sidedatacompanion(revlog, rev):
sidedata = {}
if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog