--- a/mercurial/streamclone.py Fri Jan 19 22:52:35 2018 +0100
+++ b/mercurial/streamclone.py Thu Jan 18 00:48:56 2018 +0100
@@ -428,3 +428,132 @@
def apply(self, repo):
return applybundlev1(repo, self._fh)
+
+def _emit(repo, entries, totalfilesize):
+ """actually emit the stream bundle"""
+ progress = repo.ui.progress
+ progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
+ vfs = repo.svfs
+ try:
+ seen = 0
+ for name, size in entries:
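+            # each entry is framed as: varint(filename length),
+            # varint(data length), the filename bytes, then the file data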
+ yield util.uvarintencode(len(name))
+ fp = vfs(name)
+ try:
+ yield util.uvarintencode(size)
+ yield name
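+                # read small files in a single call, but stream larger
+                # ones in bounded chunks to keep memory usage flat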
+ if size <= 65536:
+ chunks = (fp.read(size),)
+ else:
+ chunks = util.filechunkiter(fp, limit=size)
+ for chunk in chunks:
+ seen += len(chunk)
+ progress(_('bundle'), seen, total=totalfilesize,
+ unit=_('bytes'))
+ yield chunk
+ finally:
+ fp.close()
+ finally:
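+        # a pos of None marks the progress topic as complete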
+ progress(_('bundle'), None)
+
+def generatev2(repo):
+ """Emit content for version 2 of a streaming clone.
+
+    The data stream consists of the following entries:
+    1) A varint containing the length of the filename
+    2) A varint containing the length of the file data
+    3) The filename bytes (in the internal, store-agnostic form)
+    4) The file data bytes
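+
+    For illustration, a hypothetical 5 byte file named ``data/foo.i``
+    (a 10 byte name) would be framed as the single-byte varints 0x0a
+    and 0x05, followed by the 10 name bytes and the 5 data bytes.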
+
+ Returns a 3-tuple of (file count, file size, data iterator).
+ """
+
+ with repo.lock():
+
+ entries = []
+ totalfilesize = 0
+
+ repo.ui.debug('scanning\n')
+ for name, ename, size in _walkstreamfiles(repo):
+ if size:
+ entries.append((name, size))
+ totalfilesize += size
+
+ chunks = _emit(repo, entries, totalfilesize)
+
+ return len(entries), totalfilesize, chunks
+
+def consumev2(repo, fp, filecount, filesize):
+ """Apply the contents from a version 2 streaming clone.
+
+ Data is read from an object that only needs to provide a ``read(size)``
+ method.
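+
+    The expected on-the-wire format is the one produced by
+    ``generatev2()``.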
+ """
+ with repo.lock():
+ repo.ui.status(_('%d files to transfer, %s of data\n') %
+ (filecount, util.bytecount(filesize)))
+
+ start = util.timer()
+ handledbytes = 0
+ progress = repo.ui.progress
+
+ progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
+
+ vfs = repo.svfs
+
+ with repo.transaction('clone'):
+ with vfs.backgroundclosing(repo.ui):
+ for i in range(filecount):
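+                    # each entry mirrors the framing written by _emit():
+                    # varint name length, varint data length, then the
+                    # name and data bytes themselves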
+ namelen = util.uvarintdecodestream(fp)
+ datalen = util.uvarintdecodestream(fp)
+
+ name = fp.read(namelen)
+
+ if repo.ui.debugflag:
+ repo.ui.debug('adding %s (%s)\n' %
+ (name, util.bytecount(datalen)))
+
+ with vfs(name, 'w') as ofp:
+ for chunk in util.filechunkiter(fp, limit=datalen):
+ handledbytes += len(chunk)
+ progress(_('clone'), handledbytes, total=filesize,
+ unit=_('bytes'))
+ ofp.write(chunk)
+
+ # force @filecache properties to be reloaded from
+ # streamclone-ed file at next access
+ repo.invalidate(clearfilecache=True)
+
+ elapsed = util.timer() - start
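+        # guard against division by zero for near-instant transfers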
+ if elapsed <= 0:
+ elapsed = 0.001
+ progress(_('clone'), None)
+ repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
+ (util.bytecount(handledbytes), elapsed,
+ util.bytecount(handledbytes / elapsed)))
+
+def applybundlev2(repo, fp, filecount, filesize, requirements):
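+    # refuse to apply a stream whose requirements we cannot honor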
+ missingreqs = [r for r in requirements if r not in repo.supported]
+ if missingreqs:
+ raise error.Abort(_('unable to apply stream clone: '
+ 'unsupported format: %s') %
+ ', '.join(sorted(missingreqs)))
+
+ consumev2(repo, fp, filecount, filesize)