Mercurial > hg-stable
changeset 51826:79e0ee356f32
manifest: add many type annotations to the manifest module
This help to clarify the API a bit, this caught various bug in the process and
will help to catch more in the future. This also make large refactoring
significantly simpler.
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Mon, 05 Aug 2024 10:03:06 +0200 |
parents | e2f1efa2bd86 |
children | ca4208713875 |
files | mercurial/manifest.py |
diffstat | 1 files changed, 239 insertions(+), 145 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/manifest.py Mon Aug 05 10:15:10 2024 +0200 +++ b/mercurial/manifest.py Mon Aug 05 10:03:06 2024 +0200 @@ -12,7 +12,17 @@ import weakref from typing import ( + ByteString, + Callable, + Dict, Iterable, + Iterator, + List, + Optional, + Set, + Tuple, + Union, + cast, ) from .i18n import _ @@ -47,7 +57,7 @@ FASTDELTA_TEXTDIFF_THRESHOLD = 1000 -def _parse(nodelen, data): +def _parse(nodelen, data: bytes): # This method does a little bit of excessive-looking # precondition checking. This is so that the behavior of this # class exactly matches its C counterpart to try and help @@ -88,21 +98,23 @@ class lazymanifestiter: - def __init__(self, lm): + def __init__(self, lm: '_LazyManifest') -> None: self.pos = 0 self.lm = lm - def __iter__(self): + def __iter__(self) -> 'lazymanifestiter': return self - def next(self): + def next(self) -> bytes: try: data, pos = self.lm._get(self.pos) except IndexError: raise StopIteration if pos == -1: + assert isinstance(data, tuple) self.pos += 1 return data[0] + assert isinstance(data, bytes) self.pos += 1 zeropos = data.find(b'\x00', pos) return data[pos:zeropos] @@ -111,21 +123,23 @@ class lazymanifestiterentries: - def __init__(self, lm): + def __init__(self, lm: '_LazyManifest') -> None: self.lm = lm self.pos = 0 - def __iter__(self): + def __iter__(self) -> 'lazymanifestiterentries': return self - def next(self): + def next(self) -> Tuple[bytes, bytes, bytes]: try: data, pos = self.lm._get(self.pos) except IndexError: raise StopIteration if pos == -1: + assert isinstance(data, tuple) self.pos += 1 return data + assert isinstance(data, bytes) zeropos = data.find(b'\x00', pos) nlpos = data.find(b'\n', pos) if zeropos == -1 or nlpos == -1 or nlpos < zeropos: @@ -181,12 +195,12 @@ def __init__( self, - nodelen, - data, + nodelen: int, + data: bytes, positions=None, extrainfo=None, extradata=None, - hasremovals=False, + hasremovals: bool = False, ): self._nodelen = nodelen if positions is None: @@ -202,7 +216,7 @@ self.data = data self.hasremovals = hasremovals - def findlines(self, data): + def findlines(self, data: bytes) -> List[int]: if not data: return [] pos = data.find(b"\n") @@ -219,7 +233,9 @@ pos = data.find(b"\n", pos + 1) return positions - def _get(self, index): + def _get( + self, index: int + ) -> Tuple[Union[bytes, Tuple[bytes, bytes, bytes]], int]: # get the position encoded in pos: # positive number is an index in 'data' # negative number is in extrapieces @@ -228,12 +244,12 @@ return self.data, pos return self.extradata[-pos - 1], -1 - def _getkey(self, pos): + def _getkey(self, pos) -> bytes: if pos >= 0: return self.data[pos : self.data.find(b'\x00', pos + 1)] return self.extradata[-pos - 1][0] - def bsearch(self, key): + def bsearch(self, key: bytes) -> int: first = 0 last = len(self.positions) - 1 @@ -251,7 +267,7 @@ first = midpoint + 1 return -1 - def bsearch2(self, key): + def bsearch2(self, key: bytes) -> Tuple[int, bool]: # same as the above, but will always return the position # done for performance reasons first = 0 @@ -271,10 +287,10 @@ first = midpoint + 1 return (first, False) - def __contains__(self, key): + def __contains__(self, key: bytes) -> bool: return self.bsearch(key) != -1 - def __getitem__(self, key): + def __getitem__(self, key: bytes) -> Tuple[bytes, bytes]: if not isinstance(key, bytes): raise TypeError(b"getitem: manifest keys must be a bytes.") needle = self.bsearch(key) @@ -282,7 +298,10 @@ raise KeyError data, pos = self._get(needle) if pos == -1: + assert isinstance(data, tuple) return (data[1], data[2]) + + assert isinstance(data, bytes) zeropos = data.find(b'\x00', pos) nlpos = data.find(b'\n', zeropos) assert 0 <= needle <= len(self.positions) @@ -300,7 +319,7 @@ hashval = unhexlify(data, self.extrainfo[needle], zeropos + 1, hlen) return (hashval, flags) - def __delitem__(self, key): + def __delitem__(self, key: bytes) -> None: needle, found = self.bsearch2(key) if not found: raise KeyError @@ -313,7 +332,7 @@ self.data = self.data[:cur] + b'\x00' + self.data[cur + 1 :] self.hasremovals = True - def __setitem__(self, key, value): + def __setitem__(self, key: bytes, value: Tuple[bytes, bytes]): if not isinstance(key, bytes): raise TypeError(b"setitem: manifest keys must be a byte string.") if not isinstance(value, tuple) or len(value) != 2: @@ -348,7 +367,7 @@ self.extrainfo[:needle] + [0] + self.extrainfo[needle:] ) - def copy(self): + def copy(self) -> '_LazyManifest': # XXX call _compact like in C? return _lazymanifest( self._nodelen, @@ -359,7 +378,7 @@ self.hasremovals, ) - def _compact(self): + def _compact(self) -> None: # hopefully not called TOO often if len(self.extradata) == 0 and not self.hasremovals: return @@ -418,16 +437,23 @@ self.hasremovals = False self.extradata = [] - def _pack(self, d): + def _pack(self, d: Tuple[bytes, bytes, bytes]) -> bytes: n = d[1] assert len(n) in (20, 32) return d[0] + b'\x00' + hex(n) + d[2] + b'\n' - def text(self): + def text(self) -> ByteString: self._compact() return self.data - def diff(self, m2, clean=False): + def diff( + self, m2: '_LazyManifest', clean: bool = False + ) -> Dict[ + bytes, + Optional[ + Tuple[Tuple[Optional[bytes], bytes], Tuple[Optional[bytes], bytes]] + ], + ]: '''Finds changes between the current manifest and m2.''' # XXX think whether efficiency matters here diff = {} @@ -448,19 +474,19 @@ return diff - def iterentries(self): + def iterentries(self) -> lazymanifestiterentries: return lazymanifestiterentries(self) - def iterkeys(self): + def iterkeys(self) -> lazymanifestiter: return lazymanifestiter(self) - def __iter__(self): + def __iter__(self) -> lazymanifestiter: return lazymanifestiter(self) - def __len__(self): + def __len__(self) -> int: return len(self.positions) - def filtercopy(self, filterfn): + def filtercopy(self, filterfn: Callable[[bytes], bool]) -> '_LazyManifest': # XXX should be optimized c = _lazymanifest(self._nodelen, b'') for f, n, fl in self.iterentries(): @@ -476,50 +502,50 @@ class ManifestDict: - def __init__(self, nodelen, data=b''): + def __init__(self, nodelen: int, data: ByteString = b''): self._nodelen = nodelen self._lm = _lazymanifest(nodelen, data) - def __getitem__(self, key): + def __getitem__(self, key: bytes) -> bytes: return self._lm[key][0] - def find(self, key): + def find(self, key: bytes) -> Tuple[bytes, bytes]: return self._lm[key] - def __len__(self): + def __len__(self) -> int: return len(self._lm) - def __nonzero__(self): + def __nonzero__(self) -> bool: # nonzero is covered by the __len__ function, but implementing it here # makes it easier for extensions to override. return len(self._lm) != 0 __bool__ = __nonzero__ - def set(self, key, node, flags): + def set(self, key: bytes, node: bytes, flags: bytes) -> None: self._lm[key] = node, flags - def __setitem__(self, key, node): + def __setitem__(self, key: bytes, node: bytes) -> None: self._lm[key] = node, self.flags(key) - def __contains__(self, key): + def __contains__(self, key: bytes) -> bool: if key is None: return False return key in self._lm - def __delitem__(self, key): + def __delitem__(self, key: bytes) -> bool: del self._lm[key] - def __iter__(self): + def __iter__(self) -> Iterator[bytes]: return self._lm.__iter__() - def iterkeys(self): + def iterkeys(self) -> Iterator[bytes]: return self._lm.iterkeys() - def keys(self): + def keys(self) -> List[bytes]: return list(self.iterkeys()) - def filesnotin(self, m2, match=None): + def filesnotin(self, m2, match=None) -> Set[bytes]: '''Set of files in this manifest that are not in the other''' if match is not None: match = matchmod.badmatch(match, lambda path, msg: None) @@ -528,16 +554,16 @@ return {f for f in self if f not in m2} @propertycache - def _dirs(self): + def _dirs(self) -> pathutil.dirs: return pathutil.dirs(self) - def dirs(self): + def dirs(self) -> pathutil.dirs: return self._dirs - def hasdir(self, dir): + def hasdir(self, dir: bytes) -> bool: return dir in self._dirs - def _filesfastpath(self, match): + def _filesfastpath(self, match: matchmod.basematcher) -> bool: """Checks whether we can correctly and quickly iterate over matcher files instead of over manifest files.""" files = match.files() @@ -546,7 +572,7 @@ or (match.prefix() and all(fn in self for fn in files)) ) - def walk(self, match): + def walk(self, match: matchmod.basematcher) -> Iterator[bytes]: """Generates matching file names. Equivalent to manifest.matches(match).iterkeys(), but without creating @@ -583,7 +609,7 @@ if not self.hasdir(fn): match.bad(fn, None) - def _matches(self, match): + def _matches(self, match: matchmod.basematcher) -> 'ManifestDict': '''generate a new manifest filtered by the match argument''' if match.always(): return self.copy() @@ -600,7 +626,17 @@ m._lm = self._lm.filtercopy(match) return m - def diff(self, m2, match=None, clean=False): + def diff( + self, + m2: 'ManifestDict', + match: Optional[matchmod.basematcher] = None, + clean: bool = False, + ) -> Dict[ + bytes, + Optional[ + Tuple[Tuple[Optional[bytes], bytes], Tuple[Optional[bytes], bytes]] + ], + ]: """Finds changes between the current manifest and m2. Args: @@ -621,42 +657,44 @@ return m1.diff(m2, clean=clean) return self._lm.diff(m2._lm, clean) - def setflag(self, key, flag): + def setflag(self, key: bytes, flag: bytes) -> None: if flag not in _manifestflags: raise TypeError(b"Invalid manifest flag set.") self._lm[key] = self[key], flag - def get(self, key, default=None): + def get(self, key: bytes, default=None) -> Optional[bytes]: try: return self._lm[key][0] except KeyError: return default - def flags(self, key): + def flags(self, key: bytes) -> bytes: try: return self._lm[key][1] except KeyError: return b'' - def copy(self): + def copy(self) -> 'ManifestDict': c = manifestdict(self._nodelen) c._lm = self._lm.copy() return c - def items(self): + def items(self) -> Iterator[Tuple[bytes, bytes]]: return (x[:2] for x in self._lm.iterentries()) - def iteritems(self): + def iteritems(self) -> Iterator[Tuple[bytes, bytes]]: return (x[:2] for x in self._lm.iterentries()) - def iterentries(self): + def iterentries(self) -> Iterator[Tuple[bytes, bytes, bytes]]: return self._lm.iterentries() - def text(self): + def text(self) -> ByteString: # most likely uses native version return self._lm.text() - def fastdelta(self, base, changes): + def fastdelta( + self, base: ByteString, changes: Iterable[Tuple[bytes, bool]] + ) -> Tuple[ByteString, ByteString]: """Given a base manifest text as a bytearray and a list of changes relative to that text, compute a delta that can be used by revlog. """ @@ -715,17 +753,17 @@ manifestdict = interfaceutil.implementer(repository.imanifestdict)(ManifestDict) -def _msearch(m, s, lo=0, hi=None): +def _msearch( + m: ByteString, s: bytes, lo: int = 0, hi: Optional[int] = None +) -> Tuple[int, int]: """return a tuple (start, end) that says where to find s within m. If the string is found m[start:end] are the line containing that string. If start == end the string was not found and they indicate the proper sorted insertion point. - - m should be a buffer, a memoryview or a byte string. - s is a byte string""" - - def advance(i, c): + """ + + def advance(i: int, c: bytes): while i < lenm and m[i : i + 1] != c: i += 1 return i @@ -758,7 +796,7 @@ return (lo, lo) -def _checkforbidden(l): +def _checkforbidden(l: Iterable[bytes]) -> None: """Check filenames for illegal characters.""" for f in l: if b'\n' in f or b'\r' in f: @@ -770,7 +808,10 @@ # apply the changes collected during the bisect loop to our addlist # return a delta suitable for addrevision -def _addlistdelta(addlist, x): +def _addlistdelta( + addlist: ByteString, + x: Iterable[Tuple[int, int, bytes]], +) -> Tuple[bytes, ByteString]: # for large addlist arrays, building a new array is cheaper # than repeatedly modifying the existing one currentposition = 0 @@ -792,7 +833,7 @@ return deltatext, newaddlist -def _splittopdir(f): +def _splittopdir(f: bytes) -> Tuple[bytes, bytes]: if b'/' in f: dir, subpath = f.split(b'/', 1) return dir + b'/', subpath @@ -804,7 +845,7 @@ class TreeManifest: - def __init__(self, nodeconstants, dir=b'', text=b''): + def __init__(self, nodeconstants, dir: bytes = b'', text: bytes = b''): self._dir = dir self.nodeconstants = nodeconstants self._node = self.nodeconstants.nullid @@ -812,10 +853,13 @@ self._loadfunc = _noop self._copyfunc = _noop self._dirty = False - self._dirs = {} - self._lazydirs = {} + self._dirs: Dict[bytes, 'TreeManifest'] = {} + self._lazydirs: Dict[ + bytes, + Tuple[bytes, Callable[[bytes, bytes], 'TreeManifest'], bool], + ] = {} # Using _lazymanifest here is a little slower than plain old dicts - self._files = {} + self._files: Dict[bytes, bytes] = {} self._flags = {} if text: @@ -827,10 +871,10 @@ self.parse(text, readsubtree) self._dirty = True # Mark flat manifest dirty after parsing - def _subpath(self, path): + def _subpath(self, path: bytes) -> bytes: return self._dir + path - def _loadalllazy(self): + def _loadalllazy(self) -> None: selfdirs = self._dirs subpath = self._subpath for d, (node, readsubtree, docopy) in self._lazydirs.items(): @@ -840,7 +884,7 @@ selfdirs[d] = readsubtree(subpath(d), node) self._lazydirs.clear() - def _loadlazy(self, d): + def _loadlazy(self, d: bytes) -> None: v = self._lazydirs.get(d) if v is not None: node, readsubtree, docopy = v @@ -850,19 +894,23 @@ self._dirs[d] = readsubtree(self._subpath(d), node) del self._lazydirs[d] - def _loadchildrensetlazy(self, visit): + def _loadchildrensetlazy( + self, visit: Union[Set[bytes], bytes] + ) -> Optional[Set[bytes]]: if not visit: return None if visit == b'all' or visit == b'this': self._loadalllazy() return None + visit = cast(Set[bytes], visit) + loadlazy = self._loadlazy for k in visit: loadlazy(k + b'/') return visit - def _loaddifflazy(self, t1, t2): + def _loaddifflazy(self, t1: 'TreeManifest', t2: 'TreeManifest'): """load items in t1 and t2 if they're needed for diffing. The criteria currently is: @@ -884,7 +932,7 @@ t1._loadlazy(d) t2._loadlazy(d) - def __len__(self): + def __len__(self) -> int: self._load() size = len(self._files) self._loadalllazy() @@ -892,13 +940,13 @@ size += m.__len__() return size - def __nonzero__(self): - # Faster than "__len() != 0" since it avoids loading sub-manifests + def __nonzero__(self) -> bool: + # Faster than "__len__() != 0" since it avoids loading sub-manifests return not self._isempty() __bool__ = __nonzero__ - def _isempty(self): + def _isempty(self) -> bool: self._load() # for consistency; already loaded by all callers # See if we can skip loading everything. if self._files or ( @@ -909,7 +957,7 @@ return not self._dirs or all(m._isempty() for m in self._dirs.values()) @encoding.strmethod - def __repr__(self): + def __repr__(self) -> bytes: return ( b'<treemanifest dir=%s, node=%s, loaded=%r, dirty=%r at 0x%x>' % ( @@ -921,23 +969,25 @@ ) ) - def dir(self): + def dir(self) -> bytes: """The directory that this tree manifest represents, including a trailing '/'. Empty string for the repo root directory.""" return self._dir - def node(self): + def node(self) -> bytes: """This node of this instance. nullid for unsaved instances. Should be updated when the instance is read or written from a revlog. """ assert not self._dirty return self._node - def setnode(self, node): + def setnode(self, node: bytes) -> None: self._node = node self._dirty = False - def iterentries(self): + def iterentries( + self, + ) -> Iterator[Tuple[bytes, Union[bytes, 'TreeManifest'], bytes]]: self._load() self._loadalllazy() for p, n in sorted( @@ -949,7 +999,7 @@ for x in n.iterentries(): yield x - def items(self): + def items(self) -> Iterator[Tuple[bytes, Union[bytes, 'TreeManifest']]]: self._load() self._loadalllazy() for p, n in sorted( @@ -963,7 +1013,7 @@ iteritems = items - def iterkeys(self): + def iterkeys(self) -> Iterator[bytes]: self._load() self._loadalllazy() for p in sorted(itertools.chain(self._dirs, self._files)): @@ -973,13 +1023,13 @@ for f in self._dirs[p]: yield f - def keys(self): + def keys(self) -> List[bytes]: return list(self.iterkeys()) - def __iter__(self): + def __iter__(self) -> Iterator[bytes]: return self.iterkeys() - def __contains__(self, f): + def __contains__(self, f: bytes) -> bool: if f is None: return False self._load() @@ -994,7 +1044,7 @@ else: return f in self._files - def get(self, f, default=None): + def get(self, f: bytes, default: Optional[bytes] = None) -> Optional[bytes]: self._load() dir, subpath = _splittopdir(f) if dir: @@ -1006,7 +1056,7 @@ else: return self._files.get(f, default) - def __getitem__(self, f): + def __getitem__(self, f: bytes) -> bytes: self._load() dir, subpath = _splittopdir(f) if dir: @@ -1016,7 +1066,7 @@ else: return self._files[f] - def flags(self, f): + def flags(self, f: bytes) -> bytes: self._load() dir, subpath = _splittopdir(f) if dir: @@ -1030,7 +1080,7 @@ return b'' return self._flags.get(f, b'') - def find(self, f): + def find(self, f: bytes) -> Tuple[bytes, bytes]: self._load() dir, subpath = _splittopdir(f) if dir: @@ -1040,7 +1090,7 @@ else: return self._files[f], self._flags.get(f, b'') - def __delitem__(self, f): + def __delitem__(self, f: bytes) -> None: self._load() dir, subpath = _splittopdir(f) if dir: @@ -1056,7 +1106,7 @@ del self._flags[f] self._dirty = True - def set(self, f, node, flags): + def set(self, f: bytes, node: bytes, flags: bytes) -> None: """Set both the node and the flags for path f.""" assert node is not None if flags not in _manifestflags: @@ -1076,7 +1126,7 @@ self._flags[f] = flags self._dirty = True - def __setitem__(self, f, n): + def __setitem__(self, f: bytes, n: bytes) -> None: assert n is not None self._load() dir, subpath = _splittopdir(f) @@ -1095,7 +1145,7 @@ self._files[f] = n self._dirty = True - def _load(self): + def _load(self) -> None: if self._loadfunc is not _noop: lf, self._loadfunc = self._loadfunc, _noop lf(self) @@ -1103,7 +1153,7 @@ cf, self._copyfunc = self._copyfunc, _noop cf(self) - def setflag(self, f, flags): + def setflag(self, f: bytes, flags: bytes) -> None: """Set the flags (symlink, executable) for path f.""" if flags not in _manifestflags: raise TypeError(b"Invalid manifest flag set.") @@ -1120,7 +1170,7 @@ self._flags[f] = flags self._dirty = True - def copy(self): + def copy(self) -> 'TreeManifest': copy = treemanifest(self.nodeconstants, self._dir) copy._node = self._node copy._dirty = self._dirty @@ -1145,7 +1195,9 @@ copy._copyfunc = self._copyfunc return copy - def filesnotin(self, m2, match=None): + def filesnotin( + self, m2: 'TreeManifest', match: Optional[matchmod.basematcher] = None + ) -> Set[bytes]: '''Set of files in this manifest that are not in the other''' if match and not match.always(): m1 = self._matches(match) @@ -1175,13 +1227,13 @@ return files @propertycache - def _alldirs(self): + def _alldirs(self) -> pathutil.dirs: return pathutil.dirs(self) - def dirs(self): + def dirs(self) -> pathutil.dirs: return self._alldirs - def hasdir(self, dir): + def hasdir(self, dir: bytes) -> bool: self._load() topdir, subdir = _splittopdir(dir) if topdir: @@ -1192,7 +1244,7 @@ dirslash = dir + b'/' return dirslash in self._dirs or dirslash in self._lazydirs - def walk(self, match): + def walk(self, match: matchmod.basematcher) -> Iterator[bytes]: """Generates matching file names. It also reports nonexistent files by marking them bad with match.bad(). @@ -1218,7 +1270,7 @@ if not self.hasdir(fn): match.bad(fn, None) - def _walk(self, match): + def _walk(self, match: matchmod.basematcher) -> Iterator[bytes]: '''Recursively generates matching file names for walk().''' visit = match.visitchildrenset(self._dir[:-1]) if not visit: @@ -1237,13 +1289,13 @@ for f in self._dirs[p]._walk(match): yield f - def _matches(self, match): + def _matches(self, match: matchmod.basematcher) -> 'TreeManifest': """recursively generate a new manifest filtered by the match argument.""" if match.always(): return self.copy() return self._matches_inner(match) - def _matches_inner(self, match): + def _matches_inner(self, match: matchmod.basematcher) -> 'TreeManifest': if match.always(): return self.copy() @@ -1284,10 +1336,22 @@ ret._dirty = True return ret - def fastdelta(self, base, changes): + def fastdelta( + self, base: ByteString, changes: Iterable[Tuple[bytes, bool]] + ) -> ByteString: raise FastdeltaUnavailable() - def diff(self, m2, match=None, clean=False): + def diff( + self, + m2: 'TreeManifest', + match: Optional[matchmod.basematcher] = None, + clean: bool = False, + ) -> Dict[ + bytes, + Optional[ + Tuple[Tuple[Optional[bytes], bytes], Tuple[Optional[bytes], bytes]] + ], + ]: """Finds changes between the current manifest and m2. Args: @@ -1348,10 +1412,14 @@ _iterativediff(t1, t2, stackls) return result - def unmodifiedsince(self, m2): + def unmodifiedsince(self, m2: 'TreeManifest') -> bool: return not self._dirty and not m2._dirty and self._node == m2._node - def parse(self, text, readsubtree): + def parse( + self, + text: bytes, + readsubtree: Callable[[bytes, bytes], 'TreeManifest'], + ) -> None: selflazy = self._lazydirs for f, n, fl in _parse(self._nodelen, text): if fl == b't': @@ -1374,12 +1442,12 @@ if fl: self._flags[f] = fl - def text(self): + def text(self) -> ByteString: """Get the full data of this manifest as a bytestring.""" self._load() return _text(self.iterentries()) - def dirtext(self): + def dirtext(self) -> ByteString: """Get the full data of this directory as a bytestring. Make sure that any submanifests have been written first, so their nodeids are correct. """ @@ -1390,14 +1458,32 @@ files = [(f, self._files[f], flags(f)) for f in self._files] return _text(sorted(dirs + files + lazydirs)) - def read(self, gettext, readsubtree): + def read( + self, + gettext: Callable[[], ByteString], + readsubtree: Callable[[bytes, bytes], 'TreeManifest'], + ) -> None: def _load_for_read(s): s.parse(gettext(), readsubtree) s._dirty = False self._loadfunc = _load_for_read - def writesubtrees(self, m1, m2, writesubtree, match): + def writesubtrees( + self, + m1: 'TreeManifest', + m2: 'TreeManifest', + writesubtree: Callable[ + [ + Callable[['TreeManifest'], None], + bytes, + bytes, + matchmod.basematcher, + ], + None, + ], + match: matchmod.basematcher, + ) -> None: self._load() # for consistency; should never have any effect here m1._load() m2._load() @@ -1425,7 +1511,9 @@ subp1, subp2 = subp2, subp1 writesubtree(subm, subp1, subp2, match) - def walksubtrees(self, matcher=None): + def walksubtrees( + self, matcher: Optional[matchmod.basematcher] = None + ) -> Iterator['TreeManifest']: """Returns an iterator of the subtrees of this manifest, including this manifest itself. @@ -1716,8 +1804,8 @@ link, p1, p2, - added: Iterable[Iterable], - removed: Iterable[Iterable], + added: Iterable[bytes], + removed: Iterable[bytes], readtree=None, match=None, ): @@ -1959,6 +2047,10 @@ ) +AnyManifestCtx = Union['ManifestCtx', 'TreeManifestCtx'] +AnyManifestDict = Union[ManifestDict, TreeManifest] + + @interfaceutil.implementer(repository.imanifestlog) class manifestlog: """A collection class representing the collection of manifest snapshots @@ -1997,7 +2089,9 @@ """ return self.get(b'', node) - def get(self, tree, node, verify=True): + def get( + self, tree: bytes, node: bytes, verify: bool = True + ) -> AnyManifestCtx: """Retrieves the manifest instance for the given node. Throws a LookupError if not found. @@ -2047,14 +2141,14 @@ def getstorage(self, tree): return self._rootstore.dirlog(tree) - def clearcaches(self, clear_persisted_data=False): + def clearcaches(self, clear_persisted_data: bool = False) -> None: self._dirmancache.clear() self._rootstore.clearcaches(clear_persisted_data=clear_persisted_data) - def rev(self, node): + def rev(self, node) -> int: return self._rootstore.rev(node) - def update_caches(self, transaction): + def update_caches(self, transaction) -> None: return self._rootstore._revlog.update_caches(transaction=transaction) @@ -2063,15 +2157,15 @@ self._manifestlog = manifestlog self._manifestdict = manifestdict(manifestlog.nodeconstants.nodelen) - def _storage(self): + def _storage(self) -> ManifestRevlog: return self._manifestlog.getstorage(b'') - def copy(self): + def copy(self) -> 'MemManifestCtx': memmf = memmanifestctx(self._manifestlog) memmf._manifestdict = self.read().copy() return memmf - def read(self): + def read(self) -> 'ManifestDict': return self._manifestdict def write(self, transaction, link, p1, p2, added, removed, match=None): @@ -2110,22 +2204,22 @@ # rev = store.rev(node) # self.linkrev = store.linkrev(rev) - def _storage(self): + def _storage(self) -> 'ManifestRevlog': return self._manifestlog.getstorage(b'') - def node(self): + def node(self) -> bytes: return self._node - def copy(self): + def copy(self) -> MemManifestCtx: memmf = memmanifestctx(self._manifestlog) memmf._manifestdict = self.read().copy() return memmf @propertycache - def parents(self): + def parents(self) -> Tuple[bytes, bytes]: return self._storage().parents(self._node) - def read(self): + def read(self) -> 'ManifestDict': if self._data is None: nc = self._manifestlog.nodeconstants if self._node == nc.nullid: @@ -2141,7 +2235,7 @@ self._data = manifestdict(nc.nodelen, text) return self._data - def readfast(self, shallow=False): + def readfast(self, shallow: bool = False) -> 'ManifestDict': """Calls either readdelta or read, based on which would be less work. readdelta is called if the delta is against the p1, and therefore can be read quickly. @@ -2155,7 +2249,7 @@ return self.readdelta() return self.read() - def readdelta(self, shallow=False): + def readdelta(self, shallow: bool = False) -> 'ManifestDict': """Returns a manifest containing just the entries that are present in this manifest, but not in its p1 manifest. This is efficient to read if the revlog delta is already p1. @@ -2167,7 +2261,7 @@ d = mdiff.patchtext(store.revdiff(store.deltaparent(r), r)) return manifestdict(store.nodeconstants.nodelen, d) - def find(self, key): + def find(self, key: bytes) -> Tuple[bytes, bytes]: return self.read().find(key) @@ -2182,15 +2276,15 @@ self._dir = dir self._treemanifest = treemanifest(manifestlog.nodeconstants) - def _storage(self): + def _storage(self) -> ManifestRevlog: return self._manifestlog.getstorage(b'') - def copy(self): + def copy(self) -> 'MemTreeManifestCtx': memmf = memtreemanifestctx(self._manifestlog, dir=self._dir) memmf._treemanifest = self._treemanifest.copy() return memmf - def read(self): + def read(self) -> 'TreeManifest': return self._treemanifest def write(self, transaction, link, p1, p2, added, removed, match=None): @@ -2230,7 +2324,7 @@ # rev = store.rev(node) # self.linkrev = store.linkrev(rev) - def _storage(self): + def _storage(self) -> ManifestRevlog: narrowmatch = self._manifestlog._narrowmatch if not narrowmatch.always(): if not narrowmatch.visitdir(self._dir[:-1]): @@ -2239,7 +2333,7 @@ ) return self._manifestlog.getstorage(self._dir) - def read(self): + def read(self) -> 'TreeManifest': if self._data is None: store = self._storage() if self._node == self._manifestlog.nodeconstants.nullid: @@ -2272,19 +2366,19 @@ return self._data - def node(self): + def node(self) -> bytes: return self._node - def copy(self): + def copy(self) -> 'MemTreeManifestCtx': memmf = memtreemanifestctx(self._manifestlog, dir=self._dir) memmf._treemanifest = self.read().copy() return memmf @propertycache - def parents(self): + def parents(self) -> Tuple[bytes, bytes]: return self._storage().parents(self._node) - def readdelta(self, shallow=False): + def readdelta(self, shallow: bool = False) -> AnyManifestDict: """Returns a manifest containing just the entries that are present in this manifest, but not in its p1 manifest. This is efficient to read if the revlog delta is already p1. @@ -2313,7 +2407,7 @@ md.setflag(f, fl1) return md - def readfast(self, shallow=False): + def readfast(self, shallow=False) -> AnyManifestDict: """Calls either readdelta or read, based on which would be less work. readdelta is called if the delta is against the p1, and therefore can be read quickly. @@ -2334,7 +2428,7 @@ else: return self.read() - def find(self, key): + def find(self, key: bytes) -> Tuple[bytes, bytes]: return self.read().find(key)