diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs index b8210cbc2626..26087f8e6693 100644 --- a/ballista/rust/client/src/context.rs +++ b/ballista/rust/client/src/context.rs @@ -223,6 +223,7 @@ impl BallistaContext { &partition_id.job_id, partition_id.stage_id as usize, partition_id.partition_id as usize, + &location.path, ) .await .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 50bd901f145d..9dbce81c21f1 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -544,7 +544,8 @@ message PhysicalNegativeNode { message UnresolvedShuffleExecNode { uint32 stage_id = 1; Schema schema = 2; - uint32 partition_count = 3; + uint32 input_partition_count = 3; + uint32 output_partition_count = 4; } message FilterExecNode { @@ -700,7 +701,7 @@ message Action { oneof ActionType { // Fetch a partition from an executor - PartitionId fetch_partition = 3; + FetchPartition fetch_partition = 3; } // configuration settings @@ -714,6 +715,15 @@ message ExecutePartition { PhysicalPlanNode plan = 4; // The task could need to read partitions from other executors repeated PartitionLocation partition_location = 5; + // Output partition for shuffle writer + PhysicalHashRepartition output_partitioning = 6; +} + +message FetchPartition { + string job_id = 1; + uint32 stage_id = 2; + uint32 partition_id = 3; + string path = 4; } // Mapping from partition id to executor id @@ -809,6 +819,8 @@ message PollWorkParams { message TaskDefinition { PartitionId task_id = 1; PhysicalPlanNode plan = 2; + // Output partition for shuffle writer + PhysicalHashRepartition output_partitioning = 3; } message PollWorkResult { diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs index 2df414578355..26c8d22b405d 100644 --- a/ballista/rust/core/src/client.rs +++ b/ballista/rust/core/src/client.rs @@ -81,9 +81,14 @@ impl BallistaClient { job_id: &str, stage_id: usize, partition_id: usize, + path: &str, ) -> Result { - let action = - Action::FetchPartition(PartitionId::new(job_id, stage_id, partition_id)); + let action = Action::FetchPartition { + job_id: job_id.to_string(), + stage_id, + partition_id, + path: path.to_owned(), + }; self.execute_action(&action).await } diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs index db03d3ddf080..0447ca995313 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs @@ -76,6 +76,8 @@ impl ExecutionPlan for ShuffleReaderExec { } fn output_partitioning(&self) -> Partitioning { + // TODO partitioning may be known and could be populated here + // see https://github.com/apache/arrow-datafusion/issues/758 Partitioning::UnknownPartitioning(self.partition.len()) } @@ -123,24 +125,26 @@ impl ExecutionPlan for ShuffleReaderExec { let loc_str = self .partition .iter() - .map(|x| { - x.iter() - .map(|l| { - format!( - "[executor={} part={}:{}:{} stats={}]", - l.executor_meta.id, - l.partition_id.job_id, - l.partition_id.stage_id, - l.partition_id.partition_id, - l.partition_stats - ) - }) - .collect::>() - .join(",") + .enumerate() + .map(|(partition_id, locations)| { + format!( + "[partition={} paths={}]", + partition_id, + locations + .iter() + .map(|l| l.path.clone()) + .collect::>() + .join(",") + ) }) .collect::>() .join(", "); - 
write!(f, "ShuffleReaderExec: partition_locations={}", loc_str) + write!( + f, + "ShuffleReaderExec: partition_locations({})={}", + self.partition.len(), + loc_str + ) } } } @@ -166,6 +170,7 @@ async fn fetch_partition( &partition_id.job_id, partition_id.stage_id as usize, partition_id.partition_id as usize, + &location.path, ) .await .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?) diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs index 47bf2a25f636..8081dab36ab5 100644 --- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs +++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs @@ -141,6 +141,7 @@ impl ShuffleWriterExec { match &self.shuffle_output_partitioning { None => { + let start = Instant::now(); path.push(&format!("{}", input_partition)); std::fs::create_dir_all(&path)?; path.push("data.arrow"); @@ -156,6 +157,14 @@ impl ShuffleWriterExec { .await .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?; + self.metrics + .input_rows + .add(stats.num_rows.unwrap_or(0) as usize); + self.metrics + .output_rows + .add(stats.num_rows.unwrap_or(0) as usize); + self.metrics.write_time.add_elapsed(start); + info!( "Executed partition {} in {} seconds. Statistics: {}", input_partition, @@ -227,6 +236,8 @@ impl ShuffleWriterExec { RecordBatch::try_new(input_batch.schema(), columns)?; // write non-empty batch out + + //TODO optimize so we don't write or fetch empty partitions //if output_batch.num_rows() > 0 { let start = Instant::now(); match &mut writers[output_partition] { diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs index cb351eec561a..3111b5a41be3 100644 --- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs +++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs @@ -43,17 +43,26 @@ pub struct UnresolvedShuffleExec { // The schema this node will have once it is replaced with a ShuffleReaderExec pub schema: SchemaRef, + // The number of shuffle writer partition tasks that will produce the partitions + pub input_partition_count: usize, + // The partition count this node will have once it is replaced with a ShuffleReaderExec - pub partition_count: usize, + pub output_partition_count: usize, } impl UnresolvedShuffleExec { /// Create a new UnresolvedShuffleExec - pub fn new(stage_id: usize, schema: SchemaRef, partition_count: usize) -> Self { + pub fn new( + stage_id: usize, + schema: SchemaRef, + input_partition_count: usize, + output_partition_count: usize, + ) -> Self { Self { stage_id, schema, - partition_count, + input_partition_count, + output_partition_count, } } } @@ -69,7 +78,9 @@ impl ExecutionPlan for UnresolvedShuffleExec { } fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(self.partition_count) + //TODO the output partition is known and should be populated here! 
+ // see https://github.com/apache/arrow-datafusion/issues/758 + Partitioning::UnknownPartitioning(self.output_partition_count) } fn children(&self) -> Vec> { diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 4b0a9844773c..509044b3d1ba 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -376,21 +376,9 @@ impl TryInto> for &protobuf::PhysicalPlanNode { let input: Arc = convert_box_required!(shuffle_writer.input)?; - let output_partitioning = match &shuffle_writer.output_partitioning { - Some(hash_part) => { - let expr = hash_part - .hash_expr - .iter() - .map(|e| e.try_into()) - .collect::>, _>>()?; - - Some(Partitioning::Hash( - expr, - hash_part.partition_count.try_into().unwrap(), - )) - } - None => None, - }; + let output_partitioning = parse_protobuf_hash_partitioning( + shuffle_writer.output_partitioning.as_ref(), + )?; Ok(Arc::new(ShuffleWriterExec::try_new( shuffle_writer.job_id.clone(), @@ -466,7 +454,10 @@ impl TryInto> for &protobuf::PhysicalPlanNode { Ok(Arc::new(UnresolvedShuffleExec { stage_id: unresolved_shuffle.stage_id as usize, schema, - partition_count: unresolved_shuffle.partition_count as usize, + input_partition_count: unresolved_shuffle.input_partition_count + as usize, + output_partition_count: unresolved_shuffle.output_partition_count + as usize, })) } } @@ -680,3 +671,23 @@ impl TryFrom<&protobuf::physical_window_expr_node::WindowFunction> for WindowFun } } } + +pub fn parse_protobuf_hash_partitioning( + partitioning: Option<&protobuf::PhysicalHashRepartition>, +) -> Result, BallistaError> { + match partitioning { + Some(hash_part) => { + let expr = hash_part + .hash_expr + .iter() + .map(|e| e.try_into()) + .collect::>, _>>()?; + + Ok(Some(Partitioning::Hash( + expr, + hash_part.partition_count.try_into().unwrap(), + ))) + } + None => Ok(None), + } +} diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index fa35eb48d4fa..ec5ec7cb7aff 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -397,7 +397,8 @@ impl TryInto for Arc { protobuf::UnresolvedShuffleExecNode { stage_id: exec.stage_id as u32, schema: Some(exec.schema().as_ref().into()), - partition_count: exec.partition_count as u32, + input_partition_count: exec.input_partition_count as u32, + output_partition_count: exec.output_partition_count as u32, }, )), }) diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs b/ballista/rust/core/src/serde/scheduler/from_proto.rs index 4f9c9bc8877e..8d4e279395fa 100644 --- a/ballista/rust/core/src/serde/scheduler/from_proto.rs +++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs @@ -32,9 +32,12 @@ impl TryInto for protobuf::Action { fn try_into(self) -> Result { match self.action_type { - Some(ActionType::FetchPartition(partition)) => { - Ok(Action::FetchPartition(partition.try_into()?)) - } + Some(ActionType::FetchPartition(fetch)) => Ok(Action::FetchPartition { + job_id: fetch.job_id, + stage_id: fetch.stage_id as usize, + partition_id: fetch.partition_id as usize, + path: fetch.path, + }), _ => Err(BallistaError::General( "scheduler::from_proto(Action) invalid or missing action".to_owned(), )), diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index eeddfbbb41f3..a20d955f28b2 
100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -23,6 +23,7 @@ use datafusion::arrow::array::{ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::logical_plan::LogicalPlan; use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::Partitioning; use serde::Serialize; use uuid::Uuid; @@ -36,7 +37,12 @@ pub mod to_proto; #[derive(Debug, Clone)] pub enum Action { /// Collect a shuffle partition - FetchPartition(PartitionId), + FetchPartition { + job_id: String, + stage_id: usize, + partition_id: usize, + path: String, + }, } /// Unique identifier for the output partition of an operator. @@ -223,6 +229,8 @@ pub struct ExecutePartition { pub plan: Arc, /// Location of shuffle partitions that this query stage may depend on pub shuffle_locations: HashMap, + /// Output partitioning for shuffle writes + pub output_partitioning: Option, } impl ExecutePartition { @@ -232,6 +240,7 @@ impl ExecutePartition { partition_id: Vec, plan: Arc, shuffle_locations: HashMap, + output_partitioning: Option, ) -> Self { Self { job_id, @@ -239,6 +248,7 @@ impl ExecutePartition { partition_id, plan, shuffle_locations, + output_partitioning, } } diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs b/ballista/rust/core/src/serde/scheduler/to_proto.rs index 57d4f615c5f8..bdc88d0b99a2 100644 --- a/ballista/rust/core/src/serde/scheduler/to_proto.rs +++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs @@ -23,14 +23,25 @@ use crate::serde::protobuf::action::ActionType; use crate::serde::scheduler::{ Action, ExecutePartition, PartitionId, PartitionLocation, PartitionStats, }; +use datafusion::physical_plan::Partitioning; impl TryInto for Action { type Error = BallistaError; fn try_into(self) -> Result { match self { - Action::FetchPartition(partition_id) => Ok(protobuf::Action { - action_type: Some(ActionType::FetchPartition(partition_id.into())), + Action::FetchPartition { + job_id, + stage_id, + partition_id, + path, + } => Ok(protobuf::Action { + action_type: Some(ActionType::FetchPartition(protobuf::FetchPartition { + job_id, + stage_id: stage_id as u32, + partition_id: partition_id as u32, + path, + })), settings: vec![], }), } @@ -47,6 +58,9 @@ impl TryInto for ExecutePartition { partition_id: self.partition_id.iter().map(|n| *n as u32).collect(), plan: Some(self.plan.try_into()?), partition_location: vec![], + output_partitioning: hash_partitioning_to_proto( + self.output_partitioning.as_ref(), + )?, }) } } @@ -87,3 +101,26 @@ impl Into for PartitionStats { } } } + +pub fn hash_partitioning_to_proto( + output_partitioning: Option<&Partitioning>, +) -> Result, BallistaError> { + match output_partitioning { + Some(Partitioning::Hash(exprs, partition_count)) => { + Ok(Some(protobuf::PhysicalHashRepartition { + hash_expr: exprs + .iter() + .map(|expr| expr.clone().try_into()) + .collect::, BallistaError>>()?, + partition_count: *partition_count as u64, + })) + } + None => Ok(None), + other => { + return Err(BallistaError::General(format!( + "scheduler::to_proto() invalid partitioning for ExecutePartition: {:?}", + other + ))) + } + } +} diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs index b65b83bbaf48..4d12dfc1c755 100644 --- a/ballista/rust/executor/src/execution_loop.rs +++ b/ballista/rust/executor/src/execution_loop.rs @@ -33,6 +33,8 @@ use ballista_core::serde::protobuf::{ use protobuf::CompletedTask; use 
crate::executor::Executor; +use ballista_core::error::BallistaError; +use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning; pub async fn poll_loop( mut scheduler: SchedulerGrpcClient, @@ -70,15 +72,23 @@ pub async fn poll_loop( match poll_work_result { Ok(result) => { if let Some(task) = result.into_inner().task { - run_received_tasks( + match run_received_tasks( executor.clone(), executor_meta.id.clone(), available_tasks_slots.clone(), task_status_sender, task, ) - .await; - active_job = true; + .await + { + Ok(_) => { + active_job = true; + } + Err(e) => { + warn!("Failed to run task: {:?}", e); + active_job = false; + } + } } else { active_job = false; } @@ -99,7 +109,7 @@ async fn run_received_tasks( available_tasks_slots: Arc, task_status_sender: Sender, task: TaskDefinition, -) { +) -> Result<(), BallistaError> { let task_id = task.task_id.unwrap(); let task_id_log = format!( "{}/{}/{}", @@ -108,6 +118,8 @@ async fn run_received_tasks( info!("Received task {}", task_id_log); available_tasks_slots.fetch_sub(1, Ordering::SeqCst); let plan: Arc = (&task.plan.unwrap()).try_into().unwrap(); + let shuffle_output_partitioning = + parse_protobuf_hash_partitioning(task.output_partitioning.as_ref())?; tokio::spawn(async move { let execution_result = executor @@ -116,6 +128,7 @@ async fn run_received_tasks( task_id.stage_id as usize, task_id.partition_id as usize, plan, + shuffle_output_partitioning, ) .await; info!("Done with task {}", task_id_log); @@ -127,6 +140,8 @@ async fn run_received_tasks( task_id, )); }); + + Ok(()) } fn as_task_status( diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs index cbf3eb040ff6..398ebca2b8e6 100644 --- a/ballista/rust/executor/src/executor.rs +++ b/ballista/rust/executor/src/executor.rs @@ -22,8 +22,9 @@ use std::sync::Arc; use ballista_core::error::BallistaError; use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::protobuf; +use datafusion::error::DataFusionError; use datafusion::physical_plan::display::DisplayableExecutionPlan; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::physical_plan::{ExecutionPlan, Partitioning}; /// Ballista executor pub struct Executor { @@ -50,23 +51,33 @@ impl Executor { stage_id: usize, part: usize, plan: Arc, + _shuffle_output_partitioning: Option, ) -> Result, BallistaError> { - // TODO to enable shuffling we need to specify the output partitioning here and - // until we do that there is always a single output partition - // see https://github.com/apache/arrow-datafusion/issues/707 - let shuffle_output_partitioning = None; + let exec = if let Some(shuffle_writer) = + plan.as_any().downcast_ref::() + { + // recreate the shuffle writer with the correct working directory + ShuffleWriterExec::try_new( + job_id.clone(), + stage_id, + plan.children()[0].clone(), + self.work_dir.clone(), + shuffle_writer.shuffle_output_partitioning().cloned(), + ) + } else { + Err(DataFusionError::Internal( + "Plan passed to execute_shuffle_write is not a ShuffleWriterExec" + .to_string(), + )) + }?; - let exec = ShuffleWriterExec::try_new( - job_id, - stage_id, - plan, - self.work_dir.clone(), - shuffle_output_partitioning, - )?; let partitions = exec.execute_shuffle_write(part).await?; println!( - "=== Physical plan with metrics ===\n{}\n", + "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n", + job_id, + stage_id, + part, DisplayableExecutionPlan::with_metrics(&exec) .indent() .to_string() diff --git 
a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs index 9a3f2d872d52..73dd1a946d55 100644 --- a/ballista/rust/executor/src/flight_service.rs +++ b/ballista/rust/executor/src/flight_service.rs @@ -18,7 +18,6 @@ //! Implementation of the Apache Arrow Flight protocol that wraps an executor. use std::fs::File; -use std::path::PathBuf; use std::pin::Pin; use std::sync::Arc; @@ -82,24 +81,13 @@ impl FlightService for BallistaFlightService { request: Request, ) -> Result, Status> { let ticket = request.into_inner(); - info!("Received do_get request"); let action = decode_protobuf(&ticket.ticket).map_err(|e| from_ballista_err(&e))?; match &action { - BallistaAction::FetchPartition(partition_id) => { - // fetch a partition that was previously executed by this executor - info!("FetchPartition {:?}", partition_id); - - let mut path = PathBuf::from(self.executor.work_dir()); - path.push(&partition_id.job_id); - path.push(&format!("{}", partition_id.stage_id)); - path.push(&format!("{}", partition_id.partition_id)); - path.push("data.arrow"); - let path = path.to_str().unwrap(); - - info!("FetchPartition {:?} reading {}", partition_id, path); + BallistaAction::FetchPartition { path, .. } => { + info!("FetchPartition reading {}", &path); let file = File::open(&path) .map_err(|e| { BallistaError::General(format!( @@ -222,7 +210,11 @@ where let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(), &options).into(); send_response(&tx, Ok(schema_flight_data)).await?; + let mut row_count = 0; for batch in reader { + if let Ok(x) = &batch { + row_count += x.num_rows(); + } let batch_flight_data: Vec<_> = batch .map(|b| create_flight_iter(&b, &options).collect()) .map_err(|e| from_arrow_err(&e))?; @@ -230,6 +222,7 @@ where send_response(&tx, batch.clone()).await?; } } + info!("FetchPartition streamed {} rows", row_count); Ok(()) } diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 905437d4d980..f5e2dc1dfd80 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -76,10 +76,12 @@ use crate::planner::DistributedPlanner; use log::{debug, error, info, warn}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; -use tonic::{Request, Response}; +use tonic::{Request, Response, Status}; use self::state::{ConfigBackendClient, SchedulerState}; use ballista_core::config::BallistaConfig; +use ballista_core::execution_plans::ShuffleWriterExec; +use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; use ballista_core::utils::create_datafusion_context; use datafusion::physical_plan::parquet::ParquetExec; use std::time::{Instant, SystemTime, UNIX_EPOCH}; @@ -209,7 +211,7 @@ impl SchedulerGrpc for SchedulerServer { tonic::Status::internal(msg) })?; } - let task = if can_accept_task { + let task: Result, Status> = if can_accept_task { let plan = self .state .assign_next_schedulable_task(&metadata.id) @@ -229,15 +231,35 @@ impl SchedulerGrpc for SchedulerServer { partition_id.partition_id ); } - plan.map(|(status, plan)| TaskDefinition { - plan: Some(plan.try_into().unwrap()), - task_id: status.partition_id, - }) + match plan { + Some((status, plan)) => { + let plan_clone = plan.clone(); + let output_partitioning = if let Some(shuffle_writer) = + plan_clone.as_any().downcast_ref::() + { + shuffle_writer.shuffle_output_partitioning() + } else { + return Err(Status::invalid_argument(format!( + "Task root plan was not a ShuffleWriterExec: {:?}", + plan_clone + ))); + }; + 
Ok(Some(TaskDefinition { + plan: Some(plan.try_into().unwrap()), + task_id: status.partition_id, + output_partitioning: hash_partitioning_to_proto( + output_partitioning, + ) + .map_err(|_| Status::internal("TBD".to_string()))?, + })) + } + None => Ok(None), + } } else { - None + Ok(None) }; lock.unlock().await; - Ok(Response::new(PollWorkResult { task })) + Ok(Response::new(PollWorkResult { task: task? })) } else { warn!("Received invalid executor poll_work request"); Err(tonic::Status::invalid_argument( @@ -431,12 +453,12 @@ impl SchedulerGrpc for SchedulerServer { })); // save stages into state - for stage in stages { + for shuffle_writer in stages { fail_job!(state .save_stage_plan( &job_id_spawn, - stage.stage_id(), - stage.children()[0].clone() + shuffle_writer.stage_id(), + shuffle_writer.clone() ) .await .map_err(|e| { @@ -444,12 +466,13 @@ impl SchedulerGrpc for SchedulerServer { error!("{}", msg); tonic::Status::internal(msg) })); - let num_partitions = stage.output_partitioning().partition_count(); + let num_partitions = + shuffle_writer.output_partitioning().partition_count(); for partition_id in 0..num_partitions { let pending_status = TaskStatus { partition_id: Some(PartitionId { job_id: job_id_spawn.clone(), - stage_id: stage.stage_id() as u32, + stage_id: shuffle_writer.stage_id() as u32, partition_id: partition_id as u32, }), status: None, diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index 11f5c994fd52..05025f282477 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -105,22 +105,24 @@ impl DistributedPlanner { .as_any() .downcast_ref::() { - let query_stage = create_shuffle_writer( + let shuffle_writer = create_shuffle_writer( job_id, self.next_stage_id(), - //TODO should be children[0].clone() so that we replace this - // with an UnresolvedShuffleExec instead of just executing this - // part of the plan again - // see https://github.com/apache/arrow-datafusion/issues/707 - coalesce.children()[0].clone(), + children[0].clone(), None, )?; let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( - query_stage.stage_id(), - query_stage.schema(), - query_stage.output_partitioning().partition_count(), + shuffle_writer.stage_id(), + shuffle_writer.schema(), + shuffle_writer.output_partitioning().partition_count(), + shuffle_writer + .shuffle_output_partitioning() + .map(|p| p.partition_count()) + .unwrap_or_else(|| { + shuffle_writer.output_partitioning().partition_count() + }), )); - stages.push(query_stage); + stages.push(shuffle_writer); Ok(( coalesce.with_new_children(vec![unresolved_shuffle])?, stages, @@ -128,23 +130,33 @@ impl DistributedPlanner { } else if let Some(repart) = execution_plan.as_any().downcast_ref::() { - let query_stage = create_shuffle_writer( - job_id, - self.next_stage_id(), - //TODO should be children[0].clone() so that we replace this - // with an UnresolvedShuffleExec instead of just executing this - // part of the plan again - // see https://github.com/apache/arrow-datafusion/issues/707 - repart.children()[0].clone(), - Some(repart.partitioning().to_owned()), - )?; - let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( - query_stage.stage_id(), - query_stage.schema(), - query_stage.output_partitioning().partition_count(), - )); - stages.push(query_stage); - Ok((unresolved_shuffle, stages)) + match repart.output_partitioning() { + Partitioning::Hash(_, _) => { + let shuffle_writer = create_shuffle_writer( + job_id, + self.next_stage_id(), + 
children[0].clone(), + Some(repart.partitioning().to_owned()), + )?; + let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( + shuffle_writer.stage_id(), + shuffle_writer.schema(), + shuffle_writer.output_partitioning().partition_count(), + shuffle_writer + .shuffle_output_partitioning() + .map(|p| p.partition_count()) + .unwrap_or_else(|| { + shuffle_writer.output_partitioning().partition_count() + }), + )); + stages.push(shuffle_writer); + Ok((unresolved_shuffle, stages)) + } + _ => { + // remove any non-hash repartition from the distributed plan + Ok((children[0].clone(), stages)) + } + } } else if let Some(window) = execution_plan.as_any().downcast_ref::() { @@ -184,18 +196,22 @@ pub fn remove_unresolved_shuffles( })? .clone(); - for i in 0..unresolved_shuffle.partition_count { + for i in 0..unresolved_shuffle.output_partition_count { if let Some(x) = p.get(&i) { relevant_locations.push(x.to_owned()); } else { relevant_locations.push(vec![]); } } - println!( - "create shuffle reader with {:?}", + info!( + "Creating shuffle reader: {}", relevant_locations .iter() - .map(|c| format!("{:?}", c)) + .map(|c| c + .iter() + .map(|l| l.path.clone()) + .collect::>() + .join(", ")) .collect::>() .join("\n") ); @@ -235,7 +251,9 @@ mod test { use ballista_core::error::BallistaError; use ballista_core::execution_plans::UnresolvedShuffleExec; use ballista_core::serde::protobuf; + use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; + use datafusion::physical_plan::hash_join::HashJoinExec; use datafusion::physical_plan::sort::SortExec; use datafusion::physical_plan::{ coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec, @@ -284,9 +302,7 @@ mod test { ProjectionExec: expr=[l_returnflag@0 as l_returnflag, SUM(lineitem.l_extendedprice Multiply Int64(1))@1 as sum_disc_price] HashAggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))] CoalesceBatchesExec: target_batch_size=4096 - RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }], 2) - HashAggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))] - CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false + UnresolvedShuffleExec ShuffleWriterExec: None SortExec: [l_returnflag@0 ASC] @@ -307,6 +323,14 @@ mod test { let final_hash = projection.children()[0].clone(); let final_hash = downcast_exec!(final_hash, HashAggregateExec); assert!(*final_hash.mode() == AggregateMode::FinalPartitioned); + let coalesce = final_hash.children()[0].clone(); + let coalesce = downcast_exec!(coalesce, CoalesceBatchesExec); + let unresolved_shuffle = coalesce.children()[0].clone(); + let unresolved_shuffle = + downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec); + assert_eq!(unresolved_shuffle.stage_id, 1); + assert_eq!(unresolved_shuffle.input_partition_count, 2); + assert_eq!(unresolved_shuffle.output_partition_count, 2); // verify stage 2 let stage2 = stages[2].children()[0].clone(); @@ -314,10 +338,187 @@ mod test { let coalesce_partitions = sort.children()[0].clone(); let coalesce_partitions = downcast_exec!(coalesce_partitions, CoalescePartitionsExec); + assert_eq!( + coalesce_partitions.output_partitioning().partition_count(), + 1 + ); let unresolved_shuffle = coalesce_partitions.children()[0].clone(); let unresolved_shuffle 
= downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec); assert_eq!(unresolved_shuffle.stage_id, 2); + assert_eq!(unresolved_shuffle.input_partition_count, 2); + assert_eq!(unresolved_shuffle.output_partition_count, 2); + + Ok(()) + } + + #[test] + fn distributed_join_plan() -> Result<(), BallistaError> { + let mut ctx = datafusion_test_context("testdata")?; + + // simplified form of TPC-H query 12 + let df = ctx.sql( + "select + l_shipmode, + sum(case + when o_orderpriority = '1-URGENT' + or o_orderpriority = '2-HIGH' + then 1 + else 0 + end) as high_line_count, + sum(case + when o_orderpriority <> '1-URGENT' + and o_orderpriority <> '2-HIGH' + then 1 + else 0 + end) as low_line_count +from + lineitem + join + orders + on + l_orderkey = o_orderkey +where + l_shipmode in ('MAIL', 'SHIP') + and l_commitdate < l_receiptdate + and l_shipdate < l_commitdate + and l_receiptdate >= date '1994-01-01' + and l_receiptdate < date '1995-01-01' +group by + l_shipmode +order by + l_shipmode; +", + )?; + + let plan = df.to_logical_plan(); + let plan = ctx.optimize(&plan)?; + let plan = ctx.create_physical_plan(&plan)?; + + let mut planner = DistributedPlanner::new(); + let job_uuid = Uuid::new_v4(); + let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?; + for stage in &stages { + println!("{}", displayable(stage.as_ref()).indent().to_string()); + } + + /* Expected result: + + ShuffleWriterExec: Some(Hash([Column { name: "l_orderkey", index: 0 }], 2)) + CoalesceBatchesExec: target_batch_size=4096 + FilterExec: l_shipmode@4 IN ([Literal { value: Utf8("MAIL") }, Literal { value: Utf8("SHIP") }]) AND l_commitdate@2 < l_receiptdate@3 AND l_shipdate@1 < l_commitdate@2 AND l_receiptdate@3 >= 8766 AND l_receiptdate@3 < 9131 + CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false + + ShuffleWriterExec: Some(Hash([Column { name: "o_orderkey", index: 0 }], 2)) + CsvExec: source=Path(testdata/orders: [testdata/orders/orders.tbl]), has_header=false + + ShuffleWriterExec: Some(Hash([Column { name: "l_shipmode", index: 0 }], 2)) + HashAggregateExec: mode=Partial, gby=[l_shipmode@4 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)] + CoalesceBatchesExec: target_batch_size=4096 + HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })] + CoalesceBatchesExec: target_batch_size=4096 + UnresolvedShuffleExec + CoalesceBatchesExec: target_batch_size=4096 + UnresolvedShuffleExec + + ShuffleWriterExec: None + ProjectionExec: expr=[l_shipmode@0 as l_shipmode, SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@1 as high_line_count, SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)@2 as low_line_count] + HashAggregateExec: mode=FinalPartitioned, gby=[l_shipmode@0 as l_shipmode], aggr=[SUM(CASE WHEN #orders.o_orderpriority Eq Utf8("1-URGENT") Or #orders.o_orderpriority Eq Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #orders.o_orderpriority NotEq Utf8("1-URGENT") And #orders.o_orderpriority NotEq Utf8("2-HIGH") THEN 
Int64(1) ELSE Int64(0) END)] + CoalesceBatchesExec: target_batch_size=4096 + UnresolvedShuffleExec + + ShuffleWriterExec: None + SortExec: [l_shipmode@0 ASC] + CoalescePartitionsExec + UnresolvedShuffleExec + */ + + assert_eq!(5, stages.len()); + + // verify partitioning for each stage + + // csv "lineitem" (2 files) + assert_eq!( + 2, + stages[0].children()[0] + .output_partitioning() + .partition_count() + ); + assert_eq!( + 2, + stages[0] + .shuffle_output_partitioning() + .unwrap() + .partition_count() + ); + + // csv "orders" (1 file) + assert_eq!( + 1, + stages[1].children()[0] + .output_partitioning() + .partition_count() + ); + assert_eq!( + 2, + stages[1] + .shuffle_output_partitioning() + .unwrap() + .partition_count() + ); + + // join and partial hash aggregate + let input = stages[2].children()[0].clone(); + assert_eq!(2, input.output_partitioning().partition_count()); + assert_eq!( + 2, + stages[2] + .shuffle_output_partitioning() + .unwrap() + .partition_count() + ); + + let hash_agg = downcast_exec!(input, HashAggregateExec); + + let coalesce_batches = hash_agg.children()[0].clone(); + let coalesce_batches = downcast_exec!(coalesce_batches, CoalesceBatchesExec); + + let join = coalesce_batches.children()[0].clone(); + let join = downcast_exec!(join, HashJoinExec); + + let join_input_1 = join.children()[0].clone(); + // skip CoalesceBatches + let join_input_1 = join_input_1.children()[0].clone(); + let unresolved_shuffle_reader_1 = + downcast_exec!(join_input_1, UnresolvedShuffleExec); + assert_eq!(unresolved_shuffle_reader_1.input_partition_count, 2); // lineitem + assert_eq!(unresolved_shuffle_reader_1.output_partition_count, 2); + + let join_input_2 = join.children()[1].clone(); + // skip CoalesceBatches + let join_input_2 = join_input_2.children()[0].clone(); + let unresolved_shuffle_reader_2 = + downcast_exec!(join_input_2, UnresolvedShuffleExec); + assert_eq!(unresolved_shuffle_reader_2.input_partition_count, 1); //orders + assert_eq!(unresolved_shuffle_reader_2.output_partition_count, 2); + + // final partitioned hash aggregate + assert_eq!( + 2, + stages[3].children()[0] + .output_partitioning() + .partition_count() + ); + assert!(stages[3].shuffle_output_partitioning().is_none()); + + // coalesce partitions and sort + assert_eq!( + 1, + stages[4].children()[0] + .output_partitioning() + .partition_count() + ); + assert!(stages[4].shuffle_output_partitioning().is_none()); Ok(()) } diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index a4ae59e1dfda..0bbab8cebf89 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -297,18 +297,22 @@ impl SchedulerState { let mut partition_locations: HashMap< usize, // stage id HashMap< - usize, // shuffle input partition id - Vec, // shuffle output partitions + usize, // shuffle output partition id + Vec, // shuffle partitions >, > = HashMap::new(); for unresolved_shuffle in unresolved_shuffles { - for partition_id in 0..unresolved_shuffle.partition_count { + // we schedule one task per *input* partition and each input partition + // can produce multiple output partitions + for shuffle_input_partition_id in + 0..unresolved_shuffle.input_partition_count + { let referenced_task = tasks .get(&get_task_status_key( &self.namespace, &partition.job_id, unresolved_shuffle.stage_id, - partition_id, + shuffle_input_partition_id, )) .unwrap(); let task_is_dead = self @@ -323,7 +327,11 @@ impl SchedulerState { }, )) = &referenced_task.status { - let 
locations = partition_locations + debug!("Task for unresolved shuffle input partition {} completed and produced these shuffle partitions:\n\t{}", + shuffle_input_partition_id, + partitions.iter().map(|p| format!("{}={}", p.partition_id, &p.path)).collect::>().join("\n\t") + ); + let stage_shuffle_partition_locations = partition_locations .entry(unresolved_shuffle.stage_id) .or_insert_with(HashMap::new); let executor_meta = executors @@ -332,9 +340,10 @@ impl SchedulerState { .unwrap() .clone(); - let temp = - locations.entry(partition_id).or_insert_with(Vec::new); - for p in partitions { + for shuffle_write_partition in partitions { + let temp = stage_shuffle_partition_locations + .entry(shuffle_write_partition.partition_id as usize) + .or_insert_with(Vec::new); let executor_meta = executor_meta.clone(); let partition_location = ballista_core::serde::scheduler::PartitionLocation { @@ -342,29 +351,36 @@ impl SchedulerState { ballista_core::serde::scheduler::PartitionId { job_id: partition.job_id.clone(), stage_id: unresolved_shuffle.stage_id, - partition_id, + partition_id: shuffle_write_partition + .partition_id + as usize, }, executor_meta, partition_stats: PartitionStats::new( - Some(p.num_rows), - Some(p.num_batches), - Some(p.num_bytes), + Some(shuffle_write_partition.num_rows), + Some(shuffle_write_partition.num_batches), + Some(shuffle_write_partition.num_bytes), ), - path: p.path.clone(), + path: shuffle_write_partition.path.clone(), }; - info!( - "Scheduler storing stage {} partition {} path: {}", + debug!( + "Scheduler storing stage {} output partition {} path: {}", unresolved_shuffle.stage_id, - partition_id, + partition_location.partition_id.partition_id, partition_location.path - ); + ); temp.push(partition_location); } } else { + debug!( + "Stage {} input partition {} has not completed yet", + unresolved_shuffle.stage_id, shuffle_input_partition_id, + ); continue 'tasks; } } } + let plan = remove_unresolved_shuffles(plan.as_ref(), &partition_locations)?;
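
Note: the central behavioral change in this patch is that shuffle readers now fetch partitions by the explicit file path recorded by the scheduler, rather than having the executor rebuild the path from job/stage/partition ids. Below is a minimal, standalone sketch of that flow using simplified stand-in types (not the real ballista_core structs); the example path follows the unpartitioned layout shown in the diff (work_dir/job_id/stage_id/input_partition/data.arrow) and is hypothetical.

// Simplified stand-ins for the Ballista types touched by this diff; the real
// definitions live in ballista_core::serde::scheduler.
#[derive(Debug, Clone)]
struct PartitionLocation {
    job_id: String,
    stage_id: usize,
    partition_id: usize,
    /// Absolute path of the Arrow IPC file produced by the shuffle writer.
    path: String,
}

#[derive(Debug, Clone)]
enum Action {
    /// Fetch a shuffle partition by explicit path (the new variant); the
    /// executor no longer reconstructs the path from job/stage/partition ids.
    FetchPartition {
        job_id: String,
        stage_id: usize,
        partition_id: usize,
        path: String,
    },
}

/// Build the fetch action a shuffle reader would send to the executor that
/// owns `location`.
fn fetch_action(location: &PartitionLocation) -> Action {
    Action::FetchPartition {
        job_id: location.job_id.clone(),
        stage_id: location.stage_id,
        partition_id: location.partition_id,
        path: location.path.clone(),
    }
}

fn main() {
    let location = PartitionLocation {
        job_id: "job-123".to_string(),
        stage_id: 1,
        partition_id: 0,
        // Hypothetical path matching the unpartitioned case in the diff.
        path: "/tmp/work/job-123/1/0/data.arrow".to_string(),
    };
    println!("{:?}", fetch_action(&location));
}

Passing the path through the FetchPartition action is what lets a single shuffle-write task publish multiple output partitions (one file per hash output partition) while readers remain agnostic of the on-disk layout.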