Remove DataFusionError::into_arrow_external_error
alamb committed Jan 22, 2022
1 parent 15af24a commit 342673b
Showing 8 changed files with 24 additions and 51 deletions.
9 changes: 0 additions & 9 deletions datafusion/src/error.rs
@@ -72,15 +72,6 @@ pub enum DataFusionError {
     External(GenericError),
 }
 
-impl DataFusionError {
-    /// Wraps this [DataFusionError] as an [arrow::error::ArrowError].
-    ///
-    /// TODO this can be removed in favor if the conversion below
-    pub fn into_arrow_external_error(self) -> ArrowError {
-        ArrowError::from_external_error(Box::new(self))
-    }
-}
-
 impl From<io::Error> for DataFusionError {
     fn from(e: io::Error) -> Self {
         DataFusionError::IoError(e)
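
For context, the helper removed above is superseded by a `From<DataFusionError> for ArrowError` conversion (the "conversion below" the deleted doc comment alludes to), which is what lets the call sites in the rest of this commit use plain `?` and `.into()`. A rough sketch of the shape of such an impl, not the exact body in error.rs:

```rust
// Sketch only: must live inside the datafusion crate (orphan rule), and the
// real impl may handle more variants than shown here.
impl From<DataFusionError> for ArrowError {
    fn from(e: DataFusionError) -> Self {
        match e {
            // avoid double-wrapping an error that already is an ArrowError
            DataFusionError::ArrowError(e) => e,
            DataFusionError::External(e) => ArrowError::ExternalError(e),
            other => ArrowError::ExternalError(Box::new(other)),
        }
    }
}
```
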
3 changes: 1 addition & 2 deletions datafusion/src/physical_plan/cross_join.rs
@@ -331,8 +331,7 @@ fn build_batch(
             let scalar = ScalarValue::try_from_array(arr, left_index)?;
             Ok(scalar.to_array_of_size(batch.num_rows()))
         })
-        .collect::<Result<Vec<_>>>()
-        .map_err(|x| x.into_arrow_external_error())?;
+        .collect::<Result<Vec<_>>>()?;
 
     RecordBatch::try_new(
         Arc::new(schema.clone()),
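
The bare `?` works here because `build_batch` returns an Arrow `Result`, so the `From` conversion sketched above is applied automatically to the `DataFusionError` coming out of `collect`. A minimal standalone illustration of the pattern (both function names are invented for the example):

```rust
use arrow::error::Result as ArrowResult;
use datafusion::error::{DataFusionError, Result};

// Hypothetical step that fails with a DataFusion error.
fn datafusion_step() -> Result<i32> {
    Err(DataFusionError::Execution("example failure".to_string()))
}

// A caller with an Arrow error type, like build_batch: `?` now performs the
// DataFusionError -> ArrowError conversion that previously required
// `.map_err(|x| x.into_arrow_external_error())`.
fn arrow_facing_step() -> ArrowResult<i32> {
    let v = datafusion_step()?;
    Ok(v + 1)
}
```
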
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/filter.rs
@@ -176,7 +176,7 @@ fn batch_filter(
     predicate
         .evaluate(batch)
         .map(|v| v.into_array(batch.num_rows()))
-        .map_err(DataFusionError::into_arrow_external_error)
+        .map_err(DataFusionError::into)
         .and_then(|array| {
             array
                 .as_any()
@@ -185,7 +185,7 @@
                     DataFusionError::Internal(
                         "Filter predicate evaluated to non-boolean value".to_string(),
                     )
-                    .into_arrow_external_error()
+                    .into()
                 })
                 // apply filter array to record batch
                 .and_then(|filter_array| filter_record_batch(batch, filter_array))
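
Where the code stays in combinator style and `?` is not available mid-chain, the generic `Into` conversion replaces the removed helper, either as `map_err(DataFusionError::into)` or as a trailing `.into()` on a freshly built error. A small sketch of both spellings (the flag-parsing functions are invented for the example):

```rust
use arrow::error::Result as ArrowResult;
use datafusion::error::{DataFusionError, Result};

// Hypothetical fallible step with a DataFusion error type.
fn parse_flag(s: &str) -> Result<bool> {
    s.parse::<bool>()
        .map_err(|e| DataFusionError::Execution(e.to_string()))
}

fn arrow_flag(s: &str) -> ArrowResult<bool> {
    parse_flag(s)
        // same spelling as batch_filter: the target error type (ArrowError)
        // is inferred from the function's return type.
        .map_err(DataFusionError::into)
        .and_then(|b| {
            if b {
                Ok(b)
            } else {
                // build a DataFusionError and convert it in place, as the
                // non-boolean-predicate branch above does.
                Err(DataFusionError::Internal("flag must be true".to_string()).into())
            }
        })
}
```
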
34 changes: 11 additions & 23 deletions datafusion/src/physical_plan/hash_aggregate.rs
@@ -403,8 +403,7 @@ fn group_aggregate_batch(
             }
             // 1.2 Need to create new entry
             None => {
-                let accumulator_set = create_accumulators(aggr_expr)
-                    .map_err(DataFusionError::into_arrow_external_error)?;
+                let accumulator_set = create_accumulators(aggr_expr)?;
 
                 // Copy group values out of arrays into `ScalarValue`s
                 let group_by_values = group_values
@@ -516,8 +515,7 @@ async fn compute_grouped_hash_aggregate(
     // Assume create_schema() always put group columns in front of aggr columns, we set
     // col_idx_base to group expression count.
     let aggregate_expressions =
-        aggregate_expressions(&aggr_expr, &mode, group_expr.len())
-            .map_err(DataFusionError::into_arrow_external_error)?;
+        aggregate_expressions(&aggr_expr, &mode, group_expr.len())?;
 
     let random_state = RandomState::new();
 
@@ -535,8 +533,7 @@
                 batch,
                 accumulators,
                 &aggregate_expressions,
-            )
-            .map_err(DataFusionError::into_arrow_external_error)?;
+            )?;
             timer.done();
         }
 
@@ -754,10 +751,8 @@ async fn compute_hash_aggregate(
     elapsed_compute: metrics::Time,
 ) -> ArrowResult<RecordBatch> {
     let timer = elapsed_compute.timer();
-    let mut accumulators = create_accumulators(&aggr_expr)
-        .map_err(DataFusionError::into_arrow_external_error)?;
-    let expressions = aggregate_expressions(&aggr_expr, &mode, 0)
-        .map_err(DataFusionError::into_arrow_external_error)?;
+    let mut accumulators = create_accumulators(&aggr_expr)?;
+    let expressions = aggregate_expressions(&aggr_expr, &mode, 0)?;
     let expressions = Arc::new(expressions);
     timer.done();
 
@@ -766,16 +761,14 @@
     while let Some(batch) = input.next().await {
         let batch = batch?;
         let timer = elapsed_compute.timer();
-        aggregate_batch(&mode, &batch, &mut accumulators, &expressions)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+        aggregate_batch(&mode, &batch, &mut accumulators, &expressions)?;
         timer.done();
     }
 
     // 2. convert values to a record batch
     let timer = elapsed_compute.timer();
     let batch = finalize_aggregation(&accumulators, &mode)
-        .map(|columns| RecordBatch::try_new(schema.clone(), columns))
-        .map_err(DataFusionError::into_arrow_external_error)?;
+        .map(|columns| RecordBatch::try_new(schema.clone(), columns))?;
     timer.done();
     batch
 }
@@ -904,9 +897,7 @@ fn create_batch_from_map(
     match mode {
         AggregateMode::Partial => {
             for acc in accs.iter() {
-                let state = acc
-                    .state()
-                    .map_err(DataFusionError::into_arrow_external_error)?;
+                let state = acc.state()?;
                 acc_data_types.push(state.len());
             }
         }
@@ -924,8 +915,7 @@
                     .map(|group_state| group_state.group_by_values[i].clone()),
             )
         })
-        .collect::<Result<Vec<_>>>()
-        .map_err(|x| x.into_arrow_external_error())?;
+        .collect::<Result<Vec<_>>>()?;
 
     // add state / evaluated arrays
     for (x, &state_len) in acc_data_types.iter().enumerate() {
@@ -937,8 +927,7 @@
                         let x = group_state.accumulator_set[x].state().unwrap();
                         x[y].clone()
                     }),
-            )
-            .map_err(DataFusionError::into_arrow_external_error)?;
+            )?;
 
             columns.push(res);
         }
@@ -947,8 +936,7 @@
                 accumulators.group_states.iter().map(|group_state| {
                     group_state.accumulator_set[x].evaluate().unwrap()
                 }),
-            )
-            .map_err(DataFusionError::into_arrow_external_error)?;
+            )?;
             columns.push(res);
         }
     }
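
The `create_batch_from_map` hunks above build each output column by feeding an iterator of `ScalarValue`s (group keys, accumulator state, or final values) into an array-building helper whose DataFusion `Result` now flows out through `?`. A minimal sketch of that building block, assuming the helper is `ScalarValue::iter_to_array` (check the exact name and signature for your DataFusion version):

```rust
use arrow::array::ArrayRef;
use datafusion::error::Result;
use datafusion::scalar::ScalarValue;

// Sketch: turn per-group scalar values into a single Arrow array column.
fn group_keys_to_column() -> Result<ArrayRef> {
    let keys = vec![
        ScalarValue::Int64(Some(1)),
        ScalarValue::Int64(Some(2)),
        ScalarValue::Int64(None), // a null group key
    ];
    ScalarValue::iter_to_array(keys)
}
```
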
11 changes: 5 additions & 6 deletions datafusion/src/physical_plan/projection.rs
@@ -236,15 +236,14 @@ impl ProjectionStream {
     fn batch_project(&self, batch: &RecordBatch) -> ArrowResult<RecordBatch> {
         // records time on drop
         let _timer = self.baseline_metrics.elapsed_compute().timer();
-        self.expr
+        let arrays = self
+            .expr
             .iter()
             .map(|expr| expr.evaluate(batch))
             .map(|r| r.map(|v| v.into_array(batch.num_rows())))
-            .collect::<Result<Vec<_>>>()
-            .map_or_else(
-                |e| Err(DataFusionError::into_arrow_external_error(e)),
-                |arrays| RecordBatch::try_new(self.schema.clone(), arrays),
-            )
+            .collect::<Result<Vec<_>>>()?;
+
+        RecordBatch::try_new(self.schema.clone(), arrays)
     }
 }
 
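
After the rewrite, `batch_project` first collects the evaluated columns and only then builds the output batch, letting `?` do the error conversion instead of the old `map_or_else` detour. A self-contained sketch of the same shape using hand-made arrays (the schema and values are invented for the example):

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;

fn project_two_columns() -> ArrowResult<RecordBatch> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Int32, false),
    ]));

    // Stand-ins for the `expr.evaluate(batch)` results, which carry a
    // DataFusion error type.
    let evaluated: Vec<Result<ArrayRef>> = vec![
        Ok(Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef),
        Ok(Arc::new(Int32Array::from(vec![4, 5, 6])) as ArrayRef),
    ];

    // Collect first; `?` converts any DataFusionError into an ArrowError.
    let arrays = evaluated.into_iter().collect::<Result<Vec<_>>>()?;
    RecordBatch::try_new(schema, arrays)
}
```
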
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/repartition.rs
@@ -416,7 +416,7 @@ impl RepartitionExec {
             Err(e) => {
                 for (_, tx) in txs {
                     let err = DataFusionError::Execution(format!("Join Error: {}", e));
-                    let err = Err(err.into_arrow_external_error());
+                    let err = Err(err.into());
                     tx.send(Some(err)).ok();
                 }
             }
@@ -425,7 +425,7 @@
                 for (_, tx) in txs {
                     // wrap it because need to send error to all output partitions
                     let err = DataFusionError::Execution(e.to_string());
-                    let err = Err(err.into_arrow_external_error());
+                    let err = Err(err.into());
                     tx.send(Some(err)).ok();
                 }
             }
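
In `RepartitionExec` the converted error is fanned out to every output partition over a channel, so the target type of `.into()` is pinned down by what the channel carries rather than by a function signature. An illustrative sketch; the concrete channel and item types here are assumptions, not the exact ones in repartition.rs:

```rust
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use tokio::sync::mpsc::UnboundedSender;

// Assumed item type: an optional batch-or-error, roughly what the repartition
// operator streams to each output partition.
type Item = Option<Result<RecordBatch, ArrowError>>;

fn fan_out_error(txs: &[UnboundedSender<Item>], msg: &str) {
    for tx in txs {
        let err = DataFusionError::Execution(msg.to_string());
        // `.into()` resolves to ArrowError because that is what Item carries.
        tx.send(Some(Err(err.into()))).ok();
    }
}
```
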
3 changes: 1 addition & 2 deletions datafusion/src/physical_plan/sorts/sort.rs
@@ -634,8 +634,7 @@ fn sort_batch(
         &expr
             .iter()
             .map(|e| e.evaluate_to_sort_column(&batch))
-            .collect::<Result<Vec<SortColumn>>>()
-            .map_err(DataFusionError::into_arrow_external_error)?,
+            .collect::<Result<Vec<SortColumn>>>()?,
         None,
     )?;
 
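
The `sort_batch` hunk evaluates one `SortColumn` per sort expression and hands them, plus a `None` limit, to an arrow sort kernel; `?` now covers the error conversion for the evaluation step. Assuming the kernel in question is arrow's `lexsort`, a minimal sketch of sorting by a single hand-built column:

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::kernels::sort::{lexsort, SortColumn, SortOptions};
use arrow::error::Result as ArrowResult;

fn sort_one_column() -> ArrowResult<Vec<ArrayRef>> {
    let values: ArrayRef = Arc::new(Int32Array::from(vec![3, 1, 2]));
    let columns = vec![SortColumn {
        values,
        options: Some(SortOptions {
            descending: false,
            nulls_first: true,
        }),
    }];
    // Second argument is an optional row limit, `None` as in sort_batch.
    lexsort(&columns, None)
}
```
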
7 changes: 2 additions & 5 deletions datafusion/src/physical_plan/windows/window_agg_exec.rs
@@ -273,18 +273,15 @@ impl WindowAggStream {
         elapsed_compute: crate::physical_plan::metrics::Time,
     ) -> ArrowResult<RecordBatch> {
         let input_schema = input.schema();
-        let batches = common::collect(input)
-            .await
-            .map_err(DataFusionError::into_arrow_external_error)?;
+        let batches = common::collect(input).await?;
 
         // record compute time on drop
         let _timer = elapsed_compute.timer();
 
         let batch = common::combine_batches(&batches, input_schema.clone())?;
         if let Some(batch) = batch {
             // calculate window cols
-            let mut columns = compute_window_aggregates(window_expr, &batch)
-                .map_err(DataFusionError::into_arrow_external_error)?;
+            let mut columns = compute_window_aggregates(window_expr, &batch)?;
             // combine with the original cols
             // note the setup of window aggregates is that they newly calculated window
             // expressions are always prepended to the columns
