view hgext3rd/evolve/obsdiscovery.py @ 3958:926c8e2f5400 stable

obshashrange: fix computation of affected ranges The computation of impacted nodes and associated revs is fully reworked. In addition, we introduce multiple new tests covering cases that were previously wrongly handled.
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Fri, 17 Aug 2018 00:23:20 +0200
parents 98295547c40f
children 42efc12b3d10
line wrap: on
line source

# Code dedicated to the discovery of obsolescence marker "over the wire"
#
# Copyright 2017 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

# Status: Experiment in progress // open question
#
#   The final discovery algorithm and protocol will go into core when we'll be
#   happy with it.
#
#   Some of the code in this module is for compatibility with older versions
#   of evolve and will eventually be dropped.

from __future__ import absolute_import

try:
    import StringIO as io
    StringIO = io.StringIO
except ImportError:
    import io
    StringIO = io.StringIO

import hashlib
import heapq
import sqlite3
import struct
import weakref

from mercurial import (
    dagutil,
    error,
    exchange,
    extensions,
    localrepo,
    node,
    obsolete,
    scmutil,
    setdiscovery,
    util,
)
from mercurial.i18n import _

from . import (
    compat,
    exthelper,
    obscache,
    utility,
    stablerange,
    stablerangecache,
)

try:
    from mercurial import wireprototypes, wireprotov1server
    from mercurial.wireprotov1peer import wirepeer
    from mercurial.wireprototypes import encodelist, decodelist
except (ImportError, AttributeError): # <= hg-4.5
    from mercurial import wireproto as wireprototypes
    wireprotov1server = wireprototypes
    from mercurial.wireproto import wirepeer, encodelist, decodelist

# Short aliases for the struct helpers used by the range encoding helpers
# (_encrange / _decrange) further down in this module.
_pack = struct.pack
_unpack = struct.unpack
_calcsize = struct.calcsize

# Extension helper: commands, config items, wrappers and setup hooks in this
# module are registered through its decorators/methods.
eh = exthelper.exthelper()
obsexcmsg = utility.obsexcmsg

# Config
# 'evolution.obsdiscovery' is read by usediscovery() and the obshash
# capability wrapper; 'obshashrange' and 'obshashrange.max-revs' are read by
# _useobshashrange().
# NOTE(review): 'obshashrange.warm-cache' and 'obshashrange.lru-size' are not
# read anywhere in this part of the file - presumably consumed elsewhere.
eh.configitem('experimental', 'evolution.obsdiscovery')
eh.configitem('experimental', 'obshashrange')
eh.configitem('experimental', 'obshashrange.warm-cache')
eh.configitem('experimental', 'obshashrange.max-revs')
eh.configitem('experimental', 'obshashrange.lru-size')

##################################
###  Code performing discovery ###
##################################

def findcommonobsmarkers(ui, local, remote, probeset,
                         initialsamplesize=100,
                         fullsamplesize=200):
    """find the heads of the changesets whose obshash matches on both sides

    This is the "tree hash" based discovery: for sampled changesets of
    ``probeset`` the locally computed hash (see _obsrelsethashtree) is
    compared with the one returned by ``remote`` until every changeset of
    ``probeset`` is classified as either common or missing.

    ui:        ui object used for progress and debug output
    local:     local repository
    remote:    peer providing ``evoext_obshash`` / ``evoext_obshash1``
    probeset:  iterable of local revisions to classify
    initialsamplesize: currently unused by this function
    fullsamplesize:    maximum number of changesets queried per round trip

    Returns the externalized heads of the common set (nodes, as produced by
    ``dag.externalizeall``), or ``set([nullid])`` when nothing is common.
    """
    # from discovery
    roundtrips = 0
    cl = local.changelog
    dag = dagutil.revlogdag(cl)
    missing = set()
    common = set()
    undecided = set(probeset)
    totalnb = len(undecided)
    ui.progress(_("comparing with other"), 0, total=totalnb,
                unit=_("changesets"))
    _takefullsample = setdiscovery._takefullsample
    # prefer the marker format "1" based hash when the remote supports it
    if remote.capable('_evoext_obshash_1'):
        getremotehash = remote.evoext_obshash1
        localhash = _obsrelsethashtreefm1(local)
    else:
        getremotehash = remote.evoext_obshash
        localhash = _obsrelsethashtreefm0(local)

    while undecided:

        ui.note(_("sampling from both directions\n"))
        # small enough: query everything that is left in one go
        if len(undecided) < fullsamplesize:
            sample = set(undecided)
        else:
            sample = _takefullsample(dag, undecided, size=fullsamplesize)

        roundtrips += 1
        ui.progress(_("comparing with other"), totalnb - len(undecided),
                    total=totalnb, unit=_("changesets"))
        ui.debug("query %i; still undecided: %i, sample size is: %i\n"
                 % (roundtrips, len(undecided), len(sample)))
        # indices between sample and externalized version must match
        sample = list(sample)
        remotehash = getremotehash(dag.externalizeall(sample))

        # yesno[i] is True when local and remote hashes agree for sample[i];
        # localhash is indexed by revision and holds (node, hash) pairs
        yesno = [localhash[ix][1] == remotehash[si]
                 for si, ix in enumerate(sample)]

        # a matching hash marks the changeset (and, through ancestorset,
        # presumably its ancestors) as common
        commoninsample = set(n for i, n in enumerate(sample) if yesno[i])
        common.update(dag.ancestorset(commoninsample, common))

        # a differing hash marks the changeset (and, through descendantset,
        # presumably its descendants) as missing
        missinginsample = [n for i, n in enumerate(sample) if not yesno[i]]
        missing.update(dag.descendantset(missinginsample, missing))

        undecided.difference_update(missing)
        undecided.difference_update(common)

    ui.progress(_("comparing with other"), None)
    result = dag.headsetofconnecteds(common)
    ui.debug("%d total queries\n" % roundtrips)

    if not result:
        return set([node.nullid])
    return dag.externalizeall(result)

