Skip to content

Commit

Permalink
feat: do not keep MemtableRefs in ScanInput (#5184)
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag authored Dec 18, 2024
1 parent e662c24 commit 58d6982
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 60 deletions.
11 changes: 10 additions & 1 deletion src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ impl MemtableStats {

pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;

/// Ranges in a memtable.
#[derive(Default)]
pub struct MemtableRanges {
/// Range IDs and ranges.
pub ranges: BTreeMap<usize, MemtableRange>,
/// Statistics of the memtable at the query time.
pub stats: MemtableStats,
}

/// In memory write buffer.
pub trait Memtable: Send + Sync + fmt::Debug {
/// Returns the id of this memtable.
Expand Down Expand Up @@ -139,7 +148,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> BTreeMap<usize, MemtableRange>;
) -> MemtableRanges;

/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;
Expand Down
5 changes: 2 additions & 3 deletions src/mito2/src/memtable/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

//! Memtable implementation for bulk load
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

use store_api::metadata::RegionMetadataRef;
Expand All @@ -25,7 +24,7 @@ use crate::error::Result;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::key_values::KeyValue;
use crate::memtable::{
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRange, MemtableRef, MemtableStats,
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, MemtableStats,
};

#[allow(unused)]
Expand Down Expand Up @@ -68,7 +67,7 @@ impl Memtable for BulkMemtable {
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
) -> BTreeMap<usize, MemtableRange> {
) -> MemtableRanges {
todo!()
}

Expand Down
10 changes: 6 additions & 4 deletions src/mito2/src/memtable/partition_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ mod shard;
mod shard_builder;
mod tree;

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::sync::Arc;
Expand All @@ -41,7 +40,7 @@ use crate::memtable::partition_tree::tree::PartitionTree;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
};
use crate::region::options::MergeMode;

Expand Down Expand Up @@ -176,7 +175,7 @@ impl Memtable for PartitionTreeMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> BTreeMap<usize, MemtableRange> {
) -> MemtableRanges {
let projection = projection.map(|ids| ids.to_vec());
let builder = Box::new(PartitionTreeIterBuilder {
tree: self.tree.clone(),
Expand All @@ -185,7 +184,10 @@ impl Memtable for PartitionTreeMemtable {
});
let context = Arc::new(MemtableRangeContext::new(self.id, builder));

[(0, MemtableRange::new(context))].into()
MemtableRanges {
ranges: [(0, MemtableRange::new(context))].into(),
stats: self.stats(),
}
}

fn is_empty(&self) -> bool {
Expand Down
9 changes: 6 additions & 3 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::memtable::key_values::KeyValue;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::dedup::LastNonNullIter;
Expand Down Expand Up @@ -250,7 +250,7 @@ impl Memtable for TimeSeriesMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> BTreeMap<usize, MemtableRange> {
) -> MemtableRanges {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
Expand All @@ -268,7 +268,10 @@ impl Memtable for TimeSeriesMemtable {
});
let context = Arc::new(MemtableRangeContext::new(self.id, builder));

[(0, MemtableRange::new(context))].into()
MemtableRanges {
ranges: [(0, MemtableRange::new(context))].into(),
stats: self.stats(),
}
}

fn is_empty(&self) -> bool {
Expand Down
50 changes: 18 additions & 32 deletions src/mito2/src/read/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use store_api::region_engine::PartitionRange;

use crate::cache::CacheManager;
use crate::error::Result;
use crate::memtable::{MemtableRange, MemtableRef};
use crate::memtable::{MemtableRange, MemtableRanges, MemtableStats};
use crate::read::scan_region::ScanInput;
use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
Expand Down Expand Up @@ -175,7 +175,7 @@ impl RangeMeta {
}
}

fn push_unordered_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec<RangeMeta>) {
fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
// For append mode, we can parallelize reading memtables.
for (memtable_index, memtable) in memtables.iter().enumerate() {
let stats = memtable.stats();
Expand Down Expand Up @@ -270,7 +270,7 @@ impl RangeMeta {
}
}

fn push_seq_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec<RangeMeta>) {
fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
// For non append-only mode, each range only contains one memtable by default.
for (i, memtable) in memtables.iter().enumerate() {
let stats = memtable.stats();
Expand Down Expand Up @@ -421,48 +421,54 @@ impl FileRangeBuilder {
/// Builder to create mem ranges.
pub(crate) struct MemRangeBuilder {
/// Ranges of a memtable.
row_groups: BTreeMap<usize, MemtableRange>,
ranges: MemtableRanges,
}

impl MemRangeBuilder {
/// Builds a mem range builder from row groups.
pub(crate) fn new(row_groups: BTreeMap<usize, MemtableRange>) -> Self {
Self { row_groups }
pub(crate) fn new(ranges: MemtableRanges) -> Self {
Self { ranges }
}

/// Builds mem ranges to read in the memtable.
/// Negative `row_group_index` indicates all row groups.
fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[MemtableRange; 2]>) {
pub(crate) fn build_ranges(
&self,
row_group_index: i64,
ranges: &mut SmallVec<[MemtableRange; 2]>,
) {
if row_group_index >= 0 {
let row_group_index = row_group_index as usize;
// Scans one row group.
let Some(range) = self.row_groups.get(&row_group_index) else {
let Some(range) = self.ranges.ranges.get(&row_group_index) else {
return;
};
ranges.push(range.clone());
} else {
ranges.extend(self.row_groups.values().cloned());
ranges.extend(self.ranges.ranges.values().cloned());
}
}

/// Returns the statistics of the memtable.
pub(crate) fn stats(&self) -> &MemtableStats {
&self.ranges.stats
}
}

/// List to manages the builders to create file ranges.
/// Each scan partition should have its own list. Mutex inside this list is used to allow moving
/// the list to different streams in the same partition.
pub(crate) struct RangeBuilderList {
num_memtables: usize,
mem_builders: Mutex<Vec<Option<MemRangeBuilder>>>,
file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
}

impl RangeBuilderList {
/// Creates a new [ReaderBuilderList] with the given number of memtables and files.
pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
let mem_builders = (0..num_memtables).map(|_| None).collect();
let file_builders = (0..num_files).map(|_| None).collect();
Self {
num_memtables,
mem_builders: Mutex::new(mem_builders),
file_builders: Mutex::new(file_builders),
}
}
Expand All @@ -488,26 +494,6 @@ impl RangeBuilderList {
Ok(ranges)
}

/// Builds mem ranges to read the row group at `index`.
pub(crate) fn build_mem_ranges(
&self,
input: &ScanInput,
index: RowGroupIndex,
) -> SmallVec<[MemtableRange; 2]> {
let mut ranges = SmallVec::new();
let mut mem_builders = self.mem_builders.lock().unwrap();
match &mut mem_builders[index.index] {
Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
None => {
let builder = input.prune_memtable(index.index);
builder.build_ranges(index.row_group_index, &mut ranges);
mem_builders[index.index] = Some(builder);
}
}

ranges
}

fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
let file_builders = self.file_builders.lock().unwrap();
file_builders[index].clone()
Expand Down
30 changes: 20 additions & 10 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion_expr::utils::expr_to_columns;
use smallvec::SmallVec;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
Expand All @@ -35,7 +36,7 @@ use crate::cache::file_cache::FileCacheRef;
use crate::cache::CacheManagerRef;
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::memtable::MemtableRange;
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch};
use crate::read::projection::ProjectionMapper;
Expand Down Expand Up @@ -328,6 +329,14 @@ impl ScanRegion {
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
None => ProjectionMapper::all(&self.version.metadata)?,
};
// Get memtable ranges to scan.
let memtables = memtables
.into_iter()
.map(|mem| {
let ranges = mem.ranges(Some(mapper.column_ids()), Some(predicate.clone()));
MemRangeBuilder::new(ranges)
})
.collect();

let input = ScanInput::new(self.access_layer, mapper)
.with_time_range(Some(time_range))
Expand Down Expand Up @@ -484,8 +493,8 @@ pub(crate) struct ScanInput {
time_range: Option<TimestampRange>,
/// Predicate to push down.
pub(crate) predicate: Option<Predicate>,
/// Memtables to scan.
pub(crate) memtables: Vec<MemtableRef>,
/// Memtable range builders for memtables in the time range..
pub(crate) memtables: Vec<MemRangeBuilder>,
/// Handles to SST files to scan.
pub(crate) files: Vec<FileHandle>,
/// Cache.
Expand Down Expand Up @@ -547,9 +556,9 @@ impl ScanInput {
self
}

/// Sets memtables to read.
/// Sets memtable range builders.
#[must_use]
pub(crate) fn with_memtables(mut self, memtables: Vec<MemtableRef>) -> Self {
pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
self.memtables = memtables;
self
}
Expand Down Expand Up @@ -667,11 +676,12 @@ impl ScanInput {
Ok(sources)
}

/// Prunes a memtable to scan and returns the builder to build readers.
pub(crate) fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder {
let memtable = &self.memtables[mem_index];
let row_groups = memtable.ranges(Some(self.mapper.column_ids()), self.predicate.clone());
MemRangeBuilder::new(row_groups)
/// Builds memtable ranges to scan by `index`.
pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
let memtable = &self.memtables[index.index];
let mut ranges = SmallVec::new();
memtable.build_ranges(index.row_group_index, &mut ranges);
ranges
}

/// Prunes a file to scan and returns the builder to build readers.
Expand Down
3 changes: 1 addition & 2 deletions src/mito2/src/read/scan_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,9 @@ pub(crate) fn scan_mem_ranges(
part_metrics: PartitionMetrics,
index: RowGroupIndex,
time_range: FileTimeRange,
range_builder_list: Arc<RangeBuilderList>,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let ranges = range_builder_list.build_mem_ranges(&stream_ctx.input, index);
let ranges = stream_ctx.input.build_mem_ranges(index);
part_metrics.inc_num_mem_ranges(ranges.len());
for range in ranges {
let build_reader_start = Instant::now();
Expand Down
1 change: 0 additions & 1 deletion src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,6 @@ fn build_sources(
part_metrics.clone(),
*index,
range_meta.time_range,
range_builder_list.clone(),
);
Box::pin(stream) as _
} else {
Expand Down
1 change: 0 additions & 1 deletion src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ impl UnorderedScan {
part_metrics.clone(),
*index,
range_meta.time_range,
range_builder_list.clone(),
);
for await batch in stream {
yield batch;
Expand Down
6 changes: 3 additions & 3 deletions src/mito2/src/test_util/memtable_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
use crate::memtable::{
BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRange,
MemtableRef, MemtableStats,
MemtableRanges, MemtableRef, MemtableStats,
};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};

Expand Down Expand Up @@ -93,8 +93,8 @@ impl Memtable for EmptyMemtable {
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
) -> BTreeMap<usize, MemtableRange> {
BTreeMap::new()
) -> MemtableRanges {
MemtableRanges::default()
}

fn is_empty(&self) -> bool {
Expand Down

0 comments on commit 58d6982

Please sign in to comment.