comparison mercurial/wireprotoframing.py @ 40138:b5bf3dd6ec5b

wireprotov2: send content encoded frames from server

Now that we have support for negotiating encodings and configuring an encoder, we can start sending content encoded frames from the server. This commit teaches the wireprotov2 server code to send content encoded frames.

On the mozilla-unified repository with zstd enabled peers, this change reduces the total amount of data transferred from server to client drastically:

before: 7,190,995,812 bytes
after: 1,605,508,691 bytes

Differential Revision: https://phab.mercurial-scm.org/D4927
author Gregory Szorc <gregory.szorc@gmail.com>
date Mon, 08 Oct 2018 17:24:28 -0700
parents 3a6d6c54bd81
children 2c55716f8a1c
comparison
equal deleted inserted replaced
40137:ed4ebbb98ca0 40138:b5bf3dd6ec5b
366 break 366 break
367 367
368 def createcommandresponseokframe(stream, requestid): 368 def createcommandresponseokframe(stream, requestid):
369 overall = b''.join(cborutil.streamencode({b'status': b'ok'})) 369 overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
370 370
371 if stream.streamsettingssent:
372 overall = stream.encode(overall)
373 encoded = True
374
375 if not overall:
376 return None
377 else:
378 encoded = False
379
371 return stream.makeframe(requestid=requestid, 380 return stream.makeframe(requestid=requestid,
372 typeid=FRAME_TYPE_COMMAND_RESPONSE, 381 typeid=FRAME_TYPE_COMMAND_RESPONSE,
373 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, 382 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
374 payload=overall) 383 payload=overall,
375 384 encoded=encoded)
376 def createcommandresponseeosframe(stream, requestid): 385
386 def createcommandresponseeosframes(stream, requestid,
387 maxframesize=DEFAULT_MAX_FRAME_SIZE):
377 """Create an empty payload frame representing command end-of-stream.""" 388 """Create an empty payload frame representing command end-of-stream."""
378 return stream.makeframe(requestid=requestid, 389 payload = stream.flush()
379 typeid=FRAME_TYPE_COMMAND_RESPONSE, 390
380 flags=FLAG_COMMAND_RESPONSE_EOS, 391 offset = 0
381 payload=b'') 392 while True:
393 chunk = payload[offset:offset + maxframesize]
394 offset += len(chunk)
395
396 done = offset == len(payload)
397
398 if done:
399 flags = FLAG_COMMAND_RESPONSE_EOS
400 else:
401 flags = FLAG_COMMAND_RESPONSE_CONTINUATION
402
403 yield stream.makeframe(requestid=requestid,
404 typeid=FRAME_TYPE_COMMAND_RESPONSE,
405 flags=flags,
406 payload=chunk,
407 encoded=payload != b'')
408
409 if done:
410 break
382 411
383 def createalternatelocationresponseframe(stream, requestid, location): 412 def createalternatelocationresponseframe(stream, requestid, location):
384 data = { 413 data = {
385 b'status': b'redirect', 414 b'status': b'redirect',
386 b'location': { 415 b'location': {
393 r'servercadercerts'): 422 r'servercadercerts'):
394 value = getattr(location, a) 423 value = getattr(location, a)
395 if value is not None: 424 if value is not None:
396 data[b'location'][pycompat.bytestr(a)] = value 425 data[b'location'][pycompat.bytestr(a)] = value
397 426
427 payload = b''.join(cborutil.streamencode(data))
428
429 if stream.streamsettingssent:
430 payload = stream.encode(payload)
431 encoded = True
432 else:
433 encoded = False
434
398 return stream.makeframe(requestid=requestid, 435 return stream.makeframe(requestid=requestid,
399 typeid=FRAME_TYPE_COMMAND_RESPONSE, 436 typeid=FRAME_TYPE_COMMAND_RESPONSE,
400 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, 437 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
401 payload=b''.join(cborutil.streamencode(data))) 438 payload=payload,
439 encoded=encoded)
402 440
403 def createcommanderrorresponse(stream, requestid, message, args=None): 441 def createcommanderrorresponse(stream, requestid, message, args=None):
404 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom 442 # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
405 # formatting works consistently? 443 # formatting works consistently?
406 m = { 444 m = {
516 554
517 if data is None: 555 if data is None:
518 for frame in self._flush(): 556 for frame in self._flush():
519 yield frame 557 yield frame
520 return 558 return
559
560 data = self._stream.encode(data)
521 561
522 # There is a ton of potential to do more complicated things here. 562 # There is a ton of potential to do more complicated things here.
523 # Our immediate goal is to coalesce small chunks into big frames, 563 # Our immediate goal is to coalesce small chunks into big frames,
524 # not achieve the fewest number of frames possible. So we go with 564 # not achieve the fewest number of frames possible. So we go with
525 # a simple implementation: 565 # a simple implementation:
546 586
547 yield self._stream.makeframe( 587 yield self._stream.makeframe(
548 self._requestid, 588 self._requestid,
549 typeid=FRAME_TYPE_COMMAND_RESPONSE, 589 typeid=FRAME_TYPE_COMMAND_RESPONSE,
550 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, 590 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
551 payload=chunk) 591 payload=chunk,
592 encoded=True)
552 593
553 if offset == len(data): 594 if offset == len(data):
554 return 595 return
555 596
556 # If we don't have enough to constitute a full frame, buffer and 597 # If we don't have enough to constitute a full frame, buffer and
581 622
582 yield self._stream.makeframe( 623 yield self._stream.makeframe(
583 self._requestid, 624 self._requestid,
584 typeid=FRAME_TYPE_COMMAND_RESPONSE, 625 typeid=FRAME_TYPE_COMMAND_RESPONSE,
585 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, 626 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
586 payload=payload) 627 payload=payload,
628 encoded=True)
587 629
588 # TODO consider defining encoders/decoders using the util.compressionengine 630 # TODO consider defining encoders/decoders using the util.compressionengine
589 # mechanism. 631 # mechanism.
590 632
591 class identityencoder(object): 633 class identityencoder(object):
774 class outputstream(stream): 816 class outputstream(stream):
775 """Represents a stream used for sending data.""" 817 """Represents a stream used for sending data."""
776 818
777 def __init__(self, streamid, active=False): 819 def __init__(self, streamid, active=False):
778 super(outputstream, self).__init__(streamid, active=active) 820 super(outputstream, self).__init__(streamid, active=active)
821 self.streamsettingssent = False
779 self._encoder = None 822 self._encoder = None
823 self._encodername = None
780 824
781 def setencoder(self, ui, name): 825 def setencoder(self, ui, name):
782 """Set the encoder for this stream. 826 """Set the encoder for this stream.
783 827
784 Receives the stream profile name. 828 Receives the stream profile name.
785 """ 829 """
786 if name not in STREAM_ENCODERS: 830 if name not in STREAM_ENCODERS:
787 raise error.Abort(_('unknown stream encoder: %s') % name) 831 raise error.Abort(_('unknown stream encoder: %s') % name)
788 832
789 self._encoder = STREAM_ENCODERS[name][0](ui) 833 self._encoder = STREAM_ENCODERS[name][0](ui)
834 self._encodername = name
790 835
791 def encode(self, data): 836 def encode(self, data):
792 if not self._encoder: 837 if not self._encoder:
793 return data 838 return data
794 839
803 def finish(self): 848 def finish(self):
804 if not self._encoder: 849 if not self._encoder:
805 return b'' 850 return b''
806 851
807 self._encoder.finish() 852 self._encoder.finish()
853
854 def makeframe(self, requestid, typeid, flags, payload,
855 encoded=False):
856 """Create a frame to be sent out over this stream.
857
858 Only returns the frame instance. Does not actually send it.
859 """
860 streamflags = 0
861 if not self._active:
862 streamflags |= STREAM_FLAG_BEGIN_STREAM
863 self._active = True
864
865 if encoded:
866 if not self.streamsettingssent:
867 raise error.ProgrammingError(
868 b'attempting to send encoded frame without sending stream '
869 b'settings')
870
871 streamflags |= STREAM_FLAG_ENCODING_APPLIED
872
873 if (typeid == FRAME_TYPE_STREAM_SETTINGS
874 and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS):
875 self.streamsettingssent = True
876
877 return makeframe(requestid, self.streamid, streamflags, typeid, flags,
878 payload)
879
880 def makestreamsettingsframe(self, requestid):
881 """Create a stream settings frame for this stream.
882
883 Returns frame data or None if no stream settings frame is needed or has
884 already been sent.
885 """
886 if not self._encoder or self.streamsettingssent:
887 return None
888
889 payload = b''.join(cborutil.streamencode(self._encodername))
890 return self.makeframe(requestid, FRAME_TYPE_STREAM_SETTINGS,
891 FLAG_STREAM_ENCODING_SETTINGS_EOS, payload)
808 892
809 def ensureserverstream(stream): 893 def ensureserverstream(stream):
810 if stream.streamid % 2: 894 if stream.streamid % 2:
811 raise error.ProgrammingError('server should only write to even ' 895 raise error.ProgrammingError('server should only write to even '
812 'numbered streams; %d is not even' % 896 'numbered streams; %d is not even' %
993 except StopIteration: 1077 except StopIteration:
994 for frame in emitter.send(None): 1078 for frame in emitter.send(None):
995 yield frame 1079 yield frame
996 1080
997 if emitted: 1081 if emitted:
998 yield createcommandresponseeosframe(stream, requestid) 1082 for frame in createcommandresponseeosframes(
1083 stream, requestid):
1084 yield frame
999 break 1085 break
1000 1086
1001 except error.WireprotoCommandError as e: 1087 except error.WireprotoCommandError as e:
1002 for frame in createcommanderrorresponse( 1088 for frame in createcommanderrorresponse(
1003 stream, requestid, e.message, e.messageargs): 1089 stream, requestid, e.message, e.messageargs):
1020 if emitted: 1106 if emitted:
1021 raise error.ProgrammingError( 1107 raise error.ProgrammingError(
1022 'alternatelocationresponse seen after initial ' 1108 'alternatelocationresponse seen after initial '
1023 'output object') 1109 'output object')
1024 1110
1111 frame = stream.makestreamsettingsframe(requestid)
1112 if frame:
1113 yield frame
1114
1025 yield createalternatelocationresponseframe( 1115 yield createalternatelocationresponseframe(
1026 stream, requestid, o) 1116 stream, requestid, o)
1027 1117
1028 alternatelocationsent = True 1118 alternatelocationsent = True
1029 emitted = True 1119 emitted = True
1032 if alternatelocationsent: 1122 if alternatelocationsent:
1033 raise error.ProgrammingError( 1123 raise error.ProgrammingError(
1034 'object follows alternatelocationresponse') 1124 'object follows alternatelocationresponse')
1035 1125
1036 if not emitted: 1126 if not emitted:
1037 yield createcommandresponseokframe(stream, requestid) 1127 # Frame is optional.
1128 frame = stream.makestreamsettingsframe(requestid)
1129 if frame:
1130 yield frame
1131
1132 # May be None if empty frame (due to encoding).
1133 frame = createcommandresponseokframe(stream, requestid)
1134 if frame:
1135 yield frame
1136
1038 emitted = True 1137 emitted = True
1039 1138
1040 # Objects emitted by command functions can be serializable 1139 # Objects emitted by command functions can be serializable
1041 # data structures or special types. 1140 # data structures or special types.
1042 # TODO consider extracting the content normalization to a 1141 # TODO consider extracting the content normalization to a
1119 self._activecommands.remove(requestid) 1218 self._activecommands.remove(requestid)
1120 1219
1121 return self._handlesendframes(sendframes()) 1220 return self._handlesendframes(sendframes())
1122 1221
1123 def makeoutputstream(self): 1222 def makeoutputstream(self):
1124 """Create a stream to be used for sending data to the client.""" 1223 """Create a stream to be used for sending data to the client.
1224
1225 If this is called before protocol settings frames are received, we
1226 don't know what stream encodings are supported by the client and
1227 we will default to identity.
1228 """
1125 streamid = self._nextoutgoingstreamid 1229 streamid = self._nextoutgoingstreamid
1126 self._nextoutgoingstreamid += 2 1230 self._nextoutgoingstreamid += 2
1127 1231
1128 s = outputstream(streamid) 1232 s = outputstream(streamid)
1129 self._outgoingstreams[streamid] = s 1233 self._outgoingstreams[streamid] = s
1234
1235 # Always use the *server's* preferred encoder over the client's,
1236 # as servers have more to lose from sub-optimal encoders being used.
1237 for name in STREAM_ENCODERS_ORDER:
1238 if name in self._sendersettings['contentencodings']:
1239 s.setencoder(self._ui, name)
1240 break
1130 1241
1131 return s 1242 return s
1132 1243
1133 def _makeerrorresult(self, msg): 1244 def _makeerrorresult(self, msg):
1134 return 'error', { 1245 return 'error', {