def findmissingrange(ui, local, remote, probeset,
                     initialsamplesize=100,
                     fullsamplesize=200):
    """find the changesets whose relevant obsmarkers differ from the remote

    This is the "stable range" based discovery.  Starting from the ranges
    ``(head, 0)`` of the heads of ``probeset``, the obshash of each range is
    compared with the value returned by ``remote``.  Matching ranges are
    dropped, mismatching ranges are split into their subranges, and
    mismatching ranges of length one identify a changeset with differing
    markers.

    ui:        ui object used for progress, debug and log output
    local:     local repository
    remote:    peer providing ``evoext_obshashrange_v1``
    probeset:  revisions to inspect
    initialsamplesize: target number of ranges sent in the first query
    fullsamplesize:    target number of ranges sent in later queries

    Returns the sorted list of nodes whose range of length one mismatched.
    """
    missing = set()
    starttime = util.timer()

    heads = local.revs('heads(%ld)', probeset)
    local.stablerange.warmup(local)

    rangelength = local.stablerange.rangelength
    subranges = local.stablerange.subranges
    tonode = local.changelog.node
    # size of slice ?
    heappop = heapq.heappop
    heappush = heapq.heappush
    heapify = heapq.heapify

    # every range ever queued, to avoid asking about the same range twice
    tested = set()

    sample = []
    samplesize = initialsamplesize

    def addentry(entry):
        """queue `entry` for the next query unless it was already seen"""
        if entry in tested:
            return False
        sample.append(entry)
        tested.add(entry)
        return True

    for h in heads:
        entry = (h, 0)
        addentry(entry)

    local.obsstore.rangeobshashcache.update(local)
    querycount = 0
    ui.progress(_("comparing obsmarker with other"), querycount,
                unit=_("queries"))
    # ranges that did not fit in the previous query
    overflow = []
    while sample or overflow:
        if overflow:
            sample.extend(overflow)
            overflow = []

        if samplesize < len(sample):
            # too much sample already
            overflow = sample[samplesize:]
            sample = sample[:samplesize]
        elif len(sample) < samplesize:
            ui.debug("query %i; add more sample (target %i, current %i)\n"
                     % (querycount, samplesize, len(sample)))
            # we need more sample !
            needed = samplesize - len(sample)
            # heap of (-length, range): the largest range is sliced first
            sliceme = []
            heapify(sliceme)
            for entry in sample:
                if 1 < rangelength(local, entry):
                    heappush(sliceme, (-rangelength(local, entry), entry))

            while sliceme and 0 < needed:
                _key, target = heappop(sliceme)
                for new in subranges(local, target):
                    # XXX we could record hierarchy to optimise drop
                    if addentry(new):
                        # only ranges spanning more than one revision can be
                        # sliced further.  (`new` is a (rev, index) pair, so
                        # testing `len(new)` would always be true.)
                        if 1 < rangelength(local, new):
                            heappush(sliceme, (-rangelength(local, new), new))
                        needed -= 1
                        if needed <= 0:
                            break

        # no longer the first iteration
        samplesize = fullsamplesize

        nbsample = len(sample)
        maxsize = max([rangelength(local, r) for r in sample])
        ui.debug("query %i; sample size is %i, largest range %i\n"
                 % (querycount, nbsample, maxsize))
        nbreplies = 0
        replies = list(_queryrange(ui, local, remote, sample))
        sample = []
        for entry, remotehash in replies:
            nbreplies += 1
            if remotehash == _obshashrange(local, entry):
                # same markers on both sides for that range
                continue
            elif 1 == rangelength(local, entry):
                # cannot be split further: this changeset is the culprit
                missing.add(tonode(entry[0]))
            else:
                for new in subranges(local, entry):
                    addentry(new)
        assert nbsample == nbreplies
        querycount += 1
        ui.progress(_("comparing obsmarker with other"), querycount,
                    unit=_("queries"))
    ui.progress(_("comparing obsmarker with other"), None)
    local.obsstore.rangeobshashcache.save(local)
    duration = util.timer() - starttime
    logmsg = ('obsdiscovery, %d/%d mismatch'
              ' - %d obshashrange queries in %.4f seconds\n')
    logmsg %= (len(missing), len(probeset), querycount, duration)
    ui.log('evoext-obsdiscovery', logmsg)
    ui.debug(logmsg)
    return sorted(missing)

def _queryrange(ui, repo, remote, allentries):
    #  question are asked with node
    n = repo.changelog.node
    noderanges = [(n(entry[0]), entry[1]) for entry in allentries]
    replies = remote.evoext_obshashrange_v1(noderanges)
    result = []
    for idx, entry in enumerate(allentries):
        result.append((entry, replies[idx]))
    return result

##############################
### Range Hash computation ###
##############################

@eh.command(
    'debugobshashrange',
    [
        ('', 'rev', [], 'display obshash for all (rev, 0) range in REVS'),
        ('', 'subranges', False, 'display all subranges'),
    ],
    _(''))
def debugobshashrange(ui, repo, **opts):
    """display the obshash of the stable ranges rooted at REVS

    For each revision in REVS the range (rev, 0) is displayed; with
    --subranges the ranges obtained from ``stablerange.subrangesclosure``
    for REVS are displayed instead.  Each line shows the revision, short
    node, index, size, depth and short obshash of a range.
    """
    s = node.short
    revs = scmutil.revrange(repo, opts['rev'])
    # prewarm depth cache
    if revs:
        repo.stablerange.warmup(repo, max(revs))
    cl = repo.changelog
    rangelength = repo.stablerange.rangelength
    depthrev = repo.stablerange.depthrev
    if opts['subranges']:
        ranges = stablerange.subrangesclosure(repo, repo.stablerange, revs)
    else:
        ranges = [(r, 0) for r in revs]
    headers = ('rev', 'node', 'index', 'size', 'depth', 'obshash')
    linetemplate = '%12d %12s %12d %12d %12d %12s\n'
    # the header line reuses the column widths with string conversions
    headertemplate = linetemplate.replace('d', 's')
    ui.status(headertemplate % headers)
    # the obshash cache must be updated before it is queried
    repo.obsstore.rangeobshashcache.update(repo)
    for r in ranges:
        d = (r[0],
             s(cl.node(r[0])),
             r[1],
             rangelength(repo, r),
             depthrev(repo, r[0]),
             s(_obshashrange(repo, r)))
        ui.status(linetemplate % d)
    repo.obsstore.rangeobshashcache.save(repo)

