diff --git a/crates/datasources/src/bson/builder.rs b/crates/datasources/src/bson/builder.rs index e4e7a19ec..f13bf8edd 100644 --- a/crates/datasources/src/bson/builder.rs +++ b/crates/datasources/src/bson/builder.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use bitvec::order::Lsb0; use bitvec::vec::BitVec; -use bson::{RawBsonRef, RawDocument}; +use bson::{RawBsonRef, RawDocumentBuf}; use datafusion::arrow::array::{ Array, ArrayBuilder, @@ -88,29 +88,26 @@ impl RecordStructBuilder { Ok(()) } - pub fn append_record(&mut self, doc: &RawDocument) -> Result<()> { + pub fn append_record(&mut self, doc: &RawDocumentBuf) -> Result<()> { let mut cols_set: BitVec = BitVec::repeat(false, self.fields.len()); - for iter_result in doc { - match iter_result { - Ok((key, val)) => { - let idx = *self - .field_index - .get(key) - .ok_or_else(|| BsonError::ColumnNotInInferredSchema(key.to_string()))?; - - if *cols_set.get(idx).unwrap() { - continue; - } + for item in doc.iter_elements() { + let elem = item?; - // Add value to columns. - self.add_value_at_index(idx, Some(val))?; + let idx = *self + .field_index + .get(elem.key()) + .ok_or_else(|| BsonError::ColumnNotInInferredSchema(elem.key().to_string()))?; - // Track which columns we've added values to. - cols_set.set(idx, true); - } - Err(_) => return Err(BsonError::FailedToReadRawBsonDocument), + if *cols_set.get(idx).unwrap() { + continue; } + + // Add value to columns. + self.add_value_at_index(idx, Some(elem.value()?))?; + + // Track which columns we've added values to. + cols_set.set(idx, true); } // Append nulls to all columns not included in the doc. @@ -124,13 +121,13 @@ impl RecordStructBuilder { Ok(()) } - pub fn project_and_append(&mut self, doc: &RawDocument) -> Result<()> { + pub fn project_and_append(&mut self, doc: &RawDocumentBuf) -> Result<()> { let mut cols_set: BitVec = BitVec::repeat(false, self.fields.len()); - for iter_result in doc { + for iter_result in doc.iter_elements() { match iter_result { - Ok((key, val)) => { - if let Some(&idx) = self.field_index.get(key) { + Ok(elem) => { + if let Some(&idx) = self.field_index.get(elem.key()) { if cols_set.get(idx).is_some_and(|v| v == true) { // If this happens it means that the bson document has a field // name that appears more than once. This is legal and possible to build @@ -141,7 +138,7 @@ impl RecordStructBuilder { } // Add value to columns. - self.add_value_at_index(idx, Some(val))?; + self.add_value_at_index(idx, Some(elem.value()?))?; // Track which columns we've added values to. cols_set.set(idx, true); @@ -366,7 +363,7 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) -> .as_any_mut() .downcast_mut::() .unwrap(); - builder.project_and_append(nested)?; + builder.project_and_append(&nested.to_raw_document_buf())?; } // Array @@ -560,8 +557,7 @@ mod test { buf.append("value", "second"); assert_eq!(buf.iter().count(), 4); - rsb.append_record(RawDocument::from_bytes(&buf.into_bytes()).unwrap()) - .unwrap(); + rsb.append_record(&buf).unwrap(); } assert_eq!(rsb.len(), 100); for value in rsb @@ -594,7 +590,7 @@ mod test { buf.append("value", "first"); assert_eq!(buf.iter().count(), 3); - rsb.append_record(RawDocument::from_bytes(&buf.into_bytes()).unwrap()) + rsb.append_record(&buf) .expect("first record matchex expectations"); assert_eq!(rsb.len(), 1); @@ -602,10 +598,10 @@ mod test { buf.append("index", 1); buf.append("values", 3); assert_eq!(buf.iter().count(), 2); - rsb.append_record(RawDocument::from_bytes(&buf.clone().into_bytes()).unwrap()) + rsb.append_record(&buf.clone()) .expect_err("for append_record schema changes are an error"); assert_eq!(rsb.len(), 1); - rsb.project_and_append(RawDocument::from_bytes(&buf.clone().into_bytes()).unwrap()) + rsb.project_and_append(&buf.clone()) .expect("project and append should filter out unrequired fields"); assert_eq!(rsb.len(), 2); @@ -615,12 +611,12 @@ mod test { buf.append("values", 3); assert_eq!(buf.iter().count(), 3); - rsb.append_record(RawDocument::from_bytes(&buf.clone().into_bytes()).unwrap()) + rsb.append_record(&buf) .expect_err("for append_record schema changes are an error"); // the first value was added successfully to another buffer to the rsb grew assert_eq!(rsb.len(), 3); - rsb.project_and_append(RawDocument::from_bytes(&buf.clone().into_bytes()).unwrap()) + rsb.project_and_append(&buf.clone()) .expect("project and append should filter out unrequired fields"); assert_eq!(rsb.len(), 4); } diff --git a/crates/datasources/src/bson/schema.rs b/crates/datasources/src/bson/schema.rs index 6bdabc6d4..a13364514 100644 --- a/crates/datasources/src/bson/schema.rs +++ b/crates/datasources/src/bson/schema.rs @@ -12,30 +12,24 @@ use crate::bson::errors::{BsonError, Result}; const RECURSION_LIMIT: usize = 100; pub fn schema_from_document(doc: &RawDocumentBuf) -> Result { - Ok(Schema::new(fields_from_document( - 0, - doc.iter().map(|item| item.map_err(|e| e.into())), - )?)) + Ok(Schema::new(fields_from_document(0, doc)?)) } -fn fields_from_document<'a>( - depth: usize, - doc_iter: impl Iterator)>>, -) -> Result> { +fn fields_from_document(depth: usize, doc: &RawDocumentBuf) -> Result> { if depth >= RECURSION_LIMIT { return Err(BsonError::RecursionLimitExceeded(RECURSION_LIMIT)); } - // let doc_iter = doc.iter(); - let (_, size) = doc_iter.size_hint(); + let iter = doc.iter_elements(); + let (_, size) = iter.size_hint(); let mut fields = Vec::with_capacity(size.unwrap_or_default()); - for item in doc_iter { - let (key, val) = item?; - let arrow_typ = bson_to_arrow_type(depth, val)?; + for item in iter { + let elem = item?; + let arrow_typ = bson_to_arrow_type(depth, elem.value()?)?; // Assume everything is nullable. - fields.push(Field::new(key, arrow_typ, true)); + fields.push(Field::new(elem.key(), arrow_typ, true)); } Ok(fields) @@ -56,13 +50,9 @@ fn bson_to_arrow_type(depth: usize, bson: RawBsonRef) -> Result { )?, true, ), - RawBsonRef::Document(nested) => DataType::Struct( - fields_from_document( - depth + 1, - nested.into_iter().map(|item| item.map_err(|e| e.into())), - )? - .into(), - ), + RawBsonRef::Document(nested) => { + DataType::Struct(fields_from_document(depth + 1, &nested.to_raw_document_buf())?.into()) + } RawBsonRef::String(_) => DataType::Utf8, RawBsonRef::Double(_) => DataType::Float64, RawBsonRef::Boolean(_) => DataType::Boolean,