comparison mercurial/exchangev2.py @ 40179:b843356d4ae1

exchangev2: use filesdata. filesdata is a more efficient mechanism for bulk fetching files data for a range of changesets. Let's use it in exchangev2. With this change, a client performing a full clone of mozilla-unified transmits substantially fewer bytes across the wire (before: 139,124,863 bytes sent; after: 20,522,499 bytes sent). The bulk of the remaining bytes is likely the transfer of ~1M nodes for changesets and manifests. We can eliminate this by making requests in terms of node ranges instead of explicit node lists... Differential Revision: https://phab.mercurial-scm.org/D4982
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 03 Oct 2018 13:57:42 -0700
parents 41263df08109
children 55836a34f41b
comparison
equal deleted inserted replaced
40178:46a40bce3ae0 40179:b843356d4ae1
62 manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes']) 62 manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
63 63
64 # Find all file nodes referenced by added manifests and fetch those 64 # Find all file nodes referenced by added manifests and fetch those
65 # revisions. 65 # revisions.
66 fnodes = _derivefilesfrommanifests(repo, manres['added']) 66 fnodes = _derivefilesfrommanifests(repo, manres['added'])
67 _fetchfiles(repo, tr, remote, fnodes, manres['linkrevs']) 67 _fetchfilesfromcsets(repo, tr, remote, fnodes, csetres['added'],
68 manres['linkrevs'])
68 69
69 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True): 70 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
70 """Determine which changesets need to be pulled.""" 71 """Determine which changesets need to be pulled."""
71 72
72 if heads: 73 if heads:
344 progress.increment() 345 progress.increment()
345 346
346 return fnodes 347 return fnodes
347 348
348 def _fetchfiles(repo, tr, remote, fnodes, linkrevs): 349 def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
350 """Fetch file data from explicit file revisions."""
349 def iterrevisions(objs, progress): 351 def iterrevisions(objs, progress):
350 for filerevision in objs: 352 for filerevision in objs:
351 node = filerevision[b'node'] 353 node = filerevision[b'node']
352 354
353 extrafields = {} 355 extrafields = {}
416 store = repo.file(path) 418 store = repo.file(path)
417 store.addgroup( 419 store.addgroup(
418 iterrevisions(objs, progress), 420 iterrevisions(objs, progress),
419 locallinkrevs[path].__getitem__, 421 locallinkrevs[path].__getitem__,
420 weakref.proxy(tr)) 422 weakref.proxy(tr))
423
def _fetchfilesfromcsets(repo, tr, remote, fnodes, csets, manlinkrevs):
    """Fetch file data from explicit changeset revisions.

    Issues ``filesdata`` commands against ``remote`` for the changeset nodes
    in ``csets``, one batch at a time, and adds the returned file revisions
    to the matching local file stores inside transaction ``tr``.

    ``fnodes`` is indexed by path and yields ``(file node, manifest node)``
    pairs; ``manlinkrevs`` is indexed by manifest node. Together they build
    the per-path lookup handed to ``store.addgroup()``.
    """

    def iterrevisions(objs, remaining, progress):
        # Turn the next ``remaining`` revision records of ``objs`` into the
        # tuples expected by ``addgroup()``. ``objs`` is the response stream
        # shared by every path, so exactly ``remaining`` records (plus their
        # follow-up field objects) must be consumed here.
        while remaining:
            filerevision = next(objs)

            # Count the record as soon as it has been read. Previously a
            # record carrying no data was skipped without being counted,
            # making this generator read into the next path's segment.
            remaining -= 1

            node = filerevision[b'node']

            # Each declared field is sent as its own object right after the
            # record that announces it.
            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                # Express a full revision as a delta against nullid.
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                # No revision data was sent for this record; nothing to add.
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _('files'), unit=_('chunks'),
        total=sum(len(v) for v in fnodes.itervalues()))

    commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 50000)

    for start in pycompat.xrange(0, len(csets), batchsize):
        # ``start`` is always below len(csets), so the slice is never empty.
        batch = list(csets[start:start + batchsize])

        with remote.commandexecutor() as e:
            args = {
                b'revisions': [{
                    b'type': b'changesetexplicit',
                    b'nodes': batch,
                }],
                b'fields': {b'parents', b'revision'},
                b'haveparents': True,
            }

            objs = e.callcommand(b'filesdata', args).result()

            # First object is an overall header.
            overall = next(objs)

            # We have overall['totalpaths'] segments, each introduced by a
            # per-path header followed by header['totalitems'] revisions.
            for pathindex in pycompat.xrange(overall[b'totalpaths']):
                header = next(objs)

                path = header[b'path']
                store = repo.file(path)

                linkrevs = {
                    fnode: manlinkrevs[mnode]
                    for fnode, mnode in fnodes[path].iteritems()}

                store.addgroup(iterrevisions(objs, header[b'totalitems'],
                                             progress),
                               linkrevs.__getitem__,
                               weakref.proxy(tr))