view tests/test-simplekeyvaluefile.py @ 33765:e2fc2122029c

wireproto: remove support for local results in @batchable (API)

@peer.batchable decorated generator functions have two forms:

    yield value, None

and

    yield args, future
    yield value

These forms have been present since the decorator was introduced. There are currently no in-repo consumers of the first form. So this commit removes support for it.

Note that remoteiterbatcher.submit() asserts the 2nd form. And b6e71f8af5b8 removed the last user of remotebatcher, forcing everyone to remoteiterbatcher. So anything relying on this in the wild would have been broken since b6e71f8af5b8.

.. api::

   @peer.batchable can no longer emit local values

Differential Revision: https://phab.mercurial-scm.org/D318
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 09 Aug 2017 22:52:05 -0700
parents 68c43a416585
children 1859b9a7ddef
line wrap: on
line source

from __future__ import absolute_import

import unittest
import silenttestrunner

from mercurial import (
    error,
    scmutil,
)

class mockfile(object):
    """In-memory stand-in for a file handle handed out by a mock vfs.

    The text is not kept on the handle itself: it lives in ``fs.contents``
    under the key ``name``, so every handle for the same name on the same
    ``fs`` sees the same data.
    """

    def __init__(self, name, fs):
        # fs is the owning object; its ``contents`` mapping backs this file
        self.fs = fs
        self.name = name

    def __enter__(self):
        # the handle acts as its own context manager
        return self

    def __exit__(self, *args, **kwargs):
        # nothing to release; returning None never suppresses an exception
        return None

    def write(self, text):
        # a write replaces whatever was stored under this name
        store = self.fs.contents
        store[self.name] = text

    def read(self):
        # a name that was never written surfaces as the mapping's own
        # lookup error
        store = self.fs.contents
        return store[self.name]

class mockvfs(object):
    """Minimal in-memory vfs whose file bodies live in ``self.contents``.

    Provides ``read``, ``readlines`` and opening a path by calling the
    instance; all of them go through ``mockfile``.
    """

    def __init__(self):
        # path -> stored text; filled in through mockfile.write()
        self.contents = {}

    def __call__(self, path, mode, atomictemp):
        # mode and atomictemp are accepted but not used by the mock
        return mockfile(path, self)

    def read(self, path):
        handle = mockfile(path, self)
        return handle.read()

    def readlines(self, path):
        # lines need to keep their trailing '\n' to mock the real readlines
        text = mockfile(path, self).read()
        return text.splitlines(True)

class testsimplekeyvaluefile(unittest.TestCase):
    """Exercise scmutil.simplekeyvaluefile against an in-memory mockvfs."""

    def setUp(self):
        # every test starts from an empty in-memory store
        self.vfs = mockvfs()

    def _kvfile(self, path):
        # simplekeyvaluefile for ``path`` bound to this test's vfs
        return scmutil.simplekeyvaluefile(self.vfs, path)

    def testbasicwritingiandreading(self):
        written = {'key1': 'value1', 'Key2': 'value2'}
        self._kvfile('kvfile').write(written)
        # compare sorted pieces so the check does not depend on the order
        # in which the keys were written out
        pieces = self.vfs.read('kvfile').split('\n')
        self.assertEqual(sorted(pieces),
                         ['', 'Key2=value2', 'key1=value1'])
        loaded = self._kvfile('kvfile').read()
        self.assertEqual(loaded, written)

    def testinvalidkeys(self):
        # a key beginning with a digit is refused on write
        leadingdigit = {'0key1': 'value1', 'Key2': 'value2'}
        with self.assertRaisesRegexp(error.ProgrammingError,
                                     'keys must start with a letter.*'):
            self._kvfile('kvfile').write(leadingdigit)

        # and so is a key containing '@'
        badchar = {'key1@': 'value1', 'Key2': 'value2'}
        with self.assertRaisesRegexp(error.ProgrammingError, 'invalid key.*'):
            self._kvfile('kvfile').write(badchar)

    def testinvalidvalues(self):
        # a value carrying a newline is refused on write
        newlinevalue = {'key1': 'value1', 'Key2': 'value2\n'}
        with self.assertRaisesRegexp(error.ProgrammingError, 'invalid val.*'):
            self._kvfile('kvfile').write(newlinevalue)

    def testcorruptedfile(self):
        # seed the store directly with a line that has no '=' in it
        self.vfs.contents['badfile'] = 'ababagalamaga\n'
        with self.assertRaisesRegexp(error.CorruptedState,
                                     'dictionary.*element.*'):
            self._kvfile('badfile').read()

    def testfirstline(self):
        written = {'key1': 'value1'}
        self._kvfile('fl').write(written, firstline='1.0')
        self.assertEqual(self.vfs.read('fl'), '1.0\nkey1=value1\n')
        # reading with firstlinenonkeyval=True exposes that first line
        # under the '__firstline' key
        loaded = self._kvfile('fl').read(firstlinenonkeyval=True)
        self.assertEqual(loaded, {'__firstline': '1.0', 'key1': 'value1'})

# Only run the tests when this file is executed as a script, not on import.
if __name__ == '__main__':
    silenttestrunner.main(__name__)