revlogv2: track current index size in the docket
authorPierre-Yves David <pierre-yves.david@octobus.net>
Mon, 03 May 2021 12:34:52 +0200
changeset 47238 6597255a4f94
parent 47237 864f4ebe6a8d
child 47239 682f09857d69
revlogv2: track current index size in the docket This help use to fix transaction safety on repos. See next changesets for details. Differential Revision: https://phab.mercurial-scm.org/D10628
mercurial/configitems.py
mercurial/localrepo.py
mercurial/revlog.py
mercurial/revlogutils/docket.py
--- a/mercurial/configitems.py	Mon May 03 12:34:41 2021 +0200
+++ b/mercurial/configitems.py	Mon May 03 12:34:52 2021 +0200
@@ -1157,6 +1157,7 @@
 #      - for censoring operation
 #      - for stripping operation
 #      - for rollback operation
+# * proper streaming (race free) of the docket file
 # * store the data size in the docket to simplify sidedata rewrite.
 # * track garbage data to evemtually allow rewriting -existing- sidedata.
 # * Exchange-wise, we will also need to do something more efficient than
--- a/mercurial/localrepo.py	Mon May 03 12:34:41 2021 +0200
+++ b/mercurial/localrepo.py	Mon May 03 12:34:52 2021 +0200
@@ -739,6 +739,8 @@
 
     if requirementsmod.REVLOGV2_REQUIREMENT in requirements:
         features.add(repository.REPO_FEATURE_SIDE_DATA)
+        # the revlogv2 docket introduced race condition that we need to fix
+        features.discard(repository.REPO_FEATURE_STREAM_CLONE)
 
     # The cache vfs is used to manage cache files.
     cachevfs = vfsmod.vfs(cachepath, cacheaudited=True)
--- a/mercurial/revlog.py	Mon May 03 12:34:41 2021 +0200
+++ b/mercurial/revlog.py	Mon May 03 12:34:52 2021 +0200
@@ -453,7 +453,7 @@
         force_nodemap = opts.get(b'devel-force-nodemap', False)
         return new_header, mmapindexthreshold, force_nodemap
 
-    def _get_data(self, filepath, mmap_threshold):
+    def _get_data(self, filepath, mmap_threshold, size=None):
         """return a file content with or without mmap
 
         If the file is missing return the empty string"""
@@ -462,10 +462,19 @@
                 if mmap_threshold is not None:
                     file_size = self.opener.fstat(fp).st_size
                     if file_size >= mmap_threshold:
+                        if size is not None:
+                            # avoid potentiel mmap crash
+                            size = min(file_size, size)
                         # TODO: should .close() to release resources without
                         # relying on Python GC
-                        return util.buffer(util.mmapread(fp))
-                return fp.read()
+                        if size is None:
+                            return util.buffer(util.mmapread(fp))
+                        else:
+                            return util.buffer(util.mmapread(fp, size))
+                if size is None:
+                    return fp.read()
+                else:
+                    return fp.read(size)
         except IOError as inst:
             if inst.errno != errno.ENOENT:
                 raise
@@ -518,7 +527,17 @@
             else:
                 self._docket = docketutil.parse_docket(self, entry_data)
             self._indexfile = self._docket.index_filepath()
-            index_data = self._get_data(self._indexfile, mmapindexthreshold)
+            index_data = b''
+            index_size = self._docket.index_end
+            if index_size > 0:
+                index_data = self._get_data(
+                    self._indexfile, mmapindexthreshold, size=index_size
+                )
+                if len(index_data) < index_size:
+                    msg = _(b'too few index data for %s: got %d, expected %d')
+                    msg %= (self.display_id, len(index_data), index_size)
+                    raise error.RevlogError(msg)
+
             self._inline = False
             # generaldelta implied by version 2 revlogs.
             self._generaldelta = True
@@ -619,7 +638,10 @@
             f = self.opener(
                 self._indexfile, mode=b"r+", checkambig=self._checkambig
             )
-            f.seek(0, os.SEEK_END)
+            if self._docket is None:
+                f.seek(0, os.SEEK_END)
+            else:
+                f.seek(self._docket.index_end, os.SEEK_SET)
             return f
         except IOError as inst:
             if inst.errno != errno.ENOENT:
