--- a/rust/hg-core/src/dirstate/status.rs Fri Mar 06 17:51:03 2020 +0100
+++ b/rust/hg-core/src/dirstate/status.rs Fri Mar 06 17:51:24 2020 +0100
@@ -300,61 +300,55 @@
}
/// Dispatch a single entry (file, folder, symlink...) found during `traverse`.
-/// If the entry is a folder that needs to be traversed, it will be pushed into
-/// `work`.
+/// If the entry is a folder that needs to be traversed, it will be handled
+/// in a task spawned on `scope`. Results are sent through `files_sender`
+/// instead of being returned.
fn handle_traversed_entry<'a>(
- dir_entry: &DirEntry,
- matcher: &(impl Matcher + Sync),
- root_dir: impl AsRef<Path>,
- dmap: &DirstateMap,
- filename: impl AsRef<HgPath>,
- old_results: &FastHashMap<Cow<'a, HgPath>, Dispatch>,
- ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
- dir_ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+ scope: &rayon::Scope<'a>,
+ files_sender: &'a crossbeam::Sender<IoResult<(HgPathBuf, Dispatch)>>,
+ matcher: &'a (impl Matcher + Sync),
+ root_dir: impl AsRef<Path> + Sync + Send + Copy + 'a,
+ dmap: &'a DirstateMap,
+ old_results: &'a FastHashMap<Cow<HgPath>, Dispatch>,
+ ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+ dir_ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync),
options: StatusOptions,
-) -> IoResult<Vec<(Cow<'a, HgPath>, Dispatch)>> {
+ filename: HgPathBuf,
+ dir_entry: DirEntry,
+) -> IoResult<()> {
let file_type = dir_entry.file_type()?;
- let filename = filename.as_ref();
- let entry_option = dmap.get(filename);
+ let entry_option = dmap.get(&filename);
if file_type.is_dir() {
- // Do we need to traverse it?
- if !ignore_fn(&filename) || options.list_ignored {
- return traverse_dir(
- matcher,
- root_dir,
- dmap,
- filename.to_owned(),
- &old_results,
- ignore_fn,
- dir_ignore_fn,
- options,
- );
- }
- // Nested `if` until `rust-lang/rust#53668` is stable
- if let Some(entry) = entry_option {
- // Used to be a file, is now a folder
- if matcher.matches_everything() || matcher.matches(&filename) {
- return Ok(vec![(
- Cow::Owned(filename.to_owned()),
- dispatch_missing(entry.state),
- )]);
- }
- }
+ handle_traversed_dir(
+ scope,
+ files_sender,
+ matcher,
+ root_dir,
+ dmap,
+ old_results,
+ ignore_fn,
+ dir_ignore_fn,
+ options,
+ entry_option,
+ filename,
+ );
} else if file_type.is_file() || file_type.is_symlink() {
if let Some(entry) = entry_option {
if matcher.matches_everything() || matcher.matches(&filename) {
let metadata = dir_entry.metadata()?;
- return Ok(vec![(
- Cow::Owned(filename.to_owned()),
- dispatch_found(
- &filename,
- *entry,
- HgMetadata::from_metadata(metadata),
- &dmap.copy_map,
- options,
- ),
- )]);
+ files_sender
+ .send(Ok((
+ filename.to_owned(),
+ dispatch_found(
+ &filename,
+ *entry,
+ HgMetadata::from_metadata(metadata),
+ &dmap.copy_map,
+ options,
+ ),
+ )))
+ .unwrap();
}
} else if (matcher.matches_everything() || matcher.matches(&filename))
&& !ignore_fn(&filename)
@@ -363,53 +357,96 @@
&& dir_ignore_fn(&filename)
{
if options.list_ignored {
- return Ok(vec![(
- Cow::Owned(filename.to_owned()),
- Dispatch::Ignored,
- )]);
+ files_sender
+ .send(Ok((filename.to_owned(), Dispatch::Ignored)))
+ .unwrap();
}
} else {
- return Ok(vec![(
- Cow::Owned(filename.to_owned()),
- Dispatch::Unknown,
- )]);
+ files_sender
+ .send(Ok((filename.to_owned(), Dispatch::Unknown)))
+ .unwrap();
}
} else if ignore_fn(&filename) && options.list_ignored {
- return Ok(vec![(
- Cow::Owned(filename.to_owned()),
- Dispatch::Ignored,
- )]);
+ files_sender
+ .send(Ok((filename.to_owned(), Dispatch::Ignored)))
+ .unwrap();
}
} else if let Some(entry) = entry_option {
// Used to be a file or a folder, now something else.
if matcher.matches_everything() || matcher.matches(&filename) {
- return Ok(vec![(
- Cow::Owned(filename.to_owned()),
- dispatch_missing(entry.state),
- )]);
+ files_sender
+ .send(Ok((filename.to_owned(), dispatch_missing(entry.state))))
+ .unwrap();
}
}
- return Ok(vec![]);
+
+ Ok(())
}
-/// Decides whether the directory needs to be listed, and if so dispatches its
-/// entries
+/// A directory was found in the filesystem and needs to be traversed
+fn handle_traversed_dir<'a>(
+ scope: &rayon::Scope<'a>,
+ files_sender: &'a crossbeam::Sender<IoResult<(HgPathBuf, Dispatch)>>,
+ matcher: &'a (impl Matcher + Sync),
+ root_dir: impl AsRef<Path> + Sync + Send + Copy + 'a,
+ dmap: &'a DirstateMap,
+ old_results: &'a FastHashMap<Cow<HgPath>, Dispatch>,
+ ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+ dir_ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+ options: StatusOptions,
+ entry_option: Option<&'a DirstateEntry>,
+ directory: HgPathBuf,
+) {
+ scope.spawn(move |_| {
+ // Nested `if` until `rust-lang/rust#53668` is stable
+ if let Some(entry) = entry_option {
+ // Used to be a file, is now a folder
+ if matcher.matches_everything() || matcher.matches(&directory) {
+ files_sender
+ .send(Ok((
+ directory.to_owned(),
+ dispatch_missing(entry.state),
+ )))
+ .unwrap();
+ }
+ }
+ // Do we need to traverse it?
+ if !ignore_fn(&directory) || options.list_ignored {
+ traverse_dir(
+ files_sender,
+ matcher,
+ root_dir,
+ dmap,
+ directory,
+ &old_results,
+ ignore_fn,
+ dir_ignore_fn,
+ options,
+ )
+ .unwrap_or_else(|e| files_sender.send(Err(e)).unwrap())
+ }
+ });
+}
+
+/// Decides whether the directory needs to be listed, and if so handles the
+/// entries in a separate thread.
fn traverse_dir<'a>(
- matcher: &(impl Matcher + Sync),
- root_dir: impl AsRef<Path>,
- dmap: &DirstateMap,
- path: impl AsRef<HgPath>,
+ files_sender: &crossbeam::Sender<IoResult<(HgPathBuf, Dispatch)>>,
+ matcher: &'a (impl Matcher + Sync),
+ root_dir: impl AsRef<Path> + Sync + Send + Copy,
+ dmap: &'a DirstateMap,
+ directory: impl AsRef<HgPath>,
old_results: &FastHashMap<Cow<'a, HgPath>, Dispatch>,
ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
dir_ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
options: StatusOptions,
-) -> IoResult<Vec<(Cow<'a, HgPath>, Dispatch)>> {
- let directory = path.as_ref();
+) -> IoResult<()> {
+ let directory = directory.as_ref();
if directory.as_bytes() == b".hg" {
- return Ok(vec![]);
+ return Ok(());
}
let visit_entries = match matcher.visit_children_set(directory) {
- VisitChildrenSet::Empty => return Ok(vec![]),
+ VisitChildrenSet::Empty => return Ok(()),
VisitChildrenSet::This | VisitChildrenSet::Recursive => None,
VisitChildrenSet::Set(set) => Some(set),
};
@@ -420,49 +457,108 @@
let entries = match list_directory(dir_path, skip_dot_hg) {
Err(e) => match e.kind() {
ErrorKind::NotFound | ErrorKind::PermissionDenied => {
- return Ok(vec![(
- Cow::Owned(directory.to_owned()),
- Dispatch::Bad(BadMatch::OsError(
- // Unwrapping here is OK because the error always
- // is a real os error
- e.raw_os_error().unwrap(),
- )),
- )]);
+ files_sender
+ .send(Ok((
+ directory.to_owned(),
+ Dispatch::Bad(BadMatch::OsError(
+ // Unwrapping here is OK because the error always
+ // is a real os error
+ e.raw_os_error().unwrap(),
+ )),
+ )))
+ .unwrap();
+ return Ok(());
}
_ => return Err(e),
},
Ok(entries) => entries,
};
- let mut new_results = vec![];
- for (filename, dir_entry) in entries {
- if let Some(ref set) = visit_entries {
- if !set.contains(filename.deref()) {
- continue;
+ rayon::scope(|scope| -> IoResult<()> {
+ for (filename, dir_entry) in entries {
+ if let Some(ref set) = visit_entries {
+ if !set.contains(filename.deref()) {
+ continue;
+ }
+ }
+ // TODO normalize
+ let filename = if directory.is_empty() {
+ filename.to_owned()
+ } else {
+ directory.join(&filename)
+ };
+
+ if !old_results.contains_key(filename.deref()) {
+ handle_traversed_entry(
+ scope,
+ files_sender,
+ matcher,
+ root_dir,
+ dmap,
+ old_results,
+ ignore_fn,
+ dir_ignore_fn,
+ options,
+ filename,
+ dir_entry,
+ )?;
}
}
- // TODO normalize
- let filename = if directory.is_empty() {
- filename.to_owned()
- } else {
- directory.join(&filename)
- };
+ Ok(())
+ })
+}
+
+/// Walk the working directory recursively to look for changes compared to the
+/// current `DirstateMap`.
+///
+/// This takes a mutable reference to the results so that the cost of the
+/// `extend` is included in timings.
+fn traverse<'a>(
+ matcher: &'a (impl Matcher + Sync),
+ root_dir: impl AsRef<Path> + Sync + Send + Copy,
+ dmap: &'a DirstateMap,
+ path: impl AsRef<HgPath>,
+ old_results: &FastHashMap<Cow<'a, HgPath>, Dispatch>,
+ ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+ dir_ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+ options: StatusOptions,
+ results: &mut Vec<(Cow<'a, HgPath>, Dispatch)>,
+) -> IoResult<()> {
+ let root_dir = root_dir.as_ref();
+
+ // The traversal is done in parallel, so use a channel to gather entries.
+ // `crossbeam::Sender` is `Sync`, while `mpsc::Sender` is not.
+ let (files_transmitter, files_receiver) = crossbeam::channel::unbounded();
- if !old_results.contains_key(filename.deref()) {
- new_results.extend(handle_traversed_entry(
- &dir_entry,
- matcher,
- root_dir.as_ref(),
- &dmap,
- &filename,
- old_results,
- ignore_fn,
- dir_ignore_fn,
- options,
- )?);
- }
- }
- Ok(new_results)
+ traverse_dir(
+ &files_transmitter,
+ matcher,
+ root_dir,
+ &dmap,
+ path,
+ &old_results,
+ &ignore_fn,
+ &dir_ignore_fn,
+ options,
+ )?;
+
+ // Disconnect the channel so the receiver stops waiting
+ drop(files_transmitter);
+
+ // TODO don't collect. Find a way of replicating the behavior of
+ // `itertools::process_results`, but for `rayon::ParallelIterator`
+ let new_results: IoResult<Vec<(Cow<'a, HgPath>, Dispatch)>> =
+ files_receiver
+ .into_iter()
+ .map(|item| {
+ let (f, d) = item?;
+ Ok((Cow::Owned(f), d))
+ })
+ .collect();
+
+ results.par_extend(new_results?);
+
+ Ok(())
}
/// Stat all entries in the `DirstateMap` and mark them for dispatch.
@@ -753,7 +849,7 @@
if options.list_ignored
|| options.list_unknown && !dir_ignore_fn(&dir)
{
- results.par_extend(traverse_dir(
+ traverse(
matcher,
root_dir,
&dmap,
@@ -762,7 +858,8 @@
&ignore_fn,
&dir_ignore_fn,
options,
- )?);
+ &mut results,
+ )?;
}
}
_ => unreachable!("There can only be directories in `work`"),