Mercurial > hg
view rust/hg-core/src/update.rs @ 52307:22d24f6d6411
rust-lib: remove exports for not too common pattern-related types
This only muddies the lib and makes the imports more confusing.
author | Raphaël Gomès <rgomes@octobus.net> |
---|---|
date | Mon, 04 Nov 2024 11:26:41 +0100 |
parents | b422acba55f1 |
children |
line wrap: on
line source
//! Tools for moving the repository to a given revision use std::{ fs::Permissions, io::Write, os::unix::fs::{MetadataExt, PermissionsExt}, path::Path, sync::atomic::Ordering, time::Duration, }; use crate::{ dirstate::entry::{ParentFileData, TruncatedTimestamp}, dirstate::{dirstate_map::DirstateEntryReset, on_disk::write_tracked_key}, errors::{HgError, IoResultExt}, exit_codes, narrow, operations::{list_rev_tracked_files, ExpandedManifestEntry}, progress::Progress, repo::Repo, revlog::filelog::Filelog, revlog::node::NULL_NODE, revlog::options::{default_revlog_options, RevlogOpenOptions}, revlog::RevlogError, sparse, utils::{ cap_default_rayon_threads, files::{filesystem_now, get_path_from_bytes}, hg_path::{hg_path_to_path_buf, HgPath, HgPathError}, path_auditor::PathAuditor, }, vfs::{is_on_nfs_mount, VfsImpl}, DirstateParents, UncheckedRevision, INTERRUPT_RECEIVED, }; use crossbeam_channel::{Receiver, Sender}; use rayon::prelude::*; fn write_dirstate(repo: &Repo) -> Result<(), HgError> { repo.write_dirstate() .map_err(|e| HgError::abort(e.to_string(), exit_codes::ABORT, None))?; write_tracked_key(repo) } /// Update the current working copy of `repo` to the given revision `to`, from /// the null revision and set + write out the dirstate to reflect that. /// /// Do not call this outside of a Python context. This does *not* handle any /// of the checks, hooks, lock taking needed to setup and get out of this /// update from the null revision. pub fn update_from_null( repo: &Repo, to: UncheckedRevision, progress: &dyn Progress, workers: Option<usize>, ) -> Result<usize, HgError> { // Ignore the warnings, they've been displayed by Python already // TODO non-Python clients: display narrow warnings let (narrow_matcher, _) = narrow::matcher(repo)?; let files_for_rev = list_rev_tracked_files(repo, to, narrow_matcher) .map_err(handle_revlog_error)?; repo.manually_set_parents(DirstateParents { p1: repo.node(to).expect("update target should exist"), p2: NULL_NODE, })?; // Filter the working copy according to the sparse spec let tracked_files: Result<Vec<_>, _> = if !repo.has_sparse() { files_for_rev.iter().collect() } else { // Ignore the warnings, they've been displayed by Python already // TODO non-Python clients: display sparse warnings let (sparse_matcher, _) = sparse::matcher(repo)?; files_for_rev .iter() .filter(|f| { match f { Ok(f) => sparse_matcher.matches(f.0), Err(_) => true, // Errors stop the update, include them } }) .collect() }; let tracked_files = tracked_files?; if tracked_files.is_empty() { // Still write the dirstate because we might not be in the null // revision. // This can happen in narrow repos where all paths are excluded in // this revision. write_dirstate(repo)?; return Ok(0); } let store_vfs = &repo.store_vfs(); let options = default_revlog_options( repo.config(), repo.requirements(), crate::revlog::RevlogType::Filelog, )?; let (errors_sender, errors_receiver) = crossbeam_channel::unbounded(); let (files_sender, files_receiver) = crossbeam_channel::unbounded(); let working_directory_path = &repo.working_directory_path(); let files_count = tracked_files.len(); let chunks = chunk_tracked_files(tracked_files); progress.update(0, Some(files_count as u64)); // TODO find a way (with `nix` or `signal-hook`?) of resetting the // previous signal handler directly after. Currently, this is Python's // job, but: // - it introduces a (small) race between catching and resetting // - it would break signal handlers in other contexts like `rhg`` let _ = ctrlc::set_handler(|| { INTERRUPT_RECEIVED.store(true, Ordering::Relaxed) }); create_working_copy( chunks, working_directory_path, store_vfs, options, files_sender, errors_sender, progress, workers, ); // Reset the global interrupt now that we're done if INTERRUPT_RECEIVED.swap(false, Ordering::Relaxed) { // The threads have all exited early, let's re-raise return Err(HgError::InterruptReceived); } let errors: Vec<HgError> = errors_receiver.iter().collect(); if !errors.is_empty() { log::debug!("{} errors during update (see trace logs)", errors.len()); for error in errors.iter() { log::trace!("{}", error); } // Best we can do is raise the first error (in order of the channel) return Err(errors.into_iter().next().expect("can never be empty")); } // TODO try to run this concurrently to update the dirstate while we're // still writing out the working copy to see if that improves performance. let total = update_dirstate(repo, files_receiver)?; write_dirstate(repo)?; Ok(total) } fn handle_revlog_error(e: RevlogError) -> HgError { match e { crate::revlog::RevlogError::Other(hg_error) => hg_error, e => HgError::abort( format!("revlog error: {}", e), exit_codes::ABORT, None, ), } } /// Preallocated size of Vec holding directory contents. This aims at /// preventing the need for re-allocating the Vec in most cases. /// /// The value is arbitrarily picked as a little over an average number of files /// per directory done by looking at a few larger open-source repos. /// Most of the runtime is IO anyway, so this doesn't matter too much. const FILES_PER_DIRECTORY: usize = 16; /// Chunk files per directory prefix, so almost every directory is handled /// in a separate thread, which works around the FS inode mutex. /// Chunking less (and doing approximately `files_count`/`threads`) actually /// ends up being less performant: my hypothesis is `rayon`'s work stealing /// being more efficient with tasks of varying lengths. #[logging_timer::time("trace")] fn chunk_tracked_files( tracked_files: Vec<ExpandedManifestEntry>, ) -> Vec<(&HgPath, Vec<ExpandedManifestEntry>)> { let files_count = tracked_files.len(); let mut chunks = Vec::with_capacity(files_count / FILES_PER_DIRECTORY); let mut current_chunk = Vec::with_capacity(FILES_PER_DIRECTORY); let mut last_directory = tracked_files[0].0.parent(); for file_info in tracked_files { let current_directory = file_info.0.parent(); let different_directory = current_directory != last_directory; if different_directory { chunks.push((last_directory, current_chunk)); current_chunk = Vec::with_capacity(FILES_PER_DIRECTORY); } current_chunk.push(file_info); last_directory = current_directory; } chunks.push((last_directory, current_chunk)); chunks } #[logging_timer::time("trace")] #[allow(clippy::too_many_arguments)] fn create_working_copy<'a: 'b, 'b>( chunks: Vec<(&HgPath, Vec<ExpandedManifestEntry<'a>>)>, working_directory_path: &Path, store_vfs: &VfsImpl, options: RevlogOpenOptions, files_sender: Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>, error_sender: Sender<HgError>, progress: &dyn Progress, workers: Option<usize>, ) { let auditor = PathAuditor::new(working_directory_path); let work_closure = |(dir_path, chunk)| -> Result<(), HgError> { if let Err(e) = working_copy_worker( dir_path, chunk, working_directory_path, store_vfs, options, &files_sender, progress, &auditor, ) { error_sender .send(e) .expect("channel should not be disconnected") } Ok(()) }; if let Some(workers) = workers { if workers > 1 { // Work in parallel, potentially restricting the number of threads match rayon::ThreadPoolBuilder::new().num_threads(workers).build() { Err(error) => error_sender .send(HgError::abort( error.to_string(), exit_codes::ABORT, None, )) .expect("channel should not be disconnected"), Ok(pool) => { log::trace!("restricting update to {} threads", workers); pool.install(|| { let _ = chunks.into_par_iter().try_for_each(work_closure); }); } } } else { // Work sequentially, don't even invoke rayon let _ = chunks.into_iter().try_for_each(work_closure); } } else { // Work in parallel by default in the global threadpool let _ = cap_default_rayon_threads(); let _ = chunks.into_par_iter().try_for_each(work_closure); } } /// Represents a work unit for a single thread, responsible for this set of /// files and restoring them to the working copy. #[allow(clippy::too_many_arguments)] fn working_copy_worker<'a: 'b, 'b>( dir_path: &HgPath, chunk: Vec<ExpandedManifestEntry<'a>>, working_directory_path: &Path, store_vfs: &VfsImpl, options: RevlogOpenOptions, files_sender: &Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>, progress: &dyn Progress, auditor: &PathAuditor, ) -> Result<(), HgError> { let dir_path = hg_path_to_path_buf(dir_path).expect("invalid path in manifest"); let dir_path = working_directory_path.join(dir_path); std::fs::create_dir_all(&dir_path).when_writing_file(&dir_path)?; if INTERRUPT_RECEIVED.load(Ordering::Relaxed) { // Stop working, the user has requested that we stop return Err(HgError::InterruptReceived); } for (file, file_node, flags) in chunk { auditor.audit_path(file)?; let flags = flags.map(|f| f.into()); let path = working_directory_path.join(get_path_from_bytes(file.as_bytes())); // Treemanifest is not supported assert!(flags != Some(b't')); let filelog = Filelog::open_vfs(store_vfs, file, options)?; let filelog_revision_data = &filelog .data_for_node(file_node) .map_err(handle_revlog_error)?; let file_data = filelog_revision_data.file_data()?; if flags == Some(b'l') { let target = get_path_from_bytes(file_data); if let Err(e) = std::os::unix::fs::symlink(target, &path) { // If the path already exists either: // - another process created this file while ignoring the // lock => error // - our check for the fast path is incorrect => error // - this is a malicious repo/bundle and this is symlink that // tries to write things where it shouldn't be able to. match e.kind() { std::io::ErrorKind::AlreadyExists => { let metadata = std::fs::symlink_metadata(&path) .when_reading_file(&path)?; if metadata.is_dir() { return Err(HgError::Path( HgPathError::TraversesSymbolicLink { // Technically it should be one of the // children, but good enough path: file .join(HgPath::new(b"*")) .to_owned(), symlink: file.to_owned(), }, )); } return Err(e).when_writing_file(&path); } _ => return Err(e).when_writing_file(&path), } } } else { let mut f = std::fs::File::create(&path).when_writing_file(&path)?; f.write_all(file_data).when_writing_file(&path)?; } if flags == Some(b'x') { std::fs::set_permissions(&path, Permissions::from_mode(0o755)) .when_writing_file(&path)?; } let metadata = std::fs::symlink_metadata(&path).when_reading_file(&path)?; let mode = metadata.mode(); files_sender .send(( file, mode, file_data.len(), TruncatedTimestamp::for_mtime_of(&metadata) .when_reading_file(&path)?, )) .expect("channel should not be closed"); progress.increment(1, None); } Ok(()) } #[logging_timer::time("trace")] fn update_dirstate( repo: &Repo, files_receiver: Receiver<(&HgPath, u32, usize, TruncatedTimestamp)>, ) -> Result<usize, HgError> { let mut dirstate = repo .dirstate_map_mut() .map_err(|e| HgError::abort(e.to_string(), exit_codes::ABORT, None))?; // (see the comments in `filter_ambiguous_files` in `merge.py` for more) // It turns out that (on Linux at least) the filesystem resolution time // for most filesystems is based on the HZ kernel config. Their internal // clocks do return nanoseconds if the hardware clock is precise enough, // which should be the case on most recent computers but are only updated // every few milliseconds at best (every "jiffy"). // // We are still not concerned with fixing the race with other // processes that might modify the working copy right after it was created // within the same tick, because it is impossible to catch. // However, we might as well not race with operations that could run right // after this one, especially other Mercurial operations that could be // waiting for the wlock to change file contents and the dirstate. // // Thus: wait until the filesystem clock has ticked to filter ambiguous // entries and write the dirstate, but only for dirstate-v2, since v1 only // has second-level granularity and waiting for a whole second is too much // of a penalty in the general case. // Although we're assuming that people running dirstate-v2 on Linux // don't have a second-granularity FS (with the exclusion of NFS), users // can be surprising, and at some point in the future dirstate-v2 will // become the default. To that end, we limit the wait time to 100ms and // fall back to the filter method in case of a timeout. // // +------------+------+--------------+ // | version | wait | filter level | // +------------+------+--------------+ // | V1 | No | Second | // | V2 | Yes | Nanosecond | // | V2-slow-fs | No | Second | // +------------+------+--------------+ let dirstate_v2 = repo.use_dirstate_v2(); // Let's ignore NFS right off the bat let mut fast_enough_fs = !is_on_nfs_mount(repo.working_directory_path()); let fs_time_now = if dirstate_v2 && fast_enough_fs { match wait_until_fs_tick(repo.working_directory_path()) { None => None, Some(Ok(time)) => Some(time), Some(Err(time)) => { fast_enough_fs = false; Some(time) } } } else { filesystem_now(repo.working_directory_path()) .ok() .map(TruncatedTimestamp::from) }; let mut total = 0; for (filename, mode, size, mtime) in files_receiver.into_iter() { total += 1; // When using dirstate-v2 on a filesystem with reasonable performance // this is basically always true unless you get a mtime from the // far future. let has_meaningful_mtime = if let Some(fs_time) = fs_time_now { mtime.for_reliable_mtime_of_self(&fs_time).is_some_and(|t| { // Dirstate-v1 only has second-level information !t.second_ambiguous || dirstate_v2 && fast_enough_fs }) } else { // We somehow failed to write to the filesystem, so don't store // the cache information. false }; let reset = DirstateEntryReset { filename, wc_tracked: true, p1_tracked: true, p2_info: false, has_meaningful_mtime, parent_file_data_opt: Some(ParentFileData { mode_size: Some(( mode, size.try_into().expect("invalid file size in manifest"), )), mtime: Some(mtime), }), from_empty: true, }; dirstate.reset_state(reset).map_err(|e| { HgError::abort(e.to_string(), exit_codes::ABORT, None) })?; } Ok(total) } /// Wait until the next update from the filesystem time by writing in a loop /// a new temporary file inside the working directory and checking if its time /// differs from the first one observed. /// /// Returns `None` if we are unable to get the filesystem time, /// `Some(Err(timestamp))` if we've timed out waiting for the filesystem clock /// to tick, and `Some(Ok(timestamp))` if we've waited successfully. /// /// On Linux, your average tick is going to be a "jiffy", or 1/HZ. /// HZ is your kernel's tick rate (if it has one configured) and the value /// is the one returned by `grep 'CONFIG_HZ=' /boot/config-$(uname -r)`, /// again assuming a normal setup. /// /// In my case (Alphare) at the time of writing, I get `CONFIG_HZ=250`, /// which equates to 4ms. /// /// This might change with a series that could make it to Linux 6.12: /// https://lore.kernel.org/all/20241002-mgtime-v10-8-d1c4717f5284@kernel.org fn wait_until_fs_tick( working_directory_path: &Path, ) -> Option<Result<TruncatedTimestamp, TruncatedTimestamp>> { let start = std::time::Instant::now(); let old_fs_time = filesystem_now(working_directory_path).ok()?; let mut fs_time = filesystem_now(working_directory_path).ok()?; const FS_TICK_WAIT_TIMEOUT: Duration = Duration::from_millis(100); while fs_time == old_fs_time { if std::time::Instant::now() - start > FS_TICK_WAIT_TIMEOUT { log::trace!( "timed out waiting for the fs clock to tick after {:?}", FS_TICK_WAIT_TIMEOUT ); return Some(Err(TruncatedTimestamp::from(old_fs_time))); } fs_time = filesystem_now(working_directory_path).ok()?; } log::trace!( "waited for {:?} before writing the dirstate", fs_time.duration_since(old_fs_time) ); Some(Ok(TruncatedTimestamp::from(fs_time))) } #[cfg(test)] mod test { use super::*; use pretty_assertions::assert_eq; #[test] fn test_chunk_tracked_files() { fn chunk(v: Vec<&'static str>) -> Vec<ExpandedManifestEntry> { v.into_iter() .map(|f| (HgPath::new(f.as_bytes()), NULL_NODE, None)) .collect() } let p = HgPath::new; let files = chunk(vec!["a"]); let expected = vec![(p(""), chunk(vec!["a"]))]; assert_eq!(chunk_tracked_files(files), expected); let files = chunk(vec!["a", "b", "c"]); let expected = vec![(p(""), chunk(vec!["a", "b", "c"]))]; assert_eq!(chunk_tracked_files(files), expected); let files = chunk(vec![ "dir/a-new", "dir/a/mut", "dir/a/mut-mut", "dir/albert", "dir/b", "dir/subdir/c", "dir/subdir/d", "file", ]); let expected = vec![ (p("dir"), chunk(vec!["dir/a-new"])), (p("dir/a"), chunk(vec!["dir/a/mut", "dir/a/mut-mut"])), (p("dir"), chunk(vec!["dir/albert", "dir/b"])), (p("dir/subdir"), chunk(vec!["dir/subdir/c", "dir/subdir/d"])), (p(""), chunk(vec!["file"])), ]; assert_eq!(chunk_tracked_files(files), expected); // Doesn't get split let large_dir = vec![ "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", ]; let files = chunk(large_dir.clone()); let expected = vec![(p(""), chunk(large_dir))]; assert_eq!(chunk_tracked_files(files), expected); } }