Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: optimize series divide algo #4603

Merged
merged 2 commits into from
Aug 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 93 additions & 43 deletions src/promql/src/extension_plan/series_divide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,12 @@ impl ExecutionPlan for SeriesDivideExec {
.collect();
Ok(Box::pin(SeriesDivideStream {
tag_indices,
buffer: None,
buffer: vec![],
schema,
input,
metric: baseline_metric,
num_series: 0,
inspect_start: 0,
}))
}

Expand All @@ -231,11 +232,13 @@ impl DisplayAs for SeriesDivideExec {
/// Assume the input stream is ordered on the tag columns.
pub struct SeriesDivideStream {
tag_indices: Vec<usize>,
buffer: Option<RecordBatch>,
buffer: Vec<RecordBatch>,
schema: SchemaRef,
input: SendableRecordBatchStream,
metric: BaselineMetrics,
num_series: usize,
/// Index of buffered batches to start inspect next time.
inspect_start: usize,
}

impl RecordBatchStream for SeriesDivideStream {
Expand All @@ -248,30 +251,45 @@ impl Stream for SeriesDivideStream {
type Item = DataFusionResult<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let timer = std::time::Instant::now();
loop {
if let Some(batch) = self.buffer.as_ref() {
let same_length = self.find_first_diff_row(batch) + 1;
if same_length >= batch.num_rows() {
if !self.buffer.is_empty() {
let cut_at = self.find_first_diff_row();
if let Some((batch_index, row_index)) = cut_at {
// slice out the first time series and return it.
let half_batch_of_first_series =
self.buffer[batch_index].slice(0, row_index + 1);
let half_batch_of_second_series = self.buffer[batch_index].slice(
row_index + 1,
self.buffer[batch_index].num_rows() - row_index - 1,
);
let result_batches = self
.buffer
.drain(0..batch_index)
.chain([half_batch_of_first_series])
.collect::<Vec<_>>();
self.buffer[0] = half_batch_of_second_series;
let result_batch = compute::concat_batches(&self.schema, &result_batches)?;

self.inspect_start = 0;
self.num_series += 1;
self.metric.elapsed_compute().add_elapsed(timer);
return Poll::Ready(Some(Ok(result_batch)));
} else {
// continue to fetch next batch as the current buffer only contains one time series.
let next_batch = ready!(self.as_mut().fetch_next_batch(cx)).transpose()?;
// SAFETY: if-let guards the buffer is not None;
// and we cannot change the buffer at this point.
let batch = self.buffer.take().expect("this batch must exist");
if let Some(next_batch) = next_batch {
self.buffer = Some(compute::concat_batches(
&batch.schema(),
&[batch, next_batch],
)?);
self.buffer.push(next_batch);
continue;
} else {
// input stream is ended
let result = compute::concat_batches(&self.schema, &self.buffer)?;
self.buffer.clear();
self.inspect_start = 0;
self.num_series += 1;
return Poll::Ready(Some(Ok(batch)));
self.metric.elapsed_compute().add_elapsed(timer);
return Poll::Ready(Some(Ok(result)));
}
} else {
let result_batch = batch.slice(0, same_length);
let remaining_batch = batch.slice(same_length, batch.num_rows() - same_length);
self.buffer = Some(remaining_batch);
self.num_series += 1;
return Poll::Ready(Some(Ok(result_batch)));
}
} else {
let batch = match ready!(self.as_mut().fetch_next_batch(cx)) {
Expand All @@ -282,7 +300,7 @@ impl Stream for SeriesDivideStream {
}
error => return Poll::Ready(error),
};
self.buffer = Some(batch);
self.buffer.push(batch);
continue;
}
}
Expand All @@ -294,40 +312,72 @@ impl SeriesDivideStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<DataFusionResult<RecordBatch>>> {
let poll = match self.input.poll_next_unpin(cx) {
Poll::Ready(batch) => {
let _timer = self.metric.elapsed_compute().timer();
Poll::Ready(batch)
}
Poll::Pending => Poll::Pending,
};
let poll = self.input.poll_next_unpin(cx);
self.metric.record_poll(poll)
}

fn find_first_diff_row(&self, batch: &RecordBatch) -> usize {
/// Return the position to cut buffer.
/// None implies the current buffer only contains one time series.
fn find_first_diff_row(&mut self) -> Option<(usize, usize)> {
// fast path: no tag columns means all data belongs to the same series.
if self.tag_indices.is_empty() {
return batch.num_rows();
return None;
}

let num_rows = batch.num_rows();
let mut result = num_rows;

for index in &self.tag_indices {
let array = batch.column(*index);
let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
// the first row number that not equal to the next row.
let mut same_until = 0;
while same_until < num_rows - 1 {
if string_array.value(same_until) != string_array.value(same_until + 1) {
break;
let mut resumed_batch_index = self.inspect_start;

for batch in &self.buffer[resumed_batch_index..] {
let num_rows = batch.num_rows();
let mut result_index = num_rows;

// check if the first row is the same with last batch's last row
if resumed_batch_index > self.inspect_start {
let last_batch = &self.buffer[resumed_batch_index - 1];
let last_row = last_batch.num_rows() - 1;
for index in &self.tag_indices {
let current_array = batch.column(*index);
let last_array = last_batch.column(*index);
let current_value = current_array
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(0);
let last_value = last_array
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(last_row);
if current_value != last_value {
return Some((resumed_batch_index, 0));
}
}
same_until += 1;
}
result = result.min(same_until);

// check column by column
for index in &self.tag_indices {
let array = batch.column(*index);
let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
// the first row number that not equal to the next row.
let mut same_until = 0;
while same_until < num_rows - 1 {
if string_array.value(same_until) != string_array.value(same_until + 1) {
break;
}
same_until += 1;
}
result_index = result_index.min(same_until);
}

if result_index + 1 >= num_rows {
// all rows are the same, inspect next batch
resumed_batch_index += 1;
} else {
return Some((resumed_batch_index, result_index));
}
}

result
self.inspect_start = resumed_batch_index;
None
}
}

Expand Down