chore: bson stream cleanup #3101

Merged 2 commits on Jul 25, 2024
70 changes: 20 additions & 50 deletions crates/datasources/src/bson/table.rs
@@ -1,14 +1,12 @@
use std::collections::VecDeque;
use std::sync::Arc;

use bson::RawDocumentBuf;
use bytes::BytesMut;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::streaming::StreamingTable;
use datafusion::datasource::TableProvider;
use datafusion::parquet::data_type::AsBytes;
use datafusion::physical_plan::streaming::PartitionStream;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use object_store::{ObjectMeta, ObjectStore};
use tokio_util::codec::LengthDelimitedCodec;

@@ -27,6 +25,7 @@ pub async fn bson_streaming_table(
let accessor = ObjStoreAccessor::new(store_access)?;

let mut list = accessor.list_globbed(source_url.path()).await?;

if list.is_empty() {
return Err(BsonError::NotFound(source_url.path().into()));
}
@@ -35,9 +34,13 @@ pub async fn bson_streaming_table(
// sort by location
list.sort_by(|a, b| a.location.cmp(&b.location));

let store = accessor.into_object_store();

bson_streaming_table_inner(store, list, schema, schema_inference_sample_size).await
bson_streaming_table_inner(
accessor.into_object_store(),
list,
schema,
schema_inference_sample_size,
)
.await
}

pub async fn bson_streaming_table_from_object(
@@ -53,14 +56,6 @@ async fn bson_streaming_table_inner(
schema: Option<Schema>,
schema_inference_sample_size: Option<i64>,
) -> Result<Arc<dyn TableProvider>, BsonError> {
// TODO: set a maximum (1024?) or have an adaptive mode
// (at least n but stop after n the same) or skip documents
let sample_size = if schema.is_some() {
0
} else {
schema_inference_sample_size.unwrap_or(100)
};

// build a vector of streams, one for each file, that handle BSON's framing.
let mut readers = VecDeque::with_capacity(list.len());
for obj in list {
@@ -86,23 +81,9 @@ async fn bson_streaming_table_inner(
&obj,
32 * 1024 * 1024, // 32 MB buffer, probably still too small.
))
.map_err(BsonError::from) // convert the error
// convert the chunk of bytes to bson.
.map(
// TODO: this probably wants to be a raw document
// eventually, so we can put all the _fields_ in a map,
// iterate over the document once, and check each bson
// field name against the schema, and only pull out the
// fields that match. This is easier in the short term
// but less performant for large documents where the
// documents are a superset of the schema, we'll end up
// doing much more parsing work than is actually needed
// for the bson documents.
|bt: Result<BytesMut, std::io::Error>| -> Result<RawDocumentBuf, BsonError> {
Ok(bson::de::from_slice::<RawDocumentBuf>(
bt?.freeze().as_bytes().to_owned().as_slice(),
)?)
},
),
.map(|bt| Ok(bson::de::from_reader(bt?.freeze().as_bytes())?)),
);
}
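
The loop above wraps each object's byte stream in a LengthDelimitedCodec and now decodes every frame directly with bson::de::from_reader instead of the old typed closure. Below is a minimal sketch of that framing, assuming a codec configuration that keeps BSON's length prefix in each frame; the crate's actual reader setup is elided from this diff, so treat the builder settings as an illustration rather than the real code.

```rust
// Sketch: BSON documents are self-framing. The first 4 bytes are a
// little-endian length that counts the whole document (prefix included),
// so the codec is told not to strip the prefix and to take exactly that
// many bytes per frame.
use bson::RawDocumentBuf;
use futures::StreamExt;
use tokio_util::codec::LengthDelimitedCodec;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Two empty BSON documents: 4-byte length (5) plus a terminating NUL each.
    let input: &[u8] = &[5, 0, 0, 0, 0, 5, 0, 0, 0, 0];

    let mut frames = LengthDelimitedCodec::builder()
        .little_endian()
        .length_field_length(4)
        .length_adjustment(0) // the prefix already counts the whole document
        .num_skip(0)          // keep the prefix so each frame is a full document
        .new_read(input);

    while let Some(frame) = frames.next().await {
        // Each frame holds one complete document, which RawDocumentBuf expects.
        let doc = bson::de::from_slice::<RawDocumentBuf>(&frame?)?;
        assert_eq!(doc.as_bytes().len(), 5);
    }
    Ok(())
}
```

Keeping the prefix in the frame matters because the raw document parser needs the full document bytes, length header included.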

@@ -113,16 +94,17 @@ async fn bson_streaming_table_inner(
let schema = if let Some(schema) = schema {
Arc::new(schema)
} else {
// TODO: set a maximum (1024?) or have an adaptive mode
// (at least n but stop after n the same) or skip documents
let sample_size = schema_inference_sample_size.unwrap_or(100);

// iterate through the readers and build up a sample of the first <n>
// documents to be used to infer the schema.
let mut sample = Vec::with_capacity(sample_size as usize);
let mut first_active: usize = 0;
'readers: for reader in readers.iter_mut() {
while let Some(res) = reader.next().await {
match res {
Ok(doc) => sample.push(doc),
Err(e) => return Err(e),
};
while let Some(doc) = reader.next().await {
sample.push(doc?);

if sample.len() >= sample_size as usize {
break 'readers;
@@ -143,24 +125,15 @@ async fn bson_streaming_table_inner(
// of as a base-level projection, but we'd need a schema specification
// language). Or have some other strategy for inference rather than
// every unique field from the first <n> documents.
let schema = Arc::new(merge_schemas(
sample
.iter()
.map(|doc| schema_from_document(&doc.to_raw_document_buf())),
)?);
let schema = Arc::new(merge_schemas(sample.iter().map(schema_from_document))?);

// all the documents we read for the sample are hanging around
// somewhere and we want to make sure that callers access
// them: we're going to make a special stream with these
// documents here.
streams.push(Arc::new(BsonPartitionStream::new(
schema.clone(),
futures::stream::iter(
sample
.into_iter()
.map(|doc| -> Result<RawDocumentBuf, BsonError> { Ok(doc) }),
)
.boxed(),
futures::stream::iter(sample.into_iter().map(Ok)).boxed(),
)));

schema
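
For context on the sampling path: each sampled document contributes a per-document schema, and merge_schemas unions them into the table schema. Neither schema_from_document nor merge_schemas appears in this diff, so the sketch below only illustrates the idea using Arrow's own Schema::try_merge, with made-up field names and types.

```rust
// Hypothetical illustration of merging per-document schemas into one
// table schema; real inference maps BSON element types to Arrow types.
use datafusion::arrow::datatypes::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Pretend these came from two sampled documents.
    let doc_a = Schema::new(vec![Field::new("_id", DataType::Utf8, true)]);
    let doc_b = Schema::new(vec![
        Field::new("_id", DataType::Utf8, true),
        Field::new("qty", DataType::Int64, true),
    ]);

    // try_merge unions fields by name and errors on conflicting types.
    let merged = Schema::try_merge([doc_a, doc_b])?;
    assert_eq!(merged.fields().len(), 2);
    Ok(())
}
```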
@@ -176,8 +149,5 @@ async fn bson_streaming_table_inner(
)));
}

Ok(Arc::new(StreamingTable::try_new(
schema.clone(), // <= inferred schema
streams, // <= vector of partition streams
)?))
Ok(Arc::new(StreamingTable::try_new(schema.clone(), streams)?))
}
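
The provider returned at the end is a StreamingTable assembled from PartitionStream implementations: one per BSON reader, plus the extra partition that replays the sampled documents. The sketch below is a hypothetical stand-in for BsonPartitionStream (its definition is not part of this diff) showing how a PartitionStream plugs into StreamingTable::try_new.

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::streaming::StreamingTable;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream;

/// One partition that replays a single, pre-built record batch.
#[derive(Debug)]
struct OneBatchPartition {
    schema: SchemaRef,
    batch: RecordBatch,
}

impl PartitionStream for OneBatchPartition {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        // Wrap a plain futures stream of batches in DataFusion's adapter.
        let batches: Vec<datafusion::error::Result<RecordBatch>> =
            vec![Ok(self.batch.clone())];
        Box::pin(RecordBatchStreamAdapter::new(
            self.schema.clone(),
            futures::stream::iter(batches),
        ))
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int64, false)]));
    let column: ArrayRef = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column])?;

    let partition: Arc<dyn PartitionStream> = Arc::new(OneBatchPartition {
        schema: schema.clone(),
        batch,
    });

    // Same shape as the final call in the diff: a schema plus partition streams.
    let _table = StreamingTable::try_new(schema, vec![partition])?;
    Ok(())
}
```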