--- a/hgext/fix.py Tue Jun 26 15:27:29 2018 -0700
+++ b/hgext/fix.py Tue Jun 26 15:30:49 2018 -0700
@@ -70,6 +70,7 @@
registrar,
scmutil,
util,
+ worker,
)

# Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
@@ -138,19 +139,52 @@
basectxs = getbasectxs(repo, opts, revstofix)
workqueue, numitems = getworkqueue(ui, repo, pats, opts, revstofix,
basectxs)
+ fixers = getfixers(ui)
+
+ # There are no data dependencies between the workers fixing each file
+ # revision, so we can use all available parallelism.
+ def getfixes(items):
+ for rev, path in items:
+ ctx = repo[rev]
+ olddata = ctx[path].data()
+ newdata = fixfile(ui, opts, fixers, ctx, path, basectxs[rev])
+ # Don't waste memory/time passing unchanged content back, but
+ # produce one result per item either way.
+ yield (rev, path, newdata if newdata != olddata else None)
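+ # worker() may fan these items out to several OS-level workers, so the
+ # results can arrive interleaved across revisions rather than in queue
+ # order; the bookkeeping below accounts for that.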
+ results = worker.worker(ui, 1.0, getfixes, tuple(), workqueue)
+
+ # We have to hold on to the data for each successor revision in memory
+ # until all its parents are committed. We ensure this by committing and
+ # freeing memory for the revisions in some topological order. This
+ # leaves a little bit of memory efficiency on the table, but also makes
+ # the tests deterministic. It might also be considered a feature since
+ # it makes the results more easily reproducible.
filedata = collections.defaultdict(dict)
replacements = {}
- fixers = getfixers(ui)
- # Some day this loop can become a worker pool, but for now it's easier
- # to fix everything serially in topological order.
- for rev, path in sorted(workqueue):
- ctx = repo[rev]
- olddata = ctx[path].data()
- newdata = fixfile(ui, opts, fixers, ctx, path, basectxs[rev])
- if newdata != olddata:
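+ # commitorder is reverse-sorted so popping from the end visits revisions
+ # in ascending order; a child always has a higher revision number than
+ # its parents, so parents are always committed first.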
+ commitorder = sorted(revstofix, reverse=True)
+ for rev, path, newdata in results:
+ if newdata is not None:
filedata[rev][path] = newdata
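+ # numitems[rev] counts the work items not yet seen for a revision; once
+ # it reaches zero, all results for that revision have arrived and it is
+ # safe to commit.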
numitems[rev] -= 1
- if not numitems[rev]:
+ # Apply the fixes for this and any other revisions that are ready
+ # and sitting at the front of the queue. Using a loop here lets one
+ # arriving result unblock several revisions that finished out of
+ # order, instead of committing at most one revision per result.
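+ # For example, if rev 3 finishes before rev 2, then when rev 2's last
+ # result arrives this loop commits rev 2 and the already-complete rev 3
+ # in a single pass.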
+ while commitorder and not numitems[commitorder[-1]]:
+ rev = commitorder.pop()
+ ctx = repo[rev]
if rev == wdirrev:
writeworkingdir(repo, ctx, filedata[rev], replacements)
else:
@@ -168,11 +202,23 @@
topological order. Each work item represents a file in the working copy or
in some revision that should be fixed and written back to the working copy
or into a replacement revision.
+
+ Work items for the same revision are grouped together, so that a worker
+ pool starting with the first N items in parallel is likely to finish the
+ first revision's work before that of later revisions. This can let us
+ write results to disk sooner and reduce the memory footprint. At the
+ time of writing, the partition strategy in worker.py seems favorable to
+ this. We also sort the items by ascending revision number to match the
+ order in which we commit the fixes later.
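+
+ For example, fixing two files across revisions 1 and 2 might produce a
+ queue like [(1, 'foo.txt'), (1, 'bar.txt'), (2, 'foo.txt')], with
+ numitems mapping {1: 2, 2: 1}.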
"""
workqueue = []
numitems = collections.defaultdict(int)
maxfilesize = ui.configbytes('fix', 'maxfilesize')
- for rev in revstofix:
+ for rev in sorted(revstofix):
fixctx = repo[rev]
match = scmutil.match(fixctx, pats, opts)
for path in pathstofix(ui, repo, pats, opts, match, basectxs[rev],