mercurial/exchangev2.py
changeset 48561 04688c51f81f
parent 48560 d6c53b40b078
child 48562 bf5dc156bb4c
--- a/mercurial/exchangev2.py	Thu Dec 30 13:25:44 2021 +0100
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,804 +0,0 @@
-# exchangev2.py - repository exchange for wire protocol version 2
-#
-# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
-#
-# This software may be used and distributed according to the terms of the
-# GNU General Public License version 2 or any later version.
-
-from __future__ import absolute_import
-
-import collections
-import weakref
-
-from .i18n import _
-from .node import short
-from . import (
-    bookmarks,
-    error,
-    mdiff,
-    narrowspec,
-    phases,
-    pycompat,
-    requirements as requirementsmod,
-    setdiscovery,
-)
-from .interfaces import repository
-
-
-def pull(pullop):
-    """Pull using wire protocol version 2."""
-    repo = pullop.repo
-    remote = pullop.remote
-
-    usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)
-
-    # If this is a clone and it was requested to perform a "stream clone",
-    # we obtain the raw files data from the remote then fall back to an
-    # incremental pull. This is somewhat hacky and is not nearly robust enough
-    # for long-term usage.
-    if usingrawchangelogandmanifest:
-        with repo.transaction(b'clone'):
-            _fetchrawstorefiles(repo, remote)
-            repo.invalidate(clearfilecache=True)
-
-    tr = pullop.trmanager.transaction()
-
-    # We don't use the repo's narrow matcher here because the patterns passed
-    # to exchange.pull() could be different.
-    narrowmatcher = narrowspec.match(
-        repo.root,
-        # Empty maps to nevermatcher. So always
-        # set includes if missing.
-        pullop.includepats or {b'path:.'},
-        pullop.excludepats,
-    )
-
-    if pullop.includepats or pullop.excludepats:
-        pathfilter = {}
-        if pullop.includepats:
-            pathfilter[b'include'] = sorted(pullop.includepats)
-        if pullop.excludepats:
-            pathfilter[b'exclude'] = sorted(pullop.excludepats)
-    else:
-        pathfilter = None
-
-    # Figure out what needs to be fetched.
-    common, fetch, remoteheads = _pullchangesetdiscovery(
-        repo, remote, pullop.heads, abortwhenunrelated=pullop.force
-    )
-
-    # And fetch the data.
-    pullheads = pullop.heads or remoteheads
-    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)
-
-    # New revisions are written to the changelog. But all other updates
-    # are deferred. Do those now.
-
-    # Ensure all new changesets are draft by default. If the repo is
-    # publishing, the phase will be adjusted by the loop below.
-    if csetres[b'added']:
-        phases.registernew(
-            repo, tr, phases.draft, [repo[n].rev() for n in csetres[b'added']]
-        )
-
-    # And adjust the phase of all changesets accordingly.
-    for phasenumber, phase in phases.phasenames.items():
-        if phase == b'secret' or not csetres[b'nodesbyphase'][phase]:
-            continue
-
-        phases.advanceboundary(
-            repo,
-            tr,
-            phasenumber,
-            csetres[b'nodesbyphase'][phase],
-        )
-
-    # Write bookmark updates.
-    bookmarks.updatefromremote(
-        repo.ui,
-        repo,
-        csetres[b'bookmarks'],
-        remote.url(),
-        pullop.gettransaction,
-        explicit=pullop.explicitbookmarks,
-    )
-
-    manres = _fetchmanifests(repo, tr, remote, csetres[b'manifestnodes'])
-
-    # We don't properly support shallow changeset and manifest yet. So we apply
-    # depth limiting locally.
-    if pullop.depth:
-        relevantcsetnodes = set()
-        clnode = repo.changelog.node
-
-        for rev in repo.revs(
-            b'ancestors(%ln, %s)', pullheads, pullop.depth - 1
-        ):
-            relevantcsetnodes.add(clnode(rev))
-
-        csetrelevantfilter = lambda n: n in relevantcsetnodes
-
-    else:
-        csetrelevantfilter = lambda n: True
-
-    # If obtaining the raw store files, we need to scan the full repo to
-    # derive all the changesets, manifests, and linkrevs.
-    if usingrawchangelogandmanifest:
-        csetsforfiles = []
-        mnodesforfiles = []
-        manifestlinkrevs = {}
-
-        for rev in repo:
-            ctx = repo[rev]
-            node = ctx.node()
-
-            if not csetrelevantfilter(node):
-                continue
-
-            mnode = ctx.manifestnode()
-
-            csetsforfiles.append(node)
-            mnodesforfiles.append(mnode)
-            manifestlinkrevs[mnode] = rev
-
-    else:
-        csetsforfiles = [n for n in csetres[b'added'] if csetrelevantfilter(n)]
-        mnodesforfiles = manres[b'added']
-        manifestlinkrevs = manres[b'linkrevs']
-
-    # Find all file nodes referenced by added manifests and fetch those
-    # revisions.
-    fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
-    _fetchfilesfromcsets(
-        repo,
-        tr,
-        remote,
-        pathfilter,
-        fnodes,
-        csetsforfiles,
-        manifestlinkrevs,
-        shallow=bool(pullop.depth),
-    )
-
-
-def _checkuserawstorefiledata(pullop):
-    """Check whether we should use rawstorefiledata command to retrieve data."""
-
-    repo = pullop.repo
-    remote = pullop.remote
-
-    # Command to obtain raw store data isn't available.
-    if b'rawstorefiledata' not in remote.apidescriptor[b'commands']:
-        return False
-
-    # Only honor if user requested stream clone operation.
-    if not pullop.streamclonerequested:
-        return False
-
-    # Only works on empty repos.
-    if len(repo):
-        return False
-
-    # TODO This is super hacky. There needs to be a storage API for this. We
-    # also need to check for compatibility with the remote.
-    if requirementsmod.REVLOGV1_REQUIREMENT not in repo.requirements:
-        return False
-
-    return True
-
-
-def _fetchrawstorefiles(repo, remote):
-    with remote.commandexecutor() as e:
-        objs = e.callcommand(
-            b'rawstorefiledata',
-            {
-                b'files': [b'changelog', b'manifestlog'],
-            },
-        ).result()
-
-        # First object is a summary of files data that follows.
-        overall = next(objs)
-
-        progress = repo.ui.makeprogress(
-            _(b'clone'), total=overall[b'totalsize'], unit=_(b'bytes')
-        )
-        with progress:
-            progress.update(0)
-
-            # Next are pairs of file metadata, data.
-            while True:
-                try:
-                    filemeta = next(objs)
-                except StopIteration:
-                    break
-
-                for k in (b'location', b'path', b'size'):
-                    if k not in filemeta:
-                        raise error.Abort(
-                            _(b'remote file data missing key: %s') % k
-                        )
-
-                if filemeta[b'location'] == b'store':
-                    vfs = repo.svfs
-                else:
-                    raise error.Abort(
-                        _(b'invalid location for raw file data: %s')
-                        % filemeta[b'location']
-                    )
-
-                bytesremaining = filemeta[b'size']
-
-                with vfs.open(filemeta[b'path'], b'wb') as fh:
-                    while True:
-                        try:
-                            chunk = next(objs)
-                        except StopIteration:
-                            break
-
-                        bytesremaining -= len(chunk)
-
-                        if bytesremaining < 0:
-                            raise error.Abort(
-                                _(
-                                    b'received invalid number of bytes for file '
-                                    b'data; expected %d, got extra'
-                                )
-                                % filemeta[b'size']
-                            )
-
-                        progress.increment(step=len(chunk))
-                        fh.write(chunk)
-
-                        try:
-                            if chunk.islast:
-                                break
-                        except AttributeError:
-                            raise error.Abort(
-                                _(
-                                    b'did not receive indefinite length bytestring '
-                                    b'for file data'
-                                )
-                            )
-
-                if bytesremaining:
-                    raise error.Abort(
-                        _(
-                            b'received invalid number of bytes for'
-                            b'file data; expected %d got %d'
-                        )
-                        % (
-                            filemeta[b'size'],
-                            filemeta[b'size'] - bytesremaining,
-                        )
-                    )
-
-
-def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
-    """Determine which changesets need to be pulled."""
-
-    if heads:
-        knownnode = repo.changelog.hasnode
-        if all(knownnode(head) for head in heads):
-            return heads, False, heads
-
-    # TODO wire protocol version 2 is capable of more efficient discovery
-    # than setdiscovery. Consider implementing something better.
-    common, fetch, remoteheads = setdiscovery.findcommonheads(
-        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated
-    )
-
-    common = set(common)
-    remoteheads = set(remoteheads)
-
-    # If a remote head is filtered locally, put it back in the common set.
-    # See the comment in exchange._pulldiscoverychangegroup() for more.
-
-    if fetch and remoteheads:
-        has_node = repo.unfiltered().changelog.index.has_node
-
-        common |= {head for head in remoteheads if has_node(head)}
-
-        if set(remoteheads).issubset(common):
-            fetch = []
-
-    common.discard(repo.nullid)
-
-    return common, fetch, remoteheads
-
-
-def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
-    # TODO consider adding a step here where we obtain the DAG shape first
-    # (or ask the server to slice changesets into chunks for us) so that
-    # we can perform multiple fetches in batches. This will facilitate
-    # resuming interrupted clones, higher server-side cache hit rates due
-    # to smaller segments, etc.
-    with remote.commandexecutor() as e:
-        objs = e.callcommand(
-            b'changesetdata',
-            {
-                b'revisions': [
-                    {
-                        b'type': b'changesetdagrange',
-                        b'roots': sorted(common),
-                        b'heads': sorted(remoteheads),
-                    }
-                ],
-                b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
-            },
-        ).result()
-
-        # The context manager waits on all response data when exiting. So
-        # we need to remain in the context manager in order to stream data.
-        return _processchangesetdata(repo, tr, objs)
-
-
-def _processchangesetdata(repo, tr, objs):
-    repo.hook(b'prechangegroup', throw=True, **pycompat.strkwargs(tr.hookargs))
-
-    urepo = repo.unfiltered()
-    cl = urepo.changelog
-
-    cl.delayupdate(tr)
-
-    # The first emitted object is a header describing the data that
-    # follows.
-    meta = next(objs)
-
-    progress = repo.ui.makeprogress(
-        _(b'changesets'), unit=_(b'chunks'), total=meta.get(b'totalitems')
-    )
-
-    manifestnodes = {}
-    added = []
-
-    def linkrev(node):
-        repo.ui.debug(b'add changeset %s\n' % short(node))
-        # Linkrev for changelog is always self.
-        return len(cl)
-
-    def ondupchangeset(cl, rev):
-        added.append(cl.node(rev))
-
-    def onchangeset(cl, rev):
-        progress.increment()
-
-        revision = cl.changelogrevision(rev)
-        added.append(cl.node(rev))
-
-        # We need to preserve the mapping of changelog revision to node
-        # so we can set the linkrev accordingly when manifests are added.
-        manifestnodes[rev] = revision.manifest
-
-        repo.register_changeset(rev, revision)
-
-    nodesbyphase = {phase: set() for phase in phases.phasenames.values()}
-    remotebookmarks = {}
-
-    # addgroup() expects a 7-tuple describing revisions. This normalizes
-    # the wire data to that format.
-    #
-    # This loop also aggregates non-revision metadata, such as phase
-    # data.
-    def iterrevisions():
-        for cset in objs:
-            node = cset[b'node']
-
-            if b'phase' in cset:
-                nodesbyphase[cset[b'phase']].add(node)
-
-            for mark in cset.get(b'bookmarks', []):
-                remotebookmarks[mark] = node
-
-            # TODO add mechanism for extensions to examine records so they
-            # can siphon off custom data fields.
-
-            extrafields = {}
-
-            for field, size in cset.get(b'fieldsfollowing', []):
-                extrafields[field] = next(objs)
-
-            # Some entries might only be metadata only updates.
-            if b'revision' not in extrafields:
-                continue
-
-            data = extrafields[b'revision']
-
-            yield (
-                node,
-                cset[b'parents'][0],
-                cset[b'parents'][1],
-                # Linknode is always itself for changesets.
-                cset[b'node'],
-                # We always send full revisions. So delta base is not set.
-                repo.nullid,
-                mdiff.trivialdiffheader(len(data)) + data,
-                # Flags not yet supported.
-                0,
-                # Sidedata not yet supported
-                {},
-            )
-
-    cl.addgroup(
-        iterrevisions(),
-        linkrev,
-        weakref.proxy(tr),
-        alwayscache=True,
-        addrevisioncb=onchangeset,
-        duplicaterevisioncb=ondupchangeset,
-    )
-
-    progress.complete()
-
-    return {
-        b'added': added,
-        b'nodesbyphase': nodesbyphase,
-        b'bookmarks': remotebookmarks,
-        b'manifestnodes': manifestnodes,
-    }
-
-
-def _fetchmanifests(repo, tr, remote, manifestnodes):
-    rootmanifest = repo.manifestlog.getstorage(b'')
-
-    # Some manifests can be shared between changesets. Filter out revisions
-    # we already know about.
-    fetchnodes = []
-    linkrevs = {}
-    seen = set()
-
-    for clrev, node in sorted(pycompat.iteritems(manifestnodes)):
-        if node in seen:
-            continue
-
-        try:
-            rootmanifest.rev(node)
-        except error.LookupError:
-            fetchnodes.append(node)
-            linkrevs[node] = clrev
-
-        seen.add(node)
-
-    # TODO handle tree manifests
-
-    # addgroup() expects 7-tuple describing revisions. This normalizes
-    # the wire data to that format.
-    def iterrevisions(objs, progress):
-        for manifest in objs:
-            node = manifest[b'node']
-
-            extrafields = {}
-
-            for field, size in manifest.get(b'fieldsfollowing', []):
-                extrafields[field] = next(objs)
-
-            if b'delta' in extrafields:
-                basenode = manifest[b'deltabasenode']
-                delta = extrafields[b'delta']
-            elif b'revision' in extrafields:
-                basenode = repo.nullid
-                revision = extrafields[b'revision']
-                delta = mdiff.trivialdiffheader(len(revision)) + revision
-            else:
-                continue
-
-            yield (
-                node,
-                manifest[b'parents'][0],
-                manifest[b'parents'][1],
-                # The value passed in is passed to the lookup function passed
-                # to addgroup(). We already have a map of manifest node to
-                # changelog revision number. So we just pass in the
-                # manifest node here and use linkrevs.__getitem__ as the
-                # resolution function.
-                node,
-                basenode,
-                delta,
-                # Flags not yet supported.
-                0,
-                # Sidedata not yet supported.
-                {},
-            )
-
-            progress.increment()
-
-    progress = repo.ui.makeprogress(
-        _(b'manifests'), unit=_(b'chunks'), total=len(fetchnodes)
-    )
-
-    commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
-    batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
-    # TODO make size configurable on client?
-
-    # We send commands 1 at a time to the remote. This is not the most
-    # efficient because we incur a round trip at the end of each batch.
-    # However, the existing frame-based reactor keeps consuming server
-    # data in the background. And this results in response data buffering
-    # in memory. This can consume gigabytes of memory.
-    # TODO send multiple commands in a request once background buffering
-    # issues are resolved.
-
-    added = []
-
-    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
-        batch = [node for node in fetchnodes[i : i + batchsize]]
-        if not batch:
-            continue
-
-        with remote.commandexecutor() as e:
-            objs = e.callcommand(
-                b'manifestdata',
-                {
-                    b'tree': b'',
-                    b'nodes': batch,
-                    b'fields': {b'parents', b'revision'},
-                    b'haveparents': True,
-                },
-            ).result()
-
-            # Chomp off header object.
-            next(objs)
-
-            def onchangeset(cl, rev):
-                added.append(cl.node(rev))
-
-            rootmanifest.addgroup(
-                iterrevisions(objs, progress),
-                linkrevs.__getitem__,
-                weakref.proxy(tr),
-                addrevisioncb=onchangeset,
-                duplicaterevisioncb=onchangeset,
-            )
-
-    progress.complete()
-
-    return {
-        b'added': added,
-        b'linkrevs': linkrevs,
-    }
-
-
-def _derivefilesfrommanifests(repo, matcher, manifestnodes):
-    """Determine what file nodes are relevant given a set of manifest nodes.
-
-    Returns a dict mapping file paths to dicts of file node to first manifest
-    node.
-    """
-    ml = repo.manifestlog
-    fnodes = collections.defaultdict(dict)
-
-    progress = repo.ui.makeprogress(
-        _(b'scanning manifests'), total=len(manifestnodes)
-    )
-
-    with progress:
-        for manifestnode in manifestnodes:
-            m = ml.get(b'', manifestnode)
-
-            # TODO this will pull in unwanted nodes because it takes the storage
-            # delta into consideration. What we really want is something that
-            # takes the delta between the manifest's parents. And ideally we
-            # would ignore file nodes that are known locally. For now, ignore
-            # both these limitations. This will result in incremental fetches
-            # requesting data we already have. So this is far from ideal.
-            md = m.readfast()
-
-            for path, fnode in md.items():
-                if matcher(path):
-                    fnodes[path].setdefault(fnode, manifestnode)
-
-            progress.increment()
-
-    return fnodes
-
-
-def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
-    """Fetch file data from explicit file revisions."""
-
-    def iterrevisions(objs, progress):
-        for filerevision in objs:
-            node = filerevision[b'node']
-
-            extrafields = {}
-
-            for field, size in filerevision.get(b'fieldsfollowing', []):
-                extrafields[field] = next(objs)
-
-            if b'delta' in extrafields:
-                basenode = filerevision[b'deltabasenode']
-                delta = extrafields[b'delta']
-            elif b'revision' in extrafields:
-                basenode = repo.nullid
-                revision = extrafields[b'revision']
-                delta = mdiff.trivialdiffheader(len(revision)) + revision
-            else:
-                continue
-
-            yield (
-                node,
-                filerevision[b'parents'][0],
-                filerevision[b'parents'][1],
-                node,
-                basenode,
-                delta,
-                # Flags not yet supported.
-                0,
-                # Sidedata not yet supported.
-                {},
-            )
-
-            progress.increment()
-
-    progress = repo.ui.makeprogress(
-        _(b'files'),
-        unit=_(b'chunks'),
-        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
-    )
-
-    # TODO make batch size configurable
-    batchsize = 10000
-    fnodeslist = [x for x in sorted(fnodes.items())]
-
-    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
-        batch = [x for x in fnodeslist[i : i + batchsize]]
-        if not batch:
-            continue
-
-        with remote.commandexecutor() as e:
-            fs = []
-            locallinkrevs = {}
-
-            for path, nodes in batch:
-                fs.append(
-                    (
-                        path,
-                        e.callcommand(
-                            b'filedata',
-                            {
-                                b'path': path,
-                                b'nodes': sorted(nodes),
-                                b'fields': {b'parents', b'revision'},
-                                b'haveparents': True,
-                            },
-                        ),
-                    )
-                )
-
-                locallinkrevs[path] = {
-                    node: linkrevs[manifestnode]
-                    for node, manifestnode in pycompat.iteritems(nodes)
-                }
-
-            for path, f in fs:
-                objs = f.result()
-
-                # Chomp off header objects.
-                next(objs)
-
-                store = repo.file(path)
-                store.addgroup(
-                    iterrevisions(objs, progress),
-                    locallinkrevs[path].__getitem__,
-                    weakref.proxy(tr),
-                )
-
-
-def _fetchfilesfromcsets(
-    repo, tr, remote, pathfilter, fnodes, csets, manlinkrevs, shallow=False
-):
-    """Fetch file data from explicit changeset revisions."""
-
-    def iterrevisions(objs, remaining, progress):
-        while remaining:
-            filerevision = next(objs)
-
-            node = filerevision[b'node']
-
-            extrafields = {}
-
-            for field, size in filerevision.get(b'fieldsfollowing', []):
-                extrafields[field] = next(objs)
-
-            if b'delta' in extrafields:
-                basenode = filerevision[b'deltabasenode']
-                delta = extrafields[b'delta']
-            elif b'revision' in extrafields:
-                basenode = repo.nullid
-                revision = extrafields[b'revision']
-                delta = mdiff.trivialdiffheader(len(revision)) + revision
-            else:
-                continue
-
-            if b'linknode' in filerevision:
-                linknode = filerevision[b'linknode']
-            else:
-                linknode = node
-
-            yield (
-                node,
-                filerevision[b'parents'][0],
-                filerevision[b'parents'][1],
-                linknode,
-                basenode,
-                delta,
-                # Flags not yet supported.
-                0,
-                # Sidedata not yet supported.
-                {},
-            )
-
-            progress.increment()
-            remaining -= 1
-
-    progress = repo.ui.makeprogress(
-        _(b'files'),
-        unit=_(b'chunks'),
-        total=sum(len(v) for v in pycompat.itervalues(fnodes)),
-    )
-
-    commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
-    batchsize = commandmeta.get(b'recommendedbatchsize', 50000)
-
-    shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features
-    fields = {b'parents', b'revision'}
-    clrev = repo.changelog.rev
-
-    # There are no guarantees that we'll have ancestor revisions if
-    # a) this repo has shallow file storage b) shallow data fetching is enabled.
-    # Force remote to not delta against possibly unknown revisions when these
-    # conditions hold.
-    haveparents = not (shallowfiles or shallow)
-
-    # Similarly, we may not have calculated linkrevs for all incoming file
-    # revisions. Ask the remote to do work for us in this case.
-    if not haveparents:
-        fields.add(b'linknode')
-
-    for i in pycompat.xrange(0, len(csets), batchsize):
-        batch = [x for x in csets[i : i + batchsize]]
-        if not batch:
-            continue
-
-        with remote.commandexecutor() as e:
-            args = {
-                b'revisions': [
-                    {
-                        b'type': b'changesetexplicit',
-                        b'nodes': batch,
-                    }
-                ],
-                b'fields': fields,
-                b'haveparents': haveparents,
-            }
-
-            if pathfilter:
-                args[b'pathfilter'] = pathfilter
-
-            objs = e.callcommand(b'filesdata', args).result()
-
-            # First object is an overall header.
-            overall = next(objs)
-
-            # We have overall['totalpaths'] segments.
-            for i in pycompat.xrange(overall[b'totalpaths']):
-                header = next(objs)
-
-                path = header[b'path']
-                store = repo.file(path)
-
-                linkrevs = {
-                    fnode: manlinkrevs[mnode]
-                    for fnode, mnode in pycompat.iteritems(fnodes[path])
-                }
-
-                def getlinkrev(node):
-                    if node in linkrevs:
-                        return linkrevs[node]
-                    else:
-                        return clrev(node)
-
-                store.addgroup(
-                    iterrevisions(objs, header[b'totalitems'], progress),
-                    getlinkrev,
-                    weakref.proxy(tr),
-                    maybemissingparents=shallow,
-                )