Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Commit

Permalink
Prefer using an explicit merge function name
Browse files Browse the repository at this point in the history
  • Loading branch information
Kerollmops committed Jun 9, 2021
1 parent eb9f5b5 commit 07b221c
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 91 deletions.
72 changes: 17 additions & 55 deletions milli/src/update/index_documents/merge_function.rs
Original file line number Diff line number Diff line change
@@ -1,71 +1,33 @@
use std::borrow::Cow;

use anyhow::{bail, ensure, Context};
use anyhow::bail;
use bstr::ByteSlice as _;
use fst::IntoStreamer;
use roaring::RoaringBitmap;

use crate::heed_codec::CboRoaringBitmapCodec;

// Byte-slice views of the key names declared in `crate::index`, so that a raw
// `&[u8]` key can be compared against them directly (see the `match` in `main_merge`).
const WORDS_FST_KEY: &[u8] = crate::index::WORDS_FST_KEY.as_bytes();
const FIELDS_IDS_MAP_KEY: &[u8] = crate::index::FIELDS_IDS_MAP_KEY.as_bytes();
const DOCUMENTS_IDS_KEY: &[u8] = crate::index::DOCUMENTS_IDS_KEY.as_bytes();

/// Merges the values stored under a single key, picking the strategy from the key.
///
/// - `WORDS_FST_KEY`: every value is parsed as an FST set and the union of all
///   the sets is returned as a freshly built in-memory FST.
/// - `FIELDS_IDS_MAP_KEY`: all the values must be byte-equal; the first one is returned.
/// - `DOCUMENTS_IDS_KEY`: the values are handed over to `roaring_bitmap_merge`.
///
/// # Errors
///
/// Returns an error when:
/// - the key is none of the three keys above,
/// - a value is not a valid FST or the union cannot be built,
/// - the fields ids maps differ, or there is no value at all to merge for that key.
pub fn main_merge(key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
    match key {
        WORDS_FST_KEY => {
            // Invalid FST bytes are reported to the caller instead of aborting the process.
            let fsts = values.iter().map(fst::Set::new).collect::<Result<Vec<_>, _>>()?;

            // Union of the FSTs
            let mut op = fst::set::OpBuilder::new();
            fsts.iter().for_each(|fst| op.push(fst.into_stream()));
            let op = op.r#union();

            let mut build = fst::SetBuilder::memory();
            build.extend_stream(op.into_stream())?;
            Ok(build.into_inner()?)
        },
        FIELDS_IDS_MAP_KEY => {
            // `values[0]` would panic on an empty slice; turn that case into an error.
            let first = values.first().ok_or_else(|| anyhow::anyhow!("no value to merge"))?;
            ensure!(values.iter().all(|v| v == first), "fields ids map doesn't match");
            Ok(first.to_vec())
        },
        DOCUMENTS_IDS_KEY => roaring_bitmap_merge(values),
        otherwise => bail!("wut {:?}", otherwise),
    }
}

/// Merges the values of a word docids entry by delegating to `roaring_bitmap_merge`.
///
/// The key is not used.
pub fn word_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
roaring_bitmap_merge(values)
}

/// Merge function for entries that must never be merged.
///
/// # Errors
///
/// Always returns an error; its message includes the offending key, displayed
/// as a byte string. The values are not inspected.
pub fn docid_word_positions_merge(key: &[u8], _values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
bail!("merging docid word positions is an error ({:?})", key.as_bstr())
}

/// Returns a copy of the single value shared by every entry of `values`.
///
/// The key is not used.
///
/// # Errors
///
/// Fails when `values` is empty or when any two values differ.
pub fn field_id_docid_facet_values_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
    let (head, tail) = values.split_first().context("no value to merge")?;
    // The head trivially equals itself, so only the remaining values need comparing.
    let all_identical = tail.iter().all(|other| other == head);
    ensure!(all_identical, "invalid field id docid facet value merging");
    Ok(head.to_vec())
}
// Union of multiple FSTs
pub fn fst_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
let fsts = values.iter().map(fst::Set::new).collect::<Result<Vec<_>, _>>()?;
let op_builder: fst::set::OpBuilder = fsts.iter().map(|fst| fst.into_stream()).collect();
let op = op_builder.r#union();

