Skip to content

Commit

Permalink
fix test failures
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Sep 13, 2024
1 parent 992eb5b commit e7bf32d
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 10 deletions.
10 changes: 5 additions & 5 deletions ballista/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,11 +974,11 @@ mod standalone_tests {
.unwrap();
let res = df.collect().await.unwrap();
let expected = vec![
"+---------------------------------+",
"| COVAR(test.id,test.tinyint_col) |",
"+---------------------------------+",
"| 0.28571428571428586 |",
"+---------------------------------+",
"+--------------------------------------+",
"| covar_samp(test.id,test.tinyint_col) |",
"+--------------------------------------+",
"| 0.28571428571428586 |",
"+--------------------------------------+",
];
assert_result_eq(expected, &res);
}
Expand Down
12 changes: 10 additions & 2 deletions ballista/executor/src/execution_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ use ballista_core::error::BallistaError;
use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId};
use ballista_core::serde::BallistaCodec;
use datafusion::execution::context::TaskContext;
use datafusion::functions_aggregate::covariance::{covar_pop_udaf, covar_samp_udaf};
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_samp_udaf;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
use futures::FutureExt;
Expand All @@ -44,7 +47,6 @@ use std::ops::Deref;
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{sync::Arc, time::Duration};
use datafusion::functions_aggregate::variance::var_samp_udaf;
use tonic::transport::Channel;

pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
Expand Down Expand Up @@ -117,7 +119,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
{
Ok(_) => {}
Err(e) => {
panic!("Failed to run task: {:?}", e);
warn!("Failed to run task: {:?}", e);
}
}
}
Expand Down Expand Up @@ -190,7 +192,13 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
for agg_func in executor.aggregate_functions.clone() {
task_aggregate_functions.insert(agg_func.0, agg_func.1);
}
// since DataFusion 38 some internal functions were converted to UDAF, so
// we have to register them manually
task_aggregate_functions.insert("var".to_string(), var_samp_udaf());
task_aggregate_functions.insert("covar_samp".to_string(), covar_samp_udaf());
task_aggregate_functions.insert("covar_pop".to_string(), covar_pop_udaf());
task_aggregate_functions.insert("SUM".to_string(), sum_udaf());

for window_func in executor.window_functions.clone() {
task_window_functions.insert(window_func.0, window_func.1);
}
Expand Down
3 changes: 2 additions & 1 deletion ballista/scheduler/src/scheduler_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ mod test {
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_expr::{col, sum, LogicalPlan};
use datafusion::functions_aggregate::sum::sum;
use datafusion::logical_expr::{col, LogicalPlan};

use datafusion::test_util::scan_empty;
use datafusion_proto::protobuf::LogicalPlanNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ mod tests {
use ballista_core::config::TaskSchedulingPolicy;
use ballista_core::error::Result;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_expr::{col, sum, LogicalPlan};
use datafusion::functions_aggregate::sum::sum;
use datafusion::logical_expr::{col, LogicalPlan};
use datafusion::test_util::scan_empty_with_partitions;
use std::sync::Arc;
use std::time::Duration;
Expand Down
3 changes: 2 additions & 1 deletion ballista/scheduler/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::DataFusionError;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::execution::context::{SessionConfig, SessionContext, SessionState};
use datafusion::functions_aggregate::sum::sum;
use datafusion::logical_expr::expr::Sort;
use datafusion::logical_expr::{Expr, LogicalPlan};
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{col, count, sum, CsvReadOptions, JoinType};
use datafusion::prelude::{col, count, CsvReadOptions, JoinType};
use datafusion::test_util::scan_empty;

use crate::cluster::BallistaCluster;
Expand Down

0 comments on commit e7bf32d

Please sign in to comment.