changeset 52159:426696af24d3

rust-revlog: add file IO helpers This will be useful for the upcoming `InnerRevlog`.
author Raphaël Gomès <rgomes@octobus.net>
date Wed, 25 Sep 2024 18:10:03 +0200
parents 0744248cc541
children 039b7caeb4d9
files rust/hg-core/src/revlog/file_io.rs rust/hg-core/src/revlog/mod.rs
diffstat 2 files changed, 536 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/rust/hg-core/src/revlog/file_io.rs	Wed Sep 25 18:10:03 2024 +0200
@@ -0,0 +1,535 @@
+//! Helpers for revlog file reading and writing.
+
+use std::{
+    cell::RefCell,
+    fs::File,
+    io::{Read, Seek, SeekFrom, Write},
+    path::{Path, PathBuf},
+    sync::{Arc, Mutex},
+};
+
+use crate::{
+    errors::{HgError, IoResultExt},
+    vfs::Vfs,
+};
+
+/// Wraps accessing arbitrary chunks of data within a file and reusing handles.
+/// This is currently useful for accessing a revlog's data file, only reading
+/// the ranges that are currently relevant, like a sort of basic and manual
+/// file-based mmap.
+///
+/// XXX should this just be replaced with `mmap` + `madvise` ranges?
+/// The upcoming `UncompressedChunkCache` will make up for most of the slowness
+/// of re-reading the same chunks, so this might not be as useful. Aside from
+/// the major benefit of having less code to take care of, using `mmap` will
+/// allow multiple processes to share the same pages, especially for the
+/// changelog and manifest, which would make a difference in server contexts.
+pub struct RandomAccessFile {
+    /// The VFS used to open `filename`; cloned into each new [`FileHandle`]
+    vfs: Box<dyn Vfs>,
+    /// Filename of the wrapped file, relative to the VFS root (not absolute)
+    pub filename: PathBuf,
+    /// Cached read-only handle, lazily opened by `get_read_handle`, if any
+    pub reading_handle: RefCell<Option<FileHandle>>,
+    /// Handle used for writes, if any; reads reuse it in preference when set
+    pub writing_handle: RefCell<Option<FileHandle>>,
+}
+
+impl RandomAccessFile {
+    /// Wrap `filename` for random access. Panics if it is not relative.
+    pub fn new(vfs: Box<dyn Vfs>, filename: PathBuf) -> Self {
+        assert!(filename.is_relative());
+        Self {
+            reading_handle: RefCell::default(),
+            writing_handle: RefCell::default(),
+            vfs,
+            filename,
+        }
+    }
+
+    /// Read `length` bytes starting at byte `offset` of the file.
+    pub fn read_chunk(
+        &self,
+        offset: usize,
+        length: usize,
+    ) -> Result<Vec<u8>, HgError> {
+        let start = SeekFrom::Start(offset as u64);
+        let mut handle = self.get_read_handle()?;
+        handle.seek(start).when_reading_file(&self.filename)?;
+        let chunk = handle.read_exact(length);
+        chunk.when_reading_file(&self.filename)
+    }
+
+    /// Get a handle suitable for reading, opening the file if needed.
+    /// `pub` only for hg-cpython
+    #[doc(hidden)]
+    pub fn get_read_handle(&self) -> Result<FileHandle, HgError> {
+        // Prefer a file handle being actively used for writes, if available,
+        // then the cached read-only one.
+        // There is some danger to reusing the write handle because reads will
+        // seek the file.
+        // However, [`Revlog::write_entry`] performs a `SeekFrom::End(0)`
+        // before all writes, so we should be safe.
+        let cached = self
+            .writing_handle
+            .borrow()
+            .clone()
+            .or_else(|| self.reading_handle.borrow().clone());
+        if let Some(handle) = cached {
+            return Ok(handle);
+        }
+        // Nothing is open yet: open the file read-only (never creating it)
+        // and cache the handle for the next calls.
+        let vfs = dyn_clone::clone_box(&*self.vfs);
+        let opened = FileHandle::new(vfs, &self.filename, false, false)?;
+        *self.reading_handle.borrow_mut() = Some(opened.clone());
+        Ok(opened)
+    }
+
+    /// Drop the cached read handle, if any. `pub` only for hg-cpython
+    #[doc(hidden)]
+    pub fn exit_reading_context(&self) {
+        drop(self.reading_handle.replace(None));
+    }
+
+    /// Returns whether a read or a write handle is currently open
+    pub fn is_open(&self) -> bool {
+        let reading = self.reading_handle.borrow().is_some();
+        reading || self.writing_handle.borrow().is_some()
+    }
+}
+
+/// A buffer that holds new changelog index data that needs to be written
+/// after the manifest and filelogs so that the repo is updated atomically to
+/// external processes.
+#[derive(Clone, Debug, Default)]
+pub struct DelayedBuffer {
+    /// The actual in-memory bytes storing the delayed writes
+    pub(super) buffer: Vec<u8>,
+    /// The current offset into the virtual file composed of file + buffer
+    offset: u64,
+    /// The size of the on-disk file, recorded when a delayed handle is made
+    file_size: u64,
+}
+
+impl DelayedBuffer {
+    /// Total size of the virtual file: on-disk bytes plus delayed bytes.
+    pub fn len(&self) -> u64 {
+        self.file_size + self.buffer.len() as u64
+    }
+    /// Whether both the on-disk file and the delayed buffer hold no data.
+    pub fn is_empty(&self) -> bool {
+        self.file_size == 0 && self.buffer.is_empty()
+    }
+}
+
+/// Holds an open [`File`] and the related data. This can be used for reading
+/// and writing. Writes can be delayed to a buffer before touching the disk,
+/// if relevant (in the changelog case), but reads are transparent.
+pub struct FileHandle {
+    /// The actual open file
+    pub file: File,
+    /// The VFS with which the file was opened
+    vfs: Box<dyn Vfs>,
+    /// Filename this handle was created with, used as context in IO errors
+    filename: PathBuf,
+    /// Shared buffer receiving the delayed writes (the changelog index
+    /// case). When `None`, reads and writes go straight to `file`.
+    delayed_buffer: Option<Arc<Mutex<DelayedBuffer>>>,
+}
+
+impl std::fmt::Debug for FileHandle {
+    // Note that the `vfs` field is not part of the output
+    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut repr = out.debug_struct("FileHandle");
+        repr.field("filename", &self.filename);
+        repr.field("delayed_buffer", &self.delayed_buffer);
+        repr.field("file", &self.file).finish()
+    }
+}
+
+impl Clone for FileHandle {
+    fn clone(&self) -> Self {
+        // Panics if the OS cannot duplicate the underlying file handle
+        let file = self.file.try_clone().expect("couldn't clone file handle");
+        Self {
+            file,
+            vfs: dyn_clone::clone_box(&*self.vfs),
+            filename: self.filename.clone(),
+            delayed_buffer: self.delayed_buffer.clone(),
+        }
+    }
+}
+
+impl FileHandle {
+    /// Get a (read or write) file handle to `filename`. Only creates the file
+    /// if `create` is `true`. Otherwise, `write` picks between [`Vfs::open`]
+    /// and [`Vfs::open_read`].
+    pub fn new(
+        vfs: Box<dyn Vfs>,
+        filename: impl AsRef<Path>,
+        create: bool,
+        write: bool,
+    ) -> Result<Self, HgError> {
+        let path: &Path = filename.as_ref();
+        let file = match (create, write) {
+            (true, _) => vfs.create(path)?,
+            (false, true) => vfs.open(path)?,
+            (false, false) => vfs.open_read(path)?,
+        };
+        Ok(Self {
+            filename: path.to_owned(),
+            delayed_buffer: None,
+            vfs,
+            file,
+        })
+    }
+
+    /// Get a file handle to `filename`, but writes go to a [`DelayedBuffer`].
+    /// The file is created if `create` is `true`, and opened otherwise.
+    /// The buffer's recorded file size and offset are reset from the file.
+    pub fn new_delayed(
+        vfs: Box<dyn Vfs>,
+        filename: impl AsRef<Path>,
+        create: bool,
+        delayed_buffer: Arc<Mutex<DelayedBuffer>>,
+    ) -> Result<Self, HgError> {
+        let path: &Path = filename.as_ref();
+        let mut file = if create {
+            vfs.create(path)?
+        } else {
+            vfs.open(path)?
+        };
+        // The virtual file starts as the on-disk one: record size and cursor
+        let file_size = vfs.file_size(&file)?;
+        let offset = file.stream_position().when_reading_file(path)?;
+        {
+            let mut state = delayed_buffer.lock().unwrap();
+            state.file_size = file_size;
+            state.offset = offset;
+        }
+        Ok(Self {
+            filename: path.to_owned(),
+            delayed_buffer: Some(delayed_buffer),
+            vfs,
+            file,
+        })
+    }
+
+    /// Wrap an existing [`File`]; reads and writes go directly to it.
+    pub fn from_file(
+        file: File,
+        vfs: Box<dyn Vfs>,
+        filename: impl AsRef<Path>,
+    ) -> Self {
+        Self {
+            file,
+            vfs,
+            delayed_buffer: None,
+            filename: filename.as_ref().to_path_buf(),
+        }
+    }
+
+    /// Wrap an existing [`File`], but writes go to a [`DelayedBuffer`].
+    /// The buffer's recorded file size and offset are reset from `file`.
+    pub fn from_file_delayed(
+        mut file: File,
+        vfs: Box<dyn Vfs>,
+        filename: impl AsRef<Path>,
+        delayed_buffer: Arc<Mutex<DelayedBuffer>>,
+    ) -> Result<Self, HgError> {
+        let path: &Path = filename.as_ref();
+        // The virtual file starts as the on-disk one: record size and cursor
+        let file_size = vfs.file_size(&file)?;
+        let offset = file.stream_position().when_reading_file(path)?;
+        {
+            let mut state = delayed_buffer.lock().unwrap();
+            state.file_size = file_size;
+            state.offset = offset;
+        }
+
+        Ok(Self {
+            filename: path.to_owned(),
+            delayed_buffer: Some(delayed_buffer),
+            vfs,
+            file,
+        })
+    }
+
+    /// Move the position of the handle to `pos`,
+    /// spanning the [`DelayedBuffer`] if defined. Will return an error if
+    /// an invalid seek position is asked, or for any standard io error.
+    pub fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
+        let Some(delay_buf) = &self.delayed_buffer else {
+            return self.file.seek(pos);
+        };
+        let mut delay_buf = delay_buf.lock().unwrap();
+        // Virtual file offset spans real file and data: relative positions
+        // are resolved against the virtual end and the virtual cursor.
+        let target = match pos {
+            SeekFrom::Start(offset) => Some(offset),
+            SeekFrom::End(delta) => delay_buf.len().checked_add_signed(delta),
+            SeekFrom::Current(delta) => {
+                delay_buf.offset.checked_add_signed(delta)
+            }
+        };
+        // Seeking before the start (or overflowing) is an invalid position
+        let target = target.ok_or(std::io::ErrorKind::InvalidInput)?;
+        if target < delay_buf.file_size {
+            // Keep the real cursor in sync. `pos` itself cannot be reused:
+            // it is relative to the virtual file, not to the on-disk one.
+            self.file.seek(SeekFrom::Start(target))?;
+        }
+        delay_buf.offset = target;
+        Ok(target)
+    }
+
+    /// Read exactly `length` bytes from the current position.
+    /// Errors are the same as [`std::io::Read::read_exact`]: in particular,
+    /// reading past the end of the file (or of the [`DelayedBuffer`], if
+    /// any) is an [`std::io::ErrorKind::UnexpectedEof`] error, not a panic.
+    /// On success, the position of the handle is advanced by `length`.
+    /// Panics if the mutex of the [`DelayedBuffer`] is poisoned.
+    pub fn read_exact(
+        &mut self,
+        length: usize,
+    ) -> Result<Vec<u8>, std::io::Error> {
+        let mut buf = vec![0; length];
+        let Some(delay_buf) = &self.delayed_buffer else {
+            // No delayed buffer: this is a plain read from the file
+            self.file.read_exact(&mut buf)?;
+            return Ok(buf);
+        };
+        let mut delay_buf = delay_buf.lock().unwrap();
+        let cursor = delay_buf.offset;
+        let file_size = delay_buf.file_size;
+        // Split the output between the bytes that still live in the on-disk
+        // file (before `file_size`) and the ones held by the delayed buffer.
+        // `in_file` is at most `length`, so it always fits in a `usize`.
+        let in_file = file_size.saturating_sub(cursor).min(length as u64);
+        let (file_part, buffer_part) = buf.split_at_mut(in_file as usize);
+        if !file_part.is_empty() {
+            // The real file cursor is expected to already be at `cursor`,
+            // which `seek` takes care of when seeking inside the file.
+            self.file.read_exact(file_part)?;
+        }
+        if !buffer_part.is_empty() {
+            // Index of the first wanted byte in the buffer: 0 when the read
+            // started inside the file, the overshoot past it otherwise.
+            let start = cursor.saturating_sub(file_size);
+            let delayed = usize::try_from(start)
+                .ok()
+                .and_then(|start| delay_buf.buffer.get(start..))
+                .and_then(|rest| rest.get(..buffer_part.len()))
+                .ok_or(std::io::ErrorKind::UnexpectedEof)?;
+            buffer_part.copy_from_slice(delayed);
+        }
+        // Only move the virtual cursor once the whole read succeeded
+        delay_buf.offset += length as u64;
+        Ok(buf)
+    }
+
+    /// Flush pending changes of the file itself; never the delayed buffer.
+    pub fn flush(&mut self) -> Result<(), HgError> {
+        let flushed = Write::flush(&mut self.file);
+        flushed.when_writing_file(&self.filename)
+    }
+
+    /// Return the current position in the file. This is the cursor of the
+    /// underlying [`File`]: the delayed buffer, if any, is not considered.
+    pub fn position(&mut self) -> Result<u64, HgError> {
+        let cursor = self.file.stream_position();
+        cursor.when_reading_file(&self.filename)
+    }
+
+    /// Write `data` to the file, or append it to the [`DelayedBuffer`], if
+    /// any (panics if the handle is not positioned at the end of the data).
+    pub fn write_all(&mut self, data: &[u8]) -> Result<(), HgError> {
+        let Some(shared) = &self.delayed_buffer else {
+            return self
+                .file
+                .write_all(data)
+                .when_writing_file(&self.filename);
+        };
+        let mut delayed = shared.lock().expect("propagate the panic");
+        assert_eq!(delayed.offset, delayed.len());
+        delayed.buffer.extend_from_slice(data);
+        delayed.offset += data.len() as u64;
+        Ok(())
+    }
+}
+
+/// The write handles of a given revlog (index + maybe data)
+#[derive(Debug)]
+pub struct WriteHandles {
+    /// Handle to the index file
+    pub index_handle: FileHandle,
+    /// Handle to the data file, if the revlog is non-inline
+    pub data_handle: Option<FileHandle>,
+}
+
+#[cfg(test)]
+mod tests {
+    use std::io::ErrorKind;
+
+    use crate::vfs::VfsImpl;
+
+    use super::*;
+
+    #[test]
+    fn test_random_access_file() {
+        let base = tempfile::tempdir().unwrap().into_path();
+        let filename = Path::new("a");
+        let file_path = base.join(filename);
+        let raf = RandomAccessFile::new(
+            Box::new(VfsImpl { base }),
+            filename.to_owned(),
+        );
+
+        assert!(!raf.is_open());
+        assert_eq!(&raf.filename, &filename);
+        // Getting a read handle on a missing file should fail with `NotFound`
+        match raf.get_read_handle().unwrap_err() {
+            HgError::IoError { error, .. } => match error.kind() {
+                std::io::ErrorKind::NotFound => {}
+                _ => panic!("should be not found"),
+            },
+            e => panic!("{}", e.to_string()),
+        }
+
+        std::fs::write(file_path, b"1234567890").unwrap();
+
+        // Now that the file exists, a handle can be opened and read from
+        let mut handle = raf.get_read_handle().unwrap();
+        assert!(raf.is_open());
+        assert_eq!(handle.read_exact(10).unwrap(), b"1234567890".to_vec());
+    }
+
+    #[test]
+    fn test_file_handle() {
+        let base = tempfile::tempdir().unwrap().into_path();
+        let filename = base.join("a");
+        // Opening a missing file without `create` should fail
+        FileHandle::new(
+            Box::new(VfsImpl { base: base.clone() }),
+            &filename,
+            false,
+            false,
+        )
+        .unwrap_err();
+        std::fs::write(&filename, b"1234567890").unwrap();
+
+        let mut read_handle = FileHandle::new(
+            Box::new(VfsImpl { base: base.clone() }),
+            &filename,
+            false,
+            false,
+        )
+        .unwrap();
+        assert_eq!(&read_handle.filename, &filename);
+        assert_eq!(read_handle.position().unwrap(), 0);
+
+        // Writing to an explicit read handle should fail
+        read_handle.write_all(b"some data").unwrap_err();
+
+        // reading exactly n bytes should work
+        assert_eq!(read_handle.read_exact(3).unwrap(), b"123".to_vec());
+        // and the position should be remembered
+        assert_eq!(read_handle.read_exact(2).unwrap(), b"45".to_vec());
+
+        // Seeking should work
+        let position = read_handle.position().unwrap();
+        read_handle.seek(SeekFrom::Current(-2)).unwrap();
+        assert_eq!(position - 2, read_handle.position().unwrap());
+
+        // Reading more data than available should fail
+        read_handle.read_exact(1000).unwrap_err();
+
+        // Work around the yet unimplemented VFS for write
+        let mut options = std::fs::OpenOptions::new();
+        options.read(true);
+        options.write(true);
+        let file = options.open(&filename).unwrap();
+        // Open a write handle
+        let mut handle = FileHandle::from_file(
+            file,
+            Box::new(VfsImpl { base: base.clone() }),
+            &filename,
+        );
+
+        // Now writing should succeed
+        handle.write_all(b"new data").unwrap();
+        // Opening does not seek: the 8 bytes were written from the start
+        assert_eq!(handle.position().unwrap(), 8);
+        // We can still read
+        assert_eq!(handle.read_exact(2).unwrap(), b"90".to_vec());
+        // Flushing doesn't do anything unexpected
+        handle.flush().unwrap();
+
+        let delayed_buffer = Arc::new(Mutex::new(DelayedBuffer::default()));
+        let file = options.open(&filename).unwrap();
+        let mut handle = FileHandle::from_file_delayed(
+            file,
+            Box::new(VfsImpl { base: base.clone() }),
+            &filename,
+            delayed_buffer,
+        )
+        .unwrap();
+
+        assert_eq!(
+            handle
+                .delayed_buffer
+                .as_ref()
+                .unwrap()
+                .lock()
+                .unwrap()
+                .file_size,
+            10
+        );
+        handle.seek(SeekFrom::End(0)).unwrap();
+        handle.write_all(b"should go to buffer").unwrap();
+        assert_eq!(
+            handle
+                .delayed_buffer
+                .as_ref()
+                .unwrap()
+                .lock()
+                .unwrap()
+                .len(),
+            29
+        );
+        read_handle.seek(SeekFrom::Start(0)).unwrap();
+        // On-disk file contents should be unchanged by the delayed write
+        assert_eq!(
+            read_handle.read_exact(10).unwrap(),
+            b"new data90".to_vec(),
+        );
+
+        assert_eq!(
+            read_handle.read_exact(1).unwrap_err().kind(),
+            ErrorKind::UnexpectedEof
+        );
+
+        handle.flush().unwrap();
+        // On-disk file contents should still be unchanged after a flush
+        assert_eq!(
+            read_handle.read_exact(1).unwrap_err().kind(),
+            ErrorKind::UnexpectedEof
+        );
+
+        // Read from the buffer only
+        handle.seek(SeekFrom::End(-1)).unwrap();
+        assert_eq!(handle.read_exact(1).unwrap(), b"r".to_vec());
+
+        // Read from an overlapping section of file and buffer
+        handle.seek(SeekFrom::Start(6)).unwrap();
+        assert_eq!(
+            handle.read_exact(20).unwrap(),
+            b"ta90should go to buf".to_vec()
+        );
+
+        // Read from file only
+        handle.seek(SeekFrom::Start(0)).unwrap();
+        assert_eq!(handle.read_exact(8).unwrap(), b"new data".to_vec());
+    }
+}
--- a/rust/hg-core/src/revlog/mod.rs	Wed Sep 25 16:42:21 2024 +0200
+++ b/rust/hg-core/src/revlog/mod.rs	Wed Sep 25 18:10:03 2024 +0200
@@ -13,6 +13,7 @@
 pub use node::{FromHexError, Node, NodePrefix};
 pub mod changelog;
 pub mod compression;
+pub mod file_io;
 pub mod filelog;
 pub mod index;
 pub mod manifest;