Mercurial > hg
changeset 27883:4f4b80b3f2bf
exchange: implement function for inferring bundle specification
We don't currently have a mechanism for inferring bundle spec strings
from bundle files. This patch adds one.
This will eventually be used to make the producing of clone bundles
manifests easier.
author | Gregory Szorc <gregory.szorc@gmail.com> |
---|---|
date | Thu, 14 Jan 2016 22:49:03 -0800 |
parents | 319b0bf6ecc9 |
children | acfe40eb8cb5 |
files | mercurial/exchange.py |
diffstat | 1 files changed, 53 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- a/mercurial/exchange.py Thu Jan 14 22:48:54 2016 -0800 +++ b/mercurial/exchange.py Thu Jan 14 22:49:03 2016 -0800 @@ -187,6 +187,59 @@ else: raise error.Abort(_('%s: unknown bundle version %s') % (fname, version)) +def getbundlespec(ui, fh): + """Infer the bundlespec from a bundle file handle. + + The input file handle is seeked and the original seek position is not + restored. + """ + def speccompression(alg): + for k, v in _bundlespeccompressions.items(): + if v == alg: + return k + return None + + b = readbundle(ui, fh, None) + if isinstance(b, changegroup.cg1unpacker): + alg = b._type + if alg == '_truncatedBZ': + alg = 'BZ' + comp = speccompression(alg) + if not comp: + raise error.Abort(_('unknown compression algorithm: %s') % alg) + return '%s-v1' % comp + elif isinstance(b, bundle2.unbundle20): + if 'Compression' in b.params: + comp = speccompression(b.params['Compression']) + if not comp: + raise error.Abort(_('unknown compression algorithm: %s') % comp) + else: + comp = 'none' + + version = None + for part in b.iterparts(): + if part.type == 'changegroup': + version = part.params['version'] + if version in ('01', '02'): + version = 'v2' + else: + raise error.Abort(_('changegroup version %s does not have ' + 'a known bundlespec') % version, + hint=_('try upgrading your Mercurial ' + 'client')) + + if not version: + raise error.Abort(_('could not identify changegroup version in ' + 'bundle')) + + return '%s-%s' % (comp, version) + elif isinstance(b, streamclone.streamcloneapplier): + requirements = streamclone.readbundle1header(fh)[2] + params = 'requirements=%s' % ','.join(sorted(requirements)) + return 'none-packed1;%s' % urllib.quote(params) + else: + raise error.Abort(_('unknown bundle type: %s') % b) + def buildobsmarkerspart(bundler, markers): """add an obsmarker part to the bundler with <markers>