Skip to content

Commit

Permalink
Add support for multiple partitions with SortExec (#362) (#378)
Browse files (browse the repository at this point in the history)
* Add support for multiple partitions with SortExec

* make SortExec partitioning optional
  • Loading branch information
tustvold authored May 25, 2021
1 parent d9b0447 commit 3593d1f
Showing 1 changed file with 39 additions and 16 deletions.
55 changes: 39 additions & 16 deletions datafusion/src/physical_plan/sort.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -55,6 +55,8 @@ pub struct SortExec {
output_rows: Arc<SQLMetric>,
/// Time to sort batches
sort_time_nanos: Arc<SQLMetric>,
/// Preserve partitions of input plan
preserve_partitioning: bool,
}

impl SortExec {
Expand All @@ -63,12 +65,23 @@ impl SortExec {
expr: Vec<PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
Ok(Self {
Ok(Self::new_with_partitioning(expr, input, false))
}

/// Create a new sort execution plan with the option to preserve
/// the partitioning of the input plan
pub fn new_with_partitioning(
expr: Vec<PhysicalSortExpr>,
input: Arc<dyn ExecutionPlan>,
preserve_partitioning: bool,
) -> Self {
Self {
expr,
input,
preserve_partitioning,
output_rows: SQLMetric::counter(),
sort_time_nanos: SQLMetric::time_nanos(),
})
}
}

/// Input schema
Expand Down Expand Up @@ -99,11 +112,19 @@ impl ExecutionPlan for SortExec {

/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
if self.preserve_partitioning {
self.input.output_partitioning()
} else {
Partitioning::UnknownPartitioning(1)
}
}

fn required_child_distribution(&self) -> Distribution {
Distribution::SinglePartition
if self.preserve_partitioning {
Distribution::UnspecifiedDistribution
} else {
Distribution::SinglePartition
}
}

fn with_new_children(
Expand All @@ -122,21 +143,23 @@ impl ExecutionPlan for SortExec {
}

async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
if 0 != partition {
return Err(DataFusionError::Internal(format!(
"SortExec invalid partition {}",
partition
)));
}
if !self.preserve_partitioning {
if 0 != partition {
return Err(DataFusionError::Internal(format!(
"SortExec invalid partition {}",
partition
)));
}

// sort needs to operate on a single partition currently
if 1 != self.input.output_partitioning().partition_count() {
return Err(DataFusionError::Internal(
"SortExec requires a single input partition".to_owned(),
));
// sort needs to operate on a single partition currently
if 1 != self.input.output_partitioning().partition_count() {
return Err(DataFusionError::Internal(
"SortExec requires a single input partition".to_owned(),
));
}
}

let input = self.input.execute(0).await?;
let input = self.input.execute(partition).await?;

Ok(Box::pin(SortStream::new(
input,
Expand Down

0 comments on commit 3593d1f

Please sign in to comment.