Skip to content

Commit

Permalink
feat(meta): export metrics of meta count/role info (#8057)
Browse files Browse the repository at this point in the history
Export metrics of meta count and role infos to grafana.

Approved-By: shanicky
  • Loading branch information
yezizp2012 authored Feb 22, 2023
1 parent 8dff620 commit 014eb09
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 38 deletions.
6 changes: 6 additions & 0 deletions dashboard/proto/gen/common.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,15 @@ def section_cluster_node(panels):
)
],
),
panels.timeseries_count(
"Meta Cluster",
"",
[
panels.target(f"sum({metric('meta_num')}) by (worker_addr,role)",
"{{worker_addr}} @ {{role}}")
],
["last"],
),
]


Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ enum WorkerType {
COMPUTE_NODE = 2;
RISE_CTL = 3;
COMPACTOR = 4;
META = 5;
}

message ParallelUnit {
Expand Down
35 changes: 2 additions & 33 deletions src/meta/src/manager/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use tokio::task::JoinHandle;

use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv};
use crate::model::{MetadataModel, Worker, INVALID_EXPIRE_AT};
use crate::rpc::metrics::MetaMetrics;
use crate::storage::MetaStore;
use crate::{MetaError, MetaResult};

Expand Down Expand Up @@ -87,38 +86,8 @@ where
self.core.read().await
}

pub async fn start_worker_num_monitor(
cluster_manager: ClusterManagerRef<S>,
interval: Duration,
meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
let join_handle = tokio::spawn(async move {
let mut monitor_interval = tokio::time::interval(interval);
monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
// Wait for interval
_ = monitor_interval.tick() => {},
// Shutdown monitor
_ = &mut shutdown_rx => {
tracing::info!("Worker number monitor is stopped");
return;
}
}

for (worker_type, worker_num) in
cluster_manager.core.read().await.count_worker_node()
{
meta_metrics
.worker_num
.with_label_values(&[(worker_type.as_str_name())])
.set(worker_num as i64);
}
}
});

(join_handle, shutdown_tx)
pub async fn count_worker_node(&self) -> HashMap<WorkerType, u64> {
self.core.read().await.count_worker_node()
}

/// A worker node will immediately register itself to meta when it bootstraps.
Expand Down
66 changes: 65 additions & 1 deletion src/meta/src/rpc/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,22 @@
// limitations under the License.

use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;

use prometheus::{
exponential_buckets, histogram_opts, register_histogram_vec_with_registry,
register_histogram_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, Histogram,
HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry,
};
use risingwave_pb::common::WorkerType;
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::manager::ClusterManagerRef;
use crate::rpc::server::ElectionClientRef;
use crate::storage::MetaStore;

pub struct MetaMetrics {
registry: Registry,
Expand All @@ -34,7 +43,7 @@ pub struct MetaMetrics {

/// Latency between each barrier send
pub barrier_send_latency: Histogram,
/// The number of all barriers. It is the sum of barreriers that are in-flight or completed but
/// The number of all barriers. It is the sum of barriers that are in-flight or completed but
/// waiting for other barriers
pub all_barrier_nums: IntGauge,
/// The number of in-flight barriers
Expand Down Expand Up @@ -81,6 +90,9 @@ pub struct MetaMetrics {
/// The number of workers in the cluster.
pub worker_num: IntGaugeVec,
pub compact_skip_frequency: IntCounterVec,

/// The roles of all meta nodes in the cluster.
pub meta_type: IntGaugeVec,
}

impl MetaMetrics {
Expand Down Expand Up @@ -256,6 +268,14 @@ impl MetaMetrics {
)
.unwrap();

let meta_type = register_int_gauge_vec_with_registry!(
"meta_num",
"role of meta nodes in the cluster",
&["worker_addr", "role"],
registry,
)
.unwrap();

Self {
registry,

Expand Down Expand Up @@ -286,6 +306,7 @@ impl MetaMetrics {
time_after_last_observation: AtomicU64::new(0),

worker_num,
meta_type,
}
}

Expand All @@ -298,3 +319,46 @@ impl Default for MetaMetrics {
Self::new()
}
}

pub async fn start_worker_info_monitor<S: MetaStore>(
cluster_manager: ClusterManagerRef<S>,
election_client: Option<ElectionClientRef>,
interval: Duration,
meta_metrics: Arc<MetaMetrics>,
) -> (JoinHandle<()>, Sender<()>) {
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
let join_handle = tokio::spawn(async move {
let mut monitor_interval = tokio::time::interval(interval);
monitor_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
// Wait for interval
_ = monitor_interval.tick() => {},
// Shutdown monitor
_ = &mut shutdown_rx => {
tracing::info!("Worker number monitor is stopped");
return;
}
}

for (worker_type, worker_num) in cluster_manager.count_worker_node().await {
meta_metrics
.worker_num
.with_label_values(&[(worker_type.as_str_name())])
.set(worker_num as i64);
}
if let Some(client) = &election_client && let Ok(meta_members) = client.get_members().await {
meta_metrics
.worker_num
.with_label_values(&[WorkerType::Meta.as_str_name()])
.set(meta_members.len() as i64);
meta_members.into_iter().for_each(|m| {
let role = if m.is_leader {"leader"} else {"follower"};
meta_metrics.meta_type.with_label_values(&[&m.id, role]).set(1);
});
}
}
});

(join_handle, shutdown_tx)
}
7 changes: 4 additions & 3 deletions src/meta/src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use crate::manager::{
SystemParamManager,
};
use crate::rpc::election_client::{ElectionClient, EtcdElectionClient};
use crate::rpc::metrics::MetaMetrics;
use crate::rpc::metrics::{start_worker_info_monitor, MetaMetrics};
use crate::rpc::service::backup_service::BackupServiceImpl;
use crate::rpc::service::cluster_service::ClusterServiceImpl;
use crate::rpc::service::heartbeat_service::HeartbeatServiceImpl;
Expand Down Expand Up @@ -338,7 +338,7 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
.await
.unwrap();

let meta_member_srv = MetaMemberServiceImpl::new(match election_client {
let meta_member_srv = MetaMemberServiceImpl::new(match election_client.clone() {
None => Either::Right(address_info.clone()),
Some(election_client) => Either::Left(election_client),
});
Expand Down Expand Up @@ -509,8 +509,9 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
let mut sub_tasks =
hummock::start_hummock_workers(vacuum_manager, compaction_scheduler, &env.opts);
sub_tasks.push(
ClusterManager::start_worker_num_monitor(
start_worker_info_monitor(
cluster_manager.clone(),
election_client.clone(),
Duration::from_secs(env.opts.node_num_monitor_interval_sec),
meta_metrics.clone(),
)
Expand Down

0 comments on commit 014eb09

Please sign in to comment.