--- a/mercurial/manifest.py Mon Aug 05 10:15:10 2024 +0200
+++ b/mercurial/manifest.py Mon Aug 05 10:03:06 2024 +0200
@@ -12,7 +12,17 @@
import weakref
from typing import (
+ ByteString,
+ Callable,
+ Dict,
Iterable,
+ Iterator,
+ List,
+ Optional,
+ Set,
+ Tuple,
+ Union,
+ cast,
)
from .i18n import _
@@ -47,7 +57,7 @@
FASTDELTA_TEXTDIFF_THRESHOLD = 1000
-def _parse(nodelen, data):
+def _parse(nodelen, data: bytes):
# This method does a little bit of excessive-looking
# precondition checking. This is so that the behavior of this
# class exactly matches its C counterpart to try and help
@@ -88,21 +98,23 @@
class lazymanifestiter:
- def __init__(self, lm):
+ def __init__(self, lm: '_LazyManifest') -> None:
self.pos = 0
self.lm = lm
- def __iter__(self):
+ def __iter__(self) -> 'lazymanifestiter':
return self
- def next(self):
+ def next(self) -> bytes:
try:
data, pos = self.lm._get(self.pos)
except IndexError:
raise StopIteration
if pos == -1:
+ assert isinstance(data, tuple)
self.pos += 1
return data[0]
+ assert isinstance(data, bytes)
self.pos += 1
zeropos = data.find(b'\x00', pos)
return data[pos:zeropos]
@@ -111,21 +123,23 @@
class lazymanifestiterentries:
- def __init__(self, lm):
+ def __init__(self, lm: '_LazyManifest') -> None:
self.lm = lm
self.pos = 0
- def __iter__(self):
+ def __iter__(self) -> 'lazymanifestiterentries':
return self
- def next(self):
+ def next(self) -> Tuple[bytes, bytes, bytes]:
try:
data, pos = self.lm._get(self.pos)
except IndexError:
raise StopIteration
if pos == -1:
+ assert isinstance(data, tuple)
self.pos += 1
return data
+ assert isinstance(data, bytes)
zeropos = data.find(b'\x00', pos)
nlpos = data.find(b'\n', pos)
if zeropos == -1 or nlpos == -1 or nlpos < zeropos:
@@ -181,12 +195,12 @@
def __init__(
self,
- nodelen,
- data,
+ nodelen: int,
+ data: bytes,
positions=None,
extrainfo=None,
extradata=None,
- hasremovals=False,
+ hasremovals: bool = False,
):
self._nodelen = nodelen
if positions is None:
@@ -202,7 +216,7 @@
self.data = data
self.hasremovals = hasremovals
- def findlines(self, data):
+ def findlines(self, data: bytes) -> List[int]:
if not data:
return []
pos = data.find(b"\n")
@@ -219,7 +233,9 @@
pos = data.find(b"\n", pos + 1)
return positions
- def _get(self, index):
+ def _get(
+ self, index: int
+ ) -> Tuple[Union[bytes, Tuple[bytes, bytes, bytes]], int]:
# get the position encoded in pos:
# positive number is an index in 'data'
# negative number is in extrapieces
@@ -228,12 +244,12 @@
return self.data, pos
return self.extradata[-pos - 1], -1
- def _getkey(self, pos):
+ def _getkey(self, pos) -> bytes:
if pos >= 0:
return self.data[pos : self.data.find(b'\x00', pos + 1)]
return self.extradata[-pos - 1][0]
- def bsearch(self, key):
+ def bsearch(self, key: bytes) -> int:
first = 0
last = len(self.positions) - 1
@@ -251,7 +267,7 @@
first = midpoint + 1
return -1
- def bsearch2(self, key):
+ def bsearch2(self, key: bytes) -> Tuple[int, bool]:
# same as the above, but will always return the position
# done for performance reasons
first = 0
@@ -271,10 +287,10 @@
first = midpoint + 1
return (first, False)
- def __contains__(self, key):
+ def __contains__(self, key: bytes) -> bool:
return self.bsearch(key) != -1
- def __getitem__(self, key):
+ def __getitem__(self, key: bytes) -> Tuple[bytes, bytes]:
if not isinstance(key, bytes):
raise TypeError(b"getitem: manifest keys must be a bytes.")
needle = self.bsearch(key)
@@ -282,7 +298,10 @@
raise KeyError
data, pos = self._get(needle)
if pos == -1:
+ assert isinstance(data, tuple)
return (data[1], data[2])
+
+ assert isinstance(data, bytes)
zeropos = data.find(b'\x00', pos)
nlpos = data.find(b'\n', zeropos)
assert 0 <= needle <= len(self.positions)
@@ -300,7 +319,7 @@
hashval = unhexlify(data, self.extrainfo[needle], zeropos + 1, hlen)
return (hashval, flags)
- def __delitem__(self, key):
+ def __delitem__(self, key: bytes) -> None:
needle, found = self.bsearch2(key)
if not found:
raise KeyError
@@ -313,7 +332,7 @@
self.data = self.data[:cur] + b'\x00' + self.data[cur + 1 :]
self.hasremovals = True
- def __setitem__(self, key, value):
+ def __setitem__(self, key: bytes, value: Tuple[bytes, bytes]):
if not isinstance(key, bytes):
raise TypeError(b"setitem: manifest keys must be a byte string.")
if not isinstance(value, tuple) or len(value) != 2:
@@ -348,7 +367,7 @@
self.extrainfo[:needle] + [0] + self.extrainfo[needle:]
)
- def copy(self):
+ def copy(self) -> '_LazyManifest':
# XXX call _compact like in C?
return _lazymanifest(
self._nodelen,
@@ -359,7 +378,7 @@
self.hasremovals,
)
- def _compact(self):
+ def _compact(self) -> None:
# hopefully not called TOO often
if len(self.extradata) == 0 and not self.hasremovals:
return
@@ -418,16 +437,23 @@
self.hasremovals = False
self.extradata = []
- def _pack(self, d):
+ def _pack(self, d: Tuple[bytes, bytes, bytes]) -> bytes:
n = d[1]
assert len(n) in (20, 32)
return d[0] + b'\x00' + hex(n) + d[2] + b'\n'
- def text(self):
+ def text(self) -> ByteString:
self._compact()
return self.data
- def diff(self, m2, clean=False):
+ def diff(
+ self, m2: '_LazyManifest', clean: bool = False
+ ) -> Dict[
+ bytes,
+ Optional[
+ Tuple[Tuple[Optional[bytes], bytes], Tuple[Optional[bytes], bytes]]
+ ],
+ ]:
'''Finds changes between the current manifest and m2.'''
# XXX think whether efficiency matters here
diff = {}
@@ -448,19 +474,19 @@
return diff
- def iterentries(self):
+ def iterentries(self) -> lazymanifestiterentries:
return lazymanifestiterentries(self)
- def iterkeys(self):
+ def iterkeys(self) -> lazymanifestiter:
return lazymanifestiter(self)
- def __iter__(self):
+ def __iter__(self) -> lazymanifestiter:
return lazymanifestiter(self)
- def __len__(self):
+ def __len__(self) -> int:
return len(self.positions)
- def filtercopy(self, filterfn):
+ def filtercopy(self, filterfn: Callable[[bytes], bool]) -> '_LazyManifest':
# XXX should be optimized
c = _lazymanifest(self._nodelen, b'')
for f, n, fl in self.iterentries():
@@ -476,50 +502,50 @@
class ManifestDict:
- def __init__(self, nodelen, data=b''):
+ def __init__(self, nodelen: int, data: ByteString = b''):
self._nodelen = nodelen
self._lm = _lazymanifest(nodelen, data)
- def __getitem__(self, key):
+ def __getitem__(self, key: bytes) -> bytes:
return self._lm[key][0]
- def find(self, key):
+ def find(self, key: bytes) -> Tuple[bytes, bytes]:
return self._lm[key]
- def __len__(self):
+ def __len__(self) -> int:
return len(self._lm)
- def __nonzero__(self):
+ def __nonzero__(self) -> bool:
# nonzero is covered by the __len__ function, but implementing it here
# makes it easier for extensions to override.
return len(self._lm) != 0
__bool__ = __nonzero__
- def set(self, key, node, flags):
+ def set(self, key: bytes, node: bytes, flags: bytes) -> None:
self._lm[key] = node, flags
- def __setitem__(self, key, node):
+ def __setitem__(self, key: bytes, node: bytes) -> None:
self._lm[key] = node, self.flags(key)
- def __contains__(self, key):
+ def __contains__(self, key: bytes) -> bool:
if key is None:
return False
return key in self._lm
- def __delitem__(self, key):
+ def __delitem__(self, key: bytes) -> bool:
del self._lm[key]
- def __iter__(self):
+ def __iter__(self) -> Iterator[bytes]:
return self._lm.__iter__()
- def iterkeys(self):
+ def iterkeys(self) -> Iterator[bytes]:
return self._lm.iterkeys()
- def keys(self):
+ def keys(self) -> List[bytes]:
return list(self.iterkeys())
- def filesnotin(self, m2, match=None):
+ def filesnotin(self, m2, match=None) -> Set[bytes]:
'''Set of files in this manifest that are not in the other'''
if match is not None:
match = matchmod.badmatch(match, lambda path, msg: None)
@@ -528,16 +554,16 @@
return {f for f in self if f not in m2}
@propertycache
- def _dirs(self):
+ def _dirs(self) -> pathutil.dirs:
return pathutil.dirs(self)
- def dirs(self):
+ def dirs(self) -> pathutil.dirs:
return self._dirs
- def hasdir(self, dir):
+ def hasdir(self, dir: bytes) -> bool:
return dir in self._dirs
- def _filesfastpath(self, match):
+ def _filesfastpath(self, match: matchmod.basematcher) -> bool:
"""Checks whether we can correctly and quickly iterate over matcher
files instead of over manifest files."""
files = match.files()
@@ -546,7 +572,7 @@
or (match.prefix() and all(fn in self for fn in files))
)
- def walk(self, match):
+ def walk(self, match: matchmod.basematcher) -> Iterator[bytes]:
"""Generates matching file names.
Equivalent to manifest.matches(match).iterkeys(), but without creating
@@ -583,7 +609,7 @@
if not self.hasdir(fn):
match.bad(fn, None)
- def _matches(self, match):
+ def _matches(self, match: matchmod.basematcher) -> 'ManifestDict':
'''generate a new manifest filtered by the match argument'''
if match.always():
return self.copy()
@@ -600,7 +626,17 @@
m._lm = self._lm.filtercopy(match)
return m
- def diff(self, m2, match=None, clean=False):
+ def diff(
+ self,
+ m2: 'ManifestDict',
+ match: Optional[matchmod.basematcher] = None,
+ clean: bool = False,
+ ) -> Dict[
+ bytes,
+ Optional[
+ Tuple[Tuple[Optional[bytes], bytes], Tuple[Optional[bytes], bytes]]
+ ],
+ ]:
"""Finds changes between the current manifest and m2.
Args:
@@ -621,42 +657,44 @@
return m1.diff(m2, clean=clean)
return self._lm.diff(m2._lm, clean)
- def setflag(self, key, flag):
+ def setflag(self, key: bytes, flag: bytes) -> None:
if flag not in _manifestflags:
raise TypeError(b"Invalid manifest flag set.")
self._lm[key] = self[key], flag
- def get(self, key, default=None):
+ def get(self, key: bytes, default=None) -> Optional[bytes]:
try:
return self._lm[key][0]
except KeyError:
return default
- def flags(self, key):
+ def flags(self, key: bytes) -> bytes:
try:
return self._lm[key][1]
except KeyError:
return b''
- def copy(self):
+ def copy(self) -> 'ManifestDict':
c = manifestdict(self._nodelen)
c._lm = self._lm.copy()
return c
- def items(self):
+ def items(self) -> Iterator[Tuple[bytes, bytes]]:
return (x[:2] for x in self._lm.iterentries())
- def iteritems(self):
+ def iteritems(self) -> Iterator[Tuple[bytes, bytes]]:
return (x[:2] for x in self._lm.iterentries())
- def iterentries(self):
+ def iterentries(self) -> Iterator[Tuple[bytes, bytes, bytes]]:
return self._lm.iterentries()
- def text(self):
+ def text(self) -> ByteString:
# most likely uses native version
return self._lm.text()
- def fastdelta(self, base, changes):
+ def fastdelta(
+ self, base: ByteString, changes: Iterable[Tuple[bytes, bool]]
+ ) -> Tuple[ByteString, ByteString]:
"""Given a base manifest text as a bytearray and a list of changes
relative to that text, compute a delta that can be used by revlog.
"""
@@ -715,17 +753,17 @@
manifestdict = interfaceutil.implementer(repository.imanifestdict)(ManifestDict)
-def _msearch(m, s, lo=0, hi=None):
+def _msearch(
+ m: ByteString, s: bytes, lo: int = 0, hi: Optional[int] = None
+) -> Tuple[int, int]:
"""return a tuple (start, end) that says where to find s within m.
If the string is found m[start:end] are the line containing
that string. If start == end the string was not found and
they indicate the proper sorted insertion point.
-
- m should be a buffer, a memoryview or a byte string.
- s is a byte string"""
-
- def advance(i, c):
+ """
+
+ def advance(i: int, c: bytes):
while i < lenm and m[i : i + 1] != c:
i += 1
return i
@@ -758,7 +796,7 @@
return (lo, lo)
-def _checkforbidden(l):
+def _checkforbidden(l: Iterable[bytes]) -> None:
"""Check filenames for illegal characters."""
for f in l:
if b'\n' in f or b'\r' in f:
@@ -770,7 +808,10 @@
# apply the changes collected during the bisect loop to our addlist
# return a delta suitable for addrevision
-def _addlistdelta(addlist, x):
+def _addlistdelta(
+ addlist: ByteString,
+ x: Iterable[Tuple[int, int, bytes]],
+) -> Tuple[bytes, ByteString]:
# for large addlist arrays, building a new array is cheaper
# than repeatedly modifying the existing one
currentposition = 0
@@ -792,7 +833,7 @@
return deltatext, newaddlist
-def _splittopdir(f):
+def _splittopdir(f: bytes) -> Tuple[bytes, bytes]:
if b'/' in f:
dir, subpath = f.split(b'/', 1)
return dir + b'/', subpath
@@ -804,7 +845,7 @@
class TreeManifest:
- def __init__(self, nodeconstants, dir=b'', text=b''):
+ def __init__(self, nodeconstants, dir: bytes = b'', text: bytes = b''):
self._dir = dir
self.nodeconstants = nodeconstants
self._node = self.nodeconstants.nullid
@@ -812,10 +853,13 @@
self._loadfunc = _noop
self._copyfunc = _noop
self._dirty = False
- self._dirs = {}
- self._lazydirs = {}
+ self._dirs: Dict[bytes, 'TreeManifest'] = {}
+ self._lazydirs: Dict[
+ bytes,
+ Tuple[bytes, Callable[[bytes, bytes], 'TreeManifest'], bool],
+ ] = {}
# Using _lazymanifest here is a little slower than plain old dicts
- self._files = {}
+ self._files: Dict[bytes, bytes] = {}
self._flags = {}
if text:
@@ -827,10 +871,10 @@
self.parse(text, readsubtree)
self._dirty = True # Mark flat manifest dirty after parsing
- def _subpath(self, path):
+ def _subpath(self, path: bytes) -> bytes:
return self._dir + path
- def _loadalllazy(self):
+ def _loadalllazy(self) -> None:
selfdirs = self._dirs
subpath = self._subpath
for d, (node, readsubtree, docopy) in self._lazydirs.items():
@@ -840,7 +884,7 @@
selfdirs[d] = readsubtree(subpath(d), node)
self._lazydirs.clear()
- def _loadlazy(self, d):
+ def _loadlazy(self, d: bytes) -> None:
v = self._lazydirs.get(d)
if v is not None:
node, readsubtree, docopy = v
@@ -850,19 +894,23 @@
self._dirs[d] = readsubtree(self._subpath(d), node)
del self._lazydirs[d]
- def _loadchildrensetlazy(self, visit):
+ def _loadchildrensetlazy(
+ self, visit: Union[Set[bytes], bytes]
+ ) -> Optional[Set[bytes]]:
if not visit:
return None
if visit == b'all' or visit == b'this':
self._loadalllazy()
return None
+ visit = cast(Set[bytes], visit)
+
loadlazy = self._loadlazy
for k in visit:
loadlazy(k + b'/')
return visit
- def _loaddifflazy(self, t1, t2):
+ def _loaddifflazy(self, t1: 'TreeManifest', t2: 'TreeManifest'):
"""load items in t1 and t2 if they're needed for diffing.
The criteria currently is:
@@ -884,7 +932,7 @@
t1._loadlazy(d)
t2._loadlazy(d)
- def __len__(self):
+ def __len__(self) -> int:
self._load()
size = len(self._files)
self._loadalllazy()
@@ -892,13 +940,13 @@
size += m.__len__()
return size
- def __nonzero__(self):
- # Faster than "__len() != 0" since it avoids loading sub-manifests
+ def __nonzero__(self) -> bool:
+ # Faster than "__len__() != 0" since it avoids loading sub-manifests
return not self._isempty()
__bool__ = __nonzero__
- def _isempty(self):
+ def _isempty(self) -> bool:
self._load() # for consistency; already loaded by all callers
# See if we can skip loading everything.
if self._files or (
@@ -909,7 +957,7 @@
return not self._dirs or all(m._isempty() for m in self._dirs.values())
@encoding.strmethod
- def __repr__(self):
+ def __repr__(self) -> bytes:
return (
b'<treemanifest dir=%s, node=%s, loaded=%r, dirty=%r at 0x%x>'
% (
@@ -921,23 +969,25 @@
)
)
- def dir(self):
+ def dir(self) -> bytes:
"""The directory that this tree manifest represents, including a
trailing '/'. Empty string for the repo root directory."""
return self._dir
- def node(self):
+ def node(self) -> bytes:
"""This node of this instance. nullid for unsaved instances. Should
be updated when the instance is read or written from a revlog.
"""
assert not self._dirty
return self._node
- def setnode(self, node):
+ def setnode(self, node: bytes) -> None:
self._node = node
self._dirty = False
- def iterentries(self):
+ def iterentries(
+ self,
+ ) -> Iterator[Tuple[bytes, Union[bytes, 'TreeManifest'], bytes]]:
self._load()
self._loadalllazy()
for p, n in sorted(
@@ -949,7 +999,7 @@
for x in n.iterentries():
yield x
- def items(self):
+ def items(self) -> Iterator[Tuple[bytes, Union[bytes, 'TreeManifest']]]:
self._load()
self._loadalllazy()
for p, n in sorted(
@@ -963,7 +1013,7 @@
iteritems = items
- def iterkeys(self):
+ def iterkeys(self) -> Iterator[bytes]:
self._load()
self._loadalllazy()
for p in sorted(itertools.chain(self._dirs, self._files)):
@@ -973,13 +1023,13 @@
for f in self._dirs[p]:
yield f
- def keys(self):
+ def keys(self) -> List[bytes]:
return list(self.iterkeys())
- def __iter__(self):
+ def __iter__(self) -> Iterator[bytes]:
return self.iterkeys()
- def __contains__(self, f):
+ def __contains__(self, f: bytes) -> bool:
if f is None:
return False
self._load()
@@ -994,7 +1044,7 @@
else:
return f in self._files
- def get(self, f, default=None):
+ def get(self, f: bytes, default: Optional[bytes] = None) -> Optional[bytes]:
self._load()
dir, subpath = _splittopdir(f)
if dir:
@@ -1006,7 +1056,7 @@
else:
return self._files.get(f, default)
- def __getitem__(self, f):
+ def __getitem__(self, f: bytes) -> bytes:
self._load()
dir, subpath = _splittopdir(f)
if dir:
@@ -1016,7 +1066,7 @@
else:
return self._files[f]
- def flags(self, f):
+ def flags(self, f: bytes) -> bytes:
self._load()
dir, subpath = _splittopdir(f)
if dir:
@@ -1030,7 +1080,7 @@
return b''
return self._flags.get(f, b'')
- def find(self, f):
+ def find(self, f: bytes) -> Tuple[bytes, bytes]:
self._load()
dir, subpath = _splittopdir(f)
if dir:
@@ -1040,7 +1090,7 @@
else:
return self._files[f], self._flags.get(f, b'')
- def __delitem__(self, f):
+ def __delitem__(self, f: bytes) -> None:
self._load()
dir, subpath = _splittopdir(f)
if dir:
@@ -1056,7 +1106,7 @@
del self._flags[f]
self._dirty = True
- def set(self, f, node, flags):
+ def set(self, f: bytes, node: bytes, flags: bytes) -> None:
"""Set both the node and the flags for path f."""
assert node is not None
if flags not in _manifestflags:
@@ -1076,7 +1126,7 @@
self._flags[f] = flags
self._dirty = True
- def __setitem__(self, f, n):
+ def __setitem__(self, f: bytes, n: bytes) -> None:
assert n is not None
self._load()
dir, subpath = _splittopdir(f)
@@ -1095,7 +1145,7 @@
self._files[f] = n
self._dirty = True
- def _load(self):
+ def _load(self) -> None:
if self._loadfunc is not _noop:
lf, self._loadfunc = self._loadfunc, _noop
lf(self)
@@ -1103,7 +1153,7 @@
cf, self._copyfunc = self._copyfunc, _noop
cf(self)
- def setflag(self, f, flags):
+ def setflag(self, f: bytes, flags: bytes) -> None:
"""Set the flags (symlink, executable) for path f."""
if flags not in _manifestflags:
raise TypeError(b"Invalid manifest flag set.")
@@ -1120,7 +1170,7 @@
self._flags[f] = flags
self._dirty = True
- def copy(self):
+ def copy(self) -> 'TreeManifest':
copy = treemanifest(self.nodeconstants, self._dir)
copy._node = self._node
copy._dirty = self._dirty
@@ -1145,7 +1195,9 @@
copy._copyfunc = self._copyfunc
return copy
- def filesnotin(self, m2, match=None):
+ def filesnotin(
+ self, m2: 'TreeManifest', match: Optional[matchmod.basematcher] = None
+ ) -> Set[bytes]:
'''Set of files in this manifest that are not in the other'''
if match and not match.always():
m1 = self._matches(match)
@@ -1175,13 +1227,13 @@
return files
@propertycache
- def _alldirs(self):
+ def _alldirs(self) -> pathutil.dirs:
return pathutil.dirs(self)
- def dirs(self):
+ def dirs(self) -> pathutil.dirs:
return self._alldirs
- def hasdir(self, dir):
+ def hasdir(self, dir: bytes) -> bool:
self._load()
topdir, subdir = _splittopdir(dir)
if topdir:
@@ -1192,7 +1244,7 @@
dirslash = dir + b'/'
return dirslash in self._dirs or dirslash in self._lazydirs
- def walk(self, match):
+ def walk(self, match: matchmod.basematcher) -> Iterator[bytes]:
"""Generates matching file names.
It also reports nonexistent files by marking them bad with match.bad().
@@ -1218,7 +1270,7 @@
if not self.hasdir(fn):
match.bad(fn, None)
- def _walk(self, match):
+ def _walk(self, match: matchmod.basematcher) -> Iterator[bytes]:
'''Recursively generates matching file names for walk().'''
visit = match.visitchildrenset(self._dir[:-1])
if not visit:
@@ -1237,13 +1289,13 @@
for f in self._dirs[p]._walk(match):
yield f
- def _matches(self, match):
+ def _matches(self, match: matchmod.basematcher) -> 'TreeManifest':
"""recursively generate a new manifest filtered by the match argument."""
if match.always():
return self.copy()
return self._matches_inner(match)
- def _matches_inner(self, match):
+ def _matches_inner(self, match: matchmod.basematcher) -> 'TreeManifest':
if match.always():
return self.copy()
@@ -1284,10 +1336,22 @@
ret._dirty = True
return ret
- def fastdelta(self, base, changes):
+ def fastdelta(
+ self, base: ByteString, changes: Iterable[Tuple[bytes, bool]]
+ ) -> ByteString:
raise FastdeltaUnavailable()
- def diff(self, m2, match=None, clean=False):
+ def diff(
+ self,
+ m2: 'TreeManifest',
+ match: Optional[matchmod.basematcher] = None,
+ clean: bool = False,
+ ) -> Dict[
+ bytes,
+ Optional[
+ Tuple[Tuple[Optional[bytes], bytes], Tuple[Optional[bytes], bytes]]
+ ],
+ ]:
"""Finds changes between the current manifest and m2.
Args:
@@ -1348,10 +1412,14 @@
_iterativediff(t1, t2, stackls)
return result
- def unmodifiedsince(self, m2):
+ def unmodifiedsince(self, m2: 'TreeManifest') -> bool:
return not self._dirty and not m2._dirty and self._node == m2._node
- def parse(self, text, readsubtree):
+ def parse(
+ self,
+ text: bytes,
+ readsubtree: Callable[[bytes, bytes], 'TreeManifest'],
+ ) -> None:
selflazy = self._lazydirs
for f, n, fl in _parse(self._nodelen, text):
if fl == b't':
@@ -1374,12 +1442,12 @@
if fl:
self._flags[f] = fl
- def text(self):
+ def text(self) -> ByteString:
"""Get the full data of this manifest as a bytestring."""
self._load()
return _text(self.iterentries())
- def dirtext(self):
+ def dirtext(self) -> ByteString:
"""Get the full data of this directory as a bytestring. Make sure that
any submanifests have been written first, so their nodeids are correct.
"""
@@ -1390,14 +1458,32 @@
files = [(f, self._files[f], flags(f)) for f in self._files]
return _text(sorted(dirs + files + lazydirs))
- def read(self, gettext, readsubtree):
+ def read(
+ self,
+ gettext: Callable[[], ByteString],
+ readsubtree: Callable[[bytes, bytes], 'TreeManifest'],
+ ) -> None:
def _load_for_read(s):
s.parse(gettext(), readsubtree)
s._dirty = False
self._loadfunc = _load_for_read
- def writesubtrees(self, m1, m2, writesubtree, match):
+ def writesubtrees(
+ self,
+ m1: 'TreeManifest',
+ m2: 'TreeManifest',
+ writesubtree: Callable[
+ [
+ Callable[['TreeManifest'], None],
+ bytes,
+ bytes,
+ matchmod.basematcher,
+ ],
+ None,
+ ],
+ match: matchmod.basematcher,
+ ) -> None:
self._load() # for consistency; should never have any effect here
m1._load()
m2._load()
@@ -1425,7 +1511,9 @@
subp1, subp2 = subp2, subp1
writesubtree(subm, subp1, subp2, match)
- def walksubtrees(self, matcher=None):
+ def walksubtrees(
+ self, matcher: Optional[matchmod.basematcher] = None
+ ) -> Iterator['TreeManifest']:
"""Returns an iterator of the subtrees of this manifest, including this
manifest itself.
@@ -1716,8 +1804,8 @@
link,
p1,
p2,
- added: Iterable[Iterable],
- removed: Iterable[Iterable],
+ added: Iterable[bytes],
+ removed: Iterable[bytes],
readtree=None,
match=None,
):
@@ -1959,6 +2047,10 @@
)
+AnyManifestCtx = Union['ManifestCtx', 'TreeManifestCtx']
+AnyManifestDict = Union[ManifestDict, TreeManifest]
+
+
@interfaceutil.implementer(repository.imanifestlog)
class manifestlog:
"""A collection class representing the collection of manifest snapshots
@@ -1997,7 +2089,9 @@
"""
return self.get(b'', node)
- def get(self, tree, node, verify=True):
+ def get(
+ self, tree: bytes, node: bytes, verify: bool = True
+ ) -> AnyManifestCtx:
"""Retrieves the manifest instance for the given node. Throws a
LookupError if not found.
@@ -2047,14 +2141,14 @@
def getstorage(self, tree):
return self._rootstore.dirlog(tree)
- def clearcaches(self, clear_persisted_data=False):
+ def clearcaches(self, clear_persisted_data: bool = False) -> None:
self._dirmancache.clear()
self._rootstore.clearcaches(clear_persisted_data=clear_persisted_data)
- def rev(self, node):
+ def rev(self, node) -> int:
return self._rootstore.rev(node)
- def update_caches(self, transaction):
+ def update_caches(self, transaction) -> None:
return self._rootstore._revlog.update_caches(transaction=transaction)
@@ -2063,15 +2157,15 @@
self._manifestlog = manifestlog
self._manifestdict = manifestdict(manifestlog.nodeconstants.nodelen)
- def _storage(self):
+ def _storage(self) -> ManifestRevlog:
return self._manifestlog.getstorage(b'')
- def copy(self):
+ def copy(self) -> 'MemManifestCtx':
memmf = memmanifestctx(self._manifestlog)
memmf._manifestdict = self.read().copy()
return memmf
- def read(self):
+ def read(self) -> 'ManifestDict':
return self._manifestdict
def write(self, transaction, link, p1, p2, added, removed, match=None):
@@ -2110,22 +2204,22 @@
# rev = store.rev(node)
# self.linkrev = store.linkrev(rev)
- def _storage(self):
+ def _storage(self) -> 'ManifestRevlog':
return self._manifestlog.getstorage(b'')
- def node(self):
+ def node(self) -> bytes:
return self._node
- def copy(self):
+ def copy(self) -> MemManifestCtx:
memmf = memmanifestctx(self._manifestlog)
memmf._manifestdict = self.read().copy()
return memmf
@propertycache
- def parents(self):
+ def parents(self) -> Tuple[bytes, bytes]:
return self._storage().parents(self._node)
- def read(self):
+ def read(self) -> 'ManifestDict':
if self._data is None:
nc = self._manifestlog.nodeconstants
if self._node == nc.nullid:
@@ -2141,7 +2235,7 @@
self._data = manifestdict(nc.nodelen, text)
return self._data
- def readfast(self, shallow=False):
+ def readfast(self, shallow: bool = False) -> 'ManifestDict':
"""Calls either readdelta or read, based on which would be less work.
readdelta is called if the delta is against the p1, and therefore can be
read quickly.
@@ -2155,7 +2249,7 @@
return self.readdelta()
return self.read()
- def readdelta(self, shallow=False):
+ def readdelta(self, shallow: bool = False) -> 'ManifestDict':
"""Returns a manifest containing just the entries that are present
in this manifest, but not in its p1 manifest. This is efficient to read
if the revlog delta is already p1.
@@ -2167,7 +2261,7 @@
d = mdiff.patchtext(store.revdiff(store.deltaparent(r), r))
return manifestdict(store.nodeconstants.nodelen, d)
- def find(self, key):
+ def find(self, key: bytes) -> Tuple[bytes, bytes]:
return self.read().find(key)
@@ -2182,15 +2276,15 @@
self._dir = dir
self._treemanifest = treemanifest(manifestlog.nodeconstants)
- def _storage(self):
+ def _storage(self) -> ManifestRevlog:
return self._manifestlog.getstorage(b'')
- def copy(self):
+ def copy(self) -> 'MemTreeManifestCtx':
memmf = memtreemanifestctx(self._manifestlog, dir=self._dir)
memmf._treemanifest = self._treemanifest.copy()
return memmf
- def read(self):
+ def read(self) -> 'TreeManifest':
return self._treemanifest
def write(self, transaction, link, p1, p2, added, removed, match=None):
@@ -2230,7 +2324,7 @@
# rev = store.rev(node)
# self.linkrev = store.linkrev(rev)
- def _storage(self):
+ def _storage(self) -> ManifestRevlog:
narrowmatch = self._manifestlog._narrowmatch
if not narrowmatch.always():
if not narrowmatch.visitdir(self._dir[:-1]):
@@ -2239,7 +2333,7 @@
)
return self._manifestlog.getstorage(self._dir)
- def read(self):
+ def read(self) -> 'TreeManifest':
if self._data is None:
store = self._storage()
if self._node == self._manifestlog.nodeconstants.nullid:
@@ -2272,19 +2366,19 @@
return self._data
- def node(self):
+ def node(self) -> bytes:
return self._node
- def copy(self):
+ def copy(self) -> 'MemTreeManifestCtx':
memmf = memtreemanifestctx(self._manifestlog, dir=self._dir)
memmf._treemanifest = self.read().copy()
return memmf
@propertycache
- def parents(self):
+ def parents(self) -> Tuple[bytes, bytes]:
return self._storage().parents(self._node)
- def readdelta(self, shallow=False):
+ def readdelta(self, shallow: bool = False) -> AnyManifestDict:
"""Returns a manifest containing just the entries that are present
in this manifest, but not in its p1 manifest. This is efficient to read
if the revlog delta is already p1.
@@ -2313,7 +2407,7 @@
md.setflag(f, fl1)
return md
- def readfast(self, shallow=False):
+ def readfast(self, shallow=False) -> AnyManifestDict:
"""Calls either readdelta or read, based on which would be less work.
readdelta is called if the delta is against the p1, and therefore can be
read quickly.
@@ -2334,7 +2428,7 @@
else:
return self.read()
- def find(self, key):
+ def find(self, key: bytes) -> Tuple[bytes, bytes]:
return self.read().find(key)