rust-revlog: add compression helpers
This introduces a dedicated `compression` module under `revlog`, with a `Compressor` trait and zlib/zstd implementations. The existing `CompressionEngine` moves out of `revlog/mod.rs` and becomes `CompressionConfig`, and the zstd decompression helpers move along with it. This will be used in the upcoming `InnerRevlog` when reading/writing data.
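
For context, a minimal sketch of how a caller such as the upcoming
`InnerRevlog` could wire this up. The `compressor_for` and `roundtrip`
helpers are illustrative only, are not part of this change, and assume
the items defined in the new module are in scope:

  fn compressor_for(config: CompressionConfig) -> Box<dyn Compressor> {
      match config {
          CompressionConfig::Zlib { level } => {
              Box::new(ZlibCompressor::new(level))
          }
          CompressionConfig::Zstd { level, threads } => {
              Box::new(ZstdCompressor::new(level, threads))
          }
          CompressionConfig::None => Box::new(NoneCompressor),
      }
  }

  fn roundtrip(
      config: CompressionConfig,
      data: &[u8],
  ) -> Result<Vec<u8>, RevlogError> {
      let mut compressor = compressor_for(config);
      match compressor.compress(data)? {
          // `None` means compression did not apply (e.g. the data was
          // too small or did not shrink), so the caller keeps the data
          // as-is.
          None => Ok(data.to_vec()),
          Some(compressed) => compressor.decompress(&compressed),
      }
  }

The real integration will live in `InnerRevlog`; this sketch only
exercises the API added here.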
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/rust/hg-core/src/revlog/compression.rs Wed Sep 25 16:42:21 2024 +0200
@@ -0,0 +1,383 @@
+//! Helpers around revlog compression
+
+use std::cell::RefCell;
+use std::collections::HashSet;
+use std::io::Read;
+
+use flate2::bufread::ZlibEncoder;
+use flate2::read::ZlibDecoder;
+
+use crate::config::Config;
+use crate::errors::HgError;
+use crate::exit_codes;
+
+use super::corrupted;
+use super::RevlogError;
+
+/// Header byte used to identify ZSTD-compressed data
+pub const ZSTD_BYTE: u8 = b'\x28';
+/// Header byte used to identify Zlib-compressed data
+pub const ZLIB_BYTE: u8 = b'x';
+
+const ZSTD_DEFAULT_LEVEL: u8 = 3;
+const ZLIB_DEFAULT_LEVEL: u8 = 6;
+/// The length of data below which we don't even try to compress it when using
+/// Zstandard.
+const MINIMUM_LENGTH_ZSTD: usize = 50;
+/// The length of data below which we don't even try to compress it when using
+/// Zlib.
+const MINIMUM_LENGTH_ZLIB: usize = 44;
+
+/// Defines the available compression engines and their options.
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+pub enum CompressionConfig {
+ Zlib {
+ /// Between 0 and 9 inclusive
+ level: u8,
+ },
+ Zstd {
+ /// Between 0 and 22 inclusive
+ level: u8,
+ /// Never used in practice for now
+ threads: u8,
+ },
+ /// No compression is performed
+ None,
+}
+
+impl CompressionConfig {
+ pub fn new(
+ config: &Config,
+ requirements: &HashSet<String>,
+ ) -> Result<Self, HgError> {
+ let mut new = Self::default();
+
+ let zlib_level = config.get_u32(b"storage", b"revlog.zlib.level")?;
+ let zstd_level = config.get_u32(b"storage", b"revlog.zstd.level")?;
+
+ for requirement in requirements {
+ if requirement.starts_with("revlog-compression-")
+ || requirement.starts_with("exp-compression-")
+ {
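+ // A requirement like "revlog-compression-zstd" splits into three
+ // parts; skip the first two ("revlog"/"exp" and "compression") so
+ // that only the engine name is left.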
+ let split = &mut requirement.splitn(3, '-');
+ split.next();
+ split.next();
+ new = match split.next().unwrap() {
+ "zstd" => CompressionConfig::zstd(zstd_level)?,
+ e => {
+ return Err(HgError::UnsupportedFeature(format!(
+ "Unsupported compression engine '{e}'"
+ )))
+ }
+ };
+ }
+ }
+ if let Some(level) = zlib_level {
+ if matches!(new, CompressionConfig::Zlib { .. }) {
+ new.set_level(level as usize)?;
+ }
+ }
+ Ok(new)
+ }
+
+ /// Sets the level of the current compression engine
+ pub fn set_level(&mut self, new_level: usize) -> Result<(), HgError> {
+ match self {
+ CompressionConfig::Zlib { level } => {
+ if new_level > 9 {
+ return Err(HgError::abort(
+ format!(
+ "invalid zlib compression level {}, \
+ expected between 0 and 9 inclusive",
+ new_level
+ ),
+ exit_codes::ABORT,
+ None,
+ ));
+ }
+ *level = new_level as u8;
+ }
+ CompressionConfig::Zstd { level, .. } => {
+ if new_level > 22 {
+ return Err(HgError::abort(
+ format!(
+ "invalid zstd compression level {}, \
+ expected between 0 and 22 inclusive",
+ new_level
+ ),
+ exit_codes::ABORT,
+ None,
+ ));
+ }
+ *level = new_level as u8;
+ }
+ CompressionConfig::None => {}
+ }
+ Ok(())
+ }
+
+ /// Returns a ZSTD compression config
+ pub fn zstd(
+ zstd_level: Option<u32>,
+ ) -> Result<CompressionConfig, HgError> {
+ let mut engine = CompressionConfig::Zstd {
+ level: ZSTD_DEFAULT_LEVEL,
+ threads: 0,
+ };
+ if let Some(level) = zstd_level {
+ engine.set_level(level as usize)?;
+ }
+ Ok(engine)
+ }
+}
+
+impl Default for CompressionConfig {
+ fn default() -> Self {
+ Self::Zlib {
+ level: ZLIB_DEFAULT_LEVEL,
+ }
+ }
+}
+
+/// A high-level trait to define compressors that should be able to compress
+/// and decompress arbitrary bytes.
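+///
+/// The trait is object-safe, so callers can hold a `Box<dyn Compressor>`
+/// chosen at runtime (e.g. based on a [`CompressionConfig`]).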
+pub trait Compressor {
+ /// Returns a new [`Vec`] with the compressed data.
+ /// Should return `Ok(None)` if compression does not apply (e.g. the data is too small)
+ fn compress(
+ &mut self,
+ data: &[u8],
+ ) -> Result<Option<Vec<u8>>, RevlogError>;
+ /// Returns a new [`Vec`] with the decompressed data.
+ fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError>;
+}
+
+/// A compressor that does nothing (useful in tests)
+pub struct NoneCompressor;
+
+impl Compressor for NoneCompressor {
+ fn compress(
+ &mut self,
+ _data: &[u8],
+ ) -> Result<Option<Vec<u8>>, RevlogError> {
+ Ok(None)
+ }
+
+ fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
+ Ok(data.to_owned())
+ }
+}
+
+/// A compressor for Zstandard
+pub struct ZstdCompressor {
+ /// Level of compression to use
+ level: u8,
+ /// How many threads are used (not implemented yet)
+ threads: u8,
+ /// The underlying zstd compressor
+ compressor: zstd::bulk::Compressor<'static>,
+}
+
+impl ZstdCompressor {
+ pub fn new(level: u8, threads: u8) -> Self {
+ Self {
+ level,
+ threads,
+ compressor: zstd::bulk::Compressor::new(level.into())
+ .expect("invalid zstd arguments"),
+ }
+ }
+}
+
+impl Compressor for ZstdCompressor {
+ fn compress(
+ &mut self,
+ data: &[u8],
+ ) -> Result<Option<Vec<u8>>, RevlogError> {
+ if self.threads != 0 {
+ // TODO use a zstd builder + zstd cargo feature to support this
+ unimplemented!("zstd parallel compression is not implemented");
+ }
+ if data.len() < MINIMUM_LENGTH_ZSTD {
+ return Ok(None);
+ }
+ let level = self.level as i32;
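+ // Data up to 1MB goes through the reusable bulk compressor and is
+ // kept only if it actually shrinks; larger data falls back to the
+ // streaming encoder.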
+ if data.len() <= 1_000_000 {
+ let compressed = self.compressor.compress(data).map_err(|e| {
+ corrupted(format!("revlog compress error: {}", e))
+ })?;
+ Ok(if compressed.len() < data.len() {
+ Some(compressed)
+ } else {
+ None
+ })
+ } else {
+ Ok(Some(zstd::stream::encode_all(data, level).map_err(
+ |e| corrupted(format!("revlog compress error: {}", e)),
+ )?))
+ }
+ }
+
+ fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
+ zstd::stream::decode_all(data).map_err(|e| {
+ corrupted(format!("revlog decompress error: {}", e)).into()
+ })
+ }
+}
+
+/// A compressor for Zlib
+pub struct ZlibCompressor {
+ /// Level of compression to use
+ level: flate2::Compression,
+}
+
+impl ZlibCompressor {
+ pub fn new(level: u8) -> Self {
+ Self {
+ level: flate2::Compression::new(level.into()),
+ }
+ }
+}
+
+impl Compressor for ZlibCompressor {
+ fn compress(
+ &mut self,
+ data: &[u8],
+ ) -> Result<Option<Vec<u8>>, RevlogError> {
+ assert!(!data.is_empty());
+ if data.len() < MINIMUM_LENGTH_ZLIB {
+ return Ok(None);
+ }
+ let mut buf = Vec::with_capacity(data.len());
+ ZlibEncoder::new(data, self.level)
+ .read_to_end(&mut buf)
+ .map_err(|e| corrupted(format!("revlog compress error: {}", e)))?;
+
+ Ok(if buf.len() < data.len() {
+ buf.shrink_to_fit();
+ Some(buf)
+ } else {
+ None
+ })
+ }
+
+ fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
+ let mut decoder = ZlibDecoder::new(data);
+ // TODO reuse the allocation somehow?
+ let mut buf = vec![];
+ decoder.read_to_end(&mut buf).map_err(|e| {
+ corrupted(format!("revlog decompress error: {}", e))
+ })?;
+ Ok(buf)
+ }
+}
+
+thread_local! {
+ // seems fine to [unwrap] here: this can only fail due to memory allocation
+ // failing, and it's normal for that to cause panic.
+ static ZSTD_DECODER : RefCell<zstd::bulk::Decompressor<'static>> =
+ RefCell::new(zstd::bulk::Decompressor::new().ok().unwrap());
+}
+
+/// Helper that reuses a thread-local zstd decoder while letting the caller control the output buffer.
+fn zstd_decompress_to_buffer(
+ bytes: &[u8],
+ buf: &mut Vec<u8>,
+) -> Result<usize, std::io::Error> {
+ ZSTD_DECODER
+ .with(|decoder| decoder.borrow_mut().decompress_to_buffer(bytes, buf))
+}
+
+/// Specialized revlog decompression to use less memory for deltas while
+/// keeping performance acceptable.
+pub(super) fn uncompressed_zstd_data(
+ bytes: &[u8],
+ is_delta: bool,
+ uncompressed_len: i32,
+) -> Result<Vec<u8>, HgError> {
+ let cap = uncompressed_len.max(0) as usize;
+ if is_delta {
+ // [cap] is usually an over-estimate of the space needed because
+ // it's the length of delta-decoded data, but we're interested
+ // in the size of the delta.
+ // This means we have to [shrink_to_fit] to avoid holding on
+ // to a large chunk of memory, but it also means we must have a
+ // fallback branch, for the case when the delta is longer than
+ // the original data (surprisingly, this does happen in practice)
+ let mut buf = Vec::with_capacity(cap);
+ match zstd_decompress_to_buffer(bytes, &mut buf) {
+ Ok(_) => buf.shrink_to_fit(),
+ Err(_) => {
+ buf.clear();
+ zstd::stream::copy_decode(bytes, &mut buf)
+ .map_err(|e| corrupted(e.to_string()))?;
+ }
+ };
+ Ok(buf)
+ } else {
+ let mut buf = Vec::with_capacity(cap);
+ let len = zstd_decompress_to_buffer(bytes, &mut buf)
+ .map_err(|e| corrupted(e.to_string()))?;
+ if len != uncompressed_len as usize {
+ Err(corrupted("uncompressed length does not match"))
+ } else {
+ Ok(buf)
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ const LARGE_TEXT: &[u8] = b"
+ Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
+ Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
+ Emana Karassoli, Loucra Loucra Nonponto, Pata Pata, Ko Ko Ko.";
+
+ #[test]
+ fn test_zlib_compressor() {
+ // Can return `Ok(None)`
+ let mut compressor = ZlibCompressor::new(1);
+ assert_eq!(compressor.compress(b"too small").unwrap(), None);
+
+ // Compression returns bytes
+ let compressed_with_1 =
+ compressor.compress(LARGE_TEXT).unwrap().unwrap();
+ assert!(compressed_with_1.len() < LARGE_TEXT.len());
+ // Round trip works
+ assert_eq!(
+ compressor.decompress(&compressed_with_1).unwrap(),
+ LARGE_TEXT
+ );
+
+ // Compression levels mean something
+ let mut compressor = ZlibCompressor::new(9);
+ // Compression returns bytes
+ let compressed = compressor.compress(LARGE_TEXT).unwrap().unwrap();
+ assert!(compressed.len() < compressed_with_1.len());
+ }
+
+ #[test]
+ fn test_zstd_compressor() {
+ // Can return `Ok(None)`
+ let mut compressor = ZstdCompressor::new(1, 0);
+ assert_eq!(compressor.compress(b"too small").unwrap(), None);
+
+ // Compression returns bytes
+ let compressed_with_1 =
+ compressor.compress(LARGE_TEXT).unwrap().unwrap();
+ assert!(compressed_with_1.len() < LARGE_TEXT.len());
+ // Round trip works
+ assert_eq!(
+ compressor.decompress(&compressed_with_1).unwrap(),
+ LARGE_TEXT
+ );
+
+ // Compression levels mean something
+ let mut compressor = ZstdCompressor::new(22, 0);
+ // Compression returns bytes
+ let compressed = compressor.compress(LARGE_TEXT).unwrap().unwrap();
+ assert!(compressed.len() < compressed_with_1.len());
+ }
+}
--- a/rust/hg-core/src/revlog/mod.rs Tue Oct 29 09:38:48 2024 +0100
+++ b/rust/hg-core/src/revlog/mod.rs Wed Sep 25 16:42:21 2024 +0200
@@ -9,8 +9,10 @@
pub mod nodemap;
mod nodemap_docket;
pub mod path_encode;
+use compression::{uncompressed_zstd_data, CompressionConfig};
pub use node::{FromHexError, Node, NodePrefix};
pub mod changelog;
+pub mod compression;
pub mod filelog;
pub mod index;
pub mod manifest;
@@ -24,8 +26,6 @@
use flate2::read::ZlibDecoder;
use sha1::{Digest, Sha1};
-use std::cell::RefCell;
-use zstd;
use self::node::{NODE_BYTES_LENGTH, NULL_NODE};
use self::nodemap_docket::NodeMapDocket;
@@ -258,75 +258,6 @@
}
}
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
-pub enum CompressionEngine {
- Zlib {
- /// Between 0 and 9 included
- level: u32,
- },
- Zstd {
- /// Between 0 and 22 included
- level: u32,
- /// Never used in practice for now
- threads: u32,
- },
- /// No compression is performed
- None,
-}
-impl CompressionEngine {
- pub fn set_level(&mut self, new_level: usize) -> Result<(), HgError> {
- match self {
- CompressionEngine::Zlib { level } => {
- if new_level > 9 {
- return Err(HgError::abort(
- format!(
- "invalid compression zlib compression level {}",
- new_level
- ),
- exit_codes::ABORT,
- None,
- ));
- }
- *level = new_level as u32;
- }
- CompressionEngine::Zstd { level, .. } => {
- if new_level > 22 {
- return Err(HgError::abort(
- format!(
- "invalid compression zstd compression level {}",
- new_level
- ),
- exit_codes::ABORT,
- None,
- ));
- }
- *level = new_level as u32;
- }
- CompressionEngine::None => {}
- }
- Ok(())
- }
-
- pub fn zstd(
- zstd_level: Option<u32>,
- ) -> Result<CompressionEngine, HgError> {
- let mut engine = CompressionEngine::Zstd {
- level: 3,
- threads: 0,
- };
- if let Some(level) = zstd_level {
- engine.set_level(level as usize)?;
- }
- Ok(engine)
- }
-}
-
-impl Default for CompressionEngine {
- fn default() -> Self {
- Self::Zlib { level: 6 }
- }
-}
-
#[derive(Debug, Clone, Copy, PartialEq)]
/// Holds configuration values about how the revlog data is read
pub struct RevlogDataConfig {
@@ -546,7 +477,7 @@
/// Holds configuration values about the available revlog features
pub struct RevlogFeatureConfig {
/// The compression engine and its options
- pub compression_engine: CompressionEngine,
+ pub compression_engine: CompressionConfig,
/// Can we use censor on this revlog
pub censorable: bool,
/// Does this revlog use the "side data" feature
@@ -568,46 +499,11 @@
config: &Config,
requirements: &HashSet<String>,
) -> Result<Self, HgError> {
- let mut feature_config = Self::default();
-
- let zlib_level = config.get_u32(b"storage", b"revlog.zlib.level")?;
- let zstd_level = config.get_u32(b"storage", b"revlog.zstd.level")?;
-
- feature_config.compression_engine = CompressionEngine::default();
-
- for requirement in requirements {
- if requirement.starts_with("revlog-compression-")
- || requirement.starts_with("exp-compression-")
- {
- let split = &mut requirement.splitn(3, '-');
- split.next();
- split.next();
- feature_config.compression_engine = match split.next().unwrap()
- {
- "zstd" => CompressionEngine::zstd(zstd_level)?,
- e => {
- return Err(HgError::UnsupportedFeature(format!(
- "Unsupported compression engine '{e}'"
- )))
- }
- };
- }
- }
- if let Some(level) = zlib_level {
- if matches!(
- feature_config.compression_engine,
- CompressionEngine::Zlib { .. }
- ) {
- feature_config
- .compression_engine
- .set_level(level as usize)?;
- }
- }
-
- feature_config.enable_ellipsis =
- requirements.contains(NARROW_REQUIREMENT);
-
- Ok(feature_config)
+ Ok(Self {
+ compression_engine: CompressionConfig::new(config, requirements)?,
+ enable_ellipsis: requirements.contains(NARROW_REQUIREMENT),
+ ..Default::default()
+ })
}
}
@@ -1058,21 +954,6 @@
hash: Node,
}
-thread_local! {
- // seems fine to [unwrap] here: this can only fail due to memory allocation
- // failing, and it's normal for that to cause panic.
- static ZSTD_DECODER : RefCell<zstd::bulk::Decompressor<'static>> =
- RefCell::new(zstd::bulk::Decompressor::new().ok().unwrap());
-}
-
-fn zstd_decompress_to_buffer(
- bytes: &[u8],
- buf: &mut Vec<u8>,
-) -> Result<usize, std::io::Error> {
- ZSTD_DECODER
- .with(|decoder| decoder.borrow_mut().decompress_to_buffer(bytes, buf))
-}
-
impl<'revlog> RevlogEntry<'revlog> {
pub fn revision(&self) -> Revision {
self.rev
@@ -1218,7 +1099,11 @@
// zlib (RFC 1950) data.
b'x' => Ok(Cow::Owned(self.uncompressed_zlib_data()?)),
// zstd data.
- b'\x28' => Ok(Cow::Owned(self.uncompressed_zstd_data()?)),
+ b'\x28' => Ok(Cow::Owned(uncompressed_zstd_data(
+ self.bytes,
+ self.is_delta(),
+ self.uncompressed_len.max(0),
+ )?)),
// A proper new format should have had a repo/store requirement.
format_type => Err(corrupted(format!(
"unknown compression header '{}'",
@@ -1245,38 +1130,6 @@
}
}
- fn uncompressed_zstd_data(&self) -> Result<Vec<u8>, HgError> {
- let cap = self.uncompressed_len.max(0) as usize;
- if self.is_delta() {
- // [cap] is usually an over-estimate of the space needed because
- // it's the length of delta-decoded data, but we're interested
- // in the size of the delta.
- // This means we have to [shrink_to_fit] to avoid holding on
- // to a large chunk of memory, but it also means we must have a
- // fallback branch, for the case when the delta is longer than
- // the original data (surprisingly, this does happen in practice)
- let mut buf = Vec::with_capacity(cap);
- match zstd_decompress_to_buffer(self.bytes, &mut buf) {
- Ok(_) => buf.shrink_to_fit(),
- Err(_) => {
- buf.clear();
- zstd::stream::copy_decode(self.bytes, &mut buf)
- .map_err(|e| corrupted(e.to_string()))?;
- }
- };
- Ok(buf)
- } else {
- let mut buf = Vec::with_capacity(cap);
- let len = zstd_decompress_to_buffer(self.bytes, &mut buf)
- .map_err(|e| corrupted(e.to_string()))?;
- if len != self.uncompressed_len as usize {
- Err(corrupted("uncompressed length does not match"))
- } else {
- Ok(buf)
- }
- }
- }
-
/// Tell if the entry is a snapshot or a delta
/// (influences on decompression).
fn is_delta(&self) -> bool {