Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Enriched documents batch reader #561

Merged
merged 35 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
048e174
Do not allocate when parsing CSV headers
Kerollmops Jun 14, 2022
eb63af1
Update grenad to 0.4.2
Kerollmops Jun 14, 2022
419ce39
Rework the DocumentsBatchBuilder/Reader to use grenad
Kerollmops Jun 14, 2022
e8297ad
Fix the tests for the new DocumentsBatchBuilder/Reader
Kerollmops Jun 14, 2022
6d0498d
Fix the fuzz tests
Kerollmops Jun 14, 2022
a4ceef9
Fix the cli for the new DocumentsBatchBuilder/Reader structs
Kerollmops Jun 14, 2022
f29114f
Fix http-ui to fit with the new DocumentsBatchBuilder/Reader structs
Kerollmops Jun 14, 2022
a97d4d6
Fix the benchmarks
Kerollmops Jun 14, 2022
bdc4263
Introduce the validate_documents_batch function
Kerollmops Jun 14, 2022
cefffde
Improve the .gitignore of the fuzz crate
Kerollmops Jun 14, 2022
0146175
Introduce the validate_documents_batch function
Kerollmops Jun 14, 2022
fcfc4ca
Move the Object type in the lib.rs file and use it everywhere
Kerollmops Jun 15, 2022
399eec5
Fix the indexation tests
Kerollmops Jun 15, 2022
2ceeb51
Support the auto-generated ids when validating documents
Kerollmops Jun 15, 2022
19eb3b4
Make sure that we do not accept floats as documents ids
Kerollmops Jun 15, 2022
8ebf5ee
Make the nested primary key work
Kerollmops Jun 15, 2022
dc3f092
Do not leak an internal grenad Error
Kerollmops Jun 16, 2022
ea85220
Fix the format used for a geo deleting benchmark
Kerollmops Jun 22, 2022
6a0a0ae
Make the Transform read from an EnrichedDocumentsBatchReader
Kerollmops Jun 20, 2022
5f1bfb7
Extract the primary key name and make it accessible
Kerollmops Jun 21, 2022
7425430
Constify the default primary key name
Kerollmops Jun 21, 2022
905af2a
Use the primary key and external id in the transform
Kerollmops Jun 21, 2022
c8ebf0d
Rename the validate function as an enriching function
Kerollmops Jun 21, 2022
d1a4da9
Generate a real UUIDv4 when ids are auto-generated
Kerollmops Jun 21, 2022
0bbcc7b
Expose the `DocumentId` struct to be sure to inject the generated ids
Kerollmops Jun 21, 2022
5d149d6
Remove tests for a function that no longer exists
Kerollmops Jun 30, 2022
2eec290
Check the validity of the latitude and longitude numbers
Kerollmops Jul 11, 2022
dc61105
Fix the nested document id fetching function
Kerollmops Jul 11, 2022
a892a4a
Introduce a function to extend from a JSON array of objects
Kerollmops Jul 11, 2022
192793e
Add some tests to check for the nested documents ids
Kerollmops Jul 12, 2022
25e768f
Fix another issue with the nested primary key selector
Kerollmops Jul 12, 2022
448114c
Fix the benchmarks with the new indexation API
Kerollmops Jul 12, 2022
ab1571c
Simplify Transform::read_documents, enabled by enriched documents reader
Jul 18, 2022
fc9f3f3
Change DocumentsBatchReader to access cursor and index at same time
Jul 18, 2022
41a0ce0
Add a code comment, as suggested in PR review
loiclec Jul 20, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 91 additions & 61 deletions benchmarks/benches/indexing.rs

Large diffs are not rendered by default.

54 changes: 24 additions & 30 deletions benchmarks/benches/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use std::path::Path;

use criterion::BenchmarkId;
use heed::EnvOpenOptions;
use milli::documents::DocumentBatchReader;
use milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
use milli::update::{
IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings,
};
use milli::{Filter, Index};
use serde_json::{Map, Value};
use milli::{Filter, Index, Object};
use serde_json::Value;

pub struct Conf<'a> {
/// where we are going to create our database.mmdb directory
Expand Down Expand Up @@ -96,12 +96,10 @@ pub fn base_setup(conf: &Conf) -> Index {
update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default()
};
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap();
let builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap();
let documents = documents_from(conf.dataset, conf.dataset_format);

builder.add_documents(documents).unwrap();

let (builder, user_error) = builder.add_documents(documents).unwrap();
user_error.unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();

Expand Down Expand Up @@ -140,7 +138,7 @@ pub fn run_benches(c: &mut criterion::Criterion, confs: &[Conf]) {
}
}

pub fn documents_from(filename: &str, filetype: &str) -> DocumentBatchReader<impl BufRead + Seek> {
pub fn documents_from(filename: &str, filetype: &str) -> DocumentsBatchReader<impl BufRead + Seek> {
let reader =
File::open(filename).expect(&format!("could not find the dataset in: {}", filename));
let reader = BufReader::new(reader);
Expand All @@ -150,39 +148,35 @@ pub fn documents_from(filename: &str, filetype: &str) -> DocumentBatchReader<imp
"jsonl" => documents_from_jsonl(reader).unwrap(),
otherwise => panic!("invalid update format {:?}", otherwise),
};
DocumentBatchReader::from_reader(Cursor::new(documents)).unwrap()
DocumentsBatchReader::from_reader(Cursor::new(documents)).unwrap()
}

fn documents_from_jsonl(mut reader: impl BufRead) -> anyhow::Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
fn documents_from_jsonl(reader: impl BufRead) -> anyhow::Result<Vec<u8>> {
let mut documents = DocumentsBatchBuilder::new(Vec::new());

let mut buf = String::new();

while reader.read_line(&mut buf)? > 0 {
documents.extend_from_json(&mut buf.as_bytes())?;
buf.clear();
for result in serde_json::Deserializer::from_reader(reader).into_iter::<Object>() {
let object = result?;
documents.append_json_object(&object)?;
}
documents.finish()?;

Ok(writer.into_inner())
documents.into_inner().map_err(Into::into)
}

fn documents_from_json(reader: impl BufRead) -> anyhow::Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
let mut documents = DocumentsBatchBuilder::new(Vec::new());

documents.extend_from_json(reader)?;
documents.finish()?;
documents.append_json_array(reader)?;

Ok(writer.into_inner())
documents.into_inner().map_err(Into::into)
}

fn documents_from_csv(reader: impl BufRead) -> anyhow::Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
milli::documents::DocumentBatchBuilder::from_csv(reader, &mut writer)?.finish()?;
let csv = csv::Reader::from_reader(reader);

let mut documents = DocumentsBatchBuilder::new(Vec::new());
documents.append_csv(csv)?;

Ok(writer.into_inner())
documents.into_inner().map_err(Into::into)
}

enum AllowedType {
Expand Down Expand Up @@ -222,14 +216,14 @@ impl<R: Read> CSVDocumentDeserializer<R> {
}

impl<R: Read> Iterator for CSVDocumentDeserializer<R> {
type Item = anyhow::Result<Map<String, Value>>;
type Item = anyhow::Result<Object>;

fn next(&mut self) -> Option<Self::Item> {
let csv_document = self.documents.next()?;

match csv_document {
Ok(csv_document) => {
let mut document = Map::new();
let mut document = Object::new();

for ((field_name, field_type), value) in
self.headers.iter().zip(csv_document.into_iter())
Expand Down
48 changes: 24 additions & 24 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use std::time::Instant;
use byte_unit::Byte;
use eyre::Result;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
use milli::update::UpdateIndexingStep::{
ComputeIdsAndMergeDocuments, IndexDocuments, MergeDataIntoFinalDatabase, RemapDocumentAddition,
};
use milli::update::{self, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig};
use milli::Index;
use serde_json::{Map, Value};
use milli::{Index, Object};
use structopt::StructOpt;

#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -225,9 +225,9 @@ impl Performer for DocumentAddition {
DocumentAdditionFormat::Jsonl => documents_from_jsonl(reader)?,
};

let reader = milli::documents::DocumentBatchReader::from_reader(Cursor::new(documents))?;
let reader = DocumentsBatchReader::from_reader(Cursor::new(documents))?;

println!("Adding {} documents to the index.", reader.len());
println!("Adding {} documents to the index.", reader.documents_count());

let mut txn = index.write_txn()?;
let config = milli::update::IndexerConfig { log_every_n: Some(100), ..Default::default() };
Expand Down Expand Up @@ -255,15 +255,18 @@ impl Performer for DocumentAddition {
let bar = progesses.add(bar);
bars.push(bar);
}
let mut addition = milli::update::IndexDocuments::new(
let addition = milli::update::IndexDocuments::new(
&mut txn,
&index,
&config,
indexing_config,
|step| indexing_callback(step, &bars),
)
.unwrap();
addition.add_documents(reader)?;
let (addition, user_error) = addition.add_documents(reader)?;
if let Err(error) = user_error {
return Err(error.into());
}

std::thread::spawn(move || {
progesses.join().unwrap();
Expand Down Expand Up @@ -321,35 +324,32 @@ fn indexing_callback(step: milli::update::UpdateIndexingStep, bars: &[ProgressBa
}

fn documents_from_jsonl(reader: impl Read) -> Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;

let mut buf = String::new();
let mut reader = BufReader::new(reader);
let mut documents = DocumentsBatchBuilder::new(Vec::new());
let reader = BufReader::new(reader);

while reader.read_line(&mut buf)? > 0 {
documents.extend_from_json(&mut buf.as_bytes())?;
for result in serde_json::Deserializer::from_reader(reader).into_iter::<Object>() {
let object = result?;
documents.append_json_object(&object)?;
}
documents.finish()?;

Ok(writer.into_inner())
documents.into_inner().map_err(Into::into)
}

fn documents_from_json(reader: impl Read) -> Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
let mut documents = DocumentsBatchBuilder::new(Vec::new());

documents.extend_from_json(reader)?;
documents.finish()?;
documents.append_json_array(reader)?;

Ok(writer.into_inner())
documents.into_inner().map_err(Into::into)
}

fn documents_from_csv(reader: impl Read) -> Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
milli::documents::DocumentBatchBuilder::from_csv(reader, &mut writer)?.finish()?;
let csv = csv::Reader::from_reader(reader);

let mut documents = DocumentsBatchBuilder::new(Vec::new());
documents.append_csv(csv)?;

Ok(writer.into_inner())
documents.into_inner().map_err(Into::into)
}

#[derive(Debug, StructOpt)]
Expand Down Expand Up @@ -423,7 +423,7 @@ impl Search {
filter: &Option<String>,
offset: &Option<usize>,
limit: &Option<usize>,
) -> Result<Vec<Map<String, Value>>> {
) -> Result<Vec<Object>> {
let txn = index.read_txn()?;
let mut search = index.search(&txn);

Expand Down
60 changes: 27 additions & 33 deletions http-ui/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod update_store;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::Display;
use std::fs::{create_dir_all, File};
use std::io::{BufRead, BufReader, Cursor, Read};
use std::io::{BufReader, Cursor, Read};
use std::net::SocketAddr;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::PathBuf;
Expand All @@ -18,19 +18,19 @@ use either::Either;
use flate2::read::GzDecoder;
use futures::{stream, FutureExt, StreamExt};
use heed::EnvOpenOptions;
use milli::documents::DocumentBatchReader;
use milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
use milli::tokenizer::TokenizerBuilder;
use milli::update::UpdateIndexingStep::*;
use milli::update::{
ClearDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Setting,
};
use milli::{
obkv_to_json, CompressionType, Filter as MilliFilter, FilterCondition, FormatOptions, Index,
MatcherBuilder, SearchResult, SortError,
MatcherBuilder, Object, SearchResult, SortError,
};
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use serde_json::Value;
use structopt::StructOpt;
use tokio::fs::File as TFile;
use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -169,11 +169,7 @@ impl<'s, A: AsRef<[u8]>> Highlighter<'s, A> {
}
}

fn highlight_record(
&self,
object: &mut Map<String, Value>,
attributes_to_highlight: &HashSet<String>,
) {
fn highlight_record(&self, object: &mut Object, attributes_to_highlight: &HashSet<String>) {
// TODO do we need to create a string for element that are not and needs to be highlight?
for (key, value) in object.iter_mut() {
if attributes_to_highlight.contains(key) {
Expand Down Expand Up @@ -378,7 +374,7 @@ async fn main() -> anyhow::Result<()> {
});
};

let mut builder = milli::update::IndexDocuments::new(
let builder = milli::update::IndexDocuments::new(
&mut wtxn,
&index_cloned,
GLOBAL_CONFIG.get().unwrap(),
Expand All @@ -399,10 +395,10 @@ async fn main() -> anyhow::Result<()> {
otherwise => panic!("invalid update format {:?}", otherwise),
};

let documents = DocumentBatchReader::from_reader(Cursor::new(documents))?;

builder.add_documents(documents)?;
let documents = DocumentsBatchReader::from_reader(Cursor::new(documents))?;

let (builder, user_error) = builder.add_documents(documents)?;
let _count = user_error?;
let result = builder.execute();

match result {
Expand Down Expand Up @@ -708,7 +704,7 @@ async fn main() -> anyhow::Result<()> {
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct Answer {
documents: Vec<Map<String, Value>>,
documents: Vec<Object>,
number_of_candidates: u64,
facets: BTreeMap<String, BTreeMap<String, u64>>,
}
Expand Down Expand Up @@ -1032,35 +1028,33 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

fn documents_from_jsonl(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
fn documents_from_jsonl(reader: impl Read) -> anyhow::Result<Vec<u8>> {
let mut documents = DocumentsBatchBuilder::new(Vec::new());
let reader = BufReader::new(reader);

for result in BufReader::new(reader).lines() {
let line = result?;
documents.extend_from_json(Cursor::new(line))?;
for result in serde_json::Deserializer::from_reader(reader).into_iter::<Object>() {
let object = result?;
documents.append_json_object(&object)?;
}

documents.finish()?;

Ok(writer.into_inner())
documents.into_inner().map_err(Into::into)
}

fn documents_from_json(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
fn documents_from_json(reader: impl Read) -> anyhow::Result<Vec<u8>> {
let mut documents = DocumentsBatchBuilder::new(Vec::new());

documents.extend_from_json(reader)?;
documents.finish()?;
documents.append_json_array(reader)?;

Ok(writer.into_inner())
documents.into_inner().map_err(Into::into)
}

fn documents_from_csv(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
milli::documents::DocumentBatchBuilder::from_csv(reader, &mut writer)?.finish()?;
fn documents_from_csv(reader: impl Read) -> anyhow::Result<Vec<u8>> {
let csv = csv::Reader::from_reader(reader);

let mut documents = DocumentsBatchBuilder::new(Vec::new());
documents.append_csv(csv)?;

Ok(writer.into_inner())
documents.into_inner().map_err(Into::into)
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion milli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ flatten-serde-json = { path = "../flatten-serde-json" }
fst = "0.4.7"
fxhash = "0.2.1"
geoutils = "0.4.1"
grenad = { version = "0.4.1", default-features = false, features = ["tempfile"] }
grenad = { version = "0.4.2", default-features = false, features = ["tempfile"] }
heed = { git = "https://github.com/meilisearch/heed", tag = "v0.12.1", default-features = false, features = ["lmdb", "sync-read-txn"] }
json-depth-checker = { path = "../json-depth-checker" }
levenshtein_automata = { version = "0.2.1", features = ["fst_automaton"] }
Expand Down
3 changes: 3 additions & 0 deletions milli/fuzz/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
Cargo.lock
target/

/corpus/
/artifacts/
Loading