diff --git a/src/storage/hummock_sdk/src/key.rs b/src/storage/hummock_sdk/src/key.rs index b18871070348..bf06dd477277 100644 --- a/src/storage/hummock_sdk/src/key.rs +++ b/src/storage/hummock_sdk/src/key.rs @@ -438,6 +438,10 @@ impl> UserKey { buf.put_slice(self.table_key.as_ref()); } + pub fn encode_table_key_into(&self, buf: &mut impl BufMut) { + buf.put_slice(self.table_key.as_ref()); + } + /// Encode in to a buffer. pub fn encode_length_prefixed(&self, buf: &mut impl BufMut) { buf.put_u32(self.table_id.table_id()); @@ -583,6 +587,12 @@ impl> FullKey { buf } + // Encode in to a buffer. + pub fn encode_into_without_table_id(&self, buf: &mut impl BufMut) { + self.user_key.encode_table_key_into(buf); + buf.put_u64(self.epoch); + } + pub fn encode_reverse_epoch(&self) -> Vec { let mut buf = Vec::with_capacity( TABLE_PREFIX_LEN + self.user_key.table_key.as_ref().len() + EPOCH_LEN, @@ -614,6 +624,20 @@ impl<'a> FullKey<&'a [u8]> { } } + /// Construct a [`FullKey`] from a byte slice without `table_id` encoded. + pub fn from_slice_without_table_id( + table_id: TableId, + slice_without_table_id: &'a [u8], + ) -> Self { + let epoch_pos = slice_without_table_id.len() - EPOCH_LEN; + let epoch = (&slice_without_table_id[epoch_pos..]).get_u64(); + + Self { + user_key: UserKey::new(table_id, TableKey(&slice_without_table_id[..epoch_pos])), + epoch, + } + } + /// Construct a [`FullKey`] from a byte slice. pub fn decode_reverse_epoch(slice: &'a [u8]) -> Self { let epoch_pos = slice.len() - EPOCH_LEN; diff --git a/src/storage/src/hummock/compactor/iterator.rs b/src/storage/src/hummock/compactor/iterator.rs index 6692f6ce807a..028d27c40e43 100644 --- a/src/storage/src/hummock/compactor/iterator.rs +++ b/src/storage/src/hummock/compactor/iterator.rs @@ -91,6 +91,7 @@ impl SstableStreamIterator { if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) { block_iter.seek(seek_key); + if !block_iter.is_valid() { // `seek_key` is larger than everything in the first block. self.next_block().await?; diff --git a/src/storage/src/hummock/sstable/block.rs b/src/storage/src/hummock/sstable/block.rs index faee26340052..cdb36807f5a6 100644 --- a/src/storage/src/hummock/sstable/block.rs +++ b/src/storage/src/hummock/sstable/block.rs @@ -18,6 +18,7 @@ use std::mem::size_of; use std::ops::Range; use bytes::{Buf, BufMut, Bytes, BytesMut}; +use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::KeyComparator; use {lz4, zstd}; @@ -142,6 +143,10 @@ pub struct Block { pub data: Bytes, /// Uncompressed entried data length. data_len: usize, + + /// Table id of this block. + table_id: TableId, + /// Restart points. restart_points: Vec, } @@ -149,6 +154,7 @@ pub struct Block { impl Block { pub fn decode(buf: Bytes, uncompressed_capacity: usize) -> HummockResult { // Verify checksum. + let xxhash64_checksum = (&buf[buf.len() - 8..]).get_u64_le(); xxhash64_verify(&buf[..buf.len() - 8], xxhash64_checksum)?; @@ -184,11 +190,12 @@ impl Block { } pub fn decode_from_raw(buf: Bytes) -> Self { + let table_id = (&buf[buf.len() - 4..]).get_u32_le(); // decode restart_points_type_index - let n_index = ((&buf[buf.len() - 4..]).get_u32_le()) as usize; + let n_index = ((&buf[buf.len() - 8..buf.len() - 4]).get_u32_le()) as usize; let index_data_len = size_of::() + n_index * RestartPoint::size_of(); - let data_len = buf.len() - index_data_len; - let mut restart_points_type_index_buf = &buf[data_len..buf.len() - 4]; + let data_len = buf.len() - 4 - index_data_len; + let mut restart_points_type_index_buf = &buf[data_len..buf.len() - 8]; let mut index_key_vec = Vec::with_capacity(n_index); for _ in 0..n_index { @@ -213,6 +220,7 @@ impl Block { let mut restart_points_buf = &buf[data_len..restarts_end]; let mut type_index: usize = 0; + for _ in 0..n_restarts { let offset = restart_points_buf.get_u32_le(); if type_index < index_key_vec.len() - 1 @@ -232,6 +240,7 @@ impl Block { data: buf, data_len, restart_points, + table_id: TableId::new(table_id), } } @@ -243,7 +252,13 @@ impl Block { } pub fn capacity(&self) -> usize { - self.data.len() + self.restart_points.capacity() * std::mem::size_of::() + self.data.len() + + self.restart_points.capacity() * std::mem::size_of::() + + std::mem::size_of::() + } + + pub fn table_id(&self) -> TableId { + self.table_id } /// Gets restart point by index. @@ -385,6 +400,7 @@ pub struct BlockBuilder { /// Compression algorithm. compression_algorithm: CompressionAlgorithm, + table_id: Option, // restart_points_type_index stores only the restart_point corresponding to each type change, // as an index, in order to reduce space usage restart_points_type_index: Vec, @@ -402,6 +418,7 @@ impl BlockBuilder { last_key: vec![], entry_count: 0, compression_algorithm: options.compression_algorithm, + table_id: None, restart_points_type_index: Vec::default(), } } @@ -420,15 +437,20 @@ impl BlockBuilder { /// /// Panic if key is not added in ASCEND order. pub fn add(&mut self, full_key: FullKey<&[u8]>, value: &[u8]) { + let input_table_id = full_key.user_key.table_id.table_id(); + match self.table_id { + Some(current_table_id) => debug_assert_eq!(current_table_id, input_table_id), + None => self.table_id = Some(input_table_id), + } #[cfg(debug_assertions)] self.debug_valid(); let mut key: BytesMut = Default::default(); - full_key.encode_into(&mut key); + full_key.encode_into_without_table_id(&mut key); if self.entry_count > 0 { debug_assert!(!key.is_empty()); debug_assert_eq!( - KeyComparator::compare_encoded_full_key(&self.last_key[..], &key), + KeyComparator::compare_encoded_full_key(&self.last_key[..], &key[..]), Ordering::Less ); } @@ -462,7 +484,7 @@ impl BlockBuilder { key.as_ref() } else { - bytes_diff_below_max_key_length(&self.last_key, &key) + bytes_diff_below_max_key_length(&self.last_key, &key[..]) }; let prefix = KeyPrefix::new_without_len( @@ -492,6 +514,7 @@ impl BlockBuilder { pub fn clear(&mut self) { self.buf.clear(); self.restart_points.clear(); + self.table_id = None; self.restart_points_type_index.clear(); self.last_key.clear(); self.entry_count = 0; @@ -504,6 +527,7 @@ impl BlockBuilder { + (RestartPoint::size_of()) // (offset + len_type(u8)) * len * self.restart_points_type_index.len() + std::mem::size_of::() // restart_points_type_index len + + std::mem::size_of::() // table_id len } /// Finishes building block. @@ -545,6 +569,7 @@ impl BlockBuilder { self.buf .put_u32_le(self.restart_points_type_index.len() as u32); + self.buf.put_u32_le(self.table_id.unwrap()); match self.compression_algorithm { CompressionAlgorithm::None => (), CompressionAlgorithm::Lz4 => { @@ -581,6 +606,7 @@ impl BlockBuilder { self.compression_algorithm.encode(&mut self.buf); let checksum = xxhash64_checksum(&self.buf); self.buf.put_u64_le(checksum); + self.buf.as_ref() } @@ -595,6 +621,7 @@ impl BlockBuilder { + std::mem::size_of::() // restart_points_type_indics.len + std::mem::size_of::() // compression_algorithm + std::mem::size_of::() // checksum + + std::mem::size_of::() // table_id } pub fn debug_valid(&self) { diff --git a/src/storage/src/hummock/sstable/block_iterator.rs b/src/storage/src/hummock/sstable/block_iterator.rs index 91e03ec9a93f..14b31e9406de 100644 --- a/src/storage/src/hummock/sstable/block_iterator.rs +++ b/src/storage/src/hummock/sstable/block_iterator.rs @@ -17,7 +17,6 @@ use std::ops::Range; use bytes::BytesMut; use risingwave_hummock_sdk::key::FullKey; -use risingwave_hummock_sdk::KeyComparator; use super::{KeyPrefix, LenType, RestartPoint}; use crate::hummock::BlockHolder; @@ -77,7 +76,8 @@ impl BlockIterator { pub fn key(&self) -> FullKey<&[u8]> { assert!(self.is_valid()); - FullKey::decode(&self.key) + + FullKey::from_slice_without_table_id(self.block.table_id(), &self.key[..]) } pub fn value(&self) -> &[u8] { @@ -99,19 +99,19 @@ impl BlockIterator { } pub fn seek(&mut self, key: FullKey<&[u8]>) { - let full_key_encoded = key.encode(); - self.seek_restart_point_by_key(&full_key_encoded); - self.next_until_key(&full_key_encoded); + self.seek_restart_point_by_key(key); + + self.next_until_key(key); } pub fn seek_le(&mut self, key: FullKey<&[u8]>) { - let full_key_encoded = key.encode(); - self.seek_restart_point_by_key(&full_key_encoded); - self.next_until_key(&full_key_encoded); + self.seek_restart_point_by_key(key); + + self.next_until_key(key); if !self.is_valid() { self.seek_to_last(); } - self.prev_until_key(&full_key_encoded); + self.prev_until_key(key); } } @@ -171,19 +171,15 @@ impl BlockIterator { } /// Moves forward until reaching the first that equals or larger than the given `key`. - fn next_until_key(&mut self, key: &[u8]) { - while self.is_valid() - && KeyComparator::compare_encoded_full_key(&self.key[..], key) == Ordering::Less - { + fn next_until_key(&mut self, key: FullKey<&[u8]>) { + while self.is_valid() && self.key().cmp(&key) == Ordering::Less { self.next_inner(); } } /// Moves backward until reaching the first key that equals or smaller than the given `key`. - fn prev_until_key(&mut self, key: &[u8]) { - while self.is_valid() - && KeyComparator::compare_encoded_full_key(&self.key[..], key) == Ordering::Greater - { + fn prev_until_key(&mut self, key: FullKey<&[u8]>) { + while self.is_valid() && self.key().cmp(&key) == Ordering::Greater { self.prev_inner(); } } @@ -240,7 +236,7 @@ impl BlockIterator { } /// Searches the restart point index that the given `key` belongs to. - fn search_restart_point_index_by_key(&self, key: &[u8]) -> usize { + fn search_restart_point_index_by_key(&self, key: FullKey<&[u8]>) -> usize { // Find the largest restart point that restart key equals or less than the given key. self.block .search_restart_partition_point( @@ -252,7 +248,9 @@ impl BlockIterator { let prefix = self.decode_prefix_at(probe as usize, key_len_type, value_len_type); let probe_key = &self.block.data()[prefix.diff_key_range()]; - match KeyComparator::compare_encoded_full_key(probe_key, key) { + let full_probe_key = + FullKey::from_slice_without_table_id(self.block.table_id(), probe_key); + match full_probe_key.cmp(&key) { Ordering::Less | Ordering::Equal => true, Ordering::Greater => false, } @@ -262,7 +260,7 @@ impl BlockIterator { } /// Seeks to the restart point that the given `key` belongs to. - fn seek_restart_point_by_key(&mut self, key: &[u8]) { + fn seek_restart_point_by_key(&mut self, key: FullKey<&[u8]>) { let index = self.search_restart_point_index_by_key(key); self.seek_restart_point_by_index(index) }