Skip to content

Commit

Permalink
chore: use new ceramic one metric for validation
Browse files Browse the repository at this point in the history
made count optional so we can tell if we should fallback to the old one
  • Loading branch information
dav1do committed Jun 6, 2024
1 parent cf9be35 commit 09312e2
Showing 1 changed file with 43 additions and 13 deletions.
56 changes: 43 additions & 13 deletions runner/src/simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use crate::{

// FIXME: is it worth attaching metrics to the peer info?
const IPFS_SERVICE_METRICS_PORT: &str = "9465";
const EVENT_SYNC_METRIC_NAME: &str = "ceramic_store_key_value_insert_count_total";
const EVENT_SYNC_METRIC_NAME_OLD: &str = "ceramic_store_key_value_insert_count_total";
const EVENT_SYNC_METRIC_NAME: &str = "ceramic_one_store_key_value_insert_count_total";
const ANCHOR_REQUEST_MIDS_KEY: &str = "anchor_mids";
const CAS_ANCHOR_REQUEST_KEY: &str = "anchor_requests";

Expand Down Expand Up @@ -255,12 +256,12 @@ impl MetricsCollection for PromMetricCollector {
#[derive(Debug, Clone)]
struct PeerRequestMetricInfo {
pub name: String,
pub count: u64,
pub count: Option<u64>,
pub runtime: Option<u64>,
}

impl PeerRequestMetricInfo {
pub fn new(name: String, count: u64, runtime: Option<u64>) -> Self {
pub fn new(name: String, count: Option<u64>, runtime: Option<u64>) -> Self {
Self {
name,
count,
Expand All @@ -274,7 +275,7 @@ impl PeerRequestMetricInfo {
warn!("Runtime of 0 seconds is invalid for RPS calculation");
0.0
}
Some(runtime) => self.count as f64 / runtime as f64,
Some(runtime) => self.count.unwrap_or(0) as f64 / runtime as f64,
None => {
warn!("Runtime is missing for RPS calculation");
0.0
Expand Down Expand Up @@ -322,7 +323,12 @@ fn final_peer_req_metric_info(
if before.name == current.name {
Ok(PeerRequestMetricInfo::new(
before.name.to_owned(),
current.count - before.count,
Some(
current
.count
.unwrap_or(0)
.saturating_sub(before.count.unwrap_or(0)),
),
Some(run_time_seconds),
))
} else {
Expand Down Expand Up @@ -471,13 +477,13 @@ impl ScenarioState {
if let Some(m) = metric {
results.push(PeerRequestMetricInfo::new(
parse_base_url_to_pod(&addr)?,
m,
Some(m),
None,
));
} else {
results.push(PeerRequestMetricInfo::new(
parse_base_url_to_pod(&addr)?,
0, // Use 0 as the count if the metric is not found
None,
None,
));
}
Expand Down Expand Up @@ -507,10 +513,20 @@ impl ScenarioState {
Ok(())
}
Scenario::ReconEventSync => {
let peers = self
let mut peers = self
.get_peers_counter_metric(EVENT_SYNC_METRIC_NAME, IPFS_SERVICE_METRICS_PORT)
.await?;

// TEMP: fall back to old metric if new one is 0 jic we're on an old version of c1
if peers.iter().all(|p| p.count.is_none()) {
peers = self
.get_peers_counter_metric(
EVENT_SYNC_METRIC_NAME_OLD,
IPFS_SERVICE_METRICS_PORT,
)
.await?;
}

self.before_metrics = Some(peers);
Ok(())
}
Expand Down Expand Up @@ -553,7 +569,7 @@ impl ScenarioState {
for (name, count) in res {
let metric = PeerRequestMetricInfo::new(
name.clone(),
count as u64,
Some(count as u64),
Some(metrics.duration as u64),
);
let rps = metric.rps();
Expand Down Expand Up @@ -770,14 +786,28 @@ impl ScenarioState {
| Scenario::CeramicNewStreamsBenchmark => (CommandResult::Success, None),
Scenario::ReconEventSync => {
let default_rate = 300;
let metric_name = EVENT_SYNC_METRIC_NAME;
let mut metric_name = EVENT_SYNC_METRIC_NAME;

let peer_req_cnts = match self
.get_peers_counter_metric(metric_name, IPFS_SERVICE_METRICS_PORT)
.await
.map_err(CommandResult::Failure)
{
Ok(v) => v,
Ok(v) => {
if v.iter().all(|p| p.count.is_none()) {
metric_name = EVENT_SYNC_METRIC_NAME_OLD;
match self
.get_peers_counter_metric(metric_name, IPFS_SERVICE_METRICS_PORT)
.await
.map_err(CommandResult::Failure)
{
Ok(v) => v,
Err(e) => return (e, None),
}
} else {
v
}
}
Err(e) => return (e, None),
};

Expand Down Expand Up @@ -812,15 +842,15 @@ impl ScenarioState {
let errors = peer_metrics.iter().flat_map(|p| {
let rps = p.rps();
if rps < threshold {
warn!(current_req_cnt=%p.count, run_time_seconds=?p.runtime, %threshold, %rps, "rps less than threshold");
warn!(current_req_cnt=?p.count, run_time_seconds=?p.runtime, %threshold, %rps, "rps less than threshold");
Some(
format!(
"Peer {} RPS less than threshold: {} < {}",
p.name, rps, threshold
),
)
} else {
info!(current_req_cnt=%p.count, before=%p.count, run_time_seconds=?p.runtime, %threshold, %rps, "success! peer {} over the threshold", p.name);
info!(current_req_cnt=?p.count, before=?p.count, run_time_seconds=?p.runtime, %threshold, %rps, "success! peer {} over the threshold", p.name);
None
}
}
Expand Down

0 comments on commit 09312e2

Please sign in to comment.