# HG changeset patch # User Siddharth Agarwal # Date 1442529480 25200 # Node ID c4b667a7a51ddb2d2e728d4b708718af8ff5aa68 # Parent 2239626369f5d89a8c4a1b8dffe2dfcbe33d9012 tests: add unit tests for locking code We're going to make significant changes to lock behavior soon. diff -r 2239626369f5 -r c4b667a7a51d tests/test-lock.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test-lock.py Thu Sep 17 15:38:00 2015 -0700 @@ -0,0 +1,114 @@ +from __future__ import absolute_import + +import os +import silenttestrunner +import tempfile +import unittest + +from mercurial import ( + lock, + scmutil, +) + +testlockname = 'testlock' + +class teststate(object): + def __init__(self, testcase): + self._testcase = testcase + self._releasecalled = False + self._postreleasecalled = False + d = tempfile.mkdtemp(dir=os.getcwd()) + self.vfs = scmutil.vfs(d, audit=False) + + def makelock(self, *args, **kwargs): + l = lock.lock(self.vfs, testlockname, releasefn=self.releasefn, *args, + **kwargs) + l.postrelease.append(self.postreleasefn) + return l + + def releasefn(self): + self._releasecalled = True + + def postreleasefn(self): + self._postreleasecalled = True + + def assertreleasecalled(self, called): + self._testcase.assertEqual( + self._releasecalled, called, + 'expected release to be %s but was actually %s' % ( + self._tocalled(called), + self._tocalled(self._releasecalled), + )) + + def assertpostreleasecalled(self, called): + self._testcase.assertEqual( + self._postreleasecalled, called, + 'expected postrelease to be %s but was actually %s' % ( + self._tocalled(called), + self._tocalled(self._postreleasecalled), + )) + + def assertlockexists(self, exists): + actual = self.vfs.lexists(testlockname) + self._testcase.assertEqual( + actual, exists, + 'expected lock to %s but actually did %s' % ( + self._toexists(exists), + self._toexists(actual), + )) + + def _tocalled(self, called): + if called: + return 'called' + else: + return 'not called' + + def _toexists(self, exists): + if exists: + return 'exists' + else: + return 'not exists' + +class testlock(unittest.TestCase): + def testlock(self): + state = teststate(self) + lock = state.makelock() + lock.release() + state.assertreleasecalled(True) + state.assertpostreleasecalled(True) + state.assertlockexists(False) + + def testrecursivelock(self): + state = teststate(self) + lock = state.makelock() + lock.lock() + lock.release() # brings lock refcount down from 2 to 1 + state.assertreleasecalled(False) + state.assertpostreleasecalled(False) + state.assertlockexists(True) + + lock.release() # releases the lock + state.assertreleasecalled(True) + state.assertpostreleasecalled(True) + state.assertlockexists(False) + + def testlockfork(self): + state = teststate(self) + lock = state.makelock() + lock.lock() + # fake a fork + lock.pid += 1 + lock.release() + state.assertreleasecalled(False) + state.assertpostreleasecalled(False) + state.assertlockexists(True) + + # release the actual lock + lock.pid -= 1 + lock.release() + state.assertreleasecalled(True) + state.assertpostreleasecalled(True) + state.assertlockexists(False) + +if __name__ == '__main__': + silenttestrunner.main(__name__)