From 945ef03f2c21742c51cd4b262e7d806db5e7f1df Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Mon, 13 Mar 2023 08:25:56 +0000 Subject: [PATCH 1/4] refine conditional compilation for mem control on different OSs --- .../src/memory_management/memory_manager.rs | 25 ++--- src/compute/src/memory_management/mod.rs | 95 +++++++++++++++++++ src/compute/src/memory_management/policy.rs | 62 +----------- src/compute/src/server.rs | 2 +- 4 files changed, 106 insertions(+), 78 deletions(-) diff --git a/src/compute/src/memory_management/memory_manager.rs b/src/compute/src/memory_management/memory_manager.rs index 97d6be1c38dcd..b585bfbc973ef 100644 --- a/src/compute/src/memory_management/memory_manager.rs +++ b/src/compute/src/memory_management/memory_manager.rs @@ -14,12 +14,15 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; +use std::time::Duration; use risingwave_batch::task::BatchManager; +use risingwave_common::util::epoch::Epoch; use risingwave_stream::executor::monitor::StreamingMetrics; use risingwave_stream::task::LocalStreamManager; -use super::policy::MemoryControlPolicy; +use super::MemoryControlPolicy; +use crate::memory_management::MemoryControlStats; /// The minimal memory requirement of computing tasks in megabytes. pub const MIN_COMPUTE_MEMORY_MB: usize = 512; @@ -29,7 +32,6 @@ pub const SYSTEM_RESERVED_MEMORY_MB: usize = 512; /// When `enable_managed_cache` is set, compute node will launch a [`GlobalMemoryManager`] to limit /// the memory usage. -#[cfg_attr(not(target_os = "linux"), expect(dead_code))] pub struct GlobalMemoryManager { /// All cached data before the watermark should be evicted. watermark_epoch: Arc, @@ -74,28 +76,13 @@ impl GlobalMemoryManager { self.watermark_epoch.clone() } - // FIXME: remove such limitation after #7180 - /// Jemalloc is not supported on Windows, because of tikv-jemalloc's own reasons. 
- /// See the comments for the macro `enable_jemalloc_on_linux!()` - #[cfg(not(target_os = "linux"))] - #[expect(clippy::unused_async)] - pub async fn run(self: Arc, _: Arc, _: Arc) {} - - /// Memory manager will get memory usage from batch and streaming, and do some actions. - /// 1. if batch exceeds, kill running query. - /// 2. if streaming exceeds, evict cache by watermark. - #[cfg(target_os = "linux")] + /// Memory manager will get memory usage statistics from batch and streaming and perform memory + /// control accordingly. pub async fn run( self: Arc, batch_manager: Arc, stream_manager: Arc, ) { - use std::time::Duration; - - use risingwave_common::util::epoch::Epoch; - - use crate::memory_management::policy::MemoryControlStats; - let mut tick_interval = tokio::time::interval(Duration::from_millis(self.barrier_interval_ms as u64)); let mut memory_control_stats = MemoryControlStats { diff --git a/src/compute/src/memory_management/mod.rs b/src/compute/src/memory_management/mod.rs index 4fc5f94fbbfe5..89d2b3e7906c3 100644 --- a/src/compute/src/memory_management/mod.rs +++ b/src/compute/src/memory_management/mod.rs @@ -13,4 +13,99 @@ // limitations under the License. pub mod memory_manager; +#[cfg(target_os = "linux")] pub mod policy; + +use std::sync::atomic::AtomicU64; +use std::sync::Arc; + +use risingwave_batch::task::BatchManager; +use risingwave_common::error::Result; +use risingwave_stream::task::LocalStreamManager; + +use crate::ComputeNodeOpts; + +/// `MemoryControlStats` contains the necessary information for memory control, including both batch +/// and streaming. 
+#[derive(Default)] +pub struct MemoryControlStats { + pub batch_memory_usage: usize, + pub streaming_memory_usage: usize, + pub jemalloc_allocated_mib: usize, + pub lru_watermark_step: u64, + pub lru_watermark_time_ms: u64, + pub lru_physical_now_ms: u64, +} + +pub type MemoryControlPolicy = Box; + +pub trait MemoryControl: Send + Sync { + fn apply( + &self, + total_compute_memory_bytes: usize, + barrier_interval_ms: u32, + prev_memory_stats: MemoryControlStats, + batch_manager: Arc, + stream_manager: Arc, + watermark_epoch: Arc, + ) -> MemoryControlStats; + + fn describe(&self, total_compute_memory_bytes: usize) -> String; +} + +#[cfg_attr(not(target_os = "linux"), expect(unused_variables))] +pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result { + #[cfg(target_os = "linux")] + { + use anyhow::anyhow; + + use self::policy::{FixedProportionPolicy, StreamingOnlyPolicy}; + + let input_policy = &opts.memory_control_policy; + if input_policy == FixedProportionPolicy::CONFIG_STR { + Ok(Box::new(FixedProportionPolicy::new( + opts.streaming_memory_proportion, + )?)) + } else if input_policy == StreamingOnlyPolicy::CONFIG_STR { + Ok(Box::new(StreamingOnlyPolicy)) + } else { + let valid_values = [ + FixedProportionPolicy::CONFIG_STR, + StreamingOnlyPolicy::CONFIG_STR, + ]; + Err(anyhow!(format!( + "invalid memory control policy in configuration: {}, valid values: {:?}", + input_policy, valid_values, + )) + .into()) + } + } + + #[cfg(not(target_os = "linux"))] + { + // We disable memory control on operating systems other than Linux now because jemalloc + // stats do not work well. 
+ tracing::warn!("memory control is only enabled on Linux now"); + Ok(Box::new(DummyPolicy)) + } +} + +pub struct DummyPolicy; + +impl MemoryControl for DummyPolicy { + fn apply( + &self, + _total_compute_memory_bytes: usize, + _barrier_interval_ms: u32, + _prev_memory_stats: MemoryControlStats, + _batch_manager: Arc, + _stream_manager: Arc, + _watermark_epoch: Arc, + ) -> MemoryControlStats { + MemoryControlStats::default() + } + + fn describe(&self, _total_compute_memory_bytes: usize) -> String { + "DummyPolicy".to_string() + } +} diff --git a/src/compute/src/memory_management/policy.rs b/src/compute/src/memory_management/policy.rs index 56caa279a6f9b..5b7f20c88cb46 100644 --- a/src/compute/src/memory_management/policy.rs +++ b/src/compute/src/memory_management/policy.rs @@ -22,55 +22,7 @@ use risingwave_common::error::Result; use risingwave_common::util::epoch::Epoch; use risingwave_stream::task::LocalStreamManager; -use crate::ComputeNodeOpts; - -/// `MemoryControlStats` contains the necessary information for memory control, including both batch -/// and streaming. 
-pub struct MemoryControlStats { - pub batch_memory_usage: usize, - pub streaming_memory_usage: usize, - pub jemalloc_allocated_mib: usize, - pub lru_watermark_step: u64, - pub lru_watermark_time_ms: u64, - pub lru_physical_now_ms: u64, -} - -pub type MemoryControlPolicy = Box; - -pub trait MemoryControl: Send + Sync { - fn apply( - &self, - total_compute_memory_bytes: usize, - barrier_interval_ms: u32, - prev_memory_stats: MemoryControlStats, - batch_manager: Arc, - stream_manager: Arc, - watermark_epoch: Arc, - ) -> MemoryControlStats; - - fn describe(&self, total_compute_memory_bytes: usize) -> String; -} - -pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result { - let input_policy = &opts.memory_control_policy; - if input_policy == FixedProportionPolicy::CONFIG_STR { - Ok(Box::new(FixedProportionPolicy::new( - opts.streaming_memory_proportion, - )?)) - } else if input_policy == StreamingOnlyPolicy::CONFIG_STR { - Ok(Box::new(StreamingOnlyPolicy {})) - } else { - let valid_values = [ - FixedProportionPolicy::CONFIG_STR, - StreamingOnlyPolicy::CONFIG_STR, - ]; - Err(anyhow!(format!( - "invalid memory control policy in configuration: {}, valid values: {:?}", - input_policy, valid_values, - )) - .into()) - } -} +use super::{MemoryControl, MemoryControlStats}; /// `FixedProportionPolicy` performs memory control by limiting the memory usage of both batch and /// streaming to a fixed proportion. 
@@ -83,7 +35,7 @@ pub struct FixedProportionPolicy { impl FixedProportionPolicy { const BATCH_KILL_QUERY_THRESHOLD: f64 = 0.8; - const CONFIG_STR: &str = "streaming-batch"; + pub const CONFIG_STR: &str = "streaming-batch"; const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9; const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7; @@ -172,10 +124,10 @@ impl MemoryControl for FixedProportionPolicy { /// `FixedProportionPolicy` in that it calculates the memory usage based on jemalloc statistics, /// which actually contains system usage other than computing tasks. This is the default memory /// control policy. -pub struct StreamingOnlyPolicy {} +pub struct StreamingOnlyPolicy; impl StreamingOnlyPolicy { - const CONFIG_STR: &str = "streaming-only"; + pub const CONFIG_STR: &str = "streaming-only"; const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9; const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7; } @@ -230,7 +182,6 @@ impl MemoryControl for StreamingOnlyPolicy { } } -#[cfg(target_os = "linux")] fn advance_jemalloc_epoch(prev_jemalloc_allocated_mib: usize) -> usize { use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats}; @@ -246,11 +197,6 @@ fn advance_jemalloc_epoch(prev_jemalloc_allocated_mib: usize) -> usize { }) } -#[cfg(not(target_os = "linux"))] -fn advance_jemalloc_epoch(_prev_jemalloc_allocated_mib: usize) -> usize { - 0 -} - fn calculate_lru_watermark( cur_stream_used_memory_bytes: usize, stream_memory_threshold_graceful: usize, diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index c972951180cae..8b38728238d45 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -57,10 +57,10 @@ use risingwave_stream::task::{LocalStreamManager, StreamEnvironment}; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; +use crate::memory_management::memory_control_policy_from_config; use crate::memory_management::memory_manager::{ GlobalMemoryManager, MIN_COMPUTE_MEMORY_MB, SYSTEM_RESERVED_MEMORY_MB, }; -use 
crate::memory_management::policy::memory_control_policy_from_config; use crate::observer::observer_manager::ComputeObserverNode; use crate::rpc::service::config_service::ConfigServiceImpl; use crate::rpc::service::exchange_metrics::ExchangeServiceMetrics; From f3791f3140912461933911e99c50d7ba81cd84fa Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Tue, 14 Mar 2023 05:35:24 +0000 Subject: [PATCH 2/4] apply pr suggestions --- src/compute/src/memory_management/mod.rs | 59 ++++++++++++------------ 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/src/compute/src/memory_management/mod.rs b/src/compute/src/memory_management/mod.rs index 89d2b3e7906c3..dcc0f8855fd16 100644 --- a/src/compute/src/memory_management/mod.rs +++ b/src/compute/src/memory_management/mod.rs @@ -53,43 +53,42 @@ pub trait MemoryControl: Send + Sync { fn describe(&self, total_compute_memory_bytes: usize) -> String; } -#[cfg_attr(not(target_os = "linux"), expect(unused_variables))] +#[cfg(target_os = "linux")] pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result { - #[cfg(target_os = "linux")] - { - use anyhow::anyhow; + use anyhow::anyhow; - use self::policy::{FixedProportionPolicy, StreamingOnlyPolicy}; + use self::policy::{FixedProportionPolicy, StreamingOnlyPolicy}; - let input_policy = &opts.memory_control_policy; - if input_policy == FixedProportionPolicy::CONFIG_STR { - Ok(Box::new(FixedProportionPolicy::new( - opts.streaming_memory_proportion, - )?)) - } else if input_policy == StreamingOnlyPolicy::CONFIG_STR { - Ok(Box::new(StreamingOnlyPolicy)) - } else { - let valid_values = [ - FixedProportionPolicy::CONFIG_STR, - StreamingOnlyPolicy::CONFIG_STR, - ]; - Err(anyhow!(format!( - "invalid memory control policy in configuration: {}, valid values: {:?}", - input_policy, valid_values, - )) - .into()) - } + let input_policy = &opts.memory_control_policy; + if input_policy == FixedProportionPolicy::CONFIG_STR { + Ok(Box::new(FixedProportionPolicy::new( + 
opts.streaming_memory_proportion, + )?)) + } else if input_policy == StreamingOnlyPolicy::CONFIG_STR { + Ok(Box::new(StreamingOnlyPolicy)) + } else { + let valid_values = [ + FixedProportionPolicy::CONFIG_STR, + StreamingOnlyPolicy::CONFIG_STR, + ]; + Err(anyhow!(format!( + "invalid memory control policy in configuration: {}, valid values: {:?}", + input_policy, valid_values, + )) + .into()) } +} - #[cfg(not(target_os = "linux"))] - { - // We disable memory control on operating systems other than Linux now because jemalloc - // stats do not work well. - tracing::warn!("memory control is only enabled on Linux now"); - Ok(Box::new(DummyPolicy)) - } +#[cfg(not(target_os = "linux"))] +pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result { + // We disable memory control on operating systems other than Linux now because jemalloc + // stats do not work well. + tracing::warn!("memory control is only enabled on Linux now"); + Ok(Box::new(DummyPolicy)) } +// `DummyPolicy` is used for operating systems other than Linux. It does nothing as memory control +// is disabled on non-Linux OS.
pub struct DummyPolicy; impl MemoryControl for DummyPolicy { From a6076458a029f9cf28384ada05030eda1a1d6e0d Mon Sep 17 00:00:00 2001 From: xx01cyx Date: Tue, 14 Mar 2023 05:36:57 +0000 Subject: [PATCH 3/4] fix doc comment style --- src/compute/src/memory_management/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/compute/src/memory_management/mod.rs b/src/compute/src/memory_management/mod.rs index dcc0f8855fd16..e7386d7321580 100644 --- a/src/compute/src/memory_management/mod.rs +++ b/src/compute/src/memory_management/mod.rs @@ -87,8 +87,8 @@ pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result Date: Tue, 14 Mar 2023 14:03:47 +0800 Subject: [PATCH 4/4] Update src/compute/src/memory_management/mod.rs Co-authored-by: Bugen Zhao --- src/compute/src/memory_management/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/compute/src/memory_management/mod.rs b/src/compute/src/memory_management/mod.rs index e7386d7321580..6e3c7526af8aa 100644 --- a/src/compute/src/memory_management/mod.rs +++ b/src/compute/src/memory_management/mod.rs @@ -80,7 +80,7 @@ pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result Result { +pub fn memory_control_policy_from_config(_opts: &ComputeNodeOpts) -> Result { // We disable memory control on operating systems other than Linux now because jemalloc // stats do not work well. tracing::warn!("memory control is only enabled on Linux now");