util: teach lrucachedict to enforce a max total cost
Now that lrucachedict entries can have a numeric cost associated
with them and we can easily pop the oldest item in the cache, it
now becomes relatively trivial to implement support for enforcing
a high water mark on the total cost of items in the cache.
This commit teaches lrucachedict instances to have a max cost
associated with them. When items are inserted, we pop old items
until enough "cost" frees up to make room for the new item.
This feature is close to zero cost when not used (modulo the insertion
regression introduced by the previous commit):
$ ./hg perflrucachedict --size 4 --gets 1000000 --sets 1000000 --mixed 1000000
! gets
! wall 0.607444 comb 0.610000 user 0.610000 sys 0.000000 (best of 17)
! wall 0.601653 comb 0.600000 user 0.600000 sys 0.000000 (best of 17)
! inserts
! wall 0.678261 comb 0.680000 user 0.680000 sys 0.000000 (best of 14)
! wall 0.685042 comb 0.680000 user 0.680000 sys 0.000000 (best of 15)
! sets
! wall 0.808770 comb 0.800000 user 0.800000 sys 0.000000 (best of 13)
! wall 0.834241 comb 0.830000 user 0.830000 sys 0.000000 (best of 12)
! mixed
! wall 0.782441 comb 0.780000 user 0.780000 sys 0.000000 (best of 13)
! wall 0.803804 comb 0.800000 user 0.800000 sys 0.000000 (best of 13)
$ hg perflrucachedict --size 1000 --gets 1000000 --sets 1000000 --mixed 1000000
! init
! wall 0.006952 comb 0.010000 user 0.010000 sys 0.000000 (best of 418)
! gets
! wall 0.613350 comb 0.610000 user 0.610000 sys 0.000000 (best of 17)
! wall 0.617415 comb 0.620000 user 0.620000 sys 0.000000 (best of 17)
! inserts
! wall 0.701270 comb 0.700000 user 0.700000 sys 0.000000 (best of 15)
! wall 0.700516 comb 0.700000 user 0.700000 sys 0.000000 (best of 15)
! sets
! wall 0.825720 comb 0.830000 user 0.830000 sys 0.000000 (best of 13)
! wall 0.837946 comb 0.840000 user 0.830000 sys 0.010000 (best of 12)
! mixed
! wall 0.821644 comb 0.820000 user 0.820000 sys 0.000000 (best of 13)
! wall 0.850559 comb 0.850000 user 0.850000 sys 0.000000 (best of 12)
I reckon the slight slowdown on insert is due to added if checks.
For caches with total cost limiting enabled:
$ hg perflrucachedict --size 4 --gets 1000000 --sets 1000000 --mixed 1000000 --costlimit 100
! gets w/ cost limit
! wall 0.598737 comb 0.590000 user 0.590000 sys 0.000000 (best of 17)
! inserts w/ cost limit
! wall 1.694282 comb 1.700000 user 1.700000 sys 0.000000 (best of 6)
! mixed w/ cost limit
! wall 1.157655 comb 1.150000 user 1.150000 sys 0.000000 (best of 9)
$ hg perflrucachedict --size 1000 --gets 1000000 --sets 1000000 --mixed 1000000 --costlimit 10000
! gets w/ cost limit
! wall 0.598526 comb 0.600000 user 0.600000 sys 0.000000 (best of 17)
! inserts w/ cost limit
! wall 37.838315 comb 37.840000 user 37.840000 sys 0.000000 (best of 3)
! mixed w/ cost limit
! wall 18.060198 comb 18.060000 user 18.060000 sys 0.000000 (best of 3)
$ hg perflrucachedict --size 1000 --gets 1000000 --sets 1000000 --mixed 1000000 --costlimit 10000 --mixedgetfreq 90
! gets w/ cost limit
! wall 0.600024 comb 0.600000 user 0.600000 sys 0.000000 (best of 17)
! inserts w/ cost limit
! wall 37.154547 comb 37.120000 user 37.120000 sys 0.000000 (best of 3)
! mixed w/ cost limit
! wall 4.381602 comb 4.380000 user 4.370000 sys 0.010000 (best of 3)
The functions we're benchmarking are slightly different, which could
move numbers by a few milliseconds. But the slowdown on insert is too
great to be explained by that. The slowness is due to insert-heavy
operations needing to call popoldest() repeatedly when the cache is
at capacity. The next commit will address this.
Differential Revision: https://phab.mercurial-scm.org/D4503
from __future__ import absolute_import
import unittest
from mercurial.thirdparty import (
cbor,
)
from mercurial.utils import (
cborutil,
)
class TestCase(unittest.TestCase):
    """Base test case that always exposes ``assertRaisesRegex``.

    When the running ``unittest.TestCase`` lacks ``assertRaisesRegex``, the
    older ``assertRaisesRegexp`` spelling is aliased under the new name so
    the tests in this file can use a single spelling.
    """

    # Python 2.7 only ships the regex*p* spelling, while Python 3.7
    # deprecates it in favour of the regex spelling.
    if not hasattr(unittest.TestCase, 'assertRaisesRegex'):
        assertRaisesRegex = (# camelcase-required
            unittest.TestCase.assertRaisesRegexp)
def loadit(it):
    """Join the byte chunks yielded by ``it`` and decode them with ``cbor``.

    ``it`` is an iterable of bytes objects (as produced by the
    ``cborutil.streamencode*`` generators). The concatenation is passed to
    ``cbor.loads()`` and the decoded value is returned.
    """
    payload = b''.join(it)
    return cbor.loads(payload)
class BytestringTests(TestCase):
    """Tests for encoding and decoding bytestrings with ``cborutil``.

    Covers definite length bytestrings, indefinite length (chunked)
    bytestrings, and the values reported by ``decodeitem()`` and
    ``sansiodecoder`` when only part of the input is available.
    """

    @staticmethod
    def _headerlen(size):
        """Return the header length expected for a ``size`` byte bytestring.

        The tests below expect 1 header byte for sizes under 24, 2 bytes
        for sizes under 2**8, 3 bytes for sizes under 2**16 and 5 bytes for
        sizes under 2**32. Anything larger is not covered by these tests
        and raises ``AssertionError`` rather than silently reusing a stale
        value.
        """
        if size < 24:
            return 1
        if size < 2**8:
            return 2
        if size < 2**16:
            return 3
        if size < 2**32:
            return 5
        raise AssertionError('unexpected bytestring size: %d' % size)

    def testsimple(self):
        """A short bytestring encodes to a header chunk plus the payload."""
        self.assertEqual(
            list(cborutil.streamencode(b'foobar')),
            [b'\x46', b'foobar'])

        self.assertEqual(
            loadit(cborutil.streamencode(b'foobar')),
            b'foobar')

        self.assertEqual(cborutil.decodeall(b'\x46foobar'),
                         [b'foobar'])

        self.assertEqual(cborutil.decodeall(b'\x46foobar\x45fizbi'),
                         [b'foobar', b'fizbi'])

    def testlong(self):
        """A 1 MiB bytestring round trips through encode and decode."""
        source = b'x' * 1048576

        self.assertEqual(loadit(cborutil.streamencode(source)), source)

        encoded = b''.join(cborutil.streamencode(source))
        self.assertEqual(cborutil.decodeall(encoded), [source])

    def testfromiter(self):
        """Chunks from an iterable become an indefinite length bytestring."""
        # This is the example from RFC 7049 Section 2.2.2.
        source = [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99']

        self.assertEqual(
            list(cborutil.streamencodebytestringfromiter(source)),
            [
                b'\x5f',
                b'\x44',
                b'\xaa\xbb\xcc\xdd',
                b'\x43',
                b'\xee\xff\x99',
                b'\xff',
            ])

        self.assertEqual(
            loadit(cborutil.streamencodebytestringfromiter(source)),
            b''.join(source))

        self.assertEqual(cborutil.decodeall(b'\x5f\x44\xaa\xbb\xcc\xdd'
                                            b'\x43\xee\xff\x99\xff'),
                         [b'\xaa\xbb\xcc\xdd', b'\xee\xff\x99', b''])

        # Every decoded chunk carries flags marking its position: only the
        # first chunk is ``isfirst`` and only the third (empty) chunk is
        # ``islast``.
        for i, chunk in enumerate(
            cborutil.decodeall(b'\x5f\x44\xaa\xbb\xcc\xdd'
                               b'\x43\xee\xff\x99\xff')):
            self.assertIsInstance(chunk, cborutil.bytestringchunk)

            if i == 0:
                self.assertTrue(chunk.isfirst)
            else:
                self.assertFalse(chunk.isfirst)

            if i == 2:
                self.assertTrue(chunk.islast)
            else:
                self.assertFalse(chunk.islast)

    def testfromiterlarge(self):
        """Chunks of very different sizes round trip when streamed."""
        source = [b'a' * 16, b'b' * 128, b'c' * 1024, b'd' * 1048576]

        self.assertEqual(
            loadit(cborutil.streamencodebytestringfromiter(source)),
            b''.join(source))

    def testindefinite(self):
        """A single bytestring can be emitted as fixed size chunks."""
        source = b'\x00\x01\x02\x03' + b'\xff' * 16384

        it = cborutil.streamencodeindefinitebytestring(source, chunksize=2)

        self.assertEqual(next(it), b'\x5f')
        self.assertEqual(next(it), b'\x42')
        self.assertEqual(next(it), b'\x00\x01')
        self.assertEqual(next(it), b'\x42')
        self.assertEqual(next(it), b'\x02\x03')
        self.assertEqual(next(it), b'\x42')
        self.assertEqual(next(it), b'\xff\xff')

        dest = b''.join(cborutil.streamencodeindefinitebytestring(
            source, chunksize=42))
        self.assertEqual(cbor.loads(dest), source)

        self.assertEqual(b''.join(cborutil.decodeall(dest)), source)

        # 16388 source bytes split into 42 byte chunks leave a final 8 byte
        # chunk; the decoder also yields an empty terminating chunk.
        for chunk in cborutil.decodeall(dest):
            self.assertIsInstance(chunk, cborutil.bytestringchunk)
            self.assertIn(len(chunk), (0, 8, 42))

        # An indefinite bytestring with no chunks decodes to one empty
        # chunk that is both first and last.
        encoded = b'\x5f\xff'
        b = cborutil.decodeall(encoded)
        self.assertEqual(b, [b''])
        self.assertTrue(b[0].isfirst)
        self.assertTrue(b[0].islast)

    def testdecodevariouslengths(self):
        """Sizes around each header length boundary decode in one call."""
        for i in (0, 1, 22, 23, 24, 25, 254, 255, 256, 65534, 65535, 65536):
            source = b'x' * i
            encoded = b''.join(cborutil.streamencode(source))

            hlen = self._headerlen(len(source))

            self.assertEqual(cborutil.decodeitem(encoded),
                             (True, source, hlen + len(source),
                              cborutil.SPECIAL_NONE))

    def testpartialdecode(self):
        """Truncated input reports how many more bytes are wanted."""
        encoded = b''.join(cborutil.streamencode(b'foobar'))

        self.assertEqual(cborutil.decodeitem(encoded[0:1]),
                         (False, None, -6, cborutil.SPECIAL_NONE))
        self.assertEqual(cborutil.decodeitem(encoded[0:2]),
                         (False, None, -5, cborutil.SPECIAL_NONE))
        self.assertEqual(cborutil.decodeitem(encoded[0:3]),
                         (False, None, -4, cborutil.SPECIAL_NONE))
        self.assertEqual(cborutil.decodeitem(encoded[0:4]),
                         (False, None, -3, cborutil.SPECIAL_NONE))
        self.assertEqual(cborutil.decodeitem(encoded[0:5]),
                         (False, None, -2, cborutil.SPECIAL_NONE))
        self.assertEqual(cborutil.decodeitem(encoded[0:6]),
                         (False, None, -1, cborutil.SPECIAL_NONE))
        self.assertEqual(cborutil.decodeitem(encoded[0:7]),
                         (True, b'foobar', 7, cborutil.SPECIAL_NONE))

    def testpartialdecodevariouslengths(self):
        """Partial decode results are checked across many payload sizes."""
        lens = [
            2,
            3,
            10,
            23,
            24,
            25,
            31,
            100,
            254,
            255,
            256,
            257,
            16384,
            65534,
            65535,
            65536,
            65537,
            131071,
            131072,
            131073,
            1048575,
            1048576,
            1048577,
        ]

        for size in lens:
            hlen = self._headerlen(size)

            source = b'x' * size
            encoded = b''.join(cborutil.streamencode(source))

            res = cborutil.decodeitem(encoded[0:1])

            # With a multi-byte header the first byte only reveals how much
            # header is missing; with a 1 byte header the payload size is
            # already known.
            if hlen > 1:
                self.assertEqual(res, (False, None, -(hlen - 1),
                                       cborutil.SPECIAL_NONE))
            else:
                self.assertEqual(res, (False, None, -(size + hlen - 1),
                                       cborutil.SPECIAL_NONE))

            # Decoding partial header reports remaining header size.
            for i in range(hlen - 1):
                self.assertEqual(cborutil.decodeitem(encoded[0:i + 1]),
                                 (False, None, -(hlen - i - 1),
                                  cborutil.SPECIAL_NONE))

            # Decoding complete header reports item size.
            self.assertEqual(cborutil.decodeitem(encoded[0:hlen]),
                             (False, None, -size, cborutil.SPECIAL_NONE))

            # Decoding single byte after header reports item size - 1
            self.assertEqual(cborutil.decodeitem(encoded[0:hlen + 1]),
                             (False, None, -(size - 1), cborutil.SPECIAL_NONE))

            # Decoding all but the last byte reports -1 needed.
            self.assertEqual(cborutil.decodeitem(encoded[0:hlen + size - 1]),
                             (False, None, -1, cborutil.SPECIAL_NONE))

            # Decoding last byte retrieves value.
            self.assertEqual(cborutil.decodeitem(encoded[0:hlen + size]),
                             (True, source, hlen + size, cborutil.SPECIAL_NONE))

    def testindefinitepartialdecode(self):
        """Indefinite bytestrings decode item by item and via the decoder."""
        encoded = b''.join(cborutil.streamencodebytestringfromiter(
            [b'foobar', b'biz']))

        # First item should be begin of bytestring special.
        self.assertEqual(cborutil.decodeitem(encoded[0:1]),
                         (True, None, 1,
                          cborutil.SPECIAL_START_INDEFINITE_BYTESTRING))

        # Second item should be the first chunk. But only available when
        # we give it 7 bytes (1 byte header + 6 byte chunk).
        self.assertEqual(cborutil.decodeitem(encoded[1:2]),
                         (False, None, -6, cborutil.SPECIAL_NONE))
        self.assertEqual(cborutil.decodeitem(encoded[1:3]),
                         (False, None, -5, cborutil.SPECIAL_NONE))
        self.assertEqual(cborutil.decodeitem(encoded[1:4]),
                         (False, None, -4, cborutil.SPECIAL_NONE))
        self.assertEqual(cborutil.decodeitem(encoded[1:5]),
                         (False, None, -3, cborutil.SPECIAL_NONE))
        self.assertEqual(cborutil.decodeitem(encoded[1:6]),
                         (False, None, -2, cborutil.SPECIAL_NONE))
        self.assertEqual(cborutil.decodeitem(encoded[1:7]),
                         (False, None, -1, cborutil.SPECIAL_NONE))

        self.assertEqual(cborutil.decodeitem(encoded[1:8]),
                         (True, b'foobar', 7, cborutil.SPECIAL_NONE))

        # Third item should be second chunk. But only available when
        # we give it 4 bytes (1 byte header + 3 byte chunk).
        self.assertEqual(cborutil.decodeitem(encoded[8:9]),
                         (False, None, -3, cborutil.SPECIAL_NONE))
        self.assertEqual(cborutil.decodeitem(encoded[8:10]),
                         (False, None, -2, cborutil.SPECIAL_NONE))
        self.assertEqual(cborutil.decodeitem(encoded[8:11]),
                         (False, None, -1, cborutil.SPECIAL_NONE))

        self.assertEqual(cborutil.decodeitem(encoded[8:12]),
                         (True, b'biz', 4, cborutil.SPECIAL_NONE))

        # Fourth item should be end of indefinite stream marker.
        self.assertEqual(cborutil.decodeitem(encoded[12:13]),
                         (True, None, 1, cborutil.SPECIAL_INDEFINITE_BREAK))

        # Now test the behavior when going through the decoder.

        self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:1]),
                         (False, 1, 0))
        self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:2]),
                         (False, 1, 6))
        self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:3]),
                         (False, 1, 5))
        self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:4]),
                         (False, 1, 4))
        self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:5]),
                         (False, 1, 3))
        self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:6]),
                         (False, 1, 2))
        self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:7]),
                         (False, 1, 1))
        self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:8]),
                         (True, 8, 0))

        self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:9]),
                         (True, 8, 3))
        self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:10]),
                         (True, 8, 2))
        self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:11]),
                         (True, 8, 1))
        self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:12]),
                         (True, 12, 0))

        self.assertEqual(cborutil.sansiodecoder().decode(encoded[0:13]),
                         (True, 13, 0))

        # Feed one decoder incrementally and check the chunk position flags
        # on each value it hands back.
        decoder = cborutil.sansiodecoder()
        decoder.decode(encoded[0:8])
        values = decoder.getavailable()
        self.assertEqual(values, [b'foobar'])
        self.assertTrue(values[0].isfirst)
        self.assertFalse(values[0].islast)

        self.assertEqual(decoder.decode(encoded[8:12]),
                         (True, 4, 0))
        values = decoder.getavailable()
        self.assertEqual(values, [b'biz'])
        self.assertFalse(values[0].isfirst)
        self.assertFalse(values[0].islast)

        self.assertEqual(decoder.decode(encoded[12:]),
                         (True, 1, 0))
        values = decoder.getavailable()
        self.assertEqual(values, [b''])
        self.assertFalse(values[0].isfirst)
        self.assertTrue(values[0].islast)
