Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refine conditional compilation for mem control on different OSs #8504

Merged
merged 4 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 6 additions & 19 deletions src/compute/src/memory_management/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@

use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;

use risingwave_batch::task::BatchManager;
use risingwave_common::util::epoch::Epoch;
use risingwave_stream::executor::monitor::StreamingMetrics;
use risingwave_stream::task::LocalStreamManager;

use super::policy::MemoryControlPolicy;
use super::MemoryControlPolicy;
use crate::memory_management::MemoryControlStats;

/// The minimal memory requirement of computing tasks in megabytes.
pub const MIN_COMPUTE_MEMORY_MB: usize = 512;
Expand All @@ -29,7 +32,6 @@ pub const SYSTEM_RESERVED_MEMORY_MB: usize = 512;

/// When `enable_managed_cache` is set, compute node will launch a [`GlobalMemoryManager`] to limit
/// the memory usage.
#[cfg_attr(not(target_os = "linux"), expect(dead_code))]
pub struct GlobalMemoryManager {
/// All cached data before the watermark should be evicted.
watermark_epoch: Arc<AtomicU64>,
Expand Down Expand Up @@ -74,28 +76,13 @@ impl GlobalMemoryManager {
self.watermark_epoch.clone()
}

// FIXME: remove such limitation after #7180
/// Jemalloc is not supported on Windows, because of tikv-jemalloc's own reasons.
/// See the comments for the macro `enable_jemalloc_on_linux!()`
#[cfg(not(target_os = "linux"))]
#[expect(clippy::unused_async)]
pub async fn run(self: Arc<Self>, _: Arc<BatchManager>, _: Arc<LocalStreamManager>) {}

/// Memory manager will get memory usage from batch and streaming, and do some actions.
/// 1. if batch exceeds, kill running query.
/// 2. if streaming exceeds, evict cache by watermark.
#[cfg(target_os = "linux")]
/// Memory manager will get memory usage statistics from batch and streaming and perform memory
/// control accordingly.
pub async fn run(
self: Arc<Self>,
batch_manager: Arc<BatchManager>,
stream_manager: Arc<LocalStreamManager>,
) {
use std::time::Duration;

use risingwave_common::util::epoch::Epoch;

use crate::memory_management::policy::MemoryControlStats;

let mut tick_interval =
tokio::time::interval(Duration::from_millis(self.barrier_interval_ms as u64));
let mut memory_control_stats = MemoryControlStats {
Expand Down
95 changes: 95 additions & 0 deletions src/compute/src/memory_management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,99 @@
// limitations under the License.

pub mod memory_manager;
#[cfg(target_os = "linux")]
pub mod policy;

use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use risingwave_batch::task::BatchManager;
use risingwave_common::error::Result;
use risingwave_stream::task::LocalStreamManager;

use crate::ComputeNodeOpts;

/// `MemoryControlStats` contains the necessary information for memory control, including both batch
/// and streaming.
#[derive(Default)]
pub struct MemoryControlStats {
pub batch_memory_usage: usize,
pub streaming_memory_usage: usize,
pub jemalloc_allocated_mib: usize,
pub lru_watermark_step: u64,
pub lru_watermark_time_ms: u64,
pub lru_physical_now_ms: u64,
}

pub type MemoryControlPolicy = Box<dyn MemoryControl>;

pub trait MemoryControl: Send + Sync {
fn apply(
&self,
total_compute_memory_bytes: usize,
barrier_interval_ms: u32,
prev_memory_stats: MemoryControlStats,
batch_manager: Arc<BatchManager>,
stream_manager: Arc<LocalStreamManager>,
watermark_epoch: Arc<AtomicU64>,
) -> MemoryControlStats;

fn describe(&self, total_compute_memory_bytes: usize) -> String;
}

#[cfg_attr(not(target_os = "linux"), expect(unused_variables))]
pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result<MemoryControlPolicy> {
#[cfg(target_os = "linux")]
xx01cyx marked this conversation as resolved.
Show resolved Hide resolved
{
use anyhow::anyhow;

use self::policy::{FixedProportionPolicy, StreamingOnlyPolicy};

let input_policy = &opts.memory_control_policy;
if input_policy == FixedProportionPolicy::CONFIG_STR {
Ok(Box::new(FixedProportionPolicy::new(
opts.streaming_memory_proportion,
)?))
} else if input_policy == StreamingOnlyPolicy::CONFIG_STR {
Ok(Box::new(StreamingOnlyPolicy))
} else {
let valid_values = [
FixedProportionPolicy::CONFIG_STR,
StreamingOnlyPolicy::CONFIG_STR,
];
Err(anyhow!(format!(
"invalid memory control policy in configuration: {}, valid values: {:?}",
input_policy, valid_values,
))
.into())
}
}

#[cfg(not(target_os = "linux"))]
{
// We disable memory control on operating systems other than Linux now because jemalloc
// stats do not work well.
tracing::warn!("memory control is only enabled on Linux now");
Ok(Box::new(DummyPolicy))
}
}

pub struct DummyPolicy;
xx01cyx marked this conversation as resolved.
Show resolved Hide resolved

impl MemoryControl for DummyPolicy {
fn apply(
&self,
_total_compute_memory_bytes: usize,
_barrier_interval_ms: u32,
_prev_memory_stats: MemoryControlStats,
_batch_manager: Arc<BatchManager>,
_stream_manager: Arc<LocalStreamManager>,
_watermark_epoch: Arc<AtomicU64>,
) -> MemoryControlStats {
MemoryControlStats::default()
}

fn describe(&self, _total_compute_memory_bytes: usize) -> String {
"DummyPolicy".to_string()
}
}
62 changes: 4 additions & 58 deletions src/compute/src/memory_management/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,55 +22,7 @@ use risingwave_common::error::Result;
use risingwave_common::util::epoch::Epoch;
use risingwave_stream::task::LocalStreamManager;

use crate::ComputeNodeOpts;

/// `MemoryControlStats` contains the necessary information for memory control, including both batch
/// and streaming.
pub struct MemoryControlStats {
pub batch_memory_usage: usize,
pub streaming_memory_usage: usize,
pub jemalloc_allocated_mib: usize,
pub lru_watermark_step: u64,
pub lru_watermark_time_ms: u64,
pub lru_physical_now_ms: u64,
}

pub type MemoryControlPolicy = Box<dyn MemoryControl>;

pub trait MemoryControl: Send + Sync {
fn apply(
&self,
total_compute_memory_bytes: usize,
barrier_interval_ms: u32,
prev_memory_stats: MemoryControlStats,
batch_manager: Arc<BatchManager>,
stream_manager: Arc<LocalStreamManager>,
watermark_epoch: Arc<AtomicU64>,
) -> MemoryControlStats;

fn describe(&self, total_compute_memory_bytes: usize) -> String;
}

pub fn memory_control_policy_from_config(opts: &ComputeNodeOpts) -> Result<MemoryControlPolicy> {
let input_policy = &opts.memory_control_policy;
if input_policy == FixedProportionPolicy::CONFIG_STR {
Ok(Box::new(FixedProportionPolicy::new(
opts.streaming_memory_proportion,
)?))
} else if input_policy == StreamingOnlyPolicy::CONFIG_STR {
Ok(Box::new(StreamingOnlyPolicy {}))
} else {
let valid_values = [
FixedProportionPolicy::CONFIG_STR,
StreamingOnlyPolicy::CONFIG_STR,
];
Err(anyhow!(format!(
"invalid memory control policy in configuration: {}, valid values: {:?}",
input_policy, valid_values,
))
.into())
}
}
use super::{MemoryControl, MemoryControlStats};

/// `FixedProportionPolicy` performs memory control by limiting the memory usage of both batch and
/// streaming to a fixed proportion.
Expand All @@ -83,7 +35,7 @@ pub struct FixedProportionPolicy {

impl FixedProportionPolicy {
const BATCH_KILL_QUERY_THRESHOLD: f64 = 0.8;
const CONFIG_STR: &str = "streaming-batch";
pub const CONFIG_STR: &str = "streaming-batch";
const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9;
const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7;

Expand Down Expand Up @@ -172,10 +124,10 @@ impl MemoryControl for FixedProportionPolicy {
/// `FixedProportionPolicy` in that it calculates the memory usage based on jemalloc statistics,
/// which actually contains system usage other than computing tasks. This is the default memory
/// control policy.
pub struct StreamingOnlyPolicy {}
pub struct StreamingOnlyPolicy;

impl StreamingOnlyPolicy {
const CONFIG_STR: &str = "streaming-only";
pub const CONFIG_STR: &str = "streaming-only";
const STREAM_EVICTION_THRESHOLD_AGGRESSIVE: f64 = 0.9;
const STREAM_EVICTION_THRESHOLD_GRACEFUL: f64 = 0.7;
}
Expand Down Expand Up @@ -230,7 +182,6 @@ impl MemoryControl for StreamingOnlyPolicy {
}
}

#[cfg(target_os = "linux")]
fn advance_jemalloc_epoch(prev_jemalloc_allocated_mib: usize) -> usize {
use tikv_jemalloc_ctl::{epoch as jemalloc_epoch, stats as jemalloc_stats};

Expand All @@ -246,11 +197,6 @@ fn advance_jemalloc_epoch(prev_jemalloc_allocated_mib: usize) -> usize {
})
}

#[cfg(not(target_os = "linux"))]
fn advance_jemalloc_epoch(_prev_jemalloc_allocated_mib: usize) -> usize {
0
}

fn calculate_lru_watermark(
cur_stream_used_memory_bytes: usize,
stream_memory_threshold_graceful: usize,
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::memory_management::memory_control_policy_from_config;
use crate::memory_management::memory_manager::{
GlobalMemoryManager, MIN_COMPUTE_MEMORY_MB, SYSTEM_RESERVED_MEMORY_MB,
};
use crate::memory_management::policy::memory_control_policy_from_config;
use crate::observer::observer_manager::ComputeObserverNode;
use crate::rpc::service::config_service::ConfigServiceImpl;
use crate::rpc::service::exchange_metrics::ExchangeServiceMetrics;
Expand Down