def _obshashrange(repo, rangeid):
    """return the obsolete hash associated to a range

    rangeid is a (rev, index) pair.

    * For a range of length one, the hash is the sha1 of the sorted,
      fm1-encoded markers relevant to the changeset.
    * For a longer range, the hash is derived from the hashes of its
      subranges, ignoring subranges hashing to nullid: a single remaining
      sub-hash is reused as is, several are combined with sha1.
    * A range without any relevant data hashes to nullid.

    The result is recorded in ``repo.obsstore.rangeobshashcache``.
    """
    cache = repo.obsstore.rangeobshashcache
    obshash = cache.get(rangeid)
    if obshash is not None:
        return obshash
    nullid = node.nullid
    if repo.stablerange.rangelength(repo, rangeid) == 1:
        rangenode = repo.changelog.node(rangeid[0])
        tmarkers = repo.obsstore.relevantmarkers([rangenode])
        pieces = sorted(obsolete._fm1encodeonemarker(m) for m in tmarkers)
        # marker data is always hashed, even when there is a single marker
        reusesingle = False
    else:
        pieces = []
        for subrange in repo.stablerange.subranges(repo, rangeid):
            subhash = _obshashrange(repo, subrange)
            if subhash != nullid:
                pieces.append(subhash)
        reusesingle = True

    if not pieces:
        obshash = nullid
    elif reusesingle and len(pieces) == 1:
        # note: if there is only one subrange with actual data, we just
        # reuse the same hash.
        #
        # This must be the hash of the subrange that has data, not the hash
        # of the last subrange visited; otherwise a range whose only
        # non-empty subrange is not the last one hashes to nullid, as if it
        # had no marker at all.
        # NOTE(review): values cached on disk by the previous logic may still
        # hold nullid for such ranges - consider bumping the cache schema
        # version.
        obshash = pieces[0]
    else:
        sha = hashlib.sha1()
        for p in pieces:
            sha.update(p)
        obshash = sha.digest()
    cache[rangeid] = obshash
    return obshash

### sqlite caching

# Statements creating the on-disk cache database, executed in order when a
# new database is written (see _obshashcache._save).
_sqliteschema = [
    # single-row table holding the schema version and the cache key fields
    """CREATE TABLE meta(schemaversion INTEGER NOT NULL,
                         tiprev        INTEGER NOT NULL,
                         tipnode       BLOB    NOT NULL,
                         nbobsmarker   INTEGER NOT NULL,
                         obssize       BLOB    NOT NULL,
                         obskey        BLOB    NOT NULL
                        );""",
    # one row per cached range, keyed by (rev, idx)
    """CREATE TABLE obshashrange(rev     INTEGER NOT NULL,
                                 idx     INTEGER NOT NULL,
                                 obshash BLOB    NOT NULL,
                                 PRIMARY KEY(rev, idx));""",
    # NOTE(review): this index covers the same columns as the primary key
    # above - presumably redundant; confirm before relying on it.
    "CREATE INDEX range_index ON obshashrange(rev, idx);",
]
# check whether the `meta` table exists (i.e. the database was initialised)
_queryexist = "SELECT name FROM sqlite_master WHERE type='table' AND name='meta';"
# the meta row is replaced by deleting it, then inserting a new one
_clearmeta = """DELETE FROM meta;"""
_newmeta = """INSERT INTO meta (schemaversion, tiprev, tipnode, nbobsmarker, obssize, obskey)
            VALUES (?,?,?,?,?,?);"""
# record the obshash of one range; a plain INSERT, so an already stored
# (rev, idx) pair violates the primary key
_updateobshash = "INSERT INTO obshashrange(rev, idx, obshash) VALUES (?,?,?);"
_querymeta = "SELECT schemaversion, tiprev, tipnode, nbobsmarker, obssize, obskey FROM meta;"
_queryobshash = "SELECT obshash FROM obshashrange WHERE (rev = ? AND idx = ?);"
# highest revision having a stored range (NULL when the table is empty)
_query_max_stored = "SELECT MAX(rev) FROM obshashrange"

# drop every cached range / drop one cached range
_reset = "DELETE FROM obshashrange;"
_delete = "DELETE FROM obshashrange WHERE (rev = ? AND idx = ?);"

def _affectedby(repo, markers):
    """return all nodes whose relevant set is affected by this changeset

    This is a reversed version of obsstore.relevantmarkers
    """
    affected_nodes = set()
    known_markers = set(markers)
    node_to_proceed = set()
    marker_to_proceed = set(known_markers)

    obsstore = repo.obsstore

    while node_to_proceed or marker_to_proceed:
        while marker_to_proceed:
            m = marker_to_proceed.pop()
            # check successors and parent
            if m[1]:
                relevant = (m[1], )
            else: # prune case
                relevant = ((m[0], ), m[5])
            for l in relevant:
                if l is None:
                    continue
                for n in l:
                    if n not in affected_nodes:
                        node_to_proceed.add(n)
                    affected_nodes.add(n)
        # marker_to_proceed is now empty:
        if node_to_proceed:
            n = node_to_proceed.pop()
            markers = set()
            markers.update(obsstore.successors.get(n, ()))
            markers.update(obsstore.predecessors.get(n, ()))
            markers -= known_markers
            marker_to_proceed.update(markers)
            known_markers.update(markers)

    return affected_nodes

class _obshashcache(obscache.dualsourcecache):
    """sqlite-backed cache of the obshash associated to stable ranges

    Values are held in memory in ``self._data``; ranges set since the last
    save are tracked in ``self._new`` and written to the sqlite database by
    ``save()``.  The database is opened lazily through ``self._con``.
    """

    _schemaversion = 2

    _cachename = 'evo-ext-obshashrange' # used for error message
    _filename = 'cache/evoext_obshashrange_v2.sqlite'

    def __init__(self, repo):
        super(_obshashcache, self).__init__()
        self._vfs = repo.vfs
        self._path = repo.vfs.join(self._filename)
        # ranges whose value has not been written to disk yet
        self._new = set()
        # False once the on-disk database is known to be unusable
        self._valid = True
        # weak reference: the cache must not keep the repository alive
        self._repo = weakref.ref(repo.unfiltered())
        # cache status
        self._ondiskcachekey = None
        self._data = {}

    def clear(self, reset=False):
        """drop in-memory state; with `reset`, also distrust on-disk data"""
        super(_obshashcache, self).clear(reset=reset)
        self._data.clear()
        self._new.clear()
        if reset:
            self._valid = False
        # forget the lazily opened connection so it gets re-evaluated
        if '_con' in vars(self):
            del self._con

    def get(self, rangeid):
        """return the cached obshash for `rangeid`, or None if unknown

        Raises ProgrammingError when, outside of a cache update, the range
        is rooted at a revision above the tiprev of the cache key.
        """
        # revision should be covered by the tiprev
        #
        # XXX there are issue with cache warming, we hack around it for now
        if not getattr(self, '_updating', False):
            if self._cachekey[0] < rangeid[0]:
                msg = ('using unwarmed obshashrangecache (%s %s)'
                       % (rangeid[0], self._cachekey[0]))
                raise error.ProgrammingError(msg)

        value = self._data.get(rangeid)
        if value is None and self._con is not None:
            # not in memory: look in the database and remember the answer
            # (including a miss, stored as None)
            nrange = (rangeid[0], rangeid[1])
            obshash = self._con.execute(_queryobshash, nrange).fetchone()
            if obshash is not None:
                value = obshash[0]
            self._data[rangeid] = value
        return value

    def __setitem__(self, rangeid, obshash):
        """record `obshash` for `rangeid`, to be written at next save"""
        self._new.add(rangeid)
        self._data[rangeid] = obshash

    def _updatefrom(self, repo, revs, obsmarkers):
        """override this method to update your cache data incrementally

        revs:      list of new revision in the changelog
        obsmarker: list of new obsmarkers in the obsstore
        """
        # XXX for now, we'll not actually update the cache, but we'll be
        # smarter at invalidating it.
        #
        # 1) new revisions does not get their entry updated (not update)
        # 2) if we detect markers affecting non-new revision we reset the cache

        # `_updating` disables the "unwarmed cache" check in get().  It must
        # be dropped even if the update fails, otherwise the check stays
        # disabled for the rest of the life of this object.
        self._updating = True
        try:
            con = self._con
            if con is not None:
                max_stored = con.execute(_query_max_stored).fetchall()[0][0]
                affected_nodes = _affectedby(repo, obsmarkers)

                if max_stored is None:
                    # nothing stored: no cached range can be affected (and
                    # comparing a revision with None fails on Python 3)
                    affected = []
                else:
                    rev = repo.changelog.nodemap.get
                    affected = [rev(n) for n in affected_nodes]
                    affected = [r for r in affected
                                if r is not None and r <= max_stored]

                if affected:
                    repo.ui.log('evoext-cache', 'obshashcache clean - '
                                'new markers affect %d changeset and cached ranges\n'
                                % len(affected))
                    # always reset for now, the code detecting affect is buggy
                    # so we need to reset more broadly than we would like.
                    if True or repo.stablerange._con is None:
                        con.execute(_reset)
                    else:
                        ranges = repo.stablerange.contains(repo, affected)
                        con.executemany(_delete, ranges)

                    # rewarm key revisions
                    #
                    # (The current invalidation is too wide, but rewarming every
                    # single revision is quite costly)
                    newrevs = []
                    stop = self._cachekey[0] # tiprev
                    for h in repo.filtered('immutable').changelog.headrevs():
                        if h <= stop and h in affected:
                            newrevs.append(h)
                    newrevs.extend(revs)
                    revs = newrevs

            repo.depthcache.update(repo)
            total = len(revs)

            def progress(pos, rev):
                repo.ui.progress('updating obshashrange cache',
                                 pos, 'rev %s' % rev, unit='revision',
                                 total=total)
            # warm the cache for the new revs
            progress(0, '')
            for idx, r in enumerate(revs):
                _obshashrange(repo, (r, 0))
                progress(idx, r)
            progress(None, '')
        finally:
            del self._updating

    @property
    def _fullcachekey(self):
        """cache key prefixed with the schema version (the `meta` row)"""
        return (self._schemaversion, ) + self._cachekey

    def load(self, repo):
        """initialise the cache key

        Accessing ``self._con`` loads the key from the database when it is
        usable; otherwise we start from the empty key.
        """
        if self._con is None:
            self._cachekey = self.emptykey
            self._ondiskcachekey = self.emptykey
        assert self._cachekey is not None

    def _db(self):
        """open the sqlite database; None if its directory can't be made"""
        try:
            util.makedirs(self._vfs.dirname(self._path))
        except OSError:
            return None
        con = sqlite3.connect(self._path)
        con.text_factory = str
        return con

    @util.propertycache
    def _con(self):
        """connection to a valid on-disk cache, or None

        The database is rejected (and the cache flagged as invalid) when the
        `meta` table is missing, empty, or has another schema version.  On
        success the cache key is loaded from the `meta` row.
        """
        if not self._valid:
            return None
        repo = self._repo()
        if repo is None:
            return None
        con = self._db()
        if con is None:
            return None
        cur = con.execute(_queryexist)
        if cur.fetchone() is None:
            self._valid = False
            return None
        meta = con.execute(_querymeta).fetchone()
        if meta is None or meta[0] != self._schemaversion:
            self._valid = False
            return None
        self._cachekey = self._ondiskcachekey = meta[1:]
        return con

    def save(self, repo):
        """write pending data to disk, under the repository lock

        Nothing is done when there is no cache key or nothing changed.  If
        the lock cannot be taken, the save is skipped with a warning.
        """
        if self._cachekey is None:
            return
        if self._cachekey == self._ondiskcachekey and not self._new:
            return
        repo = repo.unfiltered()
        try:
            with repo.lock():
                self._save(repo)
        except error.LockError:
            # Exceptionnally we are noisy about it since performance impact
            # is large We should address that before using this more
            # widely.
            msg = _('obshashrange cache: skipping save unable to lock repo\n')
            repo.ui.warn(msg)

    def _save(self, repo):
        """write the new ranges and the cache key to the database"""
        if self._con is None:
            # no usable database: start again from a fresh file
            util.unlinkpath(self._path, ignoremissing=True)
            if '_con' in vars(self):
                del self._con

            con = self._db()
            if con is None:
                repo.ui.log('evoext-cache', 'unable to write obshashrange cache'
                            ' - cannot create database')
                return
            with con:
                for req in _sqliteschema:
                    con.execute(req)

                con.execute(_newmeta, self._fullcachekey)
        else:
            con = self._con
            if self._ondiskcachekey is not None:
                meta = con.execute(_querymeta).fetchone()
                if meta[1:] != self._ondiskcachekey:
                    # drifting is currently an issue because this means another
                    # process might have already added the cache line we are about
                    # to add. This will confuse sqlite
                    msg = _('obshashrange cache: skipping write, '
                            'database drifted under my feet\n')
                    repo.ui.warn(msg)
                    return
        data = ((rangeid[0], rangeid[1], self.get(rangeid)) for rangeid in self._new)
        con.executemany(_updateobshash, data)
        cachekey = self._fullcachekey
        con.execute(_clearmeta) # remove the older entry
        con.execute(_newmeta, cachekey)
        con.commit()
        self._new.clear()
        self._ondiskcachekey = self._cachekey
        self._valid = True

