comparison hgext/fix.py @ 38536:5ffe2041d427

fix: use a worker pool to parallelize running tools

This is important for usability when tools are slow or numerous.

Differential Revision: https://phab.mercurial-scm.org/D3846
author Danny Hooper <hooper@google.com>
date Tue, 26 Jun 2018 15:30:49 -0700
parents 32fba6fe893d
children a3be09e277e9
comparison: 38535:8c38d2948217 (before) vs 38536:5ffe2041d427 (after)
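
The hunks below replace fix's serial per-file loop with a dispatch through Mercurial's worker module: a getfixes() generator yields one result per work item, and unchanged files are reported as None so their contents are not passed back between processes for nothing. As a rough, self-contained stand-in for that shape (this sketch uses the standard library's concurrent.futures instead of worker.worker(), and run_tools(), fixone(), and the sample queue are invented for illustration):

import concurrent.futures

def run_tools(data):
    # Placeholder for piping file content through the configured fixer tools.
    return data.replace(b'\t', b'    ')

def fixone(item):
    rev, path, olddata = item
    newdata = run_tools(olddata)
    # Report None for unchanged files so large, untouched contents are not
    # pickled and shipped back from the worker needlessly.
    return (rev, path, newdata if newdata != olddata else None)

if __name__ == '__main__':
    workqueue = [
        (0, 'a.py', b'x = 1\n'),
        (0, 'b.py', b'\tindented\n'),
        (1, 'a.py', b'y = 2\n'),
    ]
    with concurrent.futures.ProcessPoolExecutor() as pool:
        futures = [pool.submit(fixone, item) for item in workqueue]
        # Results stream back in whatever order the workers finish them,
        # not in queue order.
        for fut in concurrent.futures.as_completed(futures):
            rev, path, newdata = fut.result()
            print(rev, path, 'unchanged' if newdata is None else newdata)

Because results can arrive out of order, the committing side of the patch has to buffer per-revision data and reorder; see the sketch after the second hunk.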

@@ -68,10 +68,11 @@
     obsolete,
     pycompat,
     registrar,
     scmutil,
     util,
+    worker,
 )
 
 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
 # be specifying the version(s) of Mercurial they are tested with, or

@@ -136,23 +137,44 @@
     with repo.wlock(), repo.lock(), repo.transaction('fix'):
         revstofix = getrevstofix(ui, repo, opts)
         basectxs = getbasectxs(repo, opts, revstofix)
         workqueue, numitems = getworkqueue(ui, repo, pats, opts, revstofix,
                                            basectxs)
+        fixers = getfixers(ui)
+
+        # There are no data dependencies between the workers fixing each file
+        # revision, so we can use all available parallelism.
+        def getfixes(items):
+            for rev, path in items:
+                ctx = repo[rev]
+                olddata = ctx[path].data()
+                newdata = fixfile(ui, opts, fixers, ctx, path, basectxs[rev])
+                # Don't waste memory/time passing unchanged content back, but
+                # produce one result per item either way.
+                yield (rev, path, newdata if newdata != olddata else None)
+        results = worker.worker(ui, 1.0, getfixes, tuple(), workqueue)
+
+        # We have to hold on to the data for each successor revision in memory
+        # until all its parents are committed. We ensure this by committing and
+        # freeing memory for the revisions in some topological order. This
+        # leaves a little bit of memory efficiency on the table, but also makes
+        # the tests deterministic. It might also be considered a feature since
+        # it makes the results more easily reproducible.
         filedata = collections.defaultdict(dict)
         replacements = {}
-        fixers = getfixers(ui)
-        # Some day this loop can become a worker pool, but for now it's easier
-        # to fix everything serially in topological order.
-        for rev, path in sorted(workqueue):
-            ctx = repo[rev]
-            olddata = ctx[path].data()
-            newdata = fixfile(ui, opts, fixers, ctx, path, basectxs[rev])
-            if newdata != olddata:
+        commitorder = sorted(revstofix, reverse=True)
+        for rev, path, newdata in results:
+            if newdata is not None:
                 filedata[rev][path] = newdata
             numitems[rev] -= 1
-            if not numitems[rev]:
+            # Apply the fixes for this and any other revisions that are ready
+            # and sitting at the front of the queue. Using a loop here prevents
+            # the queue from being blocked by the first revision to be ready out
+            # of order.
+            while commitorder and not numitems[commitorder[-1]]:
+                rev = commitorder.pop()
+                ctx = repo[rev]
                 if rev == wdirrev:
                     writeworkingdir(repo, ctx, filedata[rev], replacements)
                 else:
                     replacerev(ui, repo, ctx, filedata[rev], replacements)
                 del filedata[rev]
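
The second half of the hunk above buffers results per revision and commits revisions in ascending revision-number order, freeing each revision's file data as soon as it is committed. Below is a minimal, repository-free sketch of just that ordering trick; commit(), apply_results(), and the sample inputs are invented for illustration:

import collections

def commit(rev, files):
    print('committing rev %d with %d fixed file(s)' % (rev, len(files)))

def apply_results(results, numitems):
    filedata = collections.defaultdict(dict)
    # Reverse-sorted so the smallest revision still pending sits at the end
    # of the list and can be popped cheaply.
    commitorder = sorted(numitems, reverse=True)
    for rev, path, newdata in results:
        if newdata is not None:
            filedata[rev][path] = newdata
        numitems[rev] -= 1
        # Drain every revision that has become ready, not just this one, so a
        # late-arriving early revision cannot keep holding up its successors
        # once it finally completes.
        while commitorder and not numitems[commitorder[-1]]:
            ready = commitorder.pop()
            commit(ready, filedata.pop(ready, {}))  # free the buffered data

if __name__ == '__main__':
    numitems = {10: 1, 11: 2}   # outstanding work items per revision
    results = [                 # deliberately out of completion order
        (11, 'b.py', b'fixed'),
        (10, 'a.py', None),     # the tools left this file unchanged
        (11, 'a.py', None),
    ]
    apply_results(results, numitems)

Popping from the end of a reverse-sorted list keeps the next revision to commit cheap to find, and dropping a revision's buffered data as soon as it is committed bounds memory to the revisions still waiting on results.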

@@ -166,15 +188,23 @@
     It is up to the caller how to consume the work items, and the only
     dependence between them is that replacement revisions must be committed in
     topological order. Each work item represents a file in the working copy or
     in some revision that should be fixed and written back to the working copy
     or into a replacement revision.
+
+    Work items for the same revision are grouped together, so that a worker
+    pool starting with the first N items in parallel is likely to finish the
+    first revision's work before other revisions. This can allow us to write
+    the result to disk and reduce memory footprint. At time of writing, the
+    partition strategy in worker.py seems favorable to this. We also sort the
+    items by ascending revision number to match the order in which we commit
+    the fixes later.
     """
     workqueue = []
     numitems = collections.defaultdict(int)
     maxfilesize = ui.configbytes('fix', 'maxfilesize')
-    for rev in revstofix:
+    for rev in sorted(revstofix):
         fixctx = repo[rev]
         match = scmutil.match(fixctx, pats, opts)
         for path in pathstofix(ui, repo, pats, opts, match, basectxs[rev],
                                fixctx):
             if path not in fixctx:
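
The docstring added above explains the queue layout: items are grouped by ascending revision so a pool that starts on the front of the queue tends to finish the earliest revision's files first, letting that revision be committed and its data freed while later revisions are still being fixed. A toy version of that construction follows; buildworkqueue() and files_in_rev are invented, and the real getworkqueue() derives its paths from pathstofix() and applies further checks not shown in this excerpt:

import collections

def buildworkqueue(files_in_rev):
    workqueue = []
    numitems = collections.defaultdict(int)
    # Ascending revision order keeps each revision's items contiguous and puts
    # the first revision to be committed at the front of the queue.
    for rev in sorted(files_in_rev):
        for path in sorted(files_in_rev[rev]):
            workqueue.append((rev, path))
            numitems[rev] += 1
    return workqueue, numitems

if __name__ == '__main__':
    workqueue, numitems = buildworkqueue({
        12: ['a.py', 'b.py'],
        10: ['a.py'],
        11: ['c.py', 'a.py'],
    })
    print(workqueue)
    # [(10, 'a.py'), (11, 'a.py'), (11, 'c.py'), (12, 'a.py'), (12, 'b.py')]
    print(dict(numitems))
    # {10: 1, 11: 2, 12: 2}

The numitems counts are what the commit loop in the previous hunk decrements to decide when a revision has no outstanding items and is ready to be committed.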