Skip to content

Commit

Permalink
refactor(metrics): fix cache metrics to be table id + actor id (risin…
Browse files Browse the repository at this point in the history
  • Loading branch information
wcy-fdu authored Apr 7, 2023
1 parent 9a15a88 commit c343f00
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 36 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-user-dashboard.json

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions grafana/risingwave-dev-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,15 +996,15 @@ def section_streaming_actors(outer_panels):
[
panels.target(
f"rate({metric('stream_join_lookup_miss_count')}[$__rate_interval])",
"cache miss {{actor_id}} {{side}}",
"cache miss table - {{side}} side, join_table_id {{join_table_id}} degree_table_id {{degree_table_id}} actor {{actor_id}} ",
),
panels.target(
f"rate({metric('stream_join_lookup_total_count')}[$__rate_interval])",
"total lookups {{actor_id}} {{side}}",
"total lookups {{side}} side, join_table_id {{join_table_id}} degree_table_id {{degree_table_id}} actor {{actor_id}}",
),
panels.target(
f"rate({metric('stream_join_insert_cache_miss_count')}[$__rate_interval])",
"cache miss when insert{{actor_id}} {{side}}",
"cache miss when insert {{side}} side, join_table_id {{join_table_id}} degree_table_id {{degree_table_id}} actor {{actor_id}}",
),
],
),
Expand Down Expand Up @@ -1080,11 +1080,11 @@ def section_streaming_actors(outer_panels):
[
panels.target(
f"rate({metric('stream_agg_lookup_miss_count')}[$__rate_interval])",
"cache miss {{actor_id}}",
"cache miss - table {{table_id}} actor {{actor_id}}",
),
panels.target(
f"rate({metric('stream_agg_lookup_total_count')}[$__rate_interval])",
"total lookups {{actor_id}}",
"total lookups - table {{table_id}} actor {{actor_id}}",
),
],
),
Expand All @@ -1094,11 +1094,11 @@ def section_streaming_actors(outer_panels):
[
panels.target(
f"rate({metric('stream_agg_chunk_lookup_miss_count')}[$__rate_interval])",
"chunk-level cache miss {{actor_id}}",
"chunk-level cache miss - table {{table_id}} actor {{actor_id}}}",
),
panels.target(
f"rate({metric('stream_agg_chunk_lookup_total_count')}[$__rate_interval])",
"chunk-level total lookups {{actor_id}}",
"chunk-level total lookups - table {{table_id}} actor {{actor_id}}",
),
],
),
Expand All @@ -1107,7 +1107,7 @@ def section_streaming_actors(outer_panels):
"The number of keys cached in each hash aggregation executor's executor cache.",
[
panels.target(f"{metric('stream_agg_cached_keys')}",
"{{actor_id}}"),
"table {{table_id}} actor {{actor_id}}"),
],
),
],
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions grafana/risingwave-user-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,19 +240,19 @@ def section_memory(outer_panels):
[
panels.target(
f"rate({metric('stream_join_lookup_miss_count')}[$__rate_interval])",
"Join - cache miss {{actor_id}} {{side}}",
"Join - cache miss - {{side}} side, join_table_id {{join_table_id}} degree_table_id {{degree_table_id}} actor {{actor_id}}",
),
panels.target(
f"rate({metric('stream_join_lookup_total_count')}[$__rate_interval])",
"Join - total lookups {{actor_id}} {{side}}",
"Join - total lookups - {{side}} side, join_table_id {{join_table_id}} degree_table_id {{degree_table_id}} actor {{actor_id}}",
),
panels.target(
f"rate({metric('stream_agg_lookup_miss_count')}[$__rate_interval])",
"Agg - cache miss {{actor_id}}",
"Agg - cache miss - table {{table_id}} actor {{actor_id}}",
),
panels.target(
f"rate({metric('stream_agg_lookup_total_count')}[$__rate_interval])",
"Agg - total lookups {{actor_id}}",
"Agg - total lookups - table {{table_id}} actor {{actor_id}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-user-dashboard.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,8 @@ where
}
}

