mercurial/admin/verify.py
author Matt Harbison <matt_harbison@yahoo.com>
Mon, 30 Sep 2024 23:50:40 -0400
changeset 51935 77e2994bd617
parent 51864 1c5810ce737e
permissions -rw-r--r--
mdiff: convert a few block definitions from lists to tuples These were flagged by adding type hints. Some places were using a tuple of 4 ints to define a block, and others were using a list of 4. A tuple is better for typing, because we can define the length and the type of each entry. One of the places had to redefine the tuple, since writing to a tuple at an index isn't supported. This change spills out into the tests, and archeology says it was added to the repo in this state. There was no reason given for the divergence, and I suspect it wasn't intentional. It looks like `splitblock()` is completely unused in the codebase.

# admin/verify.py - better repository integrity checking for Mercurial
#
# Copyright 2023 Octobus <contact@octobus.net>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import annotations

import collections
import copy
import functools

from ..i18n import _
from .. import error, pycompat, registrar, requirements
from ..utils import stringutil


# Registered checks, keyed by their fully qualified (dotted) name, e.g.
# b"working-copy.dirstate". `find_checks` looks names up in this table.
verify_table = {}
# Aliases: maps a short alias (e.g. b"dirstate") to the fully qualified name
# it stands for. `find_checks` consults it when a name is not in the table.
verify_alias_table = {}
# Decorator used in this module to declare a check under a qualified name,
# optionally with an alias. It is handed both tables above (presumably so the
# registrar can fill them in -- see `registrar.verify_check`).
check = registrar.verify_check(verify_table, verify_alias_table)


# Use this to declare options/aliases in the middle of the hierarchy.
# Checks like these are not run themselves and cannot have a body.
# For an example, see the `revlogs` check.
def noop_func(*args, **kwargs):
    """Accept any arguments, do nothing and return None.

    `build_pyramid` compares functions against this object by identity to
    recognize intermediate (body-less) checks.
    """
    return None


@check(b"working-copy.dirstate", alias=b"dirstate")
def check_dirstate(ui, repo, **options):
    """Verify the dirstate against the manifests of its parents.

    Every problem yielded by `repo.dirstate.verify` is sent to `ui.warn`.
    When the repository has the narrow requirement, its narrow matcher is
    handed to the verification; otherwise None is passed instead.

    Returns the number of problems reported.
    """
    ui.status(_(b"checking dirstate\n"))

    p1, p2 = repo.dirstate.parents()
    manifest1 = repo[p1].manifest()
    manifest2 = repo[p2].manifest()

    if requirements.NARROW_REQUIREMENT in repo.requirements:
        matcher = repo.narrowmatch()
    else:
        matcher = None

    problem_count = 0
    for message in repo.dirstate.verify(manifest1, manifest2, p1, matcher):
        ui.warn(message)
        problem_count += 1
    return problem_count


# Tree of all checks and their associated function: nested dicts keyed by
# each dotted component of a check name, whose leaves are
# `(qualified_name, function)` tuples. It is filled by `build_pyramid` and
# refreshed by `find_checks` on every lookup.
pyramid = {}


def build_pyramid(table, full_pyramid):
    """Fill `full_pyramid` with the name-based hierarchy of registered checks.

    Each dotted name of `table` is split on ``b"."``: every leading component
    becomes a nested dict and the last component holds a
    ``(qualified_name, function)`` tuple. The hierarchy can be arbitrarily
    nested.

    Names are walked in reverse sorted order, so a nested check (``a.b.c``)
    is always inserted before its parent (``a.b``). When the parent is
    reached, its slot already holds a subtree and is left untouched.

    `full_pyramid` is updated in place; nothing is returned. Building the
    same table into the same pyramid again is a no-op.

    Raises `error.ProgrammingError` when a name whose slot is already taken
    by a subtree (or by a different entry) is bound to anything other than
    `noop_func`.
    """
    for entry, func in sorted(table.items(), key=lambda x: x[0], reverse=True):
        cursor = full_pyramid
        *parents, leaf = entry.split(b".")
        for level in parents:
            cursor = cursor.setdefault(level, {})

        existing = cursor.get(leaf)
        if existing is None:
            cursor[leaf] = (entry, func)
        elif existing == (entry, func):
            # This exact check was recorded by a previous build of the same
            # pyramid: rebuilding must not be treated as a conflict.
            continue
        elif func is not noop_func:
            m = b"intermediate checks need to use `verify.noop_func`"
            raise error.ProgrammingError(m)


def find_checks(name, table=None, alias_table=None, full_pyramid=None):
    """Find all checks for a given name and returns a dict of
    (qualified_check_name, check_function)

    `table` and `alias_table` default to the module-level `verify_table` and
    `verify_alias_table`. When `full_pyramid` is given and empty, it is filled
    from `table`; when it is given and non-empty it is used as is. In every
    case the module-level `pyramid` ends up mirroring the tree that was used
    for the lookup.

    The special name ``b"full"`` returns `table` itself.

    # Examples

    Using a full qualified name:
    "working-copy.dirstate" -> {
        "working-copy.dirstate": CF,
    }

    Using a *prefix* of a qualified name:
    "store.revlogs" -> {
        "store.revlogs.changelog": CF,
        "store.revlogs.manifestlog": CF,
        "store.revlogs.filelog": CF,
    }

    Using a defined alias:
    "revlogs" -> {
        "store.revlogs.changelog": CF,
        "store.revlogs.manifestlog": CF,
        "store.revlogs.filelog": CF,
    }

    Using something that is none of the above will be an error
    (`error.InputError`, with a hint listing similar names).
    """
    if table is None:
        table = verify_table
    if alias_table is None:
        alias_table = verify_alias_table

    if name == b"full":
        return table

    # is it a full name?
    check = table.get(name)

    if check is None:
        # is it an alias?
        qualified_name = alias_table.get(name)
        if qualified_name is not None:
            name = qualified_name
            check = table.get(name)
        else:
            prefix, sep, rest = name.partition(b".")
            if sep:
                # the first component can be an alias
                qualified_name = alias_table.get(prefix)
                if qualified_name is not None:
                    name = b"%s.%s" % (qualified_name, rest)
                    check = table.get(name)

    # Maybe it's a subtree in the check hierarchy that does not
    # have an explicit alias.
    if full_pyramid is not None:
        if not full_pyramid:
            build_pyramid(table, full_pyramid)
        tree = full_pyramid
    else:
        # Build into a fresh dict rather than on top of whatever a previous
        # lookup left in the module-level pyramid.
        tree = {}
        build_pyramid(table, tree)

    if tree is not pyramid:
        # Keep the module-level pyramid in sync. The identity guard avoids
        # wiping the tree when the caller passed `pyramid` itself.
        pyramid.clear()
        pyramid.update(tree)

    # Find subtree
    subtree = pyramid
    for level in name.split(b"."):
        # A leaf (tuple) reached before the name is exhausted cannot be
        # descended into: treat it like a missing level.
        subtree = subtree.get(level) if isinstance(subtree, dict) else None
        if subtree is None:
            similar = error.getsimilar(list(alias_table) + list(table), name)
            hint = error.similarity_hint(similar)

            # Format *after* translating, so the message catalog lookup uses
            # the constant template.
            raise error.InputError(_(b"unknown check %s") % name, hint=hint)

    if not isinstance(subtree, dict):
        # `name` designates a single check
        return {name: check}

    # Get all checks in that subtree
    checks = {}
    stack = list(subtree.items())
    while stack:
        _level, node = stack.pop()
        if isinstance(node, dict):
            stack.extend(node.items())
        else:
            # (qualified_name, func)
            checks[node[0]] = node[1]

    return checks


def _parse_check_option(option):
    """Split a ``check:option=value`` byte string into its three parts.

    Returns ``(check_name, option_name, value)``.

    Raises `error.InputError` when `option` does not contain exactly one
    ``:`` followed by exactly one ``=``, or when the part after ``:`` or the
    value is empty.
    """
    hint = _(
        b"syntax is 'check:option=value', "
        b"eg. revlogs.changelog:copies=yes"
    )
    option_error = error.InputError(
        _(b"invalid option '%s'") % option, hint=hint
    )

    split = option.split(b":")
    if len(split) != 2:
        raise option_error
    check_name, option_value = split
    if not option_value:
        raise option_error

    split = option_value.split(b"=")
    if len(split) != 2:
        raise option_error
    option_name, value = split
    if not value:
        raise option_error

    return check_name, option_name, value


def _coerce_option_value(default, value, path):
    """Convert the command line string `value` to the type of `default`.

    `default` is the default value declared by the check for this option;
    only booleans and lists are supported. `path` (``check:option``) is only
    used in error messages.

    Raises `error.InputError` when `value` cannot be parsed, and
    `error.ProgrammingError` when `default` is of an unsupported type.
    """
    if isinstance(default, bool):
        as_bool = stringutil.parsebool(value)
        if as_bool is None:
            raise error.InputError(
                _(b"'%s' is not a boolean ('%s')") % (path, value)
            )
        return as_bool

    if isinstance(default, list):
        as_list = stringutil.parselist(value)
        if as_list is None:
            raise error.InputError(
                _(b"'%s' is not a list ('%s')") % (path, value)
            )
        return as_list

    # The message has to be formatted here: the exception does not
    # interpolate extra positional arguments into it.
    type_name = pycompat.sysbytes(type(default).__name__)
    raise error.ProgrammingError(b"unsupported type %s" % type_name)


