diff tests/test-cbor.py @ 39413:babad5ebaf0a

cborutil: add a buffering decoder The sansiodecoder leaves it up to the callers to feed in data that wasn't fully consumed last time. This commit implements a decoder that performs buffering of leftover chunks from the previous invocation. It otherwise behaves identically to sansiodecoder. Differential Revision: https://phab.mercurial-scm.org/D4434
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 29 Aug 2018 14:29:01 -0700
parents a40d3da89b7d
children b638219a23c3
line wrap: on
line diff
--- a/tests/test-cbor.py	Fri Aug 31 15:54:17 2018 -0700
+++ b/tests/test-cbor.py	Wed Aug 29 14:29:01 2018 -0700
@@ -941,6 +941,30 @@
         decoder = cborutil.sansiodecoder()
         self.assertEqual(decoder.decode(b''), (False, 0, 0))
 
+class BufferingDecoderTests(TestCase):
+    def testsimple(self):
+        source = [
+            b'foobar',
+            b'x' * 128,
+            {b'foo': b'bar'},
+            True,
+            False,
+            None,
+            [None for i in range(128)],
+        ]
+
+        encoded = b''.join(cborutil.streamencode(source))
+
+        for step in range(1, 32):
+            decoder = cborutil.bufferingdecoder()
+            start = 0
+
+            while start < len(encoded):
+                decoder.decode(encoded[start:start + step])
+                start += step
+
+            self.assertEqual(decoder.getavailable(), [source])
+
 class DecodeallTests(TestCase):
     def testemptyinput(self):
         self.assertEqual(cborutil.decodeall(b''), [])