Cleanup Repartition Exec code (#538)

* Cleanup RepartitionExec code

* cleanup metric handling

* Add elapsed_nanos
alamb authored Jun 13, 2021
1 parent 2568323 commit d382854
Showing 2 changed files with 157 additions and 127 deletions.
5 changes: 5 additions & 0 deletions datafusion/src/physical_plan/mod.rs
@@ -107,6 +107,11 @@ impl SQLMetric {
         self.value.fetch_add(n, Ordering::Relaxed);
     }

+    /// Add elapsed nanoseconds since `start` to self
+    pub fn add_elapsed(&self, start: std::time::Instant) {
+        self.add(start.elapsed().as_nanos() as usize)
+    }
+
     /// Get the current value
     pub fn value(&self) -> usize {
         self.value.load(Ordering::Relaxed)
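
The new `add_elapsed` helper folds the repeated `add(now.elapsed().as_nanos() as usize)` pattern into one call. A minimal, standalone sketch of the idiom it encapsulates (the `TimeMetric` type below is a hypothetical stand-in for `SQLMetric`, not DataFusion's API):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

/// Hypothetical stand-in for SQLMetric: a relaxed atomic nanosecond counter.
struct TimeMetric {
    nanos: AtomicUsize,
}

impl TimeMetric {
    /// Accumulate wall-clock time elapsed since `start`.
    fn add_elapsed(&self, start: Instant) {
        // Relaxed ordering suffices: the counter is only read for reporting.
        self.nanos
            .fetch_add(start.elapsed().as_nanos() as usize, Ordering::Relaxed);
    }
}

fn main() {
    let metric = TimeMetric { nanos: AtomicUsize::new(0) };
    let start = Instant::now();
    // ... timed work would run here ...
    metric.add_elapsed(start);
    println!("elapsed: {} ns", metric.nanos.load(Ordering::Relaxed));
}
```
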
279 changes: 152 additions & 127 deletions datafusion/src/physical_plan/repartition.rs
@@ -38,7 +38,7 @@ use futures::stream::Stream;
 use futures::StreamExt;
 use hashbrown::HashMap;
 use tokio::sync::{
-    mpsc::{UnboundedReceiver, UnboundedSender},
+    mpsc::{self, UnboundedReceiver, UnboundedSender},
     Mutex,
 };
 use tokio::task::JoinHandle;
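
The import change above pulls in the `mpsc` module itself via `self`, so call sites can write `mpsc::unbounded_channel()` instead of spelling out `tokio::sync::mpsc::unbounded_channel()`. A small sketch of the idiom (assuming the tokio crate with its full feature set):

```rust
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

#[tokio::main]
async fn main() {
    // `mpsc` (the module) is in scope thanks to `self` in the import above:
    let (tx, mut rx): (UnboundedSender<i32>, UnboundedReceiver<i32>) =
        mpsc::unbounded_channel();
    tx.send(1).unwrap();
    assert_eq!(rx.recv().await, Some(1));
}
```
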
@@ -60,12 +60,40 @@ pub struct RepartitionExec {
             HashMap<usize, (UnboundedSender<MaybeBatch>, UnboundedReceiver<MaybeBatch>)>,
         >,
     >,

+    /// Execution metrics
+    metrics: RepartitionMetrics,
+}
+
+#[derive(Debug, Clone)]
+struct RepartitionMetrics {
     /// Time in nanos to execute child operator and fetch batches
-    fetch_time_nanos: Arc<SQLMetric>,
+    fetch_nanos: Arc<SQLMetric>,
     /// Time in nanos to perform repartitioning
-    repart_time_nanos: Arc<SQLMetric>,
+    repart_nanos: Arc<SQLMetric>,
     /// Time in nanos for sending resulting batches to channels
-    send_time_nanos: Arc<SQLMetric>,
+    send_nanos: Arc<SQLMetric>,
 }
+
+impl RepartitionMetrics {
+    fn new() -> Self {
+        Self {
+            fetch_nanos: SQLMetric::time_nanos(),
+            repart_nanos: SQLMetric::time_nanos(),
+            send_nanos: SQLMetric::time_nanos(),
+        }
+    }
+    /// Convert into the external metrics form
+    fn to_hashmap(&self) -> HashMap<String, SQLMetric> {
+        let mut metrics = HashMap::new();
+        metrics.insert("fetchTime".to_owned(), self.fetch_nanos.as_ref().clone());
+        metrics.insert(
+            "repartitionTime".to_owned(),
+            self.repart_nanos.as_ref().clone(),
+        );
+        metrics.insert("sendTime".to_owned(), self.send_nanos.as_ref().clone());
+        metrics
+    }
+}

 impl RepartitionExec {
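
Grouping the three timers into a single `#[derive(Clone)]` struct is what lets `execute` hand one `metrics` handle to each spawned input task instead of cloning three `Arc`s by hand. A sketch of why cloning works (hypothetical miniature `Metrics` struct, assuming tokio; the `Arc`s inside are shared, so every clone updates the same counters):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical miniature of RepartitionMetrics: cloning the struct clones
/// the Arcs, so all clones point at the same underlying counters.
#[derive(Debug, Clone, Default)]
struct Metrics {
    fetch_nanos: Arc<AtomicUsize>,
}

#[tokio::main]
async fn main() {
    let metrics = Metrics::default();
    let task_metrics = metrics.clone(); // one cheap clone per spawned task
    tokio::spawn(async move {
        task_metrics.fetch_nanos.fetch_add(42, Ordering::Relaxed);
    })
    .await
    .unwrap();
    // The original handle observes the update made inside the task.
    assert_eq!(metrics.fetch_nanos.load(Ordering::Relaxed), 42);
}
```
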
Expand Down Expand Up @@ -132,132 +160,33 @@ impl ExecutionPlan for RepartitionExec {
// being read yet. This may cause high memory usage if the next operator is
// reading output partitions in order rather than concurrently. One workaround
// for this would be to add spill-to-disk capabilities.
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<
Option<ArrowResult<RecordBatch>>,
>();
let (sender, receiver) =
mpsc::unbounded_channel::<Option<ArrowResult<RecordBatch>>>();
channels.insert(partition, (sender, receiver));
}
// Use fixed random state
let random = ahash::RandomState::with_seeds(0, 0, 0, 0);

// launch one async task per *input* partition
for i in 0..num_input_partitions {
let random_state = random.clone();
let input = self.input.clone();
let fetch_time = self.fetch_time_nanos.clone();
let repart_time = self.repart_time_nanos.clone();
let send_time = self.send_time_nanos.clone();
let txs: HashMap<_, _> = channels
.iter()
.map(|(partition, (tx, _rx))| (*partition, tx.clone()))
.collect();
let partitioning = self.partitioning.clone();
let mut txs_captured = txs.clone();
let input_task: JoinHandle<Result<()>> = tokio::spawn(async move {
// execute the child operator
let now = Instant::now();
let mut stream = input.execute(i).await?;
fetch_time.add(now.elapsed().as_nanos() as usize);

let mut counter = 0;
let hashes_buf = &mut vec![];

loop {
// fetch the next batch
let now = Instant::now();
let result = stream.next().await;
fetch_time.add(now.elapsed().as_nanos() as usize);

if result.is_none() {
break;
}
let result: ArrowResult<RecordBatch> = result.unwrap();

match &partitioning {
Partitioning::RoundRobinBatch(_) => {
let now = Instant::now();
let output_partition = counter % num_output_partitions;
let tx = txs_captured.get_mut(&output_partition).unwrap();
tx.send(Some(result)).map_err(|e| {
DataFusionError::Execution(e.to_string())
})?;
send_time.add(now.elapsed().as_nanos() as usize);
}
Partitioning::Hash(exprs, _) => {
let now = Instant::now();
let input_batch = result?;
let arrays = exprs
.iter()
.map(|expr| {
Ok(expr
.evaluate(&input_batch)?
.into_array(input_batch.num_rows()))
})
.collect::<Result<Vec<_>>>()?;
hashes_buf.clear();
hashes_buf.resize(arrays[0].len(), 0);
// Hash arrays and compute buckets based on number of partitions
let hashes =
create_hashes(&arrays, &random_state, hashes_buf)?;
let mut indices = vec![vec![]; num_output_partitions];
for (index, hash) in hashes.iter().enumerate() {
indices
[(*hash % num_output_partitions as u64) as usize]
.push(index as u64)
}
repart_time.add(now.elapsed().as_nanos() as usize);
for (num_output_partition, partition_indices) in
indices.into_iter().enumerate()
{
let now = Instant::now();
let indices = partition_indices.into();
// Produce batches based on indices
let columns = input_batch
.columns()
.iter()
.map(|c| {
take(c.as_ref(), &indices, None).map_err(
|e| {
DataFusionError::Execution(
e.to_string(),
)
},
)
})
.collect::<Result<Vec<Arc<dyn Array>>>>()?;
let output_batch = RecordBatch::try_new(
input_batch.schema(),
columns,
);
repart_time.add(now.elapsed().as_nanos() as usize);
let now = Instant::now();
let tx = txs_captured
.get_mut(&num_output_partition)
.unwrap();
tx.send(Some(output_batch)).map_err(|e| {
DataFusionError::Execution(e.to_string())
})?;
send_time.add(now.elapsed().as_nanos() as usize);
}
}
other => {
// this should be unreachable as long as the validation logic
// in the constructor is kept up-to-date
return Err(DataFusionError::NotImplemented(format!(
"Unsupported repartitioning scheme {:?}",
other
)));
}
}
counter += 1;
}

Ok(())
});
let input_task: JoinHandle<Result<()>> =
tokio::spawn(Self::pull_from_input(
random.clone(),
self.input.clone(),
i,
txs.clone(),
self.partitioning.clone(),
self.metrics.clone(),
));

// In a separate task, wait for each input to be done
// (and pass along any errors)
tokio::spawn(async move { Self::wait_for_task(input_task, txs).await });
// (and pass along any errors, including panic!s)
tokio::spawn(Self::wait_for_task(input_task, txs));
}
}
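
The two-task shape above (one `JoinHandle` doing the work, a second task awaiting it) is what lets errors and panics from an input partition be observed rather than dropped: awaiting a `JoinHandle` yields `Err` on panic and `Ok(Err(..))` on a returned error. A minimal sketch of that pattern (assuming tokio; `supervise` is a hypothetical name, not DataFusion's `wait_for_task`):

```rust
use tokio::task::JoinHandle;

/// Await a worker task and surface both returned errors and panics,
/// instead of letting them vanish with the detached task.
async fn supervise(worker: JoinHandle<Result<(), String>>) {
    match worker.await {
        Ok(Ok(())) => println!("input done"), // normal completion
        Ok(Err(e)) => eprintln!("worker error: {e}"), // error returned by the task
        Err(join_err) => eprintln!("worker panicked: {join_err}"), // panic captured by the JoinHandle
    }
}

#[tokio::main]
async fn main() {
    let worker: JoinHandle<Result<(), String>> = tokio::spawn(async { Ok(()) });
    tokio::spawn(supervise(worker)).await.unwrap();
}
```
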

@@ -272,14 +201,7 @@ impl ExecutionPlan for RepartitionExec {
     }

     fn metrics(&self) -> HashMap<String, SQLMetric> {
-        let mut metrics = HashMap::new();
-        metrics.insert("fetchTime".to_owned(), (*self.fetch_time_nanos).clone());
-        metrics.insert(
-            "repartitionTime".to_owned(),
-            (*self.repart_time_nanos).clone(),
-        );
-        metrics.insert("sendTime".to_owned(), (*self.send_time_nanos).clone());
-        metrics
+        self.metrics.to_hashmap()
     }

     fn fmt_as(
@@ -305,12 +227,115 @@ impl RepartitionExec {
             input,
             partitioning,
             channels: Arc::new(Mutex::new(HashMap::new())),
-            fetch_time_nanos: SQLMetric::time_nanos(),
-            repart_time_nanos: SQLMetric::time_nanos(),
-            send_time_nanos: SQLMetric::time_nanos(),
+            metrics: RepartitionMetrics::new(),
         })
     }

+    /// Pulls data from the specified input plan, feeding it to the
+    /// output partitions based on the desired partitioning
+    ///
+    /// i is the input partition index
+    ///
+    /// txs hold the output sending channels for each output partition
+    async fn pull_from_input(
+        random_state: ahash::RandomState,
+        input: Arc<dyn ExecutionPlan>,
+        i: usize,
+        mut txs: HashMap<usize, UnboundedSender<Option<ArrowResult<RecordBatch>>>>,
+        partitioning: Partitioning,
+        metrics: RepartitionMetrics,
+    ) -> Result<()> {
+        let num_output_partitions = txs.len();
+
+        // execute the child operator
+        let now = Instant::now();
+        let mut stream = input.execute(i).await?;
+        metrics.fetch_nanos.add_elapsed(now);
+
+        let mut counter = 0;
+        let hashes_buf = &mut vec![];
+
+        loop {
+            // fetch the next batch
+            let now = Instant::now();
+            let result = stream.next().await;
+            metrics.fetch_nanos.add_elapsed(now);
+
+            if result.is_none() {
+                break;
+            }
+            let result: ArrowResult<RecordBatch> = result.unwrap();
+
+            match &partitioning {
+                Partitioning::RoundRobinBatch(_) => {
+                    let now = Instant::now();
+                    let output_partition = counter % num_output_partitions;
+                    let tx = txs.get_mut(&output_partition).unwrap();
+                    tx.send(Some(result))
+                        .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+                    metrics.send_nanos.add_elapsed(now);
+                }
+                Partitioning::Hash(exprs, _) => {
+                    let now = Instant::now();
+                    let input_batch = result?;
+                    let arrays = exprs
+                        .iter()
+                        .map(|expr| {
+                            Ok(expr
+                                .evaluate(&input_batch)?
+                                .into_array(input_batch.num_rows()))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    hashes_buf.clear();
+                    hashes_buf.resize(arrays[0].len(), 0);
+                    // Hash arrays and compute buckets based on number of partitions
+                    let hashes = create_hashes(&arrays, &random_state, hashes_buf)?;
+                    let mut indices = vec![vec![]; num_output_partitions];
+                    for (index, hash) in hashes.iter().enumerate() {
+                        indices[(*hash % num_output_partitions as u64) as usize]
+                            .push(index as u64)
+                    }
+                    metrics.repart_nanos.add_elapsed(now);
+                    for (num_output_partition, partition_indices) in
+                        indices.into_iter().enumerate()
+                    {
+                        let now = Instant::now();
+                        let indices = partition_indices.into();
+                        // Produce batches based on indices
+                        let columns = input_batch
+                            .columns()
+                            .iter()
+                            .map(|c| {
+                                take(c.as_ref(), &indices, None).map_err(|e| {
+                                    DataFusionError::Execution(e.to_string())
+                                })
+                            })
+                            .collect::<Result<Vec<Arc<dyn Array>>>>()?;
+                        let output_batch =
+                            RecordBatch::try_new(input_batch.schema(), columns);
+                        metrics.repart_nanos.add_elapsed(now);
+                        let now = Instant::now();
+                        let tx = txs.get_mut(&num_output_partition).unwrap();
+                        tx.send(Some(output_batch))
+                            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+                        metrics.send_nanos.add_elapsed(now);
+                    }
+                }
+                other => {
+                    // this should be unreachable as long as the validation logic
+                    // in the constructor is kept up-to-date
+                    return Err(DataFusionError::NotImplemented(format!(
+                        "Unsupported repartitioning scheme {:?}",
+                        other
+                    )));
+                }
+            }
+            counter += 1;
+        }

+        Ok(())
+    }
+
     /// Waits for `input_task` which is consuming one of the inputs to
     /// complete. Upon each successful completion, sends a `None` to
     /// each of the output tx channels to signal one of the inputs is
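
For reference, the bucket assignment in the `Hash` arm of `pull_from_input` reduces to `hash % num_output_partitions`: row indices are collected per bucket, and each index list then drives a `take` kernel to build the per-partition batch. A standalone sketch of that math (plain `DefaultHasher` standing in for DataFusion's `create_hashes`, illustrative keys only):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn main() {
    let num_output_partitions: u64 = 4;
    let keys = ["a", "b", "c", "d", "e", "f"];

    // One index list per output partition, as in the Hash arm above.
    let mut indices: Vec<Vec<u64>> = vec![vec![]; num_output_partitions as usize];
    for (row, key) in keys.iter().enumerate() {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let hash = hasher.finish();
        // Bucket = hash modulo the number of output partitions.
        indices[(hash % num_output_partitions) as usize].push(row as u64);
    }

    // Each index list would then feed a `take` over the batch's columns.
    for (partition, rows) in indices.iter().enumerate() {
        println!("partition {partition}: rows {rows:?}");
    }
}
```
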
