view tests/test-lock.py @ 26454:62c5e937f477

pager: recreate stdout to make it line-buffered

We want to see partial command results as soon as possible. But the buffering
mode of stdout (= pager's stdin) was set to fully-buffered because it isn't
associated with a tty.

So, this patch recreates new stdout object to force its buffering mode.

Because two file objects are associated with the same stdout fd and their
destructors will call close(), one of them must be closed carefully. Python
expects that the stdout fd never be closed even after sys.stdout.close() [1],
but newstdout has no such hack. So this patch calls newstdout.close()
immediately before duplicating the original stdout fd to sys.stdout.

  operation              sys.stdout  newstdout  fd
  ---------------------  ----------  ---------  --------
  newstdout.close()      open        closed     closed
  os.dup2(stdoutfd, ..)  open        closed     open
  del sys.stdout         closed      closed     open [1]

[1]: https://hg.python.org/cpython/file/v2.7.10/Python/sysmodule.c#l1391
author Yuya Nishihara <yuya@tcha.org>
date Sat, 03 Oct 2015 15:16:33 +0900
parents e16f80f89a29
children 5f94e64f182c
line wrap: on
line source

from __future__ import absolute_import

import copy
import os
import silenttestrunner
import tempfile
import types
import unittest

from mercurial import (
    lock,
    scmutil,
)

# name of the lock file, relative to each teststate's vfs; it is handed to
# lockwrapper when a lock is built and probed with vfs.lexists() afterwards
testlockname = 'testlock'

# Interpreters affected by http://bugs.python.org/issue1515 have no deepcopy
# handler registered for bound methods, so install one when it is missing.
if types.MethodType not in copy._deepcopy_dispatch:
    def _deepcopy_method(method, memo):
        # rebuild the method around a deep copy of the instance it is bound to
        boundto = copy.deepcopy(method.im_self, memo)
        return type(method)(method.im_func, boundto, method.im_class)
    copy._deepcopy_dispatch[types.MethodType] = _deepcopy_method

class lockwrapper(lock.lock):
    """lock.lock subclass whose _getpid() is shifted by a fixed offset."""

    def __init__(self, pidoffset, *args, **kwargs):
        # The offset has to be stored before delegating to the base class:
        # lock.lock.__init__() calls lock(), and the offset is needed by then.
        self._pidoffset = pidoffset
        super(lockwrapper, self).__init__(*args, **kwargs)

    def _getpid(self):
        # real process id, displaced by the configured offset
        return self._pidoffset + os.getpid()

class teststate(object):
    """Bookkeeping for one lock user inside a test.

    Records whether the acquire, release and postrelease callbacks of locks
    built by makelock() have fired, and provides assertion helpers that
    report through the test case given at construction time.
    """

    def __init__(self, testcase, dir, pidoffset=0):
        # testcase:  object whose assertEqual() the assert* helpers call
        # dir:       directory passed to scmutil.vfs
        # pidoffset: forwarded to lockwrapper by makelock()
        self._testcase = testcase
        self._pidoffset = pidoffset
        self.vfs = scmutil.vfs(dir, audit=False)
        # no callback has been observed yet
        self._acquirecalled = False
        self._releasecalled = False
        self._postreleasecalled = False

    def makelock(self, *args, **kwargs):
        """Build a lockwrapper on testlockname wired to this state's callbacks.

        Any extra positional or keyword arguments are passed on to the
        lockwrapper constructor unchanged.
        """
        newlock = lockwrapper(self._pidoffset, self.vfs, testlockname,
                              releasefn=self.releasefn,
                              acquirefn=self.acquirefn,
                              *args, **kwargs)
        newlock.postrelease.append(self.postreleasefn)
        return newlock

    # -- callbacks handed to the lock -------------------------------------

    def acquirefn(self):
        self._acquirecalled = True

    def releasefn(self):
        self._releasecalled = True

    def postreleasefn(self):
        self._postreleasecalled = True

    def resetacquirefn(self):
        # forget any earlier acquire so a later one can be detected
        self._acquirecalled = False

    # -- assertions ---------------------------------------------------------

    def assertacquirecalled(self, called):
        """Assert that the acquire flag equals ``called``."""
        actual = self._acquirecalled
        msg = 'expected acquire to be %s but was actually %s' % (
            self._tocalled(called), self._tocalled(actual))
        self._testcase.assertEqual(actual, called, msg)

    def assertreleasecalled(self, called):
        """Assert that the release flag equals ``called``."""
        actual = self._releasecalled
        msg = 'expected release to be %s but was actually %s' % (
            self._tocalled(called), self._tocalled(actual))
        self._testcase.assertEqual(actual, called, msg)

    def assertpostreleasecalled(self, called):
        """Assert that the postrelease flag equals ``called``."""
        actual = self._postreleasecalled
        msg = 'expected postrelease to be %s but was actually %s' % (
            self._tocalled(called), self._tocalled(actual))
        self._testcase.assertEqual(actual, called, msg)

    def assertlockexists(self, exists):
        """Assert that vfs.lexists(testlockname) equals ``exists``."""
        present = self.vfs.lexists(testlockname)
        msg = 'expected lock to %s but actually did %s' % (
            self._toexists(exists), self._toexists(present))
        self._testcase.assertEqual(present, exists, msg)

    # -- wording helpers for the assertion messages -------------------------

    def _tocalled(self, called):
        return 'called' if called else 'not called'

    def _toexists(self, exists):
        return 'exist' if exists else 'not exist'

class testlock(unittest.TestCase):
    def _newdir(self):
        # scratch directory for a test, created below the current directory
        return tempfile.mkdtemp(dir=os.getcwd())

    def testlock(self):
        # acquire once, release once
        st = teststate(self, self._newdir())
        held = st.makelock()
        st.assertacquirecalled(True)

        held.release()
        st.assertreleasecalled(True)
        st.assertpostreleasecalled(True)
        st.assertlockexists(False)

    def testrecursivelock(self):
        st = teststate(self, self._newdir())
        held = st.makelock()
        st.assertacquirecalled(True)

        # taking the lock a second time must not run acquirefn again
        st.resetacquirefn()
        held.lock()
        st.assertacquirecalled(False)

        # the first release only brings the refcount down from 2 to 1
        held.release()
        st.assertreleasecalled(False)
        st.assertpostreleasecalled(False)
        st.assertlockexists(True)

        # the second release really gives the lock up
        held.release()
        st.assertreleasecalled(True)
        st.assertpostreleasecalled(True)
        st.assertlockexists(False)

    def testlockfork(self):
        st = teststate(self, self._newdir())
        held = st.makelock()
        st.assertacquirecalled(True)

        # pretend we forked: a copy of the lock with a shifted pid
        forked = copy.deepcopy(held)
        forked._pidoffset = 1
        forked.release()
        st.assertreleasecalled(False)
        st.assertpostreleasecalled(False)
        st.assertlockexists(True)

        # now release the real lock
        held.release()
        st.assertreleasecalled(True)
        st.assertpostreleasecalled(True)
        st.assertlockexists(False)

    def testinheritlock(self):
        lockdir = self._newdir()
        parent = teststate(self, lockdir)
        plock = parent.makelock()
        parent.assertacquirecalled(True)

        # prepare the parent lock for inheritance
        inherited = plock.prepinherit()
        parent.assertreleasecalled(True)
        parent.assertpostreleasecalled(False)
        parent.assertlockexists(True)

        child = teststate(self, lockdir, pidoffset=1)
        clock = child.makelock(parentlock=inherited)
        child.assertacquirecalled(True)

        # releasing the child lock must leave the lock file on disk
        clock.release()
        child.assertreleasecalled(True)
        child.assertpostreleasecalled(True)
        child.assertlockexists(True)

        parent.resetacquirefn()
        plock.reacquire()
        parent.assertacquirecalled(True)

        plock.release()
        parent.assertreleasecalled(True)
        parent.assertpostreleasecalled(True)
        parent.assertlockexists(False)

    def testmultilock(self):
        lockdir = self._newdir()
        outer = teststate(self, lockdir)
        outerlock = outer.makelock()
        outer.assertacquirecalled(True)

        outername = outerlock.prepinherit()
        outer.assertreleasecalled(True)
        outer.assertpostreleasecalled(False)
        outer.assertlockexists(True)

        middle = teststate(self, lockdir, pidoffset=1)
        middlelock = middle.makelock(parentlock=outername)
        middle.assertacquirecalled(True)

        # while holding the middle lock, set up yet another level
        middlename = middlelock.prepinherit()
        # the file on disk belongs to the outer lock, so the name is the same
        self.assertEqual(outername, middlename)

        inner = teststate(self, lockdir, pidoffset=2)
        innerlock = inner.makelock(parentlock=middlename)
        inner.assertacquirecalled(True)

        innerlock.release()
        inner.assertreleasecalled(True)
        inner.assertpostreleasecalled(True)
        inner.assertlockexists(True)

        middle.resetacquirefn()
        middlelock.reacquire()
        middle.assertacquirecalled(True)

        middlelock.release()
        middle.assertreleasecalled(True)
        middle.assertpostreleasecalled(True)
        middle.assertlockexists(True)

        outerlock.reacquire()
        outerlock.release()

    def testinheritlockfork(self):
        lockdir = self._newdir()
        parent = teststate(self, lockdir)
        plock = parent.makelock()
        parent.assertacquirecalled(True)

        # prepare the parent lock for inheritance
        inherited = plock.prepinherit()
        child = teststate(self, lockdir, pidoffset=1)
        clock = child.makelock(parentlock=inherited)
        child.assertacquirecalled(True)

        # pretend the child forked: copy its lock and shift the pid once more
        forkedchild = copy.deepcopy(clock)
        forkedchild._pidoffset += 1
        forkedchild.release()
        child.assertreleasecalled(False)
        child.assertpostreleasecalled(False)
        child.assertlockexists(True)

        # now release the child's own lock
        clock.release()
        child.assertreleasecalled(True)
        child.assertpostreleasecalled(True)
        child.assertlockexists(True)

        plock.reacquire()
        plock.release()

# When executed directly as a script, pass this module's name to
# silenttestrunner.main() (presumably it runs the test cases defined above;
# confirm against silenttestrunner).
if __name__ == '__main__':
    silenttestrunner.main(__name__)