class StringTests(TestCase):
    """Tests for how the decoder treats the string major type."""

    def testdecodeforbidden(self):
        """Input starting with ``0x63`` is rejected as an unsupported string."""
        payload = b'\x63foo'
        message = 'string major type not supported'

        with self.assertRaisesRegex(cborutil.CBORDecodeError, message):
            cborutil.decodeall(payload)
class IntTests(TestCase):
    """Tests for encoding and decoding integers with ``cborutil``."""

    def _checkpartialdecode(self, value, encodedlen):
        """Check ``decodeitem()`` on every prefix of ``value``'s encoding.

        ``encodedlen`` is the number of bytes the encoding of ``value`` is
        expected to occupy. Each proper prefix must be reported as
        incomplete, with the number of missing bytes given as a negative
        count. The prefix of exactly ``encodedlen`` bytes must decode to
        ``value`` and report ``encodedlen`` bytes consumed. Slices never
        extend past ``encodedlen``, so each assertion names exactly the
        bytes it exercises.

        Returns the complete encoding so callers can run further checks.
        """
        encoded = b''.join(cborutil.streamencode(value))

        for end in range(1, encodedlen):
            self.assertEqual(cborutil.decodeitem(encoded[0:end]),
                             (False, None, end - encodedlen,
                              cborutil.SPECIAL_NONE))

        self.assertEqual(cborutil.decodeitem(encoded[0:encodedlen]),
                         (True, value, encodedlen, cborutil.SPECIAL_NONE))

        return encoded

    def testsmall(self):
        """Small non-negative integers encode to a single byte."""
        cases = (
            (0, b'\x00'),
            (1, b'\x01'),
            (2, b'\x02'),
            (3, b'\x03'),
            (4, b'\x04'),
        )

        for value, encoded in cases:
            self.assertEqual(list(cborutil.streamencode(value)), [encoded])
            self.assertEqual(cborutil.decodeall(encoded), [value])

        # Multiple value decode works.
        self.assertEqual(cborutil.decodeall(b'\x00\x01\x02\x03\x04'),
                         [0, 1, 2, 3, 4])

    def testnegativesmall(self):
        """Small negative integers encode to a single byte."""
        cases = (
            (-1, b'\x20'),
            (-2, b'\x21'),
            (-3, b'\x22'),
            (-4, b'\x23'),
            (-5, b'\x24'),
        )

        for value, encoded in cases:
            self.assertEqual(list(cborutil.streamencode(value)), [encoded])
            self.assertEqual(cborutil.decodeall(encoded), [value])

        # Multiple value decode works.
        self.assertEqual(cborutil.decodeall(b'\x20\x21\x22\x23\x24'),
                         [-1, -2, -3, -4, -5])

    def testrange(self):
        """A sweep of integers matches the ``cbor`` module and round trips."""
        for i in range(-70000, 70000, 10):
            encoded = b''.join(cborutil.streamencode(i))

            self.assertEqual(encoded, cbor.dumps(i))
            self.assertEqual(cborutil.decodeall(encoded), [i])

    def testdecodepartialubyte(self):
        """250 occupies 2 bytes."""
        self._checkpartialdecode(250, 2)

    def testdecodepartialbyte(self):
        """-42 occupies 2 bytes."""
        self._checkpartialdecode(-42, 2)

    def testdecodepartialushort(self):
        """2**15 occupies 3 bytes."""
        self._checkpartialdecode(2**15, 3)

    def testdecodepartialshort(self):
        """-1024 occupies 3 bytes."""
        self._checkpartialdecode(-1024, 3)

    def testdecodepartialulong(self):
        """2**28 occupies 5 bytes."""
        self._checkpartialdecode(2**28, 5)

    def testdecodepartiallong(self):
        """-1048580 occupies 5 bytes."""
        self._checkpartialdecode(-1048580, 5)

    def testdecodepartialulonglong(self):
        """2**32 occupies 9 bytes; ``decodeall()`` rejects truncated input."""
        encoded = self._checkpartialdecode(2**32, 9)

        with self.assertRaisesRegex(
            cborutil.CBORDecodeError, 'input data not fully consumed'):
            cborutil.decodeall(encoded[0:1])

        with self.assertRaisesRegex(
            cborutil.CBORDecodeError, 'input data not fully consumed'):
            cborutil.decodeall(encoded[0:2])

    def testdecodepartiallonglong(self):
        """-7000000000 occupies 9 bytes."""
        self._checkpartialdecode(-7000000000, 9)
