From a4afac3d374958ece7f8921df33f3f94fceb5a65 Mon Sep 17 00:00:00 2001 From: Yuanxin Cao <60498509+xx01cyx@users.noreply.github.com> Date: Tue, 14 Mar 2023 14:21:29 +0800 Subject: [PATCH] refactor: refine conditional compilation for mem control on different OSs (#8504) Co-authored-by: Bugen Zhao --- .../src/memory_management/memory_manager.rs | 25 ++--- src/compute/src/memory_management/mod.rs | 94 +++++++++++++++++++ src/compute/src/memory_management/policy.rs | 62 +----------- src/compute/src/server.rs | 2 +- 4 files changed, 105 insertions(+), 78 deletions(-) diff --git a/src/compute/src/memory_management/memory_manager.rs b/src/compute/src/memory_management/memory_manager.rs index 97d6be1c38dc..b585bfbc973e 100644 --- a/src/compute/src/memory_management/memory_manager.rs +++ b/src/compute/src/memory_management/memory_manager.rs @@ -14,12 +14,15 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; +use std::time::Duration; use risingwave_batch::task::BatchManager; +use risingwave_common::util::epoch::Epoch; use risingwave_stream::executor::monitor::StreamingMetrics; use risingwave_stream::task::LocalStreamManager; -use super::policy::MemoryControlPolicy; +use super::MemoryControlPolicy; +use crate::memory_management::MemoryControlStats; /// The minimal memory requirement of computing tasks in megabytes. pub const MIN_COMPUTE_MEMORY_MB: usize = 512; @@ -29,7 +32,6 @@ pub const SYSTEM_RESERVED_MEMORY_MB: usize = 512; /// When `enable_managed_cache` is set, compute node will launch a [`GlobalMemoryManager`] to limit /// the memory usage. -#[cfg_attr(not(target_os = "linux"), expect(dead_code))] pub struct GlobalMemoryManager { /// All cached data before the watermark should be evicted. watermark_epoch: Arc, @@ -74,28 +76,13 @@ impl GlobalMemoryManager { self.watermark_epoch.clone() } - // FIXME: remove such limitation after #7180 - /// Jemalloc is not supported on Windows, because of tikv-jemalloc's own reasons. 
- /// See the comments for the macro `enable_jemalloc_on_linux!()` - #[cfg(not(target_os = "linux"))] - #[expect(clippy::unused_async)] - pub async fn run(self: Arc, _: Arc, _: Arc) {} - - /// Memory manager will get memory usage from batch and streaming, and do some actions. - /// 1. if batch exceeds, kill running query. - /// 2. if streaming exceeds, evict cache by watermark. - #[cfg(target_os = "linux")] + /// Memory manager will get memory usage statistics from batch and streaming and perform memory + /// control accordingly. pub async fn run( self: Arc, batch_manager: Arc, stream_manager: Arc, ) { - use std::time::Duration; - - use risingwave_common::util::epoch::Epoch; - - use crate::memory_management::policy::MemoryControlStats; - let mut tick_interval = tokio::time::interval(Duration::from_millis(self.barrier_interval_ms as u64)); let mut memory_control_stats = MemoryControlStats { diff --git a/src/compute/src/memory_management/mod.rs b/src/compute/src/memory_management/mod.rs index 4fc5f94fbbfe..6e3c7526af8a 100644 --- a/src/compute/src/memory_management/mod.rs +++ b/src/compute/src/memory_management/mod.rs @@ -13,4 +13,98 @@ // limitations under the License. pub mod memory_manager; +#[cfg(target_os = "linux")] pub mod policy; + +use std::sync::atomic::AtomicU64; +use std::sync::Arc; + +use risingwave_batch::task::BatchManager; +use risingwave_common::error::Result; +use risingwave_stream::task::LocalStreamManager; + +use crate::ComputeNodeOpts; + +/// `MemoryControlStats` contains the necessary information for memory control, including both batch +/// and streaming. 
+#[derive(Default)] +pub struct MemoryControlStats { + pub batch_memory_usage: usize, + pub streaming_memory_usage: usize, + pub jemalloc_allocated_mib: usize, + pub lru_watermark_step: u64, + pub lru_watermark_time_ms: u64, + pub lru_physical_now_ms: u64, +} + +pub type MemoryControlPolicy = Box; + +pub trait MemoryControl: Send + Sync { + fn apply( + &self, + total_compute_memory_bytes: usize, + barrier_interval_ms: u32, + prev_memory_stats: MemoryControlStats, + batch_manager: Arc, + stream_manager: Arc, + watermark_epoch: Arc, + ) -> MemoryControlStats; + + fn describe(&self, total_compute_memory_bytes: usize) -> String; +} + +#[cfg(target_os = "linux")] +pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result { + use anyhow::anyhow; + + use self::policy::{FixedProportionPolicy, StreamingOnlyPolicy}; + + let input_policy = &opts.memory_control_policy; + if input_policy == FixedProportionPolicy::CONFIG_STR { + Ok(Box::new(FixedProportionPolicy::new( + opts.streaming_memory_proportion, + )?)) + } else if input_policy == StreamingOnlyPolicy::CONFIG_STR { + Ok(Box::new(StreamingOnlyPolicy)) + } else { + let valid_values = [ + FixedProportionPolicy::CONFIG_STR, + StreamingOnlyPolicy::CONFIG_STR, + ]; + Err(anyhow!(format!( + "invalid memory control policy in configuration: {}, valid values: {:?}", + input_policy, valid_values, + )) + .into()) + } +} + +#[cfg(not(target_os = "linux"))] +pub fn memory_control_policy_from_config(_opts: &ComputeNodeOpts) -> Result { + // We disable memory control on operating systems other than Linux now because jemalloc + // stats do not work well. + tracing::warn!("memory control is only enabled on Linux now"); + Ok(Box::new(DummyPolicy)) +} + +/// `DummyPolicy` is used for operating systems other than Linux. It does nothing as memory control +/// is disabled on non-Linux OS. 
+pub struct DummyPolicy; + +impl MemoryControl for DummyPolicy { + fn apply( + &self, + _total_compute_memory_bytes: usize, + _barrier_interval_ms: u32, + _prev_memory_stats: MemoryControlStats, + _batch_manager: Arc, + _stream_manager: Arc, + _watermark_epoch: Arc, + ) -> MemoryControlStats { + MemoryControlStats::default() + } + + fn describe(&self, _total_compute_memory_bytes: usize) -> String { + "DummyPolicy".to_string() + } +} diff --git a/src/compute/src/memory_management/policy.rs b/src/compute/src/memory_management/policy.rs index 56caa279a6f9..5b7f20c88cb4 100644 --- a/src/compute/src/memory_management/policy.rs +++ b/src/compute/src/memory_management/policy.rs @@ -22,55 +22,7 @@ use risingwave_common::error::Result; use risingwave_common::util::epoch::Epoch; use risingwave_stream::task::LocalStreamManager; -use crate::ComputeNodeOpts; - -/// `MemoryControlStats` contains the necessary information for memory control, including both batch -/// and streaming. -pub struct MemoryControlStats { - pub batch_memory_usage: usize, - pub streaming_memory_usage: usize, - pub jemalloc_allocated_mib: usize, - pub lru_watermark_step: u64, - pub lru_watermark_time_ms: u64, - pub lru_physical_now_ms: u64, -} - -pub type MemoryControlPolicy = Box; - -pub trait MemoryControl: Send + Sync { - fn apply( - &self, - total_compute_memory_bytes: usize, - barrier_interval_ms: u32, - prev_memory_stats: MemoryControlStats, - batch_manager: Arc, - stream_manager: Arc, - watermark_epoch: Arc, - ) -> MemoryControlStats; - - fn describe(&self, total_compute_memory_bytes: usize) -> String; -} - -pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result { - let input_policy = &opts.memory_control_policy; - if input_policy == FixedProportionPolicy::CONFIG_STR { - Ok(Box::new(FixedProportionPolicy::new( - opts.streaming_memory_proportion, - )?)) - } else if input_policy == StreamingOnlyPolicy::CONFIG_STR { - Ok(Box::new(StreamingOnlyPolicy {})) - } else { - let 
valid_values = [ - FixedProportionPolicy::CONFIG_STR, - StreamingOnlyPolicy::CONFIG_STR, - ]; - Err(anyhow!(format!( - "invalid memory control policy in configuration: {}, valid values: {:?}", - input_policy, valid_values, - )) - .into()) - } -} +use super::{MemoryControl, MemoryControlStats}; /// `FixedProportionPolicy` performs memory control by limiting the memory usage of both batch and /// streaming to a fixed proportion. @@ -83,7 +35,7 @@ pub struct FixedProportionPolicy { impl FixedProportionPolicy { const BATCH_KILL_QUERY_THRESHOLD: f64 = 0.8; - const CONFIG_STR: &str = "streaming-batch"; + pub const CONFIG_STR: &str = "streaming-batch"; const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9; const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7; @@ -172,10 +124,10 @@ impl MemoryControl for FixedProportionPolicy { /// `FixedProportionPolicy` in that it calculates the memory usage based on jemalloc statistics, /// which actually contains system usage other than computing tasks. This is the default memory /// control policy. 
-pub struct StreamingOnlyPolicy {} +pub struct StreamingOnlyPolicy; impl StreamingOnlyPolicy { - const CONFIG_STR: &str = "streaming-only"; + pub const CONFIG_STR: &str = "streaming-only"; const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9; const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7; } @@ -230,7 +182,6 @@ impl MemoryControl for StreamingOnlyPolicy { } } -#[cfg(target_os = "linux")] fn advance_jemalloc_epoch(prev_jemalloc_allocated_mib: usize) -> usize { use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats}; @@ -246,11 +197,6 @@ fn advance_jemalloc_epoch(prev_jemalloc_allocated_mib: usize) -> usize { }) } -#[cfg(not(target_os = "linux"))] -fn advance_jemalloc_epoch(_prev_jemalloc_allocated_mib: usize) -> usize { - 0 -} - fn calculate_lru_watermark( cur_stream_used_memory_bytes: usize, stream_memory_threshold_graceful: usize, diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 8c27dd874967..94fdbbd90a27 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -57,10 +57,10 @@ use risingwave_stream::task::{LocalStreamManager, StreamEnvironment}; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; +use crate::memory_management::memory_control_policy_from_config; use crate::memory_management::memory_manager::{ GlobalMemoryManager, MIN_COMPUTE_MEMORY_MB, SYSTEM_RESERVED_MEMORY_MB, }; -use crate::memory_management::policy::memory_control_policy_from_config; use crate::observer::observer_manager::ComputeObserverNode; use crate::rpc::service::config_service::ConfigServiceImpl; use crate::rpc::service::exchange_metrics::ExchangeServiceMetrics;