diff --git a/Cargo.lock b/Cargo.lock index 2c2b53fac6e9..48afff670a71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5944,6 +5944,7 @@ dependencies = [ "twox-hash", "uuid", "workspace-hack", + "xxhash-rust", "zstd", ] @@ -8047,6 +8048,12 @@ version = "0.13.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd" +[[package]] +name = "xxhash-rust" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "735a71d46c4d68d71d4b24d03fdc2b98e38cea81730595801db779c04fe80d70" + [[package]] name = "xz2" version = "0.1.7" diff --git a/src/batch/src/executor/insert.rs b/src/batch/src/executor/insert.rs index aee76a2281b4..64657297a0ff 100644 --- a/src/batch/src/executor/insert.rs +++ b/src/batch/src/executor/insert.rs @@ -307,7 +307,7 @@ mod tests { epoch, None, ReadOptions { - prefix_hint: None, + dist_key_hint: None, check_bloom_filter: false, ignore_range_tombstone: false, table_id: Default::default(), diff --git a/src/common/src/catalog/internal_table.rs b/src/common/src/catalog/internal_table.rs index 06bcb5adf703..0f3dcbaab597 100644 --- a/src/common/src/catalog/internal_table.rs +++ b/src/common/src/catalog/internal_table.rs @@ -14,6 +14,7 @@ use std::sync::LazyLock; +use itertools::Itertools; use regex::Regex; pub const RW_INTERNAL_TABLE_FUNCTION_NAME: &str = "rw_table"; @@ -38,3 +39,41 @@ pub fn valid_table_name(table_name: &str) -> bool { LazyLock::new(|| Regex::new(r"__internal_.*_\d+").unwrap()); !INTERNAL_TABLE_NAME.is_match(table_name) } + +pub fn get_dist_key_in_pk_indices(dist_key_indices: &[usize], pk_indices: &[usize]) -> Vec<usize> { + let dist_key_in_pk_indices = dist_key_indices + .iter() + .map(|&di| { + pk_indices + .iter() + .position(|&pi| di == pi) + .unwrap_or_else(|| { + panic!( + "distribution key {:?} must be a subset of primary key {:?}", + dist_key_indices, pk_indices + ) + }) + }) + .collect_vec(); + dist_key_in_pk_indices +} 
+ +/// Get distribution key start index in pk, and return None if `dist_key_in_pk_indices` is empty +/// or not continuous. +/// Note that `dist_key_in_pk_indices` may be shuffled, the start index should be the +/// minimum value. +pub fn get_dist_key_start_index_in_pk(dist_key_in_pk_indices: &[usize]) -> Option<usize> { + let mut sorted_dist_key = dist_key_in_pk_indices.iter().sorted(); + if let Some(min_idx) = sorted_dist_key.next() { + let mut prev_idx = min_idx; + for idx in sorted_dist_key { + if *idx != prev_idx + 1 { + return None; + } + prev_idx = idx; + } + Some(*min_idx) + } else { + None + } +} diff --git a/src/common/src/util/ordered/serde.rs b/src/common/src/util/ordered/serde.rs index 4a037eab995d..5cf79370dcb9 100644 --- a/src/common/src/util/ordered/serde.rs +++ b/src/common/src/util/ordered/serde.rs @@ -104,6 +104,34 @@ impl OrderedRowSerde { Ok(len) } + + /// return the distribution key start position in serialized key and the distribution key + /// length. + pub fn deserialize_dist_key_position_with_column_indices( + &self, + key: &[u8], + dist_key_indices_pair: (usize, usize), + ) -> memcomparable::Result<(usize, usize)> { + let (dist_key_start_index, dist_key_end_index) = dist_key_indices_pair; + use crate::types::ScalarImpl; + let mut dist_key_start_position: usize = 0; + let mut len: usize = 0; + for index in 0..dist_key_end_index { + let data_type = &self.schema[index]; + let order_type = &self.order_types[index]; + let data = &key[len..]; + let mut deserializer = memcomparable::Deserializer::new(data); + deserializer.set_reverse(*order_type == OrderType::Descending); + + let field_length = ScalarImpl::encoding_data_size(data_type, &mut deserializer)?; + len += field_length; + if index < dist_key_start_index { + dist_key_start_position += field_length; + } + } + + Ok((dist_key_start_position, (len - dist_key_start_position))) + } } #[cfg(test)] @@ -237,6 +265,70 @@ mod tests { } } + #[test] + fn 
test_deserialize_dist_key_position_with_column_indices() { + let order_types = vec![ + OrderType::Descending, + OrderType::Ascending, + OrderType::Descending, + OrderType::Ascending, + ]; + + let schema = vec![ + DataType::Varchar, + DataType::Int16, + DataType::Varchar, + DataType::Varchar, + ]; + let serde = OrderedRowSerde::new(schema, order_types); + let row1 = Row::new(vec![ + Some(Utf8("aaa".to_string().into())), + Some(Int16(5)), + Some(Utf8("bbb".to_string().into())), + Some(Utf8("ccc".to_string().into())), + ]); + let rows = vec![row1]; + let mut array = vec![]; + for row in &rows { + let mut row_bytes = vec![]; + serde.serialize(row, &mut row_bytes); + array.push(row_bytes); + } + + { + let dist_key_indices = [1, 2]; + let dist_key_start_index = 1; + let (dist_key_start_position, dist_key_len) = serde + .deserialize_dist_key_position_with_column_indices( + &array[0], + ( + dist_key_start_index, + dist_key_start_index + dist_key_indices.len(), + ), + ) + .unwrap(); + + let schema = vec![DataType::Varchar]; + let order_types = vec![OrderType::Descending]; + let deserde = OrderedRowSerde::new(schema, order_types); + let prefix_slice = &array[0][0..dist_key_start_position]; + assert_eq!( + deserde.deserialize(prefix_slice).unwrap(), + Row::new(vec![Some(Utf8("aaa".to_string().into()))]) + ); + + let schema = vec![DataType::Int16, DataType::Varchar]; + let order_types = vec![OrderType::Ascending, OrderType::Descending]; + let deserde = OrderedRowSerde::new(schema, order_types); + let dist_key_slice = + &array[0][dist_key_start_position..dist_key_start_position + dist_key_len]; + assert_eq!( + deserde.deserialize(dist_key_slice).unwrap(), + Row::new(vec![Some(Int16(5)), Some(Utf8("bbb".to_string().into()))]) + ); + } + } + #[test] fn test_encoding_data_size() { use std::mem::size_of; diff --git a/src/common/src/util/scan_range.rs b/src/common/src/util/scan_range.rs index dea95f69ed93..1f657df6614a 100644 --- a/src/common/src/util/scan_range.rs +++ 
b/src/common/src/util/scan_range.rs @@ -14,12 +14,12 @@ use std::ops::{Bound, RangeBounds}; -use itertools::Itertools; use paste::paste; use risingwave_pb::batch_plan::scan_range::Bound as BoundProst; use risingwave_pb::batch_plan::ScanRange as ScanRangeProst; use super::value_encoding::serialize_datum_to_bytes; +use crate::catalog::get_dist_key_in_pk_indices; use crate::hash::VirtualNode; use crate::row::{Row2, RowExt}; use crate::types::{Datum, ScalarImpl}; @@ -84,20 +84,7 @@ impl ScanRange { return None; } - let dist_key_in_pk_indices = dist_key_indices - .iter() - .map(|&di| { - pk_indices - .iter() - .position(|&pi| di == pi) - .unwrap_or_else(|| { - panic!( - "distribution keys {:?} must be a subset of primary keys {:?}", - dist_key_indices, pk_indices - ) - }) - }) - .collect_vec(); + let dist_key_in_pk_indices = get_dist_key_in_pk_indices(dist_key_indices, pk_indices); let pk_prefix_len = self.eq_conds.len(); if dist_key_in_pk_indices.iter().any(|&i| i >= pk_prefix_len) { return None; diff --git a/src/ctl/src/cmd_impl/hummock/list_kv.rs b/src/ctl/src/cmd_impl/hummock/list_kv.rs index 4d289ec848b5..aa5367f03f7b 100644 --- a/src/ctl/src/cmd_impl/hummock/list_kv.rs +++ b/src/ctl/src/cmd_impl/hummock/list_kv.rs @@ -34,7 +34,7 @@ pub async fn list_kv(epoch: u64, table_id: u32) -> anyhow::Result<()> { None, ReadOptions { ignore_range_tombstone: false, - prefix_hint: None, + dist_key_hint: None, table_id: TableId { table_id }, retention_seconds: None, check_bloom_filter: false, diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index dd6570830146..3654d358ad7a 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -411,7 +411,7 @@ mod tests { vnode_col_idx: None, value_indices: vec![0], definition: "".into(), - handle_pk_conflict: false + handle_pk_conflict: false, } ); assert_eq!(table, TableCatalog::from(table.to_prost(0, 0))); diff --git 
a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index d3568c508191..28a1a487d3f3 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -150,6 +150,7 @@ impl StreamMaterialize { } let ctx = input.ctx(); + let distribution_key = base.dist.dist_column_indices().to_vec(); let properties = ctx.with_options().internal_table_subset(); let table = TableCatalog { id: TableId::placeholder(), @@ -158,7 +159,7 @@ impl StreamMaterialize { columns, pk: pk_list, stream_key: pk_indices.clone(), - distribution_key: base.dist.dist_column_indices().to_vec(), + distribution_key, is_index, appendonly: input.append_only(), owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID, diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index c447abc39ae1..7badd7734e05 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -80,6 +80,7 @@ tokio-stream = "0.1" tonic = { version = "0.2", package = "madsim-tonic" } tracing = "0.1" twox-hash = "1" +xxhash-rust = { version = "0.8.5", features = ["xxh32"] } zstd = "0.11.2" [target.'cfg(not(madsim))'.dependencies] diff --git a/src/storage/hummock_sdk/src/filter_key_extractor.rs b/src/storage/hummock_sdk/src/filter_key_extractor.rs index 6471b83e857c..5fe4abace18c 100644 --- a/src/storage/hummock_sdk/src/filter_key_extractor.rs +++ b/src/storage/hummock_sdk/src/filter_key_extractor.rs @@ -18,7 +18,9 @@ use std::sync::Arc; use std::time::Duration; use parking_lot::RwLock; -use risingwave_common::catalog::ColumnDesc; +use risingwave_common::catalog::{ + get_dist_key_in_pk_indices, get_dist_key_start_index_in_pk, ColumnDesc, +}; use risingwave_common::hash::VirtualNode; use risingwave_common::util::ordered::OrderedRowSerde; use risingwave_common::util::sort_util::OrderType; @@ -56,12 +58,14 @@ impl FilterKeyExtractorImpl { .map(|col_order| col_order.index as usize) 
.collect(); - let match_read_pattern = - !dist_key_indices.is_empty() && pk_indices.starts_with(&dist_key_indices); + let dist_key_in_pk_indices = get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices); + + let match_read_pattern = !dist_key_in_pk_indices.is_empty() + && get_dist_key_start_index_in_pk(&dist_key_in_pk_indices).is_some(); if !match_read_pattern { // for now frontend had not infer the table_id_to_filter_key_extractor, so we // use FullKeyFilterKeyExtractor - FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor::default()) + FilterKeyExtractorImpl::Dummy(DummyFilterKeyExtractor::default()) } else { FilterKeyExtractorImpl::Schema(SchemaFilterKeyExtractor::new(table_catalog)) } @@ -78,6 +82,7 @@ macro_rules! impl_filter_key_extractor { } } } + } macro_rules! for_all_filter_key_extractor_variants { @@ -136,7 +141,9 @@ pub struct SchemaFilterKeyExtractor { /// Prefix key length can be decoded through its `DataType` and `OrderType` which obtained from /// `TableCatalog`. `read_pattern_prefix_column` means the count of column to decode prefix /// from storage key. - read_pattern_prefix_column: usize, + + /// distribution_key does not need to be the prefix of pk. 
+ distribution_key_indices_pair_in_pk: Option<(usize, usize)>, deserializer: OrderedRowSerde, // TODO:need some bench test for same prefix case like join (if we need a prefix_cache for same // prefix_key) @@ -145,36 +152,61 @@ pub struct SchemaFilterKeyExtractor { impl FilterKeyExtractor for SchemaFilterKeyExtractor { fn extract<'a>(&self, full_key: &'a [u8]) -> &'a [u8] { if full_key.len() < TABLE_PREFIX_LEN + VirtualNode::SIZE { - return full_key; + return &[]; } let (_table_prefix, key) = full_key.split_at(TABLE_PREFIX_LEN); let (_vnode_prefix, pk) = key.split_at(VirtualNode::SIZE); // if the key with table_id deserializer fail from schema, that should panic here for early - // detection - let pk_prefix_len = self - .deserializer - .deserialize_prefix_len_with_column_indices(pk, 0..self.read_pattern_prefix_column) - .unwrap(); - - let prefix_len = TABLE_PREFIX_LEN + VirtualNode::SIZE + pk_prefix_len; - &full_key[0..prefix_len] + // detection. + if let Some((distribution_key_start_index_in_pk, distribution_key_end_index_in_pk)) = + self.distribution_key_indices_pair_in_pk + { + let (dist_key_start_position, dist_key_len) = self + .deserializer + .deserialize_dist_key_position_with_column_indices( + pk, + ( + distribution_key_start_index_in_pk, + distribution_key_end_index_in_pk, + ), + ) + .unwrap(); + + let start_position = TABLE_PREFIX_LEN + VirtualNode::SIZE + dist_key_start_position; + &full_key[start_position..start_position + dist_key_len] + } else { + &[] + } } } impl SchemaFilterKeyExtractor { pub fn new(table_catalog: &Table) -> Self { - let read_pattern_prefix_column = table_catalog.distribution_key.len(); - assert_ne!(0, read_pattern_prefix_column); - - // column_index in pk + let dist_key_indices: Vec<usize> = table_catalog + .distribution_key + .iter() + .map(|idx| *idx as usize) + .collect(); let pk_indices: Vec<usize> = table_catalog + .pk + .iter() + .map(|col_order| col_order.index as usize) + .collect(); + let dist_key_in_pk_indices = 
get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices); + + let distribution_key_start_index_in_pk = + get_dist_key_start_index_in_pk(&dist_key_in_pk_indices); + let distribution_key_indices_pair_in_pk = + distribution_key_start_index_in_pk.map(|distribution_key_start_index_in_pk| { + ( + distribution_key_start_index_in_pk, + dist_key_in_pk_indices.len() + distribution_key_start_index_in_pk, + ) + }); + let data_types = pk_indices .iter() .map(|column_idx| &table_catalog.columns[*column_idx]) @@ -192,7 +224,7 @@ impl SchemaFilterKeyExtractor { .collect(); Self { - read_pattern_prefix_column, + distribution_key_indices_pair_in_pk, deserializer: OrderedRowSerde::new(data_types, order_types), } } @@ -358,7 +390,6 @@ mod tests { use std::time::Duration; use bytes::{BufMut, BytesMut}; - use itertools::Itertools; use risingwave_common::catalog::{ColumnDesc, ColumnId}; use risingwave_common::constants::hummock::PROPERTIES_RETENTION_SECOND_KEY; use risingwave_common::hash::VirtualNode; @@ -392,7 +423,7 @@ mod tests { assert_eq!(full_key, output_key); } - fn build_table_with_prefix_column_num(column_count: u32) -> ProstTable { + fn build_table_with_prefix_column_num() -> ProstTable { ProstTable { is_index: false, id: 0, @@ -416,11 +447,11 @@ mod tests { ProstColumnCatalog { column_desc: Some( (&ColumnDesc { - data_type: DataType::Int64, + data_type: DataType::Varchar, column_id: ColumnId::new(0), name: "col_1".to_string(), field_descs: vec![], - type_name: "Int64".to_string(), + type_name: "Varchar".to_string(), }) .into(), ), @@ -442,11 +473,11 @@ mod tests { ProstColumnCatalog { column_desc: Some( (&ColumnDesc { - data_type: DataType::Varchar, + data_type: DataType::Int64, column_id: ColumnId::new(0), name: "col_3".to_string(), field_descs: vec![], - type_name: "Varchar".to_string(), + type_name: "Int64".to_string(), }) .into(), ), @@ -465,7 +496,7 @@ mod tests { ], stream_key: vec![0], dependent_relations: vec![], - distribution_key: (0..column_count as 
i32).collect_vec(), + distribution_key: vec![3], optional_associated_source_id: None, appendonly: false, owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID, @@ -483,15 +514,15 @@ mod tests { #[test] fn test_schema_filter_key_extractor() { - let prost_table = build_table_with_prefix_column_num(1); + let prost_table = build_table_with_prefix_column_num(); let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table); let order_types: Vec = vec![OrderType::Ascending, OrderType::Ascending]; - let schema = vec![DataType::Int64, DataType::Varchar]; + let schema = vec![DataType::Varchar, DataType::Int64]; let serializer = OrderedRowSerde::new(schema, order_types); let row = Row::new(vec![ + Some(ScalarImpl::Utf8("abc".to_string().into())), Some(ScalarImpl::Int64(100)), - Some(ScalarImpl::Utf8("abc".into())), ]); let mut row_bytes = vec![]; serializer.serialize(&row, &mut row_bytes); @@ -507,10 +538,7 @@ mod tests { let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat(); let output_key = schema_filter_key_extractor.extract(&full_key); - assert_eq!( - TABLE_PREFIX_LEN + VirtualNode::SIZE + 1 + mem::size_of::(), - output_key.len() - ); + assert_eq!(mem::size_of::() + 1, output_key.len()); } #[test] @@ -518,50 +546,44 @@ mod tests { let mut multi_filter_key_extractor = MultiFilterKeyExtractor::default(); { // test table_id 1 - let prost_table = build_table_with_prefix_column_num(1); + let prost_table = build_table_with_prefix_column_num(); let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table); multi_filter_key_extractor.register( 1, Arc::new(FilterKeyExtractorImpl::Schema(schema_filter_key_extractor)), ); let order_types: Vec = vec![OrderType::Ascending, OrderType::Ascending]; - let schema = vec![DataType::Int64, DataType::Varchar]; + let schema = vec![DataType::Varchar, DataType::Int64]; let serializer = OrderedRowSerde::new(schema, order_types); let row = Row::new(vec![ + 
Some(ScalarImpl::Utf8("abc".to_string().into())), Some(ScalarImpl::Int64(100)), - Some(ScalarImpl::Utf8("abc".into())), ]); - let mut row_bytes = vec![]; - serializer.serialize(&row, &mut row_bytes); + let vnode_prefix = "v".as_bytes(); + assert_eq!(VirtualNode::SIZE, vnode_prefix.len()); let table_prefix = { let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN); buf.put_u32(1); buf.to_vec() }; - - let vnode_prefix = "v".as_bytes(); - assert_eq!(VirtualNode::SIZE, vnode_prefix.len()); - + let mut row_bytes = vec![]; + serializer.serialize(&row, &mut row_bytes); let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat(); let output_key = multi_filter_key_extractor.extract(&full_key); + let order_types: Vec = vec![OrderType::Ascending, OrderType::Ascending]; + let schema = vec![DataType::Varchar, DataType::Int64]; + let deserializer = OrderedRowSerde::new(schema, order_types); - let data_types = vec![DataType::Int64]; - let order_types = vec![OrderType::Ascending]; - let deserializer = OrderedRowSerde::new(data_types, order_types); - - let pk_prefix_len = deserializer - .deserialize_prefix_len_with_column_indices(&row_bytes, 0..=0) + let (_, dist_key_len) = deserializer + .deserialize_dist_key_position_with_column_indices(&row_bytes, (1, 2)) .unwrap(); - assert_eq!( - TABLE_PREFIX_LEN + VirtualNode::SIZE + pk_prefix_len, - output_key.len() - ); + assert_eq!(dist_key_len, output_key.len()); } { - // test table_id 1 - let prost_table = build_table_with_prefix_column_num(2); + // test table_id 2 + let prost_table = build_table_with_prefix_column_num(); let schema_filter_key_extractor = SchemaFilterKeyExtractor::new(&prost_table); multi_filter_key_extractor.register( 2, @@ -571,11 +593,12 @@ mod tests { let schema = vec![DataType::Int64, DataType::Varchar]; let serializer = OrderedRowSerde::new(schema, order_types); let row = Row::new(vec![ + Some(ScalarImpl::Utf8("abc".to_string().into())), Some(ScalarImpl::Int64(100)), - Some(ScalarImpl::Utf8("abc".into())), 
]); let mut row_bytes = vec![]; serializer.serialize(&row, &mut row_bytes); + serializer.serialize(&row, &mut row_bytes); let table_prefix = { let mut buf = BytesMut::with_capacity(TABLE_PREFIX_LEN); @@ -589,21 +612,19 @@ mod tests { let full_key = [&table_prefix, vnode_prefix, &row_bytes].concat(); let output_key = multi_filter_key_extractor.extract(&full_key); - let data_types = vec![DataType::Int64, DataType::Varchar]; + let data_types = vec![DataType::Varchar, DataType::Int64]; let order_types = vec![OrderType::Ascending, OrderType::Ascending]; let deserializer = OrderedRowSerde::new(data_types, order_types); - let pk_prefix_len = deserializer - .deserialize_prefix_len_with_column_indices(&row_bytes, 0..=1) + let (_dist_key_start_position, dist_key_len) = deserializer + .deserialize_dist_key_position_with_column_indices(&row_bytes, (1, 2)) .unwrap(); - assert_eq!( - TABLE_PREFIX_LEN + VirtualNode::SIZE + pk_prefix_len, - output_key.len() - ); + assert_eq!(dist_key_len, output_key.len()); } { + // test table_id 3 let full_key_filter_key_extractor = FullKeyFilterKeyExtractor::default(); multi_filter_key_extractor.register( 3, @@ -625,10 +646,7 @@ mod tests { let full_key = [&table_prefix, vnode_prefix, row_bytes].concat(); let output_key = multi_filter_key_extractor.extract(&full_key); - assert_eq!( - TABLE_PREFIX_LEN + VirtualNode::SIZE + row_bytes.len(), - output_key.len() - ); + assert_eq!(full_key.len(), output_key.len()); } } diff --git a/src/storage/hummock_test/benches/bench_hummock_iter.rs b/src/storage/hummock_test/benches/bench_hummock_iter.rs index e02b7aed36fe..f25b7993d399 100644 --- a/src/storage/hummock_test/benches/bench_hummock_iter.rs +++ b/src/storage/hummock_test/benches/bench_hummock_iter.rs @@ -95,7 +95,7 @@ fn criterion_benchmark(c: &mut Criterion) { (Unbounded, Unbounded), epoch, ReadOptions { - prefix_hint: None, + dist_key_hint: None, ignore_range_tombstone: true, check_bloom_filter: false, retention_seconds: None, diff --git 
a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index fbd00839a703..4382b14bc560 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -315,8 +315,8 @@ pub(crate) mod tests { (32 * 1000) << 16, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -334,7 +334,7 @@ pub(crate) mod tests { ReadOptions { ignore_range_tombstone: false, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -439,8 +439,8 @@ pub(crate) mod tests { 129, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -791,7 +791,7 @@ pub(crate) mod tests { ReadOptions { ignore_range_tombstone: false, check_bloom_filter: false, - prefix_hint: None, + dist_key_hint: None, table_id: TableId::from(existing_table_ids), retention_seconds: None, }, @@ -962,7 +962,7 @@ pub(crate) mod tests { ReadOptions { ignore_range_tombstone: false, check_bloom_filter: false, - prefix_hint: None, + dist_key_hint: None, table_id: TableId::from(existing_table_id), retention_seconds: None, }, @@ -1122,7 +1122,11 @@ pub(crate) mod tests { storage.wait_version(version).await; // 6. 
scan kv to check key table_id - let bloom_filter_key = key_prefix.to_vec(); + let bloom_filter_key = [ + existing_table_id.to_be_bytes().to_vec(), + key_prefix.to_vec(), + ] + .concat(); let start_bound_key = key_prefix.to_vec(); let end_bound_key = next_key(start_bound_key.as_slice()); let scan_result = storage @@ -1136,7 +1140,7 @@ pub(crate) mod tests { ReadOptions { ignore_range_tombstone: false, check_bloom_filter: true, - prefix_hint: Some(bloom_filter_key), + dist_key_hint: Some(bloom_filter_key), table_id: TableId::from(existing_table_id), retention_seconds: None, }, diff --git a/src/storage/hummock_test/src/failpoint_tests.rs b/src/storage/hummock_test/src/failpoint_tests.rs index ea065df09d3c..74a6725b30a2 100644 --- a/src/storage/hummock_test/src/failpoint_tests.rs +++ b/src/storage/hummock_test/src/failpoint_tests.rs @@ -88,7 +88,7 @@ async fn test_failpoints_state_store_read_upload() { ReadOptions { ignore_range_tombstone: false, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -133,7 +133,7 @@ async fn test_failpoints_state_store_read_upload() { ReadOptions { ignore_range_tombstone: false, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -147,7 +147,7 @@ async fn test_failpoints_state_store_read_upload() { ReadOptions { ignore_range_tombstone: false, check_bloom_filter: false, - prefix_hint: None, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -162,7 +162,7 @@ async fn test_failpoints_state_store_read_upload() { ReadOptions { ignore_range_tombstone: false, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -196,7 +196,7 @@ async fn test_failpoints_state_store_read_upload() { ReadOptions { ignore_range_tombstone: false, check_bloom_filter: true, - prefix_hint: None, + 
dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -212,7 +212,7 @@ async fn test_failpoints_state_store_read_upload() { ReadOptions { ignore_range_tombstone: false, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index fcf1770e69e6..538a65ece21f 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -225,7 +225,7 @@ async fn test_storage_basic() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -241,7 +241,7 @@ async fn test_storage_basic() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -259,7 +259,7 @@ async fn test_storage_basic() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -289,7 +289,7 @@ async fn test_storage_basic() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -321,7 +321,7 @@ async fn test_storage_basic() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -338,7 +338,7 @@ async fn test_storage_basic() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -355,7 +355,7 @@ async fn test_storage_basic() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -387,7 +387,7 @@ async fn test_storage_basic() { table_id: 
Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -405,7 +405,7 @@ async fn test_storage_basic() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -422,7 +422,7 @@ async fn test_storage_basic() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -461,7 +461,7 @@ async fn test_storage_basic() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -626,8 +626,8 @@ async fn test_state_store_sync() { ignore_range_tombstone: false, table_id: Default::default(), retention_seconds: None, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, }, ) .await @@ -669,8 +669,8 @@ async fn test_state_store_sync() { ignore_range_tombstone: false, table_id: Default::default(), retention_seconds: None, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, }, ) .await @@ -690,8 +690,8 @@ async fn test_state_store_sync() { ignore_range_tombstone: false, table_id: Default::default(), retention_seconds: None, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, }, ) .await @@ -729,8 +729,8 @@ async fn test_state_store_sync() { ignore_range_tombstone: false, table_id: Default::default(), retention_seconds: None, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, }, ) .await @@ -848,7 +848,7 @@ async fn test_delete_get() { epoch2, ReadOptions { ignore_range_tombstone: false, - prefix_hint: None, + dist_key_hint: None, check_bloom_filter: true, table_id: Default::default(), retention_seconds: None, @@ -960,8 +960,8 @@ async fn test_multiple_epoch_sync() { 
ignore_range_tombstone: false, table_id: Default::default(), retention_seconds: None, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, }, ) .await @@ -977,8 +977,8 @@ async fn test_multiple_epoch_sync() { ignore_range_tombstone: false, table_id: Default::default(), retention_seconds: None, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, }, ) .await @@ -993,8 +993,8 @@ async fn test_multiple_epoch_sync() { ignore_range_tombstone: false, table_id: Default::default(), retention_seconds: None, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, }, ) .await @@ -1128,7 +1128,7 @@ async fn test_iter_with_min_epoch() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -1150,7 +1150,7 @@ async fn test_iter_with_min_epoch() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -1170,7 +1170,7 @@ async fn test_iter_with_min_epoch() { table_id: Default::default(), retention_seconds: Some(1), check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -1209,7 +1209,7 @@ async fn test_iter_with_min_epoch() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -1231,7 +1231,7 @@ async fn test_iter_with_min_epoch() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -1253,7 +1253,7 @@ async fn test_iter_with_min_epoch() { table_id: Default::default(), retention_seconds: Some(1), check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, ) .await @@ -1403,7 +1403,7 @@ async fn test_hummock_version_reader() { table_id: Default::default(), 
retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, read_snapshot, ) @@ -1432,7 +1432,7 @@ async fn test_hummock_version_reader() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, read_snapshot, ) @@ -1461,7 +1461,7 @@ async fn test_hummock_version_reader() { table_id: Default::default(), retention_seconds: Some(1), check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, read_snapshot, ) @@ -1528,7 +1528,7 @@ async fn test_hummock_version_reader() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, read_snapshot, ) @@ -1566,7 +1566,7 @@ async fn test_hummock_version_reader() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, read_snapshot, ) @@ -1604,7 +1604,7 @@ async fn test_hummock_version_reader() { table_id: Default::default(), retention_seconds: Some(1), check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, read_snapshot, ) @@ -1642,7 +1642,7 @@ async fn test_hummock_version_reader() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, read_snapshot, ) @@ -1686,7 +1686,7 @@ async fn test_hummock_version_reader() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, read_snapshot, ) @@ -1724,7 +1724,7 @@ async fn test_hummock_version_reader() { table_id: Default::default(), retention_seconds: None, check_bloom_filter: true, - prefix_hint: None, + dist_key_hint: None, }, read_snapshot, ) diff --git a/src/storage/hummock_test/src/snapshot_tests.rs b/src/storage/hummock_test/src/snapshot_tests.rs index bec858d2c284..69153b298227 100644 --- a/src/storage/hummock_test/src/snapshot_tests.rs +++ 
b/src/storage/hummock_test/src/snapshot_tests.rs @@ -49,7 +49,7 @@ macro_rules! assert_count_range_scan { ReadOptions { ignore_range_tombstone: false, check_bloom_filter: false, - prefix_hint: None, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index a55387ee93ff..97feb4ba8355 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -121,8 +121,8 @@ async fn test_basic_inner( epoch1, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -137,8 +137,8 @@ async fn test_basic_inner( epoch1, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -155,8 +155,8 @@ async fn test_basic_inner( epoch1, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -186,8 +186,8 @@ async fn test_basic_inner( epoch2, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -218,8 +218,8 @@ async fn test_basic_inner( epoch3, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -235,8 +235,8 @@ async fn test_basic_inner( epoch3, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + 
check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -253,7 +253,7 @@ async fn test_basic_inner( ReadOptions { ignore_range_tombstone: false, check_bloom_filter: false, - prefix_hint: None, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -270,8 +270,8 @@ async fn test_basic_inner( epoch1, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -288,8 +288,8 @@ async fn test_basic_inner( epoch2, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -306,7 +306,7 @@ async fn test_basic_inner( ReadOptions { ignore_range_tombstone: false, check_bloom_filter: false, - prefix_hint: None, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -323,8 +323,8 @@ async fn test_basic_inner( epoch3, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -349,8 +349,8 @@ async fn test_basic_inner( epoch2, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -365,8 +365,8 @@ async fn test_basic_inner( epoch2, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -567,8 +567,8 @@ async fn test_reload_storage() { epoch1, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, 
+ check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -585,8 +585,8 @@ async fn test_reload_storage() { epoch1, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -616,8 +616,8 @@ async fn test_reload_storage() { epoch2, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -634,8 +634,8 @@ async fn test_reload_storage() { epoch1, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -652,8 +652,8 @@ async fn test_reload_storage() { epoch1, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -670,8 +670,8 @@ async fn test_reload_storage() { epoch2, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -687,8 +687,8 @@ async fn test_reload_storage() { epoch2, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -731,8 +731,8 @@ async fn test_write_anytime_inner( epoch, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, } @@ -749,8 +749,8 @@ async fn 
test_write_anytime_inner( epoch, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, } @@ -767,8 +767,8 @@ async fn test_write_anytime_inner( epoch, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, } @@ -788,7 +788,7 @@ async fn test_write_anytime_inner( ReadOptions { ignore_range_tombstone: false, check_bloom_filter: false, - prefix_hint: None, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -852,8 +852,8 @@ async fn test_write_anytime_inner( epoch, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, } @@ -868,8 +868,8 @@ async fn test_write_anytime_inner( epoch, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, } @@ -885,8 +885,8 @@ async fn test_write_anytime_inner( epoch, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, } @@ -905,7 +905,7 @@ async fn test_write_anytime_inner( ReadOptions { ignore_range_tombstone: false, check_bloom_filter: false, - prefix_hint: None, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, }, @@ -1058,8 +1058,8 @@ async fn test_delete_get_inner( epoch2, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), 
retention_seconds: None, } @@ -1143,8 +1143,8 @@ async fn test_multiple_epoch_sync_inner( epoch1, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, } @@ -1160,8 +1160,8 @@ async fn test_multiple_epoch_sync_inner( epoch2, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, } @@ -1176,8 +1176,8 @@ async fn test_multiple_epoch_sync_inner( epoch3, ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: Default::default(), retention_seconds: None, } diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index 505e78b7c65d..0b33adae11b7 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -326,8 +326,8 @@ async fn test_syncpoints_get_in_delete_range_boundary() { storage.wait_version(version).await; let read_options = ReadOptions { ignore_range_tombstone: false, - check_bloom_filter: true, - prefix_hint: None, + check_bloom_filter: false, + dist_key_hint: None, table_id: TableId::from(existing_table_id), retention_seconds: None, }; diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index dc036bcf90b0..f1f6ed511f09 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -22,6 +22,7 @@ use arc_swap::ArcSwap; use bytes::Bytes; use risingwave_common::catalog::TableId; use risingwave_common::config::StorageConfig; +use risingwave_common::hash::VirtualNode; use risingwave_hummock_sdk::key::{FullKey, TableKey}; use risingwave_hummock_sdk::{HummockEpoch, *}; #[cfg(any(test, feature = "test"))] @@ -328,15 +329,17 
@@ pub async fn get_from_sstable_info( } else { get_delete_range_epoch_from_sstable(sstable.value().as_ref(), &full_key) }; + // Bloom filter key is the distribution key, which is no need to be the prefix of pk, and do not + // contain `TablePrefix` and `VnodePrefix`. + let dist_key = &ukey.table_key[VirtualNode::SIZE..]; if read_options.check_bloom_filter - && !hit_sstable_bloom_filter(sstable.value(), ukey.encode().as_slice(), local_stats) + && !hit_sstable_bloom_filter(sstable.value(), dist_key, local_stats) { if delete_epoch.is_some() { return Ok(Some(HummockValue::Delete)); } return Ok(None); } - // TODO: now SstableIterator does not use prefetch through SstableIteratorReadOptions, so we // use default before refinement. let mut iter = SstableIterator::create( @@ -376,16 +379,15 @@ pub async fn get_from_sstable_info( pub fn hit_sstable_bloom_filter( sstable_info_ref: &Sstable, - user_key: &[u8], + dist_key: &[u8], local_stats: &mut StoreLocalStatistic, ) -> bool { local_stats.bloom_filter_check_counts += 1; - let surely_not_have = sstable_info_ref.surely_not_have_user_key(user_key); + let surely_not_have = sstable_info_ref.surely_not_have_dist_key(dist_key); if surely_not_have { local_stats.bloom_filter_true_negative_count += 1; } - !surely_not_have } diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 98e7f63e62da..543fe29888c5 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -25,6 +25,7 @@ use risingwave_hummock_sdk::key::{user_key, FullKey}; use risingwave_hummock_sdk::table_stats::{TableStats, TableStatsMap}; use risingwave_hummock_sdk::{HummockEpoch, KeyComparator, LocalSstableInfo}; use risingwave_pb::hummock::SstableInfo; +use xxhash_rust::xxh32; use super::bloom::Bloom; use super::utils::CompressionAlgorithm; @@ -101,14 +102,13 @@ pub struct SstableBuilder { /// Hashes of user keys. 
user_key_hashes: Vec, last_full_key: Vec, + last_extract_key: Vec, /// Buffer for encoded key and value to avoid allocation. raw_key: BytesMut, raw_value: BytesMut, last_table_id: u32, sstable_id: u64, - last_bloom_filter_key_length: usize, - /// `stale_key_count` counts range_tombstones as well. stale_key_count: u64, /// `total_key_count` counts range_tombstones as well. @@ -153,10 +153,10 @@ impl SstableBuilder { raw_key: BytesMut::new(), raw_value: BytesMut::new(), last_full_key: vec![], + last_extract_key: vec![], range_tombstones: vec![], sstable_id, filter_key_extractor, - last_bloom_filter_key_length: 0, stale_key_count: 0, total_key_count: 0, table_stats: Default::default(), @@ -210,13 +210,11 @@ impl SstableBuilder { // add bloom_filter check // 1. not empty_key // 2. extract_key key is not duplicate - if !extract_key.is_empty() - && (extract_key != &self.last_full_key[0..self.last_bloom_filter_key_length]) - { + if !extract_key.is_empty() && extract_key != self.last_extract_key.as_slice() { // avoid duplicate add to bloom filter - self.user_key_hashes - .push(farmhash::fingerprint32(extract_key)); - self.last_bloom_filter_key_length = extract_key.len(); + self.user_key_hashes.push(xxh32::xxh32(extract_key, 0)); + self.last_extract_key.clear(); + self.last_extract_key.extend_from_slice(extract_key); } } else { self.stale_key_count += 1; @@ -512,7 +510,7 @@ pub(super) mod tests { assert_eq!(table.has_bloom_filter(), with_blooms); for i in 0..key_count { let full_key = test_key_of(i); - assert!(!table.surely_not_have_user_key(full_key.user_key.encode().as_slice())); + assert!(!table.surely_not_have_dist_key(full_key.user_key.encode().as_slice())); } } diff --git a/src/storage/src/hummock/sstable/mod.rs b/src/storage/src/hummock/sstable/mod.rs index f884d23a502e..dbb80672ec84 100644 --- a/src/storage/src/hummock/sstable/mod.rs +++ b/src/storage/src/hummock/sstable/mod.rs @@ -51,6 +51,7 @@ pub use delete_range_aggregator::{ pub use sstable_id_manager::*; pub 
use utils::CompressionAlgorithm; use utils::{get_length_prefixed_slice, put_length_prefixed_slice}; +use xxhash_rust::xxh32; use self::utils::{xxhash64_checksum, xxhash64_verify}; use super::{HummockError, HummockResult}; @@ -139,13 +140,13 @@ impl Sstable { !self.meta.bloom_filter.is_empty() } - pub fn surely_not_have_user_key(&self, user_key: &[u8]) -> bool { + pub fn surely_not_have_dist_key(&self, dist_key: &[u8]) -> bool { let enable_bloom_filter: fn() -> bool = || { fail_point!("disable_bloom_filter", |_| false); true }; if enable_bloom_filter() && self.has_bloom_filter() { - let hash = farmhash::fingerprint32(user_key); + let hash = xxh32::xxh32(dist_key, 0); let bloom = Bloom::new(&self.meta.bloom_filter); bloom.surely_not_have_hash(hash) } else { diff --git a/src/storage/src/hummock/state_store.rs b/src/storage/src/hummock/state_store.rs index 5ba944be286f..ab8912d9e499 100644 --- a/src/storage/src/hummock/state_store.rs +++ b/src/storage/src/hummock/state_store.rs @@ -13,8 +13,7 @@ // limitations under the License. use std::future::Future; -use std::ops::Bound::{Excluded, Included}; -use std::ops::{Bound, RangeBounds}; +use std::ops::Bound; use std::sync::atomic::Ordering as MemOrdering; use std::time::Duration; @@ -22,7 +21,7 @@ use bytes::Bytes; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::INVALID_EPOCH; -use risingwave_hummock_sdk::key::{map_table_key_range, next_key, TableKey, TableKeyRange}; +use risingwave_hummock_sdk::key::{map_table_key_range, TableKey, TableKeyRange}; use risingwave_hummock_sdk::HummockReadEpoch; use risingwave_pb::hummock::SstableInfo; use tokio::sync::oneshot; @@ -142,55 +141,6 @@ impl StateStoreRead for HummockStorage { epoch: u64, read_options: ReadOptions, ) -> Self::IterFuture<'_> { - if let Some(prefix_hint) = read_options.prefix_hint.as_ref() { - let next_key = next_key(prefix_hint); - - // learn more detail about start_bound with storage_table.rs. 
- match key_range.start_bound() { - // it guarantees that the start bound must be included (some different case) - // 1. Include(pk + col_bound) => prefix_hint <= start_bound < - // next_key(prefix_hint) - // - // for case2, frontend need to reject this, avoid excluded start_bound and - // transform it to included(next_key), without this case we can just guarantee - // that start_bound < next_key - // - // 2. Include(next_key(pk + - // col_bound)) => prefix_hint <= start_bound <= next_key(prefix_hint) - // - // 3. Include(pk) => prefix_hint <= start_bound < next_key(prefix_hint) - Included(range_start) | Excluded(range_start) => { - assert!(range_start.as_slice() >= prefix_hint.as_slice()); - assert!(range_start.as_slice() < next_key.as_slice() || next_key.is_empty()); - } - - _ => unreachable!(), - } - - match key_range.end_bound() { - Included(range_end) => { - assert!(range_end.as_slice() >= prefix_hint.as_slice()); - assert!(range_end.as_slice() < next_key.as_slice() || next_key.is_empty()); - } - - // 1. Excluded(end_bound_of_prefix(pk + col)) => prefix_hint < end_bound <= - // next_key(prefix_hint) - // - // 2. 
Excluded(pk + bound) => prefix_hint < end_bound <= - // next_key(prefix_hint) - Excluded(range_end) => { - assert!(range_end.as_slice() > prefix_hint.as_slice()); - assert!(range_end.as_slice() <= next_key.as_slice() || next_key.is_empty()); - } - - std::ops::Bound::Unbounded => { - assert!(next_key.is_empty()); - } - } - } else { - // not check - } - self.iter_inner(map_table_key_range(key_range), epoch, read_options) } } diff --git a/src/storage/src/hummock/state_store_v1.rs b/src/storage/src/hummock/state_store_v1.rs index 8c8eca89ff34..ac2b4be6ab49 100644 --- a/src/storage/src/hummock/state_store_v1.rs +++ b/src/storage/src/hummock/state_store_v1.rs @@ -27,8 +27,7 @@ use minitrace::Span; use risingwave_common::catalog::TableId; use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_hummock_sdk::key::{ - bound_table_key_range, map_table_key_range, next_key, user_key, FullKey, TableKey, - TableKeyRange, UserKey, + bound_table_key_range, map_table_key_range, user_key, FullKey, TableKey, TableKeyRange, UserKey, }; use risingwave_hummock_sdk::key_range::KeyRangeCommon; use risingwave_hummock_sdk::{can_concat, HummockReadEpoch}; @@ -297,11 +296,7 @@ impl HummockStorageV1 { ); assert!(pinned_version.is_valid()); // encode once - let bloom_filter_key = if let Some(prefix) = read_options.prefix_hint.as_ref() { - Some(UserKey::new(read_options.table_id, TableKey(prefix)).encode()) - } else { - None - }; + let bloom_filter_key = read_options.dist_key_hint.map(TableKey); for level in pinned_version.levels(table_id) { if level.table_infos.is_empty() { continue; @@ -438,55 +433,6 @@ impl StateStoreRead for HummockStorageV1 { epoch: HummockEpoch, read_options: ReadOptions, ) -> Self::IterFuture<'_> { - if let Some(prefix_hint) = read_options.prefix_hint.as_ref() { - let next_key = next_key(prefix_hint); - - // learn more detail about start_bound with storage_table.rs. 
- match key_range.start_bound() { - // it guarantees that the start bound must be included (some different case) - // 1. Include(pk + col_bound) => prefix_hint <= start_bound < - // next_key(prefix_hint) - // - // for case2, frontend need to reject this, avoid excluded start_bound and - // transform it to included(next_key), without this case we can just guarantee - // that start_bound < next_key - // - // 2. Include(next_key(pk + - // col_bound)) => prefix_hint <= start_bound <= next_key(prefix_hint) - // - // 3. Include(pk) => prefix_hint <= start_bound < next_key(prefix_hint) - Included(range_start) | Excluded(range_start) => { - assert!(range_start.as_slice() >= prefix_hint.as_slice()); - assert!(range_start.as_slice() < next_key.as_slice() || next_key.is_empty()); - } - - _ => unreachable!(), - } - - match key_range.end_bound() { - Included(range_end) => { - assert!(range_end.as_slice() >= prefix_hint.as_slice()); - assert!(range_end.as_slice() < next_key.as_slice() || next_key.is_empty()); - } - - // 1. Excluded(end_bound_of_prefix(pk + col)) => prefix_hint < end_bound <= - // next_key(prefix_hint) - // - // 2. 
Excluded(pk + bound) => prefix_hint < end_bound <= - // next_key(prefix_hint) - Excluded(range_end) => { - assert!(range_end.as_slice() > prefix_hint.as_slice()); - assert!(range_end.as_slice() <= next_key.as_slice() || next_key.is_empty()); - } - - std::ops::Bound::Unbounded => { - assert!(next_key.is_empty()); - } - } - } else { - // not check - } - self.iter_inner::(epoch, map_table_key_range(key_range), read_options) .map_ok(|iter| iter.into_stream()) .in_span(self.tracing.new_tracer("hummock_iter")) diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 2a52ad1de2db..c45e7169ae14 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -539,11 +539,7 @@ impl HummockVersionReader { } let mut staging_sst_iter_count = 0; // encode once - let bloom_filter_key = if let Some(prefix) = read_options.prefix_hint.as_ref() { - Some(UserKey::new(read_options.table_id, TableKey(prefix)).encode()) - } else { - None - }; + let bloom_filter_key = read_options.dist_key_hint.as_deref(); for sstable_info in &uncommitted_ssts { let table_holder = self @@ -554,7 +550,7 @@ impl HummockVersionReader { if let Some(bloom_filter_key) = bloom_filter_key.as_ref() { if !hit_sstable_bloom_filter( table_holder.value(), - bloom_filter_key.as_slice(), + bloom_filter_key, &mut local_stats, ) { continue; @@ -625,12 +621,10 @@ impl HummockVersionReader { .sstable(sstable_info, &mut local_stats) .in_span(Span::enter_with_local_parent("get_sstable")) .await?; - if let Some(bloom_filter_key) = read_options.prefix_hint.as_ref() { + if let Some(bloom_filter_key) = read_options.dist_key_hint.as_deref() { if !hit_sstable_bloom_filter( sstable.value(), - UserKey::new(read_options.table_id, TableKey(bloom_filter_key)) - .encode() - .as_slice(), + bloom_filter_key, &mut local_stats, ) { continue; @@ -670,7 +664,7 @@ impl HummockVersionReader { if let Some(bloom_filter_key) = bloom_filter_key.as_ref() && 
!hit_sstable_bloom_filter( sstable.value(), - bloom_filter_key.as_slice(), + bloom_filter_key, &mut local_stats, ) { diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 31997201829d..d437a4320b84 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -94,11 +94,11 @@ pub trait StateStoreRead: StaticSendSync { read_options: ReadOptions, ) -> Self::GetFuture<'_>; - /// Opens and returns an iterator for given `prefix_hint` and `full_key_range` - /// Internally, `prefix_hint` will be used to for checking `bloom_filter` and - /// `full_key_range` used for iter. (if the `prefix_hint` not None, it should be be included in - /// `key_range`) The returned iterator will iterate data based on a snapshot corresponding to - /// the given `epoch`. + /// Opens and returns an iterator for given `dist_key_hint` and `full_key_range` + /// Internally, `dist_key_hint` will be used to for checking `bloom_filter` and + /// `full_key_range` used for iter. (if the `dist_key_hint` not None, it should be be included + /// in `key_range`) The returned iterator will iterate data based on a snapshot + /// corresponding to the given `epoch`. fn iter( &self, key_range: (Bound>, Bound>), @@ -113,7 +113,7 @@ pub trait StateStoreReadExt: StaticSendSync { type ScanFuture<'a>: ScanFutureTrait<'a>; /// Scans `limit` number of keys from a key range. If `limit` is `None`, scans all elements. - /// Internally, `prefix_hint` will be used to for checking `bloom_filter` and + /// Internally, `dist_key_hint` will be used to for checking `bloom_filter` and /// `full_key_range` used for iter. /// The result is based on a snapshot corresponding to the given `epoch`. /// @@ -274,9 +274,9 @@ pub trait LocalStateStore: StateStoreRead + StateStoreWrite + StaticSendSync { #[derive(Default, Clone)] pub struct ReadOptions { /// A hint for prefix key to check bloom filter. 
- /// If the `prefix_hint` is not None, it should be included in + /// If the `dist_key_hint` is not None, it should be included in /// `key` or `key_range` in the read API. - pub prefix_hint: Option>, + pub dist_key_hint: Option>, pub ignore_range_tombstone: bool, pub check_bloom_filter: bool, diff --git a/src/storage/src/table/batch_table/storage_table.rs b/src/storage/src/table/batch_table/storage_table.rs index b231298f3b39..6a4b5cde100e 100644 --- a/src/storage/src/table/batch_table/storage_table.rs +++ b/src/storage/src/table/batch_table/storage_table.rs @@ -23,7 +23,10 @@ use futures::{Stream, StreamExt}; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::buffer::Bitmap; -use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption}; +use risingwave_common::catalog::{ + get_dist_key_in_pk_indices, get_dist_key_start_index_in_pk, ColumnDesc, ColumnId, Schema, + TableId, TableOption, +}; use risingwave_common::hash::VirtualNode; use risingwave_common::row::{self, Row, Row2, RowDeserializer, RowExt}; use risingwave_common::util::ordered::*; @@ -78,6 +81,7 @@ pub struct StorageTable { /// Indices of distribution key for computing vnode. /// Note that the index is based on the primary key columns by `pk_indices`. dist_key_in_pk_indices: Vec, + distribution_key_start_index_in_pk: Option, /// Virtual nodes that the table is partitioned into. 
/// @@ -183,21 +187,9 @@ impl StorageTable { let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types); let row_deserializer = RowDeserializer::new(all_data_types); - let dist_key_in_pk_indices = dist_key_indices - .iter() - .map(|&di| { - pk_indices - .iter() - .position(|&pi| di == pi) - .unwrap_or_else(|| { - panic!( - "distribution key {:?} must be a subset of primary key {:?}", - dist_key_indices, pk_indices - ) - }) - }) - .collect_vec(); - + let dist_key_in_pk_indices = get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices); + let distribution_key_start_index_in_pk = + get_dist_key_start_index_in_pk(&dist_key_in_pk_indices); Self { table_id, store, @@ -208,6 +200,7 @@ impl StorageTable { pk_indices, dist_key_indices, dist_key_in_pk_indices, + distribution_key_start_index_in_pk, vnodes, table_option, } @@ -253,9 +246,11 @@ impl StorageTable { .into_iter() .map(|index| self.pk_indices[index]) .collect_vec(); + let read_options = ReadOptions { - prefix_hint: None, - check_bloom_filter: self.dist_key_indices == key_indices, + dist_key_hint: None, + check_bloom_filter: !self.dist_key_indices.is_empty() + && self.dist_key_indices == key_indices, retention_seconds: self.table_option.retention_seconds, ignore_range_tombstone: false, table_id: self.table_id, @@ -292,7 +287,7 @@ impl StorageTable { /// `vnode_hint`, and merge or concat them by given `ordered`. async fn iter_with_encoded_key_range( &self, - prefix_hint: Option>, + dist_key_hint: Option>, encoded_key_range: R, wait_epoch: HummockReadEpoch, vnode_hint: Option, @@ -316,14 +311,13 @@ impl StorageTable { // can use a single iterator. 
let iterators: Vec<_> = try_join_all(vnodes.map(|vnode| { let raw_key_range = prefixed_range(encoded_key_range.clone(), &vnode.to_be_bytes()); - let prefix_hint = prefix_hint - .clone() - .map(|prefix_hint| [&vnode.to_be_bytes(), prefix_hint.as_slice()].concat()); + + let dist_key_hint = dist_key_hint.clone(); let wait_epoch = wait_epoch.clone(); async move { - let check_bloom_filter = prefix_hint.is_some(); + let check_bloom_filter = dist_key_hint.is_some(); let read_options = ReadOptions { - prefix_hint, + dist_key_hint, check_bloom_filter, ignore_range_tombstone: false, retention_seconds: self.table_option.retention_seconds, @@ -433,27 +427,48 @@ impl StorageTable { .into_iter() .map(|index| self.pk_indices[index]) .collect_vec(); - let prefix_hint = if self.dist_key_indices.is_empty() - || self.dist_key_indices != pk_prefix_indices + + let dist_key_hint = if let Some(distribution_key_start_index_in_pk) = + self.distribution_key_start_index_in_pk && + self.dist_key_indices.len() + distribution_key_start_index_in_pk + <= pk_prefix.len() { + let encoded_prefix = if let Bound::Included(start_key) = start_key.as_ref() { + start_key + } else { + unreachable!() + }; + let distribution_key_end_index_in_pk = + self.dist_key_in_pk_indices.len() + distribution_key_start_index_in_pk; + let (dist_key_start_position, dist_key_len) = self + .pk_serializer + .deserialize_dist_key_position_with_column_indices( + encoded_prefix, + ( + distribution_key_start_index_in_pk, + distribution_key_end_index_in_pk, + ), + ) + .unwrap(); + Some( + encoded_prefix[dist_key_start_position..dist_key_len + dist_key_start_position] + .to_vec(), + ) + } else { trace!( - "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} dist_key_indices {:?} pk_prefix_indices {:?}", - self.table_id, - pk_prefix, - self.dist_key_indices, - pk_prefix_indices - ); + "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} dist_key_indices {:?} pk_prefix_indices 
{:?}", + self.table_id, + pk_prefix, + self.dist_key_indices, + pk_prefix_indices + ); None - } else { - let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len()); - let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer); - Some(serialized_pk_prefix) }; trace!( - "iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} dist_key_indices {:?} pk_prefix_indices {:?}" , + "iter_with_pk_bounds table_id {} dist_key_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} dist_key_indices {:?} pk_prefix_indices {:?}" , self.table_id, - prefix_hint, + dist_key_hint, start_key, end_key, pk_prefix, @@ -462,7 +477,7 @@ impl StorageTable { ); self.iter_with_encoded_key_range( - prefix_hint, + dist_key_hint, (start_key, end_key), epoch, self.try_compute_vnode_by_pk_prefix(pk_prefix), diff --git a/src/stream/src/common/table/state_table.rs b/src/stream/src/common/table/state_table.rs index 989f5c1633b0..8cc451e5c6c6 100644 --- a/src/stream/src/common/table/state_table.rs +++ b/src/stream/src/common/table/state_table.rs @@ -26,7 +26,9 @@ use futures_async_stream::try_stream; use itertools::{izip, Itertools}; use risingwave_common::array::{Op, StreamChunk, Vis}; use risingwave_common::buffer::Bitmap; -use risingwave_common::catalog::{ColumnDesc, TableId, TableOption}; +use risingwave_common::catalog::{ + get_dist_key_in_pk_indices, get_dist_key_start_index_in_pk, ColumnDesc, TableId, TableOption, +}; use risingwave_common::hash::VirtualNode; use risingwave_common::row::{self, CompactedRow, Row, Row2, RowDeserializer, RowExt}; use risingwave_common::types::ScalarImpl; @@ -89,6 +91,8 @@ pub struct StateTable { /// Note that the index is based on the primary key columns by `pk_indices`. dist_key_in_pk_indices: Vec, + distribution_key_start_index_in_pk: Option, + /// Virtual nodes that the table is partitioned into. 
/// /// Only the rows whose vnode of the primary key is in this set will be visible to the @@ -111,13 +115,13 @@ pub struct StateTable { /// The epoch flush to the state store last time. epoch: Option, - /// last watermark that is used to construct delete ranges in `ingest` + /// last watermark that is used to construct delete ranges in `ingest`. last_watermark: Option, /// latest watermark cur_watermark: Option, - /// number of commits with watermark since the last time we did state cleaning by watermark + /// number of commits with watermark since the last time we did state cleaning by watermark. num_wmked_commits_since_last_clean: usize, } @@ -156,21 +160,7 @@ impl StateTable { .map(|col_order| col_order.index as usize) .collect_vec(); - let dist_key_in_pk_indices = dist_key_indices - .iter() - .map(|&di| { - pk_indices - .iter() - .position(|&pi| di == pi) - .unwrap_or_else(|| { - panic!( - "distribution key {:?} must be a subset of primary key {:?}", - dist_key_indices, pk_indices - ) - }) - }) - .collect_vec(); - + let dist_key_in_pk_indices = get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices); let local_state_store = store.new_local(table_id).await; let pk_data_types = pk_indices @@ -189,6 +179,8 @@ impl StateTable { }, None => Distribution::fallback(), }; + let distribution_key_start_index_in_pk = + get_dist_key_start_index_in_pk(&dist_key_in_pk_indices); let vnode_col_idx_in_pk = table_catalog .vnode_col_idx @@ -226,6 +218,7 @@ impl StateTable { pk_indices: pk_indices.to_vec(), dist_key_indices, dist_key_in_pk_indices, + distribution_key_start_index_in_pk, vnodes, table_option: TableOption::build_table_option(table_catalog.get_properties()), disable_sanity_check: false, @@ -308,20 +301,7 @@ impl StateTable { .collect(), None => table_columns.iter().map(|c| c.data_type.clone()).collect(), }; - let dist_key_in_pk_indices = dist_key_indices - .iter() - .map(|&di| { - pk_indices - .iter() - .position(|&pi| di == pi) - .unwrap_or_else(|| { - panic!( - 
"distribution key {:?} must be a subset of primary key {:?}", - dist_key_indices, pk_indices - ) - }) - }) - .collect_vec(); + let dist_key_in_pk_indices = get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices); Self { table_id, mem_table: MemTable::new(), @@ -331,6 +311,7 @@ impl StateTable { pk_indices, dist_key_indices, dist_key_in_pk_indices, + distribution_key_start_index_in_pk: None, vnodes, table_option: Default::default(), disable_sanity_check: false, @@ -472,8 +453,9 @@ impl StateTable { .map(|index| self.pk_indices[index]) .collect_vec(); let read_options = ReadOptions { - prefix_hint: None, - check_bloom_filter: self.dist_key_indices == key_indices, + dist_key_hint: None, + check_bloom_filter: !self.dist_key_indices.is_empty() + && self.dist_key_indices == key_indices, retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, ignore_range_tombstone: false, @@ -514,7 +496,6 @@ impl StateTable { std::mem::replace(&mut self.vnodes, new_vnodes) } } - // write impl StateTable { fn handle_mem_table_error(&self, e: MemTableError) { @@ -778,7 +759,7 @@ impl StateTable { epoch: u64, ) -> StreamExecutorResult<()> { let read_options = ReadOptions { - prefix_hint: None, + dist_key_hint: None, check_bloom_filter: false, retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, @@ -810,7 +791,7 @@ impl StateTable { epoch: u64, ) -> StreamExecutorResult<()> { let read_options = ReadOptions { - prefix_hint: None, + dist_key_hint: None, check_bloom_filter: false, retention_seconds: self.table_option.retention_seconds, table_id: self.table_id, @@ -844,7 +825,7 @@ impl StateTable { epoch: u64, ) -> StreamExecutorResult<()> { let read_options = ReadOptions { - prefix_hint: None, + dist_key_hint: None, ignore_range_tombstone: false, check_bloom_filter: false, retention_seconds: self.table_option.retention_seconds, @@ -1010,37 +991,56 @@ impl StateTable { // Construct prefix hint for prefix bloom filter. 
let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()]; - let prefix_hint = { - if self.dist_key_indices.is_empty() || self.dist_key_indices != pk_prefix_indices { + let dist_key_hint = { + if self.dist_key_indices.is_empty() + || self.distribution_key_start_index_in_pk.is_none() + || self.dist_key_indices.len() + self.distribution_key_start_index_in_pk.unwrap() + > pk_prefix.len() + { None } else { - Some([&vnode, &encoded_prefix[..]].concat()) + let distribution_key_end_index_in_pk = self.dist_key_in_pk_indices.len() + + self.distribution_key_start_index_in_pk.unwrap(); + let (dist_key_start_position, dist_key_len) = self + .pk_serde + .deserialize_dist_key_position_with_column_indices( + &encoded_prefix, + ( + self.distribution_key_start_index_in_pk.unwrap(), + distribution_key_end_index_in_pk, + ), + )?; + + Some( + encoded_prefix[dist_key_start_position..dist_key_len + dist_key_start_position] + .to_vec(), + ) } }; trace!( table_id = ?self.table_id(), - ?prefix_hint, ?encoded_key_range_with_vnode, ?pk_prefix, + ?dist_key_hint, ?encoded_key_range_with_vnode, ?pk_prefix, dist_key_indices = ?self.dist_key_indices, ?pk_prefix_indices, "storage_iter_with_prefix" ); - self.iter_inner(encoded_key_range_with_vnode, prefix_hint, epoch) + self.iter_inner(encoded_key_range_with_vnode, dist_key_hint, epoch) .await } async fn iter_inner( &self, key_range: (Bound>, Bound>), - prefix_hint: Option>, + dist_key_hint: Option>, epoch: u64, ) -> StreamExecutorResult<(MemTableIter<'_>, StorageIterInner)> { // Mem table iterator. 
let mem_table_iter = self.mem_table.iter(key_range.clone()); - let check_bloom_filter = prefix_hint.is_some(); + let check_bloom_filter = dist_key_hint.is_some(); let read_options = ReadOptions { - prefix_hint, + dist_key_hint, check_bloom_filter, ignore_range_tombstone: false, retention_seconds: self.table_option.retention_seconds, diff --git a/src/tests/compaction_test/src/runner.rs b/src/tests/compaction_test/src/runner.rs index 6e6ffabdd5cc..6d6cd6fcc9db 100644 --- a/src/tests/compaction_test/src/runner.rs +++ b/src/tests/compaction_test/src/runner.rs @@ -591,7 +591,7 @@ async fn open_hummock_iters( range.clone(), epoch, ReadOptions { - prefix_hint: None, + dist_key_hint: None, table_id: TableId { table_id }, retention_seconds: None, check_bloom_filter: false,