cborutil: add a buffering decoder
The sansiodecoder leaves it up to the callers to feed in data that
wasn't fully consumed last time.
This commit implements a decoder that performs buffering of
leftover chunks from the previous invocation. It otherwise
behaves identically to sansiodecoder.
Differential Revision: https://phab.mercurial-scm.org/D4434
--- a/mercurial/utils/cborutil.py Fri Aug 31 15:54:17 2018 -0700
+++ b/mercurial/utils/cborutil.py Wed Aug 29 14:29:01 2018 -0700
@@ -898,6 +898,48 @@
self._decodedvalues = []
return l
+class bufferingdecoder(object):
+    """A CBOR decoder that buffers undecoded input.
+
+    This is a thin wrapper around ``sansiodecoder`` that adds a buffering
+    layer. Any input that ``sansiodecoder`` does not consume is retained
+    and prepended to the input passed to the next ``decode()`` call.
+
+    TODO consider adding limits as to the maximum amount of data that can
+    be buffered.
+    """
+    def __init__(self):
+        self._decoder = sansiodecoder()
+        # Unconsumed tail of the bytes handed to the wrapped decoder by the
+        # previous ``decode()`` call, or None when nothing is pending.
+        self._leftover = None
+
+    def decode(self, b):
+        """Attempt to decode bytes to CBOR values.
+
+        Returns a tuple with the following fields:
+
+        * Bool indicating whether new values are available for retrieval.
+        * Integer number of bytes decoded from the new input. This is the
+          wrapped decoder's read count minus the number of bytes carried
+          over from earlier calls, so it is negative when the decoder
+          consumed fewer bytes than were already buffered.
+        * Integer number of bytes wanted to decode the next value.
+        """
+        oldlen = 0
+
+        # Prepend whatever the previous call could not consume so the
+        # wrapped decoder always sees one contiguous byte string.
+        if self._leftover:
+            oldlen = len(self._leftover)
+            b = self._leftover + b
+            self._leftover = None
+
+        available, readcount, wanted = self._decoder.decode(b)
+
+        # Keep the unread tail for the next call.
+        if readcount < len(b):
+            self._leftover = b[readcount:]
+
+        return available, readcount - oldlen, wanted
+
+    def getavailable(self):
+        """Return whatever the wrapped ``sansiodecoder`` reports as available."""
+        return self._decoder.getavailable()
+
def decodeall(b):
"""Decode all CBOR items present in an iterable of bytes.
--- a/tests/test-cbor.py Fri Aug 31 15:54:17 2018 -0700
+++ b/tests/test-cbor.py Wed Aug 29 14:29:01 2018 -0700
@@ -941,6 +941,30 @@
decoder = cborutil.sansiodecoder()
self.assertEqual(decoder.decode(b''), (False, 0, 0))
+class BufferingDecoderTests(TestCase):
+    def testsimple(self):
+        # A single top-level list mixing bytes, a map, booleans, None and a
+        # long nested list.
+        source = [
+            b'foobar',
+            b'x' * 128,
+            {b'foo': b'bar'},
+            True,
+            False,
+            None,
+            [None] * 128,
+        ]
+
+        encoded = b''.join(cborutil.streamencode(source))
+        total = len(encoded)
+
+        # Replay the same encoded stream with every slice width from 1 to
+        # 31 bytes, using a fresh decoder for each width.
+        for step in range(1, 32):
+            decoder = cborutil.bufferingdecoder()
+
+            # Feed consecutive ``step``-sized slices; the last one may be
+            # shorter than ``step``.
+            for offset in range(0, total, step):
+                decoder.decode(encoded[offset:offset + step])
+
+            self.assertEqual(decoder.getavailable(), [source])
+
class DecodeallTests(TestCase):
def testemptyinput(self):
self.assertEqual(cborutil.decodeall(b''), [])