@eh.wrapfunction(obsolete.obsstore, '_addmarkers')
def _addmarkers(orig, obsstore, *args, **kwargs):
    """clear the obshashrange cache before markers get added to the store"""
    rangecache = obsstore.rangeobshashcache
    rangecache.clear()
    return orig(obsstore, *args, **kwargs)

# the filecache descriptor behind `localrepository.obsstore`; its `func`
# attribute is what gets wrapped below
obsstorefilecache = localrepo.localrepository.obsstore


# obsstore is a filecache so we have to do some special dancing
@eh.wrapfunction(obsstorefilecache, 'func')
def obsstorewithcache(orig, repo):
    """build the obsstore and attach a fresh obshashrange cache to it"""
    obsstore = orig(repo)
    obsstore.rangeobshashcache = _obshashcache(repo.unfiltered())
    return obsstore

@eh.reposetup
def setupcache(ui, repo):
    """install a repository subclass keeping the obshashrange cache fresh

    The subclass clears the cache when history is destroyed and warms/saves
    it when caches are updated (or, when the repository has no
    `updatecaches` method, after transactions close).
    """

    class obshashrepo(repo.__class__):
        @localrepo.unfilteredmethod
        def destroyed(self):
            # only touch the cache if the obsstore was already loaded
            if 'obsstore' in vars(self):
                self.obsstore.rangeobshashcache.clear()
            super(obshashrepo, self).destroyed()

        if util.safehasattr(repo, 'updatecaches'):
            @localrepo.unfilteredmethod
            def updatecaches(self, tr=None, **kwargs):
                # warming is gated by utility.shouldwarmcache
                if utility.shouldwarmcache(self, tr):
                    self.obsstore.rangeobshashcache.update(self)
                    self.obsstore.rangeobshashcache.save(self)
                super(obshashrepo, self).updatecaches(tr, **kwargs)

        else:
            # repositories without `updatecaches`: warm the cache from a
            # post-close hook of the transaction instead
            def transaction(self, *args, **kwargs):
                tr = super(obshashrepo, self).transaction(*args, **kwargs)
                # weak reference so the hook does not keep the repo alive
                reporef = weakref.ref(self)

                def _warmcache(tr):
                    repo = reporef()
                    if repo is None:
                        return
                    repo = repo.unfiltered()
                    repo.obsstore.rangeobshashcache.update(repo)
                    repo.obsstore.rangeobshashcache.save(repo)

                if utility.shouldwarmcache(self, tr):
                    tr.addpostclose('warmcache-20obshashrange', _warmcache)
                return tr

    repo.__class__ = obshashrepo

### wire protocol commands

def _obshashrange_v0(repo, ranges):
    """return a list of hash from a list of range

    The range have the id encoded as a node: each item of `ranges` is a
    (node, index) pair.

    return 'wdirid' for unknown range (i.e. when the node is not known
    locally)"""
    nm = repo.changelog.nodemap
    ranges = [(nm.get(n), idx) for n, idx in ranges]
    # only known revisions take part in the warmup.  Unknown nodes map to
    # None, which cannot be compared with integers on Python 3.
    knownrevs = [r for r, i in ranges if r is not None]
    if knownrevs:
        repo.stablerange.warmup(repo, upto=max(knownrevs))
    result = []
    repo.obsstore.rangeobshashcache.update(repo)
    for r in ranges:
        if r[0] is None:
            result.append(node.wdirid)
        else:
            result.append(_obshashrange(repo, r))
    repo.obsstore.rangeobshashcache.save(repo)
    return result

@eh.addattr(localrepo.localpeer, 'evoext_obshashrange_v1')
def local_obshashrange_v0(peer, ranges):
    """local peer flavour of `evoext_obshashrange_v1`: no wire involved"""
    repo = peer._repo
    return _obshashrange_v0(repo, ranges)


_indexformat = '>I'
_indexsize = _calcsize(_indexformat)
def _encrange(node_rangeid):
    """encode a (node) range"""
    headnode, index = node_rangeid
    return headnode + _pack(_indexformat, index)

