Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] #4075: Move metrics related functionality into MetricsReporter #4379

Merged
merged 2 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::{path::PathBuf, sync::Arc};
use clap::Parser;
use color_eyre::eyre::{eyre, Result, WrapErr};
use iroha_config::parameters::{actual::Root as Config, user::CliContext};
#[cfg(feature = "telemetry")]
use iroha_core::metrics::MetricsReporter;
use iroha_core::{
block_sync::{BlockSynchronizer, BlockSynchronizerHandle},
gossiper::{TransactionGossiper, TransactionGossiperHandle},
Expand All @@ -24,7 +26,7 @@ use iroha_core::{
try_read_snapshot, SnapshotMaker, SnapshotMakerHandle, TryReadError as TryReadSnapshotError,
},
state::{State, StateReadOnly, World},
sumeragi::{GenesisWithPubKey, SumeragiHandle, SumeragiStartArgs},
sumeragi::{GenesisWithPubKey, SumeragiHandle, SumeragiMetrics, SumeragiStartArgs},
IrohaNetwork,
};
use iroha_data_model::prelude::*;
Expand Down Expand Up @@ -214,6 +216,7 @@ impl Iroha {
);

let kura = Kura::new(&config.kura)?;
let kura_thread_handler = Kura::start(Arc::clone(&kura));
let live_query_store_handle = LiveQueryStore::from_config(config.live_query_store).start();

