changeset 51892:fa9e8a6521c1

typing: manually add type annotations to `mercurial/vfs.py` This isn't everything, but hopefully it's close enough to hack on a protocol class.
author Matt Harbison <matt_harbison@yahoo.com>
date Fri, 20 Sep 2024 20:16:12 -0400
parents ad83e4f9b40e
children 22e1924e9402
files mercurial/vfs.py
diffstat 1 files changed, 71 insertions(+), 40 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/vfs.py	Fri Sep 20 16:36:28 2024 -0400
+++ b/mercurial/vfs.py	Fri Sep 20 20:16:12 2024 -0400
@@ -17,9 +17,12 @@
 
 from typing import (
     Any,
+    BinaryIO,
+    Callable,
     Iterable,
     Iterator,
     List,
+    MutableMapping,
     Optional,
     Tuple,
     Type,
@@ -36,13 +39,18 @@
 )
 
 if typing.TYPE_CHECKING:
+    from . import (
+        ui as uimod,
+    )
+
     _Tbackgroundfilecloser = TypeVar(
         '_Tbackgroundfilecloser', bound='backgroundfilecloser'
     )
     _Tclosewrapbase = TypeVar('_Tclosewrapbase', bound='closewrapbase')
+    _OnErrorFn = Callable[[Exception], Optional[object]]
 
 
-def _avoidambig(path: bytes, oldstat) -> None:
+def _avoidambig(path: bytes, oldstat: util.filestat) -> None:
     """Avoid file stat ambiguity forcibly
 
     This function causes copying ``path`` file, if it is owned by
@@ -78,7 +86,7 @@
         ...
 
     @abc.abstractmethod
-    def _auditpath(self, path: bytes, mode: bytes) -> Any:
+    def _auditpath(self, path: bytes, mode: bytes) -> None:
         ...
 
     @abc.abstractmethod
@@ -93,7 +101,7 @@
             pass
         return b""
 
-    def tryreadlines(self, path: bytes, mode: bytes = b'rb') -> Any:
+    def tryreadlines(self, path: bytes, mode: bytes = b'rb') -> List[bytes]:
         '''gracefully return an empty array for missing files'''
         try:
             return self.readlines(path, mode=mode)
@@ -115,12 +123,12 @@
         with self(path, b'rb') as fp:
             return fp.read()
 
-    def readlines(self, path: bytes, mode: bytes = b'rb') -> Any:
+    def readlines(self, path: bytes, mode: bytes = b'rb') -> List[bytes]:
         with self(path, mode=mode) as fp:
             return fp.readlines()
 
     def write(
-        self, path: bytes, data: bytes, backgroundclose=False, **kwargs
+        self, path: bytes, data: bytes, backgroundclose: bool = False, **kwargs
     ) -> int:
         with self(path, b'wb', backgroundclose=backgroundclose, **kwargs) as fp:
             return fp.write(data)
@@ -130,7 +138,7 @@
         path: bytes,
         data: Iterable[bytes],
         mode: bytes = b'wb',
-        notindexed=False,
+        notindexed: bool = False,
     ) -> None:
         with self(path, mode=mode, notindexed=notindexed) as fp:
             return fp.writelines(data)
@@ -157,7 +165,7 @@
     def exists(self, path: Optional[bytes] = None) -> bool:
         return os.path.exists(self.join(path))
 
-    def fstat(self, fp) -> os.stat_result:
+    def fstat(self, fp: BinaryIO) -> os.stat_result:
         return util.fstat(fp)
 
     def isdir(self, path: Optional[bytes] = None) -> bool:
@@ -249,7 +257,7 @@
     ) -> None:
         return util.makedirs(self.join(path), mode)
 
-    def makelock(self, info, path: bytes) -> None:
+    def makelock(self, info: bytes, path: bytes) -> None:
         return util.makelock(info, self.join(path))
 
     def mkdir(self, path: Optional[bytes] = None) -> None:
@@ -270,6 +278,12 @@
         else:
             return fd, fname
 
+    # TODO: This doesn't match osutil.listdir().  stat=False in pure;
+    #  non-optional bool in cext.  'skip' is bool if we trust cext, or bytes
+    #  going by how pure uses it.  Also, cext returns a custom stat structure.
+    #  from cext.osutil.pyi:
+    #
+    #     path: bytes, st: bool, skip: Optional[bool]
     def readdir(
         self, path: Optional[bytes] = None, stat=None, skip=None
     ) -> Any:
@@ -278,7 +292,7 @@
     def readlock(self, path: bytes) -> bytes:
         return util.readlock(self.join(path))
 
-    def rename(self, src: bytes, dst: bytes, checkambig=False) -> None:
+    def rename(self, src: bytes, dst: bytes, checkambig: bool = False) -> None:
         """Rename from src to dst
 
         checkambig argument is used with util.filestat, and is useful
