Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unordered scanner scans data by time ranges #4757

Merged
merged 14 commits into from
Sep 29, 2024
12 changes: 10 additions & 2 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Memtables are write buffers for regions.

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -71,6 +72,8 @@ pub struct MemtableStats {
time_range: Option<(Timestamp, Timestamp)>,
/// Total rows in memtable
num_rows: usize,
/// Total number of ranges in the memtable.
num_ranges: usize,
}

impl MemtableStats {
Expand All @@ -95,6 +98,11 @@ impl MemtableStats {
pub fn num_rows(&self) -> usize {
self.num_rows
}

/// Returns the number of ranges in the memtable.
pub fn num_ranges(&self) -> usize {
self.num_ranges
}
}

pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
Expand Down Expand Up @@ -123,11 +131,12 @@ pub trait Memtable: Send + Sync + fmt::Debug {
) -> Result<BoxedBatchIterator>;

/// Returns the ranges in the memtable.
/// The returned map contains the range id and the range after applying the predicate.
fn ranges(
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Vec<MemtableRange>;
) -> BTreeMap<usize, MemtableRange>;

/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;
Expand Down Expand Up @@ -332,7 +341,6 @@ impl MemtableRangeContext {
pub struct MemtableRange {
/// Shared context.
context: MemtableRangeContextRef,
// TODO(yingwen): Id to identify the range in the memtable.
}

impl MemtableRange {
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/memtable/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Memtable implementation for bulk load

use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

use store_api::metadata::RegionMetadataRef;
Expand Down Expand Up @@ -67,7 +68,7 @@ impl Memtable for BulkMemtable {
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
) -> Vec<MemtableRange> {
) -> BTreeMap<usize, MemtableRange> {
todo!()
}

Expand Down
7 changes: 5 additions & 2 deletions src/mito2/src/memtable/partition_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod shard;
mod shard_builder;
mod tree;

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -176,7 +177,7 @@ impl Memtable for PartitionTreeMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Vec<MemtableRange> {
) -> BTreeMap<usize, MemtableRange> {
let projection = projection.map(|ids| ids.to_vec());
let builder = Box::new(PartitionTreeIterBuilder {
tree: self.tree.clone(),
Expand All @@ -185,7 +186,7 @@ impl Memtable for PartitionTreeMemtable {
});
let context = Arc::new(MemtableRangeContext::new(self.id, builder));

vec![MemtableRange::new(context)]
[(0, MemtableRange::new(context))].into()
}

fn is_empty(&self) -> bool {
Expand All @@ -207,6 +208,7 @@ impl Memtable for PartitionTreeMemtable {
estimated_bytes,
time_range: None,
num_rows: 0,
num_ranges: 0,
};
}

Expand All @@ -225,6 +227,7 @@ impl Memtable for PartitionTreeMemtable {
estimated_bytes,
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
num_ranges: 1,
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl Memtable for TimeSeriesMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Vec<MemtableRange> {
) -> BTreeMap<usize, MemtableRange> {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
Expand All @@ -305,7 +305,7 @@ impl Memtable for TimeSeriesMemtable {
});
let context = Arc::new(MemtableRangeContext::new(self.id, builder));

vec![MemtableRange::new(context)]
[(0, MemtableRange::new(context))].into()
}

fn is_empty(&self) -> bool {
Expand All @@ -327,6 +327,7 @@ impl Memtable for TimeSeriesMemtable {
estimated_bytes,
time_range: None,
num_rows: 0,
num_ranges: 0,
};
}
let ts_type = self
Expand All @@ -343,6 +344,7 @@ impl Memtable for TimeSeriesMemtable {
estimated_bytes,
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
num_ranges: 1,
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod last_row;
pub mod merge;
pub mod projection;
pub(crate) mod prune;
pub(crate) mod range;
pub(crate) mod scan_region;
pub(crate) mod seq_scan;
pub(crate) mod unordered_scan;
Expand Down Expand Up @@ -753,6 +754,10 @@ pub(crate) struct ScannerMetrics {
num_batches: usize,
/// Number of rows returned.
num_rows: usize,
/// Number of mem ranges scanned.
num_mem_ranges: usize,
/// Number of file ranges scanned.
num_file_ranges: usize,
/// Filter related metrics for readers.
filter_metrics: ReaderFilterMetrics,
}
Expand Down
1 change: 0 additions & 1 deletion src/mito2/src/read/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ pub(crate) struct LastNonNull {

impl LastNonNull {
/// Creates a new strategy with the given `filter_deleted` flag.
#[allow(dead_code)]
pub(crate) fn new(filter_deleted: bool) -> Self {
Self {
buffer: None,
Expand Down
Loading