diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 33e72e1b7011..892ee4fb9274 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -29,7 +29,7 @@ use datatypes::vectors::VectorRef; use moka::sync::Cache; use parquet::column::page::Page; use parquet::file::metadata::ParquetMetaData; -use store_api::storage::RegionId; +use store_api::storage::{ConcreteDataType, RegionId}; use crate::cache::cache_size::parquet_meta_size; use crate::cache::file_cache::{FileType, IndexKey}; @@ -123,16 +123,21 @@ impl CacheManager { } /// Gets a vector with repeated value for specific `key`. - pub fn get_repeated_vector(&self, key: &Value) -> Option { + pub fn get_repeated_vector( + &self, + data_type: &ConcreteDataType, + value: &Value, + ) -> Option { self.vector_cache.as_ref().and_then(|vector_cache| { - let value = vector_cache.get(key); + let value = vector_cache.get(&(data_type.clone(), value.clone())); update_hit_miss(value, VECTOR_TYPE) }) } /// Puts a vector with repeated value into the cache. - pub fn put_repeated_vector(&self, key: Value, vector: VectorRef) { + pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) { if let Some(cache) = &self.vector_cache { + let key = (vector.data_type(), value); CACHE_BYTES .with_label_values(&[VECTOR_TYPE]) .add(vector_cache_weight(&key, &vector).into()); @@ -249,9 +254,9 @@ fn meta_cache_weight(k: &SstMetaKey, v: &Arc) -> u32 { (k.estimated_size() + parquet_meta_size(v)) as u32 } -fn vector_cache_weight(_k: &Value, v: &VectorRef) -> u32 { +fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 { // We ignore the heap size of `Value`. - (mem::size_of::() + v.memory_size()) as u32 + (mem::size_of::() + mem::size_of::() + v.memory_size()) as u32 } fn page_cache_weight(k: &PageKey, v: &Arc) -> u32 { @@ -323,7 +328,7 @@ type SstMetaCache = Cache>; /// Maps [Value] to a vector that holds this value repeatedly. /// /// e.g. `"hello" => ["hello", "hello", "hello"]` -type VectorCache = Cache; +type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>; /// Maps (region, file, row group, column) to [PageValue]. type PageCache = Cache>; @@ -353,7 +358,9 @@ mod tests { let value = Value::Int64(10); let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10])); cache.put_repeated_vector(value.clone(), vector.clone()); - assert!(cache.get_repeated_vector(&value).is_none()); + assert!(cache + .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value) + .is_none()); let key = PageKey { region_id, @@ -394,10 +401,14 @@ mod tests { fn test_repeated_vector_cache() { let cache = CacheManager::builder().vector_cache_size(4096).build(); let value = Value::Int64(10); - assert!(cache.get_repeated_vector(&value).is_none()); + assert!(cache + .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value) + .is_none()); let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10])); cache.put_repeated_vector(value.clone(), vector.clone()); - let cached = cache.get_repeated_vector(&value).unwrap(); + let cached = cache + .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value) + .unwrap(); assert_eq!(vector, cached); } diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 64c13778e3d4..a0f6b6df441b 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -17,13 +17,15 @@ use std::collections::HashMap; use api::v1::value::ValueData; -use api::v1::Rows; +use api::v1::{Rows, SemanticType}; use common_base::readable_size::ReadableSize; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_recordbatch::RecordBatches; use datatypes::prelude::ConcreteDataType; -use store_api::region_request::{RegionOpenRequest, RegionPutRequest}; +use datatypes::schema::ColumnSchema; +use store_api::metadata::ColumnMetadata; +use store_api::region_request::{RegionCreateRequest, RegionOpenRequest, RegionPutRequest}; use store_api::storage::RegionId; use super::*; @@ -598,3 +600,102 @@ async fn test_engine_with_write_cache() { +-------+---------+---------------------+"; assert_eq!(expected, batches.pretty_print().unwrap()); } + +#[tokio::test] +async fn test_cache_null_primary_key() { + let mut env = TestEnv::new(); + let engine = env + .create_engine(MitoConfig { + vector_cache_size: ReadableSize::mb(32), + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + let column_metadatas = vec![ + ColumnMetadata { + column_schema: ColumnSchema::new("tag_0", ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 1, + }, + ColumnMetadata { + column_schema: ColumnSchema::new("tag_1", ConcreteDataType::int64_datatype(), true), + semantic_type: SemanticType::Tag, + column_id: 2, + }, + ColumnMetadata { + column_schema: ColumnSchema::new("field_0", ConcreteDataType::float64_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 3, + }, + ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 4, + }, + ]; + let request = RegionCreateRequest { + engine: MITO_ENGINE_NAME.to_string(), + column_metadatas, + primary_key: vec![1, 2], + options: HashMap::new(), + region_dir: "test".to_string(), + }; + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas, + rows: vec![ + api::v1::Row { + values: vec![ + api::v1::Value { + value_data: Some(ValueData::StringValue("1".to_string())), + }, + api::v1::Value { value_data: None }, + api::v1::Value { + value_data: Some(ValueData::F64Value(10.0)), + }, + api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(1000)), + }, + ], + }, + api::v1::Row { + values: vec![ + api::v1::Value { value_data: None }, + api::v1::Value { + value_data: Some(ValueData::I64Value(200)), + }, + api::v1::Value { + value_data: Some(ValueData::F64Value(20.0)), + }, + api::v1::Value { + value_data: Some(ValueData::TimestampMillisecondValue(2000)), + }, + ], + }, + ], + }; + put_rows(&engine, region_id, rows).await; + + let request = ScanRequest::default(); + let stream = engine.handle_query(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+-------+---------+---------------------+ +| tag_0 | tag_1 | field_0 | ts | ++-------+-------+---------+---------------------+ +| | 200 | 20.0 | 1970-01-01T00:00:02 | +| 1 | | 10.0 | 1970-01-01T00:00:01 | ++-------+-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index a6fab7267103..375248d4186c 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -241,7 +241,7 @@ fn repeated_vector_with_cache( num_rows: usize, cache_manager: &CacheManager, ) -> common_recordbatch::error::Result { - if let Some(vector) = cache_manager.get_repeated_vector(value) { + if let Some(vector) = cache_manager.get_repeated_vector(data_type, value) { // Tries to get the vector from cache manager. If the vector doesn't // have enough length, creates a new one. match vector.len().cmp(&num_rows) { @@ -366,9 +366,15 @@ mod tests { +---------------------+----+----+----+----+"; assert_eq!(expect, print_record_batch(record_batch)); - assert!(cache.get_repeated_vector(&Value::Int64(1)).is_some()); - assert!(cache.get_repeated_vector(&Value::Int64(2)).is_some()); - assert!(cache.get_repeated_vector(&Value::Int64(3)).is_none()); + assert!(cache + .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1)) + .is_some()); + assert!(cache + .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2)) + .is_some()); + assert!(cache + .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3)) + .is_none()); let record_batch = mapper.convert(&batch, Some(&cache)).unwrap(); assert_eq!(expect, print_record_batch(record_batch)); } diff --git a/tests/cases/standalone/common/insert/nullable_tag.result b/tests/cases/standalone/common/insert/nullable_tag.result new file mode 100644 index 000000000000..8605b8f577c5 --- /dev/null +++ b/tests/cases/standalone/common/insert/nullable_tag.result @@ -0,0 +1,54 @@ +CREATE TABLE `esT`( + `eT` TIMESTAMP(3) TIME INDEX, + `eAque` BOOLEAN, + `DoLOruM` INT, + `repudiAndae` STRING, + `ULLaM` BOOLEAN, + `COnSECTeTuR` SMALLINT DEFAULT -31852, + `DOLOrIBUS` FLOAT NOT NULL, + `QUiS` SMALLINT NULL, + `consEquatuR` BOOLEAN NOT NULL, + `vERO` BOOLEAN, + PRIMARY KEY(`repudiAndae`, `ULLaM`, `DoLOruM`) +); + +Affected Rows: 0 + +INSERT INTO `esT` ( + `consEquatuR`, + `eAque`, + `eT`, + `repudiAndae`, + `DOLOrIBUS` +) +VALUES +( + false, + false, + '+234049-06-04 01:11:41.163+0000', + 'hello', + 0.97377783 +), +( + false, + true, + '-19578-12-20 11:45:59.875+0000', + NULL, + 0.3535998 +); + +Affected Rows: 2 + +SELECT * FROM `esT` order by `eT` desc; + ++----------------------------+-------+---------+-------------+-------+-------------+------------+------+-------------+------+ +| eT | eAque | DoLOruM | repudiAndae | ULLaM | COnSECTeTuR | DOLOrIBUS | QUiS | consEquatuR | vERO | ++----------------------------+-------+---------+-------------+-------+-------------+------------+------+-------------+------+ +| +234049-06-04T01:11:41.163 | false | | hello | | -31852 | 0.97377783 | | false | | +| -19578-12-20T11:45:59.875 | true | | | | -31852 | 0.3535998 | | false | | ++----------------------------+-------+---------+-------------+-------+-------------+------------+------+-------------+------+ + +DROP TABLE `esT`; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/insert/nullable_tag.sql b/tests/cases/standalone/common/insert/nullable_tag.sql new file mode 100644 index 000000000000..e73973a430e0 --- /dev/null +++ b/tests/cases/standalone/common/insert/nullable_tag.sql @@ -0,0 +1,40 @@ +CREATE TABLE `esT`( + `eT` TIMESTAMP(3) TIME INDEX, + `eAque` BOOLEAN, + `DoLOruM` INT, + `repudiAndae` STRING, + `ULLaM` BOOLEAN, + `COnSECTeTuR` SMALLINT DEFAULT -31852, + `DOLOrIBUS` FLOAT NOT NULL, + `QUiS` SMALLINT NULL, + `consEquatuR` BOOLEAN NOT NULL, + `vERO` BOOLEAN, + PRIMARY KEY(`repudiAndae`, `ULLaM`, `DoLOruM`) +); + +INSERT INTO `esT` ( + `consEquatuR`, + `eAque`, + `eT`, + `repudiAndae`, + `DOLOrIBUS` +) +VALUES +( + false, + false, + '+234049-06-04 01:11:41.163+0000', + 'hello', + 0.97377783 +), +( + false, + true, + '-19578-12-20 11:45:59.875+0000', + NULL, + 0.3535998 +); + +SELECT * FROM `esT` order by `eT` desc; + +DROP TABLE `esT`;