fix: properly satisfy enforcesorting rules #2814

Merged · 2 commits · Mar 21, 2024
87 changes: 82 additions & 5 deletions crates/datafusion_ext/src/metrics.rs
@@ -8,9 +8,9 @@ use std::task::{Context, Poll};
 
 use datafusion::arrow::datatypes::{Schema, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::error::Result;
+use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::TaskContext;
-use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
 use datafusion::physical_plan::metrics::{
     BaselineMetrics,
     ExecutionPlanMetricsSet,
@@ -199,7 +199,7 @@ impl DataSourceMetricsOptsType for WriteOnlyDataSourceMetricsOptsType {
 /// able to modify directly to record metrics (e.g. Delta). Otherwise, this
 /// should be skipped and metrics collection should be added to the execution
 /// plan directly.
-#[derive(Debug, Clone)]
+#[derive(Clone)]
 pub struct DataSourceMetricsExecAdapter<T: DataSourceMetricsOptsType> {
     child: Arc<dyn ExecutionPlan>,
     metrics: ExecutionPlanMetricsSet,
@@ -223,7 +223,15 @@ impl<T: DataSourceMetricsOptsType> DataSourceMetricsExecAdapter<T> {
     }
 }
 
-impl<T: DataSourceMetricsOptsType> ExecutionPlan for DataSourceMetricsExecAdapter<T> {
+impl<T: DataSourceMetricsOptsType> Debug for DataSourceMetricsExecAdapter<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct(format!("{}DataSourceMetricsExecAdapter", T::DISPLAY_NAME_PREFIX).as_str())
+            .field("child", &self.child)
+            .finish()
+    }
+}
+
+impl ExecutionPlan for WriteOnlyDataSourceMetricsExecAdapter {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -248,6 +256,75 @@ impl<T: DataSourceMetricsOptsType> ExecutionPlan for DataSourceMetricsExecAdapte
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.is_empty() {
+            return Err(DataFusionError::Plan(
+                "DataSourceMetricsExecAdapter wrong number of children".to_string(),
+            ));
+        }
         Ok(Arc::new(Self::new(children[0].clone())))
     }
 
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let stream = self.child.execute(partition, context)?;
+        Ok(Box::pin(BoxedStreamAdapater::new(
+            stream,
+            partition,
+            &self.metrics,
+            WriteOnlyDataSourceMetricsOptsType::OPTS,
+        )))
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        self.child.statistics()
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
+        // the input ordering must match the output ordering of the source
+        // without this, the sort may get removed by the optimizer
+        vec![self
+            .output_ordering()
+            .map(PhysicalSortRequirement::from_sort_exprs)]
+    }
+}
Comment on lines +289 to +296 (Contributor Author):

The WriteOnlyDataSourceMetricsExecAdapter needs an explicit check on the input ordering; otherwise the EnforceSorting rule may remove it. The ReadOnly... adapter doesn't care, so this needed to be split out.

This states that if the child has an explicit output_ordering, then this node needs a matching input_ordering.
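
For readers skimming the diff, here is a minimal sketch of the mechanism (a hypothetical pass-through node, not code from this PR; only the two overrides shown carry the fix, and the remaining trait methods are elided):

impl ExecutionPlan for PassthroughExec {
    // Forward whatever ordering the child produces.
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        self.child.output_ordering()
    }

    // Turn that ordering into a hard requirement on the input. EnforceSorting
    // compares required_input_ordering against what the input provides, so the
    // upstream SortExec is kept instead of being pruned as redundant.
    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
        vec![self
            .output_ordering()
            .map(PhysicalSortRequirement::from_sort_exprs)]
    }

    // ...the remaining ExecutionPlan methods would forward to self.child,
    // exactly as the adapters in this diff do...
}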


+
+impl ExecutionPlan for ReadOnlyDataSourceMetricsExecAdapter {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.child.schema()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        self.child.output_partitioning()
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.child.output_ordering()
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.child.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if children.is_empty() {
+            return Err(DataFusionError::Plan(
+                "DataSourceMetricsExecAdapter wrong number of children".to_string(),
+            ));
+        }
+        Ok(Arc::new(Self::new(children[0].clone())))
+    }

@@ -261,7 +338,7 @@ impl<T: DataSourceMetricsOptsType> ExecutionPlan for DataSourceMetricsExecAdapte
             stream,
             partition,
             &self.metrics,
-            T::OPTS,
+            ReadOnlyDataSourceMetricsOptsType::OPTS,
         )))
     }

9 changes: 7 additions & 2 deletions crates/datafusion_ext/src/runtime/runtime_group.rs
@@ -3,7 +3,7 @@ use std::fmt;
 use std::sync::Arc;
 
 use datafusion::arrow::datatypes::Schema;
-use datafusion::error::Result as DataFusionResult;
+use datafusion::error::{DataFusionError, Result as DataFusionResult};
 use datafusion::execution::TaskContext;
 use datafusion::physical_expr::PhysicalSortExpr;
 use datafusion::physical_plan::metrics::MetricsSet;
@@ -58,6 +58,11 @@ impl ExecutionPlan for RuntimeGroupExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        if children.len() != 1 {
+            return Err(DataFusionError::Plan(
+                "RuntimeGroupExec wrong number of children".to_string(),
+            ));
+        }
         Ok(Arc::new(Self {
             preference: self.preference,
             child: children[0].clone(),
@@ -77,7 +82,7 @@ impl ExecutionPlan for RuntimeGroupExec {
     }
 
     fn metrics(&self) -> Option<MetricsSet> {
-        None
+        self.child.metrics()
     }
 }

46 changes: 39 additions & 7 deletions crates/sqlexec/src/planner/physical_plan/copy_to.rs
@@ -7,19 +7,20 @@ use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result as DataFusionResult};
 use datafusion::execution::TaskContext;
-use datafusion::physical_expr::PhysicalSortExpr;
+use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
 use datafusion::physical_plan::insert::DataSink;
 use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
     execute_stream,
     DisplayAs,
     DisplayFormatType,
+    Distribution,
     ExecutionPlan,
     Partitioning,
     SendableRecordBatchStream,
     Statistics,
 };
-use datafusion_ext::metrics::WriteOnlyDataSourceMetricsExecAdapter;
 use datasources::common::sink::bson::BsonSink;
 use datasources::common::sink::csv::{CsvSink, CsvSinkOpts};
 use datasources::common::sink::json::{JsonSink, JsonSinkOpts};
@@ -39,7 +40,7 @@ use super::{new_operation_with_count_batch, GENERIC_OPERATION_AND_COUNT_PHYSICAL
 pub struct CopyToExec {
     pub format: CopyToFormatOptions,
     pub dest: CopyToDestinationOptions,
-    pub source: Arc<WriteOnlyDataSourceMetricsExecAdapter>,
+    pub source: Arc<dyn ExecutionPlan>,
 }
 
 impl ExecutionPlan for CopyToExec {
@@ -56,7 +57,7 @@ impl ExecutionPlan for CopyToExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+        self.source.output_ordering()
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -67,12 +68,15 @@
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        if children.len() != 1 {
+            return Err(DataFusionError::Plan(
+                "CopyToExec wrong number of children".to_string(),
+            ));
+        }
         Ok(Arc::new(CopyToExec {
             format: self.format.clone(),
             dest: self.dest.clone(),
-            source: Arc::new(WriteOnlyDataSourceMetricsExecAdapter::new(
-                children.first().unwrap().clone(),
-            )),
+            source: children.first().unwrap().clone(),
         }))
     }

@@ -81,6 +85,7 @@ impl ExecutionPlan for CopyToExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> DataFusionResult<SendableRecordBatchStream> {
+        // This should never happen as we set the `required_input_distribution` to `SinglePartition`
         if partition != 0 {
             return Err(DataFusionError::Execution(
                 "CopyToExec only supports 1 partition".to_string(),
@@ -99,6 +104,33 @@ impl ExecutionPlan for CopyToExec {
     fn statistics(&self) -> DataFusionResult<Statistics> {
         Ok(Statistics::new_unknown(self.schema().as_ref()))
     }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        // We currently expect the input to be a single partition
+        vec![Distribution::SinglePartition]
+    }
+
+    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
+        // the input ordering must match the output ordering of the source
+        // without this, the sort may get removed by the optimizer
+        vec![self
+            .output_ordering()
+            .map(PhysicalSortRequirement::from_sort_exprs)]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // tell optimizer this operator doesn't reorder its input
+        vec![true]
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        // We currently dont support partitioned `COPY TO` so we can't benefit from input partitioning
+        vec![false]
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        self.source.metrics()
+    }
 }
 
 impl DisplayAs for CopyToExec {
10 changes: 10 additions & 0 deletions testdata/sqllogictests/copy_to.slt
@@ -0,0 +1,10 @@
+statement ok
+COPY (select * from './testdata/parquet/userdata1.parquet' order by id desc ) to '${TMP}/userdata1_sorted.parquet';
+
+
+#-this only works because it's a parquet file with a single row group.
+# if it had multiple row groups, the ordering would not be guaranteed.
+query I
+select id from '${TMP}/userdata1_sorted.parquet' limit 1;
+----
+1000