--- a/mercurial/util.py Sat Mar 06 18:51:33 2021 -0500
+++ b/mercurial/util.py Sat Mar 06 23:41:32 2021 -0500
@@ -59,6 +59,16 @@
stringutil,
)
+if pycompat.TYPE_CHECKING:
+ from typing import (
+ Iterator,
+ List,
+ Optional,
+ Tuple,
+ Union,
+ )
+
+
base85 = policy.importmod('base85')
osutil = policy.importmod('osutil')
@@ -133,6 +143,7 @@
def setumask(val):
+ # type: (int) -> None
''' updates the umask. used by chg server '''
if pycompat.iswindows:
return
@@ -307,7 +318,7 @@
try:
- buffer = buffer
+ buffer = buffer # pytype: disable=name-error
except NameError:
def buffer(sliceable, offset=0, length=None):
@@ -1833,6 +1844,7 @@
def pathto(root, n1, n2):
+ # type: (bytes, bytes, bytes) -> bytes
"""return the relative path from one place to another.
root should use os.sep to separate directories
n1 should use os.sep to separate directories
@@ -2017,6 +2029,7 @@
def checkwinfilename(path):
+ # type: (bytes) -> Optional[bytes]
r"""Check that the base-relative path is a valid filename on Windows.
Returns None if the path is ok, or a UI string describing the problem.
@@ -2111,6 +2124,7 @@
def readlock(pathname):
+ # type: (bytes) -> bytes
try:
return readlink(pathname)
except OSError as why:
@@ -2134,6 +2148,7 @@
def fscasesensitive(path):
+ # type: (bytes) -> bool
"""
Return true if the given path is on a case-sensitive filesystem
@@ -2215,6 +2230,7 @@
def fspath(name, root):
+ # type: (bytes, bytes) -> bytes
"""Get name in the case stored in the filesystem
The name should be relative to root, and be normcase-ed for efficiency.
@@ -2259,6 +2275,7 @@
def checknlink(testfile):
+ # type: (bytes) -> bool
'''check whether hardlink count reporting works properly'''
# testfile may be open, so we need a separate file for checking to
@@ -2292,8 +2309,9 @@
def endswithsep(path):
+ # type: (bytes) -> bool
'''Check path ends with os.sep or os.altsep.'''
- return (
+ return bool( # help pytype
path.endswith(pycompat.ossep)
or pycompat.osaltsep
and path.endswith(pycompat.osaltsep)
@@ -2301,6 +2319,7 @@
def splitpath(path):
+ # type: (bytes) -> List[bytes]
"""Split path by os.sep.
Note that this function does not use os.altsep because this is
an alternative of simple "xxx.split(os.sep)".
@@ -2529,6 +2548,7 @@
def unlinkpath(f, ignoremissing=False, rmdir=True):
+ # type: (bytes, bool, bool) -> None
"""unlink and remove the directory if it is empty"""
if ignoremissing:
tryunlink(f)
@@ -2543,6 +2563,7 @@
def tryunlink(f):
+ # type: (bytes) -> None
"""Attempt to remove a file, ignoring ENOENT errors."""
try:
unlink(f)
@@ -2552,6 +2573,7 @@
def makedirs(name, mode=None, notindexed=False):
+ # type: (bytes, Optional[int], bool) -> None
"""recursive directory creation with parent mode inheritance
Newly created directories are marked as "not to be indexed by
@@ -2581,16 +2603,19 @@
def readfile(path):
+ # type: (bytes) -> bytes
with open(path, b'rb') as fp:
return fp.read()
def writefile(path, text):
+ # type: (bytes, bytes) -> None
with open(path, b'wb') as fp:
fp.write(text)
def appendfile(path, text):
+ # type: (bytes, bytes) -> None
with open(path, b'ab') as fp:
fp.write(text)
@@ -2752,6 +2777,7 @@
def processlinerange(fromline, toline):
+ # type: (int, int) -> Tuple[int, int]
"""Check that linerange <fromline>:<toline> makes sense and return a
0-based range.
@@ -2811,10 +2837,12 @@
def tolf(s):
+ # type: (bytes) -> bytes
return _eolre.sub(b'\n', s)
def tocrlf(s):
+ # type: (bytes) -> bytes
return _eolre.sub(b'\r\n', s)
@@ -2878,12 +2906,14 @@
def iterlines(iterator):
+ # type: (Iterator[bytes]) -> Iterator[bytes]
for chunk in iterator:
for line in chunk.splitlines():
yield line
def expandpath(path):
+ # type: (bytes) -> bytes
return os.path.expanduser(os.path.expandvars(path))
@@ -2914,6 +2944,7 @@
def getport(port):
+ # type: (Union[bytes, int]) -> int
"""Return the port for a given network service.
If port is an integer, it's returned as is. If it's a string, it's
@@ -3012,6 +3043,7 @@
_matchscheme = remod.compile(b'^[a-zA-Z0-9+.\\-]+:').match
def __init__(self, path, parsequery=True, parsefragment=True):
+ # type: (bytes, bool, bool) -> None
# We slowly chomp away at path until we have only the path left
self.scheme = self.user = self.passwd = self.host = None
self.port = self.path = self.query = self.fragment = None
@@ -3239,6 +3271,7 @@
return False
def localpath(self):
+ # type: () -> bytes
if self.scheme == b'file' or self.scheme == b'bundle':
path = self.path or b'/'
# For Windows, we need to promote hosts containing drive
@@ -3262,18 +3295,22 @@
def hasscheme(path):
- return bool(url(path).scheme)
+ # type: (bytes) -> bool
+ return bool(url(path).scheme) # cast to help pytype
def hasdriveletter(path):
- return path and path[1:2] == b':' and path[0:1].isalpha()
+ # type: (bytes) -> bool
+ return bool(path) and path[1:2] == b':' and path[0:1].isalpha()
def urllocalpath(path):
+ # type: (bytes) -> bytes
return url(path, parsequery=False, parsefragment=False).localpath()
def checksafessh(path):
+ # type: (bytes) -> None
"""check if a path / url is a potentially unsafe ssh exploit (SEC)
This is a sanity check for ssh urls. ssh will parse the first item as
@@ -3291,6 +3328,7 @@
def hidepassword(u):
+ # type: (bytes) -> bytes
'''hide user credential in a url string'''
u = url(u)
if u.passwd:
@@ -3299,6 +3337,7 @@
def removeauth(u):
+ # type: (bytes) -> bytes
'''remove all authentication information from a url string'''
u = url(u)
u.user = u.passwd = None
@@ -3404,6 +3443,7 @@
def sizetoint(s):
+ # type: (bytes) -> int
"""Convert a space specifier to a byte count.
>>> sizetoint(b'30')
@@ -3629,6 +3669,7 @@
def _estimatememory():
+ # type: () -> Optional[int]
"""Provide an estimate for the available system memory in Bytes.
If no estimate can be provided on the platform, returns None.
@@ -3636,7 +3677,12 @@
if pycompat.sysplatform.startswith(b'win'):
# On Windows, use the GlobalMemoryStatusEx kernel function directly.
from ctypes import c_long as DWORD, c_ulonglong as DWORDLONG
- from ctypes.wintypes import Structure, byref, sizeof, windll
+ from ctypes.wintypes import ( # pytype: disable=import-error
+ Structure,
+ byref,
+ sizeof,
+ windll,
+ )
class MEMORYSTATUSEX(Structure):
_fields_ = [