Skip to content

Commit

Permalink
rm builtin
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
  • Loading branch information
jayzhan211 committed May 4, 2024
1 parent ebc1d8f commit aa9e800
Show file tree
Hide file tree
Showing 15 changed files with 21 additions and 376 deletions.
1 change: 1 addition & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1901,6 +1901,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
let ignore_nulls = null_treatment
.unwrap_or(sqlparser::ast::NullTreatment::RespectNulls)
== NullTreatment::IgnoreNulls;

let (agg_expr, filter, order_by) = match func_def {
AggregateFunctionDefinition::BuiltIn(fun) => {
let physical_sort_exprs = match order_by {
Expand Down
11 changes: 1 addition & 10 deletions datafusion/expr/src/aggregate_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ pub enum AggregateFunction {
Stddev,
/// Standard Deviation (Population)
StddevPop,
/// Covariance (Sample)
Covariance,
/// Covariance (Population)
CovariancePop,
/// Correlation
Expand Down Expand Up @@ -128,7 +126,6 @@ impl AggregateFunction {
VariancePop => "VAR_POP",
Stddev => "STDDEV",
StddevPop => "STDDEV_POP",
Covariance => "COVAR",
CovariancePop => "COVAR_POP",
Correlation => "CORR",
RegrSlope => "REGR_SLOPE",
Expand Down Expand Up @@ -184,9 +181,7 @@ impl FromStr for AggregateFunction {
"string_agg" => AggregateFunction::StringAgg,
// statistical
"corr" => AggregateFunction::Correlation,
"covar" => AggregateFunction::Covariance,
"covar_pop" => AggregateFunction::CovariancePop,
"covar_samp" => AggregateFunction::Covariance,
"stddev" => AggregateFunction::Stddev,
"stddev_pop" => AggregateFunction::StddevPop,
"stddev_samp" => AggregateFunction::Stddev,
Expand Down Expand Up @@ -260,9 +255,6 @@ impl AggregateFunction {
AggregateFunction::VariancePop => {
variance_return_type(&coerced_data_types[0])
}
AggregateFunction::Covariance => {
covariance_return_type(&coerced_data_types[0])
}
AggregateFunction::CovariancePop => {
covariance_return_type(&coerced_data_types[0])
}
Expand Down Expand Up @@ -357,8 +349,7 @@ impl AggregateFunction {
Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable)
}
AggregateFunction::NthValue => Signature::any(2, Volatility::Immutable),
AggregateFunction::Covariance
| AggregateFunction::CovariancePop
AggregateFunction::CovariancePop
| AggregateFunction::Correlation
| AggregateFunction::RegrSlope
| AggregateFunction::RegrIntercept
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/type_coercion/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ pub fn coerce_types(
}
Ok(vec![Float64, Float64])
}
AggregateFunction::Covariance | AggregateFunction::CovariancePop => {
AggregateFunction::CovariancePop => {
if !is_covariance_support_arg_type(&input_types[0]) {
return plan_err!(
"The function {:?} does not support inputs of type {:?}.",
Expand Down
21 changes: 5 additions & 16 deletions datafusion/functions-aggregate/src/covariance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ use datafusion_common::{
ScalarValue,
};
use datafusion_expr::{
function::AccumulatorArgs, type_coercion::aggregates::NUMERICS, utils::format_state_name, Accumulator, AggregateUDFImpl, Signature, Volatility
function::AccumulatorArgs, type_coercion::aggregates::NUMERICS,
utils::format_state_name, Accumulator, AggregateUDFImpl, Signature, Volatility,
};
use datafusion_physical_expr_common::aggregate::stats::StatsType;

Expand Down Expand Up @@ -100,21 +101,9 @@ impl AggregateUDFImpl for CovarianceSample {
_ordering_fields: Vec<Field>,
) -> Result<Vec<Field>> {
Ok(vec![
Field::new(
format_state_name(name, "count"),
DataType::UInt64,
true,
),
Field::new(
format_state_name(name, "mean1"),
DataType::Float64,
true,
),
Field::new(
format_state_name(name, "mean2"),
DataType::Float64,
true,
),
Field::new(format_state_name(name, "count"), DataType::UInt64, true),
Field::new(format_state_name(name, "mean1"), DataType::Float64, true),
Field::new(format_state_name(name, "mean2"), DataType::Float64, true),
Field::new(
format_state_name(name, "algo_const"),
DataType::Float64,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ use std::sync::Arc;

/// Fluent-style API for creating `Expr`s
pub mod expr_fn {
pub use super::first_last::first_value;
pub use super::covariance::covar_samp;
pub use super::first_last::first_value;
}

/// Registers all enabled packages with a [`FunctionRegistry`]
Expand Down
154 changes: 2 additions & 152 deletions datafusion/physical-expr/src/aggregate/build_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,6 @@ pub fn create_aggregate_expr(
(AggregateFunction::VariancePop, true) => {
return not_impl_err!("VAR_POP(DISTINCT) aggregations are not available");
}
(AggregateFunction::Covariance, false) => Arc::new(expressions::Covariance::new(
input_phy_exprs[0].clone(),
input_phy_exprs[1].clone(),
name,
data_type,
)),
(AggregateFunction::Covariance, true) => {
return not_impl_err!("COVAR(DISTINCT) aggregations are not available");
}
(AggregateFunction::CovariancePop, false) => {
Arc::new(expressions::CovariancePop::new(
input_phy_exprs[0].clone(),
Expand Down Expand Up @@ -428,8 +419,8 @@ mod tests {

use crate::expressions::{
try_cast, ApproxDistinct, ApproxMedian, ApproxPercentileCont, ArrayAgg, Avg,
BitAnd, BitOr, BitXor, BoolAnd, BoolOr, Correlation, Count, Covariance,
DistinctArrayAgg, DistinctCount, Max, Min, Stddev, Sum, Variance,
BitAnd, BitOr, BitXor, BoolAnd, BoolOr, Count, DistinctArrayAgg, DistinctCount,
Max, Min, Stddev, Sum, Variance,
};

use super::*;
Expand Down Expand Up @@ -950,147 +941,6 @@ mod tests {
Ok(())
}

#[test]
fn test_covar_expr() -> Result<()> {
let funcs = vec![AggregateFunction::Covariance];
let data_types = vec![
DataType::UInt32,
DataType::UInt64,
DataType::Int32,
DataType::Int64,
DataType::Float32,
DataType::Float64,
];
for fun in funcs {
for data_type in &data_types {
let input_schema = Schema::new(vec![
Field::new("c1", data_type.clone(), true),
Field::new("c2", data_type.clone(), true),
]);
let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(
expressions::Column::new_with_schema("c1", &input_schema)
.unwrap(),
),
Arc::new(
expressions::Column::new_with_schema("c2", &input_schema)
.unwrap(),
),
];
let result_agg_phy_exprs = create_physical_agg_expr_for_test(
&fun,
false,
&input_phy_exprs[0..2],
&input_schema,
"c1",
)?;
if fun == AggregateFunction::Covariance {
assert!(result_agg_phy_exprs.as_any().is::<Covariance>());
assert_eq!("c1", result_agg_phy_exprs.name());
assert_eq!(
Field::new("c1", DataType::Float64, true),
result_agg_phy_exprs.field().unwrap()
)
}
}
}
Ok(())
}

#[test]
fn test_covar_pop_expr() -> Result<()> {
let funcs = vec![AggregateFunction::CovariancePop];
let data_types = vec![
DataType::UInt32,
DataType::UInt64,
DataType::Int32,
DataType::Int64,
DataType::Float32,
DataType::Float64,
];
for fun in funcs {
for data_type in &data_types {
let input_schema = Schema::new(vec![
Field::new("c1", data_type.clone(), true),
Field::new("c2", data_type.clone(), true),
]);
let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(
expressions::Column::new_with_schema("c1", &input_schema)
.unwrap(),
),
Arc::new(
expressions::Column::new_with_schema("c2", &input_schema)
.unwrap(),
),
];
let result_agg_phy_exprs = create_physical_agg_expr_for_test(
&fun,
false,
&input_phy_exprs[0..2],
&input_schema,
"c1",
)?;
if fun == AggregateFunction::Covariance {
assert!(result_agg_phy_exprs.as_any().is::<Covariance>());
assert_eq!("c1", result_agg_phy_exprs.name());
assert_eq!(
Field::new("c1", DataType::Float64, true),
result_agg_phy_exprs.field().unwrap()
)
}
}
}
Ok(())
}

#[test]
fn test_corr_expr() -> Result<()> {
let funcs = vec![AggregateFunction::Correlation];
let data_types = vec![
DataType::UInt32,
DataType::UInt64,
DataType::Int32,
DataType::Int64,
DataType::Float32,
DataType::Float64,
];
for fun in funcs {
for data_type in &data_types {
let input_schema = Schema::new(vec![
Field::new("c1", data_type.clone(), true),
Field::new("c2", data_type.clone(), true),
]);
let input_phy_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(
expressions::Column::new_with_schema("c1", &input_schema)
.unwrap(),
),
Arc::new(
expressions::Column::new_with_schema("c2", &input_schema)
.unwrap(),
),
];
let result_agg_phy_exprs = create_physical_agg_expr_for_test(
&fun,
false,
&input_phy_exprs[0..2],
&input_schema,
"c1",
)?;
if fun == AggregateFunction::Covariance {
assert!(result_agg_phy_exprs.as_any().is::<Correlation>());
assert_eq!("c1", result_agg_phy_exprs.name());
assert_eq!(
Field::new("c1", DataType::Float64, true),
result_agg_phy_exprs.field().unwrap()
)
}
}
}
Ok(())
}

#[test]
fn test_median_expr() -> Result<()> {
let funcs = vec![AggregateFunction::ApproxMedian];
Expand Down
Loading

0 comments on commit aa9e800

Please sign in to comment.