class ArrayTests(TestCase):
    """Tests for encoding and decoding arrays (lists and tuples)."""

    def testempty(self):
        """An empty list is the single byte ``0x80``."""
        header = b'\x80'

        self.assertEqual(list(cborutil.streamencode([])), [header])
        self.assertEqual(loadit(cborutil.streamencode([])), [])
        self.assertEqual(cborutil.decodeall(header), [[]])

    def testbasic(self):
        """A flat list of bytestrings and integers round trips."""
        source = [b'foo', b'bar', 1, -10]
        expected = [
            b'\x84', b'\x43', b'foo', b'\x43', b'bar', b'\x01', b'\x29']

        produced = list(cborutil.streamencode(source))
        self.assertEqual(produced, expected)

        decoded = cborutil.decodeall(b''.join(expected))
        self.assertEqual(decoded, [source])

    def testemptyfromiter(self):
        """An empty iterable encodes as an indefinite array; decode rejects it."""
        encoded = b''.join(cborutil.streamencodearrayfromiter([]))
        self.assertEqual(encoded, b'\x9f\xff')

        message = 'indefinite length uint not allowed'
        with self.assertRaisesRegex(cborutil.CBORDecodeError, message):
            cborutil.decodeall(b'\x9f\xff')

    def testfromiter1(self):
        """A one element iterable encodes as an indefinite length array."""
        source = [b'foo']

        chunks = list(cborutil.streamencodearrayfromiter(source))
        self.assertEqual(chunks, [
            b'\x9f',
            b'\x43', b'foo',
            b'\xff',
        ])

        dest = b''.join(cborutil.streamencodearrayfromiter(source))
        self.assertEqual(cbor.loads(dest), source)

        message = 'indefinite length uint not allowed'
        with self.assertRaisesRegex(cborutil.CBORDecodeError, message):
            cborutil.decodeall(dest)

    def testtuple(self):
        """A tuple is encoded as an array and decodes back as a list."""
        source = (b'foo', None, 42)
        aslist = list(source)

        encoded = b''.join(cborutil.streamencode(source))

        self.assertEqual(cbor.loads(encoded), aslist)
        self.assertEqual(cborutil.decodeall(encoded), [aslist])

    def testpartialdecode(self):
        """``decodeitem()`` reports the array header once it is complete."""
        # (element count, header length, longest prefix to try)
        cases = (
            (4, 1, 2),
            (23, 1, 2),
            (24, 2, 3),
            (256, 3, 4),
        )

        for count, hlen, longest in cases:
            encoded = b''.join(cborutil.streamencode(list(range(count))))

            for end in range(1, longest + 1):
                if end < hlen:
                    # Header still truncated: missing byte count is
                    # reported as a negative number.
                    expected = (False, None, end - hlen,
                                cborutil.SPECIAL_NONE)
                else:
                    expected = (True, count, hlen,
                                cborutil.SPECIAL_START_ARRAY)

                self.assertEqual(cborutil.decodeitem(encoded[0:end]),
                                 expected)

    def testnested(self):
        """Arrays containing arrays, sets and maps round trip."""
        sources = (
            [[], [], [[], [], []]],
            [True, None, [True, 0, 2], [None], [], [[[]], -87]],
            # A set within an array.
            [None, {b'foo', b'bar', None, False}, set()],
            # A map within an array.
            [None, {}, {b'foo': b'bar', True: False}, [{}]],
        )

        for source in sources:
            encoded = b''.join(cborutil.streamencode(source))
            self.assertEqual(cborutil.decodeall(encoded), [source])

    def testindefinitebytestringvalues(self):
        """An indefinite length bytestring is rejected as an array value."""
        # Single value array whose value is an empty indefinite bytestring.
        encoded = b'\x81\x5f\x40\xff'
        message = ('indefinite length bytestrings not '
                   'allowed as array values')

        with self.assertRaisesRegex(cborutil.CBORDecodeError, message):
            cborutil.decodeall(encoded)
