view hgext/simple4server.py @ 963:f80e8e3c3726

simple4server: backport the bundle2 exchange
author Pierre-Yves David <pierre-yves.david@fb.com>
date Wed, 21 May 2014 12:01:28 -0700
parents 097ddcb0da25
children 5d063fed9e3d
line wrap: on
line source

'''enable experimental obsolescence feature of Mercurial

OBSOLESCENCE IS AN EXPERIMENTAL FEATURE MAKE SURE YOU UNDERSTOOD THE INVOLVED
CONCEPT BEFORE USING IT.

/!\ THIS EXTENSION IS INTENDED FOR SERVER SIDE ONLY USAGE /!\

For client side usages it is recommended to use the evolve extension for
improved user interface.'''

testedwith = '3.0.1'
buglink = 'https://bitbucket.org/marmoute/mutable-history/issues'

import mercurial.obsolete
mercurial.obsolete._enabled = True

import struct
from mercurial import util
from mercurial import wireproto
from mercurial import extensions
from mercurial import obsolete
from cStringIO import StringIO
from mercurial import node
from mercurial.hgweb import hgweb_mod
from mercurial import bundle2
from mercurial import localrepo
from mercurial import exchange
_pack = struct.pack

gboptslist = gboptsmap = None
try:
    from mercurial import obsolete
    if not obsolete._enabled:
        obsolete._enabled = True
    from mercurial import wireproto
    gboptslist = getattr(wireproto, 'gboptslist', None)
    gboptsmap = getattr(wireproto, 'gboptsmap', None)
except (ImportError, AttributeError):
    raise util.Abort('Your Mercurial is too old for this version of Evolve\n'
                     'requires version 3.0.1 or above')

# Start of simple4server specific content

from mercurial import pushkey

# specific content also include the wrapping int extsetup
def _nslist(orig, repo):
    rep = orig(repo)
    if not repo.ui.configbool('__temporary__', 'advertiseobsolete', True):
        rep.pop('obsolete')
    return rep

# End of simple4server specific content



# from evolve extension: 1a23c7c52a43
def srv_pushobsmarkers(repo, proto):
    """That receives a stream of markers and apply then to the repo"""
    fp = StringIO()
    proto.redirect()
    proto.getfile(fp)
    data = fp.getvalue()
    fp.close()
    lock = repo.lock()
    try:
        tr = repo.transaction('pushkey: obsolete markers')
        try:
            repo.obsstore.mergemarkers(tr, data)
            tr.close()
        finally:
            tr.release()
    finally:
        lock.release()
    repo.hook('evolve_pushobsmarkers')
    return wireproto.pushres(0)

# from mercurial.obsolete: 19e9478c1a22
def _encodemarkersstream(fp, markers):
    """write a binary version of a set of markers

    Includes the initial version number"""
    fp.write(_pack('>B', 0))
    for mark in markers:
        fp.write(obsolete._encodeonemarker(mark))

# from evolve extension: 1a23c7c52a43
def _getobsmarkersstream(repo, heads=None, common=None):
    """Get a binary stream for all markers relevant to `::<heads> - ::<common>`
    """
    revset = ''
    args = []
    repo = repo.unfiltered()
    if heads is None:
        revset = 'all()'
    elif heads:
        revset += "(::%ln)"
        args.append(heads)
    else:
        assert False, 'pulling no heads?'
    if common:
        revset += ' - (::%ln)'
        args.append(common)
    nodes = [c.node() for c in repo.set(revset, *args)]
    markers = repo.obsstore.relevantmarkers(nodes)
    obsdata = StringIO()
    _encodemarkersstream(obsdata, markers)
    obsdata.seek(0)
    return obsdata

