Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: lazily create guarded metrics when first using it #17812

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/batch/src/executor/log_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Deref;
use std::sync::Arc;

use futures::prelude::stream::StreamExt;
Expand Down Expand Up @@ -168,7 +167,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
old_epoch.clone(),
new_epoch.clone(),
chunk_size,
histogram.clone(),
histogram.as_ref().map(|h| &***h),
Arc::new(schema.clone()),
);
#[for_await]
Expand All @@ -184,7 +183,7 @@ impl<S: StateStore> LogRowSeqScanExecutor<S> {
old_epoch: BatchQueryEpoch,
new_epoch: BatchQueryEpoch,
chunk_size: usize,
histogram: Option<impl Deref<Target = Histogram>>,
histogram: Option<&Histogram>,
schema: Arc<Schema>,
) {
// Range Scan.
Expand Down
14 changes: 9 additions & 5 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,13 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
// Point Get
for point_get in point_gets {
let table = table.clone();
if let Some(row) =
Self::execute_point_get(table, point_get, query_epoch.clone(), histogram.clone())
.await?
if let Some(row) = Self::execute_point_get(
table,
point_get,
query_epoch.clone(),
histogram.as_ref().map(|h| &***h),
)
.await?
{
if let Some(chunk) = data_chunk_builder.append_one_row(row) {
returned += chunk.cardinality() as u64;
Expand Down Expand Up @@ -376,7 +380,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
query_epoch.clone(),
chunk_size,
limit,
histogram.clone(),
histogram.as_ref().map(|h| &***h),
);
#[for_await]
for chunk in stream {
Expand All @@ -396,7 +400,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
table: Arc<StorageTable<S>>,
scan_range: ScanRange,
epoch: BatchQueryEpoch,
histogram: Option<impl Deref<Target = Histogram>>,
histogram: Option<&Histogram>,
) -> Result<Option<OwnedRow>> {
let pk_prefix = scan_range.pk_prefix;
assert!(pk_prefix.len() == table.pk_indices().len());
Expand Down
80 changes: 66 additions & 14 deletions src/common/metrics/src/guarded_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use prometheus::core::{
};
use prometheus::local::{LocalHistogram, LocalIntCounter};
use prometheus::proto::MetricFamily;
use prometheus::{Gauge, Histogram, IntCounter, IntGauge};
use prometheus::Histogram;
use thiserror_ext::AsReport;
use tracing::warn;

Expand Down Expand Up @@ -126,6 +126,8 @@ mod tait {
}
pub use tait::*;

use crate::guarded_metrics::lazy_guarded_metric::{lazy_guarded_metrics, LazyGuardedMetrics};

pub type LabelGuardedHistogramVec<const N: usize> = LabelGuardedMetricVec<VecBuilderOfHistogram, N>;
pub type LabelGuardedIntCounterVec<const N: usize> =
LabelGuardedMetricVec<VecBuilderOfCounter<AtomicU64>, N>;
Expand All @@ -134,10 +136,14 @@ pub type LabelGuardedIntGaugeVec<const N: usize> =
pub type LabelGuardedGaugeVec<const N: usize> =
LabelGuardedMetricVec<VecBuilderOfGauge<AtomicF64>, N>;

pub type LabelGuardedHistogram<const N: usize> = LabelGuardedMetric<Histogram, N>;
pub type LabelGuardedIntCounter<const N: usize> = LabelGuardedMetric<IntCounter, N>;
pub type LabelGuardedIntGauge<const N: usize> = LabelGuardedMetric<IntGauge, N>;
pub type LabelGuardedGauge<const N: usize> = LabelGuardedMetric<Gauge, N>;
pub type LabelGuardedHistogram<const N: usize> =
LabelGuardedMetric<LazyGuardedMetrics<VecBuilderOfHistogram, N>, N>;
pub type LabelGuardedIntCounter<const N: usize> =
LabelGuardedMetric<LazyGuardedMetrics<VecBuilderOfCounter<AtomicU64>, N>, N>;
pub type LabelGuardedIntGauge<const N: usize> =
LabelGuardedMetric<LazyGuardedMetrics<VecBuilderOfGauge<AtomicI64>, N>, N>;
pub type LabelGuardedGauge<const N: usize> =
LabelGuardedMetric<LazyGuardedMetrics<VecBuilderOfGauge<AtomicF64>, N>, N>;

pub type LabelGuardedLocalHistogram<const N: usize> = LabelGuardedMetric<LocalHistogram, N>;
pub type LabelGuardedLocalIntCounter<const N: usize> = LabelGuardedMetric<LocalIntCounter, N>;
Expand Down Expand Up @@ -227,6 +233,45 @@ impl<T: MetricVecBuilder, const N: usize> Collector for LabelGuardedMetricVec<T,
}
}

/// Lazily-materialized per-label metric handles.
///
/// Instead of calling `MetricVec::with_label_values` eagerly, the handle built
/// here stores a closure that performs that call and runs it the first time the
/// handle is dereferenced.
pub(crate) mod lazy_guarded_metric {
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::{Arc, LazyLock};

use prometheus::core::{MetricVec, MetricVecBuilder};

use crate::guarded_metrics::LabelGuard;

/// Shared, lazily-initialized metric of type `T::M`.
///
/// The initializer closure type is left opaque via `impl FnOnce() -> T::M`; the
/// concrete closure is the one constructed in `lazy_guarded_metrics` below.
/// NOTE(review): `impl Trait` inside a type alias presumably relies on the
/// `type_alias_impl_trait` feature being enabled for this crate — confirm.
type GuardedMetricsLazyLock<T: MetricVecBuilder, const N: usize> =
Arc<LazyLock<T::M, impl FnOnce() -> T::M>>;

/// Handle to a metric with `N` labels whose underlying `T::M` is created on
/// first dereference.
///
/// The lazy cell is held behind an `Arc`, so cloning this handle shares the
/// same cell (and therefore the same underlying metric) rather than creating a
/// new one.
#[derive(Clone)]
pub struct LazyGuardedMetrics<T: MetricVecBuilder, const N: usize> {
// Shared lazy cell holding either the pending initializer or the created metric.
inner: GuardedMetricsLazyLock<T, N>,
// Zero-sized marker that mentions `T` directly in the struct.
_phantom: PhantomData<T>,
}

/// Builds a `LazyGuardedMetrics` for `metric_vec`, using the label values stored
/// in `guard`.
///
/// Both arguments are moved into the initializer closure. `with_label_values`
/// is not called here; it is called when the closure runs.
pub(super) fn lazy_guarded_metrics<T: MetricVecBuilder, const N: usize>(
metric_vec: MetricVec<T>,
guard: Arc<LabelGuard<N>>,
) -> LazyGuardedMetrics<T, N> {
LazyGuardedMetrics {
inner: Arc::new(LazyLock::new(move || {
// Borrow each owned label held by the guard as `&str`, producing the
// fixed-size array of label values passed to `with_label_values`.
metric_vec.with_label_values(&guard.labels.each_ref().map(|s| s.as_str()))
})),
_phantom: PhantomData,
}
}

/// Dereferences to the underlying metric `T::M`.
impl<T: MetricVecBuilder, const N: usize> Deref for LazyGuardedMetrics<T, N> {
type Target = T::M;

fn deref(&self) -> &Self::Target {
// `&self.inner` is coerced through `Arc` and then `LazyLock` to `&T::M`;
// dereferencing the `LazyLock` is what runs the initializer on first access.
&self.inner
}
}
}

impl<T: MetricVecBuilder, const N: usize> LabelGuardedMetricVec<T, N> {
pub fn new(inner: MetricVec<T>, labels: &[&'static str; N]) -> Self {
Self {
Expand All @@ -246,16 +291,20 @@ impl<T: MetricVecBuilder, const N: usize> LabelGuardedMetricVec<T, N> {
/// Instead, we should store the returned `LabelGuardedMetric` in a scope with longer
/// lifetime so that the labels can be regarded as being used in its whole life scope.
/// This is also the recommended way to use the raw metrics vec.
pub fn with_guarded_label_values(&self, labels: &[&str; N]) -> LabelGuardedMetric<T::M, N> {
let guard = LabelGuardedMetricsInfo::register_new_label(&self.info, labels);
let inner = self.inner.with_label_values(labels);
pub fn with_guarded_label_values(
&self,
labels: &[&str; N],
) -> LabelGuardedMetric<LazyGuardedMetrics<T, N>, N> {
let guard = Arc::new(LabelGuardedMetricsInfo::register_new_label(
&self.info, labels,
));
LabelGuardedMetric {
inner,
_guard: Arc::new(guard),
inner: lazy_guarded_metrics(self.inner.clone(), guard.clone()),
_guard: guard,
}
}

pub fn with_test_label(&self) -> LabelGuardedMetric<T::M, N> {
pub fn with_test_label(&self) -> LabelGuardedMetric<LazyGuardedMetrics<T, N>, N> {
let labels: [&'static str; N] = gen_test_label::<N>();
self.with_guarded_label_values(&labels)
}
Expand Down Expand Up @@ -308,7 +357,6 @@ impl<const N: usize> LabelGuardedHistogramVec<N> {
}
}

#[derive(Clone)]
struct LabelGuard<const N: usize> {
labels: [String; N],
info: Arc<Mutex<LabelGuardedMetricsInfo<N>>>,
Expand Down Expand Up @@ -396,8 +444,12 @@ impl<P: Atomic> MetricWithLocal for GenericCounter<P> {
}
}

impl<T: MetricWithLocal, const N: usize> LabelGuardedMetric<T, N> {
pub fn local(&self) -> LabelGuardedMetric<T::Local, N> {
impl<T, const N: usize> LabelGuardedMetric<LazyGuardedMetrics<T, N>, N>
where
T: MetricVecBuilder,
T::M: MetricWithLocal,
{
pub fn local(&self) -> LabelGuardedMetric<<T::M as MetricWithLocal>::Local, N> {
LabelGuardedMetric {
inner: self.inner.local(),
_guard: self._guard.clone(),
Expand Down
6 changes: 5 additions & 1 deletion src/common/metrics/src/relabeled_metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use prometheus::core::{MetricVec, MetricVecBuilder};
use prometheus::{HistogramVec, IntCounterVec};

use crate::guarded_metrics::lazy_guarded_metric::LazyGuardedMetrics;
use crate::{
LabelGuardedHistogramVec, LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
LabelGuardedMetric, LabelGuardedMetricVec, MetricLevel,
Expand Down Expand Up @@ -89,7 +90,10 @@ impl<T: MetricVecBuilder> RelabeledMetricVec<MetricVec<T>> {
}

impl<T: MetricVecBuilder, const N: usize> RelabeledMetricVec<LabelGuardedMetricVec<T, N>> {
pub fn with_label_values(&self, vals: &[&str; N]) -> LabelGuardedMetric<T::M, N> {
pub fn with_label_values(
&self,
vals: &[&str; N],
) -> LabelGuardedMetric<LazyGuardedMetrics<T, N>, N> {
if self.metric_level > self.relabel_threshold {
// relabel first n labels to empty string
let mut relabeled_vals = *vals;
Expand Down
33 changes: 8 additions & 25 deletions src/connector/src/sink/iceberg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ mod storage_catalog;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;
use std::num::NonZeroU64;
use std::ops::Deref;
use std::sync::Arc;

use anyhow::{anyhow, Context};
Expand Down Expand Up @@ -799,12 +798,8 @@ impl IcebergWriter {
let prometheus_builder = PrometheusWriterBuilder::new(
dispatch_builder,
WriterMetrics::new(
writer_param.sink_metrics.iceberg_write_qps.deref().clone(),
writer_param
.sink_metrics
.iceberg_write_latency
.deref()
.clone(),
(**writer_param.sink_metrics.iceberg_write_qps).clone(),
(**writer_param.sink_metrics.iceberg_write_latency).clone(),
),
);
let schema = Self::schema_with_extra_partition_col(&table, extra_partition_col_idx)?;
Expand All @@ -822,12 +817,8 @@ impl IcebergWriter {
let prometheus_builder = PrometheusWriterBuilder::new(
dispatch_builder,
WriterMetrics::new(
writer_param.sink_metrics.iceberg_write_qps.deref().clone(),
writer_param
.sink_metrics
.iceberg_write_latency
.deref()
.clone(),
(**writer_param.sink_metrics.iceberg_write_qps).clone(),
(**writer_param.sink_metrics.iceberg_write_latency).clone(),
),
);
let schema = table.current_arrow_schema()?;
Expand Down Expand Up @@ -879,12 +870,8 @@ impl IcebergWriter {
let prometheus_builder = PrometheusWriterBuilder::new(
dispatch_builder,
WriterMetrics::new(
writer_param.sink_metrics.iceberg_write_qps.deref().clone(),
writer_param
.sink_metrics
.iceberg_write_latency
.deref()
.clone(),
(**writer_param.sink_metrics.iceberg_write_qps).clone(),
(**writer_param.sink_metrics.iceberg_write_latency).clone(),
),
);
let schema = Self::schema_with_extra_partition_col(&table, extra_partition_col_idx)?;
Expand All @@ -902,12 +889,8 @@ impl IcebergWriter {
let prometheus_builder = PrometheusWriterBuilder::new(
dispatch_builder,
WriterMetrics::new(
writer_param.sink_metrics.iceberg_write_qps.deref().clone(),
writer_param
.sink_metrics
.iceberg_write_latency
.deref()
.clone(),
(**writer_param.sink_metrics.iceberg_write_qps).clone(),
(**writer_param.sink_metrics.iceberg_write_latency).clone(),
),
);
let schema = table.current_arrow_schema()?;
Expand Down
7 changes: 3 additions & 4 deletions src/stream/src/executor/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ use std::task::{Context, Poll};

use anyhow::Context as _;
use futures::stream::{FusedStream, FuturesUnordered, StreamFuture};
use prometheus::Histogram;
use risingwave_common::config::MetricLevel;
use risingwave_common::metrics::LabelGuardedMetric;
use risingwave_common::metrics::LabelGuardedHistogram;
use tokio::time::Instant;

use super::exchange::input::BoxedInput;
Expand Down Expand Up @@ -285,7 +284,7 @@ pub struct SelectReceivers {
/// watermark column index -> `BufferedWatermarks`
buffered_watermarks: BTreeMap<usize, BufferedWatermarks<ActorId>>,
/// If None, then we don't take `Instant::now()` and `observe` during `poll_next`
merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram, 2>>,
merge_barrier_align_duration: Option<LabelGuardedHistogram<2>>,
}

impl Stream for SelectReceivers {
Expand Down Expand Up @@ -387,7 +386,7 @@ impl SelectReceivers {
fn new(
actor_id: u32,
upstreams: Vec<BoxedInput>,
merge_barrier_align_duration: Option<LabelGuardedMetric<Histogram, 2>>,
merge_barrier_align_duration: Option<LabelGuardedHistogram<2>>,
) -> Self {
assert!(!upstreams.is_empty());
let upstream_actor_ids = upstreams.iter().map(|input| input.actor_id()).collect();
Expand Down
Loading