From 82021c67b7803eace8f8bd6d03c6dd97706fa2ca Mon Sep 17 00:00:00 2001 From: tycho garen Date: Mon, 5 Feb 2024 15:35:16 -0500 Subject: [PATCH 1/8] feat: add object store table support for bson format data --- crates/datasources/src/bson/table.rs | 3 +- .../sqlbuiltins/src/functions/table/bson.rs | 2 +- crates/sqlexec/src/dispatch/external.rs | 54 +++++++++++-------- 3 files changed, 35 insertions(+), 24 deletions(-) diff --git a/crates/datasources/src/bson/table.rs b/crates/datasources/src/bson/table.rs index 613593d70..5288fa5cb 100644 --- a/crates/datasources/src/bson/table.rs +++ b/crates/datasources/src/bson/table.rs @@ -14,11 +14,10 @@ use crate::bson::errors::BsonError; use crate::bson::schema::{merge_schemas, schema_from_document}; use crate::bson::stream::BsonPartitionStream; use crate::common::url::DatasourceUrl; -use crate::object_store::generic::GenericStoreAccess; use crate::object_store::ObjStoreAccess; pub async fn bson_streaming_table( - store_access: GenericStoreAccess, + store_access: Arc, schema_inference_sample_size: Option, source_url: DatasourceUrl, ) -> Result, BsonError> { diff --git a/crates/sqlbuiltins/src/functions/table/bson.rs b/crates/sqlbuiltins/src/functions/table/bson.rs index 1765d4bce..f335f2d1a 100644 --- a/crates/sqlbuiltins/src/functions/table/bson.rs +++ b/crates/sqlbuiltins/src/functions/table/bson.rs @@ -70,6 +70,6 @@ impl TableFunc for BsonScan { storage_options, )?; - Ok(bson_streaming_table(store_access, Some(sample_size), source_url).await?) + Ok(bson_streaming_table(Arc::new(store_access), Some(sample_size), source_url).await?) } } diff --git a/crates/sqlexec/src/dispatch/external.rs b/crates/sqlexec/src/dispatch/external.rs index bec780997..6665d7bcb 100644 --- a/crates/sqlexec/src/dispatch/external.rs +++ b/crates/sqlexec/src/dispatch/external.rs @@ -533,10 +533,12 @@ impl<'a> ExternalDispatcher<'a> { storage_options.to_owned(), )?; let source_url = DatasourceUrl::try_new(location)?; - Ok( - bson_streaming_table(store_access, schema_sample_size.to_owned(), source_url) - .await?, + Ok(bson_streaming_table( + Arc::new(store_access), + schema_sample_size.to_owned(), + source_url, ) + .await?) } TableOptions::Cassandra(TableOptionsCassandra { host, @@ -572,27 +574,37 @@ impl<'a> ExternalDispatcher<'a> { .transpose()? .unwrap_or(FileCompressionType::UNCOMPRESSED); - let ft: FileType = file_type.parse()?; - let ft: Arc = match ft { - FileType::CSV => Arc::new( - CsvFormat::default() - .with_file_compression_type(compression) - .with_schema_infer_max_rec(Some(20480)), - ), - FileType::PARQUET => Arc::new(ParquetFormat::default()), - FileType::JSON => { - Arc::new(JsonFormat::default().with_file_compression_type(compression)) - } - _ => return Err(DispatchError::InvalidDispatch("Unsupported file type")), - }; + match file_type.parse() { + Ok(v) => { + let ft: Arc = match v { + FileType::CSV => Arc::new( + CsvFormat::default() + .with_file_compression_type(compression) + .with_schema_infer_max_rec(Some(20480)), + ), + FileType::PARQUET => Arc::new(ParquetFormat::default()), + FileType::JSON => { + Arc::new(JsonFormat::default().with_file_compression_type(compression)) + } + _ => return Err(DispatchError::InvalidDispatch("Unsupported file type")), + }; - let accessor = ObjStoreAccessor::new(access)?; - let objects = accessor.list_globbed(path).await?; + let accessor = ObjStoreAccessor::new(access)?; + let objects = accessor.list_globbed(path).await?; - let state = self.df_ctx.state(); - let provider = accessor.into_table_provider(&state, ft, objects).await?; + let state = self.df_ctx.state(); + let provider = accessor.into_table_provider(&state, ft, objects).await?; - Ok(provider) + Ok(provider) + } + Err(e) => match file_type { + "bson" => { + let source_url = DatasourceUrl::try_new(path)?; + Ok(bson_streaming_table(access, Some(128), source_url).await?) + } + _ => Err(e.into()), + }, + } } pub async fn dispatch_function( From d51630eabe70bab19f0696aa69cddba5b227c5af Mon Sep 17 00:00:00 2001 From: tycho garen Date: Fri, 16 Feb 2024 18:10:18 -0500 Subject: [PATCH 2/8] add test --- .../gcs/external_table.slt | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/testdata/sqllogictests_object_store/gcs/external_table.slt b/testdata/sqllogictests_object_store/gcs/external_table.slt index 4044207f4..6043320c6 100644 --- a/testdata/sqllogictests_object_store/gcs/external_table.slt +++ b/testdata/sqllogictests_object_store/gcs/external_table.slt @@ -75,3 +75,24 @@ select * from ext_table_2; ---- 5 6 7 8 + + + +statement ok +create external table bson_beatles from gcs options ( + service_account_key '${GCP_SERVICE_ACCOUNT_KEY}', + bucket '${GCS_BUCKET_NAME}', + location 'beatles.100.bson' +); + +query I +select count(*) from bson_beatles; +---- +100 + +query I +select case from bson_beatles order by case limit 3; +---- +1 +2 +3 \ No newline at end of file From f317eb71f07763cb2bb707257634838e32d6d8d5 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Fri, 16 Feb 2024 18:31:48 -0500 Subject: [PATCH 3/8] fix compile --- crates/sqlbuiltins/src/functions/table/bson.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/sqlbuiltins/src/functions/table/bson.rs b/crates/sqlbuiltins/src/functions/table/bson.rs index 9b73a9b19..28a1ca1e0 100644 --- a/crates/sqlbuiltins/src/functions/table/bson.rs +++ b/crates/sqlbuiltins/src/functions/table/bson.rs @@ -75,6 +75,6 @@ impl TableFunc for BsonScan { storage_options, )?; - Ok(bson_streaming_table(store_access, Some(sample_size), url).await?) + Ok(bson_streaming_table(Arc::new(store_access), Some(sample_size), url).await?) } } From 211435d4d79c7a97eba2b1e1412cf0322c471eab Mon Sep 17 00:00:00 2001 From: tycho garen Date: Fri, 16 Feb 2024 19:50:30 -0500 Subject: [PATCH 4/8] fixies --- crates/datasources/src/object_store/mod.rs | 3 ++- crates/sqlexec/src/planner/session_planner.rs | 15 ++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/crates/datasources/src/object_store/mod.rs b/crates/datasources/src/object_store/mod.rs index f43b440ec..ff8491ee5 100644 --- a/crates/datasources/src/object_store/mod.rs +++ b/crates/datasources/src/object_store/mod.rs @@ -364,11 +364,12 @@ impl TableProvider for ObjStoreTableProvider { } } -pub fn file_type_from_path(path: &ObjectStorePath) -> Result { +pub fn file_type_from_path(path: &ObjectStorePath) -> Result { path.extension() .ok_or(ObjectStoreSourceError::NoFileExtension)? .parse() .map_err(ObjectStoreSourceError::DataFusion) + .map(|ft: FileType| ft.to_string()) } pub fn init_session_registry<'a>( diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index 7c77263fa..1fda2845a 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -11,7 +11,7 @@ use datafusion::arrow::datatypes::{ DECIMAL_DEFAULT_SCALE, }; use datafusion::common::parsers::CompressionTypeVariant; -use datafusion::common::{FileType, OwnedSchemaReference, OwnedTableReference, ToDFSchema}; +use datafusion::common::{OwnedSchemaReference, OwnedTableReference, ToDFSchema}; use datafusion::logical_expr::{cast, col, LogicalPlanBuilder}; use datafusion::sql::planner::{object_name_to_table_reference, IdentNormalizer, PlannerContext}; use datafusion::sql::sqlparser::ast::{self, Ident, ObjectName, ObjectType}; @@ -689,7 +689,7 @@ impl<'a> SessionPlanner<'a> { bucket, service_account_key, location, - file_type: file_type.to_string(), + file_type: file_type, compression: compression.map(|c| c.to_string()), }) } @@ -1978,7 +1978,7 @@ async fn validate_and_get_file_type_and_compression( access: Arc, path: impl AsRef, m: &mut StmtOptions, -) -> Result<(FileType, Option)> { +) -> Result<(String, Option)> { let path = path.as_ref(); let accessor = ObjStoreAccessor::new(access.clone()).map_err(|e| PlanError::InvalidExternalTable { @@ -2011,21 +2011,26 @@ async fn validate_and_get_file_type_and_compression( .and_then(|ext| ext.parse().ok()), }; - let file_type = match m.remove_optional::("file_type")? { + let file_type = match m.remove_optional("file_type")? { Some(file_type) => file_type, None => { let mut ft = None; for obj in objects { ft = match file_type_from_path(&obj.location) { - Err(_) => continue, Ok(file_type) => Some(file_type), + Err(_) => match obj.location.extension() { + Some("bson") => Some("bson".to_string()), + _ => continue, + }, }; } + ft.ok_or_else(|| PlanError::InvalidExternalTable { source: Box::new(internal!( "unable to resolve file type from the objects, try passing `file_type` option" )), })? + .to_string() } }; From 79369626dba565b0883c9513a6b4ac781c0510d2 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Fri, 16 Feb 2024 20:49:48 -0500 Subject: [PATCH 5/8] omit --- testdata/sqllogictests_object_store/gcs/external_table.slt | 7 ------- 1 file changed, 7 deletions(-) diff --git a/testdata/sqllogictests_object_store/gcs/external_table.slt b/testdata/sqllogictests_object_store/gcs/external_table.slt index 6043320c6..647e9c5c4 100644 --- a/testdata/sqllogictests_object_store/gcs/external_table.slt +++ b/testdata/sqllogictests_object_store/gcs/external_table.slt @@ -89,10 +89,3 @@ query I select count(*) from bson_beatles; ---- 100 - -query I -select case from bson_beatles order by case limit 3; ----- -1 -2 -3 \ No newline at end of file From bba0355fbb57ede2ceb365f9cf9b7e62774aa2c8 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Fri, 16 Feb 2024 20:50:37 -0500 Subject: [PATCH 6/8] lint --- crates/sqlexec/src/planner/session_planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index 1fda2845a..7aba42ee4 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -689,7 +689,7 @@ impl<'a> SessionPlanner<'a> { bucket, service_account_key, location, - file_type: file_type, + file_type, compression: compression.map(|c| c.to_string()), }) } From 644aae9398f4e684a396ef930c73c8db825f8173 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Wed, 21 Feb 2024 15:45:42 -0500 Subject: [PATCH 7/8] beep --- crates/datasources/src/object_store/mod.rs | 3 +- crates/sqlexec/src/dispatch/external.rs | 62 +++++++++++-------- crates/sqlexec/src/planner/session_planner.rs | 3 +- 3 files changed, 37 insertions(+), 31 deletions(-) diff --git a/crates/datasources/src/object_store/mod.rs b/crates/datasources/src/object_store/mod.rs index 09f334e46..3f925ecb6 100644 --- a/crates/datasources/src/object_store/mod.rs +++ b/crates/datasources/src/object_store/mod.rs @@ -364,12 +364,11 @@ impl TableProvider for ObjStoreTableProvider { } } -pub fn file_type_from_path(path: &ObjectStorePath) -> Result { +pub fn file_type_from_path(path: &ObjectStorePath) -> Result { path.extension() .ok_or(ObjectStoreSourceError::NoFileExtension)? .parse() .map_err(ObjectStoreSourceError::DataFusion) - .map(|ft: FileType| ft.to_string()) } pub fn init_session_registry<'a>( diff --git a/crates/sqlexec/src/dispatch/external.rs b/crates/sqlexec/src/dispatch/external.rs index 365320991..8ce45981f 100644 --- a/crates/sqlexec/src/dispatch/external.rs +++ b/crates/sqlexec/src/dispatch/external.rs @@ -3,12 +3,10 @@ use std::str::FromStr; use std::sync::Arc; use catalog::session_catalog::SessionCatalog; -use datafusion::common::FileType; use datafusion::datasource::file_format::csv::CsvFormat; use datafusion::datasource::file_format::file_compression_type::FileCompressionType; use datafusion::datasource::file_format::json::JsonFormat; use datafusion::datasource::file_format::parquet::ParquetFormat; -use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::TableProvider; use datafusion::prelude::SessionContext; use datafusion_ext::functions::{DefaultTableContextProvider, FuncParamValue}; @@ -594,36 +592,46 @@ impl<'a> ExternalDispatcher<'a> { .transpose()? .unwrap_or(FileCompressionType::UNCOMPRESSED); - match file_type.parse() { - Ok(v) => { - let ft: Arc = match v { - FileType::CSV => Arc::new( + let accessor = ObjStoreAccessor::new(access.clone())?; + + match file_type { + "csv" => Ok(accessor + .clone() + .into_table_provider( + &self.df_ctx.state(), + Arc::new( CsvFormat::default() .with_file_compression_type(compression) .with_schema_infer_max_rec(Some(20480)), ), - FileType::PARQUET => Arc::new(ParquetFormat::default()), - FileType::JSON => { - Arc::new(JsonFormat::default().with_file_compression_type(compression)) - } - _ => return Err(DispatchError::InvalidDispatch("Unsupported file type")), - }; - - let accessor = ObjStoreAccessor::new(access)?; - let objects = accessor.list_globbed(path).await?; - - let state = self.df_ctx.state(); - let provider = accessor.into_table_provider(&state, ft, objects).await?; - - Ok(provider) + accessor.clone().list_globbed(path).await?, + ) + .await?), + "parquet" => Ok(accessor + .clone() + .into_table_provider( + &self.df_ctx.state(), + Arc::new(ParquetFormat::default()), + accessor.clone().list_globbed(path).await?, + ) + .await?), + "ndjson" | "json" => Ok(accessor + .clone() + .into_table_provider( + &self.df_ctx.state(), + Arc::new(JsonFormat::default().with_file_compression_type(compression)), + accessor.clone().list_globbed(path).await?, + ) + .await?), + "bson" => { + Ok( + bson_streaming_table(access.clone(), Some(128), DatasourceUrl::try_new(path)?) + .await?, + ) } - Err(e) => match file_type { - "bson" => { - let source_url = DatasourceUrl::try_new(path)?; - Ok(bson_streaming_table(access, Some(128), source_url).await?) - } - _ => Err(e.into()), - }, + _ => Err(DispatchError::String( + format!("Unsupported file type: {}, for '{}'", file_type, path,).to_string(), + )), } } diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index ac2459030..729316dfb 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -2069,7 +2069,7 @@ async fn validate_and_get_file_type_and_compression( let mut ft = None; for obj in objects { ft = match file_type_from_path(&obj.location) { - Ok(file_type) => Some(file_type), + Ok(file_type) => Some(file_type.to_string()), Err(_) => match obj.location.extension() { Some("bson") => Some("bson".to_string()), _ => continue, @@ -2085,7 +2085,6 @@ async fn validate_and_get_file_type_and_compression( .to_string() } }; - Ok((file_type, compression)) } From 274de863940528201e76fd74f53fd618d9090945 Mon Sep 17 00:00:00 2001 From: tycho garen Date: Thu, 22 Feb 2024 15:50:25 -0500 Subject: [PATCH 8/8] fix quoting --- crates/sqlexec/src/planner/session_planner.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index 729316dfb..5c26c1c46 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -687,7 +687,7 @@ impl<'a> SessionPlanner<'a> { TableOptions::Local(TableOptionsLocal { location, - file_type: format!("{file_type:?}").to_lowercase(), + file_type: file_type.to_string().to_lowercase(), compression: compression.map(|c| c.to_string()), }) } @@ -2085,6 +2085,7 @@ async fn validate_and_get_file_type_and_compression( .to_string() } }; + Ok((file_type, compression)) }