copies-rust: move CPU-heavy Rust processing into a child thread
… that runs in parallel with the parent thread fetching data.
This can be disabled through a new config. CLI example:
hg --config=devel.copy-tracing.multi-thread=no
For now both threads use the GIL, later commits will reduce this.
Differential Revision: https://phab.mercurial-scm.org/D9684
--- a/mercurial/configitems.py Tue Jan 05 21:02:00 2021 +0100
+++ b/mercurial/configitems.py Wed Jan 06 14:09:01 2021 +0100
@@ -700,6 +700,11 @@
)
coreconfigitem(
b'devel',
+ b'copy-tracing.multi-thread',
+ default=True,
+)
+coreconfigitem(
+ b'devel',
b'debug.extensions',
default=False,
)
--- a/mercurial/copies.py Tue Jan 05 21:02:00 2021 +0100
+++ b/mercurial/copies.py Wed Jan 06 14:09:01 2021 +0100
@@ -274,6 +274,7 @@
revs = cl.findmissingrevs(common=[a.rev()], heads=[b.rev()])
roots = set()
has_graph_roots = False
+ multi_thread = repo.ui.configbool(b'devel', b'copy-tracing.multi-thread')
# iterate over `only(B, A)`
for r in revs:
@@ -321,7 +322,13 @@
children_count[p] += 1
revinfo = _revinfo_getter(repo, match)
return _combine_changeset_copies(
- revs, children_count, b.rev(), revinfo, match, isancestor
+ revs,
+ children_count,
+ b.rev(),
+ revinfo,
+ match,
+ isancestor,
+ multi_thread,
)
else:
# When not using side-data, we will process the edges "from" the parent.
@@ -346,7 +353,7 @@
def _combine_changeset_copies(
- revs, children_count, targetrev, revinfo, match, isancestor
+ revs, children_count, targetrev, revinfo, match, isancestor, multi_thread
):
"""combine the copies information for each item of iterrevs
@@ -363,7 +370,7 @@
if rustmod is not None:
final_copies = rustmod.combine_changeset_copies(
- list(revs), children_count, targetrev, revinfo
+ list(revs), children_count, targetrev, revinfo, multi_thread
)
else:
isancestor = cached_is_ancestor(isancestor)
--- a/rust/Cargo.lock Tue Jan 05 21:02:00 2021 +0100
+++ b/rust/Cargo.lock Wed Jan 06 14:09:01 2021 +0100
@@ -331,6 +331,7 @@
version = "0.1.0"
dependencies = [
"cpython 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-channel 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hg-core 0.1.0",
"libc 0.2.81 (registry+https://github.com/rust-lang/crates.io-index)",
--- a/rust/hg-cpython/Cargo.toml Tue Jan 05 21:02:00 2021 +0100
+++ b/rust/hg-cpython/Cargo.toml Wed Jan 06 14:09:01 2021 +0100
@@ -22,6 +22,7 @@
python3-bin = ["cpython/python3-sys"]
[dependencies]
+crossbeam-channel = "0.4"
hg-core = { path = "../hg-core"}
libc = '*'
log = "0.4.8"
--- a/rust/hg-cpython/src/copy_tracing.rs Tue Jan 05 21:02:00 2021 +0100
+++ b/rust/hg-cpython/src/copy_tracing.rs Wed Jan 06 14:09:01 2021 +0100
@@ -22,6 +22,7 @@
children_count: PyDict,
target_rev: Revision,
rev_info: PyObject,
+ multi_thread: bool,
) -> PyResult<PyDict> {
let children_count = children_count
.items(py)
@@ -42,20 +43,81 @@
Ok((rev, p1, p2, opt_bytes))
});
- let mut combine_changeset_copies =
- CombineChangesetCopies::new(children_count);
+ let path_copies = if !multi_thread {
+ let mut combine_changeset_copies =
+ CombineChangesetCopies::new(children_count);
+
+ for rev_info in revs_info {
+ let (rev, p1, p2, opt_bytes) = rev_info?;
+ let files = match &opt_bytes {
+ Some(bytes) => ChangedFiles::new(bytes.data(py)),
+ // Python None was extracted to Option::None,
+ // meaning there was no copy data.
+ None => ChangedFiles::new_empty(),
+ };
+
+ combine_changeset_copies.add_revision(rev, p1, p2, files)
+ }
+ combine_changeset_copies.finish(target_rev)
+ } else {
+ // Use a bounded channel to provide back-pressure:
+ // if the child thread is slower to process revisions than this thread
+ // is to gather data for them, an unbounded channel would keep
+ // growing and eat memory.
+ //
+ // TODO: tweak the bound?
+ let (rev_info_sender, rev_info_receiver) =
+ crossbeam_channel::bounded::<RevInfo>(1000);
- for rev_info in revs_info {
- let (rev, p1, p2, opt_bytes) = rev_info?;
- let files = match &opt_bytes {
- Some(bytes) => ChangedFiles::new(bytes.data(py)),
- // value was presumably None, meaning they was no copy data.
- None => ChangedFiles::new_empty(),
- };
+ // Start a thread that does CPU-heavy processing in parallel with the
+ // loop below.
+ //
+ // If the parent thread panics, `rev_info_sender` will be dropped and
+ // “disconnected”. `rev_info_receiver` will be notified of this and
+ // exit its own loop.
+ let thread = std::thread::spawn(move || {
+ let mut combine_changeset_copies =
+ CombineChangesetCopies::new(children_count);
+ for (rev, p1, p2, opt_bytes) in rev_info_receiver {
+ let gil = Python::acquire_gil();
+ let py = gil.python();
+ let files = match &opt_bytes {
+ Some(raw) => ChangedFiles::new(raw.data(py)),
+ // Python None was extracted to Option::None,
+ // meaning there was no copy data.
+ None => ChangedFiles::new_empty(),
+ };
+ combine_changeset_copies.add_revision(rev, p1, p2, files)
+ }
+
+ combine_changeset_copies.finish(target_rev)
+ });
- combine_changeset_copies.add_revision(rev, p1, p2, files)
- }
- let path_copies = combine_changeset_copies.finish(target_rev);
+ for rev_info in revs_info {
+ let (rev, p1, p2, opt_bytes) = rev_info?;
+
+ // We’d prefer to avoid the child thread calling into Python code,
+ // but this avoids a potential deadlock on the GIL if it does:
+ py.allow_threads(|| {
+ rev_info_sender.send((rev, p1, p2, opt_bytes)).expect(
+ "combine_changeset_copies: channel is disconnected",
+ );
+ });
+ }
+ // We’d prefer to avoid the child thread calling into Python code,
+ // but this avoids a potential deadlock on the GIL if it does:
+ py.allow_threads(|| {
+ // Disconnect the channel to signal the child thread to stop:
+ // the `for … in rev_info_receiver` loop will end.
+ drop(rev_info_sender);
+
+ // Wait for the child thread to stop, and propagate any panic.
+ thread.join().unwrap_or_else(|panic_payload| {
+ std::panic::resume_unwind(panic_payload)
+ })
+ })
+ };
+
let out = PyDict::new(py);
for (dest, source) in path_copies.into_iter() {
out.set_item(
@@ -84,7 +146,8 @@
revs: PyList,
children: PyDict,
target_rev: Revision,
- rev_info: PyObject
+ rev_info: PyObject,
+ multi_thread: bool
)
),
)?;