comparison mercurial/bundle2.py @ 35112:073eec083e25

bundle2: extract logic for seeking bundle2 part into own class

Currently, unbundlepart classes support bi-directional seeking. Most consumers of unbundlepart only ever seek forward - typically as part of moving to the end of the bundle part so they can move on to the next one. But regardless of the actual usage of the part, instances maintain an index mapping offsets within the underlying raw payload to offsets within the decoded payload.

Maintaining the mapping of offset data can be expensive in terms of memory use. Furthermore, many bundle2 consumers don't have access to an underlying seekable stream. This includes all compressed bundles. So maintaining offset data when the underlying stream isn't seekable anyway is wasteful. And since many bundle2 streams aren't seekable, it seems like a bad idea to expose a seek API in bundle2 parts by default. If you provide one, people will attempt to use it. Seekable bundle2 parts should be the exception, not the rule.

This commit starts the process of dividing unbundlepart into 2 classes: a base class that supports linear, one-time reads and a child class that supports bi-directional seeking. In this first commit, we split various methods and attributes out into a new "seekableunbundlepart" class. Previous instantiators of "unbundlepart" now instantiate "seekableunbundlepart". This preserves backwards compatibility. The coupling between the classes is still tight: "unbundlepart" cannot be used on its own. This will be addressed in subsequent commits.

