Skip to content

Commit

Permalink
Make repair metrics less chatty (#9094)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed May 16, 2020
1 parent 3b526cc commit c7f8dba
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 36 deletions.
5 changes: 3 additions & 2 deletions archiver-lib/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use solana_core::{
gossip_service::GossipService,
packet::{limited_deserialize, PACKET_DATA_SIZE},
repair_service,
repair_service::{RepairService, RepairSlotRange, RepairStrategy},
repair_service::{RepairService, RepairSlotRange, RepairStats, RepairStrategy},
serve_repair::ServeRepair,
shred_fetch_stage::ShredFetchStage,
sigverify_stage::{DisabledSigVerifier, SigVerifyStage},
Expand Down Expand Up @@ -839,13 +839,14 @@ impl Archiver {
repair_service::MAX_REPAIR_LENGTH,
&repair_slot_range,
);
let mut repair_stats = RepairStats::default();
//iter over the repairs and send them
if let Ok(repairs) = repairs {
let reqs: Vec<_> = repairs
.into_iter()
.filter_map(|repair_request| {
serve_repair
.map_repair_request(&repair_request)
.map_repair_request(&repair_request, &mut repair_stats)
.map(|result| ((archiver_info.gossip, result), repair_request))
.ok()
})
Expand Down
46 changes: 44 additions & 2 deletions core/src/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,31 @@ use std::{
sync::{Arc, RwLock},
thread::sleep,
thread::{self, Builder, JoinHandle},
time::Duration,
time::{Duration, Instant},
};

#[derive(Default)]
pub struct RepairStatsGroup {
pub count: u64,
pub min: u64,
pub max: u64,
}

impl RepairStatsGroup {
pub fn update(&mut self, slot: u64) {
self.count += 1;
self.min = std::cmp::min(self.min, slot);
self.max = std::cmp::max(self.max, slot);
}
}

#[derive(Default)]
pub struct RepairStats {
pub shred: RepairStatsGroup,
pub highest_shred: RepairStatsGroup,
pub orphan: RepairStatsGroup,
}

pub const MAX_REPAIR_LENGTH: usize = 512;
pub const REPAIR_MS: u64 = 100;
pub const MAX_ORPHANS: usize = 5;
Expand Down Expand Up @@ -107,6 +129,8 @@ impl RepairService {
cluster_info,
);
}
let mut repair_stats = RepairStats::default();
let mut last_stats = Instant::now();
loop {
if exit.load(Ordering::Relaxed) {
break;
Expand Down Expand Up @@ -148,7 +172,7 @@ impl RepairService {
.into_iter()
.filter_map(|repair_request| {
serve_repair
.repair_request(&repair_request)
.repair_request(&repair_request, &mut repair_stats)
.map(|result| (result, repair_request))
.ok()
})
Expand All @@ -161,6 +185,24 @@ impl RepairService {
});
}
}
if last_stats.elapsed().as_secs() > 1 {
let repair_total = repair_stats.shred.count
+ repair_stats.highest_shred.count
+ repair_stats.orphan.count;
if repair_total > 0 {
datapoint_info!(
"serve_repair-repair",
("repair-total", repair_total, i64),
("shred-count", repair_stats.shred.count, i64),
("highest-shred-count", repair_stats.highest_shred.count, i64),
("orphan-count", repair_stats.orphan.count, i64),
("repair-highest-slot", repair_stats.highest_shred.max, i64),
("repair-orphan", repair_stats.orphan.max, i64),
);
}
repair_stats = RepairStats::default();
last_stats = Instant::now();
}
sleep(Duration::from_millis(REPAIR_MS));
}
}
Expand Down
98 changes: 71 additions & 27 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
cluster_info::{ClusterInfo, ClusterInfoError},
contact_info::ContactInfo,
packet::Packet,
repair_service::RepairStats,
result::{Error, Result},
};
use bincode::serialize;
Expand Down Expand Up @@ -46,6 +47,16 @@ impl RepairType {
}
}

#[derive(Default)]
pub struct ServeRepairStats {
pub total_packets: usize,
pub processed: usize,
pub self_repair: usize,
pub window_index: usize,
pub highest_window_index: usize,
pub orphan: usize,
}

/// Window protocol messages
#[derive(Serialize, Deserialize, Debug)]
enum RepairProtocol {
Expand Down Expand Up @@ -104,25 +115,22 @@ impl ServeRepair {
from_addr: &SocketAddr,
blockstore: Option<&Arc<Blockstore>>,
request: RepairProtocol,
stats: &mut ServeRepairStats,
) -> Option<Packets> {
let now = Instant::now();

//TODO verify from is signed
let my_id = me.read().unwrap().keypair.pubkey();
let from = Self::get_repair_sender(&request);
if from.id == my_id {
warn!(
"{}: Ignored received repair request from ME {}",
my_id, from.id,
);
inc_new_counter_debug!("serve_repair-handle-repair--eq", 1);
stats.self_repair += 1;
return None;
}

let (res, label) = {
match &request {
RepairProtocol::WindowIndex(from, slot, shred_index) => {
inc_new_counter_debug!("serve_repair-request-window-index", 1);
stats.window_index += 1;
(
Self::run_window_request(
recycler,
Expand All @@ -138,7 +146,7 @@ impl ServeRepair {
}

RepairProtocol::HighestWindowIndex(_, slot, highest_index) => {
inc_new_counter_debug!("serve_repair-request-highest-window-index", 1);
stats.highest_window_index += 1;
(
Self::run_highest_window_request(
recycler,
Expand All @@ -151,7 +159,7 @@ impl ServeRepair {
)
}
RepairProtocol::Orphan(_, slot) => {
inc_new_counter_debug!("serve_repair-request-orphan", 1);
stats.orphan += 1;
(
Self::run_orphan(
recycler,
Expand Down Expand Up @@ -186,6 +194,7 @@ impl ServeRepair {
requests_receiver: &PacketReceiver,
response_sender: &PacketSender,
max_packets: &mut usize,
stats: &mut ServeRepairStats,
) -> Result<()> {
//TODO cache connections
let timeout = Duration::new(1, 0);
Expand All @@ -202,7 +211,7 @@ impl ServeRepair {

let mut time = Measure::start("repair::handle_packets");
for reqs in reqs_v {
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender);
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
}
time.stop();
if total_packets >= *max_packets {
Expand All @@ -215,6 +224,31 @@ impl ServeRepair {
Ok(())
}

fn report_reset_stats(me: &Arc<RwLock<Self>>, stats: &mut ServeRepairStats) {
if stats.self_repair > 0 {
let my_id = me.read().unwrap().keypair.pubkey();
warn!(
"{}: Ignored received repair requests from ME: {}",
my_id, stats.self_repair,
);
inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair);
}

debug!(
"repair_listener: total_packets: {} passed: {}",
stats.total_packets, stats.processed
);

inc_new_counter_debug!("serve_repair-request-window-index", stats.window_index);
inc_new_counter_debug!(
"serve_repair-request-highest-window-index",
stats.highest_window_index
);
inc_new_counter_debug!("serve_repair-request-orphan", stats.orphan);

*stats = ServeRepairStats::default();
}

pub fn listen(
me: Arc<RwLock<Self>>,
blockstore: Option<Arc<Blockstore>>,
Expand All @@ -228,6 +262,8 @@ impl ServeRepair {
.name("solana-repair-listen".to_string())
.spawn(move || {
let mut max_packets = 1024;
let mut last_print = Instant::now();
let mut stats = ServeRepairStats::default();
loop {
let result = Self::run_listen(
&me,
Expand All @@ -236,6 +272,7 @@ impl ServeRepair {
&requests_receiver,
&response_sender,
&mut max_packets,
&mut stats,
);
match result {
Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
Expand All @@ -244,6 +281,10 @@ impl ServeRepair {
if exit.load(Ordering::Relaxed) {
return;
}
if last_print.elapsed().as_secs() > 2 {
Self::report_reset_stats(&me, &mut stats);
last_print = Instant::now();
}
thread_mem_usage::datapoint("solana-repair-listen");
}
})
Expand All @@ -256,6 +297,7 @@ impl ServeRepair {
blockstore: Option<&Arc<Blockstore>>,
packets: Packets,
response_sender: &PacketSender,
stats: &mut ServeRepairStats,
) {
// iter over the packets, collect pulls separately and process everything else
let allocated = thread_mem_usage::Allocatedp::default();
Expand All @@ -265,7 +307,9 @@ impl ServeRepair {
limited_deserialize(&packet.data[..packet.meta.size])
.into_iter()
.for_each(|request| {
let rsp = Self::handle_repair(me, recycler, &from_addr, blockstore, request);
stats.processed += 1;
let rsp =
Self::handle_repair(me, recycler, &from_addr, blockstore, request, stats);
if let Some(rsp) = rsp {
let _ignore_disconnect = response_sender.send(rsp);
}
Expand Down Expand Up @@ -295,7 +339,11 @@ impl ServeRepair {
Ok(out)
}

pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> {
pub fn repair_request(
&self,
repair_request: &RepairType,
repair_stats: &mut RepairStats,
) -> Result<(SocketAddr, Vec<u8>)> {
// find a peer that appears to be accepting replication and has the desired slot, as indicated
// by a valid tvu port location
let valid: Vec<_> = self
Expand All @@ -308,31 +356,27 @@ impl ServeRepair {
}
let n = thread_rng().gen::<usize>() % valid.len();
let addr = valid[n].serve_repair; // send the request to the peer's serve_repair port
let out = self.map_repair_request(repair_request)?;
let out = self.map_repair_request(repair_request, repair_stats)?;

Ok((addr, out))
}

pub fn map_repair_request(&self, repair_request: &RepairType) -> Result<Vec<u8>> {
pub fn map_repair_request(
&self,
repair_request: &RepairType,
repair_stats: &mut RepairStats,
) -> Result<Vec<u8>> {
match repair_request {
RepairType::Shred(slot, shred_index) => {
datapoint_debug!(
"serve_repair-repair",
("repair-slot", *slot, i64),
("repair-ix", *shred_index, i64)
);
repair_stats.shred.update(*slot);
Ok(self.window_index_request_bytes(*slot, *shred_index)?)
}
RepairType::HighestShred(slot, shred_index) => {
datapoint_info!(
"serve_repair-repair_highest",
("repair-highest-slot", *slot, i64),
("repair-highest-ix", *shred_index, i64)
);
repair_stats.highest_shred.update(*slot);
Ok(self.window_highest_index_request_bytes(*slot, *shred_index)?)
}
RepairType::Orphan(slot) => {
datapoint_info!("serve_repair-repair_orphan", ("repair-orphan", *slot, i64));
repair_stats.orphan.update(*slot);
Ok(self.orphan_bytes(*slot)?)
}
}
Expand Down Expand Up @@ -592,7 +636,7 @@ mod tests {
let me = ContactInfo::new_localhost(&Pubkey::new_rand(), timestamp());
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(me)));
let serve_repair = ServeRepair::new(cluster_info.clone());
let rv = serve_repair.repair_request(&RepairType::Shred(0, 0));
let rv = serve_repair.repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default());
assert_matches!(rv, Err(Error::ClusterInfoError(ClusterInfoError::NoPeers)));

let serve_repair_addr = socketaddr!([127, 0, 0, 1], 1243);
Expand All @@ -613,7 +657,7 @@ mod tests {
};
cluster_info.write().unwrap().insert_info(nxt.clone());
let rv = serve_repair
.repair_request(&RepairType::Shred(0, 0))
.repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default())
.unwrap();
assert_eq!(nxt.serve_repair, serve_repair_addr);
assert_eq!(rv.0, nxt.serve_repair);
Expand All @@ -640,7 +684,7 @@ mod tests {
while !one || !two {
//this randomly picks an option, so eventually it should pick both
let rv = serve_repair
.repair_request(&RepairType::Shred(0, 0))
.repair_request(&RepairType::Shred(0, 0), &mut RepairStats::default())
.unwrap();
if rv.0 == serve_repair_addr {
one = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6620,8 +6620,8 @@
"hide": false,
"measurement": "cluster_info-vote-count",
"orderByTime": "ASC",
"policy": "autogen",
"query": "SELECT sum(\"count\") FROM \"$testnet\".\"autogen\".\"banking_stage-buffered_packets\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"policy": "default",
"query": "SELECT last(\"repair-highest-slot\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"rawQuery": true,
"refId": "A",
"resultFormat": "time_series",
Expand Down Expand Up @@ -6658,7 +6658,7 @@
],
"orderByTime": "ASC",
"policy": "default",
"query": "SELECT sum(\"count\") FROM \"$testnet\".\"autogen\".\"banking_stage-forwarded_packets\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"query": "SELECT last(\"repair-highest-ix\") AS \"ix\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"rawQuery": true,
"refId": "B",
"resultFormat": "time_series",
Expand Down Expand Up @@ -6922,8 +6922,8 @@
],
"measurement": "cluster_info-vote-count",
"orderByTime": "ASC",
"policy": "autogen",
"query": "SELECT sum(\"recovered\") AS \"recovered\" FROM \"$testnet\".\"autogen\".\"blockstore-erasure\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval) FILL(0)",
"policy": "default",
"query": "SELECT last(\"repair-orphan\") AS \"slot\" FROM \"$testnet\".\"autogen\".\"serve_repair-repair\" WHERE host_id::tag =~ /$hostid/ AND $timeFilter GROUP BY time($__interval)",
"rawQuery": true,
"refId": "B",
"resultFormat": "time_series",
Expand Down

0 comments on commit c7f8dba

Please sign in to comment.