def _decrange(data):
    """encode a (node) range"""
    assert _indexsize < len(data), len(data)
    headnode = data[:-_indexsize]
    index = _unpack(_indexformat, data[-_indexsize:])[0]
    return (headnode, index)

@eh.addattr(wirepeer, 'evoext_obshashrange_v1')
def peer_obshashrange_v0(self, ranges):
    """wire peer flavour of `evoext_obshashrange_v1`

    Ranges are encoded with `_encrange`, sent as a list, and the reply is
    decoded as a list; an undecodable reply aborts with a ResponseError.
    """
    payload = encodelist([_encrange(r) for r in ranges])
    reply = self._call("evoext_obshashrange_v1", ranges=payload)
    try:
        decoded = decodelist(reply)
    except ValueError:
        self._abort(error.ResponseError(_("unexpected response:"), reply))
    else:
        return decoded

@compat.wireprotocommand(eh, 'evoext_obshashrange_v1', 'ranges')
def srv_obshashrange_v1(repo, proto, ranges):
    """server side of `evoext_obshashrange_v1`

    Decodes the list of encoded ranges and returns the encoded list of the
    corresponding hashes.
    """
    decoded = [_decrange(item) for item in decodelist(ranges)]
    return encodelist(_obshashrange_v0(repo, decoded))

def _useobshashrange(repo):
    base = repo.ui.configbool('experimental', 'obshashrange', False)
    if base:
        maxrevs = repo.ui.configint('experimental', 'obshashrange.max-revs', None)
        if maxrevs is not None and maxrevs < len(repo.unfiltered()):
            base = False
    return base

def _canobshashrange(local, remote):
    """can obshashrange discovery be used between `local` and `remote`?"""
    enabled = _useobshashrange(local)
    if not enabled:
        return enabled
    # only ask the remote once we know the feature is enabled locally
    return remote.capable('_evoext_obshashrange_v1')

def _obshashrange_capabilities(orig, repo, proto):
    """wrapper to advertise new capability

    Adds `_evoext_obshashrange_v1` to the capabilities when marker exchange
    is enabled and obshashrange is in use for this repository.
    """
    caps = orig(repo, proto)
    enabled = _useobshashrange(repo)
    if not (obsolete.isenabled(repo, obsolete.exchangeopt) and enabled):
        return caps

    # Compat hg 4.6+ (2f7290555c96): the capabilities may come wrapped in a
    # response object carrying the payload in `.data`
    wrapped = util.safehasattr(caps, 'data')
    raw = caps.data if wrapped else caps

    names = raw.split()
    names.append(b'_evoext_obshashrange_v1')
    names.sort()
    raw = b' '.join(names)

    # Compat hg 4.6+ (2f7290555c96): answer with the same kind of object
    if wrapped:
        return wireprototypes.bytesresponse(raw)
    return raw

@eh.extsetup
def obshashrange_extsetup(ui):
    """hook the obshashrange capability into the wire protocol server"""
    # wrap the module level function ...
    extensions.wrapfunction(wireprotov1server, 'capabilities',
                            _obshashrange_capabilities)
    # ... and the entry already registered in the command table
    table = wireprotov1server.commands
    oldcap, args = table['capabilities']

    def newcap(repo, proto):
        return _obshashrange_capabilities(oldcap, repo, proto)

    table['capabilities'] = (newcap, args)

#############################
### Tree Hash computation ###
#############################

# Hash computed from a given changeset using all markers relevant to it and
# the obshash of its parents.  This is similar to what happens for changeset
# nodes, where the parents are used in the computation.

def _canobshashtree(repo, remote):
    return remote.capable('_evoext_obshash_0')

@eh.command(
    'debugobsrelsethashtree',
    [('', 'v0', None, 'hash on marker format "0"'),
     ('', 'v1', None, 'hash on marker format "1" (default)')], _(''))
def debugobsrelsethashtree(ui, repo, v0=False, v1=False):
    """display Obsolete markers, Relevant Set, Hash Tree
    changeset-node obsrelsethashtree-node

    It is computed from the "orsht" of its parents and the markers
    relevant to the changeset itself."""
    if v0 and v1:
        # the two formats are mutually exclusive
        raise error.Abort('can only specify one format')
    elif v0:
        treefunc = _obsrelsethashtreefm0
    else:
        treefunc = _obsrelsethashtreefm1

    for chg, obs in treefunc(repo):
        ui.status('%s %s\n' % (node.hex(chg), node.hex(obs)))

def _obsrelsethashtreefm0(repo):
    """hash tree of `repo` with markers encoded in format 0"""
    encoder = obsolete._fm0encodeonemarker
    return _obsrelsethashtree(repo, encoder)

def _obsrelsethashtreefm1(repo):
    """hash tree of `repo` with markers encoded in format 1"""
    encoder = obsolete._fm1encodeonemarker
    return _obsrelsethashtree(repo, encoder)

def _obsrelsethashtree(repo, encodeonemarker):
    """compute the (node, obshash) pair of every changeset of `repo`

    The returned list is indexed by revision number of the unfiltered
    repository.  The hash of a changeset is the sha1 of the non-null hashes
    of its parents followed by the sorted encoding (`encodeonemarker`) of
    the markers relevant to it; a changeset with neither gets nullid.
    """
    nullid = node.nullid
    unfi = repo.unfiltered()
    total = len(unfi)
    topic = _("preparing locally")
    unit = _("changesets")

    hashes = []
    # marker -> encoded marker, markers can be relevant to many changesets
    encoded = {}
    repo.ui.progress(topic, 0, total=total, unit=unit)
    for rev in unfi:
        ctx = unfi[rev]
        sha = hashlib.sha1()
        fed = 0
        # add data from the parents
        for parent in ctx.parents():
            prev = parent.rev()
            phash = nullid if prev < 0 else hashes[prev][1]
            if phash != nullid:
                fed += 1
                sha.update(phash)
        # add data from the relevant markers, in a stable order
        tmarkers = repo.obsstore.relevantmarkers([ctx.node()])
        if tmarkers:
            for m in tmarkers:
                if m not in encoded:
                    encoded[m] = encodeonemarker(m)
            for data in sorted(encoded[m] for m in tmarkers):
                fed += 1
                sha.update(data)
        hashes.append((ctx.node(), sha.digest() if fed else nullid))
        repo.ui.progress(topic, rev, total=total, unit=unit)
    repo.ui.progress(topic, None)
    return hashes

