changeset 23066:ad144882318d stable

bundle2: add a interrupt mechanism It is now possible to emit a single part in the middle of a payload production. This part will be processed with limitation (only access to a `ui` object). The goal is to let the server raise exception and output while a part is being processed. The source motivation is to transmit exception that occurs while generating a part. This change is was the motivation to bump the bundle2 format from HG2X to HG2Y. Somehow, the format bump made it into 3.2 without it. So this change go on stable. It is low risk as bundle2 is still disabled by default.
author Pierre-Yves David <pierre-yves.david@fb.com>
date Tue, 14 Oct 2014 10:47:47 -0700
parents 963f311e3a81
children 420a051616ce
files mercurial/bundle2.py
diffstat 1 files changed, 63 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/bundle2.py	Tue Oct 21 12:38:28 2014 -0700
+++ b/mercurial/bundle2.py	Tue Oct 14 10:47:47 2014 -0700
@@ -695,6 +695,61 @@
         elif len(self.data):
             yield self.data
 
+
+flaginterrupt = -1
+
+class interrupthandler(unpackermixin):
+    """read one part and process it with restricted capability
+
+    This allows to transmit exception raised on the producer size during part
+    iteration while the consumer is reading a part.
+
+    Part processed in this manner only have access to a ui object,"""
+
+    def __init__(self, ui, fp):
+        super(interrupthandler, self).__init__(fp)
+        self.ui = ui
+
+    def _readpartheader(self):
+        """reads a part header size and return the bytes blob
+
+        returns None if empty"""
+        headersize = self._unpack(_fpartheadersize)[0]
+        if headersize < 0:
+            raise error.BundleValueError('negative part header size: %i'
+                                         % headersize)
+        self.ui.debug('part header size: %i\n' % headersize)
+        if headersize:
+            return self._readexact(headersize)
+        return None
+
+    def __call__(self):
+        self.ui.debug('bundle2 stream interruption, looking for a part.\n')
+        headerblock = self._readpartheader()
+        if headerblock is None:
+            self.ui.debug('no part found during iterruption.\n')
+            return
+        part = unbundlepart(self.ui, headerblock, self._fp)
+        op = interruptoperation(self.ui)
+        _processpart(op, part)
+
+class interruptoperation(object):
+    """A limited operation to be use by part handler during interruption
+
+    It only have access to an ui object.
+    """
+
+    def __init__(self, ui):
+        self.ui = ui
+        self.reply = None
+
+    @property
+    def repo(self):
+        raise RuntimeError('no repo access from stream interruption')
+
+    def gettransaction(self):
+        raise TransactionUnavailable('no repo access from stream interruption')
+
 class unbundlepart(unpackermixin):
     """a bundle part read from a bundle"""
 
@@ -772,10 +827,15 @@
             payloadsize = self._unpack(_fpayloadsize)[0]
             self.ui.debug('payload chunk size: %i\n' % payloadsize)
             while payloadsize:
-                if payloadsize < 0:
-                    msg = 'negative payload chunk size: %i' % payloadsize
+                if payloadsize == flaginterrupt:
+                    # interruption detection, the handler will now read a
+                    # single part and process it.
+                    interrupthandler(self.ui, self._fp)()
+                elif payloadsize < 0:
+                    msg = 'negative payload chunk size: %i' %  payloadsize
                     raise error.BundleValueError(msg)
-                yield self._readexact(payloadsize)
+                else:
+                    yield self._readexact(payloadsize)
                 payloadsize = self._unpack(_fpayloadsize)[0]
                 self.ui.debug('payload chunk size: %i\n' % payloadsize)
         self._payloadstream = util.chunkbuffer(payloadchunks())