view hgext3rd/evolve/utility.py @ 6099:435dfa125d89

picksplitsuccessor: refactor and rename this method for better quality This patch modify the prompt message and make it more generic by removing the `evolvecand` param, which would help us in using this method at other places as well.
author Sushil khanchi <sushilkhanchi97@gmail.com>
date Tue, 27 Jul 2021 23:32:47 +0530
parents 6f6b6218b6fe
children 17ffdea0edbb
line wrap: on
line source

# Various utility function for the evolve extension
#
# Copyright 2017 Pierre-Yves David <pierre-yves.david@ens-lyon.org>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from mercurial import (
    obsutil,
)

from mercurial.i18n import _

from mercurial.node import nullrev

shorttemplate = b"[{label('evolve.rev', rev)}] {desc|firstline}\n"
stacktemplate = b"""[{label('evolve.rev', if(topicidx, "s{topicidx}", rev))}] {desc|firstline}\n"""

def obsexcmsg(ui, message, important=False):
    verbose = ui.configbool(b'experimental', b'verbose-obsolescence-exchange')
    if verbose:
        message = b'OBSEXC: ' + message
    if important or verbose:
        ui.status(message)

def filterparents(parents):
    """filter nullrev parents

    (and other crazyness)"""
    p1, p2 = parents
    if p1 == nullrev and p2 == nullrev:
        return ()
    elif p1 != nullrev and (p2 == nullrev or p1 == p2):
        return (p1,)
    elif p1 == nullrev and p2 != nullrev:
        return (p2,)
    else:
        return parents

def shouldwarmcache(repo, tr):
    configbool = repo.ui.configbool
    config = repo.ui.config
    desc = getattr(tr, 'desc', b'')

    autocase = False
    if tr is None and not getattr(repo, '_destroying', False):
        autocase = True
    elif desc.startswith(b'serve'):
        autocase = True
    elif desc.startswith(b'push') and not desc.startswith(b'push-response'):
        autocase = True

    autocache = config(b'experimental', b'obshashrange.warm-cache',
                       b'auto') == b'auto'
    if autocache:
        warm = autocase
    else:
        # note: we should not get to the default case
        warm = configbool(b'experimental', b'obshashrange.warm-cache')
    if not configbool(b'experimental', b'obshashrange'):
        return False
    if not warm:
        return False
    maxrevs = repo.ui.configint(b'experimental', b'obshashrange.max-revs')
    if maxrevs is not None and maxrevs < len(repo.unfiltered()):
        return False
    return True

class MultipleSuccessorsError(RuntimeError):
    """Exception raised by _singlesuccessor() when multiple successor sets exists

    Attributes:
        successorssets    the list of successorssets to call to easily recover
        divergenceflag    indicate that changeset has divergent rewriting
        splitflag         indicate that changeset was split
    """

    def __init__(self, successorssets):
        self.successorssets = successorssets
        self.divergenceflag = len(successorssets) > 1
        self.splitflag = len(successorssets[0]) > 1

def builddependencies(repo, revs):
    """returns dependency graphs giving an order to solve instability of revs
    (see _orderrevs for more information on usage)"""

    # For each troubled revision we keep track of what instability if any should
    # be resolved in order to resolve it. Example:
    # dependencies = {3: [6], 6:[]}
    # Means that: 6 has no dependency, 3 depends on 6 to be solved
    dependencies = {}

    for r in revs:
        dependencies[r] = set()
        for p in repo[r].parents():
            for succ in _successorrevs(repo, p):
                if succ in revs:
                    dependencies[r].add(succ)

    # rdependencies is the inverted dict of dependencies
    rdependencies = {r: set() for r in revs}
    for r, deps in dependencies.items():
        for dep in deps:
            rdependencies[dep].add(r)

    return dependencies, rdependencies