def _obshash(repo, nodes, version=0):
    """return the tree obshash of each node in `nodes`

    version selects the marker encoding used for hashing (0 or 1).
    Nodes unknown to the repository get nullid.

    Raises ProgrammingError for any other version.
    """
    if version == 0:
        hashs = _obsrelsethashtreefm0(repo)
    elif version == 1:
        hashs = _obsrelsethashtreefm1(repo)
    else:
        # explicit error: an `assert False` disappears under `python -O`
        # and would leave `hashs` unbound
        raise error.ProgrammingError('unknown obshash version: %r'
                                     % (version,))
    nm = repo.changelog.nodemap
    result = []
    for n in nodes:
        r = nm.get(n)
        result.append(node.nullid if r is None else hashs[r][1])
    return result

@eh.addattr(localrepo.localpeer, 'evoext_obshash')
def local_obshash(peer, nodes):
    """local peer flavour of `evoext_obshash` (default hash version)"""
    repo = peer._repo
    return _obshash(repo, nodes)

@eh.addattr(localrepo.localpeer, 'evoext_obshash1')
def local_obshash1(peer, nodes):
    """local peer flavour of `evoext_obshash1` (hash version 1)"""
    repo = peer._repo
    return _obshash(repo, nodes, version=1)

@eh.addattr(wirepeer, 'evoext_obshash')
def peer_obshash(self, nodes):
    """wire peer flavour of `evoext_obshash`

    An undecodable reply aborts with a ResponseError.
    """
    reply = self._call("evoext_obshash", nodes=encodelist(nodes))
    try:
        decoded = decodelist(reply)
    except ValueError:
        self._abort(error.ResponseError(_("unexpected response:"), reply))
    else:
        return decoded

@eh.addattr(wirepeer, 'evoext_obshash1')
def peer_obshash1(self, nodes):
    """wire peer flavour of `evoext_obshash1`

    An undecodable reply aborts with a ResponseError.
    """
    reply = self._call("evoext_obshash1", nodes=encodelist(nodes))
    try:
        decoded = decodelist(reply)
    except ValueError:
        self._abort(error.ResponseError(_("unexpected response:"), reply))
    else:
        return decoded

@compat.wireprotocommand(eh, 'evoext_obshash', 'nodes')
def srv_obshash(repo, proto, nodes):
    """server side of `evoext_obshash`: encoded nodes in, encoded hashes out"""
    hashes = _obshash(repo, decodelist(nodes))
    return encodelist(hashes)

@compat.wireprotocommand(eh, 'evoext_obshash1', 'nodes')
def srv_obshash1(repo, proto, nodes):
    """server side of `evoext_obshash1`: same as above with hash version 1"""
    hashes = _obshash(repo, decodelist(nodes), version=1)
    return encodelist(hashes)

def _obshash_capabilities(orig, repo, proto):
    """wrapper to advertise new capability

    Adds `_evoext_obshash_0` and `_evoext_obshash_1` to the capabilities
    when marker exchange is enabled and
    `experimental.evolution.obsdiscovery` is not turned off.
    """
    caps = orig(repo, proto)
    advertise = (obsolete.isenabled(repo, obsolete.exchangeopt)
                 and repo.ui.configbool('experimental',
                                        'evolution.obsdiscovery', True))
    if not advertise:
        return caps

    # Compat hg 4.6+ (2f7290555c96): the capabilities may come wrapped in a
    # response object carrying the payload in `.data`
    wrapped = util.safehasattr(caps, 'data')
    raw = caps.data if wrapped else caps

    names = raw.split()
    names.extend([b'_evoext_obshash_0', b'_evoext_obshash_1'])
    names.sort()
    raw = b' '.join(names)

    # Compat hg 4.6+ (2f7290555c96): answer with the same kind of object
    if wrapped:
        return wireprototypes.bytesresponse(raw)
    return raw

@eh.extsetup
def obshash_extsetup(ui):
    """hook the tree obshash capabilities into the wire protocol server"""
    # wrap the module level function ...
    extensions.wrapfunction(wireprotov1server, 'capabilities',
                            _obshash_capabilities)
    # ... and the entry already registered in the command table
    table = wireprotov1server.commands
    oldcap, args = table['capabilities']

    def newcap(repo, proto):
        return _obshash_capabilities(oldcap, repo, proto)

    table['capabilities'] = (newcap, args)

##########################################
###  trigger discovery during exchange ###
##########################################

def _dopushmarkers(pushop):
    """Tell whether this push should consider sending obsolescence markers

    The result is truthy only when all of the following hold; they are
    evaluated lazily, in this order:

    * the local repository has obsolescence markers,
    * exchange of obsmarkers is enabled locally,
    * the remote lists 'obsolete' among its pushkey namespaces."""
    local = pushop.repo
    # the remote is only queried when both local checks passed
    return (local.obsstore
            and obsolete.isenabled(local, obsolete.exchangeopt)
            and 'obsolete' in pushop.remote.listkeys('namespaces'))

def _pushobshashrange(pushop, commonrevs):
    """Discovery for push relying on ``findmissingrange``

    Returns the nodes reported by ``findmissingrange`` (run on the
    unfiltered repository over ``commonrevs``), extended with the
    changesets about to be pushed (``pushop.outgoing.missing``)."""
    unfi = pushop.repo.unfiltered()
    nodes = findmissingrange(pushop.ui, unfi, pushop.remote, commonrevs)
    nodes += pushop.outgoing.missing
    return nodes

