diff --git a/Cargo.lock b/Cargo.lock index 376321bdbc23..4ab6741f2d35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6385,6 +6385,8 @@ dependencies = [ "nix 0.26.2", "polkadot-cli", "polkadot-core-primitives", + "polkadot-node-core-pvf", + "polkadot-overseer", "substrate-rpc-client", "tempfile", "tikv-jemallocator", diff --git a/Cargo.toml b/Cargo.toml index 6ce31d1e938b..db341c55315e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,10 +19,14 @@ repository = "https://github.com/paritytech/polkadot.git" version = "0.9.37" [dependencies] -polkadot-cli = { path = "cli", features = [ "kusama-native", "westend-native", "rococo-native", "hostperfcheck" ] } color-eyre = { version = "0.6.1", default-features = false } tikv-jemallocator = "0.5.0" +# Crates in our workspace, defined as dependencies so we can pass them feature flags. +polkadot-cli = { path = "cli", features = [ "kusama-native", "westend-native", "rococo-native" ] } +polkadot-node-core-pvf = { path = "node/core/pvf" } +polkadot-overseer = { path = "node/overseer" } + [dev-dependencies] assert_cmd = "2.0.4" nix = { version = "0.26.1", features = ["signal"] } @@ -202,6 +206,7 @@ try-runtime = [ "polkadot-cli/try-runtime" ] fast-runtime = [ "polkadot-cli/fast-runtime" ] runtime-metrics = [ "polkadot-cli/runtime-metrics" ] pyroscope = ["polkadot-cli/pyroscope"] +jemalloc-allocator = ["polkadot-node-core-pvf/jemalloc-allocator", "polkadot-overseer/jemalloc-allocator"] # Configuration for building a .deb package - for use with `cargo-deb` [package.metadata.deb] diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml index e918c5f90fb2..f19663bab98e 100644 --- a/node/core/pvf/Cargo.toml +++ b/node/core/pvf/Cargo.toml @@ -20,7 +20,7 @@ rand = "0.8.5" rayon = "1.5.1" slotmap = "1.0" tempfile = "3.3.0" -tikv-jemalloc-ctl = "0.5.0" +tikv-jemalloc-ctl = { version = "0.5.0", optional = true } tokio = { version = "1.24.2", features = ["fs", "process"] } parity-scale-codec = { version = "3.3.0", default-features = false, features = ["derive"] } @@ -41,9 +41,13 @@ sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master [target.'cfg(target_os = "linux")'.dependencies] libc = "0.2.139" +tikv-jemalloc-ctl = "0.5.0" [dev-dependencies] adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" } halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" } hex-literal = "0.3.4" tempfile = "3.3.0" + +[features] +jemalloc-allocator = ["dep:tikv-jemalloc-ctl"] diff --git a/node/core/pvf/src/prepare/memory_stats.rs b/node/core/pvf/src/prepare/memory_stats.rs index 4765a196d54e..013c0017e623 100644 --- a/node/core/pvf/src/prepare/memory_stats.rs +++ b/node/core/pvf/src/prepare/memory_stats.rs @@ -29,16 +29,7 @@ use crate::{metrics::Metrics, LOG_TARGET}; use parity_scale_codec::{Decode, Encode}; -use std::{ - io, - sync::mpsc::{Receiver, RecvTimeoutError, Sender}, - time::Duration, -}; -use tikv_jemalloc_ctl::{epoch, stats, Error}; -use tokio::task::JoinHandle; - -#[cfg(target_os = "linux")] -use libc::{getrusage, rusage, timeval, RUSAGE_THREAD}; +use std::io; /// Helper struct to contain all the memory stats, including [`MemoryAllocationStats`] and, if /// supported by the OS, `ru_maxrss`. @@ -60,164 +51,17 @@ pub struct MemoryAllocationStats { pub allocated: u64, } -#[derive(Clone)] -struct MemoryAllocationTracker { - epoch: tikv_jemalloc_ctl::epoch_mib, - allocated: stats::allocated_mib, - resident: stats::resident_mib, -} - -impl MemoryAllocationTracker { - pub fn new() -> Result { - Ok(Self { - epoch: epoch::mib()?, - allocated: stats::allocated::mib()?, - resident: stats::resident::mib()?, - }) - } - - pub fn snapshot(&self) -> Result { - // update stats by advancing the allocation epoch - self.epoch.advance()?; - - // Convert to `u64`, as `usize` is not `Encode`able. - let allocated = self.allocated.read()? as u64; - let resident = self.resident.read()? as u64; - Ok(MemoryAllocationStats { allocated, resident }) - } -} - -/// Get the rusage stats for the current thread. -#[cfg(target_os = "linux")] -fn getrusage_thread() -> io::Result { - let mut result = rusage { - ru_utime: timeval { tv_sec: 0, tv_usec: 0 }, - ru_stime: timeval { tv_sec: 0, tv_usec: 0 }, - ru_maxrss: 0, - ru_ixrss: 0, - ru_idrss: 0, - ru_isrss: 0, - ru_minflt: 0, - ru_majflt: 0, - ru_nswap: 0, - ru_inblock: 0, - ru_oublock: 0, - ru_msgsnd: 0, - ru_msgrcv: 0, - ru_nsignals: 0, - ru_nvcsw: 0, - ru_nivcsw: 0, - }; - if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 { - return Err(io::Error::last_os_error()) - } - Ok(result) -} - /// Gets the `ru_maxrss` for the current thread if the OS supports `getrusage`. Otherwise, just /// returns `None`. pub fn get_max_rss_thread() -> Option> { // `c_long` is either `i32` or `i64` depending on architecture. `i64::from` always works. #[cfg(target_os = "linux")] - let max_rss = Some(getrusage_thread().map(|rusage| i64::from(rusage.ru_maxrss))); + let max_rss = Some(getrusage::getrusage_thread().map(|rusage| i64::from(rusage.ru_maxrss))); #[cfg(not(target_os = "linux"))] let max_rss = None; max_rss } -/// Runs a thread in the background that observes memory statistics. The goal is to try to get -/// accurate stats during preparation. -/// -/// # Algorithm -/// -/// 1. Create the memory tracker. -/// -/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the -/// allocation epoch. -/// -/// 3. When we receive a signal that preparation has completed, take one last snapshot and return -/// the maximum observed values. -/// -/// # Errors -/// -/// For simplicity, any errors are returned as a string. As this is not a critical component, errors -/// are used for informational purposes (logging) only. -pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result { - // This doesn't need to be too fine-grained since preparation currently takes 3-10s or more. - // Apart from that, there is not really a science to this number. - const POLL_INTERVAL: Duration = Duration::from_millis(100); - - let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?; - let mut max_stats = MemoryAllocationStats::default(); - - let mut update_stats = || -> Result<(), String> { - let current_stats = tracker.snapshot().map_err(|err| err.to_string())?; - if current_stats.resident > max_stats.resident { - max_stats.resident = current_stats.resident; - } - if current_stats.allocated > max_stats.allocated { - max_stats.allocated = current_stats.allocated; - } - Ok(()) - }; - - loop { - // Take a snapshot and update the max stats. - update_stats()?; - - // Sleep. - match finished_rx.recv_timeout(POLL_INTERVAL) { - // Received finish signal. - Ok(()) => { - update_stats()?; - return Ok(max_stats) - }, - // Timed out, restart loop. - Err(RecvTimeoutError::Timeout) => continue, - Err(RecvTimeoutError::Disconnected) => - return Err("memory_tracker_loop: finished_rx disconnected".into()), - } - } -} - -/// Helper function to terminate the memory tracker thread and get the stats. Helps isolate all this -/// error handling. -pub async fn get_memory_tracker_loop_stats( - fut: JoinHandle>, - tx: Sender<()>, -) -> Option { - // Signal to the memory tracker thread to terminate. - if let Err(err) = tx.send(()) { - gum::warn!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "worker: error sending signal to memory tracker_thread: {}", err - ); - None - } else { - // Join on the thread handle. - match fut.await { - Ok(Ok(stats)) => Some(stats), - Ok(Err(err)) => { - gum::warn!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "worker: error occurred in the memory tracker thread: {}", err - ); - None - }, - Err(err) => { - gum::warn!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "worker: error joining on memory tracker thread: {}", err - ); - None - }, - } - } -} - /// Helper function to send the memory metrics, if available, to prometheus. pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: u32) { if let Some(max_rss) = memory_stats.max_rss { @@ -241,3 +85,166 @@ pub fn observe_memory_metrics(metrics: &Metrics, memory_stats: MemoryStats, pid: metrics.observe_preparation_max_allocated(allocated_kb); } } + +#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] +pub mod memory_tracker { + use super::*; + use std::{ + sync::mpsc::{Receiver, RecvTimeoutError, Sender}, + time::Duration, + }; + use tikv_jemalloc_ctl::{epoch, stats, Error}; + use tokio::task::JoinHandle; + + #[derive(Clone)] + struct MemoryAllocationTracker { + epoch: tikv_jemalloc_ctl::epoch_mib, + allocated: stats::allocated_mib, + resident: stats::resident_mib, + } + + impl MemoryAllocationTracker { + pub fn new() -> Result { + Ok(Self { + epoch: epoch::mib()?, + allocated: stats::allocated::mib()?, + resident: stats::resident::mib()?, + }) + } + + pub fn snapshot(&self) -> Result { + // update stats by advancing the allocation epoch + self.epoch.advance()?; + + // Convert to `u64`, as `usize` is not `Encode`able. + let allocated = self.allocated.read()? as u64; + let resident = self.resident.read()? as u64; + Ok(MemoryAllocationStats { allocated, resident }) + } + } + + /// Runs a thread in the background that observes memory statistics. The goal is to try to get + /// accurate stats during preparation. + /// + /// # Algorithm + /// + /// 1. Create the memory tracker. + /// + /// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the + /// allocation epoch. + /// + /// 3. When we receive a signal that preparation has completed, take one last snapshot and return + /// the maximum observed values. + /// + /// # Errors + /// + /// For simplicity, any errors are returned as a string. As this is not a critical component, errors + /// are used for informational purposes (logging) only. + pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result { + // This doesn't need to be too fine-grained since preparation currently takes 3-10s or more. + // Apart from that, there is not really a science to this number. + const POLL_INTERVAL: Duration = Duration::from_millis(100); + + let tracker = MemoryAllocationTracker::new().map_err(|err| err.to_string())?; + let mut max_stats = MemoryAllocationStats::default(); + + let mut update_stats = || -> Result<(), String> { + let current_stats = tracker.snapshot().map_err(|err| err.to_string())?; + if current_stats.resident > max_stats.resident { + max_stats.resident = current_stats.resident; + } + if current_stats.allocated > max_stats.allocated { + max_stats.allocated = current_stats.allocated; + } + Ok(()) + }; + + loop { + // Take a snapshot and update the max stats. + update_stats()?; + + // Sleep. + match finished_rx.recv_timeout(POLL_INTERVAL) { + // Received finish signal. + Ok(()) => { + update_stats()?; + return Ok(max_stats) + }, + // Timed out, restart loop. + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => + return Err("memory_tracker_loop: finished_rx disconnected".into()), + } + } + } + + /// Helper function to terminate the memory tracker thread and get the stats. Helps isolate all this + /// error handling. + pub async fn get_memory_tracker_loop_stats( + fut: JoinHandle>, + tx: Sender<()>, + ) -> Option { + // Signal to the memory tracker thread to terminate. + if let Err(err) = tx.send(()) { + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: error sending signal to memory tracker_thread: {}", err + ); + None + } else { + // Join on the thread handle. + match fut.await { + Ok(Ok(stats)) => Some(stats), + Ok(Err(err)) => { + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: error occurred in the memory tracker thread: {}", err + ); + None + }, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: error joining on memory tracker thread: {}", err + ); + None + }, + } + } + } +} + +#[cfg(target_os = "linux")] +mod getrusage { + use libc::{getrusage, rusage, timeval, RUSAGE_THREAD}; + use std::io; + + /// Get the rusage stats for the current thread. + pub fn getrusage_thread() -> io::Result { + let mut result = rusage { + ru_utime: timeval { tv_sec: 0, tv_usec: 0 }, + ru_stime: timeval { tv_sec: 0, tv_usec: 0 }, + ru_maxrss: 0, + ru_ixrss: 0, + ru_idrss: 0, + ru_isrss: 0, + ru_minflt: 0, + ru_majflt: 0, + ru_nswap: 0, + ru_inblock: 0, + ru_oublock: 0, + ru_msgsnd: 0, + ru_msgrcv: 0, + ru_nsignals: 0, + ru_nvcsw: 0, + ru_nivcsw: 0, + }; + if unsafe { getrusage(RUSAGE_THREAD, &mut result) } == -1 { + return Err(io::Error::last_os_error()) + } + Ok(result) + } +} diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index bb6e120a6691..69c1d3bb5557 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -14,10 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use super::memory_stats::{ - get_max_rss_thread, get_memory_tracker_loop_stats, memory_tracker_loop, observe_memory_metrics, - MemoryStats, -}; +#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] +use super::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop}; +use super::memory_stats::{get_max_rss_thread, observe_memory_metrics, MemoryStats}; use crate::{ artifacts::CompiledArtifact, error::{PrepareError, PrepareResult}, @@ -373,9 +372,10 @@ pub fn worker_entrypoint(socket_path: &str) { let cpu_time_start = ProcessTime::now(); // Run the memory tracker. + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] let (memory_tracker_tx, memory_tracker_rx) = channel::<()>(); - let memory_tracker_fut = - rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx)); + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] + let memory_tracker_fut = rt_handle.spawn_blocking(move || memory_tracker_loop(memory_tracker_rx)); // Spawn a new thread that runs the CPU time monitor. let (cpu_time_monitor_tx, cpu_time_monitor_rx) = channel::<()>(); @@ -431,8 +431,11 @@ pub fn worker_entrypoint(socket_path: &str) { }, (Ok(compiled_artifact), max_rss) => { // Stop the memory stats worker and get its observed memory stats. + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_fut, memory_tracker_tx).await; + #[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))] + let memory_tracker_stats = None; let memory_stats = MemoryStats { memory_tracker_stats, max_rss: max_rss.map(|inner| inner.map_err(|e| e.to_string())), diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index 262eddeec61e..5d26c0b6e2bf 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -20,7 +20,7 @@ gum = { package = "tracing-gum", path = "../gum" } lru = "0.9" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } async-trait = "0.1.57" -tikv-jemalloc-ctl = "0.5.0" +tikv-jemalloc-ctl = { version = "0.5.0", optional = true } [dev-dependencies] metered = { package = "prioritized-metered-channel", version = "0.2.0" } @@ -31,7 +31,11 @@ femme = "2.2.1" assert_matches = "1.4.0" test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../primitives/test-helpers" } +[target.'cfg(target_os = "linux")'.dependencies] +tikv-jemalloc-ctl = "0.5.0" + [features] default = [] expand = ["orchestra/expand"] dotgraph = ["orchestra/dotgraph"] +jemalloc-allocator = ["dep:tikv-jemalloc-ctl"] diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 6b63235e12a1..4d04ece70643 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -117,14 +117,13 @@ pub const KNOWN_LEAVES_CACHE_SIZE: NonZeroUsize = match NonZeroUsize::new(2 * 24 None => panic!("Known leaves cache size must be non-zero"), }; +#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] mod memory_stats; #[cfg(test)] mod tests; use sp_core::traits::SpawnNamed; -use memory_stats::MemoryAllocationTracker; - /// Glue to connect `trait orchestra::Spawner` and `SpawnNamed` from `substrate`. pub struct SpawnGlue(pub S); @@ -654,8 +653,9 @@ where } let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters); + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] let collect_memory_stats: Box = - match MemoryAllocationTracker::new() { + match memory_stats::MemoryAllocationTracker::new() { Ok(memory_stats) => Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() { Ok(memory_stats_snapshot) => { @@ -679,6 +679,9 @@ where }, }; + #[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))] + let collect_memory_stats: Box = Box::new(|_| {}); + let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| { collect_memory_stats(&metronome_metrics); diff --git a/node/overseer/src/memory_stats.rs b/node/overseer/src/memory_stats.rs index 908e20cc213a..9d7ebdb943ea 100644 --- a/node/overseer/src/memory_stats.rs +++ b/node/overseer/src/memory_stats.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use tikv_jemalloc_ctl::{epoch, stats, Error}; +use tikv_jemalloc_ctl::stats; #[derive(Clone)] pub struct MemoryAllocationTracker { @@ -24,15 +24,15 @@ pub struct MemoryAllocationTracker { } impl MemoryAllocationTracker { - pub fn new() -> Result { + pub fn new() -> Result { Ok(Self { - epoch: epoch::mib()?, + epoch: tikv_jemalloc_ctl::epoch::mib()?, allocated: stats::allocated::mib()?, resident: stats::resident::mib()?, }) } - pub fn snapshot(&self) -> Result { + pub fn snapshot(&self) -> Result { // update stats by advancing the allocation epoch self.epoch.advance()?; diff --git a/node/overseer/src/metrics.rs b/node/overseer/src/metrics.rs index b7a4ff443fae..57858fe98073 100644 --- a/node/overseer/src/metrics.rs +++ b/node/overseer/src/metrics.rs @@ -19,8 +19,6 @@ use super::*; pub use polkadot_node_metrics::metrics::{self, prometheus, Metrics as MetricsTrait}; -use memory_stats::MemoryAllocationSnapshot; - /// Overseer Prometheus metrics. #[derive(Clone)] struct MetricsInner { @@ -40,7 +38,9 @@ struct MetricsInner { signals_sent: prometheus::GaugeVec, signals_received: prometheus::GaugeVec, + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] memory_stats_resident: prometheus::Gauge, + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] memory_stats_allocated: prometheus::Gauge, } @@ -67,7 +67,11 @@ impl Metrics { } } - pub(crate) fn memory_stats_snapshot(&self, memory_stats: MemoryAllocationSnapshot) { + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] + pub(crate) fn memory_stats_snapshot( + &self, + memory_stats: memory_stats::MemoryAllocationSnapshot, + ) { if let Some(metrics) = &self.0 { metrics.memory_stats_allocated.set(memory_stats.allocated as u64); metrics.memory_stats_resident.set(memory_stats.resident as u64); @@ -246,7 +250,7 @@ impl MetricsTrait for Metrics { )?, registry, )?, - + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] memory_stats_allocated: prometheus::register( prometheus::Gauge::::new( "polkadot_memory_allocated", @@ -254,6 +258,7 @@ impl MetricsTrait for Metrics { )?, registry, )?, + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] memory_stats_resident: prometheus::register( prometheus::Gauge::::new( "polkadot_memory_resident", diff --git a/src/main.rs b/src/main.rs index 13c17df851f1..2deb198ec773 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,6 +22,7 @@ use color_eyre::eyre; /// Global allocator. Changing it to another allocator will require changing /// `memory_stats::MemoryAllocationTracker`. +#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] #[global_allocator] pub static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;