diff --git a/Cargo.lock b/Cargo.lock index a48b6afee2ad..6fd70f2b719b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6274,6 +6274,7 @@ dependencies = [ "common-datasource", "common-decimal", "common-error", + "common-function", "common-macro", "common-procedure-test", "common-query", @@ -6315,6 +6316,7 @@ dependencies = [ "serde", "serde_json", "serde_with", + "session", "smallvec", "snafu 0.8.3", "store-api", diff --git a/src/index/src/fulltext_index.rs b/src/index/src/fulltext_index.rs index f6dfb4ae83fd..8b0bde3d64e9 100644 --- a/src/index/src/fulltext_index.rs +++ b/src/index/src/fulltext_index.rs @@ -16,6 +16,10 @@ use serde::{Deserialize, Serialize}; pub mod create; pub mod error; +pub mod search; + +#[cfg(test)] +mod tests; /// Configuration for fulltext index. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] diff --git a/src/index/src/fulltext_index/create.rs b/src/index/src/fulltext_index/create.rs index 6b85d6b1630f..43b905968816 100644 --- a/src/index/src/fulltext_index/create.rs +++ b/src/index/src/fulltext_index/create.rs @@ -15,7 +15,7 @@ mod tantivy; use async_trait::async_trait; -pub use tantivy::TantivyFulltextIndexCreator; +pub use tantivy::{TantivyFulltextIndexCreator, TEXT_FIELD_NAME}; use crate::fulltext_index::error::Result; diff --git a/src/index/src/fulltext_index/error.rs b/src/index/src/fulltext_index/error.rs index ddc54d0dae2c..e4ecfc24fc80 100644 --- a/src/index/src/fulltext_index/error.rs +++ b/src/index/src/fulltext_index/error.rs @@ -40,6 +40,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Tantivy parser error"))] + TantivyParser { + #[snafu(source)] + error: tantivy::query::QueryParserError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Operate on a finished creator"))] Finished { #[snafu(implicit)] @@ -60,6 +68,8 @@ impl ErrorExt for Error { match self { Tantivy { .. } => StatusCode::Internal, + TantivyParser { .. } => StatusCode::InvalidSyntax, + Io { .. } | Finished { .. 
} => StatusCode::Unexpected, External { source, .. } => source.status_code(), diff --git a/src/index/src/fulltext_index/search.rs b/src/index/src/fulltext_index/search.rs new file mode 100644 index 000000000000..8f414ad6ddad --- /dev/null +++ b/src/index/src/fulltext_index/search.rs @@ -0,0 +1,30 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod tantivy; + +use std::collections::BTreeSet; + +use async_trait::async_trait; + +use crate::fulltext_index::error::Result; +pub use crate::fulltext_index::search::tantivy::TantivyFulltextIndexSearcher; + +pub type RowId = u32; + +/// `FulltextIndexSearcher` is a trait for searching fulltext index. +#[async_trait] +pub trait FulltextIndexSearcher { + async fn search(&self, query: &str) -> Result>; +} diff --git a/src/index/src/fulltext_index/search/tantivy.rs b/src/index/src/fulltext_index/search/tantivy.rs new file mode 100644 index 000000000000..0927f5dbfdef --- /dev/null +++ b/src/index/src/fulltext_index/search/tantivy.rs @@ -0,0 +1,85 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeSet; +use std::path::Path; +use std::time::Instant; + +use async_trait::async_trait; +use common_telemetry::debug; +use snafu::ResultExt; +use tantivy::collector::DocSetCollector; +use tantivy::query::QueryParser; +use tantivy::schema::Field; +use tantivy::{Index, IndexReader, ReloadPolicy}; + +use crate::fulltext_index::create::TEXT_FIELD_NAME; +use crate::fulltext_index::error::{Result, TantivyParserSnafu, TantivySnafu}; +use crate::fulltext_index::search::{FulltextIndexSearcher, RowId}; + +/// `TantivyFulltextIndexSearcher` is a searcher using Tantivy. +pub struct TantivyFulltextIndexSearcher { + /// Tantivy index. + index: Index, + /// Tantivy index reader. + reader: IndexReader, + /// The default field used to build `QueryParser` + default_field: Field, +} + +impl TantivyFulltextIndexSearcher { + /// Creates a new `TantivyFulltextIndexSearcher`. 
+ pub fn new(path: impl AsRef) -> Result { + let now = Instant::now(); + + let index = Index::open_in_dir(path.as_ref()).context(TantivySnafu)?; + let reader = index + .reader_builder() + .reload_policy(ReloadPolicy::Manual) + .num_warming_threads(0) + .try_into() + .context(TantivySnafu)?; + let default_field = index + .schema() + .get_field(TEXT_FIELD_NAME) + .context(TantivySnafu)?; + + debug!( + "Opened tantivy index on {:?} in {:?}", + path.as_ref(), + now.elapsed() + ); + + Ok(Self { + index, + reader, + default_field, + }) + } +} + +#[async_trait] +impl FulltextIndexSearcher for TantivyFulltextIndexSearcher { + async fn search(&self, query: &str) -> Result> { + let searcher = self.reader.searcher(); + let query_parser = QueryParser::for_index(&self.index, vec![self.default_field]); + let query = query_parser + .parse_query(query) + .context(TantivyParserSnafu)?; + let docs = searcher + .search(&query, &DocSetCollector) + .context(TantivySnafu)?; + Ok(docs.into_iter().map(|d| d.doc_id).collect()) + } +} diff --git a/src/index/src/fulltext_index/tests.rs b/src/index/src/fulltext_index/tests.rs new file mode 100644 index 000000000000..3e7c88c6a25b --- /dev/null +++ b/src/index/src/fulltext_index/tests.rs @@ -0,0 +1,167 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::BTreeSet; + +use common_test_util::temp_dir::{create_temp_dir, TempDir}; + +use crate::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCreator}; +use crate::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher}; +use crate::fulltext_index::{Analyzer, Config}; + +async fn create_index(prefix: &str, texts: Vec<&str>, config: Config) -> TempDir { + let tempdir = create_temp_dir(prefix); + + let mut creator = TantivyFulltextIndexCreator::new(tempdir.path(), config, 1024 * 1024) + .await + .unwrap(); + + for text in texts { + creator.push_text(text).await.unwrap(); + } + + creator.finish().await.unwrap(); + tempdir +} + +async fn test_search( + prefix: &str, + config: Config, + texts: Vec<&str>, + query: &str, + expected: impl IntoIterator, +) { + let index_path = create_index(prefix, texts, config).await; + + let searcher = TantivyFulltextIndexSearcher::new(index_path.path()).unwrap(); + let results = searcher.search(query).await.unwrap(); + + let expected = expected.into_iter().collect::>(); + assert_eq!(results, expected); +} + +#[tokio::test] +async fn test_simple_term() { + test_search( + "test_simple_term_", + Config::default(), + vec![ + "This is a sample text containing Barack Obama", + "Another document mentioning Barack", + ], + "Barack Obama", + [0, 1], + ) + .await; +} + +#[tokio::test] +async fn test_negative_term() { + test_search( + "test_negative_term_", + Config::default(), + vec!["apple is a fruit", "I like apple", "fruit is healthy"], + "apple -fruit", + [1], + ) + .await; +} + +#[tokio::test] +async fn test_must_term() { + test_search( + "test_must_term_", + Config::default(), + vec![ + "apple is tasty", + "I love apples and fruits", + "apple and fruit are good", + ], + "+apple +fruit", + [2], + ) + .await; +} + +#[tokio::test] +async fn test_boolean_operators() { + test_search( + "test_boolean_operators_", + Config::default(), + vec!["a b c", "a b", "b c", "c"], + "a AND b OR 
c", + [0, 1, 2, 3], + ) + .await; +} + +#[tokio::test] +async fn test_phrase_term() { + test_search( + "test_phrase_term_", + Config::default(), + vec![ + "This is a sample text containing Barack Obama", + "Another document mentioning Barack", + ], + "\"Barack Obama\"", + [0], + ) + .await; +} + +#[tokio::test] +async fn test_config_english_analyzer_case_insensitive() { + test_search( + "test_config_english_analyzer_case_insensitive_", + Config { + case_sensitive: false, + ..Config::default() + }, + vec!["Banana is a fruit", "I like apple", "Fruit is healthy"], + "banana", + [0], + ) + .await; +} + +#[tokio::test] +async fn test_config_english_analyzer_case_sensitive() { + test_search( + "test_config_english_analyzer_case_sensitive_", + Config { + case_sensitive: true, + ..Config::default() + }, + vec!["Banana is a fruit", "I like apple", "Fruit is healthy"], + "banana", + [], + ) + .await; +} + +#[tokio::test] +async fn test_config_chinese_analyzer() { + test_search( + "test_config_chinese_analyzer_", + Config { + analyzer: Analyzer::Chinese, + ..Default::default() + }, + vec!["苹果是一种水果", "我喜欢苹果", "水果很健康"], + "苹果", + [0, 1], + ) + .await; +} diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 4676beaefff1..d4646006d6a5 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -73,6 +73,7 @@ tokio-util.workspace = true uuid.workspace = true [dev-dependencies] +common-function.workspace = true common-procedure-test.workspace = true common-test-util.workspace = true criterion = "0.4" @@ -82,6 +83,7 @@ object-store = { workspace = true, features = ["services-memory"] } rskafka.workspace = true rstest.workspace = true rstest_reuse.workspace = true +session.workspace = true toml.workspace = true [[bench]] diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 8cd3da32f740..d92cd1044ded 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -431,6 +431,7 @@ impl EngineInner { ) .with_parallelism(scan_parallelism) 
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled()) + .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled()) .with_start_time(query_start); Ok(scan_region) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index d6fe9f226ae0..bedcfcf927ac 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -556,8 +556,8 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to apply index"))] - ApplyIndex { + #[snafu(display("Failed to apply inverted index"))] + ApplyInvertedIndex { source: index::inverted_index::error::Error, #[snafu(implicit)] location: Location, @@ -821,6 +821,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to apply fulltext index"))] + ApplyFulltextIndex { + source: index::fulltext_index::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -918,7 +925,7 @@ impl ErrorExt for Error { ConvertValue { source, .. } => source.status_code(), BuildIndexApplier { source, .. } | PushIndexValue { source, .. } - | ApplyIndex { source, .. } + | ApplyInvertedIndex { source, .. } | IndexFinish { source, .. } => source.status_code(), PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } @@ -946,7 +953,9 @@ impl ErrorExt for Error { FulltextOptions { source, .. } => source.status_code(), CreateFulltextCreator { source, .. } => source.status_code(), CastVector { source, .. } => source.status_code(), - FulltextPushText { source, .. } | FulltextFinish { source, .. } => source.status_code(), + FulltextPushText { source, .. } + | FulltextFinish { source, .. } + | ApplyFulltextIndex { source, .. } => source.status_code(), } } diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 75a79289cd92..c4cb707adda9 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -194,9 +194,10 @@ lazy_static! { // Index metrics. /// Timer of index application. 
- pub static ref INDEX_APPLY_ELAPSED: Histogram = register_histogram!( + pub static ref INDEX_APPLY_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_index_apply_elapsed", "index apply elapsed", + &[TYPE_LABEL], ) .unwrap(); /// Gauge of index apply memory usage. diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 0fe4c7efa29e..7f68bb00f14c 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -45,8 +45,10 @@ use crate::read::{Batch, Source}; use crate::region::options::MergeMode; use crate::region::version::VersionRef; use crate::sst::file::{overlaps, FileHandle, FileMeta}; -use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; -use crate::sst::index::inverted_index::applier::SstIndexApplierRef; +use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder; +use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; +use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; +use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef; use crate::sst::parquet::file_range::FileRange; /// A scanner scans a region and returns a [SendableRecordBatchStream]. @@ -165,6 +167,8 @@ pub(crate) struct ScanRegion { parallelism: ScanParallism, /// Whether to ignore inverted index. ignore_inverted_index: bool, + /// Whether to ignore fulltext index. + ignore_fulltext_index: bool, /// Start time of the scan task. start_time: Option, } @@ -184,6 +188,7 @@ impl ScanRegion { cache_manager, parallelism: ScanParallism::default(), ignore_inverted_index: false, + ignore_fulltext_index: false, start_time: None, } } @@ -195,12 +200,20 @@ impl ScanRegion { self } + /// Sets whether to ignore inverted index. #[must_use] pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self { self.ignore_inverted_index = ignore; self } + /// Sets whether to ignore fulltext index. 
+ #[must_use] + pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self { + self.ignore_fulltext_index = ignore; + self + } + #[must_use] pub(crate) fn with_start_time(mut self, now: Instant) -> Self { self.start_time = Some(now); @@ -278,7 +291,8 @@ impl ScanRegion { self.version.options.append_mode, ); - let index_applier = self.build_index_applier(); + let inverted_index_applier = self.build_invereted_index_applier(); + let fulltext_index_applier = self.build_fulltext_index_applier(); let predicate = Predicate::new(self.request.filters.clone()); // The mapper always computes projected column ids as the schema of SSTs may change. let mapper = match &self.request.projection { @@ -292,7 +306,8 @@ impl ScanRegion { .with_memtables(memtables) .with_files(files) .with_cache(self.cache_manager) - .with_index_applier(index_applier) + .with_inverted_index_applier(inverted_index_applier) + .with_fulltext_index_applier(fulltext_index_applier) .with_parallelism(self.parallelism) .with_start_time(self.start_time) .with_append_mode(self.version.options.append_mode) @@ -318,8 +333,8 @@ impl ScanRegion { ) } - /// Use the latest schema to build the index applier. - fn build_index_applier(&self) -> Option { + /// Use the latest schema to build the inverted index applier. 
+ fn build_invereted_index_applier(&self) -> Option { if self.ignore_inverted_index { return None; } @@ -337,7 +352,7 @@ impl ScanRegion { .and_then(|c| c.index_cache()) .cloned(); - SstIndexApplierBuilder::new( + InvertedIndexApplierBuilder::new( self.access_layer.region_dir().to_string(), self.access_layer.object_store().clone(), file_cache, @@ -354,7 +369,26 @@ impl ScanRegion { self.access_layer.puffin_manager_factory().clone(), ) .build(&self.request.filters) - .inspect_err(|err| warn!(err; "Failed to build index applier")) + .inspect_err(|err| warn!(err; "Failed to build invereted index applier")) + .ok() + .flatten() + .map(Arc::new) + } + + /// Use the latest schema to build the fulltext index applier. + fn build_fulltext_index_applier(&self) -> Option { + if self.ignore_fulltext_index { + return None; + } + + FulltextIndexApplierBuilder::new( + self.access_layer.region_dir().to_string(), + self.access_layer.object_store().clone(), + self.access_layer.puffin_manager_factory().clone(), + self.version.metadata.as_ref(), + ) + .build(&self.request.filters) + .inspect_err(|err| warn!(err; "Failed to build fulltext index applier")) .ok() .flatten() .map(Arc::new) @@ -401,8 +435,9 @@ pub(crate) struct ScanInput { ignore_file_not_found: bool, /// Parallelism to scan data. pub(crate) parallelism: ScanParallism, - /// Index applier. - index_applier: Option, + /// Index appliers. + inverted_index_applier: Option, + fulltext_index_applier: Option, /// Start time of the query. pub(crate) query_start: Option, /// The region is using append mode. @@ -429,7 +464,8 @@ impl ScanInput { cache_manager: None, ignore_file_not_found: false, parallelism: ScanParallism::default(), - index_applier: None, + inverted_index_applier: None, + fulltext_index_applier: None, query_start: None, append_mode: false, filter_deleted: true, @@ -487,10 +523,23 @@ impl ScanInput { self } - /// Sets index applier. + /// Sets inverted index applier. 
#[must_use] - pub(crate) fn with_index_applier(mut self, index_applier: Option) -> Self { - self.index_applier = index_applier; + pub(crate) fn with_inverted_index_applier( + mut self, + applier: Option, + ) -> Self { + self.inverted_index_applier = applier; + self + } + + /// Sets fulltext index applier. + #[must_use] + pub(crate) fn with_fulltext_index_applier( + mut self, + applier: Option, + ) -> Self { + self.fulltext_index_applier = applier; self } @@ -566,7 +615,8 @@ impl ScanInput { .time_range(self.time_range) .projection(Some(self.mapper.column_ids().to_vec())) .cache(self.cache_manager.clone()) - .index_applier(self.index_applier.clone()) + .inverted_index_applier(self.inverted_index_applier.clone()) + .fulltext_index_applier(self.fulltext_index_applier.clone()) .expected_metadata(Some(self.mapper.metadata().clone())) .build_reader_input() .await; diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index d3a7a1704417..622594301852 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -34,9 +34,9 @@ use crate::metrics::INDEX_CREATE_MEMORY_USAGE; use crate::read::Batch; use crate::region::options::IndexOptions; use crate::sst::file::FileId; -use crate::sst::index::fulltext_index::creator::SstIndexCreator as FulltextIndexer; +use crate::sst::index::fulltext_index::creator::FulltextIndexer; use crate::sst::index::intermediate::IntermediateManager; -use crate::sst::index::inverted_index::creator::SstIndexCreator as InvertedIndexer; +use crate::sst::index::inverted_index::creator::InvertedIndexer; pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index"; pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index"; diff --git a/src/mito2/src/sst/index/fulltext_index.rs b/src/mito2/src/sst/index/fulltext_index.rs index ed4db4ae87ba..04c2e6daba9f 100644 --- a/src/mito2/src/sst/index/fulltext_index.rs +++ b/src/mito2/src/sst/index/fulltext_index.rs @@ -12,6 +12,7 @@ // See the License for the specific language 
governing permissions and // limitations under the License. +pub(crate) mod applier; pub(crate) mod creator; const INDEX_BLOB_TYPE: &str = "greptime-fulltext-index-v1"; diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs new file mode 100644 index 000000000000..8eda10160843 --- /dev/null +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -0,0 +1,129 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeSet; +use std::sync::Arc; + +use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher}; +use object_store::ObjectStore; +use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader}; +use snafu::ResultExt; +use store_api::storage::ColumnId; + +use crate::error::{ApplyFulltextIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result}; +use crate::metrics::INDEX_APPLY_ELAPSED; +use crate::sst::file::FileId; +use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE; +use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinDir}; +use crate::sst::index::TYPE_FULLTEXT_INDEX; +use crate::sst::location; + +pub mod builder; + +/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files +pub struct FulltextIndexApplier { + /// The root directory of the region. + region_dir: String, + + /// Queries to apply to the index. 
+ queries: Vec<(ColumnId, String)>, + + /// The puffin manager factory. + puffin_manager_factory: PuffinManagerFactory, + + /// Store responsible for accessing index files. + store: ObjectStore, +} + +pub type FulltextIndexApplierRef = Arc; + +impl FulltextIndexApplier { + /// Creates a new `FulltextIndexApplier`. + pub fn new( + region_dir: String, + store: ObjectStore, + queries: Vec<(ColumnId, String)>, + puffin_manager_factory: PuffinManagerFactory, + ) -> Self { + Self { + region_dir, + store, + queries, + puffin_manager_factory, + } + } + + /// Applies the queries to the fulltext index of the specified SST file. + pub async fn apply(&self, file_id: FileId) -> Result> { + let _timer = INDEX_APPLY_ELAPSED + .with_label_values(&[TYPE_FULLTEXT_INDEX]) + .start_timer(); + + let mut inited = false; + let mut row_ids = BTreeSet::new(); + + for (column_id, query) in &self.queries { + let dir = self.index_dir_path(file_id, *column_id).await?; + let path = match &dir { + Some(dir) => dir.path(), + None => { + // Return empty set if the index not found. + return Ok(BTreeSet::new()); + } + }; + + let searcher = + TantivyFulltextIndexSearcher::new(path).context(ApplyFulltextIndexSnafu)?; + let result = searcher + .search(query) + .await + .context(ApplyFulltextIndexSnafu)?; + + if !inited { + row_ids = result; + inited = true; + continue; + } + + row_ids.retain(|id| result.contains(id)); + if row_ids.is_empty() { + break; + } + } + + Ok(row_ids) + } + + /// Returns `None` if the index not found. + async fn index_dir_path( + &self, + file_id: FileId, + column_id: ColumnId, + ) -> Result> { + let puffin_manager = self.puffin_manager_factory.build(self.store.clone()); + let file_path = location::index_file_path(&self.region_dir, file_id); + + match puffin_manager + .reader(&file_path) + .await + .context(PuffinBuildReaderSnafu)? + .dir(&format!("{INDEX_BLOB_TYPE}-{column_id}")) + .await + { + Ok(dir) => Ok(Some(dir)), + Err(puffin::error::Error::BlobNotFound { .. 
}) => Ok(None), + Err(err) => Err(err).context(PuffinReadBlobSnafu), + } + } +} diff --git a/src/mito2/src/sst/index/fulltext_index/applier/builder.rs b/src/mito2/src/sst/index/fulltext_index/applier/builder.rs new file mode 100644 index 000000000000..5a10ffd160c9 --- /dev/null +++ b/src/mito2/src/sst/index/fulltext_index/applier/builder.rs @@ -0,0 +1,234 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use datafusion_common::ScalarValue; +use datafusion_expr::Expr; +use object_store::ObjectStore; +use store_api::metadata::RegionMetadata; +use store_api::storage::{ColumnId, ConcreteDataType}; + +use crate::error::Result; +use crate::sst::index::fulltext_index::applier::FulltextIndexApplier; +use crate::sst::index::puffin_manager::PuffinManagerFactory; + +/// `FulltextIndexApplierBuilder` is a builder for `FulltextIndexApplier`. +pub struct FulltextIndexApplierBuilder<'a> { + region_dir: String, + store: ObjectStore, + puffin_manager_factory: PuffinManagerFactory, + metadata: &'a RegionMetadata, +} + +impl<'a> FulltextIndexApplierBuilder<'a> { + /// Creates a new `FulltextIndexApplierBuilder`. + pub fn new( + region_dir: String, + store: ObjectStore, + puffin_manager_factory: PuffinManagerFactory, + metadata: &'a RegionMetadata, + ) -> Self { + Self { + region_dir, + store, + puffin_manager_factory, + metadata, + } + } + + /// Builds `FulltextIndexApplier` from the given expressions. 
+ pub fn build(self, exprs: &[Expr]) -> Result> { + let mut queries = Vec::with_capacity(exprs.len()); + for expr in exprs { + if let Some((column_id, query)) = Self::expr_to_query(self.metadata, expr) { + queries.push((column_id, query)); + } + } + + Ok((!queries.is_empty()).then(|| { + FulltextIndexApplier::new( + self.region_dir, + self.store, + queries, + self.puffin_manager_factory, + ) + })) + } + + fn expr_to_query(metadata: &RegionMetadata, expr: &Expr) -> Option<(ColumnId, String)> { + let Expr::ScalarFunction(f) = expr else { + return None; + }; + if f.name() != "matches" { + return None; + } + if f.args.len() != 2 { + return None; + } + + let Expr::Column(c) = &f.args[0] else { + return None; + }; + let column = metadata.column_by_name(&c.name)?; + + if column.column_schema.data_type != ConcreteDataType::string_datatype() { + return None; + } + + let Expr::Literal(ScalarValue::Utf8(Some(query))) = &f.args[1] else { + return None; + }; + + Some((column.column_id, query.to_string())) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use api::v1::SemanticType; + use common_function::function_registry::FUNCTION_REGISTRY; + use common_function::scalars::udf::create_udf; + use datafusion_common::Column; + use datafusion_expr::expr::ScalarFunction; + use datafusion_expr::ScalarUDF; + use datatypes::schema::ColumnSchema; + use session::context::QueryContext; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; + use store_api::storage::RegionId; + + use super::*; + + fn mock_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("text", ConcreteDataType::string_datatype(), true), + semantic_type: SemanticType::Field, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: 
SemanticType::Timestamp, + column_id: 2, + }); + + builder.build().unwrap() + } + + fn matches_func() -> Arc { + Arc::new( + create_udf( + FUNCTION_REGISTRY.get_function("matches").unwrap(), + QueryContext::arc(), + Default::default(), + ) + .into(), + ) + } + + #[test] + fn test_expr_to_query_basic() { + let metadata = mock_metadata(); + + let expr = Expr::ScalarFunction(ScalarFunction { + args: vec![ + Expr::Column(Column { + name: "text".to_string(), + relation: None, + }), + Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))), + ], + func: matches_func(), + }); + + let (column_id, query) = + FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).unwrap(); + assert_eq!(column_id, 1); + assert_eq!(query, "foo".to_string()); + } + + #[test] + fn test_expr_to_query_wrong_num_args() { + let metadata = mock_metadata(); + + let expr = Expr::ScalarFunction(ScalarFunction { + args: vec![Expr::Column(Column { + name: "text".to_string(), + relation: None, + })], + func: matches_func(), + }); + + assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none()); + } + + #[test] + fn test_expr_to_query_not_found_column() { + let metadata = mock_metadata(); + + let expr = Expr::ScalarFunction(ScalarFunction { + args: vec![ + Expr::Column(Column { + name: "not_found".to_string(), + relation: None, + }), + Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))), + ], + func: matches_func(), + }); + + assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none()); + } + + #[test] + fn test_expr_to_query_column_wrong_data_type() { + let metadata = mock_metadata(); + + let expr = Expr::ScalarFunction(ScalarFunction { + args: vec![ + Expr::Column(Column { + name: "ts".to_string(), + relation: None, + }), + Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))), + ], + func: matches_func(), + }); + + assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none()); + } + + #[test] + fn 
test_expr_to_query_pattern_not_string() { + let metadata = mock_metadata(); + + let expr = Expr::ScalarFunction(ScalarFunction { + args: vec![ + Expr::Column(Column { + name: "text".to_string(), + relation: None, + }), + Expr::Literal(ScalarValue::Int64(Some(42))), + ], + func: matches_func(), + }); + + assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none()); + } +} diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index b5c87b587b82..9fe5b57bf17f 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -38,8 +38,8 @@ use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount, Statistics}; use crate::sst::index::TYPE_FULLTEXT_INDEX; -/// `SstIndexCreator` is responsible for creating fulltext indexes for SST files. -pub struct SstIndexCreator { +/// `FulltextIndexer` is responsible for creating fulltext indexes for SST files. +pub struct FulltextIndexer { /// Creators for each column. creators: HashMap, /// Whether the index creation was aborted. @@ -48,8 +48,8 @@ pub struct SstIndexCreator { stats: Statistics, } -impl SstIndexCreator { - /// Creates a new `SstIndexCreator`. +impl FulltextIndexer { + /// Creates a new `FulltextIndexer`. pub async fn new( region_id: &RegionId, sst_file_id: &FileId, @@ -173,7 +173,7 @@ impl SstIndexCreator { } } -impl SstIndexCreator { +impl FulltextIndexer { async fn do_update(&mut self, batch: &Batch) -> Result<()> { let mut guard = self.stats.record_update(); guard.inc_row_count(batch.num_rows()); @@ -293,5 +293,267 @@ impl SingleCreator { #[cfg(test)] mod tests { - // TODO(zhongzc): After search is implemented, add tests for full-text indexer. 
+ use std::collections::BTreeSet; + use std::sync::Arc; + + use api::v1::SemanticType; + use datatypes::data_type::DataType; + use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextOptions}; + use datatypes::vectors::{UInt64Vector, UInt8Vector}; + use futures::future::BoxFuture; + use futures::FutureExt; + use index::fulltext_index::search::RowId; + use object_store::services::Memory; + use object_store::ObjectStore; + use puffin::puffin_manager::PuffinManager; + use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; + use store_api::storage::{ConcreteDataType, RegionId}; + + use super::*; + use crate::read::{Batch, BatchColumn}; + use crate::sst::file::FileId; + use crate::sst::index::fulltext_index::applier::FulltextIndexApplier; + use crate::sst::index::puffin_manager::PuffinManagerFactory; + use crate::sst::location; + + fn mock_object_store() -> ObjectStore { + ObjectStore::new(Memory::default()).unwrap().finish() + } + + async fn new_intm_mgr(path: impl AsRef) -> IntermediateManager { + IntermediateManager::init_fs(path).await.unwrap() + } + + fn mock_region_metadata() -> RegionMetadataRef { + let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "text_english_case_sensitive", + ConcreteDataType::string_datatype(), + true, + ) + .with_fulltext_options(FulltextOptions { + enable: true, + analyzer: FulltextAnalyzer::English, + case_sensitive: true, + }) + .unwrap(), + semantic_type: SemanticType::Field, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "text_english_case_insensitive", + ConcreteDataType::string_datatype(), + true, + ) + .with_fulltext_options(FulltextOptions { + enable: true, + analyzer: FulltextAnalyzer::English, + case_sensitive: false, + }) + .unwrap(), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + 
column_schema: ColumnSchema::new( + "text_chinese", + ConcreteDataType::string_datatype(), + true, + ) + .with_fulltext_options(FulltextOptions { + enable: true, + analyzer: FulltextAnalyzer::Chinese, + case_sensitive: false, + }) + .unwrap(), + semantic_type: SemanticType::Field, + column_id: 3, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 4, + }); + + Arc::new(builder.build().unwrap()) + } + + fn new_batch( + rows: &[( + Option<&str>, // text_english_case_sensitive + Option<&str>, // text_english_case_insensitive + Option<&str>, // text_chinese + )], + ) -> Batch { + let mut vec_english_sensitive = + ConcreteDataType::string_datatype().create_mutable_vector(0); + let mut vec_english_insensitive = + ConcreteDataType::string_datatype().create_mutable_vector(0); + let mut vec_chinese = ConcreteDataType::string_datatype().create_mutable_vector(0); + + for (text_english_case_sensitive, text_english_case_insensitive, text_chinese) in rows { + match text_english_case_sensitive { + Some(s) => vec_english_sensitive.push_value_ref((*s).into()), + None => vec_english_sensitive.push_null(), + } + match text_english_case_insensitive { + Some(s) => vec_english_insensitive.push_value_ref((*s).into()), + None => vec_english_insensitive.push_null(), + } + match text_chinese { + Some(s) => vec_chinese.push_value_ref((*s).into()), + None => vec_chinese.push_null(), + } + } + + let num_rows = vec_english_sensitive.len(); + Batch::new( + vec![], + Arc::new(UInt64Vector::from_iter_values( + (0..num_rows).map(|n| n as u64), + )), + Arc::new(UInt64Vector::from_iter_values( + std::iter::repeat(0).take(num_rows), + )), + Arc::new(UInt8Vector::from_iter_values( + std::iter::repeat(1).take(num_rows), + )), + vec![ + BatchColumn { + column_id: 1, + data: vec_english_sensitive.to_vector(), + }, + BatchColumn { + column_id: 2, + 
data: vec_english_insensitive.to_vector(), + }, + BatchColumn { + column_id: 3, + data: vec_chinese.to_vector(), + }, + ], + ) + .unwrap() + } + + async fn build_applier_factory( + prefix: &str, + rows: &[( + Option<&str>, // text_english_case_sensitive + Option<&str>, // text_english_case_insensitive + Option<&str>, // text_chinese + )], + ) -> impl Fn(Vec<(ColumnId, &str)>) -> BoxFuture<'static, BTreeSet> { + let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await; + let region_dir = "region0".to_string(); + let sst_file_id = FileId::random(); + let file_path = location::index_file_path(®ion_dir, sst_file_id); + let object_store = mock_object_store(); + let region_metadata = mock_region_metadata(); + let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await; + + let mut indexer = FulltextIndexer::new( + ®ion_metadata.region_id, + &sst_file_id, + &intm_mgr, + ®ion_metadata, + true, + 1024, + ) + .await + .unwrap(); + + let batch = new_batch(rows); + indexer.update(&batch).await.unwrap(); + + let puffin_manager = factory.build(object_store.clone()); + let mut writer = puffin_manager.writer(&file_path).await.unwrap(); + let _ = indexer.finish(&mut writer).await.unwrap(); + writer.finish().await.unwrap(); + + move |queries| { + let _d = &d; + let applier = FulltextIndexApplier::new( + region_dir.clone(), + object_store.clone(), + queries + .into_iter() + .map(|(a, b)| (a, b.to_string())) + .collect(), + factory.clone(), + ); + + async move { applier.apply(sst_file_id).await.unwrap() }.boxed() + } + } + + #[tokio::test] + async fn test_fulltext_index_basic() { + let applier_factory = build_applier_factory( + "test_fulltext_index_basic_", + &[ + (Some("hello"), None, Some("你好")), + (Some("world"), Some("world"), None), + (None, Some("World"), Some("世界")), + ( + Some("Hello, World"), + Some("Hello, World"), + Some("你好,世界"), + ), + ], + ) + .await; + + let row_ids = applier_factory(vec![(1, "hello")]).await; + assert_eq!(row_ids, 
vec![0].into_iter().collect()); + + let row_ids = applier_factory(vec![(1, "world")]).await; + assert_eq!(row_ids, vec![1].into_iter().collect()); + + let row_ids = applier_factory(vec![(2, "hello")]).await; + assert_eq!(row_ids, vec![3].into_iter().collect()); + + let row_ids = applier_factory(vec![(2, "world")]).await; + assert_eq!(row_ids, vec![1, 2, 3].into_iter().collect()); + + let row_ids = applier_factory(vec![(3, "你好")]).await; + assert_eq!(row_ids, vec![0, 3].into_iter().collect()); + + let row_ids = applier_factory(vec![(3, "世界")]).await; + assert_eq!(row_ids, vec![2, 3].into_iter().collect()); + } + + #[tokio::test] + async fn test_fulltext_index_multi_columns() { + let applier_factory = build_applier_factory( + "test_fulltext_index_multi_columns_", + &[ + (Some("hello"), None, Some("你好")), + (Some("world"), Some("world"), None), + (None, Some("World"), Some("世界")), + ( + Some("Hello, World"), + Some("Hello, World"), + Some("你好,世界"), + ), + ], + ) + .await; + + let row_ids = applier_factory(vec![(1, "hello"), (3, "你好")]).await; + assert_eq!(row_ids, vec![0].into_iter().collect()); + + let row_ids = applier_factory(vec![(1, "world"), (3, "世界")]).await; + assert_eq!(row_ids, vec![].into_iter().collect()); + + let row_ids = applier_factory(vec![(2, "world"), (3, "世界")]).await; + assert_eq!(row_ids, vec![2, 3].into_iter().collect()); + } } diff --git a/src/mito2/src/sst/index/indexer/finish.rs b/src/mito2/src/sst/index/indexer/finish.rs index 324137ec2df0..a0157a9b66f4 100644 --- a/src/mito2/src/sst/index/indexer/finish.rs +++ b/src/mito2/src/sst/index/indexer/finish.rs @@ -15,8 +15,8 @@ use common_telemetry::{debug, warn}; use puffin::puffin_manager::{PuffinManager, PuffinWriter}; -use crate::sst::index::fulltext_index::creator::SstIndexCreator as FulltextIndexer; -use crate::sst::index::inverted_index::creator::SstIndexCreator as InvertedIndexer; +use crate::sst::index::fulltext_index::creator::FulltextIndexer; +use 
crate::sst::index::inverted_index::creator::InvertedIndexer; use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount}; use crate::sst::index::{FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput}; diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index 566af65d2213..cac3ffedd74c 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -28,16 +28,17 @@ use store_api::storage::RegionId; use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef}; -use crate::error::{ApplyIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result}; +use crate::error::{ApplyInvertedIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result}; use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE}; use crate::sst::file::FileId; use crate::sst::index::inverted_index::INDEX_BLOB_TYPE; use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory}; +use crate::sst::index::TYPE_INVERTED_INDEX; use crate::sst::location; -/// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files +/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files /// and returning the relevant row group ids for further scan. -pub(crate) struct SstIndexApplier { +pub(crate) struct InvertedIndexApplier { /// The root directory of the region. region_dir: String, @@ -61,10 +62,10 @@ pub(crate) struct SstIndexApplier { inverted_index_cache: Option, } -pub(crate) type SstIndexApplierRef = Arc; +pub(crate) type InvertedIndexApplierRef = Arc; -impl SstIndexApplier { - /// Creates a new [`SstIndexApplier`]. +impl InvertedIndexApplier { + /// Creates a new `InvertedIndexApplier`. 
pub fn new( region_dir: String, region_id: RegionId, @@ -89,7 +90,9 @@ impl SstIndexApplier { /// Applies predicates to the provided SST file id and returns the relevant row group ids pub async fn apply(&self, file_id: FileId) -> Result { - let _timer = INDEX_APPLY_ELAPSED.start_timer(); + let _timer = INDEX_APPLY_ELAPSED + .with_label_values(&[TYPE_INVERTED_INDEX]) + .start_timer(); let context = SearchContext { // Encountering a non-existing column indicates that it doesn't match predicates. @@ -115,13 +118,13 @@ impl SstIndexApplier { self.index_applier .apply(context, &mut index_reader) .await - .context(ApplyIndexSnafu) + .context(ApplyInvertedIndexSnafu) } else { let mut index_reader = InvertedIndexBlobReader::new(blob); self.index_applier .apply(context, &mut index_reader) .await - .context(ApplyIndexSnafu) + .context(ApplyInvertedIndexSnafu) } } @@ -169,7 +172,7 @@ impl SstIndexApplier { } } -impl Drop for SstIndexApplier { +impl Drop for InvertedIndexApplier { fn drop(&mut self) { INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64); } @@ -212,7 +215,7 @@ mod tests { }) }); - let sst_index_applier = SstIndexApplier::new( + let sst_index_applier = InvertedIndexApplier::new( region_dir.clone(), RegionId::new(0, 0), object_store, @@ -254,7 +257,7 @@ mod tests { mock_index_applier.expect_memory_usage().returning(|| 100); mock_index_applier.expect_apply().never(); - let sst_index_applier = SstIndexApplier::new( + let sst_index_applier = InvertedIndexApplier::new( region_dir.clone(), RegionId::new(0, 0), object_store, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder.rs b/src/mito2/src/sst/index/inverted_index/applier/builder.rs index 0736b07fb6b9..6d37ffc02305 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder.rs @@ -37,19 +37,19 @@ use crate::cache::file_cache::FileCacheRef; use crate::cache::index::InvertedIndexCacheRef; use 
crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result}; use crate::row_converter::SortField; -use crate::sst::index::inverted_index::applier::SstIndexApplier; +use crate::sst::index::inverted_index::applier::InvertedIndexApplier; use crate::sst::index::inverted_index::codec::IndexValueCodec; use crate::sst::index::puffin_manager::PuffinManagerFactory; -/// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan. -pub(crate) struct SstIndexApplierBuilder<'a> { - /// Directory of the region, required argument for constructing [`SstIndexApplier`]. +/// Constructs an [`InvertedIndexApplier`] which applies predicates to SST files during scan. +pub(crate) struct InvertedIndexApplierBuilder<'a> { + /// Directory of the region, required argument for constructing [`InvertedIndexApplier`]. region_dir: String, - /// Object store, required argument for constructing [`SstIndexApplier`]. + /// Object store, required argument for constructing [`InvertedIndexApplier`]. object_store: ObjectStore, - /// File cache, required argument for constructing [`SstIndexApplier`]. + /// File cache, required argument for constructing [`InvertedIndexApplier`]. file_cache: Option, /// Metadata of the region, used to get metadata like column type. @@ -68,8 +68,8 @@ pub(crate) struct SstIndexApplierBuilder<'a> { index_cache: Option, } -impl<'a> SstIndexApplierBuilder<'a> { - /// Creates a new [`SstIndexApplierBuilder`]. +impl<'a> InvertedIndexApplierBuilder<'a> { + /// Creates a new [`InvertedIndexApplierBuilder`]. pub fn new( region_dir: String, object_store: ObjectStore, @@ -91,9 +91,9 @@ impl<'a> SstIndexApplierBuilder<'a> { } } - /// Consumes the builder to construct an [`SstIndexApplier`], optionally returned based on + /// Consumes the builder to construct an [`InvertedIndexApplier`], optionally returned based on /// the expressions provided. If no predicates match, returns `None`. 
- pub fn build(mut self, exprs: &[Expr]) -> Result> { + pub fn build(mut self, exprs: &[Expr]) -> Result> { for expr in exprs { self.traverse_and_collect(expr); } @@ -109,7 +109,7 @@ impl<'a> SstIndexApplierBuilder<'a> { .collect(); let applier = PredicatesIndexApplier::try_from(predicates); - Ok(Some(SstIndexApplier::new( + Ok(Some(InvertedIndexApplier::new( self.region_dir, self.metadata.region_id, self.object_store, @@ -324,7 +324,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs index c7f1f90cf0ce..42742b885ca8 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/between.rs @@ -16,9 +16,9 @@ use datafusion_expr::Between; use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate}; use crate::error::Result; -use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; +use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; -impl<'a> SstIndexApplierBuilder<'a> { +impl<'a> InvertedIndexApplierBuilder<'a> { /// Collects a `BETWEEN` expression in the form of `column BETWEEN lit AND lit`. 
pub(crate) fn collect_between(&mut self, between: &Between) -> Result<()> { if between.negated { @@ -72,7 +72,7 @@ mod tests { fn test_collect_between_basic() { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_between_basic_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -115,7 +115,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_between_negated_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -141,7 +141,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_between_field_column_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -167,7 +167,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_between_type_mismatch_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -194,7 +194,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_between_nonexistent_column_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs index b7870039115d..dce83cf917d0 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs 
+++ b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs @@ -17,9 +17,9 @@ use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePre use index::inverted_index::Bytes; use crate::error::Result; -use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; +use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; -impl<'a> SstIndexApplierBuilder<'a> { +impl<'a> InvertedIndexApplierBuilder<'a> { /// Collects a comparison expression in the form of /// `column < lit`, `column > lit`, `column <= lit`, `column >= lit`, /// `lit < column`, `lit > column`, `lit <= column`, `lit >= column`. @@ -228,7 +228,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_comparison_basic_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -257,7 +257,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_comparison_type_mismatch_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -277,7 +277,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_comparison_field_column_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -298,7 +298,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_comparison_nonexistent_column_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, diff --git 
a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs index c776cdb74361..85c0eafdb4c5 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs @@ -20,9 +20,9 @@ use index::inverted_index::search::predicate::{InListPredicate, Predicate}; use index::inverted_index::Bytes; use crate::error::Result; -use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; +use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; -impl<'a> SstIndexApplierBuilder<'a> { +impl<'a> InvertedIndexApplierBuilder<'a> { /// Collects an eq expression in the form of `column = lit`. pub(crate) fn collect_eq(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> { let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else { @@ -134,7 +134,7 @@ mod tests { fn test_collect_eq_basic() { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -172,7 +172,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_eq_field_column_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -193,7 +193,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_eq_nonexistent_column_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -213,7 +213,7 @@ mod tests { let (_d, facotry) = 
PuffinManagerFactory::new_for_test_block("test_collect_eq_type_mismatch_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -233,7 +233,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_basic_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -292,7 +292,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_invalid_op_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -330,7 +330,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_multiple_columns_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs index 1f23d3fa0cee..0c5e2a74af07 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs @@ -18,9 +18,9 @@ use datafusion_expr::expr::InList; use index::inverted_index::search::predicate::{InListPredicate, Predicate}; use crate::error::Result; -use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; +use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; -impl<'a> SstIndexApplierBuilder<'a> { +impl<'a> InvertedIndexApplierBuilder<'a> { /// Collects an in list 
expression in the form of `column IN (lit, lit, ...)`. pub(crate) fn collect_inlist(&mut self, inlist: &InList) -> Result<()> { if inlist.negated { @@ -65,7 +65,7 @@ mod tests { fn test_collect_in_list_basic() { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_basic_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -98,7 +98,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_negated_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -123,7 +123,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_field_column_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -148,7 +148,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_type_mismatch_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -175,7 +175,7 @@ mod tests { PuffinManagerFactory::new_for_test_block("test_collect_in_list_nonexistent_column_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs index ae19b5ef7ce8..baae6a2c49aa 100644 --- 
a/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/regex_match.rs @@ -17,9 +17,9 @@ use datafusion_expr::Expr as DfExpr; use index::inverted_index::search::predicate::{Predicate, RegexMatchPredicate}; use crate::error::Result; -use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; +use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; -impl<'a> SstIndexApplierBuilder<'a> { +impl<'a> InvertedIndexApplierBuilder<'a> { /// Collects a regex match expression in the form of `column ~ pattern`. pub(crate) fn collect_regex_match(&mut self, column: &DfExpr, pattern: &DfExpr) -> Result<()> { let Some(column_name) = Self::column_name(column) else { @@ -59,7 +59,7 @@ mod tests { fn test_regex_match_basic() { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_regex_match_basic_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -88,7 +88,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_regex_match_field_column_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -110,7 +110,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_regex_match_type_mismatch_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder = InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, @@ -132,7 +132,7 @@ mod tests { let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_regex_match_type_nonexist_column_"); let metadata = test_region_metadata(); - let mut builder = SstIndexApplierBuilder::new( + let mut builder 
= InvertedIndexApplierBuilder::new( "test".to_string(), test_object_store(), None, diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 72b9132e62e7..4cf20b94769a 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -51,8 +51,8 @@ const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024; // 1MB /// The buffer size for the pipe used to send index data to the puffin blob. const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192; -/// Creates SST index. -pub struct SstIndexCreator { +/// `InvertedIndexer` creates inverted index for SST files. +pub struct InvertedIndexer { /// The index creator. index_creator: Box, /// The provider of intermediate files. @@ -75,8 +75,8 @@ pub struct SstIndexCreator { column_ids: HashSet, } -impl SstIndexCreator { - /// Creates a new `SstIndexCreator`. +impl InvertedIndexer { + /// Creates a new `InvertedIndexer`. /// Should ensure that the number of tag columns is greater than 0. 
pub fn new( sst_file_id: FileId, @@ -298,7 +298,7 @@ mod tests { use super::*; use crate::cache::index::InvertedIndexCache; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; - use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder; + use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::location; @@ -384,7 +384,7 @@ mod tests { let memory_threshold = None; let segment_row_count = 2; - let mut creator = SstIndexCreator::new( + let mut creator = InvertedIndexer::new( sst_file_id, ®ion_metadata, intm_mgr, @@ -407,7 +407,7 @@ mod tests { move |expr| { let _d = &d; let cache = Arc::new(InvertedIndexCache::new(10, 10)); - let applier = SstIndexApplierBuilder::new( + let applier = InvertedIndexApplierBuilder::new( region_dir.clone(), object_store.clone(), None, diff --git a/src/mito2/src/sst/index/puffin_manager.rs b/src/mito2/src/sst/index/puffin_manager.rs index 98a240547d09..df49db75b6cc 100644 --- a/src/mito2/src/sst/index/puffin_manager.rs +++ b/src/mito2/src/sst/index/puffin_manager.rs @@ -39,7 +39,9 @@ pub(crate) type SstPuffinManager = FsPuffinManager, ObjectStorePuffinFileAccessor>; pub(crate) type SstPuffinReader = ::Reader; pub(crate) type SstPuffinWriter = ::Writer; -pub(crate) type BlobReader = <::Blob as BlobGuard>::Reader; +pub(crate) type SstPuffinBlob = ::Blob; +pub(crate) type SstPuffinDir = ::Dir; +pub(crate) type BlobReader = ::Reader; const STAGING_DIR: &str = "staging"; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 32a24f682c13..9483d37e6a85 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -15,6 +15,7 @@ //! Parquet reader. 
use std::collections::{BTreeMap, VecDeque}; +use std::ops::Range; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -52,12 +53,15 @@ use crate::metrics::{ use crate::read::{Batch, BatchReader}; use crate::row_converter::{McmpRowCodec, SortField}; use crate::sst::file::FileHandle; -use crate::sst::index::inverted_index::applier::SstIndexApplierRef; +use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; +use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef; use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef}; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::row_group::InMemoryRowGroup; -use crate::sst::parquet::row_selection::row_selection_from_row_ranges; +use crate::sst::parquet::row_selection::{ + intersect_row_selections, row_selection_from_row_ranges, row_selection_from_sorted_row_ids, +}; use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; @@ -78,8 +82,9 @@ pub struct ParquetReaderBuilder { projection: Option>, /// Manager that caches SST data. cache_manager: Option, - /// Index applier. - index_applier: Option, + /// Index appliers. + inverted_index_applier: Option, + fulltext_index_applier: Option, /// Expected metadata of the region while reading the SST. /// This is usually the latest metadata of the region. The reader use /// it get the correct column id of a column by name. @@ -101,7 +106,8 @@ impl ParquetReaderBuilder { time_range: None, projection: None, cache_manager: None, - index_applier: None, + inverted_index_applier: None, + fulltext_index_applier: None, expected_metadata: None, } } @@ -136,10 +142,23 @@ impl ParquetReaderBuilder { self } - /// Attaches the index applier to the builder. + /// Attaches the inverted index applier to the builder. 
+ #[must_use] + pub(crate) fn inverted_index_applier( + mut self, + index_applier: Option, + ) -> Self { + self.inverted_index_applier = index_applier; + self + } + + /// Attaches the fulltext index applier to the builder. #[must_use] - pub(crate) fn index_applier(mut self, index_applier: Option) -> Self { - self.index_applier = index_applier; + pub(crate) fn fulltext_index_applier( + mut self, + index_applier: Option, + ) -> Self { + self.fulltext_index_applier = index_applier; self } @@ -315,15 +334,92 @@ impl ParquetReaderBuilder { metrics: &mut ReaderMetrics, ) -> BTreeMap> { let num_row_groups = parquet_meta.num_row_groups(); - if num_row_groups == 0 { + let num_rows = parquet_meta.file_metadata().num_rows(); + if num_row_groups == 0 || num_rows == 0 { return BTreeMap::default(); } + + // Let's assume that the number of rows in the first row group + // can represent the `row_group_size` of the Parquet file. + let row_group_size = parquet_meta.row_group(0).num_rows() as usize; + if row_group_size == 0 { + return BTreeMap::default(); + } + metrics.num_row_groups_before_filtering += num_row_groups; + metrics.num_rows_in_row_group_before_filtering += num_rows as usize; - self.prune_row_groups_by_inverted_index(parquet_meta, metrics) - .await - .or_else(|| self.prune_row_groups_by_minmax(read_format, parquet_meta, metrics)) - .unwrap_or_else(|| (0..num_row_groups).map(|i| (i, None)).collect()) + let mut output = (0..num_row_groups).map(|i| (i, None)).collect(); + + self.prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics) + .await; + if output.is_empty() { + return output; + } + + let inverted_filtered = self + .prune_row_groups_by_inverted_index(row_group_size, parquet_meta, &mut output, metrics) + .await; + if output.is_empty() { + return output; + } + + if !inverted_filtered { + self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics); + } + + output + } + + /// Prunes row groups by fulltext index. 
Returns `true` if the row groups are pruned. + async fn prune_row_groups_by_fulltext_index( + &self, + row_group_size: usize, + parquet_meta: &ParquetMetaData, + output: &mut BTreeMap>, + metrics: &mut ReaderMetrics, + ) -> bool { + let Some(index_applier) = &self.fulltext_index_applier else { + return false; + }; + if !self.file_handle.meta_ref().fulltext_index_available() { + return false; + } + + let apply_res = match index_applier.apply(self.file_handle.file_id()).await { + Ok(res) => res, + Err(err) => { + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to apply full-text index, region_id: {}, file_id: {}, err: {}", + self.file_handle.region_id(), + self.file_handle.file_id(), + err + ); + } else { + warn!( + err; "Failed to apply full-text index, region_id: {}, file_id: {}", + self.file_handle.region_id(), self.file_handle.file_id() + ); + } + + return false; + } + }; + + let row_group_to_row_ids = apply_res + .into_iter() + .group_by(|row_id| *row_id as usize / row_group_size); + + Self::prune_row_groups_by_rows( + parquet_meta, + row_group_to_row_ids.into_iter(), + output, + &mut metrics.num_row_groups_fulltext_index_filtered, + &mut metrics.num_rows_in_row_group_fulltext_index_filtered, + ); + + true } /// Applies index to prune row groups. @@ -333,51 +429,42 @@ impl ParquetReaderBuilder { /// the correctness of the index. 
async fn prune_row_groups_by_inverted_index( &self, + row_group_size: usize, parquet_meta: &ParquetMetaData, + output: &mut BTreeMap>, metrics: &mut ReaderMetrics, - ) -> Option>> { - let Some(index_applier) = &self.index_applier else { - return None; + ) -> bool { + let Some(index_applier) = &self.inverted_index_applier else { + return false; }; if !self.file_handle.meta_ref().inverted_index_available() { - return None; + return false; } - let output = match index_applier.apply(self.file_handle.file_id()).await { + let apply_output = match index_applier.apply(self.file_handle.file_id()).await { Ok(output) => output, Err(err) => { if cfg!(any(test, feature = "test")) { panic!( - "Failed to apply index, region_id: {}, file_id: {}, err: {}", + "Failed to apply inverted index, region_id: {}, file_id: {}, err: {}", self.file_handle.region_id(), self.file_handle.file_id(), err ); } else { warn!( - err; "Failed to apply index, region_id: {}, file_id: {}", + err; "Failed to apply inverted index, region_id: {}, file_id: {}", self.file_handle.region_id(), self.file_handle.file_id() ); } - return None; + return false; } }; - // Let's assume that the number of rows in the first row group - // can represent the `row_group_size` of the Parquet file. - // - // If the file contains only one row group, i.e. the number of rows - // less than the `row_group_size`, the calculation of `row_group_id` - // and `rg_begin_row_id` is still correct. 
- let row_group_size = parquet_meta.row_group(0).num_rows() as usize; - if row_group_size == 0 { - return None; - } - - let segment_row_count = output.segment_row_count; - let row_groups = output + let segment_row_count = apply_output.segment_row_count; + let grouped_in_row_groups = apply_output .matched_segment_ids .iter_ones() .map(|seg_id| { @@ -389,26 +476,21 @@ impl ParquetReaderBuilder { (row_group_id, rg_begin_row_id..rg_end_row_id) }) - .group_by(|(row_group_id, _)| *row_group_id) - .into_iter() - .map(|(row_group_id, group)| { - let row_ranges = group.map(|(_, range)| range); - - let total_row_count = parquet_meta.row_group(row_group_id).num_rows() as usize; - let (row_selection, skipped) = - row_selection_from_row_ranges(row_ranges, total_row_count); + .group_by(|(row_group_id, _)| *row_group_id); - metrics.num_rows_in_row_group_before_filtering += total_row_count; - metrics.num_rows_in_row_group_inverted_index_filtered += skipped; - - (row_group_id, Some(row_selection)) - }) - .collect::>(); + let ranges_in_row_groups = grouped_in_row_groups + .into_iter() + .map(|(row_group_id, group)| (row_group_id, group.map(|(_, ranges)| ranges))); - let filtered = parquet_meta.num_row_groups() - row_groups.len(); - metrics.num_row_groups_inverted_index_filtered += filtered; + Self::prune_row_groups_by_ranges( + parquet_meta, + ranges_in_row_groups, + output, + &mut metrics.num_row_groups_inverted_index_filtered, + &mut metrics.num_rows_in_row_group_inverted_index_filtered, + ); - Some(row_groups) + true } /// Prunes row groups by min-max index. 
@@ -416,13 +498,14 @@ impl ParquetReaderBuilder { &self, read_format: &ReadFormat, parquet_meta: &ParquetMetaData, + output: &mut BTreeMap>, metrics: &mut ReaderMetrics, - ) -> Option>> { + ) -> bool { let Some(predicate) = &self.predicate else { - return None; + return false; }; - let num_row_groups = parquet_meta.num_row_groups(); + let row_groups_before = output.len(); let region_meta = read_format.metadata(); let row_groups = parquet_meta.row_groups(); @@ -431,18 +514,129 @@ impl ParquetReaderBuilder { // Here we use the schema of the SST to build the physical expression. If the column // in the SST doesn't have the same column id as the column in the expected metadata, // we will get a None statistics for that column. - let row_groups = predicate + let res = predicate .prune_with_stats(&stats, region_meta.schema.arrow_schema()) .iter() - .zip(0..num_row_groups) - .filter(|&(mask, _)| *mask) - .map(|(_, id)| (id, None)) + .zip(0..parquet_meta.num_row_groups()) + .filter_map(|(mask, row_group)| { + if !*mask { + return None; + } + + let selection = output.remove(&row_group)?; + Some((row_group, selection)) + }) .collect::>(); - let filtered = num_row_groups - row_groups.len(); - metrics.num_row_groups_min_max_filtered += filtered; + let row_groups_after = res.len(); + metrics.num_row_groups_min_max_filtered += row_groups_before - row_groups_after; - Some(row_groups) + *output = res; + true + } + + /// Prunes row groups by rows. The `rows_in_row_groups` is like a map from row group to + /// a list of row ids to keep. 
+ fn prune_row_groups_by_rows( + parquet_meta: &ParquetMetaData, + rows_in_row_groups: impl Iterator)>, + output: &mut BTreeMap>, + filtered_row_groups: &mut usize, + filtered_rows: &mut usize, + ) { + let row_groups_before = output.len(); + let mut rows_in_row_group_before = 0; + let mut rows_in_row_group_after = 0; + + let mut res = BTreeMap::new(); + for (row_group, row_ids) in rows_in_row_groups { + let Some(selection) = output.remove(&row_group) else { + continue; + }; + + let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize; + rows_in_row_group_before += selection + .as_ref() + .map_or(total_row_count, |s| s.row_count()); + + let new_selection = row_selection_from_sorted_row_ids(row_ids, total_row_count); + let intersected_selection = intersect_row_selections(selection, Some(new_selection)); + + let num_rows_after = intersected_selection + .as_ref() + .map_or(total_row_count, |s| s.row_count()); + rows_in_row_group_after += num_rows_after; + + if num_rows_after > 0 { + res.insert(row_group, intersected_selection); + } + } + + // Pruned row groups. + while let Some((row_group, selection)) = output.pop_first() { + let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize; + rows_in_row_group_before += selection + .as_ref() + .map_or(total_row_count, |s| s.row_count()); + } + + let row_groups_after = res.len(); + *filtered_row_groups += row_groups_before - row_groups_after; + *filtered_rows += rows_in_row_group_before - rows_in_row_group_after; + + *output = res; + } + + /// Prunes row groups by ranges. The `ranges_in_row_groups` is like a map from row group to + /// a list of row ranges to keep. 
+ fn prune_row_groups_by_ranges( + parquet_meta: &ParquetMetaData, + ranges_in_row_groups: impl Iterator>)>, + output: &mut BTreeMap>, + filtered_row_groups: &mut usize, + filtered_rows: &mut usize, + ) { + let row_groups_before = output.len(); + let mut rows_in_row_group_before = 0; + let mut rows_in_row_group_after = 0; + + let mut res = BTreeMap::new(); + for (row_group, row_ranges) in ranges_in_row_groups { + let Some(selection) = output.remove(&row_group) else { + continue; + }; + + let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize; + rows_in_row_group_before += selection + .as_ref() + .map_or(total_row_count, |s| s.row_count()); + + let new_selection = row_selection_from_row_ranges(row_ranges, total_row_count); + let intersected_selection = intersect_row_selections(selection, Some(new_selection)); + + let num_rows_after = intersected_selection + .as_ref() + .map_or(total_row_count, |s| s.row_count()); + rows_in_row_group_after += num_rows_after; + + if num_rows_after > 0 { + res.insert(row_group, intersected_selection); + } + } + + // Pruned row groups. + while let Some((row_group, selection)) = output.pop_first() { + let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize; + rows_in_row_group_before += selection + .as_ref() + .map_or(total_row_count, |s| s.row_count()); + } + + let row_groups_after = res.len(); + *filtered_row_groups += row_groups_before - row_groups_after; + *filtered_rows += rows_in_row_group_before - rows_in_row_group_after; + + *output = res; } } @@ -504,6 +698,8 @@ fn time_range_to_predicate( pub(crate) struct ReaderMetrics { /// Number of row groups before filtering. num_row_groups_before_filtering: usize, + /// Number of row groups filtered by fulltext index. + num_row_groups_fulltext_index_filtered: usize, /// Number of row groups filtered by inverted index. num_row_groups_inverted_index_filtered: usize, /// Number of row groups filtered by min-max index. 
@@ -512,6 +708,8 @@ pub(crate) struct ReaderMetrics { num_rows_precise_filtered: usize, /// Number of rows in row group before filtering. num_rows_in_row_group_before_filtering: usize, + /// Number of rows in row group filtered by fulltext index. + num_rows_in_row_group_fulltext_index_filtered: usize, /// Number of rows in row group filtered by inverted index. num_rows_in_row_group_inverted_index_filtered: usize, /// Duration to build the parquet reader. @@ -530,10 +728,13 @@ impl ReaderMetrics { /// Adds `other` metrics to this metrics. pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) { self.num_row_groups_before_filtering += other.num_row_groups_before_filtering; + self.num_row_groups_fulltext_index_filtered += other.num_row_groups_fulltext_index_filtered; self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered; self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered; self.num_rows_precise_filtered += other.num_rows_precise_filtered; self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering; + self.num_rows_in_row_group_fulltext_index_filtered += + other.num_rows_in_row_group_fulltext_index_filtered; self.num_rows_in_row_group_inverted_index_filtered += other.num_rows_in_row_group_inverted_index_filtered; self.build_cost += other.build_cost; @@ -782,6 +983,9 @@ impl Drop for ParquetReader { READ_ROW_GROUPS_TOTAL .with_label_values(&["before_filtering"]) .inc_by(metrics.num_row_groups_before_filtering as u64); + READ_ROW_GROUPS_TOTAL + .with_label_values(&["fulltext_index_filtered"]) + .inc_by(metrics.num_row_groups_fulltext_index_filtered as u64); READ_ROW_GROUPS_TOTAL .with_label_values(&["inverted_index_filtered"]) .inc_by(metrics.num_row_groups_inverted_index_filtered as u64); @@ -794,6 +998,9 @@ impl Drop for ParquetReader { READ_ROWS_IN_ROW_GROUP_TOTAL .with_label_values(&["before_filtering"]) .inc_by(metrics.num_rows_in_row_group_before_filtering as u64); + 
READ_ROWS_IN_ROW_GROUP_TOTAL + .with_label_values(&["fulltext_index_filtered"]) + .inc_by(metrics.num_rows_in_row_group_fulltext_index_filtered as u64); READ_ROWS_IN_ROW_GROUP_TOTAL .with_label_values(&["inverted_index_filtered"]) .inc_by(metrics.num_rows_in_row_group_inverted_index_filtered as u64); @@ -936,3 +1143,294 @@ impl RowGroupReader { Ok(()) } } + +#[cfg(test)] +mod tests { + use parquet::arrow::arrow_reader::RowSelector; + use parquet::file::metadata::{FileMetaData, RowGroupMetaData}; + use parquet::schema::types::{SchemaDescriptor, Type}; + + use super::*; + + fn mock_parquet_metadata_from_row_groups(num_rows_in_row_groups: Vec) -> ParquetMetaData { + let tp = Arc::new(Type::group_type_builder("test").build().unwrap()); + let schema_descr = Arc::new(SchemaDescriptor::new(tp)); + + let mut row_groups = Vec::new(); + for num_rows in &num_rows_in_row_groups { + let row_group = RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(*num_rows) + .build() + .unwrap(); + row_groups.push(row_group); + } + + let file_meta = FileMetaData::new( + 0, + num_rows_in_row_groups.iter().sum(), + None, + None, + schema_descr, + None, + ); + ParquetMetaData::new(file_meta, row_groups) + } + + #[test] + fn prune_row_groups_by_rows_from_empty() { + let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); + + let rows_in_row_groups = [ + (0, [5, 6, 7, 8, 9].into_iter()), + (2, [0, 1, 2, 3, 4].into_iter()), + ]; + + // The original output is empty. No row groups are pruned. 
+ let mut output = BTreeMap::new(); + let mut filtered_row_groups = 0; + let mut filtered_rows = 0; + + ParquetReaderBuilder::prune_row_groups_by_rows( + &parquet_meta, + rows_in_row_groups.into_iter(), + &mut output, + &mut filtered_row_groups, + &mut filtered_rows, + ); + + assert!(output.is_empty()); + assert_eq!(filtered_row_groups, 0); + assert_eq!(filtered_rows, 0); + } + + #[test] + fn prune_row_groups_by_rows_from_full() { + let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); + + let rows_in_row_groups = [ + (0, [5, 6, 7, 8, 9].into_iter()), + (2, [0, 1, 2, 3, 4].into_iter()), + ]; + + // The original output is full. + let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]); + let mut filtered_row_groups = 0; + let mut filtered_rows = 0; + + ParquetReaderBuilder::prune_row_groups_by_rows( + &parquet_meta, + rows_in_row_groups.into_iter(), + &mut output, + &mut filtered_row_groups, + &mut filtered_rows, + ); + + assert_eq!( + output, + BTreeMap::from([ + ( + 0, + Some(RowSelection::from(vec![ + RowSelector::skip(5), + RowSelector::select(5), + ])) + ), + (2, Some(RowSelection::from(vec![RowSelector::select(5)]))), + ]) + ); + assert_eq!(filtered_row_groups, 1); + assert_eq!(filtered_rows, 15); + } + + #[test] + fn prune_row_groups_by_rows_from_not_full() { + let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); + + let rows_in_row_groups = [ + (0, [5, 6, 7, 8, 9].into_iter()), + (2, [0, 1, 2, 3, 4].into_iter()), + ]; + + // The original output is not full. 
+ let mut output = BTreeMap::from([ + ( + 0, + Some(RowSelection::from(vec![ + RowSelector::select(5), + RowSelector::skip(5), + ])), + ), + ( + 1, + Some(RowSelection::from(vec![ + RowSelector::select(5), + RowSelector::skip(5), + ])), + ), + (2, Some(RowSelection::from(vec![RowSelector::select(5)]))), + ]); + let mut filtered_row_groups = 0; + let mut filtered_rows = 0; + + ParquetReaderBuilder::prune_row_groups_by_rows( + &parquet_meta, + rows_in_row_groups.into_iter(), + &mut output, + &mut filtered_row_groups, + &mut filtered_rows, + ); + + assert_eq!( + output, + BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))]) + ); + assert_eq!(filtered_row_groups, 2); + assert_eq!(filtered_rows, 10); + } + + #[test] + fn prune_row_groups_by_ranges_from_empty() { + let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); + + let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())]; + + // The original output is empty. No row groups are pruned. + let mut output = BTreeMap::new(); + let mut filtered_row_groups = 0; + let mut filtered_rows = 0; + + ParquetReaderBuilder::prune_row_groups_by_ranges( + &parquet_meta, + ranges_in_row_groups.into_iter(), + &mut output, + &mut filtered_row_groups, + &mut filtered_rows, + ); + + assert!(output.is_empty()); + assert_eq!(filtered_row_groups, 0); + assert_eq!(filtered_rows, 0); + } + + #[test] + fn prune_row_groups_by_ranges_from_full() { + let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); + + let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())]; + + // The original output is full. 
+ let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]); + let mut filtered_row_groups = 0; + let mut filtered_rows = 0; + + ParquetReaderBuilder::prune_row_groups_by_ranges( + &parquet_meta, + ranges_in_row_groups.into_iter(), + &mut output, + &mut filtered_row_groups, + &mut filtered_rows, + ); + + assert_eq!( + output, + BTreeMap::from([ + ( + 0, + Some(RowSelection::from(vec![ + RowSelector::skip(5), + RowSelector::select(5), + ])) + ), + (2, Some(RowSelection::from(vec![RowSelector::select(5)]))), + ]) + ); + assert_eq!(filtered_row_groups, 1); + assert_eq!(filtered_rows, 15); + } + + #[test] + fn prune_row_groups_by_ranges_from_not_full() { + let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); + + let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())]; + + // The original output is not full. + let mut output = BTreeMap::from([ + ( + 0, + Some(RowSelection::from(vec![ + RowSelector::select(5), + RowSelector::skip(5), + ])), + ), + ( + 1, + Some(RowSelection::from(vec![ + RowSelector::select(5), + RowSelector::skip(5), + ])), + ), + (2, Some(RowSelection::from(vec![RowSelector::select(5)]))), + ]); + let mut filtered_row_groups = 0; + let mut filtered_rows = 0; + + ParquetReaderBuilder::prune_row_groups_by_ranges( + &parquet_meta, + ranges_in_row_groups.into_iter(), + &mut output, + &mut filtered_row_groups, + &mut filtered_rows, + ); + + assert_eq!( + output, + BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))]) + ); + assert_eq!(filtered_row_groups, 2); + assert_eq!(filtered_rows, 10); + } + + #[test] + fn prune_row_groups_by_ranges_exceed_end() { + let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]); + + // The range exceeds the end of the row group. 
+ let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..10).into_iter())]; + + let mut output = BTreeMap::from([ + ( + 0, + Some(RowSelection::from(vec![ + RowSelector::select(5), + RowSelector::skip(5), + ])), + ), + ( + 1, + Some(RowSelection::from(vec![ + RowSelector::select(5), + RowSelector::skip(5), + ])), + ), + (2, Some(RowSelection::from(vec![RowSelector::select(5)]))), + ]); + let mut filtered_row_groups = 0; + let mut filtered_rows = 0; + + ParquetReaderBuilder::prune_row_groups_by_ranges( + &parquet_meta, + ranges_in_row_groups.into_iter(), + &mut output, + &mut filtered_row_groups, + &mut filtered_rows, + ); + + assert_eq!( + output, + BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))]) + ); + assert_eq!(filtered_row_groups, 2); + assert_eq!(filtered_rows, 10); + } +} diff --git a/src/mito2/src/sst/parquet/row_selection.rs b/src/mito2/src/sst/parquet/row_selection.rs index 5ab961c22349..0750d7d2fd67 100644 --- a/src/mito2/src/sst/parquet/row_selection.rs +++ b/src/mito2/src/sst/parquet/row_selection.rs @@ -16,10 +16,7 @@ use std::ops::Range; use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; -type SkipRowCount = usize; - /// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s. -/// Returns the `RowSelection` and the number of rows that were skipped. 
/// /// This function processes each range in the input and either creates a new selector or merges /// with the existing one, depending on whether the current range is contiguous with the preceding one @@ -30,23 +27,68 @@ type SkipRowCount = usize; pub(crate) fn row_selection_from_row_ranges( row_ranges: impl Iterator>, total_row_count: usize, -) -> (RowSelection, SkipRowCount) { +) -> RowSelection { let mut selectors: Vec = Vec::new(); let mut last_processed_end = 0; - let mut skip_row_count = 0; for Range { start, end } in row_ranges { + let end = end.min(total_row_count); if start > last_processed_end { add_or_merge_selector(&mut selectors, start - last_processed_end, true); - skip_row_count += start - last_processed_end; } add_or_merge_selector(&mut selectors, end - start, false); last_processed_end = end; } - skip_row_count += total_row_count.saturating_sub(last_processed_end); - (RowSelection::from(selectors), skip_row_count) + if last_processed_end < total_row_count { + add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true); + } + + RowSelection::from(selectors) +} + +/// Converts an iterator of sorted row IDs into a `RowSelection`. +/// +/// Note: the input iterator must be sorted in ascending order and +/// contain unique row IDs in the range [0, total_row_count). 
+pub(crate) fn row_selection_from_sorted_row_ids( + row_ids: impl Iterator, + total_row_count: usize, +) -> RowSelection { + let mut selectors: Vec = Vec::new(); + let mut last_processed_end = 0; + + for row_id in row_ids { + let start = row_id as usize; + let end = start + 1; + + if start > last_processed_end { + add_or_merge_selector(&mut selectors, start - last_processed_end, true); + } + + add_or_merge_selector(&mut selectors, end - start, false); + last_processed_end = end; + } + + if last_processed_end < total_row_count { + add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true); + } + + RowSelection::from(selectors) +} + +/// Intersects two `RowSelection`s. +pub(crate) fn intersect_row_selections( + a: Option, + b: Option, +) -> Option { + match (a, b) { + (Some(a), Some(b)) => Some(a.intersection(&b)), + (Some(a), None) => Some(a), + (None, Some(b)) => Some(b), + (None, None) => None, + } } /// Helper function to either add a new `RowSelector` to `selectors` or merge it with the last one @@ -74,55 +116,98 @@ mod tests { #[test] fn test_single_contiguous_range() { - let (selection, skipped) = row_selection_from_row_ranges(Some(5..10).into_iter(), 10); + let selection = row_selection_from_row_ranges(Some(5..10).into_iter(), 10); let expected = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(5)]); assert_eq!(selection, expected); - assert_eq!(skipped, 5); } #[test] fn test_non_contiguous_ranges() { let ranges = [1..3, 5..8]; - let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10); + let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10); let expected = RowSelection::from(vec![ RowSelector::skip(1), RowSelector::select(2), RowSelector::skip(2), RowSelector::select(3), + RowSelector::skip(2), ]); assert_eq!(selection, expected); - assert_eq!(skipped, 5); } #[test] fn test_empty_range() { let ranges = []; - let (selection, skipped) = 
row_selection_from_row_ranges(ranges.iter().cloned(), 10); - let expected = RowSelection::from(vec![]); + let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10); + let expected = RowSelection::from(vec![RowSelector::skip(10)]); assert_eq!(selection, expected); - assert_eq!(skipped, 10); } #[test] fn test_adjacent_ranges() { let ranges = [1..2, 2..3]; - let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10); - let expected = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(2)]); + let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10); + let expected = RowSelection::from(vec![ + RowSelector::skip(1), + RowSelector::select(2), + RowSelector::skip(7), + ]); assert_eq!(selection, expected); - assert_eq!(skipped, 8); } #[test] fn test_large_gap_between_ranges() { let ranges = [1..2, 100..101]; - let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10240); + let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10240); let expected = RowSelection::from(vec![ RowSelector::skip(1), RowSelector::select(1), RowSelector::skip(98), RowSelector::select(1), + RowSelector::skip(10139), ]); assert_eq!(selection, expected); - assert_eq!(skipped, 10238); + } + + #[test] + fn test_range_end_over_total_row_count() { + let ranges = Some(1..10); + let selection = row_selection_from_row_ranges(ranges.into_iter(), 5); + let expected = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(4)]); + assert_eq!(selection, expected); + } + + #[test] + fn test_row_ids_to_selection() { + let row_ids = [1, 3, 5, 7, 9].into_iter(); + let selection = row_selection_from_sorted_row_ids(row_ids, 10); + let expected = RowSelection::from(vec![ + RowSelector::skip(1), + RowSelector::select(1), + RowSelector::skip(1), + RowSelector::select(1), + RowSelector::skip(1), + RowSelector::select(1), + RowSelector::skip(1), + RowSelector::select(1), + RowSelector::skip(1), + 
RowSelector::select(1), + ]); + assert_eq!(selection, expected); + } + + #[test] + fn test_row_ids_to_selection_full() { + let row_ids = 0..10; + let selection = row_selection_from_sorted_row_ids(row_ids, 10); + let expected = RowSelection::from(vec![RowSelector::select(10)]); + assert_eq!(selection, expected); + } + + #[test] + fn test_row_ids_to_selection_empty() { + let selection = row_selection_from_sorted_row_ids(None.into_iter(), 10); + let expected = RowSelection::from(vec![RowSelector::skip(10)]); + assert_eq!(selection, expected); } } diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs index ad0eccabe46a..b59c72bad709 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs @@ -129,7 +129,7 @@ where impl FsPuffinReader where S: Stager, - F: PuffinFileAccessor, + F: PuffinFileAccessor + Clone, { async fn init_blob_to_stager( mut reader: PuffinFileReader, @@ -164,14 +164,16 @@ where let dir_meta: DirMetadata = serde_json::from_slice(buf.as_slice()).context(DeserializeJsonSnafu)?; - let mut size = 0; + let mut tasks = vec![]; for file_meta in dir_meta.files { - let blob_meta = puffin_metadata.blobs.get(file_meta.blob_index).context( - BlobIndexOutOfBoundSnafu { + let blob_meta = puffin_metadata + .blobs + .get(file_meta.blob_index) + .context(BlobIndexOutOfBoundSnafu { index: file_meta.blob_index, max_index: puffin_metadata.blobs.len(), - }, - )?; + })? 
+ .clone(); ensure!( blob_meta.blob_type == file_meta.key, FileKeyNotMatchSnafu { @@ -180,13 +182,24 @@ where } ); - let reader = file.blob_reader(blob_meta)?; + let reader = accessor.reader(&puffin_file_name).await?; let writer = writer_provider.writer(&file_meta.relative_path).await?; - - let compression = blob_meta.compression_codec; - size += Self::handle_decompress(reader, writer, compression).await?; + let task = common_runtime::spawn_read(async move { + let mut file = PuffinFileReader::new(reader); + let reader = file.blob_reader(&blob_meta)?; + let compression = blob_meta.compression_codec; + let size = Self::handle_decompress(reader, writer, compression).await?; + Ok(size) + }); + tasks.push(task); } + let size = futures::future::try_join_all(tasks.into_iter()) + .await + .into_iter() + .flatten() + .sum::>()?; + Ok(size) }