diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs index c4aef7de3..c48ed33e0 100644 --- a/ballista/client/src/context.rs +++ b/ballista/client/src/context.rs @@ -974,11 +974,11 @@ mod standalone_tests { .unwrap(); let res = df.collect().await.unwrap(); let expected = vec![ - "+---------------------------------+", - "| COVAR(test.id,test.tinyint_col) |", - "+---------------------------------+", - "| 0.28571428571428586 |", - "+---------------------------------+", + "+--------------------------------------+", + "| covar_samp(test.id,test.tinyint_col) |", + "+--------------------------------------+", + "| 0.28571428571428586 |", + "+--------------------------------------+", ]; assert_result_eq(expected, &res); } diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 339d8b2df..111120bd2 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -32,6 +32,9 @@ use ballista_core::error::BallistaError; use ballista_core::serde::scheduler::{ExecutorSpecification, PartitionId}; use ballista_core::serde::BallistaCodec; use datafusion::execution::context::TaskContext; +use datafusion::functions_aggregate::covariance::{covar_pop_udaf, covar_samp_udaf}; +use datafusion::functions_aggregate::sum::sum_udaf; +use datafusion::functions_aggregate::variance::var_samp_udaf; use datafusion_proto::logical_plan::AsLogicalPlan; use datafusion_proto::physical_plan::AsExecutionPlan; use futures::FutureExt; @@ -44,7 +47,6 @@ use std::ops::Deref; use std::sync::mpsc::{Receiver, Sender, TryRecvError}; use std::time::{SystemTime, UNIX_EPOCH}; use std::{sync::Arc, time::Duration}; -use datafusion::functions_aggregate::variance::var_samp_udaf; use tonic::transport::Channel; pub async fn poll_loop( @@ -117,7 +119,7 @@ pub async fn poll_loop { Ok(_) => {} Err(e) => { - panic!("Failed to run task: {:?}", e); + warn!("Failed to run task: {:?}", e); } } } @@ -190,7 +192,13 @@ async fn run_received_task