def pass_options(
    ui,
    checks,
    options,
    table=None,
    alias_table=None,
    full_pyramid=None,
):
    """Given a dict of checks (fully qualified name to function), and a list
    of options as given by the user, pass each option down to the right check
    function.

    Each entry of `options` has the form ``check:option=value``. The check
    part is resolved with `find_checks` (using `table`, `alias_table` and
    `full_pyramid`), so one option can target several checks at once. Checks
    declare their options on ``check.func.options`` as
    ``(name, default, docstring)`` tuples; the default value also determines
    the type the command line string is converted to.

    `checks` is updated in place and returned: every value is replaced by a
    `functools.partial` of the original one, carrying the declared defaults
    overridden by the values given on the command line.

    Raises `error.Error` when `checks` is empty, and `error.InputError` for a
    malformed option, an option aimed at a check that is not in `checks`, an
    option the check does not declare, an unparsable value or a duplicated
    option.
    """
    ui.debug(b"passing options to check functions\n")
    to_modify = collections.defaultdict(dict)

    if not checks:
        raise error.Error(_(b"`checks` required"))

    for option in sorted(options):
        check_name, option_name, value = _parse_check_option(option)
        path = b"%s:%s" % (check_name, option_name)

        matching_checks = find_checks(
            check_name,
            table=table,
            alias_table=alias_table,
            full_pyramid=full_pyramid,
        )
        for name in matching_checks:
            check = checks.get(name)
            if check is None:
                msg = _(b"specified option '%s' for unselected check '%s'\n")
                # the template names the option first, then the check
                raise error.InputError(msg % (option_name, name))

            assert hasattr(check, "func")  # help Pytype

            # `options` may be absent, or present but None/empty, when the
            # check declares no option at all.
            declared = getattr(check.func, "options", None) or ()
            matching_option = next(
                (o for o in declared if o[0] == option_name), None
            )
            if matching_option is None:
                raise error.InputError(
                    _(b"check '%s' has no option '%s'") % (name, option_name)
                )

            # transform the argument from cli string to the expected Python type
            _name, default, _docstring = matching_option
            as_typed = _coerce_option_value(default, value, path)

            if option_name in to_modify[name]:
                raise error.InputError(
                    _(b"duplicated option '%s' for '%s'") % (option_name, name)
                )
            to_modify[name][option_name] = as_typed

    # Manage case where a check is set but without command line options
    # it will later be set with default check options values
    for name in checks:
        to_modify.setdefault(name, {})

    # Merge default options with command line options
    for check_name, cmd_options in to_modify.items():
        check = checks[check_name]
        merged_options = {}

        declared = getattr(check.func, "options", None)
        if declared:
            # copy the default value in case it's mutable (list, etc.)
            merged_options = {o[0]: copy.deepcopy(o[1]) for o in declared}
            merged_options.update(cmd_options)

        kwargs = pycompat.strkwargs(merged_options)
        checks[check_name] = functools.partial(check, **kwargs)
        ui.debug(b"merged options for '%s': '%r'\n" % (check_name, kwargs))

    return checks


def get_checks(
    repo,
    ui,
    names=None,
    options=None,
    table=None,
    alias_table=None,
    full_pyramid=None,
):
    """Resolve check names and user options into ready-to-call checks.

    Every entry of `names` is looked up with `find_checks`; the functions
    found are bound to `ui` and `repo`, then handed to `pass_options`
    together with `options` so that command line values take precedence over
    the defaults declared by each check.

    `names` and `options` default to empty lists. `table`, `alias_table` and
    `full_pyramid` are forwarded unchanged to both helpers.

    Returns the dict produced by `pass_options`, keyed by qualified check
    name.
    """
    names = [] if names is None else names
    options = [] if options is None else options

    lookup = {
        "table": table,
        "alias_table": alias_table,
        "full_pyramid": full_pyramid,
    }

    # find checks
    selected = {}
    for name in names:
        found = find_checks(name, **lookup)
        found_names = b", ".join(found)
        ui.debug(b"found checks '%s' for name '%s'\n" % (found_names, name))
        selected.update(found)

    bound = {}
    for qualified, func in selected.items():
        bound[qualified] = functools.partial(func, ui, repo)

    # resolve options
    return pass_options(ui, bound, options, **lookup)