diff --git a/crates/datafusion_ext/src/errors.rs b/crates/datafusion_ext/src/errors.rs
index 609583357..619881208 100644
--- a/crates/datafusion_ext/src/errors.rs
+++ b/crates/datafusion_ext/src/errors.rs
@@ -30,9 +30,6 @@ pub enum ExtensionError {
     #[error(transparent)]
     Arrow(#[from] datafusion::arrow::error::ArrowError),
 
-    #[error(transparent)]
-    BsonDeserialize(#[from] bson::de::Error),
-
     #[error(transparent)]
     DecimalError(#[from] decimal::DecimalError),
 
@@ -44,6 +41,9 @@ pub enum ExtensionError {
 
     #[error(transparent)]
     ListingErrBoxed(#[from] Box<dyn std::error::Error + Send + Sync>),
+
+    #[error("object store: {0}")]
+    ObjectStore(String),
 }
 
 impl ExtensionError {
diff --git a/crates/datasources/src/bson/builder.rs b/crates/datasources/src/bson/builder.rs
index 2ca16e283..cea755d08 100644
--- a/crates/datasources/src/bson/builder.rs
+++ b/crates/datasources/src/bson/builder.rs
@@ -5,9 +5,10 @@ use std::sync::Arc;
 
 use bitvec::{order::Lsb0, vec::BitVec};
 use bson::{RawBsonRef, RawDocument};
 use datafusion::arrow::array::{
-    Array, ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Decimal128Builder,
-    Float64Builder, Int32Builder, Int64Builder, StringBuilder, StructArray,
-    TimestampMicrosecondBuilder, TimestampMillisecondBuilder,
+    Array, ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Date64Builder,
+    Decimal128Builder, Float64Builder, Int32Builder, Int64Builder, LargeBinaryBuilder,
+    LargeStringBuilder, StringBuilder, StructArray, TimestampMicrosecondBuilder,
+    TimestampMillisecondBuilder, TimestampSecondBuilder,
 };
 use datafusion::arrow::datatypes::{DataType, Field, Fields, TimeUnit};
@@ -182,13 +183,10 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
     // So robust
     match (val, typ) {
         // Boolean
-        (RawBsonRef::Boolean(v), DataType::Boolean) => {
-            append_scalar!(BooleanBuilder, col, v)
-        }
+        (RawBsonRef::Boolean(v), DataType::Boolean) => append_scalar!(BooleanBuilder, col, v),
         (RawBsonRef::Boolean(v), DataType::Utf8) => {
             append_scalar!(StringBuilder, col, v.to_string())
         }
-
         // Double
         (RawBsonRef::Double(v), DataType::Int32) => append_scalar!(Int32Builder, col, v as i32),
         (RawBsonRef::Double(v), DataType::Int64) => append_scalar!(Int64Builder, col, v as i64),
@@ -214,6 +212,8 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
         }
 
         // String
+        (RawBsonRef::String(v), DataType::Utf8) => append_scalar!(StringBuilder, col, v),
+        (RawBsonRef::String(v), DataType::LargeUtf8) => append_scalar!(LargeStringBuilder, col, v),
         (RawBsonRef::String(v), DataType::Boolean) => {
             append_scalar!(BooleanBuilder, col, v.parse().unwrap_or_default())
         }
@@ -226,31 +226,61 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
         (RawBsonRef::String(v), DataType::Float64) => {
             append_scalar!(Float64Builder, col, v.parse().unwrap_or_default())
         }
-        (RawBsonRef::String(v), DataType::Utf8) => {
-            append_scalar!(StringBuilder, col, v)
-        }
 
         // Binary
         (RawBsonRef::Binary(v), DataType::Binary) => append_scalar!(BinaryBuilder, col, v.bytes),
+        (RawBsonRef::Binary(v), DataType::LargeBinary) => {
+            append_scalar!(LargeBinaryBuilder, col, v.bytes)
+        }
 
         // Object id
+        (RawBsonRef::ObjectId(v), DataType::Binary) => {
+            append_scalar!(BinaryBuilder, col, v.bytes())
+        }
         (RawBsonRef::ObjectId(v), DataType::Utf8) => {
             append_scalar!(StringBuilder, col, v.to_string())
         }
 
-        // Timestamp
+        // Timestamp (internal MongoDB type; seconds precision)
+        (RawBsonRef::Timestamp(v), DataType::Timestamp(TimeUnit::Second, _)) => {
+            append_scalar!(TimestampSecondBuilder, col, v.time as i64)
+        }
+        (RawBsonRef::Timestamp(v), DataType::Timestamp(TimeUnit::Millisecond, _)) => {
+            append_scalar!(TimestampMillisecondBuilder, col, v.time as i64 * 1000)
+        }
         (RawBsonRef::Timestamp(v), DataType::Timestamp(TimeUnit::Microsecond, _)) => {
-            append_scalar!(TimestampMicrosecondBuilder, col, v.time as i64) // TODO: Possibly change to nanosecond.
+            append_scalar!(TimestampMicrosecondBuilder, col, v.time as i64 * 1000 * 1000)
+        }
+        (RawBsonRef::Timestamp(v), DataType::Date64) => {
+            append_scalar!(Date64Builder, col, v.time as i64 * 1000)
+        }
+        (RawBsonRef::Timestamp(v), DataType::Date32) => {
+            append_scalar!(
+                Date32Builder,
+                col,
+                v.time
+                    .try_into()
+                    .map_err(|_| BsonError::UnhandledElementType(
+                        bson::spec::ElementType::Timestamp,
+                        DataType::Date32
+                    ))?
+            )
         }
 
-        // Datetime
+        // Datetime (actual timestamps that you'd use in an application)
+        (RawBsonRef::DateTime(v), DataType::Timestamp(TimeUnit::Millisecond, _)) => {
+            append_scalar!(TimestampMillisecondBuilder, col, v.timestamp_millis())
+        }
         (RawBsonRef::DateTime(v), DataType::Timestamp(TimeUnit::Microsecond, _)) => {
             append_scalar!(
-                TimestampMicrosecondBuilder, // TODO: Possibly change to nanosecond.
+                TimestampMicrosecondBuilder,
                 col,
-                v.timestamp_millis()
+                v.timestamp_millis() * 1000
             )
         }
+        (RawBsonRef::DateTime(v), DataType::Date64) => {
+            append_scalar!(Date64Builder, col, v.timestamp_millis())
+        }
 
         // Document
         (RawBsonRef::Document(nested), DataType::Struct(_)) => {
@@ -263,13 +293,16 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) ->
 
         // Array
         (RawBsonRef::Array(arr), DataType::Utf8) => {
-            // TODO: Proper types.
-            let s = arr
-                .into_iter()
-                .map(|r| r.map(|v| format!("{:?}", v)).unwrap_or_default())
-                .collect::<Vec<_>>()
-                .join(", ");
-            append_scalar!(StringBuilder, col, format!("[{}]", s))
+            append_scalar!(
+                StringBuilder,
+                col,
+                serde_json::Value::try_from(
+                    bson::Array::try_from(arr)
+                        .map_err(|_| BsonError::FailedToReadRawBsonDocument)?
+                )
+                .map_err(|_| BsonError::FailedToReadRawBsonDocument)?
+                .to_string()
+            )
         }
 
         // Decimal128
diff --git a/crates/datasources/src/bson/errors.rs b/crates/datasources/src/bson/errors.rs
index b74a7ee4d..d1775e26c 100644
--- a/crates/datasources/src/bson/errors.rs
+++ b/crates/datasources/src/bson/errors.rs
@@ -1,16 +1,34 @@
 use datafusion::error::DataFusionError;
+use datafusion_ext::errors::ExtensionError;
+
+use crate::object_store::errors::ObjectStoreSourceError;
 
 #[derive(Debug, thiserror::Error)]
 pub enum BsonError {
     #[error("Unsupported bson type: {0}")]
-    UnsupportedBsonType(&'static str),
+    UnsupportedType(&'static str),
 
     #[error("Unexpected datatype for builder {0:?}")]
     UnexpectedDataTypeForBuilder(datafusion::arrow::datatypes::DataType),
 
-    #[error("External Datafusion Error")]
+    #[error(transparent)]
+    IO(#[from] std::io::Error),
+
+    #[error(transparent)]
     Datafusion(#[from] datafusion::error::DataFusionError),
 
+    #[error(transparent)]
+    Arrow(#[from] datafusion::arrow::error::ArrowError),
+
+    #[error(transparent)]
+    Raw(#[from] bson::raw::Error),
+
+    #[error(transparent)]
+    Serialization(#[from] bson::de::Error),
+
+    #[error(transparent)]
+    ObjectStore(#[from] ObjectStoreSourceError),
+
     #[error("Unhandled element type to arrow type conversion; {0:?}, {1}")]
     UnhandledElementType(
         bson::spec::ElementType,
@@ -28,6 +46,9 @@ pub enum BsonError {
 
     #[error("Recursion limit exceeded for schema inferrence: {0}")]
     RecursionLimitExceeded(usize),
+
+    #[error("no objects found: {0}")]
+    NotFound(String),
 }
 
 impl From<BsonError> for DataFusionError {
@@ -36,4 +57,10 @@
     }
 }
 
+impl From<BsonError> for ExtensionError {
+    fn from(e: BsonError) -> Self {
+        ExtensionError::String(e.to_string())
+    }
+}
+
 pub type Result<T, E = BsonError> = std::result::Result<T, E>;
diff --git a/crates/datasources/src/bson/schema.rs b/crates/datasources/src/bson/schema.rs
index c483ad0f6..ee2fb009b 100644
--- a/crates/datasources/src/bson/schema.rs
+++ b/crates/datasources/src/bson/schema.rs
@@ -2,7 +2,7 @@ use std::collections::HashMap;
 use std::iter::IntoIterator;
 
 use bson::{Bson, Document};
-use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
 
 use crate::bson::errors::{BsonError, Result};
 
@@ -11,9 +11,8 @@ use crate::bson::errors::{BsonError, Result};
 /// The MongoDB kernel rejects nesting of greater than 100.
 const RECURSION_LIMIT: usize = 100;
 
-pub fn schema_from_document(doc: &Document) -> Result<Schema> {
-    let fields = fields_from_document(0, doc.iter())?;
-    Ok(Schema::new(fields))
+pub fn schema_from_document(doc: Document) -> Result<Schema> {
+    Ok(Schema::new(fields_from_document(0, doc.iter())?))
 }
 
 fn fields_from_document<'a>(
@@ -39,61 +38,51 @@ fn fields_from_document<'a>(
 }
 
 fn bson_to_arrow_type(depth: usize, bson: &Bson) -> Result<DataType> {
-    let arrow_typ = match bson {
-        Bson::Double(_) => DataType::Float64,
-        Bson::String(val) => {
-            if val.is_empty() {
-                // TODO: We'll want to determine if this is something we should
-                // keep in. Currently when we load in the test file, we're
-                // loading a csv that might have empty values (null). Mongo
-                // reads those in and puts them as empty strings.
-                //
-                // During schema merges, we'll widen "null" types to whatever
-                // type we're merging with. This _should_ be more resilient to
-                // empty values.
-                DataType::Null
-            } else {
-                DataType::Utf8
-            }
-        }
-        Bson::Array(array_doc) => match array_doc.to_owned().pop() {
-            Some(val) => bson_to_arrow_type(0, &val)?,
-            None => DataType::Utf8,
-        },
+    Ok(match bson {
+        Bson::Array(array_doc) => DataType::new_list(
+            bson_to_arrow_type(0, &array_doc.to_owned().pop().unwrap_or(Bson::Null))?,
+            true,
+        ),
         Bson::Document(nested) => {
-            let fields = fields_from_document(depth + 1, nested.iter())?;
-            DataType::Struct(fields.into())
+            DataType::Struct(fields_from_document(depth + 1, nested.iter())?.into())
         }
+        Bson::String(_) => DataType::Utf8,
+        Bson::Double(_) => DataType::Float64,
         Bson::Boolean(_) => DataType::Boolean,
         Bson::Null => DataType::Null,
-        Bson::RegularExpression(_) => DataType::Utf8,
-        Bson::JavaScriptCode(_) => DataType::Utf8,
-        Bson::JavaScriptCodeWithScope(_) => {
-            return Err(BsonError::UnsupportedBsonType("CodeWithScope"))
-        }
         Bson::Int32(_) => DataType::Float64,
         Bson::Int64(_) => DataType::Float64,
-        Bson::Timestamp(_) => return Err(BsonError::UnsupportedBsonType("OplogTimestamp")),
         Bson::Binary(_) => DataType::Binary,
         Bson::ObjectId(_) => DataType::Utf8,
-        Bson::DateTime(_) => DataType::Timestamp(TimeUnit::Millisecond, None),
+        Bson::DateTime(_) => DataType::Date64,
         Bson::Symbol(_) => DataType::Utf8,
         Bson::Decimal128(_) => DataType::Decimal128(38, 10),
         Bson::Undefined => DataType::Null,
+        Bson::RegularExpression(_) => DataType::Utf8,
+        Bson::JavaScriptCode(_) => DataType::Utf8,
+
+        // TODO: storing these (which exist to establish a total order
+        // of types for indexing in the MongoDB server) in documents
+        // that GlareDB would interact with is probably always an
+        // error.
         Bson::MaxKey => DataType::Utf8,
         Bson::MinKey => DataType::Utf8,
-        Bson::DbPointer(_) => return Err(BsonError::UnsupportedBsonType("DbPointer")),
-    };
-    Ok(arrow_typ)
+
+        // Deprecated or MongoDB server internal types
+        Bson::JavaScriptCodeWithScope(_) => return Err(BsonError::UnsupportedType("CodeWithScope")),
+        Bson::Timestamp(_) => return Err(BsonError::UnsupportedType("OplogTimestamp")),
+        Bson::DbPointer(_) => return Err(BsonError::UnsupportedType("DbPointer")),
+    })
 }
 
 #[derive(Debug, Clone)]
 struct OrderedField(usize, Field);
 
-pub fn merge_schemas(schemas: impl IntoIterator<Item = Schema>) -> Result<Schema> {
+pub fn merge_schemas(schemas: impl IntoIterator<Item = Result<Schema>>) -> Result<Schema> {
     let mut fields: HashMap<String, OrderedField> = HashMap::new();
 
     for schema in schemas.into_iter() {
+        let schema = schema?;
         for (idx, field) in schema.fields.into_iter().enumerate() {
             match fields.get_mut(field.name()) {
                 Some(existing) => {
diff --git a/crates/datasources/src/bson/stream.rs b/crates/datasources/src/bson/stream.rs
index e35115dbb..dd272254c 100644
--- a/crates/datasources/src/bson/stream.rs
+++ b/crates/datasources/src/bson/stream.rs
@@ -1,32 +1,35 @@
 use std::pin::Pin;
-use std::sync::Arc;
-use std::sync::Mutex;
+use std::sync::{Arc, Mutex};
 use std::task::{Context, Poll};
 
+use bson::RawDocumentBuf;
+use datafusion::arrow::datatypes::{Schema, SchemaRef};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::error::DataFusionError;
 use datafusion::execution::TaskContext;
 use datafusion::physical_plan::streaming::PartitionStream;
-use datafusion::physical_plan::SendableRecordBatchStream;
+use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
 use futures::Stream;
 use futures::StreamExt;
 
-use bson::Document;
-use datafusion::arrow::datatypes::{Schema, SchemaRef};
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::error::DataFusionError;
-use datafusion::physical_plan::RecordBatchStream;
-
 use super::builder::RecordStructBuilder;
+use super::errors::BsonError;
+
+pub type SendableDocumentStream =
+    Pin<Box<dyn Stream<Item = Result<RawDocumentBuf, BsonError>> + Send>>;
 
 pub struct BsonStream {
     schema: Arc<Schema>,
-    stream: Pin<Box<dyn Stream<Item = Result<RecordBatch, DataFusionError>> + Send>>,
+    stream: Pin<Box<dyn Stream<Item = Result<RecordBatch, BsonError>> + Send>>,
 }
 
 impl Stream for BsonStream {
     type Item = Result<RecordBatch, DataFusionError>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        self.stream.poll_next_unpin(cx)
+        self.stream
+            .poll_next_unpin(cx)
+            .map_err(DataFusionError::from)
     }
 }
 
@@ -37,10 +40,7 @@ impl RecordBatchStream for BsonStream {
 }
 
 impl BsonStream {
-    pub fn new(
-        schema: Arc<Schema>,
-        docs: Pin<Box<dyn Stream<Item = Result<Document, DataFusionError>> + Send>>,
-    ) -> Self {
+    pub fn new(schema: Arc<Schema>, docs: SendableDocumentStream) -> Self {
         let stream_schema = schema.clone();
 
         let stream = docs
@@ -52,16 +52,14 @@ impl BsonStream {
     }
 
     fn convert_chunk(
-        results: Vec<Result<Document, DataFusionError>>,
+        results: Vec<Result<RawDocumentBuf, BsonError>>,
         schema: &Arc<Schema>,
-    ) -> Result<RecordBatch, DataFusionError> {
+    ) -> Result<RecordBatch, BsonError> {
         let mut builder = RecordStructBuilder::new_with_capacity(schema.fields().to_owned(), 100)?;
 
         for result in results {
-            let item = result?;
-            let raw = bson::RawDocumentBuf::try_from(&item)
-                .map_err(|e| DataFusionError::External(Box::new(e)))?;
-            builder.append_record(&raw)?;
+            // TODO: shouldn't convert here.
+            builder.append_record(&result?)?;
         }
 
         let (fields, builders) = builder.into_fields_and_builders();
@@ -74,9 +72,6 @@ impl BsonStream {
     }
 }
 
-pub type SendableDocumentStream =
-    Pin<Box<dyn Stream<Item = Result<Document, DataFusionError>> + Send>>;
-
 pub struct BsonPartitionStream {
     schema: Arc<Schema>,
     stream: Mutex<Option<SendableDocumentStream>>,
diff --git a/crates/datasources/src/bson/table.rs b/crates/datasources/src/bson/table.rs
index 10b7e4b31..6d2679de5 100644
--- a/crates/datasources/src/bson/table.rs
+++ b/crates/datasources/src/bson/table.rs
@@ -1,9 +1,8 @@
 use std::collections::VecDeque;
 use std::sync::Arc;
 
-use bson::Document;
+use bson::RawDocumentBuf;
 use bytes::BytesMut;
-use datafusion::common::DataFusionError;
 use datafusion::datasource::streaming::StreamingTable;
 use datafusion::datasource::TableProvider;
 use datafusion::parquet::data_type::AsBytes;
@@ -11,8 +10,7 @@ use datafusion::physical_plan::streaming::PartitionStream;
 use futures::StreamExt;
 use tokio_util::codec::LengthDelimitedCodec;
 
-use datafusion_ext::errors::ExtensionError;
-
+use crate::bson::errors::BsonError;
 use crate::bson::schema::{merge_schemas, schema_from_document};
 use crate::bson::stream::BsonPartitionStream;
 use crate::common::url::DatasourceUrl;
@@ -23,28 +21,21 @@ pub async fn bson_streaming_table(
     store_access: GenericStoreAccess,
     schema_inference_sample_size: Option,
     source_url: DatasourceUrl,
-) -> Result<Arc<dyn TableProvider>, ExtensionError> {
+) -> Result<Arc<dyn TableProvider>, BsonError> {
 
     // TODO: set a maximum (1024?) or have an adaptive mode
     // (at least n but stop after n the same) or skip documents
     let sample_size = schema_inference_sample_size.unwrap_or(100);
     let path = source_url.path();
 
-    let store = store_access
-        .create_store()
-        .map_err(|e| ExtensionError::Arrow(e.into()))?;
+    let store = store_access.create_store()?;
 
     // assume that the file type is a glob and see if there are
     // more files...
-    let mut list = store_access
-        .list_globbed(&store, path.as_ref())
-        .await
-        .map_err(|e| ExtensionError::Arrow(e.into()))?;
+    let mut list = store_access.list_globbed(&store, path.as_ref()).await?;
 
     if list.is_empty() {
-        return Err(ExtensionError::String(format!(
-            "no matching objects for '{path}'"
-        )));
+        return Err(BsonError::NotFound(path.into_owned()));
     }
 
     // for consistent results, particularly for the sample, always
@@ -87,12 +78,10 @@ pub async fn bson_streaming_table(
                 // docuemnts are a superset of the schema, we'll end up
                 // doing much more parsing work than is actually needed
                 // for the bson documents.
-                |bt: Result<BytesMut, std::io::Error>| -> Result<Document, DataFusionError> {
-                    Ok(bson::de::from_slice::<Document>(
+                |bt: Result<BytesMut, std::io::Error>| -> Result<RawDocumentBuf, BsonError> {
+                    Ok(bson::de::from_slice::<RawDocumentBuf>(
                         bt?.freeze().as_bytes().to_owned().as_slice(),
-                    )
-                    .map_err(|e| DataFusionError::External(Box::new(e)))?
-                    .to_owned())
+                    )?)
                 },
             ),
         );
@@ -100,13 +89,13 @@ pub async fn bson_streaming_table(
 
     // iterate through the readers and build up a sample of the first
     // documents to be used to infer the schema.
-    let mut sample = Vec::<Document>::with_capacity(sample_size as usize);
+    let mut sample = Vec::with_capacity(sample_size as usize);
     let mut first_active: usize = 0;
     'readers: for reader in readers.iter_mut() {
         while let Some(res) = reader.next().await {
             match res {
                 Ok(doc) => sample.push(doc),
-                Err(e) => return Err(e.into()),
+                Err(e) => return Err(e),
             };
 
             if sample.len() >= sample_size as usize {
@@ -128,10 +117,11 @@ pub async fn bson_streaming_table(
     // of as a base-level projection, but we'd need a schema specification
    // language). Or have some other strategy for inference rather than
     // every unique field from the first documents.
-    let schema = Arc::new(
-        merge_schemas(sample.iter().map(|doc| schema_from_document(doc).unwrap()))
-            .map_err(|e| ExtensionError::String(e.to_string()))?,
-    );
+    let schema = Arc::new(merge_schemas(
+        sample
+            .iter()
+            .map(|doc| schema_from_document(doc.to_document()?)),
+    )?);
 
     let mut streams = Vec::<Arc<dyn PartitionStream>>::with_capacity(readers.len() + 1);
 
@@ -144,7 +134,7 @@ pub async fn bson_streaming_table(
         futures::stream::iter(
             sample
                 .into_iter()
-                .map(|doc| -> Result<Document, DataFusionError> { Ok(doc) }),
+                .map(|doc| -> Result<RawDocumentBuf, BsonError> { Ok(doc) }),
         )
         .boxed(),
     )));
diff --git a/crates/datasources/src/mongodb/errors.rs b/crates/datasources/src/mongodb/errors.rs
index f0242e7a0..aa449e771 100644
--- a/crates/datasources/src/mongodb/errors.rs
+++ b/crates/datasources/src/mongodb/errors.rs
@@ -10,13 +10,16 @@ pub enum MongoError {
     InvalidProtocol(String),
 
     #[error(transparent)]
-    Mongo(#[from] mongodb::error::Error),
+    MongoDB(#[from] mongodb::error::Error),
 
     #[error(transparent)]
     Arrow(#[from] datafusion::arrow::error::ArrowError),
 
     #[error(transparent)]
     Bson(#[from] crate::bson::errors::BsonError),
+
+    #[error(transparent)]
+    RawBSON(#[from] mongodb::bson::raw::Error),
 }
 
 pub type Result<T, E = MongoError> = std::result::Result<T, E>;
diff --git a/crates/datasources/src/mongodb/infer.rs b/crates/datasources/src/mongodb/infer.rs
index 7e5a2a8aa..c034e5ced 100644
--- a/crates/datasources/src/mongodb/infer.rs
+++ b/crates/datasources/src/mongodb/infer.rs
@@ -41,7 +41,7 @@ impl TableSampler {
         let mut schemas = Vec::with_capacity(sample_count as usize);
 
         while let Some(doc) = cursor.try_next().await? {
-            let schema = schema_from_document(&doc)?;
+            let schema = schema_from_document(doc);
             schemas.push(schema);
         }
 
diff --git a/crates/datasources/src/object_store/errors.rs b/crates/datasources/src/object_store/errors.rs
index fa12030f3..74e460491 100644
--- a/crates/datasources/src/object_store/errors.rs
+++ b/crates/datasources/src/object_store/errors.rs
@@ -1,4 +1,5 @@
 use datafusion::arrow::error::ArrowError;
+use datafusion_ext::errors::ExtensionError;
 
 #[derive(Debug, thiserror::Error)]
 pub enum ObjectStoreSourceError {
@@ -47,3 +48,9 @@ impl From<ObjectStoreSourceError> for ArrowError {
         ArrowError::ExternalError(Box::new(e))
     }
 }
+
+impl From<ObjectStoreSourceError> for ExtensionError {
+    fn from(e: ObjectStoreSourceError) -> Self {
+        ExtensionError::ObjectStore(e.to_string())
+    }
+}
diff --git a/crates/sqlbuiltins/src/functions/table/bson.rs b/crates/sqlbuiltins/src/functions/table/bson.rs
index e1402dfe4..c02b9824f 100644
--- a/crates/sqlbuiltins/src/functions/table/bson.rs
+++ b/crates/sqlbuiltins/src/functions/table/bson.rs
@@ -19,8 +19,8 @@ pub struct BsonScan;
 
 impl ConstBuiltinFunction for BsonScan {
     const NAME: &'static str = "read_bson";
-    const DESCRIPTION: &'static str = "Reads one or more bson files. Supports globbing.";
-    const EXAMPLE: &'static str = "SELECT * FROM bson_scan('file:///path/to/table*.bson')";
+    const DESCRIPTION: &'static str = "Reads one or more BSON files. Supports globbing.";
+    const EXAMPLE: &'static str = "SELECT * FROM read_bson('./path/to/table*.bson')";
     const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning;
 }
 
@@ -69,9 +69,8 @@ impl TableFunc for BsonScan {
         let store_access = GenericStoreAccess::new_from_location_and_opts(
             source_url.to_string().as_str(),
             storage_options,
-        )
-        .map_err(|e| ExtensionError::Arrow(e.into()))?;
+        )?;
 
-        bson_streaming_table(store_access, Some(sample_size), source_url).await
+        Ok(bson_streaming_table(store_access, Some(sample_size), source_url).await?)
     }
 }
diff --git a/crates/sqlexec/src/dispatch/mod.rs b/crates/sqlexec/src/dispatch/mod.rs
index 7ed5b36cd..03e88c0fd 100644
--- a/crates/sqlexec/src/dispatch/mod.rs
+++ b/crates/sqlexec/src/dispatch/mod.rs
@@ -86,6 +86,8 @@ pub enum DispatchError {
     #[error(transparent)]
     SqlServerError(#[from] datasources::sqlserver::errors::SqlServerError),
     #[error(transparent)]
+    BsonDatasource(#[from] datasources::bson::errors::BsonError),
+    #[error(transparent)]
     NativeDatasource(#[from] datasources::native::errors::NativeError),
     #[error(transparent)]
     CommonDatasource(#[from] datasources::common::errors::DatasourceCommonError),
diff --git a/scripts/create-test-mongo-db.sh b/scripts/create-test-mongo-db.sh
index a4c62a8e9..f8ccd9aa3 100755
--- a/scripts/create-test-mongo-db.sh
+++ b/scripts/create-test-mongo-db.sh
@@ -36,6 +36,7 @@ docker cp \
 
 docker exec $CONTAINER_ID mongoimport \
     --type csv \
     --headerline \
+    --ignoreBlanks \
     "mongodb://localhost:27017/${DB_NAME}" \
     /tmp/bikeshare_stations.csv 1>&2