comparison mercurial/wireprotoframing.py @ 39560:84bf6ded9317

wireprotoframing: buffer emitted data to reduce frame count. An upcoming commit introduces a wire protocol command that can emit hundreds of thousands of small objects. Without a buffering layer, we would emit a single, small frame for every object. Performance profiling revealed this to be a source of significant overhead for both client and server. This commit introduces a very crude buffering layer so that we emit fewer, bigger frames in such a scenario. This code will likely get rewritten in the future to be part of the streams API, as we'll need a similar strategy for compressing data. I don't want to think about it too much at the moment though. Timings -- server before: user 32.500+0.000 sys 1.160+0.000; server after: user 20.230+0.010 sys 0.180+0.000; client before: user 133.400+0.000 sys 93.120+0.000; client after: user 68.370+0.000 sys 32.950+0.000. This appears to indicate we have significant overhead in the frame processing code on both client and server. It might be worth profiling that at some point... Differential Revision: https://phab.mercurial-scm.org/D4473
author Gregory Szorc <gregory.szorc@gmail.com>
date Wed, 29 Aug 2018 16:43:17 -0700
parents 07b58266bce3
children bce1c1af7518
comparison
equal deleted inserted replaced
39559:07b58266bce3 39560:84bf6ded9317
508 508
509 yield stream.makeframe(requestid=requestid, 509 yield stream.makeframe(requestid=requestid,
510 typeid=FRAME_TYPE_TEXT_OUTPUT, 510 typeid=FRAME_TYPE_TEXT_OUTPUT,
511 flags=0, 511 flags=0,
512 payload=payload) 512 payload=payload)
513
class bufferingcommandresponseemitter(object):
    """Helper object to emit command response frames intelligently.

    Raw command response data is likely emitted in chunks much smaller
    than what can fit in a single frame. This class exists to buffer
    chunks until enough data is available to fit in a single frame.

    ``stream`` is the object whose ``makeframe()`` builds each frame,
    ``requestid`` is passed through to every frame, and ``maxframesize``
    is the largest payload (in bytes) placed in a single frame.

    TODO we'll need something like this when compression is supported.
    So it might make sense to implement this functionality at the stream
    level.
    """
    def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
        self._stream = stream
        self._requestid = requestid
        self._maxsize = maxframesize
        # Buffered chunks and their combined length. The length is tracked
        # separately so send() does not have to re-sum the list each call.
        self._chunks = []
        self._chunkssize = 0

    def send(self, data):
        """Send new data for emission.

        Is a generator of new frames that were derived from the new input.
        It may yield nothing when the input was merely buffered.

        If the special input ``None`` is received, flushes all buffered
        data to frames.
        """

        if data is None:
            for frame in self._flush():
                yield frame
            return

        # There is a ton of potential to do more complicated things here.
        # Our immediate goal is to coalesce small chunks into big frames,
        # not achieve the fewest number of frames possible. So we go with
        # a simple implementation:
        #
        # * If a chunk is too large for a frame, we flush and emit frames
        #   for the new chunk.
        # * If a chunk can be buffered without total buffered size limits
        #   being exceeded, we do that.
        # * If a chunk causes us to go over our buffering limit, we flush
        #   and then buffer the new chunk.

        if len(data) > self._maxsize:
            # Flush first so previously buffered data stays ahead of the
            # big chunk in the output.
            for frame in self._flush():
                yield frame

            # Now emit frames for the big chunk.
            offset = 0
            while True:
                chunk = data[offset:offset + self._maxsize]
                offset += len(chunk)

                yield self._stream.makeframe(
                    self._requestid,
                    typeid=FRAME_TYPE_COMMAND_RESPONSE,
                    flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
                    payload=chunk)

                if offset == len(data):
                    return

        # If we don't have enough to constitute a full frame, buffer and
        # return.
        if len(data) + self._chunkssize < self._maxsize:
            self._chunks.append(data)
            self._chunkssize += len(data)
            return

        # Else flush what we have and buffer the new chunk. We could do
        # something more intelligent here, like break the chunk. Let's
        # keep things simple for now.
        for frame in self._flush():
            yield frame

        self._chunks.append(data)
        self._chunkssize = len(data)

    def _flush(self):
        """Emit buffered data as a single frame and reset the buffer.

        Is a generator yielding at most one frame. Nothing is yielded when
        no data is buffered, so callers can flush unconditionally without
        putting zero-length frames on the wire.
        """
        payload = b''.join(self._chunks)
        assert len(payload) <= self._maxsize

        self._chunks[:] = []
        self._chunkssize = 0

        # An empty payload carries no information; skip the frame entirely.
        if not payload:
            return

        yield self._stream.makeframe(
            self._requestid,
            typeid=FRAME_TYPE_COMMAND_RESPONSE,
            flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
            payload=payload)
513 605
514 class stream(object): 606 class stream(object):
515 """Represents a logical unidirectional series of frames.""" 607 """Represents a logical unidirectional series of frames."""
516 608
517 def __init__(self, streamid, active=False): 609 def __init__(self, streamid, active=False):
714 # In all cases, when the function finishes, the request is fully 806 # In all cases, when the function finishes, the request is fully
715 # handled and no new frames for it should be seen. 807 # handled and no new frames for it should be seen.
716 808
717 def sendframes(): 809 def sendframes():
718 emitted = False 810 emitted = False
811 emitter = bufferingcommandresponseemitter(stream, requestid)
719 while True: 812 while True:
720 try: 813 try:
721 o = next(objs) 814 o = next(objs)
722 except StopIteration: 815 except StopIteration:
816 for frame in emitter.send(None):
817 yield frame
818
723 if emitted: 819 if emitted:
724 yield createcommandresponseeosframe(stream, requestid) 820 yield createcommandresponseeosframe(stream, requestid)
725 break 821 break
726 822
727 except error.WireprotoCommandError as e: 823 except error.WireprotoCommandError as e:
741 try: 837 try:
742 if not emitted: 838 if not emitted:
743 yield createcommandresponseokframe(stream, requestid) 839 yield createcommandresponseokframe(stream, requestid)
744 emitted = True 840 emitted = True
745 841
746 # TODO buffer chunks so emitted frame payloads can be 842 for chunk in cborutil.streamencode(o):
747 # larger. 843 for frame in emitter.send(chunk):
748 for frame in createbytesresponseframesfromgen( 844 yield frame
749 stream, requestid, cborutil.streamencode(o)): 845
750 yield frame
751 except Exception as e: 846 except Exception as e:
752 for frame in createerrorframe(stream, requestid, 847 for frame in createerrorframe(stream, requestid,
753 '%s' % e, 848 '%s' % e,
754 errtype='server'): 849 errtype='server'):
755 yield frame 850 yield frame