view hgext/git/dirstate.py @ 52283:fad30cb98579

histedit-continue: wire up `hg continue` and `histedit`
author Brian Hulette <bhulette@apache.org>
date Wed, 30 Oct 2024 19:38:56 -0700
parents 93d872a06132
children
line wrap: on
line source

from __future__ import annotations

import contextlib
import os

from typing import (
    Any,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Tuple,
)

from mercurial.node import sha1nodeconstants
from mercurial import (
    dirstatemap,
    error,
    extensions,
    match as matchmod,
    pycompat,
    scmutil,
    util,
)
from mercurial.dirstateutils import (
    timestamp,
)
from mercurial.interfaces import (
    dirstate as intdirstate,
)

from . import gitutil


DirstateItem = dirstatemap.DirstateItem
propertycache = util.propertycache
pygit2 = gitutil.get_pygit2()


def readpatternfile(orig, filepath, warn, sourceinfo=False):
    if not (b'info/exclude' in filepath or filepath.endswith(b'.gitignore')):
        return orig(filepath, warn, sourceinfo=False)
    result = []
    warnings = []
    with open(filepath, 'rb') as fp:
        for l in fp:
            l = l.strip()
            if not l or l.startswith(b'#'):
                continue
            if l.startswith(b'!'):
                warnings.append(b'unsupported ignore pattern %s' % l)
                continue
            if l.startswith(b'/'):
                result.append(b'rootglob:' + l[1:])
            else:
                result.append(b'relglob:' + l)
    return result, warnings


extensions.wrapfunction(matchmod, 'readpatternfile', readpatternfile)


_STATUS_MAP = {}
if pygit2:
    _STATUS_MAP = {
        pygit2.GIT_STATUS_CONFLICTED: b'm',
        pygit2.GIT_STATUS_CURRENT: b'n',
        pygit2.GIT_STATUS_IGNORED: b'?',
        pygit2.GIT_STATUS_INDEX_DELETED: b'r',
        pygit2.GIT_STATUS_INDEX_MODIFIED: b'n',
        pygit2.GIT_STATUS_INDEX_NEW: b'a',
        pygit2.GIT_STATUS_INDEX_RENAMED: b'a',
        pygit2.GIT_STATUS_INDEX_TYPECHANGE: b'n',
        pygit2.GIT_STATUS_WT_DELETED: b'r',
        pygit2.GIT_STATUS_WT_MODIFIED: b'n',
        pygit2.GIT_STATUS_WT_NEW: b'?',
        pygit2.GIT_STATUS_WT_RENAMED: b'a',
        pygit2.GIT_STATUS_WT_TYPECHANGE: b'n',
        pygit2.GIT_STATUS_WT_UNREADABLE: b'?',
        pygit2.GIT_STATUS_INDEX_MODIFIED | pygit2.GIT_STATUS_WT_MODIFIED: b'm',
    }


class gitdirstate(intdirstate.idirstate):
    def __init__(self, ui, vfs, gitrepo, use_dirstate_v2):
        self._ui = ui
        self._root = os.path.dirname(vfs.base)
        self._opener = vfs
        self.git = gitrepo
        self._plchangecallbacks = {}
        # TODO: context.poststatusfixup is bad and uses this attribute
        self._dirty = False
        self._mapcls = dirstatemap.dirstatemap
        self._use_dirstate_v2 = use_dirstate_v2

    @propertycache
    def _map(self):
        """Return the dirstate contents (see documentation for dirstatemap)."""
        self._map = self._mapcls(
            self._ui,
            self._opener,
            self._root,
            sha1nodeconstants,
            self._use_dirstate_v2,
        )
        return self._map

    def p1(self) -> bytes:
        try:
            return self.git.head.peel().id.raw
        except pygit2.GitError:
            # Typically happens when peeling HEAD fails, as in an
            # empty repository.
            return sha1nodeconstants.nullid

    def p2(self) -> bytes:
        # TODO: MERGE_HEAD? something like that, right?
        return sha1nodeconstants.nullid

    def setparents(self, p1: bytes, p2: Optional[bytes] = None):
        if p2 is None:
            p2 = sha1nodeconstants.nullid
        assert p2 == sha1nodeconstants.nullid, b'TODO merging support'
        self.git.head.set_target(gitutil.togitnode(p1))

    @util.propertycache
    def identity(self):
        return util.filestat.frompath(
            os.path.join(self._root, b'.git', b'index')
        )

    def branch(self) -> bytes:
        return b'default'

    def parents(self) -> List[bytes]:
        # TODO how on earth do we find p2 if a merge is in flight?
        return [self.p1(), sha1nodeconstants.nullid]

    def __iter__(self) -> Iterator[bytes]:
        return (pycompat.fsencode(f.path) for f in self.git.index)

    def items(self) -> Iterator[Tuple[bytes, intdirstate.DirstateItemT]]:
        for ie in self.git.index:
            yield ie.path, None  # value should be a DirstateItem

    # py2,3 compat forward
    iteritems = items

    def __getitem__(self, filename):
        try:
            gs = self.git.status_file(filename)
        except KeyError:
            return b'?'
        return _STATUS_MAP[gs]

    def __contains__(self, filename: Any) -> bool:
        try:
            gs = self.git.status_file(filename)
            return _STATUS_MAP[gs] != b'?'
        except KeyError:
            return False

    def status(
        self,
        match: matchmod.basematcher,
        subrepos: bool,
        ignored: bool,
        clean: bool,
        unknown: bool,
    ) -> intdirstate.StatusReturnT:
        listclean = clean
        # TODO handling of clean files - can we get that from git.status()?
        modified, added, removed, deleted, unknown, ignored, clean = (
            [],
            [],
            [],
            [],
            [],
            [],
            [],
        )

        try:
            mtime_boundary = timestamp.get_fs_now(self._opener)
        except OSError:
            # In largefiles or readonly context
            mtime_boundary = None

        gstatus = self.git.status()
        for path, status in gstatus.items():
            path = pycompat.fsencode(path)
            if not match(path):
                continue
            if status == pygit2.GIT_STATUS_IGNORED:
                if path.endswith(b'/'):
                    continue
                ignored.append(path)
            elif status in (
                pygit2.GIT_STATUS_WT_MODIFIED,
                pygit2.GIT_STATUS_INDEX_MODIFIED,
                pygit2.GIT_STATUS_WT_MODIFIED
                | pygit2.GIT_STATUS_INDEX_MODIFIED,
            ):
                modified.append(path)
            elif status == pygit2.GIT_STATUS_INDEX_NEW:
                added.append(path)
            elif status == pygit2.GIT_STATUS_WT_NEW:
                unknown.append(path)
            elif status == pygit2.GIT_STATUS_WT_DELETED:
                deleted.append(path)
            elif status == pygit2.GIT_STATUS_INDEX_DELETED:
                removed.append(path)
            else:
                raise error.Abort(
                    b'unhandled case: status for %r is %r' % (path, status)
                )

        if listclean:
            observed = set(
                modified + added + removed + deleted + unknown + ignored
            )
            index = self.git.index
            index.read()
            for entry in index:
                path = pycompat.fsencode(entry.path)
                if not match(path):
                    continue
                if path in observed:
                    continue  # already in some other set
                if path[-1] == b'/':
                    continue  # directory
                clean.append(path)

        # TODO are we really always sure of status here?
        return (
            False,
            scmutil.status(
                modified, added, removed, deleted, unknown, ignored, clean
            ),
            mtime_boundary,
        )

    def flagfunc(
        self, buildfallback: intdirstate.FlagFuncFallbackT
    ) -> intdirstate.FlagFuncReturnT:
        # TODO we can do better
        return buildfallback()

    def getcwd(self) -> bytes:
        # TODO is this a good way to do this?
        return os.path.dirname(
            os.path.dirname(pycompat.fsencode(self.git.path))
        )

    def get_entry(self, path: bytes) -> intdirstate.DirstateItemT:
        """return a DirstateItem for the associated path"""
        entry = self._map.get(path)
        if entry is None:
            return DirstateItem()
        return entry

    def normalize(
        self, path: bytes, isknown: bool = False, ignoremissing: bool = False
    ) -> bytes:
        normed = util.normcase(path)
        assert normed == path, b"TODO handling of case folding: %s != %s" % (
            normed,
            path,
        )
        return path

    @property
    def _checklink(self) -> bool:
        return util.checklink(os.path.dirname(pycompat.fsencode(self.git.path)))

    def copies(self) -> Dict[bytes, bytes]:
        # TODO support copies?
        return {}

    # # TODO what the heck is this
    _filecache = set()

    @property
    def is_changing_parents(self) -> bool:
        # TODO: we need to implement the context manager bits and
        # correctly stage/revert index edits.
        return False

    @property
    def is_changing_any(self) -> bool:
        # TODO: we need to implement the context manager bits and
        # correctly stage/revert index edits.
        return False

    def write(self, tr: Optional[intdirstate.TransactionT]) -> None:
        # TODO: call parent change callbacks

        if tr:

            def writeinner(category):
                self.git.index.write()

            tr.addpending(b'gitdirstate', writeinner)
        else:
            self.git.index.write()

    def pathto(self, f: bytes, cwd: Optional[bytes] = None) -> bytes:
        if cwd is None:
            cwd = self.getcwd()
        # TODO core dirstate does something about slashes here
        assert isinstance(f, bytes)
        r = util.pathto(self._root, cwd, f)
        return r

    def matches(self, match: matchmod.basematcher) -> Iterable[bytes]:
        for x in self.git.index:
            p = pycompat.fsencode(x.path)
            if match(p):
                yield p  # TODO: return list instead of yielding?

    def set_clean(self, f, parentfiledata):
        """Mark a file normal and clean."""
        # TODO: for now we just let libgit2 re-stat the file. We can
        # clearly do better.

    def set_possibly_dirty(self, f):
        """Mark a file normal, but possibly dirty."""
        # TODO: for now we just let libgit2 re-stat the file. We can
        # clearly do better.

    def walk(
        self,
        match: matchmod.basematcher,
        subrepos: Any,
        unknown: bool,
        ignored: bool,
        full: bool = True,
    ) -> intdirstate.WalkReturnT:
        # TODO: we need to use .status() and not iterate the index,
        # because the index doesn't force a re-walk and so `hg add` of
        # a new file without an intervening call to status will
        # silently do nothing.
        r = {}
        cwd = self.getcwd()
        for path, status in self.git.status().items():
            if path.startswith('.hg/'):
                continue
            path = pycompat.fsencode(path)
            if not match(path):
                continue
            # TODO construct the stat info from the status object?
            try:
                s = os.stat(os.path.join(cwd, path))
            except FileNotFoundError:
                continue
            r[path] = s
        return r

    def set_tracked(self, f, reset_copy=False):
        # TODO: support copies and reset_copy=True
        uf = pycompat.fsdecode(f)
        if uf in self.git.index:
            return False
        index = self.git.index
        index.read()
        index.add(uf)
        index.write()
        return True

    def add(self, f):
        index = self.git.index
        index.read()
        index.add(pycompat.fsdecode(f))
        index.write()

    def drop(self, f):
        index = self.git.index
        index.read()
        fs = pycompat.fsdecode(f)
        if fs in index:
            index.remove(fs)
            index.write()

    def set_untracked(self, f):
        index = self.git.index
        index.read()
        fs = pycompat.fsdecode(f)
        if fs in index:
            index.remove(fs)
            index.write()
            return True
        return False

    def remove(self, f):
        index = self.git.index
        index.read()
        index.remove(pycompat.fsdecode(f))
        index.write()

    def copied(self, file: bytes) -> Optional[bytes]:
        # TODO: track copies?
        return None

    def prefetch_parents(self):
        # TODO
        pass

    def update_file(self, *args, **kwargs):
        # TODO
        pass

    @contextlib.contextmanager
    def changing_parents(self, repo):
        # TODO: track this maybe?
        yield

    def addparentchangecallback(
        self, category: bytes, callback: intdirstate.AddParentChangeCallbackT
    ) -> None:
        # TODO: should this be added to the dirstate interface?
        self._plchangecallbacks[category] = callback

    def setbranch(
        self, branch: bytes, transaction: Optional[intdirstate.TransactionT]
    ) -> None:
        raise error.Abort(
            b'git repos do not support branches. try using bookmarks'
        )