Mercurial > hg-stable
changeset 44549:fe7d2cf0b429
rust-status: traverse working directory in parallel
Using `rayon` for this task ensures that we are using the same work-stealing
threadpool for everything.
This change introduces `crossbeam` as an explicit dependency, although it is
already a dependency of `rayon`. It provides better structures for
multi-threaded tasks than the stdlib.
Differential Revision: https://phab.mercurial-scm.org/D8251
author | Raphaël Gomès <rgomes@octobus.net> |
---|---|
date | Fri, 06 Mar 2020 17:51:24 +0100 |
parents | b8ba46c97cdd |
children | 82f51ab7a2dd |
files | rust/Cargo.lock rust/hg-core/Cargo.toml rust/hg-core/src/dirstate/status.rs |
diffstat | 3 files changed, 238 insertions(+), 109 deletions(-) [+] |
line wrap: on
line diff
--- a/rust/Cargo.lock Fri Mar 06 17:51:03 2020 +0100 +++ b/rust/Cargo.lock Fri Mar 06 17:51:24 2020 +0100 @@ -98,6 +98,28 @@ ] [[package]] +name = "crossbeam" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "crossbeam-channel" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", + "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "crossbeam-deque" version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -192,6 +214,7 @@ "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)", @@ -227,6 +250,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "memchr" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -662,6 +690,8 @@ "checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" "checksum cpython 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bfaf3847ab963e40c4f6dd8d6be279bdf74007ae2413786a0dcbb28c52139a95" +"checksum crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e" +"checksum crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061" "checksum crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3aa945d63861bfe624b55d153a39684da1e8c0bc8fba932f7ee3a3c16cea3ca" "checksum crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5064ebdbf05ce3cb95e45c8b086f72263f4166b29b97f6baff7ef7fe047b55ac" "checksum crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db" @@ -675,6 +705,7 @@ "checksum hex 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "023b39be39e3a2da62a94feb433e91e8bcd37676fbc8bea371daf52b7a769a3e" "checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" "checksum libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)" = "d515b1f41455adea1313a4a2ac8a8a477634fbae63cc6100e3aebb207ce61558" +"checksum maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" "checksum memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3197e20c7edb283f87c071ddfc7a2cca8f8e0b888c242959846a6fce03c72223" "checksum memmap 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" "checksum memoffset 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "75189eb85871ea5c2e2c15abbdd541185f63b408415e5051f5cac122d8c774b9"
--- a/rust/hg-core/Cargo.toml Fri Mar 06 17:51:03 2020 +0100 +++ b/rust/hg-core/Cargo.toml Fri Mar 06 17:51:24 2020 +0100 @@ -21,6 +21,7 @@ regex = "1.1.0" twox-hash = "1.5.0" same-file = "1.0.6" +crossbeam = "0.7.3" [dev-dependencies] clap = "*"
--- a/rust/hg-core/src/dirstate/status.rs Fri Mar 06 17:51:03 2020 +0100 +++ b/rust/hg-core/src/dirstate/status.rs Fri Mar 06 17:51:24 2020 +0100 @@ -300,61 +300,55 @@ } /// Dispatch a single entry (file, folder, symlink...) found during `traverse`. -/// If the entry is a folder that needs to be traversed, it will be pushed into -/// `work`. +/// If the entry is a folder that needs to be traversed, it will be handled +/// in a separate thread. + fn handle_traversed_entry<'a>( - dir_entry: &DirEntry, - matcher: &(impl Matcher + Sync), - root_dir: impl AsRef<Path>, - dmap: &DirstateMap, - filename: impl AsRef<HgPath>, - old_results: &FastHashMap<Cow<'a, HgPath>, Dispatch>, - ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync), - dir_ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync), + scope: &rayon::Scope<'a>, + files_sender: &'a crossbeam::Sender<IoResult<(HgPathBuf, Dispatch)>>, + matcher: &'a (impl Matcher + Sync), + root_dir: impl AsRef<Path> + Sync + Send + Copy + 'a, + dmap: &'a DirstateMap, + old_results: &'a FastHashMap<Cow<HgPath>, Dispatch>, + ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync), + dir_ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync), options: StatusOptions, -) -> IoResult<Vec<(Cow<'a, HgPath>, Dispatch)>> { + filename: HgPathBuf, + dir_entry: DirEntry, +) -> IoResult<()> { let file_type = dir_entry.file_type()?; - let filename = filename.as_ref(); - let entry_option = dmap.get(filename); + let entry_option = dmap.get(&filename); if file_type.is_dir() { - // Do we need to traverse it? - if !ignore_fn(&filename) || options.list_ignored { - return traverse_dir( - matcher, - root_dir, - dmap, - filename.to_owned(), - &old_results, - ignore_fn, - dir_ignore_fn, - options, - ); - } - // Nested `if` until `rust-lang/rust#53668` is stable - if let Some(entry) = entry_option { - // Used to be a file, is now a folder - if matcher.matches_everything() || matcher.matches(&filename) { - return Ok(vec![( - Cow::Owned(filename.to_owned()), - dispatch_missing(entry.state), - )]); - } - } + handle_traversed_dir( + scope, + files_sender, + matcher, + root_dir, + dmap, + old_results, + ignore_fn, + dir_ignore_fn, + options, + entry_option, + filename, + ); } else if file_type.is_file() || file_type.is_symlink() { if let Some(entry) = entry_option { if matcher.matches_everything() || matcher.matches(&filename) { let metadata = dir_entry.metadata()?; - return Ok(vec![( - Cow::Owned(filename.to_owned()), - dispatch_found( - &filename, - *entry, - HgMetadata::from_metadata(metadata), - &dmap.copy_map, - options, - ), - )]); + files_sender + .send(Ok(( + filename.to_owned(), + dispatch_found( + &filename, + *entry, + HgMetadata::from_metadata(metadata), + &dmap.copy_map, + options, + ), + ))) + .unwrap(); } } else if (matcher.matches_everything() || matcher.matches(&filename)) && !ignore_fn(&filename) @@ -363,53 +357,96 @@ && dir_ignore_fn(&filename) { if options.list_ignored { - return Ok(vec![( - Cow::Owned(filename.to_owned()), - Dispatch::Ignored, - )]); + files_sender + .send(Ok((filename.to_owned(), Dispatch::Ignored))) + .unwrap(); } } else { - return Ok(vec![( - Cow::Owned(filename.to_owned()), - Dispatch::Unknown, - )]); + files_sender + .send(Ok((filename.to_owned(), Dispatch::Unknown))) + .unwrap(); } } else if ignore_fn(&filename) && options.list_ignored { - return Ok(vec![( - Cow::Owned(filename.to_owned()), - Dispatch::Ignored, - )]); + files_sender + .send(Ok((filename.to_owned(), Dispatch::Ignored))) + .unwrap(); } } else if let Some(entry) = entry_option { // Used to be a file or a folder, now something else. if matcher.matches_everything() || matcher.matches(&filename) { - return Ok(vec![( - Cow::Owned(filename.to_owned()), - dispatch_missing(entry.state), - )]); + files_sender + .send(Ok((filename.to_owned(), dispatch_missing(entry.state)))) + .unwrap(); } } - return Ok(vec![]); + + Ok(()) } -/// Decides whether the directory needs to be listed, and if so dispatches its -/// entries +/// A directory was found in the filesystem and needs to be traversed +fn handle_traversed_dir<'a>( + scope: &rayon::Scope<'a>, + files_sender: &'a crossbeam::Sender<IoResult<(HgPathBuf, Dispatch)>>, + matcher: &'a (impl Matcher + Sync), + root_dir: impl AsRef<Path> + Sync + Send + Copy + 'a, + dmap: &'a DirstateMap, + old_results: &'a FastHashMap<Cow<HgPath>, Dispatch>, + ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync), + dir_ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync), + options: StatusOptions, + entry_option: Option<&'a DirstateEntry>, + directory: HgPathBuf, +) { + scope.spawn(move |_| { + // Nested `if` until `rust-lang/rust#53668` is stable + if let Some(entry) = entry_option { + // Used to be a file, is now a folder + if matcher.matches_everything() || matcher.matches(&directory) { + files_sender + .send(Ok(( + directory.to_owned(), + dispatch_missing(entry.state), + ))) + .unwrap(); + } + } + // Do we need to traverse it? + if !ignore_fn(&directory) || options.list_ignored { + traverse_dir( + files_sender, + matcher, + root_dir, + dmap, + directory, + &old_results, + ignore_fn, + dir_ignore_fn, + options, + ) + .unwrap_or_else(|e| files_sender.send(Err(e)).unwrap()) + } + }); +} + +/// Decides whether the directory needs to be listed, and if so handles the +/// entries in a separate thread. fn traverse_dir<'a>( - matcher: &(impl Matcher + Sync), - root_dir: impl AsRef<Path>, - dmap: &DirstateMap, - path: impl AsRef<HgPath>, + files_sender: &crossbeam::Sender<IoResult<(HgPathBuf, Dispatch)>>, + matcher: &'a (impl Matcher + Sync), + root_dir: impl AsRef<Path> + Sync + Send + Copy, + dmap: &'a DirstateMap, + directory: impl AsRef<HgPath>, old_results: &FastHashMap<Cow<'a, HgPath>, Dispatch>, ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync), dir_ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync), options: StatusOptions, -) -> IoResult<Vec<(Cow<'a, HgPath>, Dispatch)>> { - let directory = path.as_ref(); +) -> IoResult<()> { + let directory = directory.as_ref(); if directory.as_bytes() == b".hg" { - return Ok(vec![]); + return Ok(()); } let visit_entries = match matcher.visit_children_set(directory) { - VisitChildrenSet::Empty => return Ok(vec![]), + VisitChildrenSet::Empty => return Ok(()), VisitChildrenSet::This | VisitChildrenSet::Recursive => None, VisitChildrenSet::Set(set) => Some(set), }; @@ -420,49 +457,108 @@ let entries = match list_directory(dir_path, skip_dot_hg) { Err(e) => match e.kind() { ErrorKind::NotFound | ErrorKind::PermissionDenied => { - return Ok(vec![( - Cow::Owned(directory.to_owned()), - Dispatch::Bad(BadMatch::OsError( - // Unwrapping here is OK because the error always - // is a real os error - e.raw_os_error().unwrap(), - )), - )]); + files_sender + .send(Ok(( + directory.to_owned(), + Dispatch::Bad(BadMatch::OsError( + // Unwrapping here is OK because the error always + // is a real os error + e.raw_os_error().unwrap(), + )), + ))) + .unwrap(); + return Ok(()); } _ => return Err(e), }, Ok(entries) => entries, }; - let mut new_results = vec![]; - for (filename, dir_entry) in entries { - if let Some(ref set) = visit_entries { - if !set.contains(filename.deref()) { - continue; + rayon::scope(|scope| -> IoResult<()> { + for (filename, dir_entry) in entries { + if let Some(ref set) = visit_entries { + if !set.contains(filename.deref()) { + continue; + } + } + // TODO normalize + let filename = if directory.is_empty() { + filename.to_owned() + } else { + directory.join(&filename) + }; + + if !old_results.contains_key(filename.deref()) { + handle_traversed_entry( + scope, + files_sender, + matcher, + root_dir, + dmap, + old_results, + ignore_fn, + dir_ignore_fn, + options, + filename, + dir_entry, + )?; } } - // TODO normalize - let filename = if directory.is_empty() { - filename.to_owned() - } else { - directory.join(&filename) - }; + Ok(()) + }) +} + +/// Walk the working directory recursively to look for changes compared to the +/// current `DirstateMap`. +/// +/// This takes a mutable reference to the results to account for the `extend` +/// in timings +fn traverse<'a>( + matcher: &'a (impl Matcher + Sync), + root_dir: impl AsRef<Path> + Sync + Send + Copy, + dmap: &'a DirstateMap, + path: impl AsRef<HgPath>, + old_results: &FastHashMap<Cow<'a, HgPath>, Dispatch>, + ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync), + dir_ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync), + options: StatusOptions, + results: &mut Vec<(Cow<'a, HgPath>, Dispatch)>, +) -> IoResult<()> { + let root_dir = root_dir.as_ref(); + + // The traversal is done in parallel, so use a channel to gather entries. + // `crossbeam::Sender` is `Send`, while `mpsc::Sender` is not. + let (files_transmitter, files_receiver) = crossbeam::channel::unbounded(); - if !old_results.contains_key(filename.deref()) { - new_results.extend(handle_traversed_entry( - &dir_entry, - matcher, - root_dir.as_ref(), - &dmap, - &filename, - old_results, - ignore_fn, - dir_ignore_fn, - options, - )?); - } - } - Ok(new_results) + traverse_dir( + &files_transmitter, + matcher, + root_dir, + &dmap, + path, + &old_results, + &ignore_fn, + &dir_ignore_fn, + options, + )?; + + // Disconnect the channel so the receiver stops waiting + drop(files_transmitter); + + // TODO don't collect. Find a way of replicating the behavior of + // `itertools::process_results`, but for `rayon::ParallelIterator` + let new_results: IoResult<Vec<(Cow<'a, HgPath>, Dispatch)>> = + files_receiver + .into_iter() + .map(|item| { + let (f, d) = item?; + Ok((Cow::Owned(f), d)) + }) + .collect(); + + results.par_extend(new_results?); + + Ok(()) } /// Stat all entries in the `DirstateMap` and mark them for dispatch. @@ -753,7 +849,7 @@ if options.list_ignored || options.list_unknown && !dir_ignore_fn(&dir) { - results.par_extend(traverse_dir( + traverse( matcher, root_dir, &dmap, @@ -762,7 +858,8 @@ &ignore_fn, &dir_ignore_fn, options, - )?); + &mut results, + )?; } } _ => unreachable!("There can only be directories in `work`"),