# from evolve extension: 1a23c7c52a43
class pruneobsstore(obsolete.obsstore):
    """And extended obsstore class that read parent information from v1 format

    Evolve extension adds parent information in prune marker. We use it to make
    markers relevant to pushed changeset."""

    def __init__(self, *args, **kwargs):
        self.prunedchildren = {}
        return super(pruneobsstore, self).__init__(*args, **kwargs)

    def _load(self, markers):
        markers = self._prunedetectingmarkers(markers)
        return super(pruneobsstore, self)._load(markers)


    def _prunedetectingmarkers(self, markers):
        for m in markers:
            if not m[1]: # no successors
                meta = obsolete.decodemeta(m[3])
                if 'p1' in meta:
                    p1 = node.bin(meta['p1'])
                    self.prunedchildren.setdefault(p1, set()).add(m)
                if 'p2' in meta:
                    p2 = node.bin(meta['p2'])
                    self.prunedchildren.setdefault(p2, set()).add(m)
            yield m

# from evolve extension: 1a23c7c52a43
def relevantmarkers(self, nodes):
    """return a set of all obsolescence marker relevant to a set of node.

    "relevant" to a set of node mean:

    - marker that use this changeset as successors
    - prune marker of direct children on this changeset.
    - recursive application of the two rules on precursors of these markers

    It is a set so you cannot rely on order"""
    seennodes = set(nodes)
    seenmarkers = set()
    pendingnodes = set(nodes)
    precursorsmarkers = self.precursors
    prunedchildren = self.prunedchildren
    while pendingnodes:
        direct = set()
        for current in pendingnodes:
            direct.update(precursorsmarkers.get(current, ()))
            direct.update(prunedchildren.get(current, ()))
        direct -= seenmarkers
        pendingnodes = set([m[0] for m in direct])
        seenmarkers |= direct
        pendingnodes -= seennodes
        seennodes |= pendingnodes
    return seenmarkers

# from evolve extension: cf35f38d6a10
def srv_pullobsmarkers(repo, proto, others):
    """serves a binary stream of markers.

    Serves relevant to changeset between heads and common. The stream is prefix
    by a -string- representation of an integer. This integer is the size of the
    stream."""
    opts = wireproto.options('', ['heads', 'common'], others)
    for k, v in opts.iteritems():
        if k in ('heads', 'common'):
            opts[k] = wireproto.decodelist(v)
    obsdata = _getobsmarkersstream(repo, **opts)
    finaldata = StringIO()
    obsdata = obsdata.getvalue()
    finaldata.write('%20i' % len(obsdata))
    finaldata.write(obsdata)
    finaldata.seek(0)
    return wireproto.streamres(proto.groupchunks(finaldata))


# from evolve extension: 1a23c7c52a43
def _obsrelsethashtree(repo):
    """Build an obshash for every node in a repo

    return a [(node), (obshash)] list. in revision order."""
    cache = []
    unfi = repo.unfiltered()
    for i in unfi:
        ctx = unfi[i]
        entry = 0
        sha = util.sha1()
        # add data from p1
        for p in ctx.parents():
            p = p.rev()
            if p < 0:
                p = node.nullid
            else:
                p = cache[p][1]
            if p != node.nullid:
                entry += 1
                sha.update(p)
        tmarkers = repo.obsstore.relevantmarkers([ctx.node()])
        if tmarkers:
            bmarkers = [obsolete._encodeonemarker(m) for m in tmarkers]
            bmarkers.sort()
            for m in bmarkers:
                entry += 1
                sha.update(m)
        if entry:
            cache.append((ctx.node(), sha.digest()))
        else:
            cache.append((ctx.node(), node.nullid))
    return cache

# from evolve extension: 1a23c7c52a43
def _obshash(repo, nodes):
    """hash of binary version of relevant markers + obsparent

    (special case so that all empty are hashed as nullid)"""
    hashs = _obsrelsethashtree(repo)
    nm = repo.changelog.nodemap
    return  [hashs[nm.get(n)][1] for n in nodes]

# from evolve extension: 1a23c7c52a43
def srv_obshash(repo, proto, nodes):
    """give the obshash of a a set of node

    Used for markes discovery"""
    return wireproto.encodelist(_obshash(repo, wireproto.decodelist(nodes)))

