view rust/hg-core/src/discovery.rs @ 45095:8e04607023e5

procutil: ensure that procutil.std{out,err}.write() writes all bytes Python 3 offers different kind of streams and it’s not guaranteed for all of them that calling write() writes all bytes. When Python is started in unbuffered mode, sys.std{out,err}.buffer are instances of io.FileIO, whose write() can write less bytes for platform-specific reasons (e.g. Linux has a 0x7ffff000 bytes maximum and could write less if interrupted by a signal; when writing to Windows consoles, it’s limited to 32767 bytes to avoid the "not enough space" error). This can lead to silent loss of data, both when using sys.std{out,err}.buffer (which may in fact not be a buffered stream) and when using the text streams sys.std{out,err} (I’ve created a CPython bug report for that: https://bugs.python.org/issue41221). Python may fix the problem at some point. For now, we implement our own wrapper for procutil.std{out,err} that calls the raw stream’s write() method until all bytes have been written. We don’t use sys.std{out,err} for larger writes, so I think it’s not worth the effort to patch them.
author Manuel Jacob <me@manueljacob.de>
date Fri, 10 Jul 2020 12:27:58 +0200
parents 26114bd6ec60
children e98fd81bb151
line wrap: on
line source

// discovery.rs
//
// Copyright 2019 Georges Racinet <georges.racinet@octobus.net>
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2 or any later version.

//! Discovery operations
//!
//! This is a Rust counterpart to the `partialdiscovery` class of
//! `mercurial.setdiscovery`

use super::{Graph, GraphError, Revision, NULL_REVISION};
use crate::{ancestors::MissingAncestors, dagops, FastHashMap};
use rand::seq::SliceRandom;
use rand::{thread_rng, RngCore, SeedableRng};
use std::cmp::{max, min};
use std::collections::{HashSet, VecDeque};

type Rng = rand_pcg::Pcg32;
type Seed = [u8; 16];

pub struct PartialDiscovery<G: Graph + Clone> {
    target_heads: Option<Vec<Revision>>,
    graph: G, // plays the role of self._repo
    common: MissingAncestors<G>,
    undecided: Option<HashSet<Revision>>,
    children_cache: Option<FastHashMap<Revision, Vec<Revision>>>,
    missing: HashSet<Revision>,
    rng: Rng,
    respect_size: bool,
    randomize: bool,
}

pub struct DiscoveryStats {
    pub undecided: Option<usize>,
}

/// Update an existing sample to match the expected size
///
/// The sample is updated with revisions exponentially distant from each
/// element of `heads`.
///
/// If a target size is specified, the sampling will stop once this size is
/// reached. Otherwise sampling will happen until roots of the <revs> set are
/// reached.
///
/// - `revs`: set of revs we want to discover (if None, `assume` the whole dag
///   represented by `parentfn`
/// - `heads`: set of DAG head revs
/// - `sample`: a sample to update
/// - `parentfn`: a callable to resolve parents for a revision
/// - `quicksamplesize`: optional target size of the sample
fn update_sample<I>(
    revs: Option<&HashSet<Revision>>,
    heads: impl IntoIterator<Item = Revision>,
    sample: &mut HashSet<Revision>,
    parentsfn: impl Fn(Revision) -> Result<I, GraphError>,
    quicksamplesize: Option<usize>,
) -> Result<(), GraphError>
where
    I: Iterator<Item = Revision>,
{
    let mut distances: FastHashMap<Revision, u32> = FastHashMap::default();
    let mut visit: VecDeque<Revision> = heads.into_iter().collect();
    let mut factor: u32 = 1;
    let mut seen: HashSet<Revision> = HashSet::new();
    while let Some(current) = visit.pop_front() {
        if !seen.insert(current) {
            continue;
        }

        let d = *distances.entry(current).or_insert(1);
        if d > factor {
            factor *= 2;
        }
        if d == factor {
            sample.insert(current);
            if let Some(sz) = quicksamplesize {
                if sample.len() >= sz {
                    return Ok(());
                }
            }
        }
        for p in parentsfn(current)? {
            if let Some(revs) = revs {
                if !revs.contains(&p) {
                    continue;
                }
            }
            distances.entry(p).or_insert(d + 1);
            visit.push_back(p);
        }
    }
    Ok(())
}

struct ParentsIterator {
    parents: [Revision; 2],
    cur: usize,
}

impl ParentsIterator {
    fn graph_parents(
        graph: &impl Graph,
        r: Revision,
    ) -> Result<ParentsIterator, GraphError> {
        Ok(ParentsIterator {
            parents: graph.parents(r)?,
            cur: 0,
        })
    }
}

impl Iterator for ParentsIterator {
    type Item = Revision;

    fn next(&mut self) -> Option<Revision> {
        if self.cur > 1 {
            return None;
        }
        let rev = self.parents[self.cur];
        self.cur += 1;
        if rev == NULL_REVISION {
            return self.next();
        }
        Some(rev)
    }
}

impl<G: Graph + Clone> PartialDiscovery<G> {
    /// Create a PartialDiscovery object, with the intent
    /// of comparing our `::<target_heads>` revset to the contents of another
    /// repo.
    ///
    /// For now `target_heads` is passed as a vector, and will be used
    /// at the first call to `ensure_undecided()`.
    ///
    /// If we want to make the signature more flexible,
    /// we'll have to make it a type argument of `PartialDiscovery` or a trait
    /// object since we'll keep it in the meanwhile
    ///
    /// The `respect_size` boolean controls how the sampling methods
    /// will interpret the size argument requested by the caller. If it's
    /// `false`, they are allowed to produce a sample whose size is more
    /// appropriate to the situation (typically bigger).
    ///
    /// The `randomize` boolean affects sampling, and specifically how
    /// limiting or last-minute expanding is been done:
    ///
    /// If `true`, both will perform random picking from `self.undecided`.
    /// This is currently the best for actual discoveries.
    ///
    /// If `false`, a reproductible picking strategy is performed. This is
    /// useful for integration tests.
    pub fn new(
        graph: G,
        target_heads: Vec<Revision>,
        respect_size: bool,
        randomize: bool,
    ) -> Self {
        let mut seed = [0; 16];
        if randomize {
            thread_rng().fill_bytes(&mut seed);
        }
        Self::new_with_seed(graph, target_heads, seed, respect_size, randomize)
    }

    pub fn new_with_seed(
        graph: G,
        target_heads: Vec<Revision>,
        seed: Seed,
        respect_size: bool,
        randomize: bool,
    ) -> Self {
        PartialDiscovery {
            undecided: None,
            children_cache: None,
            target_heads: Some(target_heads),
            graph: graph.clone(),
            common: MissingAncestors::new(graph, vec![]),
            missing: HashSet::new(),
            rng: Rng::from_seed(seed),
            respect_size,
            randomize,
        }
    }

    /// Extract at most `size` random elements from sample and return them
    /// as a vector
    fn limit_sample(
        &mut self,
        mut sample: Vec<Revision>,
        size: usize,
    ) -> Vec<Revision> {
        if !self.randomize {
            sample.sort();
            sample.truncate(size);
            return sample;
        }
        let sample_len = sample.len();
        if sample_len <= size {
            return sample;
        }
        let rng = &mut self.rng;
        let dropped_size = sample_len - size;
        let limited_slice = if size < dropped_size {
            sample.partial_shuffle(rng, size).0
        } else {
            sample.partial_shuffle(rng, dropped_size).1
        };
        limited_slice.to_owned()
    }

    /// Register revisions known as being common
    pub fn add_common_revisions(
        &mut self,
        common: impl IntoIterator<Item = Revision>,
    ) -> Result<(), GraphError> {
        let before_len = self.common.get_bases().len();
        self.common.add_bases(common);
        if self.common.get_bases().len() == before_len {
            return Ok(());
        }
        if let Some(ref mut undecided) = self.undecided {
            self.common.remove_ancestors_from(undecided)?;
        }
        Ok(())
    }

    /// Register revisions known as being missing
    ///
    /// # Performance note
    ///
    /// Except in the most trivial case, the first call of this method has
    /// the side effect of computing `self.undecided` set for the first time,
    /// and the related caches it might need for efficiency of its internal
    /// computation. This is typically faster if more information is
    /// available in `self.common`. Therefore, for good performance, the
    /// caller should avoid calling this too early.
    pub fn add_missing_revisions(
        &mut self,
        missing: impl IntoIterator<Item = Revision>,
    ) -> Result<(), GraphError> {
        let mut tovisit: VecDeque<Revision> = missing.into_iter().collect();
        if tovisit.is_empty() {
            return Ok(());
        }
        self.ensure_children_cache()?;
        self.ensure_undecided()?; // for safety of possible future refactors
        let children = self.children_cache.as_ref().unwrap();
        let mut seen: HashSet<Revision> = HashSet::new();
        let undecided_mut = self.undecided.as_mut().unwrap();
        while let Some(rev) = tovisit.pop_front() {
            if !self.missing.insert(rev) {
                // either it's known to be missing from a previous
                // invocation, and there's no need to iterate on its
                // children (we now they are all missing)
                // or it's from a previous iteration of this loop
                // and its children have already been queued
                continue;
            }
            undecided_mut.remove(&rev);
            match children.get(&rev) {
                None => {
                    continue;
                }
                Some(this_children) => {
                    for child in this_children.iter().cloned() {
                        if seen.insert(child) {
                            tovisit.push_back(child);
                        }
                    }
                }
            }
        }
        Ok(())
    }

    /// Do we have any information about the peer?
    pub fn has_info(&self) -> bool {
        self.common.has_bases()
    }

    /// Did we acquire full knowledge of our Revisions that the peer has?
    pub fn is_complete(&self) -> bool {
        self.undecided.as_ref().map_or(false, HashSet::is_empty)
    }

    /// Return the heads of the currently known common set of revisions.
    ///
    /// If the discovery process is not complete (see `is_complete()`), the
    /// caller must be aware that this is an intermediate state.
    ///
    /// On the other hand, if it is complete, then this is currently
    /// the only way to retrieve the end results of the discovery process.
    ///
    /// We may introduce in the future an `into_common_heads` call that
    /// would be more appropriate for normal Rust callers, dropping `self`
    /// if it is complete.
    pub fn common_heads(&self) -> Result<HashSet<Revision>, GraphError> {
        self.common.bases_heads()
    }

    /// Force first computation of `self.undecided`
    ///
    /// After this, `self.undecided.as_ref()` and `.as_mut()` can be
    /// unwrapped to get workable immutable or mutable references without
    /// any panic.
    ///
    /// This is an imperative call instead of an access with added lazyness
    /// to reduce easily the scope of mutable borrow for the caller,
    /// compared to undecided(&'a mut self) -> &'a… that would keep it
    /// as long as the resulting immutable one.
    fn ensure_undecided(&mut self) -> Result<(), GraphError> {
        if self.undecided.is_some() {
            return Ok(());
        }
        let tgt = self.target_heads.take().unwrap();
        self.undecided =
            Some(self.common.missing_ancestors(tgt)?.into_iter().collect());
        Ok(())
    }

    fn ensure_children_cache(&mut self) -> Result<(), GraphError> {
        if self.children_cache.is_some() {
            return Ok(());
        }
        self.ensure_undecided()?;

        let mut children: FastHashMap<Revision, Vec<Revision>> =
            FastHashMap::default();
        for &rev in self.undecided.as_ref().unwrap() {
            for p in ParentsIterator::graph_parents(&self.graph, rev)? {
                children.entry(p).or_insert_with(Vec::new).push(rev);
            }
        }
        self.children_cache = Some(children);
        Ok(())
    }

    /// Provide statistics about the current state of the discovery process
    pub fn stats(&self) -> DiscoveryStats {
        DiscoveryStats {
            undecided: self.undecided.as_ref().map(HashSet::len),
        }
    }

    pub fn take_quick_sample(
        &mut self,
        headrevs: impl IntoIterator<Item = Revision>,
        size: usize,
    ) -> Result<Vec<Revision>, GraphError> {
        self.ensure_undecided()?;
        let mut sample = {
            let undecided = self.undecided.as_ref().unwrap();
            if undecided.len() <= size {
                return Ok(undecided.iter().cloned().collect());
            }
            dagops::heads(&self.graph, undecided.iter())?
        };
        if sample.len() >= size {
            return Ok(self.limit_sample(sample.into_iter().collect(), size));
        }
        update_sample(
            None,
            headrevs,
            &mut sample,
            |r| ParentsIterator::graph_parents(&self.graph, r),
            Some(size),
        )?;
        Ok(sample.into_iter().collect())
    }

    /// Extract a sample from `self.undecided`, going from its heads and roots.
    ///
    /// The `size` parameter is used to avoid useless computations if
    /// it turns out to be bigger than the whole set of undecided Revisions.
    ///
    /// The sample is taken by using `update_sample` from the heads, then
    /// from the roots, working on the reverse DAG,
    /// expressed by `self.children_cache`.
    ///
    /// No effort is being made to complete or limit the sample to `size`
    /// but this method returns another interesting size that it derives
    /// from its knowledge of the structure of the various sets, leaving
    /// to the caller the decision to use it or not.
    fn bidirectional_sample(
        &mut self,
        size: usize,
    ) -> Result<(HashSet<Revision>, usize), GraphError> {
        self.ensure_undecided()?;
        {
            // we don't want to compute children_cache before this
            // but doing it after extracting self.undecided takes a mutable
            // ref to self while a shareable one is still active.
            let undecided = self.undecided.as_ref().unwrap();
            if undecided.len() <= size {
                return Ok((undecided.clone(), size));
            }
        }

        self.ensure_children_cache()?;
        let revs = self.undecided.as_ref().unwrap();
        let mut sample: HashSet<Revision> = revs.clone();

        // it's possible that leveraging the children cache would be more
        // efficient here
        dagops::retain_heads(&self.graph, &mut sample)?;
        let revsheads = sample.clone(); // was again heads(revs) in python

        // update from heads
        update_sample(
            Some(revs),
            revsheads.iter().cloned(),
            &mut sample,
            |r| ParentsIterator::graph_parents(&self.graph, r),
            None,
        )?;

        // update from roots
        let revroots: HashSet<Revision> =
            dagops::roots(&self.graph, revs)?.into_iter().collect();
        let prescribed_size = max(size, min(revroots.len(), revsheads.len()));

        let children = self.children_cache.as_ref().unwrap();
        let empty_vec: Vec<Revision> = Vec::new();
        update_sample(
            Some(revs),
            revroots,
            &mut sample,
            |r| Ok(children.get(&r).unwrap_or(&empty_vec).iter().cloned()),
            None,
        )?;
        Ok((sample, prescribed_size))
    }

    /// Fill up sample up to the wished size with random undecided Revisions.
    ///
    /// This is intended to be used as a last resort completion if the
    /// regular sampling algorithm returns too few elements.
    fn random_complete_sample(
        &mut self,
        sample: &mut Vec<Revision>,
        size: usize,
    ) {
        let sample_len = sample.len();
        if size <= sample_len {
            return;
        }
        let take_from: Vec<Revision> = self
            .undecided
            .as_ref()
            .unwrap()
            .iter()
            .filter(|&r| !sample.contains(r))
            .cloned()
            .collect();
        sample.extend(self.limit_sample(take_from, size - sample_len));
    }

    pub fn take_full_sample(
        &mut self,
        size: usize,
    ) -> Result<Vec<Revision>, GraphError> {
        let (sample_set, prescribed_size) = self.bidirectional_sample(size)?;
        let size = if self.respect_size {
            size
        } else {
            prescribed_size
        };
        let mut sample =
            self.limit_sample(sample_set.into_iter().collect(), size);
        self.random_complete_sample(&mut sample, size);
        Ok(sample)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::SampleGraph;

    /// A PartialDiscovery as for pushing all the heads of `SampleGraph`
    ///
    /// To avoid actual randomness in these tests, we give it a fixed
    /// random seed, but by default we'll test the random version.
    fn full_disco() -> PartialDiscovery<SampleGraph> {
        PartialDiscovery::new_with_seed(
            SampleGraph,
            vec![10, 11, 12, 13],
            [0; 16],
            true,
            true,
        )
    }

    /// A PartialDiscovery as for pushing the 12 head of `SampleGraph`
    ///
    /// To avoid actual randomness in tests, we give it a fixed random seed.
    fn disco12() -> PartialDiscovery<SampleGraph> {
        PartialDiscovery::new_with_seed(
            SampleGraph,
            vec![12],
            [0; 16],
            true,
            true,
        )
    }

    fn sorted_undecided(
        disco: &PartialDiscovery<SampleGraph>,
    ) -> Vec<Revision> {
        let mut as_vec: Vec<Revision> =
            disco.undecided.as_ref().unwrap().iter().cloned().collect();
        as_vec.sort();
        as_vec
    }

    fn sorted_missing(disco: &PartialDiscovery<SampleGraph>) -> Vec<Revision> {
        let mut as_vec: Vec<Revision> =
            disco.missing.iter().cloned().collect();
        as_vec.sort();
        as_vec
    }

    fn sorted_common_heads(
        disco: &PartialDiscovery<SampleGraph>,
    ) -> Result<Vec<Revision>, GraphError> {
        let mut as_vec: Vec<Revision> =
            disco.common_heads()?.iter().cloned().collect();
        as_vec.sort();
        Ok(as_vec)
    }

    #[test]
    fn test_add_common_get_undecided() -> Result<(), GraphError> {
        let mut disco = full_disco();
        assert_eq!(disco.undecided, None);
        assert!(!disco.has_info());
        assert_eq!(disco.stats().undecided, None);

        disco.add_common_revisions(vec![11, 12])?;
        assert!(disco.has_info());
        assert!(!disco.is_complete());
        assert!(disco.missing.is_empty());

        // add_common_revisions did not trigger a premature computation
        // of `undecided`, let's check that and ask for them
        assert_eq!(disco.undecided, None);
        disco.ensure_undecided()?;
        assert_eq!(sorted_undecided(&disco), vec![5, 8, 10, 13]);
        assert_eq!(disco.stats().undecided, Some(4));
        Ok(())
    }

    /// in this test, we pretend that our peer misses exactly (8+10)::
    /// and we're comparing all our repo to it (as in a bare push)
    #[test]
    fn test_discovery() -> Result<(), GraphError> {
        let mut disco = full_disco();
        disco.add_common_revisions(vec![11, 12])?;
        disco.add_missing_revisions(vec![8, 10])?;
        assert_eq!(sorted_undecided(&disco), vec![5]);
        assert_eq!(sorted_missing(&disco), vec![8, 10, 13]);
        assert!(!disco.is_complete());

        disco.add_common_revisions(vec![5])?;
        assert_eq!(sorted_undecided(&disco), vec![]);
        assert_eq!(sorted_missing(&disco), vec![8, 10, 13]);
        assert!(disco.is_complete());
        assert_eq!(sorted_common_heads(&disco)?, vec![5, 11, 12]);
        Ok(())
    }

    #[test]
    fn test_add_missing_early_continue() -> Result<(), GraphError> {
        eprintln!("test_add_missing_early_stop");
        let mut disco = full_disco();
        disco.add_common_revisions(vec![13, 3, 4])?;
        disco.ensure_children_cache()?;
        // 12 is grand-child of 6 through 9
        // passing them in this order maximizes the chances of the
        // early continue to do the wrong thing
        disco.add_missing_revisions(vec![6, 9, 12])?;
        assert_eq!(sorted_undecided(&disco), vec![5, 7, 10, 11]);
        assert_eq!(sorted_missing(&disco), vec![6, 9, 12]);
        assert!(!disco.is_complete());
        Ok(())
    }

    #[test]
    fn test_limit_sample_no_need_to() {
        let sample = vec![1, 2, 3, 4];
        assert_eq!(full_disco().limit_sample(sample, 10), vec![1, 2, 3, 4]);
    }

    #[test]
    fn test_limit_sample_less_than_half() {
        assert_eq!(full_disco().limit_sample((1..6).collect(), 2), vec![2, 5]);
    }

    #[test]
    fn test_limit_sample_more_than_half() {
        assert_eq!(full_disco().limit_sample((1..4).collect(), 2), vec![1, 2]);
    }

    #[test]
    fn test_limit_sample_no_random() {
        let mut disco = full_disco();
        disco.randomize = false;
        assert_eq!(
            disco.limit_sample(vec![1, 8, 13, 5, 7, 3], 4),
            vec![1, 3, 5, 7]
        );
    }

    #[test]
    fn test_quick_sample_enough_undecided_heads() -> Result<(), GraphError> {
        let mut disco = full_disco();
        disco.undecided = Some((1..=13).collect());

        let mut sample_vec = disco.take_quick_sample(vec![], 4)?;
        sample_vec.sort();
        assert_eq!(sample_vec, vec![10, 11, 12, 13]);
        Ok(())
    }

    #[test]
    fn test_quick_sample_climbing_from_12() -> Result<(), GraphError> {
        let mut disco = disco12();
        disco.ensure_undecided()?;

        let mut sample_vec = disco.take_quick_sample(vec![12], 4)?;
        sample_vec.sort();
        // r12's only parent is r9, whose unique grand-parent through the
        // diamond shape is r4. This ends there because the distance from r4
        // to the root is only 3.
        assert_eq!(sample_vec, vec![4, 9, 12]);
        Ok(())
    }

    #[test]
    fn test_children_cache() -> Result<(), GraphError> {
        let mut disco = full_disco();
        disco.ensure_children_cache()?;

        let cache = disco.children_cache.unwrap();
        assert_eq!(cache.get(&2).cloned(), Some(vec![4]));
        assert_eq!(cache.get(&10).cloned(), None);

        let mut children_4 = cache.get(&4).cloned().unwrap();
        children_4.sort();
        assert_eq!(children_4, vec![5, 6, 7]);

        let mut children_7 = cache.get(&7).cloned().unwrap();
        children_7.sort();
        assert_eq!(children_7, vec![9, 11]);

        Ok(())
    }

    #[test]
    fn test_complete_sample() {
        let mut disco = full_disco();
        let undecided: HashSet<Revision> =
            [4, 7, 9, 2, 3].iter().cloned().collect();
        disco.undecided = Some(undecided);

        let mut sample = vec![0];
        disco.random_complete_sample(&mut sample, 3);
        assert_eq!(sample.len(), 3);

        let mut sample = vec![2, 4, 7];
        disco.random_complete_sample(&mut sample, 1);
        assert_eq!(sample.len(), 3);
    }

    #[test]
    fn test_bidirectional_sample() -> Result<(), GraphError> {
        let mut disco = full_disco();
        disco.undecided = Some((0..=13).into_iter().collect());

        let (sample_set, size) = disco.bidirectional_sample(7)?;
        assert_eq!(size, 7);
        let mut sample: Vec<Revision> = sample_set.into_iter().collect();
        sample.sort();
        // our DAG is a bit too small for the results to be really interesting
        // at least it shows that
        // - we went both ways
        // - we didn't take all Revisions (6 is not in the sample)
        assert_eq!(sample, vec![0, 1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13]);
        Ok(())
    }
}