Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fulltext_index): integrate full-text indexer with parquet reader #4348

Merged
merged 12 commits into from
Jul 15, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/index/src/fulltext_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ use serde::{Deserialize, Serialize};

pub mod create;
pub mod error;
pub mod search;

#[cfg(test)]
mod tests;

/// Configuration for fulltext index.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
Expand Down
2 changes: 1 addition & 1 deletion src/index/src/fulltext_index/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
mod tantivy;

use async_trait::async_trait;
pub use tantivy::TantivyFulltextIndexCreator;
pub use tantivy::{TantivyFulltextIndexCreator, TEXT_FIELD_NAME};

use crate::fulltext_index::error::Result;

Expand Down
10 changes: 10 additions & 0 deletions src/index/src/fulltext_index/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Tantivy parser error"))]
TantivyParser {
#[snafu(source)]
error: tantivy::query::QueryParserError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Operate on a finished creator"))]
Finished {
#[snafu(implicit)]
Expand All @@ -60,6 +68,8 @@ impl ErrorExt for Error {

match self {
Tantivy { .. } => StatusCode::Internal,
TantivyParser { .. } => StatusCode::InvalidSyntax,

Io { .. } | Finished { .. } => StatusCode::Unexpected,

External { source, .. } => source.status_code(),
Expand Down
30 changes: 30 additions & 0 deletions src/index/src/fulltext_index/search.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod tantivy;

use std::collections::BTreeSet;

use async_trait::async_trait;

use crate::fulltext_index::error::Result;
pub use crate::fulltext_index::search::tantivy::TantivyFulltextIndexSearcher;

pub type RowId = u32;

/// `FulltextIndexSearcher` is a trait for searching fulltext index.
#[async_trait]
pub trait FulltextIndexSearcher {
async fn search(&self, query: &str) -> Result<BTreeSet<RowId>>;
}
85 changes: 85 additions & 0 deletions src/index/src/fulltext_index/search/tantivy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;
use std::path::Path;
use std::time::Instant;

use async_trait::async_trait;
use common_telemetry::debug;
use snafu::ResultExt;
use tantivy::collector::DocSetCollector;
use tantivy::query::QueryParser;
use tantivy::schema::Field;
use tantivy::{Index, IndexReader, ReloadPolicy};

use crate::fulltext_index::create::TEXT_FIELD_NAME;
use crate::fulltext_index::error::{Result, TantivyParserSnafu, TantivySnafu};
use crate::fulltext_index::search::{FulltextIndexSearcher, RowId};

/// `TantivyFulltextIndexSearcher` is a searcher using Tantivy.
pub struct TantivyFulltextIndexSearcher {
/// Tanitvy index.
index: Index,
/// Tanitvy index reader.
reader: IndexReader,
/// The default field used to build `QueryParser`
default_field: Field,
}

impl TantivyFulltextIndexSearcher {
/// Creates a new `TantivyFulltextIndexSearcher`.
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
let now = Instant::now();

let index = Index::open_in_dir(path.as_ref()).context(TantivySnafu)?;
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.num_warming_threads(0)
.try_into()
.context(TantivySnafu)?;
let default_field = index
.schema()
.get_field(TEXT_FIELD_NAME)
.context(TantivySnafu)?;

debug!(
"Opened tantivy index on {:?} in {:?}",
path.as_ref(),
now.elapsed()
);

Ok(Self {
index,
reader,
default_field,
})
}
}

#[async_trait]
impl FulltextIndexSearcher for TantivyFulltextIndexSearcher {
async fn search(&self, query: &str) -> Result<BTreeSet<RowId>> {
let searcher = self.reader.searcher();
let query_parser = QueryParser::for_index(&self.index, vec![self.default_field]);
let query = query_parser
.parse_query(query)
.context(TantivyParserSnafu)?;
let docs = searcher
.search(&query, &DocSetCollector)
.context(TantivySnafu)?;
Ok(docs.into_iter().map(|d| d.doc_id).collect())
}
}
167 changes: 167 additions & 0 deletions src/index/src/fulltext_index/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;

use common_test_util::temp_dir::{create_temp_dir, TempDir};

use crate::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCreator};
use crate::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use crate::fulltext_index::{Analyzer, Config};

async fn create_index(prefix: &str, texts: Vec<&str>, config: Config) -> TempDir {
let tempdir = create_temp_dir(prefix);

let mut creator = TantivyFulltextIndexCreator::new(tempdir.path(), config, 1024 * 1024)
.await
.unwrap();

for text in texts {
creator.push_text(text).await.unwrap();
}

creator.finish().await.unwrap();
tempdir
}

async fn test_search(
prefix: &str,
config: Config,
texts: Vec<&str>,
query: &str,
expected: impl IntoIterator<Item = RowId>,
) {
let index_path = create_index(prefix, texts, config).await;

let searcher = TantivyFulltextIndexSearcher::new(index_path.path()).unwrap();
let results = searcher.search(query).await.unwrap();

let expected = expected.into_iter().collect::<BTreeSet<_>>();
assert_eq!(results, expected);
}

#[tokio::test]
async fn test_simple_term() {
test_search(
"test_simple_term_",
Config::default(),
vec![
"This is a sample text containing Barack Obama",
"Another document mentioning Barack",
],
"Barack Obama",
[0, 1],
)
.await;
}

#[tokio::test]
async fn test_negative_term() {
test_search(
"test_negative_term_",
Config::default(),
vec!["apple is a fruit", "I like apple", "fruit is healthy"],
"apple -fruit",
[1],
)
.await;
}

#[tokio::test]
async fn test_must_term() {
test_search(
"test_must_term_",
Config::default(),
vec![
"apple is tasty",
"I love apples and fruits",
"apple and fruit are good",
],
"+apple +fruit",
[2],
)
.await;
}

#[tokio::test]
async fn test_boolean_operators() {
test_search(
"test_boolean_operators_",
Config::default(),
vec!["a b c", "a b", "b c", "c"],
"a AND b OR c",
[0, 1, 2, 3],
)
.await;
}

#[tokio::test]
async fn test_phrase_term() {
test_search(
"test_phrase_term_",
Config::default(),
vec![
"This is a sample text containing Barack Obama",
"Another document mentioning Barack",
],
"\"Barack Obama\"",
[0],
)
.await;
}

#[tokio::test]
async fn test_config_english_analyzer_case_insensitive() {
test_search(
"test_config_english_analyzer_case_insensitive_",
Config {
case_sensitive: false,
..Config::default()
},
vec!["Banana is a fruit", "I like apple", "Fruit is healthy"],
"banana",
[0],
)
.await;
}

#[tokio::test]
async fn test_config_english_analyzer_case_sensitive() {
test_search(
"test_config_english_analyzer_case_sensitive_",
Config {
case_sensitive: true,
..Config::default()
},
vec!["Banana is a fruit", "I like apple", "Fruit is healthy"],
"banana",
[],
)
.await;
}

#[tokio::test]
async fn test_config_chinese_analyzer() {
test_search(
"test_config_chinese_analyzer_",
Config {
analyzer: Analyzer::Chinese,
..Default::default()
},
vec!["苹果是一种水果", "我喜欢苹果", "水果很健康"],
"苹果",
[0, 1],
)
.await;
}
2 changes: 2 additions & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ tokio-util.workspace = true
uuid.workspace = true

[dev-dependencies]
common-function.workspace = true
common-procedure-test.workspace = true
common-test-util.workspace = true
criterion = "0.4"
Expand All @@ -82,6 +83,7 @@ object-store = { workspace = true, features = ["services-memory"] }
rskafka.workspace = true
rstest.workspace = true
rstest_reuse.workspace = true
session.workspace = true
toml.workspace = true

[[bench]]
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ impl EngineInner {
)
.with_parallelism(scan_parallelism)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_start_time(query_start);

Ok(scan_region)
Expand Down
Loading