646 self._requestid, |
646 self._requestid, |
647 typeid=FRAME_TYPE_COMMAND_RESPONSE, |
647 typeid=FRAME_TYPE_COMMAND_RESPONSE, |
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, |
648 flags=FLAG_COMMAND_RESPONSE_CONTINUATION, |
649 payload=payload) |
649 payload=payload) |
650 |
650 |
|
651 # TODO consider defining encoders/decoders using the util.compressionengine |
|
652 # mechanism. |
|
653 |
|
654 class identityencoder(object): |
|
655 """Encoder for the "identity" stream encoding profile.""" |
|
656 def __init__(self, ui): |
|
657 pass |
|
658 |
|
659 def encode(self, data): |
|
660 return data |
|
661 |
|
662 def flush(self): |
|
663 return b'' |
|
664 |
|
665 def finish(self): |
|
666 return b'' |
|
667 |
|
668 class identitydecoder(object): |
|
669 """Decoder for the "identity" stream encoding profile.""" |
|
670 |
|
671 def __init__(self, ui, extraobjs): |
|
672 if extraobjs: |
|
673 raise error.Abort(_('identity decoder received unexpected ' |
|
674 'additional values')) |
|
675 |
|
676 def decode(self, data): |
|
677 return data |
|
678 |
|
679 class zlibencoder(object): |
|
680 def __init__(self, ui): |
|
681 import zlib |
|
682 self._zlib = zlib |
|
683 self._compressor = zlib.compressobj() |
|
684 |
|
685 def encode(self, data): |
|
686 return self._compressor.compress(data) |
|
687 |
|
688 def flush(self): |
|
689 # Z_SYNC_FLUSH doesn't reset compression context, which is |
|
690 # what we want. |
|
691 return self._compressor.flush(self._zlib.Z_SYNC_FLUSH) |
|
692 |
|
693 def finish(self): |
|
694 res = self._compressor.flush(self._zlib.Z_FINISH) |
|
695 self._compressor = None |
|
696 return res |
|
697 |
|
698 class zlibdecoder(object): |
|
699 def __init__(self, ui, extraobjs): |
|
700 import zlib |
|
701 |
|
702 if extraobjs: |
|
703 raise error.Abort(_('zlib decoder received unexpected ' |
|
704 'additional values')) |
|
705 |
|
706 self._decompressor = zlib.decompressobj() |
|
707 |
|
708 def decode(self, data): |
|
709 # Python 2's zlib module doesn't use the buffer protocol and can't |
|
710 # handle all bytes-like types. |
|
711 if not pycompat.ispy3 and isinstance(data, bytearray): |
|
712 data = bytes(data) |
|
713 |
|
714 return self._decompressor.decompress(data) |
|
715 |
|
716 class zstdbaseencoder(object): |
|
717 def __init__(self, level): |
|
718 from . import zstd |
|
719 |
|
720 self._zstd = zstd |
|
721 cctx = zstd.ZstdCompressor(level=level) |
|
722 self._compressor = cctx.compressobj() |
|
723 |
|
724 def encode(self, data): |
|
725 return self._compressor.compress(data) |
|
726 |
|
727 def flush(self): |
|
728 # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the |
|
729 # compressor and allows a decompressor to access all encoded data |
|
730 # up to this point. |
|
731 return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK) |
|
732 |
|
733 def finish(self): |
|
734 res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH) |
|
735 self._compressor = None |
|
736 return res |
|
737 |
|
738 class zstd8mbencoder(zstdbaseencoder): |
|
739 def __init__(self, ui): |
|
740 super(zstd8mbencoder, self).__init__(3) |
|
741 |
|
742 class zstdbasedecoder(object): |
|
743 def __init__(self, maxwindowsize): |
|
744 from . import zstd |
|
745 dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize) |
|
746 self._decompressor = dctx.decompressobj() |
|
747 |
|
748 def decode(self, data): |
|
749 return self._decompressor.decompress(data) |
|
750 |
|
751 class zstd8mbdecoder(zstdbasedecoder): |
|
752 def __init__(self, ui, extraobjs): |
|
753 if extraobjs: |
|
754 raise error.Abort(_('zstd8mb decoder received unexpected ' |
|
755 'additional values')) |
|
756 |
|
757 super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576) |
|
758 |
|
759 # We lazily populate this to avoid excessive module imports when importing |
|
760 # this module. |
|
761 STREAM_ENCODERS = {} |
|
762 STREAM_ENCODERS_ORDER = [] |
|
763 |
|
764 def populatestreamencoders(): |
|
765 if STREAM_ENCODERS: |
|
766 return |
|
767 |
|
768 try: |
|
769 from . import zstd |
|
770 zstd.__version__ |
|
771 except ImportError: |
|
772 zstd = None |
|
773 |
|
774 # zstandard is fastest and is preferred. |
|
775 if zstd: |
|
776 STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder) |
|
777 STREAM_ENCODERS_ORDER.append(b'zstd-8mb') |
|
778 |
|
779 STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder) |
|
780 STREAM_ENCODERS_ORDER.append(b'zlib') |
|
781 |
|
782 STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder) |
|
783 STREAM_ENCODERS_ORDER.append(b'identity') |
|
784 |
651 class stream(object): |
785 class stream(object): |
652 """Represents a logical unidirectional series of frames.""" |
786 """Represents a logical unidirectional series of frames.""" |
653 |
787 |
654 def __init__(self, streamid, active=False): |
788 def __init__(self, streamid, active=False): |
655 self.streamid = streamid |
789 self.streamid = streamid |
669 payload) |
803 payload) |
670 |
804 |
671 class inputstream(stream): |
805 class inputstream(stream): |
672 """Represents a stream used for receiving data.""" |
806 """Represents a stream used for receiving data.""" |
673 |
807 |
674 def setdecoder(self, name, extraobjs): |
808 def __init__(self, streamid, active=False): |
|
809 super(inputstream, self).__init__(streamid, active=active) |
|
810 self._decoder = None |
|
811 |
|
812 def setdecoder(self, ui, name, extraobjs): |
675 """Set the decoder for this stream. |
813 """Set the decoder for this stream. |
676 |
814 |
677 Receives the stream profile name and any additional CBOR objects |
815 Receives the stream profile name and any additional CBOR objects |
678 decoded from the stream encoding settings frame payloads. |
816 decoded from the stream encoding settings frame payloads. |
679 """ |
817 """ |
|
818 if name not in STREAM_ENCODERS: |
|
819 raise error.Abort(_('unknown stream decoder: %s') % name) |
|
820 |
|
821 self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs) |
|
822 |
|
823 def decode(self, data): |
|
824 # Default is identity decoder. We don't bother instantiating one |
|
825 # because it is trivial. |
|
826 if not self._decoder: |
|
827 return data |
|
828 |
|
829 return self._decoder.decode(data) |
|
830 |
|
831 def flush(self): |
|
832 if not self._decoder: |
|
833 return b'' |
|
834 |
|
835 return self._decoder.flush() |
680 |
836 |
681 class outputstream(stream): |
837 class outputstream(stream): |
682 """Represents a stream used for sending data.""" |
838 """Represents a stream used for sending data.""" |
|
839 |
|
840 def __init__(self, streamid, active=False): |
|
841 super(outputstream, self).__init__(streamid, active=active) |
|
842 self._encoder = None |
|
843 |
|
844 def setencoder(self, ui, name): |
|
845 """Set the encoder for this stream. |
|
846 |
|
847 Receives the stream profile name. |
|
848 """ |
|
849 if name not in STREAM_ENCODERS: |
|
850 raise error.Abort(_('unknown stream encoder: %s') % name) |
|
851 |
|
852 self._encoder = STREAM_ENCODERS[name][0](ui) |
|
853 |
|
854 def encode(self, data): |
|
855 if not self._encoder: |
|
856 return data |
|
857 |
|
858 return self._encoder.encode(data) |
|
859 |
|
860 def flush(self): |
|
861 if not self._encoder: |
|
862 return b'' |
|
863 |
|
864 return self._encoder.flush() |
|
865 |
|
866 def finish(self): |
|
867 if not self._encoder: |
|
868 return b'' |
|
869 |
|
870 self._encoder.finish() |
683 |
871 |
684 def ensureserverstream(stream): |
872 def ensureserverstream(stream): |
685 if stream.streamid % 2: |
873 if stream.streamid % 2: |
686 raise error.ProgrammingError('server should only write to even ' |
874 raise error.ProgrammingError('server should only write to even ' |
687 'numbered streams; %d is not even' % |
875 'numbered streams; %d is not even' % |
784 self._protocolsettingsdecoder = None |
972 self._protocolsettingsdecoder = None |
785 |
973 |
786 # Sender protocol settings are optional. Set implied default values. |
974 # Sender protocol settings are optional. Set implied default values. |
787 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS) |
975 self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS) |
788 |
976 |
|
977 populatestreamencoders() |
|
978 |
789 def onframerecv(self, frame): |
979 def onframerecv(self, frame): |
790 """Process a frame that has been received off the wire. |
980 """Process a frame that has been received off the wire. |
791 |
981 |
792 Returns a dict with an ``action`` key that details what action, |
982 Returns a dict with an ``action`` key that details what action, |
793 if any, the consumer should take next. |
983 if any, the consumer should take next. |
1382 self._pendingrequests = collections.deque() |
1572 self._pendingrequests = collections.deque() |
1383 self._activerequests = {} |
1573 self._activerequests = {} |
1384 self._incomingstreams = {} |
1574 self._incomingstreams = {} |
1385 self._streamsettingsdecoders = {} |
1575 self._streamsettingsdecoders = {} |
1386 |
1576 |
|
1577 populatestreamencoders() |
|
1578 |
1387 def callcommand(self, name, args, datafh=None, redirect=None): |
1579 def callcommand(self, name, args, datafh=None, redirect=None): |
1388 """Request that a command be executed. |
1580 """Request that a command be executed. |
1389 |
1581 |
1390 Receives the command name, a dict of arguments to pass to the command, |
1582 Receives the command name, a dict of arguments to pass to the command, |
1391 and an optional file object containing the raw data for the command. |
1583 and an optional file object containing the raw data for the command. |