Mercurial > hg
changeset 52217:96b113d22b34 stable
rust-update: handle SIGINT from long-running update threads
The current code does not respond to ^C until after the Rust bit is finished
doing its work. This is expected, since Rust holds the GIL for the duration
of the call and does not call `PyErr_CheckSignals`. Freeing the GIL to do our
work does not really improve anything since the Rust threads are still going,
and the only way of cancelling a thread is by making it cooperate.
So we do the following:
- remember the SIGINT handler in `hg-cpython` and reset it after the call
  into core (see inline comment in `update.rs` about this)
- make all update threads watch for a global `AtomicBool` being `true`,
  and if so stop their work
- reset the global bool and exit early (i.e. before writing the dirstate)
- raise SIGINT from `hg-cpython` if update returns `InterruptReceived`
author | Raphaël Gomès <rgomes@octobus.net> |
---|---|
date | Tue, 12 Nov 2024 12:52:13 +0100 |
parents | fa58f4f97337 |
children | 9048a0d782e1 |
files | rust/Cargo.lock rust/hg-core/Cargo.toml rust/hg-core/src/errors.rs rust/hg-core/src/lib.rs rust/hg-core/src/update.rs rust/hg-cpython/src/update.rs rust/hg-cpython/src/utils.rs |
diffstat | 7 files changed, 115 insertions(+), 13 deletions(-) [+] |
line wrap: on
line diff
--- a/rust/Cargo.lock Fri Nov 08 17:08:11 2024 +0100 +++ b/rust/Cargo.lock Tue Nov 12 12:52:13 2024 +0100 @@ -170,6 +170,12 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + +[[package]] name = "chrono" version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -349,6 +355,16 @@ ] [[package]] +name = "ctrlc" +version = "3.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90eeab0aa92f3f9b4e87f258c72b139c207d251f9cbc1080a0086b86a8870dd3" +dependencies = [ + "nix", + "windows-sys 0.59.0", +] + +[[package]] name = "cxx" version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -620,6 +636,7 @@ "chrono", "clap", "crossbeam-channel", + "ctrlc", "derive_more", "dyn-clone", "filetime", @@ -895,6 +912,18 @@ ] [[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags 2.6.0", + "cfg-if", + "cfg_aliases", + "libc", +] + +[[package]] name = "nom8" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index"
--- a/rust/hg-core/Cargo.toml Fri Nov 08 17:08:11 2024 +0100 +++ b/rust/hg-core/Cargo.toml Tue Nov 12 12:52:13 2024 +0100 @@ -12,6 +12,7 @@ bitflags = "1.3.2" bytes-cast = "0.3.0" byteorder = "1.4.3" +ctrlc = "3.4" derive_more = "0.99.17" hashbrown = { version = "0.13.1", features = ["rayon"] } home = "0.5.4"
--- a/rust/hg-core/src/errors.rs Fri Nov 08 17:08:11 2024 +0100 +++ b/rust/hg-core/src/errors.rs Tue Nov 12 12:52:13 2024 +0100 @@ -52,6 +52,8 @@ RaceDetected(String), /// An invalid path was found Path(HgPathError), + /// An interrupt was received and we need to stop whatever we're doing + InterruptReceived, } /// Details about where an I/O error happened @@ -121,6 +123,7 @@ write!(f, "encountered a race condition {context}") } HgError::Path(hg_path_error) => write!(f, "{}", hg_path_error), + HgError::InterruptReceived => write!(f, "interrupt received"), } } }
--- a/rust/hg-core/src/lib.rs Fri Nov 08 17:08:11 2024 +0100 +++ b/rust/hg-core/src/lib.rs Tue Nov 12 12:52:13 2024 +0100 @@ -46,10 +46,14 @@ parse_pattern_syntax_kind, read_pattern_file, IgnorePattern, PatternFileWarning, PatternSyntax, }; -use std::collections::HashMap; use std::fmt; +use std::{collections::HashMap, sync::atomic::AtomicBool}; use twox_hash::RandomXxHashBuilder64; +/// Used to communicate with threads spawned from code within this crate that +/// they should stop their work (SIGINT was received). +pub static INTERRUPT_RECEIVED: AtomicBool = AtomicBool::new(false); + pub type LineNumber = usize; /// Rust's default hasher is too slow because it tries to prevent collision
--- a/rust/hg-core/src/update.rs Fri Nov 08 17:08:11 2024 +0100 +++ b/rust/hg-core/src/update.rs Tue Nov 12 12:52:13 2024 +0100 @@ -5,6 +5,7 @@ io::Write, os::unix::fs::{MetadataExt, PermissionsExt}, path::Path, + sync::atomic::Ordering, time::Duration, }; @@ -30,6 +31,7 @@ }, vfs::{is_on_nfs_mount, VfsImpl}, DirstateParents, RevlogError, RevlogOpenOptions, UncheckedRevision, + INTERRUPT_RECEIVED, }; use crossbeam_channel::{Receiver, Sender}; use rayon::prelude::*; @@ -100,6 +102,15 @@ let chunks = chunk_tracked_files(tracked_files); progress.update(0, Some(files_count as u64)); + // TODO find a way (with `nix` or `signal-hook`?) of resetting the + // previous signal handler directly after. Currently, this is Python's + // job, but: + // - it introduces a (small) race between catching and resetting + // - it would break signal handlers in other contexts like `rhg`` + let _ = ctrlc::set_handler(|| { + INTERRUPT_RECEIVED.store(true, Ordering::Relaxed) + }); + create_working_copy( chunks, working_directory_path, @@ -111,6 +122,12 @@ workers, ); + // Reset the global interrupt now that we're done + if INTERRUPT_RECEIVED.swap(false, Ordering::Relaxed) { + // The threads have all exited early, let's re-raise + return Err(HgError::InterruptReceived); + } + let errors: Vec<HgError> = errors_receiver.iter().collect(); if !errors.is_empty() { log::debug!("{} errors during update (see trace logs)", errors.len()); @@ -192,7 +209,8 @@ workers: Option<usize>, ) { let auditor = PathAuditor::new(working_directory_path); - let work_closure = |(dir_path, chunk)| { + + let work_closure = |(dir_path, chunk)| -> Result<(), HgError> { if let Err(e) = working_copy_worker( dir_path, chunk, @@ -207,6 +225,7 @@ .send(e) .expect("channel should not be disconnected") } + Ok(()) }; if let Some(workers) = workers { if workers > 1 { @@ -223,18 +242,19 @@ Ok(pool) => { log::trace!("restricting update to {} threads", workers); pool.install(|| { - chunks.into_par_iter().for_each(work_closure); + 
let _ = + chunks.into_par_iter().try_for_each(work_closure); }); } } } else { // Work sequentially, don't even invoke rayon - chunks.into_iter().for_each(work_closure); + let _ = chunks.into_iter().try_for_each(work_closure); } } else { // Work in parallel by default in the global threadpool let _ = cap_default_rayon_threads(); - chunks.into_par_iter().for_each(work_closure); + let _ = chunks.into_par_iter().try_for_each(work_closure); } } @@ -256,6 +276,11 @@ let dir_path = working_directory_path.join(dir_path); std::fs::create_dir_all(&dir_path).when_writing_file(&dir_path)?; + if INTERRUPT_RECEIVED.load(Ordering::Relaxed) { + // Stop working, the user has requested that we stop + return Err(HgError::InterruptReceived); + } + for (file, file_node, flags) in chunk { auditor.audit_path(file)?; let flags = flags.map(|f| f.into());
--- a/rust/hg-cpython/src/update.rs Fri Nov 08 17:08:11 2024 +0100 +++ b/rust/hg-cpython/src/update.rs Tue Nov 12 12:52:13 2024 +0100 @@ -15,7 +15,7 @@ use crate::{ exceptions::FallbackError, - utils::{hgerror_to_pyerr, repo_from_path}, + utils::{hgerror_to_pyerr, repo_from_path, with_sigint_wrapper}, }; pub fn update_from_null_fast_path( @@ -27,10 +27,12 @@ log::trace!("Using update from null fastpath"); let repo = repo_from_path(py, repo_path)?; let progress: &dyn Progress = &HgProgressBar::new("updating"); - hgerror_to_pyerr( - py, - update_from_null(&repo, to.into(), progress, num_cpus), - ) + + let res = with_sigint_wrapper(py, || { + update_from_null(&repo, to.into(), progress, num_cpus) + })?; + + hgerror_to_pyerr(py, res) } pub fn init_module(py: Python, package: &str) -> PyResult<PyModule> {
--- a/rust/hg-cpython/src/utils.rs Fri Nov 08 17:08:11 2024 +0100 +++ b/rust/hg-cpython/src/utils.rs Tue Nov 12 12:52:13 2024 +0100 @@ -1,7 +1,7 @@ -use cpython::exc::ValueError; +use cpython::exc::{KeyboardInterrupt, ValueError}; use cpython::{ - ObjectProtocol, PyBytes, PyDict, PyErr, PyObject, PyResult, PyTuple, - Python, ToPyObject, + ObjectProtocol, PyBytes, PyClone, PyDict, PyErr, PyObject, PyResult, + PyTuple, Python, ToPyObject, }; use hg::config::Config; use hg::errors::HgError; @@ -50,6 +50,9 @@ cls.call(py, (msg,), None).ok().into_py_object(py), ) } + HgError::InterruptReceived => { + PyErr::new::<KeyboardInterrupt, _>(py, "") + } e => PyErr::new::<cpython::exc::RuntimeError, _>(py, e.to_string()), }) } @@ -104,3 +107,38 @@ }) .map(Into::into) } + +/// Wrap a call to `func` so that Python's `SIGINT` handler is first stored, +/// then restored after the call to `func` and finally raised if +/// `func` returns a [`HgError::InterruptReceived`] +pub fn with_sigint_wrapper<R>( + py: Python, + func: impl Fn() -> Result<R, HgError>, +) -> PyResult<Result<R, HgError>> { + let signal_py_mod = py.import("signal")?; + let sigint_py_const = signal_py_mod.get(py, "SIGINT")?; + let old_handler = signal_py_mod.call( + py, + "getsignal", + PyTuple::new(py, &[sigint_py_const.clone_ref(py)]), + None, + )?; + let res = func(); + // Reset the old signal handler in Python because we've may have changed it + signal_py_mod.call( + py, + "signal", + PyTuple::new(py, &[sigint_py_const.clone_ref(py), old_handler]), + None, + )?; + if let Err(HgError::InterruptReceived) = res { + // Trigger the signal in Python + signal_py_mod.call( + py, + "raise_signal", + PyTuple::new(py, &[sigint_py_const]), + None, + )?; + } + Ok(res) +}