Skip to content

Commit

Permalink
feat: add prometheus metrics (#490)
Browse files Browse the repository at this point in the history
  • Loading branch information
ppca authored Mar 14, 2024
1 parent df3761d commit e729a9a
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 7 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ mpc-contract = { path = "../contract" }
mpc-keys = { path = "../keys" }

itertools = "0.12.0"
prometheus = { version = "0.13.3"}
once_cell = "1.13.1"

[dev-dependencies]
itertools = "0.12.0"
5 changes: 5 additions & 0 deletions node/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use near_lake_primitives::actions::ActionMetaDataExt;
use near_lake_primitives::{receipts::ExecutionStatus, AccountId};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;

/// Configures indexer.
Expand Down Expand Up @@ -120,7 +121,11 @@ async fn handle_block(
epsilon,
delta,
entropy,
time_added: Instant::now(),
});
crate::metrics::NUM_SIGN_REQUESTS
.with_label_values(&[&ctx.gcp_service.account_id.to_string()])
.inc();
drop(queue);
}
}
Expand Down
1 change: 1 addition & 0 deletions node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod gcp;
pub mod http_client;
pub mod indexer;
pub mod kdf;
pub mod metrics;
pub mod protocol;
pub mod rpc_client;
pub mod storage;
Expand Down
80 changes: 80 additions & 0 deletions node/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use once_cell::sync::Lazy;
pub use prometheus::{
self, core::MetricVec, core::MetricVecBuilder, exponential_buckets, linear_buckets, Counter,
Encoder, Gauge, GaugeVec, Histogram, HistogramOpts, HistogramVec, IntCounter, IntCounterVec,
IntGauge, IntGaugeVec, Opts, Result, TextEncoder,
};

/// Gauge vector exported as `multichain_node_is_up`, labelled by `node_account_id`.
///
/// Registered with the default prometheus registry on first access; the
/// `unwrap` panics if registration fails (e.g. the name is already taken).
pub(crate) static NODE_RUNNING: Lazy<IntGaugeVec> = Lazy::new(|| {
    let name = "multichain_node_is_up";
    let help = "whether the multichain signer node is up and running";
    let label_names = ["node_account_id"];
    try_create_int_gauge_vec(name, help, &label_names).unwrap()
});

/// Gauge vector exported as `multichain_sign_requests_count`, labelled by
/// `node_account_id`.
///
/// Registered with the default prometheus registry on first access; the
/// `unwrap` panics if registration fails.
///
/// NOTE(review): this looks like a monotonically increasing count, for which
/// an `IntCounterVec` may be the better fit — confirm before changing, since
/// that alters the exported metric type.
pub(crate) static NUM_SIGN_REQUESTS: Lazy<IntGaugeVec> = Lazy::new(|| {
    let name = "multichain_sign_requests_count";
    let help = "number of multichain sign requests, marked by sign requests indexed";
    let label_names = ["node_account_id"];
    try_create_int_gauge_vec(name, help, &label_names).unwrap()
});

/// Gauge vector exported as `multichain_sign_requests_success`, labelled by
/// `node_account_id`.
///
/// Registered with the default prometheus registry on first access; the
/// `unwrap` panics if registration fails.
///
/// NOTE(review): this looks like a monotonically increasing count, for which
/// an `IntCounterVec` may be the better fit — confirm before changing, since
/// that alters the exported metric type.
pub(crate) static NUM_SIGN_SUCCESS: Lazy<IntGaugeVec> = Lazy::new(|| {
    let name = "multichain_sign_requests_success";
    let help = "number of successful multichain sign requests, marked by publish()";
    let label_names = ["node_account_id"];
    try_create_int_gauge_vec(name, help, &label_names).unwrap()
});

/// Histogram vector exported as `multichain_sign_latency_sec`, labelled by
/// `node_account_id`.
///
/// Uses 20 exponential buckets starting at 0.001 with a growth factor of 2.0.
/// Registered with the default prometheus registry on first access; either
/// `unwrap` panics if bucket construction or registration fails.
pub(crate) static SIGN_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
    let name = "multichain_sign_latency_sec";
    let help = "Latency of multichain signing, start from indexing sign request, end when publish() called.";
    let label_names = ["node_account_id"];
    let bucket_bounds = exponential_buckets(0.001, 2.0, 20).unwrap();
    try_create_histogram_vec(name, help, &label_names, Some(bucket_bounds)).unwrap()
});

/// Attempts to create an `IntGaugeVec` and register it with the default
/// prometheus registry.
///
/// Returns `Err` if `name` does not start with `multichain_`, if the gauge
/// cannot be constructed from the given options and labels, or if the
/// registry rejects it.
pub fn try_create_int_gauge_vec(name: &str, help: &str, labels: &[&str]) -> Result<IntGaugeVec> {
    check_metric_multichain_prefix(name)?;
    let gauge_vec = IntGaugeVec::new(Opts::new(name, help), labels)?;
    // The registry takes ownership of its own boxed handle; the caller keeps
    // the original to record values through.
    let collector = Box::new(gauge_vec.clone());
    prometheus::register(collector)?;
    Ok(gauge_vec)
}

/// Attempts to create a `HistogramVec` and register it with the default
/// prometheus registry.
///
/// When `buckets` is `None`, the bucket boundaries from `HistogramOpts::new`
/// are left untouched. Returns `Err` if `name` does not start with
/// `multichain_`, if the histogram cannot be constructed, or if the registry
/// rejects it.
pub fn try_create_histogram_vec(
    name: &str,
    help: &str,
    labels: &[&str],
    buckets: Option<Vec<f64>>,
) -> Result<HistogramVec> {
    check_metric_multichain_prefix(name)?;
    let base_opts = HistogramOpts::new(name, help);
    let opts = match buckets {
        Some(bounds) => base_opts.buckets(bounds),
        None => base_opts,
    };
    let histogram_vec = HistogramVec::new(opts, labels)?;
    // The registry takes ownership of its own boxed handle; the caller keeps
    // the original to record observations through.
    let collector = Box::new(histogram_vec.clone());
    prometheus::register(collector)?;
    Ok(histogram_vec)
}

fn check_metric_multichain_prefix(name: &str) -> Result<()> {
if name.starts_with("multichain_") {
Ok(())
} else {
Err(prometheus::Error::Msg(format!(
"Metrics are expected to start with 'multichain_', got {}",
name
)))
}
}
9 changes: 8 additions & 1 deletion node/src/protocol/cryptography.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ impl CryptographicProtocol for RunningState {
my_request.msg_hash,
my_request.epsilon,
my_request.delta,
my_request.time_added,
)?;
}
drop(sign_queue);
Expand All @@ -368,8 +369,14 @@ impl CryptographicProtocol for RunningState {
let info = self.participants.get(&p).unwrap();
messages.push(info.clone(), MpcMessage::Signature(msg));
}
let my_account_id = &self.fetch_participant(&ctx.me().await)?.account_id;
signature_manager
.publish(ctx.rpc_client(), ctx.signer(), ctx.mpc_contract_id())
.publish(
ctx.rpc_client(),
ctx.signer(),
ctx.mpc_contract_id(),
my_account_id,
)
.await?;
drop(signature_manager);
if let Err(err) = messages
Expand Down
6 changes: 5 additions & 1 deletion node/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,11 @@ impl MpcSignProtocol {
}

