mercurial/streamclone.py
changeset 35767 5f5fb279fd39
parent 35765 56c30b31afbe
child 35802 bbc07357b567
equal deleted inserted replaced
35766:72fdd99eb526 35767:5f5fb279fd39
     9 
     9 
    10 import contextlib
    10 import contextlib
    11 import os
    11 import os
    12 import struct
    12 import struct
    13 import tempfile
    13 import tempfile
       
    14 import warnings
    14 
    15 
    15 from .i18n import _
    16 from .i18n import _
    16 from . import (
    17 from . import (
    17     branchmap,
    18     branchmap,
       
    19     cacheutil,
    18     error,
    20     error,
    19     phases,
    21     phases,
    20     store,
    22     store,
    21     util,
    23     util,
    22 )
    24 )
   433 
   435 
# type of file to stream
_fileappend = 0 # append only file (the entry's data slot carries the byte count to send)
_filefull = 1   # full snapshot file (the data slot is replaced by a temporary copy's path)

# Source of the file; emitted on the wire as a one-byte marker per entry
_srcstore = 's' # store (svfs)
_srccache = 'c' # cache (cachevfs)
       
   443 
   438 # This is it's own function so extensions can override it.
   444 # This is it's own function so extensions can override it.
   439 def _walkstreamfullstorefiles(repo):
   445 def _walkstreamfullstorefiles(repo):
   440     """list snapshot file from the store"""
   446     """list snapshot file from the store"""
   441     fnames = []
   447     fnames = []
   442     if not repo.publishing():
   448     if not repo.publishing():
   443         fnames.append('phaseroots')
   449         fnames.append('phaseroots')
   444     return fnames
   450     return fnames
   445 
   451 
   446 def _filterfull(entry, copy, vfs):
   452 def _filterfull(entry, copy, vfsmap):
   447     """actually copy the snapshot files"""
   453     """actually copy the snapshot files"""
   448     name, ftype, data = entry
   454     src, name, ftype, data = entry
   449     if ftype != _filefull:
   455     if ftype != _filefull:
   450         return entry
   456         return entry
   451     return (name, ftype, copy(vfs.join(name)))
   457     return (src, name, ftype, copy(vfsmap[src].join(name)))
   452 
   458 
   453 @contextlib.contextmanager
   459 @contextlib.contextmanager
   454 def maketempcopies():
   460 def maketempcopies():
   455     """return a function to temporary copy file"""
   461     """return a function to temporary copy file"""
   456     files = []
   462     files = []
   464         yield copy
   470         yield copy
   465     finally:
   471     finally:
   466         for tmp in files:
   472         for tmp in files:
   467             util.tryunlink(tmp)
   473             util.tryunlink(tmp)
   468 
   474 
       
   475 def _makemap(repo):
       
   476     """make a (src -> vfs) map for the repo"""
       
   477     vfsmap = {
       
   478         _srcstore: repo.svfs,
       
   479         _srccache: repo.cachevfs,
       
   480     }
       
   481     # we keep repo.vfs out of the on purpose, ther are too many danger there
       
   482     # (eg: .hg/hgrc)
       
   483     assert repo.vfs not in vfsmap.values()
       
   484 
       
   485     return vfsmap
       
   486 
   469 def _emit(repo, entries, totalfilesize):
   487 def _emit(repo, entries, totalfilesize):
   470     """actually emit the stream bundle"""
   488     """actually emit the stream bundle"""
   471     vfs = repo.svfs
   489     vfsmap = _makemap(repo)
   472     progress = repo.ui.progress
   490     progress = repo.ui.progress
   473     progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
   491     progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
   474     with maketempcopies() as copy:
   492     with maketempcopies() as copy:
   475         try:
   493         try:
   476             # copy is delayed until we are in the try
   494             # copy is delayed until we are in the try
   477             entries = [_filterfull(e, copy, vfs) for e in entries]
   495             entries = [_filterfull(e, copy, vfsmap) for e in entries]
   478             yield None # this release the lock on the repository
   496             yield None # this release the lock on the repository
   479             seen = 0
   497             seen = 0
   480 
   498 
   481             for name, ftype, data in entries:
   499             for src, name, ftype, data in entries:
       
   500                 vfs = vfsmap[src]
       
   501                 yield src
   482                 yield util.uvarintencode(len(name))
   502                 yield util.uvarintencode(len(name))
   483                 if ftype == _fileappend:
   503                 if ftype == _fileappend:
   484                     fp = vfs(name)
   504                     fp = vfs(name)
   485                     size = data
   505                     size = data
   486                 elif ftype == _filefull:
   506                 elif ftype == _filefull:
   505 
   525 
   506 def generatev2(repo):
   526 def generatev2(repo):
   507     """Emit content for version 2 of a streaming clone.
   527     """Emit content for version 2 of a streaming clone.
   508 
   528 
   509     the data stream consists the following entries:
   529     the data stream consists the following entries:
   510     1) A varint containing the length of the filename
   530     1) A char representing the file destination (eg: store or cache)
   511     2) A varint containing the length of file data
   531     2) A varint containing the length of the filename
   512     3) N bytes containing the filename (the internal, store-agnostic form)
   532     3) A varint containing the length of file data
   513     4) N bytes containing the file data
   533     4) N bytes containing the filename (the internal, store-agnostic form)
       
   534     5) N bytes containing the file data
   514 
   535 
   515     Returns a 3-tuple of (file count, file size, data iterator).
   536     Returns a 3-tuple of (file count, file size, data iterator).
   516     """
   537     """
   517 
   538 
   518     with repo.lock():
   539     with repo.lock():
   521         totalfilesize = 0
   542         totalfilesize = 0
   522 
   543 
   523         repo.ui.debug('scanning\n')
   544         repo.ui.debug('scanning\n')
   524         for name, ename, size in _walkstreamfiles(repo):
   545         for name, ename, size in _walkstreamfiles(repo):
   525             if size:
   546             if size:
   526                 entries.append((name, _fileappend, size))
   547                 entries.append((_srcstore, name, _fileappend, size))
   527                 totalfilesize += size
   548                 totalfilesize += size
   528         for name in _walkstreamfullstorefiles(repo):
   549         for name in _walkstreamfullstorefiles(repo):
   529             if repo.svfs.exists(name):
   550             if repo.svfs.exists(name):
   530                 totalfilesize += repo.svfs.lstat(name).st_size
   551                 totalfilesize += repo.svfs.lstat(name).st_size
   531                 entries.append((name, _filefull, None))
   552                 entries.append((_srcstore, name, _filefull, None))
       
   553         for name in cacheutil.cachetocopy(repo):
       
   554             if repo.cachevfs.exists(name):
       
   555                 totalfilesize += repo.cachevfs.lstat(name).st_size
       
   556                 entries.append((_srccache, name, _filefull, None))
   532 
   557 
   533         chunks = _emit(repo, entries, totalfilesize)
   558         chunks = _emit(repo, entries, totalfilesize)
   534         first = next(chunks)
   559         first = next(chunks)
   535         assert first is None
   560         assert first is None
   536 
   561 
   537     return len(entries), totalfilesize, chunks
   562     return len(entries), totalfilesize, chunks
   538 
   563 
       
   564 @contextlib.contextmanager
       
   565 def nested(*ctxs):
       
   566     with warnings.catch_warnings():
       
   567         # For some reason, Python decided 'nested' was deprecated without
       
   568         # replacement. They officially advertised for filtering the deprecation
       
   569         # warning for people who actually need the feature.
       
   570         warnings.filterwarnings("ignore",category=DeprecationWarning)
       
   571         with contextlib.nested(*ctxs):
       
   572             yield
       
   573 
   539 def consumev2(repo, fp, filecount, filesize):
   574 def consumev2(repo, fp, filecount, filesize):
   540     """Apply the contents from a version 2 streaming clone.
   575     """Apply the contents from a version 2 streaming clone.
   541 
   576 
   542     Data is read from an object that only needs to provide a ``read(size)``
   577     Data is read from an object that only needs to provide a ``read(size)``
   543     method.
   578     method.
   550         handledbytes = 0
   585         handledbytes = 0
   551         progress = repo.ui.progress
   586         progress = repo.ui.progress
   552 
   587 
   553         progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
   588         progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
   554 
   589 
   555         vfs = repo.svfs
   590         vfsmap = _makemap(repo)
   556 
   591 
   557         with repo.transaction('clone'):
   592         with repo.transaction('clone'):
   558             with vfs.backgroundclosing(repo.ui):
   593             ctxs = (vfs.backgroundclosing(repo.ui)
       
   594                     for vfs in vfsmap.values())
       
   595             with nested(*ctxs):
   559                 for i in range(filecount):
   596                 for i in range(filecount):
       
   597                     src = fp.read(1)
       
   598                     vfs = vfsmap[src]
   560                     namelen = util.uvarintdecodestream(fp)
   599                     namelen = util.uvarintdecodestream(fp)
   561                     datalen = util.uvarintdecodestream(fp)
   600                     datalen = util.uvarintdecodestream(fp)
   562 
   601 
   563                     name = fp.read(namelen)
   602                     name = fp.read(namelen)
   564 
   603 
   565                     if repo.ui.debugflag:
   604                     if repo.ui.debugflag:
   566                         repo.ui.debug('adding %s (%s)\n' %
   605                         repo.ui.debug('adding [%s] %s (%s)\n' %
   567                                       (name, util.bytecount(datalen)))
   606                                       (src, name, util.bytecount(datalen)))
   568 
   607 
   569                     with vfs(name, 'w') as ofp:
   608                     with vfs(name, 'w') as ofp:
   570                         for chunk in util.filechunkiter(fp, limit=datalen):
   609                         for chunk in util.filechunkiter(fp, limit=datalen):
   571                             handledbytes += len(chunk)
   610                             handledbytes += len(chunk)
   572                             progress(_('clone'), handledbytes, total=filesize,
   611                             progress(_('clone'), handledbytes, total=filesize,