From f1133a4f1a0b766d9373016a6f078f4d3c7f71f4 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Tue, 24 Sep 2024 13:26:13 +0200 Subject: [PATCH] fix: Use proper thread pool in cumulative_eval (#18885) --- Cargo.lock | 2 +- crates/polars-lazy/src/dsl/eval.rs | 29 ++++++++++++++++------------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e83ff50bdbcc..92604f563ed4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3623,7 +3623,7 @@ dependencies = [ [[package]] name = "py-polars" -version = "1.8.0" +version = "1.8.1" dependencies = [ "built", "jemallocator", diff --git a/crates/polars-lazy/src/dsl/eval.rs b/crates/polars-lazy/src/dsl/eval.rs index b25a30240a41..a3e3a97a589a 100644 --- a/crates/polars-lazy/src/dsl/eval.rs +++ b/crates/polars-lazy/src/dsl/eval.rs @@ -1,4 +1,5 @@ use polars_core::prelude::*; +use polars_core::POOL; use polars_expr::{create_physical_expr, ExpressionConversionState}; use rayon::prelude::*; @@ -76,19 +77,21 @@ pub trait ExprEvalExtension: IntoExpr + Sized { }; let avs = if parallel { - (1..c.len() + 1) - .into_par_iter() - .map(|len| { - let s = c.slice(0, len); - if (len - s.null_count()) >= min_periods { - let df = c.clone().into_frame(); - let out = phys_expr.evaluate(&df, &state)?.into_column(); - finish(out) - } else { - Ok(AnyValue::Null) - } - }) - .collect::>>()? + POOL.install(|| { + (1..c.len() + 1) + .into_par_iter() + .map(|len| { + let s = c.slice(0, len); + if (len - s.null_count()) >= min_periods { + let df = c.clone().into_frame(); + let out = phys_expr.evaluate(&df, &state)?.into_column(); + finish(out) + } else { + Ok(AnyValue::Null) + } + }) + .collect::>>() + })? } else { let mut df_container = DataFrame::empty(); (1..c.len() + 1)