diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index f1baffcb7d31..5cb1b4ca6d8c 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -531,7 +531,10 @@ impl<'a> CompactionSstReaderBuilder<'a> {
             scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
         }
 
-        SeqScan::new(scan_input).build_reader().await
+        SeqScan::new(scan_input)
+            .with_compaction()
+            .build_reader()
+            .await
     }
 }
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index 04a34fc9ac56..edde28dab80c 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -56,6 +56,7 @@ use crate::error::{
 use crate::memtable::BoxedBatchIterator;
 use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
 use crate::read::prune::PruneReader;
+use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
 
 /// Storage internal representation of a batch of rows for a primary key (time series).
 ///
@@ -752,11 +753,13 @@ pub(crate) struct ScannerMetrics {
     num_batches: usize,
     /// Number of rows returned.
     num_rows: usize,
+    /// Filter related metrics for readers.
+    filter_metrics: ReaderFilterMetrics,
 }
 
 impl ScannerMetrics {
     /// Sets and observes metrics on initializing parts.
-    fn observe_init_part(&mut self, build_parts_cost: Duration) {
+    fn observe_init_part(&mut self, build_parts_cost: Duration, reader_metrics: &ReaderMetrics) {
         self.build_parts_cost = build_parts_cost;
 
         // Observes metrics.
@@ -766,6 +769,11 @@ impl ScannerMetrics {
         READ_STAGE_ELAPSED
             .with_label_values(&["build_parts"])
             .observe(self.build_parts_cost.as_secs_f64());
+
+        // We only call this once so we overwrite it directly.
+        self.filter_metrics = reader_metrics.filter_metrics;
+        // Observes filter metrics.
+        self.filter_metrics.observe();
     }
 
     /// Observes metrics on scanner finish.
diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs
index 58c81a1815a8..0ab2f9e2b5d5 100644
--- a/src/mito2/src/read/prune.rs
+++ b/src/mito2/src/read/prune.rs
@@ -97,13 +97,13 @@ impl PruneReader {
         let num_rows_before_filter = batch.num_rows();
         let Some(batch_filtered) = self.context.precise_filter(batch)? else {
             // the entire batch is filtered out
-            self.metrics.num_rows_precise_filtered += num_rows_before_filter;
+            self.metrics.filter_metrics.num_rows_precise_filtered += num_rows_before_filter;
             return Ok(None);
         };
 
         // update metric
         let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
-        self.metrics.num_rows_precise_filtered += filtered_rows;
+        self.metrics.filter_metrics.num_rows_precise_filtered += filtered_rows;
 
         if !batch_filtered.is_empty() {
             Ok(Some(batch_filtered))
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 5e27ebe6df77..d0bef554727f 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -50,6 +50,7 @@ use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
 use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
 use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
 use crate::sst::parquet::file_range::FileRange;
+use crate::sst::parquet::reader::ReaderMetrics;
 
 /// A scanner scans a region and returns a [SendableRecordBatchStream].
 pub(crate) enum Scanner {
@@ -606,8 +607,9 @@ impl ScanInput {
     pub(crate) async fn prune_file_ranges(
         &self,
         collector: &mut impl FileRangeCollector,
-    ) -> Result<()> {
+    ) -> Result<ReaderMetrics> {
         let mut file_prune_cost = Duration::ZERO;
+        let mut reader_metrics = ReaderMetrics::default();
         for file in &self.files {
             let prune_start = Instant::now();
             let res = self
@@ -620,7 +622,7 @@ impl ScanInput {
                 .inverted_index_applier(self.inverted_index_applier.clone())
                 .fulltext_index_applier(self.fulltext_index_applier.clone())
                 .expected_metadata(Some(self.mapper.metadata().clone()))
-                .build_reader_input()
+                .build_reader_input(&mut reader_metrics)
                 .await;
             file_prune_cost += prune_start.elapsed();
             let (mut file_range_ctx, row_groups) = match res {
@@ -665,7 +667,7 @@ impl ScanInput {
             file_prune_cost
         );
 
-        Ok(())
+        Ok(reader_metrics)
     }
 
     /// Scans the input source in another task and sends batches to the sender.
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 551d304f2f16..954580986ec2 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -59,6 +59,8 @@ pub struct SeqScan {
     /// Semaphore to control scan parallelism of files.
     /// Streams created by the scanner share the same semaphore.
     semaphore: Arc<Semaphore>,
+    /// The scanner is used for compaction.
+    compaction: bool,
 }
 
 impl SeqScan {
@@ -75,9 +77,16 @@ impl SeqScan {
             properties,
             stream_ctx,
             semaphore: Arc::new(Semaphore::new(parallelism)),
+            compaction: false,
         }
     }
 
+    /// Sets the scanner to be used for compaction.
+    pub(crate) fn with_compaction(mut self) -> Self {
+        self.compaction = true;
+        self
+    }
+
     /// Builds a stream for the query.
     ///
     /// The returned stream is not partitioned and will contains all the data. If want
@@ -97,9 +106,13 @@ impl SeqScan {
             prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
             ..Default::default()
         };
-        let maybe_reader =
-            Self::build_all_merge_reader(&self.stream_ctx, self.semaphore.clone(), &mut metrics)
-                .await?;
+        let maybe_reader = Self::build_all_merge_reader(
+            &self.stream_ctx,
+            self.semaphore.clone(),
+            &mut metrics,
+            self.compaction,
+        )
+        .await?;
         // Safety: `build_merge_reader()` always returns a reader if partition is None.
         let reader = maybe_reader.unwrap();
         Ok(Box::new(reader))
@@ -110,6 +123,7 @@
         part: &ScanPart,
         sources: &mut Vec<Source>,
         row_selector: Option<TimeSeriesRowSelector>,
+        compaction: bool,
     ) -> Result<()> {
         sources.reserve(part.memtable_ranges.len() + part.file_ranges.len());
         // Read memtables.
@@ -117,6 +131,11 @@
             let iter = mem.build_iter()?;
             sources.push(Source::Iter(iter));
         }
+        let read_type = if compaction {
+            "compaction"
+        } else {
+            "seq_scan_files"
+        };
         // Read files.
         for file in &part.file_ranges {
             if file.is_empty() {
@@ -148,6 +167,8 @@
                     "Seq scan region {}, file {}, {} ranges finished, metrics: {:?}",
                     region_id, file_id, range_num, reader_metrics
                 );
+                // Reports metrics.
+                reader_metrics.observe_rows(read_type);
             };
             let stream = Box::pin(stream);
             sources.push(Source::Stream(stream));
@@ -161,6 +182,7 @@
         stream_ctx: &StreamContext,
         semaphore: Arc<Semaphore>,
         metrics: &mut ScannerMetrics,
+        compaction: bool,
     ) -> Result<Option<BoxedBatchReader>> {
         // initialize parts list
         let mut parts = stream_ctx.parts.lock().await;
@@ -173,7 +195,7 @@
                 return Ok(None);
             };
 
-            Self::build_part_sources(part, &mut sources, None)?;
+            Self::build_part_sources(part, &mut sources, None, compaction)?;
         }
 
         Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
@@ -187,6 +209,7 @@
         range_id: usize,
         semaphore: Arc<Semaphore>,
         metrics: &mut ScannerMetrics,
+        compaction: bool,
     ) -> Result<Option<BoxedBatchReader>> {
         let mut sources = Vec::new();
         let build_start = {
@@ -198,7 +221,12 @@
             };
 
             let build_start = Instant::now();
-            Self::build_part_sources(part, &mut sources, stream_ctx.input.series_row_selector)?;
+            Self::build_part_sources(
+                part,
+                &mut sources,
+                stream_ctx.input.series_row_selector,
+                compaction,
+            )?;
 
             build_start
         };
@@ -281,12 +309,13 @@
         let stream_ctx = self.stream_ctx.clone();
         let semaphore = self.semaphore.clone();
         let partition_ranges = self.properties.partitions[partition].clone();
+        let compaction = self.compaction;
 
         let stream = try_stream! {
            let first_poll = stream_ctx.query_start.elapsed();
 
            for partition_range in partition_ranges {
                let maybe_reader =
-                    Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics)
+                    Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction)
                        .await
                        .map_err(BoxedError::new)
                        .context(ExternalSnafu)?;
@@ -359,6 +388,7 @@
         };
         let stream_ctx = self.stream_ctx.clone();
         let semaphore = self.semaphore.clone();
+        let compaction = self.compaction;
 
         // build stream
         let stream = try_stream! {
@@ -379,6 +409,7 @@
                         id,
                         semaphore.clone(),
                         &mut metrics,
+                        compaction,
                     )
                     .await
                     .map_err(BoxedError::new)
@@ -439,7 +470,7 @@
         if part_list.0.is_none() {
             let now = Instant::now();
             let mut distributor = SeqDistributor::default();
-            input.prune_file_ranges(&mut distributor).await?;
+            let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
             distributor.append_mem_ranges(
                 &input.memtables,
                 Some(input.mapper.column_ids()),
@@ -451,7 +482,7 @@
             let build_part_cost = now.elapsed();
             part_list.1 = build_part_cost;
 
-            metrics.observe_init_part(build_part_cost);
+            metrics.observe_init_part(build_part_cost, &reader_metrics);
         } else {
             // Updates the cost of building parts.
             metrics.build_parts_cost = part_list.1;
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index 1d8a0cd8ebd3..3bca8e0afea8 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -211,6 +211,7 @@ impl RegionScanner for UnorderedScan {
                 }
             }
 
+            reader_metrics.observe_rows("unordered_scan_files");
             metrics.total_cost = query_start.elapsed();
             metrics.observe_metrics_on_finish();
             debug!(
@@ -263,7 +264,7 @@ async fn maybe_init_parts(
     if part_list.0.is_none() {
         let now = Instant::now();
         let mut distributor = UnorderedDistributor::default();
-        input.prune_file_ranges(&mut distributor).await?;
+        let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
         distributor.append_mem_ranges(
             &input.memtables,
             Some(input.mapper.column_ids()),
@@ -275,7 +276,7 @@ async fn maybe_init_parts(
         let build_part_cost = now.elapsed();
         part_list.1 = build_part_cost;
 
-        metrics.observe_init_part(build_part_cost);
+        metrics.observe_init_part(build_part_cost, &reader_metrics);
     } else {
         // Updates the cost of building parts.
         metrics.build_parts_cost = part_list.1;
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 4cca1c1c29e2..766dab04290e 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -174,14 +174,19 @@ impl ParquetReaderBuilder {
     ///
     /// This needs to perform IO operation.
     pub async fn build(&self) -> Result<ParquetReader> {
-        let (context, row_groups) = self.build_reader_input().await?;
+        let mut metrics = ReaderMetrics::default();
+
+        let (context, row_groups) = self.build_reader_input(&mut metrics).await?;
         ParquetReader::new(Arc::new(context), row_groups).await
     }
 
     /// Builds a [FileRangeContext] and collects row groups to read.
     ///
     /// This needs to perform IO operation.
-    pub(crate) async fn build_reader_input(&self) -> Result<(FileRangeContext, RowGroupMap)> {
+    pub(crate) async fn build_reader_input(
+        &self,
+        metrics: &mut ReaderMetrics,
+    ) -> Result<(FileRangeContext, RowGroupMap)> {
         let start = Instant::now();
 
         let file_path = self.file_handle.file_path(&self.file_dir);
@@ -219,10 +224,8 @@
         parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
             .context(ReadParquetSnafu { path: &file_path })?;
 
-        let mut metrics = ReaderMetrics::default();
-
         let row_groups = self
-            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics)
+            .row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
             .await;
 
         let reader_builder = RowGroupReaderBuilder {
@@ -336,7 +339,7 @@
         &self,
         read_format: &ReadFormat,
         parquet_meta: &ParquetMetaData,
-        metrics: &mut ReaderMetrics,
+        metrics: &mut ReaderFilterMetrics,
     ) -> BTreeMap<usize, Option<RowSelection>> {
         let num_row_groups = parquet_meta.num_row_groups();
         let num_rows = parquet_meta.file_metadata().num_rows();
@@ -382,7 +385,7 @@
         row_group_size: usize,
         parquet_meta: &ParquetMetaData,
         output: &mut BTreeMap<usize, Option<RowSelection>>,
-        metrics: &mut ReaderMetrics,
+        metrics: &mut ReaderFilterMetrics,
     ) -> bool {
         let Some(index_applier) = &self.fulltext_index_applier else {
             return false;
         };
@@ -462,7 +465,7 @@
         row_group_size: usize,
         parquet_meta: &ParquetMetaData,
         output: &mut BTreeMap<usize, Option<RowSelection>>,
-        metrics: &mut ReaderMetrics,
+        metrics: &mut ReaderFilterMetrics,
     ) -> bool {
         let Some(index_applier) = &self.inverted_index_applier else {
             return false;
         };
@@ -529,7 +532,7 @@
         read_format: &ReadFormat,
         parquet_meta: &ParquetMetaData,
         output: &mut BTreeMap<usize, Option<RowSelection>>,
-        metrics: &mut ReaderMetrics,
+        metrics: &mut ReaderFilterMetrics,
     ) -> bool {
         let Some(predicate) = &self.predicate else {
             return false;
         };
@@ -724,9 +727,9 @@ fn time_range_to_predicate(
     Ok(predicates)
 }
 
-/// Parquet reader metrics.
-#[derive(Debug, Default, Clone)]
-pub(crate) struct ReaderMetrics {
+/// Metrics of filtering row groups and rows.
+#[derive(Debug, Default, Clone, Copy)]
+pub(crate) struct ReaderFilterMetrics {
     /// Number of row groups before filtering.
     pub(crate) num_row_groups_before_filtering: usize,
     /// Number of row groups filtered by fulltext index.
@@ -743,6 +746,57 @@ pub(crate) struct ReaderMetrics {
     pub(crate) num_rows_in_row_group_fulltext_index_filtered: usize,
     /// Number of rows in row group filtered by inverted index.
     pub(crate) num_rows_in_row_group_inverted_index_filtered: usize,
+}
+
+impl ReaderFilterMetrics {
+    /// Adds `other` metrics to this metrics.
+    pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
+        self.num_row_groups_before_filtering += other.num_row_groups_before_filtering;
+        self.num_row_groups_fulltext_index_filtered += other.num_row_groups_fulltext_index_filtered;
+        self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered;
+        self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered;
+        self.num_rows_precise_filtered += other.num_rows_precise_filtered;
+        self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering;
+        self.num_rows_in_row_group_fulltext_index_filtered +=
+            other.num_rows_in_row_group_fulltext_index_filtered;
+        self.num_rows_in_row_group_inverted_index_filtered +=
+            other.num_rows_in_row_group_inverted_index_filtered;
+    }
+
+    /// Reports metrics.
+    pub(crate) fn observe(&self) {
+        READ_ROW_GROUPS_TOTAL
+            .with_label_values(&["before_filtering"])
+            .inc_by(self.num_row_groups_before_filtering as u64);
+        READ_ROW_GROUPS_TOTAL
+            .with_label_values(&["fulltext_index_filtered"])
+            .inc_by(self.num_row_groups_fulltext_index_filtered as u64);
+        READ_ROW_GROUPS_TOTAL
+            .with_label_values(&["inverted_index_filtered"])
+            .inc_by(self.num_row_groups_inverted_index_filtered as u64);
+        READ_ROW_GROUPS_TOTAL
+            .with_label_values(&["minmax_index_filtered"])
+            .inc_by(self.num_row_groups_min_max_filtered as u64);
+        PRECISE_FILTER_ROWS_TOTAL
+            .with_label_values(&["parquet"])
+            .inc_by(self.num_rows_precise_filtered as u64);
+        READ_ROWS_IN_ROW_GROUP_TOTAL
+            .with_label_values(&["before_filtering"])
+            .inc_by(self.num_rows_in_row_group_before_filtering as u64);
+        READ_ROWS_IN_ROW_GROUP_TOTAL
+            .with_label_values(&["fulltext_index_filtered"])
+            .inc_by(self.num_rows_in_row_group_fulltext_index_filtered as u64);
+        READ_ROWS_IN_ROW_GROUP_TOTAL
+            .with_label_values(&["inverted_index_filtered"])
+            .inc_by(self.num_rows_in_row_group_inverted_index_filtered as u64);
+    }
+}
+
+/// Parquet reader metrics.
+#[derive(Debug, Default, Clone)]
+pub(crate) struct ReaderMetrics {
+    /// Filtered row groups and rows metrics.
+    pub(crate) filter_metrics: ReaderFilterMetrics,
     /// Duration to build the parquet reader.
     pub(crate) build_cost: Duration,
     /// Duration to scan the reader.
     pub(crate) scan_cost: Duration,
@@ -758,22 +812,20 @@ impl ReaderMetrics {
     /// Adds `other` metrics to this metrics.
     pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
-        self.num_row_groups_before_filtering += other.num_row_groups_before_filtering;
-        self.num_row_groups_fulltext_index_filtered += other.num_row_groups_fulltext_index_filtered;
-        self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered;
-        self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered;
-        self.num_rows_precise_filtered += other.num_rows_precise_filtered;
-        self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering;
-        self.num_rows_in_row_group_fulltext_index_filtered +=
-            other.num_rows_in_row_group_fulltext_index_filtered;
-        self.num_rows_in_row_group_inverted_index_filtered +=
-            other.num_rows_in_row_group_inverted_index_filtered;
+        self.filter_metrics.merge_from(&other.filter_metrics);
         self.build_cost += other.build_cost;
         self.scan_cost += other.scan_cost;
         self.num_record_batches += other.num_record_batches;
         self.num_batches += other.num_batches;
         self.num_rows += other.num_rows;
     }
+
+    /// Reports total rows.
+    pub(crate) fn observe_rows(&self, read_type: &str) {
+        READ_ROWS_TOTAL
+            .with_label_values(&[read_type])
+            .inc_by(self.num_rows as u64);
+    }
 }
 
 /// Builder to build a [ParquetRecordBatchReader] for a row group.
@@ -1006,10 +1058,12 @@ impl Drop for ParquetReader {
             self.context.reader_builder().file_handle.region_id(),
             self.context.reader_builder().file_handle.file_id(),
             self.context.reader_builder().file_handle.time_range(),
-            metrics.num_row_groups_before_filtering
-                - metrics.num_row_groups_inverted_index_filtered
-                - metrics.num_row_groups_min_max_filtered,
-            metrics.num_row_groups_before_filtering,
+            metrics.filter_metrics.num_row_groups_before_filtering
+                - metrics
+                    .filter_metrics
+                    .num_row_groups_inverted_index_filtered
+                - metrics.filter_metrics.num_row_groups_min_max_filtered,
+            metrics.filter_metrics.num_row_groups_before_filtering,
             metrics
         );
 
@@ -1020,33 +1074,8 @@ impl Drop for ParquetReader {
         READ_STAGE_ELAPSED
             .with_label_values(&["scan_row_groups"])
             .observe(metrics.scan_cost.as_secs_f64());
-        READ_ROWS_TOTAL
-            .with_label_values(&["parquet"])
-            .inc_by(metrics.num_rows as u64);
-        READ_ROW_GROUPS_TOTAL
-            .with_label_values(&["before_filtering"])
-            .inc_by(metrics.num_row_groups_before_filtering as u64);
-        READ_ROW_GROUPS_TOTAL
-            .with_label_values(&["fulltext_index_filtered"])
-            .inc_by(metrics.num_row_groups_fulltext_index_filtered as u64);
-        READ_ROW_GROUPS_TOTAL
-            .with_label_values(&["inverted_index_filtered"])
-            .inc_by(metrics.num_row_groups_inverted_index_filtered as u64);
-        READ_ROW_GROUPS_TOTAL
-            .with_label_values(&["minmax_index_filtered"])
-            .inc_by(metrics.num_row_groups_min_max_filtered as u64);
-        PRECISE_FILTER_ROWS_TOTAL
-            .with_label_values(&["parquet"])
-            .inc_by(metrics.num_rows_precise_filtered as u64);
-        READ_ROWS_IN_ROW_GROUP_TOTAL
-            .with_label_values(&["before_filtering"])
-            .inc_by(metrics.num_rows_in_row_group_before_filtering as u64);
-        READ_ROWS_IN_ROW_GROUP_TOTAL
-            .with_label_values(&["fulltext_index_filtered"])
-            .inc_by(metrics.num_rows_in_row_group_fulltext_index_filtered as u64);
-        READ_ROWS_IN_ROW_GROUP_TOTAL
-            .with_label_values(&["inverted_index_filtered"])
-            .inc_by(metrics.num_rows_in_row_group_inverted_index_filtered as u64);
+        metrics.observe_rows("parquet_reader");
+        metrics.filter_metrics.observe();
     }
 }
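
Note: the refactor above follows a small, reusable pattern: a `Copy` inner struct that holds only the filter counters, embedded in the outer `ReaderMetrics`, so each level can be merged and reported independently (`observe_rows` takes a read-type label such as `"compaction"`, `"seq_scan_files"`, `"unordered_scan_files"`, or `"parquet_reader"`). The sketch below is a minimal, self-contained illustration of that shape only; it is not the crate's code, it keeps just a few representative counters, and the Prometheus counter vecs are stood in for by `println!`.

```rust
/// Inner counters for row groups and rows dropped by each filter stage.
/// `Copy`, so a scanner can overwrite its copy by plain assignment, as
/// `observe_init_part` does in the diff above.
#[derive(Debug, Default, Clone, Copy)]
struct ReaderFilterMetrics {
    num_row_groups_before_filtering: usize,
    num_row_groups_min_max_filtered: usize,
    num_rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
    /// Adds `other` to `self` (mirrors `merge_from` in the diff).
    fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.num_row_groups_before_filtering += other.num_row_groups_before_filtering;
        self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered;
        self.num_rows_precise_filtered += other.num_rows_precise_filtered;
    }

    /// Reports the filter counters once (stand-in for the counter vecs).
    fn observe(&self) {
        println!(
            "row_groups: before_filtering={} minmax_index_filtered={} rows_precise_filtered={}",
            self.num_row_groups_before_filtering,
            self.num_row_groups_min_max_filtered,
            self.num_rows_precise_filtered,
        );
    }
}

/// Outer metrics: the filter counters plus per-read bookkeeping.
#[derive(Debug, Default, Clone)]
struct ReaderMetrics {
    filter_metrics: ReaderFilterMetrics,
    num_rows: usize,
}

impl ReaderMetrics {
    fn merge_from(&mut self, other: &ReaderMetrics) {
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.num_rows += other.num_rows;
    }

    /// Reports total rows under a read-type label.
    fn observe_rows(&self, read_type: &str) {
        println!("rows read ({read_type}): {}", self.num_rows);
    }
}

fn main() {
    // One `ReaderMetrics` per pruned file, merged into a scan-level total,
    // roughly what `prune_file_ranges` now returns to the scanners.
    let per_file = [
        ReaderMetrics {
            filter_metrics: ReaderFilterMetrics {
                num_row_groups_before_filtering: 4,
                num_row_groups_min_max_filtered: 1,
                num_rows_precise_filtered: 100,
            },
            num_rows: 8_000,
        },
        ReaderMetrics {
            filter_metrics: ReaderFilterMetrics {
                num_row_groups_before_filtering: 2,
                num_row_groups_min_max_filtered: 0,
                num_rows_precise_filtered: 0,
            },
            num_rows: 5_000,
        },
    ];

    let mut total = ReaderMetrics::default();
    for m in &per_file {
        total.merge_from(m);
    }

    // Each level is reported independently, as the scanners now do.
    total.observe_rows("seq_scan_files");
    total.filter_metrics.observe();
}
```

Keeping the inner struct `Copy` is what allows the plain assignment in `observe_init_part`; if a non-`Copy` field were ever added to the filter counters, that call site would need a `clone()` instead.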