# HG changeset patch # User Augie Fackler # Date 1444769931 14400 # Node ID 1121fced5b204e19de1e36edcad0a7643d7697c1 # Parent c2e6e3cc7cb4f88b4939009acf8af78a8ba4ae0d changegroup: migrate addchangegroup() to forward to cg?unpacker.apply() I'll clean up callers in subsequent patches, then remove the forwarding. diff -r c2e6e3cc7cb4 -r 1121fced5b20 mercurial/changegroup.py --- a/mercurial/changegroup.py Tue Oct 13 15:54:05 2015 -0400 +++ b/mercurial/changegroup.py Tue Oct 13 16:58:51 2015 -0400 @@ -267,6 +267,198 @@ pos = next yield closechunk() + def apply(self, repo, srctype, url, emptyok=False, + targetphase=phases.draft, expectedtotal=None): + """Add the changegroup returned by source.read() to this repo. + srctype is a string like 'push', 'pull', or 'unbundle'. url is + the URL of the repo where this changegroup is coming from. + + Return an integer summarizing the change to this repo: + - nothing changed or no source: 0 + - more heads than before: 1+added heads (2..n) + - fewer heads than before: -1-removed heads (-2..-n) + - number of heads stays the same: 1 + """ + repo = repo.unfiltered() + def csmap(x): + repo.ui.debug("add changeset %s\n" % short(x)) + return len(cl) + + def revmap(x): + return cl.rev(x) + + changesets = files = revisions = 0 + + tr = repo.transaction("\n".join([srctype, util.hidepassword(url)])) + # The transaction could have been created before and already + # carries source information. In this case we use the top + # level data. We overwrite the argument because we need to use + # the top level value (if they exist) in this function. + srctype = tr.hookargs.setdefault('source', srctype) + url = tr.hookargs.setdefault('url', url) + + # write changelog data to temp files so concurrent readers will not see + # inconsistent view + cl = repo.changelog + cl.delayupdate(tr) + oldheads = cl.heads() + try: + repo.hook('prechangegroup', throw=True, **tr.hookargs) + + trp = weakref.proxy(tr) + # pull off the changeset group + repo.ui.status(_("adding changesets\n")) + clstart = len(cl) + class prog(object): + def __init__(self, step, total): + self._step = step + self._total = total + self._count = 1 + def __call__(self): + repo.ui.progress(self._step, self._count, unit=_('chunks'), + total=self._total) + self._count += 1 + self.callback = prog(_('changesets'), expectedtotal) + + efiles = set() + def onchangelog(cl, node): + efiles.update(cl.read(node)[3]) + + self.changelogheader() + srccontent = cl.addgroup(self, csmap, trp, + addrevisioncb=onchangelog) + efiles = len(efiles) + + if not (srccontent or emptyok): + raise error.Abort(_("received changelog group is empty")) + clend = len(cl) + changesets = clend - clstart + repo.ui.progress(_('changesets'), None) + + # pull off the manifest group + repo.ui.status(_("adding manifests\n")) + # manifests <= changesets + self.callback = prog(_('manifests'), changesets) + # no need to check for empty manifest group here: + # if the result of the merge of 1 and 2 is the same in 3 and 4, + # no new manifest will be created and the manifest group will + # be empty during the pull + self.manifestheader() + repo.manifest.addgroup(self, revmap, trp) + repo.ui.progress(_('manifests'), None) + + needfiles = {} + if repo.ui.configbool('server', 'validate', default=False): + # validate incoming csets have their manifests + for cset in xrange(clstart, clend): + mfnode = repo.changelog.read(repo.changelog.node(cset))[0] + mfest = repo.manifest.readdelta(mfnode) + # store file nodes we must see + for f, n in mfest.iteritems(): + needfiles.setdefault(f, set()).add(n) + + # process the files + repo.ui.status(_("adding file changes\n")) + self.callback = None + pr = prog(_('files'), efiles) + newrevs, newfiles = addchangegroupfiles(repo, self, revmap, trp, pr, + needfiles) + revisions += newrevs + files += newfiles + + dh = 0 + if oldheads: + heads = cl.heads() + dh = len(heads) - len(oldheads) + for h in heads: + if h not in oldheads and repo[h].closesbranch(): + dh -= 1 + htext = "" + if dh: + htext = _(" (%+d heads)") % dh + + repo.ui.status(_("added %d changesets" + " with %d changes to %d files%s\n") + % (changesets, revisions, files, htext)) + repo.invalidatevolatilesets() + + if changesets > 0: + p = lambda: tr.writepending() and repo.root or "" + if 'node' not in tr.hookargs: + tr.hookargs['node'] = hex(cl.node(clstart)) + hookargs = dict(tr.hookargs) + else: + hookargs = dict(tr.hookargs) + hookargs['node'] = hex(cl.node(clstart)) + repo.hook('pretxnchangegroup', throw=True, pending=p, + **hookargs) + + added = [cl.node(r) for r in xrange(clstart, clend)] + publishing = repo.publishing() + if srctype in ('push', 'serve'): + # Old servers can not push the boundary themselves. + # New servers won't push the boundary if changeset already + # exists locally as secret + # + # We should not use added here but the list of all change in + # the bundle + if publishing: + phases.advanceboundary(repo, tr, phases.public, srccontent) + else: + # Those changesets have been pushed from the outside, their + # phases are going to be pushed alongside. Therefor + # `targetphase` is ignored. + phases.advanceboundary(repo, tr, phases.draft, srccontent) + phases.retractboundary(repo, tr, phases.draft, added) + elif srctype != 'strip': + # publishing only alter behavior during push + # + # strip should not touch boundary at all + phases.retractboundary(repo, tr, targetphase, added) + + if changesets > 0: + if srctype != 'strip': + # During strip, branchcache is invalid but coming call to + # `destroyed` will repair it. + # In other case we can safely update cache on disk. + branchmap.updatecache(repo.filtered('served')) + + def runhooks(): + # These hooks run when the lock releases, not when the + # transaction closes. So it's possible for the changelog + # to have changed since we last saw it. + if clstart >= len(repo): + return + + # forcefully update the on-disk branch cache + repo.ui.debug("updating the branch cache\n") + repo.hook("changegroup", **hookargs) + + for n in added: + args = hookargs.copy() + args['node'] = hex(n) + repo.hook("incoming", **args) + + newheads = [h for h in repo.heads() if h not in oldheads] + repo.ui.log("incoming", + "%s incoming changes - new heads: %s\n", + len(added), + ', '.join([hex(c[:6]) for c in newheads])) + + tr.addpostclose('changegroup-runhooks-%020i' % clstart, + lambda tr: repo._afterlock(runhooks)) + + tr.close() + + finally: + tr.release() + repo.ui.flush() + # never return 0 here: + if dh < 0: + return dh - 1 + else: + return dh + 1 + class cg2unpacker(cg1unpacker): deltaheader = _CHANGEGROUPV2_DELTA_HEADER deltaheadersize = struct.calcsize(deltaheader) @@ -720,194 +912,9 @@ def addchangegroup(repo, source, srctype, url, emptyok=False, targetphase=phases.draft, expectedtotal=None): - """Add the changegroup returned by source.read() to this repo. - srctype is a string like 'push', 'pull', or 'unbundle'. url is - the URL of the repo where this changegroup is coming from. - - Return an integer summarizing the change to this repo: - - nothing changed or no source: 0 - - more heads than before: 1+added heads (2..n) - - fewer heads than before: -1-removed heads (-2..-n) - - number of heads stays the same: 1 - """ + """Legacy forwarding method to cg?unpacker.apply() to be removed soon.""" if not source: return 0 - repo = repo.unfiltered() - def csmap(x): - repo.ui.debug("add changeset %s\n" % short(x)) - return len(cl) - - def revmap(x): - return cl.rev(x) - - changesets = files = revisions = 0 - - tr = repo.transaction("\n".join([srctype, util.hidepassword(url)])) - # The transaction could have been created before and already carries source - # information. In this case we use the top level data. We overwrite the - # argument because we need to use the top level value (if they exist) in - # this function. - srctype = tr.hookargs.setdefault('source', srctype) - url = tr.hookargs.setdefault('url', url) - - # write changelog data to temp files so concurrent readers will not see - # inconsistent view - cl = repo.changelog - cl.delayupdate(tr) - oldheads = cl.heads() - try: - repo.hook('prechangegroup', throw=True, **tr.hookargs) - - trp = weakref.proxy(tr) - # pull off the changeset group - repo.ui.status(_("adding changesets\n")) - clstart = len(cl) - class prog(object): - def __init__(self, step, total): - self._step = step - self._total = total - self._count = 1 - def __call__(self): - repo.ui.progress(self._step, self._count, unit=_('chunks'), - total=self._total) - self._count += 1 - source.callback = prog(_('changesets'), expectedtotal) - - efiles = set() - def onchangelog(cl, node): - efiles.update(cl.read(node)[3]) - - source.changelogheader() - srccontent = cl.addgroup(source, csmap, trp, - addrevisioncb=onchangelog) - efiles = len(efiles) - - if not (srccontent or emptyok): - raise error.Abort(_("received changelog group is empty")) - clend = len(cl) - changesets = clend - clstart - repo.ui.progress(_('changesets'), None) - - # pull off the manifest group - repo.ui.status(_("adding manifests\n")) - # manifests <= changesets - source.callback = prog(_('manifests'), changesets) - # no need to check for empty manifest group here: - # if the result of the merge of 1 and 2 is the same in 3 and 4, - # no new manifest will be created and the manifest group will - # be empty during the pull - source.manifestheader() - repo.manifest.addgroup(source, revmap, trp) - repo.ui.progress(_('manifests'), None) - - needfiles = {} - if repo.ui.configbool('server', 'validate', default=False): - # validate incoming csets have their manifests - for cset in xrange(clstart, clend): - mfnode = repo.changelog.read(repo.changelog.node(cset))[0] - mfest = repo.manifest.readdelta(mfnode) - # store file nodes we must see - for f, n in mfest.iteritems(): - needfiles.setdefault(f, set()).add(n) - - # process the files - repo.ui.status(_("adding file changes\n")) - source.callback = None - pr = prog(_('files'), efiles) - newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr, - needfiles) - revisions += newrevs - files += newfiles - - dh = 0 - if oldheads: - heads = cl.heads() - dh = len(heads) - len(oldheads) - for h in heads: - if h not in oldheads and repo[h].closesbranch(): - dh -= 1 - htext = "" - if dh: - htext = _(" (%+d heads)") % dh - - repo.ui.status(_("added %d changesets" - " with %d changes to %d files%s\n") - % (changesets, revisions, files, htext)) - repo.invalidatevolatilesets() - - if changesets > 0: - p = lambda: tr.writepending() and repo.root or "" - if 'node' not in tr.hookargs: - tr.hookargs['node'] = hex(cl.node(clstart)) - hookargs = dict(tr.hookargs) - else: - hookargs = dict(tr.hookargs) - hookargs['node'] = hex(cl.node(clstart)) - repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs) - - added = [cl.node(r) for r in xrange(clstart, clend)] - publishing = repo.publishing() - if srctype in ('push', 'serve'): - # Old servers can not push the boundary themselves. - # New servers won't push the boundary if changeset already - # exists locally as secret - # - # We should not use added here but the list of all change in - # the bundle - if publishing: - phases.advanceboundary(repo, tr, phases.public, srccontent) - else: - # Those changesets have been pushed from the outside, their - # phases are going to be pushed alongside. Therefor - # `targetphase` is ignored. - phases.advanceboundary(repo, tr, phases.draft, srccontent) - phases.retractboundary(repo, tr, phases.draft, added) - elif srctype != 'strip': - # publishing only alter behavior during push - # - # strip should not touch boundary at all - phases.retractboundary(repo, tr, targetphase, added) - - if changesets > 0: - if srctype != 'strip': - # During strip, branchcache is invalid but coming call to - # `destroyed` will repair it. - # In other case we can safely update cache on disk. - branchmap.updatecache(repo.filtered('served')) - - def runhooks(): - # These hooks run when the lock releases, not when the - # transaction closes. So it's possible for the changelog - # to have changed since we last saw it. - if clstart >= len(repo): - return - - # forcefully update the on-disk branch cache - repo.ui.debug("updating the branch cache\n") - repo.hook("changegroup", **hookargs) - - for n in added: - args = hookargs.copy() - args['node'] = hex(n) - repo.hook("incoming", **args) - - newheads = [h for h in repo.heads() if h not in oldheads] - repo.ui.log("incoming", - "%s incoming changes - new heads: %s\n", - len(added), - ', '.join([hex(c[:6]) for c in newheads])) - - tr.addpostclose('changegroup-runhooks-%020i' % clstart, - lambda tr: repo._afterlock(runhooks)) - - tr.close() - - finally: - tr.release() - repo.ui.flush() - # never return 0 here: - if dh < 0: - return dh - 1 - else: - return dh + 1 + return source.apply(repo, srctype, url, emptyok=emptyok, + targetphase=targetphase, expectedtotal=expectedtotal)