From 9408225d2724274c62a4de7680ec4a49f50b2ca1 Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Thu, 12 Dec 2024 14:35:41 -0800 Subject: [PATCH 1/5] Re-introduce per-process CPU collection This commit re-introduces per-process CPU data from /proc/{pid}/stat reusing the same naming scheme from the cgroup sourced data, offset with stat. to avoid summing issues in aggregations. I have removed much of the stat code from the main procfs sample loop. The data was either no longer used or is duplicated elsewhere. Some data could be re-introduced if we desire by extending the poll loop in stat.rs. I am continuing to remove more and more of our procfs crate integration. I think, ultimately, we should be able to parse directly without the need of a third-party dependency. That removal is near done now. Signed-off-by: Brian L. Troutwine --- lading/src/observer/linux/cgroup.rs | 2 +- lading/src/observer/linux/cgroup/v2/cpu.rs | 5 +- lading/src/observer/linux/procfs.rs | 405 ++++++++------------- lading/src/observer/linux/procfs/stat.rs | 140 +++++++ 4 files changed, 287 insertions(+), 265 deletions(-) create mode 100644 lading/src/observer/linux/procfs/stat.rs diff --git a/lading/src/observer/linux/cgroup.rs b/lading/src/observer/linux/cgroup.rs index 99065720f..89eee56d6 100644 --- a/lading/src/observer/linux/cgroup.rs +++ b/lading/src/observer/linux/cgroup.rs @@ -1,5 +1,5 @@ /// Code to read cgroup information. -mod v2; +pub(crate) mod v2; use std::{collections::VecDeque, io}; diff --git a/lading/src/observer/linux/cgroup/v2/cpu.rs b/lading/src/observer/linux/cgroup/v2/cpu.rs index f92680995..91837fbb3 100644 --- a/lading/src/observer/linux/cgroup/v2/cpu.rs +++ b/lading/src/observer/linux/cgroup/v2/cpu.rs @@ -96,6 +96,9 @@ pub(crate) async fn poll(group_prefix: &Path, labels: &[(String, String)]) -> Re let user_fraction = delta_user as f64 / delta_time; let system_fraction = delta_system as f64 / delta_time; + // NOTE these metric names are paired with names in procfs/stat.rs and + // must remain consistent. If you change these, change those. + // Convert usage to a percentage of the cores granted to the target. let total_cpu = (usage_fraction / allowed_cores) * 100.0; let user_cpu = (user_fraction / allowed_cores) * 100.0; @@ -106,7 +109,7 @@ pub(crate) async fn poll(group_prefix: &Path, labels: &[(String, String)]) -> Re gauge!("kernel_cpu_percentage", labels).set(system_cpu); // kernel is a misnomer, keeping for compatibility gauge!("system_cpu_percentage", labels).set(system_cpu); - // Convert usage to kubernetes style millicores. These + // Convert usage to kubernetes style millicores. let total_millicores = usage_fraction * 1000.0; let user_millicores = user_fraction * 1000.0; let system_millicores = system_fraction * 1000.0; diff --git a/lading/src/observer/linux/procfs.rs b/lading/src/observer/linux/procfs.rs index b8afe8731..2b5ea741f 100644 --- a/lading/src/observer/linux/procfs.rs +++ b/lading/src/observer/linux/procfs.rs @@ -1,13 +1,13 @@ /// Sampler implementation for procfs filesystems mod memory; +mod stat; use std::{collections::VecDeque, io}; use metrics::gauge; use nix::errno::Errno; -use procfs::ProcError::PermissionDenied; -use procfs::{process::Process, Current}; -use rustc_hash::{FxHashMap, FxHashSet}; +use procfs::process::Process; +use rustc_hash::FxHashSet; use tracing::{error, warn}; const BYTES_PER_KIBIBYTE: u64 = 1024; @@ -25,6 +25,9 @@ pub enum Error { /// Wrapper for [`procfs::ProcError`] #[error("Unable to read procfs: {0}")] Proc(#[from] procfs::ProcError), + /// Wrapper for [`stat::Error`] + #[error("Unable to read stat: {0}")] + Stat(#[from] stat::Error), } macro_rules! report_status_field { @@ -36,41 +39,16 @@ macro_rules! report_status_field { }; } -#[derive(Debug, Default)] -struct Sample { - utime: u64, - stime: u64, - uptime: u64, -} - -#[derive(Debug, Eq, Hash, PartialEq)] -struct ProcessIdentifier { - pid: i32, - exe: String, - cmdline: String, - comm: String, -} - #[derive(Debug)] pub(crate) struct Sampler { parent: Process, - ticks_per_second: u64, - page_size: u64, - previous_totals: Sample, - have_logged_perms_err: bool, } impl Sampler { pub(crate) fn new(parent_pid: i32) -> Result { let parent = Process::new(parent_pid)?; - Ok(Self { - parent, - ticks_per_second: procfs::ticks_per_second(), - page_size: procfs::page_size(), - previous_totals: Sample::default(), - have_logged_perms_err: false, - }) + Ok(Self { parent }) } #[allow( @@ -81,18 +59,6 @@ impl Sampler { clippy::cast_possible_wrap )] pub(crate) async fn poll(&mut self) -> Result<(), Error> { - // Key for this map is (pid, basename/exe, cmdline) - let mut samples: FxHashMap = FxHashMap::default(); - - let mut total_processes: u64 = 0; - // Calculate the ticks since machine uptime. This will be important - // later for calculating per-process uptime. Because we capture this one - // we will be slightly out of date with each subsequent iteration of the - // loop. We do not believe this to be an issue. - let uptime_seconds: f64 = procfs::Uptime::current()?.uptime; // seconds since boot - let uptime_ticks: u64 = - (uptime_seconds.round() as u64).saturating_mul(self.ticks_per_second); // CPU-ticks since boot - // A tally of the total RSS and PSS consumed by the parent process and // its children. let mut aggr = memory::smaps_rollup::Aggregator::default(); @@ -103,6 +69,8 @@ impl Sampler { let mut pids: FxHashSet = FxHashSet::default(); let mut processes: VecDeque = VecDeque::with_capacity(16); // an arbitrary smallish number processes.push_back(Process::new(self.parent.pid())?); + + // BEGIN pid loop while let Some(process) = processes.pop_back() { // Search for child processes. This is done by querying for every // thread of `process` and inspecting each child of the thread. Note @@ -131,7 +99,8 @@ impl Sampler { } let pid = process.pid(); - let mut has_ptrace_perm = true; + + // `/proc/{pid}/status` let status = match process.status() { Ok(status) => status, Err(e) => { @@ -146,130 +115,18 @@ impl Sampler { continue; } - // Collect the 'name' of the process. This is pulled from - // /proc//exe and we take the last part of that, like posix - // `top` does. This will require us to label all data with both pid - // and name, again like `top`. - let basename: String = match process.exe() { - Ok(exe) => { - if let Some(basename) = exe.file_name() { - String::from( - basename - .to_str() - .expect("could not convert basename to str"), - ) - } else { - // It's possible to have a process with no named exe. On - // Linux systems with functional security setups it's not - // clear _when_ this would be the case but, hey. - String::new() - } - } - Err(PermissionDenied(_)) => { - // permission to dereference or read this symbolic link is governed - // by a ptrace(2) access mode PTRACE_MODE_READ_FSCREDS check - // - // In practice, this can occur when an unprivileged lading process - // is given a container to monitor as the container pids are owned by root - has_ptrace_perm = false; - if !self.have_logged_perms_err { - error!("lading lacks ptrace permissions, exe will be empty and smaps-related metrics will be missing."); - self.have_logged_perms_err = true; - } - String::new() - } - Err(e) => { - warn!("Couldn't read exe symlink: {:?}", e); - // This is an unknown failure case - String::new() - } - }; - - let cmdline: String = match process.cmdline() { - Ok(cmdline) => cmdline.join(" "), - Err(_) => "zombie".to_string(), - }; - - let stats = match process.stat() { - Ok(stats) => stats, - Err(e) => { - // We don't want to bail out entirely if we can't read stats - // which will happen if we don't have permissions or, more - // likely, the process has exited. - warn!("Got err when reading process stat: {e:?}"); - continue; - } - }; - let comm = stats.comm.clone(); - - // Calculate process uptime. We have two pieces of information from - // the kernel: computer uptime and process starttime relative to - // power-on of the computer. - let uptime: u64 = uptime_ticks.saturating_sub(stats.starttime); // ticks - - // The times that the process and the processes' waited for children - // have been scheduled in kernel and user space. We exclude cstime, - // cutime because while the parent has waited it has not spent CPU - // time, it's children have. - let utime: u64 = stats.utime; // CPU-ticks - let stime: u64 = stats.stime; // CPU-ticks - - let sample = Sample { - utime, - stime, - uptime, - }; - let id = ProcessIdentifier { - pid, - exe: basename.clone(), - comm: comm.clone(), - cmdline: cmdline.clone(), - }; - samples.insert(id, sample); - - // Answering the question "How much memory is my program consuming?" - // is not as straightforward as one might hope. Reside set size - // (RSS) is the amount of memory held in memory measured in bytes, - // Proportional Set Size (PSS) is the amount of memory held by the - // program but unshared between processes (think data mmapped - // multiple times), Virtual Size (vsize) is the amount of memory - // held in pages, which may or may not be reflected in real memory. - // VSize is often much, much larger than RSS. - // - // We currently do not pull PSS as it requires trawling the smaps - // but we don't have call to do that, avoiding an allocation. - // - // Consider that Linux allocation is done in pages. If I allocate 1 - // byte, say, from the OS I will receive a page of memory back -- - // see `page_size` for the size of a page -- and the RSS and VSize - // of my program is then a page worth of bytes. If I deallocate that - // byte my RSS is 0 but _as an optimization_ Linux may not free the - // page. My VSize remains one page. Allocators muddy this even - // further by trying to account for the behavior of the operating - // system. Anyway, good luck out there. You'll be fine. - let rss: u64 = stats.rss * self.page_size; - let rsslim: u64 = stats.rsslim; - let vsize: u64 = stats.vsize; - + // Construct labels and then start ripping data into metrics. + let exe = proc_exe(pid).await?; + let comm = proc_comm(pid).await?; + let cmdline = proc_cmdline(pid).await?; let labels = [ (String::from("pid"), format!("{pid}")), - (String::from("exe"), basename.clone()), + (String::from("exe"), exe.clone()), (String::from("cmdline"), cmdline.clone()), (String::from("comm"), comm.clone()), ]; - // Number of pages that the process has in real memory. - gauge!("rss_bytes", &labels).set(rss as f64); - // Soft limit on RSS bytes, see RLIMIT_RSS in getrlimit(2). - gauge!("rsslim_bytes", &labels).set(rsslim as f64); - // The size in bytes of the process in virtual memory. - gauge!("vsize_bytes", &labels).set(vsize as f64); - // Number of threads this process has active. - gauge!("num_threads", &labels).set(stats.num_threads as f64); - - total_processes = total_processes.saturating_add(1); - - // Also report memory data from `proc/status` as a reference point + // `/proc/{pid}/status` report_status_field!(status, labels, vmrss); report_status_field!(status, labels, rssanon); report_status_field!(status, labels, rssfile); @@ -279,122 +136,144 @@ impl Sampler { report_status_field!(status, labels, vmexe); report_status_field!(status, labels, vmlib); - // smaps and smaps_rollup are both governed by the same permission - // restrictions as /proc/[pid]/maps. Per man 5 proc: Permission to - // access this file is governed by a ptrace access mode - // PTRACE_MODE_READ_FSCREDS check; see ptrace(2). If a previous call - // to process.status() failed due to a lack of ptrace permissions, - // then we assume smaps operations will fail as well. - if has_ptrace_perm { - // `/proc/{pid}/smaps` - match memory::smaps::Regions::from_pid(pid) { - Ok(memory_regions) => { - for (pathname, measures) in memory_regions.aggregate_by_pathname() { - let labels = [ - ("pid", format!("{pid}")), - ("exe", basename.clone()), - ("cmdline", cmdline.clone()), - ("comm", comm.clone()), - ("pathname", pathname), - ]; - gauge!("smaps.rss.by_pathname", &labels).set(measures.rss as f64); - gauge!("smaps.pss.by_pathname", &labels).set(measures.pss as f64); - gauge!("smaps.swap.by_pathname", &labels).set(measures.swap as f64); - gauge!("smaps.size.by_pathname", &labels).set(measures.size as f64); + // `/proc/{pid}/stat`, most especially per-process CPU data. + if let Err(e) = stat::poll(pid, &labels).await { + // We don't want to bail out entirely if we can't read stats + // which will happen if we don't have permissions or, more + // likely, the process has exited. + warn!("Couldn't process `/proc/{pid}/stat`: {e}"); + continue; + } - if let Some(m) = measures.private_clean { - gauge!("smaps.private_clean.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.private_dirty { - gauge!("smaps.private_dirty.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.shared_clean { - gauge!("smaps.shared_clean.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.shared_dirty { - gauge!("smaps.shared_dirty.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.referenced { - gauge!("smaps.referenced.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.anonymous { - gauge!("smaps.anonymous.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.lazy_free { - gauge!("smaps.lazy_free.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.anon_huge_pages { - gauge!("smaps.anon_huge_pages.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.shmem_pmd_mapped { - gauge!("smaps.shmem_pmd_mapped.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.shared_hugetlb { - gauge!("smaps.shared_hugetlb.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.private_hugetlb { - gauge!("smaps.private_hugetlb.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.file_pmd_mapped { - gauge!("smaps.file_pmd_mapped.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.locked { - gauge!("smaps.locked.by_pathname", &labels).set(m as f64); - } - if let Some(m) = measures.swap_pss { - gauge!("smaps.swap_pss.by_pathname", &labels).set(m as f64); - } + // `/proc/{pid}/smaps` + match memory::smaps::Regions::from_pid(pid) { + Ok(memory_regions) => { + for (pathname, measures) in memory_regions.aggregate_by_pathname() { + let labels = [ + ("pid", format!("{pid}")), + ("exe", exe.clone()), + ("cmdline", cmdline.clone()), + ("comm", comm.clone()), + ("pathname", pathname), + ]; + gauge!("smaps.rss.by_pathname", &labels).set(measures.rss as f64); + gauge!("smaps.pss.by_pathname", &labels).set(measures.pss as f64); + gauge!("smaps.swap.by_pathname", &labels).set(measures.swap as f64); + gauge!("smaps.size.by_pathname", &labels).set(measures.size as f64); + + if let Some(m) = measures.private_clean { + gauge!("smaps.private_clean.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.private_dirty { + gauge!("smaps.private_dirty.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.shared_clean { + gauge!("smaps.shared_clean.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.shared_dirty { + gauge!("smaps.shared_dirty.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.referenced { + gauge!("smaps.referenced.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.anonymous { + gauge!("smaps.anonymous.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.lazy_free { + gauge!("smaps.lazy_free.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.anon_huge_pages { + gauge!("smaps.anon_huge_pages.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.shmem_pmd_mapped { + gauge!("smaps.shmem_pmd_mapped.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.shared_hugetlb { + gauge!("smaps.shared_hugetlb.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.private_hugetlb { + gauge!("smaps.private_hugetlb.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.file_pmd_mapped { + gauge!("smaps.file_pmd_mapped.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.locked { + gauge!("smaps.locked.by_pathname", &labels).set(m as f64); + } + if let Some(m) = measures.swap_pss { + gauge!("smaps.swap_pss.by_pathname", &labels).set(m as f64); } - } - Err(err) => { - // We don't want to bail out entirely if we can't read stats - // which will happen if we don't have permissions or, more - // likely, the process has exited. - warn!("Couldn't process `/proc/{pid}/smaps`: {err}"); } } - - // `/proc/{pid}/smaps_rollup` - if let Err(err) = memory::smaps_rollup::poll(pid, &labels, &mut aggr).await { - // We don't want to bail out entirely if we can't read smap rollup + Err(err) => { + // We don't want to bail out entirely if we can't read stats // which will happen if we don't have permissions or, more // likely, the process has exited. - warn!("Couldn't process `/proc/{pid}/smaps_rollup`: {err}"); + warn!("Couldn't process `/proc/{pid}/smaps`: {err}"); } } - } - - gauge!("num_processes").set(total_processes as f64); - let total_sample = samples - .iter() - .fold(Sample::default(), |acc, (key, sample)| { - let ProcessIdentifier { - pid, - exe: _, - cmdline: _, - comm: _, - } = key; - - Sample { - utime: acc.utime.saturating_add(sample.utime), - stime: acc.stime.saturating_add(sample.stime), - // use parent process uptime - uptime: if *pid == self.parent.pid() { - sample.uptime - } else { - acc.uptime - }, - } - }); + // `/proc/{pid}/smaps_rollup` + if let Err(err) = memory::smaps_rollup::poll(pid, &labels, &mut aggr).await { + // We don't want to bail out entirely if we can't read smap rollup + // which will happen if we don't have permissions or, more + // likely, the process has exited. + warn!("Couldn't process `/proc/{pid}/smaps_rollup`: {err}"); + } + } + // END pid loop gauge!("total_rss_bytes").set(aggr.rss as f64); gauge!("total_pss_bytes").set(aggr.pss as f64); - gauge!("total_utime").set(total_sample.utime as f64); - gauge!("total_stime").set(total_sample.stime as f64); - - self.previous_totals = total_sample; Ok(()) } } + +/// Read `/proc/{pid}/comm` +async fn proc_comm(pid: i32) -> Result { + let comm_path = format!("/proc/{pid}/comm"); + let buf = tokio::fs::read_to_string(&comm_path).await?; + Ok(buf.trim().to_string()) +} + +/// Collect the 'name' of the process. This is pulled from `/proc//exe` and +/// we take the last part of that, like posix `top` does. +async fn proc_exe(pid: i32) -> Result { + let exe_path = format!("/proc/{pid}/exe"); + let exe = tokio::fs::read_link(&exe_path).await?; + if let Some(basename) = exe.file_name() { + Ok(String::from( + basename + .to_str() + .expect("could not convert basename to str"), + )) + } else { + // It's possible to have a process with no named exe. On + // Linux systems with functional security setups it's not + // clear _when_ this would be the case but, hey. + Ok(String::new()) + } +} + +/// Read `/proc/{pid}/cmdline` +async fn proc_cmdline(pid: i32) -> Result { + let cmdline_path = format!("/proc/{pid}/cmdline"); + let buf = tokio::fs::read_to_string(&cmdline_path).await?; + let parts: Vec = buf + .split('\0') + .filter_map(|s| { + if s.is_empty() { + None + } else { + Some(s.to_string()) + } + }) + .collect(); + let res = if parts.is_empty() { + String::from("zombie") + } else { + parts.join(" ") + }; + Ok(res) +} diff --git a/lading/src/observer/linux/procfs/stat.rs b/lading/src/observer/linux/procfs/stat.rs new file mode 100644 index 000000000..c201b1171 --- /dev/null +++ b/lading/src/observer/linux/procfs/stat.rs @@ -0,0 +1,140 @@ +use metrics::gauge; +use once_cell::sync::OnceCell; +use std::sync::Mutex; +use std::time::Instant; +use tokio::fs; + +use crate::observer::linux::cgroup; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + #[error("Float Parsing: {0}")] + ParseFloat(#[from] std::num::ParseFloatError), + #[error("Integer Parsing: {0}")] + ParseInt(#[from] std::num::ParseIntError), + #[error("Stat Malformed: {0}")] + StatMalformed(&'static str), + #[error("Cgroup get_path: {0}")] + Cgroup(#[from] cgroup::v2::Error), +} +struct PrevStats { + total_ticks: u64, + user_ticks: u64, + system_ticks: u64, + last_instant: Instant, +} + +static PREV: OnceCell> = OnceCell::new(); + +pub(crate) async fn poll(pid: i32, labels: &[(String, String)]) -> Result<(), Error> { + let group_prefix = cgroup::v2::get_path(pid).await?; + + // Read cpu.max (cgroup v2) + let cpu_max = fs::read_to_string(group_prefix.join("cpu.max")).await?; + let parts: Vec<&str> = cpu_max.split_whitespace().collect(); + let (max_str, period_str) = (parts[0], parts[1]); + let allowed_cores = if max_str == "max" { + // If the target cgroup has no CPU limit we assume it has access to all + // physical cores. + num_cpus::get_physical() as f64 + } else { + let max_val = max_str.parse::()?; + let period_val = period_str.parse::()?; + max_val / period_val + }; + let limit_millicores = allowed_cores * 1000.0; + + // Read `/proc//stat` + let stat_contents = fs::read_to_string(format!("/proc/{pid}/stat")).await?; + let start_paren = stat_contents + .find('(') + .ok_or_else(|| Error::StatMalformed("Failed to find '(' in stat contents"))?; + let end_paren = stat_contents + .rfind(')') + .ok_or_else(|| Error::StatMalformed("Failed to find ')' in stat contents"))?; + let before = &stat_contents[..start_paren]; + let after = &stat_contents[end_paren + 2..]; // skip ") " + + let mut parts: Vec<&str> = before.split_whitespace().collect(); + let pid_str = parts[0]; // PID + let _pid_val: u32 = pid_str.parse()?; // confirm PID + + parts = after.split_whitespace().collect(); + // Alright, per the proc(5) manpage utime = field 14, stime = field 15. + // After skipping the name, field #3 is parts[0], so utime (14) is + // parts[11], stime (15) is parts[12]. + let utime_ticks: u64 = parts[11].parse()?; + let stime_ticks: u64 = parts[12].parse()?; + let total_ticks = utime_ticks + stime_ticks; + + // See sysconf(3). + let ticks_per_second = unsafe { nix::libc::sysconf(nix::libc::_SC_CLK_TCK) } as f64; + // Get or initialize the previous stats. Note that the first time this is + // initialized we intentionally set last_instance to now to avoid scheduling + // shenanigans. + let now = Instant::now(); + let mut prev = PREV + .get_or_init(|| { + Mutex::new(PrevStats { + total_ticks, + user_ticks: utime_ticks, + system_ticks: stime_ticks, + last_instant: now, + }) + }) + .lock() + .expect("Lock poisoned"); + + let delta_time = now.duration_since(prev.last_instant).as_micros(); + let delta_total_ticks = total_ticks.saturating_sub(prev.total_ticks); + let delta_user_ticks = utime_ticks.saturating_sub(prev.user_ticks); + let delta_system_ticks = stime_ticks.saturating_sub(prev.system_ticks); + + // Update previous stats and if there's a time delta calculate the CPU + // usage. + prev.total_ticks = total_ticks; + prev.user_ticks = utime_ticks; + prev.system_ticks = stime_ticks; + prev.last_instant = now; + if delta_time > 0 { + let delta_time = delta_time as f64; + + let tick_to_usec = 1_000_000.0 / ticks_per_second; + + let delta_usage_usec = (delta_total_ticks as f64) * tick_to_usec; + let delta_user_usec = (delta_user_ticks as f64) * tick_to_usec; + let delta_system_usec = (delta_system_ticks as f64) * tick_to_usec; + + // Compute CPU usage as a fraction of a single CPU + let usage_fraction = delta_usage_usec / delta_time; + let user_fraction = delta_user_usec / delta_time; + let system_fraction = delta_system_usec / delta_time; + + // NOTE these metric names are paired with names in cgroup/v2/cpu.rs and + // must remain consistent. If you change these, change those. + + // Convert usage to a percentage of the cores granted to the target. + let total_cpu = (usage_fraction / allowed_cores) * 100.0; + let user_cpu = (user_fraction / allowed_cores) * 100.0; + let system_cpu = (system_fraction / allowed_cores) * 100.0; + gauge!("stat.total_cpu_percentage", labels).set(total_cpu); + gauge!("stat.cpu_percentage", labels).set(total_cpu); // backward compatibility + gauge!("stat.user_cpu_percentage", labels).set(user_cpu); + gauge!("stat.kernel_cpu_percentage", labels).set(system_cpu); // kernel is a misnomer, keeping for compatibility + gauge!("stat.system_cpu_percentage", labels).set(system_cpu); + + // Convert usage to kubernetes style millicores. + let total_millicores = usage_fraction * 1000.0; + let user_millicores = user_fraction * 1000.0; + let system_millicores = system_fraction * 1000.0; + gauge!("stat.total_cpu_usage_millicores", labels).set(total_millicores); + gauge!("stat.user_cpu_usage_millicores", labels).set(user_millicores); + gauge!("stat.kernel_cpu_usage_millicores", labels).set(system_millicores); // kernel is a misnomer, keeping for compatibility + gauge!("stat.system_cpu_usage_millicores", labels).set(system_millicores); + gauge!("stat.cpu_limit_millicores", labels).set(limit_millicores); + } + + Ok(()) +} From 5001c72a9d5e57403ea27e1dee5dd12c69ead4a3 Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Thu, 12 Dec 2024 17:54:29 -0800 Subject: [PATCH 2/5] Use `/proc/uptime` tick data for timing information Signed-off-by: Brian L. Troutwine --- lading/src/observer/linux/procfs.rs | 48 +++- lading/src/observer/linux/procfs/stat.rs | 307 +++++++++++++++++------ 2 files changed, 276 insertions(+), 79 deletions(-) diff --git a/lading/src/observer/linux/procfs.rs b/lading/src/observer/linux/procfs.rs index 2b5ea741f..203aace1e 100644 --- a/lading/src/observer/linux/procfs.rs +++ b/lading/src/observer/linux/procfs.rs @@ -28,6 +28,12 @@ pub enum Error { /// Wrapper for [`stat::Error`] #[error("Unable to read stat: {0}")] Stat(#[from] stat::Error), + /// Unable to parse /proc/uptime + #[error("/proc/uptime malformed: {0}")] + MalformedUptime(&'static str), + /// Unable to parse floating point + #[error("Float Parsing: {0}")] + ParseFloat(#[from] std::num::ParseFloatError), } macro_rules! report_status_field { @@ -136,8 +142,10 @@ impl Sampler { report_status_field!(status, labels, vmexe); report_status_field!(status, labels, vmlib); + let uptime = proc_uptime().await?; + // `/proc/{pid}/stat`, most especially per-process CPU data. - if let Err(e) = stat::poll(pid, &labels).await { + if let Err(e) = stat::poll(pid, uptime, &labels).await { // We don't want to bail out entirely if we can't read stats // which will happen if we don't have permissions or, more // likely, the process has exited. @@ -277,3 +285,41 @@ async fn proc_cmdline(pid: i32) -> Result { }; Ok(res) } + +/// Read `/proc/uptime` +async fn proc_uptime() -> Result { + let buf = tokio::fs::read_to_string("/proc/uptime").await?; + let uptime_secs = proc_uptime_inner(&buf)?; + Ok(uptime_secs) +} + +/// Parse `/proc/uptime` to extract total uptime in seconds. +/// +/// # Errors +/// +/// Function errors if the file is malformed. +#[inline] +fn proc_uptime_inner(contents: &str) -> Result { + // TODO this should probably be scooted up to procfs.rs. Implies the + // `proc_*` functions there need a test component, making this an inner + // function eventually. + + let fields: Vec<&str> = contents.split_whitespace().collect(); + if fields.is_empty() { + return Err(Error::MalformedUptime("/proc/uptime empty")); + } + let uptime_secs = fields[0].parse::()?; + Ok(uptime_secs) +} + +#[cfg(test)] +mod test { + use super::proc_uptime_inner; + + #[test] + fn parse_uptime_basic() { + let line = "12345.67 4321.00\n"; + let uptime = proc_uptime_inner(line).unwrap(); + assert!((uptime - 12345.67).abs() < f64::EPSILON); + } +} diff --git a/lading/src/observer/linux/procfs/stat.rs b/lading/src/observer/linux/procfs/stat.rs index c201b1171..cc7251d76 100644 --- a/lading/src/observer/linux/procfs/stat.rs +++ b/lading/src/observer/linux/procfs/stat.rs @@ -1,7 +1,6 @@ use metrics::gauge; use once_cell::sync::OnceCell; use std::sync::Mutex; -use std::time::Instant; use tokio::fs; use crate::observer::linux::cgroup; @@ -19,16 +18,33 @@ pub enum Error { #[error("Cgroup get_path: {0}")] Cgroup(#[from] cgroup::v2::Error), } -struct PrevStats { - total_ticks: u64, + +#[derive(Debug, Clone, Copy)] +#[allow(clippy::struct_field_names)] // The _ticks is useful even if clippy doesn't like it. +struct Stats { user_ticks: u64, system_ticks: u64, - last_instant: Instant, + uptime_ticks: u64, +} + +#[derive(Debug, Clone, Copy)] +struct CpuUtilization { + total_cpu_percentage: f64, + user_cpu_percentage: f64, + system_cpu_percentage: f64, + total_cpu_millicores: f64, + user_cpu_millicores: f64, + system_cpu_millicores: f64, } -static PREV: OnceCell> = OnceCell::new(); +static PREV: OnceCell> = OnceCell::new(); -pub(crate) async fn poll(pid: i32, labels: &[(String, String)]) -> Result<(), Error> { +#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] +pub(crate) async fn poll( + pid: i32, + uptime_secs: f64, + labels: &[(String, String)], +) -> Result<(), Error> { let group_prefix = cgroup::v2::get_path(pid).await?; // Read cpu.max (cgroup v2) @@ -48,93 +64,228 @@ pub(crate) async fn poll(pid: i32, labels: &[(String, String)]) -> Result<(), Er // Read `/proc//stat` let stat_contents = fs::read_to_string(format!("/proc/{pid}/stat")).await?; - let start_paren = stat_contents - .find('(') - .ok_or_else(|| Error::StatMalformed("Failed to find '(' in stat contents"))?; - let end_paren = stat_contents - .rfind(')') - .ok_or_else(|| Error::StatMalformed("Failed to find ')' in stat contents"))?; - let before = &stat_contents[..start_paren]; - let after = &stat_contents[end_paren + 2..]; // skip ") " - - let mut parts: Vec<&str> = before.split_whitespace().collect(); - let pid_str = parts[0]; // PID - let _pid_val: u32 = pid_str.parse()?; // confirm PID - - parts = after.split_whitespace().collect(); - // Alright, per the proc(5) manpage utime = field 14, stime = field 15. - // After skipping the name, field #3 is parts[0], so utime (14) is - // parts[11], stime (15) is parts[12]. - let utime_ticks: u64 = parts[11].parse()?; - let stime_ticks: u64 = parts[12].parse()?; - let total_ticks = utime_ticks + stime_ticks; + let (cur_pid, utime_ticks, stime_ticks) = parse(&stat_contents)?; + assert!(cur_pid == pid); // See sysconf(3). let ticks_per_second = unsafe { nix::libc::sysconf(nix::libc::_SC_CLK_TCK) } as f64; + // Get or initialize the previous stats. Note that the first time this is // initialized we intentionally set last_instance to now to avoid scheduling // shenanigans. - let now = Instant::now(); + let cur_stats = Stats { + user_ticks: utime_ticks, + system_ticks: stime_ticks, + uptime_ticks: (uptime_secs * ticks_per_second).round() as u64, + }; + let mut prev = PREV - .get_or_init(|| { - Mutex::new(PrevStats { - total_ticks, - user_ticks: utime_ticks, - system_ticks: stime_ticks, - last_instant: now, - }) - }) + .get_or_init(|| Mutex::new(cur_stats)) .lock() - .expect("Lock poisoned"); + .expect("/proc/pid/stat previous state lock poisoned"); - let delta_time = now.duration_since(prev.last_instant).as_micros(); - let delta_total_ticks = total_ticks.saturating_sub(prev.total_ticks); - let delta_user_ticks = utime_ticks.saturating_sub(prev.user_ticks); - let delta_system_ticks = stime_ticks.saturating_sub(prev.system_ticks); + if let Some(util) = compute_cpu_usage(*prev, cur_stats, allowed_cores) { + // NOTE these metric names are paired with names in cgroup/v2/cpu.rs and + // must remain consistent. If you change these, change those. + gauge!("stat.total_cpu_percentage", labels).set(util.total_cpu_percentage); + gauge!("stat.cpu_percentage", labels).set(util.total_cpu_percentage); // backward compatibility + gauge!("stat.user_cpu_percentage", labels).set(util.user_cpu_percentage); + gauge!("stat.kernel_cpu_percentage", labels).set(util.system_cpu_percentage); // kernel is a misnomer, keeping for compatibility + gauge!("stat.system_cpu_percentage", labels).set(util.system_cpu_percentage); - // Update previous stats and if there's a time delta calculate the CPU - // usage. - prev.total_ticks = total_ticks; - prev.user_ticks = utime_ticks; - prev.system_ticks = stime_ticks; - prev.last_instant = now; - if delta_time > 0 { - let delta_time = delta_time as f64; + gauge!("stat.total_cpu_usage_millicores", labels).set(util.total_cpu_millicores); + gauge!("stat.user_cpu_usage_millicores", labels).set(util.user_cpu_millicores); + gauge!("stat.kernel_cpu_usage_millicores", labels).set(util.system_cpu_millicores); // kernel is a misnomer + gauge!("stat.system_cpu_usage_millicores", labels).set(util.system_cpu_millicores); + gauge!("stat.cpu_limit_millicores", labels).set(limit_millicores); + } - let tick_to_usec = 1_000_000.0 / ticks_per_second; + *prev = cur_stats; - let delta_usage_usec = (delta_total_ticks as f64) * tick_to_usec; - let delta_user_usec = (delta_user_ticks as f64) * tick_to_usec; - let delta_system_usec = (delta_system_ticks as f64) * tick_to_usec; + Ok(()) +} - // Compute CPU usage as a fraction of a single CPU - let usage_fraction = delta_usage_usec / delta_time; - let user_fraction = delta_user_usec / delta_time; - let system_fraction = delta_system_usec / delta_time; +/// Parse `/proc//stat` and extracts: +/// +/// * pid (1st field) +/// * utime (14th field) +/// * stime (15th field) +/// +/// The pid we already have and it's a check. The utime and stime are going to +/// be used to calculate CPU data. +/// +/// # Errors +/// +/// Function will fail if the stat file is malformed. +fn parse(contents: &str) -> Result<(i32, u64, u64), Error> { + // Search first for command name by searching for the parantheses that + // surround it. These allow us to divide the stat file into two parts, the + // bit with the pid and all the rest of the fields that Linux adds to over + // time. + let start_paren = contents + .find('(') + .ok_or_else(|| Error::StatMalformed("Failed to find '(' in stat contents"))?; + let end_paren = contents + .rfind(')') + .ok_or_else(|| Error::StatMalformed("Failed to find ')' in stat contents"))?; - // NOTE these metric names are paired with names in cgroup/v2/cpu.rs and - // must remain consistent. If you change these, change those. + let before = &contents[..start_paren]; + let after = &contents[end_paren + 2..]; // skip ") " + let before_parts: Vec<&str> = before.split_whitespace().collect(); + if before_parts.is_empty() { + return Err(Error::StatMalformed("Not enough fields before paren")); + } - // Convert usage to a percentage of the cores granted to the target. - let total_cpu = (usage_fraction / allowed_cores) * 100.0; - let user_cpu = (user_fraction / allowed_cores) * 100.0; - let system_cpu = (system_fraction / allowed_cores) * 100.0; - gauge!("stat.total_cpu_percentage", labels).set(total_cpu); - gauge!("stat.cpu_percentage", labels).set(total_cpu); // backward compatibility - gauge!("stat.user_cpu_percentage", labels).set(user_cpu); - gauge!("stat.kernel_cpu_percentage", labels).set(system_cpu); // kernel is a misnomer, keeping for compatibility - gauge!("stat.system_cpu_percentage", labels).set(system_cpu); - - // Convert usage to kubernetes style millicores. - let total_millicores = usage_fraction * 1000.0; - let user_millicores = user_fraction * 1000.0; - let system_millicores = system_fraction * 1000.0; - gauge!("stat.total_cpu_usage_millicores", labels).set(total_millicores); - gauge!("stat.user_cpu_usage_millicores", labels).set(user_millicores); - gauge!("stat.kernel_cpu_usage_millicores", labels).set(system_millicores); // kernel is a misnomer, keeping for compatibility - gauge!("stat.system_cpu_usage_millicores", labels).set(system_millicores); - gauge!("stat.cpu_limit_millicores", labels).set(limit_millicores); + let pid_str = before_parts[0]; + let pid = pid_str.parse::()?; + + let after_parts: Vec<&str> = after.split_whitespace().collect(); + // Okay, looking at proc_pid_stat(5) here's a little table to convince you + // the indexes are right: + // + // Field # Name Index in after_parts + // 3 state 0 + // 4 ppid 1 + // 5 pgrp 2 + // 6 session 3 + // 7 tty_nr 4 + // 8 tpgid 5 + // 9 flags 6 + // 10 minflt 7 + // 11 cminflt 8 + // 12 majflt 9 + // 13 cmajflt 10 + // 14 utime 11 + // 15 stime 12 + + // There might be more fields after stime, but we don't parse these. + if after_parts.len() < 13 { + return Err(Error::StatMalformed("Not enough fields after comm")); } - Ok(()) + let utime = after_parts[11].parse::()?; + let stime = after_parts[12].parse::()?; + + Ok((pid, utime, stime)) +} + +/// Computes CPU usage given current and previous `Stats`. +/// +/// Returns a `CpuUtilization` struct if successful, or `None` if no time has passed. +fn compute_cpu_usage(prev: Stats, cur: Stats, allowed_cores: f64) -> Option { + // Time in ticks since between prev and cur samples. + let delta_time = cur.uptime_ticks.saturating_sub(prev.uptime_ticks); + // If time has not passed we cannot make any claims. + if delta_time == 0 { + return None; + } + // Calculate actual time passed in CPU ticks. + let delta_user_ticks = cur.user_ticks.saturating_sub(prev.user_ticks); + let delta_system_ticks = cur.system_ticks.saturating_sub(prev.system_ticks); + let delta_total_ticks = delta_user_ticks + delta_system_ticks; + + // Fraction of one core's capacity used. + let usage_fraction = (delta_total_ticks as f64) / (delta_time as f64); + let user_fraction = (delta_user_ticks as f64) / (delta_time as f64); + let system_fraction = (delta_system_ticks as f64) / (delta_time as f64); + + // Calculate percentage and millicore views. + let total_cpu_percentage = (usage_fraction / allowed_cores) * 100.0; + let user_cpu_percentage = (user_fraction / allowed_cores) * 100.0; + let system_cpu_percentage = (system_fraction / allowed_cores) * 100.0; + let total_cpu_millicores = usage_fraction * 1000.0; + let user_cpu_millicores = user_fraction * 1000.0; + let system_cpu_millicores = system_fraction * 1000.0; + + Some(CpuUtilization { + total_cpu_percentage, + user_cpu_percentage, + system_cpu_percentage, + total_cpu_millicores, + user_cpu_millicores, + system_cpu_millicores, + }) +} + +#[cfg(test)] +mod test { + use super::{compute_cpu_usage, parse, Stats}; + + #[test] + fn parse_basic() { + let line = + "1234 (some process) S 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23"; + let (pid, utime, stime) = parse(line).unwrap(); + assert_eq!(pid, 1234); + assert_eq!(utime, 11); + assert_eq!(stime, 12); + } + + #[test] + fn compute_cpu_usage_basic() { + // 1 second of time passes over 1 allowed core with 100 ticks per + // second. Values chosen to make the math simple. + + let allowed_cores = 1.0; + + let prev = Stats { + user_ticks: 1_000, + system_ticks: 2_000, + uptime_ticks: 0, + }; + let cur = Stats { + user_ticks: 1_500, + system_ticks: 2_500, + uptime_ticks: 1_000, + }; + + let util = compute_cpu_usage(prev, cur, allowed_cores).unwrap(); + assert!((util.total_cpu_percentage - 100.0).abs() < f64::EPSILON); + assert!((util.user_cpu_percentage - 50.0).abs() < f64::EPSILON); + assert!((util.system_cpu_percentage - 50.0).abs() < f64::EPSILON); + assert!((util.total_cpu_millicores - 1000.0).abs() < f64::EPSILON); + assert!((util.user_cpu_millicores - 500.0).abs() < f64::EPSILON); + assert!((util.system_cpu_millicores - 500.0).abs() < f64::EPSILON); + } + + #[test] + fn compute_cpu_usage_no_time_passed() { + let allowed_cores = 1.0; + let prev = Stats { + user_ticks: 1_000, + system_ticks: 2_000, + uptime_ticks: 10_000, + }; + let cur = Stats { + user_ticks: prev.user_ticks + 100, + system_ticks: prev.system_ticks + 100, + uptime_ticks: prev.uptime_ticks, // no time passed + }; + + let util = compute_cpu_usage(prev, cur, allowed_cores); + assert!(util.is_none()); + } + + #[test] + fn compute_cpu_usage_fractional_cores() { + let allowed_cores = 0.5; + let prev = Stats { + user_ticks: 1_000, + system_ticks: 2_000, + uptime_ticks: 0, + }; + let cur = Stats { + user_ticks: prev.user_ticks + 500, + system_ticks: prev.system_ticks + 500, + uptime_ticks: prev.uptime_ticks + 1_000, + }; + + let util = compute_cpu_usage(prev, cur, allowed_cores).unwrap(); + assert!((util.total_cpu_percentage - 200.0).abs() < f64::EPSILON); + assert!((util.user_cpu_percentage - 100.0).abs() < f64::EPSILON); + assert!((util.system_cpu_percentage - 100.0).abs() < f64::EPSILON); + assert!((util.total_cpu_millicores - 1000.0).abs() < f64::EPSILON); + assert!((util.user_cpu_millicores - 500.0).abs() < f64::EPSILON); + assert!((util.system_cpu_millicores - 500.0).abs() < f64::EPSILON); + } } From 0674f3f4e688a5b3a734182e97cff0c286c58daf Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Fri, 13 Dec 2024 12:59:36 -0800 Subject: [PATCH 3/5] remove accidental global state in /proc/pid/stat sampler Signed-off-by: Brian L. Troutwine --- lading/src/observer/linux/procfs.rs | 69 ++++++--- .../linux/procfs/memory/smaps_rollup.rs | 2 +- lading/src/observer/linux/procfs/stat.rs | 136 +++++++++--------- 3 files changed, 123 insertions(+), 84 deletions(-) diff --git a/lading/src/observer/linux/procfs.rs b/lading/src/observer/linux/procfs.rs index 203aace1e..e188efcf3 100644 --- a/lading/src/observer/linux/procfs.rs +++ b/lading/src/observer/linux/procfs.rs @@ -2,12 +2,15 @@ mod memory; mod stat; -use std::{collections::VecDeque, io}; +use std::{ + collections::{hash_map::Entry, VecDeque}, + io, +}; use metrics::gauge; use nix::errno::Errno; use procfs::process::Process; -use rustc_hash::FxHashSet; +use rustc_hash::{FxHashMap, FxHashSet}; use tracing::{error, warn}; const BYTES_PER_KIBIBYTE: u64 = 1024; @@ -45,6 +48,15 @@ macro_rules! report_status_field { }; } +#[derive(Debug)] +struct ProcessInfo { + cmdline: String, + exe: String, + comm: String, + pid_s: String, + stat_sampler: stat::Sampler, +} + #[derive(Debug)] pub(crate) struct Sampler { parent: Process, @@ -65,6 +77,8 @@ impl Sampler { clippy::cast_possible_wrap )] pub(crate) async fn poll(&mut self) -> Result<(), Error> { + let mut proccess_info: FxHashMap = FxHashMap::default(); + // A tally of the total RSS and PSS consumed by the parent process and // its children. let mut aggr = memory::smaps_rollup::Aggregator::default(); @@ -121,15 +135,36 @@ impl Sampler { continue; } - // Construct labels and then start ripping data into metrics. - let exe = proc_exe(pid).await?; - let comm = proc_comm(pid).await?; - let cmdline = proc_cmdline(pid).await?; - let labels = [ - (String::from("pid"), format!("{pid}")), - (String::from("exe"), exe.clone()), - (String::from("cmdline"), cmdline.clone()), - (String::from("comm"), comm.clone()), + // If we haven't seen this process before, initialize its ProcessInfo. + match proccess_info.entry(pid) { + Entry::Occupied(_) => { /* Already initialized */ } + Entry::Vacant(entry) => { + let exe = proc_exe(pid).await?; + let comm = proc_comm(pid).await?; + let cmdline = proc_cmdline(pid).await?; + let pid_s = format!("{pid}"); + let stat_sampler = stat::Sampler::new(); + + entry.insert(ProcessInfo { + cmdline, + exe, + comm, + pid_s, + stat_sampler, + }); + } + } + + // SAFETY: We've just inserted this pid into the map. + let pinfo = proccess_info + .get_mut(&pid) + .expect("catastrophic programming error"); + + let labels: [(&'static str, String); 4] = [ + ("pid", pinfo.pid_s.clone()), + ("exe", pinfo.exe.clone()), + ("cmdline", pinfo.cmdline.clone()), + ("comm", pinfo.comm.clone()), ]; // `/proc/{pid}/status` @@ -145,7 +180,7 @@ impl Sampler { let uptime = proc_uptime().await?; // `/proc/{pid}/stat`, most especially per-process CPU data. - if let Err(e) = stat::poll(pid, uptime, &labels).await { + if let Err(e) = pinfo.stat_sampler.poll(pid, uptime, &labels).await { // We don't want to bail out entirely if we can't read stats // which will happen if we don't have permissions or, more // likely, the process has exited. @@ -157,11 +192,11 @@ impl Sampler { match memory::smaps::Regions::from_pid(pid) { Ok(memory_regions) => { for (pathname, measures) in memory_regions.aggregate_by_pathname() { - let labels = [ - ("pid", format!("{pid}")), - ("exe", exe.clone()), - ("cmdline", cmdline.clone()), - ("comm", comm.clone()), + let labels: [(&'static str, String); 5] = [ + ("pid", pinfo.pid_s.clone()), + ("exe", pinfo.exe.clone()), + ("cmdline", pinfo.cmdline.clone()), + ("comm", pinfo.comm.clone()), ("pathname", pathname), ]; gauge!("smaps.rss.by_pathname", &labels).set(measures.rss as f64); diff --git a/lading/src/observer/linux/procfs/memory/smaps_rollup.rs b/lading/src/observer/linux/procfs/memory/smaps_rollup.rs index 55765f94e..0e4d72e88 100644 --- a/lading/src/observer/linux/procfs/memory/smaps_rollup.rs +++ b/lading/src/observer/linux/procfs/memory/smaps_rollup.rs @@ -25,7 +25,7 @@ pub(crate) struct Aggregator { // Read `/proc/{pid}/smaps_rollup` and parse it directly into metrics. pub(crate) async fn poll( pid: i32, - labels: &[(String, String)], + labels: &[(&'static str, String)], aggr: &mut Aggregator, ) -> Result<(), Error> { let path = format!("/proc/{pid}/smaps_rollup"); diff --git a/lading/src/observer/linux/procfs/stat.rs b/lading/src/observer/linux/procfs/stat.rs index cc7251d76..d3bb3a29b 100644 --- a/lading/src/observer/linux/procfs/stat.rs +++ b/lading/src/observer/linux/procfs/stat.rs @@ -1,6 +1,4 @@ use metrics::gauge; -use once_cell::sync::OnceCell; -use std::sync::Mutex; use tokio::fs; use crate::observer::linux::cgroup; @@ -19,7 +17,7 @@ pub enum Error { Cgroup(#[from] cgroup::v2::Error), } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Default)] #[allow(clippy::struct_field_names)] // The _ticks is useful even if clippy doesn't like it. struct Stats { user_ticks: u64, @@ -37,72 +35,78 @@ struct CpuUtilization { system_cpu_millicores: f64, } -static PREV: OnceCell> = OnceCell::new(); - -#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] -pub(crate) async fn poll( - pid: i32, - uptime_secs: f64, - labels: &[(String, String)], -) -> Result<(), Error> { - let group_prefix = cgroup::v2::get_path(pid).await?; - - // Read cpu.max (cgroup v2) - let cpu_max = fs::read_to_string(group_prefix.join("cpu.max")).await?; - let parts: Vec<&str> = cpu_max.split_whitespace().collect(); - let (max_str, period_str) = (parts[0], parts[1]); - let allowed_cores = if max_str == "max" { - // If the target cgroup has no CPU limit we assume it has access to all - // physical cores. - num_cpus::get_physical() as f64 - } else { - let max_val = max_str.parse::()?; - let period_val = period_str.parse::()?; - max_val / period_val - }; - let limit_millicores = allowed_cores * 1000.0; - - // Read `/proc//stat` - let stat_contents = fs::read_to_string(format!("/proc/{pid}/stat")).await?; - let (cur_pid, utime_ticks, stime_ticks) = parse(&stat_contents)?; - assert!(cur_pid == pid); - - // See sysconf(3). - let ticks_per_second = unsafe { nix::libc::sysconf(nix::libc::_SC_CLK_TCK) } as f64; - - // Get or initialize the previous stats. Note that the first time this is - // initialized we intentionally set last_instance to now to avoid scheduling - // shenanigans. - let cur_stats = Stats { - user_ticks: utime_ticks, - system_ticks: stime_ticks, - uptime_ticks: (uptime_secs * ticks_per_second).round() as u64, - }; - - let mut prev = PREV - .get_or_init(|| Mutex::new(cur_stats)) - .lock() - .expect("/proc/pid/stat previous state lock poisoned"); - - if let Some(util) = compute_cpu_usage(*prev, cur_stats, allowed_cores) { - // NOTE these metric names are paired with names in cgroup/v2/cpu.rs and - // must remain consistent. If you change these, change those. - gauge!("stat.total_cpu_percentage", labels).set(util.total_cpu_percentage); - gauge!("stat.cpu_percentage", labels).set(util.total_cpu_percentage); // backward compatibility - gauge!("stat.user_cpu_percentage", labels).set(util.user_cpu_percentage); - gauge!("stat.kernel_cpu_percentage", labels).set(util.system_cpu_percentage); // kernel is a misnomer, keeping for compatibility - gauge!("stat.system_cpu_percentage", labels).set(util.system_cpu_percentage); - - gauge!("stat.total_cpu_usage_millicores", labels).set(util.total_cpu_millicores); - gauge!("stat.user_cpu_usage_millicores", labels).set(util.user_cpu_millicores); - gauge!("stat.kernel_cpu_usage_millicores", labels).set(util.system_cpu_millicores); // kernel is a misnomer - gauge!("stat.system_cpu_usage_millicores", labels).set(util.system_cpu_millicores); - gauge!("stat.cpu_limit_millicores", labels).set(limit_millicores); +#[derive(Debug)] +pub(crate) struct Sampler { + ticks_per_second: f64, + prev: Stats, +} + +impl Sampler { + pub(crate) fn new() -> Self { + Self { + ticks_per_second: unsafe { nix::libc::sysconf(nix::libc::_SC_CLK_TCK) } as f64, + prev: Stats::default(), + } } - *prev = cur_stats; + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + pub(crate) async fn poll( + &mut self, + pid: i32, + uptime_secs: f64, + labels: &[(&'static str, String)], + ) -> Result<(), Error> { + let group_prefix = cgroup::v2::get_path(pid).await?; + + // Read cpu.max (cgroup v2) + let cpu_max = fs::read_to_string(group_prefix.join("cpu.max")).await?; + let parts: Vec<&str> = cpu_max.split_whitespace().collect(); + let (max_str, period_str) = (parts[0], parts[1]); + let allowed_cores = if max_str == "max" { + // If the target cgroup has no CPU limit we assume it has access to all + // physical cores. + num_cpus::get_physical() as f64 + } else { + let max_val = max_str.parse::()?; + let period_val = period_str.parse::()?; + max_val / period_val + }; + let limit_millicores = allowed_cores * 1000.0; + + // Read `/proc//stat` + let stat_contents = fs::read_to_string(format!("/proc/{pid}/stat")).await?; + let (cur_pid, utime_ticks, stime_ticks) = parse(&stat_contents)?; + assert!(cur_pid == pid); + + // Get or initialize the previous stats. Note that the first time this is + // initialized we intentionally set last_instance to now to avoid scheduling + // shenanigans. + let cur_stats = Stats { + user_ticks: utime_ticks, + system_ticks: stime_ticks, + uptime_ticks: (uptime_secs * self.ticks_per_second).round() as u64, + }; - Ok(()) + if let Some(util) = compute_cpu_usage(self.prev, cur_stats, allowed_cores) { + // NOTE these metric names are paired with names in cgroup/v2/cpu.rs and + // must remain consistent. If you change these, change those. + gauge!("stat.total_cpu_percentage", labels).set(util.total_cpu_percentage); + gauge!("stat.cpu_percentage", labels).set(util.total_cpu_percentage); // backward compatibility + gauge!("stat.user_cpu_percentage", labels).set(util.user_cpu_percentage); + gauge!("stat.kernel_cpu_percentage", labels).set(util.system_cpu_percentage); // kernel is a misnomer, keeping for compatibility + gauge!("stat.system_cpu_percentage", labels).set(util.system_cpu_percentage); + + gauge!("stat.total_cpu_usage_millicores", labels).set(util.total_cpu_millicores); + gauge!("stat.user_cpu_usage_millicores", labels).set(util.user_cpu_millicores); + gauge!("stat.kernel_cpu_usage_millicores", labels).set(util.system_cpu_millicores); // kernel is a misnomer + gauge!("stat.system_cpu_usage_millicores", labels).set(util.system_cpu_millicores); + gauge!("stat.cpu_limit_millicores", labels).set(limit_millicores); + } + + self.prev = cur_stats; + + Ok(()) + } } /// Parse `/proc//stat` and extracts: From be6cf91774c95d8d99eff2b868cfb98b9dc884e9 Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Fri, 13 Dec 2024 13:03:23 -0800 Subject: [PATCH 4/5] proccess -> process Signed-off-by: Brian L. Troutwine --- lading/src/observer/linux/procfs.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lading/src/observer/linux/procfs.rs b/lading/src/observer/linux/procfs.rs index e188efcf3..6e0b2f74a 100644 --- a/lading/src/observer/linux/procfs.rs +++ b/lading/src/observer/linux/procfs.rs @@ -77,7 +77,7 @@ impl Sampler { clippy::cast_possible_wrap )] pub(crate) async fn poll(&mut self) -> Result<(), Error> { - let mut proccess_info: FxHashMap = FxHashMap::default(); + let mut process_info: FxHashMap = FxHashMap::default(); // A tally of the total RSS and PSS consumed by the parent process and // its children. @@ -136,7 +136,7 @@ impl Sampler { } // If we haven't seen this process before, initialize its ProcessInfo. - match proccess_info.entry(pid) { + match process_info.entry(pid) { Entry::Occupied(_) => { /* Already initialized */ } Entry::Vacant(entry) => { let exe = proc_exe(pid).await?; @@ -156,7 +156,7 @@ impl Sampler { } // SAFETY: We've just inserted this pid into the map. - let pinfo = proccess_info + let pinfo = process_info .get_mut(&pid) .expect("catastrophic programming error"); From b84ec4a946c78fd7210aefc22703d36d8d49b68d Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Fri, 13 Dec 2024 13:12:27 -0800 Subject: [PATCH 5/5] make sure process_info persists Signed-off-by: Brian L. Troutwine --- lading/src/observer/linux/procfs.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/lading/src/observer/linux/procfs.rs b/lading/src/observer/linux/procfs.rs index 6e0b2f74a..6e9fea610 100644 --- a/lading/src/observer/linux/procfs.rs +++ b/lading/src/observer/linux/procfs.rs @@ -60,13 +60,18 @@ struct ProcessInfo { #[derive(Debug)] pub(crate) struct Sampler { parent: Process, + process_info: FxHashMap, } impl Sampler { pub(crate) fn new(parent_pid: i32) -> Result { let parent = Process::new(parent_pid)?; + let process_info = FxHashMap::default(); - Ok(Self { parent }) + Ok(Self { + parent, + process_info, + }) } #[allow( @@ -77,8 +82,6 @@ impl Sampler { clippy::cast_possible_wrap )] pub(crate) async fn poll(&mut self) -> Result<(), Error> { - let mut process_info: FxHashMap = FxHashMap::default(); - // A tally of the total RSS and PSS consumed by the parent process and // its children. let mut aggr = memory::smaps_rollup::Aggregator::default(); @@ -136,7 +139,7 @@ impl Sampler { } // If we haven't seen this process before, initialize its ProcessInfo. - match process_info.entry(pid) { + match self.process_info.entry(pid) { Entry::Occupied(_) => { /* Already initialized */ } Entry::Vacant(entry) => { let exe = proc_exe(pid).await?; @@ -156,7 +159,8 @@ impl Sampler { } // SAFETY: We've just inserted this pid into the map. - let pinfo = process_info + let pinfo = self + .process_info .get_mut(&pid) .expect("catastrophic programming error");