wireprotoframing: buffer emitted data to reduce frame count

An upcoming commit introduces a wire protocol command that can emit
hundreds of thousands of small objects. Without a buffering layer,
we would emit a single, small frame for every object. Performance
profiling revealed this to be a source of significant overhead for
both client and server.

This commit introduces a very crude buffering layer so that we emit
fewer, bigger frames in such a scenario. This code will likely get
rewritten in the future to be part of the streams API, as we'll
need a similar strategy for compressing data. I don't want to think
about it too much at the moment though.
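
For illustration, here is roughly how the new emitter coalesces chunks.
This snippet is not part of the change: the dummy stream, the request id,
and the 1024-byte frame size are made up for the example, but the emitter
API matches the class added in the diff below.

  # Illustration only: count the frames produced when the emitter is fed
  # many tiny chunks. dummystream is a stand-in for a real stream object;
  # the emitter only calls makeframe() on it.
  class dummystream(object):
      def makeframe(self, requestid, typeid, flags, payload):
          return payload

  emitter = bufferingcommandresponseemitter(
      dummystream(), requestid=1, maxframesize=1024)

  frames = []
  for i in range(10000):
      # 10,000 ten-byte chunks; without buffering each would become a frame.
      frames.extend(emitter.send(b'x' * 10))

  # The None sentinel flushes whatever is still buffered.
  frames.extend(emitter.send(None))

  # With buffering this is on the order of 100 frames instead of 10,000.
  print(len(frames))
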
server
  before: user 32.500+0.000 sys 1.160+0.000
  after:  user 20.230+0.010 sys 0.180+0.000

client
  before: user 133.400+0.000 sys 93.120+0.000
  after:  user 68.370+0.000 sys 32.950+0.000

This appears to indicate we have significant overhead in the frame
processing code on both client and server. It might be worth profiling
that at some point...

Differential Revision: https://phab.mercurial-scm.org/D4473

--- a/mercurial/wireprotoframing.py Wed Sep 05 09:06:40 2018 -0700
+++ b/mercurial/wireprotoframing.py Wed Aug 29 16:43:17 2018 -0700
@@ -511,6 +511,98 @@
                            flags=0,
                            payload=payload)

+class bufferingcommandresponseemitter(object):
+    """Helper object to emit command response frames intelligently.
+
+    Raw command response data is likely emitted in chunks much smaller
+    than what can fit in a single frame. This class exists to buffer
+    chunks until enough data is available to fit in a single frame.
+
+    TODO we'll need something like this when compression is supported.
+    So it might make sense to implement this functionality at the stream
+    level.
+    """
+    def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
+        self._stream = stream
+        self._requestid = requestid
+        self._maxsize = maxframesize
+        self._chunks = []
+        self._chunkssize = 0
+
+    def send(self, data):
+        """Send new data for emission.
+
+        Is a generator of new frames that were derived from the new input.
+
+        If the special input ``None`` is received, flushes all buffered
+        data to frames.
+        """
+
+        if data is None:
+            for frame in self._flush():
+                yield frame
+            return
+
+        # There is a ton of potential to do more complicated things here.
+        # Our immediate goal is to coalesce small chunks into big frames,
+        # not achieve the fewest number of frames possible. So we go with
+        # a simple implementation:
+        #
+        # * If a chunk is too large for a frame, we flush and emit frames
+        #   for the new chunk.
+        # * If a chunk can be buffered without total buffered size limits
+        #   being exceeded, we do that.
+        # * If a chunk causes us to go over our buffering limit, we flush
+        #   and then buffer the new chunk.
+
+        if len(data) > self._maxsize:
+            for frame in self._flush():
+                yield frame
+
+            # Now emit frames for the big chunk.
+            offset = 0
+            while True:
+                chunk = data[offset:offset + self._maxsize]
+                offset += len(chunk)
+
+                yield self._stream.makeframe(
+                    self._requestid,
+                    typeid=FRAME_TYPE_COMMAND_RESPONSE,
+                    flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+                    payload=chunk)
+
+                if offset == len(data):
+                    return
+
+        # If we don't have enough to constitute a full frame, buffer and
+        # return.
+        if len(data) + self._chunkssize < self._maxsize:
+            self._chunks.append(data)
+            self._chunkssize += len(data)
+            return
+
+        # Else flush what we have and buffer the new chunk. We could do
+        # something more intelligent here, like break the chunk. Let's
+        # keep things simple for now.
+        for frame in self._flush():
+            yield frame
+
+        self._chunks.append(data)
+        self._chunkssize = len(data)
+
+    def _flush(self):
+        payload = b''.join(self._chunks)
+        assert len(payload) <= self._maxsize
+
+        self._chunks[:] = []
+        self._chunkssize = 0
+
+        yield self._stream.makeframe(
+            self._requestid,
+            typeid=FRAME_TYPE_COMMAND_RESPONSE,
+            flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
+            payload=payload)
+
 class stream(object):
     """Represents a logical unidirectional series of frames."""
@@ -716,10 +808,14 @@
         def sendframes():
             emitted = False
+            emitter = bufferingcommandresponseemitter(stream, requestid)
             while True:
                 try:
                     o = next(objs)
                 except StopIteration:
+                    for frame in emitter.send(None):
+                        yield frame
+
                     if emitted:
                         yield createcommandresponseeosframe(stream, requestid)
                     break
@@ -743,11 +839,10 @@
                     yield createcommandresponseokframe(stream, requestid)
                     emitted = True

-                # TODO buffer chunks so emitted frame payloads can be
-                # larger.
-                for frame in createbytesresponseframesfromgen(
-                    stream, requestid, cborutil.streamencode(o)):
-                    yield frame
+                for chunk in cborutil.streamencode(o):
+                    for frame in emitter.send(chunk):
+                        yield frame
+
             except Exception as e:
                 for frame in createerrorframe(stream, requestid,
                                               '%s' % e,