exchangev2: use filesdata
author: Gregory Szorc <gregory.szorc@gmail.com>
Wed, 03 Oct 2018 13:57:42 -0700
changeset 40179 b843356d4ae1
parent 40178 46a40bce3ae0
child 40180 ba70e3acf58a
exchangev2: use filesdata

filesdata is a more efficient mechanism for bulk fetching files data for a range of changesets. Let's use it in exchangev2.

With this change, a client performing a full clone of mozilla-unified transmits substantially fewer bytes across the wire:

before: 139,124,863 bytes sent
after: 20,522,499 bytes sent

The bulk of the remaining bytes is likely the transfer of ~1M nodes for changesets and manifests. We can eliminate this by making requests in terms of node ranges instead of explicit node lists...

Differential Revision: https://phab.mercurial-scm.org/D4982
mercurial/exchangev2.py
tests/test-wireproto-exchangev2.t
--- a/mercurial/exchangev2.py	Wed Oct 03 12:54:39 2018 -0700
+++ b/mercurial/exchangev2.py	Wed Oct 03 13:57:42 2018 -0700
@@ -64,7 +64,8 @@
     # Find all file nodes referenced by added manifests and fetch those
     # revisions.
     fnodes = _derivefilesfrommanifests(repo, manres['added'])
-    _fetchfiles(repo, tr, remote, fnodes, manres['linkrevs'])
+    _fetchfilesfromcsets(repo, tr, remote, fnodes, csetres['added'],
+                         manres['linkrevs'])
 
 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
     """Determine which changesets need to be pulled."""
@@ -346,6 +347,7 @@
     return fnodes
 
 def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
+    """Fetch file data from explicit file revisions."""
     def iterrevisions(objs, progress):
         for filerevision in objs:
             node = filerevision[b'node']
@@ -418,3 +420,84 @@
                     iterrevisions(objs, progress),
                     locallinkrevs[path].__getitem__,
                     weakref.proxy(tr))
+
+def _fetchfilesfromcsets(repo, tr, remote, fnodes, csets, manlinkrevs):
+    """Fetch file data from explicit changeset revisions."""
+
+    def iterrevisions(objs, remaining, progress):
+        while remaining:
+            filerevision = next(objs)
+
+            node = filerevision[b'node']
+
+            extrafields = {}
+
+            for field, size in filerevision.get(b'fieldsfollowing', []):
+                extrafields[field] = next(objs)
+
+            if b'delta' in extrafields:
+                basenode = filerevision[b'deltabasenode']
+                delta = extrafields[b'delta']
+            elif b'revision' in extrafields:
+                basenode = nullid
+                revision = extrafields[b'revision']
+                delta = mdiff.trivialdiffheader(len(revision)) + revision
+            else:
+                continue
+
+            yield (
+                node,
+                filerevision[b'parents'][0],
+                filerevision[b'parents'][1],
+                node,
+                basenode,
+                delta,
+                # Flags not yet supported.
+                0,
+            )
+
+            progress.increment()
+            remaining -= 1
+
+    progress = repo.ui.makeprogress(
+        _('files'), unit=_('chunks'),
+        total=sum(len(v) for v in fnodes.itervalues()))
+
+    commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
+    batchsize = commandmeta.get(b'recommendedbatchsize', 50000)
+
+    for i in pycompat.xrange(0, len(csets), batchsize):
+        batch = [x for x in csets[i:i + batchsize]]
+        if not batch:
+            continue
+
+        with remote.commandexecutor() as e:
+            args = {
+                b'revisions': [{
+                    b'type': b'changesetexplicit',
+                    b'nodes': batch,
+                }],
+                b'fields': {b'parents', b'revision'},
+                b'haveparents': True,
+            }
+
+            objs = e.callcommand(b'filesdata', args).result()
+
+            # First object is an overall header.
+            overall = next(objs)
+
+            # We have overall['totalpaths'] segments.
+            for i in pycompat.xrange(overall[b'totalpaths']):
+                header = next(objs)
+
+                path = header[b'path']
+                store = repo.file(path)
+
+                linkrevs = {
+                    fnode: manlinkrevs[mnode]
+                    for fnode, mnode in fnodes[path].iteritems()}
+
+                store.addgroup(iterrevisions(objs, header[b'totalitems'],
+                                             progress),
+                               linkrevs.__getitem__,
+                               weakref.proxy(tr))
--- a/tests/test-wireproto-exchangev2.t	Wed Oct 03 12:54:39 2018 -0700
+++ b/tests/test-wireproto-exchangev2.t	Wed Oct 03 13:57:42 2018 -0700
@@ -101,40 +101,30 @@
   received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
   received frame(size=992; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  sending 2 commands
-  sending command filedata: {
+  sending 1 commands
+  sending command filesdata: {
     'fields': set([
       'parents',
       'revision'
     ]),
     'haveparents': True,
-    'nodes': [
-      '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
-      '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc',
-      '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda'
-    ],
-    'path': 'a'
-  }
-  sending command filedata: {
-    'fields': set([
-      'parents',
-      'revision'
-    ]),
-    'haveparents': True,
-    'nodes': [
-      '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16',
-      '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx',
-      '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3'
-    ],
-    'path': 'b'
+    'revisions': [
+      {
+        'nodes': [
+          '3\x90\xef\x85\x00s\xfb\xc2\xf0\xdf\xff"D4,\x8e\x92)\x01:',
+          'D2\xd86&\xe8\xa9\x86U\xf0b\xec\x1f*C\xb0\x7f\x7f\xbb\xb0',
+          '\xcd%4vk\xec\xe18\xc7\xc1\xaf\xdch%0/\x0fb\xd8\x1f',
+          '\xe9j\xe2\x0fA\x88H{\x9a\xe4\xef9A\xc2|\x81\x141F\xe5',
+          '\xca\xa2\xa4eE\x1d\xd1\xfa\xcd\xa0\xf5\xb1#\x12\xc3UXA\x88\xa1'
+        ],
+        'type': 'changesetexplicit'
+      }
+    ]
   }
   received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
   received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
-  received frame(size=431; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=901; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
-  received frame(size=431; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
-  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
   updating the branch cache
   new changesets 3390ef850073:caa2a465451d (3 drafts)
   (sent 5 HTTP requests and * bytes; received * bytes in responses) (glob)
@@ -239,37 +229,27 @@
   received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
   received frame(size=404; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  sending 2 commands
-  sending command filedata: {
+  sending 1 commands
+  sending command filesdata: {
     'fields': set([
       'parents',
       'revision'
     ]),
     'haveparents': True,
-    'nodes': [
-      '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
-      '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc'
-    ],
-    'path': 'a'
-  }
-  sending command filedata: {
-    'fields': set([
-      'parents',
-      'revision'
-    ]),
-    'haveparents': True,
-    'nodes': [
-      '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16'
-    ],
-    'path': 'b'
+    'revisions': [
+      {
+        'nodes': [
+          '3\x90\xef\x85\x00s\xfb\xc2\xf0\xdf\xff"D4,\x8e\x92)\x01:',
+          'D2\xd86&\xe8\xa9\x86U\xf0b\xec\x1f*C\xb0\x7f\x7f\xbb\xb0'
+        ],
+        'type': 'changesetexplicit'
+      }
+    ]
   }
   received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
   received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
-  received frame(size=277; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=439; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
-  received frame(size=123; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
-  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
   updating the branch cache
   new changesets 3390ef850073:4432d83626e8
   (sent 6 HTTP requests and * bytes; received * bytes in responses) (glob)
@@ -357,39 +337,28 @@
   received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
   received frame(size=601; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  sending 2 commands
-  sending command filedata: {
+  sending 1 commands
+  sending command filesdata: {
     'fields': set([
       'parents',
       'revision'
     ]),
     'haveparents': True,
-    'nodes': [
-      '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
-      '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda'
-    ],
-    'path': 'a'
-  }
-  sending command filedata: {
-    'fields': set([
-      'parents',
-      'revision'
-    ]),
-    'haveparents': True,
-    'nodes': [
-      '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16',
-      '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx',
-      '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3'
-    ],
-    'path': 'b'
+    'revisions': [
+      {
+        'nodes': [
+          '\xcd%4vk\xec\xe18\xc7\xc1\xaf\xdch%0/\x0fb\xd8\x1f',
+          '\xe9j\xe2\x0fA\x88H{\x9a\xe4\xef9A\xc2|\x81\x141F\xe5',
+          '\xca\xa2\xa4eE\x1d\xd1\xfa\xcd\xa0\xf5\xb1#\x12\xc3UXA\x88\xa1'
+        ],
+        'type': 'changesetexplicit'
+      }
+    ]
   }
   received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
   received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
-  received frame(size=277; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=527; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
-  received frame(size=431; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
-  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
   updating the branch cache
   new changesets cd2534766bec:caa2a465451d (3 drafts)
   (run 'hg update' to get a working copy)
@@ -557,40 +526,30 @@
   received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
   received frame(size=992; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  sending 2 commands
-  sending command filedata: {
+  sending 1 commands
+  sending command filesdata: {
     'fields': set([
       'parents',
       'revision'
     ]),
     'haveparents': True,
-    'nodes': [
-      '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
-      '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc',
-      '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda'
-    ],
-    'path': 'a'
-  }
-  sending command filedata: {
-    'fields': set([
-      'parents',
-      'revision'
-    ]),
-    'haveparents': True,
-    'nodes': [
-      '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16',
-      '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx',
-      '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3'
-    ],
-    'path': 'b'
+    'revisions': [
+      {
+        'nodes': [
+          '3\x90\xef\x85\x00s\xfb\xc2\xf0\xdf\xff"D4,\x8e\x92)\x01:',
+          'D2\xd86&\xe8\xa9\x86U\xf0b\xec\x1f*C\xb0\x7f\x7f\xbb\xb0',
+          '\xcd%4vk\xec\xe18\xc7\xc1\xaf\xdch%0/\x0fb\xd8\x1f',
+          '\xe9j\xe2\x0fA\x88H{\x9a\xe4\xef9A\xc2|\x81\x141F\xe5',
+          '\xca\xa2\xa4eE\x1d\xd1\xfa\xcd\xa0\xf5\xb1#\x12\xc3UXA\x88\xa1'
+        ],
+        'type': 'changesetexplicit'
+      }
+    ]
   }
   received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
   received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
-  received frame(size=431; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+  received frame(size=901; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
   received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
-  received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
-  received frame(size=431; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
-  received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
   updating the branch cache
   new changesets 3390ef850073:caa2a465451d (1 drafts)
   (sent 5 HTTP requests and * bytes; received * bytes in responses) (glob)