diff mercurial/exchangev2.py @ 39631:b9e453d683a1

exchangev2: fetch changeset revisions All Mercurial repository data is derived from changesets: you can't do anything unless you have changesets. Therefore, it makes sense for changesets to be the first piece of data that we transfer as part of pull. To do this, we call our new "changesetdata" command, requesting parents and revision data. This gives us all the data that a changegroup delta group would give us. We simply normalize this data into what addgroup() expects and call that API on the changelog to bulk insert revisions into the changelog. Code in this commit is heavily borrowed from changegroup.cg1unpacker.apply(). Differential Revision: https://phab.mercurial-scm.org/D4482
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 12 Sep 2018 10:01:36 -0700
parents a86d21e70b2b
children ff2de4f2eb3c
line wrap: on
line diff
--- a/mercurial/exchangev2.py	Wed Sep 12 10:01:16 2018 -0700
+++ b/mercurial/exchangev2.py	Wed Sep 12 10:01:36 2018 -0700
@@ -7,10 +7,16 @@
 
 from __future__ import absolute_import
 
+import weakref
+
+from .i18n import _
 from .node import (
     nullid,
+    short,
 )
 from . import (
+    mdiff,
+    pycompat,
     setdiscovery,
 )
 
@@ -18,11 +24,15 @@
     """Pull using wire protocol version 2."""
     repo = pullop.repo
     remote = pullop.remote
+    tr = pullop.trmanager.transaction()
 
     # Figure out what needs to be fetched.
     common, fetch, remoteheads = _pullchangesetdiscovery(
         repo, remote, pullop.heads, abortwhenunrelated=pullop.force)
 
+    pullheads = pullop.heads or remoteheads
+    _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
+
 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
     """Determine which changesets need to be pulled."""
 
@@ -53,3 +63,76 @@
     common.discard(nullid)
 
     return common, fetch, remoteheads
+
+def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
+    if not fetch:
+        return
+
+    # TODO consider adding a step here where we obtain the DAG shape first
+    # (or ask the server to slice changesets into chunks for us) so that
+    # we can perform multiple fetches in batches. This will facilitate
+    # resuming interrupted clones, higher server-side cache hit rates due
+    # to smaller segments, etc.
+    with remote.commandexecutor() as e:
+        objs = e.callcommand(b'changesetdata', {
+            b'noderange': [sorted(common), sorted(remoteheads)],
+            b'fields': {b'parents', b'revision'},
+        }).result()
+
+        # The context manager waits on all response data when exiting. So
+        # we need to remain in the context manager in order to stream data.
+        return _processchangesetdata(repo, tr, objs)
+
+def _processchangesetdata(repo, tr, objs):
+    repo.hook('prechangegroup', throw=True,
+              **pycompat.strkwargs(tr.hookargs))
+
+    urepo = repo.unfiltered()
+    cl = urepo.changelog
+
+    cl.delayupdate(tr)
+
+    # The first emitted object is a header describing the data that
+    # follows.
+    meta = next(objs)
+
+    progress = repo.ui.makeprogress(_('changesets'),
+                                    unit=_('chunks'),
+                                    total=meta.get(b'totalitems'))
+
+    def linkrev(node):
+        repo.ui.debug('add changeset %s\n' % short(node))
+        # Linkrev for changelog is always self.
+        return len(cl)
+
+    def onchangeset(cl, node):
+        progress.increment()
+
+    # addgroup() expects a 7-tuple describing revisions. This normalizes
+    # the wire data to that format.
+    def iterrevisions():
+        for cset in objs:
+            assert b'revisionsize' in cset
+            data = next(objs)
+
+            yield (
+                cset[b'node'],
+                cset[b'parents'][0],
+                cset[b'parents'][1],
+                # Linknode is always itself for changesets.
+                cset[b'node'],
+                # We always send full revisions. So delta base is not set.
+                nullid,
+                mdiff.trivialdiffheader(len(data)) + data,
+                # Flags not yet supported.
+                0,
+            )
+
+    added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
+                        addrevisioncb=onchangeset)
+
+    progress.complete()
+
+    return {
+        'added': added,
+    }