Mercurial > hg
changeset 35756:cfdccd560b66
streamclone: define first iteration of version 2 of stream format
(This patch is based on a first draft from Gregory Szorc, with deeper rework)
Version 1 of the stream clone format was invented many years ago and suffers
from a few deficiencies:
1) Filenames are stored in store-encoded (on filesystem) form rather than in
their internal form. This makes future compatibility with new store
filename encodings more difficult.
2) File entry "headers" consist of a newline of the file name followed by the
string file size. Converting strings to integers is avoidable overhead. We
can't store filenames with newlines (manifests have this limitation as
well, so it isn't a major concern). But the big concern here is the
necessity for readline(). Scanning for newlines means reading ahead and
that means extra buffer allocations and slicing (in Python) and this makes
performance suffer.
3) Filenames aren't compressed optimally. Filenames should be compressed well
since there is a lot of repeated data. However, since they are scattered
all over the stream (with revlog data in between), they typically fall
outside the window size of the compressor and don't compress.
4) It can only exchange stored based content, being able to exchange caches
too would be nice.
5) It is limited to a stream-based protocol and isn't suitable for an on-disk
format for general repository reading because the offset of individual file
entries requires scanning the entire file to find file records.
As part of enabling streaming clones to work in bundle2, #2 proved to have a
significant negative impact on performance. Since bundle2 provides the
opportunity to start fresh, Gregory Szorc figured he would take the
opportunity to invent a new streaming clone data format.
The new format devised in this series addresses #1, #2, and #4. It punts on #3
because it was complex without yielding a significant gain and on #5 because
devising a new store format that "packs" multiple revlogs into a single
"packed revlog" is massive scope bloat. However, this v2 format might be
suitable for streaming into a "packed revlog" with minimal processing. If it
works, great. If not, we can always invent stream format when it is needed.
This patch only introduces the bases of the format. We'll get it usable through
bundle2 first, then we'll extend the format in future patches to bring it to its
full potential (especially #4).
author | Boris Feld <boris.feld@octobus.net> |
---|---|
date | Thu, 18 Jan 2018 00:48:56 +0100 |
parents | 2384523cee4d |
children | bbf7abd09ff0 |
files | mercurial/streamclone.py |
diffstat | 1 files changed, 112 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/streamclone.py Fri Jan 19 22:52:35 2018 +0100 +++ b/mercurial/streamclone.py Thu Jan 18 00:48:56 2018 +0100 @@ -428,3 +428,115 @@ def apply(self, repo): return applybundlev1(repo, self._fh) + +def _emit(repo, entries, totalfilesize): + """actually emit the stream bundle""" + progress = repo.ui.progress + progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes')) + vfs = repo.svfs + try: + seen = 0 + for name, size in entries: + yield util.uvarintencode(len(name)) + fp = vfs(name) + try: + yield util.uvarintencode(size) + yield name + if size <= 65536: + chunks = (fp.read(size),) + else: + chunks = util.filechunkiter(fp, limit=size) + for chunk in chunks: + seen += len(chunk) + progress(_('bundle'), seen, total=totalfilesize, + unit=_('bytes')) + yield chunk + finally: + fp.close() + finally: + progress(_('bundle'), None) + +def generatev2(repo): + """Emit content for version 2 of a streaming clone. + + the data stream consists the following entries: + 1) A varint containing the length of the filename + 2) A varint containing the length of file data + 3) N bytes containing the filename (the internal, store-agnostic form) + 4) N bytes containing the file data + + Returns a 3-tuple of (file count, file size, data iterator). + """ + + with repo.lock(): + + entries = [] + totalfilesize = 0 + + repo.ui.debug('scanning\n') + for name, ename, size in _walkstreamfiles(repo): + if size: + entries.append((name, size)) + totalfilesize += size + + chunks = _emit(repo, entries, totalfilesize) + + return len(entries), totalfilesize, chunks + +def consumev2(repo, fp, filecount, filesize): + """Apply the contents from a version 2 streaming clone. + + Data is read from an object that only needs to provide a ``read(size)`` + method. + """ + with repo.lock(): + repo.ui.status(_('%d files to transfer, %s of data\n') % + (filecount, util.bytecount(filesize))) + + start = util.timer() + handledbytes = 0 + progress = repo.ui.progress + + progress(_('clone'), handledbytes, total=filesize, unit=_('bytes')) + + vfs = repo.svfs + + with repo.transaction('clone'): + with vfs.backgroundclosing(repo.ui): + for i in range(filecount): + namelen = util.uvarintdecodestream(fp) + datalen = util.uvarintdecodestream(fp) + + name = fp.read(namelen) + + if repo.ui.debugflag: + repo.ui.debug('adding %s (%s)\n' % + (name, util.bytecount(datalen))) + + with vfs(name, 'w') as ofp: + for chunk in util.filechunkiter(fp, limit=datalen): + handledbytes += len(chunk) + progress(_('clone'), handledbytes, total=filesize, + unit=_('bytes')) + ofp.write(chunk) + + # force @filecache properties to be reloaded from + # streamclone-ed file at next access + repo.invalidate(clearfilecache=True) + + elapsed = util.timer() - start + if elapsed <= 0: + elapsed = 0.001 + progress(_('clone'), None) + repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') % + (util.bytecount(handledbytes), elapsed, + util.bytecount(handledbytes / elapsed))) + +def applybundlev2(repo, fp, filecount, filesize, requirements): + missingreqs = [r for r in requirements if r not in repo.supported] + if missingreqs: + raise error.Abort(_('unable to apply stream clone: ' + 'unsupported format: %s') % + ', '.join(sorted(missingreqs))) + + consumev2(repo, fp, filecount, filesize)