changeset 49722:cc9a60050a07

typing: add basic type hints to vfs.py Again, there's a lot more that could be done, but this sticks to the obviously correct stuff that is related to primitives or `vfs` objects. Hopefully this helps smoke out more path related bytes vs str issues in TortoiseHg. PyCharm seems smart enough to apply hints from annotated superclass functions, but pytype isn't (according to the *.pyi file generated), so those are annotated too. There was some discussion about changing the default path arg from `None` to `b''` in order to avoid the more verbose `Optional` declarations. This would be more in line with `os.path.join()` (which rejects `None`, but ignores empty strings), and still not change the behavior for callers still passing `None` (because the check is `if path` instead of an explicit check for `None`). But I didn't want to hold this up while discussing that, so this documents what _is_.
author Matt Harbison <matt_harbison@yahoo.com>
date Wed, 02 Nov 2022 17:30:57 -0400
parents c4f07a011714
children 2506c3ac73f4
files mercurial/vfs.py
diffstat 1 files changed, 78 insertions(+), 58 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/vfs.py	Fri Nov 04 17:35:44 2022 -0400
+++ b/mercurial/vfs.py	Wed Nov 02 17:30:57 2022 -0400
@@ -11,6 +11,10 @@
 import stat
 import threading
 
+from typing import (
+    Optional,
+)
+
 from .i18n import _
 from .pycompat import (
     delattr,
@@ -26,7 +30,7 @@
 )
 
 
-def _avoidambig(path, oldstat):
+def _avoidambig(path: bytes, oldstat):
     """Avoid file stat ambiguity forcibly
 
     This function causes copying ``path`` file, if it is owned by
@@ -60,16 +64,17 @@
         '''Prevent instantiation; don't call this from subclasses.'''
         raise NotImplementedError('attempted instantiating ' + str(type(self)))
 
-    def __call__(self, path, mode=b'rb', **kwargs):
+    # TODO: type return, which is util.posixfile wrapped by a proxy
+    def __call__(self, path: bytes, mode: bytes = b'rb', **kwargs):
         raise NotImplementedError
 
-    def _auditpath(self, path, mode):
+    def _auditpath(self, path: bytes, mode: bytes):
         raise NotImplementedError
 
-    def join(self, path, *insidef):
+    def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
         raise NotImplementedError
 
-    def tryread(self, path):
+    def tryread(self, path: bytes) -> bytes:
         '''gracefully return an empty string for missing files'''
         try:
             return self.read(path)
@@ -77,7 +82,7 @@
             pass
         return b""
 
-    def tryreadlines(self, path, mode=b'rb'):
+    def tryreadlines(self, path: bytes, mode: bytes = b'rb'):
         '''gracefully return an empty array for missing files'''
         try:
             return self.readlines(path, mode=mode)
@@ -95,57 +100,61 @@
         """
         return self.__call__
 
-    def read(self, path):
+    def read(self, path: bytes) -> bytes:
         with self(path, b'rb') as fp:
             return fp.read()
 
-    def readlines(self, path, mode=b'rb'):
+    def readlines(self, path: bytes, mode: bytes = b'rb'):
         with self(path, mode=mode) as fp:
             return fp.readlines()
 
-    def write(self, path, data, backgroundclose=False, **kwargs):
+    def write(
+        self, path: bytes, data: bytes, backgroundclose=False, **kwargs
+    ) -> int:
         with self(path, b'wb', backgroundclose=backgroundclose, **kwargs) as fp:
             return fp.write(data)
 
-    def writelines(self, path, data, mode=b'wb', notindexed=False):
+    def writelines(
+        self, path: bytes, data: bytes, mode: bytes = b'wb', notindexed=False
+    ) -> None:
         with self(path, mode=mode, notindexed=notindexed) as fp:
             return fp.writelines(data)
 
-    def append(self, path, data):
+    def append(self, path: bytes, data: bytes) -> int:
         with self(path, b'ab') as fp:
             return fp.write(data)
 
-    def basename(self, path):
+    def basename(self, path: bytes) -> bytes:
         """return base element of a path (as os.path.basename would do)
 
         This exists to allow handling of strange encoding if needed."""
         return os.path.basename(path)
 
-    def chmod(self, path, mode):
+    def chmod(self, path: bytes, mode: int) -> None:
         return os.chmod(self.join(path), mode)
 
-    def dirname(self, path):
+    def dirname(self, path: bytes) -> bytes:
         """return dirname element of a path (as os.path.dirname would do)
 
         This exists to allow handling of strange encoding if needed."""
         return os.path.dirname(path)
 
