1 # exchangev2.py - repository exchange for wire protocol version 2 |
|
2 # |
|
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com> |
|
4 # |
|
5 # This software may be used and distributed according to the terms of the |
|
6 # GNU General Public License version 2 or any later version. |
|
7 |
|
8 from __future__ import absolute_import |
|
9 |
|
10 import collections |
|
11 import weakref |
|
12 |
|
13 from .i18n import _ |
|
14 from .node import short |
|
15 from . import ( |
|
16 bookmarks, |
|
17 error, |
|
18 mdiff, |
|
19 narrowspec, |
|
20 phases, |
|
21 pycompat, |
|
22 requirements as requirementsmod, |
|
23 setdiscovery, |
|
24 ) |
|
25 from .interfaces import repository |
|
26 |
|
27 |
|
def pull(pullop):
    """Pull using wire protocol version 2.

    ``pullop`` is a ``pulloperation`` carrying the local repo, the remote
    peer, requested heads, narrow include/exclude patterns, optional depth
    limiting, and bookmark options. Changesets, phases, bookmarks,
    manifests, and file data are fetched in that order.
    """
    repo = pullop.repo
    remote = pullop.remote

    usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)

    # If this is a clone and it was requested to perform a "stream clone",
    # we obtain the raw files data from the remote then fall back to an
    # incremental pull. This is somewhat hacky and is not nearly robust enough
    # for long-term usage.
    if usingrawchangelogandmanifest:
        with repo.transaction(b'clone'):
            _fetchrawstorefiles(repo, remote)
            # Drop cached state so the freshly written store files are seen.
            repo.invalidate(clearfilecache=True)

    tr = pullop.trmanager.transaction()

    # We don't use the repo's narrow matcher here because the patterns passed
    # to exchange.pull() could be different.
    narrowmatcher = narrowspec.match(
        repo.root,
        # Empty maps to nevermatcher. So always
        # set includes if missing.
        pullop.includepats or {b'path:.'},
        pullop.excludepats,
    )

    # Wire-level path filter sent to the server with file data requests.
    if pullop.includepats or pullop.excludepats:
        pathfilter = {}
        if pullop.includepats:
            pathfilter[b'include'] = sorted(pullop.includepats)
        if pullop.excludepats:
            pathfilter[b'exclude'] = sorted(pullop.excludepats)
    else:
        pathfilter = None

    # Figure out what needs to be fetched.
    common, fetch, remoteheads = _pullchangesetdiscovery(
        repo, remote, pullop.heads, abortwhenunrelated=pullop.force
    )

    # And fetch the data.
    pullheads = pullop.heads or remoteheads
    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)

    # New revisions are written to the changelog. But all other updates
    # are deferred. Do those now.

    # Ensure all new changesets are draft by default. If the repo is
    # publishing, the phase will be adjusted by the loop below.
    if csetres[b'added']:
        phases.registernew(
            repo, tr, phases.draft, [repo[n].rev() for n in csetres[b'added']]
        )

    # And adjust the phase of all changesets accordingly.
    for phasenumber, phase in phases.phasenames.items():
        # Secret phase is never exchanged; also skip phases with no nodes.
        if phase == b'secret' or not csetres[b'nodesbyphase'][phase]:
            continue

        phases.advanceboundary(
            repo,
            tr,
            phasenumber,
            csetres[b'nodesbyphase'][phase],
        )

    # Write bookmark updates.
    bookmarks.updatefromremote(
        repo.ui,
        repo,
        csetres[b'bookmarks'],
        remote.url(),
        pullop.gettransaction,
        explicit=pullop.explicitbookmarks,
    )

    manres = _fetchmanifests(repo, tr, remote, csetres[b'manifestnodes'])

    # We don't properly support shallow changeset and manifest yet. So we apply
    # depth limiting locally.
    if pullop.depth:
        relevantcsetnodes = set()
        clnode = repo.changelog.node

        for rev in repo.revs(
            b'ancestors(%ln, %s)', pullheads, pullop.depth - 1
        ):
            relevantcsetnodes.add(clnode(rev))

        csetrelevantfilter = lambda n: n in relevantcsetnodes

    else:
        csetrelevantfilter = lambda n: True

    # If obtaining the raw store files, we need to scan the full repo to
    # derive all the changesets, manifests, and linkrevs.
    if usingrawchangelogandmanifest:
        csetsforfiles = []
        mnodesforfiles = []
        manifestlinkrevs = {}

        for rev in repo:
            ctx = repo[rev]
            node = ctx.node()

            if not csetrelevantfilter(node):
                continue

            mnode = ctx.manifestnode()

            csetsforfiles.append(node)
            mnodesforfiles.append(mnode)
            manifestlinkrevs[mnode] = rev

    else:
        csetsforfiles = [n for n in csetres[b'added'] if csetrelevantfilter(n)]
        mnodesforfiles = manres[b'added']
        manifestlinkrevs = manres[b'linkrevs']

    # Find all file nodes referenced by added manifests and fetch those
    # revisions.
    fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
    _fetchfilesfromcsets(
        repo,
        tr,
        remote,
        pathfilter,
        fnodes,
        csetsforfiles,
        manifestlinkrevs,
        shallow=bool(pullop.depth),
    )
|
162 |
|
163 |
|
def _checkuserawstorefiledata(pullop):
    """Check whether we should use rawstorefiledata command to retrieve data."""

    repo = pullop.repo
    remote = pullop.remote

    # All of the following must hold for the raw file data path to be
    # usable: the remote exposes the command, the caller explicitly asked
    # for a stream clone, the local repo is empty, and local storage is
    # revlog-based so the raw files can be dropped in directly.
    commandavailable = b'rawstorefiledata' in remote.apidescriptor[b'commands']
    streamrequested = bool(pullop.streamclonerequested)
    repoisempty = len(repo) == 0
    # TODO This is super hacky. There needs to be a storage API for this. We
    # also need to check for compatibility with the remote.
    revlogstorage = requirementsmod.REVLOGV1_REQUIREMENT in repo.requirements

    return (
        commandavailable and streamrequested and repoisempty and revlogstorage
    )
|
188 |
|
189 |
|
def _fetchrawstorefiles(repo, remote):
    """Download raw store files (changelog/manifestlog) from ``remote``.

    Issues the ``rawstorefiledata`` command and writes each received file
    into the repo's store vfs, validating the byte counts advertised by
    the server. Raises ``error.Abort`` on malformed or truncated data.
    """
    with remote.commandexecutor() as e:
        objs = e.callcommand(
            b'rawstorefiledata',
            {
                b'files': [b'changelog', b'manifestlog'],
            },
        ).result()

        # First object is a summary of files data that follows.
        overall = next(objs)

        progress = repo.ui.makeprogress(
            _(b'clone'), total=overall[b'totalsize'], unit=_(b'bytes')
        )
        with progress:
            progress.update(0)

            # Next are pairs of file metadata, data.
            while True:
                try:
                    filemeta = next(objs)
                except StopIteration:
                    break

                # Validate required metadata keys before touching disk.
                for k in (b'location', b'path', b'size'):
                    if k not in filemeta:
                        raise error.Abort(
                            _(b'remote file data missing key: %s') % k
                        )

                if filemeta[b'location'] == b'store':
                    vfs = repo.svfs
                else:
                    raise error.Abort(
                        _(b'invalid location for raw file data: %s')
                        % filemeta[b'location']
                    )

                bytesremaining = filemeta[b'size']

                with vfs.open(filemeta[b'path'], b'wb') as fh:
                    while True:
                        try:
                            chunk = next(objs)
                        except StopIteration:
                            break

                        bytesremaining -= len(chunk)

                        if bytesremaining < 0:
                            raise error.Abort(
                                _(
                                    b'received invalid number of bytes for file '
                                    b'data; expected %d, got extra'
                                )
                                % filemeta[b'size']
                            )

                        progress.increment(step=len(chunk))
                        fh.write(chunk)

                        # Chunks must be indefinite-length bytestring frames;
                        # .islast marks the final chunk of this file.
                        try:
                            if chunk.islast:
                                break
                        except AttributeError:
                            raise error.Abort(
                                _(
                                    b'did not receive indefinite length bytestring '
                                    b'for file data'
                                )
                            )

                if bytesremaining:
                    # BUGFIX: the two adjacent literals previously joined as
                    # "...bytes forfile data"; a trailing space was missing.
                    raise error.Abort(
                        _(
                            b'received invalid number of bytes for '
                            b'file data; expected %d got %d'
                        )
                        % (
                            filemeta[b'size'],
                            filemeta[b'size'] - bytesremaining,
                        )
                    )
|
274 |
|
275 |
|
def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    """Determine which changesets need to be pulled."""

    if heads:
        hasnode = repo.changelog.hasnode
        # Nothing to discover when every requested head is already local.
        if all(hasnode(h) for h in heads):
            return heads, False, heads

    # TODO wire protocol version 2 is capable of more efficient discovery
    # than setdiscovery. Consider implementing something better.
    common, fetch, remoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated
    )

    common = set(common)
    remoteheads = set(remoteheads)

    # If a remote head is filtered locally, put it back in the common set.
    # See the comment in exchange._pulldiscoverychangegroup() for more.

    if fetch and remoteheads:
        has_node = repo.unfiltered().changelog.index.has_node

        common.update(h for h in remoteheads if has_node(h))

        if remoteheads.issubset(common):
            fetch = []

    common.discard(repo.nullid)

    return common, fetch, remoteheads
|
307 |
|
308 |
|
def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
    # TODO consider adding a step here where we obtain the DAG shape first
    # (or ask the server to slice changesets into chunks for us) so that
    # we can perform multiple fetches in batches. This will facilitate
    # resuming interrupted clones, higher server-side cache hit rates due
    # to smaller segments, etc.
    with remote.commandexecutor() as e:
        args = {
            b'revisions': [
                {
                    b'type': b'changesetdagrange',
                    b'roots': sorted(common),
                    b'heads': sorted(remoteheads),
                }
            ],
            b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
        }
        objs = e.callcommand(b'changesetdata', args).result()

        # The context manager waits on all response data when exiting. So
        # we need to remain in the context manager in order to stream data.
        return _processchangesetdata(repo, tr, objs)
