diff --git a/Cargo.lock b/Cargo.lock index ac9a653c22ce..867e15c15131 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1645,6 +1645,7 @@ dependencies = [ "ahash", "allocator-api2", "rayon", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index bd4f667b33a0..879eb9047876 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ either = "1.9" ethnum = "1.3.2" fallible-streaming-iterator = "0.1.9" futures = "0.3.25" -hashbrown = { version = "0.14", features = ["rayon", "ahash"] } +hashbrown = { version = "0.14", features = ["rayon", "ahash", "serde"] } hex = "0.4.3" indexmap = { version = "2", features = ["std"] } itoa = "1.0.6" diff --git a/crates/polars-lazy/src/dsl/functions.rs b/crates/polars-lazy/src/dsl/functions.rs index 7476b8b57483..a08559a9d14d 100644 --- a/crates/polars-lazy/src/dsl/functions.rs +++ b/crates/polars-lazy/src/dsl/functions.rs @@ -75,11 +75,11 @@ pub(crate) fn concat_impl>( else { unreachable!() }; - let mut schema = inputs[0].schema()?.as_ref().as_ref().clone(); + let mut schema = inputs[0].compute_schema()?.as_ref().clone(); let mut changed = false; for input in inputs[1..].iter() { - changed |= schema.to_supertype(input.schema()?.as_ref().as_ref())?; + changed |= schema.to_supertype(input.compute_schema()?.as_ref())?; } let mut placeholder = DslPlan::default(); @@ -87,7 +87,7 @@ pub(crate) fn concat_impl>( let mut exprs = vec![]; for input in &mut inputs { std::mem::swap(input, &mut placeholder); - let input_schema = placeholder.schema()?; + let input_schema = placeholder.compute_schema()?; exprs.clear(); let to_cast = input_schema.iter().zip(schema.iter_dtypes()).flat_map( diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 39ef52c3b820..c7c622f96540 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -8,7 +8,6 @@ mod exitable; #[cfg(feature = "pivot")] pub mod pivot; -use std::borrow::Cow; #[cfg(any( feature = "parquet", feature = "ipc", @@ -93,7 +92,7 @@ impl LazyFrame { /// Returns an `Err` if the logical plan has already encountered an error (i.e., if /// `self.collect()` would fail), `Ok` otherwise. pub fn schema(&self) -> PolarsResult { - self.logical_plan.schema().map(|schema| schema.into_owned()) + self.logical_plan.compute_schema() } pub(crate) fn get_plan_builder(self) -> DslBuilder { @@ -378,34 +377,6 @@ impl LazyFrame { self.select(vec![col("*").reverse()]) } - /// Check the if the `names` are available in the `schema`, if not - /// return a `LogicalPlan` that raises an `Error`. - fn check_names(&self, names: &[SmartString], schema: Option<&SchemaRef>) -> Option { - let schema = schema - .map(Cow::Borrowed) - .unwrap_or_else(|| Cow::Owned(self.schema().unwrap())); - - let mut opt_not_found = None; - names.iter().for_each(|name| { - let invalid = schema.get(name).is_none(); - - if invalid && opt_not_found.is_none() { - opt_not_found = Some(name) - } - }); - - if let Some(name) = opt_not_found { - let lp = self - .clone() - .get_plan_builder() - .add_err(polars_err!(SchemaFieldNotFound: "{}", name)) - .build(); - Some(Self::from_logical_plan(lp, self.opt_state)) - } else { - None - } - } - /// Rename columns in the DataFrame. /// /// `existing` and `new` are iterables of the same length containing the old and @@ -435,19 +406,10 @@ impl LazyFrame { } } - // a column gets swapped - let schema = &self.schema().unwrap(); - let swapping = new_vec.iter().any(|name| schema.get(name).is_some()); - - if let Some(lp) = self.check_names(&existing_vec, Some(schema)) { - lp - } else { - self.map_private(FunctionNode::Rename { - existing: existing_vec.into(), - new: new_vec.into(), - swapping, - }) - } + self.map_private(DslFunction::Rename { + existing: existing_vec.into(), + new: new_vec.into(), + }) } /// Removes columns from the DataFrame. @@ -1339,37 +1301,18 @@ impl LazyFrame { Self::from_logical_plan(lp, opt_state) } - fn stats_helper(self, condition: F, expr: E) -> PolarsResult - where - F: Fn(&DataType) -> bool, - E: Fn(&str) -> Expr, - { - let exprs = self - .schema()? - .iter() - .map(|(name, dt)| { - if condition(dt) { - expr(name) - } else { - lit(NULL).cast(dt.clone()).alias(name) - } - }) - .collect::>(); - Ok(self.select(exprs)) - } - /// Aggregate all the columns as their maximum values. /// /// Aggregated columns will have the same names as the original columns. - pub fn max(self) -> PolarsResult { - self.stats_helper(|dt| dt.is_ord(), |name| col(name).max()) + pub fn max(self) -> Self { + self.map_private(DslFunction::Stats(StatsFunction::Max)) } /// Aggregate all the columns as their minimum values. /// /// Aggregated columns will have the same names as the original columns. - pub fn min(self) -> PolarsResult { - self.stats_helper(|dt| dt.is_ord(), |name| col(name).min()) + pub fn min(self) -> Self { + self.map_private(DslFunction::Stats(StatsFunction::Min)) } /// Aggregate all the columns as their sum values. @@ -1381,35 +1324,16 @@ impl LazyFrame { /// in `debug` mode, overflows will panic, whereas in `release` mode overflows will /// silently wrap. /// - String columns will sum to None. - pub fn sum(self) -> PolarsResult { - self.stats_helper( - |dt| { - dt.is_numeric() - || dt.is_decimal() - || matches!(dt, DataType::Boolean | DataType::Duration(_)) - }, - |name| col(name).sum(), - ) + pub fn sum(self) -> Self { + self.map_private(DslFunction::Stats(StatsFunction::Sum)) } /// Aggregate all the columns as their mean values. /// /// - Boolean and integer columns are converted to `f64` before computing the mean. /// - String columns will have a mean of None. - pub fn mean(self) -> PolarsResult { - self.stats_helper( - |dt| { - dt.is_numeric() - || matches!( - dt, - DataType::Boolean - | DataType::Duration(_) - | DataType::Datetime(_, _) - | DataType::Time - ) - }, - |name| col(name).mean(), - ) + pub fn mean(self) -> Self { + self.map_private(DslFunction::Stats(StatsFunction::Mean)) } /// Aggregate all the columns as their median values. @@ -1417,32 +1341,16 @@ impl LazyFrame { /// - Boolean and integer results are converted to `f64`. However, they are still /// susceptible to overflow before this conversion occurs. /// - String columns will sum to None. - pub fn median(self) -> PolarsResult { - self.stats_helper( - |dt| { - dt.is_numeric() - || matches!( - dt, - DataType::Boolean - | DataType::Duration(_) - | DataType::Datetime(_, _) - | DataType::Time - ) - }, - |name| col(name).median(), - ) + pub fn median(self) -> Self { + self.map_private(DslFunction::Stats(StatsFunction::Median)) } /// Aggregate all the columns as their quantile values. - pub fn quantile( - self, - quantile: Expr, - interpol: QuantileInterpolOptions, - ) -> PolarsResult { - self.stats_helper( - |dt| dt.is_numeric(), - |name| col(name).quantile(quantile.clone(), interpol), - ) + pub fn quantile(self, quantile: Expr, interpol: QuantileInterpolOptions) -> Self { + self.map_private(DslFunction::Stats(StatsFunction::Quantile { + quantile, + interpol, + })) } /// Aggregate all the columns as their standard deviation values. @@ -1457,11 +1365,8 @@ impl LazyFrame { /// > standard deviation per se. /// /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.std.html#) - pub fn std(self, ddof: u8) -> PolarsResult { - self.stats_helper( - |dt| dt.is_numeric() || dt.is_bool(), - |name| col(name).std(ddof), - ) + pub fn std(self, ddof: u8) -> Self { + self.map_private(DslFunction::Stats(StatsFunction::Std { ddof })) } /// Aggregate all the columns as their variance values. @@ -1473,11 +1378,8 @@ impl LazyFrame { /// > likelihood estimate of the variance for normally distributed variables. /// /// Source: [Numpy](https://numpy.org/doc/stable/reference/generated/numpy.var.html#) - pub fn var(self, ddof: u8) -> PolarsResult { - self.stats_helper( - |dt| dt.is_numeric() || dt.is_bool(), - |name| col(name).var(ddof), - ) + pub fn var(self, ddof: u8) -> Self { + self.map_private(DslFunction::Stats(StatsFunction::Var { ddof })) } /// Apply explode operation. [See eager explode](polars_core::frame::DataFrame::explode). @@ -1592,7 +1494,7 @@ impl LazyFrame { /// See [`MeltArgs`] for information on how to melt a DataFrame. pub fn melt(self, args: MeltArgs) -> LazyFrame { let opt_state = self.get_opt_state(); - let lp = self.get_plan_builder().melt(Arc::new(args)).build(); + let lp = self.get_plan_builder().melt(args).build(); Self::from_logical_plan(lp, opt_state) } @@ -1655,7 +1557,7 @@ impl LazyFrame { Self::from_logical_plan(lp, opt_state) } - pub(crate) fn map_private(self, function: FunctionNode) -> LazyFrame { + pub(crate) fn map_private(self, function: DslFunction) -> LazyFrame { let opt_state = self.get_opt_state(); let lp = self.get_plan_builder().map_private(function).build(); Self::from_logical_plan(lp, opt_state) @@ -1673,7 +1575,6 @@ impl LazyFrame { let add_row_index_in_map = match &mut self.logical_plan { DslPlan::Scan { file_options: options, - file_info, scan_type, .. } if !matches!(scan_type, FileScan::Anonymous { .. }) => { @@ -1681,22 +1582,15 @@ impl LazyFrame { name: name.to_string(), offset: offset.unwrap_or(0), }); - file_info.schema = Arc::new( - file_info - .schema - .new_inserting_at_index(0, name.into(), IDX_DTYPE) - .unwrap(), - ); false }, _ => true, }; if add_row_index_in_map { - self.map_private(FunctionNode::RowIndex { + self.map_private(DslFunction::RowIndex { name: Arc::from(name), offset, - schema: Default::default(), }) } else { self @@ -1712,9 +1606,9 @@ impl LazyFrame { /// inserted as columns. #[cfg(feature = "dtype-struct")] pub fn unnest, S: AsRef>(self, cols: I) -> Self { - self.map_private(FunctionNode::Unnest { + self.map_private(DslFunction::FunctionNode(FunctionNode::Unnest { columns: cols.into_iter().map(|s| Arc::from(s.as_ref())).collect(), - }) + })) } #[cfg(feature = "merge_sorted")] @@ -1723,7 +1617,7 @@ impl LazyFrame { // this indicates until which chunk the data is from the left df // this trick allows us to reuse the `Union` architecture to get map over // two DataFrames - let left = self.map_private(FunctionNode::Rechunk); + let left = self.map_private(DslFunction::FunctionNode(FunctionNode::Rechunk)); let q = concat( &[left, other], UnionArgs { @@ -1732,9 +1626,11 @@ impl LazyFrame { ..Default::default() }, )?; - Ok(q.map_private(FunctionNode::MergeSorted { - column: Arc::from(key), - })) + Ok( + q.map_private(DslFunction::FunctionNode(FunctionNode::MergeSorted { + column: Arc::from(key), + })), + ) } } @@ -1847,10 +1743,9 @@ impl LazyGroupBy { let lp = DslPlan::GroupBy { input: Arc::new(self.logical_plan), - keys: Arc::new(self.keys), + keys: self.keys, aggs: vec![], - schema, - apply: Some(Arc::new(f)), + apply: Some((Arc::new(f), schema)), maintain_order: self.maintain_order, options: Arc::new(options), }; diff --git a/crates/polars-lazy/src/tests/err_msg.rs b/crates/polars-lazy/src/tests/err_msg.rs deleted file mode 100644 index 5f73f1c30c9a..000000000000 --- a/crates/polars-lazy/src/tests/err_msg.rs +++ /dev/null @@ -1,83 +0,0 @@ -use polars_core::error::ErrString; - -use super::*; - -const INITIAL_PROJECTION_STR: &str = r#"DF ["c1"]; PROJECT */1 COLUMNS; SELECTION: "None""#; - -fn make_df() -> LazyFrame { - df! [ "c1" => [0, 1] ].unwrap().lazy() -} - -fn assert_errors_eq(e1: &PolarsError, e2: &PolarsError) { - use PolarsError::*; - match (e1, e2) { - (ColumnNotFound(s1), ColumnNotFound(s2)) => { - assert_eq!(s1.as_ref(), s2.as_ref()); - }, - (ComputeError(s1), ComputeError(s2)) => { - assert_eq!(s1.as_ref(), s2.as_ref()); - }, - _ => panic!("{e1:?} != {e2:?}"), - } -} - -#[test] -fn col_not_found_error_messages() { - fn get_err_msg(err_msg: &str, n: usize) -> String { - let plural_s; - let was_were; - - if n == 1 { - plural_s = ""; - was_were = "was" - } else { - plural_s = "s"; - was_were = "were"; - }; - format!( - "{err_msg}\n\nLogicalPlan had already failed with the above error; \ - after failure, {n} additional operation{plural_s} \ - {was_were} attempted on the LazyFrame" - ) - } - fn test_col_not_found(df: LazyFrame, n: usize) { - let err_msg = format!( - "xyz\n\nError originated just after this \ - operation:\n{INITIAL_PROJECTION_STR}" - ); - - let plan_err_str = - format!("ErrorState {{ n_times: {n}, err: ColumnNotFound(ErrString({err_msg:?})) }}"); - - let collect_err = if n == 0 { - PolarsError::ColumnNotFound(ErrString::from(err_msg.to_owned())) - } else { - PolarsError::ColumnNotFound(ErrString::from(get_err_msg(&err_msg, n))) - }; - - assert_eq!(df.describe_plan(), plan_err_str); - assert_errors_eq(&df.collect().unwrap_err(), &collect_err); - } - - let df = make_df(); - - assert_eq!(df.describe_plan(), INITIAL_PROJECTION_STR); - - test_col_not_found(df.clone().select([col("xyz")]), 0); - test_col_not_found(df.clone().select([col("xyz")]).select([col("c1")]), 1); - test_col_not_found( - df.clone() - .select([col("xyz")]) - .select([col("c1")]) - .select([col("c2")]), - 2, - ); - test_col_not_found( - df.clone() - .select([col("xyz")]) - .select([col("c1")]) - .select([col("c2")]) - .select([col("c3")]), - 3, - ); -} diff --git a/crates/polars-lazy/src/tests/logical.rs b/crates/polars-lazy/src/tests/logical.rs index 3c905bebf368..674e6ecd793b 100644 --- a/crates/polars-lazy/src/tests/logical.rs +++ b/crates/polars-lazy/src/tests/logical.rs @@ -100,14 +100,14 @@ fn test_lazy_logical_plan_schema() { .select(&[col("variety").alias("foo")]) .logical_plan; - assert!(lp.schema().unwrap().get("foo").is_some()); + assert!(lp.compute_schema().unwrap().get("foo").is_some()); let lp = df .lazy() .group_by([col("variety")]) .agg([col("sepal_width").min()]) .logical_plan; - assert!(lp.schema().unwrap().get("sepal_width").is_some()); + assert!(lp.compute_schema().unwrap().get("sepal_width").is_some()); } #[test] diff --git a/crates/polars-lazy/src/tests/mod.rs b/crates/polars-lazy/src/tests/mod.rs index c598dddb9233..b05c545465df 100644 --- a/crates/polars-lazy/src/tests/mod.rs +++ b/crates/polars-lazy/src/tests/mod.rs @@ -2,7 +2,6 @@ mod aggregations; mod arity; #[cfg(all(feature = "strings", feature = "cse"))] mod cse; -mod err_msg; #[cfg(feature = "parquet")] mod io; mod logical; diff --git a/crates/polars-ops/src/series/ops/horizontal.rs b/crates/polars-ops/src/series/ops/horizontal.rs index fd4dd76d2434..4aad55f7d966 100644 --- a/crates/polars-ops/src/series/ops/horizontal.rs +++ b/crates/polars-ops/src/series/ops/horizontal.rs @@ -23,7 +23,7 @@ pub fn any_horizontal(s: &[Series]) -> PolarsResult { Ok(out.into_series()) } -pub fn all_horizontal(s: &[Series]) -> PolarsResult { +pub fn all_horizontal_impl(s: &[Series]) -> PolarsResult { let out = POOL .install(|| { s.par_iter() diff --git a/crates/polars-plan/src/dot.rs b/crates/polars-plan/src/dot.rs index 81da961b9c2a..e8be059bf905 100644 --- a/crates/polars-plan/src/dot.rs +++ b/crates/polars-plan/src/dot.rs @@ -206,7 +206,7 @@ impl DslPlan { id_map, ), Select { expr, input, .. } => { - let schema = input.schema().map_err(|_| { + let schema = input.compute_schema().map_err(|_| { eprintln!("could not determine schema"); std::fmt::Error })?; @@ -407,15 +407,6 @@ impl DslPlan { self.write_dot(acc_str, prev_node, current_node, id_map)?; input.dot(acc_str, (branch, id + 1), current_node, id_map) }, - Error { err, .. } => { - let fmt = format!("{:?}", &err.0); - let current_node = DotNode { - branch, - id, - fmt: &fmt, - }; - self.write_dot(acc_str, prev_node, current_node, id_map) - }, } } diff --git a/crates/polars-plan/src/dsl/function_expr/boolean.rs b/crates/polars-plan/src/dsl/function_expr/boolean.rs index 6bb888fe7233..6f6cf1824eb8 100644 --- a/crates/polars-plan/src/dsl/function_expr/boolean.rs +++ b/crates/polars-plan/src/dsl/function_expr/boolean.rs @@ -207,7 +207,7 @@ fn any_horizontal(s: &[Series]) -> PolarsResult { } fn all_horizontal(s: &[Series]) -> PolarsResult { - polars_ops::prelude::all_horizontal(s) + polars_ops::prelude::all_horizontal_impl(s) } fn not(s: &Series) -> PolarsResult { diff --git a/crates/polars-plan/src/dsl/meta.rs b/crates/polars-plan/src/dsl/meta.rs index ccd1a17fda42..9c3ac8b4ae4f 100644 --- a/crates/polars-plan/src/dsl/meta.rs +++ b/crates/polars-plan/src/dsl/meta.rs @@ -2,7 +2,7 @@ use std::fmt::Display; use std::ops::BitAnd; use super::*; -use crate::logical_plan::projection::is_regex_projection; +use crate::logical_plan::expr_expansion::is_regex_projection; use crate::logical_plan::tree_format::TreeFmtVisitor; use crate::logical_plan::visitor::{AexprNode, TreeWalker}; diff --git a/crates/polars-plan/src/logical_plan/builder_dsl.rs b/crates/polars-plan/src/logical_plan/builder_dsl.rs index 4ca99e292be5..712a7dfab8fb 100644 --- a/crates/polars-plan/src/logical_plan/builder_dsl.rs +++ b/crates/polars-plan/src/logical_plan/builder_dsl.rs @@ -27,10 +27,8 @@ use polars_io::{ utils::get_reader_bytes, }; -use super::builder_functions::*; use crate::constants::UNLIMITED_CACHE; -use crate::dsl::functions::horizontal::all_horizontal; -use crate::logical_plan::projection::{is_regex_projection, rewrite_projections}; +use crate::logical_plan::expr_expansion::rewrite_projections; #[cfg(feature = "python")] use crate::prelude::python_udf::PythonFunction; use crate::prelude::*; @@ -52,41 +50,6 @@ impl From for DslBuilder { } } -fn format_err(msg: &str, input: &DslPlan) -> String { - format!("{msg}\n\nError originated just after this operation:\n{input:?}") -} - -/// Returns every error or msg: &str as `ComputeError`. It also shows the logical plan node where the error originated. -/// If `input` is already a `DslPlan::Error`, then return it as is; errors already keep track of their previous -/// inputs, so we don't have to do it again here. -macro_rules! raise_err { - ($err:expr, $input:expr, $convert:ident) => {{ - let input: DslPlan = $input.clone(); - match &input { - DslPlan::Error { .. } => input, - _ => { - let format_err_outer = |msg: &str| format_err(msg, &input); - let err = $err.wrap_msg(&format_err_outer); - - DslPlan::Error { - input: Arc::new(input), - err: err.into(), // PolarsError -> ErrorState - } - }, - } - .$convert() - }}; -} - -macro_rules! try_delayed { - ($fallible:expr, $input:expr, $convert:ident) => { - match $fallible { - Ok(success) => success, - Err(err) => return raise_err!(err, $input, $convert), - } - }; -} - #[cfg(any(feature = "parquet", feature = "parquet_async",))] fn prepare_schema(mut schema: Schema, row_index: Option<&RowIndex>) -> SchemaRef { if let Some(rc) = row_index { @@ -445,265 +408,59 @@ impl DslBuilder { } pub fn drop(self, to_drop: PlHashSet) -> Self { - let schema = try_delayed!(self.0.schema(), &self.0, into); - - let mut output_schema = Schema::with_capacity(schema.len().saturating_sub(to_drop.len())); - let columns = schema - .iter() - .filter_map(|(col_name, dtype)| { - if to_drop.contains(col_name.as_str()) { - None - } else { - let out = Some(col(col_name)); - output_schema.with_column(col_name.clone(), dtype.clone()); - out - } - }) - .collect::>(); - - if columns.is_empty() { - self.map( - |_| Ok(DataFrame::empty()), - AllowedOptimizations::default(), - Some(Arc::new(|_: &Schema| Ok(Arc::new(Schema::default())))), - "EMPTY PROJECTION", - ) - } else { - DslPlan::Select { - expr: columns, - input: Arc::new(self.0), - schema: Arc::new(output_schema), - options: ProjectionOptions { - run_parallel: false, - duplicate_check: false, - }, - } - .into() - } + self.map_private(DslFunction::Drop(to_drop)) } pub fn project(self, exprs: Vec, options: ProjectionOptions) -> Self { - let schema = try_delayed!(self.0.schema(), &self.0, into); - let (exprs, schema) = try_delayed!(prepare_projection(exprs, &schema), &self.0, into); - - if exprs.is_empty() { - self.map( - |_| Ok(DataFrame::empty()), - AllowedOptimizations::default(), - Some(Arc::new(|_: &Schema| Ok(Arc::new(Schema::default())))), - "EMPTY PROJECTION", - ) - } else { - DslPlan::Select { - expr: exprs, - input: Arc::new(self.0), - schema: Arc::new(schema), - options, - } - .into() + DslPlan::Select { + expr: exprs, + input: Arc::new(self.0), + options, } + .into() } pub fn fill_null(self, fill_value: Expr) -> Self { - let schema = try_delayed!(self.0.schema(), &self.0, into); - let exprs = schema - .iter_names() - .map(|name| col(name).fill_null(fill_value.clone())) - .collect(); - self.project(exprs, Default::default()) + self.project( + vec![all().fill_null(fill_value)], + ProjectionOptions { + duplicate_check: false, + ..Default::default() + }, + ) } pub fn drop_nulls(self, subset: Option>) -> Self { - match subset { - None => { - let predicate = - try_delayed!(all_horizontal([col("*").is_not_null()]), &self.0, into); - self.filter(predicate) - }, - Some(subset) => { - let predicate = try_delayed!( - all_horizontal( - subset - .into_iter() - .map(|e| e.is_not_null()) - .collect::>(), - ), - &self.0, - into - ); - self.filter(predicate) - }, - } + self.map_private(DslFunction::DropNulls(subset)) } pub fn fill_nan(self, fill_value: Expr) -> Self { - let schema = try_delayed!(self.0.schema(), &self.0, into); - - let exprs = schema - .iter() - .filter_map(|(name, dtype)| match dtype { - DataType::Float32 | DataType::Float64 => { - Some(col(name).fill_nan(fill_value.clone()).alias(name)) - }, - _ => None, - }) - .collect(); - self.with_columns( - exprs, - ProjectionOptions { - run_parallel: false, - duplicate_check: false, - }, - ) + self.map_private(DslFunction::FillNan(fill_value)) } pub fn with_columns(self, exprs: Vec, options: ProjectionOptions) -> Self { if exprs.is_empty() { return self; } - // current schema - let schema = try_delayed!(self.0.schema(), &self.0, into); - let mut new_schema = (**schema).clone(); - let (exprs, _) = try_delayed!(prepare_projection(exprs, &schema), &self.0, into); - - let mut output_names = PlHashSet::with_capacity(exprs.len()); - - let mut arena = Arena::with_capacity(8); - for e in &exprs { - let field = e - .to_field_amortized(&schema, Context::Default, &mut arena) - .unwrap(); - - if !output_names.insert(field.name().clone()) { - let msg = format!( - "the name: '{}' passed to `LazyFrame.with_columns` is duplicate\n\n\ - It's possible that multiple expressions are returning the same default column name. \ - If this is the case, try renaming the columns with `.alias(\"new_name\")` to avoid \ - duplicate column names.", - field.name() - ); - return raise_err!(polars_err!(ComputeError: msg), &self.0, into); - } - new_schema.with_column(field.name().clone(), field.data_type().clone()); - arena.clear(); - } DslPlan::HStack { input: Arc::new(self.0), exprs, - schema: Arc::new(new_schema), options, } .into() } - pub fn add_err(self, err: PolarsError) -> Self { - DslPlan::Error { - input: Arc::new(self.0), - err: err.into(), - } - .into() - } - pub fn with_context(self, contexts: Vec) -> Self { - let mut schema = try_delayed!(self.0.schema(), &self.0, into) - .as_ref() - .as_ref() - .clone(); - - for lp in &contexts { - let other_schema = try_delayed!(lp.schema(), lp, into); - - for fld in other_schema.iter_fields() { - if schema.get(fld.name()).is_none() { - schema.with_column(fld.name, fld.dtype); - } - } - } DslPlan::ExtContext { input: Arc::new(self.0), contexts, - schema: Arc::new(schema), } .into() } /// Apply a filter pub fn filter(self, predicate: Expr) -> Self { - let schema = try_delayed!(self.0.schema(), &self.0, into); - - let predicate = if has_expr(&predicate, |e| match e { - Expr::Column(name) => is_regex_projection(name), - Expr::Wildcard - | Expr::Selector(_) - | Expr::RenameAlias { .. } - | Expr::Columns(_) - | Expr::DtypeColumn(_) - | Expr::Nth(_) => true, - _ => false, - }) { - let mut rewritten = try_delayed!( - rewrite_projections(vec![predicate], &schema, &[]), - &self.0, - into - ); - match rewritten.len() { - 1 => { - // all good - rewritten.pop().unwrap() - }, - 0 => { - let msg = "The predicate expanded to zero expressions. \ - This may for example be caused by a regex not matching column names or \ - a column dtype match not hitting any dtypes in the DataFrame"; - return raise_err!(polars_err!(ComputeError: msg), &self.0, into); - }, - _ => { - let mut expanded = String::new(); - for e in rewritten.iter().take(5) { - expanded.push_str(&format!("\t{e},\n")) - } - // pop latest comma - expanded.pop(); - if rewritten.len() > 5 { - expanded.push_str("\t...\n") - } - - let msg = if cfg!(feature = "python") { - format!("The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\ - This is ambiguous. Try to combine the predicates with the 'all' or `any' expression.") - } else { - format!("The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\ - This is ambiguous. Try to combine the predicates with the 'all_horizontal' or `any_horizontal' expression.") - }; - return raise_err!(polars_err!(ComputeError: msg), &self.0, into); - }, - } - } else { - predicate - }; - - // Check predicates refer to valid column names here, as this is not - // checked by predicate pushdown and may otherwise lead to incorrect - // optimizations. For example: - // - // (unoptimized) - // FILTER [(col("x")) == (1)] FROM - // SELECT [col("x").alias("y")] FROM - // DF ["x"]; PROJECT 1/1 COLUMNS; SELECTION: "None" - // - // (optimized) - // SELECT [col("x").alias("y")] FROM - // DF ["x"]; PROJECT 1/1 COLUMNS; SELECTION: "[(col(\"x\")) == (1)]" - // ^^^ - // "x" is incorrectly pushed down even though it didn't exist after SELECT - try_delayed!( - expr_to_leaf_column_names_iter(&predicate) - .try_for_each(|c| schema.try_index_of(&c).and(Ok(()))), - &self.0, - into - ); - DslPlan::Filter { predicate, input: Arc::new(self.0), @@ -715,73 +472,12 @@ impl DslBuilder { self, keys: Vec, aggs: E, - apply: Option>, + apply: Option<(Arc, SchemaRef)>, maintain_order: bool, #[cfg(feature = "dynamic_group_by")] dynamic_options: Option, #[cfg(feature = "dynamic_group_by")] rolling_options: Option, ) -> Self { - let current_schema = try_delayed!(self.0.schema(), &self.0, into); - let current_schema = current_schema.as_ref(); - let keys = try_delayed!( - rewrite_projections(keys, current_schema, &[]), - &self.0, - into - ); - let aggs = try_delayed!( - rewrite_projections(aggs.as_ref().to_vec(), current_schema, keys.as_ref()), - &self.0, - into - ); - - // Initialize schema from keys - let mut schema = try_delayed!( - expressions_to_schema(&keys, current_schema, Context::Default), - &self.0, - into - ); - - // Add dynamic groupby index column(s) - #[cfg(feature = "dynamic_group_by")] - { - if let Some(options) = rolling_options.as_ref() { - let name = &options.index_column; - let dtype = try_delayed!(current_schema.try_get(name), self.0, into); - schema.with_column(name.clone(), dtype.clone()); - } else if let Some(options) = dynamic_options.as_ref() { - let name = &options.index_column; - let dtype = try_delayed!(current_schema.try_get(name), self.0, into); - if options.include_boundaries { - schema.with_column("_lower_boundary".into(), dtype.clone()); - schema.with_column("_upper_boundary".into(), dtype.clone()); - } - schema.with_column(name.clone(), dtype.clone()); - } - } - let keys_index_len = schema.len(); - - // Add aggregation column(s) - let aggs_schema = try_delayed!( - expressions_to_schema(&aggs, current_schema, Context::Aggregation), - &self.0, - into - ); - schema.merge(aggs_schema); - - // Make sure aggregation columns do not contain keys or index columns - if schema.len() < (keys_index_len + aggs.len()) { - let check_names = || { - let mut names = PlHashSet::with_capacity(schema.len()); - for expr in aggs.iter().chain(keys.iter()) { - let name = expr_output_name(expr)?; - if !names.insert(name.clone()) { - polars_bail!(duplicate = name); - } - } - Ok(()) - }; - try_delayed!(check_names(), &self.0, into) - } - + let aggs = aggs.as_ref().to_vec(); let options = GroupbyOptions { #[cfg(feature = "dynamic_group_by")] dynamic: dynamic_options, @@ -792,9 +488,8 @@ impl DslBuilder { DslPlan::GroupBy { input: Arc::new(self.0), - keys: Arc::new(keys), + keys, aggs, - schema: Arc::new(schema), apply, maintain_order, options: Arc::new(options), @@ -819,8 +514,6 @@ impl DslBuilder { } pub fn sort(self, by_column: Vec, sort_options: SortMultipleOptions) -> Self { - let schema = try_delayed!(self.0.schema(), &self.0, into); - let by_column = try_delayed!(rewrite_projections(by_column, &schema, &[]), &self.0, into); DslPlan::Sort { input: Arc::new(self.0), by_column, @@ -831,40 +524,17 @@ impl DslBuilder { } pub fn explode(self, columns: Vec) -> Self { - let schema = try_delayed!(self.0.schema(), &self.0, into); - let columns = try_delayed!(rewrite_projections(columns, &schema, &[]), &self.0, into); - - // columns to string - let columns = columns - .iter() - .map(|e| { - if let Expr::Column(name) = e { - name.clone() - } else { - panic!("expected column expression") - } - }) - .collect::]>>(); - - let mut schema = (**schema).clone(); - try_delayed!(explode_schema(&mut schema, &columns), &self.0, into); - DslPlan::MapFunction { input: Arc::new(self.0), - function: FunctionNode::Explode { - columns, - schema: Arc::new(schema), - }, + function: DslFunction::Explode { columns }, } .into() } - pub fn melt(self, args: Arc) -> Self { - let schema = try_delayed!(self.0.schema(), &self.0, into); - let schema = det_melt_schema(&args, &schema); + pub fn melt(self, args: MeltArgs) -> Self { DslPlan::MapFunction { input: Arc::new(self.0), - function: FunctionNode::Melt { args, schema }, + function: DslFunction::Melt { args }, } .into() } @@ -872,10 +542,9 @@ impl DslBuilder { pub fn row_index(self, name: &str, offset: Option) -> Self { DslPlan::MapFunction { input: Arc::new(self.0), - function: FunctionNode::RowIndex { + function: DslFunction::RowIndex { name: ColumnName::from(name), offset, - schema: Default::default(), }, } .into() @@ -905,40 +574,16 @@ impl DslBuilder { right_on: Vec, options: Arc, ) -> Self { - for e in left_on.iter().chain(right_on.iter()) { - if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) { - return DslPlan::Error { - input: Arc::new(self.0), - err: polars_err!( - ComputeError: - "'alias' is not allowed in a join key, use 'with_columns' first", - ) - .into(), - } - .into(); - } - } - - let schema_left = try_delayed!(self.0.schema(), &self.0, into); - let schema_right = try_delayed!(other.schema(), &self.0, into); - - let schema = try_delayed!( - det_join_schema(&schema_left, &schema_right, &left_on, &right_on, &options), - self.0, - into - ); - DslPlan::Join { input_left: Arc::new(self.0), input_right: Arc::new(other), - schema, left_on, right_on, options, } .into() } - pub fn map_private(self, function: FunctionNode) -> Self { + pub fn map_private(self, function: DslFunction) -> Self { DslPlan::MapFunction { input: Arc::new(self.0), function, @@ -956,14 +601,14 @@ impl DslBuilder { ) -> Self { DslPlan::MapFunction { input: Arc::new(self.0), - function: FunctionNode::OpaquePython { + function: DslFunction::FunctionNode(FunctionNode::OpaquePython { function, schema, predicate_pd: optimizations.predicate_pushdown, projection_pd: optimizations.projection_pushdown, streamable: optimizations.streaming, validate_output, - }, + }), } .into() } @@ -982,14 +627,14 @@ impl DslBuilder { DslPlan::MapFunction { input: Arc::new(self.0), - function: FunctionNode::Opaque { + function: DslFunction::FunctionNode(FunctionNode::Opaque { function, schema, predicate_pd: optimizations.predicate_pushdown, projection_pd: optimizations.projection_pushdown, streamable: optimizations.streaming, fmt_str: name, - }, + }), } .into() } diff --git a/crates/polars-plan/src/logical_plan/builder_functions.rs b/crates/polars-plan/src/logical_plan/builder_functions.rs deleted file mode 100644 index 1d9ef90e2d58..000000000000 --- a/crates/polars-plan/src/logical_plan/builder_functions.rs +++ /dev/null @@ -1,56 +0,0 @@ -use polars_core::utils::try_get_supertype; - -use super::*; - -// Has functions that create schema's for both the `LogicalPlan` and the `IR` builders. - -pub(super) fn explode_schema(schema: &mut Schema, columns: &[Arc]) -> PolarsResult<()> { - // columns to string - columns.iter().try_for_each(|name| { - if let DataType::List(inner) = schema.try_get(name)? { - let inner = *inner.clone(); - schema.with_column(name.as_ref().into(), inner); - }; - Ok(()) - }) -} - -pub(super) fn det_melt_schema(args: &MeltArgs, input_schema: &Schema) -> SchemaRef { - let mut new_schema = args - .id_vars - .iter() - .map(|id| Field::new(id, input_schema.get(id).unwrap().clone())) - .collect::(); - let variable_name = args - .variable_name - .as_ref() - .cloned() - .unwrap_or_else(|| "variable".into()); - let value_name = args - .value_name - .as_ref() - .cloned() - .unwrap_or_else(|| "value".into()); - - new_schema.with_column(variable_name, DataType::String); - - // We need to determine the supertype of all value columns. - let mut supertype = DataType::Null; - - // take all columns that are not in `id_vars` as `value_var` - if args.value_vars.is_empty() { - let id_vars = PlHashSet::from_iter(&args.id_vars); - for (name, dtype) in input_schema.iter() { - if !id_vars.contains(name) { - supertype = try_get_supertype(&supertype, dtype).unwrap(); - } - } - } else { - for name in &args.value_vars { - let dtype = input_schema.get(name).unwrap(); - supertype = try_get_supertype(&supertype, dtype).unwrap(); - } - } - new_schema.with_column(value_name, supertype); - Arc::new(new_schema) -} diff --git a/crates/polars-plan/src/logical_plan/builder_ir.rs b/crates/polars-plan/src/logical_plan/builder_ir.rs index c5cf3f304874..fce36772f3f1 100644 --- a/crates/polars-plan/src/logical_plan/builder_ir.rs +++ b/crates/polars-plan/src/logical_plan/builder_ir.rs @@ -1,6 +1,5 @@ use std::borrow::Cow; -use super::builder_functions::*; use super::*; use crate::logical_plan::projection_expr::ProjectionExprs; @@ -198,14 +197,11 @@ impl<'a> IRBuilder<'a> { // call this if the schema needs to be updated pub(crate) fn explode(self, columns: Arc<[Arc]>) -> Self { - let mut schema = (*self.schema().into_owned()).clone(); - explode_schema(&mut schema, &columns).unwrap(); - let lp = IR::MapFunction { input: self.root, function: FunctionNode::Explode { columns, - schema: Arc::new(schema), + schema: Default::default(), }, }; self.add_alp(lp) @@ -301,11 +297,12 @@ impl<'a> IRBuilder<'a> { } pub fn melt(self, args: Arc) -> Self { - let schema = self.schema(); - let schema = det_melt_schema(&args, &schema); let lp = IR::MapFunction { input: self.root, - function: FunctionNode::Melt { args, schema }, + function: FunctionNode::Melt { + args, + schema: Default::default(), + }, }; self.add_alp(lp) } diff --git a/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs b/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs new file mode 100644 index 000000000000..f7d1dc270cc9 --- /dev/null +++ b/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs @@ -0,0 +1,579 @@ +use super::*; +use crate::logical_plan::expr_expansion::{is_regex_projection, rewrite_projections}; +use crate::logical_plan::projection_expr::ProjectionExprs; + +fn expand_expressions( + input: Node, + exprs: Vec, + lp_arena: &Arena, + expr_arena: &mut Arena, +) -> PolarsResult> { + let schema = lp_arena.get(input).schema(lp_arena); + let exprs = rewrite_projections(exprs, &schema, &[])?; + Ok(to_expr_irs(exprs, expr_arena)) +} + +fn empty_df() -> IR { + IR::DataFrameScan { + df: Arc::new(Default::default()), + schema: Arc::new(Default::default()), + output_schema: None, + projection: None, + selection: None, + } +} + +/// converts LogicalPlan to IR +/// it adds expressions & lps to the respective arenas as it traverses the plan +/// finally it returns the top node of the logical plan +#[recursive] +pub fn to_alp( + lp: DslPlan, + expr_arena: &mut Arena, + lp_arena: &mut Arena, +) -> PolarsResult { + let owned = Arc::unwrap_or_clone; + let v = match lp { + DslPlan::Scan { + mut file_info, + paths, + predicate, + scan_type, + file_options, + } => { + if let Some(row_index) = &file_options.row_index { + let schema = Arc::make_mut(&mut file_info.schema); + *schema = schema + .new_inserting_at_index(0, row_index.name.as_str().into(), IDX_DTYPE) + .unwrap(); + } + + IR::Scan { + file_info, + paths, + output_schema: None, + predicate: predicate.map(|expr| to_expr_ir(expr, expr_arena)), + scan_type, + file_options, + } + }, + #[cfg(feature = "python")] + DslPlan::PythonScan { options } => IR::PythonScan { + options, + predicate: None, + }, + DslPlan::Union { inputs, options } => { + let inputs = inputs + .into_iter() + .map(|lp| to_alp(lp, expr_arena, lp_arena)) + .collect::>()?; + IR::Union { inputs, options } + }, + DslPlan::HConcat { + inputs, + schema, + options, + } => { + let inputs = inputs + .into_iter() + .map(|lp| to_alp(lp, expr_arena, lp_arena)) + .collect::>()?; + IR::HConcat { + inputs, + schema, + options, + } + }, + DslPlan::Filter { input, predicate } => { + let input = to_alp(owned(input), expr_arena, lp_arena)?; + let predicate = expand_filter(predicate, input, lp_arena)?; + let predicate = to_expr_ir(predicate, expr_arena); + IR::Filter { input, predicate } + }, + DslPlan::Slice { input, offset, len } => { + let input = to_alp(owned(input), expr_arena, lp_arena)?; + IR::Slice { input, offset, len } + }, + DslPlan::DataFrameScan { + df, + schema, + output_schema, + projection, + selection, + } => IR::DataFrameScan { + df, + schema, + output_schema, + projection, + selection: selection.map(|expr| to_expr_ir(expr, expr_arena)), + }, + DslPlan::Select { + expr, + input, + options, + } => { + let input = to_alp(owned(input), expr_arena, lp_arena)?; + let schema = lp_arena.get(input).schema(lp_arena); + let (exprs, schema) = prepare_projection(expr, &schema)?; + + if exprs.is_empty() { + lp_arena.replace(input, empty_df()); + } + + let schema = Arc::new(schema); + let eirs = to_expr_irs(exprs, expr_arena); + let expr = eirs.into(); + IR::Select { + expr, + input, + schema, + options, + } + }, + DslPlan::Sort { + input, + by_column, + slice, + sort_options, + } => { + let input = to_alp(owned(input), expr_arena, lp_arena)?; + let by_column = expand_expressions(input, by_column, lp_arena, expr_arena)?; + IR::Sort { + input, + by_column, + slice, + sort_options, + } + }, + DslPlan::Cache { + input, + id, + cache_hits, + } => { + let input = to_alp(owned(input), expr_arena, lp_arena)?; + IR::Cache { + input, + id, + cache_hits, + } + }, + DslPlan::GroupBy { + input, + keys, + aggs, + apply, + maintain_order, + options, + } => { + let input = to_alp(owned(input), expr_arena, lp_arena)?; + + let (keys, aggs, schema) = + resolve_group_by(input, keys, aggs, &options, lp_arena, expr_arena)?; + + let (apply, schema) = if let Some((apply, schema)) = apply { + (Some(apply), schema) + } else { + (None, schema) + }; + + IR::GroupBy { + input, + keys, + aggs, + schema, + apply, + maintain_order, + options, + } + }, + DslPlan::Join { + input_left, + input_right, + left_on, + right_on, + options, + } => { + for e in left_on.iter().chain(right_on.iter()) { + if has_expr(e, |e| matches!(e, Expr::Alias(_, _))) { + polars_bail!( + ComputeError: + "'alias' is not allowed in a join key, use 'with_columns' first", + ) + } + } + + let input_left = to_alp(owned(input_left), expr_arena, lp_arena)?; + let input_right = to_alp(owned(input_right), expr_arena, lp_arena)?; + + let schema_left = lp_arena.get(input_left).schema(lp_arena); + let schema_right = lp_arena.get(input_right).schema(lp_arena); + + let schema = + det_join_schema(&schema_left, &schema_right, &left_on, &right_on, &options)?; + + let left_on = to_expr_irs_ignore_alias(left_on, expr_arena); + let right_on = to_expr_irs_ignore_alias(right_on, expr_arena); + + IR::Join { + input_left, + input_right, + schema, + left_on, + right_on, + options, + } + }, + DslPlan::HStack { + input, + exprs, + options, + } => { + let input = to_alp(owned(input), expr_arena, lp_arena)?; + let (exprs, schema) = resolve_with_columns(exprs, input, lp_arena, expr_arena)?; + IR::HStack { + input, + exprs, + schema, + options, + } + }, + DslPlan::Distinct { input, options } => { + let input = to_alp(owned(input), expr_arena, lp_arena)?; + IR::Distinct { input, options } + }, + DslPlan::MapFunction { input, function } => { + let input = to_alp(owned(input), expr_arena, lp_arena)?; + let input_schema = lp_arena.get(input).schema(lp_arena); + + match function { + DslFunction::FillNan(fill_value) => { + let exprs = input_schema + .iter() + .filter_map(|(name, dtype)| match dtype { + DataType::Float32 | DataType::Float64 => { + Some(col(name).fill_nan(fill_value.clone()).alias(name)) + }, + _ => None, + }) + .collect::>(); + + let (exprs, schema) = resolve_with_columns(exprs, input, lp_arena, expr_arena)?; + IR::HStack { + input, + exprs, + schema, + options: ProjectionOptions { + duplicate_check: false, + ..Default::default() + }, + } + }, + DslFunction::DropNulls(subset) => { + let predicate = match subset { + None => all_horizontal([col("*").is_not_null()]), + Some(subset) => all_horizontal( + subset + .into_iter() + .map(|e| e.is_not_null()) + .collect::>(), + ), + }?; + let predicate = rewrite_projections(vec![predicate], &input_schema, &[])? + .pop() + .unwrap(); + let predicate = to_expr_ir(predicate, expr_arena); + IR::Filter { predicate, input } + }, + DslFunction::Drop(to_drop) => { + let mut output_schema = + Schema::with_capacity(input_schema.len().saturating_sub(to_drop.len())); + + for (col_name, dtype) in input_schema.iter() { + if !to_drop.contains(col_name.as_str()) { + output_schema.with_column(col_name.clone(), dtype.clone()); + } + } + + if output_schema.is_empty() { + lp_arena.replace(input, empty_df()); + } + + IR::SimpleProjection { + input, + columns: Arc::new(output_schema), + duplicate_check: false, + } + }, + DslFunction::Stats(sf) => { + let exprs = match sf { + StatsFunction::Var { ddof } => stats_helper( + |dt| dt.is_numeric() || dt.is_bool(), + |name| col(name).var(ddof), + &input_schema, + ), + StatsFunction::Std { ddof } => stats_helper( + |dt| dt.is_numeric() || dt.is_bool(), + |name| col(name).std(ddof), + &input_schema, + ), + StatsFunction::Quantile { quantile, interpol } => stats_helper( + |dt| dt.is_numeric(), + |name| col(name).quantile(quantile.clone(), interpol), + &input_schema, + ), + StatsFunction::Mean => stats_helper( + |dt| { + dt.is_numeric() + || matches!( + dt, + DataType::Boolean + | DataType::Duration(_) + | DataType::Datetime(_, _) + | DataType::Time + ) + }, + |name| col(name).mean(), + &input_schema, + ), + StatsFunction::Sum => stats_helper( + |dt| { + dt.is_numeric() + || dt.is_decimal() + || matches!(dt, DataType::Boolean | DataType::Duration(_)) + }, + |name| col(name).sum(), + &input_schema, + ), + StatsFunction::Min => { + stats_helper(|dt| dt.is_ord(), |name| col(name).min(), &input_schema) + }, + StatsFunction::Max => { + stats_helper(|dt| dt.is_ord(), |name| col(name).max(), &input_schema) + }, + StatsFunction::Median => stats_helper( + |dt| { + dt.is_numeric() + || matches!( + dt, + DataType::Boolean + | DataType::Duration(_) + | DataType::Datetime(_, _) + | DataType::Time + ) + }, + |name| col(name).median(), + &input_schema, + ), + }; + let schema = Arc::new(expressions_to_schema( + &exprs, + &input_schema, + Context::Default, + )?); + let eirs = to_expr_irs(exprs, expr_arena); + let expr = eirs.into(); + IR::Select { + input, + expr, + schema, + options: ProjectionOptions { + duplicate_check: false, + ..Default::default() + }, + } + }, + _ => { + let function = function.into_function_node(&input_schema)?; + IR::MapFunction { input, function } + }, + } + }, + DslPlan::ExtContext { input, contexts } => { + let input = to_alp(owned(input), expr_arena, lp_arena)?; + let contexts = contexts + .into_iter() + .map(|lp| to_alp(lp, expr_arena, lp_arena)) + .collect::>>()?; + + let mut schema = (**lp_arena.get(input).schema(lp_arena)).clone(); + for input in &contexts { + let other_schema = lp_arena.get(*input).schema(lp_arena); + for fld in other_schema.iter_fields() { + if schema.get(fld.name()).is_none() { + schema.with_column(fld.name, fld.dtype); + } + } + } + + IR::ExtContext { + input, + contexts, + schema: Arc::new(schema), + } + }, + DslPlan::Sink { input, payload } => { + let input = to_alp(owned(input), expr_arena, lp_arena)?; + IR::Sink { input, payload } + }, + }; + Ok(lp_arena.add(v)) +} + +fn expand_filter(predicate: Expr, input: Node, lp_arena: &Arena) -> PolarsResult { + let schema = lp_arena.get(input).schema(lp_arena); + let predicate = if has_expr(&predicate, |e| match e { + Expr::Column(name) => is_regex_projection(name), + Expr::Wildcard + | Expr::Selector(_) + | Expr::RenameAlias { .. } + | Expr::Columns(_) + | Expr::DtypeColumn(_) + | Expr::Nth(_) => true, + _ => false, + }) { + let mut rewritten = rewrite_projections(vec![predicate], &schema, &[])?; + match rewritten.len() { + 1 => { + // all good + rewritten.pop().unwrap() + }, + 0 => { + let msg = "The predicate expanded to zero expressions. \ + This may for example be caused by a regex not matching column names or \ + a column dtype match not hitting any dtypes in the DataFrame"; + polars_bail!(ComputeError: msg); + }, + _ => { + let mut expanded = String::new(); + for e in rewritten.iter().take(5) { + expanded.push_str(&format!("\t{e},\n")) + } + // pop latest comma + expanded.pop(); + if rewritten.len() > 5 { + expanded.push_str("\t...\n") + } + + let msg = if cfg!(feature = "python") { + format!("The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\ + This is ambiguous. Try to combine the predicates with the 'all' or `any' expression.") + } else { + format!("The predicate passed to 'LazyFrame.filter' expanded to multiple expressions: \n\n{expanded}\n\ + This is ambiguous. Try to combine the predicates with the 'all_horizontal' or `any_horizontal' expression.") + }; + polars_bail!(ComputeError: msg) + }, + } + } else { + predicate + }; + expr_to_leaf_column_names_iter(&predicate) + .try_for_each(|c| schema.try_index_of(&c).and(Ok(())))?; + + Ok(predicate) +} + +fn resolve_with_columns( + exprs: Vec, + input: Node, + lp_arena: &Arena, + expr_arena: &mut Arena, +) -> PolarsResult<(ProjectionExprs, SchemaRef)> { + let schema = lp_arena.get(input).schema(lp_arena); + let mut new_schema = (**schema).clone(); + let (exprs, _) = prepare_projection(exprs, &schema)?; + let mut output_names = PlHashSet::with_capacity(exprs.len()); + + let mut arena = Arena::with_capacity(8); + for e in &exprs { + let field = e + .to_field_amortized(&schema, Context::Default, &mut arena) + .unwrap(); + + if !output_names.insert(field.name().clone()) { + let msg = format!( + "the name: '{}' passed to `LazyFrame.with_columns` is duplicate\n\n\ + It's possible that multiple expressions are returning the same default column name. \ + If this is the case, try renaming the columns with `.alias(\"new_name\")` to avoid \ + duplicate column names.", + field.name() + ); + polars_bail!(ComputeError: msg) + } + new_schema.with_column(field.name().clone(), field.data_type().clone()); + arena.clear(); + } + + let eirs = to_expr_irs(exprs, expr_arena); + let exprs = eirs.into(); + Ok((exprs, Arc::new(new_schema))) +} + +fn resolve_group_by( + input: Node, + keys: Vec, + aggs: Vec, + _options: &GroupbyOptions, + lp_arena: &Arena, + expr_arena: &mut Arena, +) -> PolarsResult<(Vec, Vec, SchemaRef)> { + let current_schema = lp_arena.get(input).schema(lp_arena); + let current_schema = current_schema.as_ref(); + let keys = rewrite_projections(keys, current_schema, &[])?; + let aggs = rewrite_projections(aggs, current_schema, &keys)?; + + // Initialize schema from keys + let mut schema = expressions_to_schema(&keys, current_schema, Context::Default)?; + + // Add dynamic groupby index column(s) + #[cfg(feature = "dynamic_group_by")] + { + if let Some(options) = _options.rolling.as_ref() { + let name = &options.index_column; + let dtype = current_schema.try_get(name)?; + schema.with_column(name.clone(), dtype.clone()); + } else if let Some(options) = _options.dynamic.as_ref() { + let name = &options.index_column; + let dtype = current_schema.try_get(name)?; + if options.include_boundaries { + schema.with_column("_lower_boundary".into(), dtype.clone()); + schema.with_column("_upper_boundary".into(), dtype.clone()); + } + schema.with_column(name.clone(), dtype.clone()); + } + } + let keys_index_len = schema.len(); + + // Add aggregation column(s) + let aggs_schema = expressions_to_schema(&aggs, current_schema, Context::Aggregation)?; + schema.merge(aggs_schema); + + // Make sure aggregation columns do not contain keys or index columns + if schema.len() < (keys_index_len + aggs.len()) { + let mut names = PlHashSet::with_capacity(schema.len()); + for expr in aggs.iter().chain(keys.iter()) { + let name = expr_output_name(expr)?; + polars_ensure!(names.insert(name.clone()), duplicate = name) + } + } + let aggs = to_expr_irs(aggs, expr_arena); + let keys = keys.convert(|e| to_expr_ir(e.clone(), expr_arena)); + + Ok((keys, aggs, Arc::new(schema))) +} +fn stats_helper(condition: F, expr: E, schema: &Schema) -> Vec +where + F: Fn(&DataType) -> bool, + E: Fn(&str) -> Expr, +{ + schema + .iter() + .map(|(name, dt)| { + if condition(dt) { + expr(name) + } else { + lit(NULL).cast(dt.clone()).alias(name) + } + }) + .collect() +} diff --git a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs b/crates/polars-plan/src/logical_plan/conversion/expr_to_expr_ir.rs similarity index 60% rename from crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs rename to crates/polars-plan/src/logical_plan/conversion/expr_to_expr_ir.rs index 2ca49329921f..15f076f8064f 100644 --- a/crates/polars-plan/src/logical_plan/conversion/dsl_to_ir.rs +++ b/crates/polars-plan/src/logical_plan/conversion/expr_to_expr_ir.rs @@ -6,7 +6,7 @@ pub fn to_expr_ir(expr: Expr, arena: &mut Arena) -> ExprIR { ExprIR::new(node, state.output_name) } -fn to_expr_irs(input: Vec, arena: &mut Arena) -> Vec { +pub(super) fn to_expr_irs(input: Vec, arena: &mut Arena) -> Vec { input.convert_owned(|e| to_expr_ir(e, arena)) } @@ -17,7 +17,7 @@ pub fn to_expr_ir_ignore_alias(expr: Expr, arena: &mut Arena) -> ExprIR { ExprIR::new(node, state.output_name) } -fn to_expr_irs_ignore_alias(input: Vec, arena: &mut Arena) -> Vec { +pub(super) fn to_expr_irs_ignore_alias(input: Vec, arena: &mut Arena) -> Vec { input.convert_owned(|e| to_expr_ir_ignore_alias(e, arena)) } @@ -285,222 +285,3 @@ fn to_aexpr_impl(expr: Expr, arena: &mut Arena, state: &mut ConversionSta }; arena.add(v) } - -/// converts LogicalPlan to IR -/// it adds expressions & lps to the respective arenas as it traverses the plan -/// finally it returns the top node of the logical plan -#[recursive] -pub fn to_alp( - lp: DslPlan, - expr_arena: &mut Arena, - lp_arena: &mut Arena, -) -> PolarsResult { - let owned = Arc::unwrap_or_clone; - let v = match lp { - DslPlan::Scan { - file_info, - paths, - predicate, - scan_type, - file_options: options, - } => IR::Scan { - file_info, - paths, - output_schema: None, - predicate: predicate.map(|expr| to_expr_ir(expr, expr_arena)), - scan_type, - file_options: options, - }, - #[cfg(feature = "python")] - DslPlan::PythonScan { options } => IR::PythonScan { - options, - predicate: None, - }, - DslPlan::Union { inputs, options } => { - let inputs = inputs - .into_iter() - .map(|lp| to_alp(lp, expr_arena, lp_arena)) - .collect::>()?; - IR::Union { inputs, options } - }, - DslPlan::HConcat { - inputs, - schema, - options, - } => { - let inputs = inputs - .into_iter() - .map(|lp| to_alp(lp, expr_arena, lp_arena)) - .collect::>()?; - IR::HConcat { - inputs, - schema, - options, - } - }, - DslPlan::Filter { input, predicate } => { - let i = to_alp(owned(input), expr_arena, lp_arena)?; - let p = to_expr_ir(predicate, expr_arena); - IR::Filter { - input: i, - predicate: p, - } - }, - DslPlan::Slice { input, offset, len } => { - let input = to_alp(owned(input), expr_arena, lp_arena)?; - IR::Slice { input, offset, len } - }, - DslPlan::DataFrameScan { - df, - schema, - output_schema, - projection, - selection, - } => IR::DataFrameScan { - df, - schema, - output_schema, - projection, - selection: selection.map(|expr| to_expr_ir(expr, expr_arena)), - }, - DslPlan::Select { - expr, - input, - schema, - options, - } => { - let eirs = to_expr_irs(expr, expr_arena); - let expr = eirs.into(); - let i = to_alp(owned(input), expr_arena, lp_arena)?; - IR::Select { - expr, - input: i, - schema, - options, - } - }, - DslPlan::Sort { - input, - by_column, - slice, - sort_options, - } => { - let input = to_alp(owned(input), expr_arena, lp_arena)?; - let by_column = to_expr_irs(by_column, expr_arena); - IR::Sort { - input, - by_column, - slice, - sort_options, - } - }, - DslPlan::Cache { - input, - id, - cache_hits, - } => { - let input = to_alp(owned(input), expr_arena, lp_arena)?; - IR::Cache { - input, - id, - cache_hits, - } - }, - DslPlan::GroupBy { - input, - keys, - aggs, - schema, - apply, - maintain_order, - options, - } => { - let i = to_alp(owned(input), expr_arena, lp_arena)?; - let aggs = to_expr_irs(aggs, expr_arena); - let keys = keys.convert(|e| to_expr_ir(e.clone(), expr_arena)); - - IR::GroupBy { - input: i, - keys, - aggs, - schema, - apply, - maintain_order, - options, - } - }, - DslPlan::Join { - input_left, - input_right, - schema, - left_on, - right_on, - options, - } => { - let input_left = to_alp(owned(input_left), expr_arena, lp_arena)?; - let input_right = to_alp(owned(input_right), expr_arena, lp_arena)?; - - let left_on = to_expr_irs_ignore_alias(left_on, expr_arena); - let right_on = to_expr_irs_ignore_alias(right_on, expr_arena); - - IR::Join { - input_left, - input_right, - schema, - left_on, - right_on, - options, - } - }, - DslPlan::HStack { - input, - exprs, - schema, - options, - } => { - let eirs = to_expr_irs(exprs, expr_arena); - let exprs = eirs.into(); - let input = to_alp(owned(input), expr_arena, lp_arena)?; - IR::HStack { - input, - exprs, - schema, - options, - } - }, - DslPlan::Distinct { input, options } => { - let input = to_alp(owned(input), expr_arena, lp_arena)?; - IR::Distinct { input, options } - }, - DslPlan::MapFunction { input, function } => { - let input = to_alp(owned(input), expr_arena, lp_arena)?; - IR::MapFunction { input, function } - }, - DslPlan::Error { err, .. } => { - // We just take the error. The LogicalPlan should not be used anymore once this - // is taken. - return Err(err.take()); - }, - DslPlan::ExtContext { - input, - contexts, - schema, - } => { - let input = to_alp(owned(input), expr_arena, lp_arena)?; - let contexts = contexts - .into_iter() - .map(|lp| to_alp(lp, expr_arena, lp_arena)) - .collect::>()?; - IR::ExtContext { - input, - contexts, - schema, - } - }, - DslPlan::Sink { input, payload } => { - let input = to_alp(owned(input), expr_arena, lp_arena)?; - IR::Sink { input, payload } - }, - }; - Ok(lp_arena.add(v)) -} diff --git a/crates/polars-plan/src/logical_plan/conversion/mod.rs b/crates/polars-plan/src/logical_plan/conversion/mod.rs index dfec3683a60b..7da56e2a9c8a 100644 --- a/crates/polars-plan/src/logical_plan/conversion/mod.rs +++ b/crates/polars-plan/src/logical_plan/conversion/mod.rs @@ -1,9 +1,11 @@ -mod dsl_to_ir; +mod dsl_plan_to_ir_plan; +mod expr_to_expr_ir; mod ir_to_dsl; use std::borrow::Cow; -pub use dsl_to_ir::*; +pub use dsl_plan_to_ir_plan::*; +pub use expr_to_expr_ir::*; pub use ir_to_dsl::*; use polars_core::prelude::*; use polars_utils::vec::ConvertVec; @@ -102,7 +104,7 @@ impl IR { IR::Select { expr, input, - schema, + schema: _, options, } => { let i = convert_to_lp(input, lp_arena); @@ -110,7 +112,6 @@ impl IR { DslPlan::Select { expr, input: Arc::new(i), - schema, options, } }, @@ -123,7 +124,6 @@ impl IR { DslPlan::Select { expr, input: Arc::new(input), - schema: columns.clone(), options: Default::default(), } }, @@ -164,15 +164,14 @@ impl IR { options: dynamic_options, } => { let i = convert_to_lp(input, lp_arena); - let keys = Arc::new(expr_irs_to_exprs(keys, expr_arena)); + let keys = expr_irs_to_exprs(keys, expr_arena); let aggs = expr_irs_to_exprs(aggs, expr_arena); DslPlan::GroupBy { input: Arc::new(i), keys, aggs, - schema, - apply, + apply: apply.map(|apply| (apply, schema)), maintain_order, options: dynamic_options, } @@ -180,7 +179,7 @@ impl IR { IR::Join { input_left, input_right, - schema, + schema: _, left_on, right_on, options, @@ -194,7 +193,6 @@ impl IR { DslPlan::Join { input_left: Arc::new(i_l), input_right: Arc::new(i_r), - schema, left_on, right_on, options, @@ -203,8 +201,8 @@ impl IR { IR::HStack { input, exprs, - schema, options, + .. } => { let i = convert_to_lp(input, lp_arena); let exprs = expr_irs_to_exprs(exprs.all_exprs(), expr_arena); @@ -212,7 +210,6 @@ impl IR { DslPlan::HStack { input: Arc::new(i), exprs, - schema, options, } }, @@ -225,23 +222,20 @@ impl IR { }, IR::MapFunction { input, function } => { let input = Arc::new(convert_to_lp(input, lp_arena)); - DslPlan::MapFunction { input, function } + DslPlan::MapFunction { + input, + function: function.into(), + } }, IR::ExtContext { - input, - contexts, - schema, + input, contexts, .. } => { let input = Arc::new(convert_to_lp(input, lp_arena)); let contexts = contexts .into_iter() .map(|node| convert_to_lp(node, lp_arena)) .collect(); - DslPlan::ExtContext { - input, - contexts, - schema, - } + DslPlan::ExtContext { input, contexts } }, IR::Sink { input, payload } => { let input = Arc::new(convert_to_lp(input, lp_arena)); diff --git a/crates/polars-plan/src/logical_plan/projection.rs b/crates/polars-plan/src/logical_plan/expr_expansion.rs similarity index 100% rename from crates/polars-plan/src/logical_plan/projection.rs rename to crates/polars-plan/src/logical_plan/expr_expansion.rs diff --git a/crates/polars-plan/src/logical_plan/format.rs b/crates/polars-plan/src/logical_plan/format.rs index f63609661de4..7f343f471458 100644 --- a/crates/polars-plan/src/logical_plan/format.rs +++ b/crates/polars-plan/src/logical_plan/format.rs @@ -228,7 +228,6 @@ impl DslPlan { write!(f, "{:indent$}{function_fmt}", "")?; input._format(f, sub_indent) }, - Error { err, .. } => write!(f, "{err:?}"), ExtContext { input, .. } => { write!(f, "{:indent$}EXTERNAL_CONTEXT", "")?; input._format(f, sub_indent) diff --git a/crates/polars-plan/src/logical_plan/functions/dsl.rs b/crates/polars-plan/src/logical_plan/functions/dsl.rs new file mode 100644 index 000000000000..4f362075cdd5 --- /dev/null +++ b/crates/polars-plan/src/logical_plan/functions/dsl.rs @@ -0,0 +1,159 @@ +use super::*; +use crate::logical_plan::expr_expansion::rewrite_projections; + +// Except for Opaque functions, this only has the DSL name of the function. +#[derive(Clone)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum DslFunction { + FunctionNode(FunctionNode), + Explode { + columns: Vec, + }, + Melt { + args: MeltArgs, + }, + RowIndex { + name: Arc, + offset: Option, + }, + Rename { + existing: Arc<[SmartString]>, + new: Arc<[SmartString]>, + }, + Stats(StatsFunction), + /// FillValue + FillNan(Expr), + DropNulls(Option>), + Drop(PlHashSet), +} + +#[derive(Clone)] +#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] +pub enum StatsFunction { + Var { + ddof: u8, + }, + Std { + ddof: u8, + }, + Quantile { + quantile: Expr, + interpol: QuantileInterpolOptions, + }, + Median, + Mean, + Sum, + Min, + Max, +} + +impl DslFunction { + pub(crate) fn into_function_node(self, input_schema: &Schema) -> PolarsResult { + let function = match self { + DslFunction::Explode { columns } => { + let columns = rewrite_projections(columns, input_schema, &[])?; + // columns to string + let columns = columns + .iter() + .map(|e| { + if let Expr::Column(name) = e { + Ok(name.clone()) + } else { + polars_bail!(InvalidOperation: "expected column expression") + } + }) + .collect::]>>>()?; + FunctionNode::Explode { + columns, + schema: Default::default(), + } + }, + DslFunction::Melt { args } => FunctionNode::Melt { + args: Arc::new(args), + schema: Default::default(), + }, + DslFunction::FunctionNode(func) => func, + DslFunction::RowIndex { name, offset } => FunctionNode::RowIndex { + name, + offset, + schema: Default::default(), + }, + DslFunction::Rename { existing, new } => { + let swapping = new.iter().any(|name| input_schema.get(name).is_some()); + + // Check if the name exists. + for name in existing.iter() { + let _ = input_schema.try_get(name)?; + } + + FunctionNode::Rename { + existing, + new, + swapping, + schema: Default::default(), + } + }, + DslFunction::Stats(_) + | DslFunction::FillNan(_) + | DslFunction::DropNulls(_) + | DslFunction::Drop(_) => { + // We should not reach this. + panic!("impl error") + }, + }; + Ok(function) + } +} + +impl Debug for DslFunction { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{self}") + } +} + +impl Display for DslFunction { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + use DslFunction::*; + match self { + FunctionNode(inner) => write!(f, "{inner}"), + Explode { .. } => write!(f, "EXPLODE"), + Melt { .. } => write!(f, "MELT"), + RowIndex { .. } => write!(f, "WITH ROW INDEX"), + Stats(_) => write!(f, "STATS"), + FillNan(_) => write!(f, "FILL NAN"), + DropNulls(_) => write!(f, "DROP NULLS"), + Drop(_) => write!(f, "DROP"), + // DropNulls { subset } => { + // write!(f, "DROP_NULLS by: ")?; + // let subset = subset.as_ref(); + // fmt_column_delimited(f, subset, "[", "]") + // }, + // Rechunk => write!(f, "RECHUNK"), + // Count { .. } => write!(f, "FAST COUNT(*)"), + // Unnest { columns } => { + // write!(f, "UNNEST by:")?; + // let columns = columns.as_ref(); + // fmt_column_delimited(f, columns, "[", "]") + // }, + // #[cfg(feature = "merge_sorted")] + // MergeSorted { .. } => write!(f, "MERGE SORTED"), + // Pipeline { original, .. } => { + // if let Some(original) = original { + // writeln!(f, "--- STREAMING")?; + // write!(f, "{:?}", original.as_ref())?; + // let indent = 2; + // writeln!(f, "{:indent$}--- END STREAMING", "") + // } else { + // writeln!(f, "STREAMING") + // } + // }, + Rename { .. } => write!(f, "RENAME"), + } + } +} + +impl From for DslFunction { + fn from(value: FunctionNode) -> Self { + DslFunction::FunctionNode(value) + } +} diff --git a/crates/polars-plan/src/logical_plan/functions/mod.rs b/crates/polars-plan/src/logical_plan/functions/mod.rs index 68b8943b42ff..189c7ed46bd1 100644 --- a/crates/polars-plan/src/logical_plan/functions/mod.rs +++ b/crates/polars-plan/src/logical_plan/functions/mod.rs @@ -1,4 +1,5 @@ mod count; +mod dsl; #[cfg(feature = "merge_sorted")] mod merge_sorted; #[cfg(feature = "python")] @@ -9,12 +10,11 @@ mod schema; use std::borrow::Cow; use std::fmt::{Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; -use std::ops::Deref; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; +pub use dsl::*; use polars_core::prelude::*; -use schema::CachedSchema; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; use smartstring::alias::String as SmartString; @@ -84,14 +84,18 @@ pub enum FunctionNode { new: Arc<[SmartString]>, // A column name gets swapped with an existing column swapping: bool, + #[cfg_attr(feature = "serde", serde(skip))] + schema: CachedSchema, }, Explode { columns: Arc<[Arc]>, - schema: SchemaRef, + #[cfg_attr(feature = "serde", serde(skip))] + schema: CachedSchema, }, Melt { args: Arc, - schema: SchemaRef, + #[cfg_attr(feature = "serde", serde(skip))] + schema: CachedSchema, }, RowIndex { name: Arc, @@ -159,6 +163,7 @@ impl Hash for FunctionNode { existing, new, swapping: _, + .. } => { existing.hash(state); new.hash(state); diff --git a/crates/polars-plan/src/logical_plan/functions/rename.rs b/crates/polars-plan/src/logical_plan/functions/rename.rs index 6426c24fae3b..fea6c2cc635c 100644 --- a/crates/polars-plan/src/logical_plan/functions/rename.rs +++ b/crates/polars-plan/src/logical_plan/functions/rename.rs @@ -21,20 +21,3 @@ pub(super) fn rename_impl( let columns = unsafe { std::mem::take(df.get_columns_mut()) }; DataFrame::new(columns) } - -pub(super) fn rename_schema<'a>( - input_schema: &'a SchemaRef, - existing: &[SmartString], - new: &[SmartString], -) -> PolarsResult> { - let mut new_schema = input_schema.iter_fields().collect::>(); - - for (old, new) in existing.iter().zip(new.iter()) { - // The column might be removed due to projection pushdown - // so we only update if we can find it. - if let Some((idx, _, _)) = input_schema.get_full(old) { - new_schema[idx].name = new.as_str().into(); - } - } - Ok(Cow::Owned(Arc::new(new_schema.into_iter().collect()))) -} diff --git a/crates/polars-plan/src/logical_plan/functions/schema.rs b/crates/polars-plan/src/logical_plan/functions/schema.rs index 9bcdcb0d8643..ba6e7acdfc4c 100644 --- a/crates/polars-plan/src/logical_plan/functions/schema.rs +++ b/crates/polars-plan/src/logical_plan/functions/schema.rs @@ -1,3 +1,5 @@ +use polars_core::utils::try_get_supertype; + use super::*; impl FunctionNode { @@ -6,7 +8,10 @@ impl FunctionNode { // We will likely add more branches later #[allow(clippy::single_match)] match self { - RowIndex { schema, .. } => { + RowIndex { schema, .. } + | Explode { schema, .. } + | Rename { schema, .. } + | Melt { schema, .. } => { let mut guard = schema.lock().unwrap(); *guard = None; }, @@ -84,11 +89,17 @@ impl FunctionNode { }, #[cfg(feature = "merge_sorted")] MergeSorted { .. } => Ok(Cow::Borrowed(input_schema)), - Rename { existing, new, .. } => rename::rename_schema(input_schema, existing, new), + Rename { + existing, + new, + schema, + .. + } => rename_schema(input_schema, existing, new, schema), RowIndex { schema, name, .. } => { Ok(Cow::Owned(row_index_schema(schema, input_schema, name))) }, - Explode { schema, .. } | Melt { schema, .. } => Ok(Cow::Owned(schema.clone())), + Explode { schema, columns } => explode_schema(schema, input_schema, columns), + Melt { schema, args } => melt_schema(args, schema, input_schema), } } } @@ -109,28 +120,101 @@ fn row_index_schema( schema_ref } -// We don't use an `Arc` because caches should live in different query plans. -// For that reason we have a specialized deep clone. -#[derive(Default)] -pub struct CachedSchema(Mutex>); - -impl AsRef>> for CachedSchema { - fn as_ref(&self) -> &Mutex> { - &self.0 +fn explode_schema<'a>( + cached_schema: &CachedSchema, + schema: &'a Schema, + columns: &[Arc], +) -> PolarsResult> { + let mut guard = cached_schema.lock().unwrap(); + if let Some(schema) = &*guard { + return Ok(Cow::Owned(schema.clone())); } + let mut schema = schema.clone(); + + // columns to string + columns.iter().try_for_each(|name| { + if let DataType::List(inner) = schema.try_get(name)? { + let inner = *inner.clone(); + schema.with_column(name.as_ref().into(), inner); + }; + PolarsResult::Ok(()) + })?; + let schema = Arc::new(schema); + *guard = Some(schema.clone()); + Ok(Cow::Owned(schema)) } -impl Deref for CachedSchema { - type Target = Mutex>; +fn melt_schema<'a>( + args: &MeltArgs, + cached_schema: &CachedSchema, + input_schema: &'a Schema, +) -> PolarsResult> { + let mut guard = cached_schema.lock().unwrap(); + if let Some(schema) = &*guard { + return Ok(Cow::Owned(schema.clone())); + } + + let mut new_schema = args + .id_vars + .iter() + .map(|id| Field::new(id, input_schema.get(id).unwrap().clone())) + .collect::(); + let variable_name = args + .variable_name + .as_ref() + .cloned() + .unwrap_or_else(|| "variable".into()); + let value_name = args + .value_name + .as_ref() + .cloned() + .unwrap_or_else(|| "value".into()); - fn deref(&self) -> &Self::Target { - &self.0 + new_schema.with_column(variable_name, DataType::String); + + // We need to determine the supertype of all value columns. + let mut supertype = DataType::Null; + + // take all columns that are not in `id_vars` as `value_var` + if args.value_vars.is_empty() { + let id_vars = PlHashSet::from_iter(&args.id_vars); + for (name, dtype) in input_schema.iter() { + if !id_vars.contains(name) { + supertype = try_get_supertype(&supertype, dtype).unwrap(); + } + } + } else { + for name in &args.value_vars { + let dtype = input_schema.get(name).unwrap(); + supertype = try_get_supertype(&supertype, dtype).unwrap(); + } } + new_schema.with_column(value_name, supertype); + let schema = Arc::new(new_schema); + *guard = Some(schema.clone()); + Ok(Cow::Owned(schema)) } -impl Clone for CachedSchema { - fn clone(&self) -> Self { - let inner = self.0.lock().unwrap(); - Self(Mutex::new(inner.clone())) +fn rename_schema<'a>( + input_schema: &'a SchemaRef, + existing: &[SmartString], + new: &[SmartString], + cached_schema: &CachedSchema, +) -> PolarsResult> { + let mut guard = cached_schema.lock().unwrap(); + if let Some(schema) = &*guard { + return Ok(Cow::Owned(schema.clone())); + } + let mut new_schema = input_schema.iter_fields().collect::>(); + + for (old, new) in existing.iter().zip(new.iter()) { + // The column might be removed due to projection pushdown + // so we only update if we can find it. + if let Some((idx, _, _)) = input_schema.get_full(old) { + new_schema[idx].name = new.as_str().into(); + } } + let schema: SchemaRef = Arc::new(new_schema.into_iter().collect()); + *guard = Some(schema.clone()); + Ok(Cow::Owned(schema)) } diff --git a/crates/polars-plan/src/logical_plan/mod.rs b/crates/polars-plan/src/logical_plan/mod.rs index b56584cef834..84ce1b812e4f 100644 --- a/crates/polars-plan/src/logical_plan/mod.rs +++ b/crates/polars-plan/src/logical_plan/mod.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use polars_core::prelude::*; use recursive::recursive; @@ -15,11 +15,11 @@ pub(crate) mod anonymous_scan; mod apply; mod builder_dsl; -pub mod builder_functions; mod builder_ir; pub(crate) mod conversion; #[cfg(feature = "debugging")] pub(crate) mod debug; +pub(crate) mod expr_expansion; pub mod expr_ir; mod file_scan; mod format; @@ -29,7 +29,6 @@ pub(crate) mod iterator; mod lit; pub(crate) mod optimizer; pub(crate) mod options; -pub(crate) mod projection; mod projection_expr; #[cfg(feature = "python")] mod pyarrow; @@ -67,68 +66,6 @@ pub enum Context { Default, } -#[derive(Debug)] -pub(crate) struct ErrorStateUnsync { - n_times: usize, - err: PolarsError, -} - -#[derive(Clone)] -pub struct ErrorState(pub(crate) Arc>); - -impl std::fmt::Debug for ErrorState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let this = self.0.lock().unwrap(); - // Skip over the Arc> and just print the fields we care - // about. Technically this is misleading, but the insides of ErrorState are not - // public, so this only affects authors of polars, not users (and the odds that - // this affects authors is slim) - f.debug_struct("ErrorState") - .field("n_times", &this.n_times) - .field("err", &this.err) - .finish() - } -} - -impl From for ErrorState { - fn from(err: PolarsError) -> Self { - Self(Arc::new(Mutex::new(ErrorStateUnsync { n_times: 0, err }))) - } -} - -impl ErrorState { - fn take(&self) -> PolarsError { - let mut this = self.0.lock().unwrap(); - - let ret_err = if this.n_times == 0 { - this.err.wrap_msg(&|msg| msg.to_owned()) - } else { - this.err.wrap_msg(&|msg| { - let n_times = this.n_times; - - let plural_s; - let was_were; - - if n_times == 1 { - plural_s = ""; - was_were = "was" - } else { - plural_s = "s"; - was_were = "were"; - }; - format!( - "{msg}\n\nLogicalPlan had already failed with the above error; \ - after failure, {n_times} additional operation{plural_s} \ - {was_were} attempted on the LazyFrame", - ) - }) - }; - this.n_times += 1; - - ret_err - } -} - // https://stackoverflow.com/questions/1031076/what-are-projection-and-selection #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub enum DslPlan { @@ -166,17 +103,15 @@ pub enum DslPlan { Select { expr: Vec, input: Arc, - schema: SchemaRef, options: ProjectionOptions, }, /// Groupby aggregation GroupBy { input: Arc, - keys: Arc>, + keys: Vec, aggs: Vec, - schema: SchemaRef, #[cfg_attr(feature = "serde", serde(skip))] - apply: Option>, + apply: Option<(Arc, SchemaRef)>, maintain_order: bool, options: Arc, }, @@ -184,7 +119,6 @@ pub enum DslPlan { Join { input_left: Arc, input_right: Arc, - schema: SchemaRef, left_on: Vec, right_on: Vec, options: Arc, @@ -193,7 +127,6 @@ pub enum DslPlan { HStack { input: Arc, exprs: Vec, - schema: SchemaRef, options: ProjectionOptions, }, /// Remove duplicates from the table @@ -217,7 +150,7 @@ pub enum DslPlan { /// A (User Defined) Function MapFunction { input: Arc, - function: FunctionNode, + function: DslFunction, }, Union { inputs: Vec, @@ -229,17 +162,10 @@ pub enum DslPlan { schema: SchemaRef, options: HConcatOptions, }, - /// Catches errors and throws them later - #[cfg_attr(feature = "serde", serde(skip))] - Error { - input: Arc, - err: ErrorState, - }, /// This allows expressions to access other tables ExtContext { input: Arc, contexts: Vec, - schema: SchemaRef, }, Sink { input: Arc, @@ -261,18 +187,17 @@ impl Clone for DslPlan { Self::Cache { input, id, cache_hits } => Self::Cache { input: input.clone(), id: id.clone(), cache_hits: cache_hits.clone() }, Self::Scan { paths, file_info, predicate, file_options, scan_type } => Self::Scan { paths: paths.clone(), file_info: file_info.clone(), predicate: predicate.clone(), file_options: file_options.clone(), scan_type: scan_type.clone() }, Self::DataFrameScan { df, schema, output_schema, projection, selection } => Self::DataFrameScan { df: df.clone(), schema: schema.clone(), output_schema: output_schema.clone(), projection: projection.clone(), selection: selection.clone() }, - Self::Select { expr, input, schema, options } => Self::Select { expr: expr.clone(), input: input.clone(), schema: schema.clone(), options: options.clone() }, - Self::GroupBy { input, keys, aggs, schema, apply, maintain_order, options } => Self::GroupBy { input: input.clone(), keys: keys.clone(), aggs: aggs.clone(), schema: schema.clone(), apply: apply.clone(), maintain_order: maintain_order.clone(), options: options.clone() }, - Self::Join { input_left, input_right, schema, left_on, right_on, options } => Self::Join { input_left: input_left.clone(), input_right: input_right.clone(), schema: schema.clone(), left_on: left_on.clone(), right_on: right_on.clone(), options: options.clone() }, - Self::HStack { input, exprs, schema, options } => Self::HStack { input: input.clone(), exprs: exprs.clone(), schema: schema.clone(), options: options.clone() }, + Self::Select { expr, input, options } => Self::Select { expr: expr.clone(), input: input.clone(), options: options.clone() }, + Self::GroupBy { input, keys, aggs, apply, maintain_order, options } => Self::GroupBy { input: input.clone(), keys: keys.clone(), aggs: aggs.clone(), apply: apply.clone(), maintain_order: maintain_order.clone(), options: options.clone() }, + Self::Join { input_left, input_right, left_on, right_on, options } => Self::Join { input_left: input_left.clone(), input_right: input_right.clone(), left_on: left_on.clone(), right_on: right_on.clone(), options: options.clone() }, + Self::HStack { input, exprs, options } => Self::HStack { input: input.clone(), exprs: exprs.clone(), options: options.clone() }, Self::Distinct { input, options } => Self::Distinct { input: input.clone(), options: options.clone() }, Self::Sort {input,by_column, slice, sort_options } => Self::Sort { input: input.clone(), by_column: by_column.clone(), slice: slice.clone(), sort_options: sort_options.clone() }, Self::Slice { input, offset, len } => Self::Slice { input: input.clone(), offset: offset.clone(), len: len.clone() }, Self::MapFunction { input, function } => Self::MapFunction { input: input.clone(), function: function.clone() }, Self::Union { inputs, options } => Self::Union { inputs: inputs.clone(), options: options.clone() }, Self::HConcat { inputs, schema, options } => Self::HConcat { inputs: inputs.clone(), schema: schema.clone(), options: options.clone() }, - Self::Error { input, err } => Self::Error { input: input.clone(), err: err.clone() }, - Self::ExtContext { input, contexts, schema } => Self::ExtContext { input: input.clone(), contexts: contexts.clone(), schema: schema.clone() }, + Self::ExtContext { input, contexts, } => Self::ExtContext { input: input.clone(), contexts: contexts.clone() }, Self::Sink { input, payload } => Self::Sink { input: input.clone(), payload: payload.clone() }, } } diff --git a/crates/polars-plan/src/logical_plan/optimizer/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/mod.rs index d64edcadf7df..c465bf6651d0 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/mod.rs @@ -94,11 +94,10 @@ pub fn optimize( let opt = StackOptimizer {}; let mut rules: Vec> = Vec::with_capacity(8); + let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena)?; // During debug we check if the optimizations have not modified the final schema. #[cfg(debug_assertions)] - let prev_schema = logical_plan.schema()?.into_owned(); - - let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena)?; + let prev_schema = lp_arena.get(lp_top).schema(lp_arena).into_owned(); // Collect members for optimizations that need it. let mut members = MemberCollector::new(); diff --git a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/functions/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/functions/mod.rs index d7faf2526a9a..0ff8f51b14d5 100644 --- a/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/functions/mod.rs +++ b/crates/polars-plan/src/logical_plan/optimizer/projection_pushdown/functions/mod.rs @@ -21,6 +21,7 @@ pub(super) fn process_functions( existing, new, swapping, + schema: _, } => { process_rename( &mut acc_projections, diff --git a/crates/polars-plan/src/logical_plan/schema.rs b/crates/polars-plan/src/logical_plan/schema.rs index d3cb4339acbf..36b5ff6c2dfc 100644 --- a/crates/polars-plan/src/logical_plan/schema.rs +++ b/crates/polars-plan/src/logical_plan/schema.rs @@ -1,5 +1,6 @@ -use std::borrow::Cow; +use std::ops::Deref; use std::path::Path; +use std::sync::Mutex; use arrow::datatypes::ArrowSchemaRef; use polars_core::prelude::*; @@ -11,36 +12,9 @@ use super::hive::HivePartitions; use crate::prelude::*; impl DslPlan { - pub fn schema(&self) -> PolarsResult> { - use DslPlan::*; - match self { - Scan { file_info, .. } => Ok(Cow::Borrowed(&file_info.schema)), - #[cfg(feature = "python")] - PythonScan { options } => Ok(Cow::Borrowed(&options.schema)), - Union { inputs, .. } => inputs[0].schema(), - HConcat { schema, .. } => Ok(Cow::Borrowed(schema)), - Cache { input, .. } => input.schema(), - Sort { input, .. } => input.schema(), - DataFrameScan { schema, .. } => Ok(Cow::Borrowed(schema)), - Filter { input, .. } => input.schema(), - Select { schema, .. } => Ok(Cow::Borrowed(schema)), - GroupBy { schema, .. } => Ok(Cow::Borrowed(schema)), - Join { schema, .. } => Ok(Cow::Borrowed(schema)), - HStack { schema, .. } => Ok(Cow::Borrowed(schema)), - Distinct { input, .. } | Sink { input, .. } => input.schema(), - Slice { input, .. } => input.schema(), - MapFunction { - input, function, .. - } => { - let input_schema = input.schema()?; - match input_schema { - Cow::Owned(schema) => Ok(Cow::Owned(function.schema(&schema)?.into_owned())), - Cow::Borrowed(schema) => function.schema(schema), - } - }, - Error { err, .. } => Err(err.take()), - ExtContext { schema, .. } => Ok(Cow::Borrowed(schema)), - } + pub fn compute_schema(&self) -> PolarsResult { + let (node, lp_arena, _) = self.clone().to_alp()?; + Ok(lp_arena.get(node).schema(&lp_arena).into_owned()) } } @@ -385,3 +359,35 @@ pub(crate) fn det_join_schema( }, } } + +// We don't use an `Arc` because caches should live in different query plans. +// For that reason we have a specialized deep clone. +#[derive(Default)] +pub struct CachedSchema(Mutex>); + +impl AsRef>> for CachedSchema { + fn as_ref(&self) -> &Mutex> { + &self.0 + } +} + +impl Deref for CachedSchema { + type Target = Mutex>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Clone for CachedSchema { + fn clone(&self) -> Self { + let inner = self.0.lock().unwrap(); + Self(Mutex::new(inner.clone())) + } +} + +impl CachedSchema { + pub fn get(&self) -> Option { + self.0.lock().unwrap().clone() + } +} diff --git a/crates/polars-plan/src/logical_plan/tree_format.rs b/crates/polars-plan/src/logical_plan/tree_format.rs index 4d7cc456aab6..353c6481b220 100644 --- a/crates/polars-plan/src/logical_plan/tree_format.rs +++ b/crates/polars-plan/src/logical_plan/tree_format.rs @@ -292,7 +292,6 @@ impl<'a> TreeFmtNode<'a> { NL(h, MapFunction { input, function }) => { ND(wh(h, &format!("{function}")), vec![NL(None, input)]) }, - NL(h, Error { input, err }) => ND(wh(h, &format!("{err:?}")), vec![NL(None, input)]), NL(h, ExtContext { input, .. }) => ND(wh(h, "EXTERNAL_CONTEXT"), vec![NL(None, input)]), NL(h, Sink { input, payload }) => ND( wh( diff --git a/crates/polars/tests/it/lazy/expressions/window.rs b/crates/polars/tests/it/lazy/expressions/window.rs index 4057e16bfbec..b42a8725acdd 100644 --- a/crates/polars/tests/it/lazy/expressions/window.rs +++ b/crates/polars/tests/it/lazy/expressions/window.rs @@ -317,8 +317,7 @@ fn test_window_exprs_in_binary_exprs() -> PolarsResult<()> { .cast(DataType::Int32) .alias("stdized3"), ]) - .sum() - .unwrap(); + .sum(); let df = q.collect()?; diff --git a/docs/src/rust/user-guide/expressions/missing-data.rs b/docs/src/rust/user-guide/expressions/missing-data.rs index 29ad48b622ff..8d78310cb0a9 100644 --- a/docs/src/rust/user-guide/expressions/missing-data.rs +++ b/docs/src/rust/user-guide/expressions/missing-data.rs @@ -81,7 +81,7 @@ fn main() -> Result<(), Box> { .clone() .lazy() .with_columns([col("value").fill_nan(lit(NULL)).alias("value")]) - .mean()? + .mean() .collect()?; println!("{}", &mean_nan_df); // --8<-- [end:nanfill] diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index e720b2d83679..61f92796ae78 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -582,7 +582,7 @@ def serialize(self, file: IOBase | str | Path | None = None) -> str | None: >>> lf = pl.LazyFrame({"a": [1, 2, 3]}).sum() >>> json = lf.serialize() >>> json - '{"Select":{"expr":[{"Agg":{"Sum":{"Column":"a"}}}],"input":{"DataFrameScan":{"df":{"columns":[{"name":"a","datatype":"Int64","bit_settings":"","values":[1,2,3]}]},"schema":{"inner":{"a":"Int64"}},"output_schema":null,"projection":null,"selection":null}},"schema":{"inner":{"a":"Int64"}},"options":{"run_parallel":true,"duplicate_check":true}}}' + '{"MapFunction":{"input":{"DataFrameScan":{"df":{"columns":[{"name":"a","datatype":"Int64","bit_settings":"","values":[1,2,3]}]},"schema":{"inner":{"a":"Int64"}},"output_schema":null,"projection":null,"selection":null}},"function":{"Stats":"Sum"}}}' The logical plan can later be deserialized back into a LazyFrame. diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index 64cc8cdc9232..e01882bc2657 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -938,58 +938,52 @@ impl PyLazyFrame { ldf.fill_nan(fill_value.inner).into() } - fn min(&self) -> PyResult { + fn min(&self) -> Self { let ldf = self.ldf.clone(); - let out = ldf.min().map_err(PyPolarsErr::from)?; - Ok(out.into()) + let out = ldf.min(); + out.into() } - fn max(&self) -> PyResult { + fn max(&self) -> Self { let ldf = self.ldf.clone(); - let out = ldf.max().map_err(PyPolarsErr::from)?; - Ok(out.into()) + let out = ldf.max(); + out.into() } - fn sum(&self) -> PyResult { + fn sum(&self) -> Self { let ldf = self.ldf.clone(); - let out = ldf.sum().map_err(PyPolarsErr::from)?; - Ok(out.into()) + let out = ldf.sum(); + out.into() } - fn mean(&self) -> PyResult { + fn mean(&self) -> Self { let ldf = self.ldf.clone(); - let out = ldf.mean().map_err(PyPolarsErr::from)?; - Ok(out.into()) + let out = ldf.mean(); + out.into() } - fn std(&self, ddof: u8) -> PyResult { + fn std(&self, ddof: u8) -> Self { let ldf = self.ldf.clone(); - let out = ldf.std(ddof).map_err(PyPolarsErr::from)?; - Ok(out.into()) + let out = ldf.std(ddof); + out.into() } - fn var(&self, ddof: u8) -> PyResult { + fn var(&self, ddof: u8) -> Self { let ldf = self.ldf.clone(); - let out = ldf.var(ddof).map_err(PyPolarsErr::from)?; - Ok(out.into()) + let out = ldf.var(ddof); + out.into() } - fn median(&self) -> PyResult { + fn median(&self) -> Self { let ldf = self.ldf.clone(); - let out = ldf.median().map_err(PyPolarsErr::from)?; - Ok(out.into()) + let out = ldf.median(); + out.into() } - fn quantile( - &self, - quantile: PyExpr, - interpolation: Wrap, - ) -> PyResult { + fn quantile(&self, quantile: PyExpr, interpolation: Wrap) -> Self { let ldf = self.ldf.clone(); - let out = ldf - .quantile(quantile.inner, interpolation.0) - .map_err(PyPolarsErr::from)?; - Ok(out.into()) + let out = ldf.quantile(quantile.inner, interpolation.0); + out.into() } fn explode(&self, column: Vec) -> Self { diff --git a/py-polars/tests/unit/datatypes/test_float.py b/py-polars/tests/unit/datatypes/test_float.py index 5a318fd0015c..eac6c70f92ad 100644 --- a/py-polars/tests/unit/datatypes/test_float.py +++ b/py-polars/tests/unit/datatypes/test_float.py @@ -142,7 +142,7 @@ def test_hash() -> None: assert s.item(2) == s.item(3) # hash(float('-nan')) == hash(float('nan')) -def test_group_by() -> None: +def test_group_by_float() -> None: # Test num_groups_proxy # * -0.0 and 0.0 in same groups # * -nan and nan in same groups diff --git a/py-polars/tests/unit/test_errors.py b/py-polars/tests/unit/test_errors.py index 43b2f6e646ea..721cef2ff6f6 100644 --- a/py-polars/tests/unit/test_errors.py +++ b/py-polars/tests/unit/test_errors.py @@ -692,16 +692,6 @@ def test_error_list_to_array() -> None: ).with_columns(array=pl.col("a").list.to_array(2)) -# https://github.com/pola-rs/polars/issues/8079 -def test_error_lazyframe_not_repeating() -> None: - lf = pl.LazyFrame({"a": 1, "b": range(2)}) - with pytest.raises(pl.ColumnNotFoundError) as exc_info: - lf.select("c").select("d").select("e").collect() - - match = "Error originated just after this operation:" - assert str(exc_info).count(match) == 1 - - def test_raise_not_found_in_simplify_14974() -> None: df = pl.DataFrame() with pytest.raises(pl.ColumnNotFoundError):