Skip to content

Commit

Permalink
refactor to add an interval type (#252)
Browse files Browse the repository at this point in the history
Adds a new interval type so we can reduce code repitition across
the samplers.
  • Loading branch information
brayniac authored May 3, 2024
1 parent d092ddf commit 2bb29dc
Show file tree
Hide file tree
Showing 25 changed files with 243 additions and 754 deletions.
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod bpf {
// `SOURCES` lists all BPF programs and the sampler that contains them.
// Each entry `(sampler, program)` maps to a unique path in the `samplers`
// directory.
const SOURCES: &'static [(&str, &str)] = &[
const SOURCES: &[(&str, &str)] = &[
("block_io", "latency"),
("cpu", "usage"),
("network", "traffic"),
Expand Down
2 changes: 1 addition & 1 deletion src/common/bpf/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl<'a> Counters<'a> {
let start = (cpu * self.cachelines * CACHELINE_SIZE)
+ (idx * std::mem::size_of::<u64>());
let value = u64::from_ne_bytes([
self.mmap[start + 0],
self.mmap[start],
self.mmap[start + 1],
self.mmap[start + 2],
self.mmap[start + 3],
Expand Down
2 changes: 1 addition & 1 deletion src/common/bpf/distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<'a> Distribution<'a> {
}

let val = u64::from_ne_bytes([
self.mmap[start + 0],
self.mmap[start],
self.mmap[start + 1],
self.mmap[start + 2],
self.mmap[start + 3],
Expand Down
43 changes: 43 additions & 0 deletions src/common/interval.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use crate::Instant;
use core::time::Duration;

pub struct Interval {
prev: Instant,
next: Instant,
period: Duration,
}

impl Interval {
pub fn new(start: Instant, period: Duration) -> Self {
Self {
prev: start,
next: start,
period,
}
}

/// Try to tick the interval forward to the provided instant. Returns true
/// if the interval has fired and returns false otherwise.
pub fn try_wait(&mut self, now: Instant) -> Result<Duration, ()> {
if now < self.next {
return Err(());
}

let next = self.next + self.period;

// check if we have fallen behind
if next > now {
self.next = next;
} else {
// if we fell behind, don't sample again until the interval has
// elapsed
self.next = now + self.period;
}

let elapsed = now - self.prev;

self.prev = now;

Ok(elapsed)
}
}
3 changes: 3 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ pub mod bpf;
pub mod classic;
pub mod units;

mod interval;
mod nop;

use metriken::AtomicHistogram;
use metriken::LazyCounter;

pub use interval::Interval;
pub use nop::Nop;

pub const HISTOGRAM_GROUPING_POWER: u8 = 7;
Expand Down
34 changes: 7 additions & 27 deletions src/samplers/cpu/linux/perf/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::common::Interval;
use crate::common::Nop;
use crate::samplers::cpu::*;
use metriken::{DynBoxedMetric, MetricBuilder};
Expand Down Expand Up @@ -30,9 +31,7 @@ fn init(config: &Config) -> Box<dyn Sampler> {
const NAME: &str = "cpu_perf";

pub struct Perf {
prev: Instant,
next: Instant,
interval: Duration,
interval: Interval,
groups: Vec<PerfGroup>,
counters: Vec<Vec<DynBoxedMetric<metriken::Counter>>>,
gauges: Vec<Vec<DynBoxedMetric<metriken::Gauge>>>,
Expand All @@ -45,8 +44,6 @@ impl Perf {
return Err(());
}

let now = Instant::now();

let cpus = match hardware_info() {
Ok(hwinfo) => hwinfo.get_cpus(),
Err(_) => return Err(()),
Expand Down Expand Up @@ -101,27 +98,25 @@ impl Perf {
};
}

if groups.len() == 0 {
if groups.is_empty() {
error!("Failed to create the perf group on any CPU");
return Err(());
}

return Ok(Self {
prev: now,
next: now,
interval: config.interval(NAME),
Ok(Self {
interval: Interval::new(Instant::now(), config.interval(NAME)),
groups,
counters,
gauges,
});
})
}
}

impl Sampler for Perf {
fn sample(&mut self) {
let now = Instant::now();

if now < self.next {
if self.interval.try_wait(now).is_err() {
return;
}

Expand Down Expand Up @@ -178,20 +173,5 @@ impl Sampler for Perf {
CPU_FREQUENCY_AVERAGE.set((avg_running_frequency / nr_active_groups) as i64);
CPU_CORES.set(nr_active_groups as _);
}

// determine when to sample next
let next = self.next + self.interval;

// it's possible we fell behind
if next > now {
// if we didn't, sample at the next planned time
self.next = next;
} else {
// if we did, sample after the interval has elapsed
self.next = now + self.interval;
}

// mark when we last sampled
self.prev = now;
}
}
22 changes: 11 additions & 11 deletions src/samplers/cpu/linux/perf/perf_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl PerfGroup {
continue;
}

if let Ok(c) = counter.as_follower(cpu, &mut group[leader_id].as_mut().unwrap()) {
if let Ok(c) = counter.as_follower(cpu, group[leader_id].as_mut().unwrap()) {
group.resize_with(*counter as usize + 1, || None);
group[*counter as usize] = Some(c);
}
Expand All @@ -173,12 +173,12 @@ impl PerfGroup {
.map(|inner| GroupData { inner })
.ok();

return Ok(Self {
Ok(Self {
cpu,
leader_id,
group,
prev,
});
})
}

pub fn get_metrics(&mut self) -> Result<Reading, ()> {
Expand Down Expand Up @@ -226,30 +226,30 @@ impl PerfGroup {
let mut mperf = None;

if let Some(Some(c)) = &self.group.get(Counter::Cycles as usize) {
cycles = current.delta(prev, &c);
cycles = current.delta(prev, c);
}

if let Some(Some(c)) = &self.group.get(Counter::Instructions as usize) {
instructions = current.delta(prev, &c);
instructions = current.delta(prev, c);
}

if let Some(Some(c)) = &self.group.get(Counter::Tsc as usize) {
tsc = current.delta(prev, &c);
tsc = current.delta(prev, c);
}

if let Some(Some(c)) = &self.group.get(Counter::Aperf as usize) {
aperf = current.delta(prev, &c);
aperf = current.delta(prev, c);
}

if let Some(Some(c)) = &self.group.get(Counter::Mperf as usize) {
mperf = current.delta(prev, &c);
mperf = current.delta(prev, c);
}

let ipkc = if instructions.is_some() && cycles.is_some() {
if cycles.unwrap() == 0 {
let ipkc = if let (Some(instructions), Some(cycles)) = (instructions, cycles) {
if cycles == 0 {
None
} else {
Some(instructions.unwrap() * 1000 / cycles.unwrap())
Some(instructions * 1000 / cycles)
}
} else {
None
Expand Down
90 changes: 20 additions & 70 deletions src/samplers/cpu/linux/usage/bpf.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const ONLINE_CORES_REFRESH: Duration = Duration::from_secs(1);

#[allow(clippy::module_inception)]
mod bpf {
include!(concat!(env!("OUT_DIR"), "/cpu_usage.bpf.rs"));
}
Expand Down Expand Up @@ -34,16 +35,11 @@ pub struct CpuUsage {
percpu_counters: Arc<PercpuCounters>,
sum_prev: u64,
percpu_sum_prev: Vec<u64>,
counter_interval: Duration,
counter_next: Instant,
counter_prev: Instant,
distribution_interval: Duration,
distribution_next: Instant,
distribution_prev: Instant,
counter_interval: Interval,
distribution_interval: Interval,
online_cores: usize,
online_cores_file: std::fs::File,
online_cores_interval: Duration,
online_cores_next: Instant,
online_cores_interval: Interval,
}

const IDLE_CPUTIME_INDEX: usize = 5;
Expand Down Expand Up @@ -138,26 +134,16 @@ impl CpuUsage {
percpu_counters,
sum_prev: 0,
percpu_sum_prev: vec![0; cpus.len()],
counter_interval: config.interval(NAME),
counter_next: now,
counter_prev: now,
distribution_interval: config.distribution_interval(NAME),
distribution_next: now,
distribution_prev: now,
counter_interval: Interval::new(now, config.interval(NAME)),
distribution_interval: Interval::new(now, config.distribution_interval(NAME)),
online_cores,
online_cores_file,
online_cores_interval: ONLINE_CORES_REFRESH,
online_cores_next: now + ONLINE_CORES_REFRESH,
online_cores_interval: Interval::new(now, ONLINE_CORES_REFRESH),
})
}

pub fn refresh_counters(&mut self, now: Instant) {
if now < self.counter_next {
return;
}

// get the amount of time since we last sampled
let elapsed = now - self.counter_prev;
pub fn refresh_counters(&mut self, now: Instant) -> Result<(), ()> {
let elapsed = self.counter_interval.try_wait(now)?;

// refresh the counters from the kernel-space counters
self.bpf.refresh_counters(elapsed.as_secs_f64());
Expand Down Expand Up @@ -190,59 +176,24 @@ impl CpuUsage {
// update the previous sums
self.sum_prev += busy_delta + idle_delta;

// determine when to sample next
let next = self.counter_next + self.counter_interval;

// check that next sample time is in the future
if next > now {
self.counter_next = next;
} else {
self.counter_next = now + self.counter_interval;
}

// mark when we last sampled
self.counter_prev = now;
Ok(())
}

pub fn refresh_distributions(&mut self, now: Instant) {
if now < self.distribution_next {
return;
}

pub fn refresh_distributions(&mut self, now: Instant) -> Result<(), ()> {
self.distribution_interval.try_wait(now)?;
self.bpf.refresh_distributions();

// determine when to sample next
let next = self.distribution_next + self.distribution_interval;

// check that next sample time is in the future
if next > now {
self.distribution_next = next;
} else {
self.distribution_next = now + self.distribution_interval;
}

// mark when we last sampled
self.distribution_prev = now;
Ok(())
}

pub fn update_online_cores(&mut self, now: Instant) {
if now < self.online_cores_next {
return;
}
pub fn update_online_cores(&mut self, now: Instant) -> Result<(), ()> {
self.online_cores_interval.try_wait(now)?;

if let Ok(v) = online_cores(&mut self.online_cores_file) {
self.online_cores = v;
}

// determine when to update next
let next = self.online_cores_next + self.online_cores_interval;

// check that next update time is in the future
if next > now {
self.online_cores_next = next;
} else {
self.online_cores_next = now + self.online_cores_interval;
}
Ok(())
}
}

Expand All @@ -265,8 +216,7 @@ fn sum() -> u64 {
}

fn online_cores(file: &mut std::fs::File) -> Result<usize, ()> {
let _ = file
.rewind()
file.rewind()
.map_err(|e| error!("failed to seek to start of file: {e}"))?;

let mut count = 0;
Expand Down Expand Up @@ -312,8 +262,8 @@ fn online_cores(file: &mut std::fs::File) -> Result<usize, ()> {
impl Sampler for CpuUsage {
fn sample(&mut self) {
let now = Instant::now();
self.update_online_cores(now);
self.refresh_counters(now);
self.refresh_distributions(now);
let _ = self.update_online_cores(now);
let _ = self.refresh_counters(now);
let _ = self.refresh_distributions(now);
}
}
Loading

0 comments on commit 2bb29dc

Please sign in to comment.