# from evolve extension: 1a23c7c52a43
def capabilities(orig, repo, proto):
    """wrapper to advertise new capability"""
    caps = orig(repo, proto)
    advertise = repo.ui.configbool('__temporary__', 'advertiseobsolete', True)
    if obsolete._enabled and advertise:
        caps += ' _evoext_pushobsmarkers_0'
        caps += ' _evoext_pullobsmarkers_0'
        caps += ' _evoext_obshash_0'
        caps += ' _evoext_b2x_obsmarkers_0'
    return caps


# from evolve extension: 10867a8e27c6
# heavily modified
def extsetup(ui):
    localrepo.moderncaps.add('_evoext_b2x_obsmarkers_0')
    if gboptsmap is not None:
        gboptsmap['evo_obsmarker'] = 'plain'
        gboptsmap['evo_obscommon'] = 'plain'
        gboptsmap['evo_obsheads'] = 'plain'
    else:
        gboptslist.append('evo_obsheads')
        gboptslist.append('evo_obscommon')
        gboptslist.append('evo_obsmarker')
    obsolete.obsstore = pruneobsstore
    obsolete.obsstore.relevantmarkers = relevantmarkers
    hgweb_mod.perms['evoext_pushobsmarkers_0'] = 'push'
    hgweb_mod.perms['evoext_pullobsmarkers_0'] = 'pull'
    hgweb_mod.perms['evoext_obshash'] = 'pull'
    wireproto.commands['evoext_pushobsmarkers_0'] = (srv_pushobsmarkers, '')
    wireproto.commands['evoext_pullobsmarkers_0'] = (srv_pullobsmarkers, '*')
    # wrap module content
    extensions.wrapfunction(exchange, '_getbundleextrapart', _getbundleextrapart)
    extensions.wrapfunction(wireproto, 'capabilities', capabilities)
    # wrap command content
    oldcap, args = wireproto.commands['capabilities']
    def newcap(repo, proto):
        return capabilities(oldcap, repo, proto)
    wireproto.commands['capabilities'] = (newcap, args)
    wireproto.commands['evoext_obshash'] = (srv_obshash, 'nodes')
    # specific simple4server content
    extensions.wrapfunction(pushkey, '_nslist', _nslist)
    pushkey._namespaces['namespaces'] = (lambda *x: False, pushkey._nslist)


#from evolve extension
@bundle2.parthandler('evolve:b2x:obsmarkerv1')
def handleobsmarkerv1(op, inpart):
    """add a stream of obsmarker to the repo"""
    tr = op.gettransaction()
    advparams = dict(inpart.advisoryparams)
    length = advparams.get('totalbytes')
    if length is None:
        obsdata = inpart.read()
    else:
        length = int(length)
        data = StringIO()
        current = 0
        op.ui.progress('OBSEXC', current, unit="bytes", total=length)
        while current < length:
            readsize = min(length-current, 4096)
            data.write(inpart.read(readsize))
            current += readsize
            op.ui.progress('OBSEXC', current, unit="bytes", total=length)
        op.ui.progress('OBSEXC', None)
        obsdata = data.getvalue()
    totalsize = len(obsdata)
    old = len(op.repo.obsstore._all)
    op.repo.obsstore.mergemarkers(tr, obsdata)
    new = len(op.repo.obsstore._all) - old
    op.records.add('evo_obsmarkers', {'new': new, 'bytes': totalsize})
    tr.hookargs['evolve_new_obsmarkers'] = str(new)

#from evolve extension
def _getbundleextrapart(orig, bundler, repo, source, **kwargs):
    if int(kwargs.pop('evo_obsmarker', False)):
        common = kwargs.pop('evo_obscommon')
        common = wireproto.decodelist(common)
        heads = kwargs.pop('evo_obsheads')
        heads = wireproto.decodelist(heads)
        obsdata = _getobsmarkersstream(repo, common=common, heads=heads)
        if len(obsdata.getvalue()) > 5:
            advparams = [('totalbytes', str(len(obsdata.getvalue())))]
            obspart = bundle2.bundlepart('EVOLVE:B2X:OBSMARKERV1',
                                         advisoryparams=advparams,
                                         data=obsdata)
            bundler.addpart(obspart)
    orig(bundler, repo, source)