--- a/hgext/fix.py Thu Sep 02 14:07:55 2021 -0700
+++ b/hgext/fix.py Thu Sep 02 14:08:45 2021 -0700
@@ -284,20 +284,29 @@
# There are no data dependencies between the workers fixing each file
# revision, so we can use all available parallelism.
def getfixes(items):
- for rev, path in items:
- ctx = repo[rev]
+ for srcrev, path, dstrevs in items:
+ ctx = repo[srcrev]
olddata = ctx[path].data()
metadata, newdata = fixfile(
- ui, repo, opts, fixers, ctx, path, basepaths, basectxs[rev]
+ ui,
+ repo,
+ opts,
+ fixers,
+ ctx,
+ path,
+ basepaths,
+ basectxs[srcrev],
)
- # Don't waste memory/time passing unchanged content back, but
- # produce one result per item either way.
- yield (
- rev,
- path,
- metadata,
- newdata if newdata != olddata else None,
- )
+            # We ungroup the work items now, because the code that consumes
+            # these results has to handle each dstrev separately, and in
+            # topological order. Because results for later dstrevs may be
+            # held in memory until earlier changesets have been replaced,
+            # it's important that we pass around references to "newdata"
+            # instead of copying it. Otherwise, we would be keeping more
+            # copies of file content in memory at a time than if we hadn't
+            # bothered to group/deduplicate the work items.
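+            # For example, a grouped item (1, "foo/bar.txt", (1, 2, 3))
+            # yields three results here, one per dstrev, all pointing at the
+            # same "data" object rather than at three copies of it.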
+ data = newdata if newdata != olddata else None
+ for dstrev in dstrevs:
+ yield (dstrev, path, metadata, data)
results = worker.worker(
ui, 1.0, getfixes, tuple(), workqueue, threadsafe=False
@@ -377,23 +386,32 @@
def getworkqueue(ui, repo, pats, opts, revstofix, basectxs):
- """Constructs the list of files to be fixed at specific revisions
+    """Constructs a list of files to fix, and the revisions each fix applies to
- It is up to the caller how to consume the work items, and the only
- dependence between them is that replacement revisions must be committed in
- topological order. Each work item represents a file in the working copy or
- in some revision that should be fixed and written back to the working copy
- or into a replacement revision.
+ To avoid duplicating work, there is usually only one work item for each file
+ revision that might need to be fixed. There can be multiple work items per
+ file revision if the same file needs to be fixed in multiple changesets with
+ different baserevs. Each work item also contains a list of changesets where
+ the file's data should be replaced with the fixed data. The work items for
+ earlier changesets come earlier in the work queue, to improve pipelining by
+ allowing the first changeset to be replaced while fixes are still being
+ computed for later changesets.
- Work items for the same revision are grouped together, so that a worker
- pool starting with the first N items in parallel is likely to finish the
- first revision's work before other revisions. This can allow us to write
- the result to disk and reduce memory footprint. At time of writing, the
- partition strategy in worker.py seems favorable to this. We also sort the
- items by ascending revision number to match the order in which we commit
- the fixes later.
+ Also returned is a map from changesets to the count of work items that might
+ affect each changeset. This is used later to count when all of a changeset's
+ work items have been finished, without having to inspect the remaining work
+ queue in each worker subprocess.
+
+ The example work item (1, "foo/bar.txt", (1, 2, 3)) means that the data of
+ bar.txt should be read from revision 1, then fixed, and written back to
+ revisions 1, 2 and 3. Revision 1 is called the "srcrev" and the list of
+ revisions is called the "dstrevs". In practice the srcrev is always one of
+ the dstrevs, and we make that choice when constructing the work item so that
+ the choice can't be made inconsistently later on. The dstrevs should all
+ have the same file revision for the given path, so the choice of srcrev is
+    arbitrary. The wdirrev can appear both as a srcrev and as a dstrev.
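+
+    For example, if revisions 1, 2 and 3 all contain the same file revision of
+    "foo/bar.txt" (and share the same baserevs), the work queue contains the
+    single item (1, "foo/bar.txt", (1, 2, 3)), and the returned map counts one
+    item for each of the three revisions.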
"""
- workqueue = []
+ dstrevmap = collections.defaultdict(list)
numitems = collections.defaultdict(int)
maxfilesize = ui.configbytes(b'fix', b'maxfilesize')
for rev in sorted(revstofix):
@@ -411,8 +429,21 @@
% (util.bytecount(maxfilesize), path)
)
continue
- workqueue.append((rev, path))
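+            # Work items are keyed by (file revision, baserevs, path): the
+            # fixed output can depend on the base contexts (via lineranges()),
+            # so only changesets that agree on all three can share one item.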
+ baserevs = tuple(ctx.rev() for ctx in basectxs[rev])
+ dstrevmap[(fctx.filerev(), baserevs, path)].append(rev)
numitems[rev] += 1
+ workqueue = [
+ (min(dstrevs), path, dstrevs)
+ for (filerev, baserevs, path), dstrevs in dstrevmap.items()
+ ]
+    # Move work items for earlier changesets to the front of the queue, so we
+    # might be able to replace those changesets (in topological order) while
+    # we're still processing later work items. The min() in the previous
+    # expression puts the earliest dstrev first in each tuple, so a plain
+    # tuple sort does the right thing without a custom sort key. The path is
+    # also part of the sort order, which keeps the output order stable. There
+    # are some situations where this doesn't help much, but some situations
+    # where it lets us buffer O(1) files instead of O(n) files.
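+    # For example, [(2, "foo/bar.txt", [2, 3]), (1, "baz.txt", [1])] sorts to
+    # [(1, "baz.txt", [1]), (2, "foo/bar.txt", [2, 3])], letting revision 1 be
+    # replaced while revision 2's fixes are still being computed.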
+ workqueue.sort()
return workqueue, numitems
@@ -517,9 +548,9 @@
return {}
basepaths = {}
- for rev, path in workqueue:
- fixctx = repo[rev]
- for basectx in basectxs[rev]:
+ for srcrev, path, _dstrevs in workqueue:
+ fixctx = repo[srcrev]
+ for basectx in basectxs[srcrev]:
basepath = copies.pathcopies(basectx, fixctx).get(path, path)
if basepath in basectx:
basepaths[(basectx.rev(), fixctx.rev(), path)] = basepath
@@ -642,10 +673,10 @@
toprefetch = set()
# Prefetch the files that will be fixed.
- for rev, path in workqueue:
- if rev == wdirrev:
+ for srcrev, path, _dstrevs in workqueue:
+ if srcrev == wdirrev:
continue
- toprefetch.add((rev, path))
+ toprefetch.add((srcrev, path))
# Prefetch the base contents for lineranges().
for (baserev, fixrev, path), basepath in basepaths.items():