pub fn words_pairs_proximities_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
cbo_roaring_bitmap_merge(values)
let mut build = fst::SetBuilder::memory();
build.extend_stream(op.into_stream()).unwrap();
Ok(build.into_inner().unwrap())
}

pub fn word_prefix_level_positions_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
cbo_roaring_bitmap_merge(values)
/// Merges the values of a word docids entry by forwarding both `key` and
/// `values` to `roaring_bitmap_merge`.
pub fn word_docids_merge(key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
roaring_bitmap_merge(key, values)
}

/// Merges the values of a word level position docids entry by delegating to
/// `cbo_roaring_bitmap_merge`.
///
/// The key is not used.
pub fn word_level_position_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
cbo_roaring_bitmap_merge(values)
}

pub fn field_id_word_count_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
cbo_roaring_bitmap_merge(values)
/// Merge function for entries that must never be merged.
///
/// # Panics
///
/// Always panics; the panic message includes the offending key, displayed as a
/// byte string. The values are not inspected and no `Result` is ever returned.
pub fn docid_word_positions_merge(key: &[u8], _values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
panic!("merging docid word positions is an error ({:?})", key.as_bstr())
}

pub fn facet_field_value_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
cbo_roaring_bitmap_merge(values)
pub fn keep_first(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
Ok(values.first().unwrap().to_vec())
}

pub fn documents_merge(key: &[u8], _values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
Expand All @@ -88,7 +50,7 @@ pub fn merge_two_obkvs(base: obkv::KvReader, update: obkv::KvReader, buffer: &mu
writer.finish().unwrap();
}

fn roaring_bitmap_merge(values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
pub fn roaring_bitmap_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
let (head, tail) = values.split_first().unwrap();
let mut head = RoaringBitmap::deserialize_from(&head[..])?;

Expand All @@ -102,7 +64,7 @@ fn roaring_bitmap_merge(values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
Ok(vec)
}

fn cbo_roaring_bitmap_merge(values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
pub fn cbo_roaring_bitmap_merge(_key: &[u8], values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> {
let (head, tail) = values.split_first().unwrap();
let mut head = CboRoaringBitmapCodec::deserialize_from(&head[..])?;

Expand Down
31 changes: 14 additions & 17 deletions milli/src/update/index_documents/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ use crate::update::{
};
use self::store::{Store, Readers};
pub use self::merge_function::{
main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
docid_word_positions_merge, documents_merge,
word_level_position_docids_merge, word_prefix_level_positions_docids_merge,
facet_field_value_docids_merge, field_id_docid_facet_values_merge,
field_id_word_count_docids_merge,
fst_merge, word_docids_merge, cbo_roaring_bitmap_merge, roaring_bitmap_merge,
docid_word_positions_merge, documents_merge, keep_first
};
pub use self::transform::{Transform, TransformOutput};

Expand Down Expand Up @@ -539,22 +536,22 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
debug!("Merging the main, word docids and words pairs proximity docids in parallel...");
rayon::spawn(move || {
vec![
(DatabaseType::Main, main_readers, main_merge as MergeFn),
(DatabaseType::Main, main_readers, fst_merge as MergeFn),
(DatabaseType::WordDocids, word_docids_readers, word_docids_merge),
(
DatabaseType::FacetLevel0NumbersDocids,
facet_field_numbers_docids_readers,
facet_field_value_docids_merge,
cbo_roaring_bitmap_merge,
),
(
DatabaseType::WordLevel0PositionDocids,
word_level_position_docids_readers,
word_level_position_docids_merge,
cbo_roaring_bitmap_merge,
),
(
DatabaseType::FieldIdWordCountDocids,
field_id_word_count_docids_readers,
field_id_word_count_docids_merge,
cbo_roaring_bitmap_merge,
),
]
.into_par_iter()
Expand Down Expand Up @@ -657,7 +654,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
self.wtxn,
*self.index.facet_id_string_docids.as_polymorph(),
facet_field_strings_docids_readers,
facet_field_value_docids_merge,
cbo_roaring_bitmap_merge,
write_method,
)?;

Expand All @@ -672,7 +669,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
self.wtxn,
*self.index.field_id_docid_facet_f64s.as_polymorph(),
field_id_docid_facet_numbers_readers,
field_id_docid_facet_values_merge,
keep_first,
write_method,
)?;

Expand All @@ -687,7 +684,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
self.wtxn,
*self.index.field_id_docid_facet_strings.as_polymorph(),
field_id_docid_facet_strings_readers,
field_id_docid_facet_values_merge,
keep_first,
write_method,
)?;

Expand All @@ -702,7 +699,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
self.wtxn,
*self.index.word_pair_proximity_docids.as_polymorph(),
words_pairs_proximities_docids_readers,
words_pairs_proximities_docids_merge,
cbo_roaring_bitmap_merge,
write_method,
)?;

Expand All @@ -721,7 +718,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
self.wtxn,
self.index.main,
content,
main_merge,
fst_merge,
WriteMethod::GetMergePut,
)?;
},
Expand All @@ -743,7 +740,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
self.wtxn,
db,
content,
facet_field_value_docids_merge,
cbo_roaring_bitmap_merge,
write_method,
)?;
},
Expand All @@ -754,7 +751,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
self.wtxn,
db,
content,
field_id_word_count_docids_merge,
cbo_roaring_bitmap_merge,
write_method,
)?;
},
Expand All @@ -765,7 +762,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
self.wtxn,
db,
content,
word_level_position_docids_merge,
cbo_roaring_bitmap_merge,
write_method,
)?;
}
Expand Down
22 changes: 9 additions & 13 deletions milli/src/update/index_documents/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ use crate::update::UpdateIndexingStep;
use crate::{json_to_string, SmallVec32, Position, DocumentId, FieldId};

