Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix projection name with DataFrame::with_column and window functions #12000

Merged
merged 9 commits into from
Aug 17, 2024
44 changes: 40 additions & 4 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1441,21 +1441,27 @@ impl DataFrame {
/// ```
pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame> {
let window_func_exprs = find_window_exprs(&[expr.clone()]);
let plan = if window_func_exprs.is_empty() {
self.plan

let (plan, mut col_exists, window_func) = if window_func_exprs.is_empty() {
(self.plan, false, false)
} else {
LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
(
LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?,
true,
true,
)
};

let new_column = expr.alias(name);
let mut col_exists = false;
let mut fields: Vec<Expr> = plan
.schema()
.iter()
.map(|(qualifier, field)| {
if field.name() == name {
col_exists = true;
new_column.clone()
} else if window_func && qualifier.is_none() {
col(Column::from((qualifier, field))).alias(name)
} else {
col(Column::from((qualifier, field)))
}
Expand Down Expand Up @@ -1704,6 +1710,7 @@ mod tests {
use datafusion_common::{Constraint, Constraints, ScalarValue};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::window_function::row_number;
use datafusion_expr::{
cast, create_udf, expr, lit, BuiltInWindowFunction, ExprFunctionExt,
ScalarFunctionImplementation, Volatility, WindowFrame, WindowFrameBound,
Expand Down Expand Up @@ -2956,6 +2963,35 @@ mod tests {
Ok(())
}

// Test issue: https://github.com/apache/datafusion/issues/11982
// Window function was creating unwanted projection when using with_column() method.
#[tokio::test]
async fn test_window_function_with_column() -> Result<()> {
let df = test_table().await?.select_columns(&["c1", "c2", "c3"])?;
let ctx = SessionContext::new();
let df_impl = DataFrame::new(ctx.state(), df.plan.clone());
let func = row_number().alias("row_num");

// Should create an additional column with alias 'r' that has window func results
let df = df_impl.with_column("r", func)?.limit(0, Some(2))?;
assert_eq!(4, df.schema().fields().len());

let df_results = df.clone().collect().await?;
assert_batches_sorted_eq!(
[
"+----+----+-----+---+",
"| c1 | c2 | c3 | r |",
"+----+----+-----+---+",
"| c | 2 | 1 | 1 |",
"| d | 5 | -40 | 2 |",
"+----+----+-----+---+",
],
&df_results
);

Ok(())
}

// Test issue: https://github.com/apache/datafusion/issues/7790
// The join operation outputs two identical column names, but they belong to different relations.
#[tokio::test]
Expand Down
Loading