tests/test-cappedreader.py
author Pierre-Yves David <pierre-yves.david@octobus.net>
Fri, 15 Jan 2021 12:41:38 +0100
changeset 46314 95a615dd77bf
parent 43076 2372284d9457
child 48875 6000f5b25c9b
permissions -rw-r--r--
clone: make sure we warm the cache after a clone This work around any deviciency/limitation of the clone process. In our case this ensure the persistent nodemap exist with valid content. Ideally, the cloning process would also do "the right thing". However since older server will never be able to do "the right thing". The local workaround will be necessary anyway. I am not worried by the performance impact of this as `hg clone` is non-instant on large repositories where is could matters. Warming the cache if they are already correct is very fast. And if they are not already warm, this seems like a good time to do so. This impact various test as more cache are now warmed sooner, all the change should be harmless. Differential Revision: https://phab.mercurial-scm.org/D9789

from __future__ import absolute_import, print_function

import io
import unittest

from mercurial import util


class CappedReaderTests(unittest.TestCase):
    def testreadfull(self):
        source = io.BytesIO(b'x' * 100)

        reader = util.cappedreader(source, 10)
        res = reader.read(10)
        self.assertEqual(res, b'x' * 10)
        self.assertEqual(source.tell(), 10)
        source.seek(0)

        reader = util.cappedreader(source, 15)
        res = reader.read(16)
        self.assertEqual(res, b'x' * 15)
        self.assertEqual(source.tell(), 15)
        source.seek(0)

        reader = util.cappedreader(source, 100)
        res = reader.read(100)
        self.assertEqual(res, b'x' * 100)
        self.assertEqual(source.tell(), 100)
        source.seek(0)

        reader = util.cappedreader(source, 50)
        res = reader.read()
        self.assertEqual(res, b'x' * 50)
        self.assertEqual(source.tell(), 50)
        source.seek(0)

    def testreadnegative(self):
        source = io.BytesIO(b'x' * 100)

        reader = util.cappedreader(source, 20)
        res = reader.read(-1)
        self.assertEqual(res, b'x' * 20)
        self.assertEqual(source.tell(), 20)
        source.seek(0)

        reader = util.cappedreader(source, 100)
        res = reader.read(-1)
        self.assertEqual(res, b'x' * 100)
        self.assertEqual(source.tell(), 100)
        source.seek(0)

    def testreadmultiple(self):
        source = io.BytesIO(b'x' * 100)

        reader = util.cappedreader(source, 10)
        for i in range(10):
            res = reader.read(1)
            self.assertEqual(res, b'x')
            self.assertEqual(source.tell(), i + 1)

        self.assertEqual(source.tell(), 10)
        res = reader.read(1)
        self.assertEqual(res, b'')
        self.assertEqual(source.tell(), 10)
        source.seek(0)

        reader = util.cappedreader(source, 45)
        for i in range(4):
            res = reader.read(10)
            self.assertEqual(res, b'x' * 10)
            self.assertEqual(source.tell(), (i + 1) * 10)

        res = reader.read(10)
        self.assertEqual(res, b'x' * 5)
        self.assertEqual(source.tell(), 45)

    def readlimitpasteof(self):
        source = io.BytesIO(b'x' * 100)

        reader = util.cappedreader(source, 1024)
        res = reader.read(1000)
        self.assertEqual(res, b'x' * 100)
        self.assertEqual(source.tell(), 100)
        res = reader.read(1000)
        self.assertEqual(res, b'')
        self.assertEqual(source.tell(), 100)


if __name__ == '__main__':
    import silenttestrunner

    silenttestrunner.main(__name__)