-    def exists(self, path=None):
+    def exists(self, path: Optional[bytes] = None) -> bool:
         return os.path.exists(self.join(path))
 
     def fstat(self, fp):
         return util.fstat(fp)
 
-    def isdir(self, path=None):
+    def isdir(self, path: Optional[bytes] = None) -> bool:
         return os.path.isdir(self.join(path))
 
-    def isfile(self, path=None):
+    def isfile(self, path: Optional[bytes] = None) -> bool:
         return os.path.isfile(self.join(path))
 
-    def islink(self, path=None):
+    def islink(self, path: Optional[bytes] = None) -> bool:
         return os.path.islink(self.join(path))
 
-    def isfileorlink(self, path=None):
+    def isfileorlink(self, path: Optional[bytes] = None) -> bool:
         """return whether path is a regular file or a symlink
 
         Unlike isfile, this doesn't follow symlinks."""
@@ -156,7 +165,7 @@
         mode = st.st_mode
         return stat.S_ISREG(mode) or stat.S_ISLNK(mode)
 
-    def _join(self, *paths):
+    def _join(self, *paths: bytes) -> bytes:
         root_idx = 0
         for idx, p in enumerate(paths):
             if os.path.isabs(p) or p.startswith(self._dir_sep):
@@ -166,41 +175,48 @@
         paths = [p for p in paths if p]
         return self._dir_sep.join(paths)
 
-    def reljoin(self, *paths):
+    def reljoin(self, *paths: bytes) -> bytes:
         """join various elements of a path together (as os.path.join would do)
 
         The vfs base is not injected so that path stay relative. This exists
         to allow handling of strange encoding if needed."""
         return self._join(*paths)
 
-    def split(self, path):
+    def split(self, path: bytes):
         """split top-most element of a path (as os.path.split would do)
 
         This exists to allow handling of strange encoding if needed."""
         return os.path.split(path)
 
-    def lexists(self, path=None):
+    def lexists(self, path: Optional[bytes] = None) -> bool:
         return os.path.lexists(self.join(path))
 
-    def lstat(self, path=None):
+    def lstat(self, path: Optional[bytes] = None):
         return os.lstat(self.join(path))
 
-    def listdir(self, path=None):
+    def listdir(self, path: Optional[bytes] = None):
         return os.listdir(self.join(path))
 
-    def makedir(self, path=None, notindexed=True):
+    def makedir(self, path: Optional[bytes] = None, notindexed=True):
         return util.makedir(self.join(path), notindexed)
 
-    def makedirs(self, path=None, mode=None):
+    def makedirs(
+        self, path: Optional[bytes] = None, mode: Optional[int] = None
+    ):
         return util.makedirs(self.join(path), mode)
 
-    def makelock(self, info, path):
+    def makelock(self, info, path: bytes):
         return util.makelock(info, self.join(path))
 
-    def mkdir(self, path=None):
+    def mkdir(self, path: Optional[bytes] = None):
         return os.mkdir(self.join(path))
 
-    def mkstemp(self, suffix=b'', prefix=b'tmp', dir=None):
+    def mkstemp(
+        self,
+        suffix: bytes = b'',
+        prefix: bytes = b'tmp',
+        dir: Optional[bytes] = None,
+    ):
         fd, name = pycompat.mkstemp(
             suffix=suffix, prefix=prefix, dir=self.join(dir)
         )
@@ -210,13 +226,13 @@
         else:
             return fd, fname
 
-    def readdir(self, path=None, stat=None, skip=None):
+    def readdir(self, path: Optional[bytes] = None, stat=None, skip=None):
         return util.listdir(self.join(path), stat, skip)
 
-    def readlock(self, path):
+    def readlock(self, path: bytes) -> bytes:
         return util.readlock(self.join(path))
 
-    def rename(self, src, dst, checkambig=False):
+    def rename(self, src: bytes, dst: bytes, checkambig=False):
         """Rename from src to dst
 
         checkambig argument is used with util.filestat, and is useful
@@ -238,18 +254,20 @@
             return ret
         return util.rename(srcpath, dstpath)
 
-    def readlink(self, path):
+    def readlink(self, path: bytes) -> bytes:
         return util.readlink(self.join(path))
 
-    def removedirs(self, path=None):
+    def removedirs(self, path: Optional[bytes] = None):
         """Remove a leaf directory and all empty intermediate ones"""
         return util.removedirs(self.join(path))
 
-    def rmdir(self, path=None):
+    def rmdir(self, path: Optional[bytes] = None):
         """Remove an empty directory."""
         return os.rmdir(self.join(path))
 
-    def rmtree(self, path=None, ignore_errors=False, forcibly=False):
+    def rmtree(
+        self, path: Optional[bytes] = None, ignore_errors=False, forcibly=False
+    ):
         """Remove a directory tree recursively
 
         If ``forcibly``, this tries to remove READ-ONLY files, too.
@@ -272,28 +290,30 @@
             self.join(path), ignore_errors=ignore_errors, onerror=onerror
         )
 
-    def setflags(self, path, l, x):
+    def setflags(self, path: bytes, l: bool, x: bool):
         return util.setflags(self.join(path), l, x)
 
-    def stat(self, path=None):
+    def stat(self, path: Optional[bytes] = None):
         return os.stat(self.join(path))
 
-    def unlink(self, path=None):
+    def unlink(self, path: Optional[bytes] = None):
         return util.unlink(self.join(path))
 
-    def tryunlink(self, path=None):
+    def tryunlink(self, path: Optional[bytes] = None):
         """Attempt to remove a file, ignoring missing file errors."""
         util.tryunlink(self.join(path))
 
-    def unlinkpath(self, path=None, ignoremissing=False, rmdir=True):
+    def unlinkpath(
+        self, path: Optional[bytes] = None, ignoremissing=False, rmdir=True
+    ):
         return util.unlinkpath(
             self.join(path), ignoremissing=ignoremissing, rmdir=rmdir
         )
 
-    def utime(self, path=None, t=None):
+    def utime(self, path: Optional[bytes] = None, t=None):
         return os.utime(self.join(path), t)
 
-    def walk(self, path=None, onerror=None):
+    def walk(self, path: Optional[bytes] = None, onerror=None):
         """Yield (dirpath, dirs, files) tuple for each directories under path
 
         ``dirpath`` is relative one from the root of this vfs. This
@@ -360,7 +380,7 @@
 
     def __init__(
         self,
-        base,
+        base: bytes,
         audit=True,
         cacheaudited=False,
         expandpath=False,
@@ -381,7 +401,7 @@
         self.options = {}
 
     @util.propertycache
-    def _cansymlink(self):
+    def _cansymlink(self) -> bool:
         return util.checklink(self.base)
 
     @util.propertycache
@@ -393,7 +413,7 @@
             return
         os.chmod(name, self.createmode & 0o666)
 
-    def _auditpath(self, path, mode):
+    def _auditpath(self, path, mode) -> None:
         if self._audit:
             if os.path.isabs(path) and path.startswith(self.base):
                 path = os.path.relpath(path, self.base)
@@ -404,8 +424,8 @@
 
     def __call__(
         self,
-        path,
-        mode=b"r",
+        path: bytes,
+        mode: bytes = b"r",
         atomictemp=False,
         notindexed=False,
         backgroundclose=False,
@@ -518,7 +538,7 @@
 
         return fp
 
-    def symlink(self, src, dst):
+    def symlink(self, src: bytes, dst: bytes) -> None:
         self.audit(dst)
         linkname = self.join(dst)
         util.tryunlink(linkname)
@@ -538,7 +558,7 @@
         else:
             self.write(dst, src)
 
-    def join(self, path, *insidef):
+    def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
         if path:
             parts = [self.base, path]
             parts.extend(insidef)
@@ -551,7 +571,7 @@
 
 
 class proxyvfs(abstractvfs):
-    def __init__(self, vfs):
+    def __init__(self, vfs: "vfs"):
         self.vfs = vfs
 
     def _auditpath(self, path, mode):
@@ -569,14 +589,14 @@
 class filtervfs(proxyvfs, abstractvfs):
     '''Wrapper vfs for filtering filenames with a function.'''
 
-    def __init__(self, vfs, filter):
+    def __init__(self, vfs: "vfs", filter):
         proxyvfs.__init__(self, vfs)
         self._filter = filter
 
-    def __call__(self, path, *args, **kwargs):
+    def __call__(self, path: bytes, *args, **kwargs):
         return self.vfs(self._filter(path), *args, **kwargs)
 
-    def join(self, path, *insidef):
+    def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
         if path:
             return self.vfs.join(self._filter(self.vfs.reljoin(path, *insidef)))
         else:
@@ -589,15 +609,15 @@
 class readonlyvfs(proxyvfs):
     '''Wrapper vfs preventing any writing.'''
 
-    def __init__(self, vfs):
+    def __init__(self, vfs: "vfs"):
         proxyvfs.__init__(self, vfs)
 
-    def __call__(self, path, mode=b'r', *args, **kw):
+    def __call__(self, path: bytes, mode: bytes = b'r', *args, **kw):
         if mode not in (b'r', b'rb'):
             raise error.Abort(_(b'this vfs is read only'))
         return self.vfs(path, mode, *args, **kw)
 
-    def join(self, path, *insidef):
+    def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
         return self.vfs.join(path, *insidef)