--- a/mercurial/ui.py Sat Dec 10 14:57:42 2022 -0500
+++ b/mercurial/ui.py Sun Dec 11 00:10:56 2022 -0500
@@ -20,10 +20,15 @@
import traceback
from typing import (
+ Any,
+ Callable,
Dict,
List,
+ NoReturn,
Optional,
Tuple,
+ Type,
+ TypeVar,
Union,
cast,
)
@@ -57,21 +62,23 @@
urlutil,
)
+_ConfigItems = Dict[Tuple[bytes, bytes], object] # {(section, name) : value}
# The **opts args of the various write() methods can be basically anything, but
# there's no way to express it as "anything but str". So type it to be the
# handful of known types that are used.
_MsgOpts = Union[bytes, bool, List["_PromptChoice"]]
_PromptChoice = Tuple[bytes, bytes]
+_Tui = TypeVar('_Tui', bound="ui")
urlreq = util.urlreq
# for use with str.translate(None, _keepalnum), to keep just alphanumerics
-_keepalnum = b''.join(
+_keepalnum: bytes = b''.join(
c for c in map(pycompat.bytechr, range(256)) if not c.isalnum()
)
# The config knobs that will be altered (if unset) by ui.tweakdefaults.
-tweakrc = b"""
+tweakrc: bytes = b"""
[ui]
# The rollback command is dangerous. As a rule, don't use it.
rollback = False
@@ -98,7 +105,7 @@
word-diff = 1
"""
-samplehgrcs = {
+samplehgrcs: Dict[bytes, bytes] = {
b'user': b"""# example user config (see 'hg help config' for more info)
[ui]
# name and email, e.g.
@@ -187,7 +194,7 @@
class httppasswordmgrdbproxy:
"""Delays loading urllib2 until it's needed."""
- def __init__(self):
+ def __init__(self) -> None:
self._mgr = None
def _get_mgr(self):
@@ -210,7 +217,7 @@
)
-def _catchterm(*args):
+def _catchterm(*args) -> NoReturn:
raise error.SignalInterrupt
@@ -219,11 +226,11 @@
_unset = object()
# _reqexithandlers: callbacks run at the end of a request
-_reqexithandlers = []
+_reqexithandlers: List = []
class ui:
- def __init__(self, src=None):
+ def __init__(self, src: Optional["ui"] = None) -> None:
"""Create a fresh new ui object if no src given
Use uimod.ui.load() to create a ui which knows global and user configs.
@@ -318,13 +325,13 @@
if k in self.environ:
self._exportableenviron[k] = self.environ[k]
- def _new_source(self):
+ def _new_source(self) -> None:
self._ocfg.new_source()
self._tcfg.new_source()
self._ucfg.new_source()
@classmethod
- def load(cls):
+ def load(cls: Type[_Tui]) -> _Tui:
"""Create a ui and load global and user configs"""
u = cls()
# we always trust global config files and environment variables
@@ -350,7 +357,7 @@
u._new_source() # anything after that is a different level
return u
- def _maybetweakdefaults(self):
+ def _maybetweakdefaults(self) -> None:
if not self.configbool(b'ui', b'tweakdefaults'):
return
if self._tweaked or self.plain(b'tweakdefaults'):
@@ -370,17 +377,17 @@
if not self.hasconfig(section, name):
self.setconfig(section, name, value, b"<tweakdefaults>")
- def copy(self):
+ def copy(self: _Tui) -> _Tui:
return self.__class__(self)
- def resetstate(self):
+ def resetstate(self) -> None:
"""Clear internal state that shouldn't persist across commands"""
if self._progbar:
self._progbar.resetstate() # reset last-print time of progress bar
self.httppasswordmgrdb = httppasswordmgrdbproxy()
@contextlib.contextmanager
- def timeblockedsection(self, key):
+ def timeblockedsection(self, key: bytes):
# this is open-coded below - search for timeblockedsection to find them
starttime = util.timer()
try:
@@ -425,10 +432,10 @@
finally:
self._uninterruptible = False
- def formatter(self, topic, opts):
+ def formatter(self, topic: bytes, opts):
return formatter.formatter(self, self, topic, opts)
- def _trusted(self, fp, f):
+ def _trusted(self, fp, f: bytes) -> bool:
st = util.fstat(fp)
if util.isowner(st):
return True
@@ -454,7 +461,7 @@
def read_resource_config(
self, name, root=None, trust=False, sections=None, remap=None
- ):
+ ) -> None:
try:
fp = resourceutil.open_resource(name[0], name[1])
except IOError:
@@ -468,7 +475,7 @@
def readconfig(
self, filename, root=None, trust=False, sections=None, remap=None
- ):
+ ) -> None:
try:
fp = open(filename, 'rb')
except IOError:
@@ -480,7 +487,7 @@
def _readconfig(
self, filename, fp, root=None, trust=False, sections=None, remap=None
- ):
+ ) -> None:
with fp:
cfg = config.config()
trusted = sections or trust or self._trusted(fp, filename)
@@ -496,7 +503,9 @@
self._applyconfig(cfg, trusted, root)
- def applyconfig(self, configitems, source=b"", root=None):
+ def applyconfig(
+ self, configitems: _ConfigItems, source=b"", root=None
+ ) -> None:
"""Add configitems from a non-file source. Unlike with ``setconfig()``,
they can be overridden by subsequent config file reads. The items are
in the same format as ``configoverride()``, namely a dict of the
@@ -512,7 +521,7 @@
self._applyconfig(cfg, True, root)
- def _applyconfig(self, cfg, trusted, root):
+ def _applyconfig(self, cfg, trusted, root) -> None:
if self.plain():
for k in (
b'debug',
@@ -555,7 +564,7 @@
root = os.path.expanduser(b'~')
self.fixconfig(root=root)
- def fixconfig(self, root=None, section=None):
+ def fixconfig(self, root=None, section=None) -> None:
if section in (None, b'paths'):
# expand vars and ~
# translate paths relative to root (or home) into absolute paths
@@ -618,12 +627,12 @@
self._ucfg.backup(section, item),
)
- def restoreconfig(self, data):
+ def restoreconfig(self, data) -> None:
self._ocfg.restore(data[0])
self._tcfg.restore(data[1])
self._ucfg.restore(data[2])
- def setconfig(self, section, name, value, source=b''):
+ def setconfig(self, section, name, value, source=b'') -> None:
for cfg in (self._ocfg, self._tcfg, self._ucfg):
cfg.set(section, name, value, source)
self.fixconfig(section=section)
@@ -1009,7 +1018,7 @@
for name, value in self.configitems(section, untrusted):
yield section, name, value
- def plain(self, feature=None):
+ def plain(self, feature: Optional[bytes] = None) -> bool:
"""is plain mode active?
Plain mode means that all configuration variables which affect
@@ -1083,7 +1092,7 @@
)
return user
- def shortuser(self, user):
+ def shortuser(self, user: bytes) -> bytes:
"""Return a short representation of a user name or email address."""
if not self.verbose:
user = stringutil.shortuser(user)
@@ -1161,14 +1170,18 @@
self._fmsgout, self._fmsgerr = _selectmsgdests(self)
@contextlib.contextmanager
- def silent(self, error=False, subproc=False, labeled=False):
+ def silent(
+ self, error: bool = False, subproc: bool = False, labeled: bool = False
+ ):
self.pushbuffer(error=error, subproc=subproc, labeled=labeled)
try:
yield
finally:
self.popbuffer()
- def pushbuffer(self, error=False, subproc=False, labeled=False):
+ def pushbuffer(
+ self, error: bool = False, subproc: bool = False, labeled: bool = False
+ ) -> None:
"""install a buffer to capture standard output of the ui object
If error is True, the error output will be captured too.
@@ -1187,7 +1200,7 @@
self._bufferstates.append((error, subproc, labeled))
self._bufferapplylabels = labeled
- def popbuffer(self):
+ def popbuffer(self) -> bytes:
'''pop the last buffer and return the buffered output'''
self._bufferstates.pop()
if self._bufferstates:
@@ -1197,20 +1210,20 @@
return b"".join(self._buffers.pop())
- def _isbuffered(self, dest):
+ def _isbuffered(self, dest) -> bool:
if dest is self._fout:
return bool(self._buffers)
if dest is self._ferr:
return bool(self._bufferstates and self._bufferstates[-1][0])
return False
- def canwritewithoutlabels(self):
+ def canwritewithoutlabels(self) -> bool:
'''check if write skips the label'''
if self._buffers and not self._bufferapplylabels:
return True
return self._colormode is None
- def canbatchlabeledwrites(self):
+ def canbatchlabeledwrites(self) -> bool:
'''check if write calls with labels are batchable'''
# Windows color printing is special, see ``write``.
return self._colormode != b'win32'
@@ -1369,7 +1382,7 @@
util.timer() - starttime
) * 1000
- def _isatty(self, fh):
+ def _isatty(self, fh) -> bool:
if self.configbool(b'ui', b'nontty'):
return False
return procutil.isatty(fh)
@@ -1407,10 +1420,10 @@
finally:
self.restorefinout(fin, fout)
- def disablepager(self):
+ def disablepager(self) -> None:
self._disablepager = True
- def pager(self, command):
+ def pager(self, command: bytes) -> None:
"""Start a pager for subsequent command output.
Commands which produce a long stream of output should call
@@ -1491,7 +1504,7 @@
# warning about a missing pager command.
self.disablepager()
- def _runpager(self, command, env=None):
+ def _runpager(self, command: bytes, env=None) -> bool:
"""Actually start the pager and set up file descriptors.
This is separate in part so that extensions (like chg) can
@@ -1571,7 +1584,7 @@
self._exithandlers.append((func, args, kwargs))
return func
- def interface(self, feature):
+ def interface(self, feature: bytes) -> bytes:
"""what interface to use for interactive console features?
The interface is controlled by the value of `ui.interface` but also by
@@ -1626,12 +1639,12 @@
defaultinterface = b"text"
i = self.config(b"ui", b"interface")
if i in alldefaults:
- defaultinterface = i
+ defaultinterface = cast(bytes, i) # cast to help pytype
- choseninterface = defaultinterface
+ choseninterface: bytes = defaultinterface
f = self.config(b"ui", b"interface.%s" % feature)
if f in availableinterfaces:
- choseninterface = f
+ choseninterface = cast(bytes, f) # cast to help pytype
if i is not None and defaultinterface != i:
if f is not None:
@@ -1671,7 +1684,7 @@
return i
- def termwidth(self):
+ def termwidth(self) -> int:
"""how wide is the terminal in columns?"""
if b'COLUMNS' in encoding.environ:
try:
@@ -1918,14 +1931,14 @@
def edit(
self,
- text,
- user,
- extra=None,
+ text: bytes,
+ user: bytes,
+ extra: Optional[Dict[bytes, Any]] = None, # TODO: value type of bytes?
editform=None,
pending=None,
- repopath=None,
- action=None,
- ):
+ repopath: Optional[bytes] = None,
+ action: Optional[bytes] = None,
+ ) -> bytes:
if action is None:
self.develwarn(
b'action is None but will soon be a required '
@@ -1994,13 +2007,13 @@
def system(
self,
- cmd,
+ cmd: bytes,
environ=None,
- cwd=None,
- onerr=None,
- errprefix=None,
- blockedtag=None,
- ):
+ cwd: Optional[bytes] = None,
+ onerr: Optional[Callable[[bytes], Exception]] = None,
+ errprefix: Optional[bytes] = None,
+ blockedtag: Optional[bytes] = None,
+ ) -> int:
"""execute shell command with appropriate output stream. command
output will be redirected if fout is not stdout.
@@ -2027,12 +2040,12 @@
raise onerr(errmsg)
return rc
- def _runsystem(self, cmd, environ, cwd, out):
+ def _runsystem(self, cmd: bytes, environ, cwd: Optional[bytes], out) -> int:
"""actually execute the given shell command (can be overridden by
extensions like chg)"""
return procutil.system(cmd, environ=environ, cwd=cwd, out=out)
- def traceback(self, exc=None, force=False):
+ def traceback(self, exc=None, force: bool = False):
"""print exception traceback if traceback printing enabled or forced.
only to call in exception handler. returns true if traceback
printed."""
@@ -2130,7 +2143,7 @@
"""Returns a logger of the given name; or None if not registered"""
return self._loggers.get(name)
- def setlogger(self, name, logger):
+ def setlogger(self, name, logger) -> None:
"""Install logger which can be identified later by the given name
More than one loggers can be registered. Use extension or module
@@ -2138,7 +2151,7 @@
"""
self._loggers[name] = logger
- def log(self, event, msgfmt, *msgargs, **opts):
+ def log(self, event, msgfmt, *msgargs, **opts) -> None:
"""hook for logging facility extensions
event should be a readily-identifiable subsystem, which will
@@ -2239,7 +2252,7 @@
return self._exportableenviron
@contextlib.contextmanager
- def configoverride(self, overrides, source=b""):
+ def configoverride(self, overrides: _ConfigItems, source: bytes = b""):
"""Context manager for temporary config overrides
`overrides` must be a dict of the following structure:
{(section, name) : value}"""
@@ -2257,7 +2270,7 @@
if (b'ui', b'quiet') in overrides:
self.fixconfig(section=b'ui')
- def estimatememory(self):
+ def estimatememory(self) -> Optional[int]:
"""Provide an estimate for the available system memory in Bytes.
This can be overriden via ui.available-memory. It returns None, if
@@ -2292,7 +2305,7 @@
return _progresssingleton is not None
-def _selectmsgdests(ui):
+def _selectmsgdests(ui: ui):
name = ui.config(b'ui', b'message-output')
if name == b'channel':
if ui.fmsg: