Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Optimize document transform #402

Merged
merged 8 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 11 additions & 21 deletions benchmarks/benches/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![allow(dead_code)]

use std::fs::{create_dir_all, remove_dir_all, File};
use std::io::{self, Cursor, Read, Seek};
use std::io::{self, BufRead, BufReader, Cursor, Read, Seek};
use std::num::ParseFloatError;
use std::path::Path;

Expand Down Expand Up @@ -146,44 +146,34 @@ pub fn documents_from(filename: &str, filetype: &str) -> DocumentBatchReader<imp
DocumentBatchReader::from_reader(Cursor::new(documents)).unwrap()
}

fn documents_from_jsonl(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
fn documents_from_jsonl(reader: impl Read) -> anyhow::Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;

let values = serde_json::Deserializer::from_reader(reader)
.into_iter::<serde_json::Map<String, serde_json::Value>>();
for document in values {
let document = document?;
documents.add_documents(document)?;
let mut buf = String::new();
let mut reader = BufReader::new(reader);

while reader.read_line(&mut buf)? > 0 {
documents.extend_from_json(&mut buf.as_bytes())?;
}
documents.finish()?;

Ok(writer.into_inner())
}

fn documents_from_json(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
fn documents_from_json(reader: impl Read) -> anyhow::Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;

let json: serde_json::Value = serde_json::from_reader(reader)?;
documents.add_documents(json)?;
documents.extend_from_json(reader)?;
documents.finish()?;

Ok(writer.into_inner())
}

fn documents_from_csv(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
fn documents_from_csv(reader: impl Read) -> anyhow::Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;

let iter = CSVDocumentDeserializer::from_reader(reader)?;

for doc in iter {
let doc = doc?;
documents.add_documents(doc)?;
}

documents.finish()?;
milli::documents::DocumentBatchBuilder::from_csv(reader, &mut writer)?.finish()?;

Ok(writer.into_inner())
}
Expand Down
28 changes: 8 additions & 20 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::fs::File;
use std::io::{stdin, Cursor, Read};
use std::io::{stdin, BufRead, BufReader, Cursor, Read};
use std::path::PathBuf;
use std::str::FromStr;

Expand All @@ -9,7 +9,6 @@ use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use milli::update::UpdateIndexingStep::{
ComputeIdsAndMergeDocuments, IndexDocuments, MergeDataIntoFinalDatabase, RemapDocumentAddition,
};
use serde_json::{Map, Value};
use structopt::StructOpt;

#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -202,11 +201,11 @@ fn documents_from_jsonl(reader: impl Read) -> Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;

let values = serde_json::Deserializer::from_reader(reader)
.into_iter::<serde_json::Map<String, serde_json::Value>>();
for document in values {
let document = document?;
documents.add_documents(document)?;
let mut buf = String::new();
let mut reader = BufReader::new(reader);

while reader.read_line(&mut buf)? > 0 {
documents.extend_from_json(&mut buf.as_bytes())?;
}
documents.finish()?;

Expand All @@ -217,26 +216,15 @@ fn documents_from_json(reader: impl Read) -> Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;

let json: serde_json::Value = serde_json::from_reader(reader)?;
documents.add_documents(json)?;
documents.extend_from_json(reader)?;
documents.finish()?;

Ok(writer.into_inner())
}

fn documents_from_csv(reader: impl Read) -> Result<Vec<u8>> {
let mut writer = Cursor::new(Vec::new());
let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;

let mut records = csv::Reader::from_reader(reader);
let iter = records.deserialize::<Map<String, Value>>();

for doc in iter {
let doc = doc?;
documents.add_documents(doc)?;
}

documents.finish()?;
milli::documents::DocumentBatchBuilder::from_csv(reader, &mut writer)?.finish()?;

Ok(writer.into_inner())
}
Expand Down
Loading