view rust/hg-core/src/revlog/inner_revlog.rs @ 52298:645d247d4c75

rust-vfs: rename `open` to `open_write` and `open_read` to `open` `open` being read *and* write is surprising because it differs from the Rust stdlib where `std::fs::File::open` is read-only by default. More importantly, writing is more dangerous than reading, so let's make it more explicit.
author Raphaël Gomès <rgomes@octobus.net>
date Tue, 29 Oct 2024 12:03:55 +0100
parents f90796d33aa0
children 4d0c0c255425
line wrap: on
line source

//! A layer of lower-level revlog functionality to encapsulate most of the
//! IO work and expensive operations.
use std::{
    borrow::Cow,
    cell::RefCell,
    io::{ErrorKind, Seek, SeekFrom, Write},
    ops::Deref,
    path::PathBuf,
    sync::{Arc, Mutex},
};

use schnellru::{ByMemoryUsage, LruMap};
use sha1::{Digest, Sha1};

use crate::{
    errors::{HgError, IoResultExt},
    exit_codes,
    transaction::Transaction,
    vfs::Vfs,
};

use super::{
    compression::{
        uncompressed_zstd_data, CompressionConfig, Compressor, NoneCompressor,
        ZlibCompressor, ZstdCompressor, ZLIB_BYTE, ZSTD_BYTE,
    },
    file_io::{DelayedBuffer, FileHandle, RandomAccessFile, WriteHandles},
    index::{Index, IndexHeader, INDEX_ENTRY_SIZE},
    node::{NODE_BYTES_LENGTH, NULL_NODE},
    options::{RevlogDataConfig, RevlogDeltaConfig, RevlogFeatureConfig},
    BaseRevision, Node, Revision, RevlogEntry, RevlogError, RevlogIndex,
    UncheckedRevision, NULL_REVISION, NULL_REVLOG_ENTRY_FLAGS,
};

/// Matches the `_InnerRevlog` class in the Python code, as an arbitrary
/// boundary to incrementally rewrite higher-level revlog functionality in
/// Rust.
pub struct InnerRevlog {
    /// When index and data are not interleaved: bytes of the revlog index.
    /// When index and data are interleaved (inline revlog): bytes of the
    /// revlog index and data.
    pub index: Index,
    /// The store vfs that is used to interact with the filesystem
    vfs: Box<dyn Vfs>,
    /// The index file path, relative to the vfs root
    pub index_file: PathBuf,
    /// The data file path, relative to the vfs root (same as `index_file`
    /// if inline)
    data_file: PathBuf,
    /// Data config that applies to this revlog
    data_config: RevlogDataConfig,
    /// Delta config that applies to this revlog
    delta_config: RevlogDeltaConfig,
    /// Feature config that applies to this revlog
    feature_config: RevlogFeatureConfig,
    /// A view into this revlog's data file
    segment_file: RandomAccessFile,
    /// A cache of uncompressed chunks that have previously been restored.
    /// Its eviction policy is defined in [`Self::new`].
    uncompressed_chunk_cache: Option<UncompressedChunkCache>,
    /// Used to keep track of the actual target during diverted writes
    /// for the changelog
    original_index_file: Option<PathBuf>,
    /// Write handles to the index and data files
    /// XXX why duplicate from `index` and `segment_file`?
    writing_handles: Option<WriteHandles>,
    /// See [`DelayedBuffer`].
    delayed_buffer: Option<Arc<Mutex<DelayedBuffer>>>,
    /// Whether this revlog is inline. XXX why duplicate from `index`?
    pub inline: bool,
    /// A cache of the last revision, which is usually accessed multiple
    /// times.
    pub last_revision_cache: Mutex<Option<SingleRevisionCache>>,
}

impl InnerRevlog {
    pub fn new(
        vfs: Box<dyn Vfs>,
        index: Index,
        index_file: PathBuf,
        data_file: PathBuf,
        data_config: RevlogDataConfig,
        delta_config: RevlogDeltaConfig,
        feature_config: RevlogFeatureConfig,
    ) -> Self {
        assert!(index_file.is_relative());
        assert!(data_file.is_relative());
        let segment_file = RandomAccessFile::new(
            dyn_clone::clone_box(&*vfs),
            if index.is_inline() {
                index_file.to_owned()
            } else {
                data_file.to_owned()
            },
        );

        let uncompressed_chunk_cache =
            data_config.uncompressed_cache_factor.map(
                // Arbitrary initial value
                // TODO check if using a hasher specific to integers is useful
                |_factor| RefCell::new(LruMap::with_memory_budget(65536)),
            );

        let inline = index.is_inline();
        Self {
            index,
            vfs,
            index_file,
            data_file,
            data_config,
            delta_config,
            feature_config,
            segment_file,
            uncompressed_chunk_cache,
            original_index_file: None,
            writing_handles: None,
            delayed_buffer: None,
            inline,
            last_revision_cache: Mutex::new(None),
        }
    }

    /// Return number of entries of the revlog index
    pub fn len(&self) -> usize {
        self.index.len()
    }

    /// Return `true` if this revlog has no entries
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Return whether this revlog is inline (mixed index and data)
    pub fn is_inline(&self) -> bool {
        self.inline
    }

