--- a/mercurial/streamclone.py Sat Oct 17 15:28:02 2015 -0500
+++ b/mercurial/streamclone.py Sat Oct 17 11:14:52 2015 -0700
@@ -7,6 +7,7 @@
from __future__ import absolute_import
+import struct
import time
from .i18n import _
@@ -236,6 +237,61 @@
for chunk in it:
yield chunk
+def generatebundlev1(repo, compression='UN'):
+ """Emit content for version 1 of a stream clone bundle.
+
+ The first 4 bytes of the output ("HGS1") denote this as stream clone
+ bundle version 1.
+
+ The next 2 bytes indicate the compression type. Only "UN" is currently
+ supported.
+
+ The next 16 bytes are two 64-bit big endian unsigned integers indicating
+ file count and byte count, respectively.
+
+ The next 2 bytes is a 16-bit big endian unsigned short declaring the length
+ of the requirements string, including a trailing \0. The following N bytes
+ are the requirements string, which is ASCII containing a comma-delimited
+ list of repo requirements that are needed to support the data.
+
+ The remaining content is the output of ``generatev1()`` (which may be
+ compressed in the future).
+
+ Returns a tuple of (requirements, data generator).
+ """
+ if compression != 'UN':
+ raise ValueError('we do not support the compression argument yet')
+
+ requirements = repo.requirements & repo.supportedformats
+ requires = ','.join(sorted(requirements))
+
+ def gen():
+ yield 'HGS1'
+ yield compression
+
+ filecount, bytecount, it = generatev1(repo)
+ repo.ui.status(_('writing %d bytes for %d files\n') %
+ (bytecount, filecount))
+
+ yield struct.pack('>QQ', filecount, bytecount)
+ yield struct.pack('>H', len(requires) + 1)
+ yield requires + '\0'
+
+ # This is where we'll add compression in the future.
+ assert compression == 'UN'
+
+ seen = 0
+ repo.ui.progress(_('bundle'), 0, total=bytecount)
+
+ for chunk in it:
+ seen += len(chunk)
+ repo.ui.progress(_('bundle'), seen, total=bytecount)
+ yield chunk
+
+ repo.ui.progress(_('bundle'), None)
+
+ return requirements, gen()
+
def consumev1(repo, fp, filecount, bytecount):
"""Apply the contents from version 1 of a streaming clone file handle.
@@ -290,3 +346,47 @@
util.bytecount(bytecount / elapsed)))
finally:
lock.release()
+
+def applybundlev1(repo, fp):
+ """Apply the content from a stream clone bundle version 1.
+
+ We assume the 4 byte header has been read and validated and the file handle
+ is at the 2 byte compression identifier.
+ """
+ if len(repo):
+ raise error.Abort(_('cannot apply stream clone bundle on non-empty '
+ 'repo'))
+
+ compression = fp.read(2)
+ if compression != 'UN':
+ raise error.Abort(_('only uncompressed stream clone bundles are '
+ 'supported; got %s') % compression)
+
+ filecount, bytecount = struct.unpack('>QQ', fp.read(16))
+ requireslen = struct.unpack('>H', fp.read(2))[0]
+ requires = fp.read(requireslen)
+
+ if not requires.endswith('\0'):
+ raise error.Abort(_('malformed stream clone bundle: '
+ 'requirements not properly encoded'))
+
+ requirements = set(requires.rstrip('\0').split(','))
+ missingreqs = requirements - repo.supportedformats
+ if missingreqs:
+ raise error.Abort(_('unable to apply stream clone: '
+ 'unsupported format: %s') %
+ ', '.join(sorted(missingreqs)))
+
+ consumev1(repo, fp, filecount, bytecount)
+
+class streamcloneapplier(object):
+ """Class to manage applying streaming clone bundles.
+
+ We need to wrap ``applybundlev1()`` in a dedicated type to enable bundle
+ readers to perform bundle type-specific functionality.
+ """
+ def __init__(self, fh):
+ self._fh = fh
+
+ def apply(self, repo):
+ return applybundlev1(repo, self._fh)