Skip to content

Commit

Permalink
feat: collect filter metrics for scanners (#4591)
Browse files Browse the repository at this point in the history
* feat: collect filter metrics

* refactor: reuse ReaderFilterMetrics

* feat: record read rows from parquet by type

* feat: unordered scan observe rows

also fix read type

* chore: rename label
  • Loading branch information
evenyag authored Aug 22, 2024
1 parent 0025fa6 commit d628079
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 70 deletions.
5 changes: 4 additions & 1 deletion src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,10 @@ impl<'a> CompactionSstReaderBuilder<'a> {
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}

SeqScan::new(scan_input).build_reader().await
SeqScan::new(scan_input)
.with_compaction()
.build_reader()
.await
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use crate::error::{
use crate::memtable::BoxedBatchIterator;
use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
use crate::read::prune::PruneReader;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};

/// Storage internal representation of a batch of rows for a primary key (time series).
///
Expand Down Expand Up @@ -752,11 +753,13 @@ pub(crate) struct ScannerMetrics {
num_batches: usize,
/// Number of rows returned.
num_rows: usize,
/// Filter related metrics for readers.
filter_metrics: ReaderFilterMetrics,
}

impl ScannerMetrics {
/// Sets and observes metrics on initializing parts.
fn observe_init_part(&mut self, build_parts_cost: Duration) {
fn observe_init_part(&mut self, build_parts_cost: Duration, reader_metrics: &ReaderMetrics) {
self.build_parts_cost = build_parts_cost;

// Observes metrics.
Expand All @@ -766,6 +769,11 @@ impl ScannerMetrics {
READ_STAGE_ELAPSED
.with_label_values(&["build_parts"])
.observe(self.build_parts_cost.as_secs_f64());

// We only call this once so we overwrite it directly.
self.filter_metrics = reader_metrics.filter_metrics;
// Observes filter metrics.
self.filter_metrics.observe();
}

/// Observes metrics on scanner finish.
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/read/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ impl PruneReader {
let num_rows_before_filter = batch.num_rows();
let Some(batch_filtered) = self.context.precise_filter(batch)? else {
// the entire batch is filtered out
self.metrics.num_rows_precise_filtered += num_rows_before_filter;
self.metrics.filter_metrics.num_rows_precise_filtered += num_rows_before_filter;
return Ok(None);
};

// update metric
let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
self.metrics.num_rows_precise_filtered += filtered_rows;
self.metrics.filter_metrics.num_rows_precise_filtered += filtered_rows;

if !batch_filtered.is_empty() {
Ok(Some(batch_filtered))
Expand Down
8 changes: 5 additions & 3 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;

/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
Expand Down Expand Up @@ -606,8 +607,9 @@ impl ScanInput {
pub(crate) async fn prune_file_ranges(
&self,
collector: &mut impl FileRangeCollector,
) -> Result<()> {
) -> Result<ReaderMetrics> {
let mut file_prune_cost = Duration::ZERO;
let mut reader_metrics = ReaderMetrics::default();
for file in &self.files {
let prune_start = Instant::now();
let res = self
Expand All @@ -620,7 +622,7 @@ impl ScanInput {
.inverted_index_applier(self.inverted_index_applier.clone())
.fulltext_index_applier(self.fulltext_index_applier.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.build_reader_input()
.build_reader_input(&mut reader_metrics)
.await;
file_prune_cost += prune_start.elapsed();
let (mut file_range_ctx, row_groups) = match res {
Expand Down Expand Up @@ -665,7 +667,7 @@ impl ScanInput {
file_prune_cost
);

Ok(())
Ok(reader_metrics)
}

/// Scans the input source in another task and sends batches to the sender.
Expand Down
47 changes: 39 additions & 8 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub struct SeqScan {
/// Semaphore to control scan parallelism of files.
/// Streams created by the scanner share the same semaphore.
semaphore: Arc<Semaphore>,
/// The scanner is used for compaction.
compaction: bool,
}

impl SeqScan {
Expand All @@ -75,9 +77,16 @@ impl SeqScan {
properties,
stream_ctx,
semaphore: Arc::new(Semaphore::new(parallelism)),
compaction: false,
}
}

/// Sets the scanner to be used for compaction.
pub(crate) fn with_compaction(mut self) -> Self {
self.compaction = true;
self
}

/// Builds a stream for the query.
///
/// The returned stream is not partitioned and will contains all the data. If want
Expand All @@ -97,9 +106,13 @@ impl SeqScan {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
};
let maybe_reader =
Self::build_all_merge_reader(&self.stream_ctx, self.semaphore.clone(), &mut metrics)
.await?;
let maybe_reader = Self::build_all_merge_reader(
&self.stream_ctx,
self.semaphore.clone(),
&mut metrics,
self.compaction,
)
.await?;
// Safety: `build_merge_reader()` always returns a reader if partition is None.
let reader = maybe_reader.unwrap();
Ok(Box::new(reader))
Expand All @@ -110,13 +123,19 @@ impl SeqScan {
part: &ScanPart,
sources: &mut Vec<Source>,
row_selector: Option<TimeSeriesRowSelector>,
compaction: bool,
) -> Result<()> {
sources.reserve(part.memtable_ranges.len() + part.file_ranges.len());
// Read memtables.
for mem in &part.memtable_ranges {
let iter = mem.build_iter()?;
sources.push(Source::Iter(iter));
}
let read_type = if compaction {
"compaction"
} else {
"seq_scan_files"
};
// Read files.
for file in &part.file_ranges {
if file.is_empty() {
Expand Down Expand Up @@ -148,6 +167,8 @@ impl SeqScan {
"Seq scan region {}, file {}, {} ranges finished, metrics: {:?}",
region_id, file_id, range_num, reader_metrics
);
// Reports metrics.
reader_metrics.observe_rows(read_type);
};
let stream = Box::pin(stream);
sources.push(Source::Stream(stream));
Expand All @@ -161,6 +182,7 @@ impl SeqScan {
stream_ctx: &StreamContext,
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
compaction: bool,
) -> Result<Option<BoxedBatchReader>> {
// initialize parts list
let mut parts = stream_ctx.parts.lock().await;
Expand All @@ -173,7 +195,7 @@ impl SeqScan {
return Ok(None);
};

Self::build_part_sources(part, &mut sources, None)?;
Self::build_part_sources(part, &mut sources, None, compaction)?;
}

Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
Expand All @@ -187,6 +209,7 @@ impl SeqScan {
range_id: usize,
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
compaction: bool,
) -> Result<Option<BoxedBatchReader>> {
let mut sources = Vec::new();
let build_start = {
Expand All @@ -198,7 +221,12 @@ impl SeqScan {
};

let build_start = Instant::now();
Self::build_part_sources(part, &mut sources, stream_ctx.input.series_row_selector)?;
Self::build_part_sources(
part,
&mut sources,
stream_ctx.input.series_row_selector,
compaction,
)?;

build_start
};
Expand Down Expand Up @@ -281,12 +309,13 @@ impl SeqScan {
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();

for partition_range in partition_ranges {
let maybe_reader =
Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics)
Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Expand Down Expand Up @@ -359,6 +388,7 @@ impl SeqScan {
};
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let compaction = self.compaction;

// build stream
let stream = try_stream! {
Expand All @@ -379,6 +409,7 @@ impl SeqScan {
id,
semaphore.clone(),
&mut metrics,
compaction,
)
.await
.map_err(BoxedError::new)
Expand Down Expand Up @@ -439,7 +470,7 @@ impl SeqScan {
if part_list.0.is_none() {
let now = Instant::now();
let mut distributor = SeqDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
distributor.append_mem_ranges(
&input.memtables,
Some(input.mapper.column_ids()),
Expand All @@ -451,7 +482,7 @@ impl SeqScan {
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;

metrics.observe_init_part(build_part_cost);
metrics.observe_init_part(build_part_cost, &reader_metrics);
} else {
// Updates the cost of building parts.
metrics.build_parts_cost = part_list.1;
Expand Down
5 changes: 3 additions & 2 deletions src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ impl RegionScanner for UnorderedScan {
}
}

reader_metrics.observe_rows("unordered_scan_files");
metrics.total_cost = query_start.elapsed();
metrics.observe_metrics_on_finish();
debug!(
Expand Down Expand Up @@ -263,7 +264,7 @@ async fn maybe_init_parts(
if part_list.0.is_none() {
let now = Instant::now();
let mut distributor = UnorderedDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
distributor.append_mem_ranges(
&input.memtables,
Some(input.mapper.column_ids()),
Expand All @@ -275,7 +276,7 @@ async fn maybe_init_parts(
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;

metrics.observe_init_part(build_part_cost);
metrics.observe_init_part(build_part_cost, &reader_metrics);
} else {
// Updates the cost of building parts.
metrics.build_parts_cost = part_list.1;
Expand Down
Loading

0 comments on commit d628079

Please sign in to comment.