rust-update: handle SIGINT from long-running update threads stable
authorRaphaël Gomès <rgomes@octobus.net>
Tue, 12 Nov 2024 12:52:13 +0100
branchstable
changeset 52183 96b113d22b34
parent 52182 fa58f4f97337
child 52184 9048a0d782e1
rust-update: handle SIGINT from long-running update threads The current code does not respond to ^C until after the Rust bit is finished doing its work. This is expected, since Rust holds the GIL for the duration of the call and does not call `PyErr_CheckSignals`. Freeing the GIL to do our work does not really improve anything since the Rust threads are still going, and the only way of cancelling a thread is by making it cooperate. So we do the following: - remember the SIGINT handler in hg-cpython and reset it after the call into core (see inline comment in `update.rs` about this) - make all update threads watch for a global `AtomicBool` being `true`, and if so stop their work - reset the global bool and exit early (i.e. before writing the dirstate) - raise SIGINT from `hg-cpython` if update returns `InterruptReceived`
rust/Cargo.lock
rust/hg-core/Cargo.toml
rust/hg-core/src/errors.rs
rust/hg-core/src/lib.rs
rust/hg-core/src/update.rs
rust/hg-cpython/src/update.rs
rust/hg-cpython/src/utils.rs
--- a/rust/Cargo.lock	Fri Nov 08 17:08:11 2024 +0100
+++ b/rust/Cargo.lock	Tue Nov 12 12:52:13 2024 +0100
@@ -170,6 +170,12 @@
 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
 [[package]]
+name = "cfg_aliases"
+version = "0.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
+
+[[package]]
 name = "chrono"
 version = "0.4.34"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -349,6 +355,16 @@
 ]
 
 [[package]]
+name = "ctrlc"
+version = "3.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "90eeab0aa92f3f9b4e87f258c72b139c207d251f9cbc1080a0086b86a8870dd3"
+dependencies = [
+ "nix",
+ "windows-sys 0.59.0",
+]
+
+[[package]]
 name = "cxx"
 version = "1.0.81"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -620,6 +636,7 @@
  "chrono",
  "clap",
  "crossbeam-channel",
+ "ctrlc",
  "derive_more",
  "dyn-clone",
  "filetime",
@@ -895,6 +912,18 @@
 ]
 
 [[package]]
+name = "nix"
+version = "0.29.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46"
+dependencies = [
+ "bitflags 2.6.0",
+ "cfg-if",
+ "cfg_aliases",
+ "libc",
+]
+
+[[package]]
 name = "nom8"
 version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
--- a/rust/hg-core/Cargo.toml	Fri Nov 08 17:08:11 2024 +0100
+++ b/rust/hg-core/Cargo.toml	Tue Nov 12 12:52:13 2024 +0100
@@ -12,6 +12,7 @@
 bitflags = "1.3.2"
 bytes-cast = "0.3.0"
 byteorder = "1.4.3"
+ctrlc = "3.4"
 derive_more = "0.99.17"
 hashbrown = { version = "0.13.1", features = ["rayon"] }
 home = "0.5.4"
--- a/rust/hg-core/src/errors.rs	Fri Nov 08 17:08:11 2024 +0100
+++ b/rust/hg-core/src/errors.rs	Tue Nov 12 12:52:13 2024 +0100
@@ -52,6 +52,8 @@
     RaceDetected(String),
     /// An invalid path was found
     Path(HgPathError),
+    /// An interrupt was received and we need to stop whatever we're doing
+    InterruptReceived,
 }
 
 /// Details about where an I/O error happened
@@ -121,6 +123,7 @@
                 write!(f, "encountered a race condition {context}")
             }
             HgError::Path(hg_path_error) => write!(f, "{}", hg_path_error),
+            HgError::InterruptReceived => write!(f, "interrupt received"),
         }
     }
 }
--- a/rust/hg-core/src/lib.rs	Fri Nov 08 17:08:11 2024 +0100
+++ b/rust/hg-core/src/lib.rs	Tue Nov 12 12:52:13 2024 +0100
@@ -46,10 +46,14 @@
     parse_pattern_syntax_kind, read_pattern_file, IgnorePattern,
     PatternFileWarning, PatternSyntax,
 };
