# HG changeset patch # User Arseniy Alekseyev # Date 1685637562 -3600 # Node ID 0452af30480870e22b9b2f94ae2384e9ee812c6c # Parent 0bba730005dfd38a3b37806b6247c7cbe555af6e stream-clone: add a v3 version of the protocol This new version is less rigid regarding the exact number of files and number of bytes to be actually transferred; it also lays the groundwork for other improvements. The format stays experimental, but this is an interesting base to build upon. diff -r 0bba730005df -r 0452af304808 mercurial/bundle2.py --- a/mercurial/bundle2.py Thu Jun 01 18:20:28 2023 +0100 +++ b/mercurial/bundle2.py Thu Jun 01 17:39:22 2023 +0100 @@ -1952,14 +1952,12 @@ part.addparam(b'filecount', b'%d' % filecount, mandatory=True) part.addparam(b'requirements', requirements, mandatory=True) elif version == b"v3-exp": - filecount, bytecount, it = streamclone.generatev2( + it = streamclone.generatev3( repo, includepats, excludepats, includeobsmarkers ) requirements = streamclone.streamed_requirements(repo) requirements = _formatrequirementsspec(requirements) part = bundler.newpart(b'stream3-exp', data=it) - part.addparam(b'bytecount', b'%d' % bytecount, mandatory=True) - part.addparam(b'filecount', b'%d' % filecount, mandatory=True) part.addparam(b'requirements', requirements, mandatory=True) @@ -2616,9 +2614,18 @@ streamclone.applybundlev2(repo, part, filecount, bytecount, requirements) -@parthandler(b'stream3-exp', (b'requirements', b'filecount', b'bytecount')) +@parthandler(b'stream3-exp', (b'requirements',)) def handlestreamv3bundle(op, part): - return handlestreamv2bundle(op, part) + requirements = urlreq.unquote(part.params[b'requirements']) + requirements = requirements.split(b',') if requirements else [] + + repo = op.repo + if len(repo): + msg = _(b'cannot apply stream clone to non empty repository') + raise error.Abort(msg) + + repo.ui.debug(b'applying stream bundle\n') + streamclone.applybundlev3(repo, part, requirements) def widen_bundle( diff -r 0bba730005df -r 
0452af304808 mercurial/store.py --- a/mercurial/store.py Thu Jun 01 18:20:28 2023 +0100 +++ b/mercurial/store.py Thu Jun 01 17:39:22 2023 +0100 @@ -490,6 +490,7 @@ vfs=None, copies=None, max_changeset=None, + preserve_file_count=False, ): """return a list of data stream associated to files for this entry @@ -599,6 +600,7 @@ vfs=None, copies=None, max_changeset=None, + preserve_file_count=False, ): if ( repo is None @@ -613,7 +615,18 @@ vfs=vfs, copies=copies, max_changeset=max_changeset, + preserve_file_count=preserve_file_count, ) + elif not preserve_file_count: + stream = [ + f.get_stream(vfs, copies) + for f in self.files() + if not f.unencoded_path.endswith((b'.i', b'.d')) + ] + rl = self.get_revlog_instance(repo).get_revlog() + rl_stream = rl.get_streams(max_changeset) + stream.extend(rl_stream) + return stream name_to_size = {} for f in self.files(): diff -r 0bba730005df -r 0452af304808 mercurial/streamclone.py --- a/mercurial/streamclone.py Thu Jun 01 18:20:28 2023 +0100 +++ b/mercurial/streamclone.py Thu Jun 01 17:39:22 2023 +0100 @@ -668,7 +668,11 @@ for src, vfs, e in entries: entry_streams = e.get_streams( - repo=repo, vfs=vfs, copies=copy, max_changeset=max_linkrev + repo=repo, + vfs=vfs, + copies=copy, + max_changeset=max_linkrev, + preserve_file_count=True, ) for name, stream, size in entry_streams: yield src @@ -691,6 +695,59 @@ raise error.Abort(msg % (bytecount, name, size)) +def _emit3(repo, entries): + """actually emit the stream bundle (v3)""" + vfsmap = _makemap(repo) + # we keep repo.vfs out of the map on purpose, there are too many dangers + # there (eg: .hg/hgrc), + # + # this assert is duplicated (from _makemap) as authors might think this is + # fine, while this is really not fine. 
+ if repo.vfs in vfsmap.values(): + raise error.ProgrammingError( + b'repo.vfs must not be added to vfsmap for security reasons' + ) + + # translate the vfs once + entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries] + total_entry_count = len(entries) + + max_linkrev = len(repo) + progress = repo.ui.makeprogress( + _(b'bundle'), + total=total_entry_count, + unit=_(b'entry'), + ) + progress.update(0) + with TempCopyManager() as copy, progress: + # create a copy of volatile files + for k, vfs, e in entries: + for f in e.files(): + f.file_size(vfs) # record the expected size under lock + if f.is_volatile: + copy(vfs.join(f.unencoded_path)) + # the first yield releases the lock on the repository + yield None + + yield util.uvarintencode(total_entry_count) + + for src, vfs, e in entries: + entry_streams = e.get_streams( + repo=repo, + vfs=vfs, + copies=copy, + max_changeset=max_linkrev, + ) + yield util.uvarintencode(len(entry_streams)) + for name, stream, size in entry_streams: + yield src + yield util.uvarintencode(len(name)) + yield util.uvarintencode(size) + yield name + yield from stream + progress.increment() + + def _test_sync_point_walk_1(repo): """a function for synchronisation during tests""" @@ -766,7 +823,47 @@ def generatev3(repo, includes, excludes, includeobsmarkers): - return generatev2(repo, includes, excludes, includeobsmarkers) + """Emit content for version 3 of a streaming clone. 
+ + the data stream consists of the following: + 1) A varint E containing the number of entries (can be 0), then E entries follow + 2) For each entry: + 2.1) A varint containing the number of files in this entry (can be 0, but typically 1 or 2) + 2.2) For each file: + 2.2.1) A char representing the file destination (eg: store or cache) + 2.2.2) A varint N containing the length of the filename + 2.2.3) A varint M containing the length of file data + 2.2.4) N bytes containing the filename (the internal, store-agnostic form) + 2.2.5) M bytes containing the file data + + Returns the data iterator. + + XXX This format is experimental and subject to change. Here is a + XXX non-exhaustive list of things this format could do or change: + + - making it easier to write files in parallel + - holding the lock for a shorter time + - improving progress information + - ways to adjust the number of expected entries/files ? + """ + + with repo.lock(): + + repo.ui.debug(b'scanning\n') + + entries = _entries_walk( + repo, + includes=includes, + excludes=excludes, + includeobsmarkers=includeobsmarkers, + ) + chunks = _emit3(repo, list(entries)) + first = next(chunks) + assert first is None + _test_sync_point_walk_1(repo) + _test_sync_point_walk_2(repo) + + return chunks @contextlib.contextmanager @@ -850,6 +947,80 @@ progress.complete() +def consumev3(repo, fp): + """Apply the contents from a version 3 streaming clone. + + Data is read from an object that only needs to provide a ``read(size)`` + method. 
+ """ + with repo.lock(): + start = util.timer() + + entrycount = util.uvarintdecodestream(fp) + repo.ui.status(_(b'%d entries to transfer\n') % (entrycount)) + + progress = repo.ui.makeprogress( + _(b'clone'), + total=entrycount, + unit=_(b'entries'), + ) + progress.update(0) + bytes_transferred = 0 + + vfsmap = _makemap(repo) + # we keep repo.vfs out of the map on purpose, there are too many dangers + # there (eg: .hg/hgrc), + # + # this assert is duplicated (from _makemap) as authors might think this + # is fine, while this is really not fine. + if repo.vfs in vfsmap.values(): + raise error.ProgrammingError( + b'repo.vfs must not be added to vfsmap for security reasons' + ) + + with repo.transaction(b'clone'): + ctxs = (vfs.backgroundclosing(repo.ui) for vfs in vfsmap.values()) + with nested(*ctxs): + + for i in range(entrycount): + filecount = util.uvarintdecodestream(fp) + if filecount == 0: + if repo.ui.debugflag: + repo.ui.debug(b'entry with no files [%d]\n' % (i)) + for i in range(filecount): + src = util.readexactly(fp, 1) + vfs = vfsmap[src] + namelen = util.uvarintdecodestream(fp) + datalen = util.uvarintdecodestream(fp) + + name = util.readexactly(fp, namelen) + + if repo.ui.debugflag: + msg = b'adding [%s] %s (%s)\n' + msg %= (src, name, util.bytecount(datalen)) + repo.ui.debug(msg) + bytes_transferred += datalen + + with vfs(name, b'w') as ofp: + for chunk in util.filechunkiter(fp, limit=datalen): + ofp.write(chunk) + progress.increment(step=1) + + # force @filecache properties to be reloaded from + # streamclone-ed file at next access + repo.invalidate(clearfilecache=True) + + elapsed = util.timer() - start + if elapsed <= 0: + elapsed = 0.001 + msg = _(b'transferred %s in %.1f seconds (%s/sec)\n') + byte_count = util.bytecount(bytes_transferred) + bytes_sec = util.bytecount(bytes_transferred / elapsed) + msg %= (byte_count, elapsed, bytes_sec) + repo.ui.status(msg) + progress.complete() + + def applybundlev2(repo, fp, filecount, filesize, requirements): 
from . import localrepo @@ -873,6 +1044,28 @@ nodemap.post_stream_cleanup(repo) +def applybundlev3(repo, fp, requirements): + from . import localrepo + + missingreqs = [r for r in requirements if r not in repo.supported] + if missingreqs: + msg = _(b'unable to apply stream clone: unsupported format: %s') + msg %= b', '.join(sorted(missingreqs)) + raise error.Abort(msg) + + consumev3(repo, fp) + + repo.requirements = new_stream_clone_requirements( + repo.requirements, + requirements, + ) + repo.svfs.options = localrepo.resolvestorevfsoptions( + repo.ui, repo.requirements, repo.features + ) + scmutil.writereporequirements(repo) + nodemap.post_stream_cleanup(repo) + + def _copy_files(src_vfs_map, dst_vfs_map, entries, progress): hardlink = [True] diff -r 0bba730005df -r 0452af304808 tests/test-clone-stream-revlog-split.t --- a/tests/test-clone-stream-revlog-split.t Thu Jun 01 18:20:28 2023 +0100 +++ b/tests/test-clone-stream-revlog-split.t Thu Jun 01 17:39:22 2023 +0100 @@ -100,10 +100,13 @@ sending getbundle command bundle2-input-bundle: with-transaction bundle2-input-part: "stream2" (params: 3 mandatory) supported (stream-bundle2-v2 !) - bundle2-input-part: "stream3-exp" (params: 3 mandatory) supported (stream-bundle2-v3 !) + bundle2-input-part: "stream3-exp" (params: 1 mandatory) supported (stream-bundle2-v3 !) applying stream bundle - 7 files to transfer, 2.11 KB of data - adding [s] data/some-file.i (1.23 KB) + 7 files to transfer, 2.11 KB of data (stream-bundle2-v2 !) + adding [s] data/some-file.i (1.23 KB) (stream-bundle2-v2 !) + 7 entries to transfer (stream-bundle2-v3 !) + adding [s] data/some-file.d (1.04 KB) (stream-bundle2-v3 !) + adding [s] data/some-file.i (192 bytes) (stream-bundle2-v3 !) 
adding [s] phaseroots (43 bytes) adding [s] 00manifest.i (348 bytes) adding [s] 00changelog.i (381 bytes) @@ -112,7 +115,8 @@ adding [c] rbc-revs-v1 (24 bytes) updating the branch cache transferred 2.11 KB in * seconds (* */sec) (glob) - bundle2-input-part: total payload size 2268 + bundle2-input-part: total payload size 2268 (stream-bundle2-v2 !) + bundle2-input-part: total payload size 2296 (stream-bundle2-v3 !) bundle2-input-part: "listkeys" (params: 1 mandatory) supported bundle2-input-bundle: 2 parts total checking for updated bookmarks diff -r 0bba730005df -r 0452af304808 tests/test-clone-stream.t --- a/tests/test-clone-stream.t Thu Jun 01 18:20:28 2023 +0100 +++ b/tests/test-clone-stream.t Thu Jun 01 17:39:22 2023 +0100 @@ -360,9 +360,8 @@ #if stream-bundle2-v3 $ hg clone --stream -U http://localhost:$HGPORT clone1 streaming all changes - 1093 files to transfer, 102 KB of data (no-zstd !) + 1093 entries to transfer transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !) - 1093 files to transfer, 98.9 KB of data (zstd !) transferred 98.9 KB in * seconds (* */sec) (glob) (zstd !) $ ls -1 clone1/.hg/cache @@ -492,9 +491,8 @@ #if stream-bundle2-v3 $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed streaming all changes - 1093 files to transfer, 102 KB of data (no-zstd !) + 1093 entries to transfer transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !) - 1093 files to transfer, 98.9 KB of data (zstd !) transferred 98.9 KB in * seconds (* */sec) (glob) (zstd !) #endif @@ -564,18 +562,17 @@ streaming all changes sending getbundle command bundle2-input-bundle: with-transaction - bundle2-input-part: "stream3-exp" (params: 3 mandatory) supported + bundle2-input-part: "stream3-exp" (params: 1 mandatory) supported applying stream bundle - 1093 files to transfer, 102 KB of data (no-zstd !) - 1093 files to transfer, 98.9 KB of data (zstd !) 
+ 1093 entries to transfer starting 4 threads for background file closing starting 4 threads for background file closing updating the branch cache transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !) - bundle2-input-part: total payload size 118984 (no-zstd !) + bundle2-input-part: total payload size 120079 (no-zstd !) transferred 98.9 KB in * seconds (* */sec) (glob) (zstd !) - bundle2-input-part: total payload size 116145 (zstd no-bigendian !) - bundle2-input-part: total payload size 116140 (zstd bigendian !) + bundle2-input-part: total payload size 117240 (zstd no-bigendian !) + bundle2-input-part: total payload size 116138 (zstd bigendian !) bundle2-input-part: "listkeys" (params: 1 mandatory) supported bundle2-input-bundle: 2 parts total checking for updated bookmarks @@ -625,9 +622,8 @@ #if stream-bundle2-v3 $ hg clone --stream -U http://localhost:$HGPORT secret-allowed streaming all changes - 1093 files to transfer, 102 KB of data (no-zstd !) + 1093 entries to transfer transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !) - 1093 files to transfer, 98.9 KB of data (zstd !) transferred 98.9 KB in * seconds (* */sec) (glob) (zstd !) #endif @@ -755,9 +751,8 @@ #if stream-bundle2-v3 $ hg clone --stream http://localhost:$HGPORT with-bookmarks streaming all changes - 1096 files to transfer, 102 KB of data (no-zstd !) + 1096 entries to transfer transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !) - 1096 files to transfer, 99.1 KB of data (zstd !) transferred 99.1 KB in * seconds (* */sec) (glob) (zstd !) updating to branch default 1088 files updated, 0 files merged, 0 files removed, 0 files unresolved @@ -801,9 +796,8 @@ #if stream-bundle2-v3 $ hg clone --stream http://localhost:$HGPORT phase-publish streaming all changes - 1096 files to transfer, 102 KB of data (no-zstd !) + 1096 entries to transfer transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !) - 1096 files to transfer, 99.1 KB of data (zstd !) 
transferred 99.1 KB in * seconds (* */sec) (glob) (zstd !) updating to branch default 1088 files updated, 0 files merged, 0 files removed, 0 files unresolved @@ -861,9 +855,8 @@ #if stream-bundle2-v3 $ hg clone --stream http://localhost:$HGPORT phase-no-publish streaming all changes - 1097 files to transfer, 102 KB of data (no-zstd !) + 1097 entries to transfer transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !) - 1097 files to transfer, 99.1 KB of data (zstd !) transferred 99.1 KB in * seconds (* */sec) (glob) (zstd !) updating to branch default 1088 files updated, 0 files merged, 0 files removed, 0 files unresolved @@ -961,9 +954,8 @@ $ hg clone -U --stream http://localhost:$HGPORT with-obsolescence streaming all changes - 1098 files to transfer, 102 KB of data (no-zstd !) + 1098 entries to transfer transferred 102 KB in * seconds (* */sec) (glob) (no-zstd !) - 1098 files to transfer, 99.5 KB of data (zstd !) transferred 99.5 KB in * seconds (* */sec) (glob) (zstd !) $ hg -R with-obsolescence log -T '{rev}: {phase}\n' 2: draft diff -r 0bba730005df -r 0452af304808 tests/test-stream-bundle-v2.t --- a/tests/test-stream-bundle-v2.t Thu Jun 01 18:20:28 2023 +0100 +++ b/tests/test-stream-bundle-v2.t Thu Jun 01 17:39:22 2023 +0100 @@ -63,9 +63,9 @@ stream2 -- {bytecount: 1693, filecount: 11, requirements: generaldelta%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v2 no-zstd !) stream2 -- {bytecount: 1693, filecount: 11, requirements: generaldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v2 zstd no-rust !) stream2 -- {bytecount: 1693, filecount: 11, requirements: generaldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v2 rust !) - stream3-exp -- {bytecount: 1693, filecount: 11, requirements: generaldelta%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v3 no-zstd !) 
- stream3-exp -- {bytecount: 1693, filecount: 11, requirements: generaldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v3 zstd no-rust !) - stream3-exp -- {bytecount: 1693, filecount: 11, requirements: generaldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v3 rust !) + stream3-exp -- {requirements: generaldelta%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v3 no-zstd !) + stream3-exp -- {requirements: generaldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v3 zstd no-rust !) + stream3-exp -- {requirements: generaldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog} (mandatory: True) (stream-v3 rust !) $ hg debugbundle --spec bundle.hg none-v2;stream=v2;requirements%3Dgeneraldelta%2Crevlogv1%2Csparserevlog (stream-v2 no-zstd !) none-v2;stream=v2;requirements%3Dgeneraldelta%2Crevlog-compression-zstd%2Crevlogv1%2Csparserevlog (stream-v2 zstd no-rust !) @@ -94,9 +94,10 @@ applying clone bundle from http://localhost:$HGPORT1/bundle.hg bundle2-input-bundle: with-transaction bundle2-input-part: "stream2" (params: 3 mandatory) supported (stream-v2 !) - bundle2-input-part: "stream3-exp" (params: 3 mandatory) supported (stream-v3 !) + bundle2-input-part: "stream3-exp" (params: 1 mandatory) supported (stream-v3 !) applying stream bundle - 11 files to transfer, 1.65 KB of data + 11 files to transfer, 1.65 KB of data (stream-v2 !) + 11 entries to transfer (stream-v3 !) starting 4 threads for background file closing (?) starting 4 threads for background file closing (?) adding [s] data/A.i (66 bytes) @@ -111,7 +112,8 @@ adding [c] rbc-names-v1 (7 bytes) adding [c] rbc-revs-v1 (40 bytes) transferred 1.65 KB in * seconds (* */sec) (glob) - bundle2-input-part: total payload size 1840 + bundle2-input-part: total payload size 1840 (stream-v2 !) + bundle2-input-part: total payload size 1852 (stream-v3 !) 
bundle2-input-bundle: 1 parts total updating the branch cache finished applying clone bundle @@ -152,9 +154,10 @@ applying clone bundle from http://localhost:$HGPORT1/bundle.hg bundle2-input-bundle: with-transaction bundle2-input-part: "stream2" (params: 3 mandatory) supported (stream-v2 !) - bundle2-input-part: "stream3-exp" (params: 3 mandatory) supported (stream-v3 !) + bundle2-input-part: "stream3-exp" (params: 1 mandatory) supported (stream-v3 !) applying stream bundle - 11 files to transfer, 1.65 KB of data + 11 files to transfer, 1.65 KB of data (stream-v2 !) + 11 entries to transfer (stream-v3 !) starting 4 threads for background file closing (?) starting 4 threads for background file closing (?) adding [s] data/A.i (66 bytes) @@ -169,7 +172,8 @@ adding [c] rbc-names-v1 (7 bytes) adding [c] rbc-revs-v1 (40 bytes) transferred 1.65 KB in * seconds (* */sec) (glob) - bundle2-input-part: total payload size 1840 + bundle2-input-part: total payload size 1840 (stream-v2 !) + bundle2-input-part: total payload size 1852 (stream-v3 !) bundle2-input-bundle: 1 parts total updating the branch cache finished applying clone bundle