def _singlesuccessor(repo, p):
    """returns p (as rev) if not obsolete or its unique latest successors

    fail if there are no such successor"""

    if not p.obsolete():
        return p.rev()
    obs = repo[p]
    ui = repo.ui
    cache = {}
    newer = obsutil.successorssets(repo, obs.node(), cache=cache)
    # search of a parent which is not killed
    while not newer:
        ui.debug(b"stabilize target %s is plain dead,"
                 b" trying to stabilize on its parent\n" %
                 obs)
        obs = obs.p1()
        newer = obsutil.successorssets(repo, obs.node(), cache=cache)
    if len(newer) > 1 or len(newer[0]) > 1:
        raise MultipleSuccessorsError(newer)

    return repo[newer[0][0]].rev()

def select_split_successor(ui, repo, ctx):
    """Return most suitable split successor of <ctx> for evolution.

    If all successors are on one topological branch, we choose tipmost successor.
    Otherwise, we ask user to choose an evolve destination.

    Return ``None`` if no successor selected.
    """
    targets = obsutil.successorssets(repo, ctx.node())[0]
    assert targets
    targetrevs = [repo[r].rev() for r in targets]
    heads = repo.revs(b'heads(%ld::%ld)', targetrevs, targetrevs)
    if len(heads) > 1:
        cheader = (_(b"changeset %s split over multiple topological"
                     b" branches, choose an evolve destination:") % ctx)
        selectedrev = revselectionprompt(ui, repo, list(heads), cheader)
        if selectedrev is None:
            return
        succ = repo[selectedrev]
    else:
        succ = repo[heads.first()]
    return repo[succ].rev()

def _successorrevs(repo, ctx):
    try:
        return {_singlesuccessor(repo, ctx)}
    except MultipleSuccessorsError as exc:
        return {repo[node].rev()
                for successorsset in exc.successorssets
                for node in successorsset}

def revselectionprompt(ui, repo, revs, customheader=b""):
    """function to prompt user to choose a revision from all the revs and return
    that revision for further tasks

    revs is a list of rev number of revision from which one revision should be
    choosed by the user
    customheader is a text which the caller wants as the header of the prompt
    which will list revisions to select

    returns value is:
        rev number of revision choosed: if user choose a revision
        None: if user entered a wrong input, user quit the prompt,
              ui.interactive is not set
    """

    # ui.interactive is not set, fallback to default behavior and avoid showing
    # the prompt
    if not ui.interactive():
        return None

    promptmsg = customheader + b"\n"
    for idx, rev in enumerate(revs):
        curctx = repo[rev]
        revmsg = b"%d: [%s] %s\n" % (idx + 1, curctx,
                                     curctx.description().split(b"\n")[0])
        promptmsg += revmsg

    promptmsg += _(b"q: quit the prompt\n")
    promptmsg += _(b"enter the index of the revision you want to select:")
    idxselected = ui.prompt(promptmsg)

    intidx = None
    try:
        intidx = int(idxselected)
    except ValueError:
        if idxselected == b'q':
            return None
        ui.write_err(_(b"invalid value '%s' entered for index\n") % idxselected)
        return None

    if intidx > len(revs) or intidx <= 0:
        # we can make this error message better
        ui.write_err(_(b"invalid value '%d' entered for index\n") % intidx)
        return None

    return revs[intidx - 1]

def mergeusers(ui, base, divergent, other):
    """Return the merged user from two divergent changesets.

    Perform merge using 3-way merge. If unable to merge, concatenate
    the two users.
    """
    baseuser = base.user()
    divuser = divergent.user()
    othuser = other.user()

    if divuser == othuser:
        return divuser
    else:
        if baseuser == divuser:
            return othuser
        elif baseuser == othuser:
            return divuser
        else:
            # all three are different, lets concatenate the two authors
            # XXX: should we let the user know about concatenation of authors
            #      by printing some message (or maybe in verbose mode)
            users = set(divuser.split(b', '))
            users.update(othuser.split(b', '))
            user = b', '.join(sorted(users))
            return user