Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): do not compress table_id #8512

Merged
merged 23 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/storage/hummock_sdk/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,10 @@ impl<T: AsRef<[u8]>> UserKey<T> {
buf.put_slice(self.table_key.as_ref());
}

/// Appends only the table key bytes to `buf`.
///
/// Unlike a full user-key encoding, nothing else (in particular no table id)
/// is written by this method.
pub fn encode_table_key_into(&self, buf: &mut impl BufMut) {
    let table_key_bytes: &[u8] = self.table_key.as_ref();
    buf.put_slice(table_key_bytes);
}

/// Encode into a buffer.
pub fn encode_length_prefixed(&self, buf: &mut impl BufMut) {
buf.put_u32(self.table_id.table_id());
Expand Down Expand Up @@ -583,6 +587,12 @@ impl<T: AsRef<[u8]>> FullKey<T> {
buf
}

/// Encodes this key into `buf` without a table id prefix.
///
/// The output is whatever `UserKey::encode_table_key_into` writes for the
/// user key, immediately followed by the epoch written with `put_u64`
/// (big-endian per the `bytes` crate).
pub fn encode_into_without_table_id(&self, buf: &mut impl BufMut) {
    let epoch = self.epoch;
    // Key bytes must come first so the epoch ends up as the trailing 8 bytes.
    self.user_key.encode_table_key_into(buf);
    buf.put_u64(epoch);
}

pub fn encode_reverse_epoch(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(
TABLE_PREFIX_LEN + self.user_key.table_key.as_ref().len() + EPOCH_LEN,
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/hummock/compactor/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl SstableStreamIterator {

if let (Some(block_iter), Some(seek_key)) = (self.block_iter.as_mut(), seek_key) {
block_iter.seek(seek_key);

if !block_iter.is_valid() {
// `seek_key` is larger than everything in the first block.
self.next_block().await?;
Expand Down
51 changes: 34 additions & 17 deletions src/storage/src/hummock/sstable/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ pub struct Block {
data_len: usize,
/// Restart points.
restart_points: Vec<u32>,

table_id: u32,
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
}

impl Block {
pub fn decode(buf: Bytes, uncompressed_capacity: usize) -> HummockResult<Self> {
// Verify checksum.

let xxhash64_checksum = (&buf[buf.len() - 8..]).get_u64_le();
xxhash64_verify(&buf[..buf.len() - 8], xxhash64_checksum)?;

Expand Down Expand Up @@ -76,11 +79,12 @@ impl Block {
}

pub fn decode_from_raw(buf: Bytes) -> Self {
let table_id = (&buf[buf.len() - 4..]).get_u32_le();
// Decode restart points.
let n_restarts = (&buf[buf.len() - 4..]).get_u32_le();
let data_len = buf.len() - 4 - n_restarts as usize * 4;
let n_restarts = (&buf[buf.len() - 8..buf.len() - 4]).get_u32_le();
let data_len = buf.len() - 8 - n_restarts as usize * 4;
let mut restart_points = Vec::with_capacity(n_restarts as usize);
let mut restart_points_buf = &buf[data_len..buf.len() - 4];
let mut restart_points_buf = &buf[data_len..buf.len() - 8];
for _ in 0..n_restarts {
restart_points.push(restart_points_buf.get_u32_le());
}
Expand All @@ -89,6 +93,7 @@ impl Block {
data: buf,
data_len,
restart_points,
table_id,
}
}

Expand All @@ -100,7 +105,11 @@ impl Block {
}

pub fn capacity(&self) -> usize {
self.data.len() + self.restart_points.capacity() * std::mem::size_of::<u32>()
self.data.len() + self.restart_points.capacity() * std::mem::size_of::<u32>() + 4
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn table_id(&self) -> u32 {
self.table_id
}

/// Gets restart point by index.
Expand Down Expand Up @@ -238,6 +247,8 @@ pub struct BlockBuilder {
entry_count: usize,
/// Compression algorithm.
compression_algorithm: CompressionAlgorithm,

table_id: Option<u32>,
}

impl BlockBuilder {
Expand All @@ -252,6 +263,7 @@ impl BlockBuilder {
last_key: vec![],
entry_count: 0,
compression_algorithm: options.compression_algorithm,
table_id: None,
}
}

Expand All @@ -272,12 +284,18 @@ impl BlockBuilder {
///
/// Panics if keys are not added in ascending order.
pub fn add(&mut self, full_key: FullKey<&[u8]>, value: &[u8]) {
let input_table_id = full_key.user_key.table_id.table_id();
match self.table_id {
Some(current_table_id) => debug_assert_eq!(current_table_id, input_table_id),
None => self.table_id = Some(input_table_id),
}

let mut key: BytesMut = Default::default();
full_key.encode_into(&mut key);
full_key.encode_into_without_table_id(&mut key);
if self.entry_count > 0 {
debug_assert!(!key.is_empty());
debug_assert_eq!(
KeyComparator::compare_encoded_full_key(&self.last_key[..], &key),
KeyComparator::compare_encoded_full_key(&self.last_key[..], &key[..]),
Ordering::Less
);
}
Expand All @@ -286,7 +304,7 @@ impl BlockBuilder {
self.restart_points.push(self.buf.len() as u32);
key.as_ref()
} else {
bytes_diff_below_max_key_length(&self.last_key, &key)
bytes_diff_below_max_key_length(&self.last_key, &key[..])
};

let prefix = KeyPrefix {
Expand Down Expand Up @@ -316,13 +334,14 @@ impl BlockBuilder {
pub fn clear(&mut self) {
    // Reset the per-block bookkeeping: no entries yet, and no table id has
    // been observed (it is set again by the first key added).
    self.entry_count = 0;
    self.table_id = None;

    // Empty the accumulated key/value bytes, the restart point offsets and
    // the remembered last key. These resets are independent of each other.
    self.last_key.clear();
    self.restart_points.clear();
    self.buf.clear();
}

/// Calculate block size without compression.
pub fn uncompressed_block_size(&mut self) -> usize {
self.buf.len() + (self.restart_points.len() + 1) * std::mem::size_of::<u32>()
self.buf.len() + 4 + (self.restart_points.len() + 1) * std::mem::size_of::<u32>()
}

/// Finishes building block.
Expand All @@ -339,10 +358,13 @@ impl BlockBuilder {
/// Panics if there is a compression error.
pub fn build(&mut self) -> &[u8] {
assert!(self.entry_count > 0);

for restart_point in &self.restart_points {
self.buf.put_u32_le(*restart_point);
}
self.buf.put_u32_le(self.restart_points.len() as u32);

self.buf.put_u32_le(self.table_id.unwrap());
match self.compression_algorithm {
CompressionAlgorithm::None => (),
CompressionAlgorithm::Lz4 => {
Expand Down Expand Up @@ -378,13 +400,15 @@ impl BlockBuilder {
self.compression_algorithm.encode(&mut self.buf);
let checksum = xxhash64_checksum(&self.buf);
self.buf.put_u64_le(checksum);

self.buf.as_ref()
}

/// Approximate block len (uncompressed).
pub fn approximate_len(&self) -> usize {
// block + restart_points + restart_points.len + compression_algorithm + checksum
self.buf.len() + 4 * self.restart_points.len() + 4 + 1 + 8
// block + restart_points + restart_points.len + compression_algorithm + checksum +
// table_id
self.buf.len() + 4 * self.restart_points.len() + 4 + 1 + 8 + 4
}
}

Expand Down Expand Up @@ -478,13 +502,6 @@ mod tests {
assert!(!bi.is_valid());
}

// pub fn full_key(user_key: &[u8], epoch: u64) -> Bytes {
// let mut buf = BytesMut::with_capacity(user_key.len() + 8);
// buf.put_slice(user_key);
// buf.put_u64(!epoch);
// buf.freeze()
// }

pub fn construct_full_key_struct(
table_id: u32,
table_key: &[u8],
Expand Down
39 changes: 26 additions & 13 deletions src/storage/src/hummock/sstable/block_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
use std::cmp::Ordering;
use std::ops::Range;

use bytes::BytesMut;
use risingwave_hummock_sdk::key::FullKey;
use bytes::{Buf, BytesMut};
use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, EPOCH_LEN};
use risingwave_hummock_sdk::KeyComparator;

use super::KeyPrefix;
Expand All @@ -25,7 +26,7 @@ use crate::hummock::BlockHolder;
/// [`BlockIterator`] is used to read kv pairs in a block.
pub struct BlockIterator {
/// Block that iterates on.
block: BlockHolder,
pub block: BlockHolder,
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
/// Current restart point index.
restart_point_index: usize,
/// Current offset.
Expand Down Expand Up @@ -72,7 +73,11 @@ impl BlockIterator {

pub fn key(&self) -> FullKey<&[u8]> {
assert!(self.is_valid());
FullKey::decode(&self.key)
let table_id = TableId::new(self.block.table_id());
let epoch_pos = self.key[..].len() - EPOCH_LEN;
let epoch = (&self.key[epoch_pos..]).get_u64();
let user_key = UserKey::new(table_id, TableKey(&self.key[..epoch_pos]));
FullKey::from_user_key(user_key, epoch)
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
}

pub fn value(&self) -> &[u8] {
Expand All @@ -94,19 +99,27 @@ impl BlockIterator {
}

pub fn seek(&mut self, key: FullKey<&[u8]>) {
let full_key_encoded = key.encode();
self.seek_restart_point_by_key(&full_key_encoded);
self.next_until_key(&full_key_encoded);
if key.user_key.table_id.table_id() == self.block.table_id() {
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
let mut full_key_encoded_without_table_id: BytesMut = Default::default();
key.encode_into_without_table_id(&mut full_key_encoded_without_table_id);

self.seek_restart_point_by_key(&full_key_encoded_without_table_id[..]);

self.next_until_key(&full_key_encoded_without_table_id[..]);
}
}

pub fn seek_le(&mut self, key: FullKey<&[u8]>) {
let full_key_encoded = key.encode();
self.seek_restart_point_by_key(&full_key_encoded);
self.next_until_key(&full_key_encoded);
if !self.is_valid() {
self.seek_to_last();
if key.user_key.table_id.table_id() == self.block.table_id() {
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
let mut full_key_encoded_without_table_id: BytesMut = Default::default();
key.encode_into_without_table_id(&mut full_key_encoded_without_table_id);
self.seek_restart_point_by_key(&full_key_encoded_without_table_id[..]);
self.next_until_key(&full_key_encoded_without_table_id[..]);
if !self.is_valid() {
self.seek_to_last();
}
self.prev_until_key(&full_key_encoded_without_table_id[..]);
}
self.prev_until_key(&full_key_encoded);
}
}

Expand Down