Differential Revision: https://phab.mercurial-scm.org/D1386
author Gregory Szorc <gregory.szorc@gmail.com>
date Mon, 13 Nov 2017 19:22:11 -0800
parents 241d9caca11e
children 8aa43ff9c12c
comparison
equal deleted inserted replaced
35109:e96613048bdd 35112:073eec083e25
852 # From there, payload need to be decompressed 852 # From there, payload need to be decompressed
853 self._fp = self._compengine.decompressorreader(self._fp) 853 self._fp = self._compengine.decompressorreader(self._fp)
854 indebug(self.ui, 'start extraction of bundle2 parts') 854 indebug(self.ui, 'start extraction of bundle2 parts')
855 headerblock = self._readpartheader() 855 headerblock = self._readpartheader()
856 while headerblock is not None: 856 while headerblock is not None:
857 part = unbundlepart(self.ui, headerblock, self._fp) 857 part = seekableunbundlepart(self.ui, headerblock, self._fp)
858 yield part 858 yield part
859 # Seek to the end of the part to force it's consumption so the next 859 # Seek to the end of the part to force it's consumption so the next
860 # part can be read. But then seek back to the beginning so the 860 # part can be read. But then seek back to the beginning so the
861 # code consuming this generator has a part that starts at 0. 861 # code consuming this generator has a part that starts at 0.
862 part.seek(0, os.SEEK_END) 862 part.seek(0, os.SEEK_END)
1153 indebug(self.ui, 'bundle2 stream interruption, looking for a part.') 1153 indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
1154 headerblock = self._readpartheader() 1154 headerblock = self._readpartheader()
1155 if headerblock is None: 1155 if headerblock is None:
1156 indebug(self.ui, 'no part found during interruption.') 1156 indebug(self.ui, 'no part found during interruption.')
1157 return 1157 return
1158 part = unbundlepart(self.ui, headerblock, self._fp) 1158 part = seekableunbundlepart(self.ui, headerblock, self._fp)
1159 op = interruptoperation(self.ui) 1159 op = interruptoperation(self.ui)
1160 hardabort = False 1160 hardabort = False
1161 try: 1161 try:
1162 _processpart(op, part) 1162 _processpart(op, part)
1163 except (SystemExit, KeyboardInterrupt): 1163 except (SystemExit, KeyboardInterrupt):
1205 self.type = None 1205 self.type = None
1206 self.mandatoryparams = None 1206 self.mandatoryparams = None
1207 self.advisoryparams = None 1207 self.advisoryparams = None
1208 self.params = None 1208 self.params = None
1209 self.mandatorykeys = () 1209 self.mandatorykeys = ()
1210 self._payloadstream = None
1211 self._readheader() 1210 self._readheader()
1212 self._mandatory = None 1211 self._mandatory = None
1213 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1214 self._pos = 0 1212 self._pos = 0
1215 1213
1216 def _fromheader(self, size): 1214 def _fromheader(self, size):
1217 """return the next <size> byte from the header""" 1215 """return the next <size> byte from the header"""
1218 offset = self._headeroffset 1216 offset = self._headeroffset
1234 self.advisoryparams = tuple(advisoryparams) 1232 self.advisoryparams = tuple(advisoryparams)
1235 # user friendly UI 1233 # user friendly UI
1236 self.params = util.sortdict(self.mandatoryparams) 1234 self.params = util.sortdict(self.mandatoryparams)
1237 self.params.update(self.advisoryparams) 1235 self.params.update(self.advisoryparams)
1238 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams) 1236 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1239
1240 def _payloadchunks(self, chunknum=0):
1241 '''seek to specified chunk and start yielding data'''
1242 if len(self._chunkindex) == 0:
1243 assert chunknum == 0, 'Must start with chunk 0'
1244 self._chunkindex.append((0, self._tellfp()))
1245 else:
1246 assert chunknum < len(self._chunkindex), \
1247 'Unknown chunk %d' % chunknum
1248 self._seekfp(self._chunkindex[chunknum][1])
1249
1250 pos = self._chunkindex[chunknum][0]
1251 payloadsize = self._unpack(_fpayloadsize)[0]
1252 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1253 while payloadsize:
1254 if payloadsize == flaginterrupt:
1255 # interruption detection, the handler will now read a
1256 # single part and process it.
1257 interrupthandler(self.ui, self._fp)()
1258 elif payloadsize < 0:
1259 msg = 'negative payload chunk size: %i' % payloadsize
1260 raise error.BundleValueError(msg)
1261 else:
1262 result = self._readexact(payloadsize)
1263 chunknum += 1
1264 pos += payloadsize
1265 if chunknum == len(self._chunkindex):
1266 self._chunkindex.append((pos, self._tellfp()))
1267 yield result
1268 payloadsize = self._unpack(_fpayloadsize)[0]
1269 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1270
1271 def _findchunk(self, pos):
1272 '''for a given payload position, return a chunk number and offset'''
1273 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1274 if ppos == pos:
1275 return chunk, 0
1276 elif ppos > pos:
1277 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1278 raise ValueError('Unknown chunk')
1279 1237
1280 def _readheader(self): 1238 def _readheader(self):
1281 """read the header and setup the object""" 1239 """read the header and setup the object"""
1282 typesize = self._unpackheader(_fparttypesize)[0] 1240 typesize = self._unpackheader(_fparttypesize)[0]
1283 self.type = self._fromheader(typesize) 1241 self.type = self._fromheader(typesize)
1325 if not self.consumed and self._pos: 1283 if not self.consumed and self._pos:
1326 self.ui.debug('bundle2-input-part: total payload size %i\n' 1284 self.ui.debug('bundle2-input-part: total payload size %i\n'
1327 % self._pos) 1285 % self._pos)
1328 self.consumed = True 1286 self.consumed = True
1329 return data 1287 return data
1288
1289 class seekableunbundlepart(unbundlepart):
1290 """A bundle2 part in a bundle that is seekable.
1291
1292 Regular ``unbundlepart`` instances can only be read once. This class
1293 extends ``unbundlepart`` to enable bi-directional seeking within the
1294 part.
1295
1296 Bundle2 part data consists of framed chunks. Offsets when seeking
1297 refer to the decoded data, not the offsets in the underlying bundle2
1298 stream.
1299
1300 To facilitate quickly seeking within the decoded data, instances of this
1301 class maintain a mapping between offsets in the underlying stream and
1302 the decoded payload. This mapping will consume memory in proportion
1303 to the number of chunks within the payload (which almost certainly
1304 increases in proportion with the size of the part).
1305 """
1306 def __init__(self, ui, header, fp):
1307 # (payload, file) offsets for chunk starts.
1308 self._chunkindex = []
1309
1310 super(seekableunbundlepart, self).__init__(ui, header, fp)
1311
1312 def _payloadchunks(self, chunknum=0):
1313 '''seek to specified chunk and start yielding data'''
1314 if len(self._chunkindex) == 0:
1315 assert chunknum == 0, 'Must start with chunk 0'
1316 self._chunkindex.append((0, self._tellfp()))
1317 else:
1318 assert chunknum < len(self._chunkindex), \
1319 'Unknown chunk %d' % chunknum
1320 self._seekfp(self._chunkindex[chunknum][1])
1321
1322 pos = self._chunkindex[chunknum][0]
1323 payloadsize = self._unpack(_fpayloadsize)[0]
1324 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1325 while payloadsize:
1326 if payloadsize == flaginterrupt:
1327 # interruption detection, the handler will now read a
1328 # single part and process it.
1329 interrupthandler(self.ui, self._fp)()
1330 elif payloadsize < 0:
1331 msg = 'negative payload chunk size: %i' % payloadsize
1332 raise error.BundleValueError(msg)
1333 else:
1334 result = self._readexact(payloadsize)
1335 chunknum += 1
1336 pos += payloadsize
1337 if chunknum == len(self._chunkindex):
1338 self._chunkindex.append((pos, self._tellfp()))
1339 yield result
1340 payloadsize = self._unpack(_fpayloadsize)[0]
1341 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1342
1343 def _findchunk(self, pos):
1344 '''for a given payload position, return a chunk number and offset'''
1345 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1346 if ppos == pos:
1347 return chunk, 0
1348 elif ppos > pos:
1349 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1350 raise ValueError('Unknown chunk')
1330 1351
1331 def tell(self): 1352 def tell(self):
1332 return self._pos 1353 return self._pos
1333 1354
1334 def seek(self, offset, whence=os.SEEK_SET): 1355 def seek(self, offset, whence=os.SEEK_SET):