mercurial/exchangev2.py
# exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import collections
import weakref

from .i18n import _
from .node import short
from . import (
    bookmarks,
    error,
    mdiff,
    narrowspec,
    phases,
    pycompat,
    requirements as requirementsmod,
    setdiscovery,
)
from .interfaces import repository
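
# The pulloperation (``pullop``) objects consumed below are created by the
# exchange.pull() machinery; this module provides the wire protocol version 2
# implementation of the pull logic.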
       


def pull(pullop):
    """Pull using wire protocol version 2."""
    repo = pullop.repo
    remote = pullop.remote

    usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)

    # If this is a clone and a "stream clone" was requested, we obtain the
    # raw files data from the remote and then fall back to an incremental
    # pull. This is somewhat hacky and is not nearly robust enough for
    # long-term usage.
    if usingrawchangelogandmanifest:
        with repo.transaction(b'clone'):
            _fetchrawstorefiles(repo, remote)
            repo.invalidate(clearfilecache=True)

    tr = pullop.trmanager.transaction()

    # We don't use the repo's narrow matcher here because the patterns passed
    # to exchange.pull() could be different.
    narrowmatcher = narrowspec.match(
        repo.root,
        # An empty include set maps to nevermatcher, so always
        # set includes if missing.
        pullop.includepats or {b'path:.'},
        pullop.excludepats,
    )
       

    if pullop.includepats or pullop.excludepats:
        pathfilter = {}
        if pullop.includepats:
            pathfilter[b'include'] = sorted(pullop.includepats)
        if pullop.excludepats:
            pathfilter[b'exclude'] = sorted(pullop.excludepats)
    else:
        pathfilter = None
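    # For example, include pattern {b'path:foo'} and exclude pattern
    # {b'path:foo/bar'} produce
    # {b'include': [b'path:foo'], b'exclude': [b'path:foo/bar']}; the
    # filter is later forwarded to the remote's "filesdata" command.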
       

    # Figure out what needs to be fetched.
    common, fetch, remoteheads = _pullchangesetdiscovery(
        repo, remote, pullop.heads, abortwhenunrelated=pullop.force
    )

    # And fetch the data.
    pullheads = pullop.heads or remoteheads
    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
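    # csetres is the dict returned by _processchangesetdata(): the added
    # changeset nodes, nodes grouped by phase name, remote bookmarks, and a
    # map of changelog revision to manifest node.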
       

    # New revisions are written to the changelog. But all other updates
    # are deferred. Do those now.

    # Ensure all new changesets are draft by default. If the repo is
    # publishing, the phase will be adjusted by the loop below.
    if csetres[b'added']:
        phases.registernew(
            repo, tr, phases.draft, [repo[n].rev() for n in csetres[b'added']]
        )

    # And adjust the phase of all changesets accordingly.
    for phasenumber, phase in phases.phasenames.items():
        if phase == b'secret' or not csetres[b'nodesbyphase'][phase]:
            continue

        phases.advanceboundary(
            repo,
            tr,
            phasenumber,
            csetres[b'nodesbyphase'][phase],
        )

    # Write bookmark updates.
    bookmarks.updatefromremote(
        repo.ui,
        repo,
        csetres[b'bookmarks'],
        remote.url(),
        pullop.gettransaction,
        explicit=pullop.explicitbookmarks,
    )
       

    manres = _fetchmanifests(repo, tr, remote, csetres[b'manifestnodes'])

    # We don't properly support shallow changesets and manifests yet, so we
    # apply the depth limiting locally.
    if pullop.depth:
        relevantcsetnodes = set()
        clnode = repo.changelog.node
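
        # The ancestors(set, depth) revset counts the given heads as depth 0,
        # so a requested pull depth of N maps to a revset depth of N - 1.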
       
        for rev in repo.revs(
            b'ancestors(%ln, %s)', pullheads, pullop.depth - 1
        ):
            relevantcsetnodes.add(clnode(rev))

        csetrelevantfilter = lambda n: n in relevantcsetnodes

    else:
        csetrelevantfilter = lambda n: True
       

    # If obtaining the raw store files, we need to scan the full repo to
    # derive all the changesets, manifests, and linkrevs.
    if usingrawchangelogandmanifest:
        csetsforfiles = []
        mnodesforfiles = []
        manifestlinkrevs = {}

        for rev in repo:
            ctx = repo[rev]
            node = ctx.node()

            if not csetrelevantfilter(node):
                continue

            mnode = ctx.manifestnode()

            csetsforfiles.append(node)
            mnodesforfiles.append(mnode)
            manifestlinkrevs[mnode] = rev

    else:
        csetsforfiles = [n for n in csetres[b'added'] if csetrelevantfilter(n)]
        mnodesforfiles = manres[b'added']
        manifestlinkrevs = manres[b'linkrevs']
       

    # Find all file nodes referenced by added manifests and fetch those
    # revisions.
    fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
    _fetchfilesfromcsets(
        repo,
        tr,
        remote,
        pathfilter,
        fnodes,
        csetsforfiles,
        manifestlinkrevs,
        shallow=bool(pullop.depth),
    )
       


def _checkuserawstorefiledata(pullop):
    """Check whether we should use the rawstorefiledata command to retrieve data."""

    repo = pullop.repo
    remote = pullop.remote

    # The command to obtain raw store data isn't available.
    if b'rawstorefiledata' not in remote.apidescriptor[b'commands']:
        return False

    # Only honor this if the user requested a stream clone operation.
    if not pullop.streamclonerequested:
        return False

    # This only works on empty repos.
    if len(repo):
        return False

    # TODO This is super hacky. There needs to be a storage API for this. We
    # also need to check for compatibility with the remote.
    if requirementsmod.REVLOGV1_REQUIREMENT not in repo.requirements:
        return False

    return True
       


def _fetchrawstorefiles(repo, remote):
    with remote.commandexecutor() as e:
        objs = e.callcommand(
            b'rawstorefiledata',
            {
                b'files': [b'changelog', b'manifestlog'],
            },
        ).result()

        # First object is a summary of files data that follows.
        overall = next(objs)

        progress = repo.ui.makeprogress(
            _(b'clone'), total=overall[b'totalsize'], unit=_(b'bytes')
        )
        with progress:
            progress.update(0)

            # Next are pairs of file metadata, data.
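            # Each metadata object is expected to look something like
            # {b'location': b'store', b'path': b'00changelog.i', b'size': N}
            # (the path shown is illustrative), followed by the file's data
            # as an indefinite-length bytestring.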
       
            while True:
                try:
                    filemeta = next(objs)
                except StopIteration:
                    break

                for k in (b'location', b'path', b'size'):
                    if k not in filemeta:
                        raise error.Abort(
                            _(b'remote file data missing key: %s') % k
                        )

                if filemeta[b'location'] == b'store':
                    vfs = repo.svfs
                else:
                    raise error.Abort(
                        _(b'invalid location for raw file data: %s')
                        % filemeta[b'location']
                    )

                bytesremaining = filemeta[b'size']

                with vfs.open(filemeta[b'path'], b'wb') as fh:
                    while True:
                        try:
                            chunk = next(objs)
                        except StopIteration:
                            break

                        bytesremaining -= len(chunk)

                        if bytesremaining < 0:
                            raise error.Abort(
                                _(
                                    b'received invalid number of bytes for file '
                                    b'data; expected %d, got extra'
                                )
                                % filemeta[b'size']
                            )

                        progress.increment(step=len(chunk))
                        fh.write(chunk)

                        try:
                            if chunk.islast:
                                break
                        except AttributeError:
                            raise error.Abort(
                                _(
                                    b'did not receive indefinite length bytestring '
                                    b'for file data'
                                )
                            )

                if bytesremaining:
                    raise error.Abort(
                        _(
                            b'received invalid number of bytes for '
                            b'file data; expected %d, got %d'
                        )
                        % (
                            filemeta[b'size'],
                            filemeta[b'size'] - bytesremaining,
                        )
                    )
       


def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    """Determine which changesets need to be pulled."""

    if heads:
        knownnode = repo.changelog.hasnode
        if all(knownnode(head) for head in heads):
            return heads, False, heads

    # TODO wire protocol version 2 is capable of more efficient discovery
    # than setdiscovery. Consider implementing something better.
    common, fetch, remoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated
    )

    common = set(common)
    remoteheads = set(remoteheads)

    # If a remote head is filtered locally, put it back in the common set.
    # See the comment in exchange._pulldiscoverychangegroup() for more.

    if fetch and remoteheads:
        has_node = repo.unfiltered().changelog.index.has_node

        common |= {head for head in remoteheads if has_node(head)}

        if set(remoteheads).issubset(common):
            fetch = []

    common.discard(repo.nullid)

    return common, fetch, remoteheads
       


def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
    # TODO consider adding a step here where we obtain the DAG shape first
    # (or ask the server to slice changesets into chunks for us) so that
    # we can perform multiple fetches in batches. This will facilitate
    # resuming interrupted clones, higher server-side cache hit rates due
    # to smaller segments, etc.
    with remote.commandexecutor() as e:
        objs = e.callcommand(
            b'changesetdata',
            {
                b'revisions': [
                    {
                        b'type': b'changesetdagrange',
                        b'roots': sorted(common),
                        b'heads': sorted(remoteheads),
                    }
                ],
                b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
            },
        ).result()

        # The context manager waits on all response data when exiting. So
        # we need to remain in the context manager in order to stream data.
        return _processchangesetdata(repo, tr, objs)
       


def _processchangesetdata(repo, tr, objs):
    repo.hook(b'prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs))

    urepo = repo.unfiltered()
    cl = urepo.changelog

    cl.delayupdate(tr)

    # The first emitted object is a header describing the data that
    # follows.
    meta = next(objs)

    progress = repo.ui.makeprogress(
        _(b'changesets'), unit=_(b'chunks'), total=meta.get(b'totalitems')
    )

    manifestnodes = {}
    added = []

    def linkrev(node):
        repo.ui.debug(b'add changeset %s\n' % short(node))
        # Linkrev for changelog is always self.
        return len(cl)

    def ondupchangeset(cl, rev):
        added.append(cl.node(rev))

    def onchangeset(cl, rev):
        progress.increment()

        revision = cl.changelogrevision(rev)
        added.append(cl.node(rev))

        # We need to preserve the mapping of changelog revision to node
        # so we can set the linkrev accordingly when manifests are added.
        manifestnodes[rev] = revision.manifest

        repo.register_changeset(rev, revision)
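
    # phases.phasenames maps phase numbers to names such as b'public',
    # b'draft', and b'secret'; incoming nodes are bucketed below by phase
    # name.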
       
    nodesbyphase = {phase: set() for phase in phases.phasenames.values()}
    remotebookmarks = {}

    # addgroup() expects an 8-tuple describing revisions. This normalizes
    # the wire data to that format.
    #
    # This loop also aggregates non-revision metadata, such as phase
    # data.
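    #
    # A changeset record arriving off the wire looks something like
    # {b'node': node, b'parents': [p1, p2], b'phase': b'draft',
    #  b'bookmarks': [...], b'fieldsfollowing': [[b'revision', size]]}
    # (illustrative; the keys present depend on the fields requested).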
       
    def iterrevisions():
        for cset in objs:
            node = cset[b'node']

            if b'phase' in cset:
                nodesbyphase[cset[b'phase']].add(node)

            for mark in cset.get(b'bookmarks', []):
                remotebookmarks[mark] = node

            # TODO add mechanism for extensions to examine records so they
            # can siphon off custom data fields.

            extrafields = {}

            for field, size in cset.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            # Some entries might be metadata-only updates.
            if b'revision' not in extrafields:
                continue

            data = extrafields[b'revision']

            yield (
                node,
                cset[b'parents'][0],
                cset[b'parents'][1],
                # Linknode is always itself for changesets.
                cset[b'node'],
                # We always send full revisions. So delta base is not set.
                repo.nullid,
                mdiff.trivialdiffheader(len(data)) + data,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )
       

    cl.addgroup(
        iterrevisions(),
        linkrev,
        weakref.proxy(tr),
        alwayscache=True,
        addrevisioncb=onchangeset,
        duplicaterevisioncb=ondupchangeset,
    )

    progress.complete()

    return {
        b'added': added,
        b'nodesbyphase': nodesbyphase,
        b'bookmarks': remotebookmarks,
        b'manifestnodes': manifestnodes,
    }
       


def _fetchmanifests(repo, tr, remote, manifestnodes):
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()
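
    # Iterate in changelog-revision order so that each manifest node is
    # assigned the linkrev of the earliest changeset referencing it.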
       
    for clrev, node in sorted(pycompat.iteritems(manifestnodes)):
        if node in seen:
            continue

        try:
            rootmanifest.rev(node)
        except error.LookupError:
            fetchnodes.append(node)
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects an 8-tuple describing revisions. This normalizes
    # the wire data to that format.
       
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

            extrafields = {}

            for field, size in manifest.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)
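
            # The remote sends either a delta against deltabasenode or a
            # full revision; a full revision is normalized into a delta
            # against nullid by prepending a trivial diff header, so
            # addgroup() always consumes delta-shaped input.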
       
            if b'delta' in extrafields:
                basenode = manifest[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = repo.nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # This value is handed to the lookup function passed to
                # addgroup(). We already have a map of manifest node to
                # changelog revision number, so we pass in the manifest
                # node here and use linkrevs.__getitem__ as the resolution
                # function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _(b'manifests'), unit=_(b'chunks'), total=len(fetchnodes)
    )

    commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
    # TODO make size configurable on client?

    # We send commands one at a time to the remote. This is not the most
    # efficient because we incur a round trip at the end of each batch.
    # However, the existing frame-based reactor keeps consuming server
    # data in the background, and this results in response data buffering
    # in memory. This can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        batch = fetchnodes[i : i + batchsize]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            objs = e.callcommand(
                b'manifestdata',
                {
                    b'tree': b'',
                    b'nodes': batch,
                    b'fields': {b'parents', b'revision'},
                    b'haveparents': True,
                },
            ).result()

            # Chomp off header object.
            next(objs)

            def onchangeset(cl, rev):
                added.append(cl.node(rev))

            rootmanifest.addgroup(
                iterrevisions(objs, progress),
                linkrevs.__getitem__,
                weakref.proxy(tr),
                addrevisioncb=onchangeset,
                duplicaterevisioncb=onchangeset,
            )

    progress.complete()

    return {
        b'added': added,
        b'linkrevs': linkrevs,
    }
       


def _derivefilesfrommanifests(repo, matcher, manifestnodes):
    """Determine what file nodes are relevant given a set of manifest nodes.

    Returns a dict mapping file paths to dicts of file node to first manifest
    node.
    """
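    # Shape example (illustrative):
    #   {b'dir/file.txt': {fnode1: manifestnode1, fnode2: manifestnode2}}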
       
    ml = repo.manifestlog
    fnodes = collections.defaultdict(dict)

    progress = repo.ui.makeprogress(
        _(b'scanning manifests'), total=len(manifestnodes)
    )

    with progress:
        for manifestnode in manifestnodes:
            m = ml.get(b'', manifestnode)

            # TODO this will pull in unwanted nodes because it takes the storage
            # delta into consideration. What we really want is something that
            # takes the delta between the manifest's parents. And ideally we
            # would ignore file nodes that are known locally. For now, ignore
            # both these limitations. This will result in incremental fetches
            # requesting data we already have. So this is far from ideal.
            md = m.readfast()

            for path, fnode in md.items():
                if matcher(path):
                    fnodes[path].setdefault(fnode, manifestnode)

            progress.increment()

    return fnodes
       


def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
    """Fetch file data from explicit file revisions."""

    def iterrevisions(objs, progress):
        for filerevision in objs:
            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = repo.nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

            progress.increment()
       

    progress = repo.ui.makeprogress(
        _(b'files'),
        unit=_(b'chunks'),
        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
    )

    # TODO make batch size configurable
    batchsize = 10000
    fnodeslist = sorted(fnodes.items())

    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
        batch = fnodeslist[i : i + batchsize]
        if not batch:
            continue
       

        with remote.commandexecutor() as e:
            fs = []
            locallinkrevs = {}

            for path, nodes in batch:
                fs.append(
                    (
                        path,
                        e.callcommand(
                            b'filedata',
                            {
                                b'path': path,
                                b'nodes': sorted(nodes),
                                b'fields': {b'parents', b'revision'},
                                b'haveparents': True,
                            },
                        ),
                    )
                )

                locallinkrevs[path] = {
                    node: linkrevs[manifestnode]
                    for node, manifestnode in pycompat.iteritems(nodes)
                }
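
                # linkrevs maps manifest node to changelog revision, so the
                # dict above resolves each file node to its changelog
                # revision, which addgroup() later uses for linkrev lookup.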
       

            for path, f in fs:
                objs = f.result()

                # Chomp off header objects.
                next(objs)

                store = repo.file(path)
                store.addgroup(
                    iterrevisions(objs, progress),
                    locallinkrevs[path].__getitem__,
                    weakref.proxy(tr),
                )
       


def _fetchfilesfromcsets(
    repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False
):
    """Fetch file data from explicit changeset revisions."""

    def iterrevisions(objs, remaining, progress):
        while remaining:
            filerevision = next(objs)

            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = repo.nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            if b'linknode' in filerevision:
                linknode = filerevision[b'linknode']
            else:
                linknode = node

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                linknode,
                basenode,
                delta,
                # Flags not yet supported.
                0,
                # Sidedata not yet supported.
                {},
            )

            progress.increment()
            remaining -= 1

    progress = repo.ui.makeprogress(
        _(b'files'),
        unit=_(b'chunks'),
        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
    )
       

    commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 50000)

    shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features
    fields = {b'parents', b'revision'}
    clrev = repo.changelog.rev

    # There are no guarantees that we'll have ancestor revisions if
    # a) this repo has shallow file storage or b) shallow data fetching is
    # enabled. Force the remote to not delta against possibly unknown
    # revisions when these conditions hold.
    haveparents = not (shallowfiles or shallow)

    # Similarly, we may not have calculated linkrevs for all incoming file
    # revisions. Ask the remote to do the work for us in this case.
    if not haveparents:
        fields.add(b'linknode')

    for i in pycompat.xrange(0, len(csets), batchsize):
        batch = csets[i : i + batchsize]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            args = {
                b'revisions': [
                    {
                        b'type': b'changesetexplicit',
                        b'nodes': batch,
                    }
                ],
                b'fields': fields,
                b'haveparents': haveparents,
            }

            if pathfilter:
                args[b'pathfilter'] = pathfilter
       

            objs = e.callcommand(b'filesdata', args).result()

            # First object is an overall header.
            overall = next(objs)

            # We have overall['totalpaths'] segments.
            for i in pycompat.xrange(overall[b'totalpaths']):
                header = next(objs)

                path = header[b'path']
                store = repo.file(path)

                linkrevs = {
                    fnode: manlinkrevs[mnode]
                    for fnode, mnode in pycompat.iteritems(fnodes[path])
                }
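
                # In the shallow case the remote may supply a linknode (a
                # changeset node) for revisions whose linkrev we could not
                # compute locally; getlinkrev() falls back to resolving
                # such nodes through the changelog.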
       
                def getlinkrev(node):
                    if node in linkrevs:
                        return linkrevs[node]
                    else:
                        return clrev(node)
       

                store.addgroup(
                    iterrevisions(objs, header[b'totalitems'], progress),
                    getlinkrev,
                    weakref.proxy(tr),
                    maybemissingparents=shallow,
                )