|
333 |
|
334 |
|
def _processchangesetdata(repo, tr, objs):
    """Ingest a ``changesetdata`` response stream into the changelog.

    ``objs`` yields a header object followed by changeset records; each
    record may be followed by extra field payloads announced via
    ``fieldsfollowing``. Returns a dict with keys ``added`` (new nodes),
    ``nodesbyphase``, ``bookmarks``, and ``manifestnodes`` (changelog rev
    -> manifest node).
    """
    repo.hook(b'prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs))

    urepo = repo.unfiltered()
    cl = urepo.changelog

    # Defer changelog flushing to the transaction.
    cl.delayupdate(tr)

    # The first emitted object is a header describing the data that
    # follows.
    meta = next(objs)

    progress = repo.ui.makeprogress(
        _(b'changesets'), unit=_(b'chunks'), total=meta.get(b'totalitems')
    )

    manifestnodes = {}
    added = []

    def linkrev(node):
        repo.ui.debug(b'add changeset %s\n' % short(node))
        # Linkrev for changelog is always self.
        return len(cl)

    # Duplicate revisions still need to be reported as "added" so later
    # phase/file processing sees them.
    def ondupchangeset(cl, rev):
        added.append(cl.node(rev))

    def onchangeset(cl, rev):
        progress.increment()

        revision = cl.changelogrevision(rev)
        added.append(cl.node(rev))

        # We need to preserve the mapping of changelog revision to node
        # so we can set the linkrev accordingly when manifests are added.
        manifestnodes[rev] = revision.manifest

        repo.register_changeset(rev, revision)

    nodesbyphase = {phase: set() for phase in phases.phasenames.values()}
    remotebookmarks = {}

    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    #
    # This loop also aggregates non-revision metadata, such as phase
    # data.
    def iterrevisions():
        for cset in objs:
            node = cset[b'node']

            if b'phase' in cset:
                nodesbyphase[cset[b'phase']].add(node)

            for mark in cset.get(b'bookmarks', []):
                remotebookmarks[mark] = node

            # TODO add mechanism for extensions to examine records so they
            # can siphon off custom data fields.

            # Consume announced extra payloads in stream order; they
            # immediately follow the record that announced them.
            extrafields = {}

            for field, size in cset.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            # Some entries might only be metadata only updates.
            if b'revision' not in extrafields:
                continue

            data = extrafields[b'revision']

            yield (
                node,
                cset[b'parents'][0],
                cset[b'parents'][1],
                # Linknode is always itself for changesets.
                cset[b'node'],
                # We always send full revisions. So delta base is not set.
                repo.nullid,
                mdiff.trivialdiffheader(len(data)) + data,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported
                {},
            )

    cl.addgroup(
        iterrevisions(),
        linkrev,
        weakref.proxy(tr),
        alwayscache=True,
        addrevisioncb=onchangeset,
        duplicaterevisioncb=ondupchangeset,
    )

    progress.complete()

    return {
        b'added': added,
        b'nodesbyphase': nodesbyphase,
        b'bookmarks': remotebookmarks,
        b'manifestnodes': manifestnodes,
    }
|
438 |
|
439 |
|
def _fetchmanifests(repo, tr, remote, manifestnodes):
    """Fetch manifest revisions referenced by newly pulled changesets.

    ``manifestnodes`` maps changelog revision -> manifest node. Returns a
    dict with ``added`` (manifest nodes written) and ``linkrevs``
    (manifest node -> changelog revision).
    """
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()

    for clrev, node in sorted(pycompat.iteritems(manifestnodes)):
        if node in seen:
            continue

        try:
            rootmanifest.rev(node)
        except error.LookupError:
            fetchnodes.append(node)
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

            # Extra payloads announced by the record follow it in-stream.
            extrafields = {}

            for field, size in manifest.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = manifest[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = repo.nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # The value passed in is passed to the lookup function passed
                # to addgroup(). We already have a map of manifest node to
                # changelog revision number. So we just pass in the
                # manifest node here and use linkrevs.__getitem__ as the
                # resolution function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _(b'manifests'), unit=_(b'chunks'), total=len(fetchnodes)
    )

    commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
    # TODO make size configurable on client?

    # We send commands 1 at a time to the remote. This is not the most
    # efficient because we incur a round trip at the end of each batch.
    # However, the existing frame-based reactor keeps consuming server
    # data in the background. And this results in response data buffering
    # in memory. This can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        # Slice directly instead of copying via a comprehension.
        batch = fetchnodes[i : i + batchsize]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            objs = e.callcommand(
                b'manifestdata',
                {
                    b'tree': b'',
                    b'nodes': batch,
                    b'fields': {b'parents', b'revision'},
                    b'haveparents': True,
                },
            ).result()

            # Chomp off header object.
            next(objs)

            # Used for both new and duplicate revisions so every relevant
            # manifest node is reported to the caller.
            def onchangeset(cl, rev):
                added.append(cl.node(rev))

            rootmanifest.addgroup(
                iterrevisions(objs, progress),
                linkrevs.__getitem__,
                weakref.proxy(tr),
                addrevisioncb=onchangeset,
                duplicaterevisioncb=onchangeset,
            )

    progress.complete()

    return {
        b'added': added,
        b'linkrevs': linkrevs,
    }
|
558 |
|
559 |
|
def _derivefilesfrommanifests(repo, matcher, manifestnodes):
    """Determine what file nodes are relevant given a set of manifest nodes.

    Returns a dict mapping file paths to dicts of file node to first manifest
    node.
    """
    manifestlog = repo.manifestlog
    fnodes = collections.defaultdict(dict)

    progress = repo.ui.makeprogress(
        _(b'scanning manifests'), total=len(manifestnodes)
    )

    with progress:
        for mnode in manifestnodes:
            m = manifestlog.get(b'', mnode)

            # TODO this will pull in unwanted nodes because it takes the storage
            # delta into consideration. What we really want is something that
            # takes the delta between the manifest's parents. And ideally we
            # would ignore file nodes that are known locally. For now, ignore
            # both these limitations. This will result in incremental fetches
            # requesting data we already have. So this is far from ideal.
            md = m.readfast()

            for path, fnode in md.items():
                if not matcher(path):
                    continue
                # First manifest to mention a file node wins.
                fnodes[path].setdefault(fnode, mnode)

            progress.increment()

    return fnodes
|
592 |
|
593 |
|
def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
    """Fetch file data from explicit file revisions.

    ``fnodes`` maps file path -> {file node: manifest node}; ``linkrevs``
    maps manifest node -> changelog revision, used to derive per-file
    linkrevs. Issues one ``filedata`` command per path, batched.
    """

    # addgroup() expects 7-tuples; normalize the wire records to that form.
    def iterrevisions(objs, progress):
        for filerevision in objs:
            node = filerevision[b'node']

            # Extra payloads announced by the record follow it in-stream.
            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = repo.nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _(b'files'),
        unit=_(b'chunks'),
        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
    )

    # TODO make batch size configurable
    batchsize = 10000
    # sorted() already returns a new list; no copy comprehension needed.
    fnodeslist = sorted(fnodes.items())

    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
        # Slice directly instead of copying via a comprehension.
        batch = fnodeslist[i : i + batchsize]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            fs = []
            locallinkrevs = {}

            for path, nodes in batch:
                fs.append(
                    (
                        path,
                        e.callcommand(
                            b'filedata',
                            {
                                b'path': path,
                                b'nodes': sorted(nodes),
                                b'fields': {b'parents', b'revision'},
                                b'haveparents': True,
                            },
                        ),
                    )
                )

                locallinkrevs[path] = {
                    node: linkrevs[manifestnode]
                    for node, manifestnode in pycompat.iteritems(nodes)
                }

            for path, f in fs:
                objs = f.result()

                # Chomp off header objects.
                next(objs)

                store = repo.file(path)
                store.addgroup(
                    iterrevisions(objs, progress),
                    locallinkrevs[path].__getitem__,
                    weakref.proxy(tr),
                )
