exchangev2: use filesdata
filesdata is a more efficient mechanism for bulk fetching files data for a
range of changesets. Let's use it in exchangev2.
With this change, a client performing a full clone of mozilla-unified
transmits substantially fewer bytes across the wire:
before: 139,124,863 bytes sent
after: 20,522,499 bytes sent
The bulk of the remaining bytes is likely the transfer of ~1M nodes for
changesets and manifests. We can eliminate this by making requests in
terms of node ranges instead of explicit node lists...
Differential Revision: https://phab.mercurial-scm.org/D4982
--- a/mercurial/exchangev2.py Wed Oct 03 12:54:39 2018 -0700
+++ b/mercurial/exchangev2.py Wed Oct 03 13:57:42 2018 -0700
@@ -64,7 +64,8 @@
# Find all file nodes referenced by added manifests and fetch those
# revisions.
fnodes = _derivefilesfrommanifests(repo, manres['added'])
- _fetchfiles(repo, tr, remote, fnodes, manres['linkrevs'])
+ _fetchfilesfromcsets(repo, tr, remote, fnodes, csetres['added'],
+ manres['linkrevs'])
def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
"""Determine which changesets need to be pulled."""
@@ -346,6 +347,7 @@
return fnodes
def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
+ """Fetch file data from explicit file revisions."""
def iterrevisions(objs, progress):
for filerevision in objs:
node = filerevision[b'node']
@@ -418,3 +420,84 @@
iterrevisions(objs, progress),
locallinkrevs[path].__getitem__,
weakref.proxy(tr))
+
+def _fetchfilesfromcsets(repo, tr, remote, fnodes, csets, manlinkrevs):
+ """Fetch file data from explicit changeset revisions."""
+
+ def iterrevisions(objs, remaining, progress):
+ while remaining:
+ filerevision = next(objs)
+
+ node = filerevision[b'node']
+
+ extrafields = {}
+
+ for field, size in filerevision.get(b'fieldsfollowing', []):
+ extrafields[field] = next(objs)
+
+ if b'delta' in extrafields:
+ basenode = filerevision[b'deltabasenode']
+ delta = extrafields[b'delta']
+ elif b'revision' in extrafields:
+ basenode = nullid
+ revision = extrafields[b'revision']
+ delta = mdiff.trivialdiffheader(len(revision)) + revision
+ else:
+ continue
+
+ yield (
+ node,
+ filerevision[b'parents'][0],
+ filerevision[b'parents'][1],
+ node,
+ basenode,
+ delta,
+ # Flags not yet supported.
+ 0,
+ )
+
+ progress.increment()
+ remaining -= 1
+
+ progress = repo.ui.makeprogress(
+ _('files'), unit=_('chunks'),
+ total=sum(len(v) for v in fnodes.itervalues()))
+
+ commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
+ batchsize = commandmeta.get(b'recommendedbatchsize', 50000)
+
+ for i in pycompat.xrange(0, len(csets), batchsize):
+ batch = [x for x in csets[i:i + batchsize]]
+ if not batch:
+ continue
+
+ with remote.commandexecutor() as e:
+ args = {
+ b'revisions': [{
+ b'type': b'changesetexplicit',
+ b'nodes': batch,
+ }],
+ b'fields': {b'parents', b'revision'},
+ b'haveparents': True,
+ }
+
+ objs = e.callcommand(b'filesdata', args).result()
+
+ # First object is an overall header.
+ overall = next(objs)
+
+ # We have overall['totalpaths'] segments.
+ for i in pycompat.xrange(overall[b'totalpaths']):
+ header = next(objs)
+
+ path = header[b'path']
+ store = repo.file(path)
+
+ linkrevs = {
+ fnode: manlinkrevs[mnode]
+ for fnode, mnode in fnodes[path].iteritems()}
+
+ store.addgroup(iterrevisions(objs, header[b'totalitems'],
+ progress),
+ linkrevs.__getitem__,
+ weakref.proxy(tr))
--- a/tests/test-wireproto-exchangev2.t Wed Oct 03 12:54:39 2018 -0700
+++ b/tests/test-wireproto-exchangev2.t Wed Oct 03 13:57:42 2018 -0700
@@ -101,40 +101,30 @@
received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
received frame(size=992; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
- sending 2 commands
- sending command filedata: {
+ sending 1 commands
+ sending command filesdata: {
'fields': set([
'parents',
'revision'
]),
'haveparents': True,
- 'nodes': [
- '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
- '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc',
- '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda'
- ],
- 'path': 'a'
- }
- sending command filedata: {
- 'fields': set([
- 'parents',
- 'revision'
- ]),
- 'haveparents': True,
- 'nodes': [
- '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16',
- '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx',
- '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3'
- ],
- 'path': 'b'
+ 'revisions': [
+ {
+ 'nodes': [
+ '3\x90\xef\x85\x00s\xfb\xc2\xf0\xdf\xff"D4,\x8e\x92)\x01:',
+ 'D2\xd86&\xe8\xa9\x86U\xf0b\xec\x1f*C\xb0\x7f\x7f\xbb\xb0',
+ '\xcd%4vk\xec\xe18\xc7\xc1\xaf\xdch%0/\x0fb\xd8\x1f',
+ '\xe9j\xe2\x0fA\x88H{\x9a\xe4\xef9A\xc2|\x81\x141F\xe5',
+ '\xca\xa2\xa4eE\x1d\xd1\xfa\xcd\xa0\xf5\xb1#\x12\xc3UXA\x88\xa1'
+ ],
+ 'type': 'changesetexplicit'
+ }
+ ]
}
received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
- received frame(size=431; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+ received frame(size=901; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
- received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
- received frame(size=431; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
- received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
updating the branch cache
new changesets 3390ef850073:caa2a465451d (3 drafts)
(sent 5 HTTP requests and * bytes; received * bytes in responses) (glob)
@@ -239,37 +229,27 @@
received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
received frame(size=404; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
- sending 2 commands
- sending command filedata: {
+ sending 1 commands
+ sending command filesdata: {
'fields': set([
'parents',
'revision'
]),
'haveparents': True,
- 'nodes': [
- '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
- '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc'
- ],
- 'path': 'a'
- }
- sending command filedata: {
- 'fields': set([
- 'parents',
- 'revision'
- ]),
- 'haveparents': True,
- 'nodes': [
- '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16'
- ],
- 'path': 'b'
+ 'revisions': [
+ {
+ 'nodes': [
+ '3\x90\xef\x85\x00s\xfb\xc2\xf0\xdf\xff"D4,\x8e\x92)\x01:',
+ 'D2\xd86&\xe8\xa9\x86U\xf0b\xec\x1f*C\xb0\x7f\x7f\xbb\xb0'
+ ],
+ 'type': 'changesetexplicit'
+ }
+ ]
}
received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
- received frame(size=277; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+ received frame(size=439; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
- received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
- received frame(size=123; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
- received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
updating the branch cache
new changesets 3390ef850073:4432d83626e8
(sent 6 HTTP requests and * bytes; received * bytes in responses) (glob)
@@ -357,39 +337,28 @@
received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
received frame(size=601; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
- sending 2 commands
- sending command filedata: {
+ sending 1 commands
+ sending command filesdata: {
'fields': set([
'parents',
'revision'
]),
'haveparents': True,
- 'nodes': [
- '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
- '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda'
- ],
- 'path': 'a'
- }
- sending command filedata: {
- 'fields': set([
- 'parents',
- 'revision'
- ]),
- 'haveparents': True,
- 'nodes': [
- '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16',
- '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx',
- '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3'
- ],
- 'path': 'b'
+ 'revisions': [
+ {
+ 'nodes': [
+ '\xcd%4vk\xec\xe18\xc7\xc1\xaf\xdch%0/\x0fb\xd8\x1f',
+ '\xe9j\xe2\x0fA\x88H{\x9a\xe4\xef9A\xc2|\x81\x141F\xe5',
+ '\xca\xa2\xa4eE\x1d\xd1\xfa\xcd\xa0\xf5\xb1#\x12\xc3UXA\x88\xa1'
+ ],
+ 'type': 'changesetexplicit'
+ }
+ ]
}
received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
- received frame(size=277; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+ received frame(size=527; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
- received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
- received frame(size=431; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
- received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
updating the branch cache
new changesets cd2534766bec:caa2a465451d (3 drafts)
(run 'hg update' to get a working copy)
@@ -557,40 +526,30 @@
received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
received frame(size=992; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
- sending 2 commands
- sending command filedata: {
+ sending 1 commands
+ sending command filesdata: {
'fields': set([
'parents',
'revision'
]),
'haveparents': True,
- 'nodes': [
- '+N\xb0s\x19\xbf\xa0w\xa4\n/\x04\x916Y\xae\xf0\xdaB\xda',
- '\x9a8\x12)\x97\xb3\xac\x97\xbe*\x9a\xa2\xe5V\x83\x83A\xfd\xf2\xcc',
- '\xc2\xa2\x05\xc8\xb2\xad\xe2J\xf2`b\xe5<\xd5\xbc8\x01\xd6`\xda'
- ],
- 'path': 'a'
- }
- sending command filedata: {
- 'fields': set([
- 'parents',
- 'revision'
- ]),
- 'haveparents': True,
- 'nodes': [
- '\x81\x9e%\x8d1\xa5\xe1`f)\xf3e\xbb\x90*\x1b!\xeeB\x16',
- '\xb1zk\xd3g=\x9a\xb8\xce\xd5\x81\xa2\t\xf6/=\xa5\xccEx',
- '\xc5\xb1\xf9\xd3n\x1c\xc18\xbf\xb6\xef\xb3\xde\xb7]\x8c\xcad\x94\xc3'
- ],
- 'path': 'b'
+ 'revisions': [
+ {
+ 'nodes': [
+ '3\x90\xef\x85\x00s\xfb\xc2\xf0\xdf\xff"D4,\x8e\x92)\x01:',
+ 'D2\xd86&\xe8\xa9\x86U\xf0b\xec\x1f*C\xb0\x7f\x7f\xbb\xb0',
+ '\xcd%4vk\xec\xe18\xc7\xc1\xaf\xdch%0/\x0fb\xd8\x1f',
+ '\xe9j\xe2\x0fA\x88H{\x9a\xe4\xef9A\xc2|\x81\x141F\xe5',
+ '\xca\xa2\xa4eE\x1d\xd1\xfa\xcd\xa0\xf5\xb1#\x12\xc3UXA\x88\xa1'
+ ],
+ 'type': 'changesetexplicit'
+ }
+ ]
}
received frame(size=9; request=1; stream=2; streamflags=stream-begin; type=stream-settings; flags=eos)
received frame(size=11; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
- received frame(size=431; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
+ received frame(size=901; request=1; stream=2; streamflags=encoded; type=command-response; flags=continuation)
received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
- received frame(size=11; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
- received frame(size=431; request=3; stream=2; streamflags=encoded; type=command-response; flags=continuation)
- received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
updating the branch cache
new changesets 3390ef850073:caa2a465451d (1 drafts)
(sent 5 HTTP requests and * bytes; received * bytes in responses) (glob)