Mercurial > hg
view rust/hg-core/src/revlog/file_io.rs @ 52159:426696af24d3
rust-revlog: add file IO helpers
This will be useful for the upcoming `InnerRevlog`.
author | Raphaël Gomès <rgomes@octobus.net> |
---|---|
date | Wed, 25 Sep 2024 18:10:03 +0200 |
parents | |
children | 7be39c5110c9 |
line wrap: on
line source
//! Helpers for revlog file reading and writing. use std::{ cell::RefCell, fs::File, io::{Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, sync::{Arc, Mutex}, }; use crate::{ errors::{HgError, IoResultExt}, vfs::Vfs, }; /// Wraps accessing arbitrary chunks of data within a file and reusing handles. /// This is currently useful for accessing a revlog's data file, only reading /// the ranges that are currently relevant, like a sort of basic and manual /// file-based mmap. /// /// XXX should this just be replaced with `mmap` + `madvise` ranges? /// The upcoming `UncompressedChunkCache` will make up for most of the slowness /// of re-reading the same chunks, so this might not be as useful. Aside from /// the major benefit of having less code to take care of, using `mmap` will /// allow multiple processes to share the same pages, especially for the /// changelog and manifest, which would make a difference in server contexts. pub struct RandomAccessFile { /// The current store VFS to pass it to [`FileHandle`] vfs: Box<dyn Vfs>, /// Filename of the open file, relative to the vfs root pub filename: PathBuf, /// The current read-only handle on the file, if any pub reading_handle: RefCell<Option<FileHandle>>, /// The current read-write handle on the file, if any pub writing_handle: RefCell<Option<FileHandle>>, } impl RandomAccessFile { /// Wrap a file for random access pub fn new(vfs: Box<dyn Vfs>, filename: PathBuf) -> Self { assert!(filename.is_relative()); Self { vfs, filename, reading_handle: RefCell::new(None), writing_handle: RefCell::new(None), } } /// Read a chunk of bytes from the file. pub fn read_chunk( &self, offset: usize, length: usize, ) -> Result<Vec<u8>, HgError> { let mut handle = self.get_read_handle()?; handle .seek(SeekFrom::Start(offset as u64)) .when_reading_file(&self.filename)?; handle.read_exact(length).when_reading_file(&self.filename) } /// `pub` only for hg-cpython #[doc(hidden)] pub fn get_read_handle(&self) -> Result<FileHandle, HgError> { if let Some(handle) = &*self.writing_handle.borrow() { // Use a file handle being actively used for writes, if available. // There is some danger to doing this because reads will seek the // file. // However, [`Revlog::write_entry`] performs a `SeekFrom::End(0)` // before all writes, so we should be safe. return Ok(handle.clone()); } if let Some(handle) = &*self.reading_handle.borrow() { return Ok(handle.clone()); } // early returns done to work around borrowck being overzealous // See https://github.com/rust-lang/rust/issues/103108 let new_handle = FileHandle::new( dyn_clone::clone_box(&*self.vfs), &self.filename, false, false, )?; *self.reading_handle.borrow_mut() = Some(new_handle.clone()); Ok(new_handle) } /// `pub` only for hg-cpython #[doc(hidden)] pub fn exit_reading_context(&self) { self.reading_handle.take(); } // Returns whether this file currently open pub fn is_open(&self) -> bool { self.reading_handle.borrow().is_some() || self.writing_handle.borrow().is_some() } } /// A buffer that holds new changelog index data that needs to be written /// after the manifest and filelogs so that the repo is updated atomically to /// external processes. #[derive(Clone, Debug, Default)] pub struct DelayedBuffer { // The actual in-memory bytes storing the delayed writes pub(super) buffer: Vec<u8>, /// The current offset into the virtual file composed of file + buffer offset: u64, /// The size of the file at the time of opening file_size: u64, } impl DelayedBuffer { /// Returns the length of the full data (on-disk + buffer length). pub fn len(&self) -> u64 { self.buffer.len() as u64 + self.file_size } pub fn is_empty(&self) -> bool { self.len() == 0 } } /// Holds an open [`File`] and the related data. This can be used for reading /// and writing. Writes can be delayed to a buffer before touching the disk, /// if relevant (in the changelog case), but reads are transparent. pub struct FileHandle { /// The actual open file pub file: File, /// The VFS with which the file was opened vfs: Box<dyn Vfs>, /// Filename of the open file, relative to the repo root filename: PathBuf, /// Buffer of delayed entry writes to the changelog index. This points /// back to the buffer inside the revlog this handle refers to. delayed_buffer: Option<Arc<Mutex<DelayedBuffer>>>, } impl std::fmt::Debug for FileHandle { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("FileHandle") .field("filename", &self.filename) .field("delayed_buffer", &self.delayed_buffer) .field("file", &self.file) .finish() } } impl Clone for FileHandle { fn clone(&self) -> Self { Self { vfs: dyn_clone::clone_box(&*self.vfs), filename: self.filename.clone(), delayed_buffer: self.delayed_buffer.clone(), // This can only fail if the OS doesn't have the file handle // anymore, so we're not going to do anything useful anyway. file: self.file.try_clone().expect("couldn't clone file handle"), } } } impl FileHandle { /// Get a (read or write) file handle to `filename`. Only creates the file /// if `create` is `true`. pub fn new( vfs: Box<dyn Vfs>, filename: impl AsRef<Path>, create: bool, write: bool, ) -> Result<Self, HgError> { let file = if create { vfs.create(filename.as_ref())? } else if write { vfs.open(filename.as_ref())? } else { vfs.open_read(filename.as_ref())? }; Ok(Self { vfs, filename: filename.as_ref().to_owned(), delayed_buffer: None, file, }) } /// Get a file handle to `filename`, but writes go to a [`DelayedBuffer`]. pub fn new_delayed( vfs: Box<dyn Vfs>, filename: impl AsRef<Path>, create: bool, delayed_buffer: Arc<Mutex<DelayedBuffer>>, ) -> Result<Self, HgError> { let mut file = if create { vfs.create(filename.as_ref())? } else { vfs.open(filename.as_ref())? }; let size = vfs.file_size(&file)?; let offset = file .stream_position() .when_reading_file(filename.as_ref())?; { let mut buf = delayed_buffer.lock().unwrap(); buf.file_size = size; buf.offset = offset; } Ok(Self { vfs, filename: filename.as_ref().to_owned(), delayed_buffer: Some(delayed_buffer), file, }) } /// Wrap an existing [`File`] pub fn from_file( file: File, vfs: Box<dyn Vfs>, filename: impl AsRef<Path>, ) -> Self { Self { vfs, filename: filename.as_ref().to_owned(), delayed_buffer: None, file, } } /// Wrap an existing [`File`], but writes go to a [`DelayedBuffer`]. pub fn from_file_delayed( mut file: File, vfs: Box<dyn Vfs>, filename: impl AsRef<Path>, delayed_buffer: Arc<Mutex<DelayedBuffer>>, ) -> Result<Self, HgError> { let size = vfs.file_size(&file)?; let offset = file .stream_position() .when_reading_file(filename.as_ref())?; { let mut buf = delayed_buffer.lock().unwrap(); buf.file_size = size; buf.offset = offset; } Ok(Self { vfs, filename: filename.as_ref().to_owned(), delayed_buffer: Some(delayed_buffer), file, }) } /// Move the position of the handle to `pos`, /// spanning the [`DelayedBuffer`] if defined. Will return an error if /// an invalid seek position is asked, or for any standard io error. pub fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> { if let Some(delay_buf) = &self.delayed_buffer { let mut delay_buf = delay_buf.lock().unwrap(); // Virtual file offset spans real file and data match pos { SeekFrom::Start(offset) => delay_buf.offset = offset, SeekFrom::End(offset) => { delay_buf.offset = delay_buf.len().saturating_add_signed(offset) } SeekFrom::Current(offset) => { delay_buf.offset = delay_buf.offset.saturating_add_signed(offset); } } if delay_buf.offset < delay_buf.file_size { self.file.seek(pos) } else { Ok(delay_buf.offset) } } else { self.file.seek(pos) } } /// Read exactly `length` bytes from the current position. /// Errors are the same as [`std::io::Read::read_exact`]. pub fn read_exact( &mut self, length: usize, ) -> Result<Vec<u8>, std::io::Error> { if let Some(delay_buf) = self.delayed_buffer.as_mut() { let mut delay_buf = delay_buf.lock().unwrap(); let mut buf = vec![0; length]; let offset: isize = delay_buf.offset.try_into().expect("buffer too large"); let file_size: isize = delay_buf.file_size.try_into().expect("file too large"); let span: isize = offset - file_size; let length = length.try_into().expect("too large of a length"); let absolute_span: u64 = span.unsigned_abs().try_into().expect("length too large"); if span < 0 { if length <= absolute_span { // We're only in the file self.file.read_exact(&mut buf)?; } else { // We're spanning file and buffer self.file .read_exact(&mut buf[..absolute_span as usize])?; delay_buf .buffer .take(length - absolute_span) .read_exact(&mut buf[absolute_span as usize..])?; } } else { // We're only in the buffer delay_buf.buffer[absolute_span as usize..] .take(length) .read_exact(&mut buf)?; } delay_buf.offset += length; Ok(buf.to_owned()) } else { let mut buf = vec![0; length]; self.file.read_exact(&mut buf)?; Ok(buf) } } /// Flush the in-memory changes to disk. This does *not* write the /// delayed buffer, only the pending file changes. pub fn flush(&mut self) -> Result<(), HgError> { self.file.flush().when_writing_file(&self.filename) } /// Return the current position in the file pub fn position(&mut self) -> Result<u64, HgError> { self.file .stream_position() .when_reading_file(&self.filename) } /// Append `data` to the file, or to the [`DelayedBuffer`], if any. pub fn write_all(&mut self, data: &[u8]) -> Result<(), HgError> { if let Some(buf) = &mut self.delayed_buffer { let mut delayed_buffer = buf.lock().expect("propagate the panic"); assert_eq!(delayed_buffer.offset, delayed_buffer.len()); delayed_buffer.buffer.extend_from_slice(data); delayed_buffer.offset += data.len() as u64; Ok(()) } else { self.file .write_all(data) .when_writing_file(&self.filename)?; Ok(()) } } } /// Write handles to a given revlog (index + maybe data) #[derive(Debug)] pub struct WriteHandles { /// Handle to the index file pub index_handle: FileHandle, /// Handle to the data file, if the revlog is non-inline pub data_handle: Option<FileHandle>, } #[cfg(test)] mod tests { use std::io::ErrorKind; use crate::vfs::VfsImpl; use super::*; #[test] fn test_random_access_file() { let base = tempfile::tempdir().unwrap().into_path(); let filename = Path::new("a"); let file_path = base.join(filename); let raf = RandomAccessFile::new( Box::new(VfsImpl { base }), filename.to_owned(), ); assert!(!raf.is_open()); assert_eq!(&raf.filename, &filename); // Should fail to read a non-existing file match raf.get_read_handle().unwrap_err() { HgError::IoError { error, .. } => match error.kind() { std::io::ErrorKind::NotFound => {} _ => panic!("should be not found"), }, e => panic!("{}", e.to_string()), } std::fs::write(file_path, b"1234567890").unwrap(); // Should be able to open an existing file let mut handle = raf.get_read_handle().unwrap(); assert!(raf.is_open()); assert_eq!(handle.read_exact(10).unwrap(), b"1234567890".to_vec()); } #[test] fn test_file_handle() { let base = tempfile::tempdir().unwrap().into_path(); let filename = base.join("a"); // No `create` should fail FileHandle::new( Box::new(VfsImpl { base: base.clone() }), &filename, false, false, ) .unwrap_err(); std::fs::write(&filename, b"1234567890").unwrap(); let mut read_handle = FileHandle::new( Box::new(VfsImpl { base: base.clone() }), &filename, false, false, ) .unwrap(); assert_eq!(&read_handle.filename, &filename); assert_eq!(read_handle.position().unwrap(), 0); // Writing to an explicit read handle should fail read_handle.write_all(b"some data").unwrap_err(); // reading exactly n bytes should work assert_eq!(read_handle.read_exact(3).unwrap(), b"123".to_vec()); // and the position should be remembered assert_eq!(read_handle.read_exact(2).unwrap(), b"45".to_vec()); // Seeking should work let position = read_handle.position().unwrap(); read_handle.seek(SeekFrom::Current(-2)).unwrap(); assert_eq!(position - 2, read_handle.position().unwrap()); // Seeking too much data should fail read_handle.read_exact(1000).unwrap_err(); // Work around the yet unimplemented VFS for write let mut options = std::fs::OpenOptions::new(); options.read(true); options.write(true); let file = options.open(&filename).unwrap(); // Open a write handle let mut handle = FileHandle::from_file( file, Box::new(VfsImpl { base: base.clone() }), &filename, ); // Now writing should succeed handle.write_all(b"new data").unwrap(); // Opening or writing does not seek, so we should be at the start assert_eq!(handle.position().unwrap(), 8); // We can still read assert_eq!(handle.read_exact(2).unwrap(), b"90".to_vec()); // Flushing doesn't do anything unexpected handle.flush().unwrap(); let delayed_buffer = Arc::new(Mutex::new(DelayedBuffer::default())); let file = options.open(&filename).unwrap(); let mut handle = FileHandle::from_file_delayed( file, Box::new(VfsImpl { base: base.clone() }), &filename, delayed_buffer, ) .unwrap(); assert_eq!( handle .delayed_buffer .as_ref() .unwrap() .lock() .unwrap() .file_size, 10 ); handle.seek(SeekFrom::End(0)).unwrap(); handle.write_all(b"should go to buffer").unwrap(); assert_eq!( handle .delayed_buffer .as_ref() .unwrap() .lock() .unwrap() .len(), 29 ); read_handle.seek(SeekFrom::Start(0)).unwrap(); // On-disk file contents should be unchanged assert_eq!( read_handle.read_exact(10).unwrap(), b"new data90".to_vec(), ); assert_eq!( read_handle.read_exact(1).unwrap_err().kind(), ErrorKind::UnexpectedEof ); handle.flush().unwrap(); // On-disk file contents should still be unchanged after a flush assert_eq!( read_handle.read_exact(1).unwrap_err().kind(), ErrorKind::UnexpectedEof ); // Read from the buffer only handle.seek(SeekFrom::End(-1)).unwrap(); assert_eq!(handle.read_exact(1).unwrap(), b"r".to_vec()); // Read from an overlapping section of file and buffer handle.seek(SeekFrom::Start(6)).unwrap(); assert_eq!( handle.read_exact(20).unwrap(), b"ta90should go to buf".to_vec() ); // Read from file only handle.seek(SeekFrom::Start(0)).unwrap(); assert_eq!(handle.read_exact(8).unwrap(), b"new data".to_vec()); } }