Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ordered read_postgres tables #2596

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions crates/datasources/src/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ pub struct PostgresTableProviderConfig {
pub access: PostgresAccess,
pub schema: String,
pub table: String,
pub order_by: Option<String>,
}

impl TryFrom<protogen::sqlexec::table_provider::PostgresTableProviderConfig>
Expand All @@ -540,6 +541,7 @@ impl TryFrom<protogen::sqlexec::table_provider::PostgresTableProviderConfig>
access: value.access.required("postgres access")?,
schema: value.schema,
table: value.table,
order_by: value.order_by,
})
}
}
Expand All @@ -552,6 +554,7 @@ impl From<PostgresTableProviderConfig>
access: Some(value.access.into()),
schema: value.schema,
table: value.table,
order_by: value.order_by,
}
}
}
Expand All @@ -562,6 +565,7 @@ pub struct PostgresTableProvider {
schema: String,
/// Table we're accessing.
table: String,
order_by: Option<String>,
state: Arc<PostgresAccessState>,
arrow_schema: ArrowSchemaRef,
pg_types: Arc<Vec<PostgresType>>,
Expand All @@ -577,6 +581,7 @@ impl PostgresTableProvider {
access,
schema,
table,
order_by,
} = conf;

let state = Arc::new(access.connect().await?);
Expand All @@ -586,6 +591,7 @@ impl PostgresTableProvider {
schema,
table,
state,
order_by,
arrow_schema: Arc::new(arrow_schema),
pg_types: Arc::new(pg_types),
})
Expand Down Expand Up @@ -664,9 +670,17 @@ impl TableProvider for PostgresTableProvider {
.map_err(|e| DataFusionError::External(Box::new(e)))?
};

let order_by_string = {
if self.order_by.as_ref().is_some_and(|stmt| !stmt.is_empty()) {
format!("ORDER BY {}", self.order_by.as_ref().unwrap())
} else {
"".to_string()
}
};

// Build copy query.
let query = format!(
"COPY (SELECT {} FROM {}.{} {} {} {}) TO STDOUT (FORMAT binary)",
"COPY (SELECT {} FROM {}.{} {} {} {} {}) TO STDOUT (FORMAT binary)",
projection_string, // SELECT <str>
self.schema, // FROM <schema>
self.table, // .<table>
Expand All @@ -677,7 +691,8 @@ impl TableProvider for PostgresTableProvider {
"WHERE "
},
predicate_string.as_str(), // <where-predicate>
limit_string, // [LIMIT ..]
order_by_string.as_str(),
limit_string, // [LIMIT ..]
);