-use std::collections::HashMap;
 use std::fmt;
+use std::{collections::HashMap, sync::atomic::AtomicBool};
 use twox_hash::RandomXxHashBuilder64;
 
+/// Used to communicate with threads spawned from code within this crate that
+/// they should stop their work (SIGINT was received).
+pub static INTERRUPT_RECEIVED: AtomicBool = AtomicBool::new(false);
+
 pub type LineNumber = usize;
 
 /// Rust's default hasher is too slow because it tries to prevent collision
--- a/rust/hg-core/src/update.rs	Fri Nov 08 17:08:11 2024 +0100
+++ b/rust/hg-core/src/update.rs	Tue Nov 12 12:52:13 2024 +0100
@@ -5,6 +5,7 @@
     io::Write,
     os::unix::fs::{MetadataExt, PermissionsExt},
     path::Path,
+    sync::atomic::Ordering,
     time::Duration,
 };
 
@@ -30,6 +31,7 @@
     },
     vfs::{is_on_nfs_mount, VfsImpl},
     DirstateParents, RevlogError, RevlogOpenOptions, UncheckedRevision,
+    INTERRUPT_RECEIVED,
 };
 use crossbeam_channel::{Receiver, Sender};
 use rayon::prelude::*;
@@ -100,6 +102,15 @@
     let chunks = chunk_tracked_files(tracked_files);
     progress.update(0, Some(files_count as u64));
 
+    // TODO find a way (with `nix` or `signal-hook`?) of resetting the
+    // previous signal handler directly after. Currently, this is Python's
+    // job, but:
+    //     - it introduces a (small) race between catching and resetting
+    //     - it would break signal handlers in other contexts like `rhg``
+    let _ = ctrlc::set_handler(|| {
+        INTERRUPT_RECEIVED.store(true, Ordering::Relaxed)
+    });
+
     create_working_copy(
         chunks,
         working_directory_path,
@@ -111,6 +122,12 @@
         workers,
     );
 
+    // Reset the global interrupt now that we're done
+    if INTERRUPT_RECEIVED.swap(false, Ordering::Relaxed) {
+        // The threads have all exited early, let's re-raise
+        return Err(HgError::InterruptReceived);
+    }
+
     let errors: Vec<HgError> = errors_receiver.iter().collect();
     if !errors.is_empty() {
         log::debug!("{} errors during update (see trace logs)", errors.len());
@@ -192,7 +209,8 @@
     workers: Option<usize>,
 ) {
     let auditor = PathAuditor::new(working_directory_path);
-    let work_closure = |(dir_path, chunk)| {
+
+    let work_closure = |(dir_path, chunk)| -> Result<(), HgError> {
         if let Err(e) = working_copy_worker(
             dir_path,
             chunk,
@@ -207,6 +225,7 @@
                 .send(e)
                 .expect("channel should not be disconnected")
         }
+        Ok(())
     };
     if let Some(workers) = workers {
         if workers > 1 {
@@ -223,18 +242,19 @@
                 Ok(pool) => {
                     log::trace!("restricting update to {} threads", workers);
                     pool.install(|| {
-                        chunks.into_par_iter().for_each(work_closure);
+                        let _ =
+                            chunks.into_par_iter().try_for_each(work_closure);
                     });
                 }
             }
         } else {
             // Work sequentially, don't even invoke rayon
-            chunks.into_iter().for_each(work_closure);
+            let _ = chunks.into_iter().try_for_each(work_closure);
         }
     } else {
         // Work in parallel by default in the global threadpool
         let _ = cap_default_rayon_threads();
-        chunks.into_par_iter().for_each(work_closure);
+        let _ = chunks.into_par_iter().try_for_each(work_closure);
     }
 }
 
@@ -256,6 +276,11 @@
     let dir_path = working_directory_path.join(dir_path);
     std::fs::create_dir_all(&dir_path).when_writing_file(&dir_path)?;
 
