cborutil: change buffering strategy
Profiling revealed that we were spending a lot of time on the
line that was concatenating the old buffer with the incoming data
when attempting to decode long byte strings, such as manifest
revisions.
Essentially, we were feeding N chunks of size len(X) << len(Y) into
decode() and continuously allocating a new, larger buffer to hold
the undecoded input. This created substantial memory churn and
slowed down execution.
Changing the code to aggregate pending chunks in a list until we
have enough data to fully decode the next atom makes things much
more efficient.
I don't have exact data, but I recall the old code spending >1s
on manifest fulltexts from the mozilla-unified repo. The new code
doesn't significantly appear in profile output.
Differential Revision: https://phab.mercurial-scm.org/D4854
--- a/mercurial/utils/cborutil.py Wed Oct 03 10:27:44 2018 -0700
+++ b/mercurial/utils/cborutil.py Wed Oct 03 09:43:01 2018 -0700
@@ -913,7 +913,8 @@
"""
def __init__(self):
self._decoder = sansiodecoder()
- self._leftover = None
+ self._chunks = []
+ self._wanted = 0
def decode(self, b):
"""Attempt to decode bytes to CBOR values.
@@ -924,19 +925,40 @@
* Integer number of bytes decoded from the new input.
* Integer number of bytes wanted to decode the next value.
"""
+ # Our strategy for buffering is to aggregate the incoming chunks in a
+ # list until we've received enough data to decode the next item.
+ # This is slightly more complicated than using an ``io.BytesIO``
+ # or continuously concatenating incoming data. However, because it
+ # isn't constantly reallocating backing memory for a growing buffer,
+ # it prevents excessive memory thrashing and is significantly faster,
+ # especially in cases where the percentage of input chunks that don't
+ # decode into a full item is high.
- if self._leftover:
- oldlen = len(self._leftover)
- b = self._leftover + b
- self._leftover = None
+ if self._chunks:
+ # A previous call said we needed N bytes to decode the next item.
+ # But this call doesn't provide enough data. We buffer the incoming
+ # chunk without attempting to decode.
+ if len(b) < self._wanted:
+ self._chunks.append(b)
+ self._wanted -= len(b)
+ return False, 0, self._wanted
+
+ # Else we may have enough data to decode the next item. Aggregate
+ # old data with new and reset the buffer.
+ newlen = len(b)
+ self._chunks.append(b)
+ b = b''.join(self._chunks)
+ self._chunks = []
+ oldlen = len(b) - newlen
+
else:
- b = b
oldlen = 0
available, readcount, wanted = self._decoder.decode(b)
+ self._wanted = wanted
if readcount < len(b):
- self._leftover = b[readcount:]
+ self._chunks.append(b[readcount:])
return available, readcount - oldlen, wanted