use super::{MergeFn, create_writer, create_sorter, writer_into_reader};
use super::merge_function::{
main_merge, word_docids_merge, words_pairs_proximities_docids_merge,
word_level_position_docids_merge, facet_field_value_docids_merge,
field_id_docid_facet_values_merge, field_id_word_count_docids_merge,
};
use super::merge_function::{fst_merge, word_docids_merge, keep_first, cbo_roaring_bitmap_merge};

const LMDB_MAX_KEY_LENGTH: usize = 511;
// NOTE(review): despite its name, this constant is 1024 * 1024 bytes, i.e. one
// mebibyte rather than one kilobyte — confirm whether the name or the value is intended.
const ONE_KILOBYTE: usize = 1024 * 1024;
Expand Down Expand Up @@ -104,7 +100,7 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> {
let linked_hash_map_size = linked_hash_map_size.unwrap_or(500);

let main_sorter = create_sorter(
main_merge,
fst_merge,
chunk_compression_type,
chunk_compression_level,
chunk_fusing_shrink_size,
Expand All @@ -120,55 +116,55 @@ impl<'s, A: AsRef<[u8]>> Store<'s, A> {
max_memory,
);
let words_pairs_proximities_docids_sorter = create_sorter(
words_pairs_proximities_docids_merge,
cbo_roaring_bitmap_merge,
chunk_compression_type,
chunk_compression_level,
chunk_fusing_shrink_size,
max_nb_chunks,
max_memory,
);
let word_level_position_docids_sorter = create_sorter(
word_level_position_docids_merge,
cbo_roaring_bitmap_merge,
chunk_compression_type,
chunk_compression_level,
chunk_fusing_shrink_size,
max_nb_chunks,
max_memory,
);
let field_id_word_count_docids_sorter = create_sorter(
field_id_word_count_docids_merge,
cbo_roaring_bitmap_merge,
chunk_compression_type,
chunk_compression_level,
chunk_fusing_shrink_size,
max_nb_chunks,
max_memory,
);
let facet_field_numbers_docids_sorter = create_sorter(
facet_field_value_docids_merge,
cbo_roaring_bitmap_merge,
chunk_compression_type,
chunk_compression_level,
chunk_fusing_shrink_size,
max_nb_chunks,
max_memory,
);
let facet_field_strings_docids_sorter = create_sorter(
facet_field_value_docids_merge,
cbo_roaring_bitmap_merge,
chunk_compression_type,
chunk_compression_level,
chunk_fusing_shrink_size,
max_nb_chunks,
max_memory,
);
let field_id_docid_facet_numbers_sorter = create_sorter(
field_id_docid_facet_values_merge,
keep_first,
chunk_compression_type,
chunk_compression_level,
chunk_fusing_shrink_size,
max_nb_chunks,
Some(1024 * 1024 * 1024), // 1 GiB (the value is 1024^3 bytes, not 1 MB)
);
let field_id_docid_facet_strings_sorter = create_sorter(
field_id_docid_facet_values_merge,
keep_first,
chunk_compression_type,
chunk_compression_level,
chunk_fusing_shrink_size,
Expand Down
6 changes: 3 additions & 3 deletions milli/src/update/word_prefix_pair_proximity_docids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::Index;
use crate::heed_codec::StrStrU8Codec;
use crate::update::index_documents::{
WriteMethod, create_sorter, sorter_into_lmdb_database,
words_pairs_proximities_docids_merge,
cbo_roaring_bitmap_merge,
};

pub struct WordPrefixPairProximityDocids<'t, 'u, 'i> {
Expand Down Expand Up @@ -50,7 +50,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {

// Here we create a sorter akin to the previous one.
let mut word_prefix_pair_proximity_docids_sorter = create_sorter(
words_pairs_proximities_docids_merge,
cbo_roaring_bitmap_merge,
self.chunk_compression_type,
self.chunk_compression_level,
self.chunk_fusing_shrink_size,
Expand Down Expand Up @@ -80,7 +80,7 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
self.wtxn,
*self.index.word_prefix_pair_proximity_docids.as_polymorph(),
word_prefix_pair_proximity_docids_sorter,
words_pairs_proximities_docids_merge,
cbo_roaring_bitmap_merge,
WriteMethod::Append,
)?;

Expand Down
6 changes: 3 additions & 3 deletions milli/src/update/words_level_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::heed_codec::{StrLevelPositionCodec, CboRoaringBitmapCodec};
use crate::update::index_documents::WriteMethod;
use crate::update::index_documents::{
create_writer, create_sorter, writer_into_reader, write_into_lmdb_database,
word_prefix_level_positions_docids_merge, sorter_into_lmdb_database
cbo_roaring_bitmap_merge, sorter_into_lmdb_database
};
use crate::{Index, TreeLevel};

Expand Down Expand Up @@ -86,7 +86,7 @@ impl<'t, 'u, 'i> WordsLevelPositions<'t, 'u, 'i> {
self.index.word_prefix_level_position_docids.clear(self.wtxn)?;

let mut word_prefix_level_positions_docids_sorter = create_sorter(
word_prefix_level_positions_docids_merge,
cbo_roaring_bitmap_merge,
self.chunk_compression_type,
self.chunk_compression_level,
self.chunk_fusing_shrink_size,
Expand Down Expand Up @@ -119,7 +119,7 @@ impl<'t, 'u, 'i> WordsLevelPositions<'t, 'u, 'i> {
self.wtxn,
*self.index.word_prefix_level_position_docids.as_polymorph(),
word_prefix_level_positions_docids_sorter,
word_prefix_level_positions_docids_merge,
cbo_roaring_bitmap_merge,
WriteMethod::Append,
)?;

Expand Down

0 comments on commit 07b221c

Please sign in to comment.