From 055ba132e742929af6b22074389b7cb15bcc995e Mon Sep 17 00:00:00 2001
From: marin postma
Date: Mon, 25 Oct 2021 17:38:32 +0200
Subject: [PATCH] implement review suggestions

---
 cli/src/main.rs                               |  28 +-
 http-ui/src/documents_from_csv.rs             | 285 ------------------
 http-ui/src/main.rs                           |  28 +-
 milli/src/documents/builder.rs                |  54 ++--
 milli/src/documents/mod.rs                    |  17 +-
 milli/src/documents/serde.rs                  |  39 +--
 milli/src/update/index_documents/mod.rs       |   3 +-
 milli/src/update/index_documents/transform.rs |   3 +-
 8 files changed, 78 insertions(+), 379 deletions(-)
 delete mode 100644 http-ui/src/documents_from_csv.rs

diff --git a/cli/src/main.rs b/cli/src/main.rs
index b84ff32432..8e28d4a255 100644
--- a/cli/src/main.rs
+++ b/cli/src/main.rs
@@ -1,5 +1,5 @@
 use std::fs::File;
-use std::io::{stdin, Cursor, Read};
+use std::io::{stdin, BufRead, BufReader, Cursor, Read};
 use std::path::PathBuf;
 use std::str::FromStr;
 
@@ -9,7 +9,6 @@ use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
 use milli::update::UpdateIndexingStep::{
     ComputeIdsAndMergeDocuments, IndexDocuments, MergeDataIntoFinalDatabase, RemapDocumentAddition,
 };
-use serde_json::{Map, Value};
 use structopt::StructOpt;
 
 #[cfg(target_os = "linux")]
@@ -202,11 +201,11 @@ fn documents_from_jsonl(reader: impl Read) -> Result<Vec<u8>> {
     let mut writer = Cursor::new(Vec::new());
     let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
 
-    let values = serde_json::Deserializer::from_reader(reader)
-        .into_iter::<Map<String, Value>>();
-    for document in values {
-        let document = document?;
-        documents.add_documents(document)?;
+    let mut buf = String::new();
+    let mut reader = BufReader::new(reader);
+
+    while reader.read_line(&mut buf)? > 0 {
+        documents.extend_from_json(&mut buf.as_bytes())?;
     }
     documents.finish()?;
 
@@ -217,8 +216,7 @@ fn documents_from_json(reader: impl Read) -> Result<Vec<u8>> {
     let mut writer = Cursor::new(Vec::new());
     let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
 
-    let json: serde_json::Value = serde_json::from_reader(reader)?;
-    documents.add_documents(json)?;
+    documents.extend_from_json(reader)?;
     documents.finish()?;
 
     Ok(writer.into_inner())
@@ -226,17 +224,7 @@ fn documents_from_json(reader: impl Read) -> Result<Vec<u8>> {
 
 fn documents_from_csv(reader: impl Read) -> Result<Vec<u8>> {
     let mut writer = Cursor::new(Vec::new());
-    let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
-
-    let mut records = csv::Reader::from_reader(reader);
-    let iter = records.deserialize::<Map<String, Value>>();
-
-    for doc in iter {
-        let doc = doc?;
-        documents.add_documents(doc)?;
-    }
-
-    documents.finish()?;
+    milli::documents::DocumentBatchBuilder::from_csv(reader, &mut writer)?.finish()?;
 
     Ok(writer.into_inner())
 }
diff --git a/http-ui/src/documents_from_csv.rs b/http-ui/src/documents_from_csv.rs
deleted file mode 100644
index 2b62f23c29..0000000000
--- a/http-ui/src/documents_from_csv.rs
+++ /dev/null
@@ -1,285 +0,0 @@
-use std::io::{Read, Result as IoResult};
-use std::num::ParseFloatError;
-
-use serde_json::{Map, Value};
-
-enum AllowedType {
-    String,
-    Number,
-}
-
-fn parse_csv_header(header: &str) -> (String, AllowedType) {
-    // if there are several separators we only split on the last one.
-    match header.rsplit_once(':') {
-        Some((field_name, field_type)) => match field_type {
-            "string" => (field_name.to_string(), AllowedType::String),
-            "number" => (field_name.to_string(), AllowedType::Number),
-            // we may return an error in this case.
-            _otherwise => (header.to_string(), AllowedType::String),
-        },
-        None => (header.to_string(), AllowedType::String),
-    }
-}
-
-pub struct CSVDocumentDeserializer<R>
-where
-    R: Read,
-{
-    documents: csv::StringRecordsIntoIter<R>,
-    headers: Vec<(String, AllowedType)>,
-}
-
-impl<R: Read> CSVDocumentDeserializer<R> {
-    pub fn from_reader(reader: R) -> IoResult<Self> {
-        let mut records = csv::Reader::from_reader(reader);
-
-        let headers = records.headers()?.into_iter().map(parse_csv_header).collect();
-
-        Ok(Self { documents: records.into_records(), headers })
-    }
-}
-
-impl<R: Read> Iterator for CSVDocumentDeserializer<R> {
-    type Item = anyhow::Result<Map<String, Value>>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        let csv_document = self.documents.next()?;
-
-        match csv_document {
-            Ok(csv_document) => {
-                let mut document = Map::new();
-
-                for ((field_name, field_type), value) in
-                    self.headers.iter().zip(csv_document.into_iter())
-                {
-                    let parsed_value: Result<Value, ParseFloatError> = match field_type {
-                        AllowedType::Number => {
-                            value.parse::<f64>().map(Value::from).map_err(Into::into)
-                        }
-                        AllowedType::String => Ok(Value::String(value.to_string())),
-                    };
-
-                    match parsed_value {
-                        Ok(value) => drop(document.insert(field_name.to_string(), value)),
-                        Err(_e) => {
-                            return Some(Err(anyhow::anyhow!(
-                                "Value '{}' is not a valid number",
-                                value
-                            )))
-                        }
-                    }
-                }
-
-                Some(Ok(document))
-            }
-            Err(e) => Some(Err(anyhow::anyhow!("Error parsing csv document: {}", e))),
-        }
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use serde_json::json;
-
-    use super::*;
-
-    #[test]
-    fn simple_csv_document() {
-        let documents = r#"city,country,pop
-"Boston","United States","4628910""#;
-
-        let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
-
-        assert_eq!(
-            Value::Object(csv_iter.next().unwrap().unwrap()),
-            json!({
-                "city": "Boston",
-                "country": "United States",
-                "pop": "4628910",
-            })
-        );
-    }
-
-    #[test]
-    fn coma_in_field() {
-        let documents = r#"city,country,pop
-"Boston","United, States","4628910""#;
-
-        let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
-
-        assert_eq!(
-            Value::Object(csv_iter.next().unwrap().unwrap()),
-            json!({
-                "city": "Boston",
-                "country": "United, States",
-                "pop": "4628910",
-            })
-        );
-    }
-
-    #[test]
-    fn quote_in_field() {
-        let documents = r#"city,country,pop
-"Boston","United"" States","4628910""#;
-
-        let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
-
-        assert_eq!(
-            Value::Object(csv_iter.next().unwrap().unwrap()),
-            json!({
-                "city": "Boston",
-                "country": "United\" States",
-                "pop": "4628910",
-            })
-        );
-    }
-
-    #[test]
-    fn integer_in_field() {
-        let documents = r#"city,country,pop:number
-"Boston","United States","4628910""#;
-
-        let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
-
-        assert_eq!(
-            Value::Object(csv_iter.next().unwrap().unwrap()),
-            json!({
-                "city": "Boston",
-                "country": "United States",
-                "pop": 4628910.0,
-            })
-        );
-    }
-
-    #[test]
-    fn float_in_field() {
-        let documents = r#"city,country,pop:number
-"Boston","United States","4628910.01""#;
-
-        let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
-
-        assert_eq!(
-            Value::Object(csv_iter.next().unwrap().unwrap()),
-            json!({
-                "city": "Boston",
-                "country": "United States",
-                "pop": 4628910.01,
-            })
-        );
-    }
-
-    #[test]
-    fn several_double_dot_in_header() {
-        let documents = r#"city:love:string,country:state,pop
-"Boston","United States","4628910""#;
-
-        let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
-
-        assert_eq!(
-            Value::Object(csv_iter.next().unwrap().unwrap()),
-            json!({
-                "city:love": "Boston",
-                "country:state": "United States",
-                "pop": "4628910",
-            })
-        );
-    }
-
-    #[test]
-    fn ending_by_double_dot_in_header() {
-        let documents = r#"city:,country,pop
-"Boston","United States","4628910""#;
-
-        let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
-
-        assert_eq!(
-            Value::Object(csv_iter.next().unwrap().unwrap()),
-            json!({
-                "city:": "Boston",
-                "country": "United States",
-                "pop": "4628910",
-            })
-        );
-    }
-
-    #[test]
-    fn starting_by_double_dot_in_header() {
-        let documents = r#":city,country,pop
-"Boston","United States","4628910""#;
-
-        let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
-
-        assert_eq!(
-            Value::Object(csv_iter.next().unwrap().unwrap()),
-            json!({
-                ":city": "Boston",
-                "country": "United States",
-                "pop": "4628910",
-            })
-        );
-    }
-
-    #[test]
-    fn starting_by_double_dot_in_header2() {
-        let documents = r#":string,country,pop
-"Boston","United States","4628910""#;
-
-        let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
-
-        assert_eq!(
-            Value::Object(csv_iter.next().unwrap().unwrap()),
-            json!({
-                "": "Boston",
-                "country": "United States",
-                "pop": "4628910",
-            })
-        );
-    }
-
-    #[test]
-    fn double_double_dot_in_header() {
-        let documents = r#"city::string,country,pop
-"Boston","United States","4628910""#;
-
-        let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
-
-        assert_eq!(
-            Value::Object(csv_iter.next().unwrap().unwrap()),
-            json!({
-                "city:": "Boston",
-                "country": "United States",
-                "pop": "4628910",
-            })
-        );
-    }
-
-    #[test]
-    fn bad_type_in_header() {
-        let documents = r#"city,country:number,pop
-"Boston","United States","4628910""#;
-
-        let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
-
-        assert!(csv_iter.next().unwrap().is_err());
-    }
-
-    #[test]
-    fn bad_column_count1() {
-        let documents = r#"city,country,pop
-"Boston","United States","4628910", "too much""#;
-
-        let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
-
-        assert!(csv_iter.next().unwrap().is_err());
-    }
-
-    #[test]
-    fn bad_column_count2() {
-        let documents = r#"city,country,pop
-"Boston","United States""#;
-
-        let mut csv_iter = CSVDocumentDeserializer::from_reader(documents.as_bytes()).unwrap();
-
-        assert!(csv_iter.next().unwrap().is_err());
-    }
-}
diff --git a/http-ui/src/main.rs b/http-ui/src/main.rs
index d27c6d5bb5..9e9fe4a2b4 100644
--- a/http-ui/src/main.rs
+++ b/http-ui/src/main.rs
@@ -1,10 +1,9 @@
-mod documents_from_csv;
 mod update_store;
 
 use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
 use std::fmt::Display;
 use std::fs::{create_dir_all, File};
-use std::io::Cursor;
+use std::io::{BufRead, BufReader, Cursor};
 use std::net::SocketAddr;
 use std::num::{NonZeroU32, NonZeroUsize};
 use std::path::PathBuf;
@@ -39,7 +38,6 @@ use warp::http::Response;
 use warp::Filter;
 
 use self::update_store::UpdateStore;
-use crate::documents_from_csv::CSVDocumentDeserializer;
 
 #[cfg(target_os = "linux")]
 #[global_allocator]
@@ -1041,11 +1039,11 @@ fn documents_from_jsonl(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
     let mut writer = Cursor::new(Vec::new());
     let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
 
-    let values = serde_json::Deserializer::from_reader(reader)
-        .into_iter::<Map<String, Value>>();
-    for document in values {
-        let document = document?;
-        documents.add_documents(document)?;
+    let mut buf = String::new();
+    let mut reader = BufReader::new(reader);
+
+    while reader.read_line(&mut buf)? > 0 {
+        documents.extend_from_json(&mut buf.as_bytes())?;
     }
     documents.finish()?;
 
@@ -1056,8 +1054,7 @@ fn documents_from_json(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
     let mut writer = Cursor::new(Vec::new());
     let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
 
-    let json: serde_json::Value = serde_json::from_reader(reader)?;
-    documents.add_documents(json)?;
+    documents.extend_from_json(reader)?;
     documents.finish()?;
 
     Ok(writer.into_inner())
@@ -1065,16 +1062,7 @@ fn documents_from_json(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
 
 fn documents_from_csv(reader: impl io::Read) -> anyhow::Result<Vec<u8>> {
     let mut writer = Cursor::new(Vec::new());
-    let mut documents = milli::documents::DocumentBatchBuilder::new(&mut writer)?;
-
-    let iter = CSVDocumentDeserializer::from_reader(reader)?;
-
-    for doc in iter {
-        let doc = doc?;
-        documents.add_documents(doc)?;
-    }
-
-    documents.finish()?;
+    milli::documents::DocumentBatchBuilder::from_csv(reader, &mut writer)?.finish()?;
 
     Ok(writer.into_inner())
 }
diff --git a/milli/src/documents/builder.rs b/milli/src/documents/builder.rs
index c2b0e01ccc..6ba890b79a 100644
--- a/milli/src/documents/builder.rs
+++ b/milli/src/documents/builder.rs
@@ -1,16 +1,14 @@
 use std::collections::BTreeMap;
 use std::io;
-use std::io::Cursor;
-use std::io::Write;
+use std::io::{Cursor, Write};
 
 use byteorder::{BigEndian, WriteBytesExt};
 use serde::Deserializer;
 use serde_json::Value;
 
-use crate::FieldId;
-
 use super::serde::DocumentVisitor;
 use super::{ByteCounter, DocumentsBatchIndex, DocumentsMetadata, Error};
+use crate::FieldId;
 
 /// The `DocumentsBatchBuilder` provides a way to build a documents batch in the intermediary
 /// format used by milli.
@@ -27,7 +25,7 @@ use super::{ByteCounter, DocumentsBatchIndex, DocumentsMetadata, Error};
 /// let json = r##"{"id": 1, "name": "foo"}"##;
 /// let mut writer = Cursor::new(Vec::new());
 /// let mut builder = DocumentBatchBuilder::new(&mut writer).unwrap();
-/// builder.extend_from_json(Cursor::new(json.as_bytes())).unwrap();
+/// builder.extend_from_json(&mut json.as_bytes()).unwrap();
 /// builder.finish().unwrap();
 /// ```
 pub struct DocumentBatchBuilder<W> {
@@ -46,16 +44,14 @@ impl<W: io::Write + io::Seek> DocumentBatchBuilder<W> {
         // add space to write the offset of the metadata at the end of the writer
         writer.write_u64::<BigEndian>(0)?;
 
-        let this = Self {
+        Ok(Self {
             inner: writer,
             index,
             obkv_buffer: Vec::new(),
             value_buffer: Vec::new(),
             values: BTreeMap::new(),
             count: 0,
-        };
-
-        Ok(this)
+        })
     }
 
     /// Returns the number of documents that have been written to the builder.
@@ -117,27 +113,31 @@
 
         for (i, record) in records.into_records().enumerate() {
             let record = record?;
 
-            let mut writer = obkv::KvWriter::new(Cursor::new(&mut this.obkv_buffer));
+            this.obkv_buffer.clear();
+            let mut writer = obkv::KvWriter::new(&mut this.obkv_buffer);
 
             for (value, (fid, ty)) in record.into_iter().zip(headers.iter()) {
                 let value = match ty {
-                    AllowedType::Number => value.parse::<f64>().map(Value::from).map_err(|error| Error::ParseFloat {
-                        error,
-                        // +1 for the header offset.
-                        line: i + 1,
-                        value: value.to_string(),
-                    })?,
+                    AllowedType::Number => {
+                        value.parse::<f64>().map(Value::from).map_err(|error| {
+                            Error::ParseFloat {
+                                error,
+                                // +1 for the header offset.
+                                line: i + 1,
+                                value: value.to_string(),
+                            }
+                        })?
+                    }
                     AllowedType::String => Value::String(value.to_string()),
                 };
 
+                this.value_buffer.clear();
                 serde_json::to_writer(Cursor::new(&mut this.value_buffer), &value)?;
                 writer.insert(*fid, &this.value_buffer)?;
-                this.value_buffer.clear();
             }
 
             this.inner.write_u32::<BigEndian>(this.obkv_buffer.len() as u32)?;
             this.inner.write_all(&this.obkv_buffer)?;
-            this.obkv_buffer.clear();
 
             this.count += 1;
         }
@@ -156,7 +156,8 @@ fn parse_csv_header(header: &str) -> (String, AllowedType) {
     match header.rsplit_once(':') {
         Some((field_name, field_type)) => match field_type {
             "string" => (field_name.to_string(), AllowedType::String),
-            "number" => (field_name.to_string(), AllowedType::Number), // if the pattern isn't recognized, we keep the whole field.
+            "number" => (field_name.to_string(), AllowedType::Number),
+            // if the pattern isn't recognized, we keep the whole field.
             _otherwise => (header.to_string(), AllowedType::String),
         },
         None => (header.to_string(), AllowedType::String),
@@ -169,9 +170,8 @@ mod test {
 
     use serde_json::{json, Map};
 
-    use crate::documents::DocumentBatchReader;
-
     use super::*;
+    use crate::documents::DocumentBatchReader;
 
     fn obkv_to_value(obkv: &obkv::KvReader<FieldId>, index: &DocumentsBatchIndex) -> Value {
         let mut map = Map::new();
@@ -525,7 +525,9 @@ mod test {
 "Boston","United States","4628910""#;
 
         let mut buf = Vec::new();
-        assert!(DocumentBatchBuilder::from_csv(documents.as_bytes(), Cursor::new(&mut buf)).is_err());
+        assert!(
+            DocumentBatchBuilder::from_csv(documents.as_bytes(), Cursor::new(&mut buf)).is_err()
+        );
     }
 
     #[test]
@@ -534,7 +536,9 @@ mod test {
 "Boston","United States","4628910", "too much""#;
 
         let mut buf = Vec::new();
-        assert!(DocumentBatchBuilder::from_csv(documents.as_bytes(), Cursor::new(&mut buf)).is_err());
+        assert!(
+            DocumentBatchBuilder::from_csv(documents.as_bytes(), Cursor::new(&mut buf)).is_err()
+        );
     }
 
     #[test]
@@ -543,6 +547,8 @@ mod test {
 "Boston","United States""#;
 
         let mut buf = Vec::new();
-        assert!(DocumentBatchBuilder::from_csv(documents.as_bytes(), Cursor::new(&mut buf)).is_err());
+        assert!(
+            DocumentBatchBuilder::from_csv(documents.as_bytes(), Cursor::new(&mut buf)).is_err()
+        );
     }
 }
diff --git a/milli/src/documents/mod.rs b/milli/src/documents/mod.rs
index 6f653d825a..14d97ee7dc 100644
--- a/milli/src/documents/mod.rs
+++ b/milli/src/documents/mod.rs
@@ -7,9 +7,8 @@ mod builder;
 mod reader;
 mod serde;
 
-use std::num::ParseFloatError;
-use std::io;
 use std::fmt::{self, Debug};
+use std::io;
 
 use ::serde::{Deserialize, Serialize};
 use bimap::BiHashMap;
@@ -24,7 +23,7 @@ pub struct DocumentsBatchIndex(pub BiHashMap<FieldId, String>);
 
 impl DocumentsBatchIndex {
     /// Insert the field in the map, or return its field id if it doesn't already exist.
-    pub fn insert(&mut self, field: &str) -> FieldId {
+    pub fn insert(&mut self, field: &str) -> FieldId {
         match self.0.get_by_right(field) {
             Some(field_id) => *field_id,
             None => {
@@ -43,7 +42,7 @@ impl DocumentsBatchIndex {
         self.0.len()
     }
 
-    pub fn iter(&self) -> impl Iterator<Item = (&FieldId, &String)> {
+    pub fn iter(&self) -> bimap::hash::Iter<FieldId, String> {
         self.0.iter()
     }
 
@@ -83,11 +82,7 @@ impl<W: io::Write> io::Write for ByteCounter<W> {
 
 #[derive(Debug)]
 pub enum Error {
-    ParseFloat {
-        error: std::num::ParseFloatError,
-        line: usize,
-        value: String,
-    },
+    ParseFloat { error: std::num::ParseFloatError, line: usize, value: String },
     InvalidDocumentFormat,
     Custom(String),
     JsonError(serde_json::Error),
@@ -124,7 +119,9 @@ impl From<serde_json::Error> for Error {
 impl fmt::Display for Error {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
-            Error::ParseFloat { error, line, value} => write!(f, "Error parsing number {:?} at line {}: {}", value, line, error),
+            Error::ParseFloat { error, line, value } => {
+                write!(f, "Error parsing number {:?} at line {}: {}", value, line, error)
+            }
             Error::Custom(s) => write!(f, "Unexpected serialization error: {}", s),
             Error::InvalidDocumentFormat => f.write_str("Invalid document addition format."),
             Error::JsonError(err) => write!(f, "Couldn't serialize document value: {}", err),
diff --git a/milli/src/documents/serde.rs b/milli/src/documents/serde.rs
index 4dfdca2c75..d57bf1ffbb 100644
--- a/milli/src/documents/serde.rs
+++ b/milli/src/documents/serde.rs
@@ -1,18 +1,13 @@
 use std::collections::BTreeMap;
-use std::io::Cursor;
-use std::io::Write;
 use std::fmt;
+use std::io::{Cursor, Write};
 
 use byteorder::WriteBytesExt;
+use serde::de::{DeserializeSeed, MapAccess, SeqAccess, Visitor};
 use serde::Deserialize;
-use serde::de::DeserializeSeed;
-use serde::de::MapAccess;
-use serde::de::SeqAccess;
-use serde::de::Visitor;
 use serde_json::Value;
 
-use super::Error;
-use super::{ByteCounter, DocumentsBatchIndex};
+use super::{ByteCounter, DocumentsBatchIndex, Error};
 use crate::FieldId;
 
 macro_rules! tri {
@@ -31,8 +26,9 @@ impl<'a, 'de> DeserializeSeed<'de> for FieldIdResolver<'a> {
 
     fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
     where
-        D: serde::Deserializer<'de> {
-        deserializer.deserialize_str(self)
+        D: serde::Deserializer<'de>,
+    {
+        deserializer.deserialize_str(self)
     }
 }
 
@@ -43,7 +39,7 @@ impl<'a, 'de> Visitor<'de> for FieldIdResolver<'a> {
     where
         E: serde::de::Error,
     {
-        Ok(self.0.insert(v))
+        Ok(self.0.insert(v))
     }
 
     fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
@@ -58,8 +54,9 @@ impl<'de> DeserializeSeed<'de> for ValueDeserializer {
 
     fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
    where
-        D: serde::Deserializer<'de> {
-        serde_json::Value::deserialize(deserializer)
+        D: serde::Deserializer<'de>,
+    {
+        serde_json::Value::deserialize(deserializer)
    }
 }
 
@@ -80,7 +77,9 @@ impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> {
     where
         A: SeqAccess<'de>,
     {
-        while let Some(v) = seq.next_element_seed(&mut *self)? { tri!(v) }
+        while let Some(v) = seq.next_element_seed(&mut *self)? {
+            tri!(v)
+        }
 
         Ok(Ok(()))
     }
@@ -89,7 +88,9 @@ impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> {
     where
         A: MapAccess<'de>,
     {
-        while let Some((key, value)) = map.next_entry_seed(FieldIdResolver(&mut *self.index), ValueDeserializer)? {
+        while let Some((key, value)) =
+            map.next_entry_seed(FieldIdResolver(&mut *self.index), ValueDeserializer)?
+ { self.values.insert(key, value); } @@ -119,13 +120,15 @@ impl<'a, 'de, W: Write> Visitor<'de> for &mut DocumentVisitor<'a, W> { } impl<'a, 'de, W> DeserializeSeed<'de> for &mut DocumentVisitor<'a, W> -where W: Write, +where + W: Write, { type Value = Result<(), Error>; fn deserialize(self, deserializer: D) -> Result where - D: serde::Deserializer<'de> { - deserializer.deserialize_map(self) + D: serde::Deserializer<'de>, + { + deserializer.deserialize_map(self) } } diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 17c778060e..440546b10c 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -906,8 +906,9 @@ mod tests { let mut cursor = Cursor::new(Vec::new()); + let big_object = serde_json::to_string(&big_object).unwrap(); let mut builder = DocumentBatchBuilder::new(&mut cursor).unwrap(); - builder.add_documents(big_object).unwrap(); + builder.extend_from_json(&mut big_object.as_bytes()).unwrap(); builder.finish().unwrap(); cursor.set_position(0); let content = DocumentBatchReader::from_reader(cursor).unwrap(); diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index eac60effb0..08aa72d35a 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -544,7 +544,8 @@ mod test { mod primary_key_inference { use bimap::BiHashMap; - use crate::{documents::DocumentsBatchIndex, update::index_documents::transform::find_primary_key}; + use crate::documents::DocumentsBatchIndex; + use crate::update::index_documents::transform::find_primary_key; #[test] fn primary_key_infered_on_first_field() {