Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: collect filters metrics for scanners #4591

Merged
merged 5 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,10 @@ impl<'a> CompactionSstReaderBuilder<'a> {
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}

SeqScan::new(scan_input).build_reader().await
SeqScan::new(scan_input)
.with_compaction()
.build_reader()
.await
}
}

Expand Down
10 changes: 9 additions & 1 deletion src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use crate::error::{
use crate::memtable::BoxedBatchIterator;
use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
use crate::read::prune::PruneReader;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};

/// Storage internal representation of a batch of rows for a primary key (time series).
///
Expand Down Expand Up @@ -752,11 +753,13 @@ pub(crate) struct ScannerMetrics {
num_batches: usize,
/// Number of rows returned.
num_rows: usize,
/// Filter related metrics for readers.
filter_metrics: ReaderFilterMetrics,
}

impl ScannerMetrics {
/// Sets and observes metrics on initializing parts.
fn observe_init_part(&mut self, build_parts_cost: Duration) {
fn observe_init_part(&mut self, build_parts_cost: Duration, reader_metrics: &ReaderMetrics) {
self.build_parts_cost = build_parts_cost;

// Observes metrics.
Expand All @@ -766,6 +769,11 @@ impl ScannerMetrics {
READ_STAGE_ELAPSED
.with_label_values(&["build_parts"])
.observe(self.build_parts_cost.as_secs_f64());

// We only call this once so we overwrite it directly.
self.filter_metrics = reader_metrics.filter_metrics;
// Observes filter metrics.
self.filter_metrics.observe();
}

/// Observes metrics on scanner finish.
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/read/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ impl PruneReader {
let num_rows_before_filter = batch.num_rows();
let Some(batch_filtered) = self.context.precise_filter(batch)? else {
// the entire batch is filtered out
self.metrics.num_rows_precise_filtered += num_rows_before_filter;
self.metrics.filter_metrics.num_rows_precise_filtered += num_rows_before_filter;
return Ok(None);
};

// update metric
let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
self.metrics.num_rows_precise_filtered += filtered_rows;
self.metrics.filter_metrics.num_rows_precise_filtered += filtered_rows;

if !batch_filtered.is_empty() {
Ok(Some(batch_filtered))
Expand Down
8 changes: 5 additions & 3 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;

/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
Expand Down Expand Up @@ -606,8 +607,9 @@ impl ScanInput {
pub(crate) async fn prune_file_ranges(
&self,
collector: &mut impl FileRangeCollector,
) -> Result<()> {
) -> Result<ReaderMetrics> {
let mut file_prune_cost = Duration::ZERO;
let mut reader_metrics = ReaderMetrics::default();
for file in &self.files {
let prune_start = Instant::now();
let res = self
Expand All @@ -620,7 +622,7 @@ impl ScanInput {
.inverted_index_applier(self.inverted_index_applier.clone())
.fulltext_index_applier(self.fulltext_index_applier.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.build_reader_input()
.build_reader_input(&mut reader_metrics)
.await;
file_prune_cost += prune_start.elapsed();
let (mut file_range_ctx, row_groups) = match res {
Expand Down Expand Up @@ -665,7 +667,7 @@ impl ScanInput {
file_prune_cost
);

Ok(())
Ok(reader_metrics)
}

/// Scans the input source in another task and sends batches to the sender.
Expand Down
47 changes: 39 additions & 8 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub struct SeqScan {
/// Semaphore to control scan parallelism of files.
/// Streams created by the scanner share the same semaphore.
semaphore: Arc<Semaphore>,
/// The scanner is used for compaction.
compaction: bool,
}

impl SeqScan {
Expand All @@ -75,9 +77,16 @@ impl SeqScan {
properties,
stream_ctx,
semaphore: Arc::new(Semaphore::new(parallelism)),
compaction: false,
}
}

/// Sets the scanner to be used for compaction.
pub(crate) fn with_compaction(mut self) -> Self {
self.compaction = true;
self
}

/// Builds a stream for the query.
///
/// The returned stream is not partitioned and will contains all the data. If want
Expand All @@ -97,9 +106,13 @@ impl SeqScan {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
};
let maybe_reader =
Self::build_all_merge_reader(&self.stream_ctx, self.semaphore.clone(), &mut metrics)
.await?;
let maybe_reader = Self::build_all_merge_reader(
&self.stream_ctx,
self.semaphore.clone(),
&mut metrics,
self.compaction,
)
.await?;
// Safety: `build_merge_reader()` always returns a reader if partition is None.
let reader = maybe_reader.unwrap();
Ok(Box::new(reader))
Expand All @@ -110,13 +123,19 @@ impl SeqScan {
part: &ScanPart,
sources: &mut Vec<Source>,
row_selector: Option<TimeSeriesRowSelector>,
compaction: bool,
) -> Result<()> {
sources.reserve(part.memtable_ranges.len() + part.file_ranges.len());
// Read memtables.
for mem in &part.memtable_ranges {
let iter = mem.build_iter()?;
sources.push(Source::Iter(iter));
}
let read_type = if compaction {
"compaction"
} else {
"seq_scan_files"
};
// Read files.
for file in &part.file_ranges {
if file.is_empty() {
Expand Down Expand Up @@ -148,6 +167,8 @@ impl SeqScan {
"Seq scan region {}, file {}, {} ranges finished, metrics: {:?}",
region_id, file_id, range_num, reader_metrics
);
// Reports metrics.
reader_metrics.observe_rows(read_type);
};
let stream = Box::pin(stream);
sources.push(Source::Stream(stream));
Expand All @@ -161,6 +182,7 @@ impl SeqScan {
stream_ctx: &StreamContext,
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
compaction: bool,
) -> Result<Option<BoxedBatchReader>> {
// initialize parts list
let mut parts = stream_ctx.parts.lock().await;
Expand All @@ -173,7 +195,7 @@ impl SeqScan {
return Ok(None);
};

Self::build_part_sources(part, &mut sources, None)?;
Self::build_part_sources(part, &mut sources, None, compaction)?;
}

Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
Expand All @@ -187,6 +209,7 @@ impl SeqScan {
range_id: usize,
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
compaction: bool,
) -> Result<Option<BoxedBatchReader>> {
let mut sources = Vec::new();
let build_start = {
Expand All @@ -198,7 +221,12 @@ impl SeqScan {
};

let build_start = Instant::now();
Self::build_part_sources(part, &mut sources, stream_ctx.input.series_row_selector)?;
Self::build_part_sources(
part,
&mut sources,
stream_ctx.input.series_row_selector,
compaction,
)?;

build_start
};
Expand Down Expand Up @@ -281,12 +309,13 @@ impl SeqScan {
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();

for partition_range in partition_ranges {
let maybe_reader =
Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics)
Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Expand Down Expand Up @@ -359,6 +388,7 @@ impl SeqScan {
};
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let compaction = self.compaction;

// build stream
let stream = try_stream! {
Expand All @@ -379,6 +409,7 @@ impl SeqScan {
id,
semaphore.clone(),
&mut metrics,
compaction,
)
.await
.map_err(BoxedError::new)
Expand Down Expand Up @@ -439,7 +470,7 @@ impl SeqScan {
if part_list.0.is_none() {
let now = Instant::now();
let mut distributor = SeqDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
distributor.append_mem_ranges(
&input.memtables,
Some(input.mapper.column_ids()),
Expand All @@ -451,7 +482,7 @@ impl SeqScan {
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;

metrics.observe_init_part(build_part_cost);
metrics.observe_init_part(build_part_cost, &reader_metrics);
} else {
// Updates the cost of building parts.
metrics.build_parts_cost = part_list.1;
Expand Down
5 changes: 3 additions & 2 deletions src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ impl RegionScanner for UnorderedScan {
}
}

reader_metrics.observe_rows("unordered_scan_files");
metrics.total_cost = query_start.elapsed();
metrics.observe_metrics_on_finish();
debug!(
Expand Down Expand Up @@ -263,7 +264,7 @@ async fn maybe_init_parts(
if part_list.0.is_none() {
let now = Instant::now();
let mut distributor = UnorderedDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
distributor.append_mem_ranges(
&input.memtables,
Some(input.mapper.column_ids()),
Expand All @@ -275,7 +276,7 @@ async fn maybe_init_parts(
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;

metrics.observe_init_part(build_part_cost);
metrics.observe_init_part(build_part_cost, &reader_metrics);
} else {
// Updates the cost of building parts.
metrics.build_parts_cost = part_list.1;
Expand Down
Loading
Loading