Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bson builder should handle schema flexibly #2334

Merged
Merged 12 commits on Jan 2, 2024
105 changes: 96 additions & 9 deletions crates/datasources/src/bson/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,11 @@ impl RecordStructBuilder {
.ok_or_else(|| BsonError::ColumnNotInInferredSchema(key.to_string()))?;

if *cols_set.get(idx).unwrap() {
println!("DUPLICATE SET: {}, {:?}", key, doc);
continue;
}

// Add value to columns.
let typ = self.fields.get(idx).unwrap().data_type(); // Programmer error if data type doesn't exist.
let col = self.builders.get_mut(idx).unwrap(); // Programmer error if this doesn't exist.
append_value(val, typ, col.as_mut())?;
self.add_value_at_index(idx, Some(val))?;

// Track which columns we've added values to.
cols_set.set(idx, true);
Expand All @@ -102,10 +100,46 @@ impl RecordStructBuilder {
// Append nulls to all columns not included in the doc.
for (idx, did_set) in cols_set.iter().enumerate() {
if !did_set {
// Add nulls...
let typ = self.fields.get(idx).unwrap().data_type(); // Programmer error if data type doesn't exist.
let col = self.builders.get_mut(idx).unwrap(); // Programmer error if column doesn't exist.
append_null(typ, col.as_mut())?;
// Add null...
self.add_value_at_index(idx, None)?;
}
}

Ok(())
}

pub fn project_and_append(&mut self, doc: &RawDocument) -> Result<()> {
let mut cols_set: BitVec<u8, Lsb0> = BitVec::repeat(false, self.fields.len());

for iter_result in doc {
match iter_result {
Ok((key, val)) => {
if let Some(&idx) = self.field_index.get(key) {
if cols_set.get(idx).is_some_and(|v| v == true) {
// If this happens it means that the bson document has a field
// name that appears more than once. This is legal and possible to build
// with some libraries but isn't forbidden, and (I think?) historically
// not (always?) rejected by MongoDB. Regardless "ignoring second
// appearances of the key" is a reasonable semantic.
continue;
}

// Add value to columns.
self.add_value_at_index(idx, Some(val))?;

// Track which columns we've added values to.
cols_set.set(idx, true);
};
}
Err(_) => return Err(BsonError::FailedToReadRawBsonDocument),
}
}

// Append nulls to all columns not included in the doc.
for (idx, did_set) in cols_set.iter().enumerate() {
if !did_set {
// Add null...
self.add_value_at_index(idx, None)?;
}
}

Expand All @@ -115,6 +149,16 @@ impl RecordStructBuilder {
pub fn into_fields_and_builders(self) -> (Fields, Vec<Box<dyn ArrayBuilder>>) {
(self.fields, self.builders)
}

/// Appends `val` to the builder for column `idx`, writing a null when
/// `val` is `None`.
///
/// # Panics
///
/// Panics if `idx` is out of range for the schema. `fields` and
/// `builders` are parallel vectors constructed together, so any index
/// valid for one is valid for the other; an out-of-range index here is
/// a programmer error, stated explicitly via `expect`.
fn add_value_at_index(&mut self, idx: usize, val: Option<RawBsonRef>) -> Result<()> {
    let typ = self
        .fields
        .get(idx)
        .expect("field exists at index")
        .data_type();
    let col = self
        .builders
        .get_mut(idx)
        .expect("builder exists at index");

    match val {
        Some(v) => append_value(v, typ, col.as_mut()),
        None => append_null(typ, col.as_mut()),
    }
}
}

impl ArrayBuilder for RecordStructBuilder {
Expand Down Expand Up @@ -288,7 +332,7 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
.as_any_mut()
.downcast_mut::<RecordStructBuilder>()
.unwrap();
builder.append_record(nested)?;
builder.project_and_append(nested)?;
}

// Array
Expand Down Expand Up @@ -410,3 +454,46 @@ fn column_builders_for_fields(

Ok(cols)
}

#[cfg(test)]
mod test {
    use bson::oid::ObjectId;

    use super::*;

    /// A duplicate key in a document must not clobber the value from its
    /// first appearance: only the first `value` field should be appended.
    #[test]
    fn test_duplicate_field_handling() {
        let fields = Fields::from_iter(vec![
            Field::new("_id", DataType::Binary, true),
            Field::new("idx", DataType::Int64, true),
            Field::new("value", DataType::Utf8, true),
        ]);
        let mut rsb = RecordStructBuilder::new_with_capacity(fields, 100).unwrap();

        for idx in 0..100i64 {
            let mut buf = bson::RawDocumentBuf::new();
            buf.append("_id", ObjectId::new());
            buf.append("idx", idx);
            buf.append("value", "first");
            // Duplicate key: legal BSON, should be ignored on append.
            buf.append("value", "second");
            assert_eq!(buf.iter().count(), 4);

            let bytes = buf.into_bytes();
            let doc = RawDocument::from_bytes(&bytes).unwrap();
            rsb.append_record(doc).unwrap();
        }

        assert_eq!(rsb.len(), 100);

        // Every row of the "value" column must hold the first occurrence.
        let values = rsb
            .builders
            .get_mut(2)
            .unwrap()
            .as_any_mut()
            .downcast_mut::<StringBuilder>()
            .unwrap()
            .finish_cloned();
        for value in values.iter() {
            assert_eq!(value.unwrap(), "first");
        }
    }
}
3 changes: 1 addition & 2 deletions crates/datasources/src/bson/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ impl BsonStream {
let mut builder = RecordStructBuilder::new_with_capacity(schema.fields().to_owned(), 100)?;

for result in results {
// TOOD: shouldn't convert here.
builder.append_record(&result?)?;
builder.project_and_append(&result?)?;
}

let (fields, builders) = builder.into_fields_and_builders();
Expand Down
Binary file added tests/tests/dupes.bson
tychoish marked this conversation as resolved.
Show resolved Hide resolved
Binary file not shown.
93 changes: 93 additions & 0 deletions tests/tests/test_bson.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,96 @@ def test_read_bson(
assert len(row) == 5
assert row["beatle_name"] in beatles
assert beatles.index(row["beatle_name"]) == row["beatle_idx"] - 1


def test_read_bson_flexible_schema(
    glaredb_connection: psycopg2.extensions.connection,
    tmp_path_factory: pytest.TempPathFactory,
):
    """Read a BSON file whose documents do not all share one schema.

    Mixes full "beatle" documents with shorter summary documents so the
    reader must project each document onto the inferred schema: missing
    columns are padded with nulls, and fields that are not part of the
    inferred schema (``count``, ``birth_year``) are dropped.

    NOTE(review): renamed from ``test_read_bson`` — the module already
    defines a test with that name, and the duplicate definition silently
    shadowed it so only one of the two ever ran.
    """
    beatles = {"john": (0, 1940), "paul": (1, 1942), "george": (2, 1943), "ringo": (3, 1940)}

    tmp_dir = tmp_path_factory.mktemp(basename="read-bson-beatles-", numbered=True)
    data_path = tmp_dir.joinpath("beatles.208.bson")

    counts = {"john": 0, "paul": 0, "george": 0, "ringo": 0}

    keys = [b for b in beatles.keys()]

    # Construct a BSON file with four sections: 100 full documents, 4
    # summary documents, 100 documents with extra fields, and 4 more
    # summary documents — 208 documents in total.
    with open(data_path, "wb") as f:
        for i in range(100):
            beatle = random.choice(keys)
            counts[beatle] += 1
            f.write(
                bson.encode(
                    {
                        "_id": bson.objectid.ObjectId(),
                        "beatle_idx": beatles[beatle][0],
                        "beatle_name": beatle,
                        "case": i + 1,
                    }
                )
            )

        # Summary documents: "count" never appears in the leading documents
        # used for schema inference, so it should not surface as a column.
        for beatle in counts:
            f.write(
                bson.encode(
                    {
                        "_id": bson.objectid.ObjectId(),
                        "beatle_name": beatle,
                        "count": counts[beatle],
                    }
                )
            )

        for i in range(100):
            beatle = random.choice(keys)
            counts[beatle] += 1
            f.write(
                bson.encode(
                    {
                        "_id": bson.objectid.ObjectId(),
                        "beatle_name": beatle,
                        "birth_year": beatles[beatle][1],
                        "beatle_idx": beatles[beatle][0],
                    }
                )
            )

        for beatle in counts:
            f.write(
                bson.encode(
                    {
                        "_id": bson.objectid.ObjectId(),
                        "beatle_name": beatle,
                        "count": counts[beatle],
                    }
                )
            )

    with glaredb_connection.cursor() as curr:
        curr.execute(
            f"create external table bson_beatles from bson options ( location='{data_path}', file_type='bson')"
        )

    # Exercise both the external table and the table function paths.
    for from_clause in ["bson_beatles", f"read_bson('{data_path}')"]:
        with glaredb_connection.cursor() as curr:
            curr.execute(f"select count(*) from {from_clause}")
            r = curr.fetchone()
            assert r[0] == 208

        with glaredb_connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as curr:
            curr.execute(f"select * from {from_clause}")
            rows = curr.fetchall()
            assert len(rows) == 208
            meta_docs = 0
            for row in rows:
                assert len(row) == 4
                assert row["beatle_name"] in beatles
                assert "count" not in row, "count is not inferred in the schema"

                # Summary documents lack beatle_idx; the reader pads it
                # with null, which is how we identify them here.
                if row["beatle_idx"] is not None:
                    assert beatles[row["beatle_name"]][0] == row["beatle_idx"]
                else:
                    meta_docs += 1

            assert meta_docs == 8
Loading