From 241c3aa80231448c9f90b15aa4ff423590acb0c0 Mon Sep 17 00:00:00 2001 From: Icelk Date: Tue, 24 May 2022 23:36:48 +0200 Subject: [PATCH 1/4] Added tests for write::FrameDecoder --- test/tests.rs | 56 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 49 insertions(+), 7 deletions(-) diff --git a/test/tests.rs b/test/tests.rs index e23dafa..224fb8e 100644 --- a/test/tests.rs +++ b/test/tests.rs @@ -74,10 +74,16 @@ macro_rules! testtrip { } #[test] - fn roundtrip_frame() { - use super::{read_frame_depress, write_frame_press}; + fn roundtrip_frame_reader() { + use super::{read_frame_depress_reader, write_frame_press}; let d = &$data[..]; - assert_eq!(d, &*read_frame_depress(&write_frame_press(d))); + assert_eq!(d, &*read_frame_depress_reader(&write_frame_press(d))); + } + #[test] + fn roundtrip_frame_writer() { + use super::{read_frame_depress_writer, write_frame_press}; + let d = &$data[..]; + assert_eq!(d, &*read_frame_depress_writer(&write_frame_press(d))); } #[test] @@ -480,13 +486,28 @@ fn qc_roundtrip() { } #[test] -fn qc_roundtrip_stream() { +fn qc_roundtrip_stream_reader() { fn p(bytes: Vec) -> TestResult { if bytes.is_empty() { return TestResult::discard(); } TestResult::from_bool( - read_frame_depress(&write_frame_press(&bytes)) == bytes, + read_frame_depress_reader(&write_frame_press(&bytes)) == bytes, + ) + } + QuickCheck::new() + .gen(StdGen::new(rand::thread_rng(), 10_000)) + .tests(1_000) + .quickcheck(p as fn(_) -> _); +} +#[test] +fn qc_roundtrip_stream_writer() { + fn p(bytes: Vec) -> TestResult { + if bytes.is_empty() { + return TestResult::discard(); + } + TestResult::from_bool( + read_frame_depress_writer(&write_frame_press(&bytes)) == bytes, ) } QuickCheck::new() @@ -496,7 +517,7 @@ fn qc_roundtrip_stream() { } #[test] -fn test_short_input() { +fn test_short_input_reader() { // Regression test for https://github.com/BurntSushi/rust-snappy/issues/42 use snap::read; use std::io::Read; @@ -505,6 +526,18 @@ fn test_short_input() { read::FrameDecoder::new(&b"123"[..]).read_to_end(&mut Vec::new()); assert_eq!(err.unwrap_err().kind(), std::io::ErrorKind::UnexpectedEof); } +#[test] +fn test_short_input_writer() { + // Regression test for https://github.com/BurntSushi/rust-snappy/issues/42 + use snap::write; + use std::io::Write; + + let buf = Vec::new(); + + let err = write::FrameDecoder::new(buf).write_all(&b"123"[..]); + + assert_eq!(err.unwrap_err().kind(), std::io::ErrorKind::UnexpectedEof); +} #[test] #[cfg(feature = "cpp")] @@ -553,7 +586,7 @@ fn write_frame_press(bytes: &[u8]) -> Vec { wtr.into_inner().unwrap() } -fn read_frame_depress(bytes: &[u8]) -> Vec { +fn read_frame_depress_reader(bytes: &[u8]) -> Vec { use snap::read; use std::io::Read; @@ -561,6 +594,15 @@ fn read_frame_depress(bytes: &[u8]) -> Vec { read::FrameDecoder::new(bytes).read_to_end(&mut buf).unwrap(); buf } +fn read_frame_depress_writer(bytes: &[u8]) -> Vec { + use snap::write; + use std::io::Write; + + let buf = vec![]; + let mut writer = write::FrameDecoder::new(buf); + writer.write_all(bytes).unwrap(); + writer.into_inner().unwrap() +} fn read_frame_press(bytes: &[u8]) -> Vec { use snap::read; From fed7d3b109822e6ee4db7c987112acf82ff608d1 Mon Sep 17 00:00:00 2001 From: Icelk Date: Tue, 24 May 2022 23:38:32 +0200 Subject: [PATCH 2/4] Initial implementation of write::FrameDecoder. --- src/write.rs | 348 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 344 insertions(+), 4 deletions(-) diff --git a/src/write.rs b/src/write.rs index 7975bd1..63e9787 100644 --- a/src/write.rs +++ b/src/write.rs @@ -9,17 +9,357 @@ It would also be possible to provide a `write::FrameDecoder`, which decompresses data as it writes it, but it hasn't been implemented yet. */ -use std::fmt; use std::io::{self, Write}; +use std::{cmp, fmt}; use crate::compress::Encoder; use crate::crc32::CheckSummer; +use crate::decompress::decompress_len; pub use crate::error::IntoInnerError; use crate::frame::{ - compress_frame, CHUNK_HEADER_AND_CRC_SIZE, MAX_COMPRESS_BLOCK_SIZE, - STREAM_IDENTIFIER, + compress_frame, ChunkType, CHUNK_HEADER_AND_CRC_SIZE, + MAX_COMPRESS_BLOCK_SIZE, STREAM_BODY, STREAM_IDENTIFIER, }; -use crate::MAX_BLOCK_SIZE; +use crate::raw::Decoder; +use crate::{bytes, Error, MAX_BLOCK_SIZE}; + +/// A writer for decompressing a Snappy stream. +/// +/// This `FrameDecoder` wraps any other reader that implements `std::io::Write`. +/// Bytes written to this writer are decompressed using the +/// [Snappy frame format](https://github.com/google/snappy/blob/master/framing_format.txt) +/// (file extension `sz`, MIME type `application/x-snappy-framed`). +/// +/// Writes are buffered automatically, so there's no need to wrap the given +/// writer in a `std::io::BufWriter`. +/// +/// The writer will be flushed automatically when it is dropped. If an error +/// occurs, it is ignored. +pub struct FrameDecoder { + /// The underlying reader. + /// + /// An option so we can move out of it. + w: Option, + /// A Snappy decoder that we reuse that does the actual block based + /// decompression. + dec: Decoder, + /// A CRC32 checksummer that is configured to either use the portable + /// fallback version or the SSE4.2 accelerated version when the right CPU + /// features are available. + checksummer: CheckSummer, + /// The compressed bytes buffer, taken from the underlying reader. + src: Vec, + /// The current working starting position in `src`. + src_pos: usize, + /// The current working length of `src`, `src[src_pos..src_pos + src_len]`. + src_len: usize, + /// The decompressed bytes buffer. Bytes are decompressed from src to dst + /// before being passed back to the caller. + dst: Vec, + /// Index into dst: starting point of bytes not yet given back to caller. + dsts: usize, + /// Index into dst: ending point of bytes not yet given back to caller. + dste: usize, + /// Whether we've read the special stream header or not. + read_stream_ident: bool, +} + +impl FrameDecoder { + /// Create a new writer for streaming Snappy decompression. + pub fn new(wtr: W) -> FrameDecoder { + FrameDecoder { + w: Some(wtr), + dec: Decoder::new(), + checksummer: CheckSummer::new(), + src: vec![0; MAX_COMPRESS_BLOCK_SIZE], + src_pos: 0, + src_len: 0, + dst: vec![0; MAX_BLOCK_SIZE], + dsts: 0, + dste: 0, + read_stream_ident: false, + } + } + + /// Gets a reference to the underlying writer in this decoder. + pub fn get_ref(&self) -> &W { + self.w.as_ref().unwrap() + } + + /// Gets a mutable reference to the underlying writer in this decoder. + /// + /// Note that mutation of the stream may result in surprising results if + /// this decoder is continued to be used. + pub fn get_mut(&mut self) -> &mut W { + self.w.as_mut().unwrap() + } + + /// Finish decoding and return the underlying writer. + pub fn into_inner(mut self) -> io::Result { + self.flush()?; + Ok(self.w.take().unwrap()) + } + + /// Same as [`Self::read_exact`] but also advance the `src` pointer. + fn advance_exact(&mut self, len: usize) -> Option<&[u8]> { + if len > self.src_len { + return None; + } + let range = self.src_pos..self.src_pos + len; + self.src_pos += len; + self.src_len = self.src_len.checked_sub(len).unwrap(); + self.src.get(range) + } + /// Read `len` bytes from `src` with a start offset of `start`. + /// Returns [`None`] (which you should pass on to your caller) if + /// we don't have enough data in `src`. + fn read_exact(&self, start: usize, len: usize) -> Option<&[u8]> { + if len + start > self.src_len { + return None; + } + Some(&self.src[self.src_pos + start..self.src_pos + start + len]) + } + + fn write_from_buffer(&mut self) -> Option> { + macro_rules! fail { + ($err:expr) => { + return Some(Err(io::Error::from($err))) + }; + } + loop { + if self.dsts < self.dste { + let len = self.dste - self.dsts; + let dste = self.dsts.checked_add(len).unwrap(); + let r = + self.w.as_mut().unwrap().write(&self.dst[self.dsts..dste]); + self.dsts = dste; + return Some(r.map(|_| ())); + } + let first_byte = self.read_exact(0, 4)?[0]; + let ty = ChunkType::from_u8(first_byte); + if !self.read_stream_ident { + if ty != Ok(ChunkType::Stream) { + fail!(Error::StreamHeader { byte: first_byte }); + } + self.read_stream_ident = true; + } + // we need &mut above, so get the reference again to please borrow checker + let read = self.read_exact(0, 4)?; + let len64 = bytes::read_u24_le(&read[1..]) as u64; + if len64 > self.src_len as u64 { + return None; + } + let len = len64 as usize; + match ty { + Err(b) if 0x02 <= b && b <= 0x7F => { + // Spec says that chunk types 0x02-0x7F are reserved and + // conformant decoders must return an error. + fail!(Error::UnsupportedChunkType { byte: b }); + } + Err(b) if 0x80 <= b && b <= 0xFD => { + // Spec says that chunk types 0x80-0xFD are reserved but + // skippable. + self.advance_exact(len + 4).unwrap(); + } + Err(b) => { + // Can never happen. 0x02-0x7F and 0x80-0xFD are handled + // above in the error case. That leaves 0x00, 0x01, 0xFE + // and 0xFF, each of which correspond to one of the four + // defined chunk types. + unreachable!("BUG: unhandled chunk type: {}", b); + } + Ok(ChunkType::Padding) => { + // Just read and move on. + self.advance_exact(len + 4).unwrap(); + } + Ok(ChunkType::Stream) => { + if len != STREAM_BODY.len() { + fail!(Error::UnsupportedChunkLength { + len: len64, + header: true, + }) + } + // unwrap: we asserted above that `len` fits, and that `len>=4`. + let read = self.read_exact(4, len).unwrap(); + if &read[0..len] != STREAM_BODY { + fail!(Error::StreamHeaderMismatch { + bytes: read[0..len].to_vec(), + }); + } + self.advance_exact(4 + len).unwrap(); + } + Ok(ChunkType::Uncompressed) => { + if len < 4 { + fail!(Error::UnsupportedChunkLength { + len: len as u64, + header: false, + }); + } + // unwrap: we asserted above that `len` fits, and that `len>=4`. + let expected_sum = + bytes::read_u32_le(self.read_exact(4, 4).unwrap()); + let n = len - 4; + if n > self.dst.len() { + fail!(Error::UnsupportedChunkLength { + len: n as u64, + header: false, + }); + } + // inline self.read_exact due to needing to borrow both immutably and mutably + // + // self.read_exact(8, n) + if n + 8 > self.src_len { + return None; + } + let read = self + .src + .get(self.src_pos + 8..self.src_pos + 8 + n)?; + + self.dst[0..n].copy_from_slice(read); + let got_sum = + self.checksummer.crc32c_masked(&self.dst[0..n]); + if expected_sum != got_sum { + fail!(Error::Checksum { + expected: expected_sum, + got: got_sum, + }); + } + self.advance_exact(8 + n).unwrap(); + self.dsts = 0; + self.dste = n; + } + Ok(ChunkType::Compressed) => { + if len < 4 { + fail!(Error::UnsupportedChunkLength { + len: len as u64, + header: false, + }); + } + // unwrap: we asserted above that `len` fits, and that `len>=4`. + let expected_sum = + bytes::read_u32_le(self.read_exact(4, 4).unwrap()); + let sn = len - 4; + if sn > self.src.len() { + fail!(Error::UnsupportedChunkLength { + len: len64, + header: false, + }); + } + // inline self.read_exact due to needing to borrow both immutably and mutably + // + // self.read_exact(8, n) + if sn + 8 > self.src_len { + return None; + } + let read = self + .src + .get(self.src_pos + 8..self.src_pos + 8 + sn)?; + + let dn = match decompress_len(read) { + Err(err) => fail!(err), + Ok(dn) => dn, + }; + if dn > self.dst.len() { + fail!(Error::UnsupportedChunkLength { + len: dn as u64, + header: false, + }); + } + if let Err(err) = + self.dec.decompress(read, &mut self.dst[0..dn]) + { + fail!(err) + }; + let got_sum = + self.checksummer.crc32c_masked(&self.dst[0..dn]); + if expected_sum != got_sum { + fail!(Error::Checksum { + expected: expected_sum, + got: got_sum, + }); + } + self.advance_exact(8 + sn).unwrap(); + self.dsts = 0; + self.dste = dn; + } + } + } + } +} + +impl io::Write for FrameDecoder { + fn write(&mut self, mut buf: &[u8]) -> io::Result { + let initial_len = buf.len(); + loop { + if let Some(r) = self.write_from_buffer() { + r?; + } else { + if buf.is_empty() { + return if self.src_len == 0 { + Ok(initial_len - buf.len()) + } else { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "snappy wants more data to decompress", + )) + }; + } + // move rest of src to start + self.src + .copy_within(self.src_pos..self.src_pos + self.src_len, 0); + self.src_pos = 0; + + // copy more from `buf` + let len = cmp::min(self.src.len() - self.src_len, buf.len()); + self.src[self.src_len..self.src_len + len] + .copy_from_slice(&buf[..len]); + self.src_len += len; + buf = &buf[len..]; + } + } + } + fn flush(&mut self) -> io::Result<()> { + let r = if let Some(r) = self.write_from_buffer() { + r.map(|_| ()) + } else if self.src_len == 0 { + Ok(()) + } else { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "snappy wants more data to decompress", + )) + }; + self.w.as_mut().unwrap().flush()?; + r + } +} + +impl Drop for FrameDecoder { + fn drop(&mut self) { + if self.w.is_some() { + // Ignore errors because we can't conceivably return an error and + // panicing in a dtor is bad juju. + let _ = self.flush(); + } + } +} + +impl fmt::Debug for FrameDecoder { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("FrameDecoder") + .field("w", self.w.as_ref().unwrap()) + .field("dec", &self.dec) + .field("checksummer", &self.checksummer) + .field("src", &"[...]") + .field("src_pos", &self.src_pos) + .field("src_len", &self.src_len) + .field("dst", &"[...]") + .field("dsts", &self.dsts) + .field("dste", &self.dste) + .field("read_stream_ident", &self.read_stream_ident) + .finish() + } +} /// A writer for compressing a Snappy stream. /// From c3ad9310d48ca96cbb3edea59f94ec16f53c127a Mon Sep 17 00:00:00 2001 From: Icelk Date: Wed, 25 May 2022 13:13:06 +0200 Subject: [PATCH 3/4] Added more comments and changed some properties to be more coherent to the rest of snap. --- src/write.rs | 76 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 44 insertions(+), 32 deletions(-) diff --git a/src/write.rs b/src/write.rs index 63e9787..6ff38ee 100644 --- a/src/write.rs +++ b/src/write.rs @@ -49,10 +49,10 @@ pub struct FrameDecoder { checksummer: CheckSummer, /// The compressed bytes buffer, taken from the underlying reader. src: Vec, - /// The current working starting position in `src`. - src_pos: usize, - /// The current working length of `src`, `src[src_pos..src_pos + src_len]`. - src_len: usize, + /// Index into src: starting point of bytes not yet decompressed. + srcs: usize, + /// Index into src: ending point of bytes not yet decompressed. + srce: usize, /// The decompressed bytes buffer. Bytes are decompressed from src to dst /// before being passed back to the caller. dst: Vec, @@ -72,8 +72,8 @@ impl FrameDecoder { dec: Decoder::new(), checksummer: CheckSummer::new(), src: vec![0; MAX_COMPRESS_BLOCK_SIZE], - src_pos: 0, - src_len: 0, + srcs: 0, + srce: 0, dst: vec![0; MAX_BLOCK_SIZE], dsts: 0, dste: 0, @@ -100,26 +100,31 @@ impl FrameDecoder { Ok(self.w.take().unwrap()) } - /// Same as [`Self::read_exact`] but also advance the `src` pointer. + /// Same as [`Self::read_exact`] but also advance `srcs`. + /// + /// If this returns [`None`] (we don't have enough data), the pointer isn't advanced. fn advance_exact(&mut self, len: usize) -> Option<&[u8]> { - if len > self.src_len { + if len + self.srcs > self.srce { return None; } - let range = self.src_pos..self.src_pos + len; - self.src_pos += len; - self.src_len = self.src_len.checked_sub(len).unwrap(); + let range = self.srcs..self.srcs + len; + self.srcs += len; + debug_assert!(self.srcs <= self.srce); self.src.get(range) } /// Read `len` bytes from `src` with a start offset of `start`. /// Returns [`None`] (which you should pass on to your caller) if /// we don't have enough data in `src`. fn read_exact(&self, start: usize, len: usize) -> Option<&[u8]> { - if len + start > self.src_len { + if len + self.srcs + start > self.srce { return None; } - Some(&self.src[self.src_pos + start..self.src_pos + start + len]) + Some(&self.src[self.srcs + start..self.srcs + start + len]) } + /// Tries to write data from the `src` buffer to our writer. + /// + /// Based of the implementation of [`crate::read::FrameDecoder`]. fn write_from_buffer(&mut self) -> Option> { macro_rules! fail { ($err:expr) => { @@ -146,7 +151,7 @@ impl FrameDecoder { // we need &mut above, so get the reference again to please borrow checker let read = self.read_exact(0, 4)?; let len64 = bytes::read_u24_le(&read[1..]) as u64; - if len64 > self.src_len as u64 { + if len64 + self.srcs as u64 > self.srce as u64 { return None; } let len = len64 as usize; @@ -208,12 +213,11 @@ impl FrameDecoder { // inline self.read_exact due to needing to borrow both immutably and mutably // // self.read_exact(8, n) - if n + 8 > self.src_len { + if n + 8 + self.srcs > self.srce { return None; } - let read = self - .src - .get(self.src_pos + 8..self.src_pos + 8 + n)?; + let read = + self.src.get(self.srcs + 8..self.srcs + 8 + n)?; self.dst[0..n].copy_from_slice(read); let got_sum = @@ -224,6 +228,9 @@ impl FrameDecoder { got: got_sum, }); } + // we read 4 bytes for the chunk type + frame length, + // 4 bytes for the expected sum, + // and `n` bytes for the data. self.advance_exact(8 + n).unwrap(); self.dsts = 0; self.dste = n; @@ -248,12 +255,11 @@ impl FrameDecoder { // inline self.read_exact due to needing to borrow both immutably and mutably // // self.read_exact(8, n) - if sn + 8 > self.src_len { + if sn + 8 + self.srcs > self.srce { return None; } - let read = self - .src - .get(self.src_pos + 8..self.src_pos + 8 + sn)?; + let read = + self.src.get(self.srcs + 8..self.srcs + 8 + sn)?; let dn = match decompress_len(read) { Err(err) => fail!(err), @@ -278,6 +284,9 @@ impl FrameDecoder { got: got_sum, }); } + // we read 4 bytes for the chunk type + frame length, + // 4 bytes for the expected sum, + // and `sn` bytes for the data. self.advance_exact(8 + sn).unwrap(); self.dsts = 0; self.dste = dn; @@ -294,8 +303,10 @@ impl io::Write for FrameDecoder { if let Some(r) = self.write_from_buffer() { r?; } else { + // we can no longer provide more data to the implementation + // - request more from the caller. if buf.is_empty() { - return if self.src_len == 0 { + return if self.srce == self.srcs { Ok(initial_len - buf.len()) } else { Err(io::Error::new( @@ -305,15 +316,16 @@ impl io::Write for FrameDecoder { }; } // move rest of src to start - self.src - .copy_within(self.src_pos..self.src_pos + self.src_len, 0); - self.src_pos = 0; + let len = self.srce - self.srcs; + self.src.copy_within(self.srcs..self.srce, 0); + self.srce = len; + self.srcs = 0; // copy more from `buf` - let len = cmp::min(self.src.len() - self.src_len, buf.len()); - self.src[self.src_len..self.src_len + len] + let len = cmp::min(self.src.len() - self.srce, buf.len()); + self.src[self.srce..self.srce + len] .copy_from_slice(&buf[..len]); - self.src_len += len; + self.srce += len; buf = &buf[len..]; } } @@ -321,7 +333,7 @@ impl io::Write for FrameDecoder { fn flush(&mut self) -> io::Result<()> { let r = if let Some(r) = self.write_from_buffer() { r.map(|_| ()) - } else if self.src_len == 0 { + } else if self.srce == self.srcs { Ok(()) } else { Err(io::Error::new( @@ -351,8 +363,8 @@ impl fmt::Debug for FrameDecoder { .field("dec", &self.dec) .field("checksummer", &self.checksummer) .field("src", &"[...]") - .field("src_pos", &self.src_pos) - .field("src_len", &self.src_len) + .field("src_pos", &self.srcs) + .field("src_len", &self.srce) .field("dst", &"[...]") .field("dsts", &self.dsts) .field("dste", &self.dste) From 223e6cf23b63e695b182656b89516df0f2f4eac9 Mon Sep 17 00:00:00 2001 From: Icelk Date: Wed, 25 May 2022 13:16:11 +0200 Subject: [PATCH 4/4] Removed no-op code. --- src/write.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/write.rs b/src/write.rs index 6ff38ee..ebe862e 100644 --- a/src/write.rs +++ b/src/write.rs @@ -57,6 +57,8 @@ pub struct FrameDecoder { /// before being passed back to the caller. dst: Vec, /// Index into dst: starting point of bytes not yet given back to caller. + /// + /// This is always 0, but is kept to be coherent. dsts: usize, /// Index into dst: ending point of bytes not yet given back to caller. dste: usize, @@ -133,11 +135,13 @@ impl FrameDecoder { } loop { if self.dsts < self.dste { - let len = self.dste - self.dsts; - let dste = self.dsts.checked_add(len).unwrap(); - let r = - self.w.as_mut().unwrap().write(&self.dst[self.dsts..dste]); - self.dsts = dste; + let r = self + .w + .as_mut() + .unwrap() + .write(&self.dst[self.dsts..self.dste]); + self.dsts = 0; + self.dste = 0; return Some(r.map(|_| ())); } let first_byte = self.read_exact(0, 4)?[0];