fix: use a worker pool to parallelize running tools
author Danny Hooper <hooper@google.com>
Tue, 26 Jun 2018 15:30:49 -0700
changeset 38536 5ffe2041d427
parent 38535 8c38d2948217
child 38537 a3be09e277e9
fix: use a worker pool to parallelize running tools

This is important for usability when tools are slow or numerous.

Differential Revision: https://phab.mercurial-scm.org/D3846
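The core pattern this change adopts — fanning independent (rev, path) items out to a worker pool and streaming results back in whatever order they finish — can be sketched outside Mercurial with the standard library. This is a minimal illustration, not hg's API: fixfile_stub and the sample work items are hypothetical stand-ins for fix.py's fixfile() and its work queue.

    # Hedged sketch of the fan-out/fan-in pattern, using the stdlib
    # instead of Mercurial's worker.py. fixfile_stub is a hypothetical
    # stand-in for fix.py's fixfile().
    import concurrent.futures

    def fixfile_stub(olddata):
        # Pretend to run a fixer tool over the file content.
        return olddata.replace(b'teh', b'the')

    def getfix(item):
        rev, path, olddata = item
        newdata = fixfile_stub(olddata)
        # As in the real code, return None for unchanged content so we
        # avoid shipping redundant bytes back from the pool.
        return (rev, path, newdata if newdata != olddata else None)

    workqueue = [
        (0, 'a.txt', b'teh cat'),
        (0, 'b.txt', b'fine already'),
        (1, 'a.txt', b'teh dog'),
    ]

    if __name__ == '__main__':
        with concurrent.futures.ProcessPoolExecutor() as pool:
            futures = [pool.submit(getfix, item) for item in workqueue]
            # Like worker.worker(), results may arrive out of order.
            for fut in concurrent.futures.as_completed(futures):
                print(fut.result())

worker.worker() itself, as called in the diff below, takes the ui, a per-item cost weight, the generator function, a tuple of static arguments, and the work list; it decides internally whether forking is worthwhile and otherwise runs the function serially.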
hgext/fix.py
--- a/hgext/fix.py	Tue Jun 26 15:27:29 2018 -0700
+++ b/hgext/fix.py	Tue Jun 26 15:30:49 2018 -0700
@@ -70,6 +70,7 @@
     registrar,
     scmutil,
     util,
+    worker,
 )
 
 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
@@ -138,19 +139,40 @@
         basectxs = getbasectxs(repo, opts, revstofix)
         workqueue, numitems = getworkqueue(ui, repo, pats, opts, revstofix,
                                            basectxs)
+        fixers = getfixers(ui)
+
+        # There are no data dependencies between the workers fixing each file
+        # revision, so we can use all available parallelism.
+        def getfixes(items):
+            for rev, path in items:
+                ctx = repo[rev]
+                olddata = ctx[path].data()
+                newdata = fixfile(ui, opts, fixers, ctx, path, basectxs[rev])
+                # Don't waste memory/time passing unchanged content back, but
+                # produce one result per item either way.
+                yield (rev, path, newdata if newdata != olddata else None)
+        results = worker.worker(ui, 1.0, getfixes, tuple(), workqueue)
+
+        # We have to hold on to the data for each successor revision in memory
+        # until all its parents are committed. We ensure this by committing and
+        # freeing memory for the revisions in some topological order. This
+        # leaves a little bit of memory efficiency on the table, but also makes
+        # the tests deterministic. It might also be considered a feature since
+        # it makes the results more easily reproducible.
         filedata = collections.defaultdict(dict)
         replacements = {}
-        fixers = getfixers(ui)
-        # Some day this loop can become a worker pool, but for now it's easier
-        # to fix everything serially in topological order.
-        for rev, path in sorted(workqueue):
-            ctx = repo[rev]
-            olddata = ctx[path].data()
-            newdata = fixfile(ui, opts, fixers, ctx, path, basectxs[rev])
-            if newdata != olddata:
+        commitorder = sorted(revstofix, reverse=True)
+        for rev, path, newdata in results:
+            if newdata is not None:
                 filedata[rev][path] = newdata
             numitems[rev] -= 1
-            if not numitems[rev]:
+            # Apply the fixes for this and any other revisions that are ready
+            # and sitting at the front of the queue. Using a loop here keeps
+            # the queue from stalling when revisions become ready out of
+            # order.
+            while commitorder and not numitems[commitorder[-1]]:
+                rev = commitorder.pop()
+                ctx = repo[rev]
                 if rev == wdirrev:
                     writeworkingdir(repo, ctx, filedata[rev], replacements)
                 else:
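To make the buffering-and-commit dance above concrete, here is a small self-contained sketch of the same drain loop; the results list and per-revision item counts are invented stand-ins for the worker output and the numitems bookkeeping.

    import collections

    # Results may arrive in any order; revisions must be committed in
    # ascending order, freeing each revision's buffered data as it goes.
    results = [(2, 'f', b'y2'), (1, 'f', b'y1'), (1, 'g', None),
               (3, 'f', b'y3')]
    numitems = {1: 2, 2: 1, 3: 1}   # outstanding work items per revision
    filedata = collections.defaultdict(dict)
    commitorder = sorted(numitems, reverse=True)  # pop() yields lowest rev

    for rev, path, newdata in results:
        if newdata is not None:
            filedata[rev][path] = newdata
        numitems[rev] -= 1
        # Drain every now-complete revision at the front of the queue, so
        # a revision that finished early is flushed as soon as its turn
        # comes.
        while commitorder and not numitems[commitorder[-1]]:
            rev = commitorder.pop()
            print('commit', rev, sorted(filedata[rev]))
            filedata.pop(rev, None)  # release the buffered content

    # Prints: commit 1 ['f'], then commit 2 ['f'], then commit 3 ['f']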
@@ -168,11 +190,19 @@
     topological order. Each work item represents a file in the working copy or
     in some revision that should be fixed and written back to the working copy
     or into a replacement revision.
+
+    Work items for the same revision are grouped together, so that a worker
+    pool starting with the first N items in parallel is likely to finish the
+    first revision's work before other revisions. This can allow us to write
+    the result to disk and reduce memory footprint. At time of writing, the
+    partition strategy in worker.py seems favorable to this. We also sort the
+    items by ascending revision number to match the order in which we commit
+    the fixes later.
     """
     workqueue = []
     numitems = collections.defaultdict(int)
     maxfilesize = ui.configbytes('fix', 'maxfilesize')
-    for rev in revstofix:
+    for rev in sorted(revstofix):
         fixctx = repo[rev]
         match = scmutil.match(fixctx, pats, opts)
         for path in pathstofix(ui, repo, pats, opts, match, basectxs[rev],
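The queue ordering described in the docstring above can be illustrated with a toy queue builder; revstofix and the per-revision file lists here are made up for illustration.

    import collections

    revstofix = {3, 1, 2}
    pathsbyrev = {1: ['a', 'b'], 2: ['a'], 3: ['b', 'c']}  # hypothetical

    workqueue = []
    numitems = collections.defaultdict(int)
    # Sorting by revision groups each revision's items together, so a
    # pool consuming from the front tends to finish rev N before rev N+1,
    # and the commit loop can flush and free rev N's results early.
    for rev in sorted(revstofix):
        for path in pathsbyrev[rev]:
            workqueue.append((rev, path))
            numitems[rev] += 1

    print(workqueue)       # [(1, 'a'), (1, 'b'), (2, 'a'), (3, 'b'), (3, 'c')]
    print(dict(numitems))  # {1: 2, 2: 1, 3: 2}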