From 85a1401210aaf63e800db4b90e3e0ba6e118ee32 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 17 Jul 2021 09:03:04 -0600 Subject: [PATCH 01/10] CompletedTask now includes meta-data for shuffle output partitions --- ballista/rust/core/proto/ballista.proto | 11 + .../src/execution_plans/shuffle_writer.rs | 250 ++++++++++-------- ballista/rust/core/src/serde/scheduler/mod.rs | 6 +- ballista/rust/executor/src/execution_loop.rs | 12 +- ballista/rust/executor/src/executor.rs | 21 +- ballista/rust/scheduler/src/state/mod.rs | 44 ++- 6 files changed, 206 insertions(+), 138 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 0575460cfca3..28646583354a 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -776,6 +776,17 @@ message FailedTask { message CompletedTask { string executor_id = 1; + // TODO tasks are currently always shuffle writes but this will not always be the case + // so we might want to think about some refactoring of the task definitions + repeated ShuffleWritePartition partitions = 2; +} + +message ShuffleWritePartition { + uint64 partition_id = 1; + string path = 2; + uint64 num_batches = 3; + uint64 num_rows = 4; + uint64 num_bytes = 5; } message TaskStatus { diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index d5c7d8f28449..b4316d9c5158 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -31,7 +31,8 @@ use crate::error::BallistaError; use crate::memory_stream::MemoryStream; use crate::utils; -use crate::serde::scheduler::PartitionStats; +use crate::serde::protobuf::ShuffleWritePartition; +use crate::serde::scheduler::{PartitionLocation, PartitionStats}; use async_trait::async_trait; use datafusion::arrow::array::{ Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder, @@ -39,16 +40,19 @@ use datafusion::arrow::array::{ }; use datafusion::arrow::compute::take; use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::ipc::reader::FileReader; use datafusion::arrow::ipc::writer::FileWriter; use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::{DataFusionError, Result}; use datafusion::physical_plan::hash_join::create_hashes; +use datafusion::physical_plan::repartition::RepartitionExec; +use datafusion::physical_plan::Partitioning::RoundRobinBatch; use datafusion::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SQLMetric, }; use futures::StreamExt; use hashbrown::HashMap; -use log::info; +use log::{debug, info}; use uuid::Uuid; /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and @@ -75,12 +79,16 @@ pub struct ShuffleWriterExec { struct ShuffleWriteMetrics { /// Time spend writing batches to shuffle files write_time: Arc, + input_rows: Arc, + output_rows: Arc, } impl ShuffleWriteMetrics { fn new() -> Self { Self { write_time: SQLMetric::time_nanos(), + input_rows: SQLMetric::counter(), + output_rows: SQLMetric::counter(), } } } @@ -113,50 +121,14 @@ impl ShuffleWriterExec { pub fn stage_id(&self) -> usize { self.stage_id } -} - -#[async_trait] -impl ExecutionPlan for ShuffleWriterExec { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.plan.schema() - } - - fn output_partitioning(&self) -> Partitioning { - match 
&self.shuffle_output_partitioning { - Some(p) => p.clone(), - _ => Partitioning::UnknownPartitioning(1), - } - } - fn children(&self) -> Vec> { - vec![self.plan.clone()] - } - - fn with_new_children( + pub async fn execute_shuffle_write( &self, - children: Vec>, - ) -> Result> { - assert!(children.len() == 1); - Ok(Arc::new(ShuffleWriterExec::try_new( - self.job_id.clone(), - self.stage_id, - children[0].clone(), - self.work_dir.clone(), - self.shuffle_output_partitioning.clone(), - )?)) - } - - async fn execute( - &self, - partition: usize, - ) -> Result>> { + input_partition: usize, + ) -> Result> { let now = Instant::now(); - let mut stream = self.plan.execute(partition).await?; + let mut stream = self.plan.execute(input_partition).await?; let mut path = PathBuf::from(&self.work_dir); path.push(&self.job_id); @@ -164,7 +136,7 @@ impl ExecutionPlan for ShuffleWriterExec { match &self.shuffle_output_partitioning { None => { - path.push(&format!("{}", partition)); + path.push(&format!("{}", input_partition)); std::fs::create_dir_all(&path)?; path.push("data.arrow"); let path = path.to_str().unwrap(); @@ -181,29 +153,18 @@ impl ExecutionPlan for ShuffleWriterExec { info!( "Executed partition {} in {} seconds. Statistics: {}", - partition, + input_partition, now.elapsed().as_secs(), stats ); - let schema = result_schema(); - - // build result set with summary of the partition execution status - let mut part_builder = UInt32Builder::new(1); - part_builder.append_value(partition as u32)?; - let part: ArrayRef = Arc::new(part_builder.finish()); - - let mut path_builder = StringBuilder::new(1); - path_builder.append_value(&path)?; - let path: ArrayRef = Arc::new(path_builder.finish()); - - let stats: ArrayRef = stats - .to_arrow_arrayref() - .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; - let batch = RecordBatch::try_new(schema.clone(), vec![part, path, stats]) - .map_err(DataFusionError::ArrowError)?; - - Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?)) + Ok(vec![ShuffleWritePartition { + partition_id: input_partition as u64, + path: path.to_owned(), + num_batches: stats.num_batches.unwrap_or(0), + num_rows: stats.num_rows.unwrap_or(0), + num_bytes: stats.num_bytes.unwrap_or(0), + }]) } Some(Partitioning::Hash(exprs, n)) => { @@ -218,8 +179,12 @@ impl ExecutionPlan for ShuffleWriterExec { let hashes_buf = &mut vec![]; let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0); + while let Some(result) = stream.next().await { let input_batch = result?; + + self.metrics.input_rows.add(input_batch.num_rows()); + let arrays = exprs .iter() .map(|expr| { @@ -241,6 +206,7 @@ impl ExecutionPlan for ShuffleWriterExec { indices.into_iter().enumerate() { let indices = partition_indices.into(); + // Produce batches based on indices let columns = input_batch .columns() @@ -255,7 +221,8 @@ impl ExecutionPlan for ShuffleWriterExec { let output_batch = RecordBatch::try_new(input_batch.schema(), columns)?; - // write batch out + // write non-empty batch out + //if output_batch.num_rows() > 0 { let start = Instant::now(); match &mut writers[output_partition] { Some(w) => { @@ -266,7 +233,7 @@ impl ExecutionPlan for ShuffleWriterExec { path.push(&format!("{}", output_partition)); std::fs::create_dir_all(&path)?; - path.push("data.arrow"); + path.push(format!("data-{}.arrow", input_partition)); let path = path.to_str().unwrap(); info!("Writing results to {}", path); @@ -277,58 +244,39 @@ impl ExecutionPlan for ShuffleWriterExec { writers[output_partition] = Some(writer); } } + 
self.metrics.output_rows.add(output_batch.num_rows()); self.metrics.write_time.add_elapsed(start); + //} } } - // build metadata result batch - let num_writers = writers.iter().filter(|w| w.is_some()).count(); - let mut partition_builder = UInt32Builder::new(num_writers); - let mut path_builder = StringBuilder::new(num_writers); - let mut num_rows_builder = UInt64Builder::new(num_writers); - let mut num_batches_builder = UInt64Builder::new(num_writers); - let mut num_bytes_builder = UInt64Builder::new(num_writers); + let mut part_locs = vec![]; for (i, w) in writers.iter_mut().enumerate() { match w { Some(w) => { w.finish()?; - path_builder.append_value(w.path())?; - partition_builder.append_value(i as u32)?; - num_rows_builder.append_value(w.num_rows)?; - num_batches_builder.append_value(w.num_batches)?; - num_bytes_builder.append_value(w.num_bytes)?; + info!( + "Finished writing shuffle partition {} at {}. Batches: {}. Rows: {}. Bytes: {}.", + i, + w.path(), + w.num_batches, + w.num_rows, + w.num_bytes + ); + + part_locs.push(ShuffleWritePartition { + partition_id: i as u64, + path: w.path().to_owned(), + num_batches: w.num_batches, + num_rows: w.num_rows, + num_bytes: w.num_bytes, + }); } None => {} } } - - // build arrays - let partition_num: ArrayRef = Arc::new(partition_builder.finish()); - let path: ArrayRef = Arc::new(path_builder.finish()); - let field_builders: Vec> = vec![ - Box::new(num_rows_builder), - Box::new(num_batches_builder), - Box::new(num_bytes_builder), - ]; - let mut stats_builder = StructBuilder::new( - PartitionStats::default().arrow_struct_fields(), - field_builders, - ); - for _ in 0..num_writers { - stats_builder.append(true)?; - } - let stats = Arc::new(stats_builder.finish()); - - // build result batch containing metadata - let schema = result_schema(); - let batch = RecordBatch::try_new( - schema.clone(), - vec![partition_num, path, stats], - ) - .map_err(DataFusionError::ArrowError)?; - - Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?)) + Ok(part_locs) } _ => Err(DataFusionError::Execution( @@ -336,9 +284,97 @@ impl ExecutionPlan for ShuffleWriterExec { )), } } +} + +#[async_trait] +impl ExecutionPlan for ShuffleWriterExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.plan.schema() + } + + fn output_partitioning(&self) -> Partitioning { + match &self.shuffle_output_partitioning { + Some(p) => p.clone(), + _ => Partitioning::UnknownPartitioning(1), + } + } + + fn children(&self) -> Vec> { + vec![self.plan.clone()] + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + assert!(children.len() == 1); + Ok(Arc::new(ShuffleWriterExec::try_new( + self.job_id.clone(), + self.stage_id, + children[0].clone(), + self.work_dir.clone(), + self.shuffle_output_partitioning.clone(), + )?)) + } + + async fn execute( + &self, + input_partition: usize, + ) -> Result>> { + let part_loc = self.execute_shuffle_write(input_partition).await?; + + // build metadata result batch + let num_writers = part_loc.len(); + let mut partition_builder = UInt32Builder::new(num_writers); + let mut path_builder = StringBuilder::new(num_writers); + let mut num_rows_builder = UInt64Builder::new(num_writers); + let mut num_batches_builder = UInt64Builder::new(num_writers); + let mut num_bytes_builder = UInt64Builder::new(num_writers); + + for loc in &part_loc { + path_builder.append_value(loc.path.clone())?; + partition_builder.append_value(loc.partition_id as u32)?; + num_rows_builder.append_value(loc.num_rows)?; + 
num_batches_builder.append_value(loc.num_batches)?; + num_bytes_builder.append_value(loc.num_bytes)?; + } + + // build arrays + let partition_num: ArrayRef = Arc::new(partition_builder.finish()); + let path: ArrayRef = Arc::new(path_builder.finish()); + let field_builders: Vec> = vec![ + Box::new(num_rows_builder), + Box::new(num_batches_builder), + Box::new(num_bytes_builder), + ]; + let mut stats_builder = StructBuilder::new( + PartitionStats::default().arrow_struct_fields(), + field_builders, + ); + for _ in 0..num_writers { + stats_builder.append(true)?; + } + let stats = Arc::new(stats_builder.finish()); + + // build result batch containing metadata + let schema = result_schema(); + let batch = + RecordBatch::try_new(schema.clone(), vec![partition_num, path, stats]) + .map_err(DataFusionError::ArrowError)?; + + debug!("RESULTS METADATA:\n{:?}", batch); + + Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?)) + } fn metrics(&self) -> HashMap { let mut metrics = HashMap::new(); + metrics.insert("inputRows".to_owned(), (*self.metrics.input_rows).clone()); + metrics.insert("outputRows".to_owned(), (*self.metrics.output_rows).clone()); metrics.insert("writeTime".to_owned(), (*self.metrics.write_time).clone()); metrics } @@ -454,13 +490,13 @@ mod tests { let file0 = path.value(0); assert!( - file0.ends_with("/jobOne/1/0/data.arrow") - || file0.ends_with("\\jobOne\\1\\0\\data.arrow") + file0.ends_with("/jobOne/1/0/data-0.arrow") + || file0.ends_with("\\jobOne\\1\\0\\data-0.arrow") ); let file1 = path.value(1); assert!( - file1.ends_with("/jobOne/1/1/data.arrow") - || file1.ends_with("\\jobOne\\1\\1\\data.arrow") + file1.ends_with("/jobOne/1/1/data-0.arrow") + || file1.ends_with("\\jobOne\\1\\1\\data-0.arrow") ); let stats = batch.columns()[2] diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index fa2c1b890e84..b9c0236f6e00 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -96,9 +96,9 @@ impl From for ExecutorMeta { /// Summary of executed partition #[derive(Debug, Copy, Clone)] pub struct PartitionStats { - num_rows: Option, - num_batches: Option, - num_bytes: Option, + pub(crate) num_rows: Option, + pub(crate) num_batches: Option, + pub(crate) num_bytes: Option, } impl Default for PartitionStats { diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs index 17f6e4dd5d35..b65b83bbaf48 100644 --- a/ballista/rust/executor/src/execution_loop.rs +++ b/ballista/rust/executor/src/execution_loop.rs @@ -27,7 +27,8 @@ use tonic::transport::Channel; use ballista_core::serde::protobuf::ExecutorRegistration; use ballista_core::serde::protobuf::{ self, scheduler_grpc_client::SchedulerGrpcClient, task_status, FailedTask, - PartitionId, PollWorkParams, PollWorkResult, TaskDefinition, TaskStatus, + PartitionId, PollWorkParams, PollWorkResult, ShuffleWritePartition, TaskDefinition, + TaskStatus, }; use protobuf::CompletedTask; @@ -110,7 +111,7 @@ async fn run_received_tasks( tokio::spawn(async move { let execution_result = executor - .execute_partition( + .execute_shuffle_write( task_id.job_id.clone(), task_id.stage_id as usize, task_id.partition_id as usize, @@ -121,7 +122,7 @@ async fn run_received_tasks( debug!("Statistics: {:?}", execution_result); available_tasks_slots.fetch_add(1, Ordering::SeqCst); let _ = task_status_sender.send(as_task_status( - execution_result.map(|_| ()), + execution_result, executor_id, task_id, )); 
@@ -129,18 +130,19 @@ async fn run_received_tasks( } fn as_task_status( - execution_result: ballista_core::error::Result<()>, + execution_result: ballista_core::error::Result>, executor_id: String, task_id: PartitionId, ) -> TaskStatus { match execution_result { - Ok(_) => { + Ok(partitions) => { info!("Task {:?} finished", task_id); TaskStatus { partition_id: Some(task_id), status: Some(task_status::Status::Completed(CompletedTask { executor_id, + partitions, })), } } diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs index 4a75448b5f06..cbf3eb040ff6 100644 --- a/ballista/rust/executor/src/executor.rs +++ b/ballista/rust/executor/src/executor.rs @@ -21,8 +21,7 @@ use std::sync::Arc; use ballista_core::error::BallistaError; use ballista_core::execution_plans::ShuffleWriterExec; -use ballista_core::utils; -use datafusion::arrow::record_batch::RecordBatch; +use ballista_core::serde::protobuf; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::ExecutionPlan; @@ -45,22 +44,26 @@ impl Executor { /// Execute one partition of a query stage and persist the result to disk in IPC format. On /// success, return a RecordBatch containing metadata about the results, including path /// and statistics. - pub async fn execute_partition( + pub async fn execute_shuffle_write( &self, job_id: String, stage_id: usize, part: usize, plan: Arc, - ) -> Result { + ) -> Result, BallistaError> { + // TODO to enable shuffling we need to specify the output partitioning here and + // until we do that there is always a single output partition + // see https://github.com/apache/arrow-datafusion/issues/707 + let shuffle_output_partitioning = None; + let exec = ShuffleWriterExec::try_new( job_id, stage_id, plan, self.work_dir.clone(), - None, + shuffle_output_partitioning, )?; - let mut stream = exec.execute(part).await?; - let batches = utils::collect_stream(&mut stream).await?; + let partitions = exec.execute_shuffle_write(part).await?; println!( "=== Physical plan with metrics ===\n{}\n", @@ -69,9 +72,7 @@ impl Executor { .to_string() ); - // the output should be a single batch containing metadata (path and statistics) - assert!(batches.len() == 1); - Ok(batches[0].clone()) + Ok(partitions) } pub fn work_dir(&self) -> &str { diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index 3ddbced22684..9d8663a78b2a 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -254,9 +254,9 @@ impl SchedulerState { executors: &[ExecutorMeta], ) -> Result { let executor_id: &str = match &task_status.status { - Some(task_status::Status::Completed(CompletedTask { executor_id })) => { - executor_id - } + Some(task_status::Status::Completed(CompletedTask { + executor_id, .. + })) => executor_id, Some(task_status::Status::Running(RunningTask { executor_id })) => { executor_id } @@ -317,9 +317,13 @@ impl SchedulerState { if task_is_dead { continue 'tasks; } else if let Some(task_status::Status::Completed( - CompletedTask { executor_id }, + CompletedTask { executor_id, .. 
}, )) = &referenced_task.status { + //TODO: this logic is incorrect and is ignoring the shuffle write + // meta-data in the CompletedTask + // see https://github.com/apache/arrow-datafusion/issues/707 + let empty = vec![]; let locations = partition_locations .entry(unresolved_shuffle.stage_id) @@ -452,9 +456,10 @@ impl SchedulerState { let mut job_status = statuses .iter() .map(|status| match &status.status { - Some(task_status::Status::Completed(CompletedTask { executor_id })) => { - Ok((status, executor_id)) - } + Some(task_status::Status::Completed(CompletedTask { + executor_id, + .. + })) => Ok((status, executor_id)), _ => Err(BallistaError::General("Task not completed".to_string())), }) .collect::>>() @@ -462,12 +467,18 @@ impl SchedulerState { .map(|info| { let partition_location = info .into_iter() - .map(|(status, execution_id)| PartitionLocation { - partition_id: status.partition_id.to_owned(), - executor_meta: executors - .get(execution_id) - .map(|e| e.clone().into()), - partition_stats: None, + .map(|(status, execution_id)| { + //TODO: this logic is incorrect and is ignoring the shuffle write + // meta-data in the CompletedTask + // see https://github.com/apache/arrow-datafusion/issues/707 + + PartitionLocation { + partition_id: status.partition_id.to_owned(), + executor_meta: executors + .get(execution_id) + .map(|e| e.clone().into()), + partition_stats: None, + } }) .collect(); job_status::Status::Completed(CompletedJob { partition_location }) @@ -745,6 +756,7 @@ mod test { let meta = TaskStatus { status: Some(task_status::Status::Completed(CompletedTask { executor_id: "".to_owned(), + partitions: vec![], })), partition_id: Some(PartitionId { job_id: job_id.to_owned(), @@ -784,6 +796,7 @@ mod test { let meta = TaskStatus { status: Some(task_status::Status::Completed(CompletedTask { executor_id: "".to_owned(), + partitions: vec![], })), partition_id: Some(PartitionId { job_id: job_id.to_owned(), @@ -821,6 +834,7 @@ mod test { let meta = TaskStatus { status: Some(task_status::Status::Completed(CompletedTask { executor_id: "".to_owned(), + partitions: vec![], })), partition_id: Some(PartitionId { job_id: job_id.to_owned(), @@ -832,6 +846,7 @@ mod test { let meta = TaskStatus { status: Some(task_status::Status::Completed(CompletedTask { executor_id: "".to_owned(), + partitions: vec![], })), partition_id: Some(PartitionId { job_id: job_id.to_owned(), @@ -863,6 +878,7 @@ mod test { let meta = TaskStatus { status: Some(task_status::Status::Completed(CompletedTask { executor_id: "".to_owned(), + partitions: vec![], })), partition_id: Some(PartitionId { job_id: job_id.to_owned(), @@ -874,6 +890,7 @@ mod test { let meta = TaskStatus { status: Some(task_status::Status::Completed(CompletedTask { executor_id: "".to_owned(), + partitions: vec![], })), partition_id: Some(PartitionId { job_id: job_id.to_owned(), @@ -905,6 +922,7 @@ mod test { let meta = TaskStatus { status: Some(task_status::Status::Completed(CompletedTask { executor_id: "".to_owned(), + partitions: vec![], })), partition_id: Some(PartitionId { job_id: job_id.to_owned(), From bb64c6285a0d5533811fb95681bc6fc61952be35 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 17 Jul 2021 09:42:15 -0600 Subject: [PATCH 02/10] Bug fix --- ballista/rust/core/src/execution_plans/shuffle_writer.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index b4316d9c5158..11d0ce2d1128 100644 --- 
a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -297,10 +297,11 @@ impl ExecutionPlan for ShuffleWriterExec { } fn output_partitioning(&self) -> Partitioning { - match &self.shuffle_output_partitioning { - Some(p) => p.clone(), - _ => Partitioning::UnknownPartitioning(1), - } + // This operator needs to be executed once for each *input* partition and there + // isn't really a mechanism yet in DataFusion to support this use case so we report + // the input partitioning as the output partitioning here. The executor reports + // output partition meta data back to the scheduler. + self.plan.output_partitioning() } fn children(&self) -> Vec> { From 45a4aa703e58b4ada873948bfccd06ef92f4a7d5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 17 Jul 2021 10:47:11 -0600 Subject: [PATCH 03/10] Save --- ballista/rust/core/proto/ballista.proto | 1 + .../src/execution_plans/shuffle_writer.rs | 5 + .../core/src/serde/physical_plan/to_proto.rs | 38 +++--- .../core/src/serde/scheduler/from_proto.rs | 1 + ballista/rust/core/src/serde/scheduler/mod.rs | 1 + .../rust/core/src/serde/scheduler/to_proto.rs | 1 + ballista/rust/scheduler/src/planner.rs | 44 +++++-- ballista/rust/scheduler/src/state/mod.rs | 111 +++++++++++------- 8 files changed, 130 insertions(+), 72 deletions(-) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 28646583354a..50bd901f145d 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -721,6 +721,7 @@ message PartitionLocation { PartitionId partition_id = 1; ExecutorMetadata executor_meta = 2; PartitionStats partition_stats = 3; + string path = 4; } // Unique identifier for a materialized partition of data diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index 11d0ce2d1128..47bf2a25f636 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -122,6 +122,11 @@ impl ShuffleWriterExec { self.stage_id } + /// Get the true output partitioning + pub fn shuffle_output_partitioning(&self) -> Option<&Partitioning> { + self.shuffle_output_partitioning.as_ref() + } + pub async fn execute_shuffle_write( &self, input_partition: usize, diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index 0429efb7c017..fa35eb48d4fa 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -361,29 +361,33 @@ impl TryInto for Arc { } else if let Some(exec) = plan.downcast_ref::() { let input: protobuf::PhysicalPlanNode = exec.children()[0].to_owned().try_into()?; + // note that we use shuffle_output_partitioning() rather than output_partitioning() + // to get the true output partitioning + let output_partitioning = match exec.shuffle_output_partitioning() { + Some(Partitioning::Hash(exprs, partition_count)) => { + Some(protobuf::PhysicalHashRepartition { + hash_expr: exprs + .iter() + .map(|expr| expr.clone().try_into()) + .collect::, BallistaError>>()?, + partition_count: *partition_count as u64, + }) + } + None => None, + other => { + return Err(BallistaError::General(format!( + "physical_plan::to_proto() invalid partitioning for ShuffleWriterExec: {:?}", + other + ))) + } + }; Ok(protobuf::PhysicalPlanNode { physical_plan_type: 
Some(PhysicalPlanType::ShuffleWriter(Box::new( protobuf::ShuffleWriterExecNode { job_id: exec.job_id().to_string(), stage_id: exec.stage_id() as u32, input: Some(Box::new(input)), - output_partitioning: match exec.output_partitioning() { - Partitioning::Hash(exprs, partition_count) => { - Some(protobuf::PhysicalHashRepartition { - hash_expr: exprs - .iter() - .map(|expr| expr.clone().try_into()) - .collect::, BallistaError>>()?, - partition_count: partition_count as u64, - }) - } - other => { - return Err(BallistaError::General(format!( - "physical_plan::to_proto() invalid partitioning for ShuffleWriterExec: {:?}", - other - ))) - } - }, + output_partitioning, }, ))), }) diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs b/ballista/rust/core/src/serde/scheduler/from_proto.rs index 73f8f53956de..4f9c9bc8877e 100644 --- a/ballista/rust/core/src/serde/scheduler/from_proto.rs +++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs @@ -102,6 +102,7 @@ impl TryInto for protobuf::PartitionLocation { ) })? .into(), + path: self.path, }) } } diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index b9c0236f6e00..eeddfbbb41f3 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -62,6 +62,7 @@ pub struct PartitionLocation { pub partition_id: PartitionId, pub executor_meta: ExecutorMeta, pub partition_stats: PartitionStats, + pub path: String, } /// Meta-data for an executor, used when fetching shuffle partitions from other executors diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs b/ballista/rust/core/src/serde/scheduler/to_proto.rs index c3f2046305cf..a8d6ada87278 100644 --- a/ballista/rust/core/src/serde/scheduler/to_proto.rs +++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs @@ -70,6 +70,7 @@ impl TryInto for PartitionLocation { partition_id: Some(self.partition_id.into()), executor_meta: Some(self.executor_meta.into()), partition_stats: Some(self.partition_stats.into()), + path: self.path.to_string(), }) } } diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index 3f90da238b7f..11f5c994fd52 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -108,6 +108,10 @@ impl DistributedPlanner { let query_stage = create_shuffle_writer( job_id, self.next_stage_id(), + //TODO should be children[0].clone() so that we replace this + // with an UnresolvedShuffleExec instead of just executing this + // part of the plan again + // see https://github.com/apache/arrow-datafusion/issues/707 coalesce.children()[0].clone(), None, )?; @@ -127,6 +131,10 @@ impl DistributedPlanner { let query_stage = create_shuffle_writer( job_id, self.next_stage_id(), + //TODO should be children[0].clone() so that we replace this + // with an UnresolvedShuffleExec instead of just executing this + // part of the plan again + // see https://github.com/apache/arrow-datafusion/issues/707 repart.children()[0].clone(), Some(repart.partitioning().to_owned()), )?; @@ -158,7 +166,7 @@ impl DistributedPlanner { pub fn remove_unresolved_shuffles( stage: &dyn ExecutionPlan, - partition_locations: &HashMap>>, + partition_locations: &HashMap>>, ) -> Result> { let mut new_children: Vec> = vec![]; for child in stage.children() { @@ -166,16 +174,30 @@ pub fn remove_unresolved_shuffles( child.as_any().downcast_ref::() { let mut relevant_locations = vec![]; - relevant_locations.append( - &mut partition_locations - 
.get(&unresolved_shuffle.stage_id) - .ok_or_else(|| { - BallistaError::General( - "Missing partition location. Could not remove unresolved shuffles" - .to_owned(), - ) - })? - .clone(), + let p = partition_locations + .get(&unresolved_shuffle.stage_id) + .ok_or_else(|| { + BallistaError::General( + "Missing partition location. Could not remove unresolved shuffles" + .to_owned(), + ) + })? + .clone(); + + for i in 0..unresolved_shuffle.partition_count { + if let Some(x) = p.get(&i) { + relevant_locations.push(x.to_owned()); + } else { + relevant_locations.push(vec![]); + } + } + println!( + "create shuffle reader with {:?}", + relevant_locations + .iter() + .map(|c| format!("{:?}", c)) + .collect::>() + .join("\n") ); new_children.push(Arc::new(ShuffleReaderExec::try_new( relevant_locations, diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index 9d8663a78b2a..a4ae59e1dfda 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -27,16 +27,13 @@ use prost::Message; use tokio::sync::OwnedMutexGuard; use ballista_core::serde::protobuf::{ - job_status, task_status, CompletedJob, CompletedTask, ExecutorHeartbeat, + self, job_status, task_status, CompletedJob, CompletedTask, ExecutorHeartbeat, ExecutorMetadata, FailedJob, FailedTask, JobStatus, PhysicalPlanNode, RunningJob, RunningTask, TaskStatus, }; use ballista_core::serde::scheduler::PartitionStats; use ballista_core::{error::BallistaError, serde::scheduler::ExecutorMeta}; -use ballista_core::{ - error::Result, execution_plans::UnresolvedShuffleExec, - serde::protobuf::PartitionLocation, -}; +use ballista_core::{error::Result, execution_plans::UnresolvedShuffleExec}; use super::planner::remove_unresolved_shuffles; @@ -298,8 +295,11 @@ impl SchedulerState { // Let's try to resolve any unresolved shuffles we find let unresolved_shuffles = find_unresolved_shuffles(&plan)?; let mut partition_locations: HashMap< - usize, - Vec>, + usize, // stage id + HashMap< + usize, // shuffle input partition id + Vec, // shuffle output partitions + >, > = HashMap::new(); for unresolved_shuffle in unresolved_shuffles { for partition_id in 0..unresolved_shuffle.partition_count { @@ -317,34 +317,49 @@ impl SchedulerState { if task_is_dead { continue 'tasks; } else if let Some(task_status::Status::Completed( - CompletedTask { executor_id, .. 
}, + CompletedTask { + executor_id, + partitions, + }, )) = &referenced_task.status { - //TODO: this logic is incorrect and is ignoring the shuffle write - // meta-data in the CompletedTask - // see https://github.com/apache/arrow-datafusion/issues/707 - - let empty = vec![]; let locations = partition_locations .entry(unresolved_shuffle.stage_id) - .or_insert(empty); + .or_insert_with(HashMap::new); let executor_meta = executors .iter() .find(|exec| exec.id == *executor_id) .unwrap() .clone(); - locations.push(vec![ - ballista_core::serde::scheduler::PartitionLocation { - partition_id: - ballista_core::serde::scheduler::PartitionId { - job_id: partition.job_id.clone(), - stage_id: unresolved_shuffle.stage_id, - partition_id, - }, - executor_meta, - partition_stats: PartitionStats::default(), - }, - ]); + + let temp = + locations.entry(partition_id).or_insert_with(Vec::new); + for p in partitions { + let executor_meta = executor_meta.clone(); + let partition_location = + ballista_core::serde::scheduler::PartitionLocation { + partition_id: + ballista_core::serde::scheduler::PartitionId { + job_id: partition.job_id.clone(), + stage_id: unresolved_shuffle.stage_id, + partition_id, + }, + executor_meta, + partition_stats: PartitionStats::new( + Some(p.num_rows), + Some(p.num_batches), + Some(p.num_bytes), + ), + path: p.path.clone(), + }; + info!( + "Scheduler storing stage {} partition {} path: {}", + unresolved_shuffle.stage_id, + partition_id, + partition_location.path + ); + temp.push(partition_location); + } } else { continue 'tasks; } @@ -458,29 +473,37 @@ impl SchedulerState { .map(|status| match &status.status { Some(task_status::Status::Completed(CompletedTask { executor_id, - .. - })) => Ok((status, executor_id)), + partitions, + })) => Ok((status, executor_id, partitions)), _ => Err(BallistaError::General("Task not completed".to_string())), }) .collect::>>() .ok() .map(|info| { - let partition_location = info - .into_iter() - .map(|(status, execution_id)| { - //TODO: this logic is incorrect and is ignoring the shuffle write - // meta-data in the CompletedTask - // see https://github.com/apache/arrow-datafusion/issues/707 - - PartitionLocation { - partition_id: status.partition_id.to_owned(), - executor_meta: executors - .get(execution_id) - .map(|e| e.clone().into()), - partition_stats: None, - } - }) - .collect(); + let mut partition_location = vec![]; + for (status, executor_id, partitions) in info { + let input_partition_id = status.partition_id.as_ref().unwrap(); //TODO unwrap + let executor_meta = + executors.get(executor_id).map(|e| e.clone().into()); + for shuffle_write_partition in partitions { + let shuffle_input_partition_id = Some(protobuf::PartitionId { + job_id: input_partition_id.job_id.clone(), + stage_id: input_partition_id.stage_id, + partition_id: input_partition_id.partition_id, + }); + partition_location.push(protobuf::PartitionLocation { + partition_id: shuffle_input_partition_id.clone(), + executor_meta: executor_meta.clone(), + partition_stats: Some(protobuf::PartitionStats { + num_batches: shuffle_write_partition.num_batches as i64, + num_rows: shuffle_write_partition.num_rows as i64, + num_bytes: shuffle_write_partition.num_bytes as i64, + column_stats: vec![], + }), + path: shuffle_write_partition.path.clone(), + }); + } + } job_status::Status::Completed(CompletedJob { partition_location }) }); From 6a9476a6c71fe5e9aa7351b70bbbd28656645b99 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 17 Jul 2021 11:05:05 -0600 Subject: [PATCH 04/10] save --- 
ballista/rust/core/src/serde/scheduler/to_proto.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs b/ballista/rust/core/src/serde/scheduler/to_proto.rs index a8d6ada87278..57d4f615c5f8 100644 --- a/ballista/rust/core/src/serde/scheduler/to_proto.rs +++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs @@ -70,7 +70,7 @@ impl TryInto for PartitionLocation { partition_id: Some(self.partition_id.into()), executor_meta: Some(self.executor_meta.into()), partition_stats: Some(self.partition_stats.into()), - path: self.path.to_string(), + path: self.path, }) } } From c775d2f32dff090314047cebbc24f66fe8cc3764 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 18 Jul 2021 14:19:57 -0600 Subject: [PATCH 05/10] Ballista shuffle mechanism now works as intended --- ballista/rust/client/src/context.rs | 1 + ballista/rust/core/proto/ballista.proto | 16 +++++- ballista/rust/core/src/client.rs | 9 +++- .../src/execution_plans/shuffle_reader.rs | 34 ++++++------ .../src/execution_plans/shuffle_writer.rs | 11 ++++ .../src/execution_plans/unresolved_shuffle.rs | 18 +++++-- .../src/serde/physical_plan/from_proto.rs | 43 +++++++++------ .../core/src/serde/physical_plan/to_proto.rs | 3 +- .../core/src/serde/scheduler/from_proto.rs | 9 ++-- ballista/rust/core/src/serde/scheduler/mod.rs | 12 ++++- .../rust/core/src/serde/scheduler/to_proto.rs | 41 +++++++++++++- ballista/rust/executor/src/execution_loop.rs | 23 ++++++-- ballista/rust/executor/src/executor.rs | 25 ++++++--- ballista/rust/executor/src/flight_service.rs | 21 +++----- ballista/rust/scheduler/src/lib.rs | 48 ++++++++++++----- ballista/rust/scheduler/src/planner.rs | 54 ++++++++++--------- ballista/rust/scheduler/src/state/mod.rs | 40 ++++++++------ 17 files changed, 284 insertions(+), 124 deletions(-) diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index b8210cbc2626..26087f8e6693 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -223,6 +223,7 @@ impl BallistaContext { &partition_id.job_id, partition_id.stage_id as usize, partition_id.partition_id as usize, + &location.path, ) .await .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?) 
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 50bd901f145d..9dbce81c21f1 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -544,7 +544,8 @@ message PhysicalNegativeNode { message UnresolvedShuffleExecNode { uint32 stage_id = 1; Schema schema = 2; - uint32 partition_count = 3; + uint32 input_partition_count = 3; + uint32 output_partition_count = 4; } message FilterExecNode { @@ -700,7 +701,7 @@ message Action { oneof ActionType { // Fetch a partition from an executor - PartitionId fetch_partition = 3; + FetchPartition fetch_partition = 3; } // configuration settings @@ -714,6 +715,15 @@ message ExecutePartition { PhysicalPlanNode plan = 4; // The task could need to read partitions from other executors repeated PartitionLocation partition_location = 5; + // Output partition for shuffle writer + PhysicalHashRepartition output_partitioning = 6; +} + +message FetchPartition { + string job_id = 1; + uint32 stage_id = 2; + uint32 partition_id = 3; + string path = 4; } // Mapping from partition id to executor id @@ -809,6 +819,8 @@ message PollWorkParams { message TaskDefinition { PartitionId task_id = 1; PhysicalPlanNode plan = 2; + // Output partition for shuffle writer + PhysicalHashRepartition output_partitioning = 3; } message PollWorkResult { diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs index 2df414578355..26c8d22b405d 100644 --- a/ballista/rust/core/src/client.rs +++ b/ballista/rust/core/src/client.rs @@ -81,9 +81,14 @@ impl BallistaClient { job_id: &str, stage_id: usize, partition_id: usize, + path: &str, ) -> Result { - let action = - Action::FetchPartition(PartitionId::new(job_id, stage_id, partition_id)); + let action = Action::FetchPartition { + job_id: job_id.to_string(), + stage_id, + partition_id, + path: path.to_owned(), + }; self.execute_action(&action).await } diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs index db03d3ddf080..c1e66257b776 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs @@ -76,6 +76,7 @@ impl ExecutionPlan for ShuffleReaderExec { } fn output_partitioning(&self) -> Partitioning { + // TODO partitioning may be known and could be populated here? Partitioning::UnknownPartitioning(self.partition.len()) } @@ -123,24 +124,26 @@ impl ExecutionPlan for ShuffleReaderExec { let loc_str = self .partition .iter() - .map(|x| { - x.iter() - .map(|l| { - format!( - "[executor={} part={}:{}:{} stats={}]", - l.executor_meta.id, - l.partition_id.job_id, - l.partition_id.stage_id, - l.partition_id.partition_id, - l.partition_stats - ) - }) - .collect::>() - .join(",") + .enumerate() + .map(|(partition_id, locations)| { + format!( + "[partition={} paths={}]", + partition_id, + locations + .iter() + .map(|l| l.path.clone()) + .collect::>() + .join(",") + ) }) .collect::>() .join(", "); - write!(f, "ShuffleReaderExec: partition_locations={}", loc_str) + write!( + f, + "ShuffleReaderExec: partition_locations({})={}", + self.partition.len(), + loc_str + ) } } } @@ -166,6 +169,7 @@ async fn fetch_partition( &partition_id.job_id, partition_id.stage_id as usize, partition_id.partition_id as usize, + &location.path, ) .await .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?) 
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index 47bf2a25f636..8081dab36ab5 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -141,6 +141,7 @@ impl ShuffleWriterExec { match &self.shuffle_output_partitioning { None => { + let start = Instant::now(); path.push(&format!("{}", input_partition)); std::fs::create_dir_all(&path)?; path.push("data.arrow"); @@ -156,6 +157,14 @@ impl ShuffleWriterExec { .await .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; + self.metrics + .input_rows + .add(stats.num_rows.unwrap_or(0) as usize); + self.metrics + .output_rows + .add(stats.num_rows.unwrap_or(0) as usize); + self.metrics.write_time.add_elapsed(start); + info!( "Executed partition {} in {} seconds. Statistics: {}", input_partition, @@ -227,6 +236,8 @@ impl ShuffleWriterExec { RecordBatch::try_new(input_batch.schema(), columns)?; // write non-empty batch out + + //TODO optimize so we don't write or fetch empty partitions //if output_batch.num_rows() > 0 { let start = Instant::now(); match &mut writers[output_partition] { diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs index cb351eec561a..c7b171089aef 100644 --- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs +++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs @@ -43,17 +43,26 @@ pub struct UnresolvedShuffleExec { // The schema this node will have once it is replaced with a ShuffleReaderExec pub schema: SchemaRef, + // The number of shuffle writer partition tasks that will produce the partitions + pub input_partition_count: usize, + // The partition count this node will have once it is replaced with a ShuffleReaderExec - pub partition_count: usize, + pub output_partition_count: usize, } impl UnresolvedShuffleExec { /// Create a new UnresolvedShuffleExec - pub fn new(stage_id: usize, schema: SchemaRef, partition_count: usize) -> Self { + pub fn new( + stage_id: usize, + schema: SchemaRef, + input_partition_count: usize, + output_partition_count: usize, + ) -> Self { Self { stage_id, schema, - partition_count, + input_partition_count, + output_partition_count, } } } @@ -69,7 +78,8 @@ impl ExecutionPlan for UnresolvedShuffleExec { } fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(self.partition_count) + //TODO the output partition is known and should be populated here! 
+ Partitioning::UnknownPartitioning(self.output_partition_count) } fn children(&self) -> Vec> { diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 4b0a9844773c..509044b3d1ba 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -376,21 +376,9 @@ impl TryInto> for &protobuf::PhysicalPlanNode { let input: Arc = convert_box_required!(shuffle_writer.input)?; - let output_partitioning = match &shuffle_writer.output_partitioning { - Some(hash_part) => { - let expr = hash_part - .hash_expr - .iter() - .map(|e| e.try_into()) - .collect::>, _>>()?; - - Some(Partitioning::Hash( - expr, - hash_part.partition_count.try_into().unwrap(), - )) - } - None => None, - }; + let output_partitioning = parse_protobuf_hash_partitioning( + shuffle_writer.output_partitioning.as_ref(), + )?; Ok(Arc::new(ShuffleWriterExec::try_new( shuffle_writer.job_id.clone(), @@ -466,7 +454,10 @@ impl TryInto> for &protobuf::PhysicalPlanNode { Ok(Arc::new(UnresolvedShuffleExec { stage_id: unresolved_shuffle.stage_id as usize, schema, - partition_count: unresolved_shuffle.partition_count as usize, + input_partition_count: unresolved_shuffle.input_partition_count + as usize, + output_partition_count: unresolved_shuffle.output_partition_count + as usize, })) } } @@ -680,3 +671,23 @@ impl TryFrom<&protobuf::physical_window_expr_node::WindowFunction> for WindowFun } } } + +pub fn parse_protobuf_hash_partitioning( + partitioning: Option<&protobuf::PhysicalHashRepartition>, +) -> Result, BallistaError> { + match partitioning { + Some(hash_part) => { + let expr = hash_part + .hash_expr + .iter() + .map(|e| e.try_into()) + .collect::>, _>>()?; + + Ok(Some(Partitioning::Hash( + expr, + hash_part.partition_count.try_into().unwrap(), + ))) + } + None => Ok(None), + } +} diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index fa35eb48d4fa..ec5ec7cb7aff 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -397,7 +397,8 @@ impl TryInto for Arc { protobuf::UnresolvedShuffleExecNode { stage_id: exec.stage_id as u32, schema: Some(exec.schema().as_ref().into()), - partition_count: exec.partition_count as u32, + input_partition_count: exec.input_partition_count as u32, + output_partition_count: exec.output_partition_count as u32, }, )), }) diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs b/ballista/rust/core/src/serde/scheduler/from_proto.rs index 4f9c9bc8877e..8d4e279395fa 100644 --- a/ballista/rust/core/src/serde/scheduler/from_proto.rs +++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs @@ -32,9 +32,12 @@ impl TryInto for protobuf::Action { fn try_into(self) -> Result { match self.action_type { - Some(ActionType::FetchPartition(partition)) => { - Ok(Action::FetchPartition(partition.try_into()?)) - } + Some(ActionType::FetchPartition(fetch)) => Ok(Action::FetchPartition { + job_id: fetch.job_id, + stage_id: fetch.stage_id as usize, + partition_id: fetch.partition_id as usize, + path: fetch.path, + }), _ => Err(BallistaError::General( "scheduler::from_proto(Action) invalid or missing action".to_owned(), )), diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index eeddfbbb41f3..a20d955f28b2 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ 
b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -23,6 +23,7 @@ use datafusion::arrow::array::{ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::Partitioning; use serde::Serialize; use uuid::Uuid; @@ -36,7 +37,12 @@ pub mod to_proto; #[derive(Debug, Clone)] pub enum Action { /// Collect a shuffle partition - FetchPartition(PartitionId), + FetchPartition { + job_id: String, + stage_id: usize, + partition_id: usize, + path: String, + }, } /// Unique identifier for the output partition of an operator. @@ -223,6 +229,8 @@ pub struct ExecutePartition { pub plan: Arc, /// Location of shuffle partitions that this query stage may depend on pub shuffle_locations: HashMap, + /// Output partitioning for shuffle writes + pub output_partitioning: Option, } impl ExecutePartition { @@ -232,6 +240,7 @@ impl ExecutePartition { partition_id: Vec, plan: Arc, shuffle_locations: HashMap, + output_partitioning: Option, ) -> Self { Self { job_id, @@ -239,6 +248,7 @@ impl ExecutePartition { partition_id, plan, shuffle_locations, + output_partitioning, } } diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs b/ballista/rust/core/src/serde/scheduler/to_proto.rs index 57d4f615c5f8..bdc88d0b99a2 100644 --- a/ballista/rust/core/src/serde/scheduler/to_proto.rs +++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs @@ -23,14 +23,25 @@ use crate::serde::protobuf::action::ActionType; use crate::serde::scheduler::{ Action, ExecutePartition, PartitionId, PartitionLocation, PartitionStats, }; +use datafusion::physical_plan::Partitioning; impl TryInto for Action { type Error = BallistaError; fn try_into(self) -> Result { match self { - Action::FetchPartition(partition_id) => Ok(protobuf::Action { - action_type: Some(ActionType::FetchPartition(partition_id.into())), + Action::FetchPartition { + job_id, + stage_id, + partition_id, + path, + } => Ok(protobuf::Action { + action_type: Some(ActionType::FetchPartition(protobuf::FetchPartition { + job_id, + stage_id: stage_id as u32, + partition_id: partition_id as u32, + path, + })), settings: vec![], }), } @@ -47,6 +58,9 @@ impl TryInto for ExecutePartition { partition_id: self.partition_id.iter().map(|n| *n as u32).collect(), plan: Some(self.plan.try_into()?), partition_location: vec![], + output_partitioning: hash_partitioning_to_proto( + self.output_partitioning.as_ref(), + )?, }) } } @@ -87,3 +101,26 @@ impl Into for PartitionStats { } } } + +pub fn hash_partitioning_to_proto( + output_partitioning: Option<&Partitioning>, +) -> Result, BallistaError> { + match output_partitioning { + Some(Partitioning::Hash(exprs, partition_count)) => { + Ok(Some(protobuf::PhysicalHashRepartition { + hash_expr: exprs + .iter() + .map(|expr| expr.clone().try_into()) + .collect::, BallistaError>>()?, + partition_count: *partition_count as u64, + })) + } + None => Ok(None), + other => { + return Err(BallistaError::General(format!( + "scheduler::to_proto() invalid partitioning for ExecutePartition: {:?}", + other + ))) + } + } +} diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs index b65b83bbaf48..4d12dfc1c755 100644 --- a/ballista/rust/executor/src/execution_loop.rs +++ b/ballista/rust/executor/src/execution_loop.rs @@ -33,6 +33,8 @@ use ballista_core::serde::protobuf::{ use protobuf::CompletedTask; use crate::executor::Executor; +use ballista_core::error::BallistaError; +use 
ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning; pub async fn poll_loop( mut scheduler: SchedulerGrpcClient, @@ -70,15 +72,23 @@ pub async fn poll_loop( match poll_work_result { Ok(result) => { if let Some(task) = result.into_inner().task { - run_received_tasks( + match run_received_tasks( executor.clone(), executor_meta.id.clone(), available_tasks_slots.clone(), task_status_sender, task, ) - .await; - active_job = true; + .await + { + Ok(_) => { + active_job = true; + } + Err(e) => { + warn!("Failed to run task: {:?}", e); + active_job = false; + } + } } else { active_job = false; } @@ -99,7 +109,7 @@ async fn run_received_tasks( available_tasks_slots: Arc, task_status_sender: Sender, task: TaskDefinition, -) { +) -> Result<(), BallistaError> { let task_id = task.task_id.unwrap(); let task_id_log = format!( "{}/{}/{}", @@ -108,6 +118,8 @@ async fn run_received_tasks( info!("Received task {}", task_id_log); available_tasks_slots.fetch_sub(1, Ordering::SeqCst); let plan: Arc = (&task.plan.unwrap()).try_into().unwrap(); + let shuffle_output_partitioning = + parse_protobuf_hash_partitioning(task.output_partitioning.as_ref())?; tokio::spawn(async move { let execution_result = executor @@ -116,6 +128,7 @@ async fn run_received_tasks( task_id.stage_id as usize, task_id.partition_id as usize, plan, + shuffle_output_partitioning, ) .await; info!("Done with task {}", task_id_log); @@ -127,6 +140,8 @@ async fn run_received_tasks( task_id, )); }); + + Ok(()) } fn as_task_status( diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs index cbf3eb040ff6..8e94f3eb19c2 100644 --- a/ballista/rust/executor/src/executor.rs +++ b/ballista/rust/executor/src/executor.rs @@ -23,7 +23,7 @@ use ballista_core::error::BallistaError; use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::protobuf; use datafusion::physical_plan::display::DisplayableExecutionPlan; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::{ExecutionPlan, Partitioning}; /// Ballista executor pub struct Executor { @@ -50,23 +50,32 @@ impl Executor { stage_id: usize, part: usize, plan: Arc, + _shuffle_output_partitioning: Option, ) -> Result, BallistaError> { - // TODO to enable shuffling we need to specify the output partitioning here and - // until we do that there is always a single output partition - // see https://github.com/apache/arrow-datafusion/issues/707 - let shuffle_output_partitioning = None; + let shuffle_output_partitioning = if let Some(shuffle_writer) = + plan.as_any().downcast_ref::() + { + shuffle_writer + .shuffle_output_partitioning() + .map(|x| x.to_owned()) + } else { + panic!() + }; let exec = ShuffleWriterExec::try_new( - job_id, + job_id.clone(), stage_id, - plan, + plan.children()[0].clone(), //TODO refactor to avoid this terrible hack to remove nested shuffle writers self.work_dir.clone(), shuffle_output_partitioning, )?; let partitions = exec.execute_shuffle_write(part).await?; println!( - "=== Physical plan with metrics ===\n{}\n", + "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n", + job_id, + stage_id, + part, DisplayableExecutionPlan::with_metrics(&exec) .indent() .to_string() diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs index 9a3f2d872d52..73dd1a946d55 100644 --- a/ballista/rust/executor/src/flight_service.rs +++ b/ballista/rust/executor/src/flight_service.rs @@ -18,7 +18,6 @@ //! 
Implementation of the Apache Arrow Flight protocol that wraps an executor. use std::fs::File; -use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; @@ -82,24 +81,13 @@ impl FlightService for BallistaFlightService { request: Request, ) -> Result, Status> { let ticket = request.into_inner(); - info!("Received do_get request"); let action = decode_protobuf(&ticket.ticket).map_err(|e| from_ballista_err(&e))?; match &action { - BallistaAction::FetchPartition(partition_id) => { - // fetch a partition that was previously executed by this executor - info!("FetchPartition {:?}", partition_id); - - let mut path = PathBuf::from(self.executor.work_dir()); - path.push(&partition_id.job_id); - path.push(&format!("{}", partition_id.stage_id)); - path.push(&format!("{}", partition_id.partition_id)); - path.push("data.arrow"); - let path = path.to_str().unwrap(); - - info!("FetchPartition {:?} reading {}", partition_id, path); + BallistaAction::FetchPartition { path, .. } => { + info!("FetchPartition reading {}", &path); let file = File::open(&path) .map_err(|e| { BallistaError::General(format!( @@ -222,7 +210,11 @@ where let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(), &options).into(); send_response(&tx, Ok(schema_flight_data)).await?; + let mut row_count = 0; for batch in reader { + if let Ok(x) = &batch { + row_count += x.num_rows(); + } let batch_flight_data: Vec<_> = batch .map(|b| create_flight_iter(&b, &options).collect()) .map_err(|e| from_arrow_err(&e))?; @@ -230,6 +222,7 @@ where send_response(&tx, batch.clone()).await?; } } + info!("FetchPartition streamed {} rows", row_count); Ok(()) } diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 905437d4d980..e9f9ec97fe9b 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -76,10 +76,12 @@ use crate::planner::DistributedPlanner; use log::{debug, error, info, warn}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; -use tonic::{Request, Response}; +use tonic::{Request, Response, Status}; use self::state::{ConfigBackendClient, SchedulerState}; use ballista_core::config::BallistaConfig; +use ballista_core::execution_plans::ShuffleWriterExec; +use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; use ballista_core::utils::create_datafusion_context; use datafusion::physical_plan::parquet::ParquetExec; use std::time::{Instant, SystemTime, UNIX_EPOCH}; @@ -209,7 +211,7 @@ impl SchedulerGrpc for SchedulerServer { tonic::Status::internal(msg) })?; } - let task = if can_accept_task { + let task: Result, Status> = if can_accept_task { let plan = self .state .assign_next_schedulable_task(&metadata.id) @@ -229,15 +231,34 @@ impl SchedulerGrpc for SchedulerServer { partition_id.partition_id ); } - plan.map(|(status, plan)| TaskDefinition { - plan: Some(plan.try_into().unwrap()), - task_id: status.partition_id, - }) + match plan { + Some((status, plan)) => { + let plan_clone = plan.clone(); + let output_partitioning = if let Some(shuffle_writer) = + plan_clone.as_any().downcast_ref::() + { + shuffle_writer.shuffle_output_partitioning() + } else { + //TODO should be error + warn!("Task was not a shuffle writer! 
{:?}", plan_clone); + None + }; + Ok(Some(TaskDefinition { + plan: Some(plan.try_into().unwrap()), + task_id: status.partition_id, + output_partitioning: hash_partitioning_to_proto( + output_partitioning, + ) + .map_err(|_| Status::internal("TBD".to_string()))?, + })) + } + None => Ok(None), + } } else { - None + Ok(None) }; lock.unlock().await; - Ok(Response::new(PollWorkResult { task })) + Ok(Response::new(PollWorkResult { task: task? })) } else { warn!("Received invalid executor poll_work request"); Err(tonic::Status::invalid_argument( @@ -431,12 +452,12 @@ impl SchedulerGrpc for SchedulerServer { })); // save stages into state - for stage in stages { + for shuffle_writer in stages { fail_job!(state .save_stage_plan( &job_id_spawn, - stage.stage_id(), - stage.children()[0].clone() + shuffle_writer.stage_id(), + shuffle_writer.clone() ) .await .map_err(|e| { @@ -444,12 +465,13 @@ impl SchedulerGrpc for SchedulerServer { error!("{}", msg); tonic::Status::internal(msg) })); - let num_partitions = stage.output_partitioning().partition_count(); + let num_partitions = + shuffle_writer.output_partitioning().partition_count(); for partition_id in 0..num_partitions { let pending_status = TaskStatus { partition_id: Some(PartitionId { job_id: job_id_spawn.clone(), - stage_id: stage.stage_id() as u32, + stage_id: shuffle_writer.stage_id() as u32, partition_id: partition_id as u32, }), status: None, diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index 11f5c994fd52..335c4b1643af 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -105,22 +105,24 @@ impl DistributedPlanner { .as_any() .downcast_ref::() { - let query_stage = create_shuffle_writer( + let shuffle_writer = create_shuffle_writer( job_id, self.next_stage_id(), - //TODO should be children[0].clone() so that we replace this - // with an UnresolvedShuffleExec instead of just executing this - // part of the plan again - // see https://github.com/apache/arrow-datafusion/issues/707 - coalesce.children()[0].clone(), + children[0].clone(), None, )?; let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( - query_stage.stage_id(), - query_stage.schema(), - query_stage.output_partitioning().partition_count(), + shuffle_writer.stage_id(), + shuffle_writer.schema(), + coalesce.children()[0] + .output_partitioning() + .partition_count(), + shuffle_writer + .shuffle_output_partitioning() + .map(|p| p.partition_count()) + .unwrap_or(1), )); - stages.push(query_stage); + stages.push(shuffle_writer); Ok(( coalesce.with_new_children(vec![unresolved_shuffle])?, stages, @@ -128,22 +130,22 @@ impl DistributedPlanner { } else if let Some(repart) = execution_plan.as_any().downcast_ref::() { - let query_stage = create_shuffle_writer( + let shuffle_writer = create_shuffle_writer( job_id, self.next_stage_id(), - //TODO should be children[0].clone() so that we replace this - // with an UnresolvedShuffleExec instead of just executing this - // part of the plan again - // see https://github.com/apache/arrow-datafusion/issues/707 - repart.children()[0].clone(), + children[0].clone(), Some(repart.partitioning().to_owned()), )?; let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( - query_stage.stage_id(), - query_stage.schema(), - query_stage.output_partitioning().partition_count(), + shuffle_writer.stage_id(), + shuffle_writer.schema(), + shuffle_writer.output_partitioning().partition_count(), + shuffle_writer + .shuffle_output_partitioning() + .map(|p| 
p.partition_count()) + .unwrap_or(1), )); - stages.push(query_stage); + stages.push(shuffle_writer); Ok((unresolved_shuffle, stages)) } else if let Some(window) = execution_plan.as_any().downcast_ref::() @@ -184,18 +186,22 @@ pub fn remove_unresolved_shuffles( })? .clone(); - for i in 0..unresolved_shuffle.partition_count { + for i in 0..unresolved_shuffle.input_partition_count { if let Some(x) = p.get(&i) { relevant_locations.push(x.to_owned()); } else { relevant_locations.push(vec![]); } } - println!( - "create shuffle reader with {:?}", + info!( + "Creating shuffle reader: {}", relevant_locations .iter() - .map(|c| format!("{:?}", c)) + .map(|c| c + .iter() + .map(|l| l.path.clone()) + .collect::>() + .join(", ")) .collect::>() .join("\n") ); diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index a4ae59e1dfda..2e3be65f6e0f 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -297,12 +297,14 @@ impl SchedulerState { let mut partition_locations: HashMap< usize, // stage id HashMap< - usize, // shuffle input partition id - Vec, // shuffle output partitions + usize, // shuffle output partition id + Vec, // shuffle partitions >, > = HashMap::new(); for unresolved_shuffle in unresolved_shuffles { - for partition_id in 0..unresolved_shuffle.partition_count { + // we scheduler one task per *input* partition and each input partition + // can produce multiple output partitions + for partition_id in 0..unresolved_shuffle.input_partition_count { let referenced_task = tasks .get(&get_task_status_key( &self.namespace, @@ -323,7 +325,11 @@ impl SchedulerState { }, )) = &referenced_task.status { - let locations = partition_locations + info!("Task for unresolved shuffle input partition {} completed and produced these shuffle partitions:\n\t{:?}", + partition_id, + partitions.iter().map(|p| format!("{}={}", p.partition_id, &p.path)).collect::>().join("\n\t") + ); + let stage_shuffle_partition_locations = partition_locations .entry(unresolved_shuffle.stage_id) .or_insert_with(HashMap::new); let executor_meta = executors @@ -332,9 +338,10 @@ impl SchedulerState { .unwrap() .clone(); - let temp = - locations.entry(partition_id).or_insert_with(Vec::new); - for p in partitions { + for shuffle_write_partition in partitions { + let temp = stage_shuffle_partition_locations + .entry(shuffle_write_partition.partition_id as usize) + .or_insert_with(Vec::new); let executor_meta = executor_meta.clone(); let partition_location = ballista_core::serde::scheduler::PartitionLocation { @@ -342,22 +349,24 @@ impl SchedulerState { ballista_core::serde::scheduler::PartitionId { job_id: partition.job_id.clone(), stage_id: unresolved_shuffle.stage_id, - partition_id, + partition_id: shuffle_write_partition + .partition_id + as usize, }, executor_meta, partition_stats: PartitionStats::new( - Some(p.num_rows), - Some(p.num_batches), - Some(p.num_bytes), + Some(shuffle_write_partition.num_rows), + Some(shuffle_write_partition.num_batches), + Some(shuffle_write_partition.num_bytes), ), - path: p.path.clone(), + path: shuffle_write_partition.path.clone(), }; info!( - "Scheduler storing stage {} partition {} path: {}", + "Scheduler storing stage {} output partition {} path: {}", unresolved_shuffle.stage_id, - partition_id, + partition_location.partition_id.partition_id, partition_location.path - ); + ); temp.push(partition_location); } } else { @@ -365,6 +374,7 @@ impl SchedulerState { } } } + let plan = 
remove_unresolved_shuffles(plan.as_ref(), &partition_locations)?; From a6657f21417add7f75d624deb8b5c6271ebf59ed Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 18 Jul 2021 14:55:55 -0600 Subject: [PATCH 06/10] code cleanup --- ballista/rust/executor/src/executor.rs | 28 +++++++------ ballista/rust/scheduler/src/planner.rs | 57 +++++++++++++++++--------- 2 files changed, 52 insertions(+), 33 deletions(-) diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs index 8e94f3eb19c2..398ebca2b8e6 100644 --- a/ballista/rust/executor/src/executor.rs +++ b/ballista/rust/executor/src/executor.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use ballista_core::error::BallistaError; use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::protobuf; +use datafusion::error::DataFusionError; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::{ExecutionPlan, Partitioning}; @@ -52,23 +53,24 @@ impl Executor { plan: Arc, _shuffle_output_partitioning: Option, ) -> Result, BallistaError> { - let shuffle_output_partitioning = if let Some(shuffle_writer) = + let exec = if let Some(shuffle_writer) = plan.as_any().downcast_ref::() { - shuffle_writer - .shuffle_output_partitioning() - .map(|x| x.to_owned()) + // recreate the shuffle writer with the correct working directory + ShuffleWriterExec::try_new( + job_id.clone(), + stage_id, + plan.children()[0].clone(), + self.work_dir.clone(), + shuffle_writer.shuffle_output_partitioning().cloned(), + ) } else { - panic!() - }; + Err(DataFusionError::Internal( + "Plan passed to execute_shuffle_write is not a ShuffleWriterExec" + .to_string(), + )) + }?; - let exec = ShuffleWriterExec::try_new( - job_id.clone(), - stage_id, - plan.children()[0].clone(), //TODO refactor to avoid this terrible hack to remove nested shuffle writers - self.work_dir.clone(), - shuffle_output_partitioning, - )?; let partitions = exec.execute_shuffle_write(part).await?; println!( diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index 335c4b1643af..d2e5b1c77ab0 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -130,23 +130,31 @@ impl DistributedPlanner { } else if let Some(repart) = execution_plan.as_any().downcast_ref::() { - let shuffle_writer = create_shuffle_writer( - job_id, - self.next_stage_id(), - children[0].clone(), - Some(repart.partitioning().to_owned()), - )?; - let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( - shuffle_writer.stage_id(), - shuffle_writer.schema(), - shuffle_writer.output_partitioning().partition_count(), - shuffle_writer - .shuffle_output_partitioning() - .map(|p| p.partition_count()) - .unwrap_or(1), - )); - stages.push(shuffle_writer); - Ok((unresolved_shuffle, stages)) + match repart.output_partitioning() { + Partitioning::Hash(_, _) => { + let shuffle_writer = create_shuffle_writer( + job_id, + self.next_stage_id(), + children[0].clone(), + Some(repart.partitioning().to_owned()), + )?; + let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( + shuffle_writer.stage_id(), + shuffle_writer.schema(), + shuffle_writer.output_partitioning().partition_count(), + shuffle_writer + .shuffle_output_partitioning() + .map(|p| p.partition_count()) + .unwrap_or(1), + )); + stages.push(shuffle_writer); + Ok((unresolved_shuffle, stages)) + } + _ => { + // remove any non-hash repartition from the distributed plan + Ok((children[0].clone(), stages)) + } + } } else if 
let Some(window) = execution_plan.as_any().downcast_ref::() { @@ -241,6 +249,7 @@ mod test { use ballista_core::error::BallistaError; use ballista_core::execution_plans::UnresolvedShuffleExec; use ballista_core::serde::protobuf; + use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use datafusion::physical_plan::sort::SortExec; use datafusion::physical_plan::{ @@ -290,9 +299,7 @@ mod test { ProjectionExec: expr=[l_returnflag@0 as l_returnflag, SUM(lineitem.l_extendedprice Multiply Int64(1))@1 as sum_disc_price] HashAggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))] CoalesceBatchesExec: target_batch_size=4096 - RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }], 2) - HashAggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))] - CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false + UnresolvedShuffleExec ShuffleWriterExec: None SortExec: [l_returnflag@0 ASC] @@ -313,6 +320,14 @@ mod test { let final_hash = projection.children()[0].clone(); let final_hash = downcast_exec!(final_hash, HashAggregateExec); assert!(*final_hash.mode() == AggregateMode::FinalPartitioned); + let coalesce = final_hash.children()[0].clone(); + let coalesce = downcast_exec!(coalesce, CoalesceBatchesExec); + let unresolved_shuffle = coalesce.children()[0].clone(); + let unresolved_shuffle = + downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec); + assert_eq!(unresolved_shuffle.stage_id, 1); + assert_eq!(unresolved_shuffle.input_partition_count, 2); + assert_eq!(unresolved_shuffle.output_partition_count, 2); // verify stage 2 let stage2 = stages[2].children()[0].clone(); @@ -324,6 +339,8 @@ mod test { let unresolved_shuffle = downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec); assert_eq!(unresolved_shuffle.stage_id, 2); + assert_eq!(unresolved_shuffle.input_partition_count, 2); + assert_eq!(unresolved_shuffle.output_partition_count, 1); Ok(()) } From 274d092d4db239840e869129fe972d69a37d82ca Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 19 Jul 2021 22:15:06 -0600 Subject: [PATCH 07/10] integration tests pass --- ballista/rust/scheduler/src/planner.rs | 20 +++++++++++++------- ballista/rust/scheduler/src/state/mod.rs | 18 ++++++++++++------ 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index d2e5b1c77ab0..f4bfc0d8b6c0 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -114,13 +114,13 @@ impl DistributedPlanner { let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( shuffle_writer.stage_id(), shuffle_writer.schema(), - coalesce.children()[0] - .output_partitioning() - .partition_count(), + shuffle_writer.output_partitioning().partition_count(), shuffle_writer .shuffle_output_partitioning() .map(|p| p.partition_count()) - .unwrap_or(1), + .unwrap_or_else(|| { + shuffle_writer.output_partitioning().partition_count() + }), )); stages.push(shuffle_writer); Ok(( @@ -145,7 +145,9 @@ impl DistributedPlanner { shuffle_writer .shuffle_output_partitioning() .map(|p| p.partition_count()) - .unwrap_or(1), + .unwrap_or_else(|| { + shuffle_writer.output_partitioning().partition_count() + }), )); stages.push(shuffle_writer); 
Ok((unresolved_shuffle, stages)) @@ -194,7 +196,7 @@ pub fn remove_unresolved_shuffles( })? .clone(); - for i in 0..unresolved_shuffle.input_partition_count { + for i in 0..unresolved_shuffle.output_partition_count { if let Some(x) = p.get(&i) { relevant_locations.push(x.to_owned()); } else { @@ -335,12 +337,16 @@ mod test { let coalesce_partitions = sort.children()[0].clone(); let coalesce_partitions = downcast_exec!(coalesce_partitions, CoalescePartitionsExec); + assert_eq!( + coalesce_partitions.output_partitioning().partition_count(), + 1 + ); let unresolved_shuffle = coalesce_partitions.children()[0].clone(); let unresolved_shuffle = downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec); assert_eq!(unresolved_shuffle.stage_id, 2); assert_eq!(unresolved_shuffle.input_partition_count, 2); - assert_eq!(unresolved_shuffle.output_partition_count, 1); + assert_eq!(unresolved_shuffle.output_partition_count, 2); Ok(()) } diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index 2e3be65f6e0f..0bbab8cebf89 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -302,15 +302,17 @@ impl SchedulerState { >, > = HashMap::new(); for unresolved_shuffle in unresolved_shuffles { - // we scheduler one task per *input* partition and each input partition + // we schedule one task per *input* partition and each input partition // can produce multiple output partitions - for partition_id in 0..unresolved_shuffle.input_partition_count { + for shuffle_input_partition_id in + 0..unresolved_shuffle.input_partition_count + { let referenced_task = tasks .get(&get_task_status_key( &self.namespace, &partition.job_id, unresolved_shuffle.stage_id, - partition_id, + shuffle_input_partition_id, )) .unwrap(); let task_is_dead = self @@ -325,8 +327,8 @@ impl SchedulerState { }, )) = &referenced_task.status { - info!("Task for unresolved shuffle input partition {} completed and produced these shuffle partitions:\n\t{:?}", - partition_id, + debug!("Task for unresolved shuffle input partition {} completed and produced these shuffle partitions:\n\t{}", + shuffle_input_partition_id, partitions.iter().map(|p| format!("{}={}", p.partition_id, &p.path)).collect::>().join("\n\t") ); let stage_shuffle_partition_locations = partition_locations @@ -361,7 +363,7 @@ impl SchedulerState { ), path: shuffle_write_partition.path.clone(), }; - info!( + debug!( "Scheduler storing stage {} output partition {} path: {}", unresolved_shuffle.stage_id, partition_location.partition_id.partition_id, @@ -370,6 +372,10 @@ impl SchedulerState { temp.push(partition_location); } } else { + debug!( + "Stage {} input partition {} has not completed yet", + unresolved_shuffle.stage_id, shuffle_input_partition_id, + ); continue 'tasks; } } From bf8e087ce8827ea8f2ef41724ea53f3cf357790f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 19 Jul 2021 22:28:30 -0600 Subject: [PATCH 08/10] error handling --- ballista/rust/scheduler/src/lib.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index e9f9ec97fe9b..f5e2dc1dfd80 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -239,9 +239,10 @@ impl SchedulerGrpc for SchedulerServer { { shuffle_writer.shuffle_output_partitioning() } else { - //TODO should be error - warn!("Task was not a shuffle writer! 
{:?}", plan_clone); - None + return Err(Status::invalid_argument(format!( + "Task root plan was not a ShuffleWriterExec: {:?}", + plan_clone + ))); }; Ok(Some(TaskDefinition { plan: Some(plan.try_into().unwrap()), From 94c5dc69a49273e9f25636726e53f08232cfb091 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 20 Jul 2021 07:35:41 -0600 Subject: [PATCH 09/10] Additional test --- ballista/rust/scheduler/src/planner.rs | 172 +++++++++++++++++++++++++ 1 file changed, 172 insertions(+) diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index f4bfc0d8b6c0..05025f282477 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -253,6 +253,7 @@ mod test { use ballista_core::serde::protobuf; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; + use datafusion::physical_plan::hash_join::HashJoinExec; use datafusion::physical_plan::sort::SortExec; use datafusion::physical_plan::{ coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec, @@ -351,6 +352,177 @@ mod test { Ok(()) } + #[test] + fn distributed_join_plan() -> Result<(), BallistaError> { + let mut ctx = datafusion_test_context("testdata")?; + + // simplified form of TPC-H query 12 + let df = ctx.sql( + "select + l_shipmode, + sum(case + when o_orderpriority = '1-URGENT' + or o_orderpriority = '2-HIGH' + then 1 + else 0 + end) as high_line_count, + sum(case + when o_orderpriority <> '1-URGENT' + and o_orderpriority <> '2-HIGH' + then 1 + else 0 + end) as low_line_count +from + lineitem + join + orders + on + l_orderkey = o_orderkey +where + l_shipmode in ('MAIL', 'SHIP') + and l_commitdate < l_receiptdate + and l_shipdate < l_commitdate + and l_receiptdate >= date '1994-01-01' + and l_receiptdate < date '1995-01-01' +group by + l_shipmode +order by + l_shipmode; +", + )?; + + let plan = df.to_logical_plan(); + let plan = ctx.optimize(&plan)?; + let plan = ctx.create_physical_plan(&plan)?; + + let mut planner = DistributedPlanner::new(); + let job_uuid = Uuid::new_v4(); + let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?; + for stage in &stages { + println!("{}", displayable(stage.as_ref()).indent().to_string()); + } + + /* Expected result: + + ShuffleWriterExec: Some(Hash([Column { name: "l_orderkey", index: 0 }], 2)) + CoalesceBatchesExec: target_batch_size=4096 + FilterExec: l_shipmode@4 IN ([Literal { value: Utf8("MAIL") }, Literal { value: Utf8("SHIP") }]) AND l_commitdate@2 < l_receiptdate@3 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 8766 AND l_receiptdate@3 < 9131 + CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false + + ShuffleWriterExec: Some(Hash([Column { name: "o_orderkey", index: 0 }], 2)) + CsvExec: source=Path(testdata/orders: [testdata/orders/orders.tbl]), has_header=false + + ShuffleWriterExec: Some(Hash([Column { name: "l_shipmode", index: 0 }], 2)) + HashAggregateExec: mode=Partial, gby=[l_shipmode@4 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)] + CoalesceBatchesExec: target_batch_size=4096 + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: 
"l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })] + CoalesceBatchesExec: target_batch_size=4096 + UnresolvedShuffleExec + CoalesceBatchesExec: target_batch_size=4096 + UnresolvedShuffleExec + + ShuffleWriterExec: None + ProjectionExec: expr=[l_shipmode@0 as l_shipmode, SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count] + HashAggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)] + CoalesceBatchesExec: target_batch_size=4096 + UnresolvedShuffleExec + + ShuffleWriterExec: None + SortExec: [l_shipmode@0 ASC] + CoalescePartitionsExec + UnresolvedShuffleExec + */ + + assert_eq!(5, stages.len()); + + // verify partitioning for each stage + + // csv "lineitem" (2 files) + assert_eq!( + 2, + stages[0].children()[0] + .output_partitioning() + .partition_count() + ); + assert_eq!( + 2, + stages[0] + .shuffle_output_partitioning() + .unwrap() + .partition_count() + ); + + // csv "orders" (1 file) + assert_eq!( + 1, + stages[1].children()[0] + .output_partitioning() + .partition_count() + ); + assert_eq!( + 2, + stages[1] + .shuffle_output_partitioning() + .unwrap() + .partition_count() + ); + + // join and partial hash aggregate + let input = stages[2].children()[0].clone(); + assert_eq!(2, input.output_partitioning().partition_count()); + assert_eq!( + 2, + stages[2] + .shuffle_output_partitioning() + .unwrap() + .partition_count() + ); + + let hash_agg = downcast_exec!(input, HashAggregateExec); + + let coalesce_batches = hash_agg.children()[0].clone(); + let coalesce_batches = downcast_exec!(coalesce_batches, CoalesceBatchesExec); + + let join = coalesce_batches.children()[0].clone(); + let join = downcast_exec!(join, HashJoinExec); + + let join_input_1 = join.children()[0].clone(); + // skip CoalesceBatches + let join_input_1 = join_input_1.children()[0].clone(); + let unresolved_shuffle_reader_1 = + downcast_exec!(join_input_1, UnresolvedShuffleExec); + assert_eq!(unresolved_shuffle_reader_1.input_partition_count, 2); // lineitem + assert_eq!(unresolved_shuffle_reader_1.output_partition_count, 2); + + let join_input_2 = join.children()[1].clone(); + // skip CoalesceBatches + let join_input_2 = join_input_2.children()[0].clone(); + let unresolved_shuffle_reader_2 = + downcast_exec!(join_input_2, UnresolvedShuffleExec); + assert_eq!(unresolved_shuffle_reader_2.input_partition_count, 1); //orders + assert_eq!(unresolved_shuffle_reader_2.output_partition_count, 2); + + // final partitioned hash aggregate + assert_eq!( + 2, + stages[3].children()[0] + .output_partitioning() + .partition_count() + ); + assert!(stages[3].shuffle_output_partitioning().is_none()); + + // coalesce partitions and sort + assert_eq!( + 1, + stages[4].children()[0] + .output_partitioning() + .partition_count() + ); + assert!(stages[4].shuffle_output_partitioning().is_none()); + + Ok(()) + } + #[test] fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> { let mut ctx = datafusion_test_context("testdata")?; From 
1696e7eb940736126a1db42c9f31d5c61d942e15 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 20 Jul 2021 07:47:50 -0600 Subject: [PATCH 10/10] add links to follow on issue --- ballista/rust/core/src/execution_plans/shuffle_reader.rs | 3 ++- ballista/rust/core/src/execution_plans/unresolved_shuffle.rs | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs index c1e66257b776..0447ca995313 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs @@ -76,7 +76,8 @@ impl ExecutionPlan for ShuffleReaderExec { } fn output_partitioning(&self) -> Partitioning { - // TODO partitioning may be known and could be populated here? + // TODO partitioning may be known and could be populated here + // see https://github.com/apache/arrow-datafusion/issues/758 Partitioning::UnknownPartitioning(self.partition.len()) } diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs index c7b171089aef..3111b5a41be3 100644 --- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs +++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs @@ -79,6 +79,7 @@ impl ExecutionPlan for UnresolvedShuffleExec { fn output_partitioning(&self) -> Partitioning { //TODO the output partition is known and should be populated here! + // see https://github.com/apache/arrow-datafusion/issues/758 Partitioning::UnknownPartitioning(self.output_partition_count) }
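
Note (illustration only, not part of the patches): the changes above make a completed task report one ShuffleWritePartition per *output* partition it wrote, and the scheduler state then groups the resulting locations by output partition id before resolving UnresolvedShuffleExec nodes. Below is a minimal, self-contained Rust sketch of that grouping step. The struct names mirror the Ballista types but are simplified stand-ins that model only the fields needed here (the real ShuffleWritePartition also carries num_batches, num_rows and num_bytes, and the real PartitionLocation carries executor metadata and partition stats); the helper name group_by_output_partition, the example paths and the executor ids are made up for the sketch.

use std::collections::HashMap;

// Simplified stand-in for the ShuffleWritePartition message reported by a
// completed task: which *output* partition was written and where.
#[derive(Clone, Debug)]
struct ShuffleWritePartition {
    partition_id: u64,
    path: String,
}

// Simplified stand-in for the location metadata handed to a shuffle reader.
#[derive(Clone, Debug)]
struct PartitionLocation {
    stage_id: usize,
    output_partition_id: usize,
    executor_id: String,
    path: String,
}

// Group per-task shuffle metadata by *output* partition id. One task runs per
// input partition, and each task may write a file for every output partition,
// so output partition `i` ends up with one location per upstream task.
// `completed_tasks` maps input partition id -> (executor id, partitions written).
fn group_by_output_partition(
    stage_id: usize,
    completed_tasks: &HashMap<usize, (String, Vec<ShuffleWritePartition>)>,
) -> HashMap<usize, Vec<PartitionLocation>> {
    let mut locations: HashMap<usize, Vec<PartitionLocation>> = HashMap::new();
    for (executor_id, partitions) in completed_tasks.values() {
        for p in partitions {
            locations
                .entry(p.partition_id as usize)
                .or_insert_with(Vec::new)
                .push(PartitionLocation {
                    stage_id,
                    output_partition_id: p.partition_id as usize,
                    executor_id: executor_id.clone(),
                    path: p.path.clone(),
                });
        }
    }
    locations
}

fn main() {
    // Two input partitions (tasks), each hash-partitioned into two output
    // partitions. Paths and executor ids are example values only.
    let mut completed: HashMap<usize, (String, Vec<ShuffleWritePartition>)> = HashMap::new();
    completed.insert(
        0,
        (
            "executor-1".to_string(),
            vec![
                ShuffleWritePartition { partition_id: 0, path: "/tmp/work/job/3/0/part-from-input-0.arrow".into() },
                ShuffleWritePartition { partition_id: 1, path: "/tmp/work/job/3/1/part-from-input-0.arrow".into() },
            ],
        ),
    );
    completed.insert(
        1,
        (
            "executor-2".to_string(),
            vec![
                ShuffleWritePartition { partition_id: 0, path: "/tmp/work/job/3/0/part-from-input-1.arrow".into() },
                ShuffleWritePartition { partition_id: 1, path: "/tmp/work/job/3/1/part-from-input-1.arrow".into() },
            ],
        ),
    );

    let by_output = group_by_output_partition(3, &completed);
    // Each output partition now lists one location per upstream task.
    for (output_partition, locs) in &by_output {
        println!("output partition {} -> {:?}", output_partition, locs);
    }
}

Keying the map by output partition id (rather than by input partition id, as before these patches) is what lets the shuffle reader for output partition i fetch exactly one file per upstream task for that partition, instead of re-reading a single file per input partition.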