pub async fn run(mut self) -> anyhow::Result<()> {
let _span = tracing::info_span!("running", my_account_id = self.ctx.account_id.to_string());
let my_account_id = self.ctx.account_id.to_string();
let _span = tracing::info_span!("running", my_account_id);
crate::metrics::NODE_RUNNING
.with_label_values(&[&my_account_id])
.set(1);
let mut queue = MpcMessageQueue::default();
loop {
tracing::debug!("trying to advance mpc recovery protocol");
Expand Down
26 changes: 21 additions & 5 deletions node/src/protocol/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct SignRequest {
pub epsilon: Scalar,
pub delta: Scalar,
pub entropy: [u8; 32],
pub time_added: Instant,
}

#[derive(Default)]
Expand Down Expand Up @@ -107,6 +108,7 @@ impl SignatureGenerator {
msg_hash: [u8; 32],
epsilon: Scalar,
delta: Scalar,
timestamp: Instant,
) -> Self {
Self {
protocol,
Expand All @@ -115,7 +117,7 @@ impl SignatureGenerator {
msg_hash,
epsilon,
delta,
timestamp: Instant::now(),
timestamp,
}
}

Expand Down Expand Up @@ -147,8 +149,8 @@ pub struct SignatureManager {
/// Failed signatures awaiting to be retried.
failed_generators: VecDeque<(CryptoHash, FailedGenerator)>,
/// Generated signatures assigned to the current node that are yet to be published.
signatures: Vec<(CryptoHash, [u8; 32], FullSignature<Secp256k1>)>,

/// Vec<(receipt_id, msg_hash, timestamp, output)>
signatures: Vec<(CryptoHash, [u8; 32], Instant, FullSignature<Secp256k1>)>,
participants: Vec<Participant>,
me: Participant,
public_key: PublicKey,
Expand Down Expand Up @@ -187,6 +189,7 @@ impl SignatureManager {
msg_hash: [u8; 32],
epsilon: Scalar,
delta: Scalar,
time_added: Instant,
) -> Result<SignatureGenerator, InitializationError> {
let PresignOutput { big_r, k, sigma } = presignature.output;
// TODO: Check whether it is okay to use invert_vartime instead
Expand All @@ -209,6 +212,7 @@ impl SignatureManager {
msg_hash,
epsilon,
delta,
time_added,
))
}

Expand All @@ -223,13 +227,15 @@ impl SignatureManager {
failed_generator.msg_hash,
failed_generator.epsilon,
failed_generator.delta,
failed_generator.timestamp,
)
.unwrap();
self.generators.insert(hash, generator);
Some(())
}

/// Starts a new signature generation protocol.
#[allow(clippy::too_many_arguments)]
pub fn generate(
&mut self,
receipt_id: CryptoHash,
Expand All @@ -238,6 +244,7 @@ impl SignatureManager {
msg_hash: [u8; 32],
epsilon: Scalar,
delta: Scalar,
time_added: Instant,
) -> Result<(), InitializationError> {
tracing::info!(%receipt_id, "starting protocol to generate a new signature");
let generator = Self::generate_internal(
Expand All @@ -249,6 +256,7 @@ impl SignatureManager {
msg_hash,
epsilon,
delta,
time_added,
)?;
self.generators.insert(receipt_id, generator);
Ok(())
Expand Down Expand Up @@ -287,6 +295,7 @@ impl SignatureManager {
msg_hash,
epsilon,
delta,
Instant::now(),
)?;
let generator = entry.insert(generator);
Ok(Some(&mut generator.protocol))
Expand Down Expand Up @@ -367,7 +376,7 @@ impl SignatureManager {
);
if generator.proposer == self.me {
self.signatures
.push((*receipt_id, generator.msg_hash, output));
.push((*receipt_id, generator.msg_hash, generator.timestamp, output));
}
// Do not retain the protocol
return false;
Expand All @@ -383,8 +392,9 @@ impl SignatureManager {
rpc_client: &near_fetch::Client,
signer: &T,
mpc_contract_id: &AccountId,
my_account_id: &AccountId,
) -> Result<(), near_fetch::Error> {
for (receipt_id, payload, signature) in self.signatures.drain(..) {
for (receipt_id, payload, time_added, signature) in self.signatures.drain(..) {
// TODO: Figure out how to properly serialize the signature
// let r_s = signature.big_r.x().concat(signature.s.to_bytes());
// let tag =
Expand All @@ -411,6 +421,12 @@ impl SignatureManager {
)],
)
.await?;
crate::metrics::NUM_SIGN_SUCCESS
.with_label_values(&[my_account_id])
.inc();
crate::metrics::SIGN_LATENCY
.with_label_values(&[my_account_id])
.observe(time_added.elapsed().as_secs_f64());
tracing::info!(%receipt_id, big_r = signature.big_r.to_base58(), s = ?signature.s, status = ?response.status, "published signature response");
}
Ok(())
Expand Down
31 changes: 31 additions & 0 deletions node/src/web/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use self::error::Error;
use crate::protocol::message::SignedMessage;
use crate::protocol::{MpcMessage, NodeState};
use crate::web::error::Result;
use anyhow::Context;
use axum::http::StatusCode;
use axum::routing::{get, post};
use axum::{Extension, Json, Router};
Expand All @@ -13,6 +14,7 @@ use mpc_keys::hpke::{self, Ciphered};
use near_crypto::InMemorySigner;
use near_primitives::transaction::{Action, FunctionCallAction};
use near_primitives::types::AccountId;
use prometheus::{Encoder, TextEncoder};
use serde::{Deserialize, Serialize};
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::{mpsc::Sender, RwLock};
Expand Down Expand Up @@ -57,6 +59,7 @@ pub async fn run(
.route("/msg", post(msg))
.route("/join", post(join))
.route("/state", get(state))
.route("/metrics", get(metrics))
.layer(Extension(Arc::new(axum_state)));

let addr = SocketAddr::from(([0, 0, 0, 0], port));
Expand Down Expand Up @@ -187,3 +190,31 @@ async fn state(Extension(state): Extension<Arc<AxumState>>) -> Result<Json<State
}
}
}

#[tracing::instrument(level = "debug", skip_all)]
async fn metrics() -> (StatusCode, String) {
let grab_metrics = || {
let encoder = TextEncoder::new();
let mut buffer = vec![];
encoder
.encode(&prometheus::gather(), &mut buffer)
.with_context(|| "failed to encode metrics")?;

let response = String::from_utf8(buffer.clone())
.with_context(|| "failed to convert bytes to string")?;
buffer.clear();

Ok::<String, anyhow::Error>(response)
};

match grab_metrics() {
Ok(response) => (StatusCode::OK, response),
Err(err) => {
tracing::error!("failed to generate prometheus metrics: {err}");
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to generate prometheus metrics".to_string(),
)
}
}
}

0 comments on commit e729a9a

Please sign in to comment.