Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bson builder should handle schema flexibly #2334

Merged
merged 12 commits into from
Jan 2, 2024
62 changes: 53 additions & 9 deletions crates/datasources/src/bson/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,11 @@ impl RecordStructBuilder {
.ok_or_else(|| BsonError::ColumnNotInInferredSchema(key.to_string()))?;

if *cols_set.get(idx).unwrap() {
println!("DUPLICATE SET: {}, {:?}", key, doc);
continue;
}

// Add value to columns.
let typ = self.fields.get(idx).unwrap().data_type(); // Programmer error if data type doesn't exist.
let col = self.builders.get_mut(idx).unwrap(); // Programmer error if this doesn't exist.
append_value(val, typ, col.as_mut())?;
self.add_value_at_index(idx, Some(val))?;

// Track which columns we've added values to.
cols_set.set(idx, true);
Expand All @@ -102,10 +100,46 @@ impl RecordStructBuilder {
// Append nulls to all columns not included in the doc.
for (idx, did_set) in cols_set.iter().enumerate() {
if !did_set {
// Add nulls...
let typ = self.fields.get(idx).unwrap().data_type(); // Programmer error if data type doesn't exist.
let col = self.builders.get_mut(idx).unwrap(); // Programmer error if column doesn't exist.
append_null(typ, col.as_mut())?;
// Add null...
self.add_value_at_index(idx, None)?;
}
}

Ok(())
}

pub fn project_and_append(&mut self, doc: &RawDocument) -> Result<()> {
let mut cols_set: BitVec<u8, Lsb0> = BitVec::repeat(false, self.fields.len());

for iter_result in doc {
match iter_result {
Ok((key, val)) => {
if let Some(&idx) = self.field_index.get(key) {
if cols_set.get(idx).is_some_and(|v| v == true) {
// If this happens it means that the bson document has a field
// name that appears more than once. This is legal and possible to build
// with some libraries but isn't forbidden, and (I think?) historically
// not (always?) rejected by MongoDB. Regardless "ignoring second
// appearances of the key" is a reasonable semantic.
continue;
}

// Add value to columns.
self.add_value_at_index(idx, Some(val))?;

// Track which columns we've added values to.
cols_set.set(idx, true);
};
}
Err(_) => return Err(BsonError::FailedToReadRawBsonDocument),
}
}

// Append nulls to all columns not included in the doc.
for (idx, did_set) in cols_set.iter().enumerate() {
if !did_set {
// Add null...
self.add_value_at_index(idx, None)?;
}
}

Expand All @@ -115,6 +149,16 @@ impl RecordStructBuilder {
pub fn into_fields_and_builders(self) -> (Fields, Vec<Box<dyn ArrayBuilder>>) {
(self.fields, self.builders)
}

fn add_value_at_index(&mut self, idx: usize, val: Option<RawBsonRef>) -> Result<()> {
let typ = self.fields.get(idx).unwrap().data_type(); // Programmer error if data type doesn't exist.
let col = self.builders.get_mut(idx).unwrap(); // Programmer error if column doesn't exist.

match val {
Some(v) => append_value(v, typ, col.as_mut()),
None => append_null(typ, col.as_mut()),
}
}
}

impl ArrayBuilder for RecordStructBuilder {
Expand Down Expand Up @@ -288,7 +332,7 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
.as_any_mut()
.downcast_mut::<RecordStructBuilder>()
.unwrap();
builder.append_record(nested)?;
builder.project_and_append(nested)?;
}

// Array
Expand Down
3 changes: 1 addition & 2 deletions crates/datasources/src/bson/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ impl BsonStream {
let mut builder = RecordStructBuilder::new_with_capacity(schema.fields().to_owned(), 100)?;

for result in results {
// TOOD: shouldn't convert here.
builder.append_record(&result?)?;
builder.project_and_append(&result?)?;
}

let (fields, builders) = builder.into_fields_and_builders();
Expand Down
Binary file added tests/tests/dupes.bson
tychoish marked this conversation as resolved.
Show resolved Hide resolved
Binary file not shown.
7 changes: 7 additions & 0 deletions tests/tests/scripts/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/glaredb/glaredb/tests

go 1.21.5

require github.com/tychoish/fun v0.10.8

require github.com/tychoish/birch v0.2.3-0.20230825153023-79636ce03829 // indirect
6 changes: 6 additions & 0 deletions tests/tests/scripts/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
github.com/tychoish/birch v0.2.2 h1:+qiWibwCa8UoJvRFlpUxF6rLlGfnbNWKcIL8dkvY4Lo=
github.com/tychoish/birch v0.2.2/go.mod h1:k/5kBT4z40uY6vqQPJ3IBOU6E3/YMyIlUkerx3qY/Vo=
github.com/tychoish/birch v0.2.3-0.20230825153023-79636ce03829 h1:Lt2IbZbnSk49LfYZTyfCz22Pd5q6siVhS7xj3ELzIco=
github.com/tychoish/birch v0.2.3-0.20230825153023-79636ce03829/go.mod h1:Gv3VW01RrjmgGWJgxpT+wh71IKFV2vJOe7M00i+TFgE=
github.com/tychoish/fun v0.10.8 h1:NBC8ug2cRYGKzlD70+r/tz4lkwfKAshFv9aaxHoYNwQ=
github.com/tychoish/fun v0.10.8/go.mod h1:ZZfrwtsnHHV81ecZCBPp57DjjYY9Io39JH2QSXNpKn4=
27 changes: 27 additions & 0 deletions tests/tests/scripts/make_bad_bson.go
tychoish marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"os"

"github.com/tychoish/birch"
"github.com/tychoish/birch/types"
"github.com/tychoish/fun"
"github.com/tychoish/fun/ft"
)

func main() {
fn, err := os.Create("dupes.bson")
fun.Invariant.Must(err)
defer func() { fun.Invariant.Must(fn.Close()) }()

for i := 0; i < 100; i++ {
n := ft.Must(
birch.DC.Make(3).Append(
birch.EC.ObjectID("_id", types.NewObjectID()),
birch.EC.Int32("idx", 1),
birch.EC.String("dupe", "first"),
birch.EC.String("dupe", "second"),
).WriteTo(fn))
fun.Invariant.IsTrue(n == 64, "unexpected document length", n)
}
}
118 changes: 118 additions & 0 deletions tests/tests/test_bson.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,121 @@ def test_read_bson(
assert len(row) == 5
assert row["beatle_name"] in beatles
assert beatles.index(row["beatle_name"]) == row["beatle_idx"] - 1


def test_read_bson_multiple_fields(
glaredb_connection: psycopg2.extensions.connection,
tmp_path_factory: pytest.TempPathFactory,
):
data_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "dupes.bson")

with glaredb_connection.cursor() as curr:
curr.execute(
f"create external table dupes from bson options ( location='{data_path}', file_type='bson')"
)

for from_clause in ["dupes", f"read_bson('{data_path}')"]:
with glaredb_connection.cursor() as curr:
curr.execute(f"select count(*) from {from_clause}")
r = curr.fetchone()
assert r[0] == 100
with glaredb_connection.cursor() as curr:
curr.execute(f"select * from {from_clause}")
rows = curr.fetchall()
assert len(rows) == 100
for row in rows:
assert len(row) == 3
assert row[2] == "first", row


def test_read_bson(
tychoish marked this conversation as resolved.
Show resolved Hide resolved
glaredb_connection: psycopg2.extensions.connection,
tmp_path_factory: pytest.TempPathFactory,
):
beatles = {"john": (0, 1940), "paul": (1, 1942), "george": (2, 1943), "ringo": (3, 1940)}

tmp_dir = tmp_path_factory.mktemp(basename="read-bson-beatles-", numbered=True)
data_path = tmp_dir.joinpath("beatles.208.bson")

counts = {"john": 0, "paul": 0, "george": 0, "ringo": 0}

keys = [b for b in beatles.keys()]
# construct a
with open(data_path, "wb") as f:
for i in range(100):
beatle = random.choice(keys)
counts[beatle] += 1
f.write(
bson.encode(
{
"_id": bson.objectid.ObjectId(),
"beatle_idx": beatles[beatle][0],
"beatle_name": beatle,
"case": i + 1,
}
)
)

for beatle in counts:
f.write(
bson.encode(
{
"_id": bson.objectid.ObjectId(),
"beatle_name": beatle,
"count": counts[beatle],
}
)
)

for i in range(100):
beatle = random.choice(keys)
counts[beatle] += 1
f.write(
bson.encode(
{
"_id": bson.objectid.ObjectId(),
"beatle_name": beatle,
"birth_year": beatles[beatle][1],
"beatle_idx": beatles[beatle][0],
}
)
)

for beatle in counts:
f.write(
bson.encode(
{
"_id": bson.objectid.ObjectId(),
"beatle_name": beatle,
"count": counts[beatle],
}
)
)

with glaredb_connection.cursor() as curr:
curr.execute(
f"create external table bson_beatles from bson options ( location='{data_path}', file_type='bson')"
)

for from_clause in ["bson_beatles", f"read_bson('{data_path}')"]:
with glaredb_connection.cursor() as curr:
curr.execute(f"select count(*) from {from_clause}")
r = curr.fetchone()
assert r[0] == 208

with glaredb_connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as curr:
curr.execute(f"select * from {from_clause}")
rows = curr.fetchall()
assert len(rows) == 208
meta_docs = 0
for row in rows:
assert len(row) == 4
assert row["beatle_name"] in beatles
assert "count" not in row, "count is not inferred in the schema"

if row["beatle_idx"] is not None:
assert beatles[row["beatle_name"]][0] == row["beatle_idx"]
else:
meta_docs += 1

assert meta_docs == 8
Loading