Skip to content

Commit

Permalink
feat: skip caching uncompressed pages if they are large (#4705)
Browse files Browse the repository at this point in the history
* feat: cache each uncompressed page

* chore: remove unused function

* chore: log

* chore: log

* chore: row group pages cache kv

* feat: also support row group level cache

* chore: fix range count

* feat: don't cache compressed page for row group cache

* feat: use function to get part

* chore: log whether scan is from compaction

* chore: avoid get column

* feat: add timer metrics

* chore: Revert "feat: add timer metrics"

This reverts commit 4618f57.

* feat: don't cache individual uncompressed page

* feat: append in row group level under append mode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: fetch pages cost

* perf: yield

* Update src/mito2/src/sst/parquet/row_group.rs

* refactor: cache key

* feat: print file num and row groups num in explain

* test: update sqlness test

* chore: Update src/mito2/src/sst/parquet/page_reader.rs

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
evenyag and waynexia authored Sep 10, 2024
1 parent 04de3ed commit 3e17c09
Show file tree
Hide file tree
Showing 15 changed files with 248 additions and 128 deletions.
102 changes: 76 additions & 26 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub(crate) mod write_cache;
use std::mem;
use std::sync::Arc;

use bytes::Bytes;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use moka::notification::RemovalCause;
Expand Down Expand Up @@ -393,42 +394,101 @@ impl SstMetaKey {
}
}

/// Cache key for pages of a SST row group.
/// Path to column pages in the SST file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageKey {
pub struct ColumnPagePath {
/// Region id of the SST file to cache.
pub region_id: RegionId,
region_id: RegionId,
/// Id of the SST file to cache.
pub file_id: FileId,
file_id: FileId,
/// Index of the row group.
pub row_group_idx: usize,
row_group_idx: usize,
/// Index of the column in the row group.
pub column_idx: usize,
column_idx: usize,
}

/// Cache key for pages of a SST row group.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PageKey {
/// Cache key for a compressed page in a row group.
Compressed(ColumnPagePath),
/// Cache key for all uncompressed pages in a row group.
Uncompressed(ColumnPagePath),
}

impl PageKey {
/// Creates a key for a compressed page.
pub fn new_compressed(
region_id: RegionId,
file_id: FileId,
row_group_idx: usize,
column_idx: usize,
) -> PageKey {
PageKey::Compressed(ColumnPagePath {
region_id,
file_id,
row_group_idx,
column_idx,
})
}

/// Creates a key for all uncompressed pages in a row group.
pub fn new_uncompressed(
region_id: RegionId,
file_id: FileId,
row_group_idx: usize,
column_idx: usize,
) -> PageKey {
PageKey::Uncompressed(ColumnPagePath {
region_id,
file_id,
row_group_idx,
column_idx,
})
}

/// Returns memory used by the key (estimated).
fn estimated_size(&self) -> usize {
mem::size_of::<Self>()
}
}

/// Cached row group pages for a column.
// We don't use enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
/// Compressed page of the column in the row group.
pub compressed: Bytes,
/// All pages of the column in the row group.
pub pages: Vec<Page>,
pub row_group: Vec<Page>,
}

