diff --git a/src/dataframe.rs b/src/dataframe.rs index 1f7f2e64..73b261bc 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -31,6 +31,8 @@ use datafusion::common::UnnestOptions; use datafusion::config::{CsvOptions, TableParquetOptions}; use datafusion::dataframe::{DataFrame, DataFrameWriteOptions}; use datafusion::execution::SendableRecordBatchStream; +use datafusion::logical_expr::utils::find_window_exprs; +use datafusion::logical_expr::LogicalPlanBuilder; use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel}; use datafusion::prelude::*; use pyo3::exceptions::{PyTypeError, PyValueError}; @@ -176,6 +178,56 @@ impl PyDataFrame { } fn with_column(&self, name: &str, expr: PyExpr) -> PyResult { + println!("\n\n\nadding column: {:?} with expr: {:?}", name, expr); + let df = self.df.as_ref().clone(); + let plan = df.logical_plan().clone(); + let expr = expr.expr; + let window_func_exprs = find_window_exprs(&[expr.clone()]); + println!("window_func_exprs: {:?}", window_func_exprs); + let (plan, mut col_exists, window_func) = if window_func_exprs.is_empty() { + (plan, false, false) + } else { + ( + LogicalPlanBuilder::window_plan(plan, window_func_exprs)?, + true, + true, + ) + }; + println!("col_exists: {:?}, window_func: {:?}", col_exists, window_func); + println!("plan: {}", plan.display_indent()); + + let new_column = expr.clone().alias(name); + let mut fields: Vec = plan + .schema() + .iter() + .map(|(qualifier, field)| { + println!("qualifier: {:?}, field: {:?}", qualifier, field); + if field.name() == name { + println!("adding new column with same name"); + col_exists = true; + new_column.clone() + } else if window_func && qualifier.is_none() { + println!("adding window function with alias"); + col(Column::from((qualifier, field))).alias(name) + } else { + println!("adding column"); + col(Column::from((qualifier, field))) + } + }) + .collect(); + + if !col_exists { + println!("col does not exist - pushing {:?}", new_column); + fields.push(new_column); + } else { + println!("col exists - not pushing {:?}", new_column); + } + + let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?; + println!("project_plan: {}", project_plan.display_indent()); + + + // Ok(DataFrame::new(df.session_state, project_plan)) let df = self.df.as_ref().clone().with_column(name, expr.into())?; Ok(Self::new(df)) }