changeset 50670:9caa860dcbec

stream-clone: implement decidated `get_streams` method for revlog For revlog, we can do better using the maximum linkrev expected. This approach open the way to dealing with a much larger set of non-trivial changes, like splitting of inline revlogs. We will actually tackle this issue in the next changesets (thanks to this one).
author Pierre-Yves David <pierre-yves.david@octobus.net>
date Mon, 29 May 2023 14:07:58 +0200
parents 5e60abf811f3
children 5460424092e2
files mercurial/revlog.py mercurial/store.py mercurial/streamclone.py tests/test-clone-stream-revlog-split.t
diffstat 4 files changed, 133 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/mercurial/revlog.py	Sun May 28 05:52:58 2023 +0200
+++ b/mercurial/revlog.py	Mon May 29 14:07:58 2023 +0200
@@ -506,6 +506,74 @@
         except FileNotFoundError:
             return b''
 
+    def get_streams(self, max_linkrev):
+        n = len(self)
+        index = self.index
+        while n > 0:
+            linkrev = index[n - 1][4]
+            if linkrev < max_linkrev:
+                break
+            # note: this loop will rarely go through multiple iterations, since
+            # it only traverses commits created during the current streaming
+            # pull operation.
+            #
+            # If this become a problem, using a binary search should cap the
+            # runtime of this.
+            n = n - 1
+        if n == 0:
+            # no data to send
+            return []
+        index_size = n * index.entry_size
+        data_size = self.end(n - 1)
+
+        # XXX we might have been split (or stripped) since the object
+        # initialization, We need to close this race too, but having a way to
+        # pre-open the file we feed to the revlog and never closing them before
+        # we are done streaming.
+
+        if self._inline:
+
+            def get_stream():
+                with self._indexfp() as fp:
+                    yield None
+                    size = index_size + data_size
+                    if size <= 65536:
+                        yield fp.read(size)
+                    else:
+                        yield from util.filechunkiter(fp, limit=size)
+
+            inline_stream = get_stream()
+            next(inline_stream)
+            return [
+                (self._indexfile, inline_stream, index_size + data_size),
+            ]
+        else:
+
+            def get_index_stream():
+                with self._indexfp() as fp:
+                    yield None
+                    if index_size <= 65536:
+                        yield fp.read(index_size)
+                    else:
+                        yield from util.filechunkiter(fp, limit=index_size)
+
+            def get_data_stream():
+                with self._datafp() as fp:
+                    yield None
+                    if data_size <= 65536:
+                        yield fp.read(data_size)
+                    else:
+                        yield from util.filechunkiter(fp, limit=data_size)
+
+            index_stream = get_index_stream()
+            next(index_stream)
+            data_stream = get_data_stream()
+            next(data_stream)
+            return [
+                (self._datafile, data_stream, data_size),
+                (self._indexfile, index_stream, index_size),
+            ]
+
     def _loadindex(self, docket=None):
 
         new_header, mmapindexthreshold, force_nodemap = self._init_opts()
--- a/mercurial/store.py	Sun May 28 05:52:58 2023 +0200
+++ b/mercurial/store.py	Mon May 29 14:07:58 2023 +0200
@@ -509,7 +509,13 @@
     def files(self) -> List[StoreFile]:
         raise NotImplementedError
 
-    def get_streams(self, vfs, copies=None):
+    def get_streams(
+        self,
+        repo=None,
+        vfs=None,
+        copies=None,
+        max_changeset=None,
+    ):
         """return a list of data stream associated to files for this entry
 
         return [(unencoded_file_path, content_iterator, content_size), …]
@@ -605,6 +611,57 @@
                 self._files.append(StoreFile(unencoded_path=path, **data))
         return self._files
 
+    def get_streams(
+        self,
+        repo=None,
+        vfs=None,
+        copies=None,
+        max_changeset=None,
+    ):
+        if repo is None or max_changeset is None:
+            return super().get_streams(
+                repo=repo,
+                vfs=vfs,
+                copies=copies,
+                max_changeset=max_changeset,
+            )
+        if any(k.endswith(b'.idx') for k in self._details.keys()):
+            # This use revlog-v2, ignore for now
+            return super().get_streams(
+                repo=repo,
+                vfs=vfs,
+                copies=copies,
+                max_changeset=max_changeset,
+            )
+        name_to_ext = {}
+        for ext in self._details.keys():
+            name_to_ext[self._path_prefix + ext] = ext
+        name_to_size = {}
+        for f in self.files():
+            name_to_size[f.unencoded_path] = f.file_size(None)
+        stream = [
+            f.get_stream(vfs, copies)
+            for f in self.files()
+            if name_to_ext[f.unencoded_path] not in (b'.d', b'.i')
+        ]
+
+        rl = self.get_revlog_instance(repo).get_revlog()
+        rl_stream = rl.get_streams(max_changeset)
+        for name, s, size in rl_stream:
+            if name_to_size.get(name, 0) != size:
+                msg = _(b"expected %d bytes but %d provided for %s")
+                msg %= name_to_size.get(name, 0), size, name
+                raise error.Abort(msg)
+        stream.extend(rl_stream)
+        files = self.files()
+        assert len(stream) == len(files), (
+            stream,
+            files,
+            self._path_prefix,
+            self.target_id,
+        )
+        return stream
+
     def get_revlog_instance(self, repo):
         """Obtain a revlog instance from this store entry
 
--- a/mercurial/streamclone.py	Sun May 28 05:52:58 2023 +0200
+++ b/mercurial/streamclone.py	Mon May 29 14:07:58 2023 +0200
@@ -635,6 +635,7 @@
     # translate the vfs one
     entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
 
+    max_linkrev = len(repo)
     file_count = totalfilesize = 0
     # record the expected size of every file
     for k, vfs, e in entries:
@@ -657,7 +658,10 @@
         totalbytecount = 0
 
         for src, vfs, e in entries:
-            for name, stream, size in e.get_streams(vfs, copies=copy):
+            entry_streams = e.get_streams(
+                repo=repo, vfs=vfs, copies=copy, max_changeset=max_linkrev
+            )
+            for name, stream, size in entry_streams:
                 yield src
                 yield util.uvarintencode(len(name))
                 yield util.uvarintencode(size)
--- a/tests/test-clone-stream-revlog-split.t	Sun May 28 05:52:58 2023 +0200
+++ b/tests/test-clone-stream-revlog-split.t	Mon May 29 14:07:58 2023 +0200
@@ -86,10 +86,10 @@
 Check everything is fine
 
   $ cat client.log
-  remote: abort: unexpected error: clone could only read 256 bytes from data/some-file.i, but expected 1259 bytes (known-bad-output !)
+  remote: abort: unexpected error: expected 0 bytes but 1067 provided for data/some-file.d (known-bad-output !)
   abort: pull failed on remote (known-bad-output !)
   $ tail -2 errors.log
-  mercurial.error.Abort: clone could only read 256 bytes from data/some-file.i, but expected 1259 bytes (known-bad-output !)
+  mercurial.error.Abort: expected 0 bytes but 1067 provided for data/some-file.d (known-bad-output !)
    (known-bad-output !)
   $ hg -R clone-while-split verify
   checking changesets (missing-correct-output !)