-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement DISTINCT ON
from Postgres
#7981
Changes from 7 commits
deff43f
553f87a
04ce375
148fee2
26e9082
a5cff4e
525523a
5f0a26b
2730bbd
9b41907
7c232c8
57ffea9
5fdbef4
5f0cf19
2ee64cd
2bbcd7b
5d89700
8467736
8ebdc13
d2bc2b4
e74e1f8
a912cad
9f37d8f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,8 +25,8 @@ use std::sync::Arc; | |
use super::dml::CopyTo; | ||
use super::DdlStatement; | ||
use crate::dml::CopyOptions; | ||
use crate::expr::{Alias, Exists, InSubquery, Placeholder}; | ||
use crate::expr_rewriter::create_col_from_scalar_expr; | ||
use crate::expr::{Alias, Exists, InSubquery, Placeholder, Sort as SortExpr}; | ||
use crate::expr_rewriter::{create_col_from_scalar_expr, normalize_cols}; | ||
use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; | ||
use crate::logical_plan::extension::UserDefinedLogicalNode; | ||
use crate::logical_plan::{DmlStatement, Statement}; | ||
|
@@ -163,7 +163,8 @@ impl LogicalPlan { | |
}) => projected_schema, | ||
LogicalPlan::Projection(Projection { schema, .. }) => schema, | ||
LogicalPlan::Filter(Filter { input, .. }) => input.schema(), | ||
LogicalPlan::Distinct(Distinct { input }) => input.schema(), | ||
LogicalPlan::Distinct(Distinct::All(input)) => input.schema(), | ||
LogicalPlan::Distinct(Distinct::On(DistinctOn { schema, .. })) => schema, | ||
LogicalPlan::Window(Window { schema, .. }) => schema, | ||
LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema, | ||
LogicalPlan::Sort(Sort { input, .. }) => input.schema(), | ||
|
@@ -367,6 +368,16 @@ impl LogicalPlan { | |
LogicalPlan::Unnest(Unnest { column, .. }) => { | ||
f(&Expr::Column(column.clone())) | ||
} | ||
LogicalPlan::Distinct(Distinct::On(DistinctOn { | ||
on_expr, | ||
select_expr, | ||
sort_expr, | ||
.. | ||
})) => on_expr | ||
.iter() | ||
.chain(select_expr.iter()) | ||
.chain(sort_expr.clone().unwrap_or(vec![]).iter()) | ||
.try_for_each(f), | ||
// plans without expressions | ||
LogicalPlan::EmptyRelation(_) | ||
| LogicalPlan::Subquery(_) | ||
|
@@ -377,7 +388,7 @@ impl LogicalPlan { | |
| LogicalPlan::Analyze(_) | ||
| LogicalPlan::Explain(_) | ||
| LogicalPlan::Union(_) | ||
| LogicalPlan::Distinct(_) | ||
| LogicalPlan::Distinct(Distinct::All(_)) | ||
| LogicalPlan::Dml(_) | ||
| LogicalPlan::Ddl(_) | ||
| LogicalPlan::Copy(_) | ||
|
@@ -405,7 +416,9 @@ impl LogicalPlan { | |
LogicalPlan::Union(Union { inputs, .. }) => { | ||
inputs.iter().map(|arc| arc.as_ref()).collect() | ||
} | ||
LogicalPlan::Distinct(Distinct { input }) => vec![input], | ||
LogicalPlan::Distinct( | ||
Distinct::All(input) | Distinct::On(DistinctOn { input, .. }), | ||
) => vec![input], | ||
LogicalPlan::Explain(explain) => vec![&explain.plan], | ||
LogicalPlan::Analyze(analyze) => vec![&analyze.input], | ||
LogicalPlan::Dml(write) => vec![&write.input], | ||
|
@@ -461,8 +474,11 @@ impl LogicalPlan { | |
Ok(Some(agg.group_expr.as_slice()[0].clone())) | ||
} | ||
} | ||
LogicalPlan::Distinct(Distinct::On(DistinctOn { select_expr, .. })) => { | ||
Ok(Some(select_expr[0].clone())) | ||
} | ||
LogicalPlan::Filter(Filter { input, .. }) | ||
| LogicalPlan::Distinct(Distinct { input, .. }) | ||
| LogicalPlan::Distinct(Distinct::All(input)) | ||
| LogicalPlan::Sort(Sort { input, .. }) | ||
| LogicalPlan::Limit(Limit { input, .. }) | ||
| LogicalPlan::Repartition(Repartition { input, .. }) | ||
|
@@ -823,10 +839,29 @@ impl LogicalPlan { | |
inputs: inputs.iter().cloned().map(Arc::new).collect(), | ||
schema: schema.clone(), | ||
})), | ||
LogicalPlan::Distinct(Distinct { .. }) => { | ||
Ok(LogicalPlan::Distinct(Distinct { | ||
input: Arc::new(inputs[0].clone()), | ||
})) | ||
LogicalPlan::Distinct(distinct) => { | ||
let distinct = match distinct { | ||
Distinct::All(_) => Distinct::All(Arc::new(inputs[0].clone())), | ||
Distinct::On(DistinctOn { | ||
on_expr, | ||
select_expr, | ||
.. | ||
}) => { | ||
let sort_expr = expr.split_off(on_expr.len() + select_expr.len()); | ||
let select_expr = expr.split_off(on_expr.len()); | ||
Distinct::On(DistinctOn::try_new( | ||
expr, | ||
select_expr, | ||
if !sort_expr.is_empty() { | ||
Some(sort_expr) | ||
} else { | ||
None | ||
}, | ||
Arc::new(inputs[0].clone()), | ||
)?) | ||
} | ||
}; | ||
Ok(LogicalPlan::Distinct(distinct)) | ||
} | ||
LogicalPlan::Analyze(a) => { | ||
assert!(expr.is_empty()); | ||
|
@@ -1064,7 +1099,9 @@ impl LogicalPlan { | |
LogicalPlan::Subquery(_) => None, | ||
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(), | ||
LogicalPlan::Limit(Limit { fetch, .. }) => *fetch, | ||
LogicalPlan::Distinct(Distinct { input }) => input.max_rows(), | ||
LogicalPlan::Distinct( | ||
Distinct::All(input) | Distinct::On(DistinctOn { input, .. }), | ||
) => input.max_rows(), | ||
LogicalPlan::Values(v) => Some(v.values.len()), | ||
LogicalPlan::Unnest(_) => None, | ||
LogicalPlan::Ddl(_) | ||
|
@@ -1667,9 +1704,21 @@ impl LogicalPlan { | |
LogicalPlan::Statement(statement) => { | ||
write!(f, "{}", statement.display()) | ||
} | ||
LogicalPlan::Distinct(Distinct { .. }) => { | ||
write!(f, "Distinct:") | ||
} | ||
LogicalPlan::Distinct(distinct) => match distinct { | ||
Distinct::All(_) => write!(f, "Distinct:"), | ||
Distinct::On(DistinctOn { | ||
on_expr, | ||
select_expr, | ||
sort_expr, | ||
.. | ||
}) => write!( | ||
f, | ||
"DistinctOn: on_expr=[[{}]], select_expr=[[{}]], sort_expr=[[{}]]", | ||
expr_vec_fmt!(on_expr), | ||
expr_vec_fmt!(select_expr), | ||
if let Some(sort_expr) = sort_expr { expr_vec_fmt!(sort_expr) } else { "".to_string() }, | ||
), | ||
}, | ||
LogicalPlan::Explain { .. } => write!(f, "Explain"), | ||
LogicalPlan::Analyze { .. } => write!(f, "Analyze"), | ||
LogicalPlan::Union(_) => write!(f, "Union"), | ||
|
@@ -2132,9 +2181,96 @@ pub struct Limit { | |
|
||
/// Removes duplicate rows from the input | ||
#[derive(Clone, PartialEq, Eq, Hash)] | ||
pub struct Distinct { | ||
pub enum Distinct { | ||
/// Plain `DISTINCT` referencing all selection expressions | ||
All(Arc<LogicalPlan>), | ||
/// The `Postgres` addition, allowing separate control over DISTINCT'd and selected columns | ||
On(DistinctOn), | ||
} | ||
|
||
/// Removes duplicate rows from the input | ||
#[derive(Clone, PartialEq, Eq, Hash)] | ||
pub struct DistinctOn { | ||
/// The `DISTINCT ON` clause expression list | ||
pub on_expr: Vec<Expr>, | ||
/// The selected projection expression list | ||
pub select_expr: Vec<Expr>, | ||
/// The `ORDER BY` clause, whose initial expressions must match those of the `ON` clause | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since the same list of exprs is included in both the
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, Condensing So in order to keep them both in one vec we'd have to: a) add an additional field to This strikes me as less clear all in all, though if you'd prefer it I'll make that change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think your explanation makes sense. Maybe we can take some of this rationale (e.g. that |
||
pub sort_expr: Option<Vec<Expr>>, | ||
/// The logical plan that is being DISTINCT'd | ||
pub input: Arc<LogicalPlan>, | ||
/// The schema description of the DISTINCT ON output | ||
pub schema: DFSchemaRef, | ||
} | ||
|
||
impl DistinctOn { | ||
/// Create a new `DistintOn` struct. | ||
pub fn try_new( | ||
on_expr: Vec<Expr>, | ||
select_expr: Vec<Expr>, | ||
sort_expr: Option<Vec<Expr>>, | ||
input: Arc<LogicalPlan>, | ||
) -> Result<Self> { | ||
let on_expr = normalize_cols(on_expr, input.as_ref())?; | ||
|
||
// Create fields with any qualifier stuffed in the name itself | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is sort of a hack, but the reason I was forced to do it is so that the plan schema remains unchanged after I feel like this case should be handled by UPDATE: I've opened a related issue for this: #8008 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Note #7997 may be related |
||
let fields = exprlist_to_fields(&select_expr, &input)? | ||
.iter() | ||
.map(|f| { | ||
DFField::new_unqualified( | ||
&f.qualified_name(), | ||
f.data_type().clone(), | ||
f.is_nullable(), | ||
) | ||
}) | ||
.collect(); | ||
let schema = | ||
DFSchema::new_with_metadata(fields, input.schema().metadata().clone())?; | ||
|
||
let mut distinct_on = DistinctOn { | ||
on_expr, | ||
select_expr, | ||
sort_expr: None, | ||
input, | ||
schema: Arc::new(schema), | ||
}; | ||
|
||
if let Some(sort_expr) = sort_expr { | ||
distinct_on = distinct_on.with_sort_expr(sort_expr)?; | ||
} | ||
|
||
Ok(distinct_on) | ||
} | ||
|
||
/// Try to update `self` with a new sort expressions. | ||
/// | ||
/// Validates that the sort expressions are a super-set of the `ON` expressions. | ||
pub fn with_sort_expr(mut self, sort_expr: Vec<Expr>) -> Result<Self> { | ||
let sort_expr = normalize_cols(sort_expr, self.input.as_ref())?; | ||
|
||
// Check that the left-most sort expressions are the same as the `ON` expressions. | ||
let mut matched = true; | ||
for (on, sort) in self.on_expr.iter().zip(sort_expr.iter()) { | ||
match sort { | ||
Expr::Sort(SortExpr { expr, .. }) => { | ||
if on != &**expr { | ||
matched = false; | ||
break; | ||
} | ||
} | ||
_ => return plan_err!("Not a sort expression: {sort}"), | ||
} | ||
} | ||
|
||
if self.on_expr.len() > sort_expr.len() || !matched { | ||
return plan_err!( | ||
"SELECT DISTINCT ON expressions must match initial ORDER BY expressions" | ||
); | ||
} | ||
|
||
self.sort_expr = Some(sort_expr); | ||
Ok(self) | ||
} | ||
} | ||
|
||
/// Aggregates its input based on a set of grouping and aggregate | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -238,8 +238,17 @@ impl CommonSubexprEliminate { | |
let rewritten = pop_expr(&mut rewritten)?; | ||
|
||
if affected_id.is_empty() { | ||
// Alias aggregation epxressions if they have changed | ||
// TODO: This should have really been identified above and handled in the `else` branch | ||
let aggr_exprs = new_aggr_expr | ||
.iter() | ||
.zip(aggr_expr.iter()) | ||
.map(|(new_expr, old_expr)| { | ||
new_expr.clone().alias_if_changed(old_expr.display_name()?) | ||
}) | ||
.collect::<Result<Vec<Expr>>>()?; | ||
// Since group_epxr changes, schema changes also. Use try_new method. | ||
Aggregate::try_new(Arc::new(new_input), new_group_expr, new_aggr_expr) | ||
Aggregate::try_new(Arc::new(new_input), new_group_expr, aggr_exprs) | ||
.map(LogicalPlan::Aggregate) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't yet understand why this wasn't identified/aliased automatically, but I found this part to be necessary in order to get There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I think I understand what is going on:
I think this explains the behavior I've encountered here, and that this behavior is not anomalous. The only question is whether there is a better way to align the new aggregation schema with the old one besides aliasing as above. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe @waynexia has some thoughts There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Btw, I've now clarified this comment since it was misleading, i.e. the aliasing should really be done in |
||
} else { | ||
let mut agg_exprs = vec![]; | ||
|
@@ -367,7 +376,7 @@ impl OptimizerRule for CommonSubexprEliminate { | |
Ok(Some(build_recover_project_plan( | ||
&original_schema, | ||
optimized_plan, | ||
))) | ||
)?)) | ||
} | ||
plan => Ok(plan), | ||
} | ||
|
@@ -458,16 +467,19 @@ fn build_common_expr_project_plan( | |
/// the "intermediate" projection plan built in [build_common_expr_project_plan]. | ||
/// | ||
/// This is for those plans who don't keep its own output schema like `Filter` or `Sort`. | ||
fn build_recover_project_plan(schema: &DFSchema, input: LogicalPlan) -> LogicalPlan { | ||
fn build_recover_project_plan( | ||
schema: &DFSchema, | ||
input: LogicalPlan, | ||
) -> Result<LogicalPlan> { | ||
let col_exprs = schema | ||
.fields() | ||
.iter() | ||
.map(|field| Expr::Column(field.qualified_column())) | ||
.collect(); | ||
LogicalPlan::Projection( | ||
Projection::try_new(col_exprs, Arc::new(input)) | ||
.expect("Cannot build projection plan from an invalid schema"), | ||
) | ||
Ok(LogicalPlan::Projection(Projection::try_new( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
col_exprs, | ||
Arc::new(input), | ||
)?)) | ||
} | ||
|
||
fn extract_expressions( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add
api-change
label for this diff