Skip to content

Commit

Permalink
add debug prints to Dataframe::with_column
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-J-Ward committed Sep 10, 2024
1 parent 7f6187a commit 6014e8c
Showing 1 changed file with 52 additions and 0 deletions.
52 changes: 52 additions & 0 deletions src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use datafusion::common::UnnestOptions;
use datafusion::config::{CsvOptions, TableParquetOptions};
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::execution::SendableRecordBatchStream;
use datafusion::logical_expr::utils::find_window_exprs;
use datafusion::logical_expr::LogicalPlanBuilder;
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
use datafusion::prelude::*;
use pyo3::exceptions::{PyTypeError, PyValueError};
Expand Down Expand Up @@ -176,6 +178,56 @@ impl PyDataFrame {
}

/// Returns a new `PyDataFrame` with the column `name` computed from `expr`.
///
/// The wrapped DataFusion dataframe is cloned and the work is delegated to
/// its `with_column` method, so `self` is not modified.
///
/// # Errors
///
/// Any error returned by the underlying `with_column` call is propagated
/// to the caller via `?`.
fn with_column(&self, name: &str, expr: PyExpr) -> PyResult<Self> {
    // Hand the unwrapped DataFusion expression straight to DataFusion. No
    // intermediate plan is built here: doing so only duplicated work and
    // could fail independently of the operation whose result is returned.
    let df = self.df.as_ref().clone().with_column(name, expr.expr)?;
    Ok(Self::new(df))
}
Expand Down

0 comments on commit 6014e8c

Please sign in to comment.