Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add data type to vector cache key #3876

Merged
merged 4 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datatypes::vectors::VectorRef;
use moka::sync::Cache;
use parquet::column::page::Page;
use parquet::file::metadata::ParquetMetaData;
use store_api::storage::RegionId;
use store_api::storage::{ConcreteDataType, RegionId};

use crate::cache::cache_size::parquet_meta_size;
use crate::cache::file_cache::{FileType, IndexKey};
Expand Down Expand Up @@ -123,16 +123,21 @@ impl CacheManager {
}

/// Gets a vector with repeated value for specific `key`.
pub fn get_repeated_vector(&self, key: &Value) -> Option<VectorRef> {
pub fn get_repeated_vector(
&self,
data_type: &ConcreteDataType,
value: &Value,
) -> Option<VectorRef> {
self.vector_cache.as_ref().and_then(|vector_cache| {
let value = vector_cache.get(key);
let value = vector_cache.get(&(data_type.clone(), value.clone()));
update_hit_miss(value, VECTOR_TYPE)
})
}

/// Puts a vector with repeated value into the cache.
pub fn put_repeated_vector(&self, key: Value, vector: VectorRef) {
pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
if let Some(cache) = &self.vector_cache {
let key = (vector.data_type(), value);
CACHE_BYTES
.with_label_values(&[VECTOR_TYPE])
.add(vector_cache_weight(&key, &vector).into());
Expand Down Expand Up @@ -249,9 +254,9 @@ fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
(k.estimated_size() + parquet_meta_size(v)) as u32
}

fn vector_cache_weight(_k: &Value, v: &VectorRef) -> u32 {
fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
// We ignore the heap size of `Value`.
(mem::size_of::<Value>() + v.memory_size()) as u32
(mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
}

fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
Expand Down Expand Up @@ -323,7 +328,7 @@ type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps [Value] to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<Value, VectorRef>;
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps (region, file, row group, column) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;

Expand Down Expand Up @@ -353,7 +358,9 @@ mod tests {
let value = Value::Int64(10);
let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
cache.put_repeated_vector(value.clone(), vector.clone());
assert!(cache.get_repeated_vector(&value).is_none());
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
.is_none());

let key = PageKey {
region_id,
Expand Down Expand Up @@ -394,10 +401,14 @@ mod tests {
fn test_repeated_vector_cache() {
let cache = CacheManager::builder().vector_cache_size(4096).build();
let value = Value::Int64(10);
assert!(cache.get_repeated_vector(&value).is_none());
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
.is_none());
let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
cache.put_repeated_vector(value.clone(), vector.clone());
let cached = cache.get_repeated_vector(&value).unwrap();
let cached = cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
.unwrap();
assert_eq!(vector, cached);
}

Expand Down
105 changes: 103 additions & 2 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
use std::collections::HashMap;

use api::v1::value::ValueData;
use api::v1::Rows;
use api::v1::{Rows, SemanticType};
use common_base::readable_size::ReadableSize;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use store_api::region_request::{RegionOpenRequest, RegionPutRequest};
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{RegionCreateRequest, RegionOpenRequest, RegionPutRequest};
use store_api::storage::RegionId;

use super::*;
Expand Down Expand Up @@ -598,3 +600,102 @@ async fn test_engine_with_write_cache() {
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

/// Writes rows with null primary-key columns into a region whose engine has
/// the vector cache enabled, then checks the result of a full scan.
///
/// The region has two nullable tags of different types (`tag_0` is a string,
/// `tag_1` is an int64). Each inserted row leaves exactly one of them unset,
/// so the scan has to produce a null for a string column and a null for an
/// int64 column.
///
/// NOTE(review): presumably this guards against a repeated vector cached for
/// a null of one data type being reused for a column of another type —
/// confirm against the vector cache key definition.
#[tokio::test]
async fn test_cache_null_primary_key() {
    let mut env = TestEnv::new();
    // Enable the vector cache explicitly; every other option keeps its default.
    let engine = env
        .create_engine(MitoConfig {
            vector_cache_size: ReadableSize::mb(32),
            ..Default::default()
        })
        .await;

    let region_id = RegionId::new(1, 1);
    // Two nullable tags with different data types, one nullable field and a
    // non-nullable timestamp column.
    let column_metadatas = vec![
        ColumnMetadata {
            column_schema: ColumnSchema::new("tag_0", ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Tag,
            column_id: 1,
        },
        ColumnMetadata {
            column_schema: ColumnSchema::new("tag_1", ConcreteDataType::int64_datatype(), true),
            semantic_type: SemanticType::Tag,
            column_id: 2,
        },
        ColumnMetadata {
            column_schema: ColumnSchema::new("field_0", ConcreteDataType::float64_datatype(), true),
            semantic_type: SemanticType::Field,
            column_id: 3,
        },
        ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 4,
        },
    ];
    let request = RegionCreateRequest {
        engine: MITO_ENGINE_NAME.to_string(),
        column_metadatas,
        // Both tag columns (ids 1 and 2) form the primary key.
        primary_key: vec![1, 2],
        options: HashMap::new(),
        region_dir: "test".to_string(),
    };

    // Derive the write schema before the request is moved into the engine.
    let column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    let rows = Rows {
        schema: column_schemas,
        rows: vec![
            // tag_0 = "1", tag_1 unset.
            api::v1::Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(ValueData::StringValue("1".to_string())),
                    },
                    api::v1::Value { value_data: None },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(10.0)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(1000)),
                    },
                ],
            },
            // tag_0 unset, tag_1 = 200.
            api::v1::Row {
                values: vec![
                    api::v1::Value { value_data: None },
                    api::v1::Value {
                        value_data: Some(ValueData::I64Value(200)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::F64Value(20.0)),
                    },
                    api::v1::Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(2000)),
                    },
                ],
            },
        ],
    };
    put_rows(&engine, region_id, rows).await;

    let request = ScanRequest::default();
    let stream = engine.handle_query(region_id, request).await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    // The table lines must stay at column 0: after the leading `\` line
    // continuation, any indentation would become part of the string literal.
    let expected = "\
+-------+-------+---------+---------------------+
| tag_0 | tag_1 | field_0 | ts                  |
+-------+-------+---------+---------------------+
|       | 200   | 20.0    | 1970-01-01T00:00:02 |
| 1     |       | 10.0    | 1970-01-01T00:00:01 |
+-------+-------+---------+---------------------+";
    assert_eq!(expected, batches.pretty_print().unwrap());
}
14 changes: 10 additions & 4 deletions src/mito2/src/read/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ fn repeated_vector_with_cache(
num_rows: usize,
cache_manager: &CacheManager,
) -> common_recordbatch::error::Result<VectorRef> {
if let Some(vector) = cache_manager.get_repeated_vector(value) {
if let Some(vector) = cache_manager.get_repeated_vector(data_type, value) {
// Tries to get the vector from cache manager. If the vector doesn't
// have enough length, creates a new one.
match vector.len().cmp(&num_rows) {
Expand Down Expand Up @@ -366,9 +366,15 @@ mod tests {
+---------------------+----+----+----+----+";
assert_eq!(expect, print_record_batch(record_batch));

assert!(cache.get_repeated_vector(&Value::Int64(1)).is_some());
assert!(cache.get_repeated_vector(&Value::Int64(2)).is_some());
assert!(cache.get_repeated_vector(&Value::Int64(3)).is_none());
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1))
.is_some());
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2))
.is_some());
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
.is_none());
let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
assert_eq!(expect, print_record_batch(record_batch));
}
Expand Down
54 changes: 54 additions & 0 deletions tests/cases/standalone/common/insert/nullable_tag.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
CREATE TABLE `esT`(
`eT` TIMESTAMP(3) TIME INDEX,
`eAque` BOOLEAN,
`DoLOruM` INT,
`repudiAndae` STRING,
`ULLaM` BOOLEAN,
`COnSECTeTuR` SMALLINT DEFAULT -31852,
`DOLOrIBUS` FLOAT NOT NULL,
`QUiS` SMALLINT NULL,
`consEquatuR` BOOLEAN NOT NULL,
`vERO` BOOLEAN,
PRIMARY KEY(`repudiAndae`, `ULLaM`, `DoLOruM`)
);

Affected Rows: 0

INSERT INTO `esT` (
`consEquatuR`,
`eAque`,
`eT`,
`repudiAndae`,
`DOLOrIBUS`
)
VALUES
(
false,
false,
'+234049-06-04 01:11:41.163+0000',
'hello',
0.97377783
),
(
false,
true,
'-19578-12-20 11:45:59.875+0000',
NULL,
0.3535998
);

Affected Rows: 2

SELECT * FROM `esT` order by `eT` desc;

+----------------------------+-------+---------+-------------+-------+-------------+------------+------+-------------+------+
| eT                         | eAque | DoLOruM | repudiAndae | ULLaM | COnSECTeTuR | DOLOrIBUS  | QUiS | consEquatuR | vERO |
+----------------------------+-------+---------+-------------+-------+-------------+------------+------+-------------+------+
| +234049-06-04T01:11:41.163 | false |         | hello       |       | -31852      | 0.97377783 |      | false       |      |
| -19578-12-20T11:45:59.875  | true  |         |             |       | -31852      | 0.3535998  |      | false       |      |
+----------------------------+-------+---------+-------------+-------+-------------+------------+------+-------------+------+

DROP TABLE `esT`;

Affected Rows: 0

40 changes: 40 additions & 0 deletions tests/cases/standalone/common/insert/nullable_tag.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
CREATE TABLE `esT`(
`eT` TIMESTAMP(3) TIME INDEX,
`eAque` BOOLEAN,
`DoLOruM` INT,
`repudiAndae` STRING,
`ULLaM` BOOLEAN,
`COnSECTeTuR` SMALLINT DEFAULT -31852,
`DOLOrIBUS` FLOAT NOT NULL,
`QUiS` SMALLINT NULL,
`consEquatuR` BOOLEAN NOT NULL,
`vERO` BOOLEAN,
PRIMARY KEY(`repudiAndae`, `ULLaM`, `DoLOruM`)
);

INSERT INTO `esT` (
`consEquatuR`,
`eAque`,
`eT`,
`repudiAndae`,
`DOLOrIBUS`
)
VALUES
(
false,
false,
'+234049-06-04 01:11:41.163+0000',
'hello',
0.97377783
),
(
false,
true,
'-19578-12-20 11:45:59.875+0000',
NULL,
0.3535998
);

SELECT * FROM `esT` order by `eT` desc;

DROP TABLE `esT`;
Loading