Mercurial > hg
comparison mercurial/wireprotoframing.py @ 40138:b5bf3dd6ec5b
wireprotov2: send content encoded frames from server
Now that we have support for negotiating encodings and configuring
an encoder, we can start sending content encoded frames from the
server.
This commit teaches the wireprotov2 server code to send content
encoded frames.
On the mozilla-unified repository with zstd enabled peers, this change
reduces the total amount of data transferred from server to client
drastically:
before: 7,190,995,812 bytes
after: 1,605,508,691 bytes
Differential Revision: https://phab.mercurial-scm.org/D4927
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Mon, 08 Oct 2018 17:24:28 -0700 |
parents | 3a6d6c54bd81 |
children | 2c55716f8a1c |
comparison
equal
deleted
inserted
replaced
40137:ed4ebbb98ca0 | 40138:b5bf3dd6ec5b |
---|---|
366 break | 366 break |
367 | 367 |
368 def createcommandresponseokframe(stream, requestid): | 368 def createcommandresponseokframe(stream, requestid): |
369 overall = b''.join(cborutil.streamencode({b'status': b'ok'})) | 369 overall = b''.join(cborutil.streamencode({b'status': b'ok'})) |
370 | 370 |
371 if stream.streamsettingssent: | |
372 overall = stream.encode(overall) | |
373 encoded = True | |
374 | |
375 if not overall: | |
376 return None | |
377 else: | |
378 encoded = False | |
379 | |
371 return stream.makeframe(requestid=requestid, | 380 return stream.makeframe(requestid=requestid, |
372 typeid=FRAME_TYPE_COMMAND_RESPONSE, | 381 typeid=FRAME_TYPE_COMMAND_RESPONSE, |
373 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, | 382 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, |
374 payload=overall) | 383 payload=overall, |
375 | 384 encoded=encoded) |
376 def createcommandresponseeosframe(stream, requestid): | 385 |
386 def createcommandresponseeosframes(stream, requestid, | |
387 maxframesize=DEFAULT_MAX_FRAME_SIZE): | |
377 """Create an empty payload frame representing command end-of-stream.""" | 388 """Create an empty payload frame representing command end-of-stream.""" |
378 return stream.makeframe(requestid=requestid, | 389 payload = stream.flush() |
379 typeid=FRAME_TYPE_COMMAND_RESPONSE, | 390 |
380 flags=FLAG_COMMAND_RESPONSE_EOS, | 391 offset = 0 |
381 payload=b'') | 392 while True: |
393 chunk = payload[offset:offset + maxframesize] | |
394 offset += len(chunk) | |
395 | |
396 done = offset == len(payload) | |
397 | |
398 if done: | |
399 flags = FLAG_COMMAND_RESPONSE_EOS | |
400 else: | |
401 flags = FLAG_COMMAND_RESPONSE_CONTINUATION | |
402 | |
403 yield stream.makeframe(requestid=requestid, | |
404 typeid=FRAME_TYPE_COMMAND_RESPONSE, | |
405 flags=flags, | |
406 payload=chunk, | |
407 encoded=payload != b'') | |
408 | |
409 if done: | |
410 break | |
382 | 411 |
383 def createalternatelocationresponseframe(stream, requestid, location): | 412 def createalternatelocationresponseframe(stream, requestid, location): |
384 data = { | 413 data = { |
385 b'status': b'redirect', | 414 b'status': b'redirect', |
386 b'location': { | 415 b'location': { |
393 r'servercadercerts'): | 422 r'servercadercerts'): |
394 value = getattr(location, a) | 423 value = getattr(location, a) |
395 if value is not None: | 424 if value is not None: |
396 data[b'location'][pycompat.bytestr(a)] = value | 425 data[b'location'][pycompat.bytestr(a)] = value |
397 | 426 |
427 payload = b''.join(cborutil.streamencode(data)) | |
428 | |
429 if stream.streamsettingssent: | |
430 payload = stream.encode(payload) | |
431 encoded = True | |
432 else: | |
433 encoded = False | |
434 | |
398 return stream.makeframe(requestid=requestid, | 435 return stream.makeframe(requestid=requestid, |
399 typeid=FRAME_TYPE_COMMAND_RESPONSE, | 436 typeid=FRAME_TYPE_COMMAND_RESPONSE, |
400 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, | 437 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, |
401 payload=b''.join(cborutil.streamencode(data))) | 438 payload=payload, |
439 encoded=encoded) | |
402 | 440 |
403 def createcommanderrorresponse(stream, requestid, message, args=None): | 441 def createcommanderrorresponse(stream, requestid, message, args=None): |
404 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom | 442 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom |
405 # formatting works consistently? | 443 # formatting works consistently? |
406 m = { | 444 m = { |
516 | 554 |
517 if data is None: | 555 if data is None: |
518 for frame in self._flush(): | 556 for frame in self._flush(): |
519 yield frame | 557 yield frame |
520 return | 558 return |
559 | |
560 data = self._stream.encode(data) | |
521 | 561 |
522 # There is a ton of potential to do more complicated things here. | 562 # There is a ton of potential to do more complicated things here. |
523 # Our immediate goal is to coalesce small chunks into big frames, | 563 # Our immediate goal is to coalesce small chunks into big frames, |
524 # not achieve the fewest number of frames possible. So we go with | 564 # not achieve the fewest number of frames possible. So we go with |
525 # a simple implementation: | 565 # a simple implementation: |
546 | 586 |
547 yield self._stream.makeframe( | 587 yield self._stream.makeframe( |
548 self._requestid, | 588 self._requestid, |
549 typeid=FRAME_TYPE_COMMAND_RESPONSE, | 589 typeid=FRAME_TYPE_COMMAND_RESPONSE, |
550 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, | 590 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, |
551 payload=chunk) | 591 payload=chunk, |
592 encoded=True) | |
552 | 593 |
553 if offset == len(data): | 594 if offset == len(data): |
554 return | 595 return |
555 | 596 |
556 # If we don't have enough to constitute a full frame, buffer and | 597 # If we don't have enough to constitute a full frame, buffer and |
581 | 622 |
582 yield self._stream.makeframe( | 623 yield self._stream.makeframe( |
583 self._requestid, | 624 self._requestid, |
584 typeid=FRAME_TYPE_COMMAND_RESPONSE, | 625 typeid=FRAME_TYPE_COMMAND_RESPONSE, |
585 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, | 626 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, |
586 payload=payload) | 627 payload=payload, |
628 encoded=True) | |
587 | 629 |
588 # TODO consider defining encoders/decoders using the util.compressionengine | 630 # TODO consider defining encoders/decoders using the util.compressionengine |
589 # mechanism. | 631 # mechanism. |
590 | 632 |
591 class identityencoder(object): | 633 class identityencoder(object): |
774 class outputstream(stream): | 816 class outputstream(stream): |
775 """Represents a stream used for sending data.""" | 817 """Represents a stream used for sending data.""" |
776 | 818 |
777 def __init__(self, streamid, active=False): | 819 def __init__(self, streamid, active=False): |
778 super(outputstream, self).__init__(streamid, active=active) | 820 super(outputstream, self).__init__(streamid, active=active) |
821 self.streamsettingssent = False | |
779 self._encoder = None | 822 self._encoder = None |
823 self._encodername = None | |
780 | 824 |
781 def setencoder(self, ui, name): | 825 def setencoder(self, ui, name): |
782 """Set the encoder for this stream. | 826 """Set the encoder for this stream. |
783 | 827 |
784 Receives the stream profile name. | 828 Receives the stream profile name. |
785 """ | 829 """ |
786 if name not in STREAM_ENCODERS: | 830 if name not in STREAM_ENCODERS: |
787 raise error.Abort(_('unknown stream encoder: %s') % name) | 831 raise error.Abort(_('unknown stream encoder: %s') % name) |
788 | 832 |
789 self._encoder = STREAM_ENCODERS[name][0](ui) | 833 self._encoder = STREAM_ENCODERS[name][0](ui) |
834 self._encodername = name | |
790 | 835 |
791 def encode(self, data): | 836 def encode(self, data): |
792 if not self._encoder: | 837 if not self._encoder: |
793 return data | 838 return data |
794 | 839 |
803 def finish(self): | 848 def finish(self): |
804 if not self._encoder: | 849 if not self._encoder: |
805 return b'' | 850 return b'' |
806 | 851 |
807 self._encoder.finish() | 852 self._encoder.finish() |
853 | |
854 def makeframe(self, requestid, typeid, flags, payload, | |
855 encoded=False): | |
856 """Create a frame to be sent out over this stream. | |
857 | |
858 Only returns the frame instance. Does not actually send it. | |
859 """ | |
860 streamflags = 0 | |
861 if not self._active: | |
862 streamflags |= STREAM_FLAG_BEGIN_STREAM | |
863 self._active = True | |
864 | |
865 if encoded: | |
866 if not self.streamsettingssent: | |
867 raise error.ProgrammingError( | |
868 b'attempting to send encoded frame without sending stream ' | |
869 b'settings') | |
870 | |
871 streamflags |= STREAM_FLAG_ENCODING_APPLIED | |
872 | |
873 if (typeid == FRAME_TYPE_STREAM_SETTINGS | |
874 and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS): | |
875 self.streamsettingssent = True | |
876 | |
877 return makeframe(requestid, self.streamid, streamflags, typeid, flags, | |
878 payload) | |
879 | |
880 def makestreamsettingsframe(self, requestid): | |
881 """Create a stream settings frame for this stream. | |
882 | |
883 Returns frame data or None if no stream settings frame is needed or has | |
884 already been sent. | |
885 """ | |
886 if not self._encoder or self.streamsettingssent: | |
887 return None | |
888 | |
889 payload = b''.join(cborutil.streamencode(self._encodername)) | |
890 return self.makeframe(requestid, FRAME_TYPE_STREAM_SETTINGS, | |
891 FLAG_STREAM_ENCODING_SETTINGS_EOS, payload) | |
808 | 892 |
809 def ensureserverstream(stream): | 893 def ensureserverstream(stream): |
810 if stream.streamid % 2: | 894 if stream.streamid % 2: |
811 raise error.ProgrammingError('server should only write to even ' | 895 raise error.ProgrammingError('server should only write to even ' |
812 'numbered streams; %d is not even' % | 896 'numbered streams; %d is not even' % |
993 except StopIteration: | 1077 except StopIteration: |
994 for frame in emitter.send(None): | 1078 for frame in emitter.send(None): |
995 yield frame | 1079 yield frame |
996 | 1080 |
997 if emitted: | 1081 if emitted: |
998 yield createcommandresponseeosframe(stream, requestid) | 1082 for frame in createcommandresponseeosframes( |
1083 stream, requestid): | |
1084 yield frame | |
999 break | 1085 break |
1000 | 1086 |
1001 except error.WireprotoCommandError as e: | 1087 except error.WireprotoCommandError as e: |
1002 for frame in createcommanderrorresponse( | 1088 for frame in createcommanderrorresponse( |
1003 stream, requestid, e.message, e.messageargs): | 1089 stream, requestid, e.message, e.messageargs): |
1020 if emitted: | 1106 if emitted: |
1021 raise error.ProgrammingError( | 1107 raise error.ProgrammingError( |
1022 'alternatelocationresponse seen after initial ' | 1108 'alternatelocationresponse seen after initial ' |
1023 'output object') | 1109 'output object') |
1024 | 1110 |
1111 frame = stream.makestreamsettingsframe(requestid) | |
1112 if frame: | |
1113 yield frame | |
1114 | |
1025 yield createalternatelocationresponseframe( | 1115 yield createalternatelocationresponseframe( |
1026 stream, requestid, o) | 1116 stream, requestid, o) |
1027 | 1117 |
1028 alternatelocationsent = True | 1118 alternatelocationsent = True |
1029 emitted = True | 1119 emitted = True |
1032 if alternatelocationsent: | 1122 if alternatelocationsent: |
1033 raise error.ProgrammingError( | 1123 raise error.ProgrammingError( |
1034 'object follows alternatelocationresponse') | 1124 'object follows alternatelocationresponse') |
1035 | 1125 |
1036 if not emitted: | 1126 if not emitted: |
1037 yield createcommandresponseokframe(stream, requestid) | 1127 # Frame is optional. |
1128 frame = stream.makestreamsettingsframe(requestid) | |
1129 if frame: | |
1130 yield frame | |
1131 | |
1132 # May be None if empty frame (due to encoding). | |
1133 frame = createcommandresponseokframe(stream, requestid) | |
1134 if frame: | |
1135 yield frame | |
1136 | |
1038 emitted = True | 1137 emitted = True |
1039 | 1138 |
1040 # Objects emitted by command functions can be serializable | 1139 # Objects emitted by command functions can be serializable |
1041 # data structures or special types. | 1140 # data structures or special types. |
1042 # TODO consider extracting the content normalization to a | 1141 # TODO consider extracting the content normalization to a |
1119 self._activecommands.remove(requestid) | 1218 self._activecommands.remove(requestid) |
1120 | 1219 |
1121 return self._handlesendframes(sendframes()) | 1220 return self._handlesendframes(sendframes()) |
1122 | 1221 |
1123 def makeoutputstream(self): | 1222 def makeoutputstream(self): |
1124 """Create a stream to be used for sending data to the client.""" | 1223 """Create a stream to be used for sending data to the client. |
1224 | |
1225 If this is called before protocol settings frames are received, we | |
1226 don't know what stream encodings are supported by the client and | |
1227 we will default to identity. | |
1228 """ | |
1125 streamid = self._nextoutgoingstreamid | 1229 streamid = self._nextoutgoingstreamid |
1126 self._nextoutgoingstreamid += 2 | 1230 self._nextoutgoingstreamid += 2 |
1127 | 1231 |
1128 s = outputstream(streamid) | 1232 s = outputstream(streamid) |
1129 self._outgoingstreams[streamid] = s | 1233 self._outgoingstreams[streamid] = s |
1234 | |
1235 # Always use the *server's* preferred encoder over the client's, | |
1236 # as servers have more to lose from sub-optimal encoders being used. | |
1237 for name in STREAM_ENCODERS_ORDER: | |
1238 if name in self._sendersettings['contentencodings']: | |
1239 s.setencoder(self._ui, name) | |
1240 break | |
1130 | 1241 |
1131 return s | 1242 return s |
1132 | 1243 |
1133 def _makeerrorresult(self, msg): | 1244 def _makeerrorresult(self, msg): |
1134 return 'error', { | 1245 return 'error', { |