bundle2: add a interrupt mechanism
It is now possible to emit a single part in the middle of a payload production.
This part will be processed with limitation (only access to a `ui` object). The
goal is to let the server raise exception and output while a part is being
processed. The source motivation is to transmit exception that occurs while
generating a part.
This change is was the motivation to bump the bundle2 format from HG2X to HG2Y.
Somehow, the format bump made it into 3.2 without it. So this change go on
stable. It is low risk as bundle2 is still disabled by default.
--- a/mercurial/bundle2.py Tue Oct 21 12:38:28 2014 -0700
+++ b/mercurial/bundle2.py Tue Oct 14 10:47:47 2014 -0700
@@ -695,6 +695,61 @@
elif len(self.data):
yield self.data
+
+flaginterrupt = -1
+
+class interrupthandler(unpackermixin):
+ """read one part and process it with restricted capability
+
+ This allows to transmit exception raised on the producer size during part
+ iteration while the consumer is reading a part.
+
+ Part processed in this manner only have access to a ui object,"""
+
+ def __init__(self, ui, fp):
+ super(interrupthandler, self).__init__(fp)
+ self.ui = ui
+
+ def _readpartheader(self):
+ """reads a part header size and return the bytes blob
+
+ returns None if empty"""
+ headersize = self._unpack(_fpartheadersize)[0]
+ if headersize < 0:
+ raise error.BundleValueError('negative part header size: %i'
+ % headersize)
+ self.ui.debug('part header size: %i\n' % headersize)
+ if headersize:
+ return self._readexact(headersize)
+ return None
+
+ def __call__(self):
+ self.ui.debug('bundle2 stream interruption, looking for a part.\n')
+ headerblock = self._readpartheader()
+ if headerblock is None:
+ self.ui.debug('no part found during iterruption.\n')
+ return
+ part = unbundlepart(self.ui, headerblock, self._fp)
+ op = interruptoperation(self.ui)
+ _processpart(op, part)
+
+class interruptoperation(object):
+ """A limited operation to be use by part handler during interruption
+
+ It only have access to an ui object.
+ """
+
+ def __init__(self, ui):
+ self.ui = ui
+ self.reply = None
+
+ @property
+ def repo(self):
+ raise RuntimeError('no repo access from stream interruption')
+
+ def gettransaction(self):
+ raise TransactionUnavailable('no repo access from stream interruption')
+
class unbundlepart(unpackermixin):
"""a bundle part read from a bundle"""
@@ -772,10 +827,15 @@
payloadsize = self._unpack(_fpayloadsize)[0]
self.ui.debug('payload chunk size: %i\n' % payloadsize)
while payloadsize:
- if payloadsize < 0:
- msg = 'negative payload chunk size: %i' % payloadsize
+ if payloadsize == flaginterrupt:
+ # interruption detection, the handler will now read a
+ # single part and process it.
+ interrupthandler(self.ui, self._fp)()
+ elif payloadsize < 0:
+ msg = 'negative payload chunk size: %i' % payloadsize
raise error.BundleValueError(msg)
- yield self._readexact(payloadsize)
+ else:
+ yield self._readexact(payloadsize)
payloadsize = self._unpack(_fpayloadsize)[0]
self.ui.debug('payload chunk size: %i\n' % payloadsize)
self._payloadstream = util.chunkbuffer(payloadchunks())