@@ -312,7 +326,10 @@
         return os.rmdir(self.join(path))
 
     def rmtree(
-        self, path: Optional[bytes] = None, ignore_errors=False, forcibly=False
+        self,
+        path: Optional[bytes] = None,
+        ignore_errors: bool = False,
+        forcibly: bool = False,
     ) -> None:
         """Remove a directory tree recursively
 
@@ -320,7 +337,7 @@
         """
         if forcibly:
 
-            def onexc(function, path, excinfo):
+            def onexc(function, path: bytes, excinfo):
                 if function is not os.remove:
                     raise
                 # read-only files cannot be unlinked under Windows
@@ -357,17 +374,23 @@
         return util.tryunlink(self.join(path))
 
     def unlinkpath(
-        self, path: Optional[bytes] = None, ignoremissing=False, rmdir=True
+        self,
+        path: Optional[bytes] = None,
+        ignoremissing: bool = False,
+        rmdir: bool = True,
     ) -> None:
         return util.unlinkpath(
             self.join(path), ignoremissing=ignoremissing, rmdir=rmdir
         )
 
-    def utime(self, path: Optional[bytes] = None, t=None) -> None:
+    # TODO: could be Tuple[float, float] too.
+    def utime(
+        self, path: Optional[bytes] = None, t: Optional[Tuple[int, int]] = None
+    ) -> None:
         return os.utime(self.join(path), t)
 
     def walk(
-        self, path: Optional[bytes] = None, onerror=None
+        self, path: Optional[bytes] = None, onerror: Optional[_OnErrorFn] = None
     ) -> Iterator[Tuple[bytes, List[bytes], List[bytes]]]:
         """Yield (dirpath, dirs, files) tuple for each directory under path
 
@@ -386,7 +409,7 @@
 
     @contextlib.contextmanager
     def backgroundclosing(
-        self, ui, expectedcount=-1
+        self, ui: uimod.ui, expectedcount: int = -1
     ) -> Iterator[Optional[backgroundfilecloser]]:
         """Allow files to be closed asynchronously.
 
@@ -417,7 +440,7 @@
                     None  # pytype: disable=attribute-error
                 )
 
-    def register_file(self, path) -> None:
+    def register_file(self, path: bytes) -> None:
         """generic hook point to lets fncache steer its stew"""
 
 
@@ -432,13 +455,15 @@
     See pathutil.pathauditor() for details.
     """
 
+    createmode: Optional[int]
+
     def __init__(
         self,
         base: bytes,
-        audit=True,
-        cacheaudited=False,
-        expandpath=False,
-        realpath=False,
+        audit: bool = True,
+        cacheaudited: bool = False,
+        expandpath: bool = False,
+        realpath: bool = False,
     ) -> None:
         if expandpath:
             base = util.expandpath(base)
@@ -459,15 +484,15 @@
         return util.checklink(self.base)
 
     @util.propertycache
-    def _chmod(self):
+    def _chmod(self) -> bool:
         return util.checkexec(self.base)
 
-    def _fixfilemode(self, name) -> None:
+    def _fixfilemode(self, name: bytes) -> None:
         if self.createmode is None or not self._chmod:
             return
         os.chmod(name, self.createmode & 0o666)
 
-    def _auditpath(self, path, mode) -> None:
+    def _auditpath(self, path: bytes, mode: bytes) -> None:
         if self._audit:
             if os.path.isabs(path) and path.startswith(self.base):
                 path = os.path.relpath(path, self.base)
@@ -477,7 +502,9 @@
             self.audit(path, mode=mode)
 
     def isfileorlink_checkdir(
-        self, dircache, path: Optional[bytes] = None
+        self,
+        dircache: MutableMapping[bytes, bool],
+        path: Optional[bytes] = None,
     ) -> bool:
         """return True if the path is a regular file or a symlink and
         the directories along the path are "normal", that is
@@ -486,6 +513,8 @@
         Ignores the `_audit` setting, and checks the directories regardless.
         `dircache` is used to cache the directory checks.
         """
+        # TODO: Should be a None check on 'path', or shouldn't default to None
+        #  because of the immediate call to util.localpath().
         try:
             for prefix in pathutil.finddirs_rev_noroot(util.localpath(path)):
                 if prefix in dircache:
@@ -505,13 +534,13 @@
         self,
         path: bytes,
         mode: bytes = b"rb",
-        atomictemp=False,
-        notindexed=False,
-        backgroundclose=False,
-        checkambig=False,
-        auditpath=True,
-        makeparentdirs=True,
-    ) -> Any:
+        atomictemp: bool = False,
+        notindexed: bool = False,
+        backgroundclose: bool = False,
+        checkambig: bool = False,
+        auditpath: bool = True,
+        makeparentdirs: bool = True,
+    ) -> Any:  # TODO: should be BinaryIO if util.atomictempfile can be coersed
         """Open ``path`` file, which is relative to vfs root.
 
         By default, parent directories are created as needed. Newly created
@@ -650,14 +679,14 @@
 
 
 class proxyvfs(abstractvfs, abc.ABC):
-    def __init__(self, vfs: "vfs"):
+    def __init__(self, vfs: vfs) -> None:
         self.vfs = vfs
 
     @property
-    def createmode(self):
+    def createmode(self) -> Optional[int]:
         return self.vfs.createmode
 
-    def _auditpath(self, path, mode) -> None:
+    def _auditpath(self, path: bytes, mode: bytes) -> None:
         return self.vfs._auditpath(path, mode)
 
     @property
@@ -676,10 +705,11 @@
 class filtervfs(proxyvfs, abstractvfs):
     '''Wrapper vfs for filtering filenames with a function.'''
 
-    def __init__(self, vfs: "vfs", filter):
+    def __init__(self, vfs: vfs, filter) -> None:
         proxyvfs.__init__(self, vfs)
         self._filter = filter
 
+    # TODO: The return type should be BinaryIO
     def __call__(self, path: bytes, *args, **kwargs) -> Any:
         return self.vfs(self._filter(path), *args, **kwargs)
 
@@ -696,9 +726,10 @@
 class readonlyvfs(proxyvfs):
     '''Wrapper vfs preventing any writing.'''
 
-    def __init__(self, vfs: "vfs"):
+    def __init__(self, vfs: vfs) -> None:
         proxyvfs.__init__(self, vfs)
 
+    # TODO: The return type should be BinaryIO
     def __call__(self, path: bytes, mode: bytes = b'rb', *args, **kw) -> Any:
         if mode not in (b'r', b'rb'):
             raise error.Abort(_(b'this vfs is read only'))
@@ -717,13 +748,13 @@
     def __init__(self, fh) -> None:
         object.__setattr__(self, '_origfh', fh)
 
-    def __getattr__(self, attr) -> Any:
+    def __getattr__(self, attr: str) -> Any:
         return getattr(self._origfh, attr)
 
-    def __setattr__(self, attr, value) -> None:
+    def __setattr__(self, attr: str, value: Any) -> None:
         return setattr(self._origfh, attr, value)
 
-    def __delattr__(self, attr) -> None:
+    def __delattr__(self, attr: str) -> None:
         return delattr(self._origfh, attr)
 
     def __enter__(self: _Tclosewrapbase) -> _Tclosewrapbase:
@@ -759,7 +790,7 @@
 class backgroundfilecloser:
     """Coordinates background closing of file handles on multiple threads."""
 
-    def __init__(self, ui, expectedcount=-1) -> None:
+    def __init__(self, ui: uimod.ui, expectedcount: int = -1) -> None:
         self._running = False
         self._entered = False
         self._threads = []