Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): change the prefix_hint to dist_key_hint for bloom_filter #6575

Merged
merged 59 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
e6af44d
distribution_key_start_index_in_pk
wcy-fdu Nov 24, 2022
241723b
refactor judgement of prefix_hint
wcy-fdu Nov 24, 2022
759df20
rename prefix_hint to dist_key_hint
wcy-fdu Nov 24, 2022
4c6c942
retry
wcy-fdu Nov 24, 2022
02f48da
retry
wcy-fdu Nov 24, 2022
75a3a9b
add start_index in FilterKeyExtractor
wcy-fdu Nov 28, 2022
7ce696c
fix MultiFilterKeyExtractor
wcy-fdu Nov 28, 2022
b2209cb
todo: rename all pk_prefix to dist_key
wcy-fdu Nov 28, 2022
49381a9
rollback and fix
wcy-fdu Nov 28, 2022
21d7c68
detect correctness, fix
wcy-fdu Nov 28, 2022
b6d767d
hope to pass CI
wcy-fdu Nov 28, 2022
35faa1f
some rename
wcy-fdu Nov 29, 2022
4d7f27e
outer join fails
wcy-fdu Nov 29, 2022
85bc650
remove table_id and vnode in bloom_filter_key
wcy-fdu Nov 29, 2022
fd9b63e
ignore some check
wcy-fdu Nov 30, 2022
d0868c1
remove assertion, hope to pass CI
wcy-fdu Nov 30, 2022
9c37a58
fix ut: test_compaction_with_filter_key_extractor
wcy-fdu Nov 30, 2022
cb8e352
remove distribution_key_start_index_in_pk in catalog
wcy-fdu Nov 30, 2022
746e6a0
resolve conflict
wcy-fdu Nov 30, 2022
7216db8
fix typo
wcy-fdu Nov 30, 2022
88aab17
fix dist_key shuffle
wcy-fdu Dec 1, 2022
62e985d
add more check when point get
wcy-fdu Dec 1, 2022
7d3073d
fix
wcy-fdu Dec 1, 2022
09267dd
fix
wcy-fdu Dec 1, 2022
b5a91aa
resolve some comments
wcy-fdu Dec 2, 2022
9a8ff30
fix again
wcy-fdu Dec 2, 2022
6220865
try again
wcy-fdu Dec 2, 2022
0bdbdae
try again
wcy-fdu Dec 2, 2022
ba5752f
clean up code, fix point-get check bloom filter
wcy-fdu Dec 5, 2022
b11a81b
ignore discontinuous cases
wcy-fdu Dec 5, 2022
16a0a1d
ignore shuffle dist key
wcy-fdu Dec 5, 2022
fe060c3
add discontinuous dist key
wcy-fdu Dec 5, 2022
8052d10
handle shuffle dist key
wcy-fdu Dec 5, 2022
ab72252
we should ignore shuffle dist key
wcy-fdu Dec 5, 2022
9b7be4a
need to find bug
wcy-fdu Dec 5, 2022
09ee7c2
minor fix
wcy-fdu Dec 5, 2022
88c1d87
resolve some comments
wcy-fdu Dec 5, 2022
aa34b37
typo
wcy-fdu Dec 5, 2022
8c42dfa
sort before judge continuous
wcy-fdu Dec 6, 2022
c120fcd
debug
wcy-fdu Dec 6, 2022
44c8295
find bug and fix
wcy-fdu Dec 6, 2022
ba22bbc
Merge branch 'main' into wcy/prefix-bloom-filter
wcy-fdu Dec 6, 2022
059a6cc
use xxhash
wcy-fdu Dec 6, 2022
80dbeb9
cargo toml fmt
wcy-fdu Dec 6, 2022
d661984
cargo toml fmt
wcy-fdu Dec 6, 2022
de3d6a3
resolve comments
wcy-fdu Dec 7, 2022
8085a59
resolve some comments
wcy-fdu Dec 7, 2022
77403a1
retry
wcy-fdu Dec 7, 2022
2b0cc7d
add check when point get
wcy-fdu Dec 7, 2022
e15153d
fix ut
wcy-fdu Dec 8, 2022
e440703
keep xxh
wcy-fdu Dec 8, 2022
6573752
fix
wcy-fdu Dec 8, 2022
a65a211
resolve conflict
wcy-fdu Dec 8, 2022
b75ef11
resolve conflict
wcy-fdu Dec 8, 2022
ded6f46
ut
wcy-fdu Dec 8, 2022
4991892
retry
wcy-fdu Dec 8, 2022
84831a3
resolve comments
wcy-fdu Dec 8, 2022
62a68a5
resolve conflict
wcy-fdu Dec 8, 2022
e524946
Merge branch 'main' into wcy/prefix-bloom-filter
mergify[bot] Dec 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ mod tests {
epoch,
None,
ReadOptions {
prefix_hint: None,
dist_key_hint: None,
check_bloom_filter: false,
ignore_range_tombstone: false,
table_id: Default::default(),
Expand Down
101 changes: 101 additions & 0 deletions src/common/src/util/ordered/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ impl OrderedRowSerde {
}
}

pub fn dist_key_serde(&self, start_index: usize, end_index: usize) -> Cow<'_, Self> {
if end_index - start_index == self.order_types.len() {
Cow::Borrowed(self)
} else {
Cow::Owned(Self {
schema: self.schema[start_index..end_index].to_vec(),
order_types: self.order_types[start_index..end_index].to_vec(),
})
}
}