fn table_id(&self) -> TableId {
self.table_id
pub fn table_id(&self) -> u32 {
self.table_id.table_id
}

/// Returns whether the table is a singleton table.
Expand Down
11 changes: 6 additions & 5 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,28 +504,29 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
) {
// Update metrics.
let actor_id_str = this.actor_ctx.id.to_string();
let table_id_str = this.result_table.table_id().to_string();
this.metrics
.agg_lookup_miss_count
.with_label_values(&[&actor_id_str])
.with_label_values(&[&table_id_str, &actor_id_str])
.inc_by(vars.stats.lookup_miss_count);
vars.stats.lookup_miss_count = 0;
this.metrics
.agg_total_lookup_count
.with_label_values(&[&actor_id_str])
.with_label_values(&[&table_id_str, &actor_id_str])
.inc_by(vars.stats.total_lookup_count);
vars.stats.total_lookup_count = 0;
this.metrics
.agg_cached_keys
.with_label_values(&[&actor_id_str])
.with_label_values(&[&table_id_str, &actor_id_str])
.set(vars.agg_group_cache.len() as i64);
this.metrics
.agg_chunk_lookup_miss_count
.with_label_values(&[&actor_id_str])
.with_label_values(&[&table_id_str, &actor_id_str])
.inc_by(vars.stats.chunk_lookup_miss_count);
vars.stats.chunk_lookup_miss_count = 0;
this.metrics
.agg_chunk_total_lookup_count
.with_label_values(&[&actor_id_str])
.with_label_values(&[&table_id_str, &actor_id_str])
.inc_by(vars.stats.chunk_total_lookup_count);
vars.stats.chunk_total_lookup_count = 0;

Expand Down
43 changes: 38 additions & 5 deletions src/stream/src/executor/managed_state/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ pub struct JoinHashMapMetrics {
metrics: Arc<StreamingMetrics>,
/// Basic information
actor_id: String,
join_table_id: String,
degree_table_id: String,
side: &'static str,
/// How many times have we hit the cache of join executor
lookup_miss_count: usize,
Expand All @@ -163,10 +165,18 @@ pub struct JoinHashMapMetrics {
}

impl JoinHashMapMetrics {
pub fn new(metrics: Arc<StreamingMetrics>, actor_id: ActorId, side: &'static str) -> Self {
pub fn new(
metrics: Arc<StreamingMetrics>,
actor_id: ActorId,
side: &'static str,
join_table_id: u32,
degree_table_id: u32,
) -> Self {
Self {
metrics,
actor_id: actor_id.to_string(),
join_table_id: join_table_id.to_string(),
degree_table_id: degree_table_id.to_string(),
side,
lookup_miss_count: 0,
total_lookup_count: 0,
Expand All @@ -177,15 +187,30 @@ impl JoinHashMapMetrics {
pub fn flush(&mut self) {
self.metrics
.join_lookup_miss_count
.with_label_values(&[&self.actor_id, self.side])
.with_label_values(&[
(self.side),
&self.join_table_id,
&self.degree_table_id,
&self.actor_id,
])
.inc_by(self.lookup_miss_count as u64);
self.metrics
.join_total_lookup_count
.with_label_values(&[&self.actor_id, self.side])
.with_label_values(&[
(self.side),
&self.join_table_id,
&self.degree_table_id,
&self.actor_id,
])
.inc_by(self.total_lookup_count as u64);
self.metrics
.join_insert_cache_miss_count
.with_label_values(&[&self.actor_id, self.side])
.with_label_values(&[
(self.side),
&self.join_table_id,
&self.degree_table_id,
&self.actor_id,
])
.inc_by(self.insert_cache_miss_count as u64);
self.total_lookup_count = 0;
self.lookup_miss_count = 0;
Expand Down Expand Up @@ -265,6 +290,8 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
vec![OrderType::ascending(); state_pk_indices.len()],
);

let join_table_id = state_table.table_id();
let degree_table_id = degree_table.table_id();
let state = TableInner {
pk_indices: state_pk_indices,
order_key_indices: state_table.pk_indices().to_vec(),
Expand Down Expand Up @@ -294,7 +321,13 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
degree_state,
need_degree_table,
pk_contained_in_jk,
metrics: JoinHashMapMetrics::new(metrics, actor_id, side),
metrics: JoinHashMapMetrics::new(
metrics,
actor_id,
side,
join_table_id,
degree_table_id,
),
}
}

Expand Down
16 changes: 8 additions & 8 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,23 +279,23 @@ impl StreamingMetrics {
let join_lookup_miss_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_miss_count",
"Join executor lookup miss duration",
&["actor_id", "side"],
&["side", "join_table_id", "degree_table_id", "actor_id"],
registry
)
.unwrap();

let join_total_lookup_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_total_count",
"Join executor lookup total operation",
&["actor_id", "side"],
&["side", "join_table_id", "degree_table_id", "actor_id"],
registry
)
.unwrap();

let join_insert_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_join_insert_cache_miss_count",
"Join executor cache miss when insert operation",
&["actor_id", "side"],
&["side", "join_table_id", "degree_table_id", "actor_id"],
registry
)
.unwrap();
Expand Down Expand Up @@ -352,39 +352,39 @@ impl StreamingMetrics {
let agg_lookup_miss_count = register_int_counter_vec_with_registry!(
"stream_agg_lookup_miss_count",
"Aggregation executor lookup miss duration",
&["actor_id"],
&["table_id", "actor_id"],
registry
)
.unwrap();

let agg_total_lookup_count = register_int_counter_vec_with_registry!(
"stream_agg_lookup_total_count",
"Aggregation executor lookup total operation",
&["actor_id"],
&["table_id", "actor_id"],
registry
)
.unwrap();

let agg_cached_keys = register_int_gauge_vec_with_registry!(
"stream_agg_cached_keys",
"Number of cached keys in streaming aggregation operators",
&["actor_id"],
&["table_id", "actor_id"],
registry
)
.unwrap();

let agg_chunk_lookup_miss_count = register_int_counter_vec_with_registry!(
"stream_agg_chunk_lookup_miss_count",
"Aggregation executor chunk-level lookup miss duration",
&["actor_id"],
&["table_id", "actor_id"],
registry
)
.unwrap();

let agg_chunk_total_lookup_count = register_int_counter_vec_with_registry!(
"stream_agg_chunk_lookup_total_count",
"Aggregation executor chunk-level lookup total operation",
&["actor_id"],
&["table_id", "actor_id"],
registry
)
.unwrap();
Expand Down

0 comments on commit c343f00

Please sign in to comment.