From bcfd5ef7b079bf1125a96edf360797b332cead7d Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 13 May 2024 09:49:42 +0000 Subject: [PATCH 1/6] fix: label mismatch --- src/mito2/src/memtable/partition_tree/dict.rs | 73 ++++++++++++------- .../src/memtable/partition_tree/partition.rs | 11 ++- .../src/memtable/partition_tree/shard.rs | 5 +- .../memtable/partition_tree/shard_builder.rs | 24 +++--- 4 files changed, 69 insertions(+), 44 deletions(-) diff --git a/src/mito2/src/memtable/partition_tree/dict.rs b/src/mito2/src/memtable/partition_tree/dict.rs index ea7874352bfd..6fc0f170cf76 100644 --- a/src/mito2/src/memtable/partition_tree/dict.rs +++ b/src/mito2/src/memtable/partition_tree/dict.rs @@ -26,7 +26,7 @@ use crate::metrics::MEMTABLE_DICT_BYTES; /// Maximum keys in a [DictBlock]. const MAX_KEYS_PER_BLOCK: u16 = 256; -type PkIndexMap = BTreeMap, PkIndex>; +type PkIndexMap = BTreeMap, (PkIndex, Option>)>; /// Builder to build a key dictionary. pub struct KeyDictBuilder { @@ -66,10 +66,15 @@ impl KeyDictBuilder { /// /// # Panics /// Panics if the builder is full. - pub fn insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> PkIndex { + pub fn insert_key( + &mut self, + full_primary_key: &[u8], + sparse_key: Option<&[u8]>, + metrics: &mut WriteMetrics, + ) -> PkIndex { assert!(!self.is_full()); - if let Some(pk_index) = self.pk_to_index.get(key).copied() { + if let Some(pk_index) = self.pk_to_index.get(full_primary_key).map(|v| v.0) { // Already in the builder. return pk_index; } @@ -81,16 +86,22 @@ impl KeyDictBuilder { } // Safety: we have checked the buffer length. - let pk_index = self.key_buffer.push_key(key); - self.pk_to_index.insert(key.to_vec(), pk_index); + let pk_index = self.key_buffer.push_key(full_primary_key); + let (sparse_key, sparse_key_len) = if let Some(sparse_key) = sparse_key { + (Some(sparse_key.to_vec()), sparse_key.len()) + } else { + (None, 0) + }; + self.pk_to_index + .insert(full_primary_key.to_vec(), (pk_index, sparse_key)); self.num_keys += 1; // Since we store the key twice so the bytes usage doubled. - metrics.key_bytes += key.len() * 2; - self.key_bytes_in_index += key.len(); + metrics.key_bytes += full_primary_key.len() * 2 + sparse_key_len; + self.key_bytes_in_index += full_primary_key.len(); // Adds key size of index to the metrics. - MEMTABLE_DICT_BYTES.add(key.len() as i64); + MEMTABLE_DICT_BYTES.add((full_primary_key.len() + sparse_key_len) as i64); pk_index } @@ -108,37 +119,47 @@ impl KeyDictBuilder { } /// Finishes the builder. - pub fn finish(&mut self, pk_to_index: &mut BTreeMap, PkIndex>) -> Option { + pub fn finish(&mut self) -> Option<(KeyDict, BTreeMap, PkIndex>)> { if self.key_buffer.is_empty() { return None; } + let mut pk_to_index_map = BTreeMap::new(); // Finishes current dict block and resets the pk index. let dict_block = self.key_buffer.finish(true); self.dict_blocks.push(dict_block); // Computes key position and then alter pk index. let mut key_positions = vec![0; self.pk_to_index.len()]; - for (i, pk_index) in self.pk_to_index.values_mut().enumerate() { + + for (i, (full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index)) + .into_iter() + .enumerate() + { // The position of the i-th key is the old pk index. - key_positions[i] = *pk_index; - // Overwrites the pk index. - *pk_index = i as PkIndex; + key_positions[i] = pk_index; + pk_to_index_map.insert(full_pk, i as PkIndex); + if let Some(sparse_key) = sparse_key { + pk_to_index_map.insert(sparse_key, i as PkIndex); + } } + self.num_keys = 0; let key_bytes_in_index = self.key_bytes_in_index; self.key_bytes_in_index = 0; - *pk_to_index = std::mem::take(&mut self.pk_to_index); - Some(KeyDict { - dict_blocks: std::mem::take(&mut self.dict_blocks), - key_positions, - key_bytes_in_index, - }) + Some(( + KeyDict { + dict_blocks: std::mem::take(&mut self.dict_blocks), + key_positions, + key_bytes_in_index, + }, + pk_to_index_map, + )) } /// Reads the builder. pub fn read(&self) -> DictBuilderReader { - let sorted_pk_indices = self.pk_to_index.values().copied().collect(); + let sorted_pk_indices = self.pk_to_index.values().map(|v| v.0).collect(); let block = self.key_buffer.finish_cloned(); let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1); blocks.extend_from_slice(&self.dict_blocks); @@ -394,7 +415,7 @@ mod tests { let mut metrics = WriteMetrics::default(); for key in &keys { assert!(!builder.is_full()); - let pk_index = builder.insert_key(key, &mut metrics); + let pk_index = builder.insert_key(key, None, &mut metrics); last_pk_index = Some(pk_index); } assert_eq!(num_keys - 1, last_pk_index.unwrap()); @@ -426,14 +447,14 @@ mod tests { for i in 0..num_keys { // Each key is 5 bytes. let key = format!("{i:05}"); - builder.insert_key(key.as_bytes(), &mut metrics); + builder.insert_key(key.as_bytes(), None, &mut metrics); } let key_bytes = num_keys as usize * 5; assert_eq!(key_bytes * 2, metrics.key_bytes); assert_eq!(key_bytes, builder.key_bytes_in_index); assert_eq!(8850, builder.memory_size()); - let dict = builder.finish(&mut BTreeMap::new()).unwrap(); + let (dict, _) = builder.finish().unwrap(); assert_eq!(0, builder.key_bytes_in_index); assert_eq!(key_bytes, dict.key_bytes_in_index); assert!(dict.shared_memory_size() > key_bytes); @@ -446,12 +467,12 @@ mod tests { for i in 0..MAX_KEYS_PER_BLOCK * 2 { let key = format!("{i:010}"); assert!(!builder.is_full()); - builder.insert_key(key.as_bytes(), &mut metrics); + builder.insert_key(key.as_bytes(), None, &mut metrics); } assert!(builder.is_full()); - builder.finish(&mut BTreeMap::new()); + builder.finish(); assert!(!builder.is_full()); - assert_eq!(0, builder.insert_key(b"a0", &mut metrics)); + assert_eq!(0, builder.insert_key(b"a0", None, &mut metrics)); } } diff --git a/src/mito2/src/memtable/partition_tree/partition.rs b/src/mito2/src/memtable/partition_tree/partition.rs index 3f38206dd8e7..740ea9a5bf8c 100644 --- a/src/mito2/src/memtable/partition_tree/partition.rs +++ b/src/mito2/src/memtable/partition_tree/partition.rs @@ -89,15 +89,18 @@ impl Partition { let sparse_key = primary_key.clone(); primary_key.clear(); row_codec.encode_to_vec(key_value.primary_keys(), primary_key)?; - let pk_id = inner - .shard_builder - .write_with_key(primary_key, &key_value, metrics); + let pk_id = inner.shard_builder.write_with_key( + primary_key, + Some(&sparse_key), + &key_value, + metrics, + ); inner.pk_to_pk_id.insert(sparse_key, pk_id); } else { // `primary_key` is already the full primary key. let pk_id = inner .shard_builder - .write_with_key(primary_key, &key_value, metrics); + .write_with_key(primary_key, None, &key_value, metrics); inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id); }; diff --git a/src/mito2/src/memtable/partition_tree/shard.rs b/src/mito2/src/memtable/partition_tree/shard.rs index 33ca0e414b28..1229ca8f4965 100644 --- a/src/mito2/src/memtable/partition_tree/shard.rs +++ b/src/mito2/src/memtable/partition_tree/shard.rs @@ -423,7 +423,6 @@ impl Node for ShardNode { #[cfg(test)] mod tests { - use std::collections::BTreeMap; use std::sync::Arc; use super::*; @@ -488,10 +487,10 @@ mod tests { encode_keys(&metadata, kvs, &mut keys); } for key in &keys { - dict_builder.insert_key(key, &mut metrics); + dict_builder.insert_key(key, None, &mut metrics); } - let dict = dict_builder.finish(&mut BTreeMap::new()).unwrap(); + let (dict, _) = dict_builder.finish().unwrap(); let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true); Shard::new( diff --git a/src/mito2/src/memtable/partition_tree/shard_builder.rs b/src/mito2/src/memtable/partition_tree/shard_builder.rs index 12739d16d3cb..a67e99adb4fa 100644 --- a/src/mito2/src/memtable/partition_tree/shard_builder.rs +++ b/src/mito2/src/memtable/partition_tree/shard_builder.rs @@ -14,7 +14,7 @@ //! Builder of a shard. -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -71,12 +71,15 @@ impl ShardBuilder { /// Write a key value with its encoded primary key. pub fn write_with_key( &mut self, - primary_key: &[u8], + full_primary_key: &[u8], + sparse_key: Option<&[u8]>, key_value: &KeyValue, metrics: &mut WriteMetrics, ) -> PkId { // Safety: we check whether the builder need to freeze before. - let pk_index = self.dict_builder.insert_key(primary_key, metrics); + let pk_index = self + .dict_builder + .insert_key(full_primary_key, sparse_key, metrics); self.data_buffer.write_row(pk_index, key_value); PkId { shard_id: self.current_shard_id, @@ -106,10 +109,8 @@ impl ShardBuilder { return Ok(None); } - let mut pk_to_index = BTreeMap::new(); - let key_dict = self.dict_builder.finish(&mut pk_to_index); - let data_part = match &key_dict { - Some(dict) => { + let (data_part, key_dict) = match self.dict_builder.finish() { + Some((dict, pk_to_index)) => { // Adds mapping to the map. pk_to_pk_id.reserve(pk_to_index.len()); for (k, pk_index) in pk_to_index { @@ -123,11 +124,12 @@ impl ShardBuilder { } let pk_weights = dict.pk_weights_to_sort_data(); - self.data_buffer.freeze(Some(&pk_weights), true)? + let part = self.data_buffer.freeze(Some(&pk_weights), true)?; + (part, Some(dict)) } None => { let pk_weights = [0]; - self.data_buffer.freeze(Some(&pk_weights), true)? + (self.data_buffer.freeze(Some(&pk_weights), true)?, None) } }; @@ -367,7 +369,7 @@ mod tests { for key_values in &input { for kv in key_values.iter() { let key = encode_key_by_kv(&kv); - shard_builder.write_with_key(&key, &kv, &mut metrics); + shard_builder.write_with_key(&key, None, &kv, &mut metrics); } } let shard = shard_builder @@ -389,7 +391,7 @@ mod tests { for key_values in &input { for kv in key_values.iter() { let key = encode_key_by_kv(&kv); - shard_builder.write_with_key(&key, &kv, &mut metrics); + shard_builder.write_with_key(&key, None, &kv, &mut metrics); } } From c78e14be94bbabf42840d6129190dc737a034084 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 May 2024 02:22:19 +0000 Subject: [PATCH 2/6] test: add unit test --- src/mito2/src/memtable/partition_tree.rs | 173 ++++++++++++++++++++++- src/mito2/src/test_util/memtable_util.rs | 2 +- 2 files changed, 173 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index fccbf8387c4a..d2435e0790c0 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -311,14 +311,24 @@ impl MemtableBuilder for PartitionTreeMemtableBuilder { #[cfg(test)] mod tests { + use api::v1::value::ValueData; + use api::v1::{Row, Rows, SemanticType}; use common_time::Timestamp; use datafusion_common::{Column, ScalarValue}; use datafusion_expr::{BinaryExpr, Expr, Operator}; + use datatypes::data_type::ConcreteDataType; use datatypes::scalars::ScalarVector; + use datatypes::schema::ColumnSchema; + use datatypes::value::Value; use datatypes::vectors::Int64Vector; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; use super::*; - use crate::test_util::memtable_util::{self, collect_iter_timestamps}; + use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; + use crate::test_util::memtable_util::{ + self, collect_iter_timestamps, region_metadata_to_row_schema, + }; #[test] fn test_memtable_sorted_input() { @@ -562,4 +572,165 @@ mod tests { assert!(config.dedup); assert_eq!(PartitionTreeConfig::default(), config); } + + fn metadata_for_metric_engine() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "__table_id", + ConcreteDataType::uint32_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 2147483652, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "__ts_id", + ConcreteDataType::uint64_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 2147483651, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "test_label", + ConcreteDataType::string_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "greptime_timestamp", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 0, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "greptime_value", + ConcreteDataType::float64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 1, + }) + .primary_key(vec![2147483652, 2147483651, 2]); + let region_metadata = builder.build().unwrap(); + Arc::new(region_metadata) + } + + fn build_key_values( + metadata: RegionMetadataRef, + labels: &[&str], + table_id: &[u32], + ts_id: &[u64], + ts: &[i64], + values: &[f64], + sequence: u64, + ) -> KeyValues { + let column_schema = region_metadata_to_row_schema(&metadata); + + let rows = ts + .iter() + .zip(table_id.iter()) + .zip(ts_id.iter()) + .zip(labels.iter()) + .zip(values.iter()) + .map(|((((ts, table_id), ts_id), label), val)| Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::U32Value(*table_id)), + }, + api::v1::Value { + value_data: Some(ValueData::U64Value(*ts_id)), + }, + api::v1::Value { + value_data: Some(ValueData::StringValue(label.to_string())), + }, + api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(*ts)), + }, + api::v1::Value { + value_data: Some(ValueData::F64Value(*val)), + }, + ], + }) + .collect(); + let mutation = api::v1::Mutation { + op_type: 1, + sequence, + rows: Some(Rows { + schema: column_schema, + rows, + }), + }; + KeyValues::new(metadata.as_ref(), mutation).unwrap() + } + #[test] + fn test_write_freeze() { + let metadata = metadata_for_metric_engine(); + let memtable = PartitionTreeMemtableBuilder::new( + PartitionTreeConfig { + index_max_keys_per_shard: 40, + ..Default::default() + }, + None, + ) + .build(1, &metadata); + + let codec = McmpRowCodec::new( + metadata + .primary_key_columns() + .map(|c| SortField::new(c.column_schema.data_type.clone())) + .collect(), + ); + + memtable + .write(&build_key_values( + metadata.clone(), + &["daily", "10min", "daily", "10min"], + &[1025, 1025, 1025, 1025], + &[ + 16442255374049317291, + 5686004715529701024, + 16442255374049317291, + 5686004715529701024, + ], + &[1712070000000, 1712717731000, 1712761200000, 1712761200000], + &[0.0, 0.0, 0.0, 0.0], + 1, + )) + .unwrap(); + + memtable.freeze().unwrap(); + let new_memtable = memtable.fork(2, &metadata); + + new_memtable + .write(&build_key_values( + metadata.clone(), + &["10min"], + &[1025], + &[5686004715529701024], + &[1714643131000], + &[0.1], + 2, + )) + .unwrap(); + + let mut reader = new_memtable.iter(None, None).unwrap(); + let batch = reader.next().unwrap().unwrap(); + let pk = codec.decode(batch.primary_key()).unwrap(); + if let Value::String(s) = &pk[2] { + assert_eq!("10min", s.as_utf8()); + } else { + unreachable!() + } + } } diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 1628ecf56e7c..b2764ba88957 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -115,7 +115,7 @@ pub fn metadata_with_primary_key( enable_table_id: bool, ) -> RegionMetadataRef { let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456)); - let maybe_table_id = if enable_table_id { "table_id" } else { "k1" }; + let maybe_table_id = if enable_table_id { "__table_id" } else { "k1" }; builder .push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false), From a30caf5f6c4fa3e485b6f5e7c8ad498b290d1ad4 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 May 2024 03:05:38 +0000 Subject: [PATCH 3/6] chore: avoid updating full primary keys --- src/mito2/src/memtable/partition_tree/dict.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/mito2/src/memtable/partition_tree/dict.rs b/src/mito2/src/memtable/partition_tree/dict.rs index 6fc0f170cf76..a7758876e160 100644 --- a/src/mito2/src/memtable/partition_tree/dict.rs +++ b/src/mito2/src/memtable/partition_tree/dict.rs @@ -131,13 +131,12 @@ impl KeyDictBuilder { // Computes key position and then alter pk index. let mut key_positions = vec![0; self.pk_to_index.len()]; - for (i, (full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index)) + for (i, (_full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index)) .into_iter() .enumerate() { // The position of the i-th key is the old pk index. key_positions[i] = pk_index; - pk_to_index_map.insert(full_pk, i as PkIndex); if let Some(sparse_key) = sparse_key { pk_to_index_map.insert(sparse_key, i as PkIndex); } From c6f86614032892b48c4680314752ef43c49df6ec Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 May 2024 06:55:24 +0000 Subject: [PATCH 4/6] fix: style --- src/mito2/src/memtable/partition_tree.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index d2435e0790c0..ab29139aec14 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -673,6 +673,7 @@ mod tests { }; KeyValues::new(metadata.as_ref(), mutation).unwrap() } + #[test] fn test_write_freeze() { let metadata = metadata_for_metric_engine(); From 34ff396a8a79ee5c5d6eaae07d5c55f1c9b26fde Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 May 2024 11:50:48 +0000 Subject: [PATCH 5/6] chore: add some doc for PkIndexMap --- src/mito2/src/memtable/partition_tree/dict.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/mito2/src/memtable/partition_tree/dict.rs b/src/mito2/src/memtable/partition_tree/dict.rs index a7758876e160..3b0a02026105 100644 --- a/src/mito2/src/memtable/partition_tree/dict.rs +++ b/src/mito2/src/memtable/partition_tree/dict.rs @@ -26,6 +26,8 @@ use crate::metrics::MEMTABLE_DICT_BYTES; /// Maximum keys in a [DictBlock]. const MAX_KEYS_PER_BLOCK: u16 = 256; +/// The key is mcmp-encoded primary keys, while the values are the pk index and +/// optionally sparsely encoded primary keys. type PkIndexMap = BTreeMap, (PkIndex, Option>)>; /// Builder to build a key dictionary. From 81afee63edfdaef6c409cb71f3ff89242a2013c1 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Tue, 14 May 2024 12:41:45 +0000 Subject: [PATCH 6/6] chore: update some doc --- src/mito2/src/memtable/partition_tree.rs | 2 +- src/mito2/src/memtable/partition_tree/dict.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs index ab29139aec14..7db9a4877b81 100644 --- a/src/mito2/src/memtable/partition_tree.rs +++ b/src/mito2/src/memtable/partition_tree.rs @@ -587,7 +587,7 @@ mod tests { }) .push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new( - "__ts_id", + "__tsid", ConcreteDataType::uint64_datatype(), false, ), diff --git a/src/mito2/src/memtable/partition_tree/dict.rs b/src/mito2/src/memtable/partition_tree/dict.rs index 3b0a02026105..52217dc94bc2 100644 --- a/src/mito2/src/memtable/partition_tree/dict.rs +++ b/src/mito2/src/memtable/partition_tree/dict.rs @@ -120,7 +120,7 @@ impl KeyDictBuilder { .sum::() } - /// Finishes the builder. + /// Finishes the builder. The key of the second BTreeMap is sparse-encoded bytes. pub fn finish(&mut self) -> Option<(KeyDict, BTreeMap, PkIndex>)> { if self.key_buffer.is_empty() { return None;