fix(metric engine): label mismatch in metric engine (#3927)
* fix: label mismatch

* test: add unit test

* chore: avoid updating full primary keys

* fix: style

* chore: add some doc for PkIndexMap

* chore: update some doc
v0y4g3r authored May 14, 2024
1 parent 4ca7ac7 commit c04d024
Showing 6 changed files with 245 additions and 47 deletions.
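At its core, the change is to the key dictionary in partition_tree/dict.rs: each entry now records, alongside the pk index, the optional sparse-encoded primary key used by the metric engine, and finish() returns a lookup map keyed by those sparse bytes instead of filling a caller-supplied map keyed by the full primary key. A minimal sketch of the new map shape, with a u16 index type assumed for illustration (the real PkIndex and PkIndexMap are defined in dict.rs below):

use std::collections::BTreeMap;

// Illustrative stand-ins only; see the PkIndexMap definition in the diff below.
type PkIndex = u16;
type PkIndexMap = BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>;

fn main() {
    let mut map: PkIndexMap = BTreeMap::new();
    // A metric-engine region carries both encodings of the primary key;
    // a plain mito region has only the full key and stores None.
    map.insert(
        b"full-mcmp-encoded-key".to_vec(),
        (0, Some(b"sparse-encoded-key".to_vec())),
    );
    map.insert(b"another-full-key".to_vec(), (1, None));
    assert_eq!(map.len(), 2);
}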
174 changes: 173 additions & 1 deletion src/mito2/src/memtable/partition_tree.rs
@@ -311,14 +311,24 @@ impl MemtableBuilder for PartitionTreeMemtableBuilder {

#[cfg(test)]
mod tests {
use api::v1::value::ValueData;
use api::v1::{Row, Rows, SemanticType};
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::data_type::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use datatypes::vectors::Int64Vector;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;

use super::*;
use crate::test_util::memtable_util::{self, collect_iter_timestamps};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::test_util::memtable_util::{
self, collect_iter_timestamps, region_metadata_to_row_schema,
};

#[test]
fn test_memtable_sorted_input() {
@@ -562,4 +572,166 @@ mod tests {
assert!(config.dedup);
assert_eq!(PartitionTreeConfig::default(), config);
}

fn metadata_for_metric_engine() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"__table_id",
ConcreteDataType::uint32_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 2147483652,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"__tsid",
ConcreteDataType::uint64_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 2147483651,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"test_label",
ConcreteDataType::string_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_timestamp",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"greptime_value",
ConcreteDataType::float64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 1,
})
.primary_key(vec![2147483652, 2147483651, 2]);
let region_metadata = builder.build().unwrap();
Arc::new(region_metadata)
}

fn build_key_values(
metadata: RegionMetadataRef,
labels: &[&str],
table_id: &[u32],
ts_id: &[u64],
ts: &[i64],
values: &[f64],
sequence: u64,
) -> KeyValues {
let column_schema = region_metadata_to_row_schema(&metadata);

let rows = ts
.iter()
.zip(table_id.iter())
.zip(ts_id.iter())
.zip(labels.iter())
.zip(values.iter())
.map(|((((ts, table_id), ts_id), label), val)| Row {
values: vec![
api::v1::Value {
value_data: Some(ValueData::U32Value(*table_id)),
},
api::v1::Value {
value_data: Some(ValueData::U64Value(*ts_id)),
},
api::v1::Value {
value_data: Some(ValueData::StringValue(label.to_string())),
},
api::v1::Value {
value_data: Some(ValueData::TimestampMillisecondValue(*ts)),
},
api::v1::Value {
value_data: Some(ValueData::F64Value(*val)),
},
],
})
.collect();
let mutation = api::v1::Mutation {
op_type: 1,
sequence,
rows: Some(Rows {
schema: column_schema,
rows,
}),
};
KeyValues::new(metadata.as_ref(), mutation).unwrap()
}

#[test]
fn test_write_freeze() {
let metadata = metadata_for_metric_engine();
let memtable = PartitionTreeMemtableBuilder::new(
PartitionTreeConfig {
index_max_keys_per_shard: 40,
..Default::default()
},
None,
)
.build(1, &metadata);

let codec = McmpRowCodec::new(
metadata
.primary_key_columns()
.map(|c| SortField::new(c.column_schema.data_type.clone()))
.collect(),
);

memtable
.write(&build_key_values(
metadata.clone(),
&["daily", "10min", "daily", "10min"],
&[1025, 1025, 1025, 1025],
&[
16442255374049317291,
5686004715529701024,
16442255374049317291,
5686004715529701024,
],
&[1712070000000, 1712717731000, 1712761200000, 1712761200000],
&[0.0, 0.0, 0.0, 0.0],
1,
))
.unwrap();

memtable.freeze().unwrap();
let new_memtable = memtable.fork(2, &metadata);

new_memtable
.write(&build_key_values(
metadata.clone(),
&["10min"],
&[1025],
&[5686004715529701024],
&[1714643131000],
&[0.1],
2,
))
.unwrap();

let mut reader = new_memtable.iter(None, None).unwrap();
let batch = reader.next().unwrap().unwrap();
let pk = codec.decode(batch.primary_key()).unwrap();
if let Value::String(s) = &pk[2] {
assert_eq!("10min", s.as_utf8());
} else {
unreachable!()
}
}
}
76 changes: 49 additions & 27 deletions src/mito2/src/memtable/partition_tree/dict.rs
@@ -26,7 +26,9 @@ use crate::metrics::MEMTABLE_DICT_BYTES;
/// Maximum keys in a [DictBlock].
const MAX_KEYS_PER_BLOCK: u16 = 256;

type PkIndexMap = BTreeMap<Vec<u8>, PkIndex>;
/// The key is mcmp-encoded primary keys, while the values are the pk index and
/// optionally sparsely encoded primary keys.
type PkIndexMap = BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>;

/// Builder to build a key dictionary.
pub struct KeyDictBuilder {
@@ -66,10 +68,15 @@ impl KeyDictBuilder {
///
/// # Panics
/// Panics if the builder is full.
pub fn insert_key(&mut self, key: &[u8], metrics: &mut WriteMetrics) -> PkIndex {
pub fn insert_key(
&mut self,
full_primary_key: &[u8],
sparse_key: Option<&[u8]>,
metrics: &mut WriteMetrics,
) -> PkIndex {
assert!(!self.is_full());

if let Some(pk_index) = self.pk_to_index.get(key).copied() {
if let Some(pk_index) = self.pk_to_index.get(full_primary_key).map(|v| v.0) {
// Already in the builder.
return pk_index;
}
@@ -81,16 +88,22 @@ impl KeyDictBuilder {
}

// Safety: we have checked the buffer length.
let pk_index = self.key_buffer.push_key(key);
self.pk_to_index.insert(key.to_vec(), pk_index);
let pk_index = self.key_buffer.push_key(full_primary_key);
let (sparse_key, sparse_key_len) = if let Some(sparse_key) = sparse_key {
(Some(sparse_key.to_vec()), sparse_key.len())
} else {
(None, 0)
};
self.pk_to_index
.insert(full_primary_key.to_vec(), (pk_index, sparse_key));
self.num_keys += 1;

// Since we store the key twice, the bytes usage is doubled.
metrics.key_bytes += key.len() * 2;
self.key_bytes_in_index += key.len();
metrics.key_bytes += full_primary_key.len() * 2 + sparse_key_len;
self.key_bytes_in_index += full_primary_key.len();

// Adds key size of index to the metrics.
MEMTABLE_DICT_BYTES.add(key.len() as i64);
MEMTABLE_DICT_BYTES.add((full_primary_key.len() + sparse_key_len) as i64);

pk_index
}
@@ -107,38 +120,47 @@ impl KeyDictBuilder {
.sum::<usize>()
}

/// Finishes the builder.
pub fn finish(&mut self, pk_to_index: &mut BTreeMap<Vec<u8>, PkIndex>) -> Option<KeyDict> {
/// Finishes the builder. The key of the second BTreeMap is sparse-encoded bytes.
pub fn finish(&mut self) -> Option<(KeyDict, BTreeMap<Vec<u8>, PkIndex>)> {
if self.key_buffer.is_empty() {
return None;
}
let mut pk_to_index_map = BTreeMap::new();

// Finishes current dict block and resets the pk index.
let dict_block = self.key_buffer.finish(true);
self.dict_blocks.push(dict_block);
// Computes key position and then alter pk index.
let mut key_positions = vec![0; self.pk_to_index.len()];
for (i, pk_index) in self.pk_to_index.values_mut().enumerate() {

for (i, (_full_pk, (pk_index, sparse_key))) in (std::mem::take(&mut self.pk_to_index))
.into_iter()
.enumerate()
{
// The position of the i-th key is the old pk index.
key_positions[i] = *pk_index;
// Overwrites the pk index.
*pk_index = i as PkIndex;
key_positions[i] = pk_index;
if let Some(sparse_key) = sparse_key {
pk_to_index_map.insert(sparse_key, i as PkIndex);
}
}

self.num_keys = 0;
let key_bytes_in_index = self.key_bytes_in_index;
self.key_bytes_in_index = 0;
*pk_to_index = std::mem::take(&mut self.pk_to_index);

Some(KeyDict {
dict_blocks: std::mem::take(&mut self.dict_blocks),
key_positions,
key_bytes_in_index,
})
Some((
KeyDict {
dict_blocks: std::mem::take(&mut self.dict_blocks),
key_positions,
key_bytes_in_index,
},
pk_to_index_map,
))
}

/// Reads the builder.
pub fn read(&self) -> DictBuilderReader {
let sorted_pk_indices = self.pk_to_index.values().copied().collect();
let sorted_pk_indices = self.pk_to_index.values().map(|v| v.0).collect();
let block = self.key_buffer.finish_cloned();
let mut blocks = Vec::with_capacity(self.dict_blocks.len() + 1);
blocks.extend_from_slice(&self.dict_blocks);
@@ -394,7 +416,7 @@ mod tests {
let mut metrics = WriteMetrics::default();
for key in &keys {
assert!(!builder.is_full());
let pk_index = builder.insert_key(key, &mut metrics);
let pk_index = builder.insert_key(key, None, &mut metrics);
last_pk_index = Some(pk_index);
}
assert_eq!(num_keys - 1, last_pk_index.unwrap());
@@ -426,14 +448,14 @@ mod tests {
for i in 0..num_keys {
// Each key is 5 bytes.
let key = format!("{i:05}");
builder.insert_key(key.as_bytes(), &mut metrics);
builder.insert_key(key.as_bytes(), None, &mut metrics);
}
let key_bytes = num_keys as usize * 5;
assert_eq!(key_bytes * 2, metrics.key_bytes);
assert_eq!(key_bytes, builder.key_bytes_in_index);
assert_eq!(8850, builder.memory_size());

let dict = builder.finish(&mut BTreeMap::new()).unwrap();
let (dict, _) = builder.finish().unwrap();
assert_eq!(0, builder.key_bytes_in_index);
assert_eq!(key_bytes, dict.key_bytes_in_index);
assert!(dict.shared_memory_size() > key_bytes);
@@ -446,12 +468,12 @@ mod tests {
for i in 0..MAX_KEYS_PER_BLOCK * 2 {
let key = format!("{i:010}");
assert!(!builder.is_full());
builder.insert_key(key.as_bytes(), &mut metrics);
builder.insert_key(key.as_bytes(), None, &mut metrics);
}
assert!(builder.is_full());
builder.finish(&mut BTreeMap::new());
builder.finish();

assert!(!builder.is_full());
assert_eq!(0, builder.insert_key(b"a0", &mut metrics));
assert_eq!(0, builder.insert_key(b"a0", None, &mut metrics));
}
}
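To make the updated KeyDictBuilder contract concrete, here is a small self-contained toy model — not the crate's code; the struct name and the u16 index type are assumptions — showing how sparse keys supplied to insert_key come back out of finish keyed by the sparse bytes, with pk indices re-assigned in full-key sort order. That returned map is what partition.rs keeps in pk_to_pk_id, keyed by sparse key, for metric-engine regions (see the next file):

use std::collections::BTreeMap;

type PkIndex = u16;

// Toy model of the new insert_key/finish contract (hypothetical, simplified).
struct ToyDictBuilder {
    pk_to_index: BTreeMap<Vec<u8>, (PkIndex, Option<Vec<u8>>)>,
}

impl ToyDictBuilder {
    fn new() -> Self {
        Self { pk_to_index: BTreeMap::new() }
    }

    // Deduplicates on the full key; remembers the sparse key if one is given.
    fn insert_key(&mut self, full_key: &[u8], sparse_key: Option<&[u8]>) -> PkIndex {
        if let Some((idx, _)) = self.pk_to_index.get(full_key) {
            return *idx;
        }
        let idx = self.pk_to_index.len() as PkIndex;
        self.pk_to_index
            .insert(full_key.to_vec(), (idx, sparse_key.map(|k| k.to_vec())));
        idx
    }

    // Mirrors finish(): indices are re-assigned in full-key sort order and
    // reported back under the sparse keys, so later lookups by sparse key match.
    fn finish(self) -> BTreeMap<Vec<u8>, PkIndex> {
        let mut sparse_to_index = BTreeMap::new();
        for (i, (_full_key, (_old_idx, sparse))) in self.pk_to_index.into_iter().enumerate() {
            if let Some(sparse) = sparse {
                sparse_to_index.insert(sparse, i as PkIndex);
            }
        }
        sparse_to_index
    }
}

fn main() {
    let mut builder = ToyDictBuilder::new();
    builder.insert_key(b"table|tsid|daily", Some(b"sparse-daily".as_slice()));
    builder.insert_key(b"table|tsid|10min", Some(b"sparse-10min".as_slice()));
    let sparse_map = builder.finish();
    // "10min" sorts before "daily" on the full key, so it ends up with index 0.
    assert_eq!(sparse_map[b"sparse-10min".as_slice()], 0);
    assert_eq!(sparse_map[b"sparse-daily".as_slice()], 1);
}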
11 changes: 7 additions & 4 deletions src/mito2/src/memtable/partition_tree/partition.rs
@@ -89,15 +89,18 @@ impl Partition {
let sparse_key = primary_key.clone();
primary_key.clear();
row_codec.encode_to_vec(key_value.primary_keys(), primary_key)?;
let pk_id = inner
.shard_builder
.write_with_key(primary_key, &key_value, metrics);
let pk_id = inner.shard_builder.write_with_key(
primary_key,
Some(&sparse_key),
&key_value,
metrics,
);
inner.pk_to_pk_id.insert(sparse_key, pk_id);
} else {
// `primary_key` is already the full primary key.
let pk_id = inner
.shard_builder
.write_with_key(primary_key, &key_value, metrics);
.write_with_key(primary_key, None, &key_value, metrics);
inner.pk_to_pk_id.insert(std::mem::take(primary_key), pk_id);
};

5 changes: 2 additions & 3 deletions src/mito2/src/memtable/partition_tree/shard.rs
@@ -423,7 +423,6 @@ impl Node for ShardNode {

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::sync::Arc;

use super::*;
@@ -488,10 +487,10 @@ mod tests {
encode_keys(&metadata, kvs, &mut keys);
}
for key in &keys {
dict_builder.insert_key(key, &mut metrics);
dict_builder.insert_key(key, None, &mut metrics);
}

let dict = dict_builder.finish(&mut BTreeMap::new()).unwrap();
let (dict, _) = dict_builder.finish().unwrap();
let data_parts = DataParts::new(metadata, DATA_INIT_CAP, true);

Shard::new(