mercurial/ui.py
changeset 49892 0449fb7729d7
parent 49890 f1e820cda2f5
child 49893 a51328ba33ca
--- a/mercurial/ui.py	Sat Dec 10 14:57:42 2022 -0500
+++ b/mercurial/ui.py	Sun Dec 11 00:10:56 2022 -0500
@@ -20,10 +20,15 @@
 import traceback
 
 from typing import (
+    Any,
+    Callable,
     Dict,
     List,
+    NoReturn,
     Optional,
     Tuple,
+    Type,
+    TypeVar,
     Union,
     cast,
 )
@@ -57,21 +62,23 @@
     urlutil,
 )
 
+_ConfigItems = Dict[Tuple[bytes, bytes], object]  # {(section, name) : value}
 # The **opts args of the various write() methods can be basically anything, but
 # there's no way to express it as "anything but str".  So type it to be the
 # handful of known types that are used.
 _MsgOpts = Union[bytes, bool, List["_PromptChoice"]]
 _PromptChoice = Tuple[bytes, bytes]
+_Tui = TypeVar('_Tui', bound="ui")
 
 urlreq = util.urlreq
 
 # for use with str.translate(None, _keepalnum), to keep just alphanumerics
-_keepalnum = b''.join(
+_keepalnum: bytes = b''.join(
     c for c in map(pycompat.bytechr, range(256)) if not c.isalnum()
 )
 
 # The config knobs that will be altered (if unset) by ui.tweakdefaults.
-tweakrc = b"""
+tweakrc: bytes = b"""
 [ui]
 # The rollback command is dangerous. As a rule, don't use it.
 rollback = False
@@ -98,7 +105,7 @@
 word-diff = 1
 """
 
-samplehgrcs = {
+samplehgrcs: Dict[bytes, bytes] = {
     b'user': b"""# example user config (see 'hg help config' for more info)
 [ui]
 # name and email, e.g.
@@ -187,7 +194,7 @@
 class httppasswordmgrdbproxy:
     """Delays loading urllib2 until it's needed."""
 
-    def __init__(self):
+    def __init__(self) -> None:
         self._mgr = None
 
     def _get_mgr(self):
@@ -210,7 +217,7 @@
         )
 
 
-def _catchterm(*args):
+def _catchterm(*args) -> NoReturn:
     raise error.SignalInterrupt
 
 
@@ -219,11 +226,11 @@
 _unset = object()
 
 # _reqexithandlers: callbacks run at the end of a request
-_reqexithandlers = []
+_reqexithandlers: List = []
 
 
 class ui:
-    def __init__(self, src=None):
+    def __init__(self, src: Optional["ui"] = None) -> None:
         """Create a fresh new ui object if no src given
 
         Use uimod.ui.load() to create a ui which knows global and user configs.
@@ -318,13 +325,13 @@
                 if k in self.environ:
                     self._exportableenviron[k] = self.environ[k]
 
-    def _new_source(self):
+    def _new_source(self) -> None:
         self._ocfg.new_source()
         self._tcfg.new_source()
         self._ucfg.new_source()
 
     @classmethod
-    def load(cls):
+    def load(cls: Type[_Tui]) -> _Tui:
         """Create a ui and load global and user configs"""
         u = cls()
         # we always trust global config files and environment variables
@@ -350,7 +357,7 @@
         u._new_source()  # anything after that is a different level
         return u
 
-    def _maybetweakdefaults(self):
+    def _maybetweakdefaults(self) -> None:
         if not self.configbool(b'ui', b'tweakdefaults'):
             return
         if self._tweaked or self.plain(b'tweakdefaults'):
@@ -370,17 +377,17 @@
                 if not self.hasconfig(section, name):
                     self.setconfig(section, name, value, b"<tweakdefaults>")
 
-    def copy(self):
+    def copy(self: _Tui) -> _Tui:
         return self.__class__(self)
 
-    def resetstate(self):
+    def resetstate(self) -> None:
         """Clear internal state that shouldn't persist across commands"""
         if self._progbar:
             self._progbar.resetstate()  # reset last-print time of progress bar
         self.httppasswordmgrdb = httppasswordmgrdbproxy()
 
     @contextlib.contextmanager
-    def timeblockedsection(self, key):
+    def timeblockedsection(self, key: bytes):
         # this is open-coded below - search for timeblockedsection to find them
         starttime = util.timer()
         try:
@@ -425,10 +432,10 @@
             finally:
                 self._uninterruptible = False
 
-    def formatter(self, topic, opts):
+    def formatter(self, topic: bytes, opts):
         return formatter.formatter(self, self, topic, opts)
 
-    def _trusted(self, fp, f):
+    def _trusted(self, fp, f: bytes) -> bool:
         st = util.fstat(fp)
         if util.isowner(st):
             return True
@@ -454,7 +461,7 @@
 
     def read_resource_config(
         self, name, root=None, trust=False, sections=None, remap=None
-    ):
+    ) -> None:
         try:
             fp = resourceutil.open_resource(name[0], name[1])
         except IOError:
@@ -468,7 +475,7 @@
 
     def readconfig(
         self, filename, root=None, trust=False, sections=None, remap=None
-    ):
+    ) -> None:
         try:
             fp = open(filename, 'rb')
         except IOError:
@@ -480,7 +487,7 @@
 
     def _readconfig(
         self, filename, fp, root=None, trust=False, sections=None, remap=None
-    ):
+    ) -> None:
         with fp:
             cfg = config.config()
             trusted = sections or trust or self._trusted(fp, filename)
@@ -496,7 +503,9 @@
 
         self._applyconfig(cfg, trusted, root)
 
-    def applyconfig(self, configitems, source=b"", root=None):
+    def applyconfig(
+        self, configitems: _ConfigItems, source=b"", root=None
+    ) -> None:
         """Add configitems from a non-file source.  Unlike with ``setconfig()``,
         they can be overridden by subsequent config file reads.  The items are
         in the same format as ``configoverride()``, namely a dict of the
@@ -512,7 +521,7 @@
 
         self._applyconfig(cfg, True, root)
 
-    def _applyconfig(self, cfg, trusted, root):
+    def _applyconfig(self, cfg, trusted, root) -> None:
         if self.plain():
             for k in (
                 b'debug',
@@ -555,7 +564,7 @@
             root = os.path.expanduser(b'~')
         self.fixconfig(root=root)
 
-    def fixconfig(self, root=None, section=None):
+    def fixconfig(self, root=None, section=None) -> None:
         if section in (None, b'paths'):
             # expand vars and ~
             # translate paths relative to root (or home) into absolute paths
@@ -618,12 +627,12 @@
             self._ucfg.backup(section, item),
         )
 
-    def restoreconfig(self, data):
+    def restoreconfig(self, data) -> None:
         self._ocfg.restore(data[0])
         self._tcfg.restore(data[1])
         self._ucfg.restore(data[2])
 
-    def setconfig(self, section, name, value, source=b''):
+    def setconfig(self, section, name, value, source=b'') -> None:
         for cfg in (self._ocfg, self._tcfg, self._ucfg):
             cfg.set(section, name, value, source)
         self.fixconfig(section=section)
@@ -1009,7 +1018,7 @@
             for name, value in self.configitems(section, untrusted):
                 yield section, name, value
 
-    def plain(self, feature=None):
+    def plain(self, feature: Optional[bytes] = None) -> bool:
         """is plain mode active?
 
         Plain mode means that all configuration variables which affect
@@ -1083,7 +1092,7 @@
             )
         return user
 
-    def shortuser(self, user):
+    def shortuser(self, user: bytes) -> bytes:
         """Return a short representation of a user name or email address."""
         if not self.verbose:
             user = stringutil.shortuser(user)
@@ -1161,14 +1170,18 @@
         self._fmsgout, self._fmsgerr = _selectmsgdests(self)
 
     @contextlib.contextmanager
-    def silent(self, error=False, subproc=False, labeled=False):
+    def silent(
+        self, error: bool = False, subproc: bool = False, labeled: bool = False
+    ):
         self.pushbuffer(error=error, subproc=subproc, labeled=labeled)
         try:
             yield
         finally:
             self.popbuffer()
 
-    def pushbuffer(self, error=False, subproc=False, labeled=False):
+    def pushbuffer(
+        self, error: bool = False, subproc: bool = False, labeled: bool = False
+    ) -> None:
         """install a buffer to capture standard output of the ui object
 
         If error is True, the error output will be captured too.
@@ -1187,7 +1200,7 @@
         self._bufferstates.append((error, subproc, labeled))
         self._bufferapplylabels = labeled
 
-    def popbuffer(self):
+    def popbuffer(self) -> bytes:
         '''pop the last buffer and return the buffered output'''
         self._bufferstates.pop()
         if self._bufferstates:
@@ -1197,20 +1210,20 @@
 
         return b"".join(self._buffers.pop())
 
-    def _isbuffered(self, dest):
+    def _isbuffered(self, dest) -> bool:
         if dest is self._fout:
             return bool(self._buffers)
         if dest is self._ferr:
             return bool(self._bufferstates and self._bufferstates[-1][0])
         return False
 
-    def canwritewithoutlabels(self):
+    def canwritewithoutlabels(self) -> bool:
         '''check if write skips the label'''
         if self._buffers and not self._bufferapplylabels:
             return True
         return self._colormode is None
 
-    def canbatchlabeledwrites(self):
+    def canbatchlabeledwrites(self) -> bool:
         '''check if write calls with labels are batchable'''
         # Windows color printing is special, see ``write``.
         return self._colormode != b'win32'
@@ -1369,7 +1382,7 @@
                 util.timer() - starttime
             ) * 1000
 
-    def _isatty(self, fh):
+    def _isatty(self, fh) -> bool:
         if self.configbool(b'ui', b'nontty'):
             return False
         return procutil.isatty(fh)
@@ -1407,10 +1420,10 @@
         finally:
             self.restorefinout(fin, fout)
 
-    def disablepager(self):
+    def disablepager(self) -> None:
         self._disablepager = True
 
-    def pager(self, command):
+    def pager(self, command: bytes) -> None:
         """Start a pager for subsequent command output.
 
         Commands which produce a long stream of output should call
