# HG changeset patch # User Pierre-Yves David # Date 1685362078 -7200 # Node ID 9caa860dcbeca5f5263ba259faa02586c11fe58e # Parent 5e60abf811f332190bc8878d920eb08d0803cafd stream-clone: implement dedicated `get_streams` method for revlog For revlog, we can do better using the maximum linkrev expected. This approach opens the way to dealing with a much larger set of non-trivial changes, like splitting of inline revlogs. We will actually tackle this issue in the next changesets (thanks to this one). diff -r 5e60abf811f3 -r 9caa860dcbec mercurial/revlog.py --- a/mercurial/revlog.py Sun May 28 05:52:58 2023 +0200 +++ b/mercurial/revlog.py Mon May 29 14:07:58 2023 +0200 @@ -506,6 +506,74 @@ except FileNotFoundError: return b'' + def get_streams(self, max_linkrev): + n = len(self) + index = self.index + while n > 0: + linkrev = index[n - 1][4] + if linkrev < max_linkrev: + break + # note: this loop will rarely go through multiple iterations, since + # it only traverses commits created during the current streaming + # pull operation. + # + # If this becomes a problem, using a binary search should cap the + # runtime of this. + n = n - 1 + if n == 0: + # no data to send + return [] + index_size = n * index.entry_size + data_size = self.end(n - 1) + + # XXX we might have been split (or stripped) since the object + # initialization. We need to close this race too, by having a way to + # pre-open the files we feed to the revlog and never closing them before + # we are done streaming. 
+ + if self._inline: + + def get_stream(): + with self._indexfp() as fp: + yield None + size = index_size + data_size + if size <= 65536: + yield fp.read(size) + else: + yield from util.filechunkiter(fp, limit=size) + + inline_stream = get_stream() + next(inline_stream) + return [ + (self._indexfile, inline_stream, index_size + data_size), + ] + else: + + def get_index_stream(): + with self._indexfp() as fp: + yield None + if index_size <= 65536: + yield fp.read(index_size) + else: + yield from util.filechunkiter(fp, limit=index_size) + + def get_data_stream(): + with self._datafp() as fp: + yield None + if data_size <= 65536: + yield fp.read(data_size) + else: + yield from util.filechunkiter(fp, limit=data_size) + + index_stream = get_index_stream() + next(index_stream) + data_stream = get_data_stream() + next(data_stream) + return [ + (self._datafile, data_stream, data_size), + (self._indexfile, index_stream, index_size), + ] + def _loadindex(self, docket=None): new_header, mmapindexthreshold, force_nodemap = self._init_opts() diff -r 5e60abf811f3 -r 9caa860dcbec mercurial/store.py --- a/mercurial/store.py Sun May 28 05:52:58 2023 +0200 +++ b/mercurial/store.py Mon May 29 14:07:58 2023 +0200 @@ -509,7 +509,13 @@ def files(self) -> List[StoreFile]: raise NotImplementedError - def get_streams(self, vfs, copies=None): + def get_streams( + self, + repo=None, + vfs=None, + copies=None, + max_changeset=None, + ): """return a list of data stream associated to files for this entry return [(unencoded_file_path, content_iterator, content_size), …] @@ -605,6 +611,57 @@ self._files.append(StoreFile(unencoded_path=path, **data)) return self._files + def get_streams( + self, + repo=None, + vfs=None, + copies=None, + max_changeset=None, + ): + if repo is None or max_changeset is None: + return super().get_streams( + repo=repo, + vfs=vfs, + copies=copies, + max_changeset=max_changeset, + ) + if any(k.endswith(b'.idx') for k in self._details.keys()): + # This use revlog-v2, 
ignore for now + return super().get_streams( + repo=repo, + vfs=vfs, + copies=copies, + max_changeset=max_changeset, + ) + name_to_ext = {} + for ext in self._details.keys(): + name_to_ext[self._path_prefix + ext] = ext + name_to_size = {} + for f in self.files(): + name_to_size[f.unencoded_path] = f.file_size(None) + stream = [ + f.get_stream(vfs, copies) + for f in self.files() + if name_to_ext[f.unencoded_path] not in (b'.d', b'.i') + ] + + rl = self.get_revlog_instance(repo).get_revlog() + rl_stream = rl.get_streams(max_changeset) + for name, s, size in rl_stream: + if name_to_size.get(name, 0) != size: + msg = _(b"expected %d bytes but %d provided for %s") + msg %= name_to_size.get(name, 0), size, name + raise error.Abort(msg) + stream.extend(rl_stream) + files = self.files() + assert len(stream) == len(files), ( + stream, + files, + self._path_prefix, + self.target_id, + ) + return stream + def get_revlog_instance(self, repo): """Obtain a revlog instance from this store entry diff -r 5e60abf811f3 -r 9caa860dcbec mercurial/streamclone.py --- a/mercurial/streamclone.py Sun May 28 05:52:58 2023 +0200 +++ b/mercurial/streamclone.py Mon May 29 14:07:58 2023 +0200 @@ -635,6 +635,7 @@ # translate the vfs one entries = [(vfs_key, vfsmap[vfs_key], e) for (vfs_key, e) in entries] + max_linkrev = len(repo) file_count = totalfilesize = 0 # record the expected size of every file for k, vfs, e in entries: @@ -657,7 +658,10 @@ totalbytecount = 0 for src, vfs, e in entries: - for name, stream, size in e.get_streams(vfs, copies=copy): + entry_streams = e.get_streams( + repo=repo, vfs=vfs, copies=copy, max_changeset=max_linkrev + ) + for name, stream, size in entry_streams: yield src yield util.uvarintencode(len(name)) yield util.uvarintencode(size) diff -r 5e60abf811f3 -r 9caa860dcbec tests/test-clone-stream-revlog-split.t --- a/tests/test-clone-stream-revlog-split.t Sun May 28 05:52:58 2023 +0200 +++ b/tests/test-clone-stream-revlog-split.t Mon May 29 14:07:58 2023 +0200 @@ 
-86,10 +86,10 @@ Check everything is fine $ cat client.log - remote: abort: unexpected error: clone could only read 256 bytes from data/some-file.i, but expected 1259 bytes (known-bad-output !) + remote: abort: unexpected error: expected 0 bytes but 1067 provided for data/some-file.d (known-bad-output !) abort: pull failed on remote (known-bad-output !) $ tail -2 errors.log - mercurial.error.Abort: clone could only read 256 bytes from data/some-file.i, but expected 1259 bytes (known-bad-output !) + mercurial.error.Abort: expected 0 bytes but 1067 provided for data/some-file.d (known-bad-output !) (known-bad-output !) $ hg -R clone-while-split verify checking changesets (missing-correct-output !)