Add metrics to RepartitionExec #398

Merged · 3 commits · May 23, 2021
53 changes: 50 additions & 3 deletions datafusion/src/physical_plan/repartition.rs
@@ -21,10 +21,11 @@
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use std::{any::Any, collections::HashMap, vec};
+use std::time::Instant;
+use std::{any::Any, vec};
 
 use crate::error::{DataFusionError, Result};
-use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
+use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, SQLMetric};
 use arrow::record_batch::RecordBatch;
 use arrow::{array::Array, error::Result as ArrowResult};
 use arrow::{compute::take, datatypes::SchemaRef};
@@ -35,6 +36,7 @@ use async_trait::async_trait;
 
 use futures::stream::Stream;
 use futures::StreamExt;
+use hashbrown::HashMap;
 use tokio::sync::{
     mpsc::{UnboundedReceiver, UnboundedSender},
     Mutex,
@@ -58,6 +60,12 @@ pub struct RepartitionExec {
             HashMap<usize, (UnboundedSender<MaybeBatch>, UnboundedReceiver<MaybeBatch>)>,
         >,
     >,
+    /// Time in nanos to execute child operator and fetch batches
+    fetch_time_nanos: Arc<SQLMetric>,
+    /// Time in nanos to perform repartitioning
+    repart_time_nanos: Arc<SQLMetric>,
+    /// Time in nanos for sending resulting batches to channels
+    send_time_nanos: Arc<SQLMetric>,
 }
 
 impl RepartitionExec {
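The three fields added above are shared `Arc<SQLMetric>` counters; the execution hunks below populate them by timing each phase with `std::time::Instant` and adding the elapsed nanoseconds. As a minimal standalone sketch of that accumulation pattern, the following uses a plain `AtomicUsize` wrapper (a hypothetical stand-in named `TimeNanos`, not DataFusion's `SQLMetric` type):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Hypothetical stand-in for `SQLMetric`: a shared nanosecond counter
/// that concurrent tasks add elapsed time into.
#[derive(Debug, Default)]
struct TimeNanos(AtomicUsize);

impl TimeNanos {
    fn add(&self, nanos: usize) {
        self.0.fetch_add(nanos, Ordering::Relaxed);
    }
    fn value(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

fn main() {
    let fetch_time = Arc::new(TimeNanos::default());

    // Same shape as the instrumentation in the hunks below:
    // start a timer, do the work, add the elapsed nanos to the metric.
    let now = Instant::now();
    let _work: u64 = (0..1_000_000u64).sum(); // placeholder for fetching a batch
    fetch_time.add(now.elapsed().as_nanos() as usize);

    println!("fetchTime = {} ns", fetch_time.value());
}
```

Because the counter is shared through an `Arc` and updated atomically, each spawned input-partition task can add into the same metrics without any locking.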
@@ -136,26 +144,46 @@ impl ExecutionPlan for RepartitionExec {
             for i in 0..num_input_partitions {
                 let random_state = random.clone();
                 let input = self.input.clone();
+                let fetch_time = self.fetch_time_nanos.clone();
+                let repart_time = self.repart_time_nanos.clone();
+                let send_time = self.send_time_nanos.clone();
                 let mut txs: HashMap<_, _> = channels
                     .iter()
                     .map(|(partition, (tx, _rx))| (*partition, tx.clone()))
                     .collect();
                 let partitioning = self.partitioning.clone();
                 let _: JoinHandle<Result<()>> = tokio::spawn(async move {
+                    // execute the child operator
+                    let now = Instant::now();
                     let mut stream = input.execute(i).await?;
+                    fetch_time.add(now.elapsed().as_nanos() as usize);
+
                     let mut counter = 0;
                     let hashes_buf = &mut vec![];
 
-                    while let Some(result) = stream.next().await {
+                    loop {
+                        // fetch the next batch
+                        let now = Instant::now();
+                        let result = stream.next().await;
+                        fetch_time.add(now.elapsed().as_nanos() as usize);
+
+                        if result.is_none() {
+                            break;
+                        }
+                        let result = result.unwrap();
+
                         match &partitioning {
                             Partitioning::RoundRobinBatch(_) => {
+                                let now = Instant::now();
                                 let output_partition = counter % num_output_partitions;
                                 let tx = txs.get_mut(&output_partition).unwrap();
                                 tx.send(Some(result)).map_err(|e| {
                                     DataFusionError::Execution(e.to_string())
                                 })?;
+                                send_time.add(now.elapsed().as_nanos() as usize);
                             }
                             Partitioning::Hash(exprs, _) => {
+                                let now = Instant::now();
                                 let input_batch = result?;
                                 let arrays = exprs
                                     .iter()
@@ -176,9 +204,11 @@
                                         [(*hash % num_output_partitions as u64) as usize]
                                         .push(index as u64)
                                 }
+                                repart_time.add(now.elapsed().as_nanos() as usize);
                                 for (num_output_partition, partition_indices) in
                                     indices.into_iter().enumerate()
                                 {
+                                    let now = Instant::now();
                                     let indices = partition_indices.into();
                                     // Produce batches based on indices
                                     let columns = input_batch
@@ -198,10 +228,13 @@
                                         input_batch.schema(),
                                         columns,
                                     );
+                                    repart_time.add(now.elapsed().as_nanos() as usize);
+                                    let now = Instant::now();
                                     let tx = txs.get_mut(&num_output_partition).unwrap();
                                     tx.send(Some(output_batch)).map_err(|e| {
                                         DataFusionError::Execution(e.to_string())
                                     })?;
+                                    send_time.add(now.elapsed().as_nanos() as usize);
                                 }
                             }
                             other => {
@@ -236,6 +269,17 @@
         }))
     }
 
+    fn metrics(&self) -> HashMap<String, SQLMetric> {
+        let mut metrics = HashMap::new();
+        metrics.insert("fetchTime".to_owned(), (*self.fetch_time_nanos).clone());
+        metrics.insert(
+            "repartitionTime".to_owned(),
+            (*self.repart_time_nanos).clone(),
+        );
+        metrics.insert("sendTime".to_owned(), (*self.send_time_nanos).clone());
+        metrics
+    }
+
     fn fmt_as(
         &self,
         t: DisplayFormatType,
@@ -259,6 +303,9 @@ impl RepartitionExec {
             input,
             partitioning,
             channels: Arc::new(Mutex::new(HashMap::new())),
+            fetch_time_nanos: SQLMetric::time_nanos(),
+            repart_time_nanos: SQLMetric::time_nanos(),
+            send_time_nanos: SQLMetric::time_nanos(),
         })
     }
 }
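For reference, a hedged sketch of how a caller might read the new metrics after a plan has run. It relies only on the `ExecutionPlan::metrics()` method this PR implements for `RepartitionExec`; the `Debug` formatting of `SQLMetric` and the helper name `report_repartition_metrics` are assumptions made for illustration:

```rust
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical helper (not part of this PR): print every metric an
/// operator reports through `ExecutionPlan::metrics()`.
fn report_repartition_metrics(plan: &dyn ExecutionPlan) {
    // For RepartitionExec after this change, the map is expected to
    // contain "fetchTime", "repartitionTime", and "sendTime".
    for (name, metric) in plan.metrics() {
        // Debug formatting of SQLMetric is an assumption here.
        println!("{}: {:?}", name, metric);
    }
}
```

Keeping the timings as plain nanosecond counters keeps the hot loop cheap: each instrumented phase pays only an `Instant::now()` call and an atomic add.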