--- a/mercurial/shelve.py Tue Aug 20 18:30:47 2024 -0400
+++ b/mercurial/shelve.py Tue Aug 20 22:34:51 2024 -0400
@@ -26,6 +26,16 @@
import itertools
import stat
+from typing import (
+ Any,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Sequence,
+ Tuple,
+)
+
from .i18n import _
from .node import (
bin,
@@ -37,6 +47,7 @@
bundle2,
changegroup,
cmdutil,
+ context as contextmod,
discovery,
error,
exchange,
@@ -69,16 +80,16 @@
class ShelfDir:
- def __init__(self, repo, for_backups=False):
+ def __init__(self, repo, for_backups: bool = False) -> None:
if for_backups:
self.vfs = vfsmod.vfs(repo.vfs.join(backupdir))
else:
self.vfs = vfsmod.vfs(repo.vfs.join(shelvedir))
- def get(self, name):
+ def get(self, name: bytes) -> "Shelf":
return Shelf(self.vfs, name)
- def listshelves(self):
+ def listshelves(self) -> List[Tuple[float, bytes]]:
"""return all shelves in repo as list of (time, name)"""
try:
names = self.vfs.listdir()
@@ -99,14 +110,14 @@
return sorted(info, reverse=True)
-def _use_internal_phase(repo):
+def _use_internal_phase(repo) -> bool:
return (
phases.supportinternal(repo)
and repo.ui.config(b'shelve', b'store') == b'internal'
)
-def _target_phase(repo):
+def _target_phase(repo) -> int:
return phases.internal if _use_internal_phase(repo) else phases.secret
@@ -118,29 +129,29 @@
differences and lets you work with the shelf as a whole.
"""
- def __init__(self, vfs, name):
+ def __init__(self, vfs: vfsmod.vfs, name: bytes) -> None:
self.vfs = vfs
self.name = name
- def exists(self):
+ def exists(self) -> bool:
return self._exists(b'.shelve') or self._exists(b'.patch', b'.hg')
- def _exists(self, *exts):
+ def _exists(self, *exts: bytes) -> bool:
return all(self.vfs.exists(self.name + ext) for ext in exts)
- def mtime(self):
+ def mtime(self) -> float:
try:
return self._stat(b'.shelve')[stat.ST_MTIME]
except FileNotFoundError:
return self._stat(b'.patch')[stat.ST_MTIME]
- def _stat(self, ext):
+ def _stat(self, ext: bytes):
return self.vfs.stat(self.name + ext)
- def writeinfo(self, info):
+ def writeinfo(self, info) -> None:
scmutil.simplekeyvaluefile(self.vfs, self.name + b'.shelve').write(info)
- def hasinfo(self):
+ def hasinfo(self) -> bool:
return self.vfs.exists(self.name + b'.shelve')
def readinfo(self):
@@ -148,7 +159,7 @@
self.vfs, self.name + b'.shelve'
).read()
- def writebundle(self, repo, bases, node):
+ def writebundle(self, repo, bases, node) -> None:
cgversion = changegroup.safeversion(repo)
if cgversion == b'01':
btype = b'HG10BZ'
@@ -174,7 +185,7 @@
compression=compression,
)
- def applybundle(self, repo, tr):
+ def applybundle(self, repo, tr) -> contextmod.changectx:
filename = self.name + b'.hg'
fp = self.vfs(filename)
try:
@@ -197,10 +208,10 @@
finally:
fp.close()
- def open_patch(self, mode=b'rb'):
+ def open_patch(self, mode: bytes = b'rb'):
return self.vfs(self.name + b'.patch', mode)
- def patch_from_node(self, repo, node):
+ def patch_from_node(self, repo, node) -> io.BytesIO:
repo = repo.unfiltered()
match = _optimized_match(repo, node)
fp = io.BytesIO()
@@ -221,8 +232,8 @@
except (FileNotFoundError, error.RepoLookupError):
return self.open_patch()
- def _backupfilename(self, backupvfs, filename):
- def gennames(base):
+ def _backupfilename(self, backupvfs: vfsmod.vfs, filename: bytes) -> bytes:
+ def gennames(base: bytes):
yield base
base, ext = base.rsplit(b'.', 1)
for i in itertools.count(1):
@@ -232,7 +243,10 @@
if not backupvfs.exists(n):
return backupvfs.join(n)
- def movetobackup(self, backupvfs):
+ # Help pytype- gennames() yields infinitely
+ raise error.ProgrammingError("unreachable")
+
+ def movetobackup(self, backupvfs: vfsmod.vfs) -> None:
if not backupvfs.isdir():
backupvfs.makedir()
for suffix in shelvefileextensions:
@@ -243,7 +257,7 @@
self._backupfilename(backupvfs, filename),
)
- def delete(self):
+ def delete(self) -> None:
for ext in shelvefileextensions:
self.vfs.tryunlink(self.name + b'.' + ext)
@@ -256,7 +270,7 @@
return patch.changedfiles(ui, repo, filename)
-def _optimized_match(repo, node):
+def _optimized_match(repo, node: bytes):
"""
Create a matcher so that prefetch doesn't attempt to fetch
the entire repository pointlessly, and as an optimisation
@@ -272,6 +286,7 @@
versions of a shelved state are possible and handles them appropriately.
"""
+ # Class-wide constants
_version = 2
_filename = b'shelvedstate'
_keep = b'keep'
@@ -280,8 +295,19 @@
_noactivebook = b':no-active-bookmark'
_interactive = b'interactive'
+ # Per instance attrs
+ name: bytes
+ wctx: contextmod.workingctx
+ pendingctx: contextmod.changectx
+ parents: List[bytes]
+ nodestoremove: List[bytes]
+ branchtorestore: bytes
+ keep: bool
+ activebookmark: bytes
+ interactive: bool
+
@classmethod
- def _verifyandtransform(cls, d):
+ def _verifyandtransform(cls, d: Dict[bytes, Any]) -> None:
"""Some basic shelvestate syntactic verification and transformation"""
try:
d[b'originalwctx'] = bin(d[b'originalwctx'])
@@ -294,7 +320,7 @@
raise error.CorruptedState(stringutil.forcebytestr(err))
@classmethod
- def _getversion(cls, repo):
+ def _getversion(cls, repo) -> int:
"""Read version information from shelvestate file"""
fp = repo.vfs(cls._filename)
try:
@@ -306,7 +332,7 @@
return version
@classmethod
- def _readold(cls, repo):
+ def _readold(cls, repo) -> Dict[bytes, Any]:
"""Read the old position-based version of a shelvestate file"""
# Order is important, because old shelvestate file uses it
# to detemine values of fields (i.g. name is on the second line,
@@ -373,15 +399,15 @@
def save(
cls,
repo,
- name,
- originalwctx,
- pendingctx,
- nodestoremove,
- branchtorestore,
- keep=False,
- activebook=b'',
- interactive=False,
- ):
+ name: bytes,
+ originalwctx: contextmod.workingctx,
+ pendingctx: contextmod.changectx,
+ nodestoremove: List[bytes],
+ branchtorestore: bytes,
+ keep: bool = False,
+ activebook: bytes = b'',
+ interactive: bool = False,
+ ) -> None:
info = {
b"name": name,
b"originalwctx": hex(originalwctx.node()),
@@ -399,11 +425,11 @@
)
@classmethod
- def clear(cls, repo):
+ def clear(cls, repo) -> None:
repo.vfs.unlinkpath(cls._filename, ignoremissing=True)
-def cleanupoldbackups(repo):
+def cleanupoldbackups(repo) -> None:
maxbackups = repo.ui.configint(b'shelve', b'maxbackups')
backup_dir = ShelfDir(repo, for_backups=True)
hgfiles = backup_dir.listshelves()
@@ -418,19 +444,19 @@
backup_dir.get(name).delete()
-def _backupactivebookmark(repo):
+def _backupactivebookmark(repo) -> bytes:
activebookmark = repo._activebookmark
if activebookmark:
bookmarks.deactivate(repo)
return activebookmark
-def _restoreactivebookmark(repo, mark):
+def _restoreactivebookmark(repo, mark) -> None:
if mark:
bookmarks.activate(repo, mark)
-def _aborttransaction(repo, tr):
+def _aborttransaction(repo, tr) -> None:
"""Abort current transaction for shelve/unshelve, but keep dirstate"""
# disable the transaction invalidation of the dirstate, to preserve the
# current change in memory.
@@ -456,7 +482,7 @@
ds.setbranch(current_branch, None)
-def getshelvename(repo, parent, opts):
+def getshelvename(repo, parent, opts) -> bytes:
"""Decide on the name this shelve is going to have"""
def gennames():
@@ -496,7 +522,7 @@
return name
-def mutableancestors(ctx):
+def mutableancestors(ctx) -> Iterator[bytes]:
"""return all mutable ancestors for ctx (included)
Much faster than the revset ancestors(ctx) & draft()"""
@@ -514,7 +540,7 @@
visit.append(parent)
-def getcommitfunc(extra, interactive, editor=False):
+def getcommitfunc(extra, interactive: bool, editor: bool = False):
def commitfunc(ui, repo, message, match, opts):
hasmq = hasattr(repo, 'mq')
if hasmq:
@@ -550,7 +576,7 @@
return interactivecommitfunc if interactive else commitfunc
-def _nothingtoshelvemessaging(ui, repo, pats, opts):
+def _nothingtoshelvemessaging(ui, repo, pats, opts) -> None:
stat = repo.status(match=scmutil.match(repo[None], pats, opts))
if stat.deleted:
ui.status(
@@ -561,7 +587,7 @@
ui.status(_(b"nothing changed\n"))
-def _shelvecreatedcommit(repo, node, name, match):
+def _shelvecreatedcommit(repo, node: bytes, name: bytes, match) -> None:
info = {b'node': hex(node)}
shelf = ShelfDir(repo).get(name)
shelf.writeinfo(info)
@@ -573,14 +599,14 @@
)
-def _includeunknownfiles(repo, pats, opts, extra):
+def _includeunknownfiles(repo, pats, opts, extra) -> None:
s = repo.status(match=scmutil.match(repo[None], pats, opts), unknown=True)
if s.unknown:
extra[b'shelve_unknown'] = b'\0'.join(s.unknown)
repo[None].add(s.unknown)
-def _finishshelve(repo, tr):
+def _finishshelve(repo, tr) -> None:
if _use_internal_phase(repo):
tr.close()
else:
@@ -675,7 +701,7 @@
lockmod.release(tr, lock)
-def _isbareshelve(pats, opts):
+def _isbareshelve(pats, opts) -> bool:
return (
not pats
and not opts.get(b'interactive', False)
@@ -684,11 +710,11 @@
)
-def _iswctxonnewbranch(repo):
+def _iswctxonnewbranch(repo) -> bool:
return repo[None].branch() != repo[b'.'].branch()
-def cleanupcmd(ui, repo):
+def cleanupcmd(ui, repo) -> None:
"""subcommand that deletes all shelves"""
with repo.wlock():
@@ -699,7 +725,7 @@
cleanupoldbackups(repo)
-def deletecmd(ui, repo, pats):
+def deletecmd(ui, repo, pats) -> None:
"""subcommand that deletes a specific shelve"""
if not pats:
raise error.InputError(_(b'no shelved changes specified!'))
@@ -715,7 +741,7 @@
cleanupoldbackups(repo)
-def listcmd(ui, repo, pats, opts):
+def listcmd(ui, repo, pats: Iterable[bytes], opts) -> None:
"""subcommand that displays the list of shelves"""
pats = set(pats)
width = 80
@@ -762,7 +788,7 @@
ui.write(chunk, label=label)
-def patchcmds(ui, repo, pats, opts):
+def patchcmds(ui, repo, pats: Sequence[bytes], opts) -> None:
"""subcommand that displays shelves"""
shelf_dir = ShelfDir(repo)
if len(pats) == 0:
@@ -779,7 +805,7 @@
listcmd(ui, repo, pats, opts)
-def checkparents(repo, state):
+def checkparents(repo, state: shelvedstate) -> None:
"""check parent while resuming an unshelve"""
if state.parents != repo.dirstate.parents():
raise error.Abort(
@@ -787,7 +813,7 @@
)
-def _loadshelvedstate(ui, repo, opts):
+def _loadshelvedstate(ui, repo, opts) -> shelvedstate:
try:
state = shelvedstate.load(repo)
if opts.get(b'keep') is None:
@@ -819,7 +845,7 @@
)
-def unshelveabort(ui, repo, state):
+def unshelveabort(ui, repo, state: shelvedstate) -> None:
"""subcommand that abort an in-progress unshelve"""
with repo.lock():
try:
@@ -838,14 +864,14 @@
ui.warn(_(b"unshelve of '%s' aborted\n") % state.name)
-def hgabortunshelve(ui, repo):
+def hgabortunshelve(ui, repo) -> None:
"""logic to abort unshelve using 'hg abort"""
with repo.wlock():
state = _loadshelvedstate(ui, repo, {b'abort': True})
return unshelveabort(ui, repo, state)
-def mergefiles(ui, repo, wctx, shelvectx):
+def mergefiles(ui, repo, wctx, shelvectx) -> None:
"""updates to wctx and merges the changes from shelvectx into the
dirstate."""
with ui.configoverride({(b'ui', b'quiet'): True}):
@@ -853,7 +879,7 @@
cmdutil.revert(ui, repo, shelvectx)
-def restorebranch(ui, repo, branchtorestore):
+def restorebranch(ui, repo, branchtorestore: bytes) -> None:
if branchtorestore and branchtorestore != repo.dirstate.branch():
repo.dirstate.setbranch(branchtorestore, repo.currenttransaction())
ui.status(
@@ -861,7 +887,7 @@
)
-def unshelvecleanup(ui, repo, name, opts):
+def unshelvecleanup(ui, repo, name: bytes, opts) -> None:
"""remove related files after an unshelve"""
if not opts.get(b'keep'):
backupvfs = vfsmod.vfs(repo.vfs.join(backupdir))
@@ -869,7 +895,7 @@
cleanupoldbackups(repo)
-def unshelvecontinue(ui, repo, state, opts):
+def unshelvecontinue(ui, repo, state: shelvedstate, opts) -> None:
"""subcommand to continue an in-progress unshelve"""
# We're finishing off a merge. First parent is our original
# parent, second is the temporary "fake" commit we're unshelving.
@@ -927,7 +953,7 @@
ui.status(_(b"unshelve of '%s' complete\n") % state.name)
-def hgcontinueunshelve(ui, repo):
+def hgcontinueunshelve(ui, repo) -> None:
"""logic to resume unshelve using 'hg continue'"""
with repo.wlock():
state = _loadshelvedstate(ui, repo, {b'continue': True})
@@ -959,7 +985,7 @@
return tmpwctx, addedbefore
-def _unshelverestorecommit(ui, repo, tr, basename):
+def _unshelverestorecommit(ui, repo, tr, basename: bytes):
"""Recreate commit in the repository during the unshelve"""
repo = repo.unfiltered()
node = None
@@ -980,7 +1006,9 @@
return repo, shelvectx
-def _createunshelvectx(ui, repo, shelvectx, basename, interactive, opts):
+def _createunshelvectx(
+ ui, repo, shelvectx, basename: bytes, interactive: bool, opts
+) -> Tuple[bytes, bool]:
"""Handles the creation of unshelve commit and updates the shelve if it
was partially unshelved.
@@ -1042,7 +1070,7 @@
opts,
tr,
oldtiprev,
- basename,
+ basename: bytes,
pctx,
tmpwctx,
shelvectx,
@@ -1113,7 +1141,7 @@
return shelvectx, ispartialunshelve
-def _forgetunknownfiles(repo, shelvectx, addedbefore):
+def _forgetunknownfiles(repo, shelvectx, addedbefore) -> None:
# Forget any files that were unknown before the shelve, unknown before
# unshelve started, but are now added.
shelveunknown = shelvectx.extra().get(b'shelve_unknown')
@@ -1125,7 +1153,7 @@
repo[None].forget(toforget)
-def _finishunshelve(repo, oldtiprev, tr, activebookmark):
+def _finishunshelve(repo, oldtiprev, tr, activebookmark) -> None:
_restoreactivebookmark(repo, activebookmark)
# We used to manually strip the commit to update inmemory structure and
# prevent some issue around hooks. This no longer seems to be the case, so
@@ -1133,7 +1161,7 @@
_aborttransaction(repo, tr)
-def _checkunshelveuntrackedproblems(ui, repo, shelvectx):
+def _checkunshelveuntrackedproblems(ui, repo, shelvectx) -> None:
"""Check potential problems which may result from working
copy having untracked changes."""
wcdeleted = set(repo.status().deleted)
@@ -1145,7 +1173,7 @@
raise error.Abort(m, hint=hint)
-def unshelvecmd(ui, repo, *shelved, **opts):
+def unshelvecmd(ui, repo, *shelved, **opts) -> None:
opts = pycompat.byteskwargs(opts)
abortf = opts.get(b'abort')
continuef = opts.get(b'continue')
@@ -1182,6 +1210,11 @@
)
elif continuef:
return unshelvecontinue(ui, repo, state, opts)
+ else:
+ # Unreachable code, but help type checkers not think that
+ # 'basename' may be used before initialization when checking
+ # ShelfDir below.
+ raise error.ProgrammingError("neither abort nor continue specified")
elif len(shelved) > 1:
raise error.InputError(_(b'can only unshelve one change at a time'))
elif not shelved:
@@ -1199,7 +1232,7 @@
return _dounshelve(ui, repo, basename, opts)
-def _dounshelve(ui, repo, basename, opts):
+def _dounshelve(ui, repo, basename: bytes, opts) -> None:
repo = repo.unfiltered()
lock = tr = None
try: