Skip to content

Commit

Permalink
Add metric to measure the time it takes to gather enough assignments (#4587)
Browse files Browse the repository at this point in the history

Adds a metric to understand, with high granularity, how many assignment tranches
are triggered before we conclude that we have enough assignments.

This metric is important because the triggering of an assignment creates
a lot of work in the system for approving the candidate and gossiping
the necessary messages.

---------

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
Co-authored-by: ordian <write@reusable.software>
  • Loading branch information
alexggh and ordian authored May 28, 2024
1 parent 650b124 commit ad22fa6
Show file tree
Hide file tree
Showing 3 changed files with 406 additions and 6 deletions.
6 changes: 5 additions & 1 deletion polkadot/node/core/approval-voting/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ pub(crate) mod tests {
use super::*;
use crate::{
approval_db::common::{load_block_entry, DbBackend},
RuntimeInfo, RuntimeInfoConfig,
RuntimeInfo, RuntimeInfoConfig, MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
};
use ::test_helpers::{dummy_candidate_receipt, dummy_hash};
use assert_matches::assert_matches;
Expand All @@ -622,6 +622,7 @@ pub(crate) mod tests {
node_features::FeatureIndex, ExecutorParams, Id as ParaId, IndexedVec, NodeFeatures,
SessionInfo, ValidatorId, ValidatorIndex,
};
use schnellru::{ByLength, LruMap};
pub(crate) use sp_consensus_babe::{
digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest},
AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch,
Expand Down Expand Up @@ -658,6 +659,9 @@ pub(crate) mod tests {
clock: Box::new(MockClock::default()),
assignment_criteria: Box::new(MockAssignmentCriteria::default()),
spans: HashMap::new(),
per_block_assignments_gathering_times: LruMap::new(ByLength::new(
MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
)),
}
}

Expand Down
170 changes: 166 additions & 4 deletions polkadot/node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ use sc_keystore::LocalKeystore;
use sp_application_crypto::Pair;
use sp_consensus::SyncOracle;
use sp_consensus_slots::Slot;
use std::time::Instant;

// The maximum number of blocks for which we keep assignment-gathering timestamps.
// Normally this bound is never reached because the data is pruned on finalization,
// but it also ensures the map cannot grow unnecessarily large.
const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100;

use futures::{
channel::oneshot,
Expand Down Expand Up @@ -182,6 +188,14 @@ struct MetricsInner {
time_recover_and_approve: prometheus::Histogram,
candidate_signatures_requests_total: prometheus::Counter<prometheus::U64>,
unapproved_candidates_in_unfinalized_chain: prometheus::Gauge<prometheus::U64>,
// The time it takes in each stage to gather enough assignments.
// We defined a `stage` as being the entire process of gathering enough assignments to
// be able to approve a candidate:
// E.g:
// - Stage 0: We wait for the needed_approvals assignments to be gathered.
// - Stage 1: We wait for enough tranches to cover all no-shows in stage 0.
// - Stage 2: We wait for enough tranches to cover all no-shows of stage 1.
assignments_gathering_time_by_stage: prometheus::HistogramVec,
}

/// Approval Voting metrics.
Expand Down Expand Up @@ -302,6 +316,20 @@ impl Metrics {
metrics.unapproved_candidates_in_unfinalized_chain.set(count as u64);
}
}

/// Observe how long (in milliseconds) it took to gather enough assignments in
/// the given `stage`.
///
/// Stages `>= 10` are all reported under the single label `"inf"`: we cap the
/// label cardinality so we do not put unnecessary pressure on the metrics
/// infrastructure. Reaching stage 10 already implies a finalization lag of
/// `10 * no_show_slots`, so this granularity is more than enough.
pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) {
    if let Some(metrics) = &self.0 {
        // Only allocate the label string for stages we actually report.
        let stage_string;
        let label = if stage < 10 {
            stage_string = stage.to_string();
            stage_string.as_str()
        } else {
            "inf"
        };
        metrics
            .assignments_gathering_time_by_stage
            .with_label_values(&[label])
            .observe(elapsed_as_millis as f64);
    }
}
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -431,6 +459,17 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
assignments_gathering_time_by_stage: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"polkadot_parachain_assignments_gather_time_by_stage_ms",
"The time in ms it takes for each stage to gather enough assignments needed for approval",
)
.buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]),
&["stage"],
)?,
registry,
)?,
};

Ok(Metrics(Some(metrics)))
Expand Down Expand Up @@ -788,6 +827,28 @@ struct State {
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
spans: HashMap<Hash, jaeger::PerLeafSpan>,
// Per block, candidate records about how long we take until we gather enough
// assignments, this is relevant because it gives us a good idea about how many
// tranches we trigger and why.
per_block_assignments_gathering_times:
LruMap<BlockNumber, HashMap<(Hash, CandidateHash), AssignmentGatheringRecord>>,
}

/// Bookkeeping for how long we have been gathering assignments for a candidate.
#[derive(Debug, Clone, PartialEq, Eq)]
struct AssignmentGatheringRecord {
    /// The stage we are in.
    ///
    /// Candidate assignment gathering goes in stages: first we wait for
    /// `needed_approvals` (stage 0); then, if we have no-shows, we move into
    /// stage 1 and wait for enough tranches to cover all no-shows, and so on.
    stage: usize,
    /// When the current stage started, or `None` once the stage has completed.
    stage_start: Option<Instant>,
}

impl Default for AssignmentGatheringRecord {
    fn default() -> Self {
        Self { stage: 0, stage_start: Some(Instant::now()) }
    }
}

#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
Expand Down Expand Up @@ -893,6 +954,96 @@ impl State {
},
}
}

/// Record that we started (or re-started) gathering assignments for a candidate.
///
/// If no record exists yet for `(block_hash, candidate)`, a fresh one is created in
/// stage 0 with its timer running. If a record exists but its timer was cleared by
/// `mark_gathered_enough_assignments` (the previous stage completed and new no-shows
/// pushed us back into gathering), we advance to the next stage and restart the
/// timer. A record whose timer is already running is left untouched.
fn mark_begining_of_gathering_assignments(
    &mut self,
    block_number: BlockNumber,
    block_hash: Hash,
    candidate: CandidateHash,
) {
    if let Some(records) = self
        .per_block_assignments_gathering_times
        .get_or_insert(block_number, HashMap::new)
    {
        let record = records.entry((block_hash, candidate)).or_default();
        // `stage_start` is `None` only after the previous stage finished, so
        // starting to gather again means we have entered the next stage.
        if record.stage_start.is_none() {
            record.stage += 1;
            gum::debug!(
                target: LOG_TARGET,
                stage = ?record.stage,
                ?block_hash,
                ?candidate,
                "Started a new assignment gathering stage",
            );
            record.stage_start = Some(Instant::now());
        }
    }
}

/// Record that enough assignments have been gathered for a candidate and return
/// a snapshot of its gathering record.
///
/// Clears the stored `stage_start` (the returned snapshot keeps it), so a later
/// call to `mark_begining_of_gathering_assignments` will move the candidate into
/// the next stage. If no record is found, a stage-0 snapshot with no start time
/// is returned.
fn mark_gathered_enough_assignments(
    &mut self,
    block_number: BlockNumber,
    block_hash: Hash,
    candidate: CandidateHash,
) -> AssignmentGatheringRecord {
    let mut stage = 0;
    let mut stage_start = None;
    if let Some(record) = self
        .per_block_assignments_gathering_times
        .get(&block_number)
        .and_then(|entry| entry.get_mut(&(block_hash, candidate)))
    {
        stage = record.stage;
        // Take the start time out of the stored record: the stage is complete.
        stage_start = record.stage_start.take();
    }
    AssignmentGatheringRecord { stage, stage_start }
}

/// Prune assignment-gathering records for all blocks with a number strictly
/// below `remove_lower_than` (i.e. already-finalized blocks).
///
/// Relies on the LRU eviction order matching insertion order, so popping stops
/// at the first block that is still unfinalized.
fn cleanup_assignments_gathering_timestamp(&mut self, remove_lower_than: BlockNumber) {
    loop {
        match self.per_block_assignments_gathering_times.peek_oldest() {
            Some((block_number, _)) if *block_number < remove_lower_than => {
                self.per_block_assignments_gathering_times.pop_oldest();
            },
            _ => break,
        }
    }
}

/// Track the assignment-gathering progress of a candidate based on its currently
/// required tranches, and emit the gathering-time metric once enough assignments
/// have been collected for the current stage.
fn observe_assignment_gathering_status(
    &mut self,
    metrics: &Metrics,
    required_tranches: &RequiredTranches,
    block_hash: Hash,
    block_number: BlockNumber,
    candidate_hash: CandidateHash,
) {
    // Threshold (in ms) above which we log that gathering took suspiciously long.
    const LONG_GATHERING_LOG_THRESHOLD_MILLIS: u128 = 6000;

    match required_tranches {
        // Still waiting for assignments: make sure a stage timer is running.
        RequiredTranches::All | RequiredTranches::Pending { .. } => {
            self.mark_begining_of_gathering_assignments(
                block_number,
                block_hash,
                candidate_hash,
            );
        },
        // Enough assignments for this stage: stop the timer and report the duration.
        RequiredTranches::Exact { .. } => {
            let time_to_gather =
                self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash);
            if let Some(gathering_started) = time_to_gather.stage_start {
                // Sample the elapsed time once, so the log condition and the
                // metric observation agree on the same value.
                let elapsed = gathering_started.elapsed();
                if elapsed.as_millis() > LONG_GATHERING_LOG_THRESHOLD_MILLIS {
                    gum::trace!(
                        target: LOG_TARGET,
                        ?block_hash,
                        ?candidate_hash,
                        "Long assignment gathering time",
                    );
                }
                metrics.observe_assignment_gathering_time(
                    time_to_gather.stage,
                    elapsed.as_millis() as usize,
                )
            }
        },
    }
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -942,6 +1093,9 @@ where
clock: subsystem.clock,
assignment_criteria,
spans: HashMap::new(),
per_block_assignments_gathering_times: LruMap::new(ByLength::new(
MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
)),
};

// `None` on start-up. Gets initialized/updated on leaf update
Expand Down Expand Up @@ -973,7 +1127,7 @@ where
subsystem.metrics.on_wakeup();
process_wakeup(
&mut ctx,
&state,
&mut state,
&mut overlayed_db,
&mut session_info_provider,
woken_block,
Expand Down Expand Up @@ -1632,6 +1786,7 @@ async fn handle_from_overseer<Context>(
// `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans
// accordingly.
wakeups.prune_finalized_wakeups(block_number, &mut state.spans);
state.cleanup_assignments_gathering_timestamp(block_number);

// // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans
// accordingly. let hash_set =
Expand Down Expand Up @@ -2478,7 +2633,7 @@ where

async fn check_and_import_approval<T, Sender>(
sender: &mut Sender,
state: &State,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
metrics: &Metrics,
Expand Down Expand Up @@ -2710,7 +2865,7 @@ impl ApprovalStateTransition {
// as necessary and schedules any further wakeups.
async fn advance_approval_state<Sender>(
sender: &mut Sender,
state: &State,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
metrics: &Metrics,
Expand Down Expand Up @@ -2761,6 +2916,13 @@ where
approval_entry,
status.required_tranches.clone(),
);
state.observe_assignment_gathering_status(
&metrics,
&status.required_tranches,
block_hash,
block_entry.block_number(),
candidate_hash,
);

// Check whether this is approved, while allowing a maximum
// assignment tick of `now - APPROVAL_DELAY` - that is, that
Expand Down Expand Up @@ -2941,7 +3103,7 @@ fn should_trigger_assignment(
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn process_wakeup<Context>(
ctx: &mut Context,
state: &State,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
relay_block: Hash,
Expand Down
Loading

0 comments on commit ad22fa6

Please sign in to comment.