Skip to content

Commit

Permalink
feat: add object store table support for bson format data (#2600)
Browse files Browse the repository at this point in the history
  • Loading branch information
tychoish authored Feb 22, 2024
1 parent 1ab8eb0 commit 20804d0
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 35 deletions.
3 changes: 1 addition & 2 deletions crates/datasources/src/bson/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ use crate::bson::errors::BsonError;
use crate::bson::schema::{merge_schemas, schema_from_document};
use crate::bson::stream::BsonPartitionStream;
use crate::common::url::DatasourceUrl;
use crate::object_store::generic::GenericStoreAccess;
use crate::object_store::ObjStoreAccess;

pub async fn bson_streaming_table(
store_access: GenericStoreAccess,
store_access: Arc<dyn ObjStoreAccess>,
schema_inference_sample_size: Option<i64>,
source_url: DatasourceUrl,
) -> Result<Arc<dyn TableProvider>, BsonError> {
Expand Down
2 changes: 1 addition & 1 deletion crates/sqlbuiltins/src/functions/table/bson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,6 @@ impl TableFunc for BsonScan {
storage_options,
)?;

Ok(bson_streaming_table(store_access, Some(sample_size), url).await?)
Ok(bson_streaming_table(Arc::new(store_access), Some(sample_size), url).await?)
}
}
70 changes: 45 additions & 25 deletions crates/sqlexec/src/dispatch/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ use std::str::FromStr;
use std::sync::Arc;

use catalog::session_catalog::SessionCatalog;
use datafusion::common::FileType;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::file_format::json::JsonFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;
use datafusion_ext::functions::{DefaultTableContextProvider, FuncParamValue};
Expand Down Expand Up @@ -545,10 +543,12 @@ impl<'a> ExternalDispatcher<'a> {
storage_options.to_owned(),
)?;
let source_url = DatasourceUrl::try_new(location)?;
Ok(
bson_streaming_table(store_access, schema_sample_size.to_owned(), source_url)
.await?,
Ok(bson_streaming_table(
Arc::new(store_access),
schema_sample_size.to_owned(),
source_url,
)
.await?)
}
TableOptions::Cassandra(TableOptionsCassandra {
host,
Expand Down Expand Up @@ -592,27 +592,47 @@ impl<'a> ExternalDispatcher<'a> {
.transpose()?
.unwrap_or(FileCompressionType::UNCOMPRESSED);

let ft: FileType = file_type.parse()?;
let ft: Arc<dyn FileFormat> = match ft {
FileType::CSV => Arc::new(
CsvFormat::default()
.with_file_compression_type(compression)
.with_schema_infer_max_rec(Some(20480)),
),
FileType::PARQUET => Arc::new(ParquetFormat::default()),
FileType::JSON => {
Arc::new(JsonFormat::default().with_file_compression_type(compression))
}
_ => return Err(DispatchError::InvalidDispatch("Unsupported file type")),
};

let accessor = ObjStoreAccessor::new(access)?;
let objects = accessor.list_globbed(path).await?;

let state = self.df_ctx.state();
let provider = accessor.into_table_provider(&state, ft, objects).await?;
let accessor = ObjStoreAccessor::new(access.clone())?;

Ok(provider)
match file_type {
"csv" => Ok(accessor
.clone()
.into_table_provider(
&self.df_ctx.state(),
Arc::new(
CsvFormat::default()
.with_file_compression_type(compression)
.with_schema_infer_max_rec(Some(20480)),
),
accessor.clone().list_globbed(path).await?,
)
.await?),
"parquet" => Ok(accessor
.clone()
.into_table_provider(
&self.df_ctx.state(),
Arc::new(ParquetFormat::default()),
accessor.clone().list_globbed(path).await?,
)
.await?),
"ndjson" | "json" => Ok(accessor
.clone()
.into_table_provider(
&self.df_ctx.state(),
Arc::new(JsonFormat::default().with_file_compression_type(compression)),
accessor.clone().list_globbed(path).await?,
)
.await?),
"bson" => {
Ok(
bson_streaming_table(access.clone(), Some(128), DatasourceUrl::try_new(path)?)
.await?,
)
}
_ => Err(DispatchError::String(
format!("Unsupported file type: {}, for '{}'", file_type, path,).to_string(),
)),
}
}

pub async fn dispatch_function(
Expand Down
19 changes: 12 additions & 7 deletions crates/sqlexec/src/planner/session_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use datafusion::arrow::datatypes::{
DECIMAL_DEFAULT_SCALE,
};
use datafusion::common::parsers::CompressionTypeVariant;
use datafusion::common::{FileType, OwnedSchemaReference, OwnedTableReference, ToDFSchema};
use datafusion::common::{OwnedSchemaReference, OwnedTableReference, ToDFSchema};
use datafusion::logical_expr::{cast, col, LogicalPlanBuilder};
use datafusion::sql::planner::{object_name_to_table_reference, IdentNormalizer, PlannerContext};
use datafusion::sql::sqlparser::ast::{self, Ident, ObjectName, ObjectType};
Expand Down Expand Up @@ -687,7 +687,7 @@ impl<'a> SessionPlanner<'a> {

TableOptions::Local(TableOptionsLocal {
location,
file_type: format!("{file_type:?}").to_lowercase(),
file_type: file_type.to_string().to_lowercase(),
compression: compression.map(|c| c.to_string()),
})
}
Expand All @@ -714,7 +714,7 @@ impl<'a> SessionPlanner<'a> {
bucket,
service_account_key,
location,
file_type: file_type.to_string(),
file_type,
compression: compression.map(|c| c.to_string()),
})
}
Expand Down Expand Up @@ -2030,7 +2030,7 @@ async fn validate_and_get_file_type_and_compression(
access: Arc<dyn ObjStoreAccess>,
path: impl AsRef<str>,
m: &mut StmtOptions,
) -> Result<(FileType, Option<CompressionTypeVariant>)> {
) -> Result<(String, Option<CompressionTypeVariant>)> {
let path = path.as_ref();
let accessor =
ObjStoreAccessor::new(access.clone()).map_err(|e| PlanError::InvalidExternalTable {
Expand Down Expand Up @@ -2063,21 +2063,26 @@ async fn validate_and_get_file_type_and_compression(
.and_then(|ext| ext.parse().ok()),
};

let file_type = match m.remove_optional::<FileType>("file_type")? {
let file_type = match m.remove_optional("file_type")? {
Some(file_type) => file_type,
None => {
let mut ft = None;
for obj in objects {
ft = match file_type_from_path(&obj.location) {
Err(_) => continue,
Ok(file_type) => Some(file_type),
Ok(file_type) => Some(file_type.to_string()),
Err(_) => match obj.location.extension() {
Some("bson") => Some("bson".to_string()),
_ => continue,
},
};
}

ft.ok_or_else(|| PlanError::InvalidExternalTable {
source: Box::new(internal!(
"unable to resolve file type from the objects, try passing `file_type` option"
)),
})?
.to_string()
}
};

Expand Down
14 changes: 14 additions & 0 deletions testdata/sqllogictests_object_store/gcs/external_table.slt
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,17 @@ select * from ext_table_2;
----
5 6
7 8



statement ok
create external table bson_beatles from gcs options (
service_account_key '${GCP_SERVICE_ACCOUNT_KEY}',
bucket '${GCS_BUCKET_NAME}',
location 'beatles.100.bson'
);

query I
select count(*) from bson_beatles;
----
100

0 comments on commit 20804d0

Please sign in to comment.