--- a/mercurial/bundle2.py Sat Oct 05 10:29:34 2019 -0400
+++ b/mercurial/bundle2.py Sun Oct 06 09:45:02 2019 -0400
@@ -171,9 +171,7 @@
url,
util,
)
-from .utils import (
- stringutil,
-)
+from .utils import stringutil
urlerr = util.urlerr
urlreq = util.urlreq
@@ -192,31 +190,37 @@
_parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
+
def outdebug(ui, message):
"""debug regarding output stream (bundling)"""
if ui.configbool('devel', 'bundle2.debug'):
ui.debug('bundle2-output: %s\n' % message)
+
def indebug(ui, message):
"""debug on input stream (unbundling)"""
if ui.configbool('devel', 'bundle2.debug'):
ui.debug('bundle2-input: %s\n' % message)
+
def validateparttype(parttype):
"""raise ValueError if a parttype contains invalid character"""
if _parttypeforbidden.search(parttype):
raise ValueError(parttype)
+
def _makefpartparamsizes(nbparams):
"""return a struct format to read part parameter sizes
The number parameters is variable so we need to build that format
dynamically.
"""
- return '>'+('BB'*nbparams)
+ return '>' + ('BB' * nbparams)
+
parthandlermapping = {}
+
def parthandler(parttype, params=()):
"""decorator that register a function as a bundle2 part handler
@@ -228,14 +232,17 @@
...
"""
validateparttype(parttype)
+
def _decorator(func):
- lparttype = parttype.lower() # enforce lower case matching.
+ lparttype = parttype.lower() # enforce lower case matching.
assert lparttype not in parthandlermapping
parthandlermapping[lparttype] = func
func.params = frozenset(params)
return func
+
return _decorator
+
class unbundlerecords(object):
"""keep record of what happens during and unbundle
@@ -283,6 +290,7 @@
__bool__ = __nonzero__
+
class bundleoperation(object):
"""an object that represents a single bundling process
@@ -328,13 +336,17 @@
def addhookargs(self, hookargs):
if self.hookargs is None:
- raise error.ProgrammingError('attempted to add hookargs to '
- 'operation after transaction started')
+ raise error.ProgrammingError(
+ 'attempted to add hookargs to '
+ 'operation after transaction started'
+ )
self.hookargs.update(hookargs)
+
class TransactionUnavailable(RuntimeError):
pass
+
def _notransaction():
"""default method to get a transaction while processing a bundle
@@ -342,6 +354,7 @@
to be created"""
raise TransactionUnavailable()
+
def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
# transform me into unbundler.apply() as soon as the freeze is lifted
if isinstance(unbundler, unbundle20):
@@ -357,6 +370,7 @@
_processchangegroup(op, unbundler, tr, source, url, **kwargs)
return op
+
class partiterator(object):
def __init__(self, repo, op, unbundler):
self.repo = repo
@@ -375,6 +389,7 @@
yield p
p.consume()
self.current = None
+
self.iterator = func()
return self.iterator
@@ -422,8 +437,10 @@
if seekerror:
raise exc
- self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
- self.count)
+ self.repo.ui.debug(
+ 'bundle2-input-bundle: %i parts total\n' % self.count
+ )
+
def processbundle(repo, unbundler, transactiongetter=None, op=None, source=''):
"""This function process a bundle, apply effect to/from a repo
@@ -461,20 +478,21 @@
return op
+
def processparts(repo, op, unbundler):
with partiterator(repo, op, unbundler) as parts:
for part in parts:
_processpart(op, part)
+
def _processchangegroup(op, cg, tr, source, url, **kwargs):
ret = cg.apply(op.repo, tr, source, url, **kwargs)
- op.records.add('changegroup', {
- 'return': ret,
- })
+ op.records.add('changegroup', {'return': ret,})
return ret
+
def _gethandler(op, part):
- status = 'unknown' # used by debug output
+ status = 'unknown' # used by debug output
try:
handler = parthandlermapping.get(part.type)
if handler is None:
@@ -486,14 +504,15 @@
unknownparams = list(unknownparams)
unknownparams.sort()
status = 'unsupported-params (%s)' % ', '.join(unknownparams)
- raise error.BundleUnknownFeatureError(parttype=part.type,
- params=unknownparams)
+ raise error.BundleUnknownFeatureError(
+ parttype=part.type, params=unknownparams
+ )
status = 'supported'
except error.BundleUnknownFeatureError as exc:
- if part.mandatory: # mandatory parts
+ if part.mandatory: # mandatory parts
raise
indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
- return # skip to part processing
+ return # skip to part processing
finally:
if op.ui.debugflag:
msg = ['bundle2-input-part: "%s"' % part.type]
@@ -513,6 +532,7 @@
return handler
+
def _processpart(op, part):
"""process a single part from a bundle
@@ -536,10 +556,11 @@
if output is not None:
output = op.ui.popbuffer()
if output:
- outpart = op.reply.newpart('output', data=output,
- mandatory=False)
+ outpart = op.reply.newpart('output', data=output, mandatory=False)
outpart.addparam(
- 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
+ 'in-reply-to', pycompat.bytestr(part.id), mandatory=False
+ )
+
def decodecaps(blob):
"""decode a bundle2 caps bytes blob into a dictionary
@@ -564,6 +585,7 @@
caps[key] = vals
return caps
+
def encodecaps(caps):
"""encode a bundle2 caps dictionary into a bytes blob"""
chunks = []
@@ -576,11 +598,12 @@
chunks.append(ca)
return '\n'.join(chunks)
+
bundletypes = {
- "": ("", 'UN'), # only when using unbundle on ssh and old http servers
- # since the unification ssh accepts a header but there
- # is no capability signaling it.
- "HG20": (), # special-cased below
+ "": ("", 'UN'), # only when using unbundle on ssh and old http servers
+ # since the unification ssh accepts a header but there
+ # is no capability signaling it.
+ "HG20": (), # special-cased below
"HG10UN": ("HG10UN", 'UN'),
"HG10BZ": ("HG10", 'BZ'),
"HG10GZ": ("HG10GZ", 'GZ'),
@@ -589,6 +612,7 @@
# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
+
class bundle20(object):
"""represent an outgoing bundle2 container
@@ -630,8 +654,9 @@
if not name:
raise error.ProgrammingError(b'empty parameter name')
if name[0:1] not in pycompat.bytestr(string.ascii_letters):
- raise error.ProgrammingError(b'non letter first character: %s'
- % name)
+ raise error.ProgrammingError(
+ b'non letter first character: %s' % name
+ )
self._params.append((name, value))
def addpart(self, part):
@@ -639,7 +664,7 @@
Parts contains the actual applicative payload."""
assert part.id is None
- part.id = len(self._parts) # very cheap counter
+ part.id = len(self._parts) # very cheap counter
self._parts.append(part)
def newpart(self, typeid, *args, **kwargs):
@@ -670,8 +695,9 @@
yield _pack(_fstreamparamsize, len(param))
if param:
yield param
- for chunk in self._compengine.compressstream(self._getcorechunk(),
- self._compopts):
+ for chunk in self._compengine.compressstream(
+ self._getcorechunk(), self._compopts
+ ):
yield chunk
def _paramchunk(self):
@@ -697,7 +723,6 @@
outdebug(self.ui, 'end of bundle')
yield _pack(_fpartheadersize, 0)
-
def salvageoutput(self):
"""return a list with a copy of all output parts in the bundle
@@ -737,6 +762,7 @@
Do not use it to implement higher-level logic or methods."""
return changegroup.readexactly(self._fp, size)
+
def getunbundler(ui, fp, magicstring=None):
"""return a valid unbundler object for a given magicstring"""
if magicstring is None:
@@ -745,7 +771,8 @@
if magic != 'HG':
ui.debug(
"error: invalid magic: %r (version %r), should be 'HG'\n"
- % (magic, version))
+ % (magic, version)
+ )
raise error.Abort(_('not a Mercurial bundle'))
unbundlerclass = formatmap.get(version)
if unbundlerclass is None:
@@ -754,6 +781,7 @@
indebug(ui, 'start processing of %s stream' % magicstring)
return unbundler
+
class unbundle20(unpackermixin):
"""interpret a bundle2 stream
@@ -776,8 +804,9 @@
params = {}
paramssize = self._unpack(_fstreamparamsize)[0]
if paramssize < 0:
- raise error.BundleValueError('negative bundle param size: %i'
- % paramssize)
+ raise error.BundleValueError(
+ 'negative bundle param size: %i' % paramssize
+ )
if paramssize:
params = self._readexact(paramssize)
params = self._processallparams(params)
@@ -795,7 +824,6 @@
params[p[0]] = p[1]
return params
-
def _processparam(self, name, value):
"""process a parameter, applying its effect if needed
@@ -832,8 +860,9 @@
assert 'params' not in vars(self)
paramssize = self._unpack(_fstreamparamsize)[0]
if paramssize < 0:
- raise error.BundleValueError('negative bundle param size: %i'
- % paramssize)
+ raise error.BundleValueError(
+ 'negative bundle param size: %i' % paramssize
+ )
if paramssize:
params = self._readexact(paramssize)
self._processallparams(params)
@@ -868,7 +897,6 @@
raise error.BundleValueError('negative chunk size: %i')
yield self._readexact(size)
-
def iterparts(self, seekable=False):
"""yield all parts contained in the stream"""
cls = seekableunbundlepart if seekable else unbundlepart
@@ -894,15 +922,16 @@
returns None if empty"""
headersize = self._unpack(_fpartheadersize)[0]
if headersize < 0:
- raise error.BundleValueError('negative part header size: %i'
- % headersize)
+ raise error.BundleValueError(
+ 'negative part header size: %i' % headersize
+ )
indebug(self.ui, 'part header size: %i' % headersize)
if headersize:
return self._readexact(headersize)
return None
def compressed(self):
- self.params # load params
+ self.params # load params
return self._compressed
def close(self):
@@ -910,28 +939,33 @@
if util.safehasattr(self._fp, 'close'):
return self._fp.close()
+
formatmap = {'20': unbundle20}
b2streamparamsmap = {}
+
def b2streamparamhandler(name):
"""register a handler for a stream level parameter"""
+
def decorator(func):
assert name not in formatmap
b2streamparamsmap[name] = func
return func
+
return decorator
+
@b2streamparamhandler('compression')
def processcompression(unbundler, param, value):
"""read compression parameter and install payload decompression"""
if value not in util.compengines.supportedbundletypes:
- raise error.BundleUnknownFeatureError(params=(param,),
- values=(value,))
+ raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
unbundler._compengine = util.compengines.forbundletype(value)
if value is not None:
unbundler._compressed = True
+
class bundlepart(object):
"""A bundle2 part contains application level payload
@@ -948,8 +982,14 @@
Both data and parameters cannot be modified after the generation has begun.
"""
- def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
- data='', mandatory=True):
+ def __init__(
+ self,
+ parttype,
+ mandatoryparams=(),
+ advisoryparams=(),
+ data='',
+ mandatory=True,
+ ):
validateparttype(parttype)
self.id = None
self.type = parttype
@@ -971,8 +1011,13 @@
def __repr__(self):
cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
- return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
- % (cls, id(self), self.id, self.type, self.mandatory))
+ return '<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
+ cls,
+ id(self),
+ self.id,
+ self.type,
+ self.mandatory,
+ )
def copy(self):
"""return a copy of the part
@@ -980,8 +1025,13 @@
The new part have the very same content but no partid assigned yet.
Parts with generated data cannot be copied."""
assert not util.safehasattr(self.data, 'next')
- return self.__class__(self.type, self._mandatoryparams,
- self._advisoryparams, self._data, self.mandatory)
+ return self.__class__(
+ self.type,
+ self._mandatoryparams,
+ self._advisoryparams,
+ self._data,
+ self.mandatory,
+ )
# methods used to defines the part content
@property
@@ -1043,8 +1093,9 @@
msg.append(')')
if not self.data:
msg.append(' empty payload')
- elif (util.safehasattr(self.data, 'next')
- or util.safehasattr(self.data, '__next__')):
+ elif util.safehasattr(self.data, 'next') or util.safehasattr(
+ self.data, '__next__'
+ ):
msg.append(' streamed payload')
else:
msg.append(' %i bytes payload' % len(self.data))
@@ -1058,9 +1109,11 @@
parttype = self.type.lower()
outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
## parttype
- header = [_pack(_fparttypesize, len(parttype)),
- parttype, _pack(_fpartid, self.id),
- ]
+ header = [
+ _pack(_fparttypesize, len(parttype)),
+ parttype,
+ _pack(_fpartid, self.id),
+ ]
## parameters
# count
manpar = self.mandatoryparams
@@ -1087,8 +1140,10 @@
try:
headerchunk = ''.join(header)
except TypeError:
- raise TypeError(r'Found a non-bytes trying to '
- r'build bundle part header: %r' % header)
+ raise TypeError(
+ r'Found a non-bytes trying to '
+ r'build bundle part header: %r' % header
+ )
outdebug(ui, 'header chunk size: %i' % len(headerchunk))
yield _pack(_fpartheadersize, len(headerchunk))
yield headerchunk
@@ -1107,12 +1162,14 @@
except BaseException as exc:
bexc = stringutil.forcebytestr(exc)
# backup exception data for later
- ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
- % bexc)
+ ui.debug(
+ 'bundle2-input-stream-interrupt: encoding exception %s' % bexc
+ )
tb = sys.exc_info()[2]
msg = 'unexpected error: %s' % bexc
- interpart = bundlepart('error:abort', [('message', msg)],
- mandatory=False)
+ interpart = bundlepart(
+ 'error:abort', [('message', msg)], mandatory=False
+ )
interpart.id = 0
yield _pack(_fpayloadsize, -1)
for chunk in interpart.getchunks(ui=ui):
@@ -1132,8 +1189,9 @@
Exists to handle the different methods to provide data to a part."""
# we only support fixed size data now.
# This will be improved in the future.
- if (util.safehasattr(self.data, 'next')
- or util.safehasattr(self.data, '__next__')):
+ if util.safehasattr(self.data, 'next') or util.safehasattr(
+ self.data, '__next__'
+ ):
buff = util.chunkbuffer(self.data)
chunk = buff.read(preferedchunksize)
while chunk:
@@ -1145,6 +1203,7 @@
flaginterrupt = -1
+
class interrupthandler(unpackermixin):
"""read one part and process it with restricted capability
@@ -1163,8 +1222,9 @@
returns None if empty"""
headersize = self._unpack(_fpartheadersize)[0]
if headersize < 0:
- raise error.BundleValueError('negative part header size: %i'
- % headersize)
+ raise error.BundleValueError(
+ 'negative part header size: %i' % headersize
+ )
indebug(self.ui, 'part header size: %i\n' % headersize)
if headersize:
return self._readexact(headersize)
@@ -1172,8 +1232,9 @@
def __call__(self):
- self.ui.debug('bundle2-input-stream-interrupt:'
- ' opening out of band context\n')
+ self.ui.debug(
+ 'bundle2-input-stream-interrupt:' ' opening out of band context\n'
+ )
indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
headerblock = self._readpartheader()
if headerblock is None:
@@ -1190,8 +1251,10 @@
finally:
if not hardabort:
part.consume()
- self.ui.debug('bundle2-input-stream-interrupt:'
- ' closing out of band context\n')
+ self.ui.debug(
+ 'bundle2-input-stream-interrupt:' ' closing out of band context\n'
+ )
+
class interruptoperation(object):
"""A limited operation to be use by part handler during interruption
@@ -1211,6 +1274,7 @@
def gettransaction(self):
raise TransactionUnavailable('no repo access from stream interruption')
+
def decodepayloadchunks(ui, fh):
"""Reads bundle2 part payload data into chunks.
@@ -1235,9 +1299,13 @@
if chunksize >= 0:
s = read(chunksize)
if len(s) < chunksize:
- raise error.Abort(_('stream ended unexpectedly '
- ' (got %d bytes, expected %d)') %
- (len(s), chunksize))
+ raise error.Abort(
+ _(
+ 'stream ended unexpectedly '
+ ' (got %d bytes, expected %d)'
+ )
+ % (len(s), chunksize)
+ )
yield s
elif chunksize == flaginterrupt:
@@ -1246,13 +1314,15 @@
interrupthandler(ui, fh)()
else:
raise error.BundleValueError(
- 'negative payload chunk size: %s' % chunksize)
+ 'negative payload chunk size: %s' % chunksize
+ )
s = read(headersize)
if len(s) < headersize:
- raise error.Abort(_('stream ended unexpectedly '
- ' (got %d bytes, expected %d)') %
- (len(s), chunksize))
+ raise error.Abort(
+ _('stream ended unexpectedly ' ' (got %d bytes, expected %d)')
+ % (len(s), chunksize)
+ )
chunksize = unpack(s)[0]
@@ -1260,13 +1330,15 @@
if dolog:
debug('bundle2-input: payload chunk size: %i\n' % chunksize)
+
class unbundlepart(unpackermixin):
"""a bundle part read from a bundle"""
def __init__(self, ui, header, fp):
super(unbundlepart, self).__init__(fp)
- self._seekable = (util.safehasattr(fp, 'seek') and
- util.safehasattr(fp, 'tell'))
+ self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
+ fp, 'tell'
+ )
self.ui = ui
# unbundle state attr
self._headerdata = header
@@ -1287,7 +1359,7 @@
def _fromheader(self, size):
"""return the next <size> byte from the header"""
offset = self._headeroffset
- data = self._headerdata[offset:(offset + size)]
+ data = self._headerdata[offset : (offset + size)]
self._headeroffset = offset + size
return data
@@ -1302,7 +1374,7 @@
"""internal function to setup all logic related parameters"""
# make it read only to prevent people touching it by mistake.
self.mandatoryparams = tuple(mandatoryparams)
- self.advisoryparams = tuple(advisoryparams)
+ self.advisoryparams = tuple(advisoryparams)
# user friendly UI
self.params = util.sortdict(self.mandatoryparams)
self.params.update(self.advisoryparams)
@@ -1316,7 +1388,7 @@
self.id = self._unpackheader(_fpartid)[0]
indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
# extract mandatory bit from type
- self.mandatory = (self.type != self.type.lower())
+ self.mandatory = self.type != self.type.lower()
self.type = self.type.lower()
## reading parameters
# param count
@@ -1372,11 +1444,13 @@
self._pos += len(data)
if size is None or len(data) < size:
if not self.consumed and self._pos:
- self.ui.debug('bundle2-input-part: total payload size %i\n'
- % self._pos)
+ self.ui.debug(
+ 'bundle2-input-part: total payload size %i\n' % self._pos
+ )
self.consumed = True
return data
+
class seekableunbundlepart(unbundlepart):
"""A bundle2 part in a bundle that is seekable.
@@ -1394,6 +1468,7 @@
to the number of chunks within the payload (which almost certainly
increases in proportion with the size of the part).
"""
+
def __init__(self, ui, header, fp):
# (payload, file) offsets for chunk starts.
self._chunkindex = []
@@ -1407,7 +1482,8 @@
self._chunkindex.append((0, self._tellfp()))
else:
assert chunknum < len(self._chunkindex), (
- 'Unknown chunk %d' % chunknum)
+ 'Unknown chunk %d' % chunknum
+ )
self._seekfp(self._chunkindex[chunknum][1])
pos = self._chunkindex[chunknum][0]
@@ -1495,21 +1571,23 @@
raise
return None
+
# These are only the static capabilities.
# Check the 'getrepocaps' function for the rest.
-capabilities = {'HG20': (),
- 'bookmarks': (),
- 'error': ('abort', 'unsupportedcontent', 'pushraced',
- 'pushkey'),
- 'listkeys': (),
- 'pushkey': (),
- 'digests': tuple(sorted(util.DIGESTS.keys())),
- 'remote-changegroup': ('http', 'https'),
- 'hgtagsfnodes': (),
- 'rev-branch-cache': (),
- 'phases': ('heads',),
- 'stream': ('v2',),
- }
+capabilities = {
+ 'HG20': (),
+ 'bookmarks': (),
+ 'error': ('abort', 'unsupportedcontent', 'pushraced', 'pushkey'),
+ 'listkeys': (),
+ 'pushkey': (),
+ 'digests': tuple(sorted(util.DIGESTS.keys())),
+ 'remote-changegroup': ('http', 'https'),
+ 'hgtagsfnodes': (),
+ 'rev-branch-cache': (),
+ 'phases': ('heads',),
+ 'stream': ('v2',),
+}
+
def getrepocaps(repo, allowpushback=False, role=None):
"""return the bundle2 capabilities for a given repo
@@ -1524,8 +1602,9 @@
raise error.ProgrammingError('role argument must be client or server')
caps = capabilities.copy()
- caps['changegroup'] = tuple(sorted(
- changegroup.supportedincomingversions(repo)))
+ caps['changegroup'] = tuple(
+ sorted(changegroup.supportedincomingversions(repo))
+ )
if obsolete.isenabled(repo, obsolete.exchangeopt):
supportedformat = tuple('V%i' % v for v in obsolete.formats)
caps['obsmarkers'] = supportedformat
@@ -1539,8 +1618,9 @@
# Don't advertise stream clone support in server mode if not configured.
if role == 'server':
- streamsupported = repo.ui.configbool('server', 'uncompressed',
- untrusted=True)
+ streamsupported = repo.ui.configbool(
+ 'server', 'uncompressed', untrusted=True
+ )
featuresupported = repo.ui.configbool('server', 'bundle2.stream')
if not streamsupported or not featuresupported:
@@ -1550,6 +1630,7 @@
return caps
+
def bundle2caps(remote):
"""return the bundle capabilities of a peer as dict"""
raw = remote.capable('bundle2')
@@ -1558,18 +1639,37 @@
capsblob = urlreq.unquote(remote.capable('bundle2'))
return decodecaps(capsblob)
+
def obsmarkersversion(caps):
"""extract the list of supported obsmarkers versions from a bundle2caps dict
"""
obscaps = caps.get('obsmarkers', ())
return [int(c[1:]) for c in obscaps if c.startswith('V')]
-def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
- vfs=None, compression=None, compopts=None):
+
+def writenewbundle(
+ ui,
+ repo,
+ source,
+ filename,
+ bundletype,
+ outgoing,
+ opts,
+ vfs=None,
+ compression=None,
+ compopts=None,
+):
if bundletype.startswith('HG10'):
cg = changegroup.makechangegroup(repo, outgoing, '01', source)
- return writebundle(ui, cg, filename, bundletype, vfs=vfs,
- compression=compression, compopts=compopts)
+ return writebundle(
+ ui,
+ cg,
+ filename,
+ bundletype,
+ vfs=vfs,
+ compression=compression,
+ compopts=compopts,
+ )
elif not bundletype.startswith('HG20'):
raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
@@ -1583,6 +1683,7 @@
return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
+
def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
# We should eventually reconcile this logic with the one behind
# 'exchange.getbundle2partsgenerator'.
@@ -1601,10 +1702,12 @@
part = bundler.newpart('changegroup', data=cg.getchunks())
part.addparam('version', cg.version)
if 'clcount' in cg.extras:
- part.addparam('nbchanges', '%d' % cg.extras['clcount'],
- mandatory=False)
- if opts.get('phases') and repo.revs('%ln and secret()',
- outgoing.missingheads):
+ part.addparam(
+ 'nbchanges', '%d' % cg.extras['clcount'], mandatory=False
+ )
+ if opts.get('phases') and repo.revs(
+ '%ln and secret()', outgoing.missingheads
+ ):
part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
if opts.get('streamv2', False):
@@ -1625,6 +1728,7 @@
phasedata = phases.binaryencode(headsbyphase)
bundler.newpart('phase-heads', data=phasedata)
+
def addparttagsfnodescache(repo, bundler, outgoing):
# we include the tags fnode cache for the bundle changeset
# (as an optional parts)
@@ -1649,6 +1753,7 @@
if chunks:
bundler.newpart('hgtagsfnodes', data=''.join(chunks))
+
def addpartrevbranchcache(repo, bundler, outgoing):
# we include the rev branch cache for the bundle changeset
# (as an optional parts)
@@ -1669,28 +1774,36 @@
for n in sorted(closed):
yield n
- bundler.newpart('cache:rev-branch-cache', data=generate(),
- mandatory=False)
+ bundler.newpart('cache:rev-branch-cache', data=generate(), mandatory=False)
+
def _formatrequirementsspec(requirements):
requirements = [req for req in requirements if req != "shared"]
return urlreq.quote(','.join(sorted(requirements)))
+
def _formatrequirementsparams(requirements):
requirements = _formatrequirementsspec(requirements)
params = "%s%s" % (urlreq.quote("requirements="), requirements)
return params
+
def addpartbundlestream2(bundler, repo, **kwargs):
if not kwargs.get(r'stream', False):
return
if not streamclone.allowservergeneration(repo):
- raise error.Abort(_('stream data requested but server does not allow '
- 'this feature'),
- hint=_('well-behaved clients should not be '
- 'requesting stream data from servers not '
- 'advertising it; the client may be buggy'))
+ raise error.Abort(
+ _(
+ 'stream data requested but server does not allow '
+ 'this feature'
+ ),
+ hint=_(
+ 'well-behaved clients should not be '
+ 'requesting stream data from servers not '
+ 'advertising it; the client may be buggy'
+ ),
+ )
# Stream clones don't compress well. And compression undermines a
# goal of stream clones, which is to be fast. Communicate the desire
@@ -1701,8 +1814,9 @@
includepats = kwargs.get(r'includepats')
excludepats = kwargs.get(r'excludepats')
- narrowstream = repo.ui.configbool('experimental',
- 'server.stream-narrow-clones')
+ narrowstream = repo.ui.configbool(
+ 'experimental', 'server.stream-narrow-clones'
+ )
if (includepats or excludepats) and not narrowstream:
raise error.Abort(_('server does not support narrow stream clones'))
@@ -1711,20 +1825,25 @@
if repo.obsstore:
remoteversions = obsmarkersversion(bundler.capabilities)
if not remoteversions:
- raise error.Abort(_('server has obsolescence markers, but client '
- 'cannot receive them via stream clone'))
+ raise error.Abort(
+ _(
+ 'server has obsolescence markers, but client '
+ 'cannot receive them via stream clone'
+ )
+ )
elif repo.obsstore._version in remoteversions:
includeobsmarkers = True
- filecount, bytecount, it = streamclone.generatev2(repo, includepats,
- excludepats,
- includeobsmarkers)
+ filecount, bytecount, it = streamclone.generatev2(
+ repo, includepats, excludepats, includeobsmarkers
+ )
requirements = _formatrequirementsspec(repo.requirements)
part = bundler.newpart('stream2', data=it)
part.addparam('bytecount', '%d' % bytecount, mandatory=True)
part.addparam('filecount', '%d' % filecount, mandatory=True)
part.addparam('requirements', requirements, mandatory=True)
+
def buildobsmarkerspart(bundler, markers):
"""add an obsmarker part to the bundler with <markers>
@@ -1741,8 +1860,10 @@
stream = obsolete.encodemarkers(markers, True, version=version)
return bundler.newpart('obsmarkers', data=stream)
-def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
- compopts=None):
+
+def writebundle(
+ ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
+):
"""Write a bundle file and return its filename.
Existing files will not be overwritten.
@@ -1757,34 +1878,37 @@
part = bundle.newpart('changegroup', data=cg.getchunks())
part.addparam('version', cg.version)
if 'clcount' in cg.extras:
- part.addparam('nbchanges', '%d' % cg.extras['clcount'],
- mandatory=False)
+ part.addparam(
+ 'nbchanges', '%d' % cg.extras['clcount'], mandatory=False
+ )
chunkiter = bundle.getchunks()
else:
# compression argument is only for the bundle2 case
assert compression is None
if cg.version != '01':
- raise error.Abort(_('old bundle types only supports v1 '
- 'changegroups'))
+ raise error.Abort(
+ _('old bundle types only supports v1 ' 'changegroups')
+ )
header, comp = bundletypes[bundletype]
if comp not in util.compengines.supportedbundletypes:
- raise error.Abort(_('unknown stream compression type: %s')
- % comp)
+ raise error.Abort(_('unknown stream compression type: %s') % comp)
compengine = util.compengines.forbundletype(comp)
+
def chunkiter():
yield header
for chunk in compengine.compressstream(cg.getchunks(), compopts):
yield chunk
+
chunkiter = chunkiter()
# parse the changegroup data, otherwise we will block
# in case of sshrepo because we don't know the end of the stream
return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
+
def combinechangegroupresults(op):
"""logic to combine 0 or more addchangegroup results into one"""
- results = [r.get('return', 0)
- for r in op.records['changegroup']]
+ results = [r.get('return', 0) for r in op.records['changegroup']]
changedheads = 0
result = 1
for ret in results:
@@ -1802,8 +1926,10 @@
result = -1 + changedheads
return result
-@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
- 'targetphase'))
+
+@parthandler(
+ 'changegroup', ('version', 'nbchanges', 'treemanifest', 'targetphase')
+)
def handlechangegroup(op, inpart):
"""apply a changegroup part on the repo
@@ -1821,33 +1947,51 @@
nbchangesets = None
if 'nbchanges' in inpart.params:
nbchangesets = int(inpart.params.get('nbchanges'))
- if ('treemanifest' in inpart.params and
- 'treemanifest' not in op.repo.requirements):
+ if (
+ 'treemanifest' in inpart.params
+ and 'treemanifest' not in op.repo.requirements
+ ):
if len(op.repo.changelog) != 0:
- raise error.Abort(_(
- "bundle contains tree manifests, but local repo is "
- "non-empty and does not use tree manifests"))
+ raise error.Abort(
+ _(
+ "bundle contains tree manifests, but local repo is "
+ "non-empty and does not use tree manifests"
+ )
+ )
op.repo.requirements.add('treemanifest')
op.repo.svfs.options = localrepo.resolvestorevfsoptions(
- op.repo.ui, op.repo.requirements, op.repo.features)
+ op.repo.ui, op.repo.requirements, op.repo.features
+ )
op.repo._writerequirements()
extrakwargs = {}
targetphase = inpart.params.get('targetphase')
if targetphase is not None:
extrakwargs[r'targetphase'] = int(targetphase)
- ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
- expectedtotal=nbchangesets, **extrakwargs)
+ ret = _processchangegroup(
+ op,
+ cg,
+ tr,
+ 'bundle2',
+ 'bundle2',
+ expectedtotal=nbchangesets,
+ **extrakwargs
+ )
if op.reply is not None:
# This is definitely not the final form of this
# return. But one need to start somewhere.
part = op.reply.newpart('reply:changegroup', mandatory=False)
part.addparam(
- 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
+ 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
+ )
part.addparam('return', '%i' % ret, mandatory=False)
assert not inpart.read()
-_remotechangegroupparams = tuple(['url', 'size', 'digests'] +
- ['digest:%s' % k for k in util.DIGESTS.keys()])
+
+_remotechangegroupparams = tuple(
+ ['url', 'size', 'digests'] + ['digest:%s' % k for k in util.DIGESTS.keys()]
+)
+
+
@parthandler('remote-changegroup', _remotechangegroupparams)
def handleremotechangegroup(op, inpart):
"""apply a bundle10 on the repo, given an url and validation information
@@ -1871,14 +2015,16 @@
raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
parsed_url = util.url(raw_url)
if parsed_url.scheme not in capabilities['remote-changegroup']:
- raise error.Abort(_('remote-changegroup does not support %s urls') %
- parsed_url.scheme)
+ raise error.Abort(
+ _('remote-changegroup does not support %s urls') % parsed_url.scheme
+ )
try:
size = int(inpart.params['size'])
except ValueError:
- raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
- % 'size')
+ raise error.Abort(
+ _('remote-changegroup: invalid value for param "%s"') % 'size'
+ )
except KeyError:
raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
@@ -1888,39 +2034,47 @@
try:
value = inpart.params[param]
except KeyError:
- raise error.Abort(_('remote-changegroup: missing "%s" param') %
- param)
+ raise error.Abort(
+ _('remote-changegroup: missing "%s" param') % param
+ )
digests[typ] = value
real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
tr = op.gettransaction()
from . import exchange
+
cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
if not isinstance(cg, changegroup.cg1unpacker):
- raise error.Abort(_('%s: not a bundle version 1.0') %
- util.hidepassword(raw_url))
+ raise error.Abort(
+ _('%s: not a bundle version 1.0') % util.hidepassword(raw_url)
+ )
ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
if op.reply is not None:
# This is definitely not the final form of this
# return. But one need to start somewhere.
part = op.reply.newpart('reply:changegroup')
part.addparam(
- 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
+ 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
+ )
part.addparam('return', '%i' % ret, mandatory=False)
try:
real_part.validate()
except error.Abort as e:
- raise error.Abort(_('bundle at %s is corrupted:\n%s') %
- (util.hidepassword(raw_url), bytes(e)))
+ raise error.Abort(
+ _('bundle at %s is corrupted:\n%s')
+ % (util.hidepassword(raw_url), bytes(e))
+ )
assert not inpart.read()
+
@parthandler('reply:changegroup', ('return', 'in-reply-to'))
def handlereplychangegroup(op, inpart):
ret = int(inpart.params['return'])
replyto = int(inpart.params['in-reply-to'])
op.records.add('changegroup', {'return': ret}, replyto)
+
@parthandler('check:bookmarks')
def handlecheckbookmarks(op, inpart):
"""check location of bookmarks
@@ -1931,12 +2085,18 @@
"""
bookdata = bookmarks.binarydecode(inpart)
- msgstandard = ('remote repository changed while pushing - please try again '
- '(bookmark "%s" move from %s to %s)')
- msgmissing = ('remote repository changed while pushing - please try again '
- '(bookmark "%s" is missing, expected %s)')
- msgexist = ('remote repository changed while pushing - please try again '
- '(bookmark "%s" set on %s, expected missing)')
+ msgstandard = (
+ 'remote repository changed while pushing - please try again '
+ '(bookmark "%s" move from %s to %s)'
+ )
+ msgmissing = (
+ 'remote repository changed while pushing - please try again '
+ '(bookmark "%s" is missing, expected %s)'
+ )
+ msgexist = (
+ 'remote repository changed while pushing - please try again '
+ '(bookmark "%s" set on %s, expected missing)'
+ )
for book, node in bookdata:
currentnode = op.repo._bookmarks.get(book)
if currentnode != node:
@@ -1945,10 +2105,14 @@
elif currentnode is None:
finalmsg = msgmissing % (book, nodemod.short(node))
else:
- finalmsg = msgstandard % (book, nodemod.short(node),
- nodemod.short(currentnode))
+ finalmsg = msgstandard % (
+ book,
+ nodemod.short(node),
+ nodemod.short(currentnode),
+ )
raise error.PushRaced(finalmsg)
+
@parthandler('check:heads')
def handlecheckheads(op, inpart):
"""check that head of the repo did not change
@@ -1965,8 +2129,10 @@
if op.ui.configbool('experimental', 'bundle2lazylocking'):
op.gettransaction()
if sorted(heads) != sorted(op.repo.heads()):
- raise error.PushRaced('remote repository changed while pushing - '
- 'please try again')
+ raise error.PushRaced(
+ 'remote repository changed while pushing - ' 'please try again'
+ )
+
@parthandler('check:updated-heads')
def handlecheckupdatedheads(op, inpart):
@@ -1994,8 +2160,10 @@
for h in heads:
if h not in currentheads:
- raise error.PushRaced('remote repository changed while pushing - '
- 'please try again')
+ raise error.PushRaced(
+ 'remote repository changed while pushing - ' 'please try again'
+ )
+
@parthandler('check:phases')
def handlecheckphases(op, inpart):
@@ -2007,23 +2175,29 @@
unfi = op.repo.unfiltered()
cl = unfi.changelog
phasecache = unfi._phasecache
- msg = ('remote repository changed while pushing - please try again '
- '(%s is %s expected %s)')
+ msg = (
+ 'remote repository changed while pushing - please try again '
+ '(%s is %s expected %s)'
+ )
for expectedphase, nodes in enumerate(phasetonodes):
for n in nodes:
actualphase = phasecache.phase(unfi, cl.rev(n))
if actualphase != expectedphase:
- finalmsg = msg % (nodemod.short(n),
- phases.phasenames[actualphase],
- phases.phasenames[expectedphase])
+ finalmsg = msg % (
+ nodemod.short(n),
+ phases.phasenames[actualphase],
+ phases.phasenames[expectedphase],
+ )
raise error.PushRaced(finalmsg)
+
@parthandler('output')
def handleoutput(op, inpart):
"""forward output captured on the server to the client"""
for line in inpart.read().splitlines():
op.ui.status(_('remote: %s\n') % line)
+
@parthandler('replycaps')
def handlereplycaps(op, inpart):
"""Notify that a reply bundle should be created
@@ -2033,17 +2207,22 @@
if op.reply is None:
op.reply = bundle20(op.ui, caps)
+
class AbortFromPart(error.Abort):
"""Sub-class of Abort that denotes an error from a bundle2 part."""
+
@parthandler('error:abort', ('message', 'hint'))
def handleerrorabort(op, inpart):
"""Used to transmit abort error over the wire"""
- raise AbortFromPart(inpart.params['message'],
- hint=inpart.params.get('hint'))
-
-@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
- 'in-reply-to'))
+ raise AbortFromPart(
+ inpart.params['message'], hint=inpart.params.get('hint')
+ )
+
+
+@parthandler(
+ 'error:pushkey', ('namespace', 'key', 'new', 'old', 'ret', 'in-reply-to')
+)
def handleerrorpushkey(op, inpart):
"""Used to transmit failure of a mandatory pushkey over the wire"""
kwargs = {}
@@ -2051,8 +2230,10 @@
value = inpart.params.get(name)
if value is not None:
kwargs[name] = value
- raise error.PushkeyFailed(inpart.params['in-reply-to'],
- **pycompat.strkwargs(kwargs))
+ raise error.PushkeyFailed(
+ inpart.params['in-reply-to'], **pycompat.strkwargs(kwargs)
+ )
+
@parthandler('error:unsupportedcontent', ('parttype', 'params'))
def handleerrorunsupportedcontent(op, inpart):
@@ -2067,11 +2248,13 @@
raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
+
@parthandler('error:pushraced', ('message',))
def handleerrorpushraced(op, inpart):
"""Used to transmit push race error over the wire"""
raise error.ResponseError(_('push failed:'), inpart.params['message'])
+
@parthandler('listkeys', ('namespace',))
def handlelistkeys(op, inpart):
"""retrieve pushkey namespace content stored in a bundle2"""
@@ -2079,6 +2262,7 @@
r = pushkey.decodekeys(inpart.read())
op.records.add('listkeys', (namespace, r))
+
@parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
def handlepushkey(op, inpart):
"""process a pushkey request"""
@@ -2092,23 +2276,23 @@
if op.ui.configbool('experimental', 'bundle2lazylocking'):
op.gettransaction()
ret = op.repo.pushkey(namespace, key, old, new)
- record = {'namespace': namespace,
- 'key': key,
- 'old': old,
- 'new': new}
+ record = {'namespace': namespace, 'key': key, 'old': old, 'new': new}
op.records.add('pushkey', record)
if op.reply is not None:
rpart = op.reply.newpart('reply:pushkey')
rpart.addparam(
- 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
+ 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
+ )
rpart.addparam('return', '%i' % ret, mandatory=False)
if inpart.mandatory and not ret:
kwargs = {}
for key in ('namespace', 'key', 'new', 'old', 'ret'):
if key in inpart.params:
kwargs[key] = inpart.params[key]
- raise error.PushkeyFailed(partid='%d' % inpart.id,
- **pycompat.strkwargs(kwargs))
+ raise error.PushkeyFailed(
+ partid='%d' % inpart.id, **pycompat.strkwargs(kwargs)
+ )
+
@parthandler('bookmarks')
def handlebookmark(op, inpart):
@@ -2147,15 +2331,18 @@
allhooks.append(hookargs)
for hookargs in allhooks:
- op.repo.hook('prepushkey', throw=True,
- **pycompat.strkwargs(hookargs))
+ op.repo.hook(
+ 'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
+ )
bookstore.applychanges(op.repo, op.gettransaction(), changes)
if pushkeycompat:
+
def runhook():
for hookargs in allhooks:
op.repo.hook('pushkey', **pycompat.strkwargs(hookargs))
+
op.repo._afterlock(runhook)
elif bookmarksmode == 'records':
@@ -2165,12 +2352,14 @@
else:
raise error.ProgrammingError('unkown bookmark mode: %s' % bookmarksmode)
+
@parthandler('phase-heads')
def handlephases(op, inpart):
"""apply phases from bundle part to repo"""
headsbyphase = phases.binarydecode(inpart)
phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
+
@parthandler('reply:pushkey', ('return', 'in-reply-to'))
def handlepushkeyreply(op, inpart):
"""retrieve the result of a pushkey request"""
@@ -2178,14 +2367,14 @@
partid = int(inpart.params['in-reply-to'])
op.records.add('pushkey', {'return': ret}, partid)
+
@parthandler('obsmarkers')
def handleobsmarker(op, inpart):
"""add a stream of obsmarkers to the repo"""
tr = op.gettransaction()
markerdata = inpart.read()
if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
- op.ui.write(('obsmarker-exchange: %i bytes received\n')
- % len(markerdata))
+ op.ui.write('obsmarker-exchange: %i bytes received\n' % len(markerdata))
# The mergemarkers call will crash if marker creation is not enabled.
# we want to avoid this if the part is advisory.
if not inpart.mandatory and op.repo.obsstore.readonly:
@@ -2197,7 +2386,8 @@
if op.reply is not None:
rpart = op.reply.newpart('reply:obsmarkers')
rpart.addparam(
- 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
+ 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
+ )
rpart.addparam('new', '%i' % new, mandatory=False)
@@ -2208,6 +2398,7 @@
partid = int(inpart.params['in-reply-to'])
op.records.add('obsmarkers', {'new': ret}, partid)
+
@parthandler('hgtagsfnodes')
def handlehgtagsfnodes(op, inpart):
"""Applies .hgtags fnodes cache entries to the local repo.
@@ -2232,8 +2423,10 @@
cache.write()
op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
+
rbcstruct = struct.Struct('>III')
+
@parthandler('cache:rev-branch-cache')
def handlerbc(op, inpart):
"""receive a rev-branch-cache payload and update the local cache
@@ -2266,6 +2459,7 @@
rawheader = inpart.read(rbcstruct.size)
cache.write()
+
@parthandler('pushvars')
def bundle2getvars(op, part):
'''unbundle a bundle2 containing shellvars on the server'''
@@ -2280,6 +2474,7 @@
hookargs[key] = value
op.addhookargs(hookargs)
+
@parthandler('stream2', ('requirements', 'filecount', 'bytecount'))
def handlestreamv2bundle(op, part):
@@ -2293,11 +2488,12 @@
raise error.Abort(msg)
repo.ui.debug('applying stream bundle\n')
- streamclone.applybundlev2(repo, part, filecount, bytecount,
- requirements)
-
-def widen_bundle(bundler, repo, oldmatcher, newmatcher, common,
- known, cgversion, ellipses):
+ streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)
+
+
+def widen_bundle(
+ bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
+):
"""generates bundle2 for widening a narrow clone
bundler is the bundle to which data should be added
@@ -2318,12 +2514,20 @@
if commonnodes:
# XXX: we should only send the filelogs (and treemanifest). user
# already has the changelog and manifest
- packer = changegroup.getbundler(cgversion, repo,
- oldmatcher=oldmatcher,
- matcher=newmatcher,
- fullnodes=commonnodes)
- cgdata = packer.generate({nodemod.nullid}, list(commonnodes),
- False, 'narrow_widen', changelog=False)
+ packer = changegroup.getbundler(
+ cgversion,
+ repo,
+ oldmatcher=oldmatcher,
+ matcher=newmatcher,
+ fullnodes=commonnodes,
+ )
+ cgdata = packer.generate(
+ {nodemod.nullid},
+ list(commonnodes),
+ False,
+ 'narrow_widen',
+ changelog=False,
+ )
part = bundler.newpart('changegroup', data=cgdata)
part.addparam('version', cgversion)