Skip to content

Commit

Permalink
feat(streaming): monitor materialize cache miss rate when handling pk conflict (risingwavelabs#8946)
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Apr 10, 2023
1 parent 1e3221b commit ed5af28
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 10 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

37 changes: 36 additions & 1 deletion grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ def section_streaming_actors(outer_panels):
[
panels.target(
f"rate({metric('stream_join_lookup_miss_count')}[$__rate_interval])",
"cache miss table - {{side}} side, join_table_id {{join_table_id}} degree_table_id {{degree_table_id}} actor {{actor_id}} ",
"cache miss - {{side}} side, join_table_id {{join_table_id}} degree_table_id {{degree_table_id}} actor {{actor_id}} ",
),
panels.target(
f"rate({metric('stream_join_lookup_total_count')}[$__rate_interval])",
Expand All @@ -1008,6 +1008,41 @@ def section_streaming_actors(outer_panels):
),
],
),
panels.timeseries_actor_ops(
"Materialize Executor Cache",
"",
[
panels.target(
f"rate({metric('stream_materialize_cache_hit_count')}[$__rate_interval])",
"cache hit count - table {{table_id}} - actor {{actor_id}} {{instance}}",
),
panels.target(
f"rate({metric('stream_materialize_cache_total_count')}[$__rate_interval])",
"total cached count - table {{table_id}} - actor {{actor_id}} {{instance}}",
),
],
),
panels.timeseries_percentage(
"Executor Cache Miss Ratio",
"",
[
panels.target(
f"(sum(rate({metric('stream_join_lookup_miss_count')}[$__rate_interval])) by (side, join_table_id, degree_table_id, actor_id) ) / (sum(rate({metric('stream_join_lookup_total_count')}[$__rate_interval])) by (side, join_table_id, degree_table_id, actor_id))",
"join executor cache miss ratio - - {{side}} side, join_table_id {{join_table_id}} degree_table_id {{degree_table_id}} actor {{actor_id}}",
),

panels.target(
f"(sum(rate({metric('stream_agg_lookup_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_agg_lookup_total_count')}[$__rate_interval])) by (table_id, actor_id))",
"Agg cache miss ratio - table {{table_id}} actor {{actor_id}} ",
),

panels.target(
f"1 - (sum(rate({metric('stream_materialize_cache_hit_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_materialize_cache_total_count')}[$__rate_interval])) by (table_id, actor_id))",
"materialize executor cache miss ratio - table {{table_id}} actor {{actor_id}} {{instance}}",
),

],
),
panels.timeseries_actor_latency(
"Join Executor Barrier Align",
"",
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions grafana/risingwave-user-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,36 @@ def section_memory(outer_panels):
f"rate({metric('stream_agg_lookup_total_count')}[$__rate_interval])",
"Agg - total lookups - table {{table_id}} actor {{actor_id}}",
),
panels.target(
f"rate({metric('stream_materialize_cache_hit_count')}[$__rate_interval])",
"Materialize - cache hit count - table {{table_id}} - actor {{actor_id}} {{instance}}",
),
panels.target(
f"rate({metric('stream_materialize_cache_total_count')}[$__rate_interval])",
"Materialize - total cache count - table {{table_id}} - actor {{actor_id}} {{instance}}",
),
],
),

