--- a/mercurial/bundle2.py Sun Nov 12 19:46:15 2017 -0800
+++ b/mercurial/bundle2.py Mon Nov 13 20:03:02 2017 -0800
@@ -363,7 +363,7 @@
self.count = count
self.current = p
yield p
- p.seek(0, os.SEEK_END)
+ p.consume()
self.current = None
self.iterator = func()
return self.iterator
@@ -385,11 +385,11 @@
try:
if self.current:
# consume the part content to not corrupt the stream.
- self.current.seek(0, os.SEEK_END)
+ self.current.consume()
for part in self.iterator:
# consume the bundle content
- part.seek(0, os.SEEK_END)
+ part.consume()
except Exception:
seekerror = True
@@ -856,10 +856,11 @@
while headerblock is not None:
part = seekableunbundlepart(self.ui, headerblock, self._fp)
yield part
- # Seek to the end of the part to force it's consumption so the next
- # part can be read. But then seek back to the beginning so the
- # code consuming this generator has a part that starts at 0.
- part.seek(0, os.SEEK_END)
+ # Ensure part is fully consumed so we can start reading the next
+ # part.
+ part.consume()
+ # But then seek back to the beginning so the code consuming this
+ # generator has a part that starts at 0.
part.seek(0, os.SEEK_SET)
headerblock = self._readpartheader()
indebug(self.ui, 'end of bundle2 stream')
@@ -1165,7 +1166,7 @@
raise
finally:
if not hardabort:
- part.seek(0, os.SEEK_END)
+ part.consume()
self.ui.debug('bundle2-input-stream-interrupt:'
' closing out of band context\n')
@@ -1300,6 +1301,20 @@
"""Generator of decoded chunks in the payload."""
return decodepayloadchunks(self.ui, self._fp)
+ def consume(self):
+ """Read the part payload until completion.
+
+ By consuming the part data, the underlying stream read offset will
+ be advanced to the next part (or end of stream).
+ """
+ if self.consumed:
+ return
+
+ chunk = self.read(32768)
+ while chunk:
+ self._pos += len(chunk)
+ chunk = self.read(32768)
+
def read(self, size=None):
"""read payload data"""
if not self._initialized: