pytype: move some type comment to proper annotation
We support direct type annotations now, while pytype is starting to complains
about them.
--- a/mercurial/branchmap.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/branchmap.py Tue Dec 19 21:29:34 2023 +0100
@@ -196,15 +196,16 @@
def __init__(
self,
- repo,
- entries=(),
- tipnode=None,
- tiprev=nullrev,
- filteredhash=None,
- closednodes=None,
- hasnode=None,
- ):
- # type: (localrepo.localrepository, Union[Dict[bytes, List[bytes]], Iterable[Tuple[bytes, List[bytes]]]], bytes, int, Optional[bytes], Optional[Set[bytes]], Optional[Callable[[bytes], bool]]) -> None
+ repo: "localrepo.localrepository",
+ entries: Union[
+ Dict[bytes, List[bytes]], Iterable[Tuple[bytes, List[bytes]]]
+ ] = (),
+ tipnode: Optional[bytes] = None,
+ tiprev: Optional[int] = nullrev,
+ filteredhash: Optional[bytes] = None,
+ closednodes: Optional[Set[bytes]] = None,
+ hasnode: Optional[Callable[[bytes], bool]] = None,
+ ) -> None:
"""hasnode is a function which can be used to verify whether changelog
has a given node or not. If it's not provided, we assume that every node
we have exists in changelog"""
--- a/mercurial/cmdutil.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/cmdutil.py Tue Dec 19 21:29:34 2023 +0100
@@ -4118,8 +4118,10 @@
return 0
-def readgraftstate(repo, graftstate):
- # type: (Any, statemod.cmdstate) -> Dict[bytes, Any]
+def readgraftstate(
+ repo: Any,
+ graftstate: statemod.cmdstate,
+) -> Dict[bytes, Any]:
"""read the graft state file and return a dict of the data stored in it"""
try:
return graftstate.read()
--- a/mercurial/encoding.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/encoding.py Tue Dec 19 21:29:34 2023 +0100
@@ -59,8 +59,7 @@
assert all(i.startswith((b"\xe2", b"\xef")) for i in _ignore)
-def hfsignoreclean(s):
- # type: (bytes) -> bytes
+def hfsignoreclean(s: bytes) -> bytes:
"""Remove codepoints ignored by HFS+ from s.
>>> hfsignoreclean(u'.h\u200cg'.encode('utf-8'))
@@ -133,8 +132,7 @@
if typing.TYPE_CHECKING:
# pseudo implementation to help pytype see localstr() constructor
- def __init__(self, u, l):
- # type: (bytes, bytes) -> None
+ def __init__(self, u: bytes, l: bytes) -> None:
super(localstr, self).__init__(l)
self._utf8 = u
@@ -153,8 +151,7 @@
"""
-def tolocal(s):
- # type: (bytes) -> bytes
+def tolocal(s: bytes) -> bytes:
"""
Convert a string from internal UTF-8 to local encoding
@@ -222,8 +219,7 @@
)
-def fromlocal(s):
- # type: (bytes) -> bytes
+def fromlocal(s: bytes) -> bytes:
"""
Convert a string from the local character encoding to UTF-8
@@ -254,20 +250,17 @@
)
-def unitolocal(u):
- # type: (Text) -> bytes
+def unitolocal(u: str) -> bytes:
"""Convert a unicode string to a byte string of local encoding"""
return tolocal(u.encode('utf-8'))
-def unifromlocal(s):
- # type: (bytes) -> Text
+def unifromlocal(s: bytes) -> str:
"""Convert a byte string of local encoding to a unicode string"""
return fromlocal(s).decode('utf-8')
-def unimethod(bytesfunc):
- # type: (Callable[[Any], bytes]) -> Callable[[Any], Text]
+def unimethod(bytesfunc: Callable[[Any], bytes]) -> Callable[[Any], str]:
"""Create a proxy method that forwards __unicode__() and __str__() of
Python 3 to __bytes__()"""
@@ -285,8 +278,7 @@
strmethod = unimethod
-def lower(s):
- # type: (bytes) -> bytes
+def lower(s: bytes) -> bytes:
"""best-effort encoding-aware case-folding of local string s"""
try:
return asciilower(s)
@@ -310,8 +302,7 @@
)
-def upper(s):
- # type: (bytes) -> bytes
+def upper(s: bytes) -> bytes:
"""best-effort encoding-aware case-folding of local string s"""
try:
return asciiupper(s)
@@ -319,8 +310,7 @@
return upperfallback(s)
-def upperfallback(s):
- # type: (Any) -> Any
+def upperfallback(s: Any) -> Any:
try:
if isinstance(s, localstr):
u = s._utf8.decode("utf-8")
@@ -395,14 +385,12 @@
)
-def colwidth(s):
- # type: (bytes) -> int
+def colwidth(s: bytes) -> int:
"""Find the column width of a string for display in the local encoding"""
return ucolwidth(s.decode(_sysstr(encoding), 'replace'))
-def ucolwidth(d):
- # type: (Text) -> int
+def ucolwidth(d: Text) -> int:
"""Find the column width of a Unicode string for display"""
eaw = getattr(unicodedata, 'east_asian_width', None)
if eaw is not None:
@@ -410,8 +398,7 @@
return len(d)
-def getcols(s, start, c):
- # type: (bytes, int, int) -> bytes
+def getcols(s: bytes, start: int, c: int) -> bytes:
"""Use colwidth to find a c-column substring of s starting at byte
index start"""
for x in range(start + c, len(s)):
@@ -421,8 +408,12 @@
raise ValueError('substring not found')
-def trim(s, width, ellipsis=b'', leftside=False):
- # type: (bytes, int, bytes, bool) -> bytes
+def trim(
+ s: bytes,
+ width: int,
+ ellipsis: bytes = b'',
+ leftside: bool = False,
+) -> bytes:
"""Trim string 's' to at most 'width' columns (including 'ellipsis').
If 'leftside' is True, left side of string 's' is trimmed.
@@ -540,8 +531,7 @@
other = 0
-def jsonescape(s, paranoid=False):
- # type: (Any, Any) -> Any
+def jsonescape(s: Any, paranoid: Any = False) -> Any:
"""returns a string suitable for JSON
JSON is problematic for us because it doesn't support non-Unicode
@@ -601,8 +591,7 @@
_utf8len = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 3, 4]
-def getutf8char(s, pos):
- # type: (bytes, int) -> bytes
+def getutf8char(s: bytes, pos: int) -> bytes:
"""get the next full utf-8 character in the given string, starting at pos
Raises a UnicodeError if the given location does not start a valid
@@ -620,8 +609,7 @@
return c
-def toutf8b(s):
- # type: (bytes) -> bytes
+def toutf8b(s: bytes) -> bytes:
"""convert a local, possibly-binary string into UTF-8b
This is intended as a generic method to preserve data when working
@@ -689,8 +677,7 @@
return bytes(r)
-def fromutf8b(s):
- # type: (bytes) -> bytes
+def fromutf8b(s: bytes) -> bytes:
"""Given a UTF-8b string, return a local, possibly-binary string.
return the original binary string. This
--- a/mercurial/error.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/error.py Tue Dec 19 21:29:34 2023 +0100
@@ -40,8 +40,7 @@
]
-def _tobytes(exc):
- # type: (...) -> bytes
+def _tobytes(exc) -> bytes:
"""Byte-stringify exception in the same way as BaseException_str()"""
if not exc.args:
return b''
@@ -68,8 +67,7 @@
coarse_exit_code = None
detailed_exit_code = None
- def __init__(self, message, hint=None):
- # type: (bytes, Optional[bytes]) -> None
+ def __init__(self, message: bytes, hint: Optional[bytes] = None) -> None:
self.message = message
self.hint = hint
# Pass the message into the Exception constructor to help extensions
@@ -79,15 +77,13 @@
def __bytes__(self):
return self.message
- def __str__(self):
- # type: () -> str
+ def __str__(self) -> str:
# the output would be unreadable if the message was translated,
# but do not replace it with encoding.strfromlocal(), which
# may raise another exception.
return pycompat.sysstr(self.__bytes__())
- def format(self):
- # type: () -> bytes
+ def format(self) -> bytes:
from .i18n import _
message = _(b"abort: %s\n") % self.message
@@ -114,8 +110,7 @@
class SidedataHashError(RevlogError):
- def __init__(self, key, expected, got):
- # type: (int, bytes, bytes) -> None
+ def __init__(self, key: int, expected: bytes, got: bytes) -> None:
self.hint = None
self.sidedatakey = key
self.expecteddigest = expected
@@ -127,8 +122,7 @@
class LookupError(RevlogError, KeyError):
- def __init__(self, name, index, message):
- # type: (bytes, bytes, bytes) -> None
+ def __init__(self, name: bytes, index: bytes, message: bytes) -> None:
self.name = name
self.index = index
# this can't be called 'message' because at least some installs of
@@ -165,8 +159,7 @@
class CommandError(Exception):
"""Exception raised on errors in parsing the command line."""
- def __init__(self, command, message):
- # type: (Optional[bytes], bytes) -> None
+ def __init__(self, command: Optional[bytes], message: bytes) -> None:
self.command = command
self.message = message
super(CommandError, self).__init__()
@@ -177,8 +170,11 @@
class UnknownCommand(Exception):
"""Exception raised if command is not in the command table."""
- def __init__(self, command, all_commands=None):
- # type: (bytes, Optional[List[bytes]]) -> None
+ def __init__(
+ self,
+ command: bytes,
+ all_commands: Optional[List[bytes]] = None,
+ ) -> None:
self.command = command
self.all_commands = all_commands
super(UnknownCommand, self).__init__()
@@ -189,8 +185,7 @@
class AmbiguousCommand(Exception):
"""Exception raised if command shortcut matches more than one command."""
- def __init__(self, prefix, matches):
- # type: (bytes, List[bytes]) -> None
+ def __init__(self, prefix: bytes, matches: List[bytes]) -> None:
self.prefix = prefix
self.matches = matches
super(AmbiguousCommand, self).__init__()
@@ -201,8 +196,7 @@
class WorkerError(Exception):
"""Exception raised when a worker process dies."""
- def __init__(self, status_code):
- # type: (int) -> None
+ def __init__(self, status_code: int) -> None:
self.status_code = status_code
# Pass status code to superclass just so it becomes part of __bytes__
super(WorkerError, self).__init__(status_code)
@@ -216,8 +210,7 @@
coarse_exit_code = 1
detailed_exit_code = 240
- def format(self):
- # type: () -> bytes
+ def format(self) -> bytes:
from .i18n import _
message = _(b"%s\n") % self.message
@@ -229,8 +222,7 @@
class ConflictResolutionRequired(InterventionRequired):
"""Exception raised when a continuable command required merge conflict resolution."""
- def __init__(self, opname):
- # type: (bytes) -> None
+ def __init__(self, opname: bytes) -> None:
from .i18n import _
self.opname = opname
@@ -299,13 +291,16 @@
detailed_exit_code = 30
- def __init__(self, message, location=None, hint=None):
- # type: (bytes, Optional[bytes], Optional[bytes]) -> None
+ def __init__(
+ self,
+ message: bytes,
+ location: Optional[bytes] = None,
+ hint: Optional[bytes] = None,
+ ) -> None:
super(ConfigError, self).__init__(message, hint=hint)
self.location = location
- def format(self):
- # type: () -> bytes
+ def format(self) -> bytes:
from .i18n import _
if self.location is not None:
@@ -354,8 +349,11 @@
class OutOfBandError(RemoteError):
"""Exception raised when a remote repo reports failure"""
- def __init__(self, message=None, hint=None):
- # type: (Optional[bytes], Optional[bytes]) -> None
+ def __init__(
+ self,
+ message: Optional[bytes] = None,
+ hint: Optional[bytes] = None,
+ ):
from .i18n import _
if message:
@@ -371,13 +369,16 @@
detailed_exit_code = 10
- def __init__(self, message, location=None, hint=None):
- # type: (bytes, Optional[Union[bytes, int]], Optional[bytes]) -> None
+ def __init__(
+ self,
+ message: bytes,
+ location: Optional[Union[bytes, int]] = None,
+ hint: Optional[bytes] = None,
+ ):
super(ParseError, self).__init__(message, hint=hint)
self.location = location
- def format(self):
- # type: () -> bytes
+ def format(self) -> bytes:
from .i18n import _
if self.location is not None:
@@ -404,16 +405,14 @@
__bytes__ = _tobytes
-def getsimilar(symbols, value):
- # type: (Iterable[bytes], bytes) -> List[bytes]
+def getsimilar(symbols: Iterable[bytes], value: bytes) -> List[bytes]:
sim = lambda x: difflib.SequenceMatcher(None, value, x).ratio()
# The cutoff for similarity here is pretty arbitrary. It should
# probably be investigated and tweaked.
return [s for s in symbols if sim(s) > 0.6]
-def similarity_hint(similar):
- # type: (List[bytes]) -> Optional[bytes]
+def similarity_hint(similar: List[bytes]) -> Optional[bytes]:
from .i18n import _
if len(similar) == 1:
@@ -428,8 +427,7 @@
class UnknownIdentifier(ParseError):
"""Exception raised when a {rev,file}set references an unknown identifier"""
- def __init__(self, function, symbols):
- # type: (bytes, Iterable[bytes]) -> None
+ def __init__(self, function: bytes, symbols: Iterable[bytes]) -> None:
from .i18n import _
similar = getsimilar(symbols, function)
@@ -463,16 +461,14 @@
class StdioError(IOError):
"""Raised if I/O to stdout or stderr fails"""
- def __init__(self, err):
- # type: (IOError) -> None
+ def __init__(self, err: IOError) -> None:
IOError.__init__(self, err.errno, err.strerror)
# no __bytes__() because error message is derived from the standard IOError
class UnsupportedMergeRecords(Abort):
- def __init__(self, recordtypes):
- # type: (Iterable[bytes]) -> None
+ def __init__(self, recordtypes: Iterable[bytes]) -> None:
from .i18n import _
self.recordtypes = sorted(recordtypes)
@@ -490,15 +486,24 @@
class UnknownVersion(Abort):
"""generic exception for aborting from an encounter with an unknown version"""
- def __init__(self, msg, hint=None, version=None):
- # type: (bytes, Optional[bytes], Optional[bytes]) -> None
+ def __init__(
+ self,
+ msg: bytes,
+ hint: Optional[bytes] = None,
+ version: Optional[bytes] = None,
+ ) -> None:
self.version = version
super(UnknownVersion, self).__init__(msg, hint=hint)
class LockError(IOError):
- def __init__(self, errno, strerror, filename, desc):
- # _type: (int, str, bytes, bytes) -> None
+ def __init__(
+ self,
+ errno: int,
+ strerror: str,
+ filename: bytes,
+ desc: Optional[bytes],
+ ) -> None:
IOError.__init__(self, errno, strerror, filename)
self.desc = desc
@@ -506,8 +511,15 @@
class LockHeld(LockError):
- def __init__(self, errno, filename, desc, locker):
+ def __init__(
+ self,
+ errno: int,
+ filename: bytes,
+ desc: Optional[bytes],
+ locker,
+ ):
LockError.__init__(self, errno, 'Lock held', filename, desc)
+ self.filename: bytes = filename
self.locker = locker
@@ -544,8 +556,7 @@
class ProgrammingError(Hint, RuntimeError):
"""Raised if a mercurial (core or extension) developer made a mistake"""
- def __init__(self, msg, *args, **kwargs):
- # type: (AnyStr, Any, Any) -> None
+ def __init__(self, msg: AnyStr, *args, **kwargs):
# On Python 3, turn the message back into a string since this is
# an internal-only error that won't be printed except in a
# stack traces.
@@ -622,8 +633,7 @@
Also contains the tombstone data substituted for the uncensored data.
"""
- def __init__(self, filename, node, tombstone):
- # type: (bytes, bytes, bytes) -> None
+ def __init__(self, filename: bytes, node: bytes, tombstone: bytes):
from .node import short
StorageError.__init__(self, b'%s:%s' % (filename, short(node)))
@@ -685,7 +695,10 @@
The error is a formatter string and an optional iterable of arguments.
"""
- def __init__(self, message, args=None):
- # type: (bytes, Optional[Sequence[bytes]]) -> None
+ def __init__(
+ self,
+ message: bytes,
+ args: Optional[Sequence[bytes]] = None,
+ ) -> None:
self.message = message
self.messageargs = args
--- a/mercurial/hgweb/webcommands.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/hgweb/webcommands.py Tue Dec 19 21:29:34 2023 +0100
@@ -516,8 +516,7 @@
rev = webcommand(b'rev')(changeset)
-def decodepath(path):
- # type: (bytes) -> bytes
+def decodepath(path: bytes) -> bytes:
"""Hook for mapping a path in the repository to a path in the
working copy.
--- a/mercurial/i18n.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/i18n.py Tue Dec 19 21:29:34 2023 +0100
@@ -71,8 +71,7 @@
_msgcache = {} # encoding: {message: translation}
-def gettext(message):
- # type: (bytes) -> bytes
+def gettext(message: bytes) -> bytes:
"""Translate message.
The message is looked up in the catalog to get a Unicode string,
@@ -123,6 +122,10 @@
if _plain():
- _ = lambda message: message # type: Callable[[bytes], bytes]
+
+ def _(message: bytes) -> bytes:
+ return message
+
+
else:
_ = gettext
--- a/mercurial/logcmdutil.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/logcmdutil.py Tue Dec 19 21:29:34 2023 +0100
@@ -789,8 +789,11 @@
limit = attr.ib(default=None)
-def parseopts(ui, pats, opts):
- # type: (Any, Sequence[bytes], Dict[bytes, Any]) -> walkopts
+def parseopts(
+ ui: Any,
+ pats: Sequence[bytes],
+ opts: Dict[bytes, Any],
+) -> walkopts:
"""Parse log command options into walkopts
The returned walkopts will be passed in to getrevs() or makewalker().
@@ -1080,8 +1083,12 @@
return revs
-def makewalker(repo, wopts):
- # type: (Any, walkopts) -> Tuple[smartset.abstractsmartset, Optional[Callable[[Any], matchmod.basematcher]]]
+def makewalker(
+ repo: Any,
+ wopts: walkopts,
+) -> Tuple[
+ smartset.abstractsmartset, Optional[Callable[[Any], matchmod.basematcher]]
+]:
"""Build (revs, makefilematcher) to scan revision/file history
- revs is the smartset to be traversed.
@@ -1131,8 +1138,10 @@
return revs, filematcher
-def getrevs(repo, wopts):
- # type: (Any, walkopts) -> Tuple[smartset.abstractsmartset, Optional[changesetdiffer]]
+def getrevs(
+ repo: Any,
+ wopts: walkopts,
+) -> Tuple[smartset.abstractsmartset, Optional[changesetdiffer]]:
"""Return (revs, differ) where revs is a smartset
differ is a changesetdiffer with pre-configured file matcher.
--- a/mercurial/mail.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/mail.py Tue Dec 19 21:29:34 2023 +0100
@@ -21,6 +21,7 @@
from typing import (
Any,
List,
+ Optional,
Tuple,
Union,
)
@@ -113,8 +114,7 @@
return new_socket
-def _pyhastls():
- # type: () -> bool
+def _pyhastls() -> bool:
"""Returns true iff Python has TLS support, false otherwise."""
try:
import ssl
@@ -277,8 +277,7 @@
)
-def codec2iana(cs):
- # type: (str) -> str
+def codec2iana(cs: str) -> str:
''' '''
cs = email.charset.Charset(cs).input_charset.lower()
@@ -288,8 +287,11 @@
return cs
-def mimetextpatch(s, subtype='plain', display=False):
- # type: (bytes, str, bool) -> email.message.Message
+def mimetextpatch(
+ s: bytes,
+ subtype: str = 'plain',
+ display: bool = False,
+) -> email.message.Message:
"""Return MIME message suitable for a patch.
Charset will be detected by first trying to decode as us-ascii, then utf-8,
and finally the global encodings. If all those fail, fall back to
@@ -314,8 +316,9 @@
return mimetextqp(s, subtype, "iso-8859-1")
-def mimetextqp(body, subtype, charset):
- # type: (bytes, str, str) -> email.message.Message
+def mimetextqp(
+ body: bytes, subtype: str, charset: str
+) -> email.message.Message:
"""Return MIME message.
Quoted-printable transfer encoding will be used if necessary.
"""
@@ -340,8 +343,7 @@
return msg
-def _charsets(ui):
- # type: (Any) -> List[str]
+def _charsets(ui: Any) -> List[str]:
'''Obtains charsets to send mail parts not containing patches.'''
charsets = [
pycompat.sysstr(cs.lower())
@@ -358,8 +360,7 @@
return [cs for cs in charsets if not cs.endswith('ascii')]
-def _encode(ui, s, charsets):
- # type: (Any, bytes, List[str]) -> Tuple[bytes, str]
+def _encode(ui: Any, s: bytes, charsets: List[str]) -> Tuple[bytes, str]:
"""Returns (converted) string, charset tuple.
Finds out best charset by cycling through sendcharsets in descending
order. Tries both encoding and fallbackencoding for input. Only as
@@ -409,8 +410,12 @@
return s, 'us-ascii'
-def headencode(ui, s, charsets=None, display=False):
- # type: (Any, Union[bytes, str], List[str], bool) -> str
+def headencode(
+ ui: Any,
+ s: Union[bytes, str],
+ charsets: Optional[List[str]] = None,
+ display: bool = False,
+) -> str:
'''Returns RFC-2047 compliant header from given string.'''
if not display:
# split into words?
@@ -419,8 +424,9 @@
return encoding.strfromlocal(s)
-def _addressencode(ui, name, addr, charsets=None):
- # type: (Any, str, str, List[str]) -> str
+def _addressencode(
+ ui: Any, name: str, addr: str, charsets: Optional[List[str]] = None
+) -> str:
addr = encoding.strtolocal(addr)
name = headencode(ui, name, charsets)
try:
@@ -439,8 +445,12 @@
return email.utils.formataddr((name, encoding.strfromlocal(addr)))
-def addressencode(ui, address, charsets=None, display=False):
- # type: (Any, bytes, List[str], bool) -> str
+def addressencode(
+ ui: Any,
+ address: bytes,
+ charsets: Optional[List[str]] = None,
+ display: bool = False,
+) -> str:
'''Turns address into RFC-2047 compliant header.'''
if display or not address:
return encoding.strfromlocal(address or b'')
@@ -448,8 +458,12 @@
return _addressencode(ui, name, addr, charsets)
-def addrlistencode(ui, addrs, charsets=None, display=False):
- # type: (Any, List[bytes], List[str], bool) -> List[str]
+def addrlistencode(
+ ui: Any,
+ addrs: List[bytes],
+ charsets: Optional[List[str]] = None,
+ display: bool = False,
+) -> List[str]:
"""Turns a list of addresses into a list of RFC-2047 compliant headers.
A single element of input list may contain multiple addresses, but output
always has one address per item"""
@@ -468,8 +482,12 @@
return result
-def mimeencode(ui, s, charsets=None, display=False):
- # type: (Any, bytes, List[str], bool) -> email.message.Message
+def mimeencode(
+ ui: Any,
+ s: bytes,
+ charsets: Optional[List[str]] = None,
+ display: bool = False,
+) -> email.message.Message:
"""creates mime text object, encodes it if needed, and sets
charset and transfer-encoding accordingly."""
cs = 'us-ascii'
@@ -481,8 +499,7 @@
Generator = email.generator.BytesGenerator
-def parse(fp):
- # type: (Any) -> email.message.Message
+def parse(fp: Any) -> email.message.Message:
ep = email.parser.Parser()
# disable the "universal newlines" mode, which isn't binary safe.
# I have no idea if ascii/surrogateescape is correct, but that's
@@ -496,14 +513,12 @@
fp.detach()
-def parsebytes(data):
- # type: (bytes) -> email.message.Message
+def parsebytes(data: bytes) -> email.message.Message:
ep = email.parser.BytesParser()
return ep.parsebytes(data)
-def headdecode(s):
- # type: (Union[email.header.Header, bytes]) -> bytes
+def headdecode(s: Union[email.header.Header, bytes]) -> bytes:
'''Decodes RFC-2047 header'''
uparts = []
for part, charset in email.header.decode_header(s):
--- a/mercurial/pathutil.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/pathutil.py Tue Dec 19 21:29:34 2023 +0100
@@ -32,8 +32,7 @@
]
-def _lowerclean(s):
- # type: (bytes) -> bytes
+def _lowerclean(s: bytes) -> bytes:
return encoding.hfsignoreclean(s.lower())
@@ -72,8 +71,7 @@
else:
self.normcase = lambda x: x
- def __call__(self, path, mode=None):
- # type: (bytes, Optional[Any]) -> None
+ def __call__(self, path: bytes, mode: Optional[Any] = None) -> None:
"""Check the relative path.
path may contain a pattern (e.g. foodir/**.txt)"""
@@ -170,8 +168,7 @@
raise error.Abort(msg % (path, pycompat.bytestr(prefix)))
return True
- def check(self, path):
- # type: (bytes) -> bool
+ def check(self, path: bytes) -> bool:
try:
self(path)
return True
@@ -192,8 +189,12 @@
self._cached = False
-def canonpath(root, cwd, myname, auditor=None):
- # type: (bytes, bytes, bytes, Optional[pathauditor]) -> bytes
+def canonpath(
+ root: bytes,
+ cwd: bytes,
+ myname: bytes,
+ auditor: Optional[pathauditor] = None,
+) -> bytes:
"""return the canonical path of myname, given cwd and root
>>> def check(root, cwd, myname):
@@ -295,8 +296,7 @@
)
-def normasprefix(path):
- # type: (bytes) -> bytes
+def normasprefix(path: bytes) -> bytes:
"""normalize the specified path as path prefix
Returned value can be used safely for "p.startswith(prefix)",
@@ -319,8 +319,7 @@
return path
-def finddirs(path):
- # type: (bytes) -> Iterator[bytes]
+def finddirs(path: bytes) -> Iterator[bytes]:
pos = path.rfind(b'/')
while pos != -1:
yield path[:pos]
@@ -355,8 +354,7 @@
for f in map:
addpath(f)
- def addpath(self, path):
- # type: (bytes) -> None
+ def addpath(self, path: bytes) -> None:
dirs = self._dirs
for base in finddirs(path):
if base.endswith(b'/'):
@@ -368,8 +366,7 @@
return
dirs[base] = 1
- def delpath(self, path):
- # type: (bytes) -> None
+ def delpath(self, path: bytes) -> None:
dirs = self._dirs
for base in finddirs(path):
if dirs[base] > 1:
@@ -380,8 +377,7 @@
def __iter__(self):
return iter(self._dirs)
- def __contains__(self, d):
- # type: (bytes) -> bool
+ def __contains__(self, d: bytes) -> bool:
return d in self._dirs
--- a/mercurial/phases.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/phases.py Tue Dec 19 21:29:34 2023 +0100
@@ -192,20 +192,20 @@
no_bundle_phases = all_internal_phases
-def supportinternal(repo):
- # type: (localrepo.localrepository) -> bool
+def supportinternal(repo: "localrepo.localrepository") -> bool:
"""True if the internal phase can be used on a repository"""
return requirements.INTERNAL_PHASE_REQUIREMENT in repo.requirements
-def supportarchived(repo):
- # type: (localrepo.localrepository) -> bool
+def supportarchived(repo: "localrepo.localrepository") -> bool:
"""True if the archived phase can be used on a repository"""
return requirements.ARCHIVED_PHASE_REQUIREMENT in repo.requirements
-def _readroots(repo, phasedefaults=None):
- # type: (localrepo.localrepository, Optional[Phasedefaults]) -> Tuple[Phaseroots, bool]
+def _readroots(
+ repo: "localrepo.localrepository",
+ phasedefaults: Optional["Phasedefaults"] = None,
+) -> Tuple[Phaseroots, bool]:
"""Read phase roots from disk
phasedefaults is a list of fn(repo, roots) callable, which are
@@ -235,8 +235,7 @@
return roots, dirty
-def binaryencode(phasemapping):
- # type: (Dict[int, List[bytes]]) -> bytes
+def binaryencode(phasemapping: Dict[int, List[bytes]]) -> bytes:
"""encode a 'phase -> nodes' mapping into a binary stream
The revision lists are encoded as (phase, root) pairs.
@@ -248,8 +247,7 @@
return b''.join(binarydata)
-def binarydecode(stream):
- # type: (...) -> Dict[int, List[bytes]]
+def binarydecode(stream) -> Dict[int, List[bytes]]:
"""decode a binary stream into a 'phase -> nodes' mapping
The (phase, root) pairs are turned back into a dictionary with
@@ -367,8 +365,12 @@
class phasecache:
- def __init__(self, repo, phasedefaults, _load=True):
- # type: (localrepo.localrepository, Optional[Phasedefaults], bool) -> None
+ def __init__(
+ self,
+ repo: "localrepo.localrepository",
+ phasedefaults: Optional["Phasedefaults"],
+ _load: bool = True,
+ ):
if _load:
# Cheap trick to allow shallow-copy without copy module
self.phaseroots, self.dirty = _readroots(repo, phasedefaults)
@@ -377,8 +379,7 @@
self.filterunknown(repo)
self.opener = repo.svfs
- def hasnonpublicphases(self, repo):
- # type: (localrepo.localrepository) -> bool
+ def hasnonpublicphases(self, repo: "localrepo.localrepository") -> bool:
"""detect if there are revisions with non-public phase"""
repo = repo.unfiltered()
cl = repo.changelog
@@ -389,8 +390,9 @@
revs for phase, revs in self.phaseroots.items() if phase != public
)
- def nonpublicphaseroots(self, repo):
- # type: (localrepo.localrepository) -> Set[bytes]
+ def nonpublicphaseroots(
+ self, repo: "localrepo.localrepository"
+ ) -> Set[bytes]:
"""returns the roots of all non-public phases
The roots are not minimized, so if the secret revisions are
@@ -409,8 +411,12 @@
]
)
- def getrevset(self, repo, phases, subset=None):
- # type: (localrepo.localrepository, Iterable[int], Optional[Any]) -> Any
+ def getrevset(
+ self,
+ repo: "localrepo.localrepository",
+ phases: Iterable[int],
+ subset: Optional[Any] = None,
+ ) -> Any:
# TODO: finish typing this
"""return a smartset for the given phases"""
self.loadphaserevs(repo) # ensure phase's sets are loaded
@@ -506,8 +512,7 @@
self._phasesets[phase] = ps
self._loadedrevslen = len(cl)
- def loadphaserevs(self, repo):
- # type: (localrepo.localrepository) -> None
+ def loadphaserevs(self, repo: "localrepo.localrepository") -> None:
"""ensure phase information is loaded in the object"""
if self._phasesets is None:
try:
@@ -520,8 +525,7 @@
self._loadedrevslen = 0
self._phasesets = None
- def phase(self, repo, rev):
- # type: (localrepo.localrepository, int) -> int
+ def phase(self, repo: "localrepo.localrepository", rev: int) -> int:
# We need a repo argument here to be able to build _phasesets
# if necessary. The repository instance is not stored in
# phasecache to avoid reference cycles. The changelog instance
@@ -708,8 +712,7 @@
return True
return False
- def filterunknown(self, repo):
- # type: (localrepo.localrepository) -> None
+ def filterunknown(self, repo: "localrepo.localrepository") -> None:
"""remove unknown nodes from the phase boundary
Nothing is lost as unknown nodes only hold data for their descendants.
@@ -786,8 +789,7 @@
repo._phasecache.replace(phcache)
-def listphases(repo):
- # type: (localrepo.localrepository) -> Dict[bytes, bytes]
+def listphases(repo: "localrepo.localrepository") -> Dict[bytes, bytes]:
"""List phases root for serialization over pushkey"""
# Use ordered dictionary so behavior is deterministic.
keys = util.sortdict()
@@ -818,8 +820,12 @@
return keys
-def pushphase(repo, nhex, oldphasestr, newphasestr):
- # type: (localrepo.localrepository, bytes, bytes, bytes) -> bool
+def pushphase(
+ repo: "localrepo.localrepository",
+ nhex: bytes,
+ oldphasestr: bytes,
+ newphasestr: bytes,
+) -> bool:
"""List phases root for serialization over pushkey"""
repo = repo.unfiltered()
with repo.lock():
@@ -966,8 +972,7 @@
return pycompat.maplist(cl.node, sorted(new_heads))
-def newcommitphase(ui):
- # type: (uimod.ui) -> int
+def newcommitphase(ui: "uimod.ui") -> int:
"""helper to get the target phase of new commit
Handle all possible values for the phases.new-commit options.
@@ -982,14 +987,16 @@
)
-def hassecret(repo):
- # type: (localrepo.localrepository) -> bool
+def hassecret(repo: "localrepo.localrepository") -> bool:
"""utility function that check if a repo have any secret changeset."""
return bool(repo._phasecache.phaseroots[secret])
-def preparehookargs(node, old, new):
- # type: (bytes, Optional[int], Optional[int]) -> Dict[bytes, bytes]
+def preparehookargs(
+ node: bytes,
+ old: Optional[int],
+ new: Optional[int],
+) -> Dict[bytes, bytes]:
if old is None:
old = b''
else:
--- a/mercurial/pvec.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/pvec.py Tue Dec 19 21:29:34 2023 +0100
@@ -72,8 +72,7 @@
return v
-def _str(v, l):
- # type: (int, int) -> bytes
+def _str(v: int, l: int) -> bytes:
bs = b""
for p in range(l):
bs = pycompat.bytechr(v & 255) + bs
--- a/mercurial/state.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/state.py Tue Dec 19 21:29:34 2023 +0100
@@ -59,8 +59,7 @@
self._repo = repo
self.fname = fname
- def read(self):
- # type: () -> Dict[bytes, Any]
+ def read(self) -> Dict[bytes, Any]:
"""read the existing state file and return a dict of data stored"""
return self._read()
--- a/mercurial/subrepoutil.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/subrepoutil.py Tue Dec 19 21:29:34 2023 +0100
@@ -69,8 +69,7 @@
Substate = Dict[bytes, Tuple[bytes, bytes, bytes]]
-def state(ctx, ui):
- # type: (context.changectx, uimod.ui) -> Substate
+def state(ctx: "context.changectx", ui: "uimod.ui") -> Substate:
"""return a state dict, mapping subrepo paths configured in .hgsub
to tuple: (source from .hgsub, revision from .hgsubstate, kind
(key in types dict))
@@ -122,8 +121,7 @@
except FileNotFoundError:
pass
- def remap(src):
- # type: (bytes) -> bytes
+ def remap(src: bytes) -> bytes:
for pattern, repl in p.items(b'subpaths'):
# Turn r'C:\foo\bar' into r'C:\\foo\\bar' since re.sub
# does a string decode.
@@ -175,8 +173,7 @@
return state
-def writestate(repo, state):
- # type: (localrepo.localrepository, Substate) -> None
+def writestate(repo: "localrepo.localrepository", state: Substate) -> None:
"""rewrite .hgsubstate in (outer) repo with these subrepo states"""
lines = [
b'%s %s\n' % (state[s][1], s)
@@ -186,8 +183,14 @@
repo.wwrite(b'.hgsubstate', b''.join(lines), b'')
-def submerge(repo, wctx, mctx, actx, overwrite, labels=None):
- # type: (localrepo.localrepository, context.workingctx, context.changectx, context.changectx, bool, Optional[Any]) -> Substate
+def submerge(
+ repo: "localrepo.localrepository",
+ wctx: "context.workingctx",
+ mctx: "context.changectx",
+ actx: "context.changectx",
+ overwrite: bool,
+ labels: Optional[Any] = None,
+) -> Substate:
# TODO: type the `labels` arg
"""delegated from merge.applyupdates: merging of .hgsubstate file
in working context, merging context and ancestor context"""
@@ -327,8 +330,13 @@
return sm
-def precommit(ui, wctx, status, match, force=False):
- # type: (uimod.ui, context.workingcommitctx, scmutil.status, matchmod.basematcher, bool) -> Tuple[List[bytes], Set[bytes], Substate]
+def precommit(
+ ui: "uimod.ui",
+ wctx: "context.workingcommitctx",
+ status: "scmutil.status",
+ match: "matchmod.basematcher",
+ force: bool = False,
+) -> Tuple[List[bytes], Set[bytes], Substate]:
"""Calculate .hgsubstate changes that should be applied before committing
Returns (subs, commitsubs, newstate) where
@@ -416,8 +424,7 @@
return posixpath.normpath(path)
-def reporelpath(repo):
- # type: (localrepo.localrepository) -> bytes
+def reporelpath(repo: "localrepo.localrepository") -> bytes:
"""return path to this (sub)repo as seen from outermost repo"""
parent = repo
while hasattr(parent, '_subparent'):
@@ -425,14 +432,16 @@
return repo.root[len(pathutil.normasprefix(parent.root)) :]
-def subrelpath(sub):
- # type: (subrepo.abstractsubrepo) -> bytes
+def subrelpath(sub: "subrepo.abstractsubrepo") -> bytes:
"""return path to this subrepo as seen from outermost repo"""
return sub._relpath
-def _abssource(repo, push=False, abort=True):
- # type: (localrepo.localrepository, bool, bool) -> Optional[bytes]
+def _abssource(
+ repo: "localrepo.localrepository",
+ push: bool = False,
+ abort: bool = True,
+) -> Optional[bytes]:
"""return pull/push path of repo - either based on parent repo .hgsub info
or on the top repo config. Abort or return None if no source found."""
if hasattr(repo, '_subparent'):
@@ -480,8 +489,7 @@
raise error.Abort(_(b"default path for subrepository not found"))
-def newcommitphase(ui, ctx):
- # type: (uimod.ui, context.changectx) -> int
+def newcommitphase(ui: "uimod.ui", ctx: "context.changectx") -> int:
commitphase = phases.newcommitphase(ui)
substate = getattr(ctx, "substate", None)
if not substate:
--- a/mercurial/util.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/util.py Tue Dec 19 21:29:34 2023 +0100
@@ -147,8 +147,7 @@
username = platform.username
-def setumask(val):
- # type: (int) -> None
+def setumask(val: int) -> None:
'''updates the umask. used by chg server'''
if pycompat.iswindows:
return
@@ -1850,8 +1849,7 @@
nogc = lambda x: x
-def pathto(root, n1, n2):
- # type: (bytes, bytes, bytes) -> bytes
+def pathto(root: bytes, n1: bytes, n2: bytes) -> bytes:
"""return the relative path from one place to another.
root should use os.sep to separate directories
n1 should use os.sep to separate directories
@@ -2062,8 +2060,7 @@
_winreservedchars = b':*?"<>|'
-def checkwinfilename(path):
- # type: (bytes) -> Optional[bytes]
+def checkwinfilename(path: bytes) -> Optional[bytes]:
r"""Check that the base-relative path is a valid filename on Windows.
Returns None if the path is ok, or a UI string describing the problem.
@@ -2157,8 +2154,7 @@
os.close(ld)
-def readlock(pathname):
- # type: (bytes) -> bytes
+def readlock(pathname: bytes) -> bytes:
try:
return readlink(pathname)
except OSError as why:
@@ -2181,8 +2177,7 @@
# File system features
-def fscasesensitive(path):
- # type: (bytes) -> bool
+def fscasesensitive(path: bytes) -> bool:
"""
Return true if the given path is on a case-sensitive filesystem
@@ -2286,8 +2281,7 @@
_fspathcache = {}
-def fspath(name, root):
- # type: (bytes, bytes) -> bytes
+def fspath(name: bytes, root: bytes) -> bytes:
"""Get name in the case stored in the filesystem
The name should be relative to root, and be normcase-ed for efficiency.
@@ -2331,8 +2325,7 @@
return b''.join(result)
-def checknlink(testfile):
- # type: (bytes) -> bool
+def checknlink(testfile: bytes) -> bool:
'''check whether hardlink count reporting works properly'''
# testfile may be open, so we need a separate file for checking to
@@ -2365,8 +2358,7 @@
pass
-def endswithsep(path):
- # type: (bytes) -> bool
+def endswithsep(path: bytes) -> bool:
'''Check path ends with os.sep or os.altsep.'''
return bool( # help pytype
path.endswith(pycompat.ossep)
@@ -2375,8 +2367,7 @@
)
-def splitpath(path):
- # type: (bytes) -> List[bytes]
+def splitpath(path: bytes) -> List[bytes]:
"""Split path by os.sep.
Note that this function does not use os.altsep because this is
an alternative of simple "xxx.split(os.sep)".
@@ -2609,8 +2600,9 @@
raise
-def unlinkpath(f, ignoremissing=False, rmdir=True):
- # type: (bytes, bool, bool) -> None
+def unlinkpath(
+ f: bytes, ignoremissing: bool = False, rmdir: bool = True
+) -> None:
"""unlink and remove the directory if it is empty"""
if ignoremissing:
tryunlink(f)
@@ -2624,8 +2616,7 @@
pass
-def tryunlink(f):
- # type: (bytes) -> None
+def tryunlink(f: bytes) -> None:
"""Attempt to remove a file, ignoring FileNotFoundError."""
try:
unlink(f)
@@ -2633,8 +2624,9 @@
pass
-def makedirs(name, mode=None, notindexed=False):
- # type: (bytes, Optional[int], bool) -> None
+def makedirs(
+ name: bytes, mode: Optional[int] = None, notindexed: bool = False
+) -> None:
"""recursive directory creation with parent mode inheritance
Newly created directories are marked as "not to be indexed by
@@ -2663,20 +2655,17 @@
os.chmod(name, mode)
-def readfile(path):
- # type: (bytes) -> bytes
+def readfile(path: bytes) -> bytes:
with open(path, b'rb') as fp:
return fp.read()
-def writefile(path, text):
- # type: (bytes, bytes) -> None
+def writefile(path: bytes, text: bytes) -> None:
with open(path, b'wb') as fp:
fp.write(text)
-def appendfile(path, text):
- # type: (bytes, bytes) -> None
+def appendfile(path: bytes, text: bytes) -> None:
with open(path, b'ab') as fp:
fp.write(text)
@@ -2837,8 +2826,7 @@
return go
-def processlinerange(fromline, toline):
- # type: (int, int) -> Tuple[int, int]
+def processlinerange(fromline: int, toline: int) -> Tuple[int, int]:
"""Check that linerange <fromline>:<toline> makes sense and return a
0-based range.
@@ -2897,13 +2885,11 @@
_eolre = remod.compile(br'\r*\n')
-def tolf(s):
- # type: (bytes) -> bytes
+def tolf(s: bytes) -> bytes:
return _eolre.sub(b'\n', s)
-def tocrlf(s):
- # type: (bytes) -> bytes
+def tocrlf(s: bytes) -> bytes:
return _eolre.sub(b'\r\n', s)
@@ -2926,15 +2912,13 @@
return fp
-def iterlines(iterator):
- # type: (Iterable[bytes]) -> Iterator[bytes]
+def iterlines(iterator: Iterable[bytes]) -> Iterator[bytes]:
for chunk in iterator:
for line in chunk.splitlines():
yield line
-def expandpath(path):
- # type: (bytes) -> bytes
+def expandpath(path: bytes) -> bytes:
return os.path.expanduser(os.path.expandvars(path))
@@ -3062,8 +3046,7 @@
)
-def sizetoint(s):
- # type: (bytes) -> int
+def sizetoint(s: bytes) -> int:
"""Convert a space specifier to a byte count.
>>> sizetoint(b'30')
@@ -3285,8 +3268,7 @@
yield
-def _estimatememory():
- # type: () -> Optional[int]
+def _estimatememory() -> Optional[int]:
"""Provide an estimate for the available system memory in Bytes.
If no estimate can be provided on the platform, returns None.
--- a/mercurial/utils/dateutil.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/utils/dateutil.py Tue Dec 19 21:29:34 2023 +0100
@@ -81,8 +81,7 @@
)
-def makedate(timestamp=None):
- # type: (Optional[float]) -> hgdate
+def makedate(timestamp: Optional[float] = None) -> hgdate:
"""Return a unix timestamp (or the current time) as a (unixtime,
offset) tuple based off the local timezone."""
if timestamp is None:
@@ -103,8 +102,10 @@
return timestamp, tz
-def datestr(date=None, format=b'%a %b %d %H:%M:%S %Y %1%2'):
- # type: (Optional[hgdate], bytes) -> bytes
+def datestr(
+ date: Optional[hgdate] = None,
+ format: bytes = b'%a %b %d %H:%M:%S %Y %1%2',
+) -> bytes:
"""represent a (unixtime, offset) tuple as a localized time.
unixtime is seconds since the epoch, and offset is the time zone's
number of seconds away from UTC.
@@ -141,14 +142,12 @@
return s
-def shortdate(date=None):
- # type: (Optional[hgdate]) -> bytes
+def shortdate(date: Optional[hgdate] = None) -> bytes:
"""turn (timestamp, tzoff) tuple into iso 8631 date."""
return datestr(date, format=b'%Y-%m-%d')
-def parsetimezone(s):
- # type: (bytes) -> Tuple[Optional[int], bytes]
+def parsetimezone(s: bytes) -> Tuple[Optional[int], bytes]:
"""find a trailing timezone, if any, in string, and return a
(offset, remainder) pair"""
s = pycompat.bytestr(s)
@@ -183,8 +182,11 @@
return None, s
-def strdate(string, format, defaults=None):
- # type: (bytes, bytes, Optional[Dict[bytes, Tuple[bytes, bytes]]]) -> hgdate
+def strdate(
+ string: bytes,
+ format: bytes,
+ defaults: Optional[Dict[bytes, Tuple[bytes, bytes]]] = None,
+) -> hgdate:
"""parse a localized time string and return a (unixtime, offset) tuple.
if the string cannot be parsed, ValueError is raised."""
if defaults is None:
@@ -226,8 +228,11 @@
return unixtime, offset
-def parsedate(date, formats=None, bias=None):
- # type: (Union[bytes, hgdate], Optional[Iterable[bytes]], Optional[Dict[bytes, bytes]]) -> hgdate
+def parsedate(
+ date: Union[bytes, hgdate],
+ formats: Optional[Iterable[bytes]] = None,
+ bias: Optional[Dict[bytes, bytes]] = None,
+) -> hgdate:
"""parse a localized date/time and return a (unixtime, offset) tuple.
The date may be a "unixtime offset" string or in one of the specified
@@ -316,8 +321,7 @@
return when, offset
-def matchdate(date):
- # type: (bytes) -> Callable[[float], bool]
+def matchdate(date: bytes) -> Callable[[float], bool]:
"""Return a function that matches a given date match specifier
Formats include:
@@ -346,13 +350,11 @@
False
"""
- def lower(date):
- # type: (bytes) -> float
+ def lower(date: bytes) -> float:
d = {b'mb': b"1", b'd': b"1"}
return parsedate(date, extendeddateformats, d)[0]
- def upper(date):
- # type: (bytes) -> float
+ def upper(date: bytes) -> float:
d = {b'mb': b"12", b'HI': b"23", b'M': b"59", b'S': b"59"}
for days in (b"31", b"30", b"29"):
try:
--- a/mercurial/utils/urlutil.py Wed Dec 20 20:13:22 2023 +0100
+++ b/mercurial/utils/urlutil.py Tue Dec 19 21:29:34 2023 +0100
@@ -34,8 +34,7 @@
urlreq = urllibcompat.urlreq
-def getport(port):
- # type: (Union[bytes, int]) -> int
+def getport(port: Union[bytes, int]) -> int:
"""Return the port for a given network service.
If port is an integer, it's returned as is. If it's a string, it's
@@ -133,8 +132,12 @@
_safepchars = b"/!~*'()+:\\"
_matchscheme = remod.compile(b'^[a-zA-Z0-9+.\\-]+:').match
- def __init__(self, path, parsequery=True, parsefragment=True):
- # type: (bytes, bool, bool) -> None
+ def __init__(
+ self,
+ path: bytes,
+ parsequery: bool = True,
+ parsefragment: bool = True,
+ ) -> None:
# We slowly chomp away at path until we have only the path left
self.scheme = self.user = self.passwd = self.host = None
self.port = self.path = self.query = self.fragment = None
@@ -378,8 +381,7 @@
return True # POSIX-style
return False
- def localpath(self):
- # type: () -> bytes
+ def localpath(self) -> bytes:
if self.scheme == b'file' or self.scheme == b'bundle':
path = self.path or b'/'
# For Windows, we need to promote hosts containing drive
@@ -402,23 +404,19 @@
)
-def hasscheme(path):
- # type: (bytes) -> bool
+def hasscheme(path: bytes) -> bool:
return bool(url(path).scheme) # cast to help pytype
-def hasdriveletter(path):
- # type: (bytes) -> bool
+def hasdriveletter(path: bytes) -> bool:
return bool(path) and path[1:2] == b':' and path[0:1].isalpha()
-def urllocalpath(path):
- # type: (bytes) -> bytes
+def urllocalpath(path: bytes) -> bytes:
return url(path, parsequery=False, parsefragment=False).localpath()
-def checksafessh(path):
- # type: (bytes) -> None
+def checksafessh(path: bytes) -> None:
"""check if a path / url is a potentially unsafe ssh exploit (SEC)
This is a sanity check for ssh urls. ssh will parse the first item as
@@ -435,8 +433,7 @@
)
-def hidepassword(u):
- # type: (bytes) -> bytes
+def hidepassword(u: bytes) -> bytes:
'''hide user credential in a url string'''
u = url(u)
if u.passwd:
@@ -444,8 +441,7 @@
return bytes(u)
-def removeauth(u):
- # type: (bytes) -> bytes
+def removeauth(u: bytes) -> bytes:
'''remove all authentication information from a url string'''
u = url(u)
u.user = u.passwd = None