class SetTests(TestCase):
    """Tests for encoding and decoding sets."""

    def testempty(self):
        """An empty set is a semantic tag chunk followed by an empty array."""
        chunks = list(cborutil.streamencode(set()))
        self.assertEqual(chunks, [
            b'\xd9\x01\x02',
            b'\x80',
        ])

        decoded = cborutil.decodeall(b'\xd9\x01\x02\x80')
        self.assertEqual(decoded, [set()])

    def testset(self):
        """A set of mixed values round trips."""
        source = {b'foo', None, 42}
        encoded = b''.join(cborutil.streamencode(source))

        self.assertEqual(cbor.loads(encoded), source)
        self.assertEqual(cborutil.decodeall(encoded), [source])

    def testinvalidtag(self):
        """The set tag followed by something other than an array is an error."""
        # Must use array to encode sets.
        encoded = b'\xd9\x01\x02\xa0'
        message = ('expected array after finite set '
                   'semantic tag')

        with self.assertRaisesRegex(cborutil.CBORDecodeError, message):
            cborutil.decodeall(encoded)

    def testpartialdecode(self):
        """``decodeitem()`` needs the tag and array header before reporting."""
        # Semantic tag item will be 3 bytes. Set header will be variable
        # depending on length.
        #
        # Each case is (member count, bytes wanted for each incomplete
        # prefix starting at length 1, longest prefix to try).
        cases = (
            (23, (-2, -1, -1), 5),
            (24, (-2, -1, -1, -1), 6),
            (256, (-2, -1, -1, -2, -1), 6),
        )

        for count, wanted, longest in cases:
            encoded = b''.join(cborutil.streamencode(set(range(count))))

            for end, need in enumerate(wanted, 1):
                self.assertEqual(cborutil.decodeitem(encoded[0:end]),
                                 (False, None, need, cborutil.SPECIAL_NONE))

            # The first prefix longer than every incomplete one holds the
            # whole tag plus array header.
            consumed = len(wanted) + 1
            for end in range(consumed, longest + 1):
                self.assertEqual(cborutil.decodeitem(encoded[0:end]),
                                 (True, count, consumed,
                                  cborutil.SPECIAL_START_SET))

    def testinvalidvalue(self):
        """Indefinite bytestrings and collections are rejected as members."""
        tag = b'\xd9\x01\x02'  # semantic tag
        header = b'\x81'  # array of size 1

        cases = (
            # indefinite length bytestring "foo"
            (b'\x5f\x43foo\xff',
             'indefinite length bytestrings not '
             'allowed as set values'),
            # empty array
            (b'\x80', 'collections not allowed as set values'),
            # empty map
            (b'\xa0', 'collections not allowed as set values'),
            # set with integer 1
            (b'\xd9\x01\x02\x81\x01',
             'collections not allowed as set values'),
        )

        for member, message in cases:
            encoded = b''.join([tag, header, member])

            with self.assertRaisesRegex(cborutil.CBORDecodeError, message):
                cborutil.decodeall(encoded)