impl PageValue {
/// Creates a new page value.
pub fn new(pages: Vec<Page>) -> PageValue {
PageValue { pages }
/// Creates a new value from a compressed page.
pub fn new_compressed(bytes: Bytes) -> PageValue {
PageValue {
compressed: bytes,
row_group: vec![],
}
}

/// Creates a new value from all pages in a row group.
pub fn new_row_group(pages: Vec<Page>) -> PageValue {
PageValue {
compressed: Bytes::new(),
row_group: pages,
}
}

/// Returns memory used by the value (estimated).
fn estimated_size(&self) -> usize {
// We only consider heap size of all pages.
self.pages.iter().map(|page| page.buffer().len()).sum()
mem::size_of::<Self>()
+ self.compressed.len()
+ self
.row_group
.iter()
.map(|page| page.buffer().len())
.sum::<usize>()
}
}

Expand Down Expand Up @@ -507,13 +567,8 @@ mod tests {
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
.is_none());

let key = PageKey {
region_id,
file_id,
row_group_idx: 0,
column_idx: 0,
};
let pages = Arc::new(PageValue::new(Vec::new()));
let key = PageKey::new_uncompressed(region_id, file_id, 0, 0);
let pages = Arc::new(PageValue::default());
cache.put_pages(key.clone(), pages);
assert!(cache.get_pages(&key).is_none());

Expand Down Expand Up @@ -562,14 +617,9 @@ mod tests {
let cache = CacheManager::builder().page_cache_size(1000).build();
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
let key = PageKey {
region_id,
file_id,
row_group_idx: 0,
column_idx: 0,
};
let key = PageKey::new_compressed(region_id, file_id, 0, 0);
assert!(cache.get_pages(&key).is_none());
let pages = Arc::new(PageValue::new(Vec::new()));
let pages = Arc::new(PageValue::default());
cache.put_pages(key.clone(), pages);
assert!(cache.get_pages(&key).is_some());
}
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ lazy_static! {
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
)
.unwrap();
pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]);
/// Counter of rows read from different source.
pub static ref READ_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap();
Expand Down
3 changes: 3 additions & 0 deletions src/mito2/src/read/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ impl Drop for MergeReader {
READ_STAGE_ELAPSED
.with_label_values(&["merge"])
.observe(self.metrics.scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["merge_fetch"])
.observe(self.metrics.fetch_cost.as_secs_f64());
}
}

Expand Down
16 changes: 14 additions & 2 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -901,10 +901,21 @@ impl ScanPartList {
})
}

/// Returns the number of files.
pub(crate) fn num_files(&self) -> usize {
self.0.as_ref().map_or(0, |parts| {
parts.iter().map(|part| part.file_ranges.len()).sum()
})
}