|
683 |
|
684 |
|
def _fetchfilesfromcsets(
    repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False
):
    """Fetch file data from explicit changeset revisions.

    Issues ``filesdata`` commands for batches of changeset nodes in
    ``csets``. ``fnodes`` maps path -> {file node: manifest node} and
    ``manlinkrevs`` maps manifest node -> changelog revision; together
    they resolve linkrevs locally. When ``shallow`` is true, linkrevs may
    be missing locally and the server is asked to supply linknodes.
    """

    # Normalize wire records to the 7-tuples addgroup() expects.
    # ``remaining`` bounds consumption to this path's segment of the stream.
    def iterrevisions(objs, remaining, progress):
        while remaining:
            filerevision = next(objs)

            node = filerevision[b'node']

            # Extra payloads announced by the record follow it in-stream.
            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = repo.nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            # Server-supplied linknode (requested when shallow); otherwise
            # fall back to the node itself.
            if b'linknode' in filerevision:
                linknode = filerevision[b'linknode']
            else:
                linknode = node

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                linknode,
                basenode,
                delta,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

            progress.increment()
            remaining -= 1

    progress = repo.ui.makeprogress(
        _(b'files'),
        unit=_(b'chunks'),
        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
    )

    commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 50000)

    shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features
    fields = {b'parents', b'revision'}
    clrev = repo.changelog.rev

    # There are no guarantees that we'll have ancestor revisions if
    # a) this repo has shallow file storage b) shallow data fetching is enabled.
    # Force remote to not delta against possibly unknown revisions when these
    # conditions hold.
    haveparents = not (shallowfiles or shallow)

    # Similarly, we may not have calculated linkrevs for all incoming file
    # revisions. Ask the remote to do work for us in this case.
    if not haveparents:
        fields.add(b'linknode')

    for i in pycompat.xrange(0, len(csets), batchsize):
        batch = [x for x in csets[i : i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            args = {
                b'revisions': [
                    {
                        b'type': b'changesetexplicit',
                        b'nodes': batch,
                    }
                ],
                b'fields': fields,
                b'haveparents': haveparents,
            }

            if pathfilter:
                args[b'pathfilter'] = pathfilter

            objs = e.callcommand(b'filesdata', args).result()

            # First object is an overall header.
            overall = next(objs)

            # We have overall['totalpaths'] segments.
            for i in pycompat.xrange(overall[b'totalpaths']):
                header = next(objs)

                path = header[b'path']
                store = repo.file(path)

                linkrevs = {
                    fnode: manlinkrevs[mnode]
                    for fnode, mnode in pycompat.iteritems(fnodes[path])
                }

                # Prefer the locally computed linkrev; fall back to resolving
                # the (server-provided) linknode against the changelog.
                # NOTE: this closure rebinds ``linkrevs`` each iteration but
                # is consumed synchronously by addgroup() below, so the
                # late-binding is safe.
                def getlinkrev(node):
                    if node in linkrevs:
                        return linkrevs[node]
                    else:
                        return clrev(node)

                store.addgroup(
                    iterrevisions(objs, header[b'totalitems'], progress),
                    getlinkrev,
                    weakref.proxy(tr),
                    maybemissingparents=shallow,
                )
|