let block_count = kura.init()?;
Expand Down Expand Up @@ -254,7 +257,13 @@ impl Iroha {
#[cfg(feature = "telemetry")]
Self::start_telemetry(&logger, &config).await?;

let kura_thread_handler = Kura::start(Arc::clone(&kura));
#[cfg(feature = "telemetry")]
let metrics_reporter = MetricsReporter::new(
Arc::clone(&state),
network.clone(),
kura.clone(),
queue.clone(),
);

let start_args = SumeragiStartArgs {
sumeragi_config: config.sumeragi.clone(),
Expand All @@ -269,6 +278,10 @@ impl Iroha {
public_key: config.genesis.public_key().clone(),
},
block_count,
sumeragi_metrics: SumeragiMetrics {
dropped_messages: metrics_reporter.metrics().dropped_messages.clone(),
view_changes: metrics_reporter.metrics().view_changes.clone(),
},
};
// Starting Sumeragi requires no async context enabled
let sumeragi = tokio::task::spawn_blocking(move || SumeragiHandle::start(start_args))
Expand Down Expand Up @@ -322,11 +335,11 @@ impl Iroha {
Arc::clone(&queue),
events_sender,
Arc::clone(&notify_shutdown),
#[cfg(feature = "telemetry")]
sumeragi.clone(),
live_query_store_handle,
Arc::clone(&kura),
Arc::clone(&state),
#[cfg(feature = "telemetry")]
metrics_reporter,
);

Self::spawn_config_updates_broadcasting(kiso.clone(), logger.clone());
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod executor;
pub mod gossiper;
pub mod kiso;
pub mod kura;
pub mod metrics;
pub mod query;
pub mod queue;
pub mod smartcontracts;
Expand Down
151 changes: 151 additions & 0 deletions core/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
//! Metrics and status reporting

use std::{sync::Arc, time::SystemTime};

use eyre::{Result, WrapErr as _};
use iroha_telemetry::metrics::Metrics;
use parking_lot::Mutex;
use storage::storage::StorageReadOnly;

use crate::{
kura::Kura,
queue::Queue,
state::{State, StateReadOnly, WorldReadOnly},
IrohaNetwork,
};

/// Responsible for collecting and updating metrics
///
/// Bundles handles to the node subsystems that `update_metrics` reads from
/// together with the [`Metrics`] value it writes to.
#[derive(Clone)]
pub struct MetricsReporter {
/// State handle; `update_metrics` takes a view of it to read the height, the genesis
/// timestamp, the pending transaction amounts and the domains.
state: Arc<State>,
/// Network handle, queried for the number of online peers.
network: IrohaNetwork,
/// Block storage, used to look up blocks by height when counting transactions.
kura: Arc<Kura>,
/// Transaction queue, queried for its current length.
queue: Arc<Queue>,
/// Metrics updated by `update_metrics` and exposed through `metrics`.
metrics: Metrics,
/// Latest observed and processed height by metrics reporter
///
/// Kept behind `Arc<Mutex<_>>`, so clones of the reporter share the same height.
latest_block_height: Arc<Mutex<u64>>,
}

impl MetricsReporter {
    /// Construct [`Self`]
    ///
    /// The reporter starts with default [`Metrics`] and a processed block height of `0`,
    /// so the first [`Self::update_metrics`] call starts looking up blocks from height `1`.
    pub fn new(
        state: Arc<State>,
        network: IrohaNetwork,
        kura: Arc<Kura>,
        queue: Arc<Queue>,
    ) -> Self {
        Self {
            state,
            network,
            queue,
            kura,
            metrics: Metrics::default(),
            latest_block_height: Arc::new(Mutex::new(0)),
        }
    }

    /// Update the metrics on the state.
    ///
    /// Accounts for every block that was committed since the previous call (transaction
    /// counters and block height), drains the pending transaction amounts into the histogram
    /// and refreshes the uptime, connected peers, domains, accounts and queue size gauges.
    ///
    /// # Errors
    /// - Domains fail to compose
    ///
    /// # Panics
    /// - If the number of online peers does not fit into `u64`
    /// - If the system clock is set before the UNIX epoch
    /// - If the uptime since genesis in milliseconds does not fit into `u64`
    #[allow(clippy::cast_precision_loss)]
    pub fn update_metrics(&self) -> Result<()> {
        let online_peers_count: u64 = self
            .network
            .online_peers(
                #[allow(clippy::disallowed_types)]
                std::collections::HashSet::len,
            )
            .try_into()
            .expect("casting usize to u64");

        let state_view = self.state.view();

        // The guard lives until the end of the function, so concurrent calls on clones of
        // this reporter are serialized and no block is counted twice.
        let mut latest_block_height = self.latest_block_height.lock();

        let mut block_index = *latest_block_height;
        while block_index < state_view.height() {
            // The block may not be available in kura yet; it is picked up by a later call.
            let Some(block) = self.kura.get_block_by_height(block_index + 1) else {
                break;
            };
            block_index += 1;

            let mut block_txs_accepted = 0;
            let mut block_txs_rejected = 0;
            for tx in block.transactions() {
                if tx.error.is_none() {
                    block_txs_accepted += 1;
                } else {
                    block_txs_rejected += 1;
                }
            }

            self.metrics
                .txs
                .with_label_values(&["accepted"])
                .inc_by(block_txs_accepted);
            self.metrics
                .txs
                .with_label_values(&["rejected"])
                .inc_by(block_txs_rejected);
            self.metrics
                .txs
                .with_label_values(&["total"])
                .inc_by(block_txs_accepted + block_txs_rejected);
            self.metrics.block_height.inc();
        }
        *latest_block_height = block_index;

        // Take the accumulated amounts out, leaving an empty buffer behind; the lock on the
        // buffer is released at the end of this statement, before the amounts are observed.
        let new_tx_amounts = core::mem::take(&mut *state_view.new_tx_amounts.lock());

        for amount in &new_tx_amounts {
            self.metrics.tx_amounts.observe(*amount);
        }

        if let Some(timestamp) = state_view.genesis_timestamp() {
            let curr_time = SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("Failed to get the current system time");

            // Saturate to zero instead of panicking when the local clock is behind the
            // genesis timestamp (e.g. because of clock skew between peers).
            let uptime = curr_time.saturating_sub(timestamp);

            // this will overflow in 584942417years.
            self.metrics.uptime_since_genesis_ms.set(
                uptime
                    .as_millis()
                    .try_into()
                    .expect("Timestamp should fit into u64"),
            );
        }

        self.metrics.connected_peers.set(online_peers_count);

        self.metrics
            .domains
            .set(state_view.world().domains().len() as u64);
        for domain in state_view.world().domains_iter() {
            self.metrics
                .accounts
                .get_metric_with_label_values(&[domain.id.name.as_ref()])
                .wrap_err("Failed to compose domains")?
                .set(domain.accounts.len() as u64);
        }

        self.metrics.queue_size.set(self.queue.tx_len() as u64);

        Ok(())
    }

    /// Access node metrics.
    pub fn metrics(&self) -> &Metrics {
        &self.metrics
    }
}
4 changes: 4 additions & 0 deletions core/src/sumeragi/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub struct Sumeragi {
/// sumeragi is more dependent on the code that is internal to the
/// subsystem.
pub transaction_cache: Vec<AcceptedTransaction>,
/// Metrics for reporting number of view changes in current round
pub view_changes_metric: iroha_telemetry::metrics::ViewChangesGauge,
}

#[allow(clippy::missing_fields_in_debug)]
Expand Down Expand Up @@ -919,6 +921,7 @@ pub(crate) fn run(
&mut last_view_change_time,
&mut view_change_time,
);
sumeragi.view_changes_metric.set(old_view_change_index);

if let Some(message) = {
let (msg, sleep) =
Expand Down Expand Up @@ -999,6 +1002,7 @@ pub(crate) fn run(
&mut last_view_change_time,
&mut view_change_time,
);
sumeragi.view_changes_metric.set(old_view_change_index);

sumeragi.process_message_independent(
&state,
Expand Down
Loading
Loading