bundle2: lazy unbundle of part payload
authorPierre-Yves David <pierre-yves.david@fb.com>
Fri, 11 Apr 2014 16:05:22 -0400
changeset 21019 3dc09f831a2e
parent 21018 c848bfd02366
child 21020 5041163ee382
bundle2: lazy unbundle of part payload The `unbundle` part gains a `read` method to retrieve payload content. This method behaves as a python file-like read method. The bundle-processing code is updated to make sure a part is fully consumed before another one is extracted. Test output changes because the debug output is even more interleaved now.
mercurial/bundle2.py
tests/test-bundle2.t
--- a/mercurial/bundle2.py	Thu Apr 10 22:10:26 2014 -0700
+++ b/mercurial/bundle2.py	Fri Apr 11 16:05:22 2014 -0400
@@ -288,6 +288,7 @@
     # - exception catching
     unbundler.params
     iterparts = iter(unbundler)
+    part = None
     try:
         for part in iterparts:
             parttype = part.type
@@ -302,8 +303,8 @@
                     # - use a more precise exception
                     raise
                 op.ui.debug('ignoring unknown advisory part %r\n' % key)
-                # todo:
-                # - consume the part once we use streaming
+                # consuming the part
+                part.read()
                 continue
 
             # handler is called outside the above try block so that we don't
@@ -311,9 +312,14 @@
             # parthandlermapping lookup (any KeyError raised by handler()
             # itself represents a defect of a different variety).
             handler(op, part)
+            part.read()
     except Exception:
+        if part is not None:
+            # consume the bundle content
+            part.read()
         for part in iterparts:
-            pass # consume the bundle content
+            # consume the bundle content
+            part.read()
         raise
     return op
 
@@ -544,19 +550,21 @@
         # unbundle state attr
         self._headerdata = header
         self._headeroffset = 0
+        self._initialized = False
+        self.consumed = False
         # part data
         self.id = None
         self.type = None
         self.mandatoryparams = None
         self.advisoryparams = None
-        self.data = None
-        self._readdata()
+        self._payloadstream = None
+        self._readheader()
 
     def _fromheader(self, size):
         """return the next <size> byte from the header"""
         offset = self._headeroffset
         data = self._headerdata[offset:(offset + size)]
-        self._headeroffset += size
+        self._headeroffset = offset + size
         return data
 
     def _unpackheader(self, format):
@@ -566,10 +574,8 @@
         data = self._fromheader(struct.calcsize(format))
         return _unpack(format, data)
 
-    def _readdata(self):
+    def _readheader(self):
         """read the header and setup the object"""
-        # some utility to help reading from the header block
-
         typesize = self._unpackheader(_fparttypesize)[0]
         self.type = self._fromheader(typesize)
         self.ui.debug('part type: "%s"\n' % self.type)
@@ -597,14 +603,29 @@
         self.mandatoryparams = manparams
         self.advisoryparams  = advparams
         ## part payload
-        payload = []
-        payloadsize = self._unpack(_fpayloadsize)[0]
-        self.ui.debug('payload chunk size: %i\n' % payloadsize)
-        while payloadsize:
-            payload.append(self._readexact(payloadsize))
+        def payloadchunks():
             payloadsize = self._unpack(_fpayloadsize)[0]
             self.ui.debug('payload chunk size: %i\n' % payloadsize)
-        self.data = ''.join(payload)
+            while payloadsize:
+                yield self._readexact(payloadsize)
+                payloadsize = self._unpack(_fpayloadsize)[0]
+                self.ui.debug('payload chunk size: %i\n' % payloadsize)
+        self._payloadstream = util.chunkbuffer(payloadchunks())
+        # we read the data, tell it
+        self._initialized = True
+
+    def read(self, size=None):
+        """read payload data"""
+        if not self._initialized:
+            self._readheader()
+        if size is None:
+            data = self._payloadstream.read()
+        else:
+            data = self._payloadstream.read(size)
+        if size is None or len(data) < size:
+            self.consumed = True
+        return data
+
 
 @parthandler('changegroup')
 def handlechangegroup(op, inpart):
@@ -619,7 +640,7 @@
     # we need to make sure we trigger the creation of a transaction object used
     # for the whole processing scope.
     op.gettransaction()
