diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index e79a3290bee4..668eb51023f0 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -14,6 +14,7 @@
 
 //! Memtables are write buffers for regions.
 
+use std::collections::BTreeMap;
 use std::fmt;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
@@ -71,6 +72,8 @@ pub struct MemtableStats {
     time_range: Option<(Timestamp, Timestamp)>,
     /// Total rows in memtable
     num_rows: usize,
+    /// Total number of ranges in the memtable.
+    num_ranges: usize,
 }
 
 impl MemtableStats {
@@ -95,6 +98,11 @@ impl MemtableStats {
     pub fn num_rows(&self) -> usize {
         self.num_rows
     }
+
+    /// Returns the number of ranges in the memtable.
+    pub fn num_ranges(&self) -> usize {
+        self.num_ranges
+    }
 }
 
 pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
@@ -123,11 +131,12 @@ pub trait Memtable: Send + Sync + fmt::Debug {
     ) -> Result<BoxedBatchIterator>;
 
     /// Returns the ranges in the memtable.
+    /// The returned map contains the range id and the range after applying the predicate.
     fn ranges(
         &self,
         projection: Option<&[ColumnId]>,
         predicate: Option<Predicate>,
-    ) -> Vec<MemtableRange>;
+    ) -> BTreeMap<usize, MemtableRange>;
 
     /// Returns true if the memtable is empty.
     fn is_empty(&self) -> bool;
@@ -332,7 +341,6 @@ impl MemtableRangeContext {
 pub struct MemtableRange {
     /// Shared context.
     context: MemtableRangeContextRef,
-    // TODO(yingwen): Id to identify the range in the memtable.
 }
 
 impl MemtableRange {
diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs
index 3ed2ed1347d8..46e757f3df16 100644
--- a/src/mito2/src/memtable/bulk.rs
+++ b/src/mito2/src/memtable/bulk.rs
@@ -14,6 +14,7 @@
 
 //! Memtable implementation for bulk load
 
+use std::collections::BTreeMap;
 use std::sync::{Arc, RwLock};
 
 use store_api::metadata::RegionMetadataRef;
@@ -67,7 +68,7 @@ impl Memtable for BulkMemtable {
         &self,
         _projection: Option<&[ColumnId]>,
         _predicate: Option<Predicate>,
-    ) -> Vec<MemtableRange> {
+    ) -> BTreeMap<usize, MemtableRange> {
         todo!()
     }
 
diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs
index e320503886be..3309e66b7e36 100644
--- a/src/mito2/src/memtable/partition_tree.rs
+++ b/src/mito2/src/memtable/partition_tree.rs
@@ -24,6 +24,7 @@ mod shard;
 mod shard_builder;
 mod tree;
 
+use std::collections::BTreeMap;
 use std::fmt;
 use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
 use std::sync::Arc;
@@ -176,7 +177,7 @@ impl Memtable for PartitionTreeMemtable {
         &self,
         projection: Option<&[ColumnId]>,
         predicate: Option<Predicate>,
-    ) -> Vec<MemtableRange> {
+    ) -> BTreeMap<usize, MemtableRange> {
         let projection = projection.map(|ids| ids.to_vec());
         let builder = Box::new(PartitionTreeIterBuilder {
             tree: self.tree.clone(),
@@ -185,7 +186,7 @@ impl Memtable for PartitionTreeMemtable {
         });
         let context = Arc::new(MemtableRangeContext::new(self.id, builder));
 
-        vec![MemtableRange::new(context)]
+        [(0, MemtableRange::new(context))].into()
     }
 
     fn is_empty(&self) -> bool {
@@ -207,6 +208,7 @@ impl Memtable for PartitionTreeMemtable {
                 estimated_bytes,
                 time_range: None,
                 num_rows: 0,
+                num_ranges: 0,
             };
         }
 
@@ -225,6 +227,7 @@ impl Memtable for PartitionTreeMemtable {
             estimated_bytes,
             time_range: Some((min_timestamp, max_timestamp)),
             num_rows: self.num_rows.load(Ordering::Relaxed),
+            num_ranges: 1,
         }
     }
 
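Both memtable implementations above publish exactly one range under id 0, so the map returned by `ranges` degenerates to a single entry for them. A minimal, runnable sketch of the id-based lookup the new `BTreeMap<usize, MemtableRange>` return type enables (illustrative only; a string stands in for `MemtableRange`):

    use std::collections::BTreeMap;

    fn main() {
        // Mirrors `[(0, MemtableRange::new(context))].into()` above.
        let ranges: BTreeMap<usize, &str> = [(0, "whole-memtable range")].into();

        // A scanner can now address one range by its stable id instead of
        // positionally indexing a Vec.
        if let Some(range) = ranges.get(&0) {
            println!("building iterator for {range}");
        }
    }
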
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index 6d5fbb33a079..d82db55730ed 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -287,7 +287,7 @@ impl Memtable for TimeSeriesMemtable {
         &self,
         projection: Option<&[ColumnId]>,
         predicate: Option<Predicate>,
-    ) -> Vec<MemtableRange> {
+    ) -> BTreeMap<usize, MemtableRange> {
         let projection = if let Some(projection) = projection {
             projection.iter().copied().collect()
         } else {
@@ -305,7 +305,7 @@ impl Memtable for TimeSeriesMemtable {
         });
         let context = Arc::new(MemtableRangeContext::new(self.id, builder));
 
-        vec![MemtableRange::new(context)]
+        [(0, MemtableRange::new(context))].into()
     }
 
     fn is_empty(&self) -> bool {
@@ -327,6 +327,7 @@ impl Memtable for TimeSeriesMemtable {
                 estimated_bytes,
                 time_range: None,
                 num_rows: 0,
+                num_ranges: 0,
             };
         }
         let ts_type = self
@@ -343,6 +344,7 @@ impl Memtable for TimeSeriesMemtable {
             estimated_bytes,
             time_range: Some((min_timestamp, max_timestamp)),
             num_rows: self.num_rows.load(Ordering::Relaxed),
+            num_ranges: 1,
         }
     }
 
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index edde28dab80c..d9a6fae7461e 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -20,6 +20,7 @@ pub mod last_row;
 pub mod merge;
 pub mod projection;
 pub(crate) mod prune;
+pub(crate) mod range;
 pub(crate) mod scan_region;
 pub(crate) mod seq_scan;
 pub(crate) mod unordered_scan;
@@ -753,6 +754,10 @@ pub(crate) struct ScannerMetrics {
     num_batches: usize,
     /// Number of rows returned.
     num_rows: usize,
+    /// Number of mem ranges scanned.
+    num_mem_ranges: usize,
+    /// Number of file ranges scanned.
+    num_file_ranges: usize,
     /// Filter related metrics for readers.
     filter_metrics: ReaderFilterMetrics,
 }
diff --git a/src/mito2/src/read/dedup.rs b/src/mito2/src/read/dedup.rs
index ddc96049e720..c77d0c3fabe1 100644
--- a/src/mito2/src/read/dedup.rs
+++ b/src/mito2/src/read/dedup.rs
@@ -400,7 +400,6 @@ pub(crate) struct LastNonNull {
 
 impl LastNonNull {
     /// Creates a new strategy with the given `filter_deleted` flag.
-    #[allow(dead_code)]
     pub(crate) fn new(filter_deleted: bool) -> Self {
         Self {
             buffer: None,
diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs
new file mode 100644
index 000000000000..8408aad7b2d8
--- /dev/null
+++ b/src/mito2/src/read/range.rs
@@ -0,0 +1,266 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Structs for partition ranges.
+
+use smallvec::{smallvec, SmallVec};
+
+use crate::memtable::MemtableRef;
+use crate::read::scan_region::ScanInput;
+use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
+use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
+
+const ALL_ROW_GROUPS: i64 = -1;
+
+/// Index to access a row group.
+#[derive(Clone, Copy, PartialEq)]
+pub(crate) struct RowGroupIndex {
+    /// Index to the memtable/file.
+    pub(crate) index: usize,
+    /// Row group index in the file.
+    /// Negative index indicates all row groups.
+    pub(crate) row_group_index: i64,
+}
+
+/// Metadata of a partition range.
+/// If the scanner is [UnorderedScan], each meta only has one row group or memtable.
+/// If the scanner is [SeqScan], each meta may have multiple row groups and memtables.
+pub(crate) struct RangeMeta {
+    /// The time range of the range.
+    pub(crate) time_range: FileTimeRange,
+    /// Indices to memtables or files.
+    indices: SmallVec<[usize; 2]>,
+    /// Indices to memtable/file row groups that this range scans.
+    pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
+    /// Estimated number of rows in the range. This can be 0 if the statistics are not available.
+    pub(crate) num_rows: usize,
+}
+
+impl RangeMeta {
+    /// Creates a list of ranges from the `input` for seq scan.
+    pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
+        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
+        Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
+        Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
+
+        let ranges = group_ranges_for_seq_scan(ranges);
+        maybe_split_ranges_for_seq_scan(ranges)
+    }
+
+    /// Creates a list of ranges from the `input` for unordered scan.
+    pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
+        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
+        Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
+        Self::push_unordered_file_ranges(input.memtables.len(), &input.files, &mut ranges);
+
+        ranges
+    }
+
+    /// Returns true if the time range of the given `meta` overlaps with the time range of this meta.
+    pub(crate) fn overlaps(&self, meta: &RangeMeta) -> bool {
+        overlaps(&self.time_range, &meta.time_range)
+    }
+
+    /// Merges the given `other` meta into this meta.
+    /// It assumes that the time ranges overlap and they don't have the same file or memtable index.
+    pub(crate) fn merge(&mut self, mut other: RangeMeta) {
+        debug_assert!(self.overlaps(&other));
+        debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
+        debug_assert!(self
+            .row_group_indices
+            .iter()
+            .all(|idx| !other.row_group_indices.contains(idx)));
+
+        self.time_range = (
+            self.time_range.0.min(other.time_range.0),
+            self.time_range.1.max(other.time_range.1),
+        );
+        self.indices.append(&mut other.indices);
+        self.row_group_indices.append(&mut other.row_group_indices);
+        self.num_rows += other.num_rows;
+    }
+
+    /// Returns true if we can split the range into multiple smaller ranges and
+    /// still preserve the order for [SeqScan].
+    pub(crate) fn can_split_preserve_order(&self) -> bool {
+        // Only one source and multiple row groups.
+        self.indices.len() == 1 && self.row_group_indices.len() > 1
+    }
+
+    /// Splits the range if it can preserve the order.
+    pub(crate) fn maybe_split(self, output: &mut Vec<RangeMeta>) {
+        if self.can_split_preserve_order() {
+            output.reserve(self.row_group_indices.len());
+            let num_rows = self.num_rows / self.row_group_indices.len();
+            // Splits by row group.
+            for index in self.row_group_indices {
+                output.push(RangeMeta {
+                    time_range: self.time_range,
+                    indices: self.indices.clone(),
+                    row_group_indices: smallvec![index],
+                    num_rows,
+                });
+            }
+        } else {
+            output.push(self);
+        }
+    }
+
+    fn push_unordered_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec<RangeMeta>) {
+        // For append mode, we can parallelize reading memtables.
+        for (memtable_index, memtable) in memtables.iter().enumerate() {
+            let stats = memtable.stats();
+            let Some(time_range) = stats.time_range() else {
+                continue;
+            };
+            for row_group_index in 0..stats.num_ranges() {
+                let num_rows = stats.num_rows() / stats.num_ranges();
+                ranges.push(RangeMeta {
+                    time_range,
+                    indices: smallvec![memtable_index],
+                    row_group_indices: smallvec![RowGroupIndex {
+                        index: memtable_index,
+                        row_group_index: row_group_index as i64,
+                    }],
+                    num_rows,
+                });
+            }
+        }
+    }
+
+    fn push_unordered_file_ranges(
+        num_memtables: usize,
+        files: &[FileHandle],
+        ranges: &mut Vec<RangeMeta>,
+    ) {
+        // For append mode, we can parallelize reading row groups.
+        for (i, file) in files.iter().enumerate() {
+            let file_index = num_memtables + i;
+            if file.meta_ref().num_row_groups > 0 {
+                // Scans each row group.
+                for row_group_index in 0..file.meta_ref().num_row_groups {
+                    ranges.push(RangeMeta {
+                        time_range: file.time_range(),
+                        indices: smallvec![file_index],
+                        row_group_indices: smallvec![RowGroupIndex {
+                            index: file_index,
+                            row_group_index: row_group_index as i64,
+                        }],
+                        num_rows: DEFAULT_ROW_GROUP_SIZE,
+                    });
+                }
+            } else {
+                // If we don't know the number of row groups in advance, scan all row groups.
+                ranges.push(RangeMeta {
+                    time_range: file.time_range(),
+                    indices: smallvec![file_index],
+                    row_group_indices: smallvec![RowGroupIndex {
+                        index: file_index,
+                        row_group_index: ALL_ROW_GROUPS,
+                    }],
+                    // This may be 0.
+                    num_rows: file.meta_ref().num_rows as usize,
+                });
+            }
+        }
+    }
+
+    fn push_seq_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec<RangeMeta>) {
+        // For non append-only mode, each range only contains one memtable.
+        for (i, memtable) in memtables.iter().enumerate() {
+            let stats = memtable.stats();
+            let Some(time_range) = stats.time_range() else {
+                continue;
+            };
+            ranges.push(RangeMeta {
+                time_range,
+                indices: smallvec![i],
+                row_group_indices: smallvec![RowGroupIndex {
+                    index: i,
+                    row_group_index: ALL_ROW_GROUPS,
+                }],
+                num_rows: stats.num_rows(),
+            });
+        }
+    }
+
+    fn push_seq_file_ranges(
+        num_memtables: usize,
+        files: &[FileHandle],
+        ranges: &mut Vec<RangeMeta>,
+    ) {
+        // For non append-only mode, each range only contains one file.
+        for (i, file) in files.iter().enumerate() {
+            let file_index = num_memtables + i;
+            ranges.push(RangeMeta {
+                time_range: file.time_range(),
+                indices: smallvec![file_index],
+                row_group_indices: smallvec![RowGroupIndex {
+                    index: file_index,
+                    row_group_index: ALL_ROW_GROUPS,
+                }],
+                num_rows: file.meta_ref().num_rows as usize,
+            });
+        }
+    }
+}
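The split rule in `maybe_split` above is worth pinning down: a range may be split by row group only when it covers a single source, because only then does emitting its row groups in order preserve the overall time order. A runnable sketch of that rule under illustrative names (plain integers stand in for row groups; this is not the crate's API):

    fn maybe_split(num_sources: usize, row_groups: Vec<i64>) -> Vec<Vec<i64>> {
        // Mirrors can_split_preserve_order: one source, multiple row groups.
        if num_sources == 1 && row_groups.len() > 1 {
            row_groups.into_iter().map(|rg| vec![rg]).collect()
        } else {
            vec![row_groups]
        }
    }

    fn main() {
        // A single file with three row groups splits into three ordered ranges.
        assert_eq!(maybe_split(1, vec![0, 1, 2]), vec![vec![0], vec![1], vec![2]]);
        // A merged range covering two sources must stay in one piece.
        assert_eq!(maybe_split(2, vec![0, 1]), vec![vec![0, 1]]);
    }
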
+
+/// Groups ranges by time range.
+/// It assumes each input range only contains a file or a memtable.
+fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
+    if ranges.is_empty() {
+        return ranges;
+    }
+
+    // Sorts ranges by time range (start asc, end desc).
+    ranges.sort_unstable_by(|a, b| {
+        let l = a.time_range;
+        let r = b.time_range;
+        l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
+    });
+    let mut range_in_progress = None;
+    // Parts with exclusive time ranges.
+    let mut exclusive_ranges = Vec::with_capacity(ranges.len());
+    for range in ranges {
+        let Some(mut prev_range) = range_in_progress.take() else {
+            // This is the new range to process.
+            range_in_progress = Some(range);
+            continue;
+        };
+
+        if prev_range.overlaps(&range) {
+            prev_range.merge(range);
+            range_in_progress = Some(prev_range);
+        } else {
+            exclusive_ranges.push(prev_range);
+            range_in_progress = Some(range);
+        }
+    }
+    if let Some(range) = range_in_progress {
+        exclusive_ranges.push(range);
+    }
+
+    exclusive_ranges
+}
+
+/// Splits the ranges into multiple smaller ranges.
+/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()].
+fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
+    let mut new_ranges = Vec::with_capacity(ranges.len());
+    for range in ranges {
+        range.maybe_split(&mut new_ranges);
+    }
+
+    new_ranges
+}
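For intuition about `group_ranges_for_seq_scan`, here is a runnable sketch of the same grouping on plain `(start, end)` tuples; inclusive overlap semantics are assumed to match `sst::file::overlaps`:

    fn group(mut ranges: Vec<(i64, i64)>) -> Vec<(i64, i64)> {
        // Sort by start asc, end desc, as the function above does.
        ranges.sort_unstable_by(|a, b| a.0.cmp(&b.0).then_with(|| b.1.cmp(&a.1)));
        let mut exclusive: Vec<(i64, i64)> = Vec::new();
        for range in ranges {
            match exclusive.last_mut() {
                // Overlapping ranges merge into the range in progress.
                Some(prev) if range.0 <= prev.1 => prev.1 = prev.1.max(range.1),
                _ => exclusive.push(range),
            }
        }
        exclusive
    }

    fn main() {
        // [0, 10] and [5, 20] overlap and merge; [30, 40] stays exclusive, so a
        // seq scan can emit the two groups in time order.
        assert_eq!(group(vec![(5, 20), (30, 40), (0, 10)]), vec![(0, 20), (30, 40)]);
    }
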
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index d28562c554c3..28deb1186ebd 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -14,9 +14,9 @@
 
 //! Scans a region according to the scan request.
 
-use std::collections::HashSet;
+use std::collections::{BTreeMap, HashSet};
 use std::fmt;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex as StdMutex};
 use std::time::{Duration, Instant};
 
 use common_error::ext::BoxedError;
@@ -26,6 +26,7 @@ use common_time::range::TimestampRange;
 use common_time::Timestamp;
 use datafusion::physical_plan::DisplayFormatType;
 use datafusion_expr::utils::expr_to_columns;
+use parquet::arrow::arrow_reader::RowSelection;
 use smallvec::SmallVec;
 use store_api::region_engine::{PartitionRange, RegionScannerRef};
 use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
@@ -41,6 +42,7 @@ use crate::memtable::{MemtableRange, MemtableRef};
 use crate::metrics::READ_SST_COUNT;
 use crate::read::compat::{self, CompatBatch};
 use crate::read::projection::ProjectionMapper;
+use crate::read::range::{RangeMeta, RowGroupIndex};
 use crate::read::seq_scan::SeqScan;
 use crate::read::unordered_scan::UnorderedScan;
 use crate::read::{Batch, Source};
@@ -51,7 +53,7 @@ use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
 use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
 use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
 use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
-use crate::sst::parquet::file_range::FileRange;
+use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
 use crate::sst::parquet::reader::ReaderMetrics;
 
 /// A scanner scans a region and returns a [SendableRecordBatchStream].
@@ -643,6 +645,61 @@ impl ScanInput {
         Ok(sources)
     }
 
+    /// Prunes a memtable to scan and returns the builder to build readers.
+    fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder {
+        let memtable = &self.memtables[mem_index];
+        let row_groups = memtable.ranges(Some(self.mapper.column_ids()), self.predicate.clone());
+        MemRangeBuilder { row_groups }
+    }
+
+    /// Prunes a file to scan and returns the builder to build readers.
+    async fn prune_file(
+        &self,
+        file_index: usize,
+        reader_metrics: &mut ReaderMetrics,
+    ) -> Result<FileRangeBuilder> {
+        let file = &self.files[file_index];
+        let res = self
+            .access_layer
+            .read_sst(file.clone())
+            .predicate(self.predicate.clone())
+            .time_range(self.time_range)
+            .projection(Some(self.mapper.column_ids().to_vec()))
+            .cache(self.cache_manager.clone())
+            .inverted_index_applier(self.inverted_index_applier.clone())
+            .fulltext_index_applier(self.fulltext_index_applier.clone())
+            .expected_metadata(Some(self.mapper.metadata().clone()))
+            .build_reader_input(reader_metrics)
+            .await;
+        let (mut file_range_ctx, row_groups) = match res {
+            Ok(x) => x,
+            Err(e) => {
+                if e.is_object_not_found() && self.ignore_file_not_found {
+                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
+                    return Ok(FileRangeBuilder::default());
+                } else {
+                    return Err(e);
+                }
+            }
+        };
+        if !compat::has_same_columns(
+            self.mapper.metadata(),
+            file_range_ctx.read_format().metadata(),
+        ) {
+            // They have different schemas. We need to adapt the batch first so the
+            // mapper can convert it.
+            let compat = CompatBatch::new(
+                &self.mapper,
+                file_range_ctx.read_format().metadata().clone(),
+            )?;
+            file_range_ctx.set_compat_batch(Some(compat));
+        }
+        Ok(FileRangeBuilder {
+            context: Some(Arc::new(file_range_ctx)),
+            row_groups,
+        })
+    }
+
     /// Prunes file ranges to scan and adds them to the `collector`.
     pub(crate) async fn prune_file_ranges(
         &self,
@@ -749,51 +806,6 @@ impl ScanInput {
     pub(crate) fn predicate(&self) -> Option<Predicate> {
         self.predicate.clone()
     }
-
-    /// Retrieves [`PartitionRange`] from memtable and files
-    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
-        let mut id = 0;
-        let mut container = Vec::with_capacity(self.memtables.len() + self.files.len());
-
-        for memtable in &self.memtables {
-            let range = PartitionRange {
-                // TODO(ruihang): filter out empty memtables in the future.
-                start: memtable.stats().time_range().unwrap().0,
-                end: memtable.stats().time_range().unwrap().1,
-                num_rows: memtable.stats().num_rows(),
-                identifier: id,
-            };
-            id += 1;
-            container.push(range);
-        }
-
-        for file in &self.files {
-            if self.append_mode {
-                // For append mode, we can parallelize reading row groups.
-                for _ in 0..file.meta_ref().num_row_groups {
-                    let range = PartitionRange {
-                        start: file.time_range().0,
-                        end: file.time_range().1,
-                        num_rows: file.num_rows(),
-                        identifier: id,
-                    };
-                    id += 1;
-                    container.push(range);
-                }
-            } else {
-                let range = PartitionRange {
-                    start: file.meta_ref().time_range.0,
-                    end: file.meta_ref().time_range.1,
-                    num_rows: file.meta_ref().num_rows as usize,
-                    identifier: id,
-                };
-                id += 1;
-                container.push(range);
-            }
-        }
-
-        container
-    }
 }
 
 #[cfg(test)]
@@ -967,6 +979,10 @@ pub(crate) struct StreamContext {
     /// The scanner builds parts to scan from the input lazily.
     /// The mutex is used to ensure the parts are only built once.
    pub(crate) parts: Mutex<(ScanPartList, Duration)>,
+    /// Metadata for partition ranges.
+    pub(crate) ranges: Vec<RangeMeta>,
+    /// Lists of range builders.
+    range_builders: RangeBuilderList,
 
     // Metrics:
     /// The start time of the query.
@@ -974,17 +990,77 @@ impl StreamContext {
 }
 
 impl StreamContext {
-    /// Creates a new [StreamContext].
-    pub(crate) fn new(input: ScanInput) -> Self {
+    /// Creates a new [StreamContext] for [SeqScan].
+    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
+        let query_start = input.query_start.unwrap_or_else(Instant::now);
+        let ranges = RangeMeta::seq_scan_ranges(&input);
+        READ_SST_COUNT.observe(input.files.len() as f64);
+        let range_builders = RangeBuilderList::new(input.memtables.len(), input.files.len());
+
+        Self {
+            input,
+            parts: Mutex::new((ScanPartList::default(), Duration::default())),
+            ranges,
+            range_builders,
+            query_start,
+        }
+    }
+
+    /// Creates a new [StreamContext] for [UnorderedScan].
+    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
         let query_start = input.query_start.unwrap_or_else(Instant::now);
+        let ranges = RangeMeta::unordered_scan_ranges(&input);
+        READ_SST_COUNT.observe(input.files.len() as f64);
+        let range_builders = RangeBuilderList::new(input.memtables.len(), input.files.len());
 
         Self {
             input,
             parts: Mutex::new((ScanPartList::default(), Duration::default())),
+            ranges,
+            range_builders,
             query_start,
         }
     }
 
+    /// Returns true if the index refers to a memtable.
+    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
+        self.input.memtables.len() > index.index
+    }
+
+    /// Creates file ranges to scan.
+    pub(crate) async fn build_file_ranges(
+        &self,
+        index: RowGroupIndex,
+        ranges: &mut Vec<FileRange>,
+        reader_metrics: &mut ReaderMetrics,
+    ) -> Result<()> {
+        ranges.clear();
+        self.range_builders
+            .build_file_ranges(&self.input, index, ranges, reader_metrics)
+            .await
+    }
+
+    /// Creates memtable ranges to scan.
+    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex, ranges: &mut Vec<MemtableRange>) {
+        ranges.clear();
+        self.range_builders
+            .build_mem_ranges(&self.input, index, ranges)
+    }
+
+    /// Retrieves the partition ranges.
+    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
+        self.ranges
+            .iter()
+            .enumerate()
+            .map(|(idx, range_meta)| PartitionRange {
+                start: range_meta.time_range.0,
+                end: range_meta.time_range.1,
+                num_rows: range_meta.num_rows,
+                identifier: idx,
+            })
+            .collect()
+    }
+
     /// Format the context for explain.
     pub(crate) fn format_for_explain(
         &self,
@@ -1011,3 +1087,125 @@ impl StreamContext {
         Ok(())
     }
 }
+
+/// List that manages the builders to create file ranges.
+struct RangeBuilderList {
+    mem_builders: Vec<StdMutex<Option<MemRangeBuilder>>>,
+    file_builders: Vec<Mutex<Option<FileRangeBuilder>>>,
+}
+
+impl RangeBuilderList {
+    /// Creates a new [RangeBuilderList] with the given number of memtables and files.
+    fn new(num_memtables: usize, num_files: usize) -> Self {
+        let mem_builders = (0..num_memtables).map(|_| StdMutex::new(None)).collect();
+        let file_builders = (0..num_files).map(|_| Mutex::new(None)).collect();
+        Self {
+            mem_builders,
+            file_builders,
+        }
+    }
+
+    /// Builds file ranges to read the row group at `index`.
+    async fn build_file_ranges(
+        &self,
+        input: &ScanInput,
+        index: RowGroupIndex,
+        ranges: &mut Vec<FileRange>,
+        reader_metrics: &mut ReaderMetrics,
+    ) -> Result<()> {
+        let file_index = index.index - self.mem_builders.len();
+        let mut builder_opt = self.file_builders[file_index].lock().await;
+        match &mut *builder_opt {
+            Some(builder) => builder.build_ranges(index.row_group_index, ranges),
+            None => {
+                let builder = input.prune_file(file_index, reader_metrics).await?;
+                builder.build_ranges(index.row_group_index, ranges);
+                *builder_opt = Some(builder);
+            }
+        }
+        Ok(())
+    }
+
+    /// Builds mem ranges to read the row group at `index`.
+    fn build_mem_ranges(
+        &self,
+        input: &ScanInput,
+        index: RowGroupIndex,
+        ranges: &mut Vec<MemtableRange>,
+    ) {
+        let mut builder_opt = self.mem_builders[index.index].lock().unwrap();
+        match &mut *builder_opt {
+            Some(builder) => builder.build_ranges(index.row_group_index, ranges),
+            None => {
+                let builder = input.prune_memtable(index.index);
+                builder.build_ranges(index.row_group_index, ranges);
+                *builder_opt = Some(builder);
+            }
+        }
+    }
+}
+
+/// Builder to create file ranges.
+#[derive(Default)]
+struct FileRangeBuilder {
+    /// Context for the file.
+    /// None indicates nothing to read.
+    context: Option<FileRangeContextRef>,
+    /// Row selections for each row group to read.
+    /// It skips the row group if it is not in the map.
+    row_groups: BTreeMap<usize, Option<RowSelection>>,
+}
+
+impl FileRangeBuilder {
+    /// Builds file ranges to read.
+    /// Negative `row_group_index` indicates all row groups.
+    fn build_ranges(&self, row_group_index: i64, ranges: &mut Vec<FileRange>) {
+        let Some(context) = self.context.clone() else {
+            return;
+        };
+        if row_group_index >= 0 {
+            let row_group_index = row_group_index as usize;
+            // Scans one row group.
+            let Some(row_selection) = self.row_groups.get(&row_group_index) else {
+                return;
+            };
+            ranges.push(FileRange::new(
+                context,
+                row_group_index,
+                row_selection.clone(),
+            ));
+        } else {
+            // Scans all row groups.
+            ranges.extend(
+                self.row_groups
+                    .iter()
+                    .map(|(row_group_index, row_selection)| {
+                        FileRange::new(context.clone(), *row_group_index, row_selection.clone())
+                    }),
+            );
+        }
+    }
+}
+
+/// Builder to create mem ranges.
+struct MemRangeBuilder {
+    /// Ranges of a memtable.
+    row_groups: BTreeMap<usize, MemtableRange>,
+}
+
+impl MemRangeBuilder {
+    /// Builds mem ranges to read in the memtable.
+    /// Negative `row_group_index` indicates all row groups.
+    fn build_ranges(&self, row_group_index: i64, ranges: &mut Vec<MemtableRange>) {
+        if row_group_index >= 0 {
+            let row_group_index = row_group_index as usize;
+            // Scans one row group.
+            let Some(range) = self.row_groups.get(&row_group_index) else {
+                return;
+            };
+            ranges.push(range.clone());
+        } else {
+            ranges.extend(self.row_groups.values().cloned());
+        }
+    }
+}
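`RangeBuilderList` builds the builder for each memtable or file at most once and shares it across partitions, so only the first partition touching a source pays the pruning cost. A minimal sketch of that build-once pattern (illustrative names; a std mutex stands in for the async one used for files):

    use std::sync::Mutex;

    struct Lazy<T> {
        slot: Mutex<Option<T>>,
    }

    impl<T: Clone> Lazy<T> {
        fn new() -> Self {
            Self { slot: Mutex::new(None) }
        }

        // Runs `build` only if no value is cached yet, like build_file_ranges.
        fn get_or_build(&self, build: impl FnOnce() -> T) -> T {
            let mut slot = self.slot.lock().unwrap();
            slot.get_or_insert_with(build).clone()
        }
    }

    fn main() {
        let lazy = Lazy::new();
        // The first caller builds; the second reuses the cached value.
        let a = lazy.get_or_build(|| vec![1, 2, 3]);
        let b = lazy.get_or_build(|| unreachable!("already built"));
        assert_eq!(a, b);
    }
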
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 296d55250b0e..6e45c8ce4a80 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -68,11 +68,10 @@ impl SeqScan {
     pub(crate) fn new(input: ScanInput) -> Self {
         let parallelism = input.parallelism.parallelism.max(1);
         let mut properties = ScannerProperties::default()
-            .with_parallelism(parallelism)
             .with_append_mode(input.append_mode)
             .with_total_rows(input.total_rows());
-        properties.partitions = vec![input.partition_ranges()];
-        let stream_ctx = Arc::new(StreamContext::new(input));
+        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
+        properties.partitions = vec![stream_ctx.partition_ranges()];
 
         Self {
             properties,
@@ -594,7 +593,7 @@ impl SeqDistributor {
             continue;
         }
         let part = ScanPart {
-            memtable_ranges: mem_ranges,
+            memtable_ranges: mem_ranges.into_values().collect(),
             file_ranges: smallvec![],
             time_range: stats.time_range(),
         };
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index 67e87197a697..8a019132d784 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -16,7 +16,7 @@
 
 use std::fmt;
 use std::sync::Arc;
-use std::time::{Duration, Instant};
+use std::time::Instant;
 
 use async_stream::{stream, try_stream};
 use common_error::ext::BoxedError;
@@ -25,23 +25,18 @@ use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
 use common_telemetry::debug;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
-use futures::StreamExt;
-use smallvec::smallvec;
+use futures::{Stream, StreamExt};
 use snafu::ResultExt;
 use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
-use store_api::storage::ColumnId;
-use table::predicate::Predicate;
 
 use crate::cache::CacheManager;
 use crate::error::Result;
-use crate::memtable::{MemtableRange, MemtableRef};
+use crate::memtable::MemtableRange;
 use crate::read::compat::CompatBatch;
 use crate::read::projection::ProjectionMapper;
-use crate::read::scan_region::{
-    FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext,
-};
+use crate::read::range::RowGroupIndex;
+use crate::read::scan_region::{ScanInput, StreamContext};
 use crate::read::{ScannerMetrics, Source};
-use crate::sst::file::FileMeta;
 use crate::sst::parquet::file_range::FileRange;
 use crate::sst::parquet::reader::ReaderMetrics;
 
@@ -58,13 +53,11 @@ pub struct UnorderedScan {
 
 impl UnorderedScan {
     /// Creates a new [UnorderedScan].
     pub(crate) fn new(input: ScanInput) -> Self {
-        let parallelism = input.parallelism.parallelism.max(1);
         let mut properties = ScannerProperties::default()
-            .with_parallelism(parallelism)
             .with_append_mode(input.append_mode)
             .with_total_rows(input.total_rows());
-        properties.partitions = vec![input.partition_ranges()];
-        let stream_ctx = Arc::new(StreamContext::new(input));
+        let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
+        properties.partitions = vec![stream_ctx.partition_ranges()];
 
         Self {
             properties,
@@ -127,64 +120,56 @@ impl UnorderedScan {
         Ok(Some(record_batch))
     }
-}
-
-impl RegionScanner for UnorderedScan {
-    fn properties(&self) -> &ScannerProperties {
-        &self.properties
-    }
-
-    fn schema(&self) -> SchemaRef {
-        self.stream_ctx.input.mapper.output_schema()
-    }
-
-    fn prepare(&mut self, ranges: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
-        self.properties.partitions = ranges;
-        Ok(())
+    /// Scans a [PartitionRange] and returns a stream.
+    fn scan_partition_range<'a>(
+        stream_ctx: &'a StreamContext,
+        part_range: &'a PartitionRange,
+        mem_ranges: &'a mut Vec<MemtableRange>,
+        file_ranges: &'a mut Vec<FileRange>,
+        reader_metrics: &'a mut ReaderMetrics,
+        metrics: &'a mut ScannerMetrics,
+    ) -> impl Stream<Item = common_recordbatch::error::Result<RecordBatch>> + 'a {
+        stream! {
+            // Gets range meta.
+            let range_meta = &stream_ctx.ranges[part_range.identifier];
+            for index in &range_meta.row_group_indices {
+                if stream_ctx.is_mem_range_index(*index) {
+                    let stream = Self::scan_mem_ranges(stream_ctx, *index, mem_ranges, metrics);
+                    for await batch in stream {
+                        yield batch;
+                    }
+                } else {
+                    let stream = Self::scan_file_ranges(stream_ctx, *index, file_ranges, reader_metrics, metrics);
+                    for await batch in stream {
+                        yield batch;
+                    }
+                }
+            }
+        }
     }
 
-    fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
-        let mut metrics = ScannerMetrics {
-            prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
-            ..Default::default()
-        };
-        let stream_ctx = self.stream_ctx.clone();
-        let parallelism = self.properties.num_partitions();
-        let stream = try_stream! {
-            let first_poll = stream_ctx.query_start.elapsed();
-            let part = {
-                let mut parts = stream_ctx.parts.lock().await;
-                maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism)
-                    .await
-                    .map_err(BoxedError::new)
-                    .context(ExternalSnafu)?;
-                // Clone the part and releases the lock.
-                // TODO(yingwen): We might wrap the part in an Arc in the future if cloning is too expensive.
-                let Some(part) = parts.0.get_part(partition).cloned() else {
-                    return;
-                };
-                part
-            };
-
-            let build_reader_start = Instant::now();
+    /// Scans memtable ranges at `index`.
+    fn scan_mem_ranges<'a>(
+        stream_ctx: &'a StreamContext,
+        index: RowGroupIndex,
+        ranges: &'a mut Vec<MemtableRange>,
+        metrics: &'a mut ScannerMetrics,
+    ) -> impl Stream<Item = common_recordbatch::error::Result<RecordBatch>> + 'a {
+        try_stream! {
             let mapper = &stream_ctx.input.mapper;
-            let memtable_sources = part
-                .memtable_ranges
-                .iter()
-                .map(|mem| {
-                    let iter = mem.build_iter()?;
-                    Ok(Source::Iter(iter))
-                })
-                .collect::<Result<Vec<_>>>()
-                .map_err(BoxedError::new)
-                .context(ExternalSnafu)?;
-            metrics.build_reader_cost = build_reader_start.elapsed();
-
-            let query_start = stream_ctx.query_start;
             let cache = stream_ctx.input.cache_manager.as_deref();
-            // Scans memtables first.
-            for mut source in memtable_sources {
-                while let Some(batch) = Self::fetch_from_source(&mut source, mapper, cache, None, &mut metrics).await? {
+            stream_ctx.build_mem_ranges(index, ranges);
+            metrics.num_mem_ranges += ranges.len();
+            for range in ranges {
+                let build_reader_start = Instant::now();
+                let iter = range.build_iter().map_err(BoxedError::new).context(ExternalSnafu)?;
+                metrics.build_reader_cost = build_reader_start.elapsed();
+
+                let mut source = Source::Iter(iter);
+                while let Some(batch) =
+                    Self::fetch_from_source(&mut source, mapper, cache, None, metrics).await?
+                {
                     metrics.num_batches += 1;
                     metrics.num_rows += batch.num_rows();
                     let yield_start = Instant::now();
@@ -192,16 +177,40 @@ impl RegionScanner for UnorderedScan {
                     metrics.yield_cost += yield_start.elapsed();
                 }
             }
-            // Then scans file ranges.
-            let mut reader_metrics = ReaderMetrics::default();
-            // Safety: UnorderedDistributor::build_parts() ensures this.
-            for file_range in &part.file_ranges[0] {
+        }
+    }
+
+    /// Scans file ranges at `index`.
+    fn scan_file_ranges<'a>(
+        stream_ctx: &'a StreamContext,
+        index: RowGroupIndex,
+        ranges: &'a mut Vec<FileRange>,
+        reader_metrics: &'a mut ReaderMetrics,
+        metrics: &'a mut ScannerMetrics,
+    ) -> impl Stream<Item = common_recordbatch::error::Result<RecordBatch>> + 'a {
+        try_stream! {
+            let mapper = &stream_ctx.input.mapper;
+            let cache = stream_ctx.input.cache_manager.as_deref();
+            stream_ctx
+                .build_file_ranges(index, ranges, reader_metrics)
+                .await
+                .map_err(BoxedError::new)
+                .context(ExternalSnafu)?;
+            metrics.num_file_ranges += ranges.len();
+            for range in ranges {
                 let build_reader_start = Instant::now();
-                let reader = file_range.reader(None).await.map_err(BoxedError::new).context(ExternalSnafu)?;
+                let reader = range
+                    .reader(None)
+                    .await
+                    .map_err(BoxedError::new)
+                    .context(ExternalSnafu)?;
                 metrics.build_reader_cost += build_reader_start.elapsed();
-                let compat_batch = file_range.compat_batch();
+                let compat_batch = range.compat_batch();
                 let mut source = Source::PruneReader(reader);
-                while let Some(batch) = Self::fetch_from_source(&mut source, mapper, cache, compat_batch, &mut metrics).await? {
+                while let Some(batch) =
+                    Self::fetch_from_source(&mut source, mapper, cache, compat_batch, metrics)
+                        .await?
+                {
                     metrics.num_batches += 1;
                     metrics.num_rows += batch.num_rows();
                     let yield_start = Instant::now();
@@ -212,13 +221,51 @@ impl RegionScanner for UnorderedScan {
                 reader_metrics.merge_from(reader.metrics());
             }
         }
+    }
+
+    fn scan_partition_impl(
+        &self,
+        partition: usize,
+    ) -> Result<SendableRecordBatchStream, BoxedError> {
+        let mut metrics = ScannerMetrics {
+            prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
+            ..Default::default()
+        };
+        let stream_ctx = self.stream_ctx.clone();
+        let ranges_opt = self.properties.partitions.get(partition).cloned();
+
+        let stream = stream! {
+            let first_poll = stream_ctx.query_start.elapsed();
+            let Some(part_ranges) = ranges_opt else {
+                return;
+            };
+
+            let mut mem_ranges = Vec::new();
+            let mut file_ranges = Vec::new();
+            let mut reader_metrics = ReaderMetrics::default();
+            // Scans each part.
+            for part_range in part_ranges {
+                let stream = Self::scan_partition_range(
+                    &stream_ctx,
+                    &part_range,
+                    &mut mem_ranges,
+                    &mut file_ranges,
+                    &mut reader_metrics,
+                    &mut metrics,
+                );
+                for await batch in stream {
+                    yield batch;
+                }
+            }
             reader_metrics.observe_rows("unordered_scan_files");
-            metrics.total_cost = query_start.elapsed();
+            metrics.total_cost = stream_ctx.query_start.elapsed();
             metrics.observe_metrics_on_finish();
+            let mapper = &stream_ctx.input.mapper;
             debug!(
-                "Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}, ranges: {}",
-                partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll, part.file_ranges[0].len(),
+                "Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}",
+                partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll,
             );
         };
         let stream = Box::pin(RecordBatchStreamWrapper::new(
@@ -228,6 +275,25 @@ impl UnorderedScan {
         Ok(stream)
     }
+}
+
+impl RegionScanner for UnorderedScan {
+    fn properties(&self) -> &ScannerProperties {
+        &self.properties
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.stream_ctx.input.mapper.output_schema()
+    }
+
+    fn prepare(&mut self, ranges: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
+        self.properties.partitions = ranges;
+        Ok(())
+    }
+
+    fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
+        self.scan_partition_impl(partition)
+    }
 
     fn has_predicate(&self) -> bool {
         let predicate = self.stream_ctx.input.predicate();
@@ -261,117 +327,3 @@ impl UnorderedScan {
         &self.stream_ctx.input
     }
 }
-
-/// Initializes parts if they are not built yet.
-async fn maybe_init_parts(
-    input: &ScanInput,
-    part_list: &mut (ScanPartList, Duration),
-    metrics: &mut ScannerMetrics,
-    parallelism: usize,
-) -> Result<()> {
-    if part_list.0.is_none() {
-        let now = Instant::now();
-        let mut distributor = UnorderedDistributor::default();
-        let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
-        distributor.append_mem_ranges(
-            &input.memtables,
-            Some(input.mapper.column_ids()),
-            input.predicate.clone(),
-        );
-        part_list.0.set_parts(distributor.build_parts(parallelism));
-        let build_part_cost = now.elapsed();
-        part_list.1 = build_part_cost;
-
-        metrics.observe_init_part(build_part_cost, &reader_metrics);
-    } else {
-        // Updates the cost of building parts.
-        metrics.build_parts_cost = part_list.1;
-    }
-    Ok(())
-}
-
-/// Builds [ScanPart]s without preserving order. It distributes file ranges and memtables
-/// across partitions. Each partition scans a subset of memtables and file ranges. There
-/// is no output ordering guarantee of each partition.
-#[derive(Default)]
-struct UnorderedDistributor {
-    mem_ranges: Vec<MemtableRange>,
-    file_ranges: Vec<FileRange>,
-}
-
-impl FileRangeCollector for UnorderedDistributor {
-    fn append_file_ranges(
-        &mut self,
-        _file_meta: &FileMeta,
-        file_ranges: impl Iterator<Item = FileRange>,
-    ) {
-        self.file_ranges.extend(file_ranges);
-    }
-}
-
-impl UnorderedDistributor {
-    /// Appends memtable ranges to the distributor.
-    fn append_mem_ranges(
-        &mut self,
-        memtables: &[MemtableRef],
-        projection: Option<&[ColumnId]>,
-        predicate: Option<Predicate>,
-    ) {
-        for mem in memtables {
-            let mut mem_ranges = mem.ranges(projection, predicate.clone());
-            if mem_ranges.is_empty() {
-                continue;
-            }
-            self.mem_ranges.append(&mut mem_ranges);
-        }
-    }
-
-    /// Distributes file ranges and memtables across partitions according to the `parallelism`.
-    /// The output number of parts may be `<= parallelism`.
-    ///
-    /// [ScanPart] created by this distributor only contains one group of file ranges.
-    fn build_parts(self, parallelism: usize) -> Vec<ScanPart> {
-        if parallelism <= 1 {
-            // Returns a single part.
-            let part = ScanPart {
-                memtable_ranges: self.mem_ranges.clone(),
-                file_ranges: smallvec![self.file_ranges],
-                time_range: None,
-            };
-            return vec![part];
-        }
-
-        let mems_per_part = ((self.mem_ranges.len() + parallelism - 1) / parallelism).max(1);
-        let ranges_per_part = ((self.file_ranges.len() + parallelism - 1) / parallelism).max(1);
-        debug!(
-            "Parallel scan is enabled, parallelism: {}, {} mem_ranges, {} file_ranges, mems_per_part: {}, ranges_per_part: {}",
-            parallelism,
-            self.mem_ranges.len(),
-            self.file_ranges.len(),
-            mems_per_part,
-            ranges_per_part
-        );
-        let mut scan_parts = self
-            .mem_ranges
-            .chunks(mems_per_part)
-            .map(|mems| ScanPart {
-                memtable_ranges: mems.to_vec(),
-                file_ranges: smallvec![Vec::new()], // Ensures there is always one group.
-                time_range: None,
-            })
-            .collect::<Vec<_>>();
-        for (i, ranges) in self.file_ranges.chunks(ranges_per_part).enumerate() {
-            if i == scan_parts.len() {
-                scan_parts.push(ScanPart {
-                    memtable_ranges: Vec::new(),
-                    file_ranges: smallvec![ranges.to_vec()],
-                    time_range: None,
-                });
-            } else {
-                scan_parts[i].file_ranges = smallvec![ranges.to_vec()];
-            }
-        }
-
-        scan_parts
-    }
-}
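With the distributor gone, each partition resolves its `RowGroupIndex` entries directly: indices below the memtable count address memtables, the rest address files, and a negative row group index is the `ALL_ROW_GROUPS` sentinel. A runnable sketch of that dispatch (illustrative helper, not the crate's API):

    const ALL_ROW_GROUPS: i64 = -1;

    fn describe(index: usize, row_group_index: i64, num_memtables: usize) -> String {
        if index < num_memtables {
            format!("memtable {index}, range {row_group_index}")
        } else if row_group_index == ALL_ROW_GROUPS {
            format!("file {}, all row groups", index - num_memtables)
        } else {
            format!("file {}, row group {row_group_index}", index - num_memtables)
        }
    }

    fn main() {
        // One memtable plus one file whose row-group count is unknown.
        assert_eq!(describe(0, 0, 1), "memtable 0, range 0");
        assert_eq!(describe(1, ALL_ROW_GROUPS, 1), "file 0, all row groups");
    }
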
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index cce996553970..cc0cff605b5c 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -40,7 +40,7 @@ pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
 /// Default batch size to read parquet files.
 pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
 /// Default row group size for parquet files.
-const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
+pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
 
 /// Parquet write options.
 #[derive(Debug)]
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 766dab04290e..ff7fe77e7a9f 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -238,6 +238,7 @@ impl ParquetReaderBuilder {
             cache_manager: self.cache_manager.clone(),
         };
 
+        // TODO(yingwen): count the cost of the method.
         metrics.build_cost = start.elapsed();
 
         let mut filters = if let Some(predicate) = &self.predicate {
diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs
index 235e9694c712..70dec3cad446 100644
--- a/src/mito2/src/test_util/memtable_util.rs
+++ b/src/mito2/src/test_util/memtable_util.rs
@@ -14,6 +14,7 @@
 
 //! Memtable test utilities.
 
+use std::collections::BTreeMap;
 use std::sync::Arc;
 
 use api::helper::ColumnDataTypeWrapper;
@@ -92,8 +93,8 @@ impl Memtable for EmptyMemtable {
         &self,
         _projection: Option<&[ColumnId]>,
         _predicate: Option<Predicate>,
-    ) -> Vec<MemtableRange> {
-        vec![]
+    ) -> BTreeMap<usize, MemtableRange> {
+        BTreeMap::new()
     }
 
     fn is_empty(&self) -> bool {
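`DEFAULT_ROW_GROUP_SIZE` becomes `pub(crate)` above only so the read path can use it as a row-count estimate when a file's statistics are unavailable. For reference, the arithmetic it encodes (both constants appear in the patch):

    fn main() {
        const DEFAULT_READ_BATCH_SIZE: usize = 1024;
        const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
        // push_unordered_file_ranges assumes one full row group per file range.
        assert_eq!(DEFAULT_ROW_GROUP_SIZE, 102_400);
    }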