Ballista shuffle is finally working as intended, providing scalable distributed joins (#750)
andygrove authored Jul 21, 2021
1 parent 30693df commit ed5746d
Showing 17 changed files with 507 additions and 141 deletions.
1 change: 1 addition & 0 deletions ballista/rust/client/src/context.rs
@@ -223,6 +223,7 @@ impl BallistaContext {
&partition_id.job_id,
partition_id.stage_id as usize,
partition_id.partition_id as usize,
&location.path,
)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?)
16 changes: 14 additions & 2 deletions ballista/rust/core/proto/ballista.proto
@@ -544,7 +544,8 @@ message PhysicalNegativeNode {
message UnresolvedShuffleExecNode {
uint32 stage_id = 1;
Schema schema = 2;
uint32 partition_count = 3;
uint32 input_partition_count = 3;
uint32 output_partition_count = 4;
}

message FilterExecNode {
@@ -700,7 +701,7 @@ message Action {

oneof ActionType {
// Fetch a partition from an executor
PartitionId fetch_partition = 3;
FetchPartition fetch_partition = 3;
}

// configuration settings
@@ -714,6 +715,15 @@ message ExecutePartition {
PhysicalPlanNode plan = 4;
// The task may need to read partitions from other executors
repeated PartitionLocation partition_location = 5;
// Output partition for shuffle writer
PhysicalHashRepartition output_partitioning = 6;
}

message FetchPartition {
string job_id = 1;
uint32 stage_id = 2;
uint32 partition_id = 3;
string path = 4;
}

// Mapping from partition id to executor id
@@ -809,6 +819,8 @@ message PollWorkParams {
message TaskDefinition {
PartitionId task_id = 1;
PhysicalPlanNode plan = 2;
// Output partition for shuffle writer
PhysicalHashRepartition output_partitioning = 3;
}

message PollWorkResult {
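The new FetchPartition message carries an explicit file path, so the executor serving the fetch no longer has to derive the shuffle file location from the partition id alone. A minimal sketch of building such a request, assuming prost-generated types from ballista.proto and a hypothetical path layout:

// All values here are placeholders; the path layout is an assumption.
let fetch = protobuf::FetchPartition {
    job_id: "job-1".to_string(),
    stage_id: 2,
    partition_id: 0,
    path: "/tmp/ballista/job-1/2/0/data.arrow".to_string(),
};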
9 changes: 7 additions & 2 deletions ballista/rust/core/src/client.rs
@@ -81,9 +81,14 @@ impl BallistaClient {
job_id: &str,
stage_id: usize,
partition_id: usize,
path: &str,
) -> Result<SendableRecordBatchStream> {
let action =
Action::FetchPartition(PartitionId::new(job_id, stage_id, partition_id));
let action = Action::FetchPartition {
job_id: job_id.to_string(),
stage_id,
partition_id,
path: path.to_owned(),
};
self.execute_action(&action).await
}

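With the added path parameter, callers pass the exact shuffle file location through to the executor. A hedged usage sketch, assuming a connected BallistaClient inside an async context:

// `client` is a hypothetical connected BallistaClient; arguments are placeholders.
let stream = client
    .fetch_partition("job-1", 2, 0, "/tmp/ballista/job-1/2/0/data.arrow")
    .await?;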
35 changes: 20 additions & 15 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -76,6 +76,8 @@ impl ExecutionPlan for ShuffleReaderExec {
}

fn output_partitioning(&self) -> Partitioning {
// TODO partitioning may be known and could be populated here
// see https://github.com/apache/arrow-datafusion/issues/758
Partitioning::UnknownPartitioning(self.partition.len())
}

@@ -123,24 +125,26 @@
let loc_str = self
.partition
.iter()
.map(|x| {
x.iter()
.map(|l| {
format!(
"[executor={} part={}:{}:{} stats={}]",
l.executor_meta.id,
l.partition_id.job_id,
l.partition_id.stage_id,
l.partition_id.partition_id,
l.partition_stats
)
})
.collect::<Vec<String>>()
.join(",")
.enumerate()
.map(|(partition_id, locations)| {
format!(
"[partition={} paths={}]",
partition_id,
locations
.iter()
.map(|l| l.path.clone())
.collect::<Vec<String>>()
.join(",")
)
})
.collect::<Vec<String>>()
.join(", ");
write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
write!(
f,
"ShuffleReaderExec: partition_locations({})={}",
self.partition.len(),
loc_str
)
}
}
}
@@ -166,6 +170,7 @@ async fn fetch_partition(
&partition_id.job_id,
partition_id.stage_id as usize,
partition_id.partition_id as usize,
&location.path,
)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?)
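The reworked fmt_as groups locations by output partition and prints file paths rather than executor metadata and stats. For two partitions with one location each, the output would look along these lines (paths are hypothetical):

ShuffleReaderExec: partition_locations(2)=[partition=0 paths=/tmp/job-1/2/0/data.arrow], [partition=1 paths=/tmp/job-1/2/1/data.arrow]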
11 changes: 11 additions & 0 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -141,6 +141,7 @@ impl ShuffleWriterExec {

match &self.shuffle_output_partitioning {
None => {
let start = Instant::now();
path.push(&format!("{}", input_partition));
std::fs::create_dir_all(&path)?;
path.push("data.arrow");
@@ -156,6 +157,14 @@
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

self.metrics
.input_rows
.add(stats.num_rows.unwrap_or(0) as usize);
self.metrics
.output_rows
.add(stats.num_rows.unwrap_or(0) as usize);
self.metrics.write_time.add_elapsed(start);

info!(
"Executed partition {} in {} seconds. Statistics: {}",
input_partition,
@@ -227,6 +236,8 @@ impl ShuffleWriterExec {
RecordBatch::try_new(input_batch.schema(), columns)?;

// write non-empty batch out

//TODO optimize so we don't write or fetch empty partitions
//if output_batch.num_rows() > 0 {
let start = Instant::now();
match &mut writers[output_partition] {
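The new metrics calls assume counter-style helpers on the exec's metrics set. A minimal sketch of what an add_elapsed-style timer might look like (hypothetical types; Ballista's actual metrics structs may differ):

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

struct TimeMetric {
    nanos: AtomicU64,
}

impl TimeMetric {
    // Accumulate the wall-clock time elapsed since `start`.
    fn add_elapsed(&self, start: Instant) {
        self.nanos
            .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
    }
}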
19 changes: 15 additions & 4 deletions ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -43,17 +43,26 @@ pub struct UnresolvedShuffleExec {
// The schema this node will have once it is replaced with a ShuffleReaderExec
pub schema: SchemaRef,

// The number of shuffle writer partition tasks that will produce the partitions
pub input_partition_count: usize,

// The partition count this node will have once it is replaced with a ShuffleReaderExec
pub partition_count: usize,
pub output_partition_count: usize,
}

impl UnresolvedShuffleExec {
/// Create a new UnresolvedShuffleExec
pub fn new(stage_id: usize, schema: SchemaRef, partition_count: usize) -> Self {
pub fn new(
stage_id: usize,
schema: SchemaRef,
input_partition_count: usize,
output_partition_count: usize,
) -> Self {
Self {
stage_id,
schema,
partition_count,
input_partition_count,
output_partition_count,
}
}
}
@@ -69,7 +78,9 @@ impl ExecutionPlan for UnresolvedShuffleExec {
}

fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(self.partition_count)
//TODO the output partition is known and should be populated here!
// see https://github.com/apache/arrow-datafusion/issues/758
Partitioning::UnknownPartitioning(self.output_partition_count)
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
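Splitting partition_count into input and output counts lets the planner size the eventual ShuffleReaderExec correctly when the writer repartitions its input. A construction sketch for the updated constructor, with placeholder values: a stage whose 4 shuffle-writer tasks hash into 16 output partitions:

use std::sync::Arc;
use datafusion::arrow::datatypes::Schema;

// stage_id, counts, and the empty schema are placeholders for illustration.
let shuffle = UnresolvedShuffleExec::new(1, Arc::new(Schema::empty()), 4, 16);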
43 changes: 27 additions & 16 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -376,21 +376,9 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
let input: Arc<dyn ExecutionPlan> =
convert_box_required!(shuffle_writer.input)?;

let output_partitioning = match &shuffle_writer.output_partitioning {
Some(hash_part) => {
let expr = hash_part
.hash_expr
.iter()
.map(|e| e.try_into())
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;

Some(Partitioning::Hash(
expr,
hash_part.partition_count.try_into().unwrap(),
))
}
None => None,
};
let output_partitioning = parse_protobuf_hash_partitioning(
shuffle_writer.output_partitioning.as_ref(),
)?;

Ok(Arc::new(ShuffleWriterExec::try_new(
shuffle_writer.job_id.clone(),
@@ -466,7 +454,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
Ok(Arc::new(UnresolvedShuffleExec {
stage_id: unresolved_shuffle.stage_id as usize,
schema,
partition_count: unresolved_shuffle.partition_count as usize,
input_partition_count: unresolved_shuffle.input_partition_count
as usize,
output_partition_count: unresolved_shuffle.output_partition_count
as usize,
}))
}
}
@@ -680,3 +671,23 @@ impl TryFrom<&protobuf::physical_window_expr_node::WindowFunction> for WindowFun
}
}
}

pub fn parse_protobuf_hash_partitioning(
partitioning: Option<&protobuf::PhysicalHashRepartition>,
) -> Result<Option<Partitioning>, BallistaError> {
match partitioning {
Some(hash_part) => {
let expr = hash_part
.hash_expr
.iter()
.map(|e| e.try_into())
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>, _>>()?;

Ok(Some(Partitioning::Hash(
expr,
hash_part.partition_count.try_into().unwrap(),
)))
}
None => Ok(None),
}
}
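Factoring the decoding into parse_protobuf_hash_partitioning lets the shuffle-writer branch above and other call sites share it. A hedged sketch of the trivial case, inside a function returning Result<_, BallistaError>:

// With no partitioning present in the protobuf, the helper yields Ok(None).
assert!(parse_protobuf_hash_partitioning(None)?.is_none());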
3 changes: 2 additions & 1 deletion ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -397,7 +397,8 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
protobuf::UnresolvedShuffleExecNode {
stage_id: exec.stage_id as u32,
schema: Some(exec.schema().as_ref().into()),
partition_count: exec.partition_count as u32,
input_partition_count: exec.input_partition_count as u32,
output_partition_count: exec.output_partition_count as u32,
},
)),
})
9 changes: 6 additions & 3 deletions ballista/rust/core/src/serde/scheduler/from_proto.rs
@@ -32,9 +32,12 @@

fn try_into(self) -> Result<Action, Self::Error> {
match self.action_type {
Some(ActionType::FetchPartition(partition)) => {
Ok(Action::FetchPartition(partition.try_into()?))
}
Some(ActionType::FetchPartition(fetch)) => Ok(Action::FetchPartition {
job_id: fetch.job_id,
stage_id: fetch.stage_id as usize,
partition_id: fetch.partition_id as usize,
path: fetch.path,
}),
_ => Err(BallistaError::General(
"scheduler::from_proto(Action) invalid or missing action".to_owned(),
)),
12 changes: 11 additions & 1 deletion ballista/rust/core/src/serde/scheduler/mod.rs
@@ -23,6 +23,7 @@ use datafusion::arrow::array::{
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::Partitioning;
use serde::Serialize;
use uuid::Uuid;

@@ -36,7 +37,12 @@ pub mod to_proto;
#[derive(Debug, Clone)]
pub enum Action {
/// Collect a shuffle partition
FetchPartition(PartitionId),
FetchPartition {
job_id: String,
stage_id: usize,
partition_id: usize,
path: String,
},
}

/// Unique identifier for the output partition of an operator.
@@ -223,6 +229,8 @@ pub struct ExecutePartition {
pub plan: Arc<dyn ExecutionPlan>,
/// Location of shuffle partitions that this query stage may depend on
pub shuffle_locations: HashMap<PartitionId, ExecutorMeta>,
/// Output partitioning for shuffle writes
pub output_partitioning: Option<Partitioning>,
}

impl ExecutePartition {
@@ -232,13 +240,15 @@ impl ExecutePartition {
partition_id: Vec<usize>,
plan: Arc<dyn ExecutionPlan>,
shuffle_locations: HashMap<PartitionId, ExecutorMeta>,
output_partitioning: Option<Partitioning>,
) -> Self {
Self {
job_id,
stage_id,
partition_id,
plan,
shuffle_locations,
output_partitioning,
}
}

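Because Action::FetchPartition is now a struct variant rather than a tuple around PartitionId, consumers destructure its fields directly. A minimal match sketch:

// `action` is a placeholder value of type Action.
match action {
    Action::FetchPartition {
        job_id,
        stage_id,
        partition_id,
        path,
    } => println!("fetch {}/{}/{} from {}", job_id, stage_id, partition_id, path),
}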
41 changes: 39 additions & 2 deletions ballista/rust/core/src/serde/scheduler/to_proto.rs
@@ -23,14 +23,25 @@ use crate::serde::protobuf::action::ActionType;
use crate::serde::scheduler::{
Action, ExecutePartition, PartitionId, PartitionLocation, PartitionStats,
};
use datafusion::physical_plan::Partitioning;

impl TryInto<protobuf::Action> for Action {
type Error = BallistaError;

fn try_into(self) -> Result<protobuf::Action, Self::Error> {
match self {
Action::FetchPartition(partition_id) => Ok(protobuf::Action {
action_type: Some(ActionType::FetchPartition(partition_id.into())),
Action::FetchPartition {
job_id,
stage_id,
partition_id,
path,
} => Ok(protobuf::Action {
action_type: Some(ActionType::FetchPartition(protobuf::FetchPartition {
job_id,
stage_id: stage_id as u32,
partition_id: partition_id as u32,
path,
})),
settings: vec![],
}),
}
Expand All @@ -47,6 +58,9 @@ impl TryInto<protobuf::ExecutePartition> for ExecutePartition {
partition_id: self.partition_id.iter().map(|n| *n as u32).collect(),
plan: Some(self.plan.try_into()?),
partition_location: vec![],
output_partitioning: hash_partitioning_to_proto(
self.output_partitioning.as_ref(),
)?,
})
}
}
@@ -87,3 +101,26 @@ impl Into<protobuf::PartitionStats> for PartitionStats {
}
}
}

pub fn hash_partitioning_to_proto(
output_partitioning: Option<&Partitioning>,
) -> Result<Option<protobuf::PhysicalHashRepartition>, BallistaError> {
match output_partitioning {
Some(Partitioning::Hash(exprs, partition_count)) => {
Ok(Some(protobuf::PhysicalHashRepartition {
hash_expr: exprs
.iter()
.map(|expr| expr.clone().try_into())
.collect::<Result<Vec<_>, BallistaError>>()?,
partition_count: *partition_count as u64,
}))
}
None => Ok(None),
other => {
return Err(BallistaError::General(format!(
"scheduler::to_proto() invalid partitioning for ExecutePartition: {:?}",
other
)))
}
}
}
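hash_partitioning_to_proto mirrors parse_protobuf_hash_partitioning on the encoding side: hash partitioning is serialized, None passes through, and any other scheme is rejected. A hedged sketch of the passthrough and error cases, inside a function returning Result<_, BallistaError>:

use datafusion::physical_plan::Partitioning;

// None round-trips to None.
assert!(hash_partitioning_to_proto(None)?.is_none());

// Non-hash partitioning (e.g. round-robin) is not representable and errors out.
let rr = Partitioning::RoundRobinBatch(8);
assert!(hash_partitioning_to_proto(Some(&rr)).is_err());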
(The remaining 6 changed files are not shown in this view.)
