From f2f9064c687624757ed9f0f969936c3156658a0e Mon Sep 17 00:00:00 2001 From: Ilya Date: Thu, 4 Apr 2024 11:51:28 +0200 Subject: [PATCH] Add API for monitoring exited tasks (#11) To be compliant with the taskstats docs, also added methods to adjust the RX socket buffer Co-authored-by: Ilya Byckevich --- Cargo.toml | 2 +- src/lib.rs | 121 +++++++++++++++++++++++++++++++++++++++++++------ src/netlink.rs | 8 ++++ 3 files changed, 115 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7ff1871..0854135 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ required-features = ["executable"] [dependencies] libc = "0.2.139" -netlink-sys = "0.8.3" +netlink-sys = "0.8.6" thiserror = "1.0.38" log = "0.4.17" env_logger = { version = "0.10.0", optional = true } diff --git a/src/lib.rs b/src/lib.rs index cf16064..7a1f657 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,15 +12,15 @@ pub use model::*; pub use c_headers::taskstats as __bindgen_taskstats; use c_headers::{ - __u16, __u32, __u64, __u8, TASKSTATS_CMD_ATTR_PID, TASKSTATS_CMD_ATTR_TGID, TASKSTATS_CMD_GET, + __u16, __u32, __u64, __u8, TASKSTATS_CMD_ATTR_DEREGISTER_CPUMASK, TASKSTATS_CMD_ATTR_PID, + TASKSTATS_CMD_ATTR_REGISTER_CPUMASK, TASKSTATS_CMD_ATTR_TGID, TASKSTATS_CMD_GET, TASKSTATS_GENL_NAME, TASKSTATS_TYPE_AGGR_PID, TASKSTATS_TYPE_AGGR_TGID, TASKSTATS_TYPE_NULL, TASKSTATS_TYPE_PID, TASKSTATS_TYPE_STATS, TASKSTATS_TYPE_TGID, }; use log::{debug, warn}; use netlink::Netlink; use netlink::NlPayload; -use std::mem; -use std::slice; +use std::{mem, slice}; use thiserror::Error; /// Errors possibly returned by `Client` @@ -93,12 +93,7 @@ impl Client { /// * when kernel responded error /// * when the returned data couldn't be interpreted pub fn pid_stats(&self, tid: u32) -> Result { - self.netlink.send_cmd( - self.ts_family_id, - TASKSTATS_CMD_GET as u8, - TASKSTATS_CMD_ATTR_PID as u16, - tid.as_buf(), - )?; + self.send(TASKSTATS_CMD_ATTR_PID as u16, tid.as_buf())?; let resp = self.netlink.recv_response()?; for na in resp.payload_as_nlattrs() { @@ -138,12 +133,7 @@ impl Client { /// * when kernel responded error /// * when the returned data couldn't be interpreted pub fn tgid_stats(&self, tgid: u32) -> Result { - self.netlink.send_cmd( - self.ts_family_id, - TASKSTATS_CMD_GET as u8, - TASKSTATS_CMD_ATTR_TGID as u16, - tgid.as_buf(), - )?; + self.send(TASKSTATS_CMD_ATTR_TGID as u16, tgid.as_buf())?; let resp = self.netlink.recv_response()?; for na in resp.payload_as_nlattrs() { @@ -168,6 +158,107 @@ impl Client { "no TASKSTATS_TYPE_STATS found in response".to_string(), )) } + + /// Register listener with the specific cpumask + /// + /// # Arguments + /// * `cpu_mask` - cpumask is specified as an ascii string of comma-separated cpu ranges e.g. + /// to listen to exit data from cpus 1,2,3,5,7,8 the cpumask would be "1-3,5,7-8". + pub fn register_cpumask(&self, cpu_mask: &str) -> Result<()> { + self.send( + TASKSTATS_CMD_ATTR_REGISTER_CPUMASK as u16, + cpu_mask.as_bytes(), + )?; + Ok(()) + } + + /// Deregister listener with the specific cpumask + /// If userspace forgets to deregister interest in cpus before closing the listening socket, + /// the kernel cleans up its interest set over time. However, for the sake of efficiency, + /// an explicit deregistration is advisable. + /// + /// # Arguments + /// * `cpu_mask` - cpumask is specified as an ascii string of comma-separated cpu ranges e.g. + /// to listen to exit data from cpus 1,2,3,5,7,8 the cpumask would be "1-3,5,7-8". + pub fn deregister_cpumask(&self, cpu_mask: &str) -> Result<()> { + self.send( + TASKSTATS_CMD_ATTR_DEREGISTER_CPUMASK as u16, + cpu_mask.as_bytes(), + )?; + Ok(()) + } + + /// Listen registered cpumask's. + /// If no messages are available at the socket, the receive call + /// wait for a message to arrive, unless the socket is nonblocking. + /// + /// # Return + /// * `Ok(Vec)`: vector with stats messages. If the current task is NOT the last + /// one in its thread group, only one message is returned in the vector. + /// However, if it is the last task, an additional element containing the per-thread + /// group ID (tgid) statistics is also included. This additional element sums up + /// the statistics for all threads within the thread group, both past and present + pub fn listen_registered(&self) -> Result> { + let resp = self.netlink.recv_response()?; + let mut stats_vec = Vec::new(); + + for na in resp.payload_as_nlattrs() { + match na.header.nla_type as u32 { + TASKSTATS_TYPE_NULL => break, + TASKSTATS_TYPE_AGGR_PID | TASKSTATS_TYPE_AGGR_TGID => { + for inner in na.payload_as_nlattrs() { + match inner.header.nla_type as u32 { + TASKSTATS_TYPE_PID => debug!("Received TASKSTATS_TYPE_PID"), + TASKSTATS_TYPE_TGID => debug!("Received TASKSTATS_TYPE_TGID"), + TASKSTATS_TYPE_STATS => { + stats_vec.push(TaskStats::from(inner.payload())); + } + unknown => println!("Skipping unknown nla_type: {}", unknown), + } + } + } + unknown => println!("Skipping unknown nla_type: {}", unknown), + } + } + if !stats_vec.is_empty() { + return Ok(stats_vec); + } + Err(Error::Unknown( + "no TASKSTATS_TYPE_STATS found in response".to_string(), + )) + } + + /// Set receiver buffer size in bytes (SO_RCVBUF socket option, see socket(7)) + /// + /// # Arguments + /// * `payload` - buffer size in bytes. The kernel doubles this value + /// (to allow space for bookkeeping overhead). The default value is set by the + /// /proc/sys/net/core/rmem_default file, and the maximum allowed value is set by the + /// /proc/sys/net/core/rmem_max file. The minimum (doubled) value for this option is 256. + pub fn set_rx_buf_sz(&self, payload: T) -> Result<()> { + self.netlink + .set_rx_buf_sz(payload) + .map_err(|err| err.into()) + } + + /// Get receiver buffer size in bytes (SO_RCVBUF socket option, see socket(7)) + /// + /// # Return + /// * `usize` buffer size in bytes. + /// Kernel returns doubled value, that have been set using [set_rx_buf_sz] + pub fn get_rx_buf_sz(&self) -> Result { + self.netlink.get_rx_buf_sz().map_err(|err| err.into()) + } + + pub fn send(&self, taskstats_cmd: u16, data: &[u8]) -> Result<()> { + self.netlink.send_cmd( + self.ts_family_id, + TASKSTATS_CMD_GET as u8, + taskstats_cmd, + data, + )?; + Ok(()) + } } trait AsBuf { diff --git a/src/netlink.rs b/src/netlink.rs index 463cec0..037173d 100644 --- a/src/netlink.rs +++ b/src/netlink.rs @@ -105,6 +105,14 @@ impl Netlink { mypid: process::id(), }) } + + pub fn set_rx_buf_sz(&self, payload: T) -> Result<()> { + self.sock.set_rx_buf_sz(payload).map_err(|err| err.into()) + } + + pub fn get_rx_buf_sz(&self) -> Result { + self.sock.get_rx_buf_sz().map_err(|err| err.into()) + } } impl Netlink {