rust-update: make `update_from_null` respect `worker.numcpus` config option
author      Raphaël Gomès <rgomes@octobus.net>
date        Tue, 05 Nov 2024 15:21:09 +0100
branch      stable
changeset   52190:e6a44bc91bc2
parent      52189:e698e3e75420
child       52191:cfc4985b2964
rust-update: make `update_from_null` respect `worker.numcpus` config option

This was overlooked in the original series. It is important for the test suite (because we run many tests at once) and for the occasional end user who wants to keep their CPU usage in check.

A future series should clean up this `worker` parameter tunnelling business by rewriting the config in Rust, but doing so on stable would be a very bad idea.
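For readers skimming the diff below, this is the dispatch pattern the patch introduces in `create_working_copy`, reduced to a standalone sketch: an optional worker count chooses between sequential work, a dedicated size-limited rayon pool, or rayon's default global pool. The chunk type and function names here are made up for illustration; only the rayon calls mirror the actual change.

    use rayon::prelude::*;

    fn process_chunks(chunks: Vec<u32>, workers: Option<usize>) {
        // Per-chunk work; the real patch writes working-copy files here.
        let work = |chunk: u32| {
            let _ = chunk;
        };
        match workers {
            // An explicit worker count greater than one: run inside a
            // dedicated, size-limited rayon pool instead of the global one.
            Some(n) if n > 1 => {
                let pool = rayon::ThreadPoolBuilder::new()
                    .num_threads(n)
                    .build()
                    .expect("failed to build thread pool");
                pool.install(|| chunks.into_par_iter().for_each(work));
            }
            // Workers disabled or capped at one: stay sequential, skip rayon.
            Some(_) => chunks.into_iter().for_each(work),
            // No explicit setting: use rayon's (capped) global pool.
            None => chunks.into_par_iter().for_each(work),
        }
    }

    fn main() {
        process_chunks((0..100).collect(), Some(4));
    }

On the Python side this maps onto the existing `worker.enabled` and `worker.numcpus` options, so something like `hg update --config worker.numcpus=2` should now cap the Rust fast path as well.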
mercurial/merge.py
rust/hg-core/src/update.rs
rust/hg-core/src/utils.rs
rust/hg-cpython/src/update.rs
--- a/mercurial/merge.py	Tue Nov 05 15:18:32 2024 +0100
+++ b/mercurial/merge.py	Tue Nov 05 15:21:09 2024 +0100
@@ -2054,9 +2054,14 @@
             repo.hook(b'preupdate', throw=True, parent1=xp1, parent2=xp2)
             # note that we're in the middle of an update
             repo.vfs.write(b'updatestate', p2.hex())
+            num_cpus = (
+                repo.ui.configint(b"worker", b"numcpus", None)
+                if repo.ui.configbool(b"worker", b"enabled")
+                else 1
+            )
             try:
                 updated_count = rust_update_mod.update_from_null(
-                    repo.root, p2.rev()
+                    repo.root, p2.rev(), num_cpus
                 )
             except rust_update_mod.FallbackError:
                 update_from_null_fallback = True
--- a/rust/hg-core/src/update.rs	Tue Nov 05 15:18:32 2024 +0100
+++ b/rust/hg-core/src/update.rs	Tue Nov 05 15:21:09 2024 +0100
@@ -23,6 +23,7 @@
     repo::Repo,
     sparse,
     utils::{
+        cap_default_rayon_threads,
         files::{filesystem_now, get_path_from_bytes},
         hg_path::{hg_path_to_path_buf, HgPath, HgPathError},
         path_auditor::PathAuditor,
@@ -49,6 +50,7 @@
     repo: &Repo,
     to: UncheckedRevision,
     progress: &dyn Progress,
+    workers: Option<usize>,
 ) -> Result<usize, HgError> {
     // Ignore the warnings, they've been displayed by Python already
     // TODO non-Python clients: display narrow warnings
@@ -106,6 +108,7 @@
         files_sender,
         errors_sender,
         progress,
+        workers,
     );
 
     let errors: Vec<HgError> = errors_receiver.iter().collect();
@@ -177,6 +180,7 @@
 }
 
 #[logging_timer::time("trace")]
+#[allow(clippy::too_many_arguments)]
 fn create_working_copy<'a: 'b, 'b>(
     chunks: Vec<(&HgPath, Vec<ExpandedManifestEntry<'a>>)>,
     working_directory_path: &Path,
@@ -185,9 +189,10 @@
     files_sender: Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>,
     error_sender: Sender<HgError>,
     progress: &dyn Progress,
+    workers: Option<usize>,
 ) {
     let auditor = PathAuditor::new(working_directory_path);
-    chunks.into_par_iter().for_each(|(dir_path, chunk)| {
+    let work_closure = |(dir_path, chunk)| {
         if let Err(e) = working_copy_worker(
             dir_path,
             chunk,
@@ -202,7 +207,35 @@
                 .send(e)
                 .expect("channel should not be disconnected")
         }
-    });
+    };
+    if let Some(workers) = workers {
+        if workers > 1 {
+            // Work in parallel, potentially restricting the number of threads
+            match rayon::ThreadPoolBuilder::new().num_threads(workers).build()
+            {
+                Err(error) => error_sender
+                    .send(HgError::abort(
+                        error.to_string(),
+                        exit_codes::ABORT,
+                        None,
+                    ))
+                    .expect("channel should not be disconnected"),
+                Ok(pool) => {
+                    log::trace!("restricting update to {} threads", workers);
+                    pool.install(|| {
+                        chunks.into_par_iter().for_each(work_closure);
+                    });
+                }
+            }
+        } else {
+            // Work sequentially, don't even invoke rayon
+            chunks.into_iter().for_each(work_closure);
+        }
+    } else {
+        // Work in parallel by default in the global threadpool
+        let _ = cap_default_rayon_threads();
+        chunks.into_par_iter().for_each(work_closure);
+    }
 }
 
 /// Represents a work unit for a single thread, responsible for this set of
--- a/rust/hg-core/src/utils.rs	Tue Nov 05 15:18:32 2024 +0100
+++ b/rust/hg-core/src/utils.rs	Tue Nov 05 15:21:09 2024 +0100
@@ -542,7 +542,7 @@
 /// Force the global rayon threadpool to not exceed 16 concurrent threads
 /// unless the user has specified a value.
 /// This is a stop-gap measure until we figure out why using more than 16
-/// threads makes `status` slower for each additional thread.
+/// threads makes `status` and `update` slower for each additional thread.
 ///
 /// TODO find the underlying cause and fix it, then remove this.
 ///
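(`cap_default_rayon_threads` itself is untouched by this patch; only its doc comment gains a mention of `update`. For context, capping rayon's global pool generally follows the sketch below. This is an illustration of the approach with a hypothetical helper name, not the hg-core implementation; the 16-thread figure comes from the doc comment above.)

    // Illustrative only: cap rayon's global threadpool unless the user already
    // chose an explicit thread count through RAYON_NUM_THREADS.
    fn cap_global_rayon_threads(
        cap: usize,
    ) -> Result<(), rayon::ThreadPoolBuildError> {
        if std::env::var_os("RAYON_NUM_THREADS").is_some() {
            // The user asked for a specific number of threads: leave rayon alone.
            return Ok(());
        }
        let available = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
        rayon::ThreadPoolBuilder::new()
            .num_threads(cap.min(available))
            .build_global()
    }

    fn main() {
        // With the cap in place, work on the global pool uses at most 16 threads.
        cap_global_rayon_threads(16).expect("failed to configure global rayon pool");
    }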
--- a/rust/hg-cpython/src/update.rs	Tue Nov 05 15:18:32 2024 +0100
+++ b/rust/hg-cpython/src/update.rs	Tue Nov 05 15:21:09 2024 +0100
@@ -22,11 +22,15 @@
     py: Python,
     repo_path: PyObject,
     to: BaseRevision,
+    num_cpus: Option<usize>,
 ) -> PyResult<usize> {
     log::trace!("Using update from null fastpath");
     let repo = repo_from_path(py, repo_path)?;
     let progress: &dyn Progress = &HgProgressBar::new("updating");
-    hgerror_to_pyerr(py, update_from_null(&repo, to.into(), progress))
+    hgerror_to_pyerr(
+        py,
+        update_from_null(&repo, to.into(), progress, num_cpus),
+    )
 }
 
 pub fn init_module(py: Python, package: &str) -> PyResult<PyModule> {
@@ -41,7 +45,11 @@
         "update_from_null",
         py_fn!(
             py,
-            update_from_null_fast_path(repo_path: PyObject, to: BaseRevision,)
+            update_from_null_fast_path(
+                repo_path: PyObject,
+                to: BaseRevision,
+                num_cpus: Option<usize>
+            )
         ),
     )?;