feat: skip caching uncompressed pages if they are large #4705

Merged · 23 commits · Sep 10, 2024

Changes from 17 commits
73 changes: 54 additions & 19 deletions src/mito2/src/cache.rs
@@ -25,6 +25,7 @@ pub(crate) mod write_cache;
use std::mem;
use std::sync::Arc;

use bytes::Bytes;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use moka::notification::RemovalCause;
@@ -395,15 +396,29 @@ impl SstMetaKey {

/// Cache key for pages of a SST row group.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageKey {
/// Region id of the SST file to cache.
pub region_id: RegionId,
/// Id of the SST file to cache.
pub file_id: FileId,
/// Index of the row group.
pub row_group_idx: usize,
/// Index of the column in the row group.
pub column_idx: usize,
pub enum PageKey {
/// Cache key for a compressed byte range in a row group.
Compressed {
/// Region id of the SST file to cache.
region_id: RegionId,
/// Id of the SST file to cache.
file_id: FileId,
/// Index of the row group.
row_group_idx: usize,
/// Index of the column in the row group.
column_idx: usize,
},
/// Cache key for all uncompressed pages in a row group.
RowGroup {
/// Region id of the SST file to cache.
region_id: RegionId,
/// Id of the SST file to cache.
file_id: FileId,
/// Index of the row group.
row_group_idx: usize,
/// Index of the column in the row group.
column_idx: usize,
},
}

impl PageKey {
@@ -414,21 +429,41 @@ impl PageKey {
}

/// Cached row group pages for a column.
// We don't use an enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
/// Compressed page of the column in the row group.
pub compressed: Bytes,
/// All pages of the column in the row group.
pub pages: Vec<Page>,
pub row_group: Vec<Page>,
}

impl PageValue {
/// Creates a new page value.
pub fn new(pages: Vec<Page>) -> PageValue {
PageValue { pages }
/// Creates a new value from a compressed page.
pub fn new_compressed(bytes: Bytes) -> PageValue {
PageValue {
compressed: bytes,
row_group: vec![],
}
}

/// Creates a new value from all pages in a row group.
pub fn new_row_group(pages: Vec<Page>) -> PageValue {
PageValue {
compressed: Bytes::new(),
row_group: pages,
}
}

/// Returns memory used by the value (estimated).
fn estimated_size(&self) -> usize {
// We only consider heap size of all pages.
self.pages.iter().map(|page| page.buffer().len()).sum()
mem::size_of::<Self>()
+ self.compressed.len()
+ self
.row_group
.iter()
.map(|page| page.buffer().len())
.sum::<usize>()
}
}
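For context, a minimal sketch (not part of this diff) of how a caller could pick between the two variants: cache the uncompressed pages when the row group column is small, and keep only the compressed bytes when it is large. The threshold constant and the surrounding function are hypothetical; PageKey, PageValue, and put_pages come from this change, and the sketch assumes cache.rs's imports (Arc, Bytes, Page, RegionId, FileId).

// Hypothetical call site; the threshold name and function are illustrative only.
const MAX_UNCOMPRESSED_CACHE_SIZE: usize = 8 * 1024 * 1024;

fn cache_column_pages(
    cache: &CacheManager,
    region_id: RegionId,
    file_id: FileId,
    row_group_idx: usize,
    column_idx: usize,
    compressed: Bytes,
    pages: Vec<Page>,
) {
    let uncompressed: usize = pages.iter().map(|page| page.buffer().len()).sum();
    if uncompressed <= MAX_UNCOMPRESSED_CACHE_SIZE {
        // Small row group: cache the decoded pages so later reads skip decompression.
        let key = PageKey::RowGroup { region_id, file_id, row_group_idx, column_idx };
        cache.put_pages(key, Arc::new(PageValue::new_row_group(pages)));
    } else {
        // Large row group: keep only the compressed bytes to bound cache memory.
        let key = PageKey::Compressed { region_id, file_id, row_group_idx, column_idx };
        cache.put_pages(key, Arc::new(PageValue::new_compressed(compressed)));
    }
}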

@@ -507,13 +542,13 @@ mod tests {
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
.is_none());

let key = PageKey {
let key = PageKey::RowGroup {
region_id,
file_id,
row_group_idx: 0,
column_idx: 0,
};
let pages = Arc::new(PageValue::new(Vec::new()));
let pages = Arc::new(PageValue::default());
cache.put_pages(key.clone(), pages);
assert!(cache.get_pages(&key).is_none());

@@ -562,14 +597,14 @@ mod tests {
let cache = CacheManager::builder().page_cache_size(1000).build();
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
let key = PageKey {
let key = PageKey::Compressed {
region_id,
file_id,
row_group_idx: 0,
column_idx: 0,
};
assert!(cache.get_pages(&key).is_none());
let pages = Arc::new(PageValue::new(Vec::new()));
let pages = Arc::new(PageValue::default());
cache.put_pages(key.clone(), pages);
assert!(cache.get_pages(&key).is_some());
}
1 change: 1 addition & 0 deletions src/mito2/src/metrics.rs
@@ -133,6 +133,7 @@ lazy_static! {
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
)
.unwrap();
pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]);
/// Counter of rows read from different source.
pub static ref READ_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap();
3 changes: 3 additions & 0 deletions src/mito2/src/read/merge.rs
@@ -89,6 +89,9 @@ impl Drop for MergeReader {
READ_STAGE_ELAPSED
.with_label_values(&["merge"])
.observe(self.metrics.scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["merge_fetch"])
.observe(self.metrics.fetch_cost.as_secs_f64());
}
}

39 changes: 30 additions & 9 deletions src/mito2/src/read/scan_region.rs
@@ -731,14 +731,28 @@ impl ScanInput {
}

for file in &self.files {
let range = PartitionRange {
start: file.meta_ref().time_range.0,
end: file.meta_ref().time_range.1,
num_rows: file.meta_ref().num_rows as usize,
identifier: id,
};
id += 1;
container.push(range);
if self.append_mode {
// For append mode, we can parallelize reading row groups.
for _ in 0..file.meta_ref().num_row_groups {
let range = PartitionRange {
start: file.time_range().0,
end: file.time_range().1,
num_rows: file.num_rows(),
identifier: id,
};
id += 1;
container.push(range);
}
} else {
let range = PartitionRange {
start: file.meta_ref().time_range.0,
end: file.meta_ref().time_range.1,
num_rows: file.meta_ref().num_rows as usize,
identifier: id,
};
id += 1;
container.push(range);
}
}

container
@@ -887,10 +901,16 @@ impl ScanPartList {
})
}

// TODO(yingwen): Also add a method to return num files.
/// Returns the number of file ranges.
pub(crate) fn num_file_ranges(&self) -> usize {
self.0.as_ref().map_or(0, |parts| {
parts.iter().map(|part| part.file_ranges.len()).sum()
parts
.iter()
.map(|part| part.file_ranges.iter())
.flatten()
.map(|ranges| ranges.len())
.sum()
})
}
}
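The new counting logic suggests that each part's file_ranges now groups ranges per file (roughly a Vec<Vec<FileRange>>), so the total is the sum of the inner lengths rather than the number of outer entries. A tiny illustration with made-up numbers, assuming that nested shape:

// Two files with 3 and 2 ranges respectively; counting outer entries alone would report 2.
let file_ranges: Vec<Vec<u32>> = vec![vec![0, 1, 2], vec![3, 4]];
let total: usize = file_ranges.iter().map(|ranges| ranges.len()).sum();
assert_eq!(total, 5);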
@@ -931,6 +951,7 @@ impl StreamContext {
) -> fmt::Result {
match self.parts.try_lock() {
Ok(inner) => match t {
// TODO(yingwen): Print num files and ranges.
DisplayFormatType::Default => write!(
f,
"partition_count={} ({} memtable ranges, {} file ranges)",
19 changes: 11 additions & 8 deletions src/mito2/src/read/seq_scan.rs
@@ -166,8 +166,8 @@ impl SeqScan {
reader_metrics.merge_from(reader.metrics());
}
debug!(
"Seq scan region {}, file {}, {} ranges finished, metrics: {:?}",
region_id, file_id, range_num, reader_metrics
"Seq scan region {}, file {}, {} ranges finished, metrics: {:?}, compaction: {}",
region_id, file_id, range_num, reader_metrics, compaction
);
// Reports metrics.
reader_metrics.observe_rows(read_type);
@@ -238,11 +238,12 @@ impl SeqScan {
let maybe_reader = Self::build_reader_from_sources(stream_ctx, sources, semaphore).await;
let build_reader_cost = build_start.elapsed();
metrics.build_reader_cost += build_reader_cost;
common_telemetry::debug!(
"Build reader region: {}, range_id: {}, from sources, build_reader_cost: {:?}",
debug!(
"Build reader region: {}, range_id: {}, from sources, build_reader_cost: {:?}, compaction: {}",
stream_ctx.input.mapper.metadata().region_id,
range_id,
build_reader_cost
build_reader_cost,
compaction,
);

maybe_reader
@@ -354,11 +355,12 @@ impl SeqScan {
metrics.observe_metrics_on_finish();

debug!(
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}",
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}, compaction: {}",
stream_ctx.input.mapper.metadata().region_id,
partition,
metrics,
first_poll,
compaction,
);
}
};
@@ -450,13 +452,14 @@ impl SeqScan {
metrics.total_cost = stream_ctx.query_start.elapsed();
metrics.observe_metrics_on_finish();

common_telemetry::debug!(
"Seq scan finished, region_id: {}, partition: {}, id: {}, metrics: {:?}, first_poll: {:?}",
debug!(
"Seq scan finished, region_id: {}, partition: {}, id: {}, metrics: {:?}, first_poll: {:?}, compaction: {}",
stream_ctx.input.mapper.metadata().region_id,
partition,
id,
metrics,
first_poll,
compaction,
);
}
};
22 changes: 11 additions & 11 deletions src/mito2/src/read/unordered_scan.rs
@@ -152,7 +152,6 @@ impl RegionScanner for UnorderedScan {
let parallelism = self.properties.num_partitions();
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();

let part = {
let mut parts = stream_ctx.parts.lock().await;
maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism)
@@ -180,6 +179,7 @@
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
metrics.build_reader_cost = build_reader_start.elapsed();

let query_start = stream_ctx.query_start;
let cache = stream_ctx.input.cache_manager.as_deref();
// Scans memtables first.
@@ -217,8 +217,8 @@
metrics.total_cost = query_start.elapsed();
metrics.observe_metrics_on_finish();
debug!(
"Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}",
partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll,
"Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}, ranges: {}",
partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll, part.file_ranges[0].len(),
);
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
@@ -343,14 +343,14 @@ impl UnorderedDistributor {

let mems_per_part = ((self.mem_ranges.len() + parallelism - 1) / parallelism).max(1);
let ranges_per_part = ((self.file_ranges.len() + parallelism - 1) / parallelism).max(1);
common_telemetry::debug!(
"Parallel scan is enabled, parallelism: {}, {} mem_ranges, {} file_ranges, mems_per_part: {}, ranges_per_part: {}",
parallelism,
self.mem_ranges.len(),
self.file_ranges.len(),
mems_per_part,
ranges_per_part
);
debug!(
"Parallel scan is enabled, parallelism: {}, {} mem_ranges, {} file_ranges, mems_per_part: {}, ranges_per_part: {}",
parallelism,
self.mem_ranges.len(),
self.file_ranges.len(),
mems_per_part,
ranges_per_part
);
let mut scan_parts = self
.mem_ranges
.chunks(mems_per_part)
13 changes: 11 additions & 2 deletions src/mito2/src/sst/parquet.rs
@@ -216,17 +216,26 @@ mod tests {
.await;
}

// Doesn't have compressed page cached.
let page_key = PageKey::Compressed {
region_id: metadata.region_id,
file_id: handle.file_id(),
row_group_idx: 0,
column_idx: 0,
};
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());

// Cache 4 row groups.
for i in 0..4 {
let page_key = PageKey {
let page_key = PageKey::RowGroup {
region_id: metadata.region_id,
file_id: handle.file_id(),
row_group_idx: i,
column_idx: 0,
};
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some());
}
let page_key = PageKey {
let page_key = PageKey::RowGroup {
region_id: metadata.region_id,
file_id: handle.file_id(),
row_group_idx: 5,
13 changes: 6 additions & 7 deletions src/mito2/src/sst/parquet/page_reader.rs
@@ -19,22 +19,22 @@ use std::collections::VecDeque;
use parquet::column::page::{Page, PageMetadata, PageReader};
use parquet::errors::Result;

/// A reader that reads from cached pages.
pub(crate) struct CachedPageReader {
/// A reader that reads all pages from a cache.
pub(crate) struct RowGroupCachedReader {
/// Cached pages.
pages: VecDeque<Page>,
}

impl CachedPageReader {
/// Returns a new reader from existing pages.
impl RowGroupCachedReader {
/// Returns a new reader from a cache.
pub(crate) fn new(pages: &[Page]) -> Self {
Self {
pages: pages.iter().cloned().collect(),
}
}
}

impl PageReader for CachedPageReader {
impl PageReader for RowGroupCachedReader {
fn get_next_page(&mut self) -> Result<Option<Page>> {
Ok(self.pages.pop_front())
}
@@ -55,9 +55,8 @@ impl PageReader for CachedPageReader {
}
}

impl Iterator for CachedPageReader {
impl Iterator for RowGroupCachedReader {
type Item = Result<Page>;

fn next(&mut self) -> Option<Self::Item> {
self.get_next_page().transpose()
}
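A minimal sketch (assumptions noted) of how the renamed reader might be wired to a cache hit: get_pages, the row_group field, and RowGroupCachedReader::new come from this change, while the lookup helper itself is illustrative and assumes the CacheManager and PageKey types from cache.rs plus the parquet imports already used in page_reader.rs.

// Hypothetical helper: serve a column from cached uncompressed pages when available.
fn cached_reader_for_column(
    cache: &CacheManager,
    key: &PageKey,
) -> Option<Box<dyn PageReader>> {
    let value = cache.get_pages(key)?;
    if value.row_group.is_empty() {
        // Only compressed bytes (or nothing) are cached; the caller must decode instead.
        return None;
    }
    Some(Box::new(RowGroupCachedReader::new(&value.row_group)))
}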