--- a/mercurial/vfs.py Fri Sep 20 16:36:28 2024 -0400
+++ b/mercurial/vfs.py Fri Sep 20 20:16:12 2024 -0400
@@ -17,9 +17,12 @@
from typing import (
Any,
+ BinaryIO,
+ Callable,
Iterable,
Iterator,
List,
+ MutableMapping,
Optional,
Tuple,
Type,
@@ -36,13 +39,18 @@
)
if typing.TYPE_CHECKING:
+ from . import (
+ ui as uimod,
+ )
+
_Tbackgroundfilecloser = TypeVar(
'_Tbackgroundfilecloser', bound='backgroundfilecloser'
)
_Tclosewrapbase = TypeVar('_Tclosewrapbase', bound='closewrapbase')
+ _OnErrorFn = Callable[[Exception], Optional[object]]
-def _avoidambig(path: bytes, oldstat) -> None:
+def _avoidambig(path: bytes, oldstat: util.filestat) -> None:
"""Avoid file stat ambiguity forcibly
This function causes copying ``path`` file, if it is owned by
@@ -78,7 +86,7 @@
...
@abc.abstractmethod
- def _auditpath(self, path: bytes, mode: bytes) -> Any:
+ def _auditpath(self, path: bytes, mode: bytes) -> None:
...
@abc.abstractmethod
@@ -93,7 +101,7 @@
pass
return b""
- def tryreadlines(self, path: bytes, mode: bytes = b'rb') -> Any:
+ def tryreadlines(self, path: bytes, mode: bytes = b'rb') -> List[bytes]:
'''gracefully return an empty array for missing files'''
try:
return self.readlines(path, mode=mode)
@@ -115,12 +123,12 @@
with self(path, b'rb') as fp:
return fp.read()
- def readlines(self, path: bytes, mode: bytes = b'rb') -> Any:
+ def readlines(self, path: bytes, mode: bytes = b'rb') -> List[bytes]:
with self(path, mode=mode) as fp:
return fp.readlines()
def write(
- self, path: bytes, data: bytes, backgroundclose=False, **kwargs
+ self, path: bytes, data: bytes, backgroundclose: bool = False, **kwargs
) -> int:
with self(path, b'wb', backgroundclose=backgroundclose, **kwargs) as fp:
return fp.write(data)
@@ -130,7 +138,7 @@
path: bytes,
data: Iterable[bytes],
mode: bytes = b'wb',
- notindexed=False,
+ notindexed: bool = False,
) -> None:
with self(path, mode=mode, notindexed=notindexed) as fp:
return fp.writelines(data)
@@ -157,7 +165,7 @@
def exists(self, path: Optional[bytes] = None) -> bool:
return os.path.exists(self.join(path))
- def fstat(self, fp) -> os.stat_result:
+ def fstat(self, fp: BinaryIO) -> os.stat_result:
return util.fstat(fp)
def isdir(self, path: Optional[bytes] = None) -> bool:
@@ -249,7 +257,7 @@
) -> None:
return util.makedirs(self.join(path), mode)
- def makelock(self, info, path: bytes) -> None:
+ def makelock(self, info: bytes, path: bytes) -> None:
return util.makelock(info, self.join(path))
def mkdir(self, path: Optional[bytes] = None) -> None:
@@ -270,6 +278,12 @@
else:
return fd, fname
+ # TODO: This doesn't match osutil.listdir(). stat=False in pure;
+ # non-optional bool in cext. 'skip' is bool if we trust cext, or bytes
+ # going by how pure uses it. Also, cext returns a custom stat structure.
+ # from cext.osutil.pyi:
+ #
+ # path: bytes, st: bool, skip: Optional[bool]
def readdir(
self, path: Optional[bytes] = None, stat=None, skip=None
) -> Any:
@@ -278,7 +292,7 @@
def readlock(self, path: bytes) -> bytes:
return util.readlock(self.join(path))
- def rename(self, src: bytes, dst: bytes, checkambig=False) -> None:
+ def rename(self, src: bytes, dst: bytes, checkambig: bool = False) -> None:
"""Rename from src to dst
checkambig argument is used with util.filestat, and is useful
@@ -312,7 +326,10 @@
return os.rmdir(self.join(path))
def rmtree(
- self, path: Optional[bytes] = None, ignore_errors=False, forcibly=False
+ self,
+ path: Optional[bytes] = None,
+ ignore_errors: bool = False,
+ forcibly: bool = False,
) -> None:
"""Remove a directory tree recursively
@@ -320,7 +337,7 @@
"""
if forcibly:
- def onexc(function, path, excinfo):
+ def onexc(function, path: bytes, excinfo):
if function is not os.remove:
raise
# read-only files cannot be unlinked under Windows
@@ -357,17 +374,23 @@
return util.tryunlink(self.join(path))
def unlinkpath(
- self, path: Optional[bytes] = None, ignoremissing=False, rmdir=True
+ self,
+ path: Optional[bytes] = None,
+ ignoremissing: bool = False,
+ rmdir: bool = True,
) -> None:
return util.unlinkpath(
self.join(path), ignoremissing=ignoremissing, rmdir=rmdir
)
- def utime(self, path: Optional[bytes] = None, t=None) -> None:
+ # TODO: could be Tuple[float, float] too.
+ def utime(
+ self, path: Optional[bytes] = None, t: Optional[Tuple[int, int]] = None
+ ) -> None:
return os.utime(self.join(path), t)
def walk(
- self, path: Optional[bytes] = None, onerror=None
+ self, path: Optional[bytes] = None, onerror: Optional[_OnErrorFn] = None
) -> Iterator[Tuple[bytes, List[bytes], List[bytes]]]:
"""Yield (dirpath, dirs, files) tuple for each directory under path
@@ -386,7 +409,7 @@
@contextlib.contextmanager
def backgroundclosing(
- self, ui, expectedcount=-1
+ self, ui: uimod.ui, expectedcount: int = -1
) -> Iterator[Optional[backgroundfilecloser]]:
"""Allow files to be closed asynchronously.
@@ -417,7 +440,7 @@
None # pytype: disable=attribute-error
)
- def register_file(self, path) -> None:
+ def register_file(self, path: bytes) -> None:
"""generic hook point to lets fncache steer its stew"""
@@ -432,13 +455,15 @@
See pathutil.pathauditor() for details.
"""
+ createmode: Optional[int]
+
def __init__(
self,
base: bytes,
- audit=True,
- cacheaudited=False,
- expandpath=False,
- realpath=False,
+ audit: bool = True,
+ cacheaudited: bool = False,
+ expandpath: bool = False,
+ realpath: bool = False,
) -> None:
if expandpath:
base = util.expandpath(base)
@@ -459,15 +484,15 @@
return util.checklink(self.base)
@util.propertycache
- def _chmod(self):
+ def _chmod(self) -> bool:
return util.checkexec(self.base)
- def _fixfilemode(self, name) -> None:
+ def _fixfilemode(self, name: bytes) -> None:
if self.createmode is None or not self._chmod:
return
os.chmod(name, self.createmode & 0o666)
- def _auditpath(self, path, mode) -> None:
+ def _auditpath(self, path: bytes, mode: bytes) -> None:
if self._audit:
if os.path.isabs(path) and path.startswith(self.base):
path = os.path.relpath(path, self.base)
@@ -477,7 +502,9 @@
self.audit(path, mode=mode)
def isfileorlink_checkdir(
- self, dircache, path: Optional[bytes] = None
+ self,
+ dircache: MutableMapping[bytes, bool],
+ path: Optional[bytes] = None,
) -> bool:
"""return True if the path is a regular file or a symlink and
the directories along the path are "normal", that is
@@ -486,6 +513,8 @@
Ignores the `_audit` setting, and checks the directories regardless.
`dircache` is used to cache the directory checks.
"""
+ # TODO: Should be a None check on 'path', or shouldn't default to None
+ # because of the immediate call to util.localpath().
try:
for prefix in pathutil.finddirs_rev_noroot(util.localpath(path)):
if prefix in dircache:
@@ -505,13 +534,13 @@
self,
path: bytes,
mode: bytes = b"rb",
- atomictemp=False,
- notindexed=False,
- backgroundclose=False,
- checkambig=False,
- auditpath=True,
- makeparentdirs=True,
- ) -> Any:
+ atomictemp: bool = False,
+ notindexed: bool = False,
+ backgroundclose: bool = False,
+ checkambig: bool = False,
+ auditpath: bool = True,
+ makeparentdirs: bool = True,
+ ) -> Any: # TODO: should be BinaryIO if util.atomictempfile can be coersed
"""Open ``path`` file, which is relative to vfs root.
By default, parent directories are created as needed. Newly created
@@ -650,14 +679,14 @@
class proxyvfs(abstractvfs, abc.ABC):
- def __init__(self, vfs: "vfs"):
+ def __init__(self, vfs: vfs) -> None:
self.vfs = vfs
@property
- def createmode(self):
+ def createmode(self) -> Optional[int]:
return self.vfs.createmode
- def _auditpath(self, path, mode) -> None:
+ def _auditpath(self, path: bytes, mode: bytes) -> None:
return self.vfs._auditpath(path, mode)
@property
@@ -676,10 +705,11 @@
class filtervfs(proxyvfs, abstractvfs):
'''Wrapper vfs for filtering filenames with a function.'''
- def __init__(self, vfs: "vfs", filter):
+ def __init__(self, vfs: vfs, filter) -> None:
proxyvfs.__init__(self, vfs)
self._filter = filter
+ # TODO: The return type should be BinaryIO
def __call__(self, path: bytes, *args, **kwargs) -> Any:
return self.vfs(self._filter(path), *args, **kwargs)
@@ -696,9 +726,10 @@
class readonlyvfs(proxyvfs):
'''Wrapper vfs preventing any writing.'''
- def __init__(self, vfs: "vfs"):
+ def __init__(self, vfs: vfs) -> None:
proxyvfs.__init__(self, vfs)
+ # TODO: The return type should be BinaryIO
def __call__(self, path: bytes, mode: bytes = b'rb', *args, **kw) -> Any:
if mode not in (b'r', b'rb'):
raise error.Abort(_(b'this vfs is read only'))
@@ -717,13 +748,13 @@
def __init__(self, fh) -> None:
object.__setattr__(self, '_origfh', fh)
- def __getattr__(self, attr) -> Any:
+ def __getattr__(self, attr: str) -> Any:
return getattr(self._origfh, attr)
- def __setattr__(self, attr, value) -> None:
+ def __setattr__(self, attr: str, value: Any) -> None:
return setattr(self._origfh, attr, value)
- def __delattr__(self, attr) -> None:
+ def __delattr__(self, attr: str) -> None:
return delattr(self._origfh, attr)
def __enter__(self: _Tclosewrapbase) -> _Tclosewrapbase:
@@ -759,7 +790,7 @@
class backgroundfilecloser:
"""Coordinates background closing of file handles on multiple threads."""
- def __init__(self, ui, expectedcount=-1) -> None:
+ def __init__(self, ui: uimod.ui, expectedcount: int = -1) -> None:
self._running = False
self._entered = False
self._threads = []