Mercurial > hg-stable
changeset 51965:93d872a06132
typing: add type annotations to the dirstate classes
The basic procedure here was to use `merge-pyi` to merge the `git/dirstate.pyi`
file in (after renaming the interface class to match), cleaning up the import
statement mess, and then repeating the procedure for `mercurial/dirstate.pyi`.
Surprisingly, git's dirstate had more hints inferred in its *.pyi file.
After that, it was a manual examination of each method in the interface, and how
they were implemented in the core and git classes to verify what was inferred by
pytype, and fill in the missing gaps. Since this involved jumping around
between three different files, I applied the same type info to all three at the
same time. Complex types I rolled up into type aliases in the interface module,
and used that as needed. That way if it changes, there's one place to edit.
There are some hints still missing, and some documentation that doesn't match
the signatures. They should all be marked with TODOs. There are also a bunch
of methods on the core class that aren't on the Protocol class that seem like
maybe they should be (like `set_tracked()`). There are even more methods
missing from the git class. But that's a project for another time.
author | Matt Harbison <matt_harbison@yahoo.com> |
---|---|
date | Fri, 27 Sep 2024 12:30:37 -0400 |
parents | 3688a984134b |
children | bc9ed92d4753 |
files | hgext/git/dirstate.py mercurial/dirstate.py mercurial/interfaces/dirstate.py |
diffstat | 3 files changed, 247 insertions(+), 92 deletions(-) [+] |
line wrap: on
line diff
--- a/hgext/git/dirstate.py Fri Sep 27 12:10:25 2024 -0400 +++ b/hgext/git/dirstate.py Fri Sep 27 12:30:37 2024 -0400 @@ -3,6 +3,16 @@ import contextlib import os +from typing import ( + Any, + Dict, + Iterable, + Iterator, + List, + Optional, + Tuple, +) + from mercurial.node import sha1nodeconstants from mercurial import ( dirstatemap, @@ -96,7 +106,7 @@ ) return self._map - def p1(self): + def p1(self) -> bytes: try: return self.git.head.peel().id.raw except pygit2.GitError: @@ -104,11 +114,11 @@ # empty repository. return sha1nodeconstants.nullid - def p2(self): + def p2(self) -> bytes: # TODO: MERGE_HEAD? something like that, right? return sha1nodeconstants.nullid - def setparents(self, p1, p2=None): + def setparents(self, p1: bytes, p2: Optional[bytes] = None): if p2 is None: p2 = sha1nodeconstants.nullid assert p2 == sha1nodeconstants.nullid, b'TODO merging support' @@ -120,17 +130,17 @@ os.path.join(self._root, b'.git', b'index') ) - def branch(self): + def branch(self) -> bytes: return b'default' - def parents(self): + def parents(self) -> List[bytes]: # TODO how on earth do we find p2 if a merge is in flight? return [self.p1(), sha1nodeconstants.nullid] - def __iter__(self): + def __iter__(self) -> Iterator[bytes]: return (pycompat.fsencode(f.path) for f in self.git.index) - def items(self): + def items(self) -> Iterator[Tuple[bytes, intdirstate.DirstateItemT]]: for ie in self.git.index: yield ie.path, None # value should be a DirstateItem @@ -144,14 +154,21 @@ return b'?' return _STATUS_MAP[gs] - def __contains__(self, filename): + def __contains__(self, filename: Any) -> bool: try: gs = self.git.status_file(filename) return _STATUS_MAP[gs] != b'?' except KeyError: return False - def status(self, match, subrepos, ignored, clean, unknown): + def status( + self, + match: matchmod.basematcher, + subrepos: bool, + ignored: bool, + clean: bool, + unknown: bool, + ) -> intdirstate.StatusReturnT: listclean = clean # TODO handling of clean files - can we get that from git.status()? modified, added, removed, deleted, unknown, ignored, clean = ( @@ -224,24 +241,28 @@ mtime_boundary, ) - def flagfunc(self, buildfallback): + def flagfunc( + self, buildfallback: intdirstate.FlagFuncFallbackT + ) -> intdirstate.FlagFuncReturnT: # TODO we can do better return buildfallback() - def getcwd(self): + def getcwd(self) -> bytes: # TODO is this a good way to do this? return os.path.dirname( os.path.dirname(pycompat.fsencode(self.git.path)) ) - def get_entry(self, path): + def get_entry(self, path: bytes) -> intdirstate.DirstateItemT: """return a DirstateItem for the associated path""" entry = self._map.get(path) if entry is None: return DirstateItem() return entry - def normalize(self, path, isknown=False, ignoremissing=False): + def normalize( + self, path: bytes, isknown: bool = False, ignoremissing: bool = False + ) -> bytes: normed = util.normcase(path) assert normed == path, b"TODO handling of case folding: %s != %s" % ( normed, @@ -250,10 +271,10 @@ return path @property - def _checklink(self): + def _checklink(self) -> bool: return util.checklink(os.path.dirname(pycompat.fsencode(self.git.path))) - def copies(self): + def copies(self) -> Dict[bytes, bytes]: # TODO support copies? return {} @@ -261,18 +282,18 @@ _filecache = set() @property - def is_changing_parents(self): + def is_changing_parents(self) -> bool: # TODO: we need to implement the context manager bits and # correctly stage/revert index edits. return False @property - def is_changing_any(self): + def is_changing_any(self) -> bool: # TODO: we need to implement the context manager bits and # correctly stage/revert index edits. return False - def write(self, tr): + def write(self, tr: Optional[intdirstate.TransactionT]) -> None: # TODO: call parent change callbacks if tr: @@ -284,7 +305,7 @@ else: self.git.index.write() - def pathto(self, f, cwd=None): + def pathto(self, f: bytes, cwd: Optional[bytes] = None) -> bytes: if cwd is None: cwd = self.getcwd() # TODO core dirstate does something about slashes here @@ -292,11 +313,11 @@ r = util.pathto(self._root, cwd, f) return r - def matches(self, match): + def matches(self, match: matchmod.basematcher) -> Iterable[bytes]: for x in self.git.index: p = pycompat.fsencode(x.path) if match(p): - yield p + yield p # TODO: return list instead of yielding? def set_clean(self, f, parentfiledata): """Mark a file normal and clean.""" @@ -308,7 +329,14 @@ # TODO: for now we just let libgit2 re-stat the file. We can # clearly do better. - def walk(self, match, subrepos, unknown, ignored, full=True): + def walk( + self, + match: matchmod.basematcher, + subrepos: Any, + unknown: bool, + ignored: bool, + full: bool = True, + ) -> intdirstate.WalkReturnT: # TODO: we need to use .status() and not iterate the index, # because the index doesn't force a re-walk and so `hg add` of # a new file without an intervening call to status will @@ -370,7 +398,7 @@ index.remove(pycompat.fsdecode(f)) index.write() - def copied(self, path): + def copied(self, file: bytes) -> Optional[bytes]: # TODO: track copies? return None @@ -387,11 +415,15 @@ # TODO: track this maybe? yield - def addparentchangecallback(self, category, callback): + def addparentchangecallback( + self, category: bytes, callback: intdirstate.AddParentChangeCallbackT + ) -> None: # TODO: should this be added to the dirstate interface? self._plchangecallbacks[category] = callback - def setbranch(self, branch, transaction): + def setbranch( + self, branch: bytes, transaction: Optional[intdirstate.TransactionT] + ) -> None: raise error.Abort( b'git repos do not support branches. try using bookmarks' )
--- a/mercurial/dirstate.py Fri Sep 27 12:10:25 2024 -0400 +++ b/mercurial/dirstate.py Fri Sep 27 12:30:37 2024 -0400 @@ -13,6 +13,16 @@ import stat import uuid +from typing import ( + Any, + Dict, + Iterable, + Iterator, + List, + Optional, + Tuple, +) + from .i18n import _ from hgdemandimport import tracing @@ -396,7 +406,7 @@ raise error.ProgrammingError(msg) @property - def is_changing_any(self): + def is_changing_any(self) -> bool: """Returns true if the dirstate is in the middle of a set of changes. This returns True for any kind of change. @@ -404,7 +414,7 @@ return self._changing_level > 0 @property - def is_changing_parents(self): + def is_changing_parents(self) -> bool: """Returns true if the dirstate is in the middle of a set of changes that modify the dirstate parent. """ @@ -413,7 +423,7 @@ return self._change_type == CHANGE_TYPE_PARENTS @property - def is_changing_files(self): + def is_changing_files(self) -> bool: """Returns true if the dirstate is in the middle of a set of changes that modify the files tracked or their sources. """ @@ -469,11 +479,11 @@ def _pl(self): return self._map.parents() - def hasdir(self, d): + def hasdir(self, d: bytes) -> bool: return self._map.hastrackeddir(d) @rootcache(b'.hgignore') - def _ignore(self): + def _ignore(self) -> matchmod.basematcher: files = self._ignorefiles() if not files: return matchmod.never() @@ -486,11 +496,11 @@ return self._ui.configbool(b'ui', b'slash') and pycompat.ossep != b'/' @propertycache - def _checklink(self): + def _checklink(self) -> bool: return util.checklink(self._root) @propertycache - def _checkexec(self): + def _checkexec(self) -> bool: return bool(util.checkexec(self._root)) @propertycache @@ -502,7 +512,9 @@ # it's safe because f is always a relative path return self._rootdir + f - def flagfunc(self, buildfallback): + def flagfunc( + self, buildfallback: intdirstate.FlagFuncFallbackT + ) -> intdirstate.FlagFuncReturnT: """build a callable that returns flags associated with a filename The information is extracted from three possible layers: @@ -514,7 +526,7 @@ # small hack to cache the result of buildfallback() fallback_func = [] - def get_flags(x): + def get_flags(x: bytes) -> bytes: entry = None fallback_value = None try: @@ -565,7 +577,7 @@ return forcecwd return encoding.getcwd() - def getcwd(self): + def getcwd(self) -> bytes: """Return the path from which a canonical path is calculated. This path should be used to resolve file patterns or to convert @@ -585,7 +597,7 @@ # we're outside the repo. return an absolute path. return cwd - def pathto(self, f, cwd=None): + def pathto(self, f: bytes, cwd: Optional[bytes] = None) -> bytes: if cwd is None: cwd = self.getcwd() path = util.pathto(self._root, cwd, f) @@ -593,31 +605,31 @@ return util.pconvert(path) return path - def get_entry(self, path): + def get_entry(self, path: bytes) -> intdirstate.DirstateItemT: """return a DirstateItem for the associated path""" entry = self._map.get(path) if entry is None: return DirstateItem() return entry - def __contains__(self, key): + def __contains__(self, key: Any) -> bool: return key in self._map - def __iter__(self): + def __iter__(self) -> Iterator[bytes]: return iter(sorted(self._map)) - def items(self): + def items(self) -> Iterator[Tuple[bytes, intdirstate.DirstateItemT]]: return self._map.items() iteritems = items - def parents(self): + def parents(self) -> List[bytes]: return [self._validate(p) for p in self._pl] - def p1(self): + def p1(self) -> bytes: return self._validate(self._pl[0]) - def p2(self): + def p2(self) -> bytes: return self._validate(self._pl[1]) @property @@ -625,11 +637,11 @@ """True if a merge is in progress""" return self._pl[1] != self._nodeconstants.nullid - def branch(self): + def branch(self) -> bytes: return encoding.tolocal(self._branch) @requires_changing_parents - def setparents(self, p1, p2=None): + def setparents(self, p1: bytes, p2: Optional[bytes] = None): """Set dirstate parents to p1 and p2. When moving from two parents to one, "merged" entries a @@ -655,7 +667,9 @@ fold_p2 = oldp2 != nullid and p2 == nullid return self._map.setparents(p1, p2, fold_p2=fold_p2) - def setbranch(self, branch, transaction): + def setbranch( + self, branch: bytes, transaction: Optional[intdirstate.TransactionT] + ) -> None: self.__class__._branch.set(self, encoding.fromlocal(branch)) if transaction is not None: self._setup_tr_abort(transaction) @@ -683,7 +697,7 @@ def _write_branch(self, file_obj): file_obj.write(self._branch + b'\n') - def invalidate(self): + def invalidate(self) -> None: """Causes the next access to reread the dirstate. This is different from localrepo.invalidatedirstate() because it always @@ -703,7 +717,7 @@ self._origpl = None @requires_changing_any - def copy(self, source, dest): + def copy(self, source: Optional[bytes], dest: bytes) -> None: """Mark dest as a copy of source. Unmark dest if source is None.""" if source == dest: return @@ -714,10 +728,10 @@ else: self._map.copymap.pop(dest, None) - def copied(self, file): + def copied(self, file: bytes) -> Optional[bytes]: return self._map.copymap.get(file, None) - def copies(self): + def copies(self) -> Dict[bytes, bytes]: return self._map.copymap @requires_changing_files @@ -983,7 +997,9 @@ ) return folded - def normalize(self, path, isknown=False, ignoremissing=False): + def normalize( + self, path: bytes, isknown: bool = False, ignoremissing: bool = False + ) -> bytes: """ normalize the case of a pathname when on a casefolding filesystem @@ -1009,12 +1025,17 @@ # - its semantic is unclear # - do we really needs it ? @requires_changing_parents - def clear(self): + def clear(self) -> None: self._map.clear() self._dirty = True @requires_changing_parents - def rebuild(self, parent, allfiles, changedfiles=None): + def rebuild( + self, + parent: bytes, + allfiles: Iterable[bytes], # TODO: more than iterable? (uses len()) + changedfiles: Optional[Iterable[bytes]] = None, + ) -> None: matcher = self._sparsematcher if matcher is not None and not matcher.always(): # should not add non-matching files @@ -1080,7 +1101,7 @@ on_abort, ) - def write(self, tr): + def write(self, tr: Optional[intdirstate.TransactionT]) -> None: if not self._dirty: return # make sure we don't request a write of invalidated content @@ -1130,7 +1151,9 @@ self._opener.unlink(self._filename_th) self._use_tracked_hint = False - def addparentchangecallback(self, category, callback): + def addparentchangecallback( + self, category: bytes, callback: intdirstate.AddParentChangeCallbackT + ) -> None: """add a callback to be called when the wd parents are changed Callback will be called with the following arguments: @@ -1165,7 +1188,7 @@ return True return False - def _ignorefiles(self): + def _ignorefiles(self) -> List[bytes]: files = [] if os.path.exists(self._join(b'.hgignore')): files.append(self._join(b'.hgignore')) @@ -1176,7 +1199,7 @@ files.append(os.path.join(self._rootdir, util.expandpath(path))) return files - def _ignorefileandline(self, f): + def _ignorefileandline(self, f: bytes) -> intdirstate.IgnoreFileAndLineT: files = collections.deque(self._ignorefiles()) visited = set() while files: @@ -1334,7 +1357,14 @@ return results, dirsfound, dirsnotfound - def walk(self, match, subrepos, unknown, ignored, full=True): + def walk( + self, + match: matchmod.basematcher, + subrepos: Any, + unknown: bool, + ignored: bool, + full: bool = True, + ) -> intdirstate.WalkReturnT: """ Walk recursively through the directory tree, finding all files matched by match. @@ -1607,7 +1637,14 @@ ) return (lookup, status) - def status(self, match, subrepos, ignored, clean, unknown): + def status( + self, + match: matchmod.basematcher, + subrepos: bool, + ignored: bool, + clean: bool, + unknown: bool, + ) -> intdirstate.StatusReturnT: """Determine the status of the working copy relative to the dirstate and return a pair of (unsure, status), where status is of type scmutil.status and: @@ -1745,7 +1782,7 @@ ) return (lookup, status, mtime_boundary) - def matches(self, match): + def matches(self, match: matchmod.basematcher) -> Iterable[bytes]: """ return files in the dirstate (in whatever state) filtered by match """ @@ -1778,7 +1815,9 @@ files.append(self._map.docket.data_filename()) return tuple(files) - def verify(self, m1, m2, p1, narrow_matcher=None): + def verify( + self, m1, m2, p1: bytes, narrow_matcher: Optional[Any] = None + ) -> Iterator[bytes]: """ check the dirstate contents against the parent manifest and yield errors """
--- a/mercurial/interfaces/dirstate.py Fri Sep 27 12:10:25 2024 -0400 +++ b/mercurial/interfaces/dirstate.py Fri Sep 27 12:30:37 2024 -0400 @@ -1,11 +1,19 @@ from __future__ import annotations import contextlib +import os import typing from typing import ( + Any, Callable, + Dict, + Iterable, + Iterator, + List, + Optional, Protocol, + Tuple, ) if typing.TYPE_CHECKING: @@ -13,8 +21,52 @@ # to avoid circular imports from .. import ( match as matchmod, + scmutil, + transaction as txnmod, ) + # TODO: finish adding type hints + AddParentChangeCallbackT = Callable[ + ["idirstate", Tuple[Any, Any], Tuple[Any, Any]], Any + ] + """The callback type for dirstate.addparentchangecallback().""" + + # TODO: add a Protocol for dirstatemap.DirStateItem? (It is + # conditionalized with python or rust implementations. Also, + # git.dirstate needs to yield non-None from ``items()``.) + DirstateItemT = Any # dirstatemap.DirstateItem + + IgnoreFileAndLineT = Tuple[Optional[bytes], int, bytes] + """The return type of dirstate._ignorefileandline(), which holds + ``(file, lineno, originalline)``. + """ + + FlagFuncFallbackT = Callable[[], "FlagFuncReturnT"] + """The type for the dirstate.flagfunc() fallback function.""" + + FlagFuncReturnT = Callable[[bytes], bytes] + """The return type of dirstate.flagfunc().""" + + # TODO: verify and complete this- it came from a pytype *.pyi file + StatusReturnT = Tuple[Any, scmutil.status, Any] + """The return type of dirstate.status().""" + + # TODO: probably doesn't belong here. + TransactionT = txnmod.transaction + """The type for a transaction used with dirstate. + + This is meant to help callers avoid having to remember to delay the import + of the transaction module. + """ + + # TODO: The value can also be mercurial.osutil.stat + WalkReturnT = Dict[bytes, Optional[os.stat_result]] + """The return type of dirstate.walk(). + + The matched files are keyed in the dictionary, mapped to a stat-like object + if the file exists. + """ + class idirstate(Protocol): # TODO: convert these constructor args to fields? @@ -56,10 +108,10 @@ def is_changing_files(self) -> bool: """True if file tracking changes in progress.""" - def _ignorefiles(self): + def _ignorefiles(self) -> List[bytes]: """Return a list of files containing patterns to ignore.""" - def _ignorefileandline(self, f): + def _ignorefileandline(self, f: bytes) -> IgnoreFileAndLineT: """Given a file `f`, return the ignore file and line that ignores it.""" # TODO: decorate with `@util.propertycache` like dirstate class? @@ -75,7 +127,7 @@ """Callable for checking exec bits.""" # TODO: this comment looks stale @contextlib.contextmanager - def changing_parents(self, repo): + def changing_parents(self, repo) -> Iterator: # TODO: typehint this """Context manager for handling dirstate parents. If an exception occurs in the scope of the context manager, @@ -84,7 +136,7 @@ """ @contextlib.contextmanager - def changing_files(self, repo): + def changing_files(self, repo) -> Iterator: # TODO: typehint this """Context manager for handling dirstate files. If an exception occurs in the scope of the context manager, @@ -92,10 +144,10 @@ released. """ - def hasdir(self, d): + def hasdir(self, d: bytes) -> bool: pass - def flagfunc(self, buildfallback): + def flagfunc(self, buildfallback: FlagFuncFallbackT) -> FlagFuncReturnT: """build a callable that returns flags associated with a filename The information is extracted from three possible layers: @@ -104,7 +156,7 @@ 3. a more expensive mechanism inferring the flags from the parents. """ - def getcwd(self): + def getcwd(self) -> bytes: """Return the path from which a canonical path is calculated. This path should be used to resolve file patterns or to convert @@ -112,19 +164,19 @@ used to get real file paths. Use vfs functions instead. """ - def pathto(self, f, cwd=None): + def pathto(self, f: bytes, cwd: Optional[bytes] = None) -> bytes: pass - def get_entry(self, path): + def get_entry(self, path: bytes) -> DirstateItemT: """return a DirstateItem for the associated path""" - def __contains__(self, key): + def __contains__(self, key: Any) -> bool: """Check if bytestring `key` is known to the dirstate.""" - def __iter__(self): + def __iter__(self) -> Iterator[bytes]: """Iterate the dirstate's contained filenames as bytestrings.""" - def items(self): + def items(self) -> Iterator[Tuple[bytes, DirstateItemT]]: """Iterate the dirstate's entries as (filename, DirstateItem. As usual, filename is a bytestring. @@ -132,19 +184,20 @@ iteritems = items - def parents(self): + def parents(self) -> List[bytes]: pass - def p1(self): + def p1(self) -> bytes: pass - def p2(self): + def p2(self) -> bytes: pass - def branch(self): + def branch(self) -> bytes: pass - def setparents(self, p1, p2=None): + # TODO: typehint the return. It's a copies Map of some sort. + def setparents(self, p1: bytes, p2: Optional[bytes] = None): """Set dirstate parents to p1 and p2. When moving from two parents to one, "merged" entries a @@ -154,26 +207,30 @@ See localrepo.setparents() """ - def setbranch(self, branch, transaction): + def setbranch( + self, branch: bytes, transaction: Optional[TransactionT] + ) -> None: pass - def invalidate(self): + def invalidate(self) -> None: """Causes the next access to reread the dirstate. This is different from localrepo.invalidatedirstate() because it always rereads the dirstate. Use localrepo.invalidatedirstate() if you want to check whether the dirstate has changed before rereading it.""" - def copy(self, source, dest): + def copy(self, source: Optional[bytes], dest: bytes) -> None: """Mark dest as a copy of source. Unmark dest if source is None.""" - def copied(self, file): + def copied(self, file: bytes) -> Optional[bytes]: pass - def copies(self): + def copies(self) -> Dict[bytes, bytes]: pass - def normalize(self, path, isknown=False, ignoremissing=False): + def normalize( + self, path: bytes, isknown: bool = False, ignoremissing: bool = False + ) -> bytes: """ normalize the case of a pathname when on a casefolding filesystem @@ -191,16 +248,23 @@ - version provided via command arguments """ - def clear(self): + def clear(self) -> None: pass - def rebuild(self, parent, allfiles, changedfiles=None): + def rebuild( + self, + parent: bytes, + allfiles: Iterable[bytes], # TODO: more than iterable? (uses len()) + changedfiles: Optional[Iterable[bytes]] = None, + ) -> None: pass - def write(self, tr): + def write(self, tr: Optional[TransactionT]) -> None: pass - def addparentchangecallback(self, category, callback): + def addparentchangecallback( + self, category: bytes, callback: AddParentChangeCallbackT + ) -> None: """add a callback to be called when the wd parents are changed Callback will be called with the following arguments: @@ -210,7 +274,14 @@ with a newer callback. """ - def walk(self, match, subrepos, unknown, ignored, full=True): + def walk( + self, + match: matchmod.basematcher, + subrepos: Any, # TODO: figure out what this is + unknown: bool, + ignored: bool, + full: bool = True, + ) -> WalkReturnT: """ Walk recursively through the directory tree, finding all files matched by match. @@ -222,7 +293,14 @@ """ - def status(self, match, subrepos, ignored, clean, unknown): + def status( + self, + match: matchmod.basematcher, + subrepos: bool, + ignored: bool, + clean: bool, + unknown: bool, + ) -> StatusReturnT: """Determine the status of the working copy relative to the dirstate and return a pair of (unsure, status), where status is of type scmutil.status and: @@ -239,12 +317,18 @@ dirstate was written """ - def matches(self, match): + # TODO: could return a list, except git.dirstate is a generator + + def matches(self, match: matchmod.basematcher) -> Iterable[bytes]: """ return files in the dirstate (in whatever state) filtered by match """ - def verify(self, m1, m2, p1, narrow_matcher=None): + # TODO: finish adding typehints here, and to subclasses + + def verify( + self, m1, m2, p1: bytes, narrow_matcher: Optional[Any] = None + ) -> Iterator[bytes]: """ check the dirstate contents against the parent manifest and yield errors """