Mercurial > hg
view tests/test-fastannotate-revmap.py @ 43023:8af909893188
context: clarify the various modes in the _copies property cache
The previous code was compact but a bit dense. The newly proposed code deals with
each mode separately; there are some duplicated lines, but the meaning of each
mode stands out.
One of the benefits is to make it simpler to add further modes in the future.
Differential Revision: https://phab.mercurial-scm.org/D6933
author | Pierre-Yves David <pierre-yves.david@octobus.net> |
---|---|
date | Fri, 27 Sep 2019 00:11:03 +0200 |
parents | 0fea133780bf |
children | 2372284d9457 |
line wrap: on
line source
from __future__ import absolute_import, print_function import os import tempfile from mercurial import ( pycompat, util, ) from hgext.fastannotate import error, revmap if pycompat.ispy3: xrange = range def genhsh(i): return pycompat.bytechr(i) + b'\0' * 19 def gettemppath(): fd, path = tempfile.mkstemp() os.close(fd) os.unlink(path) return path def ensure(condition): if not condition: raise RuntimeError('Unexpected') def testbasicreadwrite(): path = gettemppath() rm = revmap.revmap(path) ensure(rm.maxrev == 0) for i in xrange(5): ensure(rm.rev2hsh(i) is None) ensure(rm.hsh2rev(b'\0' * 20) is None) paths = [ b'', b'a', None, b'b', b'b', b'c', b'c', None, b'a', b'b', b'a', b'a'] for i in xrange(1, 5): ensure(rm.append(genhsh(i), sidebranch=(i & 1), path=paths[i]) == i) ensure(rm.maxrev == 4) for i in xrange(1, 5): ensure(rm.hsh2rev(genhsh(i)) == i) ensure(rm.rev2hsh(i) == genhsh(i)) # re-load and verify rm.flush() rm = revmap.revmap(path) ensure(rm.maxrev == 4) for i in xrange(1, 5): ensure(rm.hsh2rev(genhsh(i)) == i) ensure(rm.rev2hsh(i) == genhsh(i)) ensure(bool(rm.rev2flag(i) & revmap.sidebranchflag) == bool(i & 1)) # append without calling save() explicitly for i in xrange(5, 12): ensure(rm.append(genhsh(i), sidebranch=(i & 1), path=paths[i], flush=True) == i) # re-load and verify rm = revmap.revmap(path) ensure(rm.maxrev == 11) for i in xrange(1, 12): ensure(rm.hsh2rev(genhsh(i)) == i) ensure(rm.rev2hsh(i) == genhsh(i)) ensure(rm.rev2path(i) == paths[i] or paths[i - 1]) ensure(bool(rm.rev2flag(i) & revmap.sidebranchflag) == bool(i & 1)) os.unlink(path) # missing keys ensure(rm.rev2hsh(12) is None) ensure(rm.rev2hsh(0) is None) ensure(rm.rev2hsh(-1) is None) ensure(rm.rev2flag(12) is None) ensure(rm.rev2path(12) is None) ensure(rm.hsh2rev(b'\1' * 20) is None) # illformed hash (not 20 bytes) try: rm.append(b'\0') ensure(False) except Exception: pass def testcorruptformat(): path = gettemppath() # incorrect header with open(path, 'wb') as f: f.write(b'NOT A VALID 
HEADER') try: revmap.revmap(path) ensure(False) except error.CorruptedFileError: pass # rewrite the file os.unlink(path) rm = revmap.revmap(path) rm.append(genhsh(0), flush=True) rm = revmap.revmap(path) ensure(rm.maxrev == 1) # corrupt the file by appending a byte size = os.stat(path).st_size with open(path, 'ab') as f: f.write(b'\xff') try: revmap.revmap(path) ensure(False) except error.CorruptedFileError: pass # corrupt the file by removing the last byte ensure(size > 0) with open(path, 'wb') as f: f.truncate(size - 1) try: revmap.revmap(path) ensure(False) except error.CorruptedFileError: pass os.unlink(path) def testcopyfrom(): path = gettemppath() rm = revmap.revmap(path) for i in xrange(1, 10): ensure(rm.append(genhsh(i), sidebranch=(i & 1), path=(b'%d' % (i // 3))) == i) rm.flush() # copy rm to rm2 rm2 = revmap.revmap() rm2.copyfrom(rm) path2 = gettemppath() rm2.path = path2 rm2.flush() # two files should be the same ensure(len(set(util.readfile(p) for p in [path, path2])) == 1) os.unlink(path) os.unlink(path2) class fakefctx(object): def __init__(self, node, path=None): self._node = node self._path = path def node(self): return self._node def path(self): return self._path def testcontains(): path = gettemppath() rm = revmap.revmap(path) for i in xrange(1, 5): ensure(rm.append(genhsh(i), sidebranch=(i & 1)) == i) for i in xrange(1, 5): ensure(((genhsh(i), None) in rm) == ((i & 1) == 0)) ensure((fakefctx(genhsh(i)) in rm) == ((i & 1) == 0)) for i in xrange(5, 10): ensure(fakefctx(genhsh(i)) not in rm) ensure((genhsh(i), None) not in rm) # "contains" checks paths rm = revmap.revmap() for i in xrange(1, 5): ensure(rm.append(genhsh(i), path=(b'%d' % (i // 2))) == i) for i in xrange(1, 5): ensure(fakefctx(genhsh(i), path=(b'%d' % (i // 2))) in rm) ensure(fakefctx(genhsh(i), path=b'a') not in rm) def testlastnode(): path = gettemppath() ensure(revmap.getlastnode(path) is None) rm = revmap.revmap(path) ensure(revmap.getlastnode(path) is None) for i in xrange(1, 
10): hsh = genhsh(i) rm.append(hsh, path=(b'%d' % (i // 2)), flush=True) ensure(revmap.getlastnode(path) == hsh) rm2 = revmap.revmap(path) ensure(rm2.rev2hsh(rm2.maxrev) == hsh) testbasicreadwrite() testcorruptformat() testcopyfrom() testcontains() testlastnode()