--- a/mercurial/exchangev2.py Wed Sep 05 09:10:17 2018 -0700
+++ b/mercurial/exchangev2.py Tue Sep 04 10:42:24 2018 -0700
@@ -7,6 +7,7 @@
from __future__ import absolute_import
+import collections
import weakref
from .i18n import _
@@ -58,7 +59,12 @@
remote.url(), pullop.gettransaction,
explicit=pullop.explicitbookmarks)
- _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
+ manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])
+
+ # Find all file nodes referenced by added manifests and fetch those
+ # revisions.
+ fnodes = _derivefilesfrommanifests(repo, manres['added'])
+ _fetchfiles(repo, tr, remote, fnodes, manres['linkrevs'])
def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
"""Determine which changesets need to be pulled."""
@@ -291,4 +297,98 @@
return {
'added': added,
+ 'linkrevs': linkrevs,
}
+
+def _derivefilesfrommanifests(repo, manifestnodes):
+    """Collect the file nodes referenced by a set of manifest nodes.
+
+    Returns a dict keyed by file path. Each value is a dict mapping a file
+    node to the first manifest node, in iteration order of
+    ``manifestnodes``, in which that file node was seen.
+    """
+    manifestlog = repo.manifestlog
+    result = collections.defaultdict(dict)
+
+    for mnode in manifestnodes:
+        # TODO readfast() takes the storage delta into consideration, so
+        # this pulls in unwanted nodes. What we really want is the delta
+        # between the manifest's parents, ideally ignoring file nodes that
+        # are already known locally. Both limitations are ignored for now,
+        # which means incremental fetches request data we already have.
+        # This is far from ideal.
+        entries = manifestlog.get(b'', mnode).readfast()
+
+        for path, fnode in entries.items():
+            seen = result[path]
+
+            # Keep only the first manifest node observed for a file node.
+            if fnode not in seen:
+                seen[fnode] = mnode
+
+    return result
+
+def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
+    """Fetch file revisions from ``remote`` and add them to local storage.
+
+    ``fnodes`` maps file paths to dicts of file node to manifest node, the
+    shape returned by ``_derivefilesfrommanifests()``. ``linkrevs`` is
+    indexed by manifest node and supplies the value recorded as the link
+    revision for every file node introduced by that manifest.
+
+    Paths are processed in sorted order, in fixed-size batches. For each
+    batch, one ``filedata`` command per path is issued on a single command
+    executor before any response is consumed; the returned revisions are
+    then handed to ``repo.file(path).addgroup()``.
+    """
+    def iterrevisions(objs, progress):
+        # Each revision arrives as a metadata dict, followed by a separate
+        # payload object when a delta or a full revision was sent.
+        for filerevision in objs:
+            node = filerevision[b'node']
+
+            if b'deltasize' in filerevision:
+                basenode = filerevision[b'deltabasenode']
+                delta = next(objs)
+            elif b'revisionsize' in filerevision:
+                # A full revision is expressed as a delta against the null
+                # revision by prepending a trivial diff header.
+                basenode = nullid
+                revision = next(objs)
+                delta = mdiff.trivialdiffheader(len(revision)) + revision
+            else:
+                # No payload accompanies this entry; nothing to store.
+                continue
+
+            parents = filerevision[b'parents']
+
+            yield (
+                node,
+                parents[0],
+                parents[1],
+                # NOTE(review): presumably the key handed to the link
+                # mapper passed to addgroup() below -- confirm against the
+                # storage API.
+                node,
+                basenode,
+                delta,
+                # Flags not yet supported.
+                0,
+            )
+
+            # Runs when the consumer asks for the next revision.
+            progress.increment()
+
+    # dict.values()/items() work on both Python 2 and 3, unlike
+    # itervalues()/iteritems().
+    progress = repo.ui.makeprogress(
+        _('files'), unit=_('chunks'),
+        total=sum(len(v) for v in fnodes.values()))
+
+    # TODO make batch size configurable
+    batchsize = 10000
+    fnodeslist = sorted(fnodes.items())
+
+    # Every start index is below len(fnodeslist), so a batch is never empty.
+    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
+        batch = fnodeslist[i:i + batchsize]
+
+        with remote.commandexecutor() as e:
+            fs = []
+            locallinkrevs = {}
+
+            # Queue every request of the batch before reading any result.
+            for path, nodes in batch:
+                fs.append((path, e.callcommand(b'filedata', {
+                    b'path': path,
+                    b'nodes': sorted(nodes),
+                    b'fields': {b'parents', b'revision'}
+                })))
+
+                # Translate file node -> manifest node -> link revision.
+                locallinkrevs[path] = {
+                    node: linkrevs[manifestnode]
+                    for node, manifestnode in nodes.items()}
+
+            for path, f in fs:
+                objs = f.result()
+
+                # Chomp off header objects.
+                next(objs)
+
+                store = repo.file(path)
+                store.addgroup(
+                    iterrevisions(objs, progress),
+                    locallinkrevs[path].__getitem__,
+                    weakref.proxy(tr))