Mercurial > hg
diff rust/hg-cpython/src/copy_tracing.rs @ 46588:47557ea79fc7
copies-rust: move CPU-heavy Rust processing into a child thread
… that runs in parallel with the parent thread fetching data.
This can be disabled through a new config. CLI example:
hg --config=devel.copy-tracing.multi-thread=no
For now both threads use the GIL, later commits will reduce this.
Differential Revision: https://phab.mercurial-scm.org/D9684
author | Simon Sapin <simon.sapin@octobus.net> |
---|---|
date | Wed, 06 Jan 2021 14:09:01 +0100 |
parents | cb4b0b0c6de4 |
children | 620c88fb42a2 |
line wrap: on
line diff
--- a/rust/hg-cpython/src/copy_tracing.rs Tue Jan 05 21:02:00 2021 +0100 +++ b/rust/hg-cpython/src/copy_tracing.rs Wed Jan 06 14:09:01 2021 +0100 @@ -22,6 +22,7 @@ children_count: PyDict, target_rev: Revision, rev_info: PyObject, + multi_thread: bool, ) -> PyResult<PyDict> { let children_count = children_count .items(py) @@ -42,20 +43,81 @@ Ok((rev, p1, p2, opt_bytes)) }); - let mut combine_changeset_copies = - CombineChangesetCopies::new(children_count); + let path_copies = if !multi_thread { + let mut combine_changeset_copies = + CombineChangesetCopies::new(children_count); + + for rev_info in revs_info { + let (rev, p1, p2, opt_bytes) = rev_info?; + let files = match &opt_bytes { + Some(bytes) => ChangedFiles::new(bytes.data(py)), + // Python None was extracted to Option::None, + // meaning there was no copy data. + None => ChangedFiles::new_empty(), + }; + + combine_changeset_copies.add_revision(rev, p1, p2, files) + } + combine_changeset_copies.finish(target_rev) + } else { + // Use a bounded channel to provide back-pressure: + // if the child thread is slower to process revisions than this thread + // is to gather data for them, an unbounded channel would keep + // growing and eat memory. + // + // TODO: tweak the bound? + let (rev_info_sender, rev_info_receiver) = + crossbeam_channel::bounded::<RevInfo>(1000); - for rev_info in revs_info { - let (rev, p1, p2, opt_bytes) = rev_info?; - let files = match &opt_bytes { - Some(bytes) => ChangedFiles::new(bytes.data(py)), - // value was presumably None, meaning they was no copy data. - None => ChangedFiles::new_empty(), - }; + // Start a thread that does CPU-heavy processing in parallel with the + // loop below. + // + // If the parent thread panics, `rev_info_sender` will be dropped and + // “disconnected”. `rev_info_receiver` will be notified of this and + // exit its own loop. + let thread = std::thread::spawn(move || { + let mut combine_changeset_copies = + CombineChangesetCopies::new(children_count); + for (rev, p1, p2, opt_bytes) in rev_info_receiver { + let gil = Python::acquire_gil(); + let py = gil.python(); + let files = match &opt_bytes { + Some(raw) => ChangedFiles::new(raw.data(py)), + // Python None was extracted to Option::None, + // meaning there was no copy data. + None => ChangedFiles::new_empty(), + }; + combine_changeset_copies.add_revision(rev, p1, p2, files) + } + + combine_changeset_copies.finish(target_rev) + }); - combine_changeset_copies.add_revision(rev, p1, p2, files) - } - let path_copies = combine_changeset_copies.finish(target_rev); + for rev_info in revs_info { + let (rev, p1, p2, opt_bytes) = rev_info?; + + // We’d prefer to avoid the child thread calling into Python code, + // but this avoids a potential deadlock on the GIL if it does: + py.allow_threads(|| { + rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( + "combine_changeset_copies: channel is disconnected", + ); + }); + } + // We’d prefer to avoid the child thread calling into Python code, + // but this avoids a potential deadlock on the GIL if it does: + py.allow_threads(|| { + // Disconnect the channel to signal the child thread to stop: + // the `for … in rev_info_receiver` loop will end. + drop(rev_info_sender); + + // Wait for the child thread to stop, and propagate any panic. + thread.join().unwrap_or_else(|panic_payload| { + std::panic::resume_unwind(panic_payload) + }) + }) + }; + let out = PyDict::new(py); for (dest, source) in path_copies.into_iter() { out.set_item( @@ -84,7 +146,8 @@ revs: PyList, children: PyDict, target_rev: Revision, - rev_info: PyObject + rev_info: PyObject, + multi_thread: bool ) ), )?;