Mercurial > hg-stable
diff mercurial/bundle2.py @ 43076:2372284d9457
formatting: blacken the codebase
This is using my patch to black
(https://github.com/psf/black/pull/826) so we don't un-wrap collection
literals.
Done with:
hg files 'set:**.py - mercurial/thirdparty/** - "contrib/python-zstandard/**"' | xargs black -S
# skip-blame mass-reformatting only
# no-check-commit reformats foo_bar functions
Differential Revision: https://phab.mercurial-scm.org/D6971
author | Augie Fackler <augie@google.com> |
---|---|
date | Sun, 06 Oct 2019 09:45:02 -0400 |
parents | 181ee2118a96 |
children | 687b865b95ad |
line wrap: on
line diff
--- a/mercurial/bundle2.py Sat Oct 05 10:29:34 2019 -0400 +++ b/mercurial/bundle2.py Sun Oct 06 09:45:02 2019 -0400 @@ -171,9 +171,7 @@ url, util, ) -from .utils import ( - stringutil, -) +from .utils import stringutil urlerr = util.urlerr urlreq = util.urlreq @@ -192,31 +190,37 @@ _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]') + def outdebug(ui, message): """debug regarding output stream (bundling)""" if ui.configbool('devel', 'bundle2.debug'): ui.debug('bundle2-output: %s\n' % message) + def indebug(ui, message): """debug on input stream (unbundling)""" if ui.configbool('devel', 'bundle2.debug'): ui.debug('bundle2-input: %s\n' % message) + def validateparttype(parttype): """raise ValueError if a parttype contains invalid character""" if _parttypeforbidden.search(parttype): raise ValueError(parttype) + def _makefpartparamsizes(nbparams): """return a struct format to read part parameter sizes The number parameters is variable so we need to build that format dynamically. """ - return '>'+('BB'*nbparams) + return '>' + ('BB' * nbparams) + parthandlermapping = {} + def parthandler(parttype, params=()): """decorator that register a function as a bundle2 part handler @@ -228,14 +232,17 @@ ... """ validateparttype(parttype) + def _decorator(func): - lparttype = parttype.lower() # enforce lower case matching. + lparttype = parttype.lower() # enforce lower case matching. assert lparttype not in parthandlermapping parthandlermapping[lparttype] = func func.params = frozenset(params) return func + return _decorator + class unbundlerecords(object): """keep record of what happens during and unbundle @@ -283,6 +290,7 @@ __bool__ = __nonzero__ + class bundleoperation(object): """an object that represents a single bundling process @@ -328,13 +336,17 @@ def addhookargs(self, hookargs): if self.hookargs is None: - raise error.ProgrammingError('attempted to add hookargs to ' - 'operation after transaction started') + raise error.ProgrammingError( + 'attempted to add hookargs to ' + 'operation after transaction started' + ) self.hookargs.update(hookargs) + class TransactionUnavailable(RuntimeError): pass + def _notransaction(): """default method to get a transaction while processing a bundle @@ -342,6 +354,7 @@ to be created""" raise TransactionUnavailable() + def applybundle(repo, unbundler, tr, source, url=None, **kwargs): # transform me into unbundler.apply() as soon as the freeze is lifted if isinstance(unbundler, unbundle20): @@ -357,6 +370,7 @@ _processchangegroup(op, unbundler, tr, source, url, **kwargs) return op + class partiterator(object): def __init__(self, repo, op, unbundler): self.repo = repo @@ -375,6 +389,7 @@ yield p p.consume() self.current = None + self.iterator = func() return self.iterator @@ -422,8 +437,10 @@ if seekerror: raise exc - self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' % - self.count) + self.repo.ui.debug( + 'bundle2-input-bundle: %i parts total\n' % self.count + ) + def processbundle(repo, unbundler, transactiongetter=None, op=None, source=''): """This function process a bundle, apply effect to/from a repo @@ -461,20 +478,21 @@ return op + def processparts(repo, op, unbundler): with partiterator(repo, op, unbundler) as parts: for part in parts: _processpart(op, part) + def _processchangegroup(op, cg, tr, source, url, **kwargs): ret = cg.apply(op.repo, tr, source, url, **kwargs) - op.records.add('changegroup', { - 'return': ret, - }) + op.records.add('changegroup', {'return': ret,}) return ret + def _gethandler(op, part): - status = 'unknown' # used by debug output + status = 'unknown' # used by debug output try: handler = parthandlermapping.get(part.type) if handler is None: @@ -486,14 +504,15 @@ unknownparams = list(unknownparams) unknownparams.sort() status = 'unsupported-params (%s)' % ', '.join(unknownparams) - raise error.BundleUnknownFeatureError(parttype=part.type, - params=unknownparams) + raise error.BundleUnknownFeatureError( + parttype=part.type, params=unknownparams + ) status = 'supported' except error.BundleUnknownFeatureError as exc: - if part.mandatory: # mandatory parts + if part.mandatory: # mandatory parts raise indebug(op.ui, 'ignoring unsupported advisory part %s' % exc) - return # skip to part processing + return # skip to part processing finally: if op.ui.debugflag: msg = ['bundle2-input-part: "%s"' % part.type] @@ -513,6 +532,7 @@ return handler + def _processpart(op, part): """process a single part from a bundle @@ -536,10 +556,11 @@ if output is not None: output = op.ui.popbuffer() if output: - outpart = op.reply.newpart('output', data=output, - mandatory=False) + outpart = op.reply.newpart('output', data=output, mandatory=False) outpart.addparam( - 'in-reply-to', pycompat.bytestr(part.id), mandatory=False) + 'in-reply-to', pycompat.bytestr(part.id), mandatory=False + ) + def decodecaps(blob): """decode a bundle2 caps bytes blob into a dictionary @@ -564,6 +585,7 @@ caps[key] = vals return caps + def encodecaps(caps): """encode a bundle2 caps dictionary into a bytes blob""" chunks = [] @@ -576,11 +598,12 @@ chunks.append(ca) return '\n'.join(chunks) + bundletypes = { - "": ("", 'UN'), # only when using unbundle on ssh and old http servers - # since the unification ssh accepts a header but there - # is no capability signaling it. - "HG20": (), # special-cased below + "": ("", 'UN'), # only when using unbundle on ssh and old http servers + # since the unification ssh accepts a header but there + # is no capability signaling it. + "HG20": (), # special-cased below "HG10UN": ("HG10UN", 'UN'), "HG10BZ": ("HG10", 'BZ'), "HG10GZ": ("HG10GZ", 'GZ'), @@ -589,6 +612,7 @@ # hgweb uses this list to communicate its preferred type bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN'] + class bundle20(object): """represent an outgoing bundle2 container @@ -630,8 +654,9 @@ if not name: raise error.ProgrammingError(b'empty parameter name') if name[0:1] not in pycompat.bytestr(string.ascii_letters): - raise error.ProgrammingError(b'non letter first character: %s' - % name) + raise error.ProgrammingError( + b'non letter first character: %s' % name + ) self._params.append((name, value)) def addpart(self, part): @@ -639,7 +664,7 @@ Parts contains the actual applicative payload.""" assert part.id is None - part.id = len(self._parts) # very cheap counter + part.id = len(self._parts) # very cheap counter self._parts.append(part) def newpart(self, typeid, *args, **kwargs): @@ -670,8 +695,9 @@ yield _pack(_fstreamparamsize, len(param)) if param: yield param - for chunk in self._compengine.compressstream(self._getcorechunk(), - self._compopts): + for chunk in self._compengine.compressstream( + self._getcorechunk(), self._compopts + ): yield chunk def _paramchunk(self): @@ -697,7 +723,6 @@ outdebug(self.ui, 'end of bundle') yield _pack(_fpartheadersize, 0) - def salvageoutput(self): """return a list with a copy of all output parts in the bundle @@ -737,6 +762,7 @@ Do not use it to implement higher-level logic or methods.""" return changegroup.readexactly(self._fp, size) + def getunbundler(ui, fp, magicstring=None): """return a valid unbundler object for a given magicstring""" if magicstring is None: @@ -745,7 +771,8 @@ if magic != 'HG': ui.debug( "error: invalid magic: %r (version %r), should be 'HG'\n" - % (magic, version)) + % (magic, version) + ) raise error.Abort(_('not a Mercurial bundle')) unbundlerclass = formatmap.get(version) if unbundlerclass is None: @@ -754,6 +781,7 @@ indebug(ui, 'start processing of %s stream' % magicstring) return unbundler + class unbundle20(unpackermixin): """interpret a bundle2 stream @@ -776,8 +804,9 @@ params = {} paramssize = self._unpack(_fstreamparamsize)[0] if paramssize < 0: - raise error.BundleValueError('negative bundle param size: %i' - % paramssize) + raise error.BundleValueError( + 'negative bundle param size: %i' % paramssize + ) if paramssize: params = self._readexact(paramssize) params = self._processallparams(params) @@ -795,7 +824,6 @@ params[p[0]] = p[1] return params - def _processparam(self, name, value): """process a parameter, applying its effect if needed @@ -832,8 +860,9 @@ assert 'params' not in vars(self) paramssize = self._unpack(_fstreamparamsize)[0] if paramssize < 0: - raise error.BundleValueError('negative bundle param size: %i' - % paramssize) + raise error.BundleValueError( + 'negative bundle param size: %i' % paramssize + ) if paramssize: params = self._readexact(paramssize) self._processallparams(params) @@ -868,7 +897,6 @@ raise error.BundleValueError('negative chunk size: %i') yield self._readexact(size) - def iterparts(self, seekable=False): """yield all parts contained in the stream""" cls = seekableunbundlepart if seekable else unbundlepart @@ -894,15 +922,16 @@ returns None if empty""" headersize = self._unpack(_fpartheadersize)[0] if headersize < 0: - raise error.BundleValueError('negative part header size: %i' - % headersize) + raise error.BundleValueError( + 'negative part header size: %i' % headersize + ) indebug(self.ui, 'part header size: %i' % headersize) if headersize: return self._readexact(headersize) return None def compressed(self): - self.params # load params + self.params # load params return self._compressed def close(self): @@ -910,28 +939,33 @@ if util.safehasattr(self._fp, 'close'): return self._fp.close() + formatmap = {'20': unbundle20} b2streamparamsmap = {} + def b2streamparamhandler(name): """register a handler for a stream level parameter""" + def decorator(func): assert name not in formatmap b2streamparamsmap[name] = func return func + return decorator + @b2streamparamhandler('compression') def processcompression(unbundler, param, value): """read compression parameter and install payload decompression""" if value not in util.compengines.supportedbundletypes: - raise error.BundleUnknownFeatureError(params=(param,), - values=(value,)) + raise error.BundleUnknownFeatureError(params=(param,), values=(value,)) unbundler._compengine = util.compengines.forbundletype(value) if value is not None: unbundler._compressed = True + class bundlepart(object): """A bundle2 part contains application level payload @@ -948,8 +982,14 @@ Both data and parameters cannot be modified after the generation has begun. """ - def __init__(self, parttype, mandatoryparams=(), advisoryparams=(), - data='', mandatory=True): + def __init__( + self, + parttype, + mandatoryparams=(), + advisoryparams=(), + data='', + mandatory=True, + ): validateparttype(parttype) self.id = None self.type = parttype @@ -971,8 +1011,13 @@ def __repr__(self): cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__) - return ('<%s object at %x; id: %s; type: %s; mandatory: %s>' - % (cls, id(self), self.id, self.type, self.mandatory)) + return '<%s object at %x; id: %s; type: %s; mandatory: %s>' % ( + cls, + id(self), + self.id, + self.type, + self.mandatory, + ) def copy(self): """return a copy of the part @@ -980,8 +1025,13 @@ The new part have the very same content but no partid assigned yet. Parts with generated data cannot be copied.""" assert not util.safehasattr(self.data, 'next') - return self.__class__(self.type, self._mandatoryparams, - self._advisoryparams, self._data, self.mandatory) + return self.__class__( + self.type, + self._mandatoryparams, + self._advisoryparams, + self._data, + self.mandatory, + ) # methods used to defines the part content @property @@ -1043,8 +1093,9 @@ msg.append(')') if not self.data: msg.append(' empty payload') - elif (util.safehasattr(self.data, 'next') - or util.safehasattr(self.data, '__next__')): + elif util.safehasattr(self.data, 'next') or util.safehasattr( + self.data, '__next__' + ): msg.append(' streamed payload') else: msg.append(' %i bytes payload' % len(self.data)) @@ -1058,9 +1109,11 @@ parttype = self.type.lower() outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype)) ## parttype - header = [_pack(_fparttypesize, len(parttype)), - parttype, _pack(_fpartid, self.id), - ] + header = [ + _pack(_fparttypesize, len(parttype)), + parttype, + _pack(_fpartid, self.id), + ] ## parameters # count manpar = self.mandatoryparams @@ -1087,8 +1140,10 @@ try: headerchunk = ''.join(header) except TypeError: - raise TypeError(r'Found a non-bytes trying to ' - r'build bundle part header: %r' % header) + raise TypeError( + r'Found a non-bytes trying to ' + r'build bundle part header: %r' % header + ) outdebug(ui, 'header chunk size: %i' % len(headerchunk)) yield _pack(_fpartheadersize, len(headerchunk)) yield headerchunk @@ -1107,12 +1162,14 @@ except BaseException as exc: bexc = stringutil.forcebytestr(exc) # backup exception data for later - ui.debug('bundle2-input-stream-interrupt: encoding exception %s' - % bexc) + ui.debug( + 'bundle2-input-stream-interrupt: encoding exception %s' % bexc + ) tb = sys.exc_info()[2] msg = 'unexpected error: %s' % bexc - interpart = bundlepart('error:abort', [('message', msg)], - mandatory=False) + interpart = bundlepart( + 'error:abort', [('message', msg)], mandatory=False + ) interpart.id = 0 yield _pack(_fpayloadsize, -1) for chunk in interpart.getchunks(ui=ui): @@ -1132,8 +1189,9 @@ Exists to handle the different methods to provide data to a part.""" # we only support fixed size data now. # This will be improved in the future. - if (util.safehasattr(self.data, 'next') - or util.safehasattr(self.data, '__next__')): + if util.safehasattr(self.data, 'next') or util.safehasattr( + self.data, '__next__' + ): buff = util.chunkbuffer(self.data) chunk = buff.read(preferedchunksize) while chunk: @@ -1145,6 +1203,7 @@ flaginterrupt = -1 + class interrupthandler(unpackermixin): """read one part and process it with restricted capability @@ -1163,8 +1222,9 @@ returns None if empty""" headersize = self._unpack(_fpartheadersize)[0] if headersize < 0: - raise error.BundleValueError('negative part header size: %i' - % headersize) + raise error.BundleValueError( + 'negative part header size: %i' % headersize + ) indebug(self.ui, 'part header size: %i\n' % headersize) if headersize: return self._readexact(headersize) @@ -1172,8 +1232,9 @@ def __call__(self): - self.ui.debug('bundle2-input-stream-interrupt:' - ' opening out of band context\n') + self.ui.debug( + 'bundle2-input-stream-interrupt:' ' opening out of band context\n' + ) indebug(self.ui, 'bundle2 stream interruption, looking for a part.') headerblock = self._readpartheader() if headerblock is None: @@ -1190,8 +1251,10 @@ finally: if not hardabort: part.consume() - self.ui.debug('bundle2-input-stream-interrupt:' - ' closing out of band context\n') + self.ui.debug( + 'bundle2-input-stream-interrupt:' ' closing out of band context\n' + ) + class interruptoperation(object): """A limited operation to be use by part handler during interruption @@ -1211,6 +1274,7 @@ def gettransaction(self): raise TransactionUnavailable('no repo access from stream interruption') + def decodepayloadchunks(ui, fh): """Reads bundle2 part payload data into chunks. @@ -1235,9 +1299,13 @@ if chunksize >= 0: s = read(chunksize) if len(s) < chunksize: - raise error.Abort(_('stream ended unexpectedly ' - ' (got %d bytes, expected %d)') % - (len(s), chunksize)) + raise error.Abort( + _( + 'stream ended unexpectedly ' + ' (got %d bytes, expected %d)' + ) + % (len(s), chunksize) + ) yield s elif chunksize == flaginterrupt: @@ -1246,13 +1314,15 @@ interrupthandler(ui, fh)() else: raise error.BundleValueError( - 'negative payload chunk size: %s' % chunksize) + 'negative payload chunk size: %s' % chunksize + ) s = read(headersize) if len(s) < headersize: - raise error.Abort(_('stream ended unexpectedly ' - ' (got %d bytes, expected %d)') % - (len(s), chunksize)) + raise error.Abort( + _('stream ended unexpectedly ' ' (got %d bytes, expected %d)') + % (len(s), chunksize) + ) chunksize = unpack(s)[0] @@ -1260,13 +1330,15 @@ if dolog: debug('bundle2-input: payload chunk size: %i\n' % chunksize) + class unbundlepart(unpackermixin): """a bundle part read from a bundle""" def __init__(self, ui, header, fp): super(unbundlepart, self).__init__(fp) - self._seekable = (util.safehasattr(fp, 'seek') and - util.safehasattr(fp, 'tell')) + self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr( + fp, 'tell' + ) self.ui = ui # unbundle state attr self._headerdata = header @@ -1287,7 +1359,7 @@ def _fromheader(self, size): """return the next <size> byte from the header""" offset = self._headeroffset - data = self._headerdata[offset:(offset + size)] + data = self._headerdata[offset : (offset + size)] self._headeroffset = offset + size return data @@ -1302,7 +1374,7 @@ """internal function to setup all logic related parameters""" # make it read only to prevent people touching it by mistake. self.mandatoryparams = tuple(mandatoryparams) - self.advisoryparams = tuple(advisoryparams) + self.advisoryparams = tuple(advisoryparams) # user friendly UI self.params = util.sortdict(self.mandatoryparams) self.params.update(self.advisoryparams) @@ -1316,7 +1388,7 @@ self.id = self._unpackheader(_fpartid)[0] indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id)) # extract mandatory bit from type - self.mandatory = (self.type != self.type.lower()) + self.mandatory = self.type != self.type.lower() self.type = self.type.lower() ## reading parameters # param count @@ -1372,11 +1444,13 @@ self._pos += len(data) if size is None or len(data) < size: if not self.consumed and self._pos: - self.ui.debug('bundle2-input-part: total payload size %i\n' - % self._pos) + self.ui.debug( + 'bundle2-input-part: total payload size %i\n' % self._pos + ) self.consumed = True return data + class seekableunbundlepart(unbundlepart): """A bundle2 part in a bundle that is seekable. @@ -1394,6 +1468,7 @@ to the number of chunks within the payload (which almost certainly increases in proportion with the size of the part). """ + def __init__(self, ui, header, fp): # (payload, file) offsets for chunk starts. self._chunkindex = [] @@ -1407,7 +1482,8 @@ self._chunkindex.append((0, self._tellfp())) else: assert chunknum < len(self._chunkindex), ( - 'Unknown chunk %d' % chunknum) + 'Unknown chunk %d' % chunknum + ) self._seekfp(self._chunkindex[chunknum][1]) pos = self._chunkindex[chunknum][0] @@ -1495,21 +1571,23 @@ raise return None + # These are only the static capabilities. # Check the 'getrepocaps' function for the rest. -capabilities = {'HG20': (), - 'bookmarks': (), - 'error': ('abort', 'unsupportedcontent', 'pushraced', - 'pushkey'), - 'listkeys': (), - 'pushkey': (), - 'digests': tuple(sorted(util.DIGESTS.keys())), - 'remote-changegroup': ('http', 'https'), - 'hgtagsfnodes': (), - 'rev-branch-cache': (), - 'phases': ('heads',), - 'stream': ('v2',), - } +capabilities = { + 'HG20': (), + 'bookmarks': (), + 'error': ('abort', 'unsupportedcontent', 'pushraced', 'pushkey'), + 'listkeys': (), + 'pushkey': (), + 'digests': tuple(sorted(util.DIGESTS.keys())), + 'remote-changegroup': ('http', 'https'), + 'hgtagsfnodes': (), + 'rev-branch-cache': (), + 'phases': ('heads',), + 'stream': ('v2',), +} + def getrepocaps(repo, allowpushback=False, role=None): """return the bundle2 capabilities for a given repo @@ -1524,8 +1602,9 @@ raise error.ProgrammingError('role argument must be client or server') caps = capabilities.copy() - caps['changegroup'] = tuple(sorted( - changegroup.supportedincomingversions(repo))) + caps['changegroup'] = tuple( + sorted(changegroup.supportedincomingversions(repo)) + ) if obsolete.isenabled(repo, obsolete.exchangeopt): supportedformat = tuple('V%i' % v for v in obsolete.formats) caps['obsmarkers'] = supportedformat @@ -1539,8 +1618,9 @@ # Don't advertise stream clone support in server mode if not configured. if role == 'server': - streamsupported = repo.ui.configbool('server', 'uncompressed', - untrusted=True) + streamsupported = repo.ui.configbool( + 'server', 'uncompressed', untrusted=True + ) featuresupported = repo.ui.configbool('server', 'bundle2.stream') if not streamsupported or not featuresupported: @@ -1550,6 +1630,7 @@ return caps + def bundle2caps(remote): """return the bundle capabilities of a peer as dict""" raw = remote.capable('bundle2') @@ -1558,18 +1639,37 @@ capsblob = urlreq.unquote(remote.capable('bundle2')) return decodecaps(capsblob) + def obsmarkersversion(caps): """extract the list of supported obsmarkers versions from a bundle2caps dict """ obscaps = caps.get('obsmarkers', ()) return [int(c[1:]) for c in obscaps if c.startswith('V')] -def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts, - vfs=None, compression=None, compopts=None): + +def writenewbundle( + ui, + repo, + source, + filename, + bundletype, + outgoing, + opts, + vfs=None, + compression=None, + compopts=None, +): if bundletype.startswith('HG10'): cg = changegroup.makechangegroup(repo, outgoing, '01', source) - return writebundle(ui, cg, filename, bundletype, vfs=vfs, - compression=compression, compopts=compopts) + return writebundle( + ui, + cg, + filename, + bundletype, + vfs=vfs, + compression=compression, + compopts=compopts, + ) elif not bundletype.startswith('HG20'): raise error.ProgrammingError('unknown bundle type: %s' % bundletype) @@ -1583,6 +1683,7 @@ return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs) + def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts): # We should eventually reconcile this logic with the one behind # 'exchange.getbundle2partsgenerator'. @@ -1601,10 +1702,12 @@ part = bundler.newpart('changegroup', data=cg.getchunks()) part.addparam('version', cg.version) if 'clcount' in cg.extras: - part.addparam('nbchanges', '%d' % cg.extras['clcount'], - mandatory=False) - if opts.get('phases') and repo.revs('%ln and secret()', - outgoing.missingheads): + part.addparam( + 'nbchanges', '%d' % cg.extras['clcount'], mandatory=False + ) + if opts.get('phases') and repo.revs( + '%ln and secret()', outgoing.missingheads + ): part.addparam('targetphase', '%d' % phases.secret, mandatory=False) if opts.get('streamv2', False): @@ -1625,6 +1728,7 @@ phasedata = phases.binaryencode(headsbyphase) bundler.newpart('phase-heads', data=phasedata) + def addparttagsfnodescache(repo, bundler, outgoing): # we include the tags fnode cache for the bundle changeset # (as an optional parts) @@ -1649,6 +1753,7 @@ if chunks: bundler.newpart('hgtagsfnodes', data=''.join(chunks)) + def addpartrevbranchcache(repo, bundler, outgoing): # we include the rev branch cache for the bundle changeset # (as an optional parts) @@ -1669,28 +1774,36 @@ for n in sorted(closed): yield n - bundler.newpart('cache:rev-branch-cache', data=generate(), - mandatory=False) + bundler.newpart('cache:rev-branch-cache', data=generate(), mandatory=False) + def _formatrequirementsspec(requirements): requirements = [req for req in requirements if req != "shared"] return urlreq.quote(','.join(sorted(requirements))) + def _formatrequirementsparams(requirements): requirements = _formatrequirementsspec(requirements) params = "%s%s" % (urlreq.quote("requirements="), requirements) return params + def addpartbundlestream2(bundler, repo, **kwargs): if not kwargs.get(r'stream', False): return if not streamclone.allowservergeneration(repo): - raise error.Abort(_('stream data requested but server does not allow ' - 'this feature'), - hint=_('well-behaved clients should not be ' - 'requesting stream data from servers not ' - 'advertising it; the client may be buggy')) + raise error.Abort( + _( + 'stream data requested but server does not allow ' + 'this feature' + ), + hint=_( + 'well-behaved clients should not be ' + 'requesting stream data from servers not ' + 'advertising it; the client may be buggy' + ), + ) # Stream clones don't compress well. And compression undermines a # goal of stream clones, which is to be fast. Communicate the desire @@ -1701,8 +1814,9 @@ includepats = kwargs.get(r'includepats') excludepats = kwargs.get(r'excludepats') - narrowstream = repo.ui.configbool('experimental', - 'server.stream-narrow-clones') + narrowstream = repo.ui.configbool( + 'experimental', 'server.stream-narrow-clones' + ) if (includepats or excludepats) and not narrowstream: raise error.Abort(_('server does not support narrow stream clones')) @@ -1711,20 +1825,25 @@ if repo.obsstore: remoteversions = obsmarkersversion(bundler.capabilities) if not remoteversions: - raise error.Abort(_('server has obsolescence markers, but client ' - 'cannot receive them via stream clone')) + raise error.Abort( + _( + 'server has obsolescence markers, but client ' + 'cannot receive them via stream clone' + ) + ) elif repo.obsstore._version in remoteversions: includeobsmarkers = True - filecount, bytecount, it = streamclone.generatev2(repo, includepats, - excludepats, - includeobsmarkers) + filecount, bytecount, it = streamclone.generatev2( + repo, includepats, excludepats, includeobsmarkers + ) requirements = _formatrequirementsspec(repo.requirements) part = bundler.newpart('stream2', data=it) part.addparam('bytecount', '%d' % bytecount, mandatory=True) part.addparam('filecount', '%d' % filecount, mandatory=True) part.addparam('requirements', requirements, mandatory=True) + def buildobsmarkerspart(bundler, markers): """add an obsmarker part to the bundler with <markers> @@ -1741,8 +1860,10 @@ stream = obsolete.encodemarkers(markers, True, version=version) return bundler.newpart('obsmarkers', data=stream) -def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None, - compopts=None): + +def writebundle( + ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None +): """Write a bundle file and return its filename. Existing files will not be overwritten. @@ -1757,34 +1878,37 @@ part = bundle.newpart('changegroup', data=cg.getchunks()) part.addparam('version', cg.version) if 'clcount' in cg.extras: - part.addparam('nbchanges', '%d' % cg.extras['clcount'], - mandatory=False) + part.addparam( + 'nbchanges', '%d' % cg.extras['clcount'], mandatory=False + ) chunkiter = bundle.getchunks() else: # compression argument is only for the bundle2 case assert compression is None if cg.version != '01': - raise error.Abort(_('old bundle types only supports v1 ' - 'changegroups')) + raise error.Abort( + _('old bundle types only supports v1 ' 'changegroups') + ) header, comp = bundletypes[bundletype] if comp not in util.compengines.supportedbundletypes: - raise error.Abort(_('unknown stream compression type: %s') - % comp) + raise error.Abort(_('unknown stream compression type: %s') % comp) compengine = util.compengines.forbundletype(comp) + def chunkiter(): yield header for chunk in compengine.compressstream(cg.getchunks(), compopts): yield chunk + chunkiter = chunkiter() # parse the changegroup data, otherwise we will block # in case of sshrepo because we don't know the end of the stream return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs) + def combinechangegroupresults(op): """logic to combine 0 or more addchangegroup results into one""" - results = [r.get('return', 0) - for r in op.records['changegroup']] + results = [r.get('return', 0) for r in op.records['changegroup']] changedheads = 0 result = 1 for ret in results: @@ -1802,8 +1926,10 @@ result = -1 + changedheads return result -@parthandler('changegroup', ('version', 'nbchanges', 'treemanifest', - 'targetphase')) + +@parthandler( + 'changegroup', ('version', 'nbchanges', 'treemanifest', 'targetphase') +) def handlechangegroup(op, inpart): """apply a changegroup part on the repo @@ -1821,33 +1947,51 @@ nbchangesets = None if 'nbchanges' in inpart.params: nbchangesets = int(inpart.params.get('nbchanges')) - if ('treemanifest' in inpart.params and - 'treemanifest' not in op.repo.requirements): + if ( + 'treemanifest' in inpart.params + and 'treemanifest' not in op.repo.requirements + ): if len(op.repo.changelog) != 0: - raise error.Abort(_( - "bundle contains tree manifests, but local repo is " - "non-empty and does not use tree manifests")) + raise error.Abort( + _( + "bundle contains tree manifests, but local repo is " + "non-empty and does not use tree manifests" + ) + ) op.repo.requirements.add('treemanifest') op.repo.svfs.options = localrepo.resolvestorevfsoptions( - op.repo.ui, op.repo.requirements, op.repo.features) + op.repo.ui, op.repo.requirements, op.repo.features + ) op.repo._writerequirements() extrakwargs = {} targetphase = inpart.params.get('targetphase') if targetphase is not None: extrakwargs[r'targetphase'] = int(targetphase) - ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2', - expectedtotal=nbchangesets, **extrakwargs) + ret = _processchangegroup( + op, + cg, + tr, + 'bundle2', + 'bundle2', + expectedtotal=nbchangesets, + **extrakwargs + ) if op.reply is not None: # This is definitely not the final form of this # return. But one need to start somewhere. part = op.reply.newpart('reply:changegroup', mandatory=False) part.addparam( - 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False) + 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False + ) part.addparam('return', '%i' % ret, mandatory=False) assert not inpart.read() -_remotechangegroupparams = tuple(['url', 'size', 'digests'] + - ['digest:%s' % k for k in util.DIGESTS.keys()]) + +_remotechangegroupparams = tuple( + ['url', 'size', 'digests'] + ['digest:%s' % k for k in util.DIGESTS.keys()] +) + + @parthandler('remote-changegroup', _remotechangegroupparams) def handleremotechangegroup(op, inpart): """apply a bundle10 on the repo, given an url and validation information @@ -1871,14 +2015,16 @@ raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url') parsed_url = util.url(raw_url) if parsed_url.scheme not in capabilities['remote-changegroup']: - raise error.Abort(_('remote-changegroup does not support %s urls') % - parsed_url.scheme) + raise error.Abort( + _('remote-changegroup does not support %s urls') % parsed_url.scheme + ) try: size = int(inpart.params['size']) except ValueError: - raise error.Abort(_('remote-changegroup: invalid value for param "%s"') - % 'size') + raise error.Abort( + _('remote-changegroup: invalid value for param "%s"') % 'size' + ) except KeyError: raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size') @@ -1888,39 +2034,47 @@ try: value = inpart.params[param] except KeyError: - raise error.Abort(_('remote-changegroup: missing "%s" param') % - param) + raise error.Abort( + _('remote-changegroup: missing "%s" param') % param + ) digests[typ] = value real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests) tr = op.gettransaction() from . import exchange + cg = exchange.readbundle(op.repo.ui, real_part, raw_url) if not isinstance(cg, changegroup.cg1unpacker): - raise error.Abort(_('%s: not a bundle version 1.0') % - util.hidepassword(raw_url)) + raise error.Abort( + _('%s: not a bundle version 1.0') % util.hidepassword(raw_url) + ) ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2') if op.reply is not None: # This is definitely not the final form of this # return. But one need to start somewhere. part = op.reply.newpart('reply:changegroup') part.addparam( - 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False) + 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False + ) part.addparam('return', '%i' % ret, mandatory=False) try: real_part.validate() except error.Abort as e: - raise error.Abort(_('bundle at %s is corrupted:\n%s') % - (util.hidepassword(raw_url), bytes(e))) + raise error.Abort( + _('bundle at %s is corrupted:\n%s') + % (util.hidepassword(raw_url), bytes(e)) + ) assert not inpart.read() + @parthandler('reply:changegroup', ('return', 'in-reply-to')) def handlereplychangegroup(op, inpart): ret = int(inpart.params['return']) replyto = int(inpart.params['in-reply-to']) op.records.add('changegroup', {'return': ret}, replyto) + @parthandler('check:bookmarks') def handlecheckbookmarks(op, inpart): """check location of bookmarks @@ -1931,12 +2085,18 @@ """ bookdata = bookmarks.binarydecode(inpart) - msgstandard = ('remote repository changed while pushing - please try again ' - '(bookmark "%s" move from %s to %s)') - msgmissing = ('remote repository changed while pushing - please try again ' - '(bookmark "%s" is missing, expected %s)') - msgexist = ('remote repository changed while pushing - please try again ' - '(bookmark "%s" set on %s, expected missing)') + msgstandard = ( + 'remote repository changed while pushing - please try again ' + '(bookmark "%s" move from %s to %s)' + ) + msgmissing = ( + 'remote repository changed while pushing - please try again ' + '(bookmark "%s" is missing, expected %s)' + ) + msgexist = ( + 'remote repository changed while pushing - please try again ' + '(bookmark "%s" set on %s, expected missing)' + ) for book, node in bookdata: currentnode = op.repo._bookmarks.get(book) if currentnode != node: @@ -1945,10 +2105,14 @@ elif currentnode is None: finalmsg = msgmissing % (book, nodemod.short(node)) else: - finalmsg = msgstandard % (book, nodemod.short(node), - nodemod.short(currentnode)) + finalmsg = msgstandard % ( + book, + nodemod.short(node), + nodemod.short(currentnode), + ) raise error.PushRaced(finalmsg) + @parthandler('check:heads') def handlecheckheads(op, inpart): """check that head of the repo did not change @@ -1965,8 +2129,10 @@ if op.ui.configbool('experimental', 'bundle2lazylocking'): op.gettransaction() if sorted(heads) != sorted(op.repo.heads()): - raise error.PushRaced('remote repository changed while pushing - ' - 'please try again') + raise error.PushRaced( + 'remote repository changed while pushing - ' 'please try again' + ) + @parthandler('check:updated-heads') def handlecheckupdatedheads(op, inpart): @@ -1994,8 +2160,10 @@ for h in heads: if h not in currentheads: - raise error.PushRaced('remote repository changed while pushing - ' - 'please try again') + raise error.PushRaced( + 'remote repository changed while pushing - ' 'please try again' + ) + @parthandler('check:phases') def handlecheckphases(op, inpart): @@ -2007,23 +2175,29 @@ unfi = op.repo.unfiltered() cl = unfi.changelog phasecache = unfi._phasecache - msg = ('remote repository changed while pushing - please try again ' - '(%s is %s expected %s)') + msg = ( + 'remote repository changed while pushing - please try again ' + '(%s is %s expected %s)' + ) for expectedphase, nodes in enumerate(phasetonodes): for n in nodes: actualphase = phasecache.phase(unfi, cl.rev(n)) if actualphase != expectedphase: - finalmsg = msg % (nodemod.short(n), - phases.phasenames[actualphase], - phases.phasenames[expectedphase]) + finalmsg = msg % ( + nodemod.short(n), + phases.phasenames[actualphase], + phases.phasenames[expectedphase], + ) raise error.PushRaced(finalmsg) + @parthandler('output') def handleoutput(op, inpart): """forward output captured on the server to the client""" for line in inpart.read().splitlines(): op.ui.status(_('remote: %s\n') % line) + @parthandler('replycaps') def handlereplycaps(op, inpart): """Notify that a reply bundle should be created @@ -2033,17 +2207,22 @@ if op.reply is None: op.reply = bundle20(op.ui, caps) + class AbortFromPart(error.Abort): """Sub-class of Abort that denotes an error from a bundle2 part.""" + @parthandler('error:abort', ('message', 'hint')) def handleerrorabort(op, inpart): """Used to transmit abort error over the wire""" - raise AbortFromPart(inpart.params['message'], - hint=inpart.params.get('hint')) - -@parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret', - 'in-reply-to')) + raise AbortFromPart( + inpart.params['message'], hint=inpart.params.get('hint') + ) + + +@parthandler( + 'error:pushkey', ('namespace', 'key', 'new', 'old', 'ret', 'in-reply-to') +) def handleerrorpushkey(op, inpart): """Used to transmit failure of a mandatory pushkey over the wire""" kwargs = {} @@ -2051,8 +2230,10 @@ value = inpart.params.get(name) if value is not None: kwargs[name] = value - raise error.PushkeyFailed(inpart.params['in-reply-to'], - **pycompat.strkwargs(kwargs)) + raise error.PushkeyFailed( + inpart.params['in-reply-to'], **pycompat.strkwargs(kwargs) + ) + @parthandler('error:unsupportedcontent', ('parttype', 'params')) def handleerrorunsupportedcontent(op, inpart): @@ -2067,11 +2248,13 @@ raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs)) + @parthandler('error:pushraced', ('message',)) def handleerrorpushraced(op, inpart): """Used to transmit push race error over the wire""" raise error.ResponseError(_('push failed:'), inpart.params['message']) + @parthandler('listkeys', ('namespace',)) def handlelistkeys(op, inpart): """retrieve pushkey namespace content stored in a bundle2""" @@ -2079,6 +2262,7 @@ r = pushkey.decodekeys(inpart.read()) op.records.add('listkeys', (namespace, r)) + @parthandler('pushkey', ('namespace', 'key', 'old', 'new')) def handlepushkey(op, inpart): """process a pushkey request""" @@ -2092,23 +2276,23 @@ if op.ui.configbool('experimental', 'bundle2lazylocking'): op.gettransaction() ret = op.repo.pushkey(namespace, key, old, new) - record = {'namespace': namespace, - 'key': key, - 'old': old, - 'new': new} + record = {'namespace': namespace, 'key': key, 'old': old, 'new': new} op.records.add('pushkey', record) if op.reply is not None: rpart = op.reply.newpart('reply:pushkey') rpart.addparam( - 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False) + 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False + ) rpart.addparam('return', '%i' % ret, mandatory=False) if inpart.mandatory and not ret: kwargs = {} for key in ('namespace', 'key', 'new', 'old', 'ret'): if key in inpart.params: kwargs[key] = inpart.params[key] - raise error.PushkeyFailed(partid='%d' % inpart.id, - **pycompat.strkwargs(kwargs)) + raise error.PushkeyFailed( + partid='%d' % inpart.id, **pycompat.strkwargs(kwargs) + ) + @parthandler('bookmarks') def handlebookmark(op, inpart): @@ -2147,15 +2331,18 @@ allhooks.append(hookargs) for hookargs in allhooks: - op.repo.hook('prepushkey', throw=True, - **pycompat.strkwargs(hookargs)) + op.repo.hook( + 'prepushkey', throw=True, **pycompat.strkwargs(hookargs) + ) bookstore.applychanges(op.repo, op.gettransaction(), changes) if pushkeycompat: + def runhook(): for hookargs in allhooks: op.repo.hook('pushkey', **pycompat.strkwargs(hookargs)) + op.repo._afterlock(runhook) elif bookmarksmode == 'records': @@ -2165,12 +2352,14 @@ else: raise error.ProgrammingError('unkown bookmark mode: %s' % bookmarksmode) + @parthandler('phase-heads') def handlephases(op, inpart): """apply phases from bundle part to repo""" headsbyphase = phases.binarydecode(inpart) phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase) + @parthandler('reply:pushkey', ('return', 'in-reply-to')) def handlepushkeyreply(op, inpart): """retrieve the result of a pushkey request""" @@ -2178,14 +2367,14 @@ partid = int(inpart.params['in-reply-to']) op.records.add('pushkey', {'return': ret}, partid) + @parthandler('obsmarkers') def handleobsmarker(op, inpart): """add a stream of obsmarkers to the repo""" tr = op.gettransaction() markerdata = inpart.read() if op.ui.config('experimental', 'obsmarkers-exchange-debug'): - op.ui.write(('obsmarker-exchange: %i bytes received\n') - % len(markerdata)) + op.ui.write('obsmarker-exchange: %i bytes received\n' % len(markerdata)) # The mergemarkers call will crash if marker creation is not enabled. # we want to avoid this if the part is advisory. if not inpart.mandatory and op.repo.obsstore.readonly: @@ -2197,7 +2386,8 @@ if op.reply is not None: rpart = op.reply.newpart('reply:obsmarkers') rpart.addparam( - 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False) + 'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False + ) rpart.addparam('new', '%i' % new, mandatory=False) @@ -2208,6 +2398,7 @@ partid = int(inpart.params['in-reply-to']) op.records.add('obsmarkers', {'new': ret}, partid) + @parthandler('hgtagsfnodes') def handlehgtagsfnodes(op, inpart): """Applies .hgtags fnodes cache entries to the local repo. @@ -2232,8 +2423,10 @@ cache.write() op.ui.debug('applied %i hgtags fnodes cache entries\n' % count) + rbcstruct = struct.Struct('>III') + @parthandler('cache:rev-branch-cache') def handlerbc(op, inpart): """receive a rev-branch-cache payload and update the local cache @@ -2266,6 +2459,7 @@ rawheader = inpart.read(rbcstruct.size) cache.write() + @parthandler('pushvars') def bundle2getvars(op, part): '''unbundle a bundle2 containing shellvars on the server''' @@ -2280,6 +2474,7 @@ hookargs[key] = value op.addhookargs(hookargs) + @parthandler('stream2', ('requirements', 'filecount', 'bytecount')) def handlestreamv2bundle(op, part): @@ -2293,11 +2488,12 @@ raise error.Abort(msg) repo.ui.debug('applying stream bundle\n') - streamclone.applybundlev2(repo, part, filecount, bytecount, - requirements) - -def widen_bundle(bundler, repo, oldmatcher, newmatcher, common, - known, cgversion, ellipses): + streamclone.applybundlev2(repo, part, filecount, bytecount, requirements) + + +def widen_bundle( + bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses +): """generates bundle2 for widening a narrow clone bundler is the bundle to which data should be added @@ -2318,12 +2514,20 @@ if commonnodes: # XXX: we should only send the filelogs (and treemanifest). user # already has the changelog and manifest - packer = changegroup.getbundler(cgversion, repo, - oldmatcher=oldmatcher, - matcher=newmatcher, - fullnodes=commonnodes) - cgdata = packer.generate({nodemod.nullid}, list(commonnodes), - False, 'narrow_widen', changelog=False) + packer = changegroup.getbundler( + cgversion, + repo, + oldmatcher=oldmatcher, + matcher=newmatcher, + fullnodes=commonnodes, + ) + cgdata = packer.generate( + {nodemod.nullid}, + list(commonnodes), + False, + 'narrow_widen', + changelog=False, + ) part = bundler.newpart('changegroup', data=cgdata) part.addparam('version', cgversion)