Ballista shuffle is finally working as intended, providing scalable distributed joins #750

Merged · 11 commits · Jul 21, 2021
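For readers skimming the diff, the core idea is that a shuffle writer assigns each row to an output partition by hashing its key, so equal keys land in the same partition and downstream join stages can run on partitions independently and in parallel. A minimal, self-contained sketch of that idea follows; the types are simplified stand-ins (the real ShuffleWriterExec hashes Arrow record batches, not strings), not the actual Ballista API.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Assign a row to an output partition by hashing its key. This mirrors the
// idea behind hash-partitioned shuffle, with simplified stand-in types.
fn output_partition(key: &str, partition_count: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % partition_count
}

fn main() {
    let partition_count = 4;
    for key in ["apple", "banana", "apple", "cherry"] {
        println!("{} -> partition {}", key, output_partition(key, partition_count));
    }
    // Equal keys always hash to the same partition, which is what lets a
    // distributed join process each partition independently.
    assert_eq!(
        output_partition("apple", partition_count),
        output_partition("apple", partition_count)
    );
}
```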
1 change: 1 addition & 0 deletions ballista/rust/client/src/context.rs
@@ -223,6 +223,7 @@ impl BallistaContext {
                     &partition_id.job_id,
                     partition_id.stage_id as usize,
                     partition_id.partition_id as usize,
+                    &location.path,
                 )
                 .await
                 .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?)
16 changes: 14 additions & 2 deletions ballista/rust/core/proto/ballista.proto
@@ -544,7 +544,8 @@ message PhysicalNegativeNode {
 message UnresolvedShuffleExecNode {
   uint32 stage_id = 1;
   Schema schema = 2;
-  uint32 partition_count = 3;
+  uint32 input_partition_count = 3;
+  uint32 output_partition_count = 4;
 }
 
 message FilterExecNode {
@@ -700,7 +701,7 @@ message Action {

   oneof ActionType {
     // Fetch a partition from an executor
-    PartitionId fetch_partition = 3;
+    FetchPartition fetch_partition = 3;
   }

   // configuration settings
@@ -714,6 +715,15 @@ message ExecutePartition {
   PhysicalPlanNode plan = 4;
   // The task could need to read partitions from other executors
   repeated PartitionLocation partition_location = 5;
+  // Output partition for shuffle writer
+  PhysicalHashRepartition output_partitioning = 6;
 }
+
+message FetchPartition {
+  string job_id = 1;
+  uint32 stage_id = 2;
+  uint32 partition_id = 3;
+  string path = 4;
+}

 // Mapping from partition id to executor id
@@ -809,6 +819,8 @@ message PollWorkParams {
 message TaskDefinition {
   PartitionId task_id = 1;
   PhysicalPlanNode plan = 2;
+  // Output partition for shuffle writer
+  PhysicalHashRepartition output_partitioning = 3;
 }
 
 message PollWorkResult {
9 changes: 7 additions & 2 deletions ballista/rust/core/src/client.rs
@@ -81,9 +81,14 @@ impl BallistaClient {
         job_id: &str,
         stage_id: usize,
         partition_id: usize,
+        path: &str,
     ) -> Result<SendableRecordBatchStream> {
-        let action =
-            Action::FetchPartition(PartitionId::new(job_id, stage_id, partition_id));
+        let action = Action::FetchPartition {
+            job_id: job_id.to_string(),
+            stage_id,
+            partition_id,
+            path: path.to_owned(),
+        };
         self.execute_action(&action).await
     }

35 changes: 20 additions & 15 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -76,6 +76,8 @@ impl ExecutionPlan for ShuffleReaderExec {
     }
 
     fn output_partitioning(&self) -> Partitioning {
+        // TODO partitioning may be known and could be populated here
+        // see https://github.com/apache/arrow-datafusion/issues/758
         Partitioning::UnknownPartitioning(self.partition.len())
     }

@@ -123,24 +125,26 @@ impl ExecutionPlan for ShuffleReaderExec {
         let loc_str = self
             .partition
             .iter()
-            .map(|x| {
-                x.iter()
-                    .map(|l| {
-                        format!(
-                            "[executor={} part={}:{}:{} stats={}]",
-                            l.executor_meta.id,
-                            l.partition_id.job_id,
-                            l.partition_id.stage_id,
-                            l.partition_id.partition_id,
-                            l.partition_stats
-                        )
-                    })
-                    .collect::<Vec<String>>()
-                    .join(",")
+            .enumerate()
+            .map(|(partition_id, locations)| {
+                format!(
+                    "[partition={} paths={}]",
+                    partition_id,
+                    locations
+                        .iter()
+                        .map(|l| l.path.clone())
+                        .collect::<Vec<String>>()
+                        .join(",")
+                )
             })
             .collect::<Vec<String>>()
             .join(", ");
-        write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
+        write!(
+            f,
+            "ShuffleReaderExec: partition_locations({})={}",
+            self.partition.len(),
+            loc_str
+        )
         }
     }
 }
@@ -166,6 +170,7 @@ async fn fetch_partition(
             &partition_id.job_id,
             partition_id.stage_id as usize,
             partition_id.partition_id as usize,
+            &location.path,
         )
         .await
         .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?)
11 changes: 11 additions & 0 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -141,6 +141,7 @@ impl ShuffleWriterExec {

         match &self.shuffle_output_partitioning {
             None => {
+                let start = Instant::now();
                 path.push(&format!("{}", input_partition));
                 std::fs::create_dir_all(&path)?;
                 path.push("data.arrow");
@@ -156,6 +157,14 @@
                 .await
                 .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
 
+                self.metrics
+                    .input_rows
+                    .add(stats.num_rows.unwrap_or(0) as usize);
+                self.metrics
+                    .output_rows
+                    .add(stats.num_rows.unwrap_or(0) as usize);
+                self.metrics.write_time.add_elapsed(start);
+
                 info!(
                     "Executed partition {} in {} seconds. Statistics: {}",
                     input_partition,
@@ -227,6 +236,8 @@ impl ShuffleWriterExec {
                         RecordBatch::try_new(input_batch.schema(), columns)?;
 
                     // write non-empty batch out
 
+                    //TODO optimize so we don't write or fetch empty partitions
+                    //if output_batch.num_rows() > 0 {
                     let start = Instant::now();
                     match &mut writers[output_partition] {
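The metrics added above follow a simple pattern: capture an Instant before the write, then accumulate the elapsed time into a shared counter. A rough, self-contained sketch of that pattern, with a hypothetical TimeMetric standing in for the real metrics type that owns add_elapsed:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

// Hypothetical stand-in for the write_time metric above: a thread-safe
// counter that accumulates elapsed nanoseconds.
struct TimeMetric(AtomicUsize);

impl TimeMetric {
    fn add_elapsed(&self, start: Instant) {
        self.0
            .fetch_add(start.elapsed().as_nanos() as usize, Ordering::Relaxed);
    }
}

fn main() {
    let write_time = TimeMetric(AtomicUsize::new(0));
    let start = Instant::now();
    // ... the shuffle file write would happen here ...
    write_time.add_elapsed(start);
    println!("write_time: {} ns", write_time.0.load(Ordering::Relaxed));
}
```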
19 changes: 15 additions & 4 deletions ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -43,17 +43,26 @@ pub struct UnresolvedShuffleExec {
     // The schema this node will have once it is replaced with a ShuffleReaderExec
     pub schema: SchemaRef,
 
+    // The number of shuffle writer partition tasks that will produce the partitions
+    pub input_partition_count: usize,
+
     // The partition count this node will have once it is replaced with a ShuffleReaderExec
-    pub partition_count: usize,
+    pub output_partition_count: usize,
 }
 
 impl UnresolvedShuffleExec {
     /// Create a new UnresolvedShuffleExec
-    pub fn new(stage_id: usize, schema: SchemaRef, partition_count: usize) -> Self {
+    pub fn new(
+        stage_id: usize,
+        schema: SchemaRef,
+        input_partition_count: usize,
+        output_partition_count: usize,
+    ) -> Self {
         Self {
             stage_id,
             schema,
-            partition_count,
+            input_partition_count,
+            output_partition_count,
         }
     }
 }
@@ -69,7 +78,9 @@ impl ExecutionPlan for UnresolvedShuffleExec {
     }
 
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.partition_count)
+        //TODO the output partition is known and should be populated here!
+        // see https://github.com/apache/arrow-datafusion/issues/758
+        Partitioning::UnknownPartitioning(self.output_partition_count)
     }

[Review thread on the TODO above]
Contributor: is this something that you want to finish up in this PR?
Member (Author): I've filed https://github.com/apache/arrow-datafusion/issues/758 as a follow-up for implementing this since it involves more serde work.

     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
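To make the input/output distinction concrete: with 4 shuffle writer tasks (input partitions) each hashing rows into 8 output partitions, the reader sees 8 partitions, each assembled from 4 files, one per writer. The sketch below only illustrates this counting; the directory layout is hypothetical, not Ballista's actual shuffle file scheme.

```rust
// Enumerate one hypothetical shuffle file per (writer, output partition) pair.
fn shuffle_file_layout(
    input_partition_count: usize,
    output_partition_count: usize,
) -> Vec<Vec<String>> {
    (0..output_partition_count)
        .map(|out| {
            (0..input_partition_count)
                .map(|writer| format!("/tmp/shuffle/{}/{}/data.arrow", writer, out))
                .collect()
        })
        .collect()
}

fn main() {
    let layout = shuffle_file_layout(4, 8);
    assert_eq!(layout.len(), 8); // partitions visible to the ShuffleReaderExec
    assert_eq!(layout[0].len(), 4); // files per partition, one per writer task
    println!("{:?}", layout[0]);
}
```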
43 changes: 27 additions & 16 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -376,21 +376,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                 let input: Arc<dyn ExecutionPlan> =
                     convert_box_required!(shuffle_writer.input)?;
 
-                let output_partitioning = match &shuffle_writer.output_partitioning {
-                    Some(hash_part) => {
-                        let expr = hash_part
-                            .hash_expr
-                            .iter()
-                            .map(|e| e.try_into())
-                            .collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
-
-                        Some(Partitioning::Hash(
-                            expr,
-                            hash_part.partition_count.try_into().unwrap(),
-                        ))
-                    }
-                    None => None,
-                };
+                let output_partitioning = parse_protobuf_hash_partitioning(
+                    shuffle_writer.output_partitioning.as_ref(),
+                )?;
 
                 Ok(Arc::new(ShuffleWriterExec::try_new(
                     shuffle_writer.job_id.clone(),
@@ -466,7 +454,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                 Ok(Arc::new(UnresolvedShuffleExec {
                     stage_id: unresolved_shuffle.stage_id as usize,
                     schema,
-                    partition_count: unresolved_shuffle.partition_count as usize,
+                    input_partition_count: unresolved_shuffle.input_partition_count
+                        as usize,
+                    output_partition_count: unresolved_shuffle.output_partition_count
+                        as usize,
                 }))
             }
         }
@@ -680,3 +671,23 @@ impl TryFrom<&protobuf::physical_window_expr_node::WindowFunction> for WindowFun
         }
     }
 }
+
+pub fn parse_protobuf_hash_partitioning(
+    partitioning: Option<&protobuf::PhysicalHashRepartition>,
+) -> Result<Option<Partitioning>, BallistaError> {
+    match partitioning {
+        Some(hash_part) => {
+            let expr = hash_part
+                .hash_expr
+                .iter()
+                .map(|e| e.try_into())
+                .collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;
+
+            Ok(Some(Partitioning::Hash(
+                expr,
+                hash_part.partition_count.try_into().unwrap(),
+            )))
+        }
+        None => Ok(None),
+    }
+}
3 changes: 2 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -397,7 +397,8 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                 protobuf::UnresolvedShuffleExecNode {
                     stage_id: exec.stage_id as u32,
                     schema: Some(exec.schema().as_ref().into()),
-                    partition_count: exec.partition_count as u32,
+                    input_partition_count: exec.input_partition_count as u32,
+                    output_partition_count: exec.output_partition_count as u32,
                 },
             )),
         })
9 changes: 6 additions & 3 deletions ballista/rust/core/src/serde/scheduler/from_proto.rs
@@ -32,9 +32,12 @@ impl TryInto<Action> for protobuf::Action {

     fn try_into(self) -> Result<Action, Self::Error> {
         match self.action_type {
-            Some(ActionType::FetchPartition(partition)) => {
-                Ok(Action::FetchPartition(partition.try_into()?))
-            }
+            Some(ActionType::FetchPartition(fetch)) => Ok(Action::FetchPartition {
+                job_id: fetch.job_id,
+                stage_id: fetch.stage_id as usize,
+                partition_id: fetch.partition_id as usize,
+                path: fetch.path,
+            }),
             _ => Err(BallistaError::General(
                 "scheduler::from_proto(Action) invalid or missing action".to_owned(),
             )),
12 changes: 11 additions & 1 deletion ballista/rust/core/src/serde/scheduler/mod.rs
@@ -23,6 +23,7 @@ use datafusion::arrow::array::{
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::Partitioning;
 use serde::Serialize;
 use uuid::Uuid;
@@ -36,7 +37,12 @@ pub mod to_proto;
 #[derive(Debug, Clone)]
 pub enum Action {
     /// Collect a shuffle partition
-    FetchPartition(PartitionId),
+    FetchPartition {
+        job_id: String,
+        stage_id: usize,
+        partition_id: usize,
+        path: String,
+    },
 }

 /// Unique identifier for the output partition of an operator.
@@ -223,6 +229,8 @@ pub struct ExecutePartition {
     pub plan: Arc<dyn ExecutionPlan>,
     /// Location of shuffle partitions that this query stage may depend on
     pub shuffle_locations: HashMap<PartitionId, ExecutorMeta>,
+    /// Output partitioning for shuffle writes
+    pub output_partitioning: Option<Partitioning>,
 }

 impl ExecutePartition {
@@ -232,13 +240,15 @@ impl ExecutePartition {
         partition_id: Vec<usize>,
         plan: Arc<dyn ExecutionPlan>,
         shuffle_locations: HashMap<PartitionId, ExecutorMeta>,
+        output_partitioning: Option<Partitioning>,
     ) -> Self {
         Self {
             job_id,
             stage_id,
             partition_id,
             plan,
             shuffle_locations,
+            output_partitioning,
         }
     }

41 changes: 39 additions & 2 deletions ballista/rust/core/src/serde/scheduler/to_proto.rs
@@ -23,14 +23,25 @@ use crate::serde::protobuf::action::ActionType;
 use crate::serde::scheduler::{
     Action, ExecutePartition, PartitionId, PartitionLocation, PartitionStats,
 };
+use datafusion::physical_plan::Partitioning;
 
 impl TryInto<protobuf::Action> for Action {
     type Error = BallistaError;
 
     fn try_into(self) -> Result<protobuf::Action, Self::Error> {
         match self {
-            Action::FetchPartition(partition_id) => Ok(protobuf::Action {
-                action_type: Some(ActionType::FetchPartition(partition_id.into())),
+            Action::FetchPartition {
+                job_id,
+                stage_id,
+                partition_id,
+                path,
+            } => Ok(protobuf::Action {
+                action_type: Some(ActionType::FetchPartition(protobuf::FetchPartition {
+                    job_id,
+                    stage_id: stage_id as u32,
+                    partition_id: partition_id as u32,
+                    path,
+                })),
                 settings: vec![],
             }),
         }
@@ -47,6 +58,9 @@ impl TryInto<protobuf::ExecutePartition> for ExecutePartition {
             partition_id: self.partition_id.iter().map(|n| *n as u32).collect(),
             plan: Some(self.plan.try_into()?),
             partition_location: vec![],
+            output_partitioning: hash_partitioning_to_proto(
+                self.output_partitioning.as_ref(),
+            )?,
         })
     }
 }
@@ -87,3 +101,26 @@ impl Into<protobuf::PartitionStats> for PartitionStats {
         }
     }
 }
+
+pub fn hash_partitioning_to_proto(
+    output_partitioning: Option<&Partitioning>,
+) -> Result<Option<protobuf::PhysicalHashRepartition>, BallistaError> {
+    match output_partitioning {
+        Some(Partitioning::Hash(exprs, partition_count)) => {
+            Ok(Some(protobuf::PhysicalHashRepartition {
+                hash_expr: exprs
+                    .iter()
+                    .map(|expr| expr.clone().try_into())
+                    .collect::<Result<Vec<_>, BallistaError>>()?,
+                partition_count: *partition_count as u64,
+            }))
+        }
+        None => Ok(None),
+        other => {
+            return Err(BallistaError::General(format!(
+                "scheduler::to_proto() invalid partitioning for ExecutePartition: {:?}",
+                other
+            )))
+        }
+    }
+}
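Together, hash_partitioning_to_proto and parse_protobuf_hash_partitioning form a serde round-trip for the shuffle writer's output partitioning. Below is a simplified, self-contained sketch of that round-trip property; the types are stand-ins (string expression names instead of PhysicalExpr, a plain struct instead of the generated protobuf message), not the real API.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    Hash(Vec<String>, usize), // expression names stand in for PhysicalExpr
}

struct PhysicalHashRepartition {
    hash_expr: Vec<String>,
    partition_count: u64,
}

// Mirrors hash_partitioning_to_proto above, minus error handling.
fn to_proto(p: Option<&Partitioning>) -> Option<PhysicalHashRepartition> {
    p.map(|part| match part {
        Partitioning::Hash(exprs, count) => PhysicalHashRepartition {
            hash_expr: exprs.clone(),
            partition_count: *count as u64,
        },
    })
}

// Mirrors parse_protobuf_hash_partitioning above.
fn from_proto(p: Option<&PhysicalHashRepartition>) -> Option<Partitioning> {
    p.map(|hp| Partitioning::Hash(hp.hash_expr.clone(), hp.partition_count as usize))
}

fn main() {
    let original = Partitioning::Hash(vec!["a".to_string(), "b".to_string()], 8);
    let restored = from_proto(to_proto(Some(&original)).as_ref());
    // The round-trip preserves both the hash expressions and the partition count.
    assert_eq!(restored, Some(original));
}
```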