fix: properly satisfy enforcesorting rules (#2814)
closes #2811
universalmind303 authored Mar 21, 2024
1 parent 668bf36 commit fd58886
Showing 4 changed files with 138 additions and 14 deletions.
crates/datafusion_ext/src/metrics.rs (82 additions, 5 deletions)
@@ -8,9 +8,9 @@ use std::task::{Context, Poll};
 
 use datafusion::arrow::datatypes::{Schema, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::error::Result;
+use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::TaskContext;
-use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
 use datafusion::physical_plan::metrics::{
     BaselineMetrics,
     ExecutionPlanMetricsSet,
@@ -199,7 +199,7 @@ impl DataSourceMetricsOptsType for WriteOnlyDataSourceMetricsOptsType {
 /// able to modify directly to record metrics (e.g. Delta). Otherwise, this
 /// should be skipped and metrics collection should be added to the execution
 /// plan directly.
-#[derive(Debug, Clone)]
+#[derive(Clone)]
 pub struct DataSourceMetricsExecAdapter<T: DataSourceMetricsOptsType> {
     child: Arc<dyn ExecutionPlan>,
     metrics: ExecutionPlanMetricsSet,
@@ -223,7 +223,15 @@ impl<T: DataSourceMetricsOptsType> DataSourceMetricsExecAdapter<T> {
     }
 }
 
-impl<T: DataSourceMetricsOptsType> ExecutionPlan for DataSourceMetricsExecAdapter<T> {
+impl<T: DataSourceMetricsOptsType> Debug for DataSourceMetricsExecAdapter<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct(format!("{}DataSourceMetricsExecAdapter", T::DISPLAY_NAME_PREFIX).as_str())
+            .field("child", &self.child)
+            .finish()
+    }
+}
+
+impl ExecutionPlan for WriteOnlyDataSourceMetricsExecAdapter {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -248,6 +256,75 @@ impl<T: DataSourceMetricsOptsType> ExecutionPlan for DataSourceMetricsExecAdapter<T> {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.is_empty() {
+            return Err(DataFusionError::Plan(
+                "DataSourceMetricsExecAdapter wrong number of children".to_string(),
+            ));
+        }
+        Ok(Arc::new(Self::new(children[0].clone())))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let stream = self.child.execute(partition, context)?;
+        Ok(Box::pin(BoxedStreamAdapater::new(
+            stream,
+            partition,
+            &self.metrics,
+            WriteOnlyDataSourceMetricsOptsType::OPTS,
+        )))
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        self.child.statistics()
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
+        // the input ordering must match the output ordering of the source
+        // without this, the sort may get removed by the optimizer
+        vec![self
+            .output_ordering()
+            .map(PhysicalSortRequirement::from_sort_exprs)]
+    }
+}
+
+impl ExecutionPlan for ReadOnlyDataSourceMetricsExecAdapter {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.child.schema()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.child.output_partitioning()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.child.output_ordering()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.child.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.is_empty() {
+            return Err(DataFusionError::Plan(
+                "DataSourceMetricsExecAdapter wrong number of children".to_string(),
+            ));
+        }
         Ok(Arc::new(Self::new(children[0].clone())))
     }
 
@@ -261,7 +338,7 @@ impl<T: DataSourceMetricsOptsType> ExecutionPlan for DataSourceMetricsExecAdapter<T> {
             stream,
             partition,
             &self.metrics,
-            T::OPTS,
+            ReadOnlyDataSourceMetricsOptsType::OPTS,
         )))
     }
 
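Note on the fix itself: the new `required_input_ordering` override is what satisfies the EnforceSorting rule; the default of "no requirement" is what allowed the optimizer to prune the upstream sort. A minimal sketch of the delegation pattern, lifted out of the trait for clarity (the free function is illustrative, not code from this commit):

use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};

// One entry per child. `Some(reqs)` tells the optimizer the child must
// deliver this ordering; `None` says "no requirement", which lets an
// upstream SortExec be eliminated as redundant.
fn passthrough_requirement(
    output_ordering: Option<&[PhysicalSortExpr]>,
) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
    vec![output_ordering.map(PhysicalSortRequirement::from_sort_exprs)]
}
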
crates/datafusion_ext/src/runtime/runtime_group.rs (7 additions, 2 deletions)
@@ -3,7 +3,7 @@ use std::fmt;
 use std::sync::Arc;
 
 use datafusion::arrow::datatypes::Schema;
-use datafusion::error::Result as DataFusionResult;
+use datafusion::error::{DataFusionError, Result as DataFusionResult};
 use datafusion::execution::TaskContext;
 use datafusion::physical_expr::PhysicalSortExpr;
 use datafusion::physical_plan::metrics::MetricsSet;
@@ -58,6 +58,11 @@ impl ExecutionPlan for RuntimeGroupExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        if children.len() != 1 {
+            return Err(DataFusionError::Plan(
+                "RuntimeGroupExec wrong number of children".to_string(),
+            ));
+        }
         Ok(Arc::new(Self {
             preference: self.preference,
             child: children[0].clone(),
@@ -77,7 +82,7 @@ }
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
-        None
+        self.child.metrics()
     }
 }
 
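Note: the arity checks added in this commit (here, in metrics.rs, and in copy_to.rs below) turn what was an out-of-bounds `children[0]` panic into a recoverable planner error. A sketch of the shared pattern; `first_child` is a hypothetical helper, not something this commit adds:

use std::sync::Arc;

use datafusion::error::DataFusionError;
use datafusion::physical_plan::ExecutionPlan;

// Fetch the sole child or report a plan error, instead of indexing and
// panicking when the optimizer hands the node an unexpected child count.
fn first_child(
    node: &str,
    children: &[Arc<dyn ExecutionPlan>],
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    children
        .first()
        .cloned()
        .ok_or_else(|| DataFusionError::Plan(format!("{node} wrong number of children")))
}
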
crates/sqlexec/src/planner/physical_plan/copy_to.rs (39 additions, 7 deletions)
@@ -7,19 +7,20 @@ use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result as DataFusionResult};
 use datafusion::execution::TaskContext;
-use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
 use datafusion::physical_plan::insert::DataSink;
+use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
     execute_stream,
     DisplayAs,
     DisplayFormatType,
+    Distribution,
     ExecutionPlan,
     Partitioning,
     SendableRecordBatchStream,
     Statistics,
 };
-use datafusion_ext::metrics::WriteOnlyDataSourceMetricsExecAdapter;
 use datasources::common::sink::bson::BsonSink;
 use datasources::common::sink::csv::{CsvSink, CsvSinkOpts};
 use datasources::common::sink::json::{JsonSink, JsonSinkOpts};
@@ -39,7 +40,7 @@ use super::{new_operation_with_count_batch, GENERIC_OPERATION_AND_COUNT_PHYSICAL
 pub struct CopyToExec {
     pub format: CopyToFormatOptions,
     pub dest: CopyToDestinationOptions,
-    pub source: Arc<WriteOnlyDataSourceMetricsExecAdapter>,
+    pub source: Arc<dyn ExecutionPlan>,
 }
 
 impl ExecutionPlan for CopyToExec {
@@ -56,7 +57,7 @@ impl ExecutionPlan for CopyToExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+        self.source.output_ordering()
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -67,12 +68,15 @@
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        if children.len() != 1 {
+            return Err(DataFusionError::Plan(
+                "CopyToExec wrong number of children".to_string(),
+            ));
+        }
         Ok(Arc::new(CopyToExec {
             format: self.format.clone(),
             dest: self.dest.clone(),
-            source: Arc::new(WriteOnlyDataSourceMetricsExecAdapter::new(
-                children.first().unwrap().clone(),
-            )),
+            source: children.first().unwrap().clone(),
         }))
     }
 
@@ -81,6 +85,7 @@
         partition: usize,
         context: Arc<TaskContext>,
     ) -> DataFusionResult<SendableRecordBatchStream> {
+        // This should never happen as we set the `required_input_distribution` to `SinglePartition`
        if partition != 0 {
             return Err(DataFusionError::Execution(
                 "CopyToExec only supports 1 partition".to_string(),
@@ -99,6 +104,33 @@
     fn statistics(&self) -> DataFusionResult<Statistics> {
         Ok(Statistics::new_unknown(self.schema().as_ref()))
     }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        // We currently expect the input to be a single partition
+        vec![Distribution::SinglePartition]
+    }
+
+    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
+        // the input ordering must match the output ordering of the source
+        // without this, the sort may get removed by the optimizer
+        vec![self
+            .output_ordering()
+            .map(PhysicalSortRequirement::from_sort_exprs)]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // tell optimizer this operator doesn't reorder its input
+        vec![true]
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        // We currently dont support partitioned `COPY TO` so we can't benefit from input partitioning
+        vec![false]
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        self.source.metrics()
+    }
 }
 
 impl DisplayAs for CopyToExec {
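Note: to make concrete what `required_input_ordering` hands the optimizer, here is a standalone sketch for an ordering matching the `order by id desc` in the new test below. The one-column schema and `main` wrapper are illustrative; `col`, `SortOptions`, and `from_sort_exprs` are the same DataFusion/Arrow APIs the commit uses, assuming the crate versions pinned in this repo:

use datafusion::arrow::compute::SortOptions;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::physical_expr::expressions::col;
use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};

fn main() -> Result<()> {
    let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    // Physical equivalent of `ORDER BY id DESC`.
    let ordering = [PhysicalSortExpr {
        expr: col("id", &schema)?,
        options: SortOptions {
            descending: true,
            nulls_first: true,
        },
    }];
    // The requirement keeps the sort options, so only an input already
    // sorted by `id DESC` (or stricter) satisfies it.
    let requirements = PhysicalSortRequirement::from_sort_exprs(ordering.iter());
    assert_eq!(requirements.len(), 1);
    Ok(())
}
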
testdata/sqllogictests/copy_to.slt (10 additions, 0 deletions)
@@ -0,0 +1,10 @@
+statement ok
+COPY (select * from './testdata/parquet/userdata1.parquet' order by id desc ) to '${TMP}/userdata1_sorted.parquet';
+
+
+# this only works because it's a parquet file with a single row group.
+# if it had multiple row groups, the ordering would not be guaranteed.
+query I
+select id from '${TMP}/userdata1_sorted.parquet' limit 1;
+----
+1000
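The ordering check above leans on the file having a single row group, as the test comment says. A sketch of how one could verify that assumption with the `parquet` crate's metadata API (not part of this commit; the helper is illustrative):

use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

// Count row groups in a parquet file; the test's `limit 1` assertion is
// only order-reliable when this returns 1.
fn row_groups(path: &str) -> Result<usize, Box<dyn std::error::Error>> {
    let reader = SerializedFileReader::new(File::open(path)?)?;
    Ok(reader.metadata().num_row_groups())
}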
