Mercurial > hg-stable
changeset 26755:bb0b955d050d
streamclone: support for producing and consuming stream clone bundles
Up to this point, stream clones only existed as a dynamically generated
data format produced and consumed during streaming clones. In order to
support this efficient cloning format with the clone bundles feature, we
need a more formal, on-disk representation of the streaming clone data.
This patch introduces a new "bundle" type for stream clones. Unlike
existing bundles, it does not contain changegroup data. It does,
however, share the same concepts, such as the 4-byte header identifying
the type of data that follows and the 2-byte abbreviation for the
compression type (of which only "UN" is currently supported).
The new bundle format is essentially the existing stream clone version 1
data format with some headers at the beginning.
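The framing described above can be sketched as follows. This is an illustrative stand-alone helper, not Mercurial API: the `packheader` name and argument shapes are assumptions, but the byte layout (4-byte "HGS1" magic, 2-byte compression, two big-endian u64 counts, u16-prefixed requirements string with trailing NUL) follows the format documented in this patch.

```python
import struct

# Illustrative sketch of the stream clone bundle v1 header layout.
# packheader() and its signature are hypothetical; only the byte layout
# mirrors the format described in the commit message.
def packheader(filecount, bytecount, requirements, compression=b'UN'):
    # Comma-delimited, sorted requirements string with a trailing NUL,
    # length-prefixed by a 16-bit big-endian unsigned short.
    requires = b','.join(sorted(requirements)) + b'\0'
    return (b'HGS1'                                   # 4-byte bundle type
            + compression                             # 2-byte compression id
            + struct.pack('>QQ', filecount, bytecount)  # big-endian u64 pair
            + struct.pack('>H', len(requires))          # length incl. NUL
            + requires)

header = packheader(3, 4096, {b'revlogv1', b'store'})
```

The stream clone v1 data itself would follow this header byte-for-byte.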
During a streaming clone, content negotiation at request time checks
repository format/requirements compatibility before the clone is
initiated. Active content negotiation isn't possible with clone bundles,
so we put this set of requirements inside the payload, giving consumers
a built-in mechanism for checking compatibility before reading and
applying lots of data. Of course, we will also advertise this
requirements set in clone bundles, but that's for another patch.
We currently don't have a mechanism to produce and consume this new
bundle format. This will be implemented in upcoming patches.
It's worth noting that if a legacy client attempts to `hg unbundle` a
stream clone bundle (with the "HGS1" header), it will abort with
"unknown bundle version S1", which seems appropriate.
author    Gregory Szorc <gregory.szorc@gmail.com>
date      Sat, 17 Oct 2015 11:14:52 -0700
parents   e7e1528cf200
children  9e272a96f764
files     mercurial/streamclone.py
diffstat  1 files changed, 100 insertions(+), 0 deletions(-)
--- a/mercurial/streamclone.py	Sat Oct 17 15:28:02 2015 -0500
+++ b/mercurial/streamclone.py	Sat Oct 17 11:14:52 2015 -0700
@@ -7,6 +7,7 @@
 from __future__ import absolute_import
 
+import struct
 import time
 
 from .i18n import _
@@ -236,6 +237,61 @@
         for chunk in it:
             yield chunk
 
+def generatebundlev1(repo, compression='UN'):
+    """Emit content for version 1 of a stream clone bundle.
+
+    The first 4 bytes of the output ("HGS1") denote this as stream clone
+    bundle version 1.
+
+    The next 2 bytes indicate the compression type. Only "UN" is currently
+    supported.
+
+    The next 16 bytes are two 64-bit big endian unsigned integers indicating
+    file count and byte count, respectively.
+
+    The next 2 bytes is a 16-bit big endian unsigned short declaring the length
+    of the requirements string, including a trailing \0. The following N bytes
+    are the requirements string, which is ASCII containing a comma-delimited
+    list of repo requirements that are needed to support the data.
+
+    The remaining content is the output of ``generatev1()`` (which may be
+    compressed in the future).
+
+    Returns a tuple of (requirements, data generator).
+    """
+    if compression != 'UN':
+        raise ValueError('we do not support the compression argument yet')
+
+    requirements = repo.requirements & repo.supportedformats
+    requires = ','.join(sorted(requirements))
+
+    def gen():
+        yield 'HGS1'
+        yield compression
+
+        filecount, bytecount, it = generatev1(repo)
+        repo.ui.status(_('writing %d bytes for %d files\n') %
+                       (bytecount, filecount))
+
+        yield struct.pack('>QQ', filecount, bytecount)
+        yield struct.pack('>H', len(requires) + 1)
+        yield requires + '\0'
+
+        # This is where we'll add compression in the future.
+        assert compression == 'UN'
+
+        seen = 0
+        repo.ui.progress(_('bundle'), 0, total=bytecount)
+
+        for chunk in it:
+            seen += len(chunk)
+            repo.ui.progress(_('bundle'), seen, total=bytecount)
+            yield chunk
+
+        repo.ui.progress(_('bundle'), None)
+
+    return requirements, gen()
+
 def consumev1(repo, fp, filecount, bytecount):
     """Apply the contents from version 1 of a streaming clone file handle.
@@ -290,3 +346,47 @@
                        util.bytecount(bytecount / elapsed)))
     finally:
         lock.release()
+
+def applybundlev1(repo, fp):
+    """Apply the content from a stream clone bundle version 1.
+
+    We assume the 4 byte header has been read and validated and the file handle
+    is at the 2 byte compression identifier.
+    """
+    if len(repo):
+        raise error.Abort(_('cannot apply stream clone bundle on non-empty '
+                            'repo'))
+
+    compression = fp.read(2)
+    if compression != 'UN':
+        raise error.Abort(_('only uncompressed stream clone bundles are '
+                            'supported; got %s') % compression)
+
+    filecount, bytecount = struct.unpack('>QQ', fp.read(16))
+    requireslen = struct.unpack('>H', fp.read(2))[0]
+    requires = fp.read(requireslen)
+
+    if not requires.endswith('\0'):
+        raise error.Abort(_('malformed stream clone bundle: '
+                            'requirements not properly encoded'))
+
+    requirements = set(requires.rstrip('\0').split(','))
+    missingreqs = requirements - repo.supportedformats
+    if missingreqs:
+        raise error.Abort(_('unable to apply stream clone: '
+                            'unsupported format: %s') %
+                          ', '.join(sorted(missingreqs)))
+
+    consumev1(repo, fp, filecount, bytecount)
+
+class streamcloneapplier(object):
+    """Class to manage applying streaming clone bundles.
+
+    We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
+    readers to perform bundle type-specific functionality.
+    """
+    def __init__(self, fh):
+        self._fh = fh
+
+    def apply(self, repo):
+        return applybundlev1(repo, self._fh)
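As a stand-alone illustration of the producer/consumer symmetry in this patch, the sketch below frames a tiny payload the way `generatebundlev1()` does and then parses it back the way `applybundlev1()` does. It uses an in-memory buffer and a fake payload rather than real Mercurial objects; everything here is illustrative.

```python
import io
import struct

# Round-trip sketch (not Mercurial code): frame a fake payload using the
# stream clone bundle v1 layout, then parse it back.
payload = b'fake-streamv1-data'
requires = b'revlogv1,store\0'

buf = io.BytesIO()
buf.write(b'HGS1')                              # 4-byte bundle type
buf.write(b'UN')                                # compression identifier
buf.write(struct.pack('>QQ', 1, len(payload)))  # file count, byte count
buf.write(struct.pack('>H', len(requires)))     # requirements length
buf.write(requires)
buf.write(payload)

# Consumer side: mimic applybundlev1(), which assumes the 4-byte header
# has already been read and validated.
buf.seek(4)
assert buf.read(2) == b'UN'
filecount, bytecount = struct.unpack('>QQ', buf.read(16))
reqs = buf.read(struct.unpack('>H', buf.read(2))[0])
assert reqs.endswith(b'\0')
data = buf.read(bytecount)
```

In the real patch, the bytes after the requirements string are the output of `generatev1()` and are fed to `consumev1()` rather than read into memory wholesale.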