merge master
jgoday committed Jul 8, 2021
2 parents 966977d + 7378bb4 commit 34475c7
Showing 45 changed files with 2,073 additions and 633 deletions.
DEVELOPERS.md: 4 changes (2 additions, 2 deletions)
@@ -37,8 +37,8 @@ Testing setup:

- `git submodule init`
- `git submodule update`
-- `export PARQUET_TEST_DATA=parquet_testing/`
-- `export ARROW_TEST_DATA=testing/data/`
+- `export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data/`
+- `export ARROW_TEST_DATA=$(pwd)/testing/data/`

## How to add a new scalar function

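The PARQUET_TEST_DATA / ARROW_TEST_DATA exports above are now anchored with `$(pwd)` to absolute directories. A minimal sketch of how a test might consume such a variable (the helper name is hypothetical, not DataFusion's actual API), assuming the submodules above are checked out:

```rust
use std::env;
use std::path::PathBuf;

/// Resolve a test-data directory from the environment, in the spirit of
/// how the test suites read ARROW_TEST_DATA / PARQUET_TEST_DATA.
/// (Illustrative helper; not the project's real function.)
fn test_data_dir(var: &str) -> PathBuf {
    let dir = env::var(var)
        .unwrap_or_else(|_| panic!("{} must point at checked-out test data", var));
    PathBuf::from(dir)
}

fn main() {
    // Anchoring the exports with $(pwd) yields absolute paths, so tests
    // keep working no matter which directory cargo is invoked from.
    println!("{}", test_data_dir("ARROW_TEST_DATA").display());
}
```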
ballista/rust/core/Cargo.toml: 1 change (1 addition, 0 deletions)
@@ -33,6 +33,7 @@ simd = ["datafusion/simd"]
ahash = "0.7"
async-trait = "0.1.36"
futures = "0.3"
+hashbrown = "0.11"
log = "0.4"
prost = "0.7"
serde = {version = "1", features = ["derive"]}
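The new hashbrown dependency supplies the HashMap used by the metrics maps added later in this commit; a quick sketch of the drop-in API (my own example, not from the diff):

```rust
use hashbrown::HashMap;

fn main() {
    // hashbrown::HashMap mirrors std::collections::HashMap's API, so the
    // metrics code below can use it without any call-site changes.
    let mut metrics: HashMap<String, u64> = HashMap::new();
    metrics.insert("fetchTime".to_owned(), 0);
    assert_eq!(metrics.get("fetchTime"), Some(&0));
}
```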
ballista/rust/core/proto/ballista.proto: 10 changes (8 additions, 2 deletions)
@@ -378,12 +378,18 @@ enum JoinType {
ANTI = 5;
}

+enum JoinConstraint {
+ON = 0;
+USING = 1;
+}

message JoinNode {
LogicalPlanNode left = 1;
LogicalPlanNode right = 2;
JoinType join_type = 3;
-repeated Column left_join_column = 4;
-repeated Column right_join_column = 5;
+JoinConstraint join_constraint = 4;
+repeated Column left_join_column = 5;
+repeated Column right_join_column = 6;
}

message LimitNode {
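On the Rust side, prost models proto enum fields as i32, which is why the deserializer later in this commit calls `from_i32` and must handle unknown values. A hedged sketch of that pattern with a hand-rolled enum (a stand-in, not the generated code):

```rust
/// Stand-in for the prost-generated JoinConstraint (illustrative only).
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinConstraint {
    On = 0,
    Using = 1,
}

impl JoinConstraint {
    /// Mirrors the `from_i32` accessor prost generates for enum fields,
    /// returning None for values outside the known range.
    fn from_i32(v: i32) -> Option<Self> {
        match v {
            0 => Some(JoinConstraint::On),
            1 => Some(JoinConstraint::Using),
            _ => None,
        }
    }
}

fn main() {
    assert_eq!(JoinConstraint::from_i32(1), Some(JoinConstraint::Using));
    assert_eq!(JoinConstraint::from_i32(7), None); // unknown wire value
}
```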
ballista/rust/core/src/execution_plans/shuffle_reader.rs: 26 changes (22 additions, 4 deletions)
@@ -28,13 +28,17 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
+use datafusion::physical_plan::{
+DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric,
+};
use datafusion::{
error::{DataFusionError, Result},
physical_plan::RecordBatchStream,
};
use futures::{future, Stream, StreamExt};
+use hashbrown::HashMap;
use log::info;
+use std::time::Instant;

/// ShuffleReaderExec reads partitions that have already been materialized by a ShuffleWriterExec
/// being executed by an executor
@@ -43,6 +47,8 @@ pub struct ShuffleReaderExec {
/// Each partition of a shuffle can read data from multiple locations
pub(crate) partition: Vec<Vec<PartitionLocation>>,
pub(crate) schema: SchemaRef,
+/// Time to fetch data from executor
+fetch_time: Arc<SQLMetric>,
}

impl ShuffleReaderExec {
@@ -51,7 +57,11 @@ impl ShuffleReaderExec {
partition: Vec<Vec<PartitionLocation>>,
schema: SchemaRef,
) -> Result<Self> {
-Ok(Self { partition, schema })
+Ok(Self {
+partition,
+schema,
+fetch_time: SQLMetric::time_nanos(),
+})
}
}

@@ -88,11 +98,13 @@ impl ExecutionPlan for ShuffleReaderExec {
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
info!("ShuffleReaderExec::execute({})", partition);

+let start = Instant::now();
let partition_locations = &self.partition[partition];
let result = future::join_all(partition_locations.iter().map(fetch_partition))
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
+self.fetch_time.add_elapsed(start);

let result = WrappedStream::new(
Box::pin(futures::stream::iter(result).flatten()),
@@ -115,7 +127,7 @@ impl ExecutionPlan for ShuffleReaderExec {
x.iter()
.map(|l| {
format!(
"[executor={} part={}:{}:{} stats={:?}]",
"[executor={} part={}:{}:{} stats={}]",
l.executor_meta.id,
l.partition_id.job_id,
l.partition_id.stage_id,
@@ -127,11 +139,17 @@
.join(",")
})
.collect::<Vec<String>>()
.join("\n");
.join(", ");
write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
}
}
}

+fn metrics(&self) -> HashMap<String, SQLMetric> {
+let mut metrics = HashMap::new();
+metrics.insert("fetchTime".to_owned(), (*self.fetch_time).clone());
+metrics
+}
}

async fn fetch_partition(
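The pattern this file adds is: snapshot `Instant::now()`, do the timed work, fold the elapsed time into a shared `SQLMetric`, then expose it by name from `metrics()`. A self-contained sketch of that pattern using a stand-in type (MiniMetric and its methods are my invention, not DataFusion's API):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Illustrative stand-in for datafusion's SQLMetric: a shared
/// nanosecond counter that many partitions can add into.
#[derive(Debug)]
struct MiniMetric(AtomicU64);

impl MiniMetric {
    fn time_nanos() -> Arc<Self> {
        Arc::new(MiniMetric(AtomicU64::new(0)))
    }
    /// Add the time elapsed since `start`, like SQLMetric::add_elapsed.
    fn add_elapsed(&self, start: Instant) {
        self.0
            .fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
    }
    fn value(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

fn main() {
    let fetch_time = MiniMetric::time_nanos();
    let start = Instant::now();
    // ... fetch partition data here ...
    fetch_time.add_elapsed(start);
    println!("fetchTime = {} ns", fetch_time.value());
}
```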
ballista/rust/core/src/execution_plans/shuffle_writer.rs: 42 changes (36 additions, 6 deletions)
@@ -20,6 +20,7 @@
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
//! will use the ShuffleReaderExec to read these results.

+use std::fs::File;
use std::iter::Iterator;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
@@ -43,11 +44,11 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::hash_join::create_hashes;
use datafusion::physical_plan::{
-DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SQLMetric,
};
use futures::StreamExt;
use hashbrown::HashMap;
use log::info;
-use std::fs::File;
use uuid::Uuid;

/// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
@@ -66,6 +67,22 @@ pub struct ShuffleWriterExec {
work_dir: String,
/// Optional shuffle output partitioning
shuffle_output_partitioning: Option<Partitioning>,
+/// Shuffle write metrics
+metrics: ShuffleWriteMetrics,
}

+#[derive(Debug, Clone)]
+struct ShuffleWriteMetrics {
+/// Time spent writing batches to shuffle files
+write_time: Arc<SQLMetric>,
+}
+
+impl ShuffleWriteMetrics {
+fn new() -> Self {
+Self {
+write_time: SQLMetric::time_nanos(),
+}
+}
+}

impl ShuffleWriterExec {
Expand All @@ -83,6 +100,7 @@ impl ShuffleWriterExec {
plan,
work_dir,
shuffle_output_partitioning,
+metrics: ShuffleWriteMetrics::new(),
})
}

@@ -150,12 +168,16 @@ impl ExecutionPlan for ShuffleWriterExec {
info!("Writing results to {}", path);

// stream results to disk
-let stats = utils::write_stream_to_disk(&mut stream, path)
-.await
-.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+let stats = utils::write_stream_to_disk(
+&mut stream,
+path,
+self.metrics.write_time.clone(),
+)
+.await
+.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

info!(
"Executed partition {} in {} seconds. Statistics: {:?}",
"Executed partition {} in {} seconds. Statistics: {}",
partition,
now.elapsed().as_secs(),
stats
@@ -231,6 +253,7 @@
RecordBatch::try_new(input_batch.schema(), columns)?;

// write batch out
+let start = Instant::now();
match &mut writers[num_output_partition] {
Some(w) => {
w.write(&output_batch)?;
Expand All @@ -251,6 +274,7 @@ impl ExecutionPlan for ShuffleWriterExec {
writers[num_output_partition] = Some(writer);
}
}
+self.metrics.write_time.add_elapsed(start);
}
}

@@ -310,6 +334,12 @@
}
}

+fn metrics(&self) -> HashMap<String, SQLMetric> {
+let mut metrics = HashMap::new();
+metrics.insert("writeTime".to_owned(), (*self.metrics.write_time).clone());
+metrics
+}

fn fmt_as(
&self,
t: DisplayFormatType,
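ShuffleWriterExec routes each input row to an output partition by hashing its key columns (the imports above pull in datafusion's `create_hashes` for this) and writes each group through a per-partition IPC writer. A simplified, self-contained sketch of the routing step, using std's hasher instead of `create_hashes`:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash one partition key to an output partition index.
/// (Simplified: the real code hashes whole Arrow columns at once.)
fn output_partition(key: &str, num_partitions: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() as usize) % num_partitions
}

fn main() {
    let num_partitions = 4;
    // Rows with equal keys always land in the same partition, which is
    // what makes a downstream hash join over the shuffle output correct.
    for key in ["a", "b", "a", "c"] {
        println!("{} -> partition {}", key, output_partition(key, num_partitions));
    }
}
```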
ballista/rust/core/src/serde/logical_plan/from_proto.rs: 41 changes (25 additions, 16 deletions)
@@ -26,8 +26,8 @@ use datafusion::logical_plan::window_frames::{
};
use datafusion::logical_plan::{
abs, acos, asin, atan, ceil, cos, exp, floor, ln, log10, log2, round, signum, sin,
-sqrt, tan, trunc, Column, DFField, DFSchema, Expr, JoinType, LogicalPlan,
-LogicalPlanBuilder, Operator,
+sqrt, tan, trunc, Column, DFField, DFSchema, Expr, JoinConstraint, JoinType,
+LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::csv::CsvReadOptions;
@@ -257,23 +257,32 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
join.join_type
))
})?;
-let join_type = match join_type {
-protobuf::JoinType::Inner => JoinType::Inner,
-protobuf::JoinType::Left => JoinType::Left,
-protobuf::JoinType::Right => JoinType::Right,
-protobuf::JoinType::Full => JoinType::Full,
-protobuf::JoinType::Semi => JoinType::Semi,
-protobuf::JoinType::Anti => JoinType::Anti,
-};
-LogicalPlanBuilder::from(convert_box_required!(join.left)?)
-.join(
+let join_constraint = protobuf::JoinConstraint::from_i32(
+join.join_constraint,
+)
+.ok_or_else(|| {
+proto_error(format!(
+"Received a JoinNode message with unknown JoinConstraint {}",
+join.join_constraint
+))
+})?;
+
+let builder = LogicalPlanBuilder::from(convert_box_required!(join.left)?);
+let builder = match join_constraint.into() {
+JoinConstraint::On => builder.join(
&convert_box_required!(join.right)?,
-join_type,
+join_type.into(),
left_keys,
right_keys,
-)?
-.build()
-.map_err(|e| e.into())
+)?,
+JoinConstraint::Using => builder.join_using(
+&convert_box_required!(join.right)?,
+join_type.into(),
+left_keys,
+)?,
+};
+
+builder.build().map_err(|e| e.into())
}
}
}
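The two builder paths matter because ON and USING joins have different output schemas in SQL: USING exposes the shared column once, while ON keeps both sides' copies, so the serialized plan must record which constraint to replay. A small illustration (table and column names invented):

```rust
fn main() {
    // JOIN ... ON: t1.id and t2.id remain distinct output columns.
    let on_join = "SELECT * FROM t1 JOIN t2 ON t1.id = t2.id";
    // JOIN ... USING: a single merged `id` column appears in the output,
    // which is why JoinConstraint must survive the proto round trip.
    let using_join = "SELECT * FROM t1 JOIN t2 USING (id)";
    println!("{}\n{}", on_join, using_join);
}
```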
ballista/rust/core/src/serde/logical_plan/to_proto.rs: 15 changes (6 additions, 9 deletions)
@@ -26,7 +26,7 @@ use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit};
use datafusion::datasource::CsvFile;
use datafusion::logical_plan::{
window_frames::{WindowFrame, WindowFrameBound, WindowFrameUnits},
-Column, Expr, JoinType, LogicalPlan,
+Column, Expr, JoinConstraint, JoinType, LogicalPlan,
};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::functions::BuiltinScalarFunction;
@@ -804,26 +804,23 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
right,
on,
join_type,
+join_constraint,
..
} => {
let left: protobuf::LogicalPlanNode = left.as_ref().try_into()?;
let right: protobuf::LogicalPlanNode = right.as_ref().try_into()?;
-let join_type = match join_type {
-JoinType::Inner => protobuf::JoinType::Inner,
-JoinType::Left => protobuf::JoinType::Left,
-JoinType::Right => protobuf::JoinType::Right,
-JoinType::Full => protobuf::JoinType::Full,
-JoinType::Semi => protobuf::JoinType::Semi,
-JoinType::Anti => protobuf::JoinType::Anti,
-};
let (left_join_column, right_join_column) =
on.iter().map(|(l, r)| (l.into(), r.into())).unzip();
+let join_type: protobuf::JoinType = join_type.to_owned().into();
+let join_constraint: protobuf::JoinConstraint =
+join_constraint.to_owned().into();
Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::Join(Box::new(
protobuf::JoinNode {
left: Some(Box::new(left)),
right: Some(Box::new(right)),
join_type: join_type.into(),
+join_constraint: join_constraint.into(),
left_join_column,
right_join_column,
},
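The `unzip` call above splits the vector of (left, right) join-key pairs into two parallel vectors for the proto message's repeated fields. A minimal standalone example of the same std pattern:

```rust
fn main() {
    // Pairs of join keys, as (left column, right column).
    let on = vec![("l.id", "r.id"), ("l.ts", "r.ts")];
    // unzip() splits an iterator of pairs into two collections.
    let (left, right): (Vec<_>, Vec<_>) =
        on.iter().map(|(l, r)| (l.to_string(), r.to_string())).unzip();
    assert_eq!(left, vec!["l.id", "l.ts"]);
    assert_eq!(right, vec!["r.id", "r.ts"]);
}
```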
ballista/rust/core/src/serde/mod.rs: 46 changes (45 additions, 1 deletion)
@@ -20,7 +20,7 @@

use std::{convert::TryInto, io::Cursor};

-use datafusion::logical_plan::Operator;
+use datafusion::logical_plan::{JoinConstraint, JoinType, Operator};
use datafusion::physical_plan::aggregates::AggregateFunction;
use datafusion::physical_plan::window_functions::BuiltInWindowFunction;

@@ -291,3 +291,47 @@ impl Into<datafusion::arrow::datatypes::DataType> for protobuf::PrimitiveScalarType {
}
}
}

+impl From<protobuf::JoinType> for JoinType {
+fn from(t: protobuf::JoinType) -> Self {
+match t {
+protobuf::JoinType::Inner => JoinType::Inner,
+protobuf::JoinType::Left => JoinType::Left,
+protobuf::JoinType::Right => JoinType::Right,
+protobuf::JoinType::Full => JoinType::Full,
+protobuf::JoinType::Semi => JoinType::Semi,
+protobuf::JoinType::Anti => JoinType::Anti,
+}
+}
+}

+impl From<JoinType> for protobuf::JoinType {
+fn from(t: JoinType) -> Self {
+match t {
+JoinType::Inner => protobuf::JoinType::Inner,
+JoinType::Left => protobuf::JoinType::Left,
+JoinType::Right => protobuf::JoinType::Right,
+JoinType::Full => protobuf::JoinType::Full,
+JoinType::Semi => protobuf::JoinType::Semi,
+JoinType::Anti => protobuf::JoinType::Anti,
+}
+}
+}

+impl From<protobuf::JoinConstraint> for JoinConstraint {
+fn from(t: protobuf::JoinConstraint) -> Self {
+match t {
+protobuf::JoinConstraint::On => JoinConstraint::On,
+protobuf::JoinConstraint::Using => JoinConstraint::Using,
+}
+}
+}

+impl From<JoinConstraint> for protobuf::JoinConstraint {
+fn from(t: JoinConstraint) -> Self {
+match t {
+JoinConstraint::On => protobuf::JoinConstraint::On,
+JoinConstraint::Using => protobuf::JoinConstraint::Using,
+}
+}
+}
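These paired From impls invite a round-trip property test. A hedged sketch, assuming JoinType and JoinConstraint derive Clone, PartialEq, and Debug, and that the impls above are in scope via `super::*`:

```rust
#[cfg(test)]
mod round_trip_tests {
    use super::*;
    use datafusion::logical_plan::{JoinConstraint, JoinType};

    #[test]
    fn join_enums_round_trip() {
        // Every logical JoinType should survive proto encoding and back.
        for jt in [
            JoinType::Inner,
            JoinType::Left,
            JoinType::Right,
            JoinType::Full,
            JoinType::Semi,
            JoinType::Anti,
        ] {
            let encoded: protobuf::JoinType = jt.clone().into();
            assert_eq!(JoinType::from(encoded), jt);
        }
        // Same property for the new JoinConstraint enum.
        for jc in [JoinConstraint::On, JoinConstraint::Using] {
            let encoded: protobuf::JoinConstraint = jc.clone().into();
            assert_eq!(JoinConstraint::from(encoded), jc);
        }
    }
}
```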