diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs index 772ee079eb77..87a0ad65b650 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::cmp::min; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -255,29 +256,24 @@ impl Stream for SeriesDivideStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - if let Some(batch) = self.buffer.clone() { - let same_length = self.find_first_diff_row(&batch) + 1; - if same_length >= batch.num_rows() { - let next_batch = match ready!(self.as_mut().fetch_next_batch(cx)) { - Some(Ok(batch)) => batch, - None => { - self.buffer = None; - self.num_series += 1; - return Poll::Ready(Some(Ok(batch))); - } - error => return Poll::Ready(error), - }; - let new_batch = - compute::concat_batches(&batch.schema(), &[batch.clone(), next_batch])?; - self.buffer = Some(new_batch); - continue; - } else { - let result_batch = batch.slice(0, same_length); - let remaining_batch = batch.slice(same_length, batch.num_rows() - same_length); - self.buffer = Some(remaining_batch); + if let Some(batch) = self.buffer.take() { + let next_batch = match ready!(self.as_mut().fetch_next_batch(cx)) { + Some(Ok(batch)) => batch, + None => { + self.num_series += 1; + return Poll::Ready(Some(Ok(batch))); + } + error => return Poll::Ready(error), + }; + + if !self.is_same_series(&batch, &next_batch) { + self.buffer = Some(next_batch); self.num_series += 1; - return Poll::Ready(Some(Ok(result_batch))); + return Poll::Ready(Some(Ok(batch))); } + + let new_batch = compute::concat_batches(&batch.schema(), &[batch, next_batch])?; + self.buffer = Some(new_batch); } else { let batch = match ready!(self.as_mut().fetch_next_batch(cx)) { Some(Ok(batch)) => batch, @@ -288,7 +284,6 @@ impl Stream for SeriesDivideStream { error => return Poll::Ready(error), }; self.buffer = Some(batch); - continue; } } } @@ -309,30 +304,33 @@ impl SeriesDivideStream { self.metric.record_poll(poll) } - fn find_first_diff_row(&self, batch: &RecordBatch) -> usize { - // fast path: no tag columns means all data belongs to the same series. + fn is_same_series(&self, current: &RecordBatch, next: &RecordBatch) -> bool { if self.tag_indices.is_empty() { - return batch.num_rows(); + return true; } - let num_rows = batch.num_rows(); - let mut result = num_rows; + if min(current.num_rows(), next.num_rows()) == 0 { + return true; + } for index in &self.tag_indices { - let array = batch.column(*index); - let string_array = array.as_any().downcast_ref::().unwrap(); - // the first row number that not equal to the next row. - let mut same_until = 0; - while same_until < num_rows - 1 { - if string_array.value(same_until) != string_array.value(same_until + 1) { - break; - } - same_until += 1; + let current_array = current + .column(*index) + .as_any() + .downcast_ref::() + .unwrap(); + let next_array = next + .column(*index) + .as_any() + .downcast_ref::() + .unwrap(); + + if current_array.value(0) != next_array.value(0) { + return false; } - result = result.min(same_until); } - result + true } } @@ -350,31 +348,55 @@ mod test { Field::new("path", DataType::Utf8, true), ])); - let path_column_1 = Arc::new(StringArray::from(vec![ - "foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla", - ])) as _; - let host_column_1 = Arc::new(StringArray::from(vec![ - "000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005", - ])) as _; + let path_column_1 = Arc::new(StringArray::from(vec!["foo", "foo"])) as _; + let host_column_1 = Arc::new(StringArray::from(vec!["000", "000"])) as _; + + let path_column_2 = Arc::new(StringArray::from(vec!["foo"])) as _; + let host_column_2 = Arc::new(StringArray::from(vec!["001"])) as _; + + let path_column_3 = + Arc::new(StringArray::from(vec!["bar", "bar", "bar", "bar", "bar"])) as _; + let host_column_3 = + Arc::new(StringArray::from(vec!["002", "002", "002", "002", "002"])) as _; - let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _; - let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _; + let path_column_4 = Arc::new(StringArray::from(vec!["bar"])) as _; + let host_column_4 = Arc::new(StringArray::from(vec!["003"])) as _; - let path_column_3 = Arc::new(StringArray::from(vec![ - "bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠", + let path_column_5 = Arc::new(StringArray::from(vec![ + "bla", "bla", "bla", "bla", "bla", "bla", "bla", ])) as _; - let host_column_3 = Arc::new(StringArray::from(vec![ - "005", "001", "001", "001", "001", "001", "001", "001", + let host_column_5 = Arc::new(StringArray::from(vec![ + "005", "005", "005", "005", "005", "005", "005", ])) as _; + let path_column_6 = Arc::new(StringArray::from(vec!["🥺", "🥺", "🥺", "🥺", "🥺"])) as _; + let host_column_6 = + Arc::new(StringArray::from(vec!["001", "001", "001", "001", "001"])) as _; + + let path_column_7 = Arc::new(StringArray::from(vec!["🫠", "🫠"])) as _; + let host_column_7 = Arc::new(StringArray::from(vec!["001", "001"])) as _; + let data_1 = RecordBatch::try_new(schema.clone(), vec![path_column_1, host_column_1]).unwrap(); let data_2 = RecordBatch::try_new(schema.clone(), vec![path_column_2, host_column_2]).unwrap(); let data_3 = RecordBatch::try_new(schema.clone(), vec![path_column_3, host_column_3]).unwrap(); - - MemoryExec::try_new(&[vec![data_1, data_2, data_3]], schema, None).unwrap() + let data_4 = + RecordBatch::try_new(schema.clone(), vec![path_column_4, host_column_4]).unwrap(); + let data_5 = + RecordBatch::try_new(schema.clone(), vec![path_column_5, host_column_5]).unwrap(); + let data_6 = + RecordBatch::try_new(schema.clone(), vec![path_column_6, host_column_6]).unwrap(); + let data_7 = + RecordBatch::try_new(schema.clone(), vec![path_column_7, host_column_7]).unwrap(); + + MemoryExec::try_new( + &[vec![data_1, data_2, data_3, data_4, data_5, data_6, data_7]], + schema, + None, + ) + .unwrap() } #[tokio::test]