diff mercurial/changegroup.py @ 46718:ba8e508a8e69

sidedata-exchange: rewrite sidedata on-the-fly whenever possible. When A exchanges with B, the difference between their supported sidedata categories is computed, and it is always the client's responsibility to generate the missing sidedata: - If A pushes to B and B requires category `foo` that A does not have, A will need to generate it when sending it to B. - If A pulls from B and A needs category `foo`, it will generate `foo` before the end of the transaction. - Any category that is not required is removed. If the peers are not compatible, abort. It is forbidden to rewrite sidedata for a rev that already has sidedata, since that would introduce unreachable (garbage) data in the data file, something we're not prepared for yet. Differential Revision: https://phab.mercurial-scm.org/D10032
author Raphaël Gomès <rgomes@octobus.net>
date Fri, 19 Feb 2021 11:24:50 +0100
parents 45f0d5297698
children 6266d19556ad
line wrap: on
line diff
--- a/mercurial/changegroup.py	Mon Feb 15 11:08:28 2021 +0100
+++ b/mercurial/changegroup.py	Fri Feb 19 11:24:50 2021 +0100
@@ -7,6 +7,7 @@
 
 from __future__ import absolute_import
 
+import collections
 import os
 import struct
 import weakref
@@ -252,7 +253,7 @@
                     pos = next
             yield closechunk()
 
-    def _unpackmanifests(self, repo, revmap, trp, prog):
+    def _unpackmanifests(self, repo, revmap, trp, prog, addrevisioncb=None):
         self.callback = prog.increment
         # no need to check for empty manifest group here:
         # if the result of the merge of 1 and 2 is the same in 3 and 4,
@@ -260,7 +261,8 @@
         # be empty during the pull
         self.manifestheader()
         deltas = self.deltaiter()
-        repo.manifestlog.getstorage(b'').addgroup(deltas, revmap, trp)
+        storage = repo.manifestlog.getstorage(b'')
+        storage.addgroup(deltas, revmap, trp, addrevisioncb=addrevisioncb)
         prog.complete()
         self.callback = None
 
@@ -369,6 +371,13 @@
             efilesset = None
             self.callback = None
 
+            # Keep track of the (non-changelog) revlogs we've updated and their
+            # range of new revisions for sidedata rewrite.
+            # TODO do something more efficient than keeping the reference to
+            # the revlogs, especially memory-wise.
+            touched_manifests = {}
+            touched_filelogs = {}
+
             # pull off the manifest group
             repo.ui.status(_(b"adding manifests\n"))
             # We know that we'll never have more manifests than we had
@@ -376,7 +385,24 @@
             progress = repo.ui.makeprogress(
                 _(b'manifests'), unit=_(b'chunks'), total=changesets
             )
-            self._unpackmanifests(repo, revmap, trp, progress)
+            on_manifest_rev = None
+            if sidedata_helpers and b'manifest' in sidedata_helpers[1]:
+
+                def on_manifest_rev(manifest, rev):
+                    range = touched_manifests.get(manifest)
+                    if not range:
+                        touched_manifests[manifest] = (rev, rev)
+                    else:
+                        assert rev == range[1] + 1
+                        touched_manifests[manifest] = (range[0], rev)
+
+            self._unpackmanifests(
+                repo,
+                revmap,
+                trp,
+                progress,
+                addrevisioncb=on_manifest_rev,
+            )
 
             needfiles = {}
             if repo.ui.configbool(b'server', b'validate'):
@@ -390,12 +416,37 @@
                     for f, n in pycompat.iteritems(mfest):
                         needfiles.setdefault(f, set()).add(n)
 
+            on_filelog_rev = None
+            if sidedata_helpers and b'filelog' in sidedata_helpers[1]:
+
+                def on_filelog_rev(filelog, rev):
+                    range = touched_filelogs.get(filelog)
+                    if not range:
+                        touched_filelogs[filelog] = (rev, rev)
+                    else:
+                        assert rev == range[1] + 1
+                        touched_filelogs[filelog] = (range[0], rev)
+
             # process the files
             repo.ui.status(_(b"adding file changes\n"))
             newrevs, newfiles = _addchangegroupfiles(
-                repo, self, revmap, trp, efiles, needfiles
+                repo,
+                self,
+                revmap,
+                trp,
+                efiles,
+                needfiles,
+                addrevisioncb=on_filelog_rev,
             )
 
+            if sidedata_helpers:
+                if b'changelog' in sidedata_helpers[1]:
+                    cl.rewrite_sidedata(sidedata_helpers, clstart, clend - 1)
+                for mf, (startrev, endrev) in touched_manifests.items():
+                    mf.rewrite_sidedata(sidedata_helpers, startrev, endrev)
+                for fl, (startrev, endrev) in touched_filelogs.items():
+                    fl.rewrite_sidedata(sidedata_helpers, startrev, endrev)
+
             # making sure the value exists
             tr.changes.setdefault(b'changegroup-count-changesets', 0)
             tr.changes.setdefault(b'changegroup-count-revisions', 0)
@@ -559,14 +610,18 @@
         node, p1, p2, deltabase, cs, flags = headertuple
         return node, p1, p2, deltabase, cs, flags
 
-    def _unpackmanifests(self, repo, revmap, trp, prog):
-        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog)
+    def _unpackmanifests(self, repo, revmap, trp, prog, addrevisioncb=None):
+        super(cg3unpacker, self)._unpackmanifests(
+            repo, revmap, trp, prog, addrevisioncb=addrevisioncb
+        )
         for chunkdata in iter(self.filelogheader, {}):
             # If we get here, there are directory manifests in the changegroup
             d = chunkdata[b"filename"]
             repo.ui.debug(b"adding %s revisions\n" % d)
             deltas = self.deltaiter()
-            if not repo.manifestlog.getstorage(d).addgroup(deltas, revmap, trp):
+            if not repo.manifestlog.getstorage(d).addgroup(
+                deltas, revmap, trp, addrevisioncb=addrevisioncb
+            ):
                 raise error.Abort(_(b"received dir revlog group is empty"))
 
 
@@ -1793,7 +1848,15 @@
     return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
 
 
-def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
+def _addchangegroupfiles(
+    repo,
+    source,
+    revmap,
+    trp,
+    expectedfiles,
+    needfiles,
+    addrevisioncb=None,
+):
     revisions = 0
     files = 0
     progress = repo.ui.makeprogress(
@@ -1808,7 +1871,13 @@
         o = len(fl)
         try:
             deltas = source.deltaiter()
-            if not fl.addgroup(deltas, revmap, trp):
+            added = fl.addgroup(
+                deltas,
+                revmap,
+                trp,
+                addrevisioncb=addrevisioncb,
+            )
+            if not added:
                 raise error.Abort(_(b"received file revlog group is empty"))
         except error.CensoredBaseError as e:
             raise error.Abort(_(b"received delta base is censored: %s") % e)