@@ -1491,7 +1504,7 @@
             # warning about a missing pager command.
             self.disablepager()
 
-    def _runpager(self, command, env=None):
+    def _runpager(self, command: bytes, env=None) -> bool:
         """Actually start the pager and set up file descriptors.
 
         This is separate in part so that extensions (like chg) can
@@ -1571,7 +1584,7 @@
         self._exithandlers.append((func, args, kwargs))
         return func
 
-    def interface(self, feature):
+    def interface(self, feature: bytes) -> bytes:
         """what interface to use for interactive console features?
 
         The interface is controlled by the value of `ui.interface` but also by
@@ -1626,12 +1639,12 @@
         defaultinterface = b"text"
         i = self.config(b"ui", b"interface")
         if i in alldefaults:
-            defaultinterface = i
+            defaultinterface = cast(bytes, i)  # cast to help pytype
 
-        choseninterface = defaultinterface
+        choseninterface: bytes = defaultinterface
         f = self.config(b"ui", b"interface.%s" % feature)
         if f in availableinterfaces:
-            choseninterface = f
+            choseninterface = cast(bytes, f)  # cast to help pytype
 
         if i is not None and defaultinterface != i:
             if f is not None:
@@ -1671,7 +1684,7 @@
 
         return i
 
-    def termwidth(self):
+    def termwidth(self) -> int:
         """how wide is the terminal in columns?"""
         if b'COLUMNS' in encoding.environ:
             try:
@@ -1918,14 +1931,14 @@
 
     def edit(
         self,
-        text,
-        user,
-        extra=None,
+        text: bytes,
+        user: bytes,
+        extra: Optional[Dict[bytes, Any]] = None,  # TODO: value type of bytes?
         editform=None,
         pending=None,
-        repopath=None,
-        action=None,
-    ):
+        repopath: Optional[bytes] = None,
+        action: Optional[bytes] = None,
+    ) -> bytes:
         if action is None:
             self.develwarn(
                 b'action is None but will soon be a required '
@@ -1994,13 +2007,13 @@
 
     def system(
         self,
-        cmd,
+        cmd: bytes,
         environ=None,
-        cwd=None,
-        onerr=None,
-        errprefix=None,
-        blockedtag=None,
-    ):
+        cwd: Optional[bytes] = None,
+        onerr: Optional[Callable[[bytes], Exception]] = None,
+        errprefix: Optional[bytes] = None,
+        blockedtag: Optional[bytes] = None,
+    ) -> int:
         """execute shell command with appropriate output stream. command
         output will be redirected if fout is not stdout.
 
@@ -2027,12 +2040,12 @@
             raise onerr(errmsg)
         return rc
 
-    def _runsystem(self, cmd, environ, cwd, out):
+    def _runsystem(self, cmd: bytes, environ, cwd: Optional[bytes], out) -> int:
         """actually execute the given shell command (can be overridden by
         extensions like chg)"""
         return procutil.system(cmd, environ=environ, cwd=cwd, out=out)
 
-    def traceback(self, exc=None, force=False):
+    def traceback(self, exc=None, force: bool = False):
         """print exception traceback if traceback printing enabled or forced.
         only to call in exception handler. returns true if traceback
         printed."""
@@ -2130,7 +2143,7 @@
         """Returns a logger of the given name; or None if not registered"""
         return self._loggers.get(name)
 
-    def setlogger(self, name, logger):
+    def setlogger(self, name, logger) -> None:
         """Install logger which can be identified later by the given name
 
         More than one loggers can be registered. Use extension or module
@@ -2138,7 +2151,7 @@
         """
         self._loggers[name] = logger
 
-    def log(self, event, msgfmt, *msgargs, **opts):
+    def log(self, event, msgfmt, *msgargs, **opts) -> None:
         """hook for logging facility extensions
 
         event should be a readily-identifiable subsystem, which will
@@ -2239,7 +2252,7 @@
         return self._exportableenviron
 
     @contextlib.contextmanager
-    def configoverride(self, overrides, source=b""):
+    def configoverride(self, overrides: _ConfigItems, source: bytes = b""):
         """Context manager for temporary config overrides
         `overrides` must be a dict of the following structure:
         {(section, name) : value}"""
@@ -2257,7 +2270,7 @@
             if (b'ui', b'quiet') in overrides:
                 self.fixconfig(section=b'ui')
 
-    def estimatememory(self):
+    def estimatememory(self) -> Optional[int]:
         """Provide an estimate for the available system memory in Bytes.
 
         This can be overriden via ui.available-memory. It returns None, if
@@ -2292,7 +2305,7 @@
     return _progresssingleton is not None
 
 
-def _selectmsgdests(ui):
+def _selectmsgdests(ui: ui):
     name = ui.config(b'ui', b'message-output')
     if name == b'channel':
         if ui.fmsg: