--- a/mercurial/vfs.py Fri Sep 20 01:10:17 2024 -0400
+++ b/mercurial/vfs.py Fri Sep 20 13:38:13 2024 -0400
@@ -13,9 +13,17 @@
import shutil
import stat
import threading
+import typing
from typing import (
+ Any,
+ Iterable,
+ Iterator,
+ List,
Optional,
+ Tuple,
+ Type,
+ TypeVar,
)
from .i18n import _
@@ -27,8 +35,14 @@
util,
)
+if typing.TYPE_CHECKING:
+ _Tbackgroundfilecloser = TypeVar(
+ '_Tbackgroundfilecloser', bound='backgroundfilecloser'
+ )
+ _Tclosewrapbase = TypeVar('_Tclosewrapbase', bound='closewrapbase')
-def _avoidambig(path: bytes, oldstat):
+
+def _avoidambig(path: bytes, oldstat) -> None:
"""Avoid file stat ambiguity forcibly
This function causes copying ``path`` file, if it is owned by
@@ -56,15 +70,15 @@
# abstract the use of `/` and make it work transparently. For consistency
# vfs will always use `/` when joining. This avoids some confusion in
# encoded vfs (see issue6546)
- _dir_sep = b'/'
+ _dir_sep: bytes = b'/'
# TODO: type return, which is util.posixfile wrapped by a proxy
@abc.abstractmethod
- def __call__(self, path: bytes, mode: bytes = b'rb', **kwargs):
+ def __call__(self, path: bytes, mode: bytes = b'rb', **kwargs) -> Any:
...
@abc.abstractmethod
- def _auditpath(self, path: bytes, mode: bytes):
+ def _auditpath(self, path: bytes, mode: bytes) -> Any:
...
@abc.abstractmethod
@@ -79,7 +93,7 @@
pass
return b""
- def tryreadlines(self, path: bytes, mode: bytes = b'rb'):
+ def tryreadlines(self, path: bytes, mode: bytes = b'rb') -> Any:
'''gracefully return an empty array for missing files'''
try:
return self.readlines(path, mode=mode)
@@ -101,7 +115,7 @@
with self(path, b'rb') as fp:
return fp.read()
- def readlines(self, path: bytes, mode: bytes = b'rb'):
+ def readlines(self, path: bytes, mode: bytes = b'rb') -> Any:
with self(path, mode=mode) as fp:
return fp.readlines()
@@ -139,7 +153,7 @@
def exists(self, path: Optional[bytes] = None) -> bool:
return os.path.exists(self.join(path))
- def fstat(self, fp):
+ def fstat(self, fp) -> os.stat_result:
return util.fstat(fp)
def isdir(self, path: Optional[bytes] = None) -> bool:
@@ -179,7 +193,7 @@
to allow handling of strange encoding if needed."""
return self._join(*paths)
- def split(self, path: bytes):
+ def split(self, path: bytes) -> Tuple[bytes, bytes]:
"""split top-most element of a path (as os.path.split would do)
This exists to allow handling of strange encoding if needed."""
@@ -188,7 +202,7 @@
def lexists(self, path: Optional[bytes] = None) -> bool:
return os.path.lexists(self.join(path))
- def lstat(self, path: Optional[bytes] = None):
+ def lstat(self, path: Optional[bytes] = None) -> os.stat_result:
return os.lstat(self.join(path))
def is_mmap_safe(self, path: Optional[bytes] = None) -> bool:
@@ -220,21 +234,21 @@
fstype = util.getfstype(self.join(path))
return fstype is not None and fstype != b'nfs'
- def listdir(self, path: Optional[bytes] = None):
+ def listdir(self, path: Optional[bytes] = None) -> List[bytes]:
return os.listdir(self.join(path))
- def makedir(self, path: Optional[bytes] = None, notindexed=True):
+ def makedir(self, path: Optional[bytes] = None, notindexed=True) -> None:
return util.makedir(self.join(path), notindexed)
def makedirs(
self, path: Optional[bytes] = None, mode: Optional[int] = None
- ):
+ ) -> None:
return util.makedirs(self.join(path), mode)
- def makelock(self, info, path: bytes):
+ def makelock(self, info, path: bytes) -> None:
return util.makelock(info, self.join(path))
- def mkdir(self, path: Optional[bytes] = None):
+ def mkdir(self, path: Optional[bytes] = None) -> None:
return os.mkdir(self.join(path))
def mkstemp(
@@ -242,7 +256,7 @@
suffix: bytes = b'',
prefix: bytes = b'tmp',
dir: Optional[bytes] = None,
- ):
+ ) -> Tuple[int, bytes]:
fd, name = pycompat.mkstemp(
suffix=suffix, prefix=prefix, dir=self.join(dir)
)
@@ -252,13 +266,15 @@
else:
return fd, fname
- def readdir(self, path: Optional[bytes] = None, stat=None, skip=None):
+ def readdir(
+ self, path: Optional[bytes] = None, stat=None, skip=None
+ ) -> Any:
return util.listdir(self.join(path), stat, skip)
def readlock(self, path: bytes) -> bytes:
return util.readlock(self.join(path))
- def rename(self, src: bytes, dst: bytes, checkambig=False):
+ def rename(self, src: bytes, dst: bytes, checkambig=False) -> None:
"""Rename from src to dst
checkambig argument is used with util.filestat, and is useful
@@ -283,11 +299,11 @@
def readlink(self, path: bytes) -> bytes:
return util.readlink(self.join(path))
- def removedirs(self, path: Optional[bytes] = None):
+ def removedirs(self, path: Optional[bytes] = None) -> None:
"""Remove a leaf directory and all empty intermediate ones"""
return util.removedirs(self.join(path))
- def rmdir(self, path: Optional[bytes] = None):
+ def rmdir(self, path: Optional[bytes] = None) -> None:
"""Remove an empty directory."""
return os.rmdir(self.join(path))
@@ -323,27 +339,27 @@
self.join(path), ignore_errors=ignore_errors, onerror=onexc
)
- def setflags(self, path: bytes, l: bool, x: bool):
+ def setflags(self, path: bytes, l: bool, x: bool) -> None:
return util.setflags(self.join(path), l, x)
- def stat(self, path: Optional[bytes] = None):
+ def stat(self, path: Optional[bytes] = None) -> os.stat_result:
return os.stat(self.join(path))
- def unlink(self, path: Optional[bytes] = None):
+ def unlink(self, path: Optional[bytes] = None) -> None:
return util.unlink(self.join(path))
- def tryunlink(self, path: Optional[bytes] = None):
+ def tryunlink(self, path: Optional[bytes] = None) -> bool:
"""Attempt to remove a file, ignoring missing file errors."""
return util.tryunlink(self.join(path))
def unlinkpath(
self, path: Optional[bytes] = None, ignoremissing=False, rmdir=True
- ):
+ ) -> None:
return util.unlinkpath(
self.join(path), ignoremissing=ignoremissing, rmdir=rmdir
)
- def utime(self, path: Optional[bytes] = None, t=None):
+ def utime(self, path: Optional[bytes] = None, t=None) -> None:
return os.utime(self.join(path), t)
def walk(self, path: Optional[bytes] = None, onerror=None):
@@ -393,7 +409,7 @@
None # pytype: disable=attribute-error
)
- def register_file(self, path):
+ def register_file(self, path) -> None:
"""generic hook point to lets fncache steer its stew"""
@@ -415,7 +431,7 @@
cacheaudited=False,
expandpath=False,
realpath=False,
- ):
+ ) -> None:
if expandpath:
base = util.expandpath(base)
if realpath:
@@ -438,7 +454,7 @@
def _chmod(self):
return util.checkexec(self.base)
- def _fixfilemode(self, name):
+ def _fixfilemode(self, name) -> None:
if self.createmode is None or not self._chmod:
return
os.chmod(name, self.createmode & 0o666)
@@ -487,7 +503,7 @@
checkambig=False,
auditpath=True,
makeparentdirs=True,
- ):
+ ) -> Any:
"""Open ``path`` file, which is relative to vfs root.
By default, parent directories are created as needed. Newly created
@@ -622,7 +638,7 @@
return self.base
-opener = vfs
+opener: Type[vfs] = vfs
class proxyvfs(abstractvfs, abc.ABC):
@@ -633,7 +649,7 @@
def createmode(self):
return self.vfs.createmode
- def _auditpath(self, path, mode):
+ def _auditpath(self, path, mode) -> None:
return self.vfs._auditpath(path, mode)
@property
@@ -656,7 +672,7 @@
proxyvfs.__init__(self, vfs)
self._filter = filter
- def __call__(self, path: bytes, *args, **kwargs):
+ def __call__(self, path: bytes, *args, **kwargs) -> Any:
return self.vfs(self._filter(path), *args, **kwargs)
def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
@@ -666,7 +682,7 @@
return self.vfs.join(path)
-filteropener = filtervfs
+filteropener: Type[filtervfs] = filtervfs
class readonlyvfs(proxyvfs):
@@ -675,7 +691,7 @@
def __init__(self, vfs: "vfs"):
proxyvfs.__init__(self, vfs)
- def __call__(self, path: bytes, mode: bytes = b'rb', *args, **kw):
+ def __call__(self, path: bytes, mode: bytes = b'rb', *args, **kw) -> Any:
if mode not in (b'r', b'rb'):
raise error.Abort(_(b'this vfs is read only'))
return self.vfs(path, mode, *args, **kw)
@@ -690,19 +706,19 @@
Do not instantiate outside the vfs layer.
"""
- def __init__(self, fh):
+ def __init__(self, fh) -> None:
object.__setattr__(self, '_origfh', fh)
- def __getattr__(self, attr):
+ def __getattr__(self, attr) -> Any:
return getattr(self._origfh, attr)
- def __setattr__(self, attr, value):
+ def __setattr__(self, attr, value) -> None:
return setattr(self._origfh, attr, value)
- def __delattr__(self, attr):
+ def __delattr__(self, attr) -> None:
return delattr(self._origfh, attr)
- def __enter__(self):
+ def __enter__(self: _Tclosewrapbase) -> _Tclosewrapbase:
self._origfh.__enter__()
return self
@@ -721,21 +737,21 @@
Do not instantiate outside the vfs layer.
"""
- def __init__(self, fh, closer):
+ def __init__(self, fh, closer) -> None:
super(delayclosedfile, self).__init__(fh)
object.__setattr__(self, '_closer', closer)
def __exit__(self, exc_type, exc_value, exc_tb):
self._closer.close(self._origfh)
- def close(self):
+ def close(self) -> None:
self._closer.close(self._origfh)
class backgroundfilecloser:
"""Coordinates background closing of file handles on multiple threads."""
- def __init__(self, ui, expectedcount=-1):
+ def __init__(self, ui, expectedcount=-1) -> None:
self._running = False
self._entered = False
self._threads = []
@@ -773,11 +789,11 @@
self._threads.append(t)
t.start()
- def __enter__(self):
+ def __enter__(self: _Tbackgroundfilecloser) -> _Tbackgroundfilecloser:
self._entered = True
return self
- def __exit__(self, exc_type, exc_value, exc_tb):
+ def __exit__(self, exc_type, exc_value, exc_tb) -> None: # TODO
self._running = False
# Wait for threads to finish closing so open files don't linger for
@@ -785,7 +801,7 @@
for t in self._threads:
t.join()
- def _worker(self):
+ def _worker(self) -> None:
"""Main routine for worker thread."""
while True:
try:
@@ -801,7 +817,7 @@
if not self._running:
break
- def close(self, fh):
+ def close(self, fh) -> None:
"""Schedule a file for closing."""
if not self._entered:
raise error.Abort(
@@ -835,19 +851,19 @@
Do not instantiate outside the vfs layer.
"""
- def __init__(self, fh):
+ def __init__(self, fh) -> None:
super(checkambigatclosing, self).__init__(fh)
object.__setattr__(self, '_oldstat', util.filestat.frompath(fh.name))
- def _checkambig(self):
+ def _checkambig(self) -> None:
oldstat = self._oldstat
if oldstat.stat:
_avoidambig(self._origfh.name, oldstat)
- def __exit__(self, exc_type, exc_value, exc_tb):
+ def __exit__(self, exc_type, exc_value, exc_tb) -> None: # TODO
self._origfh.__exit__(exc_type, exc_value, exc_tb)
self._checkambig()
- def close(self):
+ def close(self) -> None:
self._origfh.close()
self._checkambig()