mercurial/streamclone.py
changeset 35756 cfdccd560b66
parent 35495 ded3a63f305b
child 35757 bbf7abd09ff0
equal deleted inserted replaced
35755:2384523cee4d 35756:cfdccd560b66
   426     def __init__(self, fh):
   426     def __init__(self, fh):
   427         self._fh = fh
   427         self._fh = fh
   428 
   428 
   429     def apply(self, repo):
   429     def apply(self, repo):
   430         return applybundlev1(repo, self._fh)
   430         return applybundlev1(repo, self._fh)
       
   431 
       
   432 def _emit(repo, entries, totalfilesize):
       
   433     """actually emit the stream bundle"""
       
   434     progress = repo.ui.progress
       
   435     progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
       
   436     vfs = repo.svfs
       
   437     try:
       
   438         seen = 0
       
   439         for name, size in entries:
       
   440             yield util.uvarintencode(len(name))
       
   441             fp = vfs(name)
       
   442             try:
       
   443                 yield util.uvarintencode(size)
       
   444                 yield name
       
   445                 if size <= 65536:
       
   446                     chunks = (fp.read(size),)
       
   447                 else:
       
   448                     chunks = util.filechunkiter(fp, limit=size)
       
   449                 for chunk in chunks:
       
   450                     seen += len(chunk)
       
   451                     progress(_('bundle'), seen, total=totalfilesize,
       
   452                              unit=_('bytes'))
       
   453                     yield chunk
       
   454             finally:
       
   455                 fp.close()
       
   456     finally:
       
   457         progress(_('bundle'), None)
       
   458 
       
   459 def generatev2(repo):
       
   460     """Emit content for version 2 of a streaming clone.
       
   461 
       
   462     the data stream consists the following entries:
       
   463     1) A varint containing the length of the filename
       
   464     2) A varint containing the length of file data
       
   465     3) N bytes containing the filename (the internal, store-agnostic form)
       
   466     4) N bytes containing the file data
       
   467 
       
   468     Returns a 3-tuple of (file count, file size, data iterator).
       
   469     """
       
   470 
       
   471     with repo.lock():
       
   472 
       
   473         entries = []
       
   474         totalfilesize = 0
       
   475 
       
   476         repo.ui.debug('scanning\n')
       
   477         for name, ename, size in _walkstreamfiles(repo):
       
   478             if size:
       
   479                 entries.append((name, size))
       
   480                 totalfilesize += size
       
   481 
       
   482         chunks = _emit(repo, entries, totalfilesize)
       
   483 
       
   484     return len(entries), totalfilesize, chunks
       
   485 
       
   486 def consumev2(repo, fp, filecount, filesize):
       
   487     """Apply the contents from a version 2 streaming clone.
       
   488 
       
   489     Data is read from an object that only needs to provide a ``read(size)``
       
   490     method.
       
   491     """
       
   492     with repo.lock():
       
   493         repo.ui.status(_('%d files to transfer, %s of data\n') %
       
   494                        (filecount, util.bytecount(filesize)))
       
   495 
       
   496         start = util.timer()
       
   497         handledbytes = 0
       
   498         progress = repo.ui.progress
       
   499 
       
   500         progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
       
   501 
       
   502         vfs = repo.svfs
       
   503 
       
   504         with repo.transaction('clone'):
       
   505             with vfs.backgroundclosing(repo.ui):
       
   506                 for i in range(filecount):
       
   507                     namelen = util.uvarintdecodestream(fp)
       
   508                     datalen = util.uvarintdecodestream(fp)
       
   509 
       
   510                     name = fp.read(namelen)
       
   511 
       
   512                     if repo.ui.debugflag:
       
   513                         repo.ui.debug('adding %s (%s)\n' %
       
   514                                       (name, util.bytecount(datalen)))
       
   515 
       
   516                     with vfs(name, 'w') as ofp:
       
   517                         for chunk in util.filechunkiter(fp, limit=datalen):
       
   518                             handledbytes += len(chunk)
       
   519                             progress(_('clone'), handledbytes, total=filesize,
       
   520                                      unit=_('bytes'))
       
   521                             ofp.write(chunk)
       
   522 
       
   523             # force @filecache properties to be reloaded from
       
   524             # streamclone-ed file at next access
       
   525             repo.invalidate(clearfilecache=True)
       
   526 
       
   527         elapsed = util.timer() - start
       
   528         if elapsed <= 0:
       
   529             elapsed = 0.001
       
   530         progress(_('clone'), None)
       
   531         repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
       
   532                        (util.bytecount(handledbytes), elapsed,
       
   533                         util.bytecount(handledbytes / elapsed)))
       
   534 
       
   535 def applybundlev2(repo, fp, filecount, filesize, requirements):
       
   536     missingreqs = [r for r in requirements if r not in repo.supported]
       
   537     if missingreqs:
       
   538         raise error.Abort(_('unable to apply stream clone: '
       
   539                             'unsupported format: %s') %
       
   540                           ', '.join(sorted(missingreqs)))
       
   541 
       
   542     consumev2(repo, fp, filecount, filesize)