-    data = StringIO.StringIO(inpart.data)
+    data = StringIO.StringIO(inpart.read())
     data.seek(0)
     cg = changegroup.readbundle(data, 'bundle2part')
     ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
@@ -631,6 +652,7 @@
                            [('in-reply-to', str(inpart.id)),
                             ('return', '%i' % ret)])
         op.reply.addpart(part)
+    assert not inpart.read()
 
 @parthandler('reply:changegroup')
 def handlechangegroup(op, inpart):
--- a/tests/test-bundle2.t	Thu Apr 10 22:10:26 2014 -0700
+++ b/tests/test-bundle2.t	Fri Apr 11 16:05:22 2014 -0400
@@ -28,7 +28,7 @@
   >     """handle a "test:song" bundle2 part, printing the lyrics on stdin"""
   >     op.ui.write('The choir starts singing:\n')
   >     verses = 0
-  >     for line in part.data.split('\n'):
+  >     for line in part.read().split('\n'):
   >         op.ui.write('    %s\n' % line)
   >         verses += 1
   >     op.records.add('song', {'verses': verses})
@@ -152,7 +152,7 @@
   >         ui.write('  :%s:\n' % p.type)
   >         ui.write('    mandatory: %i\n' % len(p.mandatoryparams))
   >         ui.write('    advisory: %i\n' % len(p.advisoryparams))
-  >         ui.write('    payload: %i bytes\n' % len(p.data))
+  >         ui.write('    payload: %i bytes\n' % len(p.read()))
   >     ui.write('parts count:   %i\n' % count)
   > EOF
   $ cat >> $HGRCPATH << EOF
@@ -378,48 +378,48 @@
   part type: "test:empty"
   part id: "0"
   part parameters: 0
-  payload chunk size: 0
     :test:empty:
       mandatory: 0
       advisory: 0
+  payload chunk size: 0
       payload: 0 bytes
   part header size: 17
   part type: "test:empty"
   part id: "1"
   part parameters: 0
-  payload chunk size: 0
     :test:empty:
       mandatory: 0
       advisory: 0
+  payload chunk size: 0
       payload: 0 bytes
   part header size: 16
   part type: "test:song"
   part id: "2"
   part parameters: 0
-  payload chunk size: 178
-  payload chunk size: 0
     :test:song:
       mandatory: 0
       advisory: 0
+  payload chunk size: 178
+  payload chunk size: 0
       payload: 178 bytes
   part header size: 43
   part type: "test:math"
   part id: "3"
   part parameters: 3
-  payload chunk size: 2
-  payload chunk size: 0
     :test:math:
       mandatory: 2
       advisory: 1
+  payload chunk size: 2
+  payload chunk size: 0
       payload: 2 bytes
   part header size: 16
   part type: "test:ping"
   part id: "4"
   part parameters: 0
-  payload chunk size: 0
     :test:ping:
       mandatory: 0
       advisory: 0
+  payload chunk size: 0
       payload: 0 bytes
   part header size: 0
   end of bundle2 stream
@@ -438,22 +438,22 @@
   part type: "test:empty"
   part id: "0"
   part parameters: 0
+  ignoring unknown advisory part 'test:empty'
   payload chunk size: 0
-  ignoring unknown advisory part 'test:empty'
   part header size: 17
   part type: "test:empty"
   part id: "1"
   part parameters: 0
+  ignoring unknown advisory part 'test:empty'
   payload chunk size: 0
-  ignoring unknown advisory part 'test:empty'
   part header size: 16
   part type: "test:song"
   part id: "2"
   part parameters: 0
+  found a handler for part 'test:song'
+  The choir starts singing:
   payload chunk size: 178
   payload chunk size: 0
-  found a handler for part 'test:song'
-  The choir starts singing:
       Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
       Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
       Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko.
@@ -461,16 +461,16 @@
   part type: "test:math"
   part id: "3"
   part parameters: 3
+  ignoring unknown advisory part 'test:math'
   payload chunk size: 2
   payload chunk size: 0
-  ignoring unknown advisory part 'test:math'
   part header size: 16
   part type: "test:ping"
   part id: "4"
   part parameters: 0
-  payload chunk size: 0
   found a handler for part 'test:ping'
   received ping request (id 4)
+  payload chunk size: 0
   part header size: 0
   end of bundle2 stream
   0 unread bytes