diff --git a/Cargo.lock b/Cargo.lock index 4bc87e2..a8682f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -86,11 +86,20 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", "winapi", ] +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -414,6 +423,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" + [[package]] name = "hex" version = "0.4.3" @@ -521,6 +536,16 @@ dependencies = [ "libc", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi 0.3.3", + "libc", +] + [[package]] name = "num_enum" version = "0.6.1" @@ -710,6 +735,7 @@ dependencies = [ "walkdir", "xattr", "zstd", + "zstd-seekable", ] [[package]] @@ -936,6 +962,15 @@ dependencies = [ "syn", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + [[package]] name = "time" version = "0.3.28" @@ -1191,6 +1226,22 @@ dependencies = [ "zstd-sys", ] +[[package]] +name = "zstd-seekable" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "574a117c5cdb88d1f13381ee3a19a6a45fb6ca0c98436d3a95df852b7ca6c3c2" +dependencies = [ + "bincode", + "cc", + "libc", + "pkg-config", + "serde", + "serde_derive", + "thiserror", + "threadpool", +] + [[package]] name = "zstd-sys" version = "2.0.8+zstd.1.5.5" diff --git a/puzzlefs-lib/Cargo.toml b/puzzlefs-lib/Cargo.toml index c14e87f..57952d4 100644 --- a/puzzlefs-lib/Cargo.toml +++ b/puzzlefs-lib/Cargo.toml @@ -39,6 +39,8 @@ fuser = {version = "0.11.1", default-features = false} os_pipe = "1.1.2" tempfile = "3.8.0" openat = "0.1.21" +zstd-seekable = "0.1.23" + [dev-dependencies] tempfile = "3.8.0" diff --git a/puzzlefs-lib/src/builder.rs b/puzzlefs-lib/src/builder.rs index c3e687a..533d8a3 100644 --- a/puzzlefs-lib/src/builder.rs +++ b/puzzlefs-lib/src/builder.rs @@ -106,7 +106,7 @@ fn serialize_metadata(inodes: Vec) -> Result> { Ok(buf) } -fn process_chunks Compression<'a> + Any>( +fn process_chunks( oci: &Image, mut chunker: StreamCDC, files: &mut [File], @@ -178,7 +178,7 @@ fn process_chunks Compression<'a> + Any>( Ok(()) } -fn build_delta Compression<'a> + Any>( +fn build_delta( rootfs: &Path, oci: &Image, mut existing: Option, @@ -414,7 +414,7 @@ fn build_delta Compression<'a> + Any>( Ok(desc) } -pub fn build_initial_rootfs Compression<'a> + Any>( +pub fn build_initial_rootfs( rootfs: &Path, oci: &Image, ) -> Result { @@ -440,7 +440,7 @@ pub fn build_initial_rootfs Compression<'a> + Any>( // add_rootfs_delta adds whatever the delta between the current rootfs and the puzzlefs // representation from the tag is. -pub fn add_rootfs_delta Compression<'a> + Any>( +pub fn add_rootfs_delta( rootfs_path: &Path, oci: Image, tag: &str, @@ -554,7 +554,7 @@ pub mod tests { // there should be a blob that matches the hash of the test data, since it all gets input // as one chunk and there's only one file const FILE_DIGEST: &str = - "a7b1fbc3c77f9ffc40c051e3608d607d63eebcd23c559958043eccb64bdab7ff"; + "3eee1082ab3babf6c1595f1069d11ebc2a60135890a11e402e017ddd831a220d"; let md = fs::symlink_metadata(image.blob_path().join(FILE_DIGEST)).unwrap(); assert!(md.is_file()); diff --git a/puzzlefs-lib/src/compression.rs b/puzzlefs-lib/src/compression.rs index f86478b..44bdb28 100644 --- a/puzzlefs-lib/src/compression.rs +++ b/puzzlefs-lib/src/compression.rs @@ -4,21 +4,21 @@ use std::io::Seek; mod noop; pub use noop::Noop; -mod zstd_wrapper; -pub use zstd_wrapper::*; +mod zstd_seekable_wrapper; +pub use zstd_seekable_wrapper::*; pub trait Compressor: io::Write { // https://users.rust-lang.org/t/how-to-move-self-when-using-dyn-trait/50123 fn end(self: Box) -> io::Result<()>; } -pub trait Decompressor: io::Read + io::Seek + Send { +pub trait Decompressor: io::Read + io::Seek { fn get_uncompressed_length(&mut self) -> io::Result; } -pub trait Compression<'a> { - fn compress(dest: W) -> io::Result>; - fn decompress( +pub trait Compression { + fn compress<'a, W: std::io::Write + 'a>(dest: W) -> io::Result>; + fn decompress<'a, R: std::io::Read + Seek + 'a>( source: R, ) -> io::Result>; fn append_extension(media_type: &str) -> String; @@ -31,7 +31,7 @@ mod tests { pub const TRUTH: &str = "meshuggah rocks"; - pub fn compress_decompress Compression<'a>>() -> anyhow::Result<()> { + pub fn compress_decompress() -> anyhow::Result<()> { let f = NamedTempFile::new()?; let mut compressed = C::compress(f.reopen()?)?; compressed.write_all(TRUTH.as_bytes())?; @@ -45,7 +45,7 @@ mod tests { Ok(()) } - pub fn compression_is_seekable Compression<'a>>() -> anyhow::Result<()> { + pub fn compression_is_seekable() -> anyhow::Result<()> { let f = NamedTempFile::new()?; let mut compressed = C::compress(f.reopen()?)?; compressed.write_all(TRUTH.as_bytes())?; diff --git a/puzzlefs-lib/src/compression/noop.rs b/puzzlefs-lib/src/compression/noop.rs index 52c2d1c..83f1462 100644 --- a/puzzlefs-lib/src/compression/noop.rs +++ b/puzzlefs-lib/src/compression/noop.rs @@ -24,36 +24,36 @@ impl Compressor for NoopCompressor { } } -pub struct NoopDecompressor { +pub struct NoopDecompressor { decoder: Box, } -impl Seek for NoopDecompressor { +impl Seek for NoopDecompressor { fn seek(&mut self, offset: io::SeekFrom) -> io::Result { self.decoder.seek(offset) } } -impl Read for NoopDecompressor { +impl Read for NoopDecompressor { fn read(&mut self, out: &mut [u8]) -> io::Result { self.decoder.read(out) } } -impl Decompressor for NoopDecompressor { +impl Decompressor for NoopDecompressor { fn get_uncompressed_length(&mut self) -> io::Result { self.decoder.stream_len() } } -impl<'a> Compression<'a> for Noop { - fn compress(dest: W) -> io::Result> { +impl Compression for Noop { + fn compress<'a, W: std::io::Write + 'a>(dest: W) -> io::Result> { Ok(Box::new(NoopCompressor { encoder: Box::new(dest), })) } - fn decompress( + fn decompress<'a, R: std::io::Read + Seek + 'a>( source: R, ) -> io::Result> { Ok(Box::new(NoopDecompressor { diff --git a/puzzlefs-lib/src/compression/zstd_seekable_wrapper.rs b/puzzlefs-lib/src/compression/zstd_seekable_wrapper.rs new file mode 100644 index 0000000..b10da92 --- /dev/null +++ b/puzzlefs-lib/src/compression/zstd_seekable_wrapper.rs @@ -0,0 +1,165 @@ +use std::cmp::min; +use std::convert::TryFrom; +use std::io; +use std::io::{Read, Seek, Write}; + +use zstd_seekable::{CStream, Seekable, SeekableCStream}; + +use crate::compression::{Compression, Compressor, Decompressor}; + +// We compress files in 4KB frames; it's not clear what the ideal size for this is, but each frame +// is compressed independently so the bigger they are the more compression savings we get. However, +// the bigger they are the more decompression we have to do to get to the data in the middle of a +// frame if someone e.g. mmap()s something in the middle of a frame. +// +// Another consideration is the average chunk size from FastCDC: if we make this the same as the +// chunk size, there's no real point in using seekable compression at all, at least for files. It's +// also possible that we want different frame sizes for metadata blobs and file content. +const FRAME_SIZE: usize = 4096; +const COMPRESSION_LEVEL: usize = 3; + +fn err_to_io(e: E) -> io::Error { + io::Error::new(io::ErrorKind::Other, e) +} + +pub struct ZstdCompressor { + f: W, + stream: SeekableCStream, + buf: Vec, +} + +impl Compressor for ZstdCompressor { + fn end(mut self: Box) -> io::Result<()> { + // end_stream has to be called multiple times until 0 is returned, see + // https://docs.rs/zstd-seekable/0.1.23/src/zstd_seekable/lib.rs.html#224-237 and + // https://fossies.org/linux/zstd/contrib/seekable_format/zstd_seekable.h + loop { + let size = self.stream.end_stream(&mut self.buf).map_err(err_to_io)?; + self.f.write_all(&self.buf[0..size])?; + if size == 0 { + break; + } + } + Ok(()) + } +} + +impl Write for ZstdCompressor { + fn write(&mut self, buf: &[u8]) -> io::Result { + // TODO: we could try to consume all the input, but for now we just consume a single block + let (out_pos, in_pos) = self + .stream + .compress(&mut self.buf, buf) + .map_err(err_to_io)?; + self.f.write_all(&self.buf[0..out_pos])?; + Ok(in_pos) + } + + fn flush(&mut self) -> io::Result<()> { + // we could self.stream.flush(), but that adversely affects compression ratio... let's + // cheat for now. + Ok(()) + } +} + +pub struct ZstdDecompressor<'a, R: Read + Seek> { + stream: Seekable<'a, R>, + offset: u64, + uncompressed_length: u64, +} + +impl<'a, R: Seek + Read> Decompressor for ZstdDecompressor<'a, R> { + fn get_uncompressed_length(&mut self) -> io::Result { + Ok(self.uncompressed_length) + } +} + +impl<'a, R: Seek + Read> Seek for ZstdDecompressor<'a, R> { + fn seek(&mut self, offset: io::SeekFrom) -> io::Result { + match offset { + io::SeekFrom::Start(s) => { + self.offset = s; + } + io::SeekFrom::End(e) => { + if e > 0 { + return Err(io::Error::new(io::ErrorKind::Other, "zstd seek past end")); + } + self.offset = self.uncompressed_length - u64::try_from(-e).map_err(err_to_io)?; + } + io::SeekFrom::Current(c) => { + if c > 0 { + self.offset += u64::try_from(c).map_err(err_to_io)?; + } else { + self.offset -= u64::try_from(-c).map_err(err_to_io)?; + } + } + } + Ok(self.offset) + } +} + +impl<'a, R: Seek + Read> Read for ZstdDecompressor<'a, R> { + fn read(&mut self, out: &mut [u8]) -> io::Result { + // decompress() gets angry (ZSTD("Corrupted block detected")) if you pass it a buffer + // longer than the uncompressable data, so let's be careful to truncate the buffer if it + // would make zstd angry. maybe soon they'll implement a real read() API :) + let end = min(out.len(), (self.uncompressed_length - self.offset) as usize); + let size = self + .stream + .decompress(&mut out[0..end], self.offset) + .map_err(err_to_io)?; + self.offset += size as u64; + Ok(size) + } +} + +pub struct Zstd {} + +impl Compression for Zstd { + fn compress<'a, W: Write + 'a>(dest: W) -> io::Result> { + // a "pretty high" compression level, since decompression should be nearly the same no + // matter what compression level. Maybe we should turn this to 22 or whatever the max is... + let stream = SeekableCStream::new(COMPRESSION_LEVEL, FRAME_SIZE).map_err(err_to_io)?; + Ok(Box::new(ZstdCompressor { + f: dest, + stream, + buf: vec![0_u8; CStream::out_size()], + })) + } + + fn decompress<'a, R: Read + Seek + 'a>(source: R) -> io::Result> { + let stream = Seekable::init(Box::new(source)).map_err(err_to_io)?; + + // zstd-seekable doesn't like it when we pass a buffer past the end of the uncompressed + // stream, so let's figure out the size of the uncompressed file so we can implement + // ::read() in a reasonable way. This also lets us implement SeekFrom::End. + let uncompressed_length = (0..stream.get_num_frames()) + .map(|i| stream.get_frame_decompressed_size(i) as u64) + .sum(); + Ok(Box::new(ZstdDecompressor { + stream, + offset: 0, + uncompressed_length, + })) + } + + fn append_extension(media_type: &str) -> String { + format!("{media_type}+zstd") + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::compression::tests::{compress_decompress, compression_is_seekable}; + + #[test] + fn test_ztsd_roundtrip() -> anyhow::Result<()> { + compress_decompress::() + } + + #[test] + fn test_zstd_seekable() -> anyhow::Result<()> { + compression_is_seekable::() + } +} diff --git a/puzzlefs-lib/src/compression/zstd_wrapper.rs b/puzzlefs-lib/src/compression/zstd_wrapper.rs deleted file mode 100644 index d94f4eb..0000000 --- a/puzzlefs-lib/src/compression/zstd_wrapper.rs +++ /dev/null @@ -1,127 +0,0 @@ -use crate::common::MAX_CHUNK_SIZE; -use std::cmp::min; -use std::convert::TryFrom; -use std::convert::TryInto; -use std::io; -use std::io::{Read, Write}; - -use crate::compression::{Compression, Compressor, Decompressor}; - -const COMPRESSION_LEVEL: i32 = 3; - -fn err_to_io(e: E) -> io::Error { - io::Error::new(io::ErrorKind::Other, e) -} - -pub struct ZstdCompressor { - encoder: zstd::stream::write::Encoder<'static, W>, -} - -impl Compressor for ZstdCompressor { - fn end(self: Box) -> io::Result<()> { - self.encoder.finish()?; - Ok(()) - } -} - -impl io::Write for ZstdCompressor { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.encoder.write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - self.encoder.flush() - } -} - -pub struct ZstdDecompressor { - buf: Vec, - offset: u64, - uncompressed_length: u64, -} - -impl Decompressor for ZstdDecompressor { - fn get_uncompressed_length(&mut self) -> io::Result { - Ok(self.uncompressed_length) - } -} - -impl io::Seek for ZstdDecompressor { - fn seek(&mut self, offset: io::SeekFrom) -> io::Result { - match offset { - io::SeekFrom::Start(s) => { - self.offset = s; - } - io::SeekFrom::End(e) => { - if e > 0 { - return Err(io::Error::new(io::ErrorKind::Other, "zstd seek past end")); - } - self.offset = self.uncompressed_length - u64::try_from(-e).map_err(err_to_io)?; - } - io::SeekFrom::Current(c) => { - if c > 0 { - self.offset += u64::try_from(c).map_err(err_to_io)?; - } else { - self.offset -= u64::try_from(-c).map_err(err_to_io)?; - } - } - } - Ok(self.offset) - } -} - -impl io::Read for ZstdDecompressor { - fn read(&mut self, out: &mut [u8]) -> io::Result { - let len = min( - out.len(), - (self.uncompressed_length - self.offset) - .try_into() - .map_err(err_to_io)?, - ); - let offset: usize = self.offset.try_into().map_err(err_to_io)?; - out[..len].copy_from_slice(&self.buf[offset..offset + len]); - Ok(len) - } -} -pub struct Zstd {} - -impl<'a> Compression<'a> for Zstd { - fn compress(dest: W) -> io::Result> { - let encoder = zstd::stream::write::Encoder::new(dest, COMPRESSION_LEVEL)?; - Ok(Box::new(ZstdCompressor { encoder })) - } - - fn decompress(mut source: R) -> io::Result> { - let mut contents = Vec::new(); - source.read_to_end(&mut contents)?; - let mut decompressor = zstd::bulk::Decompressor::new()?; - let decompressed_buffer = - decompressor.decompress(&contents, MAX_CHUNK_SIZE.try_into().map_err(err_to_io)?)?; - let uncompressed_length = decompressed_buffer.len(); - Ok(Box::new(ZstdDecompressor { - buf: decompressed_buffer, - offset: 0, - uncompressed_length: uncompressed_length.try_into().map_err(err_to_io)?, - })) - } - - fn append_extension(media_type: &str) -> String { - format!("{media_type}+zstd") - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::compression::tests::{compress_decompress, compression_is_seekable}; - - #[test] - fn test_ztsd_roundtrip() -> anyhow::Result<()> { - compress_decompress::() - } - - #[test] - fn test_zstd_seekable() -> anyhow::Result<()> { - compression_is_seekable::() - } -} diff --git a/puzzlefs-lib/src/oci.rs b/puzzlefs-lib/src/oci.rs index 97fd96d..f99a520 100644 --- a/puzzlefs-lib/src/oci.rs +++ b/puzzlefs-lib/src/oci.rs @@ -83,7 +83,7 @@ impl Image { PathBuf::from("blobs/sha256") } - pub fn put_blob Compression<'a> + Any, MT: media_types::MediaType>( + pub fn put_blob( &self, buf: &[u8], ) -> Result<(Descriptor, [u8; SHA256_BLOCK_SIZE], bool)> { @@ -149,7 +149,7 @@ impl Image { Ok(file) } - pub fn open_compressed_blob Compression<'a>>( + pub fn open_compressed_blob( &self, digest: &Digest, verity: Option<&[u8]>, @@ -176,7 +176,7 @@ impl Image { Ok(file) } - pub fn open_rootfs_blob Compression<'a>>( + pub fn open_rootfs_blob( &self, tag: &str, verity: Option<&[u8]>,