view hgext3rd/evolve/stablesort.py @ 3352:0370c8503e2f

stablesort: cleanup the update logic
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Thu, 21 Dec 2017 04:23:45 +0100
parents fd90e73bf79a
children 81aae43ee0f1
line wrap: on
line source

# Code dedicated to the computation of stable sorting
#
# These stable sortings are used by stable ranges
#
# Copyright 2017 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

import array
import collections
import struct
import weakref

from mercurial import (
    commands,
    cmdutil,
    localrepo,
    error,
    node as nodemod,
    scmutil,
    util,
)

from mercurial.i18n import _

from . import (
    depthcache,
    exthelper,
    utility,
    genericcaches,
)

# local alias for utility.filterparents; it is applied below to the result
# of changelog.parentrevs before parents are inspected
filterparents = utility.filterparents

# extension helper collecting the command and repo-setup hook declared in
# this module; the helper of the depthcache module is merged into it
eh = exthelper.exthelper()
eh.merge(depthcache.eh)

def _mergepoint_tie_breaker(repo):
    """the key use to tie break merge parent

    Exists as a function to help playing with different approaches.

    Possible other factor are:
        * depth of node,
        * number of exclusive merges,
        * number of jump points.
        * <insert-your-idea>
    """
    node = repo.changelog.node
    depth = repo.depthcache.get

    def key(rev):
        return (-depth(rev), node(rev))
    return key

@eh.command(
    'debugstablesort',
    [
        ('r', 'rev', [], 'heads to start from'),
        ('', 'method', 'branchpoint', "method used for sorting, one of: "
         "branchpoint, basic-mergepoint and basic-headstart"),
        ('l', 'limit', '', 'number of revision display (default to all)')
    ] + commands.formatteropts,
    _(''))
def debugstablesort(ui, repo, **opts):
    """display the ::REVS set topologically sorted in a stable way
    """
    # resolve the requested revisions and the sorting implementation
    revs = scmutil.revrange(repo, opts['rev'])

    requested = opts['method']
    sortfunc = _methodmap.get(requested)
    if sortfunc is None:
        known = ', '.join(sorted(_methodmap))
        raise error.Abort('unknown sorting method: "%s"' % requested,
                          hint='pick one of: %s' % known)

    displayer = cmdutil.show_changeset(ui, repo, opts, buffered=True)

    # 'limit' is only forwarded when the user provided one.
    # NOTE(review): stablesort_branchpoint and stablesort_mergepoint_multirevs
    # do not take a 'limit' keyword, so combining --limit with those methods
    # fails when the sorter is called.
    sortargs = {}
    rawlimit = opts['limit']
    if rawlimit:
        sortargs['limit'] = int(rawlimit)

    for rev in sortfunc(repo, revs, **sortargs):
        ctx = repo[rev]
        displayer.show(ctx)
        displayer.flush(ctx)
    displayer.close()

