exchangev2: fetch changeset revisions
authorGregory Szorc <gregory.szorc@gmail.com>
Wed, 12 Sep 2018 10:01:36 -0700
changeset 39631 b9e453d683a1
parent 39630 9c2c77c73f23
child 39632 c1aacb0d76ff
exchangev2: fetch changeset revisions

All Mercurial repository data is derived from changesets: you can't do anything unless you have changesets. Therefore, it makes sense for changesets to be the first piece of data that we transfer as part of pull.

To do this, we call our new "changesetdata" command, requesting parents and revision data. This gives us all the data that a changegroup delta group would give us. We simply normalize this data into what addgroup() expects and call that API on the changelog to bulk insert revisions into the changelog.

Code in this commit is heavily borrowed from changegroup.cg1unpacker.apply().

Differential Revision: https://phab.mercurial-scm.org/D4482
mercurial/exchangev2.py
tests/test-wireproto-exchangev2.t
--- a/mercurial/exchangev2.py	Wed Sep 12 10:01:16 2018 -0700
+++ b/mercurial/exchangev2.py	Wed Sep 12 10:01:36 2018 -0700
@@ -7,10 +7,16 @@
 
 from __future__ import absolute_import
 
+import weakref
+
+from .i18n import _
 from .node import (
     nullid,
+    short,
 )
 from . import (
+    mdiff,
+    pycompat,
     setdiscovery,
 )
 
@@ -18,11 +24,15 @@
     """Pull using wire protocol version 2."""
     repo = pullop.repo
     remote = pullop.remote
+    tr = pullop.trmanager.transaction()
 
     # Figure out what needs to be fetched.
     common, fetch, remoteheads = _pullchangesetdiscovery(
         repo, remote, pullop.heads, abortwhenunrelated=pullop.force)
 
+    pullheads = pullop.heads or remoteheads
+    _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
+
 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
     """Determine which changesets need to be pulled."""
 
@@ -53,3 +63,76 @@
     common.discard(nullid)
 
     return common, fetch, remoteheads
