feat(metrics): add metric for source upstream (risingwavelabs#9027)
Signed-off-by: tabVersion <tabvision@bupt.icu>
tabVersion authored Apr 6, 2023
1 parent 78505b1 commit 83b7c79
Showing 8 changed files with 59 additions and 6 deletions.
2 changes: 1 addition & 1 deletion docker/dashboards/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions grafana/risingwave-dev-dashboard.dashboard.py
@@ -615,6 +615,16 @@ def section_streaming(panels):
)
]
),
panels.timeseries_count(
"Source Upstream Status",
"Monitor each source upstream, 0 means the upstream is not normal, 1 means the source is ready.",
[
panels.target(
f"{metric('source_status_is_up')}",
"source_id={{source_id}}, source_name={{source_name}} @ {{instance}}"
)
]
),
panels.timeseries_rowsps(
"Backfill Snapshot Read Throughput(rows)",
"Total number of rows that have been read from the backfill snapshot",
2 changes: 1 addition & 1 deletion grafana/risingwave-dev-dashboard.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/meta/src/lib.rs
@@ -34,6 +34,7 @@
#![cfg_attr(coverage, feature(no_coverage))]
#![test_runner(risingwave_test_runner::test_runner::run_failpont_tests)]
#![feature(is_sorted)]
#![feature(string_leak)]

pub mod backup_restore;
mod barrier;
12 changes: 12 additions & 0 deletions src/meta/src/rpc/metrics.rs
@@ -126,6 +126,9 @@ pub struct MetaMetrics {
/// ********************************** Object Store ************************************
// Object store related metrics (for backup/restore and version checkpoint)
pub object_store_metric: Arc<ObjectStoreMetrics>,

/// Gauge tracking whether each source's upstream is still up.
pub source_is_up: IntGaugeVec,
}

impl MetaMetrics {
@@ -403,6 +406,14 @@ impl MetaMetrics {
);
let recovery_latency = register_histogram_with_registry!(opts, registry).unwrap();

let source_is_up = register_int_gauge_vec_with_registry!(
"source_status_is_up",
"source is up or not",
&["source_id", "source_name"],
registry
)
.unwrap();

Self {
registry,
grpc_latency,
@@ -446,6 +457,7 @@
scale_compactor_core_num,
level_compact_task_cnt,
object_store_metric,
source_is_up,
}
}

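For readers skimming the diff, the registration above follows the prometheus crate's labeled-gauge API: `register_int_gauge_vec_with_registry!` creates an `IntGaugeVec` keyed by `source_id` and `source_name`, and each series is addressed later via `with_label_values`. Below is a minimal, self-contained sketch of that API outside the RisingWave codebase; the label values (`"1"`, `"kafka_orders"`, and so on) are made up for illustration.

```rust
use prometheus::{register_int_gauge_vec_with_registry, Encoder, Registry, TextEncoder};

fn main() {
    let registry = Registry::new();

    // Same shape as the metric added in this commit:
    // one series per (source_id, source_name), 1 = up, 0 = down.
    let source_is_up = register_int_gauge_vec_with_registry!(
        "source_status_is_up",
        "source is up or not",
        &["source_id", "source_name"],
        registry
    )
    .unwrap();

    // Each labeled series is created lazily on first access.
    source_is_up.with_label_values(&["1", "kafka_orders"]).set(1);
    source_is_up.with_label_values(&["2", "cdc_users"]).set(0);

    // Dump the registry in the Prometheus text exposition format.
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&registry.gather(), &mut buf)
        .unwrap();
    print!("{}", String::from_utf8(buf).unwrap());
}
```

These are the same labels the new Grafana panel interpolates in its legend template (`{{source_id}}`, `{{source_name}}`), so each source shows up as its own series on the dashboard.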
1 change: 1 addition & 0 deletions src/meta/src/rpc/server.rs
@@ -397,6 +397,7 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
barrier_scheduler.clone(),
catalog_manager.clone(),
fragment_manager.clone(),
meta_metrics.clone(),
)
.await
.unwrap(),
36 changes: 32 additions & 4 deletions src/meta/src/stream/source_manager.rs
@@ -38,6 +38,7 @@ use tokio::{select, time};
use crate::barrier::{BarrierScheduler, Command};
use crate::manager::{CatalogManagerRef, FragmentManagerRef, SourceId};
use crate::model::{ActorId, FragmentId, TableFragments};
use crate::rpc::metrics::MetaMetrics;
use crate::storage::MetaStore;
use crate::MetaResult;

@@ -49,6 +50,7 @@ pub struct SourceManager<S: MetaStore> {
barrier_scheduler: BarrierScheduler<S>,
core: Mutex<SourceManagerCore<S>>,
connector_rpc_endpoint: Option<String>,
metrics: Arc<MetaMetrics>,
}

struct SharedSplitMap {
@@ -58,16 +60,20 @@
type SharedSplitMapRef = Arc<Mutex<SharedSplitMap>>;

struct ConnectorSourceWorker {
source_id: SourceId,
source_name: String,
current_splits: SharedSplitMapRef,
enumerator: SplitEnumeratorImpl,
period: Duration,
metrics: Arc<MetaMetrics>,
}

impl ConnectorSourceWorker {
pub async fn create(
connector_rpc_endpoint: &Option<String>,
source: &Source,
period: Duration,
metrics: Arc<MetaMetrics>,
) -> MetaResult<Self> {
let mut properties = ConnectorProperties::extract(source.properties.clone())?;
// init cdc properties
@@ -78,9 +84,12 @@ impl ConnectorSourceWorker {
let enumerator = SplitEnumeratorImpl::create(properties).await?;
let splits = Arc::new(Mutex::new(SharedSplitMap { splits: None }));
Ok(Self {
source_id: source.id,
source_name: source.name.clone(),
current_splits: splits,
enumerator,
period,
metrics,
})
}

@@ -108,7 +117,17 @@
}

async fn tick(&mut self) -> MetaResult<()> {
-        let splits = self.enumerator.list_splits().await?;
+        let source_is_up = |res: i64| {
+            self.metrics
+                .source_is_up
+                .with_label_values(&[self.source_id.to_string().as_str(), &self.source_name])
+                .set(res);
+        };
+        let splits = self.enumerator.list_splits().await.map_err(|e| {
+            source_is_up(0);
+            e
+        })?;
+        source_is_up(1);
let mut current_splits = self.current_splits.lock().await;
current_splits.splits.replace(
splits
@@ -396,6 +415,7 @@
barrier_scheduler: BarrierScheduler<S>,
catalog_manager: CatalogManagerRef<S>,
fragment_manager: FragmentManagerRef<S>,
metrics: Arc<MetaMetrics>,
) -> MetaResult<Self> {
let mut managed_sources = HashMap::new();
{
@@ -407,6 +427,7 @@
&source,
&mut managed_sources,
false,
metrics.clone(),
)
.await?
}
@@ -431,6 +452,7 @@
core,
paused: Mutex::new(()),
connector_rpc_endpoint,
metrics,
})
}

@@ -565,6 +587,7 @@
source,
&mut core.managed_sources,
true,
self.metrics.clone(),
)
.await?;
}
@@ -576,10 +599,15 @@
source: &Source,
managed_sources: &mut HashMap<SourceId, ConnectorSourceWorkerHandle>,
force_tick: bool,
metrics: Arc<MetaMetrics>,
) -> MetaResult<()> {
-        let mut worker =
-            ConnectorSourceWorker::create(connector_rpc_endpoint, source, Duration::from_secs(10))
-                .await?;
+        let mut worker = ConnectorSourceWorker::create(
+            connector_rpc_endpoint,
+            source,
+            Duration::from_secs(10),
+            metrics,
+        )
+        .await?;
let current_splits_ref = worker.current_splits.clone();
tracing::info!("spawning new watcher for source {}", source.id);

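The `tick` change in this file is the core of the feature: a local closure writes the gauge, `map_err` records 0 before the error propagates through `?`, and the success path records 1, so every exit from `tick` updates the metric exactly once. Here is a stripped-down sketch of that pattern, with a hypothetical `probe_upstream` standing in for `SplitEnumeratorImpl::list_splits` and a tokio runtime assumed:

```rust
use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry};

// Hypothetical stand-in for the connector's split-discovery call.
async fn probe_upstream() -> Result<Vec<String>, String> {
    Err("broker unreachable".to_owned())
}

async fn tick(gauge: &IntGaugeVec, source_id: u32, source_name: &str) -> Result<(), String> {
    // One closure, two call sites: keeps the label wiring in a single place.
    let source_is_up = |status: i64| {
        gauge
            .with_label_values(&[source_id.to_string().as_str(), source_name])
            .set(status);
    };
    // On failure: record 0, then let `?` propagate the original error.
    let _splits = probe_upstream().await.map_err(|e| {
        source_is_up(0);
        e
    })?;
    // Only reached when the upstream answered successfully.
    source_is_up(1);
    Ok(())
}

#[tokio::main]
async fn main() {
    let registry = Registry::new();
    let gauge = register_int_gauge_vec_with_registry!(
        "source_status_is_up",
        "source is up or not",
        &["source_id", "source_name"],
        registry
    )
    .unwrap();
    // The probe fails, so the series for source 1 now reads 0.
    let _ = tick(&gauge, 1, "kafka_orders").await;
}
```

The closure is the design choice worth copying: because each worker owns its `source_id` and `source_name`, the metric update cannot drift out of sync with the source it describes.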
1 change: 1 addition & 0 deletions src/meta/src/stream/stream_manager.rs
@@ -736,6 +736,7 @@ mod tests {
barrier_scheduler.clone(),
catalog_manager.clone(),
fragment_manager.clone(),
meta_metrics.clone(),
)
.await?,
);