def _pushobshashtree(pushop, commonrevs):
    """Discovery for push relying on ``findcommonobsmarkers``

    Returns the nodes of the revisions selected by
    ``only(<future heads>, <common>)`` on the unfiltered repository, where
    <common> is the result of ``findcommonobsmarkers`` over ``commonrevs``."""
    unfi = pushop.repo.unfiltered()
    tonode = unfi.changelog.node
    common = findcommonobsmarkers(pushop.ui, unfi, pushop.remote, commonrevs)
    return [tonode(rev)
            for rev in unfi.revs('only(%ln, %ln)', pushop.futureheads, common)]

# Available discovery methods; the first usable one is used.
#
# Each entry is a tuple (canuse, perform discovery):
#
# * canuse is called as `canuse(repo, remote)`; a true result selects the
#   entry,
# * perform discovery is called as `discovery(pushop, commonrevs)` and
#   returns the nodes whose relevant obsmarkers have to be pushed.
obsdiscoveries = [
    (_canobshashrange, _pushobshashrange),
    (_canobshashtree, _pushobshashtree),
]

# Status message displayed (through `ui.status`) by both the push and the pull
# code paths below when the user disabled obsmarkers discovery with the
# 'experimental.evolution.obsdiscovery' option.
#
# NOTE: this text is user visible output; its exact content (spelling
# included) is part of the program behavior.
obsdiscovery_skip_message = """\
(skipping discovery of obsolescence markers, will exchange everything)
(controled by 'experimental.evolution.obsdiscovery' configuration)
"""

def usediscovery(repo):
    """Tell whether obsmarkers discovery is enabled for ``repo``

    Reads the boolean ``experimental.evolution.obsdiscovery`` configuration
    option, using True as the default."""
    ui = repo.ui
    return ui.configbool('experimental', 'evolution.obsdiscovery', True)

@eh.wrapfunction(exchange, '_pushdiscoveryobsmarkers')
def _pushdiscoveryobsmarkers(orig, pushop):
    """Select the obsolescence markers to push using obsmarkers discovery

    Nothing is done when ``_dopushmarkers(pushop)`` is false. Otherwise:

    * if discovery is disabled by configuration, or if no entry of
      ``obsdiscoveries`` is usable with the remote, the wrapped function
      ``orig`` is called and its result returned;
    * else the selected discovery method is run and
      ``pushop.outobsmarkers`` is set to the markers relevant to the nodes
      it returned (an empty list when it returned nothing)."""
    if not _dopushmarkers(pushop):
        return

    repo = pushop.repo
    remote = pushop.remote
    obsexcmsg(repo.ui, "computing relevant nodes\n")

    if not usediscovery(repo):
        # discovery disabled by user
        repo.ui.status(obsdiscovery_skip_message)
        return orig(pushop)

    # look for an obs-discovery protocol we can use
    discovery = None
    for candidate in obsdiscoveries:
        if candidate[0](repo, remote):
            discovery = candidate[1]
            break

    if discovery is None:
        # no discovery available, rely on core to push all relevant
        # obs markers.
        return orig(pushop)

    # The revsets are only evaluated from here on: the fallback paths above
    # hand everything over to core and have no use for them.
    revs = list(repo.revs('::%ln', pushop.futureheads))
    obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
                       % len(revs))
    unfi = repo.unfiltered()
    commonrevs = list(unfi.revs('::%ln', pushop.outgoing.commonheads))
    # find the nodes where the relevant obsmarkers mismatch
    nodes = discovery(pushop, commonrevs)

    if nodes:
        obsexcmsg(repo.ui, "computing markers relevant to %i nodes\n"
                           % len(nodes))
        pushop.outobsmarkers = repo.obsstore.relevantmarkers(nodes)
    else:
        obsexcmsg(repo.ui, "markers already in sync\n")
        pushop.outobsmarkers = []

@eh.extsetup
def _installobsmarkersdiscovery(ui):
    """Replace the 'obsmarker' push discovery step with our wrapper

    The step previously registered in ``exchange.pushdiscoverymapping`` is
    kept and handed to ``_pushdiscoveryobsmarkers`` as the function to fall
    back on."""
    mapping = exchange.pushdiscoverymapping
    coredisco = mapping['obsmarker']

    def newdisco(pushop):
        _pushdiscoveryobsmarkers(coredisco, pushop)
    mapping['obsmarker'] = newdisco

def buildpullobsmarkersboundaries(pullop, bundle2=True):
    """small function returning the arguments for the pull markers call

    The returned dict always contains 'heads' (``pullop.pulledsubset``),
    plus either 'common' or 'missing':

    * 'common' is ``[nullid]`` when nothing is common with the remote, when
      discovery is disabled by configuration, or when no discovery method
      is usable;
    * 'missing' is the result of ``findmissingrange`` when ``bundle2`` is
      true and ``_canobshashrange`` accepts the remote;
    * 'common' is the result of ``findcommonobsmarkers`` when the remote
      has the '_evoext_obshash_0' capability.

    It is a separate function to make it easy to play around with the
    strategy used for that."""
    repo = pullop.repo
    remote = pullop.remote
    revs = repo.unfiltered().revs('::(%ln - null)', pullop.common)
    boundaries = {'heads': pullop.pulledsubset}

    if not revs:
        # nothing common
        boundaries['common'] = [node.nullid]
    elif not usediscovery(repo):
        # discovery disabled by users.
        repo.ui.status(obsdiscovery_skip_message)
        boundaries['common'] = [node.nullid]
    elif bundle2 and _canobshashrange(repo, remote):
        obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
                  % len(revs))
        boundaries['missing'] = findmissingrange(repo.ui, repo, remote, revs)
    elif remote.capable('_evoext_obshash_0'):
        obsexcmsg(repo.ui, "looking for common markers in %i nodes\n"
                  % len(revs))
        boundaries['common'] = findcommonobsmarkers(repo.ui, repo, remote,
                                                    revs)
    else:
        boundaries['common'] = [node.nullid]
    return boundaries

# Merge the extension helper of stablerangecache into this module's helper.
# This is done after the registrations above: merge later for outer layer
# wrapping.
eh.merge(stablerangecache.eh)