diff --git a/milli/src/update/index_documents/merge_function.rs b/milli/src/update/index_documents/merge_function.rs index 6da19bc847..fa823607bd 100644 --- a/milli/src/update/index_documents/merge_function.rs +++ b/milli/src/update/index_documents/merge_function.rs @@ -1,71 +1,33 @@ use std::borrow::Cow; -use anyhow::{bail, ensure, Context}; +use anyhow::bail; use bstr::ByteSlice as _; use fst::IntoStreamer; use roaring::RoaringBitmap; use crate::heed_codec::CboRoaringBitmapCodec; -const WORDS_FST_KEY: &[u8] = crate::index::WORDS_FST_KEY.as_bytes(); -const FIELDS_IDS_MAP_KEY: &[u8] = crate::index::FIELDS_IDS_MAP_KEY.as_bytes(); -const DOCUMENTS_IDS_KEY: &[u8] = crate::index::DOCUMENTS_IDS_KEY.as_bytes(); - -pub fn main_merge(key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { - match key { - WORDS_FST_KEY => { - let fsts: Vec<_> = values.iter().map(|v| fst::Set::new(v).unwrap()).collect(); - - // Union of the FSTs - let mut op = fst::set::OpBuilder::new(); - fsts.iter().for_each(|fst| op.push(fst.into_stream())); - let op = op.r#union(); - - let mut build = fst::SetBuilder::memory(); - build.extend_stream(op.into_stream()).unwrap(); - Ok(build.into_inner().unwrap()) - }, - FIELDS_IDS_MAP_KEY => { - ensure!(values.windows(2).all(|vs| vs[0] == vs[1]), "fields ids map doesn't match"); - Ok(values[0].to_vec()) - }, - DOCUMENTS_IDS_KEY => roaring_bitmap_merge(values), - otherwise => bail!("wut {:?}", otherwise), - } -} - -pub fn word_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { - roaring_bitmap_merge(values) -} - -pub fn docid_word_positions_merge(key: &[u8], _values: &[Cow<[u8]>]) -> anyhow::Result> { - bail!("merging docid word positions is an error ({:?})", key.as_bstr()) -} - -pub fn field_id_docid_facet_values_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { - let first = values.first().context("no value to merge")?; - ensure!(values.iter().all(|v| v == first), "invalid field id docid facet value merging"); - Ok(first.to_vec()) -} +// Union of multiple FSTs +pub fn fst_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { + let fsts = values.iter().map(fst::Set::new).collect::, _>>()?; + let op_builder: fst::set::OpBuilder = fsts.iter().map(|fst| fst.into_stream()).collect(); + let op = op_builder.r#union(); -pub fn words_pairs_proximities_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { - cbo_roaring_bitmap_merge(values) + let mut build = fst::SetBuilder::memory(); + build.extend_stream(op.into_stream()).unwrap(); + Ok(build.into_inner().unwrap()) } -pub fn word_prefix_level_positions_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { - cbo_roaring_bitmap_merge(values) +pub fn word_docids_merge(key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { + roaring_bitmap_merge(key, values) } -pub fn word_level_position_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { - cbo_roaring_bitmap_merge(values) -} - -pub fn field_id_word_count_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { - cbo_roaring_bitmap_merge(values) +pub fn docid_word_positions_merge(key: &[u8], _values: &[Cow<[u8]>]) -> anyhow::Result> { + panic!("merging docid word positions is an error ({:?})", key.as_bstr()) } -pub fn facet_field_value_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { - cbo_roaring_bitmap_merge(values) +pub fn keep_first(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { + Ok(values.first().unwrap().to_vec()) } pub fn documents_merge(key: &[u8], _values: &[Cow<[u8]>]) -> anyhow::Result> { @@ -88,7 +50,7 @@ pub fn merge_two_obkvs(base: obkv::KvReader, update: obkv::KvReader, buffer: &mu writer.finish().unwrap(); } -fn roaring_bitmap_merge(values: &[Cow<[u8]>]) -> anyhow::Result> { +pub fn roaring_bitmap_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { let (head, tail) = values.split_first().unwrap(); let mut head = RoaringBitmap::deserialize_from(&head[..])?; @@ -102,7 +64,7 @@ fn roaring_bitmap_merge(values: &[Cow<[u8]>]) -> anyhow::Result> { Ok(vec) } -fn cbo_roaring_bitmap_merge(values: &[Cow<[u8]>]) -> anyhow::Result> { +pub fn cbo_roaring_bitmap_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result> { let (head, tail) = values.split_first().unwrap(); let mut head = CboRoaringBitmapCodec::deserialize_from(&head[..])?; diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 4cf56b5637..f847ee2646 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -25,11 +25,8 @@ use crate::update::{ }; use self::store::{Store, Readers}; pub use self::merge_function::{ - main_merge, word_docids_merge, words_pairs_proximities_docids_merge, - docid_word_positions_merge, documents_merge, - word_level_position_docids_merge, word_prefix_level_positions_docids_merge, - facet_field_value_docids_merge, field_id_docid_facet_values_merge, - field_id_word_count_docids_merge, + fst_merge, word_docids_merge, cbo_roaring_bitmap_merge, roaring_bitmap_merge, + docid_word_positions_merge, documents_merge, keep_first }; pub use self::transform::{Transform, TransformOutput}; @@ -539,22 +536,22 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); rayon::spawn(move || { vec![ - (DatabaseType::Main, main_readers, main_merge as MergeFn), + (DatabaseType::Main, main_readers, fst_merge as MergeFn), (DatabaseType::WordDocids, word_docids_readers, word_docids_merge), ( DatabaseType::FacetLevel0NumbersDocids, facet_field_numbers_docids_readers, - facet_field_value_docids_merge, + cbo_roaring_bitmap_merge, ), ( DatabaseType::WordLevel0PositionDocids, word_level_position_docids_readers, - word_level_position_docids_merge, + cbo_roaring_bitmap_merge, ), ( DatabaseType::FieldIdWordCountDocids, field_id_word_count_docids_readers, - field_id_word_count_docids_merge, + cbo_roaring_bitmap_merge, ), ] .into_par_iter() @@ -657,7 +654,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { self.wtxn, *self.index.facet_id_string_docids.as_polymorph(), facet_field_strings_docids_readers, - facet_field_value_docids_merge, + cbo_roaring_bitmap_merge, write_method, )?; @@ -672,7 +669,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { self.wtxn, *self.index.field_id_docid_facet_f64s.as_polymorph(), field_id_docid_facet_numbers_readers, - field_id_docid_facet_values_merge, + keep_first, write_method, )?; @@ -687,7 +684,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { self.wtxn, *self.index.field_id_docid_facet_strings.as_polymorph(), field_id_docid_facet_strings_readers, - field_id_docid_facet_values_merge, + keep_first, write_method, )?; @@ -702,7 +699,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { self.wtxn, *self.index.word_pair_proximity_docids.as_polymorph(), words_pairs_proximities_docids_readers, - words_pairs_proximities_docids_merge, + cbo_roaring_bitmap_merge, write_method, )?; @@ -721,7 +718,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { self.wtxn, self.index.main, content, - main_merge, + fst_merge, WriteMethod::GetMergePut, )?; }, @@ -743,7 +740,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { self.wtxn, db, content, - facet_field_value_docids_merge, + cbo_roaring_bitmap_merge, write_method, )?; }, @@ -754,7 +751,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { self.wtxn, db, content, - field_id_word_count_docids_merge, + cbo_roaring_bitmap_merge, write_method, )?; }, @@ -765,7 +762,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { self.wtxn, db, content, - word_level_position_docids_merge, + cbo_roaring_bitmap_merge, write_method, )?; } diff --git a/milli/src/update/index_documents/store.rs b/milli/src/update/index_documents/store.rs index 69263e5a0d..91c2a9c276 100644 --- a/milli/src/update/index_documents/store.rs +++ b/milli/src/update/index_documents/store.rs @@ -26,11 +26,7 @@ use crate::update::UpdateIndexingStep; use crate::{json_to_string, SmallVec32, Position, DocumentId, FieldId}; use super::{MergeFn, create_writer, create_sorter, writer_into_reader}; -use super::merge_function::{ - main_merge, word_docids_merge, words_pairs_proximities_docids_merge, - word_level_position_docids_merge, facet_field_value_docids_merge, - field_id_docid_facet_values_merge, field_id_word_count_docids_merge, -}; +use super::merge_function::{fst_merge, word_docids_merge, keep_first, cbo_roaring_bitmap_merge}; const LMDB_MAX_KEY_LENGTH: usize = 511; const ONE_KILOBYTE: usize = 1024 * 1024; @@ -104,7 +100,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { let linked_hash_map_size = linked_hash_map_size.unwrap_or(500); let main_sorter = create_sorter( - main_merge, + fst_merge, chunk_compression_type, chunk_compression_level, chunk_fusing_shrink_size, @@ -120,7 +116,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { max_memory, ); let words_pairs_proximities_docids_sorter = create_sorter( - words_pairs_proximities_docids_merge, + cbo_roaring_bitmap_merge, chunk_compression_type, chunk_compression_level, chunk_fusing_shrink_size, @@ -128,7 +124,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { max_memory, ); let word_level_position_docids_sorter = create_sorter( - word_level_position_docids_merge, + cbo_roaring_bitmap_merge, chunk_compression_type, chunk_compression_level, chunk_fusing_shrink_size, @@ -136,7 +132,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { max_memory, ); let field_id_word_count_docids_sorter = create_sorter( - field_id_word_count_docids_merge, + cbo_roaring_bitmap_merge, chunk_compression_type, chunk_compression_level, chunk_fusing_shrink_size, @@ -144,7 +140,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { max_memory, ); let facet_field_numbers_docids_sorter = create_sorter( - facet_field_value_docids_merge, + cbo_roaring_bitmap_merge, chunk_compression_type, chunk_compression_level, chunk_fusing_shrink_size, @@ -152,7 +148,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { max_memory, ); let facet_field_strings_docids_sorter = create_sorter( - facet_field_value_docids_merge, + cbo_roaring_bitmap_merge, chunk_compression_type, chunk_compression_level, chunk_fusing_shrink_size, @@ -160,7 +156,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { max_memory, ); let field_id_docid_facet_numbers_sorter = create_sorter( - field_id_docid_facet_values_merge, + keep_first, chunk_compression_type, chunk_compression_level, chunk_fusing_shrink_size, @@ -168,7 +164,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> { Some(1024 * 1024 * 1024), // 1MB ); let field_id_docid_facet_strings_sorter = create_sorter( - field_id_docid_facet_values_merge, + keep_first, chunk_compression_type, chunk_compression_level, chunk_fusing_shrink_size, diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs index c972efc4f7..c6b935e540 100644 --- a/milli/src/update/word_prefix_pair_proximity_docids.rs +++ b/milli/src/update/word_prefix_pair_proximity_docids.rs @@ -11,7 +11,7 @@ use crate::Index; use crate::heed_codec::StrStrU8Codec; use crate::update::index_documents::{ WriteMethod, create_sorter, sorter_into_lmdb_database, - words_pairs_proximities_docids_merge, + cbo_roaring_bitmap_merge, }; pub struct WordPrefixPairProximityDocids<'t, 'u, 'i> { @@ -50,7 +50,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { // Here we create a sorter akin to the previous one. let mut word_prefix_pair_proximity_docids_sorter = create_sorter( - words_pairs_proximities_docids_merge, + cbo_roaring_bitmap_merge, self.chunk_compression_type, self.chunk_compression_level, self.chunk_fusing_shrink_size, @@ -80,7 +80,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> { self.wtxn, *self.index.word_prefix_pair_proximity_docids.as_polymorph(), word_prefix_pair_proximity_docids_sorter, - words_pairs_proximities_docids_merge, + cbo_roaring_bitmap_merge, WriteMethod::Append, )?; diff --git a/milli/src/update/words_level_positions.rs b/milli/src/update/words_level_positions.rs index 1b772c37da..f94507aabe 100644 --- a/milli/src/update/words_level_positions.rs +++ b/milli/src/update/words_level_positions.rs @@ -15,7 +15,7 @@ use crate::heed_codec::{StrLevelPositionCodec, CboRoaringBitmapCodec}; use crate::update::index_documents::WriteMethod; use crate::update::index_documents::{ create_writer, create_sorter, writer_into_reader, write_into_lmdb_database, - word_prefix_level_positions_docids_merge, sorter_into_lmdb_database + cbo_roaring_bitmap_merge, sorter_into_lmdb_database }; use crate::{Index, TreeLevel}; @@ -86,7 +86,7 @@ impl<'t, 'u, 'i> WordsLevelPositions<'t, 'u, 'i> { self.index.word_prefix_level_position_docids.clear(self.wtxn)?; let mut word_prefix_level_positions_docids_sorter = create_sorter( - word_prefix_level_positions_docids_merge, + cbo_roaring_bitmap_merge, self.chunk_compression_type, self.chunk_compression_level, self.chunk_fusing_shrink_size, @@ -119,7 +119,7 @@ impl<'t, 'u, 'i> WordsLevelPositions<'t, 'u, 'i> { self.wtxn, *self.index.word_prefix_level_position_docids.as_polymorph(), word_prefix_level_positions_docids_sorter, - word_prefix_level_positions_docids_merge, + cbo_roaring_bitmap_merge, WriteMethod::Append, )?;