diff --git a/config/config.md b/config/config.md index 132fc7aff8dc..e9f139d9bc88 100644 --- a/config/config.md +++ b/config/config.md @@ -124,11 +124,16 @@ | `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.

This path contains two subdirectories:
- `__intm`: for storing intermediate files used during creating index.
- `staging`: for storing staging files used during searching index. | | `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. | | `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. | -| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically
- `disable`: never | -| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically
- `disable`: never | -| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically
- `disable`: never | -| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `64M` | Memory threshold for performing an external sort during index creation.
Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. | +| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `auto` | Memory threshold for performing an external sort during index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | | `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. | +| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. | +| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | | `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. | @@ -408,11 +413,16 @@ | `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for
creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.
The default name for this directory is `index_intermediate` for backward compatibility.

This path contains two subdirectories:
- `__intm`: for storing intermediate files used during creating index.
- `staging`: for storing staging files used during searching index. | | `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. | | `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. | -| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically
- `disable`: never | -| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically
- `disable`: never | -| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically
- `disable`: never | -| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `64M` | Memory threshold for performing an external sort during index creation.
Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. | +| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `auto` | Memory threshold for performing an external sort during index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | | `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. | +| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. | +| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never | +| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold | | `region_engine.mito.memtable` | -- | -- | -- | | `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) | | `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index c12606110f6e..1f06271000e1 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -413,27 +413,53 @@ staging_size = "2GB" [region_engine.mito.inverted_index] ## Whether to create the index on flush. -## - `auto`: automatically +## - `auto`: automatically (default) ## - `disable`: never create_on_flush = "auto" ## Whether to create the index on compaction. -## - `auto`: automatically +## - `auto`: automatically (default) ## - `disable`: never create_on_compaction = "auto" ## Whether to apply the index on query -## - `auto`: automatically +## - `auto`: automatically (default) ## - `disable`: never apply_on_query = "auto" ## Memory threshold for performing an external sort during index creation. -## Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. -mem_threshold_on_create = "64M" +## - `auto`: automatically determine the threshold based on the system memory size (default) +## - `unlimited`: no memory limit +## - `[size]` e.g. `64MB`: fixed memory threshold +mem_threshold_on_create = "auto" ## Deprecated, use `region_engine.mito.index.aux_path` instead. intermediate_path = "" +## The options for full-text index in Mito engine. +[region_engine.mito.fulltext_index] + +## Whether to create the index on flush. +## - `auto`: automatically (default) +## - `disable`: never +create_on_flush = "auto" + +## Whether to create the index on compaction. +## - `auto`: automatically (default) +## - `disable`: never +create_on_compaction = "auto" + +## Whether to apply the index on query +## - `auto`: automatically (default) +## - `disable`: never +apply_on_query = "auto" + +## Memory threshold for index creation. +## - `auto`: automatically determine the threshold based on the system memory size (default) +## - `unlimited`: no memory limit +## - `[size]` e.g. `64MB`: fixed memory threshold +mem_threshold_on_create = "auto" + [region_engine.mito.memtable] ## Memtable type. ## - `time_series`: time-series memtable diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 32c1840eeaff..2e8831951125 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -436,27 +436,53 @@ staging_size = "2GB" [region_engine.mito.inverted_index] ## Whether to create the index on flush. -## - `auto`: automatically +## - `auto`: automatically (default) ## - `disable`: never create_on_flush = "auto" ## Whether to create the index on compaction. -## - `auto`: automatically +## - `auto`: automatically (default) ## - `disable`: never create_on_compaction = "auto" ## Whether to apply the index on query -## - `auto`: automatically +## - `auto`: automatically (default) ## - `disable`: never apply_on_query = "auto" ## Memory threshold for performing an external sort during index creation. -## Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. -mem_threshold_on_create = "64M" +## - `auto`: automatically determine the threshold based on the system memory size (default) +## - `unlimited`: no memory limit +## - `[size]` e.g. `64MB`: fixed memory threshold +mem_threshold_on_create = "auto" ## Deprecated, use `region_engine.mito.index.aux_path` instead. intermediate_path = "" +## The options for full-text index in Mito engine. +[region_engine.mito.fulltext_index] + +## Whether to create the index on flush. +## - `auto`: automatically (default) +## - `disable`: never +create_on_flush = "auto" + +## Whether to create the index on compaction. +## - `auto`: automatically (default) +## - `disable`: never +create_on_compaction = "auto" + +## Whether to apply the index on query +## - `auto`: automatically (default) +## - `disable`: never +apply_on_query = "auto" + +## Memory threshold for index creation. +## - `auto`: automatically determine the threshold based on the system memory size (default) +## - `unlimited`: no memory limit +## - `[size]` e.g. `64MB`: fixed memory threshold +mem_threshold_on_create = "auto" + [region_engine.mito.memtable] ## Memtable type. ## - `time_series`: time-series memtable diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs index 2e58b87cd817..3bb35a595f4a 100644 --- a/src/datatypes/src/schema.rs +++ b/src/datatypes/src/schema.rs @@ -25,7 +25,10 @@ use datafusion_common::DFSchemaRef; use snafu::{ensure, ResultExt}; use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result}; -pub use crate::schema::column_schema::{ColumnSchema, Metadata, COMMENT_KEY, TIME_INDEX_KEY}; +pub use crate::schema::column_schema::{ + ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, COMMENT_KEY, FULLTEXT_KEY, + TIME_INDEX_KEY, +}; pub use crate::schema::constraint::ColumnDefaultConstraint; pub use crate::schema::raw::RawSchema; diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index b263fed9a743..60b785b706d9 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -32,6 +32,8 @@ pub const TIME_INDEX_KEY: &str = "greptime:time_index"; pub const COMMENT_KEY: &str = "greptime:storage:comment"; /// Key used to store default constraint in arrow field's metadata. const DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint"; +/// Key used to store fulltext options in column metadata. +pub const FULLTEXT_KEY: &str = "greptime:fulltext"; /// Schema of a column, used as an immutable struct. #[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -240,6 +242,18 @@ impl ColumnSchema { } } } + + /// Retrieves the fulltext options for the column. + pub fn fulltext_options(&self) -> Result> { + match self.metadata.get(FULLTEXT_KEY) { + None => Ok(None), + Some(json) => { + let options = + serde_json::from_str(json).context(error::DeserializeSnafu { json })?; + Ok(Some(options)) + } + } + } } impl TryFrom<&Field> for ColumnSchema { @@ -296,6 +310,25 @@ impl TryFrom<&ColumnSchema> for Field { } } +/// Fulltext options for a column. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +pub struct FulltextOptions { + /// Whether the fulltext index is enabled. + pub enable: bool, + /// The fulltext analyzer to use. + pub analyzer: FulltextAnalyzer, + /// Whether the fulltext index is case-sensitive. + pub case_sensitive: bool, +} + +/// Fulltext analyzer. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +pub enum FulltextAnalyzer { + #[default] + English, + Chinese, +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index c77d1fd05e4e..47f5af9241e6 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef; use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; -use crate::config::InvertedIndexConfig; +use crate::config::{FulltextIndexConfig, InvertedIndexConfig}; use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result}; use crate::read::Source; use crate::region::options::IndexOptions; @@ -153,6 +153,7 @@ impl AccessLayer { intermediate_manager: self.intermediate_manager.clone(), index_options: request.index_options, inverted_index_config: request.inverted_index_config, + fulltext_index_config: request.fulltext_index_config, } .build() .await; @@ -204,6 +205,7 @@ pub(crate) struct SstWriteRequest { /// Configs for index pub(crate) index_options: IndexOptions, pub(crate) inverted_index_config: InvertedIndexConfig, + pub(crate) fulltext_index_config: FulltextIndexConfig, } /// Creates a fs object store with atomic write dir. diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index c47846fd4d55..ff544662f905 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -130,6 +130,7 @@ impl WriteCache { intermediate_manager: self.intermediate_manager.clone(), index_options: write_request.index_options, inverted_index_config: write_request.inverted_index_config, + fulltext_index_config: write_request.fulltext_index_config, } .build() .await; @@ -307,6 +308,7 @@ mod tests { cache_manager: Default::default(), index_options: IndexOptions::default(), inverted_index_config: Default::default(), + fulltext_index_config: Default::default(), }; let upload_request = SstUploadRequest { @@ -391,6 +393,7 @@ mod tests { cache_manager: cache_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: Default::default(), + fulltext_index_config: Default::default(), }; let write_opts = WriteOptions { row_group_size: 512, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index b6821006efd4..cd49af7b1172 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -275,6 +275,7 @@ impl Compactor for DefaultCompactor { let append_mode = compaction_region.current_version.options.append_mode; let merge_mode = compaction_region.current_version.options.merge_mode(); let inverted_index_config = compaction_region.engine_config.inverted_index.clone(); + let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone(); futs.push(async move { let reader = CompactionSstReaderBuilder { metadata: region_metadata.clone(), @@ -299,6 +300,7 @@ impl Compactor for DefaultCompactor { storage, index_options, inverted_index_config, + fulltext_index_config, }, &write_opts, ) @@ -314,6 +316,9 @@ impl Compactor for DefaultCompactor { if sst_info.index_metadata.inverted_index.is_available() { indexes.push(IndexType::InvertedIndex); } + if sst_info.index_metadata.fulltext_index.is_available() { + indexes.push(IndexType::FulltextIndex); + } indexes }, index_file_size: sst_info.index_metadata.file_size, diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 334d2de752a3..ca44342a6d4c 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -22,7 +22,7 @@ use common_base::readable_size::ReadableSize; use common_telemetry::warn; use object_store::util::join_dir; use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, NoneAsEmptyString}; +use serde_with::serde_as; use crate::error::Result; use crate::memtable::MemtableConfig; @@ -41,6 +41,8 @@ const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8; const SST_META_CACHE_SIZE_FACTOR: u64 = 32; /// Use `1/MEM_CACHE_SIZE_FACTOR` of OS memory size as mem cache size in default mode const MEM_CACHE_SIZE_FACTOR: u64 = 16; +/// Use `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of OS memory size as mem threshold for creating index +const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16; /// Configuration for [MitoEngine](crate::engine::MitoEngine). #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] @@ -109,6 +111,8 @@ pub struct MitoConfig { pub index: IndexConfig, /// Inverted index configs. pub inverted_index: InvertedIndexConfig, + /// Full-text index configs. + pub fulltext_index: FulltextIndexConfig, /// Memtable config pub memtable: MemtableConfig, @@ -139,6 +143,7 @@ impl Default for MitoConfig { allow_stale_entries: false, index: IndexConfig::default(), inverted_index: InvertedIndexConfig::default(), + fulltext_index: FulltextIndexConfig::default(), memtable: MemtableConfig::default(), }; @@ -337,6 +342,20 @@ impl Mode { } } +/// Memory threshold for performing certain actions. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum MemoryThreshold { + /// Automatically determine the threshold based on internal criteria. + #[default] + Auto, + /// Unlimited memory. + Unlimited, + /// Fixed memory threshold. + #[serde(untagged)] + Size(ReadableSize), +} + /// Configuration options for the inverted index. #[serde_as] #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] @@ -350,9 +369,7 @@ pub struct InvertedIndexConfig { pub apply_on_query: Mode, /// Memory threshold for performing an external sort during index creation. - /// `None` means all sorting will happen in memory. - #[serde_as(as = "NoneAsEmptyString")] - pub mem_threshold_on_create: Option, + pub mem_threshold_on_create: MemoryThreshold, /// Whether to compress the index data. pub compress: bool, @@ -373,8 +390,8 @@ impl Default for InvertedIndexConfig { create_on_flush: Mode::Auto, create_on_compaction: Mode::Auto, apply_on_query: Mode::Auto, + mem_threshold_on_create: MemoryThreshold::Auto, compress: true, - mem_threshold_on_create: Some(ReadableSize::mb(64)), write_buffer_size: ReadableSize::mb(8), intermediate_path: String::new(), @@ -382,6 +399,67 @@ impl Default for InvertedIndexConfig { } } +impl InvertedIndexConfig { + pub fn mem_threshold_on_create(&self) -> Option { + match self.mem_threshold_on_create { + MemoryThreshold::Auto => { + if let Some(sys_memory) = common_config::utils::get_sys_total_memory() { + Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize) + } else { + Some(ReadableSize::mb(64).as_bytes() as usize) + } + } + MemoryThreshold::Unlimited => None, + MemoryThreshold::Size(size) => Some(size.as_bytes() as usize), + } + } +} + +/// Configuration options for the full-text index. +#[serde_as] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(default)] +pub struct FulltextIndexConfig { + /// Whether to create the index on flush: automatically or never. + pub create_on_flush: Mode, + /// Whether to create the index on compaction: automatically or never. + pub create_on_compaction: Mode, + /// Whether to apply the index on query: automatically or never. + pub apply_on_query: Mode, + /// Memory threshold for creating the index. + pub mem_threshold_on_create: MemoryThreshold, + /// Whether to compress the index data. + pub compress: bool, +} + +impl Default for FulltextIndexConfig { + fn default() -> Self { + Self { + create_on_flush: Mode::Auto, + create_on_compaction: Mode::Auto, + apply_on_query: Mode::Auto, + mem_threshold_on_create: MemoryThreshold::Auto, + compress: true, + } + } +} + +impl FulltextIndexConfig { + pub fn mem_threshold_on_create(&self) -> usize { + match self.mem_threshold_on_create { + MemoryThreshold::Auto => { + if let Some(sys_memory) = common_config::utils::get_sys_total_memory() { + (sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _ + } else { + ReadableSize::mb(64).as_bytes() as _ + } + } + MemoryThreshold::Unlimited => usize::MAX, + MemoryThreshold::Size(size) => size.as_bytes() as _, + } + } +} + /// Divide cpu num by a non-zero `divisor` and returns at least 1. fn divide_num_cpus(divisor: usize) -> usize { debug_assert!(divisor > 0); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 35da912b7afb..d6fe9f226ae0 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -783,6 +783,44 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to retrieve fulltext options from column metadata"))] + FulltextOptions { + #[snafu(implicit)] + location: Location, + source: datatypes::error::Error, + column_name: String, + }, + + #[snafu(display("Failed to create fulltext index creator"))] + CreateFulltextCreator { + source: index::fulltext_index::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to cast vector of {from} to {to}"))] + CastVector { + #[snafu(implicit)] + location: Location, + from: ConcreteDataType, + to: ConcreteDataType, + source: datatypes::error::Error, + }, + + #[snafu(display("Failed to push text to fulltext index"))] + FulltextPushText { + source: index::fulltext_index::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to finalize fulltext index creator"))] + FulltextFinish { + source: index::fulltext_index::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -904,6 +942,11 @@ impl ErrorExt for Error { BuildTimeRangeFilter { .. } => StatusCode::Unexpected, UnsupportedOperation { .. } => StatusCode::Unsupported, RemoteCompaction { .. } => StatusCode::Unexpected, + + FulltextOptions { source, .. } => source.status_code(), + CreateFulltextCreator { source, .. } => source.status_code(), + CastVector { source, .. } => source.status_code(), + FulltextPushText { source, .. } | FulltextFinish { source, .. } => source.status_code(), } } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 1bd7c078b05b..0bfd01dd4fd9 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -332,6 +332,7 @@ impl RegionFlushTask { storage: version.options.storage.clone(), index_options: self.index_options.clone(), inverted_index_config: self.engine_config.inverted_index.clone(), + fulltext_index_config: self.engine_config.fulltext_index.clone(), }; let Some(sst_info) = self .access_layer @@ -354,6 +355,9 @@ impl RegionFlushTask { if sst_info.index_metadata.inverted_index.is_available() { indexes.push(IndexType::InvertedIndex); } + if sst_info.index_metadata.fulltext_index.is_available() { + indexes.push(IndexType::FulltextIndex); + } indexes }, index_file_size: sst_info.index_metadata.file_size, diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index a6f5c9d9f886..61e6a5537691 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -128,12 +128,17 @@ pub struct FileMeta { pub enum IndexType { /// Inverted index. InvertedIndex, + /// Full-text index. + FulltextIndex, } impl FileMeta { pub fn inverted_index_available(&self) -> bool { self.available_indexes.contains(&IndexType::InvertedIndex) } + pub fn fulltext_index_available(&self) -> bool { + self.available_indexes.contains(&IndexType::FulltextIndex) + } } /// Handle to a SST file. diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 50d40f08942e..5a55cbf7b626 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub(crate) mod fulltext_index; mod indexer; pub(crate) mod intermediate; pub(crate) mod inverted_index; @@ -22,18 +23,18 @@ mod store; use std::num::NonZeroUsize; use common_telemetry::{debug, warn}; -use puffin::puffin_manager::PuffinManager; -use puffin_manager::{SstPuffinManager, SstPuffinWriter}; +use puffin_manager::SstPuffinManager; use statistics::{ByteCount, RowCount}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::OperationType; -use crate::config::InvertedIndexConfig; +use crate::config::{FulltextIndexConfig, InvertedIndexConfig}; use crate::metrics::INDEX_CREATE_MEMORY_USAGE; use crate::read::Batch; use crate::region::options::IndexOptions; use crate::sst::file::FileId; +use crate::sst::index::fulltext_index::creator::SstIndexCreator as FulltextIndexer; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::inverted_index::creator::SstIndexCreator as InvertedIndexer; @@ -44,6 +45,8 @@ pub struct IndexOutput { pub file_size: u64, /// Inverted index output. pub inverted_index: InvertedIndexOutput, + /// Fulltext index output. + pub fulltext_index: FulltextIndexOutput, } /// Output of the inverted index creation. @@ -57,21 +60,40 @@ pub struct InvertedIndexOutput { pub columns: Vec, } +/// Output of the fulltext index creation. +#[derive(Debug, Clone, Default)] +pub struct FulltextIndexOutput { + /// Size of the index. + pub index_size: ByteCount, + /// Number of rows in the index. + pub row_count: RowCount, + /// Available columns in the index. + pub columns: Vec, +} + impl InvertedIndexOutput { pub fn is_available(&self) -> bool { self.index_size > 0 } } +impl FulltextIndexOutput { + pub fn is_available(&self) -> bool { + self.index_size > 0 + } +} + /// The index creator that hides the error handling details. #[derive(Default)] pub struct Indexer { file_id: FileId, + file_path: String, region_id: RegionId, last_memory_usage: usize, + puffin_manager: Option, inverted_indexer: Option, - puffin_writer: Option, + fulltext_indexer: Option, } impl Indexer { @@ -104,6 +126,10 @@ impl Indexer { self.inverted_indexer .as_ref() .map_or(0, |creator| creator.memory_usage()) + + self + .fulltext_indexer + .as_ref() + .map_or(0, |creator| creator.memory_usage()) } } @@ -117,6 +143,7 @@ pub(crate) struct IndexerBuilder<'a> { pub(crate) intermediate_manager: IntermediateManager, pub(crate) index_options: IndexOptions, pub(crate) inverted_index_config: InvertedIndexConfig, + pub(crate) fulltext_index_config: FulltextIndexConfig, } impl<'a> IndexerBuilder<'a> { @@ -124,6 +151,7 @@ impl<'a> IndexerBuilder<'a> { pub(crate) async fn build(self) -> Indexer { let mut indexer = Indexer { file_id: self.file_id, + file_path: self.file_path.clone(), region_id: self.metadata.region_id, last_memory_usage: 0, @@ -131,17 +159,13 @@ impl<'a> IndexerBuilder<'a> { }; indexer.inverted_indexer = self.build_inverted_indexer(); - if indexer.inverted_indexer.is_none() { - indexer.abort().await; - return Indexer::default(); - } - - indexer.puffin_writer = self.build_puffin_writer().await; - if indexer.puffin_writer.is_none() { + indexer.fulltext_indexer = self.build_fulltext_indexer().await; + if indexer.inverted_indexer.is_none() && indexer.fulltext_indexer.is_none() { indexer.abort().await; return Indexer::default(); } + indexer.puffin_manager = Some(self.puffin_manager); indexer } @@ -190,16 +214,11 @@ impl<'a> IndexerBuilder<'a> { segment_row_count = row_group_size; } - let mem_threshold = self - .inverted_index_config - .mem_threshold_on_create - .map(|t| t.as_bytes() as usize); - let indexer = InvertedIndexer::new( self.file_id, self.metadata, self.intermediate_manager.clone(), - mem_threshold, + self.inverted_index_config.mem_threshold_on_create(), segment_row_count, self.inverted_index_config.compress, &self.index_options.inverted_index.ignore_column_ids, @@ -208,20 +227,54 @@ impl<'a> IndexerBuilder<'a> { Some(indexer) } - async fn build_puffin_writer(&self) -> Option { - let err = match self.puffin_manager.writer(&self.file_path).await { - Ok(writer) => return Some(writer), + async fn build_fulltext_indexer(&self) -> Option { + let create = match self.op_type { + OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(), + OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(), + }; + + if !create { + debug!( + "Skip creating full-text index due to config, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + return None; + } + + let mem_limit = self.fulltext_index_config.mem_threshold_on_create(); + let creator = FulltextIndexer::new( + &self.metadata.region_id, + &self.file_id, + &self.intermediate_manager, + self.metadata, + self.fulltext_index_config.compress, + mem_limit, + ) + .await; + + let err = match creator { + Ok(creator) => { + if creator.is_empty() { + debug!( + "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}", + self.metadata.region_id, self.file_id, + ); + return None; + } else { + return Some(creator); + } + } Err(err) => err, }; if cfg!(any(test, feature = "test")) { panic!( - "Failed to create puffin writer, region_id: {}, file_id: {}, err: {}", + "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {}", self.metadata.region_id, self.file_id, err ); } else { warn!( - err; "Failed to create puffin writer, region_id: {}, file_id: {}", + err; "Failed to create full-text indexer, region_id: {}, file_id: {}", self.metadata.region_id, self.file_id, ); } @@ -236,21 +289,35 @@ mod tests { use api::v1::SemanticType; use datatypes::data_type::ConcreteDataType; - use datatypes::schema::ColumnSchema; + use datatypes::schema::{ColumnSchema, FulltextOptions, FULLTEXT_KEY}; use object_store::services::Memory; use object_store::ObjectStore; use puffin_manager::PuffinManagerFactory; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder}; use super::*; - use crate::config::Mode; + use crate::config::{FulltextIndexConfig, Mode}; + + struct MetaConfig { + with_tag: bool, + with_fulltext: bool, + } - fn mock_region_metadata() -> RegionMetadataRef { + fn mock_region_metadata( + MetaConfig { + with_tag, + with_fulltext, + }: MetaConfig, + ) -> RegionMetadataRef { let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); builder .push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false), - semantic_type: SemanticType::Tag, + semantic_type: if with_tag { + SemanticType::Tag + } else { + SemanticType::Field + }, column_id: 1, }) .push_column_metadata(ColumnMetadata { @@ -266,34 +333,33 @@ mod tests { ), semantic_type: SemanticType::Timestamp, column_id: 3, - }) - .primary_key(vec![1]); + }); - Arc::new(builder.build().unwrap()) - } + if with_tag { + builder.primary_key(vec![1]); + } - fn no_tag_region_metadata() -> RegionMetadataRef { - let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2)); - builder - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false), - semantic_type: SemanticType::Field, - column_id: 1, + if with_fulltext { + let opts = serde_json::to_string(&FulltextOptions { + enable: true, + ..Default::default() }) - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false), + .unwrap(); + + let mut column_schema = + ColumnSchema::new("text", ConcreteDataType::string_datatype(), true); + column_schema + .mut_metadata() + .insert(FULLTEXT_KEY.to_string(), opts); + + let column = ColumnMetadata { + column_schema, semantic_type: SemanticType::Field, - column_id: 2, - }) - .push_column_metadata(ColumnMetadata { - column_schema: ColumnSchema::new( - "c", - ConcreteDataType::timestamp_millisecond_datatype(), - false, - ), - semantic_type: SemanticType::Timestamp, - column_id: 3, - }); + column_id: 4, + }; + + builder.push_column_metadata(column); + } Arc::new(builder.build().unwrap()) } @@ -302,102 +368,164 @@ mod tests { ObjectStore::new(Memory::default()).unwrap().finish() } - fn mock_intm_mgr() -> IntermediateManager { - IntermediateManager::new(mock_object_store()) + async fn mock_intm_mgr(path: impl AsRef) -> IntermediateManager { + IntermediateManager::init_fs(path).await.unwrap() } #[tokio::test] async fn test_build_indexer_basic() { - let (_d, factory) = + let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await; - let store = mock_object_store(); - let puffin_manager = factory.build(store); - let metadata = mock_region_metadata(); + let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await; + + let metadata = mock_region_metadata(MetaConfig { + with_tag: true, + with_fulltext: true, + }); let indexer = IndexerBuilder { op_type: OperationType::Flush, file_id: FileId::random(), file_path: "test".to_string(), metadata: &metadata, row_group_size: 1024, - puffin_manager, - intermediate_manager: mock_intm_mgr(), + puffin_manager: factory.build(mock_object_store()), + intermediate_manager: intm_manager, index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), + fulltext_index_config: FulltextIndexConfig::default(), } .build() .await; assert!(indexer.inverted_indexer.is_some()); + assert!(indexer.fulltext_indexer.is_some()); } #[tokio::test] async fn test_build_indexer_disable_create() { - let (_d, factory) = + let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await; - let store = mock_object_store(); - let puffin_manager = factory.build(store); - let metadata = mock_region_metadata(); + let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await; + + let metadata = mock_region_metadata(MetaConfig { + with_tag: true, + with_fulltext: true, + }); let indexer = IndexerBuilder { op_type: OperationType::Flush, file_id: FileId::random(), file_path: "test".to_string(), metadata: &metadata, row_group_size: 1024, - puffin_manager, - intermediate_manager: mock_intm_mgr(), + puffin_manager: factory.build(mock_object_store()), + intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig { create_on_flush: Mode::Disable, ..Default::default() }, + fulltext_index_config: FulltextIndexConfig::default(), } .build() .await; assert!(indexer.inverted_indexer.is_none()); + assert!(indexer.fulltext_indexer.is_some()); + + let indexer = IndexerBuilder { + op_type: OperationType::Compact, + file_id: FileId::random(), + file_path: "test".to_string(), + metadata: &metadata, + row_group_size: 1024, + puffin_manager: factory.build(mock_object_store()), + intermediate_manager: intm_manager, + index_options: IndexOptions::default(), + inverted_index_config: InvertedIndexConfig::default(), + fulltext_index_config: FulltextIndexConfig { + create_on_compaction: Mode::Disable, + ..Default::default() + }, + } + .build() + .await; + + assert!(indexer.inverted_indexer.is_some()); + assert!(indexer.fulltext_indexer.is_none()); } #[tokio::test] - async fn test_build_indexer_no_tag() { - let (_d, factory) = - PuffinManagerFactory::new_for_test_async("test_build_indexer_no_tag_").await; - let store = mock_object_store(); - let puffin_manager = factory.build(store); - let metadata = no_tag_region_metadata(); + async fn test_build_indexer_no_required() { + let (dir, factory) = + PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await; + let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await; + + let metadata = mock_region_metadata(MetaConfig { + with_tag: false, + with_fulltext: true, + }); let indexer = IndexerBuilder { op_type: OperationType::Flush, file_id: FileId::random(), file_path: "test".to_string(), metadata: &metadata, row_group_size: 1024, - puffin_manager, - intermediate_manager: mock_intm_mgr(), + puffin_manager: factory.build(mock_object_store()), + intermediate_manager: intm_manager.clone(), index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), + fulltext_index_config: FulltextIndexConfig::default(), } .build() .await; assert!(indexer.inverted_indexer.is_none()); + assert!(indexer.fulltext_indexer.is_some()); + + let metadata = mock_region_metadata(MetaConfig { + with_tag: true, + with_fulltext: false, + }); + let indexer = IndexerBuilder { + op_type: OperationType::Flush, + file_id: FileId::random(), + file_path: "test".to_string(), + metadata: &metadata, + row_group_size: 1024, + puffin_manager: factory.build(mock_object_store()), + intermediate_manager: intm_manager, + index_options: IndexOptions::default(), + inverted_index_config: InvertedIndexConfig::default(), + fulltext_index_config: FulltextIndexConfig::default(), + } + .build() + .await; + + assert!(indexer.inverted_indexer.is_some()); + assert!(indexer.fulltext_indexer.is_none()); } #[tokio::test] async fn test_build_indexer_zero_row_group() { - let (_d, factory) = + let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await; - let store = mock_object_store(); - let puffin_manager = factory.build(store); - let metadata = mock_region_metadata(); + let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await; + + let metadata = mock_region_metadata(MetaConfig { + with_tag: true, + with_fulltext: true, + }); let indexer = IndexerBuilder { op_type: OperationType::Flush, file_id: FileId::random(), file_path: "test".to_string(), metadata: &metadata, row_group_size: 0, - puffin_manager, - intermediate_manager: mock_intm_mgr(), + puffin_manager: factory.build(mock_object_store()), + intermediate_manager: intm_manager, index_options: IndexOptions::default(), inverted_index_config: InvertedIndexConfig::default(), + fulltext_index_config: FulltextIndexConfig::default(), } .build() .await; diff --git a/src/mito2/src/sst/index/fulltext_index.rs b/src/mito2/src/sst/index/fulltext_index.rs new file mode 100644 index 000000000000..ed4db4ae87ba --- /dev/null +++ b/src/mito2/src/sst/index/fulltext_index.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub(crate) mod creator; + +const INDEX_BLOB_TYPE: &str = "greptime-fulltext-index-v1"; diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs new file mode 100644 index 000000000000..aea9e1b833e8 --- /dev/null +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -0,0 +1,296 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::path::PathBuf; + +use common_telemetry::warn; +use datatypes::schema::FulltextAnalyzer; +use index::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCreator}; +use index::fulltext_index::{Analyzer, Config}; +use puffin::blob_metadata::CompressionCodec; +use puffin::puffin_manager::{PuffinWriter, PutOptions}; +use snafu::{ensure, ResultExt}; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::{ColumnId, ConcreteDataType, RegionId}; + +use crate::error::{ + CastVectorSnafu, CreateFulltextCreatorSnafu, FieldTypeMismatchSnafu, FulltextFinishSnafu, + FulltextOptionsSnafu, FulltextPushTextSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, + Result, +}; +use crate::read::Batch; +use crate::sst::file::FileId; +use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE; +use crate::sst::index::intermediate::IntermediateManager; +use crate::sst::index::puffin_manager::SstPuffinWriter; +use crate::sst::index::statistics::{ByteCount, RowCount, Statistics}; + +/// `SstIndexCreator` is responsible for creating fulltext indexes for SST files. +pub struct SstIndexCreator { + /// Creators for each column. + creators: HashMap, + /// Whether the index creation was aborted. + aborted: bool, + /// Statistics of index creation. + stats: Statistics, +} + +impl SstIndexCreator { + /// Creates a new `SstIndexCreator`. + pub async fn new( + region_id: &RegionId, + sst_file_id: &FileId, + intermediate_manager: &IntermediateManager, + metadata: &RegionMetadataRef, + compress: bool, + mem_limit: usize, + ) -> Result { + let mut creators = HashMap::new(); + + for column in &metadata.column_metadatas { + let options = + column + .column_schema + .fulltext_options() + .context(FulltextOptionsSnafu { + column_name: &column.column_schema.name, + })?; + + // Relax the type constraint here as many types can be casted to string. + + let options = match options { + Some(options) if options.enable => options, + _ => continue, + }; + + let column_id = column.column_id; + let intm_path = intermediate_manager.fulltext_path(region_id, sst_file_id, column_id); + + let config = Config { + analyzer: match options.analyzer { + FulltextAnalyzer::English => Analyzer::English, + FulltextAnalyzer::Chinese => Analyzer::Chinese, + }, + case_sensitive: options.case_sensitive, + }; + + let creator = TantivyFulltextIndexCreator::new(&intm_path, config, mem_limit) + .await + .context(CreateFulltextCreatorSnafu)?; + + creators.insert( + column_id, + SingleCreator { + column_id, + inner: Box::new(creator), + intm_path, + compress, + }, + ); + } + + Ok(Self { + creators, + aborted: false, + stats: Statistics::default(), + }) + } + + /// Updates the index with the given batch. + pub async fn update(&mut self, batch: &Batch) -> Result<()> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + if let Err(update_err) = self.do_update(batch).await { + if let Err(err) = self.do_abort().await { + if cfg!(any(test, feature = "test")) { + panic!("Failed to abort index creator, err: {err}"); + } else { + warn!(err; "Failed to abort index creator"); + } + } + return Err(update_err); + } + + Ok(()) + } + + /// Finalizes the index creation. + pub async fn finish( + &mut self, + puffin_writer: &mut SstPuffinWriter, + ) -> Result<(RowCount, ByteCount)> { + ensure!(!self.aborted, OperateAbortedIndexSnafu); + + match self.do_finish(puffin_writer).await { + Ok(()) => Ok((self.stats.row_count(), self.stats.byte_count())), + Err(finish_err) => { + if let Err(err) = self.do_abort().await { + if cfg!(any(test, feature = "test")) { + panic!("Failed to abort index creator, err: {err}"); + } else { + warn!(err; "Failed to abort index creator"); + } + } + Err(finish_err) + } + } + } + + /// Aborts the index creation. + pub async fn abort(&mut self) -> Result<()> { + if self.aborted { + return Ok(()); + } + + self.do_abort().await + } + + /// Returns the memory usage of the index creator. + pub fn memory_usage(&self) -> usize { + self.creators.values().map(|c| c.inner.memory_usage()).sum() + } + + /// Returns IDs of columns that the creator is responsible for. + pub fn column_ids(&self) -> impl Iterator + '_ { + self.creators.keys().copied() + } + + pub(crate) fn is_empty(&self) -> bool { + self.creators.is_empty() + } +} + +impl SstIndexCreator { + async fn do_update(&mut self, batch: &Batch) -> Result<()> { + let mut guard = self.stats.record_update(); + guard.inc_row_count(batch.num_rows()); + + for creator in self.creators.values_mut() { + creator.update(batch).await?; + } + + Ok(()) + } + + async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> { + let mut guard = self.stats.record_finish(); + + let mut written_bytes = 0; + for creator in self.creators.values_mut() { + written_bytes += creator.finish(puffin_writer).await?; + } + + guard.inc_byte_count(written_bytes); + Ok(()) + } + + async fn do_abort(&mut self) -> Result<()> { + let _guard = self.stats.record_cleanup(); + + self.aborted = true; + + for (_, mut creator) in self.creators.drain() { + creator.abort().await?; + } + + Ok(()) + } +} + +/// `SingleCreator` is a creator for a single column. +struct SingleCreator { + /// Column ID. + column_id: ColumnId, + /// Inner creator. + inner: Box, + /// Intermediate path where the index is written to. + intm_path: PathBuf, + /// Whether the index should be compressed. + compress: bool, +} + +impl SingleCreator { + async fn update(&mut self, batch: &Batch) -> Result<()> { + let text_column = batch + .fields() + .iter() + .find(|c| c.column_id == self.column_id); + match text_column { + Some(column) => { + let data = column + .data + .cast(&ConcreteDataType::string_datatype()) + .context(CastVectorSnafu { + from: column.data.data_type(), + to: ConcreteDataType::string_datatype(), + })?; + + for i in 0..batch.num_rows() { + let data = data.get_ref(i); + let text = data + .as_string() + .context(FieldTypeMismatchSnafu)? + .unwrap_or_default(); + self.inner + .push_text(text) + .await + .context(FulltextPushTextSnafu)?; + } + } + _ => { + // If the column is not found in the batch, push empty text. + // Ensure that the number of texts pushed is the same as the number of rows in the SST, + // so that the texts are aligned with the row ids. + for _ in 0..batch.num_rows() { + self.inner + .push_text("") + .await + .context(FulltextPushTextSnafu)?; + } + } + } + + Ok(()) + } + + async fn finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result { + self.inner.finish().await.context(FulltextFinishSnafu)?; + + let options = PutOptions { + compression: self.compress.then_some(CompressionCodec::Zstd), + }; + + let key = format!("{INDEX_BLOB_TYPE}-{}", self.column_id); + puffin_writer + .put_dir(&key, self.intm_path.clone(), options) + .await + .context(PuffinAddBlobSnafu) + } + + async fn abort(&mut self) -> Result<()> { + if let Err(err) = self.inner.finish().await { + warn!(err; "Failed to finish fulltext index creator, col_id: {:?}, dir_path: {:?}", self.column_id, self.intm_path); + } + if let Err(err) = tokio::fs::remove_dir_all(&self.intm_path).await { + warn!(err; "Failed to remove fulltext index directory, col_id: {:?}, dir_path: {:?}", self.column_id, self.intm_path); + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + // TODO(zhongzc): After search is implemented, add tests for full-text indexer. +} diff --git a/src/mito2/src/sst/index/indexer/abort.rs b/src/mito2/src/sst/index/indexer/abort.rs index 2e7afe5d3944..68034d48fb29 100644 --- a/src/mito2/src/sst/index/indexer/abort.rs +++ b/src/mito2/src/sst/index/indexer/abort.rs @@ -13,14 +13,14 @@ // limitations under the License. use common_telemetry::warn; -use puffin::puffin_manager::PuffinWriter; use crate::sst::index::Indexer; impl Indexer { pub(crate) async fn do_abort(&mut self) { self.do_abort_inverted_index().await; - self.do_abort_puffin_writer().await; + self.do_abort_fulltext_index().await; + self.puffin_manager = None; } async fn do_abort_inverted_index(&mut self) { @@ -44,24 +44,22 @@ impl Indexer { } } - async fn do_abort_puffin_writer(&mut self) { - let Some(puffin_writer) = self.puffin_writer.take() else { + async fn do_abort_fulltext_index(&mut self) { + let Some(mut indexer) = self.fulltext_indexer.take() else { return; }; - - let err = match puffin_writer.finish().await { - Ok(_) => return, - Err(err) => err, + let Err(err) = indexer.abort().await else { + return; }; if cfg!(any(test, feature = "test")) { panic!( - "Failed to abort puffin writer, region_id: {}, file_id: {}, err: {}", + "Failed to abort full-text index, region_id: {}, file_id: {}, err: {}", self.region_id, self.file_id, err ); } else { warn!( - err; "Failed to abort puffin writer, region_id: {}, file_id: {}", + err; "Failed to abort full-text index, region_id: {}, file_id: {}", self.region_id, self.file_id, ); } diff --git a/src/mito2/src/sst/index/indexer/finish.rs b/src/mito2/src/sst/index/indexer/finish.rs index 31ec0c0e52fe..324137ec2df0 100644 --- a/src/mito2/src/sst/index/indexer/finish.rs +++ b/src/mito2/src/sst/index/indexer/finish.rs @@ -13,18 +13,20 @@ // limitations under the License. use common_telemetry::{debug, warn}; -use puffin::puffin_manager::PuffinWriter; +use puffin::puffin_manager::{PuffinManager, PuffinWriter}; +use crate::sst::index::fulltext_index::creator::SstIndexCreator as FulltextIndexer; use crate::sst::index::inverted_index::creator::SstIndexCreator as InvertedIndexer; use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount}; -use crate::sst::index::{IndexOutput, Indexer, InvertedIndexOutput}; +use crate::sst::index::{FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput}; impl Indexer { pub(crate) async fn do_finish(&mut self) -> IndexOutput { let mut output = IndexOutput::default(); - let Some(mut writer) = self.puffin_writer.take() else { + let Some(mut writer) = self.build_puffin_writer().await else { + self.do_abort().await; return output; }; @@ -36,10 +38,41 @@ impl Indexer { return IndexOutput::default(); } + let success = self + .do_finish_fulltext_index(&mut writer, &mut output) + .await; + if !success { + self.do_abort().await; + return IndexOutput::default(); + } + output.file_size = self.do_finish_puffin_writer(writer).await; output } + async fn build_puffin_writer(&mut self) -> Option { + let puffin_manager = self.puffin_manager.take()?; + + let err = match puffin_manager.writer(&self.file_path).await { + Ok(writer) => return Some(writer), + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to create puffin writer, region_id: {}, file_id: {}, err: {}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to create puffin writer, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + None + } + async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount { let err = match writer.finish().await { Ok(size) => return size, @@ -99,6 +132,43 @@ impl Indexer { false } + async fn do_finish_fulltext_index( + &mut self, + puffin_writer: &mut SstPuffinWriter, + index_output: &mut IndexOutput, + ) -> bool { + let Some(mut indexer) = self.fulltext_indexer.take() else { + return true; + }; + + let err = match indexer.finish(puffin_writer).await { + Ok((row_count, byte_count)) => { + self.fill_fulltext_index_output( + &mut index_output.fulltext_index, + row_count, + byte_count, + &indexer, + ); + return true; + } + Err(err) => err, + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to finish full-text index, region_id: {}, file_id: {}, err: {}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to finish full-text index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } + fn fill_inverted_index_output( &mut self, output: &mut InvertedIndexOutput, @@ -115,4 +185,21 @@ impl Indexer { output.row_count = row_count; output.columns = indexer.column_ids().collect(); } + + fn fill_fulltext_index_output( + &mut self, + output: &mut FulltextIndexOutput, + row_count: RowCount, + byte_count: ByteCount, + indexer: &FulltextIndexer, + ) { + debug!( + "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}", + self.region_id, self.file_id, byte_count, row_count + ); + + output.index_size = byte_count; + output.row_count = row_count; + output.columns = indexer.column_ids().collect(); + } } diff --git a/src/mito2/src/sst/index/indexer/update.rs b/src/mito2/src/sst/index/indexer/update.rs index 42302d83a724..c08f171bb415 100644 --- a/src/mito2/src/sst/index/indexer/update.rs +++ b/src/mito2/src/sst/index/indexer/update.rs @@ -26,6 +26,9 @@ impl Indexer { if !self.do_update_inverted_index(batch).await { self.do_abort().await; } + if !self.do_update_fulltext_index(batch).await { + self.do_abort().await; + } } /// Returns false if the update failed. @@ -52,4 +55,29 @@ impl Indexer { false } + + /// Returns false if the update failed. + async fn do_update_fulltext_index(&mut self, batch: &Batch) -> bool { + let Some(creator) = self.fulltext_indexer.as_mut() else { + return true; + }; + + let Err(err) = creator.update(batch).await else { + return true; + }; + + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to update full-text index, region_id: {}, file_id: {}, err: {}", + self.region_id, self.file_id, err + ); + } else { + warn!( + err; "Failed to update full-text index, region_id: {}, file_id: {}", + self.region_id, self.file_id, + ); + } + + false + } } diff --git a/src/mito2/src/sst/index/intermediate.rs b/src/mito2/src/sst/index/intermediate.rs index 18e63e827c73..6464b97eff93 100644 --- a/src/mito2/src/sst/index/intermediate.rs +++ b/src/mito2/src/sst/index/intermediate.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::path::PathBuf; + use common_telemetry::warn; use object_store::util::{self, normalize_dir}; -use store_api::storage::RegionId; +use store_api::storage::{ColumnId, RegionId}; use uuid::Uuid; use crate::access_layer::new_fs_object_store; @@ -27,6 +29,7 @@ const INTERMEDIATE_DIR: &str = "__intm"; /// `IntermediateManager` provides store to access to intermediate files. #[derive(Clone)] pub struct IntermediateManager { + base_dir: PathBuf, store: InstrumentedStore, } @@ -42,7 +45,10 @@ impl IntermediateManager { warn!(err; "Failed to remove garbage intermediate files"); } - Ok(Self { store }) + Ok(Self { + base_dir: PathBuf::from(aux_path.as_ref()), + store, + }) } /// Set the write buffer size for the store. @@ -56,11 +62,20 @@ impl IntermediateManager { &self.store } - #[cfg(test)] - pub(crate) fn new(store: object_store::ObjectStore) -> Self { - Self { - store: InstrumentedStore::new(store), - } + /// Returns the intermediate directory path for building fulltext index. + /// The format is `{aux_path}/__intm/{region_id}/{sst_file_id}/fulltext-{column_id}-{uuid}`. + pub(crate) fn fulltext_path( + &self, + region_id: &RegionId, + sst_file_id: &FileId, + column_id: ColumnId, + ) -> PathBuf { + let uuid = Uuid::new_v4(); + self.base_dir + .join(INTERMEDIATE_DIR) + .join(region_id.as_u64().to_string()) + .join(sst_file_id.to_string()) + .join(format!("fulltext-{column_id}-{uuid}")) } } @@ -163,4 +178,30 @@ mod tests { format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im") ); } + + #[tokio::test] + async fn test_fulltext_intm_path() { + let temp_dir = temp_dir::create_temp_dir("test_fulltext_intm_path_"); + let aux_path = temp_dir.path().to_string_lossy().to_string(); + + let manager = IntermediateManager::init_fs(&aux_path).await.unwrap(); + let region_id = RegionId::new(0, 0); + let sst_file_id = FileId::random(); + let column_id = 0; + let fulltext_path = manager.fulltext_path(®ion_id, &sst_file_id, column_id); + + if cfg!(windows) { + let p = fulltext_path.to_string_lossy().to_string(); + let r = Regex::new(&format!( + "{aux_path}\\\\{INTERMEDIATE_DIR}\\\\0\\\\{sst_file_id}\\\\fulltext-0-\\w{{8}}-\\w{{4}}-\\w{{4}}-\\w{{4}}-\\w{{12}}", + )).unwrap(); + assert!(r.is_match(&p)); + } else { + let p = fulltext_path.to_string_lossy().to_string(); + let r = Regex::new(&format!( + "{aux_path}/{INTERMEDIATE_DIR}/0/{sst_file_id}/fulltext-0-\\w{{8}}-\\w{{4}}-\\w{{4}}-\\w{{4}}-\\w{{12}}", + )).unwrap(); + assert!(r.is_match(&p)); + } + } } diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 4a464f770198..00aafad1595d 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -313,8 +313,8 @@ mod tests { ObjectStore::new(Memory::default()).unwrap().finish() } - fn mock_intm_mgr() -> IntermediateManager { - IntermediateManager::new(mock_object_store()) + async fn new_intm_mgr(path: impl AsRef) -> IntermediateManager { + IntermediateManager::init_fs(path).await.unwrap() } fn mock_region_metadata() -> RegionMetadataRef { @@ -387,7 +387,7 @@ mod tests { let file_path = location::index_file_path(®ion_dir, sst_file_id); let object_store = mock_object_store(); let region_metadata = mock_region_metadata(); - let intm_mgr = mock_intm_mgr(); + let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await; let memory_threshold = None; let segment_row_count = 2; diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 49b89b6a1b0a..0bac00714abc 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -45,7 +45,6 @@ use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::test_util::log_store_util; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::services::Fs; -use object_store::util::join_dir; use object_store::ObjectStore; use rskafka::client::partition::{Compression, UnknownTopicHandling}; use rskafka::client::{Client, ClientBuilder}; @@ -603,8 +602,6 @@ impl TestEnv { local_store: ObjectStore, capacity: ReadableSize, ) -> WriteCacheRef { - let data_home = self.data_home().display().to_string(); - let index_aux_path = self.data_home.path().join("index_aux"); let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None) .await diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index ebe821e428d9..7cebae847e26 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -837,7 +837,14 @@ write_buffer_size = "8MiB" create_on_flush = "auto" create_on_compaction = "auto" apply_on_query = "auto" -mem_threshold_on_create = "64.0MiB" +mem_threshold_on_create = "auto" +compress = true + +[region_engine.mito.fulltext_index] +create_on_flush = "auto" +create_on_compaction = "auto" +apply_on_query = "auto" +mem_threshold_on_create = "auto" compress = true [region_engine.mito.memtable]