    /// Clear all caches from this revlog
    pub fn clear_cache(&mut self) {
        assert!(!self.is_delaying());
        if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
            // We don't clear the allocation here because it's probably faster.
            // We could change our minds later if this ends up being a problem
            // with regards to memory consumption.
            cache.borrow_mut().clear();
        }
    }

    /// Return an entry for the null revision
    pub fn make_null_entry(&self) -> RevlogEntry {
        RevlogEntry {
            revlog: self,
            rev: NULL_REVISION,
            uncompressed_len: 0,
            p1: NULL_REVISION,
            p2: NULL_REVISION,
            flags: NULL_REVLOG_ENTRY_FLAGS,
            hash: NULL_NODE,
        }
    }

    /// Return the [`RevlogEntry`] for a [`Revision`] that is known to exist
    pub fn get_entry(
        &self,
        rev: Revision,
    ) -> Result<RevlogEntry, RevlogError> {
        if rev == NULL_REVISION {
            return Ok(self.make_null_entry());
        }
        let index_entry = self
            .index
            .get_entry(rev)
            .ok_or_else(|| RevlogError::InvalidRevision(rev.to_string()))?;
        let p1 =
            self.index.check_revision(index_entry.p1()).ok_or_else(|| {
                RevlogError::corrupted(format!(
                    "p1 for rev {} is invalid",
                    rev
                ))
            })?;
        let p2 =
            self.index.check_revision(index_entry.p2()).ok_or_else(|| {
                RevlogError::corrupted(format!(
                    "p2 for rev {} is invalid",
                    rev
                ))
            })?;
        let entry = RevlogEntry {
            revlog: self,
            rev,
            uncompressed_len: index_entry.uncompressed_len(),
            p1,
            p2,
            flags: index_entry.flags(),
            hash: *index_entry.hash(),
        };
        Ok(entry)
    }

    /// Return the [`RevlogEntry`] for `rev`. If `rev` fails to check, this
    /// returns a [`RevlogError`].
    pub fn get_entry_for_unchecked_rev(
        &self,
        rev: UncheckedRevision,
    ) -> Result<RevlogEntry, RevlogError> {
        if rev == NULL_REVISION.into() {
            return Ok(self.make_null_entry());
        }
        let rev = self.index.check_revision(rev).ok_or_else(|| {
            RevlogError::corrupted(format!("rev {} is invalid", rev))
        })?;
        self.get_entry(rev)
    }

    /// Is the revlog currently delaying the visibility of written data?
    ///
    /// The delaying mechanism can be either in-memory or written on disk in a
    /// side-file.
    pub fn is_delaying(&self) -> bool {
        self.delayed_buffer.is_some() || self.original_index_file.is_some()
    }

    /// The offset of the data chunk for this revision
    #[inline(always)]
    pub fn start(&self, rev: Revision) -> usize {
        self.index.start(
            rev,
            &self
                .index
                .get_entry(rev)
                .unwrap_or_else(|| self.index.make_null_entry()),
        )
    }

    /// The length of the data chunk for this revision
    /// TODO rename this method and others to more explicit names than the
    /// existing ones that were copied over from Python
    #[inline(always)]
    pub fn length(&self, rev: Revision) -> usize {
        self.index
            .get_entry(rev)
            .unwrap_or_else(|| self.index.make_null_entry())
            .compressed_len() as usize
    }

    /// The end of the data chunk for this revision
    #[inline(always)]
    pub fn end(&self, rev: Revision) -> usize {
        self.start(rev) + self.length(rev)
    }

    /// Return the delta parent of the given revision
    pub fn delta_parent(&self, rev: Revision) -> Revision {
        let base = self
            .index
            .get_entry(rev)
            .unwrap()
            .base_revision_or_base_of_delta_chain();
        if base.0 == rev.0 {
            NULL_REVISION
        } else if self.delta_config.general_delta {
            Revision(base.0)
        } else {
            Revision(rev.0 - 1)
        }
    }

    /// Return whether `rev` points to a snapshot revision (i.e. does not have
    /// a delta base).
    pub fn is_snapshot(&self, rev: Revision) -> Result<bool, RevlogError> {
        if !self.delta_config.sparse_revlog {
            return Ok(self.delta_parent(rev) == NULL_REVISION);
        }
        self.index.is_snapshot_unchecked(rev)
    }

    /// Return the delta chain for `rev` according to this revlog's config.
    /// See [`Index::delta_chain`] for more information.
    pub fn delta_chain(
        &self,
        rev: Revision,
        stop_rev: Option<Revision>,
    ) -> Result<(Vec<Revision>, bool), HgError> {
        self.index.delta_chain(
            rev,
            stop_rev,
            self.delta_config.general_delta.into(),
        )
    }

    fn compressor(&self) -> Result<Box<dyn Compressor>, HgError> {
        // TODO cache the compressor?
        Ok(match self.feature_config.compression_engine {
            CompressionConfig::Zlib { level } => {
                Box::new(ZlibCompressor::new(level))
            }
            CompressionConfig::Zstd { level, threads } => {
                Box::new(ZstdCompressor::new(level, threads))
            }
            CompressionConfig::None => Box::new(NoneCompressor),
        })
    }

    /// Generate a possibly-compressed representation of data.
    /// Returns `None` if the data was not compressed.
    pub fn compress<'data>(
        &self,
        data: &'data [u8],
    ) -> Result<Option<Cow<'data, [u8]>>, RevlogError> {
        if data.is_empty() {
            return Ok(Some(data.into()));
        }
        let res = self.compressor()?.compress(data)?;
        if let Some(compressed) = res {
            // The revlog compressor added the header in the returned data.
            return Ok(Some(compressed.into()));
        }

        if data[0] == b'\0' {
            return Ok(Some(data.into()));
        }
        Ok(None)
    }

    /// Decompress a revlog chunk.
    ///
    /// The chunk is expected to begin with a header identifying the
    /// format type so it can be routed to an appropriate decompressor.
    pub fn decompress<'a>(
        &'a self,
        data: &'a [u8],
    ) -> Result<Cow<[u8]>, RevlogError> {
        if data.is_empty() {
            return Ok(data.into());
        }

        // Revlogs are read much more frequently than they are written and many
        // chunks only take microseconds to decompress, so performance is
        // important here.

        let header = data[0];
        match header {
            // Settings don't matter as they only affect compression
            ZLIB_BYTE => Ok(ZlibCompressor::new(0).decompress(data)?.into()),
            // Settings don't matter as they only affect compression
            ZSTD_BYTE => {
                Ok(ZstdCompressor::new(0, 0).decompress(data)?.into())
            }
            b'\0' => Ok(data.into()),
            b'u' => Ok((&data[1..]).into()),
            other => Err(HgError::UnsupportedFeature(format!(
                "unknown compression header '{}'",
                other
            ))
            .into()),
        }
    }

    /// Obtain a segment of raw data corresponding to a range of revisions.
    ///
    /// Requests for data may be satisfied by a cache.
    ///
    /// Returns a 2-tuple of (offset, data) for the requested range of
    /// revisions. Offset is the integer offset from the beginning of the
    /// revlog and data is a slice of the raw byte data.
    ///
    /// Callers will need to call `self.start(rev)` and `self.length(rev)`
    /// to determine where each revision's data begins and ends.
    pub fn get_segment_for_revs(
        &self,
        start_rev: Revision,
        end_rev: Revision,
    ) -> Result<(usize, Vec<u8>), HgError> {
        let start = if start_rev == NULL_REVISION {
            0
        } else {
            let start_entry = self
                .index
                .get_entry(start_rev)
                .expect("null revision segment");
            self.index.start(start_rev, &start_entry)
        };
        let end_entry = self
            .index
            .get_entry(end_rev)
            .expect("null revision segment");
        let end = self.index.start(end_rev, &end_entry) + self.length(end_rev);

        let length = end - start;

        // XXX should we use mmap instead of doing this for platforms that
        // support madvise/populate?
        Ok((start, self.segment_file.read_chunk(start, length)?))
    }

    /// Return the uncompressed raw data for `rev`
    pub fn chunk_for_rev(&self, rev: Revision) -> Result<Arc<[u8]>, HgError> {
        if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
            if let Some(chunk) = cache.borrow_mut().get(&rev) {
                return Ok(chunk.clone());
            }
        }
        // TODO revlogv2 should check the compression mode
        let data = self.get_segment_for_revs(rev, rev)?.1;
        let uncompressed = self.decompress(&data).map_err(|e| {
            HgError::abort(
                format!("revlog decompression error: {}", e),
                exit_codes::ABORT,
                None,
            )
        })?;
        let uncompressed: Arc<[u8]> = Arc::from(uncompressed.into_owned());
        if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
            cache.borrow_mut().insert(rev, uncompressed.clone());
        }
        Ok(uncompressed)
    }

    /// Execute `func` within a read context for the data file, meaning that
    /// the read handle will be taken and discarded after the operation.
    pub fn with_read<R>(
        &self,
        func: impl FnOnce() -> Result<R, RevlogError>,
    ) -> Result<R, RevlogError> {
        self.enter_reading_context()?;
        let res = func();
        self.exit_reading_context();
        res.map_err(Into::into)
    }

    /// `pub` only for use in hg-cpython
    #[doc(hidden)]
    pub fn enter_reading_context(&self) -> Result<(), HgError> {
        if self.is_empty() {
            // Nothing to be read
            return Ok(());
        }
        if self.delayed_buffer.is_some() && self.is_inline() {
            return Err(HgError::abort(
                "revlog with delayed write should not be inline",
                exit_codes::ABORT,
                None,
            ));
        }
        self.segment_file.get_read_handle()?;
        Ok(())
    }

    /// `pub` only for use in hg-cpython
    #[doc(hidden)]
    pub fn exit_reading_context(&self) {
        self.segment_file.exit_reading_context()
    }

    /// Fill the buffer returned by `get_buffer` with the possibly un-validated
    /// raw text for a revision. It can be already validated if it comes
    /// from the cache.
    pub fn raw_text<G, T>(
        &self,
        rev: Revision,
        get_buffer: G,
    ) -> Result<(), RevlogError>
    where
        G: FnOnce(
            usize,
            &mut dyn FnMut(
                &mut dyn RevisionBuffer<Target = T>,
            ) -> Result<(), RevlogError>,
        ) -> Result<(), RevlogError>,
    {
        let entry = &self.get_entry(rev)?;
        let raw_size = entry.uncompressed_len();
        let mut mutex_guard = self
            .last_revision_cache
            .lock()
            .expect("lock should not be held");
        let cached_rev = if let Some((_node, rev, data)) = &*mutex_guard {
            Some((*rev, data.deref().as_ref()))
        } else {
            None
        };
        if let Some(cache) = &self.uncompressed_chunk_cache {
            let cache = &mut cache.borrow_mut();
            if let Some(size) = raw_size {
                // Dynamically update the uncompressed_chunk_cache size to the
                // largest revision we've seen in this revlog.
                // Do it *before* restoration in case the current revision
                // is the largest.
                let factor = self
                    .data_config
                    .uncompressed_cache_factor
                    .expect("cache should not exist without factor");
                let candidate_size = (size as f64 * factor) as usize;
                let limiter_mut = cache.limiter_mut();
                if candidate_size > limiter_mut.max_memory_usage() {
                    std::mem::swap(
                        limiter_mut,
                        &mut ByMemoryUsage::new(candidate_size),
                    );
                }
            }
        }
        entry.rawdata(cached_rev, get_buffer)?;
        // drop cache to save memory, the caller is expected to update
        // the revision cache after validating the text
        mutex_guard.take();
        Ok(())
    }

    /// Only `pub` for `hg-cpython`.
    /// Obtain decompressed raw data for the specified revisions that are
    /// assumed to be in ascending order.
    ///
    /// Returns a list with decompressed data for each requested revision.
    #[doc(hidden)]
    pub fn chunks(
        &self,
        revs: Vec<Revision>,
        target_size: Option<u64>,
    ) -> Result<Vec<Arc<[u8]>>, RevlogError> {
        if revs.is_empty() {
            return Ok(vec![]);
        }
        let mut fetched_revs = vec![];
        let mut chunks = Vec::with_capacity(revs.len());

        match self.uncompressed_chunk_cache.as_ref() {
            Some(cache) => {
                let mut cache = cache.borrow_mut();
                for rev in revs.iter() {
                    match cache.get(rev) {
                        Some(hit) => chunks.push((*rev, hit.to_owned())),
                        None => fetched_revs.push(*rev),
                    }
                }
            }
            None => fetched_revs = revs,
        }

        let already_cached = chunks.len();

        let sliced_chunks = if fetched_revs.is_empty() {
            vec![]
        } else if !self.data_config.with_sparse_read || self.is_inline() {
            vec![fetched_revs]
        } else {
            self.slice_chunk(&fetched_revs, target_size)?
        };

        self.with_read(|| {
            for revs_chunk in sliced_chunks {
                let first_rev = revs_chunk[0];
                // Skip trailing revisions with empty diff
                let last_rev_idx = revs_chunk
                    .iter()
                    .rposition(|r| self.length(*r) != 0)
                    .unwrap_or(revs_chunk.len() - 1);

                let last_rev = revs_chunk[last_rev_idx];

                let (offset, data) =
                    self.get_segment_for_revs(first_rev, last_rev)?;

                let revs_chunk = &revs_chunk[..=last_rev_idx];

                for rev in revs_chunk {
                    let chunk_start = self.start(*rev);
                    let chunk_length = self.length(*rev);
                    // TODO revlogv2 should check the compression mode
                    let bytes = &data[chunk_start - offset..][..chunk_length];
                    let chunk = if !bytes.is_empty() && bytes[0] == ZSTD_BYTE {
                        // If we're using `zstd`, we want to try a more
                        // specialized decompression
                        let entry = self.index.get_entry(*rev).unwrap();
                        let is_delta = entry
                            .base_revision_or_base_of_delta_chain()
                            != (*rev).into();
                        let uncompressed = uncompressed_zstd_data(
                            bytes,
                            is_delta,
                            entry.uncompressed_len(),
                        )?;
                        Cow::Owned(uncompressed)
                    } else {
                        // Otherwise just fallback to generic decompression.
                        self.decompress(bytes)?
                    };

                    chunks.push((*rev, chunk.into()));
                }
            }
            Ok(())
        })?;

        if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
            let mut cache = cache.borrow_mut();
            for (rev, chunk) in chunks.iter().skip(already_cached) {
                cache.insert(*rev, chunk.clone());
            }
        }
        // Use stable sort here since it's *mostly* sorted
        chunks.sort_by(|a, b| a.0.cmp(&b.0));
        Ok(chunks.into_iter().map(|(_r, chunk)| chunk).collect())
    }

    /// Slice revs to reduce the amount of unrelated data to be read from disk.
    ///
    /// ``revs`` is sliced into groups that should be read in one time.
    /// Assume that revs are sorted.
    ///
    /// The initial chunk is sliced until the overall density
    /// (payload/chunks-span ratio) is above
    /// `revlog.data_config.sr_density_threshold`.
    /// No gap smaller than `revlog.data_config.sr_min_gap_size` is skipped.
    ///
    /// If `target_size` is set, no chunk larger than `target_size`
    /// will be returned.
    /// For consistency with other slicing choices, this limit won't go lower
    /// than `revlog.data_config.sr_min_gap_size`.
    ///
    /// If individual revision chunks are larger than this limit, they will
    /// still be raised individually.
    pub fn slice_chunk(
        &self,
        revs: &[Revision],
        target_size: Option<u64>,
    ) -> Result<Vec<Vec<Revision>>, RevlogError> {
        let target_size =
            target_size.map(|size| size.max(self.data_config.sr_min_gap_size));

        let target_density = self.data_config.sr_density_threshold;
        let min_gap_size = self.data_config.sr_min_gap_size as usize;
        let to_density = self.index.slice_chunk_to_density(
            revs,
            target_density,
            min_gap_size,
        );

        let mut sliced = vec![];

        for chunk in to_density {
            sliced.extend(
                self.slice_chunk_to_size(&chunk, target_size)?
                    .into_iter()
                    .map(ToOwned::to_owned),
            );
        }

        Ok(sliced)
    }

    /// Slice revs to match the target size
    ///
    /// This is intended to be used on chunks that density slicing selected,
    /// but that are still too large compared to the read guarantee of revlogs.
    /// This might happen when the "minimal gap size" interrupted the slicing
    /// or when chains are built in a way that create large blocks next to
    /// each other.
    fn slice_chunk_to_size<'a>(
        &self,
        revs: &'a [Revision],
        target_size: Option<u64>,
    ) -> Result<Vec<&'a [Revision]>, RevlogError> {
        let mut start_data = self.start(revs[0]);
        let end_data = self.end(revs[revs.len() - 1]);
        let full_span = end_data - start_data;

        let nothing_to_do = target_size
            .map(|size| full_span <= size as usize)
            .unwrap_or(true);

        if nothing_to_do {
            return Ok(vec![revs]);
        }
        let target_size = target_size.expect("target_size is set") as usize;

        let mut start_rev_idx = 0;
        let mut end_rev_idx = 1;
        let mut chunks = vec![];

        for (idx, rev) in revs.iter().enumerate().skip(1) {
            let span = self.end(*rev) - start_data;
            let is_snapshot = self.is_snapshot(*rev)?;
            if span <= target_size && is_snapshot {
                end_rev_idx = idx + 1;
            } else {
                let chunk =
                    self.trim_chunk(revs, start_rev_idx, Some(end_rev_idx));
                if !chunk.is_empty() {
                    chunks.push(chunk);
                }
                start_rev_idx = idx;
                start_data = self.start(*rev);
                end_rev_idx = idx + 1;
            }
            if !is_snapshot {
                break;
            }
        }

        // For the others, we use binary slicing to quickly converge towards
        // valid chunks (otherwise, we might end up looking for the start/end
        // of many revisions). This logic is not looking for the perfect
        // slicing point, it quickly converges towards valid chunks.
        let number_of_items = revs.len();

        while (end_data - start_data) > target_size {
            end_rev_idx = number_of_items;
            if number_of_items - start_rev_idx <= 1 {
                // Protect against individual chunks larger than the limit
                break;
            }
            let mut local_end_data = self.end(revs[end_rev_idx - 1]);
            let mut span = local_end_data - start_data;
            while span > target_size {
                if end_rev_idx - start_rev_idx <= 1 {
                    // Protect against individual chunks larger than the limit
                    break;
                }
                end_rev_idx -= (end_rev_idx - start_rev_idx) / 2;
                local_end_data = self.end(revs[end_rev_idx - 1]);
                span = local_end_data - start_data;
            }
            let chunk =
                self.trim_chunk(revs, start_rev_idx, Some(end_rev_idx));
            if !chunk.is_empty() {
                chunks.push(chunk);
            }
            start_rev_idx = end_rev_idx;
            start_data = self.start(revs[start_rev_idx]);
        }

        let chunk = self.trim_chunk(revs, start_rev_idx, None);
        if !chunk.is_empty() {
            chunks.push(chunk);
        }

        Ok(chunks)
    }

    /// Returns `revs[startidx..endidx]` without empty trailing revs
    fn trim_chunk<'a>(
        &self,
        revs: &'a [Revision],
        start_rev_idx: usize,
        end_rev_idx: Option<usize>,
    ) -> &'a [Revision] {
        let mut end_rev_idx = end_rev_idx.unwrap_or(revs.len());

        // If we have a non-empty delta candidate, there is nothing to trim
        if revs[end_rev_idx - 1].0 < self.len() as BaseRevision {
            // Trim empty revs at the end, except the very first rev of a chain
            while end_rev_idx > 1
                && end_rev_idx > start_rev_idx
                && self.length(revs[end_rev_idx - 1]) == 0
            {
                end_rev_idx -= 1
            }
        }

        &revs[start_rev_idx..end_rev_idx]
    }

    /// Check the hash of some given data against the recorded hash.
    pub fn check_hash(
        &self,
        p1: Revision,
        p2: Revision,
        expected: &[u8],
        data: &[u8],
    ) -> bool {
        let e1 = self.index.get_entry(p1);
        let h1 = match e1 {
            Some(ref entry) => entry.hash(),
            None => &NULL_NODE,
        };
        let e2 = self.index.get_entry(p2);
        let h2 = match e2 {
            Some(ref entry) => entry.hash(),
            None => &NULL_NODE,
        };

        hash(data, h1.as_bytes(), h2.as_bytes()) == expected
    }

    /// Returns whether we are currently in a [`Self::with_write`] context
    pub fn is_writing(&self) -> bool {
        self.writing_handles.is_some()
    }

    /// Open the revlog files for writing
    ///
    /// Adding content to a revlog should be done within this context.
    /// TODO try using `BufRead` and `BufWrite` and see if performance improves
    pub fn with_write<R>(
        &mut self,
        transaction: &mut impl Transaction,
        data_end: Option<usize>,
        func: impl FnOnce() -> R,
    ) -> Result<R, HgError> {
        if self.is_writing() {
            return Ok(func());
        }
        self.enter_writing_context(data_end, transaction)
            .inspect_err(|_| {
                self.exit_writing_context();
            })?;
        let res = func();
        self.exit_writing_context();
        Ok(res)
    }

    /// `pub` only for use in hg-cpython
    #[doc(hidden)]
    pub fn exit_writing_context(&mut self) {
        self.writing_handles.take();
        self.segment_file.writing_handle.take();
        self.segment_file.reading_handle.take();
    }

    /// `pub` only for use in hg-cpython
    #[doc(hidden)]
    pub fn python_writing_handles(&self) -> Option<&WriteHandles> {
        self.writing_handles.as_ref()
    }

    /// `pub` only for use in hg-cpython
    #[doc(hidden)]
    pub fn enter_writing_context(
        &mut self,
        data_end: Option<usize>,
        transaction: &mut impl Transaction,
    ) -> Result<(), HgError> {
        let data_size = if self.is_empty() {
            0
        } else {
            self.end(Revision((self.len() - 1) as BaseRevision))
        };
        let data_handle = if !self.is_inline() {
            let data_handle = match self.vfs.open_write(&self.data_file) {
                Ok(mut f) => {
                    if let Some(end) = data_end {
                        f.seek(SeekFrom::Start(end as u64))
                            .when_reading_file(&self.data_file)?;
                    } else {
                        f.seek(SeekFrom::End(0))
                            .when_reading_file(&self.data_file)?;
                    }
                    f
                }
                Err(e) => match e {
                    HgError::IoError { error, context } => {
                        if error.kind() != ErrorKind::NotFound {
                            return Err(HgError::IoError { error, context });
                        }
                        self.vfs.create(&self.data_file, true)?
                    }
                    e => return Err(e),
                },
            };
            transaction.add(&self.data_file, data_size);
            Some(FileHandle::from_file(
                data_handle,
                dyn_clone::clone_box(&*self.vfs),
                &self.data_file,
            ))
        } else {
            None
        };
        let index_size = self.len() * INDEX_ENTRY_SIZE;
        let index_handle = self.index_write_handle()?;
        if self.is_inline() {
            transaction.add(&self.index_file, data_size);
        } else {
            transaction.add(&self.index_file, index_size);
        }
        self.writing_handles = Some(WriteHandles {
            index_handle: index_handle.clone(),
            data_handle: data_handle.clone(),
        });
        *self.segment_file.reading_handle.borrow_mut() = if self.is_inline() {
            Some(index_handle)
        } else {
            data_handle
        };
        Ok(())
    }

    /// Get a write handle to the index, sought to the end of its data.
    fn index_write_handle(&self) -> Result<FileHandle, HgError> {
        let res = if self.delayed_buffer.is_none() {
            if self.data_config.check_ambig {
                self.vfs.open_check_ambig(&self.index_file)
            } else {
                self.vfs.open_write(&self.index_file)
            }
        } else {
            self.vfs.open_write(&self.index_file)
        };
        match res {
            Ok(mut handle) => {
                handle
                    .seek(SeekFrom::End(0))
                    .when_reading_file(&self.index_file)?;
                Ok(
                    if let Some(delayed_buffer) = self.delayed_buffer.as_ref()
                    {
                        FileHandle::from_file_delayed(
                            handle,
                            dyn_clone::clone_box(&*self.vfs),
                            &self.index_file,
                            delayed_buffer.clone(),
                        )?
                    } else {
                        FileHandle::from_file(
                            handle,
                            dyn_clone::clone_box(&*self.vfs),
                            &self.index_file,
                        )
                    },
                )
            }
            Err(e) => match e {
                HgError::IoError { error, context } => {
                    if error.kind() != ErrorKind::NotFound {
                        return Err(HgError::IoError { error, context });
                    };
                    if let Some(delayed_buffer) = self.delayed_buffer.as_ref()
                    {
                        FileHandle::new_delayed(
                            dyn_clone::clone_box(&*self.vfs),
                            &self.index_file,
                            true,
                            delayed_buffer.clone(),
                        )
                    } else {
                        FileHandle::new(
                            dyn_clone::clone_box(&*self.vfs),
                            &self.index_file,
                            true,
                            true,
                        )
                    }
                }
                e => Err(e),
            },
        }
    }

    /// Split the data of an inline revlog into an index and a data file
    pub fn split_inline(
        &mut self,
        header: IndexHeader,
        new_index_file_path: Option<PathBuf>,
    ) -> Result<PathBuf, RevlogError> {
        assert!(self.delayed_buffer.is_none());
        let existing_handles = self.writing_handles.is_some();
        if let Some(handles) = &mut self.writing_handles {
            handles.index_handle.flush()?;
            self.writing_handles.take();
            self.segment_file.writing_handle.take();
        }
        let mut new_data_file_handle =
            self.vfs.create(&self.data_file, true)?;
        // Drop any potential data, possibly redundant with the VFS impl.
        new_data_file_handle
            .set_len(0)
            .when_writing_file(&self.data_file)?;

        self.with_read(|| -> Result<(), RevlogError> {
            for r in 0..self.index.len() {
                let rev = Revision(r as BaseRevision);
                let rev_segment = self.get_segment_for_revs(rev, rev)?.1;
                new_data_file_handle
                    .write_all(&rev_segment)
                    .when_writing_file(&self.data_file)?;
            }
            new_data_file_handle
                .flush()
                .when_writing_file(&self.data_file)?;
            Ok(())
        })?;

        if let Some(index_path) = new_index_file_path {
            self.index_file = index_path
        }

        let mut new_index_handle = self.vfs.create(&self.index_file, true)?;
        let mut new_data = Vec::with_capacity(self.len() * INDEX_ENTRY_SIZE);
        for r in 0..self.len() {
            let rev = Revision(r as BaseRevision);
            let entry = self.index.entry_binary(rev).unwrap_or_else(|| {
                panic!(
                    "entry {} should exist in {}",
                    r,
                    self.index_file.display()
                )
            });
            if r == 0 {
                new_data.extend(header.header_bytes);
            }
            new_data.extend(entry);
        }
        new_index_handle
            .write_all(&new_data)
            .when_writing_file(&self.index_file)?;
        // Replace the index with a new one because the buffer contains inline
        // data
        self.index = Index::new(Box::new(new_data), header)?;
        self.inline = false;

        self.segment_file = RandomAccessFile::new(
            dyn_clone::clone_box(&*self.vfs),
            self.data_file.to_owned(),
        );
        if existing_handles {
            // Switched from inline to conventional, reopen the index
            let new_data_handle = Some(FileHandle::from_file(
                new_data_file_handle,
                dyn_clone::clone_box(&*self.vfs),
                &self.data_file,
            ));
            self.writing_handles = Some(WriteHandles {
                index_handle: self.index_write_handle()?,
                data_handle: new_data_handle.clone(),
            });
            *self.segment_file.writing_handle.borrow_mut() = new_data_handle;
        }

        Ok(self.index_file.to_owned())
    }

    /// Write a new entry to this revlog.
    /// - `entry` is the index bytes
    /// - `header_and_data` is the compression header and the revision data
    /// - `offset` is the position in the data file to write to
    /// - `index_end` is the overwritten position in the index in revlog-v2,
    ///   since the format may allow a rewrite of garbage data at the end.
    /// - `data_end` is the overwritten position in the data-file in revlog-v2,
    ///   since the format may allow a rewrite of garbage data at the end.
    ///
    /// XXX Why do we have `data_end` *and* `offset`? Same question in Python
    pub fn write_entry(
        &mut self,
        mut transaction: impl Transaction,
        entry: &[u8],
        header_and_data: (&[u8], &[u8]),
        mut offset: usize,
        index_end: Option<u64>,
        data_end: Option<u64>,
    ) -> Result<(u64, Option<u64>), HgError> {
        let current_revision = self.len() - 1;
        let canonical_index_file = self.canonical_index_file();

        let is_inline = self.is_inline();
        let handles = match &mut self.writing_handles {
            None => {
                return Err(HgError::abort(
                    "adding revision outside of the `with_write` context",
                    exit_codes::ABORT,
                    None,
                ));
            }
            Some(handles) => handles,
        };
        let index_handle = &mut handles.index_handle;
        let data_handle = &mut handles.data_handle;
        if let Some(end) = index_end {
            index_handle
                .seek(SeekFrom::Start(end))
                .when_reading_file(&self.index_file)?;
        } else {
            index_handle
                .seek(SeekFrom::End(0))
                .when_reading_file(&self.index_file)?;
        }
        if let Some(data_handle) = data_handle {
            if let Some(end) = data_end {
                data_handle
                    .seek(SeekFrom::Start(end))
                    .when_reading_file(&self.data_file)?;
            } else {
                data_handle
                    .seek(SeekFrom::End(0))
                    .when_reading_file(&self.data_file)?;
            }
        }
        let (header, data) = header_and_data;

        if !is_inline {
            transaction.add(&self.data_file, offset);
            transaction
                .add(&canonical_index_file, current_revision * entry.len());
            let data_handle = data_handle
                .as_mut()
                .expect("data handle should exist when not inline");
            if !header.is_empty() {
                data_handle.write_all(header)?;
            }
            data_handle.write_all(data)?;
            match &mut self.delayed_buffer {
                Some(buf) => {
                    buf.lock()
                        .expect("propagate the panic")
                        .buffer
                        .write_all(entry)
                        .expect("write to delay buffer should succeed");
                }
                None => index_handle.write_all(entry)?,
            }
        } else if self.delayed_buffer.is_some() {
            return Err(HgError::abort(
                "invalid delayed write on inline revlog",
                exit_codes::ABORT,
                None,
            ));
        } else {
            offset += current_revision * entry.len();
            transaction.add(&canonical_index_file, offset);
            index_handle.write_all(entry)?;
            index_handle.write_all(header)?;
            index_handle.write_all(data)?;
        }
        let data_position = match data_handle {
            Some(h) => Some(h.position()?),
            None => None,
        };
        Ok((index_handle.position()?, data_position))
    }

    /// Return the real target index file and not the temporary when diverting
    pub fn canonical_index_file(&self) -> PathBuf {
        self.original_index_file
            .as_ref()
            .map(ToOwned::to_owned)
            .unwrap_or_else(|| self.index_file.to_owned())
    }

    /// Return the path to the diverted index
    fn diverted_index(&self) -> PathBuf {
        self.index_file.with_extension("i.a")
    }

    /// True if we're in a [`Self::with_write`] or [`Self::with_read`] context
    pub fn is_open(&self) -> bool {
        self.segment_file.is_open()
    }

    /// Set this revlog to delay its writes to a buffer
    pub fn delay(&mut self) -> Result<Option<PathBuf>, HgError> {
        assert!(!self.is_open());
        if self.is_inline() {
            return Err(HgError::abort(
                "revlog with delayed write should not be inline",
                exit_codes::ABORT,
                None,
            ));
        }
        if self.delayed_buffer.is_some() || self.original_index_file.is_some()
        {
            // Delay or divert already happening
            return Ok(None);
        }
        if self.is_empty() {
            self.original_index_file = Some(self.index_file.to_owned());
            self.index_file = self.diverted_index();
            if self.vfs.exists(&self.index_file) {
                self.vfs.unlink(&self.index_file)?;
            }
            Ok(Some(self.index_file.to_owned()))
        } else {
            self.delayed_buffer =
                Some(Arc::new(Mutex::new(DelayedBuffer::default())));
            Ok(None)
        }
    }

    /// Write the pending data (in memory) if any to the diverted index file
    /// (on disk temporary file)
    pub fn write_pending(
        &mut self,
    ) -> Result<(Option<PathBuf>, bool), HgError> {
        assert!(!self.is_open());
        if self.is_inline() {
            return Err(HgError::abort(
                "revlog with delayed write should not be inline",
                exit_codes::ABORT,
                None,
            ));
        }
        if self.original_index_file.is_some() {
            return Ok((None, true));
        }
        let mut any_pending = false;
        let pending_index_file = self.diverted_index();
        if self.vfs.exists(&pending_index_file) {
            self.vfs.unlink(&pending_index_file)?;
        }
        self.vfs.copy(&self.index_file, &pending_index_file)?;
        if let Some(delayed_buffer) = self.delayed_buffer.take() {
            let mut index_file_handle =
                self.vfs.open_write(&pending_index_file)?;
            index_file_handle
                .seek(SeekFrom::End(0))
                .when_writing_file(&pending_index_file)?;
            let delayed_data =
                &delayed_buffer.lock().expect("propagate the panic").buffer;
            index_file_handle
                .write_all(delayed_data)
                .when_writing_file(&pending_index_file)?;
            any_pending = true;
        }
        self.original_index_file = Some(self.index_file.to_owned());
        self.index_file = pending_index_file;
        Ok((Some(self.index_file.to_owned()), any_pending))
    }

    /// Overwrite the canonical file with the diverted file, or write out the
    /// delayed buffer.
    /// Returns an error if the revlog is neither diverted nor delayed.
    pub fn finalize_pending(&mut self) -> Result<PathBuf, HgError> {
        assert!(!self.is_open());
        if self.is_inline() {
            return Err(HgError::abort(
                "revlog with delayed write should not be inline",
                exit_codes::ABORT,
                None,
            ));
        }
        match (
            self.delayed_buffer.as_ref(),
            self.original_index_file.as_ref(),
        ) {
            (None, None) => {
                return Err(HgError::abort(
                    "neither delay nor divert found on this revlog",
                    exit_codes::ABORT,
                    None,
                ));
            }
            (Some(delay), None) => {
                let mut index_file_handle =
                    self.vfs.open_write(&self.index_file)?;
                index_file_handle
                    .seek(SeekFrom::End(0))
                    .when_writing_file(&self.index_file)?;
                index_file_handle
                    .write_all(
                        &delay.lock().expect("propagate the panic").buffer,
                    )
                    .when_writing_file(&self.index_file)?;
                self.delayed_buffer = None;
            }
            (None, Some(divert)) => {
                if self.vfs.exists(&self.index_file) {
                    self.vfs.rename(&self.index_file, divert, true)?;
                }
                divert.clone_into(&mut self.index_file);
                self.original_index_file = None;
            }
            (Some(_), Some(_)) => unreachable!(
                "{} is in an inconsistent state of both delay and divert",
                self.canonical_index_file().display(),
            ),
        }
        Ok(self.canonical_index_file())
    }

    /// `pub` only for `hg-cpython`. This is made a different method than
    /// [`Revlog::index`] in case there is a different invariant that pops up
    /// later.
    #[doc(hidden)]
    pub fn shared_index(&self) -> &Index {
        &self.index
    }
}

/// The use of a [`Refcell`] assumes that a given revlog will only
/// be accessed (read or write) by a single thread.
type UncompressedChunkCache =
    RefCell<LruMap<Revision, Arc<[u8]>, ByMemoryUsage>>;

/// The node, revision and data for the last revision we've seen. Speeds up
/// a lot of sequential operations of the revlog.
///
/// The data is not just bytes since it can come from Python and we want to
/// avoid copies if possible.
type SingleRevisionCache =
    (Node, Revision, Box<dyn Deref<Target = [u8]> + Send>);

/// A way of progressively filling a buffer with revision data, then return
/// that buffer. Used to abstract away Python-allocated code to reduce copying
/// for performance reasons.
pub trait RevisionBuffer {
    /// The owned buffer type to return
    type Target;
    /// Copies the slice into the buffer
    fn extend_from_slice(&mut self, slice: &[u8]);
    /// Returns the now finished owned buffer
    fn finish(self) -> Self::Target;
}

/// A simple vec-based buffer. This is uselessly complicated for the pure Rust
/// case, but it's the price to pay for Python compatibility.
#[derive(Debug)]
pub(super) struct CoreRevisionBuffer {
    buf: Vec<u8>,
}

impl CoreRevisionBuffer {
    pub fn new() -> Self {
        Self { buf: vec![] }
    }

    #[inline]
    pub fn resize(&mut self, size: usize) {
        self.buf.reserve_exact(size - self.buf.capacity());
    }
}

impl RevisionBuffer for CoreRevisionBuffer {
    type Target = Vec<u8>;

    #[inline]
    fn extend_from_slice(&mut self, slice: &[u8]) {
        self.buf.extend_from_slice(slice);
    }

    #[inline]
    fn finish(self) -> Self::Target {
        self.buf
    }
}

/// Calculate the hash of a revision given its data and its parents.
pub fn hash(
    data: &[u8],
    p1_hash: &[u8],
    p2_hash: &[u8],
) -> [u8; NODE_BYTES_LENGTH] {
    let mut hasher = Sha1::new();
    let (a, b) = (p1_hash, p2_hash);
    if a > b {
        hasher.update(b);
        hasher.update(a);
    } else {
        hasher.update(a);
        hasher.update(b);
    }
    hasher.update(data);
    *hasher.finalize().as_ref()
}