--- a/mercurial/vfs.py Fri Nov 04 17:35:44 2022 -0400
+++ b/mercurial/vfs.py Wed Nov 02 17:30:57 2022 -0400
@@ -11,6 +11,10 @@
import stat
import threading
+from typing import (
+ Optional,
+)
+
from .i18n import _
from .pycompat import (
delattr,
@@ -26,7 +30,7 @@
)
-def _avoidambig(path, oldstat):
+def _avoidambig(path: bytes, oldstat):
"""Avoid file stat ambiguity forcibly
This function causes copying ``path`` file, if it is owned by
@@ -60,16 +64,17 @@
'''Prevent instantiation; don't call this from subclasses.'''
raise NotImplementedError('attempted instantiating ' + str(type(self)))
- def __call__(self, path, mode=b'rb', **kwargs):
+ # TODO: type return, which is util.posixfile wrapped by a proxy
+ def __call__(self, path: bytes, mode: bytes = b'rb', **kwargs):
raise NotImplementedError
- def _auditpath(self, path, mode):
+ def _auditpath(self, path: bytes, mode: bytes):
raise NotImplementedError
- def join(self, path, *insidef):
+ def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
raise NotImplementedError
- def tryread(self, path):
+ def tryread(self, path: bytes) -> bytes:
'''gracefully return an empty string for missing files'''
try:
return self.read(path)
@@ -77,7 +82,7 @@
pass
return b""
- def tryreadlines(self, path, mode=b'rb'):
+ def tryreadlines(self, path: bytes, mode: bytes = b'rb'):
'''gracefully return an empty array for missing files'''
try:
return self.readlines(path, mode=mode)
@@ -95,57 +100,61 @@
"""
return self.__call__
- def read(self, path):
+ def read(self, path: bytes) -> bytes:
with self(path, b'rb') as fp:
return fp.read()
- def readlines(self, path, mode=b'rb'):
+ def readlines(self, path: bytes, mode: bytes = b'rb'):
with self(path, mode=mode) as fp:
return fp.readlines()
- def write(self, path, data, backgroundclose=False, **kwargs):
+ def write(
+ self, path: bytes, data: bytes, backgroundclose=False, **kwargs
+ ) -> int:
with self(path, b'wb', backgroundclose=backgroundclose, **kwargs) as fp:
return fp.write(data)
- def writelines(self, path, data, mode=b'wb', notindexed=False):
+ def writelines(
+ self, path: bytes, data: bytes, mode: bytes = b'wb', notindexed=False
+ ) -> None:
with self(path, mode=mode, notindexed=notindexed) as fp:
return fp.writelines(data)
- def append(self, path, data):
+ def append(self, path: bytes, data: bytes) -> int:
with self(path, b'ab') as fp:
return fp.write(data)
- def basename(self, path):
+ def basename(self, path: bytes) -> bytes:
"""return base element of a path (as os.path.basename would do)
This exists to allow handling of strange encoding if needed."""
return os.path.basename(path)
- def chmod(self, path, mode):
+ def chmod(self, path: bytes, mode: int) -> None:
return os.chmod(self.join(path), mode)
- def dirname(self, path):
+ def dirname(self, path: bytes) -> bytes:
"""return dirname element of a path (as os.path.dirname would do)
This exists to allow handling of strange encoding if needed."""
return os.path.dirname(path)
- def exists(self, path=None):
+ def exists(self, path: Optional[bytes] = None) -> bool:
return os.path.exists(self.join(path))
def fstat(self, fp):
return util.fstat(fp)
- def isdir(self, path=None):
+ def isdir(self, path: Optional[bytes] = None) -> bool:
return os.path.isdir(self.join(path))
- def isfile(self, path=None):
+ def isfile(self, path: Optional[bytes] = None) -> bool:
return os.path.isfile(self.join(path))
- def islink(self, path=None):
+ def islink(self, path: Optional[bytes] = None) -> bool:
return os.path.islink(self.join(path))
- def isfileorlink(self, path=None):
+ def isfileorlink(self, path: Optional[bytes] = None) -> bool:
"""return whether path is a regular file or a symlink
Unlike isfile, this doesn't follow symlinks."""
@@ -156,7 +165,7 @@
mode = st.st_mode
return stat.S_ISREG(mode) or stat.S_ISLNK(mode)
- def _join(self, *paths):
+ def _join(self, *paths: bytes) -> bytes:
root_idx = 0
for idx, p in enumerate(paths):
if os.path.isabs(p) or p.startswith(self._dir_sep):
@@ -166,41 +175,48 @@
paths = [p for p in paths if p]
return self._dir_sep.join(paths)
- def reljoin(self, *paths):
+ def reljoin(self, *paths: bytes) -> bytes:
"""join various elements of a path together (as os.path.join would do)
The vfs base is not injected so that path stay relative. This exists
to allow handling of strange encoding if needed."""
return self._join(*paths)
- def split(self, path):
+ def split(self, path: bytes):
"""split top-most element of a path (as os.path.split would do)
This exists to allow handling of strange encoding if needed."""
return os.path.split(path)
- def lexists(self, path=None):
+ def lexists(self, path: Optional[bytes] = None) -> bool:
return os.path.lexists(self.join(path))
- def lstat(self, path=None):
+ def lstat(self, path: Optional[bytes] = None):
return os.lstat(self.join(path))
- def listdir(self, path=None):
+ def listdir(self, path: Optional[bytes] = None):
return os.listdir(self.join(path))
- def makedir(self, path=None, notindexed=True):
+ def makedir(self, path: Optional[bytes] = None, notindexed=True):
return util.makedir(self.join(path), notindexed)
- def makedirs(self, path=None, mode=None):
+ def makedirs(
+ self, path: Optional[bytes] = None, mode: Optional[int] = None
+ ):
return util.makedirs(self.join(path), mode)
- def makelock(self, info, path):
+ def makelock(self, info, path: bytes):
return util.makelock(info, self.join(path))
- def mkdir(self, path=None):
+ def mkdir(self, path: Optional[bytes] = None):
return os.mkdir(self.join(path))
- def mkstemp(self, suffix=b'', prefix=b'tmp', dir=None):
+ def mkstemp(
+ self,
+ suffix: bytes = b'',
+ prefix: bytes = b'tmp',
+ dir: Optional[bytes] = None,
+ ):
fd, name = pycompat.mkstemp(
suffix=suffix, prefix=prefix, dir=self.join(dir)
)
@@ -210,13 +226,13 @@
else:
return fd, fname
- def readdir(self, path=None, stat=None, skip=None):
+ def readdir(self, path: Optional[bytes] = None, stat=None, skip=None):
return util.listdir(self.join(path), stat, skip)
- def readlock(self, path):
+ def readlock(self, path: bytes) -> bytes:
return util.readlock(self.join(path))
- def rename(self, src, dst, checkambig=False):
+ def rename(self, src: bytes, dst: bytes, checkambig=False):
"""Rename from src to dst
checkambig argument is used with util.filestat, and is useful
@@ -238,18 +254,20 @@
return ret
return util.rename(srcpath, dstpath)
- def readlink(self, path):
+ def readlink(self, path: bytes) -> bytes:
return util.readlink(self.join(path))
- def removedirs(self, path=None):
+ def removedirs(self, path: Optional[bytes] = None):
"""Remove a leaf directory and all empty intermediate ones"""
return util.removedirs(self.join(path))
- def rmdir(self, path=None):
+ def rmdir(self, path: Optional[bytes] = None):
"""Remove an empty directory."""
return os.rmdir(self.join(path))
- def rmtree(self, path=None, ignore_errors=False, forcibly=False):
+ def rmtree(
+ self, path: Optional[bytes] = None, ignore_errors=False, forcibly=False
+ ):
"""Remove a directory tree recursively
If ``forcibly``, this tries to remove READ-ONLY files, too.
@@ -272,28 +290,30 @@
self.join(path), ignore_errors=ignore_errors, onerror=onerror
)
- def setflags(self, path, l, x):
+ def setflags(self, path: bytes, l: bool, x: bool):
return util.setflags(self.join(path), l, x)
- def stat(self, path=None):
+ def stat(self, path: Optional[bytes] = None):
return os.stat(self.join(path))
- def unlink(self, path=None):
+ def unlink(self, path: Optional[bytes] = None):
return util.unlink(self.join(path))
- def tryunlink(self, path=None):
+ def tryunlink(self, path: Optional[bytes] = None):
"""Attempt to remove a file, ignoring missing file errors."""
util.tryunlink(self.join(path))
- def unlinkpath(self, path=None, ignoremissing=False, rmdir=True):
+ def unlinkpath(
+ self, path: Optional[bytes] = None, ignoremissing=False, rmdir=True
+ ):
return util.unlinkpath(
self.join(path), ignoremissing=ignoremissing, rmdir=rmdir
)
- def utime(self, path=None, t=None):
+ def utime(self, path: Optional[bytes] = None, t=None):
return os.utime(self.join(path), t)
- def walk(self, path=None, onerror=None):
+ def walk(self, path: Optional[bytes] = None, onerror=None):
"""Yield (dirpath, dirs, files) tuple for each directories under path
``dirpath`` is relative one from the root of this vfs. This
@@ -360,7 +380,7 @@
def __init__(
self,
- base,
+ base: bytes,
audit=True,
cacheaudited=False,
expandpath=False,
@@ -381,7 +401,7 @@
self.options = {}
@util.propertycache
- def _cansymlink(self):
+ def _cansymlink(self) -> bool:
return util.checklink(self.base)
@util.propertycache
@@ -393,7 +413,7 @@
return
os.chmod(name, self.createmode & 0o666)
- def _auditpath(self, path, mode):
+ def _auditpath(self, path, mode) -> None:
if self._audit:
if os.path.isabs(path) and path.startswith(self.base):
path = os.path.relpath(path, self.base)
@@ -404,8 +424,8 @@
def __call__(
self,
- path,
- mode=b"r",
+ path: bytes,
+ mode: bytes = b"r",
atomictemp=False,
notindexed=False,
backgroundclose=False,
@@ -518,7 +538,7 @@
return fp
- def symlink(self, src, dst):
+ def symlink(self, src: bytes, dst: bytes) -> None:
self.audit(dst)
linkname = self.join(dst)
util.tryunlink(linkname)
@@ -538,7 +558,7 @@
else:
self.write(dst, src)
- def join(self, path, *insidef):
+ def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
if path:
parts = [self.base, path]
parts.extend(insidef)
@@ -551,7 +571,7 @@
class proxyvfs(abstractvfs):
- def __init__(self, vfs):
+ def __init__(self, vfs: "vfs"):
self.vfs = vfs
def _auditpath(self, path, mode):
@@ -569,14 +589,14 @@
class filtervfs(proxyvfs, abstractvfs):
'''Wrapper vfs for filtering filenames with a function.'''
- def __init__(self, vfs, filter):
+ def __init__(self, vfs: "vfs", filter):
proxyvfs.__init__(self, vfs)
self._filter = filter
- def __call__(self, path, *args, **kwargs):
+ def __call__(self, path: bytes, *args, **kwargs):
return self.vfs(self._filter(path), *args, **kwargs)
- def join(self, path, *insidef):
+ def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
if path:
return self.vfs.join(self._filter(self.vfs.reljoin(path, *insidef)))
else:
@@ -589,15 +609,15 @@
class readonlyvfs(proxyvfs):
'''Wrapper vfs preventing any writing.'''
- def __init__(self, vfs):
+ def __init__(self, vfs: "vfs"):
proxyvfs.__init__(self, vfs)
- def __call__(self, path, mode=b'r', *args, **kw):
+ def __call__(self, path: bytes, mode: bytes = b'r', *args, **kw):
if mode not in (b'r', b'rb'):
raise error.Abort(_(b'this vfs is read only'))
return self.vfs(path, mode, *args, **kw)
- def join(self, path, *insidef):
+ def join(self, path: Optional[bytes], *insidef: bytes) -> bytes:
return self.vfs.join(path, *insidef)