Re-introduce per-process CPU collection #1146

Merged
merged 5 commits into from
Dec 14, 2024
Changes from 1 commit
69 changes: 52 additions & 17 deletions lading/src/observer/linux/procfs.rs
@@ -2,12 +2,15 @@
mod memory;
mod stat;

use std::{collections::VecDeque, io};
use std::{
collections::{hash_map::Entry, VecDeque},
io,
};

use metrics::gauge;
use nix::errno::Errno;
use procfs::process::Process;
use rustc_hash::FxHashSet;
use rustc_hash::{FxHashMap, FxHashSet};
use tracing::{error, warn};

const BYTES_PER_KIBIBYTE: u64 = 1024;
@@ -45,6 +48,15 @@ macro_rules! report_status_field {
};
}

#[derive(Debug)]
struct ProcessInfo {
cmdline: String,
exe: String,
comm: String,
pid_s: String,
stat_sampler: stat::Sampler,
}

#[derive(Debug)]
pub(crate) struct Sampler {
parent: Process,
@@ -65,6 +77,8 @@ impl Sampler {
clippy::cast_possible_wrap
)]
pub(crate) async fn poll(&mut self) -> Result<(), Error> {
let mut proccess_info: FxHashMap<i32, ProcessInfo> = FxHashMap::default();
blt marked this conversation as resolved.
Contributor

I might be missing something in the control flow here, but it looks like this needs to be held across poll calls.

Related, once it is held for the duration of lading, expiration may be an issue. PID reuse seems unlikely, but memory bloat is a concern in cases where there are many short-lived processes.

Collaborator Author
@blt, Dec 13, 2024

No, I just noticed that myself. Commit b84ec4a fixes this.

Expiration was a problem previously as well, one we didn't solve, so we're no worse off than before. But I agree it is a potential problem.

Contributor

For each poll, would it be wrong to remove pid entries in the process_info map whose pids were not seen in the current poll, to solve the expiration problem?

Collaborator Author

No, that would work. You could do it pretty cheaply by having two hashmaps: move entries from one to the other each loop, and clear out the map that was being moved from.
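
A minimal sketch of that two-map expiration idea, not the merged implementation; `PidCache`, its method names, and the stripped-down `ProcessInfo` below are hypothetical stand-ins:

```rust
use rustc_hash::FxHashMap;

// Stand-in for the per-pid state cached by the observer; the real struct
// also carries exe/cmdline labels and a stat sampler.
#[derive(Default)]
struct ProcessInfo {
    comm: String,
}

#[derive(Default)]
struct PidCache {
    live: FxHashMap<i32, ProcessInfo>,
    scratch: FxHashMap<i32, ProcessInfo>,
}

impl PidCache {
    // Call at the top of each poll: everything known so far moves to
    // `scratch` and survives only if its pid is observed again.
    fn begin_poll(&mut self) {
        std::mem::swap(&mut self.live, &mut self.scratch);
    }

    // Call once per pid seen during the poll: reuse last poll's entry if
    // it exists, otherwise initialize a fresh one.
    fn observe(&mut self, pid: i32) -> &mut ProcessInfo {
        let info = self.scratch.remove(&pid).unwrap_or_default();
        self.live.entry(pid).or_insert(info)
    }

    // Call at the end of each poll: anything left in `scratch` belongs to
    // pids that were not seen, so dropping it is the expiration step.
    fn end_poll(&mut self) {
        self.scratch.clear();
    }
}
```

The swap avoids rebuilding both maps on every poll, and expiration falls out of the final clear.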


// A tally of the total RSS and PSS consumed by the parent process and
// its children.
let mut aggr = memory::smaps_rollup::Aggregator::default();
@@ -121,15 +135,36 @@ impl Sampler {
continue;
}

// Construct labels and then start ripping data into metrics.
let exe = proc_exe(pid).await?;
let comm = proc_comm(pid).await?;
let cmdline = proc_cmdline(pid).await?;
let labels = [
(String::from("pid"), format!("{pid}")),
(String::from("exe"), exe.clone()),
(String::from("cmdline"), cmdline.clone()),
(String::from("comm"), comm.clone()),
// If we haven't seen this process before, initialize its ProcessInfo.
match proccess_info.entry(pid) {
Entry::Occupied(_) => { /* Already initialized */ }
Entry::Vacant(entry) => {
let exe = proc_exe(pid).await?;
let comm = proc_comm(pid).await?;
let cmdline = proc_cmdline(pid).await?;
let pid_s = format!("{pid}");
let stat_sampler = stat::Sampler::new();

entry.insert(ProcessInfo {
cmdline,
exe,
comm,
pid_s,
stat_sampler,
});
}
}

// SAFETY: We've just inserted this pid into the map.
let pinfo = proccess_info
.get_mut(&pid)
.expect("catastrophic programming error");

let labels: [(&'static str, String); 4] = [
("pid", pinfo.pid_s.clone()),
("exe", pinfo.exe.clone()),
("cmdline", pinfo.cmdline.clone()),
("comm", pinfo.comm.clone()),
];

// `/proc/{pid}/status`
@@ -145,7 +180,7 @@ impl Sampler {
let uptime = proc_uptime().await?;

// `/proc/{pid}/stat`, most especially per-process CPU data.
if let Err(e) = stat::poll(pid, uptime, &labels).await {
if let Err(e) = pinfo.stat_sampler.poll(pid, uptime, &labels).await {
// We don't want to bail out entirely if we can't read stats
// which will happen if we don't have permissions or, more
// likely, the process has exited.
@@ -157,11 +192,11 @@
match memory::smaps::Regions::from_pid(pid) {
Ok(memory_regions) => {
for (pathname, measures) in memory_regions.aggregate_by_pathname() {
let labels = [
("pid", format!("{pid}")),
("exe", exe.clone()),
("cmdline", cmdline.clone()),
("comm", comm.clone()),
let labels: [(&'static str, String); 5] = [
("pid", pinfo.pid_s.clone()),
("exe", pinfo.exe.clone()),
("cmdline", pinfo.cmdline.clone()),
("comm", pinfo.comm.clone()),
("pathname", pathname),
];
gauge!("smaps.rss.by_pathname", &labels).set(measures.rss as f64);
2 changes: 1 addition & 1 deletion lading/src/observer/linux/procfs/memory/smaps_rollup.rs
@@ -25,7 +25,7 @@ pub(crate) struct Aggregator {
// Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics.
pub(crate) async fn poll(
pid: i32,
labels: &[(String, String)],
labels: &[(&'static str, String)],
aggr: &mut Aggregator,
) -> Result<(), Error> {
let path = format!("/proc/{pid}/smaps_rollup");
136 changes: 70 additions & 66 deletions lading/src/observer/linux/procfs/stat.rs
@@ -1,6 +1,4 @@
use metrics::gauge;
use once_cell::sync::OnceCell;
use std::sync::Mutex;
use tokio::fs;

use crate::observer::linux::cgroup;
@@ -19,7 +17,7 @@ pub enum Error {
Cgroup(#[from] cgroup::v2::Error),
}

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, Default)]
#[allow(clippy::struct_field_names)] // The _ticks is useful even if clippy doesn't like it.
struct Stats {
user_ticks: u64,
@@ -37,72 +35,78 @@ struct CpuUtilization {
system_cpu_millicores: f64,
}

static PREV: OnceCell<Mutex<Stats>> = OnceCell::new();

#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
pub(crate) async fn poll(
pid: i32,
uptime_secs: f64,
labels: &[(String, String)],
) -> Result<(), Error> {
let group_prefix = cgroup::v2::get_path(pid).await?;

// Read cpu.max (cgroup v2)
let cpu_max = fs::read_to_string(group_prefix.join("cpu.max")).await?;
let parts: Vec<&str> = cpu_max.split_whitespace().collect();
let (max_str, period_str) = (parts[0], parts[1]);
let allowed_cores = if max_str == "max" {
// If the target cgroup has no CPU limit we assume it has access to all
// physical cores.
num_cpus::get_physical() as f64
} else {
let max_val = max_str.parse::<f64>()?;
let period_val = period_str.parse::<f64>()?;
max_val / period_val
};
let limit_millicores = allowed_cores * 1000.0;

// Read `/proc/<PID>/stat`
let stat_contents = fs::read_to_string(format!("/proc/{pid}/stat")).await?;
let (cur_pid, utime_ticks, stime_ticks) = parse(&stat_contents)?;
assert!(cur_pid == pid);

// See sysconf(3).
let ticks_per_second = unsafe { nix::libc::sysconf(nix::libc::_SC_CLK_TCK) } as f64;

// Get or initialize the previous stats. Note that the first time this is
// initialized we intentionally set last_instance to now to avoid scheduling
// shenanigans.
let cur_stats = Stats {
user_ticks: utime_ticks,
system_ticks: stime_ticks,
uptime_ticks: (uptime_secs * ticks_per_second).round() as u64,
};

let mut prev = PREV
.get_or_init(|| Mutex::new(cur_stats))
.lock()
.expect("/proc/pid/stat previous state lock poisoned");

if let Some(util) = compute_cpu_usage(*prev, cur_stats, allowed_cores) {
// NOTE these metric names are paired with names in cgroup/v2/cpu.rs and
// must remain consistent. If you change these, change those.
gauge!("stat.total_cpu_percentage", labels).set(util.total_cpu_percentage);
gauge!("stat.cpu_percentage", labels).set(util.total_cpu_percentage); // backward compatibility
gauge!("stat.user_cpu_percentage", labels).set(util.user_cpu_percentage);
gauge!("stat.kernel_cpu_percentage", labels).set(util.system_cpu_percentage); // kernel is a misnomer, keeping for compatibility
gauge!("stat.system_cpu_percentage", labels).set(util.system_cpu_percentage);

gauge!("stat.total_cpu_usage_millicores", labels).set(util.total_cpu_millicores);
gauge!("stat.user_cpu_usage_millicores", labels).set(util.user_cpu_millicores);
gauge!("stat.kernel_cpu_usage_millicores", labels).set(util.system_cpu_millicores); // kernel is a misnomer
gauge!("stat.system_cpu_usage_millicores", labels).set(util.system_cpu_millicores);
gauge!("stat.cpu_limit_millicores", labels).set(limit_millicores);
#[derive(Debug)]
pub(crate) struct Sampler {
ticks_per_second: f64,
prev: Stats,
}

impl Sampler {
pub(crate) fn new() -> Self {
Self {
ticks_per_second: unsafe { nix::libc::sysconf(nix::libc::_SC_CLK_TCK) } as f64,
prev: Stats::default(),
}
}

*prev = cur_stats;
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
pub(crate) async fn poll(
&mut self,
pid: i32,
uptime_secs: f64,
labels: &[(&'static str, String)],
) -> Result<(), Error> {
let group_prefix = cgroup::v2::get_path(pid).await?;

// Read cpu.max (cgroup v2)
let cpu_max = fs::read_to_string(group_prefix.join("cpu.max")).await?;
let parts: Vec<&str> = cpu_max.split_whitespace().collect();
let (max_str, period_str) = (parts[0], parts[1]);
let allowed_cores = if max_str == "max" {
// If the target cgroup has no CPU limit we assume it has access to all
// physical cores.
num_cpus::get_physical() as f64
} else {
let max_val = max_str.parse::<f64>()?;
let period_val = period_str.parse::<f64>()?;
max_val / period_val
};
let limit_millicores = allowed_cores * 1000.0;

// Read `/proc/<PID>/stat`
let stat_contents = fs::read_to_string(format!("/proc/{pid}/stat")).await?;
let (cur_pid, utime_ticks, stime_ticks) = parse(&stat_contents)?;
assert!(cur_pid == pid);

// Get or initialize the previous stats. Note that the first time this is
// initialized we intentionally set last_instance to now to avoid scheduling
// shenanigans.
let cur_stats = Stats {
user_ticks: utime_ticks,
system_ticks: stime_ticks,
uptime_ticks: (uptime_secs * self.ticks_per_second).round() as u64,
};

Ok(())
if let Some(util) = compute_cpu_usage(self.prev, cur_stats, allowed_cores) {
// NOTE these metric names are paired with names in cgroup/v2/cpu.rs and
// must remain consistent. If you change these, change those.
gauge!("stat.total_cpu_percentage", labels).set(util.total_cpu_percentage);
gauge!("stat.cpu_percentage", labels).set(util.total_cpu_percentage); // backward compatibility
gauge!("stat.user_cpu_percentage", labels).set(util.user_cpu_percentage);
gauge!("stat.kernel_cpu_percentage", labels).set(util.system_cpu_percentage); // kernel is a misnomer, keeping for compatibility
gauge!("stat.system_cpu_percentage", labels).set(util.system_cpu_percentage);

gauge!("stat.total_cpu_usage_millicores", labels).set(util.total_cpu_millicores);
gauge!("stat.user_cpu_usage_millicores", labels).set(util.user_cpu_millicores);
gauge!("stat.kernel_cpu_usage_millicores", labels).set(util.system_cpu_millicores); // kernel is a misnomer
gauge!("stat.system_cpu_usage_millicores", labels).set(util.system_cpu_millicores);
gauge!("stat.cpu_limit_millicores", labels).set(limit_millicores);
}

self.prev = cur_stats;

Ok(())
}
}

/// Parse `/proc/<pid>/stat` and extracts:
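
For context, the diff calls `compute_cpu_usage` but does not show its body. Below is a hedged sketch of the tick-delta arithmetic such a function typically performs, using the `Stats` and `CpuUtilization` shapes visible above; how `allowed_cores` is applied and any rounding details are assumptions, not the merged code.

```rust
// Hedged sketch only: `compute_cpu_usage` is referenced but not shown in
// this diff, so the body below is an assumption about its behavior.
#[derive(Debug, Clone, Copy, Default)]
struct Stats {
    user_ticks: u64,
    system_ticks: u64,
    uptime_ticks: u64,
}

#[derive(Debug)]
struct CpuUtilization {
    total_cpu_percentage: f64,
    user_cpu_percentage: f64,
    system_cpu_percentage: f64,
    total_cpu_millicores: f64,
    user_cpu_millicores: f64,
    system_cpu_millicores: f64,
}

fn compute_cpu_usage(prev: Stats, cur: Stats, _allowed_cores: f64) -> Option<CpuUtilization> {
    // Elapsed wall-clock ticks since the previous sample; with no elapsed
    // time (first poll or duplicate read) there is nothing to report.
    let delta_time = cur.uptime_ticks.saturating_sub(prev.uptime_ticks);
    if delta_time == 0 {
        return None;
    }
    let delta_time = delta_time as f64;

    // Fraction of one core consumed in each mode over the interval.
    let user = cur.user_ticks.saturating_sub(prev.user_ticks) as f64 / delta_time;
    let system = cur.system_ticks.saturating_sub(prev.system_ticks) as f64 / delta_time;
    let total = user + system;

    // 1.0 means one full core: *100 for percentages, *1000 for millicores.
    // How `_allowed_cores` factors in (normalization or capping) is not
    // visible in this diff, so it is left unused here.
    Some(CpuUtilization {
        total_cpu_percentage: total * 100.0,
        user_cpu_percentage: user * 100.0,
        system_cpu_percentage: system * 100.0,
        total_cpu_millicores: total * 1000.0,
        user_cpu_millicores: user * 1000.0,
        system_cpu_millicores: system * 1000.0,
    })
}
```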