Skip to content

Commit

Permalink
feat: unordered scanner scans data by time ranges (#4757)
Browse files Browse the repository at this point in the history
* feat: define range meta

* feat: group ranges

* feat: split range

* feat: build ranges from the scan input

* feat: get partition range from range meta

* feat: build file range

* feat: unordered scan read by ranges

* feat: wip for mem ranges

* feat: build ranges

* feat: remove unused codes

* chore: update comments

* feat: update metrics

* chore: address review comments

* chore: debug assertion
  • Loading branch information
evenyag authored Sep 29, 2024
1 parent 50cb595 commit cd55202
Show file tree
Hide file tree
Showing 13 changed files with 691 additions and 256 deletions.
12 changes: 10 additions & 2 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Memtables are write buffers for regions.
use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -71,6 +72,8 @@ pub struct MemtableStats {
time_range: Option<(Timestamp, Timestamp)>,
/// Total rows in memtable
num_rows: usize,
/// Total number of ranges in the memtable.
num_ranges: usize,
}

impl MemtableStats {
Expand All @@ -95,6 +98,11 @@ impl MemtableStats {
pub fn num_rows(&self) -> usize {
self.num_rows
}

/// Returns the number of ranges in the memtable.
pub fn num_ranges(&self) -> usize {
self.num_ranges
}
}

pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
Expand Down Expand Up @@ -123,11 +131,12 @@ pub trait Memtable: Send + Sync + fmt::Debug {
) -> Result<BoxedBatchIterator>;

/// Returns the ranges in the memtable.
/// The returned map contains the range id and the range after applying the predicate.
fn ranges(
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Vec<MemtableRange>;
) -> BTreeMap<usize, MemtableRange>;

/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;
Expand Down Expand Up @@ -332,7 +341,6 @@ impl MemtableRangeContext {
pub struct MemtableRange {
/// Shared context.
context: MemtableRangeContextRef,
// TODO(yingwen): Id to identify the range in the memtable.
}

impl MemtableRange {
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/memtable/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

//! Memtable implementation for bulk load
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

use store_api::metadata::RegionMetadataRef;
Expand Down Expand Up @@ -67,7 +68,7 @@ impl Memtable for BulkMemtable {
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
) -> Vec<MemtableRange> {
) -> BTreeMap<usize, MemtableRange> {
todo!()
}

Expand Down
7 changes: 5 additions & 2 deletions src/mito2/src/memtable/partition_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod shard;
mod shard_builder;
mod tree;

use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -176,7 +177,7 @@ impl Memtable for PartitionTreeMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Vec<MemtableRange> {
) -> BTreeMap<usize, MemtableRange> {
let projection = projection.map(|ids| ids.to_vec());
let builder = Box::new(PartitionTreeIterBuilder {
tree: self.tree.clone(),
Expand All @@ -185,7 +186,7 @@ impl Memtable for PartitionTreeMemtable {
});
let context = Arc::new(MemtableRangeContext::new(self.id, builder));

vec![MemtableRange::new(context)]
[(0, MemtableRange::new(context))].into()
}

fn is_empty(&self) -> bool {
Expand All @@ -207,6 +208,7 @@ impl Memtable for PartitionTreeMemtable {
estimated_bytes,
time_range: None,
num_rows: 0,
num_ranges: 0,
};
}

Expand All @@ -225,6 +227,7 @@ impl Memtable for PartitionTreeMemtable {
estimated_bytes,
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
num_ranges: 1,
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl Memtable for TimeSeriesMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Vec<MemtableRange> {
) -> BTreeMap<usize, MemtableRange> {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
Expand All @@ -305,7 +305,7 @@ impl Memtable for TimeSeriesMemtable {
});
let context = Arc::new(MemtableRangeContext::new(self.id, builder));

vec![MemtableRange::new(context)]
[(0, MemtableRange::new(context))].into()
}

fn is_empty(&self) -> bool {
Expand All @@ -327,6 +327,7 @@ impl Memtable for TimeSeriesMemtable {
estimated_bytes,
time_range: None,
num_rows: 0,
num_ranges: 0,
};
}
let ts_type = self
Expand All @@ -343,6 +344,7 @@ impl Memtable for TimeSeriesMemtable {
estimated_bytes,
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
num_ranges: 1,
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod last_row;
pub mod merge;
pub mod projection;
pub(crate) mod prune;
pub(crate) mod range;
pub(crate) mod scan_region;
pub(crate) mod seq_scan;
pub(crate) mod unordered_scan;
Expand Down Expand Up @@ -753,6 +754,10 @@ pub(crate) struct ScannerMetrics {
num_batches: usize,
/// Number of rows returned.
num_rows: usize,
/// Number of mem ranges scanned.
num_mem_ranges: usize,
/// Number of file ranges scanned.
num_file_ranges: usize,
/// Filter related metrics for readers.
filter_metrics: ReaderFilterMetrics,
}
Expand Down
1 change: 0 additions & 1 deletion src/mito2/src/read/dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ pub(crate) struct LastNonNull {

impl LastNonNull {
/// Creates a new strategy with the given `filter_deleted` flag.
#[allow(dead_code)]
pub(crate) fn new(filter_deleted: bool) -> Self {
Self {
buffer: None,
Expand Down
Loading

0 comments on commit cd55202

Please sign in to comment.