view mercurial/exchangev2.py @ 42745:4d20b1fe8a72

rust-discovery: using from Python code As previously done in other topics, the Rust version is used if it's been built. The version fully in Rust of the partialdiscovery class has the performance advantage over the Python version (actually using the Rust MissingAncestor) if the undecided set is big enough. Otherwise no sampling occurs, and the discovery is reasonably fast anyway. Note: it's hard to predict the size of the initial undecided set, it can depend on the kind of topological changes between the local and remote graphs. The point of the Rust version is to make the bad cases acceptable. More specifically, the performance advantages are: - faster sampling, especially takefullsample() - much faster addmissings() in almost all cases (see commit message in grandparent of the present changeset) - no conversion cost of the undecided set at the interface between Rust and Python == Measurements with big undecided sets For an extreme example, discovery between mozilla-try and mozilla-unified (over one million undecided revisions, same case as in dbd0fcca6dfc), we get roughly a x2.5/x3 better performance: Growing sample size (5% starting with 200): time goes down from 210 to 72 seconds. Constant sample size of 200: time down from 1853 to 659 seconds. With a sample size computed from number of roots and heads of the undecided set (`respectsize` is `False`), here are perfdiscovery results: Before ! wall 9.358729 comb 9.360000 user 9.310000 sys 0.050000 (median of 50) After ! wall 3.793819 comb 3.790000 user 3.750000 sys 0.040000 (median of 50) In that later case, the sample sizes are routinely in the hundreds of thousands of revisions. While still faster, the Rust iteration in addmissings has less of an advantage than with smaller sample sizes, but one sees addcommons becoming faster, probably a consequence of not having to copy big sets back and forth. This example is not a goal in itself, but it showcases several different areas in which the process can become slow, due to different factors, and how this full Rust version can help. == Measurements with small undecided sets In cases the undecided set is small enough than no sampling occurs, the Rust version has a disadvantage at init if `targetheads` is really big (some time is lost in the translation to Rust data structures), and that is compensated by the faster `addmissings()`. On a private repository with over one million commits, we still get a minor improvement, of 6.8%: Before ! wall 0.593585 comb 0.590000 user 0.550000 sys 0.040000 (median of 50) After ! wall 0.553035 comb 0.550000 user 0.520000 sys 0.030000 (median of 50) What's interesting in that case is the first addinfo() at 180ms for Rust and 233ms for Python+C, mostly due to add_missings and the children cache computation being done in less than 0.2ms on the Rust side vs over 40ms on the Python side. The worst case we have on hand is with mozilla-try, prepared with discovery-helper.sh for 10 heads and depth 10, time goes up 2.2% on the median. In this case `targetheads` is really huge with 165842 server heads. Before ! wall 0.823884 comb 0.810000 user 0.790000 sys 0.020000 (median of 50) After ! wall 0.842607 comb 0.840000 user 0.800000 sys 0.040000 (median of 50) If that would be considered a problem, more adjustments can be made, which are prematurate at this stage: cooking special variants of methods of the inner MissingAncestors object, retrieving local heads directly from Rust to avoid the cost of conversion. Effort would probably be better spent at this point improving the surroundings if needed. Here's another data point with a smaller repository, pypy, where performance is almost identical Before ! wall 0.015121 comb 0.030000 user 0.020000 sys 0.010000 (median of 186) After ! wall 0.015009 comb 0.010000 user 0.010000 sys 0.000000 (median of 184) Differential Revision: https://phab.mercurial-scm.org/D6430
author Georges Racinet <georges.racinet@octobus.net>
date Wed, 20 Feb 2019 09:04:54 +0100
parents afa884015e66
children 268662aac075
line wrap: on
line source

# exchangev2.py - repository exchange for wire protocol version 2
#
# Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import collections
import weakref

from .i18n import _
from .node import (
    nullid,
    short,
)
from . import (
    bookmarks,
    error,
    mdiff,
    narrowspec,
    phases,
    pycompat,
    repository,
    setdiscovery,
)

def pull(pullop):
    """Pull using wire protocol version 2."""
    repo = pullop.repo
    remote = pullop.remote

    usingrawchangelogandmanifest = _checkuserawstorefiledata(pullop)

    # If this is a clone and it was requested to perform a "stream clone",
    # we obtain the raw files data from the remote then fall back to an
    # incremental pull. This is somewhat hacky and is not nearly robust enough
    # for long-term usage.
    if usingrawchangelogandmanifest:
        with repo.transaction('clone'):
            _fetchrawstorefiles(repo, remote)
            repo.invalidate(clearfilecache=True)

    tr = pullop.trmanager.transaction()

    # We don't use the repo's narrow matcher here because the patterns passed
    # to exchange.pull() could be different.
    narrowmatcher = narrowspec.match(repo.root,
                                     # Empty maps to nevermatcher. So always
                                     # set includes if missing.
                                     pullop.includepats or {'path:.'},
                                     pullop.excludepats)

    if pullop.includepats or pullop.excludepats:
        pathfilter = {}
        if pullop.includepats:
            pathfilter[b'include'] = sorted(pullop.includepats)
        if pullop.excludepats:
            pathfilter[b'exclude'] = sorted(pullop.excludepats)
    else:
        pathfilter = None

    # Figure out what needs to be fetched.
    common, fetch, remoteheads = _pullchangesetdiscovery(
        repo, remote, pullop.heads, abortwhenunrelated=pullop.force)

    # And fetch the data.
    pullheads = pullop.heads or remoteheads
    csetres = _fetchchangesets(repo, tr, remote, common, fetch, pullheads)

    # New revisions are written to the changelog. But all other updates
    # are deferred. Do those now.

    # Ensure all new changesets are draft by default. If the repo is
    # publishing, the phase will be adjusted by the loop below.
    if csetres['added']:
        phases.registernew(repo, tr, phases.draft, csetres['added'])

    # And adjust the phase of all changesets accordingly.
    for phase in phases.phasenames:
        if phase == b'secret' or not csetres['nodesbyphase'][phase]:
            continue

        phases.advanceboundary(repo, tr, phases.phasenames.index(phase),
                               csetres['nodesbyphase'][phase])

    # Write bookmark updates.
    bookmarks.updatefromremote(repo.ui, repo, csetres['bookmarks'],
                               remote.url(), pullop.gettransaction,
                               explicit=pullop.explicitbookmarks)

    manres = _fetchmanifests(repo, tr, remote, csetres['manifestnodes'])

    # We don't properly support shallow changeset and manifest yet. So we apply
    # depth limiting locally.
    if pullop.depth:
        relevantcsetnodes = set()
        clnode = repo.changelog.node

        for rev in repo.revs(b'ancestors(%ln, %s)',
                             pullheads, pullop.depth - 1):
            relevantcsetnodes.add(clnode(rev))

        csetrelevantfilter = lambda n: n in relevantcsetnodes

    else:
        csetrelevantfilter = lambda n: True

    # If obtaining the raw store files, we need to scan the full repo to
    # derive all the changesets, manifests, and linkrevs.
    if usingrawchangelogandmanifest:
        csetsforfiles = []
        mnodesforfiles = []
        manifestlinkrevs = {}

        for rev in repo:
            ctx = repo[rev]
            node = ctx.node()

            if not csetrelevantfilter(node):
                continue

            mnode = ctx.manifestnode()

            csetsforfiles.append(node)
            mnodesforfiles.append(mnode)
            manifestlinkrevs[mnode] = rev

    else:
        csetsforfiles = [n for n in csetres['added'] if csetrelevantfilter(n)]
        mnodesforfiles = manres['added']
        manifestlinkrevs = manres['linkrevs']

    # Find all file nodes referenced by added manifests and fetch those
    # revisions.
    fnodes = _derivefilesfrommanifests(repo, narrowmatcher, mnodesforfiles)
    _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csetsforfiles,
                         manifestlinkrevs, shallow=bool(pullop.depth))

def _checkuserawstorefiledata(pullop):
    """Check whether we should use rawstorefiledata command to retrieve data."""

    repo = pullop.repo
    remote = pullop.remote

    # Command to obtain raw store data isn't available.
    if b'rawstorefiledata' not in remote.apidescriptor[b'commands']:
        return False

    # Only honor if user requested stream clone operation.
    if not pullop.streamclonerequested:
        return False

    # Only works on empty repos.
    if len(repo):
        return False

    # TODO This is super hacky. There needs to be a storage API for this. We
    # also need to check for compatibility with the remote.
    if b'revlogv1' not in repo.requirements:
        return False

    return True

