Skip to content

Commit

Permalink
feat(fulltext_index): integrate full-text indexer with sst writer (#4302
Browse files Browse the repository at this point in the history
)

* feat(fulltext_index): integrate full-text indexer with sst writer

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: delay building puffin writer

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: indexer test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: add abort on empty indexer

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* config: indicates default mode

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* config: introduce "auto" and "unlimited" as mem threshold

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* doc: comment about push empty string

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
  • Loading branch information
zhongzc committed Jul 7, 2024
1 parent 3f4928e commit a710676
Show file tree
Hide file tree
Showing 22 changed files with 967 additions and 130 deletions.
26 changes: 18 additions & 8 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,16 @@
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
| `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. |
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `64M` | Memory threshold for performing an external sort during index creation.<br/>Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. |
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `auto` | Memory threshold for performing an external sort during index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
Expand Down Expand Up @@ -408,11 +413,16 @@
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
| `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. |
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `64M` | Memory threshold for performing an external sort during index creation.<br/>Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. |
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `auto` | Memory threshold for performing an external sort during index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
Expand Down
36 changes: 31 additions & 5 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -413,27 +413,53 @@ staging_size = "2GB"
[region_engine.mito.inverted_index]

## Whether to create the index on flush.
## - `auto`: automatically
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"

## Whether to create the index on compaction.
## - `auto`: automatically
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"

## Whether to apply the index on query
## - `auto`: automatically
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"

## Memory threshold for performing an external sort during index creation.
## Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64M"
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

## Deprecated, use `region_engine.mito.index.aux_path` instead.
intermediate_path = ""

## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]

## Whether to create the index on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"

## Whether to create the index on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"

## Whether to apply the index on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"

## Memory threshold for index creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable
Expand Down
36 changes: 31 additions & 5 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -436,27 +436,53 @@ staging_size = "2GB"
[region_engine.mito.inverted_index]

## Whether to create the index on flush.
## - `auto`: automatically
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"

## Whether to create the index on compaction.
## - `auto`: automatically
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"

## Whether to apply the index on query
## - `auto`: automatically
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"

## Memory threshold for performing an external sort during index creation.
## Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64M"
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

## Deprecated, use `region_engine.mito.index.aux_path` instead.
intermediate_path = ""

## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]

## Whether to create the index on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"

## Whether to create the index on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"

## Whether to apply the index on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"

## Memory threshold for index creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"

[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable
Expand Down
5 changes: 4 additions & 1 deletion src/datatypes/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use datafusion_common::DFSchemaRef;
use snafu::{ensure, ResultExt};

use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
pub use crate::schema::column_schema::{ColumnSchema, Metadata, COMMENT_KEY, TIME_INDEX_KEY};
pub use crate::schema::column_schema::{
ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, COMMENT_KEY, FULLTEXT_KEY,
TIME_INDEX_KEY,
};
pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;

Expand Down
33 changes: 33 additions & 0 deletions src/datatypes/src/schema/column_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub const TIME_INDEX_KEY: &str = "greptime:time_index";
pub const COMMENT_KEY: &str = "greptime:storage:comment";
/// Key used to store default constraint in arrow field's metadata.
const DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
/// Key used to store fulltext options in column metadata.
pub const FULLTEXT_KEY: &str = "greptime:fulltext";

/// Schema of a column, used as an immutable struct.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand Down Expand Up @@ -240,6 +242,18 @@ impl ColumnSchema {
}
}
}

/// Retrieves the fulltext options for the column.
pub fn fulltext_options(&self) -> Result<Option<FulltextOptions>> {
match self.metadata.get(FULLTEXT_KEY) {
None => Ok(None),
Some(json) => {
let options =
serde_json::from_str(json).context(error::DeserializeSnafu { json })?;
Ok(Some(options))
}
}
}
}

impl TryFrom<&Field> for ColumnSchema {
Expand Down Expand Up @@ -296,6 +310,25 @@ impl TryFrom<&ColumnSchema> for Field {
}
}

/// Fulltext options for a column.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct FulltextOptions {
/// Whether the fulltext index is enabled.
pub enable: bool,
/// The fulltext analyzer to use.
pub analyzer: FulltextAnalyzer,
/// Whether the fulltext index is case-sensitive.
pub case_sensitive: bool,
}

/// Fulltext analyzer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum FulltextAnalyzer {
#[default]
English,
Chinese,
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef;

use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::config::InvertedIndexConfig;
use crate::config::{FulltextIndexConfig, InvertedIndexConfig};
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::region::options::IndexOptions;
Expand Down Expand Up @@ -153,6 +153,7 @@ impl AccessLayer {
intermediate_manager: self.intermediate_manager.clone(),
index_options: request.index_options,
inverted_index_config: request.inverted_index_config,
fulltext_index_config: request.fulltext_index_config,
}
.build()
.await;
Expand Down Expand Up @@ -204,6 +205,7 @@ pub(crate) struct SstWriteRequest {
/// Configs for index
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
}

/// Creates a fs object store with atomic write dir.
Expand Down
3 changes: 3 additions & 0 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl WriteCache {
intermediate_manager: self.intermediate_manager.clone(),
index_options: write_request.index_options,
inverted_index_config: write_request.inverted_index_config,
fulltext_index_config: write_request.fulltext_index_config,
}
.build()
.await;
Expand Down Expand Up @@ -307,6 +308,7 @@ mod tests {
cache_manager: Default::default(),
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
};

let upload_request = SstUploadRequest {
Expand Down Expand Up @@ -391,6 +393,7 @@ mod tests {
cache_manager: cache_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
};
let write_opts = WriteOptions {
row_group_size: 512,
Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ impl Compactor for DefaultCompactor {
let append_mode = compaction_region.current_version.options.append_mode;
let merge_mode = compaction_region.current_version.options.merge_mode();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
futs.push(async move {
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
Expand All @@ -299,6 +300,7 @@ impl Compactor for DefaultCompactor {
storage,
index_options,
inverted_index_config,
fulltext_index_config,
},
&write_opts,
)
Expand All @@ -314,6 +316,9 @@ impl Compactor for DefaultCompactor {
if sst_info.index_metadata.inverted_index.is_available() {
indexes.push(IndexType::InvertedIndex);
}
if sst_info.index_metadata.fulltext_index.is_available() {
indexes.push(IndexType::FulltextIndex);
}
indexes
},
index_file_size: sst_info.index_metadata.file_size,
Expand Down
Loading

0 comments on commit a710676

Please sign in to comment.