changegroup: migrate addchangegroup() to forward to cg?unpacker.apply()
authorAugie Fackler <augie@google.com>
Tue, 13 Oct 2015 16:58:51 -0400
changeset 26695 1121fced5b20
parent 26694 c2e6e3cc7cb4
child 26696 78aa4392c261
changegroup: migrate addchangegroup() to forward to cg?unpacker.apply() I'll clean up callers in subsequent patches, then remove the forwarding.
mercurial/changegroup.py
--- a/mercurial/changegroup.py	Tue Oct 13 15:54:05 2015 -0400
+++ b/mercurial/changegroup.py	Tue Oct 13 16:58:51 2015 -0400
@@ -267,6 +267,198 @@
                     pos = next
             yield closechunk()
 
+    def apply(self, repo, srctype, url, emptyok=False,
+              targetphase=phases.draft, expectedtotal=None):
+        """Add the changegroup returned by source.read() to this repo.
+        srctype is a string like 'push', 'pull', or 'unbundle'.  url is
+        the URL of the repo where this changegroup is coming from.
+
+        Return an integer summarizing the change to this repo:
+        - nothing changed or no source: 0
+        - more heads than before: 1+added heads (2..n)
+        - fewer heads than before: -1-removed heads (-2..-n)
+        - number of heads stays the same: 1
+        """
+        repo = repo.unfiltered()
+        def csmap(x):
+            repo.ui.debug("add changeset %s\n" % short(x))
+            return len(cl)
+
+        def revmap(x):
+            return cl.rev(x)
+
+        changesets = files = revisions = 0
+
+        tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
+        # The transaction could have been created before and already
+        # carries source information. In this case we use the top
+        # level data. We overwrite the argument because we need to use
+        # the top level value (if they exist) in this function.
+        srctype = tr.hookargs.setdefault('source', srctype)
+        url = tr.hookargs.setdefault('url', url)
+
+        # write changelog data to temp files so concurrent readers will not see
+        # inconsistent view
+        cl = repo.changelog
+        cl.delayupdate(tr)
+        oldheads = cl.heads()
+        try:
+            repo.hook('prechangegroup', throw=True, **tr.hookargs)
+
+            trp = weakref.proxy(tr)
+            # pull off the changeset group
+            repo.ui.status(_("adding changesets\n"))
+            clstart = len(cl)
+            class prog(object):
+                def __init__(self, step, total):
+                    self._step = step
+                    self._total = total
+                    self._count = 1
+                def __call__(self):
+                    repo.ui.progress(self._step, self._count, unit=_('chunks'),
+                                     total=self._total)
+                    self._count += 1
+            self.callback = prog(_('changesets'), expectedtotal)
+
+            efiles = set()
+            def onchangelog(cl, node):
+                efiles.update(cl.read(node)[3])
+
+            self.changelogheader()
+            srccontent = cl.addgroup(self, csmap, trp,
+                                     addrevisioncb=onchangelog)
+            efiles = len(efiles)
+
+            if not (srccontent or emptyok):
+                raise error.Abort(_("received changelog group is empty"))
+            clend = len(cl)
+            changesets = clend - clstart
+            repo.ui.progress(_('changesets'), None)
+
+            # pull off the manifest group
+            repo.ui.status(_("adding manifests\n"))
+            # manifests <= changesets
+            self.callback = prog(_('manifests'), changesets)
+            # no need to check for empty manifest group here:
+            # if the result of the merge of 1 and 2 is the same in 3 and 4,
+            # no new manifest will be created and the manifest group will
+            # be empty during the pull
+            self.manifestheader()
+            repo.manifest.addgroup(self, revmap, trp)
+            repo.ui.progress(_('manifests'), None)
+
+            needfiles = {}
+            if repo.ui.configbool('server', 'validate', default=False):
+                # validate incoming csets have their manifests
+                for cset in xrange(clstart, clend):
+                    mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
+                    mfest = repo.manifest.readdelta(mfnode)
+                    # store file nodes we must see
+                    for f, n in mfest.iteritems():
+                        needfiles.setdefault(f, set()).add(n)
+
+            # process the files
+            repo.ui.status(_("adding file changes\n"))
+            self.callback = None
+            pr = prog(_('files'), efiles)
+            newrevs, newfiles = addchangegroupfiles(repo, self, revmap, trp, pr,
+                                                    needfiles)
+            revisions += newrevs
+            files += newfiles
+
+            dh = 0
+            if oldheads:
+                heads = cl.heads()
+                dh = len(heads) - len(oldheads)
+                for h in heads:
+                    if h not in oldheads and repo[h].closesbranch():
+                        dh -= 1
+            htext = ""
+            if dh:
+                htext = _(" (%+d heads)") % dh
+
+            repo.ui.status(_("added %d changesets"
+                             " with %d changes to %d files%s\n")
+                             % (changesets, revisions, files, htext))
+            repo.invalidatevolatilesets()
+
+            if changesets > 0:
+                p = lambda: tr.writepending() and repo.root or ""
+                if 'node' not in tr.hookargs:
+                    tr.hookargs['node'] = hex(cl.node(clstart))
+                    hookargs = dict(tr.hookargs)
+                else:
+                    hookargs = dict(tr.hookargs)
+                    hookargs['node'] = hex(cl.node(clstart))
+                repo.hook('pretxnchangegroup', throw=True, pending=p,
+                          **hookargs)
+
+            added = [cl.node(r) for r in xrange(clstart, clend)]
+            publishing = repo.publishing()
+            if srctype in ('push', 'serve'):
+                # Old servers can not push the boundary themselves.
+                # New servers won't push the boundary if changeset already
+                # exists locally as secret
+                #
+                # We should not use added here but the list of all change in
+                # the bundle
+                if publishing:
+                    phases.advanceboundary(repo, tr, phases.public, srccontent)
+                else:
+                    # Those changesets have been pushed from the outside, their
+                    # phases are going to be pushed alongside. Therefor
+                    # `targetphase` is ignored.
+                    phases.advanceboundary(repo, tr, phases.draft, srccontent)
+                    phases.retractboundary(repo, tr, phases.draft, added)
+            elif srctype != 'strip':
+                # publishing only alter behavior during push
+                #
+                # strip should not touch boundary at all
+                phases.retractboundary(repo, tr, targetphase, added)
+
+            if changesets > 0:
+                if srctype != 'strip':
+                    # During strip, branchcache is invalid but coming call to
+                    # `destroyed` will repair it.
+                    # In other case we can safely update cache on disk.
+                    branchmap.updatecache(repo.filtered('served'))
+
+                def runhooks():
+                    # These hooks run when the lock releases, not when the
+                    # transaction closes. So it's possible for the changelog
+                    # to have changed since we last saw it.
+                    if clstart >= len(repo):
+                        return
+
+                    # forcefully update the on-disk branch cache
+                    repo.ui.debug("updating the branch cache\n")
+                    repo.hook("changegroup", **hookargs)
+
+                    for n in added:
+                        args = hookargs.copy()
+                        args['node'] = hex(n)
+                        repo.hook("incoming", **args)
+
+                    newheads = [h for h in repo.heads() if h not in oldheads]
+                    repo.ui.log("incoming",
+                                "%s incoming changes - new heads: %s\n",
+                                len(added),
+                                ', '.join([hex(c[:6]) for c in newheads]))
+
+                tr.addpostclose('changegroup-runhooks-%020i' % clstart,
+                                lambda tr: repo._afterlock(runhooks))
+
+            tr.close()
+
+        finally:
+            tr.release()
+            repo.ui.flush()
+        # never return 0 here:
+        if dh < 0:
+            return dh - 1
+        else:
+            return dh + 1
+
 class cg2unpacker(cg1unpacker):
     deltaheader = _CHANGEGROUPV2_DELTA_HEADER
     deltaheadersize = struct.calcsize(deltaheader)