def _fetchrawstorefiles(repo, remote):
    with remote.commandexecutor() as e:
        objs = e.callcommand(b'rawstorefiledata', {
            b'files': [b'changelog', b'manifestlog'],
        }).result()

        # First object is a summary of files data that follows.
        overall = next(objs)

        progress = repo.ui.makeprogress(_('clone'), total=overall[b'totalsize'],
                                        unit=_('bytes'))
        with progress:
            progress.update(0)

            # Next are pairs of file metadata, data.
            while True:
                try:
                    filemeta = next(objs)
                except StopIteration:
                    break

                for k in (b'location', b'path', b'size'):
                    if k not in filemeta:
                        raise error.Abort(_(b'remote file data missing key: %s')
                                          % k)

                if filemeta[b'location'] == b'store':
                    vfs = repo.svfs
                else:
                    raise error.Abort(_(b'invalid location for raw file data: '
                                        b'%s') % filemeta[b'location'])

                bytesremaining = filemeta[b'size']

                with vfs.open(filemeta[b'path'], b'wb') as fh:
                    while True:
                        try:
                            chunk = next(objs)
                        except StopIteration:
                            break

                        bytesremaining -= len(chunk)

                        if bytesremaining < 0:
                            raise error.Abort(_(
                                b'received invalid number of bytes for file '
                                b'data; expected %d, got extra') %
                                              filemeta[b'size'])

                        progress.increment(step=len(chunk))
                        fh.write(chunk)

                        try:
                            if chunk.islast:
                                break
                        except AttributeError:
                            raise error.Abort(_(
                                b'did not receive indefinite length bytestring '
                                b'for file data'))

                if bytesremaining:
                    raise error.Abort(_(b'received invalid number of bytes for'
                                        b'file data; expected %d got %d') %
                                      (filemeta[b'size'],
                                       filemeta[b'size'] - bytesremaining))

def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
    """Determine which changesets need to be pulled."""

    if heads:
        knownnode = repo.changelog.hasnode
        if all(knownnode(head) for head in heads):
            return heads, False, heads

    # TODO wire protocol version 2 is capable of more efficient discovery
    # than setdiscovery. Consider implementing something better.
    common, fetch, remoteheads = setdiscovery.findcommonheads(
        repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated)

    common = set(common)
    remoteheads = set(remoteheads)

    # If a remote head is filtered locally, put it back in the common set.
    # See the comment in exchange._pulldiscoverychangegroup() for more.

    if fetch and remoteheads:
        nodemap = repo.unfiltered().changelog.nodemap

        common |= {head for head in remoteheads if head in nodemap}

        if set(remoteheads).issubset(common):
            fetch = []

    common.discard(nullid)

    return common, fetch, remoteheads

def _fetchchangesets(repo, tr, remote, common, fetch, remoteheads):
    # TODO consider adding a step here where we obtain the DAG shape first
    # (or ask the server to slice changesets into chunks for us) so that
    # we can perform multiple fetches in batches. This will facilitate
    # resuming interrupted clones, higher server-side cache hit rates due
    # to smaller segments, etc.
    with remote.commandexecutor() as e:
        objs = e.callcommand(b'changesetdata', {
            b'revisions': [{
                b'type': b'changesetdagrange',
                b'roots': sorted(common),
                b'heads': sorted(remoteheads),
            }],
            b'fields': {b'bookmarks', b'parents', b'phase', b'revision'},
        }).result()

        # The context manager waits on all response data when exiting. So
        # we need to remain in the context manager in order to stream data.
        return _processchangesetdata(repo, tr, objs)

def _processchangesetdata(repo, tr, objs):
    repo.hook('prechangegroup', throw=True,
              **pycompat.strkwargs(tr.hookargs))

    urepo = repo.unfiltered()
    cl = urepo.changelog

    cl.delayupdate(tr)

    # The first emitted object is a header describing the data that
    # follows.
    meta = next(objs)

    progress = repo.ui.makeprogress(_('changesets'),
                                    unit=_('chunks'),
                                    total=meta.get(b'totalitems'))

    manifestnodes = {}

    def linkrev(node):
        repo.ui.debug('add changeset %s\n' % short(node))
        # Linkrev for changelog is always self.
        return len(cl)

    def onchangeset(cl, node):
        progress.increment()

        revision = cl.changelogrevision(node)

        # We need to preserve the mapping of changelog revision to node
        # so we can set the linkrev accordingly when manifests are added.
        manifestnodes[cl.rev(node)] = revision.manifest

    nodesbyphase = {phase: set() for phase in phases.phasenames}
    remotebookmarks = {}

    # addgroup() expects a 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    #
    # This loop also aggregates non-revision metadata, such as phase
    # data.
    def iterrevisions():
        for cset in objs:
            node = cset[b'node']

            if b'phase' in cset:
                nodesbyphase[cset[b'phase']].add(node)

            for mark in cset.get(b'bookmarks', []):
                remotebookmarks[mark] = node

            # TODO add mechanism for extensions to examine records so they
            # can siphon off custom data fields.

            extrafields = {}

            for field, size in cset.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            # Some entries might only be metadata only updates.
            if b'revision' not in extrafields:
                continue

            data = extrafields[b'revision']

            yield (
                node,
                cset[b'parents'][0],
                cset[b'parents'][1],
                # Linknode is always itself for changesets.
                cset[b'node'],
                # We always send full revisions. So delta base is not set.
                nullid,
                mdiff.trivialdiffheader(len(data)) + data,
                # Flags not yet supported.
                0,
            )

    added = cl.addgroup(iterrevisions(), linkrev, weakref.proxy(tr),
                        addrevisioncb=onchangeset)

    progress.complete()

    return {
        'added': added,
        'nodesbyphase': nodesbyphase,
        'bookmarks': remotebookmarks,
        'manifestnodes': manifestnodes,
    }

def _fetchmanifests(repo, tr, remote, manifestnodes):
    rootmanifest = repo.manifestlog.getstorage(b'')

    # Some manifests can be shared between changesets. Filter out revisions
    # we already know about.
    fetchnodes = []
    linkrevs = {}
    seen = set()

    for clrev, node in sorted(manifestnodes.iteritems()):
        if node in seen:
            continue

        try:
            rootmanifest.rev(node)
        except error.LookupError:
            fetchnodes.append(node)
            linkrevs[node] = clrev

        seen.add(node)

    # TODO handle tree manifests

    # addgroup() expects 7-tuple describing revisions. This normalizes
    # the wire data to that format.
    def iterrevisions(objs, progress):
        for manifest in objs:
            node = manifest[b'node']

            extrafields = {}

            for field, size in manifest.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = manifest[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                manifest[b'parents'][0],
                manifest[b'parents'][1],
                # The value passed in is passed to the lookup function passed
                # to addgroup(). We already have a map of manifest node to
                # changelog revision number. So we just pass in the
                # manifest node here and use linkrevs.__getitem__ as the
                # resolution function.
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0
            )

            progress.increment()

    progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'),
                                    total=len(fetchnodes))

    commandmeta = remote.apidescriptor[b'commands'][b'manifestdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 10000)
    # TODO make size configurable on client?

    # We send commands 1 at a time to the remote. This is not the most
    # efficient because we incur a round trip at the end of each batch.
    # However, the existing frame-based reactor keeps consuming server
    # data in the background. And this results in response data buffering
    # in memory. This can consume gigabytes of memory.
    # TODO send multiple commands in a request once background buffering
    # issues are resolved.

    added = []

    for i in pycompat.xrange(0, len(fetchnodes), batchsize):
        batch = [node for node in fetchnodes[i:i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            objs = e.callcommand(b'manifestdata', {
                b'tree': b'',
                b'nodes': batch,
                b'fields': {b'parents', b'revision'},
                b'haveparents': True,
            }).result()

            # Chomp off header object.
            next(objs)

            added.extend(rootmanifest.addgroup(
                iterrevisions(objs, progress),
                linkrevs.__getitem__,
                weakref.proxy(tr)))

    progress.complete()

    return {
        'added': added,
        'linkrevs': linkrevs,
    }

def _derivefilesfrommanifests(repo, matcher, manifestnodes):
    """Determine what file nodes are relevant given a set of manifest nodes.

    Returns a dict mapping file paths to dicts of file node to first manifest
    node.
    """
    ml = repo.manifestlog
    fnodes = collections.defaultdict(dict)

    progress = repo.ui.makeprogress(
        _('scanning manifests'), total=len(manifestnodes))

    with progress:
        for manifestnode in manifestnodes:
            m = ml.get(b'', manifestnode)

            # TODO this will pull in unwanted nodes because it takes the storage
            # delta into consideration. What we really want is something that
            # takes the delta between the manifest's parents. And ideally we
            # would ignore file nodes that are known locally. For now, ignore
            # both these limitations. This will result in incremental fetches
            # requesting data we already have. So this is far from ideal.
            md = m.readfast()

            for path, fnode in md.items():
                if matcher(path):
                    fnodes[path].setdefault(fnode, manifestnode)

            progress.increment()

    return fnodes

def _fetchfiles(repo, tr, remote, fnodes, linkrevs):
    """Fetch file data from explicit file revisions."""
    def iterrevisions(objs, progress):
        for filerevision in objs:
            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                node,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()

    progress = repo.ui.makeprogress(
        _('files'), unit=_('chunks'),
         total=sum(len(v) for v in fnodes.itervalues()))

    # TODO make batch size configurable
    batchsize = 10000
    fnodeslist = [x for x in sorted(fnodes.items())]

    for i in pycompat.xrange(0, len(fnodeslist), batchsize):
        batch = [x for x in fnodeslist[i:i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            fs = []
            locallinkrevs = {}

            for path, nodes in batch:
                fs.append((path, e.callcommand(b'filedata', {
                    b'path': path,
                    b'nodes': sorted(nodes),
                    b'fields': {b'parents', b'revision'},
                    b'haveparents': True,
                })))

                locallinkrevs[path] = {
                    node: linkrevs[manifestnode]
                    for node, manifestnode in nodes.iteritems()}

            for path, f in fs:
                objs = f.result()

                # Chomp off header objects.
                next(objs)

                store = repo.file(path)
                store.addgroup(
                    iterrevisions(objs, progress),
                    locallinkrevs[path].__getitem__,
                    weakref.proxy(tr))

def _fetchfilesfromcsets(repo, tr, remote, pathfilter, fnodes, csets,
                         manlinkrevs, shallow=False):
    """Fetch file data from explicit changeset revisions."""

    def iterrevisions(objs, remaining, progress):
        while remaining:
            filerevision = next(objs)

            node = filerevision[b'node']

            extrafields = {}

            for field, size in filerevision.get(b'fieldsfollowing', []):
                extrafields[field] = next(objs)

            if b'delta' in extrafields:
                basenode = filerevision[b'deltabasenode']
                delta = extrafields[b'delta']
            elif b'revision' in extrafields:
                basenode = nullid
                revision = extrafields[b'revision']
                delta = mdiff.trivialdiffheader(len(revision)) + revision
            else:
                continue

            if b'linknode' in filerevision:
                linknode = filerevision[b'linknode']
            else:
                linknode = node

            yield (
                node,
                filerevision[b'parents'][0],
                filerevision[b'parents'][1],
                linknode,
                basenode,
                delta,
                # Flags not yet supported.
                0,
            )

            progress.increment()
            remaining -= 1

    progress = repo.ui.makeprogress(
        _('files'), unit=_('chunks'),
        total=sum(len(v) for v in fnodes.itervalues()))

    commandmeta = remote.apidescriptor[b'commands'][b'filesdata']
    batchsize = commandmeta.get(b'recommendedbatchsize', 50000)

    shallowfiles = repository.REPO_FEATURE_SHALLOW_FILE_STORAGE in repo.features
    fields = {b'parents', b'revision'}
    clrev = repo.changelog.rev

    # There are no guarantees that we'll have ancestor revisions if
    # a) this repo has shallow file storage b) shallow data fetching is enabled.
    # Force remote to not delta against possibly unknown revisions when these
    # conditions hold.
    haveparents = not (shallowfiles or shallow)

    # Similarly, we may not have calculated linkrevs for all incoming file
    # revisions. Ask the remote to do work for us in this case.
    if not haveparents:
        fields.add(b'linknode')

    for i in pycompat.xrange(0, len(csets), batchsize):
        batch = [x for x in csets[i:i + batchsize]]
        if not batch:
            continue

        with remote.commandexecutor() as e:
            args = {
                b'revisions': [{
                    b'type': b'changesetexplicit',
                    b'nodes': batch,
                }],
                b'fields': fields,
                b'haveparents': haveparents,
            }

            if pathfilter:
                args[b'pathfilter'] = pathfilter

            objs = e.callcommand(b'filesdata', args).result()

            # First object is an overall header.
            overall = next(objs)

            # We have overall['totalpaths'] segments.
            for i in pycompat.xrange(overall[b'totalpaths']):
                header = next(objs)

                path = header[b'path']
                store = repo.file(path)

                linkrevs = {
                    fnode: manlinkrevs[mnode]
                    for fnode, mnode in fnodes[path].iteritems()}

                def getlinkrev(node):
                    if node in linkrevs:
                        return linkrevs[node]
                    else:
                        return clrev(node)

                store.addgroup(iterrevisions(objs, header[b'totalitems'],
                                             progress),
                               getlinkrev,
                               weakref.proxy(tr),
                               maybemissingparents=shallow)