let exec = PostgresBinaryCopyExec::try_new(BinaryCopyConfig::State {
Expand Down Expand Up @@ -736,7 +751,7 @@ impl TableProvider for PostgresTableProvider {

debug!(%query, "inserting into postgres datasource");

let exec = PostgresQueryExec::new(query, self.state.clone());
let exec = PostgresQueryExec::new(query, self.state.clone(), self.order_by.clone());
Ok(Arc::new(exec))
}
}
Expand Down Expand Up @@ -882,6 +897,7 @@ impl ExecutionPlan for PostgresBinaryCopyExec {
opener: self.opener.clone(),
arrow_schema: self.arrow_schema.clone(),
};

Ok(Box::pin(DataSourceMetricsStreamAdapter::new(
stream,
partition,
Expand Down
59 changes: 55 additions & 4 deletions crates/datasources/src/postgres/query_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::Schema as ArrowSchema;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DFSchema;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::TaskContext;
use datafusion::logical_expr::expr::Sort;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
Expand All @@ -19,6 +23,8 @@ use datafusion::physical_plan::{
SendableRecordBatchStream,
Statistics,
};
use datafusion::prelude::{Column, Expr};
use datafusion::sql::sqlparser::dialect::GenericDialect;
use datafusion_ext::metrics::DataSourceMetricsStreamAdapter;
use futures::future::BoxFuture;
use futures::{ready, FutureExt, Stream};
Expand All @@ -31,15 +37,60 @@ pub struct PostgresQueryExec {
query: String,
state: Arc<PostgresAccessState>,
metrics: ExecutionPlanMetricsSet,
order_by: Option<String>,
sort_spec: Option<Vec<PhysicalSortExpr>>,
}

impl PostgresQueryExec {
pub fn new(query: String, state: Arc<PostgresAccessState>) -> Self {
PostgresQueryExec {
pub fn new(query: String, state: Arc<PostgresAccessState>, order_by: Option<String>) -> Self {
let mut obj = PostgresQueryExec {
query,
state,
order_by,
metrics: ExecutionPlanMetricsSet::new(),
}
sort_spec: None,
};
obj.sort_spec = obj.build_sort();
obj
}

/// Translates the user-supplied `order_by` clause into the physical sort
/// specification that `output_ordering` reports to the planner.
///
/// Returns `None` when no clause was supplied, when the clause fails to
/// parse, when the sort key is anything other than a bare column
/// identifier, or when the column cannot be resolved against this plan's
/// schema.
///
/// Only a single sort key is parsed (`parse_order_by_expr`); anything that
/// follows it in the clause is not reflected in the returned ordering.
fn build_sort(&self) -> Option<Vec<PhysicalSortExpr>> {
    let stmt = self.order_by.as_deref()?;

    let order_by = datafusion::sql::sqlparser::parser::Parser::new(&GenericDialect)
        .try_with_sql(stmt)
        .ok()?
        .parse_order_by_expr()
        .ok()?;

    // Only plain column references can be mapped onto the output schema.
    let datafusion::sql::sqlparser::ast::Expr::Identifier(ident) = order_by.expr else {
        return None;
    };

    // Mirror what the remote server does when parts of the clause are
    // omitted: SQL sorts ascending by default, and Postgres places nulls
    // last for ASC and first for DESC unless told otherwise. Advertising
    // anything else would let the planner rely on an ordering the data
    // does not actually have.
    let asc = order_by.asc.unwrap_or(true);
    let nulls_first = order_by.nulls_first.unwrap_or(!asc);

    // Keep the direction flags in one place so the logical and physical
    // descriptions of the sort cannot drift apart.
    let sort = Sort {
        expr: Box::new(Expr::Column(Column::from_name(ident.value))),
        asc,
        nulls_first,
    };

    let schema = self.schema();
    let dfschema = DFSchema::try_from(schema.as_ref().to_owned()).ok()?;

    // Lower the sort *key* (the column), not a sort wrapper: the
    // direction lives in `SortOptions` on the physical side.
    let key = datafusion::physical_expr::create_physical_expr(
        &sort.expr,
        &dfschema,
        &schema,
        &ExecutionProps::new(),
    )
    .ok()?;

    Some(vec![PhysicalSortExpr {
        expr: key,
        options: SortOptions {
            descending: !sort.asc,
            nulls_first: sort.nulls_first,
        },
    }])
}
}

Expand All @@ -57,7 +108,7 @@ impl ExecutionPlan for PostgresQueryExec {
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
self.sort_spec.as_ref().map(|v| v.as_ref())
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
Expand Down
2 changes: 2 additions & 0 deletions crates/protogen/src/sqlexec/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ pub struct PostgresTableProviderConfig {
pub schema: String,
#[prost(string, tag = "3")]
pub table: String,
#[prost(message, tag = "4")]
pub order_by: Option<String>,
}
41 changes: 22 additions & 19 deletions crates/sqlbuiltins/src/functions/table/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,29 @@ impl TableFunc for ReadPostgres {
args: Vec<FuncParamValue>,
_opts: HashMap<String, FuncParamValue>,
) -> Result<Arc<dyn TableProvider>> {
match args.len() {
3 => {
let mut args = args.into_iter();
let conn_str: String = args.next().unwrap().try_into()?;
let schema: String = args.next().unwrap().try_into()?;
let table: String = args.next().unwrap().try_into()?;
if args.len() < 3 || args.len() > 4 {
return Err(ExtensionError::InvalidNumArgs);
}

let access = PostgresAccess::new_from_conn_str(conn_str, None);
let prov_conf = PostgresTableProviderConfig {
access,
schema,
table,
};
let prov = PostgresTableProvider::try_new(prov_conf)
.await
.map_err(|e| ExtensionError::Access(Box::new(e)))?;
let mut args = args.into_iter();
let conn_str: String = args.next().unwrap().try_into()?;
let schema: String = args.next().unwrap().try_into()?;
let table: String = args.next().unwrap().try_into()?;

Ok(Arc::new(prov))
}
_ => Err(ExtensionError::InvalidNumArgs),
}
let access = PostgresAccess::new_from_conn_str(conn_str, None);

let order_by: Option<String> = args.next().and_then(|p| p.try_into().ok());

let prov_conf = PostgresTableProviderConfig {
access,
schema,
table,
order_by,
};
let prov = PostgresTableProvider::try_new(prov_conf)
.await
.map_err(|e| ExtensionError::Access(Box::new(e)))?;

Ok(Arc::new(prov))
}
}
2 changes: 2 additions & 0 deletions crates/sqlexec/src/dispatch/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl<'a> ExternalDispatcher<'a> {
access,
schema: schema.to_owned(),
table: name.to_owned(),
order_by: None, // TODO: add pushdown sorts via un-materialized view,
};
let prov = PostgresTableProvider::try_new(prov_conf).await?;
Ok(Arc::new(prov))
Expand Down Expand Up @@ -290,6 +291,7 @@ impl<'a> ExternalDispatcher<'a> {
access,
schema: schema.to_owned(),
table: table.to_owned(),
order_by: None, // TODO: add pushdown sorts via un-materialized view,
};
let prov = PostgresTableProvider::try_new(prov_conf).await?;
Ok(Arc::new(prov))
Expand Down
52 changes: 52 additions & 0 deletions testdata/sqllogictests_postgres/read.slt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,55 @@ query I
SELECT count(*) FROM read_postgres('${POSTGRES_CONN_STRING}', 'public', 'bikeshare_stations');
----
102


# order by passthrough in function. select first and last items
#
query T
SELECT station_id = 0 FROM read_postgres('${POSTGRES_CONN_STRING}', 'public', 'bikeshare_stations', 'station_id asc') LIMIT 1;
----
t

query T
SELECT station_id = 4879 FROM read_postgres('${POSTGRES_CONN_STRING}', 'public', 'bikeshare_stations', 'station_id desc') LIMIT 1;
----
t


# ensure the ASC/DESC keywords are handled case-insensitively:
#
query T
SELECT station_id = 0 FROM read_postgres('${POSTGRES_CONN_STRING}', 'public', 'bikeshare_stations', 'station_id ASC') LIMIT 1;
----
t

query T
SELECT station_id = 4879 FROM read_postgres('${POSTGRES_CONN_STRING}', 'public', 'bikeshare_stations', 'station_id DESC') LIMIT 1;
----
t


# sort order reversed relative to the expected first row, so the comparison must always be false
#
query T
SELECT station_id = 4879 FROM read_postgres('${POSTGRES_CONN_STRING}', 'public', 'bikeshare_stations', 'station_id asc') LIMIT 1;
----
f

query T
SELECT station_id = 0 FROM read_postgres('${POSTGRES_CONN_STRING}', 'public', 'bikeshare_stations', 'station_id desc') LIMIT 1;
----
f


# without sorts, possibly non-deterministic depending on output/scan order:
#
query T
SELECT station_id = 4879 FROM read_postgres('${POSTGRES_CONN_STRING}', 'public', 'bikeshare_stations') LIMIT 1;
----
f

query T
SELECT station_id = 0 FROM read_postgres('${POSTGRES_CONN_STRING}', 'public', 'bikeshare_stations') LIMIT 1;
----
f
Loading