comparison mercurial/exchangev2.py @ 39633:ff2de4f2eb3c

exchangev2: fetch and apply phases data

Now that the server supports emitting phases data, we can request it and apply it on the client.

Because we may receive phases-only updates from the server, we no longer conditionally perform the "changesetdata" command depending on whether there are revisions to fetch. In the previous wire protocol, this case would result in us falling back to performing "listkeys" commands to look up phases, bookmarks, and other data. But since "changesetdata" is smart enough to handle metadata-only fetches, we can keep things consistent.

It's worth noting that because of the unified approach to changeset data retrieval, the phase handling code in wire proto v2 exchange is drastically simpler. Contrast this with all the code in exchange.py dealing with the various ways of obtaining phases data.

Differential Revision: https://phab.mercurial-scm.org/D4484
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 12 Sep 2018 10:01:58 -0700
parents b9e453d683a1
children 349482d726ee
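
The description above mentions that "changesetdata" can now return metadata-only entries (for example, a phase update with no revision data). The following stand-alone sketch models how the client-side consumer in the diff below tells the two kinds of entries apart. The literal dicts and byte strings are illustrative stand-ins for the decoded wire objects, not the actual CBOR wire encoding; only the b'node', b'parents', b'phase', and b'revisionsize' keys used by the new code are modeled.

# Illustrative stand-in for a decoded b'changesetdata' response stream.
# Each changeset is described by a map; when b'revisionsize' is present,
# the raw revision bytes follow as the next object in the stream, which is
# the pattern the iterrevisions() generator in the diff relies on.
fakeobjs = iter([
    # Full changeset: metadata map followed by revision data.
    {b'node': b'\xaa' * 20,
     b'parents': [b'\x00' * 20, b'\x00' * 20],
     b'phase': b'public',
     b'revisionsize': 11},
    b'revision 0\n',
    # Metadata-only entry: a phase moved on the server, nothing to store.
    {b'node': b'\xbb' * 20,
     b'parents': [b'\xaa' * 20, b'\x00' * 20],
     b'phase': b'draft'},
])

nodesbyphase = {b'public': set(), b'draft': set(), b'secret': set()}
revisions = []

for cset in fakeobjs:
    node = cset[b'node']

    if b'phase' in cset:
        nodesbyphase[cset[b'phase']].add(node)

    # Entries without revision data are metadata-only updates.
    if b'revisionsize' not in cset:
        continue

    data = next(fakeobjs)
    revisions.append((node, cset[b'parents'][0], cset[b'parents'][1], data))

print(len(revisions), sorted(p for p, nodes in nodesbyphase.items() if nodes))
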
comparison
--- a/mercurial/exchangev2.py	39632:c1aacb0d76ff
+++ b/mercurial/exchangev2.py	39633:ff2de4f2eb3c
@@ -14,10 +14,11 @@
     nullid,
     short,
 )
 from . import (
     mdiff,
+    phases,
     pycompat,
     setdiscovery,
 )
 
 def pull(pullop):
@@ -28,12 +29,29 @@
 
     # Figure out what needs to be fetched.
     common, fetch, remoteheads = _pullchangesetdiscovery(
         repo, remote, pullop.heads, abortwhenunrelated=pullop.force)
 
+    # And fetch the data.
     pullheads = pullop.heads or remoteheads
-    _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
+    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
+
+    # New revisions are written to the changelog. But all other updates
+    # are deferred. Do those now.
+
+    # Ensure all new changesets are draft by default. If the repo is
+    # publishing, the phase will be adjusted by the loop below.
+    if csetres['added']:
+        phases.registernew(repo, tr, phases.draft, csetres['added'])
+
+    # And adjust the phase of all changesets accordingly.
+    for phase in phases.phasenames:
+        if phase == b'secret' or not csetres['nodesbyphase'][phase]:
+            continue
+
+        phases.advanceboundary(repo, tr, phases.phasenames.index(phase),
+                               csetres['nodesbyphase'][phase])
 
 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
     """Determine which changesets need to be pulled."""
 
     if heads:
@@ -63,22 +81,19 @@
     common.discard(nullid)
 
     return common, fetch, remoteheads
 
 def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
-    if not fetch:
-        return
-
     # TODO consider adding a step here where we obtain the DAG shape first
     # (or ask the server to slice changesets into chunks for us) so that
     # we can perform multiple fetches in batches. This will facilitate
     # resuming interrupted clones, higher server-side cache hit rates due
     # to smaller segments, etc.
     with remote.commandexecutor() as e:
         objs = e.callcommand(b'changesetdata', {
             b'noderange': [sorted(common), sorted(remoteheads)],
-            b'fields': {b'parents', b'revision'},
+            b'fields': {b'parents', b'phase', b'revision'},
         }).result()
 
         # The context manager waits on all response data when exiting. So
         # we need to remain in the context manager in order to stream data.
         return _processchangesetdata(repo, tr, objs)
@@ -106,19 +121,32 @@
         return len(cl)
 
     def onchangeset(cl, node):
         progress.increment()
 
+    nodesbyphase = {phase: set() for phase in phases.phasenames}
+
     # addgroup() expects a 7-tuple describing revisions. This normalizes
     # the wire data to that format.
+    #
+    # This loop also aggregates non-revision metadata, such as phase
+    # data.
     def iterrevisions():
         for cset in objs:
-            assert b'revisionsize' in cset
+            node = cset[b'node']
+
+            if b'phase' in cset:
+                nodesbyphase[cset[b'phase']].add(node)
+
+            # Some entries might only be metadata only updates.
+            if b'revisionsize' not in cset:
+                continue
+
             data = next(objs)
 
             yield (
-                cset[b'node'],
+                node,
                 cset[b'parents'][0],
                 cset[b'parents'][1],
                 # Linknode is always itself for changesets.
                 cset[b'node'],
                 # We always send full revisions. So delta base is not set.
@@ -133,6 +161,7 @@
 
     progress.complete()
 
     return {
         'added': added,
+        'nodesbyphase': nodesbyphase,
     }
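
For completeness, here is a small, runnable sketch of how the phase buckets returned by _fetchchangesets() drive the new phase-application loop in pull(). The registernew() and advanceboundary() functions below are hypothetical stand-ins that only record the calls the real phases.registernew() and phases.advanceboundary() would receive; the phase-name ordering mirrors phases.phasenames as used in the diff.

# Hypothetical stand-ins that record calls instead of touching a repo.
phasenames = [b'public', b'draft', b'secret']   # mirrors the phases.phasenames ordering
calls = []

def registernew(targetphase, nodes):
    calls.append(('registernew', targetphase, sorted(nodes)))

def advanceboundary(targetphase, nodes):
    calls.append(('advanceboundary', targetphase, sorted(nodes)))

# Example result in the shape _fetchchangesets() now returns: one new
# changeset the server reports as public, plus a metadata-only draft node.
csetres = {
    'added': [b'\xaa' * 20],
    'nodesbyphase': {
        b'public': {b'\xaa' * 20},
        b'draft': {b'\xbb' * 20},
        b'secret': set(),
    },
}

# New changesets start out as draft...
if csetres['added']:
    registernew(phasenames.index(b'draft'), csetres['added'])

# ...then the boundary is advanced for every non-secret phase reported.
for phase in phasenames:
    if phase == b'secret' or not csetres['nodesbyphase'][phase]:
        continue
    advanceboundary(phasenames.index(phase), csetres['nodesbyphase'][phase])

for call in calls:
    print(call)
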