Mercurial > hg
comparison mercurial/exchangev2.py @ 39638:d292328e0143
exchangev2: fetch manifest revisions
Now that the server has support for retrieving manifest data, we can
implement the client bits to call it.
We teach the changeset fetching code to capture the manifest revisions
that are encountered on incoming changesets. We then feed this into a
new function which filters out known manifests and then batches up
manifest data requests to the server.
This is different from the previous wire protocol in a few notable
ways.
First, the client fetches manifest data separately and explicitly.
Before, we'd ask the server for data pertaining to some changesets
(via a "getbundle" command) and manifests (and files) would be sent
automatically. Providing an API for looking up just manifest data
separately gives clients much more flexibility for manifest management.
For example, a client may choose to only fetch manifest data on demand
instead of prefetching it (i.e. partial clone).
Second, we send N commands to the server for manifest retrieval instead
of 1. This property has a few nice side-effects. One is that the
deterministic nature of the requests lends itself to server-side
caching. For example, say the remote has 50,000 manifests. If the
server is configured to cache responses, each time a new commit
arrives, you will have a cache miss and need to regenerate all outgoing
data. But if you makes N requests requesting 10,000 manifests each,
a new commit will still yield cache hits on the initial, unchanged
manifest batches/requests.
A derived benefit from these properties is that resumable clone is
conceptually simpler to implement. When making a monolithic request
for all of the repository data, recovering from an interrupted clone
is hard because the server was in the driver's seat and was maintaining
state about all the data that needed transferred. With the client
driving fetching, the client can persist the set of unfetched entities
and retry/resume a fetch if something goes wrong. Or we can fetch all
data N changesets at a time and slowly build up a repository. This
approach is drastically easier to implement when we have server APIs
exposing low-level repository primitives (such as manifests and files).
We don't yet support tree manifests. But it should be possible to
implement that with the existing wire protocol command.
Differential Revision: https://phab.mercurial-scm.org/D4489
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 05 Sep 2018 09:09:57 -0700 |
parents | 399ddd3227a4 |
children | 039bf1eddc2e |
comparison
equal
deleted
inserted
replaced
39637:c7a7c7e844e5 | 39638:d292328e0143 |
---|---|
14 nullid, | 14 nullid, |
15 short, | 15 short, |
16 ) | 16 ) |
17 from . import ( | 17 from . import ( |
18 bookmarks, | 18 bookmarks, |
19 error, | |
19 mdiff, | 20 mdiff, |
20 phases, | 21 phases, |
21 pycompat, | 22 pycompat, |
22 setdiscovery, | 23 setdiscovery, |
23 ) | 24 ) |
54 | 55 |
55 # Write bookmark updates. | 56 # Write bookmark updates. |
56 bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'], | 57 bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'], |
57 remote.url(), pullop.gettransaction, | 58 remote.url(), pullop.gettransaction, |
58 explicit=pullop.explicitbookmarks) | 59 explicit=pullop.explicitbookmarks) |
60 | |
61 _fetchmanifests(repo, tr, remote, csetres['manifestnodes']) | |
59 | 62 |
60 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True): | 63 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True): |
61 """Determine which changesets need to be pulled.""" | 64 """Determine which changesets need to be pulled.""" |
62 | 65 |
63 if heads: | 66 if heads: |
119 | 122 |
120 progress = repo.ui.makeprogress(_('changesets'), | 123 progress = repo.ui.makeprogress(_('changesets'), |
121 unit=_('chunks'), | 124 unit=_('chunks'), |
122 total=meta.get(b'totalitems')) | 125 total=meta.get(b'totalitems')) |
123 | 126 |
127 manifestnodes = {} | |
128 | |
124 def linkrev(node): | 129 def linkrev(node): |
125 repo.ui.debug('add changeset %s\n' % short(node)) | 130 repo.ui.debug('add changeset %s\n' % short(node)) |
126 # Linkrev for changelog is always self. | 131 # Linkrev for changelog is always self. |
127 return len(cl) | 132 return len(cl) |
128 | 133 |
129 def onchangeset(cl, node): | 134 def onchangeset(cl, node): |
130 progress.increment() | 135 progress.increment() |
136 | |
137 revision = cl.changelogrevision(node) | |
138 | |
139 # We need to preserve the mapping of changelog revision to node | |
140 # so we can set the linkrev accordingly when manifests are added. | |
141 manifestnodes[cl.rev(node)] = revision.manifest | |
131 | 142 |
132 nodesbyphase = {phase: set() for phase in phases.phasenames} | 143 nodesbyphase = {phase: set() for phase in phases.phasenames} |
133 remotebookmarks = {} | 144 remotebookmarks = {} |
134 | 145 |
135 # addgroup() expects a 7-tuple describing revisions. This normalizes | 146 # addgroup() expects a 7-tuple describing revisions. This normalizes |
176 | 187 |
177 return { | 188 return { |
178 'added': added, | 189 'added': added, |
179 'nodesbyphase': nodesbyphase, | 190 'nodesbyphase': nodesbyphase, |
180 'bookmarks': remotebookmarks, | 191 'bookmarks': remotebookmarks, |
192 'manifestnodes': manifestnodes, | |
181 } | 193 } |
194 | |
195 def _fetchmanifests(repo, tr, remote, manifestnodes): | |
196 rootmanifest = repo.manifestlog.getstorage(b'') | |
197 | |
198 # Some manifests can be shared between changesets. Filter out revisions | |
199 # we already know about. | |
200 fetchnodes = [] | |
201 linkrevs = {} | |
202 seen = set() | |
203 | |
204 for clrev, node in sorted(manifestnodes.iteritems()): | |
205 if node in seen: | |
206 continue | |
207 | |
208 try: | |
209 rootmanifest.rev(node) | |
210 except error.LookupError: | |
211 fetchnodes.append(node) | |
212 linkrevs[node] = clrev | |
213 | |
214 seen.add(node) | |
215 | |
216 # TODO handle tree manifests | |
217 | |
218 # addgroup() expects 7-tuple describing revisions. This normalizes | |
219 # the wire data to that format. | |
220 def iterrevisions(objs, progress): | |
221 for manifest in objs: | |
222 node = manifest[b'node'] | |
223 | |
224 if b'deltasize' in manifest: | |
225 basenode = manifest[b'deltabasenode'] | |
226 delta = next(objs) | |
227 elif b'revisionsize' in manifest: | |
228 basenode = nullid | |
229 revision = next(objs) | |
230 delta = mdiff.trivialdiffheader(len(revision)) + revision | |
231 else: | |
232 continue | |
233 | |
234 yield ( | |
235 node, | |
236 manifest[b'parents'][0], | |
237 manifest[b'parents'][1], | |
238 # The value passed in is passed to the lookup function passed | |
239 # to addgroup(). We already have a map of manifest node to | |
240 # changelog revision number. So we just pass in the | |
241 # manifest node here and use linkrevs.__getitem__ as the | |
242 # resolution function. | |
243 node, | |
244 basenode, | |
245 delta, | |
246 # Flags not yet supported. | |
247 0 | |
248 ) | |
249 | |
250 progress.increment() | |
251 | |
252 progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'), | |
253 total=len(fetchnodes)) | |
254 | |
255 # Fetch manifests 10,000 per command. | |
256 # TODO have server advertise preferences? | |
257 # TODO make size configurable on client? | |
258 batchsize = 10000 | |
259 | |
260 # We send commands 1 at a time to the remote. This is not the most | |
261 # efficient because we incur a round trip at the end of each batch. | |
262 # However, the existing frame-based reactor keeps consuming server | |
263 # data in the background. And this results in response data buffering | |
264 # in memory. This can consume gigabytes of memory. | |
265 # TODO send multiple commands in a request once background buffering | |
266 # issues are resolved. | |
267 | |
268 added = [] | |
269 | |
270 for i in pycompat.xrange(0, len(fetchnodes), batchsize): | |
271 batch = [node for node in fetchnodes[i:i + batchsize]] | |
272 if not batch: | |
273 continue | |
274 | |
275 with remote.commandexecutor() as e: | |
276 objs = e.callcommand(b'manifestdata', { | |
277 b'tree': b'', | |
278 b'nodes': batch, | |
279 b'fields': {b'parents', b'revision'}, | |
280 }).result() | |
281 | |
282 # Chomp off header object. | |
283 next(objs) | |
284 | |
285 added.extend(rootmanifest.addgroup( | |
286 iterrevisions(objs, progress), | |
287 linkrevs.__getitem__, | |
288 weakref.proxy(tr))) | |
289 | |
290 progress.complete() | |
291 | |
292 return { | |
293 'added': added, | |
294 } |