Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bson builder should handle schema flexibly #2334

Merged
merged 12 commits into from
Jan 2, 2024
152 changes: 143 additions & 9 deletions crates/datasources/src/bson/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,14 @@ impl RecordStructBuilder {
.field_index
.get(key)
.ok_or_else(|| BsonError::ColumnNotInInferredSchema(key.to_string()))?;
println!("{}->{}", key, idx);

if *cols_set.get(idx).unwrap() {
println!("DUPLICATE SET: {}, {:?}", key, doc);
continue;
}

// Add value to columns.
let typ = self.fields.get(idx).unwrap().data_type(); // Programmer error if data type doesn't exist.
let col = self.builders.get_mut(idx).unwrap(); // Programmer error if this doesn't exist.
append_value(val, typ, col.as_mut())?;
self.add_value_at_index(idx, Some(val))?;

// Track which columns we've added values to.
cols_set.set(idx, true);
Expand All @@ -102,10 +101,46 @@ impl RecordStructBuilder {
// Append nulls to all columns not included in the doc.
for (idx, did_set) in cols_set.iter().enumerate() {
if !did_set {
// Add nulls...
let typ = self.fields.get(idx).unwrap().data_type(); // Programmer error if data type doesn't exist.
let col = self.builders.get_mut(idx).unwrap(); // Programmer error if column doesn't exist.
append_null(typ, col.as_mut())?;
// Add null...
self.add_value_at_index(idx, None)?;
}
}

Ok(())
}

pub fn project_and_append(&mut self, doc: &RawDocument) -> Result<()> {
let mut cols_set: BitVec<u8, Lsb0> = BitVec::repeat(false, self.fields.len());

for iter_result in doc {
match iter_result {
Ok((key, val)) => {
if let Some(&idx) = self.field_index.get(key) {
if cols_set.get(idx).is_some_and(|v| v == true) {
// If this happens it means that the bson document has a field
// name that appears more than once. This is legal and possible to build
// with some libraries but isn't forbidden, and (I think?) historically
// not (always?) rejected by MongoDB. Regardless "ignoring second
// appearances of the key" is a reasonable semantic.
continue;
}

// Add value to columns.
self.add_value_at_index(idx, Some(val))?;

// Track which columns we've added values to.
cols_set.set(idx, true);
};
}
Err(_) => return Err(BsonError::FailedToReadRawBsonDocument),
}
}

// Append nulls to all columns not included in the doc.
for (idx, did_set) in cols_set.iter().enumerate() {
if !did_set {
// Add null...
self.add_value_at_index(idx, None)?;
}
}

Expand All @@ -115,6 +150,16 @@ impl RecordStructBuilder {
pub fn into_fields_and_builders(self) -> (Fields, Vec<Box<dyn ArrayBuilder>>) {
(self.fields, self.builders)
}

fn add_value_at_index(&mut self, idx: usize, val: Option<RawBsonRef>) -> Result<()> {
let typ = self.fields.get(idx).unwrap().data_type(); // Programmer error if data type doesn't exist.
let col = self.builders.get_mut(idx).unwrap(); // Programmer error if column doesn't exist.

match val {
Some(v) => append_value(v, typ, col.as_mut()),
None => append_null(typ, col.as_mut()),
}
}
}

impl ArrayBuilder for RecordStructBuilder {
Expand Down Expand Up @@ -288,7 +333,7 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
.as_any_mut()
.downcast_mut::<RecordStructBuilder>()
.unwrap();
builder.append_record(nested)?;
builder.project_and_append(nested)?;
}

// Array
Expand Down Expand Up @@ -410,3 +455,92 @@ fn column_builders_for_fields(

Ok(cols)
}

#[cfg(test)]
mod test {
use bson::oid::ObjectId;

use super::*;

#[test]
fn test_duplicate_field_handling() {
let fields = Fields::from_iter(vec![
Field::new("_id", DataType::Binary, true),
Field::new("idx", DataType::Int64, true),
Field::new("value", DataType::Utf8, true),
]);
let mut rsb = RecordStructBuilder::new_with_capacity(fields, 100).unwrap();
for idx in 0..100 {
let mut buf = bson::RawDocumentBuf::new();

buf.append("_id", ObjectId::new());
buf.append("idx", idx as i64);
buf.append("value", "first");
buf.append("value", "second");
assert_eq!(buf.iter().count(), 4);

rsb.append_record(RawDocument::from_bytes(&buf.into_bytes()).unwrap())
.unwrap();
}
assert_eq!(rsb.len(), 100);
for value in rsb
.builders
.get_mut(2)
.unwrap()
.as_any_mut()
.downcast_mut::<StringBuilder>()
.unwrap()
.finish_cloned()
.iter()
{
let v = value.unwrap();
assert_eq!(v, "first");
}
}

#[test]
fn test_unexpected_schema_change() {
let fields = Fields::from_iter(vec![
Field::new("_id", DataType::Binary, true),
Field::new("idx", DataType::Int64, true),
Field::new("value", DataType::Utf8, true),
]);
let mut rsb = RecordStructBuilder::new_with_capacity(fields, 100).unwrap();
let mut buf = bson::RawDocumentBuf::new();

buf.append("_id", ObjectId::new());
buf.append("idx", 0);
buf.append("value", "first");
assert_eq!(buf.iter().count(), 3);

rsb.append_record(RawDocument::from_bytes(&buf.into_bytes()).unwrap())
.expect("first record matchex expectations");
assert_eq!(rsb.len(), 1);

let mut buf = bson::RawDocumentBuf::new();
buf.append("index", 1);
buf.append("values", 3);
assert_eq!(buf.iter().count(), 2);
rsb.append_record(RawDocument::from_bytes(&buf.clone().into_bytes()).unwrap())
.expect_err("for append_record schema changes are an error");
assert_eq!(rsb.len(), 1);
rsb.project_and_append(RawDocument::from_bytes(&buf.clone().into_bytes()).unwrap())
.expect("project and append should filter out unrequired fields");
assert_eq!(rsb.len(), 2);

let mut buf = bson::RawDocumentBuf::new();
buf.append("_id", ObjectId::new());
buf.append("index", 1);
buf.append("values", 3);
assert_eq!(buf.iter().count(), 3);

rsb.append_record(RawDocument::from_bytes(&buf.clone().into_bytes()).unwrap())
.expect_err("for append_record schema changes are an error");
// the first value was added successfully to another buffer to the rsb grew
assert_eq!(rsb.len(), 3);

rsb.project_and_append(RawDocument::from_bytes(&buf.clone().into_bytes()).unwrap())
.expect("project and append should filter out unrequired fields");
assert_eq!(rsb.len(), 4);
}
}
3 changes: 1 addition & 2 deletions crates/datasources/src/bson/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ impl BsonStream {
let mut builder = RecordStructBuilder::new_with_capacity(schema.fields().to_owned(), 100)?;

for result in results {
// TOOD: shouldn't convert here.
builder.append_record(&result?)?;
builder.project_and_append(&result?)?;
}

let (fields, builders) = builder.into_fields_and_builders();
Expand Down
Binary file added tests/tests/dupes.bson
Binary file not shown.
Loading