mercurial/bundle2.py
changeset 43076:2372284d9457
parent 42935:181ee2118a96
child 43077:687b865b95ad
comparison: 43075:57875cf423c9 (base) vs 43076:2372284d9457
   169     streamclone,
   169     streamclone,
   170     tags,
   170     tags,
   171     url,
   171     url,
   172     util,
   172     util,
   173 )
   173 )
   174 from .utils import (
   174 from .utils import stringutil
   175     stringutil,
       
   176 )
       
   177 
   175 
   178 urlerr = util.urlerr
   176 urlerr = util.urlerr
   179 urlreq = util.urlreq
   177 urlreq = util.urlreq
   180 
   178 
   181 _pack = struct.pack
   179 _pack = struct.pack
   190 
   188 
   191 preferedchunksize = 32768
   189 preferedchunksize = 32768
   192 
   190 
   193 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
   191 _parttypeforbidden = re.compile('[^a-zA-Z0-9_:-]')
   194 
   192 
       
   193 
   195 def outdebug(ui, message):
   194 def outdebug(ui, message):
   196     """debug regarding output stream (bundling)"""
   195     """debug regarding output stream (bundling)"""
   197     if ui.configbool('devel', 'bundle2.debug'):
   196     if ui.configbool('devel', 'bundle2.debug'):
   198         ui.debug('bundle2-output: %s\n' % message)
   197         ui.debug('bundle2-output: %s\n' % message)
   199 
   198 
       
   199 
   200 def indebug(ui, message):
   200 def indebug(ui, message):
   201     """debug on input stream (unbundling)"""
   201     """debug on input stream (unbundling)"""
   202     if ui.configbool('devel', 'bundle2.debug'):
   202     if ui.configbool('devel', 'bundle2.debug'):
   203         ui.debug('bundle2-input: %s\n' % message)
   203         ui.debug('bundle2-input: %s\n' % message)
   204 
   204 
       
   205 
   205 def validateparttype(parttype):
   206 def validateparttype(parttype):
   206     """raise ValueError if a parttype contains an invalid character"""
   207     """raise ValueError if a parttype contains an invalid character"""
   207     if _parttypeforbidden.search(parttype):
   208     if _parttypeforbidden.search(parttype):
   208         raise ValueError(parttype)
   209         raise ValueError(parttype)
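
As a quick orientation sketch (the example part types below are illustrative,
not taken from this change), the pattern above only lets letters, digits,
'_', ':' and '-' through:

    validateparttype('changegroup')    # passes: letters only
    validateparttype('check:heads')    # passes: ':' is allowed
    validateparttype('bad type!')      # raises ValueError: the space and '!'
                                       # match [^a-zA-Z0-9_:-]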
   209 
   210 
       
   211 
   210 def _makefpartparamsizes(nbparams):
   212 def _makefpartparamsizes(nbparams):
   211     """return a struct format to read part parameter sizes
   213     """return a struct format to read part parameter sizes
   212 
   214 
   213     The number of parameters is variable so we need to build that format
   215     The number of parameters is variable so we need to build that format
   214     dynamically.
   216     dynamically.
   215     """
   217     """
   216     return '>'+('BB'*nbparams)
   218     return '>' + ('BB' * nbparams)
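
A small worked example (the byte string is made up): for two parameters the
returned format reads two (key size, value size) byte pairs.

    import struct
    fmt = _makefpartparamsizes(2)             # '>BBBB'
    struct.unpack(fmt, b'\x03\x02\x04\x00')   # -> (3, 2, 4, 0)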
       
   219 
   217 
   220 
   218 parthandlermapping = {}
   221 parthandlermapping = {}
       
   222 
   219 
   223 
   220 def parthandler(parttype, params=()):
   224 def parthandler(parttype, params=()):
   221     """decorator that registers a function as a bundle2 part handler
   225     """decorator that registers a function as a bundle2 part handler
   222 
   226 
   223     eg::
   227     eg::
   226         def myparttypehandler(...):
   230         def myparttypehandler(...):
   227             '''process a part of type "my part".'''
   231             '''process a part of type "my part".'''
   228             ...
   232             ...
   229     """
   233     """
   230     validateparttype(parttype)
   234     validateparttype(parttype)
       
   235 
   231     def _decorator(func):
   236     def _decorator(func):
   232         lparttype = parttype.lower() # enforce lower case matching.
   237         lparttype = parttype.lower()  # enforce lower case matching.
   233         assert lparttype not in parthandlermapping
   238         assert lparttype not in parthandlermapping
   234         parthandlermapping[lparttype] = func
   239         parthandlermapping[lparttype] = func
   235         func.params = frozenset(params)
   240         func.params = frozenset(params)
   236         return func
   241         return func
       
   242 
   237     return _decorator
   243     return _decorator
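
A hedged usage sketch (the part type and handler here are hypothetical, not
part of this change): registration stores the function under the lower-cased
type and records its known parameter names.

    @parthandler('example:noop', ('key',))
    def handlenoop(op, inpart):
        pass

    # parthandlermapping['example:noop'] is handlenoop
    # handlenoop.params == frozenset({'key'})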
       
   244 
   238 
   245 
   239 class unbundlerecords(object):
   246 class unbundlerecords(object):
   240     """keep record of what happens during an unbundle
   247     """keep record of what happens during an unbundle
   241 
   248 
   242     New records are added using `records.add('cat', obj)`. Where 'cat' is a
   249     New records are added using `records.add('cat', obj)`. Where 'cat' is a
   280 
   287 
   281     def __nonzero__(self):
   288     def __nonzero__(self):
   282         return bool(self._sequences)
   289         return bool(self._sequences)
   283 
   290 
   284     __bool__ = __nonzero__
   291     __bool__ = __nonzero__
       
   292 
   285 
   293 
   286 class bundleoperation(object):
   294 class bundleoperation(object):
   287     """an object that represents a single bundling process
   295     """an object that represents a single bundling process
   288 
   296 
   289     Its purpose is to carry unbundle-related objects and states.
   297     Its purpose is to carry unbundle-related objects and states.
   326 
   334 
   327         return transaction
   335         return transaction
   328 
   336 
   329     def addhookargs(self, hookargs):
   337     def addhookargs(self, hookargs):
   330         if self.hookargs is None:
   338         if self.hookargs is None:
   331             raise error.ProgrammingError('attempted to add hookargs to '
   339             raise error.ProgrammingError(
   332                                          'operation after transaction started')
   340                 'attempted to add hookargs to '
       
   341                 'operation after transaction started'
       
   342             )
   333         self.hookargs.update(hookargs)
   343         self.hookargs.update(hookargs)
       
   344 
   334 
   345 
   335 class TransactionUnavailable(RuntimeError):
   346 class TransactionUnavailable(RuntimeError):
   336     pass
   347     pass
       
   348 
   337 
   349 
   338 def _notransaction():
   350 def _notransaction():
   339     """default method to get a transaction while processing a bundle
   351     """default method to get a transaction while processing a bundle
   340 
   352 
   341     Raise an exception to highlight the fact that no transaction was expected
   353     Raise an exception to highlight the fact that no transaction was expected
   342     to be created"""
   354     to be created"""
   343     raise TransactionUnavailable()
   355     raise TransactionUnavailable()
       
   356 
   344 
   357 
   345 def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
   358 def applybundle(repo, unbundler, tr, source, url=None, **kwargs):
   346     # transform me into unbundler.apply() as soon as the freeze is lifted
   359     # transform me into unbundler.apply() as soon as the freeze is lifted
   347     if isinstance(unbundler, unbundle20):
   360     if isinstance(unbundler, unbundle20):
   348         tr.hookargs['bundle2'] = '1'
   361         tr.hookargs['bundle2'] = '1'
   355         # the transactiongetter won't be used, but we might as well set it
   368         # the transactiongetter won't be used, but we might as well set it
   356         op = bundleoperation(repo, lambda: tr, source=source)
   369         op = bundleoperation(repo, lambda: tr, source=source)
   357         _processchangegroup(op, unbundler, tr, source, url, **kwargs)
   370         _processchangegroup(op, unbundler, tr, source, url, **kwargs)
   358         return op
   371         return op
   359 
   372 
       
   373 
   360 class partiterator(object):
   374 class partiterator(object):
   361     def __init__(self, repo, op, unbundler):
   375     def __init__(self, repo, op, unbundler):
   362         self.repo = repo
   376         self.repo = repo
   363         self.op = op
   377         self.op = op
   364         self.unbundler = unbundler
   378         self.unbundler = unbundler
   373                 self.count = count
   387                 self.count = count
   374                 self.current = p
   388                 self.current = p
   375                 yield p
   389                 yield p
   376                 p.consume()
   390                 p.consume()
   377                 self.current = None
   391                 self.current = None
       
   392 
   378         self.iterator = func()
   393         self.iterator = func()
   379         return self.iterator
   394         return self.iterator
   380 
   395 
   381     def __exit__(self, type, exc, tb):
   396     def __exit__(self, type, exc, tb):
   382         if not self.iterator:
   397         if not self.iterator:
   420             # Re-raising from a variable loses the original stack. So only use
   435             # Re-raising from a variable loses the original stack. So only use
   421             # that form if we need to.
   436             # that form if we need to.
   422             if seekerror:
   437             if seekerror:
   423                 raise exc
   438                 raise exc
   424 
   439 
   425         self.repo.ui.debug('bundle2-input-bundle: %i parts total\n' %
   440         self.repo.ui.debug(
   426                            self.count)
   441             'bundle2-input-bundle: %i parts total\n' % self.count
       
   442         )
       
   443 
   427 
   444 
   428 def processbundle(repo, unbundler, transactiongetter=None, op=None, source=''):
   445 def processbundle(repo, unbundler, transactiongetter=None, op=None, source=''):
   429     """This function processes a bundle, applying effects to/from a repo
   446     """This function processes a bundle, applying effects to/from a repo
   430 
   447 
   431     It iterates over each part then searches for and uses the proper handling
   448     It iterates over each part then searches for and uses the proper handling
   459 
   476 
   460     processparts(repo, op, unbundler)
   477     processparts(repo, op, unbundler)
   461 
   478 
   462     return op
   479     return op
   463 
   480 
       
   481 
   464 def processparts(repo, op, unbundler):
   482 def processparts(repo, op, unbundler):
   465     with partiterator(repo, op, unbundler) as parts:
   483     with partiterator(repo, op, unbundler) as parts:
   466         for part in parts:
   484         for part in parts:
   467             _processpart(op, part)
   485             _processpart(op, part)
   468 
   486 
       
   487 
   469 def _processchangegroup(op, cg, tr, source, url, **kwargs):
   488 def _processchangegroup(op, cg, tr, source, url, **kwargs):
   470     ret = cg.apply(op.repo, tr, source, url, **kwargs)
   489     ret = cg.apply(op.repo, tr, source, url, **kwargs)
   471     op.records.add('changegroup', {
   490     op.records.add('changegroup', {'return': ret,})
   472         'return': ret,
       
   473     })
       
   474     return ret
   491     return ret
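
For orientation (a sketch, not part of the diff): callers can later read the
result back from the records object, e.g.

    results = [r['return'] for r in op.records['changegroup']]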
   475 
   492 
       
   493 
   476 def _gethandler(op, part):
   494 def _gethandler(op, part):
   477     status = 'unknown' # used by debug output
   495     status = 'unknown'  # used by debug output
   478     try:
   496     try:
   479         handler = parthandlermapping.get(part.type)
   497         handler = parthandlermapping.get(part.type)
   480         if handler is None:
   498         if handler is None:
   481             status = 'unsupported-type'
   499             status = 'unsupported-type'
   482             raise error.BundleUnknownFeatureError(parttype=part.type)
   500             raise error.BundleUnknownFeatureError(parttype=part.type)
   484         unknownparams = part.mandatorykeys - handler.params
   502         unknownparams = part.mandatorykeys - handler.params
   485         if unknownparams:
   503         if unknownparams:
   486             unknownparams = list(unknownparams)
   504             unknownparams = list(unknownparams)
   487             unknownparams.sort()
   505             unknownparams.sort()
   488             status = 'unsupported-params (%s)' % ', '.join(unknownparams)
   506             status = 'unsupported-params (%s)' % ', '.join(unknownparams)
   489             raise error.BundleUnknownFeatureError(parttype=part.type,
   507             raise error.BundleUnknownFeatureError(
   490                                                   params=unknownparams)
   508                 parttype=part.type, params=unknownparams
       
   509             )
   491         status = 'supported'
   510         status = 'supported'
   492     except error.BundleUnknownFeatureError as exc:
   511     except error.BundleUnknownFeatureError as exc:
   493         if part.mandatory: # mandatory parts
   512         if part.mandatory:  # mandatory parts
   494             raise
   513             raise
   495         indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
   514         indebug(op.ui, 'ignoring unsupported advisory part %s' % exc)
   496         return # skip to part processing
   515         return  # skip to part processing
   497     finally:
   516     finally:
   498         if op.ui.debugflag:
   517         if op.ui.debugflag:
   499             msg = ['bundle2-input-part: "%s"' % part.type]
   518             msg = ['bundle2-input-part: "%s"' % part.type]
   500             if not part.mandatory:
   519             if not part.mandatory:
   501                 msg.append(' (advisory)')
   520                 msg.append(' (advisory)')
   511             msg.append(' %s\n' % status)
   530             msg.append(' %s\n' % status)
   512             op.ui.debug(''.join(msg))
   531             op.ui.debug(''.join(msg))
   513 
   532 
   514     return handler
   533     return handler
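
Concretely (a sketch with hypothetical part types, not from this change): an
advisory part 'example:new' with no registered handler hits the except branch
above, logs 'ignoring unsupported advisory part' and returns None, so the part
is skipped; the same part sent as mandatory ('EXAMPLE:NEW' on the wire) lets
the BundleUnknownFeatureError propagate to the caller.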
   515 
   534 
       
   535 
   516 def _processpart(op, part):
   536 def _processpart(op, part):
   517     """process a single part from a bundle
   537     """process a single part from a bundle
   518 
   538 
   519     The part is guaranteed to have been fully consumed when the function exits
   539     The part is guaranteed to have been fully consumed when the function exits
   520     (even if an exception is raised)."""
   540     (even if an exception is raised)."""
   534         handler(op, part)
   554         handler(op, part)
   535     finally:
   555     finally:
   536         if output is not None:
   556         if output is not None:
   537             output = op.ui.popbuffer()
   557             output = op.ui.popbuffer()
   538         if output:
   558         if output:
   539             outpart = op.reply.newpart('output', data=output,
   559             outpart = op.reply.newpart('output', data=output, mandatory=False)
   540                                        mandatory=False)
       
   541             outpart.addparam(
   560             outpart.addparam(
   542                 'in-reply-to', pycompat.bytestr(part.id), mandatory=False)
   561                 'in-reply-to', pycompat.bytestr(part.id), mandatory=False
       
   562             )
       
   563 
   543 
   564 
   544 def decodecaps(blob):
   565 def decodecaps(blob):
   545     """decode a bundle2 caps bytes blob into a dictionary
   566     """decode a bundle2 caps bytes blob into a dictionary
   546 
   567 
   547     The blob is a list of capabilities (one per line)
   568     The blob is a list of capabilities (one per line)
   562         key = urlreq.unquote(key)
   583         key = urlreq.unquote(key)
   563         vals = [urlreq.unquote(v) for v in vals]
   584         vals = [urlreq.unquote(v) for v in vals]
   564         caps[key] = vals
   585         caps[key] = vals
   565     return caps
   586     return caps
   566 
   587 
       
   588 
   567 def encodecaps(caps):
   589 def encodecaps(caps):
   568     """encode a bundle2 caps dictionary into a bytes blob"""
   590     """encode a bundle2 caps dictionary into a bytes blob"""
   569     chunks = []
   591     chunks = []
   570     for ca in sorted(caps):
   592     for ca in sorted(caps):
   571         vals = caps[ca]
   593         vals = caps[ca]
   574         if vals:
   596         if vals:
   575             ca = "%s=%s" % (ca, ','.join(vals))
   597             ca = "%s=%s" % (ca, ','.join(vals))
   576         chunks.append(ca)
   598         chunks.append(ca)
   577     return '\n'.join(chunks)
   599     return '\n'.join(chunks)
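
A rough round-trip illustration (the capability values are made up): the blob
is newline separated and values are comma joined.

    encodecaps({'HG20': (), 'changegroup': ('01', '02')})
    # -> 'HG20\nchangegroup=01,02'
    decodecaps('HG20\nchangegroup=01,02')
    # -> {'HG20': [], 'changegroup': ['01', '02']}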
   578 
   600 
       
   601 
   579 bundletypes = {
   602 bundletypes = {
   580     "": ("", 'UN'),       # only when using unbundle on ssh and old http servers
   603     "": ("", 'UN'),  # only when using unbundle on ssh and old http servers
   581                           # since the unification ssh accepts a header but there
   604     # since the unification ssh accepts a header but there
   582                           # is no capability signaling it.
   605     # is no capability signaling it.
   583     "HG20": (), # special-cased below
   606     "HG20": (),  # special-cased below
   584     "HG10UN": ("HG10UN", 'UN'),
   607     "HG10UN": ("HG10UN", 'UN'),
   585     "HG10BZ": ("HG10", 'BZ'),
   608     "HG10BZ": ("HG10", 'BZ'),
   586     "HG10GZ": ("HG10GZ", 'GZ'),
   609     "HG10GZ": ("HG10GZ", 'GZ'),
   587 }
   610 }
   588 
   611 
   589 # hgweb uses this list to communicate its preferred type
   612 # hgweb uses this list to communicate its preferred type
   590 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
   613 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
       
   614 
   591 
   615 
   592 class bundle20(object):
   616 class bundle20(object):
   593     """represent an outgoing bundle2 container
   617     """represent an outgoing bundle2 container
   594 
   618 
   595     Use the `addparam` method to add a stream level parameter, and `newpart` to
   619     Use the `addparam` method to add a stream level parameter, and `newpart` to
   628     def addparam(self, name, value=None):
   652     def addparam(self, name, value=None):
   629         """add a stream level parameter"""
   653         """add a stream level parameter"""
   630         if not name:
   654         if not name:
   631             raise error.ProgrammingError(b'empty parameter name')
   655             raise error.ProgrammingError(b'empty parameter name')
   632         if name[0:1] not in pycompat.bytestr(string.ascii_letters):
   656         if name[0:1] not in pycompat.bytestr(string.ascii_letters):
   633             raise error.ProgrammingError(b'non letter first character: %s'
   657             raise error.ProgrammingError(
   634                                          % name)
   658                 b'non letter first character: %s' % name
       
   659             )
   635         self._params.append((name, value))
   660         self._params.append((name, value))
   636 
   661 
   637     def addpart(self, part):
   662     def addpart(self, part):
   638         """add a new part to the bundle2 container
   663         """add a new part to the bundle2 container
   639 
   664 
   640         Parts contain the actual applicative payload."""
   665         Parts contain the actual applicative payload."""
   641         assert part.id is None
   666         assert part.id is None
   642         part.id = len(self._parts) # very cheap counter
   667         part.id = len(self._parts)  # very cheap counter
   643         self._parts.append(part)
   668         self._parts.append(part)
   644 
   669 
   645     def newpart(self, typeid, *args, **kwargs):
   670     def newpart(self, typeid, *args, **kwargs):
   646         """create a new part and add it to the containers
   671         """create a new part and add it to the containers
   647 
   672 
   668         param = self._paramchunk()
   693         param = self._paramchunk()
   669         outdebug(self.ui, 'bundle parameter: %s' % param)
   694         outdebug(self.ui, 'bundle parameter: %s' % param)
   670         yield _pack(_fstreamparamsize, len(param))
   695         yield _pack(_fstreamparamsize, len(param))
   671         if param:
   696         if param:
   672             yield param
   697             yield param
   673         for chunk in self._compengine.compressstream(self._getcorechunk(),
   698         for chunk in self._compengine.compressstream(
   674                                                      self._compopts):
   699             self._getcorechunk(), self._compopts
       
   700         ):
   675             yield chunk
   701             yield chunk
   676 
   702 
   677     def _paramchunk(self):
   703     def _paramchunk(self):
   678         """return an encoded version of all stream parameters"""
   704         """return an encoded version of all stream parameters"""
   679         blocks = []
   705         blocks = []
   695             for chunk in part.getchunks(ui=self.ui):
   721             for chunk in part.getchunks(ui=self.ui):
   696                 yield chunk
   722                 yield chunk
   697         outdebug(self.ui, 'end of bundle')
   723         outdebug(self.ui, 'end of bundle')
   698         yield _pack(_fpartheadersize, 0)
   724         yield _pack(_fpartheadersize, 0)
   699 
   725 
   700 
       
   701     def salvageoutput(self):
   726     def salvageoutput(self):
   702         """return a list with a copy of all output parts in the bundle
   727         """return a list with a copy of all output parts in the bundle
   703 
   728 
   704         This is meant to be used during error handling to make sure we preserve
   729         This is meant to be used during error handling to make sure we preserve
   705         server output"""
   730         server output"""
   734         They directly manipulate the low level stream including bundle2 level
   759         They directly manipulate the low level stream including bundle2 level
   735         instruction.
   760         instruction.
   736 
   761 
   737         Do not use it to implement higher-level logic or methods."""
   762         Do not use it to implement higher-level logic or methods."""
   738         return changegroup.readexactly(self._fp, size)
   763         return changegroup.readexactly(self._fp, size)
       
   764 
   739 
   765 
   740 def getunbundler(ui, fp, magicstring=None):
   766 def getunbundler(ui, fp, magicstring=None):
   741     """return a valid unbundler object for a given magicstring"""
   767     """return a valid unbundler object for a given magicstring"""
   742     if magicstring is None:
   768     if magicstring is None:
   743         magicstring = changegroup.readexactly(fp, 4)
   769         magicstring = changegroup.readexactly(fp, 4)
   744     magic, version = magicstring[0:2], magicstring[2:4]
   770     magic, version = magicstring[0:2], magicstring[2:4]
   745     if magic != 'HG':
   771     if magic != 'HG':
   746         ui.debug(
   772         ui.debug(
   747             "error: invalid magic: %r (version %r), should be 'HG'\n"
   773             "error: invalid magic: %r (version %r), should be 'HG'\n"
   748             % (magic, version))
   774             % (magic, version)
       
   775         )
   749         raise error.Abort(_('not a Mercurial bundle'))
   776         raise error.Abort(_('not a Mercurial bundle'))
   750     unbundlerclass = formatmap.get(version)
   777     unbundlerclass = formatmap.get(version)
   751     if unbundlerclass is None:
   778     if unbundlerclass is None:
   752         raise error.Abort(_('unknown bundle version %s') % version)
   779         raise error.Abort(_('unknown bundle version %s') % version)
   753     unbundler = unbundlerclass(ui, fp)
   780     unbundler = unbundlerclass(ui, fp)
   754     indebug(ui, 'start processing of %s stream' % magicstring)
   781     indebug(ui, 'start processing of %s stream' % magicstring)
   755     return unbundler
   782     return unbundler
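
A minimal driving sketch (io.BytesIO, 'rest' and 'ui' here are stand-ins, not
part of the change):

    import io
    fp = io.BytesIO(b'HG20' + rest)    # 'rest': stream parameters and parts
    unbundler = getunbundler(ui, fp)   # formatmap['20'] -> unbundle20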
   756 
   783 
       
   784 
   757 class unbundle20(unpackermixin):
   785 class unbundle20(unpackermixin):
   758     """interpret a bundle2 stream
   786     """interpret a bundle2 stream
   759 
   787 
   760     This class is fed with a binary stream and yields parts through its
   788     This class is fed with a binary stream and yields parts through its
   761     `iterparts` methods."""
   789     `iterparts` methods."""
   774         """dictionary of stream level parameters"""
   802         """dictionary of stream level parameters"""
   775         indebug(self.ui, 'reading bundle2 stream parameters')
   803         indebug(self.ui, 'reading bundle2 stream parameters')
   776         params = {}
   804         params = {}
   777         paramssize = self._unpack(_fstreamparamsize)[0]
   805         paramssize = self._unpack(_fstreamparamsize)[0]
   778         if paramssize < 0:
   806         if paramssize < 0:
   779             raise error.BundleValueError('negative bundle param size: %i'
   807             raise error.BundleValueError(
   780                                          % paramssize)
   808                 'negative bundle param size: %i' % paramssize
       
   809             )
   781         if paramssize:
   810         if paramssize:
   782             params = self._readexact(paramssize)
   811             params = self._readexact(paramssize)
   783             params = self._processallparams(params)
   812             params = self._processallparams(params)
   784         return params
   813         return params
   785 
   814 
   792             if len(p) < 2:
   821             if len(p) < 2:
   793                 p.append(None)
   822                 p.append(None)
   794             self._processparam(*p)
   823             self._processparam(*p)
   795             params[p[0]] = p[1]
   824             params[p[0]] = p[1]
   796         return params
   825         return params
   797 
       
   798 
   826 
   799     def _processparam(self, name, value):
   827     def _processparam(self, name, value):
   800         """process a parameter, applying its effect if needed
   828         """process a parameter, applying its effect if needed
   801 
   829 
   802         Parameters starting with a lower case letter are advisory and will be
   830         Parameters starting with a lower case letter are advisory and will be
   830         """
   858         """
   831         yield self._magicstring
   859         yield self._magicstring
   832         assert 'params' not in vars(self)
   860         assert 'params' not in vars(self)
   833         paramssize = self._unpack(_fstreamparamsize)[0]
   861         paramssize = self._unpack(_fstreamparamsize)[0]
   834         if paramssize < 0:
   862         if paramssize < 0:
   835             raise error.BundleValueError('negative bundle param size: %i'
   863             raise error.BundleValueError(
   836                                          % paramssize)
   864                 'negative bundle param size: %i' % paramssize
       
   865             )
   837         if paramssize:
   866         if paramssize:
   838             params = self._readexact(paramssize)
   867             params = self._readexact(paramssize)
   839             self._processallparams(params)
   868             self._processallparams(params)
   840             # The payload itself is decompressed below, so drop
   869             # The payload itself is decompressed below, so drop
   841             # the compression parameter passed down to compensate.
   870             # the compression parameter passed down to compensate.
   866                 continue
   895                 continue
   867             elif size < 0:
   896             elif size < 0:
   868                 raise error.BundleValueError('negative chunk size: %i')
   897                 raise error.BundleValueError('negative chunk size: %i')
   869             yield self._readexact(size)
   898             yield self._readexact(size)
   870 
   899 
   871 
       
   872     def iterparts(self, seekable=False):
   900     def iterparts(self, seekable=False):
   873         """yield all parts contained in the stream"""
   901         """yield all parts contained in the stream"""
   874         cls = seekableunbundlepart if seekable else unbundlepart
   902         cls = seekableunbundlepart if seekable else unbundlepart
   875         # make sure param have been loaded
   903         # make sure param have been loaded
   876         self.params
   904         self.params
   892         """reads a part header size and return the bytes blob
   920         """reads a part header size and return the bytes blob
   893 
   921 
   894         returns None if empty"""
   922         returns None if empty"""
   895         headersize = self._unpack(_fpartheadersize)[0]
   923         headersize = self._unpack(_fpartheadersize)[0]
   896         if headersize < 0:
   924         if headersize < 0:
   897             raise error.BundleValueError('negative part header size: %i'
   925             raise error.BundleValueError(
   898                                          % headersize)
   926                 'negative part header size: %i' % headersize
       
   927             )
   899         indebug(self.ui, 'part header size: %i' % headersize)
   928         indebug(self.ui, 'part header size: %i' % headersize)
   900         if headersize:
   929         if headersize:
   901             return self._readexact(headersize)
   930             return self._readexact(headersize)
   902         return None
   931         return None
   903 
   932 
   904     def compressed(self):
   933     def compressed(self):
   905         self.params # load params
   934         self.params  # load params
   906         return self._compressed
   935         return self._compressed
   907 
   936 
   908     def close(self):
   937     def close(self):
   909         """close underlying file"""
   938         """close underlying file"""
   910         if util.safehasattr(self._fp, 'close'):
   939         if util.safehasattr(self._fp, 'close'):
   911             return self._fp.close()
   940             return self._fp.close()
   912 
   941 
       
   942 
   913 formatmap = {'20': unbundle20}
   943 formatmap = {'20': unbundle20}
   914 
   944 
   915 b2streamparamsmap = {}
   945 b2streamparamsmap = {}
       
   946 
   916 
   947 
   917 def b2streamparamhandler(name):
   948 def b2streamparamhandler(name):
   918     """register a handler for a stream level parameter"""
   949     """register a handler for a stream level parameter"""
       
   950 
   919     def decorator(func):
   951     def decorator(func):
   920         assert name not in formatmap
   952         assert name not in formatmap
   921         b2streamparamsmap[name] = func
   953         b2streamparamsmap[name] = func
   922         return func
   954         return func
       
   955 
   923     return decorator
   956     return decorator
       
   957 
   924 
   958 
   925 @b2streamparamhandler('compression')
   959 @b2streamparamhandler('compression')
   926 def processcompression(unbundler, param, value):
   960 def processcompression(unbundler, param, value):
   927     """read compression parameter and install payload decompression"""
   961     """read compression parameter and install payload decompression"""
   928     if value not in util.compengines.supportedbundletypes:
   962     if value not in util.compengines.supportedbundletypes:
   929         raise error.BundleUnknownFeatureError(params=(param,),
   963         raise error.BundleUnknownFeatureError(params=(param,), values=(value,))
   930                                               values=(value,))
       
   931     unbundler._compengine = util.compengines.forbundletype(value)
   964     unbundler._compengine = util.compengines.forbundletype(value)
   932     if value is not None:
   965     if value is not None:
   933         unbundler._compressed = True
   966         unbundler._compressed = True
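
For orientation (a sketch; the 'Compression=BZ' example is an assumption about
the wire format, not shown in this diff): after the decorator runs,
b2streamparamsmap['compression'] is processcompression, so an incoming HG20
stream parameter such as 'Compression=BZ' is routed here and installs
util.compengines.forbundletype('BZ'); the capital first letter marks the
parameter mandatory.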
   934 
   967 
       
   968 
   935 class bundlepart(object):
   969 class bundlepart(object):
   936     """A bundle2 part contains application level payload
   970     """A bundle2 part contains application level payload
   937 
   971 
   938     The part `type` is used to route the part to the application level
   972     The part `type` is used to route the part to the application level
   939     handler.
   973     handler.
   946     should be able to safely ignore the advisory ones.
   980     should be able to safely ignore the advisory ones.
   947 
   981 
   948     Both data and parameters cannot be modified after the generation has begun.
   982     Both data and parameters cannot be modified after the generation has begun.
   949     """
   983     """
   950 
   984 
   951     def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
   985     def __init__(
   952                  data='', mandatory=True):
   986         self,
       
   987         parttype,
       
   988         mandatoryparams=(),
       
   989         advisoryparams=(),
       
   990         data='',
       
   991         mandatory=True,
       
   992     ):
   953         validateparttype(parttype)
   993         validateparttype(parttype)
   954         self.id = None
   994         self.id = None
   955         self.type = parttype
   995         self.type = parttype
   956         self._data = data
   996         self._data = data
   957         self._mandatoryparams = list(mandatoryparams)
   997         self._mandatoryparams = list(mandatoryparams)
   969         self._generated = None
  1009         self._generated = None
   970         self.mandatory = mandatory
  1010         self.mandatory = mandatory
   971 
  1011 
   972     def __repr__(self):
  1012     def __repr__(self):
   973         cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
  1013         cls = "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
   974         return ('<%s object at %x; id: %s; type: %s; mandatory: %s>'
  1014         return '<%s object at %x; id: %s; type: %s; mandatory: %s>' % (
   975                 % (cls, id(self), self.id, self.type, self.mandatory))
  1015             cls,
       
  1016             id(self),
       
  1017             self.id,
       
  1018             self.type,
       
  1019             self.mandatory,
       
  1020         )
   976 
  1021 
   977     def copy(self):
  1022     def copy(self):
   978         """return a copy of the part
  1023         """return a copy of the part
   979 
  1024 
   980         The new part has the very same content but no partid assigned yet.
  1025         The new part has the very same content but no partid assigned yet.
   981         Parts with generated data cannot be copied."""
  1026         Parts with generated data cannot be copied."""
   982         assert not util.safehasattr(self.data, 'next')
  1027         assert not util.safehasattr(self.data, 'next')
   983         return self.__class__(self.type, self._mandatoryparams,
  1028         return self.__class__(
   984                               self._advisoryparams, self._data, self.mandatory)
  1029             self.type,
       
  1030             self._mandatoryparams,
       
  1031             self._advisoryparams,
       
  1032             self._data,
       
  1033             self.mandatory,
       
  1034         )
   985 
  1035 
   986     # methods used to define the part content
  1036     # methods used to define the part content
   987     @property
  1037     @property
   988     def data(self):
  1038     def data(self):
   989         return self._data
  1039         return self._data
  1041                 if nbap:
  1091                 if nbap:
  1042                     msg.append(' %i advisory' % nbmp)
  1092                     msg.append(' %i advisory' % nbmp)
  1043                 msg.append(')')
  1093                 msg.append(')')
  1044             if not self.data:
  1094             if not self.data:
  1045                 msg.append(' empty payload')
  1095                 msg.append(' empty payload')
  1046             elif (util.safehasattr(self.data, 'next')
  1096             elif util.safehasattr(self.data, 'next') or util.safehasattr(
  1047                   or util.safehasattr(self.data, '__next__')):
  1097                 self.data, '__next__'
       
  1098             ):
  1048                 msg.append(' streamed payload')
  1099                 msg.append(' streamed payload')
  1049             else:
  1100             else:
  1050                 msg.append(' %i bytes payload' % len(self.data))
  1101                 msg.append(' %i bytes payload' % len(self.data))
  1051             msg.append('\n')
  1102             msg.append('\n')
  1052             ui.debug(''.join(msg))
  1103             ui.debug(''.join(msg))
  1056             parttype = self.type.upper()
  1107             parttype = self.type.upper()
  1057         else:
  1108         else:
  1058             parttype = self.type.lower()
  1109             parttype = self.type.lower()
  1059         outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
  1110         outdebug(ui, 'part %s: "%s"' % (pycompat.bytestr(self.id), parttype))
  1060         ## parttype
  1111         ## parttype
  1061         header = [_pack(_fparttypesize, len(parttype)),
  1112         header = [
  1062                   parttype, _pack(_fpartid, self.id),
  1113             _pack(_fparttypesize, len(parttype)),
  1063                  ]
  1114             parttype,
       
  1115             _pack(_fpartid, self.id),
       
  1116         ]
  1064         ## parameters
  1117         ## parameters
  1065         # count
  1118         # count
  1066         manpar = self.mandatoryparams
  1119         manpar = self.mandatoryparams
  1067         advpar = self.advisoryparams
  1120         advpar = self.advisoryparams
  1068         header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
  1121         header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
  1085             header.append(value)
  1138             header.append(value)
  1086         ## finalize header
  1139         ## finalize header
  1087         try:
  1140         try:
  1088             headerchunk = ''.join(header)
  1141             headerchunk = ''.join(header)
  1089         except TypeError:
  1142         except TypeError:
  1090             raise TypeError(r'Found a non-bytes trying to '
  1143             raise TypeError(
  1091                             r'build bundle part header: %r' % header)
  1144                 r'Found a non-bytes trying to '
       
  1145                 r'build bundle part header: %r' % header
       
  1146             )
  1092         outdebug(ui, 'header chunk size: %i' % len(headerchunk))
  1147         outdebug(ui, 'header chunk size: %i' % len(headerchunk))
  1093         yield _pack(_fpartheadersize, len(headerchunk))
  1148         yield _pack(_fpartheadersize, len(headerchunk))
  1094         yield headerchunk
  1149         yield headerchunk
  1095         ## payload
  1150         ## payload
  1096         try:
  1151         try:
  1105             ui.debug('bundle2-generatorexit\n')
  1160             ui.debug('bundle2-generatorexit\n')
  1106             raise
  1161             raise
  1107         except BaseException as exc:
  1162         except BaseException as exc:
  1108             bexc = stringutil.forcebytestr(exc)
  1163             bexc = stringutil.forcebytestr(exc)
  1109             # backup exception data for later
  1164             # backup exception data for later
  1110             ui.debug('bundle2-input-stream-interrupt: encoding exception %s'
  1165             ui.debug(
  1111                      % bexc)
  1166                 'bundle2-input-stream-interrupt: encoding exception %s' % bexc
       
  1167             )
  1112             tb = sys.exc_info()[2]
  1168             tb = sys.exc_info()[2]
  1113             msg = 'unexpected error: %s' % bexc
  1169             msg = 'unexpected error: %s' % bexc
  1114             interpart = bundlepart('error:abort', [('message', msg)],
  1170             interpart = bundlepart(
  1115                                    mandatory=False)
  1171                 'error:abort', [('message', msg)], mandatory=False
       
  1172             )
  1116             interpart.id = 0
  1173             interpart.id = 0
  1117             yield _pack(_fpayloadsize, -1)
  1174             yield _pack(_fpayloadsize, -1)
  1118             for chunk in interpart.getchunks(ui=ui):
  1175             for chunk in interpart.getchunks(ui=ui):
  1119                 yield chunk
  1176                 yield chunk
  1120             outdebug(ui, 'closing payload chunk')
  1177             outdebug(ui, 'closing payload chunk')
  1130         """yield chunks of the part payload
  1187         """yield chunks of the part payload
  1131 
  1188 
  1132         Exists to handle the different methods to provide data to a part."""
  1189         Exists to handle the different methods to provide data to a part."""
  1133         # we only support fixed size data now.
  1190         # we only support fixed size data now.
  1134         # This will be improved in the future.
  1191         # This will be improved in the future.
  1135         if (util.safehasattr(self.data, 'next')
  1192         if util.safehasattr(self.data, 'next') or util.safehasattr(
  1136             or util.safehasattr(self.data, '__next__')):
  1193             self.data, '__next__'
       
  1194         ):
  1137             buff = util.chunkbuffer(self.data)
  1195             buff = util.chunkbuffer(self.data)
  1138             chunk = buff.read(preferedchunksize)
  1196             chunk = buff.read(preferedchunksize)
  1139             while chunk:
  1197             while chunk:
  1140                 yield chunk
  1198                 yield chunk
  1141                 chunk = buff.read(preferedchunksize)
  1199                 chunk = buff.read(preferedchunksize)
  1143             yield self.data
  1201             yield self.data
  1144 
  1202 
  1145 
  1203 
  1146 flaginterrupt = -1
  1204 flaginterrupt = -1
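
A rough picture of the framing this flag participates in (the '>i' size format
is an assumption; the real format constant is defined outside this excerpt):

    _pack('>i', len(chunk)) + chunk   # an ordinary payload chunk
    _pack('>i', -1)                   # flaginterrupt: an out-of-band part follows
    _pack('>i', 0)                    # a zero size ends the part payload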
  1147 
  1205 
       
  1206 
  1148 class interrupthandler(unpackermixin):
  1207 class interrupthandler(unpackermixin):
  1149     """read one part and process it with restricted capability
  1208     """read one part and process it with restricted capability
  1150 
  1209 
  1151     This allows transmitting exceptions raised on the producer side during part
  1210     This allows transmitting exceptions raised on the producer side during part
  1152     iteration while the consumer is reading a part.
  1211     iteration while the consumer is reading a part.
  1161         """reads a part header size and return the bytes blob
  1220         """reads a part header size and return the bytes blob
  1162 
  1221 
  1163         returns None if empty"""
  1222         returns None if empty"""
  1164         headersize = self._unpack(_fpartheadersize)[0]
  1223         headersize = self._unpack(_fpartheadersize)[0]
  1165         if headersize < 0:
  1224         if headersize < 0:
  1166             raise error.BundleValueError('negative part header size: %i'
  1225             raise error.BundleValueError(
  1167                                          % headersize)
  1226                 'negative part header size: %i' % headersize
       
  1227             )
  1168         indebug(self.ui, 'part header size: %i\n' % headersize)
  1228         indebug(self.ui, 'part header size: %i\n' % headersize)
  1169         if headersize:
  1229         if headersize:
  1170             return self._readexact(headersize)
  1230             return self._readexact(headersize)
  1171         return None
  1231         return None
  1172 
  1232 
  1173     def __call__(self):
  1233     def __call__(self):
  1174 
  1234 
  1175         self.ui.debug('bundle2-input-stream-interrupt:'
  1235         self.ui.debug(
  1176                       ' opening out of band context\n')
  1236             'bundle2-input-stream-interrupt:' ' opening out of band context\n'
       
  1237         )
  1177         indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
  1238         indebug(self.ui, 'bundle2 stream interruption, looking for a part.')
  1178         headerblock = self._readpartheader()
  1239         headerblock = self._readpartheader()
  1179         if headerblock is None:
  1240         if headerblock is None:
  1180             indebug(self.ui, 'no part found during interruption.')
  1241             indebug(self.ui, 'no part found during interruption.')
  1181             return
  1242             return
  1188             hardabort = True
  1249             hardabort = True
  1189             raise
  1250             raise
  1190         finally:
  1251         finally:
  1191             if not hardabort:
  1252             if not hardabort:
  1192                 part.consume()
  1253                 part.consume()
  1193         self.ui.debug('bundle2-input-stream-interrupt:'
  1254         self.ui.debug(
  1194                       ' closing out of band context\n')
  1255             'bundle2-input-stream-interrupt:' ' closing out of band context\n'
       
  1256         )
       
  1257 
  1195 
  1258 
  1196 class interruptoperation(object):
  1259 class interruptoperation(object):
  1197     """A limited operation to be used by part handlers during interruption
  1260     """A limited operation to be used by part handlers during interruption
  1198 
  1261 
  1199     It only has access to a ui object.
  1262     It only has access to a ui object.
  1208     def repo(self):
  1271     def repo(self):
  1209         raise error.ProgrammingError('no repo access from stream interruption')
  1272         raise error.ProgrammingError('no repo access from stream interruption')
  1210 
  1273 
  1211     def gettransaction(self):
  1274     def gettransaction(self):
  1212         raise TransactionUnavailable('no repo access from stream interruption')
  1275         raise TransactionUnavailable('no repo access from stream interruption')
       
  1276 
  1213 
  1277 
  1214 def decodepayloadchunks(ui, fh):
  1278 def decodepayloadchunks(ui, fh):
  1215     """Reads bundle2 part payload data into chunks.
  1279     """Reads bundle2 part payload data into chunks.
  1216 
  1280 
  1217     Part payload data consists of framed chunks. This function takes
  1281     Part payload data consists of framed chunks. This function takes
  1233     # changegroup.readexactly() is inlined below for performance.
  1297     # changegroup.readexactly() is inlined below for performance.
  1234     while chunksize:
  1298     while chunksize:
  1235         if chunksize >= 0:
  1299         if chunksize >= 0:
  1236             s = read(chunksize)
  1300             s = read(chunksize)
  1237             if len(s) < chunksize:
  1301             if len(s) < chunksize:
  1238                 raise error.Abort(_('stream ended unexpectedly '
  1302                 raise error.Abort(
  1239                                     ' (got %d bytes, expected %d)') %
  1303                     _(
  1240                                   (len(s), chunksize))
  1304                         'stream ended unexpectedly '
       
  1305                         ' (got %d bytes, expected %d)'
       
  1306                     )
       
  1307                     % (len(s), chunksize)
       
  1308                 )
  1241 
  1309 
  1242             yield s
  1310             yield s
  1243         elif chunksize == flaginterrupt:
  1311         elif chunksize == flaginterrupt:
  1244             # Interrupt "signal" detected. The regular stream is interrupted
  1312             # Interrupt "signal" detected. The regular stream is interrupted
  1245             # and a bundle2 part follows. Consume it.
  1313             # and a bundle2 part follows. Consume it.
  1246             interrupthandler(ui, fh)()
  1314             interrupthandler(ui, fh)()
  1247         else:
  1315         else:
  1248             raise error.BundleValueError(
  1316             raise error.BundleValueError(
  1249                 'negative payload chunk size: %s' % chunksize)
  1317                 'negative payload chunk size: %s' % chunksize
       
  1318             )
  1250 
  1319 
  1251         s = read(headersize)
  1320         s = read(headersize)
  1252         if len(s) < headersize:
  1321         if len(s) < headersize:
  1253             raise error.Abort(_('stream ended unexpectedly '
  1322             raise error.Abort(
  1254                                 ' (got %d bytes, expected %d)') %
  1323                 _('stream ended unexpectedly ' ' (got %d bytes, expected %d)')
  1255                               (len(s), chunksize))
  1324                 % (len(s), chunksize)
       
  1325             )
  1256 
  1326 
  1257         chunksize = unpack(s)[0]
  1327         chunksize = unpack(s)[0]
  1258 
  1328 
  1259         # indebug() inlined for performance.
  1329         # indebug() inlined for performance.
  1260         if dolog:
  1330         if dolog:
  1261             debug('bundle2-input: payload chunk size: %i\n' % chunksize)
  1331             debug('bundle2-input: payload chunk size: %i\n' % chunksize)
  1262 
  1332 
       
  1333 
  1263 class unbundlepart(unpackermixin):
  1334 class unbundlepart(unpackermixin):
  1264     """a bundle part read from a bundle"""
  1335     """a bundle part read from a bundle"""
  1265 
  1336 
  1266     def __init__(self, ui, header, fp):
  1337     def __init__(self, ui, header, fp):
  1267         super(unbundlepart, self).__init__(fp)
  1338         super(unbundlepart, self).__init__(fp)
  1268         self._seekable = (util.safehasattr(fp, 'seek') and
  1339         self._seekable = util.safehasattr(fp, 'seek') and util.safehasattr(
  1269                           util.safehasattr(fp, 'tell'))
  1340             fp, 'tell'
       
  1341         )
  1270         self.ui = ui
  1342         self.ui = ui
  1271         # unbundle state attr
  1343         # unbundle state attr
  1272         self._headerdata = header
  1344         self._headerdata = header
  1273         self._headeroffset = 0
  1345         self._headeroffset = 0
  1274         self._initialized = False
  1346         self._initialized = False
  1285         self._pos = 0
  1357         self._pos = 0
  1286 
  1358 
  1287     def _fromheader(self, size):
  1359     def _fromheader(self, size):
  1288         """return the next <size> byte from the header"""
  1360         """return the next <size> byte from the header"""
  1289         offset = self._headeroffset
  1361         offset = self._headeroffset
  1290         data = self._headerdata[offset:(offset + size)]
  1362         data = self._headerdata[offset : (offset + size)]
  1291         self._headeroffset = offset + size
  1363         self._headeroffset = offset + size
  1292         return data
  1364         return data
  1293 
  1365 
  1294     def _unpackheader(self, format):
  1366     def _unpackheader(self, format):
  1295         """read given format from header
  1367         """read given format from header
  1300 
  1372 
  1301     def _initparams(self, mandatoryparams, advisoryparams):
  1373     def _initparams(self, mandatoryparams, advisoryparams):
  1302         """internal function to setup all logic related parameters"""
  1374         """internal function to setup all logic related parameters"""
  1303         # make it read only to prevent people touching it by mistake.
  1375         # make it read only to prevent people touching it by mistake.
  1304         self.mandatoryparams = tuple(mandatoryparams)
  1376         self.mandatoryparams = tuple(mandatoryparams)
  1305         self.advisoryparams  = tuple(advisoryparams)
  1377         self.advisoryparams = tuple(advisoryparams)
  1306         # user friendly UI
  1378         # user friendly UI
  1307         self.params = util.sortdict(self.mandatoryparams)
  1379         self.params = util.sortdict(self.mandatoryparams)
  1308         self.params.update(self.advisoryparams)
  1380         self.params.update(self.advisoryparams)
  1309         self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
  1381         self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
  1310 
  1382 
  1314         self.type = self._fromheader(typesize)
  1386         self.type = self._fromheader(typesize)
  1315         indebug(self.ui, 'part type: "%s"' % self.type)
  1387         indebug(self.ui, 'part type: "%s"' % self.type)
  1316         self.id = self._unpackheader(_fpartid)[0]
  1388         self.id = self._unpackheader(_fpartid)[0]
  1317         indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
  1389         indebug(self.ui, 'part id: "%s"' % pycompat.bytestr(self.id))
  1318         # extract mandatory bit from type
  1390         # extract mandatory bit from type
  1319         self.mandatory = (self.type != self.type.lower())
  1391         self.mandatory = self.type != self.type.lower()
  1320         self.type = self.type.lower()
  1392         self.type = self.type.lower()
  1321         ## reading parameters
  1393         ## reading parameters
  1322         # param count
  1394         # param count
  1323         mancount, advcount = self._unpackheader(_fpartparamcount)
  1395         mancount, advcount = self._unpackheader(_fpartparamcount)
  1324         indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
  1396         indebug(self.ui, 'part parameters: %i' % (mancount + advcount))
  1370         else:
  1442         else:
  1371             data = self._payloadstream.read(size)
  1443             data = self._payloadstream.read(size)
  1372         self._pos += len(data)
  1444         self._pos += len(data)
  1373         if size is None or len(data) < size:
  1445         if size is None or len(data) < size:
  1374             if not self.consumed and self._pos:
  1446             if not self.consumed and self._pos:
  1375                 self.ui.debug('bundle2-input-part: total payload size %i\n'
  1447                 self.ui.debug(
  1376                               % self._pos)
  1448                     'bundle2-input-part: total payload size %i\n' % self._pos
       
  1449                 )
  1377             self.consumed = True
  1450             self.consumed = True
  1378         return data
  1451         return data
       
  1452 
  1379 
  1453 
  1380 class seekableunbundlepart(unbundlepart):
  1454 class seekableunbundlepart(unbundlepart):
  1381     """A bundle2 part in a bundle that is seekable.
  1455     """A bundle2 part in a bundle that is seekable.
  1382 
  1456 
  1383     Regular ``unbundlepart`` instances can only be read once. This class
  1457     Regular ``unbundlepart`` instances can only be read once. This class
  1392     class maintains a mapping between offsets in the underlying stream and
  1466     class maintains a mapping between offsets in the underlying stream and
  1393     the decoded payload. This mapping will consume memory in proportion
  1467     the decoded payload. This mapping will consume memory in proportion
  1394     to the number of chunks within the payload (which almost certainly
  1468     to the number of chunks within the payload (which almost certainly
  1395     increases in proportion with the size of the part).
  1469     increases in proportion with the size of the part).
  1396     """
  1470     """
       
  1471 
  1397     def __init__(self, ui, header, fp):
  1472     def __init__(self, ui, header, fp):
  1398         # (payload, file) offsets for chunk starts.
  1473         # (payload, file) offsets for chunk starts.
  1399         self._chunkindex = []
  1474         self._chunkindex = []
  1400 
  1475 
  1401         super(seekableunbundlepart, self).__init__(ui, header, fp)
  1476         super(seekableunbundlepart, self).__init__(ui, header, fp)
  1405         if len(self._chunkindex) == 0:
  1480         if len(self._chunkindex) == 0:
  1406             assert chunknum == 0, 'Must start with chunk 0'
  1481             assert chunknum == 0, 'Must start with chunk 0'
  1407             self._chunkindex.append((0, self._tellfp()))
  1482             self._chunkindex.append((0, self._tellfp()))
  1408         else:
  1483         else:
  1409             assert chunknum < len(self._chunkindex), (
  1484             assert chunknum < len(self._chunkindex), (
  1410                    'Unknown chunk %d' % chunknum)
  1485                 'Unknown chunk %d' % chunknum
       
  1486             )
  1411             self._seekfp(self._chunkindex[chunknum][1])
  1487             self._seekfp(self._chunkindex[chunknum][1])
  1412 
  1488 
  1413         pos = self._chunkindex[chunknum][0]
  1489         pos = self._chunkindex[chunknum][0]
  1414 
  1490 
  1415         for chunk in decodepayloadchunks(self.ui, self._fp):
  1491         for chunk in decodepayloadchunks(self.ui, self._fp):
  1493                     self._seekable = False
  1569                     self._seekable = False
  1494                 else:
  1570                 else:
  1495                     raise
  1571                     raise
  1496         return None
  1572         return None
  1497 
  1573 
       
  1574 
  1498 # These are only the static capabilities.
  1575 # These are only the static capabilities.
  1499 # Check the 'getrepocaps' function for the rest.
  1576 # Check the 'getrepocaps' function for the rest.
  1500 capabilities = {'HG20': (),
  1577 capabilities = {
  1501                 'bookmarks': (),
  1578     'HG20': (),
  1502                 'error': ('abort', 'unsupportedcontent', 'pushraced',
  1579     'bookmarks': (),
  1503                           'pushkey'),
  1580     'error': ('abort', 'unsupportedcontent', 'pushraced', 'pushkey'),
  1504                 'listkeys': (),
  1581     'listkeys': (),
  1505                 'pushkey': (),
  1582     'pushkey': (),
  1506                 'digests': tuple(sorted(util.DIGESTS.keys())),
  1583     'digests': tuple(sorted(util.DIGESTS.keys())),
  1507                 'remote-changegroup': ('http', 'https'),
  1584     'remote-changegroup': ('http', 'https'),
  1508                 'hgtagsfnodes': (),
  1585     'hgtagsfnodes': (),
  1509                 'rev-branch-cache': (),
  1586     'rev-branch-cache': (),
  1510                 'phases': ('heads',),
  1587     'phases': ('heads',),
  1511                 'stream': ('v2',),
  1588     'stream': ('v2',),
  1512                }
  1589 }
       
  1590 
  1513 
  1591 
  1514 def getrepocaps(repo, allowpushback=False, role=None):
  1592 def getrepocaps(repo, allowpushback=False, role=None):
  1515     """return the bundle2 capabilities for a given repo
  1593     """return the bundle2 capabilities for a given repo
  1516 
  1594 
  1517     Exists to allow extensions (like evolution) to mutate the capabilities.
  1595     Exists to allow extensions (like evolution) to mutate the capabilities.
  1522     """
  1600     """
  1523     if role not in ('client', 'server'):
  1601     if role not in ('client', 'server'):
  1524         raise error.ProgrammingError('role argument must be client or server')
  1602         raise error.ProgrammingError('role argument must be client or server')
  1525 
  1603 
  1526     caps = capabilities.copy()
  1604     caps = capabilities.copy()
  1527     caps['changegroup'] = tuple(sorted(
  1605     caps['changegroup'] = tuple(
  1528         changegroup.supportedincomingversions(repo)))
  1606         sorted(changegroup.supportedincomingversions(repo))
       
  1607     )
  1529     if obsolete.isenabled(repo, obsolete.exchangeopt):
  1608     if obsolete.isenabled(repo, obsolete.exchangeopt):
  1530         supportedformat = tuple('V%i' % v for v in obsolete.formats)
  1609         supportedformat = tuple('V%i' % v for v in obsolete.formats)
  1531         caps['obsmarkers'] = supportedformat
  1610         caps['obsmarkers'] = supportedformat
  1532     if allowpushback:
  1611     if allowpushback:
  1533         caps['pushback'] = ()
  1612         caps['pushback'] = ()
  1537     if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
  1616     if 'phases' in repo.ui.configlist('devel', 'legacy.exchange'):
  1538         caps.pop('phases')
  1617         caps.pop('phases')
  1539 
  1618 
  1540     # Don't advertise stream clone support in server mode if not configured.
  1619     # Don't advertise stream clone support in server mode if not configured.
  1541     if role == 'server':
  1620     if role == 'server':
  1542         streamsupported = repo.ui.configbool('server', 'uncompressed',
  1621         streamsupported = repo.ui.configbool(
  1543                                              untrusted=True)
  1622             'server', 'uncompressed', untrusted=True
       
  1623         )
  1544         featuresupported = repo.ui.configbool('server', 'bundle2.stream')
  1624         featuresupported = repo.ui.configbool('server', 'bundle2.stream')
  1545 
  1625 
  1546         if not streamsupported or not featuresupported:
  1626         if not streamsupported or not featuresupported:
  1547             caps.pop('stream')
  1627             caps.pop('stream')
  1548     # Else always advertise support on client, because payload support
  1628     # Else always advertise support on client, because payload support
  1549     # should always be advertised.
  1629     # should always be advertised.
  1550 
  1630 
  1551     return caps
  1631     return caps
       
  1632 
  1552 
  1633 
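A minimal usage sketch for the capability negotiation above (illustrative; assumes an existing `repo` object, and string literals are bytes throughout Mercurial's code base, shown unprefixed here to match the surrounding source):

    # role is mandatory; 'pushback' is only advertised when the caller
    # explicitly allows it, and 'stream' may be dropped on the server side
    # depending on the 'server.uncompressed' / 'server.bundle2.stream' knobs.
    clientcaps = getrepocaps(repo, role='client')
    servercaps = getrepocaps(repo, allowpushback=True, role='server')
    assert 'pushback' in servercaps and 'pushback' not in clientcaps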
  1553 def bundle2caps(remote):
  1634 def bundle2caps(remote):
  1554     """return the bundle capabilities of a peer as dict"""
  1635     """return the bundle capabilities of a peer as dict"""
  1555     raw = remote.capable('bundle2')
  1636     raw = remote.capable('bundle2')
  1556     if not raw and raw != '':
  1637     if not raw and raw != '':
  1557         return {}
  1638         return {}
  1558     capsblob = urlreq.unquote(remote.capable('bundle2'))
  1639     capsblob = urlreq.unquote(remote.capable('bundle2'))
  1559     return decodecaps(capsblob)
  1640     return decodecaps(capsblob)
  1560 
  1641 
       
  1642 
  1561 def obsmarkersversion(caps):
  1643 def obsmarkersversion(caps):
  1562     """extract the list of supported obsmarkers versions from a bundle2caps dict
  1644     """extract the list of supported obsmarkers versions from a bundle2caps dict
  1563     """
  1645     """
  1564     obscaps = caps.get('obsmarkers', ())
  1646     obscaps = caps.get('obsmarkers', ())
  1565     return [int(c[1:]) for c in obscaps if c.startswith('V')]
  1647     return [int(c[1:]) for c in obscaps if c.startswith('V')]
  1566 
  1648 
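A worked example of the helper above (illustrative; values are shown as native strings although the real code traffics in bytes):

    # A caps dict advertising obsmarkers formats V0 and V1 decodes to the
    # integer versions the peer understands; absence decodes to an empty list.
    assert obsmarkersversion({'obsmarkers': ('V0', 'V1')}) == [0, 1]
    assert obsmarkersversion({}) == []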
  1567 def writenewbundle(ui, repo, source, filename, bundletype, outgoing, opts,
  1649 
  1568                    vfs=None, compression=None, compopts=None):
  1650 def writenewbundle(
       
  1651     ui,
       
  1652     repo,
       
  1653     source,
       
  1654     filename,
       
  1655     bundletype,
       
  1656     outgoing,
       
  1657     opts,
       
  1658     vfs=None,
       
  1659     compression=None,
       
  1660     compopts=None,
       
  1661 ):
  1569     if bundletype.startswith('HG10'):
  1662     if bundletype.startswith('HG10'):
  1570         cg = changegroup.makechangegroup(repo, outgoing, '01', source)
  1663         cg = changegroup.makechangegroup(repo, outgoing, '01', source)
  1571         return writebundle(ui, cg, filename, bundletype, vfs=vfs,
  1664         return writebundle(
  1572                            compression=compression, compopts=compopts)
  1665             ui,
       
  1666             cg,
       
  1667             filename,
       
  1668             bundletype,
       
  1669             vfs=vfs,
       
  1670             compression=compression,
       
  1671             compopts=compopts,
       
  1672         )
  1573     elif not bundletype.startswith('HG20'):
  1673     elif not bundletype.startswith('HG20'):
  1574         raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
  1674         raise error.ProgrammingError('unknown bundle type: %s' % bundletype)
  1575 
  1675 
  1576     caps = {}
  1676     caps = {}
  1577     if 'obsolescence' in opts:
  1677     if 'obsolescence' in opts:
  1580     bundle.setcompression(compression, compopts)
  1680     bundle.setcompression(compression, compopts)
  1581     _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
  1681     _addpartsfromopts(ui, repo, bundle, source, outgoing, opts)
  1582     chunkiter = bundle.getchunks()
  1682     chunkiter = bundle.getchunks()
  1583 
  1683 
  1584     return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
  1684     return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
       
  1685 
  1585 
  1686 
  1586 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
  1687 def _addpartsfromopts(ui, repo, bundler, source, outgoing, opts):
  1587     # We should eventually reconcile this logic with the one behind
  1688     # We should eventually reconcile this logic with the one behind
  1588     # 'exchange.getbundle2partsgenerator'.
  1689     # 'exchange.getbundle2partsgenerator'.
  1589     #
  1690     #
  1599             cgversion = changegroup.safeversion(repo)
  1700             cgversion = changegroup.safeversion(repo)
  1600         cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
  1701         cg = changegroup.makechangegroup(repo, outgoing, cgversion, source)
  1601         part = bundler.newpart('changegroup', data=cg.getchunks())
  1702         part = bundler.newpart('changegroup', data=cg.getchunks())
  1602         part.addparam('version', cg.version)
  1703         part.addparam('version', cg.version)
  1603         if 'clcount' in cg.extras:
  1704         if 'clcount' in cg.extras:
  1604             part.addparam('nbchanges', '%d' % cg.extras['clcount'],
  1705             part.addparam(
  1605                           mandatory=False)
  1706                 'nbchanges', '%d' % cg.extras['clcount'], mandatory=False
  1606         if opts.get('phases') and repo.revs('%ln and secret()',
  1707             )
  1607                                             outgoing.missingheads):
  1708         if opts.get('phases') and repo.revs(
       
  1709             '%ln and secret()', outgoing.missingheads
       
  1710         ):
  1608             part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
  1711             part.addparam('targetphase', '%d' % phases.secret, mandatory=False)
  1609 
  1712 
  1610     if opts.get('streamv2', False):
  1713     if opts.get('streamv2', False):
  1611         addpartbundlestream2(bundler, repo, stream=True)
  1714         addpartbundlestream2(bundler, repo, stream=True)
  1612 
  1715 
  1622 
  1725 
  1623     if opts.get('phases', False):
  1726     if opts.get('phases', False):
  1624         headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
  1727         headsbyphase = phases.subsetphaseheads(repo, outgoing.missing)
  1625         phasedata = phases.binaryencode(headsbyphase)
  1728         phasedata = phases.binaryencode(headsbyphase)
  1626         bundler.newpart('phase-heads', data=phasedata)
  1729         bundler.newpart('phase-heads', data=phasedata)
       
  1730 
  1627 
  1731 
  1628 def addparttagsfnodescache(repo, bundler, outgoing):
  1732 def addparttagsfnodescache(repo, bundler, outgoing):
  1629     # we include the tags fnode cache for the bundle changeset
  1733     # we include the tags fnode cache for the bundle changeset
  1630     # (as an optional part)
  1734     # (as an optional part)
  1631     cache = tags.hgtagsfnodescache(repo.unfiltered())
  1735     cache = tags.hgtagsfnodescache(repo.unfiltered())
  1647             chunks.extend([node, fnode])
  1751             chunks.extend([node, fnode])
  1648 
  1752 
  1649     if chunks:
  1753     if chunks:
  1650         bundler.newpart('hgtagsfnodes', data=''.join(chunks))
  1754         bundler.newpart('hgtagsfnodes', data=''.join(chunks))
  1651 
  1755 
       
  1756 
  1652 def addpartrevbranchcache(repo, bundler, outgoing):
  1757 def addpartrevbranchcache(repo, bundler, outgoing):
  1653     # we include the rev branch cache for the bundle changeset
  1758     # we include the rev branch cache for the bundle changeset
  1654     # (as an optional part)
  1759     # (as an optional part)
  1655     cache = repo.revbranchcache()
  1760     cache = repo.revbranchcache()
  1656     cl = repo.unfiltered().changelog
  1761     cl = repo.unfiltered().changelog
  1667             for n in sorted(nodes):
  1772             for n in sorted(nodes):
  1668                 yield n
  1773                 yield n
  1669             for n in sorted(closed):
  1774             for n in sorted(closed):
  1670                 yield n
  1775                 yield n
  1671 
  1776 
  1672     bundler.newpart('cache:rev-branch-cache', data=generate(),
  1777     bundler.newpart('cache:rev-branch-cache', data=generate(), mandatory=False)
  1673                     mandatory=False)
  1778 
  1674 
  1779 
  1675 def _formatrequirementsspec(requirements):
  1780 def _formatrequirementsspec(requirements):
  1676     requirements = [req for req in requirements if req != "shared"]
  1781     requirements = [req for req in requirements if req != "shared"]
  1677     return urlreq.quote(','.join(sorted(requirements)))
  1782     return urlreq.quote(','.join(sorted(requirements)))
       
  1783 
  1678 
  1784 
  1679 def _formatrequirementsparams(requirements):
  1785 def _formatrequirementsparams(requirements):
  1680     requirements = _formatrequirementsspec(requirements)
  1786     requirements = _formatrequirementsspec(requirements)
  1681     params = "%s%s" % (urlreq.quote("requirements="), requirements)
  1787     params = "%s%s" % (urlreq.quote("requirements="), requirements)
  1682     return params
  1788     return params
  1683 
  1789 
       
  1790 
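Worked examples for the two formatting helpers above (illustrative; the 'shared' requirement is dropped, the rest is sorted, and the separators are URL-quoted, so ',' becomes %2C and '=' becomes %3D):

    assert (_formatrequirementsspec(['revlogv1', 'generaldelta', 'shared'])
            == 'generaldelta%2Crevlogv1')
    assert _formatrequirementsparams(['revlogv1']) == 'requirements%3Drevlogv1'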
  1684 def addpartbundlestream2(bundler, repo, **kwargs):
  1791 def addpartbundlestream2(bundler, repo, **kwargs):
  1685     if not kwargs.get(r'stream', False):
  1792     if not kwargs.get(r'stream', False):
  1686         return
  1793         return
  1687 
  1794 
  1688     if not streamclone.allowservergeneration(repo):
  1795     if not streamclone.allowservergeneration(repo):
  1689         raise error.Abort(_('stream data requested but server does not allow '
  1796         raise error.Abort(
  1690                             'this feature'),
  1797             _(
  1691                           hint=_('well-behaved clients should not be '
  1798                 'stream data requested but server does not allow '
  1692                                  'requesting stream data from servers not '
  1799                 'this feature'
  1693                                  'advertising it; the client may be buggy'))
  1800             ),
       
  1801             hint=_(
       
  1802                 'well-behaved clients should not be '
       
  1803                 'requesting stream data from servers not '
       
  1804                 'advertising it; the client may be buggy'
       
  1805             ),
       
  1806         )
  1694 
  1807 
  1695     # Stream clones don't compress well. And compression undermines a
  1808     # Stream clones don't compress well. And compression undermines a
  1696     # goal of stream clones, which is to be fast. Communicate the desire
  1809     # goal of stream clones, which is to be fast. Communicate the desire
  1697     # to avoid compression to consumers of the bundle.
  1810     # to avoid compression to consumers of the bundle.
  1698     bundler.prefercompressed = False
  1811     bundler.prefercompressed = False
  1699 
  1812 
  1700     # get the includes and excludes
  1813     # get the includes and excludes
  1701     includepats = kwargs.get(r'includepats')
  1814     includepats = kwargs.get(r'includepats')
  1702     excludepats = kwargs.get(r'excludepats')
  1815     excludepats = kwargs.get(r'excludepats')
  1703 
  1816 
  1704     narrowstream = repo.ui.configbool('experimental',
  1817     narrowstream = repo.ui.configbool(
  1705                                       'server.stream-narrow-clones')
  1818         'experimental', 'server.stream-narrow-clones'
       
  1819     )
  1706 
  1820 
  1707     if (includepats or excludepats) and not narrowstream:
  1821     if (includepats or excludepats) and not narrowstream:
  1708         raise error.Abort(_('server does not support narrow stream clones'))
  1822         raise error.Abort(_('server does not support narrow stream clones'))
  1709 
  1823 
  1710     includeobsmarkers = False
  1824     includeobsmarkers = False
  1711     if repo.obsstore:
  1825     if repo.obsstore:
  1712         remoteversions = obsmarkersversion(bundler.capabilities)
  1826         remoteversions = obsmarkersversion(bundler.capabilities)
  1713         if not remoteversions:
  1827         if not remoteversions:
  1714             raise error.Abort(_('server has obsolescence markers, but client '
  1828             raise error.Abort(
  1715                                 'cannot receive them via stream clone'))
  1829                 _(
       
  1830                     'server has obsolescence markers, but client '
       
  1831                     'cannot receive them via stream clone'
       
  1832                 )
       
  1833             )
  1716         elif repo.obsstore._version in remoteversions:
  1834         elif repo.obsstore._version in remoteversions:
  1717             includeobsmarkers = True
  1835             includeobsmarkers = True
  1718 
  1836 
  1719     filecount, bytecount, it = streamclone.generatev2(repo, includepats,
  1837     filecount, bytecount, it = streamclone.generatev2(
  1720                                                       excludepats,
  1838         repo, includepats, excludepats, includeobsmarkers
  1721                                                       includeobsmarkers)
  1839     )
  1722     requirements = _formatrequirementsspec(repo.requirements)
  1840     requirements = _formatrequirementsspec(repo.requirements)
  1723     part = bundler.newpart('stream2', data=it)
  1841     part = bundler.newpart('stream2', data=it)
  1724     part.addparam('bytecount', '%d' % bytecount, mandatory=True)
  1842     part.addparam('bytecount', '%d' % bytecount, mandatory=True)
  1725     part.addparam('filecount', '%d' % filecount, mandatory=True)
  1843     part.addparam('filecount', '%d' % filecount, mandatory=True)
  1726     part.addparam('requirements', requirements, mandatory=True)
  1844     part.addparam('requirements', requirements, mandatory=True)
       
  1845 
  1727 
  1846 
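For reference, the three mandatory parameters attached above are exactly what the receiving handler (handlestreamv2bundle, near the end of this file) parses back out; a condensed sketch of that receiving side:

    # Undo the encoding performed when the 'stream2' part was built.
    requirements = urlreq.unquote(part.params['requirements']).split(',')
    filecount = int(part.params['filecount'])
    bytecount = int(part.params['bytecount'])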
  1728 def buildobsmarkerspart(bundler, markers):
  1847 def buildobsmarkerspart(bundler, markers):
  1729     """add an obsmarker part to the bundler with <markers>
  1848     """add an obsmarker part to the bundler with <markers>
  1730 
  1849 
  1731     No part is created if markers is empty.
  1850     No part is created if markers is empty.
  1739     if version is None:
  1858     if version is None:
  1740         raise ValueError('bundler does not support common obsmarker format')
  1859         raise ValueError('bundler does not support common obsmarker format')
  1741     stream = obsolete.encodemarkers(markers, True, version=version)
  1860     stream = obsolete.encodemarkers(markers, True, version=version)
  1742     return bundler.newpart('obsmarkers', data=stream)
  1861     return bundler.newpart('obsmarkers', data=stream)
  1743 
  1862 
  1744 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None,
  1863 
  1745                 compopts=None):
  1864 def writebundle(
       
  1865     ui, cg, filename, bundletype, vfs=None, compression=None, compopts=None
       
  1866 ):
  1746     """Write a bundle file and return its filename.
  1867     """Write a bundle file and return its filename.
  1747 
  1868 
  1748     Existing files will not be overwritten.
  1869     Existing files will not be overwritten.
  1749     If no filename is specified, a temporary file is created.
  1870     If no filename is specified, a temporary file is created.
  1750     bz2 compression can be turned off.
  1871     bz2 compression can be turned off.
  1755         bundle = bundle20(ui)
  1876         bundle = bundle20(ui)
  1756         bundle.setcompression(compression, compopts)
  1877         bundle.setcompression(compression, compopts)
  1757         part = bundle.newpart('changegroup', data=cg.getchunks())
  1878         part = bundle.newpart('changegroup', data=cg.getchunks())
  1758         part.addparam('version', cg.version)
  1879         part.addparam('version', cg.version)
  1759         if 'clcount' in cg.extras:
  1880         if 'clcount' in cg.extras:
  1760             part.addparam('nbchanges', '%d' % cg.extras['clcount'],
  1881             part.addparam(
  1761                           mandatory=False)
  1882                 'nbchanges', '%d' % cg.extras['clcount'], mandatory=False
       
  1883             )
  1762         chunkiter = bundle.getchunks()
  1884         chunkiter = bundle.getchunks()
  1763     else:
  1885     else:
  1764         # compression argument is only for the bundle2 case
  1886         # compression argument is only for the bundle2 case
  1765         assert compression is None
  1887         assert compression is None
  1766         if cg.version != '01':
  1888         if cg.version != '01':
  1767             raise error.Abort(_('old bundle types only support v1 '
  1889             raise error.Abort(
  1768                                 'changegroups'))
  1890                 _('old bundle types only support v1 ' 'changegroups')
       
  1891             )
  1769         header, comp = bundletypes[bundletype]
  1892         header, comp = bundletypes[bundletype]
  1770         if comp not in util.compengines.supportedbundletypes:
  1893         if comp not in util.compengines.supportedbundletypes:
  1771             raise error.Abort(_('unknown stream compression type: %s')
  1894             raise error.Abort(_('unknown stream compression type: %s') % comp)
  1772                               % comp)
       
  1773         compengine = util.compengines.forbundletype(comp)
  1895         compengine = util.compengines.forbundletype(comp)
       
  1896 
  1774         def chunkiter():
  1897         def chunkiter():
  1775             yield header
  1898             yield header
  1776             for chunk in compengine.compressstream(cg.getchunks(), compopts):
  1899             for chunk in compengine.compressstream(cg.getchunks(), compopts):
  1777                 yield chunk
  1900                 yield chunk
       
  1901 
  1778         chunkiter = chunkiter()
  1902         chunkiter = chunkiter()
  1779 
  1903 
  1780     # parse the changegroup data, otherwise we will block
  1904     # parse the changegroup data, otherwise we will block
  1781     # in case of sshrepo because we don't know the end of the stream
  1905     # in case of sshrepo because we don't know the end of the stream
  1782     return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
  1906     return changegroup.writechunks(ui, chunkiter, filename, vfs=vfs)
  1783 
  1907 
       
  1908 
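A minimal usage sketch for the two writer entry points above (hypothetical; assumes an existing `repo`, an `outgoing` set computed by the caller, output filenames chosen purely for illustration, and that 'BZ' / 'HG10BZ' are available compression and bundle type names):

    # bundle2 container: parts are selected by opts and added by
    # _addpartsfromopts() above ('phases', 'streamv2', 'obsolescence', ...).
    writenewbundle(repo.ui, repo, 'bundle', 'example.hg', 'HG20', outgoing,
                   {'phases': True}, compression='BZ')

    # legacy bundle1 container: build the changegroup here and let
    # writebundle() prepend the 'HG10BZ' header and compress the stream.
    cg = changegroup.makechangegroup(repo, outgoing, '01', 'bundle')
    writebundle(repo.ui, cg, 'legacy.hg', 'HG10BZ')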
  1784 def combinechangegroupresults(op):
  1909 def combinechangegroupresults(op):
  1785     """logic to combine 0 or more addchangegroup results into one"""
  1910     """logic to combine 0 or more addchangegroup results into one"""
  1786     results = [r.get('return', 0)
  1911     results = [r.get('return', 0) for r in op.records['changegroup']]
  1787                for r in op.records['changegroup']]
       
  1788     changedheads = 0
  1912     changedheads = 0
  1789     result = 1
  1913     result = 1
  1790     for ret in results:
  1914     for ret in results:
  1791         # If any changegroup result is 0, return 0
  1915         # If any changegroup result is 0, return 0
  1792         if ret == 0:
  1916         if ret == 0:
  1800         result = 1 + changedheads
  1924         result = 1 + changedheads
  1801     elif changedheads < 0:
  1925     elif changedheads < 0:
  1802         result = -1 + changedheads
  1926         result = -1 + changedheads
  1803     return result
  1927     return result
  1804 
  1928 
  1805 @parthandler('changegroup', ('version', 'nbchanges', 'treemanifest',
  1929 
  1806                              'targetphase'))
  1930 @parthandler(
       
  1931     'changegroup', ('version', 'nbchanges', 'treemanifest', 'targetphase')
       
  1932 )
  1807 def handlechangegroup(op, inpart):
  1933 def handlechangegroup(op, inpart):
  1808     """apply a changegroup part on the repo
  1934     """apply a changegroup part on the repo
  1809 
  1935 
  1810     This is a very early implementation that will see massive rework before being
  1936     This is a very early implementation that will see massive rework before being
  1811     inflicted on any end-user.
  1937     inflicted on any end-user.
  1819     # the source and url passed here are overwritten by the one contained in
  1945     # the source and url passed here are overwritten by the one contained in
  1820     # the transaction.hookargs argument. So 'bundle2' is a placeholder
  1946     # the transaction.hookargs argument. So 'bundle2' is a placeholder
  1821     nbchangesets = None
  1947     nbchangesets = None
  1822     if 'nbchanges' in inpart.params:
  1948     if 'nbchanges' in inpart.params:
  1823         nbchangesets = int(inpart.params.get('nbchanges'))
  1949         nbchangesets = int(inpart.params.get('nbchanges'))
  1824     if ('treemanifest' in inpart.params and
  1950     if (
  1825         'treemanifest' not in op.repo.requirements):
  1951         'treemanifest' in inpart.params
       
  1952         and 'treemanifest' not in op.repo.requirements
       
  1953     ):
  1826         if len(op.repo.changelog) != 0:
  1954         if len(op.repo.changelog) != 0:
  1827             raise error.Abort(_(
  1955             raise error.Abort(
  1828                 "bundle contains tree manifests, but local repo is "
  1956                 _(
  1829                 "non-empty and does not use tree manifests"))
  1957                     "bundle contains tree manifests, but local repo is "
       
  1958                     "non-empty and does not use tree manifests"
       
  1959                 )
       
  1960             )
  1830         op.repo.requirements.add('treemanifest')
  1961         op.repo.requirements.add('treemanifest')
  1831         op.repo.svfs.options = localrepo.resolvestorevfsoptions(
  1962         op.repo.svfs.options = localrepo.resolvestorevfsoptions(
  1832             op.repo.ui, op.repo.requirements, op.repo.features)
  1963             op.repo.ui, op.repo.requirements, op.repo.features
       
  1964         )
  1833         op.repo._writerequirements()
  1965         op.repo._writerequirements()
  1834     extrakwargs = {}
  1966     extrakwargs = {}
  1835     targetphase = inpart.params.get('targetphase')
  1967     targetphase = inpart.params.get('targetphase')
  1836     if targetphase is not None:
  1968     if targetphase is not None:
  1837         extrakwargs[r'targetphase'] = int(targetphase)
  1969         extrakwargs[r'targetphase'] = int(targetphase)
  1838     ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2',
  1970     ret = _processchangegroup(
  1839                               expectedtotal=nbchangesets, **extrakwargs)
  1971         op,
       
  1972         cg,
       
  1973         tr,
       
  1974         'bundle2',
       
  1975         'bundle2',
       
  1976         expectedtotal=nbchangesets,
       
  1977         **extrakwargs
       
  1978     )
  1840     if op.reply is not None:
  1979     if op.reply is not None:
  1841         # This is definitely not the final form of this
  1980         # This is definitely not the final form of this
  1842         # return. But one needs to start somewhere.
  1981         # return. But one needs to start somewhere.
  1843         part = op.reply.newpart('reply:changegroup', mandatory=False)
  1982         part = op.reply.newpart('reply:changegroup', mandatory=False)
  1844         part.addparam(
  1983         part.addparam(
  1845             'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
  1984             'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
       
  1985         )
  1846         part.addparam('return', '%i' % ret, mandatory=False)
  1986         part.addparam('return', '%i' % ret, mandatory=False)
  1847     assert not inpart.read()
  1987     assert not inpart.read()
  1848 
  1988 
  1849 _remotechangegroupparams = tuple(['url', 'size', 'digests'] +
  1989 
  1850     ['digest:%s' % k for k in util.DIGESTS.keys()])
  1990 _remotechangegroupparams = tuple(
       
  1991     ['url', 'size', 'digests'] + ['digest:%s' % k for k in util.DIGESTS.keys()]
       
  1992 )
       
  1993 
       
  1994 
  1851 @parthandler('remote-changegroup', _remotechangegroupparams)
  1995 @parthandler('remote-changegroup', _remotechangegroupparams)
  1852 def handleremotechangegroup(op, inpart):
  1996 def handleremotechangegroup(op, inpart):
  1853     """apply a bundle10 on the repo, given a url and validation information
  1997     """apply a bundle10 on the repo, given a url and validation information
  1854 
  1998 
  1855     All the information about the remote bundle to import is given as
  1999     All the information about the remote bundle to import is given as
  1869         raw_url = inpart.params['url']
  2013         raw_url = inpart.params['url']
  1870     except KeyError:
  2014     except KeyError:
  1871         raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
  2015         raise error.Abort(_('remote-changegroup: missing "%s" param') % 'url')
  1872     parsed_url = util.url(raw_url)
  2016     parsed_url = util.url(raw_url)
  1873     if parsed_url.scheme not in capabilities['remote-changegroup']:
  2017     if parsed_url.scheme not in capabilities['remote-changegroup']:
  1874         raise error.Abort(_('remote-changegroup does not support %s urls') %
  2018         raise error.Abort(
  1875             parsed_url.scheme)
  2019             _('remote-changegroup does not support %s urls') % parsed_url.scheme
       
  2020         )
  1876 
  2021 
  1877     try:
  2022     try:
  1878         size = int(inpart.params['size'])
  2023         size = int(inpart.params['size'])
  1879     except ValueError:
  2024     except ValueError:
  1880         raise error.Abort(_('remote-changegroup: invalid value for param "%s"')
  2025         raise error.Abort(
  1881             % 'size')
  2026             _('remote-changegroup: invalid value for param "%s"') % 'size'
       
  2027         )
  1882     except KeyError:
  2028     except KeyError:
  1883         raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
  2029         raise error.Abort(_('remote-changegroup: missing "%s" param') % 'size')
  1884 
  2030 
  1885     digests = {}
  2031     digests = {}
  1886     for typ in inpart.params.get('digests', '').split():
  2032     for typ in inpart.params.get('digests', '').split():
  1887         param = 'digest:%s' % typ
  2033         param = 'digest:%s' % typ
  1888         try:
  2034         try:
  1889             value = inpart.params[param]
  2035             value = inpart.params[param]
  1890         except KeyError:
  2036         except KeyError:
  1891             raise error.Abort(_('remote-changegroup: missing "%s" param') %
  2037             raise error.Abort(
  1892                 param)
  2038                 _('remote-changegroup: missing "%s" param') % param
       
  2039             )
  1893         digests[typ] = value
  2040         digests[typ] = value
  1894 
  2041 
  1895     real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
  2042     real_part = util.digestchecker(url.open(op.ui, raw_url), size, digests)
  1896 
  2043 
  1897     tr = op.gettransaction()
  2044     tr = op.gettransaction()
  1898     from . import exchange
  2045     from . import exchange
       
  2046 
  1899     cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
  2047     cg = exchange.readbundle(op.repo.ui, real_part, raw_url)
  1900     if not isinstance(cg, changegroup.cg1unpacker):
  2048     if not isinstance(cg, changegroup.cg1unpacker):
  1901         raise error.Abort(_('%s: not a bundle version 1.0') %
  2049         raise error.Abort(
  1902             util.hidepassword(raw_url))
  2050             _('%s: not a bundle version 1.0') % util.hidepassword(raw_url)
       
  2051         )
  1903     ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
  2052     ret = _processchangegroup(op, cg, tr, 'bundle2', 'bundle2')
  1904     if op.reply is not None:
  2053     if op.reply is not None:
  1905         # This is definitely not the final form of this
  2054         # This is definitely not the final form of this
  1906         # return. But one needs to start somewhere.
  2055         # return. But one needs to start somewhere.
  1907         part = op.reply.newpart('reply:changegroup')
  2056         part = op.reply.newpart('reply:changegroup')
  1908         part.addparam(
  2057         part.addparam(
  1909             'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
  2058             'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
       
  2059         )
  1910         part.addparam('return', '%i' % ret, mandatory=False)
  2060         part.addparam('return', '%i' % ret, mandatory=False)
  1911     try:
  2061     try:
  1912         real_part.validate()
  2062         real_part.validate()
  1913     except error.Abort as e:
  2063     except error.Abort as e:
  1914         raise error.Abort(_('bundle at %s is corrupted:\n%s') %
  2064         raise error.Abort(
  1915                           (util.hidepassword(raw_url), bytes(e)))
  2065             _('bundle at %s is corrupted:\n%s')
       
  2066             % (util.hidepassword(raw_url), bytes(e))
       
  2067         )
  1916     assert not inpart.read()
  2068     assert not inpart.read()
       
  2069 
  1917 
  2070 
  1918 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
  2071 @parthandler('reply:changegroup', ('return', 'in-reply-to'))
  1919 def handlereplychangegroup(op, inpart):
  2072 def handlereplychangegroup(op, inpart):
  1920     ret = int(inpart.params['return'])
  2073     ret = int(inpart.params['return'])
  1921     replyto = int(inpart.params['in-reply-to'])
  2074     replyto = int(inpart.params['in-reply-to'])
  1922     op.records.add('changegroup', {'return': ret}, replyto)
  2075     op.records.add('changegroup', {'return': ret}, replyto)
  1923 
  2076 
       
  2077 
  1924 @parthandler('check:bookmarks')
  2078 @parthandler('check:bookmarks')
  1925 def handlecheckbookmarks(op, inpart):
  2079 def handlecheckbookmarks(op, inpart):
  1926     """check location of bookmarks
  2080     """check location of bookmarks
  1927 
  2081 
  1928     This part is to be used to detect push races regarding bookmarks; it
  2082     This part is to be used to detect push races regarding bookmarks; it
  1929     contains binary encoded (bookmark, node) tuples. If the local state does
  2083     contains binary encoded (bookmark, node) tuples. If the local state does
  1930     not match the one in the part, a PushRaced exception is raised
  2084     not match the one in the part, a PushRaced exception is raised
  1931     """
  2085     """
  1932     bookdata = bookmarks.binarydecode(inpart)
  2086     bookdata = bookmarks.binarydecode(inpart)
  1933 
  2087 
  1934     msgstandard = ('remote repository changed while pushing - please try again '
  2088     msgstandard = (
  1935                    '(bookmark "%s" move from %s to %s)')
  2089         'remote repository changed while pushing - please try again '
  1936     msgmissing = ('remote repository changed while pushing - please try again '
  2090         '(bookmark "%s" move from %s to %s)'
  1937                   '(bookmark "%s" is missing, expected %s)')
  2091     )
  1938     msgexist = ('remote repository changed while pushing - please try again '
  2092     msgmissing = (
  1939                 '(bookmark "%s" set on %s, expected missing)')
  2093         'remote repository changed while pushing - please try again '
       
  2094         '(bookmark "%s" is missing, expected %s)'
       
  2095     )
       
  2096     msgexist = (
       
  2097         'remote repository changed while pushing - please try again '
       
  2098         '(bookmark "%s" set on %s, expected missing)'
       
  2099     )
  1940     for book, node in bookdata:
  2100     for book, node in bookdata:
  1941         currentnode = op.repo._bookmarks.get(book)
  2101         currentnode = op.repo._bookmarks.get(book)
  1942         if currentnode != node:
  2102         if currentnode != node:
  1943             if node is None:
  2103             if node is None:
  1944                 finalmsg = msgexist % (book, nodemod.short(currentnode))
  2104                 finalmsg = msgexist % (book, nodemod.short(currentnode))
  1945             elif currentnode is None:
  2105             elif currentnode is None:
  1946                 finalmsg = msgmissing % (book, nodemod.short(node))
  2106                 finalmsg = msgmissing % (book, nodemod.short(node))
  1947             else:
  2107             else:
  1948                 finalmsg = msgstandard % (book, nodemod.short(node),
  2108                 finalmsg = msgstandard % (
  1949                                           nodemod.short(currentnode))
  2109                     book,
       
  2110                     nodemod.short(node),
       
  2111                     nodemod.short(currentnode),
       
  2112                 )
  1950             raise error.PushRaced(finalmsg)
  2113             raise error.PushRaced(finalmsg)
       
  2114 
  1951 
  2115 
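On the sending side (in exchange.py, not shown here) the matching binary payload is produced with bookmarks.binaryencode; a hedged sketch, assuming that encoder and a caller-provided list of expected (bookmark, node) pairs:

    # Hypothetical: record the bookmark positions the push was computed
    # against, so the server raises PushRaced if they moved in the meantime.
    data = bookmarks.binaryencode(expected)
    bundler.newpart('check:bookmarks', data=data)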
  1952 @parthandler('check:heads')
  2116 @parthandler('check:heads')
  1953 def handlecheckheads(op, inpart):
  2117 def handlecheckheads(op, inpart):
  1954     """check that the heads of the repo did not change
  2118     """check that the heads of the repo did not change
  1955 
  2119 
  1963     assert not h
  2127     assert not h
  1964     # Trigger a transaction so that we are guaranteed to have the lock now.
  2128     # Trigger a transaction so that we are guaranteed to have the lock now.
  1965     if op.ui.configbool('experimental', 'bundle2lazylocking'):
  2129     if op.ui.configbool('experimental', 'bundle2lazylocking'):
  1966         op.gettransaction()
  2130         op.gettransaction()
  1967     if sorted(heads) != sorted(op.repo.heads()):
  2131     if sorted(heads) != sorted(op.repo.heads()):
  1968         raise error.PushRaced('remote repository changed while pushing - '
  2132         raise error.PushRaced(
  1969                               'please try again')
  2133             'remote repository changed while pushing - ' 'please try again'
       
  2134         )
       
  2135 
  1970 
  2136 
  1971 @parthandler('check:updated-heads')
  2137 @parthandler('check:updated-heads')
  1972 def handlecheckupdatedheads(op, inpart):
  2138 def handlecheckupdatedheads(op, inpart):
  1973     """check for race on the heads touched by a push
  2139     """check for race on the heads touched by a push
  1974 
  2140 
  1992     for ls in op.repo.branchmap().iterheads():
  2158     for ls in op.repo.branchmap().iterheads():
  1993         currentheads.update(ls)
  2159         currentheads.update(ls)
  1994 
  2160 
  1995     for h in heads:
  2161     for h in heads:
  1996         if h not in currentheads:
  2162         if h not in currentheads:
  1997             raise error.PushRaced('remote repository changed while pushing - '
  2163             raise error.PushRaced(
  1998                                   'please try again')
  2164                 'remote repository changed while pushing - ' 'please try again'
       
  2165             )
       
  2166 
  1999 
  2167 
  2000 @parthandler('check:phases')
  2168 @parthandler('check:phases')
  2001 def handlecheckphases(op, inpart):
  2169 def handlecheckphases(op, inpart):
  2002     """check that phase boundaries of the repository did not change
  2170     """check that phase boundaries of the repository did not change
  2003 
  2171 
  2005     """
  2173     """
  2006     phasetonodes = phases.binarydecode(inpart)
  2174     phasetonodes = phases.binarydecode(inpart)
  2007     unfi = op.repo.unfiltered()
  2175     unfi = op.repo.unfiltered()
  2008     cl = unfi.changelog
  2176     cl = unfi.changelog
  2009     phasecache = unfi._phasecache
  2177     phasecache = unfi._phasecache
  2010     msg = ('remote repository changed while pushing - please try again '
  2178     msg = (
  2011            '(%s is %s expected %s)')
  2179         'remote repository changed while pushing - please try again '
       
  2180         '(%s is %s expected %s)'
       
  2181     )
  2012     for expectedphase, nodes in enumerate(phasetonodes):
  2182     for expectedphase, nodes in enumerate(phasetonodes):
  2013         for n in nodes:
  2183         for n in nodes:
  2014             actualphase = phasecache.phase(unfi, cl.rev(n))
  2184             actualphase = phasecache.phase(unfi, cl.rev(n))
  2015             if actualphase != expectedphase:
  2185             if actualphase != expectedphase:
  2016                 finalmsg = msg % (nodemod.short(n),
  2186                 finalmsg = msg % (
  2017                                   phases.phasenames[actualphase],
  2187                     nodemod.short(n),
  2018                                   phases.phasenames[expectedphase])
  2188                     phases.phasenames[actualphase],
       
  2189                     phases.phasenames[expectedphase],
       
  2190                 )
  2019                 raise error.PushRaced(finalmsg)
  2191                 raise error.PushRaced(finalmsg)
       
  2192 
  2020 
  2193 
  2021 @parthandler('output')
  2194 @parthandler('output')
  2022 def handleoutput(op, inpart):
  2195 def handleoutput(op, inpart):
  2023     """forward output captured on the server to the client"""
  2196     """forward output captured on the server to the client"""
  2024     for line in inpart.read().splitlines():
  2197     for line in inpart.read().splitlines():
  2025         op.ui.status(_('remote: %s\n') % line)
  2198         op.ui.status(_('remote: %s\n') % line)
  2026 
  2199 
       
  2200 
  2027 @parthandler('replycaps')
  2201 @parthandler('replycaps')
  2028 def handlereplycaps(op, inpart):
  2202 def handlereplycaps(op, inpart):
  2029     """Notify that a reply bundle should be created
  2203     """Notify that a reply bundle should be created
  2030 
  2204 
  2031     The payload contains the capabilities information for the reply"""
  2205     The payload contains the capabilities information for the reply"""
  2032     caps = decodecaps(inpart.read())
  2206     caps = decodecaps(inpart.read())
  2033     if op.reply is None:
  2207     if op.reply is None:
  2034         op.reply = bundle20(op.ui, caps)
  2208         op.reply = bundle20(op.ui, caps)
  2035 
  2209 
       
  2210 
  2036 class AbortFromPart(error.Abort):
  2211 class AbortFromPart(error.Abort):
  2037     """Sub-class of Abort that denotes an error from a bundle2 part."""
  2212     """Sub-class of Abort that denotes an error from a bundle2 part."""
       
  2213 
  2038 
  2214 
  2039 @parthandler('error:abort', ('message', 'hint'))
  2215 @parthandler('error:abort', ('message', 'hint'))
  2040 def handleerrorabort(op, inpart):
  2216 def handleerrorabort(op, inpart):
  2041     """Used to transmit abort error over the wire"""
  2217     """Used to transmit abort error over the wire"""
  2042     raise AbortFromPart(inpart.params['message'],
  2218     raise AbortFromPart(
  2043                         hint=inpart.params.get('hint'))
  2219         inpart.params['message'], hint=inpart.params.get('hint')
  2044 
  2220     )
  2045 @parthandler('error:pushkey', ('namespace', 'key', 'new', 'old', 'ret',
  2221 
  2046                                'in-reply-to'))
  2222 
       
  2223 @parthandler(
       
  2224     'error:pushkey', ('namespace', 'key', 'new', 'old', 'ret', 'in-reply-to')
       
  2225 )
  2047 def handleerrorpushkey(op, inpart):
  2226 def handleerrorpushkey(op, inpart):
  2048     """Used to transmit failure of a mandatory pushkey over the wire"""
  2227     """Used to transmit failure of a mandatory pushkey over the wire"""
  2049     kwargs = {}
  2228     kwargs = {}
  2050     for name in ('namespace', 'key', 'new', 'old', 'ret'):
  2229     for name in ('namespace', 'key', 'new', 'old', 'ret'):
  2051         value = inpart.params.get(name)
  2230         value = inpart.params.get(name)
  2052         if value is not None:
  2231         if value is not None:
  2053             kwargs[name] = value
  2232             kwargs[name] = value
  2054     raise error.PushkeyFailed(inpart.params['in-reply-to'],
  2233     raise error.PushkeyFailed(
  2055                               **pycompat.strkwargs(kwargs))
  2234         inpart.params['in-reply-to'], **pycompat.strkwargs(kwargs)
       
  2235     )
       
  2236 
  2056 
  2237 
  2057 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
  2238 @parthandler('error:unsupportedcontent', ('parttype', 'params'))
  2058 def handleerrorunsupportedcontent(op, inpart):
  2239 def handleerrorunsupportedcontent(op, inpart):
  2059     """Used to transmit unknown content error over the wire"""
  2240     """Used to transmit unknown content error over the wire"""
  2060     kwargs = {}
  2241     kwargs = {}
  2065     if params is not None:
  2246     if params is not None:
  2066         kwargs['params'] = params.split('\0')
  2247         kwargs['params'] = params.split('\0')
  2067 
  2248 
  2068     raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
  2249     raise error.BundleUnknownFeatureError(**pycompat.strkwargs(kwargs))
  2069 
  2250 
       
  2251 
  2070 @parthandler('error:pushraced', ('message',))
  2252 @parthandler('error:pushraced', ('message',))
  2071 def handleerrorpushraced(op, inpart):
  2253 def handleerrorpushraced(op, inpart):
  2072     """Used to transmit push race error over the wire"""
  2254     """Used to transmit push race error over the wire"""
  2073     raise error.ResponseError(_('push failed:'), inpart.params['message'])
  2255     raise error.ResponseError(_('push failed:'), inpart.params['message'])
       
  2256 
  2074 
  2257 
  2075 @parthandler('listkeys', ('namespace',))
  2258 @parthandler('listkeys', ('namespace',))
  2076 def handlelistkeys(op, inpart):
  2259 def handlelistkeys(op, inpart):
  2077     """retrieve pushkey namespace content stored in a bundle2"""
  2260     """retrieve pushkey namespace content stored in a bundle2"""
  2078     namespace = inpart.params['namespace']
  2261     namespace = inpart.params['namespace']
  2079     r = pushkey.decodekeys(inpart.read())
  2262     r = pushkey.decodekeys(inpart.read())
  2080     op.records.add('listkeys', (namespace, r))
  2263     op.records.add('listkeys', (namespace, r))
       
  2264 
  2081 
  2265 
  2082 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
  2266 @parthandler('pushkey', ('namespace', 'key', 'old', 'new'))
  2083 def handlepushkey(op, inpart):
  2267 def handlepushkey(op, inpart):
  2084     """process a pushkey request"""
  2268     """process a pushkey request"""
  2085     dec = pushkey.decode
  2269     dec = pushkey.decode
  2090     # Grab the transaction to ensure that we have the lock before performing the
  2274     # Grab the transaction to ensure that we have the lock before performing the
  2091     # pushkey.
  2275     # pushkey.
  2092     if op.ui.configbool('experimental', 'bundle2lazylocking'):
  2276     if op.ui.configbool('experimental', 'bundle2lazylocking'):
  2093         op.gettransaction()
  2277         op.gettransaction()
  2094     ret = op.repo.pushkey(namespace, key, old, new)
  2278     ret = op.repo.pushkey(namespace, key, old, new)
  2095     record = {'namespace': namespace,
  2279     record = {'namespace': namespace, 'key': key, 'old': old, 'new': new}
  2096               'key': key,
       
  2097               'old': old,
       
  2098               'new': new}
       
  2099     op.records.add('pushkey', record)
  2280     op.records.add('pushkey', record)
  2100     if op.reply is not None:
  2281     if op.reply is not None:
  2101         rpart = op.reply.newpart('reply:pushkey')
  2282         rpart = op.reply.newpart('reply:pushkey')
  2102         rpart.addparam(
  2283         rpart.addparam(
  2103             'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
  2284             'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
       
  2285         )
  2104         rpart.addparam('return', '%i' % ret, mandatory=False)
  2286         rpart.addparam('return', '%i' % ret, mandatory=False)
  2105     if inpart.mandatory and not ret:
  2287     if inpart.mandatory and not ret:
  2106         kwargs = {}
  2288         kwargs = {}
  2107         for key in ('namespace', 'key', 'new', 'old', 'ret'):
  2289         for key in ('namespace', 'key', 'new', 'old', 'ret'):
  2108             if key in inpart.params:
  2290             if key in inpart.params:
  2109                 kwargs[key] = inpart.params[key]
  2291                 kwargs[key] = inpart.params[key]
  2110         raise error.PushkeyFailed(partid='%d' % inpart.id,
  2292         raise error.PushkeyFailed(
  2111                                   **pycompat.strkwargs(kwargs))
  2293             partid='%d' % inpart.id, **pycompat.strkwargs(kwargs)
       
  2294         )
       
  2295 
  2112 
  2296 
  2113 @parthandler('bookmarks')
  2297 @parthandler('bookmarks')
  2114 def handlebookmark(op, inpart):
  2298 def handlebookmark(op, inpart):
  2115     """transmit bookmark information
  2299     """transmit bookmark information
  2116 
  2300 
  2145                 hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
  2329                 hookargs['old'] = nodemod.hex(bookstore.get(book, ''))
  2146                 hookargs['new'] = nodemod.hex(node if node is not None else '')
  2330                 hookargs['new'] = nodemod.hex(node if node is not None else '')
  2147                 allhooks.append(hookargs)
  2331                 allhooks.append(hookargs)
  2148 
  2332 
  2149             for hookargs in allhooks:
  2333             for hookargs in allhooks:
  2150                 op.repo.hook('prepushkey', throw=True,
  2334                 op.repo.hook(
  2151                              **pycompat.strkwargs(hookargs))
  2335                     'prepushkey', throw=True, **pycompat.strkwargs(hookargs)
       
  2336                 )
  2152 
  2337 
  2153         bookstore.applychanges(op.repo, op.gettransaction(), changes)
  2338         bookstore.applychanges(op.repo, op.gettransaction(), changes)
  2154 
  2339 
  2155         if pushkeycompat:
  2340         if pushkeycompat:
       
  2341 
  2156             def runhook():
  2342             def runhook():
  2157                 for hookargs in allhooks:
  2343                 for hookargs in allhooks:
  2158                     op.repo.hook('pushkey', **pycompat.strkwargs(hookargs))
  2344                     op.repo.hook('pushkey', **pycompat.strkwargs(hookargs))
       
  2345 
  2159             op.repo._afterlock(runhook)
  2346             op.repo._afterlock(runhook)
  2160 
  2347 
  2161     elif bookmarksmode == 'records':
  2348     elif bookmarksmode == 'records':
  2162         for book, node in changes:
  2349         for book, node in changes:
  2163             record = {'bookmark': book, 'node': node}
  2350             record = {'bookmark': book, 'node': node}
  2164             op.records.add('bookmarks', record)
  2351             op.records.add('bookmarks', record)
  2165     else:
  2352     else:
  2166         raise error.ProgrammingError('unknown bookmark mode: %s' % bookmarksmode)
  2353         raise error.ProgrammingError('unknown bookmark mode: %s' % bookmarksmode)
  2167 
  2354 
       
  2355 
  2168 @parthandler('phase-heads')
  2356 @parthandler('phase-heads')
  2169 def handlephases(op, inpart):
  2357 def handlephases(op, inpart):
  2170     """apply phases from bundle part to repo"""
  2358     """apply phases from bundle part to repo"""
  2171     headsbyphase = phases.binarydecode(inpart)
  2359     headsbyphase = phases.binarydecode(inpart)
  2172     phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
  2360     phases.updatephases(op.repo.unfiltered(), op.gettransaction, headsbyphase)
       
  2361 
  2173 
  2362 
  2174 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
  2363 @parthandler('reply:pushkey', ('return', 'in-reply-to'))
  2175 def handlepushkeyreply(op, inpart):
  2364 def handlepushkeyreply(op, inpart):
  2176     """retrieve the result of a pushkey request"""
  2365     """retrieve the result of a pushkey request"""
  2177     ret = int(inpart.params['return'])
  2366     ret = int(inpart.params['return'])
  2178     partid = int(inpart.params['in-reply-to'])
  2367     partid = int(inpart.params['in-reply-to'])
  2179     op.records.add('pushkey', {'return': ret}, partid)
  2368     op.records.add('pushkey', {'return': ret}, partid)
  2180 
  2369 
       
  2370 
  2181 @parthandler('obsmarkers')
  2371 @parthandler('obsmarkers')
  2182 def handleobsmarker(op, inpart):
  2372 def handleobsmarker(op, inpart):
  2183     """add a stream of obsmarkers to the repo"""
  2373     """add a stream of obsmarkers to the repo"""
  2184     tr = op.gettransaction()
  2374     tr = op.gettransaction()
  2185     markerdata = inpart.read()
  2375     markerdata = inpart.read()
  2186     if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
  2376     if op.ui.config('experimental', 'obsmarkers-exchange-debug'):
  2187         op.ui.write(('obsmarker-exchange: %i bytes received\n')
  2377         op.ui.write('obsmarker-exchange: %i bytes received\n' % len(markerdata))
  2188                     % len(markerdata))
       
  2189     # The mergemarkers call will crash if marker creation is not enabled.
  2378     # The mergemarkers call will crash if marker creation is not enabled.
  2190     # we want to avoid this if the part is advisory.
  2379     # we want to avoid this if the part is advisory.
  2191     if not inpart.mandatory and op.repo.obsstore.readonly:
  2380     if not inpart.mandatory and op.repo.obsstore.readonly:
  2192         op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
  2381         op.repo.ui.debug('ignoring obsolescence markers, feature not enabled\n')
  2193         return
  2382         return
  2195     op.repo.invalidatevolatilesets()
  2384     op.repo.invalidatevolatilesets()
  2196     op.records.add('obsmarkers', {'new': new})
  2385     op.records.add('obsmarkers', {'new': new})
  2197     if op.reply is not None:
  2386     if op.reply is not None:
  2198         rpart = op.reply.newpart('reply:obsmarkers')
  2387         rpart = op.reply.newpart('reply:obsmarkers')
  2199         rpart.addparam(
  2388         rpart.addparam(
  2200             'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False)
  2389             'in-reply-to', pycompat.bytestr(inpart.id), mandatory=False
       
  2390         )
  2201         rpart.addparam('new', '%i' % new, mandatory=False)
  2391         rpart.addparam('new', '%i' % new, mandatory=False)
  2202 
  2392 
  2203 
  2393 
  2204 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
  2394 @parthandler('reply:obsmarkers', ('new', 'in-reply-to'))
  2205 def handleobsmarkerreply(op, inpart):
  2395 def handleobsmarkerreply(op, inpart):
  2206     """retrieve the result of an obsmarkers request"""
  2396     """retrieve the result of an obsmarkers request"""
  2207     ret = int(inpart.params['new'])
  2397     ret = int(inpart.params['new'])
  2208     partid = int(inpart.params['in-reply-to'])
  2398     partid = int(inpart.params['in-reply-to'])
  2209     op.records.add('obsmarkers', {'new': ret}, partid)
  2399     op.records.add('obsmarkers', {'new': ret}, partid)
       
  2400 
  2210 
  2401 
  2211 @parthandler('hgtagsfnodes')
  2402 @parthandler('hgtagsfnodes')
  2212 def handlehgtagsfnodes(op, inpart):
  2403 def handlehgtagsfnodes(op, inpart):
  2213     """Applies .hgtags fnodes cache entries to the local repo.
  2404     """Applies .hgtags fnodes cache entries to the local repo.
  2214 
  2405 
  2230         count += 1
  2421         count += 1
  2231 
  2422 
  2232     cache.write()
  2423     cache.write()
  2233     op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
  2424     op.ui.debug('applied %i hgtags fnodes cache entries\n' % count)
  2234 
  2425 
       
  2426 
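# Editor's note: a sketch of the payload the handler above expects, assuming
# it is a flat sequence of fixed-size records, each a 20-byte changeset node
# followed by the 20-byte node of the matching .hgtags filelog revision;
# 'entries' is a placeholder for (cnode, fnode) pairs.
#
#     payload = b''.join(cnode + fnode for cnode, fnode in entries)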
  2235 rbcstruct = struct.Struct('>III')
  2427 rbcstruct = struct.Struct('>III')
       
  2428 
  2236 
  2429 
  2237 @parthandler('cache:rev-branch-cache')
  2430 @parthandler('cache:rev-branch-cache')
  2238 def handlerbc(op, inpart):
  2431 def handlerbc(op, inpart):
  2239     """receive a rev-branch-cache payload and update the local cache
  2432     """receive a rev-branch-cache payload and update the local cache
  2240 
  2433 
  2264             rev = cl.rev(node)
  2457             rev = cl.rev(node)
  2265             cache.setdata(branch, rev, node, True)
  2458             cache.setdata(branch, rev, node, True)
  2266         rawheader = inpart.read(rbcstruct.size)
  2459         rawheader = inpart.read(rbcstruct.size)
  2267     cache.write()
  2460     cache.write()
  2268 
  2461 
       
  2462 
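# Editor's note: a minimal sketch of one chunk of the payload consumed above,
# assuming a single branch with one open head; the names are placeholders.
# Each chunk is an rbcstruct header ('>III': branch-name length, open-head
# count, closed-head count) followed by the branch name and 20-byte nodes.
#
#     branch = b'default'
#     openheads = [b'\0' * 20]
#     closedheads = []
#     chunk = rbcstruct.pack(len(branch), len(openheads), len(closedheads))
#     chunk += branch + b''.join(openheads) + b''.join(closedheads)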
  2269 @parthandler('pushvars')
  2463 @parthandler('pushvars')
  2270 def bundle2getvars(op, part):
  2464 def bundle2getvars(op, part):
  2271     '''unbundle a bundle2 containing shellvars on the server'''
  2465     '''unbundle a bundle2 containing shellvars on the server'''
  2272     # An option to disable unbundling on server-side for security reasons
  2466     # An option to disable unbundling on server-side for security reasons
  2273     if op.ui.configbool('push', 'pushvars.server'):
  2467     if op.ui.configbool('push', 'pushvars.server'):
  2278             # they came from the --pushvar flag.
  2472             # they came from the --pushvar flag.
  2279             key = "USERVAR_" + key
  2473             key = "USERVAR_" + key
  2280             hookargs[key] = value
  2474             hookargs[key] = value
  2281         op.addhookargs(hookargs)
  2475         op.addhookargs(hookargs)
  2282 
  2476 
       
  2477 
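# Editor's note: an illustrative (not authoritative) end-to-end view of the
# handler above. With the server opting in through its configuration:
#
#     [push]
#     pushvars.server = true
#
# a push such as `hg push --pushvar DEBUG=1` makes the value available to
# server-side hooks as a USERVAR_-prefixed hook argument (for shell hooks,
# typically the HG_USERVAR_DEBUG environment variable).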
  2283 @parthandler('stream2', ('requirements', 'filecount', 'bytecount'))
  2478 @parthandler('stream2', ('requirements', 'filecount', 'bytecount'))
  2284 def handlestreamv2bundle(op, part):
  2479 def handlestreamv2bundle(op, part):
  2285 
  2480 
  2286     requirements = urlreq.unquote(part.params['requirements']).split(',')
  2481     requirements = urlreq.unquote(part.params['requirements']).split(',')
  2287     filecount = int(part.params['filecount'])
  2482     filecount = int(part.params['filecount'])
  2291     if len(repo):
  2486     if len(repo):
  2292         msg = _('cannot apply stream clone to non empty repository')
  2487         msg = _('cannot apply stream clone to non empty repository')
  2293         raise error.Abort(msg)
  2488         raise error.Abort(msg)
  2294 
  2489 
  2295     repo.ui.debug('applying stream bundle\n')
  2490     repo.ui.debug('applying stream bundle\n')
  2296     streamclone.applybundlev2(repo, part, filecount, bytecount,
  2491     streamclone.applybundlev2(repo, part, filecount, bytecount, requirements)
  2297                               requirements)
  2492 
  2298 
  2493 
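# Editor's note: a hypothetical sender-side sketch matching what the handler
# above unpacks; 'filestream', 'requirements', 'filecount' and 'bytecount'
# are placeholders. Requirements travel URL-quoted and comma-separated, the
# two counts as decimal integers.
#
#     part = bundler.newpart('stream2', data=filestream)
#     part.addparam('requirements', urlreq.quote(','.join(sorted(requirements))))
#     part.addparam('filecount', '%d' % filecount)
#     part.addparam('bytecount', '%d' % bytecount)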
  2299 def widen_bundle(bundler, repo, oldmatcher, newmatcher, common,
  2494 def widen_bundle(
  2300                  known, cgversion, ellipses):
  2495     bundler, repo, oldmatcher, newmatcher, common, known, cgversion, ellipses
       
  2496 ):
  2301     """generates bundle2 for widening a narrow clone
  2497     """generates bundle2 for widening a narrow clone
  2302 
  2498 
  2303     bundler is the bundle to which data should be added
  2499     bundler is the bundle to which data should be added
  2304     repo is the localrepository instance
  2500     repo is the localrepository instance
  2305     oldmatcher matches what the client already has
  2501     oldmatcher matches what the client already has
  2316     for r in repo.revs("::%ln", common):
  2512     for r in repo.revs("::%ln", common):
  2317         commonnodes.add(cl.node(r))
  2513         commonnodes.add(cl.node(r))
  2318     if commonnodes:
  2514     if commonnodes:
  2319         # XXX: we should only send the filelogs (and treemanifest). user
  2515         # XXX: we should only send the filelogs (and treemanifest). user
  2320         # already has the changelog and manifest
  2516         # already has the changelog and manifest
  2321         packer = changegroup.getbundler(cgversion, repo,
  2517         packer = changegroup.getbundler(
  2322                                         oldmatcher=oldmatcher,
  2518             cgversion,
  2323                                         matcher=newmatcher,
  2519             repo,
  2324                                         fullnodes=commonnodes)
  2520             oldmatcher=oldmatcher,
  2325         cgdata = packer.generate({nodemod.nullid}, list(commonnodes),
  2521             matcher=newmatcher,
  2326                                  False, 'narrow_widen', changelog=False)
  2522             fullnodes=commonnodes,
       
  2523         )
       
  2524         cgdata = packer.generate(
       
  2525             {nodemod.nullid},
       
  2526             list(commonnodes),
       
  2527             False,
       
  2528             'narrow_widen',
       
  2529             changelog=False,
       
  2530         )
  2327 
  2531 
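        # Editor's note: in the call above, passing {nodemod.nullid} as the
        # "common" set roughly means nothing is assumed to be present, so full
        # data is generated for commonnodes, while changelog=False avoids
        # re-sending changelog data the client already has (see the XXX note).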
  2328         part = bundler.newpart('changegroup', data=cgdata)
  2532         part = bundler.newpart('changegroup', data=cgdata)
  2329         part.addparam('version', cgversion)
  2533         part.addparam('version', cgversion)
  2330         if 'treemanifest' in repo.requirements:
  2534         if 'treemanifest' in repo.requirements:
  2331             part.addparam('treemanifest', '1')
  2535             part.addparam('treemanifest', '1')