mercurial/wireprotoframing.py
changeset 40132 e67522413ca8
parent 40131 5d44c4d1d516
child 40133 762ef19a07e3
equal deleted inserted replaced
40131:5d44c4d1d516 40132:e67522413ca8
   646             self._requestid,
   646             self._requestid,
   647             typeid=FRAME_TYPE_COMMAND_RESPONSE,
   647             typeid=FRAME_TYPE_COMMAND_RESPONSE,
   648             flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
   648             flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
   649             payload=payload)
   649             payload=payload)
   650 
   650 
       
   651 # TODO consider defining encoders/decoders using the util.compressionengine
       
   652 # mechanism.
       
   653 
       
   654 class identityencoder(object):
       
   655     """Encoder for the "identity" stream encoding profile."""
       
   656     def __init__(self, ui):
       
   657         pass
       
   658 
       
   659     def encode(self, data):
       
   660         return data
       
   661 
       
   662     def flush(self):
       
   663         return b''
       
   664 
       
   665     def finish(self):
       
   666         return b''
       
   667 
       
   668 class identitydecoder(object):
       
   669     """Decoder for the "identity" stream encoding profile."""
       
   670 
       
   671     def __init__(self, ui, extraobjs):
       
   672         if extraobjs:
       
   673             raise error.Abort(_('identity decoder received unexpected '
       
   674                                 'additional values'))
       
   675 
       
   676     def decode(self, data):
       
   677         return data
       
   678 
       
   679 class zlibencoder(object):
       
   680     def __init__(self, ui):
       
   681         import zlib
       
   682         self._zlib = zlib
       
   683         self._compressor = zlib.compressobj()
       
   684 
       
   685     def encode(self, data):
       
   686         return self._compressor.compress(data)
       
   687 
       
   688     def flush(self):
       
   689         # Z_SYNC_FLUSH doesn't reset compression context, which is
       
   690         # what we want.
       
   691         return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
       
   692 
       
   693     def finish(self):
       
   694         res = self._compressor.flush(self._zlib.Z_FINISH)
       
   695         self._compressor = None
       
   696         return res
       
   697 
       
   698 class zlibdecoder(object):
       
   699     def __init__(self, ui, extraobjs):
       
   700         import zlib
       
   701 
       
   702         if extraobjs:
       
   703             raise error.Abort(_('zlib decoder received unexpected '
       
   704                                 'additional values'))
       
   705 
       
   706         self._decompressor = zlib.decompressobj()
       
   707 
       
   708     def decode(self, data):
       
   709         # Python 2's zlib module doesn't use the buffer protocol and can't
       
   710         # handle all bytes-like types.
       
   711         if not pycompat.ispy3 and isinstance(data, bytearray):
       
   712             data = bytes(data)
       
   713 
       
   714         return self._decompressor.decompress(data)
       
   715 
       
   716 class zstdbaseencoder(object):
       
   717     def __init__(self, level):
       
   718         from . import zstd
       
   719 
       
   720         self._zstd = zstd
       
   721         cctx = zstd.ZstdCompressor(level=level)
       
   722         self._compressor = cctx.compressobj()
       
   723 
       
   724     def encode(self, data):
       
   725         return self._compressor.compress(data)
       
   726 
       
   727     def flush(self):
       
   728         # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
       
   729         # compressor and allows a decompressor to access all encoded data
       
   730         # up to this point.
       
   731         return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
       
   732 
       
   733     def finish(self):
       
   734         res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
       
   735         self._compressor = None
       
   736         return res
       
   737 
       
   738 class zstd8mbencoder(zstdbaseencoder):
       
   739     def __init__(self, ui):
       
   740         super(zstd8mbencoder, self).__init__(3)
       
   741 
       
   742 class zstdbasedecoder(object):
       
   743     def __init__(self, maxwindowsize):
       
   744         from . import zstd
       
   745         dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
       
   746         self._decompressor = dctx.decompressobj()
       
   747 
       
   748     def decode(self, data):
       
   749         return self._decompressor.decompress(data)
       
   750 
       
   751 class zstd8mbdecoder(zstdbasedecoder):
       
   752     def __init__(self, ui, extraobjs):
       
   753         if extraobjs:
       
   754             raise error.Abort(_('zstd8mb decoder received unexpected '
       
   755                                 'additional values'))
       
   756 
       
   757         super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
       
   758 
       
   759 # We lazily populate this to avoid excessive module imports when importing
       
   760 # this module.
       
   761 STREAM_ENCODERS = {}
       
   762 STREAM_ENCODERS_ORDER = []
       
   763 
       
   764 def populatestreamencoders():
       
   765     if STREAM_ENCODERS:
       
   766         return
       
   767 
       
   768     try:
       
   769         from . import zstd
       
   770         zstd.__version__
       
   771     except ImportError:
       
   772         zstd = None
       
   773 
       
   774     # zstandard is fastest and is preferred.
       
   775     if zstd:
       
   776         STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
       
   777         STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
       
   778 
       
   779     STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
       
   780     STREAM_ENCODERS_ORDER.append(b'zlib')
       
   781 
       
   782     STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
       
   783     STREAM_ENCODERS_ORDER.append(b'identity')
       
   784 
   651 class stream(object):
   785 class stream(object):
   652     """Represents a logical unidirectional series of frames."""
   786     """Represents a logical unidirectional series of frames."""
   653 
   787 
   654     def __init__(self, streamid, active=False):
   788     def __init__(self, streamid, active=False):
   655         self.streamid = streamid
   789         self.streamid = streamid
   669                          payload)
   803                          payload)
   670 
   804 
   671 class inputstream(stream):
   805 class inputstream(stream):
   672     """Represents a stream used for receiving data."""
   806     """Represents a stream used for receiving data."""
   673 
   807 
   674     def setdecoder(self, name, extraobjs):
   808     def __init__(self, streamid, active=False):
       
   809         super(inputstream, self).__init__(streamid, active=active)
       
   810         self._decoder = None
       
   811 
       
   812     def setdecoder(self, ui, name, extraobjs):
   675         """Set the decoder for this stream.
   813         """Set the decoder for this stream.
   676 
   814 
   677         Receives the stream profile name and any additional CBOR objects
   815         Receives the stream profile name and any additional CBOR objects
   678         decoded from the stream encoding settings frame payloads.
   816         decoded from the stream encoding settings frame payloads.
   679         """
   817         """
       
   818         if name not in STREAM_ENCODERS:
       
   819             raise error.Abort(_('unknown stream decoder: %s') % name)
       
   820 
       
   821         self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
       
   822 
       
   823     def decode(self, data):
       
   824         # Default is identity decoder. We don't bother instantiating one
       
   825         # because it is trivial.
       
   826         if not self._decoder:
       
   827             return data
       
   828 
       
   829         return self._decoder.decode(data)
       
   830 
       
   831     def flush(self):
       
   832         if not self._decoder:
       
   833             return b''
       
   834 
       
   835         return self._decoder.flush()
   680 
   836 
   681 class outputstream(stream):
   837 class outputstream(stream):
   682     """Represents a stream used for sending data."""
   838     """Represents a stream used for sending data."""
       
   839 
       
   840     def __init__(self, streamid, active=False):
       
   841         super(outputstream, self).__init__(streamid, active=active)
       
   842         self._encoder = None
       
   843 
       
   844     def setencoder(self, ui, name):
       
   845         """Set the encoder for this stream.
       
   846 
       
   847         Receives the stream profile name.
       
   848         """
       
   849         if name not in STREAM_ENCODERS:
       
   850             raise error.Abort(_('unknown stream encoder: %s') % name)
       
   851 
       
   852         self._encoder = STREAM_ENCODERS[name][0](ui)
       
   853 
       
   854     def encode(self, data):
       
   855         if not self._encoder:
       
   856             return data
       
   857 
       
   858         return self._encoder.encode(data)
       
   859 
       
   860     def flush(self):
       
   861         if not self._encoder:
       
   862             return b''
       
   863 
       
   864         return self._encoder.flush()
       
   865 
       
   866     def finish(self):
       
   867         if not self._encoder:
       
   868             return b''
       
   869 
       
   870         self._encoder.finish()
   683 
   871 
   684 def ensureserverstream(stream):
   872 def ensureserverstream(stream):
   685     if stream.streamid % 2:
   873     if stream.streamid % 2:
   686         raise error.ProgrammingError('server should only write to even '
   874         raise error.ProgrammingError('server should only write to even '
   687                                      'numbered streams; %d is not even' %
   875                                      'numbered streams; %d is not even' %
   784         self._protocolsettingsdecoder = None
   972         self._protocolsettingsdecoder = None
   785 
   973 
   786         # Sender protocol settings are optional. Set implied default values.
   974         # Sender protocol settings are optional. Set implied default values.
   787         self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
   975         self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
   788 
   976 
       
   977         populatestreamencoders()
       
   978 
   789     def onframerecv(self, frame):
   979     def onframerecv(self, frame):
   790         """Process a frame that has been received off the wire.
   980         """Process a frame that has been received off the wire.
   791 
   981 
   792         Returns a dict with an ``action`` key that details what action,
   982         Returns a dict with an ``action`` key that details what action,
   793         if any, the consumer should take next.
   983         if any, the consumer should take next.
  1382         self._pendingrequests = collections.deque()
  1572         self._pendingrequests = collections.deque()
  1383         self._activerequests = {}
  1573         self._activerequests = {}
  1384         self._incomingstreams = {}
  1574         self._incomingstreams = {}
  1385         self._streamsettingsdecoders = {}
  1575         self._streamsettingsdecoders = {}
  1386 
  1576 
       
  1577         populatestreamencoders()
       
  1578 
  1387     def callcommand(self, name, args, datafh=None, redirect=None):
  1579     def callcommand(self, name, args, datafh=None, redirect=None):
  1388         """Request that a command be executed.
  1580         """Request that a command be executed.
  1389 
  1581 
  1390         Receives the command name, a dict of arguments to pass to the command,
  1582         Receives the command name, a dict of arguments to pass to the command,
  1391         and an optional file object containing the raw data for the command.
  1583         and an optional file object containing the raw data for the command.
  1492                 }
  1684                 }
  1493 
  1685 
  1494             self._incomingstreams[frame.streamid] = inputstream(
  1686             self._incomingstreams[frame.streamid] = inputstream(
  1495                 frame.streamid)
  1687                 frame.streamid)
  1496 
  1688 
       
  1689         stream = self._incomingstreams[frame.streamid]
       
  1690 
       
  1691         # If the payload is encoded, ask the stream to decode it. We
       
  1692         # merely substitute the decoded result into the frame payload as
       
  1693         # if it had been transferred all along.
  1497         if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
  1694         if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
  1498             raise error.ProgrammingError('support for decoding stream '
  1695             frame.payload = stream.decode(frame.payload)
  1499                                          'payloads not yet implemneted')
       
  1500 
  1696 
  1501         if frame.streamflags & STREAM_FLAG_END_STREAM:
  1697         if frame.streamflags & STREAM_FLAG_END_STREAM:
  1502             del self._incomingstreams[frame.streamid]
  1698             del self._incomingstreams[frame.streamid]
  1503 
  1699 
  1504         if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
  1700         if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
  1571                 'message': _('stream encoding settings frame did not contain '
  1767                 'message': _('stream encoding settings frame did not contain '
  1572                              'CBOR data'),
  1768                              'CBOR data'),
  1573             }
  1769             }
  1574 
  1770 
  1575         try:
  1771         try:
  1576             self._incomingstreams[frame.streamid].setdecoder(decoded[0],
  1772             self._incomingstreams[frame.streamid].setdecoder(self._ui,
       
  1773                                                              decoded[0],
  1577                                                              decoded[1:])
  1774                                                              decoded[1:])
  1578         except Exception as e:
  1775         except Exception as e:
  1579             return 'error', {
  1776             return 'error', {
  1580                 'message': (_('error setting stream decoder: %s') %
  1777                 'message': (_('error setting stream decoder: %s') %
  1581                             stringutil.forcebytestr(e)),
  1778                             stringutil.forcebytestr(e)),