Mercurial > hg
changeset 52158:0744248cc541
rust-revlog: add compression helpers
This will be used in the upcoming `InnerRevlog` when reading/writing data.
author | Raphaël Gomès <rgomes@octobus.net> |
---|---|
date | Wed, 25 Sep 2024 16:42:21 +0200 |
parents | de4b9ea2fa34 |
children | 426696af24d3 |
files | rust/hg-core/src/revlog/compression.rs rust/hg-core/src/revlog/mod.rs |
diffstat | 2 files changed, 396 insertions(+), 160 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/rust/hg-core/src/revlog/compression.rs Wed Sep 25 16:42:21 2024 +0200 @@ -0,0 +1,383 @@ +//! Helpers around revlog compression + +use std::cell::RefCell; +use std::collections::HashSet; +use std::io::Read; + +use flate2::bufread::ZlibEncoder; +use flate2::read::ZlibDecoder; + +use crate::config::Config; +use crate::errors::HgError; +use crate::exit_codes; + +use super::corrupted; +use super::RevlogError; + +/// Header byte used to identify ZSTD-compressed data +pub const ZSTD_BYTE: u8 = b'\x28'; +/// Header byte used to identify Zlib-compressed data +pub const ZLIB_BYTE: u8 = b'x'; + +const ZSTD_DEFAULT_LEVEL: u8 = 3; +const ZLIB_DEFAULT_LEVEL: u8 = 6; +/// The length of data below which we don't even try to compress it when using +/// Zstandard. +const MINIMUM_LENGTH_ZSTD: usize = 50; +/// The length of data below which we don't even try to compress it when using +/// Zlib. +const MINIMUM_LENGTH_ZLIB: usize = 44; + +/// Defines the available compression engines and their options. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum CompressionConfig { + Zlib { + /// Between 0 and 9 included + level: u8, + }, + Zstd { + /// Between 0 and 22 included + level: u8, + /// Never used in practice for now + threads: u8, + }, + /// No compression is performed + None, +} + +impl CompressionConfig { + pub fn new( + config: &Config, + requirements: &HashSet<String>, + ) -> Result<Self, HgError> { + let mut new = Self::default(); + + let zlib_level = config.get_u32(b"storage", b"revlog.zlib.level")?; + let zstd_level = config.get_u32(b"storage", b"revlog.zstd.level")?; + + for requirement in requirements { + if requirement.starts_with("revlog-compression-") + || requirement.starts_with("exp-compression-") + { + let split = &mut requirement.splitn(3, '-'); + split.next(); + split.next(); + new = match split.next().unwrap() { + "zstd" => CompressionConfig::zstd(zstd_level)?, + e => { + return Err(HgError::UnsupportedFeature(format!( + "Unsupported compression engine '{e}'" + ))) + } + }; + } + } + if let Some(level) = zlib_level { + if matches!(new, CompressionConfig::Zlib { .. }) { + new.set_level(level as usize)?; + } + } + Ok(new) + } + + /// Sets the level of the current compression engine + pub fn set_level(&mut self, new_level: usize) -> Result<(), HgError> { + match self { + CompressionConfig::Zlib { level } => { + if new_level > 9 { + return Err(HgError::abort( + format!( + "invalid compression zlib compression level {}, \ + expected between 0 and 9 included", + new_level + ), + exit_codes::ABORT, + None, + )); + } + *level = new_level as u8; + } + CompressionConfig::Zstd { level, .. } => { + if new_level > 22 { + return Err(HgError::abort( + format!( + "invalid compression zstd compression level {}, \ + expected between 0 and 22 included", + new_level + ), + exit_codes::ABORT, + None, + )); + } + *level = new_level as u8; + } + CompressionConfig::None => {} + } + Ok(()) + } + + /// Return a ZSTD compression config + pub fn zstd( + zstd_level: Option<u32>, + ) -> Result<CompressionConfig, HgError> { + let mut engine = CompressionConfig::Zstd { + level: ZSTD_DEFAULT_LEVEL, + threads: 0, + }; + if let Some(level) = zstd_level { + engine.set_level(level as usize)?; + } + Ok(engine) + } +} + +impl Default for CompressionConfig { + fn default() -> Self { + Self::Zlib { + level: ZLIB_DEFAULT_LEVEL, + } + } +} + +/// A high-level trait to define compressors that should be able to compress +/// and decompress arbitrary bytes. +pub trait Compressor { + /// Returns a new [`Vec`] with the compressed data. + /// Should return `Ok(None)` if compression does not apply (e.g. too small) + fn compress( + &mut self, + data: &[u8], + ) -> Result<Option<Vec<u8>>, RevlogError>; + /// Returns a new [`Vec`] with the decompressed data. + fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError>; +} + +/// A compressor that does nothing (useful in tests) +pub struct NoneCompressor; + +impl Compressor for NoneCompressor { + fn compress( + &mut self, + _data: &[u8], + ) -> Result<Option<Vec<u8>>, RevlogError> { + Ok(None) + } + + fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> { + Ok(data.to_owned()) + } +} + +/// A compressor for Zstandard +pub struct ZstdCompressor { + /// Level of compression to use + level: u8, + /// How many threads are used (not implemented yet) + threads: u8, + /// The underlying zstd compressor + compressor: zstd::bulk::Compressor<'static>, +} + +impl ZstdCompressor { + pub fn new(level: u8, threads: u8) -> Self { + Self { + level, + threads, + compressor: zstd::bulk::Compressor::new(level.into()) + .expect("invalid zstd arguments"), + } + } +} + +impl Compressor for ZstdCompressor { + fn compress( + &mut self, + data: &[u8], + ) -> Result<Option<Vec<u8>>, RevlogError> { + if self.threads != 0 { + // TODO use a zstd builder + zstd cargo feature to support this + unimplemented!("zstd parallel compression is not implemented"); + } + if data.len() < MINIMUM_LENGTH_ZSTD { + return Ok(None); + } + let level = self.level as i32; + if data.len() <= 1000000 { + let compressed = self.compressor.compress(data).map_err(|e| { + corrupted(format!("revlog compress error: {}", e)) + })?; + Ok(if compressed.len() < data.len() { + Some(compressed) + } else { + None + }) + } else { + Ok(Some(zstd::stream::encode_all(data, level).map_err( + |e| corrupted(format!("revlog compress error: {}", e)), + )?)) + } + } + + fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> { + zstd::stream::decode_all(data).map_err(|e| { + corrupted(format!("revlog decompress error: {}", e)).into() + }) + } +} + +/// A compressor for Zlib +pub struct ZlibCompressor { + /// Level of compression to use + level: flate2::Compression, +} + +impl ZlibCompressor { + pub fn new(level: u8) -> Self { + Self { + level: flate2::Compression::new(level.into()), + } + } +} + +impl Compressor for ZlibCompressor { + fn compress( + &mut self, + data: &[u8], + ) -> Result<Option<Vec<u8>>, RevlogError> { + assert!(!data.is_empty()); + if data.len() < MINIMUM_LENGTH_ZLIB { + return Ok(None); + } + let mut buf = Vec::with_capacity(data.len()); + ZlibEncoder::new(data, self.level) + .read_to_end(&mut buf) + .map_err(|e| corrupted(format!("revlog compress error: {}", e)))?; + + Ok(if buf.len() < data.len() { + buf.shrink_to_fit(); + Some(buf) + } else { + None + }) + } + + fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> { + let mut decoder = ZlibDecoder::new(data); + // TODO reuse the allocation somehow? + let mut buf = vec![]; + decoder.read_to_end(&mut buf).map_err(|e| { + corrupted(format!("revlog decompress error: {}", e)) + })?; + Ok(buf) + } +} + +thread_local! { + // seems fine to [unwrap] here: this can only fail due to memory allocation + // failing, and it's normal for that to cause panic. + static ZSTD_DECODER : RefCell<zstd::bulk::Decompressor<'static>> = + RefCell::new(zstd::bulk::Decompressor::new().ok().unwrap()); +} + +/// Util to wrap the reuse of a zstd decoder while controlling its buffer size. +fn zstd_decompress_to_buffer( + bytes: &[u8], + buf: &mut Vec<u8>, +) -> Result<usize, std::io::Error> { + ZSTD_DECODER + .with(|decoder| decoder.borrow_mut().decompress_to_buffer(bytes, buf)) +} + +/// Specialized revlog decompression to use less memory for deltas while +/// keeping performance acceptable. +pub(super) fn uncompressed_zstd_data( + bytes: &[u8], + is_delta: bool, + uncompressed_len: i32, +) -> Result<Vec<u8>, HgError> { + let cap = uncompressed_len.max(0) as usize; + if is_delta { + // [cap] is usually an over-estimate of the space needed because + // it's the length of delta-decoded data, but we're interested + // in the size of the delta. + // This means we have to [shrink_to_fit] to avoid holding on + // to a large chunk of memory, but it also means we must have a + // fallback branch, for the case when the delta is longer than + // the original data (surprisingly, this does happen in practice) + let mut buf = Vec::with_capacity(cap); + match zstd_decompress_to_buffer(bytes, &mut buf) { + Ok(_) => buf.shrink_to_fit(), + Err(_) => { + buf.clear(); + zstd::stream::copy_decode(bytes, &mut buf) + .map_err(|e| corrupted(e.to_string()))?; + } + }; + Ok(buf) + } else { + let mut buf = Vec::with_capacity(cap); + let len = zstd_decompress_to_buffer(bytes, &mut buf) + .map_err(|e| corrupted(e.to_string()))?; + if len != uncompressed_len as usize { + Err(corrupted("uncompressed length does not match")) + } else { + Ok(buf) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const LARGE_TEXT: &[u8] = b" + Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko + Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko + Emana Karassoli, Loucra Loucra Nonponto, Pata Pata, Ko Ko Ko."; + + #[test] + fn test_zlib_compressor() { + // Can return `Ok(None)` + let mut compressor = ZlibCompressor::new(1); + assert_eq!(compressor.compress(b"too small").unwrap(), None); + + // Compression returns bytes + let compressed_with_1 = + compressor.compress(LARGE_TEXT).unwrap().unwrap(); + assert!(compressed_with_1.len() < LARGE_TEXT.len()); + // Round trip works + assert_eq!( + compressor.decompress(&compressed_with_1).unwrap(), + LARGE_TEXT + ); + + // Compression levels mean something + let mut compressor = ZlibCompressor::new(9); + // Compression returns bytes + let compressed = compressor.compress(LARGE_TEXT).unwrap().unwrap(); + assert!(compressed.len() < compressed_with_1.len()); + } + + #[test] + fn test_zstd_compressor() { + // Can return `Ok(None)` + let mut compressor = ZstdCompressor::new(1, 0); + assert_eq!(compressor.compress(b"too small").unwrap(), None); + + // Compression returns bytes + let compressed_with_1 = + compressor.compress(LARGE_TEXT).unwrap().unwrap(); + assert!(compressed_with_1.len() < LARGE_TEXT.len()); + // Round trip works + assert_eq!( + compressor.decompress(&compressed_with_1).unwrap(), + LARGE_TEXT + ); + + // Compression levels mean something + let mut compressor = ZstdCompressor::new(22, 0); + // Compression returns bytes + let compressed = compressor.compress(LARGE_TEXT).unwrap().unwrap(); + assert!(compressed.len() < compressed_with_1.len()); + } +}
--- a/rust/hg-core/src/revlog/mod.rs Tue Oct 29 09:38:48 2024 +0100 +++ b/rust/hg-core/src/revlog/mod.rs Wed Sep 25 16:42:21 2024 +0200 @@ -9,8 +9,10 @@ pub mod nodemap; mod nodemap_docket; pub mod path_encode; +use compression::{uncompressed_zstd_data, CompressionConfig}; pub use node::{FromHexError, Node, NodePrefix}; pub mod changelog; +pub mod compression; pub mod filelog; pub mod index; pub mod manifest; @@ -24,8 +26,6 @@ use flate2::read::ZlibDecoder; use sha1::{Digest, Sha1}; -use std::cell::RefCell; -use zstd; use self::node::{NODE_BYTES_LENGTH, NULL_NODE}; use self::nodemap_docket::NodeMapDocket; @@ -258,75 +258,6 @@ } } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum CompressionEngine { - Zlib { - /// Between 0 and 9 included - level: u32, - }, - Zstd { - /// Between 0 and 22 included - level: u32, - /// Never used in practice for now - threads: u32, - }, - /// No compression is performed - None, -} -impl CompressionEngine { - pub fn set_level(&mut self, new_level: usize) -> Result<(), HgError> { - match self { - CompressionEngine::Zlib { level } => { - if new_level > 9 { - return Err(HgError::abort( - format!( - "invalid compression zlib compression level {}", - new_level - ), - exit_codes::ABORT, - None, - )); - } - *level = new_level as u32; - } - CompressionEngine::Zstd { level, .. } => { - if new_level > 22 { - return Err(HgError::abort( - format!( - "invalid compression zstd compression level {}", - new_level - ), - exit_codes::ABORT, - None, - )); - } - *level = new_level as u32; - } - CompressionEngine::None => {} - } - Ok(()) - } - - pub fn zstd( - zstd_level: Option<u32>, - ) -> Result<CompressionEngine, HgError> { - let mut engine = CompressionEngine::Zstd { - level: 3, - threads: 0, - }; - if let Some(level) = zstd_level { - engine.set_level(level as usize)?; - } - Ok(engine) - } -} - -impl Default for CompressionEngine { - fn default() -> Self { - Self::Zlib { level: 6 } - } -} - #[derive(Debug, Clone, Copy, PartialEq)] /// Holds configuration values about how the revlog data is read pub struct RevlogDataConfig { @@ -546,7 +477,7 @@ /// Holds configuration values about the available revlog features pub struct RevlogFeatureConfig { /// The compression engine and its options - pub compression_engine: CompressionEngine, + pub compression_engine: CompressionConfig, /// Can we use censor on this revlog pub censorable: bool, /// Does this revlog use the "side data" feature @@ -568,46 +499,11 @@ config: &Config, requirements: &HashSet<String>, ) -> Result<Self, HgError> { - let mut feature_config = Self::default(); - - let zlib_level = config.get_u32(b"storage", b"revlog.zlib.level")?; - let zstd_level = config.get_u32(b"storage", b"revlog.zstd.level")?; - - feature_config.compression_engine = CompressionEngine::default(); - - for requirement in requirements { - if requirement.starts_with("revlog-compression-") - || requirement.starts_with("exp-compression-") - { - let split = &mut requirement.splitn(3, '-'); - split.next(); - split.next(); - feature_config.compression_engine = match split.next().unwrap() - { - "zstd" => CompressionEngine::zstd(zstd_level)?, - e => { - return Err(HgError::UnsupportedFeature(format!( - "Unsupported compression engine '{e}'" - ))) - } - }; - } - } - if let Some(level) = zlib_level { - if matches!( - feature_config.compression_engine, - CompressionEngine::Zlib { .. } - ) { - feature_config - .compression_engine - .set_level(level as usize)?; - } - } - - feature_config.enable_ellipsis = - requirements.contains(NARROW_REQUIREMENT); - - Ok(feature_config) + Ok(Self { + compression_engine: CompressionConfig::new(config, requirements)?, + enable_ellipsis: requirements.contains(NARROW_REQUIREMENT), + ..Default::default() + }) } } @@ -1058,21 +954,6 @@ hash: Node, } -thread_local! { - // seems fine to [unwrap] here: this can only fail due to memory allocation - // failing, and it's normal for that to cause panic. - static ZSTD_DECODER : RefCell<zstd::bulk::Decompressor<'static>> = - RefCell::new(zstd::bulk::Decompressor::new().ok().unwrap()); -} - -fn zstd_decompress_to_buffer( - bytes: &[u8], - buf: &mut Vec<u8>, -) -> Result<usize, std::io::Error> { - ZSTD_DECODER - .with(|decoder| decoder.borrow_mut().decompress_to_buffer(bytes, buf)) -} - impl<'revlog> RevlogEntry<'revlog> { pub fn revision(&self) -> Revision { self.rev @@ -1218,7 +1099,11 @@ // zlib (RFC 1950) data. b'x' => Ok(Cow::Owned(self.uncompressed_zlib_data()?)), // zstd data. - b'\x28' => Ok(Cow::Owned(self.uncompressed_zstd_data()?)), + b'\x28' => Ok(Cow::Owned(uncompressed_zstd_data( + self.bytes, + self.is_delta(), + self.uncompressed_len.max(0), + )?)), // A proper new format should have had a repo/store requirement. format_type => Err(corrupted(format!( "unknown compression header '{}'", @@ -1245,38 +1130,6 @@ } } - fn uncompressed_zstd_data(&self) -> Result<Vec<u8>, HgError> { - let cap = self.uncompressed_len.max(0) as usize; - if self.is_delta() { - // [cap] is usually an over-estimate of the space needed because - // it's the length of delta-decoded data, but we're interested - // in the size of the delta. - // This means we have to [shrink_to_fit] to avoid holding on - // to a large chunk of memory, but it also means we must have a - // fallback branch, for the case when the delta is longer than - // the original data (surprisingly, this does happen in practice) - let mut buf = Vec::with_capacity(cap); - match zstd_decompress_to_buffer(self.bytes, &mut buf) { - Ok(_) => buf.shrink_to_fit(), - Err(_) => { - buf.clear(); - zstd::stream::copy_decode(self.bytes, &mut buf) - .map_err(|e| corrupted(e.to_string()))?; - } - }; - Ok(buf) - } else { - let mut buf = Vec::with_capacity(cap); - let len = zstd_decompress_to_buffer(self.bytes, &mut buf) - .map_err(|e| corrupted(e.to_string()))?; - if len != self.uncompressed_len as usize { - Err(corrupted("uncompressed length does not match")) - } else { - Ok(buf) - } - } - } - /// Tell if the entry is a snapshot or a delta /// (influences on decompression). fn is_delta(&self) -> bool {