bundle2: lazy unbundle of part payload
The `unbundle` part gains a `read` method to retrieve payload content.
This method behaves as a python file-like read method.
The bundle-processing code is updated to make sure a part is fully consumed before
another one is extracted.
Test output changes because the debug output is even more interleaved now.
--- a/mercurial/bundle2.py Thu Apr 10 22:10:26 2014 -0700
+++ b/mercurial/bundle2.py Fri Apr 11 16:05:22 2014 -0400
@@ -288,6 +288,7 @@
# - exception catching
unbundler.params
iterparts = iter(unbundler)
+ part = None
try:
for part in iterparts:
parttype = part.type
@@ -302,8 +303,8 @@
# - use a more precise exception
raise
op.ui.debug('ignoring unknown advisory part %r\n' % key)
- # todo:
- # - consume the part once we use streaming
+ # consuming the part
+ part.read()
continue
# handler is called outside the above try block so that we don't
@@ -311,9 +312,14 @@
# parthandlermapping lookup (any KeyError raised by handler()
# itself represents a defect of a different variety).
handler(op, part)
+ part.read()
except Exception:
+ if part is not None:
+ # consume the bundle content
+ part.read()
for part in iterparts:
- pass # consume the bundle content
+ # consume the bundle content
+ part.read()
raise
return op
@@ -544,19 +550,21 @@
# unbundle state attr
self._headerdata = header
self._headeroffset = 0
+ self._initialized = False
+ self.consumed = False
# part data
self.id = None
self.type = None
self.mandatoryparams = None
self.advisoryparams = None
- self.data = None
- self._readdata()
+ self._payloadstream = None
+ self._readheader()
def _fromheader(self, size):
"""return the next <size> byte from the header"""
offset = self._headeroffset
data = self._headerdata[offset:(offset + size)]
- self._headeroffset += size
+ self._headeroffset = offset + size
return data
def _unpackheader(self, format):
@@ -566,10 +574,8 @@
data = self._fromheader(struct.calcsize(format))
return _unpack(format, data)
- def _readdata(self):
+ def _readheader(self):
"""read the header and setup the object"""
- # some utility to help reading from the header block
-
typesize = self._unpackheader(_fparttypesize)[0]
self.type = self._fromheader(typesize)
self.ui.debug('part type: "%s"\n' % self.type)
@@ -597,14 +603,29 @@
self.mandatoryparams = manparams
self.advisoryparams = advparams
## part payload
- payload = []
- payloadsize = self._unpack(_fpayloadsize)[0]
- self.ui.debug('payload chunk size: %i\n' % payloadsize)
- while payloadsize:
- payload.append(self._readexact(payloadsize))
+ def payloadchunks():
payloadsize = self._unpack(_fpayloadsize)[0]
self.ui.debug('payload chunk size: %i\n' % payloadsize)
- self.data = ''.join(payload)
+ while payloadsize:
+ yield self._readexact(payloadsize)
+ payloadsize = self._unpack(_fpayloadsize)[0]
+ self.ui.debug('payload chunk size: %i\n' % payloadsize)
+ self._payloadstream = util.chunkbuffer(payloadchunks())
+ # we read the data, tell it
+ self._initialized = True
+
+ def read(self, size=None):
+ """read payload data"""
+ if not self._initialized:
+ self._readheader()
+ if size is None:
+ data = self._payloadstream.read()
+ else:
+ data = self._payloadstream.read(size)
+ if size is None or len(data) < size:
+ self.consumed = True
+ return data
+
@parthandler('changegroup')
def handlechangegroup(op, inpart):
@@ -619,7 +640,7 @@
# we need to make sure we trigger the creation of a transaction object used
# for the whole processing scope.
op.gettransaction()
- data = StringIO.StringIO(inpart.data)
+ data = StringIO.StringIO(inpart.read())
data.seek(0)
cg = changegroup.readbundle(data, 'bundle2part')
ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
@@ -631,6 +652,7 @@
[('in-reply-to', str(inpart.id)),
('return', '%i' % ret)])
op.reply.addpart(part)
+ assert not inpart.read()
@parthandler('reply:changegroup')
def handlechangegroup(op, inpart):
--- a/tests/test-bundle2.t Thu Apr 10 22:10:26 2014 -0700
+++ b/tests/test-bundle2.t Fri Apr 11 16:05:22 2014 -0400
@@ -28,7 +28,7 @@
> """handle a "test:song" bundle2 part, printing the lyrics on stdin"""
> op.ui.write('The choir starts singing:\n')
> verses = 0
- > for line in part.data.split('\n'):
+ > for line in part.read().split('\n'):
> op.ui.write(' %s\n' % line)
> verses += 1
> op.records.add('song', {'verses': verses})
@@ -152,7 +152,7 @@
> ui.write(' :%s:\n' % p.type)
> ui.write(' mandatory: %i\n' % len(p.mandatoryparams))
> ui.write(' advisory: %i\n' % len(p.advisoryparams))
- > ui.write(' payload: %i bytes\n' % len(p.data))
+ > ui.write(' payload: %i bytes\n' % len(p.read()))
> ui.write('parts count: %i\n' % count)
> EOF
$ cat >> $HGRCPATH << EOF
@@ -378,48 +378,48 @@
part type: "test:empty"
part id: "0"
part parameters: 0
- payload chunk size: 0
:test:empty:
mandatory: 0
advisory: 0
+ payload chunk size: 0
payload: 0 bytes
part header size: 17
part type: "test:empty"
part id: "1"
part parameters: 0
- payload chunk size: 0
:test:empty:
mandatory: 0
advisory: 0
+ payload chunk size: 0
payload: 0 bytes
part header size: 16
part type: "test:song"
part id: "2"
part parameters: 0
- payload chunk size: 178
- payload chunk size: 0
:test:song:
mandatory: 0
advisory: 0
+ payload chunk size: 178
+ payload chunk size: 0
payload: 178 bytes
part header size: 43
part type: "test:math"
part id: "3"
part parameters: 3
- payload chunk size: 2
- payload chunk size: 0
:test:math:
mandatory: 2
advisory: 1
+ payload chunk size: 2
+ payload chunk size: 0
payload: 2 bytes
part header size: 16
part type: "test:ping"
part id: "4"
part parameters: 0
- payload chunk size: 0
:test:ping:
mandatory: 0
advisory: 0
+ payload chunk size: 0
payload: 0 bytes
part header size: 0
end of bundle2 stream
@@ -438,22 +438,22 @@
part type: "test:empty"
part id: "0"
part parameters: 0
+ ignoring unknown advisory part 'test:empty'
payload chunk size: 0
- ignoring unknown advisory part 'test:empty'
part header size: 17
part type: "test:empty"
part id: "1"
part parameters: 0
+ ignoring unknown advisory part 'test:empty'
payload chunk size: 0
- ignoring unknown advisory part 'test:empty'
part header size: 16
part type: "test:song"
part id: "2"
part parameters: 0
+ found a handler for part 'test:song'
+ The choir starts singing:
payload chunk size: 178
payload chunk size: 0
- found a handler for part 'test:song'
- The choir starts singing:
Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
Emana Karassoli, Loucra Loucra Ponponto, Pata Pata, Ko Ko Ko.
@@ -461,16 +461,16 @@
part type: "test:math"
part id: "3"
part parameters: 3
+ ignoring unknown advisory part 'test:math'
payload chunk size: 2
payload chunk size: 0
- ignoring unknown advisory part 'test:math'
part header size: 16
part type: "test:ping"
part id: "4"
part parameters: 0
- payload chunk size: 0
found a handler for part 'test:ping'
received ping request (id 4)
+ payload chunk size: 0
part header size: 0
end of bundle2 stream
0 unread bytes