diff --git a/datafusion/src/error.rs b/datafusion/src/error.rs index b7a8f45b2e67..248f24350356 100644 --- a/datafusion/src/error.rs +++ b/datafusion/src/error.rs @@ -72,15 +72,6 @@ pub enum DataFusionError { External(GenericError), } -impl DataFusionError { - /// Wraps this [DataFusionError] as an [arrow::error::ArrowError]. - /// - /// TODO this can be removed in favor if the conversion below - pub fn into_arrow_external_error(self) -> ArrowError { - ArrowError::from_external_error(Box::new(self)) - } -} - impl From for DataFusionError { fn from(e: io::Error) -> Self { DataFusionError::IoError(e) diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index 087507e1dece..48301f0916fe 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -331,8 +331,7 @@ fn build_batch( let scalar = ScalarValue::try_from_array(arr, left_index)?; Ok(scalar.to_array_of_size(batch.num_rows())) }) - .collect::>>() - .map_err(|x| x.into_arrow_external_error())?; + .collect::>>()?; RecordBatch::try_new( Arc::new(schema.clone()), diff --git a/datafusion/src/physical_plan/filter.rs b/datafusion/src/physical_plan/filter.rs index a071d0e709b7..a48d11236dc8 100644 --- a/datafusion/src/physical_plan/filter.rs +++ b/datafusion/src/physical_plan/filter.rs @@ -176,7 +176,7 @@ fn batch_filter( predicate .evaluate(batch) .map(|v| v.into_array(batch.num_rows())) - .map_err(DataFusionError::into_arrow_external_error) + .map_err(DataFusionError::into) .and_then(|array| { array .as_any() @@ -185,7 +185,7 @@ fn batch_filter( DataFusionError::Internal( "Filter predicate evaluated to non-boolean value".to_string(), ) - .into_arrow_external_error() + .into() }) // apply filter array to record batch .and_then(|filter_array| filter_record_batch(batch, filter_array)) diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index 8074ae36ce05..2b1a59efb653 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -403,8 +403,7 @@ fn group_aggregate_batch( } // 1.2 Need to create new entry None => { - let accumulator_set = create_accumulators(aggr_expr) - .map_err(DataFusionError::into_arrow_external_error)?; + let accumulator_set = create_accumulators(aggr_expr)?; // Copy group values out of arrays into `ScalarValue`s let group_by_values = group_values @@ -516,8 +515,7 @@ async fn compute_grouped_hash_aggregate( // Assume create_schema() always put group columns in front of aggr columns, we set // col_idx_base to group expression count. let aggregate_expressions = - aggregate_expressions(&aggr_expr, &mode, group_expr.len()) - .map_err(DataFusionError::into_arrow_external_error)?; + aggregate_expressions(&aggr_expr, &mode, group_expr.len())?; let random_state = RandomState::new(); @@ -535,8 +533,7 @@ async fn compute_grouped_hash_aggregate( batch, accumulators, &aggregate_expressions, - ) - .map_err(DataFusionError::into_arrow_external_error)?; + )?; timer.done(); } @@ -754,10 +751,8 @@ async fn compute_hash_aggregate( elapsed_compute: metrics::Time, ) -> ArrowResult { let timer = elapsed_compute.timer(); - let mut accumulators = create_accumulators(&aggr_expr) - .map_err(DataFusionError::into_arrow_external_error)?; - let expressions = aggregate_expressions(&aggr_expr, &mode, 0) - .map_err(DataFusionError::into_arrow_external_error)?; + let mut accumulators = create_accumulators(&aggr_expr)?; + let expressions = aggregate_expressions(&aggr_expr, &mode, 0)?; let expressions = Arc::new(expressions); timer.done(); @@ -766,16 +761,14 @@ async fn compute_hash_aggregate( while let Some(batch) = input.next().await { let batch = batch?; let timer = elapsed_compute.timer(); - aggregate_batch(&mode, &batch, &mut accumulators, &expressions) - .map_err(DataFusionError::into_arrow_external_error)?; + aggregate_batch(&mode, &batch, &mut accumulators, &expressions)?; timer.done(); } // 2. convert values to a record batch let timer = elapsed_compute.timer(); let batch = finalize_aggregation(&accumulators, &mode) - .map(|columns| RecordBatch::try_new(schema.clone(), columns)) - .map_err(DataFusionError::into_arrow_external_error)?; + .map(|columns| RecordBatch::try_new(schema.clone(), columns))?; timer.done(); batch } @@ -904,9 +897,7 @@ fn create_batch_from_map( match mode { AggregateMode::Partial => { for acc in accs.iter() { - let state = acc - .state() - .map_err(DataFusionError::into_arrow_external_error)?; + let state = acc.state()?; acc_data_types.push(state.len()); } } @@ -924,8 +915,7 @@ fn create_batch_from_map( .map(|group_state| group_state.group_by_values[i].clone()), ) }) - .collect::>>() - .map_err(|x| x.into_arrow_external_error())?; + .collect::>>()?; // add state / evaluated arrays for (x, &state_len) in acc_data_types.iter().enumerate() { @@ -937,8 +927,7 @@ fn create_batch_from_map( let x = group_state.accumulator_set[x].state().unwrap(); x[y].clone() }), - ) - .map_err(DataFusionError::into_arrow_external_error)?; + )?; columns.push(res); } @@ -947,8 +936,7 @@ fn create_batch_from_map( accumulators.group_states.iter().map(|group_state| { group_state.accumulator_set[x].evaluate().unwrap() }), - ) - .map_err(DataFusionError::into_arrow_external_error)?; + )?; columns.push(res); } } diff --git a/datafusion/src/physical_plan/projection.rs b/datafusion/src/physical_plan/projection.rs index 6d9d5876b986..4d0cc61c99e2 100644 --- a/datafusion/src/physical_plan/projection.rs +++ b/datafusion/src/physical_plan/projection.rs @@ -236,15 +236,14 @@ impl ProjectionStream { fn batch_project(&self, batch: &RecordBatch) -> ArrowResult { // records time on drop let _timer = self.baseline_metrics.elapsed_compute().timer(); - self.expr + let arrays = self + .expr .iter() .map(|expr| expr.evaluate(batch)) .map(|r| r.map(|v| v.into_array(batch.num_rows()))) - .collect::>>() - .map_or_else( - |e| Err(DataFusionError::into_arrow_external_error(e)), - |arrays| RecordBatch::try_new(self.schema.clone(), arrays), - ) + .collect::>>()?; + + RecordBatch::try_new(self.schema.clone(), arrays) } } diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs index d69ecbd996be..746075429a45 100644 --- a/datafusion/src/physical_plan/repartition.rs +++ b/datafusion/src/physical_plan/repartition.rs @@ -416,7 +416,7 @@ impl RepartitionExec { Err(e) => { for (_, tx) in txs { let err = DataFusionError::Execution(format!("Join Error: {}", e)); - let err = Err(err.into_arrow_external_error()); + let err = Err(err.into()); tx.send(Some(err)).ok(); } } @@ -425,7 +425,7 @@ impl RepartitionExec { for (_, tx) in txs { // wrap it because need to send error to all output partitions let err = DataFusionError::Execution(e.to_string()); - let err = Err(err.into_arrow_external_error()); + let err = Err(err.into()); tx.send(Some(err)).ok(); } } diff --git a/datafusion/src/physical_plan/sorts/sort.rs b/datafusion/src/physical_plan/sorts/sort.rs index 5ea5a72581dc..e09146499c26 100644 --- a/datafusion/src/physical_plan/sorts/sort.rs +++ b/datafusion/src/physical_plan/sorts/sort.rs @@ -634,8 +634,7 @@ fn sort_batch( &expr .iter() .map(|e| e.evaluate_to_sort_column(&batch)) - .collect::>>() - .map_err(DataFusionError::into_arrow_external_error)?, + .collect::>>()?, None, )?; diff --git a/datafusion/src/physical_plan/windows/window_agg_exec.rs b/datafusion/src/physical_plan/windows/window_agg_exec.rs index b86ac1b02385..491e0ebf45f8 100644 --- a/datafusion/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/src/physical_plan/windows/window_agg_exec.rs @@ -273,9 +273,7 @@ impl WindowAggStream { elapsed_compute: crate::physical_plan::metrics::Time, ) -> ArrowResult { let input_schema = input.schema(); - let batches = common::collect(input) - .await - .map_err(DataFusionError::into_arrow_external_error)?; + let batches = common::collect(input).await?; // record compute time on drop let _timer = elapsed_compute.timer(); @@ -283,8 +281,7 @@ impl WindowAggStream { let batch = common::combine_batches(&batches, input_schema.clone())?; if let Some(batch) = batch { // calculate window cols - let mut columns = compute_window_aggregates(window_expr, &batch) - .map_err(DataFusionError::into_arrow_external_error)?; + let mut columns = compute_window_aggregates(window_expr, &batch)?; // combine with the original cols // note the setup of window aggregates is that they newly calculated window // expressions are always prepended to the columns