changeset 49812:58dff81ffba1

typing: add type hints to the common posix/windows platform functions These are done in sync because some platforms have empty implementations, and it isn't obvious what the types should be without examining the other. We want the types aligned, so @overload definitions that differ aren't generated. The only differences here are the few methods that unconditionally raise an error are marked as `NoReturn`, which doesn't seem to bother pytype. A couple of the posix module functions needed to be updated with a modern ternary operator, because pytype seems to want to use the type of the second object in the old `return x and y` style.
author Matt Harbison <matt_harbison@yahoo.com>
date Fri, 16 Dec 2022 00:54:39 -0500
parents 0a91aba258e0
children 54114bba7c7e
files mercurial/posix.py mercurial/windows.py
diffstat 2 files changed, 97 insertions(+), 64 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/posix.py	Thu Dec 15 21:13:11 2022 -0500
+++ b/mercurial/posix.py	Fri Dec 16 00:54:39 2022 -0500
@@ -20,9 +20,13 @@
 import unicodedata
 
 from typing import (
+    Iterable,
+    Iterator,
     List,
     NoReturn,
     Optional,
+    Sequence,
+    Union,
 )
 
 from .i18n import _
@@ -91,7 +95,7 @@
     return ht[0] + b'/', ht[1]
 
 
-def openhardlinks():
+def openhardlinks() -> bool:
     '''return true if it is safe to hold open file handles to hardlinks'''
     return True
 
@@ -101,7 +105,7 @@
     return os.lstat(name).st_nlink
 
 
-def parsepatchoutput(output_line):
+def parsepatchoutput(output_line: bytes) -> bytes:
     """parses the output produced by patch and returns the filename"""
     pf = output_line[14:]
     if pycompat.sysplatform == b'OpenVMS':
@@ -113,7 +117,9 @@
     return pf
 
 
-def sshargs(sshcmd, host, user, port):
+def sshargs(
+    sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes]
+) -> bytes:
     '''Build argument list for ssh'''
     args = user and (b"%s@%s" % (user, host)) or host
     if b'-' in args[:1]:
@@ -126,12 +132,12 @@
     return args
 
 
-def isexec(f):
+def isexec(f: bytes) -> bool:
     """check whether a file is executable"""
     return os.lstat(f).st_mode & 0o100 != 0
 
 
-def setflags(f, l, x):
+def setflags(f: bytes, l: bool, x: bool) -> None:
     st = os.lstat(f)
     s = st.st_mode
     if l:
@@ -175,7 +181,12 @@
         os.chmod(f, s & 0o666)
 
 
-def copymode(src, dst, mode=None, enforcewritable=False):
+def copymode(
+    src: bytes,
+    dst: bytes,
+    mode: Optional[bytes] = None,
+    enforcewritable: bool = False,
+) -> None:
     """Copy the file mode from the file at path src to dst.
     If src doesn't exist, we're using mode instead. If mode is None, we're
     using umask."""
@@ -195,7 +206,7 @@
     os.chmod(dst, new_mode)
 
 
-def checkexec(path):
+def checkexec(path: bytes) -> bool:
     """
     Check whether the given path is on a filesystem with UNIX-like exec flags
 
@@ -275,7 +286,7 @@
         return False
 
 
-def checklink(path):
+def checklink(path: bytes) -> bool:
     """check whether the given path is on a symlink-capable filesystem"""
     # mktemp is not racy because symlink creation will fail if the
     # file already exists
@@ -362,19 +373,19 @@
     return getattr(osutil, 'getfstype', lambda x: None)(dirpath)
 
 
-def get_password():
+def get_password() -> bytes:
     return encoding.strtolocal(getpass.getpass(''))
 
 
-def setbinary(fd):
+def setbinary(fd) -> None:
     pass
 
 
-def pconvert(path):
+def pconvert(path: bytes) -> bytes:
     return path
 
 
-def localpath(path):
+def localpath(path: bytes) -> bytes:
     return path
 
 
@@ -393,7 +404,7 @@
 
 
 # os.path.normcase is a no-op, which doesn't help us on non-native filesystems
-def normcase(path):
+def normcase(path: bytes) -> bytes:
     return path.lower()
 
 
@@ -404,7 +415,7 @@
 
 if pycompat.isdarwin:
 
-    def normcase(path):
+    def normcase(path: bytes) -> bytes:
         """
         Normalize a filename for OS X-compatible comparison:
         - escape-encode invalid characters
@@ -429,7 +440,7 @@
 
     normcasespec = encoding.normcasespecs.lower
 
-    def normcasefallback(path):
+    def normcasefallback(path: bytes) -> bytes:
         try:
             u = path.decode('utf-8')
         except UnicodeDecodeError:
@@ -470,7 +481,7 @@
     )
 
     # use upper-ing as normcase as same as NTFS workaround
-    def normcase(path):
+    def normcase(path: bytes) -> bytes:
         pathlen = len(path)
         if (pathlen == 0) or (path[0] != pycompat.ossep):
             # treat as relative
@@ -496,20 +507,20 @@
     # but these translations are not supported by native
     # tools, so the exec bit tends to be set erroneously.
     # Therefore, disable executable bit access on Cygwin.
-    def checkexec(path):
+    def checkexec(path: bytes) -> bool:
         return False
 
     # Similarly, Cygwin's symlink emulation is likely to create
     # problems when Mercurial is used from both Cygwin and native
     # Windows, with other native tools, or on shared volumes
-    def checklink(path):
+    def checklink(path: bytes) -> bool:
         return False
 
 
 _needsshellquote = None
 
 
-def shellquote(s):
+def shellquote(s: bytes) -> bytes:
     if pycompat.sysplatform == b'OpenVMS':
         return b'"%s"' % s
     global _needsshellquote
@@ -522,7 +533,7 @@
         return b"'%s'" % s.replace(b"'", b"'\\''")
 
 
-def shellsplit(s):
+def shellsplit(s: bytes) -> List[bytes]:
     """Parse a command string in POSIX shell way (best-effort)"""
     return pycompat.shlexsplit(s, posix=True)
 
@@ -538,12 +549,12 @@
         return inst.errno != errno.ESRCH
 
 
-def isowner(st):
+def isowner(st: os.stat_result) -> bool:
     """Return True if the stat object st is from the current user."""
     return st.st_uid == os.getuid()
 
 
-def findexe(command):
+def findexe(command: bytes) -> Optional[bytes]:
     """Find executable for command searching like which does.
     If command is a basename then PATH is searched for command.
     PATH isn't searched if command is an absolute or relative path.
@@ -551,7 +562,7 @@
     if pycompat.sysplatform == b'OpenVMS':
         return command
 
-    def findexisting(executable):
+    def findexisting(executable: bytes) -> Optional[bytes]:
         b'Will return executable if existing file'
         if os.path.isfile(executable) and os.access(executable, os.X_OK):
             return executable
@@ -577,7 +588,7 @@
 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK}
 
 
-def statfiles(files):
+def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]:
     """Stat each file in files. Yield each stat, or None if a file does not
     exist or has a type we don't care about."""
     lstat = os.lstat
@@ -597,7 +608,7 @@
     return pycompat.fsencode(getpass.getuser())
 
 
-def username(uid=None):
+def username(uid: Optional[int] = None) -> Optional[bytes]:
     """Return the name of the user with the given uid.
 
     If uid is None, return the name of the current user."""
@@ -610,7 +621,7 @@
         return b'%d' % uid
 
 
-def groupname(gid=None):
+def groupname(gid: Optional[int] = None) -> Optional[bytes]:
     """Return the name of the group with the given gid.
 
     If gid is None, return the name of the current group."""
@@ -623,7 +634,7 @@
         return pycompat.bytestr(gid)
 
 
-def groupmembers(name):
+def groupmembers(name: bytes) -> List[bytes]:
     """Return the list of members of the group with the given
     name, KeyError if the group does not exist.
     """
@@ -643,7 +654,11 @@
     os.mkdir(path)
 
 
-def lookupreg(key, name=None, scope=None):
+def lookupreg(
+    key: bytes,
+    name: Optional[bytes] = None,
+    scope: Optional[Union[int, Iterable[int]]] = None,
+) -> Optional[bytes]:
     return None
 
 
@@ -690,14 +705,14 @@
         return not self == other
 
 
-def statislink(st):
+def statislink(st: Optional[os.stat_result]) -> bool:
     '''check whether a stat result is a symlink'''
-    return st and stat.S_ISLNK(st.st_mode)
+    return stat.S_ISLNK(st.st_mode) if st else False
 
 
-def statisexec(st):
+def statisexec(st: Optional[os.stat_result]) -> bool:
     '''check whether a stat result is an executable file'''
-    return st and (st.st_mode & 0o100 != 0)
+    return (st.st_mode & 0o100 != 0) if st else False
 
 
 def poll(fds):
@@ -714,7 +729,7 @@
     return sorted(list(set(sum(res, []))))
 
 
-def readpipe(pipe):
+def readpipe(pipe) -> bytes:
     """Read all available data from a pipe."""
     # We can't fstat() a pipe because Linux will always report 0.
     # So, we set the pipe to non-blocking mode and read everything
@@ -739,7 +754,7 @@
         fcntl.fcntl(pipe, fcntl.F_SETFL, oldflags)
 
 
-def bindunixsocket(sock, path):
+def bindunixsocket(sock, path: bytes) -> None:
     """Bind the UNIX domain socket to the specified path"""
     # use relative path instead of full path at bind() if possible, since
     # AF_UNIX path has very small length limit (107 chars) on common
--- a/mercurial/windows.py	Thu Dec 15 21:13:11 2022 -0500
+++ b/mercurial/windows.py	Fri Dec 16 00:54:39 2022 -0500
@@ -18,6 +18,13 @@
 
 from typing import (
     BinaryIO,
+    Iterable,
+    Iterator,
+    List,
+    NoReturn,
+    Optional,
+    Sequence,
+    Union,
 )
 
 from .i18n import _
@@ -183,7 +190,7 @@
 listdir = osutil.listdir
 
 
-def get_password():
+def get_password() -> bytes:
     """Prompt for password with echo off, using Windows getch().
 
     This shouldn't be called directly- use ``ui.getpass()`` instead, which
@@ -244,11 +251,11 @@
             raise IOError(errno.EPIPE, 'Broken pipe')
 
 
-def openhardlinks():
+def openhardlinks() -> bool:
     return True
 
 
-def parsepatchoutput(output_line):
+def parsepatchoutput(output_line: bytes) -> bytes:
     """parses the output produced by patch and returns the filename"""
     pf = output_line[14:]
     if pf[0] == b'`':
@@ -256,7 +263,9 @@
     return pf
 
 
-def sshargs(sshcmd, host, user, port):
+def sshargs(
+    sshcmd: bytes, host: bytes, user: Optional[bytes], port: Optional[bytes]
+) -> bytes:
     '''Build argument list for ssh or Plink'''
     pflag = b'plink' in sshcmd.lower() and b'-P' or b'-p'
     args = user and (b"%s@%s" % (user, host)) or host
@@ -271,23 +280,28 @@
     return args
 
 
-def setflags(f, l, x):
-    pass
-
-
-def copymode(src, dst, mode=None, enforcewritable=False):
+def setflags(f: bytes, l: bool, x: bool) -> None:
     pass
 
 
-def checkexec(path):
+def copymode(
+    src: bytes,
+    dst: bytes,
+    mode: Optional[bytes] = None,
+    enforcewritable: bool = False,
+) -> None:
+    pass
+
+
+def checkexec(path: bytes) -> bool:
     return False
 
 
-def checklink(path):
+def checklink(path: bytes) -> bool:
     return False
 
 
-def setbinary(fd):
+def setbinary(fd) -> None:
     # When run without console, pipes may expose invalid
     # fileno(), usually set to -1.
     fno = getattr(fd, 'fileno', None)
@@ -295,11 +309,11 @@
         msvcrt.setmode(fno(), os.O_BINARY)  # pytype: disable=module-attr
 
 
-def pconvert(path):
+def pconvert(path: bytes) -> bytes:
     return path.replace(pycompat.ossep, b'/')
 
 
-def localpath(path):
+def localpath(path: bytes) -> bytes:
     return path.replace(b'/', b'\\')
 
 
@@ -307,7 +321,7 @@
     return pconvert(os.path.normpath(path))
 
 
-def normcase(path):
+def normcase(path: bytes) -> bytes:
     return encoding.upper(path)  # NTFS compares via upper()
 
 
@@ -468,7 +482,7 @@
 _needsshellquote = None
 
 
-def shellquote(s):
+def shellquote(s: bytes) -> bytes:
     r"""
     >>> shellquote(br'C:\Users\xyz')
     '"C:\\Users\\xyz"'
@@ -504,18 +518,18 @@
     return s
 
 
-def shellsplit(s):
+def shellsplit(s: bytes) -> List[bytes]:
     """Parse a command string in cmd.exe way (best-effort)"""
     return pycompat.maplist(_unquote, pycompat.shlexsplit(s, posix=False))
 
 
 # if you change this stub into a real check, please try to implement the
 # username and groupname functions above, too.
-def isowner(st):
+def isowner(st: os.stat_result) -> bool:
     return True
 
 
-def findexe(command):
+def findexe(command: bytes) -> Optional[bytes]:
     """Find executable for command searching like cmd.exe does.
     If command is a basename then PATH is searched for command.
     PATH isn't searched if command is an absolute or relative path.
@@ -526,7 +540,7 @@
     if os.path.splitext(command)[1].lower() in pathexts:
         pathexts = [b'']
 
-    def findexisting(pathcommand):
+    def findexisting(pathcommand: bytes) -> Optional[bytes]:
         """Will append extension (if needed) and return existing file"""
         for ext in pathexts:
             executable = pathcommand + ext
@@ -547,7 +561,7 @@
 _wantedkinds = {stat.S_IFREG, stat.S_IFLNK}
 
 
-def statfiles(files):
+def statfiles(files: Sequence[bytes]) -> Iterator[Optional[os.stat_result]]:
     """Stat each file in files. Yield each stat, or None if a file
     does not exist or has a type we don't care about.
 
@@ -573,7 +587,7 @@
         yield cache.get(base, None)
 
 
-def username(uid=None):
+def username(uid: Optional[int] = None) -> Optional[bytes]:
     """Return the name of the user with the given uid.
 
     If uid is None, return the name of the current user."""
@@ -588,7 +602,7 @@
     return None
 
 
-def groupname(gid=None):
+def groupname(gid: Optional[int] = None) -> Optional[bytes]:
     """Return the name of the group with the given gid.
 
     If gid is None, return the name of the current group."""
@@ -640,12 +654,12 @@
     return [encoding.strtolocal(arg) for arg in [sys.executable] + sys.argv[:1]]
 
 
-def groupmembers(name):
+def groupmembers(name: bytes) -> List[bytes]:
     # Don't support groups on Windows for now
     raise KeyError
 
 
-def isexec(f):
+def isexec(f: bytes) -> bool:
     return False
 
 
@@ -657,7 +671,11 @@
         return False
 
 
-def lookupreg(key, valname=None, scope=None):
+def lookupreg(
+    key: bytes,
+    valname: Optional[bytes] = None,
+    scope: Optional[Union[int, Iterable[int]]] = None,
+) -> Optional[bytes]:
     """Look up a key/value name in the Windows registry.
 
     valname: value name. If unspecified, the default value for the key
@@ -693,12 +711,12 @@
 expandglobs = True
 
 
-def statislink(st):
+def statislink(st: Optional[os.stat_result]) -> bool:
     '''check whether a stat result is a symlink'''
     return False
 
 
-def statisexec(st):
+def statisexec(st: Optional[os.stat_result]) -> bool:
     '''check whether a stat result is an executable file'''
     return False
 
@@ -708,7 +726,7 @@
     raise NotImplementedError()
 
 
-def readpipe(pipe):
+def readpipe(pipe) -> bytes:
     """Read all available data from a pipe."""
     chunks = []
     while True:
@@ -724,5 +742,5 @@
     return b''.join(chunks)
 
 
-def bindunixsocket(sock, path):
+def bindunixsocket(sock, path: bytes) -> NoReturn:
     raise NotImplementedError('unsupported platform')