panels.timeseries_percentage(
"Executor Cache Miss Ratio",
"",
[
panels.target(
f"(sum(rate({metric('stream_join_lookup_miss_count')}[$__rate_interval])) by (side, join_table_id, degree_table_id, actor_id) ) / (sum(rate({metric('stream_join_lookup_total_count')}[$__rate_interval])) by (side, join_table_id, degree_table_id, actor_id))",
"join executor cache miss ratio - - {{side}} side, join_table_id {{join_table_id}} degree_table_id {{degree_table_id}} actor {{actor_id}}",
),

panels.target(
f"(sum(rate({metric('stream_agg_lookup_miss_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_agg_lookup_total_count')}[$__rate_interval])) by (table_id, actor_id))",
"Agg cache miss ratio - table {{table_id}} actor {{actor_id}} ",
),

panels.target(
f"1 - (sum(rate({metric('stream_materialize_cache_hit_count')}[$__rate_interval])) by (table_id, actor_id) ) / (sum(rate({metric('stream_materialize_cache_total_count')}[$__rate_interval])) by (table_id, actor_id))",
"materialize executor cache miss ratio - table {{table_id}} - actor {{actor_id}} {{instance}}",
),

],
),
panels.timeseries_ops(
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ pub struct StreamingMetrics {

/// User compute error reporting
pub user_compute_error_count: GenericCounterVec<AtomicU64>,

// Materialize
pub materialize_cache_hit_count: GenericCounterVec<AtomicU64>,
pub materialize_cache_total_count: GenericCounterVec<AtomicU64>,
}

impl StreamingMetrics {
Expand Down Expand Up @@ -469,6 +473,21 @@ impl StreamingMetrics {
)
.unwrap();

let materialize_cache_hit_count = register_int_counter_vec_with_registry!(
"stream_materialize_cache_hit_count",
"Materialize executor cache hit count",
&["table_id", "actor_id"],
registry
)
.unwrap();

let materialize_cache_total_count = register_int_counter_vec_with_registry!(
"stream_materialize_cache_total_count",
"Materialize executor cache total operation",
&["table_id", "actor_id"],
registry
)
.unwrap();
Self {
registry,
executor_row_count,
Expand Down Expand Up @@ -519,6 +538,8 @@ impl StreamingMetrics {
lru_watermark_step,
jemalloc_allocated_bytes,
user_compute_error_count,
materialize_cache_hit_count,
materialize_cache_total_count,
}
}

Expand Down
37 changes: 32 additions & 5 deletions src/stream/src/executor/mview/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use risingwave_storage::StateStore;
use crate::cache::{new_unbounded, ExecutorCache};
use crate::common::table::state_table::StateTableInner;
use crate::executor::error::StreamExecutorError;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{
expect_first_barrier, ActorContext, ActorContextRef, BoxedExecutor, BoxedMessageStream,
Executor, ExecutorInfo, Message, PkIndicesRef, StreamExecutorResult,
Expand All @@ -59,6 +60,7 @@ pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
info: ExecutorInfo,

materialize_cache: MaterializeCache<SD>,

conflict_behavior: ConflictBehavior,
}

Expand All @@ -77,6 +79,7 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
table_catalog: &Table,
watermark_epoch: AtomicU64Ref,
conflict_behavior: ConflictBehavior,
metrics: Arc<StreamingMetrics>,
) -> Self {
let arrange_columns: Vec<usize> = key.iter().map(|k| k.column_index).collect();

Expand All @@ -89,7 +92,8 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
// decoded row.
let state_table =
StateTableInner::from_table_catalog_inconsistent_op(table_catalog, store, vnodes).await;

let actor_id = actor_context.id;
let table_id = table_catalog.id;
Self {
input,
state_table,
Expand All @@ -100,7 +104,7 @@ impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
pk_indices: arrange_columns,
identity: format!("MaterializeExecutor {:X}", executor_id),
},
materialize_cache: MaterializeCache::new(watermark_epoch),
materialize_cache: MaterializeCache::new(watermark_epoch, metrics, actor_id, table_id),
conflict_behavior,
}
}
Expand Down Expand Up @@ -224,7 +228,12 @@ impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
pk_indices: arrange_columns,
identity: format!("MaterializeExecutor {:X}", executor_id),
},
materialize_cache: MaterializeCache::new(watermark_epoch),
materialize_cache: MaterializeCache::new(
watermark_epoch,
Arc::new(StreamingMetrics::unused()),
0,
0,
),
conflict_behavior,
}
}
Expand Down Expand Up @@ -421,6 +430,9 @@ impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S
pub struct MaterializeCache<SD> {
data: ExecutorCache<Vec<u8>, CacheValue>,
_serde: PhantomData<SD>,
metrics: Arc<StreamingMetrics>,
actor_id: String,
table_id: String,
}

#[derive(EnumAsInner)]
Expand All @@ -432,11 +444,19 @@ pub enum CacheValue {
type EmptyValue = ();

impl<SD: ValueRowSerde> MaterializeCache<SD> {
pub fn new(watermark_epoch: AtomicU64Ref) -> Self {
/// Builds a `MaterializeCache` backed by an unbounded LRU cache that is
/// evicted according to the shared `watermark_epoch`.
///
/// `metrics` is the global streaming-metrics registry used to report cache
/// hit/total counters. `actor_id` and `table_id` are converted to `String`
/// up front because they are used as Prometheus label values on every
/// lookup (see the `with_label_values` calls in `fetch_keys`), avoiding a
/// per-lookup integer-to-string conversion.
pub fn new(
    watermark_epoch: AtomicU64Ref,
    metrics: Arc<StreamingMetrics>,
    actor_id: u32,
    table_id: u32,
) -> Self {
    // The cache is unbounded in entry count; memory pressure is handled by
    // epoch-based eviction driven by `watermark_epoch`.
    let cache = ExecutorCache::new(new_unbounded(watermark_epoch));
    Self {
        data: cache,
        _serde: PhantomData,
        metrics,
        actor_id: actor_id.to_string(),
        table_id: table_id.to_string(),
    }
}

Expand Down Expand Up @@ -577,10 +597,17 @@ impl<SD: ValueRowSerde> MaterializeCache<SD> {
) -> StreamExecutorResult<()> {
let mut futures = vec![];
for key in keys {
self.metrics
.materialize_cache_total_count
.with_label_values(&[&self.table_id, &self.actor_id])
.inc();
if self.data.contains(key) {
self.metrics
.materialize_cache_hit_count
.with_label_values(&[&self.table_id, &self.actor_id])
.inc();
continue;
}

futures.push(async {
let key_row = table.pk_serde().deserialize(key).unwrap();
(key.to_vec(), table.get_compacted_row(&key_row).await)
Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/from_proto/mview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ impl ExecutorBuilder for MaterializeExecutorBuilder {
table,
stream.get_watermark_epoch(),
conflict_behavior,
stream.streaming_metrics.clone(),
)
.await
.boxed()
Expand Down Expand Up @@ -115,6 +116,7 @@ impl ExecutorBuilder for ArrangeExecutorBuilder {
table,
stream.get_watermark_epoch(),
conflict_behavior,
stream.streaming_metrics.clone(),
)
.await;

Expand Down

0 comments on commit ed5af28

Please sign in to comment.