@@ -2022,6 +2044,8 @@
                         header = self.index.pack_header(header)
                         e = header + e
                     fp.write(e)
+                if self._docket is not None:
+                    self._docket.index_end = fp.tell()
                 # the temp file replace the real index when we exit the context
                 # manager
 
@@ -2440,7 +2464,10 @@
             msg = b'adding revision outside `revlog._writing` context'
             raise error.ProgrammingError(msg)
         ifh, dfh = self._writinghandles
-        ifh.seek(0, os.SEEK_END)
+        if self._docket is None:
+            ifh.seek(0, os.SEEK_END)
+        else:
+            ifh.seek(self._docket.index_end, os.SEEK_SET)
         if dfh:
             dfh.seek(0, os.SEEK_END)
 
@@ -2463,6 +2490,9 @@
             if sidedata:
                 ifh.write(sidedata)
             self._enforceinlinesize(transaction)
+        if self._docket is not None:
+            self._docket.index_end = self._writinghandles[0].tell()
+
         nodemaputil.setup_persistent_nodemap(transaction, self)
 
     def addgroup(
@@ -2632,6 +2662,11 @@
             end += rev * self.index.entry_size
 
         transaction.add(self._indexfile, end)
+        if self._docket is not None:
+            # XXX we could, leverage the docket while stripping. However it is
+            # not powerfull enough at the time of this comment
+            self._docket.index_end = end
+            self._docket.write(transaction, stripping=True)
 
         # then reset internal state in memory to forget those revisions
         self._revisioncache = None
--- a/mercurial/revlogutils/docket.py	Mon May 03 12:34:41 2021 +0200
+++ b/mercurial/revlogutils/docket.py	Mon May 03 12:34:52 2021 +0200
@@ -28,36 +28,55 @@
 # * 4 bytes: revlog version
 #          |   This is mandatory as docket must be compatible with the previous
 #          |   revlog index header.
-S_HEADER = struct.Struct(constants.INDEX_HEADER.format)
+# * 8 bytes: size of index data
+S_HEADER = struct.Struct(constants.INDEX_HEADER.format + 'L')
 
 
 class RevlogDocket(object):
     """metadata associated with revlog"""
 
-    def __init__(self, revlog, version_header=None):
+    def __init__(self, revlog, version_header=None, index_end=0):
         self._version_header = version_header
         self._dirty = False
         self._radix = revlog.radix
         self._path = revlog._docket_file
         self._opener = revlog.opener
+        self._index_end = index_end
 
     def index_filepath(self):
         """file path to the current index file associated to this docket"""
         # very simplistic version at first
         return b"%s.idx" % self._radix
 
-    def write(self, transaction):
+    @property
+    def index_end(self):
+        return self._index_end
+
+    @index_end.setter
+    def index_end(self, new_size):
+        if new_size != self._index_end:
+            self._index_end = new_size
+            self._dirty = True
+
+    def write(self, transaction, stripping=False):
         """write the modification of disk if any
 
         This make the new content visible to all process"""
         if self._dirty:
-            transaction.addbackup(self._path, location=b'store')
+            if not stripping:
+                # XXX we could, leverage the docket while stripping. However it
+                # is not powerfull enough at the time of this comment
+                transaction.addbackup(self._path, location=b'store')
             with self._opener(self._path, mode=b'w', atomictemp=True) as f:
                 f.write(self._serialize())
             self._dirty = False
 
     def _serialize(self):
-        return S_HEADER.pack(self._version_header)
+        data = (
+            self._version_header,
+            self._index_end,
+        )
+        return S_HEADER.pack(*data)
 
 
 def default_docket(revlog, version_header):
@@ -72,9 +91,10 @@
 def parse_docket(revlog, data):
     """given some docket data return a docket object for the given revlog"""
     header = S_HEADER.unpack(data[: S_HEADER.size])
-    (version_header,) = header
+    version_header, index_size = header
     docket = RevlogDocket(
         revlog,
         version_header=version_header,
+        index_end=index_size,
     )
     return docket