/// Returns the number of file ranges.
pub(crate) fn num_file_ranges(&self) -> usize {
self.0.as_ref().map_or(0, |parts| {
parts.iter().map(|part| part.file_ranges.len()).sum()
parts
.iter()
.flat_map(|part| part.file_ranges.iter())
.map(|ranges| ranges.len())
.sum()
})
}
}
Expand Down Expand Up @@ -947,9 +958,10 @@ impl StreamContext {
Ok(inner) => match t {
DisplayFormatType::Default => write!(
f,
"partition_count={} ({} memtable ranges, {} file ranges)",
"partition_count={} ({} memtable ranges, {} file {} ranges)",
inner.0.len(),
inner.0.num_mem_ranges(),
inner.0.num_files(),
inner.0.num_file_ranges()
)?,
DisplayFormatType::Verbose => write!(f, "{:?}", inner.0)?,
Expand Down
19 changes: 11 additions & 8 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ impl SeqScan {
reader_metrics.merge_from(reader.metrics());
}
debug!(
"Seq scan region {}, file {}, {} ranges finished, metrics: {:?}",
region_id, file_id, range_num, reader_metrics
"Seq scan region {}, file {}, {} ranges finished, metrics: {:?}, compaction: {}",
region_id, file_id, range_num, reader_metrics, compaction
);
// Reports metrics.
reader_metrics.observe_rows(read_type);
Expand Down Expand Up @@ -238,11 +238,12 @@ impl SeqScan {
let maybe_reader = Self::build_reader_from_sources(stream_ctx, sources, semaphore).await;
let build_reader_cost = build_start.elapsed();
metrics.build_reader_cost += build_reader_cost;
common_telemetry::debug!(
"Build reader region: {}, range_id: {}, from sources, build_reader_cost: {:?}",
debug!(
"Build reader region: {}, range_id: {}, from sources, build_reader_cost: {:?}, compaction: {}",
stream_ctx.input.mapper.metadata().region_id,
range_id,
build_reader_cost
build_reader_cost,
compaction,
);

maybe_reader
Expand Down Expand Up @@ -354,11 +355,12 @@ impl SeqScan {
metrics.observe_metrics_on_finish();

debug!(
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}",
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}, compaction: {}",
stream_ctx.input.mapper.metadata().region_id,
partition,
metrics,
first_poll,
compaction,
);
}
};
Expand Down Expand Up @@ -450,13 +452,14 @@ impl SeqScan {
metrics.total_cost = stream_ctx.query_start.elapsed();
metrics.observe_metrics_on_finish();

common_telemetry::debug!(
"Seq scan finished, region_id: {}, partition: {}, id: {}, metrics: {:?}, first_poll: {:?}",
debug!(
"Seq scan finished, region_id: {}, partition: {}, id: {}, metrics: {:?}, first_poll: {:?}, compaction: {}",
stream_ctx.input.mapper.metadata().region_id,
partition,
id,
metrics,
first_poll,
compaction,
);
}
};
Expand Down
22 changes: 11 additions & 11 deletions src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ impl RegionScanner for UnorderedScan {
let parallelism = self.properties.num_partitions();
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();

let part = {
let mut parts = stream_ctx.parts.lock().await;
maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism)
Expand Down Expand Up @@ -180,6 +179,7 @@ impl RegionScanner for UnorderedScan {
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
metrics.build_reader_cost = build_reader_start.elapsed();

let query_start = stream_ctx.query_start;
let cache = stream_ctx.input.cache_manager.as_deref();
// Scans memtables first.
Expand Down Expand Up @@ -217,8 +217,8 @@ impl RegionScanner for UnorderedScan {
metrics.total_cost = query_start.elapsed();
metrics.observe_metrics_on_finish();
debug!(
"Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}",
partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll,
"Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}, ranges: {}",
partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll, part.file_ranges[0].len(),
);
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
Expand Down Expand Up @@ -343,14 +343,14 @@ impl UnorderedDistributor {

let mems_per_part = ((self.mem_ranges.len() + parallelism - 1) / parallelism).max(1);
let ranges_per_part = ((self.file_ranges.len() + parallelism - 1) / parallelism).max(1);
common_telemetry::debug!(
"Parallel scan is enabled, parallelism: {}, {} mem_ranges, {} file_ranges, mems_per_part: {}, ranges_per_part: {}",
parallelism,
self.mem_ranges.len(),
self.file_ranges.len(),
mems_per_part,
ranges_per_part
);
debug!(
"Parallel scan is enabled, parallelism: {}, {} mem_ranges, {} file_ranges, mems_per_part: {}, ranges_per_part: {}",
parallelism,
self.mem_ranges.len(),
self.file_ranges.len(),
mems_per_part,
ranges_per_part
);
let mut scan_parts = self
.mem_ranges
.chunks(mems_per_part)
Expand Down
18 changes: 6 additions & 12 deletions src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,22 +216,16 @@ mod tests {
.await;
}

// Doesn't have compressed page cached.
let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());

// Cache 4 row groups.
for i in 0..4 {
let page_key = PageKey {
region_id: metadata.region_id,
file_id: handle.file_id(),
row_group_idx: i,
column_idx: 0,
};
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some());
}
let page_key = PageKey {
region_id: metadata.region_id,
file_id: handle.file_id(),
row_group_idx: 5,
column_idx: 0,
};
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
}

Expand Down
13 changes: 6 additions & 7 deletions src/mito2/src/sst/parquet/page_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@ use std::collections::VecDeque;
use parquet::column::page::{Page, PageMetadata, PageReader};
use parquet::errors::Result;

/// A reader that reads from cached pages.
pub(crate) struct CachedPageReader {
/// A reader that reads all pages from a cache.
pub(crate) struct RowGroupCachedReader {
/// Cached pages.
pages: VecDeque<Page>,
}

impl CachedPageReader {
/// Returns a new reader from existing pages.
impl RowGroupCachedReader {
/// Returns a new reader from pages of a column in a row group.
pub(crate) fn new(pages: &[Page]) -> Self {
Self {
pages: pages.iter().cloned().collect(),
}
}
}

impl PageReader for CachedPageReader {
impl PageReader for RowGroupCachedReader {
fn get_next_page(&mut self) -> Result<Option<Page>> {
Ok(self.pages.pop_front())
}
Expand All @@ -55,9 +55,8 @@ impl PageReader for CachedPageReader {
}
}

impl Iterator for CachedPageReader {
impl Iterator for RowGroupCachedReader {
type Item = Result<Page>;

fn next(&mut self) -> Option<Self::Item> {
self.get_next_page().transpose()
}
Expand Down
Loading

0 comments on commit 3e17c09

Please sign in to comment.