Skip to content

Commit

Permalink
Add shuffle for SortPreservingMergeExec physical operator (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
kyotoYaho committed Oct 26, 2022
1 parent 71e92a6 commit 885b952
Showing 1 changed file with 27 additions and 0 deletions.
27 changes: 27 additions & 0 deletions ballista/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use ballista_core::{
};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::{
with_new_children_if_necessary, ExecutionPlan, Partitioning,
Expand Down Expand Up @@ -124,6 +125,32 @@ impl DistributedPlanner {
with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
stages,
))
} else if let Some(_sort_preserving_merge) = execution_plan
.as_any()
.downcast_ref::<SortPreservingMergeExec>(
) {
let shuffle_writer = create_shuffle_writer(
job_id,
self.next_stage_id(),
children[0].clone(),
None,
)?;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
shuffle_writer.stage_id(),
shuffle_writer.schema(),
shuffle_writer.output_partitioning().partition_count(),
shuffle_writer
.shuffle_output_partitioning()
.map(|p| p.partition_count())
.unwrap_or_else(|| {
shuffle_writer.output_partitioning().partition_count()
}),
));
stages.push(shuffle_writer);
Ok((
with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
stages,
))
} else if let Some(repart) =
execution_plan.as_any().downcast_ref::<RepartitionExec>()
{
Expand Down

0 comments on commit 885b952

Please sign in to comment.