# HG changeset patch # User Boris Feld # Date 1516233012 -3600 # Node ID 5f5fb279fd394c117fb64a95ad7fd1e4db72951d # Parent 72fdd99eb5265e4bf5c3af961eb8288d676e7946 streamclone: also stream caches to the client When stream clone is used over bundle2, relevant cache files are also streamed. This is expected to be a massive performance win for clone since no important cache will have to be recomputed. Some performance numbers: (All times are wall-clock times in seconds, 2 attempts per case.) # Mozilla-Central ## Clone over ssh over lan V1 streaming: 234.3 239.6 V2 streaming: 248.4 243.7 ## Clone over ssh over Internet V1 streaming: 175.5 110.9 V2 streaming: 109.1 111.0 ## Clone over HTTP over lan V1 streaming: 105.3 105.6 V2 streaming: 112.7 111.4 ## Clone over HTTP over internet V1 streaming: 105.6 114.6 V2 streaming: 226.7 225.9 ## Hg tags V1 streaming (no cache): 1.084 1.071 V2 streaming (cache): 0.312 0.325 ## Hg branches V1 streaming (no cache): 14.047 14.148 V2 streaming (with cache): 0.312 0.333 # Pypy ## Clone over ssh over internet V1 streaming: 29.4 30.1 V2 streaming: 31.2 30.1 ## Clone over http over internet V1 streaming: 29.7 29.7 V2 streaming: 75.2 72.9 (since ssh and lan are not affected, there seems to be an issue with how we read/write the http stream on connection with latency, unrelated to the format) ## Hg tags V1 streaming (no cache): 1.752 1.664 V2 streaming (with cache): 0.274 0.260 ## Hg branches V1 streaming (no cache): 4.469 4.728 V2 streaming (with cache): 0.318 0.321 # Private repository: * 500K revision revisions * 11K topological heads * 28K branch heads ## hg tags no cache: 1543.332 with cache: 4.900 ## hg branches no cache: 91.828 with cache: 2.955 diff -r 72fdd99eb526 -r 5f5fb279fd39 mercurial/streamclone.py --- a/mercurial/streamclone.py Wed Jan 17 17:46:49 2018 +0100 +++ b/mercurial/streamclone.py Thu Jan 18 00:50:12 2018 +0100 @@ -11,10 +11,12 @@ import os import struct import tempfile +import warnings from .i18n import _ from . 
import ( branchmap, + cacheutil, error, phases, store, @@ -435,6 +437,10 @@ _fileappend = 0 # append only file _filefull = 1 # full snapshot file +# Source of the file +_srcstore = 's' # store (svfs) +_srccache = 'c' # cache (cache) + # This is it's own function so extensions can override it. def _walkstreamfullstorefiles(repo): """list snapshot file from the store""" @@ -443,12 +449,12 @@ fnames.append('phaseroots') return fnames -def _filterfull(entry, copy, vfs): +def _filterfull(entry, copy, vfsmap): """actually copy the snapshot files""" - name, ftype, data = entry + src, name, ftype, data = entry if ftype != _filefull: return entry - return (name, ftype, copy(vfs.join(name))) + return (src, name, ftype, copy(vfsmap[src].join(name))) @contextlib.contextmanager def maketempcopies(): @@ -466,19 +472,33 @@ for tmp in files: util.tryunlink(tmp) +def _makemap(repo): + """make a (src -> vfs) map for the repo""" + vfsmap = { + _srcstore: repo.svfs, + _srccache: repo.cachevfs, + } + # we keep repo.vfs out of the map on purpose, there are too many dangers there + # (eg: .hg/hgrc) + assert repo.vfs not in vfsmap.values() + + return vfsmap + def _emit(repo, entries, totalfilesize): """actually emit the stream bundle""" - vfs = repo.svfs + vfsmap = _makemap(repo) progress = repo.ui.progress progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes')) with maketempcopies() as copy: try: # copy is delayed until we are in the try - entries = [_filterfull(e, copy, vfs) for e in entries] + entries = [_filterfull(e, copy, vfsmap) for e in entries] yield None # this release the lock on the repository seen = 0 - for name, ftype, data in entries: + for src, name, ftype, data in entries: + vfs = vfsmap[src] + yield src yield util.uvarintencode(len(name)) if ftype == _fileappend: fp = vfs(name) @@ -507,10 +527,11 @@ """Emit content for version 2 of a streaming clone. 
the data stream consists the following entries: - 1) A varint containing the length of the filename - 2) A varint containing the length of file data - 3) N bytes containing the filename (the internal, store-agnostic form) - 4) N bytes containing the file data + 1) A char representing the file destination (eg: store or cache) + 2) A varint containing the length of the filename + 3) A varint containing the length of file data + 4) N bytes containing the filename (the internal, store-agnostic form) + 5) N bytes containing the file data Returns a 3-tuple of (file count, file size, data iterator). """ @@ -523,12 +544,16 @@ repo.ui.debug('scanning\n') for name, ename, size in _walkstreamfiles(repo): if size: - entries.append((name, _fileappend, size)) + entries.append((_srcstore, name, _fileappend, size)) totalfilesize += size for name in _walkstreamfullstorefiles(repo): if repo.svfs.exists(name): totalfilesize += repo.svfs.lstat(name).st_size - entries.append((name, _filefull, None)) + entries.append((_srcstore, name, _filefull, None)) + for name in cacheutil.cachetocopy(repo): + if repo.cachevfs.exists(name): + totalfilesize += repo.cachevfs.lstat(name).st_size + entries.append((_srccache, name, _filefull, None)) chunks = _emit(repo, entries, totalfilesize) first = next(chunks) @@ -536,6 +561,16 @@ return len(entries), totalfilesize, chunks +@contextlib.contextmanager +def nested(*ctxs): + with warnings.catch_warnings(): + # For some reason, Python decided 'nested' was deprecated without + # replacement. They officially advertised for filtering the deprecation + # warning for people who actually need the feature. + warnings.filterwarnings("ignore",category=DeprecationWarning) + with contextlib.nested(*ctxs): + yield + def consumev2(repo, fp, filecount, filesize): """Apply the contents from a version 2 streaming clone. 
@@ -552,19 +587,23 @@ progress(_('clone'), handledbytes, total=filesize, unit=_('bytes')) - vfs = repo.svfs + vfsmap = _makemap(repo) with repo.transaction('clone'): - with vfs.backgroundclosing(repo.ui): + ctxs = (vfs.backgroundclosing(repo.ui) + for vfs in vfsmap.values()) + with nested(*ctxs): for i in range(filecount): + src = fp.read(1) + vfs = vfsmap[src] namelen = util.uvarintdecodestream(fp) datalen = util.uvarintdecodestream(fp) name = fp.read(namelen) if repo.ui.debugflag: - repo.ui.debug('adding %s (%s)\n' % - (name, util.bytecount(datalen))) + repo.ui.debug('adding [%s] %s (%s)\n' % + (src, name, util.bytecount(datalen))) with vfs(name, 'w') as ofp: for chunk in util.filechunkiter(fp, limit=datalen): diff -r 72fdd99eb526 -r 5f5fb279fd39 tests/test-clone-uncompressed.t --- a/tests/test-clone-uncompressed.t Wed Jan 17 17:46:49 2018 +0100 +++ b/tests/test-clone-uncompressed.t Thu Jan 18 00:50:12 2018 +0100 @@ -38,8 +38,13 @@ #if stream-bundle2 $ hg clone --stream -U http://localhost:$HGPORT clone1 streaming all changes - 1027 files to transfer, 96.3 KB of data - transferred 96.3 KB in * seconds (* */sec) (glob) + 1030 files to transfer, 96.4 KB of data + transferred 96.4 KB in * seconds (* */sec) (glob) + + $ ls -1 clone1/.hg/cache + branch2-served + rbc-names-v1 + rbc-revs-v1 #endif --uncompressed is an alias to --stream @@ -55,8 +60,8 @@ #if stream-bundle2 $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed streaming all changes - 1027 files to transfer, 96.3 KB of data - transferred 96.3 KB in * seconds (* */sec) (glob) + 1030 files to transfer, 96.4 KB of data + transferred 96.4 KB in * seconds (* */sec) (glob) #endif Clone with background file closing enabled @@ -95,10 +100,11 @@ bundle2-input-bundle: with-transaction bundle2-input-part: "stream" (params: 4 mandatory) supported applying stream bundle - 1027 files to transfer, 96.3 KB of data + 1030 files to transfer, 96.4 KB of data + starting 4 threads for background file 
closing starting 4 threads for background file closing - transferred 96.3 KB in * seconds (* */sec) (glob) - bundle2-input-part: total payload size 110887 + transferred 96.4 KB in * seconds (* */sec) (glob) + bundle2-input-part: total payload size 112077 bundle2-input-part: "listkeys" (params: 1 mandatory) supported bundle2-input-bundle: 1 parts total checking for updated bookmarks @@ -136,8 +142,8 @@ #if stream-bundle2 $ hg clone --stream -U http://localhost:$HGPORT secret-allowed streaming all changes - 1027 files to transfer, 96.3 KB of data - transferred 96.3 KB in * seconds (* */sec) (glob) + 1030 files to transfer, 96.4 KB of data + transferred 96.4 KB in * seconds (* */sec) (glob) #endif $ killdaemons.py @@ -253,8 +259,8 @@ #if stream-bundle2 $ hg clone --stream http://localhost:$HGPORT with-bookmarks streaming all changes - 1027 files to transfer, 96.3 KB of data - transferred 96.3 KB in * seconds (* */sec) (glob) + 1033 files to transfer, 96.6 KB of data + transferred 96.6 KB in * seconds (* */sec) (glob) updating to branch default 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved #endif @@ -283,8 +289,8 @@ #if stream-bundle2 $ hg clone --stream http://localhost:$HGPORT phase-publish streaming all changes - 1027 files to transfer, 96.3 KB of data - transferred 96.3 KB in * seconds (* */sec) (glob) + 1033 files to transfer, 96.6 KB of data + transferred 96.6 KB in * seconds (* */sec) (glob) updating to branch default 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved #endif @@ -318,8 +324,8 @@ #if stream-bundle2 $ hg clone --stream http://localhost:$HGPORT phase-no-publish streaming all changes - 1028 files to transfer, 96.4 KB of data - transferred 96.4 KB in * seconds (* */sec) (glob) + 1034 files to transfer, 96.7 KB of data + transferred 96.7 KB in * seconds (* */sec) (glob) updating to branch default 1025 files updated, 0 files merged, 0 files removed, 0 files unresolved $ hg -R phase-no-publish phase -r 
'all()'