Add Zstd compression support, Make block size configurable via IndexSettings #1374
Changes from all commits: aaa22ad, 03040ed, d4e5b48, 152e823, 0759bf9, 6837a4d, fc045e6, c95013b
New file: the `compression_zstd_block` module (50 added lines).

```rust
use std::io;

use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
use zstd::DEFAULT_COMPRESSION_LEVEL;

#[inline]
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
    let count_size = std::mem::size_of::<u32>();
    let max_size = zstd::zstd_safe::compress_bound(uncompressed.len()) + count_size;

    compressed.clear();
    compressed.resize(max_size, 0);

    let compressed_size = compress_to_buffer(
        uncompressed,
        &mut compressed[count_size..],
        DEFAULT_COMPRESSION_LEVEL,
    )?;

    compressed[0..count_size].copy_from_slice(&(uncompressed.len() as u32).to_le_bytes());
    compressed.resize(compressed_size + count_size, 0);

    Ok(())
}

#[inline]
pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
    let count_size = std::mem::size_of::<u32>();
    let uncompressed_size = u32::from_le_bytes(
        compressed
            .get(..count_size)
            .ok_or(io::ErrorKind::InvalidData)?
            .try_into()
            .unwrap(),
    ) as usize;

    decompressed.clear();
    decompressed.resize(uncompressed_size, 0);

    let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)?;

    if decompressed_size != uncompressed_size {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "doc store block not completely decompressed, data corruption".to_string(),
        ));
    }

    Ok(())
}
```
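For readers skimming the diff: each block is framed as a 4-byte little-endian prefix holding the uncompressed size, followed by the zstd frame; `decompress` reads the prefix to size its output buffer and then checks that the decompressed length matches. The round-trip sketch below is not part of the PR, just an illustration of how the two helpers above fit together:

```rust
#[cfg(test)]
mod tests {
    use super::{compress, decompress};

    #[test]
    fn test_zstd_block_roundtrip() -> std::io::Result<()> {
        // Repetitive payload, so the compressed output is smaller than the input.
        let original: Vec<u8> = b"some highly repetitive payload ".repeat(100);
        let mut compressed = Vec::new();
        let mut decompressed = Vec::new();

        compress(&original, &mut compressed)?;
        // The 4-byte little-endian length prefix comes first, then the zstd frame.
        assert_eq!(&compressed[..4], &(original.len() as u32).to_le_bytes());

        decompress(&compressed, &mut decompressed)?;
        assert_eq!(decompressed, original);
        Ok(())
    }
}
```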
Changes to the store module's tests and benchmarks (the module that declares `compression_zstd_block`):

```diff
@@ -50,6 +50,9 @@ mod compression_brotli;
 #[cfg(feature = "snappy-compression")]
 mod compression_snap;
 
+#[cfg(feature = "zstd-compression")]
+mod compression_zstd_block;
+
 #[cfg(test)]
 pub mod tests {
@@ -69,18 +72,21 @@ pub mod tests {
         sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt \
         mollit anim id est laborum.";
 
+    const BLOCK_SIZE: usize = 16_384;
+
     pub fn write_lorem_ipsum_store(
         writer: WritePtr,
         num_docs: usize,
         compressor: Compressor,
+        blocksize: usize,
     ) -> Schema {
         let mut schema_builder = Schema::builder();
         let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored());
         let field_title =
             schema_builder.add_text_field("title", TextOptions::default().set_stored());
         let schema = schema_builder.build();
         {
-            let mut store_writer = StoreWriter::new(writer, compressor);
+            let mut store_writer = StoreWriter::new(writer, compressor, blocksize);
             for i in 0..num_docs {
                 let mut doc = Document::default();
                 doc.add_field_value(field_body, LOREM.to_string());
@@ -103,7 +109,7 @@ pub mod tests {
         let path = Path::new("store");
         let directory = RamDirectory::create();
         let store_wrt = directory.open_write(path)?;
-        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4);
+        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE);
         let field_title = schema.get_field("title").unwrap();
         let store_file = directory.open_read(path)?;
         let store = StoreReader::open(store_file)?;
@@ -139,11 +145,11 @@ pub mod tests {
         Ok(())
     }
 
-    fn test_store(compressor: Compressor) -> crate::Result<()> {
+    fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
         let path = Path::new("store");
         let directory = RamDirectory::create();
         let store_wrt = directory.open_write(path)?;
-        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor);
+        let schema = write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize);
         let field_title = schema.get_field("title").unwrap();
         let store_file = directory.open_read(path)?;
         let store = StoreReader::open(store_file)?;
@@ -169,22 +175,28 @@ pub mod tests {
 
     #[test]
     fn test_store_noop() -> crate::Result<()> {
-        test_store(Compressor::None)
+        test_store(Compressor::None, BLOCK_SIZE)
     }
     #[cfg(feature = "lz4-compression")]
     #[test]
     fn test_store_lz4_block() -> crate::Result<()> {
-        test_store(Compressor::Lz4)
+        test_store(Compressor::Lz4, BLOCK_SIZE)
     }
     #[cfg(feature = "snappy-compression")]
     #[test]
     fn test_store_snap() -> crate::Result<()> {
-        test_store(Compressor::Snappy)
+        test_store(Compressor::Snappy, BLOCK_SIZE)
     }
     #[cfg(feature = "brotli-compression")]
     #[test]
     fn test_store_brotli() -> crate::Result<()> {
-        test_store(Compressor::Brotli)
+        test_store(Compressor::Brotli, BLOCK_SIZE)
    }
 
+    #[cfg(feature = "zstd-compression")]
+    #[test]
+    fn test_store_zstd() -> crate::Result<()> {
+        test_store(Compressor::Zstd, BLOCK_SIZE)
+    }
+
     #[test]
@@ -348,6 +360,7 @@ mod bench {
            directory.open_write(path).unwrap(),
            1_000,
            Compressor::default(),
+           16_384,
        );
        directory.delete(path).unwrap();
    });
@@ -361,6 +374,7 @@ mod bench {
            directory.open_write(path).unwrap(),
            1_000,
            Compressor::default(),
+           16_384,
        );
        let store_file = directory.open_read(path).unwrap();
        let store = StoreReader::open(store_file).unwrap();
```

Review discussion on `fn test_store`:

> Reviewer: Can we add a test to our test_store suite that tests random (high-entropy) payloads? We spotted a bug in the original form of this PR.
>
> Author: I can look into it later today - but just wanted to confirm: would the test be for the case where the input is incompressible, to the point where the compressed output is larger than the input?
>
> Reviewer: Yes.
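One possible shape for that high-entropy test is sketched below; it is not part of the PR. To stay self-contained it exercises only the zstd block codec (assuming the private `compression_zstd_block` sibling module is reachable from the `tests` module as shown) and generates pseudo-random bytes with an inline xorshift64, so no extra dev-dependency is assumed. Incompressible input makes the framed output (length prefix plus zstd frame) at least as large as the input, which is the case the reviewer wants covered.

```rust
// A possible addition to `pub mod tests` in the store module; not part of this PR.

/// Tiny xorshift64 generator so the test needs no extra dev-dependency.
#[cfg(feature = "zstd-compression")]
fn pseudo_random_bytes(len: usize, mut state: u64) -> Vec<u8> {
    let mut out = Vec::with_capacity(len + 8);
    while out.len() < len {
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;
        out.extend_from_slice(&state.to_le_bytes());
    }
    out.truncate(len);
    out
}

#[cfg(feature = "zstd-compression")]
#[test]
fn test_zstd_high_entropy_block() -> std::io::Result<()> {
    // Assumes the sibling module path resolves as below.
    use super::compression_zstd_block::{compress, decompress};

    let original = pseudo_random_bytes(20_000, 0x5EED);
    let mut compressed = Vec::new();
    let mut decompressed = Vec::new();

    compress(&original, &mut compressed)?;
    // Random bytes do not compress: the framed output (4-byte length prefix plus
    // zstd frame) should end up at least as large as the input, which exercises
    // the compress_bound-based buffer sizing.
    assert!(compressed.len() >= original.len());

    decompress(&compressed, &mut decompressed)?;
    assert_eq!(decompressed, original);
    Ok(())
}
```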
Changes to the store writer (`StoreWriter`), which now takes the block size as a constructor argument instead of a hard-coded `BLOCK_SIZE` constant:

```diff
@@ -11,8 +11,6 @@ use crate::schema::Document;
 use crate::store::index::Checkpoint;
 use crate::DocId;
 
-const BLOCK_SIZE: usize = 16_384;
-
 /// Write tantivy's [`Store`](./index.html)
 ///
 /// Contrary to the other components of `tantivy`,
@@ -22,6 +20,7 @@ const BLOCK_SIZE: usize = 16_384;
 /// The skip list index on the other hand, is built in memory.
 pub struct StoreWriter {
     compressor: Compressor,
+    block_size: usize,
     doc: DocId,
     first_doc_in_block: DocId,
     offset_index_writer: SkipIndexBuilder,
@@ -35,9 +34,10 @@ impl StoreWriter {
     ///
     /// The store writer will writes blocks on disc as
     /// document are added.
-    pub fn new(writer: WritePtr, compressor: Compressor) -> StoreWriter {
+    pub fn new(writer: WritePtr, compressor: Compressor, block_size: usize) -> StoreWriter {
         StoreWriter {
             compressor,
+            block_size,
             doc: 0,
             first_doc_in_block: 0,
             offset_index_writer: SkipIndexBuilder::new(),
@@ -65,7 +65,7 @@ impl StoreWriter {
         VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
         self.current_block.write_all(serialized_document)?;
         self.doc += 1;
-        if self.current_block.len() > BLOCK_SIZE {
+        if self.current_block.len() > self.block_size {
             self.write_and_compress_block()?;
         }
         Ok(())
@@ -86,7 +86,7 @@ impl StoreWriter {
         self.current_block
             .write_all(&self.intermediary_buffer[..])?;
         self.doc += 1;
-        if self.current_block.len() > BLOCK_SIZE {
+        if self.current_block.len() > self.block_size {
             self.write_and_compress_block()?;
         }
         Ok(())
```

Review discussion on the `if self.current_block.len() > self.block_size` check:

> Reviewer: That's true for the other stores, and it does not matter much, but shouldn't we make that […] This is almost philosophy at this point, but my mental model is […]
>
> Author: I just went with what was already there; personally I don't think it makes much difference since blocks are allowed to overflow anyway.
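The PR title says the block size becomes configurable via IndexSettings, but that part of the diff is not shown on this page. The sketch below is a hedged usage illustration only: the settings-field names `docstore_compression` and `docstore_blocksize` are assumptions about how the option is exposed, not something confirmed by the hunks above (only `Compressor::Zstd` and the `block_size` constructor argument appear in the diff).

```rust
use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::store::Compressor;
use tantivy::{Index, IndexSettings};

fn build_zstd_index() -> tantivy::Result<Index> {
    let mut schema_builder = Schema::builder();
    schema_builder.add_text_field("body", TEXT | STORED);
    let schema = schema_builder.build();

    // Assumed field names; the diff above only shows StoreWriter taking `block_size`.
    let settings = IndexSettings {
        docstore_compression: Compressor::Zstd,
        docstore_blocksize: 32_768, // bytes; a block may still overflow this threshold
        ..IndexSettings::default()
    };

    Index::builder()
        .schema(schema)
        .settings(settings)
        .create_in_ram()
}
```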