Skip to content

Commit

Permalink
feat(refill): fetch whole sst file when refilling (#12265)
Browse files — browse the repository at this point in the history
Signed-off-by: MrCroxx <mrcroxx@outlook.com>
  • Loading branch information
MrCroxx authored and Li0k committed Sep 15, 2023
1 parent b320ddf commit 3aa1a47
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 125 deletions.
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

24 changes: 6 additions & 18 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -2262,16 +2262,12 @@ def section_hummock_tiered_cache(outer_panels):
"",
[
panels.target(
f"sum(rate({metric('data_refill_duration_count')}[$__rate_interval])) by (op, instance)",
"data file cache refill - {{op}} @ {{instance}}",
f"sum(rate({metric('refill_duration_count')}[$__rate_interval])) by (type, op, instance)",
"{{type}} file cache refill - {{op}} @ {{instance}}",
),
panels.target(
f"sum(rate({metric('data_refill_filtered_total')}[$__rate_interval])) by (instance)",
"data file cache refill - filtered @ {{instance}}",
),
panels.target(
f"sum(rate({metric('meta_refill_duration_count')}[$__rate_interval])) by (op, instance)",
"meta file cache refill - {{op}} @ {{instance}}",
f"sum(rate({metric('refill_total')}[$__rate_interval])) by (type, op, instance)",
"{{type}} file cache refill - {{op}} @ {{instance}}",
),
],
),
Expand All @@ -2281,17 +2277,9 @@ def section_hummock_tiered_cache(outer_panels):
[
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('data_refill_duration_bucket')}[$__rate_interval])) by (le, op, instance))",
f"p{legend} - " +
"data file cache refill - {{op}} @ {{instance}}",
),
[50, 99, "max"],
),
*quantile(
lambda quantile, legend: panels.target(
f"histogram_quantile({quantile}, sum(rate({metric('meta_refill_duration_bucket')}[$__rate_interval])) by (le, instance))",
f"histogram_quantile({quantile}, sum(rate({metric('refill_duration_bucket')}[$__rate_interval])) by (le, type, op, instance))",
f"p{legend} - " +
"meta cache refill @ {{instance}}",
"{{type}} file cache refill - {{op}} @ {{instance}}",
),
[50, 99, "max"],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,7 @@ pub mod default {
}

pub fn concurrency() -> usize {
100
10
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ reclaim_rate_limit_mb = 0
[storage.cache_refill]
data_refill_levels = []
timeout_ms = 6000
concurrency = 100
concurrency = 10

[system]
barrier_interval_ms = 1000
Expand Down
2 changes: 1 addition & 1 deletion src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dyn-clone = "1.0.13"
either = "1"
enum-as-inner = "0.6"
fail = "0.5"
foyer = { git = "https://github.com/mrcroxx/foyer", rev = "2c6f080" }
foyer = { git = "https://github.com/mrcroxx/foyer", rev = "41b1d39" }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
hex = "0.4"
Expand Down
135 changes: 68 additions & 67 deletions src/storage/src/hummock/event_handler/refiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@ use std::time::{Duration, Instant};
use futures::future::{join_all, try_join_all};
use futures::{Future, FutureExt};
use itertools::Itertools;
use prometheus::core::{AtomicU64, GenericCounter};
use prometheus::core::{AtomicU64, GenericCounter, GenericCounterVec};
use prometheus::{
register_histogram_vec_with_registry, register_histogram_with_registry,
register_int_counter_with_registry, register_int_gauge_with_registry, Histogram, HistogramVec,
IntGauge, Registry,
register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_with_registry, Histogram, HistogramVec, IntGauge, Registry,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo;
Expand All @@ -41,46 +40,52 @@ pub static GLOBAL_CACHE_REFILL_METRICS: LazyLock<CacheRefillMetrics> =
LazyLock::new(|| CacheRefillMetrics::new(&GLOBAL_METRICS_REGISTRY));

pub struct CacheRefillMetrics {
pub data_refill_duration: HistogramVec,
pub refill_duration: HistogramVec,
pub refill_total: GenericCounterVec<AtomicU64>,

pub data_refill_duration_admitted: Histogram,
pub data_refill_duration_rejected: Histogram,
pub data_refill_filtered_total: GenericCounter<AtomicU64>,
pub data_refill_success_duration: Histogram,
pub meta_refill_success_duration: Histogram,

pub meta_refill_duration: Histogram,
pub data_refill_filtered_total: GenericCounter<AtomicU64>,
pub data_refill_attempts_total: GenericCounter<AtomicU64>,
pub meta_refill_attempts_total: GenericCounter<AtomicU64>,

pub refill_queue_total: IntGauge,
}

impl CacheRefillMetrics {
pub fn new(registry: &Registry) -> Self {
let data_refill_duration = register_histogram_vec_with_registry!(
"data_refill_duration",
"data refill duration",
&["op"],
let refill_duration = register_histogram_vec_with_registry!(
"refill_duration",
"refill duration",
&["type", "op"],
registry,
)
.unwrap();
let data_refill_duration_admitted = data_refill_duration
.get_metric_with_label_values(&["admitted"])
.unwrap();
let data_refill_duration_rejected = data_refill_duration
.get_metric_with_label_values(&["rejected"])
.unwrap();

let data_refill_filtered_total = register_int_counter_with_registry!(
"data_refill_filtered_total",
"data refill filtered total",
let refill_total = register_int_counter_vec_with_registry!(
"refill_total",
"refill total",
&["type", "op"],
registry,
)
.unwrap();

let meta_refill_duration = register_histogram_with_registry!(
"meta_refill_duration",
"meta refill duration",
registry,
)
.unwrap();
let data_refill_success_duration = refill_duration
.get_metric_with_label_values(&["data", "success"])
.unwrap();
let meta_refill_success_duration = refill_duration
.get_metric_with_label_values(&["meta", "success"])
.unwrap();

let data_refill_filtered_total = refill_total
.get_metric_with_label_values(&["data", "filtered"])
.unwrap();
let data_refill_attempts_total = refill_total
.get_metric_with_label_values(&["data", "attempts"])
.unwrap();
let meta_refill_attempts_total = refill_total
.get_metric_with_label_values(&["meta", "attempts"])
.unwrap();

let refill_queue_total = register_int_gauge_with_registry!(
"refill_queue_total",
Expand All @@ -90,11 +95,14 @@ impl CacheRefillMetrics {
.unwrap();

Self {
data_refill_duration,
data_refill_duration_admitted,
data_refill_duration_rejected,
refill_duration,
refill_total,

data_refill_success_duration,
meta_refill_success_duration,
data_refill_filtered_total,
meta_refill_duration,
data_refill_attempts_total,
meta_refill_attempts_total,
refill_queue_total,
}
}
Expand Down Expand Up @@ -233,10 +241,14 @@ impl CacheRefillTask {
.insert_sst_infos
.iter()
.map(|info| async {
let _timer = GLOBAL_CACHE_REFILL_METRICS
.meta_refill_duration
.start_timer();
context.sstable_store.sstable_syncable(info, &stats).await
GLOBAL_CACHE_REFILL_METRICS.meta_refill_attempts_total.inc();

let now = Instant::now();
let res = context.sstable_store.sstable_syncable(info, &stats).await;
GLOBAL_CACHE_REFILL_METRICS
.meta_refill_success_duration
.observe(now.elapsed().as_secs_f64());
res
})
.collect_vec();
let res = try_join_all(tasks).await?;
Expand Down Expand Up @@ -271,42 +283,31 @@ impl CacheRefillTask {
.iter()
.any(|id| filter.contains(id))
{
let blocks = holders
.iter()
.map(|meta| meta.value().block_count() as u64)
.sum();
GLOBAL_CACHE_REFILL_METRICS
.data_refill_filtered_total
.inc_by(blocks);
GLOBAL_CACHE_REFILL_METRICS.data_refill_filtered_total.inc();
return;
}

let mut tasks = vec![];
for sst_info in &holders {
for block_index in 0..sst_info.value().block_count() {
let meta = sst_info.value();
let mut stat = StoreLocalStatistic::default();
let task = async move {
let permit = context.concurrency.acquire().await.unwrap();
match context
.sstable_store
.may_fill_data_file_cache(meta, block_index, &mut stat)
.await
{
Ok(true) => GLOBAL_CACHE_REFILL_METRICS
.data_refill_duration_admitted
.observe(now.elapsed().as_secs_f64()),
Ok(false) => GLOBAL_CACHE_REFILL_METRICS
.data_refill_duration_rejected
.observe(now.elapsed().as_secs_f64()),
Err(e) => {
tracing::warn!("data cache refill error: {:?}", e);
}
let task = async move {
let permit = context.concurrency.acquire().await.unwrap();

GLOBAL_CACHE_REFILL_METRICS.data_refill_attempts_total.inc();
match context
.sstable_store
.fill_data_file_cache(sst_info.value())
.await
{
Ok(()) => GLOBAL_CACHE_REFILL_METRICS
.data_refill_success_duration
.observe(now.elapsed().as_secs_f64()),
Err(e) => {
tracing::warn!("data cache refill error: {:?}", e);
}
drop(permit);
};
tasks.push(task);
}
}
drop(permit);
};
tasks.push(task);
}

join_all(tasks).await;
Expand Down
15 changes: 15 additions & 0 deletions src/storage/src/hummock/file_cache/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,21 @@ where
}
}

/// Force-inserts `key`/`value` into the file cache, bypassing the admission
/// judgement (contrast with `insert_with` below, which only fetches the value
/// if the judge passes).
///
/// Returns `Ok(false)` when the cache is disabled (`FileCache::None`);
/// otherwise forwards the result of the underlying foyer store's
/// `insert_force`, executed on the cache's dedicated runtime.
///
/// NOTE(review): the `.unwrap()` below converts a `JoinError` (spawned task
/// cancelled or panicked) into a panic of the caller — confirm that is
/// intended rather than surfacing it as a `FileCacheError`.
#[tracing::instrument(skip(self, value))]
pub async fn insert_force(&self, key: K, value: V) -> Result<bool> {
match self {
// Cache disabled: report "not inserted" without an error.
FileCache::None => Ok(false),
FileCache::FoyerRuntime { runtime, store, .. } => {
let store = store.clone();
// Spawn onto the cache's own runtime — presumably to keep
// file-cache I/O off the caller's executor; verify against the
// other `FoyerRuntime` methods in this impl.
runtime
.spawn(async move { store.insert_force(key, value).await })
.await
.unwrap()
.map_err(FileCacheError::foyer)
}
}
}

/// only fetch value if judge pass
#[tracing::instrument(skip(self, fetch_value))]
pub async fn insert_with<F, FU>(
Expand Down
Loading

0 comments on commit 3aa1a47

Please sign in to comment.