class BoolTests(TestCase):
    """Tests for encoding and decoding booleans."""

    def testbasic(self):
        """``True`` is ``0xf5`` and ``False`` is ``0xf4`` in both directions."""
        for value, encoded in ((True, b'\xf5'), (False, b'\xf4')):
            self.assertEqual(list(cborutil.streamencode(value)), [encoded])
            self.assertIs(loadit(cborutil.streamencode(value)), value)
            self.assertEqual(cborutil.decodeall(encoded), [value])

        mixed = cborutil.decodeall(b'\xf4\xf5\xf5\xf4')
        self.assertEqual(mixed, [False, True, True, False])
class NoneTests(TestCase):
    """Tests for encoding and decoding ``None``."""

    def testbasic(self):
        """``None`` is the single byte ``0xf6`` in both directions."""
        encoded = b'\xf6'

        self.assertEqual(list(cborutil.streamencode(None)), [encoded])
        self.assertIs(loadit(cborutil.streamencode(None)), None)

        self.assertEqual(cborutil.decodeall(encoded), [None])
        self.assertEqual(cborutil.decodeall(b'\xf6\xf6'), [None, None])
class MapTests(TestCase):
    """Tests for encoding and decoding maps (dicts)."""

    def testempty(self):
        """An empty dict is the single byte ``0xa0``."""
        header = b'\xa0'

        self.assertEqual(list(cborutil.streamencode({})), [header])
        self.assertEqual(loadit(cborutil.streamencode({})), {})
        self.assertEqual(cborutil.decodeall(header), [{}])

    def testemptyindefinite(self):
        """An empty iterable encodes as an indefinite map; decode rejects it."""
        chunks = list(cborutil.streamencodemapfromiter([]))
        self.assertEqual(chunks, [
            b'\xbf', b'\xff'])

        self.assertEqual(loadit(cborutil.streamencodemapfromiter([])), {})

        message = 'indefinite length uint not allowed'
        with self.assertRaisesRegex(cborutil.CBORDecodeError, message):
            cborutil.decodeall(b'\xbf\xff')

    def testone(self):
        """A single entry map round trips."""
        source = {b'foo': b'bar'}

        chunks = list(cborutil.streamencode(source))
        self.assertEqual(chunks, [
            b'\xa1', b'\x43', b'foo', b'\x43', b'bar'])

        self.assertEqual(loadit(cborutil.streamencode(source)), source)

        decoded = cborutil.decodeall(b'\xa1\x43foo\x43bar')
        self.assertEqual(decoded, [source])

    def testmultiple(self):
        """A two entry map round trips through both encoders."""
        source = {
            b'foo': b'bar',
            b'baz': b'value1',
        }

        fromdict = loadit(cborutil.streamencode(source))
        self.assertEqual(fromdict, source)

        fromitems = loadit(cborutil.streamencodemapfromiter(source.items()))
        self.assertEqual(fromitems, source)

        encoded = b''.join(cborutil.streamencode(source))
        self.assertEqual(cborutil.decodeall(encoded), [source])

    def testcomplex(self):
        """Keys and values of mixed types round trip."""
        source = {
            b'key': 1,
            2: -10,
        }

        fromdict = loadit(cborutil.streamencode(source))
        self.assertEqual(fromdict, source)

        fromitems = loadit(cborutil.streamencodemapfromiter(source.items()))
        self.assertEqual(fromitems, source)

        encoded = b''.join(cborutil.streamencode(source))
        self.assertEqual(cborutil.decodeall(encoded), [source])

    def testnested(self):
        """Maps containing maps, arrays and sets as values round trip."""
        sources = (
            {b'key1': None, b'key2': {b'sub1': b'sub2'}, b'sub2': {}},
            {
                b'key1': [],
                b'key2': [None, False],
                b'key3': {b'foo', b'bar'},
                b'key4': {},
            },
        )

        for source in sources:
            encoded = b''.join(cborutil.streamencode(source))
            self.assertEqual(cborutil.decodeall(encoded), [source])

    def testillegalkey(self):
        """Indefinite bytestrings and collections are rejected as keys."""
        cases = (
            (
                [
                    # map header + len 1
                    b'\xa1',
                    # indefinite length bytestring "foo" in key position
                    b'\x5f\x03foo\xff'
                ],
                'indefinite length bytestrings not '
                'allowed as map keys',
            ),
            (
                [
                    b'\xa1',
                    b'\x80',  # empty array
                    b'\x43foo',
                ],
                'collections not supported as map keys',
            ),
        )

        for pieces, message in cases:
            encoded = b''.join(pieces)

            with self.assertRaisesRegex(cborutil.CBORDecodeError, message):
                cborutil.decodeall(encoded)

    def testillegalvalue(self):
        """An indefinite length bytestring is rejected as a map value."""
        pieces = [
            b'\xa1',  # map headers
            b'\x43foo',  # key
            b'\x5f\x03bar\xff',  # indefinite length value
        ]
        encoded = b''.join(pieces)
        message = ('indefinite length bytestrings not '
                   'allowed as map values')

        with self.assertRaisesRegex(cborutil.CBORDecodeError, message):
            cborutil.decodeall(encoded)

    def testpartialdecode(self):
        """``decodeitem()`` reports the map header once it is complete."""
        single = {b'key1': b'value1'}
        encoded = b''.join(cborutil.streamencode(single))

        for end in (1, 2):
            self.assertEqual(cborutil.decodeitem(encoded[0:end]),
                             (True, 1, 1, cborutil.SPECIAL_START_MAP))

        # (entry count, header length, longest prefix to try)
        cases = (
            (23, 1, 1),
            (24, 2, 3),
            (256, 3, 4),
            (65536, 5, 6),
        )

        for count, hlen, longest in cases:
            source = {b'key%d' % i: None for i in range(count)}
            encoded = b''.join(cborutil.streamencode(source))

            for end in range(1, longest + 1):
                if end < hlen:
                    # Header still truncated: missing byte count is
                    # reported as a negative number.
                    expected = (False, None, end - hlen,
                                cborutil.SPECIAL_NONE)
                else:
                    expected = (True, count, hlen,
                                cborutil.SPECIAL_START_MAP)

                self.assertEqual(cborutil.decodeitem(encoded[0:end]),
                                 expected)
