hgext/censor.py
author Matt Harbison <matt_harbison@yahoo.com>
Sat, 03 Aug 2024 01:33:13 -0400
changeset 51871 cfd30df0f8e4
parent 51863 f4733654f144
permissions -rw-r--r--
bundlerepo: fix mismatches with repository and revlog classes

Both pytype and PyCharm complained that `write()` and `_write()` in the
bundlephasecache class aren't proper overrides- indeed they seem to be missing
an argument that the base class has.

PyCharm and pytype also complained that the `revlog.revlog` class doesn't have a
`_chunk()` method. That looks like it was moved from revlog to `_InnerRevlog`
back in e8ad6d8de8b8, and wasn't caught because this module wasn't type checked.
However, I couldn't figure out a syntax with
`revlog.revlog._inner._chunk(self, rev)`, as it complained about passing too
many args. `bundlerevlog._rawtext()` uses this `super(...)` style to call the
super class, so hopefully that works, even with the wonky dynamic subclassing.
The revlog class needed the `_InnerRevlog` field typed because it isn't set in
the constructor.

Finally, the vfs type hints look broken. This initially failed with:

    File "/mnt/c/Users/Matt/hg/mercurial/bundlerepo.py", line 65, in __init__: Function readonlyvfs.__init__ was called with the wrong arguments [wrong-arg-types]
             Expected: (self, vfs: mercurial.vfs.vfs)
      Actually passed: (self, vfs: Callable)
    Called from (traceback):
      line 232, in dirlog
      line 214, in __init__

I don't see a raw Callable, but I tried changing some of the vfs args to be
typed as `vfsmod.abstractvfs`, but that class doesn't have `options`, so it
failed elsewhere. `readonlyvfs` isn't a subclass of `vfs` (it's a subclass of
`abstractvfs`), so I'm not sure how to handle that. It would be a shame to have
to make a union of vfs subclasses (but not all of them have `options` either).

# Copyright (C) 2015 - Mike Edgar <adgar@google.com>
#
# This extension enables removal of file content at a given revision,
# rewriting the data/metadata of successive revisions to preserve revision log
# integrity.

"""erase file content at a given revision

The censor command instructs Mercurial to erase all content of a file at a given
revision *without updating the changeset hash.* This allows existing history to
remain valid while preventing future clones/pulls from receiving the erased
data.

Typical uses for censor are due to security or legal requirements, including::

 * Passwords, private keys, cryptographic material
 * Licensed data/code/libraries for which the license has expired
 * Personally Identifiable Information or other private data

Censored nodes can interrupt Mercurial's typical operation whenever the excised
data needs to be materialized. Some commands, like ``hg cat``/``hg revert``,
simply fail when asked to produce censored data. Others, like ``hg verify`` and
``hg update``, must be capable of tolerating censored data to continue to
function in a meaningful way. Such commands only tolerate censored file
revisions if the ``censor.policy`` config option allows it.

Because having a censored version in a checkout is impractical, the current
head revisions of the repository are checked. If the revision to be censored is
in any of them, the command will abort.

The handling of censored data can be configured using the following option:

    `censor.policy`
        :config-doc:`censor.policy`
"""

from __future__ import annotations

from mercurial.i18n import _
from mercurial.node import short

from mercurial import (
    error,
    registrar,
    scmutil,
)

cmdtable = {}
command = registrar.command(cmdtable)
# Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
# extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
# be specifying the version(s) of Mercurial they are tested with, or
# leave the attribute unspecified.
testedwith = b'ships-with-hg-core'


@command(
    b'censor',
    [
        (
            b'r',
            b'rev',
            [],
            _(b'censor file from specified revision'),
            _(b'REV'),
        ),
        (
            b'',
            b'check-heads',
            True,
            _(b'check that repository heads are not affected'),
        ),
        (b't', b'tombstone', b'', _(b'replacement tombstone data'), _(b'TEXT')),
    ],
    _(b'-r REV [-t TEXT] FILE'),
    helpcategory=command.CATEGORY_MAINTENANCE,
)
def censor(ui, repo, path, rev=(), tombstone=b'', check_heads=True, **opts):
    with repo.wlock(), repo.lock():
        return _docensor(
            ui,
            repo,
            path,
            rev,
            tombstone,
            check_heads=check_heads,
            **opts,
        )


def _docensor(ui, repo, path, revs=(), tombstone=b'', check_heads=True, **opts):
    if not path:
        raise error.Abort(_(b'must specify file path to censor'))
    if not revs:
        raise error.Abort(_(b'must specify revisions to censor'))

    wctx = repo[None]

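    # Resolve the argument to exactly one explicit file name; patterns that
    # could match several files are rejected below.
    m = scmutil.match(wctx, (path,))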
    if m.anypats() or len(m.files()) != 1:
        raise error.Abort(_(b'can only specify an explicit filename'))
    path = m.files()[0]
    flog = repo.file(path)
    if not len(flog):
        raise error.Abort(_(b'cannot censor file with no history'))

    revs = scmutil.revrange(repo, revs)
    if not revs:
        raise error.Abort(_(b'no matching revisions'))
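    # Collect the node of ``path`` in each requested changeset. A set is used
    # because several changesets may share the same file revision.
    file_nodes = set()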
    for r in revs:
        try:
            ctx = repo[r]
            file_nodes.add(ctx.filectx(path).filenode())
        except error.LookupError:
            raise error.Abort(_(b'file does not exist at revision %s') % ctx)

    if check_heads:
        heads = []
        repo_heads = repo.heads()
        msg = b'checking for the censored content in %d heads\n'
        msg %= len(repo_heads)
        ui.status(msg)
        for headnode in repo_heads:
            hc = repo[headnode]
            if path in hc and hc.filenode(path) in file_nodes:
                heads.append(hc)
        if heads:
            headlist = b', '.join([short(c.node()) for c in heads])
            raise error.Abort(
                _(b'cannot censor file in heads (%s)') % headlist,
                hint=_(b'clean/delete and commit first'),
            )

    msg = b'checking for the censored content in the working directory\n'
    ui.status(msg)
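    # Refuse to censor any revision that is a parent of the working directory.
    wp_nodes = {p.node() for p in wctx.parents()}
    if any(repo[r].node() in wp_nodes for r in revs):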
        raise error.Abort(
            _(b'cannot censor working directory'),
            hint=_(b'clean/delete/update first'),
        )

    msg = b'censoring %d file revisions\n'
    msg %= len(file_nodes)
    ui.status(msg)
    with repo.transaction(b'censor') as tr:
        flog.censorrevision(tr, file_nodes, tombstone=tombstone)
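

# Illustration only, not part of the extension: a minimal sketch of the
# head check performed in _docensor() above, using plain dictionaries in
# place of Mercurial changectx objects. The helper name heads_containing and
# the sample head ids, paths, and file nodes are hypothetical and were chosen
# purely for the example.

```python
def heads_containing(path, file_nodes, heads):
    """Return ids of heads whose revision of ``path`` is in ``file_nodes``.

    ``heads`` maps a head id to a manifest-like dict of path -> file node.
    This mirrors the test ``path in hc and hc.filenode(path) in file_nodes``.
    """
    affected = []
    for head_id, manifest in heads.items():
        if path in manifest and manifest[path] in file_nodes:
            affected.append(head_id)
    return affected


# Three hypothetical heads; only the first still carries file node b"n1".
heads = {
    b"a1b2c3": {b"secrets.txt": b"n1", b"README": b"n9"},
    b"d4e5f6": {b"secrets.txt": b"n2"},
    b"0789ab": {b"README": b"n9"},
}

affected = heads_containing(b"secrets.txt", {b"n1"}, heads)
if affected:
    # The real command aborts here and lists the short hashes of the heads.
    print(b", ".join(affected).decode("ascii"))
```

# A non-empty result corresponds to the "cannot censor file in heads" abort:
# the file must be cleaned or deleted and the change committed in each listed
# head before the older revision can be censored.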