stream-clone: implement dedicated `get_streams` method for revlog
For revlog, we can do better using the maximum linkrev expected. This approach
opens the way to dealing with a much larger set of non-trivial changes, like
splitting of inline revlogs.
We will actually tackle this issue in the next changesets (thanks to this one).
--- a/mercurial/revlog.py Sun May 28 05:52:58 2023 +0200
+++ b/mercurial/revlog.py Mon May 29 14:07:58 2023 +0200
@@ -506,6 +506,74 @@
except FileNotFoundError:
return b''
+ def get_streams(self, max_linkrev):
+ n = len(self)
+ index = self.index
+ while n > 0:
+ linkrev = index[n - 1][4]
+ if linkrev < max_linkrev:
+ break
+ # note: this loop will rarely go through multiple iterations, since
+ # it only traverses commits created during the current streaming
+ # pull operation.
+ #
+ # If this becomes a problem, using a binary search should cap the
+ # runtime of this.
+ n = n - 1
+ if n == 0:
+ # no data to send
+ return []
+ index_size = n * index.entry_size
+ data_size = self.end(n - 1)
+
+ # XXX we might have been split (or stripped) since the object
+ # initialization. We need to close this race too, by having a way to
+ # pre-open the files we feed to the revlog and never closing them before
+ # we are done streaming.
+
+ if self._inline:
+
+ def get_stream():
+ with self._indexfp() as fp:
+ yield None
+ size = index_size + data_size
+ if size <= 65536:
+ yield fp.read(size)
+ else:
+ yield from util.filechunkiter(fp, limit=size)
+
+ inline_stream = get_stream()
+ next(inline_stream)
+ return [
+ (self._indexfile, inline_stream, index_size + data_size),
+ ]
+ else:
+
+ def get_index_stream():
+ with self._indexfp() as fp:
+ yield None
+ if index_size <= 65536:
+ yield fp.read(index_size)
+ else:
+ yield from util.filechunkiter(fp, limit=index_size)
+
+ def get_data_stream():
+ with self._datafp() as fp:
+ yield None
+ if data_size <= 65536:
+ yield fp.read(data_size)
+ else:
+ yield from util.filechunkiter(fp, limit=data_size)
+
+ index_stream = get_index_stream()
+ next(index_stream)
+ data_stream = get_data_stream()
+ next(data_stream)
+ return [
+ (self._datafile, data_stream, data_size),
+ (self._indexfile, index_stream, index_size),
+ ]
+
def _loadindex(self, docket=None):
new_header, mmapindexthreshold, force_nodemap = self._init_opts()
--- a/mercurial/store.py Sun May 28 05:52:58 2023 +0200
+++ b/mercurial/store.py Mon May 29 14:07:58 2023 +0200
@@ -509,7 +509,13 @@
def files(self) -> List[StoreFile]:
raise NotImplementedError
- def get_streams(self, vfs, copies=None):
+ def get_streams(
+ self,
+ repo=None,
+ vfs=None,
+ copies=None,
+ max_changeset=None,
+ ):
"""return a list of data stream associated to files for this entry
return [(unencoded_file_path, content_iterator, content_size), …]
@@ -605,6 +611,57 @@
self._files.append(StoreFile(unencoded_path=path, **data))
return self._files
+ def get_streams(
+ self,
+ repo=None,
+ vfs=None,
+ copies=None,
+ max_changeset=None,
+ ):
+ if repo is None or max_changeset is None:
+ return super().get_streams(
+ repo=repo,
+ vfs=vfs,
+ copies=copies,
+ max_changeset=max_changeset,
+ )
+ if any(k.endswith(b'.idx') for k in self._details.keys()):
+ # This uses revlog-v2, ignore for now
+ return super().get_streams(
+ repo=repo,
+ vfs=vfs,
+ copies=copies,
+ max_changeset=max_changeset,
+ )
+ name_to_ext = {}
+ for ext in self._details.keys():
+ name_to_ext[self._path_prefix + ext] = ext
+ name_to_size = {}
+ for f in self.files():
+ name_to_size[f.unencoded_path] = f.file_size(None)
+ stream = [
+ f.get_stream(vfs, copies)
+ for f in self.files()
+ if name_to_ext[f.unencoded_path] not in (b'.d', b'.i')
+ ]
+
+ rl = self.get_revlog_instance(repo).get_revlog()
+ rl_stream = rl.get_streams(max_changeset)
+ for name, s, size in rl_stream:
+ if name_to_size.get(name, 0) != size:
+ msg = _(b"expected %d bytes but %d provided for %s")
+ msg %= name_to_size.get(name, 0), size, name
+ raise error.Abort(msg)
+ stream.extend(rl_stream)
+ files = self.files()
+ assert len(stream) == len(files), (
+ stream,
+ files,
+ self._path_prefix,
+ self.target_id,
+ )
+ return stream
+
def get_revlog_instance(self, repo):
"""Obtain a revlog instance from this store entry
--- a/mercurial/streamclone.py Sun May 28 05:52:58 2023 +0200
+++ b/mercurial/streamclone.py Mon May 29 14:07:58 2023 +0200
@@ -635,6 +635,7 @@
# translate the vfs one
entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries]
+ max_linkrev = len(repo)
file_count = totalfilesize = 0
# record the expected size of every file
for k, vfs, e in entries:
@@ -657,7 +658,10 @@
totalbytecount = 0
for src, vfs, e in entries:
- for name, stream, size in e.get_streams(vfs, copies=copy):
+ entry_streams = e.get_streams(
+ repo=repo, vfs=vfs, copies=copy, max_changeset=max_linkrev
+ )
+ for name, stream, size in entry_streams:
yield src
yield util.uvarintencode(len(name))
yield util.uvarintencode(size)
--- a/tests/test-clone-stream-revlog-split.t Sun May 28 05:52:58 2023 +0200
+++ b/tests/test-clone-stream-revlog-split.t Mon May 29 14:07:58 2023 +0200
@@ -86,10 +86,10 @@
Check everything is fine
$ cat client.log
- remote: abort: unexpected error: clone could only read 256 bytes from data/some-file.i, but expected 1259 bytes (known-bad-output !)
+ remote: abort: unexpected error: expected 0 bytes but 1067 provided for data/some-file.d (known-bad-output !)
abort: pull failed on remote (known-bad-output !)
$ tail -2 errors.log
- mercurial.error.Abort: clone could only read 256 bytes from data/some-file.i, but expected 1259 bytes (known-bad-output !)
+ mercurial.error.Abort: expected 0 bytes but 1067 provided for data/some-file.d (known-bad-output !)
(known-bad-output !)
$ hg -R clone-while-split verify
checking changesets (missing-correct-output !)