@@ -720,194 +912,9 @@
 
 def addchangegroup(repo, source, srctype, url, emptyok=False,
                    targetphase=phases.draft, expectedtotal=None):
-    """Add the changegroup returned by source.read() to this repo.
-    srctype is a string like 'push', 'pull', or 'unbundle'.  url is
-    the URL of the repo where this changegroup is coming from.
-
-    Return an integer summarizing the change to this repo:
-    - nothing changed or no source: 0
-    - more heads than before: 1+added heads (2..n)
-    - fewer heads than before: -1-removed heads (-2..-n)
-    - number of heads stays the same: 1
-    """
+    """Legacy forwarding method to cg?unpacker.apply() to be removed soon."""
     if not source:
         return 0
 
-    repo = repo.unfiltered()
-    def csmap(x):
-        repo.ui.debug("add changeset %s\n" % short(x))
-        return len(cl)
-
-    def revmap(x):
-        return cl.rev(x)
-
-    changesets = files = revisions = 0
-
-    tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
-    # The transaction could have been created before and already carries source
-    # information. In this case we use the top level data. We overwrite the
-    # argument because we need to use the top level value (if they exist) in
-    # this function.
-    srctype = tr.hookargs.setdefault('source', srctype)
-    url = tr.hookargs.setdefault('url', url)
-
-    # write changelog data to temp files so concurrent readers will not see
-    # inconsistent view
-    cl = repo.changelog
-    cl.delayupdate(tr)
-    oldheads = cl.heads()
-    try:
-        repo.hook('prechangegroup', throw=True, **tr.hookargs)
-
-        trp = weakref.proxy(tr)
-        # pull off the changeset group
-        repo.ui.status(_("adding changesets\n"))
-        clstart = len(cl)
-        class prog(object):
-            def __init__(self, step, total):
-                self._step = step
-                self._total = total
-                self._count = 1
-            def __call__(self):
-                repo.ui.progress(self._step, self._count, unit=_('chunks'),
-                                 total=self._total)
-                self._count += 1
-        source.callback = prog(_('changesets'), expectedtotal)
-
-        efiles = set()
-        def onchangelog(cl, node):
-            efiles.update(cl.read(node)[3])
-
-        source.changelogheader()
-        srccontent = cl.addgroup(source, csmap, trp,
-                                 addrevisioncb=onchangelog)
-        efiles = len(efiles)
-
-        if not (srccontent or emptyok):
-            raise error.Abort(_("received changelog group is empty"))
-        clend = len(cl)
-        changesets = clend - clstart
-        repo.ui.progress(_('changesets'), None)
-
-        # pull off the manifest group
-        repo.ui.status(_("adding manifests\n"))
-        # manifests <= changesets
-        source.callback = prog(_('manifests'), changesets)
-        # no need to check for empty manifest group here:
-        # if the result of the merge of 1 and 2 is the same in 3 and 4,
-        # no new manifest will be created and the manifest group will
-        # be empty during the pull
-        source.manifestheader()
-        repo.manifest.addgroup(source, revmap, trp)
-        repo.ui.progress(_('manifests'), None)
-
-        needfiles = {}
-        if repo.ui.configbool('server', 'validate', default=False):
-            # validate incoming csets have their manifests
-            for cset in xrange(clstart, clend):
-                mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
-                mfest = repo.manifest.readdelta(mfnode)
-                # store file nodes we must see
-                for f, n in mfest.iteritems():
-                    needfiles.setdefault(f, set()).add(n)
-
-        # process the files
-        repo.ui.status(_("adding file changes\n"))
-        source.callback = None
-        pr = prog(_('files'), efiles)
-        newrevs, newfiles = addchangegroupfiles(repo, source, revmap, trp, pr,
-                                                needfiles)
-        revisions += newrevs
-        files += newfiles
-
-        dh = 0
-        if oldheads:
-            heads = cl.heads()
-            dh = len(heads) - len(oldheads)
-            for h in heads:
-                if h not in oldheads and repo[h].closesbranch():
-                    dh -= 1
-        htext = ""
-        if dh:
-            htext = _(" (%+d heads)") % dh
-
-        repo.ui.status(_("added %d changesets"
-                         " with %d changes to %d files%s\n")
-                         % (changesets, revisions, files, htext))
-        repo.invalidatevolatilesets()
-
-        if changesets > 0:
-            p = lambda: tr.writepending() and repo.root or ""
-            if 'node' not in tr.hookargs:
-                tr.hookargs['node'] = hex(cl.node(clstart))
-                hookargs = dict(tr.hookargs)
-            else:
-                hookargs = dict(tr.hookargs)
-                hookargs['node'] = hex(cl.node(clstart))
-            repo.hook('pretxnchangegroup', throw=True, pending=p, **hookargs)
-
-        added = [cl.node(r) for r in xrange(clstart, clend)]
-        publishing = repo.publishing()
-        if srctype in ('push', 'serve'):
-            # Old servers can not push the boundary themselves.
-            # New servers won't push the boundary if changeset already
-            # exists locally as secret
-            #
-            # We should not use added here but the list of all change in
-            # the bundle
-            if publishing:
-                phases.advanceboundary(repo, tr, phases.public, srccontent)
-            else:
-                # Those changesets have been pushed from the outside, their
-                # phases are going to be pushed alongside. Therefor
-                # `targetphase` is ignored.
-                phases.advanceboundary(repo, tr, phases.draft, srccontent)
-                phases.retractboundary(repo, tr, phases.draft, added)
-        elif srctype != 'strip':
-            # publishing only alter behavior during push
-            #
-            # strip should not touch boundary at all
-            phases.retractboundary(repo, tr, targetphase, added)
-
-        if changesets > 0:
-            if srctype != 'strip':
-                # During strip, branchcache is invalid but coming call to
-                # `destroyed` will repair it.
-                # In other case we can safely update cache on disk.
-                branchmap.updatecache(repo.filtered('served'))
-
-            def runhooks():
-                # These hooks run when the lock releases, not when the
-                # transaction closes. So it's possible for the changelog
-                # to have changed since we last saw it.
-                if clstart >= len(repo):
-                    return
-
-                # forcefully update the on-disk branch cache
-                repo.ui.debug("updating the branch cache\n")
-                repo.hook("changegroup", **hookargs)
-
-                for n in added:
-                    args = hookargs.copy()
-                    args['node'] = hex(n)
-                    repo.hook("incoming", **args)
-
-                newheads = [h for h in repo.heads() if h not in oldheads]
-                repo.ui.log("incoming",
-                            "%s incoming changes - new heads: %s\n",
-                            len(added),
-                            ', '.join([hex(c[:6]) for c in newheads]))
-
-            tr.addpostclose('changegroup-runhooks-%020i' % clstart,
-                            lambda tr: repo._afterlock(runhooks))
-
-        tr.close()
-
-    finally:
-        tr.release()
-        repo.ui.flush()
-    # never return 0 here:
-    if dh < 0:
-        return dh - 1
-    else:
-        return dh + 1
+    return source.apply(repo, srctype, url, emptyok=emptyok,
+                        targetphase=targetphase, expectedtotal=expectedtotal)