Skip to content

Commit

Permalink
perf(compaction): avoid duplicate data in LSM (#8489)
Browse files Browse the repository at this point in the history
  • Loading branch information
soundOfDestiny authored Mar 16, 2023
1 parent ad7e21b commit cfc0349
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 39 deletions.
120 changes: 85 additions & 35 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct SstableStreamIterator {
/// Counts the time used for IO.
stats_ptr: Arc<AtomicU64>,

// For debugging
/// For key sanity check of divided SST and debugging
sstable_info: SstableInfo,
}

Expand Down Expand Up @@ -77,6 +77,22 @@ impl SstableStreamIterator {
}
}

async fn prune_from_valid_block_iter(&mut self) -> HummockResult<()> {
while let Some(block_iter) = self.block_iter.as_mut() {
if self
.sstable_info
.get_table_ids()
.binary_search(&block_iter.table_id().table_id)
.is_ok()
{
return Ok(());
} else {
self.next_block().await?;
}
}
Ok(())
}

/// Initialises the iterator by moving it to the first KV-pair in the stream's first block where
/// key >= `seek_key`. If that block does not contain such a KV-pair, the iterator continues to
/// the first KV-pair of the next block. If `seek_key` is not given, the iterator will move to
Expand All @@ -98,7 +114,7 @@ impl SstableStreamIterator {
}
}

Ok(())
self.prune_from_valid_block_iter().await
}

/// Loads a new block, creates a new iterator for it, and stores that iterator in
Expand Down Expand Up @@ -147,6 +163,7 @@ impl SstableStreamIterator {
block_iter.next();
if !block_iter.is_valid() {
self.next_block().await?;
self.prune_from_valid_block_iter().await?;
}

Ok(())
Expand Down Expand Up @@ -226,11 +243,12 @@ impl ConcatSstableIterator {
/// Resets the iterator, loads the specified SST, and seeks in that SST to `seek_key` if given.
async fn seek_idx(
&mut self,
idx: usize,
mut idx: usize,
seek_key: Option<FullKey<&[u8]>>,
) -> HummockResult<()> {
self.sstable_iter.take();
let seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty()) {
let mut seek_key: Option<FullKey<&[u8]>> = match (seek_key, self.key_range.left.is_empty())
{
(Some(seek_key), false) => match seek_key.cmp(&FullKey::decode(&self.key_range.left)) {
Ordering::Less | Ordering::Equal => Some(FullKey::decode(&self.key_range.left)),
Ordering::Greater => Some(seek_key),
Expand All @@ -240,14 +258,14 @@ impl ConcatSstableIterator {
(None, false) => Some(FullKey::decode(&self.key_range.left)),
};

if idx < self.tables.len() {
while idx < self.tables.len() {
let table_info = &self.tables[idx];
let table = self
.sstable_store
.sstable(table_info, &mut self.stats)
.await?;
let block_metas = &table.value().meta.block_metas;
let start_index = match seek_key {
let mut start_index = match seek_key {
None => 0,
Some(seek_key) => {
// start_index points to the greatest block whose smallest_key <= seek_key.
Expand All @@ -268,32 +286,61 @@ impl ConcatSstableIterator {
) != Ordering::Greater
})
};
if end_index <= start_index {
return Ok(());
}

let stats_ptr = self.stats.remote_io_time.clone();
let now = Instant::now();

let block_stream = self
.sstable_store
.get_stream(table.value(), Some(start_index))
.await?;

// Determine time needed to open stream.
let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);

let mut sstable_iter = SstableStreamIterator::new(
table_info,
block_stream,
end_index - start_index,
&self.stats,
);
sstable_iter.seek(seek_key).await?;
while start_index < end_index {
let start_block_table_id = block_metas[start_index].table_id();
if table_info
.get_table_ids()
.binary_search(&start_block_table_id.table_id)
.is_ok()
{
break;
} else {
start_index +=
&block_metas[(start_index + 1)..].partition_point(|block_meta| {
block_meta.table_id() == start_block_table_id
}) + 1;
}
}

self.sstable_iter = Some(sstable_iter);
let found = if end_index <= start_index {
false
} else {
let stats_ptr = self.stats.remote_io_time.clone();
let now = Instant::now();

let block_stream = self
.sstable_store
.get_stream(table.value(), Some(start_index))
.await?;

// Determine time needed to open stream.
let add = (now.elapsed().as_secs_f64() * 1000.0).ceil();
stats_ptr.fetch_add(add as u64, atomic::Ordering::Relaxed);

let mut sstable_iter = SstableStreamIterator::new(
table_info,
block_stream,
end_index - start_index,
&self.stats,
);
sstable_iter.seek(seek_key).await?;

if sstable_iter.is_valid() {
self.sstable_iter = Some(sstable_iter);
true
} else {
false
}
};
self.cur_idx = idx;

if found {
return Ok(());
} else {
idx += 1;
seek_key = None;
}
}
Ok(())
}
Expand Down Expand Up @@ -383,7 +430,8 @@ mod tests {
use crate::hummock::iterator::test_utils::mock_sstable_store;
use crate::hummock::iterator::HummockIterator;
use crate::hummock::test_utils::{
default_builder_opt_for_test, gen_test_sstable, test_key_of, test_value_of, TEST_KEYS_COUNT,
default_builder_opt_for_test, gen_test_sstable_and_info, test_key_of, test_value_of,
TEST_KEYS_COUNT,
};
use crate::hummock::value::HummockValue;

Expand All @@ -394,15 +442,15 @@ mod tests {
for object_id in 0..3 {
let start_index = object_id * TEST_KEYS_COUNT;
let end_index = (object_id + 1) * TEST_KEYS_COUNT;
let table = gen_test_sstable(
let (_table, table_info) = gen_test_sstable_and_info(
default_builder_opt_for_test(),
object_id as u64,
(start_index..end_index)
.map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
sstable_store.clone(),
)
.await;
table_infos.push(table.get_sstable_info());
table_infos.push(table_info);
}
let start_index = 5000;
let end_index = 25000;
Expand Down Expand Up @@ -494,15 +542,15 @@ mod tests {
for object_id in 0..3 {
let start_index = object_id * TEST_KEYS_COUNT + TEST_KEYS_COUNT / 2;
let end_index = (object_id + 1) * TEST_KEYS_COUNT;
let table = gen_test_sstable(
let (_table, table_info) = gen_test_sstable_and_info(
default_builder_opt_for_test(),
object_id as u64,
(start_index..end_index)
.map(|i| (test_key_of(i), HummockValue::put(test_value_of(i)))),
sstable_store.clone(),
)
.await;
table_infos.push(table.get_sstable_info());
table_infos.push(table_info);
}

// Test seek_idx. Result is dominated by given seek key rather than key range.
Expand Down Expand Up @@ -536,7 +584,9 @@ mod tests {
let block_1_second_key = iter.key().to_vec();
// Use a big enough seek key and result in invalid iterator.
let seek_key = test_key_of(30001);
iter.seek_idx(0, Some(seek_key.to_ref())).await.unwrap();
iter.seek_idx(table_infos.len() - 1, Some(seek_key.to_ref()))
.await
.unwrap();
assert!(!iter.is_valid());

// Test seek_idx. Result is dominated by key range rather than given seek key.
Expand Down
7 changes: 6 additions & 1 deletion src/storage/src/hummock/sstable/block_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::cmp::Ordering;
use std::ops::Range;

use bytes::BytesMut;
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::FullKey;

use super::{KeyPrefix, LenType, RestartPoint};
Expand Down Expand Up @@ -74,10 +75,14 @@ impl BlockIterator {
self.try_prev_inner()
}

pub fn table_id(&self) -> TableId {
self.block.table_id()
}

pub fn key(&self) -> FullKey<&[u8]> {
assert!(self.is_valid());

FullKey::from_slice_without_table_id(self.block.table_id(), &self.key[..])
FullKey::from_slice_without_table_id(self.table_id(), &self.key[..])
}

pub fn value(&self) -> &[u8] {
Expand Down
6 changes: 5 additions & 1 deletion src/storage/src/hummock/sstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use bytes::{Buf, BufMut};
pub use forward_sstable_iterator::*;
mod backward_sstable_iterator;
pub use backward_sstable_iterator::*;
use risingwave_hummock_sdk::key::{KeyPayloadType, TableKey, UserKey};
use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, TableKey, UserKey};
use risingwave_hummock_sdk::{HummockEpoch, HummockSstableObjectId};
#[cfg(test)]
use risingwave_pb::hummock::{KeyRange, SstableInfo};
Expand Down Expand Up @@ -253,6 +253,10 @@ impl BlockMeta {
pub fn encoded_size(&self) -> usize {
16 /* offset + len + key len + uncompressed size */ + self.smallest_key.len()
}

pub fn table_id(&self) -> TableId {
FullKey::decode(&self.smallest_key).user_key.table_id
}
}

#[derive(Clone, PartialEq, Eq, Debug)]
Expand Down
24 changes: 22 additions & 2 deletions src/storage/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ pub async fn gen_test_sstable_inner<B: AsRef<[u8]>>(
range_tombstones: Vec<DeleteRangeTombstone>,
sstable_store: SstableStoreRef,
policy: CachePolicy,
) -> Sstable {
) -> (Sstable, SstableInfo) {
let writer_opts = SstableWriterOptions {
capacity_hint: None,
tracker: None,
Expand All @@ -227,7 +227,7 @@ pub async fn gen_test_sstable_inner<B: AsRef<[u8]>>(
)
.await
.unwrap();
table.value().as_ref().clone()
(table.value().as_ref().clone(), output.sst_info.sst_info)
}

/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
Expand All @@ -246,6 +246,25 @@ pub async fn gen_test_sstable<B: AsRef<[u8]>>(
CachePolicy::NotFill,
)
.await
.0
}

/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
pub async fn gen_test_sstable_and_info<B: AsRef<[u8]>>(
opts: SstableBuilderOptions,
object_id: HummockSstableObjectId,
kv_iter: impl Iterator<Item = (FullKey<B>, HummockValue<B>)>,
sstable_store: SstableStoreRef,
) -> (Sstable, SstableInfo) {
gen_test_sstable_inner(
opts,
object_id,
kv_iter,
vec![],
sstable_store,
CachePolicy::NotFill,
)
.await
}

/// Generate a test table from the given `kv_iter` and put the kv value to `sstable_store`
Expand All @@ -265,6 +284,7 @@ pub async fn gen_test_sstable_with_range_tombstone(
CachePolicy::NotFill,
)
.await
.0
}

/// Generates a user key with table id 0 and the given `table_key`
Expand Down

0 comments on commit cfc0349

Please sign in to comment.