Skip to content

Commit

Permalink
Fix projection name with DataFrame::with_column and window functions (#12000)
Browse files Browse the repository at this point in the history

* fix/11982: resolves projection issue found in with_column window fn usage

Signed-off-by: Devan <devandbenz@gmail.com>

* fix/11982: resolves projection issue found in with_column window fn usage

Signed-off-by: Devan <devandbenz@gmail.com>

* fmt

Signed-off-by: Devan <devandbenz@gmail.com>

* fmt

Signed-off-by: Devan <devandbenz@gmail.com>

* refactor to get tests working

Signed-off-by: Devan <devandbenz@gmail.com>

* change test to use test harness

Signed-off-by: Devan <devandbenz@gmail.com>

* use row_number method and add comment about test

Signed-off-by: Devan <devandbenz@gmail.com>

* add back import

Signed-off-by: Devan <devandbenz@gmail.com>

---------

Signed-off-by: Devan <devandbenz@gmail.com>
  • Loading branch information
devanbenz authored Aug 17, 2024
1 parent bd2d4ee commit 48416e5
Showing 1 changed file with 40 additions and 4 deletions.
44 changes: 40 additions & 4 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1441,21 +1441,27 @@ impl DataFrame {
/// ```
pub fn with_column(self, name: &str, expr: Expr) -> Result<DataFrame> {
let window_func_exprs = find_window_exprs(&[expr.clone()]);
let plan = if window_func_exprs.is_empty() {
self.plan

let (plan, mut col_exists, window_func) = if window_func_exprs.is_empty() {
(self.plan, false, false)
} else {
LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?
(
LogicalPlanBuilder::window_plan(self.plan, window_func_exprs)?,
true,
true,
)
};

let new_column = expr.alias(name);
let mut col_exists = false;
let mut fields: Vec<Expr> = plan
.schema()
.iter()
.map(|(qualifier, field)| {
if field.name() == name {
col_exists = true;
new_column.clone()
} else if window_func && qualifier.is_none() {
col(Column::from((qualifier, field))).alias(name)
} else {
col(Column::from((qualifier, field)))
}
Expand Down Expand Up @@ -1704,6 +1710,7 @@ mod tests {
use datafusion_common::{Constraint, Constraints, ScalarValue};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::window_function::row_number;
use datafusion_expr::{
cast, create_udf, expr, lit, BuiltInWindowFunction, ExprFunctionExt,
ScalarFunctionImplementation, Volatility, WindowFrame, WindowFrameBound,
Expand Down Expand Up @@ -2956,6 +2963,35 @@ mod tests {
Ok(())
}

// Test issue: https://github.com/apache/datafusion/issues/11982
// A window function passed to `with_column()` was creating an unwanted projection.
//
// The window expression below carries its own alias ("row_num"); the test checks
// that the resulting column is exposed under the name given to `with_column()` ("r")
// and that exactly one column is added to the three selected ones.
#[tokio::test]
async fn test_window_function_with_column() -> Result<()> {
    let df = test_table().await?.select_columns(&["c1", "c2", "c3"])?;
    let ctx = SessionContext::new();
    let df_impl = DataFrame::new(ctx.state(), df.plan.clone());
    let func = row_number().alias("row_num");

    // Should create an additional column with alias 'r' that has window func results
    let df = df_impl.with_column("r", func)?.limit(0, Some(2))?;
    assert_eq!(4, df.schema().fields().len());

    // `df` is not used after this point, so it can be consumed without cloning.
    let df_results = df.collect().await?;
    // Each cell is padded to the width implied by the border row, so the
    // expected lines must carry the same padding to compare equal.
    assert_batches_sorted_eq!(
        [
            "+----+----+-----+---+",
            "| c1 | c2 | c3  | r |",
            "+----+----+-----+---+",
            "| c  | 2  | 1   | 1 |",
            "| d  | 5  | -40 | 2 |",
            "+----+----+-----+---+",
        ],
        &df_results
    );

    Ok(())
}

// Test issue: https://github.com/apache/datafusion/issues/7790
// The join operation outputs two identical column names, but they belong to different relations.
#[tokio::test]
Expand Down

0 comments on commit 48416e5

Please sign in to comment.