+
+def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
+    """Fetch changeset data from ``remote`` and add it to ``repo``.
+
+    Issues a single ``changesetdata`` wire protocol command asking for the
+    ``parents`` and ``revision`` fields of the changesets in the node range
+    bounded by ``common`` and ``remoteheads``, then hands the streamed
+    response to ``_processchangesetdata()``.
+
+    ``common`` and ``remoteheads`` are iterables of nodes; both are sorted
+    before being sent. The caller in ``pull()`` passes the explicitly
+    requested heads if there are any, else the heads reported by discovery.
+
+    Returns ``None`` without contacting the remote when ``fetch`` is empty.
+    Otherwise returns whatever ``_processchangesetdata()`` returns.
+    """
+    # Nothing was identified as missing locally, so there is nothing to do.
+    if not fetch:
+        return
+
+    # TODO consider adding a step here where we obtain the DAG shape first
+    # (or ask the server to slice changesets into chunks for us) so that
+    # we can perform multiple fetches in batches. This will facilitate
+    # resuming interrupted clones, higher server-side cache hit rates due
+    # to smaller segments, etc.
+    with remote.commandexecutor() as e:
+        # noderange is a 2-item list: [known/common nodes, wanted heads].
+        objs = e.callcommand(b'changesetdata', {
+            b'noderange': [sorted(common), sorted(remoteheads)],
+            b'fields': {b'parents', b'revision'},
+        }).result()
+
+        # The context manager waits on all response data when exiting. So
+        # we need to remain in the context manager in order to stream data.
+        return _processchangesetdata(repo, tr, objs)
+
+def _processchangesetdata(repo, tr, objs):
+    """Apply a ``changesetdata`` response stream to the local changelog.
+
+    ``objs`` is an iterator over the decoded response objects. As consumed
+    here, it yields one header mapping first, followed by pairs of
+    (per-changeset mapping, raw revision data).
+
+    ``tr`` is the transaction the changelog writes are performed under; its
+    ``hookargs`` are forwarded to the ``prechangegroup`` hook.
+
+    Returns a dict whose ``'added'`` key holds the return value of
+    ``changelog.addgroup()``.
+
+    This mirrors the approach of ``changegroup.cg1unpacker.apply()``.
+    """
+    # throw=True is passed so a failing hook raises rather than being
+    # ignored -- NOTE(review): confirm against repo.hook() semantics.
+    repo.hook('prechangegroup', throw=True,
+              **pycompat.strkwargs(tr.hookargs))
+
+    # Operate on the unfiltered repository's changelog.
+    urepo = repo.unfiltered()
+    cl = urepo.changelog
+
+    # NOTE(review): presumably defers making the new changelog entries
+    # visible until the transaction finalizes -- confirm in changelog.py.
+    cl.delayupdate(tr)
+
+    # The first emitted object is a header describing the data that
+    # follows.
+    meta = next(objs)
+
+    # The header's b'totalitems' value (None if absent) is used as the
+    # progress total.
+    progress = repo.ui.makeprogress(_('changesets'),
+                                    unit=_('chunks'),
+                                    total=meta.get(b'totalitems'))
+
+    # Passed to addgroup() to map a link node to a link revision.
+    def linkrev(node):
+        repo.ui.debug('add changeset %s\n' % short(node))
+        # Linkrev for changelog is always self.
+        return len(cl)
+
+    # Passed to addgroup() as addrevisioncb; both arguments are unused and
+    # the callback only advances the progress bar.
+    def onchangeset(cl, node):
+        progress.increment()
+
+    # addgroup() expects a 7-tuple describing revisions. This normalizes
+    # the wire data to that format.
+    def iterrevisions():
+        for cset in objs:
+            # Sanity check that revision data accompanies this changeset;
+            # the raw revision bytes are read as the very next object.
+            assert b'revisionsize' in cset
+            data = next(objs)
+
+            yield (
+                cset[b'node'],
+                cset[b'parents'][0],
+                cset[b'parents'][1],
+                # Linknode is always itself for changesets.
+                cset[b'node'],
+                # We always send full revisions. So delta base is not set.
+                nullid,
+                # Prefix the full text with a trivial diff header so that it
+                # is shaped like a delta, which is what addgroup() consumes.
+                mdiff.trivialdiffheader(len(data)) + data,
+                # Flags not yet supported.
+                0,
+            )
+
+    # NOTE(review): the transaction is passed as a weak proxy, presumably to
+    # avoid the changelog holding a strong reference to it -- confirm.
+    added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
+                        addrevisioncb=onchangeset)
+
+    progress.complete()
+
+    return {
+        'added': added,
+    }
--- a/tests/test-wireproto-exchangev2.t	Wed Sep 12 10:01:16 2018 -0700
+++ b/tests/test-wireproto-exchangev2.t	Wed Sep 12 10:01:36 2018 -0700
@@ -51,3 +51,164 @@
   received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
   received frame(size=1; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
   received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
+  sending 1 commands
+  sending command changesetdata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'noderange': [
+      [],
+      [
+        '\xca\xa2\xa4eE\x1d\xd1\xfa\xcd\xa0\xf5\xb1#\x12\xc3UXA\x88\xa1',
+        '\xcd%4vk\xec\xe18\xc7\xc1\xaf\xdch%0/\x0fb\xd8\x1f'
+      ]
+    ]
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=809; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  add changeset 3390ef850073
+  add changeset 4432d83626e8
+  add changeset cd2534766bec
+  add changeset e96ae20f4188
+  add changeset caa2a465451d
+  updating the branch cache
+  new changesets 3390ef850073:caa2a465451d
+
+All changesets should have been transferred
+
+  $ hg -R client-simple debugindex -c
+     rev linkrev nodeid       p1           p2
+       0       0 3390ef850073 000000000000 000000000000
+       1       1 4432d83626e8 3390ef850073 000000000000
+       2       2 cd2534766bec 4432d83626e8 000000000000
+       3       3 e96ae20f4188 3390ef850073 000000000000
+       4       4 caa2a465451d e96ae20f4188 000000000000
+
+  $ hg -R client-simple log -G -T '{rev} {node} {phase}\n'
+  o  4 caa2a465451dd1facda0f5b12312c355584188a1 public
+  |
+  o  3 e96ae20f4188487b9ae4ef3941c27c81143146e5 public
+  |
+  | o  2 cd2534766bece138c7c1afdc6825302f0f62d81f public
+  | |
+  | o  1 4432d83626e8a98655f062ec1f2a43b07f7fbbb0 public
+  |/
+  o  0 3390ef850073fbc2f0dfff2244342c8e9229013a public
+  
+
+Cloning only a specific revision works
+
+  $ hg --debug clone -U -r 4432d83626e8 http://localhost:$HGPORT client-singlehead
+  using http://localhost:$HGPORT/
+  sending capabilities command
+  sending 1 commands
+  sending command lookup: {
+    'key': '4432d83626e8'
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=21; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  query 1; heads
+  sending 2 commands
+  sending command heads: {}
+  sending command known: {
+    'nodes': []
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=43; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=1; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
+  sending 1 commands
+  sending command changesetdata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'noderange': [
+      [],
+      [
+        'D2\xd86&\xe8\xa9\x86U\xf0b\xec\x1f*C\xb0\x7f\x7f\xbb\xb0'
+      ]
+    ]
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=327; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  add changeset 3390ef850073
+  add changeset 4432d83626e8
+  updating the branch cache
+  new changesets 3390ef850073:4432d83626e8
+
+  $ cd client-singlehead
+
+  $ hg log -G -T '{rev} {node} {phase}\n'
+  o  1 4432d83626e8a98655f062ec1f2a43b07f7fbbb0 public
+  |
+  o  0 3390ef850073fbc2f0dfff2244342c8e9229013a public
+  
+
+Incremental pull works
+
+  $ hg --debug pull
+  pulling from http://localhost:$HGPORT/
+  using http://localhost:$HGPORT/
+  sending capabilities command
+  query 1; heads
+  sending 2 commands
+  sending command heads: {}
+  sending command known: {
+    'nodes': [
+      'D2\xd86&\xe8\xa9\x86U\xf0b\xec\x1f*C\xb0\x7f\x7f\xbb\xb0'
+    ]
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=43; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=2; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
+  searching for changes
+  all local heads known remotely
+  sending 1 commands
+  sending command changesetdata: {
+    'fields': set([
+      'parents',
+      'revision'
+    ]),
+    'noderange': [
+      [
+        'D2\xd86&\xe8\xa9\x86U\xf0b\xec\x1f*C\xb0\x7f\x7f\xbb\xb0'
+      ],
+      [
+        '\xca\xa2\xa4eE\x1d\xd1\xfa\xcd\xa0\xf5\xb1#\x12\xc3UXA\x88\xa1',
+        '\xcd%4vk\xec\xe18\xc7\xc1\xaf\xdch%0/\x0fb\xd8\x1f'
+      ]
+    ]
+  }
+  received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
+  received frame(size=495; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
+  received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
+  add changeset cd2534766bec
+  add changeset e96ae20f4188
+  add changeset caa2a465451d
+  updating the branch cache
+  new changesets cd2534766bec:caa2a465451d
+  (run 'hg update' to get a working copy)
+
+  $ hg log -G -T '{rev} {node} {phase}\n'
+  o  4 caa2a465451dd1facda0f5b12312c355584188a1 public
+  |
+  o  3 e96ae20f4188487b9ae4ef3941c27c81143146e5 public
+  |
+  | o  2 cd2534766bece138c7c1afdc6825302f0f62d81f public
+  | |
+  | o  1 4432d83626e8a98655f062ec1f2a43b07f7fbbb0 public
+  |/
+  o  0 3390ef850073fbc2f0dfff2244342c8e9229013a public
+  
+
+  $ cd ..