class SemanticTagTests(TestCase):
    """Tests for how the decoder treats semantic tags."""

    def testdecodeforbidden(self):
        """Every tag below 500 except the finite set tag is rejected.

        For tags whose header spans several bytes, a truncated header must
        first be reported as incomplete; the full input must then raise
        ``CBORDecodeError``.
        """
        for i in range(500):
            if i == cborutil.SEMANTIC_TAG_FINITE_SET:
                continue

            tag = cborutil.encodelength(cborutil.MAJOR_TYPE_SEMANTIC,
                                        i)

            encoded = tag + cborutil.encodelength(cborutil.MAJOR_TYPE_UINT, 42)

            # Partial decode is incomplete.
            if i < 24:
                pass
            elif i < 256:
                self.assertEqual(cborutil.decodeitem(encoded[0:1]),
                                 (False, None, -1, cborutil.SPECIAL_NONE))
            elif i < 65536:
                self.assertEqual(cborutil.decodeitem(encoded[0:1]),
                                 (False, None, -2, cborutil.SPECIAL_NONE))
                self.assertEqual(cborutil.decodeitem(encoded[0:2]),
                                 (False, None, -1, cborutil.SPECIAL_NONE))

            # Raw string so that ``\d`` reaches the regex engine intact
            # instead of being an invalid string escape.
            with self.assertRaisesRegex(cborutil.CBORDecodeError,
                                        r'semantic tag \d+ not allowed'):
                cborutil.decodeitem(encoded)
class SpecialTypesTests(TestCase):
    """Tests for how the decoder treats the special major type."""

    def testforbiddentypes(self):
        """Every special subtype other than false, true and null is rejected."""
        allowed = (
            cborutil.SUBTYPE_FALSE,
            cborutil.SUBTYPE_TRUE,
            cborutil.SUBTYPE_NULL,
        )

        for i in range(256):
            if i in allowed:
                continue

            encoded = cborutil.encodelength(cborutil.MAJOR_TYPE_SPECIAL, i)

            # Raw string so that ``\d`` reaches the regex engine intact
            # instead of being an invalid string escape.
            with self.assertRaisesRegex(cborutil.CBORDecodeError,
                                        r'special type \d+ not allowed'):
                cborutil.decodeitem(encoded)
class SansIODecoderTests(TestCase):
    """Tests for ``cborutil.sansiodecoder``."""

    def testemptyinput(self):
        """Decoding empty input returns ``(False, 0, 0)``."""
        result = cborutil.sansiodecoder().decode(b'')
        self.assertEqual(result, (False, 0, 0))
class BufferingDecoderTests(TestCase):
    """Tests for ``cborutil.bufferingdecoder``."""

    def testsimple(self):
        """Feeding the input in fixed size slices yields the original value.

        Each slice size from 1 to 31 bytes gets a fresh decoder.
        """
        source = [
            b'foobar',
            b'x' * 128,
            {b'foo': b'bar'},
            True,
            False,
            None,
            [None] * 128,
        ]

        encoded = b''.join(cborutil.streamencode(source))
        total = len(encoded)

        for step in range(1, 32):
            decoder = cborutil.bufferingdecoder()

            # Hand the decoder consecutive ``step`` sized slices; the last
            # slice may be shorter.
            for start in range(0, total, step):
                decoder.decode(encoded[start:start + step])

            self.assertEqual(decoder.getavailable(), [source])
class DecodeallTests(TestCase):
    """Tests for edge cases of ``cborutil.decodeall()``."""

    def testemptyinput(self):
        """Empty input decodes to an empty list."""
        decoded = cborutil.decodeall(b'')
        self.assertEqual(decoded, [])

    def testpartialinput(self):
        """An array missing one of its elements raises ``CBORDecodeError``."""
        pieces = [
            b'\x82',  # array of 2 elements
            b'\x01',  # integer 1
        ]
        encoded = b''.join(pieces)
        message = 'input data not complete'

        with self.assertRaisesRegex(cborutil.CBORDecodeError, message):
            cborutil.decodeall(encoded)
# When executed as a script, hand this module's name to
# silenttestrunner.main(). silenttestrunner is imported lazily here and is
# defined outside this file (presumably a test harness helper; verify).
if __name__ == '__main__':
    import silenttestrunner
    silenttestrunner.main(__name__)