Mercurial > hg
view tests/test-lock.py @ 31793:69d8fcf20014
help: document bundle specifications
I softly formalized the concept of a "bundle specification" a while
ago when I was working on clone bundles and stream clone bundles and
wanted a more robust way to define what exactly is in a bundle file.
The concept has existed for a while. Since it is part of the clone
bundles feature and exposed to the user via the "-t" argument to
`hg bundle`, it is something we need to support for the long haul.
After the 4.1 release, I heard a few people comment that they didn't
realize you could generate zstd bundles with `hg bundle`. I'm
partially to blame for not documenting it in bundle's docstring.
Additionally, I added a hacky, experimental feature for controlling
the compression level of bundles in 76104a4899ad. As the commit
message says, I went with a quick and dirty solution out of time
constraints. Furthermore, I wanted to eventually store this
configuration in the "bundlespec" so it could be made more flexible.
Given:
a) bundlespecs are here to stay
b) we don't have great documentation over what they are, despite being
a user-facing feature
c) the list of available compression engines and their behavior isn't
exposed
d) we need an extensible place to modify behavior of compression
engines
I want to move forward with formalizing bundlespecs as a user-facing
feature. This commit does that by introducing a "bundlespec" help
page. Leaning on the just-added compression engine documentation
and API, the topic also conveniently lists available compression
engines and details about them. This makes features like zstd
bundle compression more discoverable. e.g. you can now
`hg help -k zstd` and it lists the "bundlespec" topic.
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Sat, 01 Apr 2017 13:42:06 -0700 |
parents | e067741d4607 |
children | 0d892d820a51 |
line wrap: on
line source
from __future__ import absolute_import import copy import os import silenttestrunner import tempfile import types import unittest from mercurial import ( error, lock, vfs as vfsmod, ) testlockname = 'testlock' # work around http://bugs.python.org/issue1515 if types.MethodType not in copy._deepcopy_dispatch: def _deepcopy_method(x, memo): return type(x)(x.im_func, copy.deepcopy(x.im_self, memo), x.im_class) copy._deepcopy_dispatch[types.MethodType] = _deepcopy_method class lockwrapper(lock.lock): def __init__(self, pidoffset, *args, **kwargs): # lock.lock.__init__() calls lock(), so the pidoffset assignment needs # to be earlier self._pidoffset = pidoffset super(lockwrapper, self).__init__(*args, **kwargs) def _getpid(self): return super(lockwrapper, self)._getpid() + self._pidoffset class teststate(object): def __init__(self, testcase, dir, pidoffset=0): self._testcase = testcase self._acquirecalled = False self._releasecalled = False self._postreleasecalled = False self.vfs = vfsmod.vfs(dir, audit=False) self._pidoffset = pidoffset def makelock(self, *args, **kwargs): l = lockwrapper(self._pidoffset, self.vfs, testlockname, releasefn=self.releasefn, acquirefn=self.acquirefn, *args, **kwargs) l.postrelease.append(self.postreleasefn) return l def acquirefn(self): self._acquirecalled = True def releasefn(self): self._releasecalled = True def postreleasefn(self): self._postreleasecalled = True def assertacquirecalled(self, called): self._testcase.assertEqual( self._acquirecalled, called, 'expected acquire to be %s but was actually %s' % ( self._tocalled(called), self._tocalled(self._acquirecalled), )) def resetacquirefn(self): self._acquirecalled = False def assertreleasecalled(self, called): self._testcase.assertEqual( self._releasecalled, called, 'expected release to be %s but was actually %s' % ( self._tocalled(called), self._tocalled(self._releasecalled), )) def assertpostreleasecalled(self, called): self._testcase.assertEqual( self._postreleasecalled, called, 'expected postrelease to be %s but was actually %s' % ( self._tocalled(called), self._tocalled(self._postreleasecalled), )) def assertlockexists(self, exists): actual = self.vfs.lexists(testlockname) self._testcase.assertEqual( actual, exists, 'expected lock to %s but actually did %s' % ( self._toexists(exists), self._toexists(actual), )) def _tocalled(self, called): if called: return 'called' else: return 'not called' def _toexists(self, exists): if exists: return 'exist' else: return 'not exist' class testlock(unittest.TestCase): def testlock(self): state = teststate(self, tempfile.mkdtemp(dir=os.getcwd())) lock = state.makelock() state.assertacquirecalled(True) lock.release() state.assertreleasecalled(True) state.assertpostreleasecalled(True) state.assertlockexists(False) def testrecursivelock(self): state = teststate(self, tempfile.mkdtemp(dir=os.getcwd())) lock = state.makelock() state.assertacquirecalled(True) state.resetacquirefn() lock.lock() # recursive lock should not call acquirefn again state.assertacquirecalled(False) lock.release() # brings lock refcount down from 2 to 1 state.assertreleasecalled(False) state.assertpostreleasecalled(False) state.assertlockexists(True) lock.release() # releases the lock state.assertreleasecalled(True) state.assertpostreleasecalled(True) state.assertlockexists(False) def testlockfork(self): state = teststate(self, tempfile.mkdtemp(dir=os.getcwd())) lock = state.makelock() state.assertacquirecalled(True) # fake a fork forklock = copy.deepcopy(lock) forklock._pidoffset = 1 forklock.release() state.assertreleasecalled(False) state.assertpostreleasecalled(False) state.assertlockexists(True) # release the actual lock lock.release() state.assertreleasecalled(True) state.assertpostreleasecalled(True) state.assertlockexists(False) def testinheritlock(self): d = tempfile.mkdtemp(dir=os.getcwd()) parentstate = teststate(self, d) parentlock = parentstate.makelock() parentstate.assertacquirecalled(True) # set up lock inheritance with parentlock.inherit() as lockname: parentstate.assertreleasecalled(True) parentstate.assertpostreleasecalled(False) parentstate.assertlockexists(True) childstate = teststate(self, d, pidoffset=1) childlock = childstate.makelock(parentlock=lockname) childstate.assertacquirecalled(True) childlock.release() childstate.assertreleasecalled(True) childstate.assertpostreleasecalled(False) childstate.assertlockexists(True) parentstate.resetacquirefn() parentstate.assertacquirecalled(True) parentlock.release() parentstate.assertreleasecalled(True) parentstate.assertpostreleasecalled(True) parentstate.assertlockexists(False) def testmultilock(self): d = tempfile.mkdtemp(dir=os.getcwd()) state0 = teststate(self, d) lock0 = state0.makelock() state0.assertacquirecalled(True) with lock0.inherit() as lock0name: state0.assertreleasecalled(True) state0.assertpostreleasecalled(False) state0.assertlockexists(True) state1 = teststate(self, d, pidoffset=1) lock1 = state1.makelock(parentlock=lock0name) state1.assertacquirecalled(True) # from within lock1, acquire another lock with lock1.inherit() as lock1name: # since the file on disk is lock0's this should have the same # name self.assertEqual(lock0name, lock1name) state2 = teststate(self, d, pidoffset=2) lock2 = state2.makelock(parentlock=lock1name) state2.assertacquirecalled(True) lock2.release() state2.assertreleasecalled(True) state2.assertpostreleasecalled(False) state2.assertlockexists(True) state1.resetacquirefn() state1.assertacquirecalled(True) lock1.release() state1.assertreleasecalled(True) state1.assertpostreleasecalled(False) state1.assertlockexists(True) lock0.release() def testinheritlockfork(self): d = tempfile.mkdtemp(dir=os.getcwd()) parentstate = teststate(self, d) parentlock = parentstate.makelock() parentstate.assertacquirecalled(True) # set up lock inheritance with parentlock.inherit() as lockname: childstate = teststate(self, d, pidoffset=1) childlock = childstate.makelock(parentlock=lockname) childstate.assertacquirecalled(True) # fork the child lock forkchildlock = copy.deepcopy(childlock) forkchildlock._pidoffset += 1 forkchildlock.release() childstate.assertreleasecalled(False) childstate.assertpostreleasecalled(False) childstate.assertlockexists(True) # release the child lock childlock.release() childstate.assertreleasecalled(True) childstate.assertpostreleasecalled(False) childstate.assertlockexists(True) parentlock.release() def testinheritcheck(self): d = tempfile.mkdtemp(dir=os.getcwd()) state = teststate(self, d) def check(): raise error.LockInheritanceContractViolation('check failed') lock = state.makelock(inheritchecker=check) state.assertacquirecalled(True) def tryinherit(): with lock.inherit(): pass self.assertRaises(error.LockInheritanceContractViolation, tryinherit) lock.release() if __name__ == '__main__': silenttestrunner.main(__name__)