feat: add object store table support for bson format data #2600

Merged · 11 commits · Feb 22, 2024
crates/datasources/src/bson/table.rs (3 changes: 1 addition & 2 deletions)
@@ -14,11 +14,10 @@
use crate::bson::errors::BsonError;
use crate::bson::schema::{merge_schemas, schema_from_document};
use crate::bson::stream::BsonPartitionStream;
use crate::common::url::DatasourceUrl;
use crate::object_store::generic::GenericStoreAccess;
use crate::object_store::ObjStoreAccess;

pub async fn bson_streaming_table(
store_access: GenericStoreAccess,
store_access: Arc<dyn ObjStoreAccess>,
schema_inference_sample_size: Option<i64>,
source_url: DatasourceUrl,
) -> Result<Arc<dyn TableProvider>, BsonError> {
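The hunk above widens bson_streaming_table from the concrete GenericStoreAccess to any object-store backend behind a trait object. A minimal standalone sketch of that pattern, with simplified stand-ins for the real trait and types (none of the definitions below are from the crate itself):

use std::sync::Arc;

// Simplified stand-in for the crate's ObjStoreAccess trait: any object
// store (GCS, S3, local FS, ...) that can hand back a base URL.
trait ObjStoreAccess {
    fn base_url(&self) -> String;
}

// Stand-in for GenericStoreAccess, one concrete backend.
struct GenericStoreAccess {
    url: String,
}

impl ObjStoreAccess for GenericStoreAccess {
    fn base_url(&self) -> String {
        self.url.clone()
    }
}

// Before the change the function could only accept GenericStoreAccess;
// taking Arc<dyn ObjStoreAccess> lets any backend drive the BSON scan.
fn bson_streaming_table(store_access: Arc<dyn ObjStoreAccess>) {
    println!("reading BSON from {}", store_access.base_url());
}

fn main() {
    let concrete = GenericStoreAccess {
        url: "gs://bucket".to_string(),
    };
    // An unsized coercion turns Arc<GenericStoreAccess> into
    // Arc<dyn ObjStoreAccess> at the call site, which is exactly what
    // the callers in this diff do with Arc::new(store_access).
    bson_streaming_table(Arc::new(concrete));
}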
crates/sqlbuiltins/src/functions/table/bson.rs (2 changes: 1 addition & 1 deletion)
@@ -75,6 +75,6 @@ impl TableFunc for BsonScan {
storage_options,
)?;

Ok(bson_streaming_table(store_access, Some(sample_size), url).await?)
Ok(bson_streaming_table(Arc::new(store_access), Some(sample_size), url).await?)
}
}
crates/sqlexec/src/dispatch/external.rs (70 changes: 45 additions & 25 deletions)
@@ -3,12 +3,10 @@
use std::str::FromStr;
use std::sync::Arc;

use catalog::session_catalog::SessionCatalog;
use datafusion::common::FileType;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::file_format::json::JsonFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;
use datafusion_ext::functions::{DefaultTableContextProvider, FuncParamValue};
@@ -545,10 +543,12 @@ impl<'a> ExternalDispatcher<'a> {
storage_options.to_owned(),
)?;
let source_url = DatasourceUrl::try_new(location)?;
Ok(
bson_streaming_table(store_access, schema_sample_size.to_owned(), source_url)
.await?,
Ok(bson_streaming_table(
Arc::new(store_access),
schema_sample_size.to_owned(),
source_url,
)
.await?)
}
TableOptions::Cassandra(TableOptionsCassandra {
host,
@@ -592,27 +592,47 @@
.transpose()?
.unwrap_or(FileCompressionType::UNCOMPRESSED);

let ft: FileType = file_type.parse()?;
let ft: Arc<dyn FileFormat> = match ft {
FileType::CSV => Arc::new(
CsvFormat::default()
.with_file_compression_type(compression)
.with_schema_infer_max_rec(Some(20480)),
),
FileType::PARQUET => Arc::new(ParquetFormat::default()),
FileType::JSON => {
Arc::new(JsonFormat::default().with_file_compression_type(compression))
}
_ => return Err(DispatchError::InvalidDispatch("Unsupported file type")),
};

let accessor = ObjStoreAccessor::new(access)?;
let objects = accessor.list_globbed(path).await?;

let state = self.df_ctx.state();
let provider = accessor.into_table_provider(&state, ft, objects).await?;
let accessor = ObjStoreAccessor::new(access.clone())?;

Ok(provider)
match file_type {
"csv" => Ok(accessor
[Review comment — Member]
Apparently the file type is getting stored with quotes, so matching on "\"csv\"" works...

[Reply — Contributor (author)]
I believe it's not getting stored with quotes, but just printed in a quoted form when it's read, which is easy enough to clean up.

.clone()
.into_table_provider(
&self.df_ctx.state(),
Arc::new(
CsvFormat::default()
.with_file_compression_type(compression)
.with_schema_infer_max_rec(Some(20480)),
),
accessor.clone().list_globbed(path).await?,
)
.await?),
"parquet" => Ok(accessor
.clone()
.into_table_provider(
&self.df_ctx.state(),
Arc::new(ParquetFormat::default()),
accessor.clone().list_globbed(path).await?,
)
.await?),
"ndjson" | "json" => Ok(accessor
.clone()
.into_table_provider(
&self.df_ctx.state(),
Arc::new(JsonFormat::default().with_file_compression_type(compression)),
accessor.clone().list_globbed(path).await?,
)
.await?),
"bson" => {
Ok(
bson_streaming_table(access.clone(), Some(128), DatasourceUrl::try_new(path)?)
.await?,
)
}
_ => Err(DispatchError::String(
format!("Unsupported file type: {}, for '{}'", file_type, path,).to_string(),
)),
}
}

pub async fn dispatch_function(
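The review thread above notes that the stored file_type value can surface in a quoted form (matching on "\"csv\"" worked), and the author's reply suggests the quoting is a read-side artifact that is easy to strip. A minimal sketch of that kind of normalization; this helper is an assumption, not code from the PR:

/// Hypothetical cleanup for an option value that round-trips with
/// surrounding quotes, e.g. "\"csv\"" instead of "csv".
fn normalize_file_type(raw: &str) -> String {
    raw.trim().trim_matches('"').to_ascii_lowercase()
}

fn main() {
    assert_eq!(normalize_file_type("\"csv\""), "csv");
    assert_eq!(normalize_file_type("Parquet"), "parquet");
    // Once normalized, the dispatcher's match on "csv" / "parquet" /
    // "ndjson" | "json" / "bson" sees the bare lowercase token.
    println!("ok");
}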
crates/sqlexec/src/planner/session_planner.rs (18 changes: 11 additions & 7 deletions)
@@ -11,7 +11,7 @@
use datafusion::arrow::datatypes::{
DECIMAL_DEFAULT_SCALE,
};
use datafusion::common::parsers::CompressionTypeVariant;
use datafusion::common::{FileType, OwnedSchemaReference, OwnedTableReference, ToDFSchema};
use datafusion::common::{OwnedSchemaReference, OwnedTableReference, ToDFSchema};
use datafusion::logical_expr::{cast, col, LogicalPlanBuilder};
use datafusion::sql::planner::{object_name_to_table_reference, IdentNormalizer, PlannerContext};
use datafusion::sql::sqlparser::ast::{self, Ident, ObjectName, ObjectType};
@@ -714,7 +714,7 @@ impl<'a> SessionPlanner<'a> {
bucket,
service_account_key,
location,
file_type: file_type.to_string(),
file_type,
compression: compression.map(|c| c.to_string()),
})
}
@@ -2030,7 +2030,7 @@
async fn validate_and_get_file_type_and_compression(
access: Arc<dyn ObjStoreAccess>,
path: impl AsRef<str>,
m: &mut StmtOptions,
) -> Result<(FileType, Option<CompressionTypeVariant>)> {
) -> Result<(String, Option<CompressionTypeVariant>)> {
let path = path.as_ref();
let accessor =
ObjStoreAccessor::new(access.clone()).map_err(|e| PlanError::InvalidExternalTable {
@@ -2063,24 +2063,28 @@
.and_then(|ext| ext.parse().ok()),
};

let file_type = match m.remove_optional::<FileType>("file_type")? {
let file_type = match m.remove_optional("file_type")? {
Some(file_type) => file_type,
None => {
let mut ft = None;
for obj in objects {
ft = match file_type_from_path(&obj.location) {
Err(_) => continue,
Ok(file_type) => Some(file_type),
Ok(file_type) => Some(file_type.to_string()),
Err(_) => match obj.location.extension() {
Some("bson") => Some("bson".to_string()),
_ => continue,
},
};
}

ft.ok_or_else(|| PlanError::InvalidExternalTable {
source: Box::new(internal!(
"unable to resolve file type from the objects, try passing `file_type` option"
)),
})?
.to_string()
}
};

Ok((file_type, compression))
}

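The planner change above makes file-type inference fall back to the object path's extension, so .bson locations resolve even though the built-in resolver knows nothing about BSON. A rough standalone model of that loop, using std::path in place of the object-store location type (the helper names are invented for illustration):

use std::path::Path;

// Stand-in for datafusion's file_type_from_path: recognizes only the
// built-in formats and errors on anything else.
fn builtin_file_type(path: &Path) -> Result<String, ()> {
    match path.extension().and_then(|e| e.to_str()) {
        Some(ext @ ("csv" | "parquet" | "json" | "ndjson")) => Ok(ext.to_string()),
        _ => Err(()),
    }
}

// Mirrors the planner's loop: try the built-in resolver first, then
// special-case a "bson" extension, and otherwise keep scanning objects.
fn infer_file_type(paths: &[&str]) -> Option<String> {
    let mut ft = None;
    for p in paths {
        let path = Path::new(p);
        ft = match builtin_file_type(path) {
            Ok(t) => Some(t),
            Err(_) => match path.extension().and_then(|e| e.to_str()) {
                Some("bson") => Some("bson".to_string()),
                _ => continue,
            },
        };
    }
    ft
}

fn main() {
    assert_eq!(infer_file_type(&["beatles.100.bson"]), Some("bson".into()));
    assert_eq!(infer_file_type(&["data.csv"]), Some("csv".into()));
    assert_eq!(infer_file_type(&["README.md"]), None);
}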
testdata/sqllogictests_object_store/gcs/external_table.slt (14 changes: 14 additions & 0 deletions)
@@ -75,3 +75,17 @@
select * from ext_table_2;
----
5 6
7 8



statement ok
create external table bson_beatles from gcs options (
service_account_key '${GCP_SERVICE_ACCOUNT_KEY}',
bucket '${GCS_BUCKET_NAME}',
location 'beatles.100.bson'
);

query I
select count(*) from bson_beatles;
----
100