+    if INTERRUPT_RECEIVED.load(Ordering::Relaxed) {
+        // Stop working, the user has requested that we stop
+        return Err(HgError::InterruptReceived);
+    }
+
     for (file, file_node, flags) in chunk {
         auditor.audit_path(file)?;
         let flags = flags.map(|f| f.into());
--- a/rust/hg-cpython/src/update.rs	Fri Nov 08 17:08:11 2024 +0100
+++ b/rust/hg-cpython/src/update.rs	Tue Nov 12 12:52:13 2024 +0100
@@ -15,7 +15,7 @@
 
 use crate::{
     exceptions::FallbackError,
-    utils::{hgerror_to_pyerr, repo_from_path},
+    utils::{hgerror_to_pyerr, repo_from_path, with_sigint_wrapper},
 };
 
 pub fn update_from_null_fast_path(
@@ -27,10 +27,12 @@
     log::trace!("Using update from null fastpath");
     let repo = repo_from_path(py, repo_path)?;
     let progress: &dyn Progress = &HgProgressBar::new("updating");
-    hgerror_to_pyerr(
-        py,
-        update_from_null(&repo, to.into(), progress, num_cpus),
-    )
+
+    let res = with_sigint_wrapper(py, || {
+        update_from_null(&repo, to.into(), progress, num_cpus)
+    })?;
+
+    hgerror_to_pyerr(py, res)
 }
 
 pub fn init_module(py: Python, package: &str) -> PyResult<PyModule> {
--- a/rust/hg-cpython/src/utils.rs	Fri Nov 08 17:08:11 2024 +0100
+++ b/rust/hg-cpython/src/utils.rs	Tue Nov 12 12:52:13 2024 +0100
@@ -1,7 +1,7 @@
-use cpython::exc::ValueError;
+use cpython::exc::{KeyboardInterrupt, ValueError};
 use cpython::{
-    ObjectProtocol, PyBytes, PyDict, PyErr, PyObject, PyResult, PyTuple,
-    Python, ToPyObject,
+    ObjectProtocol, PyBytes, PyClone, PyDict, PyErr, PyObject, PyResult,
+    PyTuple, Python, ToPyObject,
 };
 use hg::config::Config;
 use hg::errors::HgError;
@@ -50,6 +50,9 @@
                 cls.call(py, (msg,), None).ok().into_py_object(py),
             )
         }
+        HgError::InterruptReceived => {
+            PyErr::new::<KeyboardInterrupt, _>(py, "")
+        }
         e => PyErr::new::<cpython::exc::RuntimeError, _>(py, e.to_string()),
     })
 }
@@ -104,3 +107,38 @@
         })
         .map(Into::into)
 }
+
+/// Wrap a call to `func` so that Python's `SIGINT` handler is first stored,
+/// then restored after the call to `func` and finally raised if
+/// `func` returns a [`HgError::InterruptReceived`]
+pub fn with_sigint_wrapper<R>(
+    py: Python,
+    func: impl Fn() -> Result<R, HgError>,
+) -> PyResult<Result<R, HgError>> {
+    let signal_py_mod = py.import("signal")?;
+    let sigint_py_const = signal_py_mod.get(py, "SIGINT")?;
+    let old_handler = signal_py_mod.call(
+        py,
+        "getsignal",
+        PyTuple::new(py, &[sigint_py_const.clone_ref(py)]),
+        None,
+    )?;
+    let res = func();
+    // Reset the old signal handler in Python because we've may have changed it
+    signal_py_mod.call(
+        py,
+        "signal",
+        PyTuple::new(py, &[sigint_py_const.clone_ref(py), old_handler]),
+        None,
+    )?;
+    if let Err(HgError::InterruptReceived) = res {
+        // Trigger the signal in Python
+        signal_py_mod.call(
+            py,
+            "raise_signal",
+            PyTuple::new(py, &[sigint_py_const]),
+            None,
+        )?;
+    }
+    Ok(res)
+}