ShuffleReaderExec now supports multiple locations per partition (#541)
* ShuffleReaderExec now supports multiple locations per partition

* Remove TODO

* avoid clone
andygrove authored Jun 12, 2021
1 parent ad70a1e commit 8f4078d
Showing 8 changed files with 130 additions and 88 deletions.
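At its core the commit changes the shape of ShuffleReaderExec's partition metadata: each output partition is now described by a list of locations instead of exactly one. A minimal sketch of the before/after shape — the stub PartitionLocation and its field names here are hypothetical, not the real ballista_core::serde::scheduler::PartitionLocation, which carries executor metadata, a partition id, and partition statistics:

```rust
// Illustrative stub; field names are hypothetical stand-ins for the real
// ballista_core::serde::scheduler::PartitionLocation definition.
#[derive(Debug, Clone)]
struct PartitionLocation {
    executor_id: String,
    partition_id: usize,
}

// Before: exactly one location per output partition.
#[allow(dead_code)]
struct ShuffleReaderExecBefore {
    partition_location: Vec<PartitionLocation>,
}

// After: output partition i is described by partition[i], a list of
// locations that can all serve that partition's data.
struct ShuffleReaderExecAfter {
    partition: Vec<Vec<PartitionLocation>>,
}

fn main() {
    let loc = |e: &str, p: usize| PartitionLocation {
        executor_id: e.to_string(),
        partition_id: p,
    };
    // Partition 0 reads from two executors, partition 1 from one.
    let exec = ShuffleReaderExecAfter {
        partition: vec![
            vec![loc("executor-a", 0), loc("executor-b", 0)],
            vec![loc("executor-a", 1)],
        ],
    };
    // The operator's output partitioning is still the outer Vec's length.
    assert_eq!(exec.partition.len(), 2);
    println!("{:?}", exec.partition[0]);
}
```

This is why output_partitioning() in the diff below switches from self.partition_location.len() to self.partition.len().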
39 changes: 5 additions & 34 deletions ballista/rust/client/src/context.rs
@@ -29,21 +29,18 @@ use ballista_core::serde::protobuf::{
     execute_query_params::Query, job_status, ExecuteQueryParams, GetJobStatusParams,
     GetJobStatusResult,
 };
+use ballista_core::utils::WrappedStream;
 use ballista_core::{
     client::BallistaClient, datasource::DfTableAdapter, utils::create_datafusion_context,
 };
 
 use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::Result as ArrowResult;
-use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::catalog::TableReference;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_plan::LogicalPlan;
 use datafusion::physical_plan::csv::CsvReadOptions;
 use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream};
 use futures::future;
-use futures::Stream;
 use futures::StreamExt;
 use log::{error, info};
 
@@ -74,32 +74,6 @@ impl BallistaContextState {
     }
 }
 
-struct WrappedStream {
-    stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
-    schema: SchemaRef,
-}
-
-impl RecordBatchStream for WrappedStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
-impl Stream for WrappedStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
-        self.stream.poll_next_unpin(cx)
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        self.stream.size_hint()
-    }
-}
-
 #[allow(dead_code)]
 
 pub struct BallistaContext {
@@ -287,10 +258,10 @@ impl BallistaContext {
                 .into_iter()
                 .collect::<Result<Vec<_>>>()?;
 
-            let result = WrappedStream {
-                stream: Box::pin(futures::stream::iter(result).flatten()),
-                schema: Arc::new(schema),
-            };
+            let result = WrappedStream::new(
+                Box::pin(futures::stream::iter(result).flatten()),
+                Arc::new(schema),
+            );
             break Ok(Box::pin(result));
         }
     };
7 changes: 6 additions & 1 deletion ballista/rust/core/proto/ballista.proto
@@ -489,10 +489,15 @@ message HashAggregateExecNode {
 }
 
 message ShuffleReaderExecNode {
-  repeated PartitionLocation partition_location = 1;
+  repeated ShuffleReaderPartition partition = 1;
   Schema schema = 2;
 }
 
+message ShuffleReaderPartition {
+  // each partition of a shuffle read can read data from multiple locations
+  repeated PartitionLocation location = 1;
+}
+
 message GlobalLimitExecNode {
   PhysicalPlanNode input = 1;
   uint32 limit = 2;
94 changes: 57 additions & 37 deletions ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -15,42 +15,43 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::fmt::Formatter;
 use std::sync::Arc;
 use std::{any::Any, pin::Pin};
 
 use crate::client::BallistaClient;
 use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionLocation;
 
+use crate::utils::WrappedStream;
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
 use datafusion::{
     error::{DataFusionError, Result},
     physical_plan::RecordBatchStream,
 };
+use futures::{future, Stream, StreamExt};
 use log::info;
-use std::fmt::Formatter;
 
-/// ShuffleReaderExec reads partitions that have already been materialized by an executor.
+/// ShuffleReaderExec reads partitions that have already been materialized by a query stage
+/// being executed by an executor
 #[derive(Debug, Clone)]
 pub struct ShuffleReaderExec {
-    // The query stage that is responsible for producing the shuffle partitions that
-    // this operator will read
-    pub(crate) partition_location: Vec<PartitionLocation>,
+    /// Each partition of a shuffle can read data from multiple locations
+    pub(crate) partition: Vec<Vec<PartitionLocation>>,
     pub(crate) schema: SchemaRef,
 }
 
 impl ShuffleReaderExec {
     /// Create a new ShuffleReaderExec
     pub fn try_new(
-        partition_meta: Vec<PartitionLocation>,
+        partition: Vec<Vec<PartitionLocation>>,
         schema: SchemaRef,
     ) -> Result<Self> {
-        Ok(Self {
-            partition_location: partition_meta,
-            schema,
-        })
+        Ok(Self { partition, schema })
     }
 }
 
@@ -65,7 +66,7 @@ impl ExecutionPlan for ShuffleReaderExec {
     }
 
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(self.partition_location.len())
+        Partitioning::UnknownPartitioning(self.partition.len())
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -86,23 +87,18 @@
         partition: usize,
     ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
         info!("ShuffleReaderExec::execute({})", partition);
-        let partition_location = &self.partition_location[partition];
 
-        let mut client = BallistaClient::try_new(
-            &partition_location.executor_meta.host,
-            partition_location.executor_meta.port,
-        )
-        .await
-        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;
 
-        client
-            .fetch_partition(
-                &partition_location.partition_id.job_id,
-                partition_location.partition_id.stage_id,
-                partition,
-            )
+        let partition_locations = &self.partition[partition];
+        let result = future::join_all(partition_locations.iter().map(fetch_partition))
             .await
-            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
 
+        let result = WrappedStream::new(
+            Box::pin(futures::stream::iter(result).flatten()),
+            Arc::new(self.schema.as_ref().clone()),
+        );
+        Ok(Box::pin(result))
     }
 
     fn fmt_as(
@@ -113,22 +109,46 @@ impl ExecutionPlan for ShuffleReaderExec {
         match t {
             DisplayFormatType::Default => {
                 let loc_str = self
-                    .partition_location
+                    .partition
                     .iter()
-                    .map(|l| {
-                        format!(
-                            "[executor={} part={}:{}:{} stats={:?}]",
-                            l.executor_meta.id,
-                            l.partition_id.job_id,
-                            l.partition_id.stage_id,
-                            l.partition_id.partition_id,
-                            l.partition_stats
-                        )
+                    .map(|x| {
+                        x.iter()
+                            .map(|l| {
+                                format!(
+                                    "[executor={} part={}:{}:{} stats={:?}]",
+                                    l.executor_meta.id,
+                                    l.partition_id.job_id,
+                                    l.partition_id.stage_id,
+                                    l.partition_id.partition_id,
+                                    l.partition_stats
+                                )
+                            })
+                            .collect::<Vec<String>>()
+                            .join(",")
                     })
                     .collect::<Vec<String>>()
-                    .join(",");
+                    .join("\n");
                 write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
             }
         }
     }
 }
+
+async fn fetch_partition(
+    location: &PartitionLocation,
+) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
+    let metadata = &location.executor_meta;
+    let partition_id = &location.partition_id;
+    let mut ballista_client =
+        BallistaClient::try_new(metadata.host.as_str(), metadata.port as u16)
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+    Ok(ballista_client
+        .fetch_partition(
+            &partition_id.job_id,
+            partition_id.stage_id as usize,
+            partition_id.partition_id as usize,
+        )
+        .await
+        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?)
+}
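The new execute() body fans out one fetch_partition call per location, waits for all of them with future::join_all, and then chains the resulting streams end to end. The same pattern in isolation, as a self-contained sketch using only the futures crate — the toy fetch function and String "batches" are stand-ins for BallistaClient and Arrow record batches:

```rust
use std::pin::Pin;

use futures::stream::{self, Stream, StreamExt};

// Stand-in for fetch_partition: each location yields a boxed stream of batches.
async fn fetch(location: usize) -> Result<Pin<Box<dyn Stream<Item = String> + Send>>, String> {
    Ok(Box::pin(stream::iter(
        (0..2).map(move |i| format!("loc{}-batch{}", location, i)),
    )))
}

fn main() {
    futures::executor::block_on(async {
        let locations = vec![0, 1, 2];
        // Issue all fetches concurrently, wait for every one, then surface
        // the first error (if any) via collect::<Result<_, _>>, mirroring
        // execute() above.
        let streams = futures::future::join_all(locations.iter().map(|&l| fetch(l)))
            .await
            .into_iter()
            .collect::<Result<Vec<_>, String>>()
            .expect("all fetches succeed in this sketch");
        // flatten() drains each location's stream in turn, producing one
        // logical stream for the whole output partition.
        let merged: Vec<String> = stream::iter(streams).flatten().collect().await;
        assert_eq!(merged.len(), 6);
        println!("{:?}", merged);
    });
}
```

Note that flatten() reads the location streams sequentially rather than interleaving them, which matches what the operator does; the fetches themselves are started concurrently by join_all.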
12 changes: 9 additions & 3 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -25,6 +25,7 @@ use crate::error::BallistaError;
 use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
 use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
 use crate::serde::protobuf::LogicalExprNode;
+use crate::serde::protobuf::ShuffleReaderPartition;
 use crate::serde::scheduler::PartitionLocation;
 use crate::serde::{proto_error, protobuf};
 use crate::{convert_box_required, convert_required};
@@ -327,10 +328,15 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
             }
             PhysicalPlanType::ShuffleReader(shuffle_reader) => {
                 let schema = Arc::new(convert_required!(shuffle_reader.schema)?);
-                let partition_location: Vec<PartitionLocation> = shuffle_reader
-                    .partition_location
+                let partition_location: Vec<Vec<PartitionLocation>> = shuffle_reader
+                    .partition
                     .iter()
-                    .map(|p| p.clone().try_into())
+                    .map(|p| {
+                        p.location
+                            .iter()
+                            .map(|l| l.clone().try_into())
+                            .collect::<Result<Vec<_>, _>>()
+                    })
                     .collect::<Result<Vec<_>, BallistaError>>()?;
                 let shuffle_reader =
                     ShuffleReaderExec::try_new(partition_location, schema)?;
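Deserialization now collects fallible conversions at two levels: the inner collect turns one partition's locations into a Result<Vec<PartitionLocation>, _>, and the outer collect turns the partitions into a Result<Vec<Vec<_>>, _>, short-circuiting on the first failure. The pattern in isolation, with string parsing standing in for the protobuf TryInto conversions:

```rust
fn main() {
    // Two shuffle partitions, each with raw location entries to convert.
    let raw: Vec<Vec<&str>> = vec![vec!["1", "2"], vec!["3"]];

    let parsed: Result<Vec<Vec<u32>>, std::num::ParseIntError> = raw
        .iter()
        .map(|partition| {
            partition
                .iter()
                .map(|location| location.parse::<u32>()) // inner fallible conversion
                .collect::<Result<Vec<_>, _>>() // fails this partition on the first bad entry
        })
        .collect(); // fails the whole node on the first bad partition

    assert_eq!(parsed.unwrap(), vec![vec![1, 2], vec![3]]);
}
```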
18 changes: 11 additions & 7 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -57,6 +57,7 @@ use protobuf::physical_plan_node::PhysicalPlanType;
 
 use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
 use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
+use crate::serde::scheduler::PartitionLocation;
 use crate::serde::{protobuf, BallistaError};
 use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr};
 use datafusion::physical_plan::merge::MergeExec;
@@ -268,16 +269,19 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                 )),
             })
         } else if let Some(exec) = plan.downcast_ref::<ShuffleReaderExec>() {
-            let partition_location = exec
-                .partition_location
-                .iter()
-                .map(|l| l.clone().try_into())
-                .collect::<Result<_, _>>()?;
-
+            let mut partition = vec![];
+            for location in &exec.partition {
+                partition.push(protobuf::ShuffleReaderPartition {
+                    location: location
+                        .iter()
+                        .map(|l| l.clone().try_into())
+                        .collect::<Result<Vec<_>, _>>()?,
+                });
+            }
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::ShuffleReader(
                     protobuf::ShuffleReaderExecNode {
-                        partition_location,
+                        partition,
                         schema: Some(exec.schema().as_ref().into()),
                     },
                 )),
40 changes: 38 additions & 2 deletions ballista/rust/core/src/utils.rs
@@ -27,11 +27,12 @@ use crate::execution_plans::{QueryStageExec, UnresolvedShuffleExec};
 use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionStats;
 
+use datafusion::arrow::error::Result as ArrowResult;
 use datafusion::arrow::{
     array::{
         ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array, UInt64Builder,
     },
-    datatypes::{DataType, Field},
+    datatypes::{DataType, Field, SchemaRef},
     ipc::reader::FileReader,
     ipc::writer::FileWriter,
     record_batch::RecordBatch,
@@ -54,7 +55,7 @@ use datafusion::physical_plan::sort::SortExec;
 use datafusion::physical_plan::{
     AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream,
 };
-use futures::StreamExt;
+use futures::{future, Stream, StreamExt};
 
 /// Stream data to disk in Arrow IPC format

@@ -234,3 +235,38 @@ pub fn create_datafusion_context() -> ExecutionContext {
         .with_physical_optimizer_rules(rules);
     ExecutionContext::with_config(config)
 }
+
+pub struct WrappedStream {
+    stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
+    schema: SchemaRef,
+}
+
+impl WrappedStream {
+    pub fn new(
+        stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
+        schema: SchemaRef,
+    ) -> Self {
+        Self { stream, schema }
+    }
+}
+
+impl RecordBatchStream for WrappedStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for WrappedStream {
+    type Item = ArrowResult<RecordBatch>;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        self.stream.poll_next_unpin(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.stream.size_hint()
+    }
+}
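WrappedStream, now shared from ballista_core::utils, is a thin adapter: it owns a boxed stream of record batches plus a schema, forwards poll_next and size_hint to the inner stream, and exposes the schema through RecordBatchStream. A minimal self-contained analog of the same delegation pattern — a toy String schema and integer "batches" replace SchemaRef and ArrowResult<RecordBatch> so the sketch compiles with just the futures crate:

```rust
use std::pin::Pin;

use futures::stream::{self, Stream, StreamExt};

// Toy stand-ins for SchemaRef and ArrowResult<RecordBatch>.
type Schema = String;
type Batch = Result<u64, String>;

struct ToyWrappedStream {
    stream: Pin<Box<dyn Stream<Item = Batch> + Send + Sync>>,
    schema: Schema,
}

impl ToyWrappedStream {
    fn new(stream: Pin<Box<dyn Stream<Item = Batch> + Send + Sync>>, schema: Schema) -> Self {
        Self { stream, schema }
    }

    // Analog of RecordBatchStream::schema().
    fn schema(&self) -> Schema {
        self.schema.clone()
    }
}

impl Stream for ToyWrappedStream {
    type Item = Batch;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Delegate to the inner stream; Pin<Box<_>> is Unpin, so
        // poll_next_unpin is safe here.
        self.stream.poll_next_unpin(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.stream.size_hint()
    }
}

fn main() {
    futures::executor::block_on(async {
        let inner = stream::iter(vec![Ok(1), Ok(2)]);
        let wrapped = ToyWrappedStream::new(Box::pin(inner), "toy-schema".to_string());
        assert_eq!(wrapped.schema(), "toy-schema");
        let batches: Vec<Batch> = wrapped.collect().await;
        assert_eq!(batches, vec![Ok(1), Ok(2)]);
    });
}
```

Carrying the schema alongside the stream is what lets the caller (here, ShuffleReaderExec and BallistaContext) hand back a single merged stream that still satisfies the RecordBatchStream contract.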
2 changes: 1 addition & 1 deletion ballista/rust/scheduler/src/planner.rs
@@ -186,7 +186,7 @@ impl DistributedPlanner {
 
 pub fn remove_unresolved_shuffles(
     stage: &dyn ExecutionPlan,
-    partition_locations: &HashMap<usize, Vec<PartitionLocation>>,
+    partition_locations: &HashMap<usize, Vec<Vec<PartitionLocation>>>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
     for child in stage.children() {