pub fn serialize(&self, row: impl Row2, append_to: &mut Vec<u8>) {
self.serialize_datums(row.iter(), append_to)
}
Expand Down Expand Up @@ -104,6 +115,34 @@ impl OrderedRowSerde {

Ok(len)
}

/// return the distribution key start position in serialized key and the distribution key
/// length.
pub fn deserialize_dist_key_position_with_column_indices(
&self,
key: &[u8],
column_indices: impl Iterator<Item = usize>,
wcy-fdu marked this conversation as resolved.
Show resolved Hide resolved
dist_key_start_index: usize,
) -> memcomparable::Result<(usize, usize)> {
use crate::types::ScalarImpl;
let mut dist_key_start_position: usize = 0;
let mut len: usize = 0;
for index in column_indices {
let data_type = &self.schema[index];
let order_type = &self.order_types[index];
let data = &key[len..];
let mut deserializer = memcomparable::Deserializer::new(data);
deserializer.set_reverse(*order_type == OrderType::Descending);

let field_length = ScalarImpl::encoding_data_size(data_type, &mut deserializer)?;
len += field_length;
if index < dist_key_start_index {
dist_key_start_position += field_length;
}
}

Ok((dist_key_start_position, (len - dist_key_start_position)))
}
}

#[cfg(test)]
Expand Down Expand Up @@ -237,6 +276,68 @@ mod tests {
}
}

#[test]
fn test_deserialize_dist_key_position_with_column_indices() {
let order_types = vec![
OrderType::Descending,
OrderType::Ascending,
OrderType::Descending,
OrderType::Ascending,
];

let schema = vec![
DataType::Varchar,
DataType::Int16,
DataType::Varchar,
DataType::Varchar,
];
let serde = OrderedRowSerde::new(schema, order_types);
let row1 = Row::new(vec![
Some(Utf8("aaa".to_string())),
Some(Int16(5)),
Some(Utf8("bbb".to_string())),
Some(Utf8("ccc".to_string())),
]);
let rows = vec![row1];
let mut array = vec![];
for row in &rows {
let mut row_bytes = vec![];
serde.serialize(row, &mut row_bytes);
array.push(row_bytes);
}

{
let dist_key_indices = [1, 2];
let dist_key_start_index = 1;
let (dist_key_start_position, dist_key_len) = serde
.deserialize_dist_key_position_with_column_indices(
&array[0],
0..dist_key_start_index + dist_key_indices.len(),
dist_key_start_index,
)
.unwrap();

let schema = vec![DataType::Varchar];
let order_types = vec![OrderType::Descending];
let deserde = OrderedRowSerde::new(schema, order_types);
let prefix_slice = &array[0][0..dist_key_start_position];
assert_eq!(
deserde.deserialize(prefix_slice).unwrap(),
Row::new(vec![Some(Utf8("aaa".to_string()))])
);

let schema = vec![DataType::INT16, DataType::VARCHAR];
let order_types = vec![OrderType::Ascending, OrderType::Descending];
let deserde = OrderedRowSerde::new(schema, order_types);
let dist_key_slice =
&array[0][dist_key_start_position..dist_key_start_position + dist_key_len];
assert_eq!(
deserde.deserialize(dist_key_slice).unwrap(),
Row::new(vec![Some(Int16(5)), Some(Utf8("bbb".to_string()))])
);
}
}

#[test]
fn test_encoding_data_size() {
use std::mem::size_of;
Expand Down
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/hummock/list_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub async fn list_kv(epoch: u64, table_id: u32) -> anyhow::Result<()> {
None,
ReadOptions {
ignore_range_tombstone: false,
prefix_hint: None,
dist_key_hint: None,
table_id: TableId { table_id },
retention_seconds: None,
check_bloom_filter: false,
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/catalog/table_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ mod tests {
vnode_col_idx: None,
value_indices: vec![0],
definition: "".into(),
handle_pk_conflict: false
handle_pk_conflict: false,
}
);
assert_eq!(table, TableCatalog::from(table.to_prost(0, 0)));
Expand Down
4 changes: 3 additions & 1 deletion src/frontend/src/optimizer/plan_node/stream_materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,16 @@ impl StreamMaterialize {

let ctx = input.ctx();
let properties = ctx.inner().with_options.internal_table_subset();
let distribution_key = base.dist.dist_column_indices().to_vec();

let table = TableCatalog {
id: TableId::placeholder(),
associated_source_id: None,
name: mv_name,
columns,
pk: pk_list,
stream_key: pk_indices.clone(),
distribution_key: base.dist.dist_column_indices().to_vec(),
distribution_key,
is_index,
appendonly: input.append_only(),
owner: risingwave_common::catalog::DEFAULT_SUPER_USER_ID,
Expand Down
Loading