@@ -282,24 +282,33 @@
         _prefetchfiles(repo, workqueue, basepaths)
 
         # There are no data dependencies between the workers fixing each file
        # revision, so we can use all available parallelism.
         def getfixes(items):
-            for rev, path in items:
-                ctx = repo[rev]
+            for srcrev, path, dstrevs in items:
+                ctx = repo[srcrev]
                 olddata = ctx[path].data()
                 metadata, newdata = fixfile(
-                    ui, repo, opts, fixers, ctx, path, basepaths, basectxs[rev]
+                    ui,
+                    repo,
+                    opts,
+                    fixers,
+                    ctx,
+                    path,
+                    basepaths,
+                    basectxs[srcrev],
                 )
-                # Don't waste memory/time passing unchanged content back, but
-                # produce one result per item either way.
-                yield (
-                    rev,
-                    path,
-                    metadata,
-                    newdata if newdata != olddata else None,
-                )
+                # We ungroup the work items now, because the code that consumes
+                # these results has to handle each dstrev separately, and in
+                # topological order. Because these are handled in topological
+                # order, it's important that we pass around references to
+                # "newdata" instead of copying it. Otherwise, we would be
+                # keeping more copies of file content in memory at a time than
+                # if we hadn't bothered to group/deduplicate the work items.
+                data = newdata if newdata != olddata else None
+                for dstrev in dstrevs:
+                    yield (dstrev, path, metadata, data)
 
         results = worker.worker(
             ui, 1.0, getfixes, tuple(), workqueue, threadsafe=False
         )
 
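Note on the hunk above: the ungrouping at the end of getfixes() is the subtle part. One fixer execution now fans out to several destination revisions, and every yielded tuple must share the same "newdata" object rather than a copy. A minimal self-contained sketch of that pattern (toy code, not Mercurial's; run_fixer and the plain dicts are hypothetical stand-ins for fixfile() and the repo):

def run_fixer(data):
    # Hypothetical stand-in for invoking the configured fixer tools.
    return data.replace(b"teh ", b"the ")

def getfixes(items, contents):
    for srcrev, path, dstrevs in items:
        olddata = contents[(srcrev, path)]
        newdata = run_fixer(olddata)
        # One execution per work item; None signals "unchanged" so workers
        # never ship redundant file content back to the parent process.
        data = newdata if newdata != olddata else None
        for dstrev in dstrevs:
            yield (dstrev, path, data)

contents = {(1, b"foo/bar.txt"): b"teh content\n"}
results = list(getfixes([(1, b"foo/bar.txt", (1, 2, 3))], contents))
# Three results, one per dstrev, all referencing a single bytes object:
assert [r[0] for r in results] == [1, 2, 3]
assert results[0][2] is results[1][2] is results[2][2]

The identity assertion at the end is the memory claim from the new comment: grouping would be pointless if each dstrev received its own copy of the content.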
@@ -375,27 +384,36 @@
         }
         scmutil.cleanupnodes(repo, replacements, b'fix', fixphase=True)
 
 
 def getworkqueue(ui, repo, pats, opts, revstofix, basectxs):
-    """Constructs the list of files to be fixed at specific revisions
+    """Constructs a list of files to fix and which revisions each fix applies to
 
-    It is up to the caller how to consume the work items, and the only
-    dependence between them is that replacement revisions must be committed in
-    topological order. Each work item represents a file in the working copy or
-    in some revision that should be fixed and written back to the working copy
-    or into a replacement revision.
-
-    Work items for the same revision are grouped together, so that a worker
-    pool starting with the first N items in parallel is likely to finish the
-    first revision's work before other revisions. This can allow us to write
-    the result to disk and reduce memory footprint. At time of writing, the
-    partition strategy in worker.py seems favorable to this. We also sort the
-    items by ascending revision number to match the order in which we commit
-    the fixes later.
+    To avoid duplicating work, there is usually only one work item for each file
+    revision that might need to be fixed. There can be multiple work items per
+    file revision if the same file needs to be fixed in multiple changesets with
+    different baserevs. Each work item also contains a list of changesets where
+    the file's data should be replaced with the fixed data. The work items for
+    earlier changesets come earlier in the work queue, to improve pipelining by
+    allowing the first changeset to be replaced while fixes are still being
+    computed for later changesets.
+
+    Also returned is a map from changesets to the count of work items that might
+    affect each changeset. This is used later to count when all of a changeset's
+    work items have been finished, without having to inspect the remaining work
+    queue in each worker subprocess.
+
+    The example work item (1, "foo/bar.txt", (1, 2, 3)) means that the data of
+    bar.txt should be read from revision 1, then fixed, and written back to
+    revisions 1, 2 and 3. Revision 1 is called the "srcrev" and the list of
+    revisions is called the "dstrevs". In practice the srcrev is always one of
+    the dstrevs, and we make that choice when constructing the work item so that
+    the choice can't be made inconsistently later on. The dstrevs should all
+    have the same file revision for the given path, so the choice of srcrev is
+    arbitrary. The wdirrev can be a dstrev and a srcrev.
     """
-    workqueue = []
+    dstrevmap = collections.defaultdict(list)
     numitems = collections.defaultdict(int)
     maxfilesize = ui.configbytes(b'fix', b'maxfilesize')
     for rev in sorted(revstofix):
         fixctx = repo[rev]
         match = scmutil.match(fixctx, pats, opts)
@@ -409,12 +427,25 @@
             ui.warn(
                 _(b'ignoring file larger than %s: %s\n')
                 % (util.bytecount(maxfilesize), path)
             )
             continue
-            workqueue.append((rev, path))
+            baserevs = tuple(ctx.rev() for ctx in basectxs[rev])
+            dstrevmap[(fctx.filerev(), baserevs, path)].append(rev)
             numitems[rev] += 1
+    workqueue = [
+        (min(dstrevs), path, dstrevs)
+        for (filerev, baserevs, path), dstrevs in dstrevmap.items()
+    ]
+    # Move work items for earlier changesets to the front of the queue, so we
+    # might be able to replace those changesets (in topological order) while
+    # we're still processing later work items. Note the min() in the previous
+    # expression, which means we don't need a custom comparator here. The path
+    # is also important in the sort order to make the output order stable. There
+    # are some situations where this doesn't help much, but some situations
+    # where it lets us buffer O(1) files instead of O(n) files.
+    workqueue.sort()
     return workqueue, numitems
 
 
 def getrevstofix(ui, repo, opts):
     """Returns the set of revision numbers that should be fixed"""
@@ -515,13 +546,13 @@
     if opts.get(b'whole'):
         # Base paths will never be fetched for line range determination.
         return {}
 
     basepaths = {}
-    for rev, path in workqueue:
-        fixctx = repo[rev]
-        for basectx in basectxs[rev]:
+    for srcrev, path, _dstrevs in workqueue:
+        fixctx = repo[srcrev]
+        for basectx in basectxs[srcrev]:
             basepath = copies.pathcopies(basectx, fixctx).get(path, path)
             if basepath in basectx:
                 basepaths[(basectx.rev(), fixctx.rev(), path)] = basepath
     return basepaths
 
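This hunk only widens the tuple unpacking, but the keying is easy to misread, so a toy version may help (not Mercurial code; pathcopies here is a hypothetical stand-in for mercurial.copies.pathcopies(), and the "basepath in basectx" existence check is omitted):

def pathcopies(baserev, fixrev):
    # Hypothetical rename data: between revisions 0 and 1, foo/bar.txt
    # was copied from foo/old.txt.
    return {"foo/bar.txt": "foo/old.txt"} if (baserev, fixrev) == (0, 1) else {}

def getbasepaths(workqueue, basectxs):
    basepaths = {}
    for srcrev, path, _dstrevs in workqueue:
        for baserev in basectxs[srcrev]:
            # Key on (baserev, fixrev, path); the value is the name the
            # file had in the base revision, following renames.
            basepaths[(baserev, srcrev, path)] = pathcopies(
                baserev, srcrev
            ).get(path, path)
    return basepaths

queue = [(1, "foo/bar.txt", (1, 2, 3))]
assert getbasepaths(queue, {1: [0]}) == {(0, 1, "foo/bar.txt"): "foo/old.txt"}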
@@ -640,14 +671,14 @@
 
 def _prefetchfiles(repo, workqueue, basepaths):
     toprefetch = set()
 
     # Prefetch the files that will be fixed.
-    for rev, path in workqueue:
-        if rev == wdirrev:
+    for srcrev, path, _dstrevs in workqueue:
+        if srcrev == wdirrev:
             continue
-        toprefetch.add((rev, path))
+        toprefetch.add((srcrev, path))
 
     # Prefetch the base contents for lineranges().
     for (baserev, fixrev, path), basepath in basepaths.items():
         toprefetch.add((baserev, basepath))
 
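_prefetchfiles() likewise just unpacks the new work item shape, but it is where the grouping pays off for remote stores: each needed blob is requested at most once, and working-directory contents are skipped because they are already local. A self-contained sketch of that bookkeeping (toy code; WDIRREV mirrors the working-directory sentinel that mercurial.node.wdirrev provides):

WDIRREV = 0x7FFFFFFF  # working-directory sentinel, as in mercurial.node.wdirrev

def files_to_prefetch(workqueue, basepaths):
    toprefetch = set()
    # The files that will be fixed.
    for srcrev, path, _dstrevs in workqueue:
        if srcrev == WDIRREV:
            continue  # working-copy data needs no remote fetch
        toprefetch.add((srcrev, path))
    # The base contents needed for line-range computation.
    for (baserev, fixrev, path), basepath in basepaths.items():
        toprefetch.add((baserev, basepath))
    return toprefetch

queue = [(1, "foo/bar.txt", (1, 2, 3)), (WDIRREV, "baz.txt", (WDIRREV,))]
bases = {(0, 1, "foo/bar.txt"): "foo/old.txt"}
assert files_to_prefetch(queue, bases) == {(1, "foo/bar.txt"), (0, "foo/old.txt")}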