Mercurial > hg-stable
diff mercurial/wireprotoframing.py @ 39576:84bf6ded9317
wireprotoframing: buffer emitted data to reduce frame count
An upcoming commit introduces a wire protocol command that can emit
hundreds of thousands of small objects. Without a buffering layer,
we would emit a single, small frame for every object. Performance
profiling revealed this to be a source of significant overhead for
both client and server.
This commit introduces a very crude buffering layer so that we emit
fewer, bigger frames in such a scenario. This code will likely get
rewritten in the future to be part of the streams API, as we'll
need a similar strategy for compressing data. I don't want to think
about it too much at the moment though.
server
before: user 32.500+0.000 sys 1.160+0.000
after: user 20.230+0.010 sys 0.180+0.000
client
before: user 133.400+0.000 sys 93.120+0.000
after: user 68.370+0.000 sys 32.950+0.000
This appears to indicate we have significant overhead in the frame
processing code on both client and server. It might be worth profiling
that at some point...
Differential Revision: https://phab.mercurial-scm.org/D4473
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Wed, 29 Aug 2018 16:43:17 -0700 |
parents | 07b58266bce3 |
children | bce1c1af7518 |
line wrap: on
line diff
--- a/mercurial/wireprotoframing.py Wed Sep 05 09:06:40 2018 -0700 +++ b/mercurial/wireprotoframing.py Wed Aug 29 16:43:17 2018 -0700 @@ -511,6 +511,98 @@ flags=0, payload=payload) +class bufferingcommandresponseemitter(object): + """Helper object to emit command response frames intelligently. + + Raw command response data is likely emitted in chunks much smaller + than what can fit in a single frame. This class exists to buffer + chunks until enough data is available to fit in a single frame. + + TODO we'll need something like this when compression is supported. + So it might make sense to implement this functionality at the stream + level. + """ + def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE): + self._stream = stream + self._requestid = requestid + self._maxsize = maxframesize + self._chunks = [] + self._chunkssize = 0 + + def send(self, data): + """Send new data for emission. + + Is a generator of new frames that were derived from the new input. + + If the special input ``None`` is received, flushes all buffered + data to frames. + """ + + if data is None: + for frame in self._flush(): + yield frame + return + + # There is a ton of potential to do more complicated things here. + # Our immediate goal is to coalesce small chunks into big frames, + # not achieve the fewest number of frames possible. So we go with + # a simple implementation: + # + # * If a chunk is too large for a frame, we flush and emit frames + # for the new chunk. + # * If a chunk can be buffered without total buffered size limits + # being exceeded, we do that. + # * If a chunk causes us to go over our buffering limit, we flush + # and then buffer the new chunk. + + if len(data) > self._maxsize: + for frame in self._flush(): + yield frame + + # Now emit frames for the big chunk. + offset = 0 + while True: + chunk = data[offset:offset + self._maxsize] + offset += len(chunk) + + yield self._stream.makeframe( + self._requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=FLAG_COMMAND_RESPONSE_CONTINUATION, + payload=chunk) + + if offset == len(data): + return + + # If we don't have enough to constitute a full frame, buffer and + # return. + if len(data) + self._chunkssize < self._maxsize: + self._chunks.append(data) + self._chunkssize += len(data) + return + + # Else flush what we have and buffer the new chunk. We could do + # something more intelligent here, like break the chunk. Let's + # keep things simple for now. + for frame in self._flush(): + yield frame + + self._chunks.append(data) + self._chunkssize = len(data) + + def _flush(self): + payload = b''.join(self._chunks) + assert len(payload) <= self._maxsize + + self._chunks[:] = [] + self._chunkssize = 0 + + yield self._stream.makeframe( + self._requestid, + typeid=FRAME_TYPE_COMMAND_RESPONSE, + flags=FLAG_COMMAND_RESPONSE_CONTINUATION, + payload=payload) + class stream(object): """Represents a logical unidirectional series of frames.""" @@ -716,10 +808,14 @@ def sendframes(): emitted = False + emitter = bufferingcommandresponseemitter(stream, requestid) while True: try: o = next(objs) except StopIteration: + for frame in emitter.send(None): + yield frame + if emitted: yield createcommandresponseeosframe(stream, requestid) break @@ -743,11 +839,10 @@ yield createcommandresponseokframe(stream, requestid) emitted = True - # TODO buffer chunks so emitted frame payloads can be - # larger. - for frame in createbytesresponseframesfromgen( - stream, requestid, cborutil.streamencode(o)): - yield frame + for chunk in cborutil.streamencode(o): + for frame in emitter.send(chunk): + yield frame + except Exception as e: for frame in createerrorframe(stream, requestid, '%s' % e,