Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: predicate pushdowns for supported filetypes #2584

Merged
merged 2 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/datafusion_ext/src/runtime/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ impl TableProvider for RuntimeAwareTableProvider {
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
self.provider.supports_filters_pushdown(filters)
let supports_pushdowns = self.provider.supports_filters_pushdown(filters)?;

Ok(supports_pushdowns)
}

fn statistics(&self) -> Option<Statistics> {
Expand Down
35 changes: 32 additions & 3 deletions crates/datasources/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ use datafusion::error::{DataFusionError, Result as DatafusionResult};
use datafusion::execution::context::SessionState;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_expr::TableType;
use datafusion::logical_expr::{TableProviderFilterPushDown, TableType};
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::union::UnionExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::{project_schema, ExecutionPlan};
use datafusion::prelude::Expr;
use datafusion_ext::metrics::ReadOnlyDataSourceMetricsExecAdapter;
use errors::{ObjectStoreSourceError, Result};
Expand Down Expand Up @@ -105,6 +106,16 @@ impl TableProvider for MultiSourceTableProvider {
Ok(Arc::new(UnionExec::new(plans)))
}
}
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> datafusion::common::Result<Vec<TableProviderFilterPushDown>> {
// we just look at the first source
self.sources
.first()
.unwrap()
.supports_filters_pushdown(filters)
}
}

#[async_trait]
Expand Down Expand Up @@ -308,6 +319,13 @@ impl TableProvider for ObjStoreTableProvider {
.buffered(ctx.config_options().execution.meta_fetch_concurrency);
let (files, statistics) = get_statistics_with_limit(files, self.schema(), limit).await?;

// If there are no files, return an empty exec plan.
if files.is_empty() {
let schema = self.schema();
let projected_schema = project_schema(&schema, projection)?;
return Ok(Arc::new(EmptyExec::new(projected_schema)));
}

let config = FileScanConfig {
object_store_url: self.base_url.clone(),
file_schema: self.arrow_schema.clone(),
Expand All @@ -331,9 +349,20 @@ impl TableProvider for ObjStoreTableProvider {
.create_physical_plan(ctx, config, filters.as_ref())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

Ok(Arc::new(ReadOnlyDataSourceMetricsExecAdapter::new(plan)))
}

/// Reports every filter in `filters` as `Inexact`.
///
/// The returned vector has one entry per input filter, in the same order.
/// Under DataFusion's pushdown contract, `Inexact` means the filter is
/// handed to the scan but the engine still applies it afterwards.
/// This implementation never returns an error.
fn supports_filters_pushdown(
    &self,
    filters: &[&Expr],
) -> std::result::Result<Vec<TableProviderFilterPushDown>, datafusion::error::DataFusionError>
{
    // TODO: support exact pushdowns based on hive-style partitioning.
    let mut support = Vec::with_capacity(filters.len());
    for _filter in filters {
        support.push(TableProviderFilterPushDown::Inexact);
    }
    Ok(support)
}
}

pub fn file_type_from_path(path: &ObjectStorePath) -> Result<FileType> {
Expand Down
8 changes: 8 additions & 0 deletions testdata/sqllogictests/functions/parquet_scan.slt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ select count(*) from parquet_scan([
statement error No such file or directory
select * from parquet_scan('./testdata/parquet/userdata1.paruqet');


# filter pushdowns
# Skipping until we have a way to test the outputs of 'explain'. See https://github.com/GlareDB/glaredb/issues/2581
# query I
# select * from (explain select count(*) from parquet_scan('./testdata/parquet/userdata1.parquet') where country = 'Sweden') where plan_type = 'physical_plan' and plan like 'predicate=country@8 = Sweden, pruning_predicate=country_min@0 <= Sweden AND Sweden <= country_max@'
# ----


# Ambiguous name.
# query I
# select count(*)
Expand Down
Loading