def stablesort_branchpoint(repo, revs, mergecallback=None):
    """return '::revs' topologically sorted in "stable" order

    This is a depth first traversal starting from 'nullrev', using node as a
    tie breaker.

    'revs' is the collection of revision numbers whose ancestors get sorted;
    an empty 'revs' gives an empty list.

    When 'mergecallback' is provided it is called as
    ``mergecallback(result, rev)`` right after a revision with two or more
    (filtered) parents has been appended to 'result', the list being built.
    """
    if not revs:
        # nothing to sort; without this guard max(revs) below raises
        # ValueError on an empty input
        return []
    # Various notes:
    #
    # * Bitbucket is using dates as tie breaker, that might be a good idea.
    #
    # * It seems we can traverse in the same order from (one) head to bottom,
    #   if we record the following data for each merge:
    #
    #  - highest (stablesort-wise) common ancestors,
    #  - order of parents (stablesort-wise)
    cl = repo.changelog
    parents = cl.parentrevs
    nullrev = nodemod.nullrev
    n = cl.node
    # step 1: We need a parents -> children mapping for 2 reasons.
    #
    # * we build the order from nullrev to tip
    #
    # * we need to detect branching
    children = collections.defaultdict(list)
    for r in cl.ancestors(revs, inclusive=True):
        ps = filterparents(parents(r))
        if not ps:
            # parentless revisions are recorded as children of nullrev
            children[nullrev].append(r)
        for p in ps:
            children[p].append(r)
    # step two: walk back up
    # * pick lowest node in case of branching
    # * stack disregarded part of the branching
    # * process merge when both parents are yielded

    # track which changesets have been emitted; the extra trailing slot is
    # the one reached through index 'nullrev'
    seen = [0] * (max(revs) + 2)
    seen[nullrev] = True # nullrev is known
    # starts from repository roots
    # reuse the list from the mapping as we won't need it again anyway
    stack = children[nullrev]
    if not stack:
        return []
    if 1 < len(stack):
        stack.sort(key=n, reverse=True)

    # list of rev, maybe we should yield, but since we built a children
    # mapping we are 'O(N)' already
    result = []

    current = stack.pop()
    while current is not None or stack:
        if current is None:
            # previous iteration reached a leaf or an unready merge,
            # resume from the most recently stacked branch
            current = stack.pop()
            if seen[current]:
                current = None
                continue
        ps = filterparents(parents(current))
        if not all(seen[p] for p in ps):
            # we can't iterate on this merge yet because other child is not
            # yielded yet (and we are topo sorting) we can discard it for now
            # because it will be reached from the other child.
            current = None
            continue
        assert not seen[current]
        seen[current] = True
        result.append(current) # could be yield, cf earlier comment
        if mergecallback is not None and 2 <= len(ps):
            mergecallback(result, current)
        cs = children[current]
        if not cs:
            current = None
        elif 1 == len(cs):
            current = cs[0]
        else:
            cs.sort(key=n, reverse=True)
            current = cs.pop() # proceed on smallest
            stack.extend(cs)   # stack the rest for later
    assert len(result) == len(set(result))
    return result

def stablesort_mergepoint_multirevs(repo, revs):
    """return '::revs' topologically sorted in "stable" order

    This is a depth first traversal starting from 'revs' (toward root), using
    the merge point tie breaker to order the heads.

    Each head of 'revs' is sorted on its own, restricted to the revisions
    that are not reachable from the heads still waiting to be processed, and
    the per-head results are then concatenated. An empty 'revs' gives an
    empty list.
    """
    if not revs:
        # checked first so that no repository data is touched for nothing
        return []
    cl = repo.changelog
    tiebreaker = _mergepoint_tie_breaker(repo)
    if len(revs) == 1:
        heads = list(revs)
    else:
        # keeps heads only
        heads = sorted(repo.revs('heads(%ld::%ld)', revs, revs), key=tiebreaker)

    results = []
    while heads:
        h = heads.pop()
        # 'revs' is known to be non-empty here, so the former
        # "if revs: ... else: cl.ancestors(...)" test always took this path;
        # the unreachable alternative has been dropped.
        bound = cl.findmissingrevs(common=heads, heads=[h])
        results.append(stablesort_mergepoint_bounded(repo, h, bound))
    if len(results) == 1:
        return results[0]
    # heads were processed from the end of the sorted list: restore the order
    finalresults = []
    for r in results[::-1]:
        finalresults.extend(r)
    return finalresults

def stablesort_mergepoint_bounded(repo, head, revs):
    """return 'revs' topologically sorted in "stable" order.

    The 'revs' set MUST have 'head' as its one and unique head.

    'revs' is iterated once to collect, for each revision, its (filtered)
    parents and the children known inside the set. The walk then starts at
    'head' and only moves to a parent once all of that parent's children
    have been emitted. The result is returned as a list, 'head' last.
    """
    # Various notes:
    #
    # * Bitbucket is using dates as tie breaker, that might be a good idea.
    cl = repo.changelog
    parentsfunc = cl.parentrevs
    tiebreaker = _mergepoint_tie_breaker(repo)
    # step 1: We need a parents -> children mapping to detect dependencies
    children = collections.defaultdict(set)
    parentmap = {}
    for r in revs:
        ps = filterparents(parentsfunc(r))
        if 2 <= len(ps):
            # merge: order the parents with the tie breaker
            ps = tuple(sorted(ps, key=tiebreaker))
        parentmap[r] = ps
        for p in ps:
            children[p].add(r)
    # step two: walk again,
    stack = [head]
    resultset = set()
    result = []

    while stack:
        current = stack.pop()
        resultset.add(current)
        result.append(current)
        for p in parentmap[current]:
            siblings = children[p]
            if 1 < len(siblings) and not siblings.issubset(resultset):
                # we need other children to be yield first
                continue
            # 'parentmap' has exactly one key per member of 'revs', so this
            # is the "p in revs" test without a linear scan when 'revs' is
            # a list
            if p in parentmap:
                stack.append(p)

    result.reverse()
    assert len(result) == len(resultset)
    return result

