comparison mercurial/bundle2.py @ 21019:3dc09f831a2e

bundle2: lazy unbundle of part payload

The `unbundlepart` class gains a `read` method to retrieve payload content. This method behaves like the `read` method of a Python file-like object. The bundle-processing code is updated to make sure a part is fully consumed before the next one is extracted. Test output changes because the debug output is now even more interleaved.
author Pierre-Yves David <pierre-yves.david@fb.com>
date Fri, 11 Apr 2014 16:05:22 -0400
parents b477afb1c81e
children 5041163ee382
comparison
equal deleted inserted replaced
21018:c848bfd02366 21019:3dc09f831a2e
286 # todo: 286 # todo:
287 # - replace this is a init function soon. 287 # - replace this is a init function soon.
288 # - exception catching 288 # - exception catching
289 unbundler.params 289 unbundler.params
290 iterparts = iter(unbundler) 290 iterparts = iter(unbundler)
291 part = None
291 try: 292 try:
292 for part in iterparts: 293 for part in iterparts:
293 parttype = part.type 294 parttype = part.type
294 # part key are matched lower case 295 # part key are matched lower case
295 key = parttype.lower() 296 key = parttype.lower()
300 if key != parttype: # mandatory parts 301 if key != parttype: # mandatory parts
301 # todo: 302 # todo:
302 # - use a more precise exception 303 # - use a more precise exception
303 raise 304 raise
304 op.ui.debug('ignoring unknown advisory part %r\n' % key) 305 op.ui.debug('ignoring unknown advisory part %r\n' % key)
305 # todo: 306 # consuming the part
306 # - consume the part once we use streaming 307 part.read()
307 continue 308 continue
308 309
309 # handler is called outside the above try block so that we don't 310 # handler is called outside the above try block so that we don't
310 # risk catching KeyErrors from anything other than the 311 # risk catching KeyErrors from anything other than the
311 # parthandlermapping lookup (any KeyError raised by handler() 312 # parthandlermapping lookup (any KeyError raised by handler()
312 # itself represents a defect of a different variety). 313 # itself represents a defect of a different variety).
313 handler(op, part) 314 handler(op, part)
315 part.read()
314 except Exception: 316 except Exception:
317 if part is not None:
318 # consume the bundle content
319 part.read()
315 for part in iterparts: 320 for part in iterparts:
316 pass # consume the bundle content 321 # consume the bundle content
322 part.read()
317 raise 323 raise
318 return op 324 return op
319 325
320 class bundle20(object): 326 class bundle20(object):
321 """represent an outgoing bundle2 container 327 """represent an outgoing bundle2 container
542 super(unbundlepart, self).__init__(fp) 548 super(unbundlepart, self).__init__(fp)
543 self.ui = ui 549 self.ui = ui
544 # unbundle state attr 550 # unbundle state attr
545 self._headerdata = header 551 self._headerdata = header
546 self._headeroffset = 0 552 self._headeroffset = 0
553 self._initialized = False
554 self.consumed = False
547 # part data 555 # part data
548 self.id = None 556 self.id = None
549 self.type = None 557 self.type = None
550 self.mandatoryparams = None 558 self.mandatoryparams = None
551 self.advisoryparams = None 559 self.advisoryparams = None
552 self.data = None 560 self._payloadstream = None
553 self._readdata() 561 self._readheader()
554 562
555 def _fromheader(self, size): 563 def _fromheader(self, size):
556 """return the next <size> byte from the header""" 564 """return the next <size> byte from the header"""
557 offset = self._headeroffset 565 offset = self._headeroffset
558 data = self._headerdata[offset:(offset + size)] 566 data = self._headerdata[offset:(offset + size)]
559 self._headeroffset += size 567 self._headeroffset = offset + size
560 return data 568 return data
561 569
562 def _unpackheader(self, format): 570 def _unpackheader(self, format):
563 """read given format from header 571 """read given format from header
564 572
565 This automatically compute the size of the format to read.""" 573 This automatically compute the size of the format to read."""
566 data = self._fromheader(struct.calcsize(format)) 574 data = self._fromheader(struct.calcsize(format))
567 return _unpack(format, data) 575 return _unpack(format, data)
568 576
569 def _readdata(self): 577 def _readheader(self):
570 """read the header and setup the object""" 578 """read the header and setup the object"""
571 # some utility to help reading from the header block
572
573 typesize = self._unpackheader(_fparttypesize)[0] 579 typesize = self._unpackheader(_fparttypesize)[0]
574 self.type = self._fromheader(typesize) 580 self.type = self._fromheader(typesize)
575 self.ui.debug('part type: "%s"\n' % self.type) 581 self.ui.debug('part type: "%s"\n' % self.type)
576 self.id = self._unpackheader(_fpartid)[0] 582 self.id = self._unpackheader(_fpartid)[0]
577 self.ui.debug('part id: "%s"\n' % self.id) 583 self.ui.debug('part id: "%s"\n' % self.id)
595 for key, value in advsizes: 601 for key, value in advsizes:
596 advparams.append((self._fromheader(key), self._fromheader(value))) 602 advparams.append((self._fromheader(key), self._fromheader(value)))
597 self.mandatoryparams = manparams 603 self.mandatoryparams = manparams
598 self.advisoryparams = advparams 604 self.advisoryparams = advparams
599 ## part payload 605 ## part payload
600 payload = [] 606 def payloadchunks():
601 payloadsize = self._unpack(_fpayloadsize)[0]
602 self.ui.debug('payload chunk size: %i\n' % payloadsize)
603 while payloadsize:
604 payload.append(self._readexact(payloadsize))
605 payloadsize = self._unpack(_fpayloadsize)[0] 607 payloadsize = self._unpack(_fpayloadsize)[0]
606 self.ui.debug('payload chunk size: %i\n' % payloadsize) 608 self.ui.debug('payload chunk size: %i\n' % payloadsize)
607 self.data = ''.join(payload) 609 while payloadsize:
610 yield self._readexact(payloadsize)
611 payloadsize = self._unpack(_fpayloadsize)[0]
612 self.ui.debug('payload chunk size: %i\n' % payloadsize)
613 self._payloadstream = util.chunkbuffer(payloadchunks())
614 # we read the data, tell it
615 self._initialized = True
616
617 def read(self, size=None):
618 """read payload data"""
619 if not self._initialized:
620 self._readheader()
621 if size is None:
622 data = self._payloadstream.read()
623 else:
624 data = self._payloadstream.read(size)
625 if size is None or len(data) < size:
626 self.consumed = True
627 return data
628
608 629
609 @parthandler('changegroup') 630 @parthandler('changegroup')
610 def handlechangegroup(op, inpart): 631 def handlechangegroup(op, inpart):
611 """apply a changegroup part on the repo 632 """apply a changegroup part on the repo
612 633
617 # 638 #
618 # The addchangegroup function will get a transaction object by itself, but 639 # The addchangegroup function will get a transaction object by itself, but
619 # we need to make sure we trigger the creation of a transaction object used 640 # we need to make sure we trigger the creation of a transaction object used
620 # for the whole processing scope. 641 # for the whole processing scope.
621 op.gettransaction() 642 op.gettransaction()
622 data = StringIO.StringIO(inpart.data) 643 data = StringIO.StringIO(inpart.read())
623 data.seek(0) 644 data.seek(0)
624 cg = changegroup.readbundle(data, 'bundle2part') 645 cg = changegroup.readbundle(data, 'bundle2part')
625 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2') 646 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
626 op.records.add('changegroup', {'return': ret}) 647 op.records.add('changegroup', {'return': ret})
627 if op.reply is not None: 648 if op.reply is not None:
629 # return. But one need to start somewhere. 650 # return. But one need to start somewhere.
630 part = bundlepart('reply:changegroup', (), 651 part = bundlepart('reply:changegroup', (),
631 [('in-reply-to', str(inpart.id)), 652 [('in-reply-to', str(inpart.id)),
632 ('return', '%i' % ret)]) 653 ('return', '%i' % ret)])
633 op.reply.addpart(part) 654 op.reply.addpart(part)
655 assert not inpart.read()
634 656
635 @parthandler('reply:changegroup') 657 @parthandler('reply:changegroup')
636 def handlechangegroup(op, inpart): 658 def handlechangegroup(op, inpart):
637 p = dict(inpart.advisoryparams) 659 p = dict(inpart.advisoryparams)
638 ret = int(p['return']) 660 ret = int(p['return'])