Skip to content

Commit

Permalink
refactor: refine conditional compilation for mem control on different…
Browse files Browse the repository at this point in the history
… OSs (risingwavelabs#8504)

Co-authored-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
xx01cyx and BugenZhao authored Mar 14, 2023
1 parent 9564db0 commit a4afac3
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 78 deletions.
25 changes: 6 additions & 19 deletions src/compute/src/memory_management/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@

use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;

use risingwave_batch::task::BatchManager;
use risingwave_common::util::epoch::Epoch;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::task::LocalStreamManager;

use super::policy::MemoryControlPolicy;
use super::MemoryControlPolicy;
use crate::memory_management::MemoryControlStats;

/// The minimal memory requirement of computing tasks in megabytes.
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
Expand All @@ -29,7 +32,6 @@ pub const SYSTEM_RESERVED_MEMORY_MB: usize = 512;

/// When `enable_managed_cache` is set, compute node will launch a [`GlobalMemoryManager`] to limit
/// the memory usage.
#[cfg_attr(not(target_os = "linux"), expect(dead_code))]
pub struct GlobalMemoryManager {
/// All cached data before the watermark should be evicted.
watermark_epoch: Arc<AtomicU64>,
Expand Down Expand Up @@ -74,28 +76,13 @@ impl GlobalMemoryManager {
self.watermark_epoch.clone()
}

// FIXME: remove such limitation after #7180
/// Jemalloc is not supported on Windows, because of tikv-jemalloc's own reasons.
/// See the comments for the macro `enable_jemalloc_on_linux!()`
#[cfg(not(target_os = "linux"))]
#[expect(clippy::unused_async)]
pub async fn run(self: Arc<Self>, _: Arc<BatchManager>, _: Arc<LocalStreamManager>) {}

/// Memory manager will get memory usage from batch and streaming, and do some actions.
/// 1. if batch exceeds, kill running query.
/// 2. if streaming exceeds, evict cache by watermark.
#[cfg(target_os = "linux")]
/// Memory manager will get memory usage statistics from batch and streaming and perform memory
/// control accordingly.
pub async fn run(
self: Arc<Self>,
batch_manager: Arc<BatchManager>,
stream_manager: Arc<LocalStreamManager>,
) {
use std::time::Duration;

use risingwave_common::util::epoch::Epoch;

use crate::memory_management::policy::MemoryControlStats;

let mut tick_interval =
tokio::time::interval(Duration::from_millis(self.barrier_interval_ms as u64));
let mut memory_control_stats = MemoryControlStats {
Expand Down
94 changes: 94 additions & 0 deletions src/compute/src/memory_management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,98 @@
// limitations under the License.

pub mod memory_manager;
#[cfg(target_os = "linux")]
pub mod policy;

use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use risingwave_batch::task::BatchManager;
use risingwave_common::error::Result;
use risingwave_stream::task::LocalStreamManager;

use crate::ComputeNodeOpts;

/// `MemoryControlStats` contains the necessary information for memory control, including both batch
/// and streaming.
#[derive(Default)]
pub struct MemoryControlStats {
pub batch_memory_usage: usize,
pub streaming_memory_usage: usize,
pub jemalloc_allocated_mib: usize,
pub lru_watermark_step: u64,
pub lru_watermark_time_ms: u64,
pub lru_physical_now_ms: u64,
}

pub type MemoryControlPolicy = Box<dyn MemoryControl>;

pub trait MemoryControl: Send + Sync {
fn apply(
&self,
total_compute_memory_bytes: usize,
barrier_interval_ms: u32,
prev_memory_stats: MemoryControlStats,
batch_manager: Arc<BatchManager>,
stream_manager: Arc<LocalStreamManager>,
watermark_epoch: Arc<AtomicU64>,
) -> MemoryControlStats;

fn describe(&self, total_compute_memory_bytes: usize) -> String;
}

#[cfg(target_os = "linux")]
pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result<MemoryControlPolicy> {
use anyhow::anyhow;

use self::policy::{FixedProportionPolicy, StreamingOnlyPolicy};

let input_policy = &opts.memory_control_policy;
if input_policy == FixedProportionPolicy::CONFIG_STR {
Ok(Box::new(FixedProportionPolicy::new(
opts.streaming_memory_proportion,
)?))
} else if input_policy == StreamingOnlyPolicy::CONFIG_STR {
Ok(Box::new(StreamingOnlyPolicy))
} else {
let valid_values = [
FixedProportionPolicy::CONFIG_STR,
StreamingOnlyPolicy::CONFIG_STR,
];
Err(anyhow!(format!(
"invalid memory control policy in configuration: {}, valid values: {:?}",
input_policy, valid_values,
))
.into())
}
}

#[cfg(not(target_os = "linux"))]
pub fn memory_control_policy_from_config(_opts: &ComputeNodeOpts) -> Result<MemoryControlPolicy> {
// We disable memory control on operating systems other than Linux now because jemalloc
// stats do not work well.
tracing::warn!("memory control is only enabled on Linux now");
Ok(Box::new(DummyPolicy))
}

/// `DummyPolicy` is used for operarting systems other than Linux. It does nothing as memory control
/// is disabled on non-Linux OS.
pub struct DummyPolicy;

impl MemoryControl for DummyPolicy {
fn apply(
&self,
_total_compute_memory_bytes: usize,
_barrier_interval_ms: u32,
_prev_memory_stats: MemoryControlStats,
_batch_manager: Arc<BatchManager>,
_stream_manager: Arc<LocalStreamManager>,
_watermark_epoch: Arc<AtomicU64>,
) -> MemoryControlStats {
MemoryControlStats::default()
}

fn describe(&self, _total_compute_memory_bytes: usize) -> String {
"DummyPolicy".to_string()
}
}
62 changes: 4 additions & 58 deletions src/compute/src/memory_management/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,55 +22,7 @@ use risingwave_common::error::Result;
use risingwave_common::util::epoch::Epoch;
use risingwave_stream::task::LocalStreamManager;

use crate::ComputeNodeOpts;

/// `MemoryControlStats` contains the necessary information for memory control, including both batch
/// and streaming.
pub struct MemoryControlStats {
pub batch_memory_usage: usize,
pub streaming_memory_usage: usize,
pub jemalloc_allocated_mib: usize,
pub lru_watermark_step: u64,
pub lru_watermark_time_ms: u64,
pub lru_physical_now_ms: u64,
}

pub type MemoryControlPolicy = Box<dyn MemoryControl>;

pub trait MemoryControl: Send + Sync {
fn apply(
&self,
total_compute_memory_bytes: usize,
barrier_interval_ms: u32,
prev_memory_stats: MemoryControlStats,
batch_manager: Arc<BatchManager>,
stream_manager: Arc<LocalStreamManager>,
watermark_epoch: Arc<AtomicU64>,
) -> MemoryControlStats;

fn describe(&self, total_compute_memory_bytes: usize) -> String;
}

pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result<MemoryControlPolicy> {
let input_policy = &opts.memory_control_policy;
if input_policy == FixedProportionPolicy::CONFIG_STR {
Ok(Box::new(FixedProportionPolicy::new(
opts.streaming_memory_proportion,
)?))
} else if input_policy == StreamingOnlyPolicy::CONFIG_STR {
Ok(Box::new(StreamingOnlyPolicy {}))
} else {
let valid_values = [
FixedProportionPolicy::CONFIG_STR,
StreamingOnlyPolicy::CONFIG_STR,
];
Err(anyhow!(format!(
"invalid memory control policy in configuration: {}, valid values: {:?}",
input_policy, valid_values,
))
.into())
}
}
use super::{MemoryControl, MemoryControlStats};

/// `FixedProportionPolicy` performs memory control by limiting the memory usage of both batch and
/// streaming to a fixed proportion.
Expand All @@ -83,7 +35,7 @@ pub struct FixedProportionPolicy {

impl FixedProportionPolicy {
const BATCH_KILL_QUERY_THRESHOLD: f64 = 0.8;
const CONFIG_STR: &str = "streaming-batch";
pub const CONFIG_STR: &str = "streaming-batch";
const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9;
const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7;

Expand Down Expand Up @@ -172,10 +124,10 @@ impl MemoryControl for FixedProportionPolicy {
/// `FixedProportionPolicy` in that it calculates the memory usage based on jemalloc statistics,
/// which actually contains system usage other than computing tasks. This is the default memory
/// control policy.
pub struct StreamingOnlyPolicy {}
pub struct StreamingOnlyPolicy;

impl StreamingOnlyPolicy {
const CONFIG_STR: &str = "streaming-only";
pub const CONFIG_STR: &str = "streaming-only";
const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9;
const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7;
}
Expand Down Expand Up @@ -230,7 +182,6 @@ impl MemoryControl for StreamingOnlyPolicy {
}
}

#[cfg(target_os = "linux")]
fn advance_jemalloc_epoch(prev_jemalloc_allocated_mib: usize) -> usize {
use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats};

Expand All @@ -246,11 +197,6 @@ fn advance_jemalloc_epoch(prev_jemalloc_allocated_mib: usize) -> usize {
})
}

#[cfg(not(target_os = "linux"))]
fn advance_jemalloc_epoch(_prev_jemalloc_allocated_mib: usize) -> usize {
0
}

fn calculate_lru_watermark(
cur_stream_used_memory_bytes: usize,
stream_memory_threshold_graceful: usize,
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::memory_management::memory_control_policy_from_config;
use crate::memory_management::memory_manager::{
GlobalMemoryManager, MIN_COMPUTE_MEMORY_MB, SYSTEM_RESERVED_MEMORY_MB,
};
use crate::memory_management::policy::memory_control_policy_from_config;
use crate::observer::observer_manager::ComputeObserverNode;
use crate::rpc::service::config_service::ConfigServiceImpl;
use crate::rpc::service::exchange_metrics::ExchangeServiceMetrics;
Expand Down

0 comments on commit a4afac3

Please sign in to comment.