def stablesort_mergepoint_head_basic(repo, revs, limit=None):
    """stable sort of the ancestors of the single head of 'revs'

    Returns an empty list when 'revs' has no head and aborts when it has
    more than one, since only one head can be used as the starting point.
    When 'limit' is not None, only the last 'limit' items of the sorted list
    are returned.
    """
    heads = repo.revs('heads(%ld)', revs)
    if not heads:
        return []
    elif 1 < len(heads):
        # only heads.first() is used below: with two heads the second one
        # used to be silently ignored (the test was "2 < len(heads)")
        raise error.Abort('cannot use head based merging, %d heads found'
                          % len(heads))
    head = heads.first()
    revs = stablesort_mergepoint_bounded(repo, head, repo.revs('::%d', head))
    if limit is None:
        return revs
    return revs[-limit:]

def stablesort_mergepoint_head_debug(repo, revs, limit=None):
    """stable sort from the single head of 'revs' using the recursive walk

    Same contract as the other head based variants: an empty list when
    'revs' has no head, an abort when it has more than one. The sort itself
    is delegated to stablesort_mergepoint_head. When 'limit' is not None,
    only the last 'limit' items of the sorted list are returned.
    """
    heads = repo.revs('heads(%ld)', revs)
    if not heads:
        return []
    elif 1 < len(heads):
        # only heads.first() is used below: with two heads the second one
        # used to be silently ignored (the test was "2 < len(heads)")
        raise error.Abort('cannot use head based merging, %d heads found'
                          % len(heads))
    head = heads.first()
    revs = stablesort_mergepoint_head(repo, head)
    if limit is None:
        return revs
    return revs[-limit:]

def stablesort_mergepoint_head(repo, head):
    """return '::head' topologically sorted in "stable" order

    The walk starts at 'head' and goes toward the root. The linear run of
    single-parent revisions under 'head' forms the end of the result. When
    that run stops on a merge, the two parents are ordered with the merge
    point tie breaker: the part exclusive to the higher parent is sorted
    with stablesort_mergepoint_bounded and the lower parent is handled by a
    recursive call.
    """
    parentrevs = repo.changelog.parentrevs
    keyfunc = _mergepoint_tie_breaker(repo)

    # follow the chain of single-parent revisions below 'head'
    linear = [head]
    pending = filterparents(parentrevs(head))
    while len(pending) == 1:
        only = pending[0]
        linear.append(only)
        pending = filterparents(parentrevs(only))
    linear.reverse()

    if len(pending) != 2:
        # no merge below the linear run: nothing else to add
        return linear

    lower, higher = sorted(pending, key=keyfunc)

    # the part reachable from the higher parent only; this is the part that
    # changes
    middle = []
    exclusive = repo.revs('only(%d, %d)', higher, lower)
    if exclusive:
        middle = stablesort_mergepoint_bounded(repo, higher, exclusive)

    # followed by the part of the other parent, which we can inherit from
    inherited = stablesort_mergepoint_head(repo, lower)

    return inherited + middle + linear

def stablesort_mergepoint_head_cached(repo, revs, limit=None):
    """stable sort from the single head of 'revs' through a fresh cache

    Returns an empty list when 'revs' has no head and aborts when it has
    more than one. The sort is run twice on the same new stablesortcache
    instance, and a warning is issued on ``repo.ui`` when the two runs
    disagree. The result of the second run is returned.
    """
    heads = repo.revs('heads(%ld)', revs)
    if not heads:
        return []
    elif 1 < len(heads):
        # only heads.first() is used below: with two heads the second one
        # used to be silently ignored (the test was "2 < len(heads)")
        raise error.Abort('cannot use head based merging, %d heads found'
                          % len(heads))
    head = heads.first()
    cache = stablesortcache()
    # the first run fills the cache, the second one reads it back
    first = list(cache.get(repo, head, limit=limit))
    second = list(cache.get(repo, head, limit=limit))
    if first != second:
        repo.ui.warn('stablesort-cache: initial run different from re-run:\n'
                     '    %s\n'
                     '    %s\n' % (first, second))
    return second

class stablesortcache(object):
    """in-memory cache of the "jumps" found while walking the stable sort

    For each merge revision walked, a list of ``(source, destination, size)``
    tuples is recorded. When walking from that merge again, the walk follows
    one parent at a time until it reaches 'source', then continues from
    'destination' (see walkfrom). Non-merge revisions are stored as None.
    """

    def __init__(self):
        # rev -> None (non-merge) or list of (source, destination, size)
        self._jumps = {}
        super(stablesortcache, self).__init__()

    def get(self, repo, rev, limit=None):
        """return the revisions walked from 'rev', in reversed walk order

        The walk stops once 'limit' revisions have been collected (no bound
        when 'limit' is None). 'rev' itself ends up last in the list.
        """
        result = []
        for r in self.walkfrom(repo, rev):
            result.append(r)
            if limit is not None and limit <= len(result):
                break
        result.reverse()
        return result

    def getjumps(self, repo, rev):
        """return the jumps recorded for 'rev', computing them if needed

        Returns None for a revision whose second parent is nullrev.
        """
        if not self._hasjumpsdata(rev):
            parents = repo.changelog.parentrevs(rev)
            if parents[1] == nodemod.nullrev:
                self._setjumps(rev, None)
            else:
                # merge! warm the cache: walkfrom records the jumps of 'rev'
                # (through _setjumps) before it moves on to the lowest
                # parent, so the walk can stop there
                tiebreaker = _mergepoint_tie_breaker(repo)
                minparent = sorted(parents, key=tiebreaker)[0]
                for r in self.walkfrom(repo, rev):
                    if r == minparent:
                        break
        return self._getjumps(rev)

    def _hasjumpsdata(self, rev):
        """tell whether an entry (possibly None) is stored for 'rev'"""
        return rev in self._jumps

    def _getjumps(self, rev):
        """return the stored jumps for 'rev' (None when nothing is stored)"""
        return self._jumps.get(rev)

    def _setjumps(self, rev, jumps):
        """store 'jumps' as the entry of 'rev'"""
        self._jumps[rev] = jumps

    def walkfrom(self, repo, head):
        """generator of revisions, starting at 'head' and walking down

        Merges that already have recorded jumps are replayed from that data;
        other merges get their exclusive side computed with
        _process_exclusive_side and the resulting jumps are stored.
        """
        tiebreaker = _mergepoint_tie_breaker(repo)
        cl = repo.changelog
        parentsfunc = cl.parentrevs

        def parents(rev):
            return filterparents(parentsfunc(rev))

        def oneparent(rev):
            # the parent followed between two jump points: the only parent,
            # or the highest one according to the tie breaker
            ps = parents(rev)
            if not ps:
                return None
            if len(ps) == 1:
                return ps[0]
            return max(ps, key=tiebreaker)

        current = head
        previous_current_1 = object()
        previous_current_2 = object()

        while current is not None:
            # sanity check: 'current' must not be the very same object as on
            # the previous iteration
            previous_current_2 = previous_current_1
            previous_current_1 = current
            assert previous_current_1 is not previous_current_2

            jumps = self._getjumps(current)
            if jumps is not None:
                # we have enough cached information to directly iterate over
                # the exclusive side.
                for j in jumps:
                    jump_point = j[0]
                    jump_dest = j[1]
                    if current == jump_point:
                        yield current
                    else:
                        # follow one parent at a time up to the jump point
                        while current != jump_point:
                            yield current
                            current = oneparent(current)
                            assert current is not None
                        yield current
                    current = jump_dest
                continue

            yield current

            ps = parents(current)
            if not ps:
                current = None # break
            if len(ps) == 1:
                current = ps[0]
            elif len(ps) == 2:
                lower_parent, higher_parent = sorted(ps, key=tiebreaker)

                rev = current
                jumps = []

                def recordjump(source, destination, size):
                    jump = (source, destination, size)
                    jumps.append(jump)
                process = self._process_exclusive_side
                for rev in process(lower_parent, higher_parent, cl, parents,
                                   tiebreaker, recordjump):
                    yield rev

                if rev == current:
                    # nothing was yielded for the exclusive side: record a
                    # single jump from the merge to its lower parent
                    recordjump(rev, lower_parent, 1)

                self._setjumps(current, jumps)

                current = lower_parent

    def _process_exclusive_side(self, lower, higher, cl, parents, tiebreaker,
                                recordjump):
        """generator over the revisions exclusive to 'higher'

        The exclusive set is ``cl.findmissingrevs(common=[lower],
        heads=[higher])``. A revision is only emitted once all its children
        inside that set have been emitted. 'recordjump' is called as
        ``recordjump(source, destination, size)`` each time the walk moves to
        a revision flagged as a jump, and once more at the end with 'lower'
        as destination; 'size' counts the revisions emitted since the
        previous recorded jump (the merge itself is counted in the first
        one).
        """

        exclusive = cl.findmissingrevs(common=[lower], heads=[higher])

        def popready(stack):
            """pop the top most ready item in the list"""
            for idx in xrange(len(stack) - 1, -1, -1):
                if children[stack[idx]].issubset(seen):
                    return stack.pop(idx)
            return None

        hardstack = []
        previous = None
        seen = set()
        current = higher
        # children known inside the exclusive set, used to detect readiness
        children = collections.defaultdict(set)
        bound = set(exclusive)
        if exclusive:
            for r in exclusive:
                for p in parents(r):
                    children[p].add(r)

            hardstack.append(higher)
        nextjump = False
        size = 1 # take the merge point into account
        while hardstack:
            current = popready(hardstack)
            if current in seen:
                continue
            softstack = []
            while current in bound and current not in seen:
                if nextjump:
                    recordjump(previous, current, size)
                    nextjump = False
                    size = 0
                yield current
                size += 1
                previous = current
                seen.add(current)

                all_parents = parents(current)

                # search for the next parent to walk through
                fallback, next = None, None
                if 1 == len(all_parents):
                    next = all_parents[0]
                elif 2 <= len(all_parents):
                    fallback, next = sorted(all_parents, key=tiebreaker)

                # filter parent not ready (children not emitted)
                while next is not None and not children[next].issubset(seen):
                    nextjump = True
                    next = fallback
                    fallback = None

                # stack management
                if next is None:
                    next = popready(softstack)
                    if next is not None:
                        nextjump = True
                elif fallback is not None:
                    softstack.append(fallback)

                # get ready for next iteration
                current = next

            # any unprocessed head has to go in the hard stack
            nextjump = True
            hardstack.extend(softstack)

        if previous is not None:
            recordjump(previous, lower, size)

def stablesort_mergepoint_head_ondisk(repo, revs, limit=None):
    """stable sort from the single head of 'revs' using the on-disk cache

    Returns an empty list when 'revs' has no head and aborts when it has
    more than one. The cache attached to the unfiltered repository
    (``unfi.stablesort``) is saved and then used to walk from the head;
    'limit' is forwarded to the cache's ``get``.
    """
    heads = repo.revs('heads(%ld)', revs)
    if not heads:
        return []
    elif 1 < len(heads):
        # only heads.first() is used below: with two heads the second one
        # used to be silently ignored (the test was "2 < len(heads)")
        raise error.Abort('cannot use head based merging, %d heads found'
                          % len(heads))
    head = heads.first()
    unfi = repo.unfiltered()
    cache = unfi.stablesort
    cache.save(unfi)
    return cache.get(repo, head, limit=limit)

# big-endian unsigned int holding the byte length of the serialized index in
# the on-disk stablesort cache file (see ondiskstablesortcache.load/save)
S_INDEXSIZE = struct.Struct('>I')

class ondiskstablesortcache(stablesortcache, genericcaches.changelogsourcebase):
    """stablesort jump cache stored in flat arrays and persisted on disk

    The jumps are kept in two arrays instead of the dictionary used by the
    parent class:

    * ``_data`` is a flat sequence of jump records, three items each
      (the source, destination and size of the jump),
    * ``_index[rev]`` is the number of records stored for revisions up to
      and including 'rev', so the records of 'rev' are the ones in
      ``[_index[rev - 1], _index[rev])``.

    The cache file is made of the serialized cache key, the byte length of
    the index (S_INDEXSIZE), the index and finally the data.
    """

    _filepath = 'evoext-stablesortcache-00'
    _cachename = 'evo-ext-stablesort'

    def __init__(self):
        super(ondiskstablesortcache, self).__init__()
        self._index = array.array('l')
        self._data = array.array('l')
        # the dictionary of the in-memory implementation is not used here
        del self._jumps

    def getjumps(self, repo, rev):
        """return the jumps stored for 'rev' (None for a non-merge)

        Raises ProgrammingError when the index is shorter than 'rev'.
        """
        # NOTE(review): a revision equal to len(self._index) has no entry
        # either, but passes this test and gets None from _getjumps; confirm
        # whether "<=" was intended.
        if len(self._index) < rev:
            msg = 'stablesortcache must be warmed before use (%d < %d)'
            msg %= (len(self._index), rev)
            raise error.ProgrammingError(msg)
        return self._getjumps(rev)

    def _getjumps(self, rev):
        """return a generator over the jump tuples of 'rev', or None"""
        # very first revision
        if rev == 0:
            return None
        # no data yet
        if len(self._index) <= rev:
            return None
        index = self._index
        # non merge revision
        if index[rev] == index[rev - 1]:
            return None
        data = self._data
        # merge revision

        def jumps():
            for idx in xrange(index[rev - 1], index[rev]):
                i = idx * 3
                yield tuple(data[i:i + 3])
        return jumps()

    def _setjumps(self, rev, jumps):
        """append the entry of 'rev'; revisions must be added in order"""
        assert len(self._index) == rev, (len(self._index), rev)
        if rev == 0:
            self._index.append(0)
            return
        end = self._index[rev - 1]
        if jumps is None:
            # no record for this revision: same end offset as the previous
            self._index.append(end)
            return
        assert len(self._data) == end * 3, (len(self._data), end)
        for j in jumps:
            self._data.append(j[0])
            self._data.append(j[1])
            self._data.append(j[2])
            end += 1
        self._index.append(end)

    def _updatefrom(self, repo, data):
        """add the revisions listed in 'data' to the cache"""
        repo = repo.unfiltered()

        total = len(data)

        def progress(pos, rev):
            repo.ui.progress('updating stablesort cache',
                             pos, 'rev %s' % rev, unit='revision', total=total)

        progress(0, '')
        for idx, rev in enumerate(data):
            parents = repo.changelog.parentrevs(rev)
            if parents[1] == nodemod.nullrev:
                self._setjumps(rev, None)
            else:
                # merge! warm the cache: walkfrom stores the jumps of 'rev'
                # before it moves on to the lowest parent
                tiebreaker = _mergepoint_tie_breaker(repo)
                minparent = sorted(parents, key=tiebreaker)[0]
                for r in self.walkfrom(repo, rev):
                    if r == minparent:
                        break
            if not (idx % 1000): # progress has a too high performance impact
                progress(idx, rev)
        progress(None, '')

    def clear(self, reset=False):
        """drop the cached content

        'reset' is handed over to the parent implementation.
        """
        # 'reset' used to be accepted and then dropped here.
        # NOTE(review): assumes the base class clear() takes a 'reset'
        # keyword, as this override's signature suggests -- confirm in
        # genericcaches.
        super(ondiskstablesortcache, self).clear(reset=reset)
        self._index = array.array('l')
        self._data = array.array('l')

    def load(self, repo):
        """load data from disk

        (crude version, read everything all the time)
        """
        assert repo.filtername is None

        data = repo.cachevfs.tryread(self._filepath)
        self._index = array.array('l')
        self._data = array.array('l')
        if not data:
            self._cachekey = self.emptykey
        else:
            headerdata = data[:self._cachekeysize]
            self._cachekey = self._deserializecachekey(headerdata)
            offset = self._cachekeysize
            indexsizedata = data[offset:offset + S_INDEXSIZE.size]
            indexsize = S_INDEXSIZE.unpack(indexsizedata)[0]
            offset += S_INDEXSIZE.size
            self._index.fromstring(data[offset:offset + indexsize])
            offset += indexsize
            self._data.fromstring(data[offset:])
        # remember what is on disk so that save() can skip useless writes
        self._ondiskkey = self._cachekey

    def save(self, repo):
        """save the data to disk

        (crude version, rewrite everything all the time)

        Nothing is written when the cache has no key or when its key matches
        the one of the content known to be on disk.
        """
        if self._cachekey is None or self._cachekey == self._ondiskkey:
            return
        cachefile = repo.cachevfs(self._filepath, 'w', atomictemp=True)

        # data to write
        headerdata = self._serializecachekey()
        indexdata = self._index.tostring()
        data = self._data.tostring()
        indexsize = S_INDEXSIZE.pack(len(indexdata))

        # writing
        cachefile.write(headerdata)
        cachefile.write(indexsize)
        cachefile.write(indexdata)
        cachefile.write(data)
        cachefile.close()
        # the file now matches the in-memory state; without this the test at
        # the top never succeeds again after an update and each call
        # rewrites the whole file
        self._ondiskkey = self._cachekey

@eh.reposetup
def setupcache(ui, repo):
    """attach the on-disk stablesort cache to 'repo'

    The class of 'repo' is replaced by a subclass exposing the cache as
    ``repo.stablesort``, clearing it from ``destroyed`` and warming it either
    from ``updatecaches`` (when the repository has such a method) or from a
    transaction post-close callback.
    """
    basecls = repo.__class__

    class stablesortrepo(basecls):

        @localrepo.unfilteredpropertycache
        def stablesort(self):
            # built and updated on first access
            sortcache = ondiskstablesortcache()
            sortcache.update(self)
            return sortcache

        @localrepo.unfilteredmethod
        def destroyed(self):
            # only touch the cache if it has been instantiated already
            if 'stablesort' in vars(self):
                self.stablesort.clear()
            super(stablesortrepo, self).destroyed()

        if util.safehasattr(repo, 'updatecaches'):
            @localrepo.unfilteredmethod
            def updatecaches(self, tr=None):
                if utility.shouldwarmcache(self):
                    self.stablesort.update(self)
                    self.stablesort.save(self)
                super(stablesortrepo, self).updatecaches(tr)

        else:
            def transaction(self, *args, **kwargs):
                tr = super(stablesortrepo, self).transaction(*args, **kwargs)
                # the callback only holds a weak reference to the repository
                reporef = weakref.ref(self)

                def _warmcache(tr):
                    target = reporef()
                    if target is not None:
                        unfi = target.unfiltered()
                        unfi.stablesort.update(unfi)
                        unfi.stablesort.save(unfi)

                if utility.shouldwarmcache(self):
                    tr.addpostclose('warmcache-02stablesort', _warmcache)
                return tr

    repo.__class__ = stablesortrepo

# value accepted by the --method option of 'debugstablesort' -> function
# implementing that sorting method
_methodmap = {
    'branchpoint': stablesort_branchpoint,
    'basic-mergepoint': stablesort_mergepoint_multirevs,
    'basic-headstart': stablesort_mergepoint_head_basic,
    'headstart': stablesort_mergepoint_head_debug,
    'headcached': stablesort_mergepoint_head_cached,
    'headondisk': stablesort_mergepoint_head_ondisk,
}