view mercurial/repair.py @ 26623:5a95fe44121d

clonebundles: support for seeding clones from pre-generated bundles

Cloning can be an expensive operation for servers because the server generates a bundle from existing repository data at request time. For a large repository like mozilla-central, this consumes 4+ minutes of CPU time on the server. It also results in significant network utilization. Multiplied across hundreds or even thousands of clients, the ensuing load can make it difficult to scale the Mercurial server.

Although bundle generation is deterministic until the next changeset is added, the bundles generated to service clone requests are not cached. Each clone therefore performs redundant work. This is wasteful.

This patch introduces the "clonebundles" extension and related client-side functionality to help alleviate this deficiency. The client-side feature is behind an experimental flag and is not enabled by default. It works as follows:

1) The server operator generates a bundle and makes it available on a server (likely HTTP).
2) The server operator defines the URL of that bundle file in a .hg/clonebundles.manifest file (an illustrative manifest is sketched below).
3) A client running `hg clone` sees the server is advertising bundle URLs.
4) The client fetches and applies the advertised bundle.
5) The client performs the equivalent of an `hg pull` to fetch changes made since the bundle was created.

Essentially, the server performs the expensive work of generating a bundle once, and all subsequent clones fetch a static file from somewhere. Scaling static file serving is a much more manageable problem than scaling a Python application like Mercurial. Assuming your repository grows less than 1% per day, the end result is that 99+% of the CPU and network load from clones is eliminated, allowing Mercurial servers to scale more easily. Serving static files also means data can be transferred to clients as fast as they can consume it, rather than as fast as servers can generate it. This makes clones faster.

Mozilla has implemented functionality similar to this patch on hg.mozilla.org using a custom extension. We are hosting bundle files in Amazon S3 and CloudFront (a CDN) and have successfully offloaded >1 TB/day in data transfer from hg.mozilla.org, freeing up significant bandwidth and CPU resources. The positive impact has been stellar and I believe it has proved the feature's value for inclusion in Mercurial core. I feel it is important for the client-side support to be enabled in core by default because clients will then get faster, more reliable clones, and server operators will be able to reduce load without requiring any client-side configuration changes (assuming clients are up to date, of course).

The scope of this feature is narrowly and specifically tailored to cloning, even though "serve pulls from pre-generated bundles" would also be a valid and useful feature. I would eventually like for Mercurial servers to support transferring *all* repository data via statically hosted files. You could imagine a server that siphons all pushed data to bundle files and instructs clients to apply a stream of bundles to reconstruct all repository data. That feature, while useful and powerful, is significantly more work to implement because it requires the server component to have awareness of discovery and a mapping of which changesets are in which files. Full clone bundles, by contrast, are much simpler.
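As a concrete illustration of steps 1) and 2) above, a server operator could generate a full bundle with a stock command, publish it over HTTPS, and list it in the manifest, while clients opt in via the experimental flag. This is only a sketch: the attribute name (BUNDLESPEC) and the exact flag spelling are illustrative and are defined in this and subsequent patches:

    $ hg bundle --all --type gzip /var/www/bundles/myrepo.hg
    $ cat .hg/clonebundles.manifest
    https://hg.example.com/bundles/myrepo.hg BUNDLESPEC=gzip-v1

    # client-side configuration enabling the experimental feature
    [experimental]
    clonebundles = true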
The wire protocol command is named "clonebundles" instead of something more generic like "staticbundles" to leave the door open for a new, more powerful, and more generic server-side component with minimal backwards compatibility implications. The name "bundleclone" is avoided because it is used by Mozilla's extension, which differs in subtle ways, and reusing it would cause problems.

Mozilla's experience with this idea has taught us that some form of "content negotiation" is required. Not all clients will support all bundle formats or even all URLs (advanced TLS requirements, etc.). To ensure the highest uptake possible, a server needs to advertise multiple versions of bundles, and clients need to be able to choose the most appropriate one from that list. The "attributes" in each server-advertised entry facilitate this filtering and sorting. Their use will become apparent in subsequent patches.

Initial inspiration and credit for the idea of cloning from static files belongs to Augie Fackler and his "lookaside clone" extension proof of concept.
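To make the content-negotiation idea concrete, below is a minimal sketch, not the extension's actual code, of how a client could parse the advertised entries and filter them using their attributes. The function names and the BUNDLESPEC attribute are placeholders for illustration:

    def parseclonebundlesmanifest(data):
        # Each manifest line is "URL key=value key=value ...".
        entries = []
        for line in data.splitlines():
            fields = line.split()
            if not fields:
                continue
            attrs = {'URL': fields[0]}
            for field in fields[1:]:
                key, _sep, value = field.partition('=')
                attrs[key] = value
            entries.append(attrs)
        return entries

    def filterclonebundleentries(entries, supportedspecs):
        # Keep entries with no declared format or with a format this client
        # understands; a real client would also sort the survivors by
        # preference before trying them in order.
        return [e for e in entries
                if 'BUNDLESPEC' not in e or e['BUNDLESPEC'] in supportedspecs]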
author Gregory Szorc <gregory.szorc@gmail.com>
date Fri, 09 Oct 2015 11:22:01 -0700
parents 56b2bcea2529
children bcace0fbb4c8

# repair.py - functions for repository repair for mercurial
#
# Copyright 2005, 2006 Chris Mason <mason@suse.com>
# Copyright 2007 Matt Mackall
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import errno

from .i18n import _
from .node import short
from . import (
    bundle2,
    changegroup,
    error,
    exchange,
    util,
)

def _bundle(repo, bases, heads, node, suffix, compress=True):
    """create a bundle with the specified revisions as a backup"""
    cgversion = '01'
    if 'generaldelta' in repo.requirements:
        cgversion = '02'

    cg = changegroup.changegroupsubset(repo, bases, heads, 'strip',
                                       version=cgversion)
    backupdir = "strip-backup"
    vfs = repo.vfs
    if not vfs.isdir(backupdir):
        vfs.mkdir(backupdir)

    # Include a hash of all the nodes in the filename for uniqueness
    allcommits = repo.set('%ln::%ln', bases, heads)
    allhashes = sorted(c.hex() for c in allcommits)
    totalhash = util.sha1(''.join(allhashes)).hexdigest()
    name = "%s/%s-%s-%s.hg" % (backupdir, short(node), totalhash[:8], suffix)

    comp = None
    if cgversion != '01':
        bundletype = "HG20"
        if compress:
            comp = 'BZ'
    elif compress:
        bundletype = "HG10BZ"
    else:
        bundletype = "HG10UN"
    return changegroup.writebundle(repo.ui, cg, name, bundletype, vfs,
                                   compression=comp)

def _collectfiles(repo, striprev):
    """find out the filelogs affected by the strip"""
    files = set()

    for x in xrange(striprev, len(repo)):
        files.update(repo[x].files())

    return sorted(files)

def _collectbrokencsets(repo, files, striprev):
    """return the changesets which will be broken by the truncation"""
    s = set()
    def collectone(revlog):
        _, brokenset = revlog.getstrippoint(striprev)
        s.update([revlog.linkrev(r) for r in brokenset])

    collectone(repo.manifest)
    for fname in files:
        collectone(repo.file(fname))

    return s

def strip(ui, repo, nodelist, backup=True, topic='backup'):
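    """remove the changesets in nodelist and all of their descendants

    If backup is True, a bundle of the removed changesets is written to
    .hg/strip-backup before the revlogs are truncated. Changesets that are
    not descendants of the stripped set but would be damaged by the
    truncation are saved to a temporary bundle and re-applied afterwards.
    """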

    # Simple way to maintain backwards compatibility for this
    # argument.
    if backup in ['none', 'strip']:
        backup = False

    repo = repo.unfiltered()
    repo.destroying()

    cl = repo.changelog
    # TODO handle undo of merge sets
    if isinstance(nodelist, str):
        nodelist = [nodelist]
    striplist = [cl.rev(node) for node in nodelist]
    striprev = min(striplist)

    # Some revisions with rev > striprev may not be descendants of striprev.
    # We have to find these revisions and put them in a bundle, so that
    # we can restore them after the truncations.
    # To create the bundle we use changegroup.changegroupsubset which requires
    # the list of heads and bases of the set of interesting revisions.
    # (head = revision in the set that has no descendant in the set;
    #  base = revision in the set that has no ancestor in the set)
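    # For example (hypothetical), stripping revision 2 of:
    #
    #   0 --- 1 --- 2 --- 4
    #          \
    #           3
    #
    # truncates revisions 2, 3 and 4 from the revlogs, but revision 3 is not
    # a descendant of 2, so it is saved (base = head = 3) and re-applied once
    # the truncation is done.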
    tostrip = set(striplist)
    for rev in striplist:
        for desc in cl.descendants([rev]):
            tostrip.add(desc)

    files = _collectfiles(repo, striprev)
    saverevs = _collectbrokencsets(repo, files, striprev)

    # compute heads
    saveheads = set(saverevs)
    for r in xrange(striprev + 1, len(cl)):
        if r not in tostrip:
            saverevs.add(r)
            saveheads.difference_update(cl.parentrevs(r))
            saveheads.add(r)
    saveheads = [cl.node(r) for r in saveheads]

    # compute base nodes
    if saverevs:
        descendants = set(cl.descendants(saverevs))
        saverevs.difference_update(descendants)
    savebases = [cl.node(r) for r in saverevs]
    stripbases = [cl.node(r) for r in tostrip]

    # For a set s, max(parents(s) - s) is the same as max(heads(::s - s)), but
    # is much faster
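    # (The maximum revision of '::s - s' is always a parent of s: otherwise
    # one of its children would be a higher-numbered ancestor of s outside s,
    # contradicting maximality.)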
    newbmtarget = repo.revs('max(parents(%ld) - (%ld))', tostrip, tostrip)
    if newbmtarget:
        newbmtarget = repo[newbmtarget.first()].node()
    else:
        newbmtarget = '.'

    bm = repo._bookmarks
    updatebm = []
    for m in bm:
        rev = repo[bm[m]].rev()
        if rev in tostrip:
            updatebm.append(m)

    # create a changegroup for all the branches we need to keep
    backupfile = None
    vfs = repo.vfs
    node = nodelist[-1]
    if backup:
        backupfile = _bundle(repo, stripbases, cl.heads(), node, topic)
        repo.ui.status(_("saved backup bundle to %s\n") %
                       vfs.join(backupfile))
        repo.ui.log("backupbundle", "saved backup bundle to %s\n",
                    vfs.join(backupfile))
    if saveheads or savebases:
        # do not compress the partial bundle since we will remove it from
        # disk later
        chgrpfile = _bundle(repo, savebases, saveheads, node, 'temp',
                            compress=False)

    mfst = repo.manifest

    curtr = repo.currenttransaction()
    if curtr is not None:
        del curtr  # avoid carrying reference to transaction for nothing
        msg = _('programming error: cannot strip from inside a transaction')
        raise error.Abort(msg, hint=_('contact your extension maintainer'))

    tr = repo.transaction("strip")
    offset = len(tr.entries)

    try:
        tr.startgroup()
        cl.strip(striprev, tr)
        mfst.strip(striprev, tr)
        for fn in files:
            repo.file(fn).strip(striprev, tr)
        tr.endgroup()

        try:
            for i in xrange(offset, len(tr.entries)):
                file, troffset, ignore = tr.entries[i]
                repo.svfs(file, 'a').truncate(troffset)
                if troffset == 0:
                    repo.store.markremoved(file)
            tr.close()
        finally:
            tr.release()

        if saveheads or savebases:
            ui.note(_("adding branch\n"))
            f = vfs.open(chgrpfile, "rb")
            gen = exchange.readbundle(ui, f, chgrpfile, vfs)
            if not repo.ui.verbose:
                # silence internal shuffling chatter
                repo.ui.pushbuffer()
            if isinstance(gen, bundle2.unbundle20):
                tr = repo.transaction('strip')
                tr.hookargs = {'source': 'strip',
                               'url': 'bundle:' + vfs.join(chgrpfile)}
                try:
                    bundle2.processbundle(repo, gen, lambda: tr)
                    tr.close()
                finally:
                    tr.release()
            else:
                changegroup.addchangegroup(repo, gen, 'strip',
                                           'bundle:' + vfs.join(chgrpfile),
                                           True)
            if not repo.ui.verbose:
                repo.ui.popbuffer()
            f.close()

        # remove undo files
        for undovfs, undofile in repo.undofiles():
            try:
                undovfs.unlink(undofile)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    ui.warn(_('error removing %s: %s\n') %
                            (undovfs.join(undofile), str(e)))

        for m in updatebm:
            bm[m] = repo[newbmtarget].node()
        bm.write()
    except: # re-raises
        if backupfile:
            ui.warn(_("strip failed, full bundle stored in '%s'\n")
                    % vfs.join(backupfile))
        elif saveheads:
            ui.warn(_("strip failed, partial bundle stored in '%s'\n")
                    % vfs.join(chgrpfile))
        raise
    else:
        if saveheads or savebases:
            # Remove partial backup only if there were no exceptions
            vfs.unlink(chgrpfile)

    repo.destroyed()

def rebuildfncache(ui, repo):
    """Rebuilds the fncache file from repo history.

    Missing entries will be added. Extra entries will be removed.
    """
    repo = repo.unfiltered()

    if 'fncache' not in repo.requirements:
        ui.warn(_('(not rebuilding fncache because repository does not '
                  'support fncache)\n'))
        return

    lock = repo.lock()
    try:
        fnc = repo.store.fncache
        # Trigger load of fncache.
        if 'irrelevant' in fnc:
            pass

        oldentries = set(fnc.entries)
        newentries = set()
        seenfiles = set()

        repolen = len(repo)
        for rev in repo:
            ui.progress(_('changeset'), rev, total=repolen)

            ctx = repo[rev]
            for f in ctx.files():
                # This is to minimize I/O.
                if f in seenfiles:
                    continue
                seenfiles.add(f)

                i = 'data/%s.i' % f
                d = 'data/%s.d' % f

                if repo.store._exists(i):
                    newentries.add(i)
                if repo.store._exists(d):
                    newentries.add(d)

        ui.progress(_('changeset'), None)

        addcount = len(newentries - oldentries)
        removecount = len(oldentries - newentries)
        for p in sorted(oldentries - newentries):
            ui.write(_('removing %s\n') % p)
        for p in sorted(newentries - oldentries):
            ui.write(_('adding %s\n') % p)

        if addcount or removecount:
            ui.write(_('%d items added, %d removed from fncache\n') %
                     (addcount, removecount))
            fnc.entries = newentries
            fnc._dirty = True

            tr = repo.transaction('fncache')
            try:
                fnc.write(tr)
                tr.close()
            finally:
                tr.release()
        else:
            ui.write(_('fncache already up to date\n'))
    finally:
        lock.release()