changeset 51816:a1a94d488e14

typing: add type hints to `mercurial.shelve` Pytype wasn't flagging anything here yet, but PyCharm was really unhappy about the usage of `state` objects being passed to various methods that accessed attrs on it, without any obvious attrs on the class because there's no contructor. Filling that out made PyCharm happy, and a few other things needed to be filled in to make that easier, so I made a pass over the whole file and filled in the trivial hints. The other repo, ui, context, matcher, and pats items can be filled in after the context and match modules are typed.
author Matt Harbison <matt_harbison@yahoo.com>
date Tue, 20 Aug 2024 22:34:51 -0400
parents 460e80488cf0
children adbfbbf9963f
files mercurial/shelve.py
diffstat 1 files changed, 101 insertions(+), 68 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/shelve.py	Tue Aug 20 18:30:47 2024 -0400
+++ b/mercurial/shelve.py	Tue Aug 20 22:34:51 2024 -0400
@@ -26,6 +26,16 @@
 import itertools
 import stat
 
+from typing import (
+    Any,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Sequence,
+    Tuple,
+)
+
 from .i18n import _
 from .node import (
     bin,
@@ -37,6 +47,7 @@
     bundle2,
     changegroup,
     cmdutil,
+    context as contextmod,
     discovery,
     error,
     exchange,
@@ -69,16 +80,16 @@
 
 
 class ShelfDir:
-    def __init__(self, repo, for_backups=False):
+    def __init__(self, repo, for_backups: bool = False) -> None:
         if for_backups:
             self.vfs = vfsmod.vfs(repo.vfs.join(backupdir))
         else:
             self.vfs = vfsmod.vfs(repo.vfs.join(shelvedir))
 
-    def get(self, name):
+    def get(self, name: bytes) -> "Shelf":
         return Shelf(self.vfs, name)
 
-    def listshelves(self):
+    def listshelves(self) -> List[Tuple[float, bytes]]:
         """return all shelves in repo as list of (time, name)"""
         try:
             names = self.vfs.listdir()
@@ -99,14 +110,14 @@
         return sorted(info, reverse=True)
 
 
-def _use_internal_phase(repo):
+def _use_internal_phase(repo) -> bool:
     return (
         phases.supportinternal(repo)
         and repo.ui.config(b'shelve', b'store') == b'internal'
     )
 
 
-def _target_phase(repo):
+def _target_phase(repo) -> int:
     return phases.internal if _use_internal_phase(repo) else phases.secret
 
 
@@ -118,29 +129,29 @@
     differences and lets you work with the shelf as a whole.
     """
 
-    def __init__(self, vfs, name):
+    def __init__(self, vfs: vfsmod.vfs, name: bytes) -> None:
         self.vfs = vfs
         self.name = name
 
-    def exists(self):
+    def exists(self) -> bool:
         return self._exists(b'.shelve') or self._exists(b'.patch', b'.hg')
 
-    def _exists(self, *exts):
+    def _exists(self, *exts: bytes) -> bool:
         return all(self.vfs.exists(self.name + ext) for ext in exts)
 
-    def mtime(self):
+    def mtime(self) -> float:
         try:
             return self._stat(b'.shelve')[stat.ST_MTIME]
         except FileNotFoundError:
             return self._stat(b'.patch')[stat.ST_MTIME]
 
-    def _stat(self, ext):
+    def _stat(self, ext: bytes):
         return self.vfs.stat(self.name + ext)
 
-    def writeinfo(self, info):
+    def writeinfo(self, info) -> None:
         scmutil.simplekeyvaluefile(self.vfs, self.name + b'.shelve').write(info)
 
-    def hasinfo(self):
+    def hasinfo(self) -> bool:
         return self.vfs.exists(self.name + b'.shelve')
 
     def readinfo(self):
@@ -148,7 +159,7 @@
             self.vfs, self.name + b'.shelve'
         ).read()
 
-    def writebundle(self, repo, bases, node):
+    def writebundle(self, repo, bases, node) -> None:
         cgversion = changegroup.safeversion(repo)
         if cgversion == b'01':
             btype = b'HG10BZ'
@@ -174,7 +185,7 @@
             compression=compression,
         )
 
-    def applybundle(self, repo, tr):
+    def applybundle(self, repo, tr) -> contextmod.changectx:
         filename = self.name + b'.hg'
         fp = self.vfs(filename)
         try:
@@ -197,10 +208,10 @@
         finally:
             fp.close()
 
-    def open_patch(self, mode=b'rb'):
+    def open_patch(self, mode: bytes = b'rb'):
         return self.vfs(self.name + b'.patch', mode)
 
-    def patch_from_node(self, repo, node):
+    def patch_from_node(self, repo, node) -> io.BytesIO:
         repo = repo.unfiltered()
         match = _optimized_match(repo, node)
         fp = io.BytesIO()
@@ -221,8 +232,8 @@
         except (FileNotFoundError, error.RepoLookupError):
             return self.open_patch()
 
-    def _backupfilename(self, backupvfs, filename):
-        def gennames(base):
+    def _backupfilename(self, backupvfs: vfsmod.vfs, filename: bytes) -> bytes:
+        def gennames(base: bytes):
             yield base
             base, ext = base.rsplit(b'.', 1)
             for i in itertools.count(1):
@@ -232,7 +243,10 @@
             if not backupvfs.exists(n):
                 return backupvfs.join(n)
 
-    def movetobackup(self, backupvfs):
+        # Help pytype- gennames() yields infinitely
+        raise error.ProgrammingError("unreachable")
+
+    def movetobackup(self, backupvfs: vfsmod.vfs) -> None:
         if not backupvfs.isdir():
             backupvfs.makedir()
         for suffix in shelvefileextensions:
@@ -243,7 +257,7 @@
                     self._backupfilename(backupvfs, filename),
                 )
 
-    def delete(self):
+    def delete(self) -> None:
         for ext in shelvefileextensions:
             self.vfs.tryunlink(self.name + b'.' + ext)
 
@@ -256,7 +270,7 @@
             return patch.changedfiles(ui, repo, filename)
 
 
-def _optimized_match(repo, node):
+def _optimized_match(repo, node: bytes):
     """
     Create a matcher so that prefetch doesn't attempt to fetch
     the entire repository pointlessly, and as an optimisation
@@ -272,6 +286,7 @@
     versions of a shelved state are possible and handles them appropriately.
     """
 
+    # Class-wide constants
     _version = 2
     _filename = b'shelvedstate'
     _keep = b'keep'
@@ -280,8 +295,19 @@
     _noactivebook = b':no-active-bookmark'
     _interactive = b'interactive'
 
+    # Per instance attrs
+    name: bytes
+    wctx: contextmod.workingctx
+    pendingctx: contextmod.changectx
+    parents: List[bytes]
+    nodestoremove: List[bytes]
+    branchtorestore: bytes
+    keep: bool
+    activebookmark: bytes
+    interactive: bool
+
     @classmethod
-    def _verifyandtransform(cls, d):
+    def _verifyandtransform(cls, d: Dict[bytes, Any]) -> None:
         """Some basic shelvestate syntactic verification and transformation"""
         try:
             d[b'originalwctx'] = bin(d[b'originalwctx'])
@@ -294,7 +320,7 @@
             raise error.CorruptedState(stringutil.forcebytestr(err))
 
     @classmethod
-    def _getversion(cls, repo):
+    def _getversion(cls, repo) -> int:
         """Read version information from shelvestate file"""
         fp = repo.vfs(cls._filename)
         try:
@@ -306,7 +332,7 @@
         return version
 
     @classmethod
-    def _readold(cls, repo):
+    def _readold(cls, repo) -> Dict[bytes, Any]:
         """Read the old position-based version of a shelvestate file"""
         # Order is important, because old shelvestate file uses it
         # to detemine values of fields (i.g. name is on the second line,
@@ -373,15 +399,15 @@
     def save(
         cls,
         repo,
-        name,
-        originalwctx,
-        pendingctx,
-        nodestoremove,
-        branchtorestore,
-        keep=False,
-        activebook=b'',
-        interactive=False,
-    ):
+        name: bytes,
+        originalwctx: contextmod.workingctx,
+        pendingctx: contextmod.changectx,
+        nodestoremove: List[bytes],
+        branchtorestore: bytes,
+        keep: bool = False,
+        activebook: bytes = b'',
+        interactive: bool = False,
+    ) -> None:
         info = {
             b"name": name,
             b"originalwctx": hex(originalwctx.node()),
@@ -399,11 +425,11 @@
         )
 
     @classmethod
-    def clear(cls, repo):
+    def clear(cls, repo) -> None:
         repo.vfs.unlinkpath(cls._filename, ignoremissing=True)
 
 
-def cleanupoldbackups(repo):
+def cleanupoldbackups(repo) -> None:
     maxbackups = repo.ui.configint(b'shelve', b'maxbackups')
     backup_dir = ShelfDir(repo, for_backups=True)
     hgfiles = backup_dir.listshelves()
@@ -418,19 +444,19 @@
         backup_dir.get(name).delete()
 
 
-def _backupactivebookmark(repo):
+def _backupactivebookmark(repo) -> bytes:
     activebookmark = repo._activebookmark
     if activebookmark:
         bookmarks.deactivate(repo)
     return activebookmark
 
 
-def _restoreactivebookmark(repo, mark):
+def _restoreactivebookmark(repo, mark) -> None:
     if mark:
         bookmarks.activate(repo, mark)
 
 
-def _aborttransaction(repo, tr):
+def _aborttransaction(repo, tr) -> None:
     """Abort current transaction for shelve/unshelve, but keep dirstate"""
     # disable the transaction invalidation of the dirstate, to preserve the
     # current change in memory.
@@ -456,7 +482,7 @@
     ds.setbranch(current_branch, None)
 
 
-def getshelvename(repo, parent, opts):
+def getshelvename(repo, parent, opts) -> bytes:
     """Decide on the name this shelve is going to have"""
 
     def gennames():
@@ -496,7 +522,7 @@
     return name
 
 
-def mutableancestors(ctx):
+def mutableancestors(ctx) -> Iterator[bytes]:
     """return all mutable ancestors for ctx (included)
 
     Much faster than the revset ancestors(ctx) & draft()"""
@@ -514,7 +540,7 @@
                     visit.append(parent)
 
 
-def getcommitfunc(extra, interactive, editor=False):
+def getcommitfunc(extra, interactive: bool, editor: bool = False):
     def commitfunc(ui, repo, message, match, opts):
         hasmq = hasattr(repo, 'mq')
         if hasmq:
@@ -550,7 +576,7 @@
     return interactivecommitfunc if interactive else commitfunc
 
 
-def _nothingtoshelvemessaging(ui, repo, pats, opts):
+def _nothingtoshelvemessaging(ui, repo, pats, opts) -> None:
     stat = repo.status(match=scmutil.match(repo[None], pats, opts))
     if stat.deleted:
         ui.status(
@@ -561,7 +587,7 @@
         ui.status(_(b"nothing changed\n"))
 
 
-def _shelvecreatedcommit(repo, node, name, match):
+def _shelvecreatedcommit(repo, node: bytes, name: bytes, match) -> None:
     info = {b'node': hex(node)}
     shelf = ShelfDir(repo).get(name)
     shelf.writeinfo(info)
@@ -573,14 +599,14 @@
         )
 
 
-def _includeunknownfiles(repo, pats, opts, extra):
+def _includeunknownfiles(repo, pats, opts, extra) -> None:
     s = repo.status(match=scmutil.match(repo[None], pats, opts), unknown=True)
     if s.unknown:
         extra[b'shelve_unknown'] = b'\0'.join(s.unknown)
         repo[None].add(s.unknown)
 
 
-def _finishshelve(repo, tr):
+def _finishshelve(repo, tr) -> None:
     if _use_internal_phase(repo):
         tr.close()
     else:
@@ -675,7 +701,7 @@
         lockmod.release(tr, lock)
 
 
-def _isbareshelve(pats, opts):
+def _isbareshelve(pats, opts) -> bool:
     return (
         not pats
         and not opts.get(b'interactive', False)
@@ -684,11 +710,11 @@
     )
 
 
-def _iswctxonnewbranch(repo):
+def _iswctxonnewbranch(repo) -> bool:
     return repo[None].branch() != repo[b'.'].branch()
 
 
-def cleanupcmd(ui, repo):
+def cleanupcmd(ui, repo) -> None:
     """subcommand that deletes all shelves"""
 
     with repo.wlock():
@@ -699,7 +725,7 @@
             cleanupoldbackups(repo)
 
 
-def deletecmd(ui, repo, pats):
+def deletecmd(ui, repo, pats) -> None:
     """subcommand that deletes a specific shelve"""
     if not pats:
         raise error.InputError(_(b'no shelved changes specified!'))
@@ -715,7 +741,7 @@
             cleanupoldbackups(repo)
 
 
-def listcmd(ui, repo, pats, opts):
+def listcmd(ui, repo, pats: Iterable[bytes], opts) -> None:
     """subcommand that displays the list of shelves"""
     pats = set(pats)
     width = 80
@@ -762,7 +788,7 @@
                     ui.write(chunk, label=label)
 
 
-def patchcmds(ui, repo, pats, opts):
+def patchcmds(ui, repo, pats: Sequence[bytes], opts) -> None:
     """subcommand that displays shelves"""
     shelf_dir = ShelfDir(repo)
     if len(pats) == 0:
@@ -779,7 +805,7 @@
     listcmd(ui, repo, pats, opts)
 
 
-def checkparents(repo, state):
+def checkparents(repo, state: shelvedstate) -> None:
     """check parent while resuming an unshelve"""
     if state.parents != repo.dirstate.parents():
         raise error.Abort(
@@ -787,7 +813,7 @@
         )
 
 
-def _loadshelvedstate(ui, repo, opts):
+def _loadshelvedstate(ui, repo, opts) -> shelvedstate:
     try:
         state = shelvedstate.load(repo)
         if opts.get(b'keep') is None:
@@ -819,7 +845,7 @@
             )
 
 
-def unshelveabort(ui, repo, state):
+def unshelveabort(ui, repo, state: shelvedstate) -> None:
     """subcommand that abort an in-progress unshelve"""
     with repo.lock():
         try:
@@ -838,14 +864,14 @@
             ui.warn(_(b"unshelve of '%s' aborted\n") % state.name)
 
 
-def hgabortunshelve(ui, repo):
+def hgabortunshelve(ui, repo) -> None:
     """logic to  abort unshelve using 'hg abort"""
     with repo.wlock():
         state = _loadshelvedstate(ui, repo, {b'abort': True})
         return unshelveabort(ui, repo, state)
 
 
-def mergefiles(ui, repo, wctx, shelvectx):
+def mergefiles(ui, repo, wctx, shelvectx) -> None:
     """updates to wctx and merges the changes from shelvectx into the
     dirstate."""
     with ui.configoverride({(b'ui', b'quiet'): True}):
@@ -853,7 +879,7 @@
         cmdutil.revert(ui, repo, shelvectx)
 
 
-def restorebranch(ui, repo, branchtorestore):
+def restorebranch(ui, repo, branchtorestore: bytes) -> None:
     if branchtorestore and branchtorestore != repo.dirstate.branch():
         repo.dirstate.setbranch(branchtorestore, repo.currenttransaction())
         ui.status(
@@ -861,7 +887,7 @@
         )
 
 
-def unshelvecleanup(ui, repo, name, opts):
+def unshelvecleanup(ui, repo, name: bytes, opts) -> None:
     """remove related files after an unshelve"""
     if not opts.get(b'keep'):
         backupvfs = vfsmod.vfs(repo.vfs.join(backupdir))
@@ -869,7 +895,7 @@
         cleanupoldbackups(repo)
 
 
-def unshelvecontinue(ui, repo, state, opts):
+def unshelvecontinue(ui, repo, state: shelvedstate, opts) -> None:
     """subcommand to continue an in-progress unshelve"""
     # We're finishing off a merge. First parent is our original
     # parent, second is the temporary "fake" commit we're unshelving.
@@ -927,7 +953,7 @@
         ui.status(_(b"unshelve of '%s' complete\n") % state.name)
 
 
-def hgcontinueunshelve(ui, repo):
+def hgcontinueunshelve(ui, repo) -> None:
     """logic to resume unshelve using 'hg continue'"""
     with repo.wlock():
         state = _loadshelvedstate(ui, repo, {b'continue': True})
@@ -959,7 +985,7 @@
     return tmpwctx, addedbefore
 
 
-def _unshelverestorecommit(ui, repo, tr, basename):
+def _unshelverestorecommit(ui, repo, tr, basename: bytes):
     """Recreate commit in the repository during the unshelve"""
     repo = repo.unfiltered()
     node = None
@@ -980,7 +1006,9 @@
     return repo, shelvectx
 
 
-def _createunshelvectx(ui, repo, shelvectx, basename, interactive, opts):
+def _createunshelvectx(
+    ui, repo, shelvectx, basename: bytes, interactive: bool, opts
+) -> Tuple[bytes, bool]:
     """Handles the creation of unshelve commit and updates the shelve if it
     was partially unshelved.
 
@@ -1042,7 +1070,7 @@
     opts,
     tr,
     oldtiprev,
-    basename,
+    basename: bytes,
     pctx,
     tmpwctx,
     shelvectx,
@@ -1113,7 +1141,7 @@
     return shelvectx, ispartialunshelve
 
 
-def _forgetunknownfiles(repo, shelvectx, addedbefore):
+def _forgetunknownfiles(repo, shelvectx, addedbefore) -> None:
     # Forget any files that were unknown before the shelve, unknown before
     # unshelve started, but are now added.
     shelveunknown = shelvectx.extra().get(b'shelve_unknown')
@@ -1125,7 +1153,7 @@
     repo[None].forget(toforget)
 
 
-def _finishunshelve(repo, oldtiprev, tr, activebookmark):
+def _finishunshelve(repo, oldtiprev, tr, activebookmark) -> None:
     _restoreactivebookmark(repo, activebookmark)
     # We used to manually strip the commit to update inmemory structure and
     # prevent some issue around hooks. This no longer seems to be the case, so
@@ -1133,7 +1161,7 @@
     _aborttransaction(repo, tr)
 
 
-def _checkunshelveuntrackedproblems(ui, repo, shelvectx):
+def _checkunshelveuntrackedproblems(ui, repo, shelvectx) -> None:
     """Check potential problems which may result from working
     copy having untracked changes."""
     wcdeleted = set(repo.status().deleted)
@@ -1145,7 +1173,7 @@
         raise error.Abort(m, hint=hint)
 
 
-def unshelvecmd(ui, repo, *shelved, **opts):
+def unshelvecmd(ui, repo, *shelved, **opts) -> None:
     opts = pycompat.byteskwargs(opts)
     abortf = opts.get(b'abort')
     continuef = opts.get(b'continue')
@@ -1182,6 +1210,11 @@
             )
         elif continuef:
             return unshelvecontinue(ui, repo, state, opts)
+        else:
+            # Unreachable code, but help type checkers not think that
+            # 'basename' may be used before initialization when checking
+            # ShelfDir below.
+            raise error.ProgrammingError("neither abort nor continue specified")
     elif len(shelved) > 1:
         raise error.InputError(_(b'can only unshelve one change at a time'))
     elif not shelved:
@@ -1199,7 +1232,7 @@
     return _dounshelve(ui, repo, basename, opts)
 
 
-def _dounshelve(ui, repo, basename, opts):
+def _dounshelve(ui, repo, basename: bytes, opts) -> None:
     repo = repo.unfiltered()
     lock = tr = None
     try: