--- a/mercurial/scmutil.py Wed Jul 24 18:17:00 2024 -0400
+++ b/mercurial/scmutil.py Wed Jul 24 22:40:22 2024 -0400
@@ -16,6 +16,17 @@
import typing
import weakref
+from typing import (
+ Callable,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Optional,
+ Set,
+ Tuple,
+)
+
from .i18n import _
from .node import (
bin,
@@ -46,6 +57,7 @@
revsetlang,
similar,
smartset,
+ typelib,
url,
util,
vfs,
@@ -62,6 +74,11 @@
else:
from . import scmposix as scmplatform
+if typing.TYPE_CHECKING:
+ from . import (
+ ui as uimod,
+ )
+
parsers = policy.importmod('parsers')
rustrevlog = policy.importrust('revlog')
@@ -76,15 +93,15 @@
relevant to the working copy.
"""
- modified = attr.ib(default=attr.Factory(list))
- added = attr.ib(default=attr.Factory(list))
- removed = attr.ib(default=attr.Factory(list))
- deleted = attr.ib(default=attr.Factory(list))
- unknown = attr.ib(default=attr.Factory(list))
- ignored = attr.ib(default=attr.Factory(list))
- clean = attr.ib(default=attr.Factory(list))
-
- def __iter__(self):
+ modified = attr.ib(default=attr.Factory(list), type=List[bytes])
+ added = attr.ib(default=attr.Factory(list), type=List[bytes])
+ removed = attr.ib(default=attr.Factory(list), type=List[bytes])
+ deleted = attr.ib(default=attr.Factory(list), type=List[bytes])
+ unknown = attr.ib(default=attr.Factory(list), type=List[bytes])
+ ignored = attr.ib(default=attr.Factory(list), type=List[bytes])
+ clean = attr.ib(default=attr.Factory(list), type=List[bytes])
+
+ def __iter__(self) -> Iterator[List[bytes]]:
yield self.modified
yield self.added
yield self.removed
@@ -93,7 +110,7 @@
yield self.ignored
yield self.clean
- def __repr__(self):
+ def __repr__(self) -> str:
return (
r'<status modified=%s, added=%s, removed=%s, deleted=%s, '
r'unknown=%s, ignored=%s, clean=%s>'
@@ -126,7 +143,7 @@
yield subpath, ctx2.nullsub(subpath, ctx1)
-def nochangesfound(ui, repo, excluded=None):
+def nochangesfound(ui: "uimod.ui", repo, excluded=None) -> None:
"""Report no changes for push/pull, excluded is None or a list of
nodes excluded from the push/pull.
"""
@@ -146,7 +163,7 @@
ui.status(_(b"no changes found\n"))
-def callcatch(ui, func):
+def callcatch(ui: "uimod.ui", func: Callable[[], int]) -> int:
"""call func() with global exception handling
return func() if no exception happens. otherwise do some error handling
@@ -268,7 +285,7 @@
return coarse_exit_code
-def checknewlabel(repo, lbl, kind):
+def checknewlabel(repo, lbl: bytes, kind) -> None:
# Do not use the "kind" parameter in ui output.
# It makes strings difficult to translate.
if lbl in [b'tip', b'.', b'null']:
@@ -294,7 +311,7 @@
)
-def checkfilename(f):
+def checkfilename(f: bytes) -> None:
'''Check that the filename f is an acceptable filename for a tracked file'''
if b'\r' in f or b'\n' in f:
raise error.InputError(
@@ -303,7 +320,7 @@
)
-def checkportable(ui, f):
+def checkportable(ui: "uimod.ui", f: bytes) -> None:
'''Check if filename f is portable and warn or abort depending on config'''
checkfilename(f)
abort, warn = checkportabilityalert(ui)
@@ -316,7 +333,7 @@
ui.warn(_(b"warning: %s\n") % msg)
-def checkportabilityalert(ui):
+def checkportabilityalert(ui: "uimod.ui") -> Tuple[bool, bool]:
"""check if the user's config requests nothing, a warning, or abort for
non-portable filenames"""
val = ui.config(b'ui', b'portablefilenames')
@@ -332,7 +349,7 @@
class casecollisionauditor:
- def __init__(self, ui, abort, dirstate):
+ def __init__(self, ui: "uimod.ui", abort: bool, dirstate) -> None:
self._ui = ui
self._abort = abort
allfiles = b'\0'.join(dirstate)
@@ -343,7 +360,7 @@
# same filename twice.
self._newfiles = set()
- def __call__(self, f):
+ def __call__(self, f: bytes) -> None:
if f in self._newfiles:
return
fl = encoding.lower(f)
@@ -356,7 +373,9 @@
self._newfiles.add(f)
-def combined_filtered_and_obsolete_hash(repo, maxrev, needobsolete=False):
+def combined_filtered_and_obsolete_hash(
+ repo, maxrev, needobsolete: bool = False
+):
"""build hash of filtered revisions in the current repoview.
Multiple caches perform up-to-date validation by checking that the
@@ -437,7 +456,7 @@
return (filtered_set, obs_set)
-def _hash_revs(revs):
+def _hash_revs(revs: Iterable[int]) -> bytes:
"""return a hash from a list of revision numbers"""
s = hashutil.sha1()
for rev in revs:
@@ -445,7 +464,12 @@
return s.digest()
-def walkrepos(path, followsym=False, seen_dirs=None, recurse=False):
+def walkrepos(
+ path,
+ followsym: bool = False,
+ seen_dirs: Optional[List[bytes]] = None,
+ recurse: bool = False,
+) -> Iterable[bytes]:
"""yield every hg repository under path, always recursively.
The recurse flag will only control recursion into repo working dirs"""
@@ -494,7 +518,7 @@
dirs[:] = newdirs
-def binnode(ctx):
+def binnode(ctx) -> bytes:
"""Return binary node id for a given basectx"""
node = ctx.node()
if node is None:
@@ -502,7 +526,7 @@
return node
-def intrev(ctx):
+def intrev(ctx) -> int:
"""Return integer for a given basectx that can be used in comparison or
arithmetic operation"""
rev = ctx.rev()
@@ -511,14 +535,14 @@
return rev
-def formatchangeid(ctx):
+def formatchangeid(ctx) -> bytes:
"""Format changectx as '{rev}:{node|formatnode}', which is the default
template provided by logcmdutil.changesettemplater"""
repo = ctx.repo()
return formatrevnode(repo.ui, intrev(ctx), binnode(ctx))
-def formatrevnode(ui, rev, node):
+def formatrevnode(ui: "uimod.ui", rev: int, node: bytes) -> bytes:
"""Format given revision and node depending on the current verbosity"""
if ui.debugflag:
hexfunc = hex
@@ -527,7 +551,7 @@
return b'%d:%s' % (rev, hexfunc(node))
-def resolvehexnodeidprefix(repo, prefix):
+def resolvehexnodeidprefix(repo, prefix: bytes):
if prefix.startswith(b'x'):
prefix = prefix[1:]
try:
@@ -559,7 +583,7 @@
return node
-def mayberevnum(repo, prefix):
+def mayberevnum(repo, prefix: bytes) -> bool:
"""Checks if the given prefix may be mistaken for a revision number"""
try:
i = int(prefix)
@@ -574,7 +598,7 @@
return False
-def shortesthexnodeidprefix(repo, node, minlength=1, cache=None):
+def shortesthexnodeidprefix(repo, node: bytes, minlength: int = 1, cache=None):
"""Find the shortest unambiguous prefix that matches hexnode.
If "cache" is not None, it must be a dictionary that can be used for
@@ -586,7 +610,7 @@
minlength = max(minlength, 1)
- def disambiguate(prefix):
+ def disambiguate(prefix: bytes):
"""Disambiguate against revnums."""
if repo.ui.configbool(b'experimental', b'revisions.prefixhexnode'):
if mayberevnum(repo, prefix):
@@ -651,7 +675,7 @@
raise error.RepoLookupError()
-def isrevsymbol(repo, symbol):
+def isrevsymbol(repo, symbol: bytes) -> bool:
"""Checks if a symbol exists in the repo.
See revsymbol() for details. Raises error.AmbiguousPrefixLookupError if the
@@ -664,7 +688,7 @@
return False
-def revsymbol(repo, symbol):
+def revsymbol(repo, symbol: bytes):
"""Returns a context given a single revision symbol (as string).
This is similar to revsingle(), but accepts only a single revision symbol,
@@ -731,7 +755,7 @@
raise _filterederror(repo, symbol)
-def _filterederror(repo, changeid):
+def _filterederror(repo, changeid: bytes) -> error.FilteredRepoLookupError:
"""build an exception to be raised about a filtered changeid
This is extracted in a function to help extensions (eg: evolve) to
@@ -766,7 +790,7 @@
return repo[l.last()]
-def _pairspec(revspec):
+def _pairspec(revspec) -> bool:
tree = revsetlang.parse(revspec)
return tree and tree[0] in (
b'range',
@@ -831,7 +855,9 @@
return repo.anyrevs(allspecs, user=True, localalias=localalias)
-def increasingwindows(windowsize=8, sizelimit=512):
+def increasingwindows(
+ windowsize: int = 8, sizelimit: int = 512
+) -> Iterable[int]:
while True:
yield windowsize
if windowsize < sizelimit:
@@ -897,7 +923,11 @@
return parents
-def getuipathfn(repo, legacyrelativevalue=False, forcerelativevalue=None):
+def getuipathfn(
+ repo,
+ legacyrelativevalue: bool = False,
+ forcerelativevalue: Optional[bool] = None,
+) -> typelib.UiPathFn:
"""Return a function that produced paths for presenting to the user.
The returned function takes a repo-relative path and produces a path
@@ -937,12 +967,14 @@
return util.localpath
-def subdiruipathfn(subpath, uipathfn):
+def subdiruipathfn(
+ subpath: bytes, uipathfn: typelib.UiPathFn
+) -> typelib.UiPathFn:
'''Create a new uipathfn that treats the file as relative to subpath.'''
return lambda f: uipathfn(posixpath.join(subpath, f))
-def anypats(pats, opts):
+def anypats(pats, opts) -> bool:
"""Checks if any patterns, including --include and --exclude were given.
Some commands (e.g. addremove) use this condition for deciding whether to
@@ -951,7 +983,7 @@
return bool(pats or opts.get(b'include') or opts.get(b'exclude'))
-def expandpats(pats):
+def expandpats(pats: Iterable[bytes]) -> List[bytes]:
"""Expand bare globs when running on windows.
On posix we assume it already has already been done by sh."""
if not util.expandglobs:
@@ -972,7 +1004,12 @@
def matchandpats(
- ctx, pats=(), opts=None, globbed=False, default=b'relpath', badfn=None
+ ctx,
+ pats=(),
+ opts=None,
+ globbed: bool = False,
+ default: bytes = b'relpath',
+ badfn=None,
):
"""Return a matcher and the patterns that were used.
The matcher will warn about bad matches, unless an alternate badfn callback
@@ -1005,7 +1042,12 @@
def match(
- ctx, pats=(), opts=None, globbed=False, default=b'relpath', badfn=None
+ ctx,
+ pats=(),
+ opts=None,
+ globbed: bool = False,
+ default: bytes = b'relpath',
+ badfn=None,
):
'''Return a matcher that will warn about bad matches.'''
return matchandpats(ctx, pats, opts, globbed, default, badfn=badfn)[0]
@@ -1016,12 +1058,12 @@
return matchmod.always()
-def matchfiles(repo, files, badfn=None):
+def matchfiles(repo, files, badfn=None) -> matchmod.exactmatcher:
'''Return a matcher that will efficiently match exactly these files.'''
return matchmod.exact(files, badfn=badfn)
-def parsefollowlinespattern(repo, rev, pat, msg):
+def parsefollowlinespattern(repo, rev, pat: bytes, msg: bytes) -> bytes:
"""Return a file name from `pat` pattern suitable for usage in followlines
logic.
"""
@@ -1036,7 +1078,7 @@
return files[0]
-def getorigvfs(ui, repo):
+def getorigvfs(ui: "uimod.ui", repo):
"""return a vfs suitable to save 'orig' file
return None if no special directory is configured"""
@@ -1046,7 +1088,7 @@
return vfs.vfs(repo.wvfs.join(origbackuppath))
-def backuppath(ui, repo, filepath):
+def backuppath(ui: "uimod.ui", repo, filepath: bytes) -> bytes:
"""customize where working copy backup files (.orig files) are created
Fetch user defined path from config file: [ui] origbackuppath = <path>
@@ -1089,7 +1131,7 @@
self._torev = repo.changelog.rev
self._revcontains = revcontainer.__contains__
- def __contains__(self, node):
+ def __contains__(self, node) -> bool:
return self._revcontains(self._torev(node))
@@ -1102,7 +1144,7 @@
fixphase=False,
targetphase=None,
backup=True,
-):
+) -> None:
"""do common cleanups when old nodes are replaced by new nodes
That includes writing obsmarkers or stripping nodes, and moving bookmarks.
@@ -1276,7 +1318,14 @@
)
-def addremove(repo, matcher, prefix, uipathfn, opts=None, open_tr=None):
+def addremove(
+ repo,
+ matcher,
+ prefix: bytes,
+ uipathfn: typelib.UiPathFn,
+ opts=None,
+ open_tr=None,
+) -> int:
if opts is None:
opts = {}
m = matcher
@@ -1309,7 +1358,7 @@
rejected = []
- def badfn(f, msg):
+ def badfn(f: bytes, msg: bytes) -> None:
if f in m.files():
m.bad(f, msg)
rejected.append(f)
@@ -1347,7 +1396,7 @@
return ret
-def marktouched(repo, files, similarity=0.0):
+def marktouched(repo, files, similarity: float = 0.0) -> int:
"""Assert that files have somehow been operated upon. files are relative to
the repo root."""
m = matchfiles(repo, files, badfn=lambda x, y: rejected.append(x))
@@ -1382,7 +1431,9 @@
return 0
-def _interestingfiles(repo, matcher):
+def _interestingfiles(
+ repo, matcher
+) -> Tuple[List[bytes], List[bytes], List[bytes], List[bytes], List[bytes]]:
"""Walk dirstate with matcher, looking for files that addremove would care
about.
@@ -1418,7 +1469,9 @@
return added, unknown, deleted, removed, forgotten
-def _findrenames(repo, matcher, added, removed, similarity, uipathfn):
+def _findrenames(
+ repo, matcher, added, removed, similarity, uipathfn: typelib.UiPathFn
+) -> Dict[bytes, bytes]:
'''Find renames from removed files to added ones.'''
renames = {}
if similarity > 0:
@@ -1441,7 +1494,7 @@
return renames
-def _markchanges(repo, unknown, deleted, renames):
+def _markchanges(repo, unknown, deleted, renames) -> None:
"""Marks the files in unknown as added, the files in deleted as removed,
and the files in renames as copied."""
wctx = repo[None]
@@ -1524,7 +1577,15 @@
return copiesfn
-def dirstatecopy(ui, repo, wctx, src, dst, dryrun=False, cwd=None):
+def dirstatecopy(
+ ui: "uimod.ui",
+ repo,
+ wctx,
+ src,
+ dst,
+ dryrun: bool = False,
+ cwd: Optional[bytes] = None,
+) -> None:
"""Update the dirstate to reflect the intent of copying src to dst. For
different reasons it might not end with dst being marked as copied from src.
"""
@@ -1549,7 +1610,7 @@
wctx.copy(origsrc, dst)
-def movedirstate(repo, newctx, match=None):
+def movedirstate(repo, newctx, match=None) -> None:
"""Move the dirstate to newctx and adjust it as necessary.
A matcher can be provided as an optimization. It is probably a bug to pass
@@ -1602,12 +1663,12 @@
return requirements, None
-def istreemanifest(repo):
+def istreemanifest(repo) -> bool:
"""returns whether the repository is using treemanifest or not"""
return requirementsmod.TREEMANIFEST_REQUIREMENT in repo.requirements
-def writereporequirements(repo, requirements=None):
+def writereporequirements(repo, requirements=None) -> None:
"""writes requirements for the repo
Requirements are written to .hg/requires and .hg/store/requires based
@@ -1626,14 +1687,16 @@
repo.svfs.tryunlink(b'requires')
-def writerequires(opener, requirements):
+def writerequires(opener, requirements) -> None:
with opener(b'requires', b'w', atomictemp=True) as fp:
for r in sorted(requirements):
fp.write(b"%s\n" % r)
class filecachesubentry:
- def __init__(self, path, stat):
+ _cacheable: Optional[bool] = None
+
+ def __init__(self, path, stat: bool):
self.path = path
self.cachestat = None
self._cacheable = None
@@ -1647,18 +1710,18 @@
# None means we don't know yet
self._cacheable = None
- def refresh(self):
+ def refresh(self) -> None:
if self.cacheable():
self.cachestat = filecachesubentry.stat(self.path)
- def cacheable(self):
+ def cacheable(self) -> bool:
if self._cacheable is not None:
return self._cacheable
# we don't know yet, assume it is for now
return True
- def changed(self):
+ def changed(self) -> bool:
# no point in going further if we can't cache it
if not self.cacheable():
return True
@@ -1680,7 +1743,7 @@
return False
@staticmethod
- def stat(path):
+ def stat(path: bytes) -> Optional[typelib.CacheStat]:
try:
return util.cachestat(path)
except FileNotFoundError:
@@ -1688,19 +1751,19 @@
class filecacheentry:
- def __init__(self, paths, stat=True):
+ def __init__(self, paths, stat: bool = True) -> None:
self._entries = []
for path in paths:
self._entries.append(filecachesubentry(path, stat))
- def changed(self):
+ def changed(self) -> bool:
'''true if any entry has changed'''
for entry in self._entries:
if entry.changed():
return True
return False
- def refresh(self):
+ def refresh(self) -> None:
for entry in self._entries:
entry.refresh()
@@ -1731,13 +1794,15 @@
remove the ``filecacheentry``.
"""
- def __init__(self, *paths):
+ paths: Tuple[bytes, ...]
+
+ def __init__(self, *paths: bytes) -> None:
self.paths = paths
def tracked_paths(self, obj):
return [self.join(obj, path) for path in self.paths]
- def join(self, obj, fname):
+ def join(self, obj, fname: bytes):
"""Used to compute the runtime path of a cached file.
Users should subclass filecache and provide their own version of this
@@ -1798,7 +1863,7 @@
obj.__dict__[self.sname] = value # update copy returned by obj.x
-def extdatasource(repo, source):
+def extdatasource(repo, source: bytes):
"""Gather a map of rev -> value dict from the specified source
A source spec is treated as a URL, with a special case shell: type
@@ -1867,7 +1932,21 @@
class progress:
- def __init__(self, ui, updatebar, topic, unit=b"", total=None):
+ ui: "uimod.ui"
+ pos: Optional[int] # None once complete
+ topic: bytes
+ unit: bytes
+ total: Optional[int]
+ debug: bool
+
+ def __init__(
+ self,
+ ui: "uimod.ui",
+ updatebar,
+ topic: bytes,
+ unit: bytes = b"",
+ total: Optional[int] = None,
+ ) -> None:
self.ui = ui
self.pos = 0
self.topic = topic
@@ -1882,7 +1961,9 @@
def __exit__(self, exc_type, exc_value, exc_tb):
self.complete()
- def update(self, pos, item=b"", total=None):
+ def update(
+ self, pos: int, item: bytes = b"", total: Optional[int] = None
+ ) -> None:
assert pos is not None
if total:
self.total = total
@@ -1891,16 +1972,18 @@
if self.debug:
self._printdebug(item)
- def increment(self, step=1, item=b"", total=None):
+ def increment(
+ self, step: int = 1, item: bytes = b"", total: Optional[int] = None
+ ) -> None:
self.update(self.pos + step, item, total)
- def complete(self):
+ def complete(self) -> None:
self.pos = None
self.unit = b""
self.total = None
self._updatebar(self.topic, self.pos, b"", self.unit, self.total)
- def _printdebug(self, item):
+ def _printdebug(self, item: bytes) -> None:
unit = b''
if self.unit:
unit = b' ' + self.unit
@@ -1917,7 +2000,7 @@
self.ui.debug(b'%s:%s %d%s\n' % (self.topic, item, self.pos, unit))
-def gdinitconfig(ui):
+def gdinitconfig(ui: "uimod.ui"):
"""helper function to know if a repo should be created as general delta"""
# experimental config: format.generaldelta
return ui.configbool(b'format', b'generaldelta') or ui.configbool(
@@ -1925,7 +2008,7 @@
)
-def gddeltaconfig(ui):
+def gddeltaconfig(ui: "uimod.ui"):
"""helper function to know if incoming deltas should be optimized
The `format.generaldelta` config is an old form of the config that also
@@ -1944,11 +2027,11 @@
firstlinekey = b'__firstline'
- def __init__(self, vfs, path, keys=None):
+ def __init__(self, vfs, path: bytes, keys=None) -> None:
self.vfs = vfs
self.path = path
- def read(self, firstlinenonkeyval=False):
+ def read(self, firstlinenonkeyval: bool = False):
"""Read the contents of a simple key-value file
'firstlinenonkeyval' indicates whether the first line of file should
@@ -1979,7 +2062,7 @@
raise error.CorruptedState(stringutil.forcebytestr(e))
return d
- def write(self, data, firstline=None):
+ def write(self, data, firstline: Optional[bytes] = None) -> None:
"""Write key=>value mapping to a file
data is a dict. Keys must be alphanumerical and start with a letter.
Values must not contain newline characters.
@@ -2008,7 +2091,7 @@
fp.write(b''.join(lines))
-_reportobsoletedsource = [
+_reportobsoletedsource: List[bytes] = [
b'debugobsolete',
b'pull',
b'push',
@@ -2016,13 +2099,13 @@
b'unbundle',
]
-_reportnewcssource = [
+_reportnewcssource: List[bytes] = [
b'pull',
b'unbundle',
]
-def prefetchfiles(repo, revmatches):
+def prefetchfiles(repo, revmatches) -> None:
"""Invokes the registered file prefetch functions, allowing extensions to
ensure the corresponding files are available locally, before the command
uses them.
@@ -2051,10 +2134,12 @@
fileprefetchhooks = util.hooks()
# A marker that tells the evolve extension to suppress its own reporting
-_reportstroubledchangesets = True
-
-
-def registersummarycallback(repo, otr, txnname=b'', as_validator=False):
+_reportstroubledchangesets: bool = True
+
+
+def registersummarycallback(
+ repo, otr, txnname: bytes = b'', as_validator: bool = False
+) -> None:
"""register a callback to issue a summary after the transaction is closed
If as_validator is true, then the callbacks are registered as transaction
@@ -2225,7 +2310,7 @@
repo.ui.status(msg % len(published))
-def getinstabilitymessage(delta, instability):
+def getinstabilitymessage(delta: int, instability: bytes) -> Optional[bytes]:
"""function to return the message to show warning about new instabilities
exists as a separate function so that extension can wrap to show more
@@ -2234,14 +2319,14 @@
return _(b'%i new %s changesets\n') % (delta, instability)
-def nodesummaries(repo, nodes, maxnumnodes=4):
+def nodesummaries(repo, nodes, maxnumnodes: int = 4) -> bytes:
if len(nodes) <= maxnumnodes or repo.ui.verbose:
return b' '.join(short(h) for h in nodes)
first = b' '.join(short(h) for h in nodes[:maxnumnodes])
return _(b"%s and %d others") % (first, len(nodes) - maxnumnodes)
-def enforcesinglehead(repo, tr, desc, accountclosed, filtername):
+def enforcesinglehead(repo, tr, desc: bytes, accountclosed, filtername) -> None:
"""check that no named branch has multiple heads"""
if desc in (b'strip', b'repair'):
# skip the logic during strip
@@ -2266,7 +2351,7 @@
return sink
-def unhidehashlikerevs(repo, specs, hiddentype):
+def unhidehashlikerevs(repo, specs, hiddentype: bytes):
"""parse the user specs and unhide changesets whose hash or revision number
is passed.
@@ -2319,7 +2404,7 @@
return repo.filtered(b'visible-hidden', revs)
-def _getrevsfromsymbols(repo, symbols):
+def _getrevsfromsymbols(repo, symbols) -> Set[int]:
"""parse the list of symbols and returns a set of revision numbers of hidden
changesets present in symbols"""
revs = set()
@@ -2354,7 +2439,7 @@
return revs
-def bookmarkrevs(repo, mark):
+def bookmarkrevs(repo, mark: bytes):
"""Select revisions reachable by a given bookmark
If the bookmarked revision isn't a head, an empty set will be returned.
@@ -2362,7 +2447,7 @@
return repo.revs(format_bookmark_revspec(mark))
-def format_bookmark_revspec(mark):
+def format_bookmark_revspec(mark: bytes) -> bytes:
"""Build a revset expression to select revisions reachable by a given
bookmark"""
mark = b'literal:' + mark
@@ -2376,7 +2461,7 @@
)
-def ismember(ui, username, userlist):
+def ismember(ui: "uimod.ui", username: bytes, userlist: List[bytes]) -> bool:
"""Check if username is a member of userlist.
If userlist has a single '*' member, all users are considered members.
@@ -2386,22 +2471,24 @@
return userlist == [b'*'] or username in userlist
-RESOURCE_HIGH = 3
-RESOURCE_MEDIUM = 2
-RESOURCE_LOW = 1
-RESOURCE_DEFAULT = 0
-
-RESOURCE_MAPPING = {
+RESOURCE_HIGH: int = 3
+RESOURCE_MEDIUM: int = 2
+RESOURCE_LOW: int = 1
+RESOURCE_DEFAULT: int = 0
+
+RESOURCE_MAPPING: Dict[bytes, int] = {
b'default': RESOURCE_DEFAULT,
b'low': RESOURCE_LOW,
b'medium': RESOURCE_MEDIUM,
b'high': RESOURCE_HIGH,
}
-DEFAULT_RESOURCE = RESOURCE_MEDIUM
-
-
-def get_resource_profile(ui, dimension=None):
+DEFAULT_RESOURCE: int = RESOURCE_MEDIUM
+
+
+def get_resource_profile(
+ ui: "uimod.ui", dimension: Optional[bytes] = None
+) -> int:
"""return the resource profile for a dimension
If no dimension is specified, the generic value is returned"""