Skip to content

Commit

Permalink
refactor(storage): avoid decode and compression when refill (#13309)
Browse files Browse the repository at this point in the history
Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx authored Nov 13, 2023
1 parent d932c58 commit aaccc89
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 39 deletions.
15 changes: 7 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dyn-clone = "1.0.14"
either = "1"
enum-as-inner = "0.6"
fail = "0.5"
foyer = { git = "https://github.com/MrCroxx/foyer", rev = "9232b3a" }
foyer = { git = "https://github.com/MrCroxx/foyer", rev = "ce2e222" }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
hex = "0.4"
Expand Down
22 changes: 14 additions & 8 deletions src/storage/src/hummock/event_handler/refiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::task::{ready, Context, Poll};
use std::time::{Duration, Instant};

use foyer::common::code::Key;
use foyer::common::range::RangeBoundsExt;
use futures::future::{join_all, try_join_all};
use futures::{Future, FutureExt};
use itertools::Itertools;
Expand All @@ -37,7 +38,8 @@ use tokio::task::JoinHandle;
use crate::hummock::file_cache::preclude::*;
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::{
Block, HummockError, HummockResult, Sstable, SstableBlockIndex, SstableStoreRef, TableHolder,
CachedBlock, FileCacheCompression, HummockError, HummockResult, Sstable, SstableBlockIndex,
SstableStoreRef, TableHolder,
};
use crate::monitor::StoreLocalStatistic;

Expand Down Expand Up @@ -382,14 +384,15 @@ impl CacheRefillTask {
let mut admits = 0;

for block_index in block_index_start..block_index_end {
let (range, uncompressed_capacity) = sst.calculate_block_info(block_index);
let (range, _uncompressed_capacity) = sst.calculate_block_info(block_index);
let key = SstableBlockIndex {
sst_id: object_id,
block_idx: block_index as u64,
};
// weight mirrors the `Fetched` layout in `CachedBlock::serialized_len()`: 1 (type tag) + 8 (capacity as u64) + payload bytes
let mut writer = sstable_store
.data_file_cache()
.writer(key, key.serialized_len() + uncompressed_capacity);
.writer(key, key.serialized_len() + 1 + 8 + range.size().unwrap());

if writer.judge() {
admits += 1;
Expand Down Expand Up @@ -422,13 +425,16 @@ impl CacheRefillTask {
let bytes = data.slice(offset..offset + len);

let future = async move {
let block = Block::decode(
let value = CachedBlock::Fetched {
bytes,
writer.weight() - writer.key().serialized_len(),
)?;
let block = Box::new(block);
uncompressed_capacity: writer.weight()
- writer.key().serialized_len(),
};
writer.force();
let res = writer.finish(block).await.map_err(HummockError::file_cache);
// TODO(MrCroxx): compress if raw is not compressed?
// Skip compression here because the fetched bytes may already be compressed.
writer.set_compression(FileCacheCompression::None);
let res = writer.finish(value).await.map_err(HummockError::file_cache);
if matches!(res, Ok(true)) {
GLOBAL_CACHE_REFILL_METRICS
.data_refill_success_bytes
Expand Down
127 changes: 113 additions & 14 deletions src/storage/src/hummock/file_cache/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::path::PathBuf;
use std::sync::Arc;

use bytes::{Buf, BufMut, Bytes};
use foyer::common::code::{Key, Value};
use foyer::common::code::{CodingResult, Key, Value};
use foyer::intrusive::eviction::lfu::LfuConfig;
use foyer::storage::admission::rated_ticket::RatedTicketAdmissionPolicy;
use foyer::storage::admission::AdmissionPolicy;
Expand Down Expand Up @@ -48,6 +48,7 @@ pub type DeviceConfig = foyer::storage::device::fs::FsDeviceConfig;

pub type FileCacheResult<T> = foyer::storage::error::Result<T>;
pub type FileCacheError = foyer::storage::error::Error;
pub type FileCacheCompression = foyer::storage::compress::Compression;

#[derive(Debug)]
pub struct FileCacheConfig<K, V>
Expand Down Expand Up @@ -180,6 +181,20 @@ where
FileCacheWriter::None { writer } => writer.finish(value).await,
}
}

fn compression(&self) -> Compression {
match self {
FileCacheWriter::Foyer { writer } => writer.compression(),
FileCacheWriter::None { writer } => writer.compression(),
}
}

fn set_compression(&mut self, compression: Compression) {
match self {
FileCacheWriter::Foyer { writer } => writer.set_compression(compression),
FileCacheWriter::None { writer } => writer.set_compression(compression),
}
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -336,15 +351,95 @@ impl Key for SstableBlockIndex {
8 + 8 // sst_id (8B) + block_idx (8B)
}

fn write(&self, mut buf: &mut [u8]) {
fn write(&self, mut buf: &mut [u8]) -> CodingResult<()> {
buf.put_u64(self.sst_id);
buf.put_u64(self.block_idx);
Ok(())
}

fn read(mut buf: &[u8]) -> Self {
fn read(mut buf: &[u8]) -> CodingResult<Self> {
let sst_id = buf.get_u64();
let block_idx = buf.get_u64();
Self { sst_id, block_idx }
Ok(Self { sst_id, block_idx })
}
}

/// A block value stored in the file cache, with two write-side encodings so that
/// compression can be used or bypassed.
///
/// The serialized form starts with a one-byte tag: `0` for `Loaded`, `1` for `Fetched`.
/// [`Value::read`] decodes both encodings into a [`Block`], so a value that is read
/// back is always `Loaded`.
#[derive(Debug)]
pub enum CachedBlock {
/// An already decoded block; serialized as the tag followed by `block.raw_data()`.
Loaded {
block: Box<Block>,
},
/// Block bytes that have not been decoded yet; serialized as the tag, then
/// `uncompressed_capacity` as a `u64`, then `bytes` unchanged.
Fetched {
/// Payload handed to `Block::decode` when the value is read back.
bytes: Bytes,
/// Second argument handed to `Block::decode` when the value is read back.
uncompressed_capacity: usize,
},
}

impl CachedBlock {
pub fn should_compress(&self) -> bool {
match self {
CachedBlock::Loaded { .. } => true,
// TODO(MrCroxx): based on block original compression algorithm?
CachedBlock::Fetched { .. } => false,
}
}

pub fn into_inner(self) -> Box<Block> {
match self {
CachedBlock::Loaded { block } => block,
CachedBlock::Fetched { .. } => unreachable!(),
}
}
}

impl Value for CachedBlock {
    /// Number of bytes `write` produces: one tag byte plus the variant payload.
    fn serialized_len(&self) -> usize {
        let payload = match self {
            // 8 bytes for the capacity stored as a `u64`, then the bytes themselves.
            Self::Fetched { bytes, .. } => 8 + bytes.len(),
            Self::Loaded { block } => block.raw_data().len(),
        };
        1 /* type */ + payload
    }

    /// Serializes the value into `buf`.
    ///
    /// Layout: tag `0` followed by the block's raw data for `Loaded`; tag `1`, the
    /// capacity as a `u64`, and the bytes for `Fetched`.
    fn write(&self, mut buf: &mut [u8]) -> CodingResult<()> {
        match self {
            Self::Fetched {
                bytes,
                uncompressed_capacity,
            } => {
                buf.put_u8(1);
                buf.put_u64(*uncompressed_capacity as u64);
                buf.put_slice(bytes);
            }
            Self::Loaded { block } => {
                buf.put_u8(0);
                buf.put_slice(block.raw_data());
            }
        }
        Ok(())
    }

    /// Deserializes a value from `buf`; the result is always the `Loaded` variant.
    ///
    /// Tag `0` goes through `Block::decode_from_raw`, tag `1` through `Block::decode`,
    /// whose error is propagated.
    ///
    /// # Panics
    ///
    /// Panics on any tag other than `0` or `1`.
    fn read(mut buf: &[u8]) -> CodingResult<Self> {
        let tag = buf.get_u8();
        let decoded = match tag {
            0 => Block::decode_from_raw(Bytes::copy_from_slice(buf)),
            1 => {
                // The capacity precedes the payload, so it must be consumed first.
                let uncompressed_capacity = buf.get_u64() as usize;
                Block::decode(Bytes::copy_from_slice(buf), uncompressed_capacity)?
            }
            _ => unreachable!(),
        };
        Ok(Self::Loaded {
            block: Box::new(decoded),
        })
    }
}

Expand All @@ -353,14 +448,16 @@ impl Value for Box<Block> {
self.raw_data().len()
}

fn write(&self, mut buf: &mut [u8]) {
buf.put_slice(self.raw_data())
fn write(&self, mut buf: &mut [u8]) -> CodingResult<()> {
buf.put_slice(self.raw_data());
Ok(())
}

fn read(buf: &[u8]) -> Self {
fn read(buf: &[u8]) -> CodingResult<Self> {
let data = Bytes::copy_from_slice(buf);
let block = Block::decode_from_raw(data);
Box::new(block)
let block = Box::new(block);
Ok(block)
}
}

Expand All @@ -369,18 +466,20 @@ impl Value for Box<Sstable> {
8 + self.meta.encoded_size() // id (8B) + meta size
}

fn write(&self, mut buf: &mut [u8]) {
fn write(&self, mut buf: &mut [u8]) -> CodingResult<()> {
buf.put_u64(self.id);
// TODO(MrCroxx): avoid buffer copy
let mut buffer = vec![];
self.meta.encode_to(&mut buffer);
buf.put_slice(&buffer[..])
buf.put_slice(&buffer[..]);
Ok(())
}

fn read(mut buf: &[u8]) -> Self {
fn read(mut buf: &[u8]) -> CodingResult<Self> {
let id = buf.get_u64();
let meta = SstableMeta::decode(buf).unwrap();
Box::new(Sstable::new(id, meta))
let sstable = Box::new(Sstable::new(id, meta));
Ok(sstable)
}
}

Expand Down Expand Up @@ -416,9 +515,9 @@ mod tests {
);

let mut buf = vec![0; block.serialized_len()];
block.write(&mut buf[..]);
block.write(&mut buf[..]).unwrap();

let block = <Box<Block> as Value>::read(&buf[..]);
let block = <Box<Block> as Value>::read(&buf[..]).unwrap();

let mut bi = BlockIterator::new(BlockHolder::from_owned_block(block